namespace MediaWiki\Storage;
+use AppendIterator;
use DBAccessObjectUtils;
use IDBAccessObject;
use IExpiringStore;
use InvalidArgumentException;
use Language;
use MWException;
+use StatusValue;
use WANObjectCache;
use ExternalStoreAccess;
use Wikimedia\Assert\Assert;
* @param ExternalStoreAccess $extStoreAccess Access layer for external storage
* @param WANObjectCache $cache A cache manager for caching blobs. This can be the local
* wiki's default instance even if $dbDomain refers to a different wiki, since
- * makeGlobalKey() is used to constructed a key that allows cached blobs from the
- * same database to be re-used between wikis. For example, enwiki and frwiki will
- * use the same cache keys for blobs from the wikidatawiki database, regardless of
- * the cache's default key space.
+ * makeGlobalKey() is used to construct a key that allows cached blobs from the
+ * same database to be re-used between wikis. For example, wiki A and wiki B will
+ * use the same cache keys for blobs fetched from wiki C, regardless of the
+ * wiki-specific default key space.
* @param bool|string $dbDomain The ID of the target wiki database. Use false for the local wiki.
*/
public function __construct(
public function getBlob( $blobAddress, $queryFlags = 0 ) {
Assert::parameterType( 'string', $blobAddress, '$blobAddress' );
- // No negative caching; negative hits on text rows may be due to corrupted replica DBs
+ $error = null;
$blob = $this->cache->getWithSetCallback(
$this->getCacheKey( $blobAddress ),
$this->getCacheTTL(),
- function ( $unused, &$ttl, &$setOpts ) use ( $blobAddress, $queryFlags ) {
+ function ( $unused, &$ttl, &$setOpts ) use ( $blobAddress, $queryFlags, &$error ) {
// Ignore $setOpts; blobs are immutable and negatives are not cached
- return $this->fetchBlob( $blobAddress, $queryFlags );
+ list( $result, $errors ) = $this->fetchBlobs( [ $blobAddress ], $queryFlags );
+ // No negative caching; negative hits on text rows may be due to corrupted replica DBs
+ $error = $errors[$blobAddress] ?? null;
+ return $result[$blobAddress];
},
[ 'pcGroup' => self::TEXT_CACHE_GROUP, 'pcTTL' => IExpiringStore::TTL_PROC_LONG ]
);
- if ( $blob === false ) {
- throw new BlobAccessException( 'Failed to load blob from address ' . $blobAddress );
+ if ( $error ) {
+ throw new BlobAccessException( $error );
}
+ Assert::postcondition( is_string( $blob ), 'Blob must not be null' );
return $blob;
}
+ /**
+ * A batched version of BlobStore::getBlob.
+ *
+ * @param string[] $blobAddresses An array of blob addresses.
+ * @param int $queryFlags See IDBAccessObject.
+ * @throws BlobAccessException
+ * @return StatusValue A status with a map of blobAddress => binary blob data or null
+ * if fetching the blob has failed. Fetch failures errors are the
+ * warnings in the status object.
+ * @since 1.34
+ */
+ public function getBlobBatch( $blobAddresses, $queryFlags = 0 ) {
+ $errors = null;
+ $addressByCacheKey = $this->cache->makeMultiKeys(
+ $blobAddresses,
+ function ( $blobAddress ) {
+ return $this->getCacheKey( $blobAddress );
+ }
+ );
+ $blobsByCacheKey = $this->cache->getMultiWithUnionSetCallback(
+ $addressByCacheKey,
+ $this->getCacheTTL(),
+ function ( array $blobAddresses, array &$ttls, array &$setOpts ) use ( $queryFlags, &$errors ) {
+ // Ignore $setOpts; blobs are immutable and negatives are not cached
+ list( $result, $errors ) = $this->fetchBlobs( $blobAddresses, $queryFlags );
+ return $result;
+ },
+ [ 'pcGroup' => self::TEXT_CACHE_GROUP, 'pcTTL' => IExpiringStore::TTL_PROC_LONG ]
+ );
+
+ // Remap back to incoming blob addresses. The return value of the
+ // WANObjectCache::getMultiWithUnionSetCallback is keyed on the internal
+ // keys from WANObjectCache::makeMultiKeys, so we need to remap them
+ // before returning to the client.
+ $blobsByAddress = [];
+ foreach ( $blobsByCacheKey as $cacheKey => $blob ) {
+ $blobsByAddress[ $addressByCacheKey[ $cacheKey ] ] = $blob !== false ? $blob : null;
+ }
+
+ $result = StatusValue::newGood( $blobsByAddress );
+ if ( $errors ) {
+ foreach ( $errors as $error ) {
+ $result->warning( 'internalerror', $error );
+ }
+ }
+ return $result;
+ }
+
/**
* MCR migration note: this corresponds to Revision::fetchText
*
- * @param string $blobAddress
+ * @param string[] $blobAddresses
* @param int $queryFlags
*
* @throws BlobAccessException
- * @return string|false
- */
- private function fetchBlob( $blobAddress, $queryFlags ) {
- list( $schema, $id, ) = self::splitBlobAddress( $blobAddress );
+ * @return array [ $result, $errors ] A map of blob addresses to successfully fetched blobs
+ * or false if fetch failed, plus and array of errors
+ */
+ private function fetchBlobs( $blobAddresses, $queryFlags ) {
+ $textIdToBlobAddress = [];
+ $result = [];
+ $errors = [];
+ foreach ( $blobAddresses as $blobAddress ) {
+ list( $schema, $id ) = self::splitBlobAddress( $blobAddress );
+ //TODO: MCR: also support 'ex' schema with ExternalStore URLs, plus flags encoded in the URL!
+ if ( $schema === 'tt' ) {
+ $textId = intval( $id );
+ $textIdToBlobAddress[$textId] = $blobAddress;
+ } else {
+ $errors[$blobAddress] = "Unknown blob address schema: $schema";
+ $result[$blobAddress] = false;
+ continue;
+ }
- //TODO: MCR: also support 'ex' schema with ExternalStore URLs, plus flags encoded in the URL!
- if ( $schema === 'tt' ) {
- $textId = intval( $id );
- } else {
- // XXX: change to better exceptions! That makes migration more difficult, though.
- throw new BlobAccessException( "Unknown blob address schema: $schema" );
+ if ( !$textId || $id !== (string)$textId ) {
+ $errors[$blobAddress] = "Bad blob address: $blobAddress";
+ $result[$blobAddress] = false;
+ }
}
- if ( !$textId || $id !== (string)$textId ) {
- // XXX: change to better exceptions! That makes migration more difficult, though.
- throw new BlobAccessException( "Bad blob address: $blobAddress" );
+ $textIds = array_keys( $textIdToBlobAddress );
+ if ( !$textIds ) {
+ return [ $result, $errors ];
}
-
// Callers doing updates will pass in READ_LATEST as usual. Since the text/blob tables
// do not normally get rows changed around, set READ_LATEST_IMMUTABLE in those cases.
$queryFlags |= DBAccessObjectUtils::hasFlags( $queryFlags, self::READ_LATEST )
? self::READ_LATEST_IMMUTABLE
: 0;
-
list( $index, $options, $fallbackIndex, $fallbackOptions ) =
DBAccessObjectUtils::getDBOptions( $queryFlags );
-
// Text data is immutable; check replica DBs first.
- $row = $this->getDBConnection( $index )->selectRow(
+ $dbConnection = $this->getDBConnection( $index );
+ $rows = $dbConnection->select(
'text',
- [ 'old_text', 'old_flags' ],
- [ 'old_id' => $textId ],
+ [ 'old_id', 'old_text', 'old_flags' ],
+ [ 'old_id' => $textIds ],
__METHOD__,
$options
);
- // Fallback to DB_MASTER in some cases if the row was not found, using the appropriate
+ // Fallback to DB_MASTER in some cases if not all the rows were found, using the appropriate
// options, such as FOR UPDATE to avoid missing rows due to REPEATABLE-READ.
- if ( !$row && $fallbackIndex !== null ) {
- $row = $this->getDBConnection( $fallbackIndex )->selectRow(
+ if ( $dbConnection->numRows( $rows ) !== count( $textIds ) && $fallbackIndex !== null ) {
+ $fetchedTextIds = [];
+ foreach ( $rows as $row ) {
+ $fetchedTextIds[] = $row->old_id;
+ }
+ $missingTextIds = array_diff( $textIds, $fetchedTextIds );
+ $dbConnection = $this->getDBConnection( $fallbackIndex );
+ $rowsFromFallback = $dbConnection->select(
'text',
- [ 'old_text', 'old_flags' ],
- [ 'old_id' => $textId ],
+ [ 'old_id', 'old_text', 'old_flags' ],
+ [ 'old_id' => $missingTextIds ],
__METHOD__,
$fallbackOptions
);
+ $appendIterator = new AppendIterator();
+ $appendIterator->append( $rows );
+ $appendIterator->append( $rowsFromFallback );
+ $rows = $appendIterator;
}
- if ( !$row ) {
- wfWarn( __METHOD__ . ": No text row with ID $textId." );
- return false;
+ foreach ( $rows as $row ) {
+ $blobAddress = $textIdToBlobAddress[$row->old_id];
+ $blob = $this->expandBlob( $row->old_text, $row->old_flags, $blobAddress );
+ if ( $blob === false ) {
+ $errors[$blobAddress] = "Bad data in text row {$row->old_id}.";
+ }
+ $result[$blobAddress] = $blob;
}
- $blob = $this->expandBlob( $row->old_text, $row->old_flags, $blobAddress );
-
- if ( $blob === false ) {
- wfLogWarning( __METHOD__ . ": Bad data in text row $textId." );
- return false;
+ // If we're still missing some of the rows, set errors for missing blobs.
+ if ( count( $result ) !== count( $blobAddresses ) ) {
+ foreach ( $blobAddresses as $blobAddress ) {
+ if ( !isset( $result[$blobAddress ] ) ) {
+ $errors[$blobAddress] = "Unable to fetch blob at $blobAddress";
+ $result[$blobAddress] = false;
+ }
+ }
}
-
- return $blob;
+ return [ $result, $errors ];
}
/**
* Get a cache key for a given Blob address.
*
* The cache key is constructed in a way that allows cached blobs from the same database
- * to be re-used between wikis. For example, enwiki and frwiki will use the same cache keys
- * for blobs from the wikidatawiki database.
+ * to be re-used between wikis. For example, wiki A and wiki B will use the same cache keys
+ * for blobs fetched from wiki C.
*
* @param string $blobAddress
* @return string
*/
private function getCacheKey( $blobAddress ) {
return $this->cache->makeGlobalKey(
- 'BlobStore',
- 'address',
+ 'SqlBlobStore-blob',
$this->dbLoadBalancer->resolveDomainID( $this->dbDomain ),
$blobAddress
);