xopr.stac
STAC catalog creation utilities for Open Polar Radar data.
This module provides tools for generating STAC (SpatioTemporal Asset Catalog) metadata for OPR datasets, enabling spatial and temporal search capabilities across radar campaigns and data products.
1""" 2STAC catalog creation utilities for Open Polar Radar data. 3 4This module provides tools for generating STAC (SpatioTemporal Asset Catalog) 5metadata for OPR datasets, enabling spatial and temporal search capabilities 6across radar campaigns and data products. 7""" 8 9from .catalog import ( 10 build_catalog_from_parquet_metadata, 11 create_collection, 12 create_item, 13 create_items_from_flight_data, 14 export_collection_to_parquet, 15) 16from .config import load_config, save_config, validate_config 17from .geometry import ( 18 build_collection_extent_and_geometry, 19 simplify_geometry_polar_projection, 20) 21from .metadata import ( 22 collect_uniform_metadata, 23 discover_campaigns, 24 discover_flight_lines, 25 extract_item_metadata, 26) 27 28__all__ = [ 29 # Configuration 30 "load_config", 31 "save_config", 32 "validate_config", 33 # Catalog functions 34 "create_collection", 35 "create_item", 36 "create_items_from_flight_data", 37 "export_collection_to_parquet", 38 "build_catalog_from_parquet_metadata", 39 # Metadata functions 40 "extract_item_metadata", 41 "discover_campaigns", 42 "discover_flight_lines", 43 "collect_uniform_metadata", 44 # Geometry functions 45 "build_collection_extent_and_geometry", 46 "simplify_geometry_polar_projection" 47]
def load_config(
    config_path: Union[str, Path],
    overrides: Optional[List[str]] = None,
    environment: Optional[str] = None
) -> DictConfig:
    """
    Load configuration from YAML file with optional overrides.

    Parameters
    ----------
    config_path : Union[str, Path]
        Path to YAML configuration file
    overrides : List[str], optional
        Command-line overrides in dot notation
        Example: ["data.primary_product=CSARP_qlook", "processing.n_workers=8"]
    environment : str, optional
        Environment name to apply (e.g., "production", "test", "development")

    Returns
    -------
    DictConfig
        Configuration object with dot-notation access

    Examples
    --------
    >>> conf = load_config("config/catalog.yaml")
    >>> print(conf.data.primary_product)
    'CSARP_standard'

    >>> conf = load_config(
    ...     "config/catalog.yaml",
    ...     overrides=["processing.n_workers=16"],
    ...     environment="production"
    ... )
    """
    path = Path(config_path)
    if not path.exists():
        raise FileNotFoundError(f"Configuration file not found: {path}")

    # Base configuration straight from the YAML file.
    config = OmegaConf.load(path)

    # Layer on the environment-specific section, if one was requested.
    if environment and "environments" in config:
        if environment not in config.environments:
            logging.warning(f"Environment '{environment}' not found in config")
        else:
            logging.info(f"Applying environment: {environment}")
            config = OmegaConf.merge(config, config.environments[environment])

    # Dot-list overrides win over both the base file and the environment.
    if overrides:
        logging.debug(f"Applying overrides: {overrides}")
        config = OmegaConf.merge(config, OmegaConf.from_dotlist(overrides))

    # Materialize all ${...} interpolations before handing the config out.
    OmegaConf.resolve(config)

    # The environments section has served its purpose at load time; drop it.
    if "environments" in config:
        del config["environments"]

    return config
Load configuration from YAML file with optional overrides.
Parameters
- config_path (Union[str, Path]): Path to YAML configuration file
- overrides (List[str], optional): Command-line overrides in dot notation Example: ["data.primary_product=CSARP_qlook", "processing.n_workers=8"]
- environment (str, optional): Environment name to apply (e.g., "production", "test", "development")
Returns
- DictConfig: Configuration object with dot-notation access
Examples
>>> conf = load_config("config/catalog.yaml")
>>> print(conf.data.primary_product)
'CSARP_standard'
>>> conf = load_config(
... "config/catalog.yaml",
... overrides=["processing.n_workers=16"],
... environment="production"
... )
def save_config(conf: DictConfig, output_path: Union[str, Path], add_metadata: bool = True):
    """
    Save configuration to file for reproducibility.

    Parameters
    ----------
    conf : DictConfig
        Configuration to save
    output_path : Union[str, Path]
        Where to save the configuration
    add_metadata : bool
        Whether to add generation metadata
    """
    destination = Path(output_path)
    destination.parent.mkdir(parents=True, exist_ok=True)

    if not add_metadata:
        payload = conf
    else:
        # Record when and where this config was generated, prepended under a
        # reserved "_metadata" key.
        from datetime import datetime
        payload = OmegaConf.create({
            "_metadata": {
                "generated_at": datetime.now().isoformat(),
                "working_directory": os.getcwd(),
            },
            **OmegaConf.to_container(conf),
        })

    OmegaConf.save(payload, destination)
    logging.info(f"Configuration saved to: {destination}")
Save configuration to file for reproducibility.
Parameters
- conf (DictConfig): Configuration to save
- output_path (Union[str, Path]): Where to save the configuration
- add_metadata (bool): Whether to add generation metadata
def validate_config(conf: DictConfig) -> bool:
    """
    Basic validation of required configuration fields.

    Parameters
    ----------
    conf : DictConfig
        Configuration to validate

    Returns
    -------
    bool
        True if valid, raises ValueError if not

    Raises
    ------
    ValueError
        If a required field is missing, n_workers is not positive, or the
        data root path does not exist.
    """
    required_fields = [
        "data.root",
        "data.primary_product",
        "output.path",
        "output.catalog_id",
        "output.catalog_description",
        # Required so the n_workers check below cannot blow up with an
        # AttributeError when the processing section is absent.
        "processing.n_workers",
    ]

    for field in required_fields:
        if OmegaConf.select(conf, field) is None:
            raise ValueError(f"Required configuration field missing: {field}")

    # Validate n_workers is positive
    if conf.processing.n_workers <= 0:
        raise ValueError(f"Invalid n_workers: {conf.processing.n_workers}. Must be positive")

    # Validate paths exist
    data_root = Path(conf.data.root)
    if not data_root.exists():
        raise ValueError(f"Data root does not exist: {data_root}")

    return True
Basic validation of required configuration fields.
Parameters
- conf (DictConfig): Configuration to validate
Returns
- bool: True if valid, raises ValueError if not
def create_collection(
    collection_id: str,
    description: str,
    extent: pystac.Extent,
    license: str = "various",
    stac_extensions: Optional[List[str]] = None
) -> pystac.Collection:
    """
    Create a STAC collection for a campaign or data product grouping.

    Parameters
    ----------
    collection_id : str
        Unique identifier for the collection.
    description : str
        Human-readable description of the collection.
    extent : pystac.Extent
        Spatial and temporal extent of the collection.
    license : str, default "various"
        Data license identifier.
    stac_extensions : list of str, optional
        List of STAC extension URLs to enable. If None, defaults to
        empty list.

    Returns
    -------
    pystac.Collection
        Collection object.

    Examples
    --------
    >>> from datetime import datetime
    >>> import pystac
    >>> extent = pystac.Extent(
    ...     spatial=pystac.SpatialExtent([[-180, -90, 180, 90]]),
    ...     temporal=pystac.TemporalExtent([[datetime(2016, 1, 1), datetime(2016, 12, 31)]])
    ... )
    >>> collection = create_collection("2016_campaign", "2016 Antarctic flights", extent)
    >>> item = create_item("item_001", geometry, bbox, datetime.now())
    >>> collection.add_item(item)
    """
    # None sentinel avoids a shared mutable default argument.
    if stac_extensions is None:
        stac_extensions = []

    collection = pystac.Collection(
        id=collection_id,
        description=description,
        extent=extent,
        license=license,
        stac_extensions=stac_extensions
    )

    return collection
Create a STAC collection for a campaign or data product grouping.
Parameters
- collection_id (str): Unique identifier for the collection.
- description (str): Human-readable description of the collection.
- extent (pystac.Extent): Spatial and temporal extent of the collection.
- license (str, default "various"): Data license identifier.
- stac_extensions (list of str, optional): List of STAC extension URLs to enable. If None, defaults to empty list.
Returns
- pystac.Collection: Collection object.
Examples
>>> from datetime import datetime
>>> import pystac
>>> extent = pystac.Extent(
... spatial=pystac.SpatialExtent([[-180, -90, 180, 90]]),
... temporal=pystac.TemporalExtent([[datetime(2016, 1, 1), datetime(2016, 12, 31)]])
... )
>>> collection = create_collection("2016_campaign", "2016 Antarctic flights", extent)
>>> item = create_item("item_001", geometry, bbox, datetime.now())
>>> collection.add_item(item)
def create_item(
    item_id: str,
    geometry: Dict[str, Any],
    bbox: List[float],
    datetime: Any,
    properties: Optional[Dict[str, Any]] = None,
    assets: Optional[Dict[str, pystac.Asset]] = None,
    stac_extensions: Optional[List[str]] = None
) -> pystac.Item:
    """
    Create a STAC item for a flight line data segment.

    Parameters
    ----------
    item_id : str
        Unique identifier for the item.
    geometry : dict
        GeoJSON geometry object.
    bbox : list of float
        Bounding box coordinates [xmin, ymin, xmax, ymax].
    datetime : datetime
        Acquisition datetime.
    properties : dict, optional
        Additional metadata properties. If None, an empty dict is used.
    assets : dict of str to pystac.Asset, optional
        Assets (data files, thumbnails, etc.) keyed by asset name.
    stac_extensions : list of str, optional
        STAC extension URLs to enable. If None, defaults to a list
        containing only the STAC file extension schema URL.

    Returns
    -------
    pystac.Item
        Item object with specified properties and assets.

    Examples
    --------
    >>> from datetime import datetime
    >>> import pystac
    >>> geometry = {"type": "Point", "coordinates": [-71.0, 42.0]}
    >>> bbox = [-71.1, 41.9, -70.9, 42.1]
    >>> props = {"instrument": "radar", "platform": "aircraft"}
    >>> assets = {
    ...     "data": pystac.Asset(href="https://example.com/data.mat", media_type="application/octet-stream")
    ... }
    >>> item = create_item("flight_001", geometry, bbox, datetime.now(), props, assets)
    """
    # Resolve optional arguments up front; None sentinels avoid shared
    # mutable defaults.
    resolved_properties = {} if properties is None else properties
    resolved_extensions = (
        ['https://stac-extensions.github.io/file/v2.1.0/schema.json']
        if stac_extensions is None
        else stac_extensions
    )

    item = pystac.Item(
        id=item_id,
        geometry=geometry,
        bbox=bbox,
        datetime=datetime,
        properties=resolved_properties,
        stac_extensions=resolved_extensions
    )

    for asset_name, asset in (assets or {}).items():
        item.add_asset(asset_name, asset)

    return item
Create a STAC item for a flight line data segment.
Parameters
- item_id (str): Unique identifier for the item.
- geometry (dict): GeoJSON geometry object.
- bbox (list of float): Bounding box coordinates [xmin, ymin, xmax, ymax].
- datetime (datetime): Acquisition datetime.
- properties (dict, optional): Additional metadata properties. If None, defaults to empty dict.
- assets (dict of str to pystac.Asset, optional): Dictionary of assets (data files, thumbnails, etc.). Keys are asset names, values are pystac.Asset objects.
- stac_extensions (list of str, optional): List of STAC extension URLs to enable. If None, defaults to a list containing only the STAC file extension schema URL.
Returns
- pystac.Item: Item object with specified properties and assets.
Examples
>>> from datetime import datetime
>>> import pystac
>>> geometry = {"type": "Point", "coordinates": [-71.0, 42.0]}
>>> bbox = [-71.1, 41.9, -70.9, 42.1]
>>> props = {"instrument": "radar", "platform": "aircraft"}
>>> assets = {
... "data": pystac.Asset(href="https://example.com/data.mat", media_type="application/octet-stream")
... }
>>> item = create_item("flight_001", geometry, bbox, datetime.now(), props, assets)
def create_items_from_flight_data(
    flight_data: Dict[str, Any],
    config: DictConfig,
    base_url: str = "https://data.cresis.ku.edu/data/rds/",
    campaign_name: str = "",
    primary_data_product: str = "CSARP_standard",
    verbose: bool = False,
    error_log_file: Optional[Union[str, Path]] = None
) -> List[pystac.Item]:
    """
    Create STAC items from flight line data.

    Parameters
    ----------
    flight_data : dict
        Flight metadata from discover_flight_lines(). Expected to contain
        'flight_id' and 'data_files' keys.
    config : DictConfig
        Configuration object with geometry.tolerance setting for simplification.
    base_url : str, default "https://data.cresis.ku.edu/data/rds/"
        Base URL for constructing asset hrefs.
    campaign_name : str, default ""
        Campaign name for URL construction.
    primary_data_product : str, default "CSARP_standard"
        Data product name to use as primary data source.
    verbose : bool, default False
        If True, print details for each item being processed.
    error_log_file : str or Path, optional
        Path to file where processing errors will be logged.
        If None, errors are printed to stdout (default behavior).

    Returns
    -------
    list of pystac.Item
        List of STAC Item objects, one per MAT file in the flight data.
        Each item contains geometry, temporal information, and asset links.
    """
    def _report_error(message: str) -> None:
        # Route errors to the configured log file, else fall back to stdout.
        if error_log_file is not None:
            with open(error_log_file, 'a', encoding='utf-8') as f:
                f.write(f"{message}\n")
        else:
            print(f"Warning: {message}")

    items = []
    flight_id = flight_data['flight_id']

    primary_data_files = flight_data['data_files'][primary_data_product].values()

    for data_file_path in primary_data_files:
        data_path = Path(data_file_path)

        try:
            # Extract metadata from MAT file only (no CSV needed)
            metadata = extract_item_metadata(data_path, conf=config)
        except Exception as e:
            _report_error(f"Failed to extract metadata for {data_path}: {e}")
            continue

        item_id = f"{data_path.stem}"

        # Simplify geometry using config tolerance
        simplified_geom = simplify_geometry_polar_projection(
            metadata['geom'],
            simplify_tolerance=config.geometry.tolerance
        )
        geometry = mapping(simplified_geom)
        bbox = list(metadata['bbox'].bounds)
        datetime = metadata['date']

        # Extract frame number from MAT filename (e.g., "Data_20161014_03_001.mat" -> "001")
        frame_match = re.search(r'_(\d+)\.mat$', data_path.name)
        if frame_match is None:
            # Previously this raised AttributeError on an unexpected filename;
            # skip the file with a logged error so one bad name doesn't abort
            # the whole flight.
            _report_error(f"Could not parse frame number from filename: {data_path.name}")
            continue
        frame = frame_match.group(1)

        # Extract date and segment number from flight_id (e.g., "20161014_03" -> "20161014", "03")
        # Split on underscore to avoid assuming fixed lengths
        parts = flight_id.split('_')
        date_part = parts[0]  # YYYYMMDD
        segment_num_str = parts[1]  # Segment number as string (formerly flight number)

        # Create OPR-specific properties
        properties = {
            'opr:date': date_part,
            'opr:segment': int(segment_num_str),  # Changed from opr:flight
            'opr:frame': int(frame)  # Changed from opr:segment
        }

        # Add scientific extension properties if available
        item_stac_extensions = ['https://stac-extensions.github.io/file/v2.1.0/schema.json']

        # Map metadata keys to property names
        meta_mapping = {
            'doi': 'sci:doi',
            'citation': 'sci:citation',
            'frequency': 'opr:frequency',
            'bandwidth': 'opr:bandwidth'
        }

        for key, prop in meta_mapping.items():
            if metadata.get(key) is not None:
                properties[prop] = metadata[key]

        if any(metadata.get(k) is not None for k in ['doi', 'citation']):
            item_stac_extensions.append('https://stac-extensions.github.io/scientific/v1.0.0/schema.json')

        assets = {}

        # One asset per data product that carries this frame; the primary
        # product is aliased under the conventional 'data' key.
        for data_product_type in flight_data['data_files'].keys():
            if data_path.name in flight_data['data_files'][data_product_type]:
                product_path = flight_data['data_files'][data_product_type][data_path.name]
                file_type = metadata.get('mimetype')  # get_mat_file_type(product_path)
                if verbose:
                    print(f"[{file_type}] {product_path}")
                assets[data_product_type] = pystac.Asset(
                    href=base_url + f"{campaign_name}/{data_product_type}/{flight_id}/{data_path.name}",
                    media_type=file_type
                )
                if data_product_type == primary_data_product:
                    assets['data'] = assets[data_product_type]

        thumb_href = base_url + f"{campaign_name}/images/{flight_id}/{flight_id}_{frame}_2echo_picks.jpg"
        assets['thumbnails'] = pystac.Asset(
            href=thumb_href,
            media_type=pystac.MediaType.JPEG
        )

        flight_path_href = base_url + f"{campaign_name}/images/{flight_id}/{flight_id}_{frame}_0maps.jpg"
        assets['flight_path'] = pystac.Asset(
            href=flight_path_href,
            media_type=pystac.MediaType.JPEG
        )

        item = create_item(
            item_id=item_id,
            geometry=geometry,
            bbox=bbox,
            datetime=datetime,
            properties=properties,
            assets=assets,
            stac_extensions=item_stac_extensions
        )

        items.append(item)

    return items
Create STAC items from flight line data.
Parameters
- flight_data (dict): Flight metadata from discover_flight_lines(). Expected to contain 'flight_id' and 'data_files' keys.
- config (DictConfig): Configuration object with geometry.tolerance setting for simplification.
- base_url (str, default "https://data.cresis.ku.edu/data/rds/"): Base URL for constructing asset hrefs.
- campaign_name (str, default ""): Campaign name for URL construction.
- primary_data_product (str, default "CSARP_standard"): Data product name to use as primary data source.
- verbose (bool, default False): If True, print details for each item being processed.
- error_log_file (str or Path, optional): Path to file where metadata extraction errors will be logged. If None, errors are printed to stdout (default behavior).
Returns
- list of pystac.Item: List of STAC Item objects, one per MAT file in the flight data. Each item contains geometry, temporal information, and asset links.
def export_collection_to_parquet(
    collection: pystac.Collection,
    config: DictConfig,
    provider: Optional[str] = None,
    hemisphere: Optional[str] = None
) -> Optional[Path]:
    """
    Export a single STAC collection to a parquet file with collection metadata.

    This function directly converts STAC items to GeoParquet format without
    intermediate NDJSON, and includes the collection metadata in the Parquet
    file metadata as per the STAC GeoParquet specification.

    Parameters
    ----------
    collection : pystac.Collection
        STAC collection to export
    config : DictConfig
        Configuration object with output.path and logging.verbose settings
    provider : str, optional
        Data provider from config (awi, cresis, dtu, utig)
    hemisphere : str, optional
        Hemisphere ('north' or 'south'). If not provided, will attempt to
        detect it from the collection name, then from item geometry.

    Returns
    -------
    Path or None
        Path to the created parquet file, or None if no items to export

    Examples
    --------
    >>> from omegaconf import OmegaConf
    >>> config = OmegaConf.create({'output': {'path': './output'}, 'logging': {'verbose': True}})
    >>> parquet_path = export_collection_to_parquet(collection, config, provider='cresis')
    >>> print(f"Exported to {parquet_path}")
    """
    # Extract settings from config
    output_dir = Path(config.output.path)
    verbose = config.logging.get('verbose', False)

    # Get items from collection and subcollections. Child collections are only
    # consulted when the top-level collection has no items of its own.
    collection_items = list(collection.get_items())
    if not collection_items:
        for child_collection in collection.get_collections():
            collection_items.extend(list(child_collection.get_items()))

    if not collection_items:
        if verbose:
            print(f" Skipping {collection.id}: no items")
        return None

    # Determine hemisphere if not provided
    if hemisphere is None:
        # Try from collection name first
        hemisphere = determine_hemisphere_from_name(collection.id)

        # Fall back to geometry-based detection
        if hemisphere is None:
            hemisphere = determine_hemisphere_from_geometry(collection_items)
            if verbose and hemisphere:
                print(f" Detected hemisphere from geometry: {hemisphere}")

    # Get provider from config if not provided
    if provider is None:
        provider = config.data.get('provider')

    if verbose:
        if hemisphere:
            print(f" Hemisphere: {hemisphere}")
        else:
            print(f" WARNING: Could not determine hemisphere for {collection.id}")
        if provider:
            print(f" Provider: {provider}")
        else:
            print(f" WARNING: No provider specified for {collection.id}")

    # Ensure output directory exists
    output_dir.mkdir(parents=True, exist_ok=True)

    # Export to parquet
    parquet_file = output_dir / f"{collection.id}.parquet"

    if verbose:
        print(f" Exporting collection: {collection.id} ({len(collection_items)} items)")

    # Build collections metadata - single collection in this case
    collection_dict = collection.to_dict()

    # Add OPR metadata to collection
    props = collection_dict.setdefault('properties', {})
    if hemisphere:
        props['opr:hemisphere'] = hemisphere
    if provider:
        props['opr:provider'] = provider

    # Clean collection links - remove item links with None hrefs
    if 'links' in collection_dict:
        collection_dict['links'] = [
            link for link in collection_dict['links']
            if not (link.get('rel') == 'item' and link.get('href') is None)
        ]
    # NOTE(review): collections_dict is built but never used below
    # (to_parquet receives collection_dict directly) — confirm whether it
    # can be removed or was meant for a multi-collection export path.
    collections_dict = {
        collection.id: collection_dict
    }

    # Clean items and add xopr metadata before export
    clean_items = []
    for item in collection_items:
        item_dict = item.to_dict()

        # Add OPR metadata to each item
        if 'properties' not in item_dict:
            item_dict['properties'] = {}
        if hemisphere:
            item_dict['properties']['opr:hemisphere'] = hemisphere
        if provider:
            item_dict['properties']['opr:provider'] = provider

        # Clean links with None hrefs
        if 'links' in item_dict:
            item_dict['links'] = [
                link for link in item_dict['links']
                if link.get('href') is not None
            ]
        clean_items.append(item_dict)

    # Convert items to Arrow format
    record_batch_reader = stac_geoparquet.arrow.parse_stac_items_to_arrow(clean_items)

    # Write to Parquet with collection metadata
    # Note: Using collection_metadata for compatibility with stac-geoparquet 0.7.0
    # In newer versions (>0.8), this should be 'collections' parameter
    stac_geoparquet.arrow.to_parquet(
        table=record_batch_reader,
        output_path=parquet_file,
        collection_metadata=collection_dict,  # Single collection metadata (cleaned)
        schema_version="1.1.0",  # Use latest schema version
        compression="snappy",  # Use snappy compression for better performance
        write_statistics=True  # Write column statistics for query optimization
    )

    if verbose:
        size_kb = parquet_file.stat().st_size / 1024
        print(f" ✅ {collection.id}.parquet saved ({size_kb:.1f} KB)")

    return parquet_file
Export a single STAC collection to a parquet file with collection metadata.
This function directly converts STAC items to GeoParquet format without intermediate NDJSON, and includes the collection metadata in the Parquet file metadata as per the STAC GeoParquet specification.
Parameters
- collection (pystac.Collection): STAC collection to export
- config (DictConfig): Configuration object with output.path and logging.verbose settings
- provider (str, optional): Data provider from config (awi, cresis, dtu, utig)
- hemisphere (str, optional): Hemisphere ('north' or 'south'). If not provided, will attempt to detect.
Returns
- Path or None: Path to the created parquet file, or None if no items to export
Examples
>>> from omegaconf import OmegaConf
>>> config = OmegaConf.create({'output': {'path': './output'}, 'logging': {'verbose': True}})
>>> parquet_path = export_collection_to_parquet(collection, config, provider='cresis')
>>> print(f"Exported to {parquet_path}")
def build_catalog_from_parquet_metadata(
    parquet_paths: List[Path],
    output_file: Path,
    catalog_id: str = "OPR",
    catalog_description: str = "Open Polar Radar airborne data",
    verbose: bool = False
) -> None:
    """
    Build a catalog.json file from parquet files by reading their metadata.

    This function reads collection metadata from parquet files and creates a
    catalog.json file with proper links to the parquet files. Unlike
    export_collections_metadata which expects STAC collections as input,
    this function works directly with parquet files. A sibling
    collections.json file is also written alongside the catalog.

    Parameters
    ----------
    parquet_paths : List[Path]
        List of paths to parquet files containing STAC collections
    output_file : Path
        Output path for the catalog.json file
    catalog_id : str, optional
        Catalog ID, by default "OPR"
    catalog_description : str, optional
        Catalog description, by default "Open Polar Radar airborne data"
    verbose : bool, optional
        If True, print progress messages

    Raises
    ------
    ValueError
        If none of the parquet files yields usable collection metadata.

    Examples
    --------
    >>> import glob
    >>> parquet_files = [Path(p) for p in glob.glob("output/*.parquet")]
    >>> build_catalog_from_parquet_metadata(
    ...     parquet_files, Path("output/catalog.json"), verbose=True
    ... )
    """
    if verbose:
        print(f"Building catalog from {len(parquet_paths)} parquet files")

    collections_data = []

    for parquet_path in parquet_paths:
        if not parquet_path.exists():
            if verbose:
                print(f" ⚠️ Skipping non-existent file: {parquet_path}")
            continue

        try:
            # Read parquet metadata without loading the data
            parquet_metadata = pq.read_metadata(str(parquet_path))
            file_metadata = parquet_metadata.metadata

            # Extract collection metadata from parquet file
            collection_dict = None
            if file_metadata:
                # Try new format first (stac:collections)
                if b'stac:collections' in file_metadata:
                    collections_json = file_metadata[b'stac:collections'].decode('utf-8')
                    collections_meta = json.loads(collections_json)
                    # Get the first (and should be only) collection
                    if collections_meta:
                        collection_id = list(collections_meta.keys())[0]
                        collection_dict = collections_meta[collection_id]
                # Try legacy format used by stac-geoparquet 0.7.0
                elif b'stac-geoparquet' in file_metadata:
                    geoparquet_meta = json.loads(file_metadata[b'stac-geoparquet'].decode('utf-8'))
                    if 'collection' in geoparquet_meta:
                        collection_dict = geoparquet_meta['collection']

            if not collection_dict:
                if verbose:
                    print(f" ⚠️ No collection metadata found in {parquet_path.name}")
                continue

            # Extract relevant metadata for collections.json format
            collection_id = collection_dict.get('id', parquet_path.stem)

            # Build collection entry
            collection_entry = {
                'type': 'Collection',
                'stac_version': collection_dict.get('stac_version', '1.1.0'),
                'id': collection_id,
                'description': collection_dict.get('description', f"Collection {collection_id}"),
                'license': collection_dict.get('license', 'various'),
                'extent': collection_dict.get('extent'),
                'links': [],  # Clear links as we'll build our own
                'assets': {
                    'data': {
                        'href': f"./{parquet_path.name}",
                        'type': 'application/vnd.apache.parquet',
                        'title': 'Collection data in Apache Parquet format',
                        'roles': ['data']
                    }
                }
            }

            # Add optional fields if present
            if 'title' in collection_dict:
                collection_entry['title'] = collection_dict['title']

            # Add STAC extensions if present
            if collection_dict.get('stac_extensions'):
                collection_entry['stac_extensions'] = collection_dict['stac_extensions']

            # Add any extra fields that might be present (like sci:doi, etc)
            for key in collection_dict:
                if key.startswith('sci:') or key.startswith('sar:') or key.startswith('proj:'):
                    collection_entry[key] = collection_dict[key]

            collections_data.append(collection_entry)

            if verbose:
                num_rows = parquet_metadata.num_rows
                print(f" ✅ Added {collection_id} ({num_rows} items)")

        except Exception as e:
            # Best-effort aggregation: one unreadable file should not abort
            # the whole catalog build.
            if verbose:
                print(f" ❌ Error reading {parquet_path.name}: {e}")
            continue

    if not collections_data:
        raise ValueError("No valid collections found in parquet files")

    # Sort collections by ID for consistent output
    collections_data.sort(key=lambda x: x['id'])

    # Build the catalog structure
    catalog = {
        'type': 'Catalog',
        'id': catalog_id,
        'stac_version': '1.1.0',
        'description': catalog_description,
        'links': [
            {
                'rel': 'root',
                'href': './catalog.json',
                'type': 'application/json'
            }
        ]
    }

    # Add child links for each collection
    for collection in collections_data:
        catalog['links'].append({
            'rel': 'child',
            'href': f"./{collection['id']}.parquet",
            'type': 'application/vnd.apache.parquet',
            'title': collection.get('title', collection['description'])
        })

    # Determine common STAC extensions across all collections
    all_extensions = set()
    for collection in collections_data:
        if 'stac_extensions' in collection:
            all_extensions.update(collection['stac_extensions'])

    if all_extensions:
        catalog['stac_extensions'] = sorted(list(all_extensions))

    # Ensure output directory exists
    output_file.parent.mkdir(parents=True, exist_ok=True)

    # Write the catalog.json file
    with open(output_file, 'w') as f:
        json.dump(catalog, f, indent=2, separators=(",", ": "))

    if verbose:
        print(f"\n✅ Catalog saved to {output_file}")
        print(f" Contains {len(collections_data)} collections")

    # Also save a collections.json file for compatibility
    collections_file = output_file.parent / "collections.json"
    with open(collections_file, 'w') as f:
        json.dump(collections_data, f, indent=2, separators=(",", ": "))

    if verbose:
        print(f"✅ Collections metadata saved to {collections_file}")
Build a catalog.json file from parquet files by reading their metadata.
This function reads collection metadata from parquet files and creates a catalog.json file with proper links to the parquet files. Unlike export_collections_metadata which expects STAC collections as input, this function works directly with parquet files.
Parameters
- parquet_paths (List[Path]): List of paths to parquet files containing STAC collections
- output_file (Path): Output path for the catalog.json file
- catalog_id (str, optional): Catalog ID, by default "OPR"
- catalog_description (str, optional): Catalog description, by default "Open Polar Radar airborne data"
- base_url (str, optional): Base URL for asset hrefs. If None, uses relative paths
- verbose (bool, optional): If True, print progress messages
Examples
>>> import glob
>>> parquet_files = [Path(p) for p in glob.glob("output/*.parquet")]
>>> build_catalog_from_parquet_metadata(
... parquet_files, Path("output/catalog.json"), verbose=True
... )
def extract_item_metadata(
    mat_file_path: Optional[Union[str, Path]] = None,
    dataset=None,
    conf: Optional[DictConfig] = None
) -> Dict[str, Any]:
    """
    Extract metadata from MAT/HDF5 file with optional configuration.

    Parameters
    ----------
    mat_file_path : Union[str, Path], optional
        Path or URL to MAT/HDF5 file
    dataset : xarray.Dataset, optional
        Pre-loaded dataset
    conf : DictConfig, optional
        Configuration for geometry simplification and radar/sci metadata
        fallbacks

    Returns
    -------
    Dict[str, Any]
        Extracted metadata: geometry ('geom'), 'bbox', 'date', 'frequency',
        'bandwidth', 'doi', 'citation', and 'mimetype'.

    Raises
    ------
    ValueError
        If neither or both of mat_file_path/dataset are given, or radar
        frequency parameters cannot be determined from data or config.
    FileNotFoundError
        If a local mat_file_path does not exist.
    """
    # Validate input: exactly one of the two data sources must be supplied.
    if (mat_file_path is None) == (dataset is None):
        raise ValueError("Exactly one of mat_file_path or dataset must be provided")

    should_close_dataset = False

    if mat_file_path is not None:
        if isinstance(mat_file_path, str):
            file_path = Path(mat_file_path)
        else:
            file_path = mat_file_path

        # Check existence for local files (URLs are fetched by OPRConnection)
        if not str(mat_file_path).startswith(('http://', 'https://')):
            if not file_path.exists():
                raise FileNotFoundError(f"MAT file not found: {file_path}")

        opr = OPRConnection(cache_dir="radar_cache")
        ds = opr.load_frame_url(str(mat_file_path))
        should_close_dataset = True
    else:
        ds = dataset

    # try/finally guarantees a dataset we opened ourselves is closed even if
    # extraction raises (previously an exception here leaked the open dataset).
    try:
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", UserWarning)
            date = pd.to_datetime(ds['slow_time'].mean().values).to_pydatetime()

        # Create geometry
        geom_series = gpd.GeoSeries(map(Point, zip(ds['Longitude'].values, ds['Latitude'].values)))
        line = LineString(geom_series.tolist())

        # Apply simplification based on config
        if conf and conf.get('geometry', {}).get('simplify', True):
            tolerance = conf.geometry.get('tolerance', 100.0)
            line = simplify_geometry_polar_projection(line, simplify_tolerance=tolerance)

        bounds = shapely.bounds(line)
        boundingbox = box(bounds[0], bounds[1], bounds[2], bounds[3])

        # Extract radar parameters with config fallback support
        low_freq = None
        high_freq = None
        used_config_fallback = False

        # Check if config provides radar frequency values
        config_has_radar = (
            conf is not None
            and conf.get('radar', {}).get('f0') is not None
            and conf.get('radar', {}).get('f1') is not None
        )
        config_override = conf is not None and conf.get('radar', {}).get('override', False)

        if config_override and config_has_radar:
            # Config override takes precedence
            low_freq = float(conf.radar.f0)
            high_freq = float(conf.radar.f1)
        else:
            # Try to extract from data first
            try:
                stable_wfs = extract_stable_wfs_params(find_radar_wfs_params(ds))
                if 'f0' in stable_wfs and 'f1' in stable_wfs:
                    low_freq_array = stable_wfs['f0']
                    high_freq_array = stable_wfs['f1']

                    unique_low_freq = np.unique(low_freq_array)
                    if len(unique_low_freq) != 1:
                        raise ValueError(f"Multiple low frequency values found: {unique_low_freq}")
                    low_freq = float(unique_low_freq[0])

                    unique_high_freq = np.unique(high_freq_array)
                    if len(unique_high_freq) != 1:
                        raise ValueError(f"Multiple high frequency values found: {unique_high_freq}")
                    high_freq = float(unique_high_freq[0])
            except KeyError:
                pass  # Will try config fallback

            # Config fallback if extraction failed
            if (low_freq is None or high_freq is None) and config_has_radar:
                low_freq = float(conf.radar.f0)
                high_freq = float(conf.radar.f1)
                used_config_fallback = True

        # Error if neither source has values
        if low_freq is None or high_freq is None:
            raise ValueError(
                "Radar frequency parameters (f0, f1) not found in data file "
                "and not provided in config. Add radar.f0 and radar.f1 to your config."
            )

        # Log warning if using fallback (only when verbose)
        if used_config_fallback and conf and conf.get('logging', {}).get('verbose', False):
            logging.warning(f"Using config fallback for radar frequencies: f0={low_freq}, f1={high_freq}")

        bandwidth = float(np.abs(high_freq - low_freq))
        center_freq = float((low_freq + high_freq) / 2)

        # Extract science metadata with config fallback support
        doi = None
        cite = None
        used_sci_fallback = False

        # Check if config provides sci metadata values
        config_has_sci = conf is not None and conf.get('sci') is not None
        sci_override = config_has_sci and conf.get('sci', {}).get('override', False)

        if sci_override and config_has_sci:
            # Config override takes precedence
            doi = conf.sci.get('doi')
            cite = conf.sci.get('citation')
        else:
            # Try to extract from data first
            doi = ds.attrs.get('doi', None)
            cite = ds.attrs.get('funder_text', None)

            # Config fallback if extraction returned None
            if config_has_sci:
                if doi is None and conf.sci.get('doi') is not None:
                    doi = conf.sci.doi
                    used_sci_fallback = True
                if cite is None and conf.sci.get('citation') is not None:
                    cite = conf.sci.citation
                    used_sci_fallback = True

        # Log warning if using fallback (only when verbose)
        if used_sci_fallback and conf and conf.get('logging', {}).get('verbose', False):
            logging.warning(f"Using config fallback for sci metadata: doi={doi}, citation={cite}")

        mime = ds.attrs['mimetype']

        return {
            'geom': line,
            'bbox': boundingbox,
            'date': date,
            'frequency': center_freq,
            'bandwidth': bandwidth,
            'doi': doi,
            'citation': cite,
            'mimetype': mime
        }
    finally:
        if should_close_dataset:
            ds.close()
Extract metadata from a MAT/HDF5 file with optional configuration.
Parameters
- mat_file_path (Union[str, Path], optional): Path or URL to MAT/HDF5 file
- dataset (xarray.Dataset, optional): Pre-loaded dataset
- conf (DictConfig, optional): Configuration for geometry simplification
Returns
- Dict[str, Any]: Extracted metadata including geometry, bbox, date, etc.
def discover_campaigns(data_root: Union[str, Path], conf: Optional[DictConfig] = None) -> List[Dict[str, str]]:
    """
    Discover all campaigns in the data directory.

    Campaign directories are expected to be named ``YYYY_Location_Aircraft``
    (e.g. ``2018_Antarctica_P3``); anything else is ignored.

    Parameters
    ----------
    data_root : Union[str, Path]
        Root directory containing campaign subdirectories
    conf : DictConfig, optional
        Configuration with optional ``data.campaigns.include`` /
        ``data.campaigns.exclude`` name filters

    Returns
    -------
    List[Dict[str, str]]
        Campaign dicts with keys 'name', 'year', 'location', 'aircraft',
        'path', sorted by (year, name)

    Raises
    ------
    FileNotFoundError
        If data_root does not exist
    """
    campaign_pattern = re.compile(r'^(\d{4})_([^_]+)_([^_]+)$')
    campaigns = []

    data_root = Path(data_root)

    if not data_root.exists():
        raise FileNotFoundError(f"Data root directory not found: {data_root}")

    # Resolve optional filters once, outside the loop. Guard against configs
    # that lack a 'data' section entirely (the previous unconditional
    # `conf.data` access raised on such configs).
    include = []
    exclude = []
    if conf is not None and conf.get('data') is not None and 'campaigns' in conf.data:
        include = conf.data.campaigns.get('include', []) or []
        exclude = conf.data.campaigns.get('exclude', []) or []

    for item in data_root.iterdir():
        if not item.is_dir():
            continue
        match = campaign_pattern.match(item.name)
        if not match:
            continue
        year, location, aircraft = match.groups()

        # Apply filters if config provided
        if include and item.name not in include:
            continue
        if exclude and item.name in exclude:
            continue

        campaigns.append({
            'name': item.name,
            'year': year,
            'location': location,
            'aircraft': aircraft,
            'path': str(item)
        })

    return sorted(campaigns, key=lambda x: (x['year'], x['name']))
Discover all campaigns in the data directory.
Parameters
- data_root (Union[str, Path]): Root directory containing campaign subdirectories
- conf (DictConfig, optional): Configuration with optional filters
Returns
- List[Dict[str, str]]: List of campaign metadata dictionaries
def discover_flight_lines(campaign_path: Union[str, Path], conf: DictConfig) -> List[Dict[str, Any]]:
    """
    Discover flight lines for a campaign using configuration.

    Flight directories under the primary product are expected to be named
    ``YYYYMMDD_NN``. For the primary product, ``.mat`` files whose names
    contain ``_img`` are skipped; extra products contribute every ``.mat``
    file found, and are included only when their directory exists.

    Parameters
    ----------
    campaign_path : Union[str, Path]
        Path to campaign directory
    conf : DictConfig
        Configuration object with data.primary_product and data.extra_products

    Returns
    -------
    List[Dict[str, Any]]
        Flight dicts with keys 'flight_id', 'date', 'flight_num',
        'data_files', sorted by flight_id

    Raises
    ------
    FileNotFoundError
        If the primary product directory does not exist
    """
    campaign_path = Path(campaign_path)

    # Pull product names out of the config
    primary_product = conf.data.primary_product
    extra_products = conf.data.get('extra_products', []) or []

    product_path = campaign_path / primary_product
    if not product_path.exists():
        raise FileNotFoundError(f"Data product directory not found: {product_path}")

    flight_pattern = re.compile(r'^(\d{8}_\d+)$')
    discovered = []

    for candidate in product_path.iterdir():
        if not candidate.is_dir():
            continue
        hit = flight_pattern.match(candidate.name)
        if hit is None:
            continue

        flight_id = hit.group(1)
        date_part, flight_num = flight_id.split('_')

        # Primary product: every .mat file except the "_img" variants
        files_by_product = {
            primary_product: {
                f.name: str(f)
                for f in candidate.glob("*.mat")
                if "_img" not in f.name
            }
        }

        # Optional extra products, keyed under their own product name
        for extra in extra_products:
            extra_dir = campaign_path / extra / candidate.name
            if extra_dir.exists():
                files_by_product[extra] = {
                    f.name: str(f) for f in extra_dir.glob("*.mat")
                }

        discovered.append({
            'flight_id': flight_id,
            'date': date_part,
            'flight_num': flight_num,
            'data_files': files_by_product
        })

    return sorted(discovered, key=lambda rec: rec['flight_id'])
Discover flight lines for a campaign using configuration.
Parameters
- campaign_path (Union[str, Path]): Path to campaign directory
- conf (DictConfig): Configuration object with data.primary_product and data.extra_products
Returns
- List[Dict[str, Any]]: List of flight line metadata dictionaries
def collect_uniform_metadata(items: List, property_keys: List[str]) -> tuple[List[str], dict]:
    """
    Collect metadata properties that have uniform values across items.

    A property is promoted to the collection level only when every item
    that defines it carries the same value; items missing the property are
    ignored.

    Parameters
    ----------
    items : List[pystac.Item]
        List of STAC items to extract metadata from
    property_keys : List[str]
        List of property keys to check

    Returns
    -------
    tuple
        (extensions_needed, extra_fields_dict) where extensions_needed
        lists the STAC extension schema URIs required by the promoted
        properties
    """
    SCI_EXT = 'https://stac-extensions.github.io/scientific/v1.0.0/schema.json'
    SAR_EXT = 'https://stac-extensions.github.io/sar/v1.3.0/schema.json'

    extensions = []
    extra_fields = {}

    # Map each promotable property to the STAC extension that defines it
    property_mappings = {
        'sci:doi': SCI_EXT,
        'sci:citation': SCI_EXT,
        'sar:center_frequency': SAR_EXT,
        'sar:bandwidth': SAR_EXT
    }

    for key in property_keys:
        values = [
            item.properties.get(key)
            for item in items
            if item.properties.get(key) is not None
        ]

        # set() gives an exact uniqueness check; np.unique coerced the list
        # to an ndarray, which can raise on (or silently stringify)
        # mixed-type values
        if values and len(set(values)) == 1:
            ext = property_mappings.get(key)
            if ext and ext not in extensions:
                extensions.append(ext)
            extra_fields[key] = values[0]

    return extensions, extra_fields
Collect metadata properties that have uniform values across items.
Parameters
- items (List[pystac.Item]): List of STAC items to extract metadata from
- property_keys (List[str]): List of property keys to check
Returns
- tuple: (extensions_needed, extra_fields_dict)
def build_collection_extent_and_geometry(
    items: List[pystac.Item]
) -> pystac.Extent:
    """
    Calculate spatial and temporal extent from a list of items.

    Items lacking a bbox or datetime are skipped. With no bboxes at all,
    the spatial extent falls back to the whole globe; with no datetimes,
    the temporal extent is open-ended.

    Parameters
    ----------
    items : list of pystac.Item
        List of STAC items to compute extent from.

    Returns
    -------
    pystac.Extent
        Combined spatial and temporal extent covering all input items.

    Raises
    ------
    ValueError
        If items list is empty.
    """
    if not items:
        raise ValueError("Cannot build extent from empty item list")

    bboxes = [item.bbox for item in items if item.bbox]
    datetimes = [item.datetime for item in items if item.datetime]

    if bboxes:
        # Aggregate min/max of the [minx, miny, maxx, maxy] bboxes
        # directly; equivalent to unioning shapely boxes and taking
        # .bounds, without the intermediate geometry allocations.
        collection_bbox = [
            min(b[0] for b in bboxes),
            min(b[1] for b in bboxes),
            max(b[2] for b in bboxes),
            max(b[3] for b in bboxes),
        ]
        spatial_extent = pystac.SpatialExtent(bboxes=[collection_bbox])
    else:
        spatial_extent = pystac.SpatialExtent(bboxes=[[-180, -90, 180, 90]])

    if datetimes:
        sorted_times = sorted(datetimes)
        temporal_extent = pystac.TemporalExtent(
            intervals=[[sorted_times[0], sorted_times[-1]]]
        )
    else:
        temporal_extent = pystac.TemporalExtent(intervals=[[None, None]])

    return pystac.Extent(spatial=spatial_extent, temporal=temporal_extent)
Calculate spatial and temporal extent from a list of items.
Parameters
- items (list of pystac.Item): List of STAC items to compute extent from.
Returns
- pystac.Extent: Combined spatial and temporal extent covering all input items.
Raises
- ValueError: If items list is empty.
def simplify_geometry_polar_projection(
    geometry: shapely.geometry.base.BaseGeometry,
    simplify_tolerance: float = 100.0
) -> shapely.geometry.base.BaseGeometry:
    """
    Simplify geometry using appropriate polar stereographic projection.

    The geometry is reprojected from WGS84 into EPSG:3031 (south) or
    EPSG:3413 (north) based on its centroid latitude, simplified with the
    given tolerance in meters, and reprojected back to WGS84.

    Parameters
    ----------
    geometry : shapely.geometry.base.BaseGeometry
        Input geometry in WGS84 coordinates
    simplify_tolerance : float, default 100.0
        Tolerance for shapely.simplify() in meters (used in polar projection)

    Returns
    -------
    shapely.geometry.base.BaseGeometry
        Simplified geometry in WGS84 coordinates
    """
    # Empty or invalid geometries are returned untouched
    if not geometry or not geometry.is_valid:
        return geometry

    # Southern-hemisphere centroids use Antarctic Polar Stereographic
    # (EPSG:3031); everything else uses the Arctic grid (EPSG:3413)
    target_epsg = 3031 if geometry.centroid.y < 0 else 3413

    wgs84_crs = pyproj.CRS('EPSG:4326')
    polar_crs = pyproj.CRS(f'EPSG:{target_epsg}')

    forward = pyproj.Transformer.from_crs(wgs84_crs, polar_crs, always_xy=True)
    inverse = pyproj.Transformer.from_crs(polar_crs, wgs84_crs, always_xy=True)

    # Simplify in meters within the polar projection, then go back to WGS84
    in_polar = transform(forward.transform, geometry)
    slimmed = in_polar.simplify(simplify_tolerance, preserve_topology=True)
    return transform(inverse.transform, slimmed)
Simplify geometry using the appropriate polar stereographic projection.
Parameters
- geometry (shapely.geometry.base.BaseGeometry): Input geometry in WGS84 coordinates
- simplify_tolerance (float, default 100.0): Tolerance for shapely.simplify() in meters (used in polar projection)
Returns
- shapely.geometry.base.BaseGeometry: Simplified geometry in WGS84 coordinates