* @return bool
*/
protected function doBatchPush( array $jobs, $flags ) {
- if ( count( $jobs ) ) {
- list( $dbw, $scope ) = $this->getMasterDB();
-
- $rowSet = array(); // (sha1 => job) map for jobs that are de-duplicated
- $rowList = array(); // list of jobs for jobs that are are not de-duplicated
-
- foreach ( $jobs as $job ) {
- $row = $this->insertFields( $job );
- if ( $job->ignoreDuplicates() ) {
- $rowSet[$row['job_sha1']] = $row;
- } else {
- $rowList[] = $row;
- }
+ list( $dbw, $scope ) = $this->getMasterDB();
+
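+ // Capture $this in a local variable; PHP 5.3 closures cannot bind $this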
+ $that = $this;
+ $method = __METHOD__;
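+ // Delay the insertions until any pending transaction on this connection
+ // commits; if none is pending, onTransactionIdle() runs the callback at once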
+ $dbw->onTransactionIdle(
+ function() use ( $dbw, $that, $jobs, $flags, $method, $scope ) {
+ $that->doBatchPushInternal( $dbw, $jobs, $flags, $method );
}
+ );
- $key = $this->getCacheKey( 'empty' );
- $atomic = ( $flags & self::QOS_ATOMIC );
- $cache = $this->cache;
- $method = __METHOD__;
+ return true;
+ }
- $dbw->onTransactionIdle(
- function() use ( $dbw, $cache, $rowSet, $rowList, $atomic, $key, $method, $scope
- ) {
- if ( $atomic ) {
- $dbw->begin( $method ); // wrap all the job additions in one transaction
- }
- try {
- // Strip out any duplicate jobs that are already in the queue...
- if ( count( $rowSet ) ) {
- $res = $dbw->select( 'job', 'job_sha1',
- array(
- // No job_type condition since it's part of the job_sha1 hash
- 'job_sha1' => array_keys( $rowSet ),
- 'job_token' => '' // unclaimed
- ),
- $method
- );
- foreach ( $res as $row ) {
- wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate." );
- unset( $rowSet[$row->job_sha1] ); // already enqueued
- }
- }
- // Build the full list of job rows to insert
- $rows = array_merge( $rowList, array_values( $rowSet ) );
- // Insert the job rows in chunks to avoid slave lag...
- foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
- $dbw->insert( 'job', $rowBatch, $method );
- }
- wfIncrStats( 'job-insert', count( $rows ) );
- wfIncrStats( 'job-insert-duplicate',
- count( $rowSet ) + count( $rowList ) - count( $rows ) );
- } catch ( DBError $e ) {
- if ( $atomic ) {
- $dbw->rollback( $method );
- }
- throw $e;
- }
- if ( $atomic ) {
- $dbw->commit( $method );
- }
+ /**
+ * This function should *not* be called outside of JobQueueDB. It is public
+ * only so that the closure in doBatchPush() can invoke it on PHP 5.3.
+ *
+ * @param DatabaseBase $dbw
+ * @param array $jobs
+ * @param int $flags
+ * @param string $method
+ * @return bool
+ * @throws DBError
+ */
+ public function doBatchPushInternal( DatabaseBase $dbw, array $jobs, $flags, $method ) {
+ if ( !count( $jobs ) ) {
+ return true;
+ }
- $cache->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG );
- } );
+ $rowSet = array(); // (sha1 => row) map for jobs that are de-duplicated
+ $rowList = array(); // list of rows for jobs that are not de-duplicated
+ foreach ( $jobs as $job ) {
+ $row = $this->insertFields( $job );
+ if ( $job->ignoreDuplicates() ) {
+ $rowSet[$row['job_sha1']] = $row;
+ } else {
+ $rowList[] = $row;
+ }
+ }
+
+ if ( $flags & self::QOS_ATOMIC ) {
+ $dbw->begin( $method ); // wrap all the job additions in one transaction
+ }
+ try {
+ // Strip out any duplicate jobs that are already in the queue...
+ if ( count( $rowSet ) ) {
+ $res = $dbw->select( 'job', 'job_sha1',
+ array(
+ // No job_type condition since it's part of the job_sha1 hash
+ 'job_sha1' => array_keys( $rowSet ),
+ 'job_token' => '' // unclaimed
+ ),
+ $method
+ );
+ foreach ( $res as $row ) {
+ wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate.\n" );
+ unset( $rowSet[$row->job_sha1] ); // already enqueued
+ }
+ }
+ // Build the full list of job rows to insert
+ $rows = array_merge( $rowList, array_values( $rowSet ) );
+ // Insert the job rows in chunks to avoid slave lag...
+ foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
+ $dbw->insert( 'job', $rowBatch, $method );
+ }
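+ // Anything removed from $rowSet above was already enqueued, so the
+ // difference between the submitted rows and $rows is the duplicate count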
+ JobQueue::incrStats( 'job-insert', $this->type, count( $rows ) );
+ JobQueue::incrStats( 'job-insert-duplicate', $this->type,
+ count( $rowSet ) + count( $rowList ) - count( $rows ) );
+ } catch ( DBError $e ) {
+ if ( $flags & self::QOS_ATOMIC ) {
+ $dbw->rollback( $method );
+ }
+ throw $e;
+ }
+ if ( $flags & self::QOS_ATOMIC ) {
+ $dbw->commit( $method );
}
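+ // Jobs were just added, so flag the queue as non-empty in the shared cache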
+ $this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG );
+
return true;
}
$this->cache->set( $this->getCacheKey( 'empty' ), 'true', self::CACHE_TTL_LONG );
break; // nothing to do
}
- wfIncrStats( 'job-pop' );
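+ // Use JobQueue::incrStats() so the counter is also broken down by job type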
+ JobQueue::incrStats( 'job-pop', $this->type );
// Get the job object from the row...
$title = Title::makeTitleSafe( $row->job_namespace, $row->job_title );
if ( !$title ) {
__METHOD__
);
$count += $dbw->affectedRows();
- wfIncrStats( 'job-recycle', $dbw->affectedRows() );
+ JobQueue::incrStats( 'job-recycle', $this->type, $dbw->affectedRows() );
$this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG );
}
}
wfDebugLog( 'JobQueueRedis', "Could not insert {$failed} {$this->type} job(s)." );
return false;
}
- wfIncrStats( 'job-insert', count( $items ) );
- wfIncrStats( 'job-insert-duplicate', count( $items ) - $failed - $pushed );
+ JobQueue::incrStats( 'job-insert', $this->type, count( $items ) );
+ JobQueue::incrStats( 'job-insert-duplicate', $this->type,
+ count( $items ) - $failed - $pushed );
} catch ( RedisException $e ) {
$this->throwRedisException( $this->server, $conn, $e );
}
break; // no jobs; nothing to do
}
- wfIncrStats( 'job-pop' );
+ JobQueue::incrStats( 'job-pop', $this->type );
$item = unserialize( $blob );
if ( $item === false ) {
wfDebugLog( 'JobQueueRedis', "Could not unserialize {$this->type} job." );
if ( $res ) {
list( $released, $abandoned, $pruned ) = $res;
$count += $released + $pruned;
- wfIncrStats( 'job-recycle', count( $released ) );
+ JobQueue::incrStats( 'job-recycle', $this->type, $released );
}
} catch ( RedisException $e ) {
$this->throwRedisException( $this->server, $conn, $e );