1.linux环境下安装rdkafka依赖librdkafka:
wget https://github.com/edenhill/librdkafka/archive/refs/heads/master.zip
解压文件:unzip librdkafka-master.zip
安装扩展:
cd librdkafka
./configure
make && make install
2.安装php-rdkafka 扩展
wget https://github.com/arnaud-lb/php-rdkafka/archive/refs/heads/6.x.zip
unzip unzip 6.x.zip
mv php-rdkafka-6.x php-rdkafka
cd php-rdkafka
/usr/local/php/bin/phpize
./configure --with-php-config=/usr/local/php/bin/php-config
编译
make && make install
在php.ini添加
extension=rdkafka.so
重启php-fpm生效
3.kafka在php中的使用
创建生产者:
public s…message,
t
o
p
i
c
=
′
t
o
p
i
c
n
a
m
e
′
,
topic = 'topic_name',
topic=′topicname′,key)
{
$conf = new RdKafkaConf();
//创建生产者类型
KaTeX parse error: Undefined control sequence: RdKafka at position 16: producer = new ̲R̲d̲K̲a̲f̲k̲a̲Producer(conf);
//添加broker,可以添加多个broker,中间用英文逗号隔开
$producer->addBrokers(‘localhost:9092’);
//订阅topic主题,如果该topic不存在,则先创建该topic
$topic_obj =
p
r
o
d
u
c
e
r
−
>
n
e
w
T
o
p
i
c
(
producer->newTopic(
producer−>newTopic(topic);
//$key自定义key 根据key进行hash选取partitioner进行负载均衡
//分区策略:1-给定了分区号,直接将数据发送到指定的分区里面去,2-没有给定分区号,给定数据的key值,通过key取上hashCode进行分区
//3-既没有给定分区号,也没有给定key值,直接轮循进行分区,4-自定义分区
//设置为RD_KAFKA_PARTITION_UA(未赋值的),则会根据builtin partitioner去选择一个确定 partition
//第一个参数:是分区。RD_KAFKA_PARTITION_UA代表未分配,并让librdkafka选择分区,
//第二个参数:是消息标志,必须为0或者RD_KAFKA_MSG_F_BLOCK,代表在整个队列上阻塞生产,
//第三个参数:消息,如果不为NULL,它将被传递给主题分区程序
//第四个参数:自定义key
t
o
p
i
c
o
b
j
−
>
p
r
o
d
u
c
e
(
R
D
K
A
F
K
A
P
A
R
T
I
T
I
O
N
U
A
,
0
,
j
s
o
n
e
n
c
o
d
e
(
topic_obj->produce(RD_KAFKA_PARTITION_UA, 0, json_encode(
topicobj−>produce(RDKAFKAPARTITIONUA,0,jsonencode(message),$key);
//发送完消息后调用此接口,超时是毫秒级的时间,函数会阻塞timeout毫秒等待事件处理,调用设置的回调函数。timeout为0是非阻塞状态。
$producer->poll(0);
//等待Producer队列中的所有消息被传递。这是一个方便的方法,它调用poll(),直到len()为零或可选的超时结束。
$error = $producer->flush(1000);
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $error)
{
throw new Exception(‘Was unable to flush, messages might be lost!’);
}
}
创建消费者:
public s…topic_name=‘test’)
{
$conf = new RdKafkaConf();
$conf->set(‘group.id’, ‘test’);
//添加 kafka集群服务地址 //$conf->set('metadata.broker.list', 'localhost:9092,localhost:9093'); $rk = new RdKafkaConsumer($conf); // 设置日志级别 //$rk->setLogLevel(LOG_DEBUG); // 添加broker(经纪人) $rk->addBrokers("localhost:9093"); $topicConf = new RdKafkaTopicConf(); //0 生产者producer不等待来自broker同步完成的确认继续发送下一条(批)消息 //1 producer在leader已成功收到的数据并得到确认后发送下一条message //-1 producer在follower副本确认接收到数据后发送完成 $topicConf->set('request.required.acks', -1); //自动提交offset(在interval.ms的时间内自动提交确认、建议不要启动) $topicConf->set('enable.auto.commit', 1); //自动提交时间单位毫秒 $topicConf->set('auto.commit.interval.ms', 100); //设置offset的存储为broker-----------存储方式包括(file、broker) $topicConf->set('offset.store.method', 'broker'); //当没有初始偏移量时,从哪里开始读取,smallest:简单理解为从头开始消费,largest:简单理解为从最新的开始消费 $topicConf->set('auto.offset.reset', 'smallest'); //消费者订阅topic主题 $topic = $rk->newTopic($topic_name, $topicConf); // 参数1消费分区0 // RD_KAFKA_OFFSET_BEGINNING 重头开始消费 // RD_KAFKA_OFFSET_STORED 最后一条消费的offset记录开始消费 // RD_KAFKA_OFFSET_END 最后一条消费 $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); while (true) { //参数1表示消费分区,这里是分区0;参数2表示同步阻塞1.2s超时(第二个参数是等待收到消息的最长时间,1000是一秒) $message = $topic->consume(0, 1200); if (!empty($message)) { switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: // print_r(json_decode($message->payload,true)); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for moren"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed outn"; break; default: throw new Exception($message->errstr() . ',Line=' . __LINE__, $message->err); break; } } else { //echo "No more messagesn"; break; } } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)