About-Flink

About-Flink,第1张

About-Flink

about-Flink

一、Flink简介

1.1、flink特点1.2、分层Api1.3、Flink vs Spark Streaming 二、Flink批处理应用

2.1、依赖的引入2.2、准备批处理文件2.3、wordCount编码2.4、自定义类 三、Flink流处理应用

3.1、wordCount编码3.2、设置并行度-默认为43.2、数据来源socket3.3、配置文件参数提取 四、Standlone环境运行job

4.1、Standlone环境的搭建4.2、配置文件说明4.3、提交jar包入口4.4、命令行提交Job 五、Flink On Yarn

5.1、session-Cluster模式5.2、Per-Job-cluster模式 六、Flink 四大组件

6.1、Flink运行时的组件6.2、任务提交流程6.3、任务调度原理6.4、并行度6.5、程序和数据流6.6、数据的传输形式6.6、任务链 七、F流处理PAI

7.1、Environment

7.1.1、getExcuteionEnvironment7.1.2、createLocalEnvironment7.1.3、createRemoteEnvironment 7.2、Source

7.2.1、List7.2.2、source from file7.2.3、source from kafka7.2.4、自定义source 7.3、Treansform

7.3.1、map7.3.2、FlatMap7.3.3、Fliter7.3.4、Keyby7.3.5、滚动聚合7.3.6、Reduce7.3.7、Split和select

一、Flink简介 1.1、flink特点

低延迟,高吞吐、结果精准,良好的容错

• 支持事件时间(event-time)和处理时间(processing-time)语义
• 精确一次(exactly-once)的状态一致性保证
• 低延迟,每秒处理数百万个事件,毫秒级延迟
• 与众多常用存储系统的连接
• 高可用,动态扩展,实现7*24小时全天候运行
1.2、分层Api
	越顶层越抽象,表达含义越简明,使用越方便
	越底层越具体,表达能力越丰富,使用越灵活

1.3、Flink vs Spark Streaming
•数据模型
-spark采用RDD模型,spark streaming的DStream实际上也就是一组组小批数据RDD的集合
-flink基秀数据模型是数据流,以及事件(Event)序列

•运行时架构
-spark是批计算,将DAG划分为不同的stage,一个完成后才可以计算下一个
-flink是标准的流执行模式,一个事件在一个节点处理完后可以直接发往下一个节
点进行处理
二、Flink批处理应用 2.1、依赖的引入

2.2、准备批处理文件

2.3、wordCount编码

2.4、自定义类

结果输出
三、Flink流处理应用 3.1、wordCount编码


3.2、设置并行度-默认为4

3.2、数据来源socket

nc lk -7777
3.3、配置文件参数提取


四、Standlone环境运行job 4.1、Standlone环境的搭建

下载包 解压flink-1.10.1-bin.scala_2.12.tgz 4.2、配置文件说明

1、通常jobmanager 的配置比 taskmanager,因为干活的是taskmanager
2、并行度不一定比slots小,一定比集群总的slots小

启动一个jobmanage和一个taskmanager

配置参考
4.3、提交jar包入口



配置文件4个槽,只占用了2个槽。以并行度最高来群定soilt的使用个数。个数不够,超时后会报超时。
运行结果
4.4、命令行提交Job

查看运行的job列表
命令行取消job
查看运行的和取消的所有列表
五、Flink On Yarn

flink提供了两种yarn上运行模式,分别为session-Cluster和per-Job-cluster的模式以Yarn模式部署Flink任务时,要求Flink是有Haddop支持的版本,1.7以上版本,需要将整合hadoop支持的依赖放入Flink 的 lib下。 5.1、session-Cluster模式


Flink bin 目录下启动 -n 可不指定
执行提交job命令 有Session 找 Session集群 没session找 Standlone

5.2、Per-Job-cluster模式


基本 *** 作
六、Flink 四大组件 6.1、Flink运行时的组件

作业管理器 JobManager作用

	1、控制一个应用程序执行的主进程,也就是说,每个应用程序都会被一个不同的JobManager所控制执行。
	2、JobManager会先接收到要执行的应用程序,这个应用程序会包括:作业图(JobGraph)、逻辑数据流图(logical dataflow Graph)和打包了所有的类、库和其它资源的JAR包。
	3、JobManager会把JobGraph转换成一个物理层面的数据流图,这个图被叫做"执行图"(ExecutionGraph),包含了所有可以并发执行的任务。
	4、JobManager会向资源管理器(ResourceManager)请求执行任务必要的资源,也就是任务管理器(兀skManager)上的插槽(slot)。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的hskManager上。而在运行过程中,JobManager会负责所有需要中央协调的 *** 作,比如说检查点(checkpoints)的协调。

任务管理器 TaskManager

	1、Flink中的工作进程。通常在Flink中会有多个TaskManager运行,每一个TaskManager都包含了一定数量的插槽(slots),插槽的数量限制了TaskManager能够执行的任务数量。
	2、启动之后,TaskManager会向资源管理器注册它的插槽;收到资源管理器的指令后,TaskManager就会将一个或者多个插槽提供给JobManager调用。JobManager就可以向插槽分配任务(tasks)来执行了。
	3、在执行过程中,TgskManager可以跟其它运行同一应用程序的TaskManager交换数据。

资源管理器 ResourceManager

	1、主要负责管理任务管理器(TaskManager)的插槽(slot),TaskManger插槽是Flink中定义的处理资源单元。
	2、Flink为不同的环境和资源管理工具提供了不同资源管理器,比如SRN、Mesos、K8s, 以及Standalone部署。
	3、当JobManager申请插槽资源时,ResourceManager会将有空闲插槽的TaskManager分酉已给JobManager。如果ResourceManager没有有足够的插槽来满足JobManager的请求,它还可以向资源提供平台发起会话,以提供启动心TaskManager进程的容器。

分发器

	1、可以跨作业运行,它为应用提交提供了REST接口。
	2、当一个应用被提交执行时,分发器就会启动并将应用移交给一个
JobManager
	3、Dispatcher也会启动一个Web UI,用来方便地展示和监控作业执
行的信息。
	4、Dispatcher在架构中可能并不是必需的,这取决于应用提交运行
的方式。
6.2、任务提交流程


6.3、任务调度原理

6.4、并行度



6.5、程序和数据流


6.6、数据的传输形式

6.6、任务链


七、F流处理PAI 7.1、Environment 7.1.1、getExcuteionEnvironment

7.1.2、createLocalEnvironment

7.1.3、createRemoteEnvironment

7.2、Source 7.2.1、List
import com.tan.flink.bean.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;

public class SourceFromCollection {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream inputDataStream = env.fromCollection(Arrays.asList(
                new SensorReading("sensor_1", 1547718199L, 35.8),
                new SensorReading("sensor_6", 1547718201L, 15.4),
                new SensorReading("sensor_7", 1547718202L, 6.7),
                new SensorReading("sensor_10", 1547718205L, 38.1)
        ));
        inputDataStream.print();
        env.execute();
    }
}
7.2.2、source from file
env.readTextFile(path);
7.2.3、source from kafka

pom 依赖

		
            org.apache.flink
            flink-connector-kafka-0.11_2.12
            1.10.1
        

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import java.util.Properties;

public class SourceFromKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // kafka 配置
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.200.102:9092,192.168.200.102:9092,192.168.200.104:9092");
        properties.setProperty("group.id", "flink-kafka");
        properties.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

        DataStreamSource inputDataStream = env.addSource(new FlinkKafkaConsumer011(
                "sensor",
                new SimpleStringSchema(),
                properties
        ));

        inputDataStream.print();
        env.execute();
    }
}

7.2.4、自定义source

需要实现SourceFunction 或者继承SourceFunction的富函数RichSourceFunction

	import com.tan.flink.bean.SensorReading;
	import org.apache.flink.streaming.api.datastream.DataStreamSource;
	import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
	import org.apache.flink.streaming.api.functions.source.SourceFunction;
	import java.util.Random;
	import java.util.UUID;
	
	public class SourceFromCustom {
	    public static void main(String[] args) throws Exception {
	        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	        DataStreamSource inputDataStream = env.addSource(new CustomSource());
	        inputDataStream.print();
	        env.execute();
	    }
	
	    public static class CustomSource implements SourceFunction {
	        boolean running = true;
	        @Override
	        public void run(SourceContext sourceContext) throws Exception {
	
	            Random random = new Random();
	            while (running) {
	                // 每隔 100 秒数据
	                for (int i = 0; i < 5; i++) {
	                    String id = UUID.randomUUID().toString().substring(0, 8);
	                    long timestamp = System.currentTimeMillis();
	                    double temperature = 60 + random.nextGaussian() * 20;
	                    sourceContext.collect(new SensorReading(id, timestamp, temperature));
	
	                    Thread.sleep(100L);
	                }
	
	                Thread.sleep(1000L);
	            }
	        }
	
	        @Override
	        public void cancel() {
	            running = false;
	        }
	    }
	}

7.3、Treansform 7.3.1、map 7.3.2、FlatMap 7.3.3、Fliter 7.3.4、Keyby 7.3.5、滚动聚合 7.3.6、Reduce 7.3.7、Split和select

欢迎分享,转载请注明来源:内存溢出

原文地址: https://www.outofmemory.cn/zaji/5716221.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-18

发表评论

登录后才能评论

评论列表(0条)

保存