Create JobQueueEnqueueUpdate class to call JobQueueGroup::pushLazyJobs()
authorAaron Schulz <aschulz@wikimedia.org>
Wed, 24 Oct 2018 19:28:02 +0000 (12:28 -0700)
committerAaron Schulz <aschulz@wikimedia.org>
Sun, 28 Oct 2018 22:19:06 +0000 (22:19 +0000)
This assures that MergeableUpdate tasks that lazy push job will actually
have those jobs run instead of being added after the lone callback update
to call JobQueueGroup::pushLazyJobs() already ran.

This also makes it more obvious that push will happen, since a mergeable
update is added each time lazyPush() is called and a job is buffered,
rather than rely on some magic callback enqueued into DeferredUpdates at
just the right point in multiple entry points.

Bug: T207809
Change-Id: I13382ef4a17a9ba0fd3f9964b8c62f564e47e42d

autoload.php
includes/MediaWiki.php
includes/deferred/JobQueueEnqueueUpdate.php [new file with mode: 0644]
includes/jobqueue/JobQueueGroup.php
includes/jobqueue/JobRunner.php

index f951ce9..6261f1a 100644 (file)
@@ -706,6 +706,7 @@ $wgAutoloadLocalClasses = [
        'JobQueueAggregatorRedis' => __DIR__ . '/includes/jobqueue/aggregator/JobQueueAggregatorRedis.php',
        'JobQueueConnectionError' => __DIR__ . '/includes/jobqueue/JobQueue.php',
        'JobQueueDB' => __DIR__ . '/includes/jobqueue/JobQueueDB.php',
+       'JobQueueEnqueueUpdate' => __DIR__ . '/includes/deferred/JobQueueEnqueueUpdate.php',
        'JobQueueError' => __DIR__ . '/includes/jobqueue/JobQueue.php',
        'JobQueueFederated' => __DIR__ . '/includes/jobqueue/JobQueueFederated.php',
        'JobQueueGroup' => __DIR__ . '/includes/jobqueue/JobQueueGroup.php',
index bc57637..4b84fbd 100644 (file)
@@ -899,9 +899,6 @@ class MediaWiki {
                        __METHOD__
                );
 
-               // Important: this must be the last deferred update added (T100085, T154425)
-               DeferredUpdates::addCallableUpdate( [ JobQueueGroup::class, 'pushLazyJobs' ] );
-
                // Do any deferred jobs; preferring to run them now if a client will not wait on them
                DeferredUpdates::doUpdates( $blocksHttpClient ? 'enqueue' : 'run' );
 
diff --git a/includes/deferred/JobQueueEnqueueUpdate.php b/includes/deferred/JobQueueEnqueueUpdate.php
new file mode 100644 (file)
index 0000000..1691da2
--- /dev/null
@@ -0,0 +1,64 @@
+<?php
+/**
+ * Handler for triggering the enqueuing of lazy-pushed jobs
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ */
+
+use Wikimedia\Assert\Assert;
+
+/**
+ * Enqueue lazy-pushed jobs that have accumulated from JobQueueGroup
+ *
+ * @ingroup JobQueue
+ * @since 1.33
+ */
+class JobQueueEnqueueUpdate implements DeferrableUpdate, MergeableUpdate {
+       /** @var array[] Map of (domain ID => IJobSpecification[]) */
+       private $jobsByDomain;
+
+       /**
+        * @param string $domain DB domain ID
+        * @param IJobSpecification[] $jobs
+        */
+       public function __construct( $domain, array $jobs ) {
+               $this->jobsByDomain[$domain] = $jobs;
+       }
+
+       public function merge( MergeableUpdate $update ) {
+               /** @var JobQueueEnqueueUpdate $update */
+               Assert::parameterType( __CLASS__, $update, '$update' );
+
+               foreach ( $update->jobsByDomain as $domain => $jobs ) {
+                       $this->jobsByDomain[$domain] = $this->jobsByDomain[$domain] ?? [];
+                       $this->jobsByDomain[$domain] = array_merge( $this->jobsByDomain[$domain], $jobs );
+               }
+       }
+
+       public function doUpdate() {
+               foreach ( $this->jobsByDomain as $domain => $jobs ) {
+                       $group = JobQueueGroup::singleton( $domain );
+                       try {
+                               $group->push( $jobs );
+                       } catch ( Exception $e ) {
+                               // Get in as many jobs as possible and let other post-send updates happen
+                               MWExceptionHandler::logException( $e );
+                       }
+               }
+       }
+}
index 820c492..06646a5 100644 (file)
@@ -43,9 +43,6 @@ class JobQueueGroup {
        /** @var array Map of (bucket => (queue => JobQueue, types => list of types) */
        protected $coalescedQueues;
 
-       /** @var Job[] */
-       protected $bufferedJobs = [];
-
        const TYPE_DEFAULT = 1; // integer; jobs popped by default
        const TYPE_ANY = 2; // integer; any job
 
@@ -203,7 +200,7 @@ class JobQueueGroup {
                // Throw errors now instead of on push(), when other jobs may be buffered
                $this->assertValidJobs( $jobs );
 
-               $this->bufferedJobs = array_merge( $this->bufferedJobs, $jobs );
+               DeferredUpdates::addUpdate( new JobQueueEnqueueUpdate( $this->wiki, $jobs ) );
        }
 
        /**
@@ -211,17 +208,10 @@ class JobQueueGroup {
         *
         * @return void
         * @since 1.26
+        * @deprecated Since 1.33 Not needed anymore
         */
        public static function pushLazyJobs() {
-               foreach ( self::$instances as $group ) {
-                       try {
-                               $group->push( $group->bufferedJobs );
-                               $group->bufferedJobs = [];
-                       } catch ( Exception $e ) {
-                               // Get in as many jobs as possible and let other post-send updates happen
-                               MWExceptionHandler::logException( $e );
-                       }
-               }
+               wfDeprecated( __METHOD__, '1.33' );
        }
 
        /**
@@ -464,12 +454,4 @@ class JobQueueGroup {
                        }
                }
        }
-
-       function __destruct() {
-               $n = count( $this->bufferedJobs );
-               if ( $n > 0 ) {
-                       $type = implode( ', ', array_unique( array_map( 'get_class', $this->bufferedJobs ) ) );
-                       trigger_error( __METHOD__ . ": $n buffered job(s) of type(s) $type never inserted." );
-               }
-       }
 }
index 39b5b3b..676659f 100644 (file)
@@ -290,8 +290,6 @@ class JobRunner implements LoggerAwareInterface {
                        $status = $job->run();
                        $error = $job->getLastError();
                        $this->commitMasterChanges( $lbFactory, $job, $fnameTrxOwner );
-                       // Important: this must be the last deferred update added (T100085, T154425)
-                       DeferredUpdates::addCallableUpdate( [ JobQueueGroup::class, 'pushLazyJobs' ] );
                        // Run any deferred update tasks; doUpdates() manages transactions itself
                        DeferredUpdates::doUpdates();
                } catch ( Exception $e ) {