Source code for astro.pipeline.steps

"""Pipeline run step definitions and execution context."""

from __future__ import annotations

import logging
import re
from collections.abc import Callable
from dataclasses import dataclass, field
from datetime import date
from enum import StrEnum
from pathlib import Path

from astro.pipeline.files import AstroFile, AstroFileSpec
from astro.quarantine.collector import StepQuarantine
from astro.stats.recorder import StatisticsRecorder

StepFn = Callable[["StepContext", list[AstroFile]], None]
ProgressCallback = Callable[[float | None], None]


[docs] class StepKind(StrEnum): """Kind of registered pipeline step.""" STEP = "step" FILTER = "filter"
@dataclass(frozen=True)
class StepDefinition:
    """Immutable metadata record for one registered pipeline step.

    Holds the step's identity, the callable to run, and its declared
    relationships. Being a frozen dataclass, no field can be reassigned
    after construction.
    """

    # Identifier of the step.
    step_id: str
    # Human-readable label for the step.
    label: str
    # Step callable typed as ``StepFn``: takes a ``StepContext`` and a
    # ``list[AstroFile]`` and returns ``None``.
    fn: StepFn
    # File specifications attached to the step (presumably the files it
    # reads/writes -- confirm against the registry/runner).
    file_specs: tuple[AstroFileSpec, ...]
    # Step ids this step declares as dependencies.
    depends_on: tuple[str, ...]
    # Kind of registered entry; defaults to a regular step.
    kind: StepKind = StepKind.STEP
@dataclass(frozen=True)
class StepContext:
    """Run-scoped state handed to a pipeline step function.

    An instance is the first argument of a ``StepFn``. The dataclass is
    frozen, so none of the fields below can be reassigned once the context
    has been built.

    Attributes:
        pipeline_dir: Folder that holds ``pipeline.py``.
        run_directory: Working folder for the active run
            (``.working/{run_id}/``).
        run_id: Identifier of the active run.
        run_date: Date the run is associated with.
        step_id: Identifier of the step being executed.
        logger: Logger scoped to the current step.
        report_progress: Callback that updates dashboard progress; it takes
            a ``float`` or ``None`` and returns nothing.
        quarantine: Collector used to quarantine invalid rows.
        stats: Recorder for run-, file-, and step-level statistics.
    """

    # Filesystem locations.
    pipeline_dir: Path
    run_directory: Path
    # Run identity.
    run_id: str
    run_date: date
    # Step identity and per-step services.
    step_id: str
    logger: logging.Logger
    report_progress: ProgressCallback
    quarantine: StepQuarantine
    stats: StatisticsRecorder
def slugify_step_label(label: str) -> str:
    """Turn a step label into a lowercase, hyphen-separated slug.

    The label is lowercased, and every maximal run of characters outside
    ``a-z``/``0-9`` acts as a separator. The surviving alphanumeric runs are
    joined with single hyphens, so there are no leading, trailing or doubled
    hyphens. Characters outside that ASCII set (e.g. accented letters) are
    dropped.

    Args:
        label: Human-readable step label.

    Returns:
        The slug, or ``"step"`` when the label contains no ``[a-z0-9]``
        characters after lowercasing.
    """
    pieces = re.split(r"[^a-z0-9]+", label.lower())
    # re.split leaves empty strings at the edges when the label starts or
    # ends with a separator; skip them so the result has no stray hyphens.
    slug = "-".join(piece for piece in pieces if piece)
    if not slug:
        return "step"
    return slug