Apache Spark 部署


Spark 应用程序,使用 spark-submit,是一个用于在集群上部署 Spark 应用程序的 shell 命令。它通过一个统一的接口使用所有各自的集群管理器。因此,你不必为每个应用程序配置。

例子


让我们以之前使用 shell 命令的字数统计为例。在这里,我们考虑与 Spark 应用程序相同的示例。

样本输入

以下文本是输入数据,文件名为 in.txt .

people are not as beautiful as they look, 
as they walk or as they talk. 
they are only as beautiful  as they love, 
as they care as they share.

看下面的程序:

SparkWordCount.scala

import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark._  

object SparkWordCount { 
    def main(args: Array[String]) {

        val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map())
		
        /* local = master URL; Word Count = application name; */
        /* /usr/local/spark = Spark Home; Nil = jars; Map = environment */
        /* Map = variables to work nodes */
        /*creating an inputRDD to read text file (in.txt) through Spark context*/
        val input = sc.textFile("in.txt")
        /* Transform the inputRDD into countRDD */
		
        val count = input.flatMap(line ⇒ line.split(" "))
        .map(word ⇒ (word, 1))
        .reduceByKey(_ + _)
       
        /* saveAsTextFile method is an action that effects on the RDD */
        count.saveAsTextFile("outfile")
        System.out.println("OK");
    }
} 

将上述程序保存到一个名为 SparkWordCount.scala 并将其放置在用户定义的目录中,名为 火花应用 .

注意 : 在将 inputRDD 转换为 countRDD 时,我们使用 flatMap() 将行(来自文本文件)标记为单词,使用 map() 方法计算词频,使用 reduceByKey() 方法计算每个单词的重复次数。

使用以下步骤提交此申请。执行所有步骤 火花应用 通过终端目录。

第 1 步:下载 Spark Ja

编译需要 Spark 核心 jar,因此,请从以下链接下载 spark-core_2.10-1.3.0.jar 火花芯罐 并将jar文件从下载目录移动到 火花应用 目录。

第二步:编译程序

使用下面给出的命令编译上述程序。此命令应从 spark-application 目录执行。这里, /usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar 是从 Spark 库中获取的 Hadoop 支持 jar。

$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala

第 3 步:创建一个 JAR

使用以下命令创建 spark 应用程序的 jar 文件。这里, 字数 是 jar 文件的文件名。

jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar

第四步:提交火花申请

使用以下命令提交 spark 应用程序:

spark-submit --class SparkWordCount --master local wordcount.jar

如果它执行成功,那么你会发现下面给出的输出。这 OK 输入以下输出用于用户识别,这是程序的最后一行。如果你仔细阅读下面的输出,你会发现不同的东西,比如:

  • 在端口 42954 上成功启动服务“sparkDriver”
  • MemoryStore 开始时容量为 267.3 MB
  • 在 http://192.168.1.217:4040 启动 SparkUI
  • 添加JAR文件:/home/hadoop/piapplication/count.jar
  • ResultStage 1 (saveAsTextFile at SparkPi.scala:11) 在 0.566 秒内完成
  • 在 http://192.168.1.217:4040 停止 Spark Web UI
  • MemoryStore 已清除
15/07/08 13spark://host:port、mesos://host:port、yarn 或本地。04 INFO Slf4jLogger: Slf4jLogger started 
15/07/08 13spark://host:port、mesos://host:port、yarn 或本地。04 INFO Utils: Successfully started service 'sparkDriver' on port 42954. 
15/07/08 13spark://host:port、mesos://host:port、yarn 或本地。04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp:// sparkDriver@192.168.1.217:42954]
15/07/08 13spark://host:port、mesos://host:port、yarn 或本地。04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB 
15/07/08 13spark://host:port、mesos://host:port、yarn 或本地。05 INFO HttpServer: Starting HTTP Server 
15/07/08 13spark://host:port、mesos://host:port、yarn 或本地。05 INFO Utils: Successfully started service 'HTTP file server' on port 56707. 
15/07/08 13spark://host:port、mesos://host:port、yarn 或本地。06 INFO SparkUI: Started SparkUI at http:// 192.168.1.217:4040
15/07/08 13spark://host:port、mesos://host:port、yarn 或本地。07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http:// 192.168.1.217:56707/jars/count.jar,时间戳为 1436343967029
15/07/08 13spark://host:port、mesos://host:port、yarn 或本地。11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader 
15/07/08 13spark://host:port、mesos://host:port、yarn 或本地。11 INFO HadoopRDD: 输入 split: file:/home/hadoop/piapplication/in.txt:0+54 
15/07/08 13spark://host:port、mesos://host:port、yarn 或本地。12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver 
 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable 
