Apache Kafka 简单生产者示例


让我们创建一个使用 Java 客户端发布和消费消息的应用程序。 Kafka 生产者客户端由以下 API 组成。

Kafka生产者 API


让我们了解本节中最重要的一组 Kafka 生产者 API。 KafkaProducer API 的核心部分是班级。 KafkaProducer 类提供了一个选项,可以在其构造函数中使用以下方法连接 Kafka 代理。

  • KafkaProducer 类提供了 send 方法来将消息异步发送到主题。 send() 的签名如下

producer.send(new ProducerRecord<byte[],byte[]>(topic, 
partition, key1, value1) , callback);
  • 制片人记录:生产者管理一个等待发送的记录缓冲区。

  • Callback:用户提供的回调,当记录已被服务器确认时执行(null表示没有回调)。

  • KafkaProducer 类提供了一个 flush 方法来确保所有之前发送的消息都已经真正完成。 flush方法的语法如下:

public void flush()
  • KafkaProducer 类提供 partitionFor 方法,该方法有助于获取给定主题的分区元数据。这可用于自定义分区。该方法的签名如下:

public Map metrics()

它返回生产者维护的内部指标映射。

  • public void close(): KafkaProducer class provides close method blocks until all previously sent requests are completed.

生产者 API


Producer API 的核心部分是班级。 Producer 类提供了一个选项,通过以下方法在其构造函数中连接 Kafka 代理。

生产者阶级

生产者类提供发送方法到send使用以下签名向单个或多个主题发送消息。


public void send(KeyedMessaget<k,v> message) 
- sends the data to a single topic,par-titioned by key using either sync or async producer.
public void send(List<KeyedMessage<k,v>>messages)
- sends data to multiple topics.
Properties prop = new Properties();
prop.put(producer.type,”async”)
ProducerConfig config = new ProducerConfig(prop);

有两种类型的生产者——SyncandAsync.

相同的 API 配置适用于制片人也是。它们之间的区别是同步生产者直接发送消息,但在后台发送消息。当你需要更高的吞吐量时,首选异步生产者。在 0.8 等以前的版本中,异步生产者没有用于 send() 的回调来注册错误处理程序。这仅在当前版本的 0.9 中可用。

public void close()

生产者类提供close关闭与所有 Kafka 代理的生产者池连接的方法。

配置设置


Producer API 的主要配置设置如下表所示,便于理解:

S.No配置设置和说明
1

客户端.id

标识生产者应用程序

2

生产者类型

同步或异步

3

acks

acks 配置控制生产者请求下的标准被认为是完整的。

4

retries

如果生产者请求失败,则自动使用特定值重试。

5

引导服务器

经纪人的引导列表。

6

逗留时间

如果你想减少请求的数量,你可以将 linger.ms 设置为大于某个值的值。

7

key.serializer

序列化器接口的键。

8

value.serializer

串行器接口的值。

9

批量大小

缓冲区大小。

10

缓冲内存

控制生产者可用于缓冲的内存总量。

生产者记录 API

ProducerRecord 是发送到 Kafka 集群的键/值对。ProducerRecord 类构造函数用于使用以下签名创建具有分区、键和值对的记录。

public ProducerRecord (string topic, int partition, k key, v value)
  • Topic:用户定义的主题名称,将附加到记录中。

  • 分割: 分区数

  • Key: 将被包含在记录中的密钥。


  • Value: 记录内容


public ProducerRecord (string topic, k key, v value)

ProducerRecord 类构造函数用于创建具有键值对且没有分区的记录。

  • Topic: 创建一个topic来分配记录。

  • Key: 记录键。

  • Value:记录内容。

public ProducerRecord (string topic, v value)

ProducerRecord 类创建没有分区和键的记录。

  • Topic:创建话题。

  • Value:记录内容。

ProducerRecord类方法如下表:

S.No类方法和描述
1

public string topic()

主题将附加到记录中。

2

public K key()

将包含在记录中的键。如果没有这样的键,这里会返回null。

3

public V value()

记录内容。

4

分割()

记录的分区计数

SimpleProducer 应用程序


在创建应用程序之前,首先启动 ZooKeeper 和 Kafka 代理,然后使用 create topic 命令在 Kafka 代理中创建你自己的主题。之后创建一个名为并输入以下编码。

// 导入 util.properties 包
import java.util.Properties;

// 导入简单的生产者包
import org.apache.kafka.clients.producer.Producer;

// 导入 KafkaProducer 包
import org.apache.kafka.clients.producer.KafkaProducer;

// 导入 ProducerRecord 包
import org.apache.kafka.clients.producer.ProducerRecord;

// 创建名为“SimpleProducer”的java类
public class SimpleProducer {
   
