Proof of concept parallel processing with Danga Gearman and PEAR Net_Gearman.
author Tim Starling <tstarling@users.mediawiki.org>
Wed, 25 Feb 2009 13:04:23 +0000 (13:04 +0000)
committer Tim Starling <tstarling@users.mediawiki.org>
Wed, 25 Feb 2009 13:04:23 +0000 (13:04 +0000)
maintenance/gearman/gearman.inc [new file with mode: 0644]
maintenance/gearman/gearmanRefreshLinks.php [new file with mode: 0644]
maintenance/gearman/gearmanWorker.php [new file with mode: 0644]

diff --git a/maintenance/gearman/gearman.inc b/maintenance/gearman/gearman.inc
new file mode 100644 (file)
index 0000000..cafef90
--- /dev/null
@@ -0,0 +1,91 @@
+<?php
+
+require( 'Net/Gearman/Client.php' );
+require( 'Net/Gearman/Worker.php' );
+
+/**
+ * Gearman job handler which runs a MediaWiki job described by an array of
+ * parameters. The keys read here are 'wiki', 'title', 'command' and 'params'.
+ */
+class MWGearmanJob extends Net_Gearman_Job_Common {
+       /**
+        * Replace the current process with a new PHP process running the same
+        * script for another wiki.
+        *
+        * The script named by $_SERVER['PHP_SELF'] is re-executed with --wiki and
+        * with the job handed over in serialized form via --fake-job, followed by
+        * the arguments in $GLOBALS['args']. pcntl_exec() only returns when the
+        * exec failed, so on success this method never returns.
+        *
+        * @param $wiki String: ID of the wiki to switch to
+        * @param $jobParams Array: job parameters to pass to the new process
+        * @throws Net_Gearman_Job_Exception if the new process could not be started
+        */
+       public function switchWiki( $wiki, $jobParams ) {
+               echo "Switching to $wiki\n";
+               // Path of the PHP binary which is running this process (Linux /proc)
+               $php = readlink( '/proc/' . posix_getpid() . '/exe' );
+               $args = array( $_SERVER['PHP_SELF'],
+                       '--wiki', $wiki,
+                       '--fake-job', serialize( $jobParams ) );
+               $args = array_merge( $args, $GLOBALS['args'] );
+               pcntl_exec( $php, $args, $_ENV );
+
+               // Only reached if the exec failed. Do not let the caller carry on and
+               // run the job in this process, which is still set up for another wiki.
+               echo "Error running exec\n";
+               throw new Net_Gearman_Job_Exception( "Unable to switch to wiki $wiki" );
+       }
+
+       /**
+        * Run the job, switching to the wiki named in $params['wiki'] first if it
+        * is not the wiki this process was started for.
+        *
+        * @param $params Array: job parameters, see runNoSwitch()
+        * @return the result of the MediaWiki job's run() method
+        */
+       public function run( $params ) {
+               if ( wfWikiID() !== $params['wiki'] ) {
+                       $this->switchWiki( $params['wiki'], $params );
+               }
+               // Pass the job result back so that the worker can report it
+               return self::runNoSwitch( $params );
+       }
+
+       /**
+        * Run the job on the current wiki.
+        *
+        * @param $params Array: 'title' is the text form of the page title,
+        *   'command' and 'params' are passed to Job::factory()
+        * @return the result of the MediaWiki job's run() method
+        */
+       public static function runNoSwitch( $params ) {
+               echo implode( ' ', $params ) . "\n";
+               $title = Title::newFromText( $params['title'] );
+               $mwJob = Job::factory( $params['command'], $title, $params['params'] );
+               return $mwJob->run();
+       }
+}
+
+class NonScaryGearmanWorker extends Net_Gearman_Worker {
+       
+       /**
+        * Copied from Net_Gearman_Worker but with the scary "run any PHP file in 
+        * the filesystem" feature removed.
+        *
+        * Requests one job over the given socket and, if one is assigned, runs it
+        * with MWGearmanJob. Only the function name 'mw_job' is accepted.
+        *
+        * @param $socket socket handed to Net_Gearman_Connection for sending the
+        *   request and reading the response
+        * @return bool false if the response was 'noop' or 'no_job', true after an
+        *   assigned job has been run, whether it completed or failed
+        * @throws Net_Gearman_Exception if the response is neither of those nor
+        *   'job_assign'
+        * @throws Net_Gearman_Job_Exception if the assigned function is not
+        *   'mw_job'; this is thrown before the try block below, so it is not
+        *   caught here and no failure is reported for the job
+        */
+       protected function doWork($socket) {
+               Net_Gearman_Connection::send($socket, 'grab_job'); // request a job
+
+               $resp = array('function' => 'noop'); // dummy value so the loop reads at least once
+               while (count($resp) && $resp['function'] == 'noop') { // skip 'noop' responses; an empty response also ends the loop
+                       $resp = Net_Gearman_Connection::blockingRead($socket);
+               } 
+
+               if (in_array($resp['function'], array('noop', 'no_job'))) {
+                       return false; // nothing to do
+               }
+
+               if ($resp['function'] != 'job_assign') {
+                       throw new Net_Gearman_Exception('Holy Cow! What are you doing?!');
+               }
+
+               $name   = $resp['data']['func'];
+               $handle = $resp['data']['handle'];
+               $arg    = array(); // default when the job carries no argument data
+
+               if (isset($resp['data']['arg']) && 
+                       Net_Gearman_Connection::stringLength($resp['data']['arg'])) {
+                               $arg = json_decode($resp['data']['arg'], true); // JSON payload decoded to an associative array
+                       }
+
+               ### START MW DIFFERENT BIT
+               if ( $name != 'mw_job' ) {
+                       throw new Net_Gearman_Job_Exception('Invalid function');
+               }
+               $job = new MWGearmanJob($socket, $handle);
+               ### END MW DIFFERENT BIT
+
+               try {
+                       $this->start($handle, $name, $arg);
+                       $res = $job->run($arg); 
+                       if (!is_array($res)) {
+                               $res = array('result' => $res); // wrap non-array results in an array
+                       }
+
+                       $job->complete($res);
+                       $this->complete($handle, $name, $res);
+               } catch (Net_Gearman_Job_Exception $e) {
+                       $job->fail(); 
+                       $this->fail($handle, $name, $e); 
+               }
+
+               // Force the job's destructor to run
+               $job = null;
+
+               return true;
+       }
+}
+
diff --git a/maintenance/gearman/gearmanRefreshLinks.php b/maintenance/gearman/gearmanRefreshLinks.php
new file mode 100644 (file)
index 0000000..dc11fb4
--- /dev/null
@@ -0,0 +1,26 @@
+<?php
+
+// Proof of concept client: submits a 'refreshLinks' job for the first two
+// rows of the page table to the Gearman function 'mw_job'.
+
+// Presumably read by commandLine.inc to learn which options take a value
+$optionsWithArgs = array( 'fake-job' );
+
+require( dirname(__FILE__).'/../commandLine.inc' );
+require( dirname(__FILE__).'/gearman.inc' );
+
+// $args is passed to the client as its server list; default to localhost
+if ( !$args ) {
+       $args = array( 'localhost' );
+}
+$client = new Net_Gearman_Client( $args );
+
+// __METHOD__ is an empty string outside of a function or method, so give
+// the query an explicit caller name instead
+$fname = 'gearmanRefreshLinks.php';
+
+$dbr = wfGetDB( DB_SLAVE );
+$res = $dbr->select( 'page', array( 'page_namespace', 'page_title' ), false,
+       $fname, array( 'LIMIT' => 2 ) );
+foreach ( $res as $row ) {
+       $title = Title::makeTitle( $row->page_namespace, $row->page_title );
+       // These keys are the ones read by MWGearmanJob on the worker side
+       $params = array(
+               'wiki' => wfWikiID(),
+               'title' => $title->getPrefixedDBkey(),
+               'command' => 'refreshLinks',
+               'params' => false,
+       );
+       $client->mw_job( $params );
+}
+
diff --git a/maintenance/gearman/gearmanWorker.php b/maintenance/gearman/gearmanWorker.php
new file mode 100644 (file)
index 0000000..1e2f384
--- /dev/null
@@ -0,0 +1,19 @@
+<?php
+
+// Proof of concept worker: runs 'mw_job' jobs received from Gearman.
+
+// Presumably read by commandLine.inc to learn which options take a value
+$optionsWithArgs = array( 'fake-job' );
+require( dirname(__FILE__).'/../commandLine.inc' );
+require( dirname(__FILE__).'/gearman.inc' );
+
+// $args is passed to the worker as its server list; default to localhost
+if ( !$args ) {
+       $args = array( 'localhost' );
+}
+
+// --fake-job carries a serialized job from MWGearmanJob::switchWiki(), which
+// re-executes this script for another wiki. Run that job before taking new ones.
+if ( isset( $options['fake-job'] ) ) {
+       // NOTE(review): unserialize() is only safe on data this script generated
+       // itself; never pass a value from an untrusted source in --fake-job.
+       $params = unserialize( $options['fake-job'] );
+       if ( is_array( $params ) ) {
+               MWGearmanJob::runNoSwitch( $params );
+       } else {
+               // unserialize() returns false for malformed input
+               echo "Invalid --fake-job parameter, ignoring\n";
+       }
+}
+
+$worker = new NonScaryGearmanWorker( $args );
+$worker->addAbility( 'mw_job' );
+$worker->beginWork();
+