clang format
This commit is contained in:
parent
555f3ebfaa
commit
36915e848a
|
@ -277,7 +277,8 @@ void loadClientFunction(T *fp, void *lib, std::string libPath, const char *funct
|
|||
}
|
||||
}
|
||||
|
||||
DLApi::DLApi(std::string fdbCPath, bool unlinkOnLoad) : api(new FdbCApi()), fdbCPath(fdbCPath), unlinkOnLoad(unlinkOnLoad), networkSetup(false) {}
|
||||
DLApi::DLApi(std::string fdbCPath, bool unlinkOnLoad)
|
||||
: api(new FdbCApi()), fdbCPath(fdbCPath), unlinkOnLoad(unlinkOnLoad), networkSetup(false) {}
|
||||
|
||||
void DLApi::init() {
|
||||
if(isLibraryLoaded(fdbCPath.c_str())) {
|
||||
|
@ -291,8 +292,10 @@ void DLApi::init() {
|
|||
}
|
||||
if (unlinkOnLoad) {
|
||||
int err = unlink(fdbCPath.c_str());
|
||||
if(err) {
|
||||
TraceEvent(SevError, "ErrorUnlinkingTempClientLibraryFile").detail("errno", errno).detail("LibraryPath", fdbCPath);
|
||||
if (err) {
|
||||
TraceEvent(SevError, "ErrorUnlinkingTempClientLibraryFile")
|
||||
.detail("errno", errno)
|
||||
.detail("LibraryPath", fdbCPath);
|
||||
throw platform_error();
|
||||
}
|
||||
}
|
||||
|
@ -694,10 +697,12 @@ void MultiVersionTransaction::reset() {
|
|||
}
|
||||
|
||||
// MultiVersionDatabase
|
||||
MultiVersionDatabase::MultiVersionDatabase(MultiVersionApi *api, int threadIdx, std::string clusterFilePath, Reference<IDatabase> db, bool openConnectors) : dbState(new DatabaseState()), threadIdx(threadIdx) {
|
||||
MultiVersionDatabase::MultiVersionDatabase(MultiVersionApi* api, int threadIdx, std::string clusterFilePath,
|
||||
Reference<IDatabase> db, bool openConnectors)
|
||||
: dbState(new DatabaseState()), threadIdx(threadIdx) {
|
||||
dbState->db = db;
|
||||
dbState->dbVar->set(db);
|
||||
|
||||
|
||||
if(!openConnectors) {
|
||||
dbState->currentClientIndex = 0;
|
||||
}
|
||||
|
@ -723,7 +728,8 @@ MultiVersionDatabase::~MultiVersionDatabase() {
|
|||
}
|
||||
|
||||
Reference<IDatabase> MultiVersionDatabase::debugCreateFromExistingDatabase(Reference<IDatabase> db) {
|
||||
return Reference<IDatabase>(new MultiVersionDatabase(MultiVersionApi::api, 0, "", db, false)); // XXX better choice of thread to run this database on?
|
||||
return Reference<IDatabase>(new MultiVersionDatabase(
|
||||
MultiVersionApi::api, 0, "", db, false)); // XXX better choice of thread to run this database on?
|
||||
}
|
||||
|
||||
Reference<ITransaction> MultiVersionDatabase::createTransaction() {
|
||||
|
@ -917,21 +923,23 @@ bool MultiVersionApi::apiVersionAtLeast(int minVersion) {
|
|||
return MultiVersionApi::api->apiVersion >= minVersion || MultiVersionApi::api->apiVersion < 0;
|
||||
}
|
||||
|
||||
void MultiVersionApi::runOnExternalClientsAllThreads(std::function<void(Reference<ClientInfo>)> func, bool runOnFailedClients) {
|
||||
void MultiVersionApi::runOnExternalClientsAllThreads(std::function<void(Reference<ClientInfo>)> func,
|
||||
bool runOnFailedClients) {
|
||||
for (int i = 0; i < threadCount; i++) {
|
||||
runOnExternalClients(i, func, runOnFailedClients);
|
||||
}
|
||||
}
|
||||
|
||||
// runOnFailedClients should be used cautiously. Some failed clients may not have successfully loaded all symbols.
|
||||
void MultiVersionApi::runOnExternalClients(int threadIdx, std::function<void(Reference<ClientInfo>)> func, bool runOnFailedClients) {
|
||||
void MultiVersionApi::runOnExternalClients(int threadIdx, std::function<void(Reference<ClientInfo>)> func,
|
||||
bool runOnFailedClients) {
|
||||
bool newFailure = false;
|
||||
|
||||
auto c = externalClients.begin();
|
||||
while(c != externalClients.end()) {
|
||||
auto client = c->second[threadIdx];
|
||||
try {
|
||||
if(!client->failed || runOnFailedClients) { // TODO: Should we ignore some failures?
|
||||
if (!client->failed || runOnFailedClients) { // TODO: Should we ignore some failures?
|
||||
func(client);
|
||||
}
|
||||
}
|
||||
|
@ -1016,7 +1024,7 @@ void MultiVersionApi::addExternalLibrary(std::string path) {
|
|||
throw file_not_found();
|
||||
}
|
||||
|
||||
if(externalClientDescriptions.count(filename) == 0) {
|
||||
if (externalClientDescriptions.count(filename) == 0) {
|
||||
TraceEvent("AddingExternalClient").detail("LibraryPath", filename);
|
||||
externalClientDescriptions.emplace(std::make_pair(filename, ClientDesc(path, true)));
|
||||
}
|
||||
|
@ -1030,10 +1038,10 @@ void MultiVersionApi::addExternalLibraryDirectory(std::string path) {
|
|||
|
||||
for(auto filename : files) {
|
||||
std::string lib = abspath(joinPath(path, filename));
|
||||
if(externalClientDescriptions.count(filename) == 0) {
|
||||
if (externalClientDescriptions.count(filename) == 0) {
|
||||
TraceEvent("AddingExternalClient").detail("LibraryPath", filename);
|
||||
externalClientDescriptions.emplace(std::make_pair(filename, ClientDesc(lib, true)));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1056,9 +1064,9 @@ std::vector<std::string> MultiVersionApi::copyExternalLibraryPerThread(std::stri
|
|||
constexpr size_t buf_sz = 4096;
|
||||
char buf[buf_sz];
|
||||
std::cout << path << " -> " << tempName << std::endl;
|
||||
while(1) {
|
||||
while (1) {
|
||||
ssize_t readCount = read(fd, buf, buf_sz);
|
||||
if (readCount == 0) {
|
||||
if (readCount == 0) {
|
||||
// eof
|
||||
break;
|
||||
}
|
||||
|
@ -1066,9 +1074,9 @@ std::vector<std::string> MultiVersionApi::copyExternalLibraryPerThread(std::stri
|
|||
throw platform_error;
|
||||
}
|
||||
ssize_t written = 0;
|
||||
while(written != readCount) {
|
||||
while (written != readCount) {
|
||||
ssize_t writeCount = write(tempFd, buf + written, readCount - written);
|
||||
if (writeCount == -1) {
|
||||
if (writeCount == -1) {
|
||||
throw platform_error;
|
||||
}
|
||||
written += writeCount;
|
||||
|
@ -1082,7 +1090,6 @@ std::vector<std::string> MultiVersionApi::copyExternalLibraryPerThread(std::stri
|
|||
}
|
||||
|
||||
return paths;
|
||||
|
||||
}
|
||||
|
||||
void MultiVersionApi::disableLocalClient() {
|
||||
|
@ -1098,7 +1105,8 @@ void MultiVersionApi::setSupportedClientVersions(Standalone<StringRef> versions)
|
|||
MutexHolder holder(lock);
|
||||
ASSERT(networkSetup);
|
||||
|
||||
// This option must be set on the main thread because it modifies structures that can be used concurrently by the main thread
|
||||
// This option must be set on the main thread because it modifies structures that can be used concurrently by the
|
||||
// main thread
|
||||
onMainThreadVoid([this, versions](){
|
||||
localClient->api->setNetworkOption(FDBNetworkOptions::SUPPORTED_CLIENT_VERSIONS, versions);
|
||||
}, NULL);
|
||||
|
@ -1157,8 +1165,7 @@ void MultiVersionApi::setNetworkOptionInternal(FDBNetworkOptions::Option option,
|
|||
ASSERT(!value.present() && !networkStartSetup);
|
||||
externalClient = true;
|
||||
bypassMultiClientApi = true;
|
||||
}
|
||||
else if(option == FDBNetworkOptions::CLIENT_THREADS_PER_VERSION) {
|
||||
} else if (option == FDBNetworkOptions::CLIENT_THREADS_PER_VERSION) {
|
||||
MutexHolder holder(lock);
|
||||
validateOption(value, true, false, false);
|
||||
ASSERT(!networkStartSetup);
|
||||
|
@ -1166,8 +1173,7 @@ void MultiVersionApi::setNetworkOptionInternal(FDBNetworkOptions::Option option,
|
|||
if (threadCount > 1) {
|
||||
disableLocalClient();
|
||||
}
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
MutexHolder holder(lock);
|
||||
localClient->api->setNetworkOption(option, value);
|
||||
|
||||
|
@ -1199,12 +1205,16 @@ void MultiVersionApi::setupNetwork() {
|
|||
std::string path = i.second.libPath;
|
||||
std::string filename = basename(path);
|
||||
|
||||
// Copy external lib for each thread
|
||||
// Copy external lib for each thread
|
||||
if (externalClients.count(filename) == 0) {
|
||||
externalClients[filename] = {};
|
||||
for (const auto & tmp : copyExternalLibraryPerThread(path)) {
|
||||
TraceEvent("AddingExternalClient").detail("FileName", filename).detail("LibraryPath", path).detail("TempPath", tmp);
|
||||
externalClients[filename].push_back(Reference<ClientInfo>(new ClientInfo(new DLApi(tmp, true /*unlink on load*/), path)));
|
||||
for (const auto& tmp : copyExternalLibraryPerThread(path)) {
|
||||
TraceEvent("AddingExternalClient")
|
||||
.detail("FileName", filename)
|
||||
.detail("LibraryPath", path)
|
||||
.detail("TempPath", tmp);
|
||||
externalClients[filename].push_back(
|
||||
Reference<ClientInfo>(new ClientInfo(new DLApi(tmp, true /*unlink on load*/), path)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1299,9 +1309,7 @@ void MultiVersionApi::stopNetwork() {
|
|||
localClient->api->stopNetwork();
|
||||
|
||||
if(!bypassMultiClientApi) {
|
||||
runOnExternalClientsAllThreads([](Reference<ClientInfo> client) {
|
||||
client->api->stopNetwork();
|
||||
}, true);
|
||||
runOnExternalClientsAllThreads([](Reference<ClientInfo> client) { client->api->stopNetwork(); }, true);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1333,12 +1341,14 @@ Reference<IDatabase> MultiVersionApi::createDatabase(const char *clusterFilePath
|
|||
if (threadCount > 1) {
|
||||
ASSERT(localClientDisabled);
|
||||
ASSERT(!bypassMultiClientApi);
|
||||
|
||||
|
||||
int threadIdx = nextThread;
|
||||
nextThread = (nextThread + 1) % threadCount;
|
||||
lock.leave();
|
||||
for(auto it : externalClients) {
|
||||
TraceEvent("CreatingDatabaseOnExternalClient").detail("LibraryPath", it.first).detail("Failed", it.second[threadIdx]->failed);
|
||||
for (auto it : externalClients) {
|
||||
TraceEvent("CreatingDatabaseOnExternalClient")
|
||||
.detail("LibraryPath", it.first)
|
||||
.detail("Failed", it.second[threadIdx]->failed);
|
||||
}
|
||||
return Reference<IDatabase>(new MultiVersionDatabase(this, threadIdx, clusterFile, Reference<IDatabase>()));
|
||||
}
|
||||
|
@ -1351,7 +1361,9 @@ Reference<IDatabase> MultiVersionApi::createDatabase(const char *clusterFilePath
|
|||
}
|
||||
else {
|
||||
for(auto it : externalClients) {
|
||||
TraceEvent("CreatingDatabaseOnExternalClient").detail("LibraryPath", it.first).detail("Failed", it.second[0]->failed);
|
||||
TraceEvent("CreatingDatabaseOnExternalClient")
|
||||
.detail("LibraryPath", it.first)
|
||||
.detail("Failed", it.second[0]->failed);
|
||||
}
|
||||
return Reference<IDatabase>(new MultiVersionDatabase(this, 0, clusterFile, db));
|
||||
}
|
||||
|
@ -1363,7 +1375,7 @@ void MultiVersionApi::updateSupportedVersions() {
|
|||
|
||||
// not mutating the client, so just call on one instance of each client version.
|
||||
// thread 0 always exists.
|
||||
runOnExternalClients(0, [&versionStr](Reference<ClientInfo> client){
|
||||
runOnExternalClients(0, [&versionStr](Reference<ClientInfo> client) {
|
||||
const char *ver = client->api->getClientVersion();
|
||||
versionStr.append(versionStr.arena(), (uint8_t*)ver, (int)strlen(ver));
|
||||
versionStr.append(versionStr.arena(), (uint8_t*)";", 1);
|
||||
|
@ -1458,7 +1470,9 @@ void MultiVersionApi::loadEnvironmentVariableNetworkOptions() {
|
|||
envOptionsLoaded = true;
|
||||
}
|
||||
|
||||
MultiVersionApi::MultiVersionApi() : bypassMultiClientApi(false), networkStartSetup(false), networkSetup(false), callbackOnMainThread(true), externalClient(false), localClientDisabled(false), apiVersion(0), envOptionsLoaded(false), threadCount(0) {}
|
||||
MultiVersionApi::MultiVersionApi()
|
||||
: bypassMultiClientApi(false), networkStartSetup(false), networkSetup(false), callbackOnMainThread(true),
|
||||
externalClient(false), localClientDisabled(false), apiVersion(0), envOptionsLoaded(false), threadCount(0) {}
|
||||
|
||||
MultiVersionApi* MultiVersionApi::api = new MultiVersionApi();
|
||||
|
||||
|
|
|
@ -283,7 +283,7 @@ struct ClientDesc {
|
|||
std::string const libPath;
|
||||
bool const external;
|
||||
|
||||
ClientDesc(std::string libPath, bool external) : libPath(libPath), external(external) { }
|
||||
ClientDesc(std::string libPath, bool external) : libPath(libPath), external(external) {}
|
||||
};
|
||||
|
||||
struct ClientInfo : ClientDesc, ThreadSafeReferenceCounted<ClientInfo> {
|
||||
|
@ -293,8 +293,9 @@ struct ClientInfo : ClientDesc, ThreadSafeReferenceCounted<ClientInfo> {
|
|||
std::vector<std::pair<void (*)(void*), void*>> threadCompletionHooks;
|
||||
|
||||
ClientInfo() : ClientDesc(std::string(), false), protocolVersion(0), api(NULL), failed(true) {}
|
||||
ClientInfo(IClientApi *api) : ClientDesc("internal", false), protocolVersion(0), api(api), failed(false) {}
|
||||
ClientInfo(IClientApi *api, std::string libPath) : ClientDesc(libPath, true), protocolVersion(0), api(api), failed(false) {}
|
||||
ClientInfo(IClientApi* api) : ClientDesc("internal", false), protocolVersion(0), api(api), failed(false) {}
|
||||
ClientInfo(IClientApi* api, std::string libPath)
|
||||
: ClientDesc(libPath, true), protocolVersion(0), api(api), failed(false) {}
|
||||
|
||||
void loadProtocolVersion();
|
||||
bool canReplace(Reference<ClientInfo> other) const;
|
||||
|
@ -304,7 +305,8 @@ class MultiVersionApi;
|
|||
|
||||
class MultiVersionDatabase : public IDatabase, ThreadSafeReferenceCounted<MultiVersionDatabase> {
|
||||
public:
|
||||
MultiVersionDatabase(MultiVersionApi *api, int threadIdx, std::string clusterFilePath, Reference<IDatabase> db, bool openConnectors=true);
|
||||
MultiVersionDatabase(MultiVersionApi* api, int threadIdx, std::string clusterFilePath, Reference<IDatabase> db,
|
||||
bool openConnectors = true);
|
||||
~MultiVersionDatabase();
|
||||
|
||||
Reference<ITransaction> createTransaction() override;
|
||||
|
@ -386,8 +388,9 @@ public:
|
|||
static MultiVersionApi* api;
|
||||
|
||||
Reference<ClientInfo> getLocalClient();
|
||||
void runOnExternalClients(int threadId, std::function<void(Reference<ClientInfo>)>, bool runOnFailedClients=false);
|
||||
void runOnExternalClientsAllThreads(std::function<void(Reference<ClientInfo>)>, bool runOnFailedClients=false);
|
||||
void runOnExternalClients(int threadId, std::function<void(Reference<ClientInfo>)>,
|
||||
bool runOnFailedClients = false);
|
||||
void runOnExternalClientsAllThreads(std::function<void(Reference<ClientInfo>)>, bool runOnFailedClients = false);
|
||||
|
||||
void updateSupportedVersions();
|
||||
|
||||
|
|
Loading…
Reference in New Issue