xopr.bedmap

Bedmap data integration module for xopr.

This module provides functionality to:

  • Convert bedmap CSV files to cloud-optimized GeoParquet format
  • Create GeoParquet STAC catalogs for bedmap data discovery
  • Query and retrieve bedmap data efficiently
 1"""
 2Bedmap data integration module for xopr.
 3
 4This module provides functionality to:
 5- Convert bedmap CSV files to cloud-optimized GeoParquet format
 6- Create GeoParquet STAC catalogs for bedmap data discovery
 7- Query and retrieve bedmap data efficiently
 8"""
 9
10from .catalog import (
11    build_bedmap_geoparquet_catalog,
12    read_parquet_metadata,
13)
14from .converter import (
15    batch_convert_bedmap,
16    convert_bedmap_csv,
17    parse_bedmap_metadata,
18)
19from .geometry import (
20    _transformer_from_polar,
21    # Expose transformers for direct use
22    _transformer_to_polar,
23    calculate_bbox,
24    calculate_haversine_distances,
25    check_intersects_polar,
26    extract_flight_lines,
27    get_polar_bounds,
28    simplify_multiline_geometry,
29)
30from .query import (
31    fetch_bedmap,
32    get_bedmap_cache_path,
33    query_bedmap,
34    query_bedmap_catalog,
35    query_bedmap_local,
36)
37
38__all__ = [
39    # Converter functions
40    'convert_bedmap_csv',
41    'batch_convert_bedmap',
42    'parse_bedmap_metadata',
43    # Geometry functions
44    'extract_flight_lines',
45    'calculate_haversine_distances',
46    'simplify_multiline_geometry',
47    'calculate_bbox',
48    'get_polar_bounds',
49    'check_intersects_polar',
50    '_transformer_to_polar',
51    '_transformer_from_polar',
52    # Catalog functions
53    'read_parquet_metadata',
54    'build_bedmap_geoparquet_catalog',
55    # Query functions
56    'query_bedmap',
57    'query_bedmap_catalog',
58    'query_bedmap_local',
59    'fetch_bedmap',
60    'get_bedmap_cache_path',
61]
62
63__version__ = '0.1.0'
def convert_bedmap_csv( csv_path: Union[str, pathlib.Path], output_dir: Union[str, pathlib.Path], simplify_tolerance_km: float = 10.0) -> Dict:
420def convert_bedmap_csv(
421    csv_path: Union[str, Path],
422    output_dir: Union[str, Path],
423    simplify_tolerance_km: float = 10.0,
424) -> Dict:
425    """
426    Convert a single bedmap CSV file to cloud-optimized GeoParquet format.
427
428    Creates GeoParquet files with WKB-encoded Point geometry and zstd compression.
429    For large files (>600k rows), applies Hilbert curve sorting with 50k row groups
430    to optimize spatial queries.
431
432    Parameters
433    ----------
434    csv_path : str or Path
435        Path to the input CSV file
436    output_dir : str or Path
437        Directory for output GeoParquet file
438    simplify_tolerance_km : float
439        Tolerance for geometry simplification in kilometers (uses polar
440        stereographic projection to avoid distortion near poles)
441
442    Returns
443    -------
444    dict
445        Dictionary containing metadata and bounds information
446    """
447    csv_path = Path(csv_path)
448    output_dir = Path(output_dir)
449    output_dir.mkdir(parents=True, exist_ok=True)
450
451    total_start = time.time()
452    print(f"Converting {csv_path.name}...")
453
454    # Parse metadata from header
455    metadata = parse_bedmap_metadata(csv_path)
456
457    # Read CSV data using pyarrow for better performance on large files
458    t0 = time.time()
459    df = _read_csv_fast(csv_path)
460    print(f"  Read {len(df):,} rows ({time.time() - t0:.1f}s)")
461
462    # Convert trajectory_id to string (it may be numeric in some files)
463    df['trajectory_id'] = df['trajectory_id'].astype(str)
464
465    # Replace -9999 with NaN for numeric columns (but not trajectory_id)
466    numeric_columns = df.select_dtypes(include=[np.number]).columns
467    for col in numeric_columns:
468        df[col] = df[col].replace(-9999, np.nan)
469
470    # Normalize longitude from 0-360 to -180 to 180 convention
471    df['longitude (degree_east)'] = _normalize_longitude(
472        df['longitude (degree_east)'].values
473    )
474
475    # Handle date/time conversion
476    df['timestamp'] = create_timestamps(df, metadata)
477
478    # Add source_file column (without extension)
479    source_name = csv_path.stem  # Gets filename without extension
480    df['source_file'] = source_name
481
482    # Extract flight line geometry for metadata
483    multiline_geom = extract_flight_lines(df)
484
485    if multiline_geom is not None:
486        simplified_geom = simplify_multiline_geometry(
487            multiline_geom,
488            tolerance_km=simplify_tolerance_km
489        )
490    else:
491        simplified_geom = None
492        warnings.warn(f"Could not extract flight lines from {csv_path.name}")
493
494    # Calculate spatial bounds
495    bbox = calculate_bbox(df)
496
497    # Extract temporal extent
498    temporal_start, temporal_end = extract_temporal_extent(df, metadata)
499
500    # Convert simplified geometry to WKB hex for storage
501    flight_line_wkb = None
502    if simplified_geom is not None:
503        flight_line_wkb = simplified_geom.wkb_hex
504
505    # Prepare metadata dictionary (will be stored in parquet schema)
506    file_metadata = {
507        'source_csv': csv_path.name,
508        'bedmap_version': _extract_bedmap_version(csv_path.name),
509        'spatial_bounds': {
510            'bbox': list(bbox) if bbox else None,  # Convert tuple to list for JSON
511        },
512        'temporal_bounds': {
513            'start': temporal_start.isoformat() if temporal_start else None,
514            'end': temporal_end.isoformat() if temporal_end else None,
515        },
516        'row_count': len(df),
517        'original_metadata': metadata,
518        'flight_line_wkb': flight_line_wkb,  # WKB hex string for STAC catalog
519    }
520
521    # Create Point geometry from lon/lat columns (vectorized for performance)
522    t0 = time.time()
523    geometry = gpd.points_from_xy(
524        df['longitude (degree_east)'],
525        df['latitude (degree_north)']
526    )
527
528    # Create GeoDataFrame with WKB Point geometry
529    gdf = gpd.GeoDataFrame(
530        df.drop(columns=['longitude (degree_east)', 'latitude (degree_north)',
531                        'date', 'time_UTC'], errors='ignore'),
532        geometry=geometry,
533        crs='EPSG:4326'
534    )
535    print(f"  Created GeoDataFrame ({time.time() - t0:.1f}s)")
536
537    # Determine if Hilbert sorting is needed based on row count
538    row_count = len(gdf)
539    use_hilbert = row_count > HILBERT_ROW_THRESHOLD
540
541    if use_hilbert:
542        print(f"  Applying Hilbert sorting ({row_count:,} rows > {HILBERT_ROW_THRESHOLD:,} threshold)")
543        gdf = _apply_hilbert_sorting(gdf)
544        row_group_size = HILBERT_ROW_GROUP_SIZE
545    else:
546        row_group_size = None  # Use default (single row group)
547
548    # Prepare output path
549    output_path = output_dir / f"{source_name}.parquet"
550
551    # Write as GeoParquet with zstd compression and bedmap metadata
552    t0 = time.time()
553    _write_geoparquet_with_metadata(
554        gdf=gdf,
555        output_path=output_path,
556        bedmap_metadata=file_metadata,
557        compression='zstd',
558        row_group_size=row_group_size
559    )
560    print(f"  Written to {output_path} ({time.time() - t0:.1f}s)")
561
562    print(f"  Rows: {row_count:,}")
563    if use_hilbert:
564        print(f"  Row groups: {row_group_size:,} rows each")
565    if bbox:
566        print(f"  Spatial extent: {bbox[0]:.2f}, {bbox[1]:.2f} to {bbox[2]:.2f}, {bbox[3]:.2f}")
567    if temporal_start and temporal_end:
568        print(f"  Temporal extent: {temporal_start.date()} to {temporal_end.date()}")
569
570    total_time = time.time() - total_start
571    print(f"  Total conversion time: {total_time:.1f}s")
572
573    return file_metadata

