Skip to content

Utilities

Miscellaneous functions

build_knn_graph

build_knn_graph(correlation_matrix, labels, k=5)

Build a k-nearest neighbors (kNN) graph from a correlation matrix.

Parameters:

  • correlation_matrix (ndarray) –

    Square correlation matrix.

  • labels (list) –

    Node labels corresponding to the rows/columns of the correlation matrix.

  • k (int, default: 5 ) –

    Number of nearest neighbors to connect each node to.

Returns:

  • igraph.Graph: kNN graph.

Source code in src/pySingleCellNet/utils/knn.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def build_knn_graph(correlation_matrix, labels, k=5):
    """
    Build a k-nearest neighbors (kNN) graph from a correlation matrix.

    Parameters:
        correlation_matrix (ndarray): Square correlation matrix.
        labels (list): Node labels corresponding to the rows/columns of the correlation matrix.
        k (int): Number of nearest neighbors to connect each node to.

    Returns:
        igraph.Graph: kNN graph.
    """

    # import igraph as ig

    # Ensure the correlation matrix is square
    assert correlation_matrix.shape[0] == correlation_matrix.shape[1], "Matrix must be square."

    # Initialize the graph
    n = len(labels)
    g = ig.Graph()
    g.add_vertices(n)
    g.vs["name"] = labels  # Add node labels

    # Build kNN edges
    for i in range(n):
        # Get k largest correlations (excluding self-correlation)
        neighbors = np.argsort(correlation_matrix[i, :])[-(k + 1):-1]  # Exclude the node itself
        for j in neighbors:
            g.add_edge(i, j, weight=correlation_matrix[i, j])

    return g

call_outlier_cells

call_outlier_cells(adata, metric=['total_counts'], nmads=5)

determines whether obs[metric] exceeds nmads

Parameters:

adata : AnnData The input AnnData object containing single-cell data. metric : str The column name in adata.obs holding cell metric nmads : int, optional (default=5) The number of median abs deviations to define a cell as an outlier

Returns

None The function adds a new column to adata.obs named "outlier_" + metric, but does not return anything.

Source code in src/pySingleCellNet/utils/qc.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def call_outlier_cells(adata, metric = ["total_counts"], nmads = 5):
    """
    determines whether obs[metric] exceeds nmads 

    Parameters:
    -----------
    adata : AnnData
        The input AnnData object containing single-cell data.
    metric : str
        The column name in `adata.obs` holding cell metric 
    nmads : int, optional (default=5)
        The number of median abs deviations to define a cell as an outlier

    Returns
    -------
    None
        The function adds a new column to `adata.obs` named "outlier_" + metric, but does not return anything.
    """
    M = adata.obs[metric]
    outlier = (M < np.median(M) - nmads * median_abs_deviation(M)) | (
        np.median(M) + nmads * median_abs_deviation(M) < M
    )

    new_col = "outlier_" + nmads + "_" + metric
    adata.obs[new_col] = outlier

create_gene_structure_dict_by_stage

create_gene_structure_dict_by_stage(file_path, stage)

Create a dictionary mapping structures to lists of genes expressed at a specific stage. Designed for parsing output from Jax Labs MGI data

Parameters:

  • file_path (str) –

    Path to the gene expression file.

  • stage (str or int) –

    The Theiler Stage to filter the data.

Returns:

  • dict

    A dictionary where keys are structures and values are lists of genes expressed in those structures.

Source code in src/pySingleCellNet/utils/annotation.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def create_gene_structure_dict_by_stage(file_path, stage):
    """
    Create a dictionary mapping structures to lists of genes expressed at a specific stage. Designed for parsing output from Jax Labs MGI data

    Parameters:
        file_path (str): Path to the gene expression file.
        stage (str or int): The Theiler Stage to filter the data.

    Returns:
        dict: A dictionary where keys are structures and values are lists of genes expressed in those structures.
    """
    structure_dict = defaultdict(set)  # Using a set to avoid duplicate gene symbols

    with open(file_path, 'r') as file:
        reader = csv.DictReader(file, delimiter='\t')  # Use tab-delimiter based on previous example
        for row in reader:
            if row['Theiler Stage'] == str(stage):  # Subset by stage
                structure = row['Structure']
                gene_symbol = row['Gene Symbol']
                if structure and gene_symbol:  # Ensure both fields are not empty
                    structure_dict[structure].add(gene_symbol)

    # Convert sets to lists for final output
    structure_dict = {structure: list(genes) for structure, genes in structure_dict.items()}
    return structure_dict

filter_adata_by_group_size

filter_adata_by_group_size(adata, groupby, ncells=20)

Filters an AnnData object to retain only cells from groups with at least 'ncells' cells.

Parameters:

