Posted on 2018-11-30 16:05
为自己代言 阅读(4176)
评论(0) 编辑 收藏 所属分类:
消息中间件
以前一直没有接触过kafka 消息中间件,现在公司要用它来做消息服务(sub/pub),安装都不多说了 主要是开发的时候遇到问题和解决方法:
版本: zookeeper-3.4.12.tar.gz kafka_2.12-2.1.0.tgz 连接工具: kafkatool_64bit.exe 集成: spring boot
pom.xml:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.0</version>
</dependency>
程序就集成:
@Bean
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE, proxyMode = ScopedProxyMode.TARGET_CLASS)
public KafkaConsumer<String, String> getKafkaConsumer() {
return new KafkaConsumer<String, String>(consumerConfigs());
}
问题就在这里 KafkaConsumer 是让spring IOC来管理,刚刚开始只有@Bean 生成的对象实例就只有一个,但是在启动线程消息的时候只能一个对象一个线程,如果一个对象在启用线程去消费会报
KafkaConsumer is not safe for multi-threaded access
解决办法:
1.线程与KafkaConsumer对象实例的对应关系是1:1
2.要保证线程与KafkaConsumer对象的关系是固定不变的,也就是说,一个线程始终都只能操作同一个KafkaConsumer对象且一个KafkaConsumer对象始终是由同一个线程来操作的 所以在 @Bean 又加了 @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE, proxyMode = ScopedProxyMode.TARGET_CLASS) 来每一次用生成一个新实例对象
2:问题
线程与KafkaConsumer对象实例的对应关系是1:1 ,但订阅的对对象 和线程使用poll KafkaConsumer 对象又会发生变化导致监听消费报错
Consumer is not subscribed to any topics or assigned any partitions,为什么会报没有定阅呢,明明已经定阅了
解决办法不要让spring IOC 来管理
KafkaConsumer 生成实例对象 使用new 方式生成。
看来了解下原理是很重要的以下是比较不错的文章(里边还有多线程消费源码和原理讲解)