Get network interfaces ready for https proxy. (#7556)

* Move HTTP from fdbclient/ to fdbrpc/.

* Move md5 and libb64 to contrib/.

* Get network interfaces ready for https proxy.

* Rebase
This commit is contained in:
Renxuan Wang 2022-07-25 17:08:32 -07:00 committed by GitHub
parent 6719e5a85b
commit dc9599f2e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 85 additions and 61 deletions

View File

@ -8,6 +8,8 @@ add_subdirectory(rapidxml)
add_subdirectory(sqlite)
add_subdirectory(SimpleOpt)
add_subdirectory(fmt-8.1.1)
add_subdirectory(md5)
add_subdirectory(libb64)
if(NOT WIN32)
add_subdirectory(linenoise)
add_subdirectory(debug_determinism)

View File

@ -0,0 +1,2 @@
add_library(libb64 STATIC cdecode.c cencode.c)
target_include_directories(libb64 PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include")

View File

@ -5,7 +5,7 @@ This is part of the libb64 project, and has been placed in the public domain.
For details, see http://sourceforge.net/projects/libb64
*/
#include "fdbclient/libb64/cdecode.h"
#include "libb64/cdecode.h"
int base64_decode_value(char value_in) {
static const char decoding[] = { 62, -1, -1, -1, 63, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, -1, -1, -1, -2, -1,

View File

@ -5,7 +5,7 @@ This is part of the libb64 project, and has been placed in the public domain.
For details, see http://sourceforge.net/projects/libb64
*/
#include "fdbclient/libb64/cencode.h"
#include "libb64/cencode.h"
const int CHARS_PER_LINE = 72;

View File

@ -0,0 +1,2 @@
add_library(md5 STATIC md5.c)
target_include_directories(md5 PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include")

View File

@ -1,8 +1,5 @@
fdb_find_sources(FDBCLIENT_SRCS)
list(APPEND FDBCLIENT_SRCS
sha1/SHA1.cpp
libb64/cdecode.c
libb64/cencode.c)
list(APPEND FDBCLIENT_SRCS sha1/SHA1.cpp)
message(STATUS "FDB version is ${FDB_VERSION}")
message(STATUS "FDB package name is ${FDB_PACKAGE_NAME}")

View File

@ -209,12 +209,7 @@ void ClientKnobs::initialize(Randomize randomize) {
init( IS_ACCEPTABLE_DELAY, 1.5 );
init( HTTP_READ_SIZE, 128*1024 );
init( HTTP_SEND_SIZE, 32*1024 );
init( HTTP_VERBOSE_LEVEL, 0 );
init( HTTP_REQUEST_ID_HEADER, "" );
init( HTTP_REQUEST_AWS_V4_HEADER, true );
init( HTTP_RESPONSE_SKIP_VERIFY_CHECKSUM_FOR_PARTIAL_CONTENT, false );
init( BLOBSTORE_ENCRYPTION_TYPE, "" );
init( BLOBSTORE_CONNECT_TRIES, 10 );
init( BLOBSTORE_CONNECT_TIMEOUT, 10 );

View File

@ -20,7 +20,7 @@
#include "fdbclient/RESTClient.h"
#include "fdbclient/HTTP.h"
#include "fdbrpc/HTTP.h"
#include "flow/IRateControl.h"
#include "fdbclient/RESTUtils.h"
#include "flow/Arena.h"

View File

@ -20,8 +20,8 @@
#include "fdbclient/S3BlobStore.h"
#include "fdbclient/md5/md5.h"
#include "fdbclient/libb64/encode.h"
#include "md5/md5.h"
#include "libb64/encode.h"
#include "fdbclient/sha1/SHA1.h"
#include <time.h>
#include <iomanip>

View File

@ -21,7 +21,7 @@
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/Tenant.h"
#include "fdbclient/libb64/encode.h"
#include "libb64/encode.h"
#include "flow/UnitTest.h"
Key TenantMapEntry::idToPrefix(int64_t id) {

View File

@ -36,8 +36,8 @@
#include "flow/Net2Packet.h"
#include "flow/IRateControl.h"
#include "fdbclient/S3BlobStore.h"
#include "fdbclient/md5/md5.h"
#include "fdbclient/libb64/encode.h"
#include "md5/md5.h"
#include "libb64/encode.h"
#include "flow/actorcompiler.h" // This must be the last #include.
ACTOR template <typename T>

View File

@ -220,12 +220,7 @@ public:
int64_t CSI_SIZE_LIMIT;
double CSI_STATUS_DELAY;
int HTTP_SEND_SIZE;
int HTTP_READ_SIZE;
int HTTP_VERBOSE_LEVEL;
std::string HTTP_REQUEST_ID_HEADER;
bool HTTP_REQUEST_AWS_V4_HEADER; // setting this knob to true will enable AWS V4 style header.
bool HTTP_RESPONSE_SKIP_VERIFY_CHECKSUM_FOR_PARTIAL_CONTENT; // skip verify md5 checksum for 206 response
std::string BLOBSTORE_ENCRYPTION_TYPE;
int BLOBSTORE_CONNECT_TRIES;
int BLOBSTORE_CONNECT_TIMEOUT;

View File

@ -25,7 +25,7 @@
#pragma once
#include "fdbclient/JSONDoc.h"
#include "fdbclient/HTTP.h"
#include "fdbrpc/HTTP.h"
#include "fdbclient/RESTUtils.h"
#include "flow/Arena.h"
#include "flow/FastRef.h"

View File

@ -26,7 +26,7 @@
#include "flow/Net2Packet.h"
#include "fdbclient/Knobs.h"
#include "flow/IRateControl.h"
#include "fdbclient/HTTP.h"
#include "fdbrpc/HTTP.h"
#include "fdbclient/JSONDoc.h"
// Representation of all the things you need to connect to a blob store instance with some credentials.

View File

@ -31,7 +31,7 @@
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/SpecialKeySpace.actor.h"
#include "fdbclient/TenantManagement.actor.h"
#include "fdbclient/libb64/encode.h"
#include "libb64/encode.h"
#include "flow/Arena.h"
#include "flow/UnitTest.h"
#include "flow/actorcompiler.h" // This must be the last #include.

View File

@ -61,10 +61,10 @@ if(${COROUTINE_IMPL} STREQUAL libcoro)
endif()
target_include_directories(fdbrpc PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" "${CMAKE_CURRENT_BINARY_DIR}/include" PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/libeio)
target_link_libraries(fdbrpc PUBLIC flow PRIVATE rapidjson)
target_link_libraries(fdbrpc PUBLIC flow libb64 md5 PRIVATE rapidjson)
target_include_directories(fdbrpc_sampling PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" "${CMAKE_CURRENT_BINARY_DIR}/include" PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/libeio)
target_link_libraries(fdbrpc_sampling PUBLIC flow_sampling PRIVATE rapidjson)
target_link_libraries(fdbrpc_sampling PUBLIC flow_sampling libb64 md5 PRIVATE rapidjson)
if(${COROUTINE_IMPL} STREQUAL libcoro)
target_link_libraries(fdbrpc PUBLIC coro)

View File

@ -18,12 +18,11 @@
* limitations under the License.
*/
#include "fdbclient/HTTP.h"
#include "fdbrpc/HTTP.h"
#include "fdbclient/md5/md5.h"
#include "fdbclient/ClientKnobs.h"
#include "fdbclient/libb64/encode.h"
#include "fdbclient/Knobs.h"
#include "md5/md5.h"
#include "libb64/encode.h"
#include "flow/Knobs.h"
#include <cctype>
#include "flow/actorcompiler.h" // has to be last include
@ -153,7 +152,7 @@ ACTOR Future<size_t> read_delimited_into_string(Reference<IConnection> conn,
// Next search will start at the current end of the buffer - delim size + 1
if (sPos >= lookBack)
sPos -= lookBack;
wait(success(read_into_string(conn, buf, CLIENT_KNOBS->HTTP_READ_SIZE)));
wait(success(read_into_string(conn, buf, FLOW_KNOBS->HTTP_READ_SIZE)));
}
}
@ -161,7 +160,7 @@ ACTOR Future<size_t> read_delimited_into_string(Reference<IConnection> conn,
ACTOR Future<Void> read_fixed_into_string(Reference<IConnection> conn, int len, std::string* buf, size_t pos) {
state int stop_size = pos + len;
while (buf->size() < stop_size)
wait(success(read_into_string(conn, buf, CLIENT_KNOBS->HTTP_READ_SIZE)));
wait(success(read_into_string(conn, buf, FLOW_KNOBS->HTTP_READ_SIZE)));
return Void();
}
@ -329,7 +328,7 @@ ACTOR Future<Void> read_http_response(Reference<HTTP::Response> r, Reference<ICo
// If there is actual response content, check the MD5 sum against the Content-MD5 response header
if (r->content.size() > 0) {
if (r->code == 206 && CLIENT_KNOBS->HTTP_RESPONSE_SKIP_VERIFY_CHECKSUM_FOR_PARTIAL_CONTENT) {
if (r->code == 206 && FLOW_KNOBS->HTTP_RESPONSE_SKIP_VERIFY_CHECKSUM_FOR_PARTIAL_CONTENT) {
return Void();
}
@ -368,7 +367,7 @@ ACTOR Future<Reference<HTTP::Response>> doRequest(Reference<IConnection> conn,
// There is no standard http request id header field, so either a global default can be set via a knob
// or it can be set per-request with the requestIDHeader argument (which overrides the default)
if (requestIDHeader.empty()) {
requestIDHeader = CLIENT_KNOBS->HTTP_REQUEST_ID_HEADER;
requestIDHeader = FLOW_KNOBS->HTTP_REQUEST_ID_HEADER;
}
state bool earlyResponse = false;
@ -400,13 +399,13 @@ ACTOR Future<Reference<HTTP::Response>> doRequest(Reference<IConnection> conn,
// Prepend headers to content packer buffer chain
pContent->prependWriteBuffer(pFirst, pLast);
if (CLIENT_KNOBS->HTTP_VERBOSE_LEVEL > 1)
if (FLOW_KNOBS->HTTP_VERBOSE_LEVEL > 1)
printf("[%s] HTTP starting %s %s ContentLen:%d\n",
conn->getDebugID().toString().c_str(),
verb.c_str(),
resource.c_str(),
contentLen);
if (CLIENT_KNOBS->HTTP_VERBOSE_LEVEL > 2) {
if (FLOW_KNOBS->HTTP_VERBOSE_LEVEL > 2) {
for (auto h : headers)
printf("Request Header: %s: %s\n", h.first.c_str(), h.second.c_str());
}
@ -427,7 +426,7 @@ ACTOR Future<Reference<HTTP::Response>> doRequest(Reference<IConnection> conn,
break;
}
state int trySend = CLIENT_KNOBS->HTTP_SEND_SIZE;
state int trySend = FLOW_KNOBS->HTTP_SEND_SIZE;
wait(sendRate->getAllowance(trySend));
int len = conn->write(pContent->getUnsent(), trySend);
if (pSent != nullptr)
@ -481,7 +480,7 @@ ACTOR Future<Reference<HTTP::Response>> doRequest(Reference<IConnection> conn,
}
}
if (CLIENT_KNOBS->HTTP_VERBOSE_LEVEL > 0) {
if (FLOW_KNOBS->HTTP_VERBOSE_LEVEL > 0) {
printf("[%s] HTTP %scode=%d early=%d, time=%fs %s %s contentLen=%d [%d out, response content len %d]\n",
conn->getDebugID().toString().c_str(),
(err.present() ? format("*ERROR*=%s ", err.get().name()).c_str() : ""),
@ -494,7 +493,7 @@ ACTOR Future<Reference<HTTP::Response>> doRequest(Reference<IConnection> conn,
total_sent,
(int)r->contentLen);
}
if (CLIENT_KNOBS->HTTP_VERBOSE_LEVEL > 2) {
if (FLOW_KNOBS->HTTP_VERBOSE_LEVEL > 2) {
printf("[%s] HTTP RESPONSE: %s %s\n%s\n",
conn->getDebugID().toString().c_str(),
verb.c_str(),
@ -510,7 +509,7 @@ ACTOR Future<Reference<HTTP::Response>> doRequest(Reference<IConnection> conn,
} catch (Error& e) {
double elapsed = timer() - send_start;
// A bad_request_id error would have already been logged in verbose mode before err is thrown above.
if (CLIENT_KNOBS->HTTP_VERBOSE_LEVEL > 0 && e.code() != error_code_http_bad_request_id) {
if (FLOW_KNOBS->HTTP_VERBOSE_LEVEL > 0 && e.code() != error_code_http_bad_request_id) {
printf("[%s] HTTP *ERROR*=%s early=%d, time=%fs %s %s contentLen=%d [%d out]\n",
conn->getDebugID().toString().c_str(),
e.name(),

View File

@ -48,6 +48,7 @@ public:
int write(SendBuffer const* buffer, int limit) override;
NetworkAddress getPeerAddress() const override;
UID getDebugID() const override;
boost::asio::ip::tcp::socket& getSocket() override { return socket; }
static Future<std::vector<NetworkAddress>> resolveTCPEndpoint(const std::string& host,
const std::string& service,
DNSCache* dnsCache);

View File

@ -324,6 +324,8 @@ struct Sim2Conn final : IConnection, ReferenceCounted<Sim2Conn> {
NetworkAddress getPeerAddress() const override { return peerEndpoint; }
UID getDebugID() const override { return dbgid; }
boost::asio::ip::tcp::socket& getSocket() override { throw operation_failed(); }
bool opened, closedByCaller, stableConnection;
private:
@ -948,8 +950,9 @@ public:
TaskPriority getCurrentTask() const override { return currentTaskID; }
void setCurrentTask(TaskPriority taskID) override { currentTaskID = taskID; }
// Sets the taskID/priority of the current task, without yielding
Future<Reference<IConnection>> connect(NetworkAddress toAddr, const std::string& host) override {
ASSERT(host.empty());
Future<Reference<IConnection>> connect(NetworkAddress toAddr,
boost::asio::ip::tcp::socket* existingSocket = nullptr) override {
ASSERT(existingSocket == nullptr);
if (!addressMap.count(toAddr)) {
return waitForProcessAndConnect(toAddr, this);
}
@ -975,7 +978,7 @@ public:
return onConnect(::delay(0.5 * deterministicRandom()->random01()), myc);
}
Future<Reference<IConnection>> connectExternal(NetworkAddress toAddr, const std::string& host) override {
Future<Reference<IConnection>> connectExternal(NetworkAddress toAddr) override {
return SimExternalConnection::connect(toAddr);
}

View File

@ -21,7 +21,7 @@
#include "fdbserver/RESTKmsConnector.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/HTTP.h"
#include "fdbrpc/HTTP.h"
#include "flow/IAsyncFile.h"
#include "fdbserver/KmsConnectorInterface.h"
#include "fdbserver/Knobs.h"

View File

@ -23,7 +23,7 @@
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/TenantManagement.actor.h"
#include "fdbclient/TenantSpecialKeys.actor.h"
#include "fdbclient/libb64/decode.h"
#include "libb64/decode.h"
#include "fdbrpc/simulator.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/Knobs.h"

View File

@ -168,6 +168,13 @@ void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) {
init( LOW_PRIORITY_DELAY_COUNT, 5 );
init( LOW_PRIORITY_MAX_DELAY, 5.0 );
// HTTP
init( HTTP_READ_SIZE, 128*1024 );
init( HTTP_SEND_SIZE, 32*1024 );
init( HTTP_VERBOSE_LEVEL, 0 );
init( HTTP_REQUEST_ID_HEADER, "" );
init( HTTP_RESPONSE_SKIP_VERIFY_CHECKSUM_FOR_PARTIAL_CONTENT, false );
//IAsyncFile
init( INCREMENTAL_DELETE_TRUNCATE_AMOUNT, 5e8 ); //500MB
init( INCREMENTAL_DELETE_INTERVAL, 1.0 ); //every 1 second

View File

@ -146,8 +146,8 @@ public:
void initMetrics() override;
// INetworkConnections interface
Future<Reference<IConnection>> connect(NetworkAddress toAddr, const std::string& host) override;
Future<Reference<IConnection>> connectExternal(NetworkAddress toAddr, const std::string& host) override;
Future<Reference<IConnection>> connect(NetworkAddress toAddr, tcp::socket* existingSocket = nullptr) override;
Future<Reference<IConnection>> connectExternal(NetworkAddress toAddr) override;
Future<Reference<IUDPSocket>> createUDPSocket(NetworkAddress toAddr) override;
Future<Reference<IUDPSocket>> createUDPSocket(bool isV6) override;
// The mock DNS methods should only be used in simulation.
@ -507,7 +507,7 @@ public:
UID getDebugID() const override { return id; }
tcp::socket& getSocket() { return socket; }
tcp::socket& getSocket() override { return socket; }
private:
UID id;
@ -839,10 +839,15 @@ public:
: id(nondeterministicRandom()->randomUniqueID()), socket(io_service), ssl_sock(socket, context->mutate()),
sslContext(context) {}
explicit SSLConnection(Reference<ReferencedObject<boost::asio::ssl::context>> context, tcp::socket* existingSocket)
: id(nondeterministicRandom()->randomUniqueID()), socket(std::move(*existingSocket)),
ssl_sock(socket, context->mutate()), sslContext(context) {}
// This is not part of the IConnection interface, because it is wrapped by INetwork::connect()
ACTOR static Future<Reference<IConnection>> connect(boost::asio::io_service* ios,
Reference<ReferencedObject<boost::asio::ssl::context>> context,
NetworkAddress addr) {
NetworkAddress addr,
tcp::socket* existingSocket = nullptr) {
std::pair<IPAddress, uint16_t> peerIP = std::make_pair(addr.ip, addr.port);
auto iter(g_network->networkInfo.serverTLSConnectionThrottler.find(peerIP));
if (iter != g_network->networkInfo.serverTLSConnectionThrottler.end()) {
@ -857,9 +862,15 @@ public:
}
}
if (existingSocket != nullptr) {
Reference<SSLConnection> self(new SSLConnection(context, existingSocket));
self->peer_address = addr;
self->init();
return self;
}
state Reference<SSLConnection> self(new SSLConnection(*ios, context));
self->peer_address = addr;
try {
auto to = tcpEndpoint(self->peer_address);
BindPromise p("N2_ConnectError", self->id);
@ -869,7 +880,7 @@ public:
wait(onConnected);
self->init();
return self;
} catch (Error& e) {
} catch (Error&) {
// Either the connection failed, or was cancelled by the caller
self->closeSocket();
throw;
@ -1097,7 +1108,7 @@ public:
UID getDebugID() const override { return id; }
tcp::socket& getSocket() { return socket; }
tcp::socket& getSocket() override { return socket; }
ssl_socket& getSSLSocket() { return ssl_sock; }
@ -1818,17 +1829,17 @@ THREAD_HANDLE Net2::startThread(THREAD_FUNC_RETURN (*func)(void*), void* arg, in
return ::startThread(func, arg, stackSize, name);
}
Future<Reference<IConnection>> Net2::connect(NetworkAddress toAddr, const std::string& host) {
Future<Reference<IConnection>> Net2::connect(NetworkAddress toAddr, tcp::socket* existingSocket) {
if (toAddr.isTLS()) {
initTLS(ETLSInitState::CONNECT);
return SSLConnection::connect(&this->reactor.ios, this->sslContextVar.get(), toAddr);
return SSLConnection::connect(&this->reactor.ios, this->sslContextVar.get(), toAddr, existingSocket);
}
return Connection::connect(&this->reactor.ios, toAddr);
}
Future<Reference<IConnection>> Net2::connectExternal(NetworkAddress toAddr, const std::string& host) {
return connect(toAddr, host);
Future<Reference<IConnection>> Net2::connectExternal(NetworkAddress toAddr) {
return connect(toAddr);
}
Future<Reference<IUDPSocket>> Net2::createUDPSocket(NetworkAddress toAddr) {

View File

@ -235,6 +235,13 @@ public:
int LOW_PRIORITY_DELAY_COUNT;
double LOW_PRIORITY_MAX_DELAY;
// HTTP
int HTTP_READ_SIZE;
int HTTP_SEND_SIZE;
int HTTP_VERBOSE_LEVEL;
std::string HTTP_REQUEST_ID_HEADER;
bool HTTP_RESPONSE_SKIP_VERIFY_CHECKSUM_FOR_PARTIAL_CONTENT; // skip verify md5 checksum for 206 response
// IAsyncFile
int64_t INCREMENTAL_DELETE_TRUNCATE_AMOUNT;
double INCREMENTAL_DELETE_INTERVAL;

View File

@ -472,6 +472,8 @@ public:
// At present, implemented by Sim2Conn where we want to disable bits flip for connections between parent process and
// child process, also reduce latency for this kind of connection
virtual bool isStableConnection() const { throw unsupported_operation(); }
virtual boost::asio::ip::tcp::socket& getSocket() = 0;
};
class IListener {
@ -688,9 +690,10 @@ public:
// Make an outgoing connection to the given address. May return an error or block indefinitely in case of
// connection problems!
virtual Future<Reference<IConnection>> connect(NetworkAddress toAddr, const std::string& host = "") = 0;
virtual Future<Reference<IConnection>> connect(NetworkAddress toAddr,
boost::asio::ip::tcp::socket* existingSocket = nullptr) = 0;
virtual Future<Reference<IConnection>> connectExternal(NetworkAddress toAddr, const std::string& host = "") = 0;
virtual Future<Reference<IConnection>> connectExternal(NetworkAddress toAddr) = 0;
// Make an outgoing udp connection and connect to the passed address.
virtual Future<Reference<IUDPSocket>> createUDPSocket(NetworkAddress toAddr) = 0;

View File

@ -293,7 +293,7 @@ Future<Reference<IConnection>> INetworkConnections::connect(const std::string& h
std::function<Future<Reference<IConnection>>(NetworkAddress const&)>,
Reference<IConnection>>(
pickEndpoint,
[=](NetworkAddress const& addr) -> Future<Reference<IConnection>> { return connectExternal(addr, host); });
[=](NetworkAddress const& addr) -> Future<Reference<IConnection>> { return connectExternal(addr); });
}
IUDPSocket::~IUDPSocket() {}