Merge "kafka: Implement ack handling"
[lhc/web/wiklou.git] / includes / deferred / CdnCacheUpdate.php
index 9f7d8ca..4ce9e62 100644 (file)
@@ -22,6 +22,7 @@
  */
 
 use Wikimedia\Assert\Assert;
+use MediaWiki\MediaWikiServices;
 
 /**
  * Handles purging appropriate CDN URLs given a title (or titles)
@@ -29,7 +30,7 @@ use Wikimedia\Assert\Assert;
  */
 class CdnCacheUpdate implements DeferrableUpdate, MergeableUpdate {
        /** @var string[] Collection of URLs to purge */
-       protected $urls = array();
+       protected $urls = [];
 
        /**
         * @param string[] $urlArr Collection of URLs to purge
@@ -52,7 +53,7 @@ class CdnCacheUpdate implements DeferrableUpdate, MergeableUpdate {
         * @param string[] $urlArr
         * @return CdnCacheUpdate
         */
-       public static function newFromTitles( $titles, $urlArr = array() ) {
+       public static function newFromTitles( $titles, $urlArr = [] ) {
                /** @var Title $title */
                foreach ( $titles as $title ) {
                        $urlArr = array_merge( $urlArr, $title->getCdnUrls() );
@@ -81,10 +82,10 @@ class CdnCacheUpdate implements DeferrableUpdate, MergeableUpdate {
                if ( $wgCdnReboundPurgeDelay > 0 ) {
                        JobQueueGroup::singleton()->lazyPush( new CdnPurgeJob(
                                Title::makeTitle( NS_SPECIAL, 'Badtitle/' . __CLASS__ ),
-                               array(
+                               [
                                        'urls' => $this->urls,
                                        'jobReleaseTimestamp' => time() + $wgCdnReboundPurgeDelay
-                               )
+                               ]
                        ) );
                }
        }
@@ -108,10 +109,29 @@ class CdnCacheUpdate implements DeferrableUpdate, MergeableUpdate {
 
                wfDebugLog( 'squid', __METHOD__ . ': ' . implode( ' ', $urlArr ) );
 
+               // Reliably broadcast the purge to all edge nodes
+               $relayer = MediaWikiServices::getInstance()->getEventRelayerGroup()
+                                       ->getRelayer( 'cdn-url-purges' );
+               $ts = microtime( true );
+               $relayer->notifyMulti(
+                       'cdn-url-purges',
+                       array_map(
+                               function ( $url ) use ( $ts ) {
+                                       return [
+                                               'url' => $url,
+                                               'timestamp' => $ts,
+                                       ];
+                               },
+                               $urlArr
+                       )
+               );
+
+               // Send lossy UDP broadcasting if enabled
                if ( $wgHTCPRouting ) {
                        self::HTCPPurge( $urlArr );
                }
 
+               // Do direct server purges if enabled (this does not scale very well)
                if ( $wgSquidServers ) {
                        // Maximum number of parallel connections per squid
                        $maxSocketsPerSquid = 8;
@@ -195,7 +215,7 @@ class CdnCacheUpdate implements DeferrableUpdate, MergeableUpdate {
 
                        if ( isset( $conf['host'] ) && isset( $conf['port'] ) ) {
                                // Normalize single entries
-                               $conf = array( $conf );
+                               $conf = [ $conf ];
                        }
                        foreach ( $conf as $subconf ) {
                                if ( !isset( $subconf['host'] ) || !isset( $subconf['port'] ) ) {