]> scripts.mit.edu Git - autoinstalls/mediawiki.git/blob - maintenance/storage/recompressTracked.php
MediaWiki 1.17.0
[autoinstalls/mediawiki.git] / maintenance / storage / recompressTracked.php
1 <?php
2
// Names of the options that take a value. This is assigned before
// commandLine.inc is pulled in - presumably that file consults
// $optionsWithArgs while parsing the command line (TODO confirm).
$optionsWithArgs = RecompressTracked::getOptionsWithArgs();
require dirname( __FILE__ ) . '/../commandLine.inc';

// $args and $options are not defined in this file; they are expected to come
// from commandLine.inc. At least one destination cluster is mandatory.
if ( count( $args ) === 0 ) {
        echo "Usage: php recompressTracked.php [options] <cluster> [... <cluster>...]
Moves blobs indexed by trackBlobs.php to a specified list of destination clusters, and recompresses them in the process. Restartable.

Options:
        --procs <procs>         Set the number of child processes (default 1)
        --copy-only             Copy only, do not update the text table. Restart without this option to complete.
        --debug-log <file>      Log debugging data to the specified file
        --info-log <file>       Log progress messages to the specified file
        --critical-log <file>   Log error messages to the specified file
";
        exit( 1 );
}

// Build the job from the parsed command line and run it (as parent or child,
// depending on the options).
$job = RecompressTracked::newFromCommandLine( $args, $options );
$job->execute();
22
/**
 * Moves the text blobs listed in the blob_tracking table to a list of
 * destination external storage clusters, recompressing them on the way.
 *
 * The same class runs in two roles, selected by the isChild option:
 *  - the parent (executeParent()) walks blob_tracking and writes commands to
 *    the stdin of its child processes;
 *  - each child (executeChild()) reads those commands and does the copying.
 */
class RecompressTracked {
        /** Destination cluster names, handed out in rotation by getTargetCluster() */
        public $destClusters;
        /** Row limit used for each batched SELECT */
        public $batchSize = 1000;
        /** Maximum number of orphan text IDs sent in a single doOrphanList command */
        public $orphanBatchSize = 1000;
        /** Number of report() calls after which a progress line is printed */
        public $reportingInterval = 10;
        /** Number of report() calls since the last progress line */
        public $numBatches = 0;
        /** Number of child processes started by the parent */
        public $numProcs = 1;
        /** Blob class selection, filled in by the constructor */
        public $useDiff, $pageBlobClass, $orphanBlobClass;
        /** Parent-side bookkeeping for the child processes */
        public $slavePipes, $slaveProcs, $prevSlaveId;
        /** If true, blobs are copied but the text table is not updated */
        public $copyOnly = false;
        /** True when this process is a worker started by startSlaveProcs() */
        public $isChild = false;
        /** Index of this worker, or false in the parent */
        public $slaveId = false;
        /** If true, skip the COUNT queries and report the total as '[unknown]' */
        public $noCount = false;
        /** Optional log file paths for debug(), info() and critical() */
        public $debugLog, $infoLog, $criticalLog;
        /** ExternalStoreDB instance shared with CgzCopyTransaction */
        public $store;

        /** Command line options that take a value */
        public static $optionsWithArgs = array( 'procs', 'slave-id', 'debug-log', 'info-log', 'critical-log' );
        /** Maps command line option names to the property they set */
        public static $cmdLineOptionMap = array(
                'no-count' => 'noCount',
                'procs' => 'numProcs',
                'copy-only' => 'copyOnly',
                'child' => 'isChild',
                'slave-id' => 'slaveId',
                'debug-log' => 'debugLog',
                'info-log' => 'infoLog',
                'critical-log' => 'criticalLog',
        );

        /**
         * Get the list of command line options that take a value.
         *
         * @return array
         */
        public static function getOptionsWithArgs() {
                return self::$optionsWithArgs;
        }

