Processing

The processing module contains the core aggregation pipeline: reading HDF5 data from S3, spatially filtering by Morton cell, computing statistics, and writing results to Zarr.

Pipeline

zagg.processing.process_morton_cell

process_morton_cell(
    parent_morton: int,
    parent_order: int,
    child_order: int,
    granule_urls: List[str],
    s3_credentials: dict,
    h5coro_driver=None,
    config: PipelineConfig | None = None,
    driver: str | None = None,
    catalog_metadata: dict | None = None,
) -> Tuple[DataFrame, ProcessingMetadata]

Process one parent Morton cell: read data, calculate statistics, and return a DataFrame.

This is a cloud-agnostic function that processes HDF5 data and returns results as a DataFrame. The caller is responsible for writing the output.

Parameters:

  • parent_morton (int) –

    Morton index of parent cell

  • parent_order (int) –

    Order of the parent Morton cell (e.g., 6 or 7)

  • child_order (int) –

    Order of child cells for statistics (typically 12)

  • granule_urls (List[str]) –

    List of S3 URLs or file paths to process

  • s3_credentials (dict) –

    Credentials for accessing data. For S3 driver: dict with accessKeyId/secretAccessKey/sessionToken. For HTTPS driver: dict with edl_token key (bearer token string).

  • h5coro_driver (class, default: None ) –

    h5coro driver class. If provided, it overrides the driver parameter.

  • config (PipelineConfig, default: None ) –

    Pipeline config. Defaults to default_config().

  • driver (str, default: None ) –

    Data access driver: "s3" or "https". Used to select h5coro driver and rewrite URLs if needed. Defaults to config.data_source.driver or "s3".

  • catalog_metadata (dict, default: None ) –

    Catalog metadata containing s3_base and https_base for URL rewriting when using the HTTPS driver.

Returns:

  • tuple

    (DataFrame, ProcessingMetadata)
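The parent_order/child_order parameters describe a hierarchical refinement: each parent cell is subdivided into finer child cells for statistics. The sketch below illustrates the arithmetic under the assumption of a standard quadtree Morton (Z-order) scheme, where each increase in order splits every cell into 4 children; the helper name `child_cell_range` is hypothetical and the actual encoding used by zagg may differ.

```python
def child_cell_range(parent_morton: int, parent_order: int, child_order: int):
    """Return (first, count) of child Morton indices under a parent cell,
    assuming a quadtree Z-order encoding (2 bits per level)."""
    levels = child_order - parent_order
    first = parent_morton << (2 * levels)   # each level shifts the index by 2 bits
    count = 4 ** levels                     # quadtree: 4 children per level
    return first, count

# A parent cell at order 6 refined to the typical child order 12
# covers 4**6 = 4096 child cells.
first, count = child_cell_range(parent_morton=5, parent_order=6, child_order=12)
print(first, count)  # → 20480 4096
```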

Statistics

zagg.processing.calculate_cell_statistics

calculate_cell_statistics(
    df_cell: DataFrame,
    value_col="h_li",
    sigma_col="s_li",
    config: PipelineConfig | None = None,
) -> dict

Calculate summary statistics for a cell, driven by pipeline config metadata.

Parameters:

  • df_cell (DataFrame) –

    DataFrame containing observations for a single cell

  • value_col (str, default: 'h_li' ) –

    Column name for elevation values

  • sigma_col (str, default: 's_li' ) –

    Column name for uncertainty values

  • config (PipelineConfig, default: None ) –

    Pipeline config to use for dispatch. Defaults to default_config().

Returns:

  • dict

    Dictionary of statistics keyed by aggregation variable name
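The exact set of statistics is dispatched from the pipeline config, so it is not fixed here. As a rough illustration of the shape of the output, the sketch below computes a count, mean, and inverse-variance weighted mean from the documented default columns (h_li values, s_li uncertainties); the function `sketch_cell_statistics` is a hypothetical stand-in, not the library's implementation.

```python
import numpy as np
import pandas as pd

def sketch_cell_statistics(df_cell: pd.DataFrame,
                           value_col: str = "h_li",
                           sigma_col: str = "s_li") -> dict:
    """Minimal stand-in for calculate_cell_statistics: count, mean, and an
    inverse-variance weighted mean. The real statistics are config-driven."""
    values = df_cell[value_col].to_numpy(dtype=float)
    sigmas = df_cell[sigma_col].to_numpy(dtype=float)
    weights = 1.0 / sigmas**2                 # inverse-variance weights
    return {
        "count": int(values.size),
        "mean": float(values.mean()),
        "weighted_mean": float(np.sum(weights * values) / np.sum(weights)),
    }

df = pd.DataFrame({"h_li": [10.0, 20.0], "s_li": [1.0, 1.0]})
stats = sketch_cell_statistics(df)
print(stats)  # → {'count': 2, 'mean': 15.0, 'weighted_mean': 15.0}
```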

Zarr I/O

zagg.processing.write_dataframe_to_zarr

write_dataframe_to_zarr(
    df_out: DataFrame,
    store: Store,
    *,
    chunk_idx: int,
    child_order: int,
    parent_order: int,
) -> Store

Write a DataFrame to an existing Zarr store.

Parameters:

  • df_out (DataFrame) –

    DataFrame with columns matching the pipeline config (coordinates + data variables)

  • store (Store) –

    Zarr-compatible store (already contains template)

  • chunk_idx (int) –

    The chunk index for storing data

  • child_order (int) –

    Order of child cells

  • parent_order (int) –

    Order of parent cells

Returns:

  • Store

    The same store, with data written
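To illustrate how chunk_idx, parent_order, and child_order could together determine where a parent cell's results land, the sketch below assumes each chunk holds one parent cell's 4**(child_order - parent_order) child values in a flat array. A NumPy array stands in for the Zarr store, and the helper `write_chunk` is hypothetical; the real layout is defined by the store's pre-written template.

```python
import numpy as np

def write_chunk(store_array: np.ndarray, values: np.ndarray,
                chunk_idx: int, parent_order: int, child_order: int) -> np.ndarray:
    """Place one parent cell's child values at its chunk offset,
    assuming one chunk per parent cell in a flat 1-D layout."""
    chunk_size = 4 ** (child_order - parent_order)   # child cells per parent cell
    if values.size != chunk_size:
        raise ValueError("values must fill exactly one chunk")
    start = chunk_idx * chunk_size                   # flat offset of this chunk
    store_array[start:start + chunk_size] = values
    return store_array

# Two parent cells at order 1, refined to order 2 -> 4 child values each.
store = np.full(8, np.nan)
write_chunk(store, np.arange(4.0), chunk_idx=1, parent_order=1, child_order=2)
print(store)  # → [nan nan nan nan  0.  1.  2.  3.]
```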