教你如何快速进行php+kafka的安装

教你如何快速进行php+kafka的安装,第1张

概述教你如何快速进行php+kafka的安装

我们学习了解了这么多关于PHP的知识,今天教你们如何快速进行PHP+kafka的安装,如果不会的“童鞋”,那就跟随本篇文章一起继续学习吧

1、 安装java,并设置相关的环境变量

> wget https://download.java.net/openjdk/jdk7u75/rI/Openjdk-7u75-b13-linux-x64-18_dec_2014.tar.gz> tar zxvf openjdk-7u75-b13-linux-x64-18_dec_2014.tar.gz> mv java-se-7u75-ri/ /opt/> export JAVA_HOME=/opt/java-se-7u75-ri> export PATH=$PATH:$JAVA_HOME/bin:$JAVA_HOME/jre/bin> export CLAsspATH=.:$JAVA_HOME/lib:$JAVA_HOME/lib/tools.jar#验证安装> java -verisonopenjdk version "1.7.0_75"OpenJDK Runtime Environment (build 1.7.0_75-b13)OpenJDK 64-Bit Server VM (build 24.75-b04, mixed mode)

2、安装kafka,这里以0.10.2版本为例

> wget http://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz> tar zxvf kafka_2.11-0.10.2.0.tgz> mv kafka_2.11-0.10.2.0/ /opt/kafka> cd /opt/kafka#启动zookeeper> bin/zookeeper-server-start.sh config/zookeeper.propertIEs[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.propertIEs (org.apache.zookeeper.server.quorum.QuorumPeerConfig)...#启动kafka> bin/kafka-server-start.sh config/server.propertIEs[2013-04-22 15:01:47,028] INFO Verifying propertIEs (kafka.utils.VerifiablePropertIEs)[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overrIDden to 1048576 (kafka.utils.VerifiablePropertIEs)...#尝试创建一个topic> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test> bin/kafka-topics.sh --List --zookeeper localhost:2181test#生产者写入消息> bin/kafka-console-producer.sh --broker-List localhost:9092 --topic testThis is a messageThis is another message#消费者消费消息> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginningThis is a messageThis is another message

3、安装kafka的C *** 作库

> wget https://github.com/edenhill/librdkafka/archive/v1.3.0.tar.gz> tar zxvf v1.3.0.tar.gz> cd librdkafka-1.3.0> ./configure> make && make install

4、安装PHP的kafka扩展 ,这里选择PHP-rdkafka扩展 https://github.com/arnaud-lb/PHP-rdkafka

> wget https://github.com/arnaud-lb/PHP-rdkafka/archive/4.0.2.tar.gz> tar 4.0.2.tar.gz> cd PHP-rdkafka-4.0.2> /opt/PHP7/bin/PHPize> ./configure --with-PHP-config=/opt/PHP7/bin/PHP-config> make && make install

修改PHP.ini,加入 extension=rdkafka.so

5、安装rdkafka的IDE代码提示文件

> composer create-project kwn/PHP-rdkafka-stubs PHP-rdkafka-stubs

以PHPstrom为例,在你的项目的External librarIEs右键,选择Configure PHP Include Paths,把刚刚的路径添加进来。

6、编写PHP测试代码

producer:

<?PHP$conf = new RdKafka\Conf();$conf->set('log_level', LOG_ERR);$conf->set('deBUG', 'admin');$conf->set('Metadata.broker.List', 'localhost:9092');//If you need to produce exactly once and want to keep the original produce order, uncomment the line below//$conf->set('enable.IDempotence', 'true');$producer = new RdKafka\Producer($conf);$topic = $producer->newtopic("test2");for ($i = 0; $i < 10; $i++) {    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");    $producer->poll(0);}for ($flushRetrIEs = 0; $flushRetrIEs < 10; $flushRetrIEs++) {    $result = $producer->flush(10000);    if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {        break;    }}if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {    throw new \RuntimeException('Was unable to flush, messages might be lost!');}

low-level consumer:

<?PHP$conf = new RdKafka\Conf();$conf->set('log_level', LOG_ERR);$conf->set('deBUG', 'admin');// Set the group ID. This is required when storing offsets on the broker$conf->set('group.ID', 'myConsumerGroup');$rk = new RdKafka\Consumer($conf);$rk->addbrokers("127.0.0.1");$topicConf = new RdKafka\topicConf();$topicConf->set('auto.commit.interval.ms', 100);// Set the offset store method to 'file'$topicConf->set('offset.store.method', 'broker');// Alternatively, set the offset store method to 'none'// $topicConf->set('offset.store.method', 'none');// Set where to start consuming messages when there is no initial offset in// offset store or the desired offset is out of range.// 'smallest': start from the beginning$topicConf->set('auto.offset.reset', 'smallest');$topic = $rk->newtopic("test2", $topicConf);// Start consuming partition 0$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);while (true) {    $message = $topic->consume(0, 10000);    switch ($message->err) {        case RD_KAFKA_RESP_ERR_NO_ERROR:            print_r($message);            break;        case RD_KAFKA_RESP_ERR__PARTITION_EOF:            echo "No more messages; will wait for more\n";            break;        case RD_KAFKA_RESP_ERR__TIMED_OUT:            echo "Timed out\n";            break;        default:            throw new \Exception($message->errstr(), $message->err);            break;    }}

high-level consumer:

<?PHP$conf = new RdKafka\Conf();// Set a rebalance callback to log partition assignments (optional)$conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {    switch ($err) {        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:            echo "Assign: ";            var_dump($partitions);            $kafka->assign($partitions);            break;         case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:             echo "Revoke: ";             var_dump($partitions);             $kafka->assign(NulL);             break;         default:            throw new \Exception($err);    }});// Configure the group.ID. All consumer with the same group.ID will consume// different partitions.$conf->set('group.ID', 'myConsumerGroup');// Initial List of Kafka brokers$conf->set('Metadata.broker.List', '127.0.0.1');// Set where to start consuming messages when there is no initial offset in// offset store or the desired offset is out of range.// 'smallest': start from the beginning$conf->set('auto.offset.reset', 'smallest');$consumer = new RdKafka\KafkaConsumer($conf);// Subscribe to topic 'test2'$consumer->subscribe(['test2']);echo "Waiting for partition assignment... (make take some time when\n";echo "quickly re-joining the group after leaving it.)\n";while (true) {    $message = $consumer->consume(1000);    switch ($message->err) {        case RD_KAFKA_RESP_ERR_NO_ERROR:            print_r($message);            break;        case RD_KAFKA_RESP_ERR__PARTITION_EOF:            echo "No more messages; will wait for more\n";            break;        case RD_KAFKA_RESP_ERR__TIMED_OUT:            echo "Timed out\n";            break;        default:            throw new \Exception($message->errstr(), $message->err);            break;    }    sleep(2);}

推荐学习:《PHP视频教程》 总结

以上是编程之家为你收集整理的教你如何快速进行php+kafka的安装全部内容,希望文章能够帮你解决教你如何快速进行php+kafka的安装所遇到的程序开发问题。

如果觉得编程之家网站内容还不错,欢迎将编程之家网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://www.outofmemory.cn/langs/997351.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存