fasionchan

读万卷书,行万里路,品万味肴,撸万行码。

[译文]Spark快速入门教程

| Comments

本文提供一个快速使用Spark的指引。 我们会先通过交互式终端介绍API(使用Python语言),然后介绍怎么用JavaScala或者Python编写一个应用。

开始之前,需要先从Spark官网下载一份Spark发行包。 由于我们不会直接使用HDFS,你可以不用管Hadoop版本。

注意到,Spark2.0之前,主要的编程接口是RDD(Resilient Distributed Dataset/弹性分布式数据集)。 2.0以后,RDD被另一种类似数据集取代,支持强类型而且自带更丰富的优化。 RDD接口依然有效,完整的参考文档在:RDD编程指南。 然而,我们还是推荐使用新版数据集,性能比RDD更好。 更多关于数据集的信息,请参考:SQL编程指南

使用Spark终端做交互式分析

Spark终端(shell)提供一种学习API的快捷方式,以及一系列用于交互式数据分析的强大工具。 它提供了Python版以及Scala(跑在Java虚拟机之上,可以直接使用Java类库)版。 以Python为例,在Spark目录中运行以下命令:

1
./bin/pyspark

Spark最主要的抽象是分布式对象集合,成为数据集。 数据集既可以从Hadoop产生(例如HDFS文件),也可以是从其他数据集转化而来的。 由于Python非常动态的特性,数据集不必是严格的Python类型。 所有数据集在Python里面都是行式数据集,称为DataFrame(数据帧),与pandasR中的概念类似。 我们先从代码目录中读入README文件内容来生成一个新的DataFrame

1
>>> textFile - spark.read.text('README.md')

之后,你可以直接从DataFrame取值,调用一些处理动作,或者将其变换成一个新的DataFrame。 更多细节,请参考API文档

1
2
3
4
5
>>> textFile.count()    # DataFrame行数
103

>>> textFile.first()    # DataFrame第一行数据
Row(value=u'# Apache Spark')

现在,我们尝试将其变换成一个新的DataFrame。我们调用filter方法来生成一个新DataFrame,内容为文件的一个子集。

1
>>> linesWithSpark = textFile.filter(textFile.value.contains('Spark'))

当然可以用链式写法连接变换和操作:

1
2
>>> textFile.filter(textFile.value.contains('Spark')).count()    # 有多少行包含"Spark"?
20

更多数据集操作

数据集操作和变换可以用来做更复杂的计算(组合)。 假设我们要找出词数最多的行:

1
2
3
>>> from pyspark.sql.functions import *
>>> textFile.select(size(split(textFile.value, '\s+')).name('numWords')).agg(max(col('numWords')).name('mostWords')).collect()
[Row(mostWords=22)]

第一步是将每一行映射成一个数值,并命名为numWords,并生成了一个新DataFrame。 然后调用新DataFrame.agg方法,找出最大的词数。 selectagg接收的参数都是,我们可以用数据集名.列名这种方式从DataFrame取出指定列。 注意到,我们从pyspark.sql.functions导入很多有用函数,用来对指定列做一些处理并生成一个新列。

最常见的数据流模式是MapReduceHadoop就是这样。 Spark也可以轻松实现一个MapReduce数据流:

1
wordCounts = textFile.select(explode(split(textFile.value, '\s+')).alias('word')).groupBy('word').count()

在这里,我们在select函数里头使用了explode函数,将一个行式数据集变换成一个单词数据集。 然后,组合groupBy函数和count函数统计单词出现次数,生成一个新的DataFrame,包括两列:word为单词以及count为次数。 最后我们可以调用collect函数,取出统计结果:

1
2
>>> wordCounts.collect()
[Row(word=u'online', count=1), Row(word=u'graphs', count=1), Row(word=u'["Parallel', count=1), ...]

缓存

Spark还支持将数据集推到集群内存缓存中。这在数据需要重复访问时特别有用,比如需要查询一小块热数据,或者跑一个类似PageRank的迭代算法。 下面这个简单的例子,将我们的linesWithSpark数据集标志为缓存的:

1
2
3
4
5
6
7
8
>>> linesWithSpark.cache()
DataFrame[value: string]

>>> linesWithSpark.count()
20

>>> linesWithSpark.count()
20

实际上,用Spark来分析和缓存一个只有100行的文件显得有点傻(大材小用/牛刀杀鸡)。 我们感兴趣的是,这些函数可以用于非常大的数据集,就算分布在几十甚至几百台机器上也可以。 你同样可以用bin/spark连接到一个集群做一些交互式操作,具体参考:RDD编程指南

独立应用

接下来,以Python(PySpark)为例,介绍如何使用Spark API来实现独立应用。

我们新建一个简单的Spark应用,名为SimpleApp.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from pyspark.sql import (
    SparkSession,
)

logFile = 'YOUR_SPARK_HOME/README.md'
spark = SparkSession.builder.appName(__name__).getOrCreate()
logData = spark.read.text(logFile).cache()

numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()

print('Lines with a: %i, lines with b: %i' % (numAs, numBs))

spark.stop()

这个程序只是统计包含字母a和字母b的行数。 请注意,你需要将YOUR_SPARK_HOME换成Spark目录(你安装Spark的地方)。 跟ScalaJava例子一样,我们使用SparkSession来新建数据集。 对于使用自定义类或者第三方库的应用,我们也可以通过spark-submit命令--py-files参数指定代码依赖(所有文件打包成zip包),运行spark-submit --help查看更多帮助信息。 SimpleApp是一个简单的例子,我们无须指定任何代码依赖。

最后,我们使用bin/spark-submit脚本将应用跑起来:

1
2
3
4
$ YOUR_SPARK_HOME/bin/spark-submit --master local[4] SimpleApp.py
...
Lines with a: 61, lines with b: 30
...

下一步

先恭喜你完成第一个Spark应用!

  • 更深入学习API可以从RDD编程指南以及SQL编程指南
  • 在一个集群上跑应用,可以参考部署指引
  • Sparkexample目录提供若干个例子(ScalaJavaPythonR),你可以试着运行:
1
2
3
4
5
6
7
8
# Scala和Java,使用run-example:
./bin/run-example SparkPi

# Python直接使用spark-submit
./bin/spark-submit examples/src/main/python/pi.py

# R也是直接使用spark-submit
./bin/spark-submit examples/src/main/r/dataframe.R

参考资料

Comments