Flink(59):Flink之FlinkCDC(下)

Flink(59):Flink之FlinkCDC(下),第1张

Flink(59):Flink之FlinkCDC(下)

目录

0. 相关文章链接

1. FlinkCDC1.x中存在的痛点

2. FlinkCDC2.x的设计目标

3. FlinkCDC2.x的设计实现

3.1. 整体概览

3.2. Chunk切分

3.3. Chunk分配

3.4. Chunk读取

3.5. Chunk汇报

3.6. Chunk分配

4. FlinkCDC2.x的核心原理分析

4.1. Binlog Chunk 中开始读取位置源码

4.2. 读取低位点到高位点之间的Binlog


0. 相关文章链接

Flink文章汇总

1. FlinkCDC1.x中存在的痛点

2. FlinkCDC2.x的设计目标

3. FlinkCDC2.x的设计实现 3.1. 整体概览

在对于有主键的表做初始化模式,整体的流程主要分为 5 个阶段:

    Chunk 切分;Chunk 分配; (实现并行读取数据&CheckPoint)Chunk 读取; (实现无锁读取)Chunk 汇报;Chunk 分配。

3.2. Chunk切分

        根据 Netflix DBlog 的论文中的无锁算法原理,对于目标表按照主键进行数据分片,设置每个切片的区间为左闭右开或者左开右闭来保证数据的连续性。

3.3. Chunk分配

        将划分好的 Chunk 分发给多个 SourceReader,每个 SourceReader 读取表中的一部分数据, 实现了并行读取的目标。

        同时在每个 Chunk 读取的时候可以单独做 CheckPoint,某个 Chunk 读取失败只需要单独执行该 Chunk 的任务,而不需要像 1.x 中失败了只能从头读取。

        若每个 SourceReader 保证了数据一致性,则全表就保证了数据一致性。

3.4. Chunk读取

读取可以分为 5 个阶段:

    SourceReader 读取表数据之前先记录当前的 Binlog 位置信息记为低位点;SourceReader 将自身区间内的数据查询出来并放置在 buffer 中;查询完成之后记录当前的 Binlog 位置信息记为高位点;在增量部分消费从低位点到高位点的 Binlog;根据主键,对 buffer 中的数据进行修正并输出。

通过以上5个阶段可以保证每个Chunk最终的输出就是在高位点时该Chunk中最新的数据,但是目前只是做到了保证单个 Chunk 中的数据一致性。

3.5. Chunk汇报

在 Snapshot Chunk 读取完成之后,有一个汇报的流程,如上图所示,即 SourceReader 需要将 Snapshot Chunk 完成信息汇报给 SourceEnumerator。

3.6. Chunk分配

        FlinkCDC 是支持全量+增量数据同步的,在 SourceEnumerator 接收到所有的 SnapshotChunk 完成信息之后,还有一个消费增量数据(Binlog) 的任务,此时是通过下发 Binlog Chunk给任意一个 SourceReader 进行单并发读取来实现的。

4. FlinkCDC2.x的核心原理分析 4.1. Binlog Chunk 中开始读取位置源码

MySqlHybridSplitAssigner

private MySqlBinlogSplit createBinlogSplit() {

    final List assignedSnapshotSplit = snapshotSplitAssigner

            .getAssignedSplits()

            .values()

            .stream()

            .sorted(Comparator.comparing(MySqlSplit::splitId))

            .collect(Collectors.toList());

    Map splitFinishedOffsets = snapshotSplitAssigner.getSplitFinishedOffsets();

    final List finishedSnapshotSplitInfos = new ArrayList<>();

    final Map tableSchemas = new HashMap<>();

    BinlogOffset minBinlogOffset = BinlogOffset.INITIAL_OFFSET;

    for (MySqlSnapshotSplit split : assignedSnapshotSplit) {

        // find the min binlog offset

        BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());

        if (binlogOffset.compareTo(minBinlogOffset) < 0) {

            minBinlogOffset = binlogOffset;

        }

        finishedSnapshotSplitInfos.add(

                new FinishedSnapshotSplitInfo(

                        split.getTableId(),

                        split.splitId(),

                        split.getSplitStart(),

                        split.getSplitEnd(),

                        binlogOffset));

        tableSchemas.putAll(split.getTableSchemas());

    }

    final MySqlSnapshotSplit lastSnapshotSplit = assignedSnapshotSplit.get(assignedSnapshotSplit.size() - 1).asSnapshotSplit();

    return new MySqlBinlogSplit(

            BINLOG_SPLIT_ID,

            lastSnapshotSplit.getSplitKeyType(),

            minBinlogOffset,

            BinlogOffset.NO_STOPPING_OFFSET,

            finishedSnapshotSplitInfos,

            tableSchemas

    );

}

4.2. 读取低位点到高位点之间的Binlog

BinlogSplitReader

private boolean shouldEmit(SourceRecord sourceRecord) {
    if (isDataChangeRecord(sourceRecord)) {
        TableId tableId = getTableId(sourceRecord);
        BinlogOffset position = getBinlogPosition(sourceRecord);
        // aligned, all snapshot splits of the table has reached max highWatermark
        if (position.isAtOrBefore(maxSplitHighWatermarkMap.get(tableId))) {
            return true;
        }
        Object[] key =
                getSplitKey(
                        currentBinlogSplit.getSplitKeyType(),
                        sourceRecord,
                        statefulTaskContext.getSchemaNameAdjuster()
                );
        for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo.get(tableId)) {
            if (RecordUtils.splitKeyRangeContains(
                    key, splitInfo.getSplitStart(), splitInfo.getSplitEnd())
                    && position.isAtOrBefore(splitInfo.getHighWatermark())) {
                return true;
            }
        }
        // not in the monitored splits scope, do not emit
        return false;
    }
    // always send the schema change event and signal event
    // we need record them to state of Flink
    return true;
}

注:此博客根据某马2020年贺岁视频改编而来 -> B站网址

注:其他相关文章链接由此进 -> Flink文章汇总

注:此博文为介绍FlinkCDC2.x的相关知识,对应FlinkCDC的详细使用可以查看Flink(58):Flink之FlinkCDC(上)

注:此博文中的相关内容和图片截取至网上Flink相关公开内容 


欢迎分享,转载请注明来源:内存溢出

原文地址: http://www.outofmemory.cn/zaji/5719984.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-18

发表评论

登录后才能评论

评论列表(0条)

保存