Convert a single bedmap CSV file to cloud-optimized GeoParquet format.

Creates GeoParquet files with WKB-encoded Point geometry and zstd compression. For large files (>600k rows), applies Hilbert curve sorting with 50k row groups to optimize spatial queries.

Parameters
  • csv_path (str or Path): Path to the input CSV file
  • output_dir (str or Path): Directory for output GeoParquet file
  • simplify_tolerance_km (float): Tolerance for geometry simplification in kilometers (uses polar stereographic projection to avoid distortion near poles)
Returns
  • dict: Dictionary containing metadata and bounds information
def batch_convert_bedmap( input_dir: Union[str, pathlib.Path], output_dir: Union[str, pathlib.Path], pattern: str = '*.csv', parallel: bool = False, n_workers: int = 4) -> List[Dict]:
600def batch_convert_bedmap(
601    input_dir: Union[str, Path],
602    output_dir: Union[str, Path],
603    pattern: str = "*.csv",
604    parallel: bool = False,
605    n_workers: int = 4
606) -> List[Dict]:
607    """
608    Batch convert multiple bedmap CSV files to GeoParquet.
609
610    Parameters
611    ----------
612    input_dir : str or Path
613        Directory containing input CSV files
614    output_dir : str or Path
615        Directory for output GeoParquet files
616    pattern : str
617        Glob pattern for CSV files
618    parallel : bool
619        Whether to process files in parallel
620    n_workers : int
621        Number of parallel workers
622
623    Returns
624    -------
625    list
626        List of metadata dictionaries for all converted files
627    """
628    input_dir = Path(input_dir)
629    output_dir = Path(output_dir)
630
631    # Find all CSV files
632    csv_files = sorted(input_dir.glob(pattern))
633    print(f"Found {len(csv_files)} CSV files to convert")
634
635    if not csv_files:
636        warnings.warn(f"No CSV files found matching pattern '{pattern}' in {input_dir}")
637        return []
638
639    metadata_list = []
640
641    if parallel and len(csv_files) > 1:
642        with ProcessPoolExecutor(max_workers=n_workers) as executor:
643            futures = {
644                executor.submit(convert_bedmap_csv, csv_file, output_dir): csv_file
645                for csv_file in csv_files
646            }
647
648            for future in tqdm(as_completed(futures), total=len(csv_files),
649                               desc="Converting files"):
650                try:
651                    metadata = future.result()
652                    metadata_list.append(metadata)
653                except Exception as e:
654                    csv_file = futures[future]
655                    print(f"Error processing {csv_file.name}: {e}")
656
657    else:
658        # Sequential processing
659        for csv_file in tqdm(csv_files, desc="Converting files"):
660            try:
661                metadata = convert_bedmap_csv(csv_file, output_dir)
662                metadata_list.append(metadata)
663            except Exception as e:
664                print(f"Error processing {csv_file.name}: {e}")
665
666    print(f"\nSuccessfully converted {len(metadata_list)} files")
667    return metadata_list

Batch convert multiple bedmap CSV files to GeoParquet.

Parameters
  • input_dir (str or Path): Directory containing input CSV files
  • output_dir (str or Path): Directory for output GeoParquet files
  • pattern (str): Glob pattern for CSV files
  • parallel (bool): Whether to process files in parallel
  • n_workers (int): Number of parallel workers
Returns
  • list: List of metadata dictionaries for all converted files
def parse_bedmap_metadata(csv_path: Union[str, pathlib.Path]) -> Dict:
150def parse_bedmap_metadata(csv_path: Union[str, Path]) -> Dict:
151    """
152    Parse metadata from bedmap CSV header lines.
153
154    Parameters
155    ----------
156    csv_path : str or Path
157        Path to the bedmap CSV file
158
159    Returns
160    -------
161    dict
162        Dictionary containing parsed metadata fields
163    """
164    metadata = {}
165
166    with open(csv_path, 'r') as f:
167        for line in f:
168            if not line.startswith('#'):
169                break
170
171            # Remove '#' and whitespace
172            line = line[1:].strip()
173
174            # Skip empty lines
175            if not line:
176                continue
177
178            # Parse key-value pairs
179            if ':' in line:
180                key, value = line.split(':', 1)
181                key = key.strip().replace(' ', '_').replace('-', '_')
182                value = value.strip()
183
184                # Parse numeric values where appropriate
185                if key in ['time_coverage_start', 'time_coverage_end']:
186                    try:
187                        metadata[key] = int(value)
188                    except ValueError:
189                        metadata[key] = value
190                elif key in ['electromagnetic_wave_speed_in_ice', 'firn_correction', 'centre_frequency']:
191                    # Extract numeric value and unit
192                    parts = value.split('(')
193                    if len(parts) > 0:
194                        try:
195                            metadata[key] = float(parts[0].strip())
196                            if len(parts) > 1:
197                                metadata[f"{key}_unit"] = parts[1].rstrip(')')
198                        except ValueError:
199                            metadata[key] = value
200                else:
201                    metadata[key] = value
202
203    return metadata

Parse metadata from bedmap CSV header lines.

Parameters
  • csv_path (str or Path): Path to the bedmap CSV file
Returns
  • dict: Dictionary containing parsed metadata fields
