API reference

This section documents the public Python API exported by Astro.

Package exports

The following symbols are available from the top-level astro package:

Symbol               Description
Pipeline             Base class for pipeline definitions
AstroFileSpec        Declarative per-file configuration
AstroFile            Runtime file wrapper during astro run
StepContext          Execution context passed to step functions
CanonicalIdResolver  Stable UUID mapping and change detection
FilterFn             Type alias for filter step functions
StatisticsRecorder   Run, file, and step statistics
StatScope            Statistics scope enum

Pipeline

Base pipeline interface for Astro library users.

class astro.pipeline.base.IngestedSource(path, data)[source]

Bases: object

One ingested file and its raw data. Schemas may differ across sources.

path: Path
data: DataFrame
__init__(path, data)

class astro.pipeline.base.Pipeline[source]

Bases: ABC

Base class for CSV import pipelines defined in external repositories.

Subclass Pipeline, define ingest_files, implement configure_steps(), and export a module-level pipeline instance from pipeline.py.
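
A minimal sketch of the shape described above. So that it runs without Astro installed, the Pipeline and AstroFileSpec classes below are simplified stand-ins that mirror only the documented signatures; with Astro available you would import the real classes from astro instead. OrdersFile, OrdersPipeline, and normalize_orders are hypothetical names, ingest_files is left empty because a real entry needs a Pandera schema, and the explicit configure_steps() call at the end is an assumption about what Astro does when it loads pipeline.py.

```python
from abc import ABC, abstractmethod
from typing import ClassVar


class AstroFileSpec:
    """Stand-in for Astro's AstroFileSpec (documented attribute only)."""

    ingest_name: ClassVar[str]


class Pipeline(ABC):
    """Stand-in for Astro's Pipeline: it records steps and nothing more."""

    name: str = "pipeline"
    ingest_files: ClassVar[list] = []

    def __init__(self) -> None:
        self._steps: list[dict] = []

    @abstractmethod
    def configure_steps(self) -> None: ...

    def add_step(self, label, fn, files, *, step_id=None, depends_on=None):
        # Assumption: a rough guess at how a label becomes a default step id.
        sid = step_id or label.lower().replace(" ", "-")
        self._steps.append(
            {"step_id": sid, "label": label, "fn": fn,
             "files": tuple(files), "depends_on": tuple(depends_on or ())}
        )

    @property
    def steps(self) -> list[dict]:
        return list(self._steps)


class OrdersFile(AstroFileSpec):
    ingest_name = "orders"  # must match an entry in Pipeline.ingest_files


def normalize_orders(ctx, files) -> None:
    """Hypothetical step function taking (StepContext, list[AstroFile])."""


class OrdersPipeline(Pipeline):
    name = "orders"
    ingest_files = []  # a real pipeline lists IngestFileSpec entries here

    def configure_steps(self) -> None:
        self.add_step("Normalize orders", normalize_orders, [OrdersFile])


pipeline = OrdersPipeline()  # module-level instance exported from pipeline.py
pipeline.configure_steps()   # assumption: Astro triggers this itself
print([step["step_id"] for step in pipeline.steps])
```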

name

Pipeline identifier stored in statistics.

Type:

str

execution_mode

Ingest concurrency rules (serial or parallel).

Type:

astro.pipeline.models.ExecutionMode

step_execution_mode

Run-step scheduling (serial or parallel).

Type:

astro.pipeline.models.StepExecutionMode

max_parallel_workers

Cap on concurrent run steps when parallel.

Type:

int | None

large_file_threshold_bytes

File size in bytes above which batched I/O paths are used. The default, 104857600, is 100 MiB.

Type:

int

ingest_batch_size

CSV rows per batch during large-file ingest.

Type:

int

run_batch_size

Parquet rows per batch during filter steps and iter_batches() calls.

Type:

int

ingest_files

Expected source file patterns and Pandera schemas.

Type:

ClassVar[list[astro.pipeline.models.IngestFileSpec]]

name: str = 'pipeline'
execution_mode: ExecutionMode = 'serial'
step_execution_mode: StepExecutionMode = 'serial'
max_parallel_workers: int | None = None
large_file_threshold_bytes: int = 104857600
ingest_batch_size: int = 100000
run_batch_size: int = 100000
ingest_files: ClassVar[list[IngestFileSpec]]
__init__()[source]
configure_steps()[source]

Register run steps via add_step and add_filter.

add_step(label, fn, files, *, step_id=None, depends_on=None, kind=StepKind.STEP)[source]

Register a custom run step.

Parameters:
  • label (str) – Human-readable step name shown in the dashboard and logs.

  • fn (Callable[[StepContext, list[AstroFile]], None]) – Step function receiving (StepContext, list[AstroFile]).

  • files (Sequence[AstroFileSpec]) – AstroFileSpec subclasses this step reads and writes.

  • step_id (str | None) – Optional explicit step id (defaults to slugified label).

  • depends_on (Sequence[str] | None) – Step ids that must complete before this step runs.

  • kind (StepKind) – Step kind (step or filter); use add_filter for filters.
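
The depends_on constraint can be pictured as a dependency graph. The sketch below is not Astro's scheduler; it uses the standard library's graphlib to show one valid ordering for a set of hypothetical step ids, grouped into waves whose members have no unmet dependencies. Whether Astro runs such a wave concurrently in parallel mode is an assumption.

```python
from graphlib import TopologicalSorter

# Hypothetical step ids mapped to the step ids they depend on.
depends_on = {
    "load-orders": [],
    "load-customers": [],
    "join-orders": ["load-orders", "load-customers"],
    "export": ["join-orders"],
}

sorter = TopologicalSorter(depends_on)
sorter.prepare()  # raises graphlib.CycleError if the dependencies form a cycle

waves = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())  # steps whose dependencies are all done
    waves.append(ready)
    sorter.done(*ready)

print(waves)
# [['load-customers', 'load-orders'], ['join-orders'], ['export']]
```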

add_filter(label, fn, files, *, step_id=None, depends_on=None)[source]

Register a filter step that removes rows from one or more files.

Parameters:
  • label (str) – Human-readable filter name.

  • fn (Callable[[DataFrame], DataFrame] | Expr) – Filter function returning removed rows, or a Polars expression predicate.

  • files (Sequence[AstroFileSpec]) – AstroFileSpec subclasses to filter.

  • step_id (str | None) – Optional explicit step id (defaults to slugified label).

  • depends_on (Sequence[str] | None) – Step ids that must complete before this filter runs.

property steps: list[StepDefinition]
run(path)[source]

Legacy entry point; use the astro ingest and astro run commands instead.

Configuration models

Pipeline configuration models.

class astro.pipeline.models.ExecutionMode(*values)[source]

Bases: StrEnum

Ingest concurrency mode.

SERIAL = 'serial'
PARALLEL = 'parallel'

class astro.pipeline.models.StepExecutionMode(*values)[source]

Bases: StrEnum

Run-step scheduling mode within a single pipeline run.

SERIAL = 'serial'
PARALLEL = 'parallel'

class astro.pipeline.models.IngestFileSpec(name, source_pattern, schema, encoding='utf-8', has_header=True, column_names=None)[source]

Bases: object

Expected source file and Pandera schema for CLI ingest.

name: str
source_pattern: str
schema: DataFrameSchema
encoding: str = 'utf-8'
has_header: bool = True
column_names: tuple[str, ...] | None = None
__init__(name, source_pattern, schema, encoding='utf-8', has_header=True, column_names=None)

File containers

File containers for pipeline run steps.

class astro.pipeline.files.AstroFileSpec[source]

Bases: object

Declarative per-file configuration referenced by run steps.

Subclass and set ingest_name to match an entry in Pipeline.ingest_files.

ingest_name: ClassVar[str]

class astro.pipeline.files.AstroFile(spec, ingest_record, run_directory, _active_path, large_file_threshold_bytes=104857600, run_batch_size=100000)[source]

Bases: object

Runtime wrapper for one ingested file within a pipeline run.

Hydrated during astro run and provides I/O methods for reading and writing Parquet. The active_path property tracks the current output location as steps modify the file.

spec: AstroFileSpec
ingest_record: IngestedFileRecord
run_directory: Path
large_file_threshold_bytes: int = 104857600
run_batch_size: int = 100000
property active_path: Path

Current Parquet path for this file within the run.

is_large_file()[source]

Return whether the active path exceeds the large-file threshold.

load()[source]

Eagerly read the active Parquet file into a DataFrame.

scan()[source]

Lazily scan the active Parquet file.

row_count()[source]

Return the row count from Parquet metadata without loading data.

iter_batches(batch_size=None)[source]

Iterate over Parquet row batches for large-file step logic.

sink(lazy_frame, *, row_group_size=None)[source]

Stream a lazy frame to the active path without full materialization.

save_in_place(dataframe)[source]

Overwrite the ingested Parquet snapshot and update the active path.

save_in_place_lazy(lazy_frame)[source]

Stream-overwrite the ingested Parquet snapshot and update the active path.

save_to(subfolder, filename, dataframe)[source]

Write a DataFrame under .working/{run_id}/{subfolder}/ and set the active path.

save_to_lazy(subfolder, filename, lazy_frame)[source]

Stream-write a lazy frame under a subfolder and set the active path.

output_path(subfolder, filename)[source]

Return the path for a file under a run subfolder without writing.

set_active_path(path)[source]

Update the active path after an external write completes.

classmethod hydrate(*, spec, ingest_record, run_directory, large_file_threshold_bytes=104857600, run_batch_size=100000)[source]
__init__(spec, ingest_record, run_directory, _active_path, large_file_threshold_bytes=104857600, run_batch_size=100000)
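
A step function can use is_large_file() to choose between load() and iter_batches(). The sketch below illustrates that branch with stand-ins: FakeFile mimics only the three AstroFile methods it needs and holds plain lists instead of Parquet data, and the context is a SimpleNamespace exposing report_progress and stats.record_step. Treating the progress value as a fraction between 0 and 1 is an assumption; count_rows and the rows_counted action name are hypothetical.

```python
from types import SimpleNamespace


class FakeFile:
    """Stand-in exposing the three AstroFile methods the step uses."""

    def __init__(self, batches, large):
        self._batches, self._large = batches, large

    def is_large_file(self):
        return self._large

    def load(self):
        return [row for batch in self._batches for row in batch]

    def iter_batches(self, batch_size=None):
        yield from self._batches


def count_rows(ctx, files):
    """Step function: count rows, batching only when a file is large."""
    for index, file in enumerate(files, start=1):
        if file.is_large_file():
            total = sum(len(batch) for batch in file.iter_batches())
        else:
            total = len(file.load())
        ctx.stats.record_step("rows_counted", total)
        ctx.report_progress(index / len(files))  # assumed to be a 0..1 fraction


# Exercise the step with stand-in context objects.
recorded, progress = [], []
ctx = SimpleNamespace(
    stats=SimpleNamespace(record_step=lambda action, value: recorded.append((action, value))),
    report_progress=progress.append,
)
files = [FakeFile([[1, 2, 3]], large=False), FakeFile([[1, 2], [3, 4]], large=True)]
count_rows(ctx, files)
print(recorded, progress)
```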

Step context

Pipeline run step definitions and execution context.

class astro.pipeline.steps.StepKind(*values)[source]

Bases: StrEnum

Kind of registered pipeline step.

STEP = 'step'
FILTER = 'filter'

class astro.pipeline.steps.StepDefinition(step_id, label, fn, file_specs, depends_on, kind=StepKind.STEP)[source]

Bases: object

Registered pipeline step metadata and callable.

step_id: str
label: str
fn: Callable[[StepContext, list[AstroFile]], None]
file_specs: tuple[AstroFileSpec, ...]
depends_on: tuple[str, ...]
kind: StepKind = 'step'
__init__(step_id, label, fn, file_specs, depends_on, kind=StepKind.STEP)

class astro.pipeline.steps.StepContext(pipeline_dir, run_directory, run_id, run_date, step_id, logger, report_progress, quarantine, stats)[source]

Bases: object

Execution context passed to each pipeline step function.

pipeline_dir

Directory containing pipeline.py.

Type:

pathlib.Path

run_directory

.working/{run_id}/ path for the current run.

Type:

pathlib.Path

run_id

Current run identifier.

Type:

str

run_date

Date assigned to the run.

Type:

datetime.date

step_id

Current step identifier.

Type:

str

logger

Step-scoped logger.

Type:

logging.Logger

report_progress

Callback to update dashboard progress.

Type:

collections.abc.Callable[[float | None], None]

quarantine

Collector for quarantining invalid rows.

Type:

astro.quarantine.collector.StepQuarantine

stats

Recorder for run, file, and step statistics.

Type:

astro.stats.recorder.StatisticsRecorder

pipeline_dir: Path
run_directory: Path
run_id: str
run_date: date
step_id: str
logger: Logger
report_progress: Callable[[float | None], None]
quarantine: StepQuarantine
stats: StatisticsRecorder
__init__(pipeline_dir, run_directory, run_id, run_date, step_id, logger, report_progress, quarantine, stats)

astro.pipeline.steps.slugify_step_label(label)[source]
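
add_step and add_filter default step_id to a slugified label. The exact normalization rules of slugify_step_label are not documented here, so the function below (the hypothetical slugify_label) is only a sketch of one common slug convention: lowercase the label and join alphanumeric runs with hyphens. When step ids must stay stable across label edits, passing step_id explicitly avoids depending on these rules.

```python
import re


def slugify_label(label: str) -> str:
    """Lowercase the label and join alphanumeric runs with hyphens."""
    return re.sub(r"[^a-z0-9]+", "-", label.lower()).strip("-")


print(slugify_label("Remove Inactive Rows"))  # remove-inactive-rows
```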

Filter types

Filter type aliases.

astro.filter.types.FilterFn

Callable that receives input rows and returns removed rows only.

alias of Callable[[DataFrame], DataFrame]
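
Because a filter function returns the removed rows rather than the kept rows, its condition reads as "what to drop". The sketch below illustrates that contract with plain lists of dicts standing in for DataFrames, so it runs without Polars. remove_inactive, apply_filter, and the id and active columns are hypothetical; how Astro derives the kept rows internally is not documented here.

```python
Row = dict[str, object]


def remove_inactive(rows: list[Row]) -> list[Row]:
    """Filter function: return the rows to REMOVE, not the rows to keep."""
    return [row for row in rows if not row["active"]]


def apply_filter(rows: list[Row], filter_fn) -> tuple[list[Row], list[Row]]:
    """Hypothetical caller: split rows into (kept, removed) using filter_fn."""
    removed = filter_fn(rows)
    removed_keys = {row["id"] for row in removed}
    kept = [row for row in rows if row["id"] not in removed_keys]
    return kept, removed


rows = [
    {"id": 1, "active": True},
    {"id": 2, "active": False},
    {"id": 3, "active": True},
]
kept, removed = apply_filter(rows, remove_inactive)
print([row["id"] for row in kept], [row["id"] for row in removed])  # [1, 3] [2]
```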

Statistics

Statistics recorder for run, file, and step metrics.

class astro.stats.recorder.StatisticsRecorder(run_id, store, *, step_id=None)[source]

Bases: object

Record numeric statistics scoped to a run, file, or step.

__init__(run_id, store, *, step_id=None)[source]
record_run(action, value)[source]

Record a run-scoped statistic.

record_file(file_name, action, value)[source]

Record a file-scoped statistic keyed by ingest file name.

record_step(action, value, *, step_id=None)[source]

Record a step-scoped statistic for the current or given step.

for_step(step_id)[source]

Statistics models.

class astro.stats.models.StatScope(*values)[source]

Bases: StrEnum

Statistics aggregation scope.

RUN = 'run'
FILE = 'file'
STEP = 'step'

class astro.stats.models.StatRecord(run_id, scope, subject, action, value, recorded_at)[source]

Bases: object

One persisted statistics row.

run_id: str
scope: StatScope
subject: str | None
action: str
value: float
recorded_at: datetime
__init__(run_id, scope, subject, action, value, recorded_at)
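
To show how the three record methods might map onto StatRecord rows, here is a small in-memory sketch. It mirrors the documented signatures and fields, but the behavior is assumed: a run-scoped record is given no subject, for_step is taken to return a recorder bound to the given step id, and the store is a plain list. InMemoryRecorder and the action names are hypothetical.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Optional


class StatScope(str, Enum):  # stand-in for the documented StrEnum
    RUN = "run"
    FILE = "file"
    STEP = "step"


@dataclass(frozen=True)
class StatRecord:
    run_id: str
    scope: StatScope
    subject: Optional[str]
    action: str
    value: float
    recorded_at: datetime


class InMemoryRecorder:
    """Stand-in recorder that appends StatRecord rows to a list."""

    def __init__(self, run_id, store, *, step_id=None):
        self.run_id, self.store, self.step_id = run_id, store, step_id

    def _add(self, scope, subject, action, value):
        now = datetime.now(timezone.utc)
        self.store.append(StatRecord(self.run_id, scope, subject, action, float(value), now))

    def record_run(self, action, value):
        self._add(StatScope.RUN, None, action, value)  # assumption: no subject

    def record_file(self, file_name, action, value):
        self._add(StatScope.FILE, file_name, action, value)

    def record_step(self, action, value, *, step_id=None):
        self._add(StatScope.STEP, step_id or self.step_id, action, value)

    def for_step(self, step_id):
        # Assumption: a recorder bound to step_id that shares the same store.
        return InMemoryRecorder(self.run_id, self.store, step_id=step_id)


rows = []
stats = InMemoryRecorder("run-001", rows)
stats.record_run("files_ingested", 2)
stats.record_file("orders", "rows_read", 1500)
stats.for_step("dedupe").record_step("rows_removed", 12)
summary = [(r.scope.value, r.subject, r.action, r.value) for r in rows]
print(summary)
```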

Canonical ID resolver

Canonical ID resolver with vectorized Polars operations.

class astro.resolver.resolver.CanonicalIdResolver(pipeline_dir, name, hash_groups)[source]

Bases: object

Map source keys to canonical UUIDs and detect grouped field changes.

Stores persistent state at {pipeline_dir}/.persistent/{name}.parquet.

__init__(pipeline_dir, name, hash_groups)[source]

Create a resolver scoped to a named persistent store.

Parameters:
  • pipeline_dir (Path) – Pipeline working directory.

  • name (str) – Store name (for example establishments).

  • hash_groups (dict[str, list[str] | Literal['*all']]) – Mapping of group names to "*all" or field name lists.

property store_path: Path
resolve(data, *, source_key_column, namespace, run_date, exclude_columns=frozenset({}))[source]

Resolve canonical IDs and change flags for each row in data.

Parameters:
  • data (DataFrame) – Input DataFrame containing source_key_column.

  • source_key_column (str) – Column with pipeline-provided identifiers.

  • namespace (str) – Prefix for stored keys as {namespace}:{source_key}.

  • run_date (date) – Date used for change tracking.

  • exclude_columns (frozenset[str]) – Columns excluded from "*all" hash groups.

Returns:

DataFrame augmented with canonical_id, status, and {group}_changed columns.

Return type:

DataFrame
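
The idea behind resolve() can be sketched in plain Python: keep one canonical id per {namespace}:{source_key}, hash each group of fields, and flag a group as changed when its hash differs from the stored one. This is an illustration only and not the resolver's implementation. It uses a dict instead of the Parquet store, rows as dicts instead of a DataFrame, uuid4 and SHA-256 as arbitrary choices, omits run_date, and invents the status values "new" and "existing".

```python
import hashlib
import uuid


def group_hash(row, fields):
    """Hash the selected fields of one row in a stable field order."""
    payload = "|".join(f"{name}={row[name]}" for name in sorted(fields))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def resolve(rows, store, *, source_key_column, namespace, hash_groups,
            exclude_columns=frozenset()):
    resolved = []
    for row in rows:
        key = f"{namespace}:{row[source_key_column]}"
        hashes = {}
        for group, fields in hash_groups.items():
            if fields == "*all":
                # Assumption: "*all" covers every column except the key
                # column and the excluded columns.
                fields = [c for c in row
                          if c != source_key_column and c not in exclude_columns]
            hashes[group] = group_hash(row, fields)

        known = store.get(key) or {"canonical_id": str(uuid.uuid4()), "hashes": {}}
        out = dict(row, canonical_id=known["canonical_id"],
                   status="new" if key not in store else "existing")
        for group, digest in hashes.items():
            previous = known["hashes"].get(group)
            out[f"{group}_changed"] = previous is not None and previous != digest

        store[key] = {"canonical_id": known["canonical_id"], "hashes": hashes}
        resolved.append(out)
    return resolved


store = {}
groups = {"address": ["street", "city"], "any": "*all"}
options = dict(source_key_column="id", namespace="est", hash_groups=groups)

first = resolve([{"id": "42", "street": "1 Main St", "city": "Oslo", "name": "Cafe"}],
                store, **options)
second = resolve([{"id": "42", "street": "1 Main St", "city": "Oslo", "name": "Cafe Nord"}],
                 store, **options)
print(second[0]["status"], second[0]["address_changed"], second[0]["any_changed"])
```

In the second call only the name differs, so the same canonical id is reused, the address group is unchanged, and the "*all" group is flagged as changed.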