3 use Psr\Log\LoggerInterface
;
6 * Interface to key-value storage behind an HTTP server.
8 * Uses URL of the form "baseURL/{KEY}" to store, fetch, and delete values.
10 * E.g., when base URL is `/sessions/v1`, then the store would do:
12 * `PUT /sessions/v1/12345758`
16 * `GET /sessions/v1/12345758`
20 * `DELETE /sessions/v1/12345758`
22 * Minimal generic configuration:
25 * $wgObjectCaches['sessions'] = array(
26 * 'class' => 'RESTBagOStuff',
27 * 'url' => 'http://localhost:7231/wikimedia.org/somepath/'
 * Configuration for Kask (session storage); note that the HTTP method and
 * header overrides are read from the nested 'httpParams' array:
 * $wgObjectCaches['sessions'] = array(
 *	'class' => 'RESTBagOStuff',
 *	'url' => 'https://kaskhost:1234/sessions/v1/',
 *	'httpParams' => [
 *		'readHeaders' => [],
 *		'writeHeaders' => [ 'content-type' => 'application/octet-stream' ],
 *		'deleteHeaders' => [],
 *		'writeMethod' => 'POST',
 *	],
 *	'extendedErrorBodyFields' => [ 'type', 'title', 'detail', 'instance' ]
 * );
 * $wgSessionCacheType = 'sessions';
47 class RESTBagOStuff
extends BagOStuff
{
/**
 * Default connection timeout in seconds. The kernel retransmits the SYN
 * packet after 1 second, so 1.2 seconds allows for 1 retransmit without
 * permanent failure.
 */
const DEFAULT_CONN_TIMEOUT = 1.2;

/**
 * Default request timeout (presumably in seconds, like the connection
 * timeout above).
 */
const DEFAULT_REQ_TIMEOUT = 3.0;
61 * @var MultiHttpClient
66 * REST URL to use for storage.
72 * @var array http parameters: readHeaders, writeHeaders, deleteHeaders, writeMethod
77 * @var array additional body fields to log on error, if possible
79 private $extendedErrorBodyFields;
/**
 * Build the store from a configuration array.
 *
 * @param array $params Recognised keys:
 *   - url: (required) base URL of the REST endpoint; a trailing "/" is enforced.
 *   - client: (optional) ready-made HTTP client; when absent or empty a
 *     MultiHttpClient is created from the options below.
 *   - connTimeout: connection timeout, defaults to self::DEFAULT_CONN_TIMEOUT.
 *   - reqTimeout: request timeout, defaults to self::DEFAULT_REQ_TIMEOUT.
 *   - caBundlePath, proxy: forwarded to MultiHttpClient when set.
 *   - httpParams: array with optional 'writeMethod' (default 'PUT'),
 *     'readHeaders', 'writeHeaders' and 'deleteHeaders' (default []).
 *   - extendedErrorBodyFields: response body fields to add to error logs.
 *   The whole array is also handed to the parent constructor.
 * @throws InvalidArgumentException when no URL is given
 */
public function __construct( $params ) {
	if ( empty( $params['url'] ) ) {
		throw new InvalidArgumentException( 'URL parameter is required' );
	}

	if ( !empty( $params['client'] ) ) {
		// Caller supplied its own HTTP client; use it as-is.
		$this->client = $params['client'];
	} else {
		// Pass through some params to the HTTP client.
		$httpOptions = [
			'connTimeout' => $params['connTimeout'] ?? self::DEFAULT_CONN_TIMEOUT,
			'reqTimeout' => $params['reqTimeout'] ?? self::DEFAULT_REQ_TIMEOUT,
		];
		foreach ( [ 'caBundlePath', 'proxy' ] as $optionName ) {
			if ( isset( $params[$optionName] ) ) {
				$httpOptions[$optionName] = $params[$optionName];
			}
		}
		$this->client = new MultiHttpClient( $httpOptions );
	}

	$httpOverrides = $params['httpParams'] ?? [];
	$this->httpParams['writeMethod'] = $httpOverrides['writeMethod'] ?? 'PUT';
	foreach ( [ 'readHeaders', 'writeHeaders', 'deleteHeaders' ] as $headerSet ) {
		$this->httpParams[$headerSet] = $httpOverrides[$headerSet] ?? [];
	}
	$this->extendedErrorBodyFields = $params['extendedErrorBodyFields'] ?? [];

	// The parent constructor calls setLogger() which sets the logger in $this->client,
	// so the client has to exist before this point.
	parent::__construct( $params );

	// Make sure URL ends with /
	$this->url = rtrim( $params['url'], '/' ) . '/';

	// Default config, R+W > N; no locks on reads though; writes go straight to state-machine
	$this->attrMap[self::ATTR_SYNCWRITES] = self::QOS_SYNCWRITES_QC;
}
/**
 * Install a logger on this store and on the HTTP client it owns.
 *
 * @param LoggerInterface $logger
 */
public function setLogger( LoggerInterface $logger ) {
	// Both assignments are independent; the client simply shares our logger.
	$this->client->setLogger( $logger );
	parent::setLogger( $logger );
}
/**
 * Fetch the value stored under a key with a GET request to "<url><key>".
 *
 * @param string $key Key to fetch; it is URL-encoded before being appended
 * @param int $flags Not used by this implementation
 * @param mixed &$casToken Receives the raw response body when a value was
 *   decoded successfully, null otherwise
 * @return mixed|bool The decoded value, or false when nothing usable was
 *   returned (non-200 status, non-string body, undecodable body)
 */
protected function doGet( $key, $flags = 0, &$casToken = null ) {
	$casToken = null;

	$response = $this->client->run( [
		'method' => 'GET',
		'url' => $this->url . rawurlencode( $key ),
		'headers' => $this->httpParams['readHeaders'],
	] );
	list( $rcode, $rdesc, $rhdrs, $rbody, $rerr ) = $response;

	if ( $rcode !== 200 ) {
		// Status 0 and any 4xx/5xx except 404 are reported as errors;
		// everything else (404 included) is an ordinary miss.
		$isFailure = ( $rcode === 0 ) || ( $rcode >= 400 && $rcode != 404 );
		if ( $isFailure ) {
			return $this->handleError( "Failed to fetch $key", $rcode, $rerr, $rhdrs, $rbody );
		}
		return false;
	}

	if ( !is_string( $rbody ) ) {
		return false;
	}

	$value = $this->decodeBody( $rbody );
	/// @FIXME: use some kind of hash or UUID header as CAS token
	if ( $value !== false ) {
		$casToken = $rbody;
	}

	return $value;
}
/**
 * Store a value under a key using the configured write method.
 *
 * @param string $key Key to write; it is URL-encoded before being appended
 * @param mixed $value Value to store; it is JSON-encoded for transmission
 * @param int $exptime Currently ignored (see TODO below)
 * @param int $flags Currently ignored (see TODO below)
 * @return bool True when the server answered 200, 201 or 204; otherwise the
 *   result of handleError()
 * @throws InvalidArgumentException when the value cannot be JSON-encoded
 */
public function set( $key, $value, $exptime = 0, $flags = 0 ) {
	// @TODO: respect WRITE_SYNC (e.g. EACH_QUORUM)
	// @TODO: respect $exptime
	$response = $this->client->run( [
		'method' => $this->httpParams['writeMethod'],
		'url' => $this->url . rawurlencode( $key ),
		'body' => $this->encodeBody( $value ),
		'headers' => $this->httpParams['writeHeaders'],
	] );
	list( $rcode, $rdesc, $rhdrs, $rbody, $rerr ) = $response;

	$stored = in_array( $rcode, [ 200, 201, 204 ], true );
	if ( !$stored ) {
		return $this->handleError( "Failed to store $key", $rcode, $rerr, $rhdrs, $rbody );
	}

	return true;
}
/**
 * Store a value only if get() currently reports nothing for the key.
 *
 * The check and the write are two separate requests, so this is not atomic.
 *
 * @param string $key
 * @param mixed $value
 * @param int $exptime Passed through to set()
 * @param int $flags Passed through to set()
 * @return bool Result of set(), or false when the key already holds a value
 */
public function add( $key, $value, $exptime = 0, $flags = 0 ) {
	// @TODO: make this atomic
	$alreadySet = ( $this->get( $key ) !== false );
	if ( $alreadySet ) {
		return false; // key already set
	}

	return $this->set( $key, $value, $exptime, $flags );
}
/**
 * Remove a key with a DELETE request to "<url><key>".
 *
 * @param string $key Key to delete; it is URL-encoded before being appended
 * @param int $flags Currently ignored (see TODO below)
 * @return bool True when the server answered 200, 204, 205, 404 or 410
 *   (an already-absent key counts as deleted); otherwise the result of
 *   handleError()
 */
public function delete( $key, $flags = 0 ) {
	// @TODO: respect WRITE_SYNC (e.g. EACH_QUORUM)
	$req = [
		'method' => 'DELETE',
		'url' => $this->url . rawurlencode( $key ),
		'headers' => $this->httpParams['deleteHeaders'],
	];

	list( $rcode, $rdesc, $rhdrs, $rbody, $rerr ) = $this->client->run( $req );
	// Strict comparison, matching the === status checks used by the other
	// request methods; avoids loose matches such as true == 200.
	if ( in_array( $rcode, [ 200, 204, 205, 404, 410 ], true ) ) {
		return true;
	}

	return $this->handleError( "Failed to delete $key", $rcode, $rerr, $rhdrs, $rbody );
}
/**
 * Increase the integer stored under a key, never letting it drop below zero.
 *
 * The read and the write are two separate requests, so this is not atomic.
 *
 * @param string $key
 * @param int $value Amount to add; it is passed through intval()
 * @return int|bool The new value, or false when the current value is not
 *   an integer (per isInteger()) or the write failed
 */
public function incr( $key, $value = 1 ) {
	// @TODO: make this atomic
	$current = $this->get( $key, self::READ_LATEST );
	if ( !$this->isInteger( $current ) ) {
		// Nothing incrementable is stored under this key.
		return false;
	}

	$updated = max( $current + intval( $value ), 0 );
	// @TODO: respect $exptime
	if ( $this->set( $key, $updated ) ) {
		return $updated;
	}

	return false;
}
/**
 * Decode a JSON response body into a PHP value (objects become arrays).
 *
 * Note that a body consisting of the JSON literal `false` decodes to false
 * and therefore cannot be told apart from a decoding failure.
 *
 * @param string $body response body to decode
 * @return mixed|bool the decoded value, or false when the body is not valid JSON
 */
private function decodeBody( $body ) {
	$decoded = json_decode( $body, true );
	if ( json_last_error() !== JSON_ERROR_NONE ) {
		return false;
	}

	return $decoded;
}
/**
 * Prepares the request body (the "value" portion of our key/value store)
 * for transmission by JSON-encoding it.
 *
 * @param mixed $body value to encode
 * @return string the JSON representation of the value
 * @throws InvalidArgumentException when the value cannot be JSON-encoded
 */
private function encodeBody( $body ) {
	$encoded = json_encode( $body );
	if ( $encoded !== false ) {
		return $encoded;
	}

	throw new InvalidArgumentException( __METHOD__ . ": body could not be encoded." );
}
/**
 * Handle storage error: log it and record it as the last error.
 *
 * When extendedErrorBodyFields is configured and the response body is a
 * JSON object, the configured fields found in it are appended to the log
 * message.
 *
 * @param string $msg Error message
 * @param int $rcode Error code from client
 * @param string $rerr Error message from client
 * @param array $rhdrs Response headers (not used here)
 * @param string $rbody Error body from client (if any)
 * @return false Always false, so callers can return the result directly
 */
protected function handleError( $msg, $rcode, $rerr, $rhdrs, $rbody ) {
	$message = "$msg : ({code}) {error}";
	$context = [
		'code' => $rcode,
		'error' => $rerr
	];

	// Only attempt to decode an actual string body; doGet() applies the
	// same is_string() guard before decoding.
	if ( $this->extendedErrorBodyFields !== [] && is_string( $rbody ) ) {
		$body = $this->decodeBody( $rbody );
		if ( is_array( $body ) ) {
			$extraFields = '';
			foreach ( $this->extendedErrorBodyFields as $field ) {
				if ( !isset( $body[$field] ) ) {
					continue;
				}
				$fieldValue = $body[$field];
				if ( !is_scalar( $fieldValue ) ) {
					// Nested structures cannot be interpolated directly
					// (array-to-string conversion), so log them as JSON.
					$fieldValue = json_encode( $fieldValue );
				}
				$extraFields .= " : ({$field}) {$fieldValue}";
			}
			if ( $extraFields !== '' ) {
				$message .= " {extra_fields}";
				$context['extra_fields'] = $extraFields;
			}
		}
	}

	$this->logger->error( $message, $context );
	// A status code of 0 means no HTTP response was received at all.
	$this->setLastError( $rcode === 0 ? self::ERR_UNREACHABLE : self::ERR_UNEXPECTED );

	return false;
}