def extract_flight_lines( df: pandas.core.frame.DataFrame, lon_col: str = 'longitude (degree_east)', lat_col: str = 'latitude (degree_north)', distance_threshold_km: float = 10.0, min_points_per_segment: int = 2) -> Optional[shapely.geometry.multilinestring.MultiLineString]:
 60def extract_flight_lines(
 61    df: pd.DataFrame,
 62    lon_col: str = 'longitude (degree_east)',
 63    lat_col: str = 'latitude (degree_north)',
 64    distance_threshold_km: float = 10.0,
 65    min_points_per_segment: int = 2
 66) -> Optional[MultiLineString]:
 67    """
 68    Extract flight line segments from point data.
 69
 70    Breaks lines when consecutive points are more than threshold distance apart.
 71
 72    Parameters
 73    ----------
 74    df : pd.DataFrame
 75        DataFrame containing latitude and longitude columns
 76    lon_col : str
 77        Name of longitude column
 78    lat_col : str
 79        Name of latitude column
 80    distance_threshold_km : float
 81        Distance threshold in kilometers for line segmentation
 82    min_points_per_segment : int
 83        Minimum points required to create a line segment
 84
 85    Returns
 86    -------
 87    MultiLineString or None
 88        Multiline geometry representing flight paths, or None if no valid segments
 89    """
 90    valid_coords = df[[lat_col, lon_col]].dropna()
 91
 92    if len(valid_coords) < min_points_per_segment:
 93        return None
 94
 95    coords_array = valid_coords[[lat_col, lon_col]].values
 96    distances = calculate_haversine_distances(coords_array, Unit.KILOMETERS)
 97    breakpoints = np.where(distances > distance_threshold_km)[0] + 1
 98    breakpoints = np.concatenate([[0], breakpoints, [len(coords_array)]])
 99
100    segments = []
101    for i in range(len(breakpoints) - 1):
102        start_idx = breakpoints[i]
103        end_idx = breakpoints[i + 1]
104
105        if end_idx - start_idx >= min_points_per_segment:
106            segment_coords = coords_array[start_idx:end_idx]
107            segment_coords_lonlat = segment_coords[:, [1, 0]]  # swap to lon, lat
108
109            try:
110                line = LineString(segment_coords_lonlat)
111                if line.is_valid and not line.is_empty:
112                    segments.append(line)
113            except Exception as e:
114                print(f"Warning: Could not create line segment: {e}")
115                continue
116
117    if not segments:
118        return None
119
120    return MultiLineString(segments)

Extract flight line segments from point data.

Breaks lines when consecutive points are more than threshold distance apart.

Parameters
  • df (pd.DataFrame): DataFrame containing latitude and longitude columns
  • lon_col (str): Name of longitude column
  • lat_col (str): Name of latitude column
  • distance_threshold_km (float): Distance threshold in kilometers for line segmentation
  • min_points_per_segment (int): Minimum points required to create a line segment
Returns
  • MultiLineString or None: Multiline geometry representing flight paths, or None if no valid segments
def calculate_haversine_distances( lat_lon_array: numpy.ndarray, unit: haversine.haversine.Unit = <Unit.KILOMETERS: 'km'>) -> numpy.ndarray:
31def calculate_haversine_distances(
32    lat_lon_array: np.ndarray,
33    unit: Unit = Unit.KILOMETERS
34) -> np.ndarray:
35    """
36    Calculate haversine distances between consecutive points.
37
38    Parameters
39    ----------
40    lat_lon_array : np.ndarray
41        Array of shape (n, 2) with latitude in column 0, longitude in column 1
42    unit : haversine.Unit
43        Unit for distance calculation (default: KILOMETERS)
44
45    Returns
46    -------
47    np.ndarray
48        Array of distances between consecutive points (length n-1)
49    """
50    if len(lat_lon_array) < 2:
51        return np.array([])
52
53    coords = lat_lon_array
54    coords_shifted = np.roll(coords, -1, axis=0)
55    distances = haversine_vector(coords[:-1], coords_shifted[:-1], unit)
56
57    return distances

Calculate haversine distances between consecutive points.

Parameters
  • lat_lon_array (np.ndarray): Array of shape (n, 2) with latitude in column 0, longitude in column 1
  • unit (haversine.Unit): Unit for distance calculation (default: KILOMETERS)
Returns
  • np.ndarray: Array of distances between consecutive points (length n-1)
def simplify_multiline_geometry( geometry: shapely.geometry.multilinestring.MultiLineString, tolerance_km: float = 10.0, preserve_topology: bool = False) -> shapely.geometry.multilinestring.MultiLineString:
123def simplify_multiline_geometry(
124    geometry: MultiLineString,
125    tolerance_km: float = 10.0,
126    preserve_topology: bool = False
127) -> MultiLineString:
128    """
129    Simplify multiline geometry using polar projection.
130
131    Parameters
132    ----------
133    geometry : MultiLineString
134        Input multiline geometry to simplify (WGS84)
135    tolerance_km : float
136        Simplification tolerance in kilometers
137    preserve_topology : bool
138        Whether to preserve topology during simplification
139
140    Returns
141    -------
142    MultiLineString
143        Simplified multiline geometry (WGS84)
144    """
145    if geometry is None or geometry.is_empty:
146        return geometry
147
148    tolerance_m = tolerance_km * 1000
149
150    simplified_lines = []
151    for line in geometry.geoms:
152        coords = np.array(line.coords)
153        if len(coords) < 2:
154            continue
155
156        # Transform to polar, simplify, transform back
157        lons, lats = coords[:, 0], coords[:, 1]
158        x, y = _transformer_to_polar.transform(lons, lats)
159
160        polar_line = LineString(zip(x, y))
161        simplified_polar = polar_line.simplify(tolerance_m, preserve_topology=preserve_topology)
162
163        if simplified_polar.is_empty:
164            continue
165
166        polar_coords = np.array(simplified_polar.coords)
167        px, py = polar_coords[:, 0], polar_coords[:, 1]
168        result_lons, result_lats = _transformer_from_polar.transform(px, py)
169
170        result_line = LineString(zip(result_lons, result_lats))
171        if result_line.is_valid and not result_line.is_empty:
172            simplified_lines.append(result_line)
173
174    if not simplified_lines:
175        return None
176
177    return MultiLineString(simplified_lines)

Simplify multiline geometry using polar projection.

Parameters
  • geometry (MultiLineString): Input multiline geometry to simplify (WGS84)
  • tolerance_km (float): Simplification tolerance in kilometers
  • preserve_topology (bool): Whether to preserve topology during simplification
Returns
  • MultiLineString: Simplified multiline geometry (WGS84)
def calculate_bbox( df: pandas.core.frame.DataFrame, lon_col: str = 'longitude (degree_east)', lat_col: str = 'latitude (degree_north)') -> Tuple[float, float, float, float]:
180def calculate_bbox(
181    df: pd.DataFrame,
182    lon_col: str = 'longitude (degree_east)',
183    lat_col: str = 'latitude (degree_north)'
184) -> Tuple[float, float, float, float]:
185    """
186    Calculate bounding box from coordinate data.
187
188    Parameters
189    ----------
190    df : pd.DataFrame
191        DataFrame containing coordinate columns
192    lon_col : str
193        Name of longitude column
194    lat_col : str
195        Name of latitude column
196
197    Returns
198    -------
199    tuple
200        Bounding box as (min_lon, min_lat, max_lon, max_lat)
201    """
202    valid_coords = df[[lon_col, lat_col]].dropna()
203
204    if valid_coords.empty:
205        return None
206
207    return (
208        valid_coords[lon_col].min(),
209        valid_coords[lat_col].min(),
210        valid_coords[lon_col].max(),
211        valid_coords[lat_col].max()
212    )

Calculate bounding box from coordinate data.

