posts - 22, comments - 32, trackbacks - 0, articles - 73
  BlogJava :: 首页 :: 新随笔 :: 联系 :: 聚合  :: 管理

创建多线程Apache Kafka消费者

Posted on 2018-11-30 18:26 为自己代言 阅读(1401) 评论(0)  编辑  收藏 所属分类: 消息中间件

1.为什么我们需要多线程消费者模型?

假设我们实现了一个通知模块,允许用户订阅来自其他用户,其他应用程序的通知。我们的模块读取将由其他用户,应用程序写入Kafka集群的消息。在这种情况下,我们可以获得写入Kafka主题的其他人的所有通知,我们的模块将创建一个消费者来订阅该主题。

一开始似乎一切都很好。但是,如果其他应用程序,用户...产生的通知数量快速增加并超过我们模块可以处理的速率,会发生什么?

好吧,一切都还是......还不错。我们模块尚未处理的所有消息/通知仍在Kafka主题中。但是,当消息数量过多时,事情会变得更加危险。满足保留策略时,其中一些将丢失(请注意,Kafka保留策略可以基于时间,基于分区大小,基于密钥)。更重要的是,当我们的通知模块远远落后于收入通知/消息时,它不再是真正的“通知”模块。

是时候考虑多线程消费者模型了。

2.多线程Apache Kafka消费者模型

