package com.jxtele;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class KafkaWordCount {

    public static void main(String[] args) throws Exception {
        // Set up the streaming execution environment.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpointing is disabled by default. To enable it, call enableCheckpointing(n)
        // on the StreamExecutionEnvironment, where n is the checkpoint interval in
        // milliseconds. With a checkpoint started every 5000 ms, the next checkpoint
        // will start within 5 seconds after the previous one completes.
        env.enableCheckpointing(500);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties properties = new Properties();
        // Kafka broker IPs or hostnames, comma-separated.
        properties.setProperty("bootstrap.servers", "hadoop102:9092,hadoop102:9092,hadoop102:9092");
        // ZooKeeper node IPs or hostnames, comma-separated.
        properties.setProperty("zookeeper.connect", "hadoop102:2181,hadoop102:2181,hadoop102:2181");
        // group.id of the Flink consumer.
        properties.setProperty("group.id", "test-consumer-group");

        FlinkKafkaConsumer010<String> myConsumer = new FlinkKafkaConsumer010<>(
                "test1", new SimpleStringSchema(), properties);

        DataStream<String> inputDataStream = env.addSource(myConsumer);

        DataStream<Tuple2<String, Integer>> wordCountDataStream = inputDataStream
                .flatMap(new MyFlatMapper())
                .keyBy(0)
                .sum(1);

        wordCountDataStream.print().setParallelism(1);

        env.execute();
    }

    public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] words = value.split(" ");
            for (String word : words) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}
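The enableCheckpointing(500) call above only sets the checkpoint interval. If you need tighter control over checkpointing, Flink's CheckpointConfig exposes further settings. Below is a minimal sketch; the 5000 ms interval and the other values are illustrative choices, not taken from the original code:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Start a checkpoint every 5000 ms with exactly-once semantics.
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        // Wait at least 500 ms after a checkpoint completes before starting the next one.
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // Abort a checkpoint if it does not finish within 60 s.
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // Allow only one checkpoint in flight at a time.
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    }
}

setMinPauseBetweenCheckpoints is what actually enforces the "next checkpoint starts only some time after the previous one completes" behavior described in the comment above.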
Starting Kafka: omitted.
Test producer code
package com.jxtele.learn.kafka;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

public class Producer {

    public static void main(String[] args) throws Exception {
        // Create the producer configuration.
        Properties props = new Properties();
        // One or more broker addresses; metadata is fetched from the brokers and cached automatically.
        props.put("bootstrap.servers", "192.168.56.102:9092,192.168.56.103:9092,192.168.56.104:9092");
        // Serialize the record key from String to a byte array.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Serialize the record value (message) from String to a byte array.
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Create a Producer instance: it holds thread resources and socket connections to each broker.
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        //ProducerRecord<String, String> record = new ProducerRecord<>("test", "message1");
        ProducerRecord<String, String> record = new ProducerRecord<>("test1", "", "dasdas");

        // Kafka can send data in two ways:
        // 1. Asynchronous send.
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    // The message was sent successfully.
                    System.out.println("message sent successfully");
                } else {
                    // The send failed; the message needs to be resent.
                }
            }
        });
        Thread.sleep(10 * 1000);

        // 2. Synchronous send:
        // producer.send(record).get();
        // This waits for the whole send pipeline to finish; the method only
        // returns once the broker's acknowledgement for the message comes back.
        producer.close();
    }
}
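The synchronous mode is only sketched in the comments above. A minimal, self-contained sketch of that path (assuming the same brokers and the test1 topic; the key and message strings here are placeholders) might look like this:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class SyncProducerSketch {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.56.102:9092"); // same brokers as above
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record = new ProducerRecord<>("test1", "key1", "hello flink");
        try {
            // Blocks until the broker acknowledges the write.
            RecordMetadata metadata = producer.send(record).get();
            System.out.println("sent to partition " + metadata.partition()
                    + " at offset " + metadata.offset());
        } catch (ExecutionException e) {
            // The send failed permanently; the cause carries the broker-side error.
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}

Because send(record).get() waits for the broker round trip, the RecordMetadata is available immediately; the trade-off versus the callback style is throughput, since each send blocks until it is acknowledged.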