]> scripts.mit.edu Git - autoinstalls/mediawiki.git/blob - includes/job/JobQueue.php
MediaWiki 1.17.0
[autoinstalls/mediawiki.git] / includes / job / JobQueue.php
1 <?php
2 /**
3  * Job queue base code
4  *
5  * @file
6  * @defgroup JobQueue JobQueue
7  */
8
9 if ( !defined( 'MEDIAWIKI' ) ) {
10         die( "This file is part of MediaWiki, it is not a valid entry point\n" );
11 }
12
13 /**
14  * Class to both describe a background job and handle jobs.
15  *
16  * @ingroup JobQueue
17  */
18 abstract class Job {
19         var $command,
20                 $title,
21                 $params,
22                 $id,
23                 $removeDuplicates,
24                 $error;
25
26         /*-------------------------------------------------------------------------
27          * Abstract functions
28          *------------------------------------------------------------------------*/
29
30         /**
31          * Run the job
32          * @return boolean success
33          */
34         abstract function run();
35
36         /*-------------------------------------------------------------------------
37          * Static functions
38          *------------------------------------------------------------------------*/
39
40         /**
41          * Pop a job of a certain type.  This tries less hard than pop() to
42          * actually find a job; it may be adversely affected by concurrent job
43          * runners.
44          */
45         static function pop_type( $type ) {
46                 wfProfilein( __METHOD__ );
47
48                 $dbw = wfGetDB( DB_MASTER );
49
50                 $row = $dbw->selectRow(
51                         'job',
52                         '*',
53                         array( 'job_cmd' => $type ),
54                         __METHOD__,
55                         array( 'LIMIT' => 1 )
56                 );
57
58                 if ( $row === false ) {
59                         wfProfileOut( __METHOD__ );
60                         return false;
61                 }
62
63                 /* Ensure we "own" this row */
64                 $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
65                 $affected = $dbw->affectedRows();
66
67                 if ( $affected == 0 ) {
68                         wfProfileOut( __METHOD__ );
69                         return false;
70                 }
71
72                 $namespace = $row->job_namespace;
73                 $dbkey = $row->job_title;
74                 $title = Title::makeTitleSafe( $namespace, $dbkey );
75                 $job = Job::factory( $row->job_cmd, $title, Job::extractBlob( $row->job_params ),
76                         $row->job_id );
77
78                 $dbw->delete( 'job', $job->insertFields(), __METHOD__ );
79                 $dbw->commit();
80
81                 wfProfileOut( __METHOD__ );
82                 return $job;
83         }
84
85         /**
86          * Pop a job off the front of the queue
87          *
88          * @param $offset Integer: Number of jobs to skip
89          * @return Job or false if there's no jobs
90          */
91         static function pop( $offset = 0 ) {
92                 wfProfileIn( __METHOD__ );
93
94                 $dbr = wfGetDB( DB_SLAVE );
95
96                 /* Get a job from the slave, start with an offset,
97                         scan full set afterwards, avoid hitting purged rows
98
99                         NB: If random fetch previously was used, offset
100                                 will always be ahead of few entries
101                 */
102
103                 $row = $dbr->selectRow( 'job', '*', "job_id >= ${offset}", __METHOD__,
104                         array( 'ORDER BY' => 'job_id', 'LIMIT' => 1 ) );
105
106                 // Refetching without offset is needed as some of job IDs could have had delayed commits
107                 // and have lower IDs than jobs already executed, blame concurrency :)
108                 //
109                 if ( $row === false ) {
110                         if ( $offset != 0 ) {
111                                 $row = $dbr->selectRow( 'job', '*', '', __METHOD__,
112                                         array( 'ORDER BY' => 'job_id', 'LIMIT' => 1 ) );
113                         }
114
115                         if ( $row === false ) {
116                                 wfProfileOut( __METHOD__ );
117                                 return false;
118                         }
119                 }
120
121                 // Try to delete it from the master
122                 $dbw = wfGetDB( DB_MASTER );
123                 $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
124                 $affected = $dbw->affectedRows();
125                 $dbw->commit();
126
127                 if ( !$affected ) {
128                         // Failed, someone else beat us to it
129                         // Try getting a random row
130                         $row = $dbw->selectRow( 'job', array( 'MIN(job_id) as minjob',
131                                 'MAX(job_id) as maxjob' ), '1=1', __METHOD__ );
132                         if ( $row === false || is_null( $row->minjob ) || is_null( $row->maxjob ) ) {
133                                 // No jobs to get
134                                 wfProfileOut( __METHOD__ );
135                                 return false;
136                         }
137                         // Get the random row
138                         $row = $dbw->selectRow( 'job', '*',
139                                 'job_id >= ' . mt_rand( $row->minjob, $row->maxjob ), __METHOD__ );
140                         if ( $row === false ) {
141                                 // Random job gone before we got the chance to select it
142                                 // Give up
143                                 wfProfileOut( __METHOD__ );
144                                 return false;
145                         }
146                         // Delete the random row
147                         $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
148                         $affected = $dbw->affectedRows();
149                         $dbw->commit();
150
151                         if ( !$affected ) {
152                                 // Random job gone before we exclusively deleted it
153                                 // Give up
154                                 wfProfileOut( __METHOD__ );
155                                 return false;
156                         }
157                 }
158
159                 // If execution got to here, there's a row in $row that has been deleted from the database
160                 // by this thread. Hence the concurrent pop was successful.
161                 $namespace = $row->job_namespace;
162                 $dbkey = $row->job_title;
163                 $title = Title::makeTitleSafe( $namespace, $dbkey );
164                 $job = Job::factory( $row->job_cmd, $title, Job::extractBlob( $row->job_params ), $row->job_id );
165
166                 // Remove any duplicates it may have later in the queue
167                 // Deadlock prone section
168                 $dbw->begin();
169                 $dbw->delete( 'job', $job->insertFields(), __METHOD__ );
170                 $dbw->commit();
171
172                 wfProfileOut( __METHOD__ );
173                 return $job;
174         }
175
176         /**
177          * Create the appropriate object to handle a specific job
178          *
179          * @param $command String: Job command
180          * @param $title Title: Associated title
181          * @param $params Array: Job parameters
182          * @param $id Int: Job identifier
183          * @return Job
184          */
185         static function factory( $command, $title, $params = false, $id = 0 ) {
186                 global $wgJobClasses;
187                 if( isset( $wgJobClasses[$command] ) ) {
188                         $class = $wgJobClasses[$command];
189                         return new $class( $title, $params, $id );
190                 }
191                 throw new MWException( "Invalid job command `{$command}`" );
192         }
193
194         static function makeBlob( $params ) {
195                 if ( $params !== false ) {
196                         return serialize( $params );
197                 } else {
198                         return '';
199                 }
200         }
201
202         static function extractBlob( $blob ) {
203                 if ( (string)$blob !== '' ) {
204                         return unserialize( $blob );
205                 } else {
206                         return false;
207                 }
208         }
209
210         /**
211          * Batch-insert a group of jobs into the queue.
212          * This will be wrapped in a transaction with a forced commit.
213          *
214          * This may add duplicate at insert time, but they will be
215          * removed later on, when the first one is popped.
216          *
217          * @param $jobs array of Job objects
218          */
219         static function batchInsert( $jobs ) {
220                 if ( !count( $jobs ) ) {
221                         return;
222                 }
223                 $dbw = wfGetDB( DB_MASTER );
224                 $rows = array();
225                 foreach ( $jobs as $job ) {
226                         $rows[] = $job->insertFields();
227                         if ( count( $rows ) >= 50 ) {
228                                 # Do a small transaction to avoid slave lag
229                                 $dbw->begin();
230                                 $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' );
231                                 $dbw->commit();
232                                 $rows = array();
233                         }
234                 }
235                 if ( $rows ) { // last chunk
236                         $dbw->begin();
237                         $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' );
238                         $dbw->commit();
239                 }
240                 wfIncrStats( 'job-insert', count( $jobs ) );
241         }
242
243         /**
244          * Insert a group of jobs into the queue.
245          *
246          * Same as batchInsert() but does not commit and can thus
247          * be rolled-back as part of a larger transaction. However,
248          * large batches of jobs can cause slave lag.
249          *
250          * @param $jobs array of Job objects
251          */
252         static function safeBatchInsert( $jobs ) {
253                 if ( !count( $jobs ) ) {
254                         return;
255                 }
256                 $dbw = wfGetDB( DB_MASTER );
257                 $rows = array();
258                 foreach ( $jobs as $job ) {
259                         $rows[] = $job->insertFields();
260                         if ( count( $rows ) >= 500 ) {
261                                 $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' );
262                                 $rows = array();
263                         }
264                 }
265                 if ( $rows ) { // last chunk
266                         $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' );
267                 }
268                 wfIncrStats( 'job-insert', count( $jobs ) );
269         }
270
271         /*-------------------------------------------------------------------------
272          * Non-static functions
273          *------------------------------------------------------------------------*/
274
275         function __construct( $command, $title, $params = false, $id = 0 ) {
276                 $this->command = $command;
277                 $this->title = $title;
278                 $this->params = $params;
279                 $this->id = $id;
280
281                 // A bit of premature generalisation
282                 // Oh well, the whole class is premature generalisation really
283                 $this->removeDuplicates = true;
284         }
285
286         /**
287          * Insert a single job into the queue.
288          * @return bool true on success
289          */
290         function insert() {
291                 $fields = $this->insertFields();
292
293                 $dbw = wfGetDB( DB_MASTER );
294
295                 if ( $this->removeDuplicates ) {
296                         $res = $dbw->select( 'job', array( '1' ), $fields, __METHOD__ );
297                         if ( $dbw->numRows( $res ) ) {
298                                 return;
299                         }
300                 }
301                 return $dbw->insert( 'job', $fields, __METHOD__ );
302         }
303
304         protected function insertFields() {
305                 $dbw = wfGetDB( DB_MASTER );
306                 return array(
307                         'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ),
308                         'job_cmd' => $this->command,
309                         'job_namespace' => $this->title->getNamespace(),
310                         'job_title' => $this->title->getDBkey(),
311                         'job_params' => Job::makeBlob( $this->params )
312                 );
313         }
314
315         function toString() {
316                 $paramString = '';
317                 if ( $this->params ) {
318                         foreach ( $this->params as $key => $value ) {
319                                 if ( $paramString != '' ) {
320                                         $paramString .= ' ';
321                                 }
322                                 $paramString .= "$key=$value";
323                         }
324                 }
325
326                 if ( is_object( $this->title ) ) {
327                         $s = "{$this->command} " . $this->title->getPrefixedDBkey();
328                         if ( $paramString !== '' ) {
329                                 $s .= ' ' . $paramString;
330                         }
331                         return $s;
332                 } else {
333                         return "{$this->command} $paramString";
334                 }
335         }
336
337         protected function setLastError( $error ) {
338                 $this->error = $error;
339         }
340
341         function getLastError() {
342                 return $this->error;
343         }
344 }