Try to opportunistically flush statsd data in maintenance scripts
authorAaron Schulz <aschulz@wikimedia.org>
Sat, 2 Dec 2017 21:12:41 +0000 (13:12 -0800)
committerBryanDavis <bdavis@wikimedia.org>
Sat, 30 Dec 2017 05:01:21 +0000 (05:01 +0000)
This helps to avoid OOMs from buffer build-ups in the statsd
factory object. This piggy-backs on to the same checks used
for deferred update runs. In addition, the output() method
checks if the data size is getting large and emits if needed.

Bug: T181385
Change-Id: I598be98a5770f8358975815e51380c4b8f63a79e

includes/GlobalFunctions.php
includes/MediaWiki.php
includes/libs/stats/BufferingStatsdDataFactory.php
includes/libs/stats/IBufferingStatsdDataFactory.php
includes/libs/stats/NullStatsdDataFactory.php
maintenance/Maintenance.php
tests/phpunit/MediaWikiTestCase.php

index 1a33b76..523a0f9 100644 (file)
@@ -24,7 +24,6 @@ if ( !defined( 'MEDIAWIKI' ) ) {
        die( "This file is part of MediaWiki, it is not a valid entry point" );
 }
 
-use Liuggio\StatsdClient\Sender\SocketSender;
 use MediaWiki\Logger\LoggerFactory;
 use MediaWiki\ProcOpenError;
 use MediaWiki\Session\SessionManager;
