Apache Spark 核心编程


Spark Core 是整个项目的基础。它提供分布式任务分派、调度和基本 I/O 功能。 Spark 使用称为 RDD(弹性分布式数据集)的专用基础数据结构,它是跨机器分区的数据的逻辑集合。可以通过两种方式创建 RDD;一是通过引用外部存储系统中的数据集,二是通过在现有 RDD 上应用转换(例如 map、filter、reducer、join)。

RDD 抽象是通过语言集成的 API 公开的。这简化了编程复杂性,因为应用程序操作 RDD 的方式类似于操作本地数据集合。

火花壳


Spark提供了一个交互式shell:一个交互式分析数据的强大工具。它以 Scala 或 Python 语言提供。 Spark 的主要抽象是称为弹性分布式数据集 (RDD) 的分布式项目集合。 RDD 可以从 Hadoop 输入格式(例如 HDFS 文件)或通过转换其他 RDD 来创建。

打开火花壳

以下命令用于打开 Spark shell。

$ spark-shell

创建简单的 RDD

让我们从文本文件创建一个简单的 RDD。使用以下命令创建一个简单的 RDD。

scala> val inputfile = sc.textFile(“input.txt”)

上述命令的输出是

inputfile: org.apache.spark.rdd.RDD[String] = input.txt MappedRDD[1] at textFile at <console>:12

Spark RDD API 引入了一些 转型 and few Actions 操纵RDD。

RDD 转换


RDD 转换返回指向新 RDD 的指针,并允许你在 RDD 之间创建依赖关系。依赖链(String of Dependencies)中的每个 RDD 都有一个计算其数据的函数,并有一个指向其父 RDD 的指针(依赖)。

Spark 是惰性的,因此除非你调用一些会触发作业创建和执行的转换或操作,否则不会执行任何操作。查看以下单词计数示例的片段。

因此,RDD 转换不是一组数据,而是程序中的一个步骤(可能是唯一的步骤),告诉 Spark 如何获取数据以及如何处理数据。

下面给出了 RDD 转换的列表。

S.No 转变与意义
1

地图(功能)

返回一个新的分布式数据集,通过将源的每个元素传递给一个函数来形成 func .

2

过滤器(函数)

返回通过选择源的那些元素形成的新数据集 func 返回真。

3

平面图(函数)

类似于 map,但每个输入项可以映射到 0 个或多个输出项(所以 func 应该返回一个 Seq 而不是单个项目)。

4

地图分区(功能)

与map类似,但在RDD的每个分区(块)上分别运行,所以 func 在 T 类型的 RDD 上运行时,必须是 Iterator ⇒ Iterator 类型。

5

mapPartitionsWithIndex(func)

类似于map Partitions,但也提供 func 用一个整数值表示分区的索引,所以 func 在 T 类型的 RDD 上运行时,必须是 (Int, Iterator) ⇒ Iterator 类型。

6

样本(带替换、分数、种子)

Sample a fraction 使用给定的随机数生成器种子,无论是否替换,数据的数量。

7

联合(其他数据集)

返回一个新数据集,其中包含源数据集中元素和参数的并集。

8

intersection(otherDataset)

返回一个新的 RDD,其中包含源数据集中元素和参数的交集。

9

不同的([numTasks])

返回包含源数据集的不同元素的新数据集。

10

groupByKey([numTasks])

在 (K, V) 对的数据集上调用时,返回 (K, Iterable) 对的数据集。

注意 :如果你为了对每个key进行聚合(例如求和或平均)进行分组,使用reduceByKey或aggregateByKey会产生更好的性能。

11

reduceByKey(func, [numTasks])

在 (K, V) 对的数据集上调用时,返回 (K, V) 对的数据集,其中每个键的值使用给定的 reduce 函数聚合 func ,它的类型必须是 (V, V) ⇒ V。就像在 groupByKey 中一样,reduce 任务的数量可以通过可选的第二个参数进行配置。

12

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

当在 (K, V) 对的数据集上调用时,返回 (K, U) 对的数据集,其中每个键的值使用给定的组合函数和中性“零”值聚合。允许不同于输入值类型的聚合值类型,同时避免不必要的分配。与 groupByKey 一样,reduce 任务的数量可以通过可选的第二个参数进行配置。

13

sortByKey([升序], [numTasks])

当在 K 实现 Ordered 的 (K, V) 对数据集上调用时,返回由键按升序或降序排序的 (K, V) 对数据集,如布尔升序参数中指定的那样。

14

加入(其他数据集,[numTasks])

当在 (K, V) 和 (K, W) 类型的数据集上调用时,返回 (K, (V, W)) 对的数据集,其中每个键的所有元素对。通过 leftOuterJoin、rightOuterJoin 和 fullOuterJoin 支持外连接。

