php使用Kafka生产和消费

使用docker快速部署kafka

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
vim docker-compose.yml

# Single-node Kafka stack for local development: ZooKeeper, one Kafka broker,
# and the kafka-manager web UI.
version: "3.3"
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      # Host name/IP advertised to clients; replace with the address of your
      # own Docker host as seen by the PHP producers and consumers.
      KAFKA_ADVERTISED_HOST_NAME: 192.168.56.112
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # NOTE(review): the wurstmeister/kafka README documents this value as
      # "name:partitions:replicas" (e.g. "test:1:1") - confirm that a bare
      # topic name is accepted by the image version in use.
      KAFKA_CREATE_TOPICS: "test"
  kafka-manager:
    image: sheepkiller/kafka-manager
    depends_on:
      - zookeeper
      - kafka
    ports:
      - "9000:9000"
    environment:
      ZK_HOSTS: zookeeper:2181

安装php的Kafka扩展

  1. 安装librdkafka

    1
    2
    3
    4
    5
    # Fetch the librdkafka sources (the C client library that php-rdkafka binds to).
    git clone https://github.com/edenhill/librdkafka.git
    cd librdkafka
    # Standard configure / build / install sequence, run from the source tree.
    ./configure
    make
    # Installation is run via sudo.
    sudo make install
  2. 安装php-rdkafka扩展

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    # Fetch the php-rdkafka extension sources and build from inside the checkout;
    # phpize and ./configure must run in the extension's source directory.
    git clone https://github.com/arnaud-lb/php-rdkafka.git
    cd php-rdkafka

    # Generate the configure script for this PHP installation
    /usr/local/php/bin/phpize

    # Build and install the extension
    ./configure --with-php-config=/usr/local/php/bin/php-config
    make
    make install

    # Enable the extension: open php.ini and add the "extension=" line shown below
    vim /usr/local/php/etc/php.ini
    extension=rdkafka.so

    # Check that the extension is loaded
    php -m | grep kafka

php使用Kafka

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
/**
 * Small convenience wrapper around the php-rdkafka extension.
 *
 * Producing:  (new kafka())->Producer()->SendMsg($payload, $topic);
 * Consuming:  (new kafka())->Consumer($groupId)->ReceiveMsg($topic, $callback);
 *
 * Defaults (broker list, topic, group id, batch limits) are read from the
 * application's config() helper under the "kafka." prefix.
 */
class kafka
{
    /**
     * Upper bound on flush() attempts in SendMsg(); each attempt waits up to
     * 10 seconds, so a send gives up after roughly 100 seconds.
     */
    private const FLUSH_MAX_RETRIES = 10;

    /** @var string Value passed to librdkafka as "bootstrap.servers". */
    public $broker_list;

    /** @var string Topic used when a method is called without an explicit topic. */
    public $topic;

    /** @var string Consumer group id used when Consumer() is called without one. */
    public $group_id;

    /** @var \RdKafka\Producer|null Created by Producer(). */
    protected $producer = null;

    /** @var \RdKafka\KafkaConsumer|null Created by Consumer(). */
    protected $consumer = null;

    /** @var int|float Maximum age of a batch, in seconds, before it is handed to the callback. */
    protected $receive_wait_time;

    /** @var int Maximum number of messages in a batch before it is handed to the callback. */
    protected $receive_wait_num;

    /**
     * Load connection settings and batch limits from configuration.
     */
    public function __construct()
    {
        $this->broker_list = config('kafka.broker_list');
        $this->topic = config('kafka.default_topic');
        $this->group_id = config('kafka.default_group_id');
        $this->receive_wait_time = config('kafka.default_receive_wait_time', 10);
        $this->receive_wait_num = config('kafka.default_receive_wait_num', 100);
    }

