Removed commented-out deprecated function
[lhc/web/wiklou.git] / includes / job / JobQueue.php
1 <?php
2 /**
3 * @defgroup JobQueue JobQueue
4 */
5
// Abort immediately when this file is requested directly: the MEDIAWIKI
// constant is only expected to exist once the application has been set up.
if ( defined( 'MEDIAWIKI' ) === false ) {
	exit( "This file is part of MediaWiki, it is not a valid entry point\n" );
}
9
/**
 * Class to both describe a background job and handle jobs.
 *
 * An instance describes one unit of deferred work: a command name, the title
 * it applies to and optional parameters. The static methods read from and
 * write to the `job` database table that backs the queue.
 *
 * @ingroup JobQueue
 */
abstract class Job {
	/** Job command name, stored in the job_cmd column */
	public $command;

	/** Title the job applies to (supplies job_namespace and job_title) */
	public $title;

	/** Array of job parameters, or false when there are none */
	public $params;

	/** Job identifier; 0 unless the job was built from a queue row */
	public $id;

	/** Whether identical queued jobs should be collapsed into one */
	public $removeDuplicates;

	/** Last error recorded through setLastError() */
	public $error;

	/*-------------------------------------------------------------------------
	 * Abstract functions
	 *------------------------------------------------------------------------*/

	/**
	 * Run the job
	 * @return boolean success
	 */
	abstract public function run();

	/*-------------------------------------------------------------------------
	 * Static functions
	 *------------------------------------------------------------------------*/

	/**
	 * Pop a job of a certain type. This tries less hard than pop() to
	 * actually find a job; it may be adversely affected by concurrent job
	 * runners.
	 *
	 * @param $type String: Job command to look for
	 * @return Job or false if no job of that type could be claimed
	 */
	public static function pop_type( $type ) {
		wfProfileIn( __METHOD__ );

		$dbw = wfGetDB( DB_MASTER );

		$row = $dbw->selectRow(
			'job',
			'*',
			array( 'job_cmd' => $type ),
			__METHOD__,
			array( 'LIMIT' => 1 )
		);

		if ( $row === false ) {
			wfProfileOut( __METHOD__ );
			return false;
		}

		/* Ensure we "own" this row: only the runner whose delete affects a row gets the job */
		$dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
		$affected = $dbw->affectedRows();

		if ( $affected == 0 ) {
			wfProfileOut( __METHOD__ );
			return false;
		}

		$job = self::newFromRow( $row );

		// Remove any duplicates it may have later in the queue, then commit
		// both deletes together.
		self::deleteDuplicates( $dbw, $job, __METHOD__ );
		$dbw->commit();

		wfProfileOut( __METHOD__ );
		return $job;
	}

	/**
	 * Pop a job off the front of the queue
	 *
	 * @param $offset Integer: Number of jobs to skip
	 * @return Job or false if there's no jobs
	 */
	public static function pop( $offset = 0 ) {
		wfProfileIn( __METHOD__ );

		$dbr = wfGetDB( DB_SLAVE );

		/* Get a job from the slave, start with an offset,
			scan full set afterwards, avoid hitting purged rows

			NB: If random fetch previously was used, offset
				will always be ahead of few entries
		*/

		// The offset ends up in raw SQL, so force it to an integer first.
		$row = $dbr->selectRow( 'job', '*', 'job_id >= ' . intval( $offset ), __METHOD__,
			array( 'ORDER BY' => 'job_id', 'LIMIT' => 1 ) );

		// Refetching without offset is needed as some of job IDs could have had delayed commits
		// and have lower IDs than jobs already executed, blame concurrency :)
		//
		if ( $row === false ) {
			if ( $offset != 0 ) {
				$row = $dbr->selectRow( 'job', '*', '', __METHOD__,
					array( 'ORDER BY' => 'job_id', 'LIMIT' => 1 ) );
			}

			if ( $row === false ) {
				wfProfileOut( __METHOD__ );
				return false;
			}
		}

		// Try to delete it from the master
		$dbw = wfGetDB( DB_MASTER );
		$dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
		$affected = $dbw->affectedRows();
		$dbw->commit();

		if ( !$affected ) {
			// Failed, someone else beat us to it
			// Try getting a random row
			$row = $dbw->selectRow( 'job', array( 'MIN(job_id) as minjob',
				'MAX(job_id) as maxjob' ), '1=1', __METHOD__ );
			if ( $row === false || is_null( $row->minjob ) || is_null( $row->maxjob ) ) {
				// No jobs to get
				wfProfileOut( __METHOD__ );
				return false;
			}
			// Get the random row
			$row = $dbw->selectRow( 'job', '*',
				'job_id >= ' . mt_rand( $row->minjob, $row->maxjob ), __METHOD__ );
			if ( $row === false ) {
				// Random job gone before we got the chance to select it
				// Give up
				wfProfileOut( __METHOD__ );
				return false;
			}
			// Delete the random row
			$dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
			$affected = $dbw->affectedRows();
			$dbw->commit();

			if ( !$affected ) {
				// Random job gone before we exclusively deleted it
				// Give up
				wfProfileOut( __METHOD__ );
				return false;
			}
		}

		// If execution got to here, there's a row in $row that has been deleted from the database
		// by this thread. Hence the concurrent pop was successful.
		$job = self::newFromRow( $row );

		// Remove any duplicates it may have later in the queue
		// Deadlock prone section
		$dbw->begin();
		self::deleteDuplicates( $dbw, $job, __METHOD__ );
		$dbw->commit();

		wfProfileOut( __METHOD__ );
		return $job;
	}

