Synthesize data on SS based off parameters from new system transaction
This commit is contained in:
parent
5ebe8b0915
commit
2b4b4ae512
|
@ -1341,22 +1341,23 @@ Key uidPrefixKey(KeyRef keyPrefix, UID logUid) {
|
|||
return bw.toValue();
|
||||
}
|
||||
|
||||
std::tuple<Standalone<StringRef>, uint64_t, uint64_t> decodeConstructKeys(ValueRef value) {
|
||||
std::tuple<Standalone<StringRef>, uint64_t, uint64_t, uint64_t> decodeConstructKeys(ValueRef value) {
|
||||
StringRef keyStart;
|
||||
uint64_t keySize, keyCount;
|
||||
uint64_t valSize, keyCount, seed;
|
||||
BinaryReader rd(value, Unversioned());
|
||||
rd >> keyStart;
|
||||
rd >> keySize;
|
||||
rd >> valSize;
|
||||
rd >> keyCount;
|
||||
return std::make_tuple(keyStart, keySize, keyCount);
|
||||
rd >> seed;
|
||||
return std::make_tuple(keyStart, valSize, keyCount, seed);
|
||||
}
|
||||
|
||||
Value encodeConstructValue(StringRef keyStart, uint64_t keySize, uint64_t keyCount, uint64_t seed) {
|
||||
Value encodeConstructValue(StringRef keyStart, uint64_t valSize, uint64_t keyCount, uint64_t seed) {
|
||||
BinaryWriter wr(Unversioned());
|
||||
wr << keyStart;
|
||||
wr << keySize;
|
||||
wr << valSize;
|
||||
wr << keyCount;
|
||||
// wr << seed;
|
||||
wr << seed;
|
||||
return wr.toValue();
|
||||
}
|
||||
|
||||
|
|
|
@ -545,8 +545,8 @@ Key logRangesEncodeValue(KeyRef keyEnd, KeyRef destPath);
|
|||
// the given uid encoded at the end
|
||||
Key uidPrefixKey(KeyRef keyPrefix, UID logUid);
|
||||
|
||||
extern std::tuple<Standalone<StringRef>, uint64_t, uint64_t> decodeConstructKeys(ValueRef value);
|
||||
extern Value encodeConstructValue(StringRef keyStart, uint64_t keySize, uint64_t keyCount, uint64_t seed);
|
||||
extern std::tuple<Standalone<StringRef>, uint64_t, uint64_t, uint64_t> decodeConstructKeys(ValueRef value);
|
||||
extern Value encodeConstructValue(StringRef keyStart, uint64_t valSize, uint64_t keyCount, uint64_t seed);
|
||||
|
||||
/// Apply mutations constant variables
|
||||
|
||||
|
|
|
@ -585,19 +585,17 @@ private:
|
|||
}
|
||||
|
||||
if (m.param1.startsWith(constructDataKey)) {
|
||||
TraceEvent("DANHERE").detail("V", m.param2);
|
||||
// std::string safeLocality = BinaryReader::fromStringRef<std::string>(m.param2, Unversioned());
|
||||
// TraceEvent("DANHERE2").detail("V",safeLocality);
|
||||
|
||||
std::tuple<Standalone<StringRef>, uint64_t, uint64_t> t = decodeConstructKeys(m.param2);
|
||||
std::string s = "\xcf\xdf";
|
||||
// Value v = encodeConstructValue(s, 100, 0x2ff, 123);
|
||||
// TraceEvent("ConstructDataDebug").detail("V", v);
|
||||
std::tuple<Standalone<StringRef>, uint64_t, uint64_t, uint64_t> t = decodeConstructKeys(m.param2);
|
||||
uint64_t second_element = std::get<1>(t), third_element = std::get<2>(t);
|
||||
Standalone<StringRef> first_element = std::get<0>(t);
|
||||
// m.param2 = encodelConstructValue("\xcf\xdf"_sr, second_element, third_element, 0);
|
||||
TraceEvent("DANHERE")
|
||||
TraceEvent("ConstructData")
|
||||
.detail("S", m.param2)
|
||||
.detail("F1", first_element)
|
||||
.detail("F2", second_element)
|
||||
.detail("F3", third_element)
|
||||
.detail("S", m.param2);
|
||||
.detail("F3", third_element);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -10680,7 +10680,7 @@ private:
|
|||
|
||||
TraceEvent(SevDebug, "AddingChangeFeed", data->thisServerID)
|
||||
.detail("FeedID", changeFeedId)
|
||||
.detail("Rangze", changeFeedRange)
|
||||
.detail("Range", changeFeedRange)
|
||||
.detail("EmptyVersion", feed->second->emptyVersion);
|
||||
|
||||
auto rs = data->keyChangeFeed.modify(changeFeedRange);
|
||||
|
@ -10857,34 +10857,39 @@ private:
|
|||
}
|
||||
}
|
||||
} else if (m.param1.substr(1).startsWith(constructDataKey)) {
|
||||
std::tuple<Standalone<StringRef>, uint64_t, uint64_t> t = decodeConstructKeys(m.param2);
|
||||
std::pair<Standalone<StringRef>, Standalone<StringRef>> m;
|
||||
uint64_t second_element, third_element;
|
||||
std::tie(m.first, second_element, third_element) = t;
|
||||
// constructedMutation.param1 = first_element;
|
||||
m.second = "23"_sr; // ;//encodeConstructValue(23);
|
||||
data->constructedData.push_back(m);
|
||||
TraceEvent("DANCONSTRUCTSS")
|
||||
.detail("F1", data->constructedData.front().first)
|
||||
.detail("F2", second_element)
|
||||
.detail("F3", third_element);
|
||||
// int seed=0; // from constructData
|
||||
// for (int key=0; key<numKeys; key++) {
|
||||
// MutationRef constructedMutation;
|
||||
// constructedMutation.type = MutationRef::SetValue;
|
||||
// constructedMutation.param1 = "\xbf/constructData/"_sr;
|
||||
// // append 'key' to param1
|
||||
// for (int v=0; v < sizeValue; v++) {
|
||||
// // append random char to param2
|
||||
// const char *param2_value = "23";
|
||||
// constructedMutation.param2 = param2_value;
|
||||
// }
|
||||
// data->constructedData.push_back(constructedMutation);
|
||||
// TraceEvent("DANCONSTRUCTSS")
|
||||
// .detail("F1", firstKey)
|
||||
// .detail("F2", sizeValue)
|
||||
// .detail("F3", numKeys);
|
||||
// }
|
||||
uint64_t valSize, keyCount, seed;
|
||||
Standalone<StringRef> prefix;
|
||||
std::tie(prefix, valSize, keyCount, seed) = decodeConstructKeys(m.param2);
|
||||
// ASSERT
|
||||
for (auto& r : data->shards.ranges()) {
|
||||
KeyRange keyRange = KeyRange(r.range());
|
||||
if (keyRange.contains(prefix)) {
|
||||
uint8_t keyBuf[prefix.size() + 4];
|
||||
uint8_t* keyPos = prefix.copyTo(keyBuf);
|
||||
uint8_t valBuf[valSize];
|
||||
setThreadLocalDeterministicRandomSeed(seed);
|
||||
uint32_t keyNum = 0;
|
||||
while (++keyNum <= keyCount) {
|
||||
if ((keyNum % 0xff) == 0) {
|
||||
*keyPos++ = 0;
|
||||
}
|
||||
*keyPos = keyNum % 0xff;
|
||||
deterministicRandom()->randomBytes(&valBuf[0], valSize);
|
||||
StringRef key(keyBuf, keyPos - keyBuf + 1);
|
||||
StringRef val(valBuf, valSize);
|
||||
std::pair<Standalone<StringRef>, Standalone<StringRef>> m = { key, val };
|
||||
data->constructedData.push_back(m);
|
||||
TraceEvent("ConstructDataBuilder")
|
||||
.detail("F1", prefix)
|
||||
.detail("F2", valSize)
|
||||
.detail("F3", keyCount)
|
||||
.detail("F4", seed)
|
||||
.detail("K", key)
|
||||
.detail("V", val);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
ASSERT(false); // Unknown private mutation
|
||||
}
|
||||
|
@ -11363,20 +11368,6 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
|
|||
.suppressFor(10.0)
|
||||
.detail("Version", cloneCursor2->version().toString());
|
||||
} else if (ver != invalidVersion) { // This change belongs to a version < minVersion
|
||||
|
||||
if (data->constructedData.size()) {
|
||||
MutationRef constructedMutation;
|
||||
constructedMutation.type = MutationRef::SetValue;
|
||||
constructedMutation.param1 = data->constructedData.front().first;
|
||||
constructedMutation.param2 = data->constructedData.front().second;
|
||||
TraceEvent("DANPULL").detail("T", constructedMutation.param1);
|
||||
updater.applyMutation(data, constructedMutation, encryptedMutation, ver, false);
|
||||
data->constructedData.pop_front();
|
||||
mutationBytes += constructedMutation.totalSize();
|
||||
data->counters.mutationBytes += constructedMutation.totalSize();
|
||||
data->counters.logicalBytesInput += constructedMutation.expectedSize();
|
||||
++data->counters.mutations;
|
||||
}
|
||||
DEBUG_MUTATION("SSPeek", ver, msg, data->thisServerID);
|
||||
if (ver == data->initialClusterVersion) {
|
||||
//TraceEvent("SSPeekMutation", data->thisServerID).log();
|
||||
|
@ -11413,12 +11404,27 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
|
|||
++data->counters.atomicMutations;
|
||||
break;
|
||||
}
|
||||
while (data->constructedData.size()) {
|
||||
MutationRef constructedMutation;
|
||||
MutationRefAndCipherKeys encryptedMutation;
|
||||
constructedMutation.type = MutationRef::SetValue;
|
||||
constructedMutation.param1 = data->constructedData.front().first;
|
||||
constructedMutation.param2 = data->constructedData.front().second;
|
||||
TraceEvent("ConstructDataApply").detail("T", constructedMutation.param1);
|
||||
updater.applyMutation(data, constructedMutation, encryptedMutation, ver, false);
|
||||
data->constructedData.pop_front();
|
||||
mutationBytes += constructedMutation.totalSize();
|
||||
data->counters.mutationBytes += constructedMutation.totalSize();
|
||||
data->counters.logicalBytesInput += constructedMutation.expectedSize();
|
||||
++data->counters.mutations;
|
||||
}
|
||||
} else
|
||||
TraceEvent(SevError, "DiscardingPeekedData", data->thisServerID)
|
||||
.detail("Mutation", msg)
|
||||
.detail("Version", cloneCursor2->version().toString());
|
||||
}
|
||||
}
|
||||
|
||||
data->tLogMsgsPTreeUpdatesLatencyHistogram->sampleSeconds(now() - beforeTLogMsgsUpdates);
|
||||
if (data->currentChangeFeeds.size()) {
|
||||
data->changeFeedVersions.emplace_back(
|
||||
|
|
Loading…
Reference in New Issue