API reference
This section documents the public Python API exported by Astro.
Package exports
The following symbols are available from the top-level astro package:
| Symbol | Description |
|---|---|
| Pipeline | Base class for pipeline definitions |
| AstroFileSpec | Declarative per-file configuration |
| AstroFile | Runtime file wrapper during astro run |
| StepContext | Execution context passed to step functions |
| CanonicalIdResolver | Stable UUID mapping and change detection |
| FilterFn | Type alias for filter step functions |
| StatisticsRecorder | Run, file, and step statistics |
| | Statistics scope enum |
Pipeline
Base pipeline interface for Astro library users.
- class astro.pipeline.base.IngestedSource(path, data)
Bases: object
One ingested file and its raw data. Schemas may differ across sources.
- path: Path
- data: DataFrame
- __init__(path, data)
- class astro.pipeline.base.Pipeline
Bases: ABC
Base class for CSV import pipelines defined in external repositories.
Subclass Pipeline, define ingest_files, implement configure_steps(), and export a module-level pipeline instance from pipeline.py.
- name
Pipeline identifier stored in statistics.
- Type: str
- execution_mode
Ingest concurrency mode (serial or parallel).
- Type: astro.pipeline.models.ExecutionMode
- step_execution_mode
Run-step scheduling mode (serial or parallel).
- Type: astro.pipeline.models.StepExecutionMode
- max_parallel_workers
Cap on concurrent run steps when step_execution_mode is parallel.
- Type: int | None
- large_file_threshold_bytes
File size, in bytes, above which batched I/O paths are used.
- Type: int
- ingest_batch_size
CSV rows per batch during large-file ingest.
- Type: int
- run_batch_size
Parquet rows per batch during filtering and iter_batches().
- Type: int
- ingest_files
Expected source file patterns and Pandera schemas.
- Type: ClassVar[list[astro.pipeline.models.IngestFileSpec]]
- name: str = 'pipeline'
- execution_mode: ExecutionMode = 'serial'
- step_execution_mode: StepExecutionMode = 'serial'
- large_file_threshold_bytes: int = 104857600
- ingest_batch_size: int = 100000
- run_batch_size: int = 100000
- __init__()
- configure_steps()
Register run steps via add_step and add_filter.
- add_step(label, fn, files, *, step_id=None, depends_on=None, kind=StepKind.STEP)
Register a custom run step.
- Parameters:
label (str) – Human-readable step name shown in the dashboard and logs.
fn (Callable[[StepContext, list[AstroFile]], None]) – Step function receiving (StepContext, list[AstroFile]).
files (Sequence[AstroFileSpec]) – AstroFileSpec subclasses this step reads and writes.
step_id (str | None) – Optional explicit step id (defaults to the slugified label).
depends_on (Sequence[str] | None) – Step ids that must complete before this step runs.
kind (StepKind) – Step kind (step or filter); use add_filter for filters.
- add_filter(label, fn, files, *, step_id=None, depends_on=None)
Register a filter step that removes rows from one or more files.
- Parameters:
label (str) – Human-readable filter name.
fn (Callable[[DataFrame], DataFrame] | Expr) – Filter function that returns the rows to remove, or a Polars expression predicate.
files (Sequence[AstroFileSpec]) – AstroFileSpec subclasses to filter.
step_id (str | None) – Optional explicit step id (defaults to the slugified label).
depends_on (Sequence[str] | None) – Step ids that must complete before this filter runs.
- property steps: list[StepDefinition]
- run(path)
Legacy entry point; use astro ingest and astro run instead.
Configuration models
Pipeline configuration models.
- class astro.pipeline.models.ExecutionMode(*values)
Bases: StrEnum
Ingest concurrency mode.
- SERIAL = 'serial'
- PARALLEL = 'parallel'
- class astro.pipeline.models.StepExecutionMode(*values)
Bases: StrEnum
Run-step scheduling mode within a single pipeline run.
- SERIAL = 'serial'
- PARALLEL = 'parallel'
- class astro.pipeline.models.IngestFileSpec(name, source_pattern, schema, encoding='utf-8', has_header=True, column_names=None)
Bases: object
Expected source file and Pandera schema for CLI ingest.
- name: str
- source_pattern: str
- schema: DataFrameSchema
- encoding: str = 'utf-8'
- has_header: bool = True
- __init__(name, source_pattern, schema, encoding='utf-8', has_header=True, column_names=None)
File containers
File containers for pipeline run steps.
- class astro.pipeline.files.AstroFileSpec
Bases: object
Declarative per-file configuration referenced by run steps.
Subclass it and set ingest_name to match an entry in Pipeline.ingest_files.
- class astro.pipeline.files.AstroFile(spec, ingest_record, run_directory, _active_path, large_file_threshold_bytes=104857600, run_batch_size=100000)
Bases: object
Runtime wrapper for one ingested file within a pipeline run.
Hydrated during astro run with I/O methods for reading and writing Parquet. The active_path property tracks the current output location as steps modify the file.
- spec: AstroFileSpec
- ingest_record: IngestedFileRecord
- run_directory: Path
- large_file_threshold_bytes: int = 104857600
- run_batch_size: int = 100000
- property active_path: Path
Current Parquet path for this file within the run.
- is_large_file()
Return whether the active path exceeds the large-file threshold.
- load()
Eagerly read the active Parquet file into a DataFrame.
- scan()
Lazily scan the active Parquet file.
- row_count()
Return the row count from Parquet metadata without loading data.
- iter_batches(batch_size=None)
Iterate over Parquet row batches for large-file step logic.
- sink(lazy_frame, *, row_group_size=None)
Stream a lazy frame to the active path without full materialization.
- save_in_place(dataframe)
Overwrite the ingested Parquet snapshot and update the active path.
- save_in_place_lazy(lazy_frame)
Stream-overwrite the ingested Parquet snapshot and update the active path.
- save_to(subfolder, filename, dataframe)
Write a DataFrame under .working/{run_id}/{subfolder}/ and set the active path.
- save_to_lazy(subfolder, filename, lazy_frame)
Stream-write a lazy frame under a subfolder and set the active path.
- output_path(subfolder, filename)
Return the path for a file under a run subfolder without writing.
- set_active_path(path)
Update the active path after an external write completes.
- classmethod hydrate(*, spec, ingest_record, run_directory, large_file_threshold_bytes=104857600, run_batch_size=100000)
- __init__(spec, ingest_record, run_directory, _active_path, large_file_threshold_bytes=104857600, run_batch_size=100000)
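The large-file defaults reduce to simple arithmetic: 104857600 bytes is 100 MiB, and iter_batches() walks the rows in chunks of run_batch_size (100000 by default). The sketch below models the documented "exceeds the threshold" rule and the resulting batch boundaries with hypothetical helpers (is_large, batch_bounds); it does not read Parquet and is not Astro's implementation.

```python
LARGE_FILE_THRESHOLD_BYTES = 100 * 1024 * 1024  # 104857600, the documented default
RUN_BATCH_SIZE = 100_000  # documented default for run_batch_size


def is_large(size_bytes: int, threshold: int = LARGE_FILE_THRESHOLD_BYTES) -> bool:
    """Hypothetical helper: a file counts as large only when it exceeds the threshold."""
    return size_bytes > threshold


def batch_bounds(total_rows: int, batch_size: int = RUN_BATCH_SIZE) -> list[tuple[int, int]]:
    """Hypothetical helper: half-open (start, stop) row ranges covering total_rows."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    return [
        (start, min(start + batch_size, total_rows))
        for start in range(0, total_rows, batch_size)
    ]


print(is_large(104_857_600))  # False: equal to the threshold does not exceed it
print(is_large(104_857_601))  # True
print(batch_bounds(250_000))  # [(0, 100000), (100000, 200000), (200000, 250000)]
```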
Step context
Pipeline run step definitions and execution context.
- class astro.pipeline.steps.StepKind(*values)
Bases: StrEnum
Kind of registered pipeline step.
- STEP = 'step'
- FILTER = 'filter'
- class astro.pipeline.steps.StepDefinition(step_id, label, fn, file_specs, depends_on, kind=StepKind.STEP)
Bases: object
Registered pipeline step metadata and callable.
- step_id: str
- label: str
- file_specs: tuple[AstroFileSpec, ...]
- kind: StepKind = 'step'
- __init__(step_id, label, fn, file_specs, depends_on, kind=StepKind.STEP)
- class astro.pipeline.steps.StepContext(pipeline_dir, run_directory, run_id, run_date, step_id, logger, report_progress, quarantine, stats)
Bases: object
Execution context passed to each pipeline step function.
- pipeline_dir
Directory containing pipeline.py.
- Type: Path
- run_directory
The .working/{run_id}/ path for the current run.
- Type: Path
- run_id
Current run identifier.
- Type: str
- run_date
Date assigned to the run.
- Type: date
- step_id
Current step identifier.
- Type: str
- logger
Step-scoped logger.
- Type: Logger
- report_progress
Callback to update dashboard progress.
- Type: collections.abc.Callable[[float | None], None]
- quarantine
Collector for quarantining invalid rows.
- Type: astro.quarantine.collector.StepQuarantine
- stats
Recorder for run, file, and step statistics.
- Type: astro.stats.recorder.StatisticsRecorder
- pipeline_dir: Path
- run_directory: Path
- run_id: str
- run_date: date
- step_id: str
- logger: Logger
- quarantine: StepQuarantine
- stats: StatisticsRecorder
- __init__(pipeline_dir, run_directory, run_id, run_date, step_id, logger, report_progress, quarantine, stats)
- astro.pipeline.steps.slugify_step_label(label)
Filter types
Filter type aliases.
- astro.filter.types.FilterFn
Callable that receives input rows and returns only the removed rows.
Alias of Callable[[DataFrame], DataFrame].
Statistics
Statistics recorder for run, file, and step metrics.
- class astro.stats.recorder.StatisticsRecorder(run_id, store, *, step_id=None)
Bases: object
Record numeric statistics scoped to a run, file, or step.
- __init__(run_id, store, *, step_id=None)
- record_run(action, value)
Record a run-scoped statistic.
- record_file(file_name, action, value)
Record a file-scoped statistic keyed by ingest file name.
- record_step(action, value, *, step_id=None)
Record a step-scoped statistic for the current or given step.
- for_step(step_id)
Statistics models.
Canonical ID resolver
Canonical ID resolver with vectorized Polars operations.
- class astro.resolver.resolver.CanonicalIdResolver(pipeline_dir, name, hash_groups)
Bases: object
Map source keys to canonical UUIDs and detect grouped field changes.
Stores persistent state at {pipeline_dir}/.persistent/{name}.parquet.
- __init__(pipeline_dir, name, hash_groups)
Create a resolver scoped to a named persistent store.
- property store_path: Path
- resolve(data, *, source_key_column, namespace, run_date, exclude_columns=frozenset({}))
Resolve canonical IDs and change flags for each row in data.
- Parameters:
data (DataFrame) – Input DataFrame containing source_key_column.
source_key_column (str) – Column with pipeline-provided identifiers.
namespace (str) – Prefix for stored keys, which take the form {namespace}:{source_key}.
run_date (date) – Date used for change tracking.
exclude_columns (frozenset[str]) – Columns excluded from "*all" hash groups.
- Returns:
DataFrame augmented with canonical_id, status, and {group}_changed columns.
- Return type:
DataFrame
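The resolver's contract combines two ideas: a persistent map from namespaced source keys ({namespace}:{source_key}) to UUIDs that stay stable across runs, and per-group hashes whose changes set {group}_changed. The standard-library sketch below models both with a dict standing in for the Parquet store. The hash_groups shape (group name to a column list or "*all"), the SHA-256 hashing scheme, treating a first sighting as unchanged, and the resolve_rows helper are all assumptions; it omits the status column because its values are not documented here.

```python
import hashlib
import json
import uuid


def group_hash(row: dict, columns: list[str]) -> str:
    """Hash the selected fields of a row in a stable, key-ordered way."""
    payload = json.dumps({c: row[c] for c in columns}, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def resolve_rows(rows, store, *, source_key_column, namespace, hash_groups,
                 exclude_columns=frozenset()):
    """Hypothetical model: assign stable UUIDs and flag changed hash groups."""
    results = []
    for row in rows:
        key = f"{namespace}:{row[source_key_column]}"
        # First sighting of a key gets a fresh UUID; later runs reuse it.
        entry = store.setdefault(key, {"canonical_id": str(uuid.uuid4()), "hashes": {}})
        out = {"canonical_id": entry["canonical_id"]}
        for group, columns in hash_groups.items():
            if columns == "*all":
                # Assumption: "*all" means every column not explicitly excluded.
                columns = [c for c in row if c not in exclude_columns]
            digest = group_hash(row, columns)
            previous = entry["hashes"].get(group)
            # Assumption: a first sighting is not reported as a change.
            out[f"{group}_changed"] = previous is not None and previous != digest
            entry["hashes"][group] = digest
        results.append(out)
    return results


store: dict = {}
groups = {"address": ["street", "city"], "everything": "*all"}
options = dict(source_key_column="id", namespace="crm", hash_groups=groups,
               exclude_columns=frozenset({"note"}))
run1 = resolve_rows([{"id": "A1", "street": "1 Main", "city": "Oslo", "note": "x"}], store, **options)
run2 = resolve_rows([{"id": "A1", "street": "2 Main", "city": "Oslo", "note": "y"}], store, **options)
run3 = resolve_rows([{"id": "A1", "street": "2 Main", "city": "Oslo", "note": "z"}], store, **options)
print(run2[0]["address_changed"], run3[0]["everything_changed"])  # True False
```

In the third run only the excluded note column differs, so neither group reports a change while the canonical ID stays the same.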