Processing
The processing module contains the core aggregation pipeline: reading HDF5 data from S3, spatially filtering by Morton cell, computing statistics, and writing results to Zarr.
Pipeline
zagg.processing.process_morton_cell
process_morton_cell(
    parent_morton: int,
    parent_order: int,
    child_order: int,
    granule_urls: List[str],
    s3_credentials: dict,
    h5coro_driver=None,
    config: PipelineConfig | None = None,
    driver: str | None = None,
    grid=None,
) -> Tuple[DataFrame, ProcessingMetadata]
Deprecated HEALPix-flavored alias for process_shard.
Constructs a stateless HealpixGrid and forwards the call to process_shard.
Statistics
zagg.processing.calculate_cell_statistics
calculate_cell_statistics(
    cell_data: dict[str, Any],
    value_col: str = "h_li",
    sigma_col: str = "s_li",
    config: PipelineConfig | None = None,
) -> dict
Calculate summary statistics for a cell, driven by pipeline config metadata.
User contract
The supported aggregation surface is anything expressible in numpy. Each
agg field names a function that zagg.config.resolve_function turns into a
callable: a bare name ("min", "nanmean") resolves to np.<name> via
getattr(np, ...), an "np."-prefixed name resolves the same way, and a
dotted path ("numpy.quantile") resolves via import. The full numpy
NaN-aware family (np.nanmean, np.nanvar, np.nanmax, np.nanmin, np.nansum,
np.nanstd, np.nanmedian, …) is therefore usable directly from the config
template with no special-casing, and is reduced with numpy's own NaN
semantics (see test_numpy_nan_aware_functions).
The experimental handoff="arrow-kernel" path is an opt-in acceleration
for the kernel-able subset only; it does not change or narrow this contract
(see the EXPERIMENTAL block below).
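The three resolution rules described above can be sketched roughly as follows. This is an illustrative stand-in, not the actual body of zagg.config.resolve_function: the name resolve_function_sketch and the order of the checks are assumptions made for the example.

```python
import importlib
from typing import Callable

import numpy as np


def resolve_function_sketch(name: str) -> Callable:
    """Hypothetical stand-in for zagg.config.resolve_function."""
    if name.startswith("np."):
        # "np.nanmean" -> look up "nanmean" on the numpy module
        return getattr(np, name[len("np."):])
    if "." in name:
        # "numpy.quantile" -> import "numpy", then fetch "quantile"
        module_path, _, attr = name.rpartition(".")
        return getattr(importlib.import_module(module_path), attr)
    # Bare name: "min" -> np.min
    return getattr(np, name)


values = np.array([1.0, np.nan, 3.0])
nan_mean = resolve_function_sketch("nanmean")(values)        # NaN is ignored
plain_max = resolve_function_sketch("np.max")(values)        # NaN propagates
median = resolve_function_sketch("numpy.nanmedian")(values)  # dotted path
```

Because every branch ends in an ordinary numpy callable, the NaN behaviour of each statistic is whatever numpy defines for that function, which is why the choice between "max" and "nanmax" in a config matters.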
Parameters:
- cell_data (dict[str, Any]) – Eval namespace for a single cell. Keys are column names; values are numpy arrays of equal length. May also carry chunk-level scalars injected by the per-chunk precompute hook (issue #30), which a per-cell expression can reference by name.
- value_col (str, default: 'h_li') – Column name for elevation values.
- sigma_col (str, default: 's_li') – Column name for uncertainty values.
- config (PipelineConfig, default: None) – Pipeline config to use for dispatch. Defaults to default_config().
Returns:
- dict – Dictionary of statistics keyed by aggregation variable name.
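As a rough illustration of the input and output shapes, the sketch below reduces a cell_data namespace into a dictionary keyed by aggregation variable name. The AGGS_SKETCH mapping and the cell_statistics_sketch function are hypothetical: the real PipelineConfig schema is not shown on this page and may look quite different.

```python
from typing import Any

import numpy as np

# Hypothetical aggregation spec: output name -> (numpy function name, source column).
# This is NOT the real PipelineConfig layout, only a stand-in for the example.
AGGS_SKETCH = {
    "h_mean": ("nanmean", "h_li"),
    "h_min": ("nanmin", "h_li"),
    "s_max": ("nanmax", "s_li"),
}


def cell_statistics_sketch(
    cell_data: dict[str, Any], aggs: dict[str, tuple[str, str]]
) -> dict:
    """Reduce each named column of one cell with the numpy function the spec names."""
    stats = {}
    for out_name, (func_name, column) in aggs.items():
        func = getattr(np, func_name)  # bare-name resolution against numpy
        stats[out_name] = func(cell_data[column])
    return stats


# One cell: equal-length numpy arrays keyed by column name.
cell = {
    "h_li": np.array([10.0, np.nan, 14.0]),
    "s_li": np.array([0.1, 0.3, 0.2]),
}
stats = cell_statistics_sketch(cell, AGGS_SKETCH)
```

The returned dictionary has one entry per aggregation variable, which matches the return contract documented above.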
Zarr I/O
zagg.processing.write_dataframe_to_zarr
Write a per-shard output carrier to an existing Zarr template.
Parameters:
- df_out (DataFrame or Table) – Coordinate + data-variable columns. A pyarrow.Table is used when the config declares any vector field (issue #29): its FixedSizeList columns carry the per-cell trailing_shape payload, written to a Zarr array with a trailing dimension. Cell count must equal prod(grid.chunk_shape); cells are in the grid's canonical chunk order (grid.children(shard_key)).
- store (Store) – Zarr-compatible store with the template already written.
- grid (OutputGrid) – Grid the data was aggregated against. Provides group_path and chunk_shape for routing the write.
- chunk_idx (tuple of int) – Storage block index for this shard, as returned by grid.block_index(shard_key).
Returns:
- Store – The same store, with data written.
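The relationship between chunk_idx, chunk_shape, and the cell-count requirement can be sketched with plain numpy arrays standing in for the Zarr arrays. The helper names block_slices and write_block_sketch are hypothetical, and the sketch assumes the canonical chunk order is row-major, which this page does not confirm.

```python
import math

import numpy as np


def block_slices(chunk_idx, chunk_shape):
    """Turn a storage block index into the slices that block occupies."""
    return tuple(slice(i * n, (i + 1) * n) for i, n in zip(chunk_idx, chunk_shape))


def write_block_sketch(target, values, chunk_idx, chunk_shape):
    """Write one shard's per-cell values into its block of `target`."""
    values = np.asarray(values)
    n_cells = math.prod(chunk_shape)
    if values.shape[0] != n_cells:
        raise ValueError(f"expected {n_cells} cells, got {values.shape[0]}")
    # Any axes after the cell axis are the per-cell trailing shape (vector fields).
    trailing = values.shape[1:]
    # Assumption: cells arrive in row-major order within the chunk.
    target[block_slices(chunk_idx, chunk_shape)] = values.reshape(*chunk_shape, *trailing)
    return target


chunk_shape = (4,)
chunk_idx = (1,)  # second block along the only axis

# Scalar field: one value per cell.
scalar_target = np.full((8,), np.nan)
write_block_sketch(scalar_target, np.arange(4.0), chunk_idx, chunk_shape)

# Vector field: three values per cell, stored with a trailing dimension.
vector_target = np.full((8, 3), np.nan)
vector_values = np.arange(12.0).reshape(4, 3)
write_block_sketch(vector_target, vector_values, chunk_idx, chunk_shape)
```

Only the block addressed by chunk_idx is touched, so the rest of the pre-written template keeps its fill value.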