xopr.opr_access

import re
import warnings
from typing import Union

import antimeridian
import fsspec
import geopandas as gpd
import h5py
import hdf5storage
import numpy as np
import pandas as pd
import scipy.io
import shapely
import xarray as xr
from rustac import DuckdbClient

from . import opr_tools, ops_api
from .cf_units import apply_cf_compliant_attrs
from .matlab_attribute_utils import (
    decode_hdf5_matlab_variable,
    extract_legacy_mat_attributes,
)


class OPRConnection:
    def __init__(self,
                 collection_url: str = "https://data.cresis.ku.edu/data/",
                 cache_dir: str = None,
                 stac_parquet_href: str = "gs://opr_stac/catalog/**/*.parquet"):
        """
        Initialize the OPRConnection with a collection URL, an optional local
        cache directory, and the location of the STAC GeoParquet catalog.

        Parameters
        ----------
        collection_url : str
            The base URL for the OPR data collection.
        cache_dir : str, optional
            Directory in which to cache downloaded data. If omitted, files are
            cached in a temporary directory for the session.
        stac_parquet_href : str, optional
            Location of the STAC GeoParquet catalog queried for frames.
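
        Examples
        --------
        A minimal construction sketch; the cache directory is an assumed local
        path:

        >>> conn = OPRConnection(cache_dir="/tmp/opr_cache")  # doctest: +SKIP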
 39        """
 40        self.collection_url = collection_url
 41        self.cache_dir = cache_dir
 42        self.stac_parquet_href = stac_parquet_href
 43
 44        self.fsspec_cache_kwargs = {}
 45        self.fsspec_url_prefix = ''
 46        if cache_dir:
 47            self.fsspec_cache_kwargs['cache_storage'] = cache_dir
 48            self.fsspec_cache_kwargs['check_files'] = True
 49            self.fsspec_url_prefix = 'filecache::'
 50
 51        self.db_layers_metadata = None # Cache for OPS database layers metadata
 52
 53    def _open_file(self, url):
 54        """Helper method to open files with appropriate caching."""
 55        if self.fsspec_url_prefix:
 56            return fsspec.open_local(f"{self.fsspec_url_prefix}{url}", filecache=self.fsspec_cache_kwargs)
 57        else:
 58            return fsspec.open_local(f"simplecache::{url}", simplecache=self.fsspec_cache_kwargs)
 59
 60    def _extract_segment_info(self, segment):
 61        """Extract collection, segment_path, and frame from Dataset or dict."""
 62        if isinstance(segment, xr.Dataset):
 63            return (segment.attrs.get('collection'),
 64                    segment.attrs.get('segment_path'),
 65                    segment.attrs.get('frame'))
 66        else:
 67            return (segment['collection'],
 68                    f"{segment['properties'].get('opr:date')}_{segment['properties'].get('opr:segment'):02d}",
 69                    segment['properties'].get('opr:frame'))
 70
 71    def query_frames(self, collections: list[str] = None, segment_paths: list[str] = None,
 72                     geometry = None, date_range: tuple = None, properties: dict = {},
 73                     max_items: int = None, exclude_geometry: bool = False,
 74                     search_kwargs: dict = {}) -> gpd.GeoDataFrame:
 75        """
 76        Query for radar frames based on various search criteria. Each parameter is
 77        treated as an independent criteria. If multiple parameters are passed, they are
 78        combined with AND logic.
 79
 80        A list of values may be passed to most parameters. If so, any values in the list
 81        will be treated as a match.
 82
 83        Parameters
 84        ----------
 85        collections : list[str], optional
 86            List of collection names to filter by (e.g., "2022_Antarctica_BaslerMKB").
 87        segment_paths : list[str], optional
 88            List of segment paths to filter by (e.g., "20230126_01").
 89        geometry : optional
 90            Geospatial geometry to filter by (e.g., a shapely geometry object).
 91        date_range : tuple, optional
 92            Date range to filter by (e.g., (start_date, end_date)).
 93        properties : dict, optional
 94            Additional properties to include in the query.
 95        max_items : int, optional
 96            Maximum number of items to return.
 97        exclude_geometry : bool, optional
 98            If True, exclude the geometry field from the response to reduce size.
 99        search_kwargs : dict, optional
100            Additional keyword arguments to pass to the search method.
101
102        Returns
103        -------
104        gpd.GeoDataFrame
105            GeoDataFrame containing the STAC frames matching the query criteria.
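
        Examples
        --------
        A minimal query sketch, reusing the example collection and segment path
        above (assumes the default catalog is reachable):

        >>> conn = OPRConnection()  # doctest: +SKIP
        >>> frames = conn.query_frames(collections=["2022_Antarctica_BaslerMKB"],
        ...                            segment_paths=["20230126_01"])  # doctest: +SKIP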
106        """
107
108        search_params = {}
109
110        # Exclude geometry -- do not return the geometry field to reduce response size
111        if exclude_geometry:
112            search_params['exclude'] = ['geometry']
113
114        # Handle collections (seasons)
115        if collections is not None:
116            search_params['collections'] = [collections] if isinstance(collections, str) else collections
117
118        # Handle geometry filtering
119        if geometry is not None:
120            if hasattr(geometry, '__geo_interface__'):
121                geometry = geometry.__geo_interface__
122
123            # Fix geometries that cross the antimeridian
124            geometry = antimeridian.fix_geojson(geometry, reverse=True)
125
126            search_params['intersects'] = geometry
127
128        # Handle date range filtering
129        if date_range is not None:
130            search_params['datetime'] = date_range
131
132        # Handle max_items
133        if max_items is not None:
134            search_params['limit'] = max_items
135        else:
136            search_params['limit'] = 1000000
137
138        # Handle segment_paths filtering using CQL2
139        filter_conditions = []
140
141        if segment_paths is not None:
142            segment_paths = [segment_paths] if isinstance(segment_paths, str) else segment_paths
143
144            # Create OR conditions for segment paths
145            segment_conditions = []
146            for segment_path in segment_paths:
147                try:
148                    date_str, segment_num_str = segment_path.split('_')
149                    segment_num = int(segment_num_str)
150
151                    # Create AND condition for this specific segment
152                    segment_condition = {
153                        "op": "and",
154                        "args": [
155                            {
156                                "op": "=",
157                                "args": [{"property": "opr:date"}, date_str]
158                            },
159                            {
160                                "op": "=",
161                                "args": [{"property": "opr:segment"}, segment_num]
162                            }
163                        ]
164                    }
165                    segment_conditions.append(segment_condition)
166                except ValueError:
167                    print(f"Warning: Invalid segment_path format '{segment_path}'. Expected format: YYYYMMDD_NN")
168                    continue
169
170            if segment_conditions:
171                if len(segment_conditions) == 1:
172                    filter_conditions.append(segment_conditions[0])
173                else:
174                    # Multiple segments - combine with OR
175                    filter_conditions.append({
176                        "op": "or",
177                        "args": segment_conditions
178                    })
179
180        # Add any additional property filters
181        for key, value in properties.items():
182            filter_conditions.append({
183                "op": "=",
184                "args": [{"property": key}, value]
185            })
186
187        # Combine all filter conditions with AND
188        if filter_conditions:
189            if len(filter_conditions) == 1:
190                filter_expr = filter_conditions[0]
191            else:
192                filter_expr = {
193                    "op": "and",
194                    "args": filter_conditions
195                }
196
197            search_params['filter'] = filter_expr
198
199        # Add any extra kwargs to search
200        search_params.update(search_kwargs)
201
202        #print(search_params) # TODO: Remove
203
204        # Perform the search
205        # from rustac import DuckdbClient
206        client = DuckdbClient()
207        items = client.search(self.stac_parquet_href, **search_params)
208        if isinstance(items, dict):
209            items = items['features']
210
211        if not items or len(items) == 0:
212            warnings.warn("No items found matching the query criteria", UserWarning)
213            return None
214
215        # Convert to GeoDataFrame
216        items_df = gpd.GeoDataFrame(items)
217        # Set index
218        items_df = items_df.set_index(items_df['id'])
219        items_df.index.name = 'stac_item_id'
220        # Set the geometry column
221        if 'geometry' in items_df.columns and not exclude_geometry:
222            items_df = items_df.set_geometry(items_df['geometry'].apply(shapely.geometry.shape))
223            items_df.crs = "EPSG:4326"
224
225        # Reorder the columns, leaving any extra columns at the end
226        desired_order = ['collection', 'geometry', 'properties', 'assets']
227        items_df = items_df[[col for col in desired_order if col in items_df.columns] + list(items_df.columns.difference(desired_order))]
228
229        return items_df
230
231    def load_frames(self, stac_items: gpd.GeoDataFrame,
232                    data_product: str = "CSARP_standard",
233                    merge_flights: bool = False,
234                    skip_errors: bool = False,
235                    ) -> Union[list[xr.Dataset], xr.Dataset]:
236        """
237        Load multiple radar frames from a list of STAC items.
238
239        Parameters
240        ----------
241        stac_items : gpd.GeoDataFrame
242            The STAC items containing asset URLs.
243        data_product : str, optional
244            The data product to load (default is "CSARP_standard").
245        merge_flights : bool, optional
246            Whether to merge frames from the same flight (default is False).
247        skip_errors : bool, optional
248            Whether to skip errors and continue loading other frames (default is False).
249
250        Returns
251        -------
252        list[xr.Dataset] or xr.Dataset
253            List of loaded radar frames as xarray Datasets or a single merged Dataset if there is only one segment and merge_flights is True.
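
        Examples
        --------
        A minimal sketch, continuing from a query_frames result (assumes the
        referenced assets are downloadable):

        >>> items = conn.query_frames(segment_paths=["20230126_01"])  # doctest: +SKIP
        >>> frames = conn.load_frames(items, skip_errors=True)  # doctest: +SKIP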
254        """
255        frames = []
256
257        for idx, item in stac_items.iterrows():
258            try:
259                frame = self.load_frame(item, data_product)
260                frames.append(frame)
261            except Exception as e:
262                print(f"Error loading frame for item {item.get('id', 'unknown')}: {e}")
263                if skip_errors:
264                    continue
265                else:
266                    raise e
267
268        if merge_flights:
269            return opr_tools.merge_frames(frames)
270        else:
271            return frames
272
273    def load_frame(self, stac_item, data_product: str = "CSARP_standard") -> xr.Dataset:
274        """
275        Load a radar frame from a STAC item.
276
277        Parameters
278        ----------
279        stac_item
280            The STAC item containing asset URLs.
281        data_product : str, optional
282            The data product to load (default is "CSARP_standard").
283
284        Returns
285        -------
286        xr.Dataset
287            The loaded radar frame as an xarray Dataset.
288        """
289
290        assets = stac_item['assets']
291
292        # Get the data asset
293        data_asset = assets.get(data_product)
294        if not data_asset:
295            available_assets = list(assets.keys())
296            raise ValueError(f"No {data_product} asset found. Available assets: {available_assets}")
297
298        # Get the URL from the asset
299        url = data_asset.get('href')
300        if not url:
301            raise ValueError(f"No href found in {data_product} asset")
302
303        # Load the frame using the existing method
304        return self.load_frame_url(url)
305
306    def load_frame_url(self, url: str) -> xr.Dataset:
307        """
308        Load a radar frame from a given URL.
309
310        Parameters
311        ----------
312        url : str
313            The URL of the radar frame data.
314
315        Returns
316        -------
317        xr.Dataset
318            The loaded radar frame as an xarray Dataset.
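
        Examples
        --------
        A minimal sketch; the URL shape follows the frame-file pattern parsed in
        this method and is illustrative only:

        >>> ds = conn.load_frame_url(
        ...     "https://data.cresis.ku.edu/data/rds/2017_Antarctica_BaslerJKB/"
        ...     "CSARP_standard/20180105_03/Data_20180105_03_003.mat")  # doctest: +SKIP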
319        """
320
321        file = self._open_file(url)
322
323        filetype = None
324        try:
325            ds = self._load_frame_hdf5(file)
326            filetype = 'hdf5'
327        except OSError:
328            ds = self._load_frame_matlab(file)
329            filetype = 'matlab'
330
331        # Add the source URL as an attribute
332        ds.attrs['source_url'] = url
333
334        # Apply CF-compliant attributes
335        ds = apply_cf_compliant_attrs(ds)
336
337        # Get the season and segment from the URL
338        match = re.search(r'(\d{4}_\w+_[A-Za-z0-9]+)\/([\w_]+)\/[\d_]+\/[\w]+(\d{8}_\d{2}_\d{3})', url)
339        if match:
340            collection, data_product, granule = match.groups()
341            date, segment_id, frame_id = granule.split('_')
342            ds.attrs['collection'] = collection
343            ds.attrs['data_product'] = data_product
344            ds.attrs['granule'] = granule
345            ds.attrs['segment_path'] = f"{date}_{segment_id}"
346            ds.attrs['date_str'] = date
347            ds.attrs['segment'] = int(segment_id)
348            ds.attrs['frame'] = int(frame_id)
349
350            # Load citation information
351            result = ops_api.get_segment_metadata(segment_name=ds.attrs['segment_path'], season_name=collection)
352            if result:
353                if isinstance(result['data'], str):
354                    warnings.warn(f"Warning: Unexpected result from ops_api: {result['data']}", UserWarning)
355                else:
356                    result_data = {}
357                    for key, value in result['data'].items():
358                        if len(value) == 1:
359                            result_data[key] = value[0]
360                        elif len(value) > 1:
361                            result_data[key] = set(value)
362
363                    if 'dois' in result_data:
364                        ds.attrs['doi'] = result_data['dois']
365                    if 'rors' in result_data:
366                        ds.attrs['ror'] = result_data['rors']
367                    if 'funding_sources' in result_data:
368                        ds.attrs['funder_text'] = result_data['funding_sources']
369
370        # Add the rest of the Matlab parameters
371        if filetype == 'hdf5':
372            ds.attrs['mimetype'] = 'application/x-hdf5'
373            ds.attrs.update(decode_hdf5_matlab_variable(h5py.File(file, 'r'),
374                                                        skip_variables=True,
375                                                        skip_errors=True))
376        elif filetype == 'matlab':
377            ds.attrs['mimetype'] = 'application/x-matlab-data'
378            ds.attrs.update(extract_legacy_mat_attributes(file,
379                                                          skip_keys=ds.keys(),
380                                                          skip_errors=True))
381
382        return ds
383
384    def _load_frame_hdf5(self, file) -> xr.Dataset:
385        """
386        Load a radar frame from an HDF5 file.
387
388        Parameters
389        ----------
390        file :
391            The path to the HDF5 file containing radar frame data.
392
393        Returns
394        -------
395        xr.Dataset
396            The loaded radar frame as an xarray Dataset.
397        """
398
399        ds = xr.open_dataset(file, engine='h5netcdf', phony_dims='sort')
400
401        # Re-arrange variables to provide useful dimensions and coordinates
402
403        ds = ds.squeeze() # Drop the singleton dimensions matlab adds
404
405        ds = ds.rename({ # Label the dimensions with more useful names
406            ds.Data.dims[0]: 'slow_time_idx',
407            ds.Data.dims[1]: 'twtt_idx',
408        })
409
410        # Make variables with no dimensions into scalar attributes
411        for var in ds.data_vars:
412            if ds[var].ndim == 0:
413                ds.attrs[var] = ds[var].item()
414                ds = ds.drop_vars(var)
415
416        # Make the file_type an attribute
417        if 'file_type' in ds.data_vars:
418            ds.attrs['file_type'] = ds['file_type'].to_numpy()
419            ds = ds.drop_vars('file_type')
420
421        # Name the two time coordinates
422        ds = ds.rename({'Time': 'twtt', 'GPS_time': 'slow_time'})
423        ds = ds.set_coords(['slow_time', 'twtt'])
424
425        slow_time_1d = pd.to_datetime(ds['slow_time'].values, unit='s')
426        ds = ds.assign_coords(slow_time=('slow_time_idx', slow_time_1d))
427
428        # Make twtt and slow_time the indexing coordinates
429        ds = ds.swap_dims({'twtt_idx': 'twtt'})
430        ds = ds.swap_dims({'slow_time_idx': 'slow_time'})
431
432        return ds
433
434    def _load_frame_matlab(self, file) -> xr.Dataset:
435        """
436        Load a radar frame from a MATLAB file.
437
438        Parameters
439        ----------
440        file :
441            The path to the MATLAB file containing radar frame data.
442
443        Returns
444        -------
445        xr.Dataset
446            The loaded radar frame as an xarray Dataset.
447        """
448
449        m = scipy.io.loadmat(file, mat_dtype=False)
450
451        key_dims = {
452            'Time': ('twtt',),
453            'GPS_time': ('slow_time',),
454            'Latitude': ('slow_time',),
455            'Longitude': ('slow_time',),
456            'Elevation': ('slow_time',),
457            'Roll': ('slow_time',),
458            'Pitch': ('slow_time',),
459            'Heading': ('slow_time',),
460            'Surface': ('slow_time',),
461            'Data': ('twtt', 'slow_time')
462        }
463
464        ds = xr.Dataset(
465            {
466                key: (dims, np.squeeze(m[key])) for key, dims in key_dims.items() if key in m
467            },
468            coords={
469                'twtt': ('twtt', np.squeeze(m['Time'])),
470                'slow_time': ('slow_time', pd.to_datetime(np.squeeze(m['GPS_time']), unit='s')),
471            }
472        )
473
474        return ds
475
476    def get_collections(self) -> list:
477        """
478        Get list of available STAC collections.
479
480        Returns
481        -------
482        list
483            List of collection dictionaries with metadata.
484        """
485
486        client = DuckdbClient()
487        return client.get_collections(self.stac_parquet_href)
488
489    def get_segments(self, collection_id: str) -> list:
490        """
491        Get list of available segments within a collection/season.
492
493        Parameters
494        ----------
495        collection_id : str
496            The ID of the STAC collection to query.
497
498        Returns
499        -------
500        list
501            List of flight dictionaries with flight metadata.
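
        Examples
        --------
        A minimal sketch, reusing the example collection name from query_frames:

        >>> segments = conn.get_segments("2022_Antarctica_BaslerMKB")  # doctest: +SKIP
        >>> [s['segment_path'] for s in segments]  # doctest: +SKIP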
502        """
503        # Query STAC API for all items in collection (exclude geometry for better performance)
504        items = self.query_frames(collections=[collection_id], exclude_geometry=True)
505
506        if items is None or len(items) == 0:
507            print(f"No items found in collection '{collection_id}'")
508            return []
509
510        # Group items by segment (opr:date + opr:segment)
511        segments = {}
512        for idx, item in items.iterrows():
513            properties = item['properties']
514            date = properties['opr:date']
515            flight_num = properties['opr:segment']
516
517            if date and flight_num is not None:
518                segment_path = f"{date}_{flight_num:02d}"
519
520                if segment_path not in segments:
521                    segments[segment_path] = {
522                        'segment_path': segment_path,
523                        'date': date,
524                        'flight_number': flight_num,
525                        'collection': collection_id,
526                        'frames': [],
527                        'item_count': 0
528                    }
529
530                segments[segment_path]['frames'].append(properties.get('opr:frame'))
531                segments[segment_path]['item_count'] += 1
532
533        # Sort segments by date and flight number
534        segment_list = list(segments.values())
535        segment_list.sort(key=lambda x: (x['date'], x['flight_number']))
536
537        return segment_list
538
539    def get_layers_files(self, segment: Union[xr.Dataset, dict], raise_errors=True) -> dict:
540        """
541        Fetch layers from the CSARP_layers files
542
543        See https://gitlab.com/openpolarradar/opr/-/wikis/Layer-File-Guide for file formats
544
545        Parameters
546        ----------
547        segment : Union[xr.Dataset, dict]
548            The flight information, which can be an xarray Dataset or a dictionary representing the STAC item.
549        raise_errors : bool, optional
550            If True, raise errors when layers cannot be found.
551
552        Returns
553        -------
554        dict
555            A dictionary mapping layer IDs to their corresponding data.
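
        Examples
        --------
        A minimal sketch, assuming `ds` is a frame loaded with load_frame:

        >>> layers = conn.get_layers_files(ds)  # doctest: +SKIP
        >>> list(layers)  # doctest: +SKIP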
556        """
557        collection, segment_path, frame = self._extract_segment_info(segment)
558
559        # If we already have a STAC item, just use it. Otherwise, query to find the matching STAC items
560        if isinstance(segment, xr.Dataset):
561            properties = {}
562            if frame:
563                properties['opr:frame'] = frame
564
565            # Query STAC collection for CSARP_layer files matching this specific segment
566
567            # Get items from this specific segment
568            stac_items = self.query_frames(collections=[collection], segment_paths=[segment_path], properties=properties)
569
570            # Filter for items that have CSARP_layer assets
571            layer_items = []
572            for idx, item in stac_items.iterrows():
573                if 'CSARP_layer' in item['assets']:
574                    layer_items.append(item)
575        else:
576            layer_items = [segment] if 'CSARP_layer' in segment.get('assets', {}) else []
577
578        if not layer_items:
579            if raise_errors:
580                raise ValueError(f"No CSARP_layer files found for segment path {segment_path} in collection {collection}")
581            else:
582                return {}
583
584        # Load each layer file and combine them
585        layer_frames = []
586        for item in layer_items:
587            layer_asset = item['assets']['CSARP_layer']
588            if layer_asset and 'href' in layer_asset:
589                url = layer_asset['href']
590                try:
591                    layer_ds = self.load_layers_file(url)
592                    layer_frames.append(layer_ds)
593                except Exception as e:
594                    raise e # TODO
595                    print(f"Warning: Failed to load layer file {url}: {e}")
596                    continue
597
598        if not layer_frames:
599            if raise_errors:
600                raise ValueError(f"No valid CSARP_layer files could be loaded for segment {segment_path} in collection {collection}")
601            else:
602                return {}
603
604        # Concatenate all layer frames along slow_time dimension
605        layers_segment = xr.concat(layer_frames, dim='slow_time', combine_attrs='drop_conflicts', data_vars='minimal')
606        layers_segment = layers_segment.sortby('slow_time')
607
608        # Trim to bounds of the original dataset
609        layers_segment = self._trim_to_bounds(layers_segment, segment)
610
611        # Find the layer organization file to map layer IDs to names
612        # Layer organization files are one per directory. For example, the layer file:
613        # https://data.cresis.ku.edu/data/rds/2017_Antarctica_BaslerJKB/CSARP_layer/20180105_03/Data_20180105_03_003.mat
614        # would have a layer organization file at:
615        # https://data.cresis.ku.edu/data/rds/2017_Antarctica_BaslerJKB/CSARP_layer/20180105_03/layer_20180105_03.mat
616
617        layer_organization_file_url = re.sub(r'Data_(\d{8}_\d{2})_\d{3}.mat', r'layer_\1.mat', url)
618        layer_organization_ds = self._load_layer_organization(self._open_file(layer_organization_file_url))
619
620        # Split into separate layers by ID
621        layers = {}
622
623        layer_ids = np.unique(layers_segment['layer'])
624
625        for i, layer_id in enumerate(layer_ids):
626            layer_id_int = int(layer_id)
627            layer_name = str(layer_organization_ds['lyr_name'].sel(lyr_id=layer_id_int).values.item().squeeze())
628            layer_group = str(layer_organization_ds['lyr_group_name'].sel(lyr_id=layer_id_int).values.item().squeeze())
629            if layer_group == '[]':
630                layer_group = ''
631            layer_display_name = f"{layer_group}:{layer_name}"
632            layer_data = {}
633
634            layer_ds = layers_segment.sel(layer=layer_id)
635            layers[layer_display_name] = layer_ds
636
637        return layers
638
639    def _trim_to_bounds(self, ds: xr.Dataset, ref: Union[xr.Dataset, dict]) -> xr.Dataset:
640        start_time, end_time = None, None
641        if isinstance(ref, xr.Dataset) and 'slow_time' in ref.coords:
642            start_time = ref['slow_time'].min()
643            end_time = ref['slow_time'].max()
644        else:
645            properties = ref.get('properties', {})
646            if 'start_datetime' in properties and 'end_datetime' in properties:
647                start_time = pd.to_datetime(properties['start_datetime'])
648                end_time = pd.to_datetime(properties['end_datetime'])
649
650        if start_time:
651            return ds.sel(slow_time=slice(start_time, end_time))
652        else:
653            return ds
654
655    def load_layers_file(self, url: str) -> xr.Dataset:
656        """
657        Load layer data from a CSARP_layer file (either HDF5 or MATLAB format).
658
659        Parameters
660        ----------
661        url : str
662            URL or path to the layer file
663
664        Returns
665        -------
666        xr.Dataset
667            Layer data in a standardized format with coordinates:
668            - slow_time: GPS time converted to datetime
669            And data variables:
670            - twtt: Two-way travel time for each layer
671            - quality: Quality values for each layer
672            - type: Type values for each layer
673            - lat, lon, elev: Geographic coordinates
674            - id: Layer IDs
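
        Examples
        --------
        A minimal sketch; the URL is the example layer file cited in the
        get_layers_files comments:

        >>> layers_ds = conn.load_layers_file(
        ...     "https://data.cresis.ku.edu/data/rds/2017_Antarctica_BaslerJKB/"
        ...     "CSARP_layer/20180105_03/Data_20180105_03_003.mat")  # doctest: +SKIP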
675        """
676
677        file = self._open_file(url)
678
679        ds = self._load_layers(file)
680
681        # Add the source URL as an attribute
682        ds.attrs['source_url'] = url
683
684        # Apply common manipulations to match the expected structure
685        # Convert GPS time to datetime coordinate
686        if 'gps_time' in ds.variables:
687            slow_time_dt = pd.to_datetime(ds['gps_time'].values, unit='s')
688            ds = ds.assign_coords(slow_time=('slow_time', slow_time_dt))
689
690            # Set slow_time as the main coordinate and remove gps_time from data_vars
691            if 'slow_time' not in ds.dims:
692                ds = ds.swap_dims({'gps_time': 'slow_time'})
693
694            # Remove gps_time from data_vars if it exists there to avoid conflicts
695            if ('gps_time' in ds.data_vars) or ('gps_time' in ds.coords):
696                ds = ds.drop_vars('gps_time')
697
698        # Sort by slow_time if it exists
699        if 'slow_time' in ds.coords:
700            ds = ds.sortby('slow_time')
701
702        return ds
703
704    def _load_layers(self, file) -> xr.Dataset:
705        """
706        Load layer data file using hdf5storage
707
708        Parameters:
709        file : str
710            Path to the layer file
711        Returns: xr.Dataset
712        """
713
714        d = hdf5storage.loadmat(file, appendmat=False)
715
716        # Ensure 'id' is at least 1D for the layer dimension
717        id_data = np.atleast_1d(d['id'].squeeze())
718
719        # For 2D arrays with (layer, slow_time) dimensions, ensure they remain 2D
720        # even when there's only one layer
721        quality_data = np.atleast_2d(d['quality'].squeeze())
722        if quality_data.shape[0] == 1 or quality_data.ndim == 1:
723            quality_data = quality_data.reshape(len(id_data), -1)
724
725        twtt_data = np.atleast_2d(d['twtt'].squeeze())
726        if twtt_data.shape[0] == 1 or twtt_data.ndim == 1:
727            twtt_data = twtt_data.reshape(len(id_data), -1)
728
729        type_data = np.atleast_2d(d['type'].squeeze())
730        if type_data.shape[0] == 1 or type_data.ndim == 1:
731            type_data = type_data.reshape(len(id_data), -1)
732
733        ds = xr.Dataset({
734            'file_type': ((), d['file_type'].squeeze()),
735            'file_version': ((), d['file_version'].squeeze()),
736            'elev': (('slow_time',), d['elev'].squeeze()),
737            #'gps_source': ((), d['gps_source'].squeeze()),
738            'gps_time': (('slow_time',), d['gps_time'].squeeze()),
739            'id': (('layer',), id_data),
740            'lat': (('slow_time',), d['lat'].squeeze()),
741            'lon': (('slow_time',), d['lon'].squeeze()),
742            'quality': (('layer', 'slow_time'), quality_data),
743            'twtt': (('layer', 'slow_time'), twtt_data),
744            'type': (('layer', 'slow_time'), type_data),
745        })
746
747        ds = ds.assign_coords({'layer': ds['id'], 'slow_time': ds['gps_time']})
748
749        return ds
750
751    def _load_layer_organization(self, file) -> xr.Dataset:
752        """
753        Load a layer organization file
754
755        Parameters
756        ----------
757        file : str
758            Path to the HDF5 layer organization file
759
760        Returns
761        -------
762        xr.Dataset
763            Raw layer data from HDF5 file
764        """
765        d = hdf5storage.loadmat(file, appendmat=False)
766        ds = xr.Dataset({
767            'file_type': ((), d['file_type'].squeeze()),
768            'file_version': ((), d['file_version'].squeeze()),
769            'lyr_age': (('lyr_id',), np.atleast_1d(d['lyr_age'].squeeze())),
770            'lyr_age_source': (('lyr_id',), np.atleast_1d(d['lyr_age_source'].squeeze())),
771            'lyr_desc': (('lyr_id',), np.atleast_1d(d['lyr_desc'].squeeze())),
772            'lyr_group_name': (('lyr_id',), np.atleast_1d(d['lyr_group_name'].squeeze())),
773            'lyr_id': (('lyr_id',), np.atleast_1d(d['lyr_id'].squeeze())),
774            'lyr_name': (('lyr_id',), np.atleast_1d(d['lyr_name'].squeeze())),
775            'lyr_order': (('lyr_id',), np.atleast_1d(d['lyr_order'].squeeze())),
776            }, attrs={'param': d['param']})
777
778        ds = ds.set_index(lyr_id='lyr_id')
779
780        return ds
781
782    def get_layers_db(self, flight: Union[xr.Dataset, dict], include_geometry=True, raise_errors=True) -> dict:
783        """
784        Fetch layer data from the OPS API
785
786        Parameters
787        ----------
788        flight : Union[xr.Dataset, dict]
789            The flight data, which can be an xarray Dataset or a dictionary.
790        include_geometry : bool, optional
791            If True, include geometry information in the returned layers.
792
793        Returns
794        -------
795        dict
796            A dictionary mapping layer IDs to their corresponding data.
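
        Examples
        --------
        A minimal sketch, assuming `ds` is a frame loaded with load_frame:

        >>> layers = conn.get_layers_db(ds, include_geometry=False)  # doctest: +SKIP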
797        """
798
799        collection, segment_path, _ = self._extract_segment_info(flight)
800
801        if 'Antarctica' in collection:
802            location = 'antarctic'
803        elif 'Greenland' in collection:
804            location = 'arctic'
805        else:
806            raise ValueError("Dataset does not belong to a recognized location (Antarctica or Greenland).")
807
808        layer_points = ops_api.get_layer_points(
809            segment_name=segment_path,
810            season_name=collection,
811            location=location,
812            include_geometry=include_geometry
813        )
814
815        if layer_points['status'] != 1:
816            if raise_errors:
817                raise ValueError(f"Failed to fetch layer points. Received response with status {layer_points['status']}.")
818            else:
819                return {}
820
821        layer_ds_raw = xr.Dataset(
822            {k: (['gps_time'], v) for k, v in layer_points['data'].items() if k != 'gps_time'},
823            coords={'gps_time': layer_points['data']['gps_time']}
824        )
825        # Split into a dictionary of layers based on lyr_id
826        layer_ids = set(layer_ds_raw['lyr_id'].to_numpy())
827        layer_ids = [int(layer_id) for layer_id in layer_ids if not np.isnan(layer_id)]
828
829        # Get the mapping of layer IDs to names
830        if self.db_layers_metadata is None:
831            layer_metadata = ops_api.get_layer_metadata()
832            if layer_metadata['status'] == 1:
833                df = pd.DataFrame(layer_metadata['data'])
834                df['display_name'] = df['lyr_group_name'] + ":" + df['lyr_name']
835                self.db_layers_metadata = df
836            else:
837                raise ValueError("Failed to fetch layer metadata from OPS API.")
838
839        layers = {}
840        for layer_id in layer_ids:
841            layer_name = self.db_layers_metadata.loc[self.db_layers_metadata['lyr_id'] == layer_id, 'display_name'].values[0]
842
843            l = layer_ds_raw.where(layer_ds_raw['lyr_id'] == layer_id, drop=True)
844
845            l = l.sortby('gps_time')
846
847            l = l.rename({'gps_time': 'slow_time'})
848            l = l.set_coords(['slow_time'])
849
850            l['slow_time'] = pd.to_datetime(l['slow_time'].values, unit='s')
851
852            # Filter to the same time range as flight
853            l = self._trim_to_bounds(l, flight)
854
855            layers[layer_name] = l
856
857        return layers
858
859    def get_layers(self, ds : xr.Dataset, source: str = 'auto', include_geometry=True, raise_errors=True) -> dict:
860        """
861        Fetch layer data for a given flight dataset, either from CSARP_layer files or the OPS Database API.
862
863        Parameters
864        ----------
865        ds : xr.Dataset
866            The flight dataset containing attributes for collection and segment.
867        source : str, optional
868            The source to fetch layers from: 'auto', 'files', or 'db' (default is 'auto').
869        include_geometry : bool, optional
870            If True, include geometry information when fetching from the API.\
871        raise_errors : bool, optional
872            If True, raise errors when layers cannot be found.
873
874        Returns
875        -------
876        dict
877            A dictionary mapping layer IDs to their corresponding data.
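
        Examples
        --------
        A minimal end-to-end sketch (assumes network access to the catalog and
        the data server):

        >>> conn = OPRConnection(cache_dir="/tmp/opr_cache")  # doctest: +SKIP
        >>> items = conn.query_frames(segment_paths=["20230126_01"], max_items=1)  # doctest: +SKIP
        >>> frame = conn.load_frame(items.iloc[0])  # doctest: +SKIP
        >>> layers = conn.get_layers(frame)  # doctest: +SKIP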
878        """
879
880        if source == 'auto':
881            # Try to get layers from files first
882            try:
883                layers = self.get_layers_files(ds, raise_errors=True)
884                return layers
885            except:
886                # Fallback to API if no layers found in files
887                return self.get_layers_db(ds, include_geometry=include_geometry, raise_errors=raise_errors)
888        elif source == 'files':
889            return self.get_layers_files(ds, raise_errors=raise_errors)
890        elif source == 'db':
891            return self.get_layers_db(ds, include_geometry=include_geometry, raise_errors=raise_errors)
892        else:
893            raise ValueError("Invalid source specified. Must be one of: 'auto', 'files', 'db'.")
class OPRConnection:
 26class OPRConnection:
 27    def __init__(self,
 28                 collection_url: str = "https://data.cresis.ku.edu/data/",
 29                 cache_dir: str = None,
 30                 stac_parquet_href: str = "gs://opr_stac/catalog/**/*.parquet"):
 31        """
 32        Initialize the OPRConnection with a collection URL and optional cache directory.
 33
 34        Parameters
 35        ----------
 36        collection_url : str
 37            The base URL for the OPR data collection.
 38        cache_dir : str, optional
 39            Directory to cache downloaded data.
 40        """
 41        self.collection_url = collection_url
 42        self.cache_dir = cache_dir
 43        self.stac_parquet_href = stac_parquet_href
 44
 45        self.fsspec_cache_kwargs = {}
 46        self.fsspec_url_prefix = ''
 47        if cache_dir:
 48            self.fsspec_cache_kwargs['cache_storage'] = cache_dir
 49            self.fsspec_cache_kwargs['check_files'] = True
 50            self.fsspec_url_prefix = 'filecache::'
 51
 52        self.db_layers_metadata = None # Cache for OPS database layers metadata
 53
 54    def _open_file(self, url):
 55        """Helper method to open files with appropriate caching."""
 56        if self.fsspec_url_prefix:
 57            return fsspec.open_local(f"{self.fsspec_url_prefix}{url}", filecache=self.fsspec_cache_kwargs)
 58        else:
 59            return fsspec.open_local(f"simplecache::{url}", simplecache=self.fsspec_cache_kwargs)
 60
 61    def _extract_segment_info(self, segment):
 62        """Extract collection, segment_path, and frame from Dataset or dict."""
 63        if isinstance(segment, xr.Dataset):
 64            return (segment.attrs.get('collection'),
 65                    segment.attrs.get('segment_path'),
 66                    segment.attrs.get('frame'))
 67        else:
 68            return (segment['collection'],
 69                    f"{segment['properties'].get('opr:date')}_{segment['properties'].get('opr:segment'):02d}",
 70                    segment['properties'].get('opr:frame'))
 71
 72    def query_frames(self, collections: list[str] = None, segment_paths: list[str] = None,
 73                     geometry = None, date_range: tuple = None, properties: dict = {},
 74                     max_items: int = None, exclude_geometry: bool = False,
 75                     search_kwargs: dict = {}) -> gpd.GeoDataFrame:
 76        """
 77        Query for radar frames based on various search criteria. Each parameter is
 78        treated as an independent criteria. If multiple parameters are passed, they are
 79        combined with AND logic.
 80
 81        A list of values may be passed to most parameters. If so, any values in the list
 82        will be treated as a match.
 83
 84        Parameters
 85        ----------
 86        collections : list[str], optional
 87            List of collection names to filter by (e.g., "2022_Antarctica_BaslerMKB").
 88        segment_paths : list[str], optional
 89            List of segment paths to filter by (e.g., "20230126_01").
 90        geometry : optional
 91            Geospatial geometry to filter by (e.g., a shapely geometry object).
 92        date_range : tuple, optional
 93            Date range to filter by (e.g., (start_date, end_date)).
 94        properties : dict, optional
 95            Additional properties to include in the query.
 96        max_items : int, optional
 97            Maximum number of items to return.
 98        exclude_geometry : bool, optional
 99            If True, exclude the geometry field from the response to reduce size.
100        search_kwargs : dict, optional
101            Additional keyword arguments to pass to the search method.
102
103        Returns
104        -------
105        gpd.GeoDataFrame
106            GeoDataFrame containing the STAC frames matching the query criteria.
107        """
108
109        search_params = {}
110
111        # Exclude geometry -- do not return the geometry field to reduce response size
112        if exclude_geometry:
113            search_params['exclude'] = ['geometry']
114
115        # Handle collections (seasons)
116        if collections is not None:
117            search_params['collections'] = [collections] if isinstance(collections, str) else collections
118
119        # Handle geometry filtering
120        if geometry is not None:
121            if hasattr(geometry, '__geo_interface__'):
122                geometry = geometry.__geo_interface__
123
124            # Fix geometries that cross the antimeridian
125            geometry = antimeridian.fix_geojson(geometry, reverse=True)
126
127            search_params['intersects'] = geometry
128
129        # Handle date range filtering
130        if date_range is not None:
131            search_params['datetime'] = date_range
132
133        # Handle max_items
134        if max_items is not None:
135            search_params['limit'] = max_items
136        else:
137            search_params['limit'] = 1000000
138
139        # Handle segment_paths filtering using CQL2
140        filter_conditions = []
141
142        if segment_paths is not None:
143            segment_paths = [segment_paths] if isinstance(segment_paths, str) else segment_paths
144
145            # Create OR conditions for segment paths
146            segment_conditions = []
147            for segment_path in segment_paths:
148                try:
149                    date_str, segment_num_str = segment_path.split('_')
150                    segment_num = int(segment_num_str)
151
152                    # Create AND condition for this specific segment
153                    segment_condition = {
154                        "op": "and",
155                        "args": [
156                            {
157                                "op": "=",
158                                "args": [{"property": "opr:date"}, date_str]
159                            },
160                            {
161                                "op": "=",
162                                "args": [{"property": "opr:segment"}, segment_num]
163                            }
164                        ]
165                    }
166                    segment_conditions.append(segment_condition)
167                except ValueError:
168                    print(f"Warning: Invalid segment_path format '{segment_path}'. Expected format: YYYYMMDD_NN")
169                    continue
170
171            if segment_conditions:
172                if len(segment_conditions) == 1:
173                    filter_conditions.append(segment_conditions[0])
174                else:
175                    # Multiple segments - combine with OR
176                    filter_conditions.append({
177                        "op": "or",
178                        "args": segment_conditions
179                    })
180
181        # Add any additional property filters
182        for key, value in properties.items():
183            filter_conditions.append({
184                "op": "=",
185                "args": [{"property": key}, value]
186            })
187
188        # Combine all filter conditions with AND
189        if filter_conditions:
190            if len(filter_conditions) == 1:
191                filter_expr = filter_conditions[0]
192            else:
193                filter_expr = {
194                    "op": "and",
195                    "args": filter_conditions
196                }
197
198            search_params['filter'] = filter_expr
199
200        # Add any extra kwargs to search
201        search_params.update(search_kwargs)
202
203        #print(search_params) # TODO: Remove
204
205        # Perform the search
206        # from rustac import DuckdbClient
207        client = DuckdbClient()
208        items = client.search(self.stac_parquet_href, **search_params)
209        if isinstance(items, dict):
210            items = items['features']
211
212        if not items or len(items) == 0:
213            warnings.warn("No items found matching the query criteria", UserWarning)
214            return None
215
216        # Convert to GeoDataFrame
217        items_df = gpd.GeoDataFrame(items)
218        # Set index
219        items_df = items_df.set_index(items_df['id'])
220        items_df.index.name = 'stac_item_id'
221        # Set the geometry column
222        if 'geometry' in items_df.columns and not exclude_geometry:
223            items_df = items_df.set_geometry(items_df['geometry'].apply(shapely.geometry.shape))
224            items_df.crs = "EPSG:4326"
225
226        # Reorder the columns, leaving any extra columns at the end
227        desired_order = ['collection', 'geometry', 'properties', 'assets']
228        items_df = items_df[[col for col in desired_order if col in items_df.columns] + list(items_df.columns.difference(desired_order))]
229
230        return items_df
231
232    def load_frames(self, stac_items: gpd.GeoDataFrame,
233                    data_product: str = "CSARP_standard",
234                    merge_flights: bool = False,
235                    skip_errors: bool = False,
236                    ) -> Union[list[xr.Dataset], xr.Dataset]:
237        """
238        Load multiple radar frames from a list of STAC items.
239
240        Parameters
241        ----------
242        stac_items : gpd.GeoDataFrame
243            The STAC items containing asset URLs.
244        data_product : str, optional
245            The data product to load (default is "CSARP_standard").
246        merge_flights : bool, optional
247            Whether to merge frames from the same flight (default is False).
248        skip_errors : bool, optional
249            Whether to skip errors and continue loading other frames (default is False).
250
251        Returns
252        -------
253        list[xr.Dataset] or xr.Dataset
254            List of loaded radar frames as xarray Datasets or a single merged Dataset if there is only one segment and merge_flights is True.
255        """
256        frames = []
257
258        for idx, item in stac_items.iterrows():
259            try:
260                frame = self.load_frame(item, data_product)
261                frames.append(frame)
262            except Exception as e:
263                print(f"Error loading frame for item {item.get('id', 'unknown')}: {e}")
264                if skip_errors:
265                    continue
266                else:
267                    raise e
268
269        if merge_flights:
270            return opr_tools.merge_frames(frames)
271        else:
272            return frames
273
274    def load_frame(self, stac_item, data_product: str = "CSARP_standard") -> xr.Dataset:
275        """
276        Load a radar frame from a STAC item.
277
278        Parameters
279        ----------
280        stac_item
281            The STAC item containing asset URLs.
282        data_product : str, optional
283            The data product to load (default is "CSARP_standard").
284
285        Returns
286        -------
287        xr.Dataset
288            The loaded radar frame as an xarray Dataset.
289        """
290
291        assets = stac_item['assets']
292
293        # Get the data asset
294        data_asset = assets.get(data_product)
295        if not data_asset:
296            available_assets = list(assets.keys())
297            raise ValueError(f"No {data_product} asset found. Available assets: {available_assets}")
298
299        # Get the URL from the asset
300        url = data_asset.get('href')
301        if not url:
302            raise ValueError(f"No href found in {data_product} asset")
303
304        # Load the frame using the existing method
305        return self.load_frame_url(url)
306
307    def load_frame_url(self, url: str) -> xr.Dataset:
308        """
309        Load a radar frame from a given URL.
310
311        Parameters
312        ----------
313        url : str
314            The URL of the radar frame data.
315
316        Returns
317        -------
318        xr.Dataset
319            The loaded radar frame as an xarray Dataset.
320        """
321
322        file = self._open_file(url)
323
324        filetype = None
325        try:
326            ds = self._load_frame_hdf5(file)
327            filetype = 'hdf5'
328        except OSError:
329            ds = self._load_frame_matlab(file)
330            filetype = 'matlab'
331
332        # Add the source URL as an attribute
333        ds.attrs['source_url'] = url
334
335        # Apply CF-compliant attributes
336        ds = apply_cf_compliant_attrs(ds)
337
338        # Get the season and segment from the URL
339        match = re.search(r'(\d{4}_\w+_[A-Za-z0-9]+)\/([\w_]+)\/[\d_]+\/[\w]+(\d{8}_\d{2}_\d{3})', url)
340        if match:
341            collection, data_product, granule = match.groups()
342            date, segment_id, frame_id = granule.split('_')
343            ds.attrs['collection'] = collection
344            ds.attrs['data_product'] = data_product
345            ds.attrs['granule'] = granule
346            ds.attrs['segment_path'] = f"{date}_{segment_id}"
347            ds.attrs['date_str'] = date
348            ds.attrs['segment'] = int(segment_id)
349            ds.attrs['frame'] = int(frame_id)
350
351            # Load citation information
352            result = ops_api.get_segment_metadata(segment_name=ds.attrs['segment_path'], season_name=collection)
353            if result:
354                if isinstance(result['data'], str):
355                    warnings.warn(f"Warning: Unexpected result from ops_api: {result['data']}", UserWarning)
356                else:
357                    result_data = {}
358                    for key, value in result['data'].items():
359                        if len(value) == 1:
360                            result_data[key] = value[0]
361                        elif len(value) > 1:
362                            result_data[key] = set(value)
363
364                    if 'dois' in result_data:
365                        ds.attrs['doi'] = result_data['dois']
366                    if 'rors' in result_data:
367                        ds.attrs['ror'] = result_data['rors']
368                    if 'funding_sources' in result_data:
369                        ds.attrs['funder_text'] = result_data['funding_sources']
370
371        # Add the rest of the Matlab parameters
372        if filetype == 'hdf5':
373            ds.attrs['mimetype'] = 'application/x-hdf5'
374            ds.attrs.update(decode_hdf5_matlab_variable(h5py.File(file, 'r'),
375                                                        skip_variables=True,
376                                                        skip_errors=True))
377        elif filetype == 'matlab':
378            ds.attrs['mimetype'] = 'application/x-matlab-data'
379            ds.attrs.update(extract_legacy_mat_attributes(file,
380                                                          skip_keys=ds.keys(),
381                                                          skip_errors=True))
382
383        return ds
384
385    def _load_frame_hdf5(self, file) -> xr.Dataset:
386        """
387        Load a radar frame from an HDF5 file.
388
389        Parameters
390        ----------
391        file :
392            The path to the HDF5 file containing radar frame data.
393
394        Returns
395        -------
396        xr.Dataset
397            The loaded radar frame as an xarray Dataset.
398        """
399
400        ds = xr.open_dataset(file, engine='h5netcdf', phony_dims='sort')
401
402        # Re-arrange variables to provide useful dimensions and coordinates
403
404        ds = ds.squeeze() # Drop the singleton dimensions matlab adds
405
406        ds = ds.rename({ # Label the dimensions with more useful names
407            ds.Data.dims[0]: 'slow_time_idx',
408            ds.Data.dims[1]: 'twtt_idx',
409        })
410
411        # Make variables with no dimensions into scalar attributes
412        for var in ds.data_vars:
413            if ds[var].ndim == 0:
414                ds.attrs[var] = ds[var].item()
415                ds = ds.drop_vars(var)
416
417        # Make the file_type an attribute
418        if 'file_type' in ds.data_vars:
419            ds.attrs['file_type'] = ds['file_type'].to_numpy()
420            ds = ds.drop_vars('file_type')
421
422        # Name the two time coordinates
423        ds = ds.rename({'Time': 'twtt', 'GPS_time': 'slow_time'})
424        ds = ds.set_coords(['slow_time', 'twtt'])
425
426        slow_time_1d = pd.to_datetime(ds['slow_time'].values, unit='s')
427        ds = ds.assign_coords(slow_time=('slow_time_idx', slow_time_1d))
428
429        # Make twtt and slow_time the indexing coordinates
430        ds = ds.swap_dims({'twtt_idx': 'twtt'})
431        ds = ds.swap_dims({'slow_time_idx': 'slow_time'})
432
433        return ds
434
435    def _load_frame_matlab(self, file) -> xr.Dataset:
436        """
437        Load a radar frame from a MATLAB file.
438
439        Parameters
440        ----------
441        file :
442            The path to the MATLAB file containing radar frame data.
443
444        Returns
445        -------
446        xr.Dataset
447            The loaded radar frame as an xarray Dataset.
448        """
449
450        m = scipy.io.loadmat(file, mat_dtype=False)
451
452        key_dims = {
453            'Time': ('twtt',),
454            'GPS_time': ('slow_time',),
455            'Latitude': ('slow_time',),
456            'Longitude': ('slow_time',),
457            'Elevation': ('slow_time',),
458            'Roll': ('slow_time',),
459            'Pitch': ('slow_time',),
460            'Heading': ('slow_time',),
461            'Surface': ('slow_time',),
462            'Data': ('twtt', 'slow_time')
463        }
464
465        ds = xr.Dataset(
466            {
467                key: (dims, np.squeeze(m[key])) for key, dims in key_dims.items() if key in m
468            },
469            coords={
470                'twtt': ('twtt', np.squeeze(m['Time'])),
471                'slow_time': ('slow_time', pd.to_datetime(np.squeeze(m['GPS_time']), unit='s')),
472            }
473        )
474
475        return ds
476
477    def get_collections(self) -> list:
478        """
479        Get list of available STAC collections.
480
481        Returns
482        -------
483        list
484            List of collection dictionaries with metadata.
485        """
486
487        client = DuckdbClient()
488        return client.get_collections(self.stac_parquet_href)
489
490    def get_segments(self, collection_id: str) -> list:
491        """
492        Get list of available segments within a collection/season.
493
494        Parameters
495        ----------
496        collection_id : str
497            The ID of the STAC collection to query.
498
499        Returns
500        -------
501        list
502            List of flight dictionaries with flight metadata.
503        """
504        # Query STAC API for all items in collection (exclude geometry for better performance)
505        items = self.query_frames(collections=[collection_id], exclude_geometry=True)
506
507        if items is None or len(items) == 0:
508            print(f"No items found in collection '{collection_id}'")
509            return []
510
511        # Group items by segment (opr:date + opr:segment)
512        segments = {}
513        for idx, item in items.iterrows():
514            properties = item['properties']
515            date = properties['opr:date']
516            flight_num = properties['opr:segment']
517
518            if date and flight_num is not None:
519                segment_path = f"{date}_{flight_num:02d}"
520
521                if segment_path not in segments:
522                    segments[segment_path] = {
523                        'segment_path': segment_path,
524                        'date': date,
525                        'flight_number': flight_num,
526                        'collection': collection_id,
527                        'frames': [],
528                        'item_count': 0
529                    }
530
531                segments[segment_path]['frames'].append(properties.get('opr:frame'))
532                segments[segment_path]['item_count'] += 1
533
534        # Sort segments by date and flight number
535        segment_list = list(segments.values())
536        segment_list.sort(key=lambda x: (x['date'], x['flight_number']))
537
538        return segment_list
539
540    def get_layers_files(self, segment: Union[xr.Dataset, dict], raise_errors=True) -> dict:
541        """
542        Fetch layers from the CSARP_layer files.
543
544        See https://gitlab.com/openpolarradar/opr/-/wikis/Layer-File-Guide for file formats
545
546        Parameters
547        ----------
548        segment : Union[xr.Dataset, dict]
549            The flight information, which can be an xarray Dataset or a dictionary representing the STAC item.
550        raise_errors : bool, optional
551            If True, raise errors when layers cannot be found.
552
553        Returns
554        -------
555        dict
556            A dictionary mapping layer display names ("group:name") to their corresponding data.
557        """
558        collection, segment_path, frame = self._extract_segment_info(segment)
559
560        # If we already have a STAC item, just use it. Otherwise, query to find the matching STAC items
561        if isinstance(segment, xr.Dataset):
562            properties = {}
563            if frame:
564                properties['opr:frame'] = frame
565
566            # Query the STAC collection for items from this specific segment;
567            # results are filtered below for CSARP_layer assets
568
569            stac_items = self.query_frames(collections=[collection], segment_paths=[segment_path], properties=properties)
570
571            # Filter for items that have CSARP_layer assets
572            layer_items = []
573            for idx, item in stac_items.iterrows():
574                if 'CSARP_layer' in item['assets']:
575                    layer_items.append(item)
576        else:
577            layer_items = [segment] if 'CSARP_layer' in segment.get('assets', {}) else []
578
579        if not layer_items:
580            if raise_errors:
581                raise ValueError(f"No CSARP_layer files found for segment path {segment_path} in collection {collection}")
582            else:
583                return {}
584
585        # Load each layer file and combine them
586        layer_frames = []
587        for item in layer_items:
588            layer_asset = item['assets']['CSARP_layer']
589            if layer_asset and 'href' in layer_asset:
590                url = layer_asset['href']
591                try:
592                    layer_ds = self.load_layers_file(url)
593                    layer_frames.append(layer_ds)
594                except Exception as e:
595                    if raise_errors:
596                        raise
597                    print(f"Warning: Failed to load layer file {url}: {e}")
598
599        if not layer_frames:
600            if raise_errors:
601                raise ValueError(f"No valid CSARP_layer files could be loaded for segment {segment_path} in collection {collection}")
602            else:
603                return {}
604
605        # Concatenate all layer frames along slow_time dimension
606        layers_segment = xr.concat(layer_frames, dim='slow_time', combine_attrs='drop_conflicts', data_vars='minimal')
607        layers_segment = layers_segment.sortby('slow_time')
608
609        # Trim to bounds of the original dataset
610        layers_segment = self._trim_to_bounds(layers_segment, segment)
611
612        # Find the layer organization file to map layer IDs to names
613        # Layer organization files are one per directory. For example, the layer file:
614        # https://data.cresis.ku.edu/data/rds/2017_Antarctica_BaslerJKB/CSARP_layer/20180105_03/Data_20180105_03_003.mat
615        # would have a layer organization file at:
616        # https://data.cresis.ku.edu/data/rds/2017_Antarctica_BaslerJKB/CSARP_layer/20180105_03/layer_20180105_03.mat
617
618        layer_organization_file_url = re.sub(r'Data_(\d{8}_\d{2})_\d{3}\.mat', r'layer_\1.mat', url)
619        layer_organization_ds = self._load_layer_organization(self._open_file(layer_organization_file_url))
620
621        # Split into separate layers by ID
622        layers = {}
623
624        layer_ids = np.unique(layers_segment['layer'])
625
626        for layer_id in layer_ids:
627            layer_id_int = int(layer_id)
628            layer_name = str(layer_organization_ds['lyr_name'].sel(lyr_id=layer_id_int).values.item().squeeze())
629            layer_group = str(layer_organization_ds['lyr_group_name'].sel(lyr_id=layer_id_int).values.item().squeeze())
630            if layer_group == '[]':
631                layer_group = ''
632            layer_display_name = f"{layer_group}:{layer_name}"
633
634
635            layer_ds = layers_segment.sel(layer=layer_id)
636            layers[layer_display_name] = layer_ds
637
638        return layers
639
640    def _trim_to_bounds(self, ds: xr.Dataset, ref: Union[xr.Dataset, dict]) -> xr.Dataset:
641        start_time, end_time = None, None
642        if isinstance(ref, xr.Dataset) and 'slow_time' in ref.coords:
643            start_time = ref['slow_time'].min()
644            end_time = ref['slow_time'].max()
645        else:
646            properties = ref.get('properties', {})
647            if 'start_datetime' in properties and 'end_datetime' in properties:
648                start_time = pd.to_datetime(properties['start_datetime'])
649                end_time = pd.to_datetime(properties['end_datetime'])
650
651        if start_time is not None:
652            return ds.sel(slow_time=slice(start_time, end_time))
653        else:
654            return ds
655
656    def load_layers_file(self, url: str) -> xr.Dataset:
657        """
658        Load layer data from a CSARP_layer file (either HDF5 or MATLAB format).
659
660        Parameters
661        ----------
662        url : str
663            URL or path to the layer file
664
665        Returns
666        -------
667        xr.Dataset
668            Layer data in a standardized format with coordinates:
669            - slow_time: GPS time converted to datetime
670            And data variables:
671            - twtt: Two-way travel time for each layer
672            - quality: Quality values for each layer
673            - type: Type values for each layer
674            - lat, lon, elev: Geographic coordinates
675            - id: Layer IDs
676        """
677
678        file = self._open_file(url)
679
680        ds = self._load_layers(file)
681
682        # Add the source URL as an attribute
683        ds.attrs['source_url'] = url
684
685        # Apply common manipulations to match the expected structure
686        # Convert GPS time to datetime coordinate
687        if 'gps_time' in ds.variables:
688            slow_time_dt = pd.to_datetime(ds['gps_time'].values, unit='s')
689            ds = ds.assign_coords(slow_time=('slow_time', slow_time_dt))
690
691            # Set slow_time as the main coordinate and remove gps_time from data_vars
692            if 'slow_time' not in ds.dims:
693                ds = ds.swap_dims({'gps_time': 'slow_time'})
694
695            # Remove gps_time from data_vars if it exists there to avoid conflicts
696            if ('gps_time' in ds.data_vars) or ('gps_time' in ds.coords):
697                ds = ds.drop_vars('gps_time')
698
699        # Sort by slow_time if it exists
700        if 'slow_time' in ds.coords:
701            ds = ds.sortby('slow_time')
702
703        return ds
704
705    def _load_layers(self, file) -> xr.Dataset:
706        """
707        Load a layer data file into an xarray Dataset using hdf5storage.
708
709        Parameters
710        ----------
711        file : str
712            Path to the layer file.
713        """
714
715        d = hdf5storage.loadmat(file, appendmat=False)
716
717        # Ensure 'id' is at least 1D for the layer dimension
718        id_data = np.atleast_1d(d['id'].squeeze())
719
720        # For 2D arrays with (layer, slow_time) dimensions, ensure they remain 2D
721        # even when there's only one layer
722        quality_data = np.atleast_2d(d['quality'].squeeze())
723        if quality_data.shape[0] == 1 or quality_data.ndim == 1:
724            quality_data = quality_data.reshape(len(id_data), -1)
725
726        twtt_data = np.atleast_2d(d['twtt'].squeeze())
727        if twtt_data.shape[0] == 1 or twtt_data.ndim == 1:
728            twtt_data = twtt_data.reshape(len(id_data), -1)
729
730        type_data = np.atleast_2d(d['type'].squeeze())
731        if type_data.shape[0] == 1 or type_data.ndim == 1:
732            type_data = type_data.reshape(len(id_data), -1)
733
734        ds = xr.Dataset({
735            'file_type': ((), d['file_type'].squeeze()),
736            'file_version': ((), d['file_version'].squeeze()),
737            'elev': (('slow_time',), d['elev'].squeeze()),
738            #'gps_source': ((), d['gps_source'].squeeze()),
739            'gps_time': (('slow_time',), d['gps_time'].squeeze()),
740            'id': (('layer',), id_data),
741            'lat': (('slow_time',), d['lat'].squeeze()),
742            'lon': (('slow_time',), d['lon'].squeeze()),
743            'quality': (('layer', 'slow_time'), quality_data),
744            'twtt': (('layer', 'slow_time'), twtt_data),
745            'type': (('layer', 'slow_time'), type_data),
746        })
747
748        ds = ds.assign_coords({'layer': ds['id'], 'slow_time': ds['gps_time']})
749
750        return ds
751
752    def _load_layer_organization(self, file) -> xr.Dataset:
753        """
754        Load a layer organization file
755
756        Parameters
757        ----------
758        file : str
759            Path to the HDF5 layer organization file
760
761        Returns
762        -------
763        xr.Dataset
764            Raw layer data from HDF5 file
765        """
766        d = hdf5storage.loadmat(file, appendmat=False)
767        ds = xr.Dataset({
768            'file_type': ((), d['file_type'].squeeze()),
769            'file_version': ((), d['file_version'].squeeze()),
770            'lyr_age': (('lyr_id',), np.atleast_1d(d['lyr_age'].squeeze())),
771            'lyr_age_source': (('lyr_id',), np.atleast_1d(d['lyr_age_source'].squeeze())),
772            'lyr_desc': (('lyr_id',), np.atleast_1d(d['lyr_desc'].squeeze())),
773            'lyr_group_name': (('lyr_id',), np.atleast_1d(d['lyr_group_name'].squeeze())),
774            'lyr_id': (('lyr_id',), np.atleast_1d(d['lyr_id'].squeeze())),
775            'lyr_name': (('lyr_id',), np.atleast_1d(d['lyr_name'].squeeze())),
776            'lyr_order': (('lyr_id',), np.atleast_1d(d['lyr_order'].squeeze())),
777            }, attrs={'param': d['param']})
778
779        ds = ds.set_index(lyr_id='lyr_id')
780
781        return ds
782
783    def get_layers_db(self, flight: Union[xr.Dataset, dict], include_geometry=True, raise_errors=True) -> dict:
784        """
785        Fetch layer data from the OPS API
786
787        Parameters
788        ----------
789        flight : Union[xr.Dataset, dict]
790            The flight data, which can be an xarray Dataset or a dictionary.
791        include_geometry : bool, optional
792            If True, include geometry information in the returned layers.
793
794        Returns
795        -------
796        dict
797            A dictionary mapping layer display names ("group:name") to their corresponding data.
798        """
799
800        collection, segment_path, _ = self._extract_segment_info(flight)
801
802        if 'Antarctica' in collection:
803            location = 'antarctic'
804        elif 'Greenland' in collection:
805            location = 'arctic'
806        else:
807            raise ValueError("Dataset does not belong to a recognized location (Antarctica or Greenland).")
808
809        layer_points = ops_api.get_layer_points(
810            segment_name=segment_path,
811            season_name=collection,
812            location=location,
813            include_geometry=include_geometry
814        )
815
816        if layer_points['status'] != 1:
817            if raise_errors:
818                raise ValueError(f"Failed to fetch layer points. Received response with status {layer_points['status']}.")
819            else:
820                return {}
821
822        layer_ds_raw = xr.Dataset(
823            {k: (['gps_time'], v) for k, v in layer_points['data'].items() if k != 'gps_time'},
824            coords={'gps_time': layer_points['data']['gps_time']}
825        )
826        # Split into a dictionary of layers based on lyr_id
827        layer_ids = set(layer_ds_raw['lyr_id'].to_numpy())
828        layer_ids = [int(layer_id) for layer_id in layer_ids if not np.isnan(layer_id)]
829
830        # Get the mapping of layer IDs to names
831        if self.db_layers_metadata is None:
832            layer_metadata = ops_api.get_layer_metadata()
833            if layer_metadata['status'] == 1:
834                df = pd.DataFrame(layer_metadata['data'])
835                df['display_name'] = df['lyr_group_name'] + ":" + df['lyr_name']
836                self.db_layers_metadata = df
837            else:
838                raise ValueError("Failed to fetch layer metadata from OPS API.")
839
840        layers = {}
841        for layer_id in layer_ids:
842            layer_name = self.db_layers_metadata.loc[self.db_layers_metadata['lyr_id'] == layer_id, 'display_name'].values[0]
843
844            layer_ds = layer_ds_raw.where(layer_ds_raw['lyr_id'] == layer_id, drop=True)
845
846            layer_ds = layer_ds.sortby('gps_time')
847
848            layer_ds = layer_ds.rename({'gps_time': 'slow_time'})
849            layer_ds = layer_ds.set_coords(['slow_time'])
850
851            layer_ds['slow_time'] = pd.to_datetime(layer_ds['slow_time'].values, unit='s')
852
853            # Filter to the same time range as flight
854            layer_ds = self._trim_to_bounds(layer_ds, flight)
855
856            layers[layer_name] = layer_ds
857
858        return layers
859
860    def get_layers(self, ds : xr.Dataset, source: str = 'auto', include_geometry=True, raise_errors=True) -> dict:
861        """
862        Fetch layer data for a given flight dataset, either from CSARP_layer files or the OPS Database API.
863
864        Parameters
865        ----------
866        ds : xr.Dataset
867            The flight dataset containing attributes for collection and segment.
868        source : str, optional
869            The source to fetch layers from: 'auto', 'files', or 'db' (default is 'auto').
870        include_geometry : bool, optional
871            If True, include geometry information when fetching from the API.
872        raise_errors : bool, optional
873            If True, raise errors when layers cannot be found.
874
875        Returns
876        -------
877        dict
878            A dictionary mapping layer display names ("group:name") to their corresponding data.
879        """
880
881        if source == 'auto':
882            # Try to get layers from files first
883            try:
884                layers = self.get_layers_files(ds, raise_errors=True)
885                return layers
886            except Exception:
887                # Fallback to API if no layers found in files
888                return self.get_layers_db(ds, include_geometry=include_geometry, raise_errors=raise_errors)
889        elif source == 'files':
890            return self.get_layers_files(ds, raise_errors=raise_errors)
891        elif source == 'db':
892            return self.get_layers_db(ds, include_geometry=include_geometry, raise_errors=raise_errors)
893        else:
894            raise ValueError("Invalid source specified. Must be one of: 'auto', 'files', 'db'.")
OPRConnection( collection_url: str = 'https://data.cresis.ku.edu/data/', cache_dir: str = None, stac_parquet_href: str = 'gs://opr_stac/catalog/**/*.parquet')

Initialize the OPRConnection with a collection URL and optional cache directory.

Parameters
  • collection_url (str): The base URL for the OPR data collection.
  • cache_dir (str, optional): Directory to cache downloaded data.
  • stac_parquet_href (str, optional): Href or glob pattern of the STAC GeoParquet catalog queried by this connection.
collection_url
cache_dir
stac_parquet_href
fsspec_cache_kwargs
fsspec_url_prefix
db_layers_metadata
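
A minimal usage sketch (the cache path is illustrative, and this assumes the package is importable as xopr):

    from xopr.opr_access import OPRConnection

    # Connect with a local file cache so repeated reads hit disk instead of the network
    opr = OPRConnection(cache_dir="/tmp/opr_cache")

    # List the available STAC collections (seasons)
    collections = opr.get_collections()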
def query_frames( self, collections: list[str] = None, segment_paths: list[str] = None, geometry=None, date_range: tuple = None, properties: dict = {}, max_items: int = None, exclude_geometry: bool = False, search_kwargs: dict = {}) -> geopandas.geodataframe.GeoDataFrame:

Query for radar frames based on various search criteria. Each parameter is treated as an independent criterion. If multiple parameters are passed, they are combined with AND logic.

A list of values may be passed to most parameters; if so, any value in the list is treated as a match.

Parameters
  • collections (list[str], optional): List of collection names to filter by (e.g., "2022_Antarctica_BaslerMKB").
  • segment_paths (list[str], optional): List of segment paths to filter by (e.g., "20230126_01").
  • geometry (optional): Geospatial geometry to filter by (e.g., a shapely geometry object).
  • date_range (tuple, optional): Date range to filter by (e.g., (start_date, end_date)).
  • properties (dict, optional): Additional STAC item properties to filter by; each entry becomes an exact-match condition.
  • max_items (int, optional): Maximum number of items to return.
  • exclude_geometry (bool, optional): If True, exclude the geometry field from the response to reduce size.
  • search_kwargs (dict, optional): Additional keyword arguments to pass to the search method.
Returns
  • gpd.GeoDataFrame: GeoDataFrame containing the STAC frames matching the query criteria.
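
For example, a sketch reusing opr from above (the collection and segment values are taken from the parameter examples; the bounding box is illustrative):

    from shapely.geometry import box

    # All frames from one segment of a season
    frames = opr.query_frames(collections=["2022_Antarctica_BaslerMKB"],
                              segment_paths=["20230126_01"])

    # Spatial filtering accepts any object with __geo_interface__ (lon/lat, EPSG:4326)
    roi = box(-120, -76, -110, -74)
    nearby = opr.query_frames(geometry=roi, max_items=100)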
def load_frames( self, stac_items: geopandas.geodataframe.GeoDataFrame, data_product: str = 'CSARP_standard', merge_flights: bool = False, skip_errors: bool = False) -> Union[list[xarray.core.dataset.Dataset], xarray.core.dataset.Dataset]:

Load multiple radar frames from a list of STAC items.

Parameters
  • stac_items (gpd.GeoDataFrame): The STAC items containing asset URLs.
  • data_product (str, optional): The data product to load (default is "CSARP_standard").
  • merge_flights (bool, optional): Whether to merge frames from the same flight (default is False).
  • skip_errors (bool, optional): Whether to skip errors and continue loading other frames (default is False).
Returns
  • list[xr.Dataset] or xr.Dataset: List of loaded radar frames as xarray Datasets or a single merged Dataset if there is only one segment and merge_flights is True.
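
A hedged sketch, reusing frames from the query above:

    # Load every queried frame; skip failures and merge frames per flight segment
    datasets = opr.load_frames(frames, data_product="CSARP_standard",
                               merge_flights=True, skip_errors=True)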
def load_frame( self, stac_item, data_product: str = 'CSARP_standard') -> xarray.core.dataset.Dataset:

Load a radar frame from a STAC item.

Parameters
  • stac_item: The STAC item containing asset URLs.
  • data_product (str, optional): The data product to load (default is "CSARP_standard").
Returns
  • xr.Dataset: The loaded radar frame as an xarray Dataset.
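
For a single item (one row of the GeoDataFrame returned by query_frames):

    item = frames.iloc[0]
    ds = opr.load_frame(item, data_product="CSARP_standard")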
def load_frame_url(self, url: str) -> xarray.core.dataset.Dataset:

Load a radar frame from a given URL.

Parameters
  • url (str): The URL of the radar frame data.
Returns
  • xr.Dataset: The loaded radar frame as an xarray Dataset.
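
A sketch with a direct URL; the path below follows the pattern shown in the source comments for 2017_Antarctica_BaslerJKB but is not a verified link:

    url = ("https://data.cresis.ku.edu/data/rds/2017_Antarctica_BaslerJKB/"
           "CSARP_standard/20180105_03/Data_20180105_03_003.mat")
    ds = opr.load_frame_url(url)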
def get_collections(self) -> list:

Get list of available STAC collections.

Returns
  • list: List of collection dictionaries with metadata.
def get_segments(self, collection_id: str) -> list:

Get list of available segments within a collection/season.

Parameters
  • collection_id (str): The ID of the STAC collection to query.
Returns
  • list: List of segment dictionaries with segment metadata.
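
A quick sketch of inspecting a season's segments (collection name as in the examples above):

    segments = opr.get_segments("2022_Antarctica_BaslerMKB")
    for seg in segments[:3]:
        print(seg['segment_path'], seg['item_count'], seg['frames'])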
def get_layers_files( self, segment: Union[xarray.core.dataset.Dataset, dict], raise_errors=True) -> dict:

Fetch layers from the CSARP_layer files.

See https://gitlab.com/openpolarradar/opr/-/wikis/Layer-File-Guide for file formats

Parameters
  • segment (Union[xr.Dataset, dict]): The flight information, which can be an xarray Dataset or a dictionary representing the STAC item.
  • raise_errors (bool, optional): If True, raise errors when layers cannot be found.
Returns
  • dict: A dictionary mapping layer display names ("group:name") to their corresponding data.
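
For example, with ds loaded via load_frame above:

    layers = opr.get_layers_files(ds)
    for name, layer in layers.items():   # keys are "group:name" display names
        print(name, dict(layer.sizes))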
def load_layers_file(self, url: str) -> xarray.core.dataset.Dataset:

Load layer data from a CSARP_layer file (either HDF5 or MATLAB format).

Parameters
  • url (str): URL or path to the layer file
Returns
  • xr.Dataset: Layer data in a standardized format.
    Coordinates:
    • slow_time: GPS time converted to datetime
    Data variables:
    • twtt: Two-way travel time for each layer
    • quality: Quality values for each layer
    • type: Type values for each layer
    • lat, lon, elev: Geographic coordinates
    • id: Layer IDs
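
A sketch, where layer_url stands for the href of a CSARP_layer asset (hypothetical variable):

    layer_ds = opr.load_layers_file(layer_url)
    twtt = layer_ds['twtt']              # dims: ('layer', 'slow_time')
    first_layer = twtt.isel(layer=0)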
def get_layers_db( self, flight: Union[xarray.core.dataset.Dataset, dict], include_geometry=True, raise_errors=True) -> dict:

Fetch layer data from the OPS API

Parameters
  • flight (Union[xr.Dataset, dict]): The flight data, which can be an xarray Dataset or a dictionary.
  • include_geometry (bool, optional): If True, include geometry information in the returned layers.
  • raise_errors (bool, optional): If True, raise an error when layer points cannot be fetched.
Returns
  • dict: A dictionary mapping layer display names ("group:name") to their corresponding data.
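
A sketch, again with ds from load_frame above; skipping geometry keeps the response small:

    layers = opr.get_layers_db(ds, include_geometry=False)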
def get_layers( self, ds: xarray.core.dataset.Dataset, source: str = 'auto', include_geometry=True, raise_errors=True) -> dict:

Fetch layer data for a given flight dataset, either from CSARP_layer files or the OPS Database API.

Parameters
  • ds (xr.Dataset): The flight dataset containing attributes for collection and segment.
  • source (str, optional): The source to fetch layers from: 'auto', 'files', or 'db' (default is 'auto').
  • include_geometry (bool, optional): If True, include geometry information when fetching from the API.
  • raise_errors (bool, optional): If True, raise errors when layers cannot be found.
Returns
  • dict: A dictionary mapping layer display names ("group:name") to their corresponding data.
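
A closing sketch of the default workflow (the layer key is illustrative; inspect list(layers) for the actual names):

    layers = opr.get_layers(ds)              # tries CSARP_layer files, falls back to the OPS database
    surface = layers.get('standard:surface')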