集群规划(辛苦我的小本本了,拖8台centos):
flink采用on yarn模式,机器资源有限,ClickHouse(以下简称ck)只装了单节点
hadoop
flink
kafka
NameNode
DFSZKFailoverController(zkfc)
JobManager
TaskManager
hadoop
flink
NameNode
DFSZKFailoverController(zkfc)
JobManager
TaskManager
hadoop
flink
ResourceManager
TaskManager
hadoop
zookeeper
kafka
DataNode
NodeManager
JournalNode
QuorumPeerMain
hadoop
zookeeper
kafka
DataNode
NodeManager
JournalNode
QuorumPeerMain
hadoop
zookeeper
kafka
DataNode
NodeManager
JournalNode
QuorumPeerMain
flink主要代码
public class Kafka_To_Flink_To_Clickhouse { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tenv = StreamTableEnvironment.create(env); Properties properties=new Properties(); properties.setProperty("bootstrap.servers","zcx4:9092"); FlinkKafkaConsumerstringFlinkKafkaConsumer = new FlinkKafkaConsumer ("zcx1",new SimpleStringSchema(),properties); stringFlinkKafkaConsumer.setStartFromEarliest(); DataStreamSource topic = env.addSource(stringFlinkKafkaConsumer); SingleOutputStreamOperator map = topic.map(new MapFunction () { @Override public String map(String s) throws Exception { return s; } }); tenv.registerDataStream("zcx1",map,"name"); Table result = tenv.sqlQuery("select name from zcx1"); DataStream rowDataStream = tenv.toDataStream(result,Zcx1.class); rowDataStream.print(); rowDataStream.addSink(new MyClickhouseUtil()); env.execute(); } }
public class MyClickhouseUtil extends RichSinkFunction{ Connection connection; PreparedStatement preparedStatement; @Override public void invoke(Zcx1 value, Context context) throws Exception { preparedStatement.setString(1,value.name); // preparedStatement.setInt(2,value.num); preparedStatement.execute(); } @Override public void open(Configuration parameters) throws Exception { Class.forName("ru.yandex.clickhouse.ClickHouseDriver"); connection = DriverManager.getConnection("jdbc:clickhouse://192.168.220.142:8123/default","default","GsAdBi/O"); preparedStatement = connection.prepareStatement("insert into zcx1 values(?)"); } @Override public void close() throws Exception { if(null!=connection){ connection.close(); } if(null!=preparedStatement){ preparedStatement.close(); } } }
测试
kafka producer生产数据
实时写入clickhouse
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)