canal小试牛刀第一篇:canal-server+kafka根据mysql binlog同步数据

canal小试牛刀第一篇:canal-server+kafka根据mysql binlog同步数据,第1张

canal小试牛刀第一篇:canal-server+kafka根据mysql binlog同步数据

github项目直达

1. 前情提要

直接监听mysql的binlog同步数据可以对业务无侵入。数仓搭建必备利器。

2. 准备工作
    了解canal-server了解kafka了解Canal Kafka了解mysql主从了解docker、docker-composecanal配置详解
3. 配置相关 mysql

保存以下配置至./mysql/conf.d/slave.cnf

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1

instance

容器中复制/home/admin/canal-server/conf/example/instance.properties 并 保存至./canal-server/conf/example/instance.properties 然后对应如下配置对文件内容进行修改

#  按需修改成自己的数据库信息
#################################################
...
canal.instance.master.address=canal-db:3306
# username/password,数据库的用户名和密码
...
canal.instance.dbUsername = root
canal.instance.dbPassword = secret
...
# mq config
canal.mq.topic=example
# 针对库名或者表名发送动态topic
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\..*,.*\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#库名.表名: 唯一主键,多个表之间用逗号分隔
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################
canal

在容器中复制/home/admin/canal-server/conf/canal.properties 并 保存至./canal-server/conf/canal.properties 然后对应如下配置对文件内容进行修改

# ...
# 可选项: tcp(默认), kafka, RocketMQ
canal.serverMode = kafka
# ...
kafka.bootstrap.servers = canal-kafka:9092

# 指定同步的数据表
# table regex
# test 和 test_base 数据库
# test_user 和 base_user 数据表
# 具体其他规则 仔细阅读准备工作中的 `canal配置详解`
canal.instance.filter.regex=test.test_user,test_base.base_user
docker-compose
version: "3"
services:
  redis:
      image: "redis:alpine"
      container_name: canal-redis
      command: redis-server --appendonly yes
      ports:
        - "6375:6379"
      volumes:
        - ./redis/data:/data:rw
      networks:
        - canal-test
  db:
      platform: linux/amd64
      image: mysql:latest
      container_name: canal-db
      command: --default-authentication-plugin=mysql_native_password
      restart: always
      environment:
        MYSQL_ROOT_PASSWORD: secret
      ports:
        - 3310:3306
      volumes:
        - ./mysql/conf.d/slave.cnf:/etc/mysql/conf.d/slave.cnf
        - ./mysql/mysql/data:/var/lib/mysql:rw
        - ./local:/root:rw    
      networks:
        - canal-test
      
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    container_name: canal-zk
    ports:
      - '2188:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
      # - ZOO_ENABLE_AUTH=yes
      # - ZOO_SERVER_USERS=kafka
      # - ZOO_SERVER_PASSWORDS=kafka_password
    networks:
      - canal-test
  kafka:
    image: 'bitnami/kafka:latest'
    container_name: canal-kafka
    ports:
      - '9098:9092'
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_LISTENERS=PLAINTEXT://canal-kafka:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://canal-kafka:9092
      - KAFKA_CFG_ZOOKEEPER_ConNECT=canal-zk:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      # - KAFKA_CFG_LISTENERS=SASL_SSL://:9092
      # - KAFKA_CFG_ADVERTISED_LISTENERS=SASL_SSL://:9092
      # - KAFKA_ZOOKEEPER_USER=kafka
      # - KAFKA_ZOOKEEPER_PASSWORD=kafka_password
      # - KAFKA_CLIENT_USER=user
      # - KAFKA_CLIENT_PASSWORD=password
      # - KAFKA_CERTIFICATE_PASSWORD=certificatePassword123
    depends_on:
      - zookeeper
    networks:
      - canal-test
  canal:
    image: canal/canal-server:latest
    container_name: canal-server          # 容器名称
    restart: always                 # 失败自动重启策略
    volumes:
      - ./canal-server/conf/canal.properties:/home/admin/canal-server/conf/canal.properties
      - ./canal-server/conf/example/instance.properties:/home/admin/canal-server/conf/example/instance.properties
   
    depends_on:
      - redis
      - kafka
      - db 
      - zookeeper
    networks:
      - canal-test
networks:
  canal-test:
    driver: bridge
可能用到的命令 mysql

查看binlog情况

show variables like 'binlog_format';

show variables like 'log_bin';

查看master情况

show master status;

创建数据库

CREATE DATAbase `test` CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;

创建表

use test;
CREATE TABLE `test_user` (  `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(255) NOT NULL DEFAULT '', `phone` varchar(255) NOT NULL DEFAULT '', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

插入

INSERT INTO `test_user`(`id`, `name`, `phone`) VALUES (null, 'name1', '18890901234');

更新

update test_user set name="name1-1" where id=1;

删除

delete from test_user where id=1;
kafka

查看所有topic

kafka-topics.sh --bootstrap-server canal-kafka:9092 --list 

创建topic

kafka-topics.sh --bootstrap-server canal-kafka:9092 --create --partitions 1 --replication-factor 1 --topic mysql_test

生产

kafka-console-producer.sh --bootstrap-server canal-kafka:9092 --topic mysql_test

消费

kafka-console-consumer.sh --bootstrap-server canal-kafka:9092 --from-beginning --topic mysql_test

欢迎分享,转载请注明来源:内存溢出

原文地址: https://www.outofmemory.cn/zaji/5711230.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存