Parameters
  • df (pd.DataFrame): DataFrame containing coordinate columns
  • lon_col (str): Name of longitude column
  • lat_col (str): Name of latitude column
Returns
  • tuple: Bounding box as (min_lon, min_lat, max_lon, max_lat)
def get_polar_bounds( geometry: shapely.geometry.base.BaseGeometry) -> Tuple[float, float, float, float]:
215def get_polar_bounds(
216    geometry: shapely.geometry.base.BaseGeometry
217) -> Tuple[float, float, float, float]:
218    """
219    Get bounds of a WGS84 geometry in Antarctic Polar Stereographic (EPSG:3031).
220
221    Useful for spatial queries because polar bounds don't have antimeridian issues.
222    """
223    if geometry is None or geometry.is_empty:
224        return None
225
226    polar_geom = shapely_transform(_transformer_to_polar.transform, geometry)
227    return polar_geom.bounds

Get bounds of a WGS84 geometry in Antarctic Polar Stereographic (EPSG:3031).

Useful for spatial queries because polar bounds don't have antimeridian issues.

def check_intersects_polar( geometry1: shapely.geometry.base.BaseGeometry, geometry2: shapely.geometry.base.BaseGeometry) -> bool:
230def check_intersects_polar(
231    geometry1: shapely.geometry.base.BaseGeometry,
232    geometry2: shapely.geometry.base.BaseGeometry
233) -> bool:
234    """Check if two WGS84 geometries intersect using polar projection."""
235    if geometry1 is None or geometry2 is None:
236        return False
237    if geometry1.is_empty or geometry2.is_empty:
238        return False
239
240    polar1 = shapely_transform(_transformer_to_polar.transform, geometry1)
241    polar2 = shapely_transform(_transformer_to_polar.transform, geometry2)
242
243    return polar1.intersects(polar2)

Check if two WGS84 geometries intersect using polar projection.

_transformer_to_polar = <Concatenated Operation Transformer: pipeline> Description: axis order change (2D) + Antarctic Polar Stereographic Area of Use: - name: World - bounds: (-180.0, -90.0, 180.0, 90.0)
_transformer_from_polar = <Concatenated Operation Transformer: pipeline> Description: Inverse of Antarctic Polar Stereographic + axis order change (2D) Area of Use: - name: World - bounds: (-180.0, -90.0, 180.0, 90.0)
def read_parquet_metadata(parquet_path: Union[str, pathlib.Path]) -> Dict:
19def read_parquet_metadata(parquet_path: Union[str, Path]) -> Dict:
20    """
21    Read bedmap metadata from a GeoParquet file's schema metadata.
22
23    The metadata includes the flight line geometry as WKB hex string,
24    spatial bounds, temporal bounds, and other file metadata.
25
26    Parameters
27    ----------
28    parquet_path : str or Path
29        Path to the GeoParquet file
30
31    Returns
32    -------
33    dict
34        Metadata dictionary from the parquet file, with 'flight_line_geometry'
35        key containing the deserialized shapely geometry object
36    """
37    parquet_file = pq.ParquetFile(parquet_path)
38
39    # Get metadata from parquet schema
40    schema_metadata = parquet_file.schema_arrow.metadata or {}
41    metadata_bytes = schema_metadata.get(b'bedmap_metadata')
42
43    if not metadata_bytes:
44        warnings.warn(f"No bedmap_metadata found in {parquet_path}")
45        return {}
46
47    metadata = json.loads(metadata_bytes.decode())
48
49    # Convert WKB hex string back to shapely geometry
50    wkb_hex = metadata.get('flight_line_wkb')
51    if wkb_hex:
52        try:
53            metadata['flight_line_geometry'] = wkb.loads(wkb_hex, hex=True)
54        except Exception as e:
55            warnings.warn(f"Could not parse WKB geometry: {e}")
56            metadata['flight_line_geometry'] = None
57    else:
58        metadata['flight_line_geometry'] = None
59
60    return metadata

Read bedmap metadata from a GeoParquet file's schema metadata.

The metadata includes the flight line geometry as WKB hex string, spatial bounds, temporal bounds, and other file metadata.

Parameters
  • parquet_path (str or Path): Path to the GeoParquet file
Returns
  • dict: Metadata dictionary from the parquet file, with 'flight_line_geometry' key containing the deserialized shapely geometry object
def build_bedmap_geoparquet_catalog( parquet_dir: Union[str, pathlib.Path], output_dir: Union[str, pathlib.Path], base_href: str = 'gs://opr_stac/bedmap/data/', simplify_tolerance: float = 0.01) -> Dict[str, geopandas.geodataframe.GeoDataFrame]:
 63def build_bedmap_geoparquet_catalog(
 64    parquet_dir: Union[str, Path],
 65    output_dir: Union[str, Path],
 66    base_href: str = 'gs://opr_stac/bedmap/data/',
 67    simplify_tolerance: float = 0.01
 68) -> Dict[str, gpd.GeoDataFrame]:
 69    """
 70    Build GeoParquet STAC catalogs from bedmap data files.
 71
 72    Creates separate GeoParquet catalog files per bedmap version:
 73    - bedmap1.parquet (BM1 items)
 74    - bedmap2.parquet (BM2 items)
 75    - bedmap3.parquet (BM3 items)
 76
 77    Each catalog file contains one row per data file with:
 78    - WKB geometry column (flight line geometry for map display)
 79    - Metadata columns for STAC queries
 80    - asset_href pointing to the data parquet file
 81
 82    Parameters
 83    ----------
 84    parquet_dir : str or Path
 85        Directory containing bedmap parquet data files
 86    output_dir : str or Path
 87        Output directory for catalog GeoParquet files
 88    base_href : str
 89        Base URL for data assets (where data files are hosted)
 90    simplify_tolerance : float
 91        Tolerance for geometry simplification in degrees
 92
 93    Returns
 94    -------
 95    dict
 96        Dictionary mapping version names to GeoDataFrames
 97    """
 98    parquet_dir = Path(parquet_dir)
 99    output_dir = Path(output_dir)
