Merge remote-tracking branch 'apple/main' into vgasiunas-upgrade-test

This commit is contained in:
Vaidas Gasiunas 2022-04-08 20:27:45 +02:00
commit f91acd323c
21 changed files with 1760 additions and 1048 deletions

View File

@ -619,7 +619,7 @@ FDBFuture* fdb_transaction_get_range_impl(FDBTransaction* tr,
.extractPtr());
}
FDBFuture* fdb_transaction_get_mapped_range_impl(FDBTransaction* tr,
extern "C" DLLEXPORT FDBFuture* fdb_transaction_get_mapped_range(FDBTransaction* tr,
uint8_t const* begin_key_name,
int begin_key_name_length,
fdb_bool_t begin_or_equal,
@ -651,8 +651,7 @@ FDBFuture* fdb_transaction_get_mapped_range_impl(FDBTransaction* tr,
.extractPtr());
}
// TODO: Support FDB_API_ADDED in generate_asm.py and then this can be replaced with fdb_api_ptr_unimpl.
FDBFuture* fdb_transaction_get_mapped_range_v699(FDBTransaction* tr,
FDBFuture* fdb_transaction_get_range_and_flat_map_v709(FDBTransaction* tr,
uint8_t const* begin_key_name,
int begin_key_name_length,
fdb_bool_t begin_or_equal,
@ -669,7 +668,7 @@ FDBFuture* fdb_transaction_get_mapped_range_v699(FDBTransaction* tr,
int iteration,
fdb_bool_t snapshot,
fdb_bool_t reverse) {
fprintf(stderr, "UNIMPLEMENTED FDB API FUNCTION\n");
fprintf(stderr, "GetRangeAndFlatMap is removed from 7.0. Please upgrade to 7.1 and use GetMappedRange\n");
abort();
}
@ -900,13 +899,13 @@ extern "C" DLLEXPORT fdb_error_t fdb_select_api_version_impl(int runtime_version
// Versioned API changes -- descending order by version (new changes at top)
// FDB_API_CHANGED( function, ver ) means there is a new implementation as of ver, and a function function_(ver-1)
// is the old implementation FDB_API_REMOVED( function, ver ) means the function was removed as of ver, and
// is the old implementation. FDB_API_REMOVED( function, ver ) means the function was removed as of ver, and
// function_(ver-1) is the old implementation
//
// WARNING: use caution when implementing removed functions by calling public API functions. This can lead to
// undesired behavior when using the multi-version API. Instead, it is better to have both the removed and public
// functions call an internal implementation function. See fdb_create_database_impl for an example.
FDB_API_CHANGED(fdb_transaction_get_mapped_range, 700);
FDB_API_REMOVED(fdb_transaction_get_range_and_flat_map, 710);
FDB_API_REMOVED(fdb_future_get_version, 620);
FDB_API_REMOVED(fdb_create_cluster, 610);
FDB_API_REMOVED(fdb_cluster_create_database, 610);
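// A hedged sketch of the pattern the warning above recommends; every name below is
// hypothetical and not part of fdb_c.cpp. The public symbol and the removed version
// shim both forward to one internal function, so multi-version routing never
// re-enters a public API entry point:
FDBFuture* fdb_example_get_impl(FDBTransaction* tr, uint8_t const* key_name, int key_name_length) {
	// ... shared implementation for all API versions would go here ...
	return nullptr; // placeholder for the sketch
}
extern "C" DLLEXPORT FDBFuture* fdb_example_get(FDBTransaction* tr, uint8_t const* key_name, int key_name_length) {
	return fdb_example_get_impl(tr, key_name, key_name_length); // current public API
}
FDBFuture* fdb_example_get_v709(FDBTransaction* tr, uint8_t const* key_name, int key_name_length) {
	return fdb_example_get_impl(tr, key_name, key_name_length); // shim registered via FDB_API_REMOVED(fdb_example_get, 710)
}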

View File

@ -28,7 +28,6 @@ Features
* Improved the efficiency with which storage servers replicate data between themselves. `(PR #5017) <https://github.com/apple/foundationdb/pull/5017>`_
* Added support to ``exclude command`` to exclude based on locality match. `(PR #5113) <https://github.com/apple/foundationdb/pull/5113>`_
* Add the ``trace_partial_file_suffix`` network option. This option will give unfinished trace files a special suffix to indicate they're not complete yet. When the trace file is complete, it is renamed to remove the suffix. `(PR #5328) <https://github.com/apple/foundationdb/pull/5328>`_
* Added "get range and flat map" feature with new APIs (see Bindings section). Storage servers are able to generate the keys in the queries based on another query. With this, upper layer can push some computations down to FDB, to improve latency and bandwidth when read. `(PR #5609) <https://github.com/apple/foundationdb/pull/5609>`_
Performance
-----------
@ -85,8 +84,6 @@ Bindings
* C: Added a function, ``fdb_database_create_snapshot``, to create a snapshot of the database. `(PR #4241) <https://github.com/apple/foundationdb/pull/4241/files>`_
* C: Added ``fdb_database_get_main_thread_busyness`` function to report how busy a client's main thread is. `(PR #4504) <https://github.com/apple/foundationdb/pull/4504>`_
* Java: Added ``Database.getMainThreadBusyness`` function to report how busy a client's main thread is. `(PR #4564) <https://github.com/apple/foundationdb/pull/4564>`_
* C: Added ``fdb_transaction_get_range_and_flat_map`` function to support running queries based on another query in one request. `(PR #5609) <https://github.com/apple/foundationdb/pull/5609>`_
* Java: Added ``Transaction.getRangeAndFlatMap`` function to support running queries based on another query in one request. `(PR #5609) <https://github.com/apple/foundationdb/pull/5609>`_
Other Changes
-------------

View File

@ -10,6 +10,7 @@ Release Notes
Features
--------
* Added ``USE_GRV_CACHE`` transaction option to allow read versions to be locally cached on the client side for latency optimizations. `(PR #5725) <https://github.com/apple/foundationdb/pull/5725>`_ `(PR #6664) <https://github.com/apple/foundationdb/pull/6664>`_
* Added "get range and flat map" feature with new APIs (see Bindings section). Storage servers are able to generate the keys in the queries based on another query. With this, upper layer can push some computations down to FDB, to improve latency and bandwidth when read. `(PR #5609) <https://github.com/apple/foundationdb/pull/5609>`_, `(PR #6181) <https://github.com/apple/foundationdb/pull/6181>`_, etc..
Performance
-----------
@ -25,6 +26,8 @@ Status
Bindings
--------
* C: Added ``fdb_transaction_get_range_and_flat_map`` function to support running queries based on another query in one request. `(PR #5609) <https://github.com/apple/foundationdb/pull/5609>`_
* Java: Added ``Transaction.getRangeAndFlatMap`` function to support running queries based on another query in one request. `(PR #5609) <https://github.com/apple/foundationdb/pull/5609>`_
Other Changes
-------------

View File

@ -609,7 +609,7 @@ void DLApi::init() {
headerVersion >= 0);
loadClientFunction(&api->transactionGetRange, lib, fdbCPath, "fdb_transaction_get_range", headerVersion >= 0);
loadClientFunction(
&api->transactionGetMappedRange, lib, fdbCPath, "fdb_transaction_get_mapped_range", headerVersion >= 700);
&api->transactionGetMappedRange, lib, fdbCPath, "fdb_transaction_get_mapped_range", headerVersion >= 710);
loadClientFunction(
&api->transactionGetVersionstamp, lib, fdbCPath, "fdb_transaction_get_versionstamp", headerVersion >= 410);
loadClientFunction(&api->transactionSet, lib, fdbCPath, "fdb_transaction_set", headerVersion >= 0);
@ -667,7 +667,7 @@ void DLApi::init() {
loadClientFunction(
&api->futureGetKeyValueArray, lib, fdbCPath, "fdb_future_get_keyvalue_array", headerVersion >= 0);
loadClientFunction(
&api->futureGetMappedKeyValueArray, lib, fdbCPath, "fdb_future_get_mappedkeyvalue_array", headerVersion >= 700);
&api->futureGetMappedKeyValueArray, lib, fdbCPath, "fdb_future_get_mappedkeyvalue_array", headerVersion >= 710);
loadClientFunction(&api->futureGetSharedState, lib, fdbCPath, "fdb_future_get_shared_state", headerVersion >= 710);
loadClientFunction(&api->futureSetCallback, lib, fdbCPath, "fdb_future_set_callback", headerVersion >= 0);
loadClientFunction(&api->futureCancel, lib, fdbCPath, "fdb_future_cancel", headerVersion >= 0);
@ -1536,7 +1536,7 @@ void MultiVersionDatabase::DatabaseState::protocolVersionChanged(ProtocolVersion
.detail("OldProtocolVersion", dbProtocolVersion);
// When the protocol version changes, clear the corresponding entry in the shared state map
// so it can be re-initialized. Only do so if there was a valid previous protocol version.
if (dbProtocolVersion.present()) {
if (dbProtocolVersion.present() && MultiVersionApi::apiVersionAtLeast(710)) {
MultiVersionApi::api->clearClusterSharedStateMapEntry(clusterFilePath);
}
@ -2333,9 +2333,14 @@ ThreadFuture<Void> MultiVersionApi::updateClusterSharedStateMap(std::string clus
void MultiVersionApi::clearClusterSharedStateMapEntry(std::string clusterFilePath) {
MutexHolder holder(lock);
auto ssPtr = clusterSharedStateMap[clusterFilePath].get();
auto mapEntry = clusterSharedStateMap.find(clusterFilePath);
if (mapEntry == clusterSharedStateMap.end()) {
TraceEvent(SevError, "ClusterSharedStateMapEntryNotFound").detail("ClusterFilePath", clusterFilePath);
return;
}
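// Using find() rather than operator[] avoids default-constructing a null entry for an
// unknown cluster file path, which would then crash in the delRef() call below.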
auto ssPtr = mapEntry->second.get();
ssPtr->delRef(ssPtr);
clusterSharedStateMap.erase(clusterFilePath);
clusterSharedStateMap.erase(mapEntry);
}
std::vector<std::string> parseOptionValues(std::string valueStr) {

View File

@ -830,6 +830,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( REDWOOD_METRICS_INTERVAL, 5.0 );
init( REDWOOD_HISTOGRAM_INTERVAL, 30.0 );
init( REDWOOD_EVICT_UPDATED_PAGES, true ); if( randomize && BUGGIFY ) { REDWOOD_EVICT_UPDATED_PAGES = false; }
init( REDWOOD_DECODECACHE_REUSE_MIN_HEIGHT, 2 ); if( randomize && BUGGIFY ) { REDWOOD_DECODECACHE_REUSE_MIN_HEIGHT = deterministicRandom()->randomInt(1, 7); }
// Server request latency measurement
init( LATENCY_SAMPLE_SIZE, 100000 );

View File

@ -794,6 +794,7 @@ public:
double REDWOOD_METRICS_INTERVAL;
double REDWOOD_HISTOGRAM_INTERVAL;
bool REDWOOD_EVICT_UPDATED_PAGES; // Whether to prioritize eviction of updated pages from cache.
int REDWOOD_DECODECACHE_REUSE_MIN_HEIGHT; // Minimum height for which to keep and reuse page decode caches
// Server request latency measurement
int LATENCY_SAMPLE_SIZE;

View File

@ -1177,15 +1177,16 @@ public:
struct Cursor {
Cursor() : cache(nullptr), nodeIndex(-1) {}
Cursor(DecodeCache* cache, DeltaTree2* tree) : tree(tree), cache(cache), nodeIndex(-1) {}
Cursor(Reference<DecodeCache> cache, DeltaTree2* tree) : tree(tree), cache(cache), nodeIndex(-1) {}
Cursor(DecodeCache* cache, DeltaTree2* tree, int nodeIndex) : tree(tree), cache(cache), nodeIndex(nodeIndex) {}
Cursor(Reference<DecodeCache> cache, DeltaTree2* tree, int nodeIndex)
: tree(tree), cache(cache), nodeIndex(nodeIndex) {}
// Copy constructor does not copy item because normally a copied cursor will be immediately moved.
Cursor(const Cursor& c) : tree(c.tree), cache(c.cache), nodeIndex(c.nodeIndex) {}
~Cursor() {
if (cache != nullptr) {
if (cache.isValid()) {
cache->updateUsedMemory();
}
}
@ -1212,7 +1213,7 @@ public:
}
DeltaTree2* tree;
DecodeCache* cache;
Reference<DecodeCache> cache;
int nodeIndex;
mutable Optional<T> item;
@ -1274,6 +1275,7 @@ public:
return item.get();
}
// Switch the cursor to point to a new DeltaTree
void switchTree(DeltaTree2* newTree) {
tree = newTree;
// Reset item because it may point into tree memory
@ -1709,7 +1711,13 @@ public:
} else {
nodeBytesUsed = 0;
}
ASSERT(size() <= spaceAvailable);
nodeBytesFree = spaceAvailable - size();
// Zero unused available space
memset((uint8_t*)this + size(), 0, nodeBytesFree);
return size();
}
@ -1782,8 +1790,15 @@ private:
node.setLeftChildOffset(largeNodes, leftChildOffset);
node.setRightChildOffset(largeNodes, rightChildOffset);
deltatree_printf("%p: Serialized %s as %s\n", this, item.toString().c_str(), node.toString(this).c_str());
int written = wptr - (uint8_t*)&node;
deltatree_printf("Built subtree tree=%p subtreeRoot=%p written=%d end=%p serialized subtreeRoot %s as %s \n",
this,
&node,
written,
(uint8_t*)&node + written,
item.toString().c_str(),
node.toString(this).c_str());
return wptr - (uint8_t*)&node;
return written;
}
};

View File

@ -20,22 +20,24 @@
#ifndef FDBSERVER_IPAGER_H
#define FDBSERVER_IPAGER_H
#include "flow/Error.h"
#include "flow/FastAlloc.h"
#include "flow/ProtocolVersion.h"
#include <cstddef>
#include <stdint.h>
#pragma once
#include "fdbserver/IKeyValueStore.h"
#include "flow/flow.h"
#include "fdbclient/FDBTypes.h"
#define XXH_INLINE_ALL
#include "flow/xxhash.h"
#ifndef VALGRIND
#define VALGRIND_MAKE_MEM_UNDEFINED(x, y)
#define VALGRIND_MAKE_MEM_DEFINED(x, y)
#endif
typedef uint32_t LogicalPageID;
typedef uint32_t PhysicalPageID;
#define invalidLogicalPageID std::numeric_limits<LogicalPageID>::max()
#define invalidPhysicalPageID std::numeric_limits<PhysicalPageID>::max()
typedef uint32_t QueueID;
#define invalidQueueID std::numeric_limits<QueueID>::max()
@ -76,90 +78,509 @@ static const std::vector<std::pair<PagerEvents, PagerEventReasons>> L0PossibleEv
{ PagerEvents::PageWrite, PagerEventReasons::MetaData },
};
// Represents a block of memory in a 4096-byte aligned location held by an Arena.
enum EncodingType : uint8_t {
XXHash64 = 0,
// For testing purposes
XOREncryption = 1
};
enum PageType : uint8_t {
HeaderPage = 0,
BackupHeaderPage = 1,
BTreeNode = 2,
BTreeSuperNode = 3,
QueuePageStandalone = 4,
QueuePageInExtent = 5
};
// Encryption key ID
typedef uint64_t KeyID;
// EncryptionKeyRef is somewhat multi-variant, it will contain members representing the union
// of all fields relevant to any implemented encryption scheme. They are generally of
// the form
// Page Fields - fields which come from or are stored in the Page
// Secret Fields - fields which are only known by the Key Provider
// but it is up to each encoding and provider which fields are which and which ones are used
struct EncryptionKeyRef {
EncryptionKeyRef(){};
EncryptionKeyRef(Arena& arena, const EncryptionKeyRef& toCopy) : secret(arena, toCopy.secret), id(toCopy.id) {}
int expectedSize() const { return secret.size(); }
StringRef secret;
Optional<KeyID> id;
};
typedef Standalone<EncryptionKeyRef> EncryptionKey;
// Interface used by pager to get encryption keys by ID when reading pages from disk
// and by the BTree to get encryption keys to use for new pages
class IEncryptionKeyProvider {
public:
virtual ~IEncryptionKeyProvider() {}
// Get an EncryptionKey with Secret Fields populated based on the given Page Fields.
// It is up to the implementation which fields those are.
// The output Page Fields must match the input Page Fields.
virtual Future<EncryptionKey> getSecrets(const EncryptionKeyRef& key) = 0;
// Get encryption key that should be used for a given user Key-Value range
virtual Future<EncryptionKey> getByRange(const KeyRef& begin, const KeyRef& end) = 0;
};
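// A hedged usage sketch under the Page Fields / Secret Fields split described above
// (keyProvider, beginKey, endKey, page, and pageID are assumed variables, not code in
// this header):
//
//   EncryptionKey k = wait(keyProvider->getByRange(beginKey, endKey)); // key for a new page
//   page->encryptionKey = k;
//   ...
//   page->postReadHeader(pageID); // populates Page Fields (e.g. key ID) from the header
//   EncryptionKey full = wait(keyProvider->getSecrets(page->encryptionKey));
//   page->encryptionKey = full;    // Secret Fields now filled in
//   page->postReadPayload(pageID); // can verify and decrypt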
// This is a hacky way to attach an additional object of an arbitrary type at runtime to another object.
// It stores an arbitrary void pointer and a void pointer function to call when the ArbitraryObject
// is destroyed.
// It has helper operator= methods for storing heap-allocated T's or Reference<T>'s into it via
// x = thing;
// Examples:
// ArbitraryObject x;
// x = new Widget(); // x owns the new object
// x = Reference<SomeClass>(new SomeClass()); // x holds a reference now too
struct ArbitraryObject {
ArbitraryObject() : ptr(nullptr), onDestruct(nullptr) {}
ArbitraryObject(const ArbitraryObject&) = delete;
~ArbitraryObject() { destructOnly(); }
bool valid() const { return ptr != nullptr; }
template <typename T>
void operator=(T* p) {
destructOnly();
ptr = p;
onDestruct = [](void* ptr) { delete (T*)ptr; };
}
template <typename T>
void operator=(Reference<T>& r) {
destructOnly();
ptr = r.getPtr();
r.getPtr()->addref();
onDestruct = [](void* ptr) { ((T*)ptr)->delref(); };
}
template <typename T>
void operator=(Reference<T>&& r) {
destructOnly();
ptr = r.extractPtr();
onDestruct = [](void* ptr) { ((T*)ptr)->delref(); };
}
template <typename T>
T* getPtr() {
return (T*)ptr;
}
template <typename T>
Reference<T> getReference() {
return Reference<T>::addRef((T*)ptr);
}
void reset() {
destructOnly();
ptr = nullptr;
onDestruct = nullptr;
}
// ptr can be set to any arbitrary thing. If it is not null at destruct time then
// onDestruct(ptr) will be called if onDestruct is not null.
void* ptr = nullptr;
void (*onDestruct)(void*) = nullptr;
private:
// Call onDestruct(ptr) if needed but don't reset any state
void destructOnly() {
if (ptr != nullptr && onDestruct != nullptr) {
onDestruct(ptr);
}
}
};
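// A minimal usage sketch of the operator= overloads above (Widget and SomeClass are
// hypothetical types):
//
//   ArbitraryObject x;
//   x = new Widget();                          // x owns the Widget; deleted on reassign/reset/destruct
//   Widget* w = x.getPtr<Widget>();            // caller must know the stored type
//   x = Reference<SomeClass>(new SomeClass()); // previous Widget deleted; reference now held
//   x.reset();                                 // delref's the SomeClass and clears the slot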
// ArenaPage represents a data page meant to be stored on disk, located in a block of
// 4k-aligned memory held by an Arena
//
// Page Format:
// PageHeader - describes main header version, encoding type, and offsets of subheaders and payload.
// MainHeader - structure based on header version. It is responsible for protecting all bytes
// of PageHeader, MainHeader, and EncodingHeader with some sort of checksum.
// EncodingHeader - structure based on encoding type. It is responsible for protecting and
// possibly encrypting all payload bytes.
// Payload - User accessible bytes, protected and possibly encrypted based on the encoding
//
// preWrite() must be called before writing a page to disk to update checksums and encrypt as needed
// After reading a page from disk,
// postReadHeader() must be called to verify the version, main, and encoding headers
// postReadPayload() must be called, after potentially setting encryption secret, to verify and possibly
// decrypt the payload
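// A hedged sketch of that lifecycle (pageID, writeVersion, src, and the disk I/O are
// assumed inputs, not shown in this header):
//
//   Reference<ArenaPage> p = makeReference<ArenaPage>(4096, 4096);
//   p->init(EncodingType::XXHash64, PageType::BTreeNode, /*pageSubType*/ 1);
//   memcpy(p->mutateData(), src, p->dataSize()); // fill the payload
//   p->setWriteInfo(pageID, writeVersion);
//   p->preWrite(pageID);                         // update checksums, encrypt if needed
//   ... write p->rawData()/p->rawSize() to disk; later read the bytes back into q ...
//   q->postReadHeader(pageID);                   // verify header version and checksums
//   q->postReadPayload(pageID);                  // verify/decrypt; q->data() is now usable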
class ArenaPage : public ReferenceCounted<ArenaPage>, public FastAllocated<ArenaPage> {
public:
// The page's logical size includes an opaque checksum, use size() to get usable size
ArenaPage(int logicalSize, int bufferSize) : logicalSize(logicalSize), bufferSize(bufferSize), userData(nullptr) {
// This is the header version that new page init() calls will use.
// It is not necessarily the latest header version, as read/modify support for
// a new header version may be added prior to using that version as the default
// for new pages as part of downgrade support.
static constexpr uint8_t HEADER_WRITE_VERSION = 1;
ArenaPage(int logicalSize, int bufferSize) : logicalSize(logicalSize), bufferSize(bufferSize), pPayload(nullptr) {
if (bufferSize > 0) {
buffer = (uint8_t*)arena.allocate4kAlignedBuffer(bufferSize);
// Mark any unused page portion defined
VALGRIND_MAKE_MEM_DEFINED(buffer + logicalSize, bufferSize - logicalSize);
// Zero unused region
memset(buffer + logicalSize, 0, bufferSize - logicalSize);
} else {
buffer = nullptr;
}
};
~ArenaPage() {
if (userData != nullptr && userDataDestructor != nullptr) {
userDataDestructor(userData);
~ArenaPage() {}
// Before using these, either init() or postReadHeader and postReadPayload() must be called
const uint8_t* data() const { return pPayload; }
uint8_t* mutateData() const { return (uint8_t*)pPayload; }
int dataSize() const { return payloadSize; }
StringRef dataAsStringRef() const { return StringRef((uint8_t*)pPayload, payloadSize); }
const uint8_t* rawData() const { return buffer; }
uint8_t* rawData() { return buffer; }
int rawSize() const { return bufferSize; }
#pragma pack(push, 1)
// The next few structs describe the byte-packed physical structure. The fields of Page
// cannot change, but new header versions and encoding types can be added and existing
// header versions and encoding type headers could change size as offset information
// is stored to enable efficient jumping to the encoding header or payload.
// Page members are only initialized in init()
struct PageHeader {
uint8_t headerVersion;
EncodingType encodingType;
// Encoding header comes after main header
uint8_t encodingHeaderOffset;
// Payload comes after encoding header
uint8_t payloadOffset;
// Get main header pointer, casting to its type
template <typename T>
T* getMainHeader() const {
return (T*)(this + 1);
}
// Get encoding header pointer, casting to its type
template <typename T>
T* getEncodingHeader() const {
return (T*)((uint8_t*)this + encodingHeaderOffset);
}
// Get payload pointer
uint8_t* getPayload() const { return (uint8_t*)this + payloadOffset; }
};
// Redwood header version 1
// Protects all headers with a 64-bit XXHash checksum
// Most other fields are forensic in nature and are not required to be set for correct
// behavior but they can facilitate forensic investigation of data on disk. Some of them
// could be used for sanity checks at runtime.
struct RedwoodHeaderV1 {
PageType pageType;
// The meaning of pageSubType is based on pageType
// For Queue pages, pageSubType is the QueueID
// For BTree nodes, pageSubType is Height (also stored in BTreeNode)
uint8_t pageSubType;
// Format identifier, normally specific to the page Type and SubType
uint8_t pageFormat;
XXH64_hash_t checksum;
// Physical page ID of first block on disk of the ArenaPage
PhysicalPageID firstPhysicalPageID;
// The first logical page ID the ArenaPage was referenced by when last written
LogicalPageID lastKnownLogicalPageID;
// The first logical page ID of the parent of this ArenaPage when last written
LogicalPageID lastKnownParentLogicalPageID;
// Time and write version as of the last update to this page.
// Note that for relocated pages, writeVersion should not be updated.
double writeTime;
Version writeVersion;
// Update checksum
void updateChecksum(uint8_t* headerBytes, int len) {
// Checksum is within the checksum input so clear it first
checksum = 0;
checksum = XXH3_64bits(headerBytes, len);
}
// Verify checksum
void verifyChecksum(uint8_t* headerBytes, int len) {
// Checksum is within the checksum input so save it and restore it afterwards
XXH64_hash_t saved = checksum;
checksum = 0;
XXH64_hash_t calculated = XXH3_64bits(headerBytes, len);
checksum = saved;
if (saved != calculated) {
throw page_header_checksum_failed();
}
}
};
// An encoding that validates the payload with an XXHash checksum
struct XXHashEncodingHeader {
XXH64_hash_t checksum;
void encode(uint8_t* payload, int len, PhysicalPageID seed) {
checksum = XXH3_64bits_withSeed(payload, len, seed);
}
void decode(uint8_t* payload, int len, PhysicalPageID seed) {
if (checksum != XXH3_64bits_withSeed(payload, len, seed)) {
throw page_decoding_failed();
}
}
};
// A dummy "encrypting" encoding which uses XOR with a 1 byte secret key on
// the payload to obfuscate it and protects the payload with an XXHash checksum.
struct XOREncryptionEncodingHeader {
// Checksum is on unencrypted payload
XXH64_hash_t checksum;
uint8_t keyID;
void encode(uint8_t secret, uint8_t* payload, int len, PhysicalPageID seed) {
checksum = XXH3_64bits_withSeed(payload, len, seed);
for (int i = 0; i < len; ++i) {
payload[i] ^= secret;
}
}
void decode(uint8_t secret, uint8_t* payload, int len, PhysicalPageID seed) {
for (int i = 0; i < len; ++i) {
payload[i] ^= secret;
}
if (checksum != XXH3_64bits_withSeed(payload, len, seed)) {
throw page_decoding_failed();
}
}
};
#pragma pack(pop)
// Get the size of the encoding header based on type
// Note that this is only to be used in operations involving new pages to calculate the payload offset. For
// existing pages, the payload offset is stored in the page.
static int encodingHeaderSize(EncodingType t) {
if (t == EncodingType::XXHash64) {
return sizeof(XXHashEncodingHeader);
} else if (t == EncodingType::XOREncryption) {
return sizeof(XOREncryptionEncodingHeader);
} else {
throw page_encoding_not_supported();
}
}
uint8_t const* begin() const { return (uint8_t*)buffer; }
// Get the usable size for a new page of pageSize using HEADER_WRITE_VERSION with encoding type t
static int getUsableSize(int pageSize, EncodingType t) {
return pageSize - sizeof(PageHeader) - sizeof(RedwoodHeaderV1) - encodingHeaderSize(t);
}
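// Worked example under the #pragma pack(1) layouts above (assuming an 8-byte Version
// and XXH64_hash_t): sizeof(PageHeader) == 4, sizeof(RedwoodHeaderV1) == 39, and the
// XXHash64 encoding header is 8 bytes, so a 4096-byte page leaves
// 4096 - 4 - 39 - 8 = 4045 bytes of usable payload.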
uint8_t* mutate() { return (uint8_t*)buffer; }
// Initialize the header for a new page so that the payload can be written to
// Pre: Buffer is allocated and logical size is set
// Post: Page header is initialized and space is reserved for subheaders for
// HEADER_WRITE_VERSION main header and the given encoding type.
// Payload can be written to with mutateData() and dataSize()
void init(EncodingType t, PageType pageType, uint8_t pageSubType, uint8_t pageFormat = 0) {
// Carefully cast away constness to modify page header
PageHeader* p = const_cast<PageHeader*>(page);
p->headerVersion = HEADER_WRITE_VERSION;
p->encodingHeaderOffset = sizeof(PageHeader) + sizeof(RedwoodHeaderV1);
p->encodingType = t;
p->payloadOffset = page->encodingHeaderOffset + encodingHeaderSize(t);
typedef XXH64_hash_t Checksum;
pPayload = page->getPayload();
payloadSize = logicalSize - (pPayload - buffer);
// Usable size, without checksum
int size() const { return logicalSize - sizeof(Checksum); }
RedwoodHeaderV1* h = page->getMainHeader<RedwoodHeaderV1>();
h->pageType = pageType;
h->pageSubType = pageSubType;
h->pageFormat = pageFormat;
Standalone<StringRef> asStringRef() const { return Standalone<StringRef>(StringRef(begin(), size()), arena); }
// Write dummy values for these in new pages. They should be updated when possible before calling preWrite()
// when modifying existing pages
h->lastKnownLogicalPageID = invalidLogicalPageID;
h->lastKnownParentLogicalPageID = invalidLogicalPageID;
h->writeVersion = invalidVersion;
}
// Get an ArenaPage which is a copy of this page, in its own Arena
Reference<ArenaPage> cloneContents() const {
// Get the logical page buffer as a StringRef
Standalone<StringRef> asStringRef() const { return Standalone<StringRef>(StringRef(buffer, logicalSize)); }
// Get a new ArenaPage that contains a copy of this page's data.
// extra is not copied to the returned page
Reference<ArenaPage> clone() const {
ArenaPage* p = new ArenaPage(logicalSize, bufferSize);
memcpy(p->buffer, buffer, logicalSize);
// Non-verifying header parse just to initialize members
p->postReadHeader(invalidPhysicalPageID, false);
p->encryptionKey = encryptionKey;
return Reference<ArenaPage>(p);
}
// Get an ArenaPage which depends on this page's Arena and references some of its memory
Reference<ArenaPage> subPage(int offset, int len) const {
Reference<ArenaPage> getSubPage(int offset, int len) const {
ASSERT(offset + len <= logicalSize);
ArenaPage* p = new ArenaPage(len, 0);
p->buffer = buffer + offset;
p->arena.dependsOn(arena);
// Non-verifying header parse just to initialize component pointers
p->postReadHeader(invalidPhysicalPageID, false);
p->encryptionKey = encryptionKey;
return Reference<ArenaPage>(p);
}
// Given a vector of pages with the same ->size(), create a new ArenaPage with a ->size() that is
// equivalent to all of the input pages and has all of their contents copied into it.
static Reference<ArenaPage> concatPages(const std::vector<Reference<const ArenaPage>>& pages) {
int usableSize = pages.front()->size();
int totalUsableSize = pages.size() * usableSize;
int totalBufferSize = pages.front()->bufferSize * pages.size();
ArenaPage* superpage = new ArenaPage(totalUsableSize + sizeof(Checksum), totalBufferSize);
uint8_t* wptr = superpage->mutate();
for (auto& p : pages) {
ASSERT(p->size() == usableSize);
memcpy(wptr, p->begin(), usableSize);
wptr += usableSize;
// The next two functions set mostly forensic info that may help in an investigation to identify data on disk. The
// exception is pageID which must be set to the physical page ID on disk where the page is written or post-read
// verification will fail.
void setWriteInfo(PhysicalPageID pageID, Version writeVersion) {
if (page->headerVersion == 1) {
RedwoodHeaderV1* h = page->getMainHeader<RedwoodHeaderV1>();
h->firstPhysicalPageID = pageID;
h->writeVersion = writeVersion;
h->writeTime = now();
}
}
return Reference<ArenaPage>(superpage);
// These should be updated before writing a BTree page. Note that the logical ID that refers to a page can change
// after the page is written, if its parent is updated to point directly to its physical page ID. Therefore, the
// last known logical page ID should always be updated before writing an updated version of a BTree page.
void setLogicalPageInfo(LogicalPageID lastKnownLogicalPageID, LogicalPageID lastKnownParentLogicalPageID) {
if (page->headerVersion == 1) {
RedwoodHeaderV1* h = page->getMainHeader<RedwoodHeaderV1>();
h->lastKnownLogicalPageID = lastKnownLogicalPageID;
h->lastKnownParentLogicalPageID = lastKnownParentLogicalPageID;
}
}
Checksum& getChecksum() { return *(Checksum*)(buffer + size()); }
// Must be called before writing to disk to update headers and encrypt page
// Pre: Encoding-specific header fields are set if needed
// Secret is set if needed
// Post: Main and Encoding subheaders are updated
// Payload is possibly encrypted
void preWrite(PhysicalPageID pageID) const {
// Explicitly check payload definedness to make the source of valgrind errors more clear.
// Without this check, calculating a checksum on a payload with undefined bytes does not
// cause a valgrind error but the resulting checksum is undefined which causes errors later.
ASSERT(VALGRIND_CHECK_MEM_IS_DEFINED(pPayload, payloadSize) == 0);
Checksum calculateChecksum(LogicalPageID pageID) { return XXH3_64bits_withSeed(buffer, size(), pageID); }
if (page->encodingType == EncodingType::XXHash64) {
page->getEncodingHeader<XXHashEncodingHeader>()->encode(pPayload, payloadSize, pageID);
} else if (page->encodingType == EncodingType::XOREncryption) {
ASSERT(encryptionKey.secret.size() == 1);
XOREncryptionEncodingHeader* xh = page->getEncodingHeader<XOREncryptionEncodingHeader>();
xh->keyID = encryptionKey.id.orDefault(0);
xh->encode(encryptionKey.secret[0], pPayload, payloadSize, pageID);
} else {
throw page_encoding_not_supported();
}
void updateChecksum(LogicalPageID pageID) { getChecksum() = calculateChecksum(pageID); }
if (page->headerVersion == 1) {
page->getMainHeader<RedwoodHeaderV1>()->updateChecksum(buffer, pPayload - buffer);
} else {
throw page_header_version_not_supported();
}
}
bool verifyChecksum(LogicalPageID pageID) { return getChecksum() == calculateChecksum(pageID); }
// Must be called after reading from disk to verify all non-payload bytes
// Pre: Bytes from storage medium copied into raw buffer space
// Post: Page headers outside of payload are verified (unless verify is false)
// encryptionKey is updated with information from encoding header if needed
// Payload is accessible via data(), dataSize(), etc.
//
// Exceptions are thrown for unknown header types or pages which fail verification
void postReadHeader(PhysicalPageID pageID, bool verify = true) {
pPayload = page->getPayload();
payloadSize = logicalSize - (pPayload - buffer);
// Populate encryption key with relevant fields from page
if (page->encodingType == EncodingType::XOREncryption) {
encryptionKey.id = page->getEncodingHeader<XOREncryptionEncodingHeader>()->keyID;
}
if (page->headerVersion == 1) {
if (verify) {
RedwoodHeaderV1* h = page->getMainHeader<RedwoodHeaderV1>();
h->verifyChecksum(buffer, pPayload - buffer);
if (pageID != h->firstPhysicalPageID) {
throw page_header_wrong_page_id();
}
}
} else {
throw page_header_version_not_supported();
}
}
// Pre: postReadHeader has been called, encoding-specific parameters (such as the encryption secret) have been set
// Post: Payload has been verified and decrypted if necessary
void postReadPayload(PhysicalPageID pageID) {
if (page->encodingType == EncodingType::XXHash64) {
page->getEncodingHeader<XXHashEncodingHeader>()->decode(pPayload, payloadSize, pageID);
} else if (page->encodingType == EncodingType::XOREncryption) {
ASSERT(encryptionKey.secret.size() == 1);
page->getEncodingHeader<XOREncryptionEncodingHeader>()->decode(
encryptionKey.secret[0], pPayload, payloadSize, pageID);
} else {
throw page_encoding_not_supported();
}
}
const Arena& getArena() const { return arena; }
static bool isEncodingTypeEncrypted(EncodingType t) { return t == EncodingType::XOREncryption; }
// Returns true if the page's encoding type employs encryption
bool isEncrypted() const { return isEncodingTypeEncrypted(getEncodingType()); }
private:
Arena arena;
// The logical size of the page, which can be smaller than bufferSize; this is mainly
// useful in simulation, where arbitrarily small page sizes exercise edge cases with
// shorter execution times
int logicalSize;
// The 4k-aligned physical size of allocated memory for the page which also represents the
// block size to be written to disk
int bufferSize;
// buffer is a pointer to the page's memory
// For convenience, it is unioned with a Page pointer which defines the page structure
union {
uint8_t* buffer;
const PageHeader* page;
};
// Pointer and length of page space available to the user
// These are accessed very often so they are stored directly
uint8_t* pPayload;
int payloadSize;
public:
mutable void* userData;
mutable void (*userDataDestructor)(void*);
EncodingType getEncodingType() const { return page->encodingType; }
PhysicalPageID getPhysicalPageID() const {
if (page->headerVersion == 1) {
return page->getMainHeader<RedwoodHeaderV1>()->firstPhysicalPageID;
} else {
throw page_header_version_not_supported();
}
}
// Used by encodings that do encryption
EncryptionKey encryptionKey;
mutable ArbitraryObject extra;
};
class IPagerSnapshot {
@ -184,18 +605,21 @@ public:
virtual void addref() = 0;
virtual void delref() = 0;
ArbitraryObject extra;
};
// This API is probably too customized to the behavior of DWALPager and probably needs some changes to be more generic.
class IPager2 : public IClosable {
public:
virtual std::string getName() const = 0;
// Returns an ArenaPage that can be passed to writePage. The data in the returned ArenaPage might not be zeroed.
virtual Reference<ArenaPage> newPageBuffer(size_t size = 1) = 0;
virtual Reference<ArenaPage> newPageBuffer(size_t blocks = 1) = 0;
// Returns the usable size of pages returned by the pager (i.e. the size of the page that isn't pager overhead).
// For a given pager instance, separate calls to this function must return the same value.
// Only valid to call after recovery is complete.
virtual int getUsablePageSize() const = 0;
virtual int getPhysicalPageSize() const = 0;
virtual int getLogicalPageSize() const = 0;
virtual int getPagesPerExtent() const = 0;
@ -251,7 +675,7 @@ public:
bool noHit) = 0;
virtual Future<Reference<ArenaPage>> readMultiPage(PagerEventReasons reason,
unsigned int level,
Standalone<VectorRef<PhysicalPageID>> pageIDs,
VectorRef<PhysicalPageID> pageIDs,
int priority,
bool cacheable,
bool noHit) = 0;
@ -271,16 +695,13 @@ public:
// The snapshot shall be usable until setOldVersion() is called with a version > v.
virtual Reference<IPagerSnapshot> getReadSnapshot(Version v) = 0;
// Atomically make durable all pending page writes, page frees, and update the metadata string,
// setting the committed version to v
// v must be >= the highest versioned page write.
virtual Future<Void> commit(Version v) = 0;
// Atomically make durable all pending page writes, page frees, and update the user commit
// record at version v
// v must be higher than the highest committed version
virtual Future<Void> commit(Version v, Value commitRecord) = 0;
// Get the latest meta key set or committed
virtual Key getMetaKey() const = 0;
// Set the metakey which will be stored in the next commit
virtual void setMetaKey(KeyRef metaKey) = 0;
// Get the latest committed user commit record
virtual Value getCommitRecord() const = 0;
virtual StorageBytes getStorageBytes() const = 0;
@ -318,4 +739,52 @@ protected:
~IPager2() {} // Destruction should be done using close()/dispose() from the IClosable interface
};
// The null key provider is useful to simplify page decoding.
// It throws an error for any key info requested.
class NullKeyProvider : public IEncryptionKeyProvider {
public:
virtual ~NullKeyProvider() {}
Future<EncryptionKey> getSecrets(const EncryptionKeyRef& key) override { throw encryption_key_not_found(); }
Future<EncryptionKey> getByRange(const KeyRef& begin, const KeyRef& end) override {
throw encryption_key_not_found();
}
};
// Key provider for dummy XOR encryption scheme
class XOREncryptionKeyProvider : public IEncryptionKeyProvider {
public:
XOREncryptionKeyProvider(std::string filename) {
ASSERT(g_network->isSimulated());
// Deterministically choose a byte of the filename (without its path) for secret generation
// Remove any leading directory names
size_t lastSlash = filename.find_last_of("\\/");
if (lastSlash != filename.npos) {
filename.erase(0, lastSlash);
}
xorWith = filename.empty() ? 0x5e
: (uint8_t)filename[XXH3_64bits(filename.data(), filename.size()) % filename.size()];
}
virtual ~XOREncryptionKeyProvider() {}
virtual Future<EncryptionKey> getSecrets(const EncryptionKeyRef& key) override {
if (!key.id.present()) {
throw encryption_key_not_found();
}
EncryptionKey s = key;
uint8_t secret = ~(uint8_t)key.id.get() ^ xorWith;
s.secret = StringRef(s.arena(), &secret, 1);
return s;
}
virtual Future<EncryptionKey> getByRange(const KeyRef& begin, const KeyRef& end) override {
EncryptionKeyRef k;
k.id = end.empty() ? 0 : *(end.end() - 1);
return getSecrets(k);
}
uint8_t xorWith;
};
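// Round trip of the dummy scheme above: preWrite() stores encryptionKey.id in the
// XOREncryptionEncodingHeader, postReadHeader() copies it back into encryptionKey.id,
// and getSecrets() re-derives the 1-byte secret as ~id ^ xorWith, so decode() sees the
// same secret that encode() used.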
#endif

File diff suppressed because it is too large

View File

@ -227,7 +227,8 @@ struct ConfigureDatabaseWorkload : TestWorkload {
double testDuration;
int additionalDBs;
bool allowDescriptorChange;
bool allowTestStorageMigration;
bool allowTestStorageMigration; // allow change storage migration and perpetual wiggle conf
bool storageMigrationCompatibleConf; // only allow generating configuration suitable for storage migration test
bool waitStoreTypeCheck;
bool downgradeTest1; // if this is true, don't pick up downgrade incompatible config
std::vector<Future<Void>> clients;
@ -239,6 +240,7 @@ struct ConfigureDatabaseWorkload : TestWorkload {
getOption(options, LiteralStringRef("allowDescriptorChange"), SERVER_KNOBS->ENABLE_CROSS_CLUSTER_SUPPORT);
allowTestStorageMigration =
getOption(options, "allowTestStorageMigration"_sr, false) && g_simulator.allowStorageMigrationTypeChange;
storageMigrationCompatibleConf = getOption(options, "storageMigrationCompatibleConf"_sr, false);
waitStoreTypeCheck = getOption(options, "waitStoreTypeCheck"_sr, false);
downgradeTest1 = getOption(options, "downgradeTest1"_sr, false);
g_simulator.usableRegions = 1;
@ -349,7 +351,11 @@ struct ConfigureDatabaseWorkload : TestWorkload {
}
state int randomChoice;
if (self->allowTestStorageMigration) {
randomChoice = deterministicRandom()->randomInt(4, 9);
randomChoice = (deterministicRandom()->random01() < 0.375) ? deterministicRandom()->randomInt(0, 3)
: deterministicRandom()->randomInt(4, 9);
} else if (self->storageMigrationCompatibleConf) {
randomChoice = (deterministicRandom()->random01() < 3.0 / 7) ? deterministicRandom()->randomInt(0, 3)
: deterministicRandom()->randomInt(5, 9);
} else {
randomChoice = deterministicRandom()->randomInt(0, 8);
}
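// The branch probabilities above keep the final choice uniform over the union of the
// two ranges: 0.375 = 3/8 weights the 3 options in [0,3) against the 5 in [4,9), and
// 3.0/7 weights the 3 options in [0,3) against the 4 in [5,9).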

View File

@ -690,7 +690,7 @@ TEST_CASE("/flow/Tracing/AddLinks") {
return Void();
};
uint64_t swapUint16BE(uint8_t* index) {
uint16_t swapUint16BE(uint8_t* index) {
uint16_t value;
memcpy(&value, index, sizeof(value));
return fromBigEndian16(value);
@ -718,6 +718,26 @@ std::string readMPString(uint8_t* index, int len) {
return reinterpret_cast<char*>(data);
}
std::string readMPString(uint8_t* index) {
auto len = 0;
switch (*index) {
case 0xda:
index++; // read the size in the next 2 bytes
len = swapUint16BE(index);
index += 2; // move index past the size bytes
break;
default:
// We & out the bits here that contain the length the initial 3 higher order bits are
// to signify this is a string of len <= 31 chars.
len = static_cast<uint8_t>(*index & 0b00011111);
index++;
}
uint8_t data[len + 1];
std::copy(index, index + len, data);
data[len] = '\0';
return reinterpret_cast<char*>(data);
}
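// Worked examples of the two MessagePack string headers handled above:
//   fixstr: "encoded_span" is 12 bytes, so its control byte is 0b10100000 | 12 = 0xac,
//           followed directly by the 12 payload bytes.
//   str16:  a 300-byte string starts with 0xda, then the big-endian length 0x01 0x2c,
//           then the payload bytes.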
// Windows doesn't like lack of header and declaration of constructor for FastUDPTracer
#ifndef WIN32
TEST_CASE("/flow/Tracing/FastUDPMessagePackEncoding") {
@ -754,9 +774,7 @@ TEST_CASE("/flow/Tracing/FastUDPMessagePackEncoding") {
ASSERT(data[46] == 0xcf);
ASSERT(swapUint64BE(&data[47]) == 1);
// Read and verify span name
ASSERT(data[55] == (0b10100000 | strlen("encoded_span")));
ASSERT(strncmp(readMPString(&data[56], strlen("encoded_span")).c_str(), "encoded_span", strlen("encoded_span")) ==
0);
ASSERT(readMPString(&data[55]) == "encoded_span");
// Verify begin/end is encoded, we don't care about the values
ASSERT(data[68] == 0xcb);
ASSERT(data[77] == 0xcb);
@ -795,10 +813,7 @@ TEST_CASE("/flow/Tracing/FastUDPMessagePackEncoding") {
ASSERT(data[0] == 0b10011110); // 14 element array.
// We don't care about the next 54 bytes as there is no parent and a randomly assigned Trace and SpanID
// Read and verify span name
ASSERT(data[55] == (0b10100000 | strlen("encoded_span_3")));
ASSERT(strncmp(readMPString(&data[56], strlen("encoded_span_3")).c_str(),
"encoded_span_3",
strlen("encoded_span_3")) == 0);
ASSERT(readMPString(&data[55]) == "encoded_span_3");
// Verify begin/end is encoded, we don't care about the values
ASSERT(data[70] == 0xcb);
ASSERT(data[79] == 0xcb);
@ -818,43 +833,32 @@ TEST_CASE("/flow/Tracing/FastUDPMessagePackEncoding") {
ASSERT(swapUint64BE(&data[112]) == 400);
// Events
ASSERT(data[120] == 0b10010001); // empty
ASSERT(data[121] == (0b10100000 | strlen("event1")));
ASSERT(strncmp(readMPString(&data[122], strlen("event1")).c_str(), "event1", strlen("event1")) == 0);
ASSERT(readMPString(&data[121]) == "event1");
ASSERT(data[128] == 0xcb);
ASSERT(swapDoubleBE(&data[129]) == 100.101);
// Events Attributes
ASSERT(data[137] == 0b10000001); // single k/v pair
ASSERT(data[138] == 0b10100011); // length of key string "foo" == 3
ASSERT(strncmp(readMPString(&data[139], strlen("foo")).c_str(), "foo", strlen("foo")) == 0);
ASSERT(data[142] == 0b10100011); // length of key string "bar" == 3
ASSERT(strncmp(readMPString(&data[143], strlen("bar")).c_str(), "bar", strlen("bar")) == 0);
ASSERT(readMPString(&data[138]) == "foo");
ASSERT(readMPString(&data[142]) == "bar");
// Attributes
ASSERT(data[146] == 0b10000010); // two k/v pair
// Reconstruct map from MessagePack wire format data and verify.
std::unordered_map<std::string, std::string> attributes;
auto index = 147;
// We & out the bits here that contain the length the initial 4 higher order bits are
// to signify this is a string of len <= 31 chars.
auto firstKeyLength = static_cast<uint8_t>(data[index] & 0b00011111);
index++;
auto firstKey = readMPString(&data[index], firstKeyLength);
index += firstKeyLength;
auto firstValueLength = static_cast<uint8_t>(data[index] & 0b00011111);
index++;
auto firstValue = readMPString(&data[index], firstValueLength);
index += firstValueLength;
auto firstKey = readMPString(&data[index]);
index += firstKey.length() + 1; // +1 for control byte
auto firstValue = readMPString(&data[index]);
index += firstValue.length() + 1; // +1 for control byte
attributes[firstKey] = firstValue;
auto secondKeyLength = static_cast<uint8_t>(data[index] & 0b00011111);
index++;
auto secondKey = readMPString(&data[index], secondKeyLength);
index += secondKeyLength;
auto secondValueLength = static_cast<uint8_t>(data[index] & 0b00011111);
index++;
auto secondValue = readMPString(&data[index], secondValueLength);
auto secondKey = readMPString(&data[index]);
index += secondKey.length() + 1; // +1 for control byte
auto secondValue = readMPString(&data[index]);
attributes[secondKey] = secondValue;
// We don't know what the value for address will be, so just verify it is in the map.
ASSERT(attributes.find("address") != attributes.end());
ASSERT(strncmp(attributes["operation"].c_str(), "grv", strlen("grv")) == 0);
ASSERT(attributes["operation"] == "grv");
request.reset();
@ -876,9 +880,7 @@ TEST_CASE("/flow/Tracing/FastUDPMessagePackEncoding") {
// We don't care about the next 54 bytes as there is no parent and a randomly assigned Trace and SpanID
// Read and verify span name
ASSERT(data[55] == 0xda);
auto locationLength = swapUint16BE(&data[56]);
ASSERT(locationLength == strlen(longString));
ASSERT(strncmp(readMPString(&data[58], locationLength).c_str(), longString, strlen(longString)) == 0);
ASSERT(readMPString(&data[55]) == longString);
return Void();
};
#endif

View File

@ -73,3 +73,11 @@ T waitNext(const FutureStream<T>&);
#ifdef _MSC_VER
#pragma warning(disable : 4355) // 'this' : used in base member initializer list
#endif
// Currently, #ifdef can't be used inside actors, so define no-op versions of these valgrind
// functions if valgrind is not defined
#ifndef VALGRIND
#define VALGRIND_MAKE_MEM_UNDEFINED(x, y)
#define VALGRIND_MAKE_MEM_DEFINED(x, y)
#define VALGRIND_CHECK_MEM_IS_DEFINED(x, y) 0
#endif

View File

@ -88,6 +88,13 @@ ERROR( blob_granule_transaction_too_old, 1064, "Read version is older than blob
ERROR( blob_manager_replaced, 1065, "This blob manager has been replaced." )
ERROR( change_feed_popped, 1066, "Tried to read a version older than what has been popped from the change feed" )
ERROR( remote_kvs_cancelled, 1067, "The remote key-value store is cancelled" )
ERROR( page_header_wrong_page_id, 1068, "Page header does not match location on disk" )
ERROR( page_header_checksum_failed, 1069, "Page header checksum failed" )
ERROR( page_header_version_not_supported, 1070, "Page header version is not supported" )
ERROR( page_encoding_not_supported, 1071, "Page encoding type is not supported or not valid" )
ERROR( page_decoding_failed, 1072, "Page content decoding failed" )
ERROR( unexpected_encoding_type, 1073, "Unexpected page encoding type" )
ERROR( encryption_key_not_found, 1074, "Encryption key not found" )
ERROR( broken_promise, 1100, "Broken promise" )
ERROR( operation_cancelled, 1101, "Asynchronous operation cancelled" )

View File

@ -1,3 +1,5 @@
storageEngineExcludeTypes=3
logAntiQuorum = 0
testTitle=SubmitBackup

View File

@ -1,3 +1,5 @@
storageEngineExcludeTypes=3
;write 1000 Keys ending with even numbers
testTitle=SnapTestPre
clearAfterTest=false

View File

@ -1,3 +1,5 @@
storageEngineExcludeTypes=3
;write 1000 Keys ending with even numbers
testTitle=SnapTestPre
clearAfterTest=false

View File

@ -1,3 +1,5 @@
storageEngineExcludeTypes=3
;write 1000 Keys ending with even number
testTitle=SnapSimplePre
clearAfterTest=false

View File

@ -1,3 +1,5 @@
storageEngineExcludeTypes=3
[[test]]
testTitle = 'SubmitBackup'
simBackupAgents= 'BackupToFile'

View File

@ -10,6 +10,7 @@ waitForQuiescenceBegin=false
testName = 'ConfigureDatabase'
testDuration = 300.0
waitStoreTypeCheck = true
storageMigrationCompatibleConf = true
[[test.workload]]
testName = 'RandomClogging'

View File

@ -1,4 +1,4 @@
storageEngineExcludeTypes=-1,-2
storageEngineExcludeTypes=-1,-2,3
maxTLogVersion=6
disableTss=true
disableHostname=true

View File

@ -10,6 +10,7 @@ waitForQuiescenceBegin=false
testName = 'ConfigureDatabase'
testDuration = 300.0
waitStoreTypeCheck = true
storageMigrationCompatibleConf = true
[[test.workload]]
testName = 'RandomClogging'