Processing
The processing module contains the core aggregation pipeline: reading HDF5 data from S3, filtering spatially by morton cell, computing statistics, and writing results to Zarr.
Pipeline
zagg.processing.process_morton_cell
process_morton_cell(
    parent_morton: int,
    parent_order: int,
    child_order: int,
    granule_urls: List[str],
    s3_credentials: dict,
    h5coro_driver=None,
    config: PipelineConfig | None = None,
    driver: str | None = None,
    catalog_metadata: dict | None = None,
) -> Tuple[DataFrame, ProcessingMetadata]
Process one parent morton cell: read data, calculate statistics, return DataFrame.
This is a cloud-agnostic function that processes HDF5 data and returns results as a DataFrame. The caller is responsible for writing the output.
Parameters:
- parent_morton (int) – Morton index of the parent cell.
- parent_order (int) – Order of the parent morton cell (e.g., 6 or 7).
- child_order (int) – Order of the child cells used for statistics (typically 12).
- granule_urls (list) – List of S3 URLs or file paths to process.
- s3_credentials (dict) – Credentials for accessing data. For the S3 driver: dict with accessKeyId, secretAccessKey, and sessionToken. For the HTTPS driver: dict with an edl_token key (bearer token string).
- h5coro_driver (class, default: None) – h5coro driver class. Overrides driver if provided.
- config (PipelineConfig, default: None) – Pipeline config. Defaults to default_config().
- driver (str, default: None) – Data access driver: "s3" or "https". Used to select the h5coro driver and rewrite URLs if needed. Defaults to config.data_source.driver or "s3".
- catalog_metadata (dict, default: None) – Catalog metadata containing s3_base and https_base for URL rewriting when using the HTTPS driver.
Returns:
- tuple – (DataFrame, metadata_dict)
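The driver-related parameters interact in a way that may be easier to follow in code. The following is a minimal sketch and not zagg's implementation: resolve_driver, build_credentials, and rewrite_url are hypothetical helpers, the bucket and host names are placeholders, and the prefix-swap rewriting rule is an assumption inferred from the catalog_metadata description. Only the dictionary keys (accessKeyId, secretAccessKey, sessionToken, edl_token, s3_base, https_base) and the "s3" fallback come from the parameter list above.

```python
from typing import Optional


def resolve_driver(driver: Optional[str], config_driver: Optional[str]) -> str:
    """Pick the driver name: explicit argument, then config value, then "s3"."""
    name = driver or config_driver or "s3"
    if name not in ("s3", "https"):
        raise ValueError(f"unsupported driver: {name!r}")
    return name


def build_credentials(driver: str, **values: str) -> dict:
    """Build a credentials dict in the shape documented for each driver."""
    if driver == "s3":
        keys = ("accessKeyId", "secretAccessKey", "sessionToken")
    else:
        keys = ("edl_token",)
    missing = [k for k in keys if k not in values]
    if missing:
        raise KeyError(f"missing credential fields: {missing}")
    return {k: values[k] for k in keys}


def rewrite_url(url: str, driver: str, catalog_metadata: Optional[dict]) -> str:
    """Assumed rule: swap the s3_base prefix for https_base under the HTTPS driver."""
    if driver != "https" or not catalog_metadata:
        return url
    s3_base = catalog_metadata["s3_base"]
    https_base = catalog_metadata["https_base"]
    if url.startswith(s3_base):
        return https_base + url[len(s3_base):]
    return url


# Placeholder values for illustration only.
catalog = {
    "s3_base": "s3://example-bucket/",
    "https_base": "https://data.example.org/",
}
driver = resolve_driver(None, "https")          # falls back to the config value
creds = build_credentials(driver, edl_token="PLACEHOLDER")
url = rewrite_url("s3://example-bucket/a/granule.h5", driver, catalog)
```

With the S3 driver the same URL would pass through unchanged, which is why catalog_metadata only matters when the HTTPS driver is selected.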
Statistics
zagg.processing.calculate_cell_statistics
calculate_cell_statistics(
    df_cell: DataFrame,
    value_col="h_li",
    sigma_col="s_li",
    config: PipelineConfig | None = None,
) -> dict
Calculate summary statistics for a cell, driven by pipeline config metadata.
Parameters:
- df_cell (DataFrame) – DataFrame containing observations for a single cell.
- value_col (str, default: 'h_li') – Column name for elevation values.
- sigma_col (str, default: 's_li') – Column name for uncertainty values.
- config (PipelineConfig, default: None) – Pipeline config to use for dispatch. Defaults to default_config().
Returns:
- dict – Dictionary of statistics keyed by aggregation variable name.
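To illustrate what "driven by pipeline config metadata" could look like, here is a small sketch of name-keyed dispatch over a single cell's observations. It is not the library's code: the AGGREGATIONS registry, the statistic names, and sketch_cell_statistics are all hypothetical, and the real set of statistics is defined by PipelineConfig, which this page does not show. Only the h_li and s_li column defaults come from the signature above.

```python
import numpy as np
import pandas as pd

# Hypothetical registry: aggregation variable name -> function of (values, sigmas).
# In the real pipeline this mapping would come from the pipeline config.
AGGREGATIONS = {
    "count": lambda v, s: int(v.size),
    "h_mean": lambda v, s: float(v.mean()),
    "h_min": lambda v, s: float(v.min()),
    "h_max": lambda v, s: float(v.max()),
    # Inverse-variance weighted mean, one common use of per-point uncertainties.
    "h_weighted_mean": lambda v, s: float(np.sum(v / s**2) / np.sum(1.0 / s**2)),
}


def sketch_cell_statistics(df_cell, value_col="h_li", sigma_col="s_li",
                           aggregations=AGGREGATIONS):
    """Return a dict of statistics keyed by aggregation variable name."""
    values = df_cell[value_col].to_numpy(dtype=float)
    sigmas = df_cell[sigma_col].to_numpy(dtype=float)
    return {name: func(values, sigmas) for name, func in aggregations.items()}


# Three made-up observations falling in one child cell.
df_cell = pd.DataFrame({"h_li": [10.0, 12.0, 14.0], "s_li": [1.0, 1.0, 2.0]})
stats = sketch_cell_statistics(df_cell)
```

Keeping the result as a plain dict keyed by variable name means a caller can assemble one row per child cell without knowing which statistics the config requested. The sketch does not handle empty cells.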
Zarr I/O
zagg.processing.write_dataframe_to_zarr
write_dataframe_to_zarr(
    df_out: DataFrame,
    store: Store,
    *,
    chunk_idx: int,
    child_order: int,
    parent_order: int,
) -> Store
Write a DataFrame to an existing Zarr store.
Parameters:
- df_out (DataFrame) – DataFrame with columns matching the pipeline config (coordinates + data variables).
- store (Store) – Zarr-compatible store (already contains the template).
- chunk_idx (int) – The chunk index for storing data.
- child_order (int) – Order of child cells.
- parent_order (int) – Order of parent cells.
Returns:
- Store – The same store, with data written.
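The three keyword-only arguments suggest how a DataFrame might map onto a region of the store. The sketch below rests on assumptions that this page does not confirm: a quadtree-style morton scheme in which each order splits a cell into four, one chunk per parent cell, and chunks laid out contiguously. The helpers children_per_parent and chunk_slice are hypothetical and are not part of zagg.

```python
def children_per_parent(parent_order: int, child_order: int) -> int:
    """Number of child cells inside one parent, assuming 4 children per order step."""
    if child_order < parent_order:
        raise ValueError("child_order must be >= parent_order")
    return 4 ** (child_order - parent_order)


def chunk_slice(*, chunk_idx: int, child_order: int, parent_order: int) -> slice:
    """Assumed layout: chunk i covers rows [i * n, (i + 1) * n) of the output array."""
    n = children_per_parent(parent_order, child_order)
    start = chunk_idx * n
    return slice(start, start + n)


# Example with the orders mentioned earlier on this page (parent 6, child 12).
region = chunk_slice(chunk_idx=3, child_order=12, parent_order=6)
```

As in the documented signature, the bare * makes chunk_idx, child_order, and parent_order keyword-only, so the two order arguments cannot be swapped by accident in a positional call.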