    /**
     * Create the producer and store it on this instance.
     *
     * @return $this
     */
    public function Producer()
    {
        $conf = new \RdKafka\Conf();
        $conf->set('bootstrap.servers', $this->broker_list);

        // acks controls how many acknowledgements the producer waits for:
        //   0   - do not wait for the broker; the message counts as sent once
        //         it is placed in the socket buffer
        //   1   - the leader writes the record to its local log and confirms
        //         without waiting for the followers
        //   all - the leader waits for all in-sync replicas before confirming
        // For exactly-once, order-preserving producing, librdkafka offers the
        // 'enable.idempotence' setting (set it to 'true').
        $conf->set('acks', '0');

        $this->producer = new \RdKafka\Producer($conf);

        return $this;
    }

    /**
     * Send one message or a list of messages, then flush the producer queue.
     *
     * An empty array is a no-op. If Producer() has not been called yet, a
     * producer with the default settings is created first.
     *
     * @param string|array $msg   A single payload or a list of payloads
     * @param string       $topic Target topic; falls back to the default topic
     * @return void
     * @throws \RuntimeException When the queue cannot be flushed; the
     *                           exception code is the librdkafka error code
     */
    public function SendMsg($msg = '', $topic = '')
    {
        $payloads = is_array($msg) ? $msg : [$msg];
        if ([] === $payloads) {
            // Nothing to send; previously this path reported a bogus failure.
            return;
        }

        if (null === $this->producer) {
            $this->Producer();
        }
        $producer = $this->producer;

        $topicHandle = $producer->newTopic(empty($topic) ? $this->topic : $topic);

        foreach ($payloads as $payload) {
            // RD_KAFKA_PARTITION_UA lets librdkafka pick the partition.
            $topicHandle->produce(RD_KAFKA_PARTITION_UA, 0, $payload);
            // Serve delivery callbacks without blocking.
            $producer->poll(0);
        }

        // Retry the flush a fixed number of times, independent of batch size.
        $result = RD_KAFKA_RESP_ERR_NO_ERROR;
        for ($attempt = 0; $attempt < self::FLUSH_MAX_RETRIES; $attempt++) {
            $result = $producer->flush(10000);
            if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
                return;
            }
        }