adata : AnnData The input AnnData object containing single-cell data. groupby : str The column name in adata.obs used to define groups (e.g., cluster labels). ncells : int, optional (default=20) The minimum number of cells a group must have to be retained.

Returns:

filtered_adata : AnnData A new AnnData object containing only cells from groups with at least 'ncells' cells.

Raises:

ValueError: - If groupby is not a column in adata.obs. - If ncells is not a positive integer.

Source code in src/pySingleCellNet/utils/adataTools.py
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
def filter_adata_by_group_size(adata: ad.AnnData, groupby: str, ncells: int = 20) -> ad.AnnData:
    """
    Filters an AnnData object to retain only cells from groups with at least 'ncells' cells.

    Parameters:
    -----------
    adata : AnnData
        The input AnnData object containing single-cell data.
    groupby : str
        The column name in `adata.obs` used to define groups (e.g., cluster labels).
    ncells : int, optional (default=20)
        The minimum number of cells a group must have to be retained.

    Returns:
    --------
    filtered_adata : AnnData
        A new AnnData object containing only cells from groups with at least 'ncells' cells.

    Raises:
    -------
    ValueError:
        - If `groupby` is not a column in `adata.obs`.
        - If `ncells` is not a positive integer.
    """
    # Input Validation
    if not isinstance(adata, ad.AnnData):
        raise TypeError(f"'adata' must be an AnnData object, but got {type(adata)}.")

    if not isinstance(groupby, str):
        raise TypeError(f"'groupby' must be a string, but got {type(groupby)}.")

    if groupby not in adata.obs.columns:
        raise ValueError(f"'{groupby}' is not a column in adata.obs. Available columns are: {adata.obs.columns.tolist()}")

    if not isinstance(ncells, int) or ncells <= 0:
        raise ValueError(f"'ncells' must be a positive integer, but got {ncells}.")

    # Compute the size of each group
    group_sizes = adata.obs[groupby].value_counts()

    # Identify groups that meet or exceed the minimum cell threshold
    valid_groups = group_sizes[group_sizes >= ncells].index.tolist()

    if not valid_groups:
        raise ValueError(f"No groups found in '{groupby}' with at least {ncells} cells.")

    # Optionally, inform the user about the filtering
    total_groups = adata.obs[groupby].nunique()
    retained_groups = len(valid_groups)
    excluded_groups = total_groups - retained_groups
    print(f"Filtering AnnData object based on group sizes in '{groupby}':")
    print(f" - Total groups: {total_groups}")
    print(f" - Groups retained (≥ {ncells} cells): {retained_groups}")
    print(f" - Groups excluded (< {ncells} cells): {excluded_groups}")

    # Create a boolean mask for cells belonging to valid groups
    mask = adata.obs[groupby].isin(valid_groups)

    # Apply the mask to filter the AnnData object
    filtered_adata = adata[mask].copy()

    # Optionally, reset indices if necessary
    # filtered_adata.obs_names = range(filtered_adata.n_obs)

    print(f"Filtered AnnData object contains {filtered_adata.n_obs} cells from {filtered_adata.obs[groupby].nunique()} groups.")

    return filtered_adata

filter_anndata_slots

filter_anndata_slots(adata, slots_to_keep, *, keep_dependencies=True)

Return a filtered COPY of adata that only keeps requested slots/keys. Unspecified slots (or with value None) are cleared.

Parameters

adata : AnnData slots_to_keep : dict Keys among {'obs','var','obsm','obsp','varm','varp','uns'}. Values are lists of names to keep within that slot; if a slot is not present in the dict or is None, all contents of that slot are removed. Example: {'obs': ['leiden','sample'], 'obsm': ['X_pca','X_umap'], 'uns': ['neighbors', 'pca', 'umap']} keep_dependencies : bool, default True If True, automatically keep cross-slot items that are commonly required: - For each neighbors block in .uns[<key>] with 'connectivities_key' / 'distances_key', also keep those in .obsp. - If an .obsp key ends with '_connectivities'/'_distances', also keep the matching .uns[<prefix>] if present. - If keeping 'X_pca' in .obsm, also keep .uns['pca'] and .varm['PCs'] if present. - If keeping 'X_umap' in .obsm, also keep .uns['umap'] if present.

Returns

AnnData A copy with filtered slots.

