foundationdb/fdbclient/RESTClient.actor.cpp

364 lines
14 KiB
C++

/*
* RESTClient.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/RESTClient.h"
#include "fdbclient/HTTP.h"
#include "flow/IRateControl.h"
#include "fdbclient/RESTUtils.h"
#include "flow/Arena.h"
#include "flow/Error.h"
#include "flow/FastRef.h"
#include "flow/Knobs.h"
#include "flow/Net2Packet.h"
#include "flow/flow.h"
#include "flow/network.h"
#include "flow/serialize.h"
#include "flow/Trace.h"
#include "flow/UnitTest.h"
#include <memory>
#include <unordered_map>
#include "flow/actorcompiler.h" // always the last include
json_spirit::mObject RESTClient::Stats::getJSON() {
json_spirit::mObject o;
o["host_service"] = host_service;
o["requests_failed"] = requests_failed;
o["requests_successful"] = requests_successful;
o["bytes_sent"] = bytes_sent;
return o;
}
RESTClient::Stats RESTClient::Stats::operator-(const Stats& rhs) {
Stats r(host_service);
r.requests_failed = requests_failed - rhs.requests_failed;
r.requests_successful = requests_successful - rhs.requests_successful;
r.bytes_sent = bytes_sent - rhs.bytes_sent;
return r;
}
RESTClient::RESTClient() {}
RESTClient::RESTClient(std::unordered_map<std::string, int>& knobSettings) {
knobs.set(knobSettings);
}
void RESTClient::setKnobs(const std::unordered_map<std::string, int>& knobSettings) {
knobs.set(knobSettings);
}
std::unordered_map<std::string, int> RESTClient::getKnobs() const {
return knobs.get();
}
ACTOR Future<Reference<HTTP::Response>> doRequest_impl(Reference<RESTClient> client,
std::string verb,
HTTP::Headers headers,
RESTUrl* url,
std::set<unsigned int> successCodes) {
state UnsentPacketQueue content;
state int contentLen = url->body.size();
if (url->body.size() > 0) {
PacketWriter pw(content.getWriteBuffer(url->body.size()), nullptr, Unversioned());
pw.serializeBytes(url->body);
}
std::string statsKey = RESTClient::getStatsKey(url->service, url->service);
auto sItr = client->statsMap.find(statsKey);
if (sItr == client->statsMap.end()) {
client->statsMap.emplace(statsKey, std::make_unique<RESTClient::Stats>(statsKey));
}
headers["Content-Length"] = format("%d", contentLen);
headers["Host"] = url->host;
state int maxTries = std::min(client->knobs.request_tries, client->knobs.connect_tries);
state int thisTry = 1;
state double nextRetryDelay = 2.0;
state Reference<IRateControl> sendReceiveRate = makeReference<Unlimited>();
state double reqTimeout = (client->knobs.request_timeout_secs * 1.0) / 60;
state RESTConnectionPoolKey connectPoolKey = RESTConnectionPool::getConnectionPoolKey(url->host, url->service);
state RESTClient::Stats* statsPtr = client->statsMap[statsKey].get();
loop {
state Optional<Error> err;
state Optional<NetworkAddress> remoteAddress;
state bool connectionEstablished = false;
state Reference<HTTP::Response> r;
try {
// Start connecting
Future<RESTConnectionPool::ReusableConnection> frconn = client->conectionPool->connect(
connectPoolKey, client->knobs.secure_connection, client->knobs.max_connection_life);
// Finish connecting, do request
state RESTConnectionPool::ReusableConnection rconn =
wait(timeoutError(frconn, client->knobs.connect_timeout));
connectionEstablished = true;
remoteAddress = rconn.conn->getPeerAddress();
Reference<HTTP::Response> _r = wait(timeoutError(HTTP::doRequest(rconn.conn,
verb,
url->resource,
headers,
contentLen > 0 ? &content : nullptr,
contentLen,
sendReceiveRate,
&statsPtr->bytes_sent,
sendReceiveRate),
reqTimeout));
r = _r;
// Since the response was parsed successfully (which is why we are here) reuse the connection unless we
// received the "Connection: close" header.
if (r->headers["Connection"] != "close") {
client->conectionPool->returnConnection(connectPoolKey, rconn, client->knobs.connection_pool_size);
}
rconn.conn.clear();
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
throw;
}
err = e;
}
// If err is not present then r is valid.
// If r->code is in successCodes then record the successful request and return r.
if (!err.present() && successCodes.count(r->code) != 0) {
statsPtr->requests_successful++;
return r;
}
// Otherwise, this request is considered failed. Update failure count.
statsPtr->requests_failed++;
// All errors in err are potentially retryable as well as certain HTTP response codes...
bool retryable = err.present() || r->code == HTTP::HTTP_STATUS_CODE_INTERNAL_SERVER_ERROR ||
r->code == HTTP::HTTP_STATUS_CODE_BAD_GATEWAY ||
r->code == HTTP::HTTP_STATUS_CODE_SERVICE_UNAVAILABLE ||
r->code == HTTP::HTTP_STATUS_CODE_TOO_MANY_REQUESTS;
// But only if our previous attempt was not the last allowable try.
retryable = retryable && (thisTry < maxTries);
TraceEvent event(SevWarn, retryable ? "RESTClient_FailedRetryable" : "RESTClient_RequestFailed");
// Attach err to trace event if present, otherwise extract some stuff from the response
if (err.present()) {
event.errorUnsuppressed(err.get());
}
event.suppressFor(60);
if (!err.present()) {
event.detail("ResponseCode", r->code);
}
event.detail("ConnectionEstablished", connectionEstablished);
if (remoteAddress.present())
event.detail("RemoteEndpoint", remoteAddress.get());
else
event.detail("RemoteHost", url->host);
event.detail("Verb", verb).detail("Resource", url->resource).detail("ThisTry", thisTry);
// If r is not valid or not code TOO_MANY_REQUESTS then increment the try count.
// TOO_MANY_REQUEST's will not count against the attempt limit.
if (!r || r->code != HTTP::HTTP_STATUS_CODE_TOO_MANY_REQUESTS) {
++thisTry;
}
// We will wait delay seconds before the next retry, start with nextRetryDelay.
double delay = nextRetryDelay;
// Double but limit the *next* nextRetryDelay.
nextRetryDelay = std::min(nextRetryDelay * 2, 60.0);
if (retryable) {
// If r is valid then obey the Retry-After response header if present.
if (r) {
auto iRetryAfter = r->headers.find("Retry-After");
if (iRetryAfter != r->headers.end()) {
event.detail("RetryAfterHeader", iRetryAfter->second);
char* pEnd;
double retryAfter = strtod(iRetryAfter->second.c_str(), &pEnd);
if (*pEnd) {
// If there were other characters then don't trust the parsed value
retryAfter = HTTP::HTTP_RETRYAFTER_DELAY_SECS;
}
// Update delay
delay = std::max(delay, retryAfter);
}
}
// Log the delay then wait.
event.detail("RetryDelay", delay);
wait(::delay(delay));
} else {
// We can't retry, so throw something.
// This error code means the authentication header was not accepted, likely the account or key is wrong.
if (r && r->code == HTTP::HTTP_STATUS_CODE_NOT_ACCEPTABLE) {
throw http_not_accepted();
}
if (r && r->code == HTTP::HTTP_STATUS_CODE_UNAUTHORIZED) {
throw http_auth_failed();
}
// Recognize and throw specific errors
if (err.present()) {
int code = err.get().code();
// If we get a timed_out error during the the connect() phase, we'll call that connection_failed despite
// the fact that there was technically never a 'connection' to begin with. It differentiates between an
// active connection timing out vs a connection timing out, though not between an active connection
// failing vs connection attempt failing.
// TODO: Add more error types?
if (code == error_code_timed_out && !connectionEstablished) {
throw connection_failed();
}
if (code == error_code_timed_out || code == error_code_connection_failed ||
code == error_code_lookup_failed) {
throw err.get();
}
}
throw http_request_failed();
}
}
}
Future<Reference<HTTP::Response>> RESTClient::doPutOrPost(const std::string& verb,
Optional<HTTP::Headers> optHeaders,
RESTUrl* url,
std::set<unsigned int> successCodes) {
HTTP::Headers headers;
if (optHeaders.present()) {
headers = optHeaders.get();
}
return doRequest_impl(Reference<RESTClient>::addRef(this), verb, headers, url, successCodes);
}
Future<Reference<HTTP::Response>> RESTClient::doPost(const std::string& fullUrl,
const std::string& requestBody,
Optional<HTTP::Headers> optHeaders) {
RESTUrl url(fullUrl, requestBody, knobs.secure_connection);
return doPutOrPost(HTTP::HTTP_VERB_POST, optHeaders, std::addressof(url), { HTTP::HTTP_STATUS_CODE_OK });
}
Future<Reference<HTTP::Response>> RESTClient::doPut(const std::string& fullUrl,
const std::string& requestBody,
Optional<HTTP::Headers> optHeaders) {
RESTUrl url(fullUrl, requestBody, knobs.secure_connection);
return doPutOrPost(
HTTP::HTTP_VERB_PUT,
optHeaders,
std::addressof(url),
// 201 - on successful resource create
// 200 / 204 - if target resource representation was successfully modified with the desired state
{ HTTP::HTTP_STATUS_CODE_OK, HTTP::HTTP_STATUS_CODE_CREATED, HTTP::HTTP_STATUS_CODE_NO_CONTENT });
}
Future<Reference<HTTP::Response>> RESTClient::doGetHeadDeleteOrTrace(const std::string& verb,
Optional<HTTP::Headers> optHeaders,
RESTUrl* url,
std::set<unsigned int> successCodes) {
HTTP::Headers headers;
if (optHeaders.present()) {
headers = optHeaders.get();
}
return doRequest_impl(Reference<RESTClient>::addRef(this), HTTP::HTTP_VERB_GET, headers, url, successCodes);
}
Future<Reference<HTTP::Response>> RESTClient::doGet(const std::string& fullUrl, Optional<HTTP::Headers> optHeaders) {
RESTUrl url(fullUrl, knobs.secure_connection);
return doGetHeadDeleteOrTrace(HTTP::HTTP_VERB_GET, optHeaders, std::addressof(url), { HTTP::HTTP_STATUS_CODE_OK });
}
Future<Reference<HTTP::Response>> RESTClient::doHead(const std::string& fullUrl, Optional<HTTP::Headers> optHeaders) {
RESTUrl url(fullUrl, knobs.secure_connection);
return doGetHeadDeleteOrTrace(HTTP::HTTP_VERB_HEAD, optHeaders, std::addressof(url), { HTTP::HTTP_STATUS_CODE_OK });
}
Future<Reference<HTTP::Response>> RESTClient::doDelete(const std::string& fullUrl, Optional<HTTP::Headers> optHeaders) {
RESTUrl url(fullUrl, knobs.secure_connection);
return doGetHeadDeleteOrTrace(
HTTP::HTTP_VERB_DELETE,
optHeaders,
std::addressof(url),
// 200 - action has been enacted.
// 202 - action will likely succeed, but, has not yet been enacted.
// 204 - action has been enated, no further information is to supplied.
{ HTTP::HTTP_STATUS_CODE_OK, HTTP::HTTP_STATUS_CODE_NO_CONTENT, HTTP::HTTP_STATUS_CODE_ACCEPTED });
}
Future<Reference<HTTP::Response>> RESTClient::doTrace(const std::string& fullUrl, Optional<HTTP::Headers> optHeaders) {
RESTUrl url(fullUrl, knobs.secure_connection);
return doGetHeadDeleteOrTrace(
HTTP::HTTP_VERB_TRACE, optHeaders, std::addressof(url), { HTTP::HTTP_STATUS_CODE_OK });
}
// Only used to link unit tests
void forceLinkRESTClientTests() {}
TEST_CASE("fdbrpc/RESTClient") {
RESTClient r;
std::unordered_map<std::string, int> knobs = r.getKnobs();
ASSERT_EQ(knobs["secure_connection"], RESTClientKnobs::SECURE_CONNECTION);
ASSERT_EQ(knobs["connection_pool_size"], FLOW_KNOBS->RESTCLIENT_MAX_CONNECTIONPOOL_SIZE);
ASSERT_EQ(knobs["connect_tries"], FLOW_KNOBS->RESTCLIENT_CONNECT_TRIES);
ASSERT_EQ(knobs["connect_timeout"], FLOW_KNOBS->RESTCLIENT_CONNECT_TIMEOUT);
ASSERT_EQ(knobs["max_connection_life"], FLOW_KNOBS->RESTCLIENT_MAX_CONNECTION_LIFE);
ASSERT_EQ(knobs["request_tries"], FLOW_KNOBS->RESTCLIENT_REQUEST_TRIES);
ASSERT_EQ(knobs["request_timeout_secs"], FLOW_KNOBS->RESTCLIENT_REQUEST_TIMEOUT_SEC);
for (auto& itr : knobs) {
itr.second++;
}
r.setKnobs(knobs);
std::unordered_map<std::string, int> updated = r.getKnobs();
for (auto& itr : updated) {
ASSERT_EQ(knobs[itr.first], itr.second);
}
// invalid client knob
knobs["foo"] = 100;
try {
r.setKnobs(knobs);
ASSERT(false);
} catch (Error& e) {
if (e.code() != error_code_rest_invalid_rest_client_knob) {
throw e;
}
}
return Void();
}