Quickstart
This walkthrough uses the example pipeline from the Astro repository. Adapt it for your own pipeline repository.
1. Create a pipeline
Create pipeline.py in your project directory:
```python
import pandera.polars as pa
import polars as pl
from astro import AstroFileSpec, Pipeline
from astro.pipeline import ExecutionMode, IngestFileSpec
from astro.pipeline.files import AstroFile
from astro.pipeline.steps import StepContext


class EstablishmentsFile(AstroFileSpec):
    # Refers to the ingested file named "establishments" in ingest_files below.
    ingest_name = "establishments"


def remove_closed(dataframe: pl.DataFrame) -> pl.DataFrame:
    # Returns only the rows whose EstablishmentName contains "Closed".
    return dataframe.filter(pl.col("EstablishmentName").str.contains("Closed"))


def step_copy_establishments(_ctx: StepContext, files: list[AstroFile]) -> None:
    # Save the single input file, unchanged, to the "processed" stage.
    file = files[0]
    file.save_to("processed", "establishments.parquet", file.load())


class ExamplePipeline(Pipeline):
    name = "example"
    execution_mode = ExecutionMode.SERIAL
    ingest_files = [
        IngestFileSpec(
            name="establishments",
            source_pattern="edubase*.csv",
            # Validate these two columns; strict="filter" drops any other columns.
            schema=pa.DataFrameSchema(
                {
                    "URN": pa.Column(str),
                    "EstablishmentName": pa.Column(str),
                },
                strict="filter",
            ),
        ),
    ]

    def configure_steps(self) -> None:
        self.add_filter("Remove closed establishments", remove_closed, [EstablishmentsFile()])
        self.add_step(
            "Copy establishments to processed",
            step_copy_establishments,
            [EstablishmentsFile()],
            # Run after the filter step, referenced by its step id.
            depends_on=["remove-closed-establishments"],
        )


pipeline = ExamplePipeline()
```
The module must expose a module-level object named pipeline.
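The filter is registered with the display name "Remove closed establishments", but depends_on refers to it as "remove-closed-establishments". This suggests that step ids are derived by lowercasing the name and joining words with hyphens. The sketch below reproduces that apparent rule so you can predict the ids of your own steps. step_id_from_name is a hypothetical helper inferred from this example, not Astro's code, and Astro's actual rules for punctuation, digits, or non-ASCII names may differ.

```python
import re


def step_id_from_name(name: str) -> str:
    """Hypothetical helper: approximate the step id Astro appears to derive
    from a step's display name. Inferred from the quickstart, not from Astro."""
    # Lowercase, then collapse every run of non-alphanumeric characters into one hyphen.
    slug = re.sub(r"[^a-z0-9]+", "-", name.lower())
    # Drop hyphens left at either end by leading or trailing punctuation/whitespace.
    return slug.strip("-")


print(step_id_from_name("Remove closed establishments"))  # remove-closed-establishments
```

If a step id you pass to depends_on does not resolve, compare it with the ids Astro reports (for example in astro describe) rather than relying on this approximation.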
2. Prepare source data
Create a directory containing a CSV file whose name matches the ingest pattern (edubase*.csv):

```text
source/
    edubase_sample.csv
```

Contents of edubase_sample.csv:

```csv
URN,EstablishmentName
100001,Open Example School
100002,Closed Example School
```
The source path passed to astro ingest must be a directory containing exactly the CSV files expected by ingest_files. Any unexpected file or subdirectory causes validation to fail.
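You can catch layout problems before running astro ingest with a small local check. The sketch below mirrors the rule stated above: only regular files are allowed, each file must match an expected pattern, and each pattern must be matched by at least one file. It assumes source_pattern uses shell-style glob matching, as edubase*.csv suggests. check_source_dir is a hypothetical helper, not part of Astro, and Astro's own validation remains authoritative. For example, it does not check whether a pattern may match more than one file.

```python
from fnmatch import fnmatch
from pathlib import Path


def check_source_dir(source: Path, patterns: list[str]) -> list[str]:
    """Hypothetical pre-flight check. Returns a list of problems (empty if none).

    Assumes glob-style patterns such as "edubase*.csv".
    """
    if not source.is_dir():
        return [f"{source} is not a directory"]

    problems: list[str] = []
    matched: set[str] = set()
    for entry in sorted(source.iterdir()):
        if entry.is_dir():
            problems.append(f"unexpected subdirectory: {entry.name}")
            continue
        hits = [pattern for pattern in patterns if fnmatch(entry.name, pattern)]
        if not hits:
            problems.append(f"unexpected file: {entry.name}")
        matched.update(hits)

    # Every expected pattern should be satisfied by at least one file.
    for pattern in patterns:
        if pattern not in matched:
            problems.append(f"no file matches {pattern!r}")
    return problems
```

For this quickstart, check_source_dir(Path("source"), ["edubase*.csv"]) returns an empty list when the layout passes the check.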
3. Ingest source files
From the directory containing pipeline.py, run:

```shell
astro ingest path/to/source/
```
Astro will:

- Validate that the source directory contains exactly the expected files
- Validate each CSV against its Pandera schema
- Write Parquet files to .working/{run_id}/ingested/
- Record statistics in .astro/stats.db
Large files show a progress bar during materialization.
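This guide does not document the format of .astro/stats.db. If it is a SQLite database (an assumption based on the .db extension), you can see what it contains using Python's built-in sqlite3 module, without relying on any particular schema. list_tables is a hypothetical helper. It opens the file read-only so that inspection cannot modify Astro's statistics.

```python
import sqlite3
from contextlib import closing
from pathlib import Path


def list_tables(db_path: Path) -> list[str]:
    """List table names in a SQLite file. Assumes stats.db is SQLite."""
    # mode=ro opens read-only and raises instead of creating a missing file.
    uri = f"{db_path.resolve().as_uri()}?mode=ro"
    with closing(sqlite3.connect(uri, uri=True)) as conn:
        rows = conn.execute(
            "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
        ).fetchall()
    return [name for (name,) in rows]
```

Usage: print(list_tables(Path(".astro/stats.db"))).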
4. Run the pipeline
```shell
astro run
```

By default, Astro shows a Rich dashboard with step progress and live logs. To get plain log output instead, run:

```shell
astro run --mode cli
```
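To script the quickstart (for example, in CI), you can invoke the same commands from Python. The sketch uses only the commands and the --mode cli flag documented on this page. It assumes that the astro executable is on PATH, that it is run from the directory containing pipeline.py, and that astro exits with a non-zero status on failure. quickstart_commands and run_quickstart are hypothetical helpers. Plain log output avoids the interactive dashboard, which suits non-interactive environments.

```python
import subprocess
from pathlib import Path


def quickstart_commands(source: Path, plain_logs: bool = True) -> list[list[str]]:
    """Build the ingest and run commands from this guide."""
    run = ["astro", "run"]
    if plain_logs:
        # Plain log output instead of the Rich dashboard.
        run += ["--mode", "cli"]
    return [["astro", "ingest", str(source)], run]


def run_quickstart(source: Path, project_dir: Path = Path(".")) -> None:
    """Run each command in project_dir, stopping at the first failure.

    A relative source path is resolved against project_dir.
    """
    for cmd in quickstart_commands(source):
        # check=True raises CalledProcessError if the command exits non-zero.
        subprocess.run(cmd, cwd=project_dir, check=True)
```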
5. Inspect the result
Each run writes its outputs under .working/{run_id}/:

```text
.working/{run_id}/
    manifest.json
    ingested/
        establishments.parquet
    processed/
        establishments.parquet
    filtered/
        remove-closed-establishments/
            establishments.parquet
    astro.log
```
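After a run, you can confirm that the expected outputs exist. The sketch below hard-codes the layout shown above for this example pipeline and assumes that every subdirectory of .working/ is a run directory. EXPECTED_PATHS, find_run_dirs, and missing_outputs are hypothetical names. Pipelines with other files or steps will produce different ingested/, processed/, and filtered/ contents.

```python
from collections.abc import Sequence
from pathlib import Path

# Paths relative to .working/{run_id}/, taken from the layout above.
EXPECTED_PATHS = (
    "manifest.json",
    "astro.log",
    "ingested/establishments.parquet",
    "processed/establishments.parquet",
    "filtered/remove-closed-establishments/establishments.parquet",
)


def find_run_dirs(working: Path = Path(".working")) -> list[Path]:
    """Return run directories, newest first by modification time.

    Raises FileNotFoundError if the working directory does not exist.
    """
    runs = [path for path in working.iterdir() if path.is_dir()]
    return sorted(runs, key=lambda path: path.stat().st_mtime, reverse=True)


def missing_outputs(run_dir: Path, expected: Sequence[str] = EXPECTED_PATHS) -> list[str]:
    """Return the expected relative paths that are not files under run_dir."""
    return [rel for rel in expected if not (run_dir / rel).is_file()]
```

For example, missing_outputs(find_run_dirs()[0]) returns an empty list when the most recently modified run has every file shown above.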
View the pipeline flow diagram:

```shell
astro describe
```

List stored runs:

```shell
astro list
```
The file in processed/ contains the rows left after the filter step. The removed rows are kept under filtered/ for auditing.
Next steps
- Defining pipelines: pipeline configuration in depth
- Row quarantine: handle invalid rows without aborting
- CLI reference: full reference for the astro command