Source code in src/pySingleCellNet/utils/adataTools.py
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
def filter_anndata_slots(
    adata,
    slots_to_keep: Dict[str, Optional[List[str]]],
    *,
    keep_dependencies: bool = True,
):
    """
    Return a filtered COPY of `adata` that only keeps requested slots/keys.
    Unspecified slots (or with value None) are cleared.

    Parameters
    ----------
    adata : AnnData
    slots_to_keep : dict
        Keys among {'obs','var','obsm','obsp','varm','varp','uns'}.
        Values are lists of names to keep within that slot; if a slot is not
        present in the dict or is None, all contents of that slot are removed.
        Example:
            {'obs': ['leiden','sample'],
             'obsm': ['X_pca','X_umap'],
             'uns':  ['neighbors', 'pca', 'umap']}
    keep_dependencies : bool, default True
        If True, automatically keep cross-slot items that are commonly required:
          - For each neighbors block in `.uns[<key>]` with
            'connectivities_key' / 'distances_key', also keep those in `.obsp`.
          - If an `.obsp` key ends with '_connectivities'/'_distances', also keep
            the matching `.uns[<prefix>]` if present.
          - If keeping 'X_pca' in `.obsm`, also keep `.uns['pca']` and `.varm['PCs']` if present.
          - If keeping 'X_umap' in `.obsm`, also keep `.uns['umap']` if present.

    Returns
    -------
    AnnData
        A copy with filtered slots.
    """
    ad = adata.copy()

    # Normalize user intent -> sets (and include absent slots as None)
    wanted: Dict[str, Optional[Set[str]]] = {}
    for slot in ['obs','var','obsm','obsp','varm','varp','uns']:
        v = slots_to_keep.get(slot, None)
        if v is None:
            wanted[slot] = None
        else:
            wanted[slot] = set(v)

    if keep_dependencies:
        # Start with the user's desired keeps; we may add to them
        for slot in ['obsm','obsp','varm','varp','uns']:
            if wanted[slot] is None:
                wanted[slot] = set()
        # --- neighbors dependencies ---
        # From kept UNS neighbors -> add OBSP matrices
        for k in list(wanted['uns']):
            if k in ad.uns and isinstance(ad.uns[k], dict):
                ck = ad.uns[k].get('connectivities_key', None)
                dk = ad.uns[k].get('distances_key', None)
                if ck is not None:
                    wanted['obsp'].add(ck)
                if dk is not None:
                    wanted['obsp'].add(dk)
        # From kept OBSP matrices -> add UNS neighbors blocks (by prefix)
        for m in list(wanted['obsp']):
            # match "<prefix>_connectivities" or "<prefix>_distances"
            m_str = str(m)
            m0 = re.sub(r'_(connectivities|distances)$', '', m_str)
            if m0 != m_str and (m0 in ad.uns):
                wanted['uns'].add(m0)

        # --- PCA/UMAP niceties ---
        if wanted['obsm'] and 'X_pca' in ad.obsm:
            if 'X_pca' in wanted['obsm']:
                if 'pca' in ad.uns:
                    wanted['uns'].add('pca')
                if 'PCs' in ad.varm:
                    wanted['varm'].add('PCs')
        if wanted['obsm'] and 'X_umap' in ad.obsm and 'X_umap' in wanted['obsm']:
            if 'umap' in ad.uns:
                wanted['uns'].add('umap')

        # If the user explicitly set a slot to None, restore None (means "clear all")
        for slot in ['obsm','obsp','varm','varp','uns']:
            if slots_to_keep.get(slot, '___SENTINEL___') is None:
                wanted[slot] = None

    # ---------- Apply filtering ----------
    # obs / var: keep only requested columns (preserve indices & dtypes)
    for slot in ['obs','var']:
        cols_keep = wanted[slot]
        if cols_keep is None:
            # drop all columns, preserve index
            empty = pd.DataFrame(index=getattr(ad, slot).index)
            setattr(ad, slot, empty)
        else:
            df = getattr(ad, slot)
            cols_exist = [c for c in df.columns if c in cols_keep]
            setattr(ad, slot, df.loc[:, cols_exist])

    # Mapping-like slots: operate in place to preserve AnnData's aligned mappings
    def _filter_mapping(mapping, keys_keep: Optional[Set[str]]):
        if keys_keep is None:
            mapping.clear()
            return
        # Remove any key not in keep set
        for k in list(mapping.keys()):
            if k not in keys_keep:
                del mapping[k]

    _filter_mapping(ad.obsm, wanted['obsm'])
    _filter_mapping(ad.obsp, wanted['obsp'])
    _filter_mapping(ad.varm, wanted['varm'])
    _filter_mapping(ad.varp, wanted['varp'])

    # .uns is a plain dict (but can be nested); keep only top-level keys
    if wanted['uns'] is None:
        ad.uns.clear()
    else:
        for k in list(ad.uns.keys()):
            if k not in wanted['uns']:
                del ad.uns[k]

    return ad

filter_gene_list

filter_gene_list(genelist, min_genes, max_genes=1000000.0)

