Sam's Notes | Sam Blog

梦想还是要有的,万一实现了呢

0%

canal 同步 mysql 到 oracle

主要内容

canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费.

版本:
canal 1.1.6
MySQL 8.0 +
oracle

更新历史


canal 同步 mysql 到 oracle
主要步骤

  • mysql 配置 binlog
  • canal.deployer
  • canal.adapter

mysql 配置 binlog

  • 开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

    1
    2
    3
    4
    [mysqld]
    log-bin=mysql-bin # 开启 binlog
    binlog-format=ROW # 选择 ROW 模式
    server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

    执行成功可登陆MySQL中查看

    1
    2
    show variables like '%log_format%';
    show variables like '%log_bin%';
  • 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

    1
    2
    3
    4
    CREATE USER canal IDENTIFIED BY 'canal';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;

canal.deployer

将 canal.deployer的压缩包安装到 /data/canal/canal.deployer/ 目录下,
主要配置 conf/canal.properties
conf/example/instance.properties

配置

  • conf/canal.properties

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11

    # ----------- canal服务id,目前没有实际意义
    canal.id = 11
    #canal服务socket监听端口,代码中连接canal-server时,使用此段口连接
    canal.port = 11111
    canal.metrics.pull.port = 11112
    # ----------- zookeeper服务地址端口
    canal.zkServers =192.168.153.6:12181
    #表示实例的配置文件instance.properties地址, adapter里一致
    canal.destinations = example

  • conf/example/instance.properties
    xx都需要根据实际替换

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    #  ----------- 监听的数据库地址  修改成自己的
    canal.instance.master.address=xx.xx.xx.xx:3306
    canal.instance.master.journal.name=
    canal.instance.master.position=
    canal.instance.master.timestamp=
    canal.instance.master.gtid=

    # rds oss binlog
    canal.instance.rds.accesskey=
    canal.instance.rds.secretkey=
    canal.instance.rds.instanceId=

    # table meta tsdb info
    canal.instance.tsdb.enable=true
    #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
    #canal.instance.tsdb.dbUsername=canal
    #canal.instance.tsdb.dbPassword=canal

    #canal.instance.standby.address =
    #canal.instance.standby.journal.name =
    #canal.instance.standby.position =
    #canal.instance.standby.timestamp =
    #canal.instance.standby.gtid=

    # username/password
    # ----------- 监听的数据库账号密码 第一步 mysql中增加的 MySQL slave 权限的账户
    canal.instance.dbUsername=canalxx
    canal.instance.dbPassword=canalxxxxxx
    canal.instance.connectionCharset = UTF-8
    # enable druid Decrypt database password
    canal.instance.enableDruid=false
    #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

    # table regex
    #canal.instance.filter.regex=.*\\..* 1.1.6 直接用这个配置会出现异常: Table 'xxxxx.BASE TABLE' doesn't exist, sqlState=42S02 ------------------ 可 改成 canal.instance.filter.regex=.\..
    # 1) ----------- 配置监听的数据库 数据表 监听几个 配置几个
    # canal.instance.filter.regex=xx.t_test1xx,xx2.te
    # 2) -----------或者可以加上自己指定的数据库, dbname 替换成实际数据库
    canal.instance.filter.regex=dbname\\..*

    # table black regex
    canal.instance.filter.black.regex=mysql\\.slave_.*

    # mq config
    # 配置消息主题
    canal.mq.topic=example
  • 清理
    修改配置后, 注意清除 /conf/example目录下 meta.bat

启动

启动 sh bin/startup.sh
关闭 sh bin/stop.sh
重启 sh bin/restartup.sh

启动成功后,类似如下日志
logs/canal/canal.log

