scripts.mit.edu Git - autoinstallsdev/mediawiki.git/blob - vendor/monolog/monolog/src/Monolog/Handler/AmqpHandler.php
MediaWiki 1.30.2
[autoinstallsdev/mediawiki.git] / vendor / monolog / monolog / src / Monolog / Handler / AmqpHandler.php
1 <?php
2
3 /*
4  * This file is part of the Monolog package.
5  *
6  * (c) Jordi Boggiano <j.boggiano@seld.be>
7  *
8  * For the full copyright and license information, please view the LICENSE
9  * file that was distributed with this source code.
10  */
11
12 namespace Monolog\Handler;
13
14 use Monolog\Logger;
15 use Monolog\Formatter\JsonFormatter;
16 use PhpAmqpLib\Message\AMQPMessage;
17 use PhpAmqpLib\Channel\AMQPChannel;
18 use AMQPExchange;
19
/**
 * Log handler that publishes formatted records to an AMQP exchange.
 *
 * Two transports are accepted: an AMQPExchange (php AMQP ext) or a
 * PHP AMQP lib channel (AMQPChannel). Every method that publishes
 * checks which of the two it was given and calls the matching API.
 */
class AmqpHandler extends AbstractProcessingHandler
{
    /**
     * Transport object handed to the constructor; all publish calls go through it.
     *
     * @var AMQPExchange|AMQPChannel $exchange
     */
    protected $exchange;

    /**
     * Exchange name passed to the channel publish calls.
     * Only assigned when the handler was built with an AMQPChannel.
     *
     * @var string
     */
    protected $exchangeName;

    /**
     * Validates the transport, applies the exchange name and initialises the parent handler.
     *
     * With an AMQPExchange the name is set on the exchange object itself;
     * with an AMQPChannel it is kept on the handler for later publish calls.
     *
     * @param AMQPExchange|AMQPChannel $exchange     AMQPExchange (php AMQP ext) or PHP AMQP lib channel, ready for use
     * @param string                   $exchangeName
     * @param int                      $level
     * @param bool                     $bubble       Whether the messages that are handled can bubble up the stack or not
     *
     * @throws \InvalidArgumentException When $exchange is neither an AMQPExchange nor an AMQPChannel
     */
    public function __construct($exchange, $exchangeName = 'log', $level = Logger::DEBUG, $bubble = true)
    {
        $usesExtension = $exchange instanceof AMQPExchange;

        // Reject unsupported transports before touching any state.
        if (!$usesExtension && !($exchange instanceof AMQPChannel)) {
            throw new \InvalidArgumentException('PhpAmqpLib\Channel\AMQPChannel or AMQPExchange instance required');
        }

        if ($usesExtension) {
            $exchange->setName($exchangeName);
        } else {
            $this->exchangeName = $exchangeName;
        }

        $this->exchange = $exchange;

        parent::__construct($level, $bubble);
    }

    /**
     * {@inheritDoc}
     *
     * Publishes the record's "formatted" payload under the routing key
     * computed by getRoutingKey().
     */
    protected function write(array $record)
    {
        $payload = $record['formatted'];
        $key = $this->getRoutingKey($record);

        if ($this->exchange instanceof AMQPExchange) {
            // Extension transport: payload and attributes are passed straight to publish().
            $attributes = array(
                'delivery_mode' => 2,
                'content_type' => 'application/json',
            );
            $this->exchange->publish($payload, $key, 0, $attributes);

            return;
        }

        // Channel transport: the payload is wrapped in an AMQPMessage first.
        $message = $this->createAmqpMessage($payload);
        $this->exchange->basic_publish($message, $this->exchangeName, $key);
    }

    /**
     * {@inheritDoc}
     *
     * An AMQPExchange is served by the parent implementation. For a channel,
     * every handled record is queued with batch_basic_publish() and
     * publish_batch() is called once after the loop.
     */
    public function handleBatch(array $records)
    {
        if ($this->exchange instanceof AMQPExchange) {
            parent::handleBatch($records);

            return;
        }

        foreach ($records as $entry) {
            if ($this->isHandling($entry)) {
                $entry = $this->processRecord($entry);
                $message = $this->createAmqpMessage($this->getFormatter()->format($entry));

                $this->exchange->batch_basic_publish(
                    $message,
                    $this->exchangeName,
                    $this->getRoutingKey($entry)
                );
            }
        }

        $this->exchange->publish_batch();
    }

    /**
     * Gets the routing key for the AMQP exchange
     *
     * The key is "<level>.<channel>" in lower case, where <level> is the
     * first four characters of the record's level name.
     *
     * @param  array  $record
     * @return string
     */
    protected function getRoutingKey(array $record)
    {
        // TODO 2.0 remove substr call
        $levelPrefix = substr($record['level_name'], 0, 4);
        $channelName = $record['channel'];

        return strtolower(sprintf('%s.%s', $levelPrefix, $channelName));
    }

    /**
     * Wraps a payload in an AMQPMessage carrying the handler's fixed message properties.
     *
     * @param  string      $data Payload; cast to string before being handed to the message
     * @return AMQPMessage
     */
    private function createAmqpMessage($data)
    {
        $properties = array(
            'delivery_mode' => 2,
            'content_type' => 'application/json',
        );

        return new AMQPMessage((string) $data, $properties);
    }

    /**
     * {@inheritDoc}
     *
     * Returns a JsonFormatter built with BATCH_MODE_JSON and false as its
     * second constructor argument.
     */
    protected function getDefaultFormatter()
    {
        $formatter = new JsonFormatter(JsonFormatter::BATCH_MODE_JSON, false);

        return $formatter;
    }
}