Merge pull request #2906 from satherton/keyvaluestorememory-delta-compression
Added key delta compression to KeyValueStoreMemory on-disk state.
This commit is contained in:
commit
cf81e34d5a
|
@ -26,6 +26,7 @@
|
|||
#include "fdbclient/Notified.h"
|
||||
#include "fdbclient/SystemData.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
#include "fdbserver/DeltaTree.h"
|
||||
|
||||
#define OP_DISK_OVERHEAD (sizeof(OpHeader) + 1)
|
||||
|
||||
|
@ -268,7 +269,8 @@ private:
|
|||
OpSnapshotEnd,
|
||||
OpSnapshotAbort, // terminate an in progress snapshot in order to start a full snapshot
|
||||
OpCommit, // only in log, not in queue
|
||||
OpRollback // only in log, not in queue
|
||||
OpRollback, // only in log, not in queue
|
||||
OpSnapshotItemDelta
|
||||
};
|
||||
|
||||
struct OpRef {
|
||||
|
@ -344,8 +346,7 @@ private:
|
|||
int64_t overheadWriteBytes;
|
||||
NotifiedVersion notifiedCommittedWriteBytes;
|
||||
Key recoveredSnapshotKey; // After recovery, the next key in the currently uncompleted snapshot
|
||||
IDiskQueue::location
|
||||
currentSnapshotEnd; // The end of the most recently completed snapshot (this snapshot cannot be discarded)
|
||||
IDiskQueue::location currentSnapshotEnd; // The end of the most recently completed snapshot (this snapshot cannot be discarded)
|
||||
IDiskQueue::location previousSnapshotEnd; // The end of the second most recently completed snapshot (on commit, this
|
||||
// snapshot can be discarded)
|
||||
PromiseStream<Future<Void>> addActor;
|
||||
|
@ -443,6 +444,7 @@ private:
|
|||
|
||||
state OpQueue recoveryQueue;
|
||||
state OpHeader h;
|
||||
state Standalone<StringRef> lastSnapshotKey;
|
||||
|
||||
TraceEvent("KVSMemRecoveryStarted", self->id)
|
||||
.detail("SnapshotEndLocation", uncommittedSnapshotEnd);
|
||||
|
@ -485,7 +487,7 @@ private:
|
|||
StringRef p1 = data.substr(0, h.len1);
|
||||
StringRef p2 = data.substr(h.len1, h.len2);
|
||||
|
||||
if (h.op == OpSnapshotItem) { // snapshot data item
|
||||
if (h.op == OpSnapshotItem || h.op == OpSnapshotItemDelta) { // snapshot data item
|
||||
/*if (p1 < uncommittedNextKey) {
|
||||
TraceEvent(SevError, "RecSnapshotBack", self->id)
|
||||
.detail("NextKey", uncommittedNextKey)
|
||||
|
@ -493,11 +495,27 @@ private:
|
|||
.detail("Nextlocation", self->log->getNextReadLocation());
|
||||
}
|
||||
ASSERT( p1 >= uncommittedNextKey );*/
|
||||
if(h.op == OpSnapshotItemDelta) {
|
||||
ASSERT(p1.size() > 1);
|
||||
// Get number of bytes borrowed from previous item key
|
||||
int borrowed = *(uint8_t *)p1.begin();
|
||||
ASSERT(borrowed <= lastSnapshotKey.size());
|
||||
// Trim p1 to just the suffix
|
||||
StringRef suffix = p1.substr(1);
|
||||
// Allocate a new string in data arena to hold prefix + suffix
|
||||
Arena &dataArena = *(Arena *)&data.arena();
|
||||
p1 = makeString(borrowed + suffix.size(), dataArena);
|
||||
// Copy the prefix into the new reconstituted key
|
||||
memcpy(mutateString(p1), lastSnapshotKey.begin(), borrowed);
|
||||
// Copy the suffix into the new reconstituted key
|
||||
memcpy(mutateString(p1) + borrowed, suffix.begin(), suffix.size());
|
||||
}
|
||||
if( p1 >= uncommittedNextKey )
|
||||
recoveryQueue.clear( KeyRangeRef(uncommittedNextKey, p1), &uncommittedNextKey.arena() ); //FIXME: Not sure what this line is for, is it necessary?
|
||||
recoveryQueue.set( KeyValueRef(p1, p2), &data.arena() );
|
||||
uncommittedNextKey = keyAfter(p1);
|
||||
++dbgSnapshotItemCount;
|
||||
lastSnapshotKey = Key(p1, data.arena());
|
||||
} else if (h.op == OpSnapshotEnd || h.op == OpSnapshotAbort) { // snapshot complete
|
||||
TraceEvent("RecSnapshotEnd", self->id)
|
||||
.detail("NextKey", uncommittedNextKey)
|
||||
|
@ -511,6 +529,7 @@ private:
|
|||
}
|
||||
|
||||
uncommittedNextKey = Key();
|
||||
lastSnapshotKey = Key();
|
||||
++dbgSnapshotEndCount;
|
||||
} else if (h.op == OpSet) { // set mutation
|
||||
recoveryQueue.set( KeyValueRef(p1,p2), &data.arena() );
|
||||
|
@ -629,6 +648,12 @@ private:
|
|||
state int snapItems = 0;
|
||||
state uint64_t snapshotBytes = 0;
|
||||
|
||||
// Snapshot keys will be alternately written to two preallocated buffers.
|
||||
// This allows consecutive snapshot keys to be compared for delta compression while only copying each key's bytes once.
|
||||
state Key lastSnapshotKeyA = makeString(CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT);
|
||||
state Key lastSnapshotKeyB = makeString(CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT);
|
||||
state bool lastSnapshotKeyUsingA = true;
|
||||
|
||||
TraceEvent("KVSMemStartingSnapshot", self->id).detail("StartKey", nextKey);
|
||||
|
||||
loop {
|
||||
|
@ -652,40 +677,118 @@ private:
|
|||
.detail("LastOperationWasASnapshot", nextKey == Key() && !nextKeyAfter);
|
||||
lastDiff = diff;
|
||||
|
||||
if (next == self->data.end()) {
|
||||
auto thisSnapshotEnd = self->log_op(OpSnapshotEnd, StringRef(), StringRef());
|
||||
//TraceEvent("SnapshotEnd", self->id)
|
||||
// .detail("LastKey", lastKey.present() ? lastKey.get() : LiteralStringRef("<none>"))
|
||||
// .detail("CurrentSnapshotEndLoc", self->currentSnapshotEnd)
|
||||
// .detail("PreviousSnapshotEndLoc", self->previousSnapshotEnd)
|
||||
// .detail("ThisSnapshotEnd", thisSnapshotEnd)
|
||||
// .detail("Items", snapItems)
|
||||
// .detail("CommittedWrites", self->notifiedCommittedWriteBytes.get())
|
||||
// .detail("SnapshotSize", snapshotBytes);
|
||||
// Since notifiedCommittedWriteBytes is only set() once per commit, before logging the commit operation, when
|
||||
// this line is reached it is certain that there are no snapshot items in this commit yet. Since this commit
|
||||
// could be the first thing read during recovery, we can't write a delta yet.
|
||||
bool useDelta = false;
|
||||
|
||||
ASSERT(thisSnapshotEnd >= self->currentSnapshotEnd);
|
||||
self->previousSnapshotEnd = self->currentSnapshotEnd;
|
||||
self->currentSnapshotEnd = thisSnapshotEnd;
|
||||
// Write snapshot items until the wait above would block because we've used up all of the byte budget
|
||||
loop {
|
||||
|
||||
if (++self->snapshotCount == 2) {
|
||||
self->replaceContent = false;
|
||||
if (next == self->data.end()) {
|
||||
// After a snapshot end is logged, recovery may not see the last snapshot item logged before it so the
|
||||
// next snapshot item logged cannot be a delta.
|
||||
useDelta = false;
|
||||
|
||||
auto thisSnapshotEnd = self->log_op(OpSnapshotEnd, StringRef(), StringRef());
|
||||
//TraceEvent("SnapshotEnd", self->id)
|
||||
// .detail("LastKey", lastKey.present() ? lastKey.get() : LiteralStringRef("<none>"))
|
||||
// .detail("CurrentSnapshotEndLoc", self->currentSnapshotEnd)
|
||||
// .detail("PreviousSnapshotEndLoc", self->previousSnapshotEnd)
|
||||
// .detail("ThisSnapshotEnd", thisSnapshotEnd)
|
||||
// .detail("Items", snapItems)
|
||||
// .detail("CommittedWrites", self->notifiedCommittedWriteBytes.get())
|
||||
// .detail("SnapshotSize", snapshotBytes);
|
||||
|
||||
ASSERT(thisSnapshotEnd >= self->currentSnapshotEnd);
|
||||
self->previousSnapshotEnd = self->currentSnapshotEnd;
|
||||
self->currentSnapshotEnd = thisSnapshotEnd;
|
||||
|
||||
if (++self->snapshotCount == 2) {
|
||||
self->replaceContent = false;
|
||||
}
|
||||
|
||||
snapItems = 0;
|
||||
snapshotBytes = 0;
|
||||
snapshotTotalWrittenBytes += OP_DISK_OVERHEAD;
|
||||
|
||||
// If we're not stopping now, reset next
|
||||
if(snapshotTotalWrittenBytes < self->notifiedCommittedWriteBytes.get()) {
|
||||
next = self->data.begin();
|
||||
}
|
||||
else {
|
||||
// Otherwise, save state for continuing after the next wait and stop
|
||||
nextKey = Key();
|
||||
nextKeyAfter = false;
|
||||
break;
|
||||
}
|
||||
|
||||
} else {
|
||||
// destKey is whichever of the two last key buffers we should write to next.
|
||||
Key &destKey = lastSnapshotKeyUsingA ? lastSnapshotKeyA : lastSnapshotKeyB;
|
||||
|
||||
// Get the key, using destKey as a temporary buffer if needed.
|
||||
KeyRef tempKey = next.getKey(mutateString(destKey));
|
||||
int opKeySize = tempKey.size();
|
||||
|
||||
// If tempKey did not use the start of destKey, then copy tempKey into destKey.
|
||||
// It's technically possible for the source and dest to overlap but with the current container implementations that will not happen.
|
||||
if(tempKey.begin() != destKey.begin()) {
|
||||
memcpy(mutateString(destKey), tempKey.begin(), tempKey.size());
|
||||
}
|
||||
|
||||
// Now, tempKey's bytes definitely exist in memory at destKey.begin() so update destKey's contents to be a proper KeyRef of the key.
|
||||
// This intentionally leaves the Arena alone and doesn't copy anything into it.
|
||||
destKey.contents() = KeyRef(destKey.begin(), tempKey.size());
|
||||
|
||||
// Get the common prefix between this key and the previous one, or 0 if there was no previous one.
|
||||
int commonPrefix;
|
||||
if(useDelta) {
|
||||
commonPrefix = commonPrefixLength(lastSnapshotKeyA, lastSnapshotKeyB);
|
||||
}
|
||||
else {
|
||||
commonPrefix = 0;
|
||||
useDelta = true;
|
||||
}
|
||||
|
||||
// If the common prefix is greater than 1, write a delta item. It isn't worth doing for 0 or 1 bytes, it would merely add decode overhead (string copying).
|
||||
if(commonPrefix > 1) {
|
||||
// Cap the common prefix length to 255. Sorry, ridiculously long keys!
|
||||
commonPrefix = std::min<int>(commonPrefix, std::numeric_limits<uint8_t>::max());
|
||||
|
||||
// We're going to temporarily write a 1-byte integer just before the key suffix to create the log op key and log it, then restore that byte.
|
||||
uint8_t &prefixLength = mutateString(destKey)[commonPrefix - 1];
|
||||
uint8_t backupByte = prefixLength;
|
||||
prefixLength = commonPrefix;
|
||||
|
||||
opKeySize = opKeySize - commonPrefix + 1;
|
||||
KeyRef opKey(&prefixLength, opKeySize);
|
||||
self->log_op(OpSnapshotItemDelta, opKey, next.getValue());
|
||||
|
||||
// Restore the overwritten byte
|
||||
prefixLength = backupByte;
|
||||
}
|
||||
else {
|
||||
self->log_op(OpSnapshotItem, tempKey, next.getValue());
|
||||
}
|
||||
|
||||
snapItems++;
|
||||
uint64_t opBytes = opKeySize + next.getValue().size() + OP_DISK_OVERHEAD;
|
||||
snapshotBytes += opBytes;
|
||||
snapshotTotalWrittenBytes += opBytes;
|
||||
lastSnapshotKeyUsingA = !lastSnapshotKeyUsingA;
|
||||
|
||||
// If we're not stopping now, increment next
|
||||
if(snapshotTotalWrittenBytes < self->notifiedCommittedWriteBytes.get()) {
|
||||
++next;
|
||||
}
|
||||
else {
|
||||
// Otherwise, save state for continuing after the next wait and stop
|
||||
nextKey = destKey;
|
||||
nextKeyAfter = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
nextKey = Key();
|
||||
nextKeyAfter = false;
|
||||
snapItems = 0;
|
||||
|
||||
snapshotBytes = 0;
|
||||
|
||||
snapshotTotalWrittenBytes += OP_DISK_OVERHEAD;
|
||||
} else {
|
||||
StringRef tempKey = next.getKey(self->reserved_buffer);
|
||||
self->log_op(OpSnapshotItem, tempKey, next.getValue());
|
||||
nextKey = tempKey;
|
||||
nextKeyAfter = true;
|
||||
snapItems++;
|
||||
uint64_t opBytes = tempKey.size() + next.getValue().size() + OP_DISK_OVERHEAD;
|
||||
snapshotBytes += opBytes;
|
||||
snapshotTotalWrittenBytes += opBytes;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue