Merge pull request #712 from ajbeamon/remove-database-name-internal

Eliminate use of database names (phase 1)

In short: database names disappear from the internal interfaces (OpenDatabaseRequest, RegisterMasterRequest, ServerDBInfo, DatabaseContext, the workload/tester plumbing) and from trace-event keys, while the public C API keeps its signature and now accepts only the name "DB".

commit 4eaff42e4f
@@ -315,10 +315,13 @@ void fdb_cluster_destroy( FDBCluster* c ) {
 extern "C" DLLEXPORT
 FDBFuture* fdb_cluster_create_database( FDBCluster* c, uint8_t const* db_name,
-                                        int db_name_length ) {
-    return (FDBFuture*)
-        ( CLUSTER(c)->createDatabase( StringRef( db_name,
-                                                 db_name_length ) ).extractPtr() );
+                                        int db_name_length )
+{
+    if(strncmp((const char*)db_name, "DB", db_name_length) != 0) {
+        return (FDBFuture*)ThreadFuture<Reference<IDatabase>>(invalid_database_name()).extractPtr();
+    }
+
+    return (FDBFuture*)CLUSTER(c)->createDatabase().extractPtr();
 }

 extern "C" DLLEXPORT

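A note on the hunk above: the C entry point keeps its three-argument shape for ABI compatibility and simply rejects any name other than "DB". A hedged caller sketch, assuming the fdb_c headers of this era (the helper name is hypothetical and the API version number is illustrative):

    #define FDB_API_VERSION 520
    #include <foundationdb/fdb_c.h>

    /* Sketch only: after this commit, any name other than "DB" resolves the
     * returned future with invalid_database_name(). */
    fdb_error_t open_default_database(FDBCluster* cluster, FDBDatabase** out_db) {
        FDBFuture* f = fdb_cluster_create_database(cluster, (const uint8_t*)"DB", 2);
        fdb_error_t err = fdb_future_block_until_ready(f);
        if (!err)
            err = fdb_future_get_database(f, out_db);
        fdb_future_destroy(f);
        return err;
    }
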
@@ -2672,7 +2672,6 @@ int main(int argc, char* argv[]) {
     Reference<Cluster> source_cluster;
     Reference<ClusterConnectionFile> source_ccf;
     Database source_db;
-    const KeyRef databaseKey = LiteralStringRef("DB");
     FileBackupAgent ba;
     Key tag;
     Future<Optional<Void>> f;
@@ -2746,7 +2745,7 @@ int main(int argc, char* argv[]) {
                 .detail("MemoryLimit", memLimit)
                 .trackLatest("ProgramStart");

-            db = cluster->createDatabase(databaseKey, localities).get();
+            db = cluster->createDatabase(localities).get();
             return true;
         };
@@ -2769,7 +2768,7 @@ int main(int argc, char* argv[]) {
                 return FDB_EXIT_ERROR;
             }

-            source_db = source_cluster->createDatabase(databaseKey, localities).get();
+            source_db = source_cluster->createDatabase(localities).get();
         }

         switch (programExe)

@@ -1478,8 +1478,8 @@ ACTOR template <class T> Future<T> makeInterruptable( Future<T> f ) {
     }
 }

-ACTOR Future<Database> openDatabase( Reference<ClusterConnectionFile> ccf, Reference<Cluster> cluster, Standalone<StringRef> name, bool doCheckStatus ) {
-    state Database db = wait( cluster->createDatabase(name) );
+ACTOR Future<Database> openDatabase( Reference<ClusterConnectionFile> ccf, Reference<Cluster> cluster, bool doCheckStatus ) {
+    state Database db = wait( cluster->createDatabase() );
     if (doCheckStatus) {
         wait( makeInterruptable( checkStatus( Void(), ccf )) );
     }
@@ -2220,9 +2220,6 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
     state FdbOptions *options = &globalOptions;

-    state const char *database = "DB";
-    state Standalone<StringRef> openDbName = StringRef(database);
-
     state Reference<ClusterConnectionFile> ccf;

     state std::pair<std::string, bool> resolvedClusterFile = ClusterConnectionFile::lookupClusterFileName( opt.clusterFile );
@@ -2260,9 +2257,9 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
             .trackLatest("ProgramStart");
     }

-    if (connected && database) {
+    if (connected) {
         try {
-            Database _db = wait( openDatabase( ccf, cluster, openDbName, !opt.exec.present() && opt.initialStatusCheck ) );
+            Database _db = wait( openDatabase( ccf, cluster, !opt.exec.present() && opt.initialStatusCheck ) );
             db = _db;
             tr = Reference<ReadYourWritesTransaction>();
             opened = true;
@@ -2271,7 +2268,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
         } catch (Error& e) {
             if(e.code() != error_code_actor_cancelled) {
                 printf("ERROR: %s (%d)\n", e.what(), e.code());
-                printf("Unable to open database `%s'\n", database);
+                printf("Unable to open database\n");
             }
             return 1;
         }

@@ -117,7 +117,7 @@ struct OpenDatabaseRequest {
     // info changes.  Returns immediately if the current client info id is different from
     // knownClientInfoID; otherwise returns when it next changes (or perhaps after a long interval)
     Arena arena;
-    StringRef dbName, issues, traceLogGroup;
+    StringRef issues, traceLogGroup;
     VectorRef<ClientVersionRef> supportedVersions;
     UID knownClientInfoID;
     ReplyPromise< struct ClientDBInfo > reply;
@@ -125,7 +125,7 @@ struct OpenDatabaseRequest {
     template <class Ar>
     void serialize(Ar& ar) {
         ASSERT( ar.protocolVersion() >= 0x0FDB00A400040001LL );
-        ar & dbName & issues & supportedVersions & traceLogGroup & knownClientInfoID & reply & arena;
+        ar & issues & supportedVersions & traceLogGroup & knownClientInfoID & reply & arena;
     }
 };

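Removing dbName from OpenDatabaseRequest::serialize changes the wire encoding of the request, since this serializer style derives the layout purely from the order in which fields are streamed; the protocolVersion assertion is what gates mixed-version traffic. A self-contained illustration of the pattern (not FDB source):

    #include <cstdint>
    #include <vector>

    // Toy writer in the `ar & a & b & c` style: the encoding is just the
    // fields, back to back, in streaming order. Dropping a field shifts every
    // field after it, which is why such changes are protocol-version gated.
    struct ToyWriter {
        std::vector<uint8_t> bytes;
        ToyWriter& operator&(uint64_t v) {
            const uint8_t* p = reinterpret_cast<const uint8_t*>(&v);
            bytes.insert(bytes.end(), p, p + sizeof(v));
            return *this;
        }
    };

    int main() {
        ToyWriter ar;
        uint64_t knownClientInfoID = 7, issuesLen = 0;
        ar & issuesLen & knownClientInfoID; // order alone defines the layout
    }
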
@@ -51,15 +51,14 @@ public:
 class DatabaseContext : public ReferenceCounted<DatabaseContext>, NonCopyable {
 public:
-    static Future<Database> createDatabase( Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface, Reference<Cluster> cluster, Standalone<StringRef> dbName, LocalityData const& clientLocality );
-    //static Future< Void > configureDatabase( ZookeeperInterface const& zk, int configScope, int configMode, Standalone<StringRef> dbName = Standalone<StringRef>() );
+    static Future<Database> createDatabase( Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface, Reference<Cluster> cluster, LocalityData const& clientLocality );

     // For internal (fdbserver) use only: create a database context for a DB with already known client info
     static Database create( Reference<AsyncVar<ClientDBInfo>> info, Future<Void> dependency, LocalityData clientLocality, bool enableLocalityLoadBalance, int taskID = TaskDefaultEndpoint, bool lockAware = false );

     ~DatabaseContext();

-    Database clone() const { return Database(new DatabaseContext( clientInfo, cluster, clientInfoMonitor, dbName, dbId, taskID, clientLocality, enableLocalityLoadBalance, lockAware )); }
+    Database clone() const { return Database(new DatabaseContext( clientInfo, cluster, clientInfoMonitor, dbId, taskID, clientLocality, enableLocalityLoadBalance, lockAware )); }

     pair<KeyRange,Reference<LocationInfo>> getCachedLocation( const KeyRef&, bool isBackward = false );
     bool getCachedLocations( const KeyRangeRef&, vector<std::pair<KeyRange,Reference<LocationInfo>>>&, int limit, bool reverse );
@@ -90,7 +89,7 @@ public:
 //private: friend class ClientInfoMonitorActor;
     explicit DatabaseContext( Reference<AsyncVar<ClientDBInfo>> clientInfo,
         Reference<Cluster> cluster, Future<Void> clientInfoMonitor,
-        Standalone<StringRef> dbName, Standalone<StringRef> dbId, int taskID, LocalityData clientLocality, bool enableLocalityLoadBalance, bool lockAware );
+        Standalone<StringRef> dbId, int taskID, LocalityData clientLocality, bool enableLocalityLoadBalance, bool lockAware );

     // These are reference counted
     Reference<Cluster> cluster;
@@ -127,8 +126,6 @@ public:
     std::map< std::vector<UID>, LocationInfo* > ssid_locationInfo;

-    // for logging/debugging (relic of multi-db support)
-    Standalone<StringRef> dbName;
     Standalone<StringRef> dbId;

     int64_t transactionReadVersions;

@@ -84,7 +84,7 @@ public:
 class ICluster {
 public:
     virtual ~ICluster() {}
-    virtual ThreadFuture<Reference<IDatabase>> createDatabase(Standalone<StringRef> dbName) = 0;
+    virtual ThreadFuture<Reference<IDatabase>> createDatabase() = 0;
     virtual void setOption(FDBClusterOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) = 0;

     virtual void addref() = 0;

@@ -410,7 +410,7 @@ TEST_CASE("fdbserver/metrics/TraceEvents") {
     state Reference<Cluster> metricsCluster = Cluster::createCluster( metricsConnFile, Cluster::API_VERSION_LATEST );
     TDMetricCollection::getTDMetrics()->address = LiteralStringRef("0.0.0.0:0");
-    state Future<Void> metrics = runMetrics(metricsCluster->createDatabase(LiteralStringRef("DB")), KeyRef(metricsPrefix));
+    state Future<Void> metrics = runMetrics(metricsCluster->createDatabase(), KeyRef(metricsPrefix));
     state int64_t x = 0;

     state double w = 0.5;

@@ -223,8 +223,8 @@ void DLDatabase::setOption(FDBDatabaseOptions::Option option, Optional<StringRef
 }

 // DLCluster
-ThreadFuture<Reference<IDatabase>> DLCluster::createDatabase(Standalone<StringRef> dbName) {
-    FdbCApi::FDBFuture *f = api->clusterCreateDatabase(cluster, (uint8_t*)dbName.toString().c_str(), dbName.size());
+ThreadFuture<Reference<IDatabase>> DLCluster::createDatabase() {
+    FdbCApi::FDBFuture *f = api->clusterCreateDatabase(cluster, (uint8_t*)"DB", 2);

     return toThreadFuture<Reference<IDatabase>>(api, f, [](FdbCApi::FDBFuture *f, FdbCApi *api) {
         FdbCApi::FDBDatabase *db;
@@ -566,8 +566,8 @@ void MultiVersionTransaction::reset() {
 }

 // MultiVersionDatabase
-MultiVersionDatabase::MultiVersionDatabase(Reference<MultiVersionCluster> cluster, Standalone<StringRef> dbName, Reference<IDatabase> db, ThreadFuture<Void> changed)
-    : dbState(new DatabaseState(cluster, dbName, db, changed)) {}
+MultiVersionDatabase::MultiVersionDatabase(Reference<MultiVersionCluster> cluster, Reference<IDatabase> db, ThreadFuture<Void> changed)
+    : dbState(new DatabaseState(cluster, db, changed)) {}

 MultiVersionDatabase::~MultiVersionDatabase() {
     dbState->cancelCallbacks();
@@ -575,7 +575,7 @@ MultiVersionDatabase::~MultiVersionDatabase() {
 Reference<IDatabase> MultiVersionDatabase::debugCreateFromExistingDatabase(Reference<IDatabase> db) {
     auto cluster = Reference<ThreadSafeAsyncVar<Reference<ICluster>>>(new ThreadSafeAsyncVar<Reference<ICluster>>(Reference<ICluster>(NULL)));
-    return Reference<IDatabase>(new MultiVersionDatabase(Reference<MultiVersionCluster>::addRef(new MultiVersionCluster()), LiteralStringRef("DB"), db, ThreadFuture<Void>(Never())));
+    return Reference<IDatabase>(new MultiVersionDatabase(Reference<MultiVersionCluster>::addRef(new MultiVersionCluster()), db, ThreadFuture<Void>(Never())));
 }

 Reference<ITransaction> MultiVersionDatabase::createTransaction() {
@@ -592,8 +592,8 @@ void MultiVersionDatabase::setOption(FDBDatabaseOptions::Option option, Optional
     dbState->options.push_back(std::make_pair(option, value.cast_to<Standalone<StringRef>>()));
 }

-MultiVersionDatabase::DatabaseState::DatabaseState(Reference<MultiVersionCluster> cluster, Standalone<StringRef> dbName, Reference<IDatabase> db, ThreadFuture<Void> changed)
-    : cluster(cluster), dbName(dbName), db(db), dbVar(new ThreadSafeAsyncVar<Reference<IDatabase>>(db)), cancelled(false), changed(changed)
+MultiVersionDatabase::DatabaseState::DatabaseState(Reference<MultiVersionCluster> cluster, Reference<IDatabase> db, ThreadFuture<Void> changed)
+    : cluster(cluster), db(db), dbVar(new ThreadSafeAsyncVar<Reference<IDatabase>>(db)), cancelled(false), changed(changed)
 {
     addref();
     int userParam;
@@ -667,7 +667,7 @@ void MultiVersionDatabase::DatabaseState::updateDatabase() {
     if(currentCluster.value) {
         addref();
-        dbFuture = currentCluster.value->createDatabase(dbName);
+        dbFuture = currentCluster.value->createDatabase();
         dbFuture.callOrSetAsCallback(this, userParam, false);
     }
 }
@@ -710,23 +710,23 @@ MultiVersionCluster::~MultiVersionCluster() {
     clusterState->cancelConnections();
 }

-ThreadFuture<Reference<IDatabase>> MultiVersionCluster::createDatabase(Standalone<StringRef> dbName) {
+ThreadFuture<Reference<IDatabase>> MultiVersionCluster::createDatabase() {
     auto cluster = clusterState->clusterVar->get();

     if(cluster.value) {
-        ThreadFuture<Reference<IDatabase>> dbFuture = abortableFuture(cluster.value->createDatabase(dbName), cluster.onChange);
+        ThreadFuture<Reference<IDatabase>> dbFuture = abortableFuture(cluster.value->createDatabase(), cluster.onChange);

-        return mapThreadFuture<Reference<IDatabase>, Reference<IDatabase>>(dbFuture, [this, cluster, dbName](ErrorOr<Reference<IDatabase>> db) {
+        return mapThreadFuture<Reference<IDatabase>, Reference<IDatabase>>(dbFuture, [this, cluster](ErrorOr<Reference<IDatabase>> db) {
             if(db.isError() && db.getError().code() != error_code_cluster_version_changed) {
                 return db;
             }

             Reference<IDatabase> newDb = db.isError() ? Reference<IDatabase>(NULL) : db.get();
-            return ErrorOr<Reference<IDatabase>>(Reference<IDatabase>(new MultiVersionDatabase(Reference<MultiVersionCluster>::addRef(this), dbName, newDb, cluster.onChange)));
+            return ErrorOr<Reference<IDatabase>>(Reference<IDatabase>(new MultiVersionDatabase(Reference<MultiVersionCluster>::addRef(this), newDb, cluster.onChange)));
         });
     }
     else {
-        return Reference<IDatabase>(new MultiVersionDatabase(Reference<MultiVersionCluster>::addRef(this), dbName, Reference<IDatabase>(), cluster.onChange));
+        return Reference<IDatabase>(new MultiVersionDatabase(Reference<MultiVersionCluster>::addRef(this), Reference<IDatabase>(), cluster.onChange));
     }
 }
@@ -756,7 +756,7 @@ void MultiVersionCluster::Connector::connect() {
         }
         else {
             candidateCluster = cluster.get();
-            return ErrorOr<ThreadFuture<Reference<IDatabase>>>(cluster.get()->createDatabase(LiteralStringRef("DB")));
+            return ErrorOr<ThreadFuture<Reference<IDatabase>>>(cluster.get()->createDatabase());
         }
     });

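The DLCluster change above is the compatibility shim for externally loaded client libraries: the loader keeps the legacy three-argument function pointer and always passes the one remaining valid name. A self-contained sketch of that pattern (names are illustrative, not the real FdbCApi struct):

    #include <cstdint>

    // Stand-in for the loaded library's function table: older libfdb_c
    // versions still export a name-taking cluster_create_database, so the
    // newer caller pins the argument to "DB" rather than dropping it.
    struct LegacyCApi {
        void* (*clusterCreateDatabase)(void* cluster, const uint8_t* name, int len);
    };

    void* createDefaultDatabase(LegacyCApi* api, void* cluster) {
        return api->clusterCreateDatabase(cluster, (const uint8_t*)"DB", 2);
    }
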
@@ -178,7 +178,7 @@ public:
     DLCluster(Reference<FdbCApi> api, FdbCApi::FDBCluster *cluster) : api(api), cluster(cluster) {}
     ~DLCluster() { api->clusterDestroy(cluster); }

-    ThreadFuture<Reference<IDatabase>> createDatabase(Standalone<StringRef> dbName);
+    ThreadFuture<Reference<IDatabase>> createDatabase();
     void setOption(FDBClusterOptions::Option option, Optional<StringRef> value = Optional<StringRef>());

     void addref() { ThreadSafeReferenceCounted<DLCluster>::addref(); }
@@ -278,7 +278,7 @@ class MultiVersionCluster;
 class MultiVersionDatabase : public IDatabase, ThreadSafeReferenceCounted<MultiVersionDatabase> {
 public:
-    MultiVersionDatabase(Reference<MultiVersionCluster> cluster, Standalone<StringRef> dbName, Reference<IDatabase> db, ThreadFuture<Void> changed);
+    MultiVersionDatabase(Reference<MultiVersionCluster> cluster, Reference<IDatabase> db, ThreadFuture<Void> changed);
     ~MultiVersionDatabase();

     Reference<ITransaction> createTransaction();
@@ -291,7 +291,7 @@ public:
 private:
     struct DatabaseState : ThreadCallback, ThreadSafeReferenceCounted<DatabaseState> {
-        DatabaseState(Reference<MultiVersionCluster> cluster, Standalone<StringRef> dbName, Reference<IDatabase> db, ThreadFuture<Void> changed);
+        DatabaseState(Reference<MultiVersionCluster> cluster, Reference<IDatabase> db, ThreadFuture<Void> changed);

         void updateDatabase();
         void cancelCallbacks();
@@ -304,7 +304,6 @@ private:
         Reference<IDatabase> db;
         const Reference<ThreadSafeAsyncVar<Reference<IDatabase>>> dbVar;
-        const Standalone<StringRef> dbName;

         ThreadFuture<Reference<IDatabase>> dbFuture;
         ThreadFuture<Void> changed;
@@ -343,7 +342,7 @@ public:
     MultiVersionCluster(MultiVersionApi *api, std::string clusterFilePath, Reference<ICluster> cluster);
     ~MultiVersionCluster();

-    ThreadFuture<Reference<IDatabase>> createDatabase(Standalone<StringRef> dbName);
+    ThreadFuture<Reference<IDatabase>> createDatabase();
     void setOption(FDBClusterOptions::Option option, Optional<StringRef> value = Optional<StringRef>());

     void addref() { ThreadSafeReferenceCounted<MultiVersionCluster>::addref(); }

@@ -474,9 +474,9 @@ ACTOR static Future<Void> monitorMasterProxiesChange(Reference<AsyncVar<ClientDB
 DatabaseContext::DatabaseContext(
     Reference<AsyncVar<ClientDBInfo>> clientInfo,
     Reference<Cluster> cluster, Future<Void> clientInfoMonitor,
-    Standalone<StringRef> dbName, Standalone<StringRef> dbId,
-    int taskID, LocalityData clientLocality, bool enableLocalityLoadBalance, bool lockAware )
-  : clientInfo(clientInfo), masterProxiesChangeTrigger(), cluster(cluster), clientInfoMonitor(clientInfoMonitor), dbName(dbName), dbId(dbId),
+    Standalone<StringRef> dbId, int taskID, LocalityData clientLocality,
+    bool enableLocalityLoadBalance, bool lockAware )
+  : clientInfo(clientInfo), masterProxiesChangeTrigger(), cluster(cluster), clientInfoMonitor(clientInfoMonitor), dbId(dbId),
     transactionReadVersions(0), transactionLogicalReads(0), transactionPhysicalReads(0), transactionCommittedMutations(0), transactionCommittedMutationBytes(0), transactionsCommitStarted(0),
     transactionsCommitCompleted(0), transactionsTooOld(0), transactionsFutureVersions(0), transactionsNotCommitted(0), transactionsMaybeCommitted(0), transactionsResourceConstrained(0), taskID(taskID),
     outstandingWatches(0), maxOutstandingWatches(CLIENT_KNOBS->DEFAULT_MAX_OUTSTANDING_WATCHES), clientLocality(clientLocality), enableLocalityLoadBalance(enableLocalityLoadBalance), lockAware(lockAware),
@@ -494,14 +494,11 @@ DatabaseContext::DatabaseContext(
     clientStatusUpdater.actor = clientStatusUpdateActor(this);
 }

-ACTOR static Future<Void> monitorClientInfo( Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface, Standalone<StringRef> dbName,
-    Reference<ClusterConnectionFile> ccf, Reference<AsyncVar<ClientDBInfo>> outInfo )
-{
+ACTOR static Future<Void> monitorClientInfo( Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface, Reference<ClusterConnectionFile> ccf, Reference<AsyncVar<ClientDBInfo>> outInfo ) {
     try {
         loop {
             OpenDatabaseRequest req;
             req.knownClientInfoID = outInfo->get().id;
-            req.dbName = dbName;
             req.supportedVersions = VectorRef<ClientVersionRef>(req.arena, networkOptions.supportedVersions);
             req.traceLogGroup = StringRef(req.arena, networkOptions.traceLogGroup);
@@ -529,7 +526,6 @@ ACTOR static Future<Void> monitorClientInfo( Reference<AsyncVar<Optional<Cluster
     } catch( Error& e ) {
         TraceEvent(SevError, "MonitorClientInfoError")
             .error(e)
-            .detail("DBName", printable(dbName))
             .detail("ConnectionFile", ccf && ccf->canGetFilename() ? ccf->getFilename() : "")
             .detail("ConnectionString", ccf ? ccf->getConnectionString().toString() : "");
@@ -537,20 +533,15 @@ ACTOR static Future<Void> monitorClientInfo( Reference<AsyncVar<Optional<Cluster
     }
 }

-Future< Database > DatabaseContext::createDatabase( Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface, Reference<Cluster> cluster, Standalone<StringRef> dbName, LocalityData const& clientLocality ) {
-    if (dbName != LiteralStringRef("DB")) {
-        return invalid_database_name(); // we no longer offer multi-database support, so all databases *must* be named this
-    }
-    else {
-        Reference<AsyncVar<ClientDBInfo>> info( new AsyncVar<ClientDBInfo> );
-        Future<Void> monitor = monitorClientInfo( clusterInterface, dbName, cluster ? cluster->getConnectionFile() : Reference<ClusterConnectionFile>(), info );
+Future< Database > DatabaseContext::createDatabase( Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface, Reference<Cluster> cluster, LocalityData const& clientLocality ) {
+    Reference<AsyncVar<ClientDBInfo>> info( new AsyncVar<ClientDBInfo> );
+    Future<Void> monitor = monitorClientInfo( clusterInterface, cluster ? cluster->getConnectionFile() : Reference<ClusterConnectionFile>(), info );

-        return std::move( Database( new DatabaseContext( info, cluster, monitor, dbName, LiteralStringRef(""), TaskDefaultEndpoint, clientLocality, true, false ) ) );
-    }
+    return std::move( Database( new DatabaseContext( info, cluster, monitor, LiteralStringRef(""), TaskDefaultEndpoint, clientLocality, true, false ) ) );
 }

 Database DatabaseContext::create( Reference<AsyncVar<ClientDBInfo>> info, Future<Void> dependency, LocalityData clientLocality, bool enableLocalityLoadBalance, int taskID, bool lockAware ) {
-    return Database( new DatabaseContext( info, Reference<Cluster>(), dependency, LiteralStringRef("DB"), LiteralStringRef(""), taskID, clientLocality, enableLocalityLoadBalance, lockAware ) );
+    return Database( new DatabaseContext( info, Reference<Cluster>(), dependency, LiteralStringRef(""), taskID, clientLocality, enableLocalityLoadBalance, lockAware ) );
 }

 DatabaseContext::~DatabaseContext() {
@@ -741,8 +732,8 @@ Reference<Cluster> Cluster::createCluster(std::string connFileName, int apiVersi
     return Reference<Cluster>(new Cluster( rccf, apiVersion));
 }

-Future<Database> Cluster::createDatabase( Standalone<StringRef> dbName, LocalityData locality ) {
-    return DatabaseContext::createDatabase( clusterInterface, Reference<Cluster>::addRef( this ), dbName, locality );
+Future<Database> Cluster::createDatabase( LocalityData locality ) {
+    return DatabaseContext::createDatabase( clusterInterface, Reference<Cluster>::addRef( this ), locality );
 }

 Future<Void> Cluster::onConnected() {

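At the Flow level the call-site change is mechanical; a hedged sketch of a caller after this commit, assuming the NativeAPI.h above and the ACTOR compiler (the actor name is hypothetical):

    ACTOR Future<Void> useDefaultDatabase(Reference<Cluster> cluster) {
        // Before: Database db = wait(cluster->createDatabase(LiteralStringRef("DB")));
        Database db = wait(cluster->createDatabase()); // the name argument is gone
        (void)db;
        return Void();
    }
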
@@ -110,7 +110,7 @@ public:
     static Reference<Cluster> createCluster(std::string connFileName, int apiVersion);

     // See DatabaseContext::createDatabase
-    Future<Database> createDatabase( Standalone<StringRef> dbName, LocalityData locality = LocalityData() );
+    Future<Database> createDatabase( LocalityData locality = LocalityData() );

     void setOption(FDBClusterOptions::Option option, Optional<StringRef> value);

@@ -65,17 +65,17 @@ Future<Reference<IDatabase>> threadSafeCreateDatabase( Database db ) {
     return Reference<IDatabase>(new ThreadSafeDatabase(db.getPtr()));
 }

-ACTOR Future<Reference<IDatabase>> threadSafeCreateDatabase( Cluster* cluster, Standalone<StringRef> name ) {
-    Database db = wait( cluster->createDatabase(name) );
+ACTOR Future<Reference<IDatabase>> threadSafeCreateDatabase( Cluster* cluster ) {
+    Database db = wait( cluster->createDatabase() );
     Reference<IDatabase> threadSafeDb = wait(threadSafeCreateDatabase(db));
     return threadSafeDb;
 }

-ThreadFuture<Reference<IDatabase>> ThreadSafeCluster::createDatabase( Standalone<StringRef> dbName ) {
+ThreadFuture<Reference<IDatabase>> ThreadSafeCluster::createDatabase() {
     Cluster* cluster = this->cluster;
-    return onMainThread( [cluster, dbName](){
+    return onMainThread( [cluster](){
         cluster->checkDeferredError();
-        return threadSafeCreateDatabase(cluster, dbName);
+        return threadSafeCreateDatabase(cluster);
     } );
 }

@@ -33,7 +33,7 @@ class ThreadSafeCluster : public ICluster, public ThreadSafeReferenceCounted<Thr
 public:
     static ThreadFuture<Reference<ICluster>> create( std::string connFilename, int apiVersion = -1 );
     ~ThreadSafeCluster();
-    ThreadFuture<Reference<IDatabase>> createDatabase( Standalone<StringRef> dbName );
+    ThreadFuture<Reference<IDatabase>> createDatabase();

     void setOption( FDBClusterOptions::Option option, Optional<StringRef> value = Optional<StringRef>() );

@@ -98,7 +98,7 @@ public:
         DBInfo() : masterRegistrationCount(0), recoveryStalled(false), forceRecovery(false),
             clientInfo( new AsyncVar<ClientDBInfo>( ClientDBInfo() ) ),
-            serverInfo( new AsyncVar<ServerDBInfo>( ServerDBInfo( LiteralStringRef("DB") ) ) ),
+            serverInfo( new AsyncVar<ServerDBInfo>( ServerDBInfo() ) ),
             db( DatabaseContext::create( clientInfo, Future<Void>(), LocalityData(), true, TaskDefaultEndpoint, true ) ) // SOMEDAY: Locality!
         {
@@ -1067,7 +1067,7 @@ ACTOR Future<Void> clusterWatchDatabase( ClusterControllerData* cluster, Cluster
         // for status tool
         TraceEvent("RecruitedMasterWorker", cluster->id)
             .detail("Address", newMaster.get().address())
-            .trackLatest("DB/RecruitedMasterWorker");
+            .trackLatest("RecruitedMasterWorker");

         iMaster = newMaster.get();
@@ -1076,7 +1076,7 @@ ACTOR Future<Void> clusterWatchDatabase( ClusterControllerData* cluster, Cluster
         db->forceRecovery = false;
         db->forceMasterFailure = Promise<Void>();

-        auto dbInfo = ServerDBInfo( LiteralStringRef("DB") );
+        auto dbInfo = ServerDBInfo();
         dbInfo.master = iMaster;
         dbInfo.id = g_random->randomUniqueID();
         dbInfo.masterLifetime = db->serverInfo->get().masterLifetime;
@@ -1158,7 +1158,6 @@ ACTOR Future<Void> clusterGetServerInfo(
 ACTOR Future<Void> clusterOpenDatabase(
     ClusterControllerData::DBInfo* db,
-    Standalone<StringRef> dbName,
     UID knownClientInfoID,
     std::string issues,
     Standalone<VectorRef<ClientVersionRef>> supportedVersions,
@@ -1542,13 +1541,13 @@ ACTOR Future<Void> clusterRecruitRemoteFromConfiguration( ClusterControllerData*
 void clusterRegisterMaster( ClusterControllerData* self, RegisterMasterRequest const& req ) {
     req.reply.send( Void() );

-    TraceEvent("MasterRegistrationReceived", self->id).detail("DbName", printable(req.dbName)).detail("MasterId", req.id).detail("Master", req.mi.toString()).detail("Tlogs", describe(req.logSystemConfig.tLogs)).detail("Resolvers", req.resolvers.size())
+    TraceEvent("MasterRegistrationReceived", self->id).detail("MasterId", req.id).detail("Master", req.mi.toString()).detail("Tlogs", describe(req.logSystemConfig.tLogs)).detail("Resolvers", req.resolvers.size())
         .detail("RecoveryState", (int)req.recoveryState).detail("RegistrationCount", req.registrationCount).detail("Proxies", req.proxies.size()).detail("RecoveryCount", req.recoveryCount).detail("Stalled", req.recoveryStalled);

     //make sure the request comes from an active database
     auto db = &self->db;
     if ( db->serverInfo->get().master.id() != req.id || req.registrationCount <= db->masterRegistrationCount ) {
-        TraceEvent("MasterRegistrationNotFound", self->id).detail("DbName", printable(req.dbName)).detail("MasterId", req.id).detail("ExistingId", db->serverInfo->get().master.id()).detail("RegCount", req.registrationCount).detail("ExistingRegCount", db->masterRegistrationCount);
+        TraceEvent("MasterRegistrationNotFound", self->id).detail("MasterId", req.id).detail("ExistingId", db->serverInfo->get().master.id()).detail("RegCount", req.registrationCount).detail("ExistingRegCount", db->masterRegistrationCount);
         return;
     }
@@ -2157,7 +2156,7 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
             return Void();
         }
         when( OpenDatabaseRequest req = waitNext( interf.clientInterface.openDatabase.getFuture() ) ) {
-            addActor.send( clusterOpenDatabase( &self.db, req.dbName, req.knownClientInfoID, req.issues.toString(), req.supportedVersions, req.traceLogGroup, req.reply ) );
+            addActor.send( clusterOpenDatabase( &self.db, req.knownClientInfoID, req.issues.toString(), req.supportedVersions, req.traceLogGroup, req.reply ) );
         }
         when( RecruitFromConfigurationRequest req = waitNext( interf.recruitFromConfiguration.getFuture() ) ) {
             addActor.send( clusterRecruitFromConfiguration( &self, req ) );

@@ -194,7 +194,6 @@ struct GetWorkersRequest {
 };

 struct RegisterMasterRequest {
-    Standalone<StringRef> dbName;
     UID id;
     LocalityData mi;
     LogSystemConfig logSystemConfig;
@@ -214,7 +213,7 @@ struct RegisterMasterRequest {
     template <class Ar>
     void serialize( Ar& ar ) {
         ASSERT( ar.protocolVersion() >= 0x0FDB00A200040001LL );
-        ar & dbName & id & mi & logSystemConfig & proxies & resolvers & recoveryCount & registrationCount & configuration & priorCommittedLogServers & recoveryState & recoveryStalled & reply;
+        ar & id & mi & logSystemConfig & proxies & resolvers & recoveryCount & registrationCount & configuration & priorCommittedLogServers & recoveryState & recoveryStalled & reply;
     }
 };

@@ -571,7 +571,7 @@ struct DDTeamCollection {
         if(!primary || configuration.usableRegions == 1) {
             TraceEvent("DDTrackerStarting", masterId)
                 .detail( "State", "Inactive" )
-                .trackLatest( format("%s/DDTrackerStarting", printable(cx->dbName).c_str() ).c_str() );
+                .trackLatest( "DDTrackerStarting" );
         }
     }
@@ -600,7 +600,7 @@ struct DDTeamCollection {
         if(!self->primary || self->configuration.usableRegions == 1) {
             TraceEvent("DDTrackerStarting", self->masterId)
                 .detail( "State", "Active" )
-                .trackLatest( format("%s/DDTrackerStarting", printable(self->cx->dbName).c_str() ).c_str() );
+                .trackLatest( "DDTrackerStarting" );
         }

         return Void();
@@ -1792,7 +1792,7 @@ ACTOR Future<Void> monitorStorageServerRecruitment(DDTeamCollection *self) {
     state bool recruiting = false;
     TraceEvent("StorageServerRecruitment", self->masterId)
         .detail("State", "Idle")
-        .trackLatest((self->cx->dbName.toString() + "/StorageServerRecruitment_" + self->masterId.toString()).c_str());
+        .trackLatest(("StorageServerRecruitment_" + self->masterId.toString()).c_str());
     loop {
         if( !recruiting ) {
             while(self->recruitingStream.get() == 0) {
@@ -1800,7 +1800,7 @@ ACTOR Future<Void> monitorStorageServerRecruitment(DDTeamCollection *self) {
             }
             TraceEvent("StorageServerRecruitment", self->masterId)
                 .detail("State", "Recruiting")
-                .trackLatest((self->cx->dbName.toString() + "/StorageServerRecruitment_" + self->masterId.toString()).c_str());
+                .trackLatest(("StorageServerRecruitment_" + self->masterId.toString()).c_str());
             recruiting = true;
         } else {
             loop {
@@ -1811,7 +1811,7 @@ ACTOR Future<Void> monitorStorageServerRecruitment(DDTeamCollection *self) {
             }
             TraceEvent("StorageServerRecruitment", self->masterId)
                 .detail("State", "Idle")
-                .trackLatest((self->cx->dbName.toString() + "/StorageServerRecruitment_" + self->masterId.toString()).c_str());
+                .trackLatest(("StorageServerRecruitment_" + self->masterId.toString()).c_str());
             recruiting = false;
         }
     }
@@ -2211,7 +2211,7 @@ ACTOR Future<Void> dataDistribution(
             .detail( "LowPriorityRelocations", 0 )
             .detail( "HighPriorityRelocations", 0 )
             .detail( "HighestPriority", 0 )
-            .trackLatest( format("%s/MovingData", printable(cx->dbName).c_str() ).c_str() );
+            .trackLatest( "MovingData" );

         TraceEvent("TotalDataInFlight", mi.id()).detail("Primary", true).detail("TotalBytes", 0).detail("UnhealthyServers", 0).detail("HighestPriority", 0).trackLatest("TotalDataInFlight");
         TraceEvent("TotalDataInFlight", mi.id()).detail("Primary", false).detail("TotalBytes", 0).detail("UnhealthyServers", 0).detail("HighestPriority", configuration.usableRegions > 1 ? 0 : -1).trackLatest("TotalDataInFlightRemote");

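These hunks also rename the trackLatest keys: the "<dbName>/" prefix is dropped, so consumers (QuietDatabase, Status, below) can request fixed literals instead of assembling the key at runtime. A runnable illustration of the naming change (not FDB source):

    #include <cstdio>
    #include <string>

    // Old scheme: event keys were built per database, e.g. "DB/MovingData".
    std::string trackLatestKeyOld(const std::string& dbName, const std::string& event) {
        return dbName + "/" + event;
    }

    // New scheme: the key is the event name itself, e.g. "MovingData".
    std::string trackLatestKeyNew(const std::string& event) {
        return event;
    }

    int main() {
        std::printf("%s -> %s\n", trackLatestKeyOld("DB", "MovingData").c_str(),
                    trackLatestKeyNew("MovingData").c_str());
    }
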
@@ -1271,7 +1271,7 @@ ACTOR Future<Void> dataDistributionQueue(
                     .detail( "HighPriorityRelocations", highPriorityRelocations )
                     .detail( "HighestPriority", highestPriorityRelocation )
                     .detail( "BytesWritten", self.bytesWritten )
-                    .trackLatest( format("%s/MovingData", printable(cx->dbName).c_str() ).c_str() );
+                    .trackLatest( "MovingData" );
             }
             when ( wait( self.error.getFuture() ) ) {} // Propagate errors from dataDistributionRelocator
             when ( wait(waitForAll( balancingFutures ) )) {}

@@ -41,7 +41,7 @@ BandwidthStatus getBandwidthStatus( StorageMetrics const& metrics ) {
     return BandwidthStatusNormal;
 }

-ACTOR Future<Void> updateMaxShardSize( Standalone<StringRef> dbName, Reference<AsyncVar<int64_t>> dbSizeEstimate, Reference<AsyncVar<Optional<int64_t>>> maxShardSize ) {
+ACTOR Future<Void> updateMaxShardSize( Reference<AsyncVar<int64_t>> dbSizeEstimate, Reference<AsyncVar<Optional<int64_t>>> maxShardSize ) {
     state int64_t lastDbSize = 0;
     state int64_t granularity = g_network->isSimulated() ?
         SERVER_KNOBS->DD_SHARD_SIZE_GRANULARITY_SIM : SERVER_KNOBS->DD_SHARD_SIZE_GRANULARITY;
@@ -619,7 +619,7 @@ ACTOR Future<Void> trackInitialShards(DataDistributionTracker *self, Reference<I
     Future<Void> initialSize = changeSizes( self, KeyRangeRef(allKeys.begin, allKeys.end), 0 );
     self->readyToStart.send(Void());
     wait( initialSize );
-    self->maxShardSizeUpdater = updateMaxShardSize( self->cx->dbName, self->dbSizeEstimate, self->maxShardSize );
+    self->maxShardSizeUpdater = updateMaxShardSize( self->dbSizeEstimate, self->maxShardSize );

     return Void();
 }
@@ -690,7 +690,7 @@ ACTOR Future<Void> dataDistributionTracker(
                 TraceEvent("DDTrackerStats", self.masterId)
                     .detail("Shards", self.shards.size())
                     .detail("TotalSizeBytes", self.dbSizeEstimate->get())
-                    .trackLatest( format("%s/DDTrackerStats", printable(cx->dbName).c_str() ).c_str() );
+                    .trackLatest( "DDTrackerStats" );

                 loggingTrigger = delay(SERVER_KNOBS->DATA_DISTRIBUTION_LOGGING_INTERVAL);
             }

@@ -43,20 +43,20 @@ ACTOR Future<vector<std::pair<WorkerInterface, ProcessClass>>> getWorkers( Refer
 //Gets the WorkerInterface representing the Master server.
 ACTOR Future<WorkerInterface> getMasterWorker( Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo ) {
-    TraceEvent("GetMasterWorker").detail("Database", printable(cx->dbName)).detail("Stage", "GettingWorkers");
+    TraceEvent("GetMasterWorker").detail("Stage", "GettingWorkers");

     loop {
         state vector<std::pair<WorkerInterface, ProcessClass>> workers = wait( getWorkers( dbInfo ) );

         for( int i = 0; i < workers.size(); i++ ) {
             if( workers[i].first.address() == dbInfo->get().master.address() ) {
-                TraceEvent("GetMasterWorker").detail("Database", printable(cx->dbName)).detail("Stage", "GotWorkers").detail("MasterId", dbInfo->get().master.id()).detail("WorkerId", workers[i].first.id());
+                TraceEvent("GetMasterWorker").detail("Stage", "GotWorkers").detail("MasterId", dbInfo->get().master.id()).detail("WorkerId", workers[i].first.id());
                 return workers[i].first;
             }
         }

         TraceEvent(SevWarn, "GetMasterWorkerError")
-            .detail("Database", printable(cx->dbName)).detail("Error", "MasterWorkerNotFound")
+            .detail("Error", "MasterWorkerNotFound")
             .detail("Master", dbInfo->get().master.id()).detail("MasterAddress", dbInfo->get().master.address())
             .detail("WorkerCount", workers.size());
@@ -67,7 +67,7 @@ ACTOR Future<WorkerInterface> getMasterWorker( Database cx, Reference<AsyncVar<S
 //Gets the number of bytes in flight from the master
 ACTOR Future<int64_t> getDataInFlight( Database cx, WorkerInterface masterWorker ) {
     try {
-        TraceEvent("DataInFlight").detail("Database", printable(cx->dbName)).detail("Stage", "ContactingMaster");
+        TraceEvent("DataInFlight").detail("Stage", "ContactingMaster");
         TraceEventFields md = wait( timeoutError(masterWorker.eventLogRequest.getReply(
             EventLogRequest( LiteralStringRef("TotalDataInFlight") ) ), 1.0 ) );
         int64_t dataInFlight;
@@ -102,7 +102,7 @@ int64_t getQueueSize( TraceEventFields md ) {
 // This is not robust in the face of a TLog failure
 ACTOR Future<int64_t> getMaxTLogQueueSize( Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo, WorkerInterface masterWorker ) {
-    TraceEvent("MaxTLogQueueSize").detail("Database", printable(cx->dbName)).detail("Stage", "ContactingLogs");
+    TraceEvent("MaxTLogQueueSize").detail("Stage", "ContactingLogs");

     state std::vector<std::pair<WorkerInterface, ProcessClass>> workers = wait(getWorkers(dbInfo));
     std::map<NetworkAddress, WorkerInterface> workersMap;
@@ -123,8 +123,7 @@ ACTOR Future<int64_t> getMaxTLogQueueSize( Database cx, Reference<AsyncVar<Serve
     }
     wait( waitForAll( messages ) );

-    TraceEvent("MaxTLogQueueSize").detail("Database", printable(cx->dbName))
-        .detail("Stage", "ComputingMax").detail("MessageCount", messages.size());
+    TraceEvent("MaxTLogQueueSize").detail("Stage", "ComputingMax").detail("MessageCount", messages.size());

     state int64_t maxQueueSize = 0;
     state int i = 0;
@@ -169,7 +168,7 @@ ACTOR Future<vector<StorageServerInterface>> getStorageServers( Database cx, boo
 //Gets the maximum size of all the storage server queues
 ACTOR Future<int64_t> getMaxStorageServerQueueSize( Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo, WorkerInterface masterWorker ) {
-    TraceEvent("MaxStorageServerQueueSize").detail("Database", printable(cx->dbName)).detail("Stage", "ContactingStorageServers");
+    TraceEvent("MaxStorageServerQueueSize").detail("Stage", "ContactingStorageServers");

     Future<std::vector<StorageServerInterface>> serversFuture = getStorageServers(cx);
     state Future<std::vector<std::pair<WorkerInterface, ProcessClass>>> workersFuture = getWorkers(dbInfo);
@@ -195,7 +194,7 @@ ACTOR Future<int64_t> getMaxStorageServerQueueSize( Database cx, Reference<Async
     wait( waitForAll(messages) );

-    TraceEvent("MaxStorageServerQueueSize").detail("Database", printable(cx->dbName)).detail("Stage", "ComputingMax").detail("MessageCount", messages.size());
+    TraceEvent("MaxStorageServerQueueSize").detail("Stage", "ComputingMax").detail("MessageCount", messages.size());

     state int64_t maxQueueSize = 0;
     state int i = 0;
@@ -222,12 +221,12 @@ ACTOR Future<int64_t> getMaxStorageServerQueueSize( Database cx, Reference<Async
 //Gets the size of the data distribution queue. If reportInFlight is true, then data in flight is considered part of the queue
 ACTOR Future<int64_t> getDataDistributionQueueSize( Database cx, WorkerInterface masterWorker, bool reportInFlight) {
     try {
-        TraceEvent("DataDistributionQueueSize").detail("Database", printable(cx->dbName)).detail("Stage", "ContactingMaster");
+        TraceEvent("DataDistributionQueueSize").detail("Stage", "ContactingMaster");

         TraceEventFields movingDataMessage = wait( timeoutError(masterWorker.eventLogRequest.getReply(
-            EventLogRequest( StringRef( cx->dbName.toString() + "/MovingData") ) ), 1.0 ) );
+            EventLogRequest( LiteralStringRef("MovingData") ) ), 1.0 ) );

-        TraceEvent("DataDistributionQueueSize").detail("Database", printable(cx->dbName)).detail("Stage", "GotString")/*.detail("Result", printable(movingDataMessage))*/.detail("TrackLatest", printable( StringRef( cx->dbName.toString() + "/MovingData") ) );
+        TraceEvent("DataDistributionQueueSize").detail("Stage", "GotString");

         int64_t inQueue;
         sscanf(movingDataMessage.getValue("InQueue").c_str(), "%lld", &inQueue);
@@ -256,10 +255,10 @@ ACTOR Future<int64_t> getDataDistributionQueueSize( Database cx, Reference<Async
 //Checks that data distribution is active
 ACTOR Future<bool> getDataDistributionActive( Database cx, WorkerInterface masterWorker ) {
     try {
-        TraceEvent("DataDistributionActive").detail("Database", printable(cx->dbName)).detail("Stage", "ContactingMaster");
+        TraceEvent("DataDistributionActive").detail("Stage", "ContactingMaster");

         TraceEventFields activeMessage = wait( timeoutError(masterWorker.eventLogRequest.getReply(
-            EventLogRequest( StringRef( cx->dbName.toString() + "/DDTrackerStarting") ) ), 1.0 ) );
+            EventLogRequest( LiteralStringRef("DDTrackerStarting") ) ), 1.0 ) );

         return activeMessage.getValue("State") == "Active";
     } catch( Error &e ) {
@@ -271,10 +270,10 @@ ACTOR Future<bool> getDataDistributionActive( Database cx, WorkerInterface maste
 //Checks to see if any storage servers are being recruited
 ACTOR Future<bool> getStorageServersRecruiting( Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo, WorkerInterface masterWorker ) {
     try {
-        TraceEvent("StorageServersRecruiting").detail("Database", printable(cx->dbName)).detail("Stage", "ContactingMaster");
+        TraceEvent("StorageServersRecruiting").detail("Stage", "ContactingMaster");

         TraceEventFields recruitingMessage = wait( timeoutError(masterWorker.eventLogRequest.getReply(
-            EventLogRequest( StringRef( cx->dbName.toString() + "/StorageServerRecruitment_" + dbInfo->get().master.id().toString()) ) ), 1.0 ) );
+            EventLogRequest( StringRef( "StorageServerRecruitment_" + dbInfo->get().master.id().toString()) ) ), 1.0 ) );

         return recruitingMessage.getValue("State") == "Recruiting";
     } catch( Error &e ) {

@@ -117,7 +117,6 @@ struct Ratekeeper {
     std::map<UID, std::pair<int64_t, double> > proxy_transactionCountAndTime;
     Smoother smoothReleasedTransactions, smoothTotalDurableBytes;
     double TPSLimit;
-    Standalone<StringRef> dbName;
     DatabaseConfiguration configuration;

     Int64MetricHandle tpsLimitMetric;
@@ -502,7 +501,7 @@ void updateRate( Ratekeeper* self ) {
             .detail("TotalDiskUsageBytes", totalDiskUsageBytes)
             .detail("WorstStorageServerVersionLag", worstVersionLag)
             .detail("LimitingStorageServerVersionLag", limitingVersionLag)
-            .trackLatest(format("%s/RkUpdate", printable(self->dbName).c_str() ).c_str());
+            .trackLatest("RkUpdate");
     }
 }
@@ -510,7 +509,6 @@ ACTOR Future<Void> rateKeeper(
     Reference<AsyncVar<ServerDBInfo>> dbInfo,
     PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > serverChanges,
     FutureStream< struct GetRateInfoRequest > getRateInfo,
-    Standalone<StringRef> dbName,
    DatabaseConfiguration configuration,
     double* lastLimited)
 {
@@ -521,7 +519,6 @@ ACTOR Future<Void> rateKeeper(
     state std::vector<Future<Void>> tlogTrackers;
     state std::vector<TLogInterface> tlogInterfs;
     state Promise<Void> err;
-    self.dbName = dbName;
     self.configuration = configuration;
     self.lastLimited = lastLimited;

@@ -30,7 +30,6 @@ Future<Void> rateKeeper(
     Reference<AsyncVar<struct ServerDBInfo>> const& dbInfo,
     PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > const& serverChanges, // actually an input, but we don't want broken_promise
     FutureStream< struct GetRateInfoRequest > const& getRateInfo,
-    Standalone<StringRef> const& dbName,
     DatabaseConfiguration const& configuration,
     double* const& lastLimited);

@@ -37,7 +37,6 @@ struct ServerDBInfo {
     ClientDBInfo client; // After a successful recovery, eventually proxies that communicate with it
     MasterInterface master; // The best guess as to the most recent master, which might still be recovering
     vector<ResolverInterface> resolvers;
-    Standalone<StringRef> dbName;
     DBRecoveryCount recoveryCount; // A recovery count from DBCoreState. A successful master recovery increments it twice; unsuccessful recoveries may increment it once. Depending on where the current master is in its recovery process, this might not have been written by the current master.
     RecoveryState recoveryState;
     LifetimeToken masterLifetime; // Used by masterserver to detect not being the currently chosen master
@@ -45,15 +44,14 @@ struct ServerDBInfo {
     LogSystemConfig logSystemConfig;
     std::vector<UID> priorCommittedLogServers; // If !fullyRecovered and logSystemConfig refers to a new log system which may not have been committed to the coordinated state yet, then priorCommittedLogServers are the previous, fully committed generation which need to stay alive in case this recovery fails

-    ServerDBInfo() : recoveryCount(0), recoveryState(RecoveryState::UNINITIALIZED) {}
-    explicit ServerDBInfo(StringRef const& dbName) : dbName(dbName), recoveryCount(0), recoveryState(RecoveryState::UNINITIALIZED) {}
+    explicit ServerDBInfo() : recoveryCount(0), recoveryState(RecoveryState::UNINITIALIZED) {}

     bool operator == (ServerDBInfo const& r) const { return id == r.id; }
     bool operator != (ServerDBInfo const& r) const { return id != r.id; }

     template <class Ar>
     void serialize( Ar& ar ) {
-        ar & id & clusterInterface & client & master & resolvers & dbName & recoveryCount & masterLifetime & logSystemConfig & priorCommittedLogServers & recoveryState;
+        ar & id & clusterInterface & client & master & resolvers & recoveryCount & masterLifetime & logSystemConfig & priorCommittedLogServers & recoveryState;
     }
 };

@@ -132,7 +132,7 @@ ACTOR Future<Void> runBackup( Reference<ClusterConnectionFile> connFile ) {
     if (g_simulator.backupAgents == ISimulator::BackupToFile) {
         Reference<Cluster> cluster = Cluster::createCluster(connFile, -1);
-        Database cx = cluster->createDatabase(LiteralStringRef("DB")).get();
+        Database cx = cluster->createDatabase().get();

         state FileBackupAgent fileAgent;
         state double backupPollDelay = 1.0 / CLIENT_KNOBS->BACKUP_AGGREGATE_POLL_RATE;
@@ -160,11 +160,11 @@ ACTOR Future<Void> runDr( Reference<ClusterConnectionFile> connFile ) {
     if (g_simulator.drAgents == ISimulator::BackupToDB) {
         Reference<Cluster> cluster = Cluster::createCluster(connFile, -1);
-        Database cx = cluster->createDatabase(LiteralStringRef("DB")).get();
+        Database cx = cluster->createDatabase().get();

         Reference<ClusterConnectionFile> extraFile(new ClusterConnectionFile(*g_simulator.extraDB));
         Reference<Cluster> extraCluster = Cluster::createCluster(extraFile, -1);
-        state Database extraDB = extraCluster->createDatabase(LiteralStringRef("DB")).get();
+        state Database extraDB = extraCluster->createDatabase().get();

         TraceEvent("StartingDrAgents").detail("ConnFile", connFile->getConnectionString().toString()).detail("ExtraString", extraFile->getConnectionString().toString());

@@ -1069,16 +1069,16 @@ static StatusObject configurationFetcher(Optional<DatabaseConfiguration> conf, S
     return statusObj;
 }

-ACTOR static Future<StatusObject> dataStatusFetcher(std::pair<WorkerInterface, ProcessClass> mWorker, std::string dbName, int *minReplicasRemaining) {
+ACTOR static Future<StatusObject> dataStatusFetcher(std::pair<WorkerInterface, ProcessClass> mWorker, int *minReplicasRemaining) {
     state StatusObject statusObjData;

     try {
         std::vector<Future<TraceEventFields>> futures;

         // TODO: Should this be serial?
-        futures.push_back(timeoutError(mWorker.first.eventLogRequest.getReply(EventLogRequest(StringRef(dbName + "/DDTrackerStarting"))), 1.0));
-        futures.push_back(timeoutError(mWorker.first.eventLogRequest.getReply(EventLogRequest(StringRef(dbName + "/DDTrackerStats"))), 1.0));
-        futures.push_back(timeoutError(mWorker.first.eventLogRequest.getReply(EventLogRequest(StringRef(dbName + "/MovingData"))), 1.0));
+        futures.push_back(timeoutError(mWorker.first.eventLogRequest.getReply(EventLogRequest(LiteralStringRef("DDTrackerStarting"))), 1.0));
+        futures.push_back(timeoutError(mWorker.first.eventLogRequest.getReply(EventLogRequest(LiteralStringRef("DDTrackerStats"))), 1.0));
+        futures.push_back(timeoutError(mWorker.first.eventLogRequest.getReply(EventLogRequest(LiteralStringRef("MovingData"))), 1.0));
         futures.push_back(timeoutError(mWorker.first.eventLogRequest.getReply(EventLogRequest(LiteralStringRef("TotalDataInFlight"))), 1.0));
         futures.push_back(timeoutError(mWorker.first.eventLogRequest.getReply(EventLogRequest(LiteralStringRef("TotalDataInFlightRemote"))), 1.0));
@@ -1291,7 +1291,7 @@ static int getExtraTLogEligibleMachines(vector<std::pair<WorkerInterface, Proces
 }

 ACTOR static Future<StatusObject> workloadStatusFetcher(Reference<AsyncVar<struct ServerDBInfo>> db, vector<std::pair<WorkerInterface, ProcessClass>> workers, std::pair<WorkerInterface, ProcessClass> mWorker,
-    std::string dbName, StatusObject *qos, StatusObject *data_overlay, std::set<std::string> *incomplete_reasons, Future<ErrorOr<vector<std::pair<StorageServerInterface, TraceEventFields>>>> storageServerFuture)
+    StatusObject *qos, StatusObject *data_overlay, std::set<std::string> *incomplete_reasons, Future<ErrorOr<vector<std::pair<StorageServerInterface, TraceEventFields>>>> storageServerFuture)
 {
     state StatusObject statusObj;
     state StatusObject operationsObj;
@@ -1342,7 +1342,7 @@ ACTOR static Future<StatusObject> workloadStatusFetcher(Reference<AsyncVar<struc
     // Transactions
     try {
-        TraceEventFields md = wait( timeoutError(mWorker.first.eventLogRequest.getReply( EventLogRequest(StringRef(dbName+"/RkUpdate") ) ), 1.0) );
+        TraceEventFields md = wait( timeoutError(mWorker.first.eventLogRequest.getReply( EventLogRequest(LiteralStringRef("RkUpdate") ) ), 1.0) );
         double tpsLimit = parseDouble(md.getValue("TPSLimit"));
         double transPerSec = parseDouble(md.getValue("ReleasedTPS"));
         int ssCount = parseInt(md.getValue("StorageServers"));
@@ -1701,9 +1701,6 @@ ACTOR Future<StatusReply> clusterGetStatus(
         std::vector<NetworkAddress> incompatibleConnections,
         Version datacenterVersionDifference )
 {
-    // since we no longer offer multi-database support, all databases must be named DB
-    state std::string dbName = "DB";
-
     // Check if master worker is present
     state StatusArray messages;
     state std::set<std::string> status_incomplete_reasons;
@@ -1814,8 +1811,8 @@ ACTOR Future<StatusReply> clusterGetStatus(
             state int minReplicasRemaining = -1;
             std::vector<Future<StatusObject>> futures2;
-            futures2.push_back(dataStatusFetcher(mWorker, dbName, &minReplicasRemaining));
-            futures2.push_back(workloadStatusFetcher(db, workers, mWorker, dbName, &qos, &data_overlay, &status_incomplete_reasons, storageServerFuture));
+            futures2.push_back(dataStatusFetcher(mWorker, &minReplicasRemaining));
+            futures2.push_back(workloadStatusFetcher(db, workers, mWorker, &qos, &data_overlay, &status_incomplete_reasons, storageServerFuture));
             futures2.push_back(layerStatusFetcher(cx, &messages, &status_incomplete_reasons));
             futures2.push_back(lockedStatusFetcher(db, &messages, &status_incomplete_reasons));

@@ -44,10 +44,10 @@ struct WorkloadInterface {
 struct WorkloadRequest {
     Arena arena;
     StringRef title;
-    StringRef database;
     int timeout;
     double databasePingDelay;
     int64_t sharedRandomNumber;
+    bool useDatabase;

     // The vector of option lists are to construct compound workloads. If there
     // is only one workload to be run...pass just one list of options!
@@ -68,7 +68,7 @@ struct WorkloadRequest {
     template <class Ar>
     void serialize( Ar& ar ) {
-        ar & title & database & timeout & databasePingDelay & sharedRandomNumber & options & clientId & clientCount & reply & arena;
+        ar & title & timeout & databasePingDelay & sharedRandomNumber & useDatabase & options & clientId & clientCount & reply & arena;
     }
 };

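WorkloadRequest now carries a boolean instead of the database name; previously "does this workload use a database?" was inferred from database.size(), as the tester hunks below show. A minimal sketch of the before/after encoding (not FDB source):

    #include <string>

    struct WorkloadRequestOld {
        std::string database;                  // "" meant: no database
        bool usesDatabase() const { return !database.empty(); }
    };

    struct WorkloadRequestNew {
        bool useDatabase;                      // intent is now explicit
        bool usesDatabase() const { return useDatabase; }
    };
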
@@ -201,7 +201,6 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
     std::map<UID, ProxyVersionReplies> lastProxyVersionReplies;

-    Standalone<StringRef> dbName;
     Standalone<StringRef> dbId;

     MasterInterface myInterface;
@@ -227,7 +226,6 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
         MasterInterface const& myInterface,
         ServerCoordinators const& coordinators,
         ClusterControllerFullInterface const& clusterController,
-        Standalone<StringRef> const& dbName,
         Standalone<StringRef> const& dbId,
         PromiseStream<Future<Void>> const& addActor,
         bool forceRecovery
@@ -238,7 +236,6 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
         cstate(coordinators, addActor, dbgid),
         coordinators(coordinators),
         clusterController(clusterController),
-        dbName(dbName),
         dbId(dbId),
         forceRecovery(forceRecovery),
         lastEpochEnd(invalidVersion),
@@ -436,7 +433,6 @@ ACTOR Future<Void> updateLogsValue( Reference<MasterData> self, Database cx ) {
 Future<Void> sendMasterRegistration( MasterData* self, LogSystemConfig const& logSystemConfig, vector<MasterProxyInterface> proxies, vector<ResolverInterface> resolvers, DBRecoveryCount recoveryCount, vector<UID> priorCommittedLogServers ) {
     RegisterMasterRequest masterReq;
-    masterReq.dbName = self->dbName;
     masterReq.id = self->myInterface.id();
     masterReq.mi = self->myInterface.locality;
     masterReq.logSystemConfig = logSystemConfig;
@@ -1308,7 +1304,7 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self ) {
         PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > ddStorageServerChanges;
         state double lastLimited = 0;
         self->addActor.send( reportErrorsExcept( dataDistribution( self->dbInfo, self->myInterface, self->configuration, ddStorageServerChanges, self->logSystem, self->recoveryTransactionVersion, self->primaryDcId, self->remoteDcIds, &lastLimited, remoteRecovered.getFuture() ), "DataDistribution", self->dbgid, &normalMasterErrors() ) );
-        self->addActor.send( reportErrors( rateKeeper( self->dbInfo, ddStorageServerChanges, self->myInterface.getRateInfo.getFuture(), self->dbName, self->configuration, &lastLimited ), "Ratekeeper", self->dbgid) );
+        self->addActor.send( reportErrors( rateKeeper( self->dbInfo, ddStorageServerChanges, self->myInterface.getRateInfo.getFuture(), self->configuration, &lastLimited ), "Ratekeeper", self->dbgid) );
     }

     if( self->resolvers.size() > 1 )
@@ -1325,7 +1321,7 @@ ACTOR Future<Void> masterServer( MasterInterface mi, Reference<AsyncVar<ServerDB
 {
     state Future<Void> onDBChange = Void();
     state PromiseStream<Future<Void>> addActor;
-    state Reference<MasterData> self( new MasterData( db, mi, coordinators, db->get().clusterInterface, db->get().dbName, LiteralStringRef(""), addActor, forceRecovery ) );
+    state Reference<MasterData> self( new MasterData( db, mi, coordinators, db->get().clusterInterface, LiteralStringRef(""), addActor, forceRecovery ) );
     state Future<Void> collection = actorCollection( self->addActor.getFuture() );

     TEST( !lifetime.isStillValid( db->get().masterLifetime, mi.id()==db->get().master.id() ) ); // Master born doomed

@@ -284,7 +284,6 @@ struct CompoundWorkload : TestWorkload {
 };
 
 TestWorkload *getWorkloadIface( WorkloadRequest work, VectorRef<KeyValueRef> options, Reference<AsyncVar<ServerDBInfo>> dbInfo ) {
-	options.push_back( work.arena, KeyValueRef(LiteralStringRef("dbName"), work.database) );
 	Value testName = getOption( options, LiteralStringRef("testName"), LiteralStringRef("no-test-specified") );
 	WorkloadContext wcx;
 	wcx.clientId = work.clientId;
@@ -378,7 +377,7 @@ ACTOR Future<Void> testDatabaseLiveness( Database cx, double databasePingDelay,
 	} catch( Error& e ) {
 		if( e.code() != error_code_actor_cancelled )
 			TraceEvent(SevError, ("PingingDatabaseLivenessError_" + context).c_str()).error(e)
-				.detail("Database", printable(cx->dbName)).detail("PingDelay", databasePingDelay);
+				.detail("PingDelay", databasePingDelay);
 		throw;
 	}
 }
@@ -491,18 +490,15 @@ ACTOR Future<Void> testerServerWorkload( WorkloadRequest work, Reference<Cluster
 	state Database cx;
 	try {
 		std::map<std::string, std::string> details;
-		details["Database"] = printable(work.database);
 		details["WorkloadTitle"] = printable(work.title);
 		details["ClientId"] = format("%d", work.clientId);
 		details["ClientCount"] = format("%d", work.clientCount);
 		details["WorkloadTimeout"] = format("%d", work.timeout);
 		startRole(workIface.id(), UID(), "Tester", details);
 
-		Standalone<StringRef> database = work.database;
-
-		if( database.size() ) {
+		if( work.useDatabase ) {
 			Reference<Cluster> cluster = Cluster::createCluster(ccf->getFilename(), -1);
-			Database _cx = wait(cluster->createDatabase(database, locality));
+			Database _cx = wait(cluster->createDatabase(locality));
 			cx = _cx;
 
 			wait( delay(1.0) );
@@ -624,12 +620,11 @@ void logMetrics( vector<PerfMetric> metrics ) {
 }
 
 ACTOR Future<DistributedTestResults> runWorkload( Database cx, std::vector< TesterInterface > testers,
-	StringRef database, TestSpec spec ) {
+	TestSpec spec ) {
 	// FIXME: Fault tolerance for test workers (handle nonresponse or broken_promise from each getReply below)
 	TraceEvent("TestRunning").detail( "WorkloadTitle", printable(spec.title) )
 		.detail("TesterCount", testers.size()).detail("Phases", spec.phases)
-		.detail("TestTimeout", spec.timeout)
-		.detail("Database", printable( database ));
+		.detail("TestTimeout", spec.timeout);
 	state vector< Future< WorkloadInterface > > workRequests;
 	state vector<vector<PerfMetric>> metricsResults;
 
@@ -640,7 +635,7 @@ ACTOR Future<DistributedTestResults> runWorkload( Database cx, std::vector< Test
 	for(; i < testers.size(); i++) {
 		WorkloadRequest req;
 		req.title = spec.title;
-		req.database = database;
+		req.useDatabase = spec.useDB;
 		req.timeout = spec.timeout;
 		req.databasePingDelay = spec.databasePingDelay;
 		req.options = spec.options;
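Condensed, the reworked request flow looks like the sketch below (names taken from the hunks above, surrounding code elided): the sender signals intent with a boolean, and the tester opens the one-and-only database only when asked.

    // sender side (runWorkload)
    WorkloadRequest req;
    req.useDatabase = spec.useDB;            // was: req.database = database;

    // receiver side (testerServerWorkload)
    if( work.useDatabase ) {                 // was: if( database.size() )
        Reference<Cluster> cluster = Cluster::createCluster(ccf->getFilename(), -1);
        Database _cx = wait(cluster->createDatabase(locality));
        cx = _cx;
    }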
@@ -724,7 +719,7 @@ ACTOR Future<DistributedTestResults> runWorkload( Database cx, std::vector< Test
 }
 
 //Sets the database configuration by running the ChangeConfig workload
-ACTOR Future<Void> changeConfiguration(Database cx, std::vector< TesterInterface > testers, StringRef database, StringRef configMode) {
+ACTOR Future<Void> changeConfiguration(Database cx, std::vector< TesterInterface > testers, StringRef configMode) {
 	state TestSpec spec;
 	Standalone<VectorRef<KeyValueRef>> options;
 	spec.title = LiteralStringRef("ChangeConfig");
@@ -732,12 +727,12 @@ ACTOR Future<Void> changeConfiguration(Database cx, std::vector< TesterInterface
 	options.push_back_deep(options.arena(), KeyValueRef(LiteralStringRef("configMode"), configMode));
 	spec.options.push_back_deep(spec.options.arena(), options);
 
-	DistributedTestResults testResults = wait(runWorkload(cx, testers, database, spec));
+	DistributedTestResults testResults = wait(runWorkload(cx, testers, spec));
 	return Void();
 }
 
 //Runs the consistency check workload, which verifies that the database is in a consistent state
-ACTOR Future<Void> checkConsistency(Database cx, std::vector< TesterInterface > testers, StringRef database, bool doQuiescentCheck,
+ACTOR Future<Void> checkConsistency(Database cx, std::vector< TesterInterface > testers, bool doQuiescentCheck,
 	double quiescentWaitTimeout, double softTimeLimit, double databasePingDelay, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
 	state TestSpec spec;
 
@@ -761,7 +756,7 @@ ACTOR Future<Void> checkConsistency(Database cx, std::vector< TesterInterface >
 	state double start = now();
 	state bool lastRun = false;
 	loop {
-		DistributedTestResults testResults = wait(runWorkload(cx, testers, database, spec));
+		DistributedTestResults testResults = wait(runWorkload(cx, testers, spec));
 		if(testResults.ok() || lastRun) {
 			if( g_network->isSimulated() ) {
 				g_simulator.connectionFailuresDisableDuration = connectionFailures;
@@ -776,12 +771,12 @@ ACTOR Future<Void> checkConsistency(Database cx, std::vector< TesterInterface >
 	}
 }
 
-ACTOR Future<bool> runTest( Database cx, std::vector< TesterInterface > testers, StringRef database, TestSpec spec, Reference<AsyncVar<ServerDBInfo>> dbInfo )
+ACTOR Future<bool> runTest( Database cx, std::vector< TesterInterface > testers, TestSpec spec, Reference<AsyncVar<ServerDBInfo>> dbInfo )
 {
 	state DistributedTestResults testResults;
 
 	try {
-		Future<DistributedTestResults> fTestResults = runWorkload( cx, testers, database, spec );
+		Future<DistributedTestResults> fTestResults = runWorkload( cx, testers, spec );
 		if( spec.timeout > 0 ) {
 			fTestResults = timeoutError( fTestResults, spec.timeout );
 		}
@@ -816,7 +811,7 @@ ACTOR Future<bool> runTest( Database cx, std::vector< TesterInterface > testers,
 	if(spec.runConsistencyCheck) {
 		try {
 			bool quiescent = g_network->isSimulated() ? !BUGGIFY : spec.waitForQuiescenceEnd;
-			wait(timeoutError(checkConsistency(cx, testers, database, quiescent, 10000.0, 18000, spec.databasePingDelay, dbInfo), 20000.0));
+			wait(timeoutError(checkConsistency(cx, testers, quiescent, 10000.0, 18000, spec.databasePingDelay, dbInfo), 20000.0));
 		}
 		catch(Error& e) {
 			TraceEvent(SevError, "TestFailure").error(e).detail("Reason", "Unable to perform consistency check");
@@ -998,7 +993,6 @@ vector<TestSpec> readTests( ifstream& ifs ) {
 }
 
 ACTOR Future<Void> runTests( Reference<AsyncVar<Optional<struct ClusterControllerFullInterface>>> cc, Reference<AsyncVar<Optional<struct ClusterInterface>>> ci, vector< TesterInterface > testers, vector<TestSpec> tests, StringRef startingConfiguration, LocalityData locality ) {
-	state Standalone<StringRef> database = LiteralStringRef("DB");
 	state Database cx;
 	state Reference<AsyncVar<ServerDBInfo>> dbInfo( new AsyncVar<ServerDBInfo> );
 	state Future<Void> ccMonitor = monitorServerDBInfo( cc, Reference<ClusterConnectionFile>(), LocalityData(), dbInfo ); // FIXME: locality
@@ -1034,17 +1028,16 @@ ACTOR Future<Void> runTests( Reference<AsyncVar<Optional<struct ClusterControlle
 		databasePingDelay = 0.0;
 
 	if (useDB) {
-		Database _cx = wait( DatabaseContext::createDatabase( ci, Reference<Cluster>(), database, locality ) );
+		Database _cx = wait( DatabaseContext::createDatabase( ci, Reference<Cluster>(), locality ) );
 		cx = _cx;
-	} else
-		database = LiteralStringRef("");
+	}
 
 	state Future<Void> disabler = disableConnectionFailuresAfter(450, "Tester");
 
 	//Change the configuration (and/or create the database) if necessary
 	if(useDB && startingConfiguration != StringRef()) {
 		try {
-			wait(timeoutError(changeConfiguration(cx, testers, database, startingConfiguration), 2000.0));
+			wait(timeoutError(changeConfiguration(cx, testers, startingConfiguration), 2000.0));
 		}
 		catch(Error& e) {
 			TraceEvent(SevError, "TestFailure").error(e).detail("Reason", "Unable to set starting configuration");
@@ -1065,7 +1058,7 @@ ACTOR Future<Void> runTests( Reference<AsyncVar<Optional<struct ClusterControlle
 	TraceEvent("TestsExpectedToPass").detail("Count", tests.size());
 	state int idx = 0;
 	for(; idx < tests.size(); idx++ ) {
-		bool ok = wait( runTest( cx, testers, database, tests[idx], dbInfo ) );
+		bool ok = wait( runTest( cx, testers, tests[idx], dbInfo ) );
 		// do we handle a failure here?
 	}
@@ -490,7 +490,7 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
 	state ActorCollection filesClosed(true);
 	state Promise<Void> stopping;
 	state WorkerCache<InitializeStorageReply> storageCache;
-	state Reference<AsyncVar<ServerDBInfo>> dbInfo( new AsyncVar<ServerDBInfo>(ServerDBInfo(LiteralStringRef("DB"))) );
+	state Reference<AsyncVar<ServerDBInfo>> dbInfo( new AsyncVar<ServerDBInfo>(ServerDBInfo()) );
 	state Future<Void> metricsLogger;
 	state PromiseStream<InitializeTLogRequest> tlogRequests;
 	state Future<Void> tlog = Void();
@@ -504,7 +504,7 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
 	if( metricsConnFile.size() > 0) {
 		try {
 			state Reference<Cluster> cluster = Cluster::createCluster( metricsConnFile, Cluster::API_VERSION_LATEST );
-			metricsLogger = runMetrics( cluster->createDatabase(LiteralStringRef("DB"), locality), KeyRef(metricsPrefix) );
+			metricsLogger = runMetrics( cluster->createDatabase(locality), KeyRef(metricsPrefix) );
 		} catch(Error &e) {
 			TraceEvent(SevWarnAlways, "TDMetricsBadClusterFile").error(e).detail("ConnFile", metricsConnFile);
 		}
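After this change Cluster::createDatabase() takes at most a LocalityData argument. A minimal usage sketch under that assumption (the connection-file path is hypothetical):

    Reference<Cluster> cluster = Cluster::createCluster( "fdb.cluster", Cluster::API_VERSION_LATEST );
    Database db = cluster->createDatabase( locality ).get();   // no name parameter
    Database db2 = cluster->createDatabase().get();            // locality defaults too

The same one-argument reduction repeats mechanically across the simulation workloads below.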
@@ -282,7 +282,7 @@ struct ApiWorkload : TestWorkload {
 		if(useExtraDB) {
 			Reference<ClusterConnectionFile> extraFile(new ClusterConnectionFile(*g_simulator.extraDB));
 			Reference<Cluster> extraCluster = Cluster::createCluster(extraFile, -1);
-			extraDB = extraCluster->createDatabase(LiteralStringRef("DB")).get();
+			extraDB = extraCluster->createDatabase().get();
 		}
 	}
 
@@ -41,7 +41,7 @@ struct AtomicSwitchoverWorkload : TestWorkload {
 
 		Reference<ClusterConnectionFile> extraFile(new ClusterConnectionFile(*g_simulator.extraDB));
 		Reference<Cluster> extraCluster = Cluster::createCluster(extraFile, -1);
-		extraDB = extraCluster->createDatabase(LiteralStringRef("DB")).get();
+		extraDB = extraCluster->createDatabase().get();
 	}
 
 	virtual std::string description() {
@@ -38,7 +38,7 @@ struct BackupToDBAbort : TestWorkload {
 
 		Reference<ClusterConnectionFile> extraFile(new ClusterConnectionFile(*g_simulator.extraDB));
 		Reference<Cluster> extraCluster = Cluster::createCluster(extraFile, -1);
-		extraDB = extraCluster->createDatabase(LiteralStringRef("DB")).get();
+		extraDB = extraCluster->createDatabase().get();
 
 		lockid = UID(0xbeeffeed, 0xdecaf00d);
 	}
@@ -105,7 +105,7 @@ struct BackupToDBCorrectnessWorkload : TestWorkload {
 
 		Reference<ClusterConnectionFile> extraFile(new ClusterConnectionFile(*g_simulator.extraDB));
 		Reference<Cluster> extraCluster = Cluster::createCluster(extraFile, -1);
-		extraDB = extraCluster->createDatabase(LiteralStringRef("DB")).get();
+		extraDB = extraCluster->createDatabase().get();
 
 		TraceEvent("BARW_Start").detail("Locked", locked);
 	}
@@ -70,7 +70,7 @@ struct BackupToDBUpgradeWorkload : TestWorkload {
 
 		Reference<ClusterConnectionFile> extraFile(new ClusterConnectionFile(*g_simulator.extraDB));
 		Reference<Cluster> extraCluster = Cluster::createCluster(extraFile, -1);
-		extraDB = extraCluster->createDatabase(LiteralStringRef("DB")).get();
+		extraDB = extraCluster->createDatabase().get();
 
 		TraceEvent("DRU_Start");
 	}
@@ -58,7 +58,7 @@ struct ChangeConfigWorkload : TestWorkload {
 		if (g_network->isSimulated() && g_simulator.extraDB) {
 			Reference<ClusterConnectionFile> extraFile(new ClusterConnectionFile(*g_simulator.extraDB));
 			Reference<Cluster> cluster = Cluster::createCluster(extraFile, -1);
-			state Database extraDB = cluster->createDatabase(LiteralStringRef("DB")).get();
+			state Database extraDB = cluster->createDatabase().get();
 
 			wait(delay(5*g_random->random01()));
 			if (self->configMode.size()) {
@@ -40,9 +40,9 @@ struct DDMetricsWorkload : TestWorkload {
 	ACTOR Future<int> getHighPriorityRelocationsInFlight( Database cx, DDMetricsWorkload *self ) {
 		WorkerInterface masterWorker = wait(getMasterWorker(cx, self->dbInfo));
 
-		TraceEvent("GetHighPriorityReliocationsInFlight").detail("Database", printable(cx->dbName)).detail("Stage", "ContactingMaster");
+		TraceEvent("GetHighPriorityReliocationsInFlight").detail("Stage", "ContactingMaster");
 		TraceEventFields md = wait( timeoutError(masterWorker.eventLogRequest.getReply(
-			EventLogRequest( StringRef( cx->dbName.toString() + "/MovingData" ) ) ), 1.0 ) );
+			EventLogRequest( LiteralStringRef( "MovingData" ) ) ), 1.0 ) );
 		int relocations;
 		sscanf(md.getValue("HighPriorityRelocations").c_str(), "%d", &relocations);
 		return relocations;
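The DDMetrics hunk also shows the trace-log consequence of dropping names: events are no longer namespaced per database, so the lookup asks for the bare event name. Side by side:

    // before: a per-database event stream
    EventLogRequest( StringRef( cx->dbName.toString() + "/MovingData" ) )
    // after: one global event stream
    EventLogRequest( LiteralStringRef( "MovingData" ) )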
@@ -131,7 +131,7 @@ struct PerformanceWorkload : TestWorkload {
 		TestSpec spec( LiteralStringRef("PerformanceSetup"), false, false );
 		spec.options = options;
 		spec.phases = TestWorkload::SETUP;
-		DistributedTestResults results = wait( runWorkload( cx, testers, self->dbName, spec ) );
+		DistributedTestResults results = wait( runWorkload( cx, testers, spec ) );
 
 		return Void();
 	}
@@ -166,7 +166,7 @@ struct PerformanceWorkload : TestWorkload {
 			TestSpec spec( LiteralStringRef("PerformanceRun"), false, false );
 			spec.phases = TestWorkload::EXECUTION | TestWorkload::METRICS;
 			spec.options = options;
-			DistributedTestResults r = wait( runWorkload( cx, self->testers, self->dbName, spec ) );
+			DistributedTestResults r = wait( runWorkload( cx, self->testers, spec ) );
 			results = r;
 		} catch(Error& e) {
 			TraceEvent("PerformanceRunError").error(e, true).detail("Workload", printable(self->probeWorkload));
@@ -114,8 +114,7 @@ struct MoveKeysWorkload : TestWorkload {
 		return vector<StorageServerInterface>(t.begin(), t.end());
 	}
 
-	ACTOR Future<Void> doMoveKeys(Database cx, MoveKeysWorkload *self, KeyRange keys, vector<StorageServerInterface> destinationTeam,
-		MoveKeysLock lock, std::string dbName ) {
+	ACTOR Future<Void> doMoveKeys(Database cx, MoveKeysWorkload *self, KeyRange keys, vector<StorageServerInterface> destinationTeam, MoveKeysLock lock ) {
 		state TraceInterval relocateShardInterval("RelocateShard");
 		state FlowLock fl1(1);
 		state FlowLock fl2(1);
@@ -196,7 +195,7 @@ struct MoveKeysWorkload : TestWorkload {
 				inFlight.insert( keys, team );
 				for(int r=0; r<ranges.size(); r++) {
 					auto& rTeam = inFlight.rangeContaining(ranges[r].begin)->value();
-					inFlightActors.insert( ranges[r], self->doMoveKeys( cx, self, ranges[r], rTeam, lock, self->dbName.toString() ) );
+					inFlightActors.insert( ranges[r], self->doMoveKeys( cx, self, ranges[r], rTeam, lock ) );
 				}
 			}
 		} catch (Error& e) {
@@ -152,7 +152,7 @@ struct VersionStampWorkload : TestWorkload {
 		if (self->validateExtraDB) {
 			Reference<ClusterConnectionFile> extraFile(new ClusterConnectionFile(*g_simulator.extraDB));
 			Reference<Cluster> extraCluster = Cluster::createCluster(extraFile, -1);
-			cx = extraCluster->createDatabase(LiteralStringRef("DB")).get();
+			cx = extraCluster->createDatabase().get();
 		}
 		state ReadYourWritesTransaction tr(cx);
 		// We specifically wish to grab the smallest read version that we can get and maintain it, to
@@ -243,7 +243,7 @@ struct VersionStampWorkload : TestWorkload {
 		if (g_simulator.extraDB != NULL) {
 			Reference<ClusterConnectionFile> extraFile(new ClusterConnectionFile(*g_simulator.extraDB));
 			Reference<Cluster> extraCluster = Cluster::createCluster(extraFile, -1);
-			state Database extraDB = extraCluster->createDatabase(LiteralStringRef("DB")).get();
+			state Database extraDB = extraCluster->createDatabase().get();
 		}
 
 		loop{
@@ -78,7 +78,7 @@ struct WriteDuringReadWorkload : TestWorkload {
 		if(useExtraDB) {
 			Reference<ClusterConnectionFile> extraFile(new ClusterConnectionFile(*g_simulator.extraDB));
 			Reference<Cluster> extraCluster = Cluster::createCluster(extraFile, -1);
-			extraDB = extraCluster->createDatabase(LiteralStringRef("DB")).get();
+			extraDB = extraCluster->createDatabase().get();
 			useSystemKeys = false;
 		}
@@ -54,13 +54,11 @@ private:
 
 struct TestWorkload : NonCopyable, WorkloadContext {
 	int phases;
-	Value dbName;
 
 	// Subclasses are expected to also have a constructor with this signature (to work with WorkloadFactory<>):
 	explicit TestWorkload(WorkloadContext const& wcx)
 		: WorkloadContext(wcx)
 	{
-		dbName = getOption( options, LiteralStringRef("dbName"), StringRef() );
 		bool runSetup = getOption( options, LiteralStringRef("runSetup"), true );
 		phases = TestWorkload::EXECUTION | TestWorkload::CHECK | TestWorkload::METRICS;
 		if( runSetup )
@@ -199,7 +197,7 @@ public:
 
 Future<DistributedTestResults> runWorkload(
 	Database const& cx, std::vector< TesterInterface > const& testers,
-	StringRef const& database, TestSpec const& spec );
+	TestSpec const& spec );
 
 void logMetrics( vector<PerfMetric> metrics );
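With Value dbName gone from TestWorkload, and getWorkloadIface no longer injecting a "dbName" option (see the CompoundWorkload hunk earlier), subclasses read only their own options. A hypothetical subclass constructor illustrating the pattern; MyWorkload and nodeCount are invented for this example:

    struct MyWorkload : TestWorkload {
        int nodeCount;
        explicit MyWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
            // "dbName" is no longer available here; read workload-specific options only.
            nodeCount = getOption( options, LiteralStringRef("nodeCount"), 1000 );
        }
    };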
@@ -55,7 +55,7 @@ using namespace boost::asio::ip;
 //
 // xyzdev
 //   vvvv
-const uint64_t currentProtocolVersion = 0x0FDB00B061000001LL;
+const uint64_t currentProtocolVersion = 0x0FDB00B061010001LL;
 const uint64_t compatibleProtocolVersionMask = 0xffffffffffff0000LL;
 const uint64_t minValidProtocolVersion = 0x0FDB00A200060001LL;
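The bump lands in the bits kept by compatibleProtocolVersionMask (everything above the low 16 bits), so binaries on either side of this commit refuse to interoperate; presumably the point, since serialized structs such as RegisterMasterRequest changed shape. A self-contained sketch of the check, not code from this patch:

    #include <cstdint>

    // Two protocol versions are wire-compatible when they agree in the masked bits.
    bool protocolCompatible( uint64_t a, uint64_t b ) {
        const uint64_t compatibleProtocolVersionMask = 0xffffffffffff0000LL;
        return (a & compatibleProtocolVersionMask) == (b & compatibleProtocolVersionMask);
    }

    // protocolCompatible( 0x0FDB00B061000001LL, 0x0FDB00B061010001LL ) == false:
    // the bump flips a digit above the masked-off low 16 bits (...6100... -> ...6101...),
    // deliberately marking the new wire format as incompatible with the old one.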