# Source code for crispyx.dimred

"""Dimension reduction module for crispyx.

Provides streaming/on-disk PCA and neighbor computation to avoid memory issues
with large datasets. Follows Scanpy-style API patterns.
"""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING

import numpy as np
from scipy import sparse
from scipy.linalg import eigh
from sklearn.decomposition import IncrementalPCA
from tqdm.auto import tqdm

if TYPE_CHECKING:
    import anndata as AnnData

from .data import (
    calculate_pca_chunk_size,
    _to_dense,
    write_obsm_to_h5ad,
    write_varm_to_h5ad,
    write_uns_dict_to_h5ad,
    write_obsp_to_h5ad,
    AnnData as CrispyxAnnData,
)

logger = logging.getLogger(__name__)


def _streaming_pca_sparse_cov(
    adata: "AnnData",
    n_comps: int = 50,
    chunk_size: int = 2048,
    use_highly_variable: bool = True,
    return_info: bool = False,
    show_progress: bool = True,
) -> tuple[np.ndarray, np.ndarray, np.ndarray, dict | None]:
    """Compute PCA from a covariance matrix accumulated chunk by chunk.

    Two passes are made over ``adata.X`` in row chunks. The first pass
    accumulates per-gene sums and ``X^T @ X`` to build the covariance matrix;
    the second pass centers each chunk and projects it onto the leading
    eigenvectors. A dense ``(n_vars, n_vars)`` matrix is held in memory, so
    the method is intended for moderate gene counts (< ~15K genes).

    Parameters
    ----------
    adata
        AnnData object with expression data.
    n_comps
        Number of principal components to compute. Clamped to
        ``min(n_comps, n_selected_genes, n_obs)``.
    chunk_size
        Number of cells per chunk. Must be positive.
    use_highly_variable
        If True and 'highly_variable' exists in var, use only HVGs.
    return_info
        If True, return additional info dict.
    show_progress
        Show progress bar.

    Returns
    -------
    X_pca
        PCA-transformed data (n_obs × n_comps), float32.
    components
        Principal components (n_comps × n_vars).
    variance_ratio
        Variance explained ratio for each component (all zeros when the data
        has no variance at all).
    info
        Dict with 'mean', 'variance', 'variance_ratio' and 'gene_indices' if
        return_info=True, otherwise None.

    Raises
    ------
    ValueError
        If ``chunk_size`` is not positive, or if no component can be computed
        (no cells, no selected genes, or ``n_comps`` < 1).
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")

    X = adata.X
    n_obs, n_vars_total = adata.shape

    # Determine which genes to use
    if use_highly_variable and "highly_variable" in adata.var.columns:
        gene_mask = adata.var["highly_variable"].values
        gene_indices = np.where(gene_mask)[0]
        n_vars = len(gene_indices)
        logger.info(f"Using {n_vars} highly variable genes for PCA")
    else:
        gene_indices = None
        n_vars = n_vars_total
        logger.info(f"Using all {n_vars} genes for PCA")

    requested_comps = n_comps
    n_comps = min(n_comps, n_vars, n_obs)
    if n_comps < 1:
        # Fail early with a readable message instead of an opaque error from
        # eigh (invalid index range / non-finite covariance).
        raise ValueError(
            f"Cannot compute PCA with n_comps={requested_comps} on "
            f"{n_obs} cells and {n_vars} selected genes"
        )

    # First pass: accumulate per-gene sums and X^T @ X in float64
    gene_sums = np.zeros(n_vars, dtype=np.float64)
    XTX = np.zeros((n_vars, n_vars), dtype=np.float64)

    n_chunks = (n_obs + chunk_size - 1) // chunk_size

    for chunk_start in tqdm(
        range(0, n_obs, chunk_size),
        total=n_chunks,
        desc="Computing covariance",
        disable=not show_progress,
    ):
        chunk_end = min(chunk_start + chunk_size, n_obs)
        chunk = X[chunk_start:chunk_end, :]

        # Subset to selected genes
        if gene_indices is not None:
            chunk = chunk[:, gene_indices]

        # The chunk is densified before use, so the products below are dense.
        chunk_dense = _to_dense(chunk)

        gene_sums += chunk_dense.sum(axis=0)
        XTX += chunk_dense.T @ chunk_dense

    mean = gene_sums / n_obs

    # Covariance normalised by n_obs: E[x x^T] - E[x] E[x]^T
    cov = XTX / n_obs - np.outer(mean, mean)

    # eigh returns eigenvalues in ascending order; request only the top
    # n_comps and reverse them to descending order afterwards.
    eigenvalues, eigenvectors = eigh(
        cov,
        subset_by_index=[n_vars - n_comps, n_vars - 1],
    )

    # Round-off in the subtraction above can push near-zero eigenvalues
    # slightly below zero; a variance cannot be negative.
    eigenvalues = np.maximum(eigenvalues[::-1], 0.0)
    components = eigenvectors[:, ::-1].T  # (n_comps, n_vars)

    # Variance explained ratio; guard against constant data (zero variance)
    total_variance = np.trace(cov)
    if total_variance > 0:
        variance_ratio = eigenvalues / total_variance
    else:
        variance_ratio = np.zeros_like(eigenvalues)

    # Second pass: center each chunk and project it onto the components
    X_pca = np.zeros((n_obs, n_comps), dtype=np.float32)

    for chunk_start in tqdm(
        range(0, n_obs, chunk_size),
        total=n_chunks,
        desc="Transforming data",
        disable=not show_progress,
    ):
        chunk_end = min(chunk_start + chunk_size, n_obs)
        chunk = X[chunk_start:chunk_end, :]

        if gene_indices is not None:
            chunk = chunk[:, gene_indices]

        chunk_centered = _to_dense(chunk) - mean
        X_pca[chunk_start:chunk_end] = (chunk_centered @ components.T).astype(np.float32)

    info = None
    if return_info:
        info = {
            "mean": mean,
            "variance": eigenvalues,
            "variance_ratio": variance_ratio,
            "gene_indices": gene_indices,
        }

    return X_pca, components, variance_ratio, info


def _streaming_pca_incremental(
    adata: "AnnData",
    n_comps: int = 50,
    chunk_size: int = 1024,
    use_highly_variable: bool = True,
    return_info: bool = False,
    show_progress: bool = True,
) -> tuple[np.ndarray, np.ndarray, np.ndarray, dict | None]:
    """Compute PCA using IncrementalPCA from sklearn.

    Streams through ``adata.X`` in row chunks: a first pass calls
    ``partial_fit`` on every chunk, a second pass transforms every chunk.
    Intended for datasets with many genes (> ~15K genes).

    Parameters
    ----------
    adata
        AnnData object with expression data.
    n_comps
        Number of principal components to compute. Clamped to
        ``min(n_comps, n_selected_genes, n_obs)``.
    chunk_size
        Number of cells per chunk. Values below n_comps are raised to n_comps.
    use_highly_variable
        If True and 'highly_variable' exists in var, use only HVGs.
    return_info
        If True, return additional info dict.
    show_progress
        Show progress bar.

    Returns
    -------
    X_pca
        PCA-transformed data (n_obs × n_comps), float32.
    components
        Principal components (n_comps × n_vars).
    variance_ratio
        Variance explained ratio for each component.
    info
        Dict with 'mean', 'variance', 'variance_ratio', 'gene_indices' and
        'noise_variance' if return_info=True, otherwise None.

    Raises
    ------
    ValueError
        If no component can be computed (no cells, no selected genes, or
        ``n_comps`` < 1).
    """
    X = adata.X
    n_obs, n_vars_total = adata.shape

    # Determine which genes to use
    if use_highly_variable and "highly_variable" in adata.var.columns:
        gene_mask = adata.var["highly_variable"].values
        gene_indices = np.where(gene_mask)[0]
        n_vars = len(gene_indices)
        logger.info(f"Using {n_vars} highly variable genes for PCA")
    else:
        gene_indices = None
        n_vars = n_vars_total
        logger.info(f"Using all {n_vars} genes for PCA")

    requested_comps = n_comps
    n_comps = min(n_comps, n_vars, n_obs)
    if n_comps < 1:
        raise ValueError(
            f"Cannot compute PCA with n_comps={requested_comps} on "
            f"{n_obs} cells and {n_vars} selected genes"
        )

    # Ensure chunk_size >= n_comps for IncrementalPCA
    actual_chunk_size = max(chunk_size, n_comps)
    if actual_chunk_size != chunk_size:
        logger.info(f"Adjusted chunk size from {chunk_size} to {actual_chunk_size} (>= n_comps)")

    # Row spans to stream over. Each batch given to partial_fit needs at least
    # n_comps rows, so a short tail is merged into the preceding span instead
    # of being dropped; this way every cell contributes to the fit. The first
    # span always has >= n_comps rows because n_comps <= n_obs.
    spans = [
        (start, min(start + actual_chunk_size, n_obs))
        for start in range(0, n_obs, actual_chunk_size)
    ]
    if len(spans) > 1 and spans[-1][1] - spans[-1][0] < n_comps:
        _, tail_end = spans.pop()
        prev_start, _ = spans.pop()
        spans.append((prev_start, tail_end))
        logger.debug(
            f"Merged short final chunk into previous chunk ({prev_start}:{tail_end})"
        )

    ipca = IncrementalPCA(n_components=n_comps)

    # First pass: partial_fit to learn components
    for chunk_start, chunk_end in tqdm(
        spans,
        total=len(spans),
        desc="Learning PCA",
        disable=not show_progress,
    ):
        chunk = X[chunk_start:chunk_end, :]

        if gene_indices is not None:
            chunk = chunk[:, gene_indices]

        ipca.partial_fit(_to_dense(chunk))

    components = ipca.components_  # (n_comps, n_vars)
    variance_ratio = ipca.explained_variance_ratio_

    # Second pass: transform data
    X_pca = np.zeros((n_obs, n_comps), dtype=np.float32)

    for chunk_start, chunk_end in tqdm(
        spans,
        total=len(spans),
        desc="Transforming data",
        disable=not show_progress,
    ):
        chunk = X[chunk_start:chunk_end, :]

        if gene_indices is not None:
            chunk = chunk[:, gene_indices]

        X_pca[chunk_start:chunk_end] = ipca.transform(_to_dense(chunk)).astype(np.float32)

    info = None
    if return_info:
        info = {
            "mean": ipca.mean_,
            "variance": ipca.explained_variance_,
            "variance_ratio": variance_ratio,
            "gene_indices": gene_indices,
            "noise_variance": ipca.noise_variance_,
        }

    return X_pca, components, variance_ratio, info


def pca(
    adata: "AnnData",
    n_comps: int = 50,
    method: str = "auto",
    use_highly_variable: bool = True,
    chunk_size: int | None = None,
    random_state: int = 0,
    copy: bool = False,
    show_progress: bool = True,
) -> "AnnData" | None:
    """Compute Principal Component Analysis (PCA) on backed AnnData.

    Streaming implementation that reads ``adata.X`` in chunks. The method and
    chunk size are chosen by ``calculate_pca_chunk_size`` unless given.

    For a crispyx.AnnData wrapper with ``copy=False``, the wrapper is closed
    and the results are written to the h5ad file at ``adata.path``. In every
    other case the results are stored on the (in-memory) object directly.

    Parameters
    ----------
    adata
        The annotated data matrix.
    n_comps
        Number of principal components to compute. Default 50. The streaming
        routines clamp this to the number of cells and selected genes; the
        stored results reflect the number actually computed.
    method
        PCA method to use:
        - 'auto': Automatically select based on gene count and memory
        - 'sparse_cov': Use sparse covariance method (fast for ≤15K genes)
        - 'incremental': Use IncrementalPCA (memory-efficient for >15K genes)
        Any value other than 'sparse_cov' that is not resolved by 'auto'
        falls through to the incremental implementation.
    use_highly_variable
        If True and 'highly_variable' exists in var, restrict to HVGs.
        Default True.
    chunk_size
        Number of cells to process per chunk. If None, automatically
        calculated by ``calculate_pca_chunk_size``.
    random_state
        Random seed (not currently used, for API compatibility).
    copy
        If True, return a copy of adata with PCA results. If False,
        modify adata in place and return None.
    show_progress
        Show progress bars during computation. Default True.

    Returns
    -------
    adata : AnnData | None
        If copy=True, returns modified AnnData. Otherwise modifies in place.

    Modifies adata
    --------------
    obsm['X_pca']
        PCA-transformed data (n_obs × n_comps).
    varm['PCs']
        Principal components (n_vars × n_comps). Rows of genes that were not
        selected are zero when HVG filtering is active.
    uns['pca']
        Dict with 'variance', 'variance_ratio', 'use_highly_variable',
        'method' and 'n_comps'.

    Examples
    --------
    >>> import crispyx as cx
    >>> adata = cx.read_backed("data.h5ad")
    >>> cx.pp.pca(adata, n_comps=50)
    >>> adata.obsm['X_pca'].shape
    (n_obs, 50)
    """
    # Detect if this is a crispyx.AnnData wrapper (backed, with .path)
    is_crispyx_wrapper = isinstance(adata, CrispyxAnnData)

    if copy:
        # For copy mode, load into memory
        if is_crispyx_wrapper:
            adata = adata.to_memory()
        elif hasattr(adata, 'file') and adata.file is not None:
            adata = adata.to_memory()
        else:
            adata = adata.copy()

    n_obs, n_vars = adata.shape
    hvg_active = use_highly_variable and "highly_variable" in adata.var.columns

    # Determine gene count for PCA (after HVG filtering)
    if hvg_active:
        n_vars_pca = adata.var["highly_variable"].sum()
    else:
        n_vars_pca = n_vars

    # Calculate chunk size and method if not provided
    if chunk_size is None:
        chunk_size, selected_method = calculate_pca_chunk_size(
            n_obs=n_obs,
            n_vars=n_vars_pca,
            n_comps=n_comps,
            method=method,
        )
    elif method == "auto":
        # Chunk size was given explicitly; only the method needs resolving
        _, selected_method = calculate_pca_chunk_size(
            n_obs=n_obs,
            n_vars=n_vars_pca,
            n_comps=n_comps,
            method=method,
        )
    else:
        selected_method = method

    logger.info(f"Running PCA with method='{selected_method}', chunk_size={chunk_size}")

    # Run PCA
    if selected_method == "sparse_cov":
        run_pca = _streaming_pca_sparse_cov
    else:
        run_pca = _streaming_pca_incremental

    X_pca, components, variance_ratio, info = run_pca(
        adata,
        n_comps=n_comps,
        chunk_size=chunk_size,
        use_highly_variable=use_highly_variable,
        return_info=True,
        show_progress=show_progress,
    )

    # The streaming routines clamp n_comps to min(n_comps, n_vars, n_obs), so
    # size everything from what was actually returned, not from the request.
    n_comps_used = components.shape[0]

    # Prepare PCs array
    if hvg_active:
        # Scatter the loadings back into full gene space
        pcs_full = np.zeros((n_vars, n_comps_used), dtype=np.float32)
        pcs_full[info["gene_indices"], :] = components.T
    else:
        pcs_full = components.T.astype(np.float32)

    # Prepare uns dict
    pca_uns = {
        "variance": info["variance"],
        "variance_ratio": variance_ratio,
        "use_highly_variable": use_highly_variable,
        "method": selected_method,
        "n_comps": n_comps_used,
    }

    # Store results: use close-write-reopen for crispyx wrapper, direct for in-memory
    if is_crispyx_wrapper and not copy:
        # Close the file handle before writing to the same file
        path = adata.path
        adata.close()

        # Write results to h5ad file
        write_obsm_to_h5ad(path, "X_pca", X_pca)
        write_varm_to_h5ad(path, "PCs", pcs_full)
        write_uns_dict_to_h5ad(path, "pca", pca_uns)

        # NOTE(review): the wrapper is expected to reopen the file lazily on
        # next access — confirm against CrispyxAnnData.
        logger.info(
            f"PCA complete: {n_comps_used} components, "
            f"variance explained: {variance_ratio.sum():.2%} (written to {path})"
        )
    else:
        # In-memory AnnData: store directly
        adata.obsm["X_pca"] = X_pca
        adata.varm["PCs"] = pcs_full
        adata.uns["pca"] = pca_uns

        logger.info(
            f"PCA complete: {n_comps_used} components, "
            f"variance explained: {variance_ratio.sum():.2%}"
        )

    if copy:
        return adata
    return None
def _compute_connectivities_umap( knn_indices: np.ndarray, knn_distances: np.ndarray, n_obs: int, n_neighbors: int, ) -> sparse.csr_matrix: """Compute UMAP-style connectivities from KNN graph. Follows the UMAP fuzzy simplicial set construction. """ from scipy.sparse import coo_matrix # Simple UMAP-style connectivities: 1 / (1 + distance) # More sophisticated version would use local connectivity rows = np.repeat(np.arange(n_obs), n_neighbors) cols = knn_indices.ravel() # Avoid division by zero dists = knn_distances.ravel() dists = np.maximum(dists, 1e-10) # Simple connectivity: exponential decay # sigma = local bandwidth (use mean of k-th neighbor distance) sigma = np.mean(knn_distances[:, -1]) sigma = max(sigma, 1e-10) data = np.exp(-dists / sigma) connectivities = coo_matrix((data, (rows, cols)), shape=(n_obs, n_obs)) connectivities = connectivities.tocsr() # Symmetrize: (A + A.T) / 2 connectivities = (connectivities + connectivities.T) / 2 return connectivities def _compute_distances_sparse( knn_indices: np.ndarray, knn_distances: np.ndarray, n_obs: int, n_neighbors: int, ) -> sparse.csr_matrix: """Convert KNN indices/distances to sparse distance matrix.""" from scipy.sparse import coo_matrix rows = np.repeat(np.arange(n_obs), n_neighbors) cols = knn_indices.ravel() data = knn_distances.ravel() distances = coo_matrix((data, (rows, cols)), shape=(n_obs, n_obs)) return distances.tocsr()
def neighbors(
    adata: "AnnData",
    n_neighbors: int = 15,
    n_pcs: int | None = None,
    use_rep: str = "X_pca",
    metric: str = "euclidean",
    method: str = "umap",
    random_state: int = 0,
    copy: bool = False,
    show_progress: bool = True,
) -> "AnnData" | None:
    """Compute k-nearest neighbors graph from embeddings.

    Uses pre-computed embeddings (typically PCA) from ``adata.obsm[use_rep]``
    to build a KNN graph. The embedding is converted to an in-memory array
    for the distance computation.

    For a crispyx.AnnData wrapper with ``copy=False``, the wrapper is closed
    and the results are written to the h5ad file at ``adata.path``. In every
    other case the results are stored on the object directly.

    Parameters
    ----------
    adata
        The annotated data matrix with embeddings in .obsm.
    n_neighbors
        Number of neighbors in the KNN graph. Default 15. Must be >= 1.
    n_pcs
        Number of PCs to use from the embedding. If None, uses all. Values
        larger than the embedding width use all available columns.
    use_rep
        Key in .obsm to use for distance computation. Default 'X_pca'.
    metric
        Distance metric, passed through to the KNN backend.
        Default 'euclidean'.
    method
        KNN algorithm: 'umap' (uses pynndescent) or 'sklearn'
        (sklearn.neighbors.NearestNeighbors). Default 'umap'.
    random_state
        Random seed, passed to pynndescent for method='umap'.
    copy
        If True, return a copy with results. Otherwise modify in place.
    show_progress
        Passed to pynndescent as ``verbose``. Default True.

    Returns
    -------
    adata : AnnData | None
        If copy=True, returns modified AnnData. Otherwise modifies in place.

    Raises
    ------
    ValueError
        If ``n_neighbors`` or ``n_pcs`` is not positive, ``use_rep`` is not in
        ``adata.obsm``, or ``method`` is unknown.
    ImportError
        If method='umap' and pynndescent is not installed.

    Modifies adata
    --------------
    obsp['distances']
        Sparse distance matrix (n_obs × n_obs).
    obsp['connectivities']
        Sparse connectivity matrix (n_obs × n_obs).
    uns['neighbors']
        Dict with 'connectivities_key', 'distances_key' and 'params'
        (n_neighbors, method, metric, use_rep, n_pcs).

    Examples
    --------
    >>> import crispyx as cx
    >>> adata = cx.read_backed("data.h5ad")
    >>> cx.pp.pca(adata, n_comps=50)
    >>> cx.pp.neighbors(adata, n_neighbors=15)
    >>> adata.obsp['connectivities']
    <sparse matrix (n_obs, n_obs)>
    """
    if n_neighbors < 1:
        raise ValueError(f"n_neighbors must be a positive integer, got {n_neighbors}")
    if n_pcs is not None and n_pcs < 1:
        # A non-positive value would silently slice from the end of the array
        raise ValueError(f"n_pcs must be a positive integer or None, got {n_pcs}")

    # Detect if this is a crispyx.AnnData wrapper (backed, with .path)
    is_crispyx_wrapper = isinstance(adata, CrispyxAnnData)

    if copy:
        if is_crispyx_wrapper:
            adata = adata.to_memory()
        elif hasattr(adata, 'file') and adata.file is not None:
            adata = adata.to_memory()
        else:
            adata = adata.copy()

    # Get embeddings
    if use_rep not in adata.obsm:
        raise ValueError(
            f"'{use_rep}' not found in adata.obsm. "
            f"Run cx.pp.pca() first or specify a valid use_rep. "
            f"Available keys: {list(adata.obsm.keys())}"
        )

    X = adata.obsm[use_rep]

    # Load to memory if needed (backed obsm)
    if hasattr(X, 'to_memory'):
        X = X.to_memory()
    X = np.asarray(X)

    # Subset PCs if requested
    if n_pcs is not None and n_pcs < X.shape[1]:
        X = X[:, :n_pcs]
        logger.info(f"Using first {n_pcs} components from {use_rep}")

    n_obs, n_dims = X.shape

    logger.info(
        f"Computing {n_neighbors}-NN graph on {n_obs} cells × {n_dims} dims "
        f"using method='{method}'"
    )

    # Compute KNN
    if method == "umap":
        try:
            from pynndescent import NNDescent
        except ImportError as err:
            raise ImportError(
                "pynndescent is required for method='umap'. "
                "Install with: pip install pynndescent"
            ) from err

        # Build index; the neighbor graph of the indexed points is the result
        index = NNDescent(
            X,
            n_neighbors=n_neighbors,
            metric=metric,
            random_state=random_state,
            verbose=show_progress,
        )
        knn_indices, knn_distances = index.neighbor_graph

    elif method == "sklearn":
        from sklearn.neighbors import NearestNeighbors

        nn = NearestNeighbors(
            n_neighbors=n_neighbors,
            metric=metric,
            algorithm="auto",
        )
        nn.fit(X)
        knn_distances, knn_indices = nn.kneighbors(X)

    else:
        raise ValueError(f"Unknown method '{method}'. Use 'umap' or 'sklearn'.")

    # Build sparse matrices
    distances = _compute_distances_sparse(knn_indices, knn_distances, n_obs, n_neighbors)
    connectivities = _compute_connectivities_umap(knn_indices, knn_distances, n_obs, n_neighbors)

    # Prepare uns dict. n_pcs records the number of columns actually used,
    # which can be smaller than the requested n_pcs.
    neighbors_uns = {
        "connectivities_key": "connectivities",
        "distances_key": "distances",
        "params": {
            "n_neighbors": n_neighbors,
            "method": method,
            "metric": metric,
            "use_rep": use_rep,
            "n_pcs": n_dims,
        },
    }

    # Store results: use close-write-reopen for crispyx wrapper, direct for in-memory
    if is_crispyx_wrapper and not copy:
        # Close the file handle before writing to the same file
        path = adata.path
        adata.close()

        # Write results to h5ad file
        write_obsp_to_h5ad(path, "distances", distances)
        write_obsp_to_h5ad(path, "connectivities", connectivities)
        write_uns_dict_to_h5ad(path, "neighbors", neighbors_uns)

        # NOTE(review): the wrapper is expected to reopen the file lazily on
        # next access — confirm against CrispyxAnnData.
        logger.info(
            f"Neighbors complete: {n_neighbors} neighbors, "
            f"{connectivities.nnz} connections (written to {path})"
        )
    else:
        # In-memory AnnData: store directly
        adata.obsp["distances"] = distances
        adata.obsp["connectivities"] = connectivities
        adata.uns["neighbors"] = neighbors_uns

        logger.info(
            f"Neighbors complete: {n_neighbors} neighbors, "
            f"{connectivities.nnz} connections"
        )

    if copy:
        return adata
    return None
def umap(
    adata: "AnnData",
    min_dist: float = 0.5,
    spread: float = 1.0,
    n_components: int = 2,
    neighbors_key: str = "neighbors",
    random_state: int = 0,
    copy: bool = False,
) -> "AnnData" | None:
    """Compute UMAP embedding from pre-computed neighbor graph.

    Wraps ``scanpy.tl.umap``. The neighbor graph must already be present
    (e.g. computed by cx.pp.neighbors).

    For a crispyx.AnnData wrapper with ``copy=False``, a minimal in-memory
    AnnData is built from the neighbor graph (plus the embeddings scanpy
    needs), scanpy runs on that object, and the results are written to the
    h5ad file at ``adata.path`` after closing the wrapper. The expression
    matrix itself is not loaded in that branch. In every other case scanpy
    runs on the (in-memory) object directly.

    Parameters
    ----------
    adata
        The annotated data matrix with a neighbor graph computed.
    min_dist
        The effective minimum distance between embedded points. Smaller
        values result in a more clustered embedding. Default 0.5.
    spread
        The effective scale of embedded points. In combination with
        min_dist this determines how clustered/clumped the embedding is.
        Default 1.0.
    n_components
        Number of UMAP dimensions. Default 2.
    neighbors_key
        Key in .uns where neighbor information is stored. Default
        'neighbors'. For a non-default key, the obsp keys are read from the
        'connectivities_key' / 'distances_key' entries of that dict.
    random_state
        Random seed for reproducibility.
    copy
        If True, return a copy with results. Otherwise modify in place.

    Returns
    -------
    adata : AnnData | None
        If copy=True, returns modified AnnData. Otherwise modifies in place.

    Raises
    ------
    ValueError
        If ``neighbors_key`` is not present in ``adata.uns``.

    Modifies adata
    --------------
    obsm['X_umap']
        UMAP embedding (n_obs × n_components).
    uns['umap']
        Dict with UMAP parameters.

    Examples
    --------
    >>> import crispyx as cx
    >>> adata = cx.read_backed("data.h5ad")
    >>> cx.pp.pca(adata, n_comps=50)
    >>> cx.pp.neighbors(adata, n_neighbors=15)
    >>> cx.tl.umap(adata)
    >>> adata.obsm['X_umap'].shape
    (n_obs, 2)

    See Also
    --------
    cx.pp.neighbors : Compute k-nearest neighbors graph.
    cx.pl.umap : Plot UMAP embedding.
    """
    import scanpy as sc

    # Detect if this is a crispyx.AnnData wrapper (backed, with .path)
    is_crispyx_wrapper = isinstance(adata, CrispyxAnnData)

    # Check for neighbor graph
    if neighbors_key not in adata.uns:
        raise ValueError(
            f"'{neighbors_key}' not found in adata.uns. "
            f"Run cx.pp.neighbors() first."
        )

    if is_crispyx_wrapper and not copy:
        import anndata

        path = adata.path
        n_obs = adata.n_obs

        neighbors_uns = dict(adata.uns[neighbors_key])

        # Resolve the obsp keys of the graph. The default key uses the fixed
        # names; a custom key records its own names inside the uns dict.
        if neighbors_key == "neighbors":
            conn_key, dist_key = "connectivities", "distances"
        else:
            conn_key = neighbors_uns.setdefault("connectivities_key", "connectivities")
            dist_key = neighbors_uns.setdefault("distances_key", "distances")

        # Load neighbor graph components into memory as CSR
        graphs = {}
        for key in (conn_key, dist_key):
            matrix = adata.obsp[key]
            if sparse.issparse(matrix):
                graphs[key] = matrix.tocsr()
            else:
                graphs[key] = sparse.csr_matrix(matrix)

        obsm_dict = {}
        init_pos = "spectral"

        if "X_pca" in adata.obsm:
            obsm_dict["X_pca"] = np.asarray(adata.obsm["X_pca"])
        else:
            # Use random initialization if no X_pca
            init_pos = "random"
            logger.info("X_pca not found, using random initialization for UMAP")

        # Also carry over the representation the graph was built from, so the
        # temporary object is consistent with the recorded neighbor params.
        params = neighbors_uns.get("params")
        rep_key = params.get("use_rep") if hasattr(params, "get") else None
        if (
            isinstance(rep_key, str)
            and rep_key not in obsm_dict
            and rep_key in adata.obsm
        ):
            obsm_dict[rep_key] = np.asarray(adata.obsm[rep_key])

        adata_mem = anndata.AnnData(
            X=sparse.csr_matrix((n_obs, 1)),  # Minimal placeholder X
            obsp=graphs,
            uns={neighbors_key: neighbors_uns},
        )
        # Add obsm separately to avoid shape mismatch issues
        for key, val in obsm_dict.items():
            adata_mem.obsm[key] = val

        logger.info(
            f"Computing UMAP embedding on {n_obs} cells "
            f"(min_dist={min_dist}, spread={spread})"
        )

        # Run scanpy's UMAP on the minimal AnnData
        sc.tl.umap(
            adata_mem,
            min_dist=min_dist,
            spread=spread,
            n_components=n_components,
            init_pos=init_pos,
            neighbors_key=neighbors_key,
            random_state=random_state,
        )

        # Extract results; fall back to our own params if scanpy stored none
        X_umap = adata_mem.obsm["X_umap"]
        umap_uns = adata_mem.uns.get("umap", {
            "params": {
                "min_dist": min_dist,
                "spread": spread,
                "n_components": n_components,
                "random_state": random_state,
            }
        })

        # Close crispyx wrapper and write results
        adata.close()
        write_obsm_to_h5ad(path, "X_umap", X_umap)
        write_uns_dict_to_h5ad(path, "umap", umap_uns)

        logger.info(
            f"UMAP complete: {n_components} components (written to {path})"
        )
        return None

    # In-memory or copy mode
    if copy:
        if is_crispyx_wrapper:
            adata = adata.to_memory()
        elif hasattr(adata, 'file') and adata.file is not None:
            adata = adata.to_memory()
        else:
            adata = adata.copy()

    logger.info(
        f"Computing UMAP embedding on {adata.n_obs} cells "
        f"(min_dist={min_dist}, spread={spread})"
    )

    # Run scanpy's UMAP
    sc.tl.umap(
        adata,
        min_dist=min_dist,
        spread=spread,
        n_components=n_components,
        neighbors_key=neighbors_key,
        random_state=random_state,
    )

    logger.info(f"UMAP complete: {n_components} components")

    if copy:
        return adata
    return None