[SPIP] ~v3.0.20-->v3.0.25
[lhc/web/clavette_www.git] / www / ecrire / inc / queue.php
1 <?php
2
3 /***************************************************************************\
4 * SPIP, Systeme de publication pour l'internet *
5 * *
6 * Copyright (c) 2001-2016 *
7 * Arnaud Martin, Antoine Pitrou, Philippe Riviere, Emmanuel Saint-James *
8 * *
9 * Ce programme est un logiciel libre distribue sous licence GNU/GPL. *
10 * Pour plus de details voir le fichier COPYING.txt ou l'aide en ligne. *
11 \***************************************************************************/
12
13 /**
14 * Gestion des queues de travaux
15 *
16 * @package SPIP\Queue
17 **/
18 if (!defined("_ECRIRE_INC_VERSION")) return;
19
20 define('_JQ_SCHEDULED',1);
21 define('_JQ_PENDING',0);
22 #define('_JQ_MAX_JOBS_EXECUTE',200); // pour personaliser le nombre de jobs traitables a chaque hit
23 #define('_JQ_MAX_JOBS_TIME_TO_EXECUTE',15); // pour personaliser le temps d'excution dispo a chaque hit
24 #define('_JQ_NB_JOBS_OVERFLOW',10000); // nombre de jobs a partir duquel on force le traitement en fin de hit pour purger
25
26 /**
27 * Ajouter une tâche à la file
28 *
29 * Les tâches sont ensuites exécutées par date programmée croissant/priorité décroissante
30 *
31 * @param $function
32 * The function name to call.
33 * @param $description
34 * A human-readable description of the queued job.
35 * @param $arguments
36 * Optional array of arguments to pass to the function.
37 * @param $file
38 * Optional file path which needs to be included for $fucntion.
39 * @param $no_duplicate
40 * If TRUE, do not add the job to the queue if one with the same function and
41 * arguments already exists.
42 * If 'function_only' test of existence is only on function name (for cron job)
43 * @param $time
44 * time for starting the job. If 0, job will start as soon as possible
45 * @param $priority
46 * -10 (low priority) to +10 (high priority), 0 is the default
47 * @return int
48 * id of job
49 */
50 function queue_add_job($function, $description, $arguments = array(), $file = '', $no_duplicate = false, $time=0, $priority=0){
51 include_spip('base/abstract_sql');
52
53 // cas pourri de ecrire/action/editer_site avec l'option reload=oui
54 if (defined('_GENIE_SYNDIC_NOW'))
55 $arguments['id_syndic'] = _GENIE_SYNDIC_NOW;
56
57 // serialiser les arguments
58 $arguments = serialize($arguments);
59 $md5args = md5($arguments);
60
61 // si pas de date programee, des que possible
62 $duplicate_where = 'status='.intval(_JQ_SCHEDULED).' AND ';
63 if (!$time){
64 $time = time();
65 $duplicate_where = ""; // ne pas dupliquer si deja le meme job en cours d'execution
66 }
67 $date = date('Y-m-d H:i:s',$time);
68
69 $set_job = array(
70 'fonction'=>$function,
71 'descriptif'=>$description,
72 'args'=>$arguments,
73 'md5args'=>$md5args,
74 'inclure'=>$file,
75 'priorite'=>max(-10,min(10,intval($priority))),
76 'date'=>$date,
77 'status'=>_JQ_SCHEDULED,
78 );
79 // si option ne pas dupliquer, regarder si la fonction existe deja
80 // avec les memes args et file
81 if (
82 $no_duplicate
83 AND
84 $id_job = sql_getfetsel('id_job','spip_jobs',
85 $duplicate_where =
86 $duplicate_where . 'fonction='.sql_quote($function)
87 .(($no_duplicate==='function_only')?'':
88 ' AND md5args='.sql_quote($md5args).' AND inclure='.sql_quote($file)))
89 )
90 return $id_job;
91
92 $id_job = sql_insertq('spip_jobs',$set_job);
93 // en cas de concurrence, deux process peuvent arriver jusqu'ici en parallele
94 // avec le meme job unique a inserer. Dans ce cas, celui qui a eu l'id le plus grand
95 // doit s'effacer
96 if (
97 $no_duplicate
98 AND
99 $id_prev = sql_getfetsel('id_job','spip_jobs',"id_job<".intval($id_job)." AND $duplicate_where")){
100 sql_delete('spip_jobs','id_job='.intval($id_job));
101 return $id_prev;
102 }
103
104 // verifier la non duplication qui peut etre problematique en cas de concurence
105 // il faut dans ce cas que seul le dernier ajoute se supprime !
106
107 // une option de debug pour verifier que les arguments en base sont bons
108 // ie cas d'un char non acceptables sur certains type de champs
109 // qui coupe la valeur
110 if (defined('_JQ_INSERT_CHECK_ARGS') AND $id_job) {
111 $args = sql_getfetsel('args', 'spip_jobs', 'id_job='.intval($id_job));
112 if ($args!==$arguments) {
113 spip_log('arguments job errones / longueur '.strlen($args)." vs ".strlen($arguments).' / valeur : '.var_export($arguments,true),'queue');
114 }
115 }
116
117 if ($id_job){
118 queue_update_next_job_time($time);
119 }
120 // si la mise en file d'attente du job echoue,
121 // il ne faut pas perdre l'execution de la fonction
122 // on la lance immediatement, c'est un fallback
123 // sauf en cas d'upgrade necessaire (table spip_jobs inexistante)
124 elseif($GLOBALS['meta']['version_installee']==$GLOBALS['spip_version_base']) {
125 $set_job['id_job'] = 0;
126 queue_start_job($set_job);
127 }
128
129 return $id_job;
130 }
131
132 /**
133 * Purger la file de tâche et reprgrammer les tâches périodiques
134 *
135 * @return void
136 */
137 function queue_purger(){
138 include_spip('base/abstract_sql');
139 sql_delete('spip_jobs');
140 sql_delete("spip_jobs_liens","id_job NOT IN (".sql_get_select("id_job","spip_jobs").")");
141 include_spip('inc/genie');
142 genie_queue_watch_dist();
143 }
144
145 /**
146 * Retirer une tache de la file d'attente
147 * @param int $id_job
148 * id de la tache a retirer
149 * @return bool
150 */
151 function queue_remove_job($id_job){
152 include_spip('base/abstract_sql');
153
154 if ($row = sql_fetsel('fonction,inclure,date','spip_jobs','id_job='.intval($id_job))
155 AND $res = sql_delete('spip_jobs','id_job='.intval($id_job))){
156 queue_unlink_job($id_job);
157 // est-ce une tache cron qu'il faut relancer ?
158 if ($periode = queue_is_cron_job($row['fonction'],$row['inclure'])){
159 // relancer avec les nouveaux arguments de temps
160 include_spip('inc/genie');
161 // relancer avec la periode prevue
162 queue_genie_replan_job($row['fonction'],$periode,strtotime($row['date']));
163 }
164 queue_update_next_job_time();
165 }
166 return $res;
167 }
168
169 /**
170 * Associer une tache avec un objet
171 *
172 * @param int $id_job
173 * id of job to link
174 * @param array $objets
175 * can be a simple array('objet'=>'article','id_objet'=>23)
176 * or an array of simple array to link multiples objet in one time
177 */
178 function queue_link_job($id_job,$objets){
179 include_spip('base/abstract_sql');
180
181 if (is_array($objets) AND count($objets)){
182 if (is_array(reset($objets))){
183 foreach($objets as $k=>$o){
184 $objets[$k]['id_job'] = $id_job;
185 }
186 sql_insertq_multi('spip_jobs_liens',$objets);
187 }
188 else
189 sql_insertq('spip_jobs_liens',array_merge(array('id_job'=>$id_job),$objets));
190 }
191 }
192
193 /**
194 * Dissocier une tache d'un objet
195 *
196 * @param int $id_job
197 * id of job to unlink ibject with
198 * @return int/bool
199 * result of sql_delete
200 */
201 function queue_unlink_job($id_job){
202 return sql_delete("spip_jobs_liens","id_job=".intval($id_job));
203 }
204
205 /**
206 * Lancer une tache decrite par sa ligne SQL
207 * @param array $row
208 * describe the job, with field of table spip_jobs
209 * @return mixed
210 * return the result of job
211 */
212 function queue_start_job($row){
213
214 // deserialiser les arguments
215 $args = unserialize($row['args']);
216 if ($args===false){
217 spip_log('arguments job errones '.var_export($row,true),'queue');
218 $args = array();
219 }
220
221 $fonction = $row['fonction'];
222 if (strlen($inclure = trim($row['inclure']))){
223 if (substr($inclure,-1)=='/'){ // c'est un chemin pour charger_fonction
224 $f = charger_fonction($fonction,rtrim($inclure,'/'),false);
225 if ($f)
226 $fonction = $f;
227 }
228 else
229 include_spip($inclure);
230 }
231
232 if (!function_exists($fonction)){
233 spip_log("fonction $fonction ($inclure) inexistante ".var_export($row,true),'queue');
234 return false;
235 }
236
237 spip_log("queue [".$row['id_job']."]: $fonction() start", 'queue');
238 switch (count($args)) {
239 case 0: $res = $fonction(); break;
240 case 1: $res = $fonction($args[0]); break;
241 case 2: $res = $fonction($args[0],$args[1]); break;
242 case 3: $res = $fonction($args[0],$args[1], $args[2]); break;
243 case 4: $res = $fonction($args[0],$args[1], $args[2], $args[3]); break;
244 case 5: $res = $fonction($args[0],$args[1], $args[2], $args[3], $args[4]); break;
245 case 6: $res = $fonction($args[0],$args[1], $args[2], $args[3], $args[4], $args[5]); break;
246 case 7: $res = $fonction($args[0],$args[1], $args[2], $args[3], $args[4], $args[5], $args[6]); break;
247 case 8: $res = $fonction($args[0],$args[1], $args[2], $args[3], $args[4], $args[5], $args[6], $args[7]); break;
248 case 9: $res = $fonction($args[0],$args[1], $args[2], $args[3], $args[4], $args[5], $args[6], $args[7], $args[8]); break;
249 case 10:$res = $fonction($args[0],$args[1], $args[2], $args[3], $args[4], $args[5], $args[6], $args[7], $args[8], $args[9]); break;
250 default:
251 # plus lent mais completement generique
252 $res = call_user_func_array($fonction, $args);
253 }
254 spip_log("queue [".$row['id_job']."]: $fonction() end", 'queue');
255 return $res;
256
257 }
258
259 /**
260 * Scheduler :
261 * Prend une par une les taches en attente
262 * et les lance, dans la limite d'un temps disponible total
263 * et d'un nombre maxi de taches
264 *
265 * La date de la prochaine tache a executer est mise a jour
266 * apres chaque chaque tache finie
267 * afin de relancer le scheduler uniquement quand c'est necessaire
268 *
269 * @param array $force_jobs
270 * list of id_job to execute when provided
271 * @return null|false
272 */
273 function queue_schedule($force_jobs = null){
274 $time = time();
275 if (defined('_DEBUG_BLOCK_QUEUE')) {
276 spip_log("_DEBUG_BLOCK_QUEUE : schedule stop",'jq'._LOG_DEBUG);
277 return;
278 }
279
280 // rien a faire si le prochain job est encore dans le futur
281 if (queue_sleep_time_to_next_job()>0 AND (!$force_jobs OR !count($force_jobs))){
282 spip_log("queue_sleep_time_to_next_job",'jq'._LOG_DEBUG);
283 return;
284 }
285
286 include_spip('base/abstract_sql');
287 // on ne peut rien faire si pas de connexion SQL
288 if (!spip_connect()) return false;
289
290 if (!defined('_JQ_MAX_JOBS_TIME_TO_EXECUTE')){
291 $max_time = ini_get('max_execution_time')/2;
292 // valeur conservatrice si on a pas reussi a lire le max_execution_time
293 if (!$max_time) $max_time=5;
294 define('_JQ_MAX_JOBS_TIME_TO_EXECUTE',min($max_time,15)); // une valeur maxi en temps.
295 }
296 $end_time = $time + _JQ_MAX_JOBS_TIME_TO_EXECUTE;
297
298 spip_log("JQ schedule $time / $end_time",'jq'._LOG_DEBUG);
299
300 if (!defined('_JQ_MAX_JOBS_EXECUTE'))
301 define('_JQ_MAX_JOBS_EXECUTE',200);
302 $nbj=0;
303 // attraper les jobs
304 // dont la date est passee (echus en attente),
305 // par odre :
306 // - de priorite
307 // - de date
308 // lorsqu'un job cron n'a pas fini, sa priorite est descendue
309 // pour qu'il ne bloque pas les autres jobs en attente
310 if (is_array($force_jobs) AND count($force_jobs))
311 $cond = "status=".intval(_JQ_SCHEDULED)." AND ".sql_in("id_job", $force_jobs);
312 else {
313 $now = date('Y-m-d H:i:s',$time);
314 $cond = "status=".intval(_JQ_SCHEDULED)." AND date<=".sql_quote($now);
315 }
316
317 register_shutdown_function('queue_error_handler'); // recuperer les erreurs auant que possible
318 $res = sql_allfetsel('*','spip_jobs',$cond,'','priorite DESC,date','0,'.(_JQ_MAX_JOBS_EXECUTE+1));
319 do {
320 if ($row = array_shift($res)){
321 $nbj++;
322 // il faut un verrou, a base de sql_delete
323 if (sql_delete('spip_jobs',"id_job=".intval($row['id_job'])." AND status=".intval(_JQ_SCHEDULED))){
324 #spip_log("JQ schedule job ".$nbj." OK",'jq');
325 // on reinsert dans la base aussitot avec un status=_JQ_PENDING
326 $row['status'] = _JQ_PENDING;
327 $row['date'] = date('Y-m-d H:i:s',$time);
328 sql_insertq('spip_jobs', $row);
329
330 // on a la main sur le job :
331 // l'executer
332 $result = queue_start_job($row);
333
334 $time = time();
335 queue_close_job($row, $time, $result);
336 }
337 }
338 spip_log("JQ schedule job end time ".$time,'jq'._LOG_DEBUG);
339 } while ($nbj<_JQ_MAX_JOBS_EXECUTE AND $row AND $time<$end_time);
340 spip_log("JQ schedule end time ".time(),'jq'._LOG_DEBUG);
341
342 if ($row = array_shift($res)){
343 queue_update_next_job_time(0); // on sait qu'il y a encore des jobs a lancer ASAP
344 spip_log("JQ encore !",'jq'._LOG_DEBUG);
345 }
346 else
347 queue_update_next_job_time();
348
349 return true;
350 }
351
352 /**
353 * Terminer un job au status _JQ_PENDING :
354 * - le reprogrammer si c'est un cron
355 * - supprimer ses liens
356 * - le detruire en dernier
357 *
358 * @param array $row
359 * @param int $time
360 * @param int $result
361 */
362 function queue_close_job(&$row,$time,$result=0){
363 // est-ce une tache cron qu'il faut relancer ?
364 if ($periode = queue_is_cron_job($row['fonction'],$row['inclure'])){
365 // relancer avec les nouveaux arguments de temps
366 include_spip('inc/genie');
367 if ($result<0)
368 // relancer tout de suite, mais en baissant la priorite
369 queue_genie_replan_job($row['fonction'],$periode,0-$result,null,$row['priorite']-1);
370 else
371 // relancer avec la periode prevue
372 queue_genie_replan_job($row['fonction'],$periode,$time);
373 }
374 // purger ses liens eventuels avec des objets
375 sql_delete("spip_jobs_liens","id_job=".intval($row['id_job']));
376 // supprimer le job fini
377 sql_delete('spip_jobs','id_job='.intval($row['id_job']));
378 }
379
380 /**
381 * Recuperer des erreurs auant que possible
382 * en terminant la gestion de la queue
383 */
384 function queue_error_handler(){
385 // se remettre dans le bon dossier, car Apache le change parfois (toujours?)
386 chdir(_ROOT_CWD);
387
388 queue_update_next_job_time();
389 }
390
391
392 /**
393 * Tester si une tache etait une tache periodique a reprogrammer
394 *
395 * @param <type> $function
396 * @param <type> $inclure
397 * @return <type>
398 */
399 function queue_is_cron_job($function,$inclure){
400 static $taches = null;
401 if (strncmp($inclure,'genie/',6)==0){
402 if (is_null($taches)){
403 include_spip('inc/genie');
404 $taches = taches_generales();
405 }
406 if (isset($taches[$function]))
407 return $taches[$function];
408 }
409 return false;
410 }
411
412 /**
413 * Mettre a jour la date du prochain job a lancer
414 * Si une date est fournie (au format time unix)
415 * on fait simplement un min entre la date deja connue et celle fournie
416 * (cas de l'ajout simple
417 * ou cas $next_time=0 car l'on sait qu'il faut revenir ASAP)
418 *
419 * @param int $next_time
420 * temps de la tache ajoutee ou 0 pour ASAP
421 */
422 function queue_update_next_job_time($next_time=null){
423 static $nb_jobs_scheduled = null;
424 static $deja_la = false;
425 // prendre le min des $next_time que l'on voit passer ici, en cas de reentrance
426 static $next = null;
427 // queue_close_job peut etre reentrant ici
428 if ($deja_la) return;
429 $deja_la = true;
430
431 include_spip('base/abstract_sql');
432 $time = time();
433
434 // traiter les jobs morts au combat (_JQ_PENDING depuis plus de 180s)
435 // pour cause de timeout ou autre erreur fatale
436 $res = sql_allfetsel("*","spip_jobs","status=".intval(_JQ_PENDING)." AND date<".sql_quote(date('Y-m-d H:i:s',$time-180)));
437 if (is_array($res)) {
438 foreach ($res as $row)
439 queue_close_job($row,$time);
440 }
441
442 // chercher la date du prochain job si pas connu
443 if (is_null($next) OR is_null(queue_sleep_time_to_next_job())){
444 $date = sql_getfetsel('date','spip_jobs',"status=".intval(_JQ_SCHEDULED),'','date','0,1');
445 $next = strtotime($date);
446 }
447 if (!is_null($next_time)){
448 if (is_null($next) OR $next>$next_time)
449 $next = $next_time;
450 }
451
452 if ($next){
453 if (is_null($nb_jobs_scheduled))
454 $nb_jobs_scheduled = sql_countsel('spip_jobs',"status=".intval(_JQ_SCHEDULED)." AND date<".sql_quote(date('Y-m-d H:i:s',$time)));
455 elseif ($next<=$time)
456 $nb_jobs_scheduled++;
457 // si trop de jobs en attente, on force la purge en fin de hit
458 // pour assurer le coup
459 if ($nb_jobs_scheduled>defined('_JQ_NB_JOBS_OVERFLOW')?_JQ_NB_JOBS_OVERFLOW:10000)
460 define('_DIRECT_CRON_FORCE',true);
461 }
462
463 queue_set_next_job_time($next);
464 $deja_la = false;
465 }
466
467
468 /**
469 * Mettre a jour la date de prochain job
470 * @param int $next
471 */
472 function queue_set_next_job_time($next) {
473
474 // utiliser le temps courant reel plutot que temps de la requete ici
475 $time = time();
476
477 // toujours relire la valeur pour comparer, pour tenir compte des maj concourrantes
478 // et ne mettre a jour que si il y a un interet a le faire
479 // permet ausis d'initialiser le nom de fichier a coup sur
480 $curr_next = $_SERVER['REQUEST_TIME'] + max(0,queue_sleep_time_to_next_job(true));
481 if (
482 ($curr_next<=$time AND $next>$time) // le prochain job est dans le futur mais pas la date planifiee actuelle
483 OR $curr_next>$next // le prochain job est plus tot que la date planifiee actuelle
484 ) {
485 if (include_spip('inc/memoization') AND defined('_MEMOIZE_MEMORY') AND _MEMOIZE_MEMORY) {
486 cache_set(_JQ_NEXT_JOB_TIME_FILENAME,intval($next));
487 }
488 else {
489 ecrire_fichier(_JQ_NEXT_JOB_TIME_FILENAME,intval($next));
490 }
491 queue_sleep_time_to_next_job($next);
492 }
493
494 return queue_sleep_time_to_next_job();
495 }
496
497 /**
498 * Déclenche le cron en asynchrone ou retourne le code HTML pour le déclencher
499 *
500 * Retourne le HTML à ajouter à la page pour declencher le cron
501 * ou rien si on a réussi à le lancer en asynchrone.
502 *
503 * @return string
504 */
505 function queue_affichage_cron(){
506 $texte = "";
507
508 $time_to_next = queue_sleep_time_to_next_job();
509 // rien a faire si le prochain job est encore dans le futur
510 if ($time_to_next>0 OR defined('_DEBUG_BLOCK_QUEUE'))
511 return $texte;
512
513 // ne pas relancer si on vient de lancer dans la meme seconde par un hit concurent
514 if (file_exists($lock=_DIR_TMP."cron.lock") AND !(@filemtime($lock)<$_SERVER['REQUEST_TIME']))
515 return $texte;
516 @touch($lock);
517
518 // il y a des taches en attentes
519 // si depuis plus de 5min, on essaye de lancer le cron par tous les moyens pour rattraper le coup
520 // on est sans doute sur un site qui n'autorise pas http sortant ou avec peu de trafic
521 $urgent = false;
522 if ($time_to_next<-300)
523 $urgent = true;
524
525 $url_cron = generer_url_action('cron','',false,true);
526
527 if (!defined('_HTML_BG_CRON_FORCE') OR !_HTML_BG_CRON_FORCE){
528
529 // methode la plus rapide :
530 // Si fsockopen est possible, on lance le cron via un socket en asynchrone
531 // si fsockopen echoue (disponibilite serveur, firewall) on essaye pas cURL
532 // car on a toutes les chances d'echouer pareil mais sans moyen de le savoir
533 // on passe direct a la methode background-image
534 if(function_exists('fsockopen')){
535 $parts=parse_url($url_cron);
536
537 switch ($parts['scheme']) {
538 case 'https':
539 $scheme = 'ssl://';
540 $port = 443;
541 break;
542 case 'http':
543 default:
544 $scheme = '';
545 $port = 80;
546 }
547
548 $fp = @fsockopen($scheme.$parts['host'],
549 isset($parts['port'])?$parts['port']:$port,
550 $errno, $errstr, 1);
551
552 if ($fp) {
553 $timeout = 200; // ms
554 stream_set_timeout($fp,0,$timeout * 1000);
555 $query = $parts['path'].($parts['query']?"?".$parts['query']:"");
556 $out = "GET ".$query." HTTP/1.1\r\n";
557 $out.= "Host: ".$parts['host']."\r\n";
558 $out.= "Connection: Close\r\n\r\n";
559 fwrite($fp, $out);
560 spip_timer('read');
561 $t = 0;
562 // on lit la reponse si possible pour fermer proprement la connexion
563 // avec un timeout total de 200ms pour ne pas se bloquer
564 while (!feof($fp) AND $t<$timeout) {
565 @fgets($fp, 1024);
566 $t += spip_timer('read',true);
567 spip_timer('read');
568 }
569 fclose($fp);
570 if (!$urgent)
571 return $texte;
572 }
573 }
574 // si fsockopen n'est pas dispo on essaye cURL :
575 // lancer le cron par un cURL asynchrone si cURL est present
576 elseif (function_exists("curl_init")){
577 //setting the curl parameters.
578 $ch = curl_init($url_cron);
579 curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
580 // cf bug : http://www.php.net/manual/en/function.curl-setopt.php#104597
581 curl_setopt($ch, CURLOPT_NOSIGNAL, 1);
582 // valeur mini pour que la requete soit lancee
583 curl_setopt($ch, CURLOPT_TIMEOUT_MS, 200);
584 // lancer
585 curl_exec($ch);
586 // fermer
587 curl_close($ch);
588 if (!$urgent)
589 return $texte;
590 }
591 }
592
593 // si deja force, on retourne sans rien
594 if (defined('_DIRECT_CRON_FORCE'))
595 return $texte;
596
597 // si c'est un bot
598 // inutile de faire un appel par image background,
599 // on force un appel direct en fin de hit
600 if ((defined('_IS_BOT') AND _IS_BOT)){
601 define('_DIRECT_CRON_FORCE',true);
602 return $texte;
603 }
604
605 // en derniere solution, on insere une image background dans la page
606 $texte = '<!-- SPIP-CRON --><div style="background-image: url(\'' .
607 generer_url_action('cron') .
608 '\');"></div>';
609
610 return $texte;
611 }
612 ?>