Spring Cloud Stream
# 1 简介
# 1.1 概述
Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。它可以基于 Spring Boot 来创建独立的、可用于生产的 Spring 应用程序。
Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并引入了发布-订阅、消费组、分区这三个核心概念。
通过使用 Spring Cloud Stream,可以有效简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。
提示
目前 Spring Cloud Stream 只支持 RabbitMQ 和 Kafka 的自动化配置。
# 2 快速搭建
我们通过一个简单的示例对 Spring Cloud Stream 有一个初步的认识。中间件使用 RabbitMQ,创建 spring-cloud-stream 模块。
# 2.1 引入依赖
编辑 pom.xml 文件,引入 Spring Cloud Stream 对 RabbitMQ 支持的 spring-cloud-starter-stream-rabbit 依赖,该依赖包是 Spring Cloud Stream 对 RabbitMQ 支持的封装,其中包含了对 RabbitMQ 的自动化配置等内容。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
2
3
4
5
6
7
8
9
10
11
12
# 3 开发指南
# 3.1 apollo 增加配置stream.yml
spring:
cloud:
stream:
bindings:
#输入通道名称,对应java代码InputInterface定义的名称
rabbit-mq-demo-test-input:
#通道主题名
destination: rabbit-mq-demo-test
contentType: application/json
#消费组名称, 多节点消费保证唯一
group: rabbit-mq-demo-test
#绑定的QM配置
binder: rabbit-test
consumer:
enable-auto-commit: false
auto-offset-reset: earliest
max-poll-records: 1000
concurrency: 6
rabbit-mq-demo-test-output:
destination: rabbit-mq-demo-test
contentType: application/json
group:
binder: rabbit-test
consumer:
enable-auto-commit: false
auto-offset-reset: earliest
max-poll-records: 1000
concurrency: 6
demo-test-input:
destination: demo-test
contentType: application/json
group: demo-test
binder: kafka-test
consumer:
enable-auto-commit: false
auto-offset-reset: earliest
max-poll-records: 1000
concurrency: 6
demo-test-output:
destination: demo-test
contentType: application/json
group: demo-test
binder: kafka-test
consumer:
enable-auto-commit: false
auto-offset-reset: earliest
max-poll-records: 1000
concurrency: 6
#默认配置, 当存在多个配置事必须配置, 否则获取不到MQ配置
default-binder: rabbit-test
binders:
#RabbitMQ配置
rabbit-test:
type: rabbit
environment:
spring:
rabbitmq:
host: xxx.xxx.xxx.xxx
port: 5672
username: guest
password: guest
virtual-host: /
#Kafka配置
kafka-test:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: xxx.xxx.xxx.xxx:9092
auto-add-partitions: true
auto-create-topics: true
min-partition-count: 1
replication-factor: 3
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# 3.2: SpringBoot项目启动类,添加注解SpringBootApplication,EnableApolloConfig
@EnableApolloConfig({"stream.yml"})
@SpringBootApplication(exclude = {RabbitAutoConfiguration.class})
public class PlatformDemoApplication {
public static void main(String[] args) {
SpringApplication.run(PlatformDemoApplication.class, args);
}
}
2
3
4
5
6
7
8
9
屏蔽Rabbit org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect 错误
# 3.3: 创建消息通道绑定的接口
创建 InputInterface 接口,通过 @Input 注解定义输入通道和输出通道,另外,@Input 注解都还有一个 value 属性,该属性可以用来设置消息通道的名称,
RabbitMQ: rabbit-mq-demo-test-input
Kafka: demo-test-input
@Component
public interface InputInterface {
//RabbitMQ接收者通道
String RABBIT_MQ_DEMO_TEST_INPUT = "rabbit-mq-demo-test-input";
//Kafka接收者通道
String KAFKA_DEMO_TEST_INPUT = "demo-test-input";
@Input(RABBIT_MQ_DEMO_TEST_INPUT)
SubscribableChannel rabbitMQInput();
@Input(KAFKA_DEMO_TEST_INPUT)
SubscribableChannel kafkaSendInput();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
创建 OutputInterface接口,通过@Output 注解定义输入通道和输出通道,另外@Output 注解都还有一个 value 属性,该属性可以用来设置消息通道的名称,
RabbitMQ: rabbit-mq-demo-test-output
Kafka: demo-test-output
@Component
public interface OutputInterface {
//RabbitMQ接收者通道
String RABBIT_MQ_DEMO_TEST_INPUT = "rabbit-mq-demo-test-output";
//Kafka接收者通道
String KAFKA_DEMO_TEST_INPUT = "demo-test-output";
@Output(RABBIT_MQ_DEMO_TEST_INPUT)
SubscribableChannel rabbitMQSendMessage();
@Output(KAFKA_DEMO_TEST_INPUT)
SubscribableChannel kafkaSendMessage();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 3.4 发送消息
创建测试消息实体MessageDTO
@Data
public class MessageDTO {
/**
* ID
*/
private Integer id;
/**
* 编码
*/
private String code;
/**
* 名称
*/
private String name;
/**
* 模块名称
*/
private String module;
/**
* 操作类型
*/
private String operation;
/**
* 冗余字段
*/
private String json;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
创建MqMessageService接口以及MqServiceImpl实现类
public interface MqMessageService {
boolean sendRabbitMqMessage(MessageDTO dto);
boolean sendKafkaMessage(MessageDTO dto);
}
@Slf4j
@Service
@EnableBinding(value = {OutputInterface.class})
public class MqMessageServiceImpl implements MqMessageService {
@Autowired
private OutputInterface outputInterface;
@Override
public boolean sendRabbitMqMessage(MessageDTO dto) {
Message message = MessageBuilder.withPayload(dto).build();
return outputInterface.rabbitMQSendMessage().send(message);
}
@Override
public boolean sendKafkaMessage(MessageDTO dto) {
Message message = MessageBuilder.withPayload(dto).build();
return outputInterface.kafkaSendMessage().send(message);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 3.5 接收消息
创建监听器InputStreamListener
@Slf4j
@EnableBinding(InputInterface.class)
@Component
public class InputStreamListener {
@StreamListener(value = InputInterface.RABBIT_MQ_DEMO_TEST_INPUT)
public void showRabbitMQMessage(@Payload MessageDTO dto) {
log.info("showRabbitMQMessage message :[{}]", dto);
}
@StreamListener(value = InputInterface.KAFKA_DEMO_TEST_INPUT)
public void showKafkaMessage(@Payload MessageDTO dto) {
log.info("showKafkaMessage message:{}", dto);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 3.6 接口测试
1.创建MQController
@Api(value = "MQController", tags = "MQ测试")
@RequestMapping("/mqDemo")
@RestController
@Slf4j
public class MQController {
@Autowired
private MqMessageService mqMessageService;
@ApiOperation(value = "发送RabbitMQ消息", notes = "发送RabbitMQ消息")
@PostMapping("sendRabbitMqMessage")
public Result<Boolean> sendRabbitMqMessage(@Valid @RequestBody MessageDTO messageDTO) {
return Result.success(mqMessageService.sendRabbitMqMessage(messageDTO));
}
@ApiOperation(value = "发送Kafka消息", notes = "发送Kafka消息")
@PostMapping("sendKafkaMessage")
public Result<Boolean> sendKafkaMessage(@Valid @RequestBody MessageDTO messageDTO) {
return Result.success(mqMessageService.sendKafkaMessage(messageDTO));
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
2.Postman测试
RabbitMq
发送接口:http://localhost:8080/demoapi/mqDemo/sendRabbitMqMessage
测试数据
{
"id":1,
"code":"code",
"name":"name",
"module":"module",
"operation":"INSERT",
"json":"发送RabbitMQ数据"
}
2
3
4
5
6
7
8
9
监听器接收RabbitMQ数据
c.y.p.d.m.l.InputStreamListener - showRabbitMQMessage message :[MessageDTO(id=1, code=code, name=name, module=module, operation=INSERT, json=发送RabbitMQ数据)]
Kafka
发送接口:http://localhost:8080/demoapi/mqDemo/sendKafkaMessage
{
"id":2,
"code":"code",
"name":"name",
"module":"module",
"operation":"INSERT",
"json":"发送Kafka数据"
}
2
3
4
5
6
7
8
9
监听器接收Kafka数据
INFO c.y.p.d.m.l.InputStreamListener - showKafkaMessage message:MessageDTO(id=2, code=code, name=name, module=module, operation=INSERT, json=发送Kafka数据)