xopr.opr_access
Core data access interface for Open Polar Radar (OPR) datasets.
This module provides the primary interface for discovering, querying, and loading polar ice-penetrating radar data from the Open Polar Radar archive. It implements a STAC (SpatioTemporal Asset Catalog) based workflow for efficient data discovery and retrieval.
Primary Class
OPRConnection : Main interface for accessing OPR data. Provides methods for querying STAC catalogs, loading radar frames, and retrieving layer picks from both file-based and database sources.
Typical Workflow
- Create an OPRConnection object, optionally with local caching
- Query the STAC catalog to find radar data matching your criteria
- Load radar frames as xarray Datasets
- Optionally load associated layer picks
- Process and visualize the data
See this notebook for an end-to-end example: https://docs.englacial.org/xopr/demo-notebook/
Examples
>>> import xopr
>>> # Connect to OPR with local caching
>>> opr = xopr.OPRConnection(cache_dir='radar_cache')
>>>
>>> # Query for data in a specific collection and segment
>>> stac_items = opr.query_frames(
... collections=['2022_Antarctica_BaslerMKB'],
... segment_paths=['20230109_01']
... )
>>>
>>> # Load the radar frames
>>> frames = opr.load_frames(stac_items)
>>>
>>> # Get layer picks (surface/bed)
>>> layers = opr.get_layers(frames[0])
See Also
xopr.geometry: Geographic utilities for region selection and projection
xopr.radar_util: Processing functions for radar data and layers
ops_api: Interface to the OPS database API
1""" 2Core data access interface for Open Polar Radar (OPR) datasets. 3 4This module provides the primary interface for discovering, querying, and loading 5polar ice-penetrating radar data from the Open Polar Radar archive. It implements 6a STAC (SpatioTemporal Asset Catalog) based workflow for efficient data discovery 7and retrieval. 8 9Primary Class 10------------- 11OPRConnection : Main interface for accessing OPR data 12 Provides methods for querying STAC catalogs, loading radar frames, 13 and retrieving layer picks from both file-based and database sources. 14 15Typical Workflow 16---------------- 171. Create an OPRConnection object, optionally with local caching 182. Query the STAC catalog to find radar data matching your criteria 193. Load radar frames as xarray Datasets 204. Optionally load associated layer picks 215. Process and visualize the data 22 23See this notebook for an end-to-end example: 24- https://docs.englacial.org/xopr/demo-notebook/ 25 26Examples 27-------- 28>>> import xopr 29>>> # Connect to OPR with local caching 30>>> opr = xopr.OPRConnection(cache_dir='radar_cache') 31>>> 32>>> # Query for data in a specific collection and segment 33>>> stac_items = opr.query_frames( 34... collections=['2022_Antarctica_BaslerMKB'], 35... segment_paths=['20230109_01'] 36... ) 37>>> 38>>> # Load the radar frames 39>>> frames = opr.load_frames(stac_items) 40>>> 41>>> # Get layer picks (surface/bed) 42>>> layers = opr.get_layers(frames[0]) 43 44See Also 45-------- 46xopr.geometry : Geographic utilities for region selection and projection 47xopr.radar_util : Processing functions for radar data and layers 48ops_api : Interface to the OPS database API 49""" 50 51import re 52import warnings 53from typing import Union 54 55import antimeridian 56import fsspec 57import geopandas as gpd 58import h5py 59import hdf5storage 60import numpy as np 61import pandas as pd 62import scipy.io 63import shapely 64import xarray as xr 65from rustac import DuckdbClient 66 67from . 
import opr_tools, ops_api 68from .cf_units import apply_cf_compliant_attrs 69from .matlab_attribute_utils import ( 70 decode_hdf5_matlab_variable, 71 extract_legacy_mat_attributes, 72) 73from .stac_cache import get_opr_catalog_path, sync_opr_catalogs 74 75 76class OPRConnection: 77 def __init__(self, 78 collection_url: str = "https://data.cresis.ku.edu/data/", 79 cache_dir: str = None, 80 stac_parquet_href: str = None, 81 sync_catalogs: bool = True): 82 """ 83 Initialize connection to OPR data archive. 84 85 Parameters 86 ---------- 87 collection_url : str, optional 88 Base URL for OPR data collection. 89 cache_dir : str, optional 90 Directory to cache downloaded data for faster repeated access. 91 stac_parquet_href : str, optional 92 Path or URL pattern to STAC catalog parquet files. When *None* 93 (default), the local cache is used if available, falling back to 94 the S3 catalog. 95 sync_catalogs : bool, optional 96 If True (default), sync OPR STAC catalogs to a local cache 97 before resolving the catalog path. Uses ETag-based change 98 detection so repeated calls are cheap (single HTTP request). 
99 """ 100 self.collection_url = collection_url 101 self.cache_dir = cache_dir 102 self._user_set_href = stac_parquet_href is not None 103 if sync_catalogs and not self._user_set_href: 104 sync_opr_catalogs() 105 self.stac_parquet_href = stac_parquet_href or get_opr_catalog_path() 106 107 self.fsspec_cache_kwargs = {} 108 self.fsspec_url_prefix = '' 109 if cache_dir: 110 self.fsspec_cache_kwargs['cache_storage'] = cache_dir 111 self.fsspec_cache_kwargs['check_files'] = True 112 self.fsspec_url_prefix = 'filecache::' 113 114 self.db_layers_metadata = None # Cache for OPS database layers metadata 115 116 # Lazy-initialized DuckDB client for STAC queries — reusing a single 117 # session caches remote file metadata (parquet footers) across calls 118 self._duckdb_client = None 119 120 @property 121 def duckdb_client(self): 122 """Lazy-initialized DuckDB client, recreated after pickling.""" 123 if self._duckdb_client is None: 124 self._duckdb_client = DuckdbClient() 125 return self._duckdb_client 126 127 def __getstate__(self): 128 state = self.__dict__.copy() 129 state['_duckdb_client'] = None # DuckdbClient can't be pickled 130 return state 131 132 def _open_file(self, url): 133 """Helper method to open files with appropriate caching. 134 135 Local filesystem paths are returned as-is (no fsspec wrapping). 136 Remote URLs (http, https, s3, gs) use filecache or simplecache. 
137 """ 138 if not url.startswith(('http://', 'https://', 's3://', 'gs://')): 139 return url 140 if self.fsspec_url_prefix: 141 return fsspec.open_local(f"{self.fsspec_url_prefix}{url}", filecache=self.fsspec_cache_kwargs) 142 else: 143 return fsspec.open_local(f"simplecache::{url}", simplecache=self.fsspec_cache_kwargs) 144 145 def _extract_segment_info(self, segment): 146 """Extract collection, segment_path, and frame from Dataset or dict.""" 147 if isinstance(segment, xr.Dataset): 148 return (segment.attrs.get('collection'), 149 segment.attrs.get('segment_path'), 150 segment.attrs.get('frame')) 151 else: 152 return (segment['collection'], 153 f"{segment['properties'].get('opr:date')}_{segment['properties'].get('opr:segment'):02d}", 154 segment['properties'].get('opr:frame')) 155 156 def query_frames(self, collections: list[str] = None, segment_paths: list[str] = None, 157 geometry = None, date_range: tuple = None, properties: dict = {}, 158 max_items: int = None, exclude_geometry: bool = False, 159 search_kwargs: dict = {}) -> gpd.GeoDataFrame: 160 """ 161 Query STAC catalog for radar frames matching search criteria. 162 163 Multiple parameters are combined with AND logic. Lists of values passed to a 164 single parameter are treated with OR logic (any value matches). 165 166 Parameters 167 ---------- 168 collections : list[str] or str, optional 169 Collection name(s) (e.g., "2022_Antarctica_BaslerMKB"). 170 segment_paths : list[str] or str, optional 171 Segment path(s) in format "YYYYMMDD_NN" (e.g., "20230126_01"). 172 geometry : shapely geometry or dict, optional 173 Geospatial geometry to filter by intersection. 174 date_range : str, optional 175 Date range in ISO format (e.g., "2021-01-01/2025-06-01"). 176 properties : dict, optional 177 Additional STAC properties to filter by. 178 max_items : int, optional 179 Maximum number of items to return. 180 exclude_geometry : bool, optional 181 If True, exclude geometry field to reduce response size. 
182 search_kwargs : dict, optional 183 Additional keyword arguments for STAC search. 184 185 Returns 186 ------- 187 gpd.GeoDataFrame or None 188 GeoDataFrame of matching STAC items with columns for collection, 189 geometry, properties, and assets. Returns None if no matches found. 190 """ 191 192 search_params = {} 193 194 # Exclude geometry -- do not return the geometry field to reduce response size 195 if exclude_geometry: 196 search_params['exclude'] = ['geometry'] 197 198 # Handle collections (seasons) 199 if collections is not None: 200 search_params['collections'] = [collections] if isinstance(collections, str) else collections 201 202 # Handle geometry filtering 203 if geometry is not None: 204 if hasattr(geometry, '__geo_interface__'): 205 geometry = geometry.__geo_interface__ 206 207 # Fix geometries that cross the antimeridian 208 geometry = antimeridian.fix_geojson(geometry, reverse=True) 209 210 search_params['intersects'] = geometry 211 212 # Handle date range filtering 213 if date_range is not None: 214 search_params['datetime'] = date_range 215 216 # Handle max_items 217 if max_items is not None: 218 search_params['limit'] = max_items 219 220 # Handle segment_paths filtering using CQL2 221 filter_conditions = [] 222 223 if segment_paths is not None: 224 segment_paths = [segment_paths] if isinstance(segment_paths, str) else segment_paths 225 226 # Create OR conditions for segment paths 227 segment_conditions = [] 228 for segment_path in segment_paths: 229 try: 230 date_str, segment_num_str = segment_path.split('_') 231 segment_num = int(segment_num_str) 232 233 # Create AND condition for this specific segment 234 segment_condition = { 235 "op": "and", 236 "args": [ 237 { 238 "op": "=", 239 "args": [{"property": "opr:date"}, date_str] 240 }, 241 { 242 "op": "=", 243 "args": [{"property": "opr:segment"}, segment_num] 244 } 245 ] 246 } 247 segment_conditions.append(segment_condition) 248 except ValueError: 249 print(f"Warning: Invalid segment_path 
format '{segment_path}'. Expected format: YYYYMMDD_NN") 250 continue 251 252 if segment_conditions: 253 if len(segment_conditions) == 1: 254 filter_conditions.append(segment_conditions[0]) 255 else: 256 # Multiple segments - combine with OR 257 filter_conditions.append({ 258 "op": "or", 259 "args": segment_conditions 260 }) 261 262 # Add any additional property filters 263 for key, value in properties.items(): 264 filter_conditions.append({ 265 "op": "=", 266 "args": [{"property": key}, value] 267 }) 268 269 # Combine all filter conditions with AND 270 if filter_conditions: 271 if len(filter_conditions) == 1: 272 filter_expr = filter_conditions[0] 273 else: 274 filter_expr = { 275 "op": "and", 276 "args": filter_conditions 277 } 278 279 search_params['filter'] = filter_expr 280 281 # Add any extra kwargs to search 282 search_params.update(search_kwargs) 283 284 #print(search_params) # TODO: Remove 285 286 # Perform the search 287 # from rustac import DuckdbClient 288 items = self.duckdb_client.search(self.stac_parquet_href, **search_params) 289 if isinstance(items, dict): 290 items = items['features'] 291 292 if not items or len(items) == 0: 293 warnings.warn("No items found matching the query criteria", UserWarning) 294 return None 295 296 # Convert to GeoDataFrame 297 items_df = gpd.GeoDataFrame(items) 298 # Set index 299 items_df = items_df.set_index(items_df['id']) 300 items_df.index.name = 'stac_item_id' 301 # Set the geometry column 302 if 'geometry' in items_df.columns and not exclude_geometry: 303 items_df = items_df.set_geometry(items_df['geometry'].apply(shapely.geometry.shape)) 304 items_df.crs = "EPSG:4326" 305 306 # Reorder the columns, leaving any extra columns at the end 307 desired_order = ['collection', 'geometry', 'properties', 'assets'] 308 items_df = items_df[[col for col in desired_order if col in items_df.columns] + list(items_df.columns.difference(desired_order))] 309 310 return items_df 311 312 def load_frames(self, stac_items: 
gpd.GeoDataFrame, 313 data_product: str = "CSARP_standard", 314 image: Union[int, None] = None, 315 merge_flights: bool = False, 316 skip_errors: bool = False, 317 allow_unlisted_products: bool = False 318 ) -> Union[list[xr.Dataset], xr.Dataset]: 319 """ 320 Load multiple radar frames from STAC items. 321 322 Parameters 323 ---------- 324 stac_items : gpd.GeoDataFrame 325 STAC items returned from query_frames. 326 data_product : str, optional 327 Data product to load (e.g., "CSARP_standard", "CSARP_qlook"). 328 image : int or None, optional 329 The image number to load for each frame. If None (default), loads the 330 combined image. If specified, `allow_unlisted_products` must be True. 331 merge_flights : bool, optional 332 If True, merge frames from the same segment into single Datasets. 333 skip_errors : bool, optional 334 If True, skip failed frames and continue loading others. 335 allow_unlisted_products : bool, optional 336 If True, attempt to load the specified data product even if it's not 337 listed in the item's assets. See `load_frame` for details. 338 339 Returns 340 ------- 341 list[xr.Dataset] or xr.Dataset 342 List of radar frame Datasets, or list of merged Datasets if 343 merge_flights=True. 344 """ 345 frames = [] 346 347 for idx, item in stac_items.iterrows(): 348 try: 349 frame = self.load_frame(item, data_product, image=image, allow_unlisted_products=allow_unlisted_products) 350 frames.append(frame) 351 except Exception as e: 352 print(f"Error loading frame for item {item.get('id', 'unknown')}: {e}") 353 if skip_errors: 354 continue 355 else: 356 raise e 357 358 if merge_flights: 359 return opr_tools.merge_frames(frames) 360 else: 361 return frames 362 363 def load_frame(self, stac_item, data_product: str = "CSARP_standard", 364 image: Union[int, None] = None, 365 allow_unlisted_products: bool = False) -> xr.Dataset: 366 """ 367 Load a single radar frame from a STAC item. 
368 369 Parameters 370 ---------- 371 stac_item : dict or pd.Series 372 STAC item containing asset URLs. 373 data_product : str, optional 374 Data product to load (e.g., "CSARP_standard", "CSARP_qlook"). 375 image : int or None, optional 376 The image number to load for this frame. If None (default), loads the 377 combined image. If specified, `allow_unlisted_products` must be True. 378 allow_unlisted_products : bool, optional 379 If True, attempt to load the specified data product even if it's not 380 listed in the item's assets. This can be useful for loading non-standard 381 products. If set to True and the data product is not found in the STAC 382 item assets, the method will attempt to construct the URL based on any 383 available CSARP_* asset. (If the frame is entirely unlisted, you can use 384 `load_frame_url` instead.) 385 Returns 386 ------- 387 xr.Dataset 388 Radar frame with coordinates slow_time and twtt, and data variables 389 including Data, Latitude, Longitude, Elevation, Surface, etc. 390 """ 391 392 assets = stac_item['assets'] 393 394 # Get the data asset 395 data_asset = assets.get(data_product) 396 if not data_asset: 397 if not allow_unlisted_products: 398 available_assets = list(assets.keys()) 399 raise ValueError(f"{data_product} asset not found in STAC item. Available assets: {available_assets}") 400 else: 401 # Find any available CSARP_* asset to use as a template for constructing the URL 402 template_asset_name = None 403 template_asset_url = None 404 for asset_name, asset_info in assets.items(): 405 if asset_name.startswith('CSARP_'): 406 template_asset_name = asset_name 407 template_asset_url = asset_info.get('href') 408 break 409 410 if not template_asset_url: 411 available_assets = list(assets.keys()) 412 raise ValueError(f"No CSARP_* asset found in STAC item to use as a template for constructing URL for {data_product}. 
Available assets: {available_assets}") 413 414 url = template_asset_url.replace(template_asset_name, data_product) 415 else: 416 # The asset does exist in the STAC item, so just get the URL from the asset 417 url = data_asset.get('href') 418 419 if not url: 420 raise ValueError(f"No href found in {data_product} asset") 421 422 # If a specific image is requested, modify the URL to point to that image 423 if image is not None: 424 if not allow_unlisted_products: 425 raise ValueError("Specifying an image number requires allow_unlisted_products=True to construct the URL") 426 url = url.replace("Data_", f"Data_img_{image:02d}_") 427 428 # Load the frame using the existing method 429 return self.load_frame_url(url) 430 431 def load_frame_url(self, url: str) -> xr.Dataset: 432 """ 433 Load a radar frame directly from a URL. 434 435 Automatically detects and handles both HDF5 and legacy MATLAB formats. 436 437 Parameters 438 ---------- 439 url : str 440 URL to radar frame data file (.mat). 441 442 Returns 443 ------- 444 xr.Dataset 445 Radar frame with CF-compliant metadata and citation information. 
446 """ 447 448 file = self._open_file(url) 449 450 filetype = None 451 try: 452 ds = self._load_frame_hdf5(file) 453 filetype = 'hdf5' 454 except OSError: 455 ds = self._load_frame_matlab(file) 456 filetype = 'matlab' 457 458 # Add the source URL as an attribute 459 ds.attrs['source_url'] = url 460 461 # Apply CF-compliant attributes 462 ds = apply_cf_compliant_attrs(ds) 463 464 # Get the season and segment from the URL 465 match = re.search(r'(\d{4}_\w+_[A-Za-z0-9]+)\/([\w_]+)\/[\d_]+\/[\w]+(\d{8}_\d{2}_\d{3})', url) 466 if match: 467 collection, data_product, granule = match.groups() 468 date, segment_id, frame_id = granule.split('_') 469 ds.attrs['collection'] = collection 470 ds.attrs['data_product'] = data_product 471 ds.attrs['granule'] = granule 472 ds.attrs['segment_path'] = f"{date}_{segment_id}" 473 ds.attrs['date_str'] = date 474 ds.attrs['segment'] = int(segment_id) 475 ds.attrs['frame'] = int(frame_id) 476 477 # Load citation information 478 result = ops_api.get_segment_metadata(segment_name=ds.attrs['segment_path'], season_name=collection) 479 if result: 480 if isinstance(result['data'], str): 481 warnings.warn(f"Warning: Unexpected result from ops_api: {result['data']}", UserWarning) 482 else: 483 result_data = {} 484 for key, value in result['data'].items(): 485 if len(value) == 1: 486 result_data[key] = value[0] 487 elif len(value) > 1: 488 result_data[key] = set(value) 489 490 if 'dois' in result_data: 491 ds.attrs['doi'] = result_data['dois'] 492 if 'rors' in result_data: 493 ds.attrs['ror'] = result_data['rors'] 494 if 'funding_sources' in result_data: 495 ds.attrs['funder_text'] = result_data['funding_sources'] 496 497 # Add the rest of the Matlab parameters 498 if filetype == 'hdf5': 499 ds.attrs['mimetype'] = 'application/x-hdf5' 500 ds.attrs.update(decode_hdf5_matlab_variable(h5py.File(file, 'r'), 501 skip_variables=True, 502 skip_errors=True)) 503 elif filetype == 'matlab': 504 ds.attrs['mimetype'] = 'application/x-matlab-data' 505 
ds.attrs.update(extract_legacy_mat_attributes(file, 506 skip_keys=ds.keys(), 507 skip_errors=True)) 508 509 return ds 510 511 def _load_frame_hdf5(self, file) -> xr.Dataset: 512 """ 513 Load a radar frame from an HDF5 file. 514 515 Parameters 516 ---------- 517 file : 518 The path to the HDF5 file containing radar frame data. 519 520 Returns 521 ------- 522 xr.Dataset 523 The loaded radar frame as an xarray Dataset. 524 """ 525 526 ds = xr.open_dataset(file, engine='h5netcdf', phony_dims='sort') 527 528 # Re-arrange variables to provide useful dimensions and coordinates 529 530 ds = ds.squeeze() # Drop the singleton dimensions matlab adds 531 532 ds = ds.rename({ # Label the dimensions with more useful names 533 ds.Data.dims[0]: 'slow_time_idx', 534 ds.Data.dims[1]: 'twtt_idx', 535 }) 536 537 # Make variables with no dimensions into scalar attributes 538 for var in ds.data_vars: 539 if ds[var].ndim == 0: 540 ds.attrs[var] = ds[var].item() 541 ds = ds.drop_vars(var) 542 543 # Make the file_type an attribute 544 if 'file_type' in ds.data_vars: 545 ds.attrs['file_type'] = ds['file_type'].to_numpy() 546 ds = ds.drop_vars('file_type') 547 548 # Name the two time coordinates 549 ds = ds.rename({'Time': 'twtt', 'GPS_time': 'slow_time'}) 550 ds = ds.set_coords(['slow_time', 'twtt']) 551 552 slow_time_1d = pd.to_datetime(ds['slow_time'].values, unit='s') 553 ds = ds.assign_coords(slow_time=('slow_time_idx', slow_time_1d)) 554 555 # Make twtt and slow_time the indexing coordinates 556 ds = ds.swap_dims({'twtt_idx': 'twtt'}) 557 ds = ds.swap_dims({'slow_time_idx': 'slow_time'}) 558 559 return ds 560 561 def _load_frame_matlab(self, file) -> xr.Dataset: 562 """ 563 Load a radar frame from a MATLAB file. 564 565 Parameters 566 ---------- 567 file : 568 The path to the MATLAB file containing radar frame data. 569 570 Returns 571 ------- 572 xr.Dataset 573 The loaded radar frame as an xarray Dataset. 
574 """ 575 576 m = scipy.io.loadmat(file, mat_dtype=False) 577 578 key_dims = { 579 'Time': ('twtt',), 580 'GPS_time': ('slow_time',), 581 'Latitude': ('slow_time',), 582 'Longitude': ('slow_time',), 583 'Elevation': ('slow_time',), 584 'Roll': ('slow_time',), 585 'Pitch': ('slow_time',), 586 'Heading': ('slow_time',), 587 'Surface': ('slow_time',), 588 'Data': ('twtt', 'slow_time') 589 } 590 591 ds = xr.Dataset( 592 { 593 key: (dims, np.squeeze(m[key])) for key, dims in key_dims.items() if key in m 594 }, 595 coords={ 596 'twtt': ('twtt', np.squeeze(m['Time'])), 597 'slow_time': ('slow_time', pd.to_datetime(np.squeeze(m['GPS_time']), unit='s')), 598 } 599 ) 600 601 return ds 602 603 def get_collections(self) -> list: 604 """ 605 Get all available collections (seasons/campaigns) in STAC catalog. 606 607 Returns 608 ------- 609 list[dict] 610 List of collection dictionaries with id, description, and extent. 611 """ 612 613 return self.duckdb_client.get_collections(self.stac_parquet_href) 614 615 def get_segments(self, collection_id: str) -> list: 616 """ 617 Get all segments (flights) within a collection. 618 619 Parameters 620 ---------- 621 collection_id : str 622 STAC collection ID (e.g., "2022_Antarctica_BaslerMKB"). 623 624 Returns 625 ------- 626 list[dict] 627 List of segment dictionaries with segment_path, date, flight_number, 628 and frame count. 
629 """ 630 # Query STAC API for all items in collection (exclude geometry for better performance) 631 items = self.query_frames(collections=[collection_id], exclude_geometry=True) 632 633 if items is None or len(items) == 0: 634 print(f"No items found in collection '{collection_id}'") 635 return [] 636 637 # Group items by segment (opr:date + opr:segment) 638 segments = {} 639 for idx, item in items.iterrows(): 640 properties = item['properties'] 641 date = properties['opr:date'] 642 flight_num = properties['opr:segment'] 643 644 if date and flight_num is not None: 645 segment_path = f"{date}_{flight_num:02d}" 646 647 if segment_path not in segments: 648 segments[segment_path] = { 649 'segment_path': segment_path, 650 'date': date, 651 'flight_number': flight_num, 652 'collection': collection_id, 653 'frames': [], 654 'item_count': 0 655 } 656 657 segments[segment_path]['frames'].append(properties.get('opr:frame')) 658 segments[segment_path]['item_count'] += 1 659 660 # Sort segments by date and flight number 661 segment_list = list(segments.values()) 662 segment_list.sort(key=lambda x: (x['date'], x['flight_number'])) 663 664 return segment_list 665 666 def _get_layers_files(self, segment: Union[xr.Dataset, dict], raise_errors=True) -> dict: 667 """ 668 Fetch layers from the CSARP_layers files 669 670 See https://gitlab.com/openpolarradar/opr/-/wikis/Layer-File-Guide for file formats 671 672 Parameters 673 ---------- 674 segment : xr.Dataset or dict 675 Radar frame Dataset or STAC item. 676 raise_errors : bool, optional 677 If True, raise errors when layers cannot be found. 678 679 Returns 680 ------- 681 dict 682 Dictionary mapping layer names (e.g., "standard:surface", 683 "standard:bottom") to layer Datasets. 684 """ 685 collection, segment_path, frame = self._extract_segment_info(segment) 686 687 # If we already have a STAC item, just use it. 
Otherwise, query to find the matching STAC items 688 if isinstance(segment, xr.Dataset): 689 properties = {} 690 if frame: 691 properties['opr:frame'] = frame 692 693 # Query STAC collection for CSARP_layer files matching this specific segment 694 695 # Get items from this specific segment 696 stac_items = self.query_frames(collections=[collection], segment_paths=[segment_path], properties=properties) 697 698 # Filter for items that have CSARP_layer assets 699 layer_items = [] 700 for idx, item in stac_items.iterrows(): 701 if 'CSARP_layer' in item['assets']: 702 layer_items.append(item) 703 else: 704 layer_items = [segment] if 'CSARP_layer' in segment.get('assets', {}) else [] 705 706 if not layer_items: 707 if raise_errors: 708 raise ValueError(f"No CSARP_layer files found for segment path {segment_path} in collection {collection}") 709 else: 710 return {} 711 712 # Load each layer file and combine them 713 layer_frames = [] 714 for item in layer_items: 715 layer_asset = item['assets']['CSARP_layer'] 716 if layer_asset and 'href' in layer_asset: 717 url = layer_asset['href'] 718 try: 719 layer_ds = self.load_layers_file(url) 720 layer_frames.append(layer_ds) 721 except Exception as e: 722 raise e # TODO 723 print(f"Warning: Failed to load layer file {url}: {e}") 724 continue 725 726 if not layer_frames: 727 if raise_errors: 728 raise ValueError(f"No valid CSARP_layer files could be loaded for segment {segment_path} in collection {collection}") 729 else: 730 return {} 731 732 # Concatenate all layer frames along slow_time dimension 733 layers_segment = xr.concat(layer_frames, dim='slow_time', combine_attrs='drop_conflicts', data_vars='minimal') 734 layers_segment = layers_segment.sortby('slow_time') 735 736 # Trim to bounds of the original dataset 737 layers_segment = self._trim_to_bounds(layers_segment, segment) 738 739 # Find the layer organization file to map layer IDs to names 740 # Layer organization files are one per directory. 
For example, the layer file: 741 # https://data.cresis.ku.edu/data/rds/2017_Antarctica_BaslerJKB/CSARP_layer/20180105_03/Data_20180105_03_003.mat 742 # would have a layer organization file at: 743 # https://data.cresis.ku.edu/data/rds/2017_Antarctica_BaslerJKB/CSARP_layer/20180105_03/layer_20180105_03.mat 744 745 layer_organization_file_url = re.sub(r'Data_(\d{8}_\d{2})_\d{3}.mat', r'layer_\1.mat', url) 746 layer_organization_ds = self._load_layer_organization(self._open_file(layer_organization_file_url)) 747 748 # Split into separate layers by ID 749 layers = {} 750 751 layer_ids = np.unique(layers_segment['layer']) 752 753 for i, layer_id in enumerate(layer_ids): 754 layer_id_int = int(layer_id) 755 layer_name = str(layer_organization_ds['lyr_name'].sel(lyr_id=layer_id_int).values.item().squeeze()) 756 layer_group = str(layer_organization_ds['lyr_group_name'].sel(lyr_id=layer_id_int).values.item().squeeze()) 757 if layer_group == '[]': 758 layer_group = '' 759 layer_display_name = f"{layer_group}:{layer_name}" 760 761 layer_ds = layers_segment.sel(layer=layer_id).drop_vars('layer') 762 # Only add non-empty layers with at least some non-NaN data 763 if layer_ds.sizes.get('slow_time', 0) > 0: 764 if not layer_ds.to_array().isnull().all(): 765 layers[layer_display_name] = layer_ds 766 767 return layers 768 769 def _trim_to_bounds(self, ds: xr.Dataset, ref: Union[xr.Dataset, dict]) -> xr.Dataset: 770 start_time, end_time = None, None 771 if isinstance(ref, xr.Dataset) and 'slow_time' in ref.coords: 772 start_time = ref['slow_time'].min() 773 end_time = ref['slow_time'].max() 774 else: 775 properties = ref.get('properties', {}) 776 if 'start_datetime' in properties and 'end_datetime' in properties: 777 start_time = pd.to_datetime(properties['start_datetime']) 778 end_time = pd.to_datetime(properties['end_datetime']) 779 780 if start_time: 781 return ds.sel(slow_time=slice(start_time, end_time)) 782 else: 783 return ds 784 785 def load_layers_file(self, url: str) -> 
xr.Dataset: 786 """ 787 Load layer data from CSARP_layer file. 788 789 Parameters 790 ---------- 791 url : str 792 URL or path to layer file. 793 794 Returns 795 ------- 796 xr.Dataset 797 Layer data with coordinates slow_time and layer, and data variables 798 twtt, quality, type, lat, lon, and elev. 799 Layer data in a standardized format with coordinates: 800 - slow_time: GPS time converted to datetime 801 And data variables: 802 - twtt: Two-way travel time for each layer 803 - quality: Quality values for each layer 804 - type: Type values for each layer 805 - lat, lon, elev: Geographic coordinates 806 - id: Layer IDs 807 """ 808 809 file = self._open_file(url) 810 811 ds = self._load_layers(file) 812 813 # Add the source URL as an attribute 814 ds.attrs['source_url'] = url 815 816 # Apply common manipulations to match the expected structure 817 # Convert GPS time to datetime coordinate 818 if 'gps_time' in ds.variables: 819 slow_time_dt = pd.to_datetime(ds['gps_time'].values, unit='s') 820 ds = ds.assign_coords(slow_time=('slow_time', slow_time_dt)) 821 822 # Set slow_time as the main coordinate and remove gps_time from data_vars 823 if 'slow_time' not in ds.dims: 824 ds = ds.swap_dims({'gps_time': 'slow_time'}) 825 826 # Remove gps_time from data_vars if it exists there to avoid conflicts 827 if ('gps_time' in ds.data_vars) or ('gps_time' in ds.coords): 828 ds = ds.drop_vars('gps_time') 829 830 # Sort by slow_time if it exists 831 if 'slow_time' in ds.coords: 832 ds = ds.sortby('slow_time') 833 834 return ds 835 836 def _load_layers(self, file) -> xr.Dataset: 837 """ 838 Load layer data file using hdf5storage 839 840 Parameters: 841 file : str 842 Path to the layer file 843 Returns: xr.Dataset 844 """ 845 846 d = hdf5storage.loadmat(file, appendmat=False) 847 848 # Ensure 'id' is at least 1D for the layer dimension 849 id_data = np.atleast_1d(d['id'].squeeze()) 850 851 # For 2D arrays with (layer, slow_time) dimensions, ensure they remain 2D 852 # even when 
there's only one layer 853 quality_data = np.atleast_2d(d['quality'].squeeze()) 854 if quality_data.shape[0] == 1 or quality_data.ndim == 1: 855 quality_data = quality_data.reshape(len(id_data), -1) 856 857 twtt_data = np.atleast_2d(d['twtt'].squeeze()) 858 if twtt_data.shape[0] == 1 or twtt_data.ndim == 1: 859 twtt_data = twtt_data.reshape(len(id_data), -1) 860 861 type_data = np.atleast_2d(d['type'].squeeze()) 862 if type_data.shape[0] == 1 or type_data.ndim == 1: 863 type_data = type_data.reshape(len(id_data), -1) 864 865 ds = xr.Dataset({ 866 'file_type': ((), d['file_type'].squeeze()), 867 'file_version': ((), d['file_version'].squeeze()), 868 'elev': (('slow_time',), d['elev'].squeeze()), 869 #'gps_source': ((), d['gps_source'].squeeze()), 870 'gps_time': (('slow_time',), d['gps_time'].squeeze()), 871 'id': (('layer',), id_data), 872 'lat': (('slow_time',), d['lat'].squeeze()), 873 'lon': (('slow_time',), d['lon'].squeeze()), 874 'quality': (('layer', 'slow_time'), quality_data), 875 'twtt': (('layer', 'slow_time'), twtt_data), 876 'type': (('layer', 'slow_time'), type_data), 877 }) 878 879 ds = ds.assign_coords({'layer': ds['id'], 'slow_time': ds['gps_time']}) 880 881 return ds 882 883 def _load_layer_organization(self, file) -> xr.Dataset: 884 """ 885 Load a layer organization file 886 887 Parameters 888 ---------- 889 file : str 890 Path to the HDF5 layer organization file 891 892 Returns 893 ------- 894 xr.Dataset 895 Raw layer data from HDF5 file 896 """ 897 d = hdf5storage.loadmat(file, appendmat=False) 898 ds = xr.Dataset({ 899 'file_type': ((), d['file_type'].squeeze()), 900 'file_version': ((), d['file_version'].squeeze()), 901 'lyr_age': (('lyr_id',), np.atleast_1d(d['lyr_age'].squeeze())), 902 'lyr_age_source': (('lyr_id',), np.atleast_1d(d['lyr_age_source'].squeeze())), 903 'lyr_desc': (('lyr_id',), np.atleast_1d(d['lyr_desc'].squeeze())), 904 'lyr_group_name': (('lyr_id',), np.atleast_1d(d['lyr_group_name'].squeeze())), 905 'lyr_id': 
(('lyr_id',), np.atleast_1d(d['lyr_id'].squeeze())), 906 'lyr_name': (('lyr_id',), np.atleast_1d(d['lyr_name'].squeeze())), 907 'lyr_order': (('lyr_id',), np.atleast_1d(d['lyr_order'].squeeze())), 908 }, attrs={'param': d['param']}) 909 910 ds = ds.set_index(lyr_id='lyr_id') 911 912 return ds 913 914 def _get_layers_db(self, flight: Union[xr.Dataset, dict], include_geometry=True, raise_errors=True) -> dict: 915 """ 916 Load layer picks from OPS database API. 917 918 Parameters 919 ---------- 920 flight : xr.Dataset or dict 921 Radar frame Dataset or STAC item. 922 include_geometry : bool, optional 923 If True, include geometry information in layers. 924 raise_errors : bool, optional 925 If True, raise errors when layers cannot be found. 926 927 Returns 928 ------- 929 dict 930 Dictionary mapping layer names to layer Datasets. 931 """ 932 933 collection, segment_path, _ = self._extract_segment_info(flight) 934 935 if 'Antarctica' in collection: 936 location = 'antarctic' 937 elif 'Greenland' in collection: 938 location = 'arctic' 939 else: 940 raise ValueError("Dataset does not belong to a recognized location (Antarctica or Greenland).") 941 942 layer_points = ops_api.get_layer_points( 943 segment_name=segment_path, 944 season_name=collection, 945 location=location, 946 include_geometry=include_geometry 947 ) 948 949 if layer_points['status'] != 1: 950 if raise_errors: 951 raise ValueError(f"Failed to fetch layer points. 
Received response with status {layer_points['status']}.") 952 else: 953 return {} 954 955 layer_ds_raw = xr.Dataset( 956 {k: (['gps_time'], v) for k, v in layer_points['data'].items() if k != 'gps_time'}, 957 coords={'gps_time': layer_points['data']['gps_time']} 958 ) 959 # Split into a dictionary of layers based on lyr_id 960 layer_ids = set(layer_ds_raw['lyr_id'].to_numpy()) 961 layer_ids = [int(layer_id) for layer_id in layer_ids if not np.isnan(layer_id)] 962 963 # Get the mapping of layer IDs to names 964 if self.db_layers_metadata is None: 965 layer_metadata = ops_api.get_layer_metadata() 966 if layer_metadata['status'] == 1: 967 df = pd.DataFrame(layer_metadata['data']) 968 df['display_name'] = df['lyr_group_name'] + ":" + df['lyr_name'] 969 self.db_layers_metadata = df 970 else: 971 raise ValueError("Failed to fetch layer metadata from OPS API.") 972 973 layers = {} 974 for layer_id in layer_ids: 975 layer_name = self.db_layers_metadata.loc[self.db_layers_metadata['lyr_id'] == layer_id, 'display_name'].values[0] 976 977 layer_ds = layer_ds_raw.where(layer_ds_raw['lyr_id'] == layer_id, drop=True) 978 979 layer_ds = layer_ds.sortby('gps_time') 980 981 layer_ds = layer_ds.rename({'gps_time': 'slow_time'}) 982 layer_ds = layer_ds.set_coords(['slow_time']) 983 984 layer_ds['slow_time'] = pd.to_datetime(layer_ds['slow_time'].values, unit='s') 985 986 # Filter to the same time range as flight 987 layer_ds = self._trim_to_bounds(layer_ds, flight) 988 989 # Only add non-empty layers with at least some non-NaN data 990 if layer_ds.sizes.get('slow_time', 0) > 0: 991 if not layer_ds.to_array().isnull().all(): 992 layers[layer_name] = layer_ds 993 994 return layers 995 996 def get_layers(self, ds : Union[xr.Dataset, dict], source: str = 'auto', include_geometry=True, errors='warn') -> dict: 997 """ 998 Load layer picks from files or database. 999 1000 Tries files first, falls back to database if source='auto'. 
1001 1002 Parameters 1003 ---------- 1004 ds : Union[xr.Dataset, dict] 1005 Radar frame Dataset or STAC item. 1006 source : {'auto', 'files', 'db'}, optional 1007 Source for layers. 'auto' tries files then falls back to database. 1008 include_geometry : bool, optional 1009 If True, include geometry information when fetching from the API. 1010 errors : str, optional 1011 How to handle missing layer data: 'warn' (default) returns None with a warning, 1012 'error' raises an exception. 1013 1014 Returns 1015 ------- 1016 dict or None 1017 A dictionary mapping layer IDs to their corresponding data, or None if no layers 1018 found and errors='warn'. 1019 """ 1020 collection, segment_path, _ = self._extract_segment_info(ds) 1021 1022 if source == 'auto': 1023 # Try to get layers from files first 1024 try: 1025 layers = self._get_layers_files(ds, raise_errors=True) 1026 return layers 1027 except Exception: 1028 # Fallback to API if no layers found in files 1029 try: 1030 return self._get_layers_db(ds, include_geometry=include_geometry, raise_errors=True) 1031 except Exception as e: 1032 if errors == 'error': 1033 raise 1034 else: 1035 warnings.warn(f"No layer data found for {collection}/{segment_path}: {e}") 1036 return None 1037 elif source == 'files': 1038 try: 1039 return self._get_layers_files(ds, raise_errors=True) 1040 except Exception as e: 1041 if errors == 'error': 1042 raise 1043 else: 1044 warnings.warn(f"No layer files found for {collection}/{segment_path}: {e}") 1045 return None 1046 elif source == 'db': 1047 try: 1048 return self._get_layers_db(ds, include_geometry=include_geometry, raise_errors=True) 1049 except Exception as e: 1050 if errors == 'error': 1051 raise 1052 else: 1053 warnings.warn(f"No layer data in DB for {collection}/{segment_path}: {e}") 1054 return None 1055 else: 1056 raise ValueError("Invalid source specified. Must be one of: 'auto', 'files', 'db'.")
class OPRConnection:
    """Primary interface for discovering and loading Open Polar Radar data."""

    def __init__(self,
                 collection_url: str = "https://data.cresis.ku.edu/data/",
                 cache_dir: str = None,
                 stac_parquet_href: str = None,
                 sync_catalogs: bool = True):
        """
        Initialize connection to OPR data archive.

        Parameters
        ----------
        collection_url : str, optional
            Base URL for OPR data collection.
        cache_dir : str, optional
            Directory to cache downloaded data for faster repeated access.
        stac_parquet_href : str, optional
            Path or URL pattern to STAC catalog parquet files. When *None*
            (default), the local cache is used if available, falling back to
            the S3 catalog.
        sync_catalogs : bool, optional
            If True (default), sync OPR STAC catalogs to a local cache
            before resolving the catalog path. Uses ETag-based change
            detection so repeated calls are cheap (single HTTP request).
        """
        self.collection_url = collection_url
        self.cache_dir = cache_dir

        # Only auto-sync catalogs when the caller did not pin a catalog path.
        self._user_set_href = stac_parquet_href is not None
        if sync_catalogs and not self._user_set_href:
            sync_opr_catalogs()
        self.stac_parquet_href = stac_parquet_href or get_opr_catalog_path()

        # With a cache_dir, remote opens route through fsspec's filecache;
        # otherwise no caching configuration is attached.
        if cache_dir:
            self.fsspec_cache_kwargs = {'cache_storage': cache_dir,
                                        'check_files': True}
            self.fsspec_url_prefix = 'filecache::'
        else:
            self.fsspec_cache_kwargs = {}
            self.fsspec_url_prefix = ''

        # Cache for OPS database layers metadata (populated lazily).
        self.db_layers_metadata = None

        # Lazy-initialized DuckDB client for STAC queries — reusing a single
        # session caches remote file metadata (parquet footers) across calls.
        self._duckdb_client = None

    @property
    def duckdb_client(self):
        """Lazy-initialized DuckDB client, recreated after pickling."""
        if self._duckdb_client is None:
            self._duckdb_client = DuckdbClient()
        return self._duckdb_client

    def __getstate__(self):
        # The DuckDB client holds native resources and cannot be pickled;
        # drop it and let the property recreate it after unpickling.
        state = dict(self.__dict__)
        state['_duckdb_client'] = None
        return state
def _open_file(self, url):
    """Helper method to open files with appropriate caching.

    Local filesystem paths are returned as-is (no fsspec wrapping).
    Remote URLs (http, https, s3, gs) use filecache or simplecache.
    """
    if not url.startswith(('http://', 'https://', 's3://', 'gs://')):
        return url
    if self.fsspec_url_prefix:
        # A cache_dir was configured: persistent filecache.
        return fsspec.open_local(f"{self.fsspec_url_prefix}{url}", filecache=self.fsspec_cache_kwargs)
    else:
        # No persistent cache: simplecache downloads to a temp location.
        return fsspec.open_local(f"simplecache::{url}", simplecache=self.fsspec_cache_kwargs)

def _extract_segment_info(self, segment):
    """Extract (collection, segment_path, frame) from a Dataset or STAC item dict."""
    if isinstance(segment, xr.Dataset):
        return (segment.attrs.get('collection'),
                segment.attrs.get('segment_path'),
                segment.attrs.get('frame'))
    else:
        # STAC item: build the "YYYYMMDD_NN" segment path from properties.
        return (segment['collection'],
                f"{segment['properties'].get('opr:date')}_{segment['properties'].get('opr:segment'):02d}",
                segment['properties'].get('opr:frame'))

def query_frames(self, collections: list[str] = None, segment_paths: list[str] = None,
                 geometry = None, date_range: tuple = None, properties: dict = None,
                 max_items: int = None, exclude_geometry: bool = False,
                 search_kwargs: dict = None) -> gpd.GeoDataFrame:
    """
    Query STAC catalog for radar frames matching search criteria.

    Multiple parameters are combined with AND logic. Lists of values passed to a
    single parameter are treated with OR logic (any value matches).

    Parameters
    ----------
    collections : list[str] or str, optional
        Collection name(s) (e.g., "2022_Antarctica_BaslerMKB").
    segment_paths : list[str] or str, optional
        Segment path(s) in format "YYYYMMDD_NN" (e.g., "20230126_01").
    geometry : shapely geometry or dict, optional
        Geospatial geometry to filter by intersection.
    date_range : str, optional
        Date range in ISO format (e.g., "2021-01-01/2025-06-01").
    properties : dict, optional
        Additional STAC properties to filter by.
    max_items : int, optional
        Maximum number of items to return.
    exclude_geometry : bool, optional
        If True, exclude geometry field to reduce response size.
    search_kwargs : dict, optional
        Additional keyword arguments for STAC search.

    Returns
    -------
    gpd.GeoDataFrame or None
        GeoDataFrame of matching STAC items with columns for collection,
        geometry, properties, and assets. Returns None if no matches found.
    """
    # Avoid mutable default arguments: normalize None to fresh dicts per call.
    properties = {} if properties is None else properties
    search_kwargs = {} if search_kwargs is None else search_kwargs

    search_params = {}

    # Exclude geometry -- do not return the geometry field to reduce response size
    if exclude_geometry:
        search_params['exclude'] = ['geometry']

    # Handle collections (seasons)
    if collections is not None:
        search_params['collections'] = [collections] if isinstance(collections, str) else collections

    # Handle geometry filtering
    if geometry is not None:
        if hasattr(geometry, '__geo_interface__'):
            geometry = geometry.__geo_interface__

        # Fix geometries that cross the antimeridian
        geometry = antimeridian.fix_geojson(geometry, reverse=True)

        search_params['intersects'] = geometry

    # Handle date range filtering
    if date_range is not None:
        search_params['datetime'] = date_range

    # Handle max_items
    if max_items is not None:
        search_params['limit'] = max_items

    # Handle segment_paths filtering using CQL2
    filter_conditions = []

    if segment_paths is not None:
        segment_paths = [segment_paths] if isinstance(segment_paths, str) else segment_paths

        # Create OR conditions for segment paths
        segment_conditions = []
        for segment_path in segment_paths:
            try:
                date_str, segment_num_str = segment_path.split('_')
                segment_num = int(segment_num_str)

                # Create AND condition for this specific segment
                segment_condition = {
                    "op": "and",
                    "args": [
                        {
                            "op": "=",
                            "args": [{"property": "opr:date"}, date_str]
                        },
                        {
                            "op": "=",
                            "args": [{"property": "opr:segment"}, segment_num]
                        }
                    ]
                }
                segment_conditions.append(segment_condition)
            except ValueError:
                # Malformed entries are skipped, not fatal — surface a warning.
                warnings.warn(f"Invalid segment_path format '{segment_path}'. Expected format: YYYYMMDD_NN")
                continue

        if segment_conditions:
            if len(segment_conditions) == 1:
                filter_conditions.append(segment_conditions[0])
            else:
                # Multiple segments - combine with OR
                filter_conditions.append({
                    "op": "or",
                    "args": segment_conditions
                })

    # Add any additional property filters
    for key, value in properties.items():
        filter_conditions.append({
            "op": "=",
            "args": [{"property": key}, value]
        })

    # Combine all filter conditions with AND
    if filter_conditions:
        if len(filter_conditions) == 1:
            filter_expr = filter_conditions[0]
        else:
            filter_expr = {
                "op": "and",
                "args": filter_conditions
            }

        search_params['filter'] = filter_expr

    # Add any extra kwargs to search
    search_params.update(search_kwargs)

    # Perform the search
    items = self.duckdb_client.search(self.stac_parquet_href, **search_params)
    if isinstance(items, dict):
        items = items['features']

    if not items or len(items) == 0:
        warnings.warn("No items found matching the query criteria", UserWarning)
        return None

    # Convert to GeoDataFrame, indexed by STAC item id
    items_df = gpd.GeoDataFrame(items)
    items_df = items_df.set_index(items_df['id'])
    items_df.index.name = 'stac_item_id'
    # Set the geometry column
    if 'geometry' in items_df.columns and not exclude_geometry:
        items_df = items_df.set_geometry(items_df['geometry'].apply(shapely.geometry.shape))
        items_df.crs = "EPSG:4326"

    # Reorder the columns, leaving any extra columns at the end
    desired_order = ['collection', 'geometry', 'properties', 'assets']
    items_df = items_df[[col for col in desired_order if col in items_df.columns] + list(items_df.columns.difference(desired_order))]

    return items_df
def load_frames(self, stac_items: gpd.GeoDataFrame,
                data_product: str = "CSARP_standard",
                image: Union[int, None] = None,
                merge_flights: bool = False,
                skip_errors: bool = False,
                allow_unlisted_products: bool = False
                ) -> Union[list[xr.Dataset], xr.Dataset]:
    """
    Load multiple radar frames from STAC items.

    Parameters
    ----------
    stac_items : gpd.GeoDataFrame
        STAC items returned from query_frames.
    data_product : str, optional
        Data product to load (e.g., "CSARP_standard", "CSARP_qlook").
    image : int or None, optional
        The image number to load for each frame. If None (default), loads the
        combined image. If specified, `allow_unlisted_products` must be True.
    merge_flights : bool, optional
        If True, merge frames from the same segment into single Datasets.
    skip_errors : bool, optional
        If True, skip failed frames and continue loading others.
    allow_unlisted_products : bool, optional
        If True, attempt to load the specified data product even if it's not
        listed in the item's assets. See `load_frame` for details.

    Returns
    -------
    list[xr.Dataset] or xr.Dataset
        List of radar frame Datasets, or list of merged Datasets if
        merge_flights=True.
    """
    frames = []

    for idx, item in stac_items.iterrows():
        try:
            frame = self.load_frame(item, data_product, image=image,
                                    allow_unlisted_products=allow_unlisted_products)
            frames.append(frame)
        except Exception as e:
            print(f"Error loading frame for item {item.get('id', 'unknown')}: {e}")
            if not skip_errors:
                # Bare raise preserves the original traceback.
                raise

    if merge_flights:
        return opr_tools.merge_frames(frames)
    else:
        return frames

def load_frame(self, stac_item, data_product: str = "CSARP_standard",
               image: Union[int, None] = None,
               allow_unlisted_products: bool = False) -> xr.Dataset:
    """
    Load a single radar frame from a STAC item.

    Parameters
    ----------
    stac_item : dict or pd.Series
        STAC item containing asset URLs.
    data_product : str, optional
        Data product to load (e.g., "CSARP_standard", "CSARP_qlook").
    image : int or None, optional
        The image number to load for this frame. If None (default), loads the
        combined image. If specified, `allow_unlisted_products` must be True.
    allow_unlisted_products : bool, optional
        If True, attempt to load the specified data product even if it's not
        listed in the item's assets. This can be useful for loading non-standard
        products. If set to True and the data product is not found in the STAC
        item assets, the method will attempt to construct the URL based on any
        available CSARP_* asset. (If the frame is entirely unlisted, you can use
        `load_frame_url` instead.)

    Returns
    -------
    xr.Dataset
        Radar frame with coordinates slow_time and twtt, and data variables
        including Data, Latitude, Longitude, Elevation, Surface, etc.
    """
    assets = stac_item['assets']

    # Get the data asset
    data_asset = assets.get(data_product)
    if not data_asset:
        if not allow_unlisted_products:
            available_assets = list(assets.keys())
            raise ValueError(f"{data_product} asset not found in STAC item. Available assets: {available_assets}")
        else:
            # Find any available CSARP_* asset to use as a template for constructing the URL
            template_asset_name = None
            template_asset_url = None
            for asset_name, asset_info in assets.items():
                if asset_name.startswith('CSARP_'):
                    template_asset_name = asset_name
                    template_asset_url = asset_info.get('href')
                    break

            if not template_asset_url:
                available_assets = list(assets.keys())
                raise ValueError(f"No CSARP_* asset found in STAC item to use as a template for constructing URL for {data_product}. Available assets: {available_assets}")

            # Swap the product name into the template asset's URL.
            url = template_asset_url.replace(template_asset_name, data_product)
    else:
        # The asset does exist in the STAC item, so just get the URL from the asset
        url = data_asset.get('href')

    if not url:
        raise ValueError(f"No href found in {data_product} asset")

    # If a specific image is requested, modify the URL to point to that image
    if image is not None:
        if not allow_unlisted_products:
            raise ValueError("Specifying an image number requires allow_unlisted_products=True to construct the URL")
        url = url.replace("Data_", f"Data_img_{image:02d}_")

    # Load the frame using the existing method
    return self.load_frame_url(url)

def load_frame_url(self, url: str) -> xr.Dataset:
    """
    Load a radar frame directly from a URL.

    Automatically detects and handles both HDF5 and legacy MATLAB formats.

    Parameters
    ----------
    url : str
        URL to radar frame data file (.mat).

    Returns
    -------
    xr.Dataset
        Radar frame with CF-compliant metadata and citation information.
    """
    file = self._open_file(url)

    # MATLAB v7.3 files are HDF5; older versions need scipy's MATLAB reader.
    filetype = None
    try:
        ds = self._load_frame_hdf5(file)
        filetype = 'hdf5'
    except OSError:
        ds = self._load_frame_matlab(file)
        filetype = 'matlab'

    # Add the source URL as an attribute
    ds.attrs['source_url'] = url

    # Apply CF-compliant attributes
    ds = apply_cf_compliant_attrs(ds)

    # Get the season and segment from the URL
    match = re.search(r'(\d{4}_\w+_[A-Za-z0-9]+)\/([\w_]+)\/[\d_]+\/[\w]+(\d{8}_\d{2}_\d{3})', url)
    if match:
        collection, data_product, granule = match.groups()
        date, segment_id, frame_id = granule.split('_')
        ds.attrs['collection'] = collection
        ds.attrs['data_product'] = data_product
        ds.attrs['granule'] = granule
        ds.attrs['segment_path'] = f"{date}_{segment_id}"
        ds.attrs['date_str'] = date
        ds.attrs['segment'] = int(segment_id)
        ds.attrs['frame'] = int(frame_id)

        # Load citation information
        result = ops_api.get_segment_metadata(segment_name=ds.attrs['segment_path'], season_name=collection)
        if result:
            if isinstance(result['data'], str):
                warnings.warn(f"Warning: Unexpected result from ops_api: {result['data']}", UserWarning)
            else:
                # Collapse single-element lists; keep multi-valued fields as sets.
                result_data = {}
                for key, value in result['data'].items():
                    if len(value) == 1:
                        result_data[key] = value[0]
                    elif len(value) > 1:
                        result_data[key] = set(value)

                if 'dois' in result_data:
                    ds.attrs['doi'] = result_data['dois']
                if 'rors' in result_data:
                    ds.attrs['ror'] = result_data['rors']
                if 'funding_sources' in result_data:
                    ds.attrs['funder_text'] = result_data['funding_sources']

    # Add the rest of the Matlab parameters
    if filetype == 'hdf5':
        ds.attrs['mimetype'] = 'application/x-hdf5'
        # Use a context manager so the HDF5 handle is closed promptly
        # instead of leaking until garbage collection.
        with h5py.File(file, 'r') as h5_file:
            ds.attrs.update(decode_hdf5_matlab_variable(h5_file,
                                                        skip_variables=True,
                                                        skip_errors=True))
    elif filetype == 'matlab':
        ds.attrs['mimetype'] = 'application/x-matlab-data'
        ds.attrs.update(extract_legacy_mat_attributes(file,
                                                      skip_keys=ds.keys(),
                                                      skip_errors=True))

    return ds
def _load_frame_hdf5(self, file) -> xr.Dataset:
    """
    Load a radar frame from an HDF5-format (MATLAB v7.3) file.

    Parameters
    ----------
    file :
        The path to the HDF5 file containing radar frame data.

    Returns
    -------
    xr.Dataset
        The loaded radar frame with slow_time and twtt as indexing coordinates.
    """
    ds = xr.open_dataset(file, engine='h5netcdf', phony_dims='sort')

    # Re-arrange variables to provide useful dimensions and coordinates.
    # MATLAB writes singleton dimensions everywhere; strip them first.
    ds = ds.squeeze()

    # Give the two phony dimensions of Data meaningful names.
    ds = ds.rename({
        ds.Data.dims[0]: 'slow_time_idx',
        ds.Data.dims[1]: 'twtt_idx',
    })

    # Promote dimensionless variables to scalar attributes.
    for name in [v for v in ds.data_vars if ds[v].ndim == 0]:
        ds.attrs[name] = ds[name].item()
        ds = ds.drop_vars(name)

    # file_type survives as an array-valued marker; keep it as an attribute.
    if 'file_type' in ds.data_vars:
        ds.attrs['file_type'] = ds['file_type'].to_numpy()
        ds = ds.drop_vars('file_type')

    # Name the two time axes: twtt (fast time) and slow_time (GPS time).
    ds = ds.rename({'Time': 'twtt', 'GPS_time': 'slow_time'})
    ds = ds.set_coords(['slow_time', 'twtt'])

    # GPS seconds -> datetimes along the trace dimension.
    timestamps = pd.to_datetime(ds['slow_time'].values, unit='s')
    ds = ds.assign_coords(slow_time=('slow_time_idx', timestamps))

    # Index by the physical coordinates rather than the raw integer dims.
    ds = ds.swap_dims({'twtt_idx': 'twtt', 'slow_time_idx': 'slow_time'})

    return ds

def _load_frame_matlab(self, file) -> xr.Dataset:
    """
    Load a radar frame from a legacy (pre-v7.3) MATLAB file.

    Parameters
    ----------
    file :
        The path to the MATLAB file containing radar frame data.

    Returns
    -------
    xr.Dataset
        The loaded radar frame with slow_time and twtt as indexing coordinates.
    """
    mat = scipy.io.loadmat(file, mat_dtype=False)

    # Known frame variables and the dimensions each one lives on.
    key_dims = {
        'Time': ('twtt',),
        'GPS_time': ('slow_time',),
        'Latitude': ('slow_time',),
        'Longitude': ('slow_time',),
        'Elevation': ('slow_time',),
        'Roll': ('slow_time',),
        'Pitch': ('slow_time',),
        'Heading': ('slow_time',),
        'Surface': ('slow_time',),
        'Data': ('twtt', 'slow_time')
    }

    # Only include variables actually present in this file.
    data_vars = {name: (dims, np.squeeze(mat[name]))
                 for name, dims in key_dims.items() if name in mat}
    coords = {
        'twtt': ('twtt', np.squeeze(mat['Time'])),
        'slow_time': ('slow_time', pd.to_datetime(np.squeeze(mat['GPS_time']), unit='s')),
    }
    return xr.Dataset(data_vars, coords=coords)
def get_collections(self) -> list:
    """
    Get all available collections (seasons/campaigns) in STAC catalog.

    Returns
    -------
    list[dict]
        List of collection dictionaries with id, description, and extent.
    """
    return self.duckdb_client.get_collections(self.stac_parquet_href)

def get_segments(self, collection_id: str) -> list:
    """
    Get all segments (flights) within a collection.

    Parameters
    ----------
    collection_id : str
        STAC collection ID (e.g., "2022_Antarctica_BaslerMKB").

    Returns
    -------
    list[dict]
        List of segment dictionaries with segment_path, date, flight_number,
        and frame count.
    """
    # Query STAC API for all items in collection (exclude geometry for better performance)
    items = self.query_frames(collections=[collection_id], exclude_geometry=True)

    if items is None or len(items) == 0:
        print(f"No items found in collection '{collection_id}'")
        return []

    # Group items by segment (opr:date + opr:segment)
    segments = {}
    for idx, item in items.iterrows():
        properties = item['properties']
        date = properties['opr:date']
        flight_num = properties['opr:segment']

        if date and flight_num is not None:
            segment_path = f"{date}_{flight_num:02d}"

            if segment_path not in segments:
                segments[segment_path] = {
                    'segment_path': segment_path,
                    'date': date,
                    'flight_number': flight_num,
                    'collection': collection_id,
                    'frames': [],
                    'item_count': 0
                }

            segments[segment_path]['frames'].append(properties.get('opr:frame'))
            segments[segment_path]['item_count'] += 1

    # Sort segments by date and flight number
    segment_list = list(segments.values())
    segment_list.sort(key=lambda x: (x['date'], x['flight_number']))

    return segment_list

def _get_layers_files(self, segment: Union[xr.Dataset, dict], raise_errors=True) -> dict:
    """
    Fetch layers from the CSARP_layer files.

    See https://gitlab.com/openpolarradar/opr/-/wikis/Layer-File-Guide for file formats

    Parameters
    ----------
    segment : xr.Dataset or dict
        Radar frame Dataset or STAC item.
    raise_errors : bool, optional
        If True, raise errors when layers cannot be found.

    Returns
    -------
    dict
        Dictionary mapping layer names (e.g., "standard:surface",
        "standard:bottom") to layer Datasets.
    """
    collection, segment_path, frame = self._extract_segment_info(segment)

    # If we already have a STAC item, just use it. Otherwise, query to find
    # the matching STAC items.
    if isinstance(segment, xr.Dataset):
        properties = {}
        if frame:
            properties['opr:frame'] = frame

        # Query STAC collection for CSARP_layer files matching this specific segment
        stac_items = self.query_frames(collections=[collection], segment_paths=[segment_path], properties=properties)

        # Filter for items that have CSARP_layer assets
        layer_items = []
        for idx, item in stac_items.iterrows():
            if 'CSARP_layer' in item['assets']:
                layer_items.append(item)
    else:
        layer_items = [segment] if 'CSARP_layer' in segment.get('assets', {}) else []

    if not layer_items:
        if raise_errors:
            raise ValueError(f"No CSARP_layer files found for segment path {segment_path} in collection {collection}")
        else:
            return {}

    # Load each layer file and combine them. Individual failures are
    # best-effort skips; if nothing loads, the check below reports it.
    # (Replaces unreachable dead code that raised before its own
    # warn-and-continue fallback could run.)
    layer_frames = []
    url = None
    for item in layer_items:
        layer_asset = item['assets']['CSARP_layer']
        if layer_asset and 'href' in layer_asset:
            url = layer_asset['href']
            try:
                layer_ds = self.load_layers_file(url)
                layer_frames.append(layer_ds)
            except Exception as e:
                warnings.warn(f"Failed to load layer file {url}: {e}")
                continue

    if not layer_frames:
        if raise_errors:
            raise ValueError(f"No valid CSARP_layer files could be loaded for segment {segment_path} in collection {collection}")
        else:
            return {}

    # Concatenate all layer frames along slow_time dimension
    layers_segment = xr.concat(layer_frames, dim='slow_time', combine_attrs='drop_conflicts', data_vars='minimal')
    layers_segment = layers_segment.sortby('slow_time')

    # Trim to bounds of the original dataset
    layers_segment = self._trim_to_bounds(layers_segment, segment)

    # Find the layer organization file to map layer IDs to names.
    # Layer organization files are one per directory. For example, the layer file:
    # https://data.cresis.ku.edu/data/rds/2017_Antarctica_BaslerJKB/CSARP_layer/20180105_03/Data_20180105_03_003.mat
    # would have a layer organization file at:
    # https://data.cresis.ku.edu/data/rds/2017_Antarctica_BaslerJKB/CSARP_layer/20180105_03/layer_20180105_03.mat
    # NOTE: derived from the last URL seen in the loop above; all frames of a
    # segment share a directory, so any loaded item's URL works.
    layer_organization_file_url = re.sub(r'Data_(\d{8}_\d{2})_\d{3}.mat', r'layer_\1.mat', url)
    layer_organization_ds = self._load_layer_organization(self._open_file(layer_organization_file_url))

    # Split into separate layers by ID
    layers = {}
    layer_ids = np.unique(layers_segment['layer'])

    for layer_id in layer_ids:
        layer_id_int = int(layer_id)
        layer_name = str(layer_organization_ds['lyr_name'].sel(lyr_id=layer_id_int).values.item().squeeze())
        layer_group = str(layer_organization_ds['lyr_group_name'].sel(lyr_id=layer_id_int).values.item().squeeze())
        if layer_group == '[]':
            # MATLAB empty array serialized as a string; treat as "no group".
            layer_group = ''
        layer_display_name = f"{layer_group}:{layer_name}"

        layer_ds = layers_segment.sel(layer=layer_id).drop_vars('layer')
        # Only add non-empty layers with at least some non-NaN data
        if layer_ds.sizes.get('slow_time', 0) > 0:
            if not layer_ds.to_array().isnull().all():
                layers[layer_display_name] = layer_ds

    return layers
Otherwise, query to find the matching STAC items 689 if isinstance(segment, xr.Dataset): 690 properties = {} 691 if frame: 692 properties['opr:frame'] = frame 693 694 # Query STAC collection for CSARP_layer files matching this specific segment 695 696 # Get items from this specific segment 697 stac_items = self.query_frames(collections=[collection], segment_paths=[segment_path], properties=properties) 698 699 # Filter for items that have CSARP_layer assets 700 layer_items = [] 701 for idx, item in stac_items.iterrows(): 702 if 'CSARP_layer' in item['assets']: 703 layer_items.append(item) 704 else: 705 layer_items = [segment] if 'CSARP_layer' in segment.get('assets', {}) else [] 706 707 if not layer_items: 708 if raise_errors: 709 raise ValueError(f"No CSARP_layer files found for segment path {segment_path} in collection {collection}") 710 else: 711 return {} 712 713 # Load each layer file and combine them 714 layer_frames = [] 715 for item in layer_items: 716 layer_asset = item['assets']['CSARP_layer'] 717 if layer_asset and 'href' in layer_asset: 718 url = layer_asset['href'] 719 try: 720 layer_ds = self.load_layers_file(url) 721 layer_frames.append(layer_ds) 722 except Exception as e: 723 raise e # TODO 724 print(f"Warning: Failed to load layer file {url}: {e}") 725 continue 726 727 if not layer_frames: 728 if raise_errors: 729 raise ValueError(f"No valid CSARP_layer files could be loaded for segment {segment_path} in collection {collection}") 730 else: 731 return {} 732 733 # Concatenate all layer frames along slow_time dimension 734 layers_segment = xr.concat(layer_frames, dim='slow_time', combine_attrs='drop_conflicts', data_vars='minimal') 735 layers_segment = layers_segment.sortby('slow_time') 736 737 # Trim to bounds of the original dataset 738 layers_segment = self._trim_to_bounds(layers_segment, segment) 739 740 # Find the layer organization file to map layer IDs to names 741 # Layer organization files are one per directory. 
For example, the layer file: 742 # https://data.cresis.ku.edu/data/rds/2017_Antarctica_BaslerJKB/CSARP_layer/20180105_03/Data_20180105_03_003.mat 743 # would have a layer organization file at: 744 # https://data.cresis.ku.edu/data/rds/2017_Antarctica_BaslerJKB/CSARP_layer/20180105_03/layer_20180105_03.mat 745 746 layer_organization_file_url = re.sub(r'Data_(\d{8}_\d{2})_\d{3}.mat', r'layer_\1.mat', url) 747 layer_organization_ds = self._load_layer_organization(self._open_file(layer_organization_file_url)) 748 749 # Split into separate layers by ID 750 layers = {} 751 752 layer_ids = np.unique(layers_segment['layer']) 753 754 for i, layer_id in enumerate(layer_ids): 755 layer_id_int = int(layer_id) 756 layer_name = str(layer_organization_ds['lyr_name'].sel(lyr_id=layer_id_int).values.item().squeeze()) 757 layer_group = str(layer_organization_ds['lyr_group_name'].sel(lyr_id=layer_id_int).values.item().squeeze()) 758 if layer_group == '[]': 759 layer_group = '' 760 layer_display_name = f"{layer_group}:{layer_name}" 761 762 layer_ds = layers_segment.sel(layer=layer_id).drop_vars('layer') 763 # Only add non-empty layers with at least some non-NaN data 764 if layer_ds.sizes.get('slow_time', 0) > 0: 765 if not layer_ds.to_array().isnull().all(): 766 layers[layer_display_name] = layer_ds 767 768 return layers 769 770 def _trim_to_bounds(self, ds: xr.Dataset, ref: Union[xr.Dataset, dict]) -> xr.Dataset: 771 start_time, end_time = None, None 772 if isinstance(ref, xr.Dataset) and 'slow_time' in ref.coords: 773 start_time = ref['slow_time'].min() 774 end_time = ref['slow_time'].max() 775 else: 776 properties = ref.get('properties', {}) 777 if 'start_datetime' in properties and 'end_datetime' in properties: 778 start_time = pd.to_datetime(properties['start_datetime']) 779 end_time = pd.to_datetime(properties['end_datetime']) 780 781 if start_time: 782 return ds.sel(slow_time=slice(start_time, end_time)) 783 else: 784 return ds 785 786 def load_layers_file(self, url: str) -> 
xr.Dataset: 787 """ 788 Load layer data from CSARP_layer file. 789 790 Parameters 791 ---------- 792 url : str 793 URL or path to layer file. 794 795 Returns 796 ------- 797 xr.Dataset 798 Layer data with coordinates slow_time and layer, and data variables 799 twtt, quality, type, lat, lon, and elev. 800 Layer data in a standardized format with coordinates: 801 - slow_time: GPS time converted to datetime 802 And data variables: 803 - twtt: Two-way travel time for each layer 804 - quality: Quality values for each layer 805 - type: Type values for each layer 806 - lat, lon, elev: Geographic coordinates 807 - id: Layer IDs 808 """ 809 810 file = self._open_file(url) 811 812 ds = self._load_layers(file) 813 814 # Add the source URL as an attribute 815 ds.attrs['source_url'] = url 816 817 # Apply common manipulations to match the expected structure 818 # Convert GPS time to datetime coordinate 819 if 'gps_time' in ds.variables: 820 slow_time_dt = pd.to_datetime(ds['gps_time'].values, unit='s') 821 ds = ds.assign_coords(slow_time=('slow_time', slow_time_dt)) 822 823 # Set slow_time as the main coordinate and remove gps_time from data_vars 824 if 'slow_time' not in ds.dims: 825 ds = ds.swap_dims({'gps_time': 'slow_time'}) 826 827 # Remove gps_time from data_vars if it exists there to avoid conflicts 828 if ('gps_time' in ds.data_vars) or ('gps_time' in ds.coords): 829 ds = ds.drop_vars('gps_time') 830 831 # Sort by slow_time if it exists 832 if 'slow_time' in ds.coords: 833 ds = ds.sortby('slow_time') 834 835 return ds 836 837 def _load_layers(self, file) -> xr.Dataset: 838 """ 839 Load layer data file using hdf5storage 840 841 Parameters: 842 file : str 843 Path to the layer file 844 Returns: xr.Dataset 845 """ 846 847 d = hdf5storage.loadmat(file, appendmat=False) 848 849 # Ensure 'id' is at least 1D for the layer dimension 850 id_data = np.atleast_1d(d['id'].squeeze()) 851 852 # For 2D arrays with (layer, slow_time) dimensions, ensure they remain 2D 853 # even when 
there's only one layer 854 quality_data = np.atleast_2d(d['quality'].squeeze()) 855 if quality_data.shape[0] == 1 or quality_data.ndim == 1: 856 quality_data = quality_data.reshape(len(id_data), -1) 857 858 twtt_data = np.atleast_2d(d['twtt'].squeeze()) 859 if twtt_data.shape[0] == 1 or twtt_data.ndim == 1: 860 twtt_data = twtt_data.reshape(len(id_data), -1) 861 862 type_data = np.atleast_2d(d['type'].squeeze()) 863 if type_data.shape[0] == 1 or type_data.ndim == 1: 864 type_data = type_data.reshape(len(id_data), -1) 865 866 ds = xr.Dataset({ 867 'file_type': ((), d['file_type'].squeeze()), 868 'file_version': ((), d['file_version'].squeeze()), 869 'elev': (('slow_time',), d['elev'].squeeze()), 870 #'gps_source': ((), d['gps_source'].squeeze()), 871 'gps_time': (('slow_time',), d['gps_time'].squeeze()), 872 'id': (('layer',), id_data), 873 'lat': (('slow_time',), d['lat'].squeeze()), 874 'lon': (('slow_time',), d['lon'].squeeze()), 875 'quality': (('layer', 'slow_time'), quality_data), 876 'twtt': (('layer', 'slow_time'), twtt_data), 877 'type': (('layer', 'slow_time'), type_data), 878 }) 879 880 ds = ds.assign_coords({'layer': ds['id'], 'slow_time': ds['gps_time']}) 881 882 return ds 883 884 def _load_layer_organization(self, file) -> xr.Dataset: 885 """ 886 Load a layer organization file 887 888 Parameters 889 ---------- 890 file : str 891 Path to the HDF5 layer organization file 892 893 Returns 894 ------- 895 xr.Dataset 896 Raw layer data from HDF5 file 897 """ 898 d = hdf5storage.loadmat(file, appendmat=False) 899 ds = xr.Dataset({ 900 'file_type': ((), d['file_type'].squeeze()), 901 'file_version': ((), d['file_version'].squeeze()), 902 'lyr_age': (('lyr_id',), np.atleast_1d(d['lyr_age'].squeeze())), 903 'lyr_age_source': (('lyr_id',), np.atleast_1d(d['lyr_age_source'].squeeze())), 904 'lyr_desc': (('lyr_id',), np.atleast_1d(d['lyr_desc'].squeeze())), 905 'lyr_group_name': (('lyr_id',), np.atleast_1d(d['lyr_group_name'].squeeze())), 906 'lyr_id': 
(('lyr_id',), np.atleast_1d(d['lyr_id'].squeeze())), 907 'lyr_name': (('lyr_id',), np.atleast_1d(d['lyr_name'].squeeze())), 908 'lyr_order': (('lyr_id',), np.atleast_1d(d['lyr_order'].squeeze())), 909 }, attrs={'param': d['param']}) 910 911 ds = ds.set_index(lyr_id='lyr_id') 912 913 return ds 914 915 def _get_layers_db(self, flight: Union[xr.Dataset, dict], include_geometry=True, raise_errors=True) -> dict: 916 """ 917 Load layer picks from OPS database API. 918 919 Parameters 920 ---------- 921 flight : xr.Dataset or dict 922 Radar frame Dataset or STAC item. 923 include_geometry : bool, optional 924 If True, include geometry information in layers. 925 raise_errors : bool, optional 926 If True, raise errors when layers cannot be found. 927 928 Returns 929 ------- 930 dict 931 Dictionary mapping layer names to layer Datasets. 932 """ 933 934 collection, segment_path, _ = self._extract_segment_info(flight) 935 936 if 'Antarctica' in collection: 937 location = 'antarctic' 938 elif 'Greenland' in collection: 939 location = 'arctic' 940 else: 941 raise ValueError("Dataset does not belong to a recognized location (Antarctica or Greenland).") 942 943 layer_points = ops_api.get_layer_points( 944 segment_name=segment_path, 945 season_name=collection, 946 location=location, 947 include_geometry=include_geometry 948 ) 949 950 if layer_points['status'] != 1: 951 if raise_errors: 952 raise ValueError(f"Failed to fetch layer points. 
Received response with status {layer_points['status']}.") 953 else: 954 return {} 955 956 layer_ds_raw = xr.Dataset( 957 {k: (['gps_time'], v) for k, v in layer_points['data'].items() if k != 'gps_time'}, 958 coords={'gps_time': layer_points['data']['gps_time']} 959 ) 960 # Split into a dictionary of layers based on lyr_id 961 layer_ids = set(layer_ds_raw['lyr_id'].to_numpy()) 962 layer_ids = [int(layer_id) for layer_id in layer_ids if not np.isnan(layer_id)] 963 964 # Get the mapping of layer IDs to names 965 if self.db_layers_metadata is None: 966 layer_metadata = ops_api.get_layer_metadata() 967 if layer_metadata['status'] == 1: 968 df = pd.DataFrame(layer_metadata['data']) 969 df['display_name'] = df['lyr_group_name'] + ":" + df['lyr_name'] 970 self.db_layers_metadata = df 971 else: 972 raise ValueError("Failed to fetch layer metadata from OPS API.") 973 974 layers = {} 975 for layer_id in layer_ids: 976 layer_name = self.db_layers_metadata.loc[self.db_layers_metadata['lyr_id'] == layer_id, 'display_name'].values[0] 977 978 layer_ds = layer_ds_raw.where(layer_ds_raw['lyr_id'] == layer_id, drop=True) 979 980 layer_ds = layer_ds.sortby('gps_time') 981 982 layer_ds = layer_ds.rename({'gps_time': 'slow_time'}) 983 layer_ds = layer_ds.set_coords(['slow_time']) 984 985 layer_ds['slow_time'] = pd.to_datetime(layer_ds['slow_time'].values, unit='s') 986 987 # Filter to the same time range as flight 988 layer_ds = self._trim_to_bounds(layer_ds, flight) 989 990 # Only add non-empty layers with at least some non-NaN data 991 if layer_ds.sizes.get('slow_time', 0) > 0: 992 if not layer_ds.to_array().isnull().all(): 993 layers[layer_name] = layer_ds 994 995 return layers 996 997 def get_layers(self, ds : Union[xr.Dataset, dict], source: str = 'auto', include_geometry=True, errors='warn') -> dict: 998 """ 999 Load layer picks from files or database. 1000 1001 Tries files first, falls back to database if source='auto'. 
1002 1003 Parameters 1004 ---------- 1005 ds : Union[xr.Dataset, dict] 1006 Radar frame Dataset or STAC item. 1007 source : {'auto', 'files', 'db'}, optional 1008 Source for layers. 'auto' tries files then falls back to database. 1009 include_geometry : bool, optional 1010 If True, include geometry information when fetching from the API. 1011 errors : str, optional 1012 How to handle missing layer data: 'warn' (default) returns None with a warning, 1013 'error' raises an exception. 1014 1015 Returns 1016 ------- 1017 dict or None 1018 A dictionary mapping layer IDs to their corresponding data, or None if no layers 1019 found and errors='warn'. 1020 """ 1021 collection, segment_path, _ = self._extract_segment_info(ds) 1022 1023 if source == 'auto': 1024 # Try to get layers from files first 1025 try: 1026 layers = self._get_layers_files(ds, raise_errors=True) 1027 return layers 1028 except Exception: 1029 # Fallback to API if no layers found in files 1030 try: 1031 return self._get_layers_db(ds, include_geometry=include_geometry, raise_errors=True) 1032 except Exception as e: 1033 if errors == 'error': 1034 raise 1035 else: 1036 warnings.warn(f"No layer data found for {collection}/{segment_path}: {e}") 1037 return None 1038 elif source == 'files': 1039 try: 1040 return self._get_layers_files(ds, raise_errors=True) 1041 except Exception as e: 1042 if errors == 'error': 1043 raise 1044 else: 1045 warnings.warn(f"No layer files found for {collection}/{segment_path}: {e}") 1046 return None 1047 elif source == 'db': 1048 try: 1049 return self._get_layers_db(ds, include_geometry=include_geometry, raise_errors=True) 1050 except Exception as e: 1051 if errors == 'error': 1052 raise 1053 else: 1054 warnings.warn(f"No layer data in DB for {collection}/{segment_path}: {e}") 1055 return None 1056 else: 1057 raise ValueError("Invalid source specified. Must be one of: 'auto', 'files', 'db'.")
78 def __init__(self, 79 collection_url: str = "https://data.cresis.ku.edu/data/", 80 cache_dir: str = None, 81 stac_parquet_href: str = None, 82 sync_catalogs: bool = True): 83 """ 84 Initialize connection to OPR data archive. 85 86 Parameters 87 ---------- 88 collection_url : str, optional 89 Base URL for OPR data collection. 90 cache_dir : str, optional 91 Directory to cache downloaded data for faster repeated access. 92 stac_parquet_href : str, optional 93 Path or URL pattern to STAC catalog parquet files. When *None* 94 (default), the local cache is used if available, falling back to 95 the S3 catalog. 96 sync_catalogs : bool, optional 97 If True (default), sync OPR STAC catalogs to a local cache 98 before resolving the catalog path. Uses ETag-based change 99 detection so repeated calls are cheap (single HTTP request). 100 """ 101 self.collection_url = collection_url 102 self.cache_dir = cache_dir 103 self._user_set_href = stac_parquet_href is not None 104 if sync_catalogs and not self._user_set_href: 105 sync_opr_catalogs() 106 self.stac_parquet_href = stac_parquet_href or get_opr_catalog_path() 107 108 self.fsspec_cache_kwargs = {} 109 self.fsspec_url_prefix = '' 110 if cache_dir: 111 self.fsspec_cache_kwargs['cache_storage'] = cache_dir 112 self.fsspec_cache_kwargs['check_files'] = True 113 self.fsspec_url_prefix = 'filecache::' 114 115 self.db_layers_metadata = None # Cache for OPS database layers metadata 116 117 # Lazy-initialized DuckDB client for STAC queries — reusing a single 118 # session caches remote file metadata (parquet footers) across calls 119 self._duckdb_client = None
Initialize connection to OPR data archive.
Parameters
- collection_url (str, optional): Base URL for OPR data collection.
- cache_dir (str, optional): Directory to cache downloaded data for faster repeated access.
- stac_parquet_href (str, optional): Path or URL pattern to STAC catalog parquet files. When None (default), the local cache is used if available, falling back to the S3 catalog.
- sync_catalogs (bool, optional): If True (default), sync OPR STAC catalogs to a local cache before resolving the catalog path. Uses ETag-based change detection so repeated calls are cheap (single HTTP request).
121 @property 122 def duckdb_client(self): 123 """Lazy-initialized DuckDB client, recreated after pickling.""" 124 if self._duckdb_client is None: 125 self._duckdb_client = DuckdbClient() 126 return self._duckdb_client
Lazy-initialized DuckDB client, recreated after pickling.
def query_frames(self, collections: list[str] = None, segment_paths: list[str] = None,
                 geometry = None, date_range: tuple = None, properties: dict = None,
                 max_items: int = None, exclude_geometry: bool = False,
                 search_kwargs: dict = None) -> gpd.GeoDataFrame:
    """
    Query STAC catalog for radar frames matching search criteria.

    Multiple parameters are combined with AND logic. Lists of values passed to a
    single parameter are treated with OR logic (any value matches).

    Parameters
    ----------
    collections : list[str] or str, optional
        Collection name(s) (e.g., "2022_Antarctica_BaslerMKB").
    segment_paths : list[str] or str, optional
        Segment path(s) in format "YYYYMMDD_NN" (e.g., "20230126_01").
    geometry : shapely geometry or dict, optional
        Geospatial geometry to filter by intersection.
    date_range : str, optional
        Date range in ISO format (e.g., "2021-01-01/2025-06-01").
    properties : dict, optional
        Additional STAC properties to filter by (exact equality).
    max_items : int, optional
        Maximum number of items to return.
    exclude_geometry : bool, optional
        If True, exclude geometry field to reduce response size.
    search_kwargs : dict, optional
        Additional keyword arguments for STAC search.

    Returns
    -------
    gpd.GeoDataFrame or None
        GeoDataFrame of matching STAC items with columns for collection,
        geometry, properties, and assets. Returns None if no matches found.
    """
    # Use None sentinels instead of mutable {} defaults (shared across calls).
    if properties is None:
        properties = {}
    if search_kwargs is None:
        search_kwargs = {}

    search_params = {}

    # Exclude geometry -- do not return the geometry field to reduce response size
    if exclude_geometry:
        search_params['exclude'] = ['geometry']

    # Handle collections (seasons); a bare string becomes a single-element list
    if collections is not None:
        search_params['collections'] = [collections] if isinstance(collections, str) else collections

    # Handle geometry filtering
    if geometry is not None:
        if hasattr(geometry, '__geo_interface__'):
            geometry = geometry.__geo_interface__

        # Fix geometries that cross the antimeridian
        geometry = antimeridian.fix_geojson(geometry, reverse=True)

        search_params['intersects'] = geometry

    # Handle date range filtering
    if date_range is not None:
        search_params['datetime'] = date_range

    # Handle max_items
    if max_items is not None:
        search_params['limit'] = max_items

    # Handle segment_paths filtering using CQL2
    filter_conditions = []

    if segment_paths is not None:
        segment_paths = [segment_paths] if isinstance(segment_paths, str) else segment_paths

        # Create OR conditions for segment paths
        segment_conditions = []
        for segment_path in segment_paths:
            try:
                date_str, segment_num_str = segment_path.split('_')
                segment_num = int(segment_num_str)

                # Create AND condition for this specific segment
                segment_condition = {
                    "op": "and",
                    "args": [
                        {
                            "op": "=",
                            "args": [{"property": "opr:date"}, date_str]
                        },
                        {
                            "op": "=",
                            "args": [{"property": "opr:segment"}, segment_num]
                        }
                    ]
                }
                segment_conditions.append(segment_condition)
            except ValueError:
                print(f"Warning: Invalid segment_path format '{segment_path}'. Expected format: YYYYMMDD_NN")
                continue

        if segment_conditions:
            if len(segment_conditions) == 1:
                filter_conditions.append(segment_conditions[0])
            else:
                # Multiple segments - combine with OR
                filter_conditions.append({
                    "op": "or",
                    "args": segment_conditions
                })

    # Add any additional property filters
    for key, value in properties.items():
        filter_conditions.append({
            "op": "=",
            "args": [{"property": key}, value]
        })

    # Combine all filter conditions with AND
    if filter_conditions:
        if len(filter_conditions) == 1:
            filter_expr = filter_conditions[0]
        else:
            filter_expr = {
                "op": "and",
                "args": filter_conditions
            }

        search_params['filter'] = filter_expr

    # Add any extra kwargs to search
    search_params.update(search_kwargs)

    # Perform the search; the client may return either a feature list or a
    # GeoJSON-style dict with a 'features' key.
    items = self.duckdb_client.search(self.stac_parquet_href, **search_params)
    if isinstance(items, dict):
        items = items['features']

    if not items or len(items) == 0:
        warnings.warn("No items found matching the query criteria", UserWarning)
        return None

    # Convert to GeoDataFrame
    items_df = gpd.GeoDataFrame(items)
    # Index rows by their STAC item id
    items_df = items_df.set_index(items_df['id'])
    items_df.index.name = 'stac_item_id'
    # Set the geometry column
    if 'geometry' in items_df.columns and not exclude_geometry:
        items_df = items_df.set_geometry(items_df['geometry'].apply(shapely.geometry.shape))
        items_df.crs = "EPSG:4326"

    # Reorder the columns, leaving any extra columns at the end
    desired_order = ['collection', 'geometry', 'properties', 'assets']
    items_df = items_df[[col for col in desired_order if col in items_df.columns] + list(items_df.columns.difference(desired_order))]

    return items_df
Query STAC catalog for radar frames matching search criteria.
Multiple parameters are combined with AND logic. Lists of values passed to a single parameter are treated with OR logic (any value matches).
Parameters
- collections (list[str] or str, optional): Collection name(s) (e.g., "2022_Antarctica_BaslerMKB").
- segment_paths (list[str] or str, optional): Segment path(s) in format "YYYYMMDD_NN" (e.g., "20230126_01").
- geometry (shapely geometry or dict, optional): Geospatial geometry to filter by intersection.
- date_range (str, optional): Date range in ISO format (e.g., "2021-01-01/2025-06-01").
- properties (dict, optional): Additional STAC properties to filter by.
- max_items (int, optional): Maximum number of items to return.
- exclude_geometry (bool, optional): If True, exclude geometry field to reduce response size.
- search_kwargs (dict, optional): Additional keyword arguments for STAC search.
Returns
- gpd.GeoDataFrame or None: GeoDataFrame of matching STAC items with columns for collection, geometry, properties, and assets. Returns None if no matches found.
313 def load_frames(self, stac_items: gpd.GeoDataFrame, 314 data_product: str = "CSARP_standard", 315 image: Union[int, None] = None, 316 merge_flights: bool = False, 317 skip_errors: bool = False, 318 allow_unlisted_products: bool = False 319 ) -> Union[list[xr.Dataset], xr.Dataset]: 320 """ 321 Load multiple radar frames from STAC items. 322 323 Parameters 324 ---------- 325 stac_items : gpd.GeoDataFrame 326 STAC items returned from query_frames. 327 data_product : str, optional 328 Data product to load (e.g., "CSARP_standard", "CSARP_qlook"). 329 image : int or None, optional 330 The image number to load for each frame. If None (default), loads the 331 combined image. If specified, `allow_unlisted_products` must be True. 332 merge_flights : bool, optional 333 If True, merge frames from the same segment into single Datasets. 334 skip_errors : bool, optional 335 If True, skip failed frames and continue loading others. 336 allow_unlisted_products : bool, optional 337 If True, attempt to load the specified data product even if it's not 338 listed in the item's assets. See `load_frame` for details. 339 340 Returns 341 ------- 342 list[xr.Dataset] or xr.Dataset 343 List of radar frame Datasets, or list of merged Datasets if 344 merge_flights=True. 345 """ 346 frames = [] 347 348 for idx, item in stac_items.iterrows(): 349 try: 350 frame = self.load_frame(item, data_product, image=image, allow_unlisted_products=allow_unlisted_products) 351 frames.append(frame) 352 except Exception as e: 353 print(f"Error loading frame for item {item.get('id', 'unknown')}: {e}") 354 if skip_errors: 355 continue 356 else: 357 raise e 358 359 if merge_flights: 360 return opr_tools.merge_frames(frames) 361 else: 362 return frames
Load multiple radar frames from STAC items.
Parameters
- stac_items (gpd.GeoDataFrame): STAC items returned from query_frames.
- data_product (str, optional): Data product to load (e.g., "CSARP_standard", "CSARP_qlook").
- image (int or None, optional):
The image number to load for each frame. If None (default), loads the
combined image. If specified,
`allow_unlisted_products` must be True.
- merge_flights (bool, optional): If True, merge frames from the same segment into single Datasets.
- skip_errors (bool, optional): If True, skip failed frames and continue loading others.
- allow_unlisted_products (bool, optional):
If True, attempt to load the specified data product even if it's not
listed in the item's assets. See
`load_frame` for details.
Returns
- list[xr.Dataset] or xr.Dataset: List of radar frame Datasets, or list of merged Datasets if merge_flights=True.
def load_frame(self, stac_item, data_product: str = "CSARP_standard",
               image: Union[int, None] = None,
               allow_unlisted_products: bool = False) -> "xr.Dataset":
    """
    Load a single radar frame from a STAC item.

    Parameters
    ----------
    stac_item : dict or pd.Series
        STAC item containing asset URLs.
    data_product : str, optional
        Data product to load (e.g., "CSARP_standard", "CSARP_qlook").
    image : int or None, optional
        The image number to load for this frame. If None (default), loads the
        combined image. If specified, `allow_unlisted_products` must be True.
    allow_unlisted_products : bool, optional
        If True, attempt to load the specified data product even if it's not
        listed in the item's assets. This can be useful for loading non-standard
        products. If set to True and the data product is not found in the STAC
        item assets, the method will attempt to construct the URL based on any
        available CSARP_* asset. (If the frame is entirely unlisted, you can use
        `load_frame_url` instead.)

    Returns
    -------
    xr.Dataset
        Radar frame with coordinates slow_time and twtt, and data variables
        including Data, Latitude, Longitude, Elevation, Surface, etc.

    Raises
    ------
    ValueError
        If the requested product is not listed (and unlisted products are not
        allowed), no template asset can be found, the asset has no href, or an
        image number is given without allow_unlisted_products=True.
    """

    assets = stac_item['assets']

    data_asset = assets.get(data_product)

    if data_asset:
        # The asset does exist in the STAC item, so just get the URL from the asset
        url = data_asset.get('href')
    elif allow_unlisted_products:
        # Find any available CSARP_* asset to use as a template for constructing the URL
        template_asset_name = None
        template_asset_url = None
        for asset_name, asset_info in assets.items():
            if asset_name.startswith('CSARP_'):
                template_asset_name = asset_name
                template_asset_url = asset_info.get('href')
                break

        if not template_asset_url:
            available_assets = list(assets.keys())
            raise ValueError(f"No CSARP_* asset found in STAC item to use as a template for constructing URL for {data_product}. Available assets: {available_assets}")

        # Swap the template's product name for the requested one.
        url = template_asset_url.replace(template_asset_name, data_product)
    else:
        available_assets = list(assets.keys())
        raise ValueError(f"{data_product} asset not found in STAC item. Available assets: {available_assets}")

    if not url:
        raise ValueError(f"No href found in {data_product} asset")

    # If a specific image is requested, modify the URL to point to that image
    if image is not None:
        if not allow_unlisted_products:
            raise ValueError("Specifying an image number requires allow_unlisted_products=True to construct the URL")
        url = url.replace("Data_", f"Data_img_{image:02d}_")

    # Load the frame using the existing method
    return self.load_frame_url(url)
Load a single radar frame from a STAC item.
Parameters
- stac_item (dict or pd.Series): STAC item containing asset URLs.
- data_product (str, optional): Data product to load (e.g., "CSARP_standard", "CSARP_qlook").
- image (int or None, optional):
The image number to load for this frame. If None (default), loads the
combined image. If specified,
`allow_unlisted_products` must be True.
- allow_unlisted_products (bool, optional):
If True, attempt to load the specified data product even if it's not
listed in the item's assets. This can be useful for loading non-standard
products. If set to True and the data product is not found in the STAC
item assets, the method will attempt to construct the URL based on any
available CSARP_* asset. (If the frame is entirely unlisted, you can use
`load_frame_url` instead.)
Returns
- xr.Dataset: Radar frame with coordinates slow_time and twtt, and data variables including Data, Latitude, Longitude, Elevation, Surface, etc.
def load_frame_url(self, url: str) -> xr.Dataset:
    """
    Load a radar frame directly from a URL.

    Automatically detects and handles both HDF5 and legacy MATLAB formats.

    Parameters
    ----------
    url : str
        URL to radar frame data file (.mat).

    Returns
    -------
    xr.Dataset
        Radar frame with CF-compliant metadata and citation information.
    """

    file = self._open_file(url)

    # Detect the on-disk format: modern OPR .mat files are HDF5-based;
    # h5py raises OSError on legacy (v5) MATLAB files, so fall back.
    filetype = None
    try:
        ds = self._load_frame_hdf5(file)
        filetype = 'hdf5'
    except OSError:
        ds = self._load_frame_matlab(file)
        filetype = 'matlab'

    # Add the source URL as an attribute
    ds.attrs['source_url'] = url

    # Apply CF-compliant attributes
    ds = apply_cf_compliant_attrs(ds)

    # Get the season and segment from the URL, e.g.
    # .../2022_Antarctica_BaslerMKB/CSARP_standard/20230109_01/Data_20230109_01_001.mat
    match = re.search(r'(\d{4}_\w+_[A-Za-z0-9]+)\/([\w_]+)\/[\d_]+\/[\w]+(\d{8}_\d{2}_\d{3})', url)
    if match:
        collection, data_product, granule = match.groups()
        date, segment_id, frame_id = granule.split('_')
        ds.attrs['collection'] = collection
        ds.attrs['data_product'] = data_product
        ds.attrs['granule'] = granule
        ds.attrs['segment_path'] = f"{date}_{segment_id}"
        ds.attrs['date_str'] = date
        ds.attrs['segment'] = int(segment_id)
        ds.attrs['frame'] = int(frame_id)

        # Load citation information (only possible when the URL matched,
        # since segment_path/collection come from the match above)
        result = ops_api.get_segment_metadata(segment_name=ds.attrs['segment_path'], season_name=collection)
        if result:
            if isinstance(result['data'], str):
                warnings.warn(f"Warning: Unexpected result from ops_api: {result['data']}", UserWarning)
            else:
                # Collapse single-valued lists to scalars; multi-valued
                # lists become sets (deduplicated, unordered).
                result_data = {}
                for key, value in result['data'].items():
                    if len(value) == 1:
                        result_data[key] = value[0]
                    elif len(value) > 1:
                        result_data[key] = set(value)

                if 'dois' in result_data:
                    ds.attrs['doi'] = result_data['dois']
                if 'rors' in result_data:
                    ds.attrs['ror'] = result_data['rors']
                if 'funding_sources' in result_data:
                    ds.attrs['funder_text'] = result_data['funding_sources']

    # Add the rest of the Matlab parameters as dataset attributes
    if filetype == 'hdf5':
        ds.attrs['mimetype'] = 'application/x-hdf5'
        ds.attrs.update(decode_hdf5_matlab_variable(h5py.File(file, 'r'),
                                                    skip_variables=True,
                                                    skip_errors=True))
    elif filetype == 'matlab':
        ds.attrs['mimetype'] = 'application/x-matlab-data'
        ds.attrs.update(extract_legacy_mat_attributes(file,
                                                      skip_keys=ds.keys(),
                                                      skip_errors=True))

    return ds
Load a radar frame directly from a URL.
Automatically detects and handles both HDF5 and legacy MATLAB formats.
Parameters
- url (str): URL to radar frame data file (.mat).
Returns
- xr.Dataset: Radar frame with CF-compliant metadata and citation information.
604 def get_collections(self) -> list: 605 """ 606 Get all available collections (seasons/campaigns) in STAC catalog. 607 608 Returns 609 ------- 610 list[dict] 611 List of collection dictionaries with id, description, and extent. 612 """ 613 614 return self.duckdb_client.get_collections(self.stac_parquet_href)
Get all available collections (seasons/campaigns) in STAC catalog.
Returns
- list[dict]: List of collection dictionaries with id, description, and extent.
616 def get_segments(self, collection_id: str) -> list: 617 """ 618 Get all segments (flights) within a collection. 619 620 Parameters 621 ---------- 622 collection_id : str 623 STAC collection ID (e.g., "2022_Antarctica_BaslerMKB"). 624 625 Returns 626 ------- 627 list[dict] 628 List of segment dictionaries with segment_path, date, flight_number, 629 and frame count. 630 """ 631 # Query STAC API for all items in collection (exclude geometry for better performance) 632 items = self.query_frames(collections=[collection_id], exclude_geometry=True) 633 634 if items is None or len(items) == 0: 635 print(f"No items found in collection '{collection_id}'") 636 return [] 637 638 # Group items by segment (opr:date + opr:segment) 639 segments = {} 640 for idx, item in items.iterrows(): 641 properties = item['properties'] 642 date = properties['opr:date'] 643 flight_num = properties['opr:segment'] 644 645 if date and flight_num is not None: 646 segment_path = f"{date}_{flight_num:02d}" 647 648 if segment_path not in segments: 649 segments[segment_path] = { 650 'segment_path': segment_path, 651 'date': date, 652 'flight_number': flight_num, 653 'collection': collection_id, 654 'frames': [], 655 'item_count': 0 656 } 657 658 segments[segment_path]['frames'].append(properties.get('opr:frame')) 659 segments[segment_path]['item_count'] += 1 660 661 # Sort segments by date and flight number 662 segment_list = list(segments.values()) 663 segment_list.sort(key=lambda x: (x['date'], x['flight_number'])) 664 665 return segment_list
Get all segments (flights) within a collection.
Parameters
- collection_id (str): STAC collection ID (e.g., "2022_Antarctica_BaslerMKB").
Returns
- list[dict]: List of segment dictionaries with segment_path, date, flight_number, and frame count.
786 def load_layers_file(self, url: str) -> xr.Dataset: 787 """ 788 Load layer data from CSARP_layer file. 789 790 Parameters 791 ---------- 792 url : str 793 URL or path to layer file. 794 795 Returns 796 ------- 797 xr.Dataset 798 Layer data with coordinates slow_time and layer, and data variables 799 twtt, quality, type, lat, lon, and elev. 800 Layer data in a standardized format with coordinates: 801 - slow_time: GPS time converted to datetime 802 And data variables: 803 - twtt: Two-way travel time for each layer 804 - quality: Quality values for each layer 805 - type: Type values for each layer 806 - lat, lon, elev: Geographic coordinates 807 - id: Layer IDs 808 """ 809 810 file = self._open_file(url) 811 812 ds = self._load_layers(file) 813 814 # Add the source URL as an attribute 815 ds.attrs['source_url'] = url 816 817 # Apply common manipulations to match the expected structure 818 # Convert GPS time to datetime coordinate 819 if 'gps_time' in ds.variables: 820 slow_time_dt = pd.to_datetime(ds['gps_time'].values, unit='s') 821 ds = ds.assign_coords(slow_time=('slow_time', slow_time_dt)) 822 823 # Set slow_time as the main coordinate and remove gps_time from data_vars 824 if 'slow_time' not in ds.dims: 825 ds = ds.swap_dims({'gps_time': 'slow_time'}) 826 827 # Remove gps_time from data_vars if it exists there to avoid conflicts 828 if ('gps_time' in ds.data_vars) or ('gps_time' in ds.coords): 829 ds = ds.drop_vars('gps_time') 830 831 # Sort by slow_time if it exists 832 if 'slow_time' in ds.coords: 833 ds = ds.sortby('slow_time') 834 835 return ds
Load layer data from CSARP_layer file.
Parameters
- url (str): URL or path to layer file.
Returns
- xr.Dataset: Layer data in a standardized format with coordinates:
- slow_time: GPS time converted to datetime
And data variables:
- twtt: Two-way travel time for each layer
- quality: Quality values for each layer
- type: Type values for each layer
- lat, lon, elev: Geographic coordinates
- id: Layer IDs
def get_layers(self, ds: "Union[xr.Dataset, dict]", source: str = 'auto', include_geometry=True, errors='warn') -> dict:
    """
    Load layer picks from files or database.

    Tries files first, falls back to database if source='auto'.

    Parameters
    ----------
    ds : Union[xr.Dataset, dict]
        Radar frame Dataset or STAC item.
    source : {'auto', 'files', 'db'}, optional
        Source for layers. 'auto' tries files then falls back to database.
    include_geometry : bool, optional
        If True, include geometry information when fetching from the API.
    errors : str, optional
        How to handle missing layer data: 'warn' (default) returns None with a warning,
        'error' raises an exception.

    Returns
    -------
    dict or None
        A dictionary mapping layer IDs to their corresponding data, or None if no layers
        found and errors='warn'.

    Raises
    ------
    ValueError
        If `source` is not one of 'auto', 'files', 'db'.
    """
    collection, segment_path, _ = self._extract_segment_info(ds)

    if source not in ('auto', 'files', 'db'):
        raise ValueError("Invalid source specified. Must be one of: 'auto', 'files', 'db'.")

    # Small closures so each backend is attempted identically below.
    def _from_files():
        return self._get_layers_files(ds, raise_errors=True)

    def _from_db():
        return self._get_layers_db(ds, include_geometry=include_geometry, raise_errors=True)

    if source == 'auto':
        # Files first, then fall back to the OPS database API.
        attempts = [_from_files, _from_db]
        failure_msg = "No layer data found for"
    elif source == 'files':
        attempts = [_from_files]
        failure_msg = "No layer files found for"
    else:
        attempts = [_from_db]
        failure_msg = "No layer data in DB for"

    last_err = None
    for attempt in attempts:
        try:
            return attempt()
        except Exception as e:
            last_err = e

    # All attempts failed: raise the last error or warn and return None.
    if errors == 'error':
        raise last_err
    warnings.warn(f"{failure_msg} {collection}/{segment_path}: {last_err}")
    return None
Load layer picks from files or database.
Tries files first, falls back to database if source='auto'.
Parameters
- ds (Union[xr.Dataset, dict]): Radar frame Dataset or STAC item.
- source ({'auto', 'files', 'db'}, optional): Source for layers. 'auto' tries files then falls back to database.
- include_geometry (bool, optional): If True, include geometry information when fetching from the API.
- errors (str, optional): How to handle missing layer data: 'warn' (default) returns None with a warning, 'error' raises an exception.
Returns
- dict or None: A dictionary mapping layer IDs to their corresponding data, or None if no layers found and errors='warn'.