]> scripts.mit.edu Git - autoinstalls/mediawiki.git/blob - includes/jobqueue/JobQueue.php
MediaWiki 1.30.2-scripts2
[autoinstalls/mediawiki.git] / includes / jobqueue / JobQueue.php
1 <?php
2 /**
3  * Job queue base code.
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 2 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18  * http://www.gnu.org/copyleft/gpl.html
19  *
20  * @file
21  * @defgroup JobQueue JobQueue
22  */
23 use MediaWiki\MediaWikiServices;
24
25 /**
26  * Class to handle enqueueing and running of background jobs
27  *
28  * @ingroup JobQueue
29  * @since 1.21
30  */
31 abstract class JobQueue {
32         /** @var string Wiki ID */
33         protected $wiki;
34         /** @var string Job type */
35         protected $type;
36         /** @var string Job priority for pop() */
37         protected $order;
38         /** @var int Time to live in seconds */
39         protected $claimTTL;
40         /** @var int Maximum number of times to try a job */
41         protected $maxTries;
42         /** @var string|bool Read only rationale (or false if r/w) */
43         protected $readOnlyReason;
44
45         /** @var BagOStuff */
46         protected $dupCache;
47         /** @var JobQueueAggregator */
48         protected $aggr;
49
50         const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions
51
52         const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days)
53
54         /**
55          * @param array $params
56          * @throws MWException
57          */
58         protected function __construct( array $params ) {
59                 $this->wiki = $params['wiki'];
60                 $this->type = $params['type'];
61                 $this->claimTTL = isset( $params['claimTTL'] ) ? $params['claimTTL'] : 0;
62                 $this->maxTries = isset( $params['maxTries'] ) ? $params['maxTries'] : 3;
63                 if ( isset( $params['order'] ) && $params['order'] !== 'any' ) {
64                         $this->order = $params['order'];
65                 } else {
66                         $this->order = $this->optimalOrder();
67                 }
68                 if ( !in_array( $this->order, $this->supportedOrders() ) ) {
69                         throw new MWException( __CLASS__ . " does not support '{$this->order}' order." );
70                 }
71                 $this->dupCache = wfGetCache( CACHE_ANYTHING );
72                 $this->aggr = isset( $params['aggregator'] )
73                         ? $params['aggregator']
74                         : new JobQueueAggregatorNull( [] );
75                 $this->readOnlyReason = isset( $params['readOnlyReason'] )
76                         ? $params['readOnlyReason']
77                         : false;
78         }
79
80         /**
81          * Get a job queue object of the specified type.
82          * $params includes:
83          *   - class      : What job class to use (determines job type)
84          *   - wiki       : wiki ID of the wiki the jobs are for (defaults to current wiki)
85          *   - type       : The name of the job types this queue handles
86          *   - order      : Order that pop() selects jobs, one of "fifo", "timestamp" or "random".
87          *                  If "fifo" is used, the queue will effectively be FIFO. Note that job
88          *                  completion will not appear to be exactly FIFO if there are multiple
89          *                  job runners since jobs can take different times to finish once popped.
90          *                  If "timestamp" is used, the queue will at least be loosely ordered
91          *                  by timestamp, allowing for some jobs to be popped off out of order.
92          *                  If "random" is used, pop() will pick jobs in random order.
93          *                  Note that it may only be weakly random (e.g. a lottery of the oldest X).
94          *                  If "any" is choosen, the queue will use whatever order is the fastest.
95          *                  This might be useful for improving concurrency for job acquisition.
96          *   - claimTTL   : If supported, the queue will recycle jobs that have been popped
97          *                  but not acknowledged as completed after this many seconds. Recycling
98          *                  of jobs simply means re-inserting them into the queue. Jobs can be
99          *                  attempted up to three times before being discarded.
100          *   - readOnlyReason : Set this to a string to make the queue read-only.
101          *
102          * Queue classes should throw an exception if they do not support the options given.
103          *
104          * @param array $params
105          * @return JobQueue
106          * @throws MWException
107          */
108         final public static function factory( array $params ) {
109                 $class = $params['class'];
110                 if ( !class_exists( $class ) ) {
111                         throw new MWException( "Invalid job queue class '$class'." );
112                 }
113                 $obj = new $class( $params );
114                 if ( !( $obj instanceof self ) ) {
115                         throw new MWException( "Class '$class' is not a " . __CLASS__ . " class." );
116                 }
117
118                 return $obj;
119         }
120
121         /**
122          * @return string Wiki ID
123          */
124         final public function getWiki() {
125                 return $this->wiki;
126         }
127
128         /**
129          * @return string Job type that this queue handles
130          */
131         final public function getType() {
132                 return $this->type;
133         }
134
135         /**
136          * @return string One of (random, timestamp, fifo, undefined)
137          */
138         final public function getOrder() {
139                 return $this->order;
140         }
141
142         /**
143          * Get the allowed queue orders for configuration validation
144          *
145          * @return array Subset of (random, timestamp, fifo, undefined)
146          */
147         abstract protected function supportedOrders();
148
149         /**
150          * Get the default queue order to use if configuration does not specify one
151          *
152          * @return string One of (random, timestamp, fifo, undefined)
153          */
154         abstract protected function optimalOrder();
155
156         /**
157          * Find out if delayed jobs are supported for configuration validation
158          *
159          * @return bool Whether delayed jobs are supported
160          */
161         protected function supportsDelayedJobs() {
162                 return false; // not implemented
163         }
164
165         /**
166          * @return bool Whether delayed jobs are enabled
167          * @since 1.22
168          */
169         final public function delayedJobsEnabled() {
170                 return $this->supportsDelayedJobs();
171         }
172
173         /**
174          * @return string|bool Read-only rational or false if r/w
175          * @since 1.27
176          */
177         public function getReadOnlyReason() {
178                 return $this->readOnlyReason;
179         }
180
181         /**
182          * Quickly check if the queue has no available (unacquired, non-delayed) jobs.
183          * Queue classes should use caching if they are any slower without memcached.
184          *
185          * If caching is used, this might return false when there are actually no jobs.
186          * If pop() is called and returns false then it should correct the cache. Also,
187          * calling flushCaches() first prevents this. However, this affect is typically
188          * not distinguishable from the race condition between isEmpty() and pop().
189          *
190          * @return bool
191          * @throws JobQueueError
192          */
193         final public function isEmpty() {
194                 $res = $this->doIsEmpty();
195
196                 return $res;
197         }
198
199         /**
200          * @see JobQueue::isEmpty()
201          * @return bool
202          */
203         abstract protected function doIsEmpty();
204
205         /**
206          * Get the number of available (unacquired, non-delayed) jobs in the queue.
207          * Queue classes should use caching if they are any slower without memcached.
208          *
209          * If caching is used, this number might be out of date for a minute.
210          *
211          * @return int
212          * @throws JobQueueError
213          */
214         final public function getSize() {
215                 $res = $this->doGetSize();
216
217                 return $res;
218         }
219
220         /**
221          * @see JobQueue::getSize()
222          * @return int
223          */
224         abstract protected function doGetSize();
225
226         /**
227          * Get the number of acquired jobs (these are temporarily out of the queue).
228          * Queue classes should use caching if they are any slower without memcached.
229          *
230          * If caching is used, this number might be out of date for a minute.
231          *
232          * @return int
233          * @throws JobQueueError
234          */
235         final public function getAcquiredCount() {
236                 $res = $this->doGetAcquiredCount();
237
238                 return $res;
239         }
240
241         /**
242          * @see JobQueue::getAcquiredCount()
243          * @return int
244          */
245         abstract protected function doGetAcquiredCount();
246
247         /**
248          * Get the number of delayed jobs (these are temporarily out of the queue).
249          * Queue classes should use caching if they are any slower without memcached.
250          *
251          * If caching is used, this number might be out of date for a minute.
252          *
253          * @return int
254          * @throws JobQueueError
255          * @since 1.22
256          */
257         final public function getDelayedCount() {
258                 $res = $this->doGetDelayedCount();
259
260                 return $res;
261         }
262
263         /**
264          * @see JobQueue::getDelayedCount()
265          * @return int
266          */
267         protected function doGetDelayedCount() {
268                 return 0; // not implemented
269         }
270
271         /**
272          * Get the number of acquired jobs that can no longer be attempted.
273          * Queue classes should use caching if they are any slower without memcached.
274          *
275          * If caching is used, this number might be out of date for a minute.
276          *
277          * @return int
278          * @throws JobQueueError
279          */
280         final public function getAbandonedCount() {
281                 $res = $this->doGetAbandonedCount();
282
283                 return $res;
284         }
285
286         /**
287          * @see JobQueue::getAbandonedCount()
288          * @return int
289          */
290         protected function doGetAbandonedCount() {
291                 return 0; // not implemented
292         }
293
294         /**
295          * Push one or more jobs into the queue.
296          * This does not require $wgJobClasses to be set for the given job type.
297          * Outside callers should use JobQueueGroup::push() instead of this function.
298          *
299          * @param IJobSpecification|IJobSpecification[] $jobs
300          * @param int $flags Bitfield (supports JobQueue::QOS_ATOMIC)
301          * @return void
302          * @throws JobQueueError
303          */
304         final public function push( $jobs, $flags = 0 ) {
305                 $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
306                 $this->batchPush( $jobs, $flags );
307         }
308
309         /**
310          * Push a batch of jobs into the queue.
311          * This does not require $wgJobClasses to be set for the given job type.
312          * Outside callers should use JobQueueGroup::push() instead of this function.
313          *
314          * @param IJobSpecification[] $jobs
315          * @param int $flags Bitfield (supports JobQueue::QOS_ATOMIC)
316          * @return void
317          * @throws MWException
318          */
319         final public function batchPush( array $jobs, $flags = 0 ) {
320                 $this->assertNotReadOnly();
321
322                 if ( !count( $jobs ) ) {
323                         return; // nothing to do
324                 }
325
326                 foreach ( $jobs as $job ) {
327                         if ( $job->getType() !== $this->type ) {
328                                 throw new MWException(
329                                         "Got '{$job->getType()}' job; expected a '{$this->type}' job." );
330                         } elseif ( $job->getReleaseTimestamp() && !$this->supportsDelayedJobs() ) {
331                                 throw new MWException(
332                                         "Got delayed '{$job->getType()}' job; delays are not supported." );
333                         }
334                 }
335
336                 $this->doBatchPush( $jobs, $flags );
337                 $this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );
338
339                 foreach ( $jobs as $job ) {
340                         if ( $job->isRootJob() ) {
341                                 $this->deduplicateRootJob( $job );
342                         }
343                 }
344         }
345
346         /**
347          * @see JobQueue::batchPush()
348          * @param IJobSpecification[] $jobs
349          * @param int $flags
350          */
351         abstract protected function doBatchPush( array $jobs, $flags );
352
353         /**
354          * Pop a job off of the queue.
355          * This requires $wgJobClasses to be set for the given job type.
356          * Outside callers should use JobQueueGroup::pop() instead of this function.
357          *
358          * @throws MWException
359          * @return Job|bool Returns false if there are no jobs
360          */
361         final public function pop() {
362                 global $wgJobClasses;
363
364                 $this->assertNotReadOnly();
365                 if ( $this->wiki !== wfWikiID() ) {
366                         throw new MWException( "Cannot pop '{$this->type}' job off foreign wiki queue." );
367                 } elseif ( !isset( $wgJobClasses[$this->type] ) ) {
368                         // Do not pop jobs if there is no class for the queue type
369                         throw new MWException( "Unrecognized job type '{$this->type}'." );
370                 }
371
372                 $job = $this->doPop();
373
374                 if ( !$job ) {
375                         $this->aggr->notifyQueueEmpty( $this->wiki, $this->type );
376                 }
377
378                 // Flag this job as an old duplicate based on its "root" job...
379                 try {
380                         if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
381                                 self::incrStats( 'dupe_pops', $this->type );
382                                 $job = DuplicateJob::newFromJob( $job ); // convert to a no-op
383                         }
384                 } catch ( Exception $e ) {
385                         // don't lose jobs over this
386                 }
387
388                 return $job;
389         }
390
391         /**
392          * @see JobQueue::pop()
393          * @return Job|bool
394          */
395         abstract protected function doPop();
396
397         /**
398          * Acknowledge that a job was completed.
399          *
400          * This does nothing for certain queue classes or if "claimTTL" is not set.
401          * Outside callers should use JobQueueGroup::ack() instead of this function.
402          *
403          * @param Job $job
404          * @return void
405          * @throws MWException
406          */
407         final public function ack( Job $job ) {
408                 $this->assertNotReadOnly();
409                 if ( $job->getType() !== $this->type ) {
410                         throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
411                 }
412
413                 $this->doAck( $job );
414         }
415
416         /**
417          * @see JobQueue::ack()
418          * @param Job $job
419          */
420         abstract protected function doAck( Job $job );
421
422         /**
423          * Register the "root job" of a given job into the queue for de-duplication.
424          * This should only be called right *after* all the new jobs have been inserted.
425          * This is used to turn older, duplicate, job entries into no-ops. The root job
426          * information will remain in the registry until it simply falls out of cache.
427          *
428          * This requires that $job has two special fields in the "params" array:
429          *   - rootJobSignature : hash (e.g. SHA1) that identifies the task
430          *   - rootJobTimestamp : TS_MW timestamp of this instance of the task
431          *
432          * A "root job" is a conceptual job that consist of potentially many smaller jobs
433          * that are actually inserted into the queue. For example, "refreshLinks" jobs are
434          * spawned when a template is edited. One can think of the task as "update links
435          * of pages that use template X" and an instance of that task as a "root job".
436          * However, what actually goes into the queue are range and leaf job subtypes.
437          * Since these jobs include things like page ID ranges and DB master positions,
438          * and can morph into smaller jobs recursively, simple duplicate detection
439          * for individual jobs being identical (like that of job_sha1) is not useful.
440          *
441          * In the case of "refreshLinks", if these jobs are still in the queue when the template
442          * is edited again, we want all of these old refreshLinks jobs for that template to become
443          * no-ops. This can greatly reduce server load, since refreshLinks jobs involves parsing.
444          * Essentially, the new batch of jobs belong to a new "root job" and the older ones to a
445          * previous "root job" for the same task of "update links of pages that use template X".
446          *
447          * This does nothing for certain queue classes.
448          *
449          * @param IJobSpecification $job
450          * @throws MWException
451          * @return bool
452          */
453         final public function deduplicateRootJob( IJobSpecification $job ) {
454                 $this->assertNotReadOnly();
455                 if ( $job->getType() !== $this->type ) {
456                         throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
457                 }
458
459                 return $this->doDeduplicateRootJob( $job );
460         }
461
462         /**
463          * @see JobQueue::deduplicateRootJob()
464          * @param IJobSpecification $job
465          * @throws MWException
466          * @return bool
467          */
468         protected function doDeduplicateRootJob( IJobSpecification $job ) {
469                 if ( !$job->hasRootJobParams() ) {
470                         throw new MWException( "Cannot register root job; missing parameters." );
471                 }
472                 $params = $job->getRootJobParams();
473
474                 $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
475                 // Callers should call batchInsert() and then this function so that if the insert
476                 // fails, the de-duplication registration will be aborted. Since the insert is
477                 // deferred till "transaction idle", do the same here, so that the ordering is
478                 // maintained. Having only the de-duplication registration succeed would cause
479                 // jobs to become no-ops without any actual jobs that made them redundant.
480                 $timestamp = $this->dupCache->get( $key ); // current last timestamp of this job
481                 if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
482                         return true; // a newer version of this root job was enqueued
483                 }
484
485                 // Update the timestamp of the last root job started at the location...
486                 return $this->dupCache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
487         }
488
489         /**
490          * Check if the "root" job of a given job has been superseded by a newer one
491          *
492          * @param Job $job
493          * @throws MWException
494          * @return bool
495          */
496         final protected function isRootJobOldDuplicate( Job $job ) {
497                 if ( $job->getType() !== $this->type ) {
498                         throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
499                 }
500                 $isDuplicate = $this->doIsRootJobOldDuplicate( $job );
501
502                 return $isDuplicate;
503         }
504
505         /**
506          * @see JobQueue::isRootJobOldDuplicate()
507          * @param Job $job
508          * @return bool
509          */
510         protected function doIsRootJobOldDuplicate( Job $job ) {
511                 if ( !$job->hasRootJobParams() ) {
512                         return false; // job has no de-deplication info
513                 }
514                 $params = $job->getRootJobParams();
515
516                 $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
517                 // Get the last time this root job was enqueued
518                 $timestamp = $this->dupCache->get( $key );
519
520                 // Check if a new root job was started at the location after this one's...
521                 return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
522         }
523
524         /**
525          * @param string $signature Hash identifier of the root job
526          * @return string
527          */
528         protected function getRootJobCacheKey( $signature ) {
529                 list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
530
531                 return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature );
532         }
533
534         /**
535          * Deleted all unclaimed and delayed jobs from the queue
536          *
537          * @throws JobQueueError
538          * @since 1.22
539          * @return void
540          */
541         final public function delete() {
542                 $this->assertNotReadOnly();
543
544                 $this->doDelete();
545         }
546
547         /**
548          * @see JobQueue::delete()
549          * @throws MWException
550          */
551         protected function doDelete() {
552                 throw new MWException( "This method is not implemented." );
553         }
554
555         /**
556          * Wait for any replica DBs or backup servers to catch up.
557          *
558          * This does nothing for certain queue classes.
559          *
560          * @return void
561          * @throws JobQueueError
562          */
563         final public function waitForBackups() {
564                 $this->doWaitForBackups();
565         }
566
567         /**
568          * @see JobQueue::waitForBackups()
569          * @return void
570          */
571         protected function doWaitForBackups() {
572         }
573
574         /**
575          * Clear any process and persistent caches
576          *
577          * @return void
578          */
579         final public function flushCaches() {
580                 $this->doFlushCaches();
581         }
582
583         /**
584          * @see JobQueue::flushCaches()
585          * @return void
586          */
587         protected function doFlushCaches() {
588         }
589
590         /**
591          * Get an iterator to traverse over all available jobs in this queue.
592          * This does not include jobs that are currently acquired or delayed.
593          * Note: results may be stale if the queue is concurrently modified.
594          *
595          * @return Iterator
596          * @throws JobQueueError
597          */
598         abstract public function getAllQueuedJobs();
599
600         /**
601          * Get an iterator to traverse over all delayed jobs in this queue.
602          * Note: results may be stale if the queue is concurrently modified.
603          *
604          * @return Iterator
605          * @throws JobQueueError
606          * @since 1.22
607          */
608         public function getAllDelayedJobs() {
609                 return new ArrayIterator( [] ); // not implemented
610         }
611
612         /**
613          * Get an iterator to traverse over all claimed jobs in this queue
614          *
615          * Callers should be quick to iterator over it or few results
616          * will be returned due to jobs being acknowledged and deleted
617          *
618          * @return Iterator
619          * @throws JobQueueError
620          * @since 1.26
621          */
622         public function getAllAcquiredJobs() {
623                 return new ArrayIterator( [] ); // not implemented
624         }
625
626         /**
627          * Get an iterator to traverse over all abandoned jobs in this queue
628          *
629          * @return Iterator
630          * @throws JobQueueError
631          * @since 1.25
632          */
633         public function getAllAbandonedJobs() {
634                 return new ArrayIterator( [] ); // not implemented
635         }
636
637         /**
638          * Do not use this function outside of JobQueue/JobQueueGroup
639          *
640          * @return string
641          * @since 1.22
642          */
643         public function getCoalesceLocationInternal() {
644                 return null;
645         }
646
647         /**
648          * Check whether each of the given queues are empty.
649          * This is used for batching checks for queues stored at the same place.
650          *
651          * @param array $types List of queues types
652          * @return array|null (list of non-empty queue types) or null if unsupported
653          * @throws MWException
654          * @since 1.22
655          */
656         final public function getSiblingQueuesWithJobs( array $types ) {
657                 return $this->doGetSiblingQueuesWithJobs( $types );
658         }
659
660         /**
661          * @see JobQueue::getSiblingQueuesWithJobs()
662          * @param array $types List of queues types
663          * @return array|null (list of queue types) or null if unsupported
664          */
665         protected function doGetSiblingQueuesWithJobs( array $types ) {
666                 return null; // not supported
667         }
668
669         /**
670          * Check the size of each of the given queues.
671          * For queues not served by the same store as this one, 0 is returned.
672          * This is used for batching checks for queues stored at the same place.
673          *
674          * @param array $types List of queues types
675          * @return array|null (job type => whether queue is empty) or null if unsupported
676          * @throws MWException
677          * @since 1.22
678          */
679         final public function getSiblingQueueSizes( array $types ) {
680                 return $this->doGetSiblingQueueSizes( $types );
681         }
682
683         /**
684          * @see JobQueue::getSiblingQueuesSize()
685          * @param array $types List of queues types
686          * @return array|null (list of queue types) or null if unsupported
687          */
688         protected function doGetSiblingQueueSizes( array $types ) {
689                 return null; // not supported
690         }
691
692         /**
693          * @throws JobQueueReadOnlyError
694          */
695         protected function assertNotReadOnly() {
696                 if ( $this->readOnlyReason !== false ) {
697                         throw new JobQueueReadOnlyError( "Job queue is read-only: {$this->readOnlyReason}" );
698                 }
699         }
700
701         /**
702          * Call wfIncrStats() for the queue overall and for the queue type
703          *
704          * @param string $key Event type
705          * @param string $type Job type
706          * @param int $delta
707          * @since 1.22
708          */
709         public static function incrStats( $key, $type, $delta = 1 ) {
710                 static $stats;
711                 if ( !$stats ) {
712                         $stats = MediaWikiServices::getInstance()->getStatsdDataFactory();
713                 }
714                 $stats->updateCount( "jobqueue.{$key}.all", $delta );
715                 $stats->updateCount( "jobqueue.{$key}.{$type}", $delta );
716         }
717 }
718
719 /**
720  * @ingroup JobQueue
721  * @since 1.22
722  */
723 class JobQueueError extends MWException {
724 }
725
726 class JobQueueConnectionError extends JobQueueError {
727 }
728
729 class JobQueueReadOnlyError extends JobQueueError {
730
731 }