15

cogroup(其他数据集,[numTasks])

当在 (K, V) 和 (K, W) 类型的数据集上调用时,返回 (K, (Iterable, Iterable)) 元组的数据集。此操作也称为 group With。

16

笛卡尔(其他数据集)

在 T 和 U 类型的数据集上调用时,返回 (T, U) 对(所有元素对)的数据集。

17

管道(命令,[envVars])

通过 shell 命令对 RDD 的每个分区进行管道传输,例如Perl 或 bash 脚本。 RDD 元素被写入进程的标准输入,输出到标准输出的行作为字符串的 RDD 返回。

18

合并(numPartitions)

将 RDD 中的分区数减少到 numPartitions。对于过滤大型数据集后更有效地运行操作很有用。

19

重新分区(numPartitions)

随机重新排列 RDD 中的数据以创建更多或更少的分区并在它们之间进行平衡。这总是对网络上的所有数据进行洗牌。

20

repartitionAndSortWithinPartitions(分区器)

根据给定的分区器对 RDD 进行重新分区,并在每个生成的分区中,按记录的键对记录进行排序。这比调用 repartition 然后在每个分区内排序更有效,因为它可以将排序向下推到 shuffle 机器中。

Actions


下表给出了返回值的操作列表。

S.No 行动与意义
1

减少(函数)

使用函数聚合数据集的元素 func (它接受两个参数并返回一个)。该函数应该是可交换的和关联的,以便可以并行正确计算。

2

收集()

在驱动程序中将数据集的所有元素作为数组返回。这通常在过滤器或其他返回足够小的数据子集的操作之后很有用。

3

count()

返回数据集中元素的数量。

4

first()

返回数据集的第一个元素(类似于 take (1))。

5

take(n)

返回第一个数组 n 数据集的元素。

6

takeSample (withReplacement,num, [seed])

返回一个随机样本的数组 num 数据集的元素,有或没有替换,可选地预先指定一个随机数生成器种子。

7

takeOrdered(n, [排序])

返回第一个 n 使用它们的自然顺序或自定义比较器的 RDD 元素。

8

保存文本文件(路径)

将数据集的元素作为文本文件(或文本文件集)写入本地文件系统、HDFS 或任何其他 Hadoop 支持的文件系统的给定目录中。 Spark 在每个元素上调用 toString 以将其转换为文件中的一行文本。

9

saveAsSequenceFile(path)(Java 和 Scala)

将数据集的元素作为 Hadoop SequenceFile 写入本地文件系统、HDFS 或任何其他 Hadoop 支持的文件系统中的给定路径中。这在实现 Hadoop 的 Writable 接口的键值对的 RDD 上可用。在 Scala 中,它也可用于可隐式转换为 Writable 的类型(Spark 包括基本类型的转换,如 Int、Double、String 等)。

10

saveAsObjectFile(path) (Java 和 Scala)

使用 Java 序列化以简单格式写入数据集的元素,然后可以使用 SparkContext.objectFile() 加载。

11

计数键()

仅适用于 (K, V) 类型的 RDD。返回 (K, Int) 对的哈希图以及每个键的计数。

12

foreach(函数)

运行一个函数 func 在数据集的每个元素上。这通常是针对诸如更新累加器或与外部存储系统交互等副作用而完成的。

注意 :在 foreach() 之外修改 Accumulators 以外的变量可能会导致未定义的行为。有关更多详细信息,请参阅了解闭包。

使用 RDD 编程


让我们借助一个示例来看看 RDD 编程中少数 RDD 转换和操作的实现。

例子

考虑一个字数统计示例:它计算文档中出现的每个单词。将以下文本视为输入并保存为 输入.txt 主目录中的文件。

输入.txt : 输入文件。

people are not as beautiful as they look, 
as they walk or as they talk.
they are only as beautiful  as they love, 
as they care as they share.

按照下面给出的过程执行给定的示例。

打开 Spark-Shell

以下命令用于打开 spark shell。通常,spark 是使用 Scala 构建的。因此,Spark 程序运行在 Scala 环境中。

$ spark-shell

如果 Spark shell 成功打开,你将找到以下输出。查看输出的最后一行“Spark context available as sc”表示 Spark 容器是自动创建的 Spark 上下文对象,名称为 sc .在开始程序的第一步之前,应该创建 SparkContext 对象。

