Defining pipelines

Pipelines declare ingest configuration and register ordered run steps in a pipeline.py module.

Pipeline contract

When Astro runs in a directory containing pipeline.py, it discovers and loads that module. The module must export a pipeline object implementing the Pipeline interface.
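The discovery step described above can be pictured with the standard library's importlib. This is only a sketch of the idea, not Astro's actual loader: the function names load_pipeline_module and discover_pipeline are hypothetical, and the check for the Pipeline interface is left out.

```python
import importlib.util
from pathlib import Path
from types import ModuleType


def load_pipeline_module(directory: Path) -> ModuleType:
    """Import pipeline.py from `directory` as a standalone module."""
    path = directory / "pipeline.py"
    if not path.is_file():
        raise FileNotFoundError(f"No pipeline.py found in {directory}")
    spec = importlib.util.spec_from_file_location("pipeline", path)
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot load {path}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # runs the module's top-level code
    return module


def discover_pipeline(directory: Path) -> object:
    """Return the module-level `pipeline` object, or fail with a clear error."""
    module = load_pipeline_module(directory)
    try:
        return module.pipeline
    except AttributeError:
        raise AttributeError("pipeline.py must export a `pipeline` object") from None
```

Calling discover_pipeline(Path.cwd()) in a project directory would return whatever object the module assigned to pipeline.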

Configuration attributes

Attribute                    Purpose
ingest_files                 Expected source file patterns and Pandera schemas for CLI ingest
execution_mode               serial or parallel ingest concurrency rules
step_execution_mode          serial (default) or parallel run-step scheduling
max_parallel_workers         Cap on concurrent run steps when parallel (defaults to min(32, cpu_count + 4))
large_file_threshold_bytes   Size above which batched ingest/filter/quarantine paths are used (default 100 MB)
ingest_batch_size            CSV rows per batch during large-file ingest (default 100,000)
run_batch_size               Parquet rows per batch during filter steps and iter_batches() (default 100,000)
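To illustrate how large_file_threshold_bytes and ingest_batch_size might interact, the sketch below checks a file's size against a threshold and reads a CSV in fixed-size batches using only the standard library. It is not Astro's ingest code. The helper names is_large_file and iter_csv_batches are hypothetical, and reading "100MB" as 100 * 1024 * 1024 bytes is an assumption.

```python
import csv
import os
from itertools import islice
from typing import Iterator

# Assumption: "100MB" is read here as 100 * 1024 * 1024 bytes.
LARGE_FILE_THRESHOLD_BYTES = 100 * 1024 * 1024
INGEST_BATCH_SIZE = 100_000


def is_large_file(path: str, threshold: int = LARGE_FILE_THRESHOLD_BYTES) -> bool:
    """True when the file is strictly larger than the threshold."""
    return os.path.getsize(path) > threshold


def iter_csv_batches(
    path: str, batch_size: int = INGEST_BATCH_SIZE
) -> Iterator[list[dict[str, str]]]:
    """Yield lists of at most `batch_size` rows without loading the whole file."""
    with open(path, newline="", encoding="utf-8") as handle:
        reader = csv.DictReader(handle)
        # islice pulls the next batch_size rows; an empty list ends the loop.
        while batch := list(islice(reader, batch_size)):
            yield batch
```

A caller could branch on is_large_file(path) and process the batches one at a time, which keeps peak memory proportional to the batch size rather than to the file size.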

Each IngestFileSpec.name must be unique and start with an alphanumeric character. It may contain letters, numbers, ., _, and -.
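The naming rule can be expressed as a regular expression plus a uniqueness check. The following is a minimal sketch, not Astro's validator: validate_ingest_names is a hypothetical helper, and treating "alphanumeric" as ASCII-only is an assumption.

```python
import re

# Assumption: "alphanumeric" and "letters, numbers" mean ASCII characters only.
# First character: letter or digit. Remaining characters: letters, digits, ".", "_", "-".
NAME_PATTERN = re.compile(r"[A-Za-z0-9][A-Za-z0-9._-]*")


def validate_ingest_names(names: list[str]) -> None:
    """Raise ValueError on the first invalid or duplicate name."""
    seen: set[str] = set()
    for name in names:
        if NAME_PATTERN.fullmatch(name) is None:
            raise ValueError(f"Invalid ingest file name: {name!r}")
        if name in seen:
            raise ValueError(f"Duplicate ingest file name: {name!r}")
        seen.add(name)
```

Under these assumptions, names such as establishments and ks4-results_2024.v1 pass, while _hidden (leading underscore) and a repeated name are rejected.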

Step registration

Register steps in configure_steps():

  • add_step(label, fn, files, depends_on=[...]) — custom run step

  • add_filter(label, fn, files, depends_on=[...]) — declarative row filter

Each step takes a list of one or more AstroFileSpec subclass instances. Each source file may have a different schema.

Register every step with add_step() or add_filter(). CLI pipelines do not use the older Pipeline.run() style.

Example pipeline

from astro import AstroFileSpec, Pipeline
from astro.pipeline import ExecutionMode, IngestFileSpec
from astro.pipeline.files import AstroFile
from astro.pipeline.steps import StepContext
import pandera.polars as pa


class EstablishmentsFile(AstroFileSpec):
    ingest_name = "establishments"


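def step_copy_establishments(_ctx: StepContext, files: list[AstroFile]) -> None:
    # Only one file spec is registered for this step, so take the first entry.
    file = files[0]
    # Load the ingested data and save it as establishments.parquet under "processed".
    file.save_to("processed", "establishments.parquet", file.load())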


class ExamplePipeline(Pipeline):
    name = "example"
    execution_mode = ExecutionMode.SERIAL
    ingest_files = [
        IngestFileSpec(
            name="establishments",
            source_pattern="edubase*.csv",
            schema=pa.DataFrameSchema(
                {"URN": pa.Column(str), "EstablishmentName": pa.Column(str)},
                # "filter" drops columns that are not declared in the schema.
                strict="filter",
            ),
        ),
    ]

    def configure_steps(self) -> None:
        self.add_step(
            "Copy establishments to processed",
            step_copy_establishments,
            [EstablishmentsFile()],
        )


# Astro looks for this module-level name when it loads pipeline.py.
pipeline = ExamplePipeline()

AstroFileSpec

AstroFileSpec is a declarative per-file configuration container. Subclass it and set ingest_name to match an entry in ingest_files:

class EstablishmentsFile(AstroFileSpec):
    ingest_name = "establishments"

You can add custom attributes for use in step functions:

class EstablishmentsFile(AstroFileSpec):
    ingest_name = "establishments"
    marker = "configured"

Step dependencies

Use depends_on to declare step ordering. It takes a list of step IDs. Step IDs are derived from labels (slugified) unless you pass an explicit step_id:

self.add_filter("Remove closed", remove_closed, [EstablishmentsFile()])
self.add_step(
    "Transform open schools",
    step_transform,
    [EstablishmentsFile()],
    depends_on=["remove-closed"],
)
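The sketch below shows one way a label could become a step ID and how depends_on entries determine run order. Astro's exact slug rules and scheduler are not documented here, so slugify and execution_order are hypothetical stand-ins. The only behaviour taken from the example above is that "Remove closed" maps to "remove-closed".

```python
import re
from graphlib import TopologicalSorter


def slugify(label: str) -> str:
    """Lowercase the label and join runs of letters and digits with hyphens."""
    return re.sub(r"[^a-z0-9]+", "-", label.lower()).strip("-")


def execution_order(steps: dict[str, list[str]]) -> list[str]:
    """Order step IDs so that every step comes after the steps it depends on.

    `steps` maps each step ID to the list of step IDs in its depends_on.
    A dependency cycle raises graphlib.CycleError.
    """
    return list(TopologicalSorter(steps).static_order())


steps = {
    slugify("Remove closed"): [],
    slugify("Transform open schools"): ["remove-closed"],
}
order = execution_order(steps)
```

Because the transform step names remove-closed in its dependency list, the filter is always ordered first, regardless of the order in which the steps were registered.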

Parallel step execution

Set step_execution_mode = StepExecutionMode.PARALLEL to dispatch independent steps concurrently. Steps that touch the same ingested file name are serialized with per-file locks. Parallel scheduling helps most with I/O-bound steps and Polars work; CPU-bound steps written in pure Python will not scale because of the GIL.
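Per-file locking can be sketched with a thread pool and one lock per ingested file name. This is an illustration under assumptions, not Astro's scheduler: it assumes steps run on threads, and run_with_file_locks and run_parallel are hypothetical names. The worker cap reuses the documented default formula for max_parallel_workers.

```python
import os
import threading
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

# Same formula as the documented default for max_parallel_workers.
MAX_WORKERS = min(32, (os.cpu_count() or 1) + 4)

# One lock per ingested file name, created on first use.
_file_locks = defaultdict(threading.Lock)
_registry_lock = threading.Lock()


def run_with_file_locks(file_names, fn):
    """Run fn() while holding the lock of every file the step touches."""
    with _registry_lock:
        locks = [_file_locks[name] for name in sorted(set(file_names))]
    # Acquiring in sorted order keeps two multi-file steps from deadlocking.
    for lock in locks:
        lock.acquire()
    try:
        return fn()
    finally:
        for lock in reversed(locks):
            lock.release()


def run_parallel(steps):
    """Submit (file_names, fn) pairs to a thread pool; return results in order."""
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        futures = [pool.submit(run_with_file_locks, names, fn) for names, fn in steps]
        return [future.result() for future in futures]
```

With this arrangement, two steps that both list establishments run one after the other, while steps on unrelated files can overlap up to the worker cap.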

Next steps