        /**
         * Build a job from parsed command line data.
         *
         * @param $args Array: positional arguments, used as the destination clusters
         * @param $options Array: option name => value; names that are not in
         *   $cmdLineOptionMap are ignored
         * @return RecompressTracked
         */
        public static function newFromCommandLine( $args, $options ) {
                $jobOptions = array( 'destClusters' => $args );
                foreach ( self::$cmdLineOptionMap as $cmdOption => $classOption ) {
                        if ( isset( $options[$cmdOption] ) ) {
                                $jobOptions[$classOption] = $options[$cmdOption];
                        }
                }
                return new self( $jobOptions );
        }

        /**
         * @param $options Array: property name => value; every entry is copied
         *   onto the object as-is
         */
        public function __construct( $options ) {
                foreach ( $options as $name => $value ) {
                        $this->$name = $value;
                }
                $this->store = new ExternalStoreDB;
                // Tag debug log lines with the role of this process
                if ( !$this->isChild ) {
                        $GLOBALS['wgDebugLogPrefix'] = "RCT M: ";
                } elseif ( $this->slaveId !== false ) {
                        $GLOBALS['wgDebugLogPrefix'] = "RCT {$this->slaveId}: ";
                }
                // Page history uses diff-based blobs only when the xdiff function is available
                $this->useDiff = function_exists( 'xdiff_string_bdiff' );
                $this->pageBlobClass = $this->useDiff ? 'DiffHistoryBlob' : 'ConcatenatedGzipHistoryBlob';
                $this->orphanBlobClass = 'ConcatenatedGzipHistoryBlob';
        }

        /**
         * Send a message to wfDebug() and, if configured, to the debug log file.
         */
        public function debug( $msg ) {
                wfDebug( "$msg\n" );
                if ( $this->debugLog ) {
                        $this->logToFile( $msg, $this->debugLog );
                }

        }

        /**
         * Print a progress message and, if configured, append it to the info log file.
         */
        public function info( $msg ) {
                echo "$msg\n";
                if ( $this->infoLog ) {
                        $this->logToFile( $msg, $this->infoLog );
                }
        }

        /**
         * Print an error message and, if configured, append it to the critical log file.
         */
        public function critical( $msg ) {
                echo "$msg\n";
                if ( $this->criticalLog ) {
                        $this->logToFile( $msg, $this->criticalLog );
                }
        }

        /**
         * Append a message to a log file, prefixed with a header made of the
         * time, host name, process ID, slave ID (children only) and wiki ID.
         */
        public function logToFile( $msg, $file ) {
                $header = '[' . date( 'd\TH:i:s' ) . '] ' . wfHostname() . ' ' . posix_getpid();
                if ( $this->slaveId !== false ) {
                        $header .= "({$this->slaveId})";
                }
                $header .= ' ' . wfWikiID();
                wfErrorLog( sprintf( "%-50s %s\n", $header, $msg ), $file );
        }

        /**
         * Wait until the selected slave has caught up to the master.
         * This allows us to use the slave for things that were committed in a
         * previous part of this batch process.
         */
        public function syncDBs() {
                $dbw = wfGetDB( DB_MASTER );
                $dbr = wfGetDB( DB_SLAVE );
                $pos = $dbw->getMasterPos();
                $dbr->masterPosWait( $pos, 100000 );
        }

        /**
         * Execute parent or child depending on the isChild option
         */
        public function execute() {
                if ( $this->isChild ) {
                        $this->executeChild();
                } else {
                        $this->executeParent();
                }
        }

        /**
         * Execute the parent process: start the workers, queue all pages and
         * then all orphans, and finally wait for the workers to exit.
         * Does nothing if the tracking table is missing or empty.
         */
        public function executeParent() {
                if ( !$this->checkTrackingTable() ) {
                        return;
                }

                $this->syncDBs();
                $this->startSlaveProcs();
                $this->doAllPages();
                $this->doAllOrphans();
                $this->killSlaveProcs();
        }