Filter the gene lists in the provided dictionary based on their lengths.

  • genelist : dict Dictionary with keys as identifiers and values as lists of genes.
  • min_genes : int Minimum number of genes a list should have.
  • max_genes : int Maximum number of genes a list should have.
  • dict Filtered dictionary with lists that have a length between min_genes and max_genes (inclusive of min_genes and max_genes).
Source code in src/pySingleCellNet/utils/annotation.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def filter_gene_list(genelist, min_genes, max_genes=1e6):
    """
    Filter the gene lists in the provided dictionary based on their lengths.

    Parameters:
    - genelist : dict
        Dictionary with keys as identifiers and values as lists of genes.
    - min_genes : int
        Minimum number of genes a list should have.
    - max_genes : int
        Maximum number of genes a list should have.

    Returns:
    - dict
        Filtered dictionary with lists that have a length between min_genes and max_genes (inclusive of min_genes and max_genes).
    """
    filtered_dict = {key: value for key, value in genelist.items() if min_genes <= len(value) <= max_genes}
    return filtered_dict

find_knee_point

find_knee_point(adata, total_counts_column='total_counts')

Identifies the knee point of the UMI count distribution in an AnnData object.

Parameters:

  • adata (AnnData) –

    The input AnnData object.

  • total_counts_column (str, default: 'total_counts' ) –

    Column in adata.obs containing total UMI counts. Default is "total_counts".

Returns:

  • float

    The UMI count value at the knee point.

Source code in src/pySingleCellNet/utils/qc.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def find_knee_point(adata, total_counts_column="total_counts"):
    """
    Identifies the knee point of the UMI count distribution in an AnnData object.

    Parameters:
        adata (AnnData): The input AnnData object.
        total_counts_column (str): Column in `adata.obs` containing total UMI counts. Default is "total_counts".

    Returns:
        float: The UMI count value at the knee point.
    """
    # Extract total UMI counts
    umi_counts = adata.obs[total_counts_column]

    # Sort UMI counts in descending order
    sorted_umi_counts = np.sort(umi_counts)[::-1]

    # Compute cumulative UMI counts (normalized to a fraction)
    cumulative_counts = np.cumsum(sorted_umi_counts)
    cumulative_fraction = cumulative_counts / cumulative_counts[-1]

    # Compute derivatives to identify the knee point
    first_derivative = np.gradient(cumulative_fraction)
    second_derivative = np.gradient(first_derivative)

    # Find the index of the maximum curvature (knee point)
    knee_idx = np.argmax(second_derivative)
    knee_point_value = sorted_umi_counts[knee_idx]

    return knee_point_value

generate_joint_graph

generate_joint_graph(adata, connectivity_keys, weights, output_key='jointNeighbors')

Create a joint graph by combining multiple connectivity graphs with specified weights.

This function computes the weighted sum of selected connectivity and distance matrices in an AnnData object and stores the result in .obsp.

Parameters:

  • adata (AnnData) –

    The AnnData object containing connectivity matrices in .obsp.

  • connectivity_keys (list of str) –

    A list of keys in adata.obsp corresponding to connectivity matrices to combine.

  • weights (list of float) –

    A list of weights for each connectivity matrix. Must match the length of connectivity_keys.

  • output_key (str, default: 'jointNeighbors' ) –

    The base key under which to store the combined graph in .obsp. The default is 'jointNeighbors'.

Raises:

  • ValueError

    If the number of connectivity_keys does not match the number of weights.

  • KeyError

    If any key in connectivity_keys or its corresponding distances key is not found in adata.obsp.

Returns:

  • None

    Updates the AnnData object in place by adding the combined connectivity and distance matrices to .obsp and metadata to .uns.

Example

generate_joint_graph(adata, ['neighbors_connectivities', 'umap_connectivities'], [0.7, 0.3]) adata.obsp['jointNeighbors_connectivities'] adata.uns['jointNeighbors']