我想在这篇文章中提到两种可能的模型。

  • 具有自己线程的多个消费者(模型#1)
  • 单个消费者,多个工作者处理线程(模型#2)

他们两个都有自己的优点和缺点

2.1模型#1。多个消费者拥有自己的主题

多线程Apache Kafka Consumer  - 具有自己线程的多个消费者

多个消费者拥有自己的主题

优点缺点
易于实施 消费者的总数受限于该主题的总分区。可能不会使用冗余消费者。
在每个分区上实现按顺序处理更容易。与经纪人的更多TCP连接

2.2模型#2。单个消费者,多个工作者处理线程

单个消费者,多个工作者处理线程

单个消费者,多个工作者处理线程

 

优点缺点
可以灵活地扩展处理线程的数量在每个分区上实现按顺序处理并不容易。假设在2个不同线程处理的同一分区上有2条消息。为了保证订单,必须以某种方式协调这两个线程。

3.实施

以下是两个型号的实施细节。

3.1。条件

  • Apache Kafka 0.9 / 0.10代理安装在本地计算机或远程计算机上。您可以参考此链接进行设置。
  • JDK 7/8安装在您的开发PC上。
  • Eclipse 4(我正在使用Eclipse Mars 4.5)
  • Maven 3

3.2源代码结构

示例源代码添加在Github中。您可以使用git将存储库拉到PC或简单下载示例的zip 版本并解压缩到您的PC。

获得源代码后,您可以将源代码导入Eclipse并运行测试。

要导入:

  •  菜单文件 - >导入 - > Maven - >现有Maven项目
  • 浏览到您的源代码位置
  • 单击“ 完成”按钮完成导入

这是Eclipse中的项目结构:

创建多线程Apache Kafka使用者 - 源代码

创建多线程Apache Kafka使用者 - 源代码

源代码包括上述两种模型的实现。软件包com.howtoprogram.kafka.multipleconsumers包含Model#1的所有源代码:具有自己的线程的多个消费者和软件包com.howtoprogram.kafka.singleconsumer包含Model#2的所有源代码:单个使用者,多个工人处理线程

3.3 Maven pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0
                                       http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.howtoprogram</groupId>
    <artifactId>kafka-multithreaded-java-example</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>Kafka-MultiThread-Java-Example</name>
 
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
 
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.9.0.1</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
</project>
我们在此示例中使用kafka-clients-0.9.0.1库。Java编译器1.8

3.4。具有自己线程的多个消费者(模型#1)

3.4.1类图

具有自己线程的多个消费者 - 类图

具有自己线程的多个消费者 - 类图

该部分的源代码包括4个类:

NotificationProducerThread.java是一个生产者线程,向Kafka代理生成消息

NotificationConsumerThread.java是一个消费者线程,消费来自Kafka经纪人的消息

NotificationConsumerGroup.java创建一组NotificationConsumerThread(s)

MultipleConsumersMain.java包含 main方法,运行程序以生成和使用消息。

3.4.2 NotificationProducerThread.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package com.howtoprogram.kafka.multipleconsumers;
 
import java.util.Properties;
 
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
 
public class NotificationProducerThread implements Runnable {
 
  private final KafkaProducer<String, String> producer;
  private final String topic;
 
  public NotificationProducerThread(String brokers, String topic) {
    Properties prop = createProducerConfig(brokers);
    this.producer = new KafkaProducer<String, String>(prop);
    this.topic = topic;
  }
 
  private static Properties createProducerConfig(String brokers) {
    Properties props = new Properties();
    props.put("bootstrap.servers", brokers);
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    return props;
  }
 
  @Override
  public void run() {
    System.out.println("Produces 3 messages");
    for (int i = 0; i < 5; i++) {
      String msg = "Message " + i;
      producer.send(new ProducerRecord<String, String>(topic, msg), new Callback() {
        public void onCompletion(RecordMetadata metadata, Exception e) {
          if (e != null) {
            e.printStackTrace();
          }
          System.out.println("Sent:" + msg + ", Partition: " + metadata.partition() + ", Offset: "
              + metadata.offset());
        }
      });
 
    }
    // closes producer
    producer.close();
 
  }
}
当这个生成器线程运行时,它将向代理生成5条消息。

3.4.3 NotificationConsumerThread.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.howtoprogram.kafka.multipleconsumers;
 
import java.util.Arrays;
import java.util.Properties;
 
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
 
public class NotificationConsumerThread implements Runnable {
 
  private final KafkaConsumer<String, String> consumer;
  private final String topic;
 
  public NotificationConsumerThread(String brokers, String groupId, String topic) {
    Properties prop = createConsumerConfig(brokers, groupId);
    this.consumer = new KafkaConsumer<>(prop);
    this.topic = topic;
    this.consumer.subscribe(Arrays.asList(this.topic));
  }
 
  private static Properties createConsumerConfig(String brokers, String groupId) {
    Properties props = new Properties();
    props.put("bootstrap.servers", brokers);
    props.put("group.id", groupId);
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("auto.offset.reset", "earliest");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    return props;
  }
 
  @Override
  public void run() {
    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records) {
        System.out.println("Receive message: " + record.value() + ", Partition: "
            + record.partition() + ", Offset: " + record.offset() + ", by ThreadID: "
            + Thread.currentThread().getId());
      }
    }
 
  }
 
 
}
当此消费者线程运行时,它将轮询主题或分区中的数据。

3.4.4 NotificationConsumerGroup.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.howtoprogram.kafka.multipleconsumers;
 
import java.util.ArrayList;
import java.util.List;
 
public class NotificationConsumerGroup {
 
  private final int numberOfConsumers;
  private final String groupId;
  private final String topic;
  private final String brokers;
  private List<NotificationConsumerThread> consumers;
 
  public NotificationConsumerGroup(String brokers, String groupId, String topic,
      int numberOfConsumers) {
    this.brokers = brokers;
    this.topic = topic;
    this.groupId = groupId;
    this.numberOfConsumers = numberOfConsumers;
    consumers = new ArrayList<>();
    for (int i = 0; i < this.numberOfConsumers; i++) {
      NotificationConsumerThread ncThread =
          new NotificationConsumerThread(this.brokers, this.groupId, this.topic);
      consumers.add(ncThread);
    }
  }
 
  public void execute() {
    for (NotificationConsumerThread ncThread : consumers) {
      Thread t = new Thread(ncThread);
      t.start();
    }
  }
 
  /**
   * @return the numberOfConsumers
   */
  public int getNumberOfConsumers() {
    return numberOfConsumers;
  }
 
  /**
   * @return the groupId
   */
  public String getGroupId() {
    return groupId;
  }
 
}
此类基于给定的参数创建一组使用者线程:

经纪人:消费者团体将与之联系的Kafka经纪人

groupId:组ID。该组中的所有消费者将拥有相同的groupId

topic:使用者组将获取数据的主题

numberOfConsumer:将为该组创建的使用者数量

3.4.5 MultipleConsumersMain.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.howtoprogram.kafka.multipleconsumers;
 
public final class MultipleConsumersMain {
 
  public static void main(String[] args) {
 
    String brokers = "localhost:9092";
    String groupId = "group01";
    String topic = "HelloKafkaTopic1";
    int numberOfConsumer = 3;
 
 
    if (args != null && args.length > 4) {
      brokers = args[0];
      groupId = args[1];
      topic = args[2];
      numberOfConsumer = Integer.parseInt(args[3]);
    }
 
    // Start Notification Producer Thread
    NotificationProducerThread producerThread = new NotificationProducerThread(brokers, topic);
    Thread t1 = new Thread(producerThread);
    t1.start();
 
    // Start group of Notification Consumers
    NotificationConsumerGroup consumerGroup =
        new NotificationConsumerGroup(brokers, groupId, topic, numberOfConsumer);
 
    consumerGroup.execute();
 
    try {
      Thread.sleep(100000);
    } catch (InterruptedException ie) {
 
    }
  }
}
这个类是入口点,包含测试源代码的主要方法。在这个类中,我们将创建一个生成器线程,为主题生成5条消息,该主题有3个分区:HelloKafkaTopic1

然后我们在自己的线程上创建一组3个使用者来使用来自HelloKafkaTopic1主题的消息。

3.4.5运行示例。

创建一个包含3个分区的主题HelloKafkaTopic1

1
2
3
#cd $APACHE_KAFKA_HOME
./bin/kafka-topics.sh --create --zookeeper localhost:2181 \
      --replication-factor 1 --partitions 3 --topic HelloKafkaTopic1

在日食上 打开MultipleConsumersMain.java 右键单击 - >运行方式 - > Java应用程序或使用快捷方式:Alt + Shift + x,j启动main方法。

我的日食输出如下:

生成3条消息已
发送:消息0,分区:2,偏移量:21已
发送:消息3,分区:2,偏移量:22已
发送:消息1,分区:1,偏移量:24已
发送:消息4,分区:1,偏移: 25已
发送:消息2,分区:0,偏移量:21 
接收消息:消息2,分区:0,偏移量:21,按线程ID:13 
接收消息:消息1,分区:1,偏移量:24,按线程ID:14 
接收消息:消息4,分区:1,偏移量:25,通过ThreadID:14 
接收消息:消息0,分区:2,偏移量:21,通过ThreadID:15 
接收消息:消息3,分区:2,偏移:22,通过ThreadID:15

请注意,HelloKafkaTopci1 有3个分区。Kafka生产者客户端将消息0,3分配给分区#2,将消息1,4分配给分区#1,将消息2分配给分区0。

包含3个消费者的消费者群体,他们拥有自己的线程与ThreadID:13,14,15。分区#0的所有消息都被消费者线程使用:#13。分区#1的消息由消费者线程#1使用。并且消费者线程#15消耗了分区#2的消息。

请注意,您可能会获得不同的分区号和ThreadID。但是,在这种情况下,每个分区将由每个使用者线程处理。

3.5模型#2:单个消费者,多个工作者处理线程

3.5.1类图

 单个使用者,多个工作者处理线程 - 类图

单个使用者,多个工作者处理线程 - 类图

该部分的源代码也包括4个类:

NotificationProducerThread.java是一个生产者线程,向Kafka代理生成消息。

NotificationConsumer.java是一个消费者,它有一个后台线程池,从主题接收消息并发送到池。

ConsumerThreadHandler.java是一种工作线程,它处理从NotificationConsumer调度的消息的业务处理。

SingleConsumerMain .java。包括main方法,运行程序来生成和使用消息。

 

3.5.2  NotificationProducerThread.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package com.howtoprogram.kafka.singleconsumer;
 
import java.util.Properties;
 
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
 
public class NotificationProducerThread implements Runnable {
 
  private final KafkaProducer<String, String> producer;
  private final String topic;
 
  public NotificationProducerThread(String brokers, String topic) {
    Properties prop = createProducerConfig(brokers);
    this.producer = new KafkaProducer<String, String>(prop);
    this.topic = topic;
  }
 
  private static Properties createProducerConfig(String brokers) {
    Properties props = new Properties();
    props.put("bootstrap.servers", brokers);
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    return props;
  }
 
  @Override
  public void run() {
    System.out.println("Produces 5 messages");
    for (int i = 0; i < 5; i++) {
      String msg = "Message " + i;
      producer.send(new ProducerRecord<String, String>(topic, msg), new Callback() {
        public void onCompletion(RecordMetadata metadata, Exception e) {
          if (e != null) {
            e.printStackTrace();
          }
          System.out.println("Sent:" + msg + ", Offset: " + metadata.offset());
        }
      });
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
 
    }
 
    // closes producer
    producer.close();
 
  }
}
这是生产者线程,它将向代理生成5条消息

3.5.3 ConsumerThreadHandler.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.howtoprogram.kafka.singleconsumer;
 
import org.apache.kafka.clients.consumer.ConsumerRecord;
 
public class ConsumerThreadHandler implements Runnable {
 
  private ConsumerRecord consumerRecord;
 
  public ConsumerThreadHandler(ConsumerRecord consumerRecord) {
    this.consumerRecord = consumerRecord;
  }
 
  public void run() {
    System.out.println("Process: " + consumerRecord.value() + ", Offset: " + consumerRecord.offset()
        + ", By ThreadID: " + Thread.currentThread().getId());
  }
}
此线程处理从使用者分派的消息。在此示例中,它只是打印出消息,主题偏移和当前ThreadID。

3.5.4 NotificationConsumer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package com.howtoprogram.kafka.singleconsumer;
 
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
 
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
 
public class NotificationConsumer {
 
  private final KafkaConsumer<String, String> consumer;
  private final String topic;
  // Threadpool of consumers
  private ExecutorService executor;
 
  public NotificationConsumer(String brokers, String groupId, String topic) {
    Properties prop = createConsumerConfig(brokers, groupId);
    this.consumer = new KafkaConsumer<>(prop);
    this.topic = topic;
    this.consumer.subscribe(Arrays.asList(this.topic));
  }
 
  /**
   * Creates a {@link ThreadPoolExecutor} with a given number of threads to consume the messages
   * from the broker.
   *
   * @param numberOfThreads The number of threads will be used to consume the message
   */
  public void execute(int numberOfThreads) {
 
    // Initialize a ThreadPool with size = 5 and use the BlockingQueue with size =1000 to
    // hold submitted tasks.
    executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, 0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<Runnable>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
 
    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);
      for (final ConsumerRecord record : records) {
        executor.submit(new ConsumerThreadHandler(record));
      }
    }
  }
 
  private static Properties createConsumerConfig(String brokers, String groupId) {
    Properties props = new Properties();
    props.put("bootstrap.servers", brokers);
    props.put("group.id", groupId);
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("auto.offset.reset", "earliest");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    return props;
  }
 
  public void shutdown() {
    if (consumer != null) {
      consumer.close();
    }
    if (executor != null) {
      executor.shutdown();
    }
    try {
      if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
        System.out
            .println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
      }
    } catch (InterruptedException e) {
      System.out.println("Interrupted during shutdown, exiting uncleanly");
    }
  }
 
}
该类包含ConsumerThreadHandler线程的线程池。它从主题接收消息并为池分配处理处理程序。