        /**
         * Make sure the tracking table exists and isn't empty
         *
         * @return Boolean: false (after logging the reason) if there is nothing to do
         */
        public function checkTrackingTable() {
                $dbr = wfGetDB( DB_SLAVE );
                if ( !$dbr->tableExists( 'blob_tracking' ) ) {
                        $this->critical( "Error: blob_tracking table does not exist" );
                        return false;
                }
                $row = $dbr->selectRow( 'blob_tracking', '*', false, __METHOD__ );
                if ( !$row ) {
                        $this->info( "Warning: blob_tracking table contains no rows, skipping this wiki." );
                        return false;
                }
                return true;
        }

        /**
         * Start the worker processes.
         * These processes will listen on stdin for commands.
         * This is necessary because text recompression is slow: loading,
         * compressing and writing are all slow.
         *
         * The workers run this same file with the parent's options plus
         * --child, --wiki and an individual --slave-id. Exits with status 1 if
         * a worker cannot be started.
         */
        public function startSlaveProcs() {
                $cmd = 'php ' . wfEscapeShellArg( __FILE__ );
                foreach ( self::$cmdLineOptionMap as $cmdOption => $classOption ) {
                        if ( $cmdOption == 'slave-id' ) {
                                // Added per worker below
                                continue;
                        } elseif ( in_array( $cmdOption, self::$optionsWithArgs ) && isset( $this->$classOption ) ) {
                                $cmd .= " --$cmdOption " . wfEscapeShellArg( $this->$classOption );
                        } elseif ( $this->$classOption ) {
                                $cmd .= " --$cmdOption";
                        }
                }
                $cmd .= ' --child' .
                        ' --wiki ' . wfEscapeShellArg( wfWikiID() ) .
                        ' ' . call_user_func_array( 'wfEscapeShellArg', $this->destClusters );

                $this->slavePipes = $this->slaveProcs = array();
                for ( $i = 0; $i < $this->numProcs; $i++ ) {
                        $pipes = false;
                        // Only stdin is a pipe; the worker's stdout and stderr go to ours
                        $spec = array(
                                array( 'pipe', 'r' ),
                                array( 'file', 'php://stdout', 'w' ),
                                array( 'file', 'php://stderr', 'w' )
                        );
                        wfSuppressWarnings();
                        $proc = proc_open( "$cmd --slave-id $i", $spec, $pipes );
                        wfRestoreWarnings();
                        if ( !$proc ) {
                                $this->critical( "Error opening slave process: $cmd" );
                                exit( 1 );
                        }
                        $this->slaveProcs[$i] = $proc;
                        $this->slavePipes[$i] = $pipes[0];
                }
                $this->prevSlaveId = -1;
        }

        /**
         * Gracefully terminate the child processes: send 'quit' to every
         * worker, then wait for each one and log any non-zero exit status.
         */
        public function killSlaveProcs() {
                $this->info( "Waiting for slave processes to finish..." );
                for ( $i = 0; $i < $this->numProcs; $i++ ) {
                        $this->dispatchToSlave( $i, 'quit' );
                }
                for ( $i = 0; $i < $this->numProcs; $i++ ) {
                        $status = proc_close( $this->slaveProcs[$i] );
                        if ( $status ) {
                                $this->critical( "Warning: child #$i exited with status $status" );
                        }
                }
                $this->info( "Done." );
        }

        /**
         * Dispatch a command to the next available slave.
         * This may block until a slave finishes its work and becomes available.
         *
         * All arguments are joined with spaces to form the command line.
         * Exits with status 1 if no pipe becomes writable within an hour.
         */
        public function dispatch( /*...*/ ) {
                $args = func_get_args();
                $pipes = $this->slavePipes;
                // stream_select() takes its stream arrays by reference, so the
                // unused read and except sets must be real variables.
                $read = array();
                $except = array();
                $numPipes = stream_select( $read, $pipes, $except, 3600 );
                if ( !$numPipes ) {
                        $this->critical( "Error waiting to write to slaves. Aborting" );
                        exit( 1 );
                }
                // $pipes now holds only the writable pipes; this loop relies on the
                // surviving entries still being keyed by slave ID. Start after the
                // slave used last time so the work is spread around.
                for ( $i = 0; $i < $this->numProcs; $i++ ) {
                        $slaveId = ( $i + $this->prevSlaveId + 1 ) % $this->numProcs;
                        if ( isset( $pipes[$slaveId] ) ) {
                                $this->prevSlaveId = $slaveId;
                                $this->dispatchToSlave( $slaveId, $args );
                                return;
                        }
                }
                $this->critical( "Unreachable" );
                exit( 1 );
        }

