原文:Bioinformatics with Python Cookbook

协议:CC BY-NC-SA 4.0

十、用于生物信息学的机器学习

机器学习在各种各样的环境中使用,计算生物学也不例外。机器学习在该领域有无数的应用,可能最古老和最著名的是使用主成分分析 ( PCA )利用基因组学研究人口结构。由于这是一个新兴领域,因此还有许多其他潜在的应用。在这一章中,我们将从生物信息学的角度介绍机器学习的概念。

鉴于机器学习是一个非常复杂的主题,可以很容易地写满一本书,这里我们打算采取一种直观的方法,让你大致了解一些机器学习技术如何有助于解决生物学问题。如果您发现这些技术有用,您将理解基本概念,并可以继续阅读更详细的文献。

如果你正在使用 Docker,并且因为本章中的所有库都是数据分析的基础,它们都可以在 Docker 镜像tiagoantao/bioinformatics_ml中找到。

在本章中,我们将介绍以下配方:

  • 介绍 sci kit-通过 PCA 示例学习
  • 使用 PCA 上的聚类对样本进行分类
  • 利用决策树探索乳腺癌特征
  • 使用随机森林预测乳腺癌结果

介绍 sci kit-通过 PCA 示例学习

PCA 是一种统计程序,用于将多个变量的维度降低到一个更小的线性不相关的子集。在 第六章 中,我们看到了一个基于使用外部应用的 PCA 实现。在这个菜谱中,我们将为群体遗传学实现相同的 PCA,但将使用scikit-learn库。Scikit-learn 是用于机器学习的基本 Python 库之一,本菜谱是对该库的介绍。PCA 是一种无监督的机器学习形式——我们不提供关于样本类别的信息。我们将在本章的其他配方中讨论监督技术。

提醒一下,我们将从 HapMap 项目中计算 11 个人类群体的 PCA。

准备就绪

您将需要运行第六章 中 的第一个配方,以便生成hapmap10_auto_noofs_ld_12 PLINK 文件(等位基因记录为 1 和 2)。从群体遗传学的角度来看,我们需要 LD-pruned 标记来产生可靠的 PCA。我们不会冒险在这里使用后代,因为它可能会使结果有偏差。我们的菜谱将需要pygenomics库,可以使用以下命令安装它:

pip install pygenomics

代码在Chapter10/PCA.py笔记本里。

怎么做…

