fasionchan

读万卷书,行万里路,品万味肴,撸万行码。

[译文]弹性分布式数据集:一种为内存化集群计算设计的容错抽象

| Comments

摘要

我们提出弹性分布式数据集(RDDs),一种分布式内存模型,以容错的方式在大集群上执行内存计算。 现有计算框架不能有效地处理以下两类问题:①迭代算法;②交互式数据挖掘,RDDs应运而生。 在这两个场景下,确保数据在内存中,性能至少有数量级的提升。 为了高效实现容错性,RDDs采用一个受限的内存共享模型——与其修改共享状态,不如对数据做一些变换(生成新数据集)。 尽管如此,我们发现RDD的表现能力足以完成一系列计算类型,包括最近的专有交互式作业编程模型,例如Pregel,甚至包括先前无法覆盖的模型。 我们在一个名为Spark的计算平台上实现了RDDs,并且已经通过一系列用户应用和基准测试进行验证。

1 简介

包括MapReduceDryad在内,集群计算框架在大规模数据分析任务中被广泛使用。 有了这类系统,用户可以使用一个高阶操作集编写并行计算任务,而不需要关心底层分布或者容错。

尽管目前主流的框架提供了不少用于访问集群计算资源的抽象方式,还是缺少关于内存方面的。 如果多次计算之间需要重用中间结果,这类框架的处理效率就非常低了。 数据重用在很多跌打机器学习算法以及图算法很常见,包括PageRankK-means聚类以及逻辑回归等。 还有一个场景是交互式数据挖掘,用户在同一数据集上跑若干个临时查询。 很不幸,在现有的计算框架,两次计算数据(比如两个MapReduce任务)复用的唯一方式是通过一个额外的存储系统(比如分布式文件系统)。 数据复制,磁盘I/O以及序列化等能带来大量的系统开销,最终支配应用的执行时间。

为了解决问题,研究人员为需要数据重用的应用开发了专有的计算框架。 例如,Pregel是一个将中间计算结果保持在内存中的迭代图计算系统;HaLoop提供一个迭代MapReduce接口。 然而,这类框架只支持非常特定的计算模式(循环一个MapReduce步骤序列),为特定模式实现隐式数据共享。 框架并没有提供通用的抽象,比如让用户加载数据集到内存中并执行一些热查询。

本文,我们提出一个新模型——弹性分布式数据集(RDDs),在大部分场景下,可以提高数据复用的效率。 RDDs是容错的并行数据结构,开发者可以将中间结果明确保持在内存中,控制数据划分以优化数据分布,并可以通过一个丰富的操作集来维护。

设计RDDs最主要的挑战是定义一个可以高效提供容错性的编程模型。 已有的内存存储集群模型,如分布式共享内存、key-value存储以及数据库等,提供了一个可以对小片可变化状态(表格)进行更新的接口。 在这种接口下,实现容错性唯一的途径是将数据或者更新日志复制到多台机器。 对数据密集型应用来说,复制操作开销太大,因为需要在集群网络间拷贝大量数据,而网络带宽又比内存小很多(慢),存储开销也不小。 另外,还带来了相当可观的存储开销。

相反,RDDs提供一种基于粗粒化变换(map/映射、filter/过滤、join/联合)的接口,所谓粗粒化变换就是对一堆数据执行同一操作。 这样,我们就有办法通过记录生成新数据集所需要的变换操作来实现容错性(复制操作定义),而不是复制实际数据。 如果某个RRD的一个分区(partition)丢失了,RDD有足够关于自己如何从其他RDD衍生而来的信息,发起重算就可以恢复丢失的分区。 因此,丢失的数据可以恢复,通常会很快,不需要高开销的复制操作。

尽管粗粒化变换接口一开始看起来比较局限,RDDs还是很好的适应大多数的并行计算应用,因为这些应用经常需要对大量数据元素应用同一操作。 再者,我们发现RDD足以表达很多集群计算模型,有些目前是作为独立系统开发的,包括MapReduceDryadLINQSQLPregel以及Hadoop。 当然,像交互式数据挖掘等现有系统没法有效解决的场景,RDD也可以自如应对。 RDD可以很好地适应我们先前提到的计算需求,这也证明了RDD模型的强大力量。