        /**
         * Dispatch a command to a specified slave
         *
         * @param $slaveId Integer: index into $this->slavePipes
         * @param $args String|Array: command words; joined with spaces and
         *   terminated with a newline
         */
        public function dispatchToSlave( $slaveId, $args ) {
                $args = (array)$args;
                $cmd = implode( ' ',  $args );
                fwrite( $this->slavePipes[$slaveId], "$cmd\n" );
        }

        /**
         * Move all tracked pages to the new clusters
         *
         * Queues one doPage command per distinct unmoved bt_page, reading the
         * page IDs in batches of $this->batchSize.
         */
        public function doAllPages() {
                $dbr = wfGetDB( DB_SLAVE );
                $i = 0;
                $startId = 0;
                if ( $this->noCount ) {
                        $numPages = '[unknown]';
                } else {
                        $numPages = $dbr->selectField( 'blob_tracking',
                                'COUNT(DISTINCT bt_page)',
                                # A condition is required so that this query uses the index
                                array( 'bt_moved' => 0 ),
                                __METHOD__
                        );
                }
                if ( $this->copyOnly ) {
                        $this->info( "Copying pages..." );
                } else {
                        $this->info( "Moving pages..." );
                }
                while ( true ) {
                        $res = $dbr->select( 'blob_tracking',
                                array( 'bt_page' ),
                                array(
                                        'bt_moved' => 0,
                                        'bt_page > ' . $dbr->addQuotes( $startId )
                                ),
                                __METHOD__,
                                array(
                                        'DISTINCT',
                                        'ORDER BY' => 'bt_page',
                                        'LIMIT' => $this->batchSize,
                                )
                        );
                        if ( !$res->numRows() ) {
                                break;
                        }
                        foreach ( $res as $row ) {
                                $this->dispatch( 'doPage', $row->bt_page );
                                $i++;
                        }
                        // Continue after the last page ID of this batch
                        $startId = $row->bt_page;
                        $this->report( 'pages', $i, $numPages );
                }
                $this->report( 'pages', $i, $numPages );
                if ( $this->copyOnly ) {
                        $this->info( "All page copies queued." );
                } else {
                        $this->info( "All page moves queued." );
                }
        }

        /**
         * Display a progress report
         *
         * A line is printed only when $current equals $end or after
         * $this->reportingInterval calls; printing is followed by a wait for
         * replication lag to drop.
         */
        public function report( $label, $current, $end ) {
                $this->numBatches++;
                if ( $current == $end || $this->numBatches >= $this->reportingInterval ) {
                        $this->numBatches = 0;
                        $this->info( "$label: $current / $end" );
                        $this->waitForSlaves();
                }
        }

