Merge remote-tracking branch 'apple/main' into vgasiunas-upgrade-test

This commit is contained in:
Vaidas Gasiunas 2022-04-13 14:17:46 +02:00
commit 871cdece0e
10 changed files with 799 additions and 11 deletions

View File

@ -23,7 +23,7 @@ else()
endif()
project(foundationdb
VERSION 7.1.0
VERSION 7.2.0
DESCRIPTION "FoundationDB is a scalable, fault-tolerant, ordered key-value store with full ACID transactions."
HOMEPAGE_URL "http://www.foundationdb.org/"
LANGUAGES C CXX ASM)

View File

@ -151,6 +151,7 @@ set(FDBCLIENT_SRCS
VersionedMap.h
VersionedMap.cpp
VersionVector.h
VersionVector.cpp
WellKnownEndpoints.h
WriteMap.h
json_spirit/json_spirit_error_position.h

View File

@ -208,7 +208,7 @@ struct CommitTransactionRef {
if (ar.protocolVersion().hasReportConflictingKeys()) {
serializer(ar, report_conflicting_keys);
}
if (ar.protocolVersion().hasSpanContext()) {
if (ar.protocolVersion().hasResolverPrivateMutations()) {
serializer(ar, lock_aware, spanContext);
}
}

380
fdbclient/VersionVector.cpp Normal file
View File

@ -0,0 +1,380 @@
/*
* VersionVector.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "flow/Arena.h"
#include "flow/UnitTest.h"
#include "fdbclient/VersionVector.h"
namespace unit_tests {
struct TestContextArena {
Arena& _arena;
Arena& arena() { return _arena; }
ProtocolVersion protocolVersion() const { return g_network->protocolVersion(); }
uint8_t* allocate(size_t size) { return new (_arena) uint8_t[size]; }
};
TEST_CASE("/fdbclient/VersionVector/emptyVV") {
Arena arena;
TestContextArena context{ arena };
{
VersionVector serializedVV; // an empty version vector
size_t size = dynamic_size_traits<VersionVector>::size(serializedVV, context);
uint8_t* buf = context.allocate(size);
dynamic_size_traits<VersionVector>::save(buf, serializedVV, context);
VersionVector deserializedVV;
dynamic_size_traits<VersionVector>::load(buf, size, deserializedVV, context);
ASSERT(serializedVV.compare(deserializedVV));
}
{
VersionVector serializedVV(133200164); // "VersionVector::maxVersion" is set, empty otherwise
size_t size = dynamic_size_traits<VersionVector>::size(serializedVV, context);
uint8_t* buf = context.allocate(size);
dynamic_size_traits<VersionVector>::save(buf, serializedVV, context);
VersionVector deserializedVV;
dynamic_size_traits<VersionVector>::load(buf, size, deserializedVV, context);
ASSERT(serializedVV.compare(deserializedVV));
}
return Void();
}
TEST_CASE("/fdbclient/VersionVector/simpleVV") {
Arena arena;
TestContextArena context{ arena };
VersionVector serializedVV;
serializedVV.setVersion(Tag(-1, 2), 3619339);
serializedVV.setVersion(Tag(0, 13), 13292611);
std::set<Tag> tags;
tags.emplace(0, 2);
tags.emplace(0, 1);
tags.emplace(0, 0);
serializedVV.setVersion(tags, 13391141);
size_t size = dynamic_size_traits<VersionVector>::size(serializedVV, context);
uint8_t* buf = context.allocate(size);
dynamic_size_traits<VersionVector>::save(buf, serializedVV, context);
VersionVector deserializedVV;
dynamic_size_traits<VersionVector>::load(buf, size, deserializedVV, context);
ASSERT(serializedVV.compare(deserializedVV));
return Void();
}
// Populates version vector (with randomly generated tag localities, ids, and commit versions)
// based on the given specifications.
// @param vv Version vector
// @param tagCount total number of storage servers in the cluster
// @param localityCount total number of localities/regions in the cluster
// @param maxTagId maximum value of any tag id in the cluster
// @param maxCommitVersionDelta maximum difference between commit versions in the version vector
// @note assumes each locality contains the same number of tags
// @note picks locality values randomly from range [tagLocalityInvalid+1, INT8_MAX)
void populateVersionVector(VersionVector& vv,
int tagCount,
int localityCount,
int maxTagId,
const uint64_t maxCommitVersionDelta) {
std::vector<uint16_t> ids;
std::vector<int8_t> localities;
Version minVersion;
std::vector<Version> versions;
int tagsPerLocality = tagCount / localityCount;
// Populate localities.
for (int i = 0; localities.size() < (size_t)localityCount; i++) {
int8_t locality = deterministicRandom()->randomInt(tagLocalityInvalid + 1, INT8_MAX);
if (std::find(localities.begin(), localities.end(), locality) == localities.end()) {
localities.push_back(locality);
}
}
// Populate ids.
for (int i = 0; i < tagCount; i++) {
// Some of the ids could be duplicates, that's fine.
ids.push_back(deterministicRandom()->randomInt(0, maxTagId));
}
// Choose a value for minVersion. (Choose a value in such a way that
// "minVersion + maxCommitVersionDelta" does not exceed INT64_MAX.)
if (maxCommitVersionDelta <= UINT16_MAX) {
minVersion = deterministicRandom()->randomUInt32();
} else if (maxCommitVersionDelta <= UINT32_MAX) {
minVersion = deterministicRandom()->randomInt(0, UINT16_MAX);
} else {
minVersion = 0;
}
// Populate versions.
Version versionDelta;
for (int i = 0; i < tagCount; i++) {
if (maxCommitVersionDelta <= UINT8_MAX) {
versionDelta = deterministicRandom()->randomInt(0, UINT8_MAX);
} else if (maxCommitVersionDelta <= UINT16_MAX) {
versionDelta = deterministicRandom()->randomInt(0, UINT16_MAX);
} else if (maxCommitVersionDelta <= UINT32_MAX) {
versionDelta = deterministicRandom()->randomUInt32();
} else {
versionDelta = deterministicRandom()->randomInt64(0, INT64_MAX);
}
// Some of the versions could be duplicates, that's fine.
versions.push_back(minVersion + versionDelta);
}
// Sort versions.
std::sort(versions.begin(), versions.end());
// Populate the version vector.
std::set<Tag> tags;
int tagIndex = 0;
for (int i = 0; i < localities.size() && tagIndex < tagCount; i++) {
for (int j = 0; j < tagsPerLocality && tagIndex < tagCount; j++, tagIndex++) {
if (Tag(localities[i], ids[tagIndex]) == invalidTag) {
continue; // skip this tag (this version also gets skipped, that's fine)
}
if (versions[tagIndex] == vv.getMaxVersion()) {
tags.emplace(localities[i], ids[tagIndex]);
continue; // skip this version; this tag will get the next higher version
}
if (tags.empty()) {
vv.setVersion(Tag(localities[i], ids[tagIndex]), versions[tagIndex]);
} else {
vv.setVersion(tags, versions[tagIndex]);
tags.clear();
}
}
}
ASSERT(tagIndex == tagCount);
}
TEST_CASE("/fdbclient/VersionVector/testA") {
Arena arena;
TestContextArena context{ arena };
VersionVector serializedVV;
// 80 storage servers spread over 2 regions, maxTagId < INT8_MAX, and
// maxCommitVersionDelta < UINT8_MAX.
populateVersionVector(serializedVV, 80, 2, INT8_MAX, UINT8_MAX);
size_t size = dynamic_size_traits<VersionVector>::size(serializedVV, context);
uint8_t* buf = context.allocate(size);
dynamic_size_traits<VersionVector>::save(buf, serializedVV, context);
VersionVector deserializedVV;
dynamic_size_traits<VersionVector>::load(buf, size, deserializedVV, context);
ASSERT(serializedVV.compare(deserializedVV));
return Void();
}
TEST_CASE("/fdbclient/VersionVector/testB") {
Arena arena;
TestContextArena context{ arena };
VersionVector serializedVV;
// 800 storage servers spread over 2 regions, maxTagId < INT16_MAX, and
// maxCommitVersionDelta < UINT8_MAX.
populateVersionVector(serializedVV, 800, 2, INT16_MAX, UINT8_MAX);
size_t size = dynamic_size_traits<VersionVector>::size(serializedVV, context);
uint8_t* buf = context.allocate(size);
dynamic_size_traits<VersionVector>::save(buf, serializedVV, context);
VersionVector deserializedVV;
dynamic_size_traits<VersionVector>::load(buf, size, deserializedVV, context);
ASSERT(serializedVV.compare(deserializedVV));
return Void();
}
TEST_CASE("/fdbclient/VersionVector/testC") {
Arena arena;
TestContextArena context{ arena };
VersionVector serializedVV;
// 800 storage servers spread over 2 regions, maxTagId < INT16_MAX, and
// maxCommitVersionDelta < UINT16_MAX.
populateVersionVector(serializedVV, 800, 2, INT16_MAX, UINT16_MAX);
size_t size = dynamic_size_traits<VersionVector>::size(serializedVV, context);
uint8_t* buf = context.allocate(size);
dynamic_size_traits<VersionVector>::save(buf, serializedVV, context);
VersionVector deserializedVV;
dynamic_size_traits<VersionVector>::load(buf, size, deserializedVV, context);
ASSERT(serializedVV.compare(deserializedVV));
return Void();
}
TEST_CASE("/fdbclient/VersionVector/testD") {
Arena arena;
TestContextArena context{ arena };
VersionVector serializedVV;
// 800 storage servers spread over 2 regions, maxTagId < INT16_MAX, and
// maxCommitVersionDelta < UINT32_MAX.
populateVersionVector(serializedVV, 800, 2, INT16_MAX, UINT32_MAX);
size_t size = dynamic_size_traits<VersionVector>::size(serializedVV, context);
uint8_t* buf = context.allocate(size);
dynamic_size_traits<VersionVector>::save(buf, serializedVV, context);
VersionVector deserializedVV;
dynamic_size_traits<VersionVector>::load(buf, size, deserializedVV, context);
ASSERT(serializedVV.compare(deserializedVV));
return Void();
}
TEST_CASE("/fdbclient/VersionVector/testE") {
Arena arena;
TestContextArena context{ arena };
VersionVector serializedVV;
// 800 storage servers spread over 2 regions, maxTagId < INT16_MAX, and
// maxCommitVersionDelta < UINT64_MAX.
populateVersionVector(serializedVV, 800, 2, INT16_MAX, UINT64_MAX);
size_t size = dynamic_size_traits<VersionVector>::size(serializedVV, context);
uint8_t* buf = context.allocate(size);
dynamic_size_traits<VersionVector>::save(buf, serializedVV, context);
VersionVector deserializedVV;
dynamic_size_traits<VersionVector>::load(buf, size, deserializedVV, context);
ASSERT(serializedVV.compare(deserializedVV));
return Void();
}
TEST_CASE("/fdbclient/VersionVector/testF") {
Arena arena;
TestContextArena context{ arena };
VersionVector serializedVV;
// 1600 storage servers spread over 4 regions, maxTagId < INT16_MAX, and
// maxCommitVersionDelta < UINT8_MAX.
populateVersionVector(serializedVV, 1600, 4, INT16_MAX, UINT8_MAX);
size_t size = dynamic_size_traits<VersionVector>::size(serializedVV, context);
uint8_t* buf = context.allocate(size);
dynamic_size_traits<VersionVector>::save(buf, serializedVV, context);
VersionVector deserializedVV;
dynamic_size_traits<VersionVector>::load(buf, size, deserializedVV, context);
ASSERT(serializedVV.compare(deserializedVV));
return Void();
}
TEST_CASE("/fdbclient/VersionVector/testG") {
Arena arena;
TestContextArena context{ arena };
VersionVector serializedVV;
// 1600 storage servers spread over 4 regions, maxTagId < INT16_MAX, and
// maxCommitVersionDelta < UINT16_MAX.
populateVersionVector(serializedVV, 1600, 4, INT16_MAX, UINT16_MAX);
size_t size = dynamic_size_traits<VersionVector>::size(serializedVV, context);
uint8_t* buf = context.allocate(size);
dynamic_size_traits<VersionVector>::save(buf, serializedVV, context);
VersionVector deserializedVV;
dynamic_size_traits<VersionVector>::load(buf, size, deserializedVV, context);
ASSERT(serializedVV.compare(deserializedVV));
return Void();
}
TEST_CASE("/fdbclient/VersionVector/testH") {
Arena arena;
TestContextArena context{ arena };
VersionVector serializedVV;
// 3200 storage servers spread over 4 regions, maxTagId < INT16_MAX, and
// maxCommitVersionDelta < UINT32_MAX.
populateVersionVector(serializedVV, 3200, 4, INT16_MAX, UINT32_MAX);
size_t size = dynamic_size_traits<VersionVector>::size(serializedVV, context);
uint8_t* buf = context.allocate(size);
dynamic_size_traits<VersionVector>::save(buf, serializedVV, context);
VersionVector deserializedVV;
dynamic_size_traits<VersionVector>::load(buf, size, deserializedVV, context);
ASSERT(serializedVV.compare(deserializedVV));
return Void();
}
TEST_CASE("/fdbclient/VersionVector/testI") {
Arena arena;
TestContextArena context{ arena };
VersionVector serializedVV;
// 3200 storage servers spread over 4 regions, maxTagId < INT16_MAX, and
// maxCommitVersionDelta < UINT64_MAX.
populateVersionVector(serializedVV, 3200, 4, INT16_MAX, UINT64_MAX);
size_t size = dynamic_size_traits<VersionVector>::size(serializedVV, context);
uint8_t* buf = context.allocate(size);
dynamic_size_traits<VersionVector>::save(buf, serializedVV, context);
VersionVector deserializedVV;
dynamic_size_traits<VersionVector>::load(buf, size, deserializedVV, context);
ASSERT(serializedVV.compare(deserializedVV));
return Void();
}
} // namespace unit_tests
void forceLinkVersionVectorTests() {}

View File

@ -29,39 +29,65 @@
#include "fdbclient/FDBTypes.h"
#include "fdbclient/Knobs.h"
static const int InvalidEncodedSize = 0;
struct VersionVector {
boost::container::flat_map<Tag, Version> versions;
friend struct serializable_traits<VersionVector>;
boost::container::flat_map<Tag, Version> versions; // An ordered map. (Note:
// changing this to an unordered
// map will break the
// serialization code below.)
Version maxVersion; // Specifies the max version in this version vector. (Note:
// there may or may not be a corresponding entry for this
// version in the "versions" map.)
VersionVector() : maxVersion(invalidVersion) {}
VersionVector(Version version) : maxVersion(version) {}
VersionVector() : maxVersion(invalidVersion), cachedEncodedSize(InvalidEncodedSize) {}
VersionVector(Version version) : maxVersion(version), cachedEncodedSize(InvalidEncodedSize) {}
private:
// Only invoked by getDelta() and applyDelta(), where tag has been validated
// and version is guaranteed to be larger than the existing value.
inline void setVersionNoCheck(const Tag& tag, Version version) { versions[tag] = version; }
inline void setVersionNoCheck(const Tag& tag, Version version) {
versions[tag] = version;
invalidateCachedEncodedSize();
}
inline void invalidateCachedEncodedSize() { cachedEncodedSize = InvalidEncodedSize; }
// Encoded version vector size. Introduced to help speed up serialization.
// @note This encoded size is not meant to be kept in sync with the updates
// that happen to the version vector.
// @note A value of 0 (= InvalidEncodedSize) indicates that the encoded version
// vector size is not cached.
size_t cachedEncodedSize;
public:
Version getMaxVersion() const { return maxVersion; }
void setMaxVersion(Version version) { maxVersion = version; }
int size() const { return versions.size(); }
bool empty() const { return versions.empty(); }
void setVersion(const Tag& tag, Version version) {
ASSERT(tag != invalidTag);
ASSERT(tag.locality > tagLocalityInvalid);
ASSERT(version > maxVersion);
versions[tag] = version;
maxVersion = version;
invalidateCachedEncodedSize();
}
void setVersion(const std::set<Tag>& tags, Version version) {
ASSERT(version > maxVersion);
for (auto& tag : tags) {
ASSERT(tag != invalidTag);
ASSERT(tag.locality > tagLocalityInvalid);
versions[tag] = version;
}
maxVersion = version;
invalidateCachedEncodedSize();
}
bool hasVersion(const Tag& tag) const {
@ -80,6 +106,7 @@ public:
void clear() {
versions.clear();
maxVersion = invalidVersion;
invalidateCachedEncodedSize();
}
// @note this method, together with method applyDelta(), helps minimize
@ -144,9 +171,386 @@ public:
bool operator!=(const VersionVector& vv) const { return maxVersion != vv.maxVersion; }
bool operator<(const VersionVector& vv) const { return maxVersion < vv.maxVersion; }
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, versions, maxVersion);
bool compare(const VersionVector& vv) {
if (maxVersion != vv.maxVersion) {
return false;
}
if (versions.size() != vv.versions.size()) {
return false;
}
auto iterA = versions.begin();
auto iterB = vv.versions.begin();
for (; iterA != versions.end(); iterA++, iterB++) {
if (iterA->first != iterB->first || iterA->second != iterB->second) {
return false;
}
}
ASSERT(iterB == vv.versions.end());
return true;
}
//
// Methods to set/get/check cached encoded version vector size.
//
void setCachedEncodedSize(size_t size) { cachedEncodedSize = size; }
bool isEncodedSizeCached() const { return cachedEncodedSize != InvalidEncodedSize; }
size_t getCachedEncodedSize() const {
ASSERT(isEncodedSizeCached());
return cachedEncodedSize;
}
//
// Methods to copy an encoded version vector into the serialization buffer.
//
// Encoding methods used:
//
// - Tag localities: Run-length encoding
// - Tag ids: Compact representation (depending on the max tag id value)
// - Commit versions: Delta encoding
//
// Extracts information about tag ids, tag localities, and commit versions that are
// captured in the version vector. This will avoid the need to make multiple iterations
// over the contents of the version vector while (encoding and) serializing it.
void getTagAndCommitVersionInfo(size_t& utlCount,
uint16_t& maxTagId,
Version& minCommitVersion,
Version& maxCommitVersion) const {
// Initialization
utlCount = 0; // unique tag locality count
maxTagId = 0; // the highest tag id in the version vector
minCommitVersion = MAX_VERSION; // the lowest commit version in "VersionVector::versions"
maxCommitVersion = invalidVersion; // the highest commit version in "VersionVector::versions"
// Population
int8_t locality = tagLocalityInvalid;
for (const auto& [tag, version] : versions) {
if (locality != tag.locality) {
locality = tag.locality;
utlCount++;
}
maxTagId = std::max(maxTagId, tag.id);
minCommitVersion = std::min(minCommitVersion, version);
maxCommitVersion = std::max(maxCommitVersion, version);
}
}
// Calculate size of the encoded version vector.
size_t getEncodedSize() const {
size_t utlCount; // unique tag locality count
uint16_t maxTagId; // the highest tag id in the version vector
Version minVersion; // the lowest commit version in the version vector
Version maxVersion; // the highest commit version in the version vector
getTagAndCommitVersionInfo(utlCount, maxTagId, minVersion, maxVersion);
// Is the version vector empty?
if (utlCount == 0) {
return sizeof(size_t) + /* captures unique tag locality count (= 0, in this case) */
sizeof(Version); /* captures VersionVector::maxVersion */
}
size_t tagIdSize = 0; // number of bytes needed to serialize an individual (potentially compacted) tag id
tagIdSize = (maxTagId <= UINT8_MAX) ? sizeof(uint8_t) : sizeof(uint16_t);
size_t commitVersionSize = 0; // number of bytes needed to serialize an individual commit version
if ((maxVersion - minVersion) <= UINT8_MAX) {
commitVersionSize = sizeof(uint8_t);
} else if ((maxVersion - minVersion) <= UINT16_MAX) {
commitVersionSize = sizeof(uint16_t);
} else if ((maxVersion - minVersion) <= UINT32_MAX) {
commitVersionSize = sizeof(uint32_t);
} else {
commitVersionSize = sizeof(uint64_t);
}
return sizeof(size_t) + /* unique tag locality count */
utlCount * (sizeof(int8_t) + sizeof(uint16_t)) + // unique tag locality values and their run lengths
sizeof(uint8_t) + /* number of bytes needed to serialize an individual (potentially compacted) tag id */
sizeof(uint8_t) + /* number of bytes needed to serialize an individual commit version */
sizeof(Version) + /* the lowest commit version in the version vector */
sizeof(size_t) + /* number of <tagid, version> pairs */
this->size() * (tagIdSize + commitVersionSize) + /* encoded <tagid, version> pairs */
sizeof(Version); /* VersionVector::maxVersion */
}
// Copy "value" into the serialization buffer.
template <typename T>
void serialize(uint8_t*& out, T value) const {
memcpy(out, &value, sizeof(T));
out += sizeof(T);
}
// Copy RLE encoded tag locality values into the serialization buffer.
void serializeTagLocalities(size_t utlCount, uint8_t*& out) const {
serialize<size_t>(out, utlCount); // unique tag locality count
// Is the version vector empty?
if (utlCount == 0) {
return;
}
int8_t locality = tagLocalityInvalid;
uint16_t localityCount = 0;
for (const auto& [tag, version] : versions) {
if (locality != tag.locality) {
if (locality != tagLocalityInvalid) {
serialize<int8_t>(out, locality); // tag locality value
serialize<uint16_t>(out, localityCount); // run length of the locality value
}
locality = tag.locality;
localityCount = 1;
} else {
localityCount++;
}
}
if (locality != tagLocalityInvalid) {
serialize<int8_t>(out, locality); // tag locality value
serialize<uint16_t>(out, localityCount); // run length of the locality value
}
}
// Copy encoded tag id and commit version values into the serialization buffer.
// T: Type to be used to serialize tag ids (uint8_t/uint16_t)
// V: Type to be used to serialize commit version deltas (uint8_t/uint16_t/uint32_t/uint64_t)
template <typename T, typename V>
void serializeSizedTagIdsAndSizedCommitVersions(Version minCommitVersion, uint8_t*& out) const {
// Number of bytes that will be used to serialize an individual tag id.
serialize<uint8_t>(out, (uint8_t)sizeof(T));
// Number of bytes that will be used to serialize an individual commit version delta value.
serialize<uint8_t>(out, (uint8_t)sizeof(V));
// The lowest commit version in the version vector.
serialize<Version>(out, minCommitVersion);
// The number of <tagId, commitVersion> pairs.
serialize<size_t>(out, (this->size()));
for (const auto& [tag, version] : versions) {
// Serialize tag id.
serialize<T>(out, (T)tag.id);
// Serialize commit version delta.
serialize<V>(out, (V)(version - minCommitVersion));
}
}
// Figure out the type to be used to serialize delta encoded commit version values,
// and call the above method to do the serialization.
// T: Type to be used to serialize tag ids (uint8_t/uint16_t)
template <typename T>
void serializeSizedTagIdsAndCommitVersions(Version minVersion, Version maxVersion, uint8_t*& out) const {
if ((maxVersion - minVersion) <= UINT8_MAX) {
serializeSizedTagIdsAndSizedCommitVersions<T, uint8_t>(minVersion, out);
} else if ((maxVersion - minVersion) <= UINT16_MAX) {
serializeSizedTagIdsAndSizedCommitVersions<T, uint16_t>(minVersion, out);
} else if ((maxVersion - minVersion) <= UINT32_MAX) {
serializeSizedTagIdsAndSizedCommitVersions<T, uint32_t>(minVersion, out);
} else {
serializeSizedTagIdsAndSizedCommitVersions<T, uint64_t>(minVersion, out);
}
}
// Figure out the types to be used to serialize (potentially compacted) tag ids and delta
// encoded commit version values, and call the above methods to do the serialization.
void serializeTagIdsAndCommitVersions(uint16_t maxTagId,
Version minVersion,
Version maxVersion,
uint8_t*& out) const {
ASSERT(!this->empty());
if (maxTagId <= UINT8_MAX) {
serializeSizedTagIdsAndCommitVersions<uint8_t>(minVersion, maxVersion, out);
} else {
serializeSizedTagIdsAndCommitVersions<uint16_t>(minVersion, maxVersion, out);
}
}
//
// Methods to load (an encoded) version vector from the serialization buffer.
//
// Extract "value" from the serialization buffer.
template <typename T>
void deserialize(const uint8_t*& data, T& value) const {
memcpy(&value, data, sizeof(T));
data += sizeof(T);
}
// Deserialize RLE encoded tag locality values.
void deserializeLocalities(const uint8_t*& data,
size_t& utlCount,
std::vector<int8_t>& localities,
std::vector<uint16_t>& localityCounts) {
// Initialization
localities.clear();
localityCounts.clear();
// Extract unique tag locality count from the buffer.
deserialize<size_t>(data, utlCount);
if (utlCount == 0) {
return;
}
int8_t locality;
uint16_t localityCount;
localities.reserve(utlCount);
localityCounts.reserve(utlCount);
for (size_t i = 0; i < utlCount; i++) {
deserialize<int8_t>(data, locality);
localities.push_back(locality);
deserialize<uint16_t>(data, localityCount);
localityCounts.push_back(localityCount);
}
}
// Deserialize tag ids and commit version values.
// T: Type that was used to serialize tag ids (uint8_t/uint16_t)
// V: Type that was used to serialize commit version deltas (uint8_t/uint16_t/uint32_t/uint64_t)
template <typename T, typename V>
void deserializeSizedTagIdsAndSizedCommitVersions(const uint8_t*& data,
std::vector<int8_t>& localities,
std::vector<uint16_t>& localityCounts) {
Version minCommitVersion;
deserialize<Version>(data, minCommitVersion);
size_t pairCount; // number of serialized <tag id, commit version> pairs
deserialize<size_t>(data, pairCount);
T tagId;
V versionDelta;
for (size_t i = 0; i < localities.size(); i++) {
for (size_t j = 0; j < localityCounts[i]; j++) {
// Deserialize tag id.
deserialize<T>(data, tagId);
// Deserialize commit version delta.
deserialize<V>(data, versionDelta);
Tag tag(localities[i], tagId);
setVersionNoCheck(tag, minCommitVersion + versionDelta);
}
}
}
// Figrue out the type that was used to serialize commit version deltas and call the above
// method to do the deserialization.
// T: Type that was used to serialize tag ids (uint8_t/uint16_t)
template <typename T>
void deserializeSizedTagIdsAndCommitVersions(const uint8_t*& data,
std::vector<int8_t>& localities,
std::vector<uint16_t>& localityCounts) {
uint8_t commitVersionDeltaSize; // number of bytes that were used to serialize an individual commit version
// delta value
deserialize<uint8_t>(data, commitVersionDeltaSize);
if (commitVersionDeltaSize == sizeof(uint8_t)) {
deserializeSizedTagIdsAndSizedCommitVersions<T, uint8_t>(data, localities, localityCounts);
} else if (commitVersionDeltaSize == sizeof(uint16_t)) {
deserializeSizedTagIdsAndSizedCommitVersions<T, uint16_t>(data, localities, localityCounts);
} else if (commitVersionDeltaSize == sizeof(uint32_t)) {
deserializeSizedTagIdsAndSizedCommitVersions<T, uint32_t>(data, localities, localityCounts);
} else {
ASSERT(commitVersionDeltaSize == sizeof(uint64_t));
deserializeSizedTagIdsAndSizedCommitVersions<T, uint64_t>(data, localities, localityCounts);
}
}
// Figure out the types that were used to serialize tag ids and commit version deltas, and
// call the above methods to do the deserialization.
void deserializeTagIdsAndCommitVersions(const uint8_t*& data,
std::vector<int8_t>& localities,
std::vector<uint16_t>& localityCounts) {
uint8_t tagIdSize; // number of bytes that were used to serialize an individual tag id
deserialize<uint8_t>(data, tagIdSize);
if (tagIdSize == sizeof(uint8_t)) {
deserializeSizedTagIdsAndCommitVersions<uint8_t>(data, localities, localityCounts);
} else {
ASSERT(tagIdSize == sizeof(uint16_t));
deserializeSizedTagIdsAndCommitVersions<uint16_t>(data, localities, localityCounts);
}
}
};
// @note Enabling/Disabling version vector encoding during serialization (and
// de-serialization):
// - To enable version vector encoding during serialization/de-serialization:
// derive "struct dynamic_size_traits<VersionVector>" from "std::true_type" and
// derive "struct serializable_traits<VersionVector>" from "std::false_type".
//
// - To disable version vector encoding during serialization/de-serialization::
// derive "struct dynamic_size_traits<VersionVector>" from "std::false_type" and
// derive "struct serializable_traits<VersionVector>" from "std::true_type".
template <>
struct serializable_traits<VersionVector> : std::false_type {
template <class Archiver>
static void serialize(Archiver& ar, VersionVector& vv) {
serializer(ar, vv.versions, vv.maxVersion);
}
};
template <>
struct dynamic_size_traits<VersionVector> : std::true_type {
template <class Context>
static size_t size(const VersionVector& vv, Context&) {
size_t encodedSize;
if (vv.isEncodedSizeCached()) {
encodedSize = vv.getCachedEncodedSize();
// @todo remove this assert before doing performance tests
ASSERT(encodedSize == vv.getEncodedSize());
} else {
encodedSize = vv.getEncodedSize();
const_cast<VersionVector&>(vv).setCachedEncodedSize(encodedSize);
}
return encodedSize;
}
template <class Context>
static void save(uint8_t* out, const VersionVector& vv, Context&) {
auto* begin = out;
size_t utlCount; // unique tag locality count
uint16_t maxTagId; // the highest tag id in the version vector
Version minCommitVersion; // the lowest commit version in the version vector (in "VersionVector::versions")
Version maxCommitVersion; // the highest commit version in the version vector (in "VersionVector::versions")
vv.getTagAndCommitVersionInfo(utlCount, maxTagId, minCommitVersion, maxCommitVersion);
vv.serializeTagLocalities(utlCount, out);
if (!vv.empty()) {
vv.serializeTagIdsAndCommitVersions(maxTagId, minCommitVersion, maxCommitVersion, out);
}
// Serialize vv::maxVersion.
vv.serialize<Version>(out, (vv.getMaxVersion()));
// @todo remove this assert before doing performance tests
ASSERT(out - begin == vv.getEncodedSize());
}
template <class Context>
static void load(const uint8_t* data, size_t size, VersionVector& vv, Context& context) {
auto* p = data;
size_t utlCount;
std::vector<int8_t> localities;
std::vector<uint16_t> localityCounts;
vv.deserializeLocalities(data, utlCount, localities, localityCounts);
if (utlCount > 0) {
vv.deserializeTagIdsAndCommitVersions(data, localities, localityCounts);
}
// Deserialize VersionVector::maxVersion.
Version maxVersion;
vv.deserialize<Version>(data, maxVersion);
vv.setMaxVersion(maxVersion);
ASSERT(data - p == size);
}
};

