Finished Redwood page API refactor and implemented dummy page encryption scheme with XOR, used randomly in simulation, to prove that pluggable encryption schemes fit in the design.
This commit is contained in:
parent
d227a2ad37
commit
7896c2979f
|
@ -21,6 +21,7 @@
|
|||
#ifndef FDBSERVER_IPAGER_H
|
||||
#define FDBSERVER_IPAGER_H
|
||||
#include "flow/Error.h"
|
||||
#include "flow/ProtocolVersion.h"
|
||||
#include <stdint.h>
|
||||
#pragma once
|
||||
|
||||
|
@ -101,11 +102,12 @@ enum PageType : uint8_t {
|
|||
// Pages can only be written using the latest HeaderVersion
|
||||
//
|
||||
// preWrite() must be called before writing a page to disk, which will do any checksum generation or encryption needed
|
||||
// postRead() must be called after loading a page from disk, which will do any verification or decryption needed
|
||||
// postRead1() must be called after loading a page from disk to header verification
|
||||
// postRead2() must be called after postRead1 to handle page contents verification and possible decryption
|
||||
class ArenaPage : public ReferenceCounted<ArenaPage>, public FastAllocated<ArenaPage> {
|
||||
public:
|
||||
ArenaPage(int logicalSize, int bufferSize)
|
||||
: logicalSize(logicalSize), bufferSize(bufferSize), pUsable(nullptr), userData(nullptr) {
|
||||
: logicalSize(logicalSize), bufferSize(bufferSize), pPayload(nullptr), userData(nullptr) {
|
||||
if (bufferSize > 0) {
|
||||
buffer = (uint8_t*)arena.allocate4kAlignedBuffer(bufferSize);
|
||||
|
||||
|
@ -130,9 +132,9 @@ public:
|
|||
}
|
||||
|
||||
// Before using begin() or size(), either init() or postRead() must be called
|
||||
const uint8_t* data() const { return pUsable; }
|
||||
uint8_t* mutateData() const { return (uint8_t*)pUsable; }
|
||||
int dataSize() const { return usableSize; }
|
||||
const uint8_t* data() const { return pPayload; }
|
||||
uint8_t* mutateData() const { return (uint8_t*)pPayload; }
|
||||
int dataSize() const { return payloadSize; }
|
||||
|
||||
const uint8_t* rawData() const { return buffer; }
|
||||
uint8_t* rawData() { return buffer; }
|
||||
|
@ -166,6 +168,7 @@ public:
|
|||
Version writeVersion;
|
||||
};
|
||||
|
||||
// An encoding that validates the payload with an XXHash checksum
|
||||
struct XXHashEncodingHeader {
|
||||
XXH64_hash_t checksum;
|
||||
void encode(uint8_t* payload, int len, PhysicalPageID seed) {
|
||||
|
@ -173,38 +176,40 @@ public:
|
|||
}
|
||||
void decode(uint8_t* payload, int len, PhysicalPageID seed) {
|
||||
if (checksum != XXH3_64bits_withSeed(payload, len, seed)) {
|
||||
throw checksum_failed();
|
||||
throw page_decoding_failed();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// An encoding that encrypts the payload with a 1 byte XOR key
|
||||
// and uses an XXHash checksum on the unencrypted payload.
|
||||
struct XOREncodingHeader {
|
||||
// Checksum on decrypted payload
|
||||
XXH64_hash_t checksum;
|
||||
uint8_t keyID;
|
||||
|
||||
void encode(uint8_t secret, uint8_t* payload, int len, PhysicalPageID seed) {
|
||||
uint8_t key = secret ^ keyID;
|
||||
for (int i = 0; i < len; ++i) {
|
||||
payload[i] ^= key;
|
||||
}
|
||||
checksum = XXH3_64bits_withSeed(payload, len, seed);
|
||||
for (int i = 0; i < len; ++i) {
|
||||
payload[i] ^= secret;
|
||||
}
|
||||
}
|
||||
void decode(uint8_t secret, uint8_t* payload, int len, PhysicalPageID seed) {
|
||||
if (checksum != XXH3_64bits_withSeed(payload, len, seed)) {
|
||||
throw checksum_failed();
|
||||
}
|
||||
uint8_t key = secret ^ keyID;
|
||||
for (int i = 0; i < len; ++i) {
|
||||
payload[i] ^= key;
|
||||
payload[i] ^= secret;
|
||||
}
|
||||
if (checksum != XXH3_64bits_withSeed(payload, len, seed)) {
|
||||
throw page_decoding_failed();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
struct Footer {
|
||||
XXH64_hash_t checksum;
|
||||
void update(uint8_t* payload, int len) { checksum = XXH3_64bits(payload, len); }
|
||||
void verify(uint8_t* payload, int len) {
|
||||
if (checksum != XXH3_64bits(payload, len)) {
|
||||
throw checksum_failed();
|
||||
void update(uint8_t* headerBytes, int len) { checksum = XXH3_64bits(headerBytes, len); }
|
||||
void verify(uint8_t* headerBytes, int len) {
|
||||
if (checksum != XXH3_64bits(headerBytes, len)) {
|
||||
throw page_header_checksum_failed();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
@ -214,6 +219,7 @@ public:
|
|||
// Syntactic sugar for getting a series of types from a byte buffer
|
||||
// The Reader casts to any T * and increments the read pointer by T's size.
|
||||
struct Reader {
|
||||
Reader(void* p) : ptr((uint8_t*)p) {}
|
||||
uint8_t* ptr;
|
||||
template <typename T>
|
||||
operator T*() {
|
||||
|
@ -228,30 +234,33 @@ public:
|
|||
};
|
||||
|
||||
// Initialize the header for a new page to be populated soon and written to disk
|
||||
// Pre: Buffer is allocated and logical size is set
|
||||
// Post: Header is initialized using HEADER_WRITE_VERSION format
|
||||
// Encoding-specific header pointers are initialized
|
||||
// Payload can be written to with mutateData() and dataSize()
|
||||
void init(EncodingType t, PageType pageType, uint8_t pageSubType) {
|
||||
encodingType = t;
|
||||
Reader next{ buffer };
|
||||
VersionHeader* vh = next;
|
||||
// Only the latest header version is written.
|
||||
vh->headerVersion = HEADER_WRITE_VERSION;
|
||||
vh->encodingType = t;
|
||||
// Only HEADER_WRITE_VERSION is supported for writing, though
|
||||
// newer or older versions may be supported for reading
|
||||
pVersionHeader->headerVersion = HEADER_WRITE_VERSION;
|
||||
pVersionHeader->encodingType = t;
|
||||
|
||||
Reader next(pVersionHeader + 1);
|
||||
Header* h = next;
|
||||
h->pageType = pageType;
|
||||
h->pageSubType = pageSubType;
|
||||
|
||||
if (t == EncodingType::XXHash64) {
|
||||
next.skip<XXHashEncodingHeader>();
|
||||
pXXHashHeader = next;
|
||||
} else if (t == EncodingType::XOREncryption) {
|
||||
next.skip<XOREncodingHeader>();
|
||||
pXORHeader = next;
|
||||
} else {
|
||||
throw unsupported_format_version();
|
||||
throw page_encoding_not_supported();
|
||||
}
|
||||
|
||||
next.skip<Footer>();
|
||||
|
||||
pUsable = next;
|
||||
usableSize = logicalSize - (pUsable - buffer);
|
||||
pPayload = next;
|
||||
payloadSize = logicalSize - (pPayload - buffer);
|
||||
}
|
||||
|
||||
// Get the usable size for a new page of pageSize using HEADER_WRITE_VERSION with encoding type t
|
||||
|
@ -263,7 +272,7 @@ public:
|
|||
} else if (t == EncodingType::XOREncryption) {
|
||||
usable -= sizeof(XOREncodingHeader);
|
||||
} else {
|
||||
throw unsupported_format_version();
|
||||
throw page_encoding_not_supported();
|
||||
}
|
||||
|
||||
return usable;
|
||||
|
@ -272,23 +281,18 @@ public:
|
|||
Standalone<StringRef> asStringRef() const { return Standalone<StringRef>(StringRef(buffer, logicalSize)); }
|
||||
|
||||
// Get an ArenaPage which is a copy of this page, in its own Arena
|
||||
Reference<ArenaPage> cloneContents() const {
|
||||
Reference<ArenaPage> clone() const {
|
||||
ArenaPage* p = new ArenaPage(logicalSize, bufferSize);
|
||||
memcpy(p->buffer, buffer, logicalSize);
|
||||
p->pUsable = p->buffer + (pUsable - buffer);
|
||||
p->usableSize = usableSize;
|
||||
|
||||
p->encodingType = encodingType;
|
||||
if (encodingType == EncodingType::XOREncryption) {
|
||||
p->xorKeyID = xorKeyID;
|
||||
p->xorKeySecret = xorKeySecret;
|
||||
}
|
||||
p->postRead1(invalidPhysicalPageID, false);
|
||||
p->secret = secret;
|
||||
|
||||
return Reference<ArenaPage>(p);
|
||||
}
|
||||
|
||||
// Get an ArenaPage which depends on this page's Arena and references some of its memory
|
||||
Reference<ArenaPage> subPage(int offset, int len) const {
|
||||
ASSERT(offset + len <= logicalSize);
|
||||
ArenaPage* p = new ArenaPage(len, 0);
|
||||
p->buffer = buffer + offset;
|
||||
p->arena.dependsOn(arena);
|
||||
|
@ -296,12 +300,14 @@ public:
|
|||
}
|
||||
|
||||
// Must be called before writing to disk to update headers and encrypt page
|
||||
// Pre: Encoding secrets and other options must be set
|
||||
// Post: Encoding options will be stored in page if needed, payload will be encrypted
|
||||
// Pre: Encoding-specific header fields are set if needed
|
||||
// Secret is set if needed
|
||||
// Post: Main header checksum is updated, encoding-specific header is updated and
|
||||
// payload encrypted if needed
|
||||
void preWrite(PhysicalPageID pageID) const {
|
||||
Reader next{ buffer };
|
||||
const VersionHeader* vh = next;
|
||||
ASSERT(vh->headerVersion == HEADER_WRITE_VERSION);
|
||||
// Only HEADER_WRITE_VERSION is supported
|
||||
ASSERT(pVersionHeader->headerVersion == HEADER_WRITE_VERSION);
|
||||
Reader next(pVersionHeader + 1);
|
||||
|
||||
Header* h = next;
|
||||
h->firstPhysicalPageID = pageID;
|
||||
|
@ -312,60 +318,61 @@ public:
|
|||
h->lastKnownParentID = invalidLogicalPageID;
|
||||
h->writeVersion = invalidVersion;
|
||||
|
||||
if (vh->encodingType == EncodingType::XXHash64) {
|
||||
XXHashEncodingHeader* xh = next;
|
||||
xh->encode(pUsable, usableSize, pageID);
|
||||
} else if (vh->encodingType == EncodingType::XOREncryption) {
|
||||
XOREncodingHeader* xorh = next;
|
||||
xorh->keyID = xorKeyID;
|
||||
xorh->encode(xorKeySecret, pUsable, usableSize, pageID);
|
||||
if (pVersionHeader->encodingType == EncodingType::XXHash64) {
|
||||
next.skip<XXHashEncodingHeader>();
|
||||
pXXHashHeader->encode(pPayload, payloadSize, pageID);
|
||||
} else if (pVersionHeader->encodingType == EncodingType::XOREncryption) {
|
||||
next.skip<XOREncodingHeader>();
|
||||
pXORHeader->encode(secret[0], pPayload, payloadSize, pageID);
|
||||
} else {
|
||||
throw unsupported_format_version();
|
||||
throw page_encoding_not_supported();
|
||||
}
|
||||
|
||||
Footer* f = next;
|
||||
f->update(buffer, (uint8_t*)f - buffer);
|
||||
}
|
||||
|
||||
// Must be called after reading from disk to verify and decrypt page
|
||||
// Pre: Encoding secrets must be set
|
||||
// Post: Encoding options that come from page data will be populated, payload will be decrypted
|
||||
void postRead(PhysicalPageID pageID) {
|
||||
Reader next{ buffer };
|
||||
const VersionHeader* vh = next;
|
||||
encodingType = vh->encodingType;
|
||||
|
||||
if (vh->headerVersion == 1) {
|
||||
// Must be called after reading from disk to verify header and get encoding type
|
||||
// Pre: Bytes from storage medium copied into raw space
|
||||
// Post: Encoding-specific header pointer is initialized
|
||||
// Page header integrity is verified unless verify is false
|
||||
void postRead1(PhysicalPageID pageID, bool verify = true) {
|
||||
Reader next(pVersionHeader + 1);
|
||||
if (pVersionHeader->headerVersion == 1) {
|
||||
Header* h = next;
|
||||
XXHashEncodingHeader* xh = nullptr;
|
||||
XOREncodingHeader* xorh = nullptr;
|
||||
|
||||
if (encodingType == EncodingType::XXHash64) {
|
||||
xh = next;
|
||||
} else if (encodingType == EncodingType::XOREncryption) {
|
||||
xorh = next;
|
||||
if (pVersionHeader->encodingType == EncodingType::XXHash64) {
|
||||
pXXHashHeader = next;
|
||||
} else if (pVersionHeader->encodingType == EncodingType::XOREncryption) {
|
||||
pXORHeader = next;
|
||||
} else {
|
||||
throw unsupported_format_version();
|
||||
throw page_encoding_not_supported();
|
||||
}
|
||||
|
||||
Footer* f = next;
|
||||
pUsable = next;
|
||||
usableSize = logicalSize - (pUsable - buffer);
|
||||
pPayload = next;
|
||||
payloadSize = logicalSize - (pPayload - buffer);
|
||||
|
||||
f->verify(buffer, (uint8_t*)f - buffer);
|
||||
|
||||
if (xh != nullptr) {
|
||||
xh->decode(pUsable, usableSize, pageID);
|
||||
} else if (xorh != nullptr) {
|
||||
xorh->decode(xorKeySecret, pUsable, usableSize, pageID);
|
||||
xorKeyID = xorh->keyID;
|
||||
}
|
||||
|
||||
if (h->firstPhysicalPageID != pageID) {
|
||||
throw page_header_wrong_page_id();
|
||||
if (verify) {
|
||||
f->verify(buffer, (uint8_t*)f - buffer);
|
||||
if (pageID != h->firstPhysicalPageID) {
|
||||
throw page_header_wrong_page_id();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
throw unsupported_format_version();
|
||||
throw page_header_version_not_supported();
|
||||
}
|
||||
}
|
||||
|
||||
// Pre: postRead1 has been called, encoding-specific parameters have been set
|
||||
// Post: Payload has been verified and decrypted if necessary
|
||||
void postRead2(PhysicalPageID pageID) {
|
||||
if (pVersionHeader->encodingType == EncodingType::XXHash64) {
|
||||
pXXHashHeader->decode(pPayload, payloadSize, pageID);
|
||||
} else if (pVersionHeader->encodingType == EncodingType::XOREncryption) {
|
||||
pXORHeader->decode(secret[0], pPayload, payloadSize, pageID);
|
||||
} else {
|
||||
throw page_encoding_not_supported();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -382,18 +389,33 @@ private:
|
|||
// The physical size of allocated memory for the page which also represents the space
|
||||
// to be written to disk
|
||||
int bufferSize;
|
||||
uint8_t* buffer;
|
||||
union {
|
||||
uint8_t* buffer;
|
||||
VersionHeader* pVersionHeader;
|
||||
};
|
||||
|
||||
// Pointer and length of page space available to the user
|
||||
uint8_t* pUsable;
|
||||
int usableSize;
|
||||
uint8_t* pPayload;
|
||||
int payloadSize;
|
||||
union {
|
||||
XXHashEncodingHeader* pXXHashHeader;
|
||||
XOREncodingHeader* pXORHeader;
|
||||
};
|
||||
|
||||
EncodingType encodingType;
|
||||
// Encoding-specific secrets
|
||||
uint8_t xorKeyID;
|
||||
uint8_t xorKeySecret;
|
||||
// Secret string available for use by encodings
|
||||
Key secret;
|
||||
|
||||
public:
|
||||
EncodingType getEncodingType() const { return pVersionHeader->encodingType; }
|
||||
|
||||
void setSecret(Key k) { secret = k; }
|
||||
|
||||
// Read/write access to the XOR key ID
|
||||
uint8_t& xorKeyID() {
|
||||
ASSERT(pVersionHeader->encodingType == EncodingType::XOREncryption);
|
||||
return pXORHeader->keyID;
|
||||
}
|
||||
|
||||
// A metadata object that can be attached to the page and will be deleted with the page
|
||||
mutable void* userData;
|
||||
mutable void (*userDataDestructor)(void*);
|
||||
|
@ -426,6 +448,8 @@ public:
|
|||
// This API is probably too customized to the behavior of DWALPager and probably needs some changes to be more generic.
|
||||
class IPager2 : public IClosable {
|
||||
public:
|
||||
virtual std::string getName() const = 0;
|
||||
|
||||
// Returns an ArenaPage that can be passed to writePage. The data in the returned ArenaPage might not be zeroed.
|
||||
virtual Reference<ArenaPage> newPageBuffer(size_t blocks = 1) = 0;
|
||||
|
||||
|
|
|
@ -697,7 +697,7 @@ public:
|
|||
debug_printf("FIFOQueue::Cursor(%s) loadExtent\n", toString().c_str());
|
||||
return map(queue->pager->readExtent(pageID), [=](Reference<ArenaPage> p) {
|
||||
page = p;
|
||||
debug_printf("FIFOQueue::Cursor(%s) loadExtent done. Page: %p\n", toString().c_str(), page->data());
|
||||
debug_printf("FIFOQueue::Cursor(%s) loadExtent done. Page: %p\n", toString().c_str(), page->rawData());
|
||||
return Void();
|
||||
});
|
||||
}
|
||||
|
@ -780,7 +780,7 @@ public:
|
|||
|
||||
// For extent based queue, update the index of current page within the extent
|
||||
if (queue->usesExtents) {
|
||||
debug_printf("FIFOQueue::Cursor(%s) Adding page %s init=%d pageCount %d\n",
|
||||
debug_printf("FIFOQueue::Cursor(%s) Adding page %s init=%d pageCount % " PRId64 "\n",
|
||||
toString().c_str(),
|
||||
::toString(newPageID).c_str(),
|
||||
initializeNewPage,
|
||||
|
@ -1177,8 +1177,9 @@ public:
|
|||
c.pageID * self->pager->getPhysicalPageSize());
|
||||
|
||||
try {
|
||||
page->postRead(c.pageID);
|
||||
|
||||
page->postRead1(c.pageID);
|
||||
// These pages are not encrypted
|
||||
page->postRead2(c.pageID);
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevError, "RedwoodChecksumFailed")
|
||||
.detail("PageID", c.pageID)
|
||||
|
@ -1959,7 +1960,8 @@ public:
|
|||
break;
|
||||
}
|
||||
|
||||
debug_printf("currentSize is %u and input size is %u. Trying to evict %s to make room for %s\n",
|
||||
debug_printf("currentSize is %" PRId64
|
||||
" and input size is %u. Trying to evict %s to make room for %s\n",
|
||||
currentSize,
|
||||
size,
|
||||
toString(toEvict.index).c_str(),
|
||||
|
@ -2197,7 +2199,7 @@ public:
|
|||
}
|
||||
|
||||
void updateCommittedHeader() {
|
||||
lastCommittedHeaderPage = headerPage->cloneContents();
|
||||
lastCommittedHeaderPage = headerPage->clone();
|
||||
pLastCommittedHeader = (Header*)lastCommittedHeaderPage->mutateData();
|
||||
}
|
||||
|
||||
|
@ -2326,7 +2328,7 @@ public:
|
|||
try {
|
||||
loop choose {
|
||||
when(Standalone<VectorRef<RemappedPage>> remaps = waitNext(remapStream.getFuture())) {
|
||||
debug_printf("DWALPager(%s) recovery. remaps size: %d, queueEntries: %d\n",
|
||||
debug_printf("DWALPager(%s) recovery. remaps size: %d, queueEntries: %" PRId64 "\n",
|
||||
self->filename.c_str(),
|
||||
remaps.size(),
|
||||
self->remapQueue.numEntries);
|
||||
|
@ -2448,7 +2450,7 @@ public:
|
|||
|
||||
self->recoveryVersion = self->pHeader->committedVersion;
|
||||
debug_printf("DWALPager(%s) recovered. recoveryVersion=%" PRId64 " oldestVersion=%" PRId64
|
||||
" logicalPageSize=%d physicalPageSize=%d\n",
|
||||
" logicalPageSize=%d physicalPageSize=%d headerPageCount=%" PRId64 " filePageCount=%" PRId64 "\n",
|
||||
self->filename.c_str(),
|
||||
self->recoveryVersion,
|
||||
self->pHeader->oldestVersion,
|
||||
|
@ -2663,21 +2665,33 @@ public:
|
|||
page->data());
|
||||
|
||||
debug_printf("DWALPager(%s) writePhysicalPage %s\n", filename.c_str(), toString(pageIDs).c_str());
|
||||
|
||||
// Copy the page if preWrite will encrypt/modify the payload
|
||||
bool copy = page->getEncodingType() == EncodingType::XOREncryption;
|
||||
if (copy) {
|
||||
page = page->clone();
|
||||
}
|
||||
page->preWrite(pageIDs.front());
|
||||
|
||||
int blockSize = header ? smallestPhysicalBlock : physicalPageSize;
|
||||
Future<Void> f;
|
||||
if (pageIDs.size() == 1) {
|
||||
f = writePhysicalBlock(this, page->rawData(), reason, level, pageIDs.front(), blockSize, header);
|
||||
operations.add(f);
|
||||
return f;
|
||||
} else {
|
||||
std::vector<Future<Void>> writers;
|
||||
for (int i = 0; i < pageIDs.size(); ++i) {
|
||||
Future<Void> p = writePhysicalBlock(
|
||||
this, page->rawData() + (i * blockSize), reason, level, pageIDs[i], blockSize, header);
|
||||
writers.push_back(p);
|
||||
}
|
||||
f = waitForAll(writers);
|
||||
}
|
||||
std::vector<Future<Void>> writers;
|
||||
for (int i = 0; i < pageIDs.size(); ++i) {
|
||||
Future<Void> p = writePhysicalBlock(
|
||||
this, page->rawData() + (i * blockSize), reason, level, pageIDs[i], blockSize, header);
|
||||
writers.push_back(p);
|
||||
|
||||
// If the page was copied, hold the copy alive until f is ready
|
||||
if (copy) {
|
||||
f = holdWhile(page, f);
|
||||
}
|
||||
f = waitForAll(writers);
|
||||
|
||||
operations.add(f);
|
||||
return f;
|
||||
}
|
||||
|
@ -2903,7 +2917,12 @@ public:
|
|||
readBytes);
|
||||
|
||||
try {
|
||||
page->postRead(pageID);
|
||||
page->postRead1(pageID);
|
||||
if (page->getEncodingType() == EncodingType::XOREncryption) {
|
||||
uint8_t secret = ~page->xorKeyID();
|
||||
page->setSecret(StringRef(&secret, 1));
|
||||
}
|
||||
page->postRead2(pageID);
|
||||
debug_printf("DWALPager(%s) op=readPhysicalVerifyComplete %s ptr=%p bytes=%d\n",
|
||||
self->filename.c_str(),
|
||||
toString(pageID).c_str(),
|
||||
|
@ -2966,7 +2985,12 @@ public:
|
|||
pageIDs.size() * blockSize);
|
||||
|
||||
try {
|
||||
page->postRead(pageIDs.front());
|
||||
page->postRead1(pageIDs.front());
|
||||
if (page->getEncodingType() == EncodingType::XOREncryption) {
|
||||
uint8_t secret = ~page->xorKeyID();
|
||||
page->setSecret(StringRef(&secret, 1));
|
||||
}
|
||||
page->postRead2(pageIDs.front());
|
||||
} catch (Error& e) {
|
||||
// For header pages, error is a warning because recovery may still be possible
|
||||
TraceEvent(SevError, "RedwoodPageError")
|
||||
|
@ -3160,24 +3184,26 @@ public:
|
|||
}
|
||||
|
||||
// readSize may not be equal to the physical extent size (for the first and last extents)
|
||||
if (!readSize)
|
||||
// but otherwise use the full physical extent size
|
||||
if (readSize == 0) {
|
||||
readSize = self->physicalExtentSize;
|
||||
}
|
||||
|
||||
state Reference<ArenaPage> extent = ArenaPage::create(self->logicalPageSize, readSize);
|
||||
state Reference<ArenaPage> extent = ArenaPage::create(readSize, readSize);
|
||||
|
||||
// physicalReadSize is the size of disk read we intend to issue
|
||||
auto physicalReadSize = SERVER_KNOBS->REDWOOD_DEFAULT_EXTENT_READ_SIZE;
|
||||
auto parallelReads = readSize / physicalReadSize;
|
||||
auto lastReadSize = readSize % physicalReadSize;
|
||||
|
||||
debug_printf(
|
||||
"DWALPager(%s) op=readPhysicalExtentStart %s readSize %d offset %d physicalReadSize %d parallelReads %d\n",
|
||||
self->filename.c_str(),
|
||||
toString(pageID).c_str(),
|
||||
readSize,
|
||||
(int64_t)pageID * (self->physicalPageSize),
|
||||
physicalReadSize,
|
||||
parallelReads);
|
||||
debug_printf("DWALPager(%s) op=readPhysicalExtentStart %s readSize %d offset %" PRId64
|
||||
" physicalReadSize %d parallelReads %d\n",
|
||||
self->filename.c_str(),
|
||||
toString(pageID).c_str(),
|
||||
readSize,
|
||||
(int64_t)pageID * (self->physicalPageSize),
|
||||
physicalReadSize,
|
||||
parallelReads);
|
||||
|
||||
// we split the extent read into a number of parallel disk reads based on the determined physical
|
||||
// disk read size. All those reads are issued in parallel and their futures are stored into the following
|
||||
|
@ -3188,7 +3214,7 @@ public:
|
|||
int64_t currentOffset;
|
||||
for (i = 0; i < parallelReads; i++) {
|
||||
currentOffset = i * physicalReadSize;
|
||||
debug_printf("DWALPager(%s) current offset %d\n", self->filename.c_str(), currentOffset);
|
||||
debug_printf("DWALPager(%s) current offset %" PRId64 "\n", self->filename.c_str(), currentOffset);
|
||||
++g_redwoodMetrics.metric.pagerDiskRead;
|
||||
reads.push_back(
|
||||
self->pageFile->read(extent->rawData() + currentOffset, physicalReadSize, startOffset + currentOffset));
|
||||
|
@ -3197,7 +3223,7 @@ public:
|
|||
// Handle the last read separately as it may be smaller than physicalReadSize
|
||||
if (lastReadSize) {
|
||||
currentOffset = i * physicalReadSize;
|
||||
debug_printf("DWALPager(%s) iter %d current offset %d lastReadSize %d\n",
|
||||
debug_printf("DWALPager(%s) iter %d current offset %" PRId64 " lastReadSize %d\n",
|
||||
self->filename.c_str(),
|
||||
i,
|
||||
currentOffset,
|
||||
|
@ -4859,6 +4885,10 @@ public:
|
|||
Reference<ArenaPage> makeEmptyRoot() {
|
||||
Reference<ArenaPage> page = m_pager->newPageBuffer();
|
||||
page->init(m_encodingType, PageType::BTreeNode, 1);
|
||||
if (m_encodingType == EncodingType::XOREncryption) {
|
||||
page->xorKeyID() = ~dbEnd.key[0];
|
||||
page->setSecret(dbEnd.key.substr(0, 1));
|
||||
}
|
||||
BTreePage* btpage = (BTreePage*)page->mutateData();
|
||||
btpage->height = 1;
|
||||
btpage->kvBytes = 0;
|
||||
|
@ -4984,7 +5014,8 @@ public:
|
|||
self->m_pHeader = (MetaKey*)new uint8_t[self->m_headerSpace];
|
||||
|
||||
debug_printf("Recovered pager to version %" PRId64 ", oldest version is %" PRId64 "\n",
|
||||
self->m_newOldestVersion);
|
||||
self->getLastCommittedVersion(),
|
||||
self->m_pager->getOldestReadableVersion());
|
||||
|
||||
state Key meta = self->m_pager->getMetaKey();
|
||||
if (meta.size() == 0) {
|
||||
|
@ -5090,7 +5121,7 @@ public:
|
|||
// From the pager's perspective the only pages that should be in use are the btree root and
|
||||
// the previously mentioned lazy delete queue page.
|
||||
int64_t userPageCount = wait(self->m_pager->getUserPageCount());
|
||||
debug_printf("clearAllAndCheckSanity: userPageCount: %d\n", userPageCount);
|
||||
debug_printf("clearAllAndCheckSanity: userPageCount: %" PRId64 "\n", userPageCount);
|
||||
ASSERT(userPageCount == 2);
|
||||
|
||||
return Void();
|
||||
|
@ -5560,7 +5591,7 @@ private:
|
|||
|
||||
for (pageIndex = 0; pageIndex < pagesToBuild.size(); ++pageIndex) {
|
||||
auto& p = pagesToBuild[pageIndex];
|
||||
debug_printf("building page %d of %d %s\n", pageIndex + 1, pagesToBuild.size(), p.toString().c_str());
|
||||
debug_printf("building page %d of %zu %s\n", pageIndex + 1, pagesToBuild.size(), p.toString().c_str());
|
||||
ASSERT(p.count != 0);
|
||||
|
||||
// For internal pages, skip first entry if child link is null. Such links only exist
|
||||
|
@ -5603,6 +5634,10 @@ private:
|
|||
state Reference<ArenaPage> page = self->m_pager->newPageBuffer(p.blockCount);
|
||||
page->init(
|
||||
self->m_encodingType, (p.blockCount == 1) ? PageType::BTreeNode : PageType::BTreeSuperNode, height);
|
||||
if (self->m_encodingType == EncodingType::XOREncryption) {
|
||||
page->xorKeyID() = ~pageUpperBound.key[0];
|
||||
page->setSecret(pageUpperBound.key.substr(0, 1));
|
||||
}
|
||||
|
||||
BTreePage* btPage = (BTreePage*)page->mutateData();
|
||||
|
||||
|
@ -5715,7 +5750,7 @@ private:
|
|||
Version version,
|
||||
Standalone<VectorRef<RedwoodRecordRef>> records,
|
||||
unsigned int height) {
|
||||
debug_printf("buildNewRoot start version %" PRId64 ", %lu records\n", version, records.size());
|
||||
debug_printf("buildNewRoot start version %" PRId64 ", %d records\n", version, records.size());
|
||||
|
||||
// While there are multiple child pages for this version we must write new tree levels.
|
||||
while (records.size() > 1) {
|
||||
|
@ -5723,7 +5758,7 @@ private:
|
|||
ASSERT(height < std::numeric_limits<int8_t>::max());
|
||||
Standalone<VectorRef<RedwoodRecordRef>> newRecords =
|
||||
wait(writePages(self, &dbBegin, &dbEnd, records, height, version, BTreePageIDRef()));
|
||||
debug_printf("Wrote a new root level at version %" PRId64 " height %d size %lu pages\n",
|
||||
debug_printf("Wrote a new root level at version %" PRId64 " height %d size %d pages\n",
|
||||
version,
|
||||
height,
|
||||
newRecords.size());
|
||||
|
@ -5865,7 +5900,7 @@ private:
|
|||
|
||||
// Copy page to a new page which shares the same DecodeCache with the old page
|
||||
static Reference<ArenaPage> clonePageForUpdate(Reference<const ArenaPage> page) {
|
||||
Reference<ArenaPage> newPage = page->cloneContents();
|
||||
Reference<ArenaPage> newPage = page->clone();
|
||||
|
||||
if (newPage->userData != nullptr) {
|
||||
BTreePage::BinaryTree::DecodeCache* cache = (BTreePage::BinaryTree::DecodeCache*)page->userData;
|
||||
|
@ -6715,14 +6750,14 @@ private:
|
|||
recursions.push_back(self->commitSubtree(self, batch, pageID, height - 1, mBegin, mEnd, &u));
|
||||
}
|
||||
|
||||
debug_printf(
|
||||
"%s Recursions from internal page started. pageSize=%d level=%d children=%d slices=%d recursions=%d\n",
|
||||
context.c_str(),
|
||||
btPage->size(),
|
||||
btPage->height,
|
||||
btPage->tree()->numItems,
|
||||
slices.size(),
|
||||
recursions.size());
|
||||
debug_printf("%s Recursions from internal page started. pageSize=%d level=%d children=%d slices=%zu "
|
||||
"recursions=%zu\n",
|
||||
context.c_str(),
|
||||
btPage->size(),
|
||||
btPage->height,
|
||||
btPage->tree()->numItems,
|
||||
slices.size(),
|
||||
recursions.size());
|
||||
|
||||
wait(waitForAll(recursions));
|
||||
debug_printf("%s Recursions done, processing slice updates.\n", context.c_str());
|
||||
|
@ -7356,7 +7391,7 @@ public:
|
|||
SERVER_KNOBS->REDWOOD_EXTENT_CONCURRENT_READS,
|
||||
false,
|
||||
m_error);
|
||||
m_tree = new VersionedBTree(pager, filePrefix);
|
||||
m_tree = new VersionedBTree(pager, filePrefix, BUGGIFY ? EncodingType::XOREncryption : EncodingType::XXHash64);
|
||||
m_init = catchError(init_impl(this));
|
||||
}
|
||||
|
||||
|
@ -8662,7 +8697,7 @@ TEST_CASE("Lredwood/correctness/unit/deltaTree/IntIntPair") {
|
|||
|
||||
auto printItems = [&] {
|
||||
for (int k = 0; k < items.size(); ++k) {
|
||||
debug_printf("%d/%d %s\n", k + 1, items.size(), items[k].toString().c_str());
|
||||
debug_printf("%d/%zu %s\n", k + 1, items.size(), items[k].toString().c_str());
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -9264,9 +9299,12 @@ TEST_CASE("Lredwood/correctness/btree") {
|
|||
state int maxVerificationMapEntries = params.getInt("maxVerificationMapEntries").orDefault(300e3);
|
||||
state int concurrentExtentReads =
|
||||
params.getInt("concurrentExtentReads").orDefault(SERVER_KNOBS->REDWOOD_EXTENT_CONCURRENT_READS);
|
||||
state EncodingType encodingType =
|
||||
deterministicRandom()->coinflip() ? EncodingType::XXHash64 : EncodingType::XOREncryption;
|
||||
|
||||
printf("\n");
|
||||
printf("file: %s\n", file.c_str());
|
||||
printf("encodingType: %d\n", encodingType);
|
||||
printf("targetPageOps: %" PRId64 "\n", targetPageOps);
|
||||
printf("pagerMemoryOnly: %d\n", pagerMemoryOnly);
|
||||
printf("serialTest: %d\n", serialTest);
|
||||
|
@ -9295,7 +9333,7 @@ TEST_CASE("Lredwood/correctness/btree") {
|
|||
printf("Initializing...\n");
|
||||
pager = new DWALPager(
|
||||
pageSize, extentSize, file, cacheSizeBytes, remapCleanupWindow, concurrentExtentReads, pagerMemoryOnly);
|
||||
state VersionedBTree* btree = new VersionedBTree(pager, file);
|
||||
state VersionedBTree* btree = new VersionedBTree(pager, file, encodingType);
|
||||
wait(btree->init());
|
||||
|
||||
state std::map<std::pair<std::string, Version>, Optional<std::string>> written;
|
||||
|
|
|
@ -85,6 +85,10 @@ ERROR( granule_assignment_conflict, 1061, "Conflicting attempts to assign blob g
|
|||
ERROR( change_feed_cancelled, 1062, "Change feed was cancelled" )
|
||||
ERROR( blob_granule_file_load_error, 1063, "Error loading a blob file during granule materialization" )
|
||||
ERROR( page_header_wrong_page_id, 1064, "Page header does not match location on disk" )
|
||||
ERROR( page_header_checksum_failed, 1065, "Page header checksum failed" )
|
||||
ERROR( page_header_version_not_supported, 1066, "Page header checksum failed" )
|
||||
ERROR( page_encoding_not_supported, 1067, "Page encoding type is not supported or not valid" )
|
||||
ERROR( page_decoding_failed, 1068, "Page content decoding failed" )
|
||||
|
||||
ERROR( broken_promise, 1100, "Broken promise" )
|
||||
ERROR( operation_cancelled, 1101, "Asynchronous operation cancelled" )
|
||||
|
|
Loading…
Reference in New Issue