"""
Utilities for reading and writing local Iceberg warehouses with CytoTable.
"""
from __future__ import annotations
import json
import logging
import shutil
import tempfile
from pathlib import Path
from typing import Any, Dict, Optional, Tuple, Union, cast
import pandas as pd
import parsl
import pyarrow as pa
import pyarrow.parquet as parquet
from cytotable.constants import CYTOTABLE_DEFAULT_PARQUET_METADATA
from cytotable.convert import _run_export_workflow
from cytotable.exceptions import CytoTableException
from cytotable.presets import config
from cytotable.sources import _build_path
from cytotable.utils import _default_parsl_config, _expand_path, _parsl_loaded
from .images import (
IMAGE_TABLE_NAME,
SOURCE_IMAGE_TABLE_NAME,
add_object_id_to_profiles_frame,
image_crop_table_from_joined_chunk,
profile_with_images_frame,
source_image_table_from_joined_chunk,
)
logger = logging.getLogger(__name__)

# Default namespace for joined profile tables within the warehouse.
DEFAULT_NAMESPACE = "profiles"
# Default namespace for image crop and source-image tables.
DEFAULT_IMAGES_NAMESPACE = "images"
# Filename of the JSON registry recording namespaces, tables, and views.
DEFAULT_REGISTRY_FILE = "catalog.json"
# Subdirectory (under a provided root) that holds Iceberg metadata/data files.
DEFAULT_WAREHOUSE_DIR = "warehouse"
# Default name of the primary joined-profiles Iceberg table.
DEFAULT_PROFILES_TABLE = "joined_profiles"
# Default name of the saved profile/image manifest view.
DEFAULT_PROFILE_WITH_IMAGES_VIEW = "profile_with_images"
def _cytotable_iceberg_properties() -> dict[str, str]:
    """
    Return CytoTable provenance properties for Iceberg tables and warehouses.
    """
    # Return a fresh copy so callers may mutate the result without touching
    # the shared module-level default mapping.
    return {**CYTOTABLE_DEFAULT_PARQUET_METADATA}
# pyiceberg is an optional dependency: record the ImportError (if any) so the
# iceberg-specific entry points can raise a helpful message lazily via
# _require_pyiceberg, instead of failing at module import time.
try:
    from pyiceberg.catalog import Catalog, MetastoreCatalog, PropertiesUpdateSummary
    from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchTableError
    from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
    from pyiceberg.schema import Schema
    from pyiceberg.serializers import FromInputFile
    from pyiceberg.table import CommitTableResponse, Table
    from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
    from pyiceberg.table.update import TableRequirement, TableUpdate
    from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
except ImportError as import_error:
    _PYICEBERG_IMPORT_ERROR: Optional[ImportError] = import_error
else:
    # Sentinel: pyiceberg imported successfully.
    _PYICEBERG_IMPORT_ERROR = None
def _require_pyiceberg() -> None:
    """
    Raise an informative error when pyiceberg is unavailable.

    Raises:
        ImportError: chained from the original import failure when the
            optional pyiceberg dependency could not be imported.
    """
    # Guard clause: nothing to do when pyiceberg imported cleanly.
    if _PYICEBERG_IMPORT_ERROR is None:
        return
    raise ImportError(
        "Using CytoTable with iceberg/OME-arrow support requires the optional 'pyiceberg' dependency."
    ) from _PYICEBERG_IMPORT_ERROR
def _qualify(name: str, namespace: str) -> str:
"""
Return a fully qualified Iceberg identifier such as
`profiles.joined_profiles` from a bare name and namespace.
This matters for Iceberg because tables and views live within namespaces,
unlike standalone table files where a single filename can identify the
dataset directly.
"""
return name if "." in name else f"{namespace}.{name}"
def _resolve_unqualified_name(
    bundle: TinyCatalog,
    name: str,
) -> str:
    """
    Resolve an unqualified table/view name across namespaces when unique.

    Raises:
        CytoTableException: when the bare name matches tables/views in more
            than one namespace (and has no match in the default namespace).
    """
    # Already-qualified names pass through untouched.
    if "." in name:
        return name

    def _exists(qualified: str) -> bool:
        # A qualified name "hits" when it names either a table or a view.
        identifier = tuple(qualified.split("."))
        return bundle.table_exists(identifier) or bundle.view_exists(identifier)

    # The catalog's default namespace wins outright when it has a match.
    default_qualified = _qualify(name, bundle.default_namespace)
    if _exists(default_qualified):
        return default_qualified
    # Otherwise search every namespace; a unique hit resolves the name, while
    # multiple hits are ambiguous and the caller must qualify explicitly.
    matches: list[str] = []
    for namespace in bundle.list_namespaces():
        candidate = _qualify(name, ".".join(namespace))
        if _exists(candidate):
            matches.append(candidate)
    if len(matches) > 1:
        raise CytoTableException(
            f"Ambiguous unqualified Iceberg name '{name}'. "
            f"Use a fully qualified name such as '{matches[0]}'."
        )
    return matches[0] if matches else default_qualified