看看下面的步骤:

  1. 我们从加载样本的元数据开始。在我们的例子中,我们将加载每个样本所属的人群:

    import os
    from sklearn.decomposition import PCA
    import numpy as np
    from genomics.popgen.pca import plot
    f = open('../Chapter06/relationships_w_pops_041510.txt')
    ind_pop = {}
    f.readline()  # header
    for l in f:
        toks = l.rstrip().split('\t')
        fam_id = toks[0]
        ind_id = toks[1]
        pop = toks[-1]
        ind_pop['/'.join([fam_id, ind_id])] = pop
    f.close()
    
  2. 我们现在得到个体的顺序以及我们将要处理的 SNP 的数量:

    f = open('../Chapter06/hapmap10_auto_noofs_ld_12.ped')
    ninds = 0
    ind_order = []
    for line in f:
        ninds += 1
        toks = line[:100].replace(' ', '\t').split('\t')
        fam_id = toks[0]
        ind_id = toks[1]
        ind_order.append('%s/%s' % (fam_id, ind_id))
    nsnps = (len(line.replace(' ', '\t').split('\t')) - 6) // 2
    f.close()
    
  3. 我们创建了一个数组,它将被送到 PCA:

    pca_array = np.empty((ninds, nsnps), dtype=int)
    print(pca_array.shape)
    f = open('../Chapter06/hapmap10_auto_noofs_ld_12.ped')
    for ind, line in enumerate(f):
        snps = line.replace(' ', '\t').split('\t')[6:]
        for pos in range(len(snps) // 2):
            a1 = int(snps[2 * pos])
            a2 = int(snps[2 * pos])
            my_code = a1 + a2 - 2
            pca_array[ind, pos] = my_code
    f.close()
    
  4. 最后,我们用多达八个分量计算 PCA 。然后,我们使用transform方法获得所有样本的 8-D 坐标。

    my_pca = PCA(n_components=8)
    my_pca.fit(pca_array)
    trans = my_pca.transform(pca_array)
    
  5. 最后,我们绘制 PCA:

    sc_ind_comp = {}
    for i, ind_pca in enumerate(trans):
        sc_ind_comp[ind_order[i]] = ind_pca
    plot.render_pca_eight(sc_ind_comp, cluster=ind_pop)
    

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

图 10.1-scikit-learn 生成的数据集的 PC1 到 PC8

还有更多…

对于在科学期刊上发表的,我会推荐使用第六章中的配方,仅仅因为它是基于一个已发表的且备受推崇的方法。也就是说,这段代码的结果在性质上是相似的,并且以非常相似的方式对数据进行聚类(如果与第六章图进行比较,垂直轴上方向的反转与解释 PCA 图无关)。

利用 PCA 上的聚类对样本进行分类

基因组学中的主成分分析让我们看到样本是如何聚集的。在许多情况下,来自同一人群的个体会出现在图表的同一区域。但是我们想更进一步,预测新个体在群体中的位置。为了做到这一点,我们将从 PCA 数据开始,因为它可以降维——使数据处理更容易——然后应用 K-Means 聚类算法来预测新样本的位置。我们将使用与上面食谱中相同的数据集。我们将使用除一个样本之外的所有样本来训练算法,然后我们将预测剩余样本的位置。

k 均值聚类可以是监督算法的一个例子。在这些类型的算法中,我们需要一个训练数据集,以便算法能够学习。在训练算法之后,它将能够为新的样本预测某个结果。在我们的例子中,我们希望能够预测人口数量。

警告

当前的食谱旨在温和地介绍监督算法及其背后的概念。我们训练算法的方式远非最佳。正确训练监督算法的问题将在本章的最后一个配方中提到。

准备就绪

我们将使用与之前配方中相同的数据。该配方的代码可在Chapter10/Clustering.py中找到。

怎么做…

让我们来看看:

  1. 我们首先加载人口信息——这类似于我们在前面的配方中所做的:

    import os
    import matplotlib.pyplot as plt
    from sklearn.cluster import KMeans
    from sklearn.decomposition import PCA
    import numpy as np
    from genomics.popgen.pca import plot
    f = open('../Chapter06/relationships_w_pops_041510.txt')
    ind_pop = {}
    f.readline()  # header
    for l in f:
        toks = l.rstrip().split('\t')
        fam_id = toks[0]
        ind_id = toks[1]
        pop = toks[-1]
        ind_pop['/'.join([fam_id, ind_id])] = pop
    f.close()
    
    f = open('../Chapter06/hapmap10_auto_noofs_ld_12.ped')
    ninds = 0
    ind_order = []
    for line in f:
        ninds += 1
        toks = line[:100].replace(' ', '\t').split('\t') #  for speed
        fam_id = toks[0]
        ind_id = toks[1]
        ind_order.append('%s/%s' % (fam_id, ind_id))
    nsnps = (len(line.replace(' ', '\t').split('\t')) - 6) // 2
    print (nsnps)
    f.close()
    
  2. 我们现在将所有样本数据——SNP——加载到一个 NumPy 数组中:

    all_array = np.empty((ninds, nsnps), dtype=int)
    f = open('../Chapter06/hapmap10_auto_noofs_ld_12.ped')
    for ind, line in enumerate(f):
        snps = line.replace(' ', '\t').split('\t')[6:]
        for pos in range(len(snps) // 2):
            a1 = int(snps[2 * pos])
            a2 = int(snps[2 * pos])
            my_code = a1 + a2 - 2
            all_array[ind, pos] = my_code
    f.close()
    
  3. 我们将数组分成两个数据集,即除了一个个体之外的所有个体的训练案例,以及用单个个体进行测试的案例:

    predict_case = all_array[-1, :]
    pca_array = all_array[:-1,:]
    
    last_ind = ind_order[-1]
    last_ind, ind_pop[last_ind]
    

我们的测试案例是个体 Y076/NA19124,我们知道他属于约鲁巴人。

  1. 我们现在计算将用于 K 均值聚类的训练集的 PCA:

    my_pca = PCA(n_components=2)
    my_pca.fit(pca_array)
    trans = my_pca.transform(pca_array)
    
    sc_ind_comp = {}
    for i, ind_pca in enumerate(trans):
        sc_ind_comp[ind_order[i]] = ind_pca
    plot.render_pca(sc_ind_comp, cluster=ind_pop)
    

下面是输出,这将有助于检查聚类结果:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

图 10.2 - PC1 和 PC2 带有颜色编码的人口

  1. 在我们开始计算 K-means 聚类之前,让我们编写一个函数,通过运行算法

    def plot_kmeans_pca(trans, kmeans):
        x_min, x_max = trans[:, 0].min() - 1, trans[:, 0].max() + 1
        y_min, y_max = trans[:, 1].min() - 1, trans[:, 1].max() + 1
        mesh_x, mesh_y = np.meshgrid(np.arange(x_min, x_max, 0.5), np.arange(y_min, y_max, 0.5))
    
        k_surface = kmeans.predict(np.c_[mesh_x.ravel(), mesh_y.ravel()]).reshape(mesh_x.shape)
        fig, ax = plt.subplots(1,1, dpi=300)
        ax.imshow(
            k_surface, origin="lower", cmap=plt.cm.Pastel1,
            extent=(mesh_x.min(), mesh_x.max(), mesh_y.min(), mesh_y.max()),
        )
        ax.plot(trans[:, 0], trans[:, 1], "k.", markersize=2)
        ax.set_title("KMeans clustering of PCA data")
        ax.set_xlim(x_min, x_max)
        ax.set_ylim(y_min, y_max)
        ax.set_xticks(())
        ax.set_yticks(())
        return ax
    

    来绘制聚类表面

  2. 现在让我们用样本来拟合算法。因为我们有 11 个群体,所以我们将为 11 个集群进行训练:

    kmeans11 = KMeans(n_clusters=11).fit(trans)
    plot_kmeans_pca(trans, kmeans11)
    

这里是输出:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

图 10.3-11 个集群的集群表面

如果您与这里的图进行比较,您可以直观地看到聚类没有什么意义:它没有很好地映射到已知的人口。有人可能会认为这种具有 11 个聚类的聚类算法不是很有用。

小费

scikit-learn 中实现了许多其他聚类算法,在一些场景中,它们可能比 K-means 执行得更好。你可以在 https://scikit-learn.org/stable/modules/clustering.xhtml 找到它们。值得怀疑的是,在这个特定的情况下,对于 11 个集群来说,任何替代方案都不会有更好的表现。

  1. 虽然看起来 K-means 聚类不能解析 11 个群体,但是如果我们使用不同数量的聚类,它仍然可以提供一些预测。简单地看一下图表,我们可以看到四个独立的区块。如果我们使用四个集群会有什么结果?

    kmeans4 = KMeans(n_clusters=4).fit(trans)
    plot_kmeans_pca(trans, kmeans4)
    

以下是输出:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

图 10.4 -四个集群的集群表面

这四个团体现在基本上是清楚的。但是它们有直观意义吗?如果是这样,我们可以利用这种聚类方法。事实上,他们有。左边的聚类由非洲人组成,最上面的聚类是欧洲人,最下面的是东亚人。中间的一个是最神秘的,因为它包含古吉拉特人和墨西哥人的后裔,但这种混合最初来自 PCA,而不是由聚类本身引起的。

  1. 让我们看看预测在我们忽略的单个情况下表现如何:

    pca_predict = my_pca.transform([predict_case])
    kmeans4.predict(pca_predict)
    

我们的样本预计在第 1 类。我们现在需要更深入一点。

  1. 让我们找出集群 1 是什么意思。我们从训练集中取出最后一个人,他也是一个约鲁巴人,然后看看他被分配到哪个集群:

    last_train = ind_order[-2]
    last_train, ind_pop[last_train]
    kmeans4.predict(trans)[0]
    

它确实是簇 1,所以预测是正确的。

还有更多…

值得重申的是,我们正在努力实现对机器学习的直观理解。在这个阶段,你应该已经掌握了你能从监督学习中获得什么,以及聚类算法的示例用法。关于训练机器学习算法的程序还有很多要说的,我们将在最后一个食谱中部分揭示。

使用决策树探索乳腺癌特征

当我们收到一个数据集时,我们面临的第一个问题是决定开始分析什么。刚开始时,常常会有一种不知该先做什么的失落感。这里,我们将介绍一种基于决策树的探索性方法。决策树的最大优势在于,它们将为我们提供构建决策树的规则,让我们初步了解数据的情况。

在这个例子中,我们将使用来自乳腺癌患者的特征观察数据集。具有 699 个数据条目的数据集包括诸如凝块厚度、细胞大小的均匀性或染色质类型的信息。结果是良性或恶性肿瘤。这些特征用从 0 到 10 的值进行编码。有关该项目的更多信息,请访问 http://archive . ics . UCI . edu/ml/datasets/breast+cancer+Wisconsin+% 28 diagnostic % 29。

准备就绪

我们将下载数据和文档:

wget http://archive.ics.uci.edu/ml/machine-learning-databases/breast-cancer-wisconsin/breast-cancer-wisconsin.data
wget http://archive.ics.uci.edu/ml/machine-learning-databases/breast-cancer-wisconsin/breast-cancer-wisconsin.names

数据文件被格式化为 CSV 文件。关于内容的信息可以在第二个下载的文件中找到。

该配方的代码可在Chapter10/Decision_Tree.py中找到。

怎么做…

请遵循以下步骤:

  1. 我们做的第一件事是删除一小部分数据不完整的个体:

    import numpy as np
    import matplotlib.pyplot as plt
    import pandas as pd
    from sklearn import tree
    f = open('breast-cancer-wisconsin.data')
    w = open('clean.data', 'w')
    for line in f:
        if line.find('?') > -1:
            continue
        w.write(line)
    f.close()
    w.close()
    

小费

在这种情况下,删除数据不完整的个人就足够了,因为他们只是数据集的一小部分,我们只是在做探索性分析。对于大量缺失的情况,或者当我们试图做一些更严格的事情时,您将不得不使用一些方法来处理缺失的数据,我们在这里不做探讨。

  1. 我们现在要读取数据,给所有列命名:

    column_names = [
        'sample_id', 'clump_thickness', 'uniformity_cell_size',
        'uniformity_cell shape', 'marginal_adhesion',
        'single_epithelial_cell_size', 'bare_nuclei',
        'bland_chromatin', 'normal_nucleoli', 'mitoses',
        'class'
    ]
    samples = pd.read_csv('clean.data', header=None, names=column_names, index_col=0)
    
  2. 我们现在将从结果中分离特征,并使用 0 和 1 对结果进行重新编码:

    training_input = samples.iloc[:,:-1]
    target = samples.iloc[:,-1].apply(lambda x: 0 if x == 2 else 1)
    
  3. 现在让我们基于这个数据创建一个最大深度为 3 的决策树:

    clf = tree.DecisionTreeClassifier(max_depth=3)
    clf.fit(training_input, target)
    
  4. 让我们先来看看哪些特性是最重要的:

    importances = pd.Series(
        clf.feature_importances_ * 100,
        index=training_input.columns).sort_values(ascending=False)
    importances
    

以下是按重要性排序的特性:

uniformity_cell_size           83.972870
uniformity_cell shape           7.592903
bare_nuclei                     4.310045
clump_thickness                 4.124183
marginal_adhesion               0.000000
single_epithelial_cell_size     0.000000
bland_chromatin                 0.000000
normal_nucleoli                 0.000000
mitoses                         0.000000

记住这只是探索性的分析。在下一个食谱中,我们将努力产生更可靠的排名。底部特征为零的原因是我们要求最大深度为 3,在这种情况下,可能没有使用所有特征。

  1. 我们可以对我们实现的准确性做一些本地分析:

    100 * clf.score(training_input, target)
    

我们得到了 96%的性能。我们不应该用它自己的训练集来测试算法,因为这是相当循环的。我们将在下一个食谱中再次讨论这个问题。

  1. 最后,让我们绘制决策树:

    fig, ax = plt.subplots(1, dpi=300)
    tree.plot_tree(clf,ax=ax, feature_names=training_input.columns, class_names=['Benign', 'Malignant'])
    

此产生以下输出:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

图 10.5 -乳腺癌数据集的决策树

让我们从根节点开始:它有一个标准uniformity_cell_size < 2.5和一个良性分类。分割树的主要特征是单元大小的一致性。在顶部节点的良性分类简单地来自于数据集上的大多数样本是良性的这一事实。现在从根开始看右边的节点:它有 265 个样本,其中大部分是恶性的,标准为uniformity_cell_shape < 2.5

这些规则允许您对驱动数据集的因素有一个初步的理解。决策树不是很精确,所以把它们作为你的第一步。

使用随机森林预测乳腺癌结果

我们现在将使用随机森林来预测一些患者的结果。随机森林是一种集成方法(它将使用其他机器学习算法的几个实例),使用许多决策树来得出关于数据的可靠结论。我们将使用与上一个食谱相同的例子:乳腺癌的特征和结果。

这个食谱有两个主要目标:向你介绍随机森林和关于机器学习算法训练的问题。

准备就绪

该配方的代码可在Chapter10/Random_Forest.py中找到。

怎么做……

看一下代码:

  1. 和上一个配方一样,我们从剔除缺失信息的样本开始:

    import pandas as pd
    import numpy as np
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import train_test_split
    from sklearn.tree import export_graphviz
    f = open('breast-cancer-wisconsin.data')
    w = open('clean.data', 'w')
    for line in f:
        if line.find('?') > -1:
            continue
        w.write(line)
    f.close()
    w.close()
    
  2. 我们现在加载清理后的数据:

    column_names = [
        'sample_id', 'clump_thickness', 'uniformity_cell_size',
        'uniformity_cell shape', 'marginal_adhesion',
        'single_epithelial_cell_size', 'bare_nuclei',
        'bland_chromatin', 'normal_nucleoli', 'mitoses',
        'class'
    ]
    samples = pd.read_csv('clean.data', header=None, names=column_names, index_col=0)
    samples
    
    
  3. 我们在特征和结果中分离读取的数据:

    training_input = samples.iloc[:, :-1]
    target = samples.iloc[:, -1]
    
  4. 我们创建一个分类器,并使数据适合它:

    clf = RandomForestClassifier(max_depth=3, n_estimators=200)
    clf.fit(training_input, target)
    

这里最重要的参数是n_estimators:我们要求森林由 200 棵树构成。

  1. 我们现在将的特征按照重要性排序:

    importances = pd.Series(
        clf.feature_importances_ * 100,
        index=training_input.columns).sort_values(ascending=False)
    importances
    

以下是输出:

uniformity_cell_size           30.422515
uniformity_cell shape          21.522259
bare_nuclei                    18.410346
single_epithelial_cell_size    10.959655
bland_chromatin                 9.600714
clump_thickness                 3.619585
normal_nucleoli                 3.549669
marginal_adhesion               1.721133
mitoses                         0.194124

结果是不确定的,这意味着您可能会有不同的结果。此外,请注意,随机森林与前一个配方中的决策树有很大不同。这是预料之中的,因为决策树是一个单一的估计器,其中森林重 200 棵树,并且更可靠。

  1. 我们可以给这个案例打分:

    clf.score(training_input, target)
    

我得到 97.95%的成绩。由于算法是随机的,您可能会得到稍微不同的值。正如我们在前面的食谱中所说的,从训练集中获得分数是非常循环的,远远不是最佳实践。

  1. 为了让更真实地了解算法的准确性,我们需要将数据分成两部分——训练集和测试集:

    for test_size in [0.01, 0.1, 0.2, 0.5, 0.8, 0.9, 0.99]:
        X_train, X_test, y_train, y_test = train_test_split(
            trainning_input, target, test_size=test_size)
        tclf = RandomForestClassifier(max_depth=3)
        tclf.fit(X_train, y_train)
        score = tclf.score(X_test, y_test)
        print(f'{1 - test_size:.1%} {score:.2%}')
    

输出如下(请记住,您将获得不同的值):

99.0% 71.43%
90.0% 94.20%
80.0% 97.81%
50.0% 97.66%
20.0% 96.89%
10.0% 94.80%
1.0% 92.02%

如果你只用 1%的数据训练,你只能得到 71%的准确率,而如果你用更多的数据训练,准确率会超过 90%。请注意,准确性不会随着训练集的大小单调增加。决定训练集的大小是一件复杂的事情,各种问题会导致意想不到的副作用。

还有更多…

我们只是触及了训练和测试机器学习算法的表面。例如,监督数据集通常分为 3 个,而不是 2 个(训练、测试和交叉验证)。为了训练你的算法和更多类型的算法,你需要考虑更多的问题。在这一章中,我们试图发展基本的直觉来理解机器学习,但如果你打算遵循这条路线,这只不过是你的起点。

十一、使用 Dask 和 Zarr 实现并行处理

生物信息学数据集正以指数速度增长。基于标准工具(如 Pandas)的数据分析策略假设数据集能够容纳在内存中(尽管为核外分析做了一些准备)或者单台机器能够有效地处理所有数据。不幸的是,这对于许多现代数据集来说是不现实的。

在本章中,我们将介绍两个能够处理非常大的数据集和昂贵计算的库:

  • Dask 是一个允许并行计算的库,可以从单台计算机扩展到非常大的云和集群环境。Dask 提供了类似于 Pandas 和 NumPy 的接口,同时允许您处理分布在许多计算机上的大型数据集。
  • Zarr 是一个存储压缩和分块多维数组的库。正如我们将看到的,这些阵列是为处理在大型计算机集群中处理的非常大的数据集而定制的,同时如果需要,仍然能够在单台计算机上处理数据。

我们的食谱将利用蚊子基因组学的数据介绍这些先进的文库。您应该将这段代码作为起点,引导您走上处理大型数据集的道路。大型数据集的并行处理是一个复杂的主题,这是您旅程的开始,而不是结束。

因为所有这些库都是数据分析的基础,如果您正在使用 Docker,它们都可以在tiagoantao/bioinformatics_dask Docker 映像中找到。

在本章中,我们将介绍以下配方:

  • 使用 Zarr 读取基因组数据
  • 使用 Python 多重处理并行处理数据
  • 使用 Dask 处理基于 NumPy 阵列的基因组数据
  • dask.distributed调度任务

使用 Zarr 读取基因组数据

zarr(zarr.readthedocs.io/en/stable/)将基于阵列的数据——如 NumPy——存储在磁盘和云存储的层次结构中。Zarr 用来表示数组的数据结构不仅非常紧凑,而且允许并行读写,这一点我们将在下一篇菜谱中看到。在这个食谱中,我们将从冈比亚按蚊 1000 基因组项目(malariagen.github.io/vector-data/ag3/download.xhtml)中读取并处理基因组学数据。这里,我们将简单地进行顺序处理,以简化对 Zarr 的介绍;在下面的食谱中,我们将进行并行处理。我们的项目将计算单个染色体上所有基因组位置的缺失率。

准备就绪

疟蚊 1000 个基因组数据可从谷歌云平台 ( GCP )获得。要从 GCP 下载数据,你需要gsutil,可从cloud.google.com/storage/docs/gsutil_install获得。安装完gsutil后,下载数据(~ 2GB(GB)),代码如下:

mkdir -p data/AG1000G-AO/
gsutil -m rsync -r \
         -x '.*/calldata/(AD|GQ|MQ)/.*' \
         gs://vo_agam_release/v3/snp_genotypes/all/AG1000G-AO/ \
         data/AG1000G-AO/ > /dev/null

我们从项目中下载样本的子集。下载数据后,处理数据的代码可以在Chapter11/Zarr_Intro.py中找到。

怎么做…

看一看以下步骤中的即可开始:

  1. 让我们从检查 Zarr 文件中可用的结构开始:

    import numpy as np
    import zarr 
    mosquito = zarr.open('data/AG1000G-AO')
    print(mosquito.tree())
    

我们首先打开 Zarr 文件(我们很快就会看到,这可能实际上不是一个文件)。之后,我们打印其中可用的数据树:

/
├── 2L
│   └── calldata
│       └── GT (48525747, 81, 2) int8
├── 2R
│   └── calldata
│       └── GT (60132453, 81, 2) int8
├── 3L
│   └── calldata
│       └── GT (40758473, 81, 2) int8
├── 3R
│   └── calldata
│       └── GT (52226568, 81, 2) int8
├── X
│   └── calldata
│       └── GT (23385349, 81, 2) int8
└── samples (81,) |S24

Zarr 文件有五个数组:四个对应于蚊子的染色体——2L2R3L3RX ( Y不包括在内)——其中一个有一个包含 81 个样本的列表。最后一个数组包含了样本名称—这个文件中有 81 个样本。染色体数据由 8 位整数(int8)组成,样本名称为字符串。

  1. 现在,让我们探索染色体2L的数据。先说一些基本信息:

    gt_2l = mosquito['/2L/calldata/GT']
    gt_2l
    

以下是输出:

<zarr.core.Array '/2L/calldata/GT' (48525747, 81, 2) int8>

对于81样本,我们有一系列4852547 单核苷酸多态性 ( SNPs )。对于每个 SNP 和样本,我们有2等位基因。

  1. 现在让我们来看看数据是如何存储的:

    gt_2l.info
    

输出如下所示:

Name               : /2L/calldata/GT
Type               : zarr.core.Array
Data type          : int8
Shape              : (48525747, 81, 2)
Chunk shape        : (300000, 50, 2)
Order              : C
Read-only          : False
Compressor         : Blosc(cname='lz4', clevel=5, shuffle=SHUFFLE, blocksize=0)
Store type         : zarr.storage.DirectoryStore
No. bytes          : 7861171014 (7.3G)
No. bytes stored   : 446881559 (426.2M)
Storage ratio      : 17.6
Chunks initialized : 324/324

这里有很多东西要解开,但是现在,我们将把集中在存储类型、存储的字节和存储比率上。Store type值是zarr.storage.DirectoryStore,所以数据不在单个文件中,而是在一个目录中。数据的原始大小是7.3 GB!但是 Zarr 使用的是压缩格式,将大小缩小到426.2 兆字节 ( MB )。这意味着压缩比为17.6

  1. 让我们看看数据是如何存储在目录中的。如果你列出AG1000G-AO目录的内容,你会发现如下结构:

    .
    ├── 2L
    │   └── calldata
    │       └── GT
    ├── 2R
    │   └── calldata
    │       └── GT
    ├── 3L
    │   └── calldata
    │       └── GT
    ├── 3R
    │   └── calldata
    │       └── GT
    ├── samples
    └── X
     └── calldata
     └── GT
    
  2. 如果你列出2L/calldata/GT的内容,你会发现大量的文件编码数组:

    0.0.0
    0.1.0
    1.0.0
    ...
    160.0.0
    160.1.0
    

2L/calldata/GT目录中有 324 个文件。记住,在上一步中,我们有一个名为Chunk shape的参数,其值为(300000, 50, 2)

Zarr 将数组分割成块,这些块比加载整个数组更容易在内存中处理。每个区块有 30000x50x2 个元素。假设我们有 48525747 个 SNP,我们需要 162 个组块来表示 SNP 的数量(48525747/300000 = 161.75),然后乘以 2 得到样本数(每个组块 81 个样本/50 = 1.62)。因此,我们最终得到 162*2 个块/文件。

小费

分块是一种广泛用于处理无法一次完全加载到内存中的数据的技术。这包括许多其他库,如 Pandas 或 Zarr。稍后我们将看到一个关于 Zarr 的例子。更重要的一点是,你应该知道分块的概念,因为它适用于许多需要大数据的情况。

  1. 在我们加载 Zarr 数据进行处理之前,让我们创建一个函数来计算一个块的一些基本基因组统计数据。我们将计算缺失、祖先纯合子的数量和杂合子的数量:

    def calc_stats(my_chunk):
        num_miss = np.sum(np.equal(my_chunk[:,:,0], -1), axis=1)
        num_anc_hom = np.sum(
            np.all([
                np.equal(my_chunk[:,:,0], 0),
                np.equal(my_chunk[:,:,0], my_chunk[:,:,1])], axis=0), axis=1)
        num_het = np.sum(
            np.not_equal(
                my_chunk[:,:,0],
                my_chunk[:,:,1]), axis=1)
        return num_miss, num_anc_hom, num_het
    

如果您查看前面的函数,您会注意到没有任何与 Zarr 相关的内容:它只是 NumPy 代码。Zarr 有一个非常轻量级的应用编程接口 ( API ),它公开了 NumPy 内部的大部分数据,如果你懂 NumPy 的话,使用起来相当容易。

  1. 最后,让我们遍历我们的数据——也就是说,遍历我们的块来计算我们的统计:

    complete_data = 0
    more_anc_hom = 0
    total_pos = 0
    for chunk_pos in range(ceil(max_pos / chunk_pos_size)):
        start_pos = chunk_pos * chunk_pos_size
        end_pos = min(max_pos + 1, (chunk_pos + 1) * chunk_pos_size)
        my_chunk = gt_2l[start_pos:end_pos, :, :]
        num_samples = my_chunk.shape[1]
        num_miss, num_anc_hom, num_het = calc_stats(my_chunk)
        chunk_complete_data = np.sum(np.equal(num_miss, 0))
        chunk_more_anc_hom = np.sum(num_anc_hom > num_het)
        complete_data += chunk_complete_data
        more_anc_hom += chunk_more_anc_hom
        total_pos += (end_pos - start_pos)
    print(complete_data, more_anc_hom, total_pos)
    

大部分代码负责块的管理,并涉及 ?? 算法来决定访问数组的哪一部分。就准备好的 Zarr 数据而言,重要的部分是my_chunk = gt_2l[start_pos:end_pos, :, :]行。当你切片一个 Zarr 数组时,它会自动返回一个 NumPy 数组。

小费

非常小心你带入内存的数据量。请记住,大多数 Zarr 数组将远远大于您可用的内存,因此,如果您试图加载它,您的应用程序甚至您的计算机都将崩溃。例如,如果您执行all_data = gt_2l[:, :, :],您将需要大约 8 GB 的空闲内存来加载它——正如我们前面看到的,数据大小为 7.3 GB。

还有更多…

Zarr 的特性比这里介绍的要多得多,虽然我们会在接下来的食谱中探索更多,但还是有一些你应该知道的可能性。例如,Zarr 是唯一允许并发数据写入的库之一。您还可以更改 Zarr 表示的内部格式。

正如我们在这里看到的,Zarr 能够以非常高效的方式压缩数据——这是通过使用 Blosc 库(www.blosc.org/)实现的。由于 Blosc 的灵活性,您可以更改 Zarr 数据的内部压缩算法。

参见

Zarr 还有的替代格式——比如分层数据格式 5(HD F5)(en.wikipedia.org/wiki/Hierarchical_Data_Format)和网络通用数据格式(NetCDF)(en.wikipedia.org/wiki/NetCDF)。虽然这些在生物信息学领域之外更常见,但它们的功能较少——例如,缺乏并发写入。

使用 Python 多重处理并行处理数据

当处理大量数据时,一种策略是并行处理,这样就可以利用所有可用的中央处理器 ( CPU )的能力,因为现代机器有许多内核。在理论上最好的情况下,如果您的机器有八个内核,如果您进行并行处理,您可以获得八倍的性能提升。

不幸的是,典型的 Python 代码只使用了一个内核。也就是说,Python 有内置功能来使用所有可用的 CPU 能力;事实上,Python 为此提供了几个途径。在这个菜谱中,我们将使用内置的multiprocessing模块。这里介绍的解决方案在单台计算机上运行良好,如果数据集适合内存,但是如果您想在集群或云中扩展它,您应该考虑 Dask,我们将在接下来的两个食谱中介绍它。

我们的目标将再次是计算一些关于缺失和杂合性的统计数据。

准备就绪

我们将使用与之前配方中相同的数据。该配方的代码可在Chapter11/MP_Intro.py中找到。

怎么做…

按照以下步骤开始:

  1. 我们将使用与上一个配方完全相同的函数来计算统计数据——这主要是基于数字的:

    import numpy as np
    import zarr
    def calc_stats(my_chunk):
        num_miss = np.sum(np.equal(my_chunk[:,:,0], -1), axis=1)
        num_anc_hom = np.sum(
            np.all([
                np.equal(my_chunk[:,:,0], 0),
                np.equal(my_chunk[:,:,0], my_chunk[:,:,1])], axis=0), axis=1)
        num_het = np.sum(
            np.not_equal(
                my_chunk[:,:,0],
                my_chunk[:,:,1]), axis=1)
        return num_miss, num_anc_hom, num_het
    
  2. 让我们访问我们的蚊子数据:

    mosquito = zarr.open('data/AG1000G-AO')
    gt_2l = mosquito['/2L/calldata/GT']
    
  3. 虽然我们使用相同的函数来计算统计数据,但是我们的方法对于整个数据集来说是不同的。首先,我们计算我们称之为calc_stats的所有区间。间隔将被设计成与变体的组块划分完全匹配:

    chunk_pos_size = gt_2l.chunks[0]
    max_pos = gt_2l.shape[0]
    intervals = []
    for chunk_pos in range(ceil(max_pos / chunk_pos_size)):
        start_pos = chunk_pos * chunk_pos_size
        end_pos = min(max_pos + 1, (chunk_pos + 1) * chunk_pos_size)
        intervals.append((start_pos, end_pos))
    

重要的是,我们的区间列表与磁盘上的分块相关。只要这个映射尽可能接近,计算将是高效的。

  1. 我们现在要分离代码来计算函数中的每个区间。这很重要,因为multiprocessing模块将在它创建的每个进程

    def compute_interval(interval):
        start_pos, end_pos = interval
        my_chunk = gt_2l[start_pos:end_pos, :, :]
        num_samples = my_chunk.shape[1]
        num_miss, num_anc_hom, num_het = calc_stats(my_chunk)
        chunk_complete_data = np.sum(np.equal(num_miss, 0))
        chunk_more_anc_hom = np.sum(num_anc_hom > num_het)
        return chunk_complete_data, chunk_more_anc_hom
    

    上多次执行这个函数

  2. 我们现在终于要在几个内核上执行我们的代码了:

    with Pool() as p:
        print(p)
        chunk_returns = p.map(compute_interval, intervals)
        complete_data = sum(map(lambda x: x[0], chunk_returns))
        more_anc_hom = sum(map(lambda x: x[1], chunk_returns))
        print(complete_data, more_anc_hom)
    

第一行使用multiprocessing.Pool对象创建一个上下文管理器。默认情况下,Pool对象创建几个编号为os.cpu_count()的进程。这个池提供了一个map函数,它将在所有创建的进程中调用我们的compute_interval函数。每个呼叫将占用一个时间间隔。

还有更多…

这个菜谱简单介绍了如何使用 Python 进行并行处理,而不需要使用外部库。也就是说,它为 Python 的并发并行处理提供了最重要的构件。

由于 Python 中线程管理的实现方式,线程化并不是真正并行处理的可行替代方案。纯 Python 代码不能使用多线程并行运行。

您可能使用的一些库 NumPy 通常就是这种情况——能够利用所有底层处理器,即使是在执行一段连续的代码时。确保在使用外部库时,不要过度使用处理器资源:当您有多个进程时会出现这种情况,并且底层库也会使用许多内核。

参见

  • 关于multiprocessing模块还有更多要讨论。你可以从 https://docs.python.org/3/library/multiprocessing.xhtml 的的标准文档开始。
  • 要理解为什么基于 Python 的多线程不能利用所有的 CPU 资源,请阅读在realpython.com/python-gil/.全局解释器锁 ( GIL

使用 Dask 处理基于 NumPy 阵列的基因组数据

Dask 是一个提供高级并行的库,可以从单台计算机扩展到非常大的集群或云操作。它还提供了处理大于内存的数据集的能力。它能够提供类似于常见 Python 库(如 NumPy、Pandas 或 scikit-learn)的接口。

我们将重复之前食谱中的例子的子集,即计算数据集中 SNPs 的缺失。我们将使用与 Dask 提供的 NumPy 类似的接口。

在我们开始之前,请注意 Dask 的语义与 NumPy 或 Pandas 等库有很大不同:它是一个懒惰的库。例如,当您指定一个调用相当于—比方说— np.sum时,您实际上并不是在计算一个总和,而是添加一个任务,这个任务在将来最终会计算它。让我们进入食谱,让事情更清楚。

准备就绪

我们将以完全不同的方式对 Zarr 数据进行重新分块。我们这样做的原因是,我们可以在准备我们的算法时可视化任务图。具有五个操作的任务图比具有数百个节点的任务图更容易可视化。实际上,你不应该像我们在这里做的那样分成这么小的块。事实上,如果您完全不重新分块这个数据集,您将会非常好。我们这样做只是为了可视化的目的:

import zarr
mosquito = zarr.open('data/AG1000G-AO/2L/calldata/GT')
zarr.array(
    mosquito,
    chunks=(1 + 48525747 // 4, 81, 2),
    store='data/rechunk')

我们最终会得到非常大的块,虽然这有利于我们的可视化目的,但它们可能太大而不适合内存。

该配方的代码可在Chapter11/Dask_Intro.py中找到。

怎么做…

  1. 让我们首先加载数据并检查数据帧的大小:

    import numpy as np
    import dask.array as da
    
    mosquito = da.from_zarr('data/rechunk')
    mosquito
    

如果您在 Jupyter 中执行,下面是输出:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

图 11.1-Dask 数组的 Jupyter 输出,总结了我们的数据

整个数组占用了7.32 GB。最重要的数字是块大小:1.83 GB。每个工作者将需要有足够的内存来处理一个块。请记住,我们仅使用如此少量的块来绘制这里的任务。

由于块的大小很大,我们最终只有四个块。

我们还没有在内存中加载任何东西:我们只是指定了我们最终想要做的事情。我们正在创建一个要执行的任务图,而不是执行——至少现在是这样。

  1. 让我们看看加载数据需要执行哪些任务:

    mosquito.visualize()
    

以下是输出:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

图 11.2 -加载我们的 Zarr 数组需要执行的任务

因此,我们有四个任务要执行,每个块一个。

  1. 现在,让我们看一下计算每个块的缺失的函数:

    def calc_stats(variant):
        variant = variant.reshape(variant.shape[0] // 2, 2)
        misses = np.equal(variant, -1)
        return misses
    

每个块的函数将在 NumPy 数组上操作。请注意不同之处:我们用来处理主循环的代码使用 Dask 数组,但是在块级别,数据被表示为 NumPy 数组。因此,根据 NumPy 的要求,块必须适合内存。

  1. 后来我们实际使用函数的时候,需要有一个二维 ( 2D )数组。假设数组是三维的 ( 3D ),我们将需要对数组

    mosquito_2d = mosquito.reshape(
        mosquito.shape[0],
        mosquito.shape[1] * mosquito.shape[2])
    mosquito_2d.visualize()
    

    进行整形

以下是目前的任务图表:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

图 11.3 -加载基因组数据并重塑它的任务图

reshape操作发生在dask.array级别,而不是在 NumPy 级别,所以它只是向任务图添加了节点。仍然没有执行。

  1. 现在,让我们准备在所有数据集上执行该函数——意味着将任务添加到我们的任务图中。有很多种执行方式;这里,我们将使用dask.array提供的apply_along_axis函数,它基于 NumPy 的同名函数:

    max_pos = 10000000
    stats = da.apply_along_axis(
        calc_stats, 1, mosquito_2d[:max_pos,:],
        shape=(max_pos,), dtype=np.int64)
    stats.visualize()
    

目前,我们只打算研究前一百万个位置。正如您在任务图中看到的,Dask 非常聪明,只向计算中涉及的块添加了一个操作:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

图 11.4 -包括统计计算的完整任务图

  1. 请记住,到目前为止,我们还没有计算任何东西。现在是实际执行任务图的时候了:

    stats = stats.compute() 
    

这将开始计算。精确的计算是如何完成的,我们将在下一个食谱中讨论。

警告

由于块的大小,这段代码可能会使你的计算机崩溃。至少有 16 GB 的内存,你会很安全。记住,你可以使用更小的块大小——你应该使用更小的块大小。我们只是使用这样的块大小,以便能够生成前面显示的任务图(如果不是这样,它们可能有数百个节点,并且不可打印)。

还有更多…

我们在这里没有花任何时间讨论优化 Dask 代码的策略——那将是一本独立的书。对于非常复杂的算法,您需要进一步研究最佳实践。

Dask 提供了类似于其他已知 Python 库(如 Pandas 或 scikit-learn)的接口,可用于并行处理。您也可以将它用于不基于现有库的一般算法。

参见

  • 对于 Dask 的最佳实践,你最好的起点是 Dask 文档本身,尤其是 https://docs.dask.org/en/latest/best-practices.xhtml ??。

使用 dask.distributed 调度任务

Dask 在执行方面非常灵活:我们可以在本地执行,在科学的集群上执行,或者在云上执行。这种灵活性是有代价的:它需要参数化。配置 Dask 调度和执行有几种选择,但最通用的是dask.distributed,因为它能够管理不同种类的基础设施。因为我不能假设你可以访问集群或者云,比如亚马逊网络服务 ( AWS )或者 GCP,我们将在你的本地机器上设置计算,但是记住你可以在非常不同的平台上设置dask.distributed

在这里,我们将再次对按蚊 1000 基因组项目的变体进行简单的统计。

准备就绪

在我们从dask.distributed开始之前,我们应该注意到【Dask 有一个默认的调度器,它实际上可以根据你的目标库而改变。例如,下面是我们的 NumPy 示例的调度程序:

import dask
from dask.base import get_scheduler
import dask.array as da
mosquito = da.from_zarr('data/AG1000G-AO/2L/calldata/GT')
print(get_scheduler(collections=[mosquito]).__module__)

输出如下所示:

dask.threaded

Dask 在这里使用一个线程调度器。这对 NumPy 数组有意义:NumPy 实现本身是多线程的(真正的并行多线程)。当底层库自身并行运行时,我们不希望有很多进程运行。如果你有一个 Pandas 数据框架,Dask 可能会选择一个多处理器调度器。由于 Pandas 不是并行的,Dask 本身并行运行是有意义的。

好了——现在我们已经了解了重要的细节,让我们回到准备我们的环境。

有一个集中的调度器和一组工人,我们需要启动它们。在命令行中运行以下代码启动调度程序:

dask-scheduler --port 8786 --dashboard-address 8787

我们可以在与调度程序相同的机器上启动 workers,如下所示:

dask-worker --nprocs 2 –nthreads 1 127.0.0.1:8786

我指定了两个进程,每个进程有一个线程。这对于 NumPy 代码来说是合理的,但是实际的配置将取决于您的工作负载(如果您在集群或云上,这将完全不同)。

小费

您实际上不需要像我在这里做的那样手动启动整个过程。如果您自己没有准备好系统,将会为您启动一些东西,但并不真正针对您的工作负载进行优化(有关详细信息,请参见下一节)。但是我想让您感受一下这种努力,因为在许多情况下,您必须自己建立基础设施。

同样,我们将使用来自第一个配方的数据。确保你下载了并做好准备,正如它的准备部分所解释的。我们将不使用重新分块的部分——我们将在下一节的 Dask 代码中使用它。我们的代码可以在Chapter11/Dask_distributed.py中找到。

怎么做…

按照以下步骤开始:

  1. 让我们首先连接到我们之前创建的调度器:

    import numpy as np
    import zarr
    import dask.array as da
    from dask.distributed import Client
    
    client = Client('127.0.0.1:8786')
    client
    

如果你在 Jupyter 上,你会得到一个很好的输出,总结了你在这个菜谱的准备好部分创建的配置:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

图 11.5-dask . distributed 的执行环境概要

您会注意到这里对仪表板的引用。dask.distributed为提供了一个网络上的实时仪表盘,允许你跟踪计算的状态。将您的浏览器指向 http://127.0.0.1:8787/以找到它,或者只需跟随图 11.5 中提供的链接。

因为我们还没有进行任何计算,所以仪表板基本上是空的。请务必浏览顶部的许多选项卡:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

图 11.6-dask . distributed 仪表板的启动状态

  1. 让我们加载数据。更严谨一点,让我们准备加载数据的任务图:

    mosquito = da.from_zarr('data/AG1000G-AO/2L/calldata/GT')
    mosquito
    

下面是 Jupyter 上的输出:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

图 11.7 -染色体 2L 的原始 Zarr 阵列总结

  1. 为了便于形象化,让我们再来一次。我们还将为第二个维度准备一个单独的块,也就是样本。这是因为我们对缺失的计算需要所有的样本,在我们的具体例子中,每个样本有两个块是没有意义的:

    mosquito = mosquito.rechunk((mosquito.shape[0]//8, 81, 2))
    

提醒一下,我们有非常大的块,你可能会有内存问题。如果是这种情况,那么您可以使用原始块运行它。只是可视化会不可读。

  1. 在我们继续之前,让我们要求 Dask 不仅要执行重新分块,还要在工人中准备好结果:

    mosquito = mosquito.persist()
    

persist调用确保数据在工人中可用。在下面的截图中,您可以在计算过程中的某个地方找到仪表板。您可以找到每个节点上正在执行的任务、已完成和待完成任务的摘要,以及每个工作者存储的字节数。值得注意的是溢出到磁盘的概念(见屏幕左上角)。如果没有足够的内存存储所有区块,它们将被临时写入磁盘:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

图 11.8 -执行持久化功能重新分块数组时的仪表板状态

  1. 现在让我们来计算统计数据。我们将对最后一个配方使用不同的方法——我们将要求 Dask 对每个块应用一个函数:

    def calc_stats(my_chunk):
        num_miss = np.sum(
            np.equal(my_chunk[0][0][:,:,0], -1),
            axis=1)
        return num_miss
    stats = da.blockwise(
        calc_stats, 'i', mosquito, 'ijk',
        dtype=np.uint8)
    stats.visualize()
    

记住,每个块不是一个dask.array实例,而是一个 NumPy 数组,所以代码在 NumPy 数组上工作。这是当前的任务图表。没有加载数据的操作,因为前面执行的函数执行了所有这些操作:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

图 11.9 -从持久数据开始对块调用 calc_stats 函数

  1. 最后,我们可以得到我们的结果:

    stat_results = stats.compute()
    

还有更多…

关于dask.distributed接口还有很多可以说的。在这里,我们介绍了其架构和仪表板的基本概念。

dask.distributed提供基于 Python 的标准async模块的异步接口。由于这一章的介绍性质,我们不会涉及它,但建议你看一看。

参见

  • 你可以在 https://distributed.dask.org/en/stable/的dask.distributed文档开始。
  • 在许多情况下,你将需要在集群或云中部署你的代码。查看不同平台上的资源部署文档:docs.dask.org/en/latest/deploying.xhtml
  • 在您掌握了这里的内容之后,下一步将是学习 Python 中的异步计算。看看 https://docs.python.org/3/library/asyncio-task.xhtml 的。

十二、生物信息学的函数式编程

Python 是一种多范式语言,允许你以多种不同的方式表达计算。它有时被称为面向对象的 ( OO )语言:当然,你可以用 OO 方言写代码,但是你也可以使用其他风格。Python 中的大多数代码都是以命令式风格编写的:没有沿着类层次结构的结构,这是面向对象范式的典型特征,大多数代码都改变了状态,例如,如果你写了x = x + 1,你就改变了变量x的状态。

如果你写复杂的代码,特别是需要并行处理的代码,命令式和面向对象的范例会遇到一些限制。对于在单台机器上运行的简单脚本,命令式和面向对象式就可以了,但是生物信息学是一个大数据企业,您通常需要纵向扩展和横向扩展,因为有大量信息要处理,并且许多算法计算量很大。

函数式编程对于生物信息学中越来越常见的复杂和并行问题非常有用。许多用于高吞吐量数据分析的现代架构都是基于函数式编程思想的,例如,MapReduce 范式最初是在 Google 和 Apache Spark 等平台上开发的。这些思想也直接适用于 Dask,甚至使顺序代码更加健壮。

函数式编程是基于函数,纯函数求值,避免可变性。在这一章中,我们将采用一种非常实用的方法来介绍这些概念,并举例说明它们在典型的生物信息学应用中的用例。最后,您应该对函数式编程有一个基本的概念理解,但最重要的是,您将理解它的实用性和适用性。

如果您正在使用 Docker,并且因为所有这些库都是数据分析的基础,所以它们都可以在 Docker 映像tiagoantao/bioinformatics_base中找到。

在本章中,我们将介绍以下配方:

  • 理解纯函数
  • 理解不变性
  • 避免可变性作为健壮的开发模式
  • 使用惰性编程实现流水线操作
  • Python 递归的局限性
  • Python 的functools模块展示

理解纯函数

一个纯函数有几个重要的属性:对于相同的输入,它产生相同的输出,并且它没有副作用(它不改变全局变量,也不做 I/O)。在这个食谱中,我们将介绍几个简单的例子来阐明这个概念。我们最感兴趣的是第二个特性:没有副作用。在后面的食谱中,将会清楚为什么纯函数会非常有用。

我们将开发一个非常简单的例子,在这个例子中,我们对每个样本中测序的基因进行计数。我们将有一个文本文件的基因计数数据库。例如,我们可能在一个样本上测定了 LCT 和 TP53 的序列,在另一个样本上测定了 LCT、MRAP2 和 POMC 的序列。总数将是:TP53: 1,LCT: 2,MRPA2: 1,POMC: 1。我们将使用一个 CSV 文件,可以很容易地阅读熊猫,甚至只是 CSV 模块。

准备就绪

我们将使用一个简单的 CSV 文件作为我们的小数据库。该配方的代码可在Chapter12/Pure.py中找到。数据库文件是my_genes.csv,在my_genes.csv.base有一个保存的原始状态,以防你需要回到原始数据库状态。

怎么做…

看看下面的步骤就可以开始了:

  1. 让我们首先创建两个简单的函数来加载和保存数据。我们还需要一些时间来恢复原始数据库:

    import shutil
    import pandas as pd
    
    def restore_db(file_name):
        shutil.copyfile(f'{file_name}.base', file_name)
    
    def load(file_name):
        df = pd.read_csv(file_name).set_index('gene')
        return dict(df['count'])
    
     def save(dict_db, file_name):
        pd.Series(dict_db).to_csv(
            file_name, index_label='gene', header=['count'])
    

我们用熊猫来照顾坚持。这是一个非常简单的例子;您也可以只使用csv模块。

  1. 我们将考虑三个替代函数来报告在样本中看到的基因;下面是第一个:

    def add_sample_csv(gene_list):
        gene_count = load('my_genes.csv')
        for gene in gene_list:
            gene_count[gene]=gene_count(gene,0)+1
        save(gene_count, 'my_genes.csv')
    

该函数自动用新样本保存数据。它有读写文件的副作用,所以它不是一个纯粹的函数。

  1. 这里有一个第二选择,也报告样本中的基因:

    def add_sample_global_dict(gene_list):
        global gene_count
        for gene in gene_list:
            gene_count[gene] = gene_count.get(0) + 1
    

这个函数使用模块中的一个全局变量并更新它,这是一个副作用。我们将不使用这个函数,但它是作为一个有副作用的函数的例子提供的。我们应该避免全局变量。

  1. 下面是一个纯函数变体:

    def add_sample_dict(dict_db, gene_list):
        for gene in gene_list:
            dict_db[gene] = dict_db.get(0) + 1
        return dict_db
    

这个函数改变了它的一个参数——dict_db——正如我们将在下一个菜谱中看到的,从函数式编程的角度来看,这不是一个最佳实践。然而,对于相同的输出,它总是返回相同的结果,并且没有副作用,所以这将是一个很好的第一种方法。

  1. 假设我们现在运行以下代码:

    add_sample_csv(['MC4R', 'TYR'])
    

但是,如果我们错误地运行了 10 次,会有什么后果呢?从逻辑角度来看,我们会将两个基因都多报 9 次,从而破坏我们数据库的内容。

  1. 作为一种选择,考虑以下:

    add_sample_dict(gene_count, ['MC4R', 'TYR'])
    

如果运行 10 次,会有什么后果?这仍然是一个错误(因为我们正在改变gene_count),但至少它还没有被提交到磁盘。在进行探索性的数据分析时,这种方言会舒服得多——您可以在知道不会损坏外部数据的情况下运行它。在下一个食谱中,我们将看到一种替代方法,使重新运行代码问题更少。

还有更多…

在接下来的几个食谱中,我们将通过非常实际的案例来了解为什么纯函数会非常有用。但是有一个具体的例子很难解释,因此,我们将在这里介绍这个理论。

纯函数使得并行化代码更加容易。想象一个执行分布式计算的框架,比如 Dask 或 Spark。这种框架必须处理硬件故障的潜在情况,代码需要被移动并可能在其他地方重复。如果您的代码在每次运行时都写入磁盘(这是一个副作用的例子),那么分布式框架的恢复就要复杂得多。如果您的代码没有副作用,那么分布式框架可以重复您的功能,而无需考虑数据一致性。事实上,许多分布式框架不允许在可并行化代码中产生副作用。他们也可能反对数据结构的可变性,这是我们现在要讨论的。

理解不变性

函数式编程的另一个共同特点是数据结构通常是不可变的。当你习惯于命令式编程时,这是一个很难理解的概念——命令式编程是指在没有对象的情况下编程,对象的状态会随着时间的推移而改变。在这里,我们将看到一个简单的例子,用一种不可变的方式使前面的配方中的函数工作:也就是说,没有对象被改变,如果我们需要传递新的信息,我们就创建新的。

这个菜谱将从数据结构的角度对不变性做一个简短的介绍。从某种意义上来说,这将是你能在大多数书中找到的标准演示。然而,我们主要考虑的是将可变性作为一种代码设计模式来讨论,这也是下面食谱的主题。但对于这一点,我们需要先理解不变性。

我们将研究两个函数:一个会改变数据结构,另一个不会。这将在我们在本章前面的食谱中遵循的例子的上下文中完成。

准备就绪

我们将使用与之前配方相同的数据。该配方的代码可在Chapter12/Mutability.py中找到。

怎么做…

请遵循以下步骤:

  1. 根据前面的配方,我们有一个改变输入字典的函数:

    def add_sample_dict(dict_db, gene_list):
        for gene in gene_list:
            dict_db.get(gene,0) +1
    

如果你用add_sample_dict(gene_count, ['DEPP'])调用这个代码,而没有输出参数,你的字典的gene_count将会是突变的,以将1添加到基因DEPP中。如果您运行这个函数的次数过多——这在进行探索性数据分析时很常见——您可能会错误地更改字典。

  1. 将前面的实现与下面的进行对比:

    def add_sample_new_dict(dict_db, gene_list):
        my_dict_db = dict(dict_db)
        for gene in gene_list:
            my_dict_db[gene] = my_dict_db.get(0) + 1
        return my_dict_db
    

在这种情况下,我们将dict_db复制到一个新的字典my_dict_db,并添加额外的基因列表。制作现有词典的副本需要耗费内存和时间,但是原始词典dict_db没有改变。

  1. 如果您使用这个实现,您肯定您的输入参数永远不会改变:

    new_gene_count = add_sample_new_dict(gene_count, ['DEPP'])
    

gene_count不是变的,new_gene_count是一本全新的词典。您可以重复执行任意多次,而不必担心每次执行的影响。

还有更多…

这个简单的方法提供了一个不改变参数的函数的例子。我们现在的情况是,我们可以想执行多少次这个函数就执行多少次——无论是错误的还是故意的——而不会对代码的其余部分产生任何影响。这在我们测试代码或运行探索性数据分析时非常有用,因为我们不必担心副作用。这也方便了分布式执行器(如 Dask 或 Spark)的工作,因为它们不必担心检查现有对象的状态:它们是固定的。

对于一般的软件设计来说,这里也有重要的教训,即使你不使用分布式执行器。这就是我们在下面的食谱中要研究的内容。

避免可变性是一种健壮的开发模式

前面的配方引入了不可变数据结构的概念。在这份食谱中,我们将讨论一种设计模式,它可以避免代码中的持久数据库可变性,直到最后。就伪代码而言,长脚本中的大多数应用程序如下工作:

Do computation 1
Write computation 1 to disk or database
Do computation 2
Write computation 2 to disk or database
….
Do computation n
Write computation n to disk or database

在这里,我们将展示一个替代范例,并讨论为什么从弹性的角度来看它通常更好:

Do computation 1
Write computation 1 to temporary storage
Do computation 2
Write computation 2 to temporary storage
...
Do computation n
Write computation n to temporary storage
Take all temporary data and write it to definitive disk and database

首先,我们将展示这两种方法的代码,然后讨论为什么对于复杂的脚本,后者在大多数情况下是更好的方法。

我们将使用与前面食谱中相同的例子:我们将报告在不同样品中看到的基因。

准备就绪

我们将使用与之前配方中相同的数据。该配方的代码可在Chapter12/Persistence1.pyChapter12/Persistence2.py中找到。

怎么做…

让我们用一些共享代码来设置这两个解决方案。

  1. 两种解决方案仍然使用loadsave函数:

    import pandas as pd
    
    def restore_db(file_name):
        shutil.copyfile(f'{file_name}.base', file_name)
    
    def load(file_name):
        df = pd.read_csv(file_name).set_index('gene')
        return dict(df['count'])
    
    def save(dict_db, file_name):
        pd.Series(dict_db).to_csv(
            file_name, index_label='gene', header=['count'])
    
  2. 第一种选择是,我们边走边存,如下所示:

    def add_sample_csv(gene_list):
        gene_count = load('my_genes.csv')
        for gene in gene_list:
            gene_count[gene]=gene_count(gene,0)+1
        save(gene_count, 'my_genes.csv')
    
    add_sample_csv(['MC4R', 'TYR'])
    add_sample_csv(['LCT', 'HLA-A'])
    add_sample_csv(['HLA-B', 'HLA-C'])
    

每当我们添加一个样本,我们就更新主数据库。虽然从 I/O 的角度来看,这种解决方案不是非常有效,但这不是这里要讨论的问题——实际上,从 I/O 的角度来看,大多数这种设计模式往往比我们接下来要看的更有效。

  1. 当我们在最后存储最终数据时,第二个选择如下:

    def add_sample_new_dict(dict_db, gene_list):
        my_dict_db = dict(dict_db)  # next recipe
        for gene in gene_list:
            dict_db.get(gene,0) +1
        return my_dict_db
    
    gene_count = load('my_genes.csv')
    gene_count = add_sample_new_dict(gene_count, ['MC4R', 'TYR'])
    gene_count = add_sample_new_dict(gene_count, ['LCT', 'HLA-A'])
    gene_count = add_sample_new_dict(gene_count, ['HLA-B', 'HLA-C'])
    save(gene_count, 'my_genes.csv')
    

在这种情况下,我们只在最后更新主数据库。我们使用内存字典来维护中间数据。

还有更多…

那么,为什么要这样做呢?延迟写入最终数据的解决方案更加复杂,因为我们必须潜在地存储部分结果并对其进行管理(在这种情况下,我们使用gene_count字典——这是一个简单的例子)。事实上,这是真的,但代码有 bug,磁盘存储空间填满,计算机在现实世界中会崩溃,所以从务实的角度来看,面对这一点是更重要的考虑因素。

假设您的第一个解决方案由于某种原因在执行过程中停止了工作。数据库处于未知状态,因为您不知道代码在哪里。您不能从零开始重新启动代码,因为它的一部分可能已经被执行了。因此,您必须检查数据库的状态,查看代码在哪里停止,并部分运行丢失的部分,或者找到一种方法回滚执行的部分。它既麻烦又容易出错。

现在,假设在第二种方法的执行过程中出现了一个问题:您不必担心数据的中间状态,因为它不会影响您的最终数据库,并且会在第二次执行中完全重新计算。您可以重新运行代码,而不用担心不一致的状态。唯一的例外是最后一行,但这不仅不太常见,而且您还可以将所有精力集中在这一点上,以使流程尽可能具有弹性。

这能一直做到吗?还是只在某些情况下有效?大多数代码都可以创建临时文件。此外,许多生物信息学分析不是实时事务性的,所以通常只在最后才更新 SQL 数据库很容易。现在,您有了一个与整个代码相关的单点故障——如果有问题,您知道您只需要关注最后的部分。

在可能的情况下,这种方法将使复杂代码的日常维护更加容易。

使用惰性编程进行流水线操作

懒惰编程是一种策略,我们把计算推迟到真正需要的时候。与急切编程相比,它有许多优点,在急切编程中,我们一调用计算就计算所有的东西。

Python 为懒惰编程提供了许多机制——事实上,从 Python 2 到 Python 3 的最大变化之一就是语言变得更加懒惰。

为了理解懒惰编程,我们将再次使用我们的基因数据库,并用它做一个练习。我们将检查是否至少有 n 个基因,每个基因有 y 个读数(例如,三个基因,每个基因有五个读数)。比方说,这可以是对我们数据库质量的一种衡量——也就是说,衡量我们是否有足够的基因和一定数量的样本。

我们将考虑两种实现:一种是懒惰的,另一种是渴望的。然后我们将比较这两种方法的含义。

准备就绪

该配方的代码可在Chapter12/Lazy.py中找到。

怎么做…

看看这两种不同的方法:

  1. 让我们从急切的实现开始:

    import pandas as pd
    def load(file_name):
        df = pd.read_csv(file_name).set_index('gene')
        return dict(df['count'])
    
    def get_min_reads(all_data, min_reads):
        return {
            gene: count
            for gene, count in all_data.items()
            if count >= min_reads
        }
    
    def has_min_observations(subset_data, min_observations):
        return len(subset_data) >= min_observations
    print(has_min_observations(
        get_min_reads(
            load('my_genes.csv'), 4
        ), 3))
    

注意代码加载所有数据,检查所有条目的计数4,并查看是否至少有3个具有该计数的观察值。

  1. 这是一个另类,懒惰的版本:

    def get_rec(file_name):
        with open(file_name) as f:
            f.readline()  # header
            for line in f:
                toks = line.strip().split(',')
                yield toks[0], int(toks[1])
    
    def gene_min_reads(source, min_reads):
        for gene, count in source:
            if count >= min_reads:
                yield gene
    
    def gene_min_observations(subset_source, min_observations):
        my_observations = 0
        for gene in subset_source:
            my_observations += 1
            if my_observations == min_observations:
                return True
        return False
    print(gene_min_observations(
        gene_min_reads(
            get_rec('my_genes.csv'), 4
        ), 2))
    

这段代码以一种完全不同的方式运行。首先,get_recgene_min_reads是生成器(注意yield,所以它们只会在需要的时候返回结果,一个接一个。gene_min_observations一旦观察到所需的观察次数,就会返回。这意味着将被读取和处理的唯一数据是达到结果的最少数据。在最坏的情况下,这仍然是整个文件,但在许多情况下,它可以少得多。

还有更多…

对于非常大的文件,这种方法的优势显而易见。虽然我们的玩具文件没有明显的优势,但是如果文件太大而不适合内存,第一种方法就会崩溃!此外,管道的每一步都需要足够的内存来存储它消耗的所有数据,并且所有的步骤都需要同时在内存中。因此,即使对于中等大小的文件,内存问题也可能在急切版本中出现,而惰性版本可以避免这种问题。

很多时候,惰性代码做的处理也少得多:就像前面的例子一样,它可能会在看到所有数据之前就停止计算。这并不总是确定的,但在许多用例中都会发生。

从历史的角度来看,Python 2 比 Python 3 更有热情。这实际上是从 Python 2 到 Python 3 的主要变化之一。举个简单的例子,考虑一下 Python 2 和 3 中range的行为。

我们的惰性实现的一部分可以通过使用一些 Python 内置模块以更简洁的方式编写——我们将在最终的食谱中再次讨论这一点。

使用 Python 递归的局限性

递归——一个调用自己的函数——是函数式编程中的一个基本概念。许多迎合函数式编程的语言能够无限递归。Python 不是这些语言中的一种。你可以在 Python 中使用递归,但是你必须意识到它的局限性,即递归会导致相当大的内存开销,并且它不能用来代替迭代——这是编写函数式程序时的典型情况。

为了了解 Python 递归的限制,我们将以几种方式实现斐波那契数列。提醒一下,斐波那契可以定义如下:

Fib(0) = 0
Fib(1) = 1
Fib(n) = Fib(n -1) + Fib(n –2)

我们也在计算阶乘函数,但在这种情况下只是以递归的方式:

Fact(1) = 1
Fact(n) = n * Fact(n –1 )

准备就绪

我们的代码可以在Chapter12/Recursion.py中找到。

怎么做…

按照以下步骤开始:

  1. 先说一个迭代版的斐波那契:

    def fibo_iter(n):
        if n < 2:
            return n
        last = 1
        second_last = 0
        for _i in range(2, n + 1):
            result = second_last + last
            second_last = last
            last = result
        return result
    

这个版本已经足够了,而且也非常高效。我们并不是说迭代版本在任何方面都不如递归版本。相反,用 Python,可能会表现得更好。

  1. 下面是一个递归版本:

    def fibo_naive(n):
        if n == 0:
            return 0
        if n == 1:
            return 1
        return fibo_naive(n - 1) + fibo_naive(n - 2)
    

如果您运行它,您会发现与迭代版本相比,性能受到了相当大的影响。此外,如果您要求fibo_naive(1000)或一个类似的大数字,代码将根本不能很好地执行(1000 个案例可能需要很多小时),而迭代版本将充分执行。在接下来的食谱中,我们将实际修复其中的一部分。但是现在,让我们更深入地研究递归和 Python 的问题。

  1. 为了让的情况变得非常明显,让我们用实现一个阶乘函数,从递归实现的角度来看,这个函数尽可能简单(甚至比斐波那契更简单):

    def factorial(n):
        if n == 1:
            return 1
        return n * factorial(n - 1)
    

如果你用一个小数字运行这个函数,比如factorial(5),你会得到正确的答案,5。但是如果你尝试一个大的数字,比如factorial(20000),你会得到如下结果:

Traceback (most recent call last):
  File "Recursion.py", line 8, in <module>
    factorial(20000)
  File "Recursion.py", line 4, in factorial
    return n * factorial(n - 1)
  File "Recursion.py", line 4, in factorial
    return n * factorial(n - 1)
  File "Recursion.py", line 4, in factorial
    return n * factorial(n - 1)
  [Previous line repeated 995 more times]
  File "Recursion.py", line 2, in factorial
    if n == 1:
RecursionError: maximum recursion depth exceeded in comparison

这个错误提示在 Python 中你可以递归多少是有限制的:Python 只能递归到某个限制(见sys.setrecursionlimit来改变这一点)。虽然您可以将递归的数量变得更大,但是维护堆栈所需的内存总会有一个限制。还有其他方法可以显式实现递归,但代价是速度。许多语言都实现了一个称为尾部调用优化 ( TCO )的特性,它允许高性能的无限级递归,但是 Python 没有实现它。

还有更多…

在 Python 中,您可以对不需要多次循环调用的简单情况使用递归,但是在 Python 中,递归并不是迭代解决方案的一般替代品。递归可能是 Python 函数世界中实现得最差的主要特性。

Python 的 functools 模块展示

Python 有三个内置模块,在用函数式方言编写代码时非常有用:functoolsoperatoritertools。在本食谱中,我们将简要讨论functools模块。例如,基本的reduce功能(这是 MapReduce 名字的一部分)只有在导入functools时才可用。

虽然对这些模块的详细探究对于单个配方来说太长了,但是我们将通过使用functools中的功能改进之前配方的一些代码来展示一些功能,并展示模块效用的一些说明性示例。

准备就绪

我们的代码可以在Chapter12/Builtin.py中找到。我们将参考以前的食谱。

怎么做…

让我们看几个说明性的例子:

  1. 还记得我们在前面的配方中阶乘函数的递归实现不是很高效吗?让我们用一种非常简单的方式来缓解它的许多问题:

    import functools
    
    @functools.cache
    def fibo(n):
        if n == 0:
            return 0
        if n == 1:
            return 1
        return fibo(n - 1) + fibo(n - 2)
    

如果你记得前一个配方中的递归代码,它非常慢,即使对于小数字也是如此。做这件事可能需要几个小时。这个函数只需添加@functools.cache装饰器,就可以更快地处理更多的数字。这是由于cache装饰器实现了记忆化。

什么是记忆化?

记忆化是这样一个过程,计算机通过它缓存执行的结果,并在下一次调用它时,不再次计算它,而只是返回存储的结果。例如,第一次调用fibo(10),为了得到结果,实际上执行了函数,但是第二次调用时,结果在第一次运行时被缓存,没有执行就返回了。只有当函数总是为相等的输入返回相同的输出,并且没有副作用时,记忆化才能正确工作。也就是说,记忆化只对纯函数起作用。

  1. 对于另一个使用函数方法替代现有代码的示例,从使用惰性编程进行流水线操作方法:

    def gene_min_reads(source, min_reads):
        for gene, count in source:
            if count >= min_reads:
                yield gene
    

    中获取函数

这里已经有了一些功能性的味道,因为我们使用了一个生成器。

  1. 这个函数可以用一种功能性更强的方言来编写:

    def gene_min_reads(source, min_reads):
        return map(
            lambda x: x[0],
            filter(lambda x: x[1] >= min_reads,
            source.items()))
    

这里有很多东西需要打开。首先看内置的filter函数:它会将第一个参数中定义的函数应用到第二个参数上迭代器的对象上。在我们的例子中,第二个元素大于或等于min_reads的对象将被保留。然后,map函数获取每个对象(类型为('GENE', count))并只返回第一部分。mapfilter函数在函数式编程的方言中非常常见。还要注意匿名函数的典型函数概念,a lambda:这是一个只在一个地方使用的函数定义——它对非常小的函数非常有用。在这种情况下,没有直接的理由以这种方式重写(特别是因为前面定义的生成器类型已经为我们的问题提供了函数式方言的最重要的特性),但是您会发现在许多情况下这种类型的表示更加简洁和有表现力。

  1. 另一个使用分布式系统时你可能需要的重要概念是部分函数应用——这里有一个使用最基本算术函数的简单例子:

    def multiplication(x, y):
        return x * y
    double = functools.partial(multiplication, 2)
    double(3)
    

double被定义为部分解决multiplication中的一个参数——因此,我们使用术语部分函数应用。这不仅仅是一个风格上的问题,因为有时在分布式平台中(或者甚至仅仅是多处理池的map函数),您被限制为只能提供一个参数——因此,部分函数应用程序成为一种必要。

还有更多…

functools模块中有许多更有趣的应用程序可以查看。从功能的角度来看,itertoolsoperator模块也有许多惯用的功能。

参见…

Python 有几个函数式编程库,例如,参见以下内容:

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