"""
Core data access interface for Open Polar Radar (OPR) datasets.

This module provides the primary interface for discovering, querying, and loading
polar ice-penetrating radar data from the Open Polar Radar archive. It implements
a STAC (SpatioTemporal Asset Catalog) based workflow for efficient data discovery
and retrieval.

Primary Class
-------------
OPRConnection : Main interface for accessing OPR data
    Provides methods for querying STAC catalogs, loading radar frames,
    and retrieving layer picks from both file-based and database sources.

Typical Workflow
----------------
1. Create an OPRConnection object, optionally with local caching
2. Query the STAC catalog to find radar data matching your criteria
3. Load radar frames as xarray Datasets
4. Optionally load associated layer picks
5. Process and visualize the data

See this notebook for an end-to-end example:
- https://docs.englacial.org/xopr/demo-notebook/

Examples
--------
>>> import xopr
>>> # Connect to OPR with local caching
>>> opr = xopr.OPRConnection(cache_dir='radar_cache')
>>>
>>> # Query for data in a specific collection and segment
>>> stac_items = opr.query_frames(
...     collections=['2022_Antarctica_BaslerMKB'],
...     segment_paths=['20230109_01']
... )
>>>
>>> # Load the radar frames
>>> frames = opr.load_frames(stac_items)
>>>
>>> # Get layer picks (surface/bed)
>>> layers = opr.get_layers(frames[0])

See Also
--------
xopr.geometry : Geographic utilities for region selection and projection
xopr.radar_util : Processing functions for radar data and layers
ops_api : Interface to the OPS database API
"""

import re
import warnings
from typing import Union

import antimeridian
import fsspec
import geopandas as gpd
import h5py
import hdf5storage
import numpy as np
import pandas as pd
import scipy.io
import shapely
import xarray as xr
from rustac import DuckdbClient

from . import opr_tools, ops_api
from .cf_units import apply_cf_compliant_attrs
from .matlab_attribute_utils import (
    decode_hdf5_matlab_variable,
    extract_legacy_mat_attributes,
)
from .stac_cache import get_opr_catalog_path, sync_opr_catalogs


class OPRConnection:
    def __init__(self,
                 collection_url: str = "https://data.cresis.ku.edu/data/",
                 cache_dir: str = None,
                 stac_parquet_href: str = None,
                 sync_catalogs: bool = True):
        """
        Initialize connection to OPR data archive.

        Parameters
        ----------
        collection_url : str, optional
            Base URL for OPR data collection.
        cache_dir : str, optional
            Directory to cache downloaded data for faster repeated access.
        stac_parquet_href : str, optional
            Path or URL pattern to STAC catalog parquet files. When *None*
            (default), the local cache is used if available, falling back to
            the S3 catalog.
        sync_catalogs : bool, optional
            If True (default), sync OPR STAC catalogs to a local cache
            before resolving the catalog path. Uses ETag-based change
            detection so repeated calls are cheap (single HTTP request).
        """
        self.collection_url = collection_url
        self.cache_dir = cache_dir
        self._user_set_href = stac_parquet_href is not None
        if sync_catalogs and not self._user_set_href:
            sync_opr_catalogs()
        self.stac_parquet_href = stac_parquet_href or get_opr_catalog_path()

        self.fsspec_cache_kwargs = {}
        self.fsspec_url_prefix = ''
        if cache_dir:
            self.fsspec_cache_kwargs['cache_storage'] = cache_dir
            self.fsspec_cache_kwargs['check_files'] = True
            self.fsspec_url_prefix = 'filecache::'

        self.db_layers_metadata = None  # Cache for OPS database layers metadata

        # Lazy-initialized DuckDB client for STAC queries; reusing a single
        # session caches remote file metadata (parquet footers) across calls
        self._duckdb_client = None

    @property
    def duckdb_client(self):
        """Lazy-initialized DuckDB client, recreated after pickling."""
        if self._duckdb_client is None:
            self._duckdb_client = DuckdbClient()
        return self._duckdb_client

    def __getstate__(self):
        state = self.__dict__.copy()
        state['_duckdb_client'] = None  # DuckdbClient can't be pickled
        return state

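The `__getstate__` hook above exists so an `OPRConnection` can cross process boundaries (e.g. with multiprocessing or dask): the unpicklable DuckDB client is dropped before pickling, and the `duckdb_client` property lazily rebuilds it on first use. A minimal, self-contained sketch of the same pattern; the `LazyResource` class and its placeholder handle are illustrative, not part of xopr:

```python
import pickle


class LazyResource:
    """Illustrative stand-in for OPRConnection's lazy DuckDB client."""

    def __init__(self):
        self._client = None  # expensive/unpicklable handle, created on demand

    @property
    def client(self):
        if self._client is None:
            self._client = object()  # placeholder for DuckdbClient()
        return self._client

    def __getstate__(self):
        state = self.__dict__.copy()
        state['_client'] = None  # drop the live handle before pickling
        return state


r = LazyResource()
_ = r.client                       # force creation of the handle
clone = pickle.loads(pickle.dumps(r))
assert clone.__dict__['_client'] is None  # handle was dropped in transit...
assert clone.client is not None           # ...and is lazily recreated on access
```

The key design point is that the property, not `__init__`, owns creation, so a freshly unpickled instance transparently reconnects.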
    def _open_file(self, url):
        """Helper method to open files with appropriate caching.

        Local filesystem paths are returned as-is (no fsspec wrapping).
        Remote URLs (http, https, s3, gs) use filecache or simplecache.
        """
        if not url.startswith(('http://', 'https://', 's3://', 'gs://')):
            return url
        if self.fsspec_url_prefix:
            return fsspec.open_local(f"{self.fsspec_url_prefix}{url}", filecache=self.fsspec_cache_kwargs)
        else:
            return fsspec.open_local(f"simplecache::{url}", simplecache=self.fsspec_cache_kwargs)

    def _extract_segment_info(self, segment):
        """Extract collection, segment_path, and frame from Dataset or dict."""
        if isinstance(segment, xr.Dataset):
            return (segment.attrs.get('collection'),
                    segment.attrs.get('segment_path'),
                    segment.attrs.get('frame'))
        else:
            return (segment['collection'],
                    f"{segment['properties'].get('opr:date')}_{segment['properties'].get('opr:segment'):02d}",
                    segment['properties'].get('opr:frame'))

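`_extract_segment_info` relies on the OPR convention that a segment path is the acquisition date joined to a zero-padded segment number. A small illustration of the dict branch, using a hypothetical STAC item fragment (the property values are made up):

```python
# Hypothetical STAC item fragment with the fields _extract_segment_info reads
item = {
    'collection': '2022_Antarctica_BaslerMKB',
    'properties': {'opr:date': '20230109', 'opr:segment': 1, 'opr:frame': 5},
}

# Same formatting as the dict branch above: date + zero-padded segment number
segment_path = f"{item['properties'].get('opr:date')}_{item['properties'].get('opr:segment'):02d}"
assert segment_path == '20230109_01'
```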
    def query_frames(self, collections: list[str] = None, segment_paths: list[str] = None,
                     geometry=None, date_range: tuple = None, properties: dict = None,
                     max_items: int = None, exclude_geometry: bool = False,
                     search_kwargs: dict = None) -> gpd.GeoDataFrame:
        """
        Query STAC catalog for radar frames matching search criteria.

        Multiple parameters are combined with AND logic. Lists of values passed to a
        single parameter are treated with OR logic (any value matches).

        Parameters
        ----------
        collections : list[str] or str, optional
            Collection name(s) (e.g., "2022_Antarctica_BaslerMKB").
        segment_paths : list[str] or str, optional
            Segment path(s) in format "YYYYMMDD_NN" (e.g., "20230126_01").
        geometry : shapely geometry or dict, optional
            Geospatial geometry to filter by intersection.
        date_range : str or tuple, optional
            Date range in ISO format (e.g., "2021-01-01/2025-06-01").
        properties : dict, optional
            Additional STAC properties to filter by.
        max_items : int, optional
            Maximum number of items to return.
        exclude_geometry : bool, optional
            If True, exclude the geometry field to reduce response size.
        search_kwargs : dict, optional
            Additional keyword arguments for the STAC search.

        Returns
        -------
        gpd.GeoDataFrame or None
            GeoDataFrame of matching STAC items with columns for collection,
            geometry, properties, and assets. Returns None if no matches found.
        """
        # Avoid mutable default arguments
        properties = properties or {}
        search_kwargs = search_kwargs or {}

        search_params = {}

        # Exclude geometry -- do not return the geometry field to reduce response size
        if exclude_geometry:
            search_params['exclude'] = ['geometry']

        # Handle collections (seasons)
        if collections is not None:
            search_params['collections'] = [collections] if isinstance(collections, str) else collections

        # Handle geometry filtering
        if geometry is not None:
            if hasattr(geometry, '__geo_interface__'):
                geometry = geometry.__geo_interface__

            # Fix geometries that cross the antimeridian
            geometry = antimeridian.fix_geojson(geometry, reverse=True)

            search_params['intersects'] = geometry

        # Handle date range filtering
        if date_range is not None:
            search_params['datetime'] = date_range

        # Handle max_items
        if max_items is not None:
            search_params['limit'] = max_items

        # Handle segment_paths filtering using CQL2
        filter_conditions = []

        if segment_paths is not None:
            segment_paths = [segment_paths] if isinstance(segment_paths, str) else segment_paths

            # Create OR conditions for segment paths
            segment_conditions = []
            for segment_path in segment_paths:
                try:
                    date_str, segment_num_str = segment_path.split('_')
                    segment_num = int(segment_num_str)

                    # Create AND condition for this specific segment
                    segment_condition = {
                        "op": "and",
                        "args": [
                            {
                                "op": "=",
                                "args": [{"property": "opr:date"}, date_str]
                            },
                            {
                                "op": "=",
                                "args": [{"property": "opr:segment"}, segment_num]
                            }
                        ]
                    }
                    segment_conditions.append(segment_condition)
                except ValueError:
                    warnings.warn(f"Invalid segment_path format '{segment_path}'. Expected format: YYYYMMDD_NN", UserWarning)
                    continue

            if segment_conditions:
                if len(segment_conditions) == 1:
                    filter_conditions.append(segment_conditions[0])
                else:
                    # Multiple segments - combine with OR
                    filter_conditions.append({
                        "op": "or",
                        "args": segment_conditions
                    })

        # Add any additional property filters
        for key, value in properties.items():
            filter_conditions.append({
                "op": "=",
                "args": [{"property": key}, value]
            })

        # Combine all filter conditions with AND
        if filter_conditions:
            if len(filter_conditions) == 1:
                filter_expr = filter_conditions[0]
            else:
                filter_expr = {
                    "op": "and",
                    "args": filter_conditions
                }

            search_params['filter'] = filter_expr

        # Add any extra kwargs to search
        search_params.update(search_kwargs)

        # Perform the search
        items = self.duckdb_client.search(self.stac_parquet_href, **search_params)
        if isinstance(items, dict):
            items = items['features']

        if not items or len(items) == 0:
            warnings.warn("No items found matching the query criteria", UserWarning)
            return None

        # Convert to GeoDataFrame
        items_df = gpd.GeoDataFrame(items)
        # Set index
        items_df = items_df.set_index(items_df['id'])
        items_df.index.name = 'stac_item_id'
        # Set the geometry column
        if 'geometry' in items_df.columns and not exclude_geometry:
            items_df = items_df.set_geometry(items_df['geometry'].apply(shapely.geometry.shape))
            items_df.crs = "EPSG:4326"

        # Reorder the columns, leaving any extra columns at the end
        desired_order = ['collection', 'geometry', 'properties', 'assets']
        items_df = items_df[[col for col in desired_order if col in items_df.columns]
                            + list(items_df.columns.difference(desired_order))]

        return items_df

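The CQL2 construction inside `query_frames` can be hard to follow mid-method. This standalone sketch mirrors how a single "YYYYMMDD_NN" segment path becomes an equality filter on `opr:date` AND `opr:segment`; the helper function name is ours, not part of the class:

```python
def segment_path_to_cql2(segment_path: str) -> dict:
    # Mirrors the filter built inside query_frames: split "YYYYMMDD_NN"
    # into equality tests on opr:date AND opr:segment.
    date_str, segment_num_str = segment_path.split('_')
    return {
        "op": "and",
        "args": [
            {"op": "=", "args": [{"property": "opr:date"}, date_str]},
            {"op": "=", "args": [{"property": "opr:segment"}, int(segment_num_str)]},
        ],
    }


expr = segment_path_to_cql2('20230109_01')
assert expr["op"] == "and"
assert expr["args"][0]["args"][1] == '20230109'
assert expr["args"][1]["args"][1] == 1   # segment number is an int, not "01"
```

Multiple segment paths are then OR-ed together, and the result is AND-ed with any extra property filters, exactly as in the method body above.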
    def load_frames(self, stac_items: gpd.GeoDataFrame,
                    data_product: str = "CSARP_standard",
                    image: Union[int, None] = None,
                    merge_flights: bool = False,
                    skip_errors: bool = False,
                    allow_unlisted_products: bool = False
                    ) -> Union[list[xr.Dataset], xr.Dataset]:
        """
        Load multiple radar frames from STAC items.

        Parameters
        ----------
        stac_items : gpd.GeoDataFrame
            STAC items returned from query_frames.
        data_product : str, optional
            Data product to load (e.g., "CSARP_standard", "CSARP_qlook").
        image : int or None, optional
            The image number to load for each frame. If None (default), loads the
            combined image. If specified, `allow_unlisted_products` must be True.
        merge_flights : bool, optional
            If True, merge frames from the same segment into single Datasets.
        skip_errors : bool, optional
            If True, skip failed frames and continue loading others.
        allow_unlisted_products : bool, optional
            If True, attempt to load the specified data product even if it's not
            listed in the item's assets. See `load_frame` for details.

        Returns
        -------
        list[xr.Dataset] or xr.Dataset
            List of radar frame Datasets, or list of merged Datasets if
            merge_flights=True.
        """
        frames = []

        for idx, item in stac_items.iterrows():
            try:
                frame = self.load_frame(item, data_product, image=image, allow_unlisted_products=allow_unlisted_products)
                frames.append(frame)
            except Exception as e:
                print(f"Error loading frame for item {item.get('id', 'unknown')}: {e}")
                if skip_errors:
                    continue
                else:
                    raise

        if merge_flights:
            return opr_tools.merge_frames(frames)
        else:
            return frames

    def load_frame(self, stac_item, data_product: str = "CSARP_standard",
                   image: Union[int, None] = None,
                   allow_unlisted_products: bool = False) -> xr.Dataset:
        """
        Load a single radar frame from a STAC item.

        Parameters
        ----------
        stac_item : dict or pd.Series
            STAC item containing asset URLs.
        data_product : str, optional
            Data product to load (e.g., "CSARP_standard", "CSARP_qlook").
        image : int or None, optional
            The image number to load for this frame. If None (default), loads the
            combined image. If specified, `allow_unlisted_products` must be True.
        allow_unlisted_products : bool, optional
            If True, attempt to load the specified data product even if it's not
            listed in the item's assets. This can be useful for loading non-standard
            products. If set to True and the data product is not found in the STAC
            item assets, the method will attempt to construct the URL based on any
            available CSARP_* asset. (If the frame is entirely unlisted, you can use
            `load_frame_url` instead.)

        Returns
        -------
        xr.Dataset
            Radar frame with coordinates slow_time and twtt, and data variables
            including Data, Latitude, Longitude, Elevation, Surface, etc.
        """
        assets = stac_item['assets']

        # Get the data asset
        data_asset = assets.get(data_product)
        if not data_asset:
            if not allow_unlisted_products:
                available_assets = list(assets.keys())
                raise ValueError(f"{data_product} asset not found in STAC item. Available assets: {available_assets}")
            else:
                # Find any available CSARP_* asset to use as a template for constructing the URL
                template_asset_name = None
                template_asset_url = None
                for asset_name, asset_info in assets.items():
                    if asset_name.startswith('CSARP_'):
                        template_asset_name = asset_name
                        template_asset_url = asset_info.get('href')
                        break

                if not template_asset_url:
                    available_assets = list(assets.keys())
                    raise ValueError(f"No CSARP_* asset found in STAC item to use as a template for constructing URL for {data_product}. Available assets: {available_assets}")

                url = template_asset_url.replace(template_asset_name, data_product)
        else:
            # The asset does exist in the STAC item, so just get the URL from the asset
            url = data_asset.get('href')

            if not url:
                raise ValueError(f"No href found in {data_product} asset")

        # If a specific image is requested, modify the URL to point to that image
        if image is not None:
            if not allow_unlisted_products:
                raise ValueError("Specifying an image number requires allow_unlisted_products=True to construct the URL")
            url = url.replace("Data_", f"Data_img_{image:02d}_")

        # Load the frame using the existing method
        return self.load_frame_url(url)

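For unlisted products and per-image files, `load_frame` derives the URL by string substitution on a sibling CSARP_* asset's href. A sketch of both rewrites on a hypothetical asset URL (the path below follows the OPR layout but is illustrative):

```python
# Hypothetical template href, shaped like an OPR CSARP_standard asset
template = ('https://data.cresis.ku.edu/data/rds/2022_Antarctica_BaslerMKB/'
            'CSARP_standard/20230109_01/Data_20230109_01_001.mat')

# Swap the data product name, as load_frame does for unlisted products
qlook_url = template.replace('CSARP_standard', 'CSARP_qlook')
assert '/CSARP_qlook/' in qlook_url

# Point at a specific image, as load_frame does when image=1
img_url = template.replace('Data_', 'Data_img_01_')
assert img_url.endswith('Data_img_01_20230109_01_001.mat')
```

This only works when the archive follows the standard naming layout, which is why the rewrite is gated behind `allow_unlisted_products=True`.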
    def load_frame_url(self, url: str) -> xr.Dataset:
        """
        Load a radar frame directly from a URL.

        Automatically detects and handles both HDF5 and legacy MATLAB formats.

        Parameters
        ----------
        url : str
            URL to radar frame data file (.mat).

        Returns
        -------
        xr.Dataset
            Radar frame with CF-compliant metadata and citation information.
        """
        file = self._open_file(url)

        filetype = None
        try:
            ds = self._load_frame_hdf5(file)
            filetype = 'hdf5'
        except OSError:
            ds = self._load_frame_matlab(file)
            filetype = 'matlab'

        # Add the source URL as an attribute
        ds.attrs['source_url'] = url

        # Apply CF-compliant attributes
        ds = apply_cf_compliant_attrs(ds)

        # Get the season and segment from the URL
        match = re.search(r'(\d{4}_\w+_[A-Za-z0-9]+)\/([\w_]+)\/[\d_]+\/[\w]+(\d{8}_\d{2}_\d{3})', url)
        if match:
            collection, data_product, granule = match.groups()
            date, segment_id, frame_id = granule.split('_')
            ds.attrs['collection'] = collection
            ds.attrs['data_product'] = data_product
            ds.attrs['granule'] = granule
            ds.attrs['segment_path'] = f"{date}_{segment_id}"
            ds.attrs['date_str'] = date
            ds.attrs['segment'] = int(segment_id)
            ds.attrs['frame'] = int(frame_id)

            # Load citation information
            result = ops_api.get_segment_metadata(segment_name=ds.attrs['segment_path'], season_name=collection)
            if result:
                if isinstance(result['data'], str):
                    warnings.warn(f"Unexpected result from ops_api: {result['data']}", UserWarning)
                else:
                    result_data = {}
                    for key, value in result['data'].items():
                        if len(value) == 1:
                            result_data[key] = value[0]
                        elif len(value) > 1:
                            result_data[key] = set(value)

                    if 'dois' in result_data:
                        ds.attrs['doi'] = result_data['dois']
                    if 'rors' in result_data:
                        ds.attrs['ror'] = result_data['rors']
                    if 'funding_sources' in result_data:
                        ds.attrs['funder_text'] = result_data['funding_sources']

        # Add the rest of the MATLAB parameters
        if filetype == 'hdf5':
            ds.attrs['mimetype'] = 'application/x-hdf5'
            ds.attrs.update(decode_hdf5_matlab_variable(h5py.File(file, 'r'),
                                                        skip_variables=True,
                                                        skip_errors=True))
        elif filetype == 'matlab':
            ds.attrs['mimetype'] = 'application/x-matlab-data'
            ds.attrs.update(extract_legacy_mat_attributes(file,
                                                          skip_keys=ds.keys(),
                                                          skip_errors=True))

        return ds

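The metadata attributes above come from a single regex over the frame URL. This sketch applies the same pattern used in `load_frame_url` to an example URL in the OPR layout, showing which pieces land in which attributes:

```python
import re

# The same pattern load_frame_url uses to recover collection, data product,
# and granule from an OPR-style URL
pattern = r'(\d{4}_\w+_[A-Za-z0-9]+)\/([\w_]+)\/[\d_]+\/[\w]+(\d{8}_\d{2}_\d{3})'
url = ('https://data.cresis.ku.edu/data/rds/2017_Antarctica_BaslerJKB/'
       'CSARP_layer/20180105_03/Data_20180105_03_003.mat')

m = re.search(pattern, url)
collection, data_product, granule = m.groups()
assert collection == '2017_Antarctica_BaslerJKB'
assert data_product == 'CSARP_layer'
assert granule == '20180105_03_003'

# granule splits into date_str, segment (int), and frame (int) attributes
date, segment_id, frame_id = granule.split('_')
assert (date, int(segment_id), int(frame_id)) == ('20180105', 3, 3)
```

Note that `segment` and `frame` are stored as ints, so leading zeros in the URL are dropped.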
    def _load_frame_hdf5(self, file) -> xr.Dataset:
        """
        Load a radar frame from an HDF5 file.

        Parameters
        ----------
        file : str
            The path to the HDF5 file containing radar frame data.

        Returns
        -------
        xr.Dataset
            The loaded radar frame as an xarray Dataset.
        """
        ds = xr.open_dataset(file, engine='h5netcdf', phony_dims='sort')

        # Re-arrange variables to provide useful dimensions and coordinates

        ds = ds.squeeze()  # Drop the singleton dimensions MATLAB adds

        ds = ds.rename({  # Label the dimensions with more useful names
            ds.Data.dims[0]: 'slow_time_idx',
            ds.Data.dims[1]: 'twtt_idx',
        })

        # Make variables with no dimensions into scalar attributes
        for var in ds.data_vars:
            if ds[var].ndim == 0:
                ds.attrs[var] = ds[var].item()
                ds = ds.drop_vars(var)

        # Make the file_type an attribute
        if 'file_type' in ds.data_vars:
            ds.attrs['file_type'] = ds['file_type'].to_numpy()
            ds = ds.drop_vars('file_type')

        # Name the two time coordinates
        ds = ds.rename({'Time': 'twtt', 'GPS_time': 'slow_time'})
        ds = ds.set_coords(['slow_time', 'twtt'])

        slow_time_1d = pd.to_datetime(ds['slow_time'].values, unit='s')
        ds = ds.assign_coords(slow_time=('slow_time_idx', slow_time_1d))

        # Make twtt and slow_time the indexing coordinates
        ds = ds.swap_dims({'twtt_idx': 'twtt'})
        ds = ds.swap_dims({'slow_time_idx': 'slow_time'})

        return ds

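The reshaping in `_load_frame_hdf5` (rename the phony dims, promote `Time`/`GPS_time` to coordinates, then `swap_dims`) can be exercised on a toy Dataset without touching a real file. A minimal sketch with made-up shapes and values:

```python
import numpy as np
import pandas as pd
import xarray as xr

# Toy frame standing in for the MATLAB-derived HDF5 contents (shapes illustrative)
ds = xr.Dataset(
    {
        'Data': (('slow_time_idx', 'twtt_idx'), np.zeros((4, 3))),
        'GPS_time': ('slow_time_idx', 1.6e9 + np.arange(4.0)),  # POSIX seconds
        'Time': ('twtt_idx', np.array([0.0, 1e-6, 2e-6])),      # two-way travel time
    }
)

# Same relabeling steps as _load_frame_hdf5
ds = ds.rename({'Time': 'twtt', 'GPS_time': 'slow_time'})
ds = ds.set_coords(['slow_time', 'twtt'])
ds = ds.assign_coords(
    slow_time=('slow_time_idx', pd.to_datetime(ds['slow_time'].values, unit='s'))
)
ds = ds.swap_dims({'twtt_idx': 'twtt', 'slow_time_idx': 'slow_time'})

# Data is now indexed by a datetime slow_time axis and a twtt axis
assert ds['Data'].dims == ('slow_time', 'twtt')
assert str(ds.indexes['slow_time'].dtype).startswith('datetime64')
```

The `swap_dims` calls are what make `twtt` and `slow_time` usable with label-based selection (`.sel`) downstream.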
    def _load_frame_matlab(self, file) -> xr.Dataset:
        """
        Load a radar frame from a legacy MATLAB file.

        Parameters
        ----------
        file : str
            The path to the MATLAB file containing radar frame data.

        Returns
        -------
        xr.Dataset
            The loaded radar frame as an xarray Dataset.
        """
        m = scipy.io.loadmat(file, mat_dtype=False)

        key_dims = {
            'Time': ('twtt',),
            'GPS_time': ('slow_time',),
            'Latitude': ('slow_time',),
            'Longitude': ('slow_time',),
            'Elevation': ('slow_time',),
            'Roll': ('slow_time',),
            'Pitch': ('slow_time',),
            'Heading': ('slow_time',),
            'Surface': ('slow_time',),
            'Data': ('twtt', 'slow_time')
        }

        ds = xr.Dataset(
            {
                key: (dims, np.squeeze(m[key])) for key, dims in key_dims.items() if key in m
            },
            coords={
                'twtt': ('twtt', np.squeeze(m['Time'])),
                'slow_time': ('slow_time', pd.to_datetime(np.squeeze(m['GPS_time']), unit='s')),
            }
        )

        return ds

    def get_collections(self) -> list:
        """
        Get all available collections (seasons/campaigns) in the STAC catalog.

        Returns
        -------
        list[dict]
            List of collection dictionaries with id, description, and extent.
        """
        return self.duckdb_client.get_collections(self.stac_parquet_href)

    def get_segments(self, collection_id: str) -> list:
        """
        Get all segments (flights) within a collection.

        Parameters
        ----------
        collection_id : str
            STAC collection ID (e.g., "2022_Antarctica_BaslerMKB").

        Returns
        -------
        list[dict]
            List of segment dictionaries with segment_path, date, flight_number,
            and frame count.
        """
        # Query STAC API for all items in collection (exclude geometry for better performance)
        items = self.query_frames(collections=[collection_id], exclude_geometry=True)

        if items is None or len(items) == 0:
            print(f"No items found in collection '{collection_id}'")
            return []

        # Group items by segment (opr:date + opr:segment)
        segments = {}
        for idx, item in items.iterrows():
            properties = item['properties']
            date = properties['opr:date']
            flight_num = properties['opr:segment']

            if date and flight_num is not None:
                segment_path = f"{date}_{flight_num:02d}"

                if segment_path not in segments:
                    segments[segment_path] = {
                        'segment_path': segment_path,
                        'date': date,
                        'flight_number': flight_num,
                        'collection': collection_id,
                        'frames': [],
                        'item_count': 0
                    }

                segments[segment_path]['frames'].append(properties.get('opr:frame'))
                segments[segment_path]['item_count'] += 1

        # Sort segments by date and flight number
        segment_list = list(segments.values())
        segment_list.sort(key=lambda x: (x['date'], x['flight_number']))

        return segment_list

 666    def _get_layers_files(self, segment: Union[xr.Dataset, dict], raise_errors=True) -> dict:
 667        """
 668        Fetch layers from the CSARP_layers files
 669
 670        See https://gitlab.com/openpolarradar/opr/-/wikis/Layer-File-Guide for file formats
 671
 672        Parameters
 673        ----------
 674        segment : xr.Dataset or dict
 675            Radar frame Dataset or STAC item.
 676        raise_errors : bool, optional
 677            If True, raise errors when layers cannot be found.
 678
 679        Returns
 680        -------
 681        dict
 682            Dictionary mapping layer names (e.g., "standard:surface",
 683            "standard:bottom") to layer Datasets.
 684        """
 685        collection, segment_path, frame = self._extract_segment_info(segment)
 686
 687        # If we already have a STAC item, just use it. Otherwise, query to find the matching STAC items
 688        if isinstance(segment, xr.Dataset):
 689            properties = {}
 690            if frame:
 691                properties['opr:frame'] = frame
 692
 693            # Query STAC collection for CSARP_layer files matching this specific segment
 694
 695            # Get items from this specific segment
 696            stac_items = self.query_frames(collections=[collection], segment_paths=[segment_path], properties=properties)
 697
 698            # Filter for items that have CSARP_layer assets
 699            layer_items = []
 700            for idx, item in stac_items.iterrows():
 701                if 'CSARP_layer' in item['assets']:
 702                    layer_items.append(item)
 703        else:
 704            layer_items = [segment] if 'CSARP_layer' in segment.get('assets', {}) else []
 705
 706        if not layer_items:
 707            if raise_errors:
 708                raise ValueError(f"No CSARP_layer files found for segment path {segment_path} in collection {collection}")
 709            else:
 710                return {}
 711
 712        # Load each layer file and combine them
 713        layer_frames = []
 714        for item in layer_items:
 715            layer_asset = item['assets']['CSARP_layer']
 716            if layer_asset and 'href' in layer_asset:
 717                url = layer_asset['href']
 718                try:
 719                    layer_ds = self.load_layers_file(url)
 720                    layer_frames.append(layer_ds)
  721                except Exception as e:
  722                    # Skip unreadable files; total failure is handled below
  723                    warnings.warn(f"Failed to load layer file {url}: {e}")
  724                    continue
 725
 726        if not layer_frames:
 727            if raise_errors:
 728                raise ValueError(f"No valid CSARP_layer files could be loaded for segment {segment_path} in collection {collection}")
 729            else:
 730                return {}
 731
 732        # Concatenate all layer frames along slow_time dimension
 733        layers_segment = xr.concat(layer_frames, dim='slow_time', combine_attrs='drop_conflicts', data_vars='minimal')
 734        layers_segment = layers_segment.sortby('slow_time')
 735
 736        # Trim to bounds of the original dataset
 737        layers_segment = self._trim_to_bounds(layers_segment, segment)
 738
 739        # Find the layer organization file to map layer IDs to names
 740        # Layer organization files are one per directory. For example, the layer file:
 741        # https://data.cresis.ku.edu/data/rds/2017_Antarctica_BaslerJKB/CSARP_layer/20180105_03/Data_20180105_03_003.mat
 742        # would have a layer organization file at:
 743        # https://data.cresis.ku.edu/data/rds/2017_Antarctica_BaslerJKB/CSARP_layer/20180105_03/layer_20180105_03.mat
 744
  745        layer_organization_file_url = re.sub(r'Data_(\d{8}_\d{2})_\d{3}\.mat', r'layer_\1.mat', url)
 746        layer_organization_ds = self._load_layer_organization(self._open_file(layer_organization_file_url))
 747
 748        # Split into separate layers by ID
 749        layers = {}
 750
 751        layer_ids = np.unique(layers_segment['layer'])
 752
 753        for i, layer_id in enumerate(layer_ids):
 754            layer_id_int = int(layer_id)
 755            layer_name = str(layer_organization_ds['lyr_name'].sel(lyr_id=layer_id_int).values.item().squeeze())
 756            layer_group = str(layer_organization_ds['lyr_group_name'].sel(lyr_id=layer_id_int).values.item().squeeze())
 757            if layer_group == '[]':
 758                layer_group = ''
 759            layer_display_name = f"{layer_group}:{layer_name}"
 760
 761            layer_ds = layers_segment.sel(layer=layer_id).drop_vars('layer')
 762            # Only add non-empty layers with at least some non-NaN data
 763            if layer_ds.sizes.get('slow_time', 0) > 0:
 764                if not layer_ds.to_array().isnull().all():
 765                    layers[layer_display_name] = layer_ds
 766
 767        return layers
 768
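The mapping from a per-frame layer file to its per-directory layer organization file, described in the comments above, is a plain regular-expression rewrite. A minimal sketch using the example URL from those comments:

```python
import re

# Rewrite a per-frame CSARP_layer data URL into the per-directory layer
# organization URL, mirroring the re.sub call in _get_layers_files.
# The trailing ".mat" dot is escaped so it cannot match an arbitrary character.
def layer_organization_url(data_url: str) -> str:
    return re.sub(r'Data_(\d{8}_\d{2})_\d{3}\.mat', r'layer_\1.mat', data_url)

url = ("https://data.cresis.ku.edu/data/rds/2017_Antarctica_BaslerJKB/"
       "CSARP_layer/20180105_03/Data_20180105_03_003.mat")
print(layer_organization_url(url))
```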
 769    def _trim_to_bounds(self, ds: xr.Dataset, ref: Union[xr.Dataset, dict]) -> xr.Dataset:
 770        start_time, end_time = None, None
 771        if isinstance(ref, xr.Dataset) and 'slow_time' in ref.coords:
 772            start_time = ref['slow_time'].min()
 773            end_time = ref['slow_time'].max()
 774        else:
 775            properties = ref.get('properties', {})
 776            if 'start_datetime' in properties and 'end_datetime' in properties:
 777                start_time = pd.to_datetime(properties['start_datetime'])
 778                end_time = pd.to_datetime(properties['end_datetime'])
 779
  780        if start_time is not None:
 781            return ds.sel(slow_time=slice(start_time, end_time))
 782        else:
 783            return ds
 784
 785    def load_layers_file(self, url: str) -> xr.Dataset:
 786        """
 787        Load layer data from CSARP_layer file.
 788
 789        Parameters
 790        ----------
 791        url : str
 792            URL or path to layer file.
 793
 794        Returns
 795        -------
 796        xr.Dataset
  797            Layer data in a standardized format with coordinates:
  798            - slow_time: GPS time converted to datetime
  799            - layer: Layer IDs
  800            And data variables:
  801            - twtt: Two-way travel time for each layer
  802            - quality: Quality values for each layer
  803            - type: Type values for each layer
  804            - lat, lon, elev: Geographic coordinates
 807        """
 808
 809        file = self._open_file(url)
 810
 811        ds = self._load_layers(file)
 812
 813        # Add the source URL as an attribute
 814        ds.attrs['source_url'] = url
 815
 816        # Apply common manipulations to match the expected structure
 817        # Convert GPS time to datetime coordinate
 818        if 'gps_time' in ds.variables:
 819            slow_time_dt = pd.to_datetime(ds['gps_time'].values, unit='s')
 820            ds = ds.assign_coords(slow_time=('slow_time', slow_time_dt))
 821
 822            # Set slow_time as the main coordinate and remove gps_time from data_vars
 823            if 'slow_time' not in ds.dims:
 824                ds = ds.swap_dims({'gps_time': 'slow_time'})
 825
 826            # Remove gps_time from data_vars if it exists there to avoid conflicts
 827            if ('gps_time' in ds.data_vars) or ('gps_time' in ds.coords):
 828                ds = ds.drop_vars('gps_time')
 829
 830        # Sort by slow_time if it exists
 831        if 'slow_time' in ds.coords:
 832            ds = ds.sortby('slow_time')
 833
 834        return ds
 835
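The GPS-time handling above converts epoch seconds to a datetime `slow_time` coordinate with pandas. A minimal sketch of the same conversion, with illustrative timestamp values:

```python
import pandas as pd

# CSARP layer files store gps_time as seconds since the Unix epoch;
# load_layers_file converts these to datetime64 values for slow_time.
gps_time = [1515141000.0, 1515141000.5, 1515141001.0]
slow_time = pd.to_datetime(gps_time, unit='s')
print(slow_time[0])  # 2018-01-05 08:30:00
```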
 836    def _load_layers(self, file) -> xr.Dataset:
 837        """
  838        Load a layer data file using hdf5storage into a raw xr.Dataset.
  839
  840        Parameters
  841        ----------
  842        file : str
  843            Path to the layer file.
 844        """
 845
 846        d = hdf5storage.loadmat(file, appendmat=False)
 847
 848        # Ensure 'id' is at least 1D for the layer dimension
 849        id_data = np.atleast_1d(d['id'].squeeze())
 850
 851        # For 2D arrays with (layer, slow_time) dimensions, ensure they remain 2D
 852        # even when there's only one layer
 853        quality_data = np.atleast_2d(d['quality'].squeeze())
 854        if quality_data.shape[0] == 1 or quality_data.ndim == 1:
 855            quality_data = quality_data.reshape(len(id_data), -1)
 856
 857        twtt_data = np.atleast_2d(d['twtt'].squeeze())
 858        if twtt_data.shape[0] == 1 or twtt_data.ndim == 1:
 859            twtt_data = twtt_data.reshape(len(id_data), -1)
 860
 861        type_data = np.atleast_2d(d['type'].squeeze())
 862        if type_data.shape[0] == 1 or type_data.ndim == 1:
 863            type_data = type_data.reshape(len(id_data), -1)
 864
 865        ds = xr.Dataset({
 866            'file_type': ((), d['file_type'].squeeze()),
 867            'file_version': ((), d['file_version'].squeeze()),
 868            'elev': (('slow_time',), d['elev'].squeeze()),
 869            #'gps_source': ((), d['gps_source'].squeeze()),
 870            'gps_time': (('slow_time',), d['gps_time'].squeeze()),
 871            'id': (('layer',), id_data),
 872            'lat': (('slow_time',), d['lat'].squeeze()),
 873            'lon': (('slow_time',), d['lon'].squeeze()),
 874            'quality': (('layer', 'slow_time'), quality_data),
 875            'twtt': (('layer', 'slow_time'), twtt_data),
 876            'type': (('layer', 'slow_time'), type_data),
 877        })
 878
 879        ds = ds.assign_coords({'layer': ds['id'], 'slow_time': ds['gps_time']})
 880
 881        return ds
 882
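The shape normalization above guards against MATLAB's habit of squeezing singleton dimensions: a single-layer file yields 1D arrays that must be restored to a `(layer, slow_time)` shape. A self-contained sketch of the same idea:

```python
import numpy as np

def as_layer_array(arr, n_layers):
    """Ensure a 2D (layer, slow_time) array, even when there is one layer."""
    arr = np.atleast_2d(np.asarray(arr).squeeze())
    return arr.reshape(n_layers, -1)

single = as_layer_array([1.0, 2.0, 3.0], n_layers=1)   # was 1D after squeeze
double = as_layer_array([[1, 2], [3, 4]], n_layers=2)  # already 2D
print(single.shape, double.shape)
```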
 883    def _load_layer_organization(self, file) -> xr.Dataset:
 884        """
 885        Load a layer organization file
 886
 887        Parameters
 888        ----------
 889        file : str
 890            Path to the HDF5 layer organization file
 891
 892        Returns
 893        -------
 894        xr.Dataset
 895            Raw layer data from HDF5 file
 896        """
 897        d = hdf5storage.loadmat(file, appendmat=False)
 898        ds = xr.Dataset({
 899            'file_type': ((), d['file_type'].squeeze()),
 900            'file_version': ((), d['file_version'].squeeze()),
 901            'lyr_age': (('lyr_id',), np.atleast_1d(d['lyr_age'].squeeze())),
 902            'lyr_age_source': (('lyr_id',), np.atleast_1d(d['lyr_age_source'].squeeze())),
 903            'lyr_desc': (('lyr_id',), np.atleast_1d(d['lyr_desc'].squeeze())),
 904            'lyr_group_name': (('lyr_id',), np.atleast_1d(d['lyr_group_name'].squeeze())),
 905            'lyr_id': (('lyr_id',), np.atleast_1d(d['lyr_id'].squeeze())),
 906            'lyr_name': (('lyr_id',), np.atleast_1d(d['lyr_name'].squeeze())),
 907            'lyr_order': (('lyr_id',), np.atleast_1d(d['lyr_order'].squeeze())),
 908            }, attrs={'param': d['param']})
 909
 910        ds = ds.set_index(lyr_id='lyr_id')
 911
 912        return ds
 913
 914    def _get_layers_db(self, flight: Union[xr.Dataset, dict], include_geometry=True, raise_errors=True) -> dict:
 915        """
 916        Load layer picks from OPS database API.
 917
 918        Parameters
 919        ----------
 920        flight : xr.Dataset or dict
 921            Radar frame Dataset or STAC item.
 922        include_geometry : bool, optional
 923            If True, include geometry information in layers.
 924        raise_errors : bool, optional
 925            If True, raise errors when layers cannot be found.
 926
 927        Returns
 928        -------
 929        dict
 930            Dictionary mapping layer names to layer Datasets.
 931        """
 932
 933        collection, segment_path, _ = self._extract_segment_info(flight)
 934
 935        if 'Antarctica' in collection:
 936            location = 'antarctic'
 937        elif 'Greenland' in collection:
 938            location = 'arctic'
 939        else:
 940            raise ValueError("Dataset does not belong to a recognized location (Antarctica or Greenland).")
 941
 942        layer_points = ops_api.get_layer_points(
 943            segment_name=segment_path,
 944            season_name=collection,
 945            location=location,
 946            include_geometry=include_geometry
 947        )
 948
 949        if layer_points['status'] != 1:
 950            if raise_errors:
 951                raise ValueError(f"Failed to fetch layer points. Received response with status {layer_points['status']}.")
 952            else:
 953                return {}
 954
 955        layer_ds_raw = xr.Dataset(
 956            {k: (['gps_time'], v) for k, v in layer_points['data'].items() if k != 'gps_time'},
 957            coords={'gps_time': layer_points['data']['gps_time']}
 958        )
 959        # Split into a dictionary of layers based on lyr_id
 960        layer_ids = set(layer_ds_raw['lyr_id'].to_numpy())
 961        layer_ids = [int(layer_id) for layer_id in layer_ids if not np.isnan(layer_id)]
 962
 963        # Get the mapping of layer IDs to names
 964        if self.db_layers_metadata is None:
 965            layer_metadata = ops_api.get_layer_metadata()
 966            if layer_metadata['status'] == 1:
 967                df = pd.DataFrame(layer_metadata['data'])
 968                df['display_name'] = df['lyr_group_name'] + ":" + df['lyr_name']
 969                self.db_layers_metadata = df
 970            else:
 971                raise ValueError("Failed to fetch layer metadata from OPS API.")
 972
 973        layers = {}
 974        for layer_id in layer_ids:
 975            layer_name = self.db_layers_metadata.loc[self.db_layers_metadata['lyr_id'] == layer_id, 'display_name'].values[0]
 976
 977            layer_ds = layer_ds_raw.where(layer_ds_raw['lyr_id'] == layer_id, drop=True)
 978
 979            layer_ds = layer_ds.sortby('gps_time')
 980
 981            layer_ds = layer_ds.rename({'gps_time': 'slow_time'})
 982            layer_ds = layer_ds.set_coords(['slow_time'])
 983
 984            layer_ds['slow_time'] = pd.to_datetime(layer_ds['slow_time'].values, unit='s')
 985
 986            # Filter to the same time range as flight
 987            layer_ds = self._trim_to_bounds(layer_ds, flight)
 988
 989            # Only add non-empty layers with at least some non-NaN data
 990            if layer_ds.sizes.get('slow_time', 0) > 0:
 991                if not layer_ds.to_array().isnull().all():
 992                    layers[layer_name] = layer_ds
 993
 994        return layers
 995
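The per-layer split performed in `_get_layers_db` can be sketched with plain pandas: the flat table of points is grouped by `lyr_id` and each group is keyed by its display name. Column names mirror the OPS response, but the values below are made up:

```python
import pandas as pd

# Flat table of layer points as returned by the OPS API (values illustrative).
points = pd.DataFrame({
    'gps_time': [0.0, 1.0, 0.0, 1.0],
    'lyr_id':   [1, 1, 2, 2],
    'twtt':     [1e-6, 1.1e-6, 9e-6, 9.2e-6],
})
names = {1: 'standard:surface', 2: 'standard:bottom'}

# Map each lyr_id to its display name and collect the matching rows.
layers = {names[lyr_id]: grp.sort_values('gps_time')
          for lyr_id, grp in points.groupby('lyr_id')}
print(sorted(layers))
```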
 996    def get_layers(self, ds : Union[xr.Dataset, dict], source: str = 'auto', include_geometry=True, errors='warn') -> dict:
 997        """
 998        Load layer picks from files or database.
 999
1000        Tries files first, falls back to database if source='auto'.
1001
1002        Parameters
1003        ----------
1004        ds : Union[xr.Dataset, dict]
1005            Radar frame Dataset or STAC item.
1006        source : {'auto', 'files', 'db'}, optional
1007            Source for layers. 'auto' tries files then falls back to database.
1008        include_geometry : bool, optional
1009            If True, include geometry information when fetching from the API.
1010        errors : str, optional
1011            How to handle missing layer data: 'warn' (default) returns None with a warning,
1012            'error' raises an exception.
1013
1014        Returns
1015        -------
1016        dict or None
1017            A dictionary mapping layer IDs to their corresponding data, or None if no layers
1018            found and errors='warn'.
1019        """
1020        collection, segment_path, _ = self._extract_segment_info(ds)
1021
1022        if source == 'auto':
1023            # Try to get layers from files first
1024            try:
1025                layers = self._get_layers_files(ds, raise_errors=True)
1026                return layers
1027            except Exception:
1028                # Fallback to API if no layers found in files
1029                try:
1030                    return self._get_layers_db(ds, include_geometry=include_geometry, raise_errors=True)
1031                except Exception as e:
1032                    if errors == 'error':
1033                        raise
1034                    else:
1035                        warnings.warn(f"No layer data found for {collection}/{segment_path}: {e}")
1036                        return None
1037        elif source == 'files':
1038            try:
1039                return self._get_layers_files(ds, raise_errors=True)
1040            except Exception as e:
1041                if errors == 'error':
1042                    raise
1043                else:
1044                    warnings.warn(f"No layer files found for {collection}/{segment_path}: {e}")
1045                    return None
1046        elif source == 'db':
1047            try:
1048                return self._get_layers_db(ds, include_geometry=include_geometry, raise_errors=True)
1049            except Exception as e:
1050                if errors == 'error':
1051                    raise
1052                else:
1053                    warnings.warn(f"No layer data in DB for {collection}/{segment_path}: {e}")
1054                    return None
1055        else:
1056            raise ValueError("Invalid source specified. Must be one of: 'auto', 'files', 'db'.")
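The `source='auto'` behaviour above is a simple fallback chain: try each loader in order and return the first success. Stripped of the OPR specifics, the pattern looks like this (the stand-in loaders are hypothetical):

```python
import warnings

def first_available(sources, errors='warn'):
    """Try each loader in order, returning the first that succeeds."""
    last_exc = None
    for load in sources:
        try:
            return load()
        except Exception as e:
            last_exc = e
    if errors == 'error':
        raise last_exc
    warnings.warn(f"No layer data found: {last_exc}")
    return None

def from_files():
    raise ValueError("no CSARP_layer files")  # simulate missing files

def from_db():
    return {'standard:surface': 'layer-dataset'}

result = first_available([from_files, from_db])
print(result)
```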
  77class OPRConnection:
  78    def __init__(self,
  79                 collection_url: str = "https://data.cresis.ku.edu/data/",
  80                 cache_dir: str = None,
  81                 stac_parquet_href: str = None,
  82                 sync_catalogs: bool = True):
  83        """
  84        Initialize connection to OPR data archive.
  85
  86        Parameters
  87        ----------
  88        collection_url : str, optional
  89            Base URL for OPR data collection.
  90        cache_dir : str, optional
  91            Directory to cache downloaded data for faster repeated access.
  92        stac_parquet_href : str, optional
  93            Path or URL pattern to STAC catalog parquet files.  When *None*
  94            (default), the local cache is used if available, falling back to
  95            the S3 catalog.
  96        sync_catalogs : bool, optional
  97            If True (default), sync OPR STAC catalogs to a local cache
  98            before resolving the catalog path.  Uses ETag-based change
  99            detection so repeated calls are cheap (single HTTP request).
 100        """
 101        self.collection_url = collection_url
 102        self.cache_dir = cache_dir
 103        self._user_set_href = stac_parquet_href is not None
 104        if sync_catalogs and not self._user_set_href:
 105            sync_opr_catalogs()
 106        self.stac_parquet_href = stac_parquet_href or get_opr_catalog_path()
 107
 108        self.fsspec_cache_kwargs = {}
 109        self.fsspec_url_prefix = ''
 110        if cache_dir:
 111            self.fsspec_cache_kwargs['cache_storage'] = cache_dir
 112            self.fsspec_cache_kwargs['check_files'] = True
 113            self.fsspec_url_prefix = 'filecache::'
 114
 115        self.db_layers_metadata = None # Cache for OPS database layers metadata
 116
 117        # Lazy-initialized DuckDB client for STAC queries — reusing a single
 118        # session caches remote file metadata (parquet footers) across calls
 119        self._duckdb_client = None
 120
 121    @property
 122    def duckdb_client(self):
 123        """Lazy-initialized DuckDB client, recreated after pickling."""
 124        if self._duckdb_client is None:
 125            self._duckdb_client = DuckdbClient()
 126        return self._duckdb_client
 127
 128    def __getstate__(self):
 129        state = self.__dict__.copy()
 130        state['_duckdb_client'] = None  # DuckdbClient can't be pickled
 131        return state
 132
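The `__getstate__`/property pair above is a standard pattern for carrying an unpicklable handle across pickling: drop it from the pickled state and recreate it lazily on next access. A self-contained sketch with a stand-in client class:

```python
import pickle

class Client:
    """Stand-in for an unpicklable handle such as DuckdbClient."""
    pass

class Connection:
    def __init__(self):
        self._client = None  # lazily created

    @property
    def client(self):
        if self._client is None:
            self._client = Client()
        return self._client

    def __getstate__(self):
        state = self.__dict__.copy()
        state['_client'] = None  # drop the handle before pickling
        return state

conn = Connection()
_ = conn.client                           # force creation
restored = pickle.loads(pickle.dumps(conn))
print(restored._client is None)           # recreated on next access
```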
 133    def _open_file(self, url):
 134        """Helper method to open files with appropriate caching.
 135
 136        Local filesystem paths are returned as-is (no fsspec wrapping).
 137        Remote URLs (http, https, s3, gs) use filecache or simplecache.
 138        """
 139        if not url.startswith(('http://', 'https://', 's3://', 'gs://')):
 140            return url
 141        if self.fsspec_url_prefix:
 142            return fsspec.open_local(f"{self.fsspec_url_prefix}{url}", filecache=self.fsspec_cache_kwargs)
 143        else:
 144            return fsspec.open_local(f"simplecache::{url}", simplecache=self.fsspec_cache_kwargs)
 145
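The dispatch in `_open_file` reduces to a string decision: local paths pass through untouched, while remote URLs get an fsspec chained-protocol prefix. A sketch of just that decision (without actually opening anything):

```python
def cached_url(url, cache_dir):
    """Prefix remote URLs with an fsspec caching protocol; leave local paths alone."""
    if not url.startswith(('http://', 'https://', 's3://', 'gs://')):
        return url
    # filecache persists to cache_dir; simplecache uses a temporary store
    return ('filecache::' if cache_dir else 'simplecache::') + url

print(cached_url('/tmp/frame.mat', 'radar_cache'))
print(cached_url('https://data.cresis.ku.edu/frame.mat', 'radar_cache'))
```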
 146    def _extract_segment_info(self, segment):
 147        """Extract collection, segment_path, and frame from Dataset or dict."""
 148        if isinstance(segment, xr.Dataset):
 149            return (segment.attrs.get('collection'),
 150                    segment.attrs.get('segment_path'),
 151                    segment.attrs.get('frame'))
 152        else:
 153            return (segment['collection'],
 154                    f"{segment['properties'].get('opr:date')}_{segment['properties'].get('opr:segment'):02d}",
 155                    segment['properties'].get('opr:frame'))
 156
 157    def query_frames(self, collections: list[str] = None, segment_paths: list[str] = None,
 158                     geometry = None, date_range: tuple = None, properties: dict = {},
 159                     max_items: int = None, exclude_geometry: bool = False,
 160                     search_kwargs: dict = {}) -> gpd.GeoDataFrame:
 161        """
 162        Query STAC catalog for radar frames matching search criteria.
 163
 164        Multiple parameters are combined with AND logic. Lists of values passed to a
 165        single parameter are treated with OR logic (any value matches).
 166
 167        Parameters
 168        ----------
 169        collections : list[str] or str, optional
 170            Collection name(s) (e.g., "2022_Antarctica_BaslerMKB").
 171        segment_paths : list[str] or str, optional
 172            Segment path(s) in format "YYYYMMDD_NN" (e.g., "20230126_01").
 173        geometry : shapely geometry or dict, optional
 174            Geospatial geometry to filter by intersection.
 175        date_range : str, optional
 176            Date range in ISO format (e.g., "2021-01-01/2025-06-01").
 177        properties : dict, optional
 178            Additional STAC properties to filter by.
 179        max_items : int, optional
 180            Maximum number of items to return.
 181        exclude_geometry : bool, optional
 182            If True, exclude geometry field to reduce response size.
 183        search_kwargs : dict, optional
 184            Additional keyword arguments for STAC search.
 185
 186        Returns
 187        -------
 188        gpd.GeoDataFrame or None
 189            GeoDataFrame of matching STAC items with columns for collection,
 190            geometry, properties, and assets. Returns None if no matches found.
 191        """
 192
 193        search_params = {}
 194
 195        # Exclude geometry -- do not return the geometry field to reduce response size
 196        if exclude_geometry:
 197            search_params['exclude'] = ['geometry']
 198
 199        # Handle collections (seasons)
 200        if collections is not None:
 201            search_params['collections'] = [collections] if isinstance(collections, str) else collections
 202
 203        # Handle geometry filtering
 204        if geometry is not None:
 205            if hasattr(geometry, '__geo_interface__'):
 206                geometry = geometry.__geo_interface__
 207
 208            # Fix geometries that cross the antimeridian
 209            geometry = antimeridian.fix_geojson(geometry, reverse=True)
 210
 211            search_params['intersects'] = geometry
 212
 213        # Handle date range filtering
 214        if date_range is not None:
 215            search_params['datetime'] = date_range
 216
 217        # Handle max_items
 218        if max_items is not None:
 219            search_params['limit'] = max_items
 220
 221        # Handle segment_paths filtering using CQL2
 222        filter_conditions = []
 223
 224        if segment_paths is not None:
 225            segment_paths = [segment_paths] if isinstance(segment_paths, str) else segment_paths
 226
 227            # Create OR conditions for segment paths
 228            segment_conditions = []
 229            for segment_path in segment_paths:
 230                try:
 231                    date_str, segment_num_str = segment_path.split('_')
 232                    segment_num = int(segment_num_str)
 233
 234                    # Create AND condition for this specific segment
 235                    segment_condition = {
 236                        "op": "and",
 237                        "args": [
 238                            {
 239                                "op": "=",
 240                                "args": [{"property": "opr:date"}, date_str]
 241                            },
 242                            {
 243                                "op": "=",
 244                                "args": [{"property": "opr:segment"}, segment_num]
 245                            }
 246                        ]
 247                    }
 248                    segment_conditions.append(segment_condition)
 249                except ValueError:
  250                    warnings.warn(f"Invalid segment_path format '{segment_path}'. Expected format: YYYYMMDD_NN")
 251                    continue
 252
 253            if segment_conditions:
 254                if len(segment_conditions) == 1:
 255                    filter_conditions.append(segment_conditions[0])
 256                else:
 257                    # Multiple segments - combine with OR
 258                    filter_conditions.append({
 259                        "op": "or",
 260                        "args": segment_conditions
 261                    })
 262
 263        # Add any additional property filters
 264        for key, value in properties.items():
 265            filter_conditions.append({
 266                "op": "=",
 267                "args": [{"property": key}, value]
 268            })
 269
 270        # Combine all filter conditions with AND
 271        if filter_conditions:
 272            if len(filter_conditions) == 1:
 273                filter_expr = filter_conditions[0]
 274            else:
 275                filter_expr = {
 276                    "op": "and",
 277                    "args": filter_conditions
 278                }
 279
 280            search_params['filter'] = filter_expr
 281
 282        # Add any extra kwargs to search
 283        search_params.update(search_kwargs)
 284
  287        # Perform the search
 289        items = self.duckdb_client.search(self.stac_parquet_href, **search_params)
 290        if isinstance(items, dict):
 291            items = items['features']
 292
 293        if not items or len(items) == 0:
 294            warnings.warn("No items found matching the query criteria", UserWarning)
 295            return None
 296
 297        # Convert to GeoDataFrame
 298        items_df = gpd.GeoDataFrame(items)
 299        # Set index
 300        items_df = items_df.set_index(items_df['id'])
 301        items_df.index.name = 'stac_item_id'
 302        # Set the geometry column
 303        if 'geometry' in items_df.columns and not exclude_geometry:
 304            items_df = items_df.set_geometry(items_df['geometry'].apply(shapely.geometry.shape))
 305            items_df.crs = "EPSG:4326"
 306
 307        # Reorder the columns, leaving any extra columns at the end
 308        desired_order = ['collection', 'geometry', 'properties', 'assets']
 309        items_df = items_df[[col for col in desired_order if col in items_df.columns] + list(items_df.columns.difference(desired_order))]
 310
 311        return items_df
 312
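The CQL2 construction above can be exercised on its own. This sketch builds the filter expression for segment paths exactly as `query_frames` does: each path becomes an `and` of `opr:date` and `opr:segment` equalities, and multiple paths are combined with `or`:

```python
def segment_filter(segment_paths):
    """Build a CQL2 expression matching any of the given YYYYMMDD_NN paths."""
    conditions = []
    for path in segment_paths:
        date_str, seg = path.split('_')
        conditions.append({
            "op": "and",
            "args": [
                {"op": "=", "args": [{"property": "opr:date"}, date_str]},
                {"op": "=", "args": [{"property": "opr:segment"}, int(seg)]},
            ],
        })
    if len(conditions) == 1:
        return conditions[0]
    return {"op": "or", "args": conditions}

expr = segment_filter(["20230109_01", "20230110_02"])
print(expr["op"])
```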
 313    def load_frames(self, stac_items: gpd.GeoDataFrame,
 314                    data_product: str = "CSARP_standard",
 315                    image: Union[int, None] = None,
 316                    merge_flights: bool = False,
 317                    skip_errors: bool = False,
 318                    allow_unlisted_products: bool = False
 319                    ) -> Union[list[xr.Dataset], xr.Dataset]:
 320        """
 321        Load multiple radar frames from STAC items.
 322
 323        Parameters
 324        ----------
 325        stac_items : gpd.GeoDataFrame
 326            STAC items returned from query_frames.
 327        data_product : str, optional
 328            Data product to load (e.g., "CSARP_standard", "CSARP_qlook").
 329        image : int or None, optional
 330            The image number to load for each frame. If None (default), loads the
 331            combined image. If specified, `allow_unlisted_products` must be True.
 332        merge_flights : bool, optional
 333            If True, merge frames from the same segment into single Datasets.
 334        skip_errors : bool, optional
 335            If True, skip failed frames and continue loading others.
 336        allow_unlisted_products : bool, optional
 337            If True, attempt to load the specified data product even if it's not
 338            listed in the item's assets. See `load_frame` for details.
 339
 340        Returns
 341        -------
 342        list[xr.Dataset] or xr.Dataset
 343            List of radar frame Datasets, or list of merged Datasets if
 344            merge_flights=True.
 345        """
 346        frames = []
 347
 348        for idx, item in stac_items.iterrows():
 349            try:
 350                frame = self.load_frame(item, data_product, image=image, allow_unlisted_products=allow_unlisted_products)
 351                frames.append(frame)
  352            except Exception as e:
  353                warnings.warn(f"Error loading frame for item {item.get('id', 'unknown')}: {e}")
  354                if skip_errors:
  355                    continue
  356                else:
  357                    raise
 358
 359        if merge_flights:
 360            return opr_tools.merge_frames(frames)
 361        else:
 362            return frames
 363
 364    def load_frame(self, stac_item, data_product: str = "CSARP_standard",
 365                   image: Union[int, None] = None,
 366                   allow_unlisted_products: bool = False) -> xr.Dataset:
 367        """
 368        Load a single radar frame from a STAC item.
 369
 370        Parameters
 371        ----------
 372        stac_item : dict or pd.Series
 373            STAC item containing asset URLs.
 374        data_product : str, optional
 375            Data product to load (e.g., "CSARP_standard", "CSARP_qlook").
 376        image : int or None, optional
 377            The image number to load for this frame. If None (default), loads the
 378            combined image. If specified, `allow_unlisted_products` must be True.
 379        allow_unlisted_products : bool, optional
 380            If True, attempt to load the specified data product even if it's not
 381            listed in the item's assets.  This can be useful for loading non-standard
 382            products. If set to True and the data product is not found in the STAC
 383            item assets, the method will attempt to construct the URL based on any
 384            available CSARP_* asset. (If the frame is entirely unlisted, you can use
 385            `load_frame_url` instead.)
 386        Returns
 387        -------
 388        xr.Dataset
 389            Radar frame with coordinates slow_time and twtt, and data variables
 390            including Data, Latitude, Longitude, Elevation, Surface, etc.
 391        """
 392
 393        assets = stac_item['assets']
 394
 395        # Get the data asset
 396        data_asset = assets.get(data_product)
 397        if not data_asset:
 398            if not allow_unlisted_products:
 399                available_assets = list(assets.keys())
 400                raise ValueError(f"{data_product} asset not found in STAC item. Available assets: {available_assets}")
 401            else:
 402                # Find any available CSARP_* asset to use as a template for constructing the URL
 403                template_asset_name = None
 404                template_asset_url = None
 405                for asset_name, asset_info in assets.items():
 406                    if asset_name.startswith('CSARP_'):
 407                        template_asset_name = asset_name
 408                        template_asset_url = asset_info.get('href')
 409                        break
 410
 411                if not template_asset_url:
 412                    available_assets = list(assets.keys())
 413                    raise ValueError(f"No CSARP_* asset found in STAC item to use as a template for constructing URL for {data_product}. Available assets: {available_assets}")
 414
 415                url = template_asset_url.replace(template_asset_name, data_product)
 416        else:
 417            # The asset does exist in the STAC item, so just get the URL from the asset
 418            url = data_asset.get('href')
 419
 420            if not url:
 421                raise ValueError(f"No href found in {data_product} asset")
 422
 423        # If a specific image is requested, modify the URL to point to that image
 424        if image is not None:
 425            if not allow_unlisted_products:
 426                raise ValueError("Specifying an image number requires allow_unlisted_products=True to construct the URL")
 427            url = url.replace("Data_", f"Data_img_{image:02d}_")
 428
 429        # Load the frame using the existing method
 430        return self.load_frame_url(url)
 431
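The per-image URL rewrite used in `load_frame` is a plain string substitution on the file name. The URL below follows the naming pattern shown elsewhere in the module but is illustrative:

```python
def image_url(url: str, image: int) -> str:
    """Point a combined-image frame URL at a specific image number."""
    return url.replace("Data_", f"Data_img_{image:02d}_")

url = ("https://data.cresis.ku.edu/data/rds/2022_Antarctica_BaslerMKB/"
       "CSARP_standard/20230109_01/Data_20230109_01_001.mat")
print(image_url(url, 1))
```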
 432    def load_frame_url(self, url: str) -> xr.Dataset:
 433        """
 434        Load a radar frame directly from a URL.
 435
 436        Automatically detects and handles both HDF5 and legacy MATLAB formats.
 437
 438        Parameters
 439        ----------
 440        url : str
 441            URL to radar frame data file (.mat).
 442
 443        Returns
 444        -------
 445        xr.Dataset
 446            Radar frame with CF-compliant metadata and citation information.
 447        """
 448
 449        file = self._open_file(url)
 450
 451        filetype = None
 452        try:
 453            ds = self._load_frame_hdf5(file)
 454            filetype = 'hdf5'
 455        except OSError:
 456            ds = self._load_frame_matlab(file)
 457            filetype = 'matlab'
 458
 459        # Add the source URL as an attribute
 460        ds.attrs['source_url'] = url
 461
 462        # Apply CF-compliant attributes
 463        ds = apply_cf_compliant_attrs(ds)
 464
 465        # Get the season and segment from the URL
 466        match = re.search(r'(\d{4}_\w+_[A-Za-z0-9]+)\/([\w_]+)\/[\d_]+\/[\w]+(\d{8}_\d{2}_\d{3})', url)
 467        if match:
 468            collection, data_product, granule = match.groups()
 469            date, segment_id, frame_id = granule.split('_')
 470            ds.attrs['collection'] = collection
 471            ds.attrs['data_product'] = data_product
 472            ds.attrs['granule'] = granule
 473            ds.attrs['segment_path'] = f"{date}_{segment_id}"
 474            ds.attrs['date_str'] = date
 475            ds.attrs['segment'] = int(segment_id)
 476            ds.attrs['frame'] = int(frame_id)
 477
 478            # Load citation information
 479            result = ops_api.get_segment_metadata(segment_name=ds.attrs['segment_path'], season_name=collection)
 480            if result:
 481                if isinstance(result['data'], str):
 482                    warnings.warn(f"Warning: Unexpected result from ops_api: {result['data']}", UserWarning)
 483                else:
 484                    result_data = {}
 485                    for key, value in result['data'].items():
 486                        if len(value) == 1:
 487                            result_data[key] = value[0]
 488                        elif len(value) > 1:
 489                            result_data[key] = set(value)
 490
 491                    if 'dois' in result_data:
 492                        ds.attrs['doi'] = result_data['dois']
 493                    if 'rors' in result_data:
 494                        ds.attrs['ror'] = result_data['rors']
 495                    if 'funding_sources' in result_data:
 496                        ds.attrs['funder_text'] = result_data['funding_sources']
 497
 498        # Add the rest of the Matlab parameters
 499        if filetype == 'hdf5':
 500            ds.attrs['mimetype'] = 'application/x-hdf5'
 501            ds.attrs.update(decode_hdf5_matlab_variable(h5py.File(file, 'r'),
 502                                                        skip_variables=True,
 503                                                        skip_errors=True))
 504        elif filetype == 'matlab':
 505            ds.attrs['mimetype'] = 'application/x-matlab-data'
 506            ds.attrs.update(extract_legacy_mat_attributes(file,
 507                                                          skip_keys=ds.keys(),
 508                                                          skip_errors=True))
 509
 510        return ds
 511
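The try/except dispatch in `load_frame_url` works because MATLAB v7.3 `.mat` files are HDF5 containers, while legacy v5 files are not. The same distinction can be made directly from the 8-byte HDF5 signature; a stdlib-only sketch (`sniff_mat_format` is a hypothetical helper, not part of xopr):

```python
import io

# Every HDF5 file begins with this 8-byte signature; MATLAB v7.3 .mat
# files are HDF5, legacy v5 .mat files start with a text header instead.
HDF5_MAGIC = b"\x89HDF\r\n\x1a\n"

def sniff_mat_format(f) -> str:
    """Classify a .mat file as 'hdf5' (MATLAB v7.3) or 'matlab' (legacy v5)."""
    head = f.read(len(HDF5_MAGIC))
    f.seek(0)  # rewind so the real loader can re-read from the start
    return 'hdf5' if head == HDF5_MAGIC else 'matlab'

print(sniff_mat_format(io.BytesIO(HDF5_MAGIC + b"...")))        # hdf5
print(sniff_mat_format(io.BytesIO(b"MATLAB 5.0 MAT-file ...")))  # matlab
```

In practice xopr lets `h5py`/`h5netcdf` raise `OSError` on a non-HDF5 file rather than sniffing bytes, which amounts to the same check.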
 512    def _load_frame_hdf5(self, file) -> xr.Dataset:
 513        """
 514        Load a radar frame from an HDF5 file.
 515
 516        Parameters
 517        ----------
 518        file : str or file-like
 519            The path to the HDF5 file containing radar frame data.
 520
 521        Returns
 522        -------
 523        xr.Dataset
 524            The loaded radar frame as an xarray Dataset.
 525        """
 526
 527        ds = xr.open_dataset(file, engine='h5netcdf', phony_dims='sort')
 528
 529        # Re-arrange variables to provide useful dimensions and coordinates
 530
 531        ds = ds.squeeze() # Drop the singleton dimensions matlab adds
 532
 533        ds = ds.rename({ # Label the dimensions with more useful names
 534            ds.Data.dims[0]: 'slow_time_idx',
 535            ds.Data.dims[1]: 'twtt_idx',
 536        })
 537
 538        # Make variables with no dimensions into scalar attributes
 539        for var in ds.data_vars:
 540            if ds[var].ndim == 0:
 541                ds.attrs[var] = ds[var].item()
 542                ds = ds.drop_vars(var)
 543
 544        # Make the file_type an attribute
 545        if 'file_type' in ds.data_vars:
 546            ds.attrs['file_type'] = ds['file_type'].to_numpy()
 547            ds = ds.drop_vars('file_type')
 548
 549        # Name the two time coordinates
 550        ds = ds.rename({'Time': 'twtt', 'GPS_time': 'slow_time'})
 551        ds = ds.set_coords(['slow_time', 'twtt'])
 552
 553        slow_time_1d = pd.to_datetime(ds['slow_time'].values, unit='s')
 554        ds = ds.assign_coords(slow_time=('slow_time_idx', slow_time_1d))
 555
 556        # Make twtt and slow_time the indexing coordinates
 557        ds = ds.swap_dims({'twtt_idx': 'twtt'})
 558        ds = ds.swap_dims({'slow_time_idx': 'slow_time'})
 559
 560        return ds
 561
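The coordinate restructuring in `_load_frame_hdf5` (rename phony dims, promote the time variables to coordinates, then swap them in as indexes) can be seen end-to-end on a toy dataset. A sketch with synthetic values standing in for a real frame:

```python
import numpy as np
import pandas as pd
import xarray as xr

# Toy frame mimicking the raw layout: Data on positional dims, with
# Time (two-way travel time) and GPS_time (epoch seconds) as variables.
raw = xr.Dataset({
    'Data': (('phony_dim_0', 'phony_dim_1'), np.zeros((3, 4))),
    'Time': (('phony_dim_1',), np.linspace(0.0, 3e-6, 4)),
    'GPS_time': (('phony_dim_0',), np.array([1.0e9, 1.0e9 + 1, 1.0e9 + 2])),
})

# Rename dims and variables, promote the times to coordinates
ds = raw.rename({'phony_dim_0': 'slow_time_idx', 'phony_dim_1': 'twtt_idx',
                 'Time': 'twtt', 'GPS_time': 'slow_time'})
ds = ds.set_coords(['slow_time', 'twtt'])

# Convert epoch seconds to datetimes, then index by the physical coordinates
ds = ds.assign_coords(slow_time=('slow_time_idx',
                                 pd.to_datetime(ds['slow_time'].values, unit='s')))
ds = ds.swap_dims({'twtt_idx': 'twtt', 'slow_time_idx': 'slow_time'})

print(ds['Data'].dims)  # ('slow_time', 'twtt')
```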
 562    def _load_frame_matlab(self, file) -> xr.Dataset:
 563        """
 564        Load a radar frame from a MATLAB file.
 565
 566        Parameters
 567        ----------
 568        file : str or file-like
 569            The path to the MATLAB file containing radar frame data.
 570
 571        Returns
 572        -------
 573        xr.Dataset
 574            The loaded radar frame as an xarray Dataset.
 575        """
 576
 577        m = scipy.io.loadmat(file, mat_dtype=False)
 578
 579        key_dims = {
 580            'Time': ('twtt',),
 581            'GPS_time': ('slow_time',),
 582            'Latitude': ('slow_time',),
 583            'Longitude': ('slow_time',),
 584            'Elevation': ('slow_time',),
 585            'Roll': ('slow_time',),
 586            'Pitch': ('slow_time',),
 587            'Heading': ('slow_time',),
 588            'Surface': ('slow_time',),
 589            'Data': ('twtt', 'slow_time')
 590        }
 591
 592        ds = xr.Dataset(
 593            {
 594                key: (dims, np.squeeze(m[key])) for key, dims in key_dims.items() if key in m
 595            },
 596            coords={
 597                'twtt': ('twtt', np.squeeze(m['Time'])),
 598                'slow_time': ('slow_time', pd.to_datetime(np.squeeze(m['GPS_time']), unit='s')),
 599            }
 600        )
 601
 602        return ds
 603
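The `key_dims` mapping above doubles as a schema: only keys actually present in the loaded dict are turned into variables, so files missing optional fields (e.g. `Surface`) still load. A sketch with a synthetic `loadmat`-style dict:

```python
import numpy as np
import xarray as xr

# Synthetic stand-in for scipy.io.loadmat output; 'Surface' is absent.
m = {'Time': np.zeros((1, 4)),
     'GPS_time': np.zeros((1, 3)),
     'Data': np.zeros((4, 3))}

key_dims = {'Time': ('twtt',), 'GPS_time': ('slow_time',),
            'Surface': ('slow_time',), 'Data': ('twtt', 'slow_time')}

# The `if key in m` guard silently skips fields the file does not carry
ds = xr.Dataset({k: (dims, np.squeeze(m[k]))
                 for k, dims in key_dims.items() if k in m})

print(sorted(ds.data_vars))  # ['Data', 'GPS_time', 'Time']
```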
 604    def get_collections(self) -> list:
 605        """
 606        Get all available collections (seasons/campaigns) in STAC catalog.
 607
 608        Returns
 609        -------
 610        list[dict]
 611            List of collection dictionaries with id, description, and extent.
 612        """
 613
 614        return self.duckdb_client.get_collections(self.stac_parquet_href)
 615
 616    def get_segments(self, collection_id: str) -> list:
 617        """
 618        Get all segments (flights) within a collection.
 619
 620        Parameters
 621        ----------
 622        collection_id : str
 623            STAC collection ID (e.g., "2022_Antarctica_BaslerMKB").
 624
 625        Returns
 626        -------
 627        list[dict]
 628            List of segment dictionaries with segment_path, date, flight_number,
 629            and frame count.
 630        """
 631        # Query STAC API for all items in collection (exclude geometry for better performance)
 632        items = self.query_frames(collections=[collection_id], exclude_geometry=True)
 633
 634        if items is None or len(items) == 0:
 635            warnings.warn(f"No items found in collection '{collection_id}'")
 636            return []
 637
 638        # Group items by segment (opr:date + opr:segment)
 639        segments = {}
 640        for idx, item in items.iterrows():
 641            properties = item['properties']
 642            date = properties['opr:date']
 643            flight_num = properties['opr:segment']
 644
 645            if date and flight_num is not None:
 646                segment_path = f"{date}_{flight_num:02d}"
 647
 648                if segment_path not in segments:
 649                    segments[segment_path] = {
 650                        'segment_path': segment_path,
 651                        'date': date,
 652                        'flight_number': flight_num,
 653                        'collection': collection_id,
 654                        'frames': [],
 655                        'item_count': 0
 656                    }
 657
 658                segments[segment_path]['frames'].append(properties.get('opr:frame'))
 659                segments[segment_path]['item_count'] += 1
 660
 661        # Sort segments by date and flight number
 662        segment_list = list(segments.values())
 663        segment_list.sort(key=lambda x: (x['date'], x['flight_number']))
 664
 665        return segment_list
 666
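The grouping step in `get_segments` collapses per-frame STAC properties into one record per segment, keyed by `"YYYYMMDD_NN"`. A minimal sketch of that aggregation with illustrative property values:

```python
# Synthetic STAC item properties (one dict per frame)
props = [
    {'opr:date': '20230109', 'opr:segment': 1, 'opr:frame': 1},
    {'opr:date': '20230109', 'opr:segment': 1, 'opr:frame': 2},
    {'opr:date': '20230110', 'opr:segment': 2, 'opr:frame': 1},
]

segments = {}
for p in props:
    # Segment key is date plus zero-padded flight number
    key = f"{p['opr:date']}_{p['opr:segment']:02d}"
    seg = segments.setdefault(key, {'segment_path': key, 'frames': [],
                                    'item_count': 0})
    seg['frames'].append(p['opr:frame'])
    seg['item_count'] += 1

print(sorted(segments))  # ['20230109_01', '20230110_02']
```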
 667    def _get_layers_files(self, segment: Union[xr.Dataset, dict], raise_errors=True) -> dict:
 668        """
 669        Fetch layers from the CSARP_layers files
 670
 671        See https://gitlab.com/openpolarradar/opr/-/wikis/Layer-File-Guide for file formats
 672
 673        Parameters
 674        ----------
 675        segment : xr.Dataset or dict
 676            Radar frame Dataset or STAC item.
 677        raise_errors : bool, optional
 678            If True, raise errors when layers cannot be found.
 679
 680        Returns
 681        -------
 682        dict
 683            Dictionary mapping layer names (e.g., "standard:surface",
 684            "standard:bottom") to layer Datasets.
 685        """
 686        collection, segment_path, frame = self._extract_segment_info(segment)
 687
 688        # If we already have a STAC item, just use it. Otherwise, query to find the matching STAC items
 689        if isinstance(segment, xr.Dataset):
 690            properties = {}
 691            if frame:
 692                properties['opr:frame'] = frame
 693
 694            # Query STAC collection for CSARP_layer files matching this specific segment
 695
 696            # Get items from this specific segment
 697            stac_items = self.query_frames(collections=[collection], segment_paths=[segment_path], properties=properties)
 698
 699            # Filter for items that have CSARP_layer assets
 700            layer_items = []
 701            for idx, item in stac_items.iterrows():
 702                if 'CSARP_layer' in item['assets']:
 703                    layer_items.append(item)
 704        else:
 705            layer_items = [segment] if 'CSARP_layer' in segment.get('assets', {}) else []
 706
 707        if not layer_items:
 708            if raise_errors:
 709                raise ValueError(f"No CSARP_layer files found for segment path {segment_path} in collection {collection}")
 710            else:
 711                return {}
 712
 713        # Load each layer file and combine them
 714        layer_frames = []
 715        for item in layer_items:
 716            layer_asset = item['assets']['CSARP_layer']
 717            if layer_asset and 'href' in layer_asset:
 718                url = layer_asset['href']
 719                try:
 720                    layer_ds = self.load_layers_file(url)
 721                    layer_frames.append(layer_ds)
 722                except Exception as e:
 723                    # Skip files that fail to load, but keep the rest
 724                    warnings.warn(f"Failed to load layer file {url}: {e}")
 725                    continue
 726
 727        if not layer_frames:
 728            if raise_errors:
 729                raise ValueError(f"No valid CSARP_layer files could be loaded for segment {segment_path} in collection {collection}")
 730            else:
 731                return {}
 732
 733        # Concatenate all layer frames along slow_time dimension
 734        layers_segment = xr.concat(layer_frames, dim='slow_time', combine_attrs='drop_conflicts', data_vars='minimal')
 735        layers_segment = layers_segment.sortby('slow_time')
 736
 737        # Trim to bounds of the original dataset
 738        layers_segment = self._trim_to_bounds(layers_segment, segment)
 739
 740        # Find the layer organization file to map layer IDs to names
 741        # Layer organization files are one per directory. For example, the layer file:
 742        # https://data.cresis.ku.edu/data/rds/2017_Antarctica_BaslerJKB/CSARP_layer/20180105_03/Data_20180105_03_003.mat
 743        # would have a layer organization file at:
 744        # https://data.cresis.ku.edu/data/rds/2017_Antarctica_BaslerJKB/CSARP_layer/20180105_03/layer_20180105_03.mat
 745
 746        layer_organization_file_url = re.sub(r'Data_(\d{8}_\d{2})_\d{3}\.mat', r'layer_\1.mat', url)
 747        layer_organization_ds = self._load_layer_organization(self._open_file(layer_organization_file_url))
 748
 749        # Split into separate layers by ID
 750        layers = {}
 751
 752        layer_ids = np.unique(layers_segment['layer'])
 753
 754        for layer_id in layer_ids:
 755            layer_id_int = int(layer_id)
 756            layer_name = str(layer_organization_ds['lyr_name'].sel(lyr_id=layer_id_int).values.item().squeeze())
 757            layer_group = str(layer_organization_ds['lyr_group_name'].sel(lyr_id=layer_id_int).values.item().squeeze())
 758            if layer_group == '[]':
 759                layer_group = ''
 760            layer_display_name = f"{layer_group}:{layer_name}"
 761
 762            layer_ds = layers_segment.sel(layer=layer_id).drop_vars('layer')
 763            # Only add non-empty layers with at least some non-NaN data
 764            if layer_ds.sizes.get('slow_time', 0) > 0:
 765                if not layer_ds.to_array().isnull().all():
 766                    layers[layer_display_name] = layer_ds
 767
 768        return layers
 769
 770    def _trim_to_bounds(self, ds: xr.Dataset, ref: Union[xr.Dataset, dict]) -> xr.Dataset:
 771        start_time, end_time = None, None
 772        if isinstance(ref, xr.Dataset) and 'slow_time' in ref.coords:
 773            start_time = ref['slow_time'].min()
 774            end_time = ref['slow_time'].max()
 775        else:
 776            properties = ref.get('properties', {})
 777            if 'start_datetime' in properties and 'end_datetime' in properties:
 778                start_time = pd.to_datetime(properties['start_datetime'])
 779                end_time = pd.to_datetime(properties['end_datetime'])
 780
 781        if start_time is not None:
 782            return ds.sel(slow_time=slice(start_time, end_time))
 783        else:
 784            return ds
 785
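`_trim_to_bounds` relies on xarray's label-based slicing over a sorted datetime coordinate, which is inclusive at both ends. A toy sketch with synthetic times:

```python
import numpy as np
import pandas as pd
import xarray as xr

# Layer picks spanning six minutes of flight time
times = pd.date_range('2023-01-09T00:00', periods=6, freq='min')
layers = xr.Dataset({'twtt': ('slow_time', np.arange(6.0))},
                    coords={'slow_time': times})

# Trim to a reference window, as _trim_to_bounds does with a frame's extent;
# both endpoints are kept because label slices are inclusive.
trimmed = layers.sel(slow_time=slice(times[1], times[3]))

print(trimmed.sizes['slow_time'])  # 3
```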
 786    def load_layers_file(self, url: str) -> xr.Dataset:
 787        """
 788        Load layer data from CSARP_layer file.
 789
 790        Parameters
 791        ----------
 792        url : str
 793            URL or path to layer file.
 794
 795        Returns
 796        -------
 797        xr.Dataset
 798            Layer data in a standardized format with coordinates:
 799            - slow_time: GPS time converted to datetime
 800            - layer: layer IDs
 801            And data variables:
 802            - twtt: Two-way travel time for each layer
 803            - quality: Quality values for each layer
 804            - type: Type values for each layer
 805            - lat, lon, elev: Geographic coordinates
 806            The source URL is recorded in the source_url attribute.
 807
 808        """
 809
 810        file = self._open_file(url)
 811
 812        ds = self._load_layers(file)
 813
 814        # Add the source URL as an attribute
 815        ds.attrs['source_url'] = url
 816
 817        # Apply common manipulations to match the expected structure
 818        # Convert GPS time to datetime coordinate
 819        if 'gps_time' in ds.variables:
 820            slow_time_dt = pd.to_datetime(ds['gps_time'].values, unit='s')
 821            ds = ds.assign_coords(slow_time=('slow_time', slow_time_dt))
 822
 823            # Set slow_time as the main coordinate and remove gps_time from data_vars
 824            if 'slow_time' not in ds.dims:
 825                ds = ds.swap_dims({'gps_time': 'slow_time'})
 826
 827            # Remove gps_time from data_vars if it exists there to avoid conflicts
 828            if ('gps_time' in ds.data_vars) or ('gps_time' in ds.coords):
 829                ds = ds.drop_vars('gps_time')
 830
 831        # Sort by slow_time if it exists
 832        if 'slow_time' in ds.coords:
 833            ds = ds.sortby('slow_time')
 834
 835        return ds
 836
 837    def _load_layers(self, file) -> xr.Dataset:
 838        """
 839        Load a layer data file using hdf5storage and return it as an xr.Dataset.
 840
 841        Parameters
 842        ----------
 843        file : str or file-like
 844            Path to the layer file.
 845        """
 846
 847        d = hdf5storage.loadmat(file, appendmat=False)
 848
 849        # Ensure 'id' is at least 1D for the layer dimension
 850        id_data = np.atleast_1d(d['id'].squeeze())
 851
 852        # For 2D arrays with (layer, slow_time) dimensions, ensure they remain 2D
 853        # even when there's only one layer
 854        quality_data = np.atleast_2d(d['quality'].squeeze())
 855        if quality_data.shape[0] == 1 or quality_data.ndim == 1:
 856            quality_data = quality_data.reshape(len(id_data), -1)
 857
 858        twtt_data = np.atleast_2d(d['twtt'].squeeze())
 859        if twtt_data.shape[0] == 1 or twtt_data.ndim == 1:
 860            twtt_data = twtt_data.reshape(len(id_data), -1)
 861
 862        type_data = np.atleast_2d(d['type'].squeeze())
 863        if type_data.shape[0] == 1 or type_data.ndim == 1:
 864            type_data = type_data.reshape(len(id_data), -1)
 865
 866        ds = xr.Dataset({
 867            'file_type': ((), d['file_type'].squeeze()),
 868            'file_version': ((), d['file_version'].squeeze()),
 869            'elev': (('slow_time',), d['elev'].squeeze()),
 870            #'gps_source': ((), d['gps_source'].squeeze()),
 871            'gps_time': (('slow_time',), d['gps_time'].squeeze()),
 872            'id': (('layer',), id_data),
 873            'lat': (('slow_time',), d['lat'].squeeze()),
 874            'lon': (('slow_time',), d['lon'].squeeze()),
 875            'quality': (('layer', 'slow_time'), quality_data),
 876            'twtt': (('layer', 'slow_time'), twtt_data),
 877            'type': (('layer', 'slow_time'), type_data),
 878        })
 879
 880        ds = ds.assign_coords({'layer': ds['id'], 'slow_time': ds['gps_time']})
 881
 882        return ds
 883
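The `atleast_1d`/`atleast_2d` handling above guards against `loadmat` squeezing away the layer axis when a segment has only one layer. A minimal numpy sketch of restoring the `(layer, slow_time)` shape:

```python
import numpy as np

# A segment with a single layer: loadmat squeezes twtt down to 1-D,
# but the Dataset needs a 2-D (layer, slow_time) array.
ids = np.atleast_1d(np.array(3).squeeze())           # one layer id -> shape (1,)
twtt = np.atleast_2d(np.array([1e-6, 2e-6, 3e-6]))   # squeezed 1-D -> (1, 3)
twtt = twtt.reshape(len(ids), -1)                    # layer axis restored

print(twtt.shape)  # (1, 3)
```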
 884    def _load_layer_organization(self, file) -> xr.Dataset:
 885        """
 886        Load a layer organization file
 887
 888        Parameters
 889        ----------
 890        file : str
 891            Path to the HDF5 layer organization file
 892
 893        Returns
 894        -------
 895        xr.Dataset
 896            Raw layer data from HDF5 file
 897        """
 898        d = hdf5storage.loadmat(file, appendmat=False)
 899        ds = xr.Dataset({
 900            'file_type': ((), d['file_type'].squeeze()),
 901            'file_version': ((), d['file_version'].squeeze()),
 902            'lyr_age': (('lyr_id',), np.atleast_1d(d['lyr_age'].squeeze())),
 903            'lyr_age_source': (('lyr_id',), np.atleast_1d(d['lyr_age_source'].squeeze())),
 904            'lyr_desc': (('lyr_id',), np.atleast_1d(d['lyr_desc'].squeeze())),
 905            'lyr_group_name': (('lyr_id',), np.atleast_1d(d['lyr_group_name'].squeeze())),
 906            'lyr_id': (('lyr_id',), np.atleast_1d(d['lyr_id'].squeeze())),
 907            'lyr_name': (('lyr_id',), np.atleast_1d(d['lyr_name'].squeeze())),
 908            'lyr_order': (('lyr_id',), np.atleast_1d(d['lyr_order'].squeeze())),
 909            }, attrs={'param': d['param']})
 910
 911        ds = ds.set_index(lyr_id='lyr_id')
 912
 913        return ds
 914
 915    def _get_layers_db(self, flight: Union[xr.Dataset, dict], include_geometry=True, raise_errors=True) -> dict:
 916        """
 917        Load layer picks from OPS database API.
 918
 919        Parameters
 920        ----------
 921        flight : xr.Dataset or dict
 922            Radar frame Dataset or STAC item.
 923        include_geometry : bool, optional
 924            If True, include geometry information in layers.
 925        raise_errors : bool, optional
 926            If True, raise errors when layers cannot be found.
 927
 928        Returns
 929        -------
 930        dict
 931            Dictionary mapping layer names to layer Datasets.
 932        """
 933
 934        collection, segment_path, _ = self._extract_segment_info(flight)
 935
 936        if 'Antarctica' in collection:
 937            location = 'antarctic'
 938        elif 'Greenland' in collection:
 939            location = 'arctic'
 940        else:
 941            raise ValueError("Dataset does not belong to a recognized location (Antarctica or Greenland).")
 942
 943        layer_points = ops_api.get_layer_points(
 944            segment_name=segment_path,
 945            season_name=collection,
 946            location=location,
 947            include_geometry=include_geometry
 948        )
 949
 950        if layer_points['status'] != 1:
 951            if raise_errors:
 952                raise ValueError(f"Failed to fetch layer points. Received response with status {layer_points['status']}.")
 953            else:
 954                return {}
 955
 956        layer_ds_raw = xr.Dataset(
 957            {k: (['gps_time'], v) for k, v in layer_points['data'].items() if k != 'gps_time'},
 958            coords={'gps_time': layer_points['data']['gps_time']}
 959        )
 960        # Split into a dictionary of layers based on lyr_id
 961        layer_ids = set(layer_ds_raw['lyr_id'].to_numpy())
 962        layer_ids = [int(layer_id) for layer_id in layer_ids if not np.isnan(layer_id)]
 963
 964        # Get the mapping of layer IDs to names
 965        if self.db_layers_metadata is None:
 966            layer_metadata = ops_api.get_layer_metadata()
 967            if layer_metadata['status'] == 1:
 968                df = pd.DataFrame(layer_metadata['data'])
 969                df['display_name'] = df['lyr_group_name'] + ":" + df['lyr_name']
 970                self.db_layers_metadata = df
 971            else:
 972                raise ValueError("Failed to fetch layer metadata from OPS API.")
 973
 974        layers = {}
 975        for layer_id in layer_ids:
 976            layer_name = self.db_layers_metadata.loc[self.db_layers_metadata['lyr_id'] == layer_id, 'display_name'].values[0]
 977
 978            layer_ds = layer_ds_raw.where(layer_ds_raw['lyr_id'] == layer_id, drop=True)
 979
 980            layer_ds = layer_ds.sortby('gps_time')
 981
 982            layer_ds = layer_ds.rename({'gps_time': 'slow_time'})
 983            layer_ds = layer_ds.set_coords(['slow_time'])
 984
 985            layer_ds['slow_time'] = pd.to_datetime(layer_ds['slow_time'].values, unit='s')
 986
 987            # Filter to the same time range as flight
 988            layer_ds = self._trim_to_bounds(layer_ds, flight)
 989
 990            # Only add non-empty layers with at least some non-NaN data
 991            if layer_ds.sizes.get('slow_time', 0) > 0:
 992                if not layer_ds.to_array().isnull().all():
 993                    layers[layer_name] = layer_ds
 994
 995        return layers
 996
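`_get_layers_db` splits the flat per-point records returned by the API into one Dataset per layer using a boolean mask with `drop=True`. A sketch on synthetic records (values are illustrative, not real OPS data):

```python
import numpy as np
import xarray as xr

# Flat per-point records: each point carries the lyr_id it belongs to
flat = xr.Dataset(
    {'twtt': ('gps_time', [1.0, 2.0, 1.5, 2.5]),
     'lyr_id': ('gps_time', [1, 2, 1, 2])},
    coords={'gps_time': [10.0, 11.0, 12.0, 13.0]},
)

# One Dataset per layer: mask on lyr_id and drop the non-matching points
layers = {}
for lyr in np.unique(flat['lyr_id']):
    layers[int(lyr)] = flat.where(flat['lyr_id'] == lyr, drop=True)

print(layers[1].sizes['gps_time'])  # 2
```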
 997    def get_layers(self, ds : Union[xr.Dataset, dict], source: str = 'auto', include_geometry=True, errors='warn') -> dict:
 998        """
 999        Load layer picks from files or database.
1000
1001        Tries files first, falls back to database if source='auto'.
1002
1003        Parameters
1004        ----------
1005        ds : Union[xr.Dataset, dict]
1006            Radar frame Dataset or STAC item.
1007        source : {'auto', 'files', 'db'}, optional
1008            Source for layers. 'auto' tries files then falls back to database.
1009        include_geometry : bool, optional
1010            If True, include geometry information when fetching from the API.
1011        errors : str, optional
1012            How to handle missing layer data: 'warn' (default) returns None with a warning,
1013            'error' raises an exception.
1014
1015        Returns
1016        -------
1017        dict or None
1018            A dictionary mapping layer IDs to their corresponding data, or None if no layers
1019            found and errors='warn'.
1020        """
1021        collection, segment_path, _ = self._extract_segment_info(ds)
1022
1023        if source == 'auto':
1024            # Try to get layers from files first
1025            try:
1026                layers = self._get_layers_files(ds, raise_errors=True)
1027                return layers
1028            except Exception:
1029                # Fallback to API if no layers found in files
1030                try:
1031                    return self._get_layers_db(ds, include_geometry=include_geometry, raise_errors=True)
1032                except Exception as e:
1033                    if errors == 'error':
1034                        raise
1035                    else:
1036                        warnings.warn(f"No layer data found for {collection}/{segment_path}: {e}")
1037                        return None
1038        elif source == 'files':
1039            try:
1040                return self._get_layers_files(ds, raise_errors=True)
1041            except Exception as e:
1042                if errors == 'error':
1043                    raise
1044                else:
1045                    warnings.warn(f"No layer files found for {collection}/{segment_path}: {e}")
1046                    return None
1047        elif source == 'db':
1048            try:
1049                return self._get_layers_db(ds, include_geometry=include_geometry, raise_errors=True)
1050            except Exception as e:
1051                if errors == 'error':
1052                    raise
1053                else:
1054                    warnings.warn(f"No layer data in DB for {collection}/{segment_path}: {e}")
1055                    return None
1056        else:
1057            raise ValueError("Invalid source specified. Must be one of: 'auto', 'files', 'db'.")
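The `source='auto'` behaviour in `get_layers` is a try-files-then-database fallback with configurable error handling. A minimal generic sketch of that dispatch (the loader functions here are stand-ins, not xopr internals):

```python
import warnings

def get_layers_demo(loaders, errors='warn'):
    """Try each loader in order; warn-and-return-None or raise if all fail."""
    last_err = None
    for load in loaders:
        try:
            return load()
        except Exception as e:
            last_err = e  # remember the failure and try the next source
    if errors == 'error':
        raise last_err
    warnings.warn(f"No layer data found: {last_err}")
    return None

def from_files():
    raise ValueError("no CSARP_layer files")

def from_db():
    return {'standard:surface': 'layer-dataset'}

print(get_layers_demo([from_files, from_db]))  # {'standard:surface': 'layer-dataset'}
```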
OPRConnection( collection_url: str = 'https://data.cresis.ku.edu/data/', cache_dir: str = None, stac_parquet_href: str = None, sync_catalogs: bool = True)
 78    def __init__(self,
 79                 collection_url: str = "https://data.cresis.ku.edu/data/",
 80                 cache_dir: str = None,
 81                 stac_parquet_href: str = None,
 82                 sync_catalogs: bool = True):
 83        """
 84        Initialize connection to OPR data archive.
 85
 86        Parameters
 87        ----------
 88        collection_url : str, optional
 89            Base URL for OPR data collection.
 90        cache_dir : str, optional
 91            Directory to cache downloaded data for faster repeated access.
 92        stac_parquet_href : str, optional
 93            Path or URL pattern to STAC catalog parquet files.  When *None*
 94            (default), the local cache is used if available, falling back to
 95            the S3 catalog.
 96        sync_catalogs : bool, optional
 97            If True (default), sync OPR STAC catalogs to a local cache
 98            before resolving the catalog path.  Uses ETag-based change
 99            detection so repeated calls are cheap (single HTTP request).
100        """
101        self.collection_url = collection_url
102        self.cache_dir = cache_dir
103        self._user_set_href = stac_parquet_href is not None
104        if sync_catalogs and not self._user_set_href:
105            sync_opr_catalogs()
106        self.stac_parquet_href = stac_parquet_href or get_opr_catalog_path()
107
108        self.fsspec_cache_kwargs = {}
109        self.fsspec_url_prefix = ''
110        if cache_dir:
111            self.fsspec_cache_kwargs['cache_storage'] = cache_dir
112            self.fsspec_cache_kwargs['check_files'] = True
113            self.fsspec_url_prefix = 'filecache::'
114
115        self.db_layers_metadata = None # Cache for OPS database layers metadata
116
117        # Lazy-initialized DuckDB client for STAC queries — reusing a single
118        # session caches remote file metadata (parquet footers) across calls
119        self._duckdb_client = None

collection_url
cache_dir
stac_parquet_href
fsspec_cache_kwargs
fsspec_url_prefix
db_layers_metadata
duckdb_client
121    @property
122    def duckdb_client(self):
123        """Lazy-initialized DuckDB client, recreated after pickling."""
124        if self._duckdb_client is None:
125            self._duckdb_client = DuckdbClient()
126        return self._duckdb_client


def query_frames( self, collections: list[str] = None, segment_paths: list[str] = None, geometry=None, date_range: tuple = None, properties: dict = {}, max_items: int = None, exclude_geometry: bool = False, search_kwargs: dict = {}) -> geopandas.geodataframe.GeoDataFrame:
157    def query_frames(self, collections: list[str] = None, segment_paths: list[str] = None,
158                     geometry = None, date_range: tuple = None, properties: dict = {},
159                     max_items: int = None, exclude_geometry: bool = False,
160                     search_kwargs: dict = {}) -> gpd.GeoDataFrame:
161        """
162        Query STAC catalog for radar frames matching search criteria.
163
164        Multiple parameters are combined with AND logic. Lists of values passed to a
165        single parameter are treated with OR logic (any value matches).
166
167        Parameters
168        ----------
169        collections : list[str] or str, optional
170            Collection name(s) (e.g., "2022_Antarctica_BaslerMKB").
171        segment_paths : list[str] or str, optional
172            Segment path(s) in format "YYYYMMDD_NN" (e.g., "20230126_01").
173        geometry : shapely geometry or dict, optional
174            Geospatial geometry to filter by intersection.
175        date_range : str, optional
176            Date range in ISO format (e.g., "2021-01-01/2025-06-01").
177        properties : dict, optional
178            Additional STAC properties to filter by.
179        max_items : int, optional
180            Maximum number of items to return.
181        exclude_geometry : bool, optional
182            If True, exclude geometry field to reduce response size.
183        search_kwargs : dict, optional
184            Additional keyword arguments for STAC search.
185
186        Returns
187        -------
188        gpd.GeoDataFrame or None
189            GeoDataFrame of matching STAC items with columns for collection,
190            geometry, properties, and assets. Returns None if no matches found.
191        """
192
193        search_params = {}
194
195        # Exclude geometry -- do not return the geometry field to reduce response size
196        if exclude_geometry:
197            search_params['exclude'] = ['geometry']
198
199        # Handle collections (seasons)
200        if collections is not None:
201            search_params['collections'] = [collections] if isinstance(collections, str) else collections
202
203        # Handle geometry filtering
204        if geometry is not None:
205            if hasattr(geometry, '__geo_interface__'):
206                geometry = geometry.__geo_interface__
207
208            # Fix geometries that cross the antimeridian
209            geometry = antimeridian.fix_geojson(geometry, reverse=True)
210
211            search_params['intersects'] = geometry
212
213        # Handle date range filtering
214        if date_range is not None:
215            search_params['datetime'] = date_range
216
217        # Handle max_items
218        if max_items is not None:
219            search_params['limit'] = max_items
220
221        # Handle segment_paths filtering using CQL2
222        filter_conditions = []
223
224        if segment_paths is not None:
225            segment_paths = [segment_paths] if isinstance(segment_paths, str) else segment_paths
226
227            # Create OR conditions for segment paths
228            segment_conditions = []
229            for segment_path in segment_paths:
230                try:
231                    date_str, segment_num_str = segment_path.split('_')
232                    segment_num = int(segment_num_str)
233
234                    # Create AND condition for this specific segment
235                    segment_condition = {
236                        "op": "and",
237                        "args": [
238                            {
239                                "op": "=",
240                                "args": [{"property": "opr:date"}, date_str]
241                            },
242                            {
243                                "op": "=",
244                                "args": [{"property": "opr:segment"}, segment_num]
245                            }
246                        ]
247                    }
248                    segment_conditions.append(segment_condition)
249                except ValueError:
250                    warnings.warn(f"Invalid segment_path format '{segment_path}'. Expected format: YYYYMMDD_NN", UserWarning)
251                    continue
252
253            if segment_conditions:
254                if len(segment_conditions) == 1:
255                    filter_conditions.append(segment_conditions[0])
256                else:
257                    # Multiple segments - combine with OR
258                    filter_conditions.append({
259                        "op": "or",
260                        "args": segment_conditions
261                    })
262
263        # Add any additional property filters
264        for key, value in properties.items():
265            filter_conditions.append({
266                "op": "=",
267                "args": [{"property": key}, value]
268            })
269
270        # Combine all filter conditions with AND
271        if filter_conditions:
272            if len(filter_conditions) == 1:
273                filter_expr = filter_conditions[0]
274            else:
275                filter_expr = {
276                    "op": "and",
277                    "args": filter_conditions
278                }
279
280            search_params['filter'] = filter_expr
281
282        # Add any extra kwargs to search
283        search_params.update(search_kwargs)
284
286
287        # Perform the search
289        items = self.duckdb_client.search(self.stac_parquet_href, **search_params)
290        if isinstance(items, dict):
291            items = items['features']
292
293        if not items:
294            warnings.warn("No items found matching the query criteria", UserWarning)
295            return None
296
297        # Convert to GeoDataFrame
298        items_df = gpd.GeoDataFrame(items)
299        # Set index
300        items_df = items_df.set_index(items_df['id'])
301        items_df.index.name = 'stac_item_id'
302        # Set the geometry column
303        if 'geometry' in items_df.columns and not exclude_geometry:
304            items_df = items_df.set_geometry(items_df['geometry'].apply(shapely.geometry.shape))
305            items_df.crs = "EPSG:4326"
306
307        # Reorder the columns, leaving any extra columns at the end
308        desired_order = ['collection', 'geometry', 'properties', 'assets']
309        items_df = items_df[[col for col in desired_order if col in items_df.columns] + list(items_df.columns.difference(desired_order))]
310
311        return items_df
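The CQL2 construction above can be sketched in isolation. This is a minimal illustration, not part of the xopr API, of the filter that `query_frames` builds for a single segment path, using the `opr:date` and `opr:segment` properties seen in the source:

```python
def segment_path_to_cql2(segment_path: str) -> dict:
    """Build the CQL2 AND-condition matching one "YYYYMMDD_NN" segment path.

    Mirrors the construction in query_frames, but raises ValueError on a
    malformed path instead of warning and skipping.
    """
    date_str, segment_num_str = segment_path.split('_')
    return {
        "op": "and",
        "args": [
            {"op": "=", "args": [{"property": "opr:date"}, date_str]},
            {"op": "=", "args": [{"property": "opr:segment"}, int(segment_num_str)]},
        ],
    }

# Multiple segment paths would be wrapped together in an {"op": "or", ...} node
filt = segment_path_to_cql2("20230109_01")
```

`query_frames` then ANDs this expression with any extra property filters before passing it as the `filter` search parameter.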

313    def load_frames(self, stac_items: gpd.GeoDataFrame,
314                    data_product: str = "CSARP_standard",
315                    image: Union[int, None] = None,
316                    merge_flights: bool = False,
317                    skip_errors: bool = False,
318                    allow_unlisted_products: bool = False
319                    ) -> Union[list[xr.Dataset], xr.Dataset]:
320        """
321        Load multiple radar frames from STAC items.
322
323        Parameters
324        ----------
325        stac_items : gpd.GeoDataFrame
326            STAC items returned from query_frames.
327        data_product : str, optional
328            Data product to load (e.g., "CSARP_standard", "CSARP_qlook").
329        image : int or None, optional
330            The image number to load for each frame. If None (default), loads the
331            combined image. If specified, `allow_unlisted_products` must be True.
332        merge_flights : bool, optional
333            If True, merge frames from the same segment into single Datasets.
334        skip_errors : bool, optional
335            If True, skip failed frames and continue loading others.
336        allow_unlisted_products : bool, optional
337            If True, attempt to load the specified data product even if it's not
338            listed in the item's assets. See `load_frame` for details.
339
340        Returns
341        -------
342        list[xr.Dataset] or xr.Dataset
343            List of radar frame Datasets, or list of merged Datasets if
344            merge_flights=True.
345        """
346        frames = []
347
348        for idx, item in stac_items.iterrows():
349            try:
350                frame = self.load_frame(item, data_product, image=image, allow_unlisted_products=allow_unlisted_products)
351                frames.append(frame)
352            except Exception as e:
353                if not skip_errors:
354                    raise
355                warnings.warn(f"Skipping frame for item {item.get('id', 'unknown')}: {e}", UserWarning)
358
359        if merge_flights:
360            return opr_tools.merge_frames(frames)
361        else:
362            return frames
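The `skip_errors` contract can be illustrated with an injected loader; `load_all` and `load_one` below are stand-ins for this sketch, not part of the xopr API:

```python
import warnings

def load_all(items, load_one, skip_errors=False):
    """Mimic the load_frames error-handling loop with a pluggable loader."""
    frames = []
    for item in items:
        try:
            frames.append(load_one(item))
        except Exception as e:
            # Fail fast by default; only swallow errors when asked to
            if not skip_errors:
                raise
            warnings.warn(f"Skipping {item}: {e}", UserWarning)
    return frames
```

With `skip_errors=True` a failing item is dropped and the rest still load; with the default, the first failure propagates so partial results are never silently returned.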

364    def load_frame(self, stac_item, data_product: str = "CSARP_standard",
365                   image: Union[int, None] = None,
366                   allow_unlisted_products: bool = False) -> xr.Dataset:
367        """
368        Load a single radar frame from a STAC item.
369
370        Parameters
371        ----------
372        stac_item : dict or pd.Series
373            STAC item containing asset URLs.
374        data_product : str, optional
375            Data product to load (e.g., "CSARP_standard", "CSARP_qlook").
376        image : int or None, optional
377            The image number to load for this frame. If None (default), loads the
378            combined image. If specified, `allow_unlisted_products` must be True.
379        allow_unlisted_products : bool, optional
380            If True, attempt to load the specified data product even if it's not
381            listed in the item's assets.  This can be useful for loading non-standard
382            products. If set to True and the data product is not found in the STAC
383            item assets, the method will attempt to construct the URL based on any
384            available CSARP_* asset. (If the frame is entirely unlisted, you can use
385            `load_frame_url` instead.)
386        Returns
387        -------
388        xr.Dataset
389            Radar frame with coordinates slow_time and twtt, and data variables
390            including Data, Latitude, Longitude, Elevation, Surface, etc.
391        """
392
393        assets = stac_item['assets']
394
395        # Get the data asset
396        data_asset = assets.get(data_product)
397        if not data_asset:
398            if not allow_unlisted_products:
399                available_assets = list(assets.keys())
400                raise ValueError(f"{data_product} asset not found in STAC item. Available assets: {available_assets}")
401            else:
402                # Find any available CSARP_* asset to use as a template for constructing the URL
403                template_asset_name = None
404                template_asset_url = None
405                for asset_name, asset_info in assets.items():
406                    if asset_name.startswith('CSARP_'):
407                        template_asset_name = asset_name
408                        template_asset_url = asset_info.get('href')
409                        break
410
411                if not template_asset_url:
412                    available_assets = list(assets.keys())
413                    raise ValueError(f"No CSARP_* asset found in STAC item to use as a template for constructing URL for {data_product}. Available assets: {available_assets}")
414
415                url = template_asset_url.replace(template_asset_name, data_product)
416        else:
417            # The asset does exist in the STAC item, so just get the URL from the asset
418            url = data_asset.get('href')
419
420            if not url:
421                raise ValueError(f"No href found in {data_product} asset")
422
423        # If a specific image is requested, modify the URL to point to that image
424        if image is not None:
425            if not allow_unlisted_products:
426                raise ValueError("Specifying an image number requires allow_unlisted_products=True to construct the URL")
427            url = url.replace("Data_", f"Data_img_{image:02d}_")
428
429        # Load the frame using the existing method
430        return self.load_frame_url(url)
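The URL rewriting that `allow_unlisted_products` and `image` rely on is plain string substitution. A sketch with a made-up URL (the real archive layout may differ):

```python
# Hypothetical asset URL, shaped like the CSARP_* paths referenced above
template = ("https://example.org/2022_Antarctica_BaslerMKB/CSARP_qlook/"
            "20230109_01/Data_20230109_01_001.mat")

# Swap the product directory, as done when the requested product is unlisted
url = template.replace("CSARP_qlook", "CSARP_standard")

# Insert an image number, as done for image=1 (Data_ -> Data_img_01_)
image = 1
url = url.replace("Data_", f"Data_img_{image:02d}_")
```

This is why a template `CSARP_*` asset must exist in the item: the constructed URL is never validated against the catalog, only attempted.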

432    def load_frame_url(self, url: str) -> xr.Dataset:
433        """
434        Load a radar frame directly from a URL.
435
436        Automatically detects and handles both HDF5 and legacy MATLAB formats.
437
438        Parameters
439        ----------
440        url : str
441            URL to radar frame data file (.mat).
442
443        Returns
444        -------
445        xr.Dataset
446            Radar frame with CF-compliant metadata and citation information.
447        """
448
449        file = self._open_file(url)
450
451        filetype = None
452        try:
453            ds = self._load_frame_hdf5(file)
454            filetype = 'hdf5'
455        except OSError:
456            ds = self._load_frame_matlab(file)
457            filetype = 'matlab'
458
459        # Add the source URL as an attribute
460        ds.attrs['source_url'] = url
461
462        # Apply CF-compliant attributes
463        ds = apply_cf_compliant_attrs(ds)
464
465        # Get the season and segment from the URL
466        match = re.search(r'(\d{4}_\w+_[A-Za-z0-9]+)\/([\w_]+)\/[\d_]+\/[\w]+(\d{8}_\d{2}_\d{3})', url)
467        if match:
468            collection, data_product, granule = match.groups()
469            date, segment_id, frame_id = granule.split('_')
470            ds.attrs['collection'] = collection
471            ds.attrs['data_product'] = data_product
472            ds.attrs['granule'] = granule
473            ds.attrs['segment_path'] = f"{date}_{segment_id}"
474            ds.attrs['date_str'] = date
475            ds.attrs['segment'] = int(segment_id)
476            ds.attrs['frame'] = int(frame_id)
477
478            # Load citation information
479            result = ops_api.get_segment_metadata(segment_name=ds.attrs['segment_path'], season_name=collection)
480            if result:
481                if isinstance(result['data'], str):
482                    warnings.warn(f"Unexpected result from ops_api: {result['data']}", UserWarning)
483                else:
484                    result_data = {}
485                    for key, value in result['data'].items():
486                        if len(value) == 1:
487                            result_data[key] = value[0]
488                        elif len(value) > 1:
489                            result_data[key] = set(value)
490
491                    if 'dois' in result_data:
492                        ds.attrs['doi'] = result_data['dois']
493                    if 'rors' in result_data:
494                        ds.attrs['ror'] = result_data['rors']
495                    if 'funding_sources' in result_data:
496                        ds.attrs['funder_text'] = result_data['funding_sources']
497
498        # Add the rest of the Matlab parameters
499        if filetype == 'hdf5':
500            ds.attrs['mimetype'] = 'application/x-hdf5'
501            ds.attrs.update(decode_hdf5_matlab_variable(h5py.File(file, 'r'),
502                                                        skip_variables=True,
503                                                        skip_errors=True))
504        elif filetype == 'matlab':
505            ds.attrs['mimetype'] = 'application/x-matlab-data'
506            ds.attrs.update(extract_legacy_mat_attributes(file,
507                                                          skip_keys=ds.keys(),
508                                                          skip_errors=True))
509
510        return ds
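The collection, data product, and granule attributes come from the regular expression above. Applying the same pattern to a made-up URL shows what each group captures:

```python
import re

# Same pattern as load_frame_url; the URL below is illustrative only
pattern = r'(\d{4}_\w+_[A-Za-z0-9]+)\/([\w_]+)\/[\d_]+\/[\w]+(\d{8}_\d{2}_\d{3})'
url = ("https://example.org/2022_Antarctica_BaslerMKB/CSARP_standard/"
       "20230109_01/Data_20230109_01_001.mat")

match = re.search(pattern, url)
collection, data_product, granule = match.groups()
# The granule splits into date, segment, and frame components
date, segment_id, frame_id = granule.split('_')
```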

604    def get_collections(self) -> list:
605        """
606        Get all available collections (seasons/campaigns) in STAC catalog.
607
608        Returns
609        -------
610        list[dict]
611            List of collection dictionaries with id, description, and extent.
612        """
613
614        return self.duckdb_client.get_collections(self.stac_parquet_href)

616    def get_segments(self, collection_id: str) -> list:
617        """
618        Get all segments (flights) within a collection.
619
620        Parameters
621        ----------
622        collection_id : str
623            STAC collection ID (e.g., "2022_Antarctica_BaslerMKB").
624
625        Returns
626        -------
627        list[dict]
628            List of segment dictionaries with segment_path, date, flight_number,
629            and frame count.
630        """
631        # Query STAC API for all items in collection (exclude geometry for better performance)
632        items = self.query_frames(collections=[collection_id], exclude_geometry=True)
633
634        if items is None or len(items) == 0:
635            warnings.warn(f"No items found in collection '{collection_id}'", UserWarning)
636            return []
637
638        # Group items by segment (opr:date + opr:segment)
639        segments = {}
640        for idx, item in items.iterrows():
641            properties = item['properties']
642            date = properties['opr:date']
643            flight_num = properties['opr:segment']
644
645            if date and flight_num is not None:
646                segment_path = f"{date}_{flight_num:02d}"
647
648                if segment_path not in segments:
649                    segments[segment_path] = {
650                        'segment_path': segment_path,
651                        'date': date,
652                        'flight_number': flight_num,
653                        'collection': collection_id,
654                        'frames': [],
655                        'item_count': 0
656                    }
657
658                segments[segment_path]['frames'].append(properties.get('opr:frame'))
659                segments[segment_path]['item_count'] += 1
660
661        # Sort segments by date and flight number
662        segment_list = list(segments.values())
663        segment_list.sort(key=lambda x: (x['date'], x['flight_number']))
664
665        return segment_list
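The grouping step can be sketched with plain dicts; `group_segments` is an illustrative stand-in operating on item `properties` dictionaries, not part of the xopr API:

```python
def group_segments(properties_list, collection_id):
    """Group item properties into per-segment summaries, as get_segments does."""
    segments = {}
    for props in properties_list:
        date, flight_num = props['opr:date'], props['opr:segment']
        if date and flight_num is not None:
            # Segment key matches the "YYYYMMDD_NN" segment_path convention
            key = f"{date}_{flight_num:02d}"
            seg = segments.setdefault(key, {
                'segment_path': key, 'date': date, 'flight_number': flight_num,
                'collection': collection_id, 'frames': [], 'item_count': 0,
            })
            seg['frames'].append(props.get('opr:frame'))
            seg['item_count'] += 1
    # Sort by date, then flight number
    return sorted(segments.values(), key=lambda s: (s['date'], s['flight_number']))
```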

786    def load_layers_file(self, url: str) -> xr.Dataset:
787        """
788        Load layer data from CSARP_layer file.
789
790        Parameters
791        ----------
792        url : str
793            URL or path to layer file.
794
795        Returns
796        -------
797        xr.Dataset
798            Layer data in a standardized format with coordinates:
799            - slow_time: GPS time converted to datetime
800            - layer: Layer index
801            And data variables:
802            - twtt: Two-way travel time for each layer
803            - quality: Quality values for each layer
804            - type: Type values for each layer
805            - lat, lon, elev: Geographic coordinates
806            - id: Layer IDs
808        """
809
810        file = self._open_file(url)
811
812        ds = self._load_layers(file)
813
814        # Add the source URL as an attribute
815        ds.attrs['source_url'] = url
816
817        # Apply common manipulations to match the expected structure
818        # Convert GPS time to datetime coordinate
819        if 'gps_time' in ds.variables:
820            slow_time_dt = pd.to_datetime(ds['gps_time'].values, unit='s')
821            ds = ds.assign_coords(slow_time=('slow_time', slow_time_dt))
822
823            # Set slow_time as the main coordinate and remove gps_time from data_vars
824            if 'slow_time' not in ds.dims:
825                ds = ds.swap_dims({'gps_time': 'slow_time'})
826
827            # Remove gps_time from data_vars if it exists there to avoid conflicts
828            if ('gps_time' in ds.data_vars) or ('gps_time' in ds.coords):
829                ds = ds.drop_vars('gps_time')
830
831        # Sort by slow_time if it exists
832        if 'slow_time' in ds.coords:
833            ds = ds.sortby('slow_time')
834
835        return ds
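The gps_time handling hinges on one pandas call: epoch seconds become a datetime index, which then replaces gps_time as the slow_time coordinate. The timestamp values below are illustrative:

```python
import pandas as pd

# GPS time stored as epoch seconds in a CSARP_layer file (illustrative values)
gps_time = [1673222400.0, 1673222400.5]

# The same conversion load_layers_file applies before swapping dimensions
slow_time = pd.to_datetime(gps_time, unit='s')
```

Sub-second offsets survive the conversion, which matters because along-track sampling is much faster than 1 Hz.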

 997    def get_layers(self, ds: Union[xr.Dataset, dict], source: str = 'auto', include_geometry: bool = True, errors: str = 'warn') -> Union[dict, None]:
 998        """
 999        Load layer picks from files or database.
1000
1001        Tries files first, falls back to database if source='auto'.
1002
1003        Parameters
1004        ----------
1005        ds : Union[xr.Dataset, dict]
1006            Radar frame Dataset or STAC item.
1007        source : {'auto', 'files', 'db'}, optional
1008            Source for layers. 'auto' tries files then falls back to database.
1009        include_geometry : bool, optional
1010            If True, include geometry information when fetching from the API.
1011        errors : str, optional
1012            How to handle missing layer data: 'warn' (default) returns None with a warning,
1013            'error' raises an exception.
1014
1015        Returns
1016        -------
1017        dict or None
1018            A dictionary mapping layer IDs to their corresponding data, or None if no layers
1019            found and errors='warn'.
1020        """
1021        collection, segment_path, _ = self._extract_segment_info(ds)
1022
1023        if source == 'auto':
1024            # Try to get layers from files first
1025            try:
1026                layers = self._get_layers_files(ds, raise_errors=True)
1027                return layers
1028            except Exception:
1029                # Fallback to API if no layers found in files
1030                try:
1031                    return self._get_layers_db(ds, include_geometry=include_geometry, raise_errors=True)
1032                except Exception as e:
1033                    if errors == 'error':
1034                        raise
1035                    else:
1036                        warnings.warn(f"No layer data found for {collection}/{segment_path}: {e}")
1037                        return None
1038        elif source == 'files':
1039            try:
1040                return self._get_layers_files(ds, raise_errors=True)
1041            except Exception as e:
1042                if errors == 'error':
1043                    raise
1044                else:
1045                    warnings.warn(f"No layer files found for {collection}/{segment_path}: {e}")
1046                    return None
1047        elif source == 'db':
1048            try:
1049                return self._get_layers_db(ds, include_geometry=include_geometry, raise_errors=True)
1050            except Exception as e:
1051                if errors == 'error':
1052                    raise
1053                else:
1054                    warnings.warn(f"No layer data in DB for {collection}/{segment_path}: {e}")
1055                    return None
1056        else:
1057            raise ValueError("Invalid source specified. Must be one of: 'auto', 'files', 'db'.")
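The source='auto' fallback reduces to nested try/except. A sketch with injected loaders, where `try_files` and `try_db` are stand-ins for the private `_get_layers_files` and `_get_layers_db` methods:

```python
import warnings

def get_layers_with_fallback(try_files, try_db, errors='warn'):
    """Mirror the source='auto' control flow of get_layers."""
    try:
        return try_files()
    except Exception:
        # Files failed for any reason: fall back to the database
        try:
            return try_db()
        except Exception as e:
            if errors == 'error':
                raise
            warnings.warn(f"No layer data found: {e}")
            return None
```

Note that only the database error surfaces in the warning; a file-side failure is treated as "not available from files" and silently triggers the fallback.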
