]> scripts.mit.edu Git - autoinstallsdev/mediawiki.git/blobdiff - includes/libs/objectcache/WANObjectCacheReaper.php
MediaWiki 1.30.2
[autoinstallsdev/mediawiki.git] / includes / libs / objectcache / WANObjectCacheReaper.php
diff --git a/includes/libs/objectcache/WANObjectCacheReaper.php b/includes/libs/objectcache/WANObjectCacheReaper.php
new file mode 100644 (file)
index 0000000..14737b1
--- /dev/null
@@ -0,0 +1,204 @@
+<?php
+/**
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup Cache
+ */
+
+use Psr\Log\LoggerAwareInterface;
+use Psr\Log\LoggerInterface;
+use Psr\Log\NullLogger;
+use Wikimedia\ScopedCallback;
+
+/**
+ * Class for scanning through chronological, log-structured data or change logs
+ * and locally purging cache keys related to entities that appear in this data.
+ *
+ * This is useful for repairing cache when purges are missed by using a reliable
+ * stream, such as Kafka or a replicated MySQL table. Purge loss between datacenters
+ * is expected to be more common than within them.
+ *
+ * @since 1.28
+ */
+class WANObjectCacheReaper implements LoggerAwareInterface {
+       /** @var WANObjectCache */
+       protected $cache;
+       /** @var BagOStuff */
+       protected $store;
+       /** @var callable */
+       protected $logChunkCallback;
+       /** @var callable */
+       protected $keyListCallback;
+       /** @var LoggerInterface */
+       protected $logger;
+
+       /** @var string */
+       protected $channel;
+       /** @var int */
+       protected $initialStartWindow;
+
+       /**
+        * @param WANObjectCache $cache Cache to reap bad keys from
+        * @param BagOStuff $store Cache to store positions use for locking
+        * @param callable $logCallback Callback taking arguments:
+        *          - The starting position as a UNIX timestamp
+        *          - The starting unique ID used for breaking timestamp collisions or null
+        *          - The ending position as a UNIX timestamp
+        *          - The maximum number of results to return
+        *        It returns a list of maps of (key: cache key, pos: UNIX timestamp, id: unique ID)
+        *        for each key affected, with the corrosponding event timestamp/ID information.
+        *        The events should be in ascending order, by (timestamp,id).
+        * @param callable $keyCallback Callback taking arguments:
+        *          - The WANObjectCache instance
+        *          - An object from the event log
+        *        It should return a list of WAN cache keys.
+        *        The callback must fully duck-type test the object, since can be any model class.
+        * @param array $params Additional options:
+        *          - channel: the name of the update event stream.
+        *            Default: WANObjectCache::DEFAULT_PURGE_CHANNEL.
+        *          - initialStartWindow: seconds back in time to start if the position is lost.
+        *            Default: 1 hour.
+        *          - logger: an SPL monolog instance [optional]
+        */
+       public function __construct(
+               WANObjectCache $cache,
+               BagOStuff $store,
+               callable $logCallback,
+               callable $keyCallback,
+               array $params
+       ) {
+               $this->cache = $cache;
+               $this->store = $store;
+
+               $this->logChunkCallback = $logCallback;
+               $this->keyListCallback = $keyCallback;
+               if ( isset( $params['channel'] ) ) {
+                       $this->channel = $params['channel'];
+               } else {
+                       throw new UnexpectedValueException( "No channel specified." );
+               }
+
+               $this->initialStartWindow = isset( $params['initialStartWindow'] )
+                       ? $params['initialStartWindow']
+                       : 3600;
+               $this->logger = isset( $params['logger'] )
+                       ? $params['logger']
+                       : new NullLogger();
+       }
+
+       public function setLogger( LoggerInterface $logger ) {
+               $this->logger = $logger;
+       }
+
+       /**
+        * Check and reap stale keys based on a chunk of events
+        *
+        * @param int $n Number of events
+        * @return int Number of keys checked
+        */
+       final public function invoke( $n = 100 ) {
+               $posKey = $this->store->makeGlobalKey( 'WANCache', 'reaper', $this->channel );
+               $scopeLock = $this->store->getScopedLock( "$posKey:busy", 0 );
+               if ( !$scopeLock ) {
+                       return 0;
+               }
+
+               $now = time();
+               $status = $this->store->get( $posKey );
+               if ( !$status ) {
+                       $status = [ 'pos' => $now - $this->initialStartWindow, 'id' => null ];
+               }
+
+               // Get events for entities who's keys tombstones/hold-off should have expired by now
+               $events = call_user_func_array(
+                       $this->logChunkCallback,
+                       [ $status['pos'], $status['id'], $now - WANObjectCache::HOLDOFF_TTL - 1, $n ]
+               );
+
+               $event = null;
+               $keyEvents = [];
+               foreach ( $events as $event ) {
+                       $keys = call_user_func_array(
+                               $this->keyListCallback,
+                               [ $this->cache, $event['item'] ]
+                       );
+                       foreach ( $keys as $key ) {
+                               unset( $keyEvents[$key] ); // use only the latest per key
+                               $keyEvents[$key] = [
+                                       'pos' => $event['pos'],
+                                       'id' => $event['id']
+                               ];
+                       }
+               }
+
+               $purgeCount = 0;
+               $lastOkEvent = null;
+               foreach ( $keyEvents as $key => $keyEvent ) {
+                       if ( !$this->cache->reap( $key, $keyEvent['pos'] ) ) {
+                               break;
+                       }
+                       ++$purgeCount;
+                       $lastOkEvent = $event;
+               }
+
+               if ( $lastOkEvent ) {
+                       $ok = $this->store->merge(
+                               $posKey,
+                               function ( $bag, $key, $curValue ) use ( $lastOkEvent ) {
+                                       if ( !$curValue ) {
+                                               // Use new position
+                                       } else {
+                                               $curCoord = [ $curValue['pos'], $curValue['id'] ];
+                                               $newCoord = [ $lastOkEvent['pos'], $lastOkEvent['id'] ];
+                                               if ( $newCoord < $curCoord ) {
+                                                       // Keep prior position instead of rolling it back
+                                                       return $curValue;
+                                               }
+                                       }
+
+                                       return [
+                                               'pos' => $lastOkEvent['pos'],
+                                               'id' => $lastOkEvent['id'],
+                                               'ctime' => $curValue ? $curValue['ctime'] : date( 'c' )
+                                       ];
+                               },
+                               IExpiringStore::TTL_INDEFINITE
+                       );
+
+                       $pos = $lastOkEvent['pos'];
+                       $id = $lastOkEvent['id'];
+                       if ( $ok ) {
+                               $this->logger->info( "Updated cache reap position ($pos, $id)." );
+                       } else {
+                               $this->logger->error( "Could not update cache reap position ($pos, $id)." );
+                       }
+               }
+
+               ScopedCallback::consume( $scopeLock );
+
+               return $purgeCount;
+       }
+
+       /**
+        * @return array|bool Returns (pos, id) map or false if not set
+        */
+       public function getState() {
+               $posKey = $this->store->makeGlobalKey( 'WANCache', 'reaper', $this->channel );
+
+               return $this->store->get( $posKey );
+       }
+}