Source code in src/pySingleCellNet/utils/knn.py
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def generate_joint_graph(adata, connectivity_keys, weights, output_key='jointNeighbors'):
    """Create a joint graph by combining multiple connectivity graphs with specified weights.

    This function computes the weighted sum of selected connectivity and distance matrices 
    in an AnnData object and stores the result in `.obsp`.

    Args:
        adata (AnnData): 
            The AnnData object containing connectivity matrices in `.obsp`.
        connectivity_keys (list of str): 
            A list of keys in `adata.obsp` corresponding to connectivity matrices to combine.
        weights (list of float): 
            A list of weights for each connectivity matrix. Must match the length of `connectivity_keys`.
        output_key (str, optional): 
            The base key under which to store the combined graph in `.obsp`. 
            The default is `'jointNeighbors'`.

    Raises:
        ValueError: If the number of `connectivity_keys` does not match the number of `weights`.
        KeyError: If any key in `connectivity_keys` or its corresponding distances key is not found in `adata.obsp`.

    Returns:
        None: 
            Updates the AnnData object in place by adding the combined connectivity and distance matrices
            to `.obsp` and metadata to `.uns`.

    Example:
        >>> generate_joint_graph(adata, ['neighbors_connectivities', 'umap_connectivities'], [0.7, 0.3])
        >>> adata.obsp['jointNeighbors_connectivities']
        >>> adata.uns['jointNeighbors']
    """

    if len(connectivity_keys) != len(weights):
        raise ValueError("The number of connectivity keys must match the number of weights.")

    # Initialize the joint graph and distances matrix with zeros
    joint_graph = None
    joint_distances = None
    # Loop through each connectivity key and weight
    for key, weight in zip(connectivity_keys, weights):
        if key not in adata.obsp:
            raise KeyError(f"'{key}' not found in adata.obsp.")

        # Retrieve the connectivity matrix
        connectivity_matrix = adata.obsp[key]
        # Assume corresponding distances key exists
        distances_key = key.replace('connectivities', 'distances')
        if distances_key not in adata.obsp:
            raise KeyError(f"'{distances_key}' not found in adata.obsp.")
        distances_matrix = adata.obsp[distances_key]
        # Initialize or accumulate the weighted connectivity and distances matrices
        if joint_graph is None:
            joint_graph = weight * connectivity_matrix
            joint_distances = weight * distances_matrix
        else:
            joint_graph += weight * connectivity_matrix
            joint_distances += weight * distances_matrix

    # Save the resulting joint graph and distances matrix in the specified keys of .obsp

    adata.obsp[output_key + '_connectivities'] = joint_graph
    adata.obsp[output_key + '_distances'] = joint_distances

    # Save metadata about the joint graph in .uns
    adata.uns[output_key] = {
        'connectivities_key': output_key + '_connectivities',
        'distances_key': output_key + '_distances',
        'params': {
            'connectivity_keys': connectivity_keys,
            'weights': weights,
            'method': "umap"
        }
    }

get_unique_colors

get_unique_colors(n_colors)

Generate a list of unique colors from the Tab20, Tab20b, and Tab20c colormaps.

Parameters: - n_colors: The number of unique colors needed.

Returns: - A list of unique colors.

Source code in src/pySingleCellNet/utils/colors.py
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def get_unique_colors(n_colors):
    """
    Generate a list of unique colors from the Tab20, Tab20b, and Tab20c colormaps.

    Parameters:
    - n_colors: The number of unique colors needed.

    Returns:
    - A list of unique colors.
    """
    # Get the colormaps
    tab20 = plt.get_cmap('tab20').colors
    tab20b = plt.get_cmap('tab20b').colors
    tab20c = plt.get_cmap('tab20c').colors

    # Combine the colors from the colormaps
    combined_colors = np.vstack([tab20, tab20b, tab20c])

    # Check if the requested number of colors exceeds the available unique colors
    if n_colors > len(combined_colors):
        raise ValueError(f"Requested number of colors ({n_colors}) exceeds the available unique colors ({len(combined_colors)}).")

    # Select the required number of unique colors
    selected_colors = combined_colors[:n_colors]
    return selected_colors

mito_rib

mito_rib(adQ, species='MM', log1p=True, clean=True)

Calculate mitochondrial and ribosomal QC metrics and add them to the .var attribute of the AnnData object.

Parameters

adQ : AnnData Annotated data matrix with observations (cells) and variables (features). species : str, optional (default: "MM") The species of the input data. Can be "MM" (Mus musculus) or "HS" (Homo sapiens). clean : bool, optional (default: True) Whether to remove mitochondrial and ribosomal genes from the data.

Returns

AnnData Annotated data matrix with QC metrics added to the .var attribute.

