posts - 22, comments - 32, trackbacks - 0, articles - 73
  BlogJava :: 首页 :: 新随笔 :: 联系 :: 聚合  :: 管理

kafka 开发遇到的坑解决方法

Posted on 2018-11-30 16:05 为自己代言 阅读(4183) 评论(0)  编辑  收藏 所属分类: 消息中间件
以前一直没有接触过kafka 消息中间件,现在公司要用它来做消息服务(sub/pub),安装都不多说了 主要是开发的时候遇到问题和解决方法:

版本: zookeeper-3.4.12.tar.gz  kafka_2.12-2.1.0.tgz  连接工具: kafkatool_64bit.exe   集成: spring boot 

pom.xml:

         <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>1.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.1.0</version>
        </dependency>

程序就集成:

    @Bean
    @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE, proxyMode = ScopedProxyMode.TARGET_CLASS)
    public KafkaConsumer<String, String> getKafkaConsumer() {
        return new KafkaConsumer<String, String>(consumerConfigs());
    }

 问题就在这里  KafkaConsumer 是让spring IOC来管理,刚刚开始只有@Bean 生成的对象实例就只有一个,但是在启动线程消息的时候只能一个对象一个线程,如果一个对象在启用线程去消费会报  KafkaConsumer is not safe for multi-threaded access

解决办法:
1.线程与KafkaConsumer对象实例的对应关系是1:1
2.要保证线程与KafkaConsumer对象的关系是固定不变的,也就是说,一个线程始终都只能操作同一个KafkaConsumer对象且一个KafkaConsumer对象始终是由同一个线程来操作的 所以在 @Bean 又加了  @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE, proxyMode = ScopedProxyMode.TARGET_CLASS) 来每一次用生成一个新实例对象 

2:问题 线程与KafkaConsumer对象实例的对应关系是1:1  ,但订阅的对对象 和线程使用poll  KafkaConsumer 对象又会发生变化导致监听消费报错
    Consumer is not subscribed to any topics or assigned any partitions,为什么会报没有定阅呢,明明已经定阅了
 
   解决办法不要让spring IOC 来管理KafkaConsumer 生成实例对象  使用new 方式生成。

看来了解下原理是很重要的以下是比较不错的文章(里边还有多线程消费源码和原理讲解)





只有注册用户登录后才能发表评论。


网站导航: