xopr.opr_access
import re
import warnings
from typing import Union

import antimeridian
import fsspec
import geopandas as gpd
import h5py
import hdf5storage
import numpy as np
import pandas as pd
import scipy.io
import shapely
import xarray as xr
from rustac import DuckdbClient

from . import opr_tools, ops_api
from .cf_units import apply_cf_compliant_attrs
from .matlab_attribute_utils import (
    decode_hdf5_matlab_variable,
    extract_legacy_mat_attributes,
)


class OPRConnection:
    def __init__(self,
                 collection_url: str = "https://data.cresis.ku.edu/data/",
                 cache_dir: str = None,
                 stac_parquet_href: str = "gs://opr_stac/catalog/**/*.parquet"):
        """
        Initialize the OPRConnection with a collection URL and optional cache directory.

        Parameters
        ----------
        collection_url : str
            The base URL for the OPR data collection.
        cache_dir : str, optional
            Directory to cache downloaded data.
        stac_parquet_href : str, optional
            Location of the STAC GeoParquet catalog to search.
        """
        self.collection_url = collection_url
        self.cache_dir = cache_dir
        self.stac_parquet_href = stac_parquet_href

        self.fsspec_cache_kwargs = {}
        self.fsspec_url_prefix = ''
        if cache_dir:
            self.fsspec_cache_kwargs['cache_storage'] = cache_dir
            self.fsspec_cache_kwargs['check_files'] = True
            self.fsspec_url_prefix = 'filecache::'

        self.db_layers_metadata = None  # Cache for OPS database layers metadata

    def _open_file(self, url):
        """Helper method to open files with appropriate caching."""
        if self.fsspec_url_prefix:
            return fsspec.open_local(f"{self.fsspec_url_prefix}{url}", filecache=self.fsspec_cache_kwargs)
        else:
            return fsspec.open_local(f"simplecache::{url}", simplecache=self.fsspec_cache_kwargs)

    def _extract_segment_info(self, segment):
        """Extract collection, segment_path, and frame from a Dataset or STAC item dict."""
        if isinstance(segment, xr.Dataset):
            return (segment.attrs.get('collection'),
                    segment.attrs.get('segment_path'),
                    segment.attrs.get('frame'))
        else:
            return (segment['collection'],
                    f"{segment['properties'].get('opr:date')}_{segment['properties'].get('opr:segment'):02d}",
                    segment['properties'].get('opr:frame'))
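    # Usage sketch (illustrative, not part of the API surface): construct a
    # connection, optionally with a local cache so repeated reads hit disk.
    # The cache path below is hypothetical; any writable directory works.
    #
    #   conn = OPRConnection()                        # no caching
    #   cached = OPRConnection(cache_dir="/tmp/opr")  # caches downloads locally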
    def query_frames(self, collections: list[str] = None, segment_paths: list[str] = None,
                     geometry = None, date_range: tuple = None, properties: dict = {},
                     max_items: int = None, exclude_geometry: bool = False,
                     search_kwargs: dict = {}) -> gpd.GeoDataFrame:
        """
        Query for radar frames based on various search criteria. Each parameter is
        treated as an independent criterion. If multiple parameters are passed, they
        are combined with AND logic.

        A list of values may be passed to most parameters. If so, any value in the
        list is treated as a match.

        Parameters
        ----------
        collections : list[str], optional
            List of collection names to filter by (e.g., "2022_Antarctica_BaslerMKB").
        segment_paths : list[str], optional
            List of segment paths to filter by (e.g., "20230126_01").
        geometry : optional
            Geospatial geometry to filter by (e.g., a shapely geometry object).
        date_range : tuple, optional
            Date range to filter by (e.g., (start_date, end_date)).
        properties : dict, optional
            Additional properties to filter by, as key/value equality constraints.
        max_items : int, optional
            Maximum number of items to return.
        exclude_geometry : bool, optional
            If True, exclude the geometry field from the response to reduce size.
        search_kwargs : dict, optional
            Additional keyword arguments to pass to the search method.

        Returns
        -------
        gpd.GeoDataFrame
            GeoDataFrame containing the STAC frames matching the query criteria.
        """

        search_params = {}

        # Exclude geometry -- do not return the geometry field to reduce response size
        if exclude_geometry:
            search_params['exclude'] = ['geometry']

        # Handle collections (seasons)
        if collections is not None:
            search_params['collections'] = [collections] if isinstance(collections, str) else collections

        # Handle geometry filtering
        if geometry is not None:
            if hasattr(geometry, '__geo_interface__'):
                geometry = geometry.__geo_interface__

            # Fix geometries that cross the antimeridian
            geometry = antimeridian.fix_geojson(geometry, reverse=True)

            search_params['intersects'] = geometry

        # Handle date range filtering
        if date_range is not None:
            search_params['datetime'] = date_range

        # Handle max_items
        if max_items is not None:
            search_params['limit'] = max_items
        else:
            search_params['limit'] = 1000000

        # Handle segment_paths filtering using CQL2
        filter_conditions = []

        if segment_paths is not None:
            segment_paths = [segment_paths] if isinstance(segment_paths, str) else segment_paths

            # Create OR conditions for segment paths
            segment_conditions = []
            for segment_path in segment_paths:
                try:
                    date_str, segment_num_str = segment_path.split('_')
                    segment_num = int(segment_num_str)

                    # Create AND condition for this specific segment
                    segment_condition = {
                        "op": "and",
                        "args": [
                            {
                                "op": "=",
                                "args": [{"property": "opr:date"}, date_str]
                            },
                            {
                                "op": "=",
                                "args": [{"property": "opr:segment"}, segment_num]
                            }
                        ]
                    }
                    segment_conditions.append(segment_condition)
                except ValueError:
                    print(f"Warning: Invalid segment_path format '{segment_path}'. Expected format: YYYYMMDD_NN")
                    continue

            if segment_conditions:
                if len(segment_conditions) == 1:
                    filter_conditions.append(segment_conditions[0])
                else:
                    # Multiple segments - combine with OR
                    filter_conditions.append({
                        "op": "or",
                        "args": segment_conditions
                    })

        # Add any additional property filters
        for key, value in properties.items():
            filter_conditions.append({
                "op": "=",
                "args": [{"property": key}, value]
            })

        # Combine all filter conditions with AND
        if filter_conditions:
            if len(filter_conditions) == 1:
                filter_expr = filter_conditions[0]
            else:
                filter_expr = {
                    "op": "and",
                    "args": filter_conditions
                }

            search_params['filter'] = filter_expr

        # Add any extra kwargs to search
        search_params.update(search_kwargs)

        # Perform the search
        client = DuckdbClient()
        items = client.search(self.stac_parquet_href, **search_params)
        if isinstance(items, dict):
            items = items['features']

        if not items:
            warnings.warn("No items found matching the query criteria", UserWarning)
            return None

        # Convert to GeoDataFrame
        items_df = gpd.GeoDataFrame(items)
        # Set index
        items_df = items_df.set_index(items_df['id'])
        items_df.index.name = 'stac_item_id'
        # Set the geometry column
        if 'geometry' in items_df.columns and not exclude_geometry:
            items_df = items_df.set_geometry(items_df['geometry'].apply(shapely.geometry.shape))
            items_df.crs = "EPSG:4326"

        # Reorder the columns, leaving any extra columns at the end
        desired_order = ['collection', 'geometry', 'properties', 'assets']
        items_df = items_df[[col for col in desired_order if col in items_df.columns] + list(items_df.columns.difference(desired_order))]

        return items_df
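    # Usage sketch (hedged): query a season for frames over an area of interest.
    # The bounding box below is an arbitrary illustration; the collection name
    # is taken from the docstring above.
    #
    #   conn = OPRConnection()
    #   aoi = shapely.geometry.box(-115.0, -75.0, -110.0, -74.0)  # lon/lat bounds
    #   frames = conn.query_frames(collections=["2022_Antarctica_BaslerMKB"],
    #                              geometry=aoi, max_items=10)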
    def load_frames(self, stac_items: gpd.GeoDataFrame,
                    data_product: str = "CSARP_standard",
                    merge_flights: bool = False,
                    skip_errors: bool = False,
                    ) -> Union[list[xr.Dataset], xr.Dataset]:
        """
        Load multiple radar frames from a collection of STAC items.

        Parameters
        ----------
        stac_items : gpd.GeoDataFrame
            The STAC items containing asset URLs.
        data_product : str, optional
            The data product to load (default is "CSARP_standard").
        merge_flights : bool, optional
            Whether to merge frames from the same flight (default is False).
        skip_errors : bool, optional
            Whether to skip errors and continue loading other frames (default is False).

        Returns
        -------
        list[xr.Dataset] or xr.Dataset
            List of loaded radar frames as xarray Datasets, or a single merged
            Dataset if there is only one segment and merge_flights is True.
        """
        frames = []

        for idx, item in stac_items.iterrows():
            try:
                frame = self.load_frame(item, data_product)
                frames.append(frame)
            except Exception as e:
                print(f"Error loading frame for item {item.get('id', 'unknown')}: {e}")
                if skip_errors:
                    continue
                else:
                    raise

        if merge_flights:
            return opr_tools.merge_frames(frames)
        else:
            return frames

    def load_frame(self, stac_item, data_product: str = "CSARP_standard") -> xr.Dataset:
        """
        Load a radar frame from a STAC item.

        Parameters
        ----------
        stac_item
            The STAC item containing asset URLs.
        data_product : str, optional
            The data product to load (default is "CSARP_standard").

        Returns
        -------
        xr.Dataset
            The loaded radar frame as an xarray Dataset.
        """

        assets = stac_item['assets']

        # Get the data asset
        data_asset = assets.get(data_product)
        if not data_asset:
            available_assets = list(assets.keys())
            raise ValueError(f"No {data_product} asset found. Available assets: {available_assets}")

        # Get the URL from the asset
        url = data_asset.get('href')
        if not url:
            raise ValueError(f"No href found in {data_product} asset")

        # Load the frame using the existing method
        return self.load_frame_url(url)
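    # Usage sketch (hedged): load the frames returned by query_frames, skipping
    # any that fail to download or parse.
    #
    #   items = conn.query_frames(collections=["2022_Antarctica_BaslerMKB"], max_items=3)
    #   datasets = conn.load_frames(items, skip_errors=True)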
    def load_frame_url(self, url: str) -> xr.Dataset:
        """
        Load a radar frame from a given URL.

        Parameters
        ----------
        url : str
            The URL of the radar frame data.

        Returns
        -------
        xr.Dataset
            The loaded radar frame as an xarray Dataset.
        """

        file = self._open_file(url)

        filetype = None
        try:
            ds = self._load_frame_hdf5(file)
            filetype = 'hdf5'
        except OSError:
            ds = self._load_frame_matlab(file)
            filetype = 'matlab'

        # Add the source URL as an attribute
        ds.attrs['source_url'] = url

        # Apply CF-compliant attributes
        ds = apply_cf_compliant_attrs(ds)

        # Get the season and segment from the URL
        match = re.search(r'(\d{4}_\w+_[A-Za-z0-9]+)\/([\w_]+)\/[\d_]+\/[\w]+(\d{8}_\d{2}_\d{3})', url)
        if match:
            collection, data_product, granule = match.groups()
            date, segment_id, frame_id = granule.split('_')
            ds.attrs['collection'] = collection
            ds.attrs['data_product'] = data_product
            ds.attrs['granule'] = granule
            ds.attrs['segment_path'] = f"{date}_{segment_id}"
            ds.attrs['date_str'] = date
            ds.attrs['segment'] = int(segment_id)
            ds.attrs['frame'] = int(frame_id)

            # Load citation information
            result = ops_api.get_segment_metadata(segment_name=ds.attrs['segment_path'], season_name=collection)
            if result:
                if isinstance(result['data'], str):
                    warnings.warn(f"Unexpected result from ops_api: {result['data']}", UserWarning)
                else:
                    result_data = {}
                    for key, value in result['data'].items():
                        if len(value) == 1:
                            result_data[key] = value[0]
                        elif len(value) > 1:
                            result_data[key] = set(value)

                    if 'dois' in result_data:
                        ds.attrs['doi'] = result_data['dois']
                    if 'rors' in result_data:
                        ds.attrs['ror'] = result_data['rors']
                    if 'funding_sources' in result_data:
                        ds.attrs['funder_text'] = result_data['funding_sources']

        # Add the rest of the Matlab parameters
        if filetype == 'hdf5':
            ds.attrs['mimetype'] = 'application/x-hdf5'
            ds.attrs.update(decode_hdf5_matlab_variable(h5py.File(file, 'r'),
                                                        skip_variables=True,
                                                        skip_errors=True))
        elif filetype == 'matlab':
            ds.attrs['mimetype'] = 'application/x-matlab-data'
            ds.attrs.update(extract_legacy_mat_attributes(file,
                                                          skip_keys=ds.keys(),
                                                          skip_errors=True))

        return ds

    def _load_frame_hdf5(self, file) -> xr.Dataset:
        """
        Load a radar frame from an HDF5 file.

        Parameters
        ----------
        file :
            The path to the HDF5 file containing radar frame data.

        Returns
        -------
        xr.Dataset
            The loaded radar frame as an xarray Dataset.
        """

        ds = xr.open_dataset(file, engine='h5netcdf', phony_dims='sort')

        # Re-arrange variables to provide useful dimensions and coordinates

        ds = ds.squeeze()  # Drop the singleton dimensions MATLAB adds

        ds = ds.rename({  # Label the dimensions with more useful names
            ds.Data.dims[0]: 'slow_time_idx',
            ds.Data.dims[1]: 'twtt_idx',
        })

        # Make variables with no dimensions into scalar attributes
        for var in ds.data_vars:
            if ds[var].ndim == 0:
                ds.attrs[var] = ds[var].item()
                ds = ds.drop_vars(var)

        # Make the file_type an attribute
        if 'file_type' in ds.data_vars:
            ds.attrs['file_type'] = ds['file_type'].to_numpy()
            ds = ds.drop_vars('file_type')

        # Name the two time coordinates
        ds = ds.rename({'Time': 'twtt', 'GPS_time': 'slow_time'})
        ds = ds.set_coords(['slow_time', 'twtt'])

        slow_time_1d = pd.to_datetime(ds['slow_time'].values, unit='s')
        ds = ds.assign_coords(slow_time=('slow_time_idx', slow_time_1d))

        # Make twtt and slow_time the indexing coordinates
        ds = ds.swap_dims({'twtt_idx': 'twtt'})
        ds = ds.swap_dims({'slow_time_idx': 'slow_time'})

        return ds
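    # Structure sketch (hedged): after loading, a frame is indexed by the two
    # time coordinates named above, so slicing a radargram looks roughly like
    # the following. The URL is elided and the slice bounds are illustrative.
    #
    #   ds = conn.load_frame_url("https://data.cresis.ku.edu/data/rds/...")
    #   echo = ds['Data'].sel(twtt=slice(0, 1e-5))  # trim in two-way travel time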
    def _load_frame_matlab(self, file) -> xr.Dataset:
        """
        Load a radar frame from a MATLAB file.

        Parameters
        ----------
        file :
            The path to the MATLAB file containing radar frame data.

        Returns
        -------
        xr.Dataset
            The loaded radar frame as an xarray Dataset.
        """

        m = scipy.io.loadmat(file, mat_dtype=False)

        key_dims = {
            'Time': ('twtt',),
            'GPS_time': ('slow_time',),
            'Latitude': ('slow_time',),
            'Longitude': ('slow_time',),
            'Elevation': ('slow_time',),
            'Roll': ('slow_time',),
            'Pitch': ('slow_time',),
            'Heading': ('slow_time',),
            'Surface': ('slow_time',),
            'Data': ('twtt', 'slow_time')
        }

        ds = xr.Dataset(
            {
                key: (dims, np.squeeze(m[key])) for key, dims in key_dims.items() if key in m
            },
            coords={
                'twtt': ('twtt', np.squeeze(m['Time'])),
                'slow_time': ('slow_time', pd.to_datetime(np.squeeze(m['GPS_time']), unit='s')),
            }
        )

        return ds

    def get_collections(self) -> list:
        """
        Get the list of available STAC collections.

        Returns
        -------
        list
            List of collection dictionaries with metadata.
        """

        client = DuckdbClient()
        return client.get_collections(self.stac_parquet_href)

    def get_segments(self, collection_id: str) -> list:
        """
        Get the list of available segments within a collection/season.

        Parameters
        ----------
        collection_id : str
            The ID of the STAC collection to query.

        Returns
        -------
        list
            List of segment dictionaries with flight metadata.
        """
        # Query the STAC catalog for all items in the collection (exclude geometry for better performance)
        items = self.query_frames(collections=[collection_id], exclude_geometry=True)

        if items is None or len(items) == 0:
            print(f"No items found in collection '{collection_id}'")
            return []

        # Group items by segment (opr:date + opr:segment)
        segments = {}
        for idx, item in items.iterrows():
            properties = item['properties']
            date = properties['opr:date']
            flight_num = properties['opr:segment']

            if date and flight_num is not None:
                segment_path = f"{date}_{flight_num:02d}"

                if segment_path not in segments:
                    segments[segment_path] = {
                        'segment_path': segment_path,
                        'date': date,
                        'flight_number': flight_num,
                        'collection': collection_id,
                        'frames': [],
                        'item_count': 0
                    }

                segments[segment_path]['frames'].append(properties.get('opr:frame'))
                segments[segment_path]['item_count'] += 1

        # Sort segments by date and flight number
        segment_list = list(segments.values())
        segment_list.sort(key=lambda x: (x['date'], x['flight_number']))

        return segment_list
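    # Usage sketch (hedged): enumerate what is available before querying frames.
    #
    #   seasons = conn.get_collections()
    #   segs = conn.get_segments("2022_Antarctica_BaslerMKB")
    #   print(segs[0]['segment_path'], segs[0]['item_count'])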
    def get_layers_files(self, segment: Union[xr.Dataset, dict], raise_errors=True) -> dict:
        """
        Fetch layers from the CSARP_layer files.

        See https://gitlab.com/openpolarradar/opr/-/wikis/Layer-File-Guide for file formats.

        Parameters
        ----------
        segment : Union[xr.Dataset, dict]
            The flight information, which can be an xarray Dataset or a dictionary representing the STAC item.
        raise_errors : bool, optional
            If True, raise errors when layers cannot be found.

        Returns
        -------
        dict
            A dictionary mapping layer display names to their corresponding data.
        """
        collection, segment_path, frame = self._extract_segment_info(segment)

        # If we already have a STAC item, just use it. Otherwise, query to find the matching STAC items.
        if isinstance(segment, xr.Dataset):
            properties = {}
            if frame:
                properties['opr:frame'] = frame

            # Query the STAC collection for CSARP_layer files matching this specific segment
            stac_items = self.query_frames(collections=[collection], segment_paths=[segment_path], properties=properties)

            # Filter for items that have CSARP_layer assets
            layer_items = []
            for idx, item in stac_items.iterrows():
                if 'CSARP_layer' in item['assets']:
                    layer_items.append(item)
        else:
            layer_items = [segment] if 'CSARP_layer' in segment.get('assets', {}) else []

        if not layer_items:
            if raise_errors:
                raise ValueError(f"No CSARP_layer files found for segment path {segment_path} in collection {collection}")
            else:
                return {}

        # Load each layer file and combine them
        layer_frames = []
        for item in layer_items:
            layer_asset = item['assets']['CSARP_layer']
            if layer_asset and 'href' in layer_asset:
                url = layer_asset['href']
                try:
                    layer_ds = self.load_layers_file(url)
                    layer_frames.append(layer_ds)
                except Exception as e:
                    print(f"Warning: Failed to load layer file {url}: {e}")
                    continue

        if not layer_frames:
            if raise_errors:
                raise ValueError(f"No valid CSARP_layer files could be loaded for segment {segment_path} in collection {collection}")
            else:
                return {}

        # Concatenate all layer frames along the slow_time dimension
        layers_segment = xr.concat(layer_frames, dim='slow_time', combine_attrs='drop_conflicts', data_vars='minimal')
        layers_segment = layers_segment.sortby('slow_time')

        # Trim to the bounds of the original dataset
        layers_segment = self._trim_to_bounds(layers_segment, segment)

        # Find the layer organization file to map layer IDs to names.
        # Layer organization files are one per directory. For example, the layer file:
        # https://data.cresis.ku.edu/data/rds/2017_Antarctica_BaslerJKB/CSARP_layer/20180105_03/Data_20180105_03_003.mat
        # would have a layer organization file at:
        # https://data.cresis.ku.edu/data/rds/2017_Antarctica_BaslerJKB/CSARP_layer/20180105_03/layer_20180105_03.mat
        layer_organization_file_url = re.sub(r'Data_(\d{8}_\d{2})_\d{3}\.mat', r'layer_\1.mat', url)
        layer_organization_ds = self._load_layer_organization(self._open_file(layer_organization_file_url))

        # Split into separate layers by ID
        layers = {}

        layer_ids = np.unique(layers_segment['layer'])

        for layer_id in layer_ids:
            layer_id_int = int(layer_id)
            layer_name = str(layer_organization_ds['lyr_name'].sel(lyr_id=layer_id_int).values.item().squeeze())
            layer_group = str(layer_organization_ds['lyr_group_name'].sel(lyr_id=layer_id_int).values.item().squeeze())
            if layer_group == '[]':
                layer_group = ''
            layer_display_name = f"{layer_group}:{layer_name}"

            layer_ds = layers_segment.sel(layer=layer_id)
            layers[layer_display_name] = layer_ds

        return layers
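    # Usage sketch (hedged): fetch picked layers for a loaded frame; keys are
    # "group:name" display names (e.g., "standard:surface" is illustrative).
    #
    #   ds = conn.load_frames(items)[0]
    #   layers = conn.get_layers_files(ds, raise_errors=False)
    #   for name, layer in layers.items():
    #       print(name, layer['twtt'].shape)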
    def _trim_to_bounds(self, ds: xr.Dataset, ref: Union[xr.Dataset, dict]) -> xr.Dataset:
        """Trim a dataset to the slow_time bounds of a reference Dataset or STAC item."""
        start_time, end_time = None, None
        if isinstance(ref, xr.Dataset) and 'slow_time' in ref.coords:
            start_time = ref['slow_time'].min()
            end_time = ref['slow_time'].max()
        else:
            properties = ref.get('properties', {})
            if 'start_datetime' in properties and 'end_datetime' in properties:
                start_time = pd.to_datetime(properties['start_datetime'])
                end_time = pd.to_datetime(properties['end_datetime'])

        if start_time is not None:
            return ds.sel(slow_time=slice(start_time, end_time))
        else:
            return ds

    def load_layers_file(self, url: str) -> xr.Dataset:
        """
        Load layer data from a CSARP_layer file (either HDF5 or MATLAB format).

        Parameters
        ----------
        url : str
            URL or path to the layer file.

        Returns
        -------
        xr.Dataset
            Layer data in a standardized format with coordinates:
            - slow_time: GPS time converted to datetime
            and data variables:
            - twtt: Two-way travel time for each layer
            - quality: Quality values for each layer
            - type: Type values for each layer
            - lat, lon, elev: Geographic coordinates
            - id: Layer IDs
        """

        file = self._open_file(url)

        ds = self._load_layers(file)

        # Add the source URL as an attribute
        ds.attrs['source_url'] = url

        # Apply common manipulations to match the expected structure
        # Convert GPS time to datetime coordinate
        if 'gps_time' in ds.variables:
            slow_time_dt = pd.to_datetime(ds['gps_time'].values, unit='s')
            ds = ds.assign_coords(slow_time=('slow_time', slow_time_dt))

            # Set slow_time as the main coordinate and remove gps_time from data_vars
            if 'slow_time' not in ds.dims:
                ds = ds.swap_dims({'gps_time': 'slow_time'})

            # Remove gps_time from data_vars if it exists there to avoid conflicts
            if ('gps_time' in ds.data_vars) or ('gps_time' in ds.coords):
                ds = ds.drop_vars('gps_time')

        # Sort by slow_time if it exists
        if 'slow_time' in ds.coords:
            ds = ds.sortby('slow_time')

        return ds
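    # Usage sketch (hedged): load a single CSARP_layer file directly by URL and
    # select one layer row. The URL is elided and the layer id is illustrative.
    #
    #   lds = conn.load_layers_file("https://data.cresis.ku.edu/data/rds/.../Data_..._001.mat")
    #   surface_twtt = lds['twtt'].sel(layer=1)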
    def _load_layers(self, file) -> xr.Dataset:
        """
        Load a layer data file using hdf5storage.

        Parameters
        ----------
        file : str
            Path to the layer file.

        Returns
        -------
        xr.Dataset
        """

        d = hdf5storage.loadmat(file, appendmat=False)

        # Ensure 'id' is at least 1D for the layer dimension
        id_data = np.atleast_1d(d['id'].squeeze())

        # For 2D arrays with (layer, slow_time) dimensions, ensure they remain 2D
        # even when there's only one layer
        quality_data = np.atleast_2d(d['quality'].squeeze())
        if quality_data.shape[0] == 1 or quality_data.ndim == 1:
            quality_data = quality_data.reshape(len(id_data), -1)

        twtt_data = np.atleast_2d(d['twtt'].squeeze())
        if twtt_data.shape[0] == 1 or twtt_data.ndim == 1:
            twtt_data = twtt_data.reshape(len(id_data), -1)

        type_data = np.atleast_2d(d['type'].squeeze())
        if type_data.shape[0] == 1 or type_data.ndim == 1:
            type_data = type_data.reshape(len(id_data), -1)

        ds = xr.Dataset({
            'file_type': ((), d['file_type'].squeeze()),
            'file_version': ((), d['file_version'].squeeze()),
            'elev': (('slow_time',), d['elev'].squeeze()),
            #'gps_source': ((), d['gps_source'].squeeze()),
            'gps_time': (('slow_time',), d['gps_time'].squeeze()),
            'id': (('layer',), id_data),
            'lat': (('slow_time',), d['lat'].squeeze()),
            'lon': (('slow_time',), d['lon'].squeeze()),
            'quality': (('layer', 'slow_time'), quality_data),
            'twtt': (('layer', 'slow_time'), twtt_data),
            'type': (('layer', 'slow_time'), type_data),
        })

        ds = ds.assign_coords({'layer': ds['id'], 'slow_time': ds['gps_time']})

        return ds
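    # Shape sketch (hedged): a file holding a single layer can yield a squeezed
    # 1-D vector where a (layer, slow_time) 2-D array is expected; the reshape
    # guards above normalize both cases to (n_layers, n_traces). For example:
    #
    #   v = np.ones(5)
    #   np.atleast_2d(v).reshape(1, -1).shape  # -> (1, 5)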
    def _load_layer_organization(self, file) -> xr.Dataset:
        """
        Load a layer organization file.

        Parameters
        ----------
        file : str
            Path to the HDF5 layer organization file.

        Returns
        -------
        xr.Dataset
            Raw layer metadata from the HDF5 file, indexed by lyr_id.
        """
        d = hdf5storage.loadmat(file, appendmat=False)
        ds = xr.Dataset({
            'file_type': ((), d['file_type'].squeeze()),
            'file_version': ((), d['file_version'].squeeze()),
            'lyr_age': (('lyr_id',), np.atleast_1d(d['lyr_age'].squeeze())),
            'lyr_age_source': (('lyr_id',), np.atleast_1d(d['lyr_age_source'].squeeze())),
            'lyr_desc': (('lyr_id',), np.atleast_1d(d['lyr_desc'].squeeze())),
            'lyr_group_name': (('lyr_id',), np.atleast_1d(d['lyr_group_name'].squeeze())),
            'lyr_id': (('lyr_id',), np.atleast_1d(d['lyr_id'].squeeze())),
            'lyr_name': (('lyr_id',), np.atleast_1d(d['lyr_name'].squeeze())),
            'lyr_order': (('lyr_id',), np.atleast_1d(d['lyr_order'].squeeze())),
        }, attrs={'param': d['param']})

        ds = ds.set_index(lyr_id='lyr_id')

        return ds

    def get_layers_db(self, flight: Union[xr.Dataset, dict], include_geometry=True, raise_errors=True) -> dict:
        """
        Fetch layer data from the OPS API.

        Parameters
        ----------
        flight : Union[xr.Dataset, dict]
            The flight data, which can be an xarray Dataset or a dictionary.
        include_geometry : bool, optional
            If True, include geometry information in the returned layers.
        raise_errors : bool, optional
            If True, raise errors when layer points cannot be fetched.

        Returns
        -------
        dict
            A dictionary mapping layer display names to their corresponding data.
        """

        collection, segment_path, _ = self._extract_segment_info(flight)

        if 'Antarctica' in collection:
            location = 'antarctic'
        elif 'Greenland' in collection:
            location = 'arctic'
        else:
            raise ValueError("Dataset does not belong to a recognized location (Antarctica or Greenland).")

        layer_points = ops_api.get_layer_points(
            segment_name=segment_path,
            season_name=collection,
            location=location,
            include_geometry=include_geometry
        )

        if layer_points['status'] != 1:
            if raise_errors:
                raise ValueError(f"Failed to fetch layer points. Received response with status {layer_points['status']}.")
            else:
                return {}

        layer_ds_raw = xr.Dataset(
            {k: (['gps_time'], v) for k, v in layer_points['data'].items() if k != 'gps_time'},
            coords={'gps_time': layer_points['data']['gps_time']}
        )

        # Split into a dictionary of layers based on lyr_id
        layer_ids = set(layer_ds_raw['lyr_id'].to_numpy())
        layer_ids = [int(layer_id) for layer_id in layer_ids if not np.isnan(layer_id)]

        # Get the mapping of layer IDs to names
        if self.db_layers_metadata is None:
            layer_metadata = ops_api.get_layer_metadata()
            if layer_metadata['status'] == 1:
                df = pd.DataFrame(layer_metadata['data'])
                df['display_name'] = df['lyr_group_name'] + ":" + df['lyr_name']
                self.db_layers_metadata = df
            else:
                raise ValueError("Failed to fetch layer metadata from OPS API.")

        layers = {}
        for layer_id in layer_ids:
            layer_name = self.db_layers_metadata.loc[self.db_layers_metadata['lyr_id'] == layer_id, 'display_name'].values[0]

            l = layer_ds_raw.where(layer_ds_raw['lyr_id'] == layer_id, drop=True)
            l = l.sortby('gps_time')

            l = l.rename({'gps_time': 'slow_time'})
            l = l.set_coords(['slow_time'])
            l['slow_time'] = pd.to_datetime(l['slow_time'].values, unit='s')

            # Filter to the same time range as the flight
            l = self._trim_to_bounds(l, flight)

            layers[layer_name] = l

        return layers
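    # Usage sketch (hedged): the OPS database is an alternative layer source
    # when no CSARP_layer files exist for a segment.
    #
    #   layers = conn.get_layers_db(ds, include_geometry=False, raise_errors=False)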
    def get_layers(self, ds: xr.Dataset, source: str = 'auto', include_geometry=True, raise_errors=True) -> dict:
        """
        Fetch layer data for a given flight dataset, either from CSARP_layer files or the OPS Database API.

        Parameters
        ----------
        ds : xr.Dataset
            The flight dataset containing attributes for collection and segment.
        source : str, optional
            The source to fetch layers from: 'auto', 'files', or 'db' (default is 'auto').
        include_geometry : bool, optional
            If True, include geometry information when fetching from the API.
        raise_errors : bool, optional
            If True, raise errors when layers cannot be found.

        Returns
        -------
        dict
            A dictionary mapping layer display names to their corresponding data.
        """

        if source == 'auto':
            # Try to get layers from files first
            try:
                return self.get_layers_files(ds, raise_errors=True)
            except Exception:
                # Fall back to the database API if no layers are found in files
                return self.get_layers_db(ds, include_geometry=include_geometry, raise_errors=raise_errors)
        elif source == 'files':
            return self.get_layers_files(ds, raise_errors=raise_errors)
        elif source == 'db':
            return self.get_layers_db(ds, include_geometry=include_geometry, raise_errors=raise_errors)
        else:
            raise ValueError("Invalid source specified. Must be one of: 'auto', 'files', 'db'.")
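# End-to-end sketch (hedged): query, load, and attach layers. Names mirror the
# methods above; the collection name and cache path are illustrative.
#
#   conn = OPRConnection(cache_dir="/tmp/opr")
#   items = conn.query_frames(collections=["2022_Antarctica_BaslerMKB"], max_items=1)
#   frame = conn.load_frames(items)[0]
#   layers = conn.get_layers(frame, source='auto', raise_errors=False)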
26class OPRConnection: 27 def __init__(self, 28 collection_url: str = "https://data.cresis.ku.edu/data/", 29 cache_dir: str = None, 30 stac_parquet_href: str = "gs://opr_stac/catalog/**/*.parquet"): 31 """ 32 Initialize the OPRConnection with a collection URL and optional cache directory. 33 34 Parameters 35 ---------- 36 collection_url : str 37 The base URL for the OPR data collection. 38 cache_dir : str, optional 39 Directory to cache downloaded data. 40 """ 41 self.collection_url = collection_url 42 self.cache_dir = cache_dir 43 self.stac_parquet_href = stac_parquet_href 44 45 self.fsspec_cache_kwargs = {} 46 self.fsspec_url_prefix = '' 47 if cache_dir: 48 self.fsspec_cache_kwargs['cache_storage'] = cache_dir 49 self.fsspec_cache_kwargs['check_files'] = True 50 self.fsspec_url_prefix = 'filecache::' 51 52 self.db_layers_metadata = None # Cache for OPS database layers metadata 53 54 def _open_file(self, url): 55 """Helper method to open files with appropriate caching.""" 56 if self.fsspec_url_prefix: 57 return fsspec.open_local(f"{self.fsspec_url_prefix}{url}", filecache=self.fsspec_cache_kwargs) 58 else: 59 return fsspec.open_local(f"simplecache::{url}", simplecache=self.fsspec_cache_kwargs) 60 61 def _extract_segment_info(self, segment): 62 """Extract collection, segment_path, and frame from Dataset or dict.""" 63 if isinstance(segment, xr.Dataset): 64 return (segment.attrs.get('collection'), 65 segment.attrs.get('segment_path'), 66 segment.attrs.get('frame')) 67 else: 68 return (segment['collection'], 69 f"{segment['properties'].get('opr:date')}_{segment['properties'].get('opr:segment'):02d}", 70 segment['properties'].get('opr:frame')) 71 72 def query_frames(self, collections: list[str] = None, segment_paths: list[str] = None, 73 geometry = None, date_range: tuple = None, properties: dict = {}, 74 max_items: int = None, exclude_geometry: bool = False, 75 search_kwargs: dict = {}) -> gpd.GeoDataFrame: 76 """ 77 Query for radar frames based on various search criteria. Each parameter is 78 treated as an independent criteria. If multiple parameters are passed, they are 79 combined with AND logic. 80 81 A list of values may be passed to most parameters. If so, any values in the list 82 will be treated as a match. 83 84 Parameters 85 ---------- 86 collections : list[str], optional 87 List of collection names to filter by (e.g., "2022_Antarctica_BaslerMKB"). 88 segment_paths : list[str], optional 89 List of segment paths to filter by (e.g., "20230126_01"). 90 geometry : optional 91 Geospatial geometry to filter by (e.g., a shapely geometry object). 92 date_range : tuple, optional 93 Date range to filter by (e.g., (start_date, end_date)). 94 properties : dict, optional 95 Additional properties to include in the query. 96 max_items : int, optional 97 Maximum number of items to return. 98 exclude_geometry : bool, optional 99 If True, exclude the geometry field from the response to reduce size. 100 search_kwargs : dict, optional 101 Additional keyword arguments to pass to the search method. 102 103 Returns 104 ------- 105 gpd.GeoDataFrame 106 GeoDataFrame containing the STAC frames matching the query criteria. 
107 """ 108 109 search_params = {} 110 111 # Exclude geometry -- do not return the geometry field to reduce response size 112 if exclude_geometry: 113 search_params['exclude'] = ['geometry'] 114 115 # Handle collections (seasons) 116 if collections is not None: 117 search_params['collections'] = [collections] if isinstance(collections, str) else collections 118 119 # Handle geometry filtering 120 if geometry is not None: 121 if hasattr(geometry, '__geo_interface__'): 122 geometry = geometry.__geo_interface__ 123 124 # Fix geometries that cross the antimeridian 125 geometry = antimeridian.fix_geojson(geometry, reverse=True) 126 127 search_params['intersects'] = geometry 128 129 # Handle date range filtering 130 if date_range is not None: 131 search_params['datetime'] = date_range 132 133 # Handle max_items 134 if max_items is not None: 135 search_params['limit'] = max_items 136 else: 137 search_params['limit'] = 1000000 138 139 # Handle segment_paths filtering using CQL2 140 filter_conditions = [] 141 142 if segment_paths is not None: 143 segment_paths = [segment_paths] if isinstance(segment_paths, str) else segment_paths 144 145 # Create OR conditions for segment paths 146 segment_conditions = [] 147 for segment_path in segment_paths: 148 try: 149 date_str, segment_num_str = segment_path.split('_') 150 segment_num = int(segment_num_str) 151 152 # Create AND condition for this specific segment 153 segment_condition = { 154 "op": "and", 155 "args": [ 156 { 157 "op": "=", 158 "args": [{"property": "opr:date"}, date_str] 159 }, 160 { 161 "op": "=", 162 "args": [{"property": "opr:segment"}, segment_num] 163 } 164 ] 165 } 166 segment_conditions.append(segment_condition) 167 except ValueError: 168 print(f"Warning: Invalid segment_path format '{segment_path}'. 
Expected format: YYYYMMDD_NN") 169 continue 170 171 if segment_conditions: 172 if len(segment_conditions) == 1: 173 filter_conditions.append(segment_conditions[0]) 174 else: 175 # Multiple segments - combine with OR 176 filter_conditions.append({ 177 "op": "or", 178 "args": segment_conditions 179 }) 180 181 # Add any additional property filters 182 for key, value in properties.items(): 183 filter_conditions.append({ 184 "op": "=", 185 "args": [{"property": key}, value] 186 }) 187 188 # Combine all filter conditions with AND 189 if filter_conditions: 190 if len(filter_conditions) == 1: 191 filter_expr = filter_conditions[0] 192 else: 193 filter_expr = { 194 "op": "and", 195 "args": filter_conditions 196 } 197 198 search_params['filter'] = filter_expr 199 200 # Add any extra kwargs to search 201 search_params.update(search_kwargs) 202 203 #print(search_params) # TODO: Remove 204 205 # Perform the search 206 # from rustac import DuckdbClient 207 client = DuckdbClient() 208 items = client.search(self.stac_parquet_href, **search_params) 209 if isinstance(items, dict): 210 items = items['features'] 211 212 if not items or len(items) == 0: 213 warnings.warn("No items found matching the query criteria", UserWarning) 214 return None 215 216 # Convert to GeoDataFrame 217 items_df = gpd.GeoDataFrame(items) 218 # Set index 219 items_df = items_df.set_index(items_df['id']) 220 items_df.index.name = 'stac_item_id' 221 # Set the geometry column 222 if 'geometry' in items_df.columns and not exclude_geometry: 223 items_df = items_df.set_geometry(items_df['geometry'].apply(shapely.geometry.shape)) 224 items_df.crs = "EPSG:4326" 225 226 # Reorder the columns, leaving any extra columns at the end 227 desired_order = ['collection', 'geometry', 'properties', 'assets'] 228 items_df = items_df[[col for col in desired_order if col in items_df.columns] + list(items_df.columns.difference(desired_order))] 229 230 return items_df 231 232 def load_frames(self, stac_items: gpd.GeoDataFrame, 233 data_product: str = "CSARP_standard", 234 merge_flights: bool = False, 235 skip_errors: bool = False, 236 ) -> Union[list[xr.Dataset], xr.Dataset]: 237 """ 238 Load multiple radar frames from a list of STAC items. 239 240 Parameters 241 ---------- 242 stac_items : gpd.GeoDataFrame 243 The STAC items containing asset URLs. 244 data_product : str, optional 245 The data product to load (default is "CSARP_standard"). 246 merge_flights : bool, optional 247 Whether to merge frames from the same flight (default is False). 248 skip_errors : bool, optional 249 Whether to skip errors and continue loading other frames (default is False). 250 251 Returns 252 ------- 253 list[xr.Dataset] or xr.Dataset 254 List of loaded radar frames as xarray Datasets or a single merged Dataset if there is only one segment and merge_flights is True. 255 """ 256 frames = [] 257 258 for idx, item in stac_items.iterrows(): 259 try: 260 frame = self.load_frame(item, data_product) 261 frames.append(frame) 262 except Exception as e: 263 print(f"Error loading frame for item {item.get('id', 'unknown')}: {e}") 264 if skip_errors: 265 continue 266 else: 267 raise e 268 269 if merge_flights: 270 return opr_tools.merge_frames(frames) 271 else: 272 return frames 273 274 def load_frame(self, stac_item, data_product: str = "CSARP_standard") -> xr.Dataset: 275 """ 276 Load a radar frame from a STAC item. 277 278 Parameters 279 ---------- 280 stac_item 281 The STAC item containing asset URLs. 
282 data_product : str, optional 283 The data product to load (default is "CSARP_standard"). 284 285 Returns 286 ------- 287 xr.Dataset 288 The loaded radar frame as an xarray Dataset. 289 """ 290 291 assets = stac_item['assets'] 292 293 # Get the data asset 294 data_asset = assets.get(data_product) 295 if not data_asset: 296 available_assets = list(assets.keys()) 297 raise ValueError(f"No {data_product} asset found. Available assets: {available_assets}") 298 299 # Get the URL from the asset 300 url = data_asset.get('href') 301 if not url: 302 raise ValueError(f"No href found in {data_product} asset") 303 304 # Load the frame using the existing method 305 return self.load_frame_url(url) 306 307 def load_frame_url(self, url: str) -> xr.Dataset: 308 """ 309 Load a radar frame from a given URL. 310 311 Parameters 312 ---------- 313 url : str 314 The URL of the radar frame data. 315 316 Returns 317 ------- 318 xr.Dataset 319 The loaded radar frame as an xarray Dataset. 320 """ 321 322 file = self._open_file(url) 323 324 filetype = None 325 try: 326 ds = self._load_frame_hdf5(file) 327 filetype = 'hdf5' 328 except OSError: 329 ds = self._load_frame_matlab(file) 330 filetype = 'matlab' 331 332 # Add the source URL as an attribute 333 ds.attrs['source_url'] = url 334 335 # Apply CF-compliant attributes 336 ds = apply_cf_compliant_attrs(ds) 337 338 # Get the season and segment from the URL 339 match = re.search(r'(\d{4}_\w+_[A-Za-z0-9]+)\/([\w_]+)\/[\d_]+\/[\w]+(\d{8}_\d{2}_\d{3})', url) 340 if match: 341 collection, data_product, granule = match.groups() 342 date, segment_id, frame_id = granule.split('_') 343 ds.attrs['collection'] = collection 344 ds.attrs['data_product'] = data_product 345 ds.attrs['granule'] = granule 346 ds.attrs['segment_path'] = f"{date}_{segment_id}" 347 ds.attrs['date_str'] = date 348 ds.attrs['segment'] = int(segment_id) 349 ds.attrs['frame'] = int(frame_id) 350 351 # Load citation information 352 result = ops_api.get_segment_metadata(segment_name=ds.attrs['segment_path'], season_name=collection) 353 if result: 354 if isinstance(result['data'], str): 355 warnings.warn(f"Warning: Unexpected result from ops_api: {result['data']}", UserWarning) 356 else: 357 result_data = {} 358 for key, value in result['data'].items(): 359 if len(value) == 1: 360 result_data[key] = value[0] 361 elif len(value) > 1: 362 result_data[key] = set(value) 363 364 if 'dois' in result_data: 365 ds.attrs['doi'] = result_data['dois'] 366 if 'rors' in result_data: 367 ds.attrs['ror'] = result_data['rors'] 368 if 'funding_sources' in result_data: 369 ds.attrs['funder_text'] = result_data['funding_sources'] 370 371 # Add the rest of the Matlab parameters 372 if filetype == 'hdf5': 373 ds.attrs['mimetype'] = 'application/x-hdf5' 374 ds.attrs.update(decode_hdf5_matlab_variable(h5py.File(file, 'r'), 375 skip_variables=True, 376 skip_errors=True)) 377 elif filetype == 'matlab': 378 ds.attrs['mimetype'] = 'application/x-matlab-data' 379 ds.attrs.update(extract_legacy_mat_attributes(file, 380 skip_keys=ds.keys(), 381 skip_errors=True)) 382 383 return ds 384 385 def _load_frame_hdf5(self, file) -> xr.Dataset: 386 """ 387 Load a radar frame from an HDF5 file. 388 389 Parameters 390 ---------- 391 file : 392 The path to the HDF5 file containing radar frame data. 393 394 Returns 395 ------- 396 xr.Dataset 397 The loaded radar frame as an xarray Dataset. 
398 """ 399 400 ds = xr.open_dataset(file, engine='h5netcdf', phony_dims='sort') 401 402 # Re-arrange variables to provide useful dimensions and coordinates 403 404 ds = ds.squeeze() # Drop the singleton dimensions matlab adds 405 406 ds = ds.rename({ # Label the dimensions with more useful names 407 ds.Data.dims[0]: 'slow_time_idx', 408 ds.Data.dims[1]: 'twtt_idx', 409 }) 410 411 # Make variables with no dimensions into scalar attributes 412 for var in ds.data_vars: 413 if ds[var].ndim == 0: 414 ds.attrs[var] = ds[var].item() 415 ds = ds.drop_vars(var) 416 417 # Make the file_type an attribute 418 if 'file_type' in ds.data_vars: 419 ds.attrs['file_type'] = ds['file_type'].to_numpy() 420 ds = ds.drop_vars('file_type') 421 422 # Name the two time coordinates 423 ds = ds.rename({'Time': 'twtt', 'GPS_time': 'slow_time'}) 424 ds = ds.set_coords(['slow_time', 'twtt']) 425 426 slow_time_1d = pd.to_datetime(ds['slow_time'].values, unit='s') 427 ds = ds.assign_coords(slow_time=('slow_time_idx', slow_time_1d)) 428 429 # Make twtt and slow_time the indexing coordinates 430 ds = ds.swap_dims({'twtt_idx': 'twtt'}) 431 ds = ds.swap_dims({'slow_time_idx': 'slow_time'}) 432 433 return ds 434 435 def _load_frame_matlab(self, file) -> xr.Dataset: 436 """ 437 Load a radar frame from a MATLAB file. 438 439 Parameters 440 ---------- 441 file : 442 The path to the MATLAB file containing radar frame data. 443 444 Returns 445 ------- 446 xr.Dataset 447 The loaded radar frame as an xarray Dataset. 448 """ 449 450 m = scipy.io.loadmat(file, mat_dtype=False) 451 452 key_dims = { 453 'Time': ('twtt',), 454 'GPS_time': ('slow_time',), 455 'Latitude': ('slow_time',), 456 'Longitude': ('slow_time',), 457 'Elevation': ('slow_time',), 458 'Roll': ('slow_time',), 459 'Pitch': ('slow_time',), 460 'Heading': ('slow_time',), 461 'Surface': ('slow_time',), 462 'Data': ('twtt', 'slow_time') 463 } 464 465 ds = xr.Dataset( 466 { 467 key: (dims, np.squeeze(m[key])) for key, dims in key_dims.items() if key in m 468 }, 469 coords={ 470 'twtt': ('twtt', np.squeeze(m['Time'])), 471 'slow_time': ('slow_time', pd.to_datetime(np.squeeze(m['GPS_time']), unit='s')), 472 } 473 ) 474 475 return ds 476 477 def get_collections(self) -> list: 478 """ 479 Get list of available STAC collections. 480 481 Returns 482 ------- 483 list 484 List of collection dictionaries with metadata. 485 """ 486 487 client = DuckdbClient() 488 return client.get_collections(self.stac_parquet_href) 489 490 def get_segments(self, collection_id: str) -> list: 491 """ 492 Get list of available segments within a collection/season. 493 494 Parameters 495 ---------- 496 collection_id : str 497 The ID of the STAC collection to query. 498 499 Returns 500 ------- 501 list 502 List of flight dictionaries with flight metadata. 
503 """ 504 # Query STAC API for all items in collection (exclude geometry for better performance) 505 items = self.query_frames(collections=[collection_id], exclude_geometry=True) 506 507 if items is None or len(items) == 0: 508 print(f"No items found in collection '{collection_id}'") 509 return [] 510 511 # Group items by segment (opr:date + opr:segment) 512 segments = {} 513 for idx, item in items.iterrows(): 514 properties = item['properties'] 515 date = properties['opr:date'] 516 flight_num = properties['opr:segment'] 517 518 if date and flight_num is not None: 519 segment_path = f"{date}_{flight_num:02d}" 520 521 if segment_path not in segments: 522 segments[segment_path] = { 523 'segment_path': segment_path, 524 'date': date, 525 'flight_number': flight_num, 526 'collection': collection_id, 527 'frames': [], 528 'item_count': 0 529 } 530 531 segments[segment_path]['frames'].append(properties.get('opr:frame')) 532 segments[segment_path]['item_count'] += 1 533 534 # Sort segments by date and flight number 535 segment_list = list(segments.values()) 536 segment_list.sort(key=lambda x: (x['date'], x['flight_number'])) 537 538 return segment_list 539 540 def get_layers_files(self, segment: Union[xr.Dataset, dict], raise_errors=True) -> dict: 541 """ 542 Fetch layers from the CSARP_layers files 543 544 See https://gitlab.com/openpolarradar/opr/-/wikis/Layer-File-Guide for file formats 545 546 Parameters 547 ---------- 548 segment : Union[xr.Dataset, dict] 549 The flight information, which can be an xarray Dataset or a dictionary representing the STAC item. 550 raise_errors : bool, optional 551 If True, raise errors when layers cannot be found. 552 553 Returns 554 ------- 555 dict 556 A dictionary mapping layer IDs to their corresponding data. 557 """ 558 collection, segment_path, frame = self._extract_segment_info(segment) 559 560 # If we already have a STAC item, just use it. 
Otherwise, query to find the matching STAC items 561 if isinstance(segment, xr.Dataset): 562 properties = {} 563 if frame: 564 properties['opr:frame'] = frame 565 566 # Query STAC collection for CSARP_layer files matching this specific segment 567 568 # Get items from this specific segment 569 stac_items = self.query_frames(collections=[collection], segment_paths=[segment_path], properties=properties) 570 571 # Filter for items that have CSARP_layer assets 572 layer_items = [] 573 for idx, item in stac_items.iterrows(): 574 if 'CSARP_layer' in item['assets']: 575 layer_items.append(item) 576 else: 577 layer_items = [segment] if 'CSARP_layer' in segment.get('assets', {}) else [] 578 579 if not layer_items: 580 if raise_errors: 581 raise ValueError(f"No CSARP_layer files found for segment path {segment_path} in collection {collection}") 582 else: 583 return {} 584 585 # Load each layer file and combine them 586 layer_frames = [] 587 for item in layer_items: 588 layer_asset = item['assets']['CSARP_layer'] 589 if layer_asset and 'href' in layer_asset: 590 url = layer_asset['href'] 591 try: 592 layer_ds = self.load_layers_file(url) 593 layer_frames.append(layer_ds) 594 except Exception as e: 595 raise e # TODO 596 print(f"Warning: Failed to load layer file {url}: {e}") 597 continue 598 599 if not layer_frames: 600 if raise_errors: 601 raise ValueError(f"No valid CSARP_layer files could be loaded for segment {segment_path} in collection {collection}") 602 else: 603 return {} 604 605 # Concatenate all layer frames along slow_time dimension 606 layers_segment = xr.concat(layer_frames, dim='slow_time', combine_attrs='drop_conflicts', data_vars='minimal') 607 layers_segment = layers_segment.sortby('slow_time') 608 609 # Trim to bounds of the original dataset 610 layers_segment = self._trim_to_bounds(layers_segment, segment) 611 612 # Find the layer organization file to map layer IDs to names 613 # Layer organization files are one per directory. 
For example, the layer file: 614 # https://data.cresis.ku.edu/data/rds/2017_Antarctica_BaslerJKB/CSARP_layer/20180105_03/Data_20180105_03_003.mat 615 # would have a layer organization file at: 616 # https://data.cresis.ku.edu/data/rds/2017_Antarctica_BaslerJKB/CSARP_layer/20180105_03/layer_20180105_03.mat 617 618 layer_organization_file_url = re.sub(r'Data_(\d{8}_\d{2})_\d{3}.mat', r'layer_\1.mat', url) 619 layer_organization_ds = self._load_layer_organization(self._open_file(layer_organization_file_url)) 620 621 # Split into separate layers by ID 622 layers = {} 623 624 layer_ids = np.unique(layers_segment['layer']) 625 626 for i, layer_id in enumerate(layer_ids): 627 layer_id_int = int(layer_id) 628 layer_name = str(layer_organization_ds['lyr_name'].sel(lyr_id=layer_id_int).values.item().squeeze()) 629 layer_group = str(layer_organization_ds['lyr_group_name'].sel(lyr_id=layer_id_int).values.item().squeeze()) 630 if layer_group == '[]': 631 layer_group = '' 632 layer_display_name = f"{layer_group}:{layer_name}" 633 layer_data = {} 634 635 layer_ds = layers_segment.sel(layer=layer_id) 636 layers[layer_display_name] = layer_ds 637 638 return layers 639 640 def _trim_to_bounds(self, ds: xr.Dataset, ref: Union[xr.Dataset, dict]) -> xr.Dataset: 641 start_time, end_time = None, None 642 if isinstance(ref, xr.Dataset) and 'slow_time' in ref.coords: 643 start_time = ref['slow_time'].min() 644 end_time = ref['slow_time'].max() 645 else: 646 properties = ref.get('properties', {}) 647 if 'start_datetime' in properties and 'end_datetime' in properties: 648 start_time = pd.to_datetime(properties['start_datetime']) 649 end_time = pd.to_datetime(properties['end_datetime']) 650 651 if start_time: 652 return ds.sel(slow_time=slice(start_time, end_time)) 653 else: 654 return ds 655 656 def load_layers_file(self, url: str) -> xr.Dataset: 657 """ 658 Load layer data from a CSARP_layer file (either HDF5 or MATLAB format). 
659 660 Parameters 661 ---------- 662 url : str 663 URL or path to the layer file 664 665 Returns 666 ------- 667 xr.Dataset 668 Layer data in a standardized format with coordinates: 669 - slow_time: GPS time converted to datetime 670 And data variables: 671 - twtt: Two-way travel time for each layer 672 - quality: Quality values for each layer 673 - type: Type values for each layer 674 - lat, lon, elev: Geographic coordinates 675 - id: Layer IDs 676 """ 677 678 file = self._open_file(url) 679 680 ds = self._load_layers(file) 681 682 # Add the source URL as an attribute 683 ds.attrs['source_url'] = url 684 685 # Apply common manipulations to match the expected structure 686 # Convert GPS time to datetime coordinate 687 if 'gps_time' in ds.variables: 688 slow_time_dt = pd.to_datetime(ds['gps_time'].values, unit='s') 689 ds = ds.assign_coords(slow_time=('slow_time', slow_time_dt)) 690 691 # Set slow_time as the main coordinate and remove gps_time from data_vars 692 if 'slow_time' not in ds.dims: 693 ds = ds.swap_dims({'gps_time': 'slow_time'}) 694 695 # Remove gps_time from data_vars if it exists there to avoid conflicts 696 if ('gps_time' in ds.data_vars) or ('gps_time' in ds.coords): 697 ds = ds.drop_vars('gps_time') 698 699 # Sort by slow_time if it exists 700 if 'slow_time' in ds.coords: 701 ds = ds.sortby('slow_time') 702 703 return ds 704 705 def _load_layers(self, file) -> xr.Dataset: 706 """ 707 Load layer data file using hdf5storage 708 709 Parameters: 710 file : str 711 Path to the layer file 712 Returns: xr.Dataset 713 """ 714 715 d = hdf5storage.loadmat(file, appendmat=False) 716 717 # Ensure 'id' is at least 1D for the layer dimension 718 id_data = np.atleast_1d(d['id'].squeeze()) 719 720 # For 2D arrays with (layer, slow_time) dimensions, ensure they remain 2D 721 # even when there's only one layer 722 quality_data = np.atleast_2d(d['quality'].squeeze()) 723 if quality_data.shape[0] == 1 or quality_data.ndim == 1: 724 quality_data = quality_data.reshape(len(id_data), -1) 725 726 twtt_data = np.atleast_2d(d['twtt'].squeeze()) 727 if twtt_data.shape[0] == 1 or twtt_data.ndim == 1: 728 twtt_data = twtt_data.reshape(len(id_data), -1) 729 730 type_data = np.atleast_2d(d['type'].squeeze()) 731 if type_data.shape[0] == 1 or type_data.ndim == 1: 732 type_data = type_data.reshape(len(id_data), -1) 733 734 ds = xr.Dataset({ 735 'file_type': ((), d['file_type'].squeeze()), 736 'file_version': ((), d['file_version'].squeeze()), 737 'elev': (('slow_time',), d['elev'].squeeze()), 738 #'gps_source': ((), d['gps_source'].squeeze()), 739 'gps_time': (('slow_time',), d['gps_time'].squeeze()), 740 'id': (('layer',), id_data), 741 'lat': (('slow_time',), d['lat'].squeeze()), 742 'lon': (('slow_time',), d['lon'].squeeze()), 743 'quality': (('layer', 'slow_time'), quality_data), 744 'twtt': (('layer', 'slow_time'), twtt_data), 745 'type': (('layer', 'slow_time'), type_data), 746 }) 747 748 ds = ds.assign_coords({'layer': ds['id'], 'slow_time': ds['gps_time']}) 749 750 return ds 751 752 def _load_layer_organization(self, file) -> xr.Dataset: 753 """ 754 Load a layer organization file 755 756 Parameters 757 ---------- 758 file : str 759 Path to the HDF5 layer organization file 760 761 Returns 762 ------- 763 xr.Dataset 764 Raw layer data from HDF5 file 765 """ 766 d = hdf5storage.loadmat(file, appendmat=False) 767 ds = xr.Dataset({ 768 'file_type': ((), d['file_type'].squeeze()), 769 'file_version': ((), d['file_version'].squeeze()), 770 'lyr_age': (('lyr_id',), np.atleast_1d(d['lyr_age'].squeeze())), 
771 'lyr_age_source': (('lyr_id',), np.atleast_1d(d['lyr_age_source'].squeeze())), 772 'lyr_desc': (('lyr_id',), np.atleast_1d(d['lyr_desc'].squeeze())), 773 'lyr_group_name': (('lyr_id',), np.atleast_1d(d['lyr_group_name'].squeeze())), 774 'lyr_id': (('lyr_id',), np.atleast_1d(d['lyr_id'].squeeze())), 775 'lyr_name': (('lyr_id',), np.atleast_1d(d['lyr_name'].squeeze())), 776 'lyr_order': (('lyr_id',), np.atleast_1d(d['lyr_order'].squeeze())), 777 }, attrs={'param': d['param']}) 778 779 ds = ds.set_index(lyr_id='lyr_id') 780 781 return ds 782 783 def get_layers_db(self, flight: Union[xr.Dataset, dict], include_geometry=True, raise_errors=True) -> dict: 784 """ 785 Fetch layer data from the OPS API 786 787 Parameters 788 ---------- 789 flight : Union[xr.Dataset, dict] 790 The flight data, which can be an xarray Dataset or a dictionary. 791 include_geometry : bool, optional 792 If True, include geometry information in the returned layers. 793 794 Returns 795 ------- 796 dict 797 A dictionary mapping layer IDs to their corresponding data. 798 """ 799 800 collection, segment_path, _ = self._extract_segment_info(flight) 801 802 if 'Antarctica' in collection: 803 location = 'antarctic' 804 elif 'Greenland' in collection: 805 location = 'arctic' 806 else: 807 raise ValueError("Dataset does not belong to a recognized location (Antarctica or Greenland).") 808 809 layer_points = ops_api.get_layer_points( 810 segment_name=segment_path, 811 season_name=collection, 812 location=location, 813 include_geometry=include_geometry 814 ) 815 816 if layer_points['status'] != 1: 817 if raise_errors: 818 raise ValueError(f"Failed to fetch layer points. Received response with status {layer_points['status']}.") 819 else: 820 return {} 821 822 layer_ds_raw = xr.Dataset( 823 {k: (['gps_time'], v) for k, v in layer_points['data'].items() if k != 'gps_time'}, 824 coords={'gps_time': layer_points['data']['gps_time']} 825 ) 826 # Split into a dictionary of layers based on lyr_id 827 layer_ids = set(layer_ds_raw['lyr_id'].to_numpy()) 828 layer_ids = [int(layer_id) for layer_id in layer_ids if not np.isnan(layer_id)] 829 830 # Get the mapping of layer IDs to names 831 if self.db_layers_metadata is None: 832 layer_metadata = ops_api.get_layer_metadata() 833 if layer_metadata['status'] == 1: 834 df = pd.DataFrame(layer_metadata['data']) 835 df['display_name'] = df['lyr_group_name'] + ":" + df['lyr_name'] 836 self.db_layers_metadata = df 837 else: 838 raise ValueError("Failed to fetch layer metadata from OPS API.") 839 840 layers = {} 841 for layer_id in layer_ids: 842 layer_name = self.db_layers_metadata.loc[self.db_layers_metadata['lyr_id'] == layer_id, 'display_name'].values[0] 843 844 l = layer_ds_raw.where(layer_ds_raw['lyr_id'] == layer_id, drop=True) 845 846 l = l.sortby('gps_time') 847 848 l = l.rename({'gps_time': 'slow_time'}) 849 l = l.set_coords(['slow_time']) 850 851 l['slow_time'] = pd.to_datetime(l['slow_time'].values, unit='s') 852 853 # Filter to the same time range as flight 854 l = self._trim_to_bounds(l, flight) 855 856 layers[layer_name] = l 857 858 return layers 859 860 def get_layers(self, ds : xr.Dataset, source: str = 'auto', include_geometry=True, raise_errors=True) -> dict: 861 """ 862 Fetch layer data for a given flight dataset, either from CSARP_layer files or the OPS Database API. 863 864 Parameters 865 ---------- 866 ds : xr.Dataset 867 The flight dataset containing attributes for collection and segment. 
868 source : str, optional 869 The source to fetch layers from: 'auto', 'files', or 'db' (default is 'auto'). 870 include_geometry : bool, optional 871 If True, include geometry information when fetching from the API.\ 872 raise_errors : bool, optional 873 If True, raise errors when layers cannot be found. 874 875 Returns 876 ------- 877 dict 878 A dictionary mapping layer IDs to their corresponding data. 879 """ 880 881 if source == 'auto': 882 # Try to get layers from files first 883 try: 884 layers = self.get_layers_files(ds, raise_errors=True) 885 return layers 886 except: 887 # Fallback to API if no layers found in files 888 return self.get_layers_db(ds, include_geometry=include_geometry, raise_errors=raise_errors) 889 elif source == 'files': 890 return self.get_layers_files(ds, raise_errors=raise_errors) 891 elif source == 'db': 892 return self.get_layers_db(ds, include_geometry=include_geometry, raise_errors=raise_errors) 893 else: 894 raise ValueError("Invalid source specified. Must be one of: 'auto', 'files', 'db'.")
    def __init__(self,
                 collection_url: str = "https://data.cresis.ku.edu/data/",
                 cache_dir: str = None,
                 stac_parquet_href: str = "gs://opr_stac/catalog/**/*.parquet"):
        """
        Initialize the OPRConnection with a collection URL and optional cache directory.

        Parameters
        ----------
        collection_url : str
            The base URL for the OPR data collection.
        cache_dir : str, optional
            Directory to cache downloaded data.
        stac_parquet_href : str, optional
            Location (URL or glob) of the STAC GeoParquet catalog used for queries.
        """
        self.collection_url = collection_url
        self.cache_dir = cache_dir
        self.stac_parquet_href = stac_parquet_href

        self.fsspec_cache_kwargs = {}
        self.fsspec_url_prefix = ''
        if cache_dir:
            self.fsspec_cache_kwargs['cache_storage'] = cache_dir
            self.fsspec_cache_kwargs['check_files'] = True
            self.fsspec_url_prefix = 'filecache::'

        self.db_layers_metadata = None  # Cache for OPS database layers metadata
Initialize the OPRConnection with a collection URL and optional cache directory.
Parameters
- collection_url (str): The base URL for the OPR data collection.
- cache_dir (str, optional): Directory to cache downloaded data.
- stac_parquet_href (str, optional): Location (URL or glob) of the STAC GeoParquet catalog used for queries.
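As a usage sketch (the cache path below is hypothetical), a cached connection can be created like this; omitting cache_dir disables the filecache:: wrapper:

from xopr.opr_access import OPRConnection

# Hypothetical local cache directory; any writable path works
conn = OPRConnection(cache_dir="/tmp/opr_cache")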
    def query_frames(self, collections: list[str] = None, segment_paths: list[str] = None,
                     geometry=None, date_range: tuple = None, properties: dict = {},
                     max_items: int = None, exclude_geometry: bool = False,
                     search_kwargs: dict = {}) -> gpd.GeoDataFrame:
        """
        Query for radar frames based on various search criteria. Each parameter is
        treated as an independent criterion. If multiple parameters are passed, they
        are combined with AND logic.

        A list of values may be passed to most parameters. If so, any value in the
        list will be treated as a match.

        Parameters
        ----------
        collections : list[str], optional
            List of collection names to filter by (e.g., "2022_Antarctica_BaslerMKB").
        segment_paths : list[str], optional
            List of segment paths to filter by (e.g., "20230126_01").
        geometry : optional
            Geospatial geometry to filter by (e.g., a shapely geometry object).
        date_range : tuple, optional
            Date range to filter by (e.g., (start_date, end_date)).
        properties : dict, optional
            Additional item properties to filter by; each listed property must equal the given value.
        max_items : int, optional
            Maximum number of items to return.
        exclude_geometry : bool, optional
            If True, exclude the geometry field from the response to reduce size.
        search_kwargs : dict, optional
            Additional keyword arguments to pass to the search method.

        Returns
        -------
        gpd.GeoDataFrame
            GeoDataFrame containing the STAC frames matching the query criteria.
        """

        search_params = {}

        # Exclude geometry -- do not return the geometry field to reduce response size
        if exclude_geometry:
            search_params['exclude'] = ['geometry']

        # Handle collections (seasons)
        if collections is not None:
            search_params['collections'] = [collections] if isinstance(collections, str) else collections

        # Handle geometry filtering
        if geometry is not None:
            if hasattr(geometry, '__geo_interface__'):
                geometry = geometry.__geo_interface__

            # Fix geometries that cross the antimeridian
            geometry = antimeridian.fix_geojson(geometry, reverse=True)

            search_params['intersects'] = geometry

        # Handle date range filtering
        if date_range is not None:
            search_params['datetime'] = date_range

        # Handle max_items
        if max_items is not None:
            search_params['limit'] = max_items
        else:
            search_params['limit'] = 1000000

        # Handle segment_paths filtering using CQL2
        filter_conditions = []

        if segment_paths is not None:
            segment_paths = [segment_paths] if isinstance(segment_paths, str) else segment_paths

            # Create OR conditions for segment paths
            segment_conditions = []
            for segment_path in segment_paths:
                try:
                    date_str, segment_num_str = segment_path.split('_')
                    segment_num = int(segment_num_str)

                    # Create AND condition for this specific segment
                    segment_condition = {
                        "op": "and",
                        "args": [
                            {
                                "op": "=",
                                "args": [{"property": "opr:date"}, date_str]
                            },
                            {
                                "op": "=",
                                "args": [{"property": "opr:segment"}, segment_num]
                            }
                        ]
                    }
                    segment_conditions.append(segment_condition)
                except ValueError:
                    print(f"Warning: Invalid segment_path format '{segment_path}'. Expected format: YYYYMMDD_NN")
                    continue

            if segment_conditions:
                if len(segment_conditions) == 1:
                    filter_conditions.append(segment_conditions[0])
                else:
                    # Multiple segments -- combine with OR
                    filter_conditions.append({
                        "op": "or",
                        "args": segment_conditions
                    })

        # Add any additional property filters
        for key, value in properties.items():
            filter_conditions.append({
                "op": "=",
                "args": [{"property": key}, value]
            })

        # Combine all filter conditions with AND
        if filter_conditions:
            if len(filter_conditions) == 1:
                filter_expr = filter_conditions[0]
            else:
                filter_expr = {
                    "op": "and",
                    "args": filter_conditions
                }

            search_params['filter'] = filter_expr

        # Add any extra kwargs to search
        search_params.update(search_kwargs)

        # Perform the search
        client = DuckdbClient()
        items = client.search(self.stac_parquet_href, **search_params)
        if isinstance(items, dict):
            items = items['features']

        if not items:
            warnings.warn("No items found matching the query criteria", UserWarning)
            return None

        # Convert to GeoDataFrame
        items_df = gpd.GeoDataFrame(items)

        # Set index
        items_df = items_df.set_index(items_df['id'])
        items_df.index.name = 'stac_item_id'

        # Set the geometry column
        if 'geometry' in items_df.columns and not exclude_geometry:
            items_df = items_df.set_geometry(items_df['geometry'].apply(shapely.geometry.shape))
            items_df.crs = "EPSG:4326"

        # Reorder the columns, leaving any extra columns at the end
        desired_order = ['collection', 'geometry', 'properties', 'assets']
        items_df = items_df[[col for col in desired_order if col in items_df.columns] + list(items_df.columns.difference(desired_order))]

        return items_df
Query for radar frames based on various search criteria. Each parameter is treated as an independent criterion. If multiple parameters are passed, they are combined with AND logic.
A list of values may be passed to most parameters. If so, any value in the list will be treated as a match.
Parameters
- collections (list[str], optional): List of collection names to filter by (e.g., "2022_Antarctica_BaslerMKB").
- segment_paths (list[str], optional): List of segment paths to filter by (e.g., "20230126_01").
- geometry (optional): Geospatial geometry to filter by (e.g., a shapely geometry object).
- date_range (tuple, optional): Date range to filter by (e.g., (start_date, end_date)).
- properties (dict, optional): Additional item properties to filter by; each listed property must equal the given value.
- max_items (int, optional): Maximum number of items to return.
- exclude_geometry (bool, optional): If True, exclude the geometry field from the response to reduce size.
- search_kwargs (dict, optional): Additional keyword arguments to pass to the search method.
Returns
- gpd.GeoDataFrame: GeoDataFrame containing the STAC frames matching the query criteria.
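A minimal query sketch, reusing the example collection and segment path from the parameter descriptions above; the bounding box is hypothetical. All criteria are ANDed together:

import shapely
from xopr.opr_access import OPRConnection

conn = OPRConnection()
frames_gdf = conn.query_frames(
    collections=["2022_Antarctica_BaslerMKB"],
    segment_paths=["20230126_01"],
    geometry=shapely.box(-110.0, -76.0, -105.0, -74.0),  # hypothetical lon/lat bounds
    max_items=50,
)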
    def load_frames(self, stac_items: gpd.GeoDataFrame,
                    data_product: str = "CSARP_standard",
                    merge_flights: bool = False,
                    skip_errors: bool = False,
                    ) -> Union[list[xr.Dataset], xr.Dataset]:
        """
        Load multiple radar frames from a list of STAC items.

        Parameters
        ----------
        stac_items : gpd.GeoDataFrame
            The STAC items containing asset URLs.
        data_product : str, optional
            The data product to load (default is "CSARP_standard").
        merge_flights : bool, optional
            Whether to merge frames from the same flight (default is False).
        skip_errors : bool, optional
            Whether to skip errors and continue loading other frames (default is False).

        Returns
        -------
        list[xr.Dataset] or xr.Dataset
            List of loaded radar frames as xarray Datasets, or a single merged Dataset
            if there is only one segment and merge_flights is True.
        """
        frames = []

        for idx, item in stac_items.iterrows():
            try:
                frame = self.load_frame(item, data_product)
                frames.append(frame)
            except Exception as e:
                print(f"Error loading frame for item {item.get('id', 'unknown')}: {e}")
                if skip_errors:
                    continue
                else:
                    raise

        if merge_flights:
            return opr_tools.merge_frames(frames)
        else:
            return frames
Load multiple radar frames from a list of STAC items.
Parameters
- stac_items (gpd.GeoDataFrame): The STAC items containing asset URLs.
- data_product (str, optional): The data product to load (default is "CSARP_standard").
- merge_flights (bool, optional): Whether to merge frames from the same flight (default is False).
- skip_errors (bool, optional): Whether to skip errors and continue loading other frames (default is False).
Returns
- list[xr.Dataset] or xr.Dataset: List of loaded radar frames as xarray Datasets or a single merged Dataset if there is only one segment and merge_flights is True.
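Continuing the query sketch above, the returned GeoDataFrame feeds directly into load_frames; with skip_errors=True, frames that fail to load are reported and skipped:

datasets = conn.load_frames(frames_gdf, data_product="CSARP_standard", skip_errors=True)
print(len(datasets), "frames loaded")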
    def load_frame(self, stac_item, data_product: str = "CSARP_standard") -> xr.Dataset:
        """
        Load a radar frame from a STAC item.

        Parameters
        ----------
        stac_item
            The STAC item containing asset URLs.
        data_product : str, optional
            The data product to load (default is "CSARP_standard").

        Returns
        -------
        xr.Dataset
            The loaded radar frame as an xarray Dataset.
        """

        assets = stac_item['assets']

        # Get the data asset
        data_asset = assets.get(data_product)
        if not data_asset:
            available_assets = list(assets.keys())
            raise ValueError(f"No {data_product} asset found. Available assets: {available_assets}")

        # Get the URL from the asset
        url = data_asset.get('href')
        if not url:
            raise ValueError(f"No href found in {data_product} asset")

        # Load the frame using the existing method
        return self.load_frame_url(url)
Load a radar frame from a STAC item.
Parameters
- stac_item: The STAC item containing asset URLs.
- data_product (str, optional): The data product to load (default is "CSARP_standard").
Returns
- xr.Dataset: The loaded radar frame as an xarray Dataset.
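Each row of the query result is a STAC item record, so a single frame can be loaded directly (a sketch, assuming the query above returned at least one row):

item = frames_gdf.iloc[0]  # one STAC item row from query_frames
ds = conn.load_frame(item, data_product="CSARP_standard")
print(ds.attrs.get("segment_path"), ds.attrs.get("frame"))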
    def load_frame_url(self, url: str) -> xr.Dataset:
        """
        Load a radar frame from a given URL.

        Parameters
        ----------
        url : str
            The URL of the radar frame data.

        Returns
        -------
        xr.Dataset
            The loaded radar frame as an xarray Dataset.
        """

        file = self._open_file(url)

        filetype = None
        try:
            ds = self._load_frame_hdf5(file)
            filetype = 'hdf5'
        except OSError:
            ds = self._load_frame_matlab(file)
            filetype = 'matlab'

        # Add the source URL as an attribute
        ds.attrs['source_url'] = url

        # Apply CF-compliant attributes
        ds = apply_cf_compliant_attrs(ds)

        # Get the season and segment from the URL
        match = re.search(r'(\d{4}_\w+_[A-Za-z0-9]+)\/([\w_]+)\/[\d_]+\/[\w]+(\d{8}_\d{2}_\d{3})', url)
        if match:
            collection, data_product, granule = match.groups()
            date, segment_id, frame_id = granule.split('_')
            ds.attrs['collection'] = collection
            ds.attrs['data_product'] = data_product
            ds.attrs['granule'] = granule
            ds.attrs['segment_path'] = f"{date}_{segment_id}"
            ds.attrs['date_str'] = date
            ds.attrs['segment'] = int(segment_id)
            ds.attrs['frame'] = int(frame_id)

            # Load citation information
            result = ops_api.get_segment_metadata(segment_name=ds.attrs['segment_path'], season_name=collection)
            if result:
                if isinstance(result['data'], str):
                    warnings.warn(f"Warning: Unexpected result from ops_api: {result['data']}", UserWarning)
                else:
                    result_data = {}
                    for key, value in result['data'].items():
                        if len(value) == 1:
                            result_data[key] = value[0]
                        elif len(value) > 1:
                            result_data[key] = set(value)

                    if 'dois' in result_data:
                        ds.attrs['doi'] = result_data['dois']
                    if 'rors' in result_data:
                        ds.attrs['ror'] = result_data['rors']
                    if 'funding_sources' in result_data:
                        ds.attrs['funder_text'] = result_data['funding_sources']

        # Add the rest of the Matlab parameters
        if filetype == 'hdf5':
            ds.attrs['mimetype'] = 'application/x-hdf5'
            ds.attrs.update(decode_hdf5_matlab_variable(h5py.File(file, 'r'),
                                                        skip_variables=True,
                                                        skip_errors=True))
        elif filetype == 'matlab':
            ds.attrs['mimetype'] = 'application/x-matlab-data'
            ds.attrs.update(extract_legacy_mat_attributes(file,
                                                          skip_keys=ds.keys(),
                                                          skip_errors=True))

        return ds
Load a radar frame from a given URL.
Parameters
- url (str): The URL of the radar frame data.
Returns
- xr.Dataset: The loaded radar frame as an xarray Dataset.
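Frames can also be loaded by URL. The URL below is hypothetical but follows the <collection>/<data_product>/<segment>/Data_<date>_<segment>_<frame>.mat layout that the attribute-parsing regex expects:

url = ("https://data.cresis.ku.edu/data/rds/2017_Antarctica_BaslerJKB/"
       "CSARP_standard/20180105_03/Data_20180105_03_003.mat")  # hypothetical path
ds = conn.load_frame_url(url)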
    def get_collections(self) -> list:
        """
        Get list of available STAC collections.

        Returns
        -------
        list
            List of collection dictionaries with metadata.
        """

        client = DuckdbClient()
        return client.get_collections(self.stac_parquet_href)
Get list of available STAC collections.
Returns
- list: List of collection dictionaries with metadata.
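For example (a sketch; the exact fields depend on the catalog contents, and the "id" key is assumed from standard STAC collection records):

for collection in conn.get_collections():
    print(collection.get("id"))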
    def get_segments(self, collection_id: str) -> list:
        """
        Get list of available segments within a collection/season.

        Parameters
        ----------
        collection_id : str
            The ID of the STAC collection to query.

        Returns
        -------
        list
            List of segment dictionaries with segment metadata.
        """
        # Query STAC API for all items in collection (exclude geometry for better performance)
        items = self.query_frames(collections=[collection_id], exclude_geometry=True)

        if items is None or len(items) == 0:
            print(f"No items found in collection '{collection_id}'")
            return []

        # Group items by segment (opr:date + opr:segment)
        segments = {}
        for idx, item in items.iterrows():
            properties = item['properties']
            date = properties['opr:date']
            flight_num = properties['opr:segment']

            if date and flight_num is not None:
                segment_path = f"{date}_{flight_num:02d}"

                if segment_path not in segments:
                    segments[segment_path] = {
                        'segment_path': segment_path,
                        'date': date,
                        'flight_number': flight_num,
                        'collection': collection_id,
                        'frames': [],
                        'item_count': 0
                    }

                segments[segment_path]['frames'].append(properties.get('opr:frame'))
                segments[segment_path]['item_count'] += 1

        # Sort segments by date and flight number
        segment_list = list(segments.values())
        segment_list.sort(key=lambda x: (x['date'], x['flight_number']))

        return segment_list
Get list of available segments within a collection/season.
Parameters
- collection_id (str): The ID of the STAC collection to query.
Returns
- list: List of segment dictionaries with segment metadata (segment_path, date, flight_number, collection, frames, item_count).
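A sketch of listing segments for the example collection used earlier:

for seg in conn.get_segments("2022_Antarctica_BaslerMKB"):
    print(seg["segment_path"], seg["item_count"], "items, frames:", seg["frames"])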
    def get_layers_files(self, segment: Union[xr.Dataset, dict], raise_errors=True) -> dict:
        """
        Fetch layers from the CSARP_layer files.

        See https://gitlab.com/openpolarradar/opr/-/wikis/Layer-File-Guide for file formats.

        Parameters
        ----------
        segment : Union[xr.Dataset, dict]
            The flight information, which can be an xarray Dataset or a dictionary representing the STAC item.
        raise_errors : bool, optional
            If True, raise errors when layers cannot be found.

        Returns
        -------
        dict
            A dictionary mapping layer display names to their corresponding data.
        """
        collection, segment_path, frame = self._extract_segment_info(segment)

        # If we already have a STAC item, just use it. Otherwise, query to find the matching STAC items
        if isinstance(segment, xr.Dataset):
            properties = {}
            if frame:
                properties['opr:frame'] = frame

            # Query the STAC collection for items from this specific segment
            stac_items = self.query_frames(collections=[collection], segment_paths=[segment_path], properties=properties)

            # Filter for items that have CSARP_layer assets
            layer_items = []
            for idx, item in stac_items.iterrows():
                if 'CSARP_layer' in item['assets']:
                    layer_items.append(item)
        else:
            layer_items = [segment] if 'CSARP_layer' in segment.get('assets', {}) else []

        if not layer_items:
            if raise_errors:
                raise ValueError(f"No CSARP_layer files found for segment path {segment_path} in collection {collection}")
            else:
                return {}

        # Load each layer file and combine them
        layer_frames = []
        for item in layer_items:
            layer_asset = item['assets']['CSARP_layer']
            if layer_asset and 'href' in layer_asset:
                url = layer_asset['href']
                try:
                    layer_ds = self.load_layers_file(url)
                    layer_frames.append(layer_ds)
                except Exception as e:
                    print(f"Warning: Failed to load layer file {url}: {e}")
                    continue

        if not layer_frames:
            if raise_errors:
                raise ValueError(f"No valid CSARP_layer files could be loaded for segment {segment_path} in collection {collection}")
            else:
                return {}

        # Concatenate all layer frames along the slow_time dimension
        layers_segment = xr.concat(layer_frames, dim='slow_time', combine_attrs='drop_conflicts', data_vars='minimal')
        layers_segment = layers_segment.sortby('slow_time')

        # Trim to bounds of the original dataset
        layers_segment = self._trim_to_bounds(layers_segment, segment)

        # Find the layer organization file to map layer IDs to names.
        # Layer organization files are one per directory. For example, the layer file:
        # https://data.cresis.ku.edu/data/rds/2017_Antarctica_BaslerJKB/CSARP_layer/20180105_03/Data_20180105_03_003.mat
        # would have a layer organization file at:
        # https://data.cresis.ku.edu/data/rds/2017_Antarctica_BaslerJKB/CSARP_layer/20180105_03/layer_20180105_03.mat
        layer_organization_file_url = re.sub(r'Data_(\d{8}_\d{2})_\d{3}.mat', r'layer_\1.mat', url)
        layer_organization_ds = self._load_layer_organization(self._open_file(layer_organization_file_url))

        # Split into separate layers by ID
        layers = {}

        layer_ids = np.unique(layers_segment['layer'])

        for layer_id in layer_ids:
            layer_id_int = int(layer_id)
            layer_name = str(layer_organization_ds['lyr_name'].sel(lyr_id=layer_id_int).values.item().squeeze())
            layer_group = str(layer_organization_ds['lyr_group_name'].sel(lyr_id=layer_id_int).values.item().squeeze())
            if layer_group == '[]':
                layer_group = ''
            layer_display_name = f"{layer_group}:{layer_name}"

            layers[layer_display_name] = layers_segment.sel(layer=layer_id)

        return layers
Fetch layers from the CSARP_layer files.
See https://gitlab.com/openpolarradar/opr/-/wikis/Layer-File-Guide for file formats
Parameters
- segment (Union[xr.Dataset, dict]): The flight information, which can be an xarray Dataset or a dictionary representing the STAC item.
- raise_errors (bool, optional): If True, raise errors when layers cannot be found.
Returns
- dict: A dictionary mapping layer display names to their corresponding data.
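A sketch of fetching file-based layers for a loaded frame ds (from the load_frame example above); keys are '<group>:<name>' display names built from the layer organization file:

layers = conn.get_layers_files(ds, raise_errors=False)
for name, layer_ds in layers.items():
    print(name, layer_ds.sizes.get("slow_time"), "samples")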
    def load_layers_file(self, url: str) -> xr.Dataset:
        """
        Load layer data from a CSARP_layer file (either HDF5 or MATLAB format).

        Parameters
        ----------
        url : str
            URL or path to the layer file.

        Returns
        -------
        xr.Dataset
            Layer data in a standardized format with coordinates:
            - slow_time: GPS time converted to datetime
            and data variables:
            - twtt: Two-way travel time for each layer
            - quality: Quality values for each layer
            - type: Type values for each layer
            - lat, lon, elev: Geographic coordinates
            - id: Layer IDs
        """

        file = self._open_file(url)

        ds = self._load_layers(file)

        # Add the source URL as an attribute
        ds.attrs['source_url'] = url

        # Apply common manipulations to match the expected structure
        # Convert GPS time to datetime coordinate
        if 'gps_time' in ds.variables:
            slow_time_dt = pd.to_datetime(ds['gps_time'].values, unit='s')
            ds = ds.assign_coords(slow_time=('slow_time', slow_time_dt))

            # Set slow_time as the main coordinate and remove gps_time from data_vars
            if 'slow_time' not in ds.dims:
                ds = ds.swap_dims({'gps_time': 'slow_time'})

            # Remove gps_time from data_vars if it exists there to avoid conflicts
            if ('gps_time' in ds.data_vars) or ('gps_time' in ds.coords):
                ds = ds.drop_vars('gps_time')

        # Sort by slow_time if it exists
        if 'slow_time' in ds.coords:
            ds = ds.sortby('slow_time')

        return ds
Load layer data from a CSARP_layer file (either HDF5 or MATLAB format).
Parameters
- url (str): URL or path to the layer file
Returns
- xr.Dataset: Layer data in a standardized format with coordinates:
  - slow_time: GPS time converted to datetime
  and data variables:
  - twtt: Two-way travel time for each layer
  - quality: Quality values for each layer
  - type: Type values for each layer
  - lat, lon, elev: Geographic coordinates
  - id: Layer IDs
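A sketch loading one layer file directly, using the example URL from the get_layers_files comments; the twtt variable name comes from the return description above:

layer_url = ("https://data.cresis.ku.edu/data/rds/2017_Antarctica_BaslerJKB/"
             "CSARP_layer/20180105_03/Data_20180105_03_003.mat")
layer_ds = conn.load_layers_file(layer_url)
print(layer_ds["twtt"].dims, layer_ds["slow_time"].values[:3])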
    def get_layers_db(self, flight: Union[xr.Dataset, dict], include_geometry=True, raise_errors=True) -> dict:
        """
        Fetch layer data from the OPS API.

        Parameters
        ----------
        flight : Union[xr.Dataset, dict]
            The flight data, which can be an xarray Dataset or a dictionary.
        include_geometry : bool, optional
            If True, include geometry information in the returned layers.
        raise_errors : bool, optional
            If True, raise an error when layer points cannot be fetched.

        Returns
        -------
        dict
            A dictionary mapping layer display names to their corresponding data.
        """

        collection, segment_path, _ = self._extract_segment_info(flight)

        if 'Antarctica' in collection:
            location = 'antarctic'
        elif 'Greenland' in collection:
            location = 'arctic'
        else:
            raise ValueError("Dataset does not belong to a recognized location (Antarctica or Greenland).")

        layer_points = ops_api.get_layer_points(
            segment_name=segment_path,
            season_name=collection,
            location=location,
            include_geometry=include_geometry
        )

        if layer_points['status'] != 1:
            if raise_errors:
                raise ValueError(f"Failed to fetch layer points. Received response with status {layer_points['status']}.")
            else:
                return {}

        layer_ds_raw = xr.Dataset(
            {k: (['gps_time'], v) for k, v in layer_points['data'].items() if k != 'gps_time'},
            coords={'gps_time': layer_points['data']['gps_time']}
        )

        # Split into a dictionary of layers based on lyr_id
        layer_ids = set(layer_ds_raw['lyr_id'].to_numpy())
        layer_ids = [int(layer_id) for layer_id in layer_ids if not np.isnan(layer_id)]

        # Get the mapping of layer IDs to names
        if self.db_layers_metadata is None:
            layer_metadata = ops_api.get_layer_metadata()
            if layer_metadata['status'] == 1:
                df = pd.DataFrame(layer_metadata['data'])
                df['display_name'] = df['lyr_group_name'] + ":" + df['lyr_name']
                self.db_layers_metadata = df
            else:
                raise ValueError("Failed to fetch layer metadata from OPS API.")

        layers = {}
        for layer_id in layer_ids:
            layer_name = self.db_layers_metadata.loc[self.db_layers_metadata['lyr_id'] == layer_id, 'display_name'].values[0]

            layer = layer_ds_raw.where(layer_ds_raw['lyr_id'] == layer_id, drop=True)
            layer = layer.sortby('gps_time')
            layer = layer.rename({'gps_time': 'slow_time'})
            layer = layer.set_coords(['slow_time'])
            layer['slow_time'] = pd.to_datetime(layer['slow_time'].values, unit='s')

            # Trim to the same time range as the flight
            layer = self._trim_to_bounds(layer, flight)

            layers[layer_name] = layer

        return layers
Fetch layer data from the OPS API.
Parameters
- flight (Union[xr.Dataset, dict]): The flight data, which can be an xarray Dataset or a dictionary.
- include_geometry (bool, optional): If True, include geometry information in the returned layers.
- raise_errors (bool, optional): If True, raise an error when layer points cannot be fetched.
Returns
- dict: A dictionary mapping layer display names to their corresponding data.
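A sketch of the database path for a loaded frame ds; the returned keys are display names built from the layer metadata table:

layers = conn.get_layers_db(ds, include_geometry=False, raise_errors=False)
print(list(layers))  # display names in '<group>:<name>' form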
    def get_layers(self, ds: xr.Dataset, source: str = 'auto', include_geometry=True, raise_errors=True) -> dict:
        """
        Fetch layer data for a given flight dataset, either from CSARP_layer files or the OPS Database API.

        Parameters
        ----------
        ds : xr.Dataset
            The flight dataset containing attributes for collection and segment.
        source : str, optional
            The source to fetch layers from: 'auto', 'files', or 'db' (default is 'auto').
        include_geometry : bool, optional
            If True, include geometry information when fetching from the API.
        raise_errors : bool, optional
            If True, raise errors when layers cannot be found.

        Returns
        -------
        dict
            A dictionary mapping layer display names to their corresponding data.
        """

        if source == 'auto':
            # Try to get layers from files first
            try:
                return self.get_layers_files(ds, raise_errors=True)
            except Exception:
                # Fall back to the OPS database API if no layers are found in files
                return self.get_layers_db(ds, include_geometry=include_geometry, raise_errors=raise_errors)
        elif source == 'files':
            return self.get_layers_files(ds, raise_errors=raise_errors)
        elif source == 'db':
            return self.get_layers_db(ds, include_geometry=include_geometry, raise_errors=raise_errors)
        else:
            raise ValueError("Invalid source specified. Must be one of: 'auto', 'files', 'db'.")
Fetch layer data for a given flight dataset, either from CSARP_layer files or the OPS Database API.
Parameters
- ds (xr.Dataset): The flight dataset containing attributes for collection and segment.
- source (str, optional): The source to fetch layers from: 'auto', 'files', or 'db' (default is 'auto').
- include_geometry (bool, optional): If True, include geometry information when fetching from the API.
- raise_errors (bool, optional): If True, raise errors when layers cannot be found.
Returns
- dict: A dictionary mapping layer display names to their corresponding data.
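Putting the pieces together, a sketch of the end-to-end flow under source='auto' (file-based layers first, falling back to the OPS database); the cache path is hypothetical:

from xopr.opr_access import OPRConnection

conn = OPRConnection(cache_dir="/tmp/opr_cache")  # hypothetical cache path
frames_gdf = conn.query_frames(collections=["2022_Antarctica_BaslerMKB"],
                               segment_paths=["20230126_01"])
ds = conn.load_frame(frames_gdf.iloc[0])
layers = conn.get_layers(ds, source="auto")
print(list(layers))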