        /**
         * Move all orphan text to the new clusters
         *
         * Orphans are the blob_tracking rows with bt_page = 0. Their text IDs
         * are queued as doOrphanList commands of at most
         * $this->orphanBatchSize IDs each.
         */
        public function doAllOrphans() {
                $dbr = wfGetDB( DB_SLAVE );
                $startId = 0;
                $i = 0;
                if ( $this->noCount ) {
                        $numOrphans = '[unknown]';
                } else {
                        $numOrphans = $dbr->selectField( 'blob_tracking',
                                'COUNT(DISTINCT bt_text_id)',
                                array( 'bt_moved' => 0, 'bt_page' => 0 ),
                                __METHOD__ );
                        if ( !$numOrphans ) {
                                return;
                        }
                }
                if ( $this->copyOnly ) {
                        $this->info( "Copying orphans..." );
                } else {
                        $this->info( "Moving orphans..." );
                }

                while ( true ) {
                        $res = $dbr->select( 'blob_tracking',
                                array( 'bt_text_id' ),
                                array(
                                        'bt_moved' => 0,
                                        'bt_page' => 0,
                                        'bt_text_id > ' . $dbr->addQuotes( $startId )
                                ),
                                __METHOD__,
                                array(
                                        'DISTINCT',
                                        'ORDER BY' => 'bt_text_id',
                                        'LIMIT' => $this->batchSize
                                )
                        );
                        if ( !$res->numRows() ) {
                                break;
                        }
                        $ids = array();
                        foreach ( $res as $row ) {
                                $ids[] = $row->bt_text_id;
                                $i++;
                        }
                        // Need to send enough orphan IDs to the child at a time to fill a blob,
                        // so orphanBatchSize needs to be at least ~100.
                        // batchSize can be smaller or larger.
                        while ( count( $ids ) > $this->orphanBatchSize ) {
                                $args = array_slice( $ids, 0, $this->orphanBatchSize );
                                $ids = array_slice( $ids, $this->orphanBatchSize );
                                array_unshift( $args, 'doOrphanList' );
                                call_user_func_array( array( $this, 'dispatch' ), $args );
                        }
                        if ( count( $ids ) ) {
                                $args = $ids;
                                array_unshift( $args, 'doOrphanList' );
                                call_user_func_array( array( $this, 'dispatch' ), $args );
                        }

                        // Continue after the last text ID of this batch
                        $startId = $row->bt_text_id;
                        $this->report( 'orphans', $i, $numOrphans );
                }
                $this->report( 'orphans', $i, $numOrphans );
                $this->info( "All orphans queued." );
        }

        /**
         * Main entry point for worker processes
         *
         * Reads one space-separated command per line from stdin until EOF or
         * a 'quit' command. Blank lines are skipped and unknown commands are
         * ignored.
         */
        public function executeChild() {
                $this->debug( 'starting' );
                $this->syncDBs();

                while ( !feof( STDIN ) ) {
                        $line = rtrim( fgets( STDIN ) );
                        if ( $line == '' ) {
                                continue;
                        }
                        $this->debug( $line );
                        $args = explode( ' ', $line );
                        $cmd = array_shift( $args );
                        switch ( $cmd ) {
                        case 'doPage':
                                $this->doPage( intval( $args[0] ) );
                                break;
                        case 'doOrphanList':
                                $this->doOrphanList( array_map( 'intval', $args ) );
                                break;
                        case 'quit':
                                return;
                        }
                        $this->waitForSlaves();
                }
        }

