# Source code for astro.pipeline.files

"""File containers for pipeline run steps."""

from __future__ import annotations

import uuid
from collections.abc import Callable, Iterator
from dataclasses import dataclass
from pathlib import Path
from typing import ClassVar

import polars as pl

from astro.io.constants import DEFAULT_LARGE_FILE_THRESHOLD_BYTES, DEFAULT_RUN_BATCH_SIZE
from astro.io.parquet import is_large_file, iter_parquet_batches, parquet_row_count
from astro.working.manifest import IngestedFileRecord


class AstroFileSpec:
    """Base class for declarative, per-file settings consumed by run steps.

    Concrete specs subclass this and assign ``ingest_name`` so that it names
    one of the entries in ``Pipeline.ingest_files``.
    """

    # Only declared here (no value); each concrete subclass assigns it.
    ingest_name: ClassVar[str]
@dataclass
class AstroFile:
    """Runtime wrapper for one ingested file within a pipeline run.

    Hydrated during ``astro run`` with I/O methods for reading and writing
    Parquet. ``active_path`` tracks the current output location as steps
    modify the file.

    Every write is staged in a sibling temporary file that is then swapped
    into place with ``Path.replace``. A lazy frame built from :meth:`scan`
    keeps reading ``active_path`` while it is being sunk. Streaming directly
    into that same file would truncate the input mid-read. Staging also
    means an interrupted write never leaves a half-written snapshot behind.
    """

    # Declarative configuration this file was created from.
    spec: AstroFileSpec
    # Manifest record; ``parquet_path`` is the ingested snapshot location.
    ingest_record: IngestedFileRecord
    # Root directory that ``save_to`` / ``save_to_lazy`` outputs must stay inside.
    run_directory: Path
    # Current Parquet location; exposed read-only via ``active_path``.
    _active_path: Path
    # Threshold forwarded to ``astro.io.parquet.is_large_file``.
    large_file_threshold_bytes: int = DEFAULT_LARGE_FILE_THRESHOLD_BYTES
    # Batch size used by ``iter_batches`` when the caller passes none.
    run_batch_size: int = DEFAULT_RUN_BATCH_SIZE

    @property
    def active_path(self) -> Path:
        """Current Parquet path for this file within the run."""
        return self._active_path

    def is_large_file(self) -> bool:
        """Return whether the active path exceeds the large-file threshold."""
        # Resolves to the module-level helper imported from astro.io.parquet,
        # not to this method (class scope is not visible inside methods).
        return is_large_file(
            self._active_path,
            threshold_bytes=self.large_file_threshold_bytes,
        )

    def load(self) -> pl.DataFrame:
        """Eagerly read the active Parquet file into a DataFrame."""
        return pl.read_parquet(self._active_path)

    def scan(self) -> pl.LazyFrame:
        """Lazily scan the active Parquet file."""
        return pl.scan_parquet(self._active_path)

    def row_count(self) -> int:
        """Return the row count from Parquet metadata without loading data."""
        return parquet_row_count(self._active_path)

    def iter_batches(self, batch_size: int | None = None) -> Iterator[pl.DataFrame]:
        """Iterate over Parquet row batches for large-file step logic.

        Args:
            batch_size: Rows per batch. ``None`` (or ``0``) falls back to
                ``run_batch_size``.

        Raises:
            ValueError: If the resolved batch size is negative or zero. This
                is raised on first iteration, because this is a generator.
        """
        resolved_batch_size = batch_size or self.run_batch_size
        if resolved_batch_size < 1:
            raise ValueError(f"batch_size must be positive, got {resolved_batch_size}.")
        yield from iter_parquet_batches(self._active_path, resolved_batch_size)

    def sink(self, lazy_frame: pl.LazyFrame, *, row_group_size: int | None = None) -> None:
        """Stream a lazy frame to the active path without full materialization.

        Safe when ``lazy_frame`` was built from :meth:`scan`. The result is
        staged in a temporary file and replaces the active file only after
        the stream has finished.

        Args:
            lazy_frame: Query whose result replaces the active file.
            row_group_size: Optional row-group size forwarded to
                ``LazyFrame.sink_parquet``; omitted when ``None``.
        """
        if row_group_size is None:
            self._write_atomically(self._active_path, lazy_frame.sink_parquet)
            return
        self._write_atomically(
            self._active_path,
            lambda path: lazy_frame.sink_parquet(path, row_group_size=row_group_size),
        )

    def save_in_place(self, dataframe: pl.DataFrame) -> None:
        """Overwrite the ingested Parquet snapshot and update the active path."""
        output_path = Path(self.ingest_record.parquet_path)
        self._write_atomically(output_path, dataframe.write_parquet)
        self._active_path = output_path

    def save_in_place_lazy(self, lazy_frame: pl.LazyFrame) -> None:
        """Stream-overwrite the ingested Parquet snapshot and update the active path.

        ``lazy_frame`` may read from the snapshot it overwrites (e.g. one
        produced by :meth:`scan` before any step moved the active path).
        """
        output_path = Path(self.ingest_record.parquet_path)
        self._write_atomically(output_path, lazy_frame.sink_parquet)
        self._active_path = output_path

    def save_to(self, subfolder: str, filename: str, dataframe: pl.DataFrame) -> Path:
        """Write a DataFrame under ``.working/{run_id}/{subfolder}/`` and set active path.

        Returns:
            The resolved path that was written.

        Raises:
            ValueError: If ``subfolder``/``filename`` are empty, absolute,
                contain ``..``, or resolve outside ``run_directory``.
        """
        output_path = self._resolve_output_path(subfolder, filename)
        self._write_atomically(output_path, dataframe.write_parquet)
        self._active_path = output_path
        return output_path

    def save_to_lazy(self, subfolder: str, filename: str, lazy_frame: pl.LazyFrame) -> Path:
        """Stream-write a lazy frame under a subfolder and set active path.

        Returns:
            The resolved path that was written.

        Raises:
            ValueError: Same path validation as :meth:`save_to`.
        """
        output_path = self._resolve_output_path(subfolder, filename)
        self._write_atomically(output_path, lazy_frame.sink_parquet)
        self._active_path = output_path
        return output_path

    def output_path(self, subfolder: str, filename: str) -> Path:
        """Return the path for a file under a run subfolder without writing."""
        return self._resolve_output_path(subfolder, filename)

    def set_active_path(self, path: Path | str) -> None:
        """Update the active path after an external write completes.

        Args:
            path: Location of the newly written file. A plain ``str`` is
                accepted as well as a ``Path``. It is stored in resolved form.

        Raises:
            ValueError: If the resolved path is not an existing regular file.
        """
        resolved = Path(path).resolve()
        if not resolved.is_file():
            raise ValueError(f"Active path does not exist: {resolved}")
        self._active_path = resolved

    def _resolve_output_path(self, subfolder: str, filename: str) -> Path:
        """Resolve ``run_directory/subfolder/filename``, refusing to escape the run root.

        Raises:
            ValueError: If either part is empty, absolute, contains ``..``,
                or the resolved path (after following symlinks) lies outside
                ``run_directory``.
        """
        if not subfolder:
            raise ValueError("subfolder must not be empty.")
        if not filename:
            raise ValueError("filename must not be empty.")
        if Path(subfolder).is_absolute() or Path(filename).is_absolute():
            raise ValueError("subfolder and filename must be relative.")
        if ".." in Path(subfolder).parts or ".." in Path(filename).parts:
            raise ValueError("subfolder and filename must not contain '..' components.")
        output_path = (self.run_directory / subfolder / filename).resolve()
        run_root = self.run_directory.resolve()
        # Final containment check: also catches symlinks and drive-relative
        # paths that slip past the purely lexical checks above.
        try:
            output_path.relative_to(run_root)
        except ValueError as error:
            raise ValueError("output path escapes the run directory.") from error
        return output_path

    @staticmethod
    def _write_atomically(output_path: Path, write: Callable[[Path], object]) -> None:
        """Run ``write`` against a sibling temp file, then swap it into ``output_path``.

        The temp file lives in the same directory, so ``Path.replace`` is a
        same-filesystem rename. On failure the partial temp file is removed
        and ``output_path`` is left untouched.
        """
        output_path.parent.mkdir(parents=True, exist_ok=True)
        temp_path = output_path.with_name(f".{output_path.name}.{uuid.uuid4().hex}.tmp")
        try:
            write(temp_path)
            temp_path.replace(output_path)
        finally:
            # No-op after a successful replace; removes partial output otherwise.
            temp_path.unlink(missing_ok=True)

    @classmethod
    def hydrate(
        cls,
        *,
        spec: AstroFileSpec,
        ingest_record: IngestedFileRecord,
        run_directory: Path,
        large_file_threshold_bytes: int = DEFAULT_LARGE_FILE_THRESHOLD_BYTES,
        run_batch_size: int = DEFAULT_RUN_BATCH_SIZE,
    ) -> AstroFile:
        """Build an ``AstroFile`` whose active path starts at the ingested snapshot.

        Args:
            spec: Declarative configuration for this file.
            ingest_record: Manifest record; its ``parquet_path`` becomes the
                initial active path.
            run_directory: Root for run outputs written via ``save_to*``.
            large_file_threshold_bytes: Threshold for :meth:`is_large_file`.
            run_batch_size: Default batch size for :meth:`iter_batches`.
        """
        return cls(
            spec=spec,
            ingest_record=ingest_record,
            run_directory=run_directory,
            _active_path=Path(ingest_record.parquet_path),
            large_file_threshold_bytes=large_file_threshold_bytes,
            run_batch_size=run_batch_size,
        )