Skip to content

Reference

See below for descriptions of relevant methods for computing IHD.

adjust_spectra_for_nuccomp(a_spectra, b_spectra, a_denom, b_denom)

Scale the counts in the input mutation spectra such that they are adjusted in terms of nucleotide context. For a given mutation type (e.g., C>T), if the a_spectra contains more C>[A/T/G] mutations, it may simply be because the samples represented in a_spectra had more cytosines that were accessible to variant calling. If a_denom contains more C nucleotides than b_denom, we adjust the counts of the C>[A/T/G] mutations in a_spectra by a scaling factor (b_denom / a_denom). And vice versa.

Parameters:

Name Type Description Default
a_spectra ndarray

1D numpy array containing the aggregate mutation spectrum in group "A."

required
b_spectra ndarray

1D numpy array containing the aggregate mutation spectrum in group "B."

required
a_denom ndarray

1D numpy array containing the aggregate number of callable base pairs in group "A." At each index i in this array, the total count of accessible nucleotides should correspond to the "reference" nucleotide of the corresponding mutation type in a_spectra at the same index. For example, if the first two entries of a_spectra corresponds to the aggregate count of C>T and C>A mutations, the first two entries of a_denom should both contain the aggregate counts of accessible C nucleotides in the group.

required
b_denom ndarray

1D numpy array containing the aggregate number of callable base pairs in group "B." At each index i in this array, the total count of accessible nucleotides should correspond to the "reference" nucleotide of the corresponding mutation type in b_spectra at the same index. For example, if the first two entries of b_spectra corresponds to the aggregate count of C>T and C>A mutations, the first two entries of b_denom should both contain the aggregate counts of accessible C nucleotides in the group.

required

Returns:

Name Type Description
adj_a_spectra ndarray

The first of the input mutation spectra, adjusted for nucleotide context.

adj_b_spectra ndarray

The second of the input mutation spectra, adjusted for nucleotide context.

Source code in ihd/utils.py
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
def adjust_spectra_for_nuccomp(
    a_spectra: np.ndarray,
    b_spectra: np.ndarray,
    a_denom: np.ndarray,
    b_denom: np.ndarray,
) -> Tuple[np.ndarray, np.ndarray]:
    """Scale the counts in the input mutation spectra
    such that they are adjusted in terms of nucleotide context.
    For a given mutation type (e.g., C>T), if the `a_spectra` 
    contains more C>[A/T/G] mutations, it may simply be because
    the samples represented in `a_spectra` had more cytosines 
    that were accessible to variant calling. If `a_denom` contains
    more C nucleotides than `b_denom`, we adjust the counts of the 
    C>[A/T/G] mutations in `a_spectra` by a scaling factor 
    (`b_denom` / `a_denom`). And vice versa.

    Args:
        a_spectra (np.ndarray): 1D numpy array containing the aggregate mutation \
            spectrum in group "A."

        b_spectra (np.ndarray): 1D numpy array containing the aggregate mutation \
            spectrum in group "B."

        a_denom (np.ndarray): 1D numpy array containing the aggregate number of \
            callable base pairs in group "A." At each index i in this array, the \
            total count of accessible nucleotides should correspond to the "reference" \
            nucleotide of the corresponding mutation type in `a_spectra` at the same index. \
            For example, if the first two entries of `a_spectra` corresponds to the aggregate count \
            of C>T and C>A mutations, the first two entries of `a_denom` should both contain \
            the aggregate counts of accessible C nucleotides in the group.

        b_denom (np.ndarray): 1D numpy array containing the aggregate number of \
            callable base pairs in group "B." At each index i in this array, the \
            total count of accessible nucleotides should correspond to the "reference" \
            nucleotide of the corresponding mutation type in `b_spectra` at the same index.
            For example, if the first two entries of `b_spectra` corresponds to the aggregate count \
            of C>T and C>A mutations, the first two entries of `b_denom` should both contain \
            the aggregate counts of accessible C nucleotides in the group.

    Returns:
        adj_a_spectra (np.ndarray): The first of the input mutation spectra, adjusted for nucleotide context.
        adj_b_spectra (np.ndarray): The second of the input mutation spectra, adjusted for nucleotide context.
    """
    # store the adjusted mutation spectra in two new arrays
    new_a_spectra, new_b_spectra = (
        np.zeros(a_spectra.shape),
        np.zeros(b_spectra.shape),
    )

    # loop over the indices of the input mutation spectra
    for nuc_i in np.arange(a_spectra.shape[0]):
        # get the aggregate count of the mutation type in either group
        a_mut_count, b_mut_count = a_spectra[nuc_i], b_spectra[nuc_i]
        # get the aggregate count of accessible nucleotides in either group
        a_nuc_count, b_nuc_count = a_denom[nuc_i], b_denom[nuc_i]

        # if the count of nucleotides in group A is larger than in group B,
        # adjust group A down by the scaling factor
        if a_nuc_count > b_nuc_count:
            adj_factor = b_nuc_count / a_nuc_count
            a_mut_count *= adj_factor
        # and vice versa if group B > group A
        elif b_nuc_count > a_nuc_count:
            adj_factor = a_nuc_count / b_nuc_count
            b_mut_count *= adj_factor

        new_a_spectra[nuc_i] = a_mut_count
        new_b_spectra[nuc_i] = b_mut_count

    return new_a_spectra, new_b_spectra

calculate_confint(spectra, genotype_matrix, covariate_matrix, distance_method=compute_manual_chisquare, n_permutations=10000, progress=True, adjust_statistics=True, conf_int=90.0)

Calculate a confidence interval around the maximum observed distance peak by performing bootstrap resampling. In each of the n_permutations trials do the following: 1. resample the input mutation spectra with replacement. 2. run an IHD scan by computing the distance between the aggregate mutation spectrum of samples with either genotype at every marker in the genotype_matrix. 3. record the maximum distance encountered at any marker, as well as the marker index at which that distance was observed. We'll consider that marker to be the likely "peak." Then, we return the bounds of the markers that contain 95% of all observed peaks.

