Expiring XXX record(s) for XXX:120015 ms has passed since batch creation

    Background: a load test of the dws impression "user + scene" model aggregation, with a backlog of roughly 2 billion records built up. Symptom: after the Flink job started, checkpoints failed frequently, with the failure reason "Checkpoint was declined." Logs from the incident:
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 8 for operator aggregate -> Sink: exp sink (86/160). Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:434) ...
Caused by: org.apache.flink.util.SerializedThrowable: Failed to send data to Kafka: Expiring 2483 record(s) for [topic_name]-85:120015 ms has passed since batch creation ...
Caused by: org.apache.flink.util.SerializedThrowable: Expiring 2483 record(s) for [topic_name]-85:120015 ms has passed since batch creation ...
org.apache.flink.runtime.jobmaster.JobMaster - Trying to recover from a global failure.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
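
    The last exception above is thrown once the job has exceeded its tolerable number of checkpoint failures, which is what triggers the global recovery seen in the JobMaster line. That threshold is configurable on the checkpoint config; a minimal sketch, where the interval and threshold values are illustrative and not taken from the original job:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000L); // checkpoint every 60 s (illustrative)
// Tolerate N consecutive checkpoint failures before failing the job globally;
// the default is 0, i.e. the first declined checkpoint already fails the job.
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);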
    Root cause:
    The Kafka producer sends messages in batches: each ProducerRecord is first appended to a local buffer, and the time a message may sit in that buffer is bounded by request.timeout.ms. Under heavy load, messages wait in the buffer longer than request.timeout.ms allows, and the producer reports the "Expiring XXX record(s) for XXX: 120015 ms has passed since batch creation" error shown above. At the same time, we had enabled the end-to-end exactly-once feature, i.e. Kafka transactions, so each checkpoint is bound to the pre-commit of the pending messages: the pre-commit fails, the checkpoint fails, the job restarts, and a large backlog of messages piles up.
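
    For context, this coupling between checkpoints and Kafka pre-commits comes from running the Kafka sink with exactly-once semantics. The original post does not show its sink code; below is a minimal sketch using Flink's KafkaSink builder, where the broker address, topic name, transactional id prefix, and timeout value are placeholder assumptions:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("broker1:9092")          // placeholder
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("topic_name")               // placeholder, as in the log above
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        // EXACTLY_ONCE wraps each checkpoint interval in a Kafka transaction:
        // snapshotState() flushes and pre-commits it, so a producer timeout
        // during that flush declines the whole checkpoint.
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("exp-sink")         // placeholder
        // must not exceed the broker's transaction.max.timeout.ms
        .setProperty("transaction.timeout.ms", "900000")
        .build();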
    Solutions:
    a) Increase request.timeout.ms to match the load, so that messages are allowed to stay in the buffer longer (see the producer-properties sketch after this list);
    b) Our company imposes a rate limit on every producer; raising the producer's allowed rate lets the local buffer drain, so buffered messages no longer pile up.
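
    A hedged sketch of the producer-side tuning behind solution a); the concrete values are illustrative. Note that in kafka-clients 2.1+ the time a batch may sit in the accumulator is additionally bounded by delivery.timeout.ms:

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

Properties producerProps = new Properties();
// Let a batch wait longer before being expired (the job above expired at ~120000 ms).
producerProps.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "300000");   // illustrative
// kafka-clients >= 2.1: total time allowed from send() to success or expiry;
// must be >= request.timeout.ms + linger.ms, or the producer rejects the config.
producerProps.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "600000");  // illustrative

    These properties can then be handed to the sink, for example via the KafkaSink builder's setProperty(...) calls shown earlier.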

    [Screenshot of the checkpoint failure, omitted here: one or several parallel subtasks fail their checkpoint.]