        /**
         * Move tracked text in a given page
         *
         * Unless --copy-only is set, moves left incomplete by an earlier run
         * are finished first. The remaining unmoved text of the page is then
         * packed into blobs of $this->pageBlobClass and committed.
         *
         * @param $pageId Integer
         */
        public function doPage( $pageId ) {
                // The title is only used to label debug messages
                $title = Title::newFromId( $pageId );
                if ( $title ) {
                        $titleText = $title->getPrefixedText();
                } else {
                        $titleText = '[deleted]';
                }
                $dbr = wfGetDB( DB_SLAVE );

                // Finish any incomplete transactions
                if ( !$this->copyOnly ) {
                        $this->finishIncompleteMoves( array( 'bt_page' => $pageId ) );
                        $this->syncDBs();
                }

                $startId = 0;
                $trx = new CgzCopyTransaction( $this, $this->pageBlobClass );

                while ( true ) {
                        $res = $dbr->select(
                                array( 'blob_tracking', 'text' ),
                                '*',
                                array(
                                        'bt_page' => $pageId,
                                        'bt_text_id > ' . $dbr->addQuotes( $startId ),
                                        'bt_moved' => 0,
                                        'bt_new_url IS NULL',
                                        'bt_text_id=old_id',
                                ),
                                __METHOD__,
                                array(
                                        'ORDER BY' => 'bt_text_id',
                                        'LIMIT' => $this->batchSize
                                )
                        );
                        if ( !$res->numRows() ) {
                                break;
                        }

                        $lastTextId = 0;
                        foreach ( $res as $row ) {
                                if ( $lastTextId == $row->bt_text_id ) {
                                        // Duplicate (null edit)
                                        continue;
                                }
                                $lastTextId = $row->bt_text_id;
                                // Load the text
                                $text = Revision::getRevisionText( $row );
                                if ( $text === false ) {
                                        $this->critical( "Error loading {$row->bt_rev_id}/{$row->bt_text_id}" );
                                        continue;
                                }

                                // Queue it; addItem() returns false once the blob should be written out
                                if ( !$trx->addItem( $text, $row->bt_text_id ) ) {
                                        $this->debug( "$titleText: committing blob with " . $trx->getSize() . " items" );
                                        $trx->commit();
                                        $trx = new CgzCopyTransaction( $this, $this->pageBlobClass );
                                        $this->waitForSlaves();
                                }
                        }
                        // Continue after the last text ID of this batch
                        $startId = $row->bt_text_id;
                }

                // Write out whatever is left; commit() does nothing for an empty transaction
                $this->debug( "$titleText: committing blob with " . $trx->getSize() . " items" );
                $trx->commit();
        }

        /**
         * Atomic move operation.
         *
         * Write the new URL to the text table and set the bt_moved flag.
         *
         * This is done in a single transaction to provide restartable behaviour
         * without data loss.
         *
         * The transaction is kept short to reduce locking.
         *
         * Exits with status 1 if called in --copy-only mode.
         *
         * @param $textId Integer: old_id of the text row
         * @param $url String: new external store URL
         */
        public function moveTextRow( $textId, $url ) {
                if ( $this->copyOnly ) {
                        $this->critical( "Internal error: can't call moveTextRow() in --copy-only mode" );
                        exit( 1 );
                }
                $dbw = wfGetDB( DB_MASTER );
                $dbw->begin();
                $dbw->update( 'text',
                        array( // set
                                'old_text' => $url,
                                'old_flags' => 'external,utf-8',
                        ),
                        array( // where
                                'old_id' => $textId
                        ),
                        __METHOD__
                );
                $dbw->update( 'blob_tracking',
                        array( 'bt_moved' => 1 ),
                        array( 'bt_text_id' => $textId ),
                        __METHOD__
                );
                $dbw->commit();
        }

        /**
         * Moves are done in two phases: bt_new_url and then bt_moved.
         *  - bt_new_url indicates that the text has been copied to the new cluster.
         *  - bt_moved indicates that the text table has been updated.
         *
         * This function completes any moves that only have done bt_new_url. This
         * can happen when the script is interrupted, or when --copy-only is used.
         *
         * @param $conds Array: extra blob_tracking conditions limiting the rows to finish
         */
        public function finishIncompleteMoves( $conds ) {
                $dbr = wfGetDB( DB_SLAVE );

                $startId = 0;
                $conds = array_merge( $conds, array(
                        'bt_moved' => 0,
                        'bt_new_url IS NOT NULL'
                ) );
                while ( true ) {
                        $res = $dbr->select( 'blob_tracking',
                                '*',
                                array_merge( $conds, array( 'bt_text_id > ' . $dbr->addQuotes( $startId ) ) ),
                                __METHOD__,
                                array(
                                        'ORDER BY' => 'bt_text_id',
                                        'LIMIT' => $this->batchSize,
                                )
                        );
                        if ( !$res->numRows() ) {
                                break;
                        }
                        $this->debug( 'Incomplete: ' . $res->numRows() . ' rows' );
                        foreach ( $res as $row ) {
                                $this->moveTextRow( $row->bt_text_id, $row->bt_new_url );
                                // Check replication lag on every text ID that is a multiple of ten
                                if ( $row->bt_text_id % 10 == 0 ) {
                                        $this->waitForSlaves();
                                }
                        }
                        $startId = $row->bt_text_id;
                }
        }