def _warehouse_dir(path: Union[str, Path], registry_file: str) -> Path:
"""
Return the directory that stores Iceberg metadata and data files.
Args:
path:
Warehouse root path or an internal warehouse data directory.
registry_file:
Name of the CytoTable registry file that records warehouse tables
and views, used to determine whether `path` already points at the
warehouse root.
"""
root = Path(path)
return root if (root / registry_file).exists() else root / DEFAULT_WAREHOUSE_DIR
def _rewrite_join_sql_for_warehouse(joins: str, source_names: Dict[str, str]) -> str:
"""
Replace parquet reads in join SQL with registered DuckDB relation names.
"""
rewritten = joins
for source_name in source_names:
rewritten = rewritten.replace(
f"read_parquet('{source_name}.parquet')",
source_names[source_name],
)
return rewritten
def _apply_preset_defaults_to_convert_config(
*,
preset: Optional[str],
metadata: Optional[Tuple[str, ...] | list[str]],
compartments: Optional[Tuple[str, ...] | list[str]],
identifying_columns: Optional[Tuple[str, ...] | list[str]],
joins: Optional[str],
chunk_size: Optional[int],
page_keys: Optional[Dict[str, str]],
) -> Dict[str, Any]:
"""
Return convert() configuration with preset defaults applied.
"""
if preset is not None:
metadata = (
cast(Tuple[str, ...], config[preset]["CONFIG_NAMES_METADATA"])
if metadata is None
else metadata
)
compartments = (
cast(Tuple[str, ...], config[preset]["CONFIG_NAMES_COMPARTMENTS"])
if compartments is None
else compartments
)
identifying_columns = (
cast(Tuple[str, ...], config[preset]["CONFIG_IDENTIFYING_COLUMNS"])
if identifying_columns is None
else identifying_columns
)
joins = cast(str, config[preset]["CONFIG_JOINS"]) if joins is None else joins
chunk_size = (
cast(int, config[preset]["CONFIG_CHUNK_SIZE"])
if chunk_size is None
else chunk_size
)
page_keys = (
cast(Dict[str, str], config[preset]["CONFIG_PAGE_KEYS"])
if page_keys is None
else page_keys
)
return {
"metadata": tuple(metadata or ()),
"compartments": tuple(compartments or ()),
"identifying_columns": tuple(identifying_columns or ()),
"joins": joins or "",
"chunk_size": chunk_size,
"page_keys": dict(page_keys or {}),
"preset": preset,
}
def _validate_image_export_prerequisites(
*,
image_dir: Optional[str],
mask_dir: Optional[str],
outline_dir: Optional[str],
bbox_column_map: Optional[Dict[str, str]],
segmentation_file_regex: Optional[Dict[str, str]],
joins: str,
page_keys: Dict[str, str],
path_kwargs: Optional[Dict[str, Any]] = None,
) -> bool:
"""
Validate that image export configuration includes required join settings.
"""
ancillary_image_config = any(
(
mask_dir is not None,
outline_dir is not None,
bool(bbox_column_map),
bool(segmentation_file_regex),
)
)
image_export_requested = image_dir is not None or ancillary_image_config
if not image_export_requested:
return False
if image_dir is None:
raise CytoTableException(
"Image export options require 'image_dir' to be provided."
)
for label, path_value in (
("image_dir", image_dir),
("mask_dir", mask_dir),
("outline_dir", outline_dir),
):
if path_value is None:
continue
built_path = _build_path(path_value, **(path_kwargs or {}))
path_exists = (
built_path.is_dir() if isinstance(built_path, Path) else built_path.exists()
)
if not path_exists:
raise CytoTableException(
f"Image export requires '{label}' to reference an existing directory: "
f"'{path_value}'."
)
if not joins.strip():
raise CytoTableException(
"Image export requires join SQL. Provide 'joins' directly or use a "
"preset that defines them."
)
if not page_keys.get("join"):
raise CytoTableException(
"Image export requires page_keys to include a non-empty 'join' entry."
)
return True
def _validate_iceberg_join_prerequisites(
*, joins: str, page_keys: Dict[str, str]
) -> None:
"""
Validate that Iceberg export has the join configuration it requires.
"""
if not joins.strip():
raise ValueError(
"Iceberg export requires non-empty join SQL. Provide 'joins' directly "
"or use a preset that defines them."
)
if not page_keys.get("join"):
raise ValueError(
"Iceberg export requires page_keys to include a non-empty 'join' entry."
)
# Define Iceberg catalog helpers only when the optional pyiceberg dependency
# is available, so importing cytotable does not fail for parquet-only users.
if _PYICEBERG_IMPORT_ERROR is None:

    class TinyCatalog(MetastoreCatalog):
        """
        Tiny filesystem-backed catalog for local CytoTable Iceberg warehouses.

        All catalog state lives in a single JSON registry file that records
        namespaces, table metadata locations, and saved view specifications.
        """

        def __init__(
            self,
            warehouse_root: Path,
            *,
            default_namespace: str = DEFAULT_NAMESPACE,
            registry_file: str = DEFAULT_REGISTRY_FILE,
        ) -> None:
            # Namespace assumed when callers pass unqualified names.
            self.default_namespace = default_namespace
            # JSON file holding the catalog's full state.
            self.registry_path = warehouse_root / registry_file
            warehouse_root.mkdir(parents=True, exist_ok=True)
            # Pass a file URI so pyiceberg FileIO resolves local paths.
            super().__init__("local", warehouse=warehouse_root.resolve().as_uri())

        def _read_registry(self) -> dict[str, object]:
            """Load the registry JSON, synthesizing defaults when absent."""
            if not self.registry_path.exists():
                return {
                    "namespaces": [self.default_namespace],
                    "properties": _cytotable_iceberg_properties(),
                    "tables": {},
                    "views": {},
                }
            registry = json.loads(self.registry_path.read_text())
            # Backfill keys that older registry files may not have written.
            registry.setdefault("properties", _cytotable_iceberg_properties())
            registry.setdefault("views", {})
            return registry

        def _write_registry(self, registry: dict[str, object]) -> None:
            """Persist the registry JSON deterministically (sorted keys)."""
            self.registry_path.write_text(
                json.dumps(registry, indent=2, sort_keys=True)
            )

        def create_namespace(
            self, namespace: str | Identifier, properties: Properties = EMPTY_DICT
        ) -> None:
            """Record a namespace in the registry; idempotent. Namespace
            properties are ignored by this catalog."""
            del properties
            registry = self._read_registry()
            names = set(cast(list[str], registry["namespaces"]))
            names.add(Catalog.namespace_to_string(namespace))
            registry["namespaces"] = sorted(names)
            self._write_registry(registry)

        def load_namespace_properties(
            self, namespace: str | Identifier
        ) -> dict[str, str]:
            """Return namespace properties (always empty here) or raise
            when the namespace is unknown."""
            name = Catalog.namespace_to_string(namespace)
            if name not in cast(list[str], self._read_registry()["namespaces"]):
                raise NoSuchNamespaceError(name)
            return {}

        def list_namespaces(
            self, namespace: str | Identifier = ()
        ) -> list[tuple[str, ...]]:
            """List all registered namespaces; the parent filter is unused."""
            del namespace
            return [
                tuple(name.split("."))
                for name in cast(list[str], self._read_registry()["namespaces"])
            ]

        def list_tables(self, namespace: str | Identifier) -> list[tuple[str, ...]]:
            """List table identifiers registered under `namespace`."""
            prefix = f"{Catalog.namespace_to_string(namespace)}."
            return [
                tuple(name.split("."))
                for name in sorted(
                    cast(dict[str, str], self._read_registry()["tables"])
                )
                if name.startswith(prefix)
            ]

        def load_table(self, identifier: str | Identifier) -> Table:
            """Load a table by reading the metadata file the registry
            points at for `identifier`."""
            name = ".".join(Catalog.identifier_to_tuple(identifier))
            metadata_location = cast(
                dict[str, str], self._read_registry()["tables"]
            ).get(name)
            if metadata_location is None:
                raise NoSuchTableError(name)
            io = self._load_file_io(location=metadata_location)
            metadata = FromInputFile.table_metadata(io.new_input(metadata_location))
            return Table(
                Catalog.identifier_to_tuple(identifier),
                metadata,
                metadata_location,
                io,
                self,
            )

        def register_table(
            self, identifier: str | Identifier, metadata_location: str
        ) -> Table:
            """Point `identifier` at an existing metadata file, then load it."""
            registry = self._read_registry()
            cast(dict[str, str], registry["tables"])[
                ".".join(Catalog.identifier_to_tuple(identifier))
            ] = metadata_location
            self._write_registry(registry)
            return self.load_table(identifier)

        def commit_table(
            self,
            table: Table,
            requirements: tuple[TableRequirement, ...],
            updates: tuple[TableUpdate, ...],
        ) -> CommitTableResponse:
            """Stage updates, write the new metadata file, and record its
            location in the registry."""
            identifier = Catalog.identifier_to_tuple(table.name())
            try:
                current = self.load_table(identifier)
            except NoSuchTableError:
                # First commit for a brand-new table.
                current = None
            staged = self._update_and_stage_table(
                current, identifier, requirements, updates
            )
            self._write_metadata(staged.metadata, staged.io, staged.metadata_location)
            registry = self._read_registry()
            cast(dict[str, str], registry["tables"])[
                ".".join(identifier)
            ] = staged.metadata_location
            self._write_registry(registry)
            return CommitTableResponse(
                metadata=staged.metadata, metadata_location=staged.metadata_location
            )

        def create_table(  # noqa: PLR0913
            self,
            identifier: str | Identifier,
            schema: Schema | pa.Schema,
            location: str | None = None,
            partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
            sort_order: SortOrder = UNSORTED_SORT_ORDER,
            properties: Properties = EMPTY_DICT,
        ) -> Table:
            """Create a table via the transaction-based pyiceberg API."""
            return self.create_table_transaction(
                identifier, schema, location, partition_spec, sort_order, properties
            ).commit_transaction()

        def table_exists(self, identifier: str | Identifier) -> bool:
            """Return whether `identifier` names a registered table."""
            return ".".join(Catalog.identifier_to_tuple(identifier)) in cast(
                dict[str, str], self._read_registry()["tables"]
            )

        def view_exists(self, identifier: str | Identifier) -> bool:
            """Return whether `identifier` names a saved registry view."""
            return ".".join(Catalog.identifier_to_tuple(identifier)) in cast(
                dict[str, dict[str, object]], self._read_registry()["views"]
            )

        def list_views(self, namespace: str | Identifier) -> list[tuple[str, ...]]:
            """List saved view identifiers under `namespace`."""
            prefix = f"{Catalog.namespace_to_string(namespace)}."
            return [
                tuple(name.split("."))
                for name in sorted(cast(dict[str, str], self._read_registry()["views"]))
                if name.startswith(prefix)
            ]

        # The operations below are not required for CytoTable's local
        # write-then-read warehouses and are intentionally unimplemented.
        def drop_view(self, _identifier: str | Identifier) -> None:
            raise NotImplementedError

        def drop_table(self, _identifier: str | Identifier) -> None:
            raise NotImplementedError

        def rename_table(
            self, _from_identifier: str | Identifier, _to_identifier: str | Identifier
        ) -> Table:
            raise NotImplementedError

        def drop_namespace(self, _namespace: str | Identifier) -> None:
            raise NotImplementedError

        def update_namespace_properties(
            self,
            _namespace: str | Identifier,
            _removals: set[str] | None = None,
            _updates: Properties = EMPTY_DICT,
        ) -> PropertiesUpdateSummary:
            raise NotImplementedError