100    output_dir.mkdir(parents=True, exist_ok=True)
101
102    parquet_files = sorted(parquet_dir.glob('*.parquet'))
103    print(f"Building GeoParquet catalogs from {len(parquet_files)} files")
104
105    # Group features by bedmap version
106    version_features = {
107        'BM1': [],
108        'BM2': [],
109        'BM3': [],
110    }
111
112    for parquet_file in parquet_files:
113        print(f"  Processing {parquet_file.name}...")
114
115        # Read metadata (will extract from file contents if not embedded)
116        try:
117            metadata = read_parquet_metadata(parquet_file)
118        except Exception as e:
119            print(f"    Warning: Failed to read metadata: {e}")
120            continue
121
122        if not metadata:
123            print(f"    Warning: No metadata found, skipping")
124            continue
125
126        # Get geometry - may be object or WKT string
127        geometry = metadata.get('flight_line_geometry')
128        spatial_bounds = metadata.get('spatial_bounds', {})
129        bbox = spatial_bounds.get('bbox')
130
131        if geometry is None:
132            # Try loading from WKT
133            geometry_wkt = spatial_bounds.get('geometry')
134            if geometry_wkt:
135                try:
136                    geometry = wkt.loads(geometry_wkt)
137                except Exception as e:
138                    print(f"    Warning: Could not parse geometry: {e}")
139                    geometry = None
140
141        # Simplify geometry for visualization
142        if geometry is not None:
143            geometry = geometry.simplify(simplify_tolerance, preserve_topology=True)
144
145        if geometry is None:
146            print(f"    Warning: No geometry, skipping")
147            continue
148
149        # Extract metadata fields
150        original_metadata = metadata.get('original_metadata', {})
151        bedmap_version = metadata.get('bedmap_version', 'unknown')
152        temporal_bounds = metadata.get('temporal_bounds', {})
153
154        # Build asset href
155        asset_href = base_href.rstrip('/') + '/' + parquet_file.name
156
157        # Create feature dict matching what polar.html expects
158        feature = {
159            'geometry': geometry,
160            'id': parquet_file.stem,
161            'collection': f'bedmap{bedmap_version[-1]}' if bedmap_version.startswith('BM') else 'bedmap',
162            'name': parquet_file.stem,
163            'description': f"{original_metadata.get('project', '')} - {original_metadata.get('institution', '')}".strip(' -'),
164            # Additional metadata for queries
165            'asset_href': asset_href,
166            'bedmap_version': bedmap_version,
167            'row_count': metadata.get('row_count', 0),
168            'institution': original_metadata.get('institution', ''),
169            'project': original_metadata.get('project', ''),
170            'bbox_minx': bbox[0] if bbox else None,
171            'bbox_miny': bbox[1] if bbox else None,
172            'bbox_maxx': bbox[2] if bbox else None,
173            'bbox_maxy': bbox[3] if bbox else None,
174            'temporal_start': temporal_bounds.get('start'),
175            'temporal_end': temporal_bounds.get('end'),
176        }
177
178        # Add to appropriate version group
179        if bedmap_version in version_features:
180            version_features[bedmap_version].append(feature)
181        else:
182            print(f"    Warning: Unknown version {bedmap_version}, skipping")
183
184    # Create and write GeoParquet catalogs per version
185    catalogs = {}
186    for version, features in version_features.items():
187        if not features:
188            continue
189
190        # Create GeoDataFrame
191        gdf = gpd.GeoDataFrame(features, crs='EPSG:4326')
192
193        # Output filename: bedmap1.parquet, bedmap2.parquet, bedmap3.parquet
194        version_num = version[-1]  # Extract '1', '2', or '3' from 'BM1', 'BM2', 'BM3'
195        output_path = output_dir / f'bedmap{version_num}.parquet'
196
197        # Write to GeoParquet
198        gdf.to_parquet(output_path)
199
200        print(f"  Created {output_path.name}: {len(gdf)} items")
201        catalogs[version] = gdf
202
203    print(f"\nGeoParquet catalogs written to {output_dir}")
204    return catalogs

Build GeoParquet STAC catalogs from bedmap data files.

Creates separate GeoParquet catalog files per bedmap version:

  • bedmap1.parquet (BM1 items)
  • bedmap2.parquet (BM2 items)
  • bedmap3.parquet (BM3 items)

Each catalog file contains one row per data file with:

  • WKB geometry column (flight line geometry for map display)
  • Metadata columns for STAC queries
  • asset_href pointing to the data parquet file
Parameters
  • parquet_dir (str or Path): Directory containing bedmap parquet data files
  • output_dir (str or Path): Output directory for catalog GeoParquet files
  • base_href (str): Base URL for data assets (where data files are hosted)
  • simplify_tolerance (float): Tolerance for geometry simplification in degrees
Returns
  • dict: Dictionary mapping version names to GeoDataFrames
