]> scripts.mit.edu Git - autoinstalls/mediawiki.git/blob - vendor/nmred/kafka-php/README.md
MediaWiki 1.30.2-scripts2
[autoinstalls/mediawiki.git] / vendor / nmred / kafka-php / README.md
1 Kafka-php
2 ==========
3
4 [![Build Status](https://travis-ci.org/nmred/kafka-php.svg?branch=master)](https://travis-ci.org/nmred/Kafka-php)
5
6 Kafka-php is a php client with Zookeeper integration for apache Kafka. It only supports the latest version of Kafka 0.8 which is still under development, so this module is _not production ready_ so far.
7
8 The Zookeeper integration does the following jobs:
9
10 * Loads broker metadata from Zookeeper before we can communicate with the Kafka server
11 * Watches broker state, if broker changes, the client will refresh broker and topic metadata stored in the client
12
13 ## Requirements
14
15 * Minimum PHP version: 5.3.3.
16 * Apache Kafka 0.8.x
17 * You need to have access to your Kafka instance and be able to connect through TCP. You can obtain a copy and instructions on how to setup kafka at https://github.com/kafka-dev/kafka [kafka-08-quick-start](https://cwiki.apache.org/KAFKA/kafka-08-quick-start.html)
18 * The [PHP Zookeeper extension](https://github.com/andreiz/php-zookeeper) is required if you want to use the Zookeeper-based consumer.
19 * Productor can not dependency zookeeper
20
21 ## Installation
22 Add the lib directory to the PHP include_path and use an autoloader like the one in the examples directory (the code follows the PEAR/Zend one-class-per-file convention).
23
24 ## Composer Install
25
26 Simply add a dependency on nmred/kafka-php to your project's composer.json file if you use Composer to manage the dependencies of your project. Here is a minimal example of a composer.json file :
27
28 ```
29 {
30         "require": {
31                 "nmred/kafka-php": "0.1.*"
32         }
33 }
34 ```
35
36 ## Produce
37
38 ### \Kafka\Produce::getInstance($hostList, $timeout)
39
40 * `hostList` : zookeeper host list , example 127.0.0.1:2181,192.168.1.114:2181
41 * `timeout`  : zookeeper timeout
42
43 ### \Kafka\Produce::setRequireAck($ack = -1)
44
45 * `ack`: This field indicates how many acknowledgements the servers should receive before responding to the request.
46
47 ### \Kafka\Produce::setMessages($topicName, $partitionId, $messages)
48
49 * `topicName` : The topic that data is being published to.
50 * `partitionId` : The partition that data is being published to.
51 * `messages` : [Array] publish message.
52
53 ### \Kafka\Produce::send()
54
55 send message sets to the server. 
56
57 ### Example
58
59 ``` php
60 $produce = \Kafka\Produce::getInstance('localhost:2181', 3000);
61
62 $produce->setRequireAck(-1);
63 $produce->setMessages('test', 0, array('test1111111'));
64 $produce->setMessages('test6', 0, array('test1111111'));
65 $produce->setMessages('test6', 2, array('test1111111'));
66 $produce->setMessages('test6', 1, array('test111111111133'));
67 $result = $produce->send();
68 var_dump($result);
69
70 ```
71
72 ## Consumer
73
74 ### \Kafka\Consumer::getInstance($hostList, $timeout)
75
76 * `hostList` : zookeeper host list , example 127.0.0.1:2181,192.168.1.114:2181
77 * `timeout`  : zookeeper timeout
78
79 ### \Kafka\Consumer::setGroup($groupName)
80
81 * `groupName` : Specify consumer group.
82
83 ### \Kafka\Consumer::setPartition($topicName, $partitionId, $offset = 0)
84
85 * `topicName` : The topic that data is being fetch to. 
86 * `partitionId` : The partition that data is being fetch to.
87 * `offset`: set fetch offset. default `0`.
88
89 ### \Kafka\Consumer::fetch()
90
91 return fetch message Iterator. `\Kafka\Protocol\Fetch\Topic`
92
93 ### \Kafka\Protocol\Fetch\Topic
94
95 this object is iterator
96
97 `key` : topic name
98 `value`: `\Kafka\Protocol\Fetch\Partition`
99
100 ### \Kafka\Protocol\Fetch\Partition
101
102 this object is iterator.
103
104 `key`: partition id
105 `value`: messageSet object
106
107 #### \Kafka\Protocol\Fetch\Partition::getErrCode()
108
109 return partition fetch errcode.
110
111 #### \Kafka\Protocol\Fetch\Partition::getHighOffset()
112
113 return partition fetch offset.
114
115 ### \Kafka\Protocol\Fetch\MessageSet
116
117 this object is iterator. `\Kafka\Protocol\Fetch\Message`
118
119 ### Example
120
121 ``` php
122 $consumer = \Kafka\Consumer::getInstance('localhost:2181');
123
124 $consumer->setGroup('testgroup');
125 $consumer->setPartition('test', 0);
126 $consumer->setPartition('test6', 2, 10);
127 $result = $consumer->fetch();
128 foreach ($result as $topicName => $topic) {
129     foreach ($topic as $partId => $partition) {
130         var_dump($partition->getHighOffset());
131         foreach ($partition as $message) {
132             var_dump((string)$message);
133         }
134     }
135 }
136 ```
137
138 ## Basic Protocol
139 ### Produce API
140
141 The produce API is used to send message sets to the server. For efficiency it allows sending message sets intended for many topic partitions in a single request.
142
143 \Kafka\Protocol\Encoder::produceRequest
144
145 #### Param struct
146 ``` php
147 array(
148     'required_ack' => 1,
149         // This field indicates how many acknowledgements the servers should receive before responding to the request. default `0`
150         // If it is 0 the server will not send any response
151         // If it is -1 the server will block until the message is committed by all in sync replicas before sending a response 
152         // For any number > 1 the server will block waiting for this number of acknowledgements to occur
153     'timeout' => 1000,
154         // This provides a maximum time in milliseconds the server can await the receipt of the number of acknowledgements in RequiredAcks.
155     'data' => array(
156         array(
157             'topic_name' => 'testtopic',
158                 // The topic that data is being published to.[String]
159             'partitions' => array(
160                 array(
161                     'partition_id' => 0,
162                         // The partition that data is being published to.
163                     'messages' => array(
164                         'message1', 
165                         // [String] message
166                     ),
167                 ),
168             ),
169         ),
170     ),
171 );
172 ```
173
174 #### Return
175
176 Array
177
178 #### Example
179
180 ``` php
181
182 $data = array(
183     'required_ack' => 1,
184     'timeout' => 1000,
185     'data' => array(
186         array(
187             'topic_name' => 'test',
188             'partitions' => array(
189                 array(
190                     'partition_id' => 0,
191                     'messages' => array(
192                         'message1',
193                         'message2',
194                     ),
195                 ),
196             ),
197         ),
198     ),
199 );
200
201 $conn = new \Kafka\Socket('localhost', '9092');
202 $conn->connect();
203 $encoder = new \Kafka\Protocol\Encoder($conn);
204 $encoder->produceRequest($data);
205
206 $decoder = new \Kafka\Protocol\Decoder($conn);
207 $result = $decoder->produceResponse();
208 var_dump($result);
209
210 ```
211 ### Fetch API
212
213 The fetch API is used to fetch a chunk of one or more logs for some topic-partitions. Logically one specifies the topics, partitions, and starting offset at which to begin the fetch and gets back a chunk of messages
214
215 \Kafka\Protocol\Encoder::fetchRequest
216
217 #### Param struct
218 ``` php
219 array(
220     'replica_id' => -1,
221         // The replica id indicates the node id of the replica initiating this request. default `-1`
222     'max_wait_time' => 100,
223         // The max wait time is the maximum amount of time in milliseconds to block waiting if insufficient data is available at the time the request is issued. default 100 ms.
224     'min_bytes' => 64 * 1024 // 64k
225         // This is the minimum number of bytes of messages that must be available to give a response. default 64k.
226     'data' => array(
227         array(
228             'topic_name' => 'testtopic',
229                 // The topic that data is being published to.[String]
230             'partitions' => array(
231                 array(
232                     'partition_id' => 0,
233                         // The partition that data is being published to.
234                     'offset' => 0,
235                         // The offset to begin this fetch from. default 0
236                     'max_bytes' => 100 * 1024 * 1024,
237                         // This is the minimum number of bytes of messages that must be available to give a response. default 100Mb
238                 ),
239             ),
240         ),
241     ),
242 );
243 ```
244
245 #### Return
246
247 \Kafka\Protocol\Fetch\Topic iterator
248
249 #### Example
250 ``` php
251
252 $data = array(
253     'data' => array(
254         array(
255             'topic_name' => 'test',
256             'partitions' => array(
257                 array(
258                     'partition_id' => 0,
259                     'offset' => 0, 
260                 ),
261             ),
262         ),
263     ),
264 );
265
266 $conn = new \Kafka\Socket('localhost', '9092');
267 $conn->connect();
268 $encoder = new \Kafka\Protocol\Encoder($conn);
269 $encoder->fetchRequest($data);
270
271 $decoder = new \Kafka\Protocol\Decoder($conn);
272 $result = $decoder->fetchResponse();
273 var_dump($result);
274
275 ```
276 ### Offset API
277
278 This API describes the valid offset range available for a set of topic-partitions. As with the produce and fetch APIs requests must be directed to the broker that is currently the leader for the partitions in question. This can be determined using the metadata API.
279
280 \Kafka\Protocol\Encoder::offsetRequest
281
282 ####param struct
283 ``` php
284 array(
285     'replica_id' => -1,
286         // The replica id indicates the node id of the replica initiating this request. default `-1`
287     'data' => array(
288         array(
289             'topic_name' => 'testtopic',
290                 // The topic that data is being published to.[String]
291             'partitions' => array(
292                 array(
293                     'partition_id' => 0,
294                         // The partition that get offset .
295                     'time' => -1,
296                         // Used to ask for all messages before a certain time (ms). 
297                         // Specify -1 to receive the latest offsets
298                         // Specify -2 to receive the earliest available offset. 
299                     'max_offset' => 1, 
300                         // max return offset element. default 10000.
301                 ),
302             ),
303         ),
304     ),
305 );
306 ```
307
308 #### Return
309
310 Array.
311
312 #### Example
313
314 ``` php
315
316 $data = array(
317     'data' => array(
318         array(
319             'topic_name' => 'test',
320             'partitions' => array(
321                 array(
322                     'partition_id' => 0,
323                     'max_offset' => 10, 
324                     'time' => -1, 
325                 ),
326             ),
327         ),
328     ),
329 );
330
331 $conn = new \Kafka\Socket('localhost', '9092');
332 $conn->connect();
333 $encoder = new \Kafka\Protocol\Encoder($conn);
334 $encoder->offsetRequest($data);
335
336 $decoder = new \Kafka\Protocol\Decoder($conn);
337 $result = $decoder->offsetResponse();
338 var_dump($result);
339
340 ```
341 ### Metadata API
342
343 The metdata returned is at the partition level, but grouped together by topic for convenience and to avoid redundancy. For each partition the metadata contains the information for the leader as well as for all the replicas and the list of replicas that are currently in-sync.
344
345 \Kafka\Protocol\Encoder::metadataRequest
346
347 ####param struct
348 ``` php
349 array(
350    'topic_name1', // topic name
351 );
352 ```
353
354 #### Return
355
356 Array.
357
358 #### Example
359
360 ``` php
361
362 $data = array(
363     'test'
364 );
365
366 $conn = new \Kafka\Socket('localhost', '9092');
367 $conn->connect();
368 $encoder = new \Kafka\Protocol\Encoder($conn);
369 $encoder->metadataRequest($data);
370
371 $decoder = new \Kafka\Protocol\Decoder($conn);
372 $result = $decoder->metadataResponse();
373 var_dump($result);
374
375 ```
376 ### Offset Commit API
377
378 These APIs allow for centralized management of offsets. 
379
380 \Kafka\Protocol\Encoder::commitOffsetRequest
381
382 ####param struct
383 ``` php
384 array(
385     'group_id' => 'testgroup',
386         // consumer group 
387     'data' => array(
388         array(
389             'topic_name' => 'testtopic',
390                 // The topic that data is being published to.[String]
391             'partitions' => array(
392                 array(
393                     'partition_id' => 0,
394                         // The partition that get offset .
395                     'offset' => 0,
396                         // The offset to begin this fetch from.
397                     'time' => -1, 
398                         // If the time stamp field is set to -1, then the broker sets the time stamp to the receive time before committing the offset.
399                 ),
400             ),
401         ),
402     ),
403 );
404 ```
405
406 #### Return
407
408 Array.
409
410 #### Example
411
412 ``` php
413 $data = array(
414     'group_id' => 'testgroup',
415     'data' => array(
416         array(
417             'topic_name' => 'test',
418             'partitions' => array(
419                 array(
420                     'partition_id' => 0,
421                     'offset' => 2, 
422                 ),
423             ),
424         ),
425     ),
426 );
427
428
429 $conn = new \Kafka\Socket('localhost', '9092');
430 $conn->connect();
431 $encoder = new \Kafka\Protocol\Encoder($conn);
432 $encoder->commitOffsetRequest($data);
433
434 $decoder = new \Kafka\Protocol\Decoder($conn);
435 $result = $decoder->commitOffsetResponse();
436 var_dump($result);
437
438 ```
439 ### Offset Fetch API
440
441 These APIs allow for centralized management of offsets. 
442
443 \Kafka\Protocol\Encoder::fetchOffsetRequest
444
445 ####param struct
446 ``` php
447 array(
448     'group_id' => 'testgroup',
449         // consumer group 
450     'data' => array(
451         array(
452             'topic_name' => 'testtopic',
453                 // The topic that data is being published to.[String]
454             'partitions' => array(
455                 array(
456                     'partition_id' => 0,
457                         // The partition that get offset .
458                 ),
459             ),
460         ),
461     ),
462 );
463 ```
464
465 #### Return
466
467 Array.
468
469 #### Example
470
471 ``` php
472 $data = array(
473     'group_id' => 'testgroup',
474     'data' => array(
475         array(
476             'topic_name' => 'test',
477             'partitions' => array(
478                 array(
479                     'partition_id' => 0,
480                 ),
481             ),
482         ),
483     ),
484 );
485
486
487 $conn = new \Kafka\Socket('localhost', '9092');
488 $conn->connect();
489 $encoder = new \Kafka\Protocol\Encoder($conn);
490 $encoder->fetchOffsetRequest($data);
491
492 $decoder = new \Kafka\Protocol\Decoder($conn);
493 $result = $decoder->fetchOffsetResponse();
494 var_dump($result);
495
496 ```