Steps and file I/O
During `astro run`, each step receives a `StepContext` and a list of `AstroFile` instances hydrated from the ingested run.
Step function signature

```python
def step_copy(_ctx: StepContext, files: list[AstroFile]) -> None:
    # Copy the first ingested file into the "processed" subfolder.
    file = files[0]
    file.save_to("processed", "establishments.parquet", file.load())
```
Steps must write their outputs explicitly; nothing is saved automatically. Validation-only steps may call `load()` and raise an exception without saving anything.
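To make this contract concrete, here is a minimal sketch of how a runner might call steps. It assumes nothing about astro's real internals: `FakeContext`, `FakeFile`, and `run_steps` are hypothetical stand-ins written for this illustration, and only the `(ctx, files) -> None` signature and the `load()` / `save_to()` call shapes are taken from this page.

```python
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class FakeContext:
    """Hypothetical stand-in for StepContext; carries nothing here."""


@dataclass
class FakeFile:
    """Hypothetical stand-in for AstroFile, backed by in-memory rows."""

    rows: list[dict[str, Any]]
    # Records every explicit write as (subfolder, filename) -> data.
    written: dict[tuple[str, str], Any] = field(default_factory=dict)

    def load(self) -> list[dict[str, Any]]:
        return list(self.rows)

    def save_to(self, subfolder: str, name: str, data: Any) -> None:
        self.written[(subfolder, name)] = data


Step = Callable[[FakeContext, list[FakeFile]], None]


def step_validate(_ctx: FakeContext, files: list[FakeFile]) -> None:
    # Validation-only: reads, may raise, never saves.
    if not files[0].load():
        raise ValueError("establishments file is empty")


def step_copy(_ctx: FakeContext, files: list[FakeFile]) -> None:
    file = files[0]
    file.save_to("processed", "establishments.parquet", file.load())


def run_steps(steps: list[Step], files: list[FakeFile]) -> None:
    """Call each step in order; an exception from a step stops this loop."""
    ctx = FakeContext()
    for step in steps:
        step(ctx, files)


good = FakeFile(rows=[{"id": 1}])
run_steps([step_validate, step_copy], [good])

empty = FakeFile(rows=[])
try:
    run_steps([step_validate, step_copy], [empty])
    failed = False
except ValueError:
    failed = True
```

In this sketch the only output that exists after a run is what a step saved itself: `good.written` holds one entry, while `empty.written` stays empty because validation raised before the copy step ran.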
AstroFile I/O methods

| Method | Purpose |
|---|---|
|  | Eager read from the active path (small files) |
|  | Lazy |
|  | Streaming Parquet write to the active path |
|  | Overwrite the ingested Parquet snapshot |
|  | Streaming overwrite of the ingested snapshot |
|  | Write under |
|  | Streaming write under a subfolder |
|  | Metadata-only row count |
|  | Iterate Parquet row batches for large-file logic |
|  | Whether the active path exceeds the large-file threshold |
For large files, prefer `scan()` + `sink()` or `save_in_place_lazy()` over `load()`. See Large files.
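The reason batch-wise logic suits large files can be sketched without Parquet at all. The example below is illustrative only: `iter_batches` and `count_missing_names` are hypothetical helpers, not part of `AstroFile`, and plain Python lists stand in for Parquet row batches. It shows that only one batch needs to be held in memory at a time.

```python
from collections.abc import Iterable, Iterator
from itertools import islice
from typing import Any

Row = dict[str, Any]


def iter_batches(rows: Iterable[Row], batch_size: int) -> Iterator[list[Row]]:
    """Yield successive lists of at most batch_size rows (hypothetical helper)."""
    iterator = iter(rows)
    # islice pulls the next batch_size rows; an empty list ends the loop.
    while batch := list(islice(iterator, batch_size)):
        yield batch


def count_missing_names(rows: Iterable[Row], batch_size: int = 2) -> int:
    """Count rows with no name, holding only one batch in memory at a time."""
    missing = 0
    for batch in iter_batches(rows, batch_size):
        missing += sum(1 for row in batch if not row.get("name"))
    return missing


# A generator stands in for a file too large to load eagerly.
source = ({"id": i, "name": "" if i % 3 == 0 else f"site-{i}"} for i in range(7))
missing_total = count_missing_names(source)
```

An eager equivalent would first build the full list of rows and then count, which is fine for small inputs but scales memory use with file size.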
StepContext

Each step receives a context with:

| Attribute | Purpose |
|---|---|
|  | Directory containing |
|  |  |
|  | Current run identifier |
|  | Date assigned to the run |
|  | Current step identifier |
|  | Step-scoped logger |
|  | Quarantine collector (see Row quarantine) |
|  | Statistics recorder (see Statistics) |
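As a rough illustration of how a step might use run metadata and a step-scoped logger, the sketch below defines a stand-in `DemoContext`. The field names (`run_id`, `run_date`, `step_id`, `logger`) and the `astro-demo` logger prefix are assumptions made for this example, not astro's actual `StepContext` attributes; only the standard `logging` module is real API here.

```python
import logging
from dataclasses import dataclass
from datetime import date


@dataclass(frozen=True)
class DemoContext:
    """Hypothetical stand-in; field names are assumptions, not astro's API."""

    run_id: str
    run_date: date
    step_id: str

    @property
    def logger(self) -> logging.Logger:
        # Dotted names place this logger under "astro-demo.<run_id>", so
        # handlers attached to that run-level logger also see step records.
        return logging.getLogger(f"astro-demo.{self.run_id}.{self.step_id}")


def step_report(ctx: DemoContext, files: list[str]) -> None:
    """Log a summary line through the step's own logger."""
    ctx.logger.info("run %s (%s): %d file(s)", ctx.run_id, ctx.run_date, len(files))


ctx = DemoContext(run_id="run-001", run_date=date(2024, 1, 31), step_id="validate")
step_report(ctx, ["establishments.parquet"])
```

Deriving the logger name from the run and step identifiers is one plausible design; it lets log output be filtered per run or per step without passing extra arguments around.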
Example: validation step

```python
def step_validate(_ctx: StepContext, files: list[AstroFile]) -> None:
    file = files[0]
    # Eager read; acceptable here because only emptiness is checked.
    dataframe = file.load()
    if dataframe.is_empty():
        raise ValueError("establishments file is empty")
```
Example: lazy transform

```python
import polars as pl


def step_transform(_ctx: StepContext, files: list[AstroFile]) -> None:
    file = files[0]
    # Build the query lazily; nothing is read until the result is written.
    lazy_frame = file.scan().with_columns(pl.lit("processed").alias("stage"))
    file.save_to_lazy("processed", "establishments.parquet", lazy_frame)
```
Next steps
Row filtering — declarative row filters
Row quarantine — isolate invalid rows without aborting