]> scripts.mit.edu Git - autoinstalls/mediawiki.git/blob - includes/OrderedStreamingForkController.php
MediaWiki 1.30.2-scripts2
[autoinstalls/mediawiki.git] / includes / OrderedStreamingForkController.php
1 <?php
2
3 /**
4  * Reads lines of work from an input stream and farms them out to multiple
5  * child streams. Each child has exactly one piece of work in flight at a given
6  * moment. Writes the result of child's work to an output stream. If numProcs
7  * <= zero the work will be performed in process.
8  *
9  * This class amends ForkController with the requirement that the output is
10  * produced in the same exact order as input values were.
11  *
12  * Currently used by CirrusSearch extension to implement CLI search script.
13  *
14  * @ingroup Maintenance
15  * @since 1.30
16  *
17  * This program is free software; you can redistribute it and/or modify
18  * it under the terms of the GNU General Public License as published by
19  * the Free Software Foundation; either version 2 of the License, or
20  * (at your option) any later version.
21  *
22  * This program is distributed in the hope that it will be useful,
23  * but WITHOUT ANY WARRANTY; without even the implied warranty of
24  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
25  * GNU General Public License for more details.
26  *
27  * You should have received a copy of the GNU General Public License along
28  * with this program; if not, write to the Free Software Foundation, Inc.,
29  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
30  * http://www.gnu.org/copyleft/gpl.html
31  */
32 class OrderedStreamingForkController extends ForkController {
33         /** @var callable */
34         protected $workCallback;
35         /** @var resource */
36         protected $input;
37         /** @var resource */
38         protected $output;
39         /** @var int */
40         protected $nextOutputId;
41         /** @var string[] Int key indicates order, value is data */
42         protected $delayedOutputData = [];
43
44         /**
45          * @param int $numProcs The number of worker processes to fork
46          * @param callable $workCallback A callback to call in the child process
47          *  once for each line of work to process.
48          * @param resource $input A socket to read work lines from
49          * @param resource $output A socket to write the result of work to.
50          */
51         public function __construct( $numProcs, $workCallback, $input, $output ) {
52                 parent::__construct( $numProcs );
53                 $this->workCallback = $workCallback;
54                 $this->input = $input;
55                 $this->output = $output;
56         }
57
58         /**
59          * @inheritDoc
60          */
61         public function start() {
62                 if ( $this->procsToStart > 0 ) {
63                         $status = parent::start();
64                         if ( $status === 'child' ) {
65                                 $this->consume();
66                         }
67                 } else {
68                         $status = 'parent';
69                         $this->consumeNoFork();
70                 }
71                 return $status;
72         }
73
74         /**
75          * @param int $numProcs
76          * @return string
77          */
78         protected function forkWorkers( $numProcs ) {
79                 $this->prepareEnvironment();
80
81                 $childSockets = [];
82                 // Create the child processes
83                 for ( $i = 0; $i < $numProcs; $i++ ) {
84                         $sockets = stream_socket_pair( STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP );
85                         // Do the fork
86                         $pid = pcntl_fork();
87                         if ( $pid === -1 || $pid === false ) {
88                                 echo "Error creating child processes\n";
89                                 exit( 1 );
90                         }
91
92                         if ( !$pid ) {
93                                 $this->initChild();
94                                 $this->childNumber = $i;
95                                 $this->input = $sockets[0];
96                                 $this->output = $sockets[0];
97                                 fclose( $sockets[1] );
98                                 return 'child';
99                         } else {
100                                 // This is the parent process
101                                 $this->children[$pid] = true;
102                                 fclose( $sockets[0] );
103                                 $childSockets[] = $sockets[1];
104                         }
105                 }
106                 $this->feedChildren( $childSockets );
107                 foreach ( $childSockets as $socket ) {
108                         // if a child has already shutdown the sockets will be closed,
109                         // closing a second time would raise a warning.
110                         if ( is_resource( $socket ) ) {
111                                 fclose( $socket );
112                         }
113                 }
114                 return 'parent';
115         }
116
117         /**
118          * Child worker process. Reads work from $this->input and writes the
119          * result of that work to $this->output when completed.
120          */
121         protected function consume() {
122                 while ( !feof( $this->input ) ) {
123                         $line = trim( fgets( $this->input ) );
124                         if ( $line ) {
125                                 list( $id, $data ) = json_decode( $line );
126                                 $result = call_user_func( $this->workCallback, $data );
127                                 fwrite( $this->output, json_encode( [ $id, $result ] ) . "\n" );
128                         }
129                 }
130         }
131
132         /**
133          * Special cased version of self::consume() when no forking occurs
134          */
135         protected function consumeNoFork() {
136                 while ( !feof( $this->input ) ) {
137                         $line = trim( fgets( $this->input ) );
138                         if ( $line ) {
139                                 $result = call_user_func( $this->workCallback, $line );
140                                 fwrite( $this->output, "$result\n" );
141                         }
142                 }
143         }
144
145         /**
146          * Reads lines of work from $this->input and farms them out to
147          * the provided socket.
148          *
149          * @param resource[] $sockets
150          */
151         protected function feedChildren( array $sockets ) {
152                 $used = [];
153                 $id = 0;
154                 $this->nextOutputId = 0;
155
156                 while ( !feof( $this->input ) ) {
157                         $data = fgets( $this->input );
158                         if ( $used ) {
159                                 do {
160                                         $this->updateAvailableSockets( $sockets, $used, $sockets ? 0 : 5 );
161                                 } while ( !$sockets );
162                         }
163                         $data = trim( $data );
164                         if ( !$data ) {
165                                 continue;
166                         }
167                         $socket = array_pop( $sockets );
168                         fwrite( $socket, json_encode( [ $id++, $data ] ) . "\n" );
169                         $used[] = $socket;
170                 }
171                 while ( $used ) {
172                         $this->updateAvailableSockets( $sockets, $used, 5 );
173                 }
174         }
175
176         /**
177          * Moves sockets from $used to $sockets when they are available
178          * for more work
179          *
180          * @param resource[] &$sockets List of sockets that are waiting for work
181          * @param resource[] &$used List of sockets currently performing work
182          * @param int $timeout The number of seconds to block waiting. 0 for
183          *  non-blocking operation.
184          */
185         protected function updateAvailableSockets( &$sockets, &$used, $timeout ) {
186                 $read = $used;
187                 $write = $except = [];
188                 stream_select( $read, $write, $except, $timeout );
189                 foreach ( $read as $socket ) {
190                         $line = fgets( $socket );
191                         list( $id, $data ) = json_decode( trim( $line ) );
192                         $this->receive( (int)$id, $data );
193                         $sockets[] = $socket;
194                         $idx = array_search( $socket, $used );
195                         unset( $used[$idx] );
196                 }
197         }
198
199         /**
200          * @param int $id
201          * @param string $data
202          */
203         protected function receive( $id, $data ) {
204                 if ( $id !== $this->nextOutputId ) {
205                         $this->delayedOutputData[$id] = $data;
206                         return;
207                 }
208                 fwrite( $this->output, $data . "\n" );
209                 $this->nextOutputId = $id + 1;
210                 while ( isset( $this->delayedOutputData[$this->nextOutputId] ) ) {
211                         fwrite( $this->output, $this->delayedOutputData[$this->nextOutputId] . "\n" );
212                         unset( $this->delayedOutputData[$this->nextOutputId] );
213                         $this->nextOutputId++;
214                 }
215         }
216 }