Row filtering¶
Filter steps remove rows from one or more files using author-defined logic. Astro handles the filter boilerplate: it splits the input rows, saves the kept rows in place, persists the removed rows for audit, and records statistics.
Filter steps always complete normally (unlike quarantine).
Register a filter¶
The author function receives the input pl.DataFrame and returns only the rows to remove:
import polars as pl

def remove_closed(df: pl.DataFrame) -> pl.DataFrame:
    # Return only the rows to remove: establishments whose name contains "Closed".
    return df.filter(pl.col("EstablishmentName").str.contains("Closed"))

class ExamplePipeline(Pipeline):
    def configure_steps(self) -> None:
        self.add_filter("Remove closed schools", remove_closed, [EstablishmentsFile()])
        # The transform depends on the filter step, so it only sees the kept rows.
        self.add_step(
            "Transform open schools",
            step_transform,
            [EstablishmentsFile()],
            depends_on=["remove-closed-schools"],
        )
You can also pass a Polars expression predicate:
self.add_filter("Remove closed", pl.col("status") == "closed", [EstablishmentsFile()])
What Astro does for each file¶
Loads the current active Parquet file (or uses the batched path for large files)
Calls the filter function to obtain the removed rows
Validates that the removed rows are a subset of the input (matching columns; semi-join check)
Writes kept rows back with save_in_place()
Writes removed rows to filtered/{step_id}/{ingest_name}.parquet
Records rows_filtered and rows_kept statistics
Run directory layout¶
.working/{run_id}/
    ingested/
    filtered/{step_id}/{ingest_name}.parquet
    manifest.json
Filtered Parquet rows use the same schema as the source file (no extra framework columns).
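To locate the removed rows for an audit, the path can be assembled from the layout above. The helper `filtered_path` is hypothetical, and the run ID and ingest name used here are invented; the step ID is the one from the registration example.

```python
from pathlib import PurePosixPath


def filtered_path(
    run_id: str,
    step_id: str,
    ingest_name: str,
    root: str = ".working",
) -> PurePosixPath:
    # Hypothetical helper that mirrors the documented layout:
    # .working/{run_id}/filtered/{step_id}/{ingest_name}.parquet
    return PurePosixPath(root) / run_id / "filtered" / step_id / f"{ingest_name}.parquet"


# "run-001" and "establishments" are invented values.
path = filtered_path("run-001", "remove-closed-schools", "establishments")
print(path)
```

The resulting file can then be opened with `pl.read_parquet(str(path))` and compared against the source file, since both share the same schema.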
Duplicate rows¶
Splitting uses joins on all columns, so identical duplicate rows may not partition cleanly if the filter returns fewer copies than exist in the input.
Large files¶
Filter steps automatically use a batched path when the file exceeds large_file_threshold_bytes. See Large files.
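The size check can be pictured as a comparison against the file size on disk. This is a sketch only: the function `uses_batched_path` and the threshold value are hypothetical, and how Astro actually measures the file is not described here.

```python
import os
import tempfile

# Hypothetical threshold for the demo; not Astro's default value.
LARGE_FILE_THRESHOLD_BYTES = 1024


def uses_batched_path(path: str, threshold_bytes: int) -> bool:
    # "Exceeds" is read as strictly greater than the threshold.
    return os.path.getsize(path) > threshold_bytes


with tempfile.TemporaryDirectory() as tmp:
    small = os.path.join(tmp, "small.parquet")
    large = os.path.join(tmp, "large.parquet")

    # Placeholder bytes, not real Parquet content; only the size matters here.
    with open(small, "wb") as f:
        f.write(b"\0" * 100)
    with open(large, "wb") as f:
        f.write(b"\0" * 2048)

    small_is_batched = uses_batched_path(small, LARGE_FILE_THRESHOLD_BYTES)
    large_is_batched = uses_batched_path(large, LARGE_FILE_THRESHOLD_BYTES)

print(small_is_batched, large_is_batched)
```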
Next steps¶
Row quarantine — isolate invalid rows with retry support
Statistics — built-in filter statistics