我们在一个名为Spark的计算平台上实现了RDD,已经被伯克利大学以及一些公司用来做一些研究甚至生产应用。 Spark提供了一个方便的面向语言编程接口,采用Scala语言,类似DryadLINQ。 此外,在Scala终端就可以使用Spark交互式地查询大数据集。 相信Spark是第一个可以用通用编程语言,在集群上做交互式内存数据挖掘的系统。

我们通过微基准测试以及用户应用测量来评估RDD以及Spark性能。 我们发现Spark在迭代式应用中比Hadoop要快20倍以上;真实数据分析速度可达40倍以上,可以在5-7秒的延迟内交互式地扫描1TB的数据集。 我们在Spark之上,以库的形式实现了Pregel以及HaLoop的编程模型(代码也就200行以内),包括分布优化。

本文从RDD简介(第2节)以及Spark简介(第3节)开始。 然后讨论RDD内部表示形式(第4节),实现方式(第5节)以及实验结果(第6节)。 最后讨论RDD如何占领现有集群计算模型(第7节)、调查一些相关工作(第8节)并总结。

2 弹性分布式数据集(RDDs)

这节是关于RDDs的概述。首先,我们介绍RDDs的定义(2.1小节)以及RDDsSpark中的编程接口(2.2小节)。 然后,我们将RDDs和细粒化共享内存模型进行对比(2.3小节)。 最后,我们讨论一下RDD模型的局限性。

2.1 RDD模型定义

通常,RDD是一个只读可分区的记录集合。 RDD只能通过对来自存储器的数据或者其他RDD应用某种可确定操作而产生。 为了与其他操作区分开,我们将这类操作成为变换(transformations)。 变换操作包括(map/映射、filter/过滤以及join/联合)。

RDD不需要一直存在。 相反,RDD有足够关于自己是如何从其他数据集派生(血缘)而来的信息,来从持久存储数据计算自己的分区。 这个特性很强大:理论上,程序不会应用故障时不能重建的RDD

最后,用户可以控制RDD的内存(persistence)以及分区(partitioning)。 用户知道哪些RDD会重用,因此可以为其选择合适的存储策略(比如内存存储)。 用户也可以让RDD元素以记录中的某个键为依据在不同机器上分区。 这在数据分布优化中非常有用,比如确保两个联合数据集使用同样的哈希分区方式。

2.2 Spark编程接口

Spark通过一种类似DryadLINQ的语言集成API对外提供RDD,数据集为对象,变换为调用对象方法。

开发者先通过对持久存储数据做一系列变换(mapfilter)定义一个或多个RDD。 随后就可以使用RDD对象的操作了,向应用返回一个值,或者导出到其他存储系统。 操作例子包括count,返回数据集元素数;collect返回数据集元素本身;save将数据集输出到外部存储系统。 跟DryadLINQ一样,Spark采用懒计算模式,在第一次使用时才计算RDD,这样便可以实现变换操作管道化。

此外,开发者可以调用一个persist操作,来标记一个未来希望重用的RDDSpark默认将这类RDD保存在内存中,但是在内存不足的情况下,也是可以将它们保存在内存上的。 用户也可以要求采用其他策略,比如只把RDD存储在磁盘上,或者分布在不同机器上。 最后,用户可以为每个RDD指定不同的优先级,来指定哪些内存数据先挪到磁盘。

2.2.1 案例:终端日志挖掘

假设一个web服务正面临错误的困扰,运维人员想在Hadoop文件系统中搜索TB级别的数据,以定位错误原因。 使用Spark,运维人员可以只将日志里的错误信息加载到一些节点的内存中,然后交互性地查询。 他或她很可能首先输入以下Scale代码:

1
2
3
lines = spark.textFile("hdfs://...")
errors = lines.filter(_.startsWith("ERROR"))
errors.persist()

