]> scripts.mit.edu Git - autoinstalls/mediawiki.git/blob - includes/JobQueue.php
MediaWiki 1.16.5-scripts
[autoinstalls/mediawiki.git] / includes / JobQueue.php
1 <?php
2 /**
3  * @defgroup JobQueue JobQueue
4  */
5
6 if ( !defined( 'MEDIAWIKI' ) ) {
7         die( "This file is part of MediaWiki, it is not a valid entry point\n" );
8 }
9
10 /**
11  * Class to both describe a background job and handle jobs.
12  *
13  * @ingroup JobQueue
14  */
15 abstract class Job {
16         var $command,
17                 $title,
18                 $params,
19                 $id,
20                 $removeDuplicates,
21                 $error;
22
23         /*-------------------------------------------------------------------------
24          * Abstract functions
25          *------------------------------------------------------------------------*/
26
27         /**
28          * Run the job
29          * @return boolean success
30          */
31         abstract function run();
32
33         /*-------------------------------------------------------------------------
34          * Static functions
35          *------------------------------------------------------------------------*/
36
37         /**
38          * @deprecated use LinksUpdate::queueRecursiveJobs()
39          */
40         /**
41          * static function queueLinksJobs( $titles ) {}
42          */
43
44         /**
45          * Pop a job of a certain type.  This tries less hard than pop() to
46          * actually find a job; it may be adversely affected by concurrent job
47          * runners.
48          */
49         static function pop_type( $type ) {
50                 wfProfilein( __METHOD__ );
51
52                 $dbw = wfGetDB( DB_MASTER );
53
54                 $row = $dbw->selectRow(
55                         'job',
56                         '*',
57                         array( 'job_cmd' => $type ),
58                         __METHOD__,
59                         array( 'LIMIT' => 1 )
60                 );
61
62                 if ( $row === false ) {
63                         wfProfileOut( __METHOD__ );
64                         return false;
65                 }
66
67                 /* Ensure we "own" this row */
68                 $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
69                 $affected = $dbw->affectedRows();
70
71                 if ( $affected == 0 ) {
72                         wfProfileOut( __METHOD__ );
73                         return false;
74                 }
75
76                 $namespace = $row->job_namespace;
77                 $dbkey = $row->job_title;
78                 $title = Title::makeTitleSafe( $namespace, $dbkey );
79                 $job = Job::factory( $row->job_cmd, $title, Job::extractBlob( $row->job_params ), $row->job_id );
80
81                 $dbw->delete( 'job', $job->insertFields(), __METHOD__ );
82                 $dbw->commit();
83
84                 wfProfileOut( __METHOD__ );
85                 return $job;
86         }
87
88         /**
89          * Pop a job off the front of the queue
90          *
91          * @param $offset Integer: Number of jobs to skip
92          * @return Job or false if there's no jobs
93          */
94         static function pop( $offset = 0 ) {
95                 wfProfileIn( __METHOD__ );
96
97                 $dbr = wfGetDB( DB_SLAVE );
98
99                 /* Get a job from the slave, start with an offset,
100                         scan full set afterwards, avoid hitting purged rows
101
102                         NB: If random fetch previously was used, offset
103                                 will always be ahead of few entries
104                 */
105
106                 $row = $dbr->selectRow( 'job', '*', "job_id >= ${offset}", __METHOD__,
107                         array( 'ORDER BY' => 'job_id', 'LIMIT' => 1 ) );
108
109                 // Refetching without offset is needed as some of job IDs could have had delayed commits
110                 // and have lower IDs than jobs already executed, blame concurrency :)
111                 //
112                 if ( $row === false ) {
113                         if ( $offset != 0 ) {
114                                 $row = $dbr->selectRow( 'job', '*', '', __METHOD__,
115                                         array( 'ORDER BY' => 'job_id', 'LIMIT' => 1 ) );
116                         }
117
118                         if ( $row === false ) {
119                                 wfProfileOut( __METHOD__ );
120                                 return false;
121                         }
122                 }
123                 $offset = $row->job_id;
124
125                 // Try to delete it from the master
126                 $dbw = wfGetDB( DB_MASTER );
127                 $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
128                 $affected = $dbw->affectedRows();
129                 $dbw->commit();
130
131                 if ( !$affected ) {
132                         // Failed, someone else beat us to it
133                         // Try getting a random row
134                         $row = $dbw->selectRow( 'job', array( 'MIN(job_id) as minjob',
135                                 'MAX(job_id) as maxjob' ), '1=1', __METHOD__ );
136                         if ( $row === false || is_null( $row->minjob ) || is_null( $row->maxjob ) ) {
137                                 // No jobs to get
138                                 wfProfileOut( __METHOD__ );
139                                 return false;
140                         }
141                         // Get the random row
142                         $row = $dbw->selectRow( 'job', '*',
143                                 'job_id >= ' . mt_rand( $row->minjob, $row->maxjob ), __METHOD__ );
144                         if ( $row === false ) {
145                                 // Random job gone before we got the chance to select it
146                                 // Give up
147                                 wfProfileOut( __METHOD__ );
148                                 return false;
149                         }
150                         // Delete the random row
151                         $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
152                         $affected = $dbw->affectedRows();
153                         $dbw->commit();
154
155                         if ( !$affected ) {
156                                 // Random job gone before we exclusively deleted it
157                                 // Give up
158                                 wfProfileOut( __METHOD__ );
159                                 return false;
160                         }
161                 }
162
163                 // If execution got to here, there's a row in $row that has been deleted from the database
164                 // by this thread. Hence the concurrent pop was successful.
165                 $namespace = $row->job_namespace;
166                 $dbkey = $row->job_title;
167                 $title = Title::makeTitleSafe( $namespace, $dbkey );
168                 $job = Job::factory( $row->job_cmd, $title, Job::extractBlob( $row->job_params ), $row->job_id );
169
170                 // Remove any duplicates it may have later in the queue
171                 // Deadlock prone section
172                 $dbw->begin();
173                 $dbw->delete( 'job', $job->insertFields(), __METHOD__ );
174                 $dbw->commit();
175
176                 wfProfileOut( __METHOD__ );
177                 return $job;
178         }
179
180         /**
181          * Create the appropriate object to handle a specific job
182          *
183          * @param $command String: Job command
184          * @param $title Title: Associated title
185          * @param $params Array: Job parameters
186          * @param $id Int: Job identifier
187          * @return Job
188          */
189         static function factory( $command, $title, $params = false, $id = 0 ) {
190                 global $wgJobClasses;
191                 if( isset( $wgJobClasses[$command] ) ) {
192                         $class = $wgJobClasses[$command];
193                         return new $class( $title, $params, $id );
194                 }
195                 throw new MWException( "Invalid job command `{$command}`" );
196         }
197
198         static function makeBlob( $params ) {
199                 if ( $params !== false ) {
200                         return serialize( $params );
201                 } else {
202                         return '';
203                 }
204         }
205
206         static function extractBlob( $blob ) {
207                 if ( (string)$blob !== '' ) {
208                         return unserialize( $blob );
209                 } else {
210                         return false;
211                 }
212         }
213
214         /**
215          * Batch-insert a group of jobs into the queue.
216          * This will be wrapped in a transaction with a forced commit.
217          *
218          * This may add duplicate at insert time, but they will be
219          * removed later on, when the first one is popped.
220          *
221          * @param $jobs array of Job objects
222          */
223         static function batchInsert( $jobs ) {
224                 if( !count( $jobs ) ) {
225                         return;
226                 }
227                 $dbw = wfGetDB( DB_MASTER );
228                 $rows = array();
229                 foreach( $jobs as $job ) {
230                         $rows[] = $job->insertFields();
231                         if ( count( $rows ) >= 50 ) {
232                                 # Do a small transaction to avoid slave lag
233                                 $dbw->begin();
234                                 $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' );
235                                 $dbw->commit();
236                                 $rows = array();
237                         }
238                 }
239                 if ( $rows ) {
240                         $dbw->begin();
241                         $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' );
242                         $dbw->commit();
243                 }
244         }
245
246         /*-------------------------------------------------------------------------
247          * Non-static functions
248          *------------------------------------------------------------------------*/
249
250         function __construct( $command, $title, $params = false, $id = 0 ) {
251                 $this->command = $command;
252                 $this->title = $title;
253                 $this->params = $params;
254                 $this->id = $id;
255
256                 // A bit of premature generalisation
257                 // Oh well, the whole class is premature generalisation really
258                 $this->removeDuplicates = true;
259         }
260
261         /**
262          * Insert a single job into the queue.
263          */
264         function insert() {
265                 $fields = $this->insertFields();
266
267                 $dbw = wfGetDB( DB_MASTER );
268
269                 if ( $this->removeDuplicates ) {
270                         $res = $dbw->select( 'job', array( '1' ), $fields, __METHOD__ );
271                         if ( $dbw->numRows( $res ) ) {
272                                 return;
273                         }
274                 }
275                 $dbw->insert( 'job', $fields, __METHOD__ );
276         }
277
278         protected function insertFields() {
279                 $dbw = wfGetDB( DB_MASTER );
280                 return array(
281                         'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ),
282                         'job_cmd' => $this->command,
283                         'job_namespace' => $this->title->getNamespace(),
284                         'job_title' => $this->title->getDBkey(),
285                         'job_params' => Job::makeBlob( $this->params )
286                 );
287         }
288
289         function toString() {
290                 $paramString = '';
291                 if ( $this->params ) {
292                         foreach ( $this->params as $key => $value ) {
293                                 if ( $paramString != '' ) {
294                                         $paramString .= ' ';
295                                 }
296                                 $paramString .= "$key=$value";
297                         }
298                 }
299
300                 if ( is_object( $this->title ) ) {
301                         $s = "{$this->command} " . $this->title->getPrefixedDBkey();
302                         if ( $paramString !== '' ) {
303                                 $s .= ' ' . $paramString;
304                         }
305                         return $s;
306                 } else {
307                         return "{$this->command} $paramString";
308                 }
309         }
310
311         protected function setLastError( $error ) {
312                 $this->error = $error;
313         }
314
315         function getLastError() {
316                 return $this->error;
317         }
318 }