You're reading the documentation for a development version. For the latest released version, please have a look at v1.0.1.
Source code for src.cosmicqc.analyze
"""
Module for detecting various quality control aspects from source data.
"""
import operator
import pathlib
import warnings
from functools import reduce
from typing import Dict, List, Optional, Tuple, Union
import pandas as pd
import yaml
from cytodataframe.frame import CytoDataFrame
from scipy.stats import zscore as scipy_zscore
DEFAULT_QC_THRESHOLD_FILE = (
f"{pathlib.Path(__file__).parent!s}/data/qc_nuclei_thresholds_default.yml"
)
# `label_outliers` and the helper accept raw caller input, including `None` to mean
# "load all named conditions from the thresholds file".
LabelOutliersFeatureThresholdInput = Optional[
Union[Dict[str, float], Dict[str, Dict[str, float]], str]
]
# `identify_outliers` only accepts concrete threshold input.
IdentifyOutliersFeatureThresholdInput = Union[
Dict[str, float], Dict[str, Dict[str, float]], str
]
[docs]
def _warn_if_inline_thresholds_ignore_file(
feature_thresholds_file: Optional[str],
feature_thresholds: LabelOutliersFeatureThresholdInput,
) -> None:
"""Warn when inline thresholds override a non-default thresholds file."""
if (
feature_thresholds_file is not None
and feature_thresholds_file != DEFAULT_QC_THRESHOLD_FILE
and isinstance(feature_thresholds, dict)
):
warnings.warn(
(
"Non-string `feature_thresholds` were provided, so "
"`feature_thresholds_file` will be ignored."
),
UserWarning,
stacklevel=3,
)
[docs]
def _convert_feature_threshold_input_to_named_threshold_dicts(
feature_thresholds_file: str,
feature_thresholds: LabelOutliersFeatureThresholdInput = None,
) -> List[Tuple[str, Dict[str, float]]]:
"""
Convert feature threshold input into named threshold dictionaries
for processing.
Args:
feature_thresholds_file: str
Path to the YAML file containing threshold definitions.
feature_thresholds: LabelOutliersFeatureThresholdInput
If you do not provide feature_thresholds, the function will default to using
the default YAML file.
- None: Use all thresholds from the file (only applicable to
`label_outliers`).
- str: Named threshold set from the file.
- Dict[str, float]: Single unnamed threshold set.
- Dict[str, Dict[str, float]]: Multiple named threshold sets.
Returns:
List[Tuple[str, Dict[str, float]]]: A list of (name, thresholds_dict) tuples.
If a single unnamed threshold dictionary is provided as input, it will be
returned with the name "custom".
Raises:
ValueError: If the input format is invalid.
"""
# If feature_thresholds is None, return all thresholds from the YAML
# file (default or specified) -> only used in label_outliers
if feature_thresholds is None:
return list(
read_thresholds_set_from_file(
feature_thresholds_file=feature_thresholds_file,
).items()
)
# If feature_thresholds is a string, treat it as a key to look up in the YAML file
if isinstance(feature_thresholds, str):
return [
(
feature_thresholds,
read_thresholds_set_from_file(
feature_thresholds=feature_thresholds,
feature_thresholds_file=feature_thresholds_file,
),
)
]
# If feature_thresholds is a dict, determine if it's single or multiple conditions
if isinstance(feature_thresholds, dict):
# Warn user if they provided inline thresholds that will override the file
_warn_if_inline_thresholds_ignore_file(
feature_thresholds_file=feature_thresholds_file,
feature_thresholds=feature_thresholds,
)
# Check that the dict is not empty to avoid silent failures later
if feature_thresholds == {}:
raise ValueError("feature_thresholds cannot be empty.")
# If all values are dicts, treat as multiple conditions
if all(isinstance(v, dict) for v in feature_thresholds.values()):
return list(feature_thresholds.items())
# If all values are numeric, treat as a single unnamed condition
if all(isinstance(v, (int, float)) for v in feature_thresholds.values()):
return [("custom", feature_thresholds)]
# If we reach here, the input format is invalid or unexpected
raise ValueError("Invalid feature_thresholds format.")
[docs]
def _create_condition_map(
df: CytoDataFrame,
outlier_df: CytoDataFrame,
thresholds: Dict[str, float],
name_prefix: str,
) -> Tuple[List[pd.Series], Dict[str, str]]:
"""
Create boolean outlier conditions and z-score column names for one threshold set.
Args:
df: CytoDataFrame
Source data used to calculate z-scores.
outlier_df: CytoDataFrame
Working dataframe where z-score columns are stored.
thresholds: Dict[str, float]
Dictionary of feature thresholds.
name_prefix: str
Prefix to use when naming z-score columns.
Raises:
ValueError: If a feature in thresholds does not exist in the DataFrame.
Returns:
Tuple[List[pd.Series], Dict[str, str]]
A list of boolean condition series and a mapping of features to their
z-score column names.
"""
# Keep track of z-score column names for each feature
zscore_columns = {}
# Loop through each feature and threshold to create z-score columns and conditions
for feature in thresholds:
# Check that the feature exists in the DataFrame before attempting to create
if feature not in df.columns:
raise ValueError(f"Feature '{feature}' does not exist in the DataFrame.")
# Set the name for the z-score column for this feature and prefix
zscore_col = f"{name_prefix}_{feature}_zscore"
# Reuse any z-score column that has already been computed for this prefix.
if zscore_col not in outlier_df.columns:
outlier_df[zscore_col] = scipy_zscore(df[feature])
# Store the z-score column name for this feature to use when creating conditions
zscore_columns[feature] = zscore_col
# Create a boolean condition for each feature based on the specified threshold.
conditions = [
(
outlier_df[zscore_columns[feature]] > threshold
if threshold > 0
else outlier_df[zscore_columns[feature]] < threshold
)
for feature, threshold in thresholds.items()
]
return conditions, zscore_columns
[docs]
def identify_outliers( # noqa: PLR0913
df: Union[CytoDataFrame, pd.DataFrame, str],
feature_thresholds: IdentifyOutliersFeatureThresholdInput,
feature_thresholds_file: Optional[str] = DEFAULT_QC_THRESHOLD_FILE,
condition_name: Optional[str] = None,
include_threshold_scores: bool = False,
export_path: Optional[str] = None,
) -> Union[
pd.Series,
CytoDataFrame,
]:
"""
This function uses z-scoring to format the data for detecting outlier
nuclei or cells using specific CellProfiler features.
Args:
df: Union[CytoDataFrame, pd.DataFrame, str]
Input dataframe or file path.
feature_thresholds: IdentifyOutliersFeatureThresholdInput
Either:
1. {feature: threshold}
2. {condition_name: {feature: threshold}}
3. string key from a YAML file
include_threshold_scores: bool
Whether to include z-score columns in the output.
condition_name: Optional[str]
Optional explicit name to use for CQC columns when
`feature_thresholds` is a single dict (only features and thresholds).
Default name is "custom" if not provided.
export_path: Optional[str]
If provided, export the result.
Returns:
Union[pd.Series, CytoDataFrame]
Return shape depends on whether one or multiple conditions are provided:
- Single condition:
Returns a boolean `pd.Series` if `include_threshold_scores` is False.
Returns a `CytoDataFrame` with z-score columns and the outlier column
if `include_threshold_scores` is True.
- Multiple conditions via `Dict[str, Dict[str, float]]`:
Returns a `CytoDataFrame`.
If `include_threshold_scores` is False, it contains one outlier
column per condition.
If `include_threshold_scores` is True, it contains the per-condition
z-score columns plus one outlier column per condition.
"""
# Convert input to CytoDataFrame if not already one
df = CytoDataFrame(data=df)
# Create a copy of the dataframe to avoid modifying the original
outlier_df = df.copy()
# Determine the key for naming if feature_thresholds is a string
thresholds_key = feature_thresholds if isinstance(feature_thresholds, str) else None
_warn_if_inline_thresholds_ignore_file(
feature_thresholds_file=feature_thresholds_file,
feature_thresholds=feature_thresholds,
)
# Read thresholds from file if a string key is provided
if isinstance(feature_thresholds, str):
feature_thresholds = read_thresholds_set_from_file(
feature_thresholds=feature_thresholds,
feature_thresholds_file=feature_thresholds_file,
)
# Confirm that if a dict is provided, it is not empty to avoid silent failures later
if isinstance(feature_thresholds, dict) and feature_thresholds == {}:
raise ValueError("feature_thresholds cannot be empty.")
# Handle multiple conditions if feature_thresholds is a dict of dicts
if feature_thresholds and isinstance(next(iter(feature_thresholds.values())), dict):
# We have multiple named conditions, so combine the per-condition results
# into a single CytoDataFrame.
result_frames = []
# Loop through each condition set and create the corresponding outlier columns
for name, thresholds in feature_thresholds.items():
# Set name prefix for this condition
name_prefix = f"Metadata_cqc_{name}"
# Create the condition map for this set of thresholds
conditions, zscore_columns = _create_condition_map(
df=df,
outlier_df=outlier_df,
thresholds=thresholds,
name_prefix=name_prefix,
)
# Combine conditions with AND logic to determine overall outlier status
# for this condition set
is_outlier_series = reduce(operator.and_, conditions).rename(
f"{name_prefix}_is_outlier"
)
# If we want to include threshold scores, we either return just the boolean
# series or a DataFrame with z-scores and the boolean column
if include_threshold_scores:
result_frames.append(
outlier_df[[*list(zscore_columns.values()), is_outlier_series.name]]
if is_outlier_series.name in outlier_df.columns
else pd.concat(
[outlier_df[list(zscore_columns.values())], is_outlier_series],
axis=1,
)
)
else:
result_frames.append(is_outlier_series.to_frame())
result = CytoDataFrame(
data=pd.concat(result_frames, axis=1),
data_context_dir=df._custom_attrs["data_context_dir"],
data_mask_context_dir=df._custom_attrs["data_mask_context_dir"],
)
if export_path is not None:
result.export(file_path=export_path)
return result
# If we have a single condition (either from a dict or from a string key),
# we create one set of columns
name_prefix = f"Metadata_cqc_{condition_name or thresholds_key or 'custom'}"
# Create the condition map for this set of thresholds
conditions, zscore_columns = _create_condition_map(
df=df,
outlier_df=outlier_df,
thresholds=feature_thresholds,
name_prefix=name_prefix,
)
# Combine conditions with AND logic to determine overall outlier status
# for this condition set
is_outlier_series = reduce(operator.and_, conditions).rename(
f"{name_prefix}_is_outlier"
)
# If we want to include threshold scores, we either return just the boolean series
# or a DataFrame with z-scores and the boolean column
if include_threshold_scores:
zscore_df = outlier_df[list(zscore_columns.values())]
result = CytoDataFrame(
data=pd.concat([zscore_df, is_outlier_series], axis=1),
data_context_dir=df._custom_attrs["data_context_dir"],
data_mask_context_dir=df._custom_attrs["data_mask_context_dir"],
)
else:
result = is_outlier_series
# Export if export_path is provided
if export_path is not None:
export_df = CytoDataFrame(result) if isinstance(result, pd.Series) else result
export_df.export(file_path=export_path)
return result
[docs]
def find_outliers(
df: Union[CytoDataFrame, pd.DataFrame, str],
metadata_columns: List[str],
feature_thresholds: Union[Dict[str, float], str],
feature_thresholds_file: Optional[str] = DEFAULT_QC_THRESHOLD_FILE,
export_path: Optional[str] = None,
) -> pd.DataFrame:
"""
This function uses identify_outliers to return a dataframe
with only the outliers and provided metadata columns.
NOTE: This function can only be used with a single set of feature thresholds
for finding what outliers look like in the data. For multiple sets of
feature thresholds, use label_outliers instead and filter the results for
the condition(s) of interest.
Args:
df: Union[CytoDataFrame, pd.DataFrame, str]
DataFrame or file string-based filepath of a
Parquet, CSV, or TSV file with CytoTable output or similar data.
metadata_columns: List[str]
List of metadata columns that should be outputted with the outlier data.
feature_thresholds: Dict[str, float]
One of two options:
A dictionary with the feature name(s) as the key(s) and their assigned
threshold for identifying outliers. Positive int for the threshold
will detect outliers "above" than the mean, negative int will detect
outliers "below" the mean.
Or a string which is a named key reference found within
the feature_thresholds_file yaml file.
feature_thresholds_file: Optional[str] = DEFAULT_QC_THRESHOLD_FILE,
An optional feature thresholds file where thresholds may be
defined within a file.
export_path: Optional[str] = None
An optional path to export the data using CytoDataFrame export
capabilities. If None no export is performed.
Note: compatible exports are CSV's, TSV's, and parquet.
Returns:
pd.DataFrame:
Outlier data frame for the given conditions.
"""
_warn_if_inline_thresholds_ignore_file(
feature_thresholds_file=feature_thresholds_file,
feature_thresholds=feature_thresholds,
)
# Convert input to CytoDataFrame if it's a file path or a pandas DataFrame
if isinstance(feature_thresholds, str):
feature_thresholds = read_thresholds_set_from_file(
feature_thresholds=feature_thresholds,
feature_thresholds_file=feature_thresholds_file,
)
# Determine the required columns for processing and output
required_columns = list(feature_thresholds.keys()) + metadata_columns
# Ensure the DataFrame contains the required columns and convert to CytoDataFrame
df = CytoDataFrame(data=df)[required_columns]
# Check for NaN values in the required feature columns and warn if any are found
if any(df[list(feature_thresholds.keys())].isna().any()):
print(
"Warning: NaN values found in the DataFrame. "
"These will be dropped before processing."
)
df = df.dropna(subset=list(feature_thresholds.keys()))
# Use identify_outliers to get a boolean mask of outliers based on the
# provided thresholds
outliers_mask = identify_outliers(
df=df,
feature_thresholds=feature_thresholds,
feature_thresholds_file=feature_thresholds_file,
)
# Filter the original DataFrame to return only the outliers along with the
# specified metadata columns
outliers_df = df[outliers_mask]
# Print summary statistics about the outliers found
print(
"Number of outliers:",
outliers_df.shape[0],
f"({'{:.2f}'.format((outliers_df.shape[0] / df.shape[0]) * 100)}%)",
)
print("Outliers Range:")
for feature in feature_thresholds:
print(f"{feature} Min:", outliers_df[feature].min())
print(f"{feature} Max:", outliers_df[feature].max())
# Select only the required columns for output (metadata + features)
result = outliers_df[required_columns]
# Export if export_path is provided
if export_path is not None:
result.export(file_path=export_path)
return result
[docs]
def label_outliers( # noqa: PLR0913
df: Union[CytoDataFrame, pd.DataFrame, str],
feature_thresholds: LabelOutliersFeatureThresholdInput = None,
feature_thresholds_file: Optional[str] = DEFAULT_QC_THRESHOLD_FILE,
include_threshold_scores: bool = False,
export_path: Optional[str] = None,
export_as_annotations: bool = False,
annotation_metadata_columns: Optional[List[str]] = None,
) -> CytoDataFrame:
"""
This function labels outliers in the input dataframe based on specified
feature thresholds and exports the whole dataframe or an annotations file with
just metadata and outlier labels.
Args:
df: Union[CytoDataFrame, pd.DataFrame, str]
DataFrame or file path (Parquet, CSV, or TSV).
feature_thresholds: LabelOutliersFeatureThresholdInput
Defines one or more QC conditions.
- Single condition:
{"feature": threshold}
- Multiple conditions:
{
"undersegmented_cells": {"feature1": -1, "feature2": -1},
"oversegmented_nuclei": {"feature3": 2},
}
- String:
Named condition from the feature_thresholds_file.
- None:
Run all conditions defined in the thresholds file.
feature_thresholds_file: Optional[str] = DEFAULT_QC_THRESHOLD_FILE
YAML file containing named threshold conditions.
include_threshold_scores: bool = False
If True, include per-feature z-score columns in the output.
export_path: Optional[str] = None
Path to export results.
export_as_annotations: bool = False
If True, export only metadata + QC columns (annotations file).
If False, export the full dataset.
annotation_metadata_columns: Optional[List[str]] = None
Metadata columns to include when `export_as_annotations=True`.
If annotation export is requested, these columns are required and
will be written alongside the generated `Metadata_cqc_*` columns.
Returns:
CytoDataFrame:
Either the full dataframe or only metadata with added QC columns:
- Metadata_cqc_<condition>_is_outlier
- (optional) Metadata_cqc_<condition>_<feature>_zscore if
include_threshold_scores=True
When ``export_as_annotations=True``, the exported file contains only
`annotation_metadata_columns` plus QC-related columns.
"""
# Convert input to CytoDataFrame if it's a file path or a pandas DataFrame
if not isinstance(df, CytoDataFrame):
df = CytoDataFrame(data=df)
# Keep track of custom attributes to preserve them in the output
custom_attrs = dict(df._custom_attrs)
# Convert feature_thresholds input into a list of (name, thresholds) tuples
# for consistent processing
threshold_sets = _convert_feature_threshold_input_to_named_threshold_dicts(
feature_thresholds=feature_thresholds,
feature_thresholds_file=feature_thresholds_file,
)
# Start with the original dataframe and iteratively add outlier columns
# for each condition
base_df = df.drop(
columns=[c for c in df.columns if c.startswith("Metadata_cqc_")],
errors="ignore",
)
results = [base_df]
# Loop through each set of thresholds and identify outliers
for name, thresholds in threshold_sets:
detected = identify_outliers(
df=df,
feature_thresholds=thresholds,
feature_thresholds_file=feature_thresholds_file,
condition_name=name,
include_threshold_scores=include_threshold_scores,
)
# If the result is a Series, convert it to a DataFrame for
# consistent concatenation
if isinstance(detected, pd.Series):
condition_df = detected.to_frame()
else:
condition_df = detected
# Append the condition DataFrame to the results list for later concatenation
results.append(condition_df)
# Concatenate all results along the columns, ensuring we don't duplicate columns
clean_results = [r for r in results if isinstance(r, (pd.Series, pd.DataFrame))]
# Create the final result DataFrame by concatenating the original data
# with the new outlier columns
result = CytoDataFrame(
pd.concat(clean_results, axis=1),
**custom_attrs,
)
# Export if export_path is provided
if export_path is not None:
export_df = result.copy()
# If exporting as annotations, filter to only metadata and CQC columns
if export_as_annotations:
if annotation_metadata_columns is None:
raise ValueError(
"`annotation_metadata_columns` must be provided when "
"`export_as_annotations=True`."
)
# Check that all specified annotation metadata columns are present in the df
missing_metadata_cols = [
col
for col in annotation_metadata_columns
if col not in export_df.columns
]
# Raise error if any specified metadata columns are missing
if missing_metadata_cols:
raise ValueError(
"The following annotation metadata columns were not found in the "
f"dataframe: {missing_metadata_cols}"
)
# Set CQC columns as those that start with "Metadata_cqc_" to capture
# the outlier labels and z-score columns
cqc_cols = [
col for col in export_df.columns if col.startswith("Metadata_cqc_")
]
# Filter the export_df to only include metadata and CQC columns
export_df = export_df[annotation_metadata_columns + cqc_cols]
# Use CytoDataFrame export capabilities to export the result
CytoDataFrame(data=export_df, **custom_attrs).export(file_path=export_path)
return result
[docs]
def read_thresholds_set_from_file(
feature_thresholds_file: str, feature_thresholds: Optional[str] = None
) -> Union[Dict[str, int], Dict[str, Dict[str, int]]]:
"""
Reads a set of feature thresholds from a specified file.
This function takes the path to a feature thresholds file and a
specific feature threshold string, reads the file, and returns
the thresholds set from the file.
Args:
feature_thresholds_file (str):
The path to the file containing feature thresholds.
feature_thresholds (Optional str, default None):
A string specifying the feature thresholds.
If we have None, return all thresholds.
Returns:
dict: A dictionary containing the processed feature thresholds.
Raises:
LookupError: If the file does not contain the specified feature_thresholds key.
"""
# Read the thresholds from the specified YAML file
with open(feature_thresholds_file, "r") as file:
thresholds = yaml.safe_load(file)
# If feature_thresholds is None, return all thresholds from the file
if feature_thresholds is None:
return thresholds["thresholds"]
# If feature_thresholds is a string, look it up in the thresholds and
# return the corresponding set
if feature_thresholds not in thresholds["thresholds"]:
raise LookupError(
(
f"Unable to find threshold set by name {feature_thresholds}"
f" within {feature_thresholds_file}"
)
)
return thresholds["thresholds"][feature_thresholds]