else:
    class TinyCatalog:  # type: ignore[no-redef]
        """
        Placeholder catalog when pyiceberg is unavailable.

        Defined so module-level references to ``TinyCatalog`` (e.g. in
        function annotations) keep the module importable; iceberg entry
        points call ``_require_pyiceberg`` to raise an informative
        ImportError before any real catalog use.
        """
def catalog(
    warehouse_path: Union[str, Path],
    *,
    default_namespace: str = DEFAULT_NAMESPACE,
    registry_file: str = DEFAULT_REGISTRY_FILE,
) -> TinyCatalog:
    """
    Open a local Iceberg warehouse and return its tiny catalog.
    """
    # Fail fast with a helpful message when pyiceberg is not installed.
    _require_pyiceberg()
    warehouse_root = _warehouse_dir(warehouse_path, registry_file)
    return TinyCatalog(
        warehouse_root,
        default_namespace=default_namespace,
        registry_file=registry_file,
    )
def write_iceberg_warehouse(  # noqa: PLR0913
    source_path: str,
    warehouse_path: Union[str, Path],
    source_datatype: Optional[str] = None,
    metadata: Optional[Tuple[str, ...] | list[str]] = None,
    compartments: Optional[Tuple[str, ...] | list[str]] = None,
    identifying_columns: Optional[Tuple[str, ...] | list[str]] = None,
    joins: Optional[str] = None,
    chunk_size: Optional[int] = None,
    infer_common_schema: bool = True,
    data_type_cast_map: Optional[Dict[str, str]] = None,
    add_tablenumber: Optional[bool] = None,
    page_keys: Optional[Dict[str, str]] = None,
    sort_output: bool = True,
    preset: Optional[str] = "cellprofiler_csv",
    image_dir: Optional[str] = None,
    mask_dir: Optional[str] = None,
    outline_dir: Optional[str] = None,
    bbox_column_map: Optional[Dict[str, str]] = None,
    segmentation_file_regex: Optional[Dict[str, str]] = None,
    include_source_images: bool = False,
    default_namespace: str = DEFAULT_NAMESPACE,
    images_namespace: str = DEFAULT_IMAGES_NAMESPACE,
    registry_file: str = DEFAULT_REGISTRY_FILE,
    profiles_table_name: str = DEFAULT_PROFILES_TABLE,
    profile_with_images_view_name: Optional[str] = DEFAULT_PROFILE_WITH_IMAGES_VIEW,
    parsl_config: Optional[parsl.Config] = None,
    **kwargs,
) -> str:
    """
    Write a CytoTable Iceberg warehouse from raw source data.

    This helper powers `convert(..., dest_backend="iceberg")` and accepts the
    same core conversion arguments for source selection, joins, chunking, and
    image export. See `cytotable.convert.convert` for the shared argument
    semantics; this function adds Iceberg-specific options such as
    `default_namespace`, `images_namespace`, `registry_file`,
    `profiles_table_name`, and `profile_with_images_view_name`.

    Returns:
        Path to the created Iceberg warehouse root.

    Raises:
        CytoTableException: If the warehouse path already exists or image
            export prerequisites are invalid.
        ValueError: If required join SQL or join pagination keys are missing.
    """
    _require_pyiceberg()
    root = Path(_expand_path(str(warehouse_path)))
    # The warehouse is always built fresh; refuse to clobber existing paths.
    if root.exists():
        raise CytoTableException(
            f"An existing file or directory was provided as warehouse_path: '{root}'."
        )
    root.parent.mkdir(parents=True, exist_ok=True)
    # Fill any unspecified conversion options from the chosen preset.
    resolved = _apply_preset_defaults_to_convert_config(
        preset=preset,
        metadata=metadata,
        compartments=compartments,
        identifying_columns=identifying_columns,
        joins=joins,
        chunk_size=chunk_size,
        page_keys=page_keys,
    )
    # Validate join and image configuration before any filesystem work.
    _validate_iceberg_join_prerequisites(
        joins=cast(str, resolved["joins"]),
        page_keys=cast(Dict[str, str], resolved["page_keys"]),
    )
    image_export_enabled = _validate_image_export_prerequisites(
        image_dir=image_dir,
        mask_dir=mask_dir,
        outline_dir=outline_dir,
        bbox_column_map=bbox_column_map,
        segmentation_file_regex=segmentation_file_regex,
        joins=cast(str, resolved["joins"]),
        page_keys=cast(Dict[str, str], resolved["page_keys"]),
        path_kwargs=kwargs,
    )
    root.mkdir(parents=True, exist_ok=False)
    # build_root marks the build as "in progress"; it is cleared to None after
    # success so the finally block knows not to delete the finished warehouse.
    build_root: Optional[Path] = root
    # Transient parquet staging area nested inside the warehouse root.
    stage_dir = Path(tempfile.mkdtemp(prefix="cytotable-iceberg-", dir=str(root)))
    parsl_was_loaded = _parsl_loaded()
    parsl_loaded_here = False
    try:
        # Only load (and later clean up) Parsl when no config exists yet.
        if not parsl_was_loaded:
            parsl.load(parsl_config or _default_parsl_config())
            parsl_loaded_here = True
        else:
            logger.info(
                "Reusing the already loaded Parsl configuration; "
                "write_iceberg_warehouse will not replace it with a new one."
            )
        # First materialize the analysis-ready joined profiles output as a
        # single parquet artifact, then import that artifact into Iceberg.
        profiles_path = cast(
            str,
            _run_export_workflow(
                source_path=source_path,
                dest_path=str(stage_dir / f"{profiles_table_name}.parquet"),
                source_datatype=source_datatype,
                metadata=list(cast(Tuple[str, ...], resolved["metadata"])),
                compartments=list(cast(Tuple[str, ...], resolved["compartments"])),
                identifying_columns=list(
                    cast(Tuple[str, ...], resolved["identifying_columns"])
                ),
                concat=True,
                join=True,
                joins=cast(str, resolved["joins"]),
                chunk_size=cast(Optional[int], resolved["chunk_size"]),
                infer_common_schema=infer_common_schema,
                drop_null=False,
                sort_output=sort_output,
                page_keys=cast(Dict[str, str], resolved["page_keys"]),
                dest_datatype="parquet",
                data_type_cast_map=data_type_cast_map,
                add_tablenumber=add_tablenumber,
                **kwargs,
            ),
        )
        bundle = catalog(
            root,
            default_namespace=default_namespace,
            registry_file=registry_file,
        )
        bundle.create_namespace(default_namespace)
        if image_export_enabled:
            bundle.create_namespace(images_namespace)
        profiles_table_exists = False
        if profiles_path and Path(profiles_path).exists():
            # Stamp stable object identifiers onto the materialized profile
            # rows before persisting them as the warehouse's primary table.
            profiles_arrow_table = pa.Table.from_pandas(
                add_object_id_to_profiles_frame(
                    parquet.read_table(Path(profiles_path)).to_pandas(),
                    bbox_column_map=bbox_column_map,
                ),
                preserve_index=False,
            )
            if bundle.table_exists((default_namespace, profiles_table_name)):
                table = bundle.load_table((default_namespace, profiles_table_name))
            else:
                table = bundle.create_table(
                    (default_namespace, profiles_table_name),
                    profiles_arrow_table.schema,
                    properties=_cytotable_iceberg_properties(),
                )
            table.append(profiles_arrow_table)
            profiles_table_exists = True
        if image_export_enabled:
            # Run the same join in chunked mode for image work so crops and
            # full source-image rows can be produced lazily per chunk.
            joined_chunk_paths = cast(
                list[str],
                _run_export_workflow(
                    source_path=source_path,
                    dest_path=str(stage_dir / "joined"),
                    source_datatype=source_datatype,
                    metadata=list(cast(Tuple[str, ...], resolved["metadata"])),
                    compartments=list(cast(Tuple[str, ...], resolved["compartments"])),
                    identifying_columns=list(
                        cast(Tuple[str, ...], resolved["identifying_columns"])
                    ),
                    concat=False,
                    join=True,
                    joins=cast(str, resolved["joins"]),
                    chunk_size=cast(Optional[int], resolved["chunk_size"]),
                    infer_common_schema=infer_common_schema,
                    drop_null=False,
                    sort_output=sort_output,
                    page_keys=cast(Dict[str, str], resolved["page_keys"]),
                    data_type_cast_map=data_type_cast_map,
                    add_tablenumber=add_tablenumber,
                    **kwargs,
                ),
            )
            # Image tables are created lazily on the first non-empty chunk.
            image_table: Optional[Table] = None
            source_images_table: Optional[Table] = None
            seen_source_image_ids: set[str] = set()
            if bundle.table_exists((images_namespace, SOURCE_IMAGE_TABLE_NAME)):
                # Source images are image-level assets, so deduplicate them
                # across joined chunks by the stable image identifier.
                existing_source_images = (
                    bundle.load_table((images_namespace, SOURCE_IMAGE_TABLE_NAME))
                    .scan()
                    .to_arrow()
                )
                if "Metadata_ImageID" in existing_source_images.column_names:
                    seen_source_image_ids.update(
                        image_id
                        for image_id in existing_source_images[
                            "Metadata_ImageID"
                        ].to_pylist()
                        if image_id is not None
                    )
            for chunk_path in joined_chunk_paths:
                crop_table = image_crop_table_from_joined_chunk(
                    chunk_path=chunk_path,
                    image_dir=cast(str, image_dir),
                    mask_dir=mask_dir,
                    outline_dir=outline_dir,
                    bbox_column_map=bbox_column_map,
                    segmentation_file_regex=segmentation_file_regex,
                    path_kwargs=kwargs,
                )
                if crop_table.num_rows == 0:
                    continue
                if image_table is None:
                    image_table = (
                        bundle.load_table((images_namespace, IMAGE_TABLE_NAME))
                        if bundle.table_exists((images_namespace, IMAGE_TABLE_NAME))
                        else bundle.create_table(
                            (images_namespace, IMAGE_TABLE_NAME),
                            crop_table.schema,
                            properties=_cytotable_iceberg_properties(),
                        )
                    )
                image_table.append(crop_table)
                if include_source_images:
                    source_image_table = source_image_table_from_joined_chunk(
                        chunk_path=chunk_path,
                        image_dir=cast(str, image_dir),
                        mask_dir=mask_dir,
                        outline_dir=outline_dir,
                        segmentation_file_regex=segmentation_file_regex,
                        path_kwargs=kwargs,
                    )
                    if source_image_table.num_rows != 0:
                        source_image_frame = source_image_table.to_pandas()
                        # Drop rows already persisted by earlier chunks.
                        source_image_frame = source_image_frame[
                            ~source_image_frame["Metadata_ImageID"].isin(
                                seen_source_image_ids
                            )
                        ]
                        if source_image_frame.empty:
                            continue
                        filtered_source_image_table = pa.Table.from_pandas(
                            source_image_frame,
                            schema=source_image_table.schema,
                            preserve_index=False,
                        )
                        if source_images_table is None:
                            source_images_table = (
                                bundle.load_table(
                                    (images_namespace, SOURCE_IMAGE_TABLE_NAME)
                                )
                                if bundle.table_exists(
                                    (images_namespace, SOURCE_IMAGE_TABLE_NAME)
                                )
                                else bundle.create_table(
                                    (images_namespace, SOURCE_IMAGE_TABLE_NAME),
                                    filtered_source_image_table.schema,
                                    properties=_cytotable_iceberg_properties(),
                                )
                            )
                        source_images_table.append(filtered_source_image_table)
                        seen_source_image_ids.update(
                            image_id
                            for image_id in source_image_frame[
                                "Metadata_ImageID"
                            ].tolist()
                            if image_id is not None
                        )
            if (
                profiles_table_exists
                and profile_with_images_view_name
                and image_table is not None
            ):
                # Persist the cross-namespace analytical view only when both
                # the base profile table and image crop table exist.
                registry = bundle._read_registry()
                cast(dict[str, dict[str, object]], registry["views"])[
                    _qualify(profile_with_images_view_name, default_namespace)
                ] = {
                    "kind": "profile_with_images",
                    "base_table": _qualify(profiles_table_name, default_namespace),
                    "image_table": _qualify(IMAGE_TABLE_NAME, images_namespace),
                    "bbox_column_map": bbox_column_map or {},
                }
                bundle._write_registry(registry)
        # Drop transient parquet staging after the warehouse contents have
        # been committed successfully.
        shutil.rmtree(stage_dir, ignore_errors=True)
        build_root = None
    finally:
        if parsl_loaded_here:
            parsl.dfk().cleanup()
        if build_root is not None:
            # Clean up partially built warehouse state on any failure path.
            shutil.rmtree(build_root, ignore_errors=True)
    return str(root)
