]> scripts.mit.edu Git - autoinstallsdev/mediawiki.git/blob - includes/libs/objectcache/WANObjectCacheReaper.php
MediaWiki 1.30.2
[autoinstallsdev/mediawiki.git] / includes / libs / objectcache / WANObjectCacheReaper.php
1 <?php
2 /**
3  * This program is free software; you can redistribute it and/or modify
4  * it under the terms of the GNU General Public License as published by
5  * the Free Software Foundation; either version 2 of the License, or
6  * (at your option) any later version.
7  *
8  * This program is distributed in the hope that it will be useful,
9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  * GNU General Public License for more details.
12  *
13  * You should have received a copy of the GNU General Public License along
14  * with this program; if not, write to the Free Software Foundation, Inc.,
15  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16  * http://www.gnu.org/copyleft/gpl.html
17  *
18  * @file
19  * @ingroup Cache
20  */
21
22 use Psr\Log\LoggerAwareInterface;
23 use Psr\Log\LoggerInterface;
24 use Psr\Log\NullLogger;
25 use Wikimedia\ScopedCallback;
26
27 /**
28  * Class for scanning through chronological, log-structured data or change logs
29  * and locally purging cache keys related to entities that appear in this data.
30  *
31  * This is useful for repairing cache when purges are missed by using a reliable
32  * stream, such as Kafka or a replicated MySQL table. Purge loss between datacenters
33  * is expected to be more common than within them.
34  *
35  * @since 1.28
36  */
37 class WANObjectCacheReaper implements LoggerAwareInterface {
38         /** @var WANObjectCache */
39         protected $cache;
40         /** @var BagOStuff */
41         protected $store;
42         /** @var callable */
43         protected $logChunkCallback;
44         /** @var callable */
45         protected $keyListCallback;
46         /** @var LoggerInterface */
47         protected $logger;
48
49         /** @var string */
50         protected $channel;
51         /** @var int */
52         protected $initialStartWindow;
53
54         /**
55          * @param WANObjectCache $cache Cache to reap bad keys from
56          * @param BagOStuff $store Cache to store positions use for locking
57          * @param callable $logCallback Callback taking arguments:
58          *          - The starting position as a UNIX timestamp
59          *          - The starting unique ID used for breaking timestamp collisions or null
60          *          - The ending position as a UNIX timestamp
61          *          - The maximum number of results to return
62          *        It returns a list of maps of (key: cache key, pos: UNIX timestamp, id: unique ID)
63          *        for each key affected, with the corrosponding event timestamp/ID information.
64          *        The events should be in ascending order, by (timestamp,id).
65          * @param callable $keyCallback Callback taking arguments:
66          *          - The WANObjectCache instance
67          *          - An object from the event log
68          *        It should return a list of WAN cache keys.
69          *        The callback must fully duck-type test the object, since can be any model class.
70          * @param array $params Additional options:
71          *          - channel: the name of the update event stream.
72          *            Default: WANObjectCache::DEFAULT_PURGE_CHANNEL.
73          *          - initialStartWindow: seconds back in time to start if the position is lost.
74          *            Default: 1 hour.
75          *          - logger: an SPL monolog instance [optional]
76          */
77         public function __construct(
78                 WANObjectCache $cache,
79                 BagOStuff $store,
80                 callable $logCallback,
81                 callable $keyCallback,
82                 array $params
83         ) {
84                 $this->cache = $cache;
85                 $this->store = $store;
86
87                 $this->logChunkCallback = $logCallback;
88                 $this->keyListCallback = $keyCallback;
89                 if ( isset( $params['channel'] ) ) {
90                         $this->channel = $params['channel'];
91                 } else {
92                         throw new UnexpectedValueException( "No channel specified." );
93                 }
94
95                 $this->initialStartWindow = isset( $params['initialStartWindow'] )
96                         ? $params['initialStartWindow']
97                         : 3600;
98                 $this->logger = isset( $params['logger'] )
99                         ? $params['logger']
100                         : new NullLogger();
101         }
102
103         public function setLogger( LoggerInterface $logger ) {
104                 $this->logger = $logger;
105         }
106
107         /**
108          * Check and reap stale keys based on a chunk of events
109          *
110          * @param int $n Number of events
111          * @return int Number of keys checked
112          */
113         final public function invoke( $n = 100 ) {
114                 $posKey = $this->store->makeGlobalKey( 'WANCache', 'reaper', $this->channel );
115                 $scopeLock = $this->store->getScopedLock( "$posKey:busy", 0 );
116                 if ( !$scopeLock ) {
117                         return 0;
118                 }
119
120                 $now = time();
121                 $status = $this->store->get( $posKey );
122                 if ( !$status ) {
123                         $status = [ 'pos' => $now - $this->initialStartWindow, 'id' => null ];
124                 }
125
126                 // Get events for entities who's keys tombstones/hold-off should have expired by now
127                 $events = call_user_func_array(
128                         $this->logChunkCallback,
129                         [ $status['pos'], $status['id'], $now - WANObjectCache::HOLDOFF_TTL - 1, $n ]
130                 );
131
132                 $event = null;
133                 $keyEvents = [];
134                 foreach ( $events as $event ) {
135                         $keys = call_user_func_array(
136                                 $this->keyListCallback,
137                                 [ $this->cache, $event['item'] ]
138                         );
139                         foreach ( $keys as $key ) {
140                                 unset( $keyEvents[$key] ); // use only the latest per key
141                                 $keyEvents[$key] = [
142                                         'pos' => $event['pos'],
143                                         'id' => $event['id']
144                                 ];
145                         }
146                 }
147
148                 $purgeCount = 0;
149                 $lastOkEvent = null;
150                 foreach ( $keyEvents as $key => $keyEvent ) {
151                         if ( !$this->cache->reap( $key, $keyEvent['pos'] ) ) {
152                                 break;
153                         }
154                         ++$purgeCount;
155                         $lastOkEvent = $event;
156                 }
157
158                 if ( $lastOkEvent ) {
159                         $ok = $this->store->merge(
160                                 $posKey,
161                                 function ( $bag, $key, $curValue ) use ( $lastOkEvent ) {
162                                         if ( !$curValue ) {
163                                                 // Use new position
164                                         } else {
165                                                 $curCoord = [ $curValue['pos'], $curValue['id'] ];
166                                                 $newCoord = [ $lastOkEvent['pos'], $lastOkEvent['id'] ];
167                                                 if ( $newCoord < $curCoord ) {
168                                                         // Keep prior position instead of rolling it back
169                                                         return $curValue;
170                                                 }
171                                         }
172
173                                         return [
174                                                 'pos' => $lastOkEvent['pos'],
175                                                 'id' => $lastOkEvent['id'],
176                                                 'ctime' => $curValue ? $curValue['ctime'] : date( 'c' )
177                                         ];
178                                 },
179                                 IExpiringStore::TTL_INDEFINITE
180                         );
181
182                         $pos = $lastOkEvent['pos'];
183                         $id = $lastOkEvent['id'];
184                         if ( $ok ) {
185                                 $this->logger->info( "Updated cache reap position ($pos, $id)." );
186                         } else {
187                                 $this->logger->error( "Could not update cache reap position ($pos, $id)." );
188                         }
189                 }
190
191                 ScopedCallback::consume( $scopeLock );
192
193                 return $purgeCount;
194         }
195
196         /**
197          * @return array|bool Returns (pos, id) map or false if not set
198          */
199         public function getState() {
200                 $posKey = $this->store->makeGlobalKey( 'WANCache', 'reaper', $this->channel );
201
202                 return $this->store->get( $posKey );
203         }
204 }