X-Git-Url: https://scripts.mit.edu/gitweb/autoinstallsdev/mediawiki.git/blobdiff_plain/19e297c21b10b1b8a3acad5e73fc71dcb35db44a..6932310fd58ebef145fa01eb76edf7150284d8ea:/maintenance/storage/recompressTracked.php diff --git a/maintenance/storage/recompressTracked.php b/maintenance/storage/recompressTracked.php index 8974a74d..c5dd53b1 100644 --- a/maintenance/storage/recompressTracked.php +++ b/maintenance/storage/recompressTracked.php @@ -1,18 +1,45 @@ [... ...] -Moves blobs indexed by trackBlobs.php to a specified list of destination clusters, and recompresses them in the process. Restartable. +Moves blobs indexed by trackBlobs.php to a specified list of destination clusters, +and recompresses them in the process. Restartable. Options: - --procs Set the number of child processes (default 1) - --copy-only Copy only, do not update the text table. Restart without this option to complete. - --debug-log Log debugging data to the specified file - --info-log Log progress messages to the specified file - --critical-log Log error messages to the specified file + --procs Set the number of child processes (default 1) + --copy-only Copy only, do not update the text table. Restart + without this option to complete. + --debug-log Log debugging data to the specified file + --info-log Log progress messages to the specified file + --critical-log Log error messages to the specified file "; exit( 1 ); } @@ -20,44 +47,59 @@ Options: $job = RecompressTracked::newFromCommandLine( $args, $options ); $job->execute(); +/** + * Maintenance script that moves blobs indexed by trackBlobs.php to a specified + * list of destination clusters, and recompresses them in the process. + * + * @ingroup Maintenance ExternalStorage + */ class RecompressTracked { - var $destClusters; - var $batchSize = 1000; - var $orphanBatchSize = 1000; - var $reportingInterval = 10; - var $numProcs = 1; - var $useDiff, $pageBlobClass, $orphanBlobClass; - var $slavePipes, $slaveProcs, $prevSlaveId; - var $copyOnly = false; - var $isChild = false; - var $slaveId = false; - var $noCount = false; - var $debugLog, $infoLog, $criticalLog; - var $store; - - static $optionsWithArgs = array( 'procs', 'slave-id', 'debug-log', 'info-log', 'critical-log' ); - static $cmdLineOptionMap = array( + public $destClusters; + public $batchSize = 1000; + public $orphanBatchSize = 1000; + public $reportingInterval = 10; + public $numProcs = 1; + public $numBatches = 0; + public $pageBlobClass, $orphanBlobClass; + public $replicaPipes, $replicaProcs, $prevReplicaId; + public $copyOnly = false; + public $isChild = false; + public $replicaId = false; + public $noCount = false; + public $debugLog, $infoLog, $criticalLog; + public $store; + + private static $optionsWithArgs = [ + 'procs', + 'replica-id', + 'debug-log', + 'info-log', + 'critical-log' + ]; + + private static $cmdLineOptionMap = [ 'no-count' => 'noCount', 'procs' => 'numProcs', 'copy-only' => 'copyOnly', 'child' => 'isChild', - 'slave-id' => 'slaveId', + 'replica-id' => 'replicaId', 'debug-log' => 'debugLog', 'info-log' => 'infoLog', 'critical-log' => 'criticalLog', - ); + ]; static function getOptionsWithArgs() { return self::$optionsWithArgs; } static function newFromCommandLine( $args, $options ) { - $jobOptions = array( 'destClusters' => $args ); + $jobOptions = [ 'destClusters' => $args ]; foreach ( self::$cmdLineOptionMap as $cmdOption => $classOption ) { if ( isset( $options[$cmdOption] ) ) { $jobOptions[$classOption] = $options[$cmdOption]; } } + return new self( $jobOptions ); } @@ -68,11 +110,11 @@ class RecompressTracked { $this->store = new ExternalStoreDB; if ( !$this->isChild ) { $GLOBALS['wgDebugLogPrefix'] = "RCT M: "; - } elseif ( $this->slaveId !== false ) { - $GLOBALS['wgDebugLogPrefix'] = "RCT {$this->slaveId}: "; + } elseif ( $this->replicaId !== false ) { + $GLOBALS['wgDebugLogPrefix'] = "RCT {$this->replicaId}: "; } - $this->useDiff = function_exists( 'xdiff_string_bdiff' ); - $this->pageBlobClass = $this->useDiff ? 'DiffHistoryBlob' : 'ConcatenatedGzipHistoryBlob'; + $this->pageBlobClass = function_exists( 'xdiff_string_bdiff' ) ? + 'DiffHistoryBlob' : 'ConcatenatedGzipHistoryBlob'; $this->orphanBlobClass = 'ConcatenatedGzipHistoryBlob'; } @@ -81,7 +123,6 @@ class RecompressTracked { if ( $this->debugLog ) { $this->logToFile( $msg, $this->debugLog ); } - } function info( $msg ) { @@ -100,21 +141,21 @@ class RecompressTracked { function logToFile( $msg, $file ) { $header = '[' . date( 'd\TH:i:s' ) . '] ' . wfHostname() . ' ' . posix_getpid(); - if ( $this->slaveId !== false ) { - $header .= "({$this->slaveId})"; + if ( $this->replicaId !== false ) { + $header .= "({$this->replicaId})"; } $header .= ' ' . wfWikiID(); - wfErrorLog( sprintf( "%-50s %s\n", $header, $msg ), $file ); + LegacyLogger::emit( sprintf( "%-50s %s\n", $header, $msg ), $file ); } /** - * Wait until the selected slave has caught up to the master. - * This allows us to use the slave for things that were committed in a + * Wait until the selected replica DB has caught up to the master. + * This allows us to use the replica DB for things that were committed in a * previous part of this batch process. */ function syncDBs() { $dbw = wfGetDB( DB_MASTER ); - $dbr = wfGetDB( DB_SLAVE ); + $dbr = wfGetDB( DB_REPLICA ); $pos = $dbw->getMasterPos(); $dbr->masterPosWait( $pos, 100000 ); } @@ -139,26 +180,30 @@ class RecompressTracked { } $this->syncDBs(); - $this->startSlaveProcs(); + $this->startReplicaProcs(); $this->doAllPages(); $this->doAllOrphans(); - $this->killSlaveProcs(); + $this->killReplicaProcs(); } /** * Make sure the tracking table exists and isn't empty + * @return bool */ function checkTrackingTable() { - $dbr = wfGetDB( DB_SLAVE ); + $dbr = wfGetDB( DB_REPLICA ); if ( !$dbr->tableExists( 'blob_tracking' ) ) { $this->critical( "Error: blob_tracking table does not exist" ); + return false; } - $row = $dbr->selectRow( 'blob_tracking', '*', false, __METHOD__ ); + $row = $dbr->selectRow( 'blob_tracking', '*', '', __METHOD__ ); if ( !$row ) { $this->info( "Warning: blob_tracking table contains no rows, skipping this wiki." ); + return false; } + return true; } @@ -168,10 +213,10 @@ class RecompressTracked { * This necessary because text recompression is slow: loading, compressing and * writing are all slow. */ - function startSlaveProcs() { + function startReplicaProcs() { $cmd = 'php ' . wfEscapeShellArg( __FILE__ ); foreach ( self::$cmdLineOptionMap as $cmdOption => $classOption ) { - if ( $cmdOption == 'slave-id' ) { + if ( $cmdOption == 'replica-id' ) { continue; } elseif ( in_array( $cmdOption, self::$optionsWithArgs ) && isset( $this->$classOption ) ) { $cmd .= " --$cmdOption " . wfEscapeShellArg( $this->$classOption ); @@ -183,37 +228,37 @@ class RecompressTracked { ' --wiki ' . wfEscapeShellArg( wfWikiID() ) . ' ' . call_user_func_array( 'wfEscapeShellArg', $this->destClusters ); - $this->slavePipes = $this->slaveProcs = array(); + $this->replicaPipes = $this->replicaProcs = []; for ( $i = 0; $i < $this->numProcs; $i++ ) { - $pipes = false; - $spec = array( - array( 'pipe', 'r' ), - array( 'file', 'php://stdout', 'w' ), - array( 'file', 'php://stderr', 'w' ) - ); - wfSuppressWarnings(); - $proc = proc_open( "$cmd --slave-id $i", $spec, $pipes ); - wfRestoreWarnings(); + $pipes = []; + $spec = [ + [ 'pipe', 'r' ], + [ 'file', 'php://stdout', 'w' ], + [ 'file', 'php://stderr', 'w' ] + ]; + MediaWiki\suppressWarnings(); + $proc = proc_open( "$cmd --replica-id $i", $spec, $pipes ); + MediaWiki\restoreWarnings(); if ( !$proc ) { - $this->critical( "Error opening slave process: $cmd" ); + $this->critical( "Error opening replica DB process: $cmd" ); exit( 1 ); } - $this->slaveProcs[$i] = $proc; - $this->slavePipes[$i] = $pipes[0]; + $this->replicaProcs[$i] = $proc; + $this->replicaPipes[$i] = $pipes[0]; } - $this->prevSlaveId = -1; + $this->prevReplicaId = -1; } /** * Gracefully terminate the child processes */ - function killSlaveProcs() { - $this->info( "Waiting for slave processes to finish..." ); + function killReplicaProcs() { + $this->info( "Waiting for replica DB processes to finish..." ); for ( $i = 0; $i < $this->numProcs; $i++ ) { - $this->dispatchToSlave( $i, 'quit' ); + $this->dispatchToReplica( $i, 'quit' ); } for ( $i = 0; $i < $this->numProcs; $i++ ) { - $status = proc_close( $this->slaveProcs[$i] ); + $status = proc_close( $this->replicaProcs[$i] ); if ( $status ) { $this->critical( "Warning: child #$i exited with status $status" ); } @@ -222,22 +267,23 @@ class RecompressTracked { } /** - * Dispatch a command to the next available slave. - * This may block until a slave finishes its work and becomes available. + * Dispatch a command to the next available replica DB. + * This may block until a replica DB finishes its work and becomes available. */ function dispatch( /*...*/ ) { $args = func_get_args(); - $pipes = $this->slavePipes; - $numPipes = stream_select( $x = array(), $pipes, $y = array(), 3600 ); + $pipes = $this->replicaPipes; + $numPipes = stream_select( $x = [], $pipes, $y = [], 3600 ); if ( !$numPipes ) { - $this->critical( "Error waiting to write to slaves. Aborting" ); + $this->critical( "Error waiting to write to replica DBs. Aborting" ); exit( 1 ); } for ( $i = 0; $i < $this->numProcs; $i++ ) { - $slaveId = ( $i + $this->prevSlaveId + 1 ) % $this->numProcs; - if ( isset( $pipes[$slaveId] ) ) { - $this->prevSlaveId = $slaveId; - $this->dispatchToSlave( $slaveId, $args ); + $replicaId = ( $i + $this->prevReplicaId + 1 ) % $this->numProcs; + if ( isset( $pipes[$replicaId] ) ) { + $this->prevReplicaId = $replicaId; + $this->dispatchToReplica( $replicaId, $args ); + return; } } @@ -246,19 +292,21 @@ class RecompressTracked { } /** - * Dispatch a command to a specified slave + * Dispatch a command to a specified replica DB + * @param int $replicaId + * @param array|string $args */ - function dispatchToSlave( $slaveId, $args ) { + function dispatchToReplica( $replicaId, $args ) { $args = (array)$args; - $cmd = implode( ' ', $args ); - fwrite( $this->slavePipes[$slaveId], "$cmd\n" ); + $cmd = implode( ' ', $args ); + fwrite( $this->replicaPipes[$replicaId], "$cmd\n" ); } /** * Move all tracked pages to the new clusters */ function doAllPages() { - $dbr = wfGetDB( DB_SLAVE ); + $dbr = wfGetDB( DB_REPLICA ); $i = 0; $startId = 0; if ( $this->noCount ) { @@ -267,7 +315,7 @@ class RecompressTracked { $numPages = $dbr->selectField( 'blob_tracking', 'COUNT(DISTINCT bt_page)', # A condition is required so that this query uses the index - array( 'bt_moved' => 0 ), + [ 'bt_moved' => 0 ], __METHOD__ ); } @@ -278,26 +326,26 @@ class RecompressTracked { } while ( true ) { $res = $dbr->select( 'blob_tracking', - array( 'bt_page' ), - array( + [ 'bt_page' ], + [ 'bt_moved' => 0, 'bt_page > ' . $dbr->addQuotes( $startId ) - ), + ], __METHOD__, - array( + [ 'DISTINCT', 'ORDER BY' => 'bt_page', 'LIMIT' => $this->batchSize, - ) + ] ); if ( !$res->numRows() ) { break; } foreach ( $res as $row ) { + $startId = $row->bt_page; $this->dispatch( 'doPage', $row->bt_page ); $i++; } - $startId = $row->bt_page; $this->report( 'pages', $i, $numPages ); } $this->report( 'pages', $i, $numPages ); @@ -310,13 +358,16 @@ class RecompressTracked { /** * Display a progress report + * @param string $label + * @param int $current + * @param int $end */ function report( $label, $current, $end ) { $this->numBatches++; if ( $current == $end || $this->numBatches >= $this->reportingInterval ) { $this->numBatches = 0; $this->info( "$label: $current / $end" ); - $this->waitForSlaves(); + MediaWikiServices::getInstance()->getDBLoadBalancerFactory()->waitForReplication(); } } @@ -324,7 +375,7 @@ class RecompressTracked { * Move all orphan text to the new clusters */ function doAllOrphans() { - $dbr = wfGetDB( DB_SLAVE ); + $dbr = wfGetDB( DB_REPLICA ); $startId = 0; $i = 0; if ( $this->noCount ) { @@ -332,7 +383,7 @@ class RecompressTracked { } else { $numOrphans = $dbr->selectField( 'blob_tracking', 'COUNT(DISTINCT bt_text_id)', - array( 'bt_moved' => 0, 'bt_page' => 0 ), + [ 'bt_moved' => 0, 'bt_page' => 0 ], __METHOD__ ); if ( !$numOrphans ) { return; @@ -346,24 +397,25 @@ class RecompressTracked { while ( true ) { $res = $dbr->select( 'blob_tracking', - array( 'bt_text_id' ), - array( + [ 'bt_text_id' ], + [ 'bt_moved' => 0, 'bt_page' => 0, 'bt_text_id > ' . $dbr->addQuotes( $startId ) - ), + ], __METHOD__, - array( + [ 'DISTINCT', 'ORDER BY' => 'bt_text_id', 'LIMIT' => $this->batchSize - ) + ] ); if ( !$res->numRows() ) { break; } - $ids = array(); + $ids = []; foreach ( $res as $row ) { + $startId = $row->bt_text_id; $ids[] = $row->bt_text_id; $i++; } @@ -374,15 +426,14 @@ class RecompressTracked { $args = array_slice( $ids, 0, $this->orphanBatchSize ); $ids = array_slice( $ids, $this->orphanBatchSize ); array_unshift( $args, 'doOrphanList' ); - call_user_func_array( array( $this, 'dispatch' ), $args ); + call_user_func_array( [ $this, 'dispatch' ], $args ); } if ( count( $ids ) ) { $args = $ids; array_unshift( $args, 'doOrphanList' ); - call_user_func_array( array( $this, 'dispatch' ), $args ); + call_user_func_array( [ $this, 'dispatch' ], $args ); } - $startId = $row->bt_text_id; $this->report( 'orphans', $i, $numOrphans ); } $this->report( 'orphans', $i, $numOrphans ); @@ -405,56 +456,59 @@ class RecompressTracked { $args = explode( ' ', $line ); $cmd = array_shift( $args ); switch ( $cmd ) { - case 'doPage': - $this->doPage( intval( $args[0] ) ); - break; - case 'doOrphanList': - $this->doOrphanList( array_map( 'intval', $args ) ); - break; - case 'quit': - return; + case 'doPage': + $this->doPage( intval( $args[0] ) ); + break; + case 'doOrphanList': + $this->doOrphanList( array_map( 'intval', $args ) ); + break; + case 'quit': + return; } - $this->waitForSlaves(); + MediaWikiServices::getInstance()->getDBLoadBalancerFactory()->waitForReplication(); } } /** * Move tracked text in a given page + * + * @param int $pageId */ function doPage( $pageId ) { - $title = Title::newFromId( $pageId ); + $title = Title::newFromID( $pageId ); if ( $title ) { $titleText = $title->getPrefixedText(); } else { $titleText = '[deleted]'; } - $dbr = wfGetDB( DB_SLAVE ); + $dbr = wfGetDB( DB_REPLICA ); // Finish any incomplete transactions if ( !$this->copyOnly ) { - $this->finishIncompleteMoves( array( 'bt_page' => $pageId ) ); + $this->finishIncompleteMoves( [ 'bt_page' => $pageId ] ); $this->syncDBs(); } $startId = 0; $trx = new CgzCopyTransaction( $this, $this->pageBlobClass ); + $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory(); while ( true ) { $res = $dbr->select( - array( 'blob_tracking', 'text' ), + [ 'blob_tracking', 'text' ], '*', - array( + [ 'bt_page' => $pageId, 'bt_text_id > ' . $dbr->addQuotes( $startId ), 'bt_moved' => 0, 'bt_new_url IS NULL', 'bt_text_id=old_id', - ), + ], __METHOD__, - array( + [ 'ORDER BY' => 'bt_text_id', 'LIMIT' => $this->batchSize - ) + ] ); if ( !$res->numRows() ) { break; @@ -462,6 +516,7 @@ class RecompressTracked { $lastTextId = 0; foreach ( $res as $row ) { + $startId = $row->bt_text_id; if ( $lastTextId == $row->bt_text_id ) { // Duplicate (null edit) continue; @@ -479,10 +534,9 @@ class RecompressTracked { $this->debug( "$titleText: committing blob with " . $trx->getSize() . " items" ); $trx->commit(); $trx = new CgzCopyTransaction( $this, $this->pageBlobClass ); - $this->waitForSlaves(); + $lbFactory->waitForReplication(); } } - $startId = $row->bt_text_id; } $this->debug( "$titleText: committing blob with " . $trx->getSize() . " items" ); @@ -494,10 +548,13 @@ class RecompressTracked { * * Write the new URL to the text table and set the bt_moved flag. * - * This is done in a single transaction to provide restartable behaviour + * This is done in a single transaction to provide restartable behavior * without data loss. * * The transaction is kept short to reduce locking. + * + * @param int $textId + * @param string $url */ function moveTextRow( $textId, $url ) { if ( $this->copyOnly ) { @@ -505,23 +562,23 @@ class RecompressTracked { exit( 1 ); } $dbw = wfGetDB( DB_MASTER ); - $dbw->begin(); + $dbw->begin( __METHOD__ ); $dbw->update( 'text', - array( // set + [ // set 'old_text' => $url, 'old_flags' => 'external,utf-8', - ), - array( // where + ], + [ // where 'old_id' => $textId - ), + ], __METHOD__ ); $dbw->update( 'blob_tracking', - array( 'bt_moved' => 1 ), - array( 'bt_text_id' => $textId ), + [ 'bt_moved' => 1 ], + [ 'bt_text_id' => $textId ], __METHOD__ ); - $dbw->commit(); + $dbw->commit( __METHOD__ ); } /** @@ -531,80 +588,91 @@ class RecompressTracked { * * This function completes any moves that only have done bt_new_url. This * can happen when the script is interrupted, or when --copy-only is used. + * + * @param array $conds */ function finishIncompleteMoves( $conds ) { - $dbr = wfGetDB( DB_SLAVE ); + $dbr = wfGetDB( DB_REPLICA ); + $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory(); $startId = 0; - $conds = array_merge( $conds, array( + $conds = array_merge( $conds, [ 'bt_moved' => 0, 'bt_new_url IS NOT NULL' - ) ); + ] ); while ( true ) { $res = $dbr->select( 'blob_tracking', '*', - array_merge( $conds, array( 'bt_text_id > ' . $dbr->addQuotes( $startId ) ) ), + array_merge( $conds, [ 'bt_text_id > ' . $dbr->addQuotes( $startId ) ] ), __METHOD__, - array( + [ 'ORDER BY' => 'bt_text_id', 'LIMIT' => $this->batchSize, - ) + ] ); if ( !$res->numRows() ) { break; } $this->debug( 'Incomplete: ' . $res->numRows() . ' rows' ); foreach ( $res as $row ) { + $startId = $row->bt_text_id; $this->moveTextRow( $row->bt_text_id, $row->bt_new_url ); if ( $row->bt_text_id % 10 == 0 ) { - $this->waitForSlaves(); + $lbFactory->waitForReplication(); } } - $startId = $row->bt_text_id; } } /** * Returns the name of the next target cluster + * @return string */ function getTargetCluster() { $cluster = next( $this->destClusters ); if ( $cluster === false ) { $cluster = reset( $this->destClusters ); } + return $cluster; } /** * Gets a DB master connection for the given external cluster name + * @param string $cluster + * @return Database */ function getExtDB( $cluster ) { $lb = wfGetLBFactory()->getExternalLB( $cluster ); + return $lb->getConnection( DB_MASTER ); } /** * Move an orphan text_id to the new cluster + * + * @param array $textIds */ function doOrphanList( $textIds ) { // Finish incomplete moves if ( !$this->copyOnly ) { - $this->finishIncompleteMoves( array( 'bt_text_id' => $textIds ) ); + $this->finishIncompleteMoves( [ 'bt_text_id' => $textIds ] ); $this->syncDBs(); } $trx = new CgzCopyTransaction( $this, $this->orphanBlobClass ); - $res = wfGetDB( DB_SLAVE )->select( - array( 'text', 'blob_tracking' ), - array( 'old_id', 'old_text', 'old_flags' ), - array( + $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory(); + $res = wfGetDB( DB_REPLICA )->select( + [ 'text', 'blob_tracking' ], + [ 'old_id', 'old_text', 'old_flags' ], + [ 'old_id' => $textIds, 'bt_text_id=old_id', 'bt_moved' => 0, - ), + ], __METHOD__, - array( 'DISTINCT' ) + [ 'DISTINCT' ] ); foreach ( $res as $row ) { @@ -618,50 +686,43 @@ class RecompressTracked { $this->debug( "[orphan]: committing blob with " . $trx->getSize() . " rows" ); $trx->commit(); $trx = new CgzCopyTransaction( $this, $this->orphanBlobClass ); - $this->waitForSlaves(); + $lbFactory->waitForReplication(); } } $this->debug( "[orphan]: committing blob with " . $trx->getSize() . " rows" ); $trx->commit(); } - - /** - * Wait for slaves (quietly) - */ - function waitForSlaves() { - $lb = wfGetLB(); - while ( true ) { - list( $host, $maxLag ) = $lb->getMaxLag(); - if ( $maxLag < 2 ) { - break; - } - sleep( 5 ); - } - } } /** * Class to represent a recompression operation for a single CGZ blob */ class CgzCopyTransaction { - var $parent; - var $blobClass; - var $cgz; - var $referrers; + /** @var RecompressTracked */ + public $parent; + public $blobClass; + /** @var ConcatenatedGzipHistoryBlob */ + public $cgz; + public $referrers; /** * Create a transaction from a RecompressTracked object + * @param RecompressTracked $parent + * @param string $blobClass */ function __construct( $parent, $blobClass ) { $this->blobClass = $blobClass; $this->cgz = false; - $this->texts = array(); + $this->texts = []; $this->parent = $parent; } /** * Add text. * Returns false if it's ready to commit. + * @param string $text + * @param int $textId + * @return bool */ function addItem( $text, $textId ) { if ( !$this->cgz ) { @@ -671,6 +732,7 @@ class CgzCopyTransaction { $hash = $this->cgz->addItem( $text ); $this->referrers[$textId] = $hash; $this->texts[$textId] = $text; + return $this->cgz->isHappy(); } @@ -684,7 +746,7 @@ class CgzCopyTransaction { function recompress() { $class = $this->blobClass; $this->cgz = new $class; - $this->referrers = array(); + $this->referrers = []; foreach ( $this->texts as $textId => $text ) { $hash = $this->cgz->addItem( $text ); $this->referrers[$textId] = $hash; @@ -702,19 +764,20 @@ class CgzCopyTransaction { return; } - // Check to see if the target text_ids have been moved already. - // - // We originally read from the slave, so this can happen when a single - // text_id is shared between multiple pages. It's rare, but possible - // if a delete/move/undelete cycle splits up a null edit. - // - // We do a locking read to prevent closer-run race conditions. + /* Check to see if the target text_ids have been moved already. + * + * We originally read from the replica DB, so this can happen when a single + * text_id is shared between multiple pages. It's rare, but possible + * if a delete/move/undelete cycle splits up a null edit. + * + * We do a locking read to prevent closer-run race conditions. + */ $dbw = wfGetDB( DB_MASTER ); - $dbw->begin(); + $dbw->begin( __METHOD__ ); $res = $dbw->select( 'blob_tracking', - array( 'bt_text_id', 'bt_moved' ), - array( 'bt_text_id' => array_keys( $this->referrers ) ), - __METHOD__, array( 'FOR UPDATE' ) ); + [ 'bt_text_id', 'bt_moved' ], + [ 'bt_text_id' => array_keys( $this->referrers ) ], + __METHOD__, [ 'FOR UPDATE' ] ); $dirty = false; foreach ( $res as $row ) { if ( $row->bt_moved ) { @@ -731,9 +794,11 @@ class CgzCopyTransaction { // All have been moved already if ( $originalCount > 1 ) { // This is suspcious, make noise - $this->critical( "Warning: concurrent operation detected, are there two conflicting " . + $this->parent->critical( + "Warning: concurrent operation detected, are there two conflicting " . "processes running, doing the same job?" ); } + return; } $this->recompress(); @@ -744,26 +809,26 @@ class CgzCopyTransaction { $store = $this->parent->store; $targetDB = $store->getMaster( $targetCluster ); $targetDB->clearFlag( DBO_TRX ); // we manage the transactions - $targetDB->begin(); + $targetDB->begin( __METHOD__ ); $baseUrl = $this->parent->store->store( $targetCluster, serialize( $this->cgz ) ); // Write the new URLs to the blob_tracking table foreach ( $this->referrers as $textId => $hash ) { $url = $baseUrl . '/' . $hash; $dbw->update( 'blob_tracking', - array( 'bt_new_url' => $url ), - array( + [ 'bt_new_url' => $url ], + [ 'bt_text_id' => $textId, 'bt_moved' => 0, # Check for concurrent conflicting update - ), + ], __METHOD__ ); } - $targetDB->commit(); + $targetDB->commit( __METHOD__ ); // Critical section here: interruption at this point causes blob duplication // Reversing the order of the commits would cause data loss instead - $dbw->commit(); + $dbw->commit( __METHOD__ ); // Write the new URLs to the text table and set the moved flag if ( !$this->parent->copyOnly ) { @@ -774,4 +839,3 @@ class CgzCopyTransaction { } } } -