"""
CytoTable: convert - transforming data for use with pycytominer.
"""
import itertools
import logging
from typing import Any, Dict, List, Literal, Optional, Tuple, Union, cast
import parsl
from parsl.app.app import python_app
from cytotable.exceptions import CytoTableException
from cytotable.presets import config
from cytotable.sources import _gather_sources
from cytotable.utils import (
_column_sort,
_default_parsl_config,
_expand_path,
_parsl_loaded,
evaluate_futures,
)
logger = logging.getLogger(__name__)
@python_app
def _get_table_columns_and_types(
source: Dict[str, Any], sort_output: bool
) -> List[Dict[str, str]]:
"""
Gather column data from table through duckdb.
Args:
source: Dict[str, Any]
Contains source data details. Represents a single
file or table of some kind.
sort_output:
Specifies whether to sort cytotable output or not.
Returns:
List[Dict[str, str]]
list of dictionaries which each include column level information
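    Example:
        A hypothetical result for a source table with two columns
        (the names and dtypes shown are illustrative only):

        .. code-block:: python

            [
                {"column_id": 0, "column_name": "ImageNumber", "column_dtype": "BIGINT"},
                {"column_id": 1, "column_name": "AreaShape_Area", "column_dtype": "DOUBLE"},
            ]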
"""
import duckdb
from cytotable.utils import _duckdb_reader, _sqlite_mixed_type_query_to_parquet
source_path = source["source_path"]
source_type = str(source_path.suffix).lower()
# prepare the data source in the form of a duckdb query
select_source = (
f"read_csv_auto('{source_path}')"
if source_type == ".csv"
else f"sqlite_scan('{source_path}', '{source['table_name']}')"
)
# Query top 5 results from table and use pragma_storage_info() to
# gather duckdb interpreted data typing. We gather 5 values for
# each column to help with type inferences (where smaller sets
# may yield lower data type accuracy for the full table).
select_query = """
/* we create an in-mem table for later use with the pragma_storage_info call
as this call only functions with materialized tables and not views or related */
CREATE TABLE column_details AS
(SELECT *
FROM &select_source
LIMIT 5
);
/* selects specific column metadata from pragma_storage_info */
SELECT DISTINCT
column_id,
column_name,
segment_type as column_dtype
FROM pragma_storage_info('column_details')
/* avoid duplicate entries in the form of VALIDITY segment_types */
WHERE segment_type != 'VALIDITY'
/* explicitly order the columns by their id to avoid inconsistent results */
ORDER BY column_id ASC;
"""
# attempt to read the data to parquet from duckdb
# with exception handling to read mixed-type data
# using sqlite3 and special utility function
try:
        # isolate using a new connection to gather column metadata via duckdb
        # (avoiding the need to return full data to python)
        # and create a list of dictionaries with the column data for the table
with _duckdb_reader() as ddb_reader:
return (
ddb_reader.execute(
select_query.replace("&select_source", select_source)
)
.arrow()
.to_pylist()
)
except duckdb.Error as e:
# if we see a mismatched type error
# run a more nuanced query through sqlite
# to handle the mixed types
if "Mismatch Type Error" in str(e) and source_type == ".sqlite":
arrow_data_tbl = _sqlite_mixed_type_query_to_parquet(
source_path=str(source["source_path"]),
table_name=str(source["table_name"]),
page_key=source["page_key"],
pageset=source["pagesets"][0],
sort_output=sort_output,
)
with _duckdb_reader() as ddb_reader:
return (
ddb_reader.execute(
select_query.replace("&select_source", "arrow_data_tbl")
)
.arrow()
.to_pylist()
)
else:
raise
@python_app
def _prep_cast_column_data_types(
columns: List[Dict[str, str]], data_type_cast_map: Dict[str, str]
) -> List[Dict[str, str]]:
"""
Cast data types per what is received in cast_map.
Example:
- columns: [{"column_id":0, "column_name":"colname", "column_dtype":"DOUBLE"}]
- data_type_cast_map: {"float": "float32"}
    The above passed through this function will set the "column_dtype" value
    to a "REAL" dtype ("REAL" in duckdb is roughly equivalent to "float32").
Args:
        columns: List[Dict[str, str]]
            List of dictionaries which each include column level information
            (column_id, column_name, and column_dtype).
data_type_cast_map: Dict[str, str]
A dictionary mapping data type groups to specific types.
Roughly to eventually align with DuckDB types:
https://duckdb.org/docs/sql/data_types/overview
Note: includes synonym matching for common naming convention
use in Pandas and/or PyArrow via cytotable.utils.DATA_TYPE_SYNONYMS
Returns:
List[Dict[str, str]]
list of dictionaries which each include column level information
"""
from functools import partial
from cytotable.utils import _arrow_type_cast_if_specified
if data_type_cast_map is not None:
return list(
# map across all columns
map(
partial(
# attempts to cast the columns provided
_arrow_type_cast_if_specified,
                    # set static data_type_cast_map arg for
# use with all fields
data_type_cast_map=data_type_cast_map,
),
columns,
)
)
return columns
@python_app
def _get_table_keyset_pagination_sets(
chunk_size: int,
page_key: str,
source: Optional[Dict[str, Any]] = None,
sql_stmt: Optional[str] = None,
) -> Union[List[Tuple[Union[int, float], Union[int, float]]], None]:
"""
Get table data chunk keys for later use in capturing segments
of values. This work also provides a chance to catch problematic
input data which will be ignored with warnings.
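    Example:
        An illustrative sketch of the returned pagination sets, assuming
        page key values 1 through 7 and a chunk_size of 3 (exact boundaries
        are determined by cytotable.utils._generate_pagesets):

        .. code-block:: python

            # list of inclusive (start, end) page key ranges
            [(1, 3), (4, 6), (7, 7)]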
Args:
        chunk_size: int
            The size in rowcount of the chunks to create.
        page_key: str
            The column name to be used to identify pagination chunks.
            Expected to be of numeric type (int, float) for ordering.
        source: Optional[Dict[str, Any]] (Default value = None)
            Contains the source data to be chunked. Represents a single
            file or table of some kind.
        sql_stmt: Optional[str] (Default value = None)
            Optional SQL statement to form the pagination set from.
            Default behavior extracts pagination sets from the full
            data source.
    Returns:
        Union[List[Tuple[Union[int, float], Union[int, float]]], None]
            List of (start, end) page keys to use for reading the data later on,
            or None when the source is skipped due to input file errors.
"""
import logging
import sqlite3
from contextlib import closing
import duckdb
from cytotable.exceptions import NoInputDataException
from cytotable.utils import _duckdb_reader, _generate_pagesets
logger = logging.getLogger(__name__)
if source is not None:
table_name = source["table_name"] if "table_name" in source.keys() else None
source_path = source["source_path"]
source_type = str(source_path.suffix).lower()
try:
with _duckdb_reader() as ddb_reader:
if source_type == ".csv":
sql_query = f"SELECT {page_key} FROM read_csv_auto('{source_path}', header=TRUE, delim=',') ORDER BY {page_key}"
else:
sql_query = f"SELECT {page_key} FROM sqlite_scan('{source_path}', '{table_name}') ORDER BY {page_key}"
page_keys = [
results[0] for results in ddb_reader.execute(sql_query).fetchall()
]
# exception case for when we have mixed types
# (i.e. integer col with string and ints) in a sqlite column
except duckdb.TypeMismatchException:
with closing(sqlite3.connect(source_path)) as cx:
with cx:
page_keys = [
key[0]
for key in cx.execute(
f"SELECT {page_key} FROM {table_name} ORDER BY {page_key};"
).fetchall()
if isinstance(key[0], (int, float))
]
except (
duckdb.InvalidInputException,
NoInputDataException,
) as invalid_input_exc:
logger.warning(
msg=f"Skipping file due to input file errors: {str(invalid_input_exc)}"
)
return None
elif sql_stmt is not None:
with _duckdb_reader() as ddb_reader:
sql_query = f"SELECT {page_key} FROM ({sql_stmt}) ORDER BY {page_key}"
page_keys = ddb_reader.execute(sql_query).fetchall()
page_keys = [key[0] for key in page_keys]
return _generate_pagesets(page_keys, chunk_size)
@python_app
def _source_pageset_to_parquet(
source_group_name: str,
source: Dict[str, Any],
pageset: Tuple[Union[int, float], Union[int, float]],
dest_path: str,
sort_output: bool,
) -> str:
"""
Export source data to chunked parquet file using chunk size and offsets.
Args:
source_group_name: str
Name of the source group (for ex. compartment or metadata table name).
source: Dict[str, Any]
Contains the source data to be chunked. Represents a single
file or table of some kind along with collected information about table.
        pageset: Tuple[Union[int, float], Union[int, float]]
The pageset for chunking the data from source.
dest_path: str
Path to store the output data.
sort_output: bool
Specifies whether to sort cytotable output or not.
Returns:
str
A string of the output filepath.
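    Example:
        An illustrative output filepath for a hypothetical SQLite source
        "Per_Cells.sqlite" with table "Per_Cells" and a pageset of (1, 1000):

        .. code-block:: python

            # dest_path / lowercased source group stem / lowercased parent dir
            "<dest_path>/per_cells/<parent_dir>/Per_Cells.Per_Cells-1-1000.parquet"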
"""
import pathlib
import duckdb
from cloudpathlib import AnyPath
from cytotable.utils import (
_duckdb_reader,
_sqlite_mixed_type_query_to_parquet,
_write_parquet_table_with_metadata,
)
# attempt to build dest_path
source_dest_path = (
f"{dest_path}/{str(AnyPath(source_group_name).stem).lower()}/"
f"{str(source['source_path'].parent.name).lower()}"
)
pathlib.Path(source_dest_path).mkdir(parents=True, exist_ok=True)
# add source table columns
casted_source_cols = [
        # here we cast the column to the specified type and ensure the column name remains the same
f"CAST(\"{column['column_name']}\" AS {column['column_dtype']}) AS \"{column['column_name']}\""
for column in source["columns"]
]
    # create the selection statement from the casted source columns above
    select_columns = ",".join(casted_source_cols)
# build output query and filepath base
# (chunked output will append offset to keep output paths unique)
if str(source["source_path"].suffix).lower() == ".csv":
base_query = f"SELECT {select_columns} FROM read_csv_auto('{str(source['source_path'])}', header=TRUE, delim=',')"
result_filepath_base = f"{source_dest_path}/{str(source['source_path'].stem)}"
elif str(source["source_path"].suffix).lower() == ".sqlite":
base_query = f"SELECT {select_columns} FROM sqlite_scan('{str(source['source_path'])}', '{str(source['table_name'])}')"
result_filepath_base = f"{source_dest_path}/{str(source['source_path'].stem)}.{source['table_name']}"
# form a filepath which indicates the pageset
result_filepath = f"{result_filepath_base}-{pageset[0]}-{pageset[1]}.parquet"
# Attempt to read the data to parquet file
# using duckdb for extraction and pyarrow for
# writing data to a parquet file.
try:
# read data with chunk size + offset
# and export to parquet
with _duckdb_reader() as ddb_reader:
_write_parquet_table_with_metadata(
table=ddb_reader.execute(
f"""
{base_query}
WHERE {source['page_key']} BETWEEN {pageset[0]} AND {pageset[1]}
/* optional ordering per pageset */
{"ORDER BY " + source['page_key'] if sort_output else ""};
"""
).arrow(),
where=result_filepath,
)
# Include exception handling to read mixed-type data
# using sqlite3 and special utility function.
except duckdb.Error as e:
# if we see a mismatched type error
# run a more nuanced query through sqlite
# to handle the mixed types
if (
"Mismatch Type Error" in str(e)
and str(source["source_path"].suffix).lower() == ".sqlite"
):
_write_parquet_table_with_metadata(
# here we use sqlite instead of duckdb to extract
# data for special cases where column and value types
# may not align (which is valid functionality in SQLite).
table=_sqlite_mixed_type_query_to_parquet(
source_path=str(source["source_path"]),
table_name=str(source["table_name"]),
page_key=source["page_key"],
pageset=pageset,
sort_output=sort_output,
),
where=result_filepath,
)
else:
raise
# return the filepath for the chunked output file
return result_filepath
@python_app
def _prepend_column_name(
table_path: str,
source_group_name: str,
identifying_columns: List[str],
metadata: Union[List[str], Tuple[str, ...]],
compartments: List[str],
) -> str:
"""
Rename columns using the source group name, avoiding identifying columns.
Notes:
* A source_group_name represents a filename referenced as part of what
is specified within targets.
Args:
table_path: str:
Path to a parquet file which will be modified.
source_group_name: str:
Name of data source source group (for common compartments, etc).
identifying_columns: List[str]:
Column names which are used as ID's and as a result need to be
treated differently when renaming.
metadata: Union[List[str], Tuple[str, ...]]:
List of source data names which are used as metadata.
compartments: List[str]:
List of source data names which are used as compartments.
Returns:
str
Path to the modified file.
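    Example:
        Illustrative renames for a hypothetical source group "Per_Cells.sqlite"
        (stem "Cells") where identifying_columns includes "ObjectNumber" and
        "Parent_Nuclei":

        .. code-block:: python

            # "AreaShape_Area"             -> "Cells_AreaShape_Area"
            # "ObjectNumber"               -> "Metadata_ObjectNumber"
            # "Parent_Nuclei"              -> "Metadata_Cells_Parent_Nuclei"
            # "Cells_Number_Object_Number" -> "Metadata_Cells_Number_Object_Number"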
"""
import logging
import pathlib
import pyarrow.parquet as parquet
from cytotable.constants import CYTOTABLE_ARROW_USE_MEMORY_MAPPING
from cytotable.utils import _write_parquet_table_with_metadata
logger = logging.getLogger(__name__)
targets = tuple(metadata) + tuple(compartments)
    # if we have no compartment or metadata targets to work from, return the table unchanged
if len(targets) == 0:
logger.warning(
msg=(
"Skipping column name prepend operations "
"because no compartments or metadata were provided."
)
)
return table_path
table = parquet.read_table(
source=table_path, memory_map=CYTOTABLE_ARROW_USE_MEMORY_MAPPING
)
# stem of source group name
# for example:
# targets: ['cytoplasm']
# source_group_name: 'Per_Cytoplasm.sqlite'
# source_group_name_stem: 'Cytoplasm'
source_group_name_stem = targets[
# return first result from generator below as index to targets
next(
i
for i, val in enumerate(targets)
# compare if value from targets in source_group_name stem
if val.lower() in str(pathlib.Path(source_group_name).stem).lower()
)
# capitalize the result
].capitalize()
# capture updated column names as new variable
updated_column_names = []
for column_name in table.column_names:
# if-condition for prepending source_group_name_stem to column name
# where colname is not in identifying_columns parameter values
# and where the column is not already prepended with source_group_name_stem
# for example:
# source_group_name_stem: 'Cells'
# column_name: 'AreaShape_Area'
# updated_column_name: 'Cells_AreaShape_Area'
if column_name not in identifying_columns and not column_name.startswith(
source_group_name_stem.capitalize()
):
updated_column_names.append(f"{source_group_name_stem}_{column_name}")
# if-condition for prepending 'Metadata_' to column name
# where colname is in identifying_columns parameter values
# and where the column is already prepended with source_group_name_stem
# for example:
# source_group_name_stem: 'Cells'
# column_name: 'Cells_Number_Object_Number'
# updated_column_name: 'Metadata_Cells_Number_Object_Number'
elif column_name in identifying_columns and column_name.startswith(
source_group_name_stem.capitalize()
):
updated_column_names.append(f"Metadata_{column_name}")
# if-condition for prepending 'Metadata' and source_group_name_stem to column name
# where colname is in identifying_columns parameter values
# and where the colname does not already start with 'Metadata_'
# and colname not in metadata list
# and colname does not include 'ObjectNumber' or 'TableNumber'
# (which are specially treated column names in this context)
# for example:
# source_group_name_stem: 'Cells'
# column_name: 'Parent_Nuclei'
# updated_column_name: 'Metadata_Cells_Parent_Nuclei'
elif (
column_name in identifying_columns
and not column_name.startswith("Metadata_")
and not any(item.capitalize() in column_name for item in metadata)
and not any(item in column_name for item in ["ObjectNumber", "TableNumber"])
):
updated_column_names.append(
f"Metadata_{source_group_name_stem}_{column_name}"
)
# if-condition for prepending 'Metadata' to column name
# where colname doesn't already start with 'Metadata_'
# and colname is in identifying_columns parameter values
# for example:
# column_name: 'ObjectNumber'
# updated_column_name: 'Metadata_ObjectNumber'
elif (
not column_name.startswith("Metadata_")
and column_name in identifying_columns
):
updated_column_names.append(f"Metadata_{column_name}")
# else we add the existing colname to the updated list as-is
else:
updated_column_names.append(column_name)
# perform table column name updates
_write_parquet_table_with_metadata(
table=table.rename_columns(updated_column_names), where=table_path
)
return table_path
@python_app
def _concat_source_group(
source_group_name: str,
source_group: List[Dict[str, Any]],
dest_path: str,
common_schema: Optional[List[Tuple[str, str]]] = None,
sort_output: bool = True,
) -> List[Dict[str, Any]]:
"""
Concatenate group of source data together as single file.
For a reference to data concatenation within Arrow see the following:
https://arrow.apache.org/docs/python/generated/pyarrow.concat_tables.html
Notes: this function presumes a multi-directory, multi-file common data
structure for compartments and other data. For example:
Source (file tree):
.. code-block:: bash
root
├── subdir_1
│ └── Cells.csv
└── subdir_2
└── Cells.csv
Becomes:
.. code-block:: python
# earlier data read into parquet chunks from multiple
# data source files.
read_data = [
{"table": ["cells-1.parquet", "cells-2.parquet"]},
{"table": ["cells-1.parquet", "cells-2.parquet"]},
]
# focus of this function
concatted = [{"table": ["cells.parquet"]}]
Args:
source_group_name: str
Name of data source source group (for common compartments, etc).
source_group: List[Dict[str, Any]]:
Data structure containing grouped data for concatenation.
        dest_path: str:
            Destination path for concatenated sources.
        common_schema: Optional[List[Tuple[str, str]]] (Default value = None)
Common schema to use for concatenation amongst arrow tables
which may have slightly different but compatible schema.
sort_output: bool
Specifies whether to sort cytotable output or not.
Returns:
List[Dict[str, Any]]
Updated dictionary containing concatenated sources.
"""
import errno
import pathlib
import pyarrow as pa
import pyarrow.parquet as parquet
from cytotable.constants import (
CYTOTABLE_ARROW_USE_MEMORY_MAPPING,
CYTOTABLE_DEFAULT_PARQUET_METADATA,
)
from cytotable.exceptions import SchemaException
from cytotable.utils import _natural_sort
# build a result placeholder
concatted: List[Dict[str, Any]] = [
{
# source path becomes parent's parent dir with the same filename
"source_path": pathlib.Path(
(
f"{source_group[0]['source_path'].parent.parent}"
f"/{source_group[0]['source_path'].stem}"
)
)
}
]
# build destination path for file to land
destination_path = pathlib.Path(
(
f"{dest_path}/{str(pathlib.Path(source_group_name).stem).lower()}/"
f"{str(pathlib.Path(source_group_name).stem).lower()}.parquet"
)
)
# if there's already a file remove it
destination_path.unlink(missing_ok=True)
# ensure the parent directories exist:
destination_path.parent.mkdir(parents=True, exist_ok=True)
# build the schema for concatenation writer
writer_schema = pa.schema(common_schema).with_metadata(
CYTOTABLE_DEFAULT_PARQUET_METADATA
)
# build a parquet file writer which will be used to append files
# as a single concatted parquet file, referencing the first file's schema
# (all must be the same schema)
with parquet.ParquetWriter(str(destination_path), writer_schema) as writer:
for source in source_group:
tables = [table for table in source["table"]]
if sort_output:
tables = _natural_sort(tables)
for table in tables:
# if we haven't inferred the common schema
# check that our file matches the expected schema, otherwise raise an error
if common_schema is None and not writer_schema.equals(
parquet.read_schema(table)
):
raise SchemaException(
(
f"Detected mismatching schema for target concatenation group members:"
f" {str(source_group[0]['table'])} and {str(table)}"
)
)
# read the file from the list and write to the concatted parquet file
                # note: we pass the writer schema to help ensure schema
                # compatibility across the chunked files
writer.write_table(
parquet.read_table(
table,
schema=writer_schema,
memory_map=CYTOTABLE_ARROW_USE_MEMORY_MAPPING,
)
)
# remove the file which was written in the concatted parquet file (we no longer need it)
pathlib.Path(table).unlink()
# attempt to clean up dir containing original table(s) only if it's empty
try:
pathlib.Path(pathlib.Path(source["table"][0]).parent).rmdir()
except OSError as os_err:
# raise only if we don't have a dir not empty errno
if os_err.errno != errno.ENOTEMPTY:
raise
# return the concatted parquet filename
concatted[0]["table"] = [destination_path]
return concatted
@python_app()
def _prepare_join_sql(
sources: Dict[str, List[Dict[str, Any]]],
joins: str,
) -> str:
"""
Prepare join SQL statement with actual locations of data based on the sources.
Args:
sources: Dict[str, List[Dict[str, Any]]]:
Grouped datasets of files which will be used by other functions.
Includes the metadata concerning location of actual data.
joins: str:
DuckDB-compatible SQL which will be used to perform the join
operations using the join_group keys as a reference.
Returns:
str:
String representing the SQL to be used in later join work.
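    Example:
        An illustrative sketch of the substitution performed, assuming a joins
        statement which references a hypothetical 'cells.parquet' placeholder
        and a sources entry holding the real parquet location(s) for that group:

        .. code-block:: python

            # before: ... FROM read_parquet('cells.parquet') ...
            # after:  ... FROM read_parquet(['<actual path(s) for the cells source>']) ...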
"""
import pathlib
    # replace the presumed placeholder table names within the join sql
    # with the real locations of the parquet sources
    for key, val in sources.items():
        if pathlib.Path(key).stem.lower() in joins.lower():
            table_name = str(pathlib.Path(key).stem.lower())
            joins = joins.replace(
                f"'{table_name}.parquet'",
                str([str(table) for table in val[0]["table"]]),
            )
    # return the join sql with the real source locations substituted
    return joins
@python_app
def _join_source_pageset(
dest_path: str,
joins: str,
page_key: str,
pageset: Tuple[int, int],
sort_output: bool,
drop_null: bool,
) -> str:
"""
    Join sources using the join SQL for a specific pageset
    (a chunked range of page key values).
    Args:
        dest_path: str:
            Destination path to write file-based content.
        joins: str:
            DuckDB-compatible SQL which will be used to perform the join
            operations.
        page_key: str:
            The column name used to identify pagination chunks.
        pageset: Tuple[int, int]:
            The pageset (start and end page key values) used as a "chunked"
            filter of the overall joined dataset.
        sort_output: bool:
            Specifies whether to sort cytotable output or not.
        drop_null: bool:
            Whether to drop rows with null values within the resulting
            joined data.
Returns:
str
Path to joined file which is created as a result of this function.
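    Example:
        A sketch of the SQL effectively executed, assuming a hypothetical
        page_key of "Metadata_ImageNumber", a pageset of (1, 1000), and
        sort_output=True:

        .. code-block:: python

            # WITH joined AS ( <joins> )
            # SELECT *
            # FROM joined
            # WHERE Metadata_ImageNumber BETWEEN 1 AND 1000
            # ORDER BY Metadata_ImageNumber;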
"""
import pathlib
from cytotable.utils import _duckdb_reader, _write_parquet_table_with_metadata
with _duckdb_reader() as ddb_reader:
result = ddb_reader.execute(
f"""
WITH joined AS (
{joins}
)
SELECT *
FROM joined
WHERE {page_key} BETWEEN {pageset[0]} AND {pageset[1]}
            /* optional sorting per pageset */
{"ORDER BY " + page_key if sort_output else ""};
"""
).arrow()
# drop nulls if specified
if drop_null:
result = result.drop_null()
# account for duplicate column names from joins
cols = []
# reversed order column check as col removals will change index order
for i, colname in reversed(list(enumerate(result.column_names))):
if colname not in cols:
cols.append(colname)
else:
result = result.remove_column(i)
# inner sorted alphabetizes any columns which may not be part of custom_sort
# outer sort provides pycytominer-specific column sort order
result = result.select(sorted(sorted(result.column_names), key=_column_sort))
result_file_path = (
# store the result in the parent of the dest_path
f"{str(pathlib.Path(dest_path).parent)}/"
# use the dest_path stem in the name
f"{str(pathlib.Path(dest_path).stem)}-"
# add the pageset indication to the filename
f"{pageset[0]}-{pageset[1]}.parquet"
)
# write the result
_write_parquet_table_with_metadata(
table=result,
where=result_file_path,
)
return result_file_path
@python_app
def _concat_join_sources(
sources: Dict[str, List[Dict[str, Any]]],
dest_path: str,
join_sources: List[str],
sort_output: bool = True,
) -> str:
"""
Concatenate join sources from parquet-based chunks.
For a reference to data concatenation within Arrow see the following:
https://arrow.apache.org/docs/python/generated/pyarrow.concat_tables.html
Args:
sources: Dict[str, List[Dict[str, Any]]]:
Grouped datasets of files which will be used by other functions.
Includes the metadata concerning location of actual data.
dest_path: str:
Destination path to write file-based content.
join_sources: List[str]:
List of local filepath destination for join source chunks
which will be concatenated.
sort_output: bool
Specifies whether to sort cytotable output or not.
Returns:
str
Path to concatenated file which is created as a result of this function.
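    Example:
        An illustrative sketch with hypothetical join chunk filepaths, which
        are appended in natural-sort order into a single parquet file at
        dest_path (and removed afterwards):

        .. code-block:: python

            join_sources = [
                "<dest_parent>/result-1-1000.parquet",
                "<dest_parent>/result-1001-2000.parquet",
            ]
            # both chunks are written into dest_path as one parquet file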
"""
import pathlib
import shutil
import pyarrow.parquet as parquet
from cytotable.constants import (
CYTOTABLE_ARROW_USE_MEMORY_MAPPING,
CYTOTABLE_DEFAULT_PARQUET_METADATA,
)
from cytotable.utils import _natural_sort
# remove the unjoined concatted compartments to prepare final dest_path usage
# (we now have joined results)
flattened_sources = list(itertools.chain(*list(sources.values())))
for source in flattened_sources:
for table in source["table"]:
pathlib.Path(table).unlink(missing_ok=True)
# remove dir if we have it
if pathlib.Path(dest_path).is_dir():
shutil.rmtree(path=dest_path)
# build a parquet file writer which will be used to append files
# as a single concatted parquet file, referencing the first file's schema
# (all must be the same schema)
writer_schema = parquet.read_schema(join_sources[0]).with_metadata(
CYTOTABLE_DEFAULT_PARQUET_METADATA
)
with parquet.ParquetWriter(str(dest_path), writer_schema) as writer:
for table_path in (
join_sources
if not sort_output
else _natural_sort(list_to_sort=join_sources)
):
writer.write_table(
parquet.read_table(
table_path,
schema=writer_schema,
memory_map=CYTOTABLE_ARROW_USE_MEMORY_MAPPING,
)
)
# remove the file which was written in the concatted parquet file (we no longer need it)
pathlib.Path(table_path).unlink()
    # return the path to the concatenated result file
return dest_path
@python_app
def _infer_source_group_common_schema(
source_group: List[Dict[str, Any]],
data_type_cast_map: Optional[Dict[str, str]] = None,
) -> List[Tuple[str, str]]:
"""
Infers a common schema for group of parquet files which may have
similar but slightly different schema or data. Intended to assist with
data concatenation and other operations.
Args:
source_group: List[Dict[str, Any]]:
Group of one or more data sources which includes metadata about
path to parquet data.
data_type_cast_map: Optional[Dict[str, str]], default None
A dictionary mapping data type groups to specific types.
Roughly includes Arrow data types language from:
https://arrow.apache.org/docs/python/api/datatypes.html
Returns:
List[Tuple[str, str]]
A list of tuples which includes column name and PyArrow datatype.
This data will later be used as the basis for forming a PyArrow schema.
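    Example:
        An illustrative reconciliation for a column which appears as an
        integer type in the first (basis) file and a floating-point type in
        a later file; with no "float" entry in data_type_cast_map the common
        type falls back to float64 ("double"):

        .. code-block:: python

            # file A (basis): [("Cells_AreaShape_Area", "int64")]
            # file B:         [("Cells_AreaShape_Area", "double")]
            # inferred:       [("Cells_AreaShape_Area", "double")]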
"""
import pyarrow as pa
import pyarrow.parquet as parquet
from cytotable.exceptions import SchemaException
# read first file for basis of schema and column order for all others
common_schema = parquet.read_schema(source_group[0]["table"][0])
# infer common basis of schema and column order for all others
for schema in [
parquet.read_schema(table)
for source in source_group
for table in source["table"]
]:
# account for completely equal schema
if schema.equals(common_schema):
continue
# gather field names from schema
schema_field_names = [item.name for item in schema]
        # reversed enumeration because removing fields in ascending index order
        # would shift the indexes of later fields within the schema
for index, field in reversed(list(enumerate(common_schema))):
# check whether field name is contained within writer basis, remove if not
# note: because this only checks for naming, we defer to initially detected type
if field.name not in schema_field_names:
common_schema = common_schema.remove(index)
# check if we have a nulltype and non-nulltype conflict, deferring to non-nulltype
elif pa.types.is_null(field.type) and not pa.types.is_null(
schema.field(field.name).type
):
common_schema = common_schema.set(
index, field.with_type(schema.field(field.name).type)
)
# check if we have an integer to float challenge and enable later casting
elif pa.types.is_integer(field.type) and pa.types.is_floating(
schema.field(field.name).type
):
common_schema = common_schema.set(
index,
field.with_type(
# use float64 as a default here if we aren't casting floats
pa.float64()
if data_type_cast_map is None
or "float" not in data_type_cast_map.keys()
# otherwise use the float data type cast type
else pa.type_for_alias(data_type_cast_map["float"])
),
)
if len(list(common_schema.names)) == 0:
raise SchemaException(
(
"No common schema basis to perform concatenation for source group."
" All columns mismatch one another within the group."
)
)
# return a python-native list of tuples with column names and str types
return list(
zip(
common_schema.names,
[str(schema_type) for schema_type in common_schema.types],
)
)
def _to_parquet(  # pylint: disable=too-many-arguments, too-many-locals
source_path: str,
dest_path: str,
source_datatype: Optional[str],
metadata: Optional[Union[List[str], Tuple[str, ...]]],
compartments: Optional[Union[List[str], Tuple[str, ...]]],
identifying_columns: Optional[Union[List[str], Tuple[str, ...]]],
concat: bool,
join: bool,
joins: Optional[str],
chunk_size: Optional[int],
infer_common_schema: bool,
drop_null: bool,
sort_output: bool,
page_keys: Dict[str, str],
data_type_cast_map: Optional[Dict[str, str]] = None,
**kwargs,
) -> Union[Dict[str, List[Dict[str, Any]]], str]:
"""
Export data to parquet.
Args:
source_path: str:
str reference to read source files from.
Note: may be local or remote object-storage
location using convention "s3://..." or similar.
dest_path: str:
Path to write files to. This path will be used for
intermediary data work and must be a new file or directory path.
This parameter will result in a directory on `join=False`.
This parameter will result in a single file on `join=True`.
Note: this may only be a local path.
source_datatype: Optional[str]: (Default value = None)
Source datatype to focus on during conversion.
metadata: Union[List[str], Tuple[str, ...]]:
Metadata names to use for conversion.
compartments: Union[List[str], Tuple[str, ...]]: (Default value = None)
Compartment names to use for conversion.
identifying_columns: Union[List[str], Tuple[str, ...]]:
Column names which are used as ID's and as a result need to be
ignored with regards to renaming.
concat: bool:
Whether to concatenate similar files together.
join: bool:
Whether to join the compartment data together into one dataset.
joins: str:
DuckDB-compatible SQL which will be used to perform the join operations.
chunk_size: Optional[int],
Size of join chunks which is used to limit data size during join ops.
infer_common_schema: bool: (Default value = True)
Whether to infer a common schema when concatenating sources.
drop_null: bool:
Whether to drop null results.
sort_output: bool
Specifies whether to sort cytotable output or not.
page_keys: Dict[str, str]
A dictionary which defines which column names are used for keyset pagination
in order to perform data extraction.
data_type_cast_map: Dict[str, str]
A dictionary mapping data type groups to specific types.
Roughly includes Arrow data types language from:
https://arrow.apache.org/docs/python/api/datatypes.html
**kwargs: Any:
Keyword args used for gathering source data, primarily relevant for
Cloudpathlib cloud-based client configuration.
Returns:
Union[Dict[str, List[Dict[str, Any]]], str]:
Grouped sources which include metadata about destination filepath
where parquet file was written or a string filepath for the joined
result.
"""
# gather sources to be processed
sources = _gather_sources(
source_path=source_path,
source_datatype=source_datatype,
targets=(
list(metadata) + list(compartments)
if metadata is not None and compartments is not None
else []
),
**kwargs,
)
# expand the destination path
expanded_dest_path = _expand_path(path=dest_path)
# check that each source group name has a pagination key
for source_group_name in sources.keys():
matching_keys = [
key for key in page_keys.keys() if key.lower() in source_group_name.lower()
]
if not matching_keys:
raise CytoTableException(
f"No matching key found in page_keys for source_group_name: {source_group_name}."
"Please include a pagination key based on a column name from the table."
)
# prepare pagesets for chunked data export from source tables
pagesets_prepared = {
source_group_name: [
dict(
source,
**{
"page_key": (
page_key := [
value
for key, value in page_keys.items()
if key.lower() in source_group_name.lower()
][0]
),
"pagesets": _get_table_keyset_pagination_sets(
source=source,
chunk_size=chunk_size,
page_key=page_key,
),
},
)
for source in source_group_vals
]
for source_group_name, source_group_vals in sources.items()
}
    # if pagesets is None and we haven't halted, drop the source as there
    # were input formatting errors which would create challenges downstream
invalid_files_dropped = {
source_group_name: [
# ensure we have pagesets
source
for source in source_group_vals
if source["pagesets"] is not None
]
for source_group_name, source_group_vals in evaluate_futures(
pagesets_prepared
).items()
# ensure we have source_groups with at least one source table
if len(source_group_vals) > 0
}
# gather column names and types from source tables
column_names_and_types_gathered = {
source_group_name: [
dict(
source,
**{
"columns": _prep_cast_column_data_types(
columns=_get_table_columns_and_types(
source=source, sort_output=sort_output
),
data_type_cast_map=data_type_cast_map,
)
},
)
for source in source_group_vals
]
for source_group_name, source_group_vals in invalid_files_dropped.items()
}
results = {
source_group_name: [
dict(
source,
**{
"table": [
# perform column renaming and create potential return result
_prepend_column_name(
# perform chunked data export to parquet using pagesets
table_path=_source_pageset_to_parquet(
source_group_name=source_group_name,
source=source,
pageset=pageset,
dest_path=expanded_dest_path,
sort_output=sort_output,
),
source_group_name=source_group_name,
identifying_columns=identifying_columns,
metadata=metadata,
compartments=compartments,
)
for pageset in source["pagesets"]
]
},
)
for source in source_group_vals
]
for source_group_name, source_group_vals in evaluate_futures(
column_names_and_types_gathered
).items()
}
# if we're concatting or joining and need to infer the common schema
if (concat or join) and infer_common_schema:
# create a common schema for concatenation work
common_schema_determined = {
source_group_name: [
{
"sources": source_group_vals,
"common_schema": _infer_source_group_common_schema(
source_group=source_group_vals,
data_type_cast_map=data_type_cast_map,
),
}
]
for source_group_name, source_group_vals in evaluate_futures(
results
).items()
}
# if concat or join, concat the source groups
# note: join implies a concat, but concat does not imply a join
    # We concatenate before joining in order to create a common schema
    # for the join work performed after concatenation.
if concat or join:
# create a potential return result for concatenation output
results = {
source_group_name: _concat_source_group(
source_group_name=source_group_name,
source_group=source_group_vals[0]["sources"],
dest_path=expanded_dest_path,
common_schema=source_group_vals[0]["common_schema"],
sort_output=sort_output,
)
for source_group_name, source_group_vals in evaluate_futures(
common_schema_determined
).items()
}
# conditional section for merging
# note: join implies a concat, but concat does not imply a join
if join:
# evaluate the results as they're used multiple times below
evaluated_results = evaluate_futures(results)
prepared_joins_sql = _prepare_join_sql(
sources=evaluated_results, joins=joins
).result()
page_key_join = [
value for key, value in page_keys.items() if key.lower() == "join"
][0]
# map joined results based on the join groups gathered above
        # note: after mapping we end up with a list of futures which each resolve to a str
join_sources_result = [
_join_source_pageset(
# gather the result of concatted sources prior to
# join group merging as each mapped task run will need
# full concat results
dest_path=expanded_dest_path,
joins=prepared_joins_sql,
page_key=page_key_join,
pageset=pageset,
sort_output=sort_output,
drop_null=drop_null,
)
# create join group for querying the concatenated
# data in order to perform memory-safe joining
# per user chunk size specification.
for pageset in _get_table_keyset_pagination_sets(
sql_stmt=prepared_joins_sql,
chunk_size=chunk_size,
page_key=page_key_join,
).result()
]
# concat our join chunks together as one cohesive dataset
# return results in common format which includes metadata
# for lineage and debugging
results = _concat_join_sources(
dest_path=expanded_dest_path,
join_sources=[join.result() for join in join_sources_result],
sources=evaluated_results,
sort_output=sort_output,
)
    # evaluate any remaining futures and return the final result
return evaluate_futures(results)
def convert(  # pylint: disable=too-many-arguments,too-many-locals
source_path: str,
dest_path: str,
dest_datatype: Literal["parquet"],
source_datatype: Optional[str] = None,
metadata: Optional[Union[List[str], Tuple[str, ...]]] = None,
compartments: Optional[Union[List[str], Tuple[str, ...]]] = None,
identifying_columns: Optional[Union[List[str], Tuple[str, ...]]] = None,
concat: bool = True,
join: bool = True,
joins: Optional[str] = None,
chunk_size: Optional[int] = None,
infer_common_schema: bool = True,
drop_null: bool = False,
data_type_cast_map: Optional[Dict[str, str]] = None,
page_keys: Optional[Dict[str, str]] = None,
sort_output: bool = True,
preset: Optional[str] = "cellprofiler_csv",
parsl_config: Optional[parsl.Config] = None,
**kwargs,
) -> Union[Dict[str, List[Dict[str, Any]]], str]:
"""
Convert file-based data from various sources to Pycytominer-compatible standards.
Note: source paths may be local or remote object-storage location
using convention "s3://..." or similar.
Args:
source_path: str:
str reference to read source files from.
Note: may be local or remote object-storage location
using convention "s3://..." or similar.
dest_path: str:
Path to write files to. This path will be used for
intermediary data work and must be a new file or directory path.
This parameter will result in a directory on `join=False`.
This parameter will result in a single file on `join=True`.
Note: this may only be a local path.
dest_datatype: Literal["parquet"]:
Destination datatype to write to.
source_datatype: Optional[str]: (Default value = None)
Source datatype to focus on during conversion.
metadata: Union[List[str], Tuple[str, ...]]:
Metadata names to use for conversion.
        compartments: Union[List[str], Tuple[str, ...]]:
(Default value = None)
Compartment names to use for conversion.
identifying_columns: Union[List[str], Tuple[str, ...]]:
Column names which are used as ID's and as a result need to be
ignored with regards to renaming.
concat: bool: (Default value = True)
Whether to concatenate similar files together.
        join: bool: (Default value = True)
            Whether to join the compartment data together into one dataset.
        joins: Optional[str]: (Default value = None)
            DuckDB-compatible SQL which will be used to perform the join operations.
        chunk_size: Optional[int] (Default value = None)
            Size of join chunks which is used to limit data size during join ops.
        infer_common_schema: bool (Default value = True)
            Whether to infer a common schema when concatenating sources.
        data_type_cast_map: Optional[Dict[str, str]] (Default value = None)
            A dictionary mapping data type groups to specific types.
            Roughly includes Arrow data types language from:
            https://arrow.apache.org/docs/python/api/datatypes.html
        page_keys: Optional[Dict[str, str]] (Default value = None)
            The table and column names to be used for keyset pagination.
            Uses the form: {"table_name":"column_name"}.
            Expects columns to include numeric data (ints or floats).
            Interacts with the `chunk_size` parameter to form
            pages of `chunk_size`.
        sort_output: bool (Default value = True)
            Specifies whether to sort cytotable output or not.
        drop_null: bool (Default value = False)
            Whether to drop nan/null values from results.
        preset: Optional[str] (Default value = "cellprofiler_csv")
            An optional group of presets to use based on common configurations.
parsl_config: Optional[parsl.Config] (Default value = None)
Optional Parsl configuration to use for running CytoTable operations.
Note: when using CytoTable multiple times in the same process,
CytoTable will use the first provided configuration for all runs.
Returns:
Union[Dict[str, List[Dict[str, Any]]], str]
Grouped sources which include metadata about destination filepath
where parquet file was written or str of joined result filepath.
Example:
.. code-block:: python
from cytotable import convert
# using a local path with cellprofiler csv presets
convert(
source_path="./tests/data/cellprofiler/ExampleHuman",
source_datatype="csv",
dest_path="ExampleHuman.parquet",
dest_datatype="parquet",
preset="cellprofiler_csv",
)
# using an s3-compatible path with no signature for client
# and cellprofiler csv presets
convert(
source_path="s3://s3path",
source_datatype="csv",
dest_path="s3_local_result",
dest_datatype="parquet",
concat=True,
preset="cellprofiler_csv",
no_sign_request=True,
)
# using local path with cellprofiler sqlite presets
convert(
source_path="example.sqlite",
dest_path="example.parquet",
dest_datatype="parquet",
preset="cellprofiler_sqlite",
)
"""
# Raise an exception if an existing path is provided as the destination
# to avoid removing existing data or unrelated data removal.
if _expand_path(dest_path).exists():
raise CytoTableException(
(
"An existing file or directory was provided as dest_path: "
f"'{dest_path}'. Please use a new path for this parameter."
)
)
# attempt to load parsl configuration if we didn't already load one
if not _parsl_loaded():
# if we don't have a parsl configuration provided, load the default
if parsl_config is None:
parsl.load(_default_parsl_config())
else:
# else we attempt to load the given parsl configuration
parsl.load(parsl_config)
else:
# otherwise warn the user about previous config.
logger.warning("Reusing previously loaded Parsl configuration.")
# optionally load preset configuration for arguments
# note: defer to overrides from parameters whose values
# are not None (allows intermixing of presets and overrides)
if preset is not None:
metadata = (
cast(list, config[preset]["CONFIG_NAMES_METADATA"])
if metadata is None
else metadata
)
compartments = (
cast(list, config[preset]["CONFIG_NAMES_COMPARTMENTS"])
if compartments is None
else compartments
)
identifying_columns = (
cast(list, config[preset]["CONFIG_IDENTIFYING_COLUMNS"])
if identifying_columns is None
else identifying_columns
)
joins = cast(str, config[preset]["CONFIG_JOINS"]) if joins is None else joins
chunk_size = (
cast(int, config[preset]["CONFIG_CHUNK_SIZE"])
if chunk_size is None
else chunk_size
)
page_keys = (
cast(dict, config[preset]["CONFIG_PAGE_KEYS"])
if page_keys is None
else page_keys
)
# Raise an exception for scenarios where one configures CytoTable to join
# but does not provide a pagination key for the joins.
if join and (page_keys is None or "join" not in page_keys.keys()):
raise CytoTableException(
(
"When using join=True one must pass a 'join' pagination key "
"in the page_keys parameter. The 'join' pagination key is a column "
"name found within the joined results based on the SQL provided from "
"the joins parameter. This special key is required as not all columns "
"from the source tables might not be included."
)
)
# send sources to be written to parquet if selected
if dest_datatype == "parquet":
output = _to_parquet(
source_path=source_path,
dest_path=dest_path,
source_datatype=source_datatype,
metadata=metadata,
compartments=compartments,
identifying_columns=identifying_columns,
concat=concat,
join=join,
joins=joins,
chunk_size=chunk_size,
infer_common_schema=infer_common_schema,
drop_null=drop_null,
data_type_cast_map=data_type_cast_map,
sort_output=sort_output,
page_keys=cast(dict, page_keys),
**kwargs,
)
return output