Source code in src/pySingleCellNet/utils/qc.py
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def mito_rib(adQ: AnnData, species: str = "MM", log1p = True, clean: bool = True) -> AnnData:
    """
    Calculate mitochondrial and ribosomal QC metrics and add them to the `.var` attribute of the AnnData object.

    Parameters
    ----------
    adQ : AnnData
        Annotated data matrix with observations (cells) and variables (features).
    species : str, optional (default: "MM")
        The species of the input data. Can be "MM" (Mus musculus) or "HS" (Homo sapiens).
    clean : bool, optional (default: True)
        Whether to remove mitochondrial and ribosomal genes from the data.

    Returns
    -------
    AnnData
        Annotated data matrix with QC metrics added to the `.var` attribute.
    """
    # Create a copy of the input data
    adata = adQ.copy()

    # Define mitochondrial and ribosomal gene prefixes based on the species
    if species == 'MM':
        mt_prefix = "mt-"
        ribo_prefix = ("Rps","Rpl")
    else:
        mt_prefix = "MT-"
        ribo_prefix = ("RPS","RPL")

    # Add mitochondrial and ribosomal gene flags to the `.var` attribute
    adata.var['mt'] = adata.var_names.str.startswith((mt_prefix))
    adata.var['ribo'] = adata.var_names.str.startswith(ribo_prefix)

    # Calculate QC metrics using Scanpy's `calculate_qc_metrics` function
    sc.pp.calculate_qc_metrics(
        adata,
        qc_vars=['ribo', 'mt'],
        percent_top=None,
        log1p=log1p,
        inplace=True
    )

    # Optionally remove mitochondrial and ribosomal genes from the data
    if clean:
        mito_genes = adata.var_names.str.startswith(mt_prefix)
        ribo_genes = adata.var_names.str.startswith(ribo_prefix)
        remove = np.add(mito_genes, ribo_genes)
        keep = np.invert(remove)
        adata = adata[:,keep].copy()
        # sc.pp.calculate_qc_metrics(adata,percent_top=None,log1p=False,inplace=True)

    return adata

read_gmt

read_gmt(file_path)

Read a Gene Matrix Transposed (GMT) file and return a dictionary of gene sets.

Parameters:

  • file_path (str) –

    Path to the GMT file.

Returns:

  • dict ( dict ) –

    A dictionary where keys are gene set names and values are lists of associated genes.

Source code in src/pySingleCellNet/utils/annotation.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def read_gmt(file_path: str) -> dict:
    """
    Read a Gene Matrix Transposed (GMT) file and return a dictionary of gene sets.

    Args:
        file_path (str): Path to the GMT file.

    Returns:
        dict: A dictionary where keys are gene set names and values are lists of associated genes.
    """
    gene_sets = {}

    with open(file_path, 'r') as gmt_file:
        for line in gmt_file:
            columns = line.strip().split('\t')
            gene_set_name = columns[0]
            description = columns[1]  # This can be ignored if not needed
            genes = columns[2:]

            gene_sets[gene_set_name] = genes

    return gene_sets

rename_cluster_labels

rename_cluster_labels(adata, AnnData, old_col='cluster', new_col='short_cluster')

Renames cluster labels in the specified .obs column with multi-letter codes.

  • All unique labels (including NaN) are mapped in order of appearance to a base-26 style ID: 'A', 'B', ..., 'Z', 'AA', 'AB', etc.
  • The new labels are stored as a categorical column in adata.obs[new_col].

Parameters:

  • adata (AnnData) –

    The AnnData object containing the cluster labels.

  • old_col (str, default: 'cluster' ) –

    The name of the .obs column that has the original cluster labels. Defaults to "cluster".

  • new_col (str, default: 'short_cluster' ) –

    The name of the new .obs column that will store the shortened labels. Defaults to "short_cluster".

Returns:

  • None ( None ) –

    The function adds a new column to adata.obs in place.

Source code in src/pySingleCellNet/utils/adataTools.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
def rename_cluster_labels(
    adata: ad,AnnData,
    old_col: str = "cluster",
    new_col: str = "short_cluster"
) -> None:
    """
    Renames cluster labels in the specified .obs column with multi-letter codes.

    - All unique labels (including NaN) are mapped in order of appearance to 
      a base-26 style ID: 'A', 'B', ..., 'Z', 'AA', 'AB', etc.
    - The new labels are stored as a categorical column in `adata.obs[new_col]`.

    Args:
        adata (AnnData):
            The AnnData object containing the cluster labels.
        old_col (str, optional):
            The name of the .obs column that has the original cluster labels.
            Defaults to "cluster".
        new_col (str, optional):
            The name of the new .obs column that will store the shortened labels.
            Defaults to "short_cluster".

    Returns:
        None: The function adds a new column to `adata.obs` in place.
    """

    # 1. Extract unique labels (including NaN), in the order they appear
    unique_labels = adata.obs[old_col].unique()

    # 2. Helper function for base-26 labeling
    def index_to_label(idx: int) -> str:
        """
        Convert a zero-based index to a base-26 letter code:
        0 -> A
        1 -> B
        ...
        25 -> Z
        26 -> AA
        27 -> AB
        ...
        """
        letters = []
        while True:
            remainder = idx % 26
            letter = chr(ord('A') + remainder)
            letters.append(letter)
            idx = idx // 26 - 1
            if idx < 0:
                break
        return ''.join(letters[::-1])

    # 3. Build the mapping (including NaN -> next code)
    label_map = {}
    for i, lbl in enumerate(unique_labels):
        label_map[lbl] = index_to_label(i)

    # 4. Apply the mapping to create the new column
    adata.obs[new_col] = adata.obs[old_col].map(label_map)
    adata.obs[new_col] = adata.obs[new_col].astype("category")