View File

@ -37,6 +37,7 @@ void forceLinkSimExternalConnectionTests();
void forceLinkMutationLogReaderTests();
void forceLinkSimEncryptKmsProxyTests();
void forceLinkIThreadPoolTests();
void forceLinkVersionVectorTests();
struct UnitTestWorkload : TestWorkload {
bool enabled;
@ -84,6 +85,7 @@ struct UnitTestWorkload : TestWorkload {
forceLinkMutationLogReaderTests();
forceLinkSimEncryptKmsProxyTests();
forceLinkIThreadPoolTests();
forceLinkVersionVectorTests();
}
std::string description() const override { return "UnitTests"; }

View File

@ -163,6 +163,7 @@ public: // introduced features
PROTOCOL_VERSION_FEATURE(0x0FDB00B071010000LL, PerpetualWiggleMetadata);
PROTOCOL_VERSION_FEATURE(0x0FDB00B071010000LL, Tenants);
PROTOCOL_VERSION_FEATURE(0x0FDB00B071010000LL, StorageInterfaceReadiness);
PROTOCOL_VERSION_FEATURE(0x0FDB00B071010000LL, ResolverPrivateMutations);
};
template <>

View File

@ -254,8 +254,8 @@ if(WITH_PYTHON)
TEST_FILES restarting/from_7.0.0/UpgradeAndBackupRestore-1.toml
restarting/from_7.0.0/UpgradeAndBackupRestore-2.toml)
add_fdb_test(
TEST_FILES restarting/to_7.0.0/CycleTestRestart-1.txt
restarting/to_7.0.0/CycleTestRestart-2.txt)
TEST_FILES restarting/to_7.1.0/CycleTestRestart-1.txt
restarting/to_7.1.0/CycleTestRestart-2.txt)
add_fdb_test(
TEST_FILES restarting/from_7.0.0/SnapTestAttrition-1.txt
restarting/from_7.0.0/SnapTestAttrition-2.txt)