Revert "Made JobQueueFederated no longer need "checkDelay" for delaying"
[lhc/web/wiklou.git] / includes / jobqueue / JobQueue.php
index c858e42..1a730d3 100644 (file)
@@ -49,6 +49,8 @@ abstract class JobQueue {
 
        /** @var BagOStuff */
        protected $dupCache;
+       /** @var JobQueueAggregator */
+       protected $aggr;
 
        const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions
 
@@ -76,6 +78,9 @@ abstract class JobQueue {
                        throw new MWException( __CLASS__ . " does not support delayed jobs." );
                }
                $this->dupCache = wfGetCache( CACHE_ANYTHING );
+               $this->aggr = isset( $params['aggregator'] )
+                       ? $params['aggregator']
+                       : new JobQueueAggregatorNull( array() );
        }
 
        /**
@@ -187,9 +192,7 @@ abstract class JobQueue {
         * @throws JobQueueError
         */
        final public function isEmpty() {
-               wfProfileIn( __METHOD__ );
                $res = $this->doIsEmpty();
-               wfProfileOut( __METHOD__ );
 
                return $res;
        }
@@ -210,9 +213,7 @@ abstract class JobQueue {
         * @throws JobQueueError
         */
        final public function getSize() {
-               wfProfileIn( __METHOD__ );
                $res = $this->doGetSize();
-               wfProfileOut( __METHOD__ );
 
                return $res;
        }
@@ -233,9 +234,7 @@ abstract class JobQueue {
         * @throws JobQueueError
         */
        final public function getAcquiredCount() {
-               wfProfileIn( __METHOD__ );
                $res = $this->doGetAcquiredCount();
-               wfProfileOut( __METHOD__ );
 
                return $res;
        }
@@ -257,9 +256,7 @@ abstract class JobQueue {
         * @since 1.22
         */
        final public function getDelayedCount() {
-               wfProfileIn( __METHOD__ );
                $res = $this->doGetDelayedCount();
-               wfProfileOut( __METHOD__ );
 
                return $res;
        }
@@ -282,9 +279,7 @@ abstract class JobQueue {
         * @throws JobQueueError
         */
        final public function getAbandonedCount() {
-               wfProfileIn( __METHOD__ );
                $res = $this->doGetAbandonedCount();
-               wfProfileOut( __METHOD__ );
 
                return $res;
        }
@@ -308,7 +303,8 @@ abstract class JobQueue {
         * @throws JobQueueError
         */
        final public function push( $jobs, $flags = 0 ) {
-               $this->batchPush( is_array( $jobs ) ? $jobs : array( $jobs ), $flags );
+               $jobs = is_array( $jobs ) ? $jobs : array( $jobs );
+               $this->batchPush( $jobs, $flags );
        }
 
        /**
@@ -323,7 +319,7 @@ abstract class JobQueue {
         */
        final public function batchPush( array $jobs, $flags = 0 ) {
                if ( !count( $jobs ) ) {
-                       return true; // nothing to do
+                       return; // nothing to do
                }
 
                foreach ( $jobs as $job ) {
@@ -336,15 +332,14 @@ abstract class JobQueue {
                        }
                }
 
-               wfProfileIn( __METHOD__ );
                $this->doBatchPush( $jobs, $flags );
-               wfProfileOut( __METHOD__ );
+               $this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );
        }
 
        /**
         * @see JobQueue::batchPush()
         * @param array $jobs
-        * @param array $flags
+        * @param int $flags
         */
        abstract protected function doBatchPush( array $jobs, $flags );
 
@@ -366,9 +361,11 @@ abstract class JobQueue {
                        throw new MWException( "Unrecognized job type '{$this->type}'." );
                }
 
-               wfProfileIn( __METHOD__ );
                $job = $this->doPop();
-               wfProfileOut( __METHOD__ );
+
+               if ( !$job ) {
+                       $this->aggr->notifyQueueEmpty( $this->wiki, $this->type );
+               }
 
                // Flag this job as an old duplicate based on its "root" job...
                try {
@@ -376,7 +373,7 @@ abstract class JobQueue {
                                JobQueue::incrStats( 'job-pop-duplicate', $this->type, 1, $this->wiki );
                                $job = DuplicateJob::newFromJob( $job ); // convert to a no-op
                        }
-               } catch ( MWException $e ) {
+               } catch ( Exception $e ) {
                        // don't lose jobs over this
                }
 
@@ -403,9 +400,7 @@ abstract class JobQueue {
                if ( $job->getType() !== $this->type ) {
                        throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
                }
-               wfProfileIn( __METHOD__ );
                $this->doAck( $job );
-               wfProfileOut( __METHOD__ );
        }
 
        /**
@@ -449,9 +444,7 @@ abstract class JobQueue {
                if ( $job->getType() !== $this->type ) {
                        throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
                }
-               wfProfileIn( __METHOD__ );
                $ok = $this->doDeduplicateRootJob( $job );
-               wfProfileOut( __METHOD__ );
 
                return $ok;
        }
@@ -494,9 +487,7 @@ abstract class JobQueue {
                if ( $job->getType() !== $this->type ) {
                        throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
                }
-               wfProfileIn( __METHOD__ );
                $isDuplicate = $this->doIsRootJobOldDuplicate( $job );
-               wfProfileOut( __METHOD__ );
 
                return $isDuplicate;
        }
@@ -538,9 +529,7 @@ abstract class JobQueue {
         * @return void
         */
        final public function delete() {
-               wfProfileIn( __METHOD__ );
                $this->doDelete();
-               wfProfileOut( __METHOD__ );
        }
 
        /**
@@ -560,9 +549,7 @@ abstract class JobQueue {
         * @throws JobQueueError
         */
        final public function waitForBackups() {
-               wfProfileIn( __METHOD__ );
                $this->doWaitForBackups();
-               wfProfileOut( __METHOD__ );
        }
 
        /**
@@ -607,9 +594,7 @@ abstract class JobQueue {
         * @return void
         */
        final public function flushCaches() {
-               wfProfileIn( __METHOD__ );
                $this->doFlushCaches();
-               wfProfileOut( __METHOD__ );
        }
 
        /**
@@ -661,7 +646,6 @@ abstract class JobQueue {
         * @since 1.22
         */
        final public function getSiblingQueuesWithJobs( array $types ) {
-               $section = new ProfileSection( __METHOD__ );
 
                return $this->doGetSiblingQueuesWithJobs( $types );
        }
@@ -686,7 +670,6 @@ abstract class JobQueue {
         * @since 1.22
         */
        final public function getSiblingQueueSizes( array $types ) {
-               $section = new ProfileSection( __METHOD__ );
 
                return $this->doGetSiblingQueueSizes( $types );
        }