一、Flink简介
1.1、flink特点1.2、分层Api1.3、Flink vs Spark Streaming 二、Flink批处理应用
2.1、依赖的引入2.2、准备批处理文件2.3、wordCount编码2.4、自定义类 三、Flink流处理应用
3.1、wordCount编码3.2、设置并行度-默认为43.2、数据来源socket3.3、配置文件参数提取 四、Standlone环境运行job
4.1、Standlone环境的搭建4.2、配置文件说明4.3、提交jar包入口4.4、命令行提交Job 五、Flink On Yarn
5.1、session-Cluster模式5.2、Per-Job-cluster模式 六、Flink 四大组件
6.1、Flink运行时的组件6.2、任务提交流程6.3、任务调度原理6.4、并行度6.5、程序和数据流6.6、数据的传输形式6.6、任务链 七、F流处理PAI
7.1、Environment
7.1.1、getExcuteionEnvironment7.1.2、createLocalEnvironment7.1.3、createRemoteEnvironment 7.2、Source
7.2.1、List7.2.2、source from file7.2.3、source from kafka7.2.4、自定义source 7.3、Treansform
7.3.1、map7.3.2、FlatMap7.3.3、Fliter7.3.4、Keyby7.3.5、滚动聚合7.3.6、Reduce7.3.7、Split和select
一、Flink简介 1.1、flink特点低延迟,高吞吐、结果精准,良好的容错
• 支持事件时间(event-time)和处理时间(processing-time)语义 • 精确一次(exactly-once)的状态一致性保证 • 低延迟,每秒处理数百万个事件,毫秒级延迟 • 与众多常用存储系统的连接 • 高可用,动态扩展,实现7*24小时全天候运行1.2、分层Api
越顶层越抽象,表达含义越简明,使用越方便 越底层越具体,表达能力越丰富,使用越灵活1.3、Flink vs Spark Streaming
•数据模型 -spark采用RDD模型,spark streaming的DStream实际上也就是一组组小批数据RDD的集合 -flink基秀数据模型是数据流,以及事件(Event)序列 •运行时架构 -spark是批计算,将DAG划分为不同的stage,一个完成后才可以计算下一个 -flink是标准的流执行模式,一个事件在一个节点处理完后可以直接发往下一个节 点进行处理二、Flink批处理应用 2.1、依赖的引入 2.2、准备批处理文件 2.3、wordCount编码 2.4、自定义类
结果输出
三、Flink流处理应用
3.1、wordCount编码
nc lk -7777
3.3、配置文件参数提取
下载包 解压flink-1.10.1-bin.scala_2.12.tgz 4.2、配置文件说明
1、通常jobmanager 的配置比 taskmanager,因为干活的是taskmanager 2、并行度不一定比slots小,一定比集群总的slots小
启动一个jobmanage和一个taskmanager
配置参考
4.3、提交jar包入口
配置文件4个槽,只占用了2个槽。以并行度最高来群定soilt的使用个数。个数不够,超时后会报超时。
运行结果
4.4、命令行提交Job
查看运行的job列表
命令行取消job
查看运行的和取消的所有列表
五、Flink On Yarn
flink提供了两种yarn上运行模式,分别为session-Cluster和per-Job-cluster的模式以Yarn模式部署Flink任务时,要求Flink是有Haddop支持的版本,1.7以上版本,需要将整合hadoop支持的依赖放入Flink 的 lib下。 5.1、session-Cluster模式
Flink bin 目录下启动 -n 可不指定
执行提交job命令 有Session 找 Session集群 没session找 Standlone
5.2、Per-Job-cluster模式
基本 *** 作
六、Flink 四大组件
6.1、Flink运行时的组件
作业管理器 JobManager作用
1、控制一个应用程序执行的主进程,也就是说,每个应用程序都会被一个不同的JobManager所控制执行。 2、JobManager会先接收到要执行的应用程序,这个应用程序会包括:作业图(JobGraph)、逻辑数据流图(logical dataflow Graph)和打包了所有的类、库和其它资源的JAR包。 3、JobManager会把JobGraph转换成一个物理层面的数据流图,这个图被叫做"执行图"(ExecutionGraph),包含了所有可以并发执行的任务。 4、JobManager会向资源管理器(ResourceManager)请求执行任务必要的资源,也就是任务管理器(兀skManager)上的插槽(slot)。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的hskManager上。而在运行过程中,JobManager会负责所有需要中央协调的 *** 作,比如说检查点(checkpoints)的协调。
任务管理器 TaskManager
1、Flink中的工作进程。通常在Flink中会有多个TaskManager运行,每一个TaskManager都包含了一定数量的插槽(slots),插槽的数量限制了TaskManager能够执行的任务数量。 2、启动之后,TaskManager会向资源管理器注册它的插槽;收到资源管理器的指令后,TaskManager就会将一个或者多个插槽提供给JobManager调用。JobManager就可以向插槽分配任务(tasks)来执行了。 3、在执行过程中,TgskManager可以跟其它运行同一应用程序的TaskManager交换数据。
资源管理器 ResourceManager
1、主要负责管理任务管理器(TaskManager)的插槽(slot),TaskManger插槽是Flink中定义的处理资源单元。 2、Flink为不同的环境和资源管理工具提供了不同资源管理器,比如SRN、Mesos、K8s, 以及Standalone部署。 3、当JobManager申请插槽资源时,ResourceManager会将有空闲插槽的TaskManager分酉已给JobManager。如果ResourceManager没有有足够的插槽来满足JobManager的请求,它还可以向资源提供平台发起会话,以提供启动心TaskManager进程的容器。
分发器
1、可以跨作业运行,它为应用提交提供了REST接口。 2、当一个应用被提交执行时,分发器就会启动并将应用移交给一个 JobManager 3、Dispatcher也会启动一个Web UI,用来方便地展示和监控作业执 行的信息。 4、Dispatcher在架构中可能并不是必需的,这取决于应用提交运行 的方式。6.2、任务提交流程
import com.tan.flink.bean.SensorReading; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.util.Arrays; public class SourceFromCollection { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream7.2.2、source from fileinputDataStream = env.fromCollection(Arrays.asList( new SensorReading("sensor_1", 1547718199L, 35.8), new SensorReading("sensor_6", 1547718201L, 15.4), new SensorReading("sensor_7", 1547718202L, 6.7), new SensorReading("sensor_10", 1547718205L, 38.1) )); inputDataStream.print(); env.execute(); } }
env.readTextFile(path);7.2.3、source from kafka
pom 依赖
org.apache.flink flink-connector-kafka-0.11_2.121.10.1
import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011; import java.util.Properties; public class SourceFromKafka { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // kafka 配置 Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "192.168.200.102:9092,192.168.200.102:9092,192.168.200.104:9092"); properties.setProperty("group.id", "flink-kafka"); properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("auto.offset.reset", "latest"); DataStreamSource7.2.4、自定义sourceinputDataStream = env.addSource(new FlinkKafkaConsumer011 ( "sensor", new SimpleStringSchema(), properties )); inputDataStream.print(); env.execute(); } }
需要实现SourceFunction 或者继承SourceFunction的富函数RichSourceFunction
import com.tan.flink.bean.SensorReading; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.util.Random; import java.util.UUID; public class SourceFromCustom { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource7.3、Treansform 7.3.1、map 7.3.2、FlatMap 7.3.3、Fliter 7.3.4、Keyby 7.3.5、滚动聚合 7.3.6、Reduce 7.3.7、Split和selectinputDataStream = env.addSource(new CustomSource()); inputDataStream.print(); env.execute(); } public static class CustomSource implements SourceFunction { boolean running = true; @Override public void run(SourceContext sourceContext) throws Exception { Random random = new Random(); while (running) { // 每隔 100 秒数据 for (int i = 0; i < 5; i++) { String id = UUID.randomUUID().toString().substring(0, 8); long timestamp = System.currentTimeMillis(); double temperature = 60 + random.nextGaussian() * 20; sourceContext.collect(new SensorReading(id, timestamp, temperature)); Thread.sleep(100L); } Thread.sleep(1000L); } } @Override public void cancel() { running = false; } } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)