15/07/08 13spark://host:port、mesos://host:port、yarn 或本地。12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11) 
15/07/08 13spark://host:port、mesos://host:port、yarn 或本地。13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s 
15/07/08 13spark://host:port、mesos://host:port、yarn 或本地。13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK 
15/07/08 13spark://host:port、mesos://host:port、yarn 或本地。13 INFO SparkContext: Invoking stop() from shutdown hook 
15/07/08 13spark://host:port、mesos://host:port、yarn 或本地。13 INFO SparkUI: Stopped Spark web UI at http:// 192.168.1.217:4040
15/07/08 13spark://host:port、mesos://host:port、yarn 或本地。13 INFO DAGScheduler: Stopping DAGScheduler 
15/07/08 13spark://host:port、mesos://host:port、yarn 或本地。14 INFO Map输出TrackerMasterEndpoint: Map输出TrackerMasterEndpoint stopped! 
15/07/08 13spark://host:port、mesos://host:port、yarn 或本地。14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion. 
15/07/08 13spark://host:port、mesos://host:port、yarn 或本地。14 INFO MemoryStore: MemoryStore cleared 
15/07/08 13spark://host:port、mesos://host:port、yarn 或本地。14 INFO BlockManager: BlockManager stopped 
15/07/08 13spark://host:port、mesos://host:port、yarn 或本地。14 INFO BlockManagerMaster: BlockManagerMaster stopped 
15/07/08 13spark://host:port、mesos://host:port、yarn 或本地。14 INFO SparkContext: Successfully stopped SparkContext 
15/07/08 13spark://host:port、mesos://host:port、yarn 或本地。14 INFO Utils: Shutdown hook called 
15/07/08 13spark://host:port、mesos://host:port、yarn 或本地。14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af 
15/07/08 13spark://host:port、mesos://host:port、yarn 或本地。14 INFO 输出CommitCoordinator$输出CommitCoordinatorEndpoint: 输出CommitCoordinator stopped!  

第 5 步:检查输出

程序执行成功后,会发现目录名为 outfile 在火花应用程序目录中。

以下命令用于打开和检查 outfile 目录中的文件列表。

$ cd outfile 
$ ls 
Part-00000 part-00001 _SUCCESS

检查输出的命令 部分-00000 文件是:

$ cat part-00000 
(people,1) 
(are,2) 
(not,1) 
(as,8) 
(beautiful,2) 
(they, 7) 
(look,1)

在 part-00001 文件中检查输出的命令是:

$ cat part-00001 
(walk, 1) 
(or, 1) 
(talk, 1) 
(only, 1) 
(love, 1) 
(care, 1) 
(share, 1)

阅读以下部分以了解有关“spark-submit”命令的更多信息。

火花提交语法


spark-submit [options] <app jar | python file> [app arguments]

Options

S.No Option 描述
1 --master spark://host:port、mesos://host:port、yarn 或本地。
2 --部署模式 是在本地(“客户端”)还是在集群内的一台工作机器(“集群”)上启动驱动程序(默认值:客户端)。
3 --class 你的应用程序的主类(用于 Java / Scala 应用程序)。
4 --name 你的应用程序的名称。
5 --jars 要包含在驱动程序和执行程序类路径中的本地 jar 的逗号分隔列表。
6 --包 要包含在驱动程序和执行程序类路径中的 jar 的 maven 坐标的逗号分隔列表。
7 --存储库 以逗号分隔的附加远程存储库列表,用于搜索使用 --packages 给出的 maven 坐标。
8 --py 文件 以逗号分隔的 .zip、.egg 或 .py 文件列表,用于 Python 应用程序的 PYTHON PATH。
9 --files 要放置在每个执行程序的工作目录中的文件的逗号分隔列表。
10 --conf (prop=val) 任意 Spark 配置属性。
11 --属性文件 要从中加载额外属性的文件的路径。如果未指定,这将查找 conf/spark-defaults。
12 --驱动程序内存 驱动程序内存(例如 1000M、2G)(默认值:512M)。
13 --driver-java-options 传递给驱动程序的额外 Java 选项。
14 --驱动程序库路径 要传递给驱动程序的额外库路径条目。
15 --驱动程序类路径

要传递给驱动程序的额外类路径条目。

请注意,使用 --jars 添加的 jar 会自动包含在类路径中。

16 --executor-内存 每个执行程序的内存(例如 1000M、2G)(默认值:1G)。
17 --代理用户 提交应用程序时要模拟的用户。
18 --帮助,-h 显示此帮助消息并退出。
19 --详细,-v 打印额外的调试输出。
20 - 版本 打印当前 Spark 的版本。
21 --driver-cores NUM 驱动程序核心(默认值:1)。
22 - 监督 如果给定,则在失败时重新启动驱动程序。
23 --kill 如果给定,则杀死指定的驱动程序。
24 --status 如果给定,则请求指定驱动程序的状态。
25 --total-executor-cores 所有执行者的总核心数。
26 --executor-cores 每个执行器的核心数。 (默认值:YARN 模式下为 1,或独立模式下工作线程上的所有可用内核)。