@@ -1231,6 +1230,7 @@ function wfErrorLog( $text, $file, array $context = [] ) {
 
 /**
  * @todo document
+ * @todo Move logic to MediaWiki.php
  */
 function wfLogProfilingData() {
        global $wgDebugLogGroups, $wgDebugRawPage;
@@ -1242,23 +1242,13 @@ function wfLogProfilingData() {
        $profiler->setContext( $context );
        $profiler->logData();
 
-       $config = $context->getConfig();
-       $stats = MediaWikiServices::getInstance()->getStatsdDataFactory();
-       if ( $config->get( 'StatsdServer' ) && $stats->hasData() ) {
-               try {
-                       $statsdServer = explode( ':', $config->get( 'StatsdServer' ) );
-                       $statsdHost = $statsdServer[0];
-                       $statsdPort = isset( $statsdServer[1] ) ? $statsdServer[1] : 8125;
-                       $statsdSender = new SocketSender( $statsdHost, $statsdPort );
-                       $statsdClient = new SamplingStatsdClient( $statsdSender, true, false );
-                       $statsdClient->setSamplingRates( $config->get( 'StatsdSamplingRates' ) );
-                       $statsdClient->send( $stats->getData() );
-               } catch ( Exception $ex ) {
-                       MWExceptionHandler::logException( $ex );
-               }
-       }
+       // Send out any buffered statsd metrics as needed
+       MediaWiki::emitBufferedStatsdData(
+               MediaWikiServices::getInstance()->getStatsdDataFactory(),
+               $context->getConfig()
+       );
 
-       # Profiling must actually be enabled...
+       // Profiling must actually be enabled...
        if ( $profiler instanceof ProfilerStub ) {
                return;
        }
index 9e3bc10..beb9de5 100644 (file)
@@ -26,6 +26,7 @@ use MediaWiki\MediaWikiServices;
 use Wikimedia\Rdbms\ChronologyProtector;
 use Wikimedia\Rdbms\LBFactory;
 use Wikimedia\Rdbms\DBConnectionError;
+use Liuggio\StatsdClient\Sender\SocketSender;
 
 /**
  * The MediaWiki class is the helper class for the index.php entry point.
@@ -912,6 +913,34 @@ class MediaWiki {
                wfDebug( "Request ended normally\n" );
        }
 
+       /**
+        * Send out any buffered statsd data according to sampling rules
+        *
+        * @param IBufferingStatsdDataFactory $stats
+        * @param Config $config
+        * @throws ConfigException
+        * @since 1.31
+        */
+       public static function emitBufferedStatsdData(
+               IBufferingStatsdDataFactory $stats, Config $config
+       ) {
+               if ( $config->get( 'StatsdServer' ) && $stats->hasData() ) {
+                       try {
+                               $statsdServer = explode( ':', $config->get( 'StatsdServer' ) );
+                               $statsdHost = $statsdServer[0];
+                               $statsdPort = isset( $statsdServer[1] ) ? $statsdServer[1] : 8125;
+                               $statsdSender = new SocketSender( $statsdHost, $statsdPort );
+                               $statsdClient = new SamplingStatsdClient( $statsdSender, true, false );
+                               $statsdClient->setSamplingRates( $config->get( 'StatsdSamplingRates' ) );
+                               $statsdClient->send( $stats->getData() );
+
+                               $stats->clearData(); // empty buffer for the next round
+                       } catch ( Exception $ex ) {
+                               MWExceptionHandler::logException( $ex );
+                       }
+               }
+       }
+
        /**
         * Potentially open a socket and sent an HTTP request back to the server
         * to run a specified number of jobs. This registers a callback to cleanup
index d75d9c0..06915b2 100644 (file)
@@ -99,27 +99,22 @@ class BufferingStatsdDataFactory extends StatsdDataFactory implements IBuffering
                return $this->buffer;
        }
 
-       /**
-        * Check whether this data factory has any data.
-        * @return bool
-        */
        public function hasData() {
                return !empty( $this->buffer );
        }
 
-       /**
-        * Return data from the factory.
-        * @return StatsdData[]
-        */
        public function getData() {
                return $this->buffer;
        }
 
-       /**
-        * Set collection enable status.
-        * @param bool $enabled Will collection be enabled?
-        * @return void
-        */
+       public function clearData() {
+               $this->buffer = [];
+       }
+
+       public function getDataCount() {
+               return count( $this->buffer );
+       }
+
        public function setEnabled( $enabled ) {
                $this->enabled = $enabled;
        }
index f77b26c..77b4c35 100644 (file)
@@ -9,22 +9,34 @@ use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
  */
 interface IBufferingStatsdDataFactory extends StatsdDataFactoryInterface {
        /**
-        * Check whether this data factory has any data.
+        * Check whether this data factory has any buffered data.
         * @return bool
         */
        public function hasData();
 
        /**
-        * Return data from the factory.
+        * Return the buffered data from the factory.
         * @return StatsdData[]
         */
        public function getData();
 
+       /**
+        * Clear all buffered data from the factory
+        * @since 1.31
+        */
+       public function clearData();
+
+       /**
+        * Return the number of buffered statsd data entries
+        * @return int
+        * @since 1.31
+        */
+       public function getDataCount();
+
        /**
         * Set collection enable status.
         * @param bool $enabled Will collection be enabled?
         * @return void
         */
        public function setEnabled( $enabled );
-
 }
index ed16311..63de8f2 100644 (file)
@@ -105,27 +105,22 @@ class NullStatsdDataFactory implements IBufferingStatsdDataFactory {
                return $data;
        }
 
-       /**
-        * Check whether this data factory has any data.
-        * @return bool
-        */
        public function hasData() {
                return false;
        }
 
-       /**
-        * Return data from the factory.
-        * @return StatsdData[]
-        */
        public function getData() {
                return [];
        }
 
-       /**
-        * Set collection enable status.
-        * @param bool $enabled Will collection be enabled?
-        * @return void
-        */
+       public function clearData() {
+               // Nothing to do, always empty
+       }
+
+       public function getDataCount() {
+               return 0;
+       }
+
        public function setEnabled( $enabled ) {
                // Nothing to do, null factory is always disabled.
        }
index a3e0ffd..07f547f 100644 (file)
@@ -381,6 +381,11 @@ abstract class Maintenance {
         * @param mixed $channel Unique identifier for the channel. See function outputChanneled.
         */
        protected function output( $out, $channel = null ) {
+               // Try to periodically flush buffered metrics to avoid OOMs
+               $stats = MediaWikiServices::getInstance()->getStatsdDataFactory();
+               if ( $stats->getDataCount() > 1000 ) {
+                       MediaWiki::emitBufferedStatsdData( $stats, $this->getConfig() );
+               }
                if ( $this->mQuiet ) {
                        return;
                }
@@ -591,36 +596,41 @@ abstract class Maintenance {
                $lbFactory->setAgentName(
                        mb_strlen( $agent ) > 15 ? mb_substr( $agent, 0, 15 ) . '...' : $agent
                );
-               self::setLBFactoryTriggers( $lbFactory );
+               self::setLBFactoryTriggers( $lbFactory, $this->getConfig() );
        }
 
        /**
         * @param LBFactory $LBFactory
+        * @param Config $config
         * @since 1.28
         */
-       public static function setLBFactoryTriggers( LBFactory $LBFactory ) {
+       public static function setLBFactoryTriggers( LBFactory $LBFactory, Config $config ) {
+               $services = MediaWikiServices::getInstance();
+               $stats = $services->getStatsdDataFactory();
                // Hook into period lag checks which often happen in long-running scripts
-               $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
+               $lbFactory = $services->getDBLoadBalancerFactory();
                $lbFactory->setWaitForReplicationListener(
                        __METHOD__,
-                       function () {
-                               global $wgCommandLineMode;
+                       function () use ( $stats, $config ) {
                                // Check config in case of JobRunner and unit tests
-                               if ( $wgCommandLineMode ) {
+                               if ( $config->get( 'CommandLineMode' ) ) {
                                        DeferredUpdates::tryOpportunisticExecute( 'run' );
                                }
+                               // Try to periodically flush buffered metrics to avoid OOMs
+                               MediaWiki::emitBufferedStatsdData( $stats, $config );
                        }
                );
                // Check for other windows to run them. A script may read or do a few writes
                // to the master but mostly be writing to something else, like a file store.
                $lbFactory->getMainLB()->setTransactionListener(
                        __METHOD__,
-                       function ( $trigger ) {
-                               global $wgCommandLineMode;
+                       function ( $trigger ) use ( $stats, $config ) {
                                // Check config in case of JobRunner and unit tests
-                               if ( $wgCommandLineMode && $trigger === IDatabase::TRIGGER_COMMIT ) {
+                               if ( $config->get( 'CommandLineMode' ) && $trigger === IDatabase::TRIGGER_COMMIT ) {
                                        DeferredUpdates::tryOpportunisticExecute( 'run' );
                                }
+                               // Try to periodically flush buffered metrics to avoid OOMs
+                               MediaWiki::emitBufferedStatsdData( $stats, $config );
                        }
                );
        }
index ef13101..d542826 100644 (file)
@@ -517,8 +517,9 @@ abstract class MediaWikiTestCase extends PHPUnit_Framework_TestCase {
 
                // XXX: reset maintenance triggers
                // Hook into period lag checks which often happen in long-running scripts
-               $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
-               Maintenance::setLBFactoryTriggers( $lbFactory );
+               $services = MediaWikiServices::getInstance();
+               $lbFactory = $services->getDBLoadBalancerFactory();
+               Maintenance::setLBFactoryTriggers( $lbFactory, $services->getMainConfig() );
 
                ob_start( 'MediaWikiTestCase::wfResetOutputBuffersBarrier' );
        }