Source code for cytotable.warehouse.access

"""
Generic table access helpers for Parquet files and Iceberg warehouses.
"""

from __future__ import annotations

from pathlib import Path
from typing import Optional, Union

import pandas as pd
from pyarrow import parquet

from cytotable.exceptions import CytoTableException

from .iceberg import (
    DEFAULT_REGISTRY_FILE,
    DEFAULT_WAREHOUSE_DIR,
    list_iceberg_tables,
    read_iceberg_table,
)


def _is_iceberg_warehouse(path: Union[str, Path]) -> bool:
    """
    Determine whether a path points to a local Iceberg warehouse root.
    """

    root = Path(path)
    return (root / DEFAULT_REGISTRY_FILE).exists() or (
        root / DEFAULT_WAREHOUSE_DIR / DEFAULT_REGISTRY_FILE
    ).exists()


def _single_parquet_table_name(path: Union[str, Path]) -> str:
    """
    Return the implied single-table name for a Parquet file or dataset path.
    """

    resolved = Path(path)
    return resolved.stem if resolved.suffix == ".parquet" else resolved.name


[docs] def list_tables(path: Union[str, Path], *, include_views: bool = True) -> list[str]: """ List available table names from a Parquet path or Iceberg warehouse. """ resolved = Path(path) if _is_iceberg_warehouse(resolved): return list_iceberg_tables(resolved, include_views=include_views) if not resolved.exists(): raise CytoTableException(f"Path does not exist: '{resolved}'.") return [_single_parquet_table_name(resolved)]
[docs] def read_table( path: Union[str, Path], table_name: Optional[str] = None, ) -> pd.DataFrame: """ Read a table from a Parquet path or Iceberg warehouse. """ resolved = Path(path) if _is_iceberg_warehouse(resolved): if table_name is None: tables = list_iceberg_tables(resolved) if len(tables) != 1: raise CytoTableException( "table_name is required when reading from an Iceberg warehouse " "with multiple tables or views." ) table_name = tables[0] return read_iceberg_table(resolved, table_name) if not resolved.exists(): raise CytoTableException(f"Path does not exist: '{resolved}'.") single_name = _single_parquet_table_name(resolved) if table_name is not None and table_name not in {single_name, resolved.name}: raise CytoTableException( f"Parquet path '{resolved}' exposes a single table named " f"'{single_name}', not '{table_name}'." ) return parquet.read_table(resolved).to_pandas()
__all__ = ["list_tables", "read_table"]