3.5.5 SingleConsumerMain.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.howtoprogram.kafka.singleconsumer;
 
public final class SingleConsumerMain {
 
  public static void main(String[] args) {
 
    String brokers = "localhost:9092";
    String groupId = "group01";
    String topic = "HelloKafkaTopic";
    int numberOfThread = 3;
 
    if (args != null && args.length > 4) {
      brokers = args[0];
      groupId = args[1];
      topic = args[2];
      numberOfThread = Integer.parseInt(args[3]);
    }
 
    // Start Notification Producer Thread
    NotificationProducerThread producerThread = new NotificationProducerThread(brokers, topic);
    Thread t1 = new Thread(producerThread);
    t1.start();
 
    // Start group of Notification Consumer Thread
    NotificationConsumer consumers = new NotificationConsumer(brokers, groupId, topic);
 
    consumers.execute(numberOfThread);
 
    try {
      Thread.sleep(100000);
    } catch (InterruptedException ie) {
 
    }
    consumers.shutdown();
  }
}
入口点,包含运行示例的主要方法。它创建一个NotificationProducerThread 线程,该线程为主题生成5条消息:HelloKafkaTopic。然后,它创建一个NotificationConsumer 对象,该对象将从上述主题接收消息并分派到3 ConsumerThreadHandler线程池进行处理。