        throw new \RuntimeException('Kafka消息发送失败', $result);
    }

    /**
     * Create the high-level consumer and store it on this instance.
     *
     * @param string $group_id Consumer group id; falls back to the default group
     * @return $this
     */
    public function Consumer($group_id = '')
    {
        $conf = new \RdKafka\Conf();

        // Rebalance callback: prints the partition assignment changes and
        // applies them to the consumer.
        $conf->setRebalanceCb(function (\RdKafka\KafkaConsumer $kafka, $err, ?array $partitions = null) {
            switch ($err) {
                case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                    echo "Assign: ";
                    var_dump($partitions);
                    $kafka->assign($partitions);
                    break;

                case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                    echo "Revoke: ";
                    var_dump($partitions);
                    $kafka->assign(null);
                    break;

                default:
                    // $err is an integer code: use it as the exception code
                    // and its textual description as the message.
                    throw new \Exception(rd_kafka_err2str($err), $err);
            }
        });

        if (empty($group_id)) {
            $group_id = $this->group_id;
        }

        // Consumers sharing a group.id split the topic's partitions between
        // them, so a message is handled by only one member of the group.
        // Running more consumer processes than the topic has partitions
        // leaves the extra processes idle.
        $conf->set('group.id', $group_id);

        // Initial list of Kafka brokers
        $conf->set('bootstrap.servers', $this->broker_list);

        // Where to start when the group has no stored offset or the stored
        // offset is out of range: 'earliest' starts from the beginning,
        // 'latest' from the newest messages.
        $conf->set('auto.offset.reset', 'earliest');

        // Commit consumed offsets automatically at a fixed interval.
        // Conf::set() takes string values, so booleans and integers are
        // passed as strings.
        // NOTE(review): with auto-commit, offsets of messages still buffered
        // in ReceiveMsg() may be committed before the callback has seen them;
        // switch to manual commits if at-least-once handling is required.
        $conf->set('enable.auto.commit', 'true');
        $conf->set('auto.commit.interval.ms', '1000');

        $this->consumer = new \RdKafka\KafkaConsumer($conf);

        return $this;
    }

    /**
     * Subscribe and consume forever, handing messages to the callback in batches.
     *
     * A batch is delivered when it holds receive_wait_num messages or when
     * receive_wait_time seconds have passed since the previous delivery,
     * whichever comes first. Empty batches are never delivered. If Consumer()
     * has not been called yet, a consumer with the default group is created.
     *
     * @param string|array $topic    Topic name or list of topic names; falls
     *                               back to the default topic
     * @param array        $callback Callable in array form, e.g.
     *                               [$object, 'method']; it receives one
     *                               argument, the list of message payloads.
     *                               With an empty array batches are discarded.
     * @return void This method only returns by throwing
     * @throws \Exception On any consumer error other than timeout / partition EOF
     */
    public function ReceiveMsg($topic = '', array $callback = [])
    {
        if (null === $this->consumer) {
            $this->Consumer();
        }
        $consumer = $this->consumer;

        if (empty($topic)) {
            $topic = $this->topic;
        }
        $consumer->subscribe(is_array($topic) ? $topic : [$topic]);

        echo "Waiting for partition assignment... (make take some time when\n";
        echo "quickly re-joining the group after leaving it.)\n";

        $batch = [];
        // Wall-clock deadline for the current batch. Counting loop iterations
        // instead would shrink the window whenever messages arrive faster
        // than the one-second consume timeout.
        $deadline = microtime(true) + $this->receive_wait_time;

        while (true) {
            // Block for at most one second waiting for a message.
            $message = $consumer->consume(1000);

            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    $batch[] = $message->payload;
                    break;

                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    // No new message right now; keep waiting.
                    break;

                default:
                    throw new \Exception($message->errstr(), $message->err);
            }

            $batchIsFull = count($batch) >= $this->receive_wait_num;
            $batchIsDue = microtime(true) >= $deadline;
            if (!$batchIsFull && !$batchIsDue) {
                continue;
            }

            if ([] !== $batch) {
                if (!empty($callback)) {
                    call_user_func($callback, $batch);
                }
                $batch = [];
            }

            // Start a new time window after every delivery or expiry.
            $deadline = microtime(true) + $this->receive_wait_time;
        }
    }
}

创建topic

1
2
3
4
5
6
7
8
9
10
11
# Run these from the Kafka installation directory (e.g. inside the kafka container).
# NOTE(review): the --zookeeper option depends on the Kafka version shipped in
# the image; newer releases use --bootstrap-server instead - confirm.

# List all topics
bin/kafka-topics.sh --list --zookeeper zookeeper:2181

# Create a topic
bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 2 --topic test

# Show the partition layout of a topic
bin/kafka-topics.sh --zookeeper zookeeper:2181 --describe --topic test

# Change the number of partitions of a topic
bin/kafka-topics.sh --zookeeper zookeeper:2181 --alter --partitions 5 --topic test

使用方式

1
2
3
4
5
6
7
8
9
10
生产者:

// Encode the payload without escaping slashes or non-ASCII characters, then
// publish it to the "test_topic" topic.
$payload = json_encode(
    ['msg' => 'hello world', 'name' => 'xupengfei'],
    JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE
);
$service = new kafka();
$service->Producer()->SendMsg($payload, 'test_topic');

消费者:
$callback 是指定的类方法,以数组形式传入,例如 [$object, 'method'];每次回调会收到一批消息内容组成的数组。

// Build a consumer with the default group, then consume $topic in batches,
// passing each batch to $callback. ReceiveMsg() loops indefinitely.
$service = new kafka();
$consumer = $service->Consumer();
$consumer->ReceiveMsg($topic, $callback);

参考链接

  1. https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/book.rdkafka.html
  2. https://github.com/BCEBIGDATA/kafka-sample-php/blob/master/setup-librdkafka.sh

有话要说