xopr.opr_access
Core data access interface for Open Polar Radar (OPR) datasets.
This module provides the primary interface for discovering, querying, and loading polar ice-penetrating radar data from the Open Polar Radar archive. It implements a STAC (SpatioTemporal Asset Catalog) based workflow for efficient data discovery and retrieval.
Primary Class
OPRConnection : Main interface for accessing OPR data. Provides methods for querying STAC catalogs, loading radar frames, and retrieving layer picks from both file-based and database sources.
Typical Workflow
- Create an OPRConnection object, optionally with local caching
- Query the STAC catalog to find radar data matching your criteria
- Load radar frames as xarray Datasets
- Optionally load associated layer picks
- Process and visualize the data
See this notebook for an end-to-end example: https://docs.englacial.org/xopr/demo-notebook/
Examples
>>> import xopr
>>> # Connect to OPR with local caching
>>> opr = xopr.OPRConnection(cache_dir='radar_cache')
>>>
>>> # Query for data in a specific collection and segment
>>> stac_items = opr.query_frames(
... collections=['2022_Antarctica_BaslerMKB'],
... segment_paths=['20230109_01']
... )
>>>
>>> # Load the radar frames
>>> frames = opr.load_frames(stac_items)
>>>
>>> # Get layer picks (surface/bed)
>>> layers = opr.get_layers(frames[0])
See Also
xopr.geometry: Geographic utilities for region selection and projection
xopr.radar_util: Processing functions for radar data and layers
ops_api: Interface to the OPS database API
1""" 2Core data access interface for Open Polar Radar (OPR) datasets. 3 4This module provides the primary interface for discovering, querying, and loading 5polar ice-penetrating radar data from the Open Polar Radar archive. It implements 6a STAC (SpatioTemporal Asset Catalog) based workflow for efficient data discovery 7and retrieval. 8 9Primary Class 10------------- 11OPRConnection : Main interface for accessing OPR data 12 Provides methods for querying STAC catalogs, loading radar frames, 13 and retrieving layer picks from both file-based and database sources. 14 15Typical Workflow 16---------------- 171. Create an OPRConnection object, optionally with local caching 182. Query the STAC catalog to find radar data matching your criteria 193. Load radar frames as xarray Datasets 204. Optionally load associated layer picks 215. Process and visualize the data 22 23See this notebook for an end-to-end example: 24- https://docs.englacial.org/xopr/demo-notebook/ 25 26Examples 27-------- 28>>> import xopr 29>>> # Connect to OPR with local caching 30>>> opr = xopr.OPRConnection(cache_dir='radar_cache') 31>>> 32>>> # Query for data in a specific collection and segment 33>>> stac_items = opr.query_frames( 34... collections=['2022_Antarctica_BaslerMKB'], 35... segment_paths=['20230109_01'] 36... ) 37>>> 38>>> # Load the radar frames 39>>> frames = opr.load_frames(stac_items) 40>>> 41>>> # Get layer picks (surface/bed) 42>>> layers = opr.get_layers(frames[0]) 43 44See Also 45-------- 46xopr.geometry : Geographic utilities for region selection and projection 47xopr.radar_util : Processing functions for radar data and layers 48ops_api : Interface to the OPS database API 49""" 50 51import re 52import warnings 53from typing import Union 54 55import antimeridian 56import fsspec 57import geopandas as gpd 58import h5py 59import hdf5storage 60import numpy as np 61import pandas as pd 62import scipy.io 63import shapely 64import xarray as xr 65from rustac import DuckdbClient 66 67from . 
import opr_tools, ops_api 68from .cf_units import apply_cf_compliant_attrs 69from .matlab_attribute_utils import ( 70 decode_hdf5_matlab_variable, 71 extract_legacy_mat_attributes, 72) 73from .stac_cache import get_opr_catalog_path, sync_opr_catalogs 74 75 76class OPRConnection: 77 def __init__(self, 78 collection_url: str = "https://data.cresis.ku.edu/data/", 79 cache_dir: str = None, 80 stac_parquet_href: str = None, 81 sync_catalogs: bool = True): 82 """ 83 Initialize connection to OPR data archive. 84 85 Parameters 86 ---------- 87 collection_url : str, optional 88 Base URL for OPR data collection. 89 cache_dir : str, optional 90 Directory to cache downloaded data for faster repeated access. 91 stac_parquet_href : str, optional 92 Path or URL pattern to STAC catalog parquet files. When *None* 93 (default), the local cache is used if available, falling back to 94 the S3 catalog. 95 sync_catalogs : bool, optional 96 If True (default), sync OPR STAC catalogs to a local cache 97 before resolving the catalog path. Uses ETag-based change 98 detection so repeated calls are cheap (single HTTP request). 
99 """ 100 self.collection_url = collection_url 101 self.cache_dir = cache_dir 102 self._user_set_href = stac_parquet_href is not None 103 if sync_catalogs and not self._user_set_href: 104 sync_opr_catalogs() 105 self.stac_parquet_href = stac_parquet_href or get_opr_catalog_path() 106 107 self.fsspec_cache_kwargs = {} 108 self.fsspec_url_prefix = '' 109 if cache_dir: 110 self.fsspec_cache_kwargs['cache_storage'] = cache_dir 111 self.fsspec_cache_kwargs['check_files'] = True 112 self.fsspec_url_prefix = 'filecache::' 113 114 self.db_layers_metadata = None # Cache for OPS database layers metadata 115 116 # Lazy-initialized DuckDB client for STAC queries — reusing a single 117 # session caches remote file metadata (parquet footers) across calls 118 self._duckdb_client = None 119 120 @property 121 def duckdb_client(self): 122 """Lazy-initialized DuckDB client, recreated after pickling.""" 123 if self._duckdb_client is None: 124 self._duckdb_client = DuckdbClient() 125 return self._duckdb_client 126 127 def __getstate__(self): 128 state = self.__dict__.copy() 129 state['_duckdb_client'] = None # DuckdbClient can't be pickled 130 return state 131 132 def _open_file(self, url): 133 """Helper method to open files with appropriate caching.""" 134 if self.fsspec_url_prefix: 135 return fsspec.open_local(f"{self.fsspec_url_prefix}{url}", filecache=self.fsspec_cache_kwargs) 136 else: 137 return fsspec.open_local(f"simplecache::{url}", simplecache=self.fsspec_cache_kwargs) 138 139 def _extract_segment_info(self, segment): 140 """Extract collection, segment_path, and frame from Dataset or dict.""" 141 if isinstance(segment, xr.Dataset): 142 return (segment.attrs.get('collection'), 143 segment.attrs.get('segment_path'), 144 segment.attrs.get('frame')) 145 else: 146 return (segment['collection'], 147 f"{segment['properties'].get('opr:date')}_{segment['properties'].get('opr:segment'):02d}", 148 segment['properties'].get('opr:frame')) 149 150 def query_frames(self, collections: 
list[str] = None, segment_paths: list[str] = None, 151 geometry = None, date_range: tuple = None, properties: dict = {}, 152 max_items: int = None, exclude_geometry: bool = False, 153 search_kwargs: dict = {}) -> gpd.GeoDataFrame: 154 """ 155 Query STAC catalog for radar frames matching search criteria. 156 157 Multiple parameters are combined with AND logic. Lists of values passed to a 158 single parameter are treated with OR logic (any value matches). 159 160 Parameters 161 ---------- 162 collections : list[str] or str, optional 163 Collection name(s) (e.g., "2022_Antarctica_BaslerMKB"). 164 segment_paths : list[str] or str, optional 165 Segment path(s) in format "YYYYMMDD_NN" (e.g., "20230126_01"). 166 geometry : shapely geometry or dict, optional 167 Geospatial geometry to filter by intersection. 168 date_range : str, optional 169 Date range in ISO format (e.g., "2021-01-01/2025-06-01"). 170 properties : dict, optional 171 Additional STAC properties to filter by. 172 max_items : int, optional 173 Maximum number of items to return. 174 exclude_geometry : bool, optional 175 If True, exclude geometry field to reduce response size. 176 search_kwargs : dict, optional 177 Additional keyword arguments for STAC search. 178 179 Returns 180 ------- 181 gpd.GeoDataFrame or None 182 GeoDataFrame of matching STAC items with columns for collection, 183 geometry, properties, and assets. Returns None if no matches found. 
184 """ 185 186 search_params = {} 187 188 # Exclude geometry -- do not return the geometry field to reduce response size 189 if exclude_geometry: 190 search_params['exclude'] = ['geometry'] 191 192 # Handle collections (seasons) 193 if collections is not None: 194 search_params['collections'] = [collections] if isinstance(collections, str) else collections 195 196 # Handle geometry filtering 197 if geometry is not None: 198 if hasattr(geometry, '__geo_interface__'): 199 geometry = geometry.__geo_interface__ 200 201 # Fix geometries that cross the antimeridian 202 geometry = antimeridian.fix_geojson(geometry, reverse=True) 203 204 search_params['intersects'] = geometry 205 206 # Handle date range filtering 207 if date_range is not None: 208 search_params['datetime'] = date_range 209 210 # Handle max_items 211 if max_items is not None: 212 search_params['limit'] = max_items 213 214 # Handle segment_paths filtering using CQL2 215 filter_conditions = [] 216 217 if segment_paths is not None: 218 segment_paths = [segment_paths] if isinstance(segment_paths, str) else segment_paths 219 220 # Create OR conditions for segment paths 221 segment_conditions = [] 222 for segment_path in segment_paths: 223 try: 224 date_str, segment_num_str = segment_path.split('_') 225 segment_num = int(segment_num_str) 226 227 # Create AND condition for this specific segment 228 segment_condition = { 229 "op": "and", 230 "args": [ 231 { 232 "op": "=", 233 "args": [{"property": "opr:date"}, date_str] 234 }, 235 { 236 "op": "=", 237 "args": [{"property": "opr:segment"}, segment_num] 238 } 239 ] 240 } 241 segment_conditions.append(segment_condition) 242 except ValueError: 243 print(f"Warning: Invalid segment_path format '{segment_path}'. 
Expected format: YYYYMMDD_NN") 244 continue 245 246 if segment_conditions: 247 if len(segment_conditions) == 1: 248 filter_conditions.append(segment_conditions[0]) 249 else: 250 # Multiple segments - combine with OR 251 filter_conditions.append({ 252 "op": "or", 253 "args": segment_conditions 254 }) 255 256 # Add any additional property filters 257 for key, value in properties.items(): 258 filter_conditions.append({ 259 "op": "=", 260 "args": [{"property": key}, value] 261 }) 262 263 # Combine all filter conditions with AND 264 if filter_conditions: 265 if len(filter_conditions) == 1: 266 filter_expr = filter_conditions[0] 267 else: 268 filter_expr = { 269 "op": "and", 270 "args": filter_conditions 271 } 272 273 search_params['filter'] = filter_expr 274 275 # Add any extra kwargs to search 276 search_params.update(search_kwargs) 277 278 #print(search_params) # TODO: Remove 279 280 # Perform the search 281 # from rustac import DuckdbClient 282 items = self.duckdb_client.search(self.stac_parquet_href, **search_params) 283 if isinstance(items, dict): 284 items = items['features'] 285 286 if not items or len(items) == 0: 287 warnings.warn("No items found matching the query criteria", UserWarning) 288 return None 289 290 # Convert to GeoDataFrame 291 items_df = gpd.GeoDataFrame(items) 292 # Set index 293 items_df = items_df.set_index(items_df['id']) 294 items_df.index.name = 'stac_item_id' 295 # Set the geometry column 296 if 'geometry' in items_df.columns and not exclude_geometry: 297 items_df = items_df.set_geometry(items_df['geometry'].apply(shapely.geometry.shape)) 298 items_df.crs = "EPSG:4326" 299 300 # Reorder the columns, leaving any extra columns at the end 301 desired_order = ['collection', 'geometry', 'properties', 'assets'] 302 items_df = items_df[[col for col in desired_order if col in items_df.columns] + list(items_df.columns.difference(desired_order))] 303 304 return items_df 305 306 def load_frames(self, stac_items: gpd.GeoDataFrame, 307 data_product: 
str = "CSARP_standard", 308 merge_flights: bool = False, 309 skip_errors: bool = False, 310 ) -> Union[list[xr.Dataset], xr.Dataset]: 311 """ 312 Load multiple radar frames from STAC items. 313 314 Parameters 315 ---------- 316 stac_items : gpd.GeoDataFrame 317 STAC items returned from query_frames. 318 data_product : str, optional 319 Data product to load (e.g., "CSARP_standard", "CSARP_qlook"). 320 merge_flights : bool, optional 321 If True, merge frames from the same segment into single Datasets. 322 skip_errors : bool, optional 323 If True, skip failed frames and continue loading others. 324 325 Returns 326 ------- 327 list[xr.Dataset] or xr.Dataset 328 List of radar frame Datasets, or list of merged Datasets if 329 merge_flights=True. 330 """ 331 frames = [] 332 333 for idx, item in stac_items.iterrows(): 334 try: 335 frame = self.load_frame(item, data_product) 336 frames.append(frame) 337 except Exception as e: 338 print(f"Error loading frame for item {item.get('id', 'unknown')}: {e}") 339 if skip_errors: 340 continue 341 else: 342 raise e 343 344 if merge_flights: 345 return opr_tools.merge_frames(frames) 346 else: 347 return frames 348 349 def load_frame(self, stac_item, data_product: str = "CSARP_standard") -> xr.Dataset: 350 """ 351 Load a single radar frame from a STAC item. 352 353 Parameters 354 ---------- 355 stac_item : dict or pd.Series 356 STAC item containing asset URLs. 357 data_product : str, optional 358 Data product to load (e.g., "CSARP_standard", "CSARP_qlook"). 359 360 Returns 361 ------- 362 xr.Dataset 363 Radar frame with coordinates slow_time and twtt, and data variables 364 including Data, Latitude, Longitude, Elevation, Surface, etc. 365 """ 366 367 assets = stac_item['assets'] 368 369 # Get the data asset 370 data_asset = assets.get(data_product) 371 if not data_asset: 372 available_assets = list(assets.keys()) 373 raise ValueError(f"No {data_product} asset found. 
Available assets: {available_assets}") 374 375 # Get the URL from the asset 376 url = data_asset.get('href') 377 if not url: 378 raise ValueError(f"No href found in {data_product} asset") 379 380 # Load the frame using the existing method 381 return self.load_frame_url(url) 382 383 def load_frame_url(self, url: str) -> xr.Dataset: 384 """ 385 Load a radar frame directly from a URL. 386 387 Automatically detects and handles both HDF5 and legacy MATLAB formats. 388 389 Parameters 390 ---------- 391 url : str 392 URL to radar frame data file (.mat). 393 394 Returns 395 ------- 396 xr.Dataset 397 Radar frame with CF-compliant metadata and citation information. 398 """ 399 400 file = self._open_file(url) 401 402 filetype = None 403 try: 404 ds = self._load_frame_hdf5(file) 405 filetype = 'hdf5' 406 except OSError: 407 ds = self._load_frame_matlab(file) 408 filetype = 'matlab' 409 410 # Add the source URL as an attribute 411 ds.attrs['source_url'] = url 412 413 # Apply CF-compliant attributes 414 ds = apply_cf_compliant_attrs(ds) 415 416 # Get the season and segment from the URL 417 match = re.search(r'(\d{4}_\w+_[A-Za-z0-9]+)\/([\w_]+)\/[\d_]+\/[\w]+(\d{8}_\d{2}_\d{3})', url) 418 if match: 419 collection, data_product, granule = match.groups() 420 date, segment_id, frame_id = granule.split('_') 421 ds.attrs['collection'] = collection 422 ds.attrs['data_product'] = data_product 423 ds.attrs['granule'] = granule 424 ds.attrs['segment_path'] = f"{date}_{segment_id}" 425 ds.attrs['date_str'] = date 426 ds.attrs['segment'] = int(segment_id) 427 ds.attrs['frame'] = int(frame_id) 428 429 # Load citation information 430 result = ops_api.get_segment_metadata(segment_name=ds.attrs['segment_path'], season_name=collection) 431 if result: 432 if isinstance(result['data'], str): 433 warnings.warn(f"Warning: Unexpected result from ops_api: {result['data']}", UserWarning) 434 else: 435 result_data = {} 436 for key, value in result['data'].items(): 437 if len(value) == 1: 438 
result_data[key] = value[0] 439 elif len(value) > 1: 440 result_data[key] = set(value) 441 442 if 'dois' in result_data: 443 ds.attrs['doi'] = result_data['dois'] 444 if 'rors' in result_data: 445 ds.attrs['ror'] = result_data['rors'] 446 if 'funding_sources' in result_data: 447 ds.attrs['funder_text'] = result_data['funding_sources'] 448 449 # Add the rest of the Matlab parameters 450 if filetype == 'hdf5': 451 ds.attrs['mimetype'] = 'application/x-hdf5' 452 ds.attrs.update(decode_hdf5_matlab_variable(h5py.File(file, 'r'), 453 skip_variables=True, 454 skip_errors=True)) 455 elif filetype == 'matlab': 456 ds.attrs['mimetype'] = 'application/x-matlab-data' 457 ds.attrs.update(extract_legacy_mat_attributes(file, 458 skip_keys=ds.keys(), 459 skip_errors=True)) 460 461 return ds 462 463 def _load_frame_hdf5(self, file) -> xr.Dataset: 464 """ 465 Load a radar frame from an HDF5 file. 466 467 Parameters 468 ---------- 469 file : 470 The path to the HDF5 file containing radar frame data. 471 472 Returns 473 ------- 474 xr.Dataset 475 The loaded radar frame as an xarray Dataset. 
476 """ 477 478 ds = xr.open_dataset(file, engine='h5netcdf', phony_dims='sort') 479 480 # Re-arrange variables to provide useful dimensions and coordinates 481 482 ds = ds.squeeze() # Drop the singleton dimensions matlab adds 483 484 ds = ds.rename({ # Label the dimensions with more useful names 485 ds.Data.dims[0]: 'slow_time_idx', 486 ds.Data.dims[1]: 'twtt_idx', 487 }) 488 489 # Make variables with no dimensions into scalar attributes 490 for var in ds.data_vars: 491 if ds[var].ndim == 0: 492 ds.attrs[var] = ds[var].item() 493 ds = ds.drop_vars(var) 494 495 # Make the file_type an attribute 496 if 'file_type' in ds.data_vars: 497 ds.attrs['file_type'] = ds['file_type'].to_numpy() 498 ds = ds.drop_vars('file_type') 499 500 # Name the two time coordinates 501 ds = ds.rename({'Time': 'twtt', 'GPS_time': 'slow_time'}) 502 ds = ds.set_coords(['slow_time', 'twtt']) 503 504 slow_time_1d = pd.to_datetime(ds['slow_time'].values, unit='s') 505 ds = ds.assign_coords(slow_time=('slow_time_idx', slow_time_1d)) 506 507 # Make twtt and slow_time the indexing coordinates 508 ds = ds.swap_dims({'twtt_idx': 'twtt'}) 509 ds = ds.swap_dims({'slow_time_idx': 'slow_time'}) 510 511 return ds 512 513 def _load_frame_matlab(self, file) -> xr.Dataset: 514 """ 515 Load a radar frame from a MATLAB file. 516 517 Parameters 518 ---------- 519 file : 520 The path to the MATLAB file containing radar frame data. 521 522 Returns 523 ------- 524 xr.Dataset 525 The loaded radar frame as an xarray Dataset. 
526 """ 527 528 m = scipy.io.loadmat(file, mat_dtype=False) 529 530 key_dims = { 531 'Time': ('twtt',), 532 'GPS_time': ('slow_time',), 533 'Latitude': ('slow_time',), 534 'Longitude': ('slow_time',), 535 'Elevation': ('slow_time',), 536 'Roll': ('slow_time',), 537 'Pitch': ('slow_time',), 538 'Heading': ('slow_time',), 539 'Surface': ('slow_time',), 540 'Data': ('twtt', 'slow_time') 541 } 542 543 ds = xr.Dataset( 544 { 545 key: (dims, np.squeeze(m[key])) for key, dims in key_dims.items() if key in m 546 }, 547 coords={ 548 'twtt': ('twtt', np.squeeze(m['Time'])), 549 'slow_time': ('slow_time', pd.to_datetime(np.squeeze(m['GPS_time']), unit='s')), 550 } 551 ) 552 553 return ds 554 555 def get_collections(self) -> list: 556 """ 557 Get all available collections (seasons/campaigns) in STAC catalog. 558 559 Returns 560 ------- 561 list[dict] 562 List of collection dictionaries with id, description, and extent. 563 """ 564 565 return self.duckdb_client.get_collections(self.stac_parquet_href) 566 567 def get_segments(self, collection_id: str) -> list: 568 """ 569 Get all segments (flights) within a collection. 570 571 Parameters 572 ---------- 573 collection_id : str 574 STAC collection ID (e.g., "2022_Antarctica_BaslerMKB"). 575 576 Returns 577 ------- 578 list[dict] 579 List of segment dictionaries with segment_path, date, flight_number, 580 and frame count. 
581 """ 582 # Query STAC API for all items in collection (exclude geometry for better performance) 583 items = self.query_frames(collections=[collection_id], exclude_geometry=True) 584 585 if items is None or len(items) == 0: 586 print(f"No items found in collection '{collection_id}'") 587 return [] 588 589 # Group items by segment (opr:date + opr:segment) 590 segments = {} 591 for idx, item in items.iterrows(): 592 properties = item['properties'] 593 date = properties['opr:date'] 594 flight_num = properties['opr:segment'] 595 596 if date and flight_num is not None: 597 segment_path = f"{date}_{flight_num:02d}" 598 599 if segment_path not in segments: 600 segments[segment_path] = { 601 'segment_path': segment_path, 602 'date': date, 603 'flight_number': flight_num, 604 'collection': collection_id, 605 'frames': [], 606 'item_count': 0 607 } 608 609 segments[segment_path]['frames'].append(properties.get('opr:frame')) 610 segments[segment_path]['item_count'] += 1 611 612 # Sort segments by date and flight number 613 segment_list = list(segments.values()) 614 segment_list.sort(key=lambda x: (x['date'], x['flight_number'])) 615 616 return segment_list 617 618 def _get_layers_files(self, segment: Union[xr.Dataset, dict], raise_errors=True) -> dict: 619 """ 620 Fetch layers from the CSARP_layers files 621 622 See https://gitlab.com/openpolarradar/opr/-/wikis/Layer-File-Guide for file formats 623 624 Parameters 625 ---------- 626 segment : xr.Dataset or dict 627 Radar frame Dataset or STAC item. 628 raise_errors : bool, optional 629 If True, raise errors when layers cannot be found. 630 631 Returns 632 ------- 633 dict 634 Dictionary mapping layer names (e.g., "standard:surface", 635 "standard:bottom") to layer Datasets. 636 """ 637 collection, segment_path, frame = self._extract_segment_info(segment) 638 639 # If we already have a STAC item, just use it. 
Otherwise, query to find the matching STAC items 640 if isinstance(segment, xr.Dataset): 641 properties = {} 642 if frame: 643 properties['opr:frame'] = frame 644 645 # Query STAC collection for CSARP_layer files matching this specific segment 646 647 # Get items from this specific segment 648 stac_items = self.query_frames(collections=[collection], segment_paths=[segment_path], properties=properties) 649 650 # Filter for items that have CSARP_layer assets 651 layer_items = [] 652 for idx, item in stac_items.iterrows(): 653 if 'CSARP_layer' in item['assets']: 654 layer_items.append(item) 655 else: 656 layer_items = [segment] if 'CSARP_layer' in segment.get('assets', {}) else [] 657 658 if not layer_items: 659 if raise_errors: 660 raise ValueError(f"No CSARP_layer files found for segment path {segment_path} in collection {collection}") 661 else: 662 return {} 663 664 # Load each layer file and combine them 665 layer_frames = [] 666 for item in layer_items: 667 layer_asset = item['assets']['CSARP_layer'] 668 if layer_asset and 'href' in layer_asset: 669 url = layer_asset['href'] 670 try: 671 layer_ds = self.load_layers_file(url) 672 layer_frames.append(layer_ds) 673 except Exception as e: 674 raise e # TODO 675 print(f"Warning: Failed to load layer file {url}: {e}") 676 continue 677 678 if not layer_frames: 679 if raise_errors: 680 raise ValueError(f"No valid CSARP_layer files could be loaded for segment {segment_path} in collection {collection}") 681 else: 682 return {} 683 684 # Concatenate all layer frames along slow_time dimension 685 layers_segment = xr.concat(layer_frames, dim='slow_time', combine_attrs='drop_conflicts', data_vars='minimal') 686 layers_segment = layers_segment.sortby('slow_time') 687 688 # Trim to bounds of the original dataset 689 layers_segment = self._trim_to_bounds(layers_segment, segment) 690 691 # Find the layer organization file to map layer IDs to names 692 # Layer organization files are one per directory. 
For example, the layer file: 693 # https://data.cresis.ku.edu/data/rds/2017_Antarctica_BaslerJKB/CSARP_layer/20180105_03/Data_20180105_03_003.mat 694 # would have a layer organization file at: 695 # https://data.cresis.ku.edu/data/rds/2017_Antarctica_BaslerJKB/CSARP_layer/20180105_03/layer_20180105_03.mat 696 697 layer_organization_file_url = re.sub(r'Data_(\d{8}_\d{2})_\d{3}.mat', r'layer_\1.mat', url) 698 layer_organization_ds = self._load_layer_organization(self._open_file(layer_organization_file_url)) 699 700 # Split into separate layers by ID 701 layers = {} 702 703 layer_ids = np.unique(layers_segment['layer']) 704 705 for i, layer_id in enumerate(layer_ids): 706 layer_id_int = int(layer_id) 707 layer_name = str(layer_organization_ds['lyr_name'].sel(lyr_id=layer_id_int).values.item().squeeze()) 708 layer_group = str(layer_organization_ds['lyr_group_name'].sel(lyr_id=layer_id_int).values.item().squeeze()) 709 if layer_group == '[]': 710 layer_group = '' 711 layer_display_name = f"{layer_group}:{layer_name}" 712 713 layer_ds = layers_segment.sel(layer=layer_id).drop_vars('layer') 714 # Only add non-empty layers with at least some non-NaN data 715 if layer_ds.sizes.get('slow_time', 0) > 0: 716 if not layer_ds.to_array().isnull().all(): 717 layers[layer_display_name] = layer_ds 718 719 return layers 720 721 def _trim_to_bounds(self, ds: xr.Dataset, ref: Union[xr.Dataset, dict]) -> xr.Dataset: 722 start_time, end_time = None, None 723 if isinstance(ref, xr.Dataset) and 'slow_time' in ref.coords: 724 start_time = ref['slow_time'].min() 725 end_time = ref['slow_time'].max() 726 else: 727 properties = ref.get('properties', {}) 728 if 'start_datetime' in properties and 'end_datetime' in properties: 729 start_time = pd.to_datetime(properties['start_datetime']) 730 end_time = pd.to_datetime(properties['end_datetime']) 731 732 if start_time: 733 return ds.sel(slow_time=slice(start_time, end_time)) 734 else: 735 return ds 736 737 def load_layers_file(self, url: str) -> 
xr.Dataset: 738 """ 739 Load layer data from CSARP_layer file. 740 741 Parameters 742 ---------- 743 url : str 744 URL or path to layer file. 745 746 Returns 747 ------- 748 xr.Dataset 749 Layer data with coordinates slow_time and layer, and data variables 750 twtt, quality, type, lat, lon, and elev. 751 Layer data in a standardized format with coordinates: 752 - slow_time: GPS time converted to datetime 753 And data variables: 754 - twtt: Two-way travel time for each layer 755 - quality: Quality values for each layer 756 - type: Type values for each layer 757 - lat, lon, elev: Geographic coordinates 758 - id: Layer IDs 759 """ 760 761 file = self._open_file(url) 762 763 ds = self._load_layers(file) 764 765 # Add the source URL as an attribute 766 ds.attrs['source_url'] = url 767 768 # Apply common manipulations to match the expected structure 769 # Convert GPS time to datetime coordinate 770 if 'gps_time' in ds.variables: 771 slow_time_dt = pd.to_datetime(ds['gps_time'].values, unit='s') 772 ds = ds.assign_coords(slow_time=('slow_time', slow_time_dt)) 773 774 # Set slow_time as the main coordinate and remove gps_time from data_vars 775 if 'slow_time' not in ds.dims: 776 ds = ds.swap_dims({'gps_time': 'slow_time'}) 777 778 # Remove gps_time from data_vars if it exists there to avoid conflicts 779 if ('gps_time' in ds.data_vars) or ('gps_time' in ds.coords): 780 ds = ds.drop_vars('gps_time') 781 782 # Sort by slow_time if it exists 783 if 'slow_time' in ds.coords: 784 ds = ds.sortby('slow_time') 785 786 return ds 787 788 def _load_layers(self, file) -> xr.Dataset: 789 """ 790 Load layer data file using hdf5storage 791 792 Parameters: 793 file : str 794 Path to the layer file 795 Returns: xr.Dataset 796 """ 797 798 d = hdf5storage.loadmat(file, appendmat=False) 799 800 # Ensure 'id' is at least 1D for the layer dimension 801 id_data = np.atleast_1d(d['id'].squeeze()) 802 803 # For 2D arrays with (layer, slow_time) dimensions, ensure they remain 2D 804 # even when 
there's only one layer 805 quality_data = np.atleast_2d(d['quality'].squeeze()) 806 if quality_data.shape[0] == 1 or quality_data.ndim == 1: 807 quality_data = quality_data.reshape(len(id_data), -1) 808 809 twtt_data = np.atleast_2d(d['twtt'].squeeze()) 810 if twtt_data.shape[0] == 1 or twtt_data.ndim == 1: 811 twtt_data = twtt_data.reshape(len(id_data), -1) 812 813 type_data = np.atleast_2d(d['type'].squeeze()) 814 if type_data.shape[0] == 1 or type_data.ndim == 1: 815 type_data = type_data.reshape(len(id_data), -1) 816 817 ds = xr.Dataset({ 818 'file_type': ((), d['file_type'].squeeze()), 819 'file_version': ((), d['file_version'].squeeze()), 820 'elev': (('slow_time',), d['elev'].squeeze()), 821 #'gps_source': ((), d['gps_source'].squeeze()), 822 'gps_time': (('slow_time',), d['gps_time'].squeeze()), 823 'id': (('layer',), id_data), 824 'lat': (('slow_time',), d['lat'].squeeze()), 825 'lon': (('slow_time',), d['lon'].squeeze()), 826 'quality': (('layer', 'slow_time'), quality_data), 827 'twtt': (('layer', 'slow_time'), twtt_data), 828 'type': (('layer', 'slow_time'), type_data), 829 }) 830 831 ds = ds.assign_coords({'layer': ds['id'], 'slow_time': ds['gps_time']}) 832 833 return ds 834 835 def _load_layer_organization(self, file) -> xr.Dataset: 836 """ 837 Load a layer organization file 838 839 Parameters 840 ---------- 841 file : str 842 Path to the HDF5 layer organization file 843 844 Returns 845 ------- 846 xr.Dataset 847 Raw layer data from HDF5 file 848 """ 849 d = hdf5storage.loadmat(file, appendmat=False) 850 ds = xr.Dataset({ 851 'file_type': ((), d['file_type'].squeeze()), 852 'file_version': ((), d['file_version'].squeeze()), 853 'lyr_age': (('lyr_id',), np.atleast_1d(d['lyr_age'].squeeze())), 854 'lyr_age_source': (('lyr_id',), np.atleast_1d(d['lyr_age_source'].squeeze())), 855 'lyr_desc': (('lyr_id',), np.atleast_1d(d['lyr_desc'].squeeze())), 856 'lyr_group_name': (('lyr_id',), np.atleast_1d(d['lyr_group_name'].squeeze())), 857 'lyr_id': 
(('lyr_id',), np.atleast_1d(d['lyr_id'].squeeze())), 858 'lyr_name': (('lyr_id',), np.atleast_1d(d['lyr_name'].squeeze())), 859 'lyr_order': (('lyr_id',), np.atleast_1d(d['lyr_order'].squeeze())), 860 }, attrs={'param': d['param']}) 861 862 ds = ds.set_index(lyr_id='lyr_id') 863 864 return ds 865 866 def _get_layers_db(self, flight: Union[xr.Dataset, dict], include_geometry=True, raise_errors=True) -> dict: 867 """ 868 Load layer picks from OPS database API. 869 870 Parameters 871 ---------- 872 flight : xr.Dataset or dict 873 Radar frame Dataset or STAC item. 874 include_geometry : bool, optional 875 If True, include geometry information in layers. 876 raise_errors : bool, optional 877 If True, raise errors when layers cannot be found. 878 879 Returns 880 ------- 881 dict 882 Dictionary mapping layer names to layer Datasets. 883 """ 884 885 collection, segment_path, _ = self._extract_segment_info(flight) 886 887 if 'Antarctica' in collection: 888 location = 'antarctic' 889 elif 'Greenland' in collection: 890 location = 'arctic' 891 else: 892 raise ValueError("Dataset does not belong to a recognized location (Antarctica or Greenland).") 893 894 layer_points = ops_api.get_layer_points( 895 segment_name=segment_path, 896 season_name=collection, 897 location=location, 898 include_geometry=include_geometry 899 ) 900 901 if layer_points['status'] != 1: 902 if raise_errors: 903 raise ValueError(f"Failed to fetch layer points. 
Received response with status {layer_points['status']}.") 904 else: 905 return {} 906 907 layer_ds_raw = xr.Dataset( 908 {k: (['gps_time'], v) for k, v in layer_points['data'].items() if k != 'gps_time'}, 909 coords={'gps_time': layer_points['data']['gps_time']} 910 ) 911 # Split into a dictionary of layers based on lyr_id 912 layer_ids = set(layer_ds_raw['lyr_id'].to_numpy()) 913 layer_ids = [int(layer_id) for layer_id in layer_ids if not np.isnan(layer_id)] 914 915 # Get the mapping of layer IDs to names 916 if self.db_layers_metadata is None: 917 layer_metadata = ops_api.get_layer_metadata() 918 if layer_metadata['status'] == 1: 919 df = pd.DataFrame(layer_metadata['data']) 920 df['display_name'] = df['lyr_group_name'] + ":" + df['lyr_name'] 921 self.db_layers_metadata = df 922 else: 923 raise ValueError("Failed to fetch layer metadata from OPS API.") 924 925 layers = {} 926 for layer_id in layer_ids: 927 layer_name = self.db_layers_metadata.loc[self.db_layers_metadata['lyr_id'] == layer_id, 'display_name'].values[0] 928 929 layer_ds = layer_ds_raw.where(layer_ds_raw['lyr_id'] == layer_id, drop=True) 930 931 layer_ds = layer_ds.sortby('gps_time') 932 933 layer_ds = layer_ds.rename({'gps_time': 'slow_time'}) 934 layer_ds = layer_ds.set_coords(['slow_time']) 935 936 layer_ds['slow_time'] = pd.to_datetime(layer_ds['slow_time'].values, unit='s') 937 938 # Filter to the same time range as flight 939 layer_ds = self._trim_to_bounds(layer_ds, flight) 940 941 # Only add non-empty layers with at least some non-NaN data 942 if layer_ds.sizes.get('slow_time', 0) > 0: 943 if not layer_ds.to_array().isnull().all(): 944 layers[layer_name] = layer_ds 945 946 return layers 947 948 def get_layers(self, ds : Union[xr.Dataset, dict], source: str = 'auto', include_geometry=True, errors='warn') -> dict: 949 """ 950 Load layer picks from files or database. 951 952 Tries files first, falls back to database if source='auto'. 
953 954 Parameters 955 ---------- 956 ds : Union[xr.Dataset, dict] 957 Radar frame Dataset or STAC item. 958 source : {'auto', 'files', 'db'}, optional 959 Source for layers. 'auto' tries files then falls back to database. 960 include_geometry : bool, optional 961 If True, include geometry information when fetching from the API. 962 errors : str, optional 963 How to handle missing layer data: 'warn' (default) returns None with a warning, 964 'error' raises an exception. 965 966 Returns 967 ------- 968 dict or None 969 A dictionary mapping layer IDs to their corresponding data, or None if no layers 970 found and errors='warn'. 971 """ 972 collection, segment_path, _ = self._extract_segment_info(ds) 973 974 if source == 'auto': 975 # Try to get layers from files first 976 try: 977 layers = self._get_layers_files(ds, raise_errors=True) 978 return layers 979 except Exception: 980 # Fallback to API if no layers found in files 981 try: 982 return self._get_layers_db(ds, include_geometry=include_geometry, raise_errors=True) 983 except Exception as e: 984 if errors == 'error': 985 raise 986 else: 987 warnings.warn(f"No layer data found for {collection}/{segment_path}: {e}") 988 return None 989 elif source == 'files': 990 try: 991 return self._get_layers_files(ds, raise_errors=True) 992 except Exception as e: 993 if errors == 'error': 994 raise 995 else: 996 warnings.warn(f"No layer files found for {collection}/{segment_path}: {e}") 997 return None 998 elif source == 'db': 999 try: 1000 return self._get_layers_db(ds, include_geometry=include_geometry, raise_errors=True) 1001 except Exception as e: 1002 if errors == 'error': 1003 raise 1004 else: 1005 warnings.warn(f"No layer data in DB for {collection}/{segment_path}: {e}") 1006 return None 1007 else: 1008 raise ValueError("Invalid source specified. Must be one of: 'auto', 'files', 'db'.")
class OPRConnection:
    """
    Primary interface for discovering, querying, and loading OPR radar data.

    Queries a STAC catalog (parquet files searched via DuckDB) to discover
    radar frames, loads frames and layer picks from the OPR file archive
    (with optional local caching via fsspec), and falls back to the OPS
    database API for layer picks when no layer files are available.
    """

    def __init__(self,
                 collection_url: str = "https://data.cresis.ku.edu/data/",
                 cache_dir: str = None,
                 stac_parquet_href: str = None,
                 sync_catalogs: bool = True):
        """
        Initialize connection to OPR data archive.

        Parameters
        ----------
        collection_url : str, optional
            Base URL for OPR data collection.
        cache_dir : str, optional
            Directory to cache downloaded data for faster repeated access.
        stac_parquet_href : str, optional
            Path or URL pattern to STAC catalog parquet files. When *None*
            (default), the local cache is used if available, falling back to
            the S3 catalog.
        sync_catalogs : bool, optional
            If True (default), sync OPR STAC catalogs to a local cache
            before resolving the catalog path. Uses ETag-based change
            detection so repeated calls are cheap (single HTTP request).
        """
        self.collection_url = collection_url
        self.cache_dir = cache_dir
        self._user_set_href = stac_parquet_href is not None
        if sync_catalogs and not self._user_set_href:
            sync_opr_catalogs()
        self.stac_parquet_href = stac_parquet_href or get_opr_catalog_path()

        # With a cache_dir, use fsspec's persistent 'filecache' protocol;
        # otherwise _open_file falls back to a temporary 'simplecache'.
        self.fsspec_cache_kwargs = {}
        self.fsspec_url_prefix = ''
        if cache_dir:
            self.fsspec_cache_kwargs['cache_storage'] = cache_dir
            self.fsspec_cache_kwargs['check_files'] = True
            self.fsspec_url_prefix = 'filecache::'

        self.db_layers_metadata = None  # Cache for OPS database layers metadata

        # Lazy-initialized DuckDB client for STAC queries -- reusing a single
        # session caches remote file metadata (parquet footers) across calls
        self._duckdb_client = None

    @property
    def duckdb_client(self):
        """Lazy-initialized DuckDB client, recreated after pickling."""
        if self._duckdb_client is None:
            self._duckdb_client = DuckdbClient()
        return self._duckdb_client

    def __getstate__(self):
        # DuckdbClient holds native state and can't be pickled; drop it and
        # let the duckdb_client property recreate it after unpickling.
        state = self.__dict__.copy()
        state['_duckdb_client'] = None
        return state

    def _open_file(self, url):
        """
        Resolve a (possibly remote) URL through fsspec with appropriate caching.

        Returns a local path via ``fsspec.open_local``: a persistent
        'filecache' when a cache_dir was configured, a 'simplecache' otherwise.
        """
        if self.fsspec_url_prefix:
            return fsspec.open_local(f"{self.fsspec_url_prefix}{url}", filecache=self.fsspec_cache_kwargs)
        else:
            return fsspec.open_local(f"simplecache::{url}", simplecache=self.fsspec_cache_kwargs)

    def _extract_segment_info(self, segment):
        """Extract (collection, segment_path, frame) from Dataset or STAC item dict."""
        if isinstance(segment, xr.Dataset):
            return (segment.attrs.get('collection'),
                    segment.attrs.get('segment_path'),
                    segment.attrs.get('frame'))
        else:
            # STAC item dict: segment_path is rebuilt as "YYYYMMDD_NN"
            return (segment['collection'],
                    f"{segment['properties'].get('opr:date')}_{segment['properties'].get('opr:segment'):02d}",
                    segment['properties'].get('opr:frame'))

    def query_frames(self, collections: list[str] = None, segment_paths: list[str] = None,
                     geometry = None, date_range: tuple = None, properties: dict = None,
                     max_items: int = None, exclude_geometry: bool = False,
                     search_kwargs: dict = None) -> gpd.GeoDataFrame:
        """
        Query STAC catalog for radar frames matching search criteria.

        Multiple parameters are combined with AND logic. Lists of values passed to a
        single parameter are treated with OR logic (any value matches).

        Parameters
        ----------
        collections : list[str] or str, optional
            Collection name(s) (e.g., "2022_Antarctica_BaslerMKB").
        segment_paths : list[str] or str, optional
            Segment path(s) in format "YYYYMMDD_NN" (e.g., "20230126_01").
        geometry : shapely geometry or dict, optional
            Geospatial geometry to filter by intersection.
        date_range : str, optional
            Date range in ISO format (e.g., "2021-01-01/2025-06-01").
        properties : dict, optional
            Additional STAC properties to filter by.
        max_items : int, optional
            Maximum number of items to return.
        exclude_geometry : bool, optional
            If True, exclude geometry field to reduce response size.
        search_kwargs : dict, optional
            Additional keyword arguments for STAC search.

        Returns
        -------
        gpd.GeoDataFrame or None
            GeoDataFrame of matching STAC items with columns for collection,
            geometry, properties, and assets. Returns None if no matches found.
        """
        # Avoid mutable default arguments; None is normalized here so the
        # public behavior is unchanged for callers passing {} explicitly.
        properties = properties if properties is not None else {}
        search_kwargs = search_kwargs if search_kwargs is not None else {}

        search_params = {}

        # Exclude geometry -- do not return the geometry field to reduce response size
        if exclude_geometry:
            search_params['exclude'] = ['geometry']

        # Handle collections (seasons)
        if collections is not None:
            search_params['collections'] = [collections] if isinstance(collections, str) else collections

        # Handle geometry filtering
        if geometry is not None:
            if hasattr(geometry, '__geo_interface__'):
                geometry = geometry.__geo_interface__

            # Fix geometries that cross the antimeridian
            geometry = antimeridian.fix_geojson(geometry, reverse=True)

            search_params['intersects'] = geometry

        # Handle date range filtering
        if date_range is not None:
            search_params['datetime'] = date_range

        # Handle max_items
        if max_items is not None:
            search_params['limit'] = max_items

        # Handle segment_paths filtering using CQL2
        filter_conditions = []

        if segment_paths is not None:
            segment_paths = [segment_paths] if isinstance(segment_paths, str) else segment_paths

            # Create OR conditions for segment paths
            segment_conditions = []
            for segment_path in segment_paths:
                try:
                    date_str, segment_num_str = segment_path.split('_')
                    segment_num = int(segment_num_str)
                except ValueError:
                    warnings.warn(f"Invalid segment_path format '{segment_path}'. Expected format: YYYYMMDD_NN")
                    continue

                # Create AND condition for this specific segment
                segment_conditions.append({
                    "op": "and",
                    "args": [
                        {
                            "op": "=",
                            "args": [{"property": "opr:date"}, date_str]
                        },
                        {
                            "op": "=",
                            "args": [{"property": "opr:segment"}, segment_num]
                        }
                    ]
                })

            if segment_conditions:
                if len(segment_conditions) == 1:
                    filter_conditions.append(segment_conditions[0])
                else:
                    # Multiple segments - combine with OR
                    filter_conditions.append({
                        "op": "or",
                        "args": segment_conditions
                    })

        # Add any additional property filters
        for key, value in properties.items():
            filter_conditions.append({
                "op": "=",
                "args": [{"property": key}, value]
            })

        # Combine all filter conditions with AND
        if filter_conditions:
            if len(filter_conditions) == 1:
                filter_expr = filter_conditions[0]
            else:
                filter_expr = {
                    "op": "and",
                    "args": filter_conditions
                }

            search_params['filter'] = filter_expr

        # Add any extra kwargs to search
        search_params.update(search_kwargs)

        # Perform the search
        items = self.duckdb_client.search(self.stac_parquet_href, **search_params)
        if isinstance(items, dict):
            items = items['features']

        if not items:
            warnings.warn("No items found matching the query criteria", UserWarning)
            return None

        # Convert to GeoDataFrame
        items_df = gpd.GeoDataFrame(items)
        # Set index
        items_df = items_df.set_index(items_df['id'])
        items_df.index.name = 'stac_item_id'
        # Set the geometry column
        if 'geometry' in items_df.columns and not exclude_geometry:
            items_df = items_df.set_geometry(items_df['geometry'].apply(shapely.geometry.shape))
            items_df.crs = "EPSG:4326"

        # Reorder the columns, leaving any extra columns at the end
        desired_order = ['collection', 'geometry', 'properties', 'assets']
        items_df = items_df[[col for col in desired_order if col in items_df.columns] +
                            list(items_df.columns.difference(desired_order))]

        return items_df

    def load_frames(self, stac_items: gpd.GeoDataFrame,
                    data_product: str = "CSARP_standard",
                    merge_flights: bool = False,
                    skip_errors: bool = False,
                    ) -> Union[list[xr.Dataset], xr.Dataset]:
        """
        Load multiple radar frames from STAC items.

        Parameters
        ----------
        stac_items : gpd.GeoDataFrame
            STAC items returned from query_frames.
        data_product : str, optional
            Data product to load (e.g., "CSARP_standard", "CSARP_qlook").
        merge_flights : bool, optional
            If True, merge frames from the same segment into single Datasets.
        skip_errors : bool, optional
            If True, skip failed frames and continue loading others.

        Returns
        -------
        list[xr.Dataset] or xr.Dataset
            List of radar frame Datasets, or list of merged Datasets if
            merge_flights=True.
        """
        frames = []

        for _, item in stac_items.iterrows():
            try:
                frames.append(self.load_frame(item, data_product))
            except Exception as e:
                print(f"Error loading frame for item {item.get('id', 'unknown')}: {e}")
                if not skip_errors:
                    # Bare raise preserves the original traceback
                    raise

        if merge_flights:
            return opr_tools.merge_frames(frames)
        else:
            return frames

    def load_frame(self, stac_item, data_product: str = "CSARP_standard") -> xr.Dataset:
        """
        Load a single radar frame from a STAC item.

        Parameters
        ----------
        stac_item : dict or pd.Series
            STAC item containing asset URLs.
        data_product : str, optional
            Data product to load (e.g., "CSARP_standard", "CSARP_qlook").

        Returns
        -------
        xr.Dataset
            Radar frame with coordinates slow_time and twtt, and data variables
            including Data, Latitude, Longitude, Elevation, Surface, etc.

        Raises
        ------
        ValueError
            If the requested data product asset or its href is missing.
        """

        assets = stac_item['assets']

        # Get the data asset
        data_asset = assets.get(data_product)
        if not data_asset:
            available_assets = list(assets.keys())
            raise ValueError(f"No {data_product} asset found. Available assets: {available_assets}")

        # Get the URL from the asset
        url = data_asset.get('href')
        if not url:
            raise ValueError(f"No href found in {data_product} asset")

        # Load the frame using the existing method
        return self.load_frame_url(url)

    def load_frame_url(self, url: str) -> xr.Dataset:
        """
        Load a radar frame directly from a URL.

        Automatically detects and handles both HDF5 and legacy MATLAB formats.

        Parameters
        ----------
        url : str
            URL to radar frame data file (.mat).

        Returns
        -------
        xr.Dataset
            Radar frame with CF-compliant metadata and citation information.
        """

        file = self._open_file(url)

        # Newer OPR files are MATLAB v7.3 (HDF5); older ones are legacy
        # MATLAB. h5netcdf raises OSError on non-HDF5 input, which triggers
        # the legacy loader.
        filetype = None
        try:
            ds = self._load_frame_hdf5(file)
            filetype = 'hdf5'
        except OSError:
            ds = self._load_frame_matlab(file)
            filetype = 'matlab'

        # Add the source URL as an attribute
        ds.attrs['source_url'] = url

        # Apply CF-compliant attributes
        ds = apply_cf_compliant_attrs(ds)

        # Get the season and segment from the URL
        match = re.search(r'(\d{4}_\w+_[A-Za-z0-9]+)\/([\w_]+)\/[\d_]+\/[\w]+(\d{8}_\d{2}_\d{3})', url)
        if match:
            collection, data_product, granule = match.groups()
            date, segment_id, frame_id = granule.split('_')
            ds.attrs['collection'] = collection
            ds.attrs['data_product'] = data_product
            ds.attrs['granule'] = granule
            ds.attrs['segment_path'] = f"{date}_{segment_id}"
            ds.attrs['date_str'] = date
            ds.attrs['segment'] = int(segment_id)
            ds.attrs['frame'] = int(frame_id)

            # Load citation information
            result = ops_api.get_segment_metadata(segment_name=ds.attrs['segment_path'], season_name=collection)
            if result:
                if isinstance(result['data'], str):
                    warnings.warn(f"Warning: Unexpected result from ops_api: {result['data']}", UserWarning)
                else:
                    # Collapse single-element lists to scalars; keep sets for
                    # multi-valued metadata (e.g. several funding sources)
                    result_data = {}
                    for key, value in result['data'].items():
                        if len(value) == 1:
                            result_data[key] = value[0]
                        elif len(value) > 1:
                            result_data[key] = set(value)

                    if 'dois' in result_data:
                        ds.attrs['doi'] = result_data['dois']
                    if 'rors' in result_data:
                        ds.attrs['ror'] = result_data['rors']
                    if 'funding_sources' in result_data:
                        ds.attrs['funder_text'] = result_data['funding_sources']

        # Add the rest of the Matlab parameters
        if filetype == 'hdf5':
            ds.attrs['mimetype'] = 'application/x-hdf5'
            # Context manager ensures the HDF5 handle is closed (previously leaked)
            with h5py.File(file, 'r') as h5_file:
                ds.attrs.update(decode_hdf5_matlab_variable(h5_file,
                                                            skip_variables=True,
                                                            skip_errors=True))
        elif filetype == 'matlab':
            ds.attrs['mimetype'] = 'application/x-matlab-data'
            ds.attrs.update(extract_legacy_mat_attributes(file,
                                                          skip_keys=ds.keys(),
                                                          skip_errors=True))

        return ds

    def _load_frame_hdf5(self, file) -> xr.Dataset:
        """
        Load a radar frame from an HDF5 file.

        Parameters
        ----------
        file :
            The path to the HDF5 file containing radar frame data.

        Returns
        -------
        xr.Dataset
            The loaded radar frame as an xarray Dataset.
        """

        ds = xr.open_dataset(file, engine='h5netcdf', phony_dims='sort')

        # Re-arrange variables to provide useful dimensions and coordinates

        ds = ds.squeeze()  # Drop the singleton dimensions matlab adds

        ds = ds.rename({  # Label the dimensions with more useful names
            ds.Data.dims[0]: 'slow_time_idx',
            ds.Data.dims[1]: 'twtt_idx',
        })

        # Make variables with no dimensions into scalar attributes
        for var in ds.data_vars:
            if ds[var].ndim == 0:
                ds.attrs[var] = ds[var].item()
                ds = ds.drop_vars(var)

        # Make the file_type an attribute
        if 'file_type' in ds.data_vars:
            ds.attrs['file_type'] = ds['file_type'].to_numpy()
            ds = ds.drop_vars('file_type')

        # Name the two time coordinates
        ds = ds.rename({'Time': 'twtt', 'GPS_time': 'slow_time'})
        ds = ds.set_coords(['slow_time', 'twtt'])

        slow_time_1d = pd.to_datetime(ds['slow_time'].values, unit='s')
        ds = ds.assign_coords(slow_time=('slow_time_idx', slow_time_1d))

        # Make twtt and slow_time the indexing coordinates
        ds = ds.swap_dims({'twtt_idx': 'twtt'})
        ds = ds.swap_dims({'slow_time_idx': 'slow_time'})

        return ds

    def _load_frame_matlab(self, file) -> xr.Dataset:
        """
        Load a radar frame from a legacy MATLAB (pre-v7.3) file.

        Parameters
        ----------
        file :
            The path to the MATLAB file containing radar frame data.

        Returns
        -------
        xr.Dataset
            The loaded radar frame as an xarray Dataset.
        """

        m = scipy.io.loadmat(file, mat_dtype=False)

        # Mapping of known MATLAB variables to their xarray dimensions
        key_dims = {
            'Time': ('twtt',),
            'GPS_time': ('slow_time',),
            'Latitude': ('slow_time',),
            'Longitude': ('slow_time',),
            'Elevation': ('slow_time',),
            'Roll': ('slow_time',),
            'Pitch': ('slow_time',),
            'Heading': ('slow_time',),
            'Surface': ('slow_time',),
            'Data': ('twtt', 'slow_time')
        }

        ds = xr.Dataset(
            {
                key: (dims, np.squeeze(m[key])) for key, dims in key_dims.items() if key in m
            },
            coords={
                'twtt': ('twtt', np.squeeze(m['Time'])),
                'slow_time': ('slow_time', pd.to_datetime(np.squeeze(m['GPS_time']), unit='s')),
            }
        )

        return ds

    def get_collections(self) -> list:
        """
        Get all available collections (seasons/campaigns) in STAC catalog.

        Returns
        -------
        list[dict]
            List of collection dictionaries with id, description, and extent.
        """

        return self.duckdb_client.get_collections(self.stac_parquet_href)

    def get_segments(self, collection_id: str) -> list:
        """
        Get all segments (flights) within a collection.

        Parameters
        ----------
        collection_id : str
            STAC collection ID (e.g., "2022_Antarctica_BaslerMKB").

        Returns
        -------
        list[dict]
            List of segment dictionaries with segment_path, date, flight_number,
            and frame count.
        """
        # Query STAC API for all items in collection (exclude geometry for better performance)
        items = self.query_frames(collections=[collection_id], exclude_geometry=True)

        if items is None or len(items) == 0:
            print(f"No items found in collection '{collection_id}'")
            return []

        # Group items by segment (opr:date + opr:segment)
        segments = {}
        for _, item in items.iterrows():
            properties = item['properties']
            date = properties['opr:date']
            flight_num = properties['opr:segment']

            if date and flight_num is not None:
                segment_path = f"{date}_{flight_num:02d}"

                if segment_path not in segments:
                    segments[segment_path] = {
                        'segment_path': segment_path,
                        'date': date,
                        'flight_number': flight_num,
                        'collection': collection_id,
                        'frames': [],
                        'item_count': 0
                    }

                segments[segment_path]['frames'].append(properties.get('opr:frame'))
                segments[segment_path]['item_count'] += 1

        # Sort segments by date and flight number
        segment_list = list(segments.values())
        segment_list.sort(key=lambda x: (x['date'], x['flight_number']))

        return segment_list

    def _get_layers_files(self, segment: Union[xr.Dataset, dict], raise_errors=True) -> dict:
        """
        Fetch layers from the CSARP_layers files

        See https://gitlab.com/openpolarradar/opr/-/wikis/Layer-File-Guide for file formats

        Parameters
        ----------
        segment : xr.Dataset or dict
            Radar frame Dataset or STAC item.
        raise_errors : bool, optional
            If True, raise errors when layers cannot be found.

        Returns
        -------
        dict
            Dictionary mapping layer names (e.g., "standard:surface",
            "standard:bottom") to layer Datasets.
        """
        collection, segment_path, frame = self._extract_segment_info(segment)

        # If we already have a STAC item, just use it. Otherwise, query to find the matching STAC items
        if isinstance(segment, xr.Dataset):
            properties = {}
            if frame:
                properties['opr:frame'] = frame

            # Query STAC collection for CSARP_layer files matching this specific segment
            stac_items = self.query_frames(collections=[collection], segment_paths=[segment_path], properties=properties)

            # Filter for items that have CSARP_layer assets.
            # query_frames returns None when nothing matched, so guard before
            # iterating (previously this raised AttributeError on None).
            layer_items = []
            if stac_items is not None:
                for _, item in stac_items.iterrows():
                    if 'CSARP_layer' in item['assets']:
                        layer_items.append(item)
        else:
            layer_items = [segment] if 'CSARP_layer' in segment.get('assets', {}) else []

        if not layer_items:
            if raise_errors:
                raise ValueError(f"No CSARP_layer files found for segment path {segment_path} in collection {collection}")
            else:
                return {}

        # Load each layer file and combine them. Load failures propagate to
        # the caller (get_layers falls back to the database source).
        layer_frames = []
        for item in layer_items:
            layer_asset = item['assets']['CSARP_layer']
            if layer_asset and 'href' in layer_asset:
                url = layer_asset['href']
                layer_frames.append(self.load_layers_file(url))

        if not layer_frames:
            if raise_errors:
                raise ValueError(f"No valid CSARP_layer files could be loaded for segment {segment_path} in collection {collection}")
            else:
                return {}

        # Concatenate all layer frames along slow_time dimension
        layers_segment = xr.concat(layer_frames, dim='slow_time', combine_attrs='drop_conflicts', data_vars='minimal')
        layers_segment = layers_segment.sortby('slow_time')

        # Trim to bounds of the original dataset
        layers_segment = self._trim_to_bounds(layers_segment, segment)

        # Find the layer organization file to map layer IDs to names.
        # Layer organization files are one per directory. For example, the layer file:
        # https://data.cresis.ku.edu/data/rds/2017_Antarctica_BaslerJKB/CSARP_layer/20180105_03/Data_20180105_03_003.mat
        # would have a layer organization file at:
        # https://data.cresis.ku.edu/data/rds/2017_Antarctica_BaslerJKB/CSARP_layer/20180105_03/layer_20180105_03.mat
        # NOTE: 'url' is the last layer file loaded above; all items in a
        # segment share a directory, so any of them resolves to the same file.
        layer_organization_file_url = re.sub(r'Data_(\d{8}_\d{2})_\d{3}.mat', r'layer_\1.mat', url)
        layer_organization_ds = self._load_layer_organization(self._open_file(layer_organization_file_url))

        # Split into separate layers by ID
        layers = {}

        layer_ids = np.unique(layers_segment['layer'])

        for layer_id in layer_ids:
            layer_id_int = int(layer_id)
            layer_name = str(layer_organization_ds['lyr_name'].sel(lyr_id=layer_id_int).values.item().squeeze())
            layer_group = str(layer_organization_ds['lyr_group_name'].sel(lyr_id=layer_id_int).values.item().squeeze())
            if layer_group == '[]':
                # '[]' appears to encode an empty MATLAB group name -- treat as blank
                layer_group = ''
            layer_display_name = f"{layer_group}:{layer_name}"

            layer_ds = layers_segment.sel(layer=layer_id).drop_vars('layer')
            # Only add non-empty layers with at least some non-NaN data
            if layer_ds.sizes.get('slow_time', 0) > 0:
                if not layer_ds.to_array().isnull().all():
                    layers[layer_display_name] = layer_ds

        return layers

    def _trim_to_bounds(self, ds: xr.Dataset, ref: Union[xr.Dataset, dict]) -> xr.Dataset:
        """Trim ds along slow_time to the time bounds of ref (Dataset or STAC item)."""
        start_time, end_time = None, None
        if isinstance(ref, xr.Dataset) and 'slow_time' in ref.coords:
            start_time = ref['slow_time'].min()
            end_time = ref['slow_time'].max()
        else:
            properties = ref.get('properties', {})
            if 'start_datetime' in properties and 'end_datetime' in properties:
                start_time = pd.to_datetime(properties['start_datetime'])
                end_time = pd.to_datetime(properties['end_datetime'])

        # Explicit None check: a truthiness test would wrongly skip trimming
        # for falsy-but-valid timestamps
        if start_time is not None:
            return ds.sel(slow_time=slice(start_time, end_time))
        else:
            return ds

    def load_layers_file(self, url: str) -> xr.Dataset:
        """
        Load layer data from CSARP_layer file.

        Parameters
        ----------
        url : str
            URL or path to layer file.

        Returns
        -------
        xr.Dataset
            Layer data in a standardized format with coordinates:
            - slow_time: GPS time converted to datetime
            - layer: Layer IDs
            And data variables:
            - twtt: Two-way travel time for each layer
            - quality: Quality values for each layer
            - type: Type values for each layer
            - lat, lon, elev: Geographic coordinates
            - id: Layer IDs
        """

        file = self._open_file(url)

        ds = self._load_layers(file)

        # Add the source URL as an attribute
        ds.attrs['source_url'] = url

        # Apply common manipulations to match the expected structure
        # Convert GPS time to datetime coordinate
        if 'gps_time' in ds.variables:
            slow_time_dt = pd.to_datetime(ds['gps_time'].values, unit='s')
            ds = ds.assign_coords(slow_time=('slow_time', slow_time_dt))

            # Set slow_time as the main coordinate and remove gps_time from data_vars
            if 'slow_time' not in ds.dims:
                ds = ds.swap_dims({'gps_time': 'slow_time'})

            # Remove gps_time from data_vars if it exists there to avoid conflicts
            if ('gps_time' in ds.data_vars) or ('gps_time' in ds.coords):
                ds = ds.drop_vars('gps_time')

        # Sort by slow_time if it exists
        if 'slow_time' in ds.coords:
            ds = ds.sortby('slow_time')

        return ds

    def _load_layers(self, file) -> xr.Dataset:
        """
        Load layer data file using hdf5storage

        Parameters
        ----------
        file : str
            Path to the layer file

        Returns
        -------
        xr.Dataset
            Raw layer data with (layer, slow_time) shaped pick variables.
        """

        d = hdf5storage.loadmat(file, appendmat=False)

        # Ensure 'id' is at least 1D for the layer dimension
        id_data = np.atleast_1d(d['id'].squeeze())

        def _layer_2d(arr):
            # For 2D arrays with (layer, slow_time) dimensions, ensure they
            # remain 2D even when there's only one layer
            data = np.atleast_2d(arr.squeeze())
            if data.shape[0] == 1 or data.ndim == 1:
                data = data.reshape(len(id_data), -1)
            return data

        quality_data = _layer_2d(d['quality'])
        twtt_data = _layer_2d(d['twtt'])
        type_data = _layer_2d(d['type'])

        ds = xr.Dataset({
            'file_type': ((), d['file_type'].squeeze()),
            'file_version': ((), d['file_version'].squeeze()),
            'elev': (('slow_time',), d['elev'].squeeze()),
            #'gps_source': ((), d['gps_source'].squeeze()),
            'gps_time': (('slow_time',), d['gps_time'].squeeze()),
            'id': (('layer',), id_data),
            'lat': (('slow_time',), d['lat'].squeeze()),
            'lon': (('slow_time',), d['lon'].squeeze()),
            'quality': (('layer', 'slow_time'), quality_data),
            'twtt': (('layer', 'slow_time'), twtt_data),
            'type': (('layer', 'slow_time'), type_data),
        })

        ds = ds.assign_coords({'layer': ds['id'], 'slow_time': ds['gps_time']})

        return ds

    def _load_layer_organization(self, file) -> xr.Dataset:
        """
        Load a layer organization file

        Parameters
        ----------
        file : str
            Path to the HDF5 layer organization file

        Returns
        -------
        xr.Dataset
            Raw layer data from HDF5 file, indexed by lyr_id.
        """
        d = hdf5storage.loadmat(file, appendmat=False)
        ds = xr.Dataset({
            'file_type': ((), d['file_type'].squeeze()),
            'file_version': ((), d['file_version'].squeeze()),
            'lyr_age': (('lyr_id',), np.atleast_1d(d['lyr_age'].squeeze())),
            'lyr_age_source': (('lyr_id',), np.atleast_1d(d['lyr_age_source'].squeeze())),
            'lyr_desc': (('lyr_id',), np.atleast_1d(d['lyr_desc'].squeeze())),
            'lyr_group_name': (('lyr_id',), np.atleast_1d(d['lyr_group_name'].squeeze())),
            'lyr_id': (('lyr_id',), np.atleast_1d(d['lyr_id'].squeeze())),
            'lyr_name': (('lyr_id',), np.atleast_1d(d['lyr_name'].squeeze())),
            'lyr_order': (('lyr_id',), np.atleast_1d(d['lyr_order'].squeeze())),
        }, attrs={'param': d['param']})

        # Index by lyr_id so callers can .sel(lyr_id=...)
        ds = ds.set_index(lyr_id='lyr_id')

        return ds

    def _get_layers_db(self, flight: Union[xr.Dataset, dict], include_geometry=True, raise_errors=True) -> dict:
        """
        Load layer picks from OPS database API.

        Parameters
        ----------
        flight : xr.Dataset or dict
            Radar frame Dataset or STAC item.
        include_geometry : bool, optional
            If True, include geometry information in layers.
        raise_errors : bool, optional
            If True, raise errors when layers cannot be found.

        Returns
        -------
        dict
            Dictionary mapping layer names to layer Datasets.
        """

        collection, segment_path, _ = self._extract_segment_info(flight)

        # The OPS API partitions data by hemisphere; infer it from the
        # collection name
        if 'Antarctica' in collection:
            location = 'antarctic'
        elif 'Greenland' in collection:
            location = 'arctic'
        else:
            raise ValueError("Dataset does not belong to a recognized location (Antarctica or Greenland).")

        layer_points = ops_api.get_layer_points(
            segment_name=segment_path,
            season_name=collection,
            location=location,
            include_geometry=include_geometry
        )

        if layer_points['status'] != 1:
            if raise_errors:
                raise ValueError(f"Failed to fetch layer points. Received response with status {layer_points['status']}.")
            else:
                return {}

        layer_ds_raw = xr.Dataset(
            {k: (['gps_time'], v) for k, v in layer_points['data'].items() if k != 'gps_time'},
            coords={'gps_time': layer_points['data']['gps_time']}
        )
        # Split into a dictionary of layers based on lyr_id
        layer_ids = set(layer_ds_raw['lyr_id'].to_numpy())
        layer_ids = [int(layer_id) for layer_id in layer_ids if not np.isnan(layer_id)]

        # Get the mapping of layer IDs to names (cached on the instance)
        if self.db_layers_metadata is None:
            layer_metadata = ops_api.get_layer_metadata()
            if layer_metadata['status'] == 1:
                df = pd.DataFrame(layer_metadata['data'])
                df['display_name'] = df['lyr_group_name'] + ":" + df['lyr_name']
                self.db_layers_metadata = df
            else:
                raise ValueError("Failed to fetch layer metadata from OPS API.")

        layers = {}
        for layer_id in layer_ids:
            layer_name = self.db_layers_metadata.loc[self.db_layers_metadata['lyr_id'] == layer_id, 'display_name'].values[0]

            layer_ds = layer_ds_raw.where(layer_ds_raw['lyr_id'] == layer_id, drop=True)

            layer_ds = layer_ds.sortby('gps_time')

            layer_ds = layer_ds.rename({'gps_time': 'slow_time'})
            layer_ds = layer_ds.set_coords(['slow_time'])

            layer_ds['slow_time'] = pd.to_datetime(layer_ds['slow_time'].values, unit='s')

            # Filter to the same time range as flight
            layer_ds = self._trim_to_bounds(layer_ds, flight)

            # Only add non-empty layers with at least some non-NaN data
            if layer_ds.sizes.get('slow_time', 0) > 0:
                if not layer_ds.to_array().isnull().all():
                    layers[layer_name] = layer_ds

        return layers

    def get_layers(self, ds : Union[xr.Dataset, dict], source: str = 'auto', include_geometry=True, errors='warn') -> dict:
        """
        Load layer picks from files or database.

        Tries files first, falls back to database if source='auto'.

        Parameters
        ----------
        ds : Union[xr.Dataset, dict]
            Radar frame Dataset or STAC item.
        source : {'auto', 'files', 'db'}, optional
            Source for layers. 'auto' tries files then falls back to database.
        include_geometry : bool, optional
            If True, include geometry information when fetching from the API.
        errors : str, optional
            How to handle missing layer data: 'warn' (default) returns None with a warning,
            'error' raises an exception.

        Returns
        -------
        dict or None
            A dictionary mapping layer names to their corresponding data, or None if no
            layers found and errors='warn'.

        Raises
        ------
        ValueError
            If source is not one of 'auto', 'files', 'db'.
        """
        collection, segment_path, _ = self._extract_segment_info(ds)

        if source == 'auto':
            # Try to get layers from files first
            try:
                return self._get_layers_files(ds, raise_errors=True)
            except Exception:
                # Fallback to API if no layers found in files
                try:
                    return self._get_layers_db(ds, include_geometry=include_geometry, raise_errors=True)
                except Exception as e:
                    if errors == 'error':
                        raise
                    else:
                        warnings.warn(f"No layer data found for {collection}/{segment_path}: {e}")
                        return None
        elif source == 'files':
            try:
                return self._get_layers_files(ds, raise_errors=True)
            except Exception as e:
                if errors == 'error':
                    raise
                else:
                    warnings.warn(f"No layer files found for {collection}/{segment_path}: {e}")
                    return None
        elif source == 'db':
            try:
                return self._get_layers_db(ds, include_geometry=include_geometry, raise_errors=True)
            except Exception as e:
                if errors == 'error':
                    raise
                else:
                    warnings.warn(f"No layer data in DB for {collection}/{segment_path}: {e}")
                    return None
        else:
            raise ValueError("Invalid source specified. Must be one of: 'auto', 'files', 'db'.")
78 def __init__(self, 79 collection_url: str = "https://data.cresis.ku.edu/data/", 80 cache_dir: str = None, 81 stac_parquet_href: str = None, 82 sync_catalogs: bool = True): 83 """ 84 Initialize connection to OPR data archive. 85 86 Parameters 87 ---------- 88 collection_url : str, optional 89 Base URL for OPR data collection. 90 cache_dir : str, optional 91 Directory to cache downloaded data for faster repeated access. 92 stac_parquet_href : str, optional 93 Path or URL pattern to STAC catalog parquet files. When *None* 94 (default), the local cache is used if available, falling back to 95 the S3 catalog. 96 sync_catalogs : bool, optional 97 If True (default), sync OPR STAC catalogs to a local cache 98 before resolving the catalog path. Uses ETag-based change 99 detection so repeated calls are cheap (single HTTP request). 100 """ 101 self.collection_url = collection_url 102 self.cache_dir = cache_dir 103 self._user_set_href = stac_parquet_href is not None 104 if sync_catalogs and not self._user_set_href: 105 sync_opr_catalogs() 106 self.stac_parquet_href = stac_parquet_href or get_opr_catalog_path() 107 108 self.fsspec_cache_kwargs = {} 109 self.fsspec_url_prefix = '' 110 if cache_dir: 111 self.fsspec_cache_kwargs['cache_storage'] = cache_dir 112 self.fsspec_cache_kwargs['check_files'] = True 113 self.fsspec_url_prefix = 'filecache::' 114 115 self.db_layers_metadata = None # Cache for OPS database layers metadata 116 117 # Lazy-initialized DuckDB client for STAC queries — reusing a single 118 # session caches remote file metadata (parquet footers) across calls 119 self._duckdb_client = None
Initialize connection to OPR data archive.
Parameters
- collection_url (str, optional): Base URL for OPR data collection.
- cache_dir (str, optional): Directory to cache downloaded data for faster repeated access.
- stac_parquet_href (str, optional): Path or URL pattern to STAC catalog parquet files. When None (default), the local cache is used if available, falling back to the S3 catalog.
- sync_catalogs (bool, optional): If True (default), sync OPR STAC catalogs to a local cache before resolving the catalog path. Uses ETag-based change detection so repeated calls are cheap (single HTTP request).
121 @property 122 def duckdb_client(self): 123 """Lazy-initialized DuckDB client, recreated after pickling.""" 124 if self._duckdb_client is None: 125 self._duckdb_client = DuckdbClient() 126 return self._duckdb_client
Lazy-initialized DuckDB client, recreated after pickling.
151 def query_frames(self, collections: list[str] = None, segment_paths: list[str] = None, 152 geometry = None, date_range: tuple = None, properties: dict = {}, 153 max_items: int = None, exclude_geometry: bool = False, 154 search_kwargs: dict = {}) -> gpd.GeoDataFrame: 155 """ 156 Query STAC catalog for radar frames matching search criteria. 157 158 Multiple parameters are combined with AND logic. Lists of values passed to a 159 single parameter are treated with OR logic (any value matches). 160 161 Parameters 162 ---------- 163 collections : list[str] or str, optional 164 Collection name(s) (e.g., "2022_Antarctica_BaslerMKB"). 165 segment_paths : list[str] or str, optional 166 Segment path(s) in format "YYYYMMDD_NN" (e.g., "20230126_01"). 167 geometry : shapely geometry or dict, optional 168 Geospatial geometry to filter by intersection. 169 date_range : str, optional 170 Date range in ISO format (e.g., "2021-01-01/2025-06-01"). 171 properties : dict, optional 172 Additional STAC properties to filter by. 173 max_items : int, optional 174 Maximum number of items to return. 175 exclude_geometry : bool, optional 176 If True, exclude geometry field to reduce response size. 177 search_kwargs : dict, optional 178 Additional keyword arguments for STAC search. 179 180 Returns 181 ------- 182 gpd.GeoDataFrame or None 183 GeoDataFrame of matching STAC items with columns for collection, 184 geometry, properties, and assets. Returns None if no matches found. 
185 """ 186 187 search_params = {} 188 189 # Exclude geometry -- do not return the geometry field to reduce response size 190 if exclude_geometry: 191 search_params['exclude'] = ['geometry'] 192 193 # Handle collections (seasons) 194 if collections is not None: 195 search_params['collections'] = [collections] if isinstance(collections, str) else collections 196 197 # Handle geometry filtering 198 if geometry is not None: 199 if hasattr(geometry, '__geo_interface__'): 200 geometry = geometry.__geo_interface__ 201 202 # Fix geometries that cross the antimeridian 203 geometry = antimeridian.fix_geojson(geometry, reverse=True) 204 205 search_params['intersects'] = geometry 206 207 # Handle date range filtering 208 if date_range is not None: 209 search_params['datetime'] = date_range 210 211 # Handle max_items 212 if max_items is not None: 213 search_params['limit'] = max_items 214 215 # Handle segment_paths filtering using CQL2 216 filter_conditions = [] 217 218 if segment_paths is not None: 219 segment_paths = [segment_paths] if isinstance(segment_paths, str) else segment_paths 220 221 # Create OR conditions for segment paths 222 segment_conditions = [] 223 for segment_path in segment_paths: 224 try: 225 date_str, segment_num_str = segment_path.split('_') 226 segment_num = int(segment_num_str) 227 228 # Create AND condition for this specific segment 229 segment_condition = { 230 "op": "and", 231 "args": [ 232 { 233 "op": "=", 234 "args": [{"property": "opr:date"}, date_str] 235 }, 236 { 237 "op": "=", 238 "args": [{"property": "opr:segment"}, segment_num] 239 } 240 ] 241 } 242 segment_conditions.append(segment_condition) 243 except ValueError: 244 print(f"Warning: Invalid segment_path format '{segment_path}'. 
Expected format: YYYYMMDD_NN") 245 continue 246 247 if segment_conditions: 248 if len(segment_conditions) == 1: 249 filter_conditions.append(segment_conditions[0]) 250 else: 251 # Multiple segments - combine with OR 252 filter_conditions.append({ 253 "op": "or", 254 "args": segment_conditions 255 }) 256 257 # Add any additional property filters 258 for key, value in properties.items(): 259 filter_conditions.append({ 260 "op": "=", 261 "args": [{"property": key}, value] 262 }) 263 264 # Combine all filter conditions with AND 265 if filter_conditions: 266 if len(filter_conditions) == 1: 267 filter_expr = filter_conditions[0] 268 else: 269 filter_expr = { 270 "op": "and", 271 "args": filter_conditions 272 } 273 274 search_params['filter'] = filter_expr 275 276 # Add any extra kwargs to search 277 search_params.update(search_kwargs) 278 279 #print(search_params) # TODO: Remove 280 281 # Perform the search 282 # from rustac import DuckdbClient 283 items = self.duckdb_client.search(self.stac_parquet_href, **search_params) 284 if isinstance(items, dict): 285 items = items['features'] 286 287 if not items or len(items) == 0: 288 warnings.warn("No items found matching the query criteria", UserWarning) 289 return None 290 291 # Convert to GeoDataFrame 292 items_df = gpd.GeoDataFrame(items) 293 # Set index 294 items_df = items_df.set_index(items_df['id']) 295 items_df.index.name = 'stac_item_id' 296 # Set the geometry column 297 if 'geometry' in items_df.columns and not exclude_geometry: 298 items_df = items_df.set_geometry(items_df['geometry'].apply(shapely.geometry.shape)) 299 items_df.crs = "EPSG:4326" 300 301 # Reorder the columns, leaving any extra columns at the end 302 desired_order = ['collection', 'geometry', 'properties', 'assets'] 303 items_df = items_df[[col for col in desired_order if col in items_df.columns] + list(items_df.columns.difference(desired_order))] 304 305 return items_df
Query STAC catalog for radar frames matching search criteria.
Multiple parameters are combined with AND logic. Lists of values passed to a single parameter are treated with OR logic (any value matches).
Parameters
- collections (list[str] or str, optional): Collection name(s) (e.g., "2022_Antarctica_BaslerMKB").
- segment_paths (list[str] or str, optional): Segment path(s) in format "YYYYMMDD_NN" (e.g., "20230126_01").
- geometry (shapely geometry or dict, optional): Geospatial geometry to filter by intersection.
- date_range (str, optional): Date range in ISO format (e.g., "2021-01-01/2025-06-01").
- properties (dict, optional): Additional STAC properties to filter by.
- max_items (int, optional): Maximum number of items to return.
- exclude_geometry (bool, optional): If True, exclude geometry field to reduce response size.
- search_kwargs (dict, optional): Additional keyword arguments for STAC search.
Returns
- gpd.GeoDataFrame or None: GeoDataFrame of matching STAC items with columns for collection, geometry, properties, and assets. Returns None if no matches found.
307 def load_frames(self, stac_items: gpd.GeoDataFrame, 308 data_product: str = "CSARP_standard", 309 merge_flights: bool = False, 310 skip_errors: bool = False, 311 ) -> Union[list[xr.Dataset], xr.Dataset]: 312 """ 313 Load multiple radar frames from STAC items. 314 315 Parameters 316 ---------- 317 stac_items : gpd.GeoDataFrame 318 STAC items returned from query_frames. 319 data_product : str, optional 320 Data product to load (e.g., "CSARP_standard", "CSARP_qlook"). 321 merge_flights : bool, optional 322 If True, merge frames from the same segment into single Datasets. 323 skip_errors : bool, optional 324 If True, skip failed frames and continue loading others. 325 326 Returns 327 ------- 328 list[xr.Dataset] or xr.Dataset 329 List of radar frame Datasets, or list of merged Datasets if 330 merge_flights=True. 331 """ 332 frames = [] 333 334 for idx, item in stac_items.iterrows(): 335 try: 336 frame = self.load_frame(item, data_product) 337 frames.append(frame) 338 except Exception as e: 339 print(f"Error loading frame for item {item.get('id', 'unknown')}: {e}") 340 if skip_errors: 341 continue 342 else: 343 raise e 344 345 if merge_flights: 346 return opr_tools.merge_frames(frames) 347 else: 348 return frames
Load multiple radar frames from STAC items.
Parameters
- stac_items (gpd.GeoDataFrame): STAC items returned from query_frames.
- data_product (str, optional): Data product to load (e.g., "CSARP_standard", "CSARP_qlook").
- merge_flights (bool, optional): If True, merge frames from the same segment into single Datasets.
- skip_errors (bool, optional): If True, skip failed frames and continue loading others.
Returns
- list[xr.Dataset] or xr.Dataset: List of radar frame Datasets, or list of merged Datasets if merge_flights=True.
350 def load_frame(self, stac_item, data_product: str = "CSARP_standard") -> xr.Dataset: 351 """ 352 Load a single radar frame from a STAC item. 353 354 Parameters 355 ---------- 356 stac_item : dict or pd.Series 357 STAC item containing asset URLs. 358 data_product : str, optional 359 Data product to load (e.g., "CSARP_standard", "CSARP_qlook"). 360 361 Returns 362 ------- 363 xr.Dataset 364 Radar frame with coordinates slow_time and twtt, and data variables 365 including Data, Latitude, Longitude, Elevation, Surface, etc. 366 """ 367 368 assets = stac_item['assets'] 369 370 # Get the data asset 371 data_asset = assets.get(data_product) 372 if not data_asset: 373 available_assets = list(assets.keys()) 374 raise ValueError(f"No {data_product} asset found. Available assets: {available_assets}") 375 376 # Get the URL from the asset 377 url = data_asset.get('href') 378 if not url: 379 raise ValueError(f"No href found in {data_product} asset") 380 381 # Load the frame using the existing method 382 return self.load_frame_url(url)
Load a single radar frame from a STAC item.
Parameters
- stac_item (dict or pd.Series): STAC item containing asset URLs.
- data_product (str, optional): Data product to load (e.g., "CSARP_standard", "CSARP_qlook").
Returns
- xr.Dataset: Radar frame with coordinates slow_time and twtt, and data variables including Data, Latitude, Longitude, Elevation, Surface, etc.
    def load_frame_url(self, url: str) -> xr.Dataset:
        """
        Load a radar frame directly from a URL.

        Automatically detects and handles both HDF5 and legacy MATLAB formats.

        Parameters
        ----------
        url : str
            URL to radar frame data file (.mat).

        Returns
        -------
        xr.Dataset
            Radar frame with CF-compliant metadata and citation information.
        """

        file = self._open_file(url)

        # Format detection by trial: newer .mat files are HDF5-backed; if the
        # HDF5 reader rejects the file (OSError), fall back to the legacy
        # MATLAB reader. The detected type drives attribute extraction below.
        filetype = None
        try:
            ds = self._load_frame_hdf5(file)
            filetype = 'hdf5'
        except OSError:
            ds = self._load_frame_matlab(file)
            filetype = 'matlab'

        # Add the source URL as an attribute
        ds.attrs['source_url'] = url

        # Apply CF-compliant attributes
        ds = apply_cf_compliant_attrs(ds)

        # Get the season and segment from the URL: expects paths shaped like
        # <SEASON>/<PRODUCT>/<SEGMENT_DIR>/<prefix>YYYYMMDD_SS_FFF...
        match = re.search(r'(\d{4}_\w+_[A-Za-z0-9]+)\/([\w_]+)\/[\d_]+\/[\w]+(\d{8}_\d{2}_\d{3})', url)
        if match:
            collection, data_product, granule = match.groups()
            date, segment_id, frame_id = granule.split('_')
            ds.attrs['collection'] = collection
            ds.attrs['data_product'] = data_product
            ds.attrs['granule'] = granule
            ds.attrs['segment_path'] = f"{date}_{segment_id}"
            ds.attrs['date_str'] = date
            ds.attrs['segment'] = int(segment_id)
            ds.attrs['frame'] = int(frame_id)

            # Load citation information (DOI / ROR / funding) from the OPS API.
            result = ops_api.get_segment_metadata(segment_name=ds.attrs['segment_path'], season_name=collection)
            if result:
                if isinstance(result['data'], str):
                    # A string payload signals an API-side message, not data.
                    warnings.warn(f"Warning: Unexpected result from ops_api: {result['data']}", UserWarning)
                else:
                    # Collapse single-element lists to scalars; multi-element
                    # lists become sets (deduplicated). Empty lists are dropped.
                    result_data = {}
                    for key, value in result['data'].items():
                        if len(value) == 1:
                            result_data[key] = value[0]
                        elif len(value) > 1:
                            result_data[key] = set(value)

                    if 'dois' in result_data:
                        ds.attrs['doi'] = result_data['dois']
                    if 'rors' in result_data:
                        ds.attrs['ror'] = result_data['rors']
                    if 'funding_sources' in result_data:
                        ds.attrs['funder_text'] = result_data['funding_sources']

        # Add the rest of the Matlab parameters as dataset attributes,
        # skipping the large data variables already loaded above.
        if filetype == 'hdf5':
            ds.attrs['mimetype'] = 'application/x-hdf5'
            ds.attrs.update(decode_hdf5_matlab_variable(h5py.File(file, 'r'),
                                                        skip_variables=True,
                                                        skip_errors=True))
        elif filetype == 'matlab':
            ds.attrs['mimetype'] = 'application/x-matlab-data'
            ds.attrs.update(extract_legacy_mat_attributes(file,
                                                          skip_keys=ds.keys(),
                                                          skip_errors=True))

        return ds
Load a radar frame directly from a URL.
Automatically detects and handles both HDF5 and legacy MATLAB formats.
Parameters
- url (str): URL to radar frame data file (.mat).
Returns
- xr.Dataset: Radar frame with CF-compliant metadata and citation information.
556 def get_collections(self) -> list: 557 """ 558 Get all available collections (seasons/campaigns) in STAC catalog. 559 560 Returns 561 ------- 562 list[dict] 563 List of collection dictionaries with id, description, and extent. 564 """ 565 566 return self.duckdb_client.get_collections(self.stac_parquet_href)
Get all available collections (seasons/campaigns) in STAC catalog.
Returns
- list[dict]: List of collection dictionaries with id, description, and extent.
568 def get_segments(self, collection_id: str) -> list: 569 """ 570 Get all segments (flights) within a collection. 571 572 Parameters 573 ---------- 574 collection_id : str 575 STAC collection ID (e.g., "2022_Antarctica_BaslerMKB"). 576 577 Returns 578 ------- 579 list[dict] 580 List of segment dictionaries with segment_path, date, flight_number, 581 and frame count. 582 """ 583 # Query STAC API for all items in collection (exclude geometry for better performance) 584 items = self.query_frames(collections=[collection_id], exclude_geometry=True) 585 586 if items is None or len(items) == 0: 587 print(f"No items found in collection '{collection_id}'") 588 return [] 589 590 # Group items by segment (opr:date + opr:segment) 591 segments = {} 592 for idx, item in items.iterrows(): 593 properties = item['properties'] 594 date = properties['opr:date'] 595 flight_num = properties['opr:segment'] 596 597 if date and flight_num is not None: 598 segment_path = f"{date}_{flight_num:02d}" 599 600 if segment_path not in segments: 601 segments[segment_path] = { 602 'segment_path': segment_path, 603 'date': date, 604 'flight_number': flight_num, 605 'collection': collection_id, 606 'frames': [], 607 'item_count': 0 608 } 609 610 segments[segment_path]['frames'].append(properties.get('opr:frame')) 611 segments[segment_path]['item_count'] += 1 612 613 # Sort segments by date and flight number 614 segment_list = list(segments.values()) 615 segment_list.sort(key=lambda x: (x['date'], x['flight_number'])) 616 617 return segment_list
Get all segments (flights) within a collection.
Parameters
- collection_id (str): STAC collection ID (e.g., "2022_Antarctica_BaslerMKB").
Returns
- list[dict]: List of segment dictionaries with segment_path, date, flight_number, and frame count.
738 def load_layers_file(self, url: str) -> xr.Dataset: 739 """ 740 Load layer data from CSARP_layer file. 741 742 Parameters 743 ---------- 744 url : str 745 URL or path to layer file. 746 747 Returns 748 ------- 749 xr.Dataset 750 Layer data with coordinates slow_time and layer, and data variables 751 twtt, quality, type, lat, lon, and elev. 752 Layer data in a standardized format with coordinates: 753 - slow_time: GPS time converted to datetime 754 And data variables: 755 - twtt: Two-way travel time for each layer 756 - quality: Quality values for each layer 757 - type: Type values for each layer 758 - lat, lon, elev: Geographic coordinates 759 - id: Layer IDs 760 """ 761 762 file = self._open_file(url) 763 764 ds = self._load_layers(file) 765 766 # Add the source URL as an attribute 767 ds.attrs['source_url'] = url 768 769 # Apply common manipulations to match the expected structure 770 # Convert GPS time to datetime coordinate 771 if 'gps_time' in ds.variables: 772 slow_time_dt = pd.to_datetime(ds['gps_time'].values, unit='s') 773 ds = ds.assign_coords(slow_time=('slow_time', slow_time_dt)) 774 775 # Set slow_time as the main coordinate and remove gps_time from data_vars 776 if 'slow_time' not in ds.dims: 777 ds = ds.swap_dims({'gps_time': 'slow_time'}) 778 779 # Remove gps_time from data_vars if it exists there to avoid conflicts 780 if ('gps_time' in ds.data_vars) or ('gps_time' in ds.coords): 781 ds = ds.drop_vars('gps_time') 782 783 # Sort by slow_time if it exists 784 if 'slow_time' in ds.coords: 785 ds = ds.sortby('slow_time') 786 787 return ds
Load layer data from CSARP_layer file.
Parameters
- url (str): URL or path to layer file.
Returns
- xr.Dataset: Layer data in a standardized format with coordinates:
  - slow_time: GPS time converted to datetime
  And data variables:
  - twtt: Two-way travel time for each layer
  - quality: Quality values for each layer
  - type: Type values for each layer
  - lat, lon, elev: Geographic coordinates
  - id: Layer IDs
949 def get_layers(self, ds : Union[xr.Dataset, dict], source: str = 'auto', include_geometry=True, errors='warn') -> dict: 950 """ 951 Load layer picks from files or database. 952 953 Tries files first, falls back to database if source='auto'. 954 955 Parameters 956 ---------- 957 ds : Union[xr.Dataset, dict] 958 Radar frame Dataset or STAC item. 959 source : {'auto', 'files', 'db'}, optional 960 Source for layers. 'auto' tries files then falls back to database. 961 include_geometry : bool, optional 962 If True, include geometry information when fetching from the API. 963 errors : str, optional 964 How to handle missing layer data: 'warn' (default) returns None with a warning, 965 'error' raises an exception. 966 967 Returns 968 ------- 969 dict or None 970 A dictionary mapping layer IDs to their corresponding data, or None if no layers 971 found and errors='warn'. 972 """ 973 collection, segment_path, _ = self._extract_segment_info(ds) 974 975 if source == 'auto': 976 # Try to get layers from files first 977 try: 978 layers = self._get_layers_files(ds, raise_errors=True) 979 return layers 980 except Exception: 981 # Fallback to API if no layers found in files 982 try: 983 return self._get_layers_db(ds, include_geometry=include_geometry, raise_errors=True) 984 except Exception as e: 985 if errors == 'error': 986 raise 987 else: 988 warnings.warn(f"No layer data found for {collection}/{segment_path}: {e}") 989 return None 990 elif source == 'files': 991 try: 992 return self._get_layers_files(ds, raise_errors=True) 993 except Exception as e: 994 if errors == 'error': 995 raise 996 else: 997 warnings.warn(f"No layer files found for {collection}/{segment_path}: {e}") 998 return None 999 elif source == 'db': 1000 try: 1001 return self._get_layers_db(ds, include_geometry=include_geometry, raise_errors=True) 1002 except Exception as e: 1003 if errors == 'error': 1004 raise 1005 else: 1006 warnings.warn(f"No layer data in DB for {collection}/{segment_path}: {e}") 1007 
return None 1008 else: 1009 raise ValueError("Invalid source specified. Must be one of: 'auto', 'files', 'db'.")
Load layer picks from files or database.
Tries files first, falls back to database if source='auto'.
Parameters
- ds (Union[xr.Dataset, dict]): Radar frame Dataset or STAC item.
- source ({'auto', 'files', 'db'}, optional): Source for layers. 'auto' tries files then falls back to database.
- include_geometry (bool, optional): If True, include geometry information when fetching from the API.
- errors (str, optional): How to handle missing layer data: 'warn' (default) returns None with a warning, 'error' raises an exception.
Returns
- dict or None: A dictionary mapping layer IDs to their corresponding data, or None if no layers found and errors='warn'.