def _read_sql_view(bundle: TinyCatalog, view_name: str) -> pd.DataFrame:
    """
    Read a saved SQL view by materializing Iceberg tables into DuckDB.
    """
    from cytotable.utils import _duckdb_reader

    spec = cast(
        dict[str, Any],
        cast(dict[str, Any], bundle._read_registry()["views"])[view_name],
    )
    view_sql = cast(str, spec["sql"])
    with _duckdb_reader() as reader:
        # Register each referenced Iceberg table as a DuckDB relation so the
        # stored SQL can address it by its bare table name.
        for table_name in cast(list[str], spec["tables"]):
            identifier = tuple(
                _qualify(table_name, bundle.default_namespace).split(".")
            )
            arrow_table = bundle.load_table(identifier).scan().to_arrow()
            reader.register(table_name, arrow_table)
        return reader.execute(view_sql).fetch_arrow_table().to_pandas()
def _read_profile_with_images_view(bundle: TinyCatalog, view_name: str) -> pd.DataFrame:
    """
    Read a saved profile/image manifest view from warehouse tables.
    """
    spec = cast(
        dict[str, Any],
        cast(dict[str, Any], bundle._read_registry()["views"])[view_name],
    )

    def _load_frame(key: str) -> pd.DataFrame:
        # Materialize one referenced Iceberg table as a pandas frame.
        identifier = tuple(cast(str, spec[key]).split("."))
        return bundle.load_table(identifier).scan().to_arrow().to_pandas()

    return profile_with_images_frame(
        joined_frame=_load_frame("base_table"),
        image_frame=_load_frame("image_table"),
        bbox_column_map=cast(Dict[str, str], spec.get("bbox_column_map") or {}),
    )
