Merge pull request #791 from ajbeamon/remove-cluster-from-iclientapi

Remove cluster from IClientApi (phase 2 of removing DB names)
This commit is contained in:
Evan Tschannen 2018-11-10 10:16:18 -08:00 committed by GitHub
commit a654183f63
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 357 additions and 645 deletions

View File

@ -31,15 +31,15 @@ int g_api_version = 0;
*
* type mapping:
* FDBFuture -> ThreadSingleAssignmentVarBase
* FDBCluster -> ICluster
* FDBCluster -> char
* FDBDatabase -> IDatabase
* FDBTransaction -> ITransaction
*/
#define TSAVB(f) ((ThreadSingleAssignmentVarBase*)(f))
#define TSAV(T, f) ((ThreadSingleAssignmentVar<T>*)(f))
#define CLUSTER(c) ((char*)c)
#define DB(d) ((IDatabase*)d)
#define CLUSTER(c) ((ICluster*)c)
#define TXN(t) ((ITransaction*)t)
/*
@ -129,7 +129,7 @@ fdb_error_t fdb_stop_network() {
extern "C" DLLEXPORT
fdb_error_t fdb_add_network_thread_completion_hook(void (*hook)(void*), void *hook_parameter) {
CATCH_AND_RETURN( API->addNetworkThreadCompletionHook(hook, hook_parameter); );
CATCH_AND_RETURN( API->addNetworkThreadCompletionHook(hook, hook_parameter); );
}
@ -238,7 +238,7 @@ extern "C" DLLEXPORT
fdb_error_t fdb_future_get_cluster( FDBFuture* f, FDBCluster** out_cluster ) {
CATCH_AND_RETURN(
*out_cluster = (FDBCluster*)
( (TSAV( Reference<ICluster>, f )->get() ).extractPtr() ); );
( (TSAV( char*, f )->get() ) ); );
}
extern "C" DLLEXPORT
@ -295,7 +295,12 @@ fdb_error_t fdb_future_get_string_array(
extern "C" DLLEXPORT
FDBFuture* fdb_create_cluster( const char* cluster_file_path ) {
return (FDBFuture*) API->createCluster( cluster_file_path ? cluster_file_path : ""/*, g_api_version*/ ).extractPtr();
char *path = NULL;
if(cluster_file_path) {
path = new char[strlen(cluster_file_path) + 1];
strcpy(path, cluster_file_path);
}
return (FDBFuture*)ThreadFuture<char*>(path).extractPtr();
}
extern "C" DLLEXPORT
@ -304,13 +309,13 @@ fdb_error_t fdb_cluster_set_option( FDBCluster* c,
uint8_t const* value,
int value_length )
{
CATCH_AND_RETURN(
CLUSTER(c)->setOption( (FDBClusterOptions::Option)option, value ? StringRef( value, value_length ) : Optional<StringRef>() ); );
// There are no cluster options
return error_code_success;
}
extern "C" DLLEXPORT
void fdb_cluster_destroy( FDBCluster* c ) {
CATCH_AND_DIE( CLUSTER(c)->delref(); );
CATCH_AND_DIE( delete[] CLUSTER(c); );
}
extern "C" DLLEXPORT
@ -321,7 +326,7 @@ FDBFuture* fdb_cluster_create_database( FDBCluster* c, uint8_t const* db_name,
return (FDBFuture*)ThreadFuture<Reference<IDatabase>>(invalid_database_name()).extractPtr();
}
return (FDBFuture*)CLUSTER(c)->createDatabase().extractPtr();
return (FDBFuture*) API->createDatabase( c ? CLUSTER(c) : "").extractPtr();
}
extern "C" DLLEXPORT

View File