	/**
	 * Build the Job object described by a row of the job table.
	 *
	 * @param $row Object: Row with job_cmd, job_namespace, job_title,
	 *   job_params and job_id fields
	 * @return Job
	 */
	private static function newFromRow( $row ) {
		$title = Title::makeTitleSafe( $row->job_namespace, $row->job_title );
		return self::factory( $row->job_cmd, $title,
			self::extractBlob( $row->job_params ), $row->job_id );
	}

	/**
	 * Delete the queued rows that describe the same work as $job.
	 *
	 * Does nothing when the job has duplicate removal switched off. The caller
	 * is responsible for any surrounding transaction.
	 *
	 * @param $dbw Object: Master database connection
	 * @param $job Job: Job that has just been claimed
	 * @param $fname String: Caller name passed through to the query
	 */
	private static function deleteDuplicates( $dbw, Job $job, $fname ) {
		if ( !$job->removeDuplicates ) {
			return;
		}
		$dbw->delete( 'job', $job->duplicateConditions(), $fname );
	}

	/**
	 * Write one batch of rows inside its own short transaction.
	 *
	 * @param $dbw Object: Master database connection
	 * @param $rows Array: Rows as produced by insertFields()
	 * @param $fname String: Caller name passed through to the query
	 */
	private static function insertRows( $dbw, $rows, $fname ) {
		$dbw->begin();
		$dbw->insert( 'job', $rows, $fname, 'IGNORE' );
		$dbw->commit();
	}

	/**
	 * Create the appropriate object to handle a specific job
	 *
	 * @param $command String: Job command
	 * @param $title Title: Associated title
	 * @param $params Array: Job parameters
	 * @param $id Int: Job identifier
	 * @return Job
	 * @throws MWException when no class is registered for $command
	 */
	public static function factory( $command, $title, $params = false, $id = 0 ) {
		global $wgJobClasses;
		if ( isset( $wgJobClasses[$command] ) ) {
			$class = $wgJobClasses[$command];
			return new $class( $title, $params, $id );
		}
		throw new MWException( "Invalid job command `{$command}`" );
	}

	/**
	 * Serialise job parameters for storage in the job_params column.
	 *
	 * @param $params Array|false: Job parameters, or false for none
	 * @return String: Serialised parameters, or '' when $params is false
	 */
	public static function makeBlob( $params ) {
		if ( $params !== false ) {
			return serialize( $params );
		} else {
			return '';
		}
	}

	/**
	 * Reverse of makeBlob().
	 *
	 * @param $blob String: Value read from the job_params column
	 * @return Mixed: Unserialised parameters, or false for an empty blob
	 */
	public static function extractBlob( $blob ) {
		if ( (string)$blob !== '' ) {
			// NOTE(review): unserialize() must only ever see data this class
			// wrote itself; confirm nothing else can write to job_params.
			return unserialize( $blob );
		} else {
			return false;
		}
	}