第一行定义一个来源于HDFS文件的RDD(文本行集合),第二行则从中过滤生成一个新RDD。 第三行则将错误日志存储在内存中,以便后续多个查询都可以使用。 注意到,传给filter的参数是一个闭包,这是Scala的语法。

到这为止,集群其实还没有做任何实际性工作。 尽管如此,用户现在可以使用RDD进行操作了,例如统计消息数:

1
errors.count()

用户可以进一步对RDD做变换并取得结果,如下所示:

1
2
3
4
5
6
7
8
// 统计与MySQL相关的行数
errors.filter(_contains("MySQL")).count()

// 返回一个数组,包含所有与HDFS相关的日志时间
// 假设时间是日志第3个字段,以tab分隔
errors.filter(_.contains("HDFS"))
    .map(_.split('\t')(3))
    .collect()

在对errors做第一个操作之后,Spark就会将其所有分区保持在内存中,极大加快后续计算。 注意,最开始的RDD,也就是lines并不加载在内存中。 这是明智的做法,因为错误信息只是很小一部分,能够放进内存。

最后介绍一下这个模型如何实现容错性,请看Figure-1所示RDD血缘图。 在最后一个查询中,我们的起点是errorserrorslines的过滤结果。 我们对errors进一步过滤,然后用map转换,最后调用collect取得结果。 Spark调度器会对后面两个变换管道化,并向所有缓存errors分区的节点发送计算任务。 另外,如果errors某个分区丢失了,Spark则通过对lines对应分区应用filter来重建它。

2.3 RDD模型优势

我们通过对比分布式共享内存模型(Table-1),来理解RDD分布式内存抽象模型的优势。 在分布式共享内存系统中,应用读写是针对全局地址空间的特定位置。 这个模型定义,不止包括传统共享内存系统,也包括其他应用写共享状态片段的系统(Piccolo就是),以及分布式数据库。 分布式共享内存是一个非常通用的模型,但难以在一个商用集群上实现一个高效的容错模型。

RDDDSM的区别是,RDD只能通过大力度变换创建(写);而DSM可以直接对内存位置进行读写。 这个限制了RDD应用只能采用批量写,但是容错实现起来也更高效了。 特别是,RDD做检查点时,不会引入额外开销,因为通过血缘图就可以快速恢复。 另外,失败时只有丢失的分区需要重算,并且可以在不同的节点上并行进行,不需要回滚整个程序。

RDD第二个优势是不可修改特性让慢节点迁移成为可能,只需要像MapReduce那样跑慢任务副本。 对DSM系统来说,备份实现难度很大,因为不同任务可能对同一内存进行修改,互相干扰。

最后,RDD还有两个DSM没有的有点: 第一,在RDD批量操作,任务可以根据数据本地性进行调度,以提高性能。 第二,在内存不足时,只要只做扫描型操作RDD就可以优雅降级。 对于无法放进内存的分区,至少也可以达到与当前并行数据处理系统相近的性能。

2.4 不适合RDD的应用

简介已经提及,RDD最适合对同一数据集所有元素应用同一操作的批量处理应用。 这种场景下,RDD可以轻松将变换作为血缘图的一个步骤记录下来,不必记录大量数据就支持恢复丢失分区。 RDD在对一个共享状态做小粒度异步更新的场景就不那么友好了,比如web应用的存储系统或者增量网络爬虫。 这类应用,使用传统带更新日志以及数据检查点(checkpointing)的系统更合适,包括数据库、内存云(RAMCloud)。 我们的目标是提供为批处理设计一种高效的编程模型,而将异步应用留给其他专用系统。

3 Spark编程接口

Spark通过一个Scala实现的语言集成API(类似DryadLINQ)提供RDD模型。 Scala是一种跑在Java虚拟机之上的静态函数式编程语言。 使用Scala同时照顾到了简洁性(交互式使用时非常简便)以及效率(静态类型)。 RDD并不要求编程语言一定要是函数式的。

使用Spark时,开发人员需要写一个驱动程序连接到工作集群,如图Figure-2所示。 程序定义了一个或多个RDD,并且调用它们的一些操作。 Spark驱动器代码还需要记录RDD间的血缘关系。 工作节点是一些常驻进程,将RDD分区保存在内存中,可以跨操作访问。

