Installing the MySQL service on Windows and parsing the binlog


1. Download MySQL Community Server from the official site

MySQL :: Download MySQL Community Server

The download used here is the Windows (x86, 64-bit) ZIP Archive: mysql-8.0.29-winx64.zip

2. After the download completes, extract the ZIP to D:\Tools\mysql-8.0.29-winx64\

3. Initialize the data directory

From a command prompt in the bin directory of the extracted archive, run mysqld --initialize

This generates a data directory under D:\Tools\mysql-8.0.29-winx64\

4. Install the MySQL service

From an Administrator command prompt, run mysqld --install

5. Start the MySQL service

Run net start mysql

6. Look up the default root password

A .err file is written to the D:\Tools\mysql-8.0.29-winx64\data directory; the temporary root password is printed in it.

You can now log in to MySQL with root / XZhgyCLcg7?t (the password generated on this particular installation).
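The relevant line in the .err file looks roughly like this (the password is generated per installation):

A temporary password is generated for root@localhost: XZhgyCLcg7?t

The temporary password is marked as expired, so MySQL requires a new one to be set at the first login, for example (NewPassword123! is a placeholder):

mysql -u root -p
ALTER USER 'root'@'localhost' IDENTIFIED BY 'NewPassword123!';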

7. Configuration file

Create a my.ini file in D:\Tools\mysql-8.0.29-winx64:

[mysql]
# Default character set for the mysql client
default-character-set=utf8
[mysqld]
# Listen on port 3306
port = 3306
# MySQL installation directory
basedir=D:\Tools\mysql-8.0.29-winx64
# Data directory
datadir=D:\Tools\mysql-8.0.29-winx64\data
# Maximum number of connections
max_connections=200
# Server character set (without this setting the default is latin1)
character-set-server=utf8
# Default storage engine for new tables
default-storage-engine=INNODB

# Enable binary logging
log_bin=ON
# Binlog format; the available formats are STATEMENT, ROW and MIXED
binlog_format=ROW
# Server id
server-id=1
# Purge binlogs older than this many days (deprecated in MySQL 8.0 in favor of binlog_expire_logs_seconds, but still accepted)
expire_logs_days=7
# Maximum size of a single binlog file
max_binlog_size=100m
# Binlog cache size
binlog_cache_size=4m
# Maximum binlog cache size
max_binlog_cache_size=512m

Restart the MySQL service for the configuration to take effect.

The configuration above enables the binlog. Note that because log_bin is set to the literal value ON, MySQL treats ON as the binlog base name, which is why the log files in the data directory are named ON.000001, ON.000002, and so on.
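After the restart, you can confirm from the mysql command-line client that binary logging is active:

SHOW VARIABLES LIKE 'log_bin';
SHOW BINARY LOGS;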

8. Parsing a binlog file

After executing some insert and update statements, the resulting binlog file can be parsed with the mysql-binlog-connector-java library; the Maven dependency and a test class follow.
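For example, a few row changes against a hypothetical test.product table (any table in a database with binlog enabled will do):

CREATE TABLE test.product (id INT PRIMARY KEY, name VARCHAR(64));
INSERT INTO test.product (id, name) VALUES (1, 'apple');
UPDATE test.product SET name = 'banana' WHERE id = 1;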

 


<dependency>
    <groupId>com.github.shyiko</groupId>
    <artifactId>mysql-binlog-connector-java</artifactId>
    <version>0.21.0</version>
</dependency>

import java.io.File;
import java.io.IOException;

import org.junit.Test; // JUnit 4; use org.junit.jupiter.api.Test with JUnit 5
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.github.shyiko.mysql.binlog.BinaryLogFileReader;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;

public class BinlogTest extends BaseTests {

    private static final Logger log = LoggerFactory.getLogger(BinlogTest.class);

    @Test
    public void test() throws Exception {
        File file = new File("D:\\Tools\\mysql-8.0.29-winx64\\data\\ON.000002");
  
        EventDeserializer eventDeserializer = new EventDeserializer();
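        // DATE_AND_TIME_AS_LONG deserializes temporal columns as epoch milliseconds,
        // CHAR_AND_BINARY_AS_BYTE_ARRAY deserializes CHAR/BINARY columns as byte arrays.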
        eventDeserializer.setCompatibilityMode(
                EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
                EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
        );
        BinaryLogFileReader reader = new BinaryLogFileReader(file, eventDeserializer);
        try {
            for (Event event; (event = reader.readEvent()) != null; ) {
                EventData data = event.getData();
                System.out.println(data);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            reader.close();
        }

    }
}
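In the output, each batch of row changes shows up as a TableMapEventData entry followed by a WriteRowsEventData, UpdateRowsEventData or DeleteRowsEventData entry; the listener in the next step relies on exactly this pairing to resolve column names.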

9. Listening for binlog events in real time and parsing them

spring.datasource.host=localhost
spring.datasource.port=3306
spring.datasource.databaseName=test
spring.datasource.url=jdbc:mysql://${spring.datasource.host}:${spring.datasource.port}/${spring.datasource.databaseName}?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=
binlog.table.names=label,product

@Slf4j
@Component
public class CommandLineRunnerImpl implements CommandLineRunner {

    @Value("${spring.datasource.host}")
    private String dbHost;
    @Value("${spring.datasource.port}")
    private int dbPort;
    @Value("${spring.datasource.databaseName}")
    private String databaseName;
    @Value("${spring.datasource.username}")
    private String username;
    @Value("${spring.datasource.password}")
    private String password;
    @Value("#{'${binlog.table.names}'.split(',')}")
    private Set<String> binlogTableNames;
    @Resource
    private ColumnsMapper columnsMapper;

    @Override
    public void run(String... args) throws Exception {
        BinaryLogClient client = new BinaryLogClient(dbHost, dbPort, username, password);
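        // The client connects to MySQL as a replica, so its server id must differ from the
        // server's own server-id (set to 1 in my.ini above).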
        client.setServerId(2);

        Map<String, Map<Long, String>> tableNameColumnMap = Maps.newLinkedHashMap();
        Map<Long, Map<Long, String>> tableIdColumnMap = Maps.newLinkedHashMap();

        for (String binlogTableName : binlogTableNames) {
            tableNameColumnMap.put(binlogTableName, Maps.newLinkedHashMap());
            List<ColumnsDO> columnsDOS = columnsMapper.queryList(databaseName, binlogTableName);
            for (ColumnsDO columnsDO : columnsDOS) {
                tableNameColumnMap.get(binlogTableName).put(columnsDO.getOrdinalPosition() - 1, columnsDO.getColumnName());
            }
        }

        client.registerEventListener(event -> {
            EventData eventData = event.getData();
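            // Row events only carry a numeric tableId; the TableMapEventData that precedes
            // them maps that id to a table name, so the column mapping is cached by tableId here.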
            if (eventData instanceof TableMapEventData) {
                TableMapEventData tableMapEventData = (TableMapEventData) eventData;
                String tableName = tableMapEventData.getTable();
                if (binlogTableNames.contains(tableName)) {
                    tableIdColumnMap.put(tableMapEventData.getTableId(), tableNameColumnMap.get(tableName));
                }
            }
            if (eventData instanceof UpdateRowsEventData) {
                UpdateRowsEventData updateRowsEventData = (UpdateRowsEventData) eventData;
                long tableId = updateRowsEventData.getTableId();
                Map<Long, String> columnMap = tableIdColumnMap.get(tableId);
                if (columnMap == null) {
                    return;
                }
                for (Map.Entry<Serializable[], Serializable[]> row : updateRowsEventData.getRows()) {
                    Serializable[] before = row.getKey();
                    Serializable[] after = row.getValue();

                    Map<String, Serializable> beforeMap = Maps.newLinkedHashMap();
                    Map<String, Serializable> afterMap = Maps.newLinkedHashMap();

                    for (int i = 0; i < before.length; i++) {
                        beforeMap.put(columnMap.get((long) i), before[i]);
                        afterMap.put(columnMap.get((long) i), after[i]);
                    }

                    log.info("beforeMap={}", JSON.toJSONString(beforeMap));
                    log.info("afterMap={}", JSON.toJSONString(afterMap));
                }
            } else if (eventData instanceof WriteRowsEventData) {
                WriteRowsEventData writeRowsEventData = (WriteRowsEventData) eventData;
                long tableId = writeRowsEventData.getTableId();
                Map<Long, String> columnMap = tableIdColumnMap.get(tableId);
                if (columnMap == null) {
                    return;
                }
                List<Serializable[]> rows = writeRowsEventData.getRows();
                Map<String, Serializable> afterMap = Maps.newLinkedHashMap();
                for (Serializable[] row : rows) {
                    for (int i = 0; i < row.length; i++) {
                        afterMap.put(columnMap.get((long) i), row[i]);
                    }
                    log.info("afterMap={}", JSON.toJSONString(afterMap));
                }
            } else if (eventData instanceof DeleteRowsEventData) {
                DeleteRowsEventData deleteRowsEventData = (DeleteRowsEventData) eventData;
                long tableId = deleteRowsEventData.getTableId();
                Map<Long, String> columnMap = tableIdColumnMap.get(tableId);
                if (columnMap == null) {
                    return;
                }
                List<Serializable[]> rows = deleteRowsEventData.getRows();
                Map<String, Serializable> beforeMap = Maps.newLinkedHashMap();
                for (Serializable[] row : rows) {
                    for (int i = 0; i < row.length; i++) {
                        beforeMap.put(columnMap.get((long) i), row[i]);
                    }
                    log.info("beforeMap={}", JSON.toJSONString(beforeMap));
                }
            } else {
                System.out.println(eventData.getClass().getName());
                System.out.println(eventData.toString());
            }
        });

        try {
            client.connect();
        } catch (IOException e) {
            log.error("client.connect error.", e);
        }
    }
}
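A possible extension (a minimal sketch, not part of the original post): by default BinaryLogClient starts streaming from the server's current binlog position, so changes made while the application was down are missed. The client also exposes setBinlogFilename/setBinlogPosition and getBinlogFilename, which together with the event header's next position can be used to persist a checkpoint and resume from it. The host, credentials and the saved position below are placeholder values:

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;

public class ResumableBinlogClient {

    public static void main(String[] args) throws Exception {
        // Placeholder connection settings; reuse the values from application.properties.
        BinaryLogClient client = new BinaryLogClient("localhost", 3306, "root", "password");
        client.setServerId(2);

        // Resume from a previously saved file/position instead of the server's current one.
        // "ON.000002" and 4 are example values; load the real ones from wherever you persisted them.
        client.setBinlogFilename("ON.000002");
        client.setBinlogPosition(4);

        client.registerEventListener(event -> {
            EventHeaderV4 header = event.getHeader();
            // client.getBinlogFilename() plus header.getNextPosition() is what you would
            // persist after handling each event, so the next run can resume from this point.
            System.out.println(client.getBinlogFilename() + ":" + header.getNextPosition()
                    + " -> " + event.getData());
        });

        client.connect(); // blocking; run it on a dedicated thread in a real application
    }
}

The same idea applies to the listener in step 9: save the current file name and next position after each processed event, and call setBinlogFilename/setBinlogPosition before connect() on startup.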

The table's column metadata has to be loaded so that the positional values in the binlog row events can be mapped back to column names.

public interface ColumnsMapper {

    List<ColumnsDO> queryList(@Param("tableSchema") String tableSchema, @Param("tableName") String tableName);
}


@Data
public class ColumnsDO {

    /**
     * table_catalog
     */
    private String tableCatalog;

    /**
     * table_schema: schema (database) name
     */
    private String tableSchema;

    /**
     * table_name: table name
     */
    private String tableName;

    /**
     * column_name: column name
     */
    private String columnName;

    /**
     * ordinal_position: ordinal position of the column
     */
    private Long ordinalPosition;

    /**
     * column_default: default value of the column
     */
    private String columnDefault;

    /**
     * is_nullable: whether the column allows NULL
     */
    private String isNullable;

    /**
     * data_type: column data type
     */
    private String dataType;

    /**
     * character_maximum_length: maximum character length
     */
    private Long characterMaximumLength;

    /**
     * character_octet_length
     */
    private Long characterOctetLength;

    /**
     * numeric_precision: numeric precision
     */
    private Long numericPrecision;

    /**
     * numeric_scale
     */
    private Long numericScale;

    /**
     * datetime_precision: date/time precision
     */
    private Long datetimePrecision;

    /**
     * character_set_name: character set
     */
    private String characterSetName;

    /**
     * collation_name
     */
    private String collationName;

    /**
     * column_type
     */
    private String columnType;

    /**
     * column_key
     */
    private String columnKey;

    /**
     * extra
     */
    private String extra;

    /**
     * privileges
     */
    private String privileges;

    /**
     * column_comment: column comment
     */
    private String columnComment;

    /**
     * is_generated
     */
    private String isGenerated;

    /**
     * generation_expression
     */
    private String generationExpression;
}

The MyBatis mapper XML; the query reads the column metadata from information_schema.columns (the packages in namespace and type are placeholders, replace them with your project's own):

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.mapper.ColumnsMapper">

    <resultMap id="BaseResultMap" type="com.example.entity.ColumnsDO">
        <result column="table_catalog" property="tableCatalog"/>
        <result column="table_schema" property="tableSchema"/>
        <result column="table_name" property="tableName"/>
        <result column="column_name" property="columnName"/>
        <result column="ordinal_position" property="ordinalPosition"/>
        <result column="column_default" property="columnDefault"/>
        <result column="is_nullable" property="isNullable"/>
        <result column="data_type" property="dataType"/>
        <result column="character_maximum_length" property="characterMaximumLength"/>
        <result column="character_octet_length" property="characterOctetLength"/>
        <result column="numeric_precision" property="numericPrecision"/>
        <result column="numeric_scale" property="numericScale"/>
        <result column="datetime_precision" property="datetimePrecision"/>
        <result column="character_set_name" property="characterSetName"/>
        <result column="collation_name" property="collationName"/>
        <result column="column_type" property="columnType"/>
        <result column="column_key" property="columnKey"/>
        <result column="extra" property="extra"/>
        <result column="privileges" property="privileges"/>
        <result column="column_comment" property="columnComment"/>
        <result column="is_generated" property="isGenerated"/>
        <result column="generation_expression" property="generationExpression"/>
    </resultMap>

    <sql id="Base_Column_List">
        table_catalog,
        table_schema,
        table_name,
        column_name,
        ordinal_position,
        column_default,
        is_nullable,
        data_type,
        character_maximum_length,
        character_octet_length,
        numeric_precision,
        numeric_scale,
        datetime_precision,
        character_set_name,
        collation_name,
        column_type,
        column_key,
        extra,
        privileges,
        column_comment,
        is_generated,
        generation_expression
    </sql>

    <select id="queryList" resultMap="BaseResultMap">
        select
        <include refid="Base_Column_List"/>
        from information_schema.columns
        where table_schema = #{tableSchema}
          and table_name = #{tableName}
    </select>

</mapper>

PS: MariaDB's binlog file format differs from MySQL's, so using the code above to parse a MariaDB binlog file will fail with an exception.
