finished explicitly versioning everything stored in the database

This commit is contained in:
Evan Tschannen 2020-05-22 17:14:21 -07:00
parent 72ce997d22
commit ced65cd30b
16 changed files with 46 additions and 20 deletions

View File

@ -1910,7 +1910,7 @@ public:
tr->set(backupAgent->config.get(logUidValue).pack(DatabaseBackupAgent::keyConfigLogUid), logUidValue);
tr->set(backupAgent->config.get(logUidValue).pack(DatabaseBackupAgent::keyFolderId), backupUid);
tr->set(backupAgent->states.get(logUidValue).pack(DatabaseBackupAgent::keyFolderId), backupUid); // written to config and states because it's also used by abort
tr->set(backupAgent->config.get(logUidValue).pack(DatabaseBackupAgent::keyConfigBackupRanges), BinaryWriter::toValue(backupRanges, IncludeVersion()));
tr->set(backupAgent->config.get(logUidValue).pack(DatabaseBackupAgent::keyConfigBackupRanges), BinaryWriter::toValue(backupRanges, IncludeVersion(ProtocolVersion::withDRBackupRanges())));
tr->set(backupAgent->states.get(logUidValue).pack(DatabaseBackupAgent::keyStateStatus), StringRef(BackupAgentBase::getStateText(BackupAgentBase::STATE_SUBMITTED)));
if (stopWhenDone) {
tr->set(backupAgent->config.get(logUidValue).pack(DatabaseBackupAgent::keyConfigStopWhenDoneKey), StringRef());
@ -1934,7 +1934,7 @@ public:
tr->set(mapPrefix, BinaryWriter::toValue<Version>(readVersion, Unversioned()));
Key taskKey = wait(dbBackup::StartFullBackupTaskFunc::addTask(tr, backupAgent->taskBucket, logUidValue, backupUid,
addPrefix, removePrefix, BinaryWriter::toValue(backupRanges, IncludeVersion()), tagName, TaskCompletionKey::noSignal(), Reference<TaskFuture>(), databasesInSync));
addPrefix, removePrefix, BinaryWriter::toValue(backupRanges, IncludeVersion(ProtocolVersion::withDRBackupRanges())), tagName, TaskCompletionKey::noSignal(), Reference<TaskFuture>(), databasesInSync));
if (lockDB)
wait(lockDatabase(tr, logUid));

View File

@ -918,6 +918,7 @@ struct ClusterControllerPriorityInfo {
ClusterControllerPriorityInfo::FitnessUnknown) {}
ClusterControllerPriorityInfo(uint8_t processClassFitness, bool isExcluded, uint8_t dcFitness) : processClassFitness(processClassFitness), isExcluded(isExcluded), dcFitness(dcFitness) {}
//To change this serialization, ProtocolVersion::ClusterControllerPriorityInfo must be updated, and downgrades need to be considered
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, processClassFitness, isExcluded, dcFitness);

View File

