]> scripts.mit.edu Git - autoinstalls/mediawiki.git/blob - includes/debug/logger/monolog/KafkaHandler.php
MediaWiki 1.30.2
[autoinstalls/mediawiki.git] / includes / debug / logger / monolog / KafkaHandler.php
1 <?php
2 /**
3  * This program is free software; you can redistribute it and/or modify
4  * it under the terms of the GNU General Public License as published by
5  * the Free Software Foundation; either version 2 of the License, or
6  * (at your option) any later version.
7  *
8  * This program is distributed in the hope that it will be useful,
9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  * GNU General Public License for more details.
12  *
13  * You should have received a copy of the GNU General Public License along
14  * with this program; if not, write to the Free Software Foundation, Inc.,
15  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16  * http://www.gnu.org/copyleft/gpl.html
17  *
18  * @file
19  */
20
21 namespace MediaWiki\Logger\Monolog;
22
23 use Kafka\MetaDataFromKafka;
24 use Kafka\Produce;
25 use Kafka\Protocol\Decoder;
26 use MediaWiki\Logger\LoggerFactory;
27 use Monolog\Handler\AbstractProcessingHandler;
28 use Monolog\Logger;
29 use Psr\Log\LoggerInterface;
30
31 /**
32  * Log handler sends log events to a kafka server.
33  *
34  * Constructor options array arguments:
35  * * alias: map from monolog channel to kafka topic name. When no
36  *   alias exists the topic "monolog_$channel" will be used.
37  * * swallowExceptions: Swallow exceptions that occur while talking to
38  *   kafka. Defaults to false.
39  * * logExceptions: Log exceptions talking to kafka here. Either null,
40  *   the name of a channel to log to, or an object implementing
41  *   FormatterInterface. Defaults to null.
42  *
43  * Requires the nmred/kafka-php library, version >= 1.3.0
44  *
45  * @since 1.26
46  * @author Erik Bernhardson <ebernhardson@wikimedia.org>
47  * @copyright © 2015 Erik Bernhardson and Wikimedia Foundation.
48  */
49 class KafkaHandler extends AbstractProcessingHandler {
50         /**
51          * @var Produce Sends requests to kafka
52          */
53         protected $produce;
54
55         /**
56          * @var array Optional handler configuration
57          */
58         protected $options;
59
60         /**
61          * @var array Map from topic name to partition this request produces to
62          */
63         protected $partitions = [];
64
65         /**
66          * @var array defaults for constructor options
67          */
68         private static $defaultOptions = [
69                 'alias' => [], // map from monolog channel to kafka topic
70                 'swallowExceptions' => false, // swallow exceptions sending records
71                 'logExceptions' => null, // A PSR3 logger to inform about errors
72                 'requireAck' => 0,
73         ];
74
75         /**
76          * @param Produce $produce Kafka instance to produce through
77          * @param array $options optional handler configuration
78          * @param int $level The minimum logging level at which this handler will be triggered
79          * @param bool $bubble Whether the messages that are handled can bubble up the stack or not
80          */
81         public function __construct(
82                 Produce $produce, array $options, $level = Logger::DEBUG, $bubble = true
83         ) {
84                 parent::__construct( $level, $bubble );
85                 $this->produce = $produce;
86                 $this->options = array_merge( self::$defaultOptions, $options );
87         }
88
89         /**
90          * Constructs the necessary support objects and returns a KafkaHandler
91          * instance.
92          *
93          * @param string[] $kafkaServers
94          * @param array $options
95          * @param int $level The minimum logging level at which this handle will be triggered
96          * @param bool $bubble Whether the messages that are handled can bubble the stack or not
97          * @return KafkaHandler
98          */
99         public static function factory(
100                 $kafkaServers, array $options = [], $level = Logger::DEBUG, $bubble = true
101         ) {
102                 $metadata = new MetaDataFromKafka( $kafkaServers );
103                 $produce = new Produce( $metadata );
104
105                 if ( isset( $options['sendTimeout'] ) ) {
106                         $timeOut = $options['sendTimeout'];
107                         $produce->getClient()->setStreamOption( 'SendTimeoutSec', 0 );
108                         $produce->getClient()->setStreamOption( 'SendTimeoutUSec',
109                                 intval( $timeOut * 1000000 )
110                         );
111                 }
112                 if ( isset( $options['recvTimeout'] ) ) {
113                         $timeOut = $options['recvTimeout'];
114                         $produce->getClient()->setStreamOption( 'RecvTimeoutSec', 0 );
115                         $produce->getClient()->setStreamOption( 'RecvTimeoutUSec',
116                                 intval( $timeOut * 1000000 )
117                         );
118                 }
119                 if ( isset( $options['logExceptions'] ) && is_string( $options['logExceptions'] ) ) {
120                         $options['logExceptions'] = LoggerFactory::getInstance( $options['logExceptions'] );
121                 }
122
123                 if ( isset( $options['requireAck'] ) ) {
124                         $produce->setRequireAck( $options['requireAck'] );
125                 }
126
127                 return new self( $produce, $options, $level, $bubble );
128         }
129
130         /**
131          * @inheritDoc
132          */
133         protected function write( array $record ) {
134                 if ( $record['formatted'] !== null ) {
135                         $this->addMessages( $record['channel'], [ $record['formatted'] ] );
136                         $this->send();
137                 }
138         }
139
140         /**
141          * @inheritDoc
142          */
143         public function handleBatch( array $batch ) {
144                 $channels = [];
145                 foreach ( $batch as $record ) {
146                         if ( $record['level'] < $this->level ) {
147                                 continue;
148                         }
149                         $channels[$record['channel']][] = $this->processRecord( $record );
150                 }
151
152                 $formatter = $this->getFormatter();
153                 foreach ( $channels as $channel => $records ) {
154                         $messages = [];
155                         foreach ( $records as $idx => $record ) {
156                                 $message = $formatter->format( $record );
157                                 if ( $message !== null ) {
158                                         $messages[] = $message;
159                                 }
160                         }
161                         if ( $messages ) {
162                                 $this->addMessages( $channel, $messages );
163                         }
164                 }
165
166                 $this->send();
167         }
168
169         /**
170          * Send any records in the kafka client internal queue.
171          */
172         protected function send() {
173                 try {
174                         $response = $this->produce->send();
175                 } catch ( \Kafka\Exception $e ) {
176                         $ignore = $this->warning(
177                                 'Error sending records to kafka: {exception}',
178                                 [ 'exception' => $e ] );
179                         if ( !$ignore ) {
180                                 throw $e;
181                         } else {
182                                 return;
183                         }
184                 }
185
186                 if ( is_bool( $response ) ) {
187                         return;
188                 }
189
190                 $errors = [];
191                 foreach ( $response as $topicName => $partitionResponse ) {
192                         foreach ( $partitionResponse as $partition => $info ) {
193                                 if ( $info['errCode'] === 0 ) {
194                                         // no error
195                                         continue;
196                                 }
197                                 $errors[] = sprintf(
198                                         'Error producing to %s (errno %d): %s',
199                                         $topicName,
200                                         $info['errCode'],
201                                         Decoder::getError( $info['errCode'] )
202                                 );
203                         }
204                 }
205
206                 if ( $errors ) {
207                         $error = implode( "\n", $errors );
208                         if ( !$this->warning( $error ) ) {
209                                 throw new \RuntimeException( $error );
210                         }
211                 }
212         }
213
214         /**
215          * @param string $topic Name of topic to get partition for
216          * @return int|null The random partition to produce to for this request,
217          *  or null if a partition could not be determined.
218          */
219         protected function getRandomPartition( $topic ) {
220                 if ( !array_key_exists( $topic, $this->partitions ) ) {
221                         try {
222                                 $partitions = $this->produce->getAvailablePartitions( $topic );
223                         } catch ( \Kafka\Exception $e ) {
224                                 $ignore = $this->warning(
225                                         'Error getting metadata for kafka topic {topic}: {exception}',
226                                         [ 'topic' => $topic, 'exception' => $e ] );
227                                 if ( $ignore ) {
228                                         return null;
229                                 }
230                                 throw $e;
231                         }
232                         if ( $partitions ) {
233                                 $key = array_rand( $partitions );
234                                 $this->partitions[$topic] = $partitions[$key];
235                         } else {
236                                 $details = $this->produce->getClient()->getTopicDetail( $topic );
237                                 $ignore = $this->warning(
238                                         'No partitions available for kafka topic {topic}',
239                                         [ 'topic' => $topic, 'kafka' => $details ]
240                                 );
241                                 if ( !$ignore ) {
242                                         throw new \RuntimeException( "No partitions available for kafka topic $topic" );
243                                 }
244                                 $this->partitions[$topic] = null;
245                         }
246                 }
247                 return $this->partitions[$topic];
248         }
249
250         /**
251          * Adds records for a channel to the Kafka client internal queue.
252          *
253          * @param string $channel Name of Monolog channel records belong to
254          * @param array $records List of records to append
255          */
256         protected function addMessages( $channel, array $records ) {
257                 if ( isset( $this->options['alias'][$channel] ) ) {
258                         $topic = $this->options['alias'][$channel];
259                 } else {
260                         $topic = "monolog_$channel";
261                 }
262                 $partition = $this->getRandomPartition( $topic );
263                 if ( $partition !== null ) {
264                         $this->produce->setMessages( $topic, $partition, $records );
265                 }
266         }
267
268         /**
269          * @param string $message PSR3 compatible message string
270          * @param array $context PSR3 compatible log context
271          * @return bool true if caller should ignore warning
272          */
273         protected function warning( $message, array $context = [] ) {
274                 if ( $this->options['logExceptions'] instanceof LoggerInterface ) {
275                         $this->options['logExceptions']->warning( $message, $context );
276                 }
277                 return $this->options['swallowExceptions'];
278         }
279 }