# xopr.opr_access
   1"""
   2Core data access interface for Open Polar Radar (OPR) datasets.
   3
   4This module provides the primary interface for discovering, querying, and loading
   5polar ice-penetrating radar data from the Open Polar Radar archive. It implements
   6a STAC (SpatioTemporal Asset Catalog) based workflow for efficient data discovery
   7and retrieval.
   8
   9Primary Class
  10-------------
  11OPRConnection : Main interface for accessing OPR data
  12    Provides methods for querying STAC catalogs, loading radar frames,
  13    and retrieving layer picks from both file-based and database sources.
  14
  15Typical Workflow
  16----------------
  171. Create an OPRConnection object, optionally with local caching
  182. Query the STAC catalog to find radar data matching your criteria
  193. Load radar frames as xarray Datasets
  204. Optionally load associated layer picks
  215. Process and visualize the data
  22
  23See this notebook for an end-to-end example:
  24- https://docs.englacial.org/xopr/demo-notebook/
  25
  26Examples
  27--------
  28>>> import xopr
  29>>> # Connect to OPR with local caching
  30>>> opr = xopr.OPRConnection(cache_dir='radar_cache')
  31>>>
  32>>> # Query for data in a specific collection and segment
  33>>> stac_items = opr.query_frames(
  34...     collections=['2022_Antarctica_BaslerMKB'],
  35...     segment_paths=['20230109_01']
  36... )
  37>>>
  38>>> # Load the radar frames
  39>>> frames = opr.load_frames(stac_items)
  40>>>
  41>>> # Get layer picks (surface/bed)
  42>>> layers = opr.get_layers(frames[0])
  43
  44See Also
  45--------
  46xopr.geometry : Geographic utilities for region selection and projection
  47xopr.radar_util : Processing functions for radar data and layers
  48ops_api : Interface to the OPS database API
  49"""

import re
import warnings
from typing import Union

import antimeridian
import fsspec
import geopandas as gpd
import h5py
import hdf5storage
import numpy as np
import pandas as pd
import scipy.io
import shapely
import xarray as xr
from rustac import DuckdbClient

from . import opr_tools, ops_api
from .cf_units import apply_cf_compliant_attrs
from .matlab_attribute_utils import (
    decode_hdf5_matlab_variable,
    extract_legacy_mat_attributes,
)
from .stac_cache import get_opr_catalog_path, sync_opr_catalogs


class OPRConnection:
    """Connection to the Open Polar Radar data archive."""

    def __init__(self,
                 collection_url: str = "https://data.cresis.ku.edu/data/",
                 cache_dir: str = None,
                 stac_parquet_href: str = None,
                 sync_catalogs: bool = True):
        """
        Initialize connection to OPR data archive.

        Parameters
        ----------
        collection_url : str, optional
            Base URL for OPR data collection.
        cache_dir : str, optional
            Directory to cache downloaded data for faster repeated access.
        stac_parquet_href : str, optional
            Path or URL pattern to STAC catalog parquet files.  When *None*
            (default), the local cache is used if available, falling back to
            the S3 catalog.
        sync_catalogs : bool, optional
            If True (default), sync OPR STAC catalogs to a local cache
            before resolving the catalog path.  Uses ETag-based change
            detection so repeated calls are cheap (single HTTP request).
        """
        self.collection_url = collection_url
        self.cache_dir = cache_dir
        self._user_set_href = stac_parquet_href is not None
        if sync_catalogs and not self._user_set_href:
            sync_opr_catalogs()
        self.stac_parquet_href = stac_parquet_href or get_opr_catalog_path()

        self.fsspec_cache_kwargs = {}
        self.fsspec_url_prefix = ''
        if cache_dir:
            self.fsspec_cache_kwargs['cache_storage'] = cache_dir
            self.fsspec_cache_kwargs['check_files'] = True
            self.fsspec_url_prefix = 'filecache::'

        self.db_layers_metadata = None  # Cache for OPS database layers metadata

        # Lazy-initialized DuckDB client for STAC queries -- reusing a single
        # session caches remote file metadata (parquet footers) across calls
        self._duckdb_client = None

    @property
    def duckdb_client(self):
        """Lazy-initialized DuckDB client, recreated after pickling."""
        if self._duckdb_client is None:
            self._duckdb_client = DuckdbClient()
        return self._duckdb_client

    def __getstate__(self):
        state = self.__dict__.copy()
        state['_duckdb_client'] = None  # DuckdbClient can't be pickled
        return state

    def _open_file(self, url):
        """Open a (possibly remote) file through fsspec with appropriate caching."""
        if self.fsspec_url_prefix:
            return fsspec.open_local(f"{self.fsspec_url_prefix}{url}", filecache=self.fsspec_cache_kwargs)
        else:
            return fsspec.open_local(f"simplecache::{url}", simplecache=self.fsspec_cache_kwargs)

    def _extract_segment_info(self, segment):
        """Extract collection, segment_path, and frame from a Dataset or STAC item dict."""
        if isinstance(segment, xr.Dataset):
            return (segment.attrs.get('collection'),
                    segment.attrs.get('segment_path'),
                    segment.attrs.get('frame'))
        else:
            return (segment['collection'],
                    f"{segment['properties'].get('opr:date')}_{segment['properties'].get('opr:segment'):02d}",
                    segment['properties'].get('opr:frame'))
    def query_frames(self, collections: list[str] = None, segment_paths: list[str] = None,
                     geometry = None, date_range: tuple = None, properties: dict = {},
                     max_items: int = None, exclude_geometry: bool = False,
                     search_kwargs: dict = {}) -> gpd.GeoDataFrame:
        """
        Query STAC catalog for radar frames matching search criteria.

        Multiple parameters are combined with AND logic. Lists of values passed to a
        single parameter are treated with OR logic (any value matches).

        Parameters
        ----------
        collections : list[str] or str, optional
            Collection name(s) (e.g., "2022_Antarctica_BaslerMKB").
        segment_paths : list[str] or str, optional
            Segment path(s) in the format "YYYYMMDD_NN" (e.g., "20230126_01").
        geometry : shapely geometry or dict, optional
            Geospatial geometry to filter by intersection.
        date_range : str or tuple, optional
            Date range in ISO format (e.g., "2021-01-01/2025-06-01").
        properties : dict, optional
            Additional STAC properties to filter by.
        max_items : int, optional
            Maximum number of items to return.
        exclude_geometry : bool, optional
            If True, exclude the geometry field to reduce response size.
        search_kwargs : dict, optional
            Additional keyword arguments for the STAC search.

        Returns
        -------
        gpd.GeoDataFrame or None
            GeoDataFrame of matching STAC items with columns for collection,
            geometry, properties, and assets. Returns None if no matches found.
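
        Examples
        --------
        A sketch of typical queries; the collection and segment names follow
        the catalog's naming pattern but are illustrative:

        >>> import xopr
        >>> opr = xopr.OPRConnection()
        >>> # All frames from one segment of a season
        >>> items = opr.query_frames(
        ...     collections=['2022_Antarctica_BaslerMKB'],
        ...     segment_paths=['20230109_01']
        ... )
        >>> # Frames intersecting a lon/lat box within a date range
        >>> import shapely.geometry
        >>> region = shapely.geometry.box(160.0, -78.0, 170.0, -74.0)
        >>> items = opr.query_frames(geometry=region,
        ...                          date_range="2021-01-01/2025-06-01")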
        """

        search_params = {}

        # Exclude geometry -- do not return the geometry field to reduce response size
        if exclude_geometry:
            search_params['exclude'] = ['geometry']

        # Handle collections (seasons)
        if collections is not None:
            search_params['collections'] = [collections] if isinstance(collections, str) else collections

        # Handle geometry filtering
        if geometry is not None:
            if hasattr(geometry, '__geo_interface__'):
                geometry = geometry.__geo_interface__

            # Fix geometries that cross the antimeridian
            geometry = antimeridian.fix_geojson(geometry, reverse=True)

            search_params['intersects'] = geometry

        # Handle date range filtering
        if date_range is not None:
            search_params['datetime'] = date_range

        # Handle max_items
        if max_items is not None:
            search_params['limit'] = max_items

        # Handle segment_paths filtering using CQL2
        filter_conditions = []

        if segment_paths is not None:
            segment_paths = [segment_paths] if isinstance(segment_paths, str) else segment_paths

            # Create OR conditions for segment paths
            segment_conditions = []
            for segment_path in segment_paths:
                try:
                    date_str, segment_num_str = segment_path.split('_')
                    segment_num = int(segment_num_str)

                    # Create AND condition for this specific segment
                    segment_condition = {
                        "op": "and",
                        "args": [
                            {
                                "op": "=",
                                "args": [{"property": "opr:date"}, date_str]
                            },
                            {
                                "op": "=",
                                "args": [{"property": "opr:segment"}, segment_num]
                            }
                        ]
                    }
                    segment_conditions.append(segment_condition)
                except ValueError:
                    warnings.warn(f"Invalid segment_path format '{segment_path}'. Expected format: YYYYMMDD_NN", UserWarning)
                    continue

            if segment_conditions:
                if len(segment_conditions) == 1:
                    filter_conditions.append(segment_conditions[0])
                else:
                    # Multiple segments - combine with OR
                    filter_conditions.append({
                        "op": "or",
                        "args": segment_conditions
                    })

        # Add any additional property filters
        for key, value in properties.items():
            filter_conditions.append({
                "op": "=",
                "args": [{"property": key}, value]
            })

        # Combine all filter conditions with AND
        if filter_conditions:
            if len(filter_conditions) == 1:
                filter_expr = filter_conditions[0]
            else:
                filter_expr = {
                    "op": "and",
                    "args": filter_conditions
                }

            search_params['filter'] = filter_expr

        # Add any extra kwargs to the search
        search_params.update(search_kwargs)

        # Perform the search
        items = self.duckdb_client.search(self.stac_parquet_href, **search_params)
        if isinstance(items, dict):
            items = items['features']

        if not items:
            warnings.warn("No items found matching the query criteria", UserWarning)
            return None

        # Convert to GeoDataFrame
        items_df = gpd.GeoDataFrame(items)
        # Index by STAC item ID
        items_df = items_df.set_index(items_df['id'])
        items_df.index.name = 'stac_item_id'
        # Set the geometry column
        if 'geometry' in items_df.columns and not exclude_geometry:
            items_df = items_df.set_geometry(items_df['geometry'].apply(shapely.geometry.shape))
            items_df.crs = "EPSG:4326"

        # Reorder the columns, leaving any extra columns at the end
        desired_order = ['collection', 'geometry', 'properties', 'assets']
        items_df = items_df[[col for col in desired_order if col in items_df.columns] + list(items_df.columns.difference(desired_order))]

        return items_df

    def load_frames(self, stac_items: gpd.GeoDataFrame,
                    data_product: str = "CSARP_standard",
                    merge_flights: bool = False,
                    skip_errors: bool = False,
                    ) -> Union[list[xr.Dataset], xr.Dataset]:
        """
        Load multiple radar frames from STAC items.

        Parameters
        ----------
        stac_items : gpd.GeoDataFrame
            STAC items returned from query_frames.
        data_product : str, optional
            Data product to load (e.g., "CSARP_standard", "CSARP_qlook").
        merge_flights : bool, optional
            If True, merge frames from the same segment into single Datasets.
        skip_errors : bool, optional
            If True, skip failed frames and continue loading others.

        Returns
        -------
        list[xr.Dataset] or xr.Dataset
            List of radar frame Datasets, or list of merged Datasets if
            merge_flights=True.
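
        Examples
        --------
        A sketch of typical usage (the collection name is illustrative):

        >>> items = opr.query_frames(collections=['2022_Antarctica_BaslerMKB'])
        >>> frames = opr.load_frames(items, skip_errors=True)
        >>> # Or merge the frames of each segment into one Dataset per flight
        >>> flights = opr.load_frames(items, merge_flights=True)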
        """
        frames = []

        for idx, item in stac_items.iterrows():
            try:
                frame = self.load_frame(item, data_product)
                frames.append(frame)
            except Exception as e:
                warnings.warn(f"Error loading frame for item {item.get('id', 'unknown')}: {e}", UserWarning)
                if skip_errors:
                    continue
                else:
                    raise

        if merge_flights:
            return opr_tools.merge_frames(frames)
        else:
            return frames

    def load_frame(self, stac_item, data_product: str = "CSARP_standard") -> xr.Dataset:
        """
        Load a single radar frame from a STAC item.

        Parameters
        ----------
        stac_item : dict or pd.Series
            STAC item containing asset URLs.
        data_product : str, optional
            Data product to load (e.g., "CSARP_standard", "CSARP_qlook").

        Returns
        -------
        xr.Dataset
            Radar frame with coordinates slow_time and twtt, and data variables
            including Data, Latitude, Longitude, Elevation, Surface, etc.
        """

        assets = stac_item['assets']

        # Get the data asset
        data_asset = assets.get(data_product)
        if not data_asset:
            available_assets = list(assets.keys())
            raise ValueError(f"No {data_product} asset found. Available assets: {available_assets}")

        # Get the URL from the asset
        url = data_asset.get('href')
        if not url:
            raise ValueError(f"No href found in {data_product} asset")

        # Load the frame from the resolved URL
        return self.load_frame_url(url)

    def load_frame_url(self, url: str) -> xr.Dataset:
        """
        Load a radar frame directly from a URL.

        Automatically detects and handles both HDF5 and legacy MATLAB formats.

        Parameters
        ----------
        url : str
            URL to radar frame data file (.mat).

        Returns
        -------
        xr.Dataset
            Radar frame with CF-compliant metadata and citation information.
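
        Examples
        --------
        A sketch with an illustrative URL following the archive's layout
        (collection/product/segment/frame):

        >>> url = ("https://data.cresis.ku.edu/data/rds/2022_Antarctica_BaslerMKB/"
        ...        "CSARP_standard/20230109_01/Data_20230109_01_001.mat")
        >>> ds = opr.load_frame_url(url)
        >>> ds.attrs['segment_path']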
        """

        file = self._open_file(url)

        filetype = None
        try:
            ds = self._load_frame_hdf5(file)
            filetype = 'hdf5'
        except OSError:
            ds = self._load_frame_matlab(file)
            filetype = 'matlab'

        # Add the source URL as an attribute
        ds.attrs['source_url'] = url

        # Apply CF-compliant attributes
        ds = apply_cf_compliant_attrs(ds)

        # Get the season and segment from the URL
        match = re.search(r'(\d{4}_\w+_[A-Za-z0-9]+)\/([\w_]+)\/[\d_]+\/[\w]+(\d{8}_\d{2}_\d{3})', url)
        if match:
            collection, data_product, granule = match.groups()
            date, segment_id, frame_id = granule.split('_')
            ds.attrs['collection'] = collection
            ds.attrs['data_product'] = data_product
            ds.attrs['granule'] = granule
            ds.attrs['segment_path'] = f"{date}_{segment_id}"
            ds.attrs['date_str'] = date
            ds.attrs['segment'] = int(segment_id)
            ds.attrs['frame'] = int(frame_id)

            # Load citation information
            result = ops_api.get_segment_metadata(segment_name=ds.attrs['segment_path'], season_name=collection)
            if result:
                if isinstance(result['data'], str):
                    warnings.warn(f"Unexpected result from ops_api: {result['data']}", UserWarning)
                else:
                    result_data = {}
                    for key, value in result['data'].items():
                        if len(value) == 1:
                            result_data[key] = value[0]
                        elif len(value) > 1:
                            result_data[key] = set(value)

                    if 'dois' in result_data:
                        ds.attrs['doi'] = result_data['dois']
                    if 'rors' in result_data:
                        ds.attrs['ror'] = result_data['rors']
                    if 'funding_sources' in result_data:
                        ds.attrs['funder_text'] = result_data['funding_sources']

        # Add the rest of the MATLAB parameters as attributes
        if filetype == 'hdf5':
            ds.attrs['mimetype'] = 'application/x-hdf5'
            with h5py.File(file, 'r') as h5_file:
                ds.attrs.update(decode_hdf5_matlab_variable(h5_file,
                                                            skip_variables=True,
                                                            skip_errors=True))
        elif filetype == 'matlab':
            ds.attrs['mimetype'] = 'application/x-matlab-data'
            ds.attrs.update(extract_legacy_mat_attributes(file,
                                                          skip_keys=ds.keys(),
                                                          skip_errors=True))

        return ds

    def _load_frame_hdf5(self, file) -> xr.Dataset:
        """
        Load a radar frame from an HDF5 file.

        Parameters
        ----------
        file : str
            Path to the HDF5 file containing radar frame data.

        Returns
        -------
        xr.Dataset
            The loaded radar frame as an xarray Dataset.
        """

        ds = xr.open_dataset(file, engine='h5netcdf', phony_dims='sort')

        # Re-arrange variables to provide useful dimensions and coordinates

        ds = ds.squeeze()  # Drop the singleton dimensions MATLAB adds

        ds = ds.rename({  # Label the dimensions with more useful names
            ds.Data.dims[0]: 'slow_time_idx',
            ds.Data.dims[1]: 'twtt_idx',
        })

        # Make variables with no dimensions into scalar attributes
        for var in list(ds.data_vars):
            if ds[var].ndim == 0:
                ds.attrs[var] = ds[var].item()
                ds = ds.drop_vars(var)

        # Make the file_type an attribute
        if 'file_type' in ds.data_vars:
            ds.attrs['file_type'] = ds['file_type'].to_numpy()
            ds = ds.drop_vars('file_type')

        # Name the two time coordinates
        ds = ds.rename({'Time': 'twtt', 'GPS_time': 'slow_time'})
        ds = ds.set_coords(['slow_time', 'twtt'])

        slow_time_1d = pd.to_datetime(ds['slow_time'].values, unit='s')
        ds = ds.assign_coords(slow_time=('slow_time_idx', slow_time_1d))

        # Make twtt and slow_time the indexing coordinates
        ds = ds.swap_dims({'twtt_idx': 'twtt'})
        ds = ds.swap_dims({'slow_time_idx': 'slow_time'})

        return ds

    def _load_frame_matlab(self, file) -> xr.Dataset:
        """
        Load a radar frame from a legacy (pre-v7.3) MATLAB file.

        Parameters
        ----------
        file : str
            Path to the MATLAB file containing radar frame data.

        Returns
        -------
        xr.Dataset
            The loaded radar frame as an xarray Dataset.
        """

        m = scipy.io.loadmat(file, mat_dtype=False)

        key_dims = {
            'Time': ('twtt',),
            'GPS_time': ('slow_time',),
            'Latitude': ('slow_time',),
            'Longitude': ('slow_time',),
            'Elevation': ('slow_time',),
            'Roll': ('slow_time',),
            'Pitch': ('slow_time',),
            'Heading': ('slow_time',),
            'Surface': ('slow_time',),
            'Data': ('twtt', 'slow_time')
        }

        ds = xr.Dataset(
            {
                key: (dims, np.squeeze(m[key])) for key, dims in key_dims.items() if key in m
            },
            coords={
                'twtt': ('twtt', np.squeeze(m['Time'])),
                'slow_time': ('slow_time', pd.to_datetime(np.squeeze(m['GPS_time']), unit='s')),
            }
        )

        return ds

    def get_collections(self) -> list:
        """
        Get all available collections (seasons/campaigns) in the STAC catalog.

        Returns
        -------
        list[dict]
            List of collection dictionaries with id, description, and extent.
        """

        return self.duckdb_client.get_collections(self.stac_parquet_href)

    def get_segments(self, collection_id: str) -> list:
        """
        Get all segments (flights) within a collection.

        Parameters
        ----------
        collection_id : str
            STAC collection ID (e.g., "2022_Antarctica_BaslerMKB").

        Returns
        -------
        list[dict]
            List of segment dictionaries with segment_path, date, flight_number,
            and frame count.
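
        Examples
        --------
        A sketch of typical usage (the collection name is illustrative):

        >>> segments = opr.get_segments('2022_Antarctica_BaslerMKB')
        >>> [s['segment_path'] for s in segments[:3]]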
        """
        # Query STAC API for all items in collection (exclude geometry for better performance)
        items = self.query_frames(collections=[collection_id], exclude_geometry=True)

        if items is None or len(items) == 0:
            print(f"No items found in collection '{collection_id}'")
            return []

        # Group items by segment (opr:date + opr:segment)
        segments = {}
        for idx, item in items.iterrows():
            properties = item['properties']
            date = properties['opr:date']
            flight_num = properties['opr:segment']

            if date and flight_num is not None:
                segment_path = f"{date}_{flight_num:02d}"

                if segment_path not in segments:
                    segments[segment_path] = {
                        'segment_path': segment_path,
                        'date': date,
                        'flight_number': flight_num,
                        'collection': collection_id,
                        'frames': [],
                        'item_count': 0
                    }

                segments[segment_path]['frames'].append(properties.get('opr:frame'))
                segments[segment_path]['item_count'] += 1

        # Sort segments by date and flight number
        segment_list = list(segments.values())
        segment_list.sort(key=lambda x: (x['date'], x['flight_number']))

        return segment_list

    def _get_layers_files(self, segment: Union[xr.Dataset, dict], raise_errors=True) -> dict:
        """
        Fetch layers from the CSARP_layer files.

        See https://gitlab.com/openpolarradar/opr/-/wikis/Layer-File-Guide for file formats.

        Parameters
        ----------
        segment : xr.Dataset or dict
            Radar frame Dataset or STAC item.
        raise_errors : bool, optional
            If True, raise errors when layers cannot be found.

        Returns
        -------
        dict
            Dictionary mapping layer names (e.g., "standard:surface",
            "standard:bottom") to layer Datasets.
        """
        collection, segment_path, frame = self._extract_segment_info(segment)

        # If we already have a STAC item, just use it. Otherwise, query to find the matching STAC items
        if isinstance(segment, xr.Dataset):
            properties = {}
            if frame:
                properties['opr:frame'] = frame

            # Query the STAC collection for items from this specific segment
            stac_items = self.query_frames(collections=[collection], segment_paths=[segment_path], properties=properties)

            # Filter for items that have CSARP_layer assets
            layer_items = []
            for idx, item in stac_items.iterrows():
                if 'CSARP_layer' in item['assets']:
                    layer_items.append(item)
        else:
            layer_items = [segment] if 'CSARP_layer' in segment.get('assets', {}) else []

        if not layer_items:
            if raise_errors:
                raise ValueError(f"No CSARP_layer files found for segment path {segment_path} in collection {collection}")
            else:
                return {}

        # Load each layer file and combine them
        layer_frames = []
        for item in layer_items:
            layer_asset = item['assets']['CSARP_layer']
            if layer_asset and 'href' in layer_asset:
                url = layer_asset['href']
                try:
                    layer_ds = self.load_layers_file(url)
                    layer_frames.append(layer_ds)
                except Exception as e:
                    if raise_errors:
                        raise
                    warnings.warn(f"Failed to load layer file {url}: {e}", UserWarning)
                    continue

        if not layer_frames:
            if raise_errors:
                raise ValueError(f"No valid CSARP_layer files could be loaded for segment {segment_path} in collection {collection}")
            else:
                return {}

        # Concatenate all layer frames along slow_time dimension
        layers_segment = xr.concat(layer_frames, dim='slow_time', combine_attrs='drop_conflicts', data_vars='minimal')
        layers_segment = layers_segment.sortby('slow_time')

        # Trim to bounds of the original dataset
        layers_segment = self._trim_to_bounds(layers_segment, segment)

        # Find the layer organization file to map layer IDs to names.
        # Layer organization files are one per directory. For example, the layer file:
        # https://data.cresis.ku.edu/data/rds/2017_Antarctica_BaslerJKB/CSARP_layer/20180105_03/Data_20180105_03_003.mat
        # would have a layer organization file at:
        # https://data.cresis.ku.edu/data/rds/2017_Antarctica_BaslerJKB/CSARP_layer/20180105_03/layer_20180105_03.mat
        layer_organization_file_url = re.sub(r'Data_(\d{8}_\d{2})_\d{3}.mat', r'layer_\1.mat', url)
        layer_organization_ds = self._load_layer_organization(self._open_file(layer_organization_file_url))

        # Split into separate layers by ID
        layers = {}

        layer_ids = np.unique(layers_segment['layer'])

        for layer_id in layer_ids:
            layer_id_int = int(layer_id)
            layer_name = str(layer_organization_ds['lyr_name'].sel(lyr_id=layer_id_int).values.item().squeeze())
            layer_group = str(layer_organization_ds['lyr_group_name'].sel(lyr_id=layer_id_int).values.item().squeeze())
            if layer_group == '[]':
                layer_group = ''
            layer_display_name = f"{layer_group}:{layer_name}"

            layer_ds = layers_segment.sel(layer=layer_id).drop_vars('layer')
            # Only add non-empty layers with at least some non-NaN data
            if layer_ds.sizes.get('slow_time', 0) > 0:
                if not layer_ds.to_array().isnull().all():
                    layers[layer_display_name] = layer_ds

        return layers

 721    def _trim_to_bounds(self, ds: xr.Dataset, ref: Union[xr.Dataset, dict]) -> xr.Dataset:
 722        start_time, end_time = None, None
 723        if isinstance(ref, xr.Dataset) and 'slow_time' in ref.coords:
 724            start_time = ref['slow_time'].min()
 725            end_time = ref['slow_time'].max()
 726        else:
 727            properties = ref.get('properties', {})
 728            if 'start_datetime' in properties and 'end_datetime' in properties:
 729                start_time = pd.to_datetime(properties['start_datetime'])
 730                end_time = pd.to_datetime(properties['end_datetime'])
 731
 732        if start_time is not None:
 733            return ds.sel(slow_time=slice(start_time, end_time))
 734        else:
 735            return ds
 736
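`_trim_to_bounds` relies on label-based slicing over a datetime index, which pandas and xarray share; the same idea on a plain pandas Series (illustrative data, not OPR output):

```python
import numpy as np
import pandas as pd

# Ten samples, one minute apart, indexed by datetime.
times = pd.date_range("2023-01-09 00:00", periods=10, freq="min")
s = pd.Series(np.arange(10), index=times)

# Label-based slicing on a datetime index is inclusive on both ends.
trimmed = s.loc["2023-01-09 00:02":"2023-01-09 00:05"]
print(len(trimmed))  # → 4
```

The same `slice(start_time, end_time)` semantics apply to `ds.sel(slow_time=...)` on an xarray Dataset.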
 737    def load_layers_file(self, url: str) -> xr.Dataset:
 738        """
 739        Load layer data from CSARP_layer file.
 740
 741        Parameters
 742        ----------
 743        url : str
 744            URL or path to layer file.
 745
 746        Returns
 747        -------
 748        xr.Dataset
 749            Layer data in a standardized format with coordinates:
 750            - slow_time: GPS time converted to datetime
 751            - layer: Layer ID for each layer
 752            And data variables:
 753            - twtt: Two-way travel time for each layer
 754            - quality: Quality values for each layer
 755            - type: Type values for each layer
 756            - lat, lon, elev: Geographic coordinates
 757        """
 760
 761        file = self._open_file(url)
 762
 763        ds = self._load_layers(file)
 764
 765        # Add the source URL as an attribute
 766        ds.attrs['source_url'] = url
 767
 768        # Apply common manipulations to match the expected structure
 769        # Convert GPS time to datetime coordinate
 770        if 'gps_time' in ds.variables:
 771            slow_time_dt = pd.to_datetime(ds['gps_time'].values, unit='s')
 772            ds = ds.assign_coords(slow_time=('slow_time', slow_time_dt))
 773
 774            # Set slow_time as the main coordinate and remove gps_time from data_vars
 775            if 'slow_time' not in ds.dims:
 776                ds = ds.swap_dims({'gps_time': 'slow_time'})
 777
 778            # Remove gps_time from data_vars if it exists there to avoid conflicts
 779            if ('gps_time' in ds.data_vars) or ('gps_time' in ds.coords):
 780                ds = ds.drop_vars('gps_time')
 781
 782        # Sort by slow_time if it exists
 783        if 'slow_time' in ds.coords:
 784            ds = ds.sortby('slow_time')
 785
 786        return ds
 787
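The GPS-time handling above hinges on `pd.to_datetime(..., unit='s')`, which interprets the raw values as seconds since the Unix epoch; a small sketch with made-up timestamps:

```python
import numpy as np
import pandas as pd

# Hypothetical gps_time values in seconds (fractional seconds are preserved).
gps_time = np.array([1673222400.0, 1673222400.5, 1673222401.0])
slow_time = pd.to_datetime(gps_time, unit='s')
print(slow_time[0])  # → 2023-01-09 00:00:00
```

Sorting by the resulting datetime coordinate (as `load_layers_file` does with `sortby`) then gives a monotonic time axis suitable for slicing.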
 788    def _load_layers(self, file) -> xr.Dataset:
 789        """
 790        Load a layer data file into an xr.Dataset using hdf5storage.
 791
 792        Parameters
 793        ----------
 794        file : str
 795            Path to the layer file.
 796        """
 797
 798        d = hdf5storage.loadmat(file, appendmat=False)
 799
 800        # Ensure 'id' is at least 1D for the layer dimension
 801        id_data = np.atleast_1d(d['id'].squeeze())
 802
 803        # For 2D arrays with (layer, slow_time) dimensions, ensure they remain 2D
 804        # even when there's only one layer
 805        quality_data = np.atleast_2d(d['quality'].squeeze())
 806        if quality_data.shape[0] == 1 or quality_data.ndim == 1:
 807            quality_data = quality_data.reshape(len(id_data), -1)
 808
 809        twtt_data = np.atleast_2d(d['twtt'].squeeze())
 810        if twtt_data.shape[0] == 1 or twtt_data.ndim == 1:
 811            twtt_data = twtt_data.reshape(len(id_data), -1)
 812
 813        type_data = np.atleast_2d(d['type'].squeeze())
 814        if type_data.shape[0] == 1 or type_data.ndim == 1:
 815            type_data = type_data.reshape(len(id_data), -1)
 816
 817        ds = xr.Dataset({
 818            'file_type': ((), d['file_type'].squeeze()),
 819            'file_version': ((), d['file_version'].squeeze()),
 820            'elev': (('slow_time',), d['elev'].squeeze()),
 821            #'gps_source': ((), d['gps_source'].squeeze()),
 822            'gps_time': (('slow_time',), d['gps_time'].squeeze()),
 823            'id': (('layer',), id_data),
 824            'lat': (('slow_time',), d['lat'].squeeze()),
 825            'lon': (('slow_time',), d['lon'].squeeze()),
 826            'quality': (('layer', 'slow_time'), quality_data),
 827            'twtt': (('layer', 'slow_time'), twtt_data),
 828            'type': (('layer', 'slow_time'), type_data),
 829        })
 830
 831        ds = ds.assign_coords({'layer': ds['id'], 'slow_time': ds['gps_time']})
 832
 833        return ds
 834
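The reshaping above guards against MATLAB's habit of collapsing a single-layer `(1, n)` array to 1-D after `squeeze()`; a sketch of the same normalization with numpy:

```python
import numpy as np

id_data = np.atleast_1d(np.array(1))   # a single layer id, kept 1-D
twtt = np.atleast_2d(np.arange(5.0))   # squeezed (5,) promoted back to (1, 5)
twtt = twtt.reshape(len(id_data), -1)  # enforce (layer, slow_time) layout
print(twtt.shape)  # → (1, 5)
```

With more than one layer the arrays are already 2-D and the reshape is a no-op in effect, so the same code path handles both cases.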
 835    def _load_layer_organization(self, file) -> xr.Dataset:
 836        """
 837        Load a layer organization file
 838
 839        Parameters
 840        ----------
 841        file : str
 842            Path to the HDF5 layer organization file
 843
 844        Returns
 845        -------
 846        xr.Dataset
 847            Raw layer data from HDF5 file
 848        """
 849        d = hdf5storage.loadmat(file, appendmat=False)
 850        ds = xr.Dataset({
 851            'file_type': ((), d['file_type'].squeeze()),
 852            'file_version': ((), d['file_version'].squeeze()),
 853            'lyr_age': (('lyr_id',), np.atleast_1d(d['lyr_age'].squeeze())),
 854            'lyr_age_source': (('lyr_id',), np.atleast_1d(d['lyr_age_source'].squeeze())),
 855            'lyr_desc': (('lyr_id',), np.atleast_1d(d['lyr_desc'].squeeze())),
 856            'lyr_group_name': (('lyr_id',), np.atleast_1d(d['lyr_group_name'].squeeze())),
 857            'lyr_id': (('lyr_id',), np.atleast_1d(d['lyr_id'].squeeze())),
 858            'lyr_name': (('lyr_id',), np.atleast_1d(d['lyr_name'].squeeze())),
 859            'lyr_order': (('lyr_id',), np.atleast_1d(d['lyr_order'].squeeze())),
 860            }, attrs={'param': d['param']})
 861
 862        ds = ds.set_index(lyr_id='lyr_id')
 863
 864        return ds
 865
 866    def _get_layers_db(self, flight: Union[xr.Dataset, dict], include_geometry=True, raise_errors=True) -> dict:
 867        """
 868        Load layer picks from OPS database API.
 869
 870        Parameters
 871        ----------
 872        flight : xr.Dataset or dict
 873            Radar frame Dataset or STAC item.
 874        include_geometry : bool, optional
 875            If True, include geometry information in layers.
 876        raise_errors : bool, optional
 877            If True, raise errors when layers cannot be found.
 878
 879        Returns
 880        -------
 881        dict
 882            Dictionary mapping layer names to layer Datasets.
 883        """
 884
 885        collection, segment_path, _ = self._extract_segment_info(flight)
 886
 887        if 'Antarctica' in collection:
 888            location = 'antarctic'
 889        elif 'Greenland' in collection:
 890            location = 'arctic'
 891        else:
 892            raise ValueError("Dataset does not belong to a recognized location (Antarctica or Greenland).")
 893
 894        layer_points = ops_api.get_layer_points(
 895            segment_name=segment_path,
 896            season_name=collection,
 897            location=location,
 898            include_geometry=include_geometry
 899        )
 900
 901        if layer_points['status'] != 1:
 902            if raise_errors:
 903                raise ValueError(f"Failed to fetch layer points. Received response with status {layer_points['status']}.")
 904            else:
 905                return {}
 906
 907        layer_ds_raw = xr.Dataset(
 908            {k: (['gps_time'], v) for k, v in layer_points['data'].items() if k != 'gps_time'},
 909            coords={'gps_time': layer_points['data']['gps_time']}
 910        )
 911        # Split into a dictionary of layers based on lyr_id
 912        layer_ids = set(layer_ds_raw['lyr_id'].to_numpy())
 913        layer_ids = [int(layer_id) for layer_id in layer_ids if not np.isnan(layer_id)]
 914
 915        # Get the mapping of layer IDs to names
 916        if self.db_layers_metadata is None:
 917            layer_metadata = ops_api.get_layer_metadata()
 918            if layer_metadata['status'] == 1:
 919                df = pd.DataFrame(layer_metadata['data'])
 920                df['display_name'] = df['lyr_group_name'] + ":" + df['lyr_name']
 921                self.db_layers_metadata = df
 922            else:
 923                raise ValueError("Failed to fetch layer metadata from OPS API.")
 924
 925        layers = {}
 926        for layer_id in layer_ids:
 927            layer_name = self.db_layers_metadata.loc[self.db_layers_metadata['lyr_id'] == layer_id, 'display_name'].values[0]
 928
 929            layer_ds = layer_ds_raw.where(layer_ds_raw['lyr_id'] == layer_id, drop=True)
 930
 931            layer_ds = layer_ds.sortby('gps_time')
 932
 933            layer_ds = layer_ds.rename({'gps_time': 'slow_time'})
 934            layer_ds = layer_ds.set_coords(['slow_time'])
 935
 936            layer_ds['slow_time'] = pd.to_datetime(layer_ds['slow_time'].values, unit='s')
 937
 938            # Filter to the same time range as flight
 939            layer_ds = self._trim_to_bounds(layer_ds, flight)
 940
 941            # Only add non-empty layers with at least some non-NaN data
 942            if layer_ds.sizes.get('slow_time', 0) > 0:
 943                if not layer_ds.to_array().isnull().all():
 944                    layers[layer_name] = layer_ds
 945
 946        return layers
 947
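The `display_name` lookup above joins group and layer names with a colon and selects rows by `lyr_id`; a self-contained sketch with hypothetical metadata rows:

```python
import pandas as pd

# Hypothetical layer metadata, mimicking the OPS API response shape.
df = pd.DataFrame({
    'lyr_id': [1, 2],
    'lyr_group_name': ['standard', 'standard'],
    'lyr_name': ['surface', 'bottom'],
})
# Build "group:name" display names, as _get_layers_db does.
df['display_name'] = df['lyr_group_name'] + ":" + df['lyr_name']

name = df.loc[df['lyr_id'] == 2, 'display_name'].values[0]
print(name)  # → standard:bottom
```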
 948    def get_layers(self, ds: Union[xr.Dataset, dict], source: str = 'auto', include_geometry=True, errors='warn') -> dict:
 949        """
 950        Load layer picks from files or database.
 951
 952        Tries files first, falls back to database if source='auto'.
 953
 954        Parameters
 955        ----------
 956        ds : Union[xr.Dataset, dict]
 957            Radar frame Dataset or STAC item.
 958        source : {'auto', 'files', 'db'}, optional
 959            Source for layers. 'auto' tries files then falls back to database.
 960        include_geometry : bool, optional
 961            If True, include geometry information when fetching from the API.
 962        errors : str, optional
 963            How to handle missing layer data: 'warn' (default) returns None with a warning,
 964            'error' raises an exception.
 965
 966        Returns
 967        -------
 968        dict or None
 969            A dictionary mapping layer display names to their Datasets, or None
 970            if no layers found and errors='warn'.
 971        """
 972        collection, segment_path, _ = self._extract_segment_info(ds)
 973
 974        if source == 'auto':
 975            # Try to get layers from files first
 976            try:
 977                layers = self._get_layers_files(ds, raise_errors=True)
 978                return layers
 979            except Exception:
 980                # Fallback to API if no layers found in files
 981                try:
 982                    return self._get_layers_db(ds, include_geometry=include_geometry, raise_errors=True)
 983                except Exception as e:
 984                    if errors == 'error':
 985                        raise
 986                    else:
 987                        warnings.warn(f"No layer data found for {collection}/{segment_path}: {e}")
 988                        return None
 989        elif source == 'files':
 990            try:
 991                return self._get_layers_files(ds, raise_errors=True)
 992            except Exception as e:
 993                if errors == 'error':
 994                    raise
 995                else:
 996                    warnings.warn(f"No layer files found for {collection}/{segment_path}: {e}")
 997                    return None
 998        elif source == 'db':
 999            try:
1000                return self._get_layers_db(ds, include_geometry=include_geometry, raise_errors=True)
1001            except Exception as e:
1002                if errors == 'error':
1003                    raise
1004                else:
1005                    warnings.warn(f"No layer data in DB for {collection}/{segment_path}: {e}")
1006                    return None
1007        else:
1008            raise ValueError("Invalid source specified. Must be one of: 'auto', 'files', 'db'.")
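The control flow of `get_layers` in `'auto'` mode reduces to a try-primary-then-fallback pattern; a generic stdlib-only sketch (the loader names here are hypothetical, not part of xopr):

```python
import warnings

def load_with_fallback(primary, fallback, errors='warn'):
    # Try the primary loader; on any failure, try the fallback. On a second
    # failure, either re-raise or warn and return None, as get_layers does.
    try:
        return primary()
    except Exception:
        try:
            return fallback()
        except Exception as e:
            if errors == 'error':
                raise
            warnings.warn(f"No data found in any source: {e}")
            return None

def from_files():
    raise FileNotFoundError("no layer files")

def from_db():
    return {'standard:surface': 'picks'}

result = load_with_fallback(from_files, from_db)
print(result)  # → {'standard:surface': 'picks'}
```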
  77class OPRConnection:
  78    def __init__(self,
  79                 collection_url: str = "https://data.cresis.ku.edu/data/",
  80                 cache_dir: str = None,
  81                 stac_parquet_href: str = None,
  82                 sync_catalogs: bool = True):
  83        """
  84        Initialize connection to OPR data archive.
  85
  86        Parameters
  87        ----------
  88        collection_url : str, optional
  89            Base URL for OPR data collection.
  90        cache_dir : str, optional
  91            Directory to cache downloaded data for faster repeated access.
  92        stac_parquet_href : str, optional
  93            Path or URL pattern to STAC catalog parquet files.  When *None*
  94            (default), the local cache is used if available, falling back to
  95            the S3 catalog.
  96        sync_catalogs : bool, optional
  97            If True (default), sync OPR STAC catalogs to a local cache
  98            before resolving the catalog path.  Uses ETag-based change
  99            detection so repeated calls are cheap (single HTTP request).
 100        """
 101        self.collection_url = collection_url
 102        self.cache_dir = cache_dir
 103        self._user_set_href = stac_parquet_href is not None
 104        if sync_catalogs and not self._user_set_href:
 105            sync_opr_catalogs()
 106        self.stac_parquet_href = stac_parquet_href or get_opr_catalog_path()
 107
 108        self.fsspec_cache_kwargs = {}
 109        self.fsspec_url_prefix = ''
 110        if cache_dir:
 111            self.fsspec_cache_kwargs['cache_storage'] = cache_dir
 112            self.fsspec_cache_kwargs['check_files'] = True
 113            self.fsspec_url_prefix = 'filecache::'
 114
 115        self.db_layers_metadata = None # Cache for OPS database layers metadata
 116
 117        # Lazy-initialized DuckDB client for STAC queries — reusing a single
 118        # session caches remote file metadata (parquet footers) across calls
 119        self._duckdb_client = None
 120
 121    @property
 122    def duckdb_client(self):
 123        """Lazy-initialized DuckDB client, recreated after pickling."""
 124        if self._duckdb_client is None:
 125            self._duckdb_client = DuckdbClient()
 126        return self._duckdb_client
 127
 128    def __getstate__(self):
 129        state = self.__dict__.copy()
 130        state['_duckdb_client'] = None  # DuckdbClient can't be pickled
 131        return state
 132
 133    def _open_file(self, url):
 134        """Helper method to open files with appropriate caching."""
 135        if self.fsspec_url_prefix:
 136            return fsspec.open_local(f"{self.fsspec_url_prefix}{url}", filecache=self.fsspec_cache_kwargs)
 137        else:
 138            return fsspec.open_local(f"simplecache::{url}", simplecache=self.fsspec_cache_kwargs)
 139
 140    def _extract_segment_info(self, segment):
 141        """Extract collection, segment_path, and frame from Dataset or dict."""
 142        if isinstance(segment, xr.Dataset):
 143            return (segment.attrs.get('collection'),
 144                    segment.attrs.get('segment_path'),
 145                    segment.attrs.get('frame'))
 146        else:
 147            return (segment['collection'],
 148                    f"{segment['properties'].get('opr:date')}_{segment['properties'].get('opr:segment'):02d}",
 149                    segment['properties'].get('opr:frame'))
 150
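`_extract_segment_info` reconstructs the `YYYYMMDD_NN` segment path from STAC properties with zero-padded formatting; the round trip looks like:

```python
date_str, segment = "20230109", 1

# Build the canonical "YYYYMMDD_NN" segment path with a zero-padded number.
segment_path = f"{date_str}_{segment:02d}"
print(segment_path)  # → 20230109_01

# Split it back apart, as query_frames does when building CQL2 filters.
date_back, num_back = segment_path.split('_')
assert (date_back, int(num_back)) == (date_str, segment)
```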
 151    def query_frames(self, collections: list[str] = None, segment_paths: list[str] = None,
 152                     geometry = None, date_range: tuple = None, properties: dict = {},
 153                     max_items: int = None, exclude_geometry: bool = False,
 154                     search_kwargs: dict = {}) -> gpd.GeoDataFrame:
 155        """
 156        Query STAC catalog for radar frames matching search criteria.
 157
 158        Multiple parameters are combined with AND logic. Lists of values passed to a
 159        single parameter are treated with OR logic (any value matches).
 160
 161        Parameters
 162        ----------
 163        collections : list[str] or str, optional
 164            Collection name(s) (e.g., "2022_Antarctica_BaslerMKB").
 165        segment_paths : list[str] or str, optional
 166            Segment path(s) in format "YYYYMMDD_NN" (e.g., "20230126_01").
 167        geometry : shapely geometry or dict, optional
 168            Geospatial geometry to filter by intersection.
 169        date_range : str or tuple, optional
 170            Date range in ISO format (e.g., "2021-01-01/2025-06-01").
 171        properties : dict, optional
 172            Additional STAC properties to filter by.
 173        max_items : int, optional
 174            Maximum number of items to return.
 175        exclude_geometry : bool, optional
 176            If True, exclude geometry field to reduce response size.
 177        search_kwargs : dict, optional
 178            Additional keyword arguments for STAC search.
 179
 180        Returns
 181        -------
 182        gpd.GeoDataFrame or None
 183            GeoDataFrame of matching STAC items with columns for collection,
 184            geometry, properties, and assets. Returns None if no matches found.
 185        """
 186
 187        search_params = {}
 188
 189        # Exclude geometry -- do not return the geometry field to reduce response size
 190        if exclude_geometry:
 191            search_params['exclude'] = ['geometry']
 192
 193        # Handle collections (seasons)
 194        if collections is not None:
 195            search_params['collections'] = [collections] if isinstance(collections, str) else collections
 196
 197        # Handle geometry filtering
 198        if geometry is not None:
 199            if hasattr(geometry, '__geo_interface__'):
 200                geometry = geometry.__geo_interface__
 201
 202            # Fix geometries that cross the antimeridian
 203            geometry = antimeridian.fix_geojson(geometry, reverse=True)
 204
 205            search_params['intersects'] = geometry
 206
 207        # Handle date range filtering
 208        if date_range is not None:
 209            search_params['datetime'] = date_range
 210
 211        # Handle max_items
 212        if max_items is not None:
 213            search_params['limit'] = max_items
 214
 215        # Handle segment_paths filtering using CQL2
 216        filter_conditions = []
 217
 218        if segment_paths is not None:
 219            segment_paths = [segment_paths] if isinstance(segment_paths, str) else segment_paths
 220
 221            # Create OR conditions for segment paths
 222            segment_conditions = []
 223            for segment_path in segment_paths:
 224                try:
 225                    date_str, segment_num_str = segment_path.split('_')
 226                    segment_num = int(segment_num_str)
 227
 228                    # Create AND condition for this specific segment
 229                    segment_condition = {
 230                        "op": "and",
 231                        "args": [
 232                            {
 233                                "op": "=",
 234                                "args": [{"property": "opr:date"}, date_str]
 235                            },
 236                            {
 237                                "op": "=",
 238                                "args": [{"property": "opr:segment"}, segment_num]
 239                            }
 240                        ]
 241                    }
 242                    segment_conditions.append(segment_condition)
 243                except ValueError:
 244                    warnings.warn(f"Invalid segment_path format '{segment_path}'. Expected format: YYYYMMDD_NN")
 245                    continue
 246
 247            if segment_conditions:
 248                if len(segment_conditions) == 1:
 249                    filter_conditions.append(segment_conditions[0])
 250                else:
 251                    # Multiple segments - combine with OR
 252                    filter_conditions.append({
 253                        "op": "or",
 254                        "args": segment_conditions
 255                    })
 256
 257        # Add any additional property filters
 258        for key, value in properties.items():
 259            filter_conditions.append({
 260                "op": "=",
 261                "args": [{"property": key}, value]
 262            })
 263
 264        # Combine all filter conditions with AND
 265        if filter_conditions:
 266            if len(filter_conditions) == 1:
 267                filter_expr = filter_conditions[0]
 268            else:
 269                filter_expr = {
 270                    "op": "and",
 271                    "args": filter_conditions
 272                }
 273
 274            search_params['filter'] = filter_expr
 275
 276        # Add any extra kwargs to search
 277        search_params.update(search_kwargs)
 278
 279        # Perform the search
 283        items = self.duckdb_client.search(self.stac_parquet_href, **search_params)
 284        if isinstance(items, dict):
 285            items = items['features']
 286
 287        if not items or len(items) == 0:
 288            warnings.warn("No items found matching the query criteria", UserWarning)
 289            return None
 290
 291        # Convert to GeoDataFrame
 292        items_df = gpd.GeoDataFrame(items)
 293        # Set index
 294        items_df = items_df.set_index(items_df['id'])
 295        items_df.index.name = 'stac_item_id'
 296        # Set the geometry column
 297        if 'geometry' in items_df.columns and not exclude_geometry:
 298            items_df = items_df.set_geometry(items_df['geometry'].apply(shapely.geometry.shape))
 299            items_df.crs = "EPSG:4326"
 300
 301        # Reorder the columns, leaving any extra columns at the end
 302        desired_order = ['collection', 'geometry', 'properties', 'assets']
 303        items_df = items_df[[col for col in desired_order if col in items_df.columns] + list(items_df.columns.difference(desired_order))]
 304
 305        return items_df
 306
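The CQL2 filter built in `query_frames` is plain JSON-style dicts: a per-segment `and` of two equality tests, an `or` across segments, and a final `and` with any extra property filters. A minimal sketch of the construction for two segments:

```python
def segment_condition(segment_path: str) -> dict:
    # One segment: opr:date equals the date AND opr:segment equals the number.
    date_str, seg = segment_path.split('_')
    return {
        "op": "and",
        "args": [
            {"op": "=", "args": [{"property": "opr:date"}, date_str]},
            {"op": "=", "args": [{"property": "opr:segment"}, int(seg)]},
        ],
    }

conditions = [segment_condition(p) for p in ("20230109_01", "20230126_02")]
# Multiple segments combine with OR; a single segment is used as-is.
filter_expr = conditions[0] if len(conditions) == 1 else {"op": "or", "args": conditions}
print(filter_expr["op"])  # → or
```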
 307    def load_frames(self, stac_items: gpd.GeoDataFrame,
 308                    data_product: str = "CSARP_standard",
 309                    merge_flights: bool = False,
 310                    skip_errors: bool = False,
 311                    ) -> Union[list[xr.Dataset], xr.Dataset]:
 312        """
 313        Load multiple radar frames from STAC items.
 314
 315        Parameters
 316        ----------
 317        stac_items : gpd.GeoDataFrame
 318            STAC items returned from query_frames.
 319        data_product : str, optional
 320            Data product to load (e.g., "CSARP_standard", "CSARP_qlook").
 321        merge_flights : bool, optional
 322            If True, merge frames from the same segment into single Datasets.
 323        skip_errors : bool, optional
 324            If True, skip failed frames and continue loading others.
 325
 326        Returns
 327        -------
 328        list[xr.Dataset] or xr.Dataset
 329            List of radar frame Datasets, or list of merged Datasets if
 330            merge_flights=True.
 331        """
 332        frames = []
 333
 334        for idx, item in stac_items.iterrows():
 335            try:
 336                frame = self.load_frame(item, data_product)
 337                frames.append(frame)
 338            except Exception as e:
 339                if not skip_errors:
 340                    raise
 341                warnings.warn(f"Error loading frame for item {item.get('id', 'unknown')}: {e}")
 342                continue
 344
 345        if merge_flights:
 346            return opr_tools.merge_frames(frames)
 347        else:
 348            return frames
 349
 350    def load_frame(self, stac_item, data_product: str = "CSARP_standard") -> xr.Dataset:
 351        """
 352        Load a single radar frame from a STAC item.
 353
 354        Parameters
 355        ----------
 356        stac_item : dict or pd.Series
 357            STAC item containing asset URLs.
 358        data_product : str, optional
 359            Data product to load (e.g., "CSARP_standard", "CSARP_qlook").
 360
 361        Returns
 362        -------
 363        xr.Dataset
 364            Radar frame with coordinates slow_time and twtt, and data variables
 365            including Data, Latitude, Longitude, Elevation, Surface, etc.
 366        """
 367
 368        assets = stac_item['assets']
 369
 370        # Get the data asset
 371        data_asset = assets.get(data_product)
 372        if not data_asset:
 373            available_assets = list(assets.keys())
 374            raise ValueError(f"No {data_product} asset found. Available assets: {available_assets}")
 375
 376        # Get the URL from the asset
 377        url = data_asset.get('href')
 378        if not url:
 379            raise ValueError(f"No href found in {data_product} asset")
 380
 381        # Load the frame using the existing method
 382        return self.load_frame_url(url)
 383
 384    def load_frame_url(self, url: str) -> xr.Dataset:
 385        """
 386        Load a radar frame directly from a URL.
 387
 388        Automatically detects and handles both HDF5 and legacy MATLAB formats.
 389
 390        Parameters
 391        ----------
 392        url : str
 393            URL to radar frame data file (.mat).
 394
 395        Returns
 396        -------
 397        xr.Dataset
 398            Radar frame with CF-compliant metadata and citation information.
 399        """
 400
 401        file = self._open_file(url)
 402
 403        filetype = None
 404        try:
 405            ds = self._load_frame_hdf5(file)
 406            filetype = 'hdf5'
 407        except OSError:
 408            ds = self._load_frame_matlab(file)
 409            filetype = 'matlab'
 410
 411        # Add the source URL as an attribute
 412        ds.attrs['source_url'] = url
 413
 414        # Apply CF-compliant attributes
 415        ds = apply_cf_compliant_attrs(ds)
 416
 417        # Get the season and segment from the URL
 418        match = re.search(r'(\d{4}_\w+_[A-Za-z0-9]+)\/([\w_]+)\/[\d_]+\/[\w]+(\d{8}_\d{2}_\d{3})', url)
 419        if match:
 420            collection, data_product, granule = match.groups()
 421            date, segment_id, frame_id = granule.split('_')
 422            ds.attrs['collection'] = collection
 423            ds.attrs['data_product'] = data_product
 424            ds.attrs['granule'] = granule
 425            ds.attrs['segment_path'] = f"{date}_{segment_id}"
 426            ds.attrs['date_str'] = date
 427            ds.attrs['segment'] = int(segment_id)
 428            ds.attrs['frame'] = int(frame_id)
 429
 430            # Load citation information
 431            result = ops_api.get_segment_metadata(segment_name=ds.attrs['segment_path'], season_name=collection)
 432            if result:
 433                if isinstance(result['data'], str):
 434                    warnings.warn(f"Warning: Unexpected result from ops_api: {result['data']}", UserWarning)
 435                else:
 436                    result_data = {}
 437                    for key, value in result['data'].items():
 438                        if len(value) == 1:
 439                            result_data[key] = value[0]
 440                        elif len(value) > 1:
 441                            result_data[key] = set(value)
 442
 443                    if 'dois' in result_data:
 444                        ds.attrs['doi'] = result_data['dois']
 445                    if 'rors' in result_data:
 446                        ds.attrs['ror'] = result_data['rors']
 447                    if 'funding_sources' in result_data:
 448                        ds.attrs['funder_text'] = result_data['funding_sources']
 449
 450        # Add the rest of the Matlab parameters
 451        if filetype == 'hdf5':
 452            ds.attrs['mimetype'] = 'application/x-hdf5'
 453            ds.attrs.update(decode_hdf5_matlab_variable(h5py.File(file, 'r'),
 454                                                        skip_variables=True,
 455                                                        skip_errors=True))
 456        elif filetype == 'matlab':
 457            ds.attrs['mimetype'] = 'application/x-matlab-data'
 458            ds.attrs.update(extract_legacy_mat_attributes(file,
 459                                                          skip_keys=ds.keys(),
 460                                                          skip_errors=True))
 461
 462        return ds
 463
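The regex in `load_frame_url` pulls collection, data product, and granule out of the archive URL; an isolated sketch of that parse on an example URL:

```python
import re

url = ("https://data.cresis.ku.edu/data/rds/2022_Antarctica_BaslerMKB/"
       "CSARP_standard/20230109_01/Data_20230109_01_001.mat")

# Capture: (collection, data_product, granule) from the path components.
match = re.search(
    r'(\d{4}_\w+_[A-Za-z0-9]+)/([\w_]+)/[\d_]+/\w+(\d{8}_\d{2}_\d{3})', url)
collection, data_product, granule = match.groups()
date, segment_id, frame_id = granule.split('_')
print(collection, data_product, granule)
# → 2022_Antarctica_BaslerMKB CSARP_standard 20230109_01_001
```

Splitting the granule on underscores then yields the date string, zero-padded segment number, and frame number used for the Dataset attributes.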
 464    def _load_frame_hdf5(self, file) -> xr.Dataset:
 465        """
 466        Load a radar frame from an HDF5 file.
 467
 468        Parameters
 469        ----------
 470        file :
 471            The path to the HDF5 file containing radar frame data.
 472
 473        Returns
 474        -------
 475        xr.Dataset
 476            The loaded radar frame as an xarray Dataset.
 477        """
 478
 479        ds = xr.open_dataset(file, engine='h5netcdf', phony_dims='sort')
 480
 481        # Re-arrange variables to provide useful dimensions and coordinates
 482
 483        ds = ds.squeeze() # Drop the singleton dimensions matlab adds
 484
 485        ds = ds.rename({ # Label the dimensions with more useful names
 486            ds.Data.dims[0]: 'slow_time_idx',
 487            ds.Data.dims[1]: 'twtt_idx',
 488        })
 489
 490        # Make variables with no dimensions into scalar attributes
 491        for var in ds.data_vars:
 492            if ds[var].ndim == 0:
 493                ds.attrs[var] = ds[var].item()
 494                ds = ds.drop_vars(var)
 495
 496        # Make the file_type an attribute
 497        if 'file_type' in ds.data_vars:
 498            ds.attrs['file_type'] = ds['file_type'].to_numpy()
 499            ds = ds.drop_vars('file_type')
 500
 501        # Name the two time coordinates
 502        ds = ds.rename({'Time': 'twtt', 'GPS_time': 'slow_time'})
 503        ds = ds.set_coords(['slow_time', 'twtt'])
 504
 505        slow_time_1d = pd.to_datetime(ds['slow_time'].values, unit='s')
 506        ds = ds.assign_coords(slow_time=('slow_time_idx', slow_time_1d))
 507
 508        # Make twtt and slow_time the indexing coordinates
 509        ds = ds.swap_dims({'twtt_idx': 'twtt'})
 510        ds = ds.swap_dims({'slow_time_idx': 'slow_time'})
 511
 512        return ds
 513
 514    def _load_frame_matlab(self, file) -> xr.Dataset:
 515        """
 516        Load a radar frame from a MATLAB file.
 517
 518        Parameters
 519        ----------
 520        file :
 521            The path to the MATLAB file containing radar frame data.
 522
 523        Returns
 524        -------
 525        xr.Dataset
 526            The loaded radar frame as an xarray Dataset.
 527        """
 528
 529        m = scipy.io.loadmat(file, mat_dtype=False)
 530
 531        key_dims = {
 532            'Time': ('twtt',),
 533            'GPS_time': ('slow_time',),
 534            'Latitude': ('slow_time',),
 535            'Longitude': ('slow_time',),
 536            'Elevation': ('slow_time',),
 537            'Roll': ('slow_time',),
 538            'Pitch': ('slow_time',),
 539            'Heading': ('slow_time',),
 540            'Surface': ('slow_time',),
 541            'Data': ('twtt', 'slow_time')
 542        }
 543
 544        ds = xr.Dataset(
 545            {
 546                key: (dims, np.squeeze(m[key])) for key, dims in key_dims.items() if key in m
 547            },
 548            coords={
 549                'twtt': ('twtt', np.squeeze(m['Time'])),
 550                'slow_time': ('slow_time', pd.to_datetime(np.squeeze(m['GPS_time']), unit='s')),
 551            }
 552        )
 553
 554        return ds
 555
 556    def get_collections(self) -> list:
 557        """
 558        Get all available collections (seasons/campaigns) in the STAC catalog.
 559
 560        Returns
 561        -------
 562        list[dict]
 563            List of collection dictionaries with id, description, and extent.
 564        """
 565
 566        return self.duckdb_client.get_collections(self.stac_parquet_href)
 567
 568    def get_segments(self, collection_id: str) -> list:
 569        """
 570        Get all segments (flights) within a collection.
 571
 572        Parameters
 573        ----------
 574        collection_id : str
 575            STAC collection ID (e.g., "2022_Antarctica_BaslerMKB").
 576
 577        Returns
 578        -------
 579        list[dict]
 580            List of segment dictionaries with segment_path, date, flight_number,
 581            and frame count.
 582        """
 583        # Query STAC API for all items in collection (exclude geometry for better performance)
 584        items = self.query_frames(collections=[collection_id], exclude_geometry=True)
 585
 586        if items is None or len(items) == 0:
 587            print(f"No items found in collection '{collection_id}'")
 588            return []
 589
 590        # Group items by segment (opr:date + opr:segment)
 591        segments = {}
 592        for idx, item in items.iterrows():
 593            properties = item['properties']
 594            date = properties['opr:date']
 595            flight_num = properties['opr:segment']
 596
 597            if date and flight_num is not None:
 598                segment_path = f"{date}_{flight_num:02d}"
 599
 600                if segment_path not in segments:
 601                    segments[segment_path] = {
 602                        'segment_path': segment_path,
 603                        'date': date,
 604                        'flight_number': flight_num,
 605                        'collection': collection_id,
 606                        'frames': [],
 607                        'item_count': 0
 608                    }
 609
 610                segments[segment_path]['frames'].append(properties.get('opr:frame'))
 611                segments[segment_path]['item_count'] += 1
 612
 613        # Sort segments by date and flight number
 614        segment_list = list(segments.values())
 615        segment_list.sort(key=lambda x: (x['date'], x['flight_number']))
 616
 617        return segment_list
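The segment grouping above can be reduced to a small standalone sketch. The item properties below are hypothetical stand-ins for the STAC `opr:` fields:

```python
# Hypothetical item properties, mimicking the 'opr:' fields grouped above.
items = [
    {"opr:date": "20230109", "opr:segment": 1, "opr:frame": 1},
    {"opr:date": "20230109", "opr:segment": 1, "opr:frame": 2},
    {"opr:date": "20230110", "opr:segment": 2, "opr:frame": 1},
]

segments = {}
for props in items:
    # Segment paths follow the "YYYYMMDD_NN" convention.
    key = f"{props['opr:date']}_{props['opr:segment']:02d}"
    seg = segments.setdefault(key, {"segment_path": key, "frames": [], "item_count": 0})
    seg["frames"].append(props["opr:frame"])
    seg["item_count"] += 1

print(sorted(segments))  # ['20230109_01', '20230110_02']
```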
 618
 619    def _get_layers_files(self, segment: Union[xr.Dataset, dict], raise_errors=True) -> dict:
 620        """
 621        Fetch layers from the CSARP_layer files.
 622
 623        See https://gitlab.com/openpolarradar/opr/-/wikis/Layer-File-Guide for file formats
 624
 625        Parameters
 626        ----------
 627        segment : xr.Dataset or dict
 628            Radar frame Dataset or STAC item.
 629        raise_errors : bool, optional
 630            If True, raise errors when layers cannot be found.
 631
 632        Returns
 633        -------
 634        dict
 635            Dictionary mapping layer names (e.g., "standard:surface",
 636            "standard:bottom") to layer Datasets.
 637        """
 638        collection, segment_path, frame = self._extract_segment_info(segment)
 639
 640        # If we already have a STAC item, just use it. Otherwise, query to find the matching STAC items
 641        if isinstance(segment, xr.Dataset):
 642            properties = {}
 643            if frame:
 644                properties['opr:frame'] = frame
 645
 646            # Query STAC collection for CSARP_layer files matching this specific segment
 647
 648            # Get items from this specific segment
 649            stac_items = self.query_frames(collections=[collection], segment_paths=[segment_path], properties=properties)
 650
 651            # Filter for items that have CSARP_layer assets
 652            layer_items = []
 653            for idx, item in stac_items.iterrows():
 654                if 'CSARP_layer' in item['assets']:
 655                    layer_items.append(item)
 656        else:
 657            layer_items = [segment] if 'CSARP_layer' in segment.get('assets', {}) else []
 658
 659        if not layer_items:
 660            if raise_errors:
 661                raise ValueError(f"No CSARP_layer files found for segment path {segment_path} in collection {collection}")
 662            else:
 663                return {}
 664
 665        # Load each layer file and combine them
 666        layer_frames = []
 667        for item in layer_items:
 668            layer_asset = item['assets']['CSARP_layer']
 669            if layer_asset and 'href' in layer_asset:
 670                url = layer_asset['href']
 671                try:
 672                    layer_ds = self.load_layers_file(url)
 673                    layer_frames.append(layer_ds)
 674                except Exception as e:
 675                    if raise_errors:
 676                        raise
 677                    warnings.warn(f"Failed to load layer file {url}: {e}")
 678
 679        if not layer_frames:
 680            if raise_errors:
 681                raise ValueError(f"No valid CSARP_layer files could be loaded for segment {segment_path} in collection {collection}")
 682            else:
 683                return {}
 684
 685        # Concatenate all layer frames along slow_time dimension
 686        layers_segment = xr.concat(layer_frames, dim='slow_time', combine_attrs='drop_conflicts', data_vars='minimal')
 687        layers_segment = layers_segment.sortby('slow_time')
 688
 689        # Trim to bounds of the original dataset
 690        layers_segment = self._trim_to_bounds(layers_segment, segment)
 691
 692        # Find the layer organization file to map layer IDs to names
 693        # Layer organization files are one per directory. For example, the layer file:
 694        # https://data.cresis.ku.edu/data/rds/2017_Antarctica_BaslerJKB/CSARP_layer/20180105_03/Data_20180105_03_003.mat
 695        # would have a layer organization file at:
 696        # https://data.cresis.ku.edu/data/rds/2017_Antarctica_BaslerJKB/CSARP_layer/20180105_03/layer_20180105_03.mat
 697
 698        layer_organization_file_url = re.sub(r'Data_(\d{8}_\d{2})_\d{3}\.mat', r'layer_\1.mat', url)
 699        layer_organization_ds = self._load_layer_organization(self._open_file(layer_organization_file_url))
 700
 701        # Split into separate layers by ID
 702        layers = {}
 703
 704        layer_ids = np.unique(layers_segment['layer'])
 705
 706        for i, layer_id in enumerate(layer_ids):
 707            layer_id_int = int(layer_id)
 708            layer_name = str(layer_organization_ds['lyr_name'].sel(lyr_id=layer_id_int).values.item().squeeze())
 709            layer_group = str(layer_organization_ds['lyr_group_name'].sel(lyr_id=layer_id_int).values.item().squeeze())
 710            if layer_group == '[]':
 711                layer_group = ''
 712            layer_display_name = f"{layer_group}:{layer_name}"
 713
 714            layer_ds = layers_segment.sel(layer=layer_id).drop_vars('layer')
 715            # Only add non-empty layers with at least some non-NaN data
 716            if layer_ds.sizes.get('slow_time', 0) > 0:
 717                if not layer_ds.to_array().isnull().all():
 718                    layers[layer_display_name] = layer_ds
 719
 720        return layers
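The organization-file URL substitution can be checked in isolation, using the example URLs from the comment above:

```python
import re

# Derive the per-directory layer organization file from a data file URL,
# mirroring the substitution used in _get_layers_files.
data_url = ("https://data.cresis.ku.edu/data/rds/2017_Antarctica_BaslerJKB/"
            "CSARP_layer/20180105_03/Data_20180105_03_003.mat")
org_url = re.sub(r'Data_(\d{8}_\d{2})_\d{3}\.mat', r'layer_\1.mat', data_url)
print(org_url.rsplit('/', 1)[1])  # layer_20180105_03.mat
```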
 721
 722    def _trim_to_bounds(self, ds: xr.Dataset, ref: Union[xr.Dataset, dict]) -> xr.Dataset:
 723        start_time, end_time = None, None
 724        if isinstance(ref, xr.Dataset) and 'slow_time' in ref.coords:
 725            start_time = ref['slow_time'].min()
 726            end_time = ref['slow_time'].max()
 727        else:
 728            properties = ref.get('properties', {})
 729            if 'start_datetime' in properties and 'end_datetime' in properties:
 730                start_time = pd.to_datetime(properties['start_datetime'])
 731                end_time = pd.to_datetime(properties['end_datetime'])
 732
 733        if start_time:
 734            return ds.sel(slow_time=slice(start_time, end_time))
 735        else:
 736            return ds
 737
 738    def load_layers_file(self, url: str) -> xr.Dataset:
 739        """
 740        Load layer data from CSARP_layer file.
 741
 742        Parameters
 743        ----------
 744        url : str
 745            URL or path to layer file.
 746
 747        Returns
 748        -------
 749        xr.Dataset
 750            Layer data in a standardized format with coordinates:
 751            - slow_time: GPS time converted to datetime
 752            - layer: Layer IDs
 753            And data variables:
 754            - twtt: Two-way travel time for each layer
 755            - quality: Quality values for each layer
 756            - type: Type values for each layer
 757            - lat, lon, elev: Geographic coordinates
 758            - id: Layer IDs
 759        """
 761
 762        file = self._open_file(url)
 763
 764        ds = self._load_layers(file)
 765
 766        # Add the source URL as an attribute
 767        ds.attrs['source_url'] = url
 768
 769        # Apply common manipulations to match the expected structure
 770        # Convert GPS time to datetime coordinate
 771        if 'gps_time' in ds.variables:
 772            slow_time_dt = pd.to_datetime(ds['gps_time'].values, unit='s')
 773            ds = ds.assign_coords(slow_time=('slow_time', slow_time_dt))
 774
 775            # Set slow_time as the main coordinate and remove gps_time from data_vars
 776            if 'slow_time' not in ds.dims:
 777                ds = ds.swap_dims({'gps_time': 'slow_time'})
 778
 779            # Remove gps_time from data_vars if it exists there to avoid conflicts
 780            if ('gps_time' in ds.data_vars) or ('gps_time' in ds.coords):
 781                ds = ds.drop_vars('gps_time')
 782
 783        # Sort by slow_time if it exists
 784        if 'slow_time' in ds.coords:
 785            ds = ds.sortby('slow_time')
 786
 787        return ds
 788
 789    def _load_layers(self, file) -> xr.Dataset:
 790        """
 791        Load a layer data file using hdf5storage.
 792
 793        Parameters
 794        ----------
 795        file : str
 796            Path to the layer file.
 797
 798        Returns
 799        -------
 800        xr.Dataset
 801            Raw layer data with dimensions layer and slow_time.
 802        """
 798
 799        d = hdf5storage.loadmat(file, appendmat=False)
 800
 801        # Ensure 'id' is at least 1D for the layer dimension
 802        id_data = np.atleast_1d(d['id'].squeeze())
 803
 804        # For 2D arrays with (layer, slow_time) dimensions, ensure they remain 2D
 805        # even when there's only one layer
 806        quality_data = np.atleast_2d(d['quality'].squeeze())
 807        if quality_data.shape[0] == 1 or quality_data.ndim == 1:
 808            quality_data = quality_data.reshape(len(id_data), -1)
 809
 810        twtt_data = np.atleast_2d(d['twtt'].squeeze())
 811        if twtt_data.shape[0] == 1 or twtt_data.ndim == 1:
 812            twtt_data = twtt_data.reshape(len(id_data), -1)
 813
 814        type_data = np.atleast_2d(d['type'].squeeze())
 815        if type_data.shape[0] == 1 or type_data.ndim == 1:
 816            type_data = type_data.reshape(len(id_data), -1)
 817
 818        ds = xr.Dataset({
 819            'file_type': ((), d['file_type'].squeeze()),
 820            'file_version': ((), d['file_version'].squeeze()),
 821            'elev': (('slow_time',), d['elev'].squeeze()),
 822            #'gps_source': ((), d['gps_source'].squeeze()),
 823            'gps_time': (('slow_time',), d['gps_time'].squeeze()),
 824            'id': (('layer',), id_data),
 825            'lat': (('slow_time',), d['lat'].squeeze()),
 826            'lon': (('slow_time',), d['lon'].squeeze()),
 827            'quality': (('layer', 'slow_time'), quality_data),
 828            'twtt': (('layer', 'slow_time'), twtt_data),
 829            'type': (('layer', 'slow_time'), type_data),
 830        })
 831
 832        ds = ds.assign_coords({'layer': ds['id'], 'slow_time': ds['gps_time']})
 833
 834        return ds
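The `atleast_2d`/reshape guard above exists because `squeeze()` collapses a single-layer `(1, n)` array to 1-D, which would otherwise break the `('layer', 'slow_time')` dimension mapping. A minimal sketch:

```python
import numpy as np

# A single-layer twtt array collapses to 1-D after squeeze().
twtt_raw = np.zeros((1, 5)).squeeze()   # shape (5,)
id_data = np.atleast_1d(np.array([1]))  # one layer ID

# Restore the (layer, slow_time) shape, as _load_layers does.
twtt = np.atleast_2d(twtt_raw)
if twtt.shape[0] == 1 or twtt.ndim == 1:
    twtt = twtt.reshape(len(id_data), -1)

print(twtt.shape)  # (1, 5)
```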
 835
 836    def _load_layer_organization(self, file) -> xr.Dataset:
 837        """
 838        Load a layer organization file
 839
 840        Parameters
 841        ----------
 842        file : str
 843            Path to the HDF5 layer organization file
 844
 845        Returns
 846        -------
 847        xr.Dataset
 848            Raw layer data from HDF5 file
 849        """
 850        d = hdf5storage.loadmat(file, appendmat=False)
 851        ds = xr.Dataset({
 852            'file_type': ((), d['file_type'].squeeze()),
 853            'file_version': ((), d['file_version'].squeeze()),
 854            'lyr_age': (('lyr_id',), np.atleast_1d(d['lyr_age'].squeeze())),
 855            'lyr_age_source': (('lyr_id',), np.atleast_1d(d['lyr_age_source'].squeeze())),
 856            'lyr_desc': (('lyr_id',), np.atleast_1d(d['lyr_desc'].squeeze())),
 857            'lyr_group_name': (('lyr_id',), np.atleast_1d(d['lyr_group_name'].squeeze())),
 858            'lyr_id': (('lyr_id',), np.atleast_1d(d['lyr_id'].squeeze())),
 859            'lyr_name': (('lyr_id',), np.atleast_1d(d['lyr_name'].squeeze())),
 860            'lyr_order': (('lyr_id',), np.atleast_1d(d['lyr_order'].squeeze())),
 861            }, attrs={'param': d['param']})
 862
 863        ds = ds.set_index(lyr_id='lyr_id')
 864
 865        return ds
 866
 867    def _get_layers_db(self, flight: Union[xr.Dataset, dict], include_geometry=True, raise_errors=True) -> dict:
 868        """
 869        Load layer picks from OPS database API.
 870
 871        Parameters
 872        ----------
 873        flight : xr.Dataset or dict
 874            Radar frame Dataset or STAC item.
 875        include_geometry : bool, optional
 876            If True, include geometry information in layers.
 877        raise_errors : bool, optional
 878            If True, raise errors when layers cannot be found.
 879
 880        Returns
 881        -------
 882        dict
 883            Dictionary mapping layer names to layer Datasets.
 884        """
 885
 886        collection, segment_path, _ = self._extract_segment_info(flight)
 887
 888        if 'Antarctica' in collection:
 889            location = 'antarctic'
 890        elif 'Greenland' in collection:
 891            location = 'arctic'
 892        else:
 893            raise ValueError("Dataset does not belong to a recognized location (Antarctica or Greenland).")
 894
 895        layer_points = ops_api.get_layer_points(
 896            segment_name=segment_path,
 897            season_name=collection,
 898            location=location,
 899            include_geometry=include_geometry
 900        )
 901
 902        if layer_points['status'] != 1:
 903            if raise_errors:
 904                raise ValueError(f"Failed to fetch layer points. Received response with status {layer_points['status']}.")
 905            else:
 906                return {}
 907
 908        layer_ds_raw = xr.Dataset(
 909            {k: (['gps_time'], v) for k, v in layer_points['data'].items() if k != 'gps_time'},
 910            coords={'gps_time': layer_points['data']['gps_time']}
 911        )
 912        # Split into a dictionary of layers based on lyr_id
 913        layer_ids = set(layer_ds_raw['lyr_id'].to_numpy())
 914        layer_ids = [int(layer_id) for layer_id in layer_ids if not np.isnan(layer_id)]
 915
 916        # Get the mapping of layer IDs to names
 917        if self.db_layers_metadata is None:
 918            layer_metadata = ops_api.get_layer_metadata()
 919            if layer_metadata['status'] == 1:
 920                df = pd.DataFrame(layer_metadata['data'])
 921                df['display_name'] = df['lyr_group_name'] + ":" + df['lyr_name']
 922                self.db_layers_metadata = df
 923            else:
 924                raise ValueError("Failed to fetch layer metadata from OPS API.")
 925
 926        layers = {}
 927        for layer_id in layer_ids:
 928            layer_name = self.db_layers_metadata.loc[self.db_layers_metadata['lyr_id'] == layer_id, 'display_name'].values[0]
 929
 930            layer_ds = layer_ds_raw.where(layer_ds_raw['lyr_id'] == layer_id, drop=True)
 931
 932            layer_ds = layer_ds.sortby('gps_time')
 933
 934            layer_ds = layer_ds.rename({'gps_time': 'slow_time'})
 935            layer_ds = layer_ds.set_coords(['slow_time'])
 936
 937            layer_ds['slow_time'] = pd.to_datetime(layer_ds['slow_time'].values, unit='s')
 938
 939            # Filter to the same time range as flight
 940            layer_ds = self._trim_to_bounds(layer_ds, flight)
 941
 942            # Only add non-empty layers with at least some non-NaN data
 943            if layer_ds.sizes.get('slow_time', 0) > 0:
 944                if not layer_ds.to_array().isnull().all():
 945                    layers[layer_name] = layer_ds
 946
 947        return layers
 948
 949    def get_layers(self, ds: Union[xr.Dataset, dict], source: str = 'auto', include_geometry=True, errors='warn') -> Union[dict, None]:
 950        """
 951        Load layer picks from files or database.
 952
 953        Tries files first, falls back to database if source='auto'.
 954
 955        Parameters
 956        ----------
 957        ds : Union[xr.Dataset, dict]
 958            Radar frame Dataset or STAC item.
 959        source : {'auto', 'files', 'db'}, optional
 960            Source for layers. 'auto' tries files then falls back to database.
 961        include_geometry : bool, optional
 962            If True, include geometry information when fetching from the API.
 963        errors : str, optional
 964            How to handle missing layer data: 'warn' (default) returns None with a warning,
 965            'error' raises an exception.
 966
 967        Returns
 968        -------
 969        dict or None
 970            A dictionary mapping layer IDs to their corresponding data, or None if no layers
 971            found and errors='warn'.
 972        """
 973        collection, segment_path, _ = self._extract_segment_info(ds)
 974
 975        if source == 'auto':
 976            # Try to get layers from files first
 977            try:
 978                layers = self._get_layers_files(ds, raise_errors=True)
 979                return layers
 980            except Exception:
 981                # Fallback to API if no layers found in files
 982                try:
 983                    return self._get_layers_db(ds, include_geometry=include_geometry, raise_errors=True)
 984                except Exception as e:
 985                    if errors == 'error':
 986                        raise
 987                    else:
 988                        warnings.warn(f"No layer data found for {collection}/{segment_path}: {e}")
 989                        return None
 990        elif source == 'files':
 991            try:
 992                return self._get_layers_files(ds, raise_errors=True)
 993            except Exception as e:
 994                if errors == 'error':
 995                    raise
 996                else:
 997                    warnings.warn(f"No layer files found for {collection}/{segment_path}: {e}")
 998                    return None
 999        elif source == 'db':
1000            try:
1001                return self._get_layers_db(ds, include_geometry=include_geometry, raise_errors=True)
1002            except Exception as e:
1003                if errors == 'error':
1004                    raise
1005                else:
1006                    warnings.warn(f"No layer data in DB for {collection}/{segment_path}: {e}")
1007                    return None
1008        else:
1009            raise ValueError("Invalid source specified. Must be one of: 'auto', 'files', 'db'.")
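The `'auto'` fallback can be illustrated with hypothetical stand-in loaders (not the real `_get_layers_files`/`_get_layers_db` methods):

```python
# Hypothetical stand-ins for the file and database layer loaders.
def load_from_files():
    raise ValueError("no CSARP_layer files found")

def load_from_db():
    return {"standard:surface": "<layer dataset>"}

def get_layers_auto():
    # Try files first; fall back to the OPS database on any failure.
    try:
        return load_from_files()
    except Exception:
        return load_from_db()

print(list(get_layers_auto()))  # ['standard:surface']
```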
OPRConnection( collection_url: str = 'https://data.cresis.ku.edu/data/', cache_dir: str = None, stac_parquet_href: str = None, sync_catalogs: bool = True)
 78    def __init__(self,
 79                 collection_url: str = "https://data.cresis.ku.edu/data/",
 80                 cache_dir: str = None,
 81                 stac_parquet_href: str = None,
 82                 sync_catalogs: bool = True):
 83        """
 84        Initialize connection to OPR data archive.
 85
 86        Parameters
 87        ----------
 88        collection_url : str, optional
 89            Base URL for OPR data collection.
 90        cache_dir : str, optional
 91            Directory to cache downloaded data for faster repeated access.
 92        stac_parquet_href : str, optional
 93            Path or URL pattern to STAC catalog parquet files.  When *None*
 94            (default), the local cache is used if available, falling back to
 95            the S3 catalog.
 96        sync_catalogs : bool, optional
 97            If True (default), sync OPR STAC catalogs to a local cache
 98            before resolving the catalog path.  Uses ETag-based change
 99            detection so repeated calls are cheap (single HTTP request).
100        """
101        self.collection_url = collection_url
102        self.cache_dir = cache_dir
103        self._user_set_href = stac_parquet_href is not None
104        if sync_catalogs and not self._user_set_href:
105            sync_opr_catalogs()
106        self.stac_parquet_href = stac_parquet_href or get_opr_catalog_path()
107
108        self.fsspec_cache_kwargs = {}
109        self.fsspec_url_prefix = ''
110        if cache_dir:
111            self.fsspec_cache_kwargs['cache_storage'] = cache_dir
112            self.fsspec_cache_kwargs['check_files'] = True
113            self.fsspec_url_prefix = 'filecache::'
114
115        self.db_layers_metadata = None # Cache for OPS database layers metadata
116
117        # Lazy-initialized DuckDB client for STAC queries — reusing a single
118        # session caches remote file metadata (parquet footers) across calls
119        self._duckdb_client = None

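How `cache_dir` is translated into fsspec's `filecache` protocol can be sketched as follows; the asset URL is hypothetical:

```python
cache_dir = "radar_cache"

# Mirror the cache setup in __init__.
fsspec_cache_kwargs = {}
fsspec_url_prefix = ""
if cache_dir:
    fsspec_cache_kwargs = {"cache_storage": cache_dir, "check_files": True}
    fsspec_url_prefix = "filecache::"

# A remote asset URL (hypothetical path) would then be opened through the cache.
url = fsspec_url_prefix + ("https://data.cresis.ku.edu/data/rds/"
                           "2022_Antarctica_BaslerMKB/CSARP_standard/"
                           "20230109_01/Data_20230109_01_001.mat")
print(url.split("::")[0])  # filecache
```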
121    @property
122    def duckdb_client(self):
123        """Lazy-initialized DuckDB client, recreated after pickling."""
124        if self._duckdb_client is None:
125            self._duckdb_client = DuckdbClient()
126        return self._duckdb_client

def query_frames( self, collections: list[str] = None, segment_paths: list[str] = None, geometry=None, date_range: tuple = None, properties: dict = {}, max_items: int = None, exclude_geometry: bool = False, search_kwargs: dict = {}) -> geopandas.geodataframe.GeoDataFrame:
151    def query_frames(self, collections: list[str] = None, segment_paths: list[str] = None,
152                     geometry = None, date_range: tuple = None, properties: dict = {},
153                     max_items: int = None, exclude_geometry: bool = False,
154                     search_kwargs: dict = {}) -> gpd.GeoDataFrame:
155        """
156        Query STAC catalog for radar frames matching search criteria.
157
158        Multiple parameters are combined with AND logic. Lists of values passed to a
159        single parameter are treated with OR logic (any value matches).
160
161        Parameters
162        ----------
163        collections : list[str] or str, optional
164            Collection name(s) (e.g., "2022_Antarctica_BaslerMKB").
165        segment_paths : list[str] or str, optional
166            Segment path(s) in format "YYYYMMDD_NN" (e.g., "20230126_01").
167        geometry : shapely geometry or dict, optional
168            Geospatial geometry to filter by intersection.
169        date_range : str, optional
170            Date range in ISO format (e.g., "2021-01-01/2025-06-01").
171        properties : dict, optional
172            Additional STAC properties to filter by.
173        max_items : int, optional
174            Maximum number of items to return.
175        exclude_geometry : bool, optional
176            If True, exclude geometry field to reduce response size.
177        search_kwargs : dict, optional
178            Additional keyword arguments for STAC search.
179
180        Returns
181        -------
182        gpd.GeoDataFrame or None
183            GeoDataFrame of matching STAC items with columns for collection,
184            geometry, properties, and assets. Returns None if no matches found.
185        """
186
187        search_params = {}
188
189        # Exclude geometry -- do not return the geometry field to reduce response size
190        if exclude_geometry:
191            search_params['exclude'] = ['geometry']
192
193        # Handle collections (seasons)
194        if collections is not None:
195            search_params['collections'] = [collections] if isinstance(collections, str) else collections
196
197        # Handle geometry filtering
198        if geometry is not None:
199            if hasattr(geometry, '__geo_interface__'):
200                geometry = geometry.__geo_interface__
201
202            # Fix geometries that cross the antimeridian
203            geometry = antimeridian.fix_geojson(geometry, reverse=True)
204
205            search_params['intersects'] = geometry
206
207        # Handle date range filtering
208        if date_range is not None:
209            search_params['datetime'] = date_range
210
211        # Handle max_items
212        if max_items is not None:
213            search_params['limit'] = max_items
214
215        # Handle segment_paths filtering using CQL2
216        filter_conditions = []
217
218        if segment_paths is not None:
219            segment_paths = [segment_paths] if isinstance(segment_paths, str) else segment_paths
220
221            # Create OR conditions for segment paths
222            segment_conditions = []
223            for segment_path in segment_paths:
224                try:
225                    date_str, segment_num_str = segment_path.split('_')
226                    segment_num = int(segment_num_str)
227
228                    # Create AND condition for this specific segment
229                    segment_condition = {
230                        "op": "and",
231                        "args": [
232                            {
233                                "op": "=",
234                                "args": [{"property": "opr:date"}, date_str]
235                            },
236                            {
237                                "op": "=",
238                                "args": [{"property": "opr:segment"}, segment_num]
239                            }
240                        ]
241                    }
242                    segment_conditions.append(segment_condition)
243                except ValueError:
 244                    warnings.warn(f"Invalid segment_path format '{segment_path}'. Expected format: YYYYMMDD_NN")
245                    continue
246
247            if segment_conditions:
248                if len(segment_conditions) == 1:
249                    filter_conditions.append(segment_conditions[0])
250                else:
251                    # Multiple segments - combine with OR
252                    filter_conditions.append({
253                        "op": "or",
254                        "args": segment_conditions
255                    })
256
257        # Add any additional property filters
258        for key, value in properties.items():
259            filter_conditions.append({
260                "op": "=",
261                "args": [{"property": key}, value]
262            })
263
264        # Combine all filter conditions with AND
265        if filter_conditions:
266            if len(filter_conditions) == 1:
267                filter_expr = filter_conditions[0]
268            else:
269                filter_expr = {
270                    "op": "and",
271                    "args": filter_conditions
272                }
273
274            search_params['filter'] = filter_expr
275
276        # Add any extra kwargs to search
277        search_params.update(search_kwargs)
278
 280
 281        # Perform the search
283        items = self.duckdb_client.search(self.stac_parquet_href, **search_params)
284        if isinstance(items, dict):
285            items = items['features']
286
287        if not items or len(items) == 0:
288            warnings.warn("No items found matching the query criteria", UserWarning)
289            return None
290
291        # Convert to GeoDataFrame
292        items_df = gpd.GeoDataFrame(items)
293        # Set index
294        items_df = items_df.set_index(items_df['id'])
295        items_df.index.name = 'stac_item_id'
296        # Set the geometry column
297        if 'geometry' in items_df.columns and not exclude_geometry:
298            items_df = items_df.set_geometry(items_df['geometry'].apply(shapely.geometry.shape))
299            items_df.crs = "EPSG:4326"
300
301        # Reorder the columns, leaving any extra columns at the end
302        desired_order = ['collection', 'geometry', 'properties', 'assets']
303        items_df = items_df[[col for col in desired_order if col in items_df.columns] + list(items_df.columns.difference(desired_order))]
304
305        return items_df

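The CQL2 filter built for a single segment path can be reproduced standalone:

```python
# A segment path splits into a date string and a flight number.
segment_path = "20230109_01"
date_str, segment_num_str = segment_path.split("_")

# The equality pair combined with AND, as query_frames constructs it.
filter_expr = {
    "op": "and",
    "args": [
        {"op": "=", "args": [{"property": "opr:date"}, date_str]},
        {"op": "=", "args": [{"property": "opr:segment"}, int(segment_num_str)]},
    ],
}

print(filter_expr["args"][1]["args"][1])  # 1
```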
def load_frames( self, stac_items: geopandas.geodataframe.GeoDataFrame, data_product: str = 'CSARP_standard', merge_flights: bool = False, skip_errors: bool = False) -> Union[list[xarray.core.dataset.Dataset], xarray.core.dataset.Dataset]:
307    def load_frames(self, stac_items: gpd.GeoDataFrame,
308                    data_product: str = "CSARP_standard",
309                    merge_flights: bool = False,
310                    skip_errors: bool = False,
311                    ) -> Union[list[xr.Dataset], xr.Dataset]:
312        """
313        Load multiple radar frames from STAC items.
314
315        Parameters
316        ----------
317        stac_items : gpd.GeoDataFrame
318            STAC items returned from query_frames.
319        data_product : str, optional
320            Data product to load (e.g., "CSARP_standard", "CSARP_qlook").
321        merge_flights : bool, optional
322            If True, merge frames from the same segment into single Datasets.
323        skip_errors : bool, optional
324            If True, skip failed frames and continue loading others.
325
326        Returns
327        -------
328        list[xr.Dataset] or xr.Dataset
329            List of radar frame Datasets, or list of merged Datasets if
330            merge_flights=True.
331        """
332        frames = []
333
334        for idx, item in stac_items.iterrows():
335            try:
336                frame = self.load_frame(item, data_product)
337                frames.append(frame)
338            except Exception as e:
339                print(f"Error loading frame for item {item.get('id', 'unknown')}: {e}")
340                if skip_errors:
341                    continue
342                else:
343                    raise
344
345        if merge_flights:
346            return opr_tools.merge_frames(frames)
347        else:
348            return frames

Load multiple radar frames from STAC items.

Parameters
  • stac_items (gpd.GeoDataFrame): STAC items returned from query_frames.
  • data_product (str, optional): Data product to load (e.g., "CSARP_standard", "CSARP_qlook").
  • merge_flights (bool, optional): If True, merge frames from the same segment into single Datasets.
  • skip_errors (bool, optional): If True, skip failed frames and continue loading others.
Returns
  • list[xr.Dataset] or xr.Dataset: List of radar frame Datasets, or list of merged Datasets if merge_flights=True.
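The skip_errors behavior can be sketched with a toy loader standing in for load_frame (hypothetical item IDs, no real I/O): with skip_errors=True, failures are reported and skipped; with skip_errors=False, the first failure propagates.

```python
def toy_load(item_id):
    # Stand-in for self.load_frame: fails for one particular item.
    if item_id == "bad":
        raise IOError(f"cannot read {item_id}")
    return {"id": item_id}

def load_all(item_ids, skip_errors=False):
    frames = []
    for item_id in item_ids:
        try:
            frames.append(toy_load(item_id))
        except Exception as e:
            print(f"Error loading frame for item {item_id}: {e}")
            if not skip_errors:
                raise
    return frames

frames = load_all(["a", "bad", "b"], skip_errors=True)
# frames holds the two successful loads; "bad" was reported and skipped.
```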
def load_frame( self, stac_item, data_product: str = 'CSARP_standard') -> xarray.core.dataset.Dataset:
350    def load_frame(self, stac_item, data_product: str = "CSARP_standard") -> xr.Dataset:
351        """
352        Load a single radar frame from a STAC item.
353
354        Parameters
355        ----------
356        stac_item : dict or pd.Series
357            STAC item containing asset URLs.
358        data_product : str, optional
359            Data product to load (e.g., "CSARP_standard", "CSARP_qlook").
360
361        Returns
362        -------
363        xr.Dataset
364            Radar frame with coordinates slow_time and twtt, and data variables
365            including Data, Latitude, Longitude, Elevation, Surface, etc.
366        """
367
368        assets = stac_item['assets']
369
370        # Get the data asset
371        data_asset = assets.get(data_product)
372        if not data_asset:
373            available_assets = list(assets.keys())
374            raise ValueError(f"No {data_product} asset found. Available assets: {available_assets}")
375
376        # Get the URL from the asset
377        url = data_asset.get('href')
378        if not url:
379            raise ValueError(f"No href found in {data_product} asset")
380
381        # Load the frame using the existing method
382        return self.load_frame_url(url)

Load a single radar frame from a STAC item.

Parameters
  • stac_item (dict or pd.Series): STAC item containing asset URLs.
  • data_product (str, optional): Data product to load (e.g., "CSARP_standard", "CSARP_qlook").
Returns
  • xr.Dataset: Radar frame with coordinates slow_time and twtt, and data variables including Data, Latitude, Longitude, Elevation, Surface, etc.
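The asset-resolution step shown in the source above can be sketched with a hypothetical STAC item dict (the real items come from query_frames; the URLs here are placeholders):

```python
stac_item = {
    "assets": {
        "CSARP_standard": {"href": "https://example.org/data/Data_20230109_01_001.mat"},
        "CSARP_qlook": {"href": "https://example.org/qlook/Data_20230109_01_001.mat"},
    }
}

def resolve_asset_url(stac_item, data_product="CSARP_standard"):
    # Pick the requested data product from the item's assets and
    # extract its href, mirroring the checks in load_frame.
    assets = stac_item["assets"]
    data_asset = assets.get(data_product)
    if not data_asset:
        raise ValueError(f"No {data_product} asset found. "
                         f"Available assets: {list(assets)}")
    url = data_asset.get("href")
    if not url:
        raise ValueError(f"No href found in {data_product} asset")
    return url

url = resolve_asset_url(stac_item)
```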
def load_frame_url(self, url: str) -> xarray.core.dataset.Dataset:
384    def load_frame_url(self, url: str) -> xr.Dataset:
385        """
386        Load a radar frame directly from a URL.
387
388        Automatically detects and handles both HDF5 and legacy MATLAB formats.
389
390        Parameters
391        ----------
392        url : str
393            URL to radar frame data file (.mat).
394
395        Returns
396        -------
397        xr.Dataset
398            Radar frame with CF-compliant metadata and citation information.
399        """
400
401        file = self._open_file(url)
402
403        filetype = None
404        try:
405            ds = self._load_frame_hdf5(file)
406            filetype = 'hdf5'
407        except OSError:
408            ds = self._load_frame_matlab(file)
409            filetype = 'matlab'
410
411        # Add the source URL as an attribute
412        ds.attrs['source_url'] = url
413
414        # Apply CF-compliant attributes
415        ds = apply_cf_compliant_attrs(ds)
416
417        # Get the season and segment from the URL
418        match = re.search(r'(\d{4}_\w+_[A-Za-z0-9]+)\/([\w_]+)\/[\d_]+\/[\w]+(\d{8}_\d{2}_\d{3})', url)
419        if match:
420            collection, data_product, granule = match.groups()
421            date, segment_id, frame_id = granule.split('_')
422            ds.attrs['collection'] = collection
423            ds.attrs['data_product'] = data_product
424            ds.attrs['granule'] = granule
425            ds.attrs['segment_path'] = f"{date}_{segment_id}"
426            ds.attrs['date_str'] = date
427            ds.attrs['segment'] = int(segment_id)
428            ds.attrs['frame'] = int(frame_id)
429
430            # Load citation information
431            result = ops_api.get_segment_metadata(segment_name=ds.attrs['segment_path'], season_name=collection)
432            if result:
433                if isinstance(result['data'], str):
434                    warnings.warn(f"Unexpected result from ops_api: {result['data']}", UserWarning)
435                else:
436                    result_data = {}
437                    for key, value in result['data'].items():
438                        if len(value) == 1:
439                            result_data[key] = value[0]
440                        elif len(value) > 1:
441                            result_data[key] = set(value)
442
443                    if 'dois' in result_data:
444                        ds.attrs['doi'] = result_data['dois']
445                    if 'rors' in result_data:
446                        ds.attrs['ror'] = result_data['rors']
447                    if 'funding_sources' in result_data:
448                        ds.attrs['funder_text'] = result_data['funding_sources']
449
450        # Add the rest of the MATLAB parameters
451        if filetype == 'hdf5':
452            ds.attrs['mimetype'] = 'application/x-hdf5'
453            ds.attrs.update(decode_hdf5_matlab_variable(h5py.File(file, 'r'),
454                                                        skip_variables=True,
455                                                        skip_errors=True))
456        elif filetype == 'matlab':
457            ds.attrs['mimetype'] = 'application/x-matlab-data'
458            ds.attrs.update(extract_legacy_mat_attributes(file,
459                                                          skip_keys=ds.keys(),
460                                                          skip_errors=True))
461
462        return ds

Load a radar frame directly from a URL.

Automatically detects and handles both HDF5 and legacy MATLAB formats.

Parameters
  • url (str): URL to radar frame data file (.mat).
Returns
  • xr.Dataset: Radar frame with CF-compliant metadata and citation information.
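The collection, segment, and frame attributes are parsed out of the frame URL with the regular expression shown in the source above; a hypothetical OPR-style URL (placeholder host, standard path layout) illustrates the decomposition:

```python
import re

# Same pattern as in load_frame_url.
pattern = r'(\d{4}_\w+_[A-Za-z0-9]+)\/([\w_]+)\/[\d_]+\/[\w]+(\d{8}_\d{2}_\d{3})'
url = ("https://example.org/rds/2022_Antarctica_BaslerMKB/"
       "CSARP_standard/20230109_01/Data_20230109_01_001.mat")

match = re.search(pattern, url)
collection, data_product, granule = match.groups()
date, segment_id, frame_id = granule.split('_')
attrs = {
    'collection': collection,                 # '2022_Antarctica_BaslerMKB'
    'data_product': data_product,             # 'CSARP_standard'
    'granule': granule,                       # '20230109_01_001'
    'segment_path': f"{date}_{segment_id}",   # '20230109_01'
    'segment': int(segment_id),               # 1
    'frame': int(frame_id),                   # 1
}
```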
def get_collections(self) -> list:
556    def get_collections(self) -> list:
557        """
558        Get all available collections (seasons/campaigns) in STAC catalog.
559
560        Returns
561        -------
562        list[dict]
563            List of collection dictionaries with id, description, and extent.
564        """
565
566        return self.duckdb_client.get_collections(self.stac_parquet_href)

Get all available collections (seasons/campaigns) in STAC catalog.

Returns
  • list[dict]: List of collection dictionaries with id, description, and extent.
def get_segments(self, collection_id: str) -> list:
568    def get_segments(self, collection_id: str) -> list:
569        """
570        Get all segments (flights) within a collection.
571
572        Parameters
573        ----------
574        collection_id : str
575            STAC collection ID (e.g., "2022_Antarctica_BaslerMKB").
576
577        Returns
578        -------
579        list[dict]
580            List of segment dictionaries with segment_path, date, flight_number,
581            and frame count.
582        """
583        # Query STAC API for all items in collection (exclude geometry for better performance)
584        items = self.query_frames(collections=[collection_id], exclude_geometry=True)
585
586        if items is None or len(items) == 0:
587            print(f"No items found in collection '{collection_id}'")
588            return []
589
590        # Group items by segment (opr:date + opr:segment)
591        segments = {}
592        for idx, item in items.iterrows():
593            properties = item['properties']
594            date = properties['opr:date']
595            flight_num = properties['opr:segment']
596
597            if date and flight_num is not None:
598                segment_path = f"{date}_{flight_num:02d}"
599
600                if segment_path not in segments:
601                    segments[segment_path] = {
602                        'segment_path': segment_path,
603                        'date': date,
604                        'flight_number': flight_num,
605                        'collection': collection_id,
606                        'frames': [],
607                        'item_count': 0
608                    }
609
610                segments[segment_path]['frames'].append(properties.get('opr:frame'))
611                segments[segment_path]['item_count'] += 1
612
613        # Sort segments by date and flight number
614        segment_list = list(segments.values())
615        segment_list.sort(key=lambda x: (x['date'], x['flight_number']))
616
617        return segment_list

Get all segments (flights) within a collection.

Parameters
  • collection_id (str): STAC collection ID (e.g., "2022_Antarctica_BaslerMKB").
Returns
  • list[dict]: List of segment dictionaries with segment_path, date, flight_number, and frame count.
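The grouping step can be sketched with toy property dicts standing in for the STAC items: items carrying `opr:date` and `opr:segment` are bucketed into segments keyed by "YYYYMMDD_NN".

```python
items = [
    {"opr:date": "20230109", "opr:segment": 1, "opr:frame": 1},
    {"opr:date": "20230109", "opr:segment": 1, "opr:frame": 2},
    {"opr:date": "20230126", "opr:segment": 1, "opr:frame": 1},
]

segments = {}
for props in items:
    date, flight_num = props["opr:date"], props["opr:segment"]
    # Segment key in the "YYYYMMDD_NN" format used throughout the module.
    segment_path = f"{date}_{flight_num:02d}"
    seg = segments.setdefault(segment_path, {
        "segment_path": segment_path,
        "date": date,
        "flight_number": flight_num,
        "frames": [],
        "item_count": 0,
    })
    seg["frames"].append(props.get("opr:frame"))
    seg["item_count"] += 1

# Sort by date, then flight number, as get_segments does.
segment_list = sorted(segments.values(),
                      key=lambda s: (s["date"], s["flight_number"]))
```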
def load_layers_file(self, url: str) -> xarray.core.dataset.Dataset:
738    def load_layers_file(self, url: str) -> xr.Dataset:
739        """
740        Load layer data from CSARP_layer file.
741
742        Parameters
743        ----------
744        url : str
745            URL or path to layer file.
746
747        Returns
748        -------
749        xr.Dataset
750            Layer data in a standardized format with coordinates:
751            - slow_time: GPS time converted to datetime
752            - layer: Layer index
753            and data variables:
754            - twtt: Two-way travel time for each layer
755            - quality: Quality values for each layer
756            - type: Type values for each layer
757            - lat, lon: Geographic coordinates
758            - elev: Elevation
759            - id: Layer IDs
760        """
761
762        file = self._open_file(url)
763
764        ds = self._load_layers(file)
765
766        # Add the source URL as an attribute
767        ds.attrs['source_url'] = url
768
769        # Apply common manipulations to match the expected structure
770        # Convert GPS time to datetime coordinate
771        if 'gps_time' in ds.variables:
772            slow_time_dt = pd.to_datetime(ds['gps_time'].values, unit='s')
773            ds = ds.assign_coords(slow_time=('slow_time', slow_time_dt))
774
775            # Set slow_time as the main coordinate and remove gps_time from data_vars
776            if 'slow_time' not in ds.dims:
777                ds = ds.swap_dims({'gps_time': 'slow_time'})
778
779            # Remove gps_time from data_vars if it exists there to avoid conflicts
780            if ('gps_time' in ds.data_vars) or ('gps_time' in ds.coords):
781                ds = ds.drop_vars('gps_time')
782
783        # Sort by slow_time if it exists
784        if 'slow_time' in ds.coords:
785            ds = ds.sortby('slow_time')
786
787        return ds

Load layer data from CSARP_layer file.

Parameters
  • url (str): URL or path to layer file.
Returns
  • xr.Dataset: Layer data in a standardized format with coordinates:
    • slow_time: GPS time converted to datetime
    • layer: Layer index
    And data variables:
    • twtt: Two-way travel time for each layer
    • quality: Quality values for each layer
    • type: Type values for each layer
    • lat, lon, elev: Geographic coordinates
    • id: Layer IDs
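The gps_time to slow_time conversion applied by load_layers_file treats the raw values as Unix epoch seconds; pandas performs the same conversion on a plain array (example epoch values below are illustrative):

```python
import pandas as pd

# Hypothetical GPS timestamps in epoch seconds, as stored in CSARP_layer files.
gps_time = [1673222400.0, 1673222400.5, 1673222401.0]

# Same conversion as in load_layers_file: epoch seconds -> datetime index.
slow_time = pd.to_datetime(gps_time, unit='s')
```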
def get_layers( self, ds: Union[xarray.core.dataset.Dataset, dict], source: str = 'auto', include_geometry=True, errors='warn') -> dict:
 949    def get_layers(self, ds : Union[xr.Dataset, dict], source: str = 'auto', include_geometry=True, errors='warn') -> dict:
 950        """
 951        Load layer picks from files or database.
 952
 953        Tries files first, falls back to database if source='auto'.
 954
 955        Parameters
 956        ----------
 957        ds : Union[xr.Dataset, dict]
 958            Radar frame Dataset or STAC item.
 959        source : {'auto', 'files', 'db'}, optional
 960            Source for layers. 'auto' tries files then falls back to database.
 961        include_geometry : bool, optional
 962            If True, include geometry information when fetching from the API.
 963        errors : str, optional
 964            How to handle missing layer data: 'warn' (default) returns None with a warning,
 965            'error' raises an exception.
 966
 967        Returns
 968        -------
 969        dict or None
 970            A dictionary mapping layer IDs to their corresponding data, or None if no layers
 971            found and errors='warn'.
 972        """
 973        collection, segment_path, _ = self._extract_segment_info(ds)
 974
 975        if source == 'auto':
 976            # Try to get layers from files first
 977            try:
 978                layers = self._get_layers_files(ds, raise_errors=True)
 979                return layers
 980            except Exception:
 981                # Fallback to API if no layers found in files
 982                try:
 983                    return self._get_layers_db(ds, include_geometry=include_geometry, raise_errors=True)
 984                except Exception as e:
 985                    if errors == 'error':
 986                        raise
 987                    else:
 988                        warnings.warn(f"No layer data found for {collection}/{segment_path}: {e}")
 989                        return None
 990        elif source == 'files':
 991            try:
 992                return self._get_layers_files(ds, raise_errors=True)
 993            except Exception as e:
 994                if errors == 'error':
 995                    raise
 996                else:
 997                    warnings.warn(f"No layer files found for {collection}/{segment_path}: {e}")
 998                    return None
 999        elif source == 'db':
1000            try:
1001                return self._get_layers_db(ds, include_geometry=include_geometry, raise_errors=True)
1002            except Exception as e:
1003                if errors == 'error':
1004                    raise
1005                else:
1006                    warnings.warn(f"No layer data in DB for {collection}/{segment_path}: {e}")
1007                    return None
1008        else:
1009            raise ValueError("Invalid source specified. Must be one of: 'auto', 'files', 'db'.")

Load layer picks from files or database.

Tries files first, falls back to database if source='auto'.

Parameters
  • ds (Union[xr.Dataset, dict]): Radar frame Dataset or STAC item.
  • source ({'auto', 'files', 'db'}, optional): Source for layers. 'auto' tries files then falls back to database.
  • include_geometry (bool, optional): If True, include geometry information when fetching from the API.
  • errors (str, optional): How to handle missing layer data: 'warn' (default) returns None with a warning, 'error' raises an exception.
Returns
  • dict or None: A dictionary mapping layer IDs to their corresponding data, or None if no layers found and errors='warn'.
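The source-selection and fallback logic can be sketched with toy stand-ins for the file and database backends (the real backends are the private _get_layers_files and _get_layers_db methods): 'auto' tries files first and falls back to the database, and errors='warn' converts a total failure into None.

```python
import warnings

def from_files():
    # Toy file backend that always fails, to exercise the fallback.
    raise FileNotFoundError("no CSARP_layer files")

def from_db():
    # Toy database backend returning picks keyed by layer ID.
    return {1: "surface picks", 2: "bottom picks"}

def get_layers_sketch(source="auto", errors="warn"):
    backends = {"auto": [from_files, from_db],
                "files": [from_files],
                "db": [from_db]}
    if source not in backends:
        raise ValueError("Invalid source. Must be one of: 'auto', 'files', 'db'.")
    last_error = None
    for backend in backends[source]:
        try:
            return backend()
        except Exception as e:
            last_error = e
    if errors == "error":
        raise last_error
    warnings.warn(f"No layer data found: {last_error}")
    return None

layers = get_layers_sketch(source="auto")  # files fail, database succeeds
```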