]> scripts.mit.edu Git - autoinstallsdev/mediawiki.git/blob - includes/jobqueue/JobQueueGroup.php
MediaWiki 1.30.2
[autoinstallsdev/mediawiki.git] / includes / jobqueue / JobQueueGroup.php
1 <?php
2 /**
3  * Job queue base code.
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 2 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18  * http://www.gnu.org/copyleft/gpl.html
19  *
20  * @file
21  */
22
23 /**
24  * Class to handle enqueueing of background jobs
25  *
26  * @ingroup JobQueue
27  * @since 1.21
28  */
29 class JobQueueGroup {
30         /** @var JobQueueGroup[] */
31         protected static $instances = [];
32
33         /** @var ProcessCacheLRU */
34         protected $cache;
35
36         /** @var string Wiki ID */
37         protected $wiki;
38         /** @var string|bool Read only rationale (or false if r/w) */
39         protected $readOnlyReason;
40         /** @var bool Whether the wiki is not recognized in configuration */
41         protected $invalidWiki = false;
42
43         /** @var array Map of (bucket => (queue => JobQueue, types => list of types) */
44         protected $coalescedQueues;
45
46         /** @var Job[] */
47         protected $bufferedJobs = [];
48
49         const TYPE_DEFAULT = 1; // integer; jobs popped by default
50         const TYPE_ANY = 2; // integer; any job
51
52         const USE_CACHE = 1; // integer; use process or persistent cache
53
54         const PROC_CACHE_TTL = 15; // integer; seconds
55
56         const CACHE_VERSION = 1; // integer; cache version
57
58         /**
59          * @param string $wiki Wiki ID
60          * @param string|bool $readOnlyReason Read-only reason or false
61          */
62         protected function __construct( $wiki, $readOnlyReason ) {
63                 $this->wiki = $wiki;
64                 $this->readOnlyReason = $readOnlyReason;
65                 $this->cache = new ProcessCacheLRU( 10 );
66         }
67
68         /**
69          * @param bool|string $wiki Wiki ID
70          * @return JobQueueGroup
71          */
72         public static function singleton( $wiki = false ) {
73                 global $wgLocalDatabases;
74
75                 $wiki = ( $wiki === false ) ? wfWikiID() : $wiki;
76
77                 if ( !isset( self::$instances[$wiki] ) ) {
78                         self::$instances[$wiki] = new self( $wiki, wfConfiguredReadOnlyReason() );
79                         // Make sure jobs are not getting pushed to bogus wikis. This can confuse
80                         // the job runner system into spawning endless RPC requests that fail (T171371).
81                         if ( $wiki !== wfWikiID() && !in_array( $wiki, $wgLocalDatabases ) ) {
82                                 self::$instances[$wiki]->invalidWiki = true;
83                         }
84                 }
85
86                 return self::$instances[$wiki];
87         }
88
89         /**
90          * Destroy the singleton instances
91          *
92          * @return void
93          */
94         public static function destroySingletons() {
95                 self::$instances = [];
96         }
97
98         /**
99          * Get the job queue object for a given queue type
100          *
101          * @param string $type
102          * @return JobQueue
103          */
104         public function get( $type ) {
105                 global $wgJobTypeConf;
106
107                 $conf = [ 'wiki' => $this->wiki, 'type' => $type ];
108                 if ( isset( $wgJobTypeConf[$type] ) ) {
109                         $conf = $conf + $wgJobTypeConf[$type];
110                 } else {
111                         $conf = $conf + $wgJobTypeConf['default'];
112                 }
113                 $conf['aggregator'] = JobQueueAggregator::singleton();
114                 if ( $this->readOnlyReason !== false ) {
115                         $conf['readOnlyReason'] = $this->readOnlyReason;
116                 }
117
118                 return JobQueue::factory( $conf );
119         }
120
121         /**
122          * Insert jobs into the respective queues of which they belong
123          *
124          * This inserts the jobs into the queue specified by $wgJobTypeConf
125          * and updates the aggregate job queue information cache as needed.
126          *
127          * @param IJobSpecification|IJobSpecification[] $jobs A single Job or a list of Jobs
128          * @throws InvalidArgumentException
129          * @return void
130          */
131         public function push( $jobs ) {
132                 global $wgJobTypesExcludedFromDefaultQueue;
133
134                 if ( $this->invalidWiki ) {
135                         // Do not enqueue job that cannot be run (T171371)
136                         $e = new LogicException( "Domain '{$this->wiki}' is not recognized." );
137                         MWExceptionHandler::logException( $e );
138                         return;
139                 }
140
141                 $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
142                 if ( !count( $jobs ) ) {
143                         return;
144                 }
145
146                 $this->assertValidJobs( $jobs );
147
148                 $jobsByType = []; // (job type => list of jobs)
149                 foreach ( $jobs as $job ) {
150                         $jobsByType[$job->getType()][] = $job;
151                 }
152
153                 foreach ( $jobsByType as $type => $jobs ) {
154                         $this->get( $type )->push( $jobs );
155                 }
156
157                 if ( $this->cache->has( 'queues-ready', 'list' ) ) {
158                         $list = $this->cache->get( 'queues-ready', 'list' );
159                         if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) {
160                                 $this->cache->clear( 'queues-ready' );
161                         }
162                 }
163
164                 $cache = ObjectCache::getLocalClusterInstance();
165                 $cache->set(
166                         $cache->makeGlobalKey( 'jobqueue', $this->wiki, 'hasjobs', self::TYPE_ANY ),
167                         'true',
168                         15
169                 );
170                 if ( array_diff( array_keys( $jobsByType ), $wgJobTypesExcludedFromDefaultQueue ) ) {
171                         $cache->set(
172                                 $cache->makeGlobalKey( 'jobqueue', $this->wiki, 'hasjobs', self::TYPE_DEFAULT ),
173                                 'true',
174                                 15
175                         );
176                 }
177         }
178
179         /**
180          * Buffer jobs for insertion via push() or call it now if in CLI mode
181          *
182          * Note that pushLazyJobs() is registered as a deferred update just before
183          * DeferredUpdates::doUpdates() in MediaWiki and JobRunner classes in order
184          * to be executed as the very last deferred update (T100085, T154425).
185          *
186          * @param IJobSpecification|IJobSpecification[] $jobs A single Job or a list of Jobs
187          * @return void
188          * @since 1.26
189          */
190         public function lazyPush( $jobs ) {
191                 if ( $this->invalidWiki ) {
192                         // Do not enqueue job that cannot be run (T171371)
193                         throw new LogicException( "Domain '{$this->wiki}' is not recognized." );
194                 }
195
196                 if ( PHP_SAPI === 'cli' || PHP_SAPI === 'phpdbg' ) {
197                         $this->push( $jobs );
198                         return;
199                 }
200
201                 $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
202
203                 // Throw errors now instead of on push(), when other jobs may be buffered
204                 $this->assertValidJobs( $jobs );
205
206                 $this->bufferedJobs = array_merge( $this->bufferedJobs, $jobs );
207         }
208
209         /**
210          * Push all jobs buffered via lazyPush() into their respective queues
211          *
212          * @return void
213          * @since 1.26
214          */
215         public static function pushLazyJobs() {
216                 foreach ( self::$instances as $group ) {
217                         try {
218                                 $group->push( $group->bufferedJobs );
219                                 $group->bufferedJobs = [];
220                         } catch ( Exception $e ) {
221                                 // Get in as many jobs as possible and let other post-send updates happen
222                                 MWExceptionHandler::logException( $e );
223                         }
224                 }
225         }
226
227         /**
228          * Pop a job off one of the job queues
229          *
230          * This pops a job off a queue as specified by $wgJobTypeConf and
231          * updates the aggregate job queue information cache as needed.
232          *
233          * @param int|string $qtype JobQueueGroup::TYPE_* constant or job type string
234          * @param int $flags Bitfield of JobQueueGroup::USE_* constants
235          * @param array $blacklist List of job types to ignore
236          * @return Job|bool Returns false on failure
237          */
238         public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $blacklist = [] ) {
239                 $job = false;
240
241                 if ( is_string( $qtype ) ) { // specific job type
242                         if ( !in_array( $qtype, $blacklist ) ) {
243                                 $job = $this->get( $qtype )->pop();
244                         }
245                 } else { // any job in the "default" jobs types
246                         if ( $flags & self::USE_CACHE ) {
247                                 if ( !$this->cache->has( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) {
248                                         $this->cache->set( 'queues-ready', 'list', $this->getQueuesWithJobs() );
249                                 }
250                                 $types = $this->cache->get( 'queues-ready', 'list' );
251                         } else {
252                                 $types = $this->getQueuesWithJobs();
253                         }
254
255                         if ( $qtype == self::TYPE_DEFAULT ) {
256                                 $types = array_intersect( $types, $this->getDefaultQueueTypes() );
257                         }
258
259                         $types = array_diff( $types, $blacklist ); // avoid selected types
260                         shuffle( $types ); // avoid starvation
261
262                         foreach ( $types as $type ) { // for each queue...
263                                 $job = $this->get( $type )->pop();
264                                 if ( $job ) { // found
265                                         break;
266                                 } else { // not found
267                                         $this->cache->clear( 'queues-ready' );
268                                 }
269                         }
270                 }
271
272                 return $job;
273         }
274
275         /**
276          * Acknowledge that a job was completed
277          *
278          * @param Job $job
279          * @return void
280          */
281         public function ack( Job $job ) {
282                 $this->get( $job->getType() )->ack( $job );
283         }
284
285         /**
286          * Register the "root job" of a given job into the queue for de-duplication.
287          * This should only be called right *after* all the new jobs have been inserted.
288          *
289          * @param Job $job
290          * @return bool
291          */
292         public function deduplicateRootJob( Job $job ) {
293                 return $this->get( $job->getType() )->deduplicateRootJob( $job );
294         }
295
296         /**
297          * Wait for any replica DBs or backup queue servers to catch up.
298          *
299          * This does nothing for certain queue classes.
300          *
301          * @return void
302          */
303         public function waitForBackups() {
304                 global $wgJobTypeConf;
305
306                 // Try to avoid doing this more than once per queue storage medium
307                 foreach ( $wgJobTypeConf as $type => $conf ) {
308                         $this->get( $type )->waitForBackups();
309                 }
310         }
311
312         /**
313          * Get the list of queue types
314          *
315          * @return array List of strings
316          */
317         public function getQueueTypes() {
318                 return array_keys( $this->getCachedConfigVar( 'wgJobClasses' ) );
319         }
320
321         /**
322          * Get the list of default queue types
323          *
324          * @return array List of strings
325          */
326         public function getDefaultQueueTypes() {
327                 global $wgJobTypesExcludedFromDefaultQueue;
328
329                 return array_diff( $this->getQueueTypes(), $wgJobTypesExcludedFromDefaultQueue );
330         }
331
332         /**
333          * Check if there are any queues with jobs (this is cached)
334          *
335          * @param int $type JobQueueGroup::TYPE_* constant
336          * @return bool
337          * @since 1.23
338          */
339         public function queuesHaveJobs( $type = self::TYPE_ANY ) {
340                 $cache = ObjectCache::getLocalClusterInstance();
341                 $key = $cache->makeGlobalKey( 'jobqueue', $this->wiki, 'hasjobs', $type );
342
343                 $value = $cache->get( $key );
344                 if ( $value === false ) {
345                         $queues = $this->getQueuesWithJobs();
346                         if ( $type == self::TYPE_DEFAULT ) {
347                                 $queues = array_intersect( $queues, $this->getDefaultQueueTypes() );
348                         }
349                         $value = count( $queues ) ? 'true' : 'false';
350                         $cache->add( $key, $value, 15 );
351                 }
352
353                 return ( $value === 'true' );
354         }
355
356         /**
357          * Get the list of job types that have non-empty queues
358          *
359          * @return array List of job types that have non-empty queues
360          */
361         public function getQueuesWithJobs() {
362                 $types = [];
363                 foreach ( $this->getCoalescedQueues() as $info ) {
364                         $nonEmpty = $info['queue']->getSiblingQueuesWithJobs( $this->getQueueTypes() );
365                         if ( is_array( $nonEmpty ) ) { // batching features supported
366                                 $types = array_merge( $types, $nonEmpty );
367                         } else { // we have to go through the queues in the bucket one-by-one
368                                 foreach ( $info['types'] as $type ) {
369                                         if ( !$this->get( $type )->isEmpty() ) {
370                                                 $types[] = $type;
371                                         }
372                                 }
373                         }
374                 }
375
376                 return $types;
377         }
378
379         /**
380          * Get the size of the queus for a list of job types
381          *
382          * @return array Map of (job type => size)
383          */
384         public function getQueueSizes() {
385                 $sizeMap = [];
386                 foreach ( $this->getCoalescedQueues() as $info ) {
387                         $sizes = $info['queue']->getSiblingQueueSizes( $this->getQueueTypes() );
388                         if ( is_array( $sizes ) ) { // batching features supported
389                                 $sizeMap = $sizeMap + $sizes;
390                         } else { // we have to go through the queues in the bucket one-by-one
391                                 foreach ( $info['types'] as $type ) {
392                                         $sizeMap[$type] = $this->get( $type )->getSize();
393                                 }
394                         }
395                 }
396
397                 return $sizeMap;
398         }
399
400         /**
401          * @return array
402          */
403         protected function getCoalescedQueues() {
404                 global $wgJobTypeConf;
405
406                 if ( $this->coalescedQueues === null ) {
407                         $this->coalescedQueues = [];
408                         foreach ( $wgJobTypeConf as $type => $conf ) {
409                                 $queue = JobQueue::factory(
410                                         [ 'wiki' => $this->wiki, 'type' => 'null' ] + $conf );
411                                 $loc = $queue->getCoalesceLocationInternal();
412                                 if ( !isset( $this->coalescedQueues[$loc] ) ) {
413                                         $this->coalescedQueues[$loc]['queue'] = $queue;
414                                         $this->coalescedQueues[$loc]['types'] = [];
415                                 }
416                                 if ( $type === 'default' ) {
417                                         $this->coalescedQueues[$loc]['types'] = array_merge(
418                                                 $this->coalescedQueues[$loc]['types'],
419                                                 array_diff( $this->getQueueTypes(), array_keys( $wgJobTypeConf ) )
420                                         );
421                                 } else {
422                                         $this->coalescedQueues[$loc]['types'][] = $type;
423                                 }
424                         }
425                 }
426
427                 return $this->coalescedQueues;
428         }
429
430         /**
431          * @param string $name
432          * @return mixed
433          */
434         private function getCachedConfigVar( $name ) {
435                 // @TODO: cleanup this whole method with a proper config system
436                 if ( $this->wiki === wfWikiID() ) {
437                         return $GLOBALS[$name]; // common case
438                 } else {
439                         $wiki = $this->wiki;
440                         $cache = ObjectCache::getMainWANInstance();
441                         $value = $cache->getWithSetCallback(
442                                 $cache->makeGlobalKey( 'jobqueue', 'configvalue', $wiki, $name ),
443                                 $cache::TTL_DAY + mt_rand( 0, $cache::TTL_DAY ),
444                                 function () use ( $wiki, $name ) {
445                                         global $wgConf;
446
447                                         return [ 'v' => $wgConf->getConfig( $wiki, $name ) ];
448                                 },
449                                 [ 'pcTTL' => WANObjectCache::TTL_PROC_LONG ]
450                         );
451
452                         return $value['v'];
453                 }
454         }
455
456         /**
457          * @param array $jobs
458          * @throws InvalidArgumentException
459          */
460         private function assertValidJobs( array $jobs ) {
461                 foreach ( $jobs as $job ) { // sanity checks
462                         if ( !( $job instanceof IJobSpecification ) ) {
463                                 throw new InvalidArgumentException( "Expected IJobSpecification objects" );
464                         }
465                 }
466         }
467
468         function __destruct() {
469                 $n = count( $this->bufferedJobs );
470                 if ( $n > 0 ) {
471                         $type = implode( ', ', array_unique( array_map( 'get_class', $this->bufferedJobs ) ) );
472                         trigger_error( __METHOD__ . ": $n buffered job(s) of type(s) $type never inserted." );
473                 }
474         }
475 }