Merge pull request #1160 from alexmiller-apple/tstlog-fork

Spill-By-Reference TLog Part 2: New and Old TLogServers co-exist harmoniously
This commit is contained in:
Evan Tschannen 2019-02-26 18:00:04 -08:00 committed by GitHub
commit 8afb7fbb9d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 2551 additions and 30 deletions

View File

@ -11,6 +11,8 @@ Improved replication mechanism, a new hierarchical replication technique that fu
* Get read version, read, and commit requests are counted and aggregated by server-side latency in configurable latency bands and output in JSON status. `(PR #1084) <https://github.com/apple/foundationdb/pull/1084>`_
* Added configuration option to choose log spilling implementation `(PR #1160) <https://github.com/apple/foundationdb/pull/1160>`_
* Added configuration option to choose log system implementation `(PR #1160) <https://github.com/apple/foundationdb/pull/1160>`_
Performance
-----------

View File

@ -30,7 +30,9 @@ void DatabaseConfiguration::resetInternal() {
// does NOT reset rawConfiguration
initialized = false;
masterProxyCount = resolverCount = desiredTLogCount = tLogWriteAntiQuorum = tLogReplicationFactor = storageTeamSize = desiredLogRouterCount = -1;
tLogVersion = TLogVersion::DEFAULT;
tLogDataStoreType = storageServerStoreType = KeyValueStoreType::END;
tLogSpillType = TLogSpillType::DEFAULT;
autoMasterProxyCount = CLIENT_KNOBS->DEFAULT_AUTO_PROXIES;
autoResolverCount = CLIENT_KNOBS->DEFAULT_AUTO_RESOLVERS;
autoDesiredTLogCount = CLIENT_KNOBS->DEFAULT_AUTO_LOGS;
@ -166,7 +168,12 @@ bool DatabaseConfiguration::isValid() const {
getDesiredProxies() >= 1 &&
getDesiredLogs() >= 1 &&
getDesiredResolvers() >= 1 &&
tLogVersion != TLogVersion::UNSET &&
tLogVersion >= TLogVersion::MIN_RECRUITABLE &&
tLogVersion <= TLogVersion::MAX_SUPPORTED &&
tLogDataStoreType != KeyValueStoreType::END &&
tLogSpillType != TLogSpillType::UNSET &&
!(tLogSpillType == TLogSpillType::REFERENCE && tLogVersion < TLogVersion::V3) &&
storageServerStoreType != KeyValueStoreType::END &&
autoMasterProxyCount >= 1 &&
autoResolverCount >= 1 &&
@ -244,6 +251,10 @@ StatusObject DatabaseConfiguration::toJSON(bool noPolicies) const {
if(!noPolicies) result["log_replication_policy"] = tLogPolicy->info();
}
if ( tLogVersion > TLogVersion::DEFAULT ) {
result["log_version"] = (int)tLogVersion;
}
if( tLogDataStoreType == KeyValueStoreType::SSD_BTREE_V1 && storageServerStoreType == KeyValueStoreType::SSD_BTREE_V1) {
result["storage_engine"] = "ssd-1";
} else if (tLogDataStoreType == KeyValueStoreType::SSD_BTREE_V2 && storageServerStoreType == KeyValueStoreType::SSD_BTREE_V2) {
@ -256,6 +267,8 @@ StatusObject DatabaseConfiguration::toJSON(bool noPolicies) const {
result["storage_engine"] = "custom";
}
result["log_spill"] = (int)tLogSpillType;
if( remoteTLogReplicationFactor == 1 ) {
result["remote_redundancy_mode"] = "remote_single";
} else if( remoteTLogReplicationFactor == 2 ) {
@ -371,11 +384,18 @@ bool DatabaseConfiguration::setInternal(KeyRef key, ValueRef value) {
else if (ck == LiteralStringRef("log_replicas")) parse(&tLogReplicationFactor, value);
else if (ck == LiteralStringRef("log_anti_quorum")) parse(&tLogWriteAntiQuorum, value);
else if (ck == LiteralStringRef("storage_replicas")) parse(&storageTeamSize, value);
else if (ck == LiteralStringRef("log_version")) {
parse((&type), value);
type = std::max((int)TLogVersion::MIN_RECRUITABLE, type);
type = std::min((int)TLogVersion::MAX_SUPPORTED, type);
tLogVersion = (TLogVersion::Version)type;
}
else if (ck == LiteralStringRef("log_engine")) { parse((&type), value); tLogDataStoreType = (KeyValueStoreType::StoreType)type;
// TODO: Remove this once Redwood works as a log engine
if(tLogDataStoreType == KeyValueStoreType::SSD_REDWOOD_V1)
tLogDataStoreType = KeyValueStoreType::SSD_BTREE_V2;
}
else if (ck == LiteralStringRef("log_spill")) { parse((&type), value); tLogSpillType = (TLogSpillType::SpillType)type; }
else if (ck == LiteralStringRef("storage_engine")) { parse((&type), value); storageServerStoreType = (KeyValueStoreType::StoreType)type; }
else if (ck == LiteralStringRef("auto_proxies")) parse(&autoMasterProxyCount, value);
else if (ck == LiteralStringRef("auto_resolvers")) parse(&autoResolverCount, value);

View File

@ -162,7 +162,9 @@ struct DatabaseConfiguration {
int32_t autoDesiredTLogCount;
int32_t tLogWriteAntiQuorum;
int32_t tLogReplicationFactor;
TLogVersion tLogVersion;
KeyValueStoreType tLogDataStoreType;
TLogSpillType tLogSpillType;
// Storage Servers
IRepPolicyRef storagePolicy;

View File

@ -513,6 +513,82 @@ private:
uint32_t type;
};
// Identifies which on-disk/wire format generation a transaction log uses.
// Wraps a plain enum so that the value can be serialized and parsed from text.
struct TLogVersion {
	enum Version {
		UNSET = 0,
		// The real version values below should stay densely packed, so that
		// they can be iterated over easily.
		// V1 = 1, // 4.6 is dispatched to via 6.0
		V2 = 2, // 6.0
		V3 = 3, // 6.1
		MIN_SUPPORTED = V2,
		MAX_SUPPORTED = V3,
		MIN_RECRUITABLE = V2,
		DEFAULT = V2,
	};

	Version version;

	// A default-constructed TLogVersion is UNSET (note: not DEFAULT).
	TLogVersion() : version(UNSET) {}
	TLogVersion( Version v ) : version(v) {}

	// Implicit conversion so instances can be compared against the enumerators directly.
	operator Version() const { return version; }

	// The version travels as a 32-bit unsigned integer.
	template <class Ar>
	void serialize(Ar& ar) {
		uint32_t raw = static_cast<uint32_t>(version);
		serializer(ar, raw);
		version = static_cast<Version>(raw);
	}

	// Parses the textual forms "2" and "3"; any other input yields default_error_or().
	static ErrorOr<TLogVersion> FromStringRef( StringRef s ) {
		if (s == LiteralStringRef("2")) {
			return V2;
		} else if (s == LiteralStringRef("3")) {
			return V3;
		}
		return default_error_or();
	}
};
// Selects how a transaction log spills data (by value or by reference).
struct TLogSpillType {
	// These enumerated values are stored in the database configuration, so can NEVER be changed. Only add new ones just before END.
	enum SpillType {
		UNSET = 0,
		DEFAULT = 1,
		VALUE = 1,
		REFERENCE = 2,
		END = 3,
	};

	// A default-constructed spill type is DEFAULT (which has the same value as VALUE).
	TLogSpillType() : type(DEFAULT) {}

	// Any value at or beyond END is not a known spill type and is stored as UNSET.
	TLogSpillType( SpillType type ) : type( (uint32_t)type >= END ? UNSET : type ) {}

	operator SpillType() const { return SpillType(type); }

	template <class Ar>
	void serialize(Ar& ar) {
		serializer(ar, type);
	}

	// Human-readable name of the spill type; asserts on a value that has no name.
	std::string toString() const {
		if (type == VALUE) {
			return "value";
		} else if (type == REFERENCE) {
			return "reference";
		} else if (type == UNSET) {
			return "unset";
		}
		ASSERT(false);
		return "";
	}

	// Parses the textual forms "1" (VALUE) and "2" (REFERENCE); any other input
	// yields default_error_or().
	static ErrorOr<TLogSpillType> FromStringRef( StringRef s ) {
		if ( s == LiteralStringRef("1") ) {
			return VALUE;
		} else if ( s == LiteralStringRef("2") ) {
			return REFERENCE;
		}
		return default_error_or();
	}

private:
	// Held as a raw 32-bit integer; this is the field that serialize() passes to serializer().
	uint32_t type;
};
//Contains the amount of free and total space for a storage server, in bytes
struct StorageBytes {
int64_t free;

View File

@ -467,6 +467,9 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"resolvers":1,
"storage_replication_policy":"(zoneid^3x1)",
"logs":2,
"log_version":2,
"log_engine":1,
"log_spill":1,
"storage_engine":{
"$enum":[
"ssd",

View File

@ -430,6 +430,10 @@ public:
g_simulator.connectionFailuresDisableDuration = 1e6;
}
// Filesystems on average these days seem to start to have limits of around 255 characters for a
// filename. We add ".part" below, so we need to stay under 250.
ASSERT( basename(filename).size() < 250 );
wait( g_simulator.onMachine( currentProcess ) );
try {
wait( delay(FLOW_KNOBS->MIN_OPEN_TIME + g_random->random01() * (FLOW_KNOBS->MAX_OPEN_TIME - FLOW_KNOBS->MIN_OPEN_TIME) ) );

View File

@ -48,7 +48,8 @@ set(FDBSERVER_SRCS
MoveKeys.actor.h
networktest.actor.cpp
NetworkTest.h
OldTLogServer.actor.cpp
OldTLogServer_4_6.actor.cpp
OldTLogServer_6_0.actor.cpp
Orderer.actor.h
pubsub.actor.cpp
pubsub.h

View File

@ -22,7 +22,9 @@
#define FDBSERVER_DBCORESTATE_H
#pragma once
#include "fdbclient/FDBTypes.h"
#include "fdbrpc/ReplicationPolicy.h"
#include "fdbserver/MasterInterface.h"
// This structure is stored persistently in CoordinatedState and must be versioned carefully!
// It records a synchronous replication topology which can be used in the absence of faults (or under a limited
@ -44,6 +46,7 @@ struct CoreTLogSet {
int8_t locality;
Version startVersion;
std::vector<std::vector<int>> satelliteTagLocations;
TLogVersion tLogVersion;
CoreTLogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), locality(tagLocalityUpgraded), startVersion(invalidVersion) {}
@ -54,7 +57,12 @@ struct CoreTLogSet {
template <class Archive>
void serialize(Archive& ar) {
serializer(ar, tLogs, tLogWriteAntiQuorum, tLogReplicationFactor, tLogPolicy, tLogLocalities, isLocal, locality, startVersion, satelliteTagLocations);
serializer(ar, tLogs, tLogWriteAntiQuorum, tLogReplicationFactor, tLogPolicy, tLogLocalities, isLocal, locality, startVersion, satelliteTagLocations);
if (ar.isDeserializing && ar.protocolVersion() < 0x0FDB00B061030001LL) {
tLogVersion = TLogVersion::V2;
} else {
serializer(ar, tLogVersion);
}
}
};
@ -77,6 +85,7 @@ struct OldTLogCoreData {
else if(ar.isDeserializing) {
tLogs.push_back(CoreTLogSet());
serializer(ar, tLogs[0].tLogs, tLogs[0].tLogWriteAntiQuorum, tLogs[0].tLogReplicationFactor, tLogs[0].tLogPolicy, epochEnd, tLogs[0].tLogLocalities);
tLogs[0].tLogVersion = TLogVersion::V2;
}
}
};
@ -126,6 +135,7 @@ struct DBCoreState {
} else if(ar.isDeserializing) {
tLogs.push_back(CoreTLogSet());
serializer(ar, tLogs[0].tLogs, tLogs[0].tLogWriteAntiQuorum, recoveryCount, tLogs[0].tLogReplicationFactor, logSystemType);
tLogs[0].tLogVersion = TLogVersion::V2;
uint64_t tLocalitySize = (uint64_t)tLogs[0].tLogLocalities.size();
serializer(ar, oldTLogData, tLogs[0].tLogPolicy, tLocalitySize);

View File

@ -1160,8 +1160,9 @@ private:
.detail("Context", context)
.detail("File0Name", rawQueue->files[0].dbgFilename);
for(int i = 1; i >= 0; i--)
if ( firstPages(i).checkHash() && firstPages(i).seq <= (size_t)loc ) {
for(int i = 1; i >= 0; i--) {
ASSERT_WE_THINK( firstPages(i).checkHash() );
if ( firstPages(i).seq <= (size_t)loc ) {
*file = i;
*page = (loc - firstPages(i).seq)/sizeof(Page);
if (context)
@ -1176,6 +1177,7 @@ private:
ok = true;
break;
}
}
if (!ok)
TraceEvent(SevError, "DiskQueueLocationError", dbgid)
.detail("Page0Valid", firstPages(0).checkHash())

View File

@ -39,6 +39,7 @@ public:
int32_t tLogWriteAntiQuorum;
int32_t tLogReplicationFactor;
std::vector< LocalityData > tLogLocalities; // Stores the localities of the log servers
TLogVersion tLogVersion;
IRepPolicyRef tLogPolicy;
LocalitySetRef logServerSet;
std::vector<int> logIndexArray;

View File

@ -60,6 +60,7 @@ struct TLogSet {
std::vector<OptionalInterface<TLogInterface>> logRouters;
int32_t tLogWriteAntiQuorum, tLogReplicationFactor;
std::vector< LocalityData > tLogLocalities; // Stores the localities of the log servers
TLogVersion tLogVersion;
IRepPolicyRef tLogPolicy;
bool isLocal;
int8_t locality;
@ -112,6 +113,12 @@ struct TLogSet {
// Serializes the log set. tLogVersion follows the other fields, except when reading
// data older than protocol version 0x0FDB00B061030001: then no tLogVersion is read
// and V2 is assumed instead.
template <class Ar>
void serialize( Ar& ar ) {
	serializer(ar, tLogs, logRouters, tLogWriteAntiQuorum, tLogReplicationFactor, tLogPolicy, tLogLocalities, isLocal, locality, startVersion, satelliteTagLocations);

	const bool readingOlderFormat = ar.isDeserializing && ar.protocolVersion() < 0x0FDB00B061030001LL;
	if (!readingOlderFormat) {
		serializer(ar, tLogVersion);
	} else {
		tLogVersion = TLogVersion::V2;
	}

	// A log set that has a replication policy must also have a version set.
	ASSERT(tLogPolicy.getPtr() == nullptr || tLogVersion != TLogVersion::UNSET);
}
};

View File

@ -43,7 +43,7 @@ using std::make_pair;
using std::min;
using std::max;
namespace oldTLog {
namespace oldTLog_4_6 {
typedef int16_t OldTag;

File diff suppressed because it is too large Load Diff

View File

@ -826,6 +826,26 @@ void SimulationConfig::generateNormalConfig(int minimumReplication, int minimumR
ASSERT(false); // Programmer forgot to adjust cases.
}
if (g_random->random01() < 0.5) {
if (g_random->random01() < 0.5) {
set_config("log_spill:=1"); // VALUE
}
int logVersion = g_random->randomInt( 0, 3 );
switch (logVersion) {
case 0:
break;
case 1:
set_config("log_version:=2"); // 6.0
break;
case 2:
set_config("log_version:=3"); // 6.1
break;
}
} else {
set_config("log_version:=3"); // 6.1
set_config("log_spill:=2"); // REFERENCE
}
if(generateFearless || (datacenters == 2 && g_random->random01() < 0.5)) {
//The kill region workload relies on the fact that "0", "2", and "4" are all of the possible primary dcids.
StatusObject primaryObj;

View File

@ -181,6 +181,8 @@ private:
////// Persistence format (for self->persistentData)
// Immutable keys
// persistFormat has been mostly invalidated by TLogVersion, and can probably be removed when
// 4.6's TLog code is removed.
static const KeyValueRef persistFormat( LiteralStringRef( "Format" ), LiteralStringRef("FoundationDB/LogServer/2/4") );
static const KeyRangeRef persistFormatReadableRange( LiteralStringRef("FoundationDB/LogServer/2/3"), LiteralStringRef("FoundationDB/LogServer/2/5") );
static const KeyRangeRef persistRecoveryCountKeys = KeyRangeRef( LiteralStringRef( "DbRecoveryCount/" ), LiteralStringRef( "DbRecoveryCount0" ) );
@ -1713,7 +1715,7 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
tlogRequests.getFuture().pop().reply.sendError(recruitment_failed());
}
wait( oldTLog::tLog(self->persistentData, self->rawPersistentQueue, self->dbInfo, locality, self->dbgid) );
wait( oldTLog_4_6::tLog(self->persistentData, self->rawPersistentQueue, self->dbInfo, locality, self->dbgid) );
throw internal_error();
}

View File

@ -145,6 +145,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
for( auto& log : tLogSet.logRouters) {
logSet->logRouters.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( log ) ) );
}
logSet->tLogVersion = tLogSet.tLogVersion;
logSet->tLogWriteAntiQuorum = tLogSet.tLogWriteAntiQuorum;
logSet->tLogReplicationFactor = tLogSet.tLogReplicationFactor;
logSet->tLogPolicy = tLogSet.tLogPolicy;
@ -171,6 +172,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
for( auto & log : tLogData.logRouters) {
logSet->logRouters.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( log ) ) );
}
logSet->tLogVersion = tLogData.tLogVersion;
logSet->tLogWriteAntiQuorum = tLogData.tLogWriteAntiQuorum;
logSet->tLogReplicationFactor = tLogData.tLogReplicationFactor;
logSet->tLogPolicy = tLogData.tLogPolicy;
@ -207,6 +209,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
for( auto & log : tLogSet.logRouters) {
logSet->logRouters.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( log ) ) );
}
logSet->tLogVersion = tLogSet.tLogVersion;
logSet->tLogWriteAntiQuorum = tLogSet.tLogWriteAntiQuorum;
logSet->tLogReplicationFactor = tLogSet.tLogReplicationFactor;
logSet->tLogPolicy = tLogSet.tLogPolicy;
@ -234,6 +237,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
for( auto & log : tLogSet.logRouters) {
logSet->logRouters.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( log ) ) );
}
logSet->tLogVersion = tLogSet.tLogVersion;
logSet->tLogWriteAntiQuorum = tLogSet.tLogWriteAntiQuorum;
logSet->tLogReplicationFactor = tLogSet.tLogReplicationFactor;
logSet->tLogPolicy = tLogSet.tLogPolicy;
@ -271,6 +275,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
coreSet.tLogs.push_back(log->get().id());
coreSet.tLogLocalities.push_back(log->get().interf().locality);
}
coreSet.tLogVersion = t->tLogVersion;
coreSet.tLogWriteAntiQuorum = t->tLogWriteAntiQuorum;
coreSet.tLogReplicationFactor = t->tLogReplicationFactor;
coreSet.tLogPolicy = t->tLogPolicy;
@ -293,6 +298,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
coreSet.tLogs.push_back(log->get().id());
}
coreSet.tLogLocalities = t->tLogLocalities;
coreSet.tLogVersion = t->tLogVersion;
coreSet.tLogWriteAntiQuorum = t->tLogWriteAntiQuorum;
coreSet.tLogReplicationFactor = t->tLogReplicationFactor;
coreSet.tLogPolicy = t->tLogPolicy;
@ -1032,6 +1038,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
if(logSet->isLocal || remoteLogsWrittenToCoreState) {
logSystemConfig.tLogs.push_back(TLogSet());
TLogSet& log = logSystemConfig.tLogs.back();
log.tLogVersion = logSet->tLogVersion;
log.tLogWriteAntiQuorum = logSet->tLogWriteAntiQuorum;
log.tLogReplicationFactor = logSet->tLogReplicationFactor;
log.tLogPolicy = logSet->tLogPolicy;
@ -1059,6 +1066,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
for( int j = 0; j < oldLogData[i].tLogs.size(); j++ ) {
TLogSet& log = logSystemConfig.oldTLogs[i].tLogs[j];
Reference<LogSet> logSet = oldLogData[i].tLogs[j];
log.tLogVersion = logSet->tLogVersion;
log.tLogWriteAntiQuorum = logSet->tLogWriteAntiQuorum;
log.tLogReplicationFactor = logSet->tLogReplicationFactor;
log.tLogPolicy = logSet->tLogPolicy;
@ -1384,6 +1392,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
failed.push_back( Reference<AsyncVar<bool>>( new AsyncVar<bool>() ) );
failureTrackers.push_back( monitorLog(logVar, failed[j] ) );
}
logSet->tLogVersion = coreSet.tLogVersion;
logSet->tLogReplicationFactor = coreSet.tLogReplicationFactor;
logSet->tLogWriteAntiQuorum = coreSet.tLogWriteAntiQuorum;
logSet->tLogPolicy = coreSet.tLogPolicy;
@ -1410,6 +1419,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSet->logServers.push_back( logVar );
allLogServers.push_back( logVar );
}
logSet->tLogVersion = log.tLogVersion;
logSet->tLogReplicationFactor = log.tLogReplicationFactor;
logSet->tLogWriteAntiQuorum = log.tLogWriteAntiQuorum;
logSet->tLogPolicy = log.tLogPolicy;
@ -1729,6 +1739,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
state Reference<LogSet> logSet = Reference<LogSet>( new LogSet() );
logSet->tLogReplicationFactor = configuration.getRemoteTLogReplicationFactor();
logSet->tLogVersion = configuration.tLogVersion;
logSet->tLogPolicy = configuration.getRemoteTLogPolicy();
logSet->isLocal = false;
logSet->locality = remoteLocality;
@ -1786,7 +1797,9 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
for( int i = 0; i < remoteWorkers.remoteTLogs.size(); i++ ) {
InitializeTLogRequest &req = remoteTLogReqs[i];
req.recruitmentID = self->recruitmentID;
req.logVersion = configuration.tLogVersion;
req.storeType = configuration.tLogDataStoreType;
req.spillType = configuration.tLogSpillType;
req.recoverFrom = oldLogSystem->getLogSystemConfig();
req.recoverAt = oldLogSystem->recoverAt.get();
req.knownCommittedVersion = oldLogSystem->knownCommittedVersion;
@ -1858,6 +1871,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
logSystem->tLogs.push_back( Reference<LogSet>( new LogSet() ) );
logSystem->tLogs[0]->tLogVersion = configuration.tLogVersion;
logSystem->tLogs[0]->tLogWriteAntiQuorum = configuration.tLogWriteAntiQuorum;
logSystem->tLogs[0]->tLogReplicationFactor = configuration.tLogReplicationFactor;
logSystem->tLogs[0]->tLogPolicy = configuration.tLogPolicy;
@ -1879,6 +1893,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
logSystem->tLogs[1]->isLocal = true;
logSystem->tLogs[1]->locality = tagLocalitySatellite;
logSystem->tLogs[1]->tLogVersion = configuration.tLogVersion;
logSystem->tLogs[1]->startVersion = oldLogSystem->knownCommittedVersion + 1;
logSystem->tLogs[1]->tLogLocalities.resize( recr.satelliteTLogs.size() );
@ -1962,7 +1977,9 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
for( int i = 0; i < recr.tLogs.size(); i++ ) {
InitializeTLogRequest &req = reqs[i];
req.recruitmentID = logSystem->recruitmentID;
req.logVersion = configuration.tLogVersion;
req.storeType = configuration.tLogDataStoreType;
req.spillType = configuration.tLogSpillType;
req.recoverFrom = oldLogSystem->getLogSystemConfig();
req.recoverAt = oldLogSystem->recoverAt.get();
req.knownCommittedVersion = oldLogSystem->knownCommittedVersion;
@ -2005,7 +2022,9 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
for( int i = 0; i < recr.satelliteTLogs.size(); i++ ) {
InitializeTLogRequest &req = sreqs[i];
req.recruitmentID = logSystem->recruitmentID;
req.logVersion = configuration.tLogVersion;
req.storeType = configuration.tLogDataStoreType;
req.spillType = configuration.tLogSpillType;
req.recoverFrom = oldLogSystem->getLogSystemConfig();
req.recoverAt = oldLogSystem->recoverAt.get();
req.knownCommittedVersion = oldLogSystem->knownCommittedVersion;

View File

@ -80,7 +80,9 @@ struct InitializeTLogRequest {
LogEpoch epoch;
std::vector<Tag> recoverTags;
std::vector<Tag> allTags;
TLogVersion logVersion;
KeyValueStoreType storeType;
TLogSpillType spillType;
Tag remoteTag;
int8_t locality;
bool isPrimary;
@ -93,7 +95,7 @@ struct InitializeTLogRequest {
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, recruitmentID, recoverFrom, recoverAt, knownCommittedVersion, epoch, recoverTags, allTags, storeType, remoteTag, locality, isPrimary, startVersion, logRouterTags, reply);
serializer(ar, recruitmentID, recoverFrom, recoverAt, knownCommittedVersion, epoch, recoverTags, allTags, storeType, remoteTag, locality, isPrimary, startVersion, logRouterTags, reply, logVersion, spillType);
}
};
@ -363,10 +365,18 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface ddi, Reference<Async
void registerThreadForProfiling();
void updateCpuProfiler(ProfilerRequest req);
namespace oldTLog {
namespace oldTLog_4_6 {
ACTOR Future<Void> tLog(IKeyValueStore* persistentData, IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality, UID tlogId);
}
// Forward declaration of the tLog() actor that lives in the oldTLog_6_0 namespace.
// NOTE(review): presumably implemented in OldTLogServer_6_0.actor.cpp and kept for the
// 6.0 log format -- confirm against that file.
namespace oldTLog_6_0 {
ACTOR Future<Void> tLog(IKeyValueStore* persistentData, IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> db, LocalityData locality,
PromiseStream<InitializeTLogRequest> tlogRequests, UID tlogId, bool restoreFromDisk,
Promise<Void> oldLog, Promise<Void> recovered);
}
typedef decltype(&tLog) TLogFn;
#include "flow/unactorcompiler.h"
#endif

View File

@ -57,8 +57,9 @@
<ActorCompiler Include="LogSystemPeekCursor.actor.cpp" />
<ActorCompiler Include="MemoryPager.actor.cpp" />
<ActorCompiler Include="LogRouter.actor.cpp" />
<ActorCompiler Include="OldTLogServer.actor.cpp" />
<ClCompile Include="LatencyBandConfig.cpp" />
<ActorCompiler Include="OldTLogServer_4_6.actor.cpp" />
<ActorCompiler Include="OldTLogServer_6_0.actor.cpp" />
<ClCompile Include="SkipList.cpp" />
<ActorCompiler Include="WaitFailure.actor.cpp" />
<ActorCompiler Include="tester.actor.cpp" />

View File

@ -18,6 +18,8 @@
* limitations under the License.
*/
#include <boost/lexical_cast.hpp>
#include "flow/ActorCollection.h"
#include "flow/SystemMonitor.h"
#include "flow/TDMetric.actor.h"
@ -184,6 +186,7 @@ ACTOR Future<Void> loadedPonger( FutureStream<LoadedPingRequest> pings ) {
StringRef fileStoragePrefix = LiteralStringRef("storage-");
StringRef fileLogDataPrefix = LiteralStringRef("log-");
StringRef fileVersionedLogDataPrefix = LiteralStringRef("log2-");
StringRef fileLogQueuePrefix = LiteralStringRef("logqueue-");
StringRef tlogQueueExtension = LiteralStringRef("fdq");
@ -220,13 +223,68 @@ std::string filenameFromId( KeyValueStoreType storeType, std::string folder, std
UNREACHABLE();
}
struct DiskStore {
enum COMPONENT { TLogData, Storage };
struct TLogOptions {
TLogOptions() = default;
TLogOptions( TLogVersion v, TLogSpillType s ) : version(v), spillType(s) {}
UID storeID;
std::string filename; // For KVStoreMemory just the base filename to be passed to IDiskQueue
COMPONENT storedComponent;
KeyValueStoreType storeType;
TLogVersion version = TLogVersion::DEFAULT;
TLogSpillType spillType = TLogSpillType::DEFAULT;
// Parses an option string made of "_"-separated key/value pairs, e.g. "V_3_LS_2".
// Recognized keys are "V" (TLogVersion) and "LS" (TLogSpillType). Options that do not
// appear keep their default values. An unknown key, a key without a value, or a value
// that fails to parse yields an error.
static ErrorOr<TLogOptions> FromStringRef( StringRef s ) {
	TLogOptions parsed;
	while (true) {
		StringRef key = s.eat("_");
		StringRef value = s.eat("_");

		// Finished once nothing is left to read and the last read produced no key.
		if (s.size() == 0 && key.size() == 0) {
			break;
		}

		// A key must always be followed by a value.
		if (key.size() != 0 && value.size() == 0) {
			return default_error_or();
		}

		if (key == LiteralStringRef("V")) {
			ErrorOr<TLogVersion> parsedVersion = TLogVersion::FromStringRef(value);
			if (parsedVersion.isError()) {
				return parsedVersion.getError();
			}
			parsed.version = parsedVersion.get();
		} else if (key == LiteralStringRef("LS")) {
			ErrorOr<TLogSpillType> parsedSpillType = TLogSpillType::FromStringRef(value);
			if (parsedSpillType.isError()) {
				return parsedSpillType.getError();
			}
			parsed.spillType = parsedSpillType.get();
		} else {
			return default_error_or();
		}
	}
	return parsed;
}
bool operator == ( const TLogOptions& o ) {
return version == o.version && spillType == o.spillType;
}
// Builds the option portion of a filename prefix, e.g. "V_3_LS_2-".
// Version V2 produces an empty string, so no option text is added for it.
std::string toPrefix() const {
	if (version == TLogVersion::V2) {
		return "";
	}

	std::string encoded = "V_" + boost::lexical_cast<std::string>(version);
	encoded = encoded + "_LS_" + boost::lexical_cast<std::string>(spillType);

	// The encoded form must parse back into exactly these options.
	ASSERT_WE_THINK( FromStringRef( encoded ).get() == *this );
	return encoded + "-";
}
};
// Chooses the TLog implementation to run for a (version, spill type) pair:
//   V2 + VALUE      -> oldTLog_6_0::tLog
//   V2 + REFERENCE  -> not supported (asserts)
//   V3 + VALUE      -> oldTLog_6_0::tLog
//   V3 + REFERENCE  -> tLog
// Every other combination asserts.
TLogFn tLogFnForOptions( TLogOptions options ) {
	const bool spillsByValue = options.spillType == TLogSpillType::VALUE;
	const bool spillsByReference = options.spillType == TLogSpillType::REFERENCE;

	if ( options.version == TLogVersion::V2 ) {
		if ( spillsByValue ) return oldTLog_6_0::tLog;
		if ( spillsByReference ) ASSERT(false);
	} else if ( options.version == TLogVersion::V3 ) {
		if ( spillsByValue ) return oldTLog_6_0::tLog;
		if ( spillsByReference ) return tLog;
	}

	ASSERT(false);
	return tLog;
}
// Describes one store found on disk. Every field starts out with an explicit
// "nothing yet" default.
struct DiskStore {
	// What the store holds; UNSET until it has been classified.
	enum COMPONENT { TLogData, Storage, UNSET };

	UID storeID = UID();
	// For KVStoreMemory just the base filename to be passed to IDiskQueue
	std::string filename;
	COMPONENT storedComponent{ UNSET };
	KeyValueStoreType storeType = KeyValueStoreType::END;
	// TLog options associated with this store.
	TLogOptions tLogOptions;
};
std::vector< DiskStore > getDiskStores( std::string folder, std::string suffix, KeyValueStoreType type) {
@ -237,13 +295,32 @@ std::vector< DiskStore > getDiskStores( std::string folder, std::string suffix,
DiskStore store;
store.storeType = type;
StringRef prefix;
if( StringRef( files[idx] ).startsWith( fileStoragePrefix ) ) {
StringRef filename = StringRef( files[idx] );
Standalone<StringRef> prefix;
if( filename.startsWith( fileStoragePrefix ) ) {
store.storedComponent = DiskStore::Storage;
prefix = fileStoragePrefix;
}
else if( StringRef( files[idx] ).startsWith( fileLogDataPrefix ) ) {
else if( filename.startsWith( fileVersionedLogDataPrefix ) ) {
store.storedComponent = DiskStore::TLogData;
// Use the option string that's in the file rather than tLogOptions.toPrefix(),
// because they might be different if a new option was introduced in this version.
StringRef optionsString = filename.removePrefix(fileVersionedLogDataPrefix).eat("-");
TraceEvent("DiskStoreVersioned").detail("Filename", printable(filename));
ErrorOr<TLogOptions> tLogOptions = TLogOptions::FromStringRef(optionsString);
if (tLogOptions.isError()) {
TraceEvent(SevWarn, "DiskStoreMalformedFilename").detail("Filename", printable(filename));
continue;
}
TraceEvent("DiskStoreVersionedSuccess").detail("Filename", printable(filename));
store.tLogOptions = tLogOptions.get();
prefix = filename.substr(0, fileVersionedLogDataPrefix.size() + optionsString.size() + 1);
}
else if( filename.startsWith( fileLogDataPrefix ) ) {
TraceEvent("DiskStoreUnversioned").detail("Filename", printable(filename));
store.storedComponent = DiskStore::TLogData;
store.tLogOptions.version = TLogVersion::V2;
store.tLogOptions.spillType = TLogSpillType::VALUE;
prefix = fileLogDataPrefix;
}
else
@ -540,7 +617,13 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
state WorkerCache<InitializeStorageReply> storageCache;
state Reference<AsyncVar<ServerDBInfo>> dbInfo( new AsyncVar<ServerDBInfo>(ServerDBInfo()) );
state Future<Void> metricsLogger;
state std::map<KeyValueStoreType::StoreType, std::pair<Future<Void>, PromiseStream<InitializeTLogRequest>>> sharedLogs;
// tLogFnForOptions() can return a function that doesn't correspond with the FDB version that the
// TLogVersion represents. This can be done if the newer TLog doesn't support a requested option.
// As (store type, spill type) can map to the same TLogFn across multiple TLogVersions, we need to
// decide if we should collapse them into the same SharedTLog instance as well. The answer
// here is no, so that when running with log_version==3, all files should say V=3.
state std::map<std::tuple<TLogVersion, KeyValueStoreType::StoreType, TLogSpillType>,
std::pair<Future<Void>, PromiseStream<InitializeTLogRequest>>> sharedLogs;
state WorkerInterface interf( locality );
@ -625,9 +708,18 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
f = storageServerRollbackRebooter( f, s.storeType, s.filename, recruited.id(), recruited.locality, dbInfo, folder, &filesClosed, memoryLimit, kv);
errorForwarders.add( forwardError( errors, Role::STORAGE_SERVER, recruited.id(), f ) );
} else if( s.storedComponent == DiskStore::TLogData ) {
std::string logQueueBasename;
const std::string filename = basename(s.filename);
if (StringRef(filename).startsWith(fileLogDataPrefix)) {
logQueueBasename = fileLogQueuePrefix.toString();
} else {
StringRef optionsString = StringRef(filename).removePrefix(fileVersionedLogDataPrefix).eat("-");
logQueueBasename = fileLogQueuePrefix.toString() + optionsString.toString() + "-";
}
ASSERT_WE_THINK( StringRef( parentDirectory(s.filename) ).endsWith( StringRef(folder) ) );
IKeyValueStore* kv = openKVStore( s.storeType, s.filename, s.storeID, memoryLimit, validateDataFiles );
IDiskQueue* queue = openDiskQueue(
joinPath( folder, fileLogQueuePrefix.toString() + s.storeID.toString() + "-"), tlogQueueExtension.toString(), s.storeID, 10*SERVER_KNOBS->TARGET_BYTES_PER_TLOG);
joinPath( folder, logQueueBasename + s.storeID.toString() + "-"), tlogQueueExtension.toString(), s.storeID, 10*SERVER_KNOBS->TARGET_BYTES_PER_TLOG);
filesClosed.add( kv->onClosed() );
filesClosed.add( queue->onClosed() );
@ -637,8 +729,11 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
Promise<Void> oldLog;
Promise<Void> recovery;
auto& logData = sharedLogs[s.storeType];
Future<Void> tl = tLog( kv, queue, dbInfo, locality, !logData.first.isValid() || logData.first.isReady() ? logData.second : PromiseStream<InitializeTLogRequest>(), s.storeID, true, oldLog, recovery );
TLogFn tLogFn = tLogFnForOptions(s.tLogOptions);
auto& logData = sharedLogs[std::make_tuple(s.tLogOptions.version, s.storeType, s.tLogOptions.spillType)];
// FIXME: If logData.first isValid && !isReady, shouldn't we
// be sending a fake InitializeTLogRequest rather than calling tLog() ?
Future<Void> tl = tLogFn( kv, queue, dbInfo, locality, !logData.first.isValid() || logData.first.isReady() ? logData.second : PromiseStream<InitializeTLogRequest>(), s.storeID, true, oldLog, recovery );
recoveries.push_back(recovery.getFuture());
tl = handleIOErrors( tl, kv, s.storeID );
@ -741,7 +836,19 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
req.reply.send(recruited);
}
when( InitializeTLogRequest req = waitNext(interf.tLog.getFuture()) ) {
auto& logData = sharedLogs[req.storeType];
// For now, there's a one-to-one mapping of spill type to TLogVersion.
// With future work, a particular version of the TLog can support multiple
// different spilling strategies, at which point SpillType will need to be
// plumbed down into tLogFn.
if (req.logVersion < TLogVersion::MIN_RECRUITABLE) {
TraceEvent(SevError, "InitializeTLogInvalidLogVersion")
.detail("Version", req.logVersion)
.detail("MinRecruitable", TLogVersion::MIN_RECRUITABLE);
req.reply.sendError(internal_error());
}
TLogOptions tLogOptions(req.logVersion, req.spillType);
TLogFn tLogFn = tLogFnForOptions(tLogOptions);
auto& logData = sharedLogs[std::make_tuple(req.logVersion, req.storeType, req.spillType)];
logData.second.send(req);
if(!logData.first.isValid() || logData.first.isReady()) {
UID logId = g_random->randomUniqueID();
@ -752,13 +859,14 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
//FIXME: start role for every tlog instance, rather than just for the shared actor, also use a different role type for the shared actor
startRole( Role::SHARED_TRANSACTION_LOG, logId, interf.id(), details );
std::string filename = filenameFromId( req.storeType, folder, fileLogDataPrefix.toString(), logId );
const StringRef prefix = req.logVersion > TLogVersion::V2 ? fileVersionedLogDataPrefix : fileLogDataPrefix;
std::string filename = filenameFromId( req.storeType, folder, prefix.toString() + tLogOptions.toPrefix(), logId );
IKeyValueStore* data = openKVStore( req.storeType, filename, logId, memoryLimit );
IDiskQueue* queue = openDiskQueue( joinPath( folder, fileLogQueuePrefix.toString() + logId.toString() + "-" ), tlogQueueExtension.toString(), logId );
IDiskQueue* queue = openDiskQueue( joinPath( folder, fileLogQueuePrefix.toString() + tLogOptions.toPrefix() + logId.toString() + "-" ), tlogQueueExtension.toString(), logId );
filesClosed.add( data->onClosed() );
filesClosed.add( queue->onClosed() );
logData.first = tLog( data, queue, dbInfo, locality, logData.second, logId, false, Promise<Void>(), Promise<Void>() );
logData.first = tLogFn( data, queue, dbInfo, locality, logData.second, logId, false, Promise<Void>(), Promise<Void>() );
logData.first = handleIOErrors( logData.first, data, logId );
logData.first = handleIOErrors( logData.first, queue, logId );
errorForwarders.add( forwardError( errors, Role::SHARED_TRANSACTION_LOG, logId, logData.first ) );
@ -900,8 +1008,28 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
included = fileExists(d.filename + "1.fdq");
}
if(d.storedComponent == DiskStore::COMPONENT::TLogData && included) {
std::string basename = fileLogQueuePrefix.toString() + d.filename.substr(fileLogDataPrefix.size());
included = fileExists(basename + "0.fdq") && fileExists(basename + "1.fdq");
included = false;
// The previous code assumed that d.filename is a filename. But that is not true.
// d.filename is a path. Removing a prefix and adding a new one just makes a broken
// directory name. So fileExists would always return false.
// Weirdly, this doesn't break anything, as tested by taking a clean checkout of FDB,
// setting included to false always, and then running correctness. So I'm just
// improving the situation by actually marking it as broken.
// FIXME: this whole thing
/*
std::string logDataBasename;
StringRef filename = d.filename;
if (filename.startsWith(fileLogDataPrefix)) {
logDataBasename = fileLogQueuePrefix.toString() + d.filename.substr(fileLogDataPrefix.size());
} else {
StringRef optionsString = filename.removePrefix(fileVersionedLogDataPrefix).eat("-");
logDataBasename = fileLogQueuePrefix.toString() + optionsString.toString() + "-";
}
TraceEvent("DiskStoreRequest").detail("FilenameBasename", logDataBasename);
if (fileExists(logDataBasename + "0.fdq") && fileExists(logDataBasename + "1.fdq")) {
included = true;
}
*/
}
}
if(included) {

View File

@ -27,6 +27,7 @@
// "ssd" is an alias to the preferred type which skews the random distribution toward it but that's okay.
static const char* storeTypes[] = { "ssd", "ssd-1", "ssd-2", "memory" };
// Log-related configuration strings (log engine, log spill type, log version). The workload picks one
// at random and passes it to changeConfig; some of the resulting configurations will be invalid, which is fine.
static const char* logTypes[] = { "log_engine:=1", "log_engine:=2", "log_spill:=1", "log_spill:=2", "log_version:=2", "log_version:=3" };
static const char* redundancies[] = { "single", "double", "triple" };
std::string generateRegions() {
@ -261,7 +262,7 @@ struct ConfigureDatabaseWorkload : TestWorkload {
if(g_simulator.speedUpSimulation) {
return Void();
}
state int randomChoice = g_random->randomInt(0, 6);
state int randomChoice = g_random->randomInt(0, 7);
if( randomChoice == 0 ) {
double waitDuration = 3.0 * g_random->random01();
//TraceEvent("ConfigureTestWaitAfter").detail("WaitDuration",waitDuration);
@ -344,6 +345,10 @@ struct ConfigureDatabaseWorkload : TestWorkload {
else if ( randomChoice == 5) {
wait(success( changeConfig( cx, storeTypes[g_random->randomInt( 0, sizeof(storeTypes)/sizeof(storeTypes[0]))], true ) ));
}
else if ( randomChoice == 6 ) {
// Some configurations will be invalid, and that's fine.
wait(success( changeConfig( cx, logTypes[g_random->randomInt( 0, sizeof(logTypes)/sizeof(logTypes[0]))], false ) ));
}
else {
ASSERT(false);
}

View File

@ -129,6 +129,9 @@ add_fdb_test(TEST_FILES rare/LargeApiCorrectnessStatus.txt)
add_fdb_test(TEST_FILES rare/RYWDisable.txt)
add_fdb_test(TEST_FILES rare/RandomReadWriteTest.txt)
add_fdb_test(TEST_FILES rare/SwizzledLargeApiCorrectness.txt)
add_fdb_test(
TEST_FILES restarting/ConfigureTestRestart-1.txt
restarting/ConfigureTestRestart-2.txt)
add_fdb_test(
TEST_FILES restarting/CycleTestRestart-1.txt
restarting/CycleTestRestart-2.txt)

View File

@ -0,0 +1,15 @@
testTitle=CloggedConfigureDatabaseTest
testName=ConfigureDatabase
testDuration=30.0
testName=RandomClogging
testDuration=30.0
testName=RandomClogging
testDuration=30.0
scale=0.1
clogginess=2.0
testName=SaveAndKill
restartInfoLocation=simfdb/restartInfo.ini
testDuration=30.0

View File

@ -0,0 +1,12 @@
testTitle=CloggedConfigureDatabaseTest
runSetup=false
testName=ConfigureDatabase
testDuration=300.0
testName=RandomClogging
testDuration=300.0
testName=RandomClogging
testDuration=300.0
scale=0.1
clogginess=2.0

View File

@ -8,4 +8,5 @@ testTitle=CloggedConfigureDatabaseTest
testName=RandomClogging
testDuration=300.0
scale=0.1
clogginess=2.0
clogginess=2.0