2.2.1节已经提及,用户将闭包作为参数传递给RDD的操作方法,如mapScalaJava对象表示闭包,可以序列化并通过网络传递给其他节点。 与闭包绑定的变量,Scala将其作为字段保存在闭包对象中。 因此,我们可以写var x = 5; rdd.map(_ + x)这样的代码,对RDD的每个元素加5

RDD本身是静态类型对象,参数为对象类型。 举例,RDD[Int]表示由整型元素组成的RDD。 我们大部分例子不会显示指定类型,因为Scala支持类型推导。

尽管我们在Scala中暴露RDD的做法从概念上将非常简单,还是需要通过反射(reflection)来使用闭包对象的特性。 为了让Spark能在Scala解析器中直接使用,我们也需要做一些额外工作,5.2节会介绍。 好在,就算这样也不至于要修改Scala编译器代码。

3.1 Spark中的RDD操作

Table-2列举了Spark提供的主要RDD变换和操作。 我们给出了每个操作的签名,方括号内是参数类型。 需要特别注意,变换操作(transformation)是定义新RDD的懒操作,而即时操作(action)启动计算,返回一个值或者向外部存储输出数据。

注意到,有些操作,如join,只对key-valueRDD有效。 操作名字也有讲究,匹配Scala API以及其他函数式语言。 例如,map是一对一映射;而flatMap是一对多(与MapReduce中的类似)。

除了这些操作,用户还可以要求RDD持续(存在内存)。 此外,用户可以获得RDD分区顺序(由Partitioner类表示),并基于这个划分另一个数据集。 groupByKeyreduceByKey以及sort自动生成一个哈希或者范围分区RDD

3.2 应用实例

作为2.2.1节数据挖掘案例的补充,我们再介绍两个交互式应用:逻辑回归(logistic regression)和RageRank。 后者,我们将看到如果通过RDD分区控制提升性能。

3.2.1 逻辑回归

很多机器学习算法天生就是迭代式的——需要迭代地跑优化程序,比如梯度下降(gradient descent),来最大化一个函数。 将数据保持在内存中,迭代起来要快很多。

下面我们举个例子,实现逻辑回归——一个通用分类算法,为两个点集合(比如垃圾邮件和非垃圾邮件)寻找一个界面(hyperplane)w。 算法采用梯度下降:从一个随机向量w开始,每次迭代对所有数据应用一个与w相关的函数并求和,此次来优化w

1
2
3
4
5
6
7
8
9
val points = spark.textFile("...")
                .map(parsePoint).persist()
var w = xxxx // random initial vector
for (i <- 1 to ITERATIONS) {
    val gradient = points.map(
        p => p.x * (1/(1+exp(-p.y*(w dot p.x)))-1)*p.y
    ).reduce((a, b) => a+b)
    w -= gradient
}

开始我们对文本文件每一行应用map方法解析成Point对象,生成一个内存RDD。 接下来,在一个循环里对点集合应用mapreduce变换累加当前w的函数来计算每一步的梯度。 在6.1节中,我们将看到,将数据点保持在内存中,迭代速度可以提升20倍以上。

3.2.2 PageRank

另一种更复杂的数据共享模型是PageRank。 在Spark上,我们可以这样写PageRank

1
2
3
// 将链接图加载成由(URL, 外链)对组成的RRD
val links = spark.textFile("...").map("...").persist()
var ranks = xxxx // (URL,等级)对RDD

这个程序得到如Figure-3所示的RDD血缘图。 每次迭代,我们根据上次迭代的等级(ranks)和贡献值(contribs),以及静态链接集计算出新等级(ranks)RDD。 有意思的是,随着迭代次数增多,血缘图也在变长。 因此,对一个需要多次迭代的作业,需要将一些ranks版本进行复制持久,以降低故障恢复时间。 用户可以用RELIABLE标志调用persist方法来实现。 相反,links数据集就不需要持久化,因为其分区的重建只需要对输入文件对应快应用map即可,非常高效。 因为一个文档可能有很多链接,而其等级只是一个数字,links数据集比ranks数据集大很多,通过血缘关系进行恢复比将整个程序内存状态进行快照更省时间。