def _read_registered_view(bundle: TinyCatalog, view_name: str) -> pd.DataFrame:
    """
    Read a saved registry-backed warehouse view.

    Raises:
        CytoTableException: when the stored view kind is not recognized.
    """
    registry = bundle._read_registry()
    spec = cast(dict[str, Any], cast(dict[str, Any], registry["views"])[view_name])
    kind = cast(str, spec["kind"])
    # Dispatch on the stored view kind.
    view_readers = {
        "sql": _read_sql_view,
        "profile_with_images": _read_profile_with_images_view,
    }
    if kind not in view_readers:
        raise CytoTableException(f"Unsupported warehouse view kind: {kind}")
    return view_readers[kind](bundle, view_name)
def read_iceberg_table(
    warehouse_path: Union[str, Path],
    table_name: str,
    *,
    default_namespace: str = DEFAULT_NAMESPACE,
    registry_file: str = DEFAULT_REGISTRY_FILE,
) -> pd.DataFrame:
    """
    Read an Iceberg table or saved SQL view from a local warehouse.
    """
    bundle = catalog(
        warehouse_path,
        default_namespace=default_namespace,
        registry_file=registry_file,
    )
    qualified_name = _resolve_unqualified_name(bundle, table_name)
    identifier = tuple(qualified_name.split("."))
    # Saved views take precedence over physical tables of the same name.
    if bundle.view_exists(identifier):
        return _read_registered_view(bundle, qualified_name)
    return bundle.load_table(identifier).scan().to_arrow().to_pandas()
