"""
CytoTable: convert - transforming data for use with pycytominer.
"""
import itertools
import logging
from typing import Any, Dict, List, Literal, Optional, Tuple, Union, cast
import parsl
from parsl.app.app import python_app
from cytotable.exceptions import CytoTableException
from cytotable.presets import config
from cytotable.sources import _gather_sources
from cytotable.utils import (
_column_sort,
_default_parsl_config,
_expand_path,
_parsl_loaded,
evaluate_futures,
)
logger = logging.getLogger(__name__)
@python_app
def _get_table_columns_and_types(
source: Dict[str, Any], sort_output: bool
) -> List[Dict[str, str]]:
"""
    Gather column data from a table through DuckDB.
Args:
source: Dict[str, Any]
Contains source data details. Represents a single
file or table of some kind.
sort_output:
Specifies whether to sort cytotable output or not.
Returns:
List[Dict[str, str]]
            List of dictionaries, each containing column-level information.
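    Example:
        A minimal sketch of the returned structure (column names and
        dtypes below are hypothetical; actual values depend on the
        source table and DuckDB's type inference):
        .. code-block:: python
            [
                {"column_id": 0, "column_name": "ImageNumber", "column_dtype": "BIGINT"},
                {"column_id": 1, "column_name": "AreaShape_Area", "column_dtype": "DOUBLE"},
            ]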
"""
import duckdb
from cytotable.utils import _duckdb_reader, _sqlite_mixed_type_query_to_parquet
source_path = source["source_path"]
source_type = str(source_path.suffix).lower()
# prepare the data source in the form of a duckdb query
select_source = (
f"read_csv_auto('{source_path}')"
if source_type == ".csv"
else f"sqlite_scan('{source_path}', '{source['table_name']}')"
)
# Query top 5 results from table and use pragma_storage_info() to
# gather duckdb interpreted data typing. We gather 5 values for
# each column to help with type inferences (where smaller sets
# may yield lower data type accuracy for the full table).
select_query = """
/* we create an in-mem table for later use with the pragma_storage_info call
as this call only functions with materialized tables and not views or related */
CREATE TABLE column_details AS
(SELECT *
FROM &select_source
LIMIT 5
);
/* selects specific column metadata from pragma_storage_info */
SELECT DISTINCT
column_id,
column_name,
segment_type as column_dtype
FROM pragma_storage_info('column_details')
/* avoid duplicate entries in the form of VALIDITY segment_types */
WHERE segment_type != 'VALIDITY'
/* explicitly order the columns by their id to avoid inconsistent results */
ORDER BY column_id ASC;
"""
# attempt to read the data to parquet from duckdb
# with exception handling to read mixed-type data
# using sqlite3 and special utility function
try:
# isolate using new connection to read data based on pageset
# and export directly to parquet via duckdb (avoiding need to return data to python)
# perform the query and create a list of dictionaries with the column data for table
with _duckdb_reader() as ddb_reader:
return (
ddb_reader.execute(
select_query.replace("&select_source", select_source)
)
.arrow()
.to_pylist()
)
except duckdb.Error as e:
# if we see a mismatched type error
# run a more nuanced query through sqlite
# to handle the mixed types
if "Mismatch Type Error" in str(e) and source_type == ".sqlite":
arrow_data_tbl = _sqlite_mixed_type_query_to_parquet(
source_path=str(source["source_path"]),
table_name=str(source["table_name"]),
page_key=source["page_key"],
pageset=source["pagesets"][0],
sort_output=sort_output,
)
with _duckdb_reader() as ddb_reader:
return (
ddb_reader.execute(
select_query.replace("&select_source", "arrow_data_tbl")
)
.arrow()
.to_pylist()
)
else:
raise
@python_app
def _prep_cast_column_data_types(
columns: List[Dict[str, str]], data_type_cast_map: Dict[str, str]
) -> List[Dict[str, str]]:
"""
    Cast data types per what is received in data_type_cast_map.
Example:
- columns: [{"column_id":0, "column_name":"colname", "column_dtype":"DOUBLE"}]
- data_type_cast_map: {"float": "float32"}
The above passed through this function will set the "column_dtype" value
to a "REAL" dtype ("REAL" in duckdb is roughly equivalent to "float32")
Args:
        columns: List[Dict[str, str]]
            List of dictionaries which each include column-level information.
data_type_cast_map: Dict[str, str]
A dictionary mapping data type groups to specific types.
Roughly to eventually align with DuckDB types:
https://duckdb.org/docs/sql/data_types/overview
            Note: includes synonym matching for common naming conventions
            used in Pandas and/or PyArrow via cytotable.utils.DATA_TYPE_SYNONYMS
Returns:
List[Dict[str, str]]
            List of dictionaries, each containing column-level information.
"""
from functools import partial
from cytotable.utils import _arrow_type_cast_if_specified
if data_type_cast_map is not None:
return list(
# map across all columns
map(
partial(
# attempts to cast the columns provided
_arrow_type_cast_if_specified,
                    # set static data_type_cast_map arg for
# use with all fields
data_type_cast_map=data_type_cast_map,
),
columns,
)
)
return columns
@python_app
def _set_tablenumber(
sources: Dict[str, List[Dict[str, Any]]],
add_tablenumber: Optional[bool] = None,
) -> Dict[str, List[Dict[str, Any]]]:
"""
Gathers a "TableNumber" from the image table (if CSV) or
SQLite file (if SQLite source) which is a unique identifier
intended to help differentiate between imagenumbers
to create distinct records for single-cell profiles
referenced across multiple source data exports.
For example, ImageNumber column values from CellProfiler
will repeat across exports, meaning we may lose distinction
when combining multiple export files together through CytoTable.
Note:
- If using CSV data sources, the image.csv table is used for checksum.
- If using SQLite data sources, the entire SQLite database is used for checksum.
Args:
sources: Dict[str, List[Dict[str, Any]]]
Contains metadata about data tables and related contents.
add_tablenumber: Optional[bool]
Whether to add a calculated tablenumber.
Note: when False, adds None as the tablenumber
Returns:
        Dict[str, List[Dict[str, Any]]]
New source group with added TableNumber details.
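    Example:
        A minimal sketch of the transformation (paths and the checksum
        value are hypothetical):
        .. code-block:: python
            # before
            {"Per_Image.csv": [{"source_path": "...", "columns": [...]}]}
            # after (tablenumber is a checksum integer, or None when disabled)
            {
                "Per_Image.csv": [
                    {"source_path": "...", "columns": [...], "tablenumber": 1234567890}
                ]
            }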
"""
from cloudpathlib import AnyPath
from cytotable.utils import _gather_tablenumber_checksum
image_table_groups = {
# create a data structure with the common parent for each dataset
# and the calculated checksum from the image table.
# note: the source_path parent is used for non-SQLite files
# whereas the direct source path is used for SQLite files.
(
str(source["source_path"].parent)
if source["source_path"].suffix != "sqlite"
else source["source_path"]
): source["source_path"]
for source_group_name, source_group_vals in sources.items()
        # use only the image table references as the basis
        # for these calculations.
if any(
value in str(AnyPath(source_group_name).stem).lower()
for value in ["image", "per_image"]
)
for source in source_group_vals
}
# determine if we need to add tablenumber data
if (
# case for detecting multiple image tables which need to be differentiated
add_tablenumber is None
and (len(image_table_groups) <= 1)
) or (
# case for explicitly set no tablenumbers
add_tablenumber
is False
):
return {
source_group_name: [
dict(
source,
**{
"tablenumber": None,
},
)
for source in source_group_vals
]
for source_group_name, source_group_vals in sources.items()
}
# gather the image table from the source_group
tablenumber_table = {
# create a data structure with the common parent for each dataset
# and the calculated checksum from the image table
group: _gather_tablenumber_checksum(path)
for group, path in image_table_groups.items()
}
# return a modified sources data structure with the tablenumber added
return {
source_group_name: [
dict(
source,
**{"tablenumber": tablenumber_table[str(source["source_path"].parent)]},
)
for source in source_group_vals
if str(source["source_path"].parent) in list(tablenumber_table.keys())
]
for source_group_name, source_group_vals in sources.items()
}
@python_app
def _get_table_keyset_pagination_sets(
chunk_size: int,
page_key: str,
source: Optional[Dict[str, Any]] = None,
sql_stmt: Optional[str] = None,
) -> Union[List[Tuple[Union[int, float], Union[int, float]]], None]:
"""
Get table data chunk keys for later use in capturing segments
of values. This work also provides a chance to catch problematic
input data which will be ignored with warnings.
Args:
source: Dict[str, Any]
Contains the source data to be chunked. Represents a single
file or table of some kind.
chunk_size: int
            The maximum number of rows per pagination chunk.
page_key: str
The column name to be used to identify pagination chunks.
Expected to be of numeric type (int, float) for ordering.
sql_stmt:
Optional sql statement to form the pagination set from.
Default behavior extracts pagination sets from the full
data source.
Returns:
        Union[List[Tuple[Union[int, float], Union[int, float]]], None]
            List of inclusive page key boundaries to use for reading the data
            later on, or None when the source is skipped due to input errors.
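    Example:
        A minimal sketch of keyset pagination boundaries (values are
        hypothetical; exact boundaries are produced by
        cytotable.utils._generate_pagesets):
        .. code-block:: python
            # page_key values 1 through 10 with chunk_size=3 could yield
            # inclusive (start, end) ranges such as:
            [(1, 3), (4, 6), (7, 9), (10, 10)]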
"""
import logging
import sqlite3
from contextlib import closing
import duckdb
from cytotable.exceptions import NoInputDataException
from cytotable.utils import _duckdb_reader, _generate_pagesets
logger = logging.getLogger(__name__)
if source is not None:
table_name = source["table_name"] if "table_name" in source.keys() else None
source_path = source["source_path"]
source_type = str(source_path.suffix).lower()
try:
with _duckdb_reader() as ddb_reader:
if source_type == ".csv":
sql_query = f"SELECT {page_key} FROM read_csv_auto('{source_path}', header=TRUE, delim=',') ORDER BY {page_key}"
else:
sql_query = f"SELECT {page_key} FROM sqlite_scan('{source_path}', '{table_name}') ORDER BY {page_key}"
page_keys = [
results[0] for results in ddb_reader.execute(sql_query).fetchall()
]
# exception case for when we have mixed types
# (i.e. integer col with string and ints) in a sqlite column
except duckdb.TypeMismatchException:
with closing(sqlite3.connect(source_path)) as cx:
with cx:
page_keys = [
key[0]
for key in cx.execute(
f"SELECT {page_key} FROM {table_name} ORDER BY {page_key};"
).fetchall()
if isinstance(key[0], (int, float))
]
except (
duckdb.InvalidInputException,
NoInputDataException,
) as invalid_input_exc:
logger.warning(
msg=f"Skipping file due to input file errors: {str(invalid_input_exc)}"
)
return None
elif sql_stmt is not None:
with _duckdb_reader() as ddb_reader:
sql_query = f"SELECT {page_key} FROM ({sql_stmt}) ORDER BY {page_key}"
page_keys = ddb_reader.execute(sql_query).fetchall()
page_keys = [key[0] for key in page_keys]
return _generate_pagesets(page_keys, chunk_size)
@python_app
def _source_pageset_to_parquet(
source_group_name: str,
source: Dict[str, Any],
pageset: Tuple[Union[int, float], Union[int, float]],
dest_path: str,
sort_output: bool,
) -> str:
"""
    Export source data to a chunked parquet file using keyset pagination sets (pagesets).
Args:
source_group_name: str
Name of the source group (for ex. compartment or metadata table name).
source: Dict[str, Any]
Contains the source data to be chunked. Represents a single
file or table of some kind along with collected information about table.
        pageset: Tuple[Union[int, float], Union[int, float]]
            The inclusive page key boundaries for chunking the data from source.
dest_path: str
Path to store the output data.
sort_output: bool
Specifies whether to sort cytotable output or not.
Returns:
str
A string of the output filepath.
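    Example:
        A sketch of the chunked output filepath naming for a CSV source
        (directory and file names below are hypothetical):
        .. code-block:: python
            # dest_path / lowercased source group / lowercased parent dir,
            # with the inclusive pageset bounds appended to the filename
            "<dest_path>/cells/plate_a/Cells-1-1000.parquet"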
"""
import pathlib
import duckdb
from cloudpathlib import AnyPath
from cytotable.utils import (
_duckdb_reader,
_sqlite_mixed_type_query_to_parquet,
_write_parquet_table_with_metadata,
)
# attempt to build dest_path
source_dest_path = (
f"{dest_path}/{str(AnyPath(source_group_name).stem).lower()}/"
f"{str(source['source_path'].parent.name).lower()}"
)
pathlib.Path(source_dest_path).mkdir(parents=True, exist_ok=True)
# build tablenumber segment addition (if necessary)
tablenumber_sql = (
# to become tablenumber in sql select later with bigint (8-byte integer)
# we cast here to bigint to avoid concat or join conflicts later due to
# misaligned automatic data typing.
f"CAST({source['tablenumber']} AS BIGINT) as TableNumber, "
if source["tablenumber"] is not None
# don't introduce the column if we aren't supposed to add tablenumber
# as per parameter.
else ""
)
# add source table columns
casted_source_cols = [
        # here we cast the column to the specified type and ensure
        # the column name remains the same
f"CAST(\"{column['column_name']}\" AS {column['column_dtype']}) AS \"{column['column_name']}\""
for column in source["columns"]
]
    # create selection statement from tablenumber_sql + casted source columns
    select_columns = tablenumber_sql + ",".join(casted_source_cols)
# build output query and filepath base
# (chunked output will append offset to keep output paths unique)
if str(source["source_path"].suffix).lower() == ".csv":
base_query = f"SELECT {select_columns} FROM read_csv_auto('{str(source['source_path'])}', header=TRUE, delim=',')"
result_filepath_base = f"{source_dest_path}/{str(source['source_path'].stem)}"
elif str(source["source_path"].suffix).lower() == ".sqlite":
base_query = f"SELECT {select_columns} FROM sqlite_scan('{str(source['source_path'])}', '{str(source['table_name'])}')"
result_filepath_base = f"{source_dest_path}/{str(source['source_path'].stem)}.{source['table_name']}"
# form a filepath which indicates the pageset
result_filepath = f"{result_filepath_base}-{pageset[0]}-{pageset[1]}.parquet"
# Attempt to read the data to parquet file
# using duckdb for extraction and pyarrow for
# writing data to a parquet file.
try:
# read data with chunk size + offset
# and export to parquet
with _duckdb_reader() as ddb_reader:
_write_parquet_table_with_metadata(
table=ddb_reader.execute(
f"""
{base_query}
WHERE {source['page_key']} BETWEEN {pageset[0]} AND {pageset[1]}
/* optional ordering per pageset */
{"ORDER BY " + source['page_key'] if sort_output else ""};
"""
).arrow(),
where=result_filepath,
)
# Include exception handling to read mixed-type data
# using sqlite3 and special utility function.
except duckdb.Error as e:
# if we see a mismatched type error
# run a more nuanced query through sqlite
# to handle the mixed types
if (
"Mismatch Type Error" in str(e)
and str(source["source_path"].suffix).lower() == ".sqlite"
):
_write_parquet_table_with_metadata(
# here we use sqlite instead of duckdb to extract
# data for special cases where column and value types
# may not align (which is valid functionality in SQLite).
table=_sqlite_mixed_type_query_to_parquet(
source_path=str(source["source_path"]),
table_name=str(source["table_name"]),
page_key=source["page_key"],
pageset=pageset,
sort_output=sort_output,
tablenumber=source["tablenumber"],
),
where=result_filepath,
)
else:
raise
# return the filepath for the chunked output file
return result_filepath
@python_app
def _prepend_column_name(
table_path: str,
source_group_name: str,
identifying_columns: List[str],
metadata: Union[List[str], Tuple[str, ...]],
compartments: List[str],
) -> str:
"""
Rename columns using the source group name, avoiding identifying columns.
Notes:
* A source_group_name represents a filename referenced as part of what
is specified within targets.
Args:
table_path: str:
Path to a parquet file which will be modified.
source_group_name: str:
Name of data source source group (for common compartments, etc).
identifying_columns: List[str]:
Column names which are used as ID's and as a result need to be
treated differently when renaming.
metadata: Union[List[str], Tuple[str, ...]]:
List of source data names which are used as metadata.
compartments: List[str]:
List of source data names which are used as compartments.
Returns:
str
Path to the modified file.
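    Example:
        A minimal sketch of the renaming behavior for a 'Cells' source
        group (column names below mirror the inline examples in this
        function and are otherwise hypothetical):
        .. code-block:: python
            {
                # feature columns gain the compartment prefix
                "AreaShape_Area": "Cells_AreaShape_Area",
                # identifying columns already carrying the compartment prefix
                "Cells_Number_Object_Number": "Metadata_Cells_Number_Object_Number",
                # specially treated identifying columns such as ObjectNumber
                "ObjectNumber": "Metadata_ObjectNumber",
                # other identifying columns gain both prefixes
                "Parent_Nuclei": "Metadata_Cells_Parent_Nuclei",
            }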
"""
import logging
import pathlib
import pyarrow.parquet as parquet
from cytotable.constants import CYTOTABLE_ARROW_USE_MEMORY_MAPPING
from cytotable.utils import _write_parquet_table_with_metadata
logger = logging.getLogger(__name__)
targets = tuple(metadata) + tuple(compartments)
# if we have no targets or metadata to work from, return the table unchanged
if len(targets) == 0:
logger.warning(
msg=(
"Skipping column name prepend operations "
"because no compartments or metadata were provided."
)
)
return table_path
table = parquet.read_table(
source=table_path, memory_map=CYTOTABLE_ARROW_USE_MEMORY_MAPPING
)
# stem of source group name
# for example:
# targets: ['cytoplasm']
# source_group_name: 'Per_Cytoplasm.sqlite'
# source_group_name_stem: 'Cytoplasm'
source_group_name_stem = targets[
# return first result from generator below as index to targets
next(
i
for i, val in enumerate(targets)
# compare if value from targets in source_group_name stem
if val.lower() in str(pathlib.Path(source_group_name).stem).lower()
)
# capitalize the result
].capitalize()
# capture updated column names as new variable
updated_column_names = []
for column_name in table.column_names:
# if-condition for prepending source_group_name_stem to column name
# where colname is not in identifying_columns parameter values
# and where the column is not already prepended with source_group_name_stem
# for example:
# source_group_name_stem: 'Cells'
# column_name: 'AreaShape_Area'
# updated_column_name: 'Cells_AreaShape_Area'
if column_name not in identifying_columns and not column_name.startswith(
source_group_name_stem.capitalize()
):
updated_column_names.append(f"{source_group_name_stem}_{column_name}")
# if-condition for prepending 'Metadata_' to column name
# where colname is in identifying_columns parameter values
# and where the column is already prepended with source_group_name_stem
# for example:
# source_group_name_stem: 'Cells'
# column_name: 'Cells_Number_Object_Number'
# updated_column_name: 'Metadata_Cells_Number_Object_Number'
elif column_name in identifying_columns and column_name.startswith(
source_group_name_stem.capitalize()
):
updated_column_names.append(f"Metadata_{column_name}")
# if-condition for prepending 'Metadata' and source_group_name_stem to column name
# where colname is in identifying_columns parameter values
# and where the colname does not already start with 'Metadata_'
# and colname not in metadata list
# and colname does not include 'ObjectNumber' or 'TableNumber'
# (which are specially treated column names in this context)
# for example:
# source_group_name_stem: 'Cells'
# column_name: 'Parent_Nuclei'
# updated_column_name: 'Metadata_Cells_Parent_Nuclei'
elif (
column_name in identifying_columns
and not column_name.startswith("Metadata_")
and not any(item.capitalize() in column_name for item in metadata)
and not any(item in column_name for item in ["ObjectNumber", "TableNumber"])
):
updated_column_names.append(
f"Metadata_{source_group_name_stem}_{column_name}"
)
# if-condition for prepending 'Metadata' to column name
# where colname doesn't already start with 'Metadata_'
# and colname is in identifying_columns parameter values
# for example:
# column_name: 'ObjectNumber'
# updated_column_name: 'Metadata_ObjectNumber'
elif (
not column_name.startswith("Metadata_")
and column_name in identifying_columns
):
updated_column_names.append(f"Metadata_{column_name}")
# else we add the existing colname to the updated list as-is
else:
updated_column_names.append(column_name)
# perform table column name updates
_write_parquet_table_with_metadata(
table=table.rename_columns(updated_column_names), where=table_path
)
return table_path
@python_app
def _concat_source_group(
source_group_name: str,
source_group: List[Dict[str, Any]],
dest_path: str,
common_schema: Optional[List[Tuple[str, str]]] = None,
sort_output: bool = True,
) -> List[Dict[str, Any]]:
"""
Concatenate group of source data together as single file.
For a reference to data concatenation within Arrow see the following:
https://arrow.apache.org/docs/python/generated/pyarrow.concat_tables.html
Notes: this function presumes a multi-directory, multi-file common data
structure for compartments and other data. For example:
Source (file tree):
.. code-block:: bash
root
├── subdir_1
│ └── Cells.csv
└── subdir_2
└── Cells.csv
Becomes:
.. code-block:: python
# earlier data read into parquet chunks from multiple
# data source files.
read_data = [
{"table": ["cells-1.parquet", "cells-2.parquet"]},
{"table": ["cells-1.parquet", "cells-2.parquet"]},
]
# focus of this function
concatted = [{"table": ["cells.parquet"]}]
Args:
source_group_name: str
Name of data source source group (for common compartments, etc).
source_group: List[Dict[str, Any]]:
Data structure containing grouped data for concatenation.
        dest_path: str
            Destination path for the concatenated sources.
        common_schema: Optional[List[Tuple[str, str]]] (Default value = None)
Common schema to use for concatenation amongst arrow tables
which may have slightly different but compatible schema.
sort_output: bool
Specifies whether to sort cytotable output or not.
Returns:
List[Dict[str, Any]]
Updated dictionary containing concatenated sources.
"""
import errno
import pathlib
import pyarrow as pa
import pyarrow.parquet as parquet
from cytotable.constants import (
CYTOTABLE_ARROW_USE_MEMORY_MAPPING,
CYTOTABLE_DEFAULT_PARQUET_METADATA,
)
from cytotable.exceptions import SchemaException
from cytotable.utils import _natural_sort
# build a result placeholder
concatted: List[Dict[str, Any]] = [
{
# source path becomes parent's parent dir with the same filename
"source_path": pathlib.Path(
(
f"{source_group[0]['source_path'].parent.parent}"
f"/{source_group[0]['source_path'].stem}"
)
)
}
]
# build destination path for file to land
destination_path = pathlib.Path(
(
f"{dest_path}/{str(pathlib.Path(source_group_name).stem).lower()}/"
f"{str(pathlib.Path(source_group_name).stem).lower()}.parquet"
)
)
# if there's already a file remove it
destination_path.unlink(missing_ok=True)
# ensure the parent directories exist:
destination_path.parent.mkdir(parents=True, exist_ok=True)
# build the schema for concatenation writer
writer_schema = pa.schema(common_schema).with_metadata(
CYTOTABLE_DEFAULT_PARQUET_METADATA
)
    # build a parquet file writer which will be used to append files
    # as a single concatted parquet file, referencing the common schema
    # (all chunks must be compatible with this schema)
with parquet.ParquetWriter(str(destination_path), writer_schema) as writer:
for source in source_group:
tables = [table for table in source["table"]]
if sort_output:
tables = _natural_sort(tables)
for table in tables:
# if we haven't inferred the common schema
# check that our file matches the expected schema, otherwise raise an error
if common_schema is None and not writer_schema.equals(
parquet.read_schema(table)
):
raise SchemaException(
(
f"Detected mismatching schema for target concatenation group members:"
f" {str(source_group[0]['table'])} and {str(table)}"
)
)
# read the file from the list and write to the concatted parquet file
                # note: we read with the writer schema to help ensure column
                # order and schema compatibility for the writer
writer.write_table(
parquet.read_table(
table,
schema=writer_schema,
memory_map=CYTOTABLE_ARROW_USE_MEMORY_MAPPING,
)
)
# remove the file which was written in the concatted parquet file (we no longer need it)
pathlib.Path(table).unlink()
# attempt to clean up dir containing original table(s) only if it's empty
try:
pathlib.Path(pathlib.Path(source["table"][0]).parent).rmdir()
except OSError as os_err:
# raise only if we don't have a dir not empty errno
if os_err.errno != errno.ENOTEMPTY:
raise
# return the concatted parquet filename
concatted[0]["table"] = [destination_path]
return concatted
@python_app()
def _prepare_join_sql(
sources: Dict[str, List[Dict[str, Any]]],
joins: str,
) -> str:
"""
Prepare join SQL statement with actual locations of data based on the sources.
Args:
sources: Dict[str, List[Dict[str, Any]]]:
Grouped datasets of files which will be used by other functions.
Includes the metadata concerning location of actual data.
joins: str:
DuckDB-compatible SQL which will be used to perform the join
operations using the join_group keys as a reference.
Returns:
str:
String representing the SQL to be used in later join work.
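    Example:
        A minimal sketch of the substitution performed (SQL fragment and
        paths are hypothetical):
        .. code-block:: python
            # before: joins reference placeholder parquet filenames
            "... FROM read_parquet('cytoplasm.parquet') ..."
            # after: placeholders become the real concatenated chunk paths
            "... FROM read_parquet(['<dest_path>/cytoplasm/cytoplasm.parquet']) ..."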
"""
import pathlib
    # replace placeholder table names in the join SQL
    # with the real locations of the concatenated source files
    for key, val in sources.items():
        if pathlib.Path(key).stem.lower() in joins.lower():
            table_name = str(pathlib.Path(key).stem.lower())
            joins = joins.replace(
                f"'{table_name}.parquet'",
                str([str(table) for table in val[0]["table"]]),
            )
    # return the join SQL with real data locations substituted in
    return joins
@python_app
def _join_source_pageset(
dest_path: str,
joins: str,
page_key: str,
pageset: Tuple[int, int],
sort_output: bool,
drop_null: bool,
) -> str:
"""
    Join sources using the prepared join SQL, chunked by a pageset of page key values.
Args:
dest_path: str:
Destination path to write file-based content.
joins: str:
DuckDB-compatible SQL which will be used to perform the join
operations using the join_group keys as a reference.
        page_key: str:
            Column name to use for keyset pagination of the joined data.
        pageset: Tuple[int, int]:
            Inclusive page key boundaries used as a "chunked" filter
            of the overall dataset.
        sort_output: bool:
            Specifies whether to sort cytotable output or not.
drop_null: bool:
Whether to drop rows with null values within the resulting
joined data.
Returns:
str
Path to joined file which is created as a result of this function.
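    Example:
        A sketch of the output filepath naming (dest_path and pageset
        bounds are hypothetical; the inclusive bounds are appended to the
        dest_path stem within its parent directory):
        .. code-block:: python
            # dest_path="./ExampleHuman.parquet", pageset=(1, 1000)
            "./ExampleHuman-1-1000.parquet"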
"""
import pathlib
    from cytotable.utils import (
        _column_sort,
        _duckdb_reader,
        _write_parquet_table_with_metadata,
    )
with _duckdb_reader() as ddb_reader:
result = ddb_reader.execute(
f"""
WITH joined AS (
{joins}
)
SELECT *
FROM joined
WHERE {page_key} BETWEEN {pageset[0]} AND {pageset[1]}
            /* optional sorting per pageset */
{"ORDER BY " + page_key if sort_output else ""};
"""
).arrow()
# drop nulls if specified
if drop_null:
result = result.drop_null()
# account for duplicate column names from joins
cols = []
# reversed order column check as col removals will change index order
for i, colname in reversed(list(enumerate(result.column_names))):
if colname not in cols:
cols.append(colname)
else:
result = result.remove_column(i)
# inner sorted alphabetizes any columns which may not be part of custom_sort
# outer sort provides pycytominer-specific column sort order
result = result.select(sorted(sorted(result.column_names), key=_column_sort))
result_file_path = (
# store the result in the parent of the dest_path
f"{str(pathlib.Path(dest_path).parent)}/"
# use the dest_path stem in the name
f"{str(pathlib.Path(dest_path).stem)}-"
# add the pageset indication to the filename
f"{pageset[0]}-{pageset[1]}.parquet"
)
# write the result
_write_parquet_table_with_metadata(
table=result,
where=result_file_path,
)
return result_file_path
@python_app
def _concat_join_sources(
sources: Dict[str, List[Dict[str, Any]]],
dest_path: str,
join_sources: List[str],
sort_output: bool = True,
) -> str:
"""
Concatenate join sources from parquet-based chunks.
For a reference to data concatenation within Arrow see the following:
https://arrow.apache.org/docs/python/generated/pyarrow.concat_tables.html
Args:
sources: Dict[str, List[Dict[str, Any]]]:
Grouped datasets of files which will be used by other functions.
Includes the metadata concerning location of actual data.
dest_path: str:
Destination path to write file-based content.
join_sources: List[str]:
List of local filepath destination for join source chunks
which will be concatenated.
sort_output: bool
Specifies whether to sort cytotable output or not.
Returns:
str
Path to concatenated file which is created as a result of this function.
"""
import pathlib
import shutil
import pyarrow.parquet as parquet
from cytotable.constants import (
CYTOTABLE_ARROW_USE_MEMORY_MAPPING,
CYTOTABLE_DEFAULT_PARQUET_METADATA,
)
from cytotable.utils import _natural_sort
# remove the unjoined concatted compartments to prepare final dest_path usage
# (we now have joined results)
flattened_sources = list(itertools.chain(*list(sources.values())))
for source in flattened_sources:
for table in source["table"]:
pathlib.Path(table).unlink(missing_ok=True)
# remove dir if we have it
if pathlib.Path(dest_path).is_dir():
shutil.rmtree(path=dest_path)
# build a parquet file writer which will be used to append files
# as a single concatted parquet file, referencing the first file's schema
# (all must be the same schema)
writer_schema = parquet.read_schema(join_sources[0]).with_metadata(
CYTOTABLE_DEFAULT_PARQUET_METADATA
)
with parquet.ParquetWriter(str(dest_path), writer_schema) as writer:
for table_path in (
join_sources
if not sort_output
else _natural_sort(list_to_sort=join_sources)
):
writer.write_table(
parquet.read_table(
table_path,
schema=writer_schema,
memory_map=CYTOTABLE_ARROW_USE_MEMORY_MAPPING,
)
)
# remove the file which was written in the concatted parquet file (we no longer need it)
pathlib.Path(table_path).unlink()
# return modified sources format to indicate the final result
# and retain the other source data for reference as needed
return dest_path
@python_app
def _infer_source_group_common_schema(
source_group: List[Dict[str, Any]],
data_type_cast_map: Optional[Dict[str, str]] = None,
) -> List[Tuple[str, str]]:
"""
    Infers a common schema for a group of parquet files which may have
similar but slightly different schema or data. Intended to assist with
data concatenation and other operations.
Args:
source_group: List[Dict[str, Any]]:
Group of one or more data sources which includes metadata about
path to parquet data.
data_type_cast_map: Optional[Dict[str, str]], default None
A dictionary mapping data type groups to specific types.
Roughly includes Arrow data types language from:
https://arrow.apache.org/docs/python/api/datatypes.html
Returns:
List[Tuple[str, str]]
A list of tuples which includes column name and PyArrow datatype.
This data will later be used as the basis for forming a PyArrow schema.
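    Example:
        A minimal sketch of the inference behavior (column names are
        hypothetical):
        .. code-block:: python
            # chunk one schema: [("ObjectNumber", "int64"), ("Intensity", "int64")]
            # chunk two schema: [("ObjectNumber", "int64"), ("Intensity", "double")]
            # integer vs. float conflicts resolve to a float type
            # (float64 by default, or data_type_cast_map["float"] when provided)
            [("ObjectNumber", "int64"), ("Intensity", "double")]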
"""
import pyarrow as pa
import pyarrow.parquet as parquet
from cytotable.exceptions import SchemaException
# read first file for basis of schema and column order for all others
common_schema = parquet.read_schema(source_group[0]["table"][0])
# infer common basis of schema and column order for all others
for schema in [
parquet.read_schema(table)
for source in source_group
for table in source["table"]
]:
# account for completely equal schema
if schema.equals(common_schema):
continue
# gather field names from schema
schema_field_names = [item.name for item in schema]
# reversed enumeration because removing indexes ascendingly changes schema field order
for index, field in reversed(list(enumerate(common_schema))):
# check whether field name is contained within writer basis, remove if not
# note: because this only checks for naming, we defer to initially detected type
if field.name not in schema_field_names:
common_schema = common_schema.remove(index)
# check if we have a nulltype and non-nulltype conflict, deferring to non-nulltype
elif pa.types.is_null(field.type) and not pa.types.is_null(
schema.field(field.name).type
):
common_schema = common_schema.set(
index, field.with_type(schema.field(field.name).type)
)
# check if we have an integer to float challenge and enable later casting
elif pa.types.is_integer(field.type) and pa.types.is_floating(
schema.field(field.name).type
):
common_schema = common_schema.set(
index,
field.with_type(
# use float64 as a default here if we aren't casting floats
pa.float64()
if data_type_cast_map is None
or "float" not in data_type_cast_map.keys()
# otherwise use the float data type cast type
else pa.type_for_alias(data_type_cast_map["float"])
),
)
if len(list(common_schema.names)) == 0:
raise SchemaException(
(
"No common schema basis to perform concatenation for source group."
" All columns mismatch one another within the group."
)
)
# return a python-native list of tuples with column names and str types
return list(
zip(
common_schema.names,
[str(schema_type) for schema_type in common_schema.types],
)
)
def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals
source_path: str,
dest_path: str,
source_datatype: Optional[str],
metadata: Optional[Union[List[str], Tuple[str, ...]]],
compartments: Optional[Union[List[str], Tuple[str, ...]]],
identifying_columns: Optional[Union[List[str], Tuple[str, ...]]],
concat: bool,
join: bool,
joins: Optional[str],
chunk_size: Optional[int],
infer_common_schema: bool,
drop_null: bool,
sort_output: bool,
page_keys: Dict[str, str],
data_type_cast_map: Optional[Dict[str, str]] = None,
add_tablenumber: Optional[bool] = None,
**kwargs,
) -> Union[Dict[str, List[Dict[str, Any]]], List[Any], str]:
"""
Export data to parquet.
Args:
source_path: str:
str reference to read source files from.
Note: may be local or remote object-storage
location using convention "s3://..." or similar.
dest_path: str:
Path to write files to. This path will be used for
intermediary data work and must be a new file or directory path.
This parameter will result in a directory on `join=False`.
This parameter will result in a single file on `join=True`.
Note: this may only be a local path.
source_datatype: Optional[str]: (Default value = None)
Source datatype to focus on during conversion.
metadata: Union[List[str], Tuple[str, ...]]:
Metadata names to use for conversion.
compartments: Union[List[str], Tuple[str, ...]]: (Default value = None)
Compartment names to use for conversion.
identifying_columns: Union[List[str], Tuple[str, ...]]:
Column names which are used as ID's and as a result need to be
ignored with regards to renaming.
concat: bool:
Whether to concatenate similar files together.
join: bool:
Whether to join the compartment data together into one dataset.
joins: str:
DuckDB-compatible SQL which will be used to perform the join operations.
chunk_size: Optional[int],
Size of join chunks which is used to limit data size during join ops.
infer_common_schema: bool: (Default value = True)
Whether to infer a common schema when concatenating sources.
drop_null: bool:
Whether to drop null results.
sort_output: bool
Specifies whether to sort cytotable output or not.
page_keys: Dict[str, str]
A dictionary which defines which column names are used for keyset pagination
in order to perform data extraction.
data_type_cast_map: Dict[str, str]
A dictionary mapping data type groups to specific types.
Roughly includes Arrow data types language from:
https://arrow.apache.org/docs/python/api/datatypes.html
**kwargs: Any:
Keyword args used for gathering source data, primarily relevant for
Cloudpathlib cloud-based client configuration.
Returns:
Union[Dict[str, List[Dict[str, Any]]], str]:
Grouped sources which include metadata about destination filepath
where parquet file was written or a string filepath for the joined
result.
"""
# gather sources to be processed
sources = _gather_sources(
source_path=source_path,
source_datatype=source_datatype,
targets=(
list(metadata) + list(compartments)
if metadata is not None and compartments is not None
else []
),
**kwargs,
)
# expand the destination path
expanded_dest_path = _expand_path(path=dest_path)
# check that each source group name has a pagination key
for source_group_name in sources.keys():
matching_keys = [
key for key in page_keys.keys() if key.lower() in source_group_name.lower()
]
if not matching_keys:
raise CytoTableException(
f"No matching key found in page_keys for source_group_name: {source_group_name}."
"Please include a pagination key based on a column name from the table."
)
# prepare pagesets for chunked data export from source tables
pagesets_prepared = {
source_group_name: [
dict(
source,
**{
"page_key": (
page_key := [
value
for key, value in page_keys.items()
if key.lower() in source_group_name.lower()
][0]
),
"pagesets": _get_table_keyset_pagination_sets(
source=source,
chunk_size=chunk_size,
page_key=page_key,
),
},
)
for source in source_group_vals
]
for source_group_name, source_group_vals in sources.items()
}
    # if pagesets is None and we haven't halted, drop the source as there
    # were input formatting errors which would create challenges downstream
invalid_files_dropped = {
source_group_name: [
# ensure we have pagesets
source
for source in source_group_vals
if source["pagesets"] is not None
]
for source_group_name, source_group_vals in evaluate_futures(
pagesets_prepared
).items()
# ensure we have source_groups with at least one source table
if len(source_group_vals) > 0
}
# gather column names and types from source tables
column_names_and_types_gathered = {
source_group_name: [
dict(
source,
**{
"columns": _prep_cast_column_data_types(
columns=_get_table_columns_and_types(
source=source, sort_output=sort_output
),
data_type_cast_map=data_type_cast_map,
)
},
)
for source in source_group_vals
]
for source_group_name, source_group_vals in invalid_files_dropped.items()
}
# add tablenumber details, appending None if not add_tablenumber
tablenumber_prepared = _set_tablenumber(
sources=evaluate_futures(column_names_and_types_gathered),
add_tablenumber=add_tablenumber,
).result()
results = {
source_group_name: [
dict(
source,
**{
"table": [
# perform column renaming and create potential return result
_prepend_column_name(
# perform chunked data export to parquet using pagesets
table_path=_source_pageset_to_parquet(
source_group_name=source_group_name,
source=source,
pageset=pageset,
dest_path=expanded_dest_path,
sort_output=sort_output,
),
source_group_name=source_group_name,
identifying_columns=identifying_columns,
metadata=metadata,
compartments=compartments,
)
for pageset in source["pagesets"]
]
},
)
for source in source_group_vals
]
for source_group_name, source_group_vals in evaluate_futures(
tablenumber_prepared
).items()
}
# if we're concatting or joining and need to infer the common schema
if (concat or join) and infer_common_schema:
# create a common schema for concatenation work
common_schema_determined = {
source_group_name: [
{
"sources": source_group_vals,
"common_schema": _infer_source_group_common_schema(
source_group=source_group_vals,
data_type_cast_map=data_type_cast_map,
),
}
]
for source_group_name, source_group_vals in evaluate_futures(
results
).items()
}
# if concat or join, concat the source groups
# note: join implies a concat, but concat does not imply a join
    # We concatenate before joining in order to create a common schema
    # for the join work performed after concatenation.
if concat or join:
# create a potential return result for concatenation output
results = {
source_group_name: _concat_source_group(
source_group_name=source_group_name,
source_group=source_group_vals[0]["sources"],
dest_path=expanded_dest_path,
common_schema=source_group_vals[0]["common_schema"],
sort_output=sort_output,
)
for source_group_name, source_group_vals in evaluate_futures(
common_schema_determined
).items()
}
# conditional section for merging
# note: join implies a concat, but concat does not imply a join
if join:
# evaluate the results as they're used multiple times below
evaluated_results = evaluate_futures(results)
prepared_joins_sql = _prepare_join_sql(
sources=evaluated_results, joins=joins
).result()
page_key_join = [
value for key, value in page_keys.items() if key.lower() == "join"
][0]
# map joined results based on the join groups gathered above
# note: after mapping we end up with a list of strings (task returns str)
join_sources_result = [
_join_source_pageset(
# gather the result of concatted sources prior to
# join group merging as each mapped task run will need
# full concat results
dest_path=expanded_dest_path,
joins=prepared_joins_sql,
page_key=page_key_join,
pageset=pageset,
sort_output=sort_output,
drop_null=drop_null,
)
# create join group for querying the concatenated
# data in order to perform memory-safe joining
# per user chunk size specification.
for pageset in _get_table_keyset_pagination_sets(
sql_stmt=prepared_joins_sql,
chunk_size=chunk_size,
page_key=page_key_join,
).result()
]
if concat:
# concat our join chunks together as one cohesive dataset
# return results in common format which includes metadata
# for lineage and debugging
results = _concat_join_sources(
dest_path=expanded_dest_path,
                join_sources=[join_source.result() for join_source in join_sources_result],
sources=evaluated_results,
sort_output=sort_output,
)
else:
# else we leave the joined chunks as-is and return them
return evaluate_futures(join_sources_result)
# wrap the final result as a future and return
return evaluate_futures(results)
def convert( # pylint: disable=too-many-arguments,too-many-locals
source_path: str,
dest_path: str,
dest_datatype: Literal["parquet"],
source_datatype: Optional[str] = None,
metadata: Optional[Union[List[str], Tuple[str, ...]]] = None,
compartments: Optional[Union[List[str], Tuple[str, ...]]] = None,
identifying_columns: Optional[Union[List[str], Tuple[str, ...]]] = None,
concat: bool = True,
join: bool = True,
joins: Optional[str] = None,
chunk_size: Optional[int] = None,
infer_common_schema: bool = True,
drop_null: bool = False,
data_type_cast_map: Optional[Dict[str, str]] = None,
add_tablenumber: Optional[bool] = None,
page_keys: Optional[Dict[str, str]] = None,
sort_output: bool = True,
preset: Optional[str] = "cellprofiler_csv",
parsl_config: Optional[parsl.Config] = None,
**kwargs,
) -> Union[Dict[str, List[Dict[str, Any]]], List[Any], str]:
"""
Convert file-based data from various sources to Pycytominer-compatible standards.
Note: source paths may be local or remote object-storage location
using convention "s3://..." or similar.
Args:
source_path: str:
str reference to read source files from.
Note: may be local or remote object-storage location
using convention "s3://..." or similar.
dest_path: str:
Path to write files to. This path will be used for
intermediary data work and must be a new file or directory path.
This parameter will result in a directory on `join=False`.
This parameter will result in a single file on `join=True`.
Note: this may only be a local path.
dest_datatype: Literal["parquet"]:
Destination datatype to write to.
source_datatype: Optional[str]: (Default value = None)
Source datatype to focus on during conversion.
metadata: Union[List[str], Tuple[str, ...]]:
Metadata names to use for conversion.
        compartments: Union[List[str], Tuple[str, ...]]:
(Default value = None)
Compartment names to use for conversion.
identifying_columns: Union[List[str], Tuple[str, ...]]:
Column names which are used as ID's and as a result need to be
ignored with regards to renaming.
concat: bool: (Default value = True)
Whether to concatenate similar files together.
        join: bool: (Default value = True)
            Whether to join the compartment data together into one dataset.
        joins: str: (Default value = None)
            DuckDB-compatible SQL which will be used to perform the join operations.
        chunk_size: Optional[int] (Default value = None)
            Size of join chunks which is used to limit data size during join ops.
infer_common_schema: bool (Default value = True)
Whether to infer a common schema when concatenating sources.
data_type_cast_map: Dict[str, str], (Default value = None)
A dictionary mapping data type groups to specific types.
Roughly includes Arrow data types language from:
https://arrow.apache.org/docs/python/api/datatypes.html
add_tablenumber: Optional[bool]
Whether to add a calculated tablenumber which helps differentiate
various repeated values (such as ObjectNumber) within source data.
Useful for processing multiple SQLite or CSV data sources together
to retain distinction from each dataset.
        page_keys: Optional[Dict[str, str]] (Default value = None)
The table and column names to be used for key pagination.
Uses the form: {"table_name":"column_name"}.
Expects columns to include numeric data (ints or floats).
Interacts with the `chunk_size` parameter to form
pages of `chunk_size`.
sort_output: bool (Default value = True)
Specifies whether to sort cytotable output or not.
drop_null: bool (Default value = False)
            Whether to drop NaN/null values from the results.
preset: str (Default value = "cellprofiler_csv")
an optional group of presets to use based on common configurations
parsl_config: Optional[parsl.Config] (Default value = None)
Optional Parsl configuration to use for running CytoTable operations.
Note: when using CytoTable multiple times in the same process,
CytoTable will use the first provided configuration for all runs.
Returns:
Union[Dict[str, List[Dict[str, Any]]], str]
Grouped sources which include metadata about destination filepath
where parquet file was written or str of joined result filepath.
Example:
.. code-block:: python
from cytotable import convert
# using a local path with cellprofiler csv presets
convert(
source_path="./tests/data/cellprofiler/ExampleHuman",
source_datatype="csv",
dest_path="ExampleHuman.parquet",
dest_datatype="parquet",
preset="cellprofiler_csv",
)
# using an s3-compatible path with no signature for client
# and cellprofiler csv presets
convert(
source_path="s3://s3path",
source_datatype="csv",
dest_path="s3_local_result",
dest_datatype="parquet",
concat=True,
preset="cellprofiler_csv",
no_sign_request=True,
)
# using local path with cellprofiler sqlite presets
convert(
source_path="example.sqlite",
dest_path="example.parquet",
dest_datatype="parquet",
preset="cellprofiler_sqlite",
)
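            # using custom keyset pagination keys alongside a join
            # (a "join" key is required whenever join=True; the column
            # names below are hypothetical and depend on the source data)
            convert(
                source_path="example.sqlite",
                dest_path="example_custom.parquet",
                dest_datatype="parquet",
                preset="cellprofiler_sqlite",
                chunk_size=50000,
                page_keys={
                    "image": "ImageNumber",
                    "cells": "ObjectNumber",
                    "join": "Cytoplasm_Number_Object_Number",
                },
            )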
"""
# Raise an exception if an existing path is provided as the destination
# to avoid removing existing data or unrelated data removal.
if _expand_path(dest_path).exists():
raise CytoTableException(
(
"An existing file or directory was provided as dest_path: "
f"'{dest_path}'. Please use a new path for this parameter."
)
)
# attempt to load parsl configuration if we didn't already load one
if not _parsl_loaded():
# if we don't have a parsl configuration provided, load the default
if parsl_config is None:
parsl.load(_default_parsl_config())
else:
# else we attempt to load the given parsl configuration
parsl.load(parsl_config)
else:
# otherwise warn the user about previous config.
logger.warning("Reusing previously loaded Parsl configuration.")
# optionally load preset configuration for arguments
# note: defer to overrides from parameters whose values
# are not None (allows intermixing of presets and overrides)
if preset is not None:
metadata = (
cast(list, config[preset]["CONFIG_NAMES_METADATA"])
if metadata is None
else metadata
)
compartments = (
cast(list, config[preset]["CONFIG_NAMES_COMPARTMENTS"])
if compartments is None
else compartments
)
identifying_columns = (
cast(list, config[preset]["CONFIG_IDENTIFYING_COLUMNS"])
if identifying_columns is None
else identifying_columns
)
joins = cast(str, config[preset]["CONFIG_JOINS"]) if joins is None else joins
chunk_size = (
cast(int, config[preset]["CONFIG_CHUNK_SIZE"])
if chunk_size is None
else chunk_size
)
page_keys = (
cast(dict, config[preset]["CONFIG_PAGE_KEYS"])
if page_keys is None
else page_keys
)
# Raise an exception for scenarios where one configures CytoTable to join
# but does not provide a pagination key for the joins.
if join and (page_keys is None or "join" not in page_keys.keys()):
raise CytoTableException(
(
"When using join=True one must pass a 'join' pagination key "
"in the page_keys parameter. The 'join' pagination key is a column "
"name found within the joined results based on the SQL provided from "
                "the joins parameter. This special key is required because not all "
                "columns from the source tables may be included in the joined result."
)
)
# send sources to be written to parquet if selected
if dest_datatype == "parquet":
output = _to_parquet(
source_path=source_path,
dest_path=dest_path,
source_datatype=source_datatype,
metadata=metadata,
compartments=compartments,
identifying_columns=identifying_columns,
concat=concat,
join=join,
joins=joins,
chunk_size=chunk_size,
infer_common_schema=infer_common_schema,
drop_null=drop_null,
data_type_cast_map=data_type_cast_map,
add_tablenumber=add_tablenumber,
sort_output=sort_output,
page_keys=cast(dict, page_keys),
**kwargs,
)
return output