Steps and file I/O

During astro run, each step receives a StepContext and a list of AstroFile instances hydrated from the ingested run.

Step function signature

def step_copy(_ctx: StepContext, files: list[AstroFile]) -> None:
    file = files[0]
    # Eagerly read the file and write it to .working/{run_id}/processed/.
    file.save_to("processed", "establishments.parquet", file.load())

Steps must write outputs explicitly. Validation-only steps may call load() and raise without saving.

AstroFile I/O methods

| Method | Purpose |
| --- | --- |
| file.load() | Eager read from the active path (small files) |
| file.scan() | Lazy scan_parquet over the active path |
| file.sink(lf) | Streaming Parquet write to the active path |
| file.save_in_place(df) | Overwrite the ingested Parquet snapshot |
| file.save_in_place_lazy(lf) | Streaming overwrite of the ingested snapshot |
| file.save_to(subfolder, filename, df) | Write under .working/{run_id}/{subfolder}/ |
| file.save_to_lazy(subfolder, filename, lf) | Streaming write under a subfolder |
| file.row_count() | Metadata-only row count |
| file.iter_batches() | Iterate Parquet row batches for large-file logic |
| file.is_large_file() | Whether the active path exceeds the large-file threshold |

For large files, prefer scan() + sink() or save_in_place_lazy() over load(). See Large files.
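
The eager-versus-lazy choice can be made inside a step by branching on is_large_file(). The following is a minimal sketch, not the library's implementation: FileLike, rewrite_in_place, and RecordingFile are hypothetical names, the return types are assumed, and RecordingFile is an in-memory stand-in that only records which methods were called.

```python
from typing import Any, Callable, Protocol


class FileLike(Protocol):
    """Subset of the AstroFile methods listed above (return types are assumed)."""

    def is_large_file(self) -> bool: ...
    def load(self) -> Any: ...
    def scan(self) -> Any: ...
    def save_in_place(self, df: Any) -> None: ...
    def save_in_place_lazy(self, lf: Any) -> None: ...


def rewrite_in_place(file: FileLike, transform: Callable[[Any], Any]) -> str:
    """Apply transform, overwrite the snapshot, and return the path taken."""
    if file.is_large_file():
        # Large file: stay lazy so the whole table is never materialized.
        file.save_in_place_lazy(transform(file.scan()))
        return "lazy"
    # Small file: an eager read and write is simpler.
    file.save_in_place(transform(file.load()))
    return "eager"


class RecordingFile:
    """Hypothetical stand-in that records which methods were called."""

    def __init__(self, large: bool) -> None:
        self.large = large
        self.calls: list[str] = []

    def is_large_file(self) -> bool:
        return self.large

    def load(self) -> list[int]:
        self.calls.append("load")
        return [1, 2, 3]

    def scan(self) -> list[int]:
        self.calls.append("scan")
        return [1, 2, 3]

    def save_in_place(self, df: Any) -> None:
        self.calls.append("save_in_place")

    def save_in_place_lazy(self, lf: Any) -> None:
        self.calls.append("save_in_place_lazy")


big = RecordingFile(large=True)
small = RecordingFile(large=False)
print(rewrite_in_place(big, lambda data: data), big.calls)
print(rewrite_in_place(small, lambda data: data), small.calls)
```

In a real step the transform would need to work on both eager and lazy frames, which holds for expression-based Polars calls such as with_columns.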

StepContext

Each step receives a context with:

| Attribute | Purpose |
| --- | --- |
| pipeline_dir | Directory containing pipeline.py |
| run_directory | .working/{run_id}/ path |
| run_id | Current run identifier |
| run_date | Date assigned to the run |
| step_id | Current step identifier |
| logger | Step-scoped logger |
| quarantine | Quarantine collector (see Row quarantine) |
| stats | Statistics recorder (see Statistics) |
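
To illustrate how these attributes might be used together, here is a small sketch built on a stand-in context. FakeStepContext and describe_output are hypothetical, the attribute types (Path, str, date, logging.Logger) are assumptions, and the run identifier is made up. The quarantine and stats attributes are omitted because their interfaces are covered elsewhere.

```python
import logging
from dataclasses import dataclass
from datetime import date
from pathlib import Path


@dataclass(frozen=True)
class FakeStepContext:
    """Hypothetical stand-in with a subset of the attributes above; types are assumed."""

    pipeline_dir: Path
    run_directory: Path
    run_id: str
    run_date: date
    step_id: str
    logger: logging.Logger


def describe_output(ctx: FakeStepContext, subfolder: str, filename: str) -> Path:
    """Log the active run and step, and return where an output would land."""
    target = ctx.run_directory / subfolder / filename
    ctx.logger.info("run %s, step %s -> %s", ctx.run_id, ctx.step_id, target)
    return target


run_id = "2024-01-15-001"  # made-up identifier; the real format is not specified
ctx = FakeStepContext(
    pipeline_dir=Path("pipelines/establishments"),
    run_directory=Path(".working") / run_id,
    run_id=run_id,
    run_date=date(2024, 1, 15),
    step_id="step_copy",
    logger=logging.getLogger("astro.step_copy"),
)
target = describe_output(ctx, "processed", "establishments.parquet")
print(target.as_posix())
```

Steps that write through save_to() or save_to_lazy() should not need to build this path themselves. The sketch only shows how run_directory, run_id, and step_id relate to each other.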

Example: validation step

def step_validate(_ctx: StepContext, files: list[AstroFile]) -> None:
    file = files[0]
    dataframe = file.load()
    if dataframe.is_empty():
        raise ValueError("establishments file is empty")
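
When only the number of rows matters, the same check could use row_count(), which the table above describes as metadata-only, instead of loading the data. This is a sketch under assumptions: Countable, require_rows, and StubFile are hypothetical names, and row_count() is assumed to return an int.

```python
from typing import Protocol


class Countable(Protocol):
    """The one AstroFile method this check relies on (return type assumed)."""

    def row_count(self) -> int: ...


def require_rows(file: Countable, minimum: int = 1) -> int:
    """Raise if the file has fewer than `minimum` rows; otherwise return the count."""
    count = file.row_count()
    if count < minimum:
        raise ValueError(f"expected at least {minimum} rows, found {count}")
    return count


class StubFile:
    """Hypothetical stand-in that reports a fixed row count."""

    def __init__(self, rows: int) -> None:
        self._rows = rows

    def row_count(self) -> int:
        return self._rows


print(require_rows(StubFile(10)))

try:
    require_rows(StubFile(0))
except ValueError as error:
    print(error)
```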

Example: lazy transform

import polars as pl


def step_transform(_ctx: StepContext, files: list[AstroFile]) -> None:
    file = files[0]
    # Add a constant "stage" column lazily, then stream the result to disk.
    lazy_frame = file.scan().with_columns(pl.lit("processed").alias("stage"))
    file.save_to_lazy("processed", "establishments.parquet", lazy_frame)

Next steps