Spark assembly has been built with Hive, including Datanucleus jars on classpath 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 
15/06/04 15类似于 map,但每个输入项可以映射到 0 个或多个输出项(所以22 INFO SecurityManager: Changing view acls to: hadoop 
15/06/04 15类似于 map,但每个输入项可以映射到 0 个或多个输出项(所以22 INFO SecurityManager: Changing modify acls to: hadoop 
15/06/04 15类似于 map,但每个输入项可以映射到 0 个或多个输出项(所以22 INFO SecurityManager: SecurityManager: authentication disabled;
    ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/04 15类似于 map,但每个输入项可以映射到 0 个或多个输出项(所以22 INFO HttpServer: Starting HTTP Server 
15/06/04 15类似于 map,但每个输入项可以映射到 0 个或多个输出项(所以23 INFO Utils: Successfully started service 'HTTP class server' on port 43292. 
Welcome to 
        ____              __
     / __/__  ___ _____/ /__ 
    _\ \/ _ \/ _ `/ __/  '_/ 
    /___/ .__/\_,_/_/ /_/\_\   version 1.4.0
        /_/
		
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71) 
Type in expressions to have them evaluated. 
Spark context available as sc 
scala>

创建一个 RDD

首先,我们必须使用 Spark-Scala API 读取输入文件并创建一个 RDD。

以下命令用于从给定位置读取文件。在这里,新的 RDD 以 inputfile 的名称创建。在 textFile(“”) 方法中作为参数给出的字符串是输入文件名的绝对路径。但是,如果只给出文件名,则表示输入文件在当前位置。

scala> val inputfile = sc.textFile("input.txt")

执行字数转换

我们的目标是计算文件中的单词。创建一个平面图,将每行拆分为单词( flatMap(line ⇒ line.split(“ ”) ).

接下来,将每个单词读取为具有值的键 ‘1’ ( = ) 使用映射函数 ( 地图(字⇒(字,1) ).

最后,通过添加相似键的值来减少这些键( reduceByKey(_+_) ).

以下命令用于执行字数统计逻辑。执行这个之后,你不会发现任何输出,因为这不是一个动作,这是一个转换;指向一个新的 RDD 或告诉 spark 如何处理给定的数据)

scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);

当前RDD

在使用 RDD 时,如果你想了解当前 RDD,请使用以下命令。它将向你显示有关当前 RDD 及其调试依赖项的描述。

scala> counts.toDebugString

缓存转换

你可以使用persist() 或cache() 方法将RDD 标记为持久化。第一次在动作中计算时,它将保存在节点的内存中。使用以下命令将中间转换存储在内存中。

scala> counts.cache()

应用动作

应用一个动作,比如存储所有的转换,结果到一个文本文件中。 saveAsTextFile(“ ”) 方法的 String 参数是输出文件夹的绝对路径。尝试使用以下命令将输出保存在文本文件中。在以下示例中,“输出”文件夹位于当前位置。

scala> counts.saveAsTextFile("output")

检查输出

打开另一个终端以转到主目录(在另一个终端中执行 spark)。使用以下命令检查输出目录。

[hadoop@localhost ~]$ cd output/ 
[hadoop@localhost output]$ ls -1 
 
part-00000 
part-00001 
_SUCCESS

以下命令用于查看输出 Part-00000 files.

[hadoop@localhost output]$ cat part-00000
(people,1) 
(are,2) 
(not,1) 
(as,8) 
(beautiful,2) 
(they, 7) 
(look,1) 

以下命令用于查看输出 零件-00001 files.

[hadoop@localhost output]$ cat part-00001 
(walk, 1) 
(or, 1) 
(talk, 1) 
(only, 1) 
(love, 1) 
(care, 1) 
(share, 1) 

联合国坚持储存


在 UN-persisting 之前,如果你想查看此应用程序使用的存储空间,请在浏览器中使用以下 URL。

http:// 本地主机:4040

你将看到以下屏幕,其中显示了在 Spark shell 上运行的应用程序使用的存储空间。

storage space

如果要 UN-persist 特定 RDD 的存储空间,请使用以下命令。

Scala> counts.unpersist()

你将看到如下输出:

15/06/27 00管道(命令,[envVars])33 INFO ShuffledRDD: Removing RDD 9 from persistence list 
15/06/27 00管道(命令,[envVars])33 INFO BlockManager: Removing RDD 9 
15/06/27 00管道(命令,[envVars])33 INFO BlockManager: Removing block rdd_9_1 
15/06/27 00管道(命令,[envVars])33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810) 
15/06/27 00管道(命令,[envVars])33 INFO BlockManager: Removing block rdd_9_0 
15/06/27 00管道(命令,[envVars])33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106) 
res7: cou.type = ShuffledRDD[9] at reduceByKey at <console>:14

要验证浏览器中的存储空间,请使用以下 URL。

http:// 本地主机:4040/

你将看到以下屏幕。它显示了在 Spark shell 上运行的应用程序使用的存储空间。

Storage space for application