score_sex

score_sex(adata, y_genes=['Eif2s3y', 'Ddx3y', 'Uty'], x_inactivation_genes=['Xist', 'Tsix'])

Adds sex chromosome expression scores to an AnnData object.

This function calculates two scores for each cell in a scRNA-seq AnnData object
  • Y_score: the sum of expression values for a set of Y-chromosome specific genes.
  • X_inact_score: the sum of expression values for genes involved in X-chromosome inactivation.

The scores are added to the AnnData object's .obs DataFrame with the keys 'Y_score' and 'X_inact_score'.

Parameters

adata : AnnData An AnnData object containing scRNA-seq data, with gene names in adata.var_names. y_genes : list of str, optional List of Y-chromosome specific marker genes (default is ['Eif2s3y', 'Ddx3y', 'Uty']). x_inactivation_genes : list of str, optional List of genes involved in X-chromosome inactivation (default is ['Xist', 'Tsix']).

Raises

ValueError If none of the Y-specific or X inactivation genes are found in adata.var_names.

Returns

None The function modifies the AnnData object in place by adding the score columns to adata.obs.

Source code in src/pySingleCellNet/utils/qc.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
def score_sex(
    adata, 
    y_genes=['Eif2s3y', 'Ddx3y', 'Uty'], 
    x_inactivation_genes=['Xist', 'Tsix']
):
    """
    Adds sex chromosome expression scores to an AnnData object.

    This function calculates two scores for each cell in a scRNA-seq AnnData object:
      - Y_score: the sum of expression values for a set of Y-chromosome specific genes.
      - X_inact_score: the sum of expression values for genes involved in X-chromosome inactivation.

    The scores are added to the AnnData object's `.obs` DataFrame with the keys 'Y_score' and 'X_inact_score'.

    Parameters
    ----------
    adata : AnnData
        An AnnData object containing scRNA-seq data, with gene names in `adata.var_names`.
    y_genes : list of str, optional
        List of Y-chromosome specific marker genes (default is ['Eif2s3y', 'Ddx3y', 'Uty']).
    x_inactivation_genes : list of str, optional
        List of genes involved in X-chromosome inactivation (default is ['Xist', 'Tsix']).

    Raises
    ------
    ValueError
        If none of the Y-specific or X inactivation genes are found in `adata.var_names`.

    Returns
    -------
    None
        The function modifies the AnnData object in place by adding the score columns to `adata.obs`.
    """
    # Filter for genes that are available in the dataset.
    available_y_genes = [gene for gene in y_genes if gene in adata.var_names]
    available_x_genes = [gene for gene in x_inactivation_genes if gene in adata.var_names]

    if not available_y_genes:
        raise ValueError("None of the Y-specific genes were found in the dataset.")
    if not available_x_genes:
        raise ValueError("None of the X inactivation genes were found in the dataset.")

    # Compute the sum of expression for the Y-specific genes.
    y_expression = adata[:, available_y_genes].X
    if hasattr(y_expression, "toarray"):
        y_expression = y_expression.toarray()
    adata.obs['Y_score'] = np.sum(y_expression, axis=1)

    # Compute the sum of expression for the X inactivation genes.
    x_expression = adata[:, available_x_genes].X
    if hasattr(x_expression, "toarray"):
        x_expression = x_expression.toarray()
    adata.obs['X_inact_score'] = np.sum(x_expression, axis=1)

    # Optionally, you could log some output:
    print("Added 'Y_score' and 'X_inact_score' to adata.obs for {} cells.".format(adata.n_obs))

split_adata_indices

split_adata_indices(adata, n_cells=100, groupby='cell_ontology_class', cellid=None, strata_col=None)

Splits an AnnData object into training and validation indices based on stratification by cell type and optionally by another categorical variable.

Parameters:

  • adata (AnnData) –

    The annotated data matrix to split.

  • n_cells (int, default: 100 ) –

    The number of cells to sample per cell type.

  • groupby (str, default: 'cell_ontology_class' ) –

    The column name in adata.obs that specifies the cell type. Defaults to "cell_ontology_class".

  • cellid (str, default: None ) –

    The column in adata.obs to use as a unique identifier for cells. If None, it defaults to using the index.

  • strata_col (str, default: None ) –

    The column name in adata.obs used for secondary stratification, such as developmental stage, gender, or disease status.

Returns:

  • tuple ( tuple ) –

    A tuple containing two lists: - training_indices (list): List of indices for the training set. - validation_indices (list): List of indices for the validation set.

Raises:

  • ValueError

    If any specified column names do not exist in the DataFrame.