def query_bedmap( collections: Optional[List[str]] = None, geometry: Optional[shapely.geometry.base.BaseGeometry] = None, date_range: Optional[Tuple[datetime.datetime, datetime.datetime]] = None, properties: Optional[Dict] = None, max_rows: Optional[int] = None, columns: Optional[List[str]] = None, exclude_geometry: bool = True, catalog_path: str = 'local', local_cache: bool = False, data_dir: Union[str, pathlib.Path, NoneType] = None, show_progress: bool = False) -> geopandas.geodataframe.GeoDataFrame:
500def query_bedmap(
501    collections: Optional[List[str]] = None,
502    geometry: Optional[shapely.geometry.base.BaseGeometry] = None,
503    date_range: Optional[Tuple[datetime, datetime]] = None,
504    properties: Optional[Dict] = None,
505    max_rows: Optional[int] = None,
506    columns: Optional[List[str]] = None,
507    exclude_geometry: bool = True,
508    catalog_path: str = 'local',
509    local_cache: bool = False,
510    data_dir: Optional[Union[str, Path]] = None,
511    show_progress: bool = False,
512) -> gpd.GeoDataFrame:
513    """
514    Query bedmap data from GeoParquet catalogs and return filtered data.
515
516    This function:
517    1. Uses rustac to query the STAC GeoParquet catalogs for matching items
518    2. Uses DuckDB for efficient partial reads with spatial/temporal filtering
519    3. Returns a GeoDataFrame with the requested data
520
521    Parameters
522    ----------
523    collections : list of str, optional
524        Filter by bedmap version (e.g., ['bedmap1', 'bedmap2', 'bedmap3'])
525    geometry : shapely geometry, optional
526        Spatial filter geometry
527    date_range : tuple of datetime, optional
528        Temporal filter (start_date, end_date)
529    properties : dict, optional
530        Additional property filters (e.g., {'institution': 'AWI'})
531    max_rows : int, optional
532        Maximum number of rows to return
533    columns : list of str, optional
534        Specific columns to retrieve
535    exclude_geometry : bool, default True
536        If True, don't create geometry column (keeps lat/lon as separate columns)
537    catalog_path : str, default 'local'
538        Catalog source: 'local' (cached, downloaded on first use),
539        'cloud' (direct from GCS), or a custom path/URL pattern.
540    local_cache : bool, default False
541        If True, use locally cached bedmap data files for faster queries.
542        Files will be downloaded automatically if not present.
543        Recommended for repeated queries in loops.
544    data_dir : str or Path, optional
545        Directory for local data cache. Defaults to radar_cache/bedmap/
546    show_progress : bool, default False
547        If True, show DuckDB progress bar during queries. Disabled by default
548        to avoid IOPub rate limit errors in Jupyter notebooks.
549
550    Returns
551    -------
552    geopandas.GeoDataFrame
553        GeoDataFrame with bedmap data
554
555    Examples
556    --------
557    >>> from shapely.geometry import box
558    >>> bbox = box(-20, -75, -10, -70)
559    >>> df = query_bedmap(
560    ...     geometry=bbox,
561    ...     date_range=(datetime(1994, 1, 1), datetime(1995, 12, 31)),
562    ...     collections=['bedmap2']
563    ... )
564
565    >>> # Use local cache for faster repeated queries
566    >>> for bbox in many_bboxes:
567    ...     df = query_bedmap(geometry=bbox, local_cache=True)
568    """
569    # If using local cache, bypass STAC catalog and query local files directly
570    if local_cache:
571        return _query_bedmap_cached(
572            collections=collections,
573            geometry=geometry,
574            date_range=date_range,
575            columns=columns,
576            max_rows=max_rows,
577            exclude_geometry=exclude_geometry,
578            data_dir=data_dir,
579            show_progress=show_progress,
580        )
581
582    # Query catalog for matching items using rustac
583    catalog_items = query_bedmap_catalog(
584        collections=collections,
585        geometry=geometry,
586        date_range=date_range,
587        properties=properties,
588        catalog_path=catalog_path,
589    )
590
591    if catalog_items.empty:
592        warnings.warn("No matching bedmap items found in catalog")
593        return gpd.GeoDataFrame()
594
595    print(f"Found {len(catalog_items)} matching files in catalog")
596
597    # Get data file URLs from the catalog items
598    # The rustac STAC GeoParquet format stores asset_href in the properties dict
599    parquet_urls = []
600    for idx, item in catalog_items.iterrows():
601        # Primary: check properties dict for asset_href
602        if 'properties' in item and isinstance(item['properties'], dict):
603            props = item['properties']
604            if 'asset_href' in props and props['asset_href']:
605                parquet_urls.append(props['asset_href'])
606                continue
607
608        # Fallback: check for assets dict (standard STAC structure)
609        if 'assets' in item and item['assets']:
610            assets = item['assets']
611            if isinstance(assets, dict):
612                for asset_key, asset_info in assets.items():
613                    if isinstance(asset_info, dict) and 'href' in asset_info:
614                        parquet_urls.append(asset_info['href'])
615                        break
616                    elif isinstance(asset_info, str):
617                        parquet_urls.append(asset_info)
618                        break
619
620        # Final fallback: check for asset_href column directly
621        if 'asset_href' in catalog_items.columns:
622            if item.get('asset_href'):
623                parquet_urls.append(item['asset_href'])
624
625    if not parquet_urls:
626        warnings.warn("No asset URLs found in catalog items")
627        return gpd.GeoDataFrame()
628
629    # Build and execute DuckDB query
630    print(f"Querying {len(parquet_urls)} GeoParquet files...")
631
632    query = build_duckdb_query(
633        parquet_urls=parquet_urls,
634        geometry=geometry,
635        date_range=date_range,
636        columns=columns,
637        max_rows=max_rows
638    )
639
640    conn = duckdb.connect()
641    try:
642        if not show_progress:
643            conn.execute("SET enable_progress_bar = false")
644        # Enable cloud storage and spatial extension support
645        conn.execute("INSTALL httpfs; LOAD httpfs;")
646        conn.execute("INSTALL spatial; LOAD spatial;")
647
648        result_df = conn.execute(query).df()
649    except Exception as e:
650        warnings.warn(f"Error executing DuckDB query: {e}")
651        return gpd.GeoDataFrame()
652    finally:
653        conn.close()
654
655    if result_df.empty:
656        return gpd.GeoDataFrame()
657
658    print(f"Retrieved {len(result_df):,} rows")
659
660    # Query already extracts lon/lat from geometry using ST_X/ST_Y
661    if 'lon' in result_df.columns and 'lat' in result_df.columns:
662        if not exclude_geometry:
663            # Create geometry from lon/lat
664            geometry = gpd.points_from_xy(result_df['lon'], result_df['lat'])
665            gdf = gpd.GeoDataFrame(result_df, geometry=geometry, crs='EPSG:4326')
666            return gdf
667        else:
668            # Just return with lon/lat columns (no geometry)
669            return gpd.GeoDataFrame(result_df)
670
671    return gpd.GeoDataFrame(result_df)

Query bedmap data from GeoParquet catalogs and return filtered data.

This function:

  1. Uses rustac to query the STAC GeoParquet catalogs for matching items
  2. Uses DuckDB for efficient partial reads with spatial/temporal filtering
  3. Returns a GeoDataFrame with the requested data
Parameters
  • collections (list of str, optional): Filter by bedmap version (e.g., ['bedmap1', 'bedmap2', 'bedmap3'])
  • geometry (shapely geometry, optional): Spatial filter geometry
  • date_range (tuple of datetime, optional): Temporal filter (start_date, end_date)
  • properties (dict, optional): Additional property filters (e.g., {'institution': 'AWI'})
  • max_rows (int, optional): Maximum number of rows to return
  • columns (list of str, optional): Specific columns to retrieve
  • exclude_geometry (bool, default True): If True, don't create geometry column (keeps lat/lon as separate columns)
  • catalog_path (str, default 'local'): Catalog source: 'local' (cached, downloaded on first use), 'cloud' (direct from GCS), or a custom path/URL pattern.
  • local_cache (bool, default False): If True, use locally cached bedmap data files for faster queries. Files will be downloaded automatically if not present. Recommended for repeated queries in loops.
  • data_dir (str or Path, optional): Directory for local data cache. Defaults to radar_cache/bedmap/
  • show_progress (bool, default False): If True, show DuckDB progress bar during queries. Disabled by default to avoid IOPub rate limit errors in Jupyter notebooks.
Returns
  • geopandas.GeoDataFrame: GeoDataFrame with bedmap data
