DPark 是一个类似MapReduce 的基于Mesos(Apache 下的一个集群管理器,提供了有效的、跨分布式应用或框架的资源隔离和共享等功能)的集群并行计算框架(Cluster Computing Framework),DPark 是Spark 的Python克隆版本,是一个Python 实现的分布式计算框架,可以非常方便地实现大规模数据处理和低延时的迭代计算。该计算框架类似于MapReduce,但是比其更灵活,可以用Python 非常方便地进行分布式计算,并且提供了更多的功能,以便更好地进行迭代式计算。DPark 由国内的豆瓣公司开发实现和负责维护,据豆瓣公司的描述,豆瓣公司内部的绝大多数数据分析都使用DPark 完成,整个项目也正趋于完善。
RDD 是DPark 的核心概念,是DPark 使用的一种数据模型,RDD 的一个重要特征就是在计算过程中,一个RDD 可以在不同的并行循环中被重复利用。
RDD 是一种支持容错、可进行并行计算的元素集合。一个RDD 由多个分片(Split)组成,分片是计算过程中并行计算的基本单位。目前,DPark 支持从以下两种数据源中获取RDD。
(1)并行化数据集:可以将一个普通的Python 数据集合(如list)拆分为若干分片后组成一个RDD。
(2)分布式文件系统中的单个或者多个文件:即将分布式文件系统中的文件按照行拆分后,组成一个RDD,目前,DPark 支持两种格式的文件,即文本格式文件和CSV 格式文件。
从上述数据源生成的RDD以及在并行计算过程中新生成的RDD都支持相同的操作和变换。在DPark 中通过上述两种数据源生成RDD 的具体方法如下。
① 从并行化数据集生成RDD。
对于并行化的数据集,可以调用DparkContext 的parallelize 函数得到,方法是:rdd = ctx.parallelize(seq, numSlices)。其中,参数numSlices 表示将数据集seq 划分的分片数,DPark 会将每个分片作为一个任务发送到集群上进行并行计算,对于基于Mesos 的集群节点,通常每个CPU 可以运行2~4 个任务。numSlices 针对不同的集群设置,也都有相应的默认值。
② 从分布式文件系统生成RDD。
DPark 可以读取从分布式文件系统中的一个或多个文件生成RDD,目前支持的文件格式有文本文件和CSV 格式文件,而且将来会增加对更多文件格式的支持,例如,支持从MySQL 中读取数据。当然,用户也可以自己按照RDD 的要求实现对应的RDD。
● 读取文本文件为RDD。
可以调用DparkContext 的textFile 方法从文本文件生成RDD。text_rdd = ctx.textFile('data.txt', splitSize=64<<20)通过这种方式创建的RDD 中,每个元素为源文件中的一行,包含行结尾的回车符,splitSize 指定RDD 中每个分片的大小,默认为64MB。
● 从CSV 格式文件生成RDD。
可以调用DparkContext 的csvFile 方法从文本文件生成RDD。csv_rdd = ctx.csvFile('data.csv', splitSize=32<<20, dialect='excel')通过这种方式生成的RDD 中,每个元素为源文件中每一行分割后生成的数组。splitSize 指定RDD 中每个分片的大小,默认为32MB。dialect 参数指定csv 文件中使用的分隔符,具体请参见csv.reader,默认使用逗号('excel')分割。
1.1.2 共享变量
在 DPark 中,具体的计算过程发生在集群的每个计算节点上,所以DPark需要将RDD 的分片数据和计算函数(如map 和reduce 函数)进行序列化,通过网络发送到计算节点上,然后在计算节点上执行反序列化,在这个过程中,计算操作所依赖的全局变量、模块和闭包等也会被复制到该计算节点上,所以,在计算节点中对普通变量的修改不会影响到主程序中的变量。
对于在计算过程中数据共享的需要,DPark 通过提供共享变量来实现。共享变量在不同的计算任务之间可以进行共享式的读/写,DPark 目前支持两种类型的共享变量:只读广播变量和只可以写的累加器。
● 广播变量。
DPark 支持将变量一次性发送到集群的所有计算节点上,这种变量称为广播变量。使用广播变量避免了变量在每次调用时需要通过网络传输,执行序列化和反序列化等操作。这种情况一般用在计算函数需要依赖一个特别大的数据集的时候。需要注意的是,被广播的对象大小不能超出计算节点的内存限制。DPark 使用较高效率的广播算法执行广播变量在集群中的传递,目前支持分布式文件系统和树形结构两种广播算法。
●累加器。
累加器可以在执行数据量较小的任务的时候,用于收集任务产生的少量数据。累加器只支持add 操作,不支持删除、更新等操作,DPark 默认使用的累加器可以支持数值类型、list 类型和dict 类型,用户也可以自定义累加器。
DPark 并行计算模型基于上节讲到的DPark 的两个基本概念,一是基于对RDD 的分布式计算,二是基于计算过程中能够通过不同的机器访问的共享变量。共享变量必须很容易在现存的分布式文件系统中实现。RDD 能够在多次循环计算过程中反复被利用,所以DPark 支持将RDD 缓存在计算节点的内存中以加快计算。
RDD 目前支持两种类型的并行计算。
(1)变换
变换是将现有的RDD 通过运算变成另外一个RDD,例如,DPark 中的map、filter 操作就是该功能。
(2)操作
操作是指将现有的RDD 进行聚合运算,然后将计算结果立即返回给主程序,例如,DPark 中的求和操作count。需要说明的是,所有的RDD 的变换都是滞后的,当在一个RDD 上调用变换函数(如map)的时候,并没有立即执行该计算,只是生成了一个携带计算信息的新的RDD,只有当RDD 需要将某个操作结果返回给主程序的时候,才执行真正的计算。这种设计有如下好处。
(1)提高效率:DPark 可以自动将多个变换操作进行合并,然后同时运行,这样可以最大程度地减少数据传输量。
(2)容错:在某个分片计算失败的情况下,RDD 由于携带有计算信息,因此可以重新执行计算。
另外,RDD 支持一种很特别且很重要的变换,即缓存(cache),当在某个RDD 上调用cache()函数的时候,每个计算节点会将分配给自己的计算分片的结果缓存在内存中,当以后需要对该RDD 或者从该RDD 转换来的RDD 进行操作的时候,计算节点就可以直接从内存中取得该RDD 的计算结果。很显然,当整个计算过程需要对RDD 进行重复利用时,缓存技术将大大提高计算性能。这种设计方法对于迭代式计算非常有帮助。
在前面DPark 和MapReduce 的比较一节中已经提到,DPark 和MapReduce的关键性区别在于处理数据流上。MapReduce 基于非循环的数据流模型,而DPark 对于需要重复使用数据集的迭代式算法具有较高的效率。这其中的关键是DPark 使用了一种特殊的数据模型,即RDD。
RDD 的设计目标是既保留了MapReduce 等数据流处理模型优点,例如,自动容错、数据本地化、可拓展性强等,也添加了自己独特的特性,即将一部分数据缓存在内存中,以加快对这部分数据的查询和计算的效率。
RDD 可以认为提供了一个高度限制的共享内存,RDD 只读或者只能从别的RDD 转化而来。这些限制可以降低自动容错需要的开销,RDD 使用一种称为“血统”的容错机制,也就是每个RDD 都携带了它是如何从别的RDD 转变过来的信息以及如何重建某一分片的信息。
在 RDD 这种数据模型出现之前,也有很多数据处理模型被创造出来解决非循环的数据模型中的问题。例如,Google 的Pregel(迭代图计算框架)、Twister和HaLoop(迭代MapReduce 框架)等,这些模型的一个共同特征就是应用场景有限,这些模型是根据企业(如Google)自己的业务需求而开发出来的,所以无法通用。而基于RDD 的DPark 提供了一种更加通用的迭代式并行计算框架,用户可以显示控制计算的中间结果,自由控制计算的流程。
RDD 的发明者已经在Spark 的基础上实现了Pregel(100 行Scala 代码)以及迭代MapReduce 框架(200 行Scala 代码),从实现代码行数上可以看到,Spark提供了一个很简洁的编程模型。Spark 在其他框架不能很好适用的场景下也可以达到很好的效果。例如,在交互式的大数据查询上。一些实践和研究表明,Spark 在那些迭代式计算中比Hadoop 快20 倍,能够在5~7s 的时间内交互式地查询1TB 的数据(比Hadoop 快40 倍)。
通常,实现分布式数据集容错有两种常用的方法,即数据检查点和记录更新。
考虑到RDD 的应用场景(面向大规模数据的存储和分析),给数据设置检查点代价较高,原因在于设置检查点需要在集群节点之间进行大量数据的拷贝操作,而这种拷贝操作会受到集群带宽的限制,而且带宽是集群中的稀缺资源,拷贝操作会牺牲大量的带宽,而且拷贝也增加了集群节点的存储负载。
考虑到上述情况,DPark 使用记录更新的方法实现容错,但是更新所有记录的方法也需要较高的代价,所以RDD 仅支持粗颗粒度变换,即仅记录在单个分片上的单个操作,然后创建某个RDD 的变换序列存储下来,即保存“血统”信息,当出现数据丢失的情况时,可以根据保存的“血统”信息重新构建数据,以此来达到数据容错的功能。
当时由于RDD 在计算过程中需要进行多次变换,导致变换的序列很长,因此根据这些很长的变换序列恢复丢失的数据需要很长的时间,所以DPark 设计者建议在变换序列很长的情况下,适当地建立一些数据检测点以加快实现容错。DPark 目前还没有实现自动判断是否需要建立检查点的机制,用户可以通过saveAsTextFile 方法来手动设置数据检查点。据DPark 网站描述,DPark 会在后来的升级版本中添加这一机制,实现更加自动化、更加友好的容错机制。
在DPark 设计中,为了将来使RDD 支持更多类型的变换,而不用改变现存的任务调度机制,而且也为了保持RDD 的基于“血统”的容错机制,RDD被设计了几个通用接口,具体来讲,每个RDD 必须包含如下4 方面的信息。
● 数据源分割后的数据分片信息,即源代码中的split 变量。
● “血统”信息,即该RDD 所依赖的父RDD 信息以及二者之间的关系信息,即源代码中的dependence 变量。
● 计算函数,即该RDD 如何从父RDD 经过计算转变得来,即源代码中的iterator(split)和compute 函数。
● 如何对数据进行分片和分片保持位置相关的元数据信息,即源代码中的partitioner 和preferredLocations。
下面进行举例说明。
例如,从分布式文件系统中的文件转换得到的RDD,这些RDD 中的数据分片通过对源数据文件进行分割后得到的,它们没有父RDD,这些RDD 的计算函数只是读取文件中的每一行,然后返回给子RDD,对于通过map 函数转换得到的RDD,会具有和父RDD 相同的数据分片。
上面所列的RDD 的四个通用信息如何表达父RDD 和子RDD 之间的关系是DPark 和Spark 必须要考虑的事情,在Spark 实现中,将这种依赖关系划分为两种类型:窄依赖和宽依赖。窄依赖是指子RDD 的每个数据分片只对父RDD 中的有限个数据分片有依赖关系,而且依赖的数量在规模上要和父RDD 分片数量差别很大。宽依赖是指子RDD 中的每个数据分片都可以对父RDD中的每个数据分片有依赖关系。例如,如图2-20 左侧所示,对于map,filter 变换操作就属于窄依赖,由map 变换产生的子RDD 中的某个数据分片只对父RDD相应的数据分片有依赖,这就是一种窄依赖关系。但是对于groupByKey 变换操作,由于每个子RDD 中的数据分片对父RDD 中的所有数据分片都有依赖,所以这是一种宽依赖关系。
DPark 和Spark 对父子RDD 依赖关系进行分类的特性,主要是为了针对不同的依赖类型使用不同的任务调度机制和数据容错机制,从而更加高效地进行计算。对于窄依赖关系,可以在计算节点上根据父RDD 中的数据分片计算得到子RDD 中相应的数据分片。对于宽依赖,意味着子RDD 中的数据分片的计算需要在父RDD 中的所有数据分片计算完成的情况下才可以进行。而且对于窄依赖来说,数据丢失或者出错所需要的恢复时间要比宽依赖少很多,因为对于窄依赖来说,只有丢失的那些数据分片需要重新计算,而对于宽依赖,则需要对丢失的数据分片的所有祖先RDD 重新计算一遍。所以,DPark 和Spark 建议,对于有长“血统”链特别是存在宽依赖的情况下,需要在适当的时间设置一个数据检查点来避免过长的数据恢复时间。
在DPark 实现中,设计者试图利用RDD 的特性为所有的RDD 操作找到一种最有效的执行策略,任务调度器提供一个runJob 接口给RDD 使用,该接口接收三个参数,分别是RDD 对象本身、感兴趣的部分数据分片和数据分片上的操作函数。当RDD 需要执行一个操作的时候,例如,对RDD 执行count、collect 等操作,DPark 就会调用runJob 接口来执行并行计算。
从总体来讲,DPark 的任务调度机制很像Dryad 的任务调度方法,其区别在于DPark 在进行任务调度的时候会考虑RDD 的哪些数据分片需要缓存在集群的哪些计算节点上,选择的过程是这样实现的,首先,RDD 根据自身携带的“血统”序列信息创建出一些阶段stage,每个阶段会尽可能多地包含可以连续运行的变换,即基于窄依赖的变换,一个stage 的边界是那些需要在节点之间移动数据的宽依赖变换,或者是那些已经被缓存的RDD。在如图1 所示的整体计算的示例中表示了阶段的分割,只有父阶段完成了计算,子阶段的计算才能开始,运行的时候每个数据分片分配一个任务,没有父子关系的阶段之间可以并行计算。图1 中的Stage 1 和Stage 2 就可以并行计算,Stage 3 必须在Stage 1 和Stage2 计算完成后才能开始。
在进行具体任务分配的时候,调度器会根据数据本地化的原则来分配计算任务到计算节点,例如,某个计算任务需要访问一个已经被缓存的数据分片,那么调度器就将该任务分配给缓存有该数据分片的计算节点来执行,这种调度策略可以最大程度地减少集群网络带宽的占用,降低集群节点通信代价。
如果一个计算任务执行失败,但是它所在阶段的父阶段的数据没有丢失,那么调度器可以调度该任务到其他计算节点上重新运行。如果它所在的父阶段已经不可用,那么调度器需要重新提交父阶段中的所有需要重新计算的任务。
共享变量是DPark 中除了RDD 外的另外一个重要的概念,目前在DPark中支持两种类型的共享变量:广播变量和累加器。下面简单介绍两种共享变量的实现机制。
(1)广播变量的实现
在 DPark 中,当在程序中调用RDD 上的map、filter 等变换操作时,会传递一个变换函数给这些变换操作,DPark 在实际运行该函数的时候需要将该函数所需要的闭包序列化后通过网络传送到计算节点上,在计算节点上计算函数就可以调用函数域的变量、函数等。但是如果闭包中的函数需要访问一个较大的数据集,那么序列化闭包并在集群中传送的方式所花费的代价就很高。考虑到这种情况,DPark 支持使用广播变量,从而使用户可以一次性将信息发送到所有集群的计算节点上。目前实现的算法有基于文件系统的广播算法和树形广播算法。
在分布式文件系统广播算法中,当用户为变量v 创建一个广播变量bv 的时候,DPark 会为其创建一个唯一的广播ID,然后将变量v 序列化后存储在文件系统中的一个文件里。当传递闭包的时候,DPark 会将bv 而不是v 序列化后传递过去,而广播变量bv 的序列化结果就是广播ID。当用户使用bv.value 中v的值时候,DPark 会在缓存中检查v 是否存在,如果存在,直接使用,如果不存在,就将对应的文件反序列化后提供给计算节点使用。
在树形广播算法中,当用户为变量v 创建一个广播变量bv 时,DPark 也会给该广播变量创建一个唯一的广播ID,但是不会将v 序列化后存放到文件系统的某一个文件中,而是利用zeromq 在master 上绑定一个端口,当worker 需要读取真实数据的时候,如果本地缓存中没有该数据,就访问master 节点请求数据,当一个worker 获取了数据,就可以将该数据传播给其他worker,其传播过程是一个N 叉树形。
两种算法中,一般来说,树形广播算法会更快一些,因为树形广播算法只有内存和网络I/O 操作,而分布式文件系统算法不仅有内存和网络I/O 操作,还有磁盘操作,所以相对来说,速度会慢一些。在Spark 实现中,除了上述两种广播算法,还支持P2P 的广播算法,但是DPark 目前还没有实现该算法。
(2)累加器的实现
在 DPark 中创建累加器的时候,需要提供accumulator.AccumulatorParam这样一个对象。该对象中有两个值,分别代表累加器的0 值和加法操作定义。例如,对于数值型累加器,0 值就是数值0,加法定义就是普通的数值加法。但是对于列表型累加器,0 值就是空的[ ],加法就是extend 函数。当累加器被创建的时候,DPark 会给累加器创建一个唯一的ID,当累加器随着计算任务被分配到计算节点上,在节点上会创建值为0 的累加器的副本,在计算节点上对累加器进行的任何操作都是在该副本上的操作,不会传递到主程序中的累加器上。当任务完成返回后,DPark 调度器会将这个副本累加器和主程序中的累加器进行合并,在所有的计算任务完成后,才能得到最终累加器的正确值。