准备条件

  • 安装jdk
  • 安装kafka

参考网址:

http://www.04007.cn/article/549.html jdk的安装
http://kafka.apachecn.org/ kafka中文文档
https://packagist.org/packages/nmred/kafka-php nmred/kafka-php kafka的php库
https://mirrors.bfsu.edu.cn/apache/kafka/2.5.0/ kafka下载 下载不带src的 例如kafka_2.12-2.5.0.tgz
https://www.cnblogs.com/gwyy/p/12206176.html php kafka扩展-rdkafka 比较火的这个

安装流程

  1. 先安装jdk kafka需要用到java
  2. 安装kafka 安装的时候选择不带src的文件 例如kafka_2.12-2.5.0.tgz
  3. 安装扩展 rdkafka

遇到的问题

安装扩展时如果pecl的默认不是大于5.6.0的话 使用pecl安装rdkafka是会报错的
可以直接到php版本的目录下的bin目录的也有个pecl
/xxx/php安装目录/bin/pecl install rdkafka
file
安装完成后去到php.ini 修改配置
extension = /www/server/php/72/lib/php/extensions/no-debug-non-zts-20170718/rdkafka.so

重载一下php的配置或者重启

再查看一下phpinfo就可以看到rdkafka的扩展了

启动kafka

  1. cd 到kafka的安装目录

  2. 先启动zookeeper服务器

    bin/zookeeper-server-start.sh config/zookeeper.properties
  3. 启动kafka服务

    bin/kafka-server-start.sh config/server.properties

测试

  • 前面的扩展和第三方库都安装成功后

produe.php


<?php 
require 'vendor/autoload.php';
date_default_timezone_set('PRC');
//use Monolog\Logger;

use Monolog\Handler\StreamHandler;
use Monolog\Logger;
//use Monolog\Handler\StdoutHandler;
// Create the logger
$path = __DIR__.'/my_logger';
$logger = new Logger($path);
// Now add some handlers
//$logger->pushHandler(new StdoutHandler());
$logger->pushHandler(new StreamHandler($path, Logger::WARNING));

$config = \Kafka\ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('127.0.0.1:9092');//默认使用9092 可以在config/properties中修改
$config->setBrokerVersion('1.0.0');
$config->setRequiredAck(1);
$config->setIsAsyn(false);
$config->setProduceInterval(500);
$producer = new \Kafka\Producer(
    function() {
        return [
            [
                'topic' => 'test',//topic
                'value' => 'test....message.'.date('Y-m-d H:i:s'),
                'key' => 'testkey',
            ],
        ];
    }
);
$producer->setLogger($logger);
$producer->success(function($result) {
    echo "success;" . PHP_EOL;
    var_dump($result);
});
$producer->error(function($errorCode) {
        var_dump($errorCode);
});
$producer->send(true);

consumer.php

<?php

require 'vendor/autoload.php';
date_default_timezone_set('PRC');

use Monolog\Handler\StreamHandler;
use Monolog\Logger;
// Create the logger
$path = __DIR__ . '/my_logger';
$logger = new Logger($path);
// Now add some handlers
$logger->pushHandler(new StreamHandler($path, Logger::WARNING));

$config = \Kafka\ConsumerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('127.0.0.1:9092');
$config->setGroupId('test');//对应获取的topic
$config->setBrokerVersion('1.0.0');//版本号也要和生产者一样
$config->setTopics(['test']);//监听的topic
//$config->setOffsetReset('earliest');
$consumer = new \Kafka\Consumer();
$consumer->setLogger($logger);
$consumer->start(function($topic, $part, $message) {
    echo "收到的topic是:" . $topic.PHP_EOL;
    echo "messgea是:" . PHP_EOL;
    var_dump($message);
    echo '----------' . PHP_EOL;
});
  • 启动服务后分别在cli中运行,就可以看到效果了