3 /***************************************************************************\
4 * SPIP, Systeme de publication pour l'internet *
6 * Copyright (c) 2001-2017 *
7 * Arnaud Martin, Antoine Pitrou, Philippe Riviere, Emmanuel Saint-James *
9 * Ce programme est un logiciel libre distribue sous licence GNU/GPL. *
10 * Pour plus de details voir le fichier COPYING.txt ou l'aide en ligne. *
11 \***************************************************************************/
14 * Gestion des queues de travaux
16 * @package SPIP\Core\Queue
18 if (!defined("_ECRIRE_INC_VERSION")) {
22 define('_JQ_SCHEDULED', 1);
23 define('_JQ_PENDING', 0);
24 #define('_JQ_MAX_JOBS_EXECUTE',200); // pour personaliser le nombre de jobs traitables a chaque hit
25 #define('_JQ_MAX_JOBS_TIME_TO_EXECUTE',15); // pour personaliser le temps d'excution dispo a chaque hit
26 #define('_JQ_NB_JOBS_OVERFLOW',10000); // nombre de jobs a partir duquel on force le traitement en fin de hit pour purger
29 * Ajouter une tâche à la file
31 * Les tâches sont ensuites exécutées par date programmée croissant/priorité décroissante
34 * The function name to call.
36 * A human-readable description of the queued job.
38 * Optional array of arguments to pass to the function.
40 * Optional file path which needs to be included for $fucntion.
41 * @param $no_duplicate
42 * If TRUE, do not add the job to the queue if one with the same function and
43 * arguments already exists.
44 * If 'function_only' test of existence is only on function name (for cron job)
46 * time for starting the job. If 0, job will start as soon as possible
48 * -10 (low priority) to +10 (high priority), 0 is the default
52 function queue_add_job(
57 $no_duplicate = false,
61 include_spip('base/abstract_sql');
63 // cas pourri de ecrire/action/editer_site avec l'option reload=oui
64 if (defined('_GENIE_SYNDIC_NOW')) {
65 $arguments['id_syndic'] = _GENIE_SYNDIC_NOW
;
68 // serialiser les arguments
69 $arguments = serialize($arguments);
70 $md5args = md5($arguments);
72 // si pas de date programee, des que possible
73 $duplicate_where = 'status=' . intval(_JQ_SCHEDULED
) . ' AND ';
76 $duplicate_where = ""; // ne pas dupliquer si deja le meme job en cours d'execution
78 $date = date('Y-m-d H:i:s', $time);
81 'fonction' => $function,
82 'descriptif' => $description,
84 'md5args' => $md5args,
86 'priorite' => max(-10, min(10, intval($priority))),
88 'status' => _JQ_SCHEDULED
,
90 // si option ne pas dupliquer, regarder si la fonction existe deja
91 // avec les memes args et file
95 $id_job = sql_getfetsel('id_job', 'spip_jobs',
97 $duplicate_where . 'fonction=' . sql_quote($function)
98 . (($no_duplicate === 'function_only') ?
'' :
99 ' AND md5args=' . sql_quote($md5args) . ' AND inclure=' . sql_quote($file)))
104 $id_job = sql_insertq('spip_jobs', $set_job);
105 // en cas de concurrence, deux process peuvent arriver jusqu'ici en parallele
106 // avec le meme job unique a inserer. Dans ce cas, celui qui a eu l'id le plus grand
111 $id_prev = sql_getfetsel('id_job', 'spip_jobs', "id_job<" . intval($id_job) . " AND $duplicate_where")
113 sql_delete('spip_jobs', 'id_job=' . intval($id_job));
118 // verifier la non duplication qui peut etre problematique en cas de concurence
119 // il faut dans ce cas que seul le dernier ajoute se supprime !
121 // une option de debug pour verifier que les arguments en base sont bons
122 // ie cas d'un char non acceptables sur certains type de champs
123 // qui coupe la valeur
124 if (defined('_JQ_INSERT_CHECK_ARGS') and $id_job) {
125 $args = sql_getfetsel('args', 'spip_jobs', 'id_job=' . intval($id_job));
126 if ($args !== $arguments) {
127 spip_log('arguments job errones / longueur ' . strlen($args) . " vs " . strlen($arguments) . ' / valeur : ' . var_export($arguments,
133 queue_update_next_job_time($time);
135 // si la mise en file d'attente du job echoue,
136 // il ne faut pas perdre l'execution de la fonction
137 // on la lance immediatement, c'est un fallback
138 // sauf en cas d'upgrade necessaire (table spip_jobs inexistante)
139 elseif ($GLOBALS['meta']['version_installee'] == $GLOBALS['spip_version_base']) {
140 $set_job['id_job'] = 0;
141 queue_start_job($set_job);
148 * Purger la file de tâche et reprogrammer les tâches périodiques
152 function queue_purger() {
153 include_spip('base/abstract_sql');
154 sql_delete('spip_jobs');
155 sql_delete("spip_jobs_liens", "id_job NOT IN (" . sql_get_select("id_job", "spip_jobs") . ")");
156 include_spip('inc/genie');
157 genie_queue_watch_dist();
161 * Retirer une tache de la file d'attente
164 * id de la tache a retirer
167 function queue_remove_job($id_job) {
168 include_spip('base/abstract_sql');
170 if ($row = sql_fetsel('fonction,inclure,date', 'spip_jobs', 'id_job=' . intval($id_job))
171 and $res = sql_delete('spip_jobs', 'id_job=' . intval($id_job))
173 queue_unlink_job($id_job);
174 // est-ce une tache cron qu'il faut relancer ?
175 if ($periode = queue_is_cron_job($row['fonction'], $row['inclure'])) {
176 // relancer avec les nouveaux arguments de temps
177 include_spip('inc/genie');
178 // relancer avec la periode prevue
179 queue_genie_replan_job($row['fonction'], $periode, strtotime($row['date']));
181 queue_update_next_job_time();
188 * Associer une tache avec un objet
191 * id de la tache a lier
192 * @param array $objets
193 * peut être un simple tableau array('objet'=>'article','id_objet'=>23)
194 * ou un tableau composé de tableaux simples pour lieur plusieurs objets en une fois
196 function queue_link_job($id_job, $objets) {
197 include_spip('base/abstract_sql');
199 if (is_array($objets) and count($objets)) {
200 if (is_array(reset($objets))) {
201 foreach ($objets as $k => $o) {
202 $objets[$k]['id_job'] = $id_job;
204 sql_insertq_multi('spip_jobs_liens', $objets);
206 sql_insertq('spip_jobs_liens', array_merge(array('id_job' => $id_job), $objets));
212 * Dissocier une tache d'un objet
215 * id de la tache à dissocier
217 * resultat du sql_delete
219 function queue_unlink_job($id_job) {
220 return sql_delete("spip_jobs_liens", "id_job=" . intval($id_job));
224 * Lancer une tache decrite par sa ligne SQL
227 * describe the job, with field of table spip_jobs
229 * return the result of job
231 function queue_start_job($row) {
233 // deserialiser les arguments
234 $args = unserialize($row['args']);
235 if ($args === false) {
236 spip_log('arguments job errones ' . var_export($row, true), 'queue');
240 $fonction = $row['fonction'];
241 if (strlen($inclure = trim($row['inclure']))) {
242 if (substr($inclure, -1) == '/') { // c'est un chemin pour charger_fonction
243 $f = charger_fonction($fonction, rtrim($inclure, '/'), false);
248 include_spip($inclure);
252 if (!function_exists($fonction)) {
253 spip_log("fonction $fonction ($inclure) inexistante " . var_export($row, true), 'queue');
258 spip_log("queue [" . $row['id_job'] . "]: $fonction() start", 'queue');
259 switch (count($args)) {
264 $res = $fonction($args[0]);
267 $res = $fonction($args[0], $args[1]);
270 $res = $fonction($args[0], $args[1], $args[2]);
273 $res = $fonction($args[0], $args[1], $args[2], $args[3]);
276 $res = $fonction($args[0], $args[1], $args[2], $args[3], $args[4]);
279 $res = $fonction($args[0], $args[1], $args[2], $args[3], $args[4], $args[5]);
282 $res = $fonction($args[0], $args[1], $args[2], $args[3], $args[4], $args[5], $args[6]);
285 $res = $fonction($args[0], $args[1], $args[2], $args[3], $args[4], $args[5], $args[6], $args[7]);
288 $res = $fonction($args[0], $args[1], $args[2], $args[3], $args[4], $args[5], $args[6], $args[7], $args[8]);
291 $res = $fonction($args[0], $args[1], $args[2], $args[3], $args[4], $args[5], $args[6], $args[7], $args[8],
295 # plus lent mais completement generique
296 $res = call_user_func_array($fonction, $args);
298 spip_log("queue [" . $row['id_job'] . "]: $fonction() end", 'queue');
305 * Exécute les prochaînes tâches cron et replanifie les suivantes
307 * Prend une par une les tâches en attente et les lance, dans la limite
308 * d'un temps disponible total et d'un nombre maxi de tâches
310 * La date de la prochaine tâche à exécuter est mise à jour
311 * après chaque chaque tâche finie afin de relancer le scheduler uniquement
312 * quand c'est nécessaire
314 * @uses queue_sleep_time_to_next_job()
315 * @uses queue_error_handler() Pour capturer les erreurs en fin de hit
316 * @uses queue_start_job()
317 * @uses queue_close_job()
318 * @uses queue_update_next_job_time()
320 * @param array $force_jobs
321 * list of id_job to execute when provided
323 * - null : pas de tâche à réaliser maintenant
324 * - false : pas de connexion SQL
325 * - true : une planification a été faite.
327 function queue_schedule($force_jobs = null) {
329 if (defined('_DEBUG_BLOCK_QUEUE')) {
330 spip_log("_DEBUG_BLOCK_QUEUE : schedule stop", 'jq' . _LOG_DEBUG
);
335 // rien a faire si le prochain job est encore dans le futur
336 if (queue_sleep_time_to_next_job() > 0 and (!$force_jobs or !count($force_jobs))) {
337 spip_log("queue_sleep_time_to_next_job", 'jq' . _LOG_DEBUG
);
342 include_spip('base/abstract_sql');
343 // on ne peut rien faire si pas de connexion SQL
344 if (!spip_connect()) {
348 if (!defined('_JQ_MAX_JOBS_TIME_TO_EXECUTE')) {
349 $max_time = ini_get('max_execution_time') / 2;
350 // valeur conservatrice si on a pas reussi a lire le max_execution_time
354 define('_JQ_MAX_JOBS_TIME_TO_EXECUTE', min($max_time, 15)); // une valeur maxi en temps.
356 $end_time = $time + _JQ_MAX_JOBS_TIME_TO_EXECUTE
;
358 spip_log("JQ schedule $time / $end_time", 'jq' . _LOG_DEBUG
);
360 if (!defined('_JQ_MAX_JOBS_EXECUTE')) {
361 define('_JQ_MAX_JOBS_EXECUTE', 200);
365 // dont la date est passee (echus en attente),
369 // lorsqu'un job cron n'a pas fini, sa priorite est descendue
370 // pour qu'il ne bloque pas les autres jobs en attente
371 if (is_array($force_jobs) and count($force_jobs)) {
372 $cond = "status=" . intval(_JQ_SCHEDULED
) . " AND " . sql_in("id_job", $force_jobs);
374 $now = date('Y-m-d H:i:s', $time);
375 $cond = "status=" . intval(_JQ_SCHEDULED
) . " AND date<=" . sql_quote($now);
378 register_shutdown_function('queue_error_handler'); // recuperer les erreurs auant que possible
379 $res = sql_allfetsel('*', 'spip_jobs', $cond, '', 'priorite DESC,date', '0,' . (_JQ_MAX_JOBS_EXECUTE +
1));
381 if ($row = array_shift($res)) {
383 // il faut un verrou, a base de sql_delete
384 if (sql_delete('spip_jobs', "id_job=" . intval($row['id_job']) . " AND status=" . intval(_JQ_SCHEDULED
))) {
385 #spip_log("JQ schedule job ".$nbj." OK",'jq');
386 // on reinsert dans la base aussitot avec un status=_JQ_PENDING
387 $row['status'] = _JQ_PENDING
;
388 $row['date'] = date('Y-m-d H:i:s', $time);
389 sql_insertq('spip_jobs', $row);
391 // on a la main sur le job :
393 $result = queue_start_job($row);
396 queue_close_job($row, $time, $result);
399 spip_log("JQ schedule job end time " . $time, 'jq' . _LOG_DEBUG
);
400 } while ($nbj < _JQ_MAX_JOBS_EXECUTE
and $row and $time < $end_time);
401 spip_log("JQ schedule end time " . time(), 'jq' . _LOG_DEBUG
);
403 if ($row = array_shift($res)) {
404 queue_update_next_job_time(0); // on sait qu'il y a encore des jobs a lancer ASAP
405 spip_log("JQ encore !", 'jq' . _LOG_DEBUG
);
407 queue_update_next_job_time();
414 * Terminer un job au status _JQ_PENDING
416 * - le reprogrammer si c'est un cron
417 * - supprimer ses liens
418 * - le detruire en dernier
420 * @uses queue_is_cron_job()
421 * @uses queue_genie_replan_job()
427 function queue_close_job(&$row, $time, $result = 0) {
428 // est-ce une tache cron qu'il faut relancer ?
429 if ($periode = queue_is_cron_job($row['fonction'], $row['inclure'])) {
430 // relancer avec les nouveaux arguments de temps
431 include_spip('inc/genie');
432 if ($result < 0) // relancer tout de suite, mais en baissant la priorite
434 queue_genie_replan_job($row['fonction'], $periode, 0 - $result, null, $row['priorite'] - 1);
435 } else // relancer avec la periode prevue
437 queue_genie_replan_job($row['fonction'], $periode, $time);
440 // purger ses liens eventuels avec des objets
441 sql_delete("spip_jobs_liens", "id_job=" . intval($row['id_job']));
442 // supprimer le job fini
443 sql_delete('spip_jobs', 'id_job=' . intval($row['id_job']));
447 * Récuperer des erreurs autant que possible
448 * en terminant la gestion de la queue
450 * @uses queue_update_next_job_time()
452 function queue_error_handler() {
453 // se remettre dans le bon dossier, car Apache le change parfois (toujours?)
456 queue_update_next_job_time();
461 * Tester si une tâche était une tâche périodique à reprogrammer
463 * @uses taches_generales()
465 * @param string $function
466 * Nom de la fonction de tâche
467 * @param string $inclure
468 * Nom de l'inclusion contenant la fonction
470 * Périodicité de la tâche en secondes, si tâche périodique, sinon false.
472 function queue_is_cron_job($function, $inclure) {
473 static $taches = null;
474 if (strncmp($inclure, 'genie/', 6) == 0) {
475 if (is_null($taches)) {
476 include_spip('inc/genie');
477 $taches = taches_generales();
479 if (isset($taches[$function])) {
480 return $taches[$function];
488 * Mettre a jour la date du prochain job a lancer
489 * Si une date est fournie (au format time unix)
490 * on fait simplement un min entre la date deja connue et celle fournie
491 * (cas de l'ajout simple
492 * ou cas $next_time=0 car l'on sait qu'il faut revenir ASAP)
494 * @param int $next_time
495 * temps de la tache ajoutee ou 0 pour ASAP
497 function queue_update_next_job_time($next_time = null) {
498 static $nb_jobs_scheduled = null;
499 static $deja_la = false;
500 // prendre le min des $next_time que l'on voit passer ici, en cas de reentrance
502 // queue_close_job peut etre reentrant ici
508 include_spip('base/abstract_sql');
511 // traiter les jobs morts au combat (_JQ_PENDING depuis plus de 180s)
512 // pour cause de timeout ou autre erreur fatale
513 $res = sql_allfetsel("*", "spip_jobs",
514 "status=" . intval(_JQ_PENDING
) . " AND date<" . sql_quote(date('Y-m-d H:i:s', $time - 180)));
515 if (is_array($res)) {
516 foreach ($res as $row) {
517 queue_close_job($row, $time);
521 // chercher la date du prochain job si pas connu
522 if (is_null($next) or is_null(queue_sleep_time_to_next_job())) {
523 $date = sql_getfetsel('date', 'spip_jobs', "status=" . intval(_JQ_SCHEDULED
), '', 'date', '0,1');
524 $next = strtotime($date);
526 if (!is_null($next_time)) {
527 if (is_null($next) or $next > $next_time) {
533 if (is_null($nb_jobs_scheduled)) {
534 $nb_jobs_scheduled = sql_countsel('spip_jobs',
535 "status=" . intval(_JQ_SCHEDULED
) . " AND date<" . sql_quote(date('Y-m-d H:i:s', $time)));
536 } elseif ($next <= $time) {
537 $nb_jobs_scheduled++
;
539 // si trop de jobs en attente, on force la purge en fin de hit
540 // pour assurer le coup
541 if ($nb_jobs_scheduled > (defined('_JQ_NB_JOBS_OVERFLOW') ? _JQ_NB_JOBS_OVERFLOW
: 10000)) {
542 define('_DIRECT_CRON_FORCE', true);
546 queue_set_next_job_time($next);
552 * Mettre a jour la date de prochain job
556 function queue_set_next_job_time($next) {
558 // utiliser le temps courant reel plutot que temps de la requete ici
561 // toujours relire la valeur pour comparer, pour tenir compte des maj concourrantes
562 // et ne mettre a jour que si il y a un interet a le faire
563 // permet ausis d'initialiser le nom de fichier a coup sur
564 $curr_next = $_SERVER['REQUEST_TIME'] +
max(0, queue_sleep_time_to_next_job(true));
566 ($curr_next <= $time and $next > $time) // le prochain job est dans le futur mais pas la date planifiee actuelle
567 or $curr_next > $next // le prochain job est plus tot que la date planifiee actuelle
569 if (function_exists("cache_set") and defined('_MEMOIZE_MEMORY') and _MEMOIZE_MEMORY
) {
570 cache_set(_JQ_NEXT_JOB_TIME_FILENAME
, intval($next));
572 ecrire_fichier(_JQ_NEXT_JOB_TIME_FILENAME
, intval($next));
574 queue_sleep_time_to_next_job($next);
577 return queue_sleep_time_to_next_job();
581 * Déclenche le cron en asynchrone ou retourne le code HTML pour le déclencher
583 * Retourne le HTML à ajouter à la page pour declencher le cron
584 * ou rien si on a réussi à le lancer en asynchrone.
586 * Un verrou (cron.lock) empêche l'exécution du cron plus d'une fois par seconde.
588 * @uses queue_sleep_time_to_next_job()
589 * @see action_cron() L'URL appelée pour déclencher le cron
593 function queue_affichage_cron() {
596 $time_to_next = queue_sleep_time_to_next_job();
597 // rien a faire si le prochain job est encore dans le futur
598 if ($time_to_next > 0 or defined('_DEBUG_BLOCK_QUEUE')) {
602 // ne pas relancer si on vient de lancer dans la meme seconde par un hit concurent
603 if (file_exists($lock = _DIR_TMP
. "cron.lock") and !(@filemtime
($lock) < $_SERVER['REQUEST_TIME'])) {
609 // il y a des taches en attentes
610 // si depuis plus de 5min, on essaye de lancer le cron par tous les moyens pour rattraper le coup
611 // on est sans doute sur un site qui n'autorise pas http sortant ou avec peu de trafic
613 if ($time_to_next < -300) {
617 $url_cron = generer_url_action('cron', '', false, true);
619 if (!defined('_HTML_BG_CRON_FORCE') or !_HTML_BG_CRON_FORCE
) {
621 // methode la plus rapide :
622 // Si fsockopen est possible, on lance le cron via un socket en asynchrone
623 // si fsockopen echoue (disponibilite serveur, firewall) on essaye pas cURL
624 // car on a toutes les chances d'echouer pareil mais sans moyen de le savoir
625 // on passe direct a la methode background-image
626 if (function_exists('fsockopen')) {
627 $parts = parse_url($url_cron);
629 switch ($parts['scheme']) {
639 $fp = @fsockopen
($scheme . $parts['host'],
640 isset($parts['port']) ?
$parts['port'] : $port,
644 $timeout = 200; // ms
645 stream_set_timeout($fp, 0, $timeout * 1000);
646 $query = $parts['path'] . ($parts['query'] ?
"?" . $parts['query'] : "");
647 $out = "GET " . $query . " HTTP/1.1\r\n";
648 $out .= "Host: " . $parts['host'] . "\r\n";
649 $out .= "Connection: Close\r\n\r\n";
653 // on lit la reponse si possible pour fermer proprement la connexion
654 // avec un timeout total de 200ms pour ne pas se bloquer
655 while (!feof($fp) and $t < $timeout) {
657 $t +
= spip_timer('read', true);
666 // si fsockopen n'est pas dispo on essaye cURL :
667 // lancer le cron par un cURL asynchrone si cURL est present
668 elseif (function_exists("curl_init")) {
669 //setting the curl parameters.
670 $ch = curl_init($url_cron);
671 curl_setopt($ch, CURLOPT_RETURNTRANSFER
, true);
672 // cf bug : http://www.php.net/manual/en/function.curl-setopt.php#104597
673 curl_setopt($ch, CURLOPT_NOSIGNAL
, 1);
674 // valeur mini pour que la requete soit lancee
675 curl_setopt($ch, CURLOPT_TIMEOUT_MS
, 200);
686 // si deja force, on retourne sans rien
687 if (defined('_DIRECT_CRON_FORCE')) {
692 // inutile de faire un appel par image background,
693 // on force un appel direct en fin de hit
694 if ((defined('_IS_BOT') and _IS_BOT
)) {
695 define('_DIRECT_CRON_FORCE', true);
700 // en derniere solution, on insere une image background dans la page
701 $texte = '<!-- SPIP-CRON --><div style="background-image: url(\'' .
702 generer_url_action('cron') .