Apache Kafka 与 Spark 的集成


在本章中,我们将讨论如何将 Apache Kafka 与 Spark Streaming API 集成。

关于Spark


Spark Streaming API 支持实时数据流的可扩展、高吞吐量、容错流处理。数据可以从 Kafka、Flume、Twitter 等多种来源获取,并且可以使用复杂的算法进行处理,例如 map、reduce、join 和 window 等高级函数。最后,处理后的数据可以推送到文件系统、数据库和实时仪表板。弹性分布式数据集 (RDD) 是 Spark 的基本数据结构。它是一个不可变的分布式对象集合。 RDD 中的每个数据集都被划分为逻辑分区,可以在集群的不同节点上进行计算。

与 Spark 集成


Kafka 是用于 Spark 流的潜在消息传递和集成平台。 Kafka 充当实时数据流的中心枢纽,并使用 Spark Streaming 中的复杂算法进行处理。处理完数据后,Spark Streaming 可以将结果发布到另一个 Kafka 主题或存储在 HDFS、数据库或仪表板中。下图描述了概念流程。

Integration with Spark

现在,让我们详细了解一下 Kafka-Spark API。

SparkConf API

它代表 Spark 应用程序的配置。用于将各种 Spark 参数设置为键值对。

类有以下方法:

  • 设置(字符串键,字符串值):设置配置变量。

  • 删除(字符串键): 从配置中删除密钥。

  • setAppName(字符串名称):为你的应用设置应用名称。

  • 获取(字符串键):获取密钥

StreamingContext API

这是 Spark 功能的主要入口点。 SparkContext 表示与 Spark 集群的连接,可用于在集群上创建 RDD、累加器和广播变量。签名定义如下所示。

public StreamingContext(String master, String appName, Duration batchDuration, 
    String sparkHome, scala.collection.Seq<String> jars,
    scala.collection.Map<String,String> environment)
  • master:要连接的集群URL(例如mesos://host:port、spark://host:port、local[4])。

  • appName: 作业名称,显示在集群 Web UI 上

  • 批处理持续时间:分批流式数据的时间间隔

public StreamingContext(SparkConf conf, Duration batchDuration)

通过提供新 SparkContext 所需的配置来创建 StreamingContext。

  • conf: 火花参数

  • 批处理持续时间:分批流式数据的时间间隔

KafkaUtils API

KafkaUtils API 用于将 Kafka 集群连接到 Spark 流。这个API有重要的方法签名定义如下。

public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream(
    StreamingContext ssc, String zkQuorum, String groupId,
    scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)

上面显示的方法用于创建从 Kafka Brokers 提取消息的输入流。

  • ssc: StreamingContext 对象。

  • zkQuorum: Zookeeper 法定人数。

  • groupId:这个消费者的组id。

  • topics:返回一张要消费的主题图。

  • 存储级别: 用于存储接收到的对象的存储级别。

KafkaUtils API 有另一个方法 createDirectStream,用于创建一个输入流,该输入流直接从 Kafka Brokers 拉取消息,而不使用任何接收器。这个流可以保证来自 Kafka 的每条消息只包含在转换中一次。

示例应用程序是在 Scala 中完成的。要编译应用程序,请下载并安装, scala 构建工具(类似于 maven)。主要应用程序代码如下所示。

import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produc-erRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object KafkaWordCount {
    def main(args: Array[String]) {
        if (args.length < 4) {
            System.err.println("Usage: KafkaWordCount <zkQuorum><group> <topics> <numThreads>")
            System.exit(1)
        }

        val Array(zkQuorum, group, topics, numThreads) = args
        val sparkConf = new SparkConf().setAppName("KafkaWordCount")
        val ssc = new StreamingContext(sparkConf, Seconds(2))
        ssc.checkpoint("checkpoint")

        val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
        val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
        val words = lines.flatMap(_.split(" "))
        val wordCounts = words.map(x => (x, 1L))
            .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
        wordCounts.print()

        ssc.start()
        ssc.awaitTermination()
    }
}

构建脚本

spark-kafka 集成依赖于 spark、spark streaming 和 spark Kafka 集成 jar。创建一个新文件并指定应用程序详细信息及其依赖项。这将在编译和打包应用程序时下载必要的 jar。

name := "Spark Kafka Project"
version := "1.0"
scalaVersion := "2.10.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"

编译/打包

运行以下命令编译打包应用程序的jar文件。我们需要将 jar 文件提交到 spark 控制台来运行应用程序。

sbt package

提交给 Spark

启动 Kafka Producer CLI(在上一章解释),新建一个名为并提供一些示例消息,如下所示。

Another spark test message

运行以下命令将应用程序提交到 spark 控制台。

/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming
-kafka_2.10:1.6.0 --class "KafkaWordCount" --master local[4] target/scala-2.10/spark
-kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>

此应用程序的示例输出如下所示。

spark console messages ..
(Test,1)
(spark,1)
(another,1)
(message,1)
spark console message ..