Use commit version for global configuration updates

FIXME: There is a memory issue where the underlying data for values set
in the `data` field of GlobalConfig will be freed shortly after being
set.
This commit is contained in:
Lukas Joswiak 2021-02-18 17:44:42 -08:00
parent 9e20b08976
commit 7bb0b3d899
2 changed files with 33 additions and 16 deletions

View File

@ -1436,10 +1436,6 @@ ACTOR Future<Optional<std::string>> globalConfigCommitActor(GlobalConfigImpl* gl
}
}
// TODO: Should probably be using the commit version...
Version readVersion = wait(ryw->getReadVersion());
BinaryWriter wr = BinaryWriter(AssumeVersion(g_network->protocolVersion()));
Arena arena;
VectorRef<MutationRef> mutations;
@ -1464,16 +1460,24 @@ ACTOR Future<Optional<std::string>> globalConfigCommitActor(GlobalConfigImpl* gl
++iter;
}
wr << std::make_pair(readVersion, mutations);
ProtocolVersion protocolVersion = g_network->protocolVersion();
// Record the mutations in this commit into the global configuration history.
Key historyVersionKey = globalConfigHistoryPrefix.withSuffix(std::to_string(readVersion));
tr.set(historyVersionKey, wr.toValue());
BinaryWriter historyKeyWriter(AssumeVersion(protocolVersion));
historyKeyWriter.serializeBytes(globalConfigHistoryPrefix);
Key historyKey = addVersionStampAtEnd(historyKeyWriter.toValue());
ProtocolVersion protocolVersion = g_network->protocolVersion();
BinaryWriter versionWriter = BinaryWriter(AssumeVersion(protocolVersion));
versionWriter << readVersion << protocolVersion;
tr.set(globalConfigVersionKey, versionWriter.toValue());
BinaryWriter historyMutationsWriter(AssumeVersion(protocolVersion));
historyMutationsWriter << mutations;
tr.atomicOp(historyKey, historyMutationsWriter.toValue(), MutationRef::SetVersionstampedKey);
// Write version key to trigger update in cluster controller.
tr.atomicOp(globalConfigVersionKey,
BinaryWriter::toValue(protocolVersion, AssumeVersion(protocolVersion))
.withPrefix(LiteralStringRef("0123456789")) // placeholder for versionstamp
.withSuffix(LiteralStringRef("\x00\x00\x00\x00")),
MutationRef::SetVersionstampedValue);
return Optional<std::string>();

View File

@ -3205,9 +3205,10 @@ ACTOR Future<Void> monitorClientTxnInfoConfigs(ClusterControllerData::DBInfo* db
if (globalConfigVersion.present()) {
BinaryReader versionReader = BinaryReader(globalConfigVersion.get(), AssumeVersion(g_network->protocolVersion()));
Version version;
ProtocolVersion protocolVersion;
versionReader >> version >> protocolVersion;
int64_t commitVersion;
int16_t serializationOrder;
state ProtocolVersion protocolVersion;
versionReader >> commitVersion >> serializationOrder >> protocolVersion;
state Arena arena;
if (protocolVersion == g_network->protocolVersion()) {
@ -3218,9 +3219,21 @@ ACTOR Future<Void> monitorClientTxnInfoConfigs(ClusterControllerData::DBInfo* db
clientInfo.history.clear();
for (const auto& kv : globalConfigHistory) {
BinaryReader rd = BinaryReader(kv.value, AssumeVersion(g_network->protocolVersion()));
Standalone<std::pair<Version, VectorRef<MutationRef>>> data;
rd >> data >> arena;
// Read commit version out of versionstamp at end of key.
BinaryReader versionReader = BinaryReader(kv.key.removePrefix(globalConfigHistoryPrefix), AssumeVersion(protocolVersion));
Version historyCommitVersion;
versionReader >> historyCommitVersion;
historyCommitVersion = bigEndian64(historyCommitVersion);
data.first = historyCommitVersion;
// Read the list of mutations that occurred at this version.
BinaryReader mutationReader = BinaryReader(kv.value, AssumeVersion(protocolVersion));
VectorRef<MutationRef> mutations;
mutationReader >> mutations;
data.second = VectorRef(arena, mutations);
clientInfo.history.push_back(data);
}