Flink实时写入Mysql数据

Flink实时写入Mysql数据,第1张

Flink实时写入Mysql数据

1.需要的依赖

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource; 、
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;

使用JDBCAppendTableSink方法创建mysqlsink写入,数据是追加到数据库的

 //创建流处理执行环境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度
        environment.setParallelism(2);
        //加载自定义数据形成数据流
        DataStreamSource personSource = environment.addSource(new DataDB());
        //转换为Row类型才能写入mysql
        DataStream personRow = personSource.map(new MapFunction() {
            @Override
            public Row map(Person person) throws Exception {
                Row row = new Row(4);
                row.setField(0, person.getPid());
                row.setField(1, person.getPname());
                row.setField(2, person.getPsex());
                row.setField(3, person.getPage());
                return row;
            }
        });
        //定义调用JDBCAppendTableSink创建mysqlSink
        JDBCAppendTableSink tableSink = JDBCAppendTableSink.builder()
                //驱动程序
                .setDrivername("com.mysql.cj.jdbc.Driver")
                //连接URL地址
                .setDBUrl("jdbc:mysql://localhost:3306/school?characterEncoding=utf-8&useSSL=false&serverTimezone=UTC")
                //用户名
                .setUsername("root")
                //密码
                .setPassword("1234")
                //写入类型
                .setParameterTypes(BasicTypeInfo.INT_TYPE_INFO,
                                    BasicTypeInfo.STRING_TYPE_INFO,
                                    BasicTypeInfo.STRING_TYPE_INFO,
                                    BasicTypeInfo.INT_TYPE_INFO)
                //写入语句
                .setQuery("insert into person values (?,?,?,?);")
                //一次批量写入条数
                .setBatchSize(10)
                .build();
        //调用mysqlsink写入mysql数据
        tableSink.emitDataStream(personRow);
        //执行程序
        environment.execute();

如果出现以下报错,

The server time zone value ‘�й���׼ʱ��’ is unrecognized or represents
more than one time zone. You must configure either the server or JDBC
driver (via the serverTimezone configuration property) to use a more
specifc time zone value if you want to utilize time zone support.

需要在url地址里面添加如下代码

&serverTimezone=UTC

欢迎分享,转载请注明来源:内存溢出

原文地址: https://www.outofmemory.cn/zaji/5716403.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存