Kafka(七).Springboot集成,Kafka监控Eagle

Kafka(七).Springboot集成,Kafka监控Eagle,第1张

Kafka(七).Springboot集成,Kafka监控Eagle Kafka(七).Kafka监控 Kafka -Eagle 和SpringBoot集成 一.Kafka监控安装 1.下载软件压缩包 http://download.kafka-eagle.org/ 2.解压后文件有
[root@CentOSA kafka-eagle]# ll
总用量 0
drwxr-xr-x. 2 root root 33 1月   2 22:26 bin
drwxr-xr-x. 2 root root 62 1月   2 22:26 conf
drwxr-xr-x. 2 root root  6 9月  13 01:12 db
drwxr-xr-x. 2 root root 23 1月   2 22:26 font
drwxr-xr-x. 9 root root 91 9月  13 01:12 kms
drwxr-xr-x. 2 root root  6 9月  13 01:12 logs
3.配置文件
[root@CentOSA conf]# cat system-config.properties 
######################################
# multi zookeeper & kafka cluster list
# Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.' instead
######################################
efak.zk.cluster.alias=cluster1,cluster2
# 1.修改zookeeper 集群
cluster1.zk.list=CentOSA:2181,CentOSB:2181,CentOSC:2181
# cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181

######################################
# zookeeper enable acl
######################################
cluster1.zk.acl.enable=false
cluster1.zk.acl.schema=digest
cluster1.zk.acl.username=test
cluster1.zk.acl.password=test123

######################################
# broker size online list
######################################
cluster1.efak.broker.size=20

######################################
# zk client thread limit
######################################
kafka.zk.limit.size=32

######################################
# EFAK webui port 2.服务端口
######################################
efak.webui.port=8048

######################################
# kafka jmx acl and ssl authenticate
######################################
cluster1.efak.jmx.acl=false
cluster1.efak.jmx.user=keadmin
cluster1.efak.jmx.password=keadmin123
cluster1.efak.jmx.ssl=false
cluster1.efak.jmx.truststore.location=/data/ssl/certificates/kafka.truststore
cluster1.efak.jmx.truststore.password=ke123456

######################################
# kafka offset storage 
######################################
cluster1.efak.offset.storage=kafka
# cluster2.efak.offset.storage=zk

######################################
# kafka jmx uri
######################################
cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi

######################################
# kafka metrics, 15 days by default 3.报表图 这个要开必须开启kafka jmx  
######################################
efak.metrics.charts=true
efak.metrics.retain=15

######################################
# kafka sql topic records max
######################################
efak.sql.topic.records.max=5000
efak.sql.topic.preview.records.max=10

######################################
# delete kafka topic token  4.管理的密码
######################################
efak.topic.token=keadmin

######################################
# kafka sasl authenticate
######################################
cluster1.efak.sasl.enable=false
cluster1.efak.sasl.protocol=SASL_PLAINTEXT
cluster1.efak.sasl.mechanism=SCRAM-SHA-256
cluster1.efak.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle";
cluster1.efak.sasl.client.id=
cluster1.efak.blacklist.topics=
cluster1.efak.sasl.cgroup.enable=false
cluster1.efak.sasl.cgroup.topics=
#cluster2.efak.sasl.enable=false
#cluster2.efak.sasl.protocol=SASL_PLAINTEXT
#cluster2.efak.sasl.mechanism=PLAIN
#cluster2.efak.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-eagle";
#cluster2.efak.sasl.client.id=
#cluster2.efak.blacklist.topics=
#cluster2.efak.sasl.cgroup.enable=false
#cluster2.efak.sasl.cgroup.topics=

######################################
# kafka ssl authenticate
######################################
cluster3.efak.ssl.enable=false
cluster3.efak.ssl.protocol=SSL
cluster3.efak.ssl.truststore.location=
cluster3.efak.ssl.truststore.password=
cluster3.efak.ssl.keystore.location=
cluster3.efak.ssl.keystore.password=
cluster3.efak.ssl.key.password=
cluster3.efak.ssl.endpoint.identification.algorithm=https
cluster3.efak.blacklist.topics=
cluster3.efak.ssl.cgroup.enable=false
cluster3.efak.ssl.cgroup.topics=

######################################
# kafka sqlite jdbc driver address 
######################################
efak.driver=org.sqlite.JDBC
efak.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
efak.username=root
efak.password=www.kafka-eagle.org

######################################
# kafka mysql jdbc driver address 5.插件依赖的数据库
######################################
efak.driver=com.mysql.cj.jdbc.Driver
efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=root
4.配置KE_HOME

因为启动的时候需要这个变量;

二. Springboot 项目集成 1.pom.xml


    4.0.0

    com.sff
    springbootkafka
    1.0-SNAPSHOT

    
        org.springframework.boot
        spring-boot-starter-parent
        2.2.6.RELEASE
    

    
        
            org.springframework.boot
            spring-boot-starter
        

        
            org.springframework.kafka
            spring-kafka
        

        
            org.springframework.boot
            spring-boot-starter-test
            test
        
    

    
        8
        8
    


2.配置文件
spring.kafka.bootstrap-servers=CentOSA:9092,CentOSB:9092,CentOSC:9092

#producer
spring.kafka.producer.retries=5
spring.kafka.producer.acks=all
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
## 开启事务 如果 1.代码里面使用事务的api 2.使用transaction  若无发送数据会报错
spring.kafka.producer.transaction-id-prefix=transaction-id-
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.properties.enable.idempotence=true

# consumer
spring.kafka.consumer.group-id=springboot-kafka
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
##批量消费
# 若没有配置 则每次消费一个
# 若配置 KafkaListeners 注解的方法 就不是一个对象了 而是一个List  使用一个对象会报错
spring.kafka.listener.type=batch
spring.kafka.listener.poll-timeout=1000
spring.kafka.listener.concurrency=1
spring.kafka.consumer.max-poll-records=20
3.监听kafka代码 3.1 则每次消费一个
    @KafkaListeners(
            value = {
                    @KafkaListener(topics = {"topic06"})
            }
    )
    public void recevice3(ConsumerRecord record) {
        System.out.println("topic06: "+record.value());
    }
3.2 每次消费多个
    @KafkaListeners(
            value = {
                    @KafkaListener(topics = {"topic06"})
            }
    )
    public void recevice3(List> records) {
        System.out.println("consumer size:"+records.size());
        for (ConsumerRecord record : records) {
            System.out.println("topic06: " + record.value());
        }
    }
4.发送kafka代码 4.1 发送没有事务
    public void sendMessage(String topic ,String value) throws InterruptedException {
        ProducerRecord record = new ProducerRecord<>(topic, value);
        kafkaTemplate.send(record);
    }
4.2 发送携带事务一
    public void testSend(){
        kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback() {
            @Override
            public Object doInOperations(KafkaOperations operations) {
                for (int i = 0; i < 50; i++) {
                    ProducerRecord record = new ProducerRecord<>("topic06", "testSend"+i);
                    kafkaTemplate.send(record);
                }
                return null;
            }
        });
    }
4.3 发送携带事务二
    @Transactional
    public void sendMessage(String topic, String value)  {
        ProducerRecord record = new ProducerRecord<>(topic, value);
        kafkaTemplate.send(record);
    }

欢迎分享,转载请注明来源:内存溢出

原文地址: http://www.outofmemory.cn/zaji/5696200.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存