Statistics
Pipeline runs record numeric statistics scoped to a run, file, or step, keyed by an action name.
Each update:
- Logs an INFO line to astro.stats (visible in console, dashboard, and astro.log)
- Upserts the value in SQLite (later calls replace the prior value for the same scope/subject/action)
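The replace-on-repeat behaviour can be sketched with the standard sqlite3 module. The table layout and the upsert_stat helper below are assumptions made for illustration, not astro's actual schema or code; only the column names are taken from this page.

```python
import sqlite3

# Hypothetical schema: at most one row per (run_id, scope, subject, action).
SCHEMA = """
CREATE TABLE IF NOT EXISTS statistics (
    run_id  TEXT NOT NULL,
    scope   TEXT NOT NULL,
    subject TEXT NOT NULL,
    action  TEXT NOT NULL,
    value   REAL NOT NULL,
    PRIMARY KEY (run_id, scope, subject, action)
)
"""


def upsert_stat(conn, run_id, scope, subject, action, value):
    """Insert a statistic, replacing any prior value stored under the same key."""
    conn.execute(
        "INSERT OR REPLACE INTO statistics (run_id, scope, subject, action, value) "
        "VALUES (?, ?, ?, ?, ?)",
        (run_id, scope, subject, action, value),
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)
upsert_stat(conn, "abc12", "step", "validate", "rows_quarantined", 3)
# Same run, scope, subject, and action: the earlier value is overwritten.
upsert_stat(conn, "abc12", "step", "validate", "rows_quarantined", 5)
rows = conn.execute("SELECT value FROM statistics").fetchall()
```

After both calls the table holds a single row for that key, carrying the later value.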
Log format:
STAT run=abc12 scope=step subject=validate action=rows_quarantined value=3
Run-scoped statistics use subject=- in log output.
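When post-processing astro.log, a line in this format can be split back into its fields. The following is a minimal sketch, assuming that no field value contains whitespace and that values are numeric; parse_stat_line is a hypothetical helper, not part of astro.

```python
import re

# Matches the key=value portion of a STAT line in the format shown above.
STAT_PATTERN = re.compile(
    r"STAT run=(?P<run>\S+) scope=(?P<scope>\S+) subject=(?P<subject>\S+) "
    r"action=(?P<action>\S+) value=(?P<value>\S+)"
)


def parse_stat_line(line):
    """Return the fields of a STAT log line as a dict, or None if it does not match."""
    match = STAT_PATTERN.search(line)
    if match is None:
        return None
    fields = match.groupdict()
    # Run-scoped statistics are logged with subject=-; represent that as None.
    if fields["subject"] == "-":
        fields["subject"] = None
    fields["value"] = float(fields["value"])
    return fields


example = "STAT run=abc12 scope=step subject=validate action=rows_quarantined value=3"
parsed = parse_stat_line(example)
```

Using search rather than match lets the pattern tolerate whatever timestamp or level prefix the logger puts in front of STAT.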
Step API
Each StepContext exposes a stats recorder:
def step_transform(ctx: StepContext, files: list[AstroFile]) -> None:
    file = files[0]
    dataframe = file.load()
    ctx.stats.record_file(file.spec.__class__.ingest_name, "rows_read", dataframe.height)
    ctx.stats.record_run("custom_counter", 1)
    ctx.stats.record_step("rows_written", dataframe.height)
    file.save_in_place(dataframe)
| Method | Scope |
|---|---|
| record_run | Run |
| record_file | File (ingest name) |
| record_step | Current step |
StatisticsRecorder and StatScope are also exported from astro for use outside step functions.
Built-in statistics
| Phase | Scope | Action | When recorded |
|---|---|---|---|
| Ingest | file | | After each file materializes |
| Ingest | run | | After successful ingest |
| Ingest | run | | On ingest failure |
| Run | step | | After each step executes |
| Run | step | | When a step quarantines rows |
| Run | file | | After a filter step processes a file |
| Run | step | | After a filter step (total removed) |
| Run | run | | When run finishes |
| Run | run | | When run finishes |
| Run | run | | When run finishes with quarantined steps |
SQLite storage
Statistics are stored in {pipeline_dir}/.astro/stats.db:
- runs: run_id, pipeline_name, status, source_directory, created_at, ingested_at
- ingest_files: per-file row/column counts, source path, parquet path, source size
- statistics: generic metrics with run_id, scope, subject, action, value, recorded_at
See Working directory layout for the full directory layout.
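Because the statistics table is plain SQLite, recorded values can be read back with the standard sqlite3 module. The sketch below relies only on the table and column names listed above; the column types, the stats_for_run helper, and the sample row are assumptions for illustration. It builds a stand-in in-memory database so that it runs on its own; against a real pipeline the connection would point at {pipeline_dir}/.astro/stats.db.

```python
import sqlite3


def stats_for_run(conn, run_id):
    """Return {(scope, subject, action): value} for a single run."""
    cursor = conn.execute(
        "SELECT scope, subject, action, value FROM statistics WHERE run_id = ?",
        (run_id,),
    )
    return {(scope, subject, action): value for scope, subject, action, value in cursor}


# Stand-in database with assumed column types and one made-up sample row.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE statistics "
    "(run_id TEXT, scope TEXT, subject TEXT, action TEXT, value REAL, recorded_at TEXT)"
)
conn.execute(
    "INSERT INTO statistics VALUES "
    "('abc12', 'step', 'validate', 'rows_quarantined', 3, '2024-01-01T00:00:00')"
)
result = stats_for_run(conn, "abc12")
```

Keying the result by (scope, subject, action) mirrors the upsert key described earlier, so each entry corresponds to exactly one stored statistic.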