From 2dc57568cb81e4ec03068a4f2e71efd67c43ef71 Mon Sep 17 00:00:00 2001 From: Alex Miller Date: Tue, 26 Feb 2019 16:47:04 -0800 Subject: [PATCH] Change many things about log_version. * log_version in the database (`/conf/log_version`) is now a hint that gets rounded to the nearest supported version. * fdbcli and FDB enforce that only a valid log_version can be configured to * TLogVersion is persisted in CoreTLogSet (and LogSet and TLogSet) * Some comments here and there * Add an assert on filename length to make sure KV-pairs in filename don't exceed a maximum length. --- fdbclient/DatabaseConfiguration.cpp | 13 ++++++++----- fdbrpc/sim2.actor.cpp | 4 ++++ fdbserver/DBCoreState.h | 12 +++++++++++- fdbserver/LogSystem.h | 1 + fdbserver/LogSystemConfig.h | 9 ++++++++- fdbserver/SimulatedCluster.actor.cpp | 1 + fdbserver/TLogServer.actor.cpp | 2 ++ fdbserver/TagPartitionedLogSystem.actor.cpp | 13 +++++++++++++ fdbserver/worker.actor.cpp | 14 ++++++++------ 9 files changed, 56 insertions(+), 13 deletions(-) diff --git a/fdbclient/DatabaseConfiguration.cpp b/fdbclient/DatabaseConfiguration.cpp index 4449900956..ff7bb5f49d 100644 --- a/fdbclient/DatabaseConfiguration.cpp +++ b/fdbclient/DatabaseConfiguration.cpp @@ -250,7 +250,7 @@ StatusObject DatabaseConfiguration::toJSON(bool noPolicies) const { if(!noPolicies) result["log_replication_policy"] = tLogPolicy->info(); } - if ( tLogVersion != TLogVersion::DEFAULT ) { + if ( tLogVersion > TLogVersion::DEFAULT ) { result["log_version"] = (int)tLogVersion; } @@ -266,9 +266,7 @@ StatusObject DatabaseConfiguration::toJSON(bool noPolicies) const { result["storage_engine"] = "custom"; } - if ( tLogSpillType != TLogSpillType::DEFAULT ) { - result["log_spill"] = (int)tLogSpillType; - } + result["log_spill"] = (int)tLogSpillType; if( remoteTLogReplicationFactor == 1 ) { result["remote_redundancy_mode"] = "remote_single"; @@ -381,7 +379,12 @@ bool DatabaseConfiguration::setInternal(KeyRef key, ValueRef value) { else if (ck == LiteralStringRef("log_replicas")) parse(&tLogReplicationFactor, value); else if (ck == LiteralStringRef("log_anti_quorum")) parse(&tLogWriteAntiQuorum, value); else if (ck == LiteralStringRef("storage_replicas")) parse(&storageTeamSize, value); - else if (ck == LiteralStringRef("log_version")) { parse((&type), value); tLogVersion = (TLogVersion::Version)type; } + else if (ck == LiteralStringRef("log_version")) { + parse((&type), value); + type = std::max((int)TLogVersion::MIN_RECRUITABLE, type); + type = std::min((int)TLogVersion::MAX_SUPPORTED, type); + tLogVersion = (TLogVersion::Version)type; + } else if (ck == LiteralStringRef("log_engine")) { parse((&type), value); tLogDataStoreType = (KeyValueStoreType::StoreType)type; // TODO: Remove this once Redwood works as a log engine if(tLogDataStoreType == KeyValueStoreType::SSD_REDWOOD_V1) diff --git a/fdbrpc/sim2.actor.cpp b/fdbrpc/sim2.actor.cpp index dbf22307b8..32f81ebf99 100644 --- a/fdbrpc/sim2.actor.cpp +++ b/fdbrpc/sim2.actor.cpp @@ -430,6 +430,10 @@ public: g_simulator.connectionFailuresDisableDuration = 1e6; } + // Filesystems on average these days seem to start to have limits of around 255 characters for a + // filename. We add ".part" below, so we need to stay under 250. + ASSERT( basename(filename).size() < 250 ); + wait( g_simulator.onMachine( currentProcess ) ); try { wait( delay(FLOW_KNOBS->MIN_OPEN_TIME + g_random->random01() * (FLOW_KNOBS->MAX_OPEN_TIME - FLOW_KNOBS->MIN_OPEN_TIME) ) ); diff --git a/fdbserver/DBCoreState.h b/fdbserver/DBCoreState.h index 56e8503ba6..3f727f5e2f 100644 --- a/fdbserver/DBCoreState.h +++ b/fdbserver/DBCoreState.h @@ -22,7 +22,9 @@ #define FDBSERVER_DBCORESTATE_H #pragma once +#include "fdbclient/FDBTypes.h" #include "fdbrpc/ReplicationPolicy.h" +#include "fdbserver/MasterInterface.h" // This structure is stored persistently in CoordinatedState and must be versioned carefully! // It records a synchronous replication topology which can be used in the absence of faults (or under a limited @@ -44,6 +46,7 @@ struct CoreTLogSet { int8_t locality; Version startVersion; std::vector> satelliteTagLocations; + TLogVersion tLogVersion; CoreTLogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), locality(tagLocalityUpgraded), startVersion(invalidVersion) {} @@ -54,7 +57,12 @@ struct CoreTLogSet { template void serialize(Archive& ar) { - serializer(ar, tLogs, tLogWriteAntiQuorum, tLogReplicationFactor, tLogPolicy, tLogLocalities, isLocal, locality, startVersion, satelliteTagLocations); + if (ar.isDeserializing && ar.protocolVersion() < 0x0FDB00B061030001LL) { + serializer(ar, tLogs, tLogWriteAntiQuorum, tLogReplicationFactor, tLogPolicy, tLogLocalities, isLocal, locality, startVersion, satelliteTagLocations); + tLogVersion = TLogVersion::V2; + } else { + serializer(ar, tLogs, tLogWriteAntiQuorum, tLogReplicationFactor, tLogPolicy, tLogLocalities, isLocal, locality, startVersion, satelliteTagLocations, tLogVersion); + } } }; @@ -77,6 +85,7 @@ struct OldTLogCoreData { else if(ar.isDeserializing) { tLogs.push_back(CoreTLogSet()); serializer(ar, tLogs[0].tLogs, tLogs[0].tLogWriteAntiQuorum, tLogs[0].tLogReplicationFactor, tLogs[0].tLogPolicy, epochEnd, tLogs[0].tLogLocalities); + tLogs[0].tLogVersion = TLogVersion::V2; } } }; @@ -126,6 +135,7 @@ struct DBCoreState { } else if(ar.isDeserializing) { tLogs.push_back(CoreTLogSet()); serializer(ar, tLogs[0].tLogs, tLogs[0].tLogWriteAntiQuorum, recoveryCount, tLogs[0].tLogReplicationFactor, logSystemType); + tLogs[0].tLogVersion = TLogVersion::V2; uint64_t tLocalitySize = (uint64_t)tLogs[0].tLogLocalities.size(); serializer(ar, oldTLogData, tLogs[0].tLogPolicy, tLocalitySize); diff --git a/fdbserver/LogSystem.h b/fdbserver/LogSystem.h index c086dacfd5..60642b4265 100644 --- a/fdbserver/LogSystem.h +++ b/fdbserver/LogSystem.h @@ -39,6 +39,7 @@ public: int32_t tLogWriteAntiQuorum; int32_t tLogReplicationFactor; std::vector< LocalityData > tLogLocalities; // Stores the localities of the log servers + TLogVersion tLogVersion; IRepPolicyRef tLogPolicy; LocalitySetRef logServerSet; std::vector logIndexArray; diff --git a/fdbserver/LogSystemConfig.h b/fdbserver/LogSystemConfig.h index 1839c59dde..95b07debb1 100644 --- a/fdbserver/LogSystemConfig.h +++ b/fdbserver/LogSystemConfig.h @@ -60,6 +60,7 @@ struct TLogSet { std::vector> logRouters; int32_t tLogWriteAntiQuorum, tLogReplicationFactor; std::vector< LocalityData > tLogLocalities; // Stores the localities of the log servers + TLogVersion tLogVersion; IRepPolicyRef tLogPolicy; bool isLocal; int8_t locality; @@ -111,7 +112,13 @@ struct TLogSet { template void serialize( Ar& ar ) { - serializer(ar, tLogs, logRouters, tLogWriteAntiQuorum, tLogReplicationFactor, tLogPolicy, tLogLocalities, isLocal, locality, startVersion, satelliteTagLocations); + if (ar.isDeserializing && ar.protocolVersion() < 0x0FDB00B061030001LL) { + serializer(ar, tLogs, logRouters, tLogWriteAntiQuorum, tLogReplicationFactor, tLogPolicy, tLogLocalities, isLocal, locality, startVersion, satelliteTagLocations); + tLogVersion = TLogVersion::V2; + } else { + serializer(ar, tLogs, logRouters, tLogWriteAntiQuorum, tLogReplicationFactor, tLogPolicy, tLogLocalities, isLocal, locality, startVersion, satelliteTagLocations, tLogVersion); + } + ASSERT(tLogPolicy.getPtr() == nullptr || tLogVersion != TLogVersion::UNSET); } }; diff --git a/fdbserver/SimulatedCluster.actor.cpp b/fdbserver/SimulatedCluster.actor.cpp index 103e8c7c15..ca12a4e422 100644 --- a/fdbserver/SimulatedCluster.actor.cpp +++ b/fdbserver/SimulatedCluster.actor.cpp @@ -822,6 +822,7 @@ void SimulationConfig::generateNormalConfig(int minimumReplication, int minimumR ASSERT(false); // Programmer forgot to adjust cases. } + // FIXME: test not setting log_spill. if (g_random->random01() < 0.5) { if (g_random->random01() < 0.5) { set_config("log_version:=2"); // 6.0 diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index 6501d15d06..a9a8cc412b 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -181,6 +181,8 @@ private: ////// Persistence format (for self->persistentData) // Immutable keys +// persistFormat has been mostly invalidated by TLogVersion, and can probably be removed when +// 4.6's TLog code is removed. static const KeyValueRef persistFormat( LiteralStringRef( "Format" ), LiteralStringRef("FoundationDB/LogServer/2/4") ); static const KeyRangeRef persistFormatReadableRange( LiteralStringRef("FoundationDB/LogServer/2/3"), LiteralStringRef("FoundationDB/LogServer/2/5") ); static const KeyRangeRef persistRecoveryCountKeys = KeyRangeRef( LiteralStringRef( "DbRecoveryCount/" ), LiteralStringRef( "DbRecoveryCount0" ) ); diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index 04136e70fe..6428a39de3 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -145,6 +145,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedlogRouters.push_back( Reference>>( new AsyncVar>( log ) ) ); } + logSet->tLogVersion = tLogSet.tLogVersion; logSet->tLogWriteAntiQuorum = tLogSet.tLogWriteAntiQuorum; logSet->tLogReplicationFactor = tLogSet.tLogReplicationFactor; logSet->tLogPolicy = tLogSet.tLogPolicy; @@ -171,6 +172,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedlogRouters.push_back( Reference>>( new AsyncVar>( log ) ) ); } + logSet->tLogVersion = tLogData.tLogVersion; logSet->tLogWriteAntiQuorum = tLogData.tLogWriteAntiQuorum; logSet->tLogReplicationFactor = tLogData.tLogReplicationFactor; logSet->tLogPolicy = tLogData.tLogPolicy; @@ -207,6 +209,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedlogRouters.push_back( Reference>>( new AsyncVar>( log ) ) ); } + logSet->tLogVersion = tLogSet.tLogVersion; logSet->tLogWriteAntiQuorum = tLogSet.tLogWriteAntiQuorum; logSet->tLogReplicationFactor = tLogSet.tLogReplicationFactor; logSet->tLogPolicy = tLogSet.tLogPolicy; @@ -234,6 +237,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedlogRouters.push_back( Reference>>( new AsyncVar>( log ) ) ); } + logSet->tLogVersion = tLogSet.tLogVersion; logSet->tLogWriteAntiQuorum = tLogSet.tLogWriteAntiQuorum; logSet->tLogReplicationFactor = tLogSet.tLogReplicationFactor; logSet->tLogPolicy = tLogSet.tLogPolicy; @@ -271,6 +275,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedget().id()); coreSet.tLogLocalities.push_back(log->get().interf().locality); } + coreSet.tLogVersion = t->tLogVersion; coreSet.tLogWriteAntiQuorum = t->tLogWriteAntiQuorum; coreSet.tLogReplicationFactor = t->tLogReplicationFactor; coreSet.tLogPolicy = t->tLogPolicy; @@ -293,6 +298,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedget().id()); } coreSet.tLogLocalities = t->tLogLocalities; + coreSet.tLogVersion = t->tLogVersion; coreSet.tLogWriteAntiQuorum = t->tLogWriteAntiQuorum; coreSet.tLogReplicationFactor = t->tLogReplicationFactor; coreSet.tLogPolicy = t->tLogPolicy; @@ -1032,6 +1038,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedisLocal || remoteLogsWrittenToCoreState) { logSystemConfig.tLogs.push_back(TLogSet()); TLogSet& log = logSystemConfig.tLogs.back(); + log.tLogVersion = logSet->tLogVersion; log.tLogWriteAntiQuorum = logSet->tLogWriteAntiQuorum; log.tLogReplicationFactor = logSet->tLogReplicationFactor; log.tLogPolicy = logSet->tLogPolicy; @@ -1059,6 +1066,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted logSet = oldLogData[i].tLogs[j]; + log.tLogVersion = logSet->tLogVersion; log.tLogWriteAntiQuorum = logSet->tLogWriteAntiQuorum; log.tLogReplicationFactor = logSet->tLogReplicationFactor; log.tLogPolicy = logSet->tLogPolicy; @@ -1374,6 +1382,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted>( new AsyncVar() ) ); failureTrackers.push_back( monitorLog(logVar, failed[j] ) ); } + logSet->tLogVersion = coreSet.tLogVersion; logSet->tLogReplicationFactor = coreSet.tLogReplicationFactor; logSet->tLogWriteAntiQuorum = coreSet.tLogWriteAntiQuorum; logSet->tLogPolicy = coreSet.tLogPolicy; @@ -1400,6 +1409,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedlogServers.push_back( logVar ); allLogServers.push_back( logVar ); } + logSet->tLogVersion = log.tLogVersion; logSet->tLogReplicationFactor = log.tLogReplicationFactor; logSet->tLogWriteAntiQuorum = log.tLogWriteAntiQuorum; logSet->tLogPolicy = log.tLogPolicy; @@ -1719,6 +1729,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted logSet = Reference( new LogSet() ); logSet->tLogReplicationFactor = configuration.getRemoteTLogReplicationFactor(); + logSet->tLogVersion = configuration.tLogVersion; logSet->tLogPolicy = configuration.getRemoteTLogPolicy(); logSet->isLocal = false; logSet->locality = remoteLocality; @@ -1850,6 +1861,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedtLogs.push_back( Reference( new LogSet() ) ); + logSystem->tLogs[0]->tLogVersion = configuration.tLogVersion; logSystem->tLogs[0]->tLogWriteAntiQuorum = configuration.tLogWriteAntiQuorum; logSystem->tLogs[0]->tLogReplicationFactor = configuration.tLogReplicationFactor; logSystem->tLogs[0]->tLogPolicy = configuration.tLogPolicy; @@ -1871,6 +1883,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCountedtLogs[1]->isLocal = true; logSystem->tLogs[1]->locality = tagLocalitySatellite; + logSystem->tLogs[1]->tLogVersion = configuration.tLogVersion; logSystem->tLogs[1]->startVersion = oldLogSystem->knownCommittedVersion + 1; logSystem->tLogs[1]->tLogLocalities.resize( recr.satelliteTLogs.size() ); diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index 3edd679e8f..eb997b46b9 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -232,10 +232,10 @@ struct TLogOptions { static ErrorOr FromStringRef( StringRef s ) { TLogOptions options; - for (StringRef kvpair = s.eat("_"); kvpair.size() != 0; kvpair = s.eat("_") ) { - StringRef key = kvpair.eat("="); - StringRef value = kvpair.eat("="); - if (kvpair.size() != 0) return default_error_or(); + for (StringRef key = s.eat("_"), value = s.eat("_"); + s.size() != 0 || key.size(); + key = s.eat("_"), value = s.eat("_")) { + if (key.size() != 0 && value.size() == 0) return default_error_or(); if (key == LiteralStringRef("V")) { ErrorOr tLogVersion = TLogVersion::FromStringRef(value); @@ -260,8 +260,8 @@ struct TLogOptions { if (version == TLogVersion::V2) return ""; std::string toReturn = - "V=" + boost::lexical_cast(version) + - "_LS=" + boost::lexical_cast(spillType); + "V_" + boost::lexical_cast(version) + + "_LS_" + boost::lexical_cast(spillType); ASSERT_WE_THINK( FromStringRef( toReturn ).get() == *this ); return toReturn + "-"; } @@ -303,6 +303,8 @@ std::vector< DiskStore > getDiskStores( std::string folder, std::string suffix, } else if( filename.startsWith( fileVersionedLogDataPrefix ) ) { store.storedComponent = DiskStore::TLogData; + // Use the option string that's in the file rather than tLogOptions.toPrefix(), + // because they might be different if a new option was introduced in this version. StringRef optionsString = filename.removePrefix(fileVersionedLogDataPrefix).eat("-"); TraceEvent("DiskStoreVersioned").detail("Filename", printable(filename)); ErrorOr tLogOptions = TLogOptions::FromStringRef(optionsString);