Processing

The processing module contains the core aggregation pipeline: reading HDF5 data from S3, filtering spatially by Morton cell, computing statistics, and writing results to Zarr.

Pipeline

zagg.processing.process_morton_cell

process_morton_cell(
    parent_morton: int,
    parent_order: int,
    child_order: int,
    granule_urls: List[str],
    s3_credentials: dict,
    h5coro_driver=None,
    config: PipelineConfig | None = None,
    driver: str | None = None,
    grid=None,
) -> Tuple[DataFrame, ProcessingMetadata]

Deprecated HEALPix-flavored alias for process_shard.

Constructs a stateless HealpixGrid and forwards to process_shard.
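The alias pattern described above can be sketched as follows. Everything in this block is hypothetical: the HealpixGrid fields, the process_shard call shape, using the parent Morton index as the shard key, and emitting a DeprecationWarning are assumptions for illustration, not zagg's actual implementation.

```python
import warnings
from dataclasses import dataclass


# Hypothetical stand-in: the real HealpixGrid constructor is not documented here.
@dataclass(frozen=True)
class HealpixGrid:
    parent_order: int
    child_order: int


# Hypothetical stand-in: echoes its routing inputs instead of aggregating data.
def process_shard(shard_key, granule_urls, grid, **kwargs):
    return {"shard_key": shard_key, "grid": grid, "n_granules": len(granule_urls)}


def process_morton_cell(parent_morton, parent_order, child_order, granule_urls, **kwargs):
    """Deprecated alias: build a stateless grid, then forward to process_shard."""
    # Assumption: the alias warns; the docs only say it is deprecated.
    warnings.warn(
        "process_morton_cell is deprecated; use process_shard instead",
        DeprecationWarning,
        stacklevel=2,
    )
    grid = HealpixGrid(parent_order=parent_order, child_order=child_order)
    # Assumption: the parent Morton index doubles as the shard key.
    return process_shard(parent_morton, granule_urls, grid=grid, **kwargs)
```

Keeping the grid stateless means the alias adds no behavior of its own; callers migrating to process_shard only need to build the grid themselves.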

Statistics

zagg.processing.calculate_cell_statistics

calculate_cell_statistics(
    cell_data: dict[str, Any],
    value_col: str = "h_li",
    sigma_col: str = "s_li",
    config: PipelineConfig | None = None,
) -> dict

Calculate summary statistics for a cell, driven by pipeline config metadata.

User contract

The supported aggregation surface is anything expressible in numpy. Each agg field names a function that zagg.config.resolve_function turns into a callable: a bare name ("min", "nanmean") resolves to np.<name> via getattr(np, ...), an "np."-prefixed name resolves the same way, and a dotted path ("numpy.quantile") is resolved via import. This means the full numpy NaN-aware family (np.nanmean, np.nanvar, np.nanmax, np.nanmin, np.nansum, np.nanstd, np.nanmedian, …) can be used directly from the config template with no special-casing, and is reduced with numpy's own NaN semantics (see test_numpy_nan_aware_functions). The experimental handoff="arrow-kernel" path is an opt-in acceleration for the kernel-able subset only; it does not change or narrow this contract (see the EXPERIMENTAL block below).
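The three resolution rules in the user contract can be mirrored in a few lines. This is a minimal sketch that assumes only what that paragraph states; it is not zagg.config.resolve_function itself, and its handling of edge cases (such as unknown names) may differ.

```python
import importlib

import numpy as np


def resolve_function(name: str):
    """Sketch of name -> callable resolution (hypothetical reimplementation)."""
    if name.startswith("np."):
        # "np.nanmean" -> np.nanmean (checked first, since "np" is not importable)
        return getattr(np, name[len("np."):])
    if "." in name:
        # Dotted path such as "numpy.quantile": import the module, fetch the attribute.
        module_path, attr = name.rsplit(".", 1)
        return getattr(importlib.import_module(module_path), attr)
    # Bare name such as "nanmean" -> np.nanmean
    return getattr(np, name)


values = np.array([1.0, np.nan, 3.0])
print(resolve_function("nanmean")(values))  # NaN skipped
print(resolve_function("mean")(values))     # NaN propagates
```

Because a bare name is just getattr(np, name), "nanmean" and "mean" differ only in numpy's NaN handling: the former skips NaNs, the latter returns NaN.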

Parameters:

  • cell_data (dict[str, Any]) –

    Eval namespace for a single cell. Keys are column names; values are numpy arrays of equal length. May also carry chunk-level scalars injected by the per-chunk precompute hook (issue #30), which a per-cell expression can reference by name.

  • value_col (str, default: 'h_li' ) –

    Column name for elevation values.

  • sigma_col (str, default: 's_li' ) –

    Column name for uncertainty values.

  • config (PipelineConfig, default: None ) –

    Pipeline config to use for dispatch. Defaults to default_config().
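To illustrate cell_data as an eval namespace, the sketch below evaluates per-cell expressions against equal-length column arrays plus one chunk-level scalar. The scalar name chunk_median_h, the helper eval_cell_expression, and the use of Python's eval are assumptions for illustration; the actual expression mechanism in zagg is not described here.

```python
import numpy as np

# Hypothetical per-cell namespace: equal-length column arrays plus one
# chunk-level scalar of the kind a per-chunk precompute hook might inject.
cell_data = {
    "h_li": np.array([10.0, 12.0, 14.0]),
    "s_li": np.array([0.5, 0.5, 1.0]),
    "chunk_median_h": 11.0,  # hypothetical chunk-level scalar
}


def eval_cell_expression(expr: str, namespace: dict):
    # Column names and injected scalars resolve as local names; numpy is exposed
    # as "np". Emptying __builtins__ limits name lookup but is not a security sandbox.
    return eval(expr, {"__builtins__": {}, "np": np}, namespace)


anomaly = eval_cell_expression("h_li - chunk_median_h", cell_data)
# Inverse-variance weighted mean using the sigma column.
weighted = eval_cell_expression("np.sum(h_li / s_li**2) / np.sum(1 / s_li**2)", cell_data)
```

The chunk-level scalar broadcasts against the per-cell arrays, which is what lets one precomputed value be shared by every cell in the chunk.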

Returns:

  • dict

    Dictionary of statistics keyed by aggregation variable name.
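The shape of the returned dictionary can be sketched with a toy, config-like mapping. AGG_SPEC and cell_statistics are hypothetical: the real PipelineConfig schema is not shown on this page, so this only mirrors the idea of config-driven numpy dispatch producing one entry per aggregation variable.

```python
import numpy as np

# Hypothetical agg spec: output variable -> (numpy function name, input column).
AGG_SPEC = {
    "h_mean": ("nanmean", "h_li"),
    "h_min": ("nanmin", "h_li"),
    "h_max": ("nanmax", "h_li"),
    "s_median": ("nanmedian", "s_li"),
    "n": ("size", "h_li"),
}


def cell_statistics(cell_data: dict, agg_spec: dict) -> dict:
    stats = {}
    for var_name, (func_name, column) in agg_spec.items():
        func = getattr(np, func_name)  # bare name -> np.<name>
        stats[var_name] = func(cell_data[column])
    return stats


cell = {"h_li": np.array([10.0, np.nan, 14.0]), "s_li": np.array([0.5, 0.7, 1.0])}
stats = cell_statistics(cell, AGG_SPEC)
```

Note that "size" counts every element, NaN included, while the nan* reductions ignore the NaN; choosing between them is a config decision rather than special-cased code.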

Zarr I/O

zagg.processing.write_dataframe_to_zarr

write_dataframe_to_zarr(
    df_out, store: Store, *, grid, chunk_idx: tuple
) -> Store

Write a per-shard output carrier to an existing Zarr template.

Parameters:

  • df_out (DataFrame or Table) –

    Coordinate + data-variable columns. A pyarrow.Table is used when the config declares any vector field (issue #29): its FixedSizeList columns carry the per-cell trailing_shape payload, written to a Zarr array with a trailing dimension. Cell count must equal prod(grid.chunk_shape); cells are in the grid's canonical chunk order (grid.children(shard_key)).

  • store (Store) –

    Zarr-compatible store with the template already written.

  • grid (OutputGrid) –

    Grid the data was aggregated against. Provides group_path and chunk_shape for routing the write.

  • chunk_idx (tuple of int) –

    Storage block index for this shard, as returned by grid.block_index(shard_key).

Returns:

  • Store

    The same store, with data written.
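The write contract (cell count equal to prod(grid.chunk_shape), optional trailing dimension, block addressed by chunk_idx) can be sketched with numpy alone. The helpers shard_region and to_block are hypothetical, a plain numpy array stands in for the Zarr array, and the sketch assumes the grid's canonical chunk order is row-major within the block, which the real grid may not guarantee.

```python
import math

import numpy as np


def shard_region(chunk_idx, chunk_shape):
    # Assumes a regular grid: block i along an axis covers [i*c, (i+1)*c).
    return tuple(slice(i * c, (i + 1) * c) for i, c in zip(chunk_idx, chunk_shape))


def to_block(values, chunk_shape, trailing_shape=()):
    # values: one row per cell, in the grid's canonical chunk order.
    values = np.asarray(values)
    n_cells = math.prod(chunk_shape)
    if values.shape[0] != n_cells:
        raise ValueError(f"expected {n_cells} cells, got {values.shape[0]}")
    # Assumption: canonical chunk order matches a C-order (row-major) reshape.
    return values.reshape(tuple(chunk_shape) + tuple(trailing_shape))


chunk_shape, chunk_idx = (2, 3), (1, 4)
scalar_block = to_block(np.arange(6.0), chunk_shape)
# A vector field flattened to one row of length 2 per cell.
vector_block = to_block(np.arange(12.0).reshape(6, 2), chunk_shape, (2,))

# Stand-in for the Zarr array: write the block into its region.
full = np.zeros((4, 15))
full[shard_region(chunk_idx, chunk_shape)] = scalar_block
```

Checking the cell count before reshaping turns a silent misalignment into an explicit error, which matters because a wrong count would otherwise scatter values into the wrong cells.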