接着,我们可以控制RDD分区方式来优化PageRank应用的通讯。 如果我们已经为links指定了一种分区方式(哈希分区),我们可以对ranks采用相同的分区方式,确保对linksranksjoin操作不需要通讯(数据在同一机器上)。 我们也可以写一个自定义Partitioner

1
links = spark.textFile(...).map(...).partitionBy(myPartFunc).persist()

4 RDD表现形式

RDD模型设计另一个挑战是挑选一种表现形式,可以跟踪经过一系列变换后的血缘关系。 理想状态下,一个实现RDD的系统需要提供一个尽可能完善的变换操作集(多多益善),而且用户可以随意组合。 我们提出了一种基于图的RDD表示方法,很好地实现了这一目标。 我们在Spark中采用了这种方法,支持大量的变换方式,而不需要为不同的变换加上特殊的调度逻辑,极大地简化了系统的设计。

简而言之,我们通过以下5个基本要素来定义一个RDD:①分区集合,数据集的原子组成部分;②父RDD依赖;③转化函数(从父RDD计算得到本RDD);④分区以及数据分布元数据。 举例,一个表示HDFS文件的RDD,每个数据块对应一个分区,分区知道数据块分布在哪个节点上。 显然,对一个RDD应用map方法,相当于对每个分区应用map方法,因而结果有相同的分区数。

这个接口设计最有趣的问题是,怎么表示RDD间的依赖关系。 我们发现,将依赖分成两种比较有用:窄依赖,父RDD一个分区最多只被子RDD一个分区依赖;宽依赖,被子RDD多个分区所依赖。 很显然,map操作就是一个窄依赖,而join操作就是一个宽依赖(除非父RDD是哈希分区的)。 图4展示其他例子。

这样区分依赖出于两方面考虑: 首先,窄依赖可以在(一个)集群节点上做流水线式执行,计算每一个分区数据。 举个例子,我们可以先应用map操作,然后应用filter操作,一个接着一个。 相反,做宽依赖计算时,所有父分区都要可用,因而需要在不同节点间传递数据(类似MapReduce操作)。 其次,对窄依赖来说,节点故障恢复更为简单——只有丢失的父分区需要重算,而且可以在多个节点上并行进行。 但对于宽依赖来说,一个节点故障会导致所有祖先节点中某些分区不可用(数据分散分布),因此需要全部重算。 对一个图式的宽依赖来说,一个节点故障将导致所有,只有全部重算。

在通用接口之下,Spark大部分变换都可以用不到20行代码实现。 甚至于,Spark新手不需要理解调度器的工作细节,就可以实现新的变换(比如,取样(sampling)或者自定义的联合(join))。 接下来,我们快速介绍一些RDD实现。

HDFS文件:在我们的例子中,输入RRDHDFS文件。 这些RRDpartitions方法返回一个分区列表,每个文件块一个(块偏移量(offset)存储在分区对象中)。 preferredLocations返回块所在节点,iterator方法读取数据块。

map:对任一RDD调用map变换后等到一个映射RDD。 对父对象所有记录应用传给map的方法,则得到子对象。因此,子对象和父对象有相同的分区数和意向存储位置。

union:对两个RDD调用union操作,生成一个新RDD,分区是两个父RDD的并集。 每个子对象分区由对应的父分区通过窄依赖计算而来。

samplesample(采样)操作与map类似,只不过子RRD为每个分区设置了一个随机数生成器,用来随机从父记录中抽样。

join:联合两个RRD可能产生两个窄依赖(如果都是用同一个哈希(hash)/区间(range)分区器),两个宽依赖,或者混合(只有一个父对象使用区间划分器)。 不管哪种情况,子RDD都有一个分区器(从父RDD继承或者默认的哈希分区器)。

5 实现

