Row filtering

Filter steps remove rows from one or more files using author-defined logic. Astro handles the surrounding boilerplate: it splits the input rows, saves the kept rows in place, persists the removed rows for auditing, and records statistics.

Unlike quarantine steps, filter steps always complete normally.

Register a filter

The author function receives the input pl.DataFrame and returns only the rows to remove:

import polars as pl


def remove_closed(df: pl.DataFrame) -> pl.DataFrame:
    # Return only the rows to remove; Astro keeps everything else.
    return df.filter(pl.col("EstablishmentName").str.contains("Closed"))


class ExamplePipeline(Pipeline):
    def configure_steps(self) -> None:
        self.add_filter("Remove closed schools", remove_closed, [EstablishmentsFile()])
        self.add_step(
            "Transform open schools",
            step_transform,
            [EstablishmentsFile()],
            depends_on=["remove-closed-schools"],
        )

Instead of a function, you can also pass a Polars expression as the predicate:

self.add_filter("Remove closed", pl.col("status") == "closed", [EstablishmentsFile()])

What Astro does for each file

  1. Loads the currently active Parquet file (or uses the batched path for large files)

  2. Calls the filter function to obtain the removed rows

  3. Validates that the removed rows are a subset of the input (columns must match; checked with a semi-join)

  4. Writes the kept rows back with save_in_place()

  5. Writes the removed rows to filtered/{step_id}/{ingest_name}.parquet

  6. Records the rows_filtered and rows_kept statistics

Run directory layout

.working/{run_id}/
  ingested/
  filtered/{step_id}/{ingest_name}.parquet
  manifest.json

Filtered Parquet files use the same schema as the source file; Astro adds no extra framework columns.

Duplicate rows

Splitting joins on all columns, so exact duplicate rows may not partition cleanly when the filter returns fewer copies of a row than the input contains.

Large files

Filter steps automatically use a batched path when the file exceeds large_file_threshold_bytes. See Large files.

Next steps