Proof of concept parallel processing with Danga Gearman and PEAR Net_Gearman.
[lhc/web/wiklou.git] / maintenance / gearman / gearman.inc
1 <?php
2
3 require( 'Net/Gearman/Client.php' );
4 require( 'Net/Gearman/Worker.php' );
5
/**
 * Gearman job wrapper that runs one MediaWiki job. If the job belongs to a
 * wiki other than the one this process is configured for, the process
 * re-executes itself for the target wiki and hands the job over.
 */
class MWGearmanJob extends Net_Gearman_Job_Common {
    /**
     * Replace the current process with a new PHP process started with
     * "--wiki $wiki --fake-job <serialized job>", followed by the original
     * script arguments.
     *
     * On success this never returns, because pcntl_exec() replaces the
     * process image.
     *
     * @param string $wiki Wiki ID to switch to
     * @param array $jobParams Job description; passed on in serialized form
     * @throws Net_Gearman_Job_Exception if the new process could not be started
     */
    public function switchWiki( $wiki, $jobParams ) {
        echo "Switching to $wiki\n";
        // Resolve the PHP binary currently running via /proc/<pid>/exe
        $php = readlink( '/proc/' . posix_getpid() . '/exe' );
        $args = array( $_SERVER['PHP_SELF'],
            '--wiki', $wiki,
            '--fake-job', serialize( $jobParams ) );
        $args = array_merge( $args, $GLOBALS['args'] );
        pcntl_exec( $php, $args, $_ENV );

        // Only reached when the exec failed. Abort the job instead of letting
        // the caller carry on and run it against the wrong wiki.
        echo "Error running exec\n";
        throw new Net_Gearman_Job_Exception( "Unable to switch to wiki $wiki" );
    }

    /**
     * Run the job, switching to the right wiki first if necessary.
     *
     * @param array $params Job description with the keys 'wiki', 'title',
     *   'command' and 'params'
     * @return mixed Whatever the MediaWiki job's run() method returned
     * @throws Net_Gearman_Job_Exception if the wiki switch fails or the title
     *   is invalid
     */
    public function run( $params ) {
        if ( wfWikiID() !== $params['wiki'] ) {
            // Does not return: the process is either replaced or an
            // exception is thrown.
            $this->switchWiki( $params['wiki'], $params );
        }
        // Propagate the job result so the worker can report it.
        return self::runNoSwitch( $params );
    }

    /**
     * Run the job in the current process, without checking which wiki is
     * active.
     *
     * @param array $params Job description with the keys 'title', 'command'
     *   and 'params'
     * @return mixed Whatever the MediaWiki job's run() method returned
     * @throws Net_Gearman_Job_Exception if the title text is not a valid title
     */
    public static function runNoSwitch( $params ) {
        echo implode( ' ', $params ) . "\n";
        $title = Title::newFromText( $params['title'] );
        if ( !$title ) {
            // Title::newFromText() yields no object for invalid title text
            throw new Net_Gearman_Job_Exception( 'Invalid title: ' . $params['title'] );
        }
        $mwJob = Job::factory( $params['command'], $title, $params['params'] );
        return $mwJob->run();
    }
}
32
/**
 * Gearman worker that only ever executes the 'mw_job' function, always
 * through MWGearmanJob.
 */
class NonScaryGearmanWorker extends Net_Gearman_Worker {

    /**
     * Copied from Net_Gearman_Worker but with the scary "run any PHP file in
     * the filesystem" feature removed.
     *
     * Asks the server for a job and, if one is assigned, runs it and reports
     * completion or failure.
     *
     * @param resource $socket Connection to the Gearman server
     * @return bool False if there was no job to do, true if a job was handled
     * @throws Net_Gearman_Exception on an unexpected response from the server
     * @throws Net_Gearman_Job_Exception if the assigned function is not 'mw_job'
     */
    protected function doWork($socket) {
        Net_Gearman_Connection::send($socket, 'grab_job');

        $resp = array('function' => 'noop');
        while (count($resp) && $resp['function'] == 'noop') {
            $resp = Net_Gearman_Connection::blockingRead($socket);
        }

        // The loop also ends on an empty response, which has no 'function'
        // key; read it defensively so that case reaches the exception below
        // without an undefined-index error.
        $function = isset($resp['function']) ? $resp['function'] : null;

        if (in_array($function, array('noop', 'no_job'))) {
            return false;
        }

        if ($function != 'job_assign') {
            throw new Net_Gearman_Exception('Holy Cow! What are you doing?!');
        }

        $name = $resp['data']['func'];
        $handle = $resp['data']['handle'];
        $arg = array();

        if (isset($resp['data']['arg']) &&
            Net_Gearman_Connection::stringLength($resp['data']['arg'])) {
            $arg = json_decode($resp['data']['arg'], true);
        }

        ### START MW DIFFERENT BIT
        if ( $name != 'mw_job' ) {
            throw new Net_Gearman_Job_Exception('Invalid function');
        }
        $job = new MWGearmanJob($socket, $handle);
        ### END MW DIFFERENT BIT

        try {
            $this->start($handle, $name, $arg);

            // json_decode() gives null for malformed JSON and a scalar for
            // non-object payloads; the job indexes its argument as an array,
            // so fail the job instead of running it with garbage.
            if (!is_array($arg)) {
                throw new Net_Gearman_Job_Exception('Invalid job arguments');
            }

            $res = $job->run($arg);
            if (!is_array($res)) {
                $res = array('result' => $res);
            }

            $job->complete($res);
            $this->complete($handle, $name, $res);
        } catch (Net_Gearman_Job_Exception $e) {
            $job->fail();
            $this->fail($handle, $name, $e);
        }

        // Force the job's destructor to run
        $job = null;

        return true;
    }
}
91