Examples
>>> from shapely.geometry import box
>>> bbox = box(-20, -75, -10, -70)
>>> df = query_bedmap(
...     geometry=bbox,
...     date_range=(datetime(1994, 1, 1), datetime(1995, 12, 31)),
...     collections=['bedmap2']
... )
>>> # Use local cache for faster repeated queries
>>> for bbox in many_bboxes:
...     df = query_bedmap(geometry=bbox, local_cache=True)
def query_bedmap_catalog( collections: Optional[List[str]] = None, geometry: Optional[shapely.geometry.base.BaseGeometry] = None, date_range: Optional[Tuple[datetime.datetime, datetime.datetime]] = None, properties: Optional[Dict] = None, max_items: Optional[int] = None, exclude_geometry: bool = False, catalog_path: str = 'local') -> geopandas.geodataframe.GeoDataFrame:
349def query_bedmap_catalog(
350    collections: Optional[List[str]] = None,
351    geometry: Optional[shapely.geometry.base.BaseGeometry] = None,
352    date_range: Optional[Tuple[datetime, datetime]] = None,
353    properties: Optional[Dict] = None,
354    max_items: Optional[int] = None,
355    exclude_geometry: bool = False,
356    catalog_path: str = 'local',
357) -> gpd.GeoDataFrame:
358    """
359    Query GeoParquet STAC catalogs for matching bedmap items using rustac.
360
361    This function uses rustac's DuckdbClient to perform efficient spatial queries
362    on STAC GeoParquet catalogs, following the same pattern as
363    OPRConnection.query_frames().
364
365    Parameters
366    ----------
367    collections : list of str, optional
368        Filter by bedmap version (e.g., ['bedmap1', 'bedmap2', 'bedmap3'])
369    geometry : shapely geometry, optional
370        Spatial filter geometry
371    date_range : tuple of datetime, optional
372        Temporal filter (start_date, end_date)
373    properties : dict, optional
374        Additional property filters using CQL2 (e.g., {'institution': 'AWI'})
375    max_items : int, optional
376        Maximum number of items to return
377    exclude_geometry : bool, default False
378        If True, exclude geometry column from results
379    catalog_path : str, default 'local'
380        Catalog source: 'local' (cached, downloaded on first use),
381        'cloud' (direct from GCS), or a custom path/URL pattern.
382
383    Returns
384    -------
385    geopandas.GeoDataFrame
386        Matching catalog items with asset_href for data access
387    """
388    # Resolve catalog path
389    if catalog_path == 'local':
390        catalog_path = get_bedmap_catalog_path()
391    elif catalog_path == 'cloud':
392        catalog_path = 'gs://opr_stac/bedmap/bedmap*.parquet'
393
394    search_params = {}
395
396    # Exclude geometry
397    if exclude_geometry:
398        search_params['exclude'] = ['geometry']
399
400    # Handle collections (bedmap versions)
401    if collections is not None:
402        # Map collection names to their catalog patterns
403        collection_list = [collections] if isinstance(collections, str) else collections
404        search_params['collections'] = collection_list
405
406    # Handle geometry filtering
407    if geometry is not None:
408        if hasattr(geometry, '__geo_interface__'):
409            geom_dict = geometry.__geo_interface__
410        else:
411            geom_dict = geometry
412
413        # Fix geometries that cross the antimeridian
414        geom_dict = antimeridian.fix_geojson(geom_dict, reverse=True)
415
416        search_params['intersects'] = geom_dict
417
418    # Note: The bedmap STAC catalogs currently don't have datetime fields,
419    # so temporal filtering at the catalog level is not supported.
420    # Date filtering can be applied later in the DuckDB query step if the
421    # data files have timestamp columns.
422    if date_range is not None:
423        warnings.warn(
424            "Temporal filtering (date_range) is not supported at the catalog level "
425            "for bedmap data. The date_range parameter will be applied during "
426            "the data query step if timestamp columns are available.",
427            UserWarning
428        )
429
430    # Handle max_items
431    if max_items is not None:
432        search_params['limit'] = max_items
433    else:
434        search_params['limit'] = 1000000
435
436    # Handle property filters using CQL2
437    filter_conditions = []
438
439    if properties:
440        for key, value in properties.items():
441            if isinstance(value, list):
442                # Create OR conditions for multiple values
443                value_conditions = []
444                for v in value:
445                    value_conditions.append({
446                        "op": "=",
447                        "args": [{"property": key}, v]
448                    })
449                if len(value_conditions) == 1:
450                    filter_conditions.append(value_conditions[0])
451                else:
452                    filter_conditions.append({
453                        "op": "or",
454                        "args": value_conditions
455                    })
456            else:
457                filter_conditions.append({
458                    "op": "=",
459                    "args": [{"property": key}, value]
460                })
461
462    # Combine all filter conditions with AND
463    if filter_conditions:
464        if len(filter_conditions) == 1:
465            filter_expr = filter_conditions[0]
466        else:
467            filter_expr = {
468                "op": "and",
469                "args": filter_conditions
470            }
471        search_params['filter'] = filter_expr
472
473    # Perform the search using rustac
474    client = DuckdbClient()
475    items = client.search(catalog_path, **search_params)
476
477    if isinstance(items, dict):
478        items = items['features']
479
480    if not items or len(items) == 0:
481        warnings.warn("No items found matching the query criteria", UserWarning)
482        return gpd.GeoDataFrame()
483
484    # Convert to GeoDataFrame
485    items_df = gpd.GeoDataFrame(items)
486
487    # Set index
488    if 'id' in items_df.columns:
489        items_df = items_df.set_index(items_df['id'])
490        items_df.index.name = 'stac_item_id'
491
492    # Set the geometry column
493    if 'geometry' in items_df.columns and not exclude_geometry:
494        items_df = items_df.set_geometry(items_df['geometry'].apply(shapely.geometry.shape))
495        items_df.crs = "EPSG:4326"
496
497    return items_df

Query GeoParquet STAC catalogs for matching bedmap items using rustac.

This function uses rustac's DuckdbClient to perform efficient spatial queries on STAC GeoParquet catalogs, following the same pattern as OPRConnection.query_frames().

Parameters
  • collections (list of str, optional): Filter by bedmap version (e.g., ['bedmap1', 'bedmap2', 'bedmap3'])
  • geometry (shapely geometry, optional): Spatial filter geometry
  • date_range (tuple of datetime, optional): Temporal filter (start_date, end_date)
  • properties (dict, optional): Additional property filters using CQL2 (e.g., {'institution': 'AWI'})
  • max_items (int, optional): Maximum number of items to return
  • exclude_geometry (bool, default False): If True, exclude geometry column from results
  • catalog_path (str, default 'local'): Catalog source: 'local' (cached, downloaded on first use), 'cloud' (direct from GCS), or a custom path/URL pattern.
Returns
  • geopandas.GeoDataFrame: Matching catalog items with asset_href for data access
def query_bedmap_local( parquet_dir: Union[str, pathlib.Path], geometry: Optional[shapely.geometry.base.BaseGeometry] = None, date_range: Optional[Tuple[datetime.datetime, datetime.datetime]] = None, columns: Optional[List[str]] = None, max_rows: Optional[int] = None, exclude_geometry: bool = True) -> geopandas.geodataframe.GeoDataFrame:
776def query_bedmap_local(
777    parquet_dir: Union[str, Path],
778    geometry: Optional[shapely.geometry.base.BaseGeometry] = None,
779    date_range: Optional[Tuple[datetime, datetime]] = None,
780    columns: Optional[List[str]] = None,
781    max_rows: Optional[int] = None,
782    exclude_geometry: bool = True
783) -> gpd.GeoDataFrame:
784    """
785    Query bedmap data from local GeoParquet files.
786
787    Simplified version for querying local files without STAC catalog.
788
789    Parameters
790    ----------
791    parquet_dir : str or Path
792        Directory containing GeoParquet files
793    geometry : shapely geometry, optional
794        Spatial filter
795    date_range : tuple of datetime, optional
796        Temporal filter
797    columns : list of str, optional
798        Columns to select
799    max_rows : int, optional
800        Maximum rows to return
801    exclude_geometry : bool, default True
802        Whether to exclude geometry column
803
804    Returns
805    -------
806    geopandas.GeoDataFrame
807        Query results
808    """
809    parquet_dir = Path(parquet_dir)
810    parquet_files = sorted(parquet_dir.glob('*.parquet'))
811
812    if not parquet_files:
813        warnings.warn(f"No parquet files found in {parquet_dir}")
814        return gpd.GeoDataFrame()
815
816    parquet_urls = [str(f) for f in parquet_files]
817
818    query = build_duckdb_query(
819        parquet_urls=parquet_urls,
820        geometry=geometry,
821        date_range=date_range,
822        columns=columns,
823        max_rows=max_rows
824    )
825
826    conn = duckdb.connect()
827    try:
828        # Disable progress bar to avoid IOPub rate limit in notebooks
829        conn.execute("SET enable_progress_bar = false")
830        # Load spatial extension for ST_X/ST_Y functions
831        conn.execute("INSTALL spatial; LOAD spatial;")
832        result_df = conn.execute(query).df()
833    finally:
834        conn.close()
835
836    if result_df.empty:
837        return gpd.GeoDataFrame()
838
839    # Query already extracts lon/lat from geometry using ST_X/ST_Y
840    if 'lon' in result_df.columns and 'lat' in result_df.columns:
841        if not exclude_geometry:
842            # Create geometry from lon/lat
843            geometry = gpd.points_from_xy(result_df['lon'], result_df['lat'])
844            gdf = gpd.GeoDataFrame(result_df, geometry=geometry, crs='EPSG:4326')
845            return gdf
846        else:
847            # Just return with lon/lat columns (no geometry)
848            return gpd.GeoDataFrame(result_df)
849
850    return gpd.GeoDataFrame(result_df)

