- 1.pom依赖
- 2.application.properties 配置文件
- 3. 消费者 配置自动读取
- 4. 偏移量工具
- 5.测试
1.pom依赖
如果是springboot项目可以不指定版本,自动匹配
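<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.6.0</version>
</dependency>
2.application.properties 配置文件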
server.port=2333
#—————————————————————————————————Offset———————————————————————————————————————————
jmw.kafka.offset.bootstrap.servers=ip:9092
jmw.kafka.offset.zookeeper.connect=ip:2181
jmw.kafka.offset.topic=xxx
jmw.kafka.offset.group.id=xxx
jmw.kafka.offset.enable.auto.commit=false
offset.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
offset.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
3. 消费者 配置自动读取
package cn.com.kaf.configuration;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Reads the {@code jmw.kafka.offset.*} and {@code offset.*.deserializer}
 * properties through {@code @Value} injection and, once injection has finished,
 * copies them into public static fields so that static code can read them.
 */
@Component
public class OffsetApplicationProperties implements InitializingBean {

    @Value("${jmw.kafka.offset.bootstrap.servers}")
    private String serverHostPort;

    @Value("${jmw.kafka.offset.zookeeper.connect}")
    private String zookeeperConnectHostPort;

    @Value("${jmw.kafka.offset.topic}")
    private String offsetTopic;

    @Value("${jmw.kafka.offset.group.id}")
    private String groupId;

    @Value("${jmw.kafka.offset.enable.auto.commit}")
    private String enableAutoCommit;

    @Value("${offset.key.deserializer}")
    private String keyDeserializer;

    @Value("${offset.value.deserializer}")
    private String valueDeserializer;

    // Static mirrors of the injected values; they stay null until afterPropertiesSet() has run.
    public static String KAFKA_OFFSET_SERVER_HOST_PORT;
    public static String KAFKA_OFFSET_ZOOKEEPER_CONNECT;
    public static String KAFKA_OFFSET_TOPIC;
    public static String KAFKA_OFFSET_GROUP_ID;
    public static String KAFKA_OFFSET_ENABLE_AUTO_COMMIT;
    // Despite the names, these two are filled from the offset.*.deserializer properties.
    public static String KAFKA_KEY_SERIALIZER;
    public static String KAFKA_VALUE_SERIALIZER;

    /**
     * Copies every injected property value into its static counterpart.
     *
     * @throws Exception declared by the overridden interface method; this implementation
     *                   only performs field assignments
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        KAFKA_OFFSET_SERVER_HOST_PORT = serverHostPort;
        KAFKA_OFFSET_ZOOKEEPER_CONNECT = zookeeperConnectHostPort;
        KAFKA_OFFSET_TOPIC = offsetTopic;
        KAFKA_OFFSET_GROUP_ID = groupId;
        KAFKA_OFFSET_ENABLE_AUTO_COMMIT = enableAutoCommit;
        KAFKA_KEY_SERIALIZER = keyDeserializer;
        KAFKA_VALUE_SERIALIZER = valueDeserializer;
    }
}
4. 偏移量工具
package cn.com.kaf.seek;

import cn.com.kaf.configuration.OffsetApplicationProperties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.springframework.stereotype.Component;

import java.util.*;

/**
 * Offset tool: moves the committed offsets of the configured consumer group so
 * that each partition of the configured topic(s) sits {@code target} records
 * before its end offset.
 */
@Component
public class SeekOffset extends OffsetApplicationProperties {

    /** Shared consumer; stays null until {@link #afterPropertiesSet()} has run. */
    private static KafkaConsumer<String, String> consumer = null;

    /**
     * Rewinds every topic listed in {@code KAFKA_OFFSET_TOPIC} (comma separated),
     * commits the new positions, closes the consumer and terminates the JVM.
     *
     * @param target how many records before each partition's end offset to seek to
     * @throws IllegalStateException if the consumer has not been created yet
     */
    public static void SingleCase(int target) {
        if (consumer == null) {
            throw new IllegalStateException(
                    "Kafka consumer is not initialised; start the Spring context before calling SingleCase");
        }
        try {
            for (String rawTopic : KAFKA_OFFSET_TOPIC.split(",")) {
                String topic = rawTopic.trim();
                if (topic.isEmpty()) {
                    continue;
                }
                rewindTopic(topic, target);
            }
        } finally {
            // Close only after all topics are done, so every entry of the list is processed.
            consumer.close();
        }
        // The tool ends the process once the offsets have been committed.
        System.exit(0);
    }

    /** Seeks every partition of one topic to (end - target), or to its beginning, and commits. */
    private static void rewindTopic(String topic, int target) {
        consumer.subscribe(Collections.singletonList(topic));
        System.out.println("获取订阅-开始拉去数据");
        // NOTE(review): poll(long) is deprecated in recent kafka-clients; it is kept because the
        // seek() calls below presumably need the partitions assigned first — confirm before
        // switching to poll(Duration).
        ConsumerRecords<String, String> records = consumer.poll(10000);
        System.err.println("偏移量记录位置为: " + records.count());
        System.err.println("希望偏移量参数位置为: " + target);

        List<PartitionInfo> partitions = consumer.partitionsFor(topic);
        System.err.println(topic + " 主题 的分区数为:" + partitions.size());

        List<TopicPartition> topicPartitions = new ArrayList<>(partitions.size());
        for (PartitionInfo info : partitions) {
            topicPartitions.add(new TopicPartition(topic, info.partition()));
        }

        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
        Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(topicPartitions);

        for (TopicPartition tp : topicPartitions) {
            System.err.println("消费者为" + tp);
            long endOffset = endOffsets.get(tp);
            long beginOffset = beginningOffsets.get(tp);
            long aimOffset = endOffset - target;
            System.err.println("topic数据总量为:" + (endOffset - beginOffset));
            if (aimOffset > 0 && aimOffset >= beginOffset) {
                consumer.seek(tp, aimOffset);
                System.err.println("偏移量—>移动成功: " + tp + "|" + aimOffset);
            } else {
                // The requested position lies before the first available record: fall back to the start.
                consumer.seek(tp, beginOffset);
                System.err.println("移动失败->并且移动至起始位置:" + tp + "|" + aimOffset + "|" + beginOffset + "|" + endOffset);
            }
        }
        // Persist the positions chosen above for the consumer group.
        consumer.commitSync();
    }

    /** Instantiated by the container only; the consumer is built in {@link #afterPropertiesSet()}. */
    private SeekOffset() {
    }

    /**
     * Populates the inherited static settings from this bean's injected values and then
     * creates the shared consumer from them. Building the consumer here rather than in the
     * constructor guarantees the {@code KAFKA_*} fields are set before they are read.
     *
     * @throws Exception propagated from the parent implementation
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        super.afterPropertiesSet();
        Properties properties = new Properties();
        properties.put("bootstrap.servers", KAFKA_OFFSET_SERVER_HOST_PORT); // Kafka server address (ip:port)
        properties.put("enable.auto.commit", KAFKA_OFFSET_ENABLE_AUTO_COMMIT);
        // NOTE(review): the Java consumer may not use zookeeper.connect — confirm and drop if unused.
        properties.put("zookeeper.connect", KAFKA_OFFSET_ZOOKEEPER_CONNECT);
        properties.put("key.deserializer", KAFKA_KEY_SERIALIZER);
        properties.put("value.deserializer", KAFKA_VALUE_SERIALIZER);
        properties.put("group.id", KAFKA_OFFSET_GROUP_ID);
        consumer = new KafkaConsumer<>(properties);
    }
}
5.测试
import cn.com.kaf.DemoApplication; import cn.com.kaf.consumer.ConsumerFactoryTool; import cn.com.kaf.producer.ProducerFactoryTool; import cn.com.kaf.seek.SeekOffset; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.junit.Before; import org.junit.Test; import org.springframework.boot.SpringApplication; public class Te { @Before public void StBefore() { String[] args = new String[0]; SpringApplication.run(DemoApplication.class, args); } @Test public void setOffset() throws InterruptedException { SeekOffset.SingleCase(2); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)