Defining pipelines
Pipelines declare ingest configuration and register ordered run steps in a pipeline.py module.
Pipeline contract
When Astro runs in a directory containing pipeline.py, it discovers and loads that module. The module must export a pipeline object implementing the Pipeline interface.
Configuration attributes
| Attribute | Purpose |
|---|---|
| ingest_files | Expected source file patterns and Pandera schemas for CLI ingest |
| | Cap on concurrent run steps when parallel (defaults to …) |
| | Size above which batched ingest/filter/quarantine paths are used (default 100MB) |
| | CSV rows per batch during large-file ingest (default 100,000) |
| | Parquet rows per batch during filter steps and … |
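As a rough illustration of how a size threshold and a per-batch row count might work together, the sketch below switches to a batched read path for large CSV files. It uses only the standard library and is not Astro's ingest code. The names LARGE_FILE_THRESHOLD_BYTES, CSV_BATCH_ROWS, needs_batching, and iter_csv_batches are hypothetical, and reading 100MB as 100 * 1024 * 1024 bytes is an assumption.

```python
import csv
import os
from itertools import islice
from typing import Iterator

# Hypothetical constants mirroring the documented defaults (not Astro names).
LARGE_FILE_THRESHOLD_BYTES = 100 * 1024 * 1024  # assumes "100MB" means 100 MiB
CSV_BATCH_ROWS = 100_000


def needs_batching(path: str, threshold: int = LARGE_FILE_THRESHOLD_BYTES) -> bool:
    """Return True when the file on disk is larger than the threshold."""
    return os.path.getsize(path) > threshold


def iter_csv_batches(
    path: str, batch_rows: int = CSV_BATCH_ROWS
) -> Iterator[list[dict[str, str]]]:
    """Yield lists of at most batch_rows parsed CSV rows."""
    with open(path, newline="", encoding="utf-8") as handle:
        reader = csv.DictReader(handle)
        while True:
            # islice pulls the next batch without loading the whole file.
            batch = list(islice(reader, batch_rows))
            if not batch:
                break
            yield batch
```

A caller could check needs_batching(path) first and fall back to a single in-memory read for small files.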
Each IngestFileSpec.name must be unique and start with an alphanumeric character. It may contain letters, numbers, ., _, and -.
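The naming rule can be approximated with a regular expression plus a uniqueness check. This is a minimal sketch rather than Astro's own validation: NAME_PATTERN and validate_ingest_names are hypothetical names, and treating "letters" and "numbers" as ASCII-only is an assumption.

```python
import re

# Assumed pattern: first character alphanumeric, then letters, digits, ".", "_" or "-".
NAME_PATTERN = re.compile(r"[A-Za-z0-9][A-Za-z0-9._-]*")


def validate_ingest_names(names: list[str]) -> None:
    """Raise ValueError for a malformed or duplicate ingest file name."""
    seen: set[str] = set()
    for name in names:
        if not NAME_PATTERN.fullmatch(name):
            raise ValueError(f"invalid ingest file name: {name!r}")
        if name in seen:
            raise ValueError(f"duplicate ingest file name: {name!r}")
        seen.add(name)
```

Under these assumptions, "establishments" and "ks4.results_2024-v1" pass, while "_hidden" fails because it does not start with an alphanumeric character.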
Step registration
Register steps in configure_steps():
add_step(label, fn, files, depends_on=[...]): custom run step
add_filter(label, fn, files, depends_on=[...]): declarative row filter
Each step references one or more instances of AstroFileSpec subclasses. Each source file may have a different schema.
Astro pipeline authors register steps with add_step() and add_filter(). The older Pipeline.run() style is not used for CLI pipelines.
Example pipeline
from astro import AstroFileSpec, Pipeline
from astro.pipeline import ExecutionMode, IngestFileSpec
from astro.pipeline.files import AstroFile
from astro.pipeline.steps import StepContext
import pandera.polars as pa

class EstablishmentsFile(AstroFileSpec):
    ingest_name = "establishments"

def step_copy_establishments(_ctx: StepContext, files: list[AstroFile]) -> None:
    file = files[0]
    file.save_to("processed", "establishments.parquet", file.load())

class ExamplePipeline(Pipeline):
    name = "example"
    execution_mode = ExecutionMode.SERIAL
    ingest_files = [
        IngestFileSpec(
            name="establishments",
            source_pattern="edubase*.csv",
            schema=pa.DataFrameSchema(
                {"URN": pa.Column(str), "EstablishmentName": pa.Column(str)},
                strict="filter",
            ),
        ),
    ]

    def configure_steps(self) -> None:
        self.add_step(
            "Copy establishments to processed",
            step_copy_establishments,
            [EstablishmentsFile()],
        )

pipeline = ExamplePipeline()
AstroFileSpec
AstroFileSpec is a declarative per-file configuration container. Subclass it and set ingest_name to match an entry in ingest_files:
class EstablishmentsFile(AstroFileSpec):
    ingest_name = "establishments"
You can add custom attributes for use in step functions:
class EstablishmentsFile(AstroFileSpec):
    ingest_name = "establishments"
    marker = "configured"
Step dependencies
Use depends_on to declare step ordering. Step IDs are derived by slugifying the label (for example, "Remove closed" becomes remove-closed) unless you pass an explicit step_id:
self.add_filter("Remove closed", remove_closed, [EstablishmentsFile()])
self.add_step(
    "Transform open schools",
    step_transform,
    [EstablishmentsFile()],
    depends_on=["remove-closed"],
)
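To make the link between labels, step IDs, and ordering concrete, here is a small standalone sketch. It does not use Astro. Both slugify and execution_order are hypothetical helpers, and the slug rule shown (lowercase, alphanumeric runs joined by hyphens) is an assumption that happens to match the "Remove closed" to remove-closed example; Astro's actual rule may differ. Ordering is resolved with the standard library's graphlib.

```python
import re
from graphlib import TopologicalSorter


def slugify(label: str) -> str:
    """Lowercase the label and join its alphanumeric runs with hyphens (assumed rule)."""
    return "-".join(re.findall(r"[a-z0-9]+", label.lower()))


def execution_order(steps: dict[str, list[str]]) -> list[str]:
    """Return step IDs ordered so that every step follows its dependencies.

    steps maps a step label to the list of step IDs it depends on.
    """
    graph = {slugify(label): set(deps) for label, deps in steps.items()}
    return list(TopologicalSorter(graph).static_order())


order = execution_order(
    {
        "Transform open schools": ["remove-closed"],
        "Remove closed": [],
    }
)
print(order)  # the filter step comes before the transform step
```

Registration order does not matter in this sketch: the transform is listed first but still runs after the filter because of its declared dependency.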
Parallel step execution
Set step_execution_mode = StepExecutionMode.PARALLEL to dispatch independent steps concurrently. Steps that touch the same ingested file name are serialized with per-file locks. Parallel scheduling works best for I/O-bound and Polars work; pure-Python CPU-bound steps will not scale due to the GIL.
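The idea of serializing steps that share a file can be sketched with one lock per ingested file name and a thread pool. This is an illustration under stated assumptions, not Astro's scheduler: run_step, the lock registry, and the stats counter are all hypothetical, and the sleep stands in for real I/O-bound work.

```python
import threading
import time
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

# Hypothetical registry: one lock per ingested file name.
file_locks = defaultdict(threading.Lock)
registry_guard = threading.Lock()

# Tracks how many steps are inside the locked section at the same time.
stats = {"active": 0, "peak": 0}
stats_guard = threading.Lock()


def run_step(label: str, file_names: list[str]) -> str:
    """Run one step while holding the lock of every file it touches."""
    with registry_guard:
        # Acquiring in sorted order avoids deadlock between steps sharing files.
        locks = [file_locks[name] for name in sorted(set(file_names))]
    for lock in locks:
        lock.acquire()
    try:
        with stats_guard:
            stats["active"] += 1
            stats["peak"] = max(stats["peak"], stats["active"])
        time.sleep(0.01)  # placeholder for I/O-bound step work
        with stats_guard:
            stats["active"] -= 1
        return label
    finally:
        for lock in reversed(locks):
            lock.release()


# Four steps that all touch the same file are dispatched to four workers.
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [
        pool.submit(run_step, f"step-{i}", ["establishments"]) for i in range(4)
    ]
    finished = [future.result() for future in futures]
```

Because every step here touches "establishments", at most one is ever inside the locked section, so stats["peak"] ends at 1. Steps touching different files would be free to overlap.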
Next steps
Ingest — ingest behaviour and execution modes
Steps and file I/O — read and write data in steps
Row filtering — declarative row filters