@ -2672,12 +2672,10 @@ int main(int argc, char* argv[]) {
printf(" %d: %d %s\n", i->second, i->first, Error::fromCode(i->first).what());
Reference<Cluster> cluster;
Reference<ClusterConnectionFile> ccf;
Database db;
Reference<Cluster> source_cluster;
Reference<ClusterConnectionFile> source_ccf;
Database source_db;
Reference<ClusterConnectionFile> sourceCcf;
Database sourceDb;
FileBackupAgent ba;
Key tag;
Future<Optional<Void>> f;
@ -2692,6 +2690,15 @@ int main(int argc, char* argv[]) {
return FDB_EXIT_ERROR;
}
TraceEvent("ProgramStart")
.detail("SourceVersion", getHGVersion())
.detail("Version", FDB_VT_VERSION )
.detail("PackageName", FDB_VT_PACKAGE_NAME)
.detailf("ActualTime", "%lld", DEBUG_DETERMINISM ? 0 : time(NULL))
.detail("CommandLine", commandLine)
.detail("MemoryLimit", memLimit)
.trackLatest("ProgramStart");
// Ordinarily, this is done when the network is run. However, network thread should be set before TraceEvents are logged. This thread will eventually run the network, so call it now.
TraceEvent::setNetworkThread();
@ -2734,7 +2741,7 @@ int main(int argc, char* argv[]) {
}
try {
cluster = Cluster::createCluster(ccf, -1);
db = Database::createDatabase(ccf, -1, localities);
}
catch (Error& e) {
fprintf(stderr, "ERROR: %s\n", e.what());
@ -2742,23 +2749,13 @@ int main(int argc, char* argv[]) {
return false;
}
TraceEvent("ProgramStart")
.detail("SourceVersion", getHGVersion())
.detail("Version", FDB_VT_VERSION )
.detail("PackageName", FDB_VT_PACKAGE_NAME)
.detailf("ActualTime", "%lld", DEBUG_DETERMINISM ? 0 : time(NULL))
.detail("CommandLine", commandLine)
.detail("MemoryLimit", memLimit)
.trackLatest("ProgramStart");
db = cluster->createDatabase(localities).get();
return true;
};
if(sourceClusterFile.size()) {
auto resolvedSourceClusterFile = ClusterConnectionFile::lookupClusterFileName(sourceClusterFile);
try {
source_ccf = Reference<ClusterConnectionFile>(new ClusterConnectionFile(resolvedSourceClusterFile.first));
sourceCcf = Reference<ClusterConnectionFile>(new ClusterConnectionFile(resolvedSourceClusterFile.first));
}
catch (Error& e) {
fprintf(stderr, "%s\n", ClusterConnectionFile::getErrorString(resolvedSourceClusterFile, e).c_str());
@ -2766,15 +2763,13 @@ int main(int argc, char* argv[]) {
}
try {
source_cluster = Cluster::createCluster(source_ccf, -1);
sourceDb = Database::createDatabase(ccf, -1, localities);
}
catch (Error& e) {
fprintf(stderr, "ERROR: %s\n", e.what());
fprintf(stderr, "ERROR: Unable to connect to cluster from `%s'\n", source_ccf->getFilename().c_str());
fprintf(stderr, "ERROR: Unable to connect to cluster from `%s'\n", sourceCcf->getFilename().c_str());
return FDB_EXIT_ERROR;
}
source_db = source_cluster->createDatabase(localities).get();
}
switch (programExe)
@ -2904,7 +2899,7 @@ int main(int argc, char* argv[]) {
case EXE_DR_AGENT:
if(!initCluster())
return FDB_EXIT_ERROR;
f = stopAfter( runDBAgent(source_db, db) );
f = stopAfter( runDBAgent(sourceDb, db) );
break;
case EXE_DB_BACKUP:
if(!initCluster())
@ -2912,22 +2907,22 @@ int main(int argc, char* argv[]) {
switch (dbType)
{
case DB_START:
f = stopAfter( submitDBBackup(source_db, db, backupKeys, tagName) );
f = stopAfter( submitDBBackup(sourceDb, db, backupKeys, tagName) );
break;
case DB_STATUS:
f = stopAfter( statusDBBackup(source_db, db, tagName, maxErrors) );
f = stopAfter( statusDBBackup(sourceDb, db, tagName, maxErrors) );
break;
case DB_SWITCH:
f = stopAfter( switchDBBackup(source_db, db, backupKeys, tagName) );
f = stopAfter( switchDBBackup(sourceDb, db, backupKeys, tagName) );
break;
case DB_ABORT:
f = stopAfter( abortDBBackup(source_db, db, tagName, partial) );
f = stopAfter( abortDBBackup(sourceDb, db, tagName, partial) );
break;
case DB_PAUSE:
f = stopAfter( changeDBBackupResumed(source_db, db, true) );
f = stopAfter( changeDBBackupResumed(sourceDb, db, true) );
break;
case DB_RESUME:
f = stopAfter( changeDBBackupResumed(source_db, db, false) );
f = stopAfter( changeDBBackupResumed(sourceDb, db, false) );
break;
case DB_UNDEFINED:
default:

View File

@ -1483,14 +1483,6 @@ ACTOR template <class T> Future<T> makeInterruptable( Future<T> f ) {
}
}
ACTOR Future<Database> openDatabase( Reference<ClusterConnectionFile> ccf, Reference<Cluster> cluster, bool doCheckStatus ) {
state Database db = wait( cluster->createDatabase() );
if (doCheckStatus) {
wait( makeInterruptable( checkStatus( Void(), ccf )) );
}
return db;
}
ACTOR Future<Void> commitTransaction( Reference<ReadYourWritesTransaction> tr ) {
wait( makeInterruptable( tr->commit() ) );
auto ver = tr->getCommittedVersion();
@ -2294,12 +2286,9 @@ Future<T> stopNetworkAfter( Future<T> what ) {
ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
state LineNoise& linenoise = *plinenoise;
state bool connected = false;
state bool opened = false;
state bool intrans = false;
state Database db;
state Reference<Cluster> cluster;
state Reference<ReadYourWritesTransaction> tr;
state bool writeMode = false;
@ -2326,10 +2315,10 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
TraceEvent::setNetworkThread();
try {
cluster = Cluster::createCluster(ccf->getFilename().c_str(), -1);
connected = true;
if (!opt.exec.present())
db = Database::createDatabase(ccf, -1);
if (!opt.exec.present()) {
printf("Using cluster file `%s'.\n", ccf->getFilename().c_str());
}
}
catch (Error& e) {
printf("ERROR: %s (%d)\n", e.what(), e.code());
@ -2349,28 +2338,16 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
.trackLatest("ProgramStart");
}
if (connected) {
try {
Database _db = wait( openDatabase( ccf, cluster, !opt.exec.present() && opt.initialStatusCheck ) );
db = _db;
tr = Reference<ReadYourWritesTransaction>();
opened = true;
if (!opt.exec.present() && !opt.initialStatusCheck)
printf("\n");
} catch (Error& e) {
if(e.code() != error_code_actor_cancelled) {
printf("ERROR: %s (%d)\n", e.what(), e.code());
printf("Unable to open database\n");
}
return 1;
}
}
if (!opt.exec.present()) {
if(opt.initialStatusCheck) {
wait( makeInterruptable( checkStatus( Void(), ccf )) );
}
else {
printf("\n");
}
printf("Welcome to the fdbcli. For help, type `help'.\n");
validOptions = options->getValidOptions();
}
state bool is_error = false;
@ -2499,14 +2476,8 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
continue;
}
if (!connected) {
printf("ERROR: Not connected\n");
is_error = true;
continue;
}
if (tokencmp(tokens[0], "waitconnected")) {
wait( makeInterruptable( cluster->onConnected() ) );
wait( makeInterruptable( db->onConnected() ) );
continue;
}
@ -2600,12 +2571,6 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
continue;
}
if (!opened) {
printf("ERROR: No database open\n");
is_error = true;
continue;
}
if (tokencmp(tokens[0], "begin")) {
if (tokens.size() != 1) {
printUsage(tokens[0]);
@ -3207,13 +3172,11 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
if(e.code() != error_code_actor_cancelled)
printf("ERROR: %s (%d)\n", e.what(), e.code());
is_error = true;
if (connected && opened) {
if (intrans) {
printf("Rolling back current transaction\n");
intrans = false;
options = &globalOptions;
options->apply(tr);
}
if (intrans) {
printf("Rolling back current transaction\n");
intrans = false;
options = &globalOptions;
options->apply(tr);
}
}

View File

@ -81,7 +81,7 @@ public:
bool writeFile();
void setConnectionString( ClusterConnectionString const& );
std::string const& getFilename() const { ASSERT( filename.size() ); return filename; }
bool canGetFilename() { return filename.size() != 0; }
bool canGetFilename() const { return filename.size() != 0; }
bool fileContentsUpToDate() const;
bool fileContentsUpToDate(ClusterConnectionString &fileConnectionString) const;
void notifyConnected();

View File

@ -51,14 +51,13 @@ public:
class DatabaseContext : public ReferenceCounted<DatabaseContext>, NonCopyable {
public:
static Future<Database> createDatabase( Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface, Reference<Cluster> cluster, LocalityData const& clientLocality );
// For internal (fdbserver) use only: create a database context for a DB with already known client info
static Database create( Reference<AsyncVar<ClientDBInfo>> info, Future<Void> dependency, LocalityData clientLocality, bool enableLocalityLoadBalance, int taskID = TaskDefaultEndpoint, bool lockAware = false );
// For internal (fdbserver) use only
static Database create( Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface, Reference<ClusterConnectionFile> connFile, LocalityData const& clientLocality );
static Database create( Reference<AsyncVar<ClientDBInfo>> clientInfo, Future<Void> clientInfoMonitor, LocalityData clientLocality, bool enableLocalityLoadBalance, int taskID=TaskDefaultEndpoint, bool lockAware=false, int apiVersion=Database::API_VERSION_LATEST );
~DatabaseContext();
Database clone() const { return Database(new DatabaseContext( clientInfo, cluster, clientInfoMonitor, dbId, taskID, clientLocality, enableLocalityLoadBalance, lockAware )); }
Database clone() const { return Database(new DatabaseContext( cluster, clientInfo, clientInfoMonitor, dbId, taskID, clientLocality, enableLocalityLoadBalance, lockAware, apiVersion )); }
pair<KeyRange,Reference<LocationInfo>> getCachedLocation( const KeyRef&, bool isBackward = false );
bool getCachedLocations( const KeyRangeRef&, vector<std::pair<KeyRange,Reference<LocationInfo>>>&, int limit, bool reverse );
@ -76,27 +75,26 @@ public:
void setOption( FDBDatabaseOptions::Option option, Optional<StringRef> value );
Error deferred_error;
Error deferredError;
bool lockAware;
void checkDeferredError() {
if( cluster )
cluster->checkDeferredError();
if( deferred_error.code() != invalid_error_code )
throw deferred_error;
if( deferredError.code() != invalid_error_code ) {
throw deferredError;
}
}
//private: friend class ClientInfoMonitorActor;
explicit DatabaseContext( Reference<AsyncVar<ClientDBInfo>> clientInfo,
Reference<Cluster> cluster, Future<Void> clientInfoMonitor,
Standalone<StringRef> dbId, int taskID, LocalityData clientLocality, bool enableLocalityLoadBalance, bool lockAware );
int apiVersionAtLeast(int minVersion) { return apiVersion < 0 || apiVersion >= minVersion; }
// These are reference counted
Reference<Cluster> cluster;
Future<Void> clientInfoMonitor; // or sometimes an outside dependency that does the same thing!
Future<Void> onConnected(); // Returns after a majority of coordination servers are available and have reported a leader. The cluster file therefore is valid, but the database might be unavailable.
Reference<ClusterConnectionFile> getConnectionFile();
//private:
explicit DatabaseContext( Reference<Cluster> cluster, Reference<AsyncVar<ClientDBInfo>> clientDBInfo,
Future<Void> clientInfoMonitor, Standalone<StringRef> dbId, int taskID, LocalityData const& clientLocality,
bool enableLocalityLoadBalance, bool lockAware, int apiVersion = Database::API_VERSION_LATEST );
// Key DB-specific information
Reference<AsyncVar<ClientDBInfo>> clientInfo;
AsyncTrigger masterProxiesChangeTrigger;
Future<Void> monitorMasterProxiesInfoChange;
Reference<ProxyInfo> masterProxies;
@ -151,6 +149,13 @@ public:
Int64MetricHandle getValueSubmitted;
EventMetricHandle<GetValueComplete> getValueCompleted;
Reference<AsyncVar<ClientDBInfo>> clientInfo;
Future<Void> clientInfoMonitor;
Reference<Cluster> cluster;
int apiVersion;
};
#endif

View File

@ -74,6 +74,7 @@ public:
class IDatabase {
public:
virtual ~IDatabase() {}
virtual Reference<ITransaction> createTransaction() = 0;
virtual void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) = 0;
@ -81,16 +82,6 @@ public:
virtual void delref() = 0;
};
class ICluster {
public:
virtual ~ICluster() {}
virtual ThreadFuture<Reference<IDatabase>> createDatabase() = 0;
virtual void setOption(FDBClusterOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) = 0;
virtual void addref() = 0;
virtual void delref() = 0;
};
class IClientApi {
public:
virtual ~IClientApi() {}
@ -103,7 +94,7 @@ public:
virtual void runNetwork() = 0;
virtual void stopNetwork() = 0;
virtual ThreadFuture<Reference<ICluster>> createCluster(const char *clusterFilePath) = 0;
virtual ThreadFuture<Reference<IDatabase>> createDatabase(const char *clusterFilePath) = 0;
virtual void addNetworkThreadCompletionHook(void (*hook)(void*), void *hookParameter) = 0;
};

View File

@ -730,7 +730,7 @@ ACTOR Future<CoordinatorsResult::Type> changeQuorum( Database cx, Reference<IQuo
return CoordinatorsResult::BAD_DATABASE_STATE; // Someone deleted this key entirely?
state ClusterConnectionString old( currentKey.get().toString() );
if ( cx->cluster && old.clusterKeyName().toString() != cx->cluster->getConnectionFile()->getConnectionString().clusterKeyName() )
if ( cx->getConnectionFile() && old.clusterKeyName().toString() != cx->getConnectionFile()->getConnectionString().clusterKeyName() )
return CoordinatorsResult::BAD_DATABASE_STATE; // Someone changed the "name" of the database??
state CoordinatorsResult::Type result = CoordinatorsResult::SUCCESS;

View File

@ -408,9 +408,9 @@ TEST_CASE("/fdbserver/metrics/TraceEvents") {
}
fprintf(stdout, "Using environment variables METRICS_CONNFILE and METRICS_PREFIX.\n");
state Reference<Cluster> metricsCluster = Cluster::createCluster( metricsConnFile, Cluster::API_VERSION_LATEST );
state Database metricsDb = Database::createDatabase(metricsConnFile, Database::API_VERSION_LATEST);
TDMetricCollection::getTDMetrics()->address = LiteralStringRef("0.0.0.0:0");
state Future<Void> metrics = runMetrics(metricsCluster->createDatabase(), KeyRef(metricsPrefix));
state Future<Void> metrics = runMetrics(metricsDb, KeyRef(metricsPrefix));
state int64_t x = 0;
state double w = 0.5;

View File

@ -59,8 +59,9 @@ std::string ClusterConnectionFile::getErrorString( std::pair<std::string, bool>
}
ClusterConnectionFile::ClusterConnectionFile( std::string const& filename ) {
if( !fileExists( filename ) )
if( !fileExists( filename ) ) {
throw no_cluster_file_found();
}
cs = ClusterConnectionString(readFileBytes(filename, MAX_CLUSTER_FILE_BYTES));
this->filename = filename;

View File

@ -222,21 +222,6 @@ void DLDatabase::setOption(FDBDatabaseOptions::Option option, Optional<StringRef
throwIfError(api->databaseSetOption(db, option, value.present() ? value.get().begin() : NULL, value.present() ? value.get().size() : 0));
}
// DLCluster
ThreadFuture<Reference<IDatabase>> DLCluster::createDatabase() {
FdbCApi::FDBFuture *f = api->clusterCreateDatabase(cluster, (uint8_t*)"DB", 2);
return toThreadFuture<Reference<IDatabase>>(api, f, [](FdbCApi::FDBFuture *f, FdbCApi *api) {
FdbCApi::FDBDatabase *db;
api->futureGetDatabase(f, &db);
return Reference<IDatabase>(new DLDatabase(Reference<FdbCApi>::addRef(api), db));
});
}
void DLCluster::setOption(FDBClusterOptions::Option option, Optional<StringRef> value) {
throwIfError(api->clusterSetOption(cluster, option, value.present() ? value.get().begin() : NULL, value.present() ? value.get().size() : 0));
}
// DLApi
template<class T>
void loadClientFunction(T *fp, void *lib, std::string libPath, const char *functionName, bool requireFunction = true) {
@ -361,13 +346,31 @@ void DLApi::stopNetwork() {
}
}
ThreadFuture<Reference<ICluster>> DLApi::createCluster(const char *clusterFilePath) {
ThreadFuture<Reference<IDatabase>> DLApi::createDatabase(const char *clusterFilePath) {
FdbCApi::FDBFuture *f = api->createCluster(clusterFilePath);
return toThreadFuture<Reference<ICluster>>(api, f, [](FdbCApi::FDBFuture *f, FdbCApi *api) {
auto clusterFuture = toThreadFuture<FdbCApi::FDBCluster*>(api, f, [](FdbCApi::FDBFuture *f, FdbCApi *api) {
FdbCApi::FDBCluster *cluster;
api->futureGetCluster(f, &cluster);
return Reference<ICluster>(new DLCluster(Reference<FdbCApi>::addRef(api), cluster));
return cluster;
});
Reference<FdbCApi> innerApi = api;
return flatMapThreadFuture<FdbCApi::FDBCluster*, Reference<IDatabase>>(clusterFuture, [innerApi](ErrorOr<FdbCApi::FDBCluster*> cluster) {
if(cluster.isError()) {
return ErrorOr<ThreadFuture<Reference<IDatabase>>>(cluster.getError());
}
auto dbFuture = toThreadFuture<Reference<IDatabase>>(innerApi, innerApi->clusterCreateDatabase(cluster.get(), (uint8_t*)"DB", 2), [](FdbCApi::FDBFuture *f, FdbCApi *api) {
FdbCApi::FDBDatabase *db;
api->futureGetDatabase(f, &db);
return Reference<IDatabase>(new DLDatabase(Reference<FdbCApi>::addRef(api), db));
});
return ErrorOr<ThreadFuture<Reference<IDatabase>>>(mapThreadFuture<Reference<IDatabase>, Reference<IDatabase>>(dbFuture, [cluster, innerApi](ErrorOr<Reference<IDatabase>> db) {
innerApi->clusterDestroy(cluster.get());
return db;
}));
});
}
@ -566,16 +569,36 @@ void MultiVersionTransaction::reset() {
}
// MultiVersionDatabase
MultiVersionDatabase::MultiVersionDatabase(Reference<MultiVersionCluster> cluster, Reference<IDatabase> db, ThreadFuture<Void> changed)
: dbState(new DatabaseState(cluster, db, changed)) {}
MultiVersionDatabase::MultiVersionDatabase(MultiVersionApi *api, std::string clusterFilePath, Reference<IDatabase> db, bool openConnectors) : dbState(new DatabaseState()) {
dbState->db = db;
dbState->dbVar->set(db);
if(!openConnectors) {
dbState->currentClientIndex = 0;
}
else {
if(!api->localClientDisabled) {
dbState->currentClientIndex = 0;
dbState->addConnection(api->getLocalClient(), clusterFilePath);
}
else {
dbState->currentClientIndex = -1;
}
api->runOnExternalClients([this, clusterFilePath](Reference<ClientInfo> client) {
dbState->addConnection(client, clusterFilePath);
});
dbState->startConnections();
}
}
MultiVersionDatabase::~MultiVersionDatabase() {
dbState->cancelCallbacks();
dbState->cancelConnections();
}
Reference<IDatabase> MultiVersionDatabase::debugCreateFromExistingDatabase(Reference<IDatabase> db) {
auto cluster = Reference<ThreadSafeAsyncVar<Reference<ICluster>>>(new ThreadSafeAsyncVar<Reference<ICluster>>(Reference<ICluster>(NULL)));
return Reference<IDatabase>(new MultiVersionDatabase(Reference<MultiVersionCluster>::addRef(new MultiVersionCluster()), db, ThreadFuture<Void>(Never())));
return Reference<IDatabase>(new MultiVersionDatabase(MultiVersionApi::api, "", db, false));
}
Reference<ITransaction> MultiVersionDatabase::createTransaction() {
@ -602,164 +625,7 @@ void MultiVersionDatabase::setOption(FDBDatabaseOptions::Option option, Optional
dbState->options.push_back(std::make_pair(option, value.cast_to<Standalone<StringRef>>()));
}
MultiVersionDatabase::DatabaseState::DatabaseState(Reference<MultiVersionCluster> cluster, Reference<IDatabase> db, ThreadFuture<Void> changed)
: cluster(cluster), db(db), dbVar(new ThreadSafeAsyncVar<Reference<IDatabase>>(db)), cancelled(false), changed(changed)
{
addref();
int userParam;
changed.callOrSetAsCallback(this, userParam, false);
}
void MultiVersionDatabase::DatabaseState::fire(const Void &unused, int& userParam) {
onMainThreadVoid([this]() {
if(!cancelled) {
if(changed.isReady()) {
updateDatabase();
}
else if(dbFuture.isValid() && dbFuture.isReady()) {
auto newDb = dbFuture.get();
optionLock.enter();
bool optionFailed = false;
for(auto option : options) {
try {
newDb->setOption(option.first, option.second.cast_to<StringRef>()); // In practice, this will set a deferred error instead of throwing. If that happens, the database will be unusable (all transaction operations will throw errors).
}
catch(Error &e) {
optionFailed = true;
TraceEvent(SevError, "DatabaseVersionChangeOptionError").error(e).detail("Option", option.first).detail("OptionValue", printable(option.second));
}
}
if(!optionFailed) {
db = newDb;
}
else {
// TODO: does this constitute a client failure?
db = Reference<IDatabase>(NULL); // If we can't set all options on the database, just leave us disconnected until we switch clients again
}
optionLock.leave();
dbVar->set(db);
dbFuture.cancel();
}
}
delref();
}, NULL);
}
void MultiVersionDatabase::DatabaseState::error(const Error& e, int& userParam) {
if(e.code() == error_code_operation_cancelled) {
delref();
return;
}
// TODO: retry?
TraceEvent(SevWarnAlways, "DatabaseCreationFailed").error(e);
onMainThreadVoid([this]() {
updateDatabase();
delref();
}, NULL);
}
void MultiVersionDatabase::DatabaseState::updateDatabase() {
auto currentCluster = cluster->clusterState->clusterVar->get();
changed = currentCluster.onChange;
addref();
int userParam;
changed.callOrSetAsCallback(this, userParam, false);
if(dbFuture.isValid()) {
dbFuture.cancel();
}
if(currentCluster.value) {
addref();
dbFuture = currentCluster.value->createDatabase();
dbFuture.callOrSetAsCallback(this, userParam, false);
}
}
void MultiVersionDatabase::DatabaseState::cancelCallbacks() {
addref();
onMainThreadVoid([this]() {
cancelled = true;
if(dbFuture.isValid()) {
dbFuture.cancel();
}
if(changed.isValid() && changed.clearCallback(this)) {
delref();
}
delref();
}, NULL);
}
// MultiVersionCluster
MultiVersionCluster::MultiVersionCluster(MultiVersionApi *api, std::string clusterFilePath, Reference<ICluster> cluster) : clusterState(new ClusterState()) {
clusterState->cluster = cluster;
clusterState->clusterVar->set(cluster);
if(!api->localClientDisabled) {
clusterState->currentClientIndex = 0;
clusterState->addConnection(api->getLocalClient(), clusterFilePath);
}
else {
clusterState->currentClientIndex = -1;
}
api->runOnExternalClients([this, clusterFilePath](Reference<ClientInfo> client) {
clusterState->addConnection(client, clusterFilePath);
});
clusterState->startConnections();
}
MultiVersionCluster::~MultiVersionCluster() {
clusterState->cancelConnections();
}
ThreadFuture<Reference<IDatabase>> MultiVersionCluster::createDatabase() {
auto cluster = clusterState->clusterVar->get();
if(cluster.value) {
ThreadFuture<Reference<IDatabase>> dbFuture = abortableFuture(cluster.value->createDatabase(), cluster.onChange);
return mapThreadFuture<Reference<IDatabase>, Reference<IDatabase>>(dbFuture, [this, cluster](ErrorOr<Reference<IDatabase>> db) {
if(db.isError() && db.getError().code() != error_code_cluster_version_changed) {
return db;
}
Reference<IDatabase> newDb = db.isError() ? Reference<IDatabase>(NULL) : db.get();
return ErrorOr<Reference<IDatabase>>(Reference<IDatabase>(new MultiVersionDatabase(Reference<MultiVersionCluster>::addRef(this), newDb, cluster.onChange)));
});
}
else {
return Reference<IDatabase>(new MultiVersionDatabase(Reference<MultiVersionCluster>::addRef(this), Reference<IDatabase>(), cluster.onChange));
}
}
void MultiVersionCluster::setOption(FDBClusterOptions::Option option, Optional<StringRef> value) {
MutexHolder holder(clusterState->optionLock);
auto itr = FDBClusterOptions::optionInfo.find(option);
if(itr != FDBClusterOptions::optionInfo.end()) {
TraceEvent("SetClusterOption").detail("Option", itr->second.name);
}
else {
TraceEvent("UnknownClusterOption").detail("Option", option);
throw invalid_option();
}
if(clusterState->cluster) {
clusterState->cluster->setOption(option, value);
}
clusterState->options.push_back(std::make_pair(option, value.cast_to<Standalone<StringRef>>()));
}
void MultiVersionCluster::Connector::connect() {
void MultiVersionDatabase::Connector::connect() {
addref();
onMainThreadVoid([this]() {
if(!cancelled) {
@ -767,23 +633,14 @@ void MultiVersionCluster::Connector::connect() {
if(connectionFuture.isValid()) {
connectionFuture.cancel();
}
auto clusterFuture = client->api->createCluster(clusterFilePath.c_str());
auto dbFuture = flatMapThreadFuture<Reference<ICluster>, Reference<IDatabase>>(clusterFuture, [this](ErrorOr<Reference<ICluster>> cluster) {
if(cluster.isError()) {
return ErrorOr<ThreadFuture<Reference<IDatabase>>>(cluster.getError());
}
else {
candidateCluster = cluster.get();
return ErrorOr<ThreadFuture<Reference<IDatabase>>>(cluster.get()->createDatabase());
}
});
ThreadFuture<Reference<IDatabase>> dbFuture = client->api->createDatabase(clusterFilePath.c_str());
connectionFuture = flatMapThreadFuture<Reference<IDatabase>, Void>(dbFuture, [this](ErrorOr<Reference<IDatabase>> db) {
if(db.isError()) {
return ErrorOr<ThreadFuture<Void>>(db.getError());
}
else {
candidateDatabase = db.get();
tr = db.get()->createTransaction();
auto versionFuture = mapThreadFuture<Version, Void>(tr->getReadVersion(), [this](ErrorOr<Version> v) {
// If the version attempt returns an error, we regard that as a connection (except operation_cancelled)
@ -809,7 +666,7 @@ void MultiVersionCluster::Connector::connect() {
}
// Only called from main thread
void MultiVersionCluster::Connector::cancel() {
void MultiVersionDatabase::Connector::cancel() {
connected = false;
cancelled = true;
if(connectionFuture.isValid()) {
@ -817,29 +674,32 @@ void MultiVersionCluster::Connector::cancel() {
}
}
void MultiVersionCluster::Connector::fire(const Void &unused, int& userParam) {
void MultiVersionDatabase::Connector::fire(const Void &unused, int& userParam) {
onMainThreadVoid([this]() {
if(!cancelled) {
connected = true;
clusterState->stateChanged();
dbState->stateChanged();
}
delref();
}, NULL);
}
void MultiVersionCluster::Connector::error(const Error& e, int& userParam) {
void MultiVersionDatabase::Connector::error(const Error& e, int& userParam) {
if(e.code() != error_code_operation_cancelled) {
// TODO: is it right to abandon this connection attempt?
client->failed = true;
MultiVersionApi::api->updateSupportedVersions();
TraceEvent(SevError, "ClusterConnectionError").error(e).detail("ClientLibrary", this->client->libPath);
TraceEvent(SevError, "DatabaseConnectionError").error(e).detail("ClientLibrary", this->client->libPath);
}
delref();
}
MultiVersionDatabase::DatabaseState::DatabaseState()
: dbVar(new ThreadSafeAsyncVar<Reference<IDatabase>>(Reference<IDatabase>(NULL))), currentClientIndex(-1) {}
// Only called from main thread
void MultiVersionCluster::ClusterState::stateChanged() {
void MultiVersionDatabase::DatabaseState::stateChanged() {
int newIndex = -1;
for(int i = 0; i < clients.size(); ++i) {
if(i != currentClientIndex && connectionAttempts[i]->connected) {
@ -862,12 +722,12 @@ void MultiVersionCluster::ClusterState::stateChanged() {
}
// Restart connection for replaced client
auto newCluster = connectionAttempts[newIndex]->candidateCluster;
auto newDb = connectionAttempts[newIndex]->candidateDatabase;
optionLock.enter();
for(auto option : options) {
try {
newCluster->setOption(option.first, option.second.cast_to<StringRef>()); // In practice, this will set a deferred error instead of throwing. If that happens, the cluster will be unusable (attempts to use it will throw errors).
newDb->setOption(option.first, option.second.cast_to<StringRef>()); // In practice, this will set a deferred error instead of throwing. If that happens, the database will be unusable (attempts to use it will throw errors).
}
catch(Error &e) {
optionLock.leave();
@ -879,10 +739,10 @@ void MultiVersionCluster::ClusterState::stateChanged() {
}
}
cluster = newCluster;
db = newDb;
optionLock.leave();
clusterVar->set(cluster);
dbVar->set(db);
if(currentClientIndex >= 0 && connectionAttempts[currentClientIndex]->connected) {
connectionAttempts[currentClientIndex]->connected = false;
@ -893,18 +753,18 @@ void MultiVersionCluster::ClusterState::stateChanged() {
currentClientIndex = newIndex;
}
void MultiVersionCluster::ClusterState::addConnection(Reference<ClientInfo> client, std::string clusterFilePath) {
void MultiVersionDatabase::DatabaseState::addConnection(Reference<ClientInfo> client, std::string clusterFilePath) {
clients.push_back(client);
connectionAttempts.push_back(Reference<Connector>(new Connector(Reference<ClusterState>::addRef(this), client, clusterFilePath)));
connectionAttempts.push_back(Reference<Connector>(new Connector(Reference<DatabaseState>::addRef(this), client, clusterFilePath)));
}
void MultiVersionCluster::ClusterState::startConnections() {
void MultiVersionDatabase::DatabaseState::startConnections() {
for(auto c : connectionAttempts) {
c->connect();
}
}
void MultiVersionCluster::ClusterState::cancelConnections() {
void MultiVersionDatabase::DatabaseState::cancelConnections() {
addref();
onMainThreadVoid([this](){
for(auto c : connectionAttempts) {
@ -1253,7 +1113,7 @@ void MultiVersionApi::addNetworkThreadCompletionHook(void (*hook)(void*), void *
}
}
ThreadFuture<Reference<ICluster>> MultiVersionApi::createCluster(const char *clusterFilePath) {
ThreadFuture<Reference<IDatabase>> MultiVersionApi::createDatabase(const char *clusterFilePath) {
lock.enter();
if(!networkSetup) {
lock.leave();
@ -1263,23 +1123,23 @@ ThreadFuture<Reference<ICluster>> MultiVersionApi::createCluster(const char *clu
std::string clusterFile(clusterFilePath);
if(localClientDisabled) {
return Reference<ICluster>(new MultiVersionCluster(this, clusterFile, Reference<ICluster>()));
return Reference<IDatabase>(new MultiVersionDatabase(this, clusterFile, Reference<IDatabase>()));
}
auto clusterFuture = localClient->api->createCluster(clusterFilePath);
auto databaseFuture = localClient->api->createDatabase(clusterFilePath);
if(bypassMultiClientApi) {
return clusterFuture;
return databaseFuture;
}
else {
for( auto it : externalClients ) {
TraceEvent("CreatingClusterOnExternalClient").detail("LibraryPath", it.second->libPath).detail("Failed", it.second->failed);
for(auto it : externalClients) {
TraceEvent("CreatingDatabaseOnExternalClient").detail("LibraryPath", it.second->libPath).detail("Failed", it.second->failed);
}
return mapThreadFuture<Reference<ICluster>, Reference<ICluster>>(clusterFuture, [this, clusterFile](ErrorOr<Reference<ICluster>> cluster) {
if(cluster.isError()) {
return cluster;
return mapThreadFuture<Reference<IDatabase>, Reference<IDatabase>>(databaseFuture, [this, clusterFile](ErrorOr<Reference<IDatabase>> database) {
if(database.isError()) {
return database;
}
return ErrorOr<Reference<ICluster>>(Reference<ICluster>(new MultiVersionCluster(this, clusterFile, cluster.get())));
return ErrorOr<Reference<IDatabase>>(Reference<IDatabase>(new MultiVersionDatabase(this, clusterFile, database.get())));
});
}
}

View File

@ -173,22 +173,6 @@ private:
FdbCApi::FDBDatabase* const db;
};
class DLCluster : public ICluster, ThreadSafeReferenceCounted<DLCluster> {
public:
DLCluster(Reference<FdbCApi> api, FdbCApi::FDBCluster *cluster) : api(api), cluster(cluster) {}
~DLCluster() { api->clusterDestroy(cluster); }
ThreadFuture<Reference<IDatabase>> createDatabase();
void setOption(FDBClusterOptions::Option option, Optional<StringRef> value = Optional<StringRef>());
void addref() { ThreadSafeReferenceCounted<DLCluster>::addref(); }
void delref() { ThreadSafeReferenceCounted<DLCluster>::delref(); }
private:
const Reference<FdbCApi> api;
FdbCApi::FDBCluster* const cluster;
};
class DLApi : public IClientApi {
public:
DLApi(std::string fdbCPath);
@ -201,7 +185,7 @@ public:
void runNetwork();
void stopNetwork();
ThreadFuture<Reference<ICluster>> createCluster(const char *clusterFilePath);
ThreadFuture<Reference<IDatabase>> createDatabase(const char *clusterFilePath);
void addNetworkThreadCompletionHook(void (*hook)(void*), void *hookParameter);
@ -274,50 +258,6 @@ private:
void updateTransaction();
};
class MultiVersionCluster;
class MultiVersionDatabase : public IDatabase, ThreadSafeReferenceCounted<MultiVersionDatabase> {
public:
MultiVersionDatabase(Reference<MultiVersionCluster> cluster, Reference<IDatabase> db, ThreadFuture<Void> changed);
~MultiVersionDatabase();
Reference<ITransaction> createTransaction();
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>());
void addref() { ThreadSafeReferenceCounted<MultiVersionDatabase>::addref(); }
void delref() { ThreadSafeReferenceCounted<MultiVersionDatabase>::delref(); }
static Reference<IDatabase> debugCreateFromExistingDatabase(Reference<IDatabase> db);
private:
struct DatabaseState : ThreadCallback, ThreadSafeReferenceCounted<DatabaseState> {
DatabaseState(Reference<MultiVersionCluster> cluster, Reference<IDatabase> db, ThreadFuture<Void> changed);
void updateDatabase();
void cancelCallbacks();
bool canFire(int notMadeActive) { return true; }
void fire(const Void &unused, int& userParam);
void error(const Error& e, int& userParam);
const Reference<MultiVersionCluster> cluster;
Reference<IDatabase> db;
const Reference<ThreadSafeAsyncVar<Reference<IDatabase>>> dbVar;
ThreadFuture<Reference<IDatabase>> dbFuture;
ThreadFuture<Void> changed;
bool cancelled;
std::vector<std::pair<FDBDatabaseOptions::Option, Optional<Standalone<StringRef>>>> options;
Mutex optionLock;
};
const Reference<DatabaseState> dbState;
friend class MultiVersionTransaction;
};
struct ClientInfo : ThreadSafeReferenceCounted<ClientInfo> {
uint64_t protocolVersion;
IClientApi *api;
@ -336,23 +276,24 @@ struct ClientInfo : ThreadSafeReferenceCounted<ClientInfo> {
class MultiVersionApi;
class MultiVersionCluster : public ICluster, ThreadSafeReferenceCounted<MultiVersionCluster> {
class MultiVersionDatabase : public IDatabase, ThreadSafeReferenceCounted<MultiVersionDatabase> {
public:
MultiVersionCluster() : clusterState(new ClusterState()) {} // Used in testing workloads
MultiVersionCluster(MultiVersionApi *api, std::string clusterFilePath, Reference<ICluster> cluster);
~MultiVersionCluster();
MultiVersionDatabase(MultiVersionApi *api, std::string clusterFilePath, Reference<IDatabase> db, bool openConnectors=true);
~MultiVersionDatabase();
ThreadFuture<Reference<IDatabase>> createDatabase();
void setOption(FDBClusterOptions::Option option, Optional<StringRef> value = Optional<StringRef>());
Reference<ITransaction> createTransaction();
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>());
void addref() { ThreadSafeReferenceCounted<MultiVersionCluster>::addref(); }
void delref() { ThreadSafeReferenceCounted<MultiVersionCluster>::delref(); }
void addref() { ThreadSafeReferenceCounted<MultiVersionDatabase>::addref(); }
void delref() { ThreadSafeReferenceCounted<MultiVersionDatabase>::delref(); }
static Reference<IDatabase> debugCreateFromExistingDatabase(Reference<IDatabase> db);
private:
struct ClusterState;
struct DatabaseState;
struct Connector : ThreadCallback, ThreadSafeReferenceCounted<Connector> {
Connector(Reference<ClusterState> clusterState, Reference<ClientInfo> client, std::string clusterFilePath) : clusterState(clusterState), client(client), clusterFilePath(clusterFilePath), connected(false), cancelled(false) {}
Connector(Reference<DatabaseState> dbState, Reference<ClientInfo> client, std::string clusterFilePath) : dbState(dbState), client(client), clusterFilePath(clusterFilePath), connected(false), cancelled(false) {}
void connect();
void cancel();
@ -364,38 +305,43 @@ private:
const Reference<ClientInfo> client;
const std::string clusterFilePath;
const Reference<ClusterState> clusterState;
const Reference<DatabaseState> dbState;
ThreadFuture<Void> connectionFuture;
Reference<ICluster> candidateCluster;
Reference<IDatabase> candidateDatabase;
Reference<ITransaction> tr;
bool connected;
bool cancelled;
};
struct ClusterState : ThreadSafeReferenceCounted<ClusterState> {
ClusterState() : clusterVar(new ThreadSafeAsyncVar<Reference<ICluster>>(Reference<ICluster>(NULL))), currentClientIndex(-1) {}
struct DatabaseState : ThreadSafeReferenceCounted<DatabaseState> {
DatabaseState();
void stateChanged();
void addConnection(Reference<ClientInfo> client, std::string clusterFilePath);
void startConnections();
void cancelConnections();
Reference<ICluster> cluster;
const Reference<ThreadSafeAsyncVar<Reference<ICluster>>> clusterVar;
Reference<IDatabase> db;
const Reference<ThreadSafeAsyncVar<Reference<IDatabase>>> dbVar;
ThreadFuture<Reference<IDatabase>> dbFuture;
ThreadFuture<Void> changed;
bool cancelled;
int currentClientIndex;
std::vector<Reference<ClientInfo>> clients;
std::vector<Reference<Connector>> connectionAttempts;
std::vector<std::pair<FDBClusterOptions::Option, Optional<Standalone<StringRef>>>> options;
std::vector<std::pair<FDBDatabaseOptions::Option, Optional<Standalone<StringRef>>>> options;
Mutex optionLock;
};
const Reference<ClusterState> clusterState;
friend class MultiVersionDatabase;
const Reference<DatabaseState> dbState;
friend class MultiVersionTransaction;
};
class MultiVersionApi : public IClientApi {
@ -409,7 +355,7 @@ public:
void stopNetwork();
void addNetworkThreadCompletionHook(void (*hook)(void*), void *hookParameter);
ThreadFuture<Reference<ICluster>> createCluster(const char *clusterFilePath);
ThreadFuture<Reference<IDatabase>> createDatabase(const char *clusterFilePath);
static MultiVersionApi* api;
Reference<ClientInfo> getLocalClient();
@ -449,5 +395,4 @@ private:
volatile bool envOptionsLoaded;
};
#endif

View File

@ -217,6 +217,7 @@ ACTOR Future<Void> databaseLogger( DatabaseContext *cx ) {
loop {
wait( delay( CLIENT_KNOBS->SYSTEM_MONITOR_INTERVAL, cx->taskID ) );
TraceEvent("TransactionMetrics")
.detail("Cluster", cx->cluster && cx->getConnectionFile() ? cx->getConnectionFile()->getConnectionString().clusterKeyName().toString() : "")
.detail("ReadVersions", cx->transactionReadVersions)
.detail("LogicalUncachedReads", cx->transactionLogicalReads)
.detail("PhysicalReadRequests", cx->transactionPhysicalReads)
@ -472,16 +473,17 @@ ACTOR static Future<Void> monitorMasterProxiesChange(Reference<AsyncVar<ClientDB
}
DatabaseContext::DatabaseContext(
Reference<AsyncVar<ClientDBInfo>> clientInfo,
Reference<Cluster> cluster, Future<Void> clientInfoMonitor,
Standalone<StringRef> dbId, int taskID, LocalityData clientLocality,
bool enableLocalityLoadBalance, bool lockAware )
: clientInfo(clientInfo), masterProxiesChangeTrigger(), cluster(cluster), clientInfoMonitor(clientInfoMonitor), dbId(dbId),
transactionReadVersions(0), transactionLogicalReads(0), transactionPhysicalReads(0), transactionCommittedMutations(0), transactionCommittedMutationBytes(0), transactionsCommitStarted(0),
transactionsCommitCompleted(0), transactionsTooOld(0), transactionsFutureVersions(0), transactionsNotCommitted(0), transactionsMaybeCommitted(0), transactionsResourceConstrained(0), taskID(taskID),
outstandingWatches(0), maxOutstandingWatches(CLIENT_KNOBS->DEFAULT_MAX_OUTSTANDING_WATCHES), clientLocality(clientLocality), enableLocalityLoadBalance(enableLocalityLoadBalance), lockAware(lockAware),
Reference<Cluster> cluster, Reference<AsyncVar<ClientDBInfo>> clientInfo, Future<Void> clientInfoMonitor, Standalone<StringRef> dbId,
int taskID, LocalityData const& clientLocality, bool enableLocalityLoadBalance, bool lockAware, int apiVersion )
: cluster(cluster), clientInfo(clientInfo), clientInfoMonitor(clientInfoMonitor), dbId(dbId), taskID(taskID), clientLocality(clientLocality), enableLocalityLoadBalance(enableLocalityLoadBalance),
lockAware(lockAware), apiVersion(apiVersion),
transactionReadVersions(0), transactionLogicalReads(0), transactionPhysicalReads(0), transactionCommittedMutations(0), transactionCommittedMutationBytes(0),
transactionsCommitStarted(0), transactionsCommitCompleted(0), transactionsTooOld(0), transactionsFutureVersions(0), transactionsNotCommitted(0),
transactionsMaybeCommitted(0), transactionsResourceConstrained(0), outstandingWatches(0),
latencies(1000), readLatencies(1000), commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000)
{
maxOutstandingWatches = CLIENT_KNOBS->DEFAULT_MAX_OUTSTANDING_WATCHES;
logger = databaseLogger( this );
locationCacheSize = g_network->isSimulated() ?
CLIENT_KNOBS->LOCATION_CACHE_EVICTION_SIZE_SIM :
@ -533,15 +535,16 @@ ACTOR static Future<Void> monitorClientInfo( Reference<AsyncVar<Optional<Cluster
}
}
Future< Database > DatabaseContext::createDatabase( Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface, Reference<Cluster> cluster, LocalityData const& clientLocality ) {
Reference<AsyncVar<ClientDBInfo>> info( new AsyncVar<ClientDBInfo> );
Future<Void> monitor = monitorClientInfo( clusterInterface, cluster ? cluster->getConnectionFile() : Reference<ClusterConnectionFile>(), info );
Database DatabaseContext::create(Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface, Reference<ClusterConnectionFile> connFile, LocalityData const& clientLocality) {
Reference<Cluster> cluster(new Cluster(connFile, clusterInterface));
Reference<AsyncVar<ClientDBInfo>> clientInfo(new AsyncVar<ClientDBInfo>());
Future<Void> clientInfoMonitor = monitorClientInfo(clusterInterface, connFile, clientInfo);
return std::move( Database( new DatabaseContext( info, cluster, monitor, LiteralStringRef(""), TaskDefaultEndpoint, clientLocality, true, false ) ) );
return Database(new DatabaseContext(cluster, clientInfo, clientInfoMonitor, LiteralStringRef(""), TaskDefaultEndpoint, clientLocality, true, false));
}
Database DatabaseContext::create( Reference<AsyncVar<ClientDBInfo>> info, Future<Void> dependency, LocalityData clientLocality, bool enableLocalityLoadBalance, int taskID, bool lockAware ) {
return Database( new DatabaseContext( info, Reference<Cluster>(), dependency, LiteralStringRef(""), taskID, clientLocality, enableLocalityLoadBalance, lockAware ) );
Database DatabaseContext::create(Reference<AsyncVar<ClientDBInfo>> clientInfo, Future<Void> clientInfoMonitor, LocalityData clientLocality, bool enableLocalityLoadBalance, int taskID, bool lockAware, int apiVersion) {
return Database( new DatabaseContext( Reference<Cluster>(nullptr), clientInfo, clientInfoMonitor, LiteralStringRef(""), taskID, clientLocality, enableLocalityLoadBalance, lockAware, apiVersion ) );
}
DatabaseContext::~DatabaseContext() {
@ -683,65 +686,91 @@ void DatabaseContext::removeWatch() {
ASSERT(outstandingWatches >= 0);
}
Future<Void> DatabaseContext::onConnected() {
ASSERT(cluster);
return cluster->onConnected();
}
Reference<ClusterConnectionFile> DatabaseContext::getConnectionFile() {
ASSERT(cluster);
return cluster->getConnectionFile();
}
Database Database::createDatabase( Reference<ClusterConnectionFile> connFile, int apiVersion, LocalityData const& clientLocality ) {
Reference<Cluster> cluster(new Cluster(connFile, apiVersion));
Reference<AsyncVar<ClientDBInfo>> clientInfo(new AsyncVar<ClientDBInfo>());
Future<Void> clientInfoMonitor = monitorClientInfo(cluster->getClusterInterface(), connFile, clientInfo);
return Database( new DatabaseContext( cluster, clientInfo, clientInfoMonitor, LiteralStringRef(""), TaskDefaultEndpoint, clientLocality, true, false, apiVersion ) );
}
Database Database::createDatabase( std::string connFileName, int apiVersion, LocalityData const& clientLocality ) {
Reference<ClusterConnectionFile> rccf = Reference<ClusterConnectionFile>(new ClusterConnectionFile(ClusterConnectionFile::lookupClusterFileName(connFileName).first));
return Database::createDatabase(rccf, apiVersion, clientLocality);
}
extern uint32_t determinePublicIPAutomatically( ClusterConnectionString const& ccs );
Cluster::Cluster( Reference<ClusterConnectionFile> connFile, int apiVersion )
: clusterInterface( new AsyncVar<Optional<ClusterInterface>> ), apiVersion(apiVersion), connectionFile( connFile )
Cluster::Cluster( Reference<ClusterConnectionFile> connFile, int apiVersion )
: clusterInterface(new AsyncVar<Optional<ClusterInterface>>())
{
init(connFile, true, apiVersion);
}
Cluster::Cluster( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface)
: clusterInterface(clusterInterface)
{
init(connFile, true);
}
void Cluster::init( Reference<ClusterConnectionFile> connFile, bool startClientInfoMonitor, int apiVersion ) {
connectionFile = connFile;
connected = clusterInterface->onChange();
if(!g_network)
throw network_not_setup();
if(networkOptions.traceDirectory.present() && !traceFileIsOpen()) {
g_network->initMetrics();
FlowTransport::transport().initMetrics();
initTraceEventMetrics();
if(connFile) {
if(networkOptions.traceDirectory.present() && !traceFileIsOpen()) {
g_network->initMetrics();
FlowTransport::transport().initMetrics();
initTraceEventMetrics();
auto publicIP = determinePublicIPAutomatically( connFile->getConnectionString() );
openTraceFile(NetworkAddress(publicIP, ::getpid()), networkOptions.traceRollSize, networkOptions.traceMaxLogsSize, networkOptions.traceDirectory.get(), "trace", networkOptions.traceLogGroup);
auto publicIP = determinePublicIPAutomatically( connFile->getConnectionString() );
openTraceFile(NetworkAddress(publicIP, ::getpid()), networkOptions.traceRollSize, networkOptions.traceMaxLogsSize, networkOptions.traceDirectory.get(), "trace", networkOptions.traceLogGroup);
TraceEvent("ClientStart")
.detail("SourceVersion", getHGVersion())
.detail("Version", FDB_VT_VERSION)
.detail("PackageName", FDB_VT_PACKAGE_NAME)
.detail("ClusterFile", connFile->getFilename().c_str())
.detail("ConnectionString", connFile->getConnectionString().toString())
.detailf("ActualTime", "%lld", DEBUG_DETERMINISM ? 0 : time(NULL))
.detail("ApiVersion", apiVersion)
.detailf("ImageOffset", "%p", platform::getImageOffset())
.trackLatest("ClientStart");
TraceEvent("ClientStart")
.detail("SourceVersion", getHGVersion())
.detail("Version", FDB_VT_VERSION)
.detail("PackageName", FDB_VT_PACKAGE_NAME)
.detail("ClusterFile", connFile->getFilename().c_str())
.detail("ConnectionString", connFile->getConnectionString().toString())
.detailf("ActualTime", "%lld", DEBUG_DETERMINISM ? 0 : time(NULL))
.detail("ApiVersion", apiVersion)
.detailf("ImageOffset", "%p", platform::getImageOffset())
.trackLatest("ClientStart");
initializeSystemMonitorMachineState(SystemMonitorMachineState(publicIP));
initializeSystemMonitorMachineState(SystemMonitorMachineState(publicIP));
systemMonitor();
uncancellable( recurring( &systemMonitor, CLIENT_KNOBS->SYSTEM_MONITOR_INTERVAL, TaskFlushTrace ) );
systemMonitor();
uncancellable( recurring( &systemMonitor, CLIENT_KNOBS->SYSTEM_MONITOR_INTERVAL, TaskFlushTrace ) );
}
leaderMon = monitorLeader( connFile, clusterInterface );
failMon = failureMonitorClient( clusterInterface, false );
}
leaderMon = monitorLeader( connectionFile, clusterInterface );
failMon = failureMonitorClient( clusterInterface, false );
connected = clusterInterface->onChange(); // SOMEDAY: This is a hack
}
Cluster::~Cluster() {}
Reference<Cluster> Cluster::createCluster( Reference<ClusterConnectionFile> connFile, int apiVersion ) {
return Reference<Cluster>( new Cluster( connFile, apiVersion ) );
}
Reference<Cluster> Cluster::createCluster(std::string connFileName, int apiVersion) {
Reference<ClusterConnectionFile> rccf = Reference<ClusterConnectionFile>(new ClusterConnectionFile(ClusterConnectionFile::lookupClusterFileName(connFileName).first));
return Reference<Cluster>(new Cluster( rccf, apiVersion));
}
Future<Database> Cluster::createDatabase( LocalityData locality ) {
return DatabaseContext::createDatabase( clusterInterface, Reference<Cluster>::addRef( this ), locality );
Reference<AsyncVar<Optional<struct ClusterInterface>>> Cluster::getClusterInterface() {
return clusterInterface;
}
Future<Void> Cluster::onConnected() {
return connected;
}
void Cluster::setOption(FDBClusterOptions::Option option, Optional<StringRef> value) { }
void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value) {
switch(option) {
// SOMEDAY: If the network is already started, should these three throw an error?
@ -2211,14 +2240,9 @@ void Transaction::fullReset() {
}
int Transaction::apiVersionAtLeast(int minVersion) const {
if(cx->cluster)
return cx->cluster->apiVersionAtLeast(minVersion);
return true;
return cx->apiVersionAtLeast(minVersion);
}
class MutationBlock {
public:
bool mutated;

View File

@ -57,9 +57,13 @@ struct NetworkOptions {
{ }
};
class Database {
public:
enum { API_VERSION_LATEST = -1 };
static Database createDatabase( Reference<ClusterConnectionFile> connFile, int apiVersion, LocalityData const& clientLocality=LocalityData() );
static Database createDatabase( std::string connFileName, int apiVersion, LocalityData const& clientLocality=LocalityData() );
Database() {} // an uninitialized database can be destructed or reassigned safely; that's it
void operator= ( Database const& rhs ) { db = rhs.db; }
Database( Database const& rhs ) : db(rhs.db) {}
@ -70,6 +74,7 @@ public:
explicit Database(Reference<DatabaseContext> cx) : db(cx) {}
explicit Database( DatabaseContext* cx ) : db(cx) {}
inline DatabaseContext* getPtr() const { return db.getPtr(); }
inline DatabaseContext* extractPtr() { return db.extractPtr(); }
DatabaseContext* operator->() const { return db.getPtr(); }
private:
@ -103,40 +108,25 @@ void stopNetwork();
*/
class Cluster : public ReferenceCounted<Cluster>, NonCopyable {
public:
enum { API_VERSION_LATEST = -1 };
// Constructs a Cluster. This uses the global networking enging configured in setupNetwork()
// apiVersion may be set to API_VERSION_LATEST
static Reference<Cluster> createCluster( Reference<ClusterConnectionFile> connFile, int apiVersion );
static Reference<Cluster> createCluster(std::string connFileName, int apiVersion);
// See DatabaseContext::createDatabase
Future<Database> createDatabase( LocalityData locality = LocalityData() );
void setOption(FDBClusterOptions::Option option, Optional<StringRef> value);
Reference<ClusterConnectionFile> getConnectionFile() { return connectionFile; }
Cluster(Reference<ClusterConnectionFile> connFile, int apiVersion=Database::API_VERSION_LATEST);
Cluster(Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<Optional<struct ClusterInterface>>> clusterInterface);
~Cluster();
int apiVersionAtLeast(int minVersion) { return apiVersion < 0 || apiVersion >= minVersion; }
Reference<AsyncVar<Optional<struct ClusterInterface>>> getClusterInterface();
Reference<ClusterConnectionFile> getConnectionFile() { return connectionFile; }
Future<Void> onConnected(); // Returns after a majority of coordination servers are available and have reported a leader. The cluster file therefore is valid, but the database might be unavailable.
Future<Void> onConnected();
Error deferred_error;
void checkDeferredError() { if (deferred_error.code() != invalid_error_code) throw deferred_error; }
private: friend class ThreadSafeCluster;
friend class AtomicOpsApiCorrectnessWorkload; // This is just for testing purposes. It needs to change apiVersion
friend class AtomicOpsWorkload; // This is just for testing purposes. It needs to change apiVersion
friend class VersionStampWorkload; // This is just for testing purposes. It needs to change apiVersion
Cluster( Reference<ClusterConnectionFile> connFile, int apiVersion = API_VERSION_LATEST );
private:
void init(Reference<ClusterConnectionFile> connFile, bool startClientInfoMonitor, int apiVersion=Database::API_VERSION_LATEST);
Reference<AsyncVar<Optional<struct ClusterInterface>>> clusterInterface;
Future<Void> leaderMon, failMon, connected;
Reference<ClusterConnectionFile> connectionFile;
int apiVersion;
Future<Void> leaderMon;
Future<Void> failMon;
Future<Void> connected;
};
struct StorageMetrics;

View File

@ -1159,8 +1159,8 @@ Future< Optional<Value> > ReadYourWritesTransaction::get( const Key& key, bool s
TEST(true);
if (key == LiteralStringRef("\xff\xff/status/json")){
if (tr.getDatabase().getPtr() && tr.getDatabase()->cluster && tr.getDatabase()->cluster->getConnectionFile()) {
return getJSON(tr.getDatabase()->cluster->getConnectionFile());
if (tr.getDatabase().getPtr() && tr.getDatabase()->getConnectionFile()) {
return getJSON(tr.getDatabase()->getConnectionFile());
}
else {
return Optional<Value>();
@ -1169,8 +1169,8 @@ Future< Optional<Value> > ReadYourWritesTransaction::get( const Key& key, bool s
if (key == LiteralStringRef("\xff\xff/cluster_file_path")) {
try {
if (tr.getDatabase().getPtr() && tr.getDatabase()->cluster && tr.getDatabase()->cluster->getConnectionFile()) {
Optional<Value> output = StringRef(tr.getDatabase()->cluster->getConnectionFile()->getFilename());
if (tr.getDatabase().getPtr() && tr.getDatabase()->getConnectionFile()) {
Optional<Value> output = StringRef(tr.getDatabase()->getConnectionFile()->getFilename());
return output;
}
}
@ -1182,8 +1182,8 @@ Future< Optional<Value> > ReadYourWritesTransaction::get( const Key& key, bool s
if (key == LiteralStringRef("\xff\xff/connection_string")){
try {
if (tr.getDatabase().getPtr() && tr.getDatabase()->cluster && tr.getDatabase()->cluster->getConnectionFile()) {
Reference<ClusterConnectionFile> f = tr.getDatabase()->cluster->getConnectionFile();
if (tr.getDatabase().getPtr() && tr.getDatabase()->getConnectionFile()) {
Reference<ClusterConnectionFile> f = tr.getDatabase()->getConnectionFile();
Optional<Value> output = StringRef(f->getConnectionString().toString());
return output;
}
@ -1237,8 +1237,8 @@ Future< Standalone<RangeResultRef> > ReadYourWritesTransaction::getRange(
bool reverse )
{
if (begin.getKey() == LiteralStringRef("\xff\xff/worker_interfaces")){
if (tr.getDatabase().getPtr() && tr.getDatabase()->cluster && tr.getDatabase()->cluster->getConnectionFile()) {
return getWorkerInterfaces(tr.getDatabase()->cluster->getConnectionFile());
if (tr.getDatabase().getPtr() && tr.getDatabase()->getConnectionFile()) {
return getWorkerInterfaces(tr.getDatabase()->getConnectionFile());
}
else {
return Standalone<RangeResultRef>();
@ -1789,7 +1789,7 @@ void ReadYourWritesTransaction::operator=(ReadYourWritesTransaction&& r) noexcep
reading = std::move( r.reading );
resetPromise = std::move( r.resetPromise );
r.resetPromise = Promise<Void>();
deferred_error = std::move( r.deferred_error );
deferredError = std::move( r.deferredError );
retries = r.retries;
timeoutActor = r.timeoutActor;
creationTime = r.creationTime;
@ -1807,7 +1807,7 @@ ReadYourWritesTransaction::ReadYourWritesTransaction(ReadYourWritesTransaction&&
reading( std::move(r.reading) ),
retries( r.retries ),
creationTime( r.creationTime ),
deferred_error( std::move(r.deferred_error) ),
deferredError( std::move(r.deferredError) ),
timeoutActor( std::move(r.timeoutActor) ),
resetPromise( std::move(r.resetPromise) ),
commitStarted( r.commitStarted ),
@ -1839,7 +1839,7 @@ void ReadYourWritesTransaction::resetRyow() {
reading = AndFuture();
commitStarted = false;
deferred_error = Error();
deferredError = Error();
if(tr.apiVersionAtLeast(16)) {
options.reset(tr);

View File

@ -124,9 +124,9 @@ public:
Future<Void> debug_onIdle() { return reading; }
// Used by ThreadSafeTransaction for exceptions thrown in void methods
Error deferred_error;
Error deferredError;
void checkDeferredError() { tr.checkDeferredError(); if (deferred_error.code() != invalid_error_code) throw deferred_error; }
void checkDeferredError() { tr.checkDeferredError(); if (deferredError.code() != invalid_error_code) throw deferredError; }
void getWriteConflicts( KeyRangeMap<bool> *result );

View File

@ -30,59 +30,29 @@
// Users of ThreadSafeTransaction might share Reference<ThreadSafe...> between different threads as long as they don't call addRef (e.g. C API follows this).
// Therefore, it is unsafe to call (explicitly or implicitly) this->addRef in any of these functions.
Reference<ICluster> constructThreadSafeCluster( Cluster* cluster ) {
return Reference<ICluster>( new ThreadSafeCluster(cluster) );
Reference<IDatabase> constructThreadSafeDatabase( Database db ) {
return Reference<IDatabase>( new ThreadSafeDatabase(db.extractPtr()) );
}
Future<Reference<ICluster>> createThreadSafeCluster( std::string connFilename, int apiVersion ) {
Reference<Cluster> cluster = Cluster::createCluster( connFilename, apiVersion );
return constructThreadSafeCluster( cluster.extractPtr() );
Future<Reference<IDatabase>> createThreadSafeDatabase( std::string connFilename, int apiVersion ) {
Database db = Database::createDatabase( connFilename, apiVersion );
return constructThreadSafeDatabase( db );
}
ThreadFuture<Reference<ICluster>> ThreadSafeCluster::create( std::string connFilename, int apiVersion ) {
if (!g_network) return ThreadFuture<Reference<ICluster>>(network_not_setup());
return onMainThread( [connFilename, apiVersion](){ return createThreadSafeCluster( connFilename, apiVersion ); } );
ThreadFuture<Reference<IDatabase>> ThreadSafeDatabase::create( std::string connFilename, int apiVersion ) {
if (!g_network) return ThreadFuture<Reference<IDatabase>>(network_not_setup());
return onMainThread( [connFilename, apiVersion](){ return createThreadSafeDatabase( connFilename, apiVersion ); } );
}
ThreadFuture<Void> ThreadSafeCluster::onConnected() {
Cluster* cluster = this->cluster;
return onMainThread( [cluster]() -> Future<Void> {
cluster->checkDeferredError();
return cluster->onConnected();
} );
}
void ThreadSafeCluster::setOption( FDBClusterOptions::Option option, Optional<StringRef> value ) {
Cluster* cluster = this->cluster;
Standalone<Optional<StringRef>> passValue = value;
onMainThreadVoid( [cluster, option, passValue](){ cluster->setOption(option, passValue.contents()); }, &cluster->deferred_error );
}
ThreadSafeCluster::~ThreadSafeCluster() {
Cluster* cluster = this->cluster;
onMainThreadVoid( [cluster](){ cluster->delref(); }, NULL );
}
Future<Reference<IDatabase>> threadSafeCreateDatabase( Database db ) {
db.getPtr()->addref();
return Reference<IDatabase>(new ThreadSafeDatabase(db.getPtr()));
}
ACTOR Future<Reference<IDatabase>> threadSafeCreateDatabase( Cluster* cluster ) {
Database db = wait( cluster->createDatabase() );
Reference<IDatabase> threadSafeDb = wait(threadSafeCreateDatabase(db));
return threadSafeDb;
}
ThreadFuture<Reference<IDatabase>> ThreadSafeCluster::createDatabase() {
Cluster* cluster = this->cluster;
return onMainThread( [cluster](){
cluster->checkDeferredError();
return threadSafeCreateDatabase(cluster);
ThreadFuture<Void> ThreadSafeDatabase::onConnected() {
DatabaseContext* db = this->db;
return onMainThread( [db]() -> Future<Void> {
db->checkDeferredError();
return db->onConnected();
} );
}
ThreadFuture<Reference<IDatabase>> ThreadSafeDatabase::createFromExistingDatabase(Database db) {
return onMainThread( [db](){
db->checkDeferredError();
return threadSafeCreateDatabase(db);
return Future<Reference<IDatabase>>(constructThreadSafeDatabase(db));
});
}
@ -98,7 +68,7 @@ Database ThreadSafeDatabase::unsafeGetDatabase() const {
void ThreadSafeDatabase::setOption( FDBDatabaseOptions::Option option, Optional<StringRef> value) {
DatabaseContext *db = this->db;
Standalone<Optional<StringRef>> passValue = value;
onMainThreadVoid( [db, option, passValue](){ db->setOption(option, passValue.contents()); }, &db->deferred_error );
onMainThreadVoid( [db, option, passValue](){ db->setOption(option, passValue.contents()); }, &db->deferredError );
}
ThreadSafeDatabase::~ThreadSafeDatabase() {
@ -133,7 +103,7 @@ void ThreadSafeTransaction::cancel() {
void ThreadSafeTransaction::setVersion( Version v ) {
ReadYourWritesTransaction *tr = this->tr;
onMainThreadVoid( [tr, v](){ tr->setVersion(v); }, &tr->deferred_error );
onMainThreadVoid( [tr, v](){ tr->setVersion(v); }, &tr->deferredError );
}
ThreadFuture<Version> ThreadSafeTransaction::getReadVersion() {
@ -200,12 +170,12 @@ void ThreadSafeTransaction::addReadConflictRange( const KeyRangeRef& keys) {
KeyRange r = keys;
ReadYourWritesTransaction *tr = this->tr;
onMainThreadVoid( [tr, r](){ tr->addReadConflictRange(r); }, &tr->deferred_error );
onMainThreadVoid( [tr, r](){ tr->addReadConflictRange(r); }, &tr->deferredError );
}
void ThreadSafeTransaction::makeSelfConflicting() {
ReadYourWritesTransaction *tr = this->tr;
onMainThreadVoid( [tr](){ tr->makeSelfConflicting(); }, &tr->deferred_error );
onMainThreadVoid( [tr](){ tr->makeSelfConflicting(); }, &tr->deferredError );
}
void ThreadSafeTransaction::atomicOp( const KeyRef& key, const ValueRef& value, uint32_t operationType ) {
@ -213,7 +183,7 @@ void ThreadSafeTransaction::atomicOp( const KeyRef& key, const ValueRef& value,
Value v = value;
ReadYourWritesTransaction *tr = this->tr;
onMainThreadVoid( [tr, k, v, operationType](){ tr->atomicOp(k, v, operationType); }, &tr->deferred_error );
onMainThreadVoid( [tr, k, v, operationType](){ tr->atomicOp(k, v, operationType); }, &tr->deferredError );
}
void ThreadSafeTransaction::set( const KeyRef& key, const ValueRef& value ) {
@ -221,14 +191,14 @@ void ThreadSafeTransaction::set( const KeyRef& key, const ValueRef& value ) {
Value v = value;
ReadYourWritesTransaction *tr = this->tr;
onMainThreadVoid( [tr, k, v](){ tr->set(k, v); }, &tr->deferred_error );
onMainThreadVoid( [tr, k, v](){ tr->set(k, v); }, &tr->deferredError );
}
void ThreadSafeTransaction::clear( const KeyRangeRef& range ) {
KeyRange r = range;
ReadYourWritesTransaction *tr = this->tr;
onMainThreadVoid( [tr, r](){ tr->clear(r); }, &tr->deferred_error );
onMainThreadVoid( [tr, r](){ tr->clear(r); }, &tr->deferredError );
}
void ThreadSafeTransaction::clear( const KeyRef& begin, const KeyRef& end ) {
@ -241,14 +211,14 @@ void ThreadSafeTransaction::clear( const KeyRef& begin, const KeyRef& end ) {
throw inverted_range();
tr->clear(KeyRangeRef(b, e));
}, &tr->deferred_error );
}, &tr->deferredError );
}
void ThreadSafeTransaction::clear( const KeyRef& key ) {
Key k = key;
ReadYourWritesTransaction *tr = this->tr;
onMainThreadVoid( [tr, k](){ tr->clear(k); }, &tr->deferred_error );
onMainThreadVoid( [tr, k](){ tr->clear(k); }, &tr->deferredError );
}
ThreadFuture< Void > ThreadSafeTransaction::watch( const KeyRef& key ) {
@ -265,7 +235,7 @@ void ThreadSafeTransaction::addWriteConflictRange( const KeyRangeRef& keys) {
KeyRange r = keys;
ReadYourWritesTransaction *tr = this->tr;
onMainThreadVoid( [tr, r](){ tr->addWriteConflictRange(r); }, &tr->deferred_error );
onMainThreadVoid( [tr, r](){ tr->addWriteConflictRange(r); }, &tr->deferredError );
}
ThreadFuture< Void > ThreadSafeTransaction::commit() {
@ -292,7 +262,7 @@ ThreadFuture<Standalone<StringRef>> ThreadSafeTransaction::getVersionstamp() {
void ThreadSafeTransaction::setOption( FDBTransactionOptions::Option option, Optional<StringRef> value ) {
ReadYourWritesTransaction *tr = this->tr;
Standalone<Optional<StringRef>> passValue = value;
onMainThreadVoid( [tr, option, passValue](){ tr->setOption(option, passValue.contents()); }, &tr->deferred_error );
onMainThreadVoid( [tr, option, passValue](){ tr->setOption(option, passValue.contents()); }, &tr->deferredError );
}
ThreadFuture<Void> ThreadSafeTransaction::checkDeferredError() {
@ -301,7 +271,7 @@ ThreadFuture<Void> ThreadSafeTransaction::checkDeferredError() {
try {
tr->checkDeferredError();
} catch (Error &e) {
tr->deferred_error = Error();
tr->deferredError = Error();
return Future<Void>(e);
}
return Future<Void>(Void());
@ -387,8 +357,8 @@ void ThreadSafeApi::stopNetwork() {
::stopNetwork();
}
ThreadFuture<Reference<ICluster>> ThreadSafeApi::createCluster(const char *clusterFilePath) {
return ThreadSafeCluster::create(clusterFilePath, apiVersion);
ThreadFuture<Reference<IDatabase>> ThreadSafeApi::createDatabase(const char *clusterFilePath) {
return ThreadSafeDatabase::create(clusterFilePath, apiVersion);
}
void ThreadSafeApi::addNetworkThreadCompletionHook(void (*hook)(void*), void *hookParameter) {

View File

@ -27,41 +27,22 @@
#include "fdbclient/ClusterInterface.h"
#include "fdbclient/IClientApi.h"
class ThreadSafeDatabase;
class ThreadSafeCluster : public ICluster, public ThreadSafeReferenceCounted<ThreadSafeCluster>, private NonCopyable {
public:
static ThreadFuture<Reference<ICluster>> create( std::string connFilename, int apiVersion = -1 );
~ThreadSafeCluster();
ThreadFuture<Reference<IDatabase>> createDatabase();
void setOption( FDBClusterOptions::Option option, Optional<StringRef> value = Optional<StringRef>() );
ThreadFuture<Void> onConnected(); // Returns after a majority of coordination servers are available and have reported a leader. The cluster file therefore is valid, but the database might be unavailable.
void addref() { ThreadSafeReferenceCounted<ThreadSafeCluster>::addref(); }
void delref() { ThreadSafeReferenceCounted<ThreadSafeCluster>::delref(); }
private:
ThreadSafeCluster( Cluster* cluster ) : cluster(cluster) { }
Cluster* cluster;
friend Reference<ICluster> constructThreadSafeCluster( Cluster* cluster );
};
class ThreadSafeDatabase : public IDatabase, public ThreadSafeReferenceCounted<ThreadSafeDatabase> {
public:
~ThreadSafeDatabase();
static ThreadFuture<Reference<IDatabase>> create( std::string connFilename, int apiVersion=-1 );
static ThreadFuture<Reference<IDatabase>> createFromExistingDatabase(Database cx);
Reference<ITransaction> createTransaction();
void setOption( FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>() );
ThreadFuture<Void> onConnected(); // Returns after a majority of coordination servers are available and have reported a leader. The cluster file therefore is valid, but the database might be unavailable.
void addref() { ThreadSafeReferenceCounted<ThreadSafeDatabase>::addref(); }
void delref() { ThreadSafeReferenceCounted<ThreadSafeDatabase>::delref(); }
private:
friend class ThreadSafeCluster;
friend class ThreadSafeTransaction;
DatabaseContext* db;
public: // Internal use only
@ -138,7 +119,7 @@ public:
void runNetwork();
void stopNetwork();
ThreadFuture<Reference<ICluster>> createCluster(const char *clusterFilePath);
ThreadFuture<Reference<IDatabase>> createDatabase(const char *clusterFilePath);
void addNetworkThreadCompletionHook(void (*hook)(void*), void *hookParameter);

View File

@ -131,8 +131,7 @@ ACTOR Future<Void> runBackup( Reference<ClusterConnectionFile> connFile ) {
}
if (g_simulator.backupAgents == ISimulator::BackupToFile) {
Reference<Cluster> cluster = Cluster::createCluster(connFile, -1);
Database cx = cluster->createDatabase().get();
Database cx = Database::createDatabase(connFile, -1);
state FileBackupAgent fileAgent;
state double backupPollDelay = 1.0 / CLIENT_KNOBS->BACKUP_AGGREGATE_POLL_RATE;
@ -159,12 +158,10 @@ ACTOR Future<Void> runDr( Reference<ClusterConnectionFile> connFile ) {
}
if (g_simulator.drAgents == ISimulator::BackupToDB) {
Reference<Cluster> cluster = Cluster::createCluster(connFile, -1);
Database cx = cluster->createDatabase().get();
Database cx = Database::createDatabase(connFile, -1);
Reference<ClusterConnectionFile> extraFile(new ClusterConnectionFile(*g_simulator.extraDB));
Reference<Cluster> extraCluster = Cluster::createCluster(extraFile, -1);
state Database extraDB = extraCluster->createDatabase().get();
state Database extraDB = Database::createDatabase(extraFile, -1);
TraceEvent("StartingDrAgents").detail("ConnFile", connFile->getConnectionString().toString()).detail("ExtraString", extraFile->getConnectionString().toString());

View File

@ -497,10 +497,7 @@ ACTOR Future<Void> testerServerWorkload( WorkloadRequest work, Reference<Cluster
startRole(Role::TESTER, workIface.id(), UID(), details);
if( work.useDatabase ) {
Reference<Cluster> cluster = Cluster::createCluster(ccf->getFilename(), -1);
Database _cx = wait(cluster->createDatabase(locality));
cx = _cx;
cx = Database::createDatabase(ccf, -1, locality);
wait( delay(1.0) );
}
@ -1030,8 +1027,7 @@ ACTOR Future<Void> runTests( Reference<AsyncVar<Optional<struct ClusterControlle
databasePingDelay = 0.0;
if (useDB) {
Database _cx = wait( DatabaseContext::createDatabase( ci, Reference<Cluster>(), locality ) );
cx = _cx;
cx = DatabaseContext::create(ci, Reference<ClusterConnectionFile>(), locality);
}
state Future<Void> disabler = disableConnectionFailuresAfter(450, "Tester");

View File

@ -515,8 +515,8 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
if(metricsPrefix.size() > 0) {
if( metricsConnFile.size() > 0) {
try {
state Reference<Cluster> cluster = Cluster::createCluster( metricsConnFile, Cluster::API_VERSION_LATEST );
metricsLogger = runMetrics( cluster->createDatabase(locality), KeyRef(metricsPrefix) );
state Database db = Database::createDatabase(metricsConnFile, Database::API_VERSION_LATEST, locality);
metricsLogger = runMetrics( db, KeyRef(metricsPrefix) );
} catch(Error &e) {
TraceEvent(SevWarnAlways, "TDMetricsBadClusterFile").error(e).detail("ConnFile", metricsConnFile);
}

View File

@ -284,6 +284,7 @@ ACTOR Future<Void> chooseTransactionFactory(Database cx, std::vector<Transaction
else if(transactionType == MULTI_VERSION)
{
printf("client %d: Running Multi-Version Transactions\n", self->clientPrefixInt);
MultiVersionApi::api->selectApiVersion(cx->apiVersion);
Reference<IDatabase> threadSafeHandle = wait(unsafeThreadFutureToFuture(ThreadSafeDatabase::createFromExistingDatabase(cx)));
Reference<IDatabase> dbHandle = MultiVersionDatabase::debugCreateFromExistingDatabase(threadSafeHandle);
self->transactionFactory = Reference<TransactionFactoryInterface>(new TransactionFactory<ThreadTransactionWrapper, Reference<IDatabase>>(dbHandle, dbHandle, false));

View File

@ -281,8 +281,7 @@ struct ApiWorkload : TestWorkload {
useExtraDB = g_simulator.extraDB != NULL;
if(useExtraDB) {
Reference<ClusterConnectionFile> extraFile(new ClusterConnectionFile(*g_simulator.extraDB));
Reference<Cluster> extraCluster = Cluster::createCluster(extraFile, -1);
extraDB = extraCluster->createDatabase().get();
extraDB = Database::createDatabase(extraFile, -1);
}
}

View File

@ -94,7 +94,7 @@ struct AtomicOpsWorkload : TestWorkload {
virtual Future<Void> setup( Database const& cx ) {
if (apiVersion500)
cx->cluster->apiVersion = 500;
cx->apiVersion = 500;
if(clientId != 0)
return Void();

View File

@ -30,11 +30,11 @@ struct AtomicOpsApiCorrectnessWorkload : TestWorkload {
private:
static int getApiVersion(const Database &cx) {
return cx->cluster->apiVersion;
return cx->apiVersion;
}
static void setApiVersion(Database *cx, int version) {
(*cx)->cluster->apiVersion = version;
(*cx)->apiVersion = version;
}
Key getTestKey(std::string prefix) {

View File

@ -40,8 +40,7 @@ struct AtomicSwitchoverWorkload : TestWorkload {
backupRanges.push_back_deep(backupRanges.arena(), normalKeys);
Reference<ClusterConnectionFile> extraFile(new ClusterConnectionFile(*g_simulator.extraDB));
Reference<Cluster> extraCluster = Cluster::createCluster(extraFile, -1);
extraDB = extraCluster->createDatabase().get();
extraDB = Database::createDatabase(extraFile, -1);
}
virtual std::string description() {

View File

@ -37,8 +37,7 @@ struct BackupToDBAbort : TestWorkload {
backupRanges.push_back_deep(backupRanges.arena(), normalKeys);
Reference<ClusterConnectionFile> extraFile(new ClusterConnectionFile(*g_simulator.extraDB));
Reference<Cluster> extraCluster = Cluster::createCluster(extraFile, -1);
extraDB = extraCluster->createDatabase().get();
extraDB = Database::createDatabase(extraFile, -1);
lockid = UID(0xbeeffeed, 0xdecaf00d);
}

View File

@ -104,8 +104,7 @@ struct BackupToDBCorrectnessWorkload : TestWorkload {
}
Reference<ClusterConnectionFile> extraFile(new ClusterConnectionFile(*g_simulator.extraDB));
Reference<Cluster> extraCluster = Cluster::createCluster(extraFile, -1);
extraDB = extraCluster->createDatabase().get();
extraDB = Database::createDatabase(extraFile, -1);
TraceEvent("BARW_Start").detail("Locked", locked);
}

View File

@ -69,8 +69,7 @@ struct BackupToDBUpgradeWorkload : TestWorkload {
}
Reference<ClusterConnectionFile> extraFile(new ClusterConnectionFile(*g_simulator.extraDB));
Reference<Cluster> extraCluster = Cluster::createCluster(extraFile, -1);
extraDB = extraCluster->createDatabase().get();
extraDB = Database::createDatabase(extraFile, -1);
TraceEvent("DRU_Start");
}

View File

@ -57,8 +57,7 @@ struct ChangeConfigWorkload : TestWorkload {
ACTOR Future<Void> extraDatabaseConfigure(ChangeConfigWorkload *self) {
if (g_network->isSimulated() && g_simulator.extraDB) {
Reference<ClusterConnectionFile> extraFile(new ClusterConnectionFile(*g_simulator.extraDB));
Reference<Cluster> cluster = Cluster::createCluster(extraFile, -1);
state Database extraDB = cluster->createDatabase().get();
state Database extraDB = Database::createDatabase(extraFile, -1);
wait(delay(5*g_random->random01()));
if (self->configMode.size()) {

View File

@ -60,12 +60,8 @@ struct StatusWorkload : TestWorkload {
virtual Future<Void> start(Database const& cx) {
if (clientId != 0)
return Void();
Reference<Cluster> cluster = cx->cluster;
if (!cluster) {
TraceEvent(SevError, "StatusWorkloadStartError").detail("Reason", "NULL cluster");
return Void();
}
return success(timeout(fetcher(cluster->getConnectionFile(), this), testDuration));
return success(timeout(fetcher(cx->getConnectionFile(), this), testDuration));
}
virtual Future<bool> check(Database const& cx) {
return errors.getValue() == 0;

View File

@ -158,6 +158,7 @@ struct ThreadSafetyWorkload : TestWorkload {
self->db = dbRef;
if(g_random->coinflip()) {
MultiVersionApi::api->selectApiVersion(cx->apiVersion);
self->db = MultiVersionDatabase::debugCreateFromExistingDatabase(dbRef);
}

View File

@ -73,10 +73,10 @@ struct VersionStampWorkload : TestWorkload {
apiVersion = 520;
}
else {
apiVersion = Cluster::API_VERSION_LATEST;
apiVersion = Database::API_VERSION_LATEST;
}
TraceEvent("VersionStampApiVersion").detail("ApiVersion", apiVersion);
cx->cluster->apiVersion = apiVersion;
cx->apiVersion = apiVersion;
if (clientId == 0)
return _start(cx, this, 1 / transactionsPerSecond);
return Void();
@ -151,8 +151,7 @@ struct VersionStampWorkload : TestWorkload {
ACTOR Future<bool> _check(Database cx, VersionStampWorkload* self) {
if (self->validateExtraDB) {
Reference<ClusterConnectionFile> extraFile(new ClusterConnectionFile(*g_simulator.extraDB));
Reference<Cluster> extraCluster = Cluster::createCluster(extraFile, -1);
cx = extraCluster->createDatabase().get();
cx = Database::createDatabase(extraFile, -1);
}
state ReadYourWritesTransaction tr(cx);
// We specifically wish to grab the smalles read version that we can get and maintain it, to
@ -242,13 +241,12 @@ struct VersionStampWorkload : TestWorkload {
if (g_simulator.extraDB != NULL) {
Reference<ClusterConnectionFile> extraFile(new ClusterConnectionFile(*g_simulator.extraDB));
Reference<Cluster> extraCluster = Cluster::createCluster(extraFile, -1);
state Database extraDB = extraCluster->createDatabase().get();
state Database extraDB = Database::createDatabase(extraFile, -1);
}
loop{
wait(poisson(&lastTime, delay));
bool oldVSFormat = !cx->cluster->apiVersionAtLeast(520);
bool oldVSFormat = !cx->apiVersionAtLeast(520);
state bool cx_is_primary = true;
state ReadYourWritesTransaction tr(cx);

View File

@ -77,8 +77,7 @@ struct WriteDuringReadWorkload : TestWorkload {
useExtraDB = g_simulator.extraDB != NULL;
if(useExtraDB) {
Reference<ClusterConnectionFile> extraFile(new ClusterConnectionFile(*g_simulator.extraDB));
Reference<Cluster> extraCluster = Cluster::createCluster(extraFile, -1);
extraDB = extraCluster->createDatabase().get();
extraDB = Database::createDatabase(extraFile, -1);
useSystemKeys = false;
}