Parameters:

Name Type Description Default
spectra ndarray

A 2D numpy array of mutation spectra in all genotyped samples, of size (N, M) where N is the number of samples and M is the number of mutation types.

required
genotype_matrix ndarray

A 2D numpy array of genotypes at every genotyped marker, of size (G, N), where G is the number of genotyped sites and N is the number of samples.

required
covariate_matrix ndarray

A 2D numpy array of size (C, N), where C is the number of covariates and N is the number of samples.

required
distance_method Callable

Callable method to compute the distance between aggregate mutation spectra. Must accept two 1D numpy arrays and return a single floating point value. Defaults to compute_manual_chisquare.

compute_manual_chisquare
n_permutations int

Number of permutations to perform (i.e., number of times to resample the spectra with replacement. Defaults to 1_000.

10000
progress bool

Whether to output a count of how many permutations have completed. Defaults to False.

True
adjust_statistics bool

Whether to compute adjusted statistics at each marker by regressing genotype similarity against statistics. Defaults to True.

True
conf_int float

The confidence interval to compute around the marker with the highest distance value. Defaults to 95.

90.0

Returns:

Name Type Description
conf_ints Tuple

Tuple of two integers, corresponding to the lower and upper bound markers defining the specified confidence interval.

Source code in ihd/utils.py
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
@numba.njit(parallel=True)
def calculate_confint(
    spectra: np.ndarray,
    genotype_matrix: np.ndarray,
    covariate_matrix: np.ndarray,
    distance_method: Callable = compute_manual_chisquare,
    n_permutations: int = 10_000,
    progress: bool = True,
    adjust_statistics: bool = True,
    conf_int: float = 90.0,
) -> Tuple[int, int]:
    """Calculate a confidence interval around the maximum observed 
    distance peak by performing bootstrap resampling.  In each of the 
    `n_permutations` trials do the following: 1. resample the input mutation `spectra`
    with replacement. 2. run an IHD scan by computing the distance between
    the aggregate mutation spectrum of samples with either genotype
    at every marker in the `genotype_matrix`. 3. record the maximum distance encountered 
    at any marker, as well as the marker index at which that distance was observed.
    We'll consider that marker to be the likely "peak."
    Then, we return the bounds of the markers that contain 95% of all observed 
    peaks.

    Args:
        spectra (np.ndarray): A 2D numpy array of mutation spectra in all \
            genotyped samples, of size (N, M) where N is the number of samples \
            and M is the number of mutation types.

        genotype_matrix (np.ndarray): A 2D numpy array of genotypes at every \
            genotyped marker, of size (G, N), where G is the number of genotyped \
            sites and N is the number of samples.

        covariate_matrix (np.ndarray): A 2D numpy array of size (C, N), where C is \
            the number of covariates and N is the number of samples.

        distance_method (Callable, optional): Callable method to compute the \
            distance between aggregate mutation spectra. Must accept two 1D numpy \
            arrays and return a single floating point value. Defaults to \
            `compute_manual_chisquare`.

        n_permutations (int, optional): Number of permutations to perform \
            (i.e., number of times to resample the spectra with replacement. \
            Defaults to 1_000.

        progress (bool, optional): Whether to output a count of how many permutations \
            have completed. Defaults to False.

        adjust_statistics (bool, optional): Whether to compute adjusted statistics \
            at each marker by regressing genotype similarity against statistics. \
            Defaults to True.

        conf_int (float, optional): The confidence interval to compute around the marker \
            with the highest distance value. Defaults to 95.


    Returns:
        conf_ints (Tuple): Tuple of two integers, corresponding to the lower \
            and upper bound markers defining the specified confidence interval.
    """

    # store index of marker at max peak encountered in each permutation
    peak_markers: np.ndarray = np.zeros(n_permutations)

    for pi in numba.prange(n_permutations):
        if pi > 0 and pi % 1000 == 0 and progress:
            print(pi)
        # resample the mutation spectra by bootstrapping
        resampled_idxs = np.random.randint(
            0,
            high=spectra.shape[0],
            size=spectra.shape[0],
        )

        resampled_spectra = spectra[resampled_idxs, :]
        # resample the corresponding genotype data using the indices
        resampled_genotype_matrix = genotype_matrix[:, resampled_idxs]
        # recalculate genotype similarities using the resampled genotype matrix. NOTE: this is slow!
        resampled_genotype_similarity = compute_genotype_similarity(
            resampled_genotype_matrix
        )
        # resample the covariate matrix to include the bootstrap resampled samples
        resampled_covariate_matrix = covariate_matrix[:, resampled_idxs]
        resampled_covariate_ratios = calculate_covariate_by_marker(
            resampled_covariate_matrix,
            resampled_genotype_matrix,
        )

        # perform the IHD scan
        focal_dists = perform_ihd_scan(
            resampled_spectra,
            resampled_genotype_matrix,
            resampled_genotype_similarity,
            resampled_covariate_ratios,
            distance_method=distance_method,
            adjust_statistics=adjust_statistics,
        )

        peak_marker_i = np.argmax(focal_dists)
        peak_markers[pi] = peak_marker_i

    pctile_lo = (100 - conf_int) / 2.0
    pctile_hi = conf_int + pctile_lo

    return (
        int(np.percentile(peak_markers, q=pctile_lo)),
        int(np.percentile(peak_markers, q=pctile_hi)),
        peak_markers,
    )

calculate_covariate_by_marker(covariate_matrix, genotype_matrix)

For each covariate in the covariate_matrix, compute the ratio of covariate values in samples with A or B genotypes at each marker in the genotype_matrix.

Parameters:

Name Type Description Default
covariate_matrix ndarray

A 2D numpy array of size (C, N), where C is the number of covariates and N is the number of samples.

required
genotype_matrix ndarray

A 2D numpy array of genotypes at every genotyped marker, of size (G, N), where G is the number of genotyped sites and N is the number of samples.

required

Returns:

Type Description
ndarray

np.ndarray: A 2D numpy array of size (G, C), where G is the number of genotyped sites and C is the number of covariates.

Source code in ihd/utils.py
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
@numba.njit
def calculate_covariate_by_marker(
    covariate_matrix: np.ndarray,
    genotype_matrix: np.ndarray,
) -> np.ndarray:
    """For each covariate in the `covariate_matrix`, compute the ratio of
        covariate values in samples with A or B genotypes at each marker in the
        `genotype_matrix`.

    Args:
        covariate_matrix (np.ndarray): A 2D numpy array of size (C, N), where C is \
            the number of covariates and N is the number of samples.

        genotype_matrix (np.ndarray): A 2D numpy array of genotypes at \
            every genotyped marker, of size (G, N), where G is the number \
            of genotyped sites and N is the number of samples.

    Returns:
        np.ndarray: A 2D numpy array of size (G, C), where G is the number of \
            genotyped sites and C is the number of covariates.
    """
    # store the ratio of covariate values between D and B haplotypes
    # at every marker along the genome for each covariate
    covariate_ratios: np.ndarray = np.zeros(
        (genotype_matrix.shape[0], covariate_matrix.shape[0]),
        dtype=np.float64,
    )
    # loop over every site in the genotype matrix
    for ni in np.arange(genotype_matrix.shape[0]):
        a_hap_idxs = np.where(genotype_matrix[ni] == 0)[0]
        b_hap_idxs = np.where(genotype_matrix[ni] == 2)[0]
        # loop over every covariate in the covariate matrix
        for ci in np.arange(covariate_matrix.shape[0]):
            # subset covariate matrix to the covariate of interest
            covariate_matrix_sub = covariate_matrix[ci]
            # compute ratio of covariate values between the two groups
            a_sum = np.sum(covariate_matrix_sub[a_hap_idxs])
            b_sum = np.sum(covariate_matrix_sub[b_hap_idxs])
            ratio = a_sum / b_sum
        covariate_ratios[ni, ci] = ratio

    return covariate_ratios

compute_allele_frequency(genotype_matrix)

Given a genotype matrix of size (G, N) where G is the number of genotyped sites and N is the number of samples, compute the allele frequency at every marker.

Parameters:

Name Type Description Default
genotype_matrix ndarray

A 2D numpy array of genotypes at every genotyped marker, of size (G, N), where G is the number of genotyped sites and N is the number of samples.

required

Returns:

Name Type Description
allele_frequencies ndarray

A 1D numpy array of size (G, ) containing allele frequencies at every genotyped site in the collection of N samples.

Source code in ihd/utils.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
@numba.njit
def compute_allele_frequency(genotype_matrix: np.ndarray) -> np.ndarray:
    """Given a genotype matrix of size (G, N) where G is the number of 
    genotyped sites and N is the number of samples, compute the allele 
    frequency at every marker.

    Args:
        genotype_matrix (np.ndarray): A 2D numpy array of genotypes at \
            every genotyped marker, of size (G, N), where G is the number \
            of genotyped sites and N is the number of samples.

    Returns:
        allele_frequencies (np.ndarray): A 1D numpy array of size (G, ) containing allele frequencies \
        at every genotyped site in the collection of N samples.
    """
    # figure out the allele number
    allele_number = genotype_matrix.shape[1] * 2
    # sum the genotypes at each locus
    allele_counts = compute_nansum(genotype_matrix, row=True)
    # get allele frequencies
    return allele_counts / allele_number

compute_genotype_similarity(genotype_matrix)

Compute the genetic similarity between groups of haplotypes at every genotyped marker. At each marker, divide the haplotypes into two groups based on the parental allele each haplotype inherited. In each group, calculate allele frequencies at every marker along the genome. Then, calculate the Pearson correlation coefficient between the two groups' allele frequency vectors.

Parameters:

Name Type Description Default
genotype_matrix ndarray

A 2D numpy array of genotypes at every genotyped marker, of size (G, N), where G is the number of genotyped sites and N is the number of samples.

required

Returns:

Name Type Description
genotype_similarity ndarray

A 1D numpy array of size (G, ), where G is the number of genotyped sites.

Source code in ihd/utils.py
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
@numba.njit
def compute_genotype_similarity(genotype_matrix: np.ndarray) -> np.ndarray:
    """Compute the genetic similarity between groups of haplotypes
    at every genotyped marker. At each marker, divide the haplotypes into
    two groups based on the parental allele each haplotype inherited. In 
    each group, calculate allele frequencies at every marker along the genome.
    Then, calculate the Pearson correlation coefficient between the two groups'
    allele frequency vectors.

    Args:
        genotype_matrix (np.ndarray): A 2D numpy array of genotypes at \
            every genotyped marker, of size (G, N), where G is the number \
            of genotyped sites and N is the number of samples.

    Returns:
        genotype_similarity (np.ndarray): A 1D numpy array of size (G, ), where G is the number \
        of genotyped sites.
    """
    # store genotype similarities at every marker
    genotype_sims: np.ndarray = np.zeros(genotype_matrix.shape[0], dtype=np.float64)
    # loop over every site in the genotype matrix
    for ni in np.arange(genotype_matrix.shape[0]):
        a_hap_idxs = np.where(genotype_matrix[ni] == 0)[0]
        b_hap_idxs = np.where(genotype_matrix[ni] == 2)[0]
        # compute allele frequencies in each haplotype group
        a_afs, b_afs = (
            compute_allele_frequency(genotype_matrix[:, a_hap_idxs]),
            compute_allele_frequency(genotype_matrix[:, b_hap_idxs]),
        )
        # compute Pearson correlation between allele frequencies
        af_corr = np.corrcoef(a_afs, b_afs)[0][1]
        # NOTE: not sure how good of an idea this is, even if it's rare
        if np.isnan(af_corr):
            af_corr = 0
        genotype_sims[ni] = af_corr

    return genotype_sims

compute_haplotype_distance(a_haps, b_haps, distance_method=compute_manual_chisquare)

Compute the distance between the aggregate mutation spectrum of two collections of haplotype mutation data. The input arrays should both be 2D numpy arrays of size (N, M), with rows and columns corresponding to samples and mutation types, respectively. This method will first aggregate the mutation spectra across samples to create two new 1D arrays, each of size (M, ). Then, it will compute the distance between those two 1D arrays using the provided distance method.

Parameters:

Name Type Description Default
a_haps ndarray

2D array of size (N, M) containing the mutation spectrum of each sample, where N is the number of samples and M is the number of mutation types.

required
b_haps ndarray

2D array of size (N, M) containing the mutation spectrum of each sample, where N is the number of samples and M is the number of mutation types.

required
distance_method Callable

Callable method to compute the distance between aggregate mutation spectra. Must accept two 1D numpy arrays and return a single floating point value. Defaults to compute_manual_chisquare.

compute_manual_chisquare

Returns:

Name Type Description
distance float64

Distance between the aggregate mutation spectra of the two haplotypes.

Source code in ihd/utils.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
@numba.njit
def compute_haplotype_distance(
    a_haps: np.ndarray,
    b_haps: np.ndarray,
    distance_method: Callable = compute_manual_chisquare,
) -> np.float64:
    """Compute the distance between the aggregate
    mutation spectrum of two collections of haplotype mutation data.
    The input arrays should both be 2D numpy arrays of size (N, M), 
    with rows and columns corresponding to samples and mutation types, 
    respectively. This method will first aggregate the mutation spectra across 
    samples to create two new 1D arrays, each of size (M, ). Then, it 
    will compute the distance between those two 1D arrays using the provided 
    distance method.

    Args:
        a_haps (np.ndarray): 2D array of size (N, M) containing the mutation \
            spectrum of each sample, where N is the number of samples and M is \
            the number of mutation types.

        b_haps (np.ndarray): 2D array of size (N, M) containing the mutation \
            spectrum of each sample, where N is the number of samples and M \
            is the number of mutation types.

        distance_method (Callable, optional): Callable method to compute the \
            distance between aggregate mutation spectra. Must accept two 1D numpy \
            arrays and return a single floating point value. Defaults to \
            `compute_manual_chisquare`.

    Returns:
        distance (np.float64): Distance between the aggregate \
            mutation spectra of the two haplotypes.
    """
    # first, sum the spectrum arrays such that we add up the
    # total number of C>T, C>A, etc. across all samples.
    a_hap_sums = np.sum(a_haps, axis=0)
    b_hap_sums = np.sum(b_haps, axis=0)

    if np.sum(a_hap_sums) == 0 or np.sum(b_hap_sums) == 0:
        dist = 0
    else:
        dist = distance_method(a_hap_sums, b_hap_sums)
    return dist

compute_manual_chisquare(a, b)

Compute a chi-square test of independence between two groups of observations. Since numba doesn't cooperate with the scipy.stats implementation, I've written a "manual" version of the calculation so that it's jit-able.

Parameters:

Name Type Description Default
a ndarray

1D numpy array of (N, ) observations.

required
b ndarray

1D numpy array of (N, ) obseravtions.

required

Returns:

Name Type Description
chisquare_stat float64

The Chi-square statistic comparing observed vs. expected values of each observation.

Source code in ihd/utils.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@numba.njit
def compute_manual_chisquare(a: np.ndarray, b: np.ndarray) -> np.float64:
    """Compute a chi-square test of independence between two
    groups of observations. Since numba doesn't cooperate with the
    `scipy.stats` implementation, I've written a "manual" version
    of the calculation so that it's jit-able.

    Args:
        a (np.ndarray): 1D numpy array of (N, ) observations.
        b (np.ndarray): 1D numpy array of (N, ) obseravtions.

    Returns:
        chisquare_stat (np.float64): The Chi-square statistic comparing \
            observed vs. expected values of each observation.
    """
    observed = np.vstack((a, b))
    row_sums = compute_nansum(observed, row=True)
    col_sums = compute_nansum(observed, row=False)
    total = np.nansum(observed)

    expected = np.zeros(observed.shape, dtype=np.float64)
    for row_i in np.arange(row_sums.shape[0]):
        for col_i in np.arange(col_sums.shape[0]):
            exp = (row_sums[row_i] * col_sums[col_i]) / total
            expected[row_i, col_i] = exp
    chi_stat = np.sum(np.square(observed - expected) / expected)
    return chi_stat

compute_manual_cosine_distance(a, b)

Compute the cosine distance between two 1D numpy arrays. Although methods to compute cosine similarity and distance exist in scipy and sklearn, they are not numba.njit'able.

Parameters:

Name Type Description Default
a ndarray

A 1D numpy array of size (N, ).

required
b ndarray

A 1D numpy array of size (N, ).

required

Returns:

Name Type Description
cosine_distance float64

Cosine distance between a and b.

Source code in ihd/utils.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
@numba.njit
def compute_manual_cosine_distance(a: np.ndarray, b: np.ndarray) -> np.float64:
    """Compute the cosine distance between
    two 1D numpy arrays. Although methods to compute
    cosine similarity and distance exist in `scipy` and
    `sklearn`, they are not `numba.njit`'able.

    Args:
        a (np.ndarray): A 1D numpy array of size (N, ).
        b (np.ndarray): A 1D numpy array of size (N, ).

    Returns:
        cosine_distance (np.float64): Cosine distance between a and b.
    """
    dot = a.dot(b)
    a_sumsq, b_sumsq = np.sum(np.square(a)), np.sum(np.square(b))
    a_norm, b_norm = np.sqrt(a_sumsq), np.sqrt(b_sumsq)
    cossim = dot / (a_norm * b_norm)
    return 1 - cossim

compute_nansum(a, row=True)

Compute the sum of a 2D numpy array on a per-row basis, ignoring nans. Since numba does not support kwargs in the np.nansum function, it's necessary to piece this out into its own function so that it can be decorated with numba.njit.

Parameters:

Name Type Description Default
a ndarray

A 2D numpy array of size (N, M).

required
row bool

Whether to calculate means by row or column. Defaults to True.

True

Returns:

Name Type Description
rowsums ndarray

A 1D numpy array of size (N, ) containing sums of every row in the input.

Source code in ihd/utils.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@numba.njit
def compute_nansum(a: np.ndarray, row: bool = True) -> np.ndarray:
    """Compute the sum of a 2D numpy array
    on a per-row basis, ignoring nans. Since `numba` does not
    support kwargs in the `np.nansum` function, it's
    necessary to piece this out into its own function
    so that it can be decorated with `numba.njit`.

    Args:
        a (np.ndarray): A 2D numpy array of size (N, M).

        row (bool, optional): Whether to calculate means by row or column. Defaults to True.


    Returns:
        rowsums (np.ndarray): A 1D numpy array of size (N, ) containing \
            sums of every row in the input.
    """
    idx = 0 if row else 1
    empty_a = np.zeros(a.shape[idx])
    for i in range(a.shape[idx]):
        empty_a[i] = np.nansum(a[i]) if row else np.nansum(a[:, i])
    return empty_a

compute_residuals(X, y)

Use ordinary least-squares (OLS) to fit a linear model predicting y as a function of X. Then, compute the residuals between the predicted y-values and the true y-values.

Parameters:

Name Type Description Default
X ndarray

2D numpy array of size (M, N), where M is the number of observations and N is the number of independent variables.

required
y ndarray

1D numpy array of size (M, ), containing the dependent variables for all M observations.

required

Returns:

Name Type Description
resids ndarray

1D numpy array of size (M, ) containing residuals.

Source code in ihd/utils.py
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
@numba.njit
def compute_residuals(
    X: np.ndarray,
    y: np.ndarray,
) -> np.ndarray:
    """Use ordinary least-squares (OLS) to fit a linear model
    predicting `y` as a function of `X`. Then, compute the residuals
    between the predicted y-values and the true y-values.

    Args:
        X (np.ndarray): 2D numpy array of size (M, N), where M is the number of \
            observations and N is the number of independent variables.

        y (np.ndarray): 1D numpy array of size (M, ), containing the dependent variables \
            for all M observations.

    Returns:
        resids (np.ndarray): 1D numpy array of size (M, ) containing residuals.
    """
    ones = np.ones(X.shape[0]).reshape(-1, 1)
    X = np.hstack((X, ones))
    coefs = np.linalg.lstsq(X, y)[0]
    y_ = np.sum(X[:, :-1] * coefs[:-1], axis=1) + coefs[-1]
    resids = y - y_
    return resids

compute_spectra(mutations, k=1, cpg=True)

Compute the mutation spectrum of every sample in the input pd.DataFrame of mutation data. The input dataframe should either contain a single entry for every mutation observed in every sample, or the aggregate count of each mutation type observed in each sample. The input dataframe must have at least three columns: 'Strain' (denoting the sample), 'kmer' (denoting the 3-mer context of the mutation -- e.g, CCT>CAT), and 'count' (denoting the number of times a mutation of type 'kmer' was observed in 'sample'). If the dataframe contains a single entry for every mutation observed in every sample, then the 'count' column should contain a value of 1 in every row.

Parameters:

Name Type Description Default
mutations DataFrame

Pandas dataframe containing information about the mutations observed in each sample. The dataframe must have at least three columns: 'sample' (denoting the sample), 'kmer' (denoting the 3-mer context of the mutation), and 'count' (denoting the number of times a mutation of type 'kmer' was observed in 'sample').

required
k int

k-mer size of mutations (e.g., k=3 means that we will treat mutations as NXN NYN, where Ns represent the nucleotide contexts on either side of the mutation, whereas k=1 means we will treat them as X Y). Defaults to 1.

1
cpg bool

whether to separate CpG>TpG mutations into their own mutation type, distinct from C>T. Defaults to True.

True

Returns:

Name Type Description
samples List[str]

A list of samples in the dataframe (which are indexed in the same order as the rows of the 2D spectra array).

mutation_types List[str]

A list of the unique mutation types observed in the dataframe.

spectra ndarray

A 2D numpy array of mutation spectra in all genotyped samples, of size (N, M) where N is the number of samples and M is the number of mutation types.

Source code in ihd/utils.py
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
def compute_spectra(
    mutations: pd.DataFrame,
    k: int = 1,
    cpg: bool = True,
):
    """Compute the mutation spectrum of every sample in 
    the input pd.DataFrame of mutation data. The input
    dataframe should either contain a single entry for every mutation observed
    in every sample, or the aggregate count of each mutation type
    observed in each sample. The input dataframe must have at least three columns:
    'Strain' (denoting the sample), 'kmer' (denoting the 3-mer context of the 
    mutation -- e.g, CCT>CAT), and 'count' (denoting the number of times
    a mutation of type 'kmer' was observed in 'sample'). If the dataframe contains
    a single entry for every mutation observed in every sample, then the 'count'
    column should contain a value of 1 in every row.

    Args:
        mutations (pd.DataFrame): Pandas dataframe containing information \
            about the mutations observed in each sample. The dataframe must have \
            at least three columns: 'sample' (denoting the sample), 'kmer' (denoting \
            the 3-mer context of the mutation), and 'count' (denoting the number \
            of times a mutation of type 'kmer' was observed in 'sample').

        k (int, optional): k-mer size of mutations (e.g., k=3 means that we \
            will treat mutations as NXN NYN, where Ns represent the nucleotide \
            contexts on either side of the mutation, whereas k=1 means we will \
            treat them as X Y). Defaults to 1.

        cpg (bool, optional): whether to separate CpG>TpG mutations into their \
            own mutation type, distinct from C>T. Defaults to True.

    Returns:
        samples (List[str]): A list of samples in the dataframe (which are \
            indexed in the same order as the rows of the 2D `spectra` array).
        mutation_types (List[str]): A list of the unique mutation types \
            observed in the dataframe.
        spectra (np.ndarray): A 2D numpy array of mutation spectra in \
            all genotyped samples, of size (N, M) where N is the number \
            of samples and M is the number of mutation types.
    """

    # compute 3-mer spectra
    hap_spectra_agg = (
        mutations.groupby(["sample", "kmer"]).agg({"count": sum}).reset_index()
    )
    # if 1-mer spectra are desired, compute that
    if k == 1:
        # add base mutation type
        hap_spectra_agg["base_mut"] = hap_spectra_agg["kmer"].apply(
            lambda k: find_central_mut(k, cpg=cpg)
        )
        hap_spectra_agg = (
            hap_spectra_agg.groupby(["sample", "base_mut"])
            .agg({"count": sum})
            .reset_index()
        )
    # get spectra as per-haplotype vectors of mutation counts
    mut_col = "base_mut" if k == 1 else "kmer"
    spectra = (
        hap_spectra_agg.pivot(index="sample", columns=mut_col)
        .reset_index()
        .fillna(value=0)
    )
    samples, mutations, spectra = (
        spectra["sample"].to_list(),
        [el[1] for el in spectra.columns[1:]],
        spectra.values[:, 1:],
    )

    return samples, mutations, spectra.astype(np.float64)

get_covariate_matrix(pheno, samples, covariate_cols=['n_generations'])

Generate a matrix of covariate values for each sample. Returns a 2D numpy array of size (C, N), where C is the number of covariates specified in the covariate_cols argument, and N is the number of samples.

Parameters:

Name Type Description Default
pheno DataFrame

Pandas dataframe containing metadata about each sample. This dataframe should have a "sample" column and should contain each of the columns specified in covariate_cols.

required
samples List[str]

List of samples to subset the dataframe.

required
covariate_cols List[str]

List of column names in pheno that should be used as covariates. Defaults to ["n_generations"].

['n_generations']

Returns:

Type Description
ndarray

np.ndarray: A 2D numpy array of size (C, N), where C is the number of covariates and N is the number of samples.

Source code in ihd/utils.py
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
def get_covariate_matrix(
    pheno: pd.DataFrame,
    samples: List[str],
    covariate_cols: List[str] = ["n_generations"],
) -> np.ndarray:
    """Generate a matrix of covariate values for each sample. Returns
    a 2D numpy array of size (C, N), where C is the number of covariates
    specified in the `covariate_cols` argument, and N is the number of samples.

    Args:
        pheno (pd.DataFrame): Pandas dataframe containing metadata about each sample. \
            This dataframe should have a "sample" column and should contain each of the columns \
            specified in `covariate_cols`.

        samples (List[str]): List of samples to subset the dataframe.

        covariate_cols (List[str], optional): List of column names in `pheno` that should \
            be used as covariates. Defaults to ["n_generations"].

    Returns:
        np.ndarray: A 2D numpy array of size (C, N), where C is the number of covariates \
            and N is the number of samples.
    """

    cols = ["sample"] + covariate_cols
    # subset pheno information to relevant samples
    pheno_sub = (
        pheno[pheno["sample"].isin(samples)].drop_duplicates(cols).set_index("sample")
    )
    covariate_matrix = pheno_sub.loc[samples][covariate_cols].values.T
    return covariate_matrix

perform_ihd_scan(spectra, genotype_matrix, genotype_similarity, covariate_ratios, distance_method=compute_manual_chisquare, adjust_statistics=True)

Iterate over every genotyped marker in the genotype_matrix, divide the haplotypes into two groups based on sample genotypes at the marker, and compute the distance between the aggregate mutation spectra of each group. Return a list of cosine distances of length (G, ), where G is the number of genotyped sites.

Parameters:

Name Type Description Default
spectra ndarray

A 2D numpy array of mutation spectra in all genotyped samples, of size (N, M) where N is the number of samples and M is the number of mutation types.

required
genotype_matrix ndarray

A 2D numpy array of genotypes at every genotyped marker, of size (G, N), where G is the number of genotyped sites and N is the number of samples.

required
genotype_similarity ndarray

A 1D numpy array of size (G, ), where G is the number of genotyped sites. Contains the correlation coefficient between genome-wide allele frequencies of haplotypes with A vs. B genotypes at every site.

required
covariate_ratios ndarray

A 1D numpy array of size (G, ), where G is the number of genotyped sites. Contains the ratio of covariate values between haplotypes with A vs. B genotypes at every site.

required
distance_method Callable

Callable method to compute the distance between aggregate mutation spectra. Must accept two 1D numpy arrays and return a single floating point value. Defaults to compute_manual_chisquare.

compute_manual_chisquare
adjust_statistics bool

Whether to compute adjusted statistics at each marker by regressing genotype similarity against statistics. Defaults to True.

True

Returns:

Name Type Description
distances List[float64]

List of inter-haplotype distances at every marker.

Source code in ihd/utils.py
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
@numba.njit
def perform_ihd_scan(
    spectra: np.ndarray,
    genotype_matrix: np.ndarray,
    genotype_similarity: np.ndarray,
    covariate_ratios: np.ndarray,
    distance_method: Callable = compute_manual_chisquare,
    adjust_statistics: bool = True,
) -> np.ndarray:
    """Iterate over every genotyped marker in the `genotype_matrix`, 
    divide the haplotypes into two groups based on sample genotypes at the 
    marker, and compute the distance between the aggregate mutation spectra
    of each group. Return a list of cosine distances of length (G, ), where
    G is the number of genotyped sites.

    Args:
        spectra (np.ndarray): A 2D numpy array of mutation spectra in \
            all genotyped samples, of size (N, M) where N is the number of \
            samples and M is the number of mutation types.

        genotype_matrix (np.ndarray): A 2D numpy array of genotypes at \
            every genotyped marker, of size (G, N), where G is the number \
            of genotyped sites and N is the number of samples.

        genotype_similarity (np.ndarray): A 1D numpy array of size (G, ), where \
            G is the number of genotyped sites. Contains the correlation coefficient \
            between genome-wide allele frequencies of haplotypes with A vs. B genotypes \
            at every site.

        covariate_ratios (np.ndarray): A 1D numpy array of size (G, ), where \
            G is the number of genotyped sites. Contains the ratio of covariate \
            values between haplotypes with A vs. B genotypes at every site.

        distance_method (Callable, optional): Callable method to compute the \
            distance between aggregate mutation spectra. Must accept two 1D numpy \
            arrays and return a single floating point value. Defaults to \
            `compute_manual_chisquare`.

        adjust_statistics (bool, optional): Whether to compute adjusted statistics \
            at each marker by regressing genotype similarity against statistics. \
            Defaults to True.

    Returns:
        distances (List[np.float64]): List of inter-haplotype \
        distances at every marker.
    """

    # store distances at each marker
    focal_dist: np.ndarray = np.zeros(genotype_matrix.shape[0], dtype=np.float64)
    # loop over every site in the genotype matrix
    for ni in np.arange(genotype_matrix.shape[0]):
        a_hap_idxs = np.where(genotype_matrix[ni] == 0)[0]
        b_hap_idxs = np.where(genotype_matrix[ni] == 2)[0]

        a_spectra, b_spectra = (
            spectra[a_hap_idxs],
            spectra[b_hap_idxs],
        )

        cur_dist = compute_haplotype_distance(
            a_spectra,
            b_spectra,
            distance_method=distance_method,
        )
        focal_dist[ni] = cur_dist

    if adjust_statistics:
        covariate_matrix_full = np.hstack(
            (
                genotype_similarity.reshape(-1, 1),
                covariate_ratios,
            )
        )
        return compute_residuals(covariate_matrix_full, focal_dist)
    else:
        return focal_dist

perform_permutation_test(spectra, genotype_matrix, genotype_similarity, covariate_ratios, strata, distance_method=compute_manual_chisquare, n_permutations=1000, progress=False, adjust_statistics=True)

Conduct a permutation test to assess the significance of any observed IHD peaks. In each of the n_permutations trials, do the following: 1. create a shuffled version of the input mutation spectra, so that sample names/indices no longer correspond to the appropriate mutation spectrum. 2. run an IHD scan by computing the distance between the aggregate mutation spectrum of samples with either genotype at every marker in the genotype_matrix. 3. store the maximum distance encountered at any marker. Then, return a list of the maximum distances encountered in each of the trials. Alternatively, if comparison_wide is True, return a matrix of size (P, G), where P is the number of permutations and G is the number of genotyped markers, in which we store the distance value encountered at every marker in every permutation trial.

Parameters:

Name Type Description Default
spectra ndarray

A 2D numpy array of mutation spectra in all genotyped samples, of size (N, M) where N is the number of samples and M is the number of mutation types.

required
genotype_matrix ndarray

A 2D numpy array of genotypes at every genotyped marker, of size (G, N), where G is the number of genotyped sites and N is the number of samples.

required
genotype_similarity ndarray

A 1D numpy array of correlation coefficients of size (G, ), where G is the number of genotyped sites. At each element of the array, we store the correlation coefficient between genome-wide D allele frequencies calculated in samples with either allele at the corresponding site G_i.

required
covariate_ratios ndarray

A 1D numpy array of size (G, ), where G is the number of genotyped sites. Contains the ratio of covariate values between haplotypes with A vs. B genotypes at every site.

required
strata ndarray

A 1D numpy array of "group labels" of size (N, ), where N is the number of samples. If samples are assigned different group labels, their spectra will be permuted within those groups during the permutation testing step.

required
distance_method Callable

Callable method to compute the distance between aggregate mutation spectra. Must accept two 1D numpy arrays and return a single floating point value. Defaults to compute_manual_chisquare.

compute_manual_chisquare
n_permutations int

Number of permutations to perform (i.e., number of times to shuffle the spectra and compute IHDs at each marker). Defaults to 1_000.

1000
progress bool

Whether to output a count of how many permutations have completed. Defaults to False.

False
adjust_statistics bool

Whether to compute adjusted statistics at each marker by regressing genotype similarity against statistics. Defaults to True.

True

Returns:

Name Type Description
null_distances ndarray

2D numpy array of size (P, G) where P is the number of permutations and G is either 1 (if we're computing a genome-wide distance threshold) or the number of genotyped markers (if we're computing thresholds at each individual marker).

Source code in ihd/utils.py
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
@numba.njit(parallel=True)
def perform_permutation_test(
    spectra: np.ndarray,
    genotype_matrix: np.ndarray,
    genotype_similarity: np.ndarray,
    covariate_ratios: np.ndarray,
    strata: np.ndarray,
    distance_method: Callable = compute_manual_chisquare,
    n_permutations: int = 1_000,
    progress: bool = False,
    adjust_statistics: bool = True,
) -> np.ndarray:
    """Conduct a permutation test to assess the significance of 
    any observed IHD peaks. In each of the `n_permutations` trials, 
    do the following: 1. create a shuffled version of the input mutation `spectra`, so that
    sample names/indices no longer correspond to the appropriate mutation
    spectrum. 2. run an IHD scan by computing the distance between
    the aggregate mutation spectrum of samples with either genotype
    at every marker in the `genotype_matrix`. 3. store the maximum distance encountered at any marker.
    Then, return a list of the maximum distances encountered in each of
    the trials. Alternatively, if `comparison_wide` is True, return a matrix of
    size (P, G), where P is the number of permutations and G is the number of
    genotyped markers, in which we store the distance value encountered at
    every marker in every permutation trial.

    Args:
        spectra (np.ndarray): A 2D numpy array of mutation spectra in all \
            genotyped samples, of size (N, M) where N is the number of samples \
            and M is the number of mutation types.

        genotype_matrix (np.ndarray): A 2D numpy array of genotypes at every \
            genotyped marker, of size (G, N), where G is the number of genotyped \
            sites and N is the number of samples.

        genotype_similarity (np.ndarray): A 1D numpy array of correlation coefficients \
            of size (G, ), where G is the number of genotyped sites. At each element of \
            the array, we store the correlation coefficient between genome-wide D allele \
            frequencies calculated in samples with either allele at the corresponding site G_i.

        covariate_ratios (np.ndarray): A 1D numpy array of size (G, ), where \
            G is the number of genotyped sites. Contains the ratio of covariate \
            values between haplotypes with A vs. B genotypes at every site.

        strata (np.ndarray): A 1D numpy array of "group labels" of size (N, ), where \
            N is the number of samples. If samples are assigned different group labels, their \
            spectra will be permuted *within* those groups during the permutation testing step.

        distance_method (Callable, optional): Callable method to compute the \
            distance between aggregate mutation spectra. Must accept two 1D numpy \
            arrays and return a single floating point value. Defaults to \
            `compute_manual_chisquare`.

        n_permutations (int, optional): Number of permutations to perform \
            (i.e., number of times to shuffle the spectra and compute IHDs at \
            each marker). Defaults to 1_000.

        progress (bool, optional): Whether to output a count of how many permutations \
            have completed. Defaults to False.

        adjust_statistics (bool, optional): Whether to compute adjusted statistics \
            at each marker by regressing genotype similarity against statistics. \
            Defaults to True.


    Returns:
        null_distances (np.ndarray): 2D numpy array of size (P, G) \
            where P is the number of permutations and G is either 1 \
            (if we're computing a genome-wide distance threshold) or \
            the number of genotyped markers (if we're computing thresholds \
            at each individual marker).
    """

    # store max distance encountered in each permutation
    null_distances: np.ndarray = np.zeros(n_permutations)

    for pi in numba.prange(n_permutations):
        if pi > 0 and pi % 100 == 0 and progress:
            print(pi)
        # shuffle the mutation spectra by row
        shuffled_spectra = shuffle_spectra(spectra, strata)

        # perform the IHD scan
        perm_distances = perform_ihd_scan(
            shuffled_spectra,
            genotype_matrix,
            genotype_similarity,
            covariate_ratios,
            distance_method=distance_method,
            adjust_statistics=adjust_statistics,
        )

        null_distances[pi] = np.max(perm_distances)

    return null_distances

shuffle_spectra(spectra, groups)

Randomly shuffle the rows of a 2D numpy array of mutation spectrum data of size (N, M), where N is the number of samples and M is the number of mutation types. Shuffled array is returned such that sample mutation spectra no longer correspond to the appropriate sample indices into the rows of the array. Samples are also shuffled within the specified groups. In other words, we only shuffle the indices of samples that correspond to a unique value in the groups array.

Parameters:

Name Type Description Default
spectra ndarray

2D numpy array of mutation spectrum data of shape (N, M), where N is the number of samples and M is the number of mutation types.

required
groups ndarray

1D numpy array of group labels (e.g., epochs) that each sample belongs to, of shape (N, ) where N is the number of samples. Mutation spectra will only be shuffled within their assigned group.

required

Returns:

Name Type Description
shuffled_spectra ndarray

The input array, but with shuffled rows.

Source code in ihd/utils.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
@numba.njit
def shuffle_spectra(
    spectra: np.ndarray,
    groups: np.ndarray,
) -> np.ndarray:
    """Randomly shuffle the rows of a 2D numpy array of 
    mutation spectrum data of size (N, M), where N is the number
    of samples and M is the number of mutation types. Shuffled array
    is returned such that sample mutation spectra no longer correspond to the
    appropriate sample indices into the rows of the array. Samples are also
    shuffled within the specified `groups`. In other words, we only shuffle
    the indices of samples that correspond to a unique value in the `groups` 
    array.

    Args:
        spectra (np.ndarray): 2D numpy array of mutation spectrum data of \
        shape (N, M), where N is the number of samples and M is the number \
        of mutation types.

        groups (np.ndarray): 1D numpy array of group labels (e.g., epochs) \
            that each sample belongs to, of shape (N, ) where N is the number \
            of samples. Mutation spectra will only be shuffled *within* \
            their assigned group.

    Returns:
        shuffled_spectra (np.ndarray): The input array, but with shuffled rows.
    """
    shuffled_spectra = np.zeros(spectra.shape)

    uniq_groups = np.unique(groups)
    for g in uniq_groups:
        # get corresponding indices of samples in the current group
        g_i_true = np.where(groups == g)[0]
        g_i_shuffled = np.where(groups == g)[0]
        # shuffle just the group indices so that sample labels
        # in that group no longer correspond to the correct
        # mutation spectra arrays
        np.random.shuffle(g_i_shuffled)
        shuffled_spectra[g_i_true, :] = spectra[g_i_shuffled, :]
    return shuffled_spectra