def list_iceberg_tables(
    warehouse_path: Union[str, Path],
    include_views: bool = True,
    *,
    default_namespace: str = DEFAULT_NAMESPACE,
    registry_file: str = DEFAULT_REGISTRY_FILE,
) -> list[str]:
    """
    List fully qualified tables and optional views in a local Iceberg warehouse.
    """
    bundle = catalog(
        warehouse_path,
        default_namespace=default_namespace,
        registry_file=registry_file,
    )
    names: list[str] = []
    # Walk each namespace once, collecting tables (and views when requested).
    for namespace in bundle.list_namespaces():
        names.extend(
            ".".join(identifier) for identifier in bundle.list_tables(namespace)
        )
        if include_views:
            names.extend(
                ".".join(identifier) for identifier in bundle.list_views(namespace)
            )
    return sorted(names)
def describe_iceberg_warehouse(
    warehouse_path: Union[str, Path],
    include_views: bool = True,
    *,
    default_namespace: str = DEFAULT_NAMESPACE,
    registry_file: str = DEFAULT_REGISTRY_FILE,
) -> pd.DataFrame:
    """
    Summarize tables and saved views within a local Iceberg warehouse.
    """
    bundle = catalog(
        warehouse_path,
        default_namespace=default_namespace,
        registry_file=registry_file,
    )

    def _table_row(identifier: tuple[str, ...]) -> dict[str, object]:
        # Row and file counts come from Iceberg's file-level metadata.
        table = bundle.load_table(identifier)
        files = table.inspect.files().to_pandas()
        snapshot = table.current_snapshot()
        return {
            "table": ".".join(identifier),
            "rows": int(files["record_count"].sum()),
            "data_files": len(files),
            "snapshot_id": snapshot.snapshot_id if snapshot is not None else None,
            "kind": "table",
        }

    def _view_row(identifier: tuple[str, ...]) -> dict[str, object]:
        # Views have no data files; materialize them to count rows.
        view_name = ".".join(identifier)
        return {
            "table": view_name,
            "rows": len(_read_registered_view(bundle, view_name)),
            "data_files": 0,
            "snapshot_id": None,
            "kind": "view",
        }

    rows: list[dict[str, object]] = []
    for namespace in bundle.list_namespaces():
        rows.extend(
            _table_row(identifier) for identifier in bundle.list_tables(namespace)
        )
        if include_views:
            rows.extend(
                _view_row(identifier) for identifier in bundle.list_views(namespace)
            )
    return pd.DataFrame(rows).sort_values("table").reset_index(drop=True)
# Explicit public API for `from ... import *` and documentation tooling.
__all__ = [
    "DEFAULT_IMAGES_NAMESPACE",
    "DEFAULT_NAMESPACE",
    "DEFAULT_PROFILE_WITH_IMAGES_VIEW",
    "DEFAULT_PROFILES_TABLE",
    "DEFAULT_REGISTRY_FILE",
    "TinyCatalog",
    "catalog",
    "describe_iceberg_warehouse",
    "list_iceberg_tables",
    "read_iceberg_table",
    "write_iceberg_warehouse",
]