3.5.6。运行示例。

你可以在eclipse上打开SingleConsumerMain.java右键单击 - >运行方式 - > Java应用程序或使用快捷方式:Alt + Shift + x,j启动main方法。

我的日食输出如下:

生成5条消息已
发送:消息0,偏移量:2602已
发送:消息1,偏移量:2603 
进程:消息0,偏移量:2602,按线程ID:13 
进程:消息1,偏移量:2603,按线程ID:14已
发送:消息2,偏移量:2604 
进程:消息2,偏移量:2604,按线程ID:15已
发送:消息3,偏移量:2605 
进程:消息3,偏移量:2605,按线程ID:13已
发送:消息4,偏移量:2606 
进程:消息4,偏移量:2606,按ThreadID:14

生产者生成5条消息,其偏移量为2602~2606。这些消息被处理为具有Ids:13,14,15的线程池。

请注意,您可能会获得不同的偏移量,ThreadID(s)。

4。结论

我们已经了解了如何使用2种可能的模型创建多线程Apache Kafka使用者。他们有自己的利弊,取决于具体情况,我们将决定哪一个适合。也许,在某些情况下,模型#2是合适的。在这种情况下,每个消费者线程将处理主题的每个分区。但是,如果此分区的数量消息太多且消费者远远落后,我们可能需要将模型#1和模型#2结合起来。

以下是与Apache Kafka主题相关的文章。如果您对它们感兴趣,可以参考以下链接:

Apache Kafka教程

Apache Kafka 0.9入门

使用Apache Kafka Docker

Apache Kafka 0.9 Java客户端API示例

Apache Kafka命令行界面

如何在Apache Kafka中编写自定义序列化程序

编写Apache Kafka自定义分区程序

Apache Kafka Connect示例

Apache Kafka命令行界面

Apache Flume Kafka源码和HDFS接收器教程


只有注册用户登录后才能发表评论。


网站导航: