wiki = $params['wiki']; $this->type = $params['type']; $this->order = isset( $params['order'] ) ? $params['order'] : 'random'; } /** * Get a job queue object of the specified type. * $params includes: * class : what job class to use (determines job type) * wiki : wiki ID of the wiki the jobs are for (defaults to current wiki) * type : The name of the job types this queue handles * order : Order that pop() selects jobs, either "timestamp" or "random". * If "timestamp" is used, the queue will effectively be FIFO. Note that * pop() will not be exactly FIFO, and even if it was, job completion would * not appear to be exactly FIFO since jobs can take different times to finish. * If "random" is used, pop() will pick jobs in random order. This might be * useful for improving concurrency depending on the queue storage medium. * * @param $params array * @return JobQueue * @throws MWException */ final public static function factory( array $params ) { $class = $params['class']; if ( !MWInit::classExists( $class ) ) { throw new MWException( "Invalid job queue class '$class'." ); } $obj = new $class( $params ); if ( !( $obj instanceof self ) ) { throw new MWException( "Class '$class' is not a " . __CLASS__ . " class." ); } return $obj; } /** * @return string Wiki ID */ final public function getWiki() { return $this->wiki; } /** * @return string Job type that this queue handles */ final public function getType() { return $this->type; } /** * @return bool Quickly check if the queue is empty */ final public function isEmpty() { wfProfileIn( __METHOD__ ); $res = $this->doIsEmpty(); wfProfileOut( __METHOD__ ); return $res; } /** * @see JobQueue::isEmpty() * @return bool */ abstract protected function doIsEmpty(); /** * Push a batch of jobs into the queue * * @param $jobs array List of Jobs * @param $flags integer Bitfield (supports JobQueue::QoS_Atomic) * @return bool */ final public function batchPush( array $jobs, $flags = 0 ) { foreach ( $jobs as $job ) { if ( $job->getType() !== $this->type ) { throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." ); } } wfProfileIn( __METHOD__ ); $ok = $this->doBatchPush( $jobs, $flags ); if ( $ok ) { wfIncrStats( 'job-insert', count( $jobs ) ); } wfProfileOut( __METHOD__ ); return $ok; } /** * @see JobQueue::batchPush() * @return bool */ abstract protected function doBatchPush( array $jobs, $flags ); /** * Pop a job off of the queue * * @return Job|bool Returns false on failure */ final public function pop() { wfProfileIn( __METHOD__ ); $job = $this->doPop(); if ( $job ) { wfIncrStats( 'job-pop' ); } wfProfileOut( __METHOD__ ); return $job; } /** * @see JobQueue::pop() * @return Job */ abstract protected function doPop(); /** * Acknowledge that a job was completed * * @param $job Job * @return bool */ final public function ack( Job $job ) { if ( $job->getType() !== $this->type ) { throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." ); } wfProfileIn( __METHOD__ ); $ok = $this->doAck( $job ); wfProfileOut( __METHOD__ ); return $ok; } /** * @see JobQueue::ack() * @return bool */ abstract protected function doAck( Job $job ); /** * Wait for any slaves or backup servers to catch up * * @return void */ final public function waitForBackups() { wfProfileIn( __METHOD__ ); $this->doWaitForBackups(); wfProfileOut( __METHOD__ ); } /** * @see JobQueue::waitForBackups() * @return void */ protected function doWaitForBackups() {} }