Merge pull request #4374 from sfc-gh-ngoyal/mymaster
Merge Release 6.3 into master
This commit is contained in:
commit
0565aa7fae
|
@ -125,6 +125,17 @@ Because it can have only a single network thread, a client process may become li
|
|||
|
||||
If you suspect that a client process's workload may be saturating the network thread, this can be confirmed by checking whether the network thread is running with high CPU usage. In the :ref:`client trace logs <client-trace-logging>`, the ``ProcessMetrics`` trace event has a field for ``MainThreadCPUSeconds`` that indicates the number of seconds out of ``Elapsed`` that the network thread was busy. You can also attempt to identify a busy thread from any tool that reports the CPU activity of threads in your process.
|
||||
|
||||
.. note:: FoundationDB 6.3 introduced :ref:`multi-threaded client <multi-threaded-client>`, and can alternatively be used to scale clients.
|
||||
|
||||
.. _multi-threaded-client:
|
||||
|
||||
Multi-threaded Client
|
||||
=====================
|
||||
|
||||
FoundationDB client library can start multiple worker threads for each version of client that is loaded. Every single cluster will be serviced by a one client thread. If the client is connected to only one cluster, exactly one thread would be active and the rest will remain idle. Hence, using this feature is useful when the client is actively using more than one cluster.
|
||||
|
||||
Clients can be configured to use worker-threads by setting the ``FDBNetworkOptions::CLIENT_THREADS_PER_VERSION`` option.
|
||||
|
||||
.. _client-trace-logging:
|
||||
|
||||
Client trace logging
|
||||
|
|
|
@ -88,6 +88,7 @@ Status
|
|||
* Removed fields ``worst_version_lag_storage_server`` and ``limiting_version_lag_storage_server`` from the ``cluster.qos`` section. The ``worst_data_lag_storage_server`` and ``limiting_data_lag_storage_server`` objects can be used instead. `(PR #3196) <https://github.com/apple/foundationdb/pull/3196>`_
|
||||
* If a process is unable to flush trace logs to disk, the problem will now be reported via the output of ``status`` command inside ``fdbcli``. `(PR #2605) <https://github.com/apple/foundationdb/pull/2605>`_ `(PR #2820) <https://github.com/apple/foundationdb/pull/2820>`_
|
||||
* When a configuration key is changed, it will always be included in ``status json`` output, even the value is reverted back to the default value. [6.3.5] `(PR #3610) <https://github.com/apple/foundationdb/pull/3610>`_
|
||||
* Added transactions.rejected_for_queued_too_long for bookkeeping the number of transactions rejected by commit proxy because its queuing time exceeds MVCC window. `(PR #4353) <https://github.com/apple/foundationdb/pull/4353>`_
|
||||
|
||||
Bindings
|
||||
--------
|
||||
|
|
|
@ -330,7 +330,8 @@ void loadClientFunction(T *fp, void *lib, std::string libPath, const char *funct
|
|||
}
|
||||
}
|
||||
|
||||
DLApi::DLApi(std::string fdbCPath) : api(new FdbCApi()), fdbCPath(fdbCPath), networkSetup(false) {}
|
||||
DLApi::DLApi(std::string fdbCPath, bool unlinkOnLoad)
|
||||
: api(new FdbCApi()), fdbCPath(fdbCPath), unlinkOnLoad(unlinkOnLoad), networkSetup(false) {}
|
||||
|
||||
void DLApi::init() {
|
||||
if(isLibraryLoaded(fdbCPath.c_str())) {
|
||||
|
@ -342,6 +343,15 @@ void DLApi::init() {
|
|||
TraceEvent(SevError, "ErrorLoadingExternalClientLibrary").detail("LibraryPath", fdbCPath);
|
||||
throw platform_error();
|
||||
}
|
||||
if (unlinkOnLoad) {
|
||||
int err = unlink(fdbCPath.c_str());
|
||||
if (err) {
|
||||
TraceEvent(SevError, "ErrorUnlinkingTempClientLibraryFile")
|
||||
.detail("errno", errno)
|
||||
.detail("LibraryPath", fdbCPath);
|
||||
throw platform_error();
|
||||
}
|
||||
}
|
||||
|
||||
loadClientFunction(&api->selectApiVersion, lib, fdbCPath, "fdb_select_api_version_impl");
|
||||
loadClientFunction(&api->getClientVersion, lib, fdbCPath, "fdb_get_client_version", headerVersion >= 410);
|
||||
|
@ -761,7 +771,9 @@ void MultiVersionTransaction::reset() {
|
|||
}
|
||||
|
||||
// MultiVersionDatabase
|
||||
MultiVersionDatabase::MultiVersionDatabase(MultiVersionApi *api, std::string clusterFilePath, Reference<IDatabase> db, bool openConnectors) : dbState(new DatabaseState()) {
|
||||
MultiVersionDatabase::MultiVersionDatabase(MultiVersionApi* api, int threadIdx, std::string clusterFilePath,
|
||||
Reference<IDatabase> db, bool openConnectors)
|
||||
: dbState(new DatabaseState()), threadIdx(threadIdx) {
|
||||
dbState->db = db;
|
||||
dbState->dbVar->set(db);
|
||||
|
||||
|
@ -777,7 +789,7 @@ MultiVersionDatabase::MultiVersionDatabase(MultiVersionApi *api, std::string clu
|
|||
dbState->currentClientIndex = -1;
|
||||
}
|
||||
|
||||
api->runOnExternalClients([this, clusterFilePath](Reference<ClientInfo> client) {
|
||||
api->runOnExternalClients(threadIdx, [this, clusterFilePath](Reference<ClientInfo> client) {
|
||||
dbState->addConnection(client, clusterFilePath);
|
||||
});
|
||||
|
||||
|
@ -790,7 +802,8 @@ MultiVersionDatabase::~MultiVersionDatabase() {
|
|||
}
|
||||
|
||||
Reference<IDatabase> MultiVersionDatabase::debugCreateFromExistingDatabase(Reference<IDatabase> db) {
|
||||
return Reference<IDatabase>(new MultiVersionDatabase(MultiVersionApi::api, "", db, false));
|
||||
return Reference<IDatabase>(new MultiVersionDatabase(
|
||||
MultiVersionApi::api, 0, "", db, false));
|
||||
}
|
||||
|
||||
Reference<ITransaction> MultiVersionDatabase::createTransaction() {
|
||||
|
@ -934,7 +947,7 @@ void MultiVersionDatabase::DatabaseState::stateChanged() {
|
|||
}
|
||||
|
||||
if(newIndex == -1) {
|
||||
ASSERT(currentClientIndex == 0); // This can only happen for the local client, which we set as the current connection before we know it's connected
|
||||
ASSERT_EQ(currentClientIndex, 0); // This can only happen for the local client, which we set as the current connection before we know it's connected
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -998,30 +1011,39 @@ void MultiVersionDatabase::DatabaseState::cancelConnections() {
|
|||
// MultiVersionApi
|
||||
|
||||
bool MultiVersionApi::apiVersionAtLeast(int minVersion) {
|
||||
ASSERT(MultiVersionApi::api->apiVersion != 0);
|
||||
ASSERT_NE(MultiVersionApi::api->apiVersion, 0);
|
||||
return MultiVersionApi::api->apiVersion >= minVersion || MultiVersionApi::api->apiVersion < 0;
|
||||
}
|
||||
|
||||
void MultiVersionApi::runOnExternalClientsAllThreads(std::function<void(Reference<ClientInfo>)> func,
|
||||
bool runOnFailedClients) {
|
||||
for (int i = 0; i < threadCount; i++) {
|
||||
runOnExternalClients(i, func, runOnFailedClients);
|
||||
}
|
||||
}
|
||||
|
||||
// runOnFailedClients should be used cautiously. Some failed clients may not have successfully loaded all symbols.
|
||||
void MultiVersionApi::runOnExternalClients(std::function<void(Reference<ClientInfo>)> func, bool runOnFailedClients) {
|
||||
void MultiVersionApi::runOnExternalClients(int threadIdx, std::function<void(Reference<ClientInfo>)> func,
|
||||
bool runOnFailedClients) {
|
||||
bool newFailure = false;
|
||||
|
||||
auto c = externalClients.begin();
|
||||
while(c != externalClients.end()) {
|
||||
auto client = c->second[threadIdx];
|
||||
try {
|
||||
if(!c->second->failed || runOnFailedClients) { // TODO: Should we ignore some failures?
|
||||
func(c->second);
|
||||
if (!client->failed || runOnFailedClients) { // TODO: Should we ignore some failures?
|
||||
func(client);
|
||||
}
|
||||
}
|
||||
catch(Error &e) {
|
||||
if(e.code() == error_code_external_client_already_loaded) {
|
||||
TraceEvent(SevInfo, "ExternalClientAlreadyLoaded").error(e).detail("LibPath", c->second->libPath);
|
||||
TraceEvent(SevInfo, "ExternalClientAlreadyLoaded").error(e).detail("LibPath", c->first);
|
||||
c = externalClients.erase(c);
|
||||
continue;
|
||||
}
|
||||
else {
|
||||
TraceEvent(SevWarnAlways, "ExternalClientFailure").error(e).detail("LibPath", c->second->libPath);
|
||||
c->second->failed = true;
|
||||
TraceEvent(SevWarnAlways, "ExternalClientFailure").error(e).detail("LibPath", c->first);
|
||||
client->failed = true;
|
||||
newFailure = true;
|
||||
}
|
||||
}
|
||||
|
@ -1088,50 +1110,104 @@ void MultiVersionApi::setCallbacksOnExternalThreads() {
|
|||
|
||||
callbackOnMainThread = false;
|
||||
}
|
||||
|
||||
void MultiVersionApi::addExternalLibrary(std::string path) {
|
||||
std::string filename = basename(path);
|
||||
// we need at least one external library thread to run this library.
|
||||
threadCount = std::max(threadCount, 1);
|
||||
|
||||
if (filename.empty() || !fileExists(path)) {
|
||||
TraceEvent("ExternalClientNotFound").detail("LibraryPath", filename);
|
||||
throw file_not_found();
|
||||
}
|
||||
|
||||
MutexHolder holder(lock);
|
||||
if(networkStartSetup) {
|
||||
throw invalid_option(); // SOMEDAY: it might be good to allow clients to be added after the network is setup
|
||||
}
|
||||
|
||||
if(externalClients.count(filename) == 0) {
|
||||
if (externalClientDescriptions.count(filename) == 0) {
|
||||
TraceEvent("AddingExternalClient").detail("LibraryPath", filename);
|
||||
externalClients[filename] = makeReference<ClientInfo>(new DLApi(path), path);
|
||||
externalClientDescriptions.emplace(std::make_pair(filename, ClientDesc(path, true)));
|
||||
}
|
||||
}
|
||||
|
||||
void MultiVersionApi::addExternalLibraryDirectory(std::string path) {
|
||||
TraceEvent("AddingExternalClientDirectory").detail("Directory", path);
|
||||
std::vector<std::string> files = platform::listFiles(path, DYNAMIC_LIB_EXT);
|
||||
|
||||
MutexHolder holder(lock);
|
||||
if(networkStartSetup) {
|
||||
throw invalid_option(); // SOMEDAY: it might be good to allow clients to be added after the network is setup. For directories, we can monitor them for the addition of new files.
|
||||
}
|
||||
// we need at least one external library thread to run these libraries.
|
||||
threadCount = std::max(threadCount, 1);
|
||||
|
||||
for(auto filename : files) {
|
||||
std::string lib = abspath(joinPath(path, filename));
|
||||
if(externalClients.count(filename) == 0) {
|
||||
if (externalClientDescriptions.count(filename) == 0) {
|
||||
TraceEvent("AddingExternalClient").detail("LibraryPath", filename);
|
||||
externalClients[filename] = makeReference<ClientInfo>(new DLApi(lib), lib);
|
||||
externalClientDescriptions.emplace(std::make_pair(filename, ClientDesc(lib, true)));
|
||||
}
|
||||
}
|
||||
}
|
||||
#if defined(__unixish__)
|
||||
std::vector<std::pair<std::string, bool>> MultiVersionApi::copyExternalLibraryPerThread(std::string path) {
|
||||
ASSERT_GE(threadCount, 1);
|
||||
// Copy library for each thread configured per version
|
||||
std::vector<std::pair<std::string, bool>> paths;
|
||||
// It's tempting to use the so once without copying. However, we don't know
|
||||
// if the thing we're about to copy is the shared object executing this code
|
||||
// or not, so this optimization is unsafe.
|
||||
// paths.push_back({path, false});
|
||||
for (int ii = 0; ii < threadCount; ++ii) {
|
||||
std::string filename = basename(path);
|
||||
|
||||
char tempName[PATH_MAX + 12];
|
||||
sprintf(tempName, "/tmp/%s-XXXXXX", filename.c_str());
|
||||
int tempFd = mkstemp(tempName);
|
||||
int fd;
|
||||
|
||||
if ((fd = open(path.c_str(), O_RDONLY)) == -1) {
|
||||
TraceEvent("ExternalClientNotFound").detail("LibraryPath", path);
|
||||
throw file_not_found();
|
||||
}
|
||||
|
||||
constexpr size_t buf_sz = 4096;
|
||||
char buf[buf_sz];
|
||||
while (1) {
|
||||
ssize_t readCount = read(fd, buf, buf_sz);
|
||||
if (readCount == 0) {
|
||||
// eof
|
||||
break;
|
||||
}
|
||||
if (readCount == -1) {
|
||||
throw platform_error;
|
||||
}
|
||||
ssize_t written = 0;
|
||||
while (written != readCount) {
|
||||
ssize_t writeCount = write(tempFd, buf + written, readCount - written);
|
||||
if (writeCount == -1) {
|
||||
throw platform_error;
|
||||
}
|
||||
written += writeCount;
|
||||
}
|
||||
}
|
||||
|
||||
close(fd);
|
||||
close(tempFd);
|
||||
|
||||
paths.push_back({tempName, true}); // use + delete temporary copies of the library.
|
||||
}
|
||||
|
||||
return paths;
|
||||
}
|
||||
#else
|
||||
std::vector<std::pair< std::string, bool> > MultiVersionApi::copyExternalLibraryPerThread(std::string path) {
|
||||
if (threadCount > 1) {
|
||||
throw platform_error(); // not supported
|
||||
}
|
||||
std::vector<std::pair<std::string, bool>> paths;
|
||||
paths.push_back({ path , false });
|
||||
return paths;
|
||||
}
|
||||
#endif
|
||||
|
||||
void MultiVersionApi::disableLocalClient() {
|
||||
MutexHolder holder(lock);
|
||||
if(networkStartSetup || bypassMultiClientApi) {
|
||||
throw invalid_option();
|
||||
}
|
||||
|
||||
threadCount = std::max(threadCount, 1);
|
||||
localClientDisabled = true;
|
||||
}
|
||||
|
||||
|
@ -1139,13 +1215,14 @@ void MultiVersionApi::setSupportedClientVersions(Standalone<StringRef> versions)
|
|||
MutexHolder holder(lock);
|
||||
ASSERT(networkSetup);
|
||||
|
||||
// This option must be set on the main thread because it modifes structures that can be used concurrently by the main thread
|
||||
// This option must be set on the main thread because it modifies structures that can be used concurrently by the
|
||||
// main thread
|
||||
onMainThreadVoid([this, versions](){
|
||||
localClient->api->setNetworkOption(FDBNetworkOptions::SUPPORTED_CLIENT_VERSIONS, versions);
|
||||
}, nullptr);
|
||||
|
||||
if(!bypassMultiClientApi) {
|
||||
runOnExternalClients([versions](Reference<ClientInfo> client) {
|
||||
runOnExternalClientsAllThreads([versions](Reference<ClientInfo> client) {
|
||||
client->api->setNetworkOption(FDBNetworkOptions::SUPPORTED_CLIENT_VERSIONS, versions);
|
||||
});
|
||||
}
|
||||
|
@ -1198,14 +1275,26 @@ void MultiVersionApi::setNetworkOptionInternal(FDBNetworkOptions::Option option,
|
|||
ASSERT(!value.present() && !networkStartSetup);
|
||||
externalClient = true;
|
||||
bypassMultiClientApi = true;
|
||||
} else if (option == FDBNetworkOptions::CLIENT_THREADS_PER_VERSION) {
|
||||
MutexHolder holder(lock);
|
||||
validateOption(value, true, false, false);
|
||||
ASSERT(!networkStartSetup);
|
||||
#if defined(__unixish__)
|
||||
threadCount = extractIntOption(value, 1, 1024);
|
||||
#else
|
||||
// multiple client threads are not supported on windows.
|
||||
threadCount = extractIntOption(value, 1, 1);
|
||||
#endif
|
||||
if (threadCount > 1) {
|
||||
disableLocalClient();
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
MutexHolder holder(lock);
|
||||
localClient->api->setNetworkOption(option, value);
|
||||
|
||||
if(!bypassMultiClientApi) {
|
||||
if(networkSetup) {
|
||||
runOnExternalClients(
|
||||
runOnExternalClientsAllThreads(
|
||||
[option, value](Reference<ClientInfo> client) { client->api->setNetworkOption(option, value); });
|
||||
}
|
||||
else {
|
||||
|
@ -1227,6 +1316,24 @@ void MultiVersionApi::setupNetwork() {
|
|||
throw network_already_setup();
|
||||
}
|
||||
|
||||
for (auto i : externalClientDescriptions) {
|
||||
std::string path = i.second.libPath;
|
||||
std::string filename = basename(path);
|
||||
|
||||
// Copy external lib for each thread
|
||||
if (externalClients.count(filename) == 0) {
|
||||
externalClients[filename] = {};
|
||||
for (const auto& tmp : copyExternalLibraryPerThread(path)) {
|
||||
TraceEvent("AddingExternalClient")
|
||||
.detail("FileName", filename)
|
||||
.detail("LibraryPath", path)
|
||||
.detail("TempPath", tmp.first);
|
||||
externalClients[filename].push_back(
|
||||
Reference<ClientInfo>(new ClientInfo(new DLApi(tmp.first, tmp.second /*unlink on load*/), path)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
networkStartSetup = true;
|
||||
|
||||
if(externalClients.empty()) {
|
||||
|
@ -1244,14 +1351,14 @@ void MultiVersionApi::setupNetwork() {
|
|||
localClient->loadProtocolVersion();
|
||||
|
||||
if(!bypassMultiClientApi) {
|
||||
runOnExternalClients([this](Reference<ClientInfo> client) {
|
||||
runOnExternalClientsAllThreads([this](Reference<ClientInfo> client) {
|
||||
TraceEvent("InitializingExternalClient").detail("LibraryPath", client->libPath);
|
||||
client->api->selectApiVersion(apiVersion);
|
||||
client->loadProtocolVersion();
|
||||
});
|
||||
|
||||
MutexHolder holder(lock);
|
||||
runOnExternalClients([this, transportId](Reference<ClientInfo> client) {
|
||||
runOnExternalClientsAllThreads([this, transportId](Reference<ClientInfo> client) {
|
||||
for(auto option : options) {
|
||||
client->api->setNetworkOption(option.first, option.second.castTo<StringRef>());
|
||||
}
|
||||
|
@ -1292,7 +1399,7 @@ void MultiVersionApi::runNetwork() {
|
|||
|
||||
std::vector<THREAD_HANDLE> handles;
|
||||
if(!bypassMultiClientApi) {
|
||||
runOnExternalClients([&handles](Reference<ClientInfo> client) {
|
||||
runOnExternalClientsAllThreads([&handles](Reference<ClientInfo> client) {
|
||||
if(client->external) {
|
||||
handles.push_back(g_network->startThread(&runNetworkThread, client.getPtr()));
|
||||
}
|
||||
|
@ -1317,9 +1424,7 @@ void MultiVersionApi::stopNetwork() {
|
|||
localClient->api->stopNetwork();
|
||||
|
||||
if(!bypassMultiClientApi) {
|
||||
runOnExternalClients([](Reference<ClientInfo> client) {
|
||||
client->api->stopNetwork();
|
||||
}, true);
|
||||
runOnExternalClientsAllThreads([](Reference<ClientInfo> client) { client->api->stopNetwork(); }, true);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1334,7 +1439,7 @@ void MultiVersionApi::addNetworkThreadCompletionHook(void (*hook)(void*), void *
|
|||
localClient->api->addNetworkThreadCompletionHook(hook, hookParameter);
|
||||
|
||||
if(!bypassMultiClientApi) {
|
||||
runOnExternalClients([hook, hookParameter](Reference<ClientInfo> client) {
|
||||
runOnExternalClientsAllThreads([hook, hookParameter](Reference<ClientInfo> client) {
|
||||
client->api->addNetworkThreadCompletionHook(hook, hookParameter);
|
||||
});
|
||||
}
|
||||
|
@ -1346,12 +1451,24 @@ Reference<IDatabase> MultiVersionApi::createDatabase(const char *clusterFilePath
|
|||
lock.leave();
|
||||
throw network_not_setup();
|
||||
}
|
||||
lock.leave();
|
||||
|
||||
std::string clusterFile(clusterFilePath);
|
||||
if(localClientDisabled) {
|
||||
return Reference<IDatabase>(new MultiVersionDatabase(this, clusterFile, Reference<IDatabase>()));
|
||||
|
||||
if (threadCount > 1) {
|
||||
ASSERT(localClientDisabled);
|
||||
ASSERT(!bypassMultiClientApi);
|
||||
|
||||
int threadIdx = nextThread;
|
||||
nextThread = (nextThread + 1) % threadCount;
|
||||
lock.leave();
|
||||
for (auto it : externalClients) {
|
||||
TraceEvent("CreatingDatabaseOnExternalClient")
|
||||
.detail("LibraryPath", it.first)
|
||||
.detail("Failed", it.second[threadIdx]->failed);
|
||||
}
|
||||
return Reference<IDatabase>(new MultiVersionDatabase(this, threadIdx, clusterFile, Reference<IDatabase>()));
|
||||
}
|
||||
|
||||
lock.leave();
|
||||
|
||||
auto db = localClient->api->createDatabase(clusterFilePath);
|
||||
if(bypassMultiClientApi) {
|
||||
|
@ -1359,9 +1476,11 @@ Reference<IDatabase> MultiVersionApi::createDatabase(const char *clusterFilePath
|
|||
}
|
||||
else {
|
||||
for(auto it : externalClients) {
|
||||
TraceEvent("CreatingDatabaseOnExternalClient").detail("LibraryPath", it.second->libPath).detail("Failed", it.second->failed);
|
||||
TraceEvent("CreatingDatabaseOnExternalClient")
|
||||
.detail("LibraryPath", it.first)
|
||||
.detail("Failed", it.second[0]->failed);
|
||||
}
|
||||
return Reference<IDatabase>(new MultiVersionDatabase(this, clusterFile, db));
|
||||
return Reference<IDatabase>(new MultiVersionDatabase(this, 0, clusterFile, db));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1369,7 +1488,9 @@ void MultiVersionApi::updateSupportedVersions() {
|
|||
if(networkSetup) {
|
||||
Standalone<VectorRef<uint8_t>> versionStr;
|
||||
|
||||
runOnExternalClients([&versionStr](Reference<ClientInfo> client){
|
||||
// not mutating the client, so just call on one instance of each client version.
|
||||
// thread 0 always exists.
|
||||
runOnExternalClients(0, [&versionStr](Reference<ClientInfo> client) {
|
||||
const char *ver = client->api->getClientVersion();
|
||||
versionStr.append(versionStr.arena(), (uint8_t*)ver, (int)strlen(ver));
|
||||
versionStr.append(versionStr.arena(), (uint8_t*)";", 1);
|
||||
|
@ -1464,7 +1585,9 @@ void MultiVersionApi::loadEnvironmentVariableNetworkOptions() {
|
|||
envOptionsLoaded = true;
|
||||
}
|
||||
|
||||
MultiVersionApi::MultiVersionApi() : bypassMultiClientApi(false), networkStartSetup(false), networkSetup(false), callbackOnMainThread(true), externalClient(false), localClientDisabled(false), apiVersion(0), envOptionsLoaded(false) {}
|
||||
MultiVersionApi::MultiVersionApi()
|
||||
: bypassMultiClientApi(false), networkStartSetup(false), networkSetup(false), callbackOnMainThread(true),
|
||||
externalClient(false), localClientDisabled(false), apiVersion(0), envOptionsLoaded(false), threadCount(0) {}
|
||||
|
||||
MultiVersionApi* MultiVersionApi::api = new MultiVersionApi();
|
||||
|
||||
|
@ -1481,7 +1604,7 @@ void ClientInfo::loadProtocolVersion() {
|
|||
protocolVersion = ProtocolVersion(strtoull(protocolVersionStr.c_str(), &next, 16));
|
||||
|
||||
ASSERT(protocolVersion.version() != 0 && protocolVersion.version() != ULLONG_MAX);
|
||||
ASSERT(next == &protocolVersionStr[protocolVersionStr.length()]);
|
||||
ASSERT_EQ(next, &protocolVersionStr[protocolVersionStr.length()]);
|
||||
}
|
||||
|
||||
bool ClientInfo::canReplace(Reference<ClientInfo> other) const {
|
||||
|
@ -1529,7 +1652,7 @@ TEST_CASE("/fdbclient/multiversionclient/EnvironmentVariableParsing" ) {
|
|||
ASSERT(false);
|
||||
}
|
||||
catch(Error &e) {
|
||||
ASSERT(e.code() == error_code_invalid_option_value);
|
||||
ASSERT_EQ(e.code(), error_code_invalid_option_value);
|
||||
}
|
||||
|
||||
return Void();
|
||||
|
@ -1688,7 +1811,7 @@ ACTOR Future<Void> checkUndestroyedFutures(std::vector<ThreadSingleAssignmentVar
|
|||
for(fNum = 0; fNum < undestroyed.size(); ++fNum) {
|
||||
f = undestroyed[fNum];
|
||||
|
||||
ASSERT(f->debugGetReferenceCount() == 1);
|
||||
ASSERT_EQ(f->debugGetReferenceCount(), 1);
|
||||
ASSERT(f->isReady());
|
||||
|
||||
f->cancel();
|
||||
|
@ -1772,7 +1895,7 @@ struct AbortableTest {
|
|||
auto newFuture = FutureInfo(abortableFuture(f.future, ThreadFuture<Void>(abort)), f.expectedValue, f.legalErrors);
|
||||
|
||||
if(!abort->isReady() && deterministicRandom()->coinflip()) {
|
||||
ASSERT(abort->status == ThreadSingleAssignmentVarBase::Unset);
|
||||
ASSERT_EQ(abort->status, ThreadSingleAssignmentVarBase::Unset);
|
||||
newFuture.threads.push_back(g_network->startThread(setAbort, abort));
|
||||
}
|
||||
|
||||
|
@ -1816,7 +1939,7 @@ private:
|
|||
struct DLTest {
|
||||
static FutureInfo createThreadFuture(FutureInfo f) {
|
||||
return FutureInfo(toThreadFuture<int>(getApi(), (FdbCApi::FDBFuture*)f.future.extractPtr(), [](FdbCApi::FDBFuture *f, FdbCApi *api) {
|
||||
ASSERT(((ThreadSingleAssignmentVar<int>*)f)->debugGetReferenceCount() >= 1);
|
||||
ASSERT_GE(((ThreadSingleAssignmentVar<int>*)f)->debugGetReferenceCount(), 1);
|
||||
return ((ThreadSingleAssignmentVar<int>*)f)->get();
|
||||
}), f.expectedValue, f.legalErrors);
|
||||
}
|
||||
|
|
|
@ -210,7 +210,7 @@ private:
|
|||
|
||||
class DLApi : public IClientApi {
|
||||
public:
|
||||
DLApi(std::string fdbCPath);
|
||||
DLApi(std::string fdbCPath, bool unlinkOnLoad = false);
|
||||
|
||||
void selectApiVersion(int apiVersion) override;
|
||||
const char* getClientVersion() override;
|
||||
|
@ -229,6 +229,7 @@ public:
|
|||
private:
|
||||
const std::string fdbCPath;
|
||||
const Reference<FdbCApi> api;
|
||||
const bool unlinkOnLoad;
|
||||
int headerVersion;
|
||||
bool networkSetup;
|
||||
|
||||
|
@ -302,17 +303,23 @@ private:
|
|||
std::vector<std::pair<FDBTransactionOptions::Option, Optional<Standalone<StringRef>>>> persistentOptions;
|
||||
};
|
||||
|
||||
struct ClientInfo : ThreadSafeReferenceCounted<ClientInfo> {
|
||||
struct ClientDesc {
|
||||
std::string const libPath;
|
||||
bool const external;
|
||||
|
||||
ClientDesc(std::string libPath, bool external) : libPath(libPath), external(external) {}
|
||||
};
|
||||
|
||||
struct ClientInfo : ClientDesc, ThreadSafeReferenceCounted<ClientInfo> {
|
||||
ProtocolVersion protocolVersion;
|
||||
IClientApi *api;
|
||||
std::string libPath;
|
||||
bool external;
|
||||
bool failed;
|
||||
std::vector<std::pair<void (*)(void*), void*>> threadCompletionHooks;
|
||||
|
||||
ClientInfo() : protocolVersion(0), api(nullptr), external(false), failed(true) {}
|
||||
ClientInfo(IClientApi *api) : protocolVersion(0), api(api), libPath("internal"), external(false), failed(false) {}
|
||||
ClientInfo(IClientApi *api, std::string libPath) : protocolVersion(0), api(api), libPath(libPath), external(true), failed(false) {}
|
||||
ClientInfo() : ClientDesc(std::string(), false), protocolVersion(0), api(nullptr), failed(true) {}
|
||||
ClientInfo(IClientApi* api) : ClientDesc("internal", false), protocolVersion(0), api(api), failed(false) {}
|
||||
ClientInfo(IClientApi* api, std::string libPath)
|
||||
: ClientDesc(libPath, true), protocolVersion(0), api(api), failed(false) {}
|
||||
|
||||
void loadProtocolVersion();
|
||||
bool canReplace(Reference<ClientInfo> other) const;
|
||||
|
@ -322,7 +329,8 @@ class MultiVersionApi;
|
|||
|
||||
class MultiVersionDatabase final : public IDatabase, ThreadSafeReferenceCounted<MultiVersionDatabase> {
|
||||
public:
|
||||
MultiVersionDatabase(MultiVersionApi *api, std::string clusterFilePath, Reference<IDatabase> db, bool openConnectors=true);
|
||||
MultiVersionDatabase(MultiVersionApi* api, int threadIdx, std::string clusterFilePath, Reference<IDatabase> db,
|
||||
bool openConnectors = true);
|
||||
~MultiVersionDatabase() override;
|
||||
|
||||
Reference<ITransaction> createTransaction() override;
|
||||
|
@ -389,6 +397,7 @@ private:
|
|||
};
|
||||
|
||||
const Reference<DatabaseState> dbState;
|
||||
const int threadIdx;
|
||||
friend class MultiVersionTransaction;
|
||||
};
|
||||
|
||||
|
@ -408,7 +417,9 @@ public:
|
|||
static MultiVersionApi* api;
|
||||
|
||||
Reference<ClientInfo> getLocalClient();
|
||||
void runOnExternalClients(std::function<void(Reference<ClientInfo>)>, bool runOnFailedClients=false);
|
||||
void runOnExternalClients(int threadId, std::function<void(Reference<ClientInfo>)>,
|
||||
bool runOnFailedClients = false);
|
||||
void runOnExternalClientsAllThreads(std::function<void(Reference<ClientInfo>)>, bool runOnFailedClients = false);
|
||||
|
||||
void updateSupportedVersions();
|
||||
|
||||
|
@ -426,13 +437,17 @@ private:
|
|||
void setCallbacksOnExternalThreads();
|
||||
void addExternalLibrary(std::string path);
|
||||
void addExternalLibraryDirectory(std::string path);
|
||||
// Return a vector of (pathname, unlink_on_close) pairs. Makes threadCount - 1 copies of the library stored in path,
|
||||
// and returns a vector of length threadCount.
|
||||
std::vector<std::pair<std::string, bool>> copyExternalLibraryPerThread(std::string path);
|
||||
void disableLocalClient();
|
||||
void setSupportedClientVersions(Standalone<StringRef> versions);
|
||||
|
||||
void setNetworkOptionInternal(FDBNetworkOptions::Option option, Optional<StringRef> value);
|
||||
|
||||
Reference<ClientInfo> localClient;
|
||||
std::map<std::string, Reference<ClientInfo>> externalClients;
|
||||
std::map<std::string, ClientDesc> externalClientDescriptions;
|
||||
std::map<std::string, std::vector<Reference<ClientInfo>>> externalClients;
|
||||
|
||||
bool networkStartSetup;
|
||||
volatile bool networkSetup;
|
||||
|
@ -440,6 +455,9 @@ private:
|
|||
volatile bool externalClient;
|
||||
int apiVersion;
|
||||
|
||||
int nextThread = 0;
|
||||
int threadCount;
|
||||
|
||||
Mutex lock;
|
||||
std::vector<std::pair<FDBNetworkOptions::Option, Optional<Standalone<StringRef>>>> options;
|
||||
std::map<FDBNetworkOptions::Option, std::set<Standalone<StringRef>>> setEnvOptions;
|
||||
|
|
|
@ -108,7 +108,10 @@ description is not currently required but encouraged.
|
|||
paramType="String" paramDescription="path to directory containing client libraries"
|
||||
description="Searches the specified path for dynamic libraries and adds them to the list of client libraries for use by the multi-version client API. Must be set before setting up the network." />
|
||||
<Option name="disable_local_client" code="64"
|
||||
description="Prevents connections through the local client, allowing only connections through externally loaded client libraries. Intended primarily for testing." />
|
||||
description="Prevents connections through the local client, allowing only connections through externally loaded client libraries." />
|
||||
<Option name="client_threads_per_version" code="65"
|
||||
paramType="Int" paramDescription="Number of client threads to be spawned. Each server will be serviced by a single client thread."
|
||||
description="Spawns multiple worker threads for each version of the client that is loaded. Setting this to a number greater than one implies disable_local_client." />
|
||||
<Option name="disable_client_statistics_logging" code="70"
|
||||
description="Disables logging of client statistics, such as sampled transaction activity." />
|
||||
<Option name="enable_slow_task_profiling" code="71"
|
||||
|
|
|
@ -0,0 +1,48 @@
|
|||
#!/bin/bash
|
||||
set -euo pipefail
|
||||
trap "kill 0" EXIT
|
||||
ROOT=`pwd`
|
||||
|
||||
function usage {
|
||||
echo "Usage 'cd working-directory; ${0} path-to-build-root number-of-clusters-to-start' "
|
||||
exit 1
|
||||
}
|
||||
|
||||
if (( $# != 3 )) ; then
|
||||
echo Wrong number of arguments
|
||||
usage
|
||||
fi
|
||||
|
||||
BUILD=$1
|
||||
|
||||
FDB=${BUILD}/bin/fdbserver
|
||||
if [ ! -f ${FDB} ]; then
|
||||
echo "Error: ${FDB} not found!"
|
||||
usage
|
||||
fi
|
||||
|
||||
rm -rf ./loopback-cluster-*
|
||||
|
||||
for i in `seq 1 $2` ; do
|
||||
DIR=./loopback-cluster-$i
|
||||
mkdir -p ${DIR}
|
||||
|
||||
PORT_PREFIX=${i}50
|
||||
CLUSTER_FILE="test$i:testdb$i@127.0.0.1:${PORT_PREFIX}1"
|
||||
CLUSTER=${DIR}/fdb.cluster
|
||||
echo $CLUSTER_FILE > $CLUSTER
|
||||
echo "Starting Cluster: " $CLUSTER_FILE
|
||||
|
||||
for j in 1 2 3; do
|
||||
LOG=${DIR}/${j}/log
|
||||
DATA=${DIR}/${j}/data
|
||||
mkdir -p ${LOG} ${DATA}
|
||||
${FDB} -p auto:${PORT_PREFIX}${j} -d $DATA -L $LOG -C $CLUSTER &
|
||||
done
|
||||
|
||||
CLI="$ROOT/bin/fdbcli -C ${CLUSTER} --exec"
|
||||
( sleep 2 ; $CLI "configure new ssd single" ) &
|
||||
done;
|
||||
|
||||
sleep 2
|
||||
$3
|
|
@ -0,0 +1,60 @@
|
|||
#!/bin/env python2
|
||||
|
||||
import argparse
|
||||
|
||||
parser = argparse.ArgumentParser("Run multithreaded client tests")
|
||||
|
||||
parser.add_argument("cluster_file", nargs='+', help='List of fdb.cluster files to connect to')
|
||||
parser.add_argument("--skip-so-files", default=False, action='store_true', help='Do not load .so files')
|
||||
parser.add_argument("--threads", metavar="N", type=int, default=3, help='Number of threads to use. Zero implies local client')
|
||||
parser.add_argument("--build-dir", metavar="DIR", default='.', help='Path to root directory of FDB build output')
|
||||
parser.add_argument("--client-log-dir", metavar="DIR", default="client-logs", help="Path to write client logs to. The directory will be created if it does not exist.")
|
||||
args = parser.parse_args()
|
||||
|
||||
import sys
|
||||
|
||||
### sample usage (from inside your FDB build output directory):
|
||||
|
||||
## These should pass:
|
||||
# ../tests/loopback_cluster/run_cluster.sh . 3 '../tests/python_tests/multithreaded_client.py loopback-cluster-*/fdb.cluster'
|
||||
# ../tests/loopback_cluster/run_cluster.sh . 3 '../tests/python_tests/multithreaded_client.py loopback-cluster-*/fdb.cluster --threads 1'
|
||||
# ../tests/loopback_cluster/run_cluster.sh . 3 '../tests/python_tests/multithreaded_client.py loopback-cluster-*/fdb.cluster --threads 1 --skip-so-files'
|
||||
# ../tests/loopback_cluster/run_cluster.sh . 3 '../tests/python_tests/multithreaded_client.py loopback-cluster-*/fdb.cluster --threads 0'
|
||||
# ../tests/loopback_cluster/run_cluster.sh . 3 '../tests/python_tests/multithreaded_client.py loopback-cluster-*/fdb.cluster --threads 0 --skip-so-files'
|
||||
|
||||
## This fails (unsupported configuration):
|
||||
# ../tests/loopback_cluster/run_cluster.sh . 3 '../tests/python_tests/multithreaded_client.py loopback-cluster-*/fdb.cluster --threads 2 --skip-so-files'
|
||||
|
||||
sys.path.append(args.build_dir + '/bindings/python')
|
||||
|
||||
import fdb
|
||||
import os
|
||||
import random
|
||||
import time
|
||||
fdb.api_version(630)
|
||||
|
||||
if not os.path.exists(args.client_log_dir):
|
||||
os.mkdir(args.client_log_dir)
|
||||
|
||||
fdb.options.set_trace_enable(args.client_log_dir)
|
||||
fdb.options.set_knob("min_trace_severity=5")
|
||||
|
||||
if not args.skip_so_files:
|
||||
print "Loading .so files"
|
||||
fdb.options.set_external_client_directory(args.build_dir + '/lib')
|
||||
|
||||
if args.threads > 0:
|
||||
fdb.options.set_client_threads_per_version(args.threads)
|
||||
|
||||
dbs = []
|
||||
for v in args.cluster_file:
|
||||
dbs.append(fdb.open(cluster_file=v))
|
||||
|
||||
counter = 0
|
||||
for i in range(100):
|
||||
key = b"test_%d" % random.randrange(0, 100000000)
|
||||
val = b"value_%d" % random.randrange(0, 10000000)
|
||||
db = dbs[i % len(dbs)]
|
||||
print ("Writing: ", key, val, db)
|
||||
db[key] = val
|
||||
assert (val == db[key])
|
Loading…
Reference in New Issue