PHP项目中Kafka的使用

PHP项目中Kafka的使用,第1张

PHP项目中Kafka的使用

1.linux环境下安装rdkafka依赖librdkafka:
wget https://github.com/edenhill/librdkafka/archive/refs/heads/master.zip
解压文件:unzip librdkafka-master.zip
安装扩展:
cd librdkafka
./configure
make && make install

2.安装php-rdkafka 扩展
wget https://github.com/arnaud-lb/php-rdkafka/archive/refs/heads/6.x.zip
unzip unzip 6.x.zip
mv php-rdkafka-6.x php-rdkafka
cd php-rdkafka
/usr/local/php/bin/phpize
./configure --with-php-config=/usr/local/php/bin/php-config
编译
make && make install
在php.ini添加
extension=rdkafka.so
重启php-fpm生效

3.kafka在php中的使用
创建生产者:
public s…message, t o p i c = ′ t o p i c n a m e ′ , topic = 'topic_name', topic=′topicn​ame′,key)
{
$conf = new RdKafkaConf();
//创建生产者类型
KaTeX parse error: Undefined control sequence: RdKafka at position 16: producer = new ̲R̲d̲K̲a̲f̲k̲a̲Producer(conf);
//添加broker,可以添加多个broker,中间用英文逗号隔开
$producer->addBrokers(‘localhost:9092’);
//订阅topic主题,如果该topic不存在,则先创建该topic
$topic_obj = p r o d u c e r − > n e w T o p i c ( producer->newTopic( producer−>newTopic(topic);
//$key自定义key 根据key进行hash选取partitioner进行负载均衡
//分区策略:1-给定了分区号,直接将数据发送到指定的分区里面去,2-没有给定分区号,给定数据的key值,通过key取上hashCode进行分区
//3-既没有给定分区号,也没有给定key值,直接轮循进行分区,4-自定义分区
//设置为RD_KAFKA_PARTITION_UA(未赋值的),则会根据builtin partitioner去选择一个确定 partition
//第一个参数:是分区。RD_KAFKA_PARTITION_UA代表未分配,并让librdkafka选择分区,
//第二个参数:是消息标志,必须为0或者RD_KAFKA_MSG_F_BLOCK,代表在整个队列上阻塞生产,
//第三个参数:消息,如果不为NULL,它将被传递给主题分区程序
//第四个参数:自定义key
t o p i c o b j − > p r o d u c e ( R D K A F K A P A R T I T I O N U A , 0 , j s o n e n c o d e ( topic_obj->produce(RD_KAFKA_PARTITION_UA, 0, json_encode( topico​bj−>produce(RDK​AFKAP​ARTITIONU​A,0,jsone​ncode(message),$key);
//发送完消息后调用此接口,超时是毫秒级的时间,函数会阻塞timeout毫秒等待事件处理,调用设置的回调函数。timeout为0是非阻塞状态。
$producer->poll(0);
//等待Producer队列中的所有消息被传递。这是一个方便的方法,它调用poll(),直到len()为零或可选的超时结束。
$error = $producer->flush(1000);
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $error)
{
throw new Exception(‘Was unable to flush, messages might be lost!’);
}
}

创建消费者:
public s…topic_name=‘test’)
{
$conf = new RdKafkaConf();
$conf->set(‘group.id’, ‘test’);

    //添加 kafka集群服务地址
    //$conf->set('metadata.broker.list', 'localhost:9092,localhost:9093');

    $rk = new RdKafkaConsumer($conf);
    // 设置日志级别
    //$rk->setLogLevel(LOG_DEBUG);
    // 添加broker(经纪人)
    $rk->addBrokers("localhost:9093");
    $topicConf = new RdKafkaTopicConf();

    //0  生产者producer不等待来自broker同步完成的确认继续发送下一条(批)消息
    //1  producer在leader已成功收到的数据并得到确认后发送下一条message
    //-1 producer在follower副本确认接收到数据后发送完成
    $topicConf->set('request.required.acks', -1);

    //自动提交offset(在interval.ms的时间内自动提交确认、建议不要启动)
    $topicConf->set('enable.auto.commit', 1);

    //自动提交时间单位毫秒
    $topicConf->set('auto.commit.interval.ms', 100);

    //设置offset的存储为broker-----------存储方式包括(file、broker)
    $topicConf->set('offset.store.method', 'broker');
    //当没有初始偏移量时,从哪里开始读取,smallest:简单理解为从头开始消费,largest:简单理解为从最新的开始消费
    $topicConf->set('auto.offset.reset', 'smallest');

    //消费者订阅topic主题
    $topic = $rk->newTopic($topic_name, $topicConf);
    // 参数1消费分区0
    // RD_KAFKA_OFFSET_BEGINNING 重头开始消费
    // RD_KAFKA_OFFSET_STORED 最后一条消费的offset记录开始消费
    // RD_KAFKA_OFFSET_END 最后一条消费
    $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

    while (true)
    {
        
        //参数1表示消费分区,这里是分区0;参数2表示同步阻塞1.2s超时(第二个参数是等待收到消息的最长时间,1000是一秒)
        $message = $topic->consume(0, 1200);
        if (!empty($message))
        {
            switch ($message->err)
            {
                case RD_KAFKA_RESP_ERR_NO_ERROR:                       
                    // print_r(json_decode($message->payload,true));
                    break;
                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                    echo "No more messages; will wait for moren";
                    break;
                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    echo "Timed outn";
                    break;
                default:
                    throw new Exception($message->errstr() . ',Line=' . __LINE__, $message->err);
                    break;
            }
        } else {
            //echo "No more messagesn";
            break;
        }
    }

}

欢迎分享,转载请注明来源:内存溢出

原文地址: https://www.outofmemory.cn/zaji/5688914.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存