一文搞定 Flink Task 提交执行全流程

一文搞定 Flink Task 提交执行全流程,第1张

上一篇 一文搞定 Flink Job 提交全流程 ,我们知道每一个 operator chain 作为一个整体,提交 task 。

这里创建了一个 Task 对象并启动,我们来看一下 Task 启动的时候都做了什么

首先向 blobService、netWork 注册 job ,添加监控,将jar 等添加到分布式缓存中,然后就 invoke,这也是 task 真正开始执行的地方,我们以 StreamTask 为例

init 然后对于一些 Rich Function 会先执行其 open方法,然后开始 run,就开始真正的消费数据了。我们以 flatMap 为例

当执行 run 方法时,首先呢 OneInputStreamTask.run

这一块的逻辑可具体参考

一文搞定 Flink 消费消息的全流程 、 一文搞定 Flink Checkpoint Barrier 全流程 以及 一文搞懂 Flink 处理 Barrier 全过程

我们知道当往下游发送数据的时候

继续追踪下去到 StreamFlatMap.processElement

其他的类似,如果是 kafka source task 调用的是 headOperator.run(getCheckpointLock(), getStreamStatusMaintainer()),然后去消费 kafka 中的数据。然后跟前面 一文搞定 Flink Job 提交全流程 、 写给大忙人看的Flink 消费 Kafka 、 一文搞定 Flink 消费消息的全流程 以及 一文搞定 Flink Checkpoint Barrier 全流程 就可以串起来了。而 Flink 整体流程的分析,除了 restore 之外,也差不多可以告一段落了。

Flink是依赖内存计算,计算过程中内存不够对Flink的执行效率影响很大。可以通过监控GC(Garbage Collection),评估内存使用及剩余情况来判断内存是否变成性能瓶颈,并根据情况优化。

监控节点进程的YARN的Container GC日志,如果频繁出现Full GC,需要优化GC。

GC的配置:在客户端的“conf/flink-conf.yaml”配置文件中,在“env.java.opts”配置项中添加参数:“

此处默认已经添加GC日志。

任务的并行度可以通过以下四种层次(按优先级从高到低排列)指定,用户可以根据实际的内存、CPU、数据以及应用程序逻辑的情况调整并行度参数。

您可以根据实际任务数量的多少,为JobManager设置一个合适的内存。

•在使用yarn-session命令时,添加“-jm MEM”参数设置内存。

•在使用yarn-cluster命令时,添加“-yjm MEM”参数设置内存。

每个TaskManager每个核同时能跑一个task,所以增加了TaskManager的个数相当于增大了任务的并发度。在资源充足的情况下,可以相应增加TaskManager的个数,以提高运行效率。

•在使用yarn-session命令时,添加“-n NUM”参数设置TaskManager个数。

•在使用yarn-cluster命令时,添加“-yn NUM”参数设置TaskManager个数。

每个TaskManager多个核同时能跑多个task,相当于增大了任务的并发度。但是由于所有核共用TaskManager的内存,所以要在内存和核数之间做好平衡。

•在使用yarn-session命令时,添加“-s NUM”参数设置SLOT数。

•在使用yarn-cluster命令时,添加“-ys NUM”参数设置SLOT数。

TaskManager的内存主要用于任务执行、通信等。当一个任务很大的时候,可能需要较多资源,因而内存也可以做相应的增加。

•将在使用yarn-sesion命令时,添加“-tm MEM”参数设置内存。

•将在使用yarn-cluster命令时,添加“-ytm MEM”参数设置内存。


欢迎分享,转载请注明来源:内存溢出

原文地址: http://www.outofmemory.cn/bake/11536899.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-16
下一篇 2023-05-16

发表评论

登录后才能评论

评论列表(0条)

保存