	/**
	 * Batch-insert a group of jobs into the queue.
	 * Rows are written in transactions of at most 50 rows, each committed
	 * immediately.
	 *
	 * This may add duplicate at insert time, but they will be
	 * removed later on, when the first one is popped.
	 *
	 * @param $jobs array of Job objects
	 */
	public static function batchInsert( $jobs ) {
		if ( !count( $jobs ) ) {
			return;
		}
		$dbw = wfGetDB( DB_MASTER );
		$rows = array();
		foreach ( $jobs as $job ) {
			$rows[] = $job->insertFields();
			if ( count( $rows ) >= 50 ) {
				# Do a small transaction to avoid slave lag
				self::insertRows( $dbw, $rows, __METHOD__ );
				$rows = array();
			}
		}
		if ( $rows ) {
			// Flush whatever is left over from the last partial batch
			self::insertRows( $dbw, $rows, __METHOD__ );
		}
	}

	/*-------------------------------------------------------------------------
	 * Non-static functions
	 *------------------------------------------------------------------------*/

	/**
	 * @param $command String: Job command
	 * @param $title Title: Associated title
	 * @param $params Array|false: Job parameters
	 * @param $id Int: Job identifier
	 */
	public function __construct( $command, $title, $params = false, $id = 0 ) {
		$this->command = $command;
		$this->title = $title;
		$this->params = $params;
		$this->id = $id;

		// A bit of premature generalisation
		// Oh well, the whole class is premature generalisation really
		$this->removeDuplicates = true;
	}

	/**
	 * Insert a single job into the queue.
	 *
	 * When duplicate removal is enabled and an identical job is already
	 * queued, nothing is written and the call still counts as a success.
	 *
	 * @return bool true on success
	 */
	public function insert() {
		$fields = $this->insertFields();

		$dbw = wfGetDB( DB_MASTER );

		if ( $this->removeDuplicates ) {
			// Compare on everything except job_id: the id in $fields is newly
			// generated for this insert and cannot match an existing row.
			$conds = $fields;
			unset( $conds['job_id'] );
			$res = $dbw->select( 'job', array( '1' ), $conds, __METHOD__ );
			if ( $dbw->numRows( $res ) ) {
				return true;
			}
		}
		return $dbw->insert( 'job', $fields, __METHOD__ );
	}

	/**
	 * Build the row to insert into the job table for this job.
	 *
	 * @return Array: Column name => value
	 */
	protected function insertFields() {
		$dbw = wfGetDB( DB_MASTER );
		return array(
			'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ),
			'job_cmd' => $this->command,
			'job_namespace' => $this->title->getNamespace(),
			'job_title' => $this->title->getDBkey(),
			'job_params' => self::makeBlob( $this->params )
		);
	}

	/**
	 * Conditions that match every queued row describing the same work as this
	 * job, whatever its job_id.
	 *
	 * @return Array: Column name => value, without job_id
	 */
	private function duplicateConditions() {
		$conds = $this->insertFields();
		unset( $conds['job_id'] );
		return $conds;
	}

	/**
	 * Describe the job as "command [title] [key=value ...]".
	 *
	 * Array values are shown as array(N) and objects that cannot be cast to
	 * a string as object(ClassName), so any parameter can be rendered.
	 *
	 * @return String
	 */
	public function toString() {
		$pairs = array();
		if ( $this->params ) {
			foreach ( $this->params as $key => $value ) {
				if ( is_array( $value ) ) {
					$value = 'array(' . count( $value ) . ')';
				} elseif ( is_object( $value ) && !method_exists( $value, '__toString' ) ) {
					$value = 'object(' . get_class( $value ) . ')';
				}
				$pairs[] = "$key=$value";
			}
		}
		$paramString = implode( ' ', $pairs );

		if ( is_object( $this->title ) ) {
			$s = "{$this->command} " . $this->title->getPrefixedDBkey();
			if ( $paramString !== '' ) {
				$s .= ' ' . $paramString;
			}
			return $s;
		} else {
			return "{$this->command} $paramString";
		}
	}

	/**
	 * Record an error message for later retrieval with getLastError().
	 *
	 * @param $error String: Error message
	 */
	protected function setLastError( $error ) {
		$this->error = $error;
	}

	/**
	 * @return Mixed: Value last passed to setLastError(), or null if none
	 */
	public function getLastError() {
		return $this->error;
	}
}