posts - 22, comments - 32, trackbacks - 0, articles - 73
  BlogJava :: 首页 :: 新随笔 :: 联系 :: 聚合  :: 管理
ringboot3+springcloudstream4.x配置集成

springcloudstream3.X 后逐渐淘汰了 @input @output @EnableBinding 这些注解 到4.X后这个注解都没有了,全部转向function 方式(关于function可以了解下)

组件版本:springboot 3、 springcloud 2022.0.0、springcloudstream4.0.0

maven依赖:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
application.properties:
# 应用服务 WEB 访问端口
server.port=8080
#配置stream的binder,这里可以更换其它的消息中间件连接配置信息(ciicbinder 名称)
spring.cloud.stream.binders.ciicbinder.type=rabbit
spring.cloud.stream.binders.ciicbinder.environment.spring.rabbitmq.host=10.82.26.126
spring.cloud.stream.binders.ciicbinder.environment.spring.rabbitmq.port=8094
spring.cloud.stream.binders.ciicbinder.environment.spring.rabbitmq.username=admin
spring.cloud.stream.binders.ciicbinder.environment.spring.rabbitmq.password=admin
spring.cloud.stream.binders.ciicbinder.environment.spring.rabbitmq.virtual-host=/
#配置使用默认的binder的配置连接信息,如果配置多个binders,可以设置默认的值,也可以在生产者和消息者binding 中设置
spring.cloud.stream.default-binder=ciicbinder

spring.cloud.function.definition=mybinder1;mybinder
spring.cloud.stream.bindings.mybinder.producer.requiredGroups=test_topic
#如果应用中只有一个消息中间件需要配置,设置一个默认完事,项目中如果有两种(Rabbit,kafka)可以指定用那个
#spring.cloud.stream.bindings.mybinder.binder=ciicbinder
spring.cloud.stream.bindings.mybinder1-out-0.destination=zzz_test
spring.cloud.stream.bindings.mybinder1-out-0.content-type=application/json


#下边是Consumer配置
#mybinder是自定义名称-in-0 是固定写法 zzz_test 这个是通道名称(如果是Rabbit是对应的exchange)
spring.cloud.stream.bindings.mybinder-in-0.destination=zzz_test
spring.cloud.stream.bindings.mybinder-in-0.content-type=application/json
#消息者组(如果Rabbit对应topic会zzz_test.test1_topic创建topic)
spring.cloud.stream.bindings.mybinder-in-0.group=test1_topic
#消息者应用启动后自动监听消息(默认值:true)
spring.cloud.stream.bindings.mybinder-in-0.consumer.auto-startup=true
#以下是关于Rabbit私有特色设置
#设置死信队列(默认值true)
spring.cloud.stream.rabbit.bindings.mybinder.consumer.auto-bind-dlq=true
#手动ACK提交消息
spring.cloud.stream.rabbit.bindings.mybinder.consumer.acknowledge-mode=MANUAL


解释下其中 mybinder1 是自定义名称,如果希望手动向生产者mybinder1 不要使用自定义成 mybinder1-out-0 形式(这是fuction形式),fuction 大多场景是流式方式,企业线应用使用的不多。

生产者和消息者配置:
package com.example.demo.demos.web;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.function.Consumer;
import java.util.function.Supplier;


@Component
@Configuration
public class ConmerMQ {
// 如果手动发送消息,要把代码注释掉,这是fuction形式自动发消息 mybinder1 名称要和funtion名称一致。
/* @Bean
public Supplier<Message<String>> mybinder1(){
return ()->{
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
int s=0;
System.out.println("生产者 产生消息-data="+"dfafdafd"+(s++));
return MessageBuilder.withPayload("message product -mybinder1="+"dfafdafd").build();
};
}*/

@Bean
public Consumer<Message<String>> mybinder() {
return data -> {
String ss = data.getPayload();
System.out.println("================consumer message=" + ss);
};
}
}
测试代码:

 @Autowired
private StreamBridge streamBridge;

// http://127.0.0.1:8080/hello?name=lisi
@RequestMapping("/hello")
@ResponseBody
public String hello(@RequestParam(name = "name", defaultValue = "unknown user") String name) {
JSONObject jsonObject=new JSONObject();
jsonObject.put("name",name);
//bindingName:如果是propertis中一到致
//spring.cloud.stream.bindings.mybinder1-out-0.destination=zzz_test 如果是fuction形式要配置mybinder1-out-0
//spring.cloud.stream.bindings.mybinder1.destination=zzz_test 如果不是fuction形式要配置 mybinder1
boolean flag=streamBridge.send("mybinder1-out-0",jsonObject.toJSONString());
return "flag=" + flag;
}
备注组件升级后都基本JDK17 工程是多model的话 开发工具比如idea2019 版本最高支持JDK13 可以统一编译水平到17,然后把POM编译设置到17,maven插件版升级到支持17 


在把工程各model language level 设置成 project default(如果idea 版低的话下拉框中选择不到17)

只有注册用户登录后才能发表评论。


网站导航: