Flink Stream Processing API


1. Supported Data Types

A Flink streaming application processes streams of events represented as data objects. Inside Flink these objects must be serialized and deserialized so that they can be sent over the network or read from state backends, checkpoints, and savepoints. To do this efficiently, Flink needs to know exactly which data types the application works with. Flink uses the concept of type information to represent data types and generates a specific serializer, deserializer, and comparator for each type. Flink also has a type-extraction system that analyzes the input and return types of functions to obtain type information automatically, and with it the serializers and deserializers. In some cases, however, such as lambda functions or generic types, the type information must be provided explicitly for the application to work correctly or to improve its performance.
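To illustrate that last point, here is a minimal sketch (my addition, not from the original article) of providing type information explicitly with returns(...) when a lambda's output type is erased; the class name and element values are made up for the example.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExplicitTypeInfoExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> words = env.fromElements("flink", "spark", "kafka");

        // The lambda's Tuple2 result type is erased at runtime, so the output
        // type is declared explicitly via returns(...)
        DataStream<Tuple2<String, Integer>> lengths = words
                .map(word -> new Tuple2<>(word, word.length()))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        lengths.print();
        env.execute();
    }
}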

Flink supports all of the common data types in Java and Scala. The most widely used ones are the following.

1.1 Basic Data Types

Flink supports all Java and Scala basic data types: Int, Double, Long, String, …

DataStream<Integer> numberStream = env.fromElements(1, 2, 3, 4);
numberStream.map(data -> data * 2);

1.2 Java and Scala Tuples

Unlike Scala, Java does not have built-in tuple support; Java tuple types are provided by Flink's packages, with Tuple0 through Tuple25 available by default.

DataStream<Tuple2<String, Integer>> personStream = env.fromElements(
  new Tuple2<>("Adam", 17),
  new Tuple2<>("Sarah", 23)
);
personStream.filter(p -> p.f1 > 18);

1.3 Scala Case Classes

case class Person(name: String, age: Int)

val persons: DataStream[Person] = env.fromElements(
  Person("张三", 12),
  Person("李四", 23)
)

1.4 Java POJOs (a no-argument constructor must be provided)

All member variables must be public (or private with corresponding getters and setters).

public class Person {
  public String name;
  public int age;
  public Person() {}
  public Person(String name, int age) {
    this.name = name;
    this.age = age;
  }
}

DataStream<Person> persons = env.fromElements(
  new Person("Alex", 42),
  new Person("Wendy", 23)
);

1.5 Other Types (Arrays, Lists, Maps, Enums, and so on)

Flink also supports some special-purpose types from Java and Scala, such as Java's ArrayList, HashMap, and Enum.
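As a small illustration (my addition, not from the original article), the sketch below builds a stream whose elements are Java ArrayLists and maps each list to its size; the class name and element values are made up for the example.

import java.util.ArrayList;
import java.util.Arrays;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CollectionTypeExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // A stream of Java ArrayLists; Flink falls back to its generic (Kryo) serializer for such types
        DataStream<ArrayList<Integer>> listStream = env.fromElements(
                new ArrayList<>(Arrays.asList(1, 2, 3)),
                new ArrayList<>(Arrays.asList(4, 5))
        );

        // Map each list to its size, declaring the lambda's output type explicitly
        listStream.map(list -> list.size()).returns(Types.INT).print();

        env.execute();
    }
}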

2. Implementing UDF Functions

2.1 Function Classes

Flink exposes interfaces for all UDF functions (implemented as interfaces or abstract classes), for example MapFunction, FilterFunction, ProcessFunction, and so on.

The following example implements the FilterFunction interface:

DataStream<String> flinkTweets = tweets.filter(new FlinkFilter());

public static class FlinkFilter implements FilterFunction<String> {
  @Override
  public boolean filter(String value) throws Exception {
    return value.contains("flink");
  }
}

The function can also be implemented as an anonymous class:

DataStream<String> flinkTweets = tweets.filter(
  new FilterFunction<String>() {
    @Override
    public boolean filter(String value) throws Exception {
      return value.contains("flink");
    }
  }
);

The keyword string "flink" that we filter on can also be passed in as a parameter:

DataStream tweets = env.readTextFile("INPUT_FILE "); 
DataStream flinkTweets = tweets.filter(new KeyWordFilter("flink")); 
public static class KeyWordFilter implements FilterFunction { 
  private String keyWord; 

  KeyWordFilter(String keyWord) { 
    this.keyWord = keyWord; 
  } 

  @Override public boolean filter(String value) throws Exception { 
    return value.contains(this.keyWord); 
  } 
}

2.2 Lambda Functions

DataStream tweets = env.readTextFile("INPUT_FILE"); 
DataStream flinkTweets = tweets.filter( tweet -> tweet.contains("flink") );

2.3 Rich Functions

"Rich functions" are function-class interfaces provided by the DataStream API; every Flink function class has a Rich version.

Unlike regular functions, a rich function can access the context of the runtime environment and has lifecycle methods, which makes more complex functionality possible.

RichMapFunction

RichFlatMapFunction

RichFilterFunction

…

Rich functions have a notion of lifecycle. Typical lifecycle methods are:

open() is the initialization method of a rich function; it is called before an operator such as map or filter is invoked.

close() is the last method called in the lifecycle; it performs cleanup work.

getRuntimeContext() provides information from the function's RuntimeContext, such as the function's parallelism, the task name, and state.

public static class MyMapFunction extends RichMapFunction<SensorReading, Tuple2<Integer, String>> {

  @Override
  public Tuple2<Integer, String> map(SensorReading value) throws Exception {
    return new Tuple2<>(getRuntimeContext().getIndexOfThisSubtask(), value.getId());
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    System.out.println("my map open"); // initialization work, e.g. establishing an HDFS connection
  }

  @Override
  public void close() throws Exception {
    System.out.println("my map close"); // cleanup work, e.g. closing the HDFS connection
  }
}

Test code:

package apitest.transform;

import apitest.beans.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class TransformTest5_RichFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        DataStream<String> inputStream = env.readTextFile("/tmp/Flink_Tutorial/src/main/resources/sensor.txt");

        // Convert to SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        DataStream<Tuple2<String, Integer>> resultStream = dataStream.map(new MyMapper());

        resultStream.print();

        env.execute();
    }

    // A plain Function cannot access the runtime context; it can only process the current record in isolation
    public static class MyMapper0 implements MapFunction<SensorReading, Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> map(SensorReading value) throws Exception {
            return new Tuple2<>(value.getId(), value.getId().length());
        }
    }

    // Custom rich function (RichMapFunction is an abstract class)
    public static class MyMapper extends RichMapFunction<SensorReading, Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> map(SensorReading value) throws Exception {
//            A RichFunction can access state:
//            getRuntimeContext().getState();
            return new Tuple2<>(value.getId(), getRuntimeContext().getIndexOfThisSubtask());
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            // Initialization work, typically declaring state or opening a database connection
            System.out.println("open");
        }

        @Override
        public void close() throws Exception {
            // Wrap-up work, typically closing connections and clearing state
            System.out.println("close");
        }
    }
}

3. Data Repartitioning Operations

For repartitioning, the DataStream class exposes many methods and classes with Partitioner in their names. Among them, partitionCustom(...) is used for custom repartitioning; a sketch of it follows the example below.

package apitest.transform;

import apitest.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class TransformTest6_Partition {
  public static void main(String[] args) throws Exception{

    // Create the execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Set parallelism = 4
    env.setParallelism(4);

    // Read data from a file
    DataStream<String> inputStream = env.readTextFile("/tmp/Flink_Tutorial/src/main/resources/sensor.txt");

    // Convert to SensorReading
    DataStream<SensorReading> dataStream = inputStream.map(line -> {
      String[] fields = line.split(",");
      return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
    });

    // With multiple parallel subtasks, a SingleOutputStreamOperator uses rebalance (round-robin) by default
    dataStream.print("input");

    // 1. shuffle (unlike batch processing, each record is shuffled to a partition as soon as it arrives)
    DataStream<String> shuffleStream = inputStream.shuffle();
    shuffleStream.print("shuffle");

    // 2. keyBy (hash the key, then take it modulo the number of partitions)
    dataStream.keyBy(SensorReading::getId).print("keyBy");

    // 3. global (send everything to the first partition; only needed in a few special cases)
    dataStream.global().print("global");

    env.execute();
  }
}
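The example above does not actually call partitionCustom(...). The following is a minimal sketch (my addition, not part of the original article) of how a custom partitioner could be wired in, assuming the same SensorReading bean and sensor.txt file used throughout this tutorial; the class TransformTest6_CustomPartition, MyPartitioner, and its hashing logic are made up for illustration.

package apitest.transform;

import apitest.beans.SensorReading;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformTest6_CustomPartition {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4);

    // Read and parse the same sensor file as above
    DataStream<String> inputStream = env.readTextFile("/tmp/Flink_Tutorial/src/main/resources/sensor.txt");
    DataStream<SensorReading> dataStream = inputStream.map(line -> {
      String[] fields = line.split(",");
      return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
    });

    // Route each record to a partition derived from the hash of its sensor id
    dataStream
        .partitionCustom(new MyPartitioner(), SensorReading::getId)
        .print("custom");

    env.execute();
  }

  // Custom Partitioner: picks the target channel from the key and the number of partitions
  public static class MyPartitioner implements Partitioner<String> {
    @Override
    public int partition(String key, int numPartitions) {
      return Math.abs(key.hashCode()) % numPartitions;
    }
  }
}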

4. Sink


Flink has no equivalent of Spark's foreach method that lets users iterate over the data directly; all output to external systems has to go through a Sink. The final output of a job is typically wired up in a form like the following:

stream.addSink(new MySink(xxxx))

Flink officially provides sinks for a number of frameworks. For anything beyond those, users need to implement a custom sink themselves.

Kafka

pom dependencies



<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>Flink_Tutorial</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.12.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Kafka connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
</project>

Java code

package apitest.sink;

import apitest.beans.SensorReading;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;


public class SinkTest1_Kafka {
    public static void main(String[] args) throws Exception{
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set parallelism to 1
        env.setParallelism(1);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

        // Read data from Kafka
        DataStream<String> inputStream = env.addSource(new FlinkKafkaConsumer<String>("sensor", new SimpleStringSchema(), properties));

        // Parse the data read from Kafka and turn it back into a string
        DataStream<String> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2])).toString();
        });

        // Write the data to Kafka
        dataStream.addSink(new FlinkKafkaProducer<String>("localhost:9092", "sinktest", new SimpleStringSchema()));
        
        env.execute();
    }
}

Redis

Here Redis serves as the output target of the sink.

pom dependencies

This is quite an old dependency, dating from 2017.


<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>

Java code

package apitest.sink;

import apitest.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;


public class SinkTest2_Redis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("/tmp/Flink_Tutorial/src/main/resources/sensor.txt");

        // Convert to SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        // Define the Jedis connection configuration (here it connects to Redis running in Docker)
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                .setHost("localhost")
                .setPort(6379)
                .setPassword("123456")
                .setDatabase(0)
                .build();

        dataStream.addSink(new RedisSink<>(config, new MyRedisMapper()));

        env.execute();
    }

    // Custom RedisMapper
    public static class MyRedisMapper implements RedisMapper<SensorReading> {
        // Define the Redis command used to save the data: store it in a hash, i.e. hset sensor_temp id temperature
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "sensor_temp");
        }

        @Override
        public String getKeyFromData(SensorReading data) {
            return data.getId();
        }

        @Override
        public String getValueFromData(SensorReading data) {
            return data.getTemperature().toString();
        }
    }
}

Elasticsearch

pom dependencies


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_2.12</artifactId>
    <version>1.12.1</version>
</dependency>

Java code

package apitest.sink;

import apitest.beans.SensorReading;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;


public class SinkTest3_Es {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("/tmp/Flink_Tutorial/src/main/resources/sensor.txt");

        // Convert to SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        // Define the ES connection configuration
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("localhost", 9200));

        dataStream.addSink(new ElasticsearchSink.Builder<SensorReading>(httpHosts, new MyEsSinkFunction()).build());

        env.execute();
    }

    // Implement the custom ES write logic
    public static class MyEsSinkFunction implements ElasticsearchSinkFunction<SensorReading> {
        @Override
        public void process(SensorReading element, RuntimeContext ctx, RequestIndexer indexer) {
            // Build the document source to write
            HashMap<String, String> dataSource = new HashMap<>();
            dataSource.put("id", element.getId());
            dataSource.put("temp", element.getTemperature().toString());
            dataSource.put("ts", element.getTimestamp().toString());

            // Create the request that acts as the write command sent to ES (in ES7 the type is always _doc and can no longer be specified)
            IndexRequest indexRequest = Requests.indexRequest()
                    .index("sensor")
                    .source(dataSource);

            // Send the request via the indexer
            indexer.add(indexRequest);
        }
    }
}

JDBC custom sink

pom dependencies


<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.19</version>
</dependency>

Create the database: CREATE DATABASE `flink_test` DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci;

Create the table:

CREATE TABLE `sensor_temp` ( `id` varchar(32) NOT NULL, `temp` double NOT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Java code

package apitest.sink;

import apitest.beans.SensorReading;
import apitest.source.SourceTest4_UDF;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;


public class SinkTest4_Jdbc {
    public static void main(String[] args) throws Exception {

        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set parallelism = 1
        env.setParallelism(1);

        // Read data from a file
//        DataStream<String> inputStream = env.readTextFile("/tmp/Flink_Tutorial/src/main/resources/sensor.txt");
//
//        // Convert to SensorReading
//        DataStream<SensorReading> dataStream = inputStream.map(line -> {
//            String[] fields = line.split(",");
//            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
//        });

        // Use the SourceFunction written earlier (randomly fluctuating temperatures) to generate data
        DataStream<SensorReading> dataStream = env.addSource(new SourceTest4_UDF.MySensorSource());

        dataStream.addSink(new MyJdbcSink());

        env.execute();
    }

    // Implement a custom SinkFunction
    public static class MyJdbcSink extends RichSinkFunction<SensorReading> {
        // Declare the connection and prepared statements
        Connection connection = null;
        PreparedStatement insertStmt = null;
        PreparedStatement updateStmt = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/flink_test?useUnicode=true&serverTimezone=Asia/Shanghai&characterEncoding=UTF-8&useSSL=false", "root", "example");
            insertStmt = connection.prepareStatement("insert into sensor_temp (id, temp) values (?, ?)");
            updateStmt = connection.prepareStatement("update sensor_temp set temp = ? where id = ?");
        }

        // For every incoming record, use the connection to execute the SQL
        @Override
        public void invoke(SensorReading value, Context context) throws Exception {
            // Try the update first; if no row was updated, insert instead
            updateStmt.setDouble(1, value.getTemperature());
            updateStmt.setString(2, value.getId());
            updateStmt.execute();
            if (updateStmt.getUpdateCount() == 0) {
                insertStmt.setString(1, value.getId());
                insertStmt.setDouble(2, value.getTemperature());
                insertStmt.execute();
            }
        }

        @Override
        public void close() throws Exception {
            insertStmt.close();
            updateStmt.close();
            connection.close();
        }
    }
}
