canal+Kafka实现mysql与redis数据同步

canal+Kafka实现mysql与redis数据同步,第1张

前言

上篇文章简单介绍canal概念,本文结合常见的缓存业务去讲解canal使用。在实际开发过程中,通常都会把数据往redis缓存中保存一份,做下简单的查询优化。如果这时候数据库数据发生变更 *** 作,就不得不在业务代码中写一段同步更新redis的代码,但是这种 数据同步的代码和业务代码糅合在一起 看起来不是很优雅,而且还会出现数据不一致问题。那能不能把这部分同步代码从中抽离出来,形成独立模块呢?答案是肯定的,下面通过canal结合Kafka来实现mysql与redis之间的数据同步。

架构设计

通过上述结构设计图可以很清晰的知道用到的组件:MySQL、Canal、Kafka、ZooKeeper、Redis。

Kafka&Zookeeper搭建

首先在 官网 下载Kafka:

下载后解压文件夹,可以看到以下几个文件:

Kafka内部自带了zookeeper,所以暂不需要去下载搭建zookeeper集群,本文就使用Kafka自带zookeeper来实现。

通过上述zookeeper启动命令以及Kafka启动命令把服务启动,可以通过以下简单实现下是否成功:

Canal搭建

canal搭建具体可以参考上文,这里只讲解具体的参数配置:

找到/conf目录下的canal.properties配置文件:

然后配置instance,找到/conf/example/instance.properties配置文件:

经过上述配置后,就可以启动canal了。

测试

环境搭建完成后,就可以编写代码进行测试。

1、引入pom依赖

2、封装Redis工具类

在application.yml文件增加以下配置:

封装一个 *** 作Redis的工具类:

3、创建MQ消费者进行同步

创建一个CanalBean对象进行接收:

最后就可以创建一个消费者CanalConsumer进行消费:

测试Mysql与Redis同步

mysql对应的表结构如下:

启动项目后,新增一条数据:

可以在控制台看到以下输出:

如果更新呢?试一下Update语句:

同样可以在控制台看到以下输出:

经过测试完全么有问题。

总结

既然canal这么强大,难道就没缺点嘛?答案当然是存在的啦,比如:canal只能同步增量数据、不是实时同步而是准实时同步、MQ顺序问题等; 尽管有一些缺点,毕竟没有一样技术或者产品是完美的,最重要是合适。比如公司目前有个视图服务提供宽表搜索查询功能就是通过 同步Mysql数据到Es采用Canal+Kafka的方式来实现的。

private function loaderHandler(event:*):void {

switch(event.type) {

case Event.COMPLETE:

trace(_loader.data.result)

break

case Event.OPEN:

trace("open: " + event)

break

case ProgressEvent.PROGRESS:

trace("progress: " + event)

break

1、数据接入

通过kafka的restFul接口创建连接mysql的连接器并启动。

{

    "name": "mysql_stream_test",

    "config": {

        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",

        "timestamp.column.name": "",

        "incrementing.column.name": "ID",

        "connection.password": "",

        "validate.non.null": true,

        "tasks.max": 1,

        "batch.max.rows": 100,

        "table.whitelist": "baseqx.test_demo",

        "mode": "incrementing",

        "topic.prefix": "mysql_",

        "connection.user": "",

        "poll.interval.ms": 5000,

        "numeric.mapping": "best_fit",

        "connection.url": "jdbc:mysql://xxx.xxx.xxx.xxx:3306/baseqx?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true"

    }

}

2.kafka-connect创建主题中的默认数据格式为

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ID"},{"type":"string","optional":false,"field":"NAME"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"CREATE_TIME"}],"optional":false,"name":"test_demo"},"payload":{"ID":1,"NAME":"prestoEtl","CREATE_TIME":1606902182000}}

3.使用pyflink消费带schema的消息

#!/usr/bin/python3.7

# -*- coding: UTF-8 -*-

from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode

from pyflink.table import StreamTableEnvironment, TableConfig, SqlDialect

s_env = StreamExecutionEnvironment.get_execution_environment()

s_env.set_parallelism(1)

st_env = StreamTableEnvironment.create(s_env, TableConfig())

st_env.get_config().set_python_executable("python3")

st_env.use_catalog("default_catalog")

st_env.use_database("default_database")

# DML上可以固定schema为字符串, 用 ROW 函数封装 payload

ddlKafkaConn = """

create table sourceKafkaConn(

    `scheam`    STRING  comment 'kafkaConn每行模式',

    `payload`  ROW(ID BIGINT,NAME STRING,CREATE_TIME STRING)  comment '行数据'

)comment '从kafkaConnect获取带模式的数据'

with(

    'connector' = 'kafka',

    'topic' = 'mysql_test_demo',       

    'properties.bootstrap.servers' = '192.168.113.11:9092',

    'scan.startup.mode' = 'earliest-offset',

    'format' = 'json'

)

"""

# 'connector.startup-mode' = 'earliest-offset 表示读取最早的消息 | latest-offset 表示读取消息队列中最新的消息',

st_env.execute_sql(ddlKafkaConn)

sinkPrint = '''

    CREATE TABLE sinkPrint WITH ('connector' = 'print')

    LIKE sourceKafkaConn (EXCLUDING ALL)

'''

st_env.execute_sql(sinkPrint)

st_env.execute_sql("SHOW TABLES").print()

st_env.sql_query("select scheam,ROW(ID,NAME,CREATE_TIME) as payload from sourceKafkaConn") \

    .insert_into("sinkPrint")

st_env.execute("pyflink-kafka-v4")

4.执行

4.1pythonpyflink-kafka-v4.py

4.2flinkrun-mxxx.xxx.xxx.xxx:8081-pypyflink-kafka-v4.py

5.执行结果

+-----------------+|tablename|+-----------------

+|sinkPrint|

+|sourceKafkaConn|

+-----------------+

2 rowsinset

+I(null,1,prestoEtl,1606902182000)

+I(null,2,执行的非常好,1606902562000)

+I(null,3,使用flink解析topic的schema,1607070278000)


欢迎分享,转载请注明来源:内存溢出

原文地址: https://www.outofmemory.cn/zaji/5903355.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-03-07
下一篇 2023-03-07

发表评论

登录后才能评论

评论列表(0条)

保存