Source code in src/pySingleCellNet/utils/adataTools.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def split_adata_indices(
    adata: ad.AnnData,
    n_cells: int = 100,
    groupby: str = "cell_ontology_class",
    cellid: str = None,
    strata_col: str  = None
) -> tuple:
    """
    Splits an AnnData object into training and validation indices based on stratification by cell type
    and optionally by another categorical variable.

    Args:
        adata (AnnData): The annotated data matrix to split.
        n_cells (int): The number of cells to sample per cell type.
        groupby (str, optional): The column name in adata.obs that specifies the cell type.
                                 Defaults to "cell_ontology_class".
        cellid (str, optional): The column in adata.obs to use as a unique identifier for cells.
                                If None, it defaults to using the index.
        strata_col (str, optional): The column name in adata.obs used for secondary stratification,
                                    such as developmental stage, gender, or disease status.

    Returns:
        tuple: A tuple containing two lists:
            - training_indices (list): List of indices for the training set.
            - validation_indices (list): List of indices for the validation set.

    Raises:
        ValueError: If any specified column names do not exist in the DataFrame.
    """
    if cellid is None:
        adata.obs["cellid"] = adata.obs.index
        cellid = "cellid"
    if groupby not in adata.obs.columns or (strata_col and strata_col not in adata.obs.columns):
        raise ValueError("Specified column names do not exist in the DataFrame.")

    cts = set(adata.obs[groupby])
    trainingids = []

    for ct in cts:
        subset = adata[adata.obs[groupby] == ct]

        if strata_col:
            stratified_ids = []
            strata_groups = subset.obs[strata_col].unique()
            n_strata = len(strata_groups)

            # Initialize desired count and structure to store samples per strata
            desired_per_group = n_cells // n_strata
            samples_per_group = {}
            remaining = 0

            # First pass: allocate base quota or maximum available if less than base
            for group in strata_groups:
                group_subset = subset[subset.obs[strata_col] == group]
                available = group_subset.n_obs
                if available < desired_per_group:
                    samples_per_group[group] = available
                    remaining += desired_per_group - available
                else:
                    samples_per_group[group] = desired_per_group

            # Second pass: redistribute remaining quota among groups that can supply more
            # Continue redistributing until either there's no remaining quota or no group can supply more.
            groups_can_supply = True
            while remaining > 0 and groups_can_supply:
                groups_can_supply = False
                for group in strata_groups:
                    group_subset = subset[subset.obs[strata_col] == group]
                    available = group_subset.n_obs
                    # Check if this group can supply an extra cell beyond what we've allocated so far
                    if samples_per_group[group] < available:
                        samples_per_group[group] += 1
                        remaining -= 1
                        groups_can_supply = True
                        if remaining == 0:
                            break

            # Sample cells for each strata group based on the determined counts
            for group in strata_groups:
                group_subset = subset[subset.obs[strata_col] == group]
                count_to_sample = samples_per_group.get(group, 0)
                if count_to_sample > 0:
                    sampled_ids = np.random.choice(
                        group_subset.obs[cellid].values, 
                        count_to_sample, 
                        replace=False
                    )
                    stratified_ids.extend(sampled_ids)

            trainingids.extend(stratified_ids)
        else:
            ccount = min(subset.n_obs, n_cells)
            sampled_ids = np.random.choice(subset.obs[cellid].values, ccount, replace=False)
            trainingids.extend(sampled_ids)

    # Get all unique IDs
    all_ids = adata.obs[cellid].values
    # Determine validation IDs
    assume_unique = adata.obs_names.is_unique
    val_ids = np.setdiff1d(all_ids, trainingids, assume_unique=assume_unique)

    return trainingids, val_ids

write_gmt

write_gmt(gene_list, filename, collection_name, prefix='')

Write a .gmt file from a gene list.

gene_list: dict Dictionary of gene sets (keys are gene set names, values are lists of genes). filename: str The name of the file to write to. collection_name: str The name of the gene set collection. prefix: str, optional A prefix to add to each gene set name.

Source code in src/pySingleCellNet/utils/annotation.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def write_gmt(gene_list, filename, collection_name, prefix=""):
    """
    Write a .gmt file from a gene list.

    Parameters:
    gene_list: dict
        Dictionary of gene sets (keys are gene set names, values are lists of genes).
    filename: str
        The name of the file to write to.
    collection_name: str
        The name of the gene set collection.
    prefix: str, optional
        A prefix to add to each gene set name.
    """
    with open(filename, mode='w') as fo:
        for akey in gene_list:
            # replace whitespace with a "_"
            gl_name = re.sub(r'\s+', '_', akey)
            if prefix:
                pfix = prefix + "_" + gl_name
            else:
                pfix = gl_name
            preface = pfix + "\t" + collection_name + "\t"
            output = preface + "\t".join(gene_list[akey])
            print(output, file=fo)