Kafka的移动偏移量(JAVA Apache Kafka)

Kafka的移动偏移量(JAVA Apache Kafka),第1张

Kafka的移动偏移量(JAVA Apache Kafka)

Kafka的移动偏移量
  • 1.pom依赖
  • 2.application.properties 配置文件
  • 3. 消费者 配置自动读取
  • 4. 偏移量工具
  • 5.测试


1.pom依赖

如果是 Spring Boot 项目,可以不指定版本号,由 Spring Boot 的依赖管理自动匹配版本。

		
		
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>2.6.0</version>
		</dependency>
		
		
		
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-streams</artifactId>
			<version>2.6.0</version>
		</dependency>
		
2.application.properties 配置文件
# Port of the embedded web server (standard Spring Boot key).
server.port=2333

#—————————————————————————————————Offset———————————————————————————————————————————
# Settings injected into OffsetApplicationProperties through @Value placeholders.
# "ip" and "xxx" are placeholders that must be replaced with real values.
jmw.kafka.offset.bootstrap.servers=ip:9092
jmw.kafka.offset.zookeeper.connect=ip:2181
# SeekOffset splits this value on "," so several topics may be listed.
jmw.kafka.offset.topic=xxx
jmw.kafka.offset.group.id=xxx

jmw.kafka.offset.enable.auto.commit=false
# NOTE: the two deserializer keys carry no "jmw.kafka." prefix; the @Value
# placeholders in OffsetApplicationProperties use exactly these names.
offset.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
offset.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
3. 消费者 配置自动读取
package cn.com.kaf.configuration;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;


/**
 * Reads the Kafka offset-tool settings from the Spring environment and republishes them
 * as static fields so that static helper code can use them without holding a bean reference.
 *
 * <p>The static fields stay {@code null} until Spring has injected the {@code @Value}
 * fields and called {@link #afterPropertiesSet()} on an instance of this class.
 */
@Component
public class OffsetApplicationProperties implements InitializingBean {

    @Value("${jmw.kafka.offset.bootstrap.servers}")
    private String serverHostPort;

    @Value("${jmw.kafka.offset.zookeeper.connect}")
    private String zookeeperConnectHostPort;

    @Value("${jmw.kafka.offset.topic}")
    private String offsetTopic;

    @Value("${jmw.kafka.offset.group.id}")
    private String groupId;

    @Value("${jmw.kafka.offset.enable.auto.commit}")
    private String autoAutoCommit;

    @Value("${offset.key.deserializer}")
    private String key;

    @Value("${offset.value.deserializer}")
    private String value;

    /** Value of {@code jmw.kafka.offset.bootstrap.servers}. */
    public static String KAFKA_OFFSET_SERVER_HOST_PORT;

    /** Value of {@code jmw.kafka.offset.zookeeper.connect}. */
    public static String KAFKA_OFFSET_ZOOKEEPER_CONNECT;

    /** Value of {@code jmw.kafka.offset.topic}. */
    public static String KAFKA_OFFSET_TOPIC;

    /** Value of {@code jmw.kafka.offset.group.id}. */
    public static String KAFKA_OFFSET_GROUP_ID;

    /** Value of {@code jmw.kafka.offset.enable.auto.commit}, kept as the raw string. */
    public static String KAFKA_OFFSET_ENABLE_AUTO_COMMIT;

    /** Value of {@code offset.key.deserializer} (a deserializer class name, despite the field name). */
    public static String KAFKA_KEY_SERIALIZER;

    /** Value of {@code offset.value.deserializer} (a deserializer class name, despite the field name). */
    public static String KAFKA_VALUE_SERIALIZER;