        /**
         * Returns the name of the next target cluster
         *
         * Advances the internal array pointer of $this->destClusters and wraps
         * around to the first entry when the end is reached.
         */
        public function getTargetCluster() {
                $cluster = next( $this->destClusters );
                if ( $cluster === false ) {
                        $cluster = reset( $this->destClusters );
                }
                return $cluster;
        }

        /**
         * Gets a DB master connection for the given external cluster name
         */
        public function getExtDB( $cluster ) {
                $lb = wfGetLBFactory()->getExternalLB( $cluster );
                return $lb->getConnection( DB_MASTER );
        }

        /**
         * Move a list of orphan text IDs to the new cluster
         *
         * @param $textIds Array of integer text IDs; an empty list is a no-op
         */
        public function doOrphanList( $textIds ) {
                // Nothing to do, and an empty list cannot be turned into an IN() condition
                if ( !count( $textIds ) ) {
                        return;
                }

                // Finish incomplete moves
                if ( !$this->copyOnly ) {
                        $this->finishIncompleteMoves( array( 'bt_text_id' => $textIds ) );
                        $this->syncDBs();
                }

                $trx = new CgzCopyTransaction( $this, $this->orphanBlobClass );

                $res = wfGetDB( DB_SLAVE )->select(
                        array( 'text', 'blob_tracking' ),
                        array( 'old_id', 'old_text', 'old_flags' ),
                        array(
                                'old_id' => $textIds,
                                'bt_text_id=old_id',
                                'bt_moved' => 0,
                        ),
                        __METHOD__,
                        array( 'DISTINCT' )
                );

                foreach ( $res as $row ) {
                        $text = Revision::getRevisionText( $row );
                        if ( $text === false ) {
                                $this->critical( "Error: cannot load revision text for old_id={$row->old_id}" );
                                continue;
                        }

                        // addItem() returns false once the blob should be written out
                        if ( !$trx->addItem( $text, $row->old_id ) ) {
                                $this->debug( "[orphan]: committing blob with " . $trx->getSize() . " rows" );
                                $trx->commit();
                                $trx = new CgzCopyTransaction( $this, $this->orphanBlobClass );
                                $this->waitForSlaves();
                        }
                }
                $this->debug( "[orphan]: committing blob with " . $trx->getSize() . " rows" );
                $trx->commit();
        }

        /**
         * Wait for slaves (quietly)
         *
         * Polls the load balancer every five seconds until the maximum lag it
         * reports is below 2.
         */
        public function waitForSlaves() {
                $lb = wfGetLB();
                while ( true ) {
                        list( $host, $maxLag ) = $lb->getMaxLag();
                        if ( $maxLag < 2 ) {
                                break;
                        }
                        sleep( 5 );
                }
        }
}
642
/**
 * Class to represent a recompression operation for a single CGZ blob
 *
 * Text items are collected with addItem() and written to a destination
 * cluster as one blob by commit().
 */
class CgzCopyTransaction {
        /** The RecompressTracked job: used for logging, the store and the target cluster */
        public $parent;
        /** Name of the blob class to instantiate */
        public $blobClass;
        /** The blob under construction, or false before the first addItem() */
        public $cgz;
        /** Map of text ID => hash returned by the blob's addItem() */
        public $referrers;
        /** Map of text ID => text, kept so that the blob can be rebuilt */
        public $texts;

        /**
         * Create a transaction from a RecompressTracked object
         *
         * @param $parent RecompressTracked
         * @param $blobClass String: name of the blob class to fill
         */
        public function __construct( $parent, $blobClass ) {
                $this->blobClass = $blobClass;
                $this->cgz = false;
                $this->texts = array();
                $this->referrers = array();
                $this->parent = $parent;
        }