Query bedmap data from local GeoParquet files.

Simplified version for querying local files without STAC catalog.

Parameters
  • parquet_dir (str or Path): Directory containing GeoParquet files
  • geometry (shapely geometry, optional): Spatial filter
  • date_range (tuple of datetime, optional): Temporal filter
  • columns (list of str, optional): Columns to select
  • max_rows (int, optional): Maximum rows to return
  • exclude_geometry (bool, default True): Whether to exclude geometry column
Returns
  • geopandas.GeoDataFrame: Query results
def fetch_bedmap( version: str = 'all', data_dir: Union[str, pathlib.Path, NoneType] = None, force: bool = False) -> Dict[str, List[pathlib.Path]]:
 68def fetch_bedmap(
 69    version: str = 'all',
 70    data_dir: Optional[Union[str, Path]] = None,
 71    force: bool = False
 72) -> Dict[str, List[Path]]:
 73    """
 74    Download bedmap GeoParquet data files to local cache.
 75
 76    Reads the STAC catalog to get asset hrefs, then downloads all data files
 77    for the requested bedmap version(s).
 78
 79    Parameters
 80    ----------
 81    version : str, default 'all'
 82        Which bedmap version(s) to download:
 83        - 'all': Download all versions (bedmap1, bedmap2, bedmap3)
 84        - 'bedmap1', 'bedmap2', 'bedmap3': Download specific version
 85    data_dir : str or Path, optional
 86        Directory to store downloaded data files. Defaults to radar_cache/bedmap/
 87    force : bool, default False
 88        If True, re-download even if files already exist
 89
 90    Returns
 91    -------
 92    dict
 93        Mapping of version names to lists of local file paths
 94
 95    Examples
 96    --------
 97    >>> # Download all bedmap data
 98    >>> paths = fetch_bedmap()
 99    >>> print(len(paths['bedmap2']))  # Number of files downloaded
100    45
101
102    >>> # Download only bedmap3
103    >>> paths = fetch_bedmap(version='bedmap3')
104
105    >>> # Use with query_bedmap for faster repeated queries
106    >>> paths = fetch_bedmap()
107    >>> for bbox in many_bboxes:
108    ...     df = query_bedmap(geometry=bbox, local_cache=True)
109    """
110    # Data files go to radar_cache/bedmap/ (matches OPRConnection convention)
111    data_dir = Path(data_dir) if data_dir else DEFAULT_BEDMAP_DATA_DIR
112    data_dir.mkdir(parents=True, exist_ok=True)
113
114    # Determine which versions to download
115    valid_versions = list(BEDMAP_CATALOG_URLS.keys())
116    if version == 'all':
117        versions = valid_versions
118    elif version in BEDMAP_CATALOG_URLS:
119        versions = [version]
120    else:
121        raise ValueError(
122            f"Unknown version '{version}'. "
123            f"Valid options: 'all', {valid_versions}"
124        )
125
126    downloaded = {}
127    for ver in versions:
128        catalog_url = BEDMAP_CATALOG_URLS[ver]
129        print(f"{ver}: Reading STAC catalog to get data file locations...")
130
131        # Read catalog with DuckDB to extract asset_href column
132        https_catalog_url = _gs_to_https(catalog_url)
133        try:
134            conn = duckdb.connect()
135            conn.execute("SET enable_progress_bar = false")
136            query = f"SELECT asset_href FROM read_parquet('{https_catalog_url}')"
137            result = conn.execute(query).fetchall()
138            conn.close()
139            hrefs = [row[0] for row in result if row[0]]
140        except Exception as e:
141            warnings.warn(f"Failed to read catalog for {ver}: {e}")
142            continue
143
144        if not hrefs:
145            warnings.warn(f"No asset hrefs found in {ver} catalog")
146            continue
147
148        print(f"{ver}: Found {len(hrefs)} data files to download")
149
150        # Download each data file
151        downloaded_files = []
152        for href in hrefs:
153            filename = Path(href).name
154            local_path = data_dir / filename
155
156            if local_path.exists() and not force:
157                downloaded_files.append(local_path)
158                continue
159
160            if _download_file(href, local_path):
161                downloaded_files.append(local_path)
162                print(f"  Downloaded: {filename}")
163
164        downloaded[ver] = downloaded_files
165        print(f"{ver}: {len(downloaded_files)} files cached in {data_dir}")
166
167    return downloaded

Download bedmap GeoParquet data files to local cache.

Reads the STAC catalog to get asset hrefs, then downloads all data files for the requested bedmap version(s).

Parameters
  • version (str, default 'all'): Which bedmap version(s) to download:
    • 'all': Download all versions (bedmap1, bedmap2, bedmap3)
    • 'bedmap1', 'bedmap2', 'bedmap3': Download specific version
  • data_dir (str or Path, optional): Directory to store downloaded data files. Defaults to radar_cache/bedmap/
  • force (bool, default False): If True, re-download even if files already exist
Returns
  • dict: Mapping of version names to lists of local file paths
Examples
>>> # Download all bedmap data
>>> paths = fetch_bedmap()
>>> print(len(paths['bedmap2']))  # Number of files downloaded
45
>>> # Download only bedmap3
>>> paths = fetch_bedmap(version='bedmap3')
>>> # Use with query_bedmap for faster repeated queries
>>> paths = fetch_bedmap()
>>> for bbox in many_bboxes:
...     df = query_bedmap(geometry=bbox, local_cache=True)
def get_bedmap_cache_path(data_dir: Union[str, pathlib.Path, NoneType] = None) -> pathlib.Path:
170def get_bedmap_cache_path(
171    data_dir: Optional[Union[str, Path]] = None
172) -> Path:
173    """
174    Get path to cached bedmap data directory.
175
176    Parameters
177    ----------
178    data_dir : str or Path, optional
179        Data directory. Defaults to radar_cache/bedmap/
180
181    Returns
182    -------
183    Path
184        Path to the data directory containing cached parquet files
185    """
186    return Path(data_dir) if data_dir else DEFAULT_BEDMAP_DATA_DIR

Get path to cached bedmap data directory.

Parameters
  • data_dir (str or Path, optional): Data directory. Defaults to radar_cache/bedmap/
Returns
  • Path: Path to the data directory containing cached parquet files