Canal数据同步
提示
Canal + Kafka + Mysql 数据同步方案示例。
# 1 MySQL开启binlog
查看MySQLs是否开启了binlog及binlog-format是否ROW模式。
-- 查看数据库版本
select version();
-- 显示OFF未开启 ON开启
show variables like ‘log_bin’
-- binlog_format 有三种:ROW,STATEMENT,MIXID
show variables like 'binlog_format';
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
MySQL安装目录下修改my.ini。
-- 在mysqld下面添加
log_bin=mysql-bin
binlog-format=ROW
server-id=1
-- 进入命令行重启mysql
停止 net stop mysql57
启动 net start mysql57
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
赋予canal用户复制权限。
-- 创建用户
create user 'canal'@'%' identified by '123456';
-- REPLICATION CLIENT
-- REPLICATION SLAVE
-- 复制相关。一般复制账号需要这两个权限。
grant select,replication slave, replication client on *.* to 'canal'@'%';
-- 刷新权限
FLUSH PRIVILEGES;
1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
# 2 安装部署Canal
提示
canal.deployer-1.1.5.tar.gz 安装包已先上传到 / 根目录上。
可通过wget方式下载:
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
# 创建目录
mkdir /louis/canal-1.1.5
# 解压
tar -zvxf canal.deployer-1.1.5.tar.gz -C /louis/canal-1.1.5
1
2
3
4
2
3
4
解压后目录如下
- bin # 运维脚本文件
- conf # 配置文件目录
canal_local.properties # canal本地配置,一般不需要改动
canal.properties # canal服务配置
logback.xml # logback日志配置
metrics # 度量统计配置
spring # spring-实例配置,主要和binlog位置计算、一些策略配置相关,可以在canal.properties选用其中的任意一个配置文件
example # 实例配置文件夹,一般认为单个数据库对应一个独立的实例配置文件夹
instance.properties # 实例配置,一般指单个数据库的配置
- lib # 服务依赖包
- logs # 日志文件输出目录
- plugin # 支持的插件目录
connector.kafka-1.1.5-jar-with-dependencies.jar #kafka依赖包
connector.rabbitmq-1.1.5-jar-with-dependencies.jar #rabbitmq依赖包
connector.rocketmq-1.1.5-jar-with-dependencies.jar #rocketmq依赖包
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 3 启动Zookeeper和Kafka
/louis/zookeeper-3.5.9/bin/zkServer.sh start
/louis/kafka_2.8.1/bin/kafka-server-start.sh /louis/kafka_2.8.1/config/server.properties &
/louis/kafka_2.8.1/bin/kafka-server-start.sh /louis/kafka_2.8.1/config/server-1.properties &
/louis/kafka_2.8.1/bin/kafka-server-start.sh /louis/kafka_2.8.1/config/server-2.properties &
1
2
3
4
5
2
3
4
5
启动Zookeeper客户端查看Kafka启动情况
/louis/zookeeper-3.5.9/bin/zkCli.sh
# 进入客户端
[zk: localhost:2181(CONNECTED) 1] ls /brokers/ids
[0, 1, 2]
1
2
3
4
5
2
3
4
5
Kafka新建同步用的topic
louis-topic
1
# 4 Canal配置文件修改
conf/canal.properties
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = kafka
# 配置要同步的kafka信息
kafka.bootstrap.servers = 172.16.227.132:9092,172.16.227.132:9093,172.16.227.132:9094
1
2
3
4
2
3
4
conf/example/instance.properties
# 值改成安装mysql服务器的ip及端口号
canal.instance.master.address=127.0.0.1:3306
# 前面新建的数据库备份账号canal及其密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=123456
# 配置要同步的kafka信息
canal.mq.topic=louis-topic
canal.mq.partition=0
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
# 5 启动Canal
sh bin/startup.sh
1
# 6 测试数据同步
编写Kafka消费服务
package com.kafka.kafkaDemo;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class MsgConsumer {
private final static String TOPIC_NAME = "louis-topic";
private final static String CONSUMER_GROUP_NAME = "testGroup";
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.16.227.132:9092,172.16.227.132:9093,172.16.227.132:9094");
// 消费分组名
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
/*
consumer给broker发送心跳的间隔时间,broker接收到心跳如果此时有rebalance发生会通过心跳响应将
rebalance方案下发给consumer,这个时间可以稍微短一点
*/
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
/*
服务端broker多久感知不到一个consumer心跳就认为他故障了,会将其踢出消费组,
对应的Partition也会被重新分配给其他consumer,默认是10秒
*/
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
//一次poll最大拉取消息的条数,如果消费者处理速度很快,可以设置大点,如果处理速度一般,可以设置小点
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
/*
如果两次poll操作间隔超过了这个时间,broker就会认为这个consumer处理能力太弱,
会将其踢出消费组,将分区分配给别的consumer消费
*/
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
/*
* poll() API 是拉取消息的长轮询
*/
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),
record.offset(), record.key(), record.value());
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
Mysql插入测试数据
INSERT INTO `user`
(id, name)
VALUES(8, '888');
1
2
3
2
3
Kafka消费端收到消息
收到消息:partition = 0,offset = 11, key = null, value =
{
"data":[
{
"id":"8",
"name":"888"
}
],
"database":"datatest",
"es":1653296022000,
"id":84,
"isDdl":false,
"mysqlType":{
"id":"int(11)",
"name":"varchar(50)"
},
"old":null,
"pkNames":[
"id"
],
"sql":"",
"sqlType":{
"id":4,
"name":12
},
"table":"user",
"ts":1652899613492,
"type":"INSERT"
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
Mysql新增列
ALTER TABLE datatest.`user` ADD sex varchar(2) NULL;
1
Kafka消费端收到消息
收到消息:partition = 0,offset = 12, key = null, value =
{
"data":null,
"database":"datatest",
"es":1653296552000,
"id":85,
"isDdl":true,
"mysqlType":null,
"old":null,
"pkNames":null,
"sql":"/* ApplicationName=DBeaver Ultimate 21.3.0 - SQLEditor <Script-9.sql> */ ALTER TABLE datatest.`user` ADD sex varchar(2) NULL",
"sqlType":null,
"table":"user",
"ts":1652900187875,
"type":"ALTER"
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 7 安装包下载
百度网盘
https://pan.baidu.com/s/1pvGbnkMpdqs3ICMpMsrVow (opens new window)
提取码 : cr1x
上次更新: 2022/11/24, 17:59:25