我们用大概14000Scala代码实现了Spark。 系统跑在Mesos集群管理器之上,可以与HadoopMPI或者其他计算应用共享资源。 Spark应用作为独立的Mesos应用运行,有自己的驱动(master)和工作节点,应用间资源共享由Mesos处理。

通过Hadoop提供的输入插件APISpark能够从任何Hadoop数据源(HDFSHBase)读取数据,Scala版本要保持一致。

接下来,我们讲解系统几个重要的组成部分:作业调度器(job scheduler)、交互式Spark解析器、内存管理以及检查点(checkpointing)。

5.1 作业调度

Spark作业调度器使用我们再第4节介绍的RDD表示形式。

总体来说,调度器与Dryad类似,加上对RDD分区是否在内存的跟踪。 当用户对一个RDD调用一个操作(count或者save)时,调度器检查RDD的血缘图,来生成一个描述运行阶段的有向无环图(DAG)。 每个阶段(stage)可以包含任意只有窄依赖的多流水线式变换。 阶段边界则是需要宽依赖的洗牌式变换, 调度器接着加载任务计算每个阶段的缺失分区,直到它计算出整个目标RDD

调度器使用延迟调度算法,基于数据位置,分配计算任务。 如果有一个任务需要处理的分区数据刚好在某个节点的内存中,将任务发送到这个节点即可。 相反,如果分区所属RDD本身有偏好位置,则发送到偏好位置上。

对于宽依赖,我们通过在父分区所在节点上实现中间记录,来简化故障恢复,跟MapReduce实现map类似。

对于任务失败的情况,只要该阶段的父阶段还可用,我们就可以在另一个节点上重跑。 如果有些父阶段不可用了,我们重新提交任务,来并行计算所有缺少的分区。 我们没有做调度器容错,虽然负责RDD血缘关系很直观。

最后,虽然现在所有Spark计算任务都由一个驱动程序驱动,我们也在实验让集群任务调用一个lookup方法随机访问哈希分区RDD元素的做法。 在这种场景下,如果有一个必要的分区缺失,任务需要通知调度器来计算。

5.2 解析器集成

Scala提供了一个交互式shell,类似RubyPython。 鉴于内存数据低延迟的特性,我们想让用户在解析器中运行Spark即时任务来查询大数据集。

5.3 内存管理

SparkRDD持久化提供3种选择:反序列化Java对象形式的内存存储、序列化形式的内存存储以及磁盘存储。 第一种访问性能最快,因为Java虚拟机可以直接访问RDD对象。 第二种与Java对象相比,更加省内存,当然了访问性能稍差一些。 第三种对于重算代价大,又放不进内存的RDD特别有用。

为了管理有限的内存,我们在RDD这一级应用了一个基于LRU的剔除策略。 当一个新RDD计算出来后而存储内存不足时,我们剔除一个最近最少用来的分区,除非分区跟新分区属于同一个RDD。 这种情况下,需要将就分区保持在内存中,避免循环加载剔除。 这点很重要,因为大部分操作是在整个RDD上执行的,因此,在内存的旧分区未来很有可能需要用到。 默认策略在我们的应用上表现不错,但是我们还是通过每个RDD的存储优先级配置,给用户更多选择。

每个Spark实例有自己独立的内存空间。 未来,我们计划通过统一内存管理引入一个可以在不同实例间共享的RDD

5.4 检查点支持

虽然故障后血缘图可以用来恢复RDD,但是这种恢复操作可能很费时,特别是对血缘链很长的RDD来说。 因此,为某些RDD做磁盘检查点(快照)很有意义。

Spark现在提供了一个做检查点的API(传给persistREPLICATE标志),由用户决定哪些数据做检查点。 当然,我们也在探索如何自动做检查点。 调度器知道每个数据集的规模以及计算数据集所需要的时间,因此,完全有可能选择一个最优的RDD集合来做检查点,以最小化系统恢复时间。

最后,注意到RDD只读的特性使得检查点做起来比一般共享内存简单很多。 由于一致性不需要考虑,RDD可以在后台写出,不用暂停程序或者使用分布式快照技巧。

6 评估

7 讨论

8 相关工作

9 结论

鸣谢

引用文献

Comments