1
2
3
[main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
[main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.153.4(192.168.153.4):11111]
[main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......

logs/example/example.log

1
2
3
4
5
6
7
[main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
[main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^rdp.rcp_collect_result_data$|^rdp.rcp_collect_data$
[main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
[main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
[destination = example , address = /xx.xx.xx.xx:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
[destination = example , address = /xx.xx.xx.xx:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
[destination = example , address = /xx.xx.xx.xx:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=binlog.001094,position=13581,serverId=800,gtid=,timestamp=1658301624000] cost : 3789ms , the next step is binlog dump

canal.adapter

将 canal.adapter 的压缩包安装到 /data/canal/canal.adapter/ 目录下,
主要配置 conf/application.yml
conf/rdb 添加对应的数据表配置

如果不使用远程配置, 把 conf/bootstrap.yml 全体注掉

适配器配置

总配置文件 conf/application.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# --- 服务器端口 , 不要和其它服务冲突
server:
port: 88081

canal.conf:
mode: tcp #tcp kafka rocketMQ rabbitMQ
flatMessage: true # 扁平message开关, 是否以json字符串形式投递数据, 仅在kafka/rocketMQ模式下有效
zookeeperHosts: # 对应集群模式下的zk地址, 如果配置了canalServerHost, 则以canalServerHost为准
syncBatchSize: 1000 # 每次同步的批数量
retries: -1 # 重试次数, -1为无限重试
timeout: # 同步超时时间, 单位毫秒
accessKey:
secretKey:
consumerProperties:
# canal tcp consumer
canal.tcp.server.host: 127.0.0.1:11111 # -------- 对应单机模式下的canal server的ip:port
canal.tcp.zookeeper.hosts:192.168.153.6:12181 # -------- 对应集群模式下的zk地址, 如果配置了canalServerHost, 则以canalServerHost为准
canal.tcp.batch.size: 500
canal.tcp.username:
canal.tcp.password:
# kafka consumer
kafka.bootstrap.servers: 127.0.0.1:9092
kafka.enable.auto.commit: false
kafka.auto.commit.interval.ms: 1000
kafka.auto.offset.reset: latest
kafka.request.timeout.ms: 40000
kafka.session.timeout.ms: 30000
kafka.isolation.level: read_committed
kafka.max.poll.records: 1000
# rocketMQ consumer
rocketmq.namespace:
rocketmq.namesrv.addr: 127.0.0.1:9876
rocketmq.batch.size: 1000
rocketmq.enable.message.trace: false
rocketmq.customized.trace.topic:
rocketmq.access.channel:
rocketmq.subscribe.filter:
# rabbitMQ consumer
rabbitmq.host:
rabbitmq.virtual.host:
rabbitmq.username:
rabbitmq.password:
rabbitmq.resource.ownerId:

srcDataSources: # 源数据库
defaultDS: # 自定义名称, 必须和 out adapter 里 dataSourceKey 一致
url: jdbc:mysql://192.168.xx.xx:3306/touch-test?useUnicode=true # jdbc url
username: rootxx # jdbc 账号
password: 123xx # jdbc 密码

canalAdapters: # 适配器列表
- instance: example # canal 实例名或者 MQ topic 名
groups: # 分组列表
- groupId: g1 # 分组id, 如果是MQ模式将用到该值
outerAdapters: # 分组内适配器列表
- name: logger # 日志打印适配器
- name: rdb # conf/rdb 目录下 outerAdapterKey 对应
key: oracle1
properties:
jdbc.driverClassName: oracle.jdbc.OracleDriver
jdbc.url: jdbc:oracle:thin:@192.168.5.xx:1521:orcl1
jdbc.username: xxx
jdbc.password: 123456xx
- name: rdb
key: oracle2
properties:
jdbc.driverClassName: oracle.jdbc.OracleDriver
jdbc.url: jdbc:oracle:thin:@192.168.5.xxx:1521:orcl1
jdbc.username: xxxx
jdbc.password: 123xx456

适配器列表

logger适配器

最简单的处理, 将受到的变更事件通过日志打印的方式进行输出, 如配置所示, 只需要定义name: logger即可

1
2
outerAdapters:                        
- name: logger

rdb 输出 关系型数据库

目前内置支持的数据库列表:
MySQL
Oracle
PostgresSQL
SQLServer

理论上jdbc的数据库都可以。

RDB表映射文件

conf/rdb目录下, 对应上面的 key: oracle1, oracle2

  • test_oracle1

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    dataSourceKey: defaultDS  # 源数据源的key, 对应上面配置的srcDataSources中的值
    destination: example # cannal的instance或者MQ的topic
    groupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据
    outerAdapterKey: oracle1 # adapter key, 对应上面配置outAdapters中的key
    concurrent: true # 是否按主键hash并行同步, 并行同步的表必须保证主键不会更改及主键不能为其他同步表的外键!!
    dbMapping:
    database: touch-test # 源数据源的database/shcema
    table: t_auth_perm_m # 源数据源表名
    targetTable: covid.t_auth_perm_m # 目标数据源的库名.表名
    targetPk: # 主键映射
    xx_id: xx_id # 如果是复合主键可以换行映射多个
    # mapAll: true # 是否整表映射, 要求源表和目标表字段名一模一样 (如果targetColumns也配置了映射,则以targetColumns配置为准)
    targetColumns: # 字段映射, 格式: 目标表字段: 源表字段, 如果字段名一样源表字段名可不填
    id:
    perm_description:
    perm_name:
    # etlCondition: "where c_time>={}"
    commitBatch: 3000 # 批量提交的大小
    ## Mirror schema synchronize config
    #dataSourceKey: defaultDS
    #destination: example
    #groupId: g1
    #outerAdapterKey: mysql1
    #concurrent: true
    #dbMapping:
    # mirrorDb: true
    # database: mytest
  • test_oracle2

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    dataSourceKey: defaultDS
    destination: example
    groupId: g1
    outerAdapterKey: oracle2
    concurrent: true
    dbMapping:
    database: touch-test
    table: t_auth_role_m
    targetTable: covid.t_auth_role_m
    targetPk:
    id: id
    # mapAll: true
    targetColumns:
    id:
    role_description:
    role_name:
    role_type:
    # etlCondition: "where c_time>={}"
    commitBatch: 3000 # 批量提交的大小