    /**
     * Copies the injected instance values into the public static fields.
     *
     * @throws Exception never thrown by this implementation; declared to match the original signature
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        KAFKA_OFFSET_SERVER_HOST_PORT = serverHostPort;
        // Fixed: the original assigned to the misspelled, undeclared name KAFKA_OFFSET_ZOOKEEPER_ConNECT.
        KAFKA_OFFSET_ZOOKEEPER_CONNECT = zookeeperConnectHostPort;
        KAFKA_OFFSET_TOPIC = offsetTopic;
        KAFKA_OFFSET_GROUP_ID = groupId;
        KAFKA_OFFSET_ENABLE_AUTO_COMMIT = autoAutoCommit;
        KAFKA_KEY_SERIALIZER = key;
        KAFKA_VALUE_SERIALIZER = value;
    }
}

4. 偏移量工具
package cn.com.kaf.seek;

import cn.com.kaf.configuration.OffsetApplicationProperties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.springframework.stereotype.Component;

import java.util.*;


@Component
public class SeekOffset extends OffsetApplicationProperties {

    private static KafkaConsumer consumer = null;

    
    public static void SingleCase(int target){
        KafkaConsumer kafkaConsumer = consumer;

        final String topicall = KAFKA_OFFSET_TOPIC;
        String[] topics = topicall.split(",");

        for(String topic:topics) {
            consumer.subscribe(Arrays.asList(topic.split(",")));
            System.out.println("获取订阅-开始拉去数据");
            ConsumerRecords records = consumer.poll(10000);
            System.err.println("偏移量记录位置为: "+records.count());
            System.err.println("希望偏移量参数位置为: "+target);

            List list = consumer.partitionsFor(topic);
            System.err.println(topic + " 主题 的分区数为:" + list.size());

            List topicList = new ArrayList();

            for (PartitionInfo pt : list) {
                TopicPartition tp = new TopicPartition(topic, pt.partition());

                topicList.add(tp);
            }
            Map endMap = consumer.endOffsets(topicList);
            Map beginmap = consumer.beginningOffsets(topicList);

            int i=0;
            long aimOffset = 0;
            for (TopicPartition tp : topicList) {
                System.err.println("消费者为"+tp);
                long endOffset = endMap.get(tp);
                long beginOffset = beginmap.get(tp);

                aimOffset = endOffset - target;

                i++;
                System.err.println("topic数据总量为:"+(endOffset-beginOffset));
                if(aimOffset>0&&aimOffset>=beginOffset){
                    consumer.seek(tp, aimOffset);
                    System.err.println("偏移量—>移动成功: "+tp+"|"+aimOffset);
                }else{
                    consumer.seek(tp, beginOffset);
                    System.err.println("移动失败->并且移动至起始位置:"+tp+"|"+aimOffset+"|"+beginOffset+"|"+endOffset);
                }

            }
            consumer.commitSync();
            consumer.close();
            System.exit(0);
        }
    }

    private SeekOffset() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers",KAFKA_OFFSET_SERVER_HOST_PORT);//xxx服务器ip
        properties.put("enable.auto.commit",KAFKA_OFFSET_ENABLE_AUTO_COMMIT);
        properties.put("zookeeper.connect",KAFKA_OFFSET_ZOOKEEPER_CONNECT);
        properties.put("key.deserializer", KAFKA_KEY_SERIALIZER);
        properties.put("value.deserializer", KAFKA_VALUE_SERIALIZER);
        properties.put("group.id", KAFKA_OFFSET_GROUP_ID);
        consumer = new KafkaConsumer<>(properties);
    }

}


5.测试
import cn.com.kaf.DemoApplication;
import cn.com.kaf.consumer.ConsumerFactoryTool;
import cn.com.kaf.producer.ProducerFactoryTool;
import cn.com.kaf.seek.SeekOffset;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.junit.Before;
import org.junit.Test;
import org.springframework.boot.SpringApplication;


/**
 * Manual smoke test for {@link SeekOffset}: boots the Spring application, then rewinds the
 * configured topics.
 */
public class Te {

    /** Starts {@code DemoApplication} with no command-line arguments before each test. */
    @Before
    public void StBefore() {
        final String[] noArgs = {};
        SpringApplication.run(DemoApplication.class, noArgs);
    }

    /** Asks {@link SeekOffset#SingleCase(int)} to move back 2 records from the end offset. */
    @Test
    public void setOffset() throws InterruptedException {
        final int recordsBeforeEnd = 2;
        SeekOffset.SingleCase(recordsBeforeEnd);
    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://www.outofmemory.cn/zaji/5635326.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存