@ -84,7 +84,7 @@ std::map<std::string, std::string> configForToken( std::string const& mode ) {
StatusObject regionObj;
regionObj["regions"] = mv;
out[p+key] = BinaryWriter::toValue(regionObj, IncludeVersion()).toString();
out[p+key] = BinaryWriter::toValue(regionObj, IncludeVersion(ProtocolVersion::withRegionConfiguration())).toString();
}
return out;
@ -171,11 +171,11 @@ std::map<std::string, std::string> configForToken( std::string const& mode ) {
out[p+"log_replicas"] = log_replicas;
out[p+"log_anti_quorum"] = "0";
BinaryWriter policyWriter(IncludeVersion());
BinaryWriter policyWriter(IncludeVersion(ProtocolVersion::withReplicationPolicy()));
serializeReplicationPolicy(policyWriter, storagePolicy);
out[p+"storage_replication_policy"] = policyWriter.toValue().toString();
policyWriter = BinaryWriter(IncludeVersion());
policyWriter = BinaryWriter(IncludeVersion(ProtocolVersion::withReplicationPolicy()));
serializeReplicationPolicy(policyWriter, tLogPolicy);
out[p+"log_replication_policy"] = policyWriter.toValue().toString();
return out;
@ -211,7 +211,7 @@ std::map<std::string, std::string> configForToken( std::string const& mode ) {
if (remoteRedundancySpecified) {
out[p+"remote_log_replicas"] = remote_log_replicas;
BinaryWriter policyWriter(IncludeVersion());
BinaryWriter policyWriter(IncludeVersion(ProtocolVersion::withReplicationPolicy()));
serializeReplicationPolicy(policyWriter, remoteTLogPolicy);
out[p+"remote_log_policy"] = policyWriter.toValue().toString();
return out;
@ -241,7 +241,7 @@ ConfigurationResult::Type buildConfiguration( std::vector<StringRef> const& mode
if(!outConf.count(p + "storage_replication_policy") && outConf.count(p + "storage_replicas")) {
int storageCount = stoi(outConf[p + "storage_replicas"]);
Reference<IReplicationPolicy> storagePolicy = Reference<IReplicationPolicy>(new PolicyAcross(storageCount, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
BinaryWriter policyWriter(IncludeVersion());
BinaryWriter policyWriter(IncludeVersion(ProtocolVersion::withReplicationPolicy()));
serializeReplicationPolicy(policyWriter, storagePolicy);
outConf[p+"storage_replication_policy"] = policyWriter.toValue().toString();
}
@ -249,7 +249,7 @@ ConfigurationResult::Type buildConfiguration( std::vector<StringRef> const& mode
if(!outConf.count(p + "log_replication_policy") && outConf.count(p + "log_replicas")) {
int logCount = stoi(outConf[p + "log_replicas"]);
Reference<IReplicationPolicy> logPolicy = Reference<IReplicationPolicy>(new PolicyAcross(logCount, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
BinaryWriter policyWriter(IncludeVersion());
BinaryWriter policyWriter(IncludeVersion(ProtocolVersion::withReplicationPolicy()));
serializeReplicationPolicy(policyWriter, logPolicy);
outConf[p+"log_replication_policy"] = policyWriter.toValue().toString();
}

View File

@ -171,7 +171,7 @@ namespace ThrottleApi {
ASSERT(initialDuration > 0);
TagThrottleValue throttle(tpsRate, expirationTime.present() ? expirationTime.get() : 0, initialDuration);
BinaryWriter wr(IncludeVersion());
BinaryWriter wr(IncludeVersion(ProtocolVersion::withTagThrottleValue()));
wr << throttle;
state Value value = wr.toValue();

View File

@ -136,6 +136,7 @@ struct TagThrottleValue {
static TagThrottleValue fromValue(const ValueRef& value);
//To change this serialization, ProtocolVersion::TagThrottleValue must be updated, and downgrades need to be considered
template<class Ar>
void serialize(Ar& ar) {
serializer(ar, tpsRate, expirationTime, initialDuration);

View File

@ -238,6 +238,7 @@ protected:
template <class Ar>
void serializeReplicationPolicy(Ar& ar, Reference<IReplicationPolicy>& policy) {
//To change this serialization, ProtocolVersion::ReplicationPolicy must be updated, and downgrades need to be considered
if (Ar::isDeserializing) {
StringRef name;
serializer(ar, name);

View File

@ -700,7 +700,7 @@ ACTOR Future<Void> addBackupMutations(ProxyCommitData* self, std::map<Key, Mutat
for (; logRangeMutation != logRangeMutations->end(); ++logRangeMutation)
{
//FIXME: this is re-implementing the serialize function of MutationListRef in order to have a yield
valueWriter = BinaryWriter(IncludeVersion());
valueWriter = BinaryWriter(IncludeVersion(ProtocolVersion::withBackupMutations()));
valueWriter << logRangeMutation->second.totalSize();
state MutationListRef::Blob* blobIter = logRangeMutation->second.blob_begin;

View File

@ -131,7 +131,7 @@ namespace oldTLog_4_6 {
void push( TLogQueueEntryRef const& qe ) {
BinaryWriter wr( Unversioned() ); // outer framing is not versioned
wr << uint32_t(0);
IncludeVersion().write(wr); // payload is versioned
IncludeVersion(ProtocolVersion::withTLogQueueEntryRef()).write(wr); // payload is versioned
wr << qe;
wr << uint8_t(1);
*(uint32_t*)wr.getData() = wr.getLength() - sizeof(uint32_t) - sizeof(uint8_t);

View File

@ -552,7 +552,7 @@ template <class T>
void TLogQueue::push( T const& qe, Reference<LogData> logData ) {
BinaryWriter wr( Unversioned() ); // outer framing is not versioned
wr << uint32_t(0);
IncludeVersion(ProtocolVersion::withTLog62()).write(wr); // payload is versioned
IncludeVersion(ProtocolVersion::withTLogQueueEntryRef()).write(wr); // payload is versioned
wr << qe;
wr << uint8_t(1);
*(uint32_t*)wr.getData() = wr.getLength() - sizeof(uint32_t) - sizeof(uint8_t);

View File

@ -633,7 +633,7 @@ template <class T>
void TLogQueue::push( T const& qe, Reference<LogData> logData ) {
BinaryWriter wr( Unversioned() ); // outer framing is not versioned
wr << uint32_t(0);
IncludeVersion(ProtocolVersion::withTLog62()).write(wr); // payload is versioned
IncludeVersion(ProtocolVersion::withTLogQueueEntryRef()).write(wr); // payload is versioned
wr << qe;
wr << uint8_t(1);
*(uint32_t*)wr.getData() = wr.getLength() - sizeof(uint32_t) - sizeof(uint8_t);

View File

@ -771,7 +771,7 @@ ACTOR Future<Void> monitorThrottlingChanges(RatekeeperData *self) {
if(tagValue.expirationTime == 0 || tagValue.expirationTime > now() + tagValue.initialDuration) {
TEST(true); // Converting tag throttle duration to absolute time
tagValue.expirationTime = now() + tagValue.initialDuration;
BinaryWriter wr(IncludeVersion());
BinaryWriter wr(IncludeVersion(ProtocolVersion::withTagThrottleValue()));
wr << tagValue;
state Value value = wr.toValue();

View File

@ -307,7 +307,7 @@ ACTOR Future<Void> monitorleader(Reference<AsyncVar<RestoreWorkerInterface>> lea
}
} else {
// Workers compete to be the leader
tr.set(restoreLeaderKey, BinaryWriter::toValue(myWorkerInterf, IncludeVersion()));
tr.set(restoreLeaderKey, restoreWorkerInterfaceValue(myWorkerInterf, IncludeVersion(ProtocolVersion::withRestoreWorkerInterfaceValue())));
leaderInterf = myWorkerInterf;
}
wait(tr.commit());

View File

@ -59,6 +59,7 @@ struct TLogQueueEntryRef {
: version(from.version), knownCommittedVersion(from.knownCommittedVersion), id(from.id), messages(a, from.messages) {
}
//To change this serialization, ProtocolVersion::TLogQueueEntryRef must be updated, and downgrades need to be considered
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, version, messages, knownCommittedVersion, id);
@ -643,6 +644,20 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
}
};
void TLogQueue::push( T const& qe, Reference<LogData> logData ) {
BinaryWriter wr( Unversioned() ); // outer framing is not versioned
wr << uint32_t(0);
IncludeVersion(ProtocolVersion::withTLogQueueEntryRef()).write(wr); // payload is versioned
wr << qe;
wr << uint8_t(1);
*(uint32_t*)wr.getData() = wr.getLength() - sizeof(uint32_t) - sizeof(uint8_t);
const IDiskQueue::location startloc = queue->getNextPushLocation();
// FIXME: push shouldn't return anything. We should call getNextPushLocation() again.
const IDiskQueue::location endloc = queue->push( wr.toValue() );
//TraceEvent("TLogQueueVersionWritten", dbgid).detail("Size", wr.getLength() - sizeof(uint32_t) - sizeof(uint8_t)).detail("Loc", loc);
logData->versionLocation[qe.version] = std::make_pair(startloc, endloc);
}
template <class T>
void TLogQueue::push( T const& qe, Reference<LogData> logData ) {
BinaryWriter wr( Unversioned() ); // outer framing is not versioned

View File

@ -131,7 +131,7 @@ private:
}
try {
wait( self->cstate.setExclusive( BinaryWriter::toValue(newState, IncludeVersion()) ) );
wait( self->cstate.setExclusive( BinaryWriter::toValue(newState, IncludeVersion(ProtocolVersion::withDBCoreState())) ) );
} catch (Error& e) {
TEST(true); // Master displaced during writeMasterState
throw;
@ -824,7 +824,7 @@ void updateConfigForForcedRecovery(Reference<MasterData> self, vector<Standalone
std::sort(self->configuration.regions.begin(), self->configuration.regions.end(), RegionInfo::sort_by_priority() );
StatusObject regionJSON;
regionJSON["regions"] = self->configuration.getRegionJSON();
regionCommit.mutations.push_back_deep(regionCommit.arena(), MutationRef(MutationRef::SetValue, configKeysPrefix.toString() + "regions", BinaryWriter::toValue(regionJSON, IncludeVersion()).toString()));
regionCommit.mutations.push_back_deep(regionCommit.arena(), MutationRef(MutationRef::SetValue, configKeysPrefix.toString() + "regions", BinaryWriter::toValue(regionJSON, IncludeVersion(ProtocolVersion::withRegionConfiguration())).toString()));
self->configuration.applyMutation( regionCommit.mutations.back() ); //modifying the configuration directly does not change the configuration when it is re-serialized unless we call applyMutation
TraceEvent("ForcedRecoveryConfigChange", self->dbgid)
.setMaxEventLength(11000)

View File

@ -1534,7 +1534,7 @@ ClusterControllerPriorityInfo getCCPriorityInfo(std::string filePath, ProcessCla
ACTOR Future<Void> monitorAndWriteCCPriorityInfo(std::string filePath, Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo) {
loop {
wait(asyncPriorityInfo->onChange());
std::string contents(BinaryWriter::toValue(asyncPriorityInfo->get(), IncludeVersion()).toString());
std::string contents(BinaryWriter::toValue(asyncPriorityInfo->get(), IncludeVersion(ProtocolVersion::withClusterControllerPriorityInfo())).toString());
atomicReplace(filePath, contents, false);
}
}
@ -1552,7 +1552,7 @@ ACTOR Future<UID> createAndLockProcessIdFile(std::string folder) {
Reference<IAsyncFile> _lockFile = wait(IAsyncFileSystem::filesystem()->open(lockFilePath, IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_CREATE | IAsyncFile::OPEN_LOCK | IAsyncFile::OPEN_READWRITE, 0600));
lockFile = _lockFile;
processIDUid = deterministicRandom()->randomUniqueID();
BinaryWriter wr(IncludeVersion());
BinaryWriter wr(IncludeVersion(ProtocolVersion::withProcessID()));
wr << processIDUid;
wait(lockFile.get()->write(wr.getData(), wr.getLength(), 0));
wait(lockFile.get()->sync());

View File

@ -88,7 +88,7 @@ public: // introduced features
PROTOCOL_VERSION_FEATURE(0x0FDB00B061030000LL, TLogVersion);
PROTOCOL_VERSION_FEATURE(0x0FDB00B061070000LL, PseudoLocalities);
PROTOCOL_VERSION_FEATURE(0x0FDB00B061070000LL, ShardedTxsTags);
PROTOCOL_VERSION_FEATURE(0x0FDB00B062010001LL, TLog62);
PROTOCOL_VERSION_FEATURE(0x0FDB00B062010001LL, TLogQueueEntryRef);
PROTOCOL_VERSION_FEATURE(0x0FDB00B062010001LL, GenerationRegVal);
PROTOCOL_VERSION_FEATURE(0x0FDB00B062010001LL, MovableCoordinatedStateV2);
PROTOCOL_VERSION_FEATURE(0x0FDB00B062010001LL, KeyServerValue);
@ -101,6 +101,14 @@ public: // introduced features
PROTOCOL_VERSION_FEATURE(0x0FDB00B062010001LL, BackupStartValue);
PROTOCOL_VERSION_FEATURE(0x0FDB00B062010001LL, LogRangeEncodeValue);
PROTOCOL_VERSION_FEATURE(0x0FDB00B062010001LL, HealthyZoneValue);
PROTOCOL_VERSION_FEATURE(0x0FDB00B062010001LL, DRBackupRanges);
PROTOCOL_VERSION_FEATURE(0x0FDB00B062010001LL, RegionConfiguration);
PROTOCOL_VERSION_FEATURE(0x0FDB00B062010001LL, ReplicationPolicy);
PROTOCOL_VERSION_FEATURE(0x0FDB00B062010001LL, BackupMutations);
PROTOCOL_VERSION_FEATURE(0x0FDB00B062010001LL, ClusterControllerPriorityInfo);
PROTOCOL_VERSION_FEATURE(0x0FDB00B062010001LL, ProcessID);
PROTOCOL_VERSION_FEATURE(0x0FDB00B063010000LL, DBCoreState);
PROTOCOL_VERSION_FEATURE(0x0FDB00B063010000LL, TagThrottleValue);
PROTOCOL_VERSION_FEATURE(0x0FDB00B063010000LL, ServerListValue);
PROTOCOL_VERSION_FEATURE(0x0FDB00B063010000LL, StorageCacheValue);
PROTOCOL_VERSION_FEATURE(0x0FDB00B063010000LL, RestoreStatusValue);