Change many things about log_version.
* log_version in the database (`/conf/log_version`) is now a hint that gets rounded to the nearest supported version. * fdbcli and FDB enforce that only a valid log_version can be configured to * TLogVersion is persisted in CoreTLogSet (and LogSet and TLogSet) * Some comments here and there * Add an assert on filename length to make sure KV-pairs in filename don't exceed a maximum length.
This commit is contained in:
parent
6d23eb2d1a
commit
2dc57568cb
|
@ -250,7 +250,7 @@ StatusObject DatabaseConfiguration::toJSON(bool noPolicies) const {
|
|||
if(!noPolicies) result["log_replication_policy"] = tLogPolicy->info();
|
||||
}
|
||||
|
||||
if ( tLogVersion != TLogVersion::DEFAULT ) {
|
||||
if ( tLogVersion > TLogVersion::DEFAULT ) {
|
||||
result["log_version"] = (int)tLogVersion;
|
||||
}
|
||||
|
||||
|
@ -266,9 +266,7 @@ StatusObject DatabaseConfiguration::toJSON(bool noPolicies) const {
|
|||
result["storage_engine"] = "custom";
|
||||
}
|
||||
|
||||
if ( tLogSpillType != TLogSpillType::DEFAULT ) {
|
||||
result["log_spill"] = (int)tLogSpillType;
|
||||
}
|
||||
result["log_spill"] = (int)tLogSpillType;
|
||||
|
||||
if( remoteTLogReplicationFactor == 1 ) {
|
||||
result["remote_redundancy_mode"] = "remote_single";
|
||||
|
@ -381,7 +379,12 @@ bool DatabaseConfiguration::setInternal(KeyRef key, ValueRef value) {
|
|||
else if (ck == LiteralStringRef("log_replicas")) parse(&tLogReplicationFactor, value);
|
||||
else if (ck == LiteralStringRef("log_anti_quorum")) parse(&tLogWriteAntiQuorum, value);
|
||||
else if (ck == LiteralStringRef("storage_replicas")) parse(&storageTeamSize, value);
|
||||
else if (ck == LiteralStringRef("log_version")) { parse((&type), value); tLogVersion = (TLogVersion::Version)type; }
|
||||
else if (ck == LiteralStringRef("log_version")) {
|
||||
parse((&type), value);
|
||||
type = std::max((int)TLogVersion::MIN_RECRUITABLE, type);
|
||||
type = std::min((int)TLogVersion::MAX_SUPPORTED, type);
|
||||
tLogVersion = (TLogVersion::Version)type;
|
||||
}
|
||||
else if (ck == LiteralStringRef("log_engine")) { parse((&type), value); tLogDataStoreType = (KeyValueStoreType::StoreType)type;
|
||||
// TODO: Remove this once Redwood works as a log engine
|
||||
if(tLogDataStoreType == KeyValueStoreType::SSD_REDWOOD_V1)
|
||||
|
|
|
@ -430,6 +430,10 @@ public:
|
|||
g_simulator.connectionFailuresDisableDuration = 1e6;
|
||||
}
|
||||
|
||||
// Filesystems on average these days seem to start to have limits of around 255 characters for a
|
||||
// filename. We add ".part" below, so we need to stay under 250.
|
||||
ASSERT( basename(filename).size() < 250 );
|
||||
|
||||
wait( g_simulator.onMachine( currentProcess ) );
|
||||
try {
|
||||
wait( delay(FLOW_KNOBS->MIN_OPEN_TIME + g_random->random01() * (FLOW_KNOBS->MAX_OPEN_TIME - FLOW_KNOBS->MIN_OPEN_TIME) ) );
|
||||
|
|
|
@ -22,7 +22,9 @@
|
|||
#define FDBSERVER_DBCORESTATE_H
|
||||
#pragma once
|
||||
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbrpc/ReplicationPolicy.h"
|
||||
#include "fdbserver/MasterInterface.h"
|
||||
|
||||
// This structure is stored persistently in CoordinatedState and must be versioned carefully!
|
||||
// It records a synchronous replication topology which can be used in the absence of faults (or under a limited
|
||||
|
@ -44,6 +46,7 @@ struct CoreTLogSet {
|
|||
int8_t locality;
|
||||
Version startVersion;
|
||||
std::vector<std::vector<int>> satelliteTagLocations;
|
||||
TLogVersion tLogVersion;
|
||||
|
||||
CoreTLogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), locality(tagLocalityUpgraded), startVersion(invalidVersion) {}
|
||||
|
||||
|
@ -54,7 +57,12 @@ struct CoreTLogSet {
|
|||
|
||||
template <class Archive>
|
||||
void serialize(Archive& ar) {
|
||||
serializer(ar, tLogs, tLogWriteAntiQuorum, tLogReplicationFactor, tLogPolicy, tLogLocalities, isLocal, locality, startVersion, satelliteTagLocations);
|
||||
if (ar.isDeserializing && ar.protocolVersion() < 0x0FDB00B061030001LL) {
|
||||
serializer(ar, tLogs, tLogWriteAntiQuorum, tLogReplicationFactor, tLogPolicy, tLogLocalities, isLocal, locality, startVersion, satelliteTagLocations);
|
||||
tLogVersion = TLogVersion::V2;
|
||||
} else {
|
||||
serializer(ar, tLogs, tLogWriteAntiQuorum, tLogReplicationFactor, tLogPolicy, tLogLocalities, isLocal, locality, startVersion, satelliteTagLocations, tLogVersion);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -77,6 +85,7 @@ struct OldTLogCoreData {
|
|||
else if(ar.isDeserializing) {
|
||||
tLogs.push_back(CoreTLogSet());
|
||||
serializer(ar, tLogs[0].tLogs, tLogs[0].tLogWriteAntiQuorum, tLogs[0].tLogReplicationFactor, tLogs[0].tLogPolicy, epochEnd, tLogs[0].tLogLocalities);
|
||||
tLogs[0].tLogVersion = TLogVersion::V2;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
@ -126,6 +135,7 @@ struct DBCoreState {
|
|||
} else if(ar.isDeserializing) {
|
||||
tLogs.push_back(CoreTLogSet());
|
||||
serializer(ar, tLogs[0].tLogs, tLogs[0].tLogWriteAntiQuorum, recoveryCount, tLogs[0].tLogReplicationFactor, logSystemType);
|
||||
tLogs[0].tLogVersion = TLogVersion::V2;
|
||||
|
||||
uint64_t tLocalitySize = (uint64_t)tLogs[0].tLogLocalities.size();
|
||||
serializer(ar, oldTLogData, tLogs[0].tLogPolicy, tLocalitySize);
|
||||
|
|
|
@ -39,6 +39,7 @@ public:
|
|||
int32_t tLogWriteAntiQuorum;
|
||||
int32_t tLogReplicationFactor;
|
||||
std::vector< LocalityData > tLogLocalities; // Stores the localities of the log servers
|
||||
TLogVersion tLogVersion;
|
||||
IRepPolicyRef tLogPolicy;
|
||||
LocalitySetRef logServerSet;
|
||||
std::vector<int> logIndexArray;
|
||||
|
|
|
@ -60,6 +60,7 @@ struct TLogSet {
|
|||
std::vector<OptionalInterface<TLogInterface>> logRouters;
|
||||
int32_t tLogWriteAntiQuorum, tLogReplicationFactor;
|
||||
std::vector< LocalityData > tLogLocalities; // Stores the localities of the log servers
|
||||
TLogVersion tLogVersion;
|
||||
IRepPolicyRef tLogPolicy;
|
||||
bool isLocal;
|
||||
int8_t locality;
|
||||
|
@ -111,7 +112,13 @@ struct TLogSet {
|
|||
|
||||
template <class Ar>
|
||||
void serialize( Ar& ar ) {
|
||||
serializer(ar, tLogs, logRouters, tLogWriteAntiQuorum, tLogReplicationFactor, tLogPolicy, tLogLocalities, isLocal, locality, startVersion, satelliteTagLocations);
|
||||
if (ar.isDeserializing && ar.protocolVersion() < 0x0FDB00B061030001LL) {
|
||||
serializer(ar, tLogs, logRouters, tLogWriteAntiQuorum, tLogReplicationFactor, tLogPolicy, tLogLocalities, isLocal, locality, startVersion, satelliteTagLocations);
|
||||
tLogVersion = TLogVersion::V2;
|
||||
} else {
|
||||
serializer(ar, tLogs, logRouters, tLogWriteAntiQuorum, tLogReplicationFactor, tLogPolicy, tLogLocalities, isLocal, locality, startVersion, satelliteTagLocations, tLogVersion);
|
||||
}
|
||||
ASSERT(tLogPolicy.getPtr() == nullptr || tLogVersion != TLogVersion::UNSET);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -822,6 +822,7 @@ void SimulationConfig::generateNormalConfig(int minimumReplication, int minimumR
|
|||
ASSERT(false); // Programmer forgot to adjust cases.
|
||||
}
|
||||
|
||||
// FIXME: test not setting log_spill.
|
||||
if (g_random->random01() < 0.5) {
|
||||
if (g_random->random01() < 0.5) {
|
||||
set_config("log_version:=2"); // 6.0
|
||||
|
|
|
@ -181,6 +181,8 @@ private:
|
|||
////// Persistence format (for self->persistentData)
|
||||
|
||||
// Immutable keys
|
||||
// persistFormat has been mostly invalidated by TLogVersion, and can probably be removed when
|
||||
// 4.6's TLog code is removed.
|
||||
static const KeyValueRef persistFormat( LiteralStringRef( "Format" ), LiteralStringRef("FoundationDB/LogServer/2/4") );
|
||||
static const KeyRangeRef persistFormatReadableRange( LiteralStringRef("FoundationDB/LogServer/2/3"), LiteralStringRef("FoundationDB/LogServer/2/5") );
|
||||
static const KeyRangeRef persistRecoveryCountKeys = KeyRangeRef( LiteralStringRef( "DbRecoveryCount/" ), LiteralStringRef( "DbRecoveryCount0" ) );
|
||||
|
|
|
@ -145,6 +145,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
for( auto& log : tLogSet.logRouters) {
|
||||
logSet->logRouters.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( log ) ) );
|
||||
}
|
||||
logSet->tLogVersion = tLogSet.tLogVersion;
|
||||
logSet->tLogWriteAntiQuorum = tLogSet.tLogWriteAntiQuorum;
|
||||
logSet->tLogReplicationFactor = tLogSet.tLogReplicationFactor;
|
||||
logSet->tLogPolicy = tLogSet.tLogPolicy;
|
||||
|
@ -171,6 +172,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
for( auto & log : tLogData.logRouters) {
|
||||
logSet->logRouters.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( log ) ) );
|
||||
}
|
||||
logSet->tLogVersion = tLogData.tLogVersion;
|
||||
logSet->tLogWriteAntiQuorum = tLogData.tLogWriteAntiQuorum;
|
||||
logSet->tLogReplicationFactor = tLogData.tLogReplicationFactor;
|
||||
logSet->tLogPolicy = tLogData.tLogPolicy;
|
||||
|
@ -207,6 +209,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
for( auto & log : tLogSet.logRouters) {
|
||||
logSet->logRouters.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( log ) ) );
|
||||
}
|
||||
logSet->tLogVersion = tLogSet.tLogVersion;
|
||||
logSet->tLogWriteAntiQuorum = tLogSet.tLogWriteAntiQuorum;
|
||||
logSet->tLogReplicationFactor = tLogSet.tLogReplicationFactor;
|
||||
logSet->tLogPolicy = tLogSet.tLogPolicy;
|
||||
|
@ -234,6 +237,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
for( auto & log : tLogSet.logRouters) {
|
||||
logSet->logRouters.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( log ) ) );
|
||||
}
|
||||
logSet->tLogVersion = tLogSet.tLogVersion;
|
||||
logSet->tLogWriteAntiQuorum = tLogSet.tLogWriteAntiQuorum;
|
||||
logSet->tLogReplicationFactor = tLogSet.tLogReplicationFactor;
|
||||
logSet->tLogPolicy = tLogSet.tLogPolicy;
|
||||
|
@ -271,6 +275,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
coreSet.tLogs.push_back(log->get().id());
|
||||
coreSet.tLogLocalities.push_back(log->get().interf().locality);
|
||||
}
|
||||
coreSet.tLogVersion = t->tLogVersion;
|
||||
coreSet.tLogWriteAntiQuorum = t->tLogWriteAntiQuorum;
|
||||
coreSet.tLogReplicationFactor = t->tLogReplicationFactor;
|
||||
coreSet.tLogPolicy = t->tLogPolicy;
|
||||
|
@ -293,6 +298,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
coreSet.tLogs.push_back(log->get().id());
|
||||
}
|
||||
coreSet.tLogLocalities = t->tLogLocalities;
|
||||
coreSet.tLogVersion = t->tLogVersion;
|
||||
coreSet.tLogWriteAntiQuorum = t->tLogWriteAntiQuorum;
|
||||
coreSet.tLogReplicationFactor = t->tLogReplicationFactor;
|
||||
coreSet.tLogPolicy = t->tLogPolicy;
|
||||
|
@ -1032,6 +1038,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
if(logSet->isLocal || remoteLogsWrittenToCoreState) {
|
||||
logSystemConfig.tLogs.push_back(TLogSet());
|
||||
TLogSet& log = logSystemConfig.tLogs.back();
|
||||
log.tLogVersion = logSet->tLogVersion;
|
||||
log.tLogWriteAntiQuorum = logSet->tLogWriteAntiQuorum;
|
||||
log.tLogReplicationFactor = logSet->tLogReplicationFactor;
|
||||
log.tLogPolicy = logSet->tLogPolicy;
|
||||
|
@ -1059,6 +1066,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
for( int j = 0; j < oldLogData[i].tLogs.size(); j++ ) {
|
||||
TLogSet& log = logSystemConfig.oldTLogs[i].tLogs[j];
|
||||
Reference<LogSet> logSet = oldLogData[i].tLogs[j];
|
||||
log.tLogVersion = logSet->tLogVersion;
|
||||
log.tLogWriteAntiQuorum = logSet->tLogWriteAntiQuorum;
|
||||
log.tLogReplicationFactor = logSet->tLogReplicationFactor;
|
||||
log.tLogPolicy = logSet->tLogPolicy;
|
||||
|
@ -1374,6 +1382,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
failed.push_back( Reference<AsyncVar<bool>>( new AsyncVar<bool>() ) );
|
||||
failureTrackers.push_back( monitorLog(logVar, failed[j] ) );
|
||||
}
|
||||
logSet->tLogVersion = coreSet.tLogVersion;
|
||||
logSet->tLogReplicationFactor = coreSet.tLogReplicationFactor;
|
||||
logSet->tLogWriteAntiQuorum = coreSet.tLogWriteAntiQuorum;
|
||||
logSet->tLogPolicy = coreSet.tLogPolicy;
|
||||
|
@ -1400,6 +1409,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
logSet->logServers.push_back( logVar );
|
||||
allLogServers.push_back( logVar );
|
||||
}
|
||||
logSet->tLogVersion = log.tLogVersion;
|
||||
logSet->tLogReplicationFactor = log.tLogReplicationFactor;
|
||||
logSet->tLogWriteAntiQuorum = log.tLogWriteAntiQuorum;
|
||||
logSet->tLogPolicy = log.tLogPolicy;
|
||||
|
@ -1719,6 +1729,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
|
||||
state Reference<LogSet> logSet = Reference<LogSet>( new LogSet() );
|
||||
logSet->tLogReplicationFactor = configuration.getRemoteTLogReplicationFactor();
|
||||
logSet->tLogVersion = configuration.tLogVersion;
|
||||
logSet->tLogPolicy = configuration.getRemoteTLogPolicy();
|
||||
logSet->isLocal = false;
|
||||
logSet->locality = remoteLocality;
|
||||
|
@ -1850,6 +1861,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
}
|
||||
|
||||
logSystem->tLogs.push_back( Reference<LogSet>( new LogSet() ) );
|
||||
logSystem->tLogs[0]->tLogVersion = configuration.tLogVersion;
|
||||
logSystem->tLogs[0]->tLogWriteAntiQuorum = configuration.tLogWriteAntiQuorum;
|
||||
logSystem->tLogs[0]->tLogReplicationFactor = configuration.tLogReplicationFactor;
|
||||
logSystem->tLogs[0]->tLogPolicy = configuration.tLogPolicy;
|
||||
|
@ -1871,6 +1883,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
}
|
||||
logSystem->tLogs[1]->isLocal = true;
|
||||
logSystem->tLogs[1]->locality = tagLocalitySatellite;
|
||||
logSystem->tLogs[1]->tLogVersion = configuration.tLogVersion;
|
||||
logSystem->tLogs[1]->startVersion = oldLogSystem->knownCommittedVersion + 1;
|
||||
|
||||
logSystem->tLogs[1]->tLogLocalities.resize( recr.satelliteTLogs.size() );
|
||||
|
|
|
@ -232,10 +232,10 @@ struct TLogOptions {
|
|||
|
||||
static ErrorOr<TLogOptions> FromStringRef( StringRef s ) {
|
||||
TLogOptions options;
|
||||
for (StringRef kvpair = s.eat("_"); kvpair.size() != 0; kvpair = s.eat("_") ) {
|
||||
StringRef key = kvpair.eat("=");
|
||||
StringRef value = kvpair.eat("=");
|
||||
if (kvpair.size() != 0) return default_error_or();
|
||||
for (StringRef key = s.eat("_"), value = s.eat("_");
|
||||
s.size() != 0 || key.size();
|
||||
key = s.eat("_"), value = s.eat("_")) {
|
||||
if (key.size() != 0 && value.size() == 0) return default_error_or();
|
||||
|
||||
if (key == LiteralStringRef("V")) {
|
||||
ErrorOr<TLogVersion> tLogVersion = TLogVersion::FromStringRef(value);
|
||||
|
@ -260,8 +260,8 @@ struct TLogOptions {
|
|||
if (version == TLogVersion::V2) return "";
|
||||
|
||||
std::string toReturn =
|
||||
"V=" + boost::lexical_cast<std::string>(version) +
|
||||
"_LS=" + boost::lexical_cast<std::string>(spillType);
|
||||
"V_" + boost::lexical_cast<std::string>(version) +
|
||||
"_LS_" + boost::lexical_cast<std::string>(spillType);
|
||||
ASSERT_WE_THINK( FromStringRef( toReturn ).get() == *this );
|
||||
return toReturn + "-";
|
||||
}
|
||||
|
@ -303,6 +303,8 @@ std::vector< DiskStore > getDiskStores( std::string folder, std::string suffix,
|
|||
}
|
||||
else if( filename.startsWith( fileVersionedLogDataPrefix ) ) {
|
||||
store.storedComponent = DiskStore::TLogData;
|
||||
// Use the option string that's in the file rather than tLogOptions.toPrefix(),
|
||||
// because they might be different if a new option was introduced in this version.
|
||||
StringRef optionsString = filename.removePrefix(fileVersionedLogDataPrefix).eat("-");
|
||||
TraceEvent("DiskStoreVersioned").detail("Filename", printable(filename));
|
||||
ErrorOr<TLogOptions> tLogOptions = TLogOptions::FromStringRef(optionsString);
|
||||
|
|
Loading…
Reference in New Issue