1.官网下载 MySQL Community Server
MySQL :: Download MySQL Community Server
这里下载的是Windows (x86, 64-bit), ZIP Archive mysql-8.0.29-winx64.zip
2.下载完成后将ZIP解压到 D:\Tools\mysql-8.0.29-winx64\
3.初始化
命令行执行 mysqld --initialize
此时会在 D:\Tools\mysql-8.0.29-winx64\目录下生成data目录
4.安装MySQL服务
命令行执行 mysqld --install
5.启动MySQL服务
命令行执行 net start mysql
6.查看MySQL默认密码
D:\Tools\mysql-8.0.29-winx64\data 目录下生成了一个.err文件,其中打印了密码
此时使用 root/XZhgyCLcg7?t即可登录MySQL
7.配置文件
在D:\Tools\mysql-8.0.29-winx64新建my.ini文件
[mysql]
# 设置mysql客户端默认字符集
default-character-set=utf8
[mysqld]
#设置3306端口
port = 3306
# 设置mysql的安装目录
basedir=D:\Tools\mysql-8.0.29-winx64
# 设置mysql数据库的数据的存放目录
datadir=D:\Tools\mysql-8.0.29-winx64\data
# 允许最大连接数
max_connections=200
# 服务端使用的字符集默认为8比特编码的latin1字符集
character-set-server=utf8
# 创建新表时将使用的默认存储引擎
default-storage-engine=INNODB
#开启binlog日志
log_bin=ON
#设置日志三种格式:STATEMENT、ROW、MIXED 。
binlog_format=ROW
#配置serverid
server-id=1
#设置binlog清理时间
expire_logs_days=7
#binlog每个日志文件大小
max_binlog_size=100m
#binlog缓存大小
binlog_cache_size=4m
#最大binlog缓存大小
max_binlog_cache_size=512m
重启MySQL服务,即可生效。
上面的配置文件中开启了binlog
8.解析binlog文件
执行insert update之后,对binlog文件进行解析
com.github.shyiko
mysql-binlog-connector-java
0.21.0
public class BinlogTest extends BaseTests {
private static final Logger log = LoggerFactory.getLogger(BinlogTest.class);
@Test
public void test() throws Exception {
File file = new File("D:\Tools\mysql-8.0.29-winx64\data\ON.000002");
EventDeserializer eventDeserializer = new EventDeserializer();
eventDeserializer.setCompatibilityMode(
EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
);
BinaryLogFileReader reader = new BinaryLogFileReader(file, eventDeserializer);
try {
for (Event event; (event = reader.readEvent()) != null; ) {
EventData data = event.getData();
System.out.println(data);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
reader.close();
}
}
}
9.实时监听binlog事件并解析
spring.datasource.host=localhost
spring.datasource.port=3306
spring.datasource.databaseName=test
spring.datasource.url=jdbc:mysql://${spring.datasource.host}:${spring.datasource.port}/${spring.datasource.databaseName}?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=
binlog.table.names=label,product
@Slf4j
@Component
public class CommandLineRunnerImpl implements CommandLineRunner {
@Value("${spring.datasource.host}")
private String dbHost;
@Value("${spring.datasource.port}")
private int dbPort;
@Value("${spring.datasource.databaseName}")
private String databaseName;
@Value("${spring.datasource.username}")
private String username;
@Value("${spring.datasource.password}")
private String password;
@Value("#{'${binlog.table.names}'.split(',')}")
private Set binlogTableNames;
@Resource
private ColumnsMapper columnsMapper;
@Override
public void run(String... args) throws Exception {
BinaryLogClient client = new BinaryLogClient(dbHost, dbPort, username, password);
client.setServerId(2);
Map> tableNameColumnMap = Maps.newLinkedHashMap();
Map> tableIdColumnMap = Maps.newLinkedHashMap();
for (String binlogTableName : binlogTableNames) {
tableNameColumnMap.put(binlogTableName, Maps.newLinkedHashMap());
List columnsDOS = columnsMapper.queryList(databaseName, binlogTableName);
for (ColumnsDO columnsDO : columnsDOS) {
tableNameColumnMap.get(binlogTableName).put(columnsDO.getOrdinalPosition() - 1, columnsDO.getColumnName());
}
}
client.registerEventListener(event -> {
EventData eventData = event.getData();
if (eventData instanceof TableMapEventData) {
TableMapEventData tableMapEventData = (TableMapEventData) eventData;
String tableName = tableMapEventData.getTable();
if (binlogTableNames.contains(tableName)) {
tableIdColumnMap.put(tableMapEventData.getTableId(), tableNameColumnMap.get(tableName));
}
}
if (eventData instanceof UpdateRowsEventData) {
UpdateRowsEventData updateRowsEventData = (UpdateRowsEventData) eventData;
long tableId = updateRowsEventData.getTableId();
Map columnMap = tableIdColumnMap.get(tableId);
if (columnMap == null) {
return;
}
for (Map.Entry row : updateRowsEventData.getRows()) {
Serializable[] before = row.getKey();
Serializable[] after = row.getValue();
Map beforeMap = Maps.newLinkedHashMap();
Map afterMap = Maps.newLinkedHashMap();
for (int i = 0; i < before.length; i++) {
beforeMap.put(columnMap.get((long) i), before[i]);
afterMap.put(columnMap.get((long) i), after[i]);
}
log.info("beforeMap={}", JSON.toJSONString(beforeMap));
log.info("afterMap={}", JSON.toJSONString(afterMap));
}
} else if (eventData instanceof WriteRowsEventData) {
WriteRowsEventData writeRowsEventData = (WriteRowsEventData) eventData;
long tableId = writeRowsEventData.getTableId();
Map columnMap = tableIdColumnMap.get(tableId);
if (columnMap == null) {
return;
}
List rows = writeRowsEventData.getRows();
Map afterMap = Maps.newLinkedHashMap();
for (Serializable[] row : rows) {
for (int i = 0; i < row.length; i++) {
afterMap.put(columnMap.get((long) i), row[i]);
}
log.info("afterMap={}", JSON.toJSONString(afterMap));
}
} else if (eventData instanceof DeleteRowsEventData) {
DeleteRowsEventData deleteRowsEventData = (DeleteRowsEventData) eventData;
long tableId = deleteRowsEventData.getTableId();
Map columnMap = tableIdColumnMap.get(tableId);
if (columnMap == null) {
return;
}
List rows = deleteRowsEventData.getRows();
Map beforeMap = Maps.newLinkedHashMap();
for (Serializable[] row : rows) {
for (int i = 0; i < row.length; i++) {
beforeMap.put(columnMap.get((long) i), row[i]);
}
log.info("beforeMap={}", JSON.toJSONString(beforeMap));
}
} else {
System.out.println(eventData.getClass().getName());
System.out.println(eventData.toString());
}
});
try {
client.connect();
} catch (IOException e) {
log.error("client.connect error.", e);
}
}
}
需要读取表结构信息,以便能正确解析binlog事件
public interface ColumnsMapper {
List queryList(@Param("tableSchema") String tableSchema, @Param("tableName") String tableName);
}
@Data
public class ColumnsDO {
/**
* table_catalog
*/
private String tableCatalog;
/**
* table_schema 库名
*/
private String tableSchema;
/**
* table_name 表明
*/
private String tableName;
/**
* column_name 字段名
*/
private String columnName;
/**
* ordinal_position 字段位置序号
*/
private Long ordinalPosition;
/**
* column_default 字段默认值
*/
private String columnDefault;
/**
* is_nullable 是否允许为空
*/
private String isNullable;
/**
* data_type 字段数据类型
*/
private String dataType;
/**
* character_maximum_length 字符最大长度
*/
private Long characterMaximumLength;
/**
* character_octet_length
*/
private Long characterOctetLength;
/**
* numeric_precision 数字精度
*/
private Long numericPrecision;
/**
* numeric_scale
*/
private Long numericScale;
/**
* datetime_precision 时间精度
*/
private Long datetimePrecision;
/**
* character_set_name 字符集
*/
private String characterSetName;
/**
* collation_name
*/
private String collationName;
/**
* column_type
*/
private String columnType;
/**
* column_key
*/
private String columnKey;
/**
* extra
*/
private String extra;
/**
* privileges
*/
private String privileges;
/**
* column_comment 字段注释
*/
private String columnComment;
/**
* is_generated
*/
private String isGenerated;
/**
* generation_expression
*/
private String generationExpression;
}
table_catalog,
table_schema,
table_name,
column_name,
ordinal_position,
column_default,
is_nullable,
data_type,
character_maximum_length,
character_octet_length,
numeric_precision,
numeric_scale,
datetime_precision,
character_set_name,
collation_name,
column_type,
column_key,
extra,
privileges,
column_comment,
is_generated,
generation_expression
PS:MariaDB和MySQL的binlog文件有差异,使用以上代码对MariaDB的binlog文件进行解析时会出现异常。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)