RabbitMQ配置备忘
# 1 配置示例
添加死信及延时队列时,需删除旧有交换机和队列
spring:
cloud:
stream:
bindings:
cainiao-big-bag-data-input:
destination: cainiao-big-bag-data
binder: mq-crs
group: crs-cainiao
consumer:
maxAttempts: 3
backOffInitialInterval: 10000
backOffMaxInterval: 200000
backOffMultiplier: 3.0
# 回传配载预报信息更新队列
test-callback-stowage_forecast-output:
destination: test-callback-stowage_forecast
binder: mq-crs
test-callback-stowage_forecast-input:
destination: test-callback-stowage_forecast
binder: mq-crs
group: test-callbackstowageForecast
# 回传承运重量信息队列
test-weight-output:
destination: test-weight
binder: mq-crs
test-weight-input:
destination: test-weight
binder: mq-crs
group: test-callbackWeight
rabbit:
bindings:
cainiao-big-bag-data-input:
consumer:
concurrency: 4
max-concurrency: 8
prefetch: 10
auto-bind-dlq: true
republish-to-dlq: true
test-callback-stowage_forecast-input:
consumer:
concurrency: 4
max-concurrency: 8
prefetch: 10
auto-bind-dlq: true
republish-to-dlq: true
test-weight-output:
producer:
delayedExchange: true
test-weight-input:
consumer:
delayedExchange: true
concurrency: 4
max-concurrency: 8
prefetch: 10
auto-bind-dlq: true
republish-to-dlq: true
defaultBinder: mq-crs
binders:
mq-crs:
type: rabbit
environment:
spring:
rabbitmq:
addresses: xxxx.xx.xx.xxx:5672
username: admin
password: 123456
virtual-host: /yl-crs-cn
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# 2 延时队列
发送端
@Component
public interface TestOutputSource {
String TIKTOK_WEIGHT_OUTPUT = "test-weight-output";
@Output(TIKTOK_WEIGHT_OUTPUT)
MessageChannel testWeightOutput();
}
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
@Service
@Slf4j
@EnableBinding(value = {TestOutputSource.class})
public class YlCrsCallbackFlightInfoServiceImpl implements IYlCrsCallbackFlightInfoService {
@Resource
private TestOutputSource testOutputSource;
public void sendMes() {
//正常发送
TransportInfoDTO transportInfoDTO = buildTransportInfoDTO(bigBagConfirmDTO);
testOutputSource.transportDataOutput().send(MessageBuilder.withPayload(transportInfoDTO).build());
//延迟10分钟发送
TestWeightDTO testWeightDTO = buildTestWeightDTO(bigBagConfirmDTO);
Message message = MessageBuilder.withPayload(testWeightDTO).setHeader("x-delay", 600000).build();
testOutputSource.testWeightOutput().send(message);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
消费端
public interface TestInputSource {
String TIKTOK_WEIGHT_INPUT = "test-weight-input";
@Input(TIKTOK_WEIGHT_INPUT)
MessageChannel testWeightInput();
}
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
@Slf4j
@EnableBinding(TestInputSource.class)
public class CallbackTestListener {
@StreamListener(value = TestInputSource.TIKTOK_WEIGHT_INPUT)
public void callbackWight(@Payload Message<TestWeightDTO> message) {
TestWeightDTO request = message.getPayload();
}
}
1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
上次更新: 2022/11/24, 17:59:25