Flink consuming Kafka data and writing it to ClickHouse in real time (Java version)


Cluster plan (my poor little laptop has to drag along 8 CentOS VMs):

Flink runs in on-YARN mode. Machine resources are limited, so ClickHouse is installed on a single node only.

Hostname | IP              | Installed software       | Running processes
zcx1     | 192.168.220.128 | hadoop, flink, kafka     | NameNode, DFSZKFailoverController (zkfc), JobManager, TaskManager
zcx2     | 192.168.220.129 | hadoop, flink            | NameNode, DFSZKFailoverController (zkfc), JobManager, TaskManager
zcx3     | 192.168.220.130 | hadoop, flink            | ResourceManager, TaskManager
zcx4     | 192.168.220.131 | hadoop, zookeeper, kafka | DataNode, NodeManager, JournalNode, QuorumPeerMain
zcx5     | 192.168.220.132 | hadoop, zookeeper, kafka | DataNode, NodeManager, JournalNode, QuorumPeerMain
zcx6     | 192.168.220.133 | hadoop, zookeeper, kafka | DataNode, NodeManager, JournalNode, QuorumPeerMain
zcx7     | 192.168.220.134 | hadoop                   | ResourceManager
ck3      | 192.168.220.142 | clickhouse               | clickhouse-server

Main Flink code

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.util.Properties;
import static org.apache.flink.table.api.Expressions.$;

public class Kafka_To_Flink_To_Clickhouse {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // Kafka source: broker on zcx4, topic "zcx1", reading from the earliest offset
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "zcx4:9092");
        properties.setProperty("group.id", "flink-clickhouse-demo"); // required by the consumer; any group id works
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("zcx1", new SimpleStringSchema(), properties);
        consumer.setStartFromEarliest();
        DataStreamSource<String> topic = env.addSource(consumer);

        // Identity map, kept as a hook for parsing/cleaning logic
        SingleOutputStreamOperator<String> map = topic.map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                return s;
            }
        });

        // Register the stream as table "zcx1" with a single column "name" (Flink 1.13+ Table API)
        tenv.createTemporaryView("zcx1", map, $("name"));
        Table result = tenv.sqlQuery("select name from zcx1");

        // Convert back to a DataStream of Zcx1 POJOs, print it and sink it to ClickHouse
        DataStream<Zcx1> rowDataStream = tenv.toDataStream(result, Zcx1.class);
        rowDataStream.print();
        rowDataStream.addSink(new MyClickhouseUtil());
        env.execute();
    }
}
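The Zcx1 class used for the Table-to-DataStream conversion and in the sink is not shown in the post. A minimal sketch that matches how the code uses it is a Flink POJO with a single public name field (the commented-out setInt in the sink hints at an additional num field, which would also need a matching column in the table before it could be added):

// Hypothetical POJO, reconstructed from how the job and the sink use it
public class Zcx1 {
    // Maps to the single "name" column of the zcx1 table / Kafka messages
    public String name;

    // Flink POJOs need a public no-argument constructor
    public Zcx1() {
    }

    public Zcx1(String name) {
        this.name = name;
    }
}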
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class MyClickhouseUtil extends RichSinkFunction<Zcx1> {
    private Connection connection;
    private PreparedStatement preparedStatement;

    @Override
    public void open(Configuration parameters) throws Exception {
        // One JDBC connection per sink subtask, created when the task starts
        Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
        connection = DriverManager.getConnection(
                "jdbc:clickhouse://192.168.220.142:8123/default", "default", "GsAdBi/O");
        preparedStatement = connection.prepareStatement("insert into zcx1 values(?)");
    }

    @Override
    public void invoke(Zcx1 value, Context context) throws Exception {
        // Write one row per incoming record
        preparedStatement.setString(1, value.name);
//        preparedStatement.setInt(2, value.num);
        preparedStatement.execute();
    }

    @Override
    public void close() throws Exception {
        // Release the statement before the connection
        if (preparedStatement != null) {
            preparedStatement.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}
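The sink assumes a table named zcx1 already exists in ClickHouse's default database; its DDL is not shown in the post. Assuming a single String column name (matching the Zcx1 sketch above), the table could be created once over the same JDBC driver, for example:

// Sketch only: the table schema and engine are assumptions, not taken from the post
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateClickhouseTable {
    public static void main(String[] args) throws Exception {
        Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
        try (Connection conn = DriverManager.getConnection(
                "jdbc:clickhouse://192.168.220.142:8123/default", "default", "GsAdBi/O");
             Statement stmt = conn.createStatement()) {
            // One String column; MergeTree ordered by name is a reasonable default
            stmt.execute("create table if not exists zcx1 (name String) engine = MergeTree() order by name");
        }
    }
}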

Testing

Produce data with a Kafka producer, for example with a small test client like the sketch below:
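A minimal sketch of such a producer (hypothetical class name and sample values; the broker address comes from the cluster plan above) that pushes a few messages into the zcx1 topic:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class TestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "zcx4:9092"); // same broker the Flink job reads from
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Each message value becomes one "name" row in ClickHouse
            for (String name : new String[]{"alice", "bob", "carol"}) {
                producer.send(new ProducerRecord<>("zcx1", name));
            }
            producer.flush();
        }
    }
}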

The data is written to ClickHouse in real time; a quick count query, like the sketch below, confirms it.
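A sketch of that check, reusing the connection settings from the sink (hypothetical class name):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CheckClickhouseRows {
    public static void main(String[] args) throws Exception {
        Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
        try (Connection conn = DriverManager.getConnection(
                "jdbc:clickhouse://192.168.220.142:8123/default", "default", "GsAdBi/O");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("select count(*) from zcx1")) {
            if (rs.next()) {
                // The count keeps growing while the Flink job sinks Kafka messages
                System.out.println("rows in zcx1: " + rs.getLong(1));
            }
        }
    }
}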

