From ced65cd30b37eb4525c51ece6503e623b8187caa Mon Sep 17 00:00:00 2001 From: Evan Tschannen Date: Fri, 22 May 2020 17:14:21 -0700 Subject: [PATCH] finished explicitly versioning everything stored in the database --- fdbclient/DatabaseBackupAgent.actor.cpp | 4 ++-- fdbclient/FDBTypes.h | 1 + fdbclient/ManagementAPI.actor.cpp | 12 ++++++------ fdbclient/TagThrottle.actor.cpp | 2 +- fdbclient/TagThrottle.h | 1 + fdbrpc/ReplicationPolicy.h | 1 + fdbserver/MasterProxyServer.actor.cpp | 2 +- fdbserver/OldTLogServer_4_6.actor.cpp | 2 +- fdbserver/OldTLogServer_6_0.actor.cpp | 2 +- fdbserver/OldTLogServer_6_2.actor.cpp | 2 +- fdbserver/Ratekeeper.actor.cpp | 2 +- fdbserver/RestoreWorker.actor.cpp | 2 +- fdbserver/TLogServer.actor.cpp | 15 +++++++++++++++ fdbserver/masterserver.actor.cpp | 4 ++-- fdbserver/worker.actor.cpp | 4 ++-- flow/ProtocolVersion.h | 10 +++++++++- 16 files changed, 46 insertions(+), 20 deletions(-) diff --git a/fdbclient/DatabaseBackupAgent.actor.cpp b/fdbclient/DatabaseBackupAgent.actor.cpp index 1da07379e7..231ff60a20 100644 --- a/fdbclient/DatabaseBackupAgent.actor.cpp +++ b/fdbclient/DatabaseBackupAgent.actor.cpp @@ -1910,7 +1910,7 @@ public: tr->set(backupAgent->config.get(logUidValue).pack(DatabaseBackupAgent::keyConfigLogUid), logUidValue); tr->set(backupAgent->config.get(logUidValue).pack(DatabaseBackupAgent::keyFolderId), backupUid); tr->set(backupAgent->states.get(logUidValue).pack(DatabaseBackupAgent::keyFolderId), backupUid); // written to config and states because it's also used by abort - tr->set(backupAgent->config.get(logUidValue).pack(DatabaseBackupAgent::keyConfigBackupRanges), BinaryWriter::toValue(backupRanges, IncludeVersion())); + tr->set(backupAgent->config.get(logUidValue).pack(DatabaseBackupAgent::keyConfigBackupRanges), BinaryWriter::toValue(backupRanges, IncludeVersion(ProtocolVersion::withDRBackupRanges()))); tr->set(backupAgent->states.get(logUidValue).pack(DatabaseBackupAgent::keyStateStatus), StringRef(BackupAgentBase::getStateText(BackupAgentBase::STATE_SUBMITTED))); if (stopWhenDone) { tr->set(backupAgent->config.get(logUidValue).pack(DatabaseBackupAgent::keyConfigStopWhenDoneKey), StringRef()); @@ -1934,7 +1934,7 @@ public: tr->set(mapPrefix, BinaryWriter::toValue(readVersion, Unversioned())); Key taskKey = wait(dbBackup::StartFullBackupTaskFunc::addTask(tr, backupAgent->taskBucket, logUidValue, backupUid, - addPrefix, removePrefix, BinaryWriter::toValue(backupRanges, IncludeVersion()), tagName, TaskCompletionKey::noSignal(), Reference(), databasesInSync)); + addPrefix, removePrefix, BinaryWriter::toValue(backupRanges, IncludeVersion(ProtocolVersion::withDRBackupRanges())), tagName, TaskCompletionKey::noSignal(), Reference(), databasesInSync)); if (lockDB) wait(lockDatabase(tr, logUid)); diff --git a/fdbclient/FDBTypes.h b/fdbclient/FDBTypes.h index 67d281583d..c6aff2a804 100644 --- a/fdbclient/FDBTypes.h +++ b/fdbclient/FDBTypes.h @@ -918,6 +918,7 @@ struct ClusterControllerPriorityInfo { ClusterControllerPriorityInfo::FitnessUnknown) {} ClusterControllerPriorityInfo(uint8_t processClassFitness, bool isExcluded, uint8_t dcFitness) : processClassFitness(processClassFitness), isExcluded(isExcluded), dcFitness(dcFitness) {} + //To change this serialization, ProtocolVersion::ClusterControllerPriorityInfo must be updated, and downgrades need to be considered template void serialize(Ar& ar) { serializer(ar, processClassFitness, isExcluded, dcFitness); diff --git a/fdbclient/ManagementAPI.actor.cpp b/fdbclient/ManagementAPI.actor.cpp index 06aff74ff2..e10edf1121 100644 --- a/fdbclient/ManagementAPI.actor.cpp +++ b/fdbclient/ManagementAPI.actor.cpp @@ -84,7 +84,7 @@ std::map configForToken( std::string const& mode ) { StatusObject regionObj; regionObj["regions"] = mv; - out[p+key] = BinaryWriter::toValue(regionObj, IncludeVersion()).toString(); + out[p+key] = BinaryWriter::toValue(regionObj, IncludeVersion(ProtocolVersion::withRegionConfiguration())).toString(); } return out; @@ -171,11 +171,11 @@ std::map configForToken( std::string const& mode ) { out[p+"log_replicas"] = log_replicas; out[p+"log_anti_quorum"] = "0"; - BinaryWriter policyWriter(IncludeVersion()); + BinaryWriter policyWriter(IncludeVersion(ProtocolVersion::withReplicationPolicy())); serializeReplicationPolicy(policyWriter, storagePolicy); out[p+"storage_replication_policy"] = policyWriter.toValue().toString(); - policyWriter = BinaryWriter(IncludeVersion()); + policyWriter = BinaryWriter(IncludeVersion(ProtocolVersion::withReplicationPolicy())); serializeReplicationPolicy(policyWriter, tLogPolicy); out[p+"log_replication_policy"] = policyWriter.toValue().toString(); return out; @@ -211,7 +211,7 @@ std::map configForToken( std::string const& mode ) { if (remoteRedundancySpecified) { out[p+"remote_log_replicas"] = remote_log_replicas; - BinaryWriter policyWriter(IncludeVersion()); + BinaryWriter policyWriter(IncludeVersion(ProtocolVersion::withReplicationPolicy())); serializeReplicationPolicy(policyWriter, remoteTLogPolicy); out[p+"remote_log_policy"] = policyWriter.toValue().toString(); return out; @@ -241,7 +241,7 @@ ConfigurationResult::Type buildConfiguration( std::vector const& mode if(!outConf.count(p + "storage_replication_policy") && outConf.count(p + "storage_replicas")) { int storageCount = stoi(outConf[p + "storage_replicas"]); Reference storagePolicy = Reference(new PolicyAcross(storageCount, "zoneid", Reference(new PolicyOne()))); - BinaryWriter policyWriter(IncludeVersion()); + BinaryWriter policyWriter(IncludeVersion(ProtocolVersion::withReplicationPolicy())); serializeReplicationPolicy(policyWriter, storagePolicy); outConf[p+"storage_replication_policy"] = policyWriter.toValue().toString(); } @@ -249,7 +249,7 @@ ConfigurationResult::Type buildConfiguration( std::vector const& mode if(!outConf.count(p + "log_replication_policy") && outConf.count(p + "log_replicas")) { int logCount = stoi(outConf[p + "log_replicas"]); Reference logPolicy = Reference(new PolicyAcross(logCount, "zoneid", Reference(new PolicyOne()))); - BinaryWriter policyWriter(IncludeVersion()); + BinaryWriter policyWriter(IncludeVersion(ProtocolVersion::withReplicationPolicy())); serializeReplicationPolicy(policyWriter, logPolicy); outConf[p+"log_replication_policy"] = policyWriter.toValue().toString(); } diff --git a/fdbclient/TagThrottle.actor.cpp b/fdbclient/TagThrottle.actor.cpp index 40de79d325..1bffce0e6a 100644 --- a/fdbclient/TagThrottle.actor.cpp +++ b/fdbclient/TagThrottle.actor.cpp @@ -171,7 +171,7 @@ namespace ThrottleApi { ASSERT(initialDuration > 0); TagThrottleValue throttle(tpsRate, expirationTime.present() ? expirationTime.get() : 0, initialDuration); - BinaryWriter wr(IncludeVersion()); + BinaryWriter wr(IncludeVersion(ProtocolVersion::withTagThrottleValue())); wr << throttle; state Value value = wr.toValue(); diff --git a/fdbclient/TagThrottle.h b/fdbclient/TagThrottle.h index a79c962fb1..6403f22692 100644 --- a/fdbclient/TagThrottle.h +++ b/fdbclient/TagThrottle.h @@ -136,6 +136,7 @@ struct TagThrottleValue { static TagThrottleValue fromValue(const ValueRef& value); + //To change this serialization, ProtocolVersion::TagThrottleValue must be updated, and downgrades need to be considered template void serialize(Ar& ar) { serializer(ar, tpsRate, expirationTime, initialDuration); diff --git a/fdbrpc/ReplicationPolicy.h b/fdbrpc/ReplicationPolicy.h index 0f533e43d1..c2027b26a0 100644 --- a/fdbrpc/ReplicationPolicy.h +++ b/fdbrpc/ReplicationPolicy.h @@ -238,6 +238,7 @@ protected: template void serializeReplicationPolicy(Ar& ar, Reference& policy) { + //To change this serialization, ProtocolVersion::ReplicationPolicy must be updated, and downgrades need to be considered if (Ar::isDeserializing) { StringRef name; serializer(ar, name); diff --git a/fdbserver/MasterProxyServer.actor.cpp b/fdbserver/MasterProxyServer.actor.cpp index 2a485f781c..7d50d077e6 100644 --- a/fdbserver/MasterProxyServer.actor.cpp +++ b/fdbserver/MasterProxyServer.actor.cpp @@ -700,7 +700,7 @@ ACTOR Future addBackupMutations(ProxyCommitData* self, std::mapend(); ++logRangeMutation) { //FIXME: this is re-implementing the serialize function of MutationListRef in order to have a yield - valueWriter = BinaryWriter(IncludeVersion()); + valueWriter = BinaryWriter(IncludeVersion(ProtocolVersion::withBackupMutations())); valueWriter << logRangeMutation->second.totalSize(); state MutationListRef::Blob* blobIter = logRangeMutation->second.blob_begin; diff --git a/fdbserver/OldTLogServer_4_6.actor.cpp b/fdbserver/OldTLogServer_4_6.actor.cpp index 257623a598..c0ae249076 100644 --- a/fdbserver/OldTLogServer_4_6.actor.cpp +++ b/fdbserver/OldTLogServer_4_6.actor.cpp @@ -131,7 +131,7 @@ namespace oldTLog_4_6 { void push( TLogQueueEntryRef const& qe ) { BinaryWriter wr( Unversioned() ); // outer framing is not versioned wr << uint32_t(0); - IncludeVersion().write(wr); // payload is versioned + IncludeVersion(ProtocolVersion::withTLogQueueEntryRef()).write(wr); // payload is versioned wr << qe; wr << uint8_t(1); *(uint32_t*)wr.getData() = wr.getLength() - sizeof(uint32_t) - sizeof(uint8_t); diff --git a/fdbserver/OldTLogServer_6_0.actor.cpp b/fdbserver/OldTLogServer_6_0.actor.cpp index 05faf5e45a..d87daaee79 100644 --- a/fdbserver/OldTLogServer_6_0.actor.cpp +++ b/fdbserver/OldTLogServer_6_0.actor.cpp @@ -552,7 +552,7 @@ template void TLogQueue::push( T const& qe, Reference logData ) { BinaryWriter wr( Unversioned() ); // outer framing is not versioned wr << uint32_t(0); - IncludeVersion(ProtocolVersion::withTLog62()).write(wr); // payload is versioned + IncludeVersion(ProtocolVersion::withTLogQueueEntryRef()).write(wr); // payload is versioned wr << qe; wr << uint8_t(1); *(uint32_t*)wr.getData() = wr.getLength() - sizeof(uint32_t) - sizeof(uint8_t); diff --git a/fdbserver/OldTLogServer_6_2.actor.cpp b/fdbserver/OldTLogServer_6_2.actor.cpp index d94c5d491c..5925010e78 100644 --- a/fdbserver/OldTLogServer_6_2.actor.cpp +++ b/fdbserver/OldTLogServer_6_2.actor.cpp @@ -633,7 +633,7 @@ template void TLogQueue::push( T const& qe, Reference logData ) { BinaryWriter wr( Unversioned() ); // outer framing is not versioned wr << uint32_t(0); - IncludeVersion(ProtocolVersion::withTLog62()).write(wr); // payload is versioned + IncludeVersion(ProtocolVersion::withTLogQueueEntryRef()).write(wr); // payload is versioned wr << qe; wr << uint8_t(1); *(uint32_t*)wr.getData() = wr.getLength() - sizeof(uint32_t) - sizeof(uint8_t); diff --git a/fdbserver/Ratekeeper.actor.cpp b/fdbserver/Ratekeeper.actor.cpp index b65fe21f52..02433764f1 100644 --- a/fdbserver/Ratekeeper.actor.cpp +++ b/fdbserver/Ratekeeper.actor.cpp @@ -771,7 +771,7 @@ ACTOR Future monitorThrottlingChanges(RatekeeperData *self) { if(tagValue.expirationTime == 0 || tagValue.expirationTime > now() + tagValue.initialDuration) { TEST(true); // Converting tag throttle duration to absolute time tagValue.expirationTime = now() + tagValue.initialDuration; - BinaryWriter wr(IncludeVersion()); + BinaryWriter wr(IncludeVersion(ProtocolVersion::withTagThrottleValue())); wr << tagValue; state Value value = wr.toValue(); diff --git a/fdbserver/RestoreWorker.actor.cpp b/fdbserver/RestoreWorker.actor.cpp index 1896c8c6b5..e1d223f4fb 100644 --- a/fdbserver/RestoreWorker.actor.cpp +++ b/fdbserver/RestoreWorker.actor.cpp @@ -307,7 +307,7 @@ ACTOR Future monitorleader(Reference> lea } } else { // Workers compete to be the leader - tr.set(restoreLeaderKey, BinaryWriter::toValue(myWorkerInterf, IncludeVersion())); + tr.set(restoreLeaderKey, restoreWorkerInterfaceValue(myWorkerInterf, IncludeVersion(ProtocolVersion::withRestoreWorkerInterfaceValue()))); leaderInterf = myWorkerInterf; } wait(tr.commit()); diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index bfcdae058d..353c598c83 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -59,6 +59,7 @@ struct TLogQueueEntryRef { : version(from.version), knownCommittedVersion(from.knownCommittedVersion), id(from.id), messages(a, from.messages) { } + //To change this serialization, ProtocolVersion::TLogQueueEntryRef must be updated, and downgrades need to be considered template void serialize(Ar& ar) { serializer(ar, version, messages, knownCommittedVersion, id); @@ -643,6 +644,20 @@ struct LogData : NonCopyable, public ReferenceCounted { } }; +void TLogQueue::push( T const& qe, Reference logData ) { + BinaryWriter wr( Unversioned() ); // outer framing is not versioned + wr << uint32_t(0); + IncludeVersion(ProtocolVersion::withTLogQueueEntryRef()).write(wr); // payload is versioned + wr << qe; + wr << uint8_t(1); + *(uint32_t*)wr.getData() = wr.getLength() - sizeof(uint32_t) - sizeof(uint8_t); + const IDiskQueue::location startloc = queue->getNextPushLocation(); + // FIXME: push shouldn't return anything. We should call getNextPushLocation() again. + const IDiskQueue::location endloc = queue->push( wr.toValue() ); + //TraceEvent("TLogQueueVersionWritten", dbgid).detail("Size", wr.getLength() - sizeof(uint32_t) - sizeof(uint8_t)).detail("Loc", loc); + logData->versionLocation[qe.version] = std::make_pair(startloc, endloc); +} + template void TLogQueue::push( T const& qe, Reference logData ) { BinaryWriter wr( Unversioned() ); // outer framing is not versioned diff --git a/fdbserver/masterserver.actor.cpp b/fdbserver/masterserver.actor.cpp index 6693d63dcd..0ac492a0b9 100644 --- a/fdbserver/masterserver.actor.cpp +++ b/fdbserver/masterserver.actor.cpp @@ -131,7 +131,7 @@ private: } try { - wait( self->cstate.setExclusive( BinaryWriter::toValue(newState, IncludeVersion()) ) ); + wait( self->cstate.setExclusive( BinaryWriter::toValue(newState, IncludeVersion(ProtocolVersion::withDBCoreState())) ) ); } catch (Error& e) { TEST(true); // Master displaced during writeMasterState throw; @@ -824,7 +824,7 @@ void updateConfigForForcedRecovery(Reference self, vectorconfiguration.regions.begin(), self->configuration.regions.end(), RegionInfo::sort_by_priority() ); StatusObject regionJSON; regionJSON["regions"] = self->configuration.getRegionJSON(); - regionCommit.mutations.push_back_deep(regionCommit.arena(), MutationRef(MutationRef::SetValue, configKeysPrefix.toString() + "regions", BinaryWriter::toValue(regionJSON, IncludeVersion()).toString())); + regionCommit.mutations.push_back_deep(regionCommit.arena(), MutationRef(MutationRef::SetValue, configKeysPrefix.toString() + "regions", BinaryWriter::toValue(regionJSON, IncludeVersion(ProtocolVersion::withRegionConfiguration())).toString())); self->configuration.applyMutation( regionCommit.mutations.back() ); //modifying the configuration directly does not change the configuration when it is re-serialized unless we call applyMutation TraceEvent("ForcedRecoveryConfigChange", self->dbgid) .setMaxEventLength(11000) diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index 508f041879..f396a47d7a 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -1534,7 +1534,7 @@ ClusterControllerPriorityInfo getCCPriorityInfo(std::string filePath, ProcessCla ACTOR Future monitorAndWriteCCPriorityInfo(std::string filePath, Reference> asyncPriorityInfo) { loop { wait(asyncPriorityInfo->onChange()); - std::string contents(BinaryWriter::toValue(asyncPriorityInfo->get(), IncludeVersion()).toString()); + std::string contents(BinaryWriter::toValue(asyncPriorityInfo->get(), IncludeVersion(ProtocolVersion::withClusterControllerPriorityInfo())).toString()); atomicReplace(filePath, contents, false); } } @@ -1552,7 +1552,7 @@ ACTOR Future createAndLockProcessIdFile(std::string folder) { Reference _lockFile = wait(IAsyncFileSystem::filesystem()->open(lockFilePath, IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_CREATE | IAsyncFile::OPEN_LOCK | IAsyncFile::OPEN_READWRITE, 0600)); lockFile = _lockFile; processIDUid = deterministicRandom()->randomUniqueID(); - BinaryWriter wr(IncludeVersion()); + BinaryWriter wr(IncludeVersion(ProtocolVersion::withProcessID())); wr << processIDUid; wait(lockFile.get()->write(wr.getData(), wr.getLength(), 0)); wait(lockFile.get()->sync()); diff --git a/flow/ProtocolVersion.h b/flow/ProtocolVersion.h index 593a87d281..644a47a950 100644 --- a/flow/ProtocolVersion.h +++ b/flow/ProtocolVersion.h @@ -88,7 +88,7 @@ public: // introduced features PROTOCOL_VERSION_FEATURE(0x0FDB00B061030000LL, TLogVersion); PROTOCOL_VERSION_FEATURE(0x0FDB00B061070000LL, PseudoLocalities); PROTOCOL_VERSION_FEATURE(0x0FDB00B061070000LL, ShardedTxsTags); - PROTOCOL_VERSION_FEATURE(0x0FDB00B062010001LL, TLog62); + PROTOCOL_VERSION_FEATURE(0x0FDB00B062010001LL, TLogQueueEntryRef); PROTOCOL_VERSION_FEATURE(0x0FDB00B062010001LL, GenerationRegVal); PROTOCOL_VERSION_FEATURE(0x0FDB00B062010001LL, MovableCoordinatedStateV2); PROTOCOL_VERSION_FEATURE(0x0FDB00B062010001LL, KeyServerValue); @@ -101,6 +101,14 @@ public: // introduced features PROTOCOL_VERSION_FEATURE(0x0FDB00B062010001LL, BackupStartValue); PROTOCOL_VERSION_FEATURE(0x0FDB00B062010001LL, LogRangeEncodeValue); PROTOCOL_VERSION_FEATURE(0x0FDB00B062010001LL, HealthyZoneValue); + PROTOCOL_VERSION_FEATURE(0x0FDB00B062010001LL, DRBackupRanges); + PROTOCOL_VERSION_FEATURE(0x0FDB00B062010001LL, RegionConfiguration); + PROTOCOL_VERSION_FEATURE(0x0FDB00B062010001LL, ReplicationPolicy); + PROTOCOL_VERSION_FEATURE(0x0FDB00B062010001LL, BackupMutations); + PROTOCOL_VERSION_FEATURE(0x0FDB00B062010001LL, ClusterControllerPriorityInfo); + PROTOCOL_VERSION_FEATURE(0x0FDB00B062010001LL, ProcessID); + PROTOCOL_VERSION_FEATURE(0x0FDB00B063010000LL, DBCoreState); + PROTOCOL_VERSION_FEATURE(0x0FDB00B063010000LL, TagThrottleValue); PROTOCOL_VERSION_FEATURE(0x0FDB00B063010000LL, ServerListValue); PROTOCOL_VERSION_FEATURE(0x0FDB00B063010000LL, StorageCacheValue); PROTOCOL_VERSION_FEATURE(0x0FDB00B063010000LL, RestoreStatusValue);