X-Git-Url: https://scripts.mit.edu/gitweb/autoinstallsdev/mediawiki.git/blobdiff_plain/19e297c21b10b1b8a3acad5e73fc71dcb35db44a..6932310fd58ebef145fa01eb76edf7150284d8ea:/includes/libs/filebackend/FileOpBatch.php

diff --git a/includes/libs/filebackend/FileOpBatch.php b/includes/libs/filebackend/FileOpBatch.php
new file mode 100644
index 00000000..2324098d
--- /dev/null
+++ b/includes/libs/filebackend/FileOpBatch.php
@@ -0,0 +1,204 @@
<?php
/**
 * Helper class for representing batch file operations.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 * http://www.gnu.org/licenses/copyleft/gpl.html
 *
 * @file
 * @ingroup FileBackend
 */

/**
 * Helper class for representing batch file operations.
 * Do not use this class from places outside FileBackend.
 *
 * Methods should avoid throwing exceptions at all costs.
 *
 * @ingroup FileBackend
 * @since 1.20
 */
class FileOpBatch {
	/* Timeout related parameters */
	const MAX_BATCH_SIZE = 1000; // integer

	/**
	 * Attempt to perform a series of file operations.
	 * Callers are responsible for handling file locking.
	 *
	 * $opts is an array of options, including:
	 *   - force : Errors that would normally cause a rollback do not.
	 *             The remaining operations are still attempted if any fail.
	 *   - nonJournaled : Don't log this operation batch in the file journal.
	 *   - concurrency : Try to do this many operations in parallel when possible.
	 *
	 * The resulting StatusValue will be "OK" unless:
	 *   - a) unexpected operation errors occurred (network partitions, disk full...)
	 *   - b) predicted operation errors occurred and 'force' was not set
	 *
	 * @param FileOp[] $performOps List of FileOp operations
	 * @param array $opts Batch operation options
	 * @param FileJournal $journal Journal to log operations to
	 * @return StatusValue
	 */
	public static function attemptBatch( array $performOps, array $opts, FileJournal $journal ) {
		$status = StatusValue::newGood();

		$n = count( $performOps );
		if ( $n > self::MAX_BATCH_SIZE ) {
			$status->fatal( 'backend-fail-batchsize', $n, self::MAX_BATCH_SIZE );

			return $status;
		}

		$batchId = $journal->getTimestampedUUID();
		$ignoreErrors = !empty( $opts['force'] );
		$journaled = empty( $opts['nonJournaled'] );
		$maxConcurrency = isset( $opts['concurrency'] ) ? $opts['concurrency'] : 1;

		$entries = []; // file journal entry list
		$predicates = FileOp::newPredicates(); // account for previous ops in prechecks
		$curBatch = []; // concurrent FileOp sub-batch accumulation
		$curBatchDeps = FileOp::newDependencies(); // paths used in FileOp sub-batch
		$pPerformOps = []; // ordered list of concurrent FileOp sub-batches
		$lastBackend = null; // last op backend name
		// Do pre-checks for each operation; abort on failure...
		foreach ( $performOps as $index => $fileOp ) {
			$backendName = $fileOp->getBackend()->getName();
			$fileOp->setBatchId( $batchId ); // transaction ID
			// Decide if this op can be done concurrently within this sub-batch
			// or if a new concurrent sub-batch must be started after this one...
			if ( $fileOp->dependsOn( $curBatchDeps )
				|| count( $curBatch ) >= $maxConcurrency
				|| ( $backendName !== $lastBackend && count( $curBatch ) )
			) {
				$pPerformOps[] = $curBatch; // push this batch
				$curBatch = []; // start a new sub-batch
				$curBatchDeps = FileOp::newDependencies();
			}
			$lastBackend = $backendName;
			$curBatch[$index] = $fileOp; // keep index
			// Update list of affected paths in this batch
			$curBatchDeps = $fileOp->applyDependencies( $curBatchDeps );
			// Simulate performing the operation...
			$oldPredicates = $predicates;
			$subStatus = $fileOp->precheck( $predicates ); // updates $predicates
			$status->merge( $subStatus );
			if ( $subStatus->isOK() ) {
				if ( $journaled ) { // journal log entries
					$entries = array_merge( $entries,
						$fileOp->getJournalEntries( $oldPredicates, $predicates ) );
				}
			} else { // operation failed?
				$status->success[$index] = false;
				++$status->failCount;
				if ( !$ignoreErrors ) {
					return $status; // abort
				}
			}
		}
		// Push the last sub-batch
		if ( count( $curBatch ) ) {
			$pPerformOps[] = $curBatch;
		}

		// Log the operations in the file journal...
		if ( count( $entries ) ) {
			$subStatus = $journal->logChangeBatch( $entries, $batchId );
			if ( !$subStatus->isOK() ) {
				$status->merge( $subStatus );

				return $status; // abort
			}
		}

		if ( $ignoreErrors ) { // treat precheck() fatals as mere warnings
			$status->setResult( true, $status->value );
		}

		// Attempt each operation (in parallel if allowed and possible)...
		self::runParallelBatches( $pPerformOps, $status );

		return $status;
	}

	/**
	 * Attempt a list of file operations sub-batches in series.
	 *
	 * The operations *in* each sub-batch will be done in parallel.
	 * The caller is responsible for making sure the operations
	 * within any given sub-batch do not depend on each other.
	 * This will abort remaining ops on failure.
	 *
	 * @param array $pPerformOps Batches of file ops (batches use original indexes)
	 * @param StatusValue $status
	 */
	protected static function runParallelBatches( array $pPerformOps, StatusValue $status ) {
		$aborted = false; // set to true on unexpected errors
		foreach ( $pPerformOps as $performOpsBatch ) {
			/** @var FileOp[] $performOpsBatch */
			if ( $aborted ) { // check batch op abort flag...
				// We can't continue (even with $ignoreErrors) as $predicates is wrong.
				// Log the remaining ops as failed for recovery...
				foreach ( $performOpsBatch as $i => $fileOp ) {
					$status->success[$i] = false;
					++$status->failCount;
					$performOpsBatch[$i]->logFailure( 'attempt_aborted' );
				}
				continue;
			}
			/** @var StatusValue[] $statuses */
			$statuses = [];
			$opHandles = [];
			// Get the backend; all sub-batch ops belong to a single backend
			/** @var FileBackendStore $backend */
			$backend = reset( $performOpsBatch )->getBackend();
			// Get the operation handles or actually do it if there is just one.
			// If attemptAsync() returns a StatusValue, it was either due to an error
			// or the backend does not support async ops and did it synchronously.
			foreach ( $performOpsBatch as $i => $fileOp ) {
				if ( !isset( $status->success[$i] ) ) { // didn't already fail in precheck()
					// Parallel ops may be disabled in config due to missing dependencies,
					// (e.g. needing popen()). When they are, $performOpsBatch has size 1.
					$subStatus = ( count( $performOpsBatch ) > 1 )
						? $fileOp->attemptAsync()
						: $fileOp->attempt();
					if ( $subStatus->value instanceof FileBackendStoreOpHandle ) {
						$opHandles[$i] = $subStatus->value; // deferred
					} else {
						$statuses[$i] = $subStatus; // done already
					}
				}
			}
			// Try to do all the operations concurrently...
			$statuses = $statuses + $backend->executeOpHandlesInternal( $opHandles );
			// Marshall and merge all the responses (blocking)...
			foreach ( $performOpsBatch as $i => $fileOp ) {
				if ( !isset( $status->success[$i] ) ) { // didn't already fail in precheck()
					$subStatus = $statuses[$i];
					$status->merge( $subStatus );
					if ( $subStatus->isOK() ) {
						$status->success[$i] = true;
						++$status->successCount;
					} else {
						$status->success[$i] = false;
						++$status->failCount;
						$aborted = true; // set abort flag; we can't continue
					}
				}
			}
		}
	}
}
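
For context, a minimal sketch of how this helper is normally reached. Callers do not usually invoke FileOpBatch::attemptBatch() directly: FileBackend::doOperations() turns operation descriptor arrays into FileOp objects, and FileBackendStore hands those to attemptBatch() together with its file journal. The snippet below is an illustration under assumptions, not part of this commit; $backend stands for any already-configured FileBackendStore subclass (e.g. FSFileBackend), and the mwstore:// paths are placeholders.

	// Hypothetical caller-side sketch; $backend and all paths are illustrative.
	$status = $backend->doOperations(
		[
			[ 'op' => 'create',
				'dst' => 'mwstore://some-backend/some-container/a.txt',
				'content' => 'hello' ],
			[ 'op' => 'copy',
				'src' => 'mwstore://some-backend/some-container/a.txt',
				'dst' => 'mwstore://some-backend/some-container/b.txt' ],
		],
		// 'nonJournaled' and 'force' pass through to attemptBatch() above;
		// 'parallelize' is mapped by the backend (depending on its configuration)
		// to the 'concurrency' option that attemptBatch() reads when forming
		// concurrent sub-batches.
		[ 'nonJournaled' => true, 'parallelize' => true ]
	);

	if ( !$status->isOK() ) {
		// $status->success is keyed by the original operation indexes, matching
		// how attemptBatch()/runParallelBatches() record per-op outcomes, and
		// failCount/successCount give the aggregate tallies.
	}

Note that the second operation reads the first operation's destination, so dependsOn() against $curBatchDeps would push it into a later sub-batch: the two ops run in series even with parallelism enabled, which is exactly the dependency-splitting behavior implemented in attemptBatch().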