* Submit jobs in batches and wait for each batch, to avoid overflowing the queue
[lhc/web/wiklou.git] / maintenance / gearman / gearman.inc
1 <?php
2
3 require( 'Net/Gearman/Client.php' );
4 require( 'Net/Gearman/Worker.php' );
5
6 class MWGearmanJob extends Net_Gearman_Job_Common {
7 function switchWiki( $wiki, $params ) {
8 echo "Switching to $wiki\n";
9
10 # Pretend that we have completed it right now, because the new process won't do it
11 $this->complete( array( 'result' => true ) );
12 socket_close( $this->conn );
13
14 # Find PHP
15 $php = readlink( '/proc/' . posix_getpid() . '/exe' );
16
17 # Run the worker script
18 $args = array( $_SERVER['PHP_SELF'],
19 '--wiki', $wiki,
20 '--fake-job', serialize( $params ) );
21 $args = array_merge( $args, $GLOBALS['args'] );
22 pcntl_exec( $php, $args, $_ENV );
23 echo "Error running exec\n";
24 }
25
26 function run( $params ) {
27 if ( wfWikiID() !== $params['wiki'] ) {
28 $this->switchWiki( $params['wiki'], $params );
29 }
30 return self::runNoSwitch( $params );
31 }
32
33 static function runNoSwitch( $params ) {
34 echo implode( ' ', $params ) . "\n";
35 $title = Title::newFromText( $params['title'] );
36 $mwJob = Job::factory( $params['command'], $title, $params['params'] );
37 return $mwJob->run();
38 }
39 }
40
41 class NonScaryGearmanWorker extends Net_Gearman_Worker {
42
43 /**
44 * Copied from Net_Gearman_Worker but with the scary "run any PHP file in
45 * the filesystem" feature removed.
46 */
47 protected function doWork($socket) {
48 Net_Gearman_Connection::send($socket, 'grab_job');
49
50 $resp = array('function' => 'noop');
51 while (count($resp) && $resp['function'] == 'noop') {
52 $resp = Net_Gearman_Connection::blockingRead($socket);
53 }
54
55 if (in_array($resp['function'], array('noop', 'no_job'))) {
56 return false;
57 }
58
59 if ($resp['function'] != 'job_assign') {
60 throw new Net_Gearman_Exception('Holy Cow! What are you doing?!');
61 }
62
63 $name = $resp['data']['func'];
64 $handle = $resp['data']['handle'];
65 $arg = array();
66
67 if (isset($resp['data']['arg']) &&
68 Net_Gearman_Connection::stringLength($resp['data']['arg'])) {
69 $arg = json_decode($resp['data']['arg'], true);
70 }
71
72 ### START MW DIFFERENT BIT
73 if ( $name != 'mw_job' ) {
74 throw new Net_Gearman_Job_Exception('Invalid function');
75 }
76 $job = new MWGearmanJob($socket, $handle);
77 ### END MW DIFFERENT BIT
78
79 try {
80 $this->start($handle, $name, $arg);
81 $res = $job->run($arg);
82 if (!is_array($res)) {
83 $res = array('result' => $res);
84 }
85
86 $job->complete($res);
87 $this->complete($handle, $name, $res);
88 } catch (Net_Gearman_Job_Exception $e) {
89 $job->fail();
90 $this->fail($handle, $name, $e);
91 }
92
93 // Force the job's destructor to run
94 $job = null;
95
96 return true;
97 }
98 }
99