    public static void main(String[] args) throws Exception{
      
        // 检查参数长度值
        if(args.length == 0){
            System.out.println("Enter topic name");
            return;
        }
      
        // 将 topicName 赋值给字符串变量
        String topicName = args[0].toString();
      
        // 为属性创建实例以访问生产者配置
        Properties props = new Properties();
      
        // 分配本地主机 ID
        props.put("bootstrap.servers", “localhost:9092");
      
        // 为生产者请求设置确认。
        props.put("acks", “all");
      
        // 如果请求失败,生产者可以自动重试,
        props.put("retries", 0);
      
        // 在配置中指定缓冲区大小
        props.put("batch.size", 16384);
      
        // 减少请求数小于 0
        props.put("linger.ms", 1);
      
        // buffer.memory 控制生产者可用于缓冲的内存总量。
        props.put("buffer.memory", 33554432);
      
        props.put("key.serializer",
            "org.apache.kafka.common.serializa-tion.StringSerializer");
         
        props.put("value.serializer",
            "org.apache.kafka.common.serializa-tion.StringSerializer");
      
        Producer<String, String> producer = new KafkaProducer
            <String, String>(props);
            
        for(int i = 0; i < 10; i++)
            producer.send(new ProducerRecord<String, String>(topicName,
                Integer.toString(i), Integer.toString(i)));
                    System.out.println(“Message sent successfully”);
                    producer.close();
    }
}

编译: 可以使用以下命令编译应用程序。

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

执行: 应用程序可以使用以下命令执行。

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleProducer <topic-name>
Message sent successfully
To check the above output open new terminal and type Consumer CLI command to receive messages.
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning
1
2
3
4
5
6
7
8
9
10

简单的消费者示例


到目前为止,我们已经创建了一个生产者来向 Kafka 集群发送消息。现在让我们创建一个消费者来消费来自 Kafka 集群的消息。 KafkaConsumer API 用于消费来自 Kafka 集群的消息。 KafkaConsumer 类构造函数定义如下。

public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)

configs: 返回消费者配置图。

KafkaConsumer 类具有下表列出的以下重要方法。

S.No方法及说明
1

public java.util.Set<TopicPar-tition> assignment()

获取消费者当前分配的分区集。

2

public string subscription()

订阅给定的主题列表以获取动态分配的分区。

3

public void sub-scribe(java.util.List<java.lang.String> topics, ConsumerRe-balanceListener listener)

订阅给定的主题列表以获取动态分配的分区。

4

public void unsubscribe()

从给定的分区列表中取消订阅主题。

5

public void sub-scribe(java.util.List<java.lang.String> topics)

订阅给定的主题列表以获取动态分配的分区。如果给定的主题列表为空,则将其视为与 unsubscribe() 相同。

6

public void sub-scribe(java.util.regex.Pattern pattern, ConsumerRebalanceLis-tener listener)

参数模式是指正则表达式格式的订阅模式,监听器参数从订阅模式中获取通知。

7

public void as-sign(java.util.List<TopicParti-tion> partitions)

手动将分区列表分配给客户。

8

poll()

获取使用订阅/分配 API 之一指定的主题或分区的数据。如果在轮询数据之前没有订阅主题,这将返回错误。

9

public void commitSync()

提交在最后一次 poll() 上返回的所有订阅的主题和分区列表的偏移量。相同的操作适用于 commitAsyn()。

10

public void seek(TopicPartition partition, long offset)

获取消费者将在下一个 poll() 方法中使用的当前偏移值。

11

public void resume()

恢复暂停的分区。

12

public void wakeup()

唤醒消费者。

消费者记录 API


ConsumerRecord API 用于接收来自 Kafka 集群的记录。该 API 由主题名称、分区号(从中接收记录)和指向 Kafka 分区中记录的偏移量组成。 ConsumerRecord 类用于创建具有特定主题名称、分区计数和

public ConsumerRecord(string topic,int partition, long offset,K key, V value)
  • Topic: 从 Kafka 集群接收到的消费者记录的主题名称。

  • 分割: 主题分区。

  • Key: 记录的key,如果key不存在则返回null。

  • Value: 记录内容。

消费者记录 API


ConsumerRecords API 充当 ConsumerRecord 的容器。此 API 用于保存特定主题的每个分区的 ConsumerRecord 列表。它的构造函数定义如下。

public ConsumerRecords(java.util.Map<TopicPartition,java.util.List
<Consumer-Record>K,V>>> records)
  • 主题分区:返回特定主题的分区图。


  • Records: ConsumerRecord 的返回列表。

ConsumerRecords 类定义了以下方法。

S.No方法和描述
1

public int count()

所有主题的记录数。

2

public Set partitions()

此记录集中具有数据的分区集(如果没有返回数据,则该集为空)。

3

public Iterator iterator()

迭代器使你能够循环通过集合、获取或重新移动元素。

4

public List records()

获取给定分区的记录列表。

配置设置


Consumer客户端API主要配置设置的配置设置如下:

S.No设置和说明
1

引导服务器

引导经纪人列表。

2

group.id

将单个使用者分配给组。

3

启用.auto.commit

如果值为 true,则启用偏移量的自动提交,否则不提交。

4

auto.commit.interval.ms

返回将更新的消耗偏移量写入 ZooKeeper 的频率。

5

session.timeout.ms

表示 Kafka 在放弃并继续消费消息之前将等待 ZooKeeper 响应请求(读取或写入)的毫秒数。

简单消费者应用


生产者申请步骤在这里保持不变。首先,启动 ZooKeeper 和 Kafka 代理。然后创建一个具有名为的 java 类的应用程序并键入以下代码。

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleConsumer {
    public static void main(String[] args) throws Exception {
        if(args.length == 0){
            System.out.println("Enter topic name");
            return;
        }
        // Kafka消费者配置设置
        String topicName = args[0].toString();
        Properties props = new Properties();
      
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer",
            "org.apache.kafka.common.serializa-tion.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serializa-tion.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer
            <String, String>(props);
      
        // Kafka Consumer 在此处订阅主题列表。
        consumer.subscribe(Arrays.asList(topicName))
      
        // 打印主题名称
        System.out.println("Subscribed to topic " + topicName);
        int i = 0;
      
        while (true) {
            ConsumerRecords<String, String> records = con-sumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
         
            // 打印消费者记录的偏移量、键和值。
            System.out.printf("offset = %d, key = %s, value = %s\n",
                record.offset(), record.key(), record.value());
        }
    }
}

编译: 可以使用以下命令编译应用程序。

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

执行:该应用程序可以使用以下命令执行

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleConsumer <topic-name>

输入:打开producer CLI,给topic发送一些消息。你可以将简单输入作为“Hello Consumer”。

输出: 以下将是输出。

Subscribed to topic Hello-Kafka
offset = 3, key = null, value = Hello Consumer