        /**
         * Add text.
         * Returns false if it's ready to commit.
         *
         * @param $text String
         * @param $textId Integer
         * @return the result of the blob's isHappy()
         */
        public function addItem( $text, $textId ) {
                // The blob is created lazily on the first item
                if ( !$this->cgz ) {
                        $class = $this->blobClass;
                        $this->cgz = new $class;
                }
                $hash = $this->cgz->addItem( $text );
                $this->referrers[$textId] = $hash;
                $this->texts[$textId] = $text;
                return $this->cgz->isHappy();
        }

        /**
         * Get the number of text items currently held
         */
        public function getSize() {
                return count( $this->texts );
        }

        /**
         * Recompress text after some aberrant modification
         *
         * Rebuilds the blob and the referrer map from $this->texts.
         */
        public function recompress() {
                $class = $this->blobClass;
                $this->cgz = new $class;
                $this->referrers = array();
                foreach ( $this->texts as $textId => $text ) {
                        $hash = $this->cgz->addItem( $text );
                        $this->referrers[$textId] = $hash;
                }
        }

        /**
         * Commit the blob.
         * Does nothing if no text items have been added.
         * May skip the move if --copy-only is set.
         */
        public function commit() {
                $originalCount = count( $this->texts );
                if ( !$originalCount ) {
                        return;
                }

                // Check to see if the target text_ids have been moved already.
                //
                // We originally read from the slave, so this can happen when a single
                // text_id is shared between multiple pages. It's rare, but possible
                // if a delete/move/undelete cycle splits up a null edit.
                //
                // We do a locking read to prevent closer-run race conditions.
                $dbw = wfGetDB( DB_MASTER );
                $dbw->begin();
                $res = $dbw->select( 'blob_tracking',
                        array( 'bt_text_id', 'bt_moved' ),
                        array( 'bt_text_id' => array_keys( $this->referrers ) ),
                        __METHOD__, array( 'FOR UPDATE' ) );
                $dirty = false;
                foreach ( $res as $row ) {
                        if ( $row->bt_moved ) {
                                # This row has already been moved, remove it
                                $this->parent->debug( "TRX: conflict detected in old_id={$row->bt_text_id}" );
                                unset( $this->texts[$row->bt_text_id] );
                                $dirty = true;
                        }
                }

                // Recompress the blob if necessary
                if ( $dirty ) {
                        if ( !count( $this->texts ) ) {
                                // All have been moved already
                                if ( $originalCount > 1 ) {
                                        // This is suspicious, make noise. The logging methods
                                        // live on the parent job, not on this class.
                                        $this->parent->critical( "Warning: concurrent operation detected, are there two conflicting " .
                                                "processes running, doing the same job?" );
                                }
                                // Nothing was written; close the transaction so the row
                                // locks taken by the FOR UPDATE read are released now.
                                $dbw->commit();
                                return;
                        }
                        $this->recompress();
                }

                // Insert the data into the destination cluster
                $targetCluster = $this->parent->getTargetCluster();
                $store = $this->parent->store;
                $targetDB = $store->getMaster( $targetCluster );
                $targetDB->clearFlag( DBO_TRX ); // we manage the transactions
                $targetDB->begin();
                $baseUrl = $this->parent->store->store( $targetCluster, serialize( $this->cgz ) );

                // Write the new URLs to the blob_tracking table
                foreach ( $this->referrers as $textId => $hash ) {
                        $url = $baseUrl . '/' . $hash;
                        $dbw->update( 'blob_tracking',
                                array( 'bt_new_url' => $url ),
                                array(
                                        'bt_text_id' => $textId,
                                        'bt_moved' => 0, # Check for concurrent conflicting update
                                ),
                                __METHOD__
                        );
                }

                $targetDB->commit();
                // Critical section here: interruption at this point causes blob duplication
                // Reversing the order of the commits would cause data loss instead
                $dbw->commit();

                // Write the new URLs to the text table and set the moved flag
                if ( !$this->parent->copyOnly ) {
                        foreach ( $this->referrers as $textId => $hash ) {
                                $url = $baseUrl . '/' . $hash;
                                $this->parent->moveTextRow( $textId, $url );
                        }
                }
        }
}
777