857 lines
32 KiB
C++
857 lines
32 KiB
C++
/*
|
|
* KeyValueStoreMemory.actor.cpp
|
|
*
|
|
* This source file is part of the FoundationDB open source project
|
|
*
|
|
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
#include "fdbserver/IKeyValueStore.h"
|
|
#include "fdbserver/IDiskQueue.h"
|
|
#include "flow/IKeyValueContainer.h"
|
|
#include "flow/RadixTree.h"
|
|
#include "flow/ActorCollection.h"
|
|
#include "fdbclient/Notified.h"
|
|
#include "fdbclient/SystemData.h"
|
|
#include "flow/actorcompiler.h" // This must be the last #include.
|
|
#include "fdbserver/DeltaTree.h"
|
|
|
|
#define OP_DISK_OVERHEAD (sizeof(OpHeader) + 1)
|
|
|
|
extern bool noUnseed;
|
|
|
|
template <typename Container>
|
|
class KeyValueStoreMemory : public IKeyValueStore, NonCopyable {
|
|
public:
|
|
KeyValueStoreMemory(IDiskQueue* log, UID id, int64_t memoryLimit, KeyValueStoreType storeType, bool disableSnapshot,
|
|
bool replaceContent, bool exactRecovery);
|
|
|
|
// IClosable
|
|
virtual Future<Void> getError() { return log->getError(); }
|
|
virtual Future<Void> onClosed() { return log->onClosed(); }
|
|
virtual void dispose() {
|
|
recovering.cancel();
|
|
log->dispose();
|
|
if (reserved_buffer != nullptr) {
|
|
delete[] reserved_buffer;
|
|
reserved_buffer = nullptr;
|
|
}
|
|
delete this;
|
|
}
|
|
virtual void close() {
|
|
recovering.cancel();
|
|
log->close();
|
|
if (reserved_buffer != nullptr) {
|
|
delete[] reserved_buffer;
|
|
reserved_buffer = nullptr;
|
|
}
|
|
delete this;
|
|
}
|
|
|
|
// IKeyValueStore
|
|
virtual KeyValueStoreType getType() { return type; }
|
|
|
|
virtual std::tuple<size_t, size_t, size_t> getSize() { return data.size(); }
|
|
|
|
int64_t getAvailableSize() {
|
|
int64_t residentSize = data.sumTo(data.end()) + queue.totalSize() + // doesn't account for overhead in queue
|
|
transactionSize;
|
|
|
|
return memoryLimit - residentSize;
|
|
}
|
|
|
|
virtual StorageBytes getStorageBytes() {
|
|
StorageBytes diskQueueBytes = log->getStorageBytes();
|
|
|
|
// Try to bound how many in-memory bytes we might need to write to disk if we commit() now
|
|
int64_t uncommittedBytes = queue.totalSize() + transactionSize;
|
|
|
|
// Check that we have enough space in memory and on disk
|
|
int64_t freeSize = std::min(getAvailableSize(), diskQueueBytes.free / 4 - uncommittedBytes);
|
|
int64_t availableSize = std::min(getAvailableSize(), diskQueueBytes.available / 4 - uncommittedBytes);
|
|
int64_t totalSize = std::min(memoryLimit, diskQueueBytes.total / 4 - uncommittedBytes);
|
|
|
|
return StorageBytes(std::max((int64_t)0, freeSize), std::max((int64_t)0, totalSize), diskQueueBytes.used,
|
|
std::max((int64_t)0, availableSize));
|
|
}
|
|
|
|
void semiCommit() {
|
|
transactionSize += queue.totalSize();
|
|
if (transactionSize > 0.5 * committedDataSize) {
|
|
transactionIsLarge = true;
|
|
TraceEvent("KVSMemSwitchingToLargeTransactionMode", id)
|
|
.detail("TransactionSize", transactionSize)
|
|
.detail("DataSize", committedDataSize);
|
|
TEST(true); // KeyValueStoreMemory switching to large transaction mode
|
|
TEST(committedDataSize >
|
|
1e3); // KeyValueStoreMemory switching to large transaction mode with committed data
|
|
}
|
|
|
|
int64_t bytesWritten = commit_queue(queue, true);
|
|
committedWriteBytes += bytesWritten;
|
|
}
|
|
|
|
virtual void set(KeyValueRef keyValue, const Arena* arena) {
|
|
// A commit that occurs with no available space returns Never, so we can throw out all modifications
|
|
if (getAvailableSize() <= 0) return;
|
|
|
|
if (transactionIsLarge) {
|
|
data.insert(keyValue.key, keyValue.value);
|
|
} else {
|
|
queue.set(keyValue, arena);
|
|
if (recovering.isReady() && !disableSnapshot) {
|
|
semiCommit();
|
|
}
|
|
}
|
|
}
|
|
|
|
virtual void clear(KeyRangeRef range, const Arena* arena) {
|
|
// A commit that occurs with no available space returns Never, so we can throw out all modifications
|
|
if (getAvailableSize() <= 0) return;
|
|
|
|
if (transactionIsLarge) {
|
|
data.erase(data.lower_bound(range.begin), data.lower_bound(range.end));
|
|
} else {
|
|
queue.clear(range, arena);
|
|
if (recovering.isReady() && !disableSnapshot) {
|
|
semiCommit();
|
|
}
|
|
}
|
|
}
|
|
|
|
virtual Future<Void> commit(bool sequential) {
|
|
if(getAvailableSize() <= 0) {
|
|
TraceEvent(SevError, "KeyValueStoreMemory_OutOfSpace", id);
|
|
return Never();
|
|
}
|
|
|
|
if (recovering.isError()) throw recovering.getError();
|
|
if (!recovering.isReady()) return waitAndCommit(this, sequential);
|
|
|
|
if (!disableSnapshot && replaceContent && !firstCommitWithSnapshot) {
|
|
transactionSize += SERVER_KNOBS->REPLACE_CONTENTS_BYTES;
|
|
committedWriteBytes += SERVER_KNOBS->REPLACE_CONTENTS_BYTES;
|
|
semiCommit();
|
|
}
|
|
|
|
if (transactionIsLarge) {
|
|
fullSnapshot(data);
|
|
resetSnapshot = true;
|
|
committedWriteBytes = notifiedCommittedWriteBytes.get();
|
|
overheadWriteBytes = 0;
|
|
|
|
if (disableSnapshot) {
|
|
return Void();
|
|
}
|
|
log_op(OpCommit, StringRef(), StringRef());
|
|
} else {
|
|
int64_t bytesWritten = commit_queue(queue, !disableSnapshot, sequential);
|
|
|
|
if (disableSnapshot) {
|
|
return Void();
|
|
}
|
|
|
|
if (bytesWritten > 0 || committedWriteBytes > notifiedCommittedWriteBytes.get()) {
|
|
committedWriteBytes += bytesWritten + overheadWriteBytes +
|
|
OP_DISK_OVERHEAD; // OP_DISK_OVERHEAD is for the following log_op(OpCommit)
|
|
notifiedCommittedWriteBytes.set(committedWriteBytes); // This set will cause snapshot items to be
|
|
// written, so it must happen before the OpCommit
|
|
log_op(OpCommit, StringRef(), StringRef());
|
|
overheadWriteBytes = log->getCommitOverhead();
|
|
}
|
|
}
|
|
|
|
auto c = log->commit();
|
|
|
|
committedDataSize = data.sumTo(data.end());
|
|
transactionSize = 0;
|
|
transactionIsLarge = false;
|
|
firstCommitWithSnapshot = false;
|
|
|
|
addActor.send(commitAndUpdateVersions(this, c, previousSnapshotEnd));
|
|
return c;
|
|
}
|
|
|
|
virtual Future<Optional<Value>> readValue(KeyRef key, Optional<UID> debugID = Optional<UID>()) {
|
|
if (recovering.isError()) throw recovering.getError();
|
|
if (!recovering.isReady()) return waitAndReadValue(this, key);
|
|
|
|
auto it = data.find(key);
|
|
if (it == data.end()) return Optional<Value>();
|
|
return Optional<Value>(it.getValue());
|
|
}
|
|
|
|
virtual Future<Optional<Value>> readValuePrefix(KeyRef key, int maxLength,
|
|
Optional<UID> debugID = Optional<UID>()) {
|
|
if (recovering.isError()) throw recovering.getError();
|
|
if (!recovering.isReady()) return waitAndReadValuePrefix(this, key, maxLength);
|
|
|
|
auto it = data.find(key);
|
|
if (it == data.end()) return Optional<Value>();
|
|
auto val = it.getValue();
|
|
if (maxLength < val.size()) {
|
|
return Optional<Value>(val.substr(0, maxLength));
|
|
} else {
|
|
return Optional<Value>(val);
|
|
}
|
|
}
|
|
|
|
// If rowLimit>=0, reads first rows sorted ascending, otherwise reads last rows sorted descending
|
|
// The total size of the returned value (less the last entry) will be less than byteLimit
|
|
virtual Future<Standalone<RangeResultRef>> readRange( KeyRangeRef keys, int rowLimit = 1<<30, int byteLimit = 1<<30 ) {
|
|
if(recovering.isError()) throw recovering.getError();
|
|
if (!recovering.isReady()) return waitAndReadRange(this, keys, rowLimit, byteLimit);
|
|
|
|
Standalone<RangeResultRef> result;
|
|
if (rowLimit == 0) {
|
|
return result;
|
|
}
|
|
|
|
if (rowLimit > 0) {
|
|
auto it = data.lower_bound(keys.begin);
|
|
while (it != data.end() && rowLimit && byteLimit > 0) {
|
|
StringRef tempKey = it.getKey(reserved_buffer);
|
|
if (tempKey >= keys.end) break;
|
|
|
|
byteLimit -= sizeof(KeyValueRef) + tempKey.size() + it.getValue().size();
|
|
result.push_back_deep(result.arena(), KeyValueRef(tempKey, it.getValue()));
|
|
++it;
|
|
--rowLimit;
|
|
}
|
|
} else {
|
|
rowLimit = -rowLimit;
|
|
auto it = data.previous(data.lower_bound(keys.end));
|
|
while (it != data.end() && rowLimit && byteLimit > 0) {
|
|
StringRef tempKey = it.getKey(reserved_buffer);
|
|
if (tempKey < keys.begin) break;
|
|
|
|
byteLimit -= sizeof(KeyValueRef) + tempKey.size() + it.getValue().size();
|
|
result.push_back_deep(result.arena(), KeyValueRef(tempKey, it.getValue()));
|
|
it = data.previous(it);
|
|
--rowLimit;
|
|
}
|
|
}
|
|
|
|
result.more = rowLimit == 0 || byteLimit <= 0;
|
|
if(result.more) {
|
|
ASSERT(result.size() > 0);
|
|
result.readThrough = result[result.size()-1].key;
|
|
}
|
|
return result;
|
|
}
|
|
|
|
virtual void resyncLog() {
|
|
ASSERT(recovering.isReady());
|
|
resetSnapshot = true;
|
|
log_op(OpSnapshotAbort, StringRef(), StringRef());
|
|
}
|
|
|
|
virtual void enableSnapshot() { disableSnapshot = false; }
|
|
|
|
private:
|
|
enum OpType {
|
|
OpSet,
|
|
OpClear,
|
|
OpClearToEnd,
|
|
OpSnapshotItem,
|
|
OpSnapshotEnd,
|
|
OpSnapshotAbort, // terminate an in progress snapshot in order to start a full snapshot
|
|
OpCommit, // only in log, not in queue
|
|
OpRollback, // only in log, not in queue
|
|
OpSnapshotItemDelta
|
|
};
|
|
|
|
struct OpRef {
|
|
OpType op;
|
|
StringRef p1, p2;
|
|
OpRef() {}
|
|
OpRef(Arena& a, OpRef const& o) : op(o.op), p1(a, o.p1), p2(a, o.p2) {}
|
|
size_t expectedSize() { return p1.expectedSize() + p2.expectedSize(); }
|
|
};
|
|
struct OpHeader {
|
|
int op;
|
|
int len1, len2;
|
|
};
|
|
|
|
struct OpQueue {
|
|
OpQueue() : numBytes(0) {}
|
|
|
|
int totalSize() const { return numBytes; }
|
|
|
|
void clear() {
|
|
numBytes = 0;
|
|
operations = Standalone<VectorRef<OpRef>>();
|
|
arenas.clear();
|
|
}
|
|
|
|
void rollback() { clear(); }
|
|
|
|
void set(KeyValueRef keyValue, const Arena* arena = NULL) {
|
|
queue_op(OpSet, keyValue.key, keyValue.value, arena);
|
|
}
|
|
|
|
void clear(KeyRangeRef range, const Arena* arena = NULL) { queue_op(OpClear, range.begin, range.end, arena); }
|
|
|
|
void clear_to_end(StringRef fromKey, const Arena* arena = NULL) {
|
|
queue_op(OpClearToEnd, fromKey, StringRef(), arena);
|
|
}
|
|
|
|
void queue_op(OpType op, StringRef p1, StringRef p2, const Arena* arena) {
|
|
numBytes += p1.size() + p2.size() + sizeof(OpHeader) + sizeof(OpRef);
|
|
|
|
OpRef r;
|
|
r.op = op;
|
|
r.p1 = p1;
|
|
r.p2 = p2;
|
|
if (arena == NULL) {
|
|
operations.push_back_deep(operations.arena(), r);
|
|
} else {
|
|
operations.push_back(operations.arena(), r);
|
|
arenas.push_back(*arena);
|
|
}
|
|
}
|
|
|
|
const OpRef* begin() { return operations.begin(); }
|
|
|
|
const OpRef* end() { return operations.end(); }
|
|
|
|
private:
|
|
Standalone<VectorRef<OpRef>> operations;
|
|
uint64_t numBytes;
|
|
std::vector<Arena> arenas;
|
|
};
|
|
KeyValueStoreType type;
|
|
UID id;
|
|
|
|
Container data;
|
|
// reserved buffer for snapshot/fullsnapshot
|
|
uint8_t* reserved_buffer;
|
|
|
|
OpQueue queue; // mutations not yet commit()ted
|
|
IDiskQueue* log;
|
|
Future<Void> recovering, snapshotting;
|
|
int64_t committedWriteBytes;
|
|
int64_t overheadWriteBytes;
|
|
NotifiedVersion notifiedCommittedWriteBytes;
|
|
Key recoveredSnapshotKey; // After recovery, the next key in the currently uncompleted snapshot
|
|
IDiskQueue::location currentSnapshotEnd; // The end of the most recently completed snapshot (this snapshot cannot be discarded)
|
|
IDiskQueue::location previousSnapshotEnd; // The end of the second most recently completed snapshot (on commit, this
|
|
// snapshot can be discarded)
|
|
PromiseStream<Future<Void>> addActor;
|
|
Future<Void> commitActors;
|
|
|
|
int64_t committedDataSize;
|
|
int64_t transactionSize;
|
|
bool transactionIsLarge;
|
|
|
|
bool resetSnapshot; // Set to true after a fullSnapshot is performed. This causes the regular snapshot mechanism to
|
|
// restart
|
|
bool disableSnapshot;
|
|
bool replaceContent;
|
|
bool firstCommitWithSnapshot;
|
|
int snapshotCount;
|
|
|
|
int64_t memoryLimit; // The upper limit on the memory used by the store (excluding, possibly, some clear operations)
|
|
std::vector<std::pair<KeyValueMapPair, uint64_t>> dataSets;
|
|
|
|
int64_t commit_queue(OpQueue& ops, bool log, bool sequential = false) {
|
|
int64_t total = 0, count = 0;
|
|
IDiskQueue::location log_location = 0;
|
|
|
|
for (auto o = ops.begin(); o != ops.end(); ++o) {
|
|
++count;
|
|
total += o->p1.size() + o->p2.size() + OP_DISK_OVERHEAD;
|
|
if (o->op == OpSet) {
|
|
if (sequential) {
|
|
KeyValueMapPair pair(o->p1, o->p2);
|
|
dataSets.push_back(std::make_pair(pair, pair.arena.getSize() + data.getElementBytes()));
|
|
} else {
|
|
data.insert(o->p1, o->p2);
|
|
}
|
|
} else if (o->op == OpClear) {
|
|
if (sequential) {
|
|
data.insert(dataSets);
|
|
dataSets.clear();
|
|
}
|
|
data.erase(data.lower_bound(o->p1), data.lower_bound(o->p2));
|
|
} else if (o->op == OpClearToEnd) {
|
|
if (sequential) {
|
|
data.insert(dataSets);
|
|
dataSets.clear();
|
|
}
|
|
data.erase(data.lower_bound(o->p1), data.end());
|
|
} else
|
|
ASSERT(false);
|
|
if (log) log_location = log_op(o->op, o->p1, o->p2);
|
|
}
|
|
if (sequential) {
|
|
data.insert(dataSets);
|
|
dataSets.clear();
|
|
}
|
|
|
|
bool ok = count < 1e6;
|
|
if( !ok ) {
|
|
TraceEvent(/*ok ? SevInfo : */SevWarnAlways, "KVSMemCommitQueue", id)
|
|
.detail("Bytes", total)
|
|
.detail("Log", log)
|
|
.detail("Ops", count)
|
|
.detail("LastLoggedLocation", log_location)
|
|
.detail("Details", count);
|
|
}
|
|
|
|
ops.clear();
|
|
return total;
|
|
}
|
|
|
|
IDiskQueue::location log_op(OpType op, StringRef v1, StringRef v2) {
|
|
OpHeader h = { (int)op, v1.size(), v2.size() };
|
|
log->push(StringRef((const uint8_t*)&h, sizeof(h)));
|
|
log->push(v1);
|
|
log->push(v2);
|
|
return log->push(LiteralStringRef("\x01")); // Changes here should be reflected in OP_DISK_OVERHEAD
|
|
}
|
|
|
|
ACTOR static Future<Void> recover( KeyValueStoreMemory* self, bool exactRecovery ) {
|
|
loop {
|
|
// 'uncommitted' variables track something that might be rolled back by an OpRollback, and are copied into permanent variables
|
|
// (in self) in OpCommit. OpRollback does the reverse (copying the permanent versions over the uncommitted versions)
|
|
// the uncommitted and committed variables should be equal initially (to whatever makes sense if there are no committed transactions recovered)
|
|
state Key uncommittedNextKey = self->recoveredSnapshotKey;
|
|
state IDiskQueue::location uncommittedPrevSnapshotEnd = self->previousSnapshotEnd = self->log->getNextReadLocation(); // not really, but popping up to here does nothing
|
|
state IDiskQueue::location uncommittedSnapshotEnd = self->currentSnapshotEnd = uncommittedPrevSnapshotEnd;
|
|
|
|
state int zeroFillSize = 0;
|
|
state int dbgSnapshotItemCount=0;
|
|
state int dbgSnapshotEndCount=0;
|
|
state int dbgMutationCount=0;
|
|
state int dbgCommitCount=0;
|
|
state double startt = now();
|
|
state UID dbgid = self->id;
|
|
|
|
state Future<Void> loggingDelay = delay(1.0);
|
|
|
|
state OpQueue recoveryQueue;
|
|
state OpHeader h;
|
|
state Standalone<StringRef> lastSnapshotKey;
|
|
|
|
TraceEvent("KVSMemRecoveryStarted", self->id)
|
|
.detail("SnapshotEndLocation", uncommittedSnapshotEnd);
|
|
|
|
try {
|
|
loop {
|
|
{
|
|
Standalone<StringRef> data = wait( self->log->readNext( sizeof(OpHeader) ) );
|
|
if (data.size() != sizeof(OpHeader)) {
|
|
if (data.size()) {
|
|
TEST(true); // zero fill partial header in KeyValueStoreMemory
|
|
memset(&h, 0, sizeof(OpHeader));
|
|
memcpy(&h, data.begin(), data.size());
|
|
zeroFillSize = sizeof(OpHeader)-data.size() + h.len1 + h.len2 + 1;
|
|
}
|
|
TraceEvent("KVSMemRecoveryComplete", self->id)
|
|
.detail("Reason", "Non-header sized data read")
|
|
.detail("DataSize", data.size())
|
|
.detail("ZeroFillSize", zeroFillSize)
|
|
.detail("SnapshotEndLocation", uncommittedSnapshotEnd)
|
|
.detail("NextReadLoc", self->log->getNextReadLocation());
|
|
break;
|
|
}
|
|
h = *(OpHeader*)data.begin();
|
|
}
|
|
Standalone<StringRef> data = wait( self->log->readNext( h.len1 + h.len2+1 ) );
|
|
if (data.size() != h.len1 + h.len2 + 1) {
|
|
zeroFillSize = h.len1 + h.len2 + 1 - data.size();
|
|
TraceEvent("KVSMemRecoveryComplete", self->id)
|
|
.detail("Reason", "data specified by header does not exist")
|
|
.detail("DataSize", data.size())
|
|
.detail("ZeroFillSize", zeroFillSize)
|
|
.detail("SnapshotEndLocation", uncommittedSnapshotEnd)
|
|
.detail("OpCode", h.op)
|
|
.detail("NextReadLoc", self->log->getNextReadLocation());
|
|
break;
|
|
}
|
|
|
|
if (data[data.size()-1]) {
|
|
StringRef p1 = data.substr(0, h.len1);
|
|
StringRef p2 = data.substr(h.len1, h.len2);
|
|
|
|
if (h.op == OpSnapshotItem || h.op == OpSnapshotItemDelta) { // snapshot data item
|
|
/*if (p1 < uncommittedNextKey) {
|
|
TraceEvent(SevError, "RecSnapshotBack", self->id)
|
|
.detail("NextKey", uncommittedNextKey)
|
|
.detail("P1", p1)
|
|
.detail("Nextlocation", self->log->getNextReadLocation());
|
|
}
|
|
ASSERT( p1 >= uncommittedNextKey );*/
|
|
if(h.op == OpSnapshotItemDelta) {
|
|
ASSERT(p1.size() > 1);
|
|
// Get number of bytes borrowed from previous item key
|
|
int borrowed = *(uint8_t *)p1.begin();
|
|
ASSERT(borrowed <= lastSnapshotKey.size());
|
|
// Trim p1 to just the suffix
|
|
StringRef suffix = p1.substr(1);
|
|
// Allocate a new string in data arena to hold prefix + suffix
|
|
Arena &dataArena = *(Arena *)&data.arena();
|
|
p1 = makeString(borrowed + suffix.size(), dataArena);
|
|
// Copy the prefix into the new reconstituted key
|
|
memcpy(mutateString(p1), lastSnapshotKey.begin(), borrowed);
|
|
// Copy the suffix into the new reconstituted key
|
|
memcpy(mutateString(p1) + borrowed, suffix.begin(), suffix.size());
|
|
}
|
|
if( p1 >= uncommittedNextKey )
|
|
recoveryQueue.clear( KeyRangeRef(uncommittedNextKey, p1), &uncommittedNextKey.arena() ); //FIXME: Not sure what this line is for, is it necessary?
|
|
recoveryQueue.set( KeyValueRef(p1, p2), &data.arena() );
|
|
uncommittedNextKey = keyAfter(p1);
|
|
++dbgSnapshotItemCount;
|
|
lastSnapshotKey = Key(p1, data.arena());
|
|
} else if (h.op == OpSnapshotEnd || h.op == OpSnapshotAbort) { // snapshot complete
|
|
TraceEvent("RecSnapshotEnd", self->id)
|
|
.detail("NextKey", uncommittedNextKey)
|
|
.detail("Nextlocation", self->log->getNextReadLocation())
|
|
.detail("IsSnapshotEnd", h.op == OpSnapshotEnd);
|
|
|
|
if(h.op == OpSnapshotEnd) {
|
|
uncommittedPrevSnapshotEnd = uncommittedSnapshotEnd;
|
|
uncommittedSnapshotEnd = self->log->getNextReadLocation();
|
|
recoveryQueue.clear_to_end( uncommittedNextKey, &uncommittedNextKey.arena() );
|
|
}
|
|
|
|
uncommittedNextKey = Key();
|
|
lastSnapshotKey = Key();
|
|
++dbgSnapshotEndCount;
|
|
} else if (h.op == OpSet) { // set mutation
|
|
recoveryQueue.set( KeyValueRef(p1,p2), &data.arena() );
|
|
++dbgMutationCount;
|
|
} else if (h.op == OpClear) { // clear mutation
|
|
recoveryQueue.clear( KeyRangeRef(p1,p2), &data.arena() );
|
|
++dbgMutationCount;
|
|
} else if (h.op == OpClearToEnd) { //clear all data from begin key to end
|
|
recoveryQueue.clear_to_end( p1, &data.arena() );
|
|
} else if (h.op == OpCommit) { // commit previous transaction
|
|
self->commit_queue(recoveryQueue, false);
|
|
++dbgCommitCount;
|
|
self->recoveredSnapshotKey = uncommittedNextKey;
|
|
self->previousSnapshotEnd = uncommittedPrevSnapshotEnd;
|
|
self->currentSnapshotEnd = uncommittedSnapshotEnd;
|
|
} else if (h.op == OpRollback) { // rollback previous transaction
|
|
recoveryQueue.rollback();
|
|
TraceEvent("KVSMemRecSnapshotRollback", self->id)
|
|
.detail("NextKey", uncommittedNextKey);
|
|
uncommittedNextKey = self->recoveredSnapshotKey;
|
|
uncommittedPrevSnapshotEnd = self->previousSnapshotEnd;
|
|
uncommittedSnapshotEnd = self->currentSnapshotEnd;
|
|
} else
|
|
ASSERT(false);
|
|
} else {
|
|
TraceEvent("KVSMemRecoverySkippedZeroFill", self->id)
|
|
.detail("PayloadSize", data.size())
|
|
.detail("ExpectedSize", h.len1 + h.len2 + 1)
|
|
.detail("OpCode", h.op)
|
|
.detail("EndsAt", self->log->getNextReadLocation());
|
|
}
|
|
|
|
if (loggingDelay.isReady()) {
|
|
TraceEvent("KVSMemRecoveryLogSnap", self->id)
|
|
.detail("SnapshotItems", dbgSnapshotItemCount)
|
|
.detail("SnapshotEnd", dbgSnapshotEndCount)
|
|
.detail("Mutations", dbgMutationCount)
|
|
.detail("Commits", dbgCommitCount)
|
|
.detail("EndsAt", self->log->getNextReadLocation());
|
|
loggingDelay = delay(1.0);
|
|
}
|
|
|
|
wait( yield() );
|
|
}
|
|
|
|
if (zeroFillSize) {
|
|
if( exactRecovery ) {
|
|
TraceEvent(SevError, "KVSMemExpectedExact", self->id);
|
|
ASSERT(false);
|
|
}
|
|
|
|
TEST( true ); // Fixing a partial commit at the end of the KeyValueStoreMemory log
|
|
for(int i=0; i<zeroFillSize; i++)
|
|
self->log->push( StringRef((const uint8_t*)"",1) );
|
|
}
|
|
//self->rollback(); not needed, since we are about to discard anything left in the recoveryQueue
|
|
//TraceEvent("KVSMemRecRollback", self->id).detail("QueueEmpty", data.size() == 0);
|
|
// make sure that before any new operations are added to the log that all uncommitted operations are "rolled back"
|
|
self->log_op( OpRollback, StringRef(), StringRef() ); // rollback previous transaction
|
|
|
|
self->committedDataSize = self->data.sumTo(self->data.end());
|
|
|
|
TraceEvent("KVSMemRecovered", self->id)
|
|
.detail("SnapshotItems", dbgSnapshotItemCount)
|
|
.detail("SnapshotEnd", dbgSnapshotEndCount)
|
|
.detail("Mutations", dbgMutationCount)
|
|
.detail("Commits", dbgCommitCount)
|
|
.detail("TimeTaken", now()-startt);
|
|
|
|
self->semiCommit();
|
|
return Void();
|
|
} catch( Error &e ) {
|
|
bool ok = e.code() == error_code_operation_cancelled || e.code() == error_code_file_not_found || e.code() == error_code_disk_adapter_reset;
|
|
TraceEvent(ok ? SevInfo : SevError, "ErrorDuringRecovery", dbgid).error(e, true);
|
|
if(e.code() != error_code_disk_adapter_reset) {
|
|
throw e;
|
|
}
|
|
self->data.clear();
|
|
self->dataSets.clear();
|
|
}
|
|
}
|
|
}
|
|
|
|
// Snapshots an entire data set
|
|
void fullSnapshot(Container& snapshotData) {
|
|
previousSnapshotEnd = log_op(OpSnapshotAbort, StringRef(), StringRef());
|
|
replaceContent = false;
|
|
|
|
// Clear everything since we are about to write the whole database
|
|
log_op(OpClearToEnd, allKeys.begin, StringRef());
|
|
|
|
int count = 0;
|
|
int64_t snapshotSize = 0;
|
|
for (auto kv = snapshotData.begin(); kv != snapshotData.end(); ++kv) {
|
|
StringRef tempKey = kv.getKey(reserved_buffer);
|
|
log_op(OpSnapshotItem, tempKey, kv.getValue());
|
|
snapshotSize += tempKey.size() + kv.getValue().size() + OP_DISK_OVERHEAD;
|
|
++count;
|
|
}
|
|
|
|
TraceEvent("FullSnapshotEnd", id)
|
|
.detail("PreviousSnapshotEndLoc", previousSnapshotEnd)
|
|
.detail("SnapshotSize", snapshotSize)
|
|
.detail("SnapshotElements", count);
|
|
|
|
currentSnapshotEnd = log_op(OpSnapshotEnd, StringRef(), StringRef());
|
|
}
|
|
|
|
ACTOR static Future<Void> snapshot( KeyValueStoreMemory* self ) {
|
|
wait(self->recovering);
|
|
|
|
state Key nextKey = self->recoveredSnapshotKey;
|
|
state bool nextKeyAfter = false; // setting this to true is equilvent to setting nextKey = keyAfter(nextKey)
|
|
state uint64_t snapshotTotalWrittenBytes = 0;
|
|
state int lastDiff = 0;
|
|
state int snapItems = 0;
|
|
state uint64_t snapshotBytes = 0;
|
|
|
|
// Snapshot keys will be alternately written to two preallocated buffers.
|
|
// This allows consecutive snapshot keys to be compared for delta compression while only copying each key's bytes once.
|
|
state Key lastSnapshotKeyA = makeString(CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT);
|
|
state Key lastSnapshotKeyB = makeString(CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT);
|
|
state bool lastSnapshotKeyUsingA = true;
|
|
|
|
TraceEvent("KVSMemStartingSnapshot", self->id).detail("StartKey", nextKey);
|
|
|
|
loop {
|
|
wait( self->notifiedCommittedWriteBytes.whenAtLeast( snapshotTotalWrittenBytes + 1 ) );
|
|
|
|
if (self->resetSnapshot) {
|
|
nextKey = Key();
|
|
nextKeyAfter = false;
|
|
snapItems = 0;
|
|
snapshotBytes = 0;
|
|
self->resetSnapshot = false;
|
|
}
|
|
|
|
auto next = nextKeyAfter ? self->data.upper_bound(nextKey) : self->data.lower_bound(nextKey);
|
|
int diff = self->notifiedCommittedWriteBytes.get() - snapshotTotalWrittenBytes;
|
|
if (diff > lastDiff && diff > 5e7)
|
|
TraceEvent(SevWarnAlways, "ManyWritesAtOnce", self->id)
|
|
.detail("CommittedWrites", self->notifiedCommittedWriteBytes.get())
|
|
.detail("SnapshotWrites", snapshotTotalWrittenBytes)
|
|
.detail("Diff", diff)
|
|
.detail("LastOperationWasASnapshot", nextKey == Key() && !nextKeyAfter);
|
|
lastDiff = diff;
|
|
|
|
// Since notifiedCommittedWriteBytes is only set() once per commit, before logging the commit operation, when
|
|
// this line is reached it is certain that there are no snapshot items in this commit yet. Since this commit
|
|
// could be the first thing read during recovery, we can't write a delta yet.
|
|
bool useDelta = false;
|
|
|
|
// Write snapshot items until the wait above would block because we've used up all of the byte budget
|
|
loop {
|
|
|
|
if (next == self->data.end()) {
|
|
// After a snapshot end is logged, recovery may not see the last snapshot item logged before it so the
|
|
// next snapshot item logged cannot be a delta.
|
|
useDelta = false;
|
|
|
|
auto thisSnapshotEnd = self->log_op(OpSnapshotEnd, StringRef(), StringRef());
|
|
//TraceEvent("SnapshotEnd", self->id)
|
|
// .detail("LastKey", lastKey.present() ? lastKey.get() : LiteralStringRef("<none>"))
|
|
// .detail("CurrentSnapshotEndLoc", self->currentSnapshotEnd)
|
|
// .detail("PreviousSnapshotEndLoc", self->previousSnapshotEnd)
|
|
// .detail("ThisSnapshotEnd", thisSnapshotEnd)
|
|
// .detail("Items", snapItems)
|
|
// .detail("CommittedWrites", self->notifiedCommittedWriteBytes.get())
|
|
// .detail("SnapshotSize", snapshotBytes);
|
|
|
|
ASSERT(thisSnapshotEnd >= self->currentSnapshotEnd);
|
|
self->previousSnapshotEnd = self->currentSnapshotEnd;
|
|
self->currentSnapshotEnd = thisSnapshotEnd;
|
|
|
|
if (++self->snapshotCount == 2) {
|
|
self->replaceContent = false;
|
|
}
|
|
|
|
snapItems = 0;
|
|
snapshotBytes = 0;
|
|
snapshotTotalWrittenBytes += OP_DISK_OVERHEAD;
|
|
|
|
// If we're not stopping now, reset next
|
|
if(snapshotTotalWrittenBytes < self->notifiedCommittedWriteBytes.get()) {
|
|
next = self->data.begin();
|
|
}
|
|
else {
|
|
// Otherwise, save state for continuing after the next wait and stop
|
|
nextKey = Key();
|
|
nextKeyAfter = false;
|
|
break;
|
|
}
|
|
|
|
} else {
|
|
// destKey is whichever of the two last key buffers we should write to next.
|
|
Key &destKey = lastSnapshotKeyUsingA ? lastSnapshotKeyA : lastSnapshotKeyB;
|
|
|
|
// Get the key, using destKey as a temporary buffer if needed.
|
|
KeyRef tempKey = next.getKey(mutateString(destKey));
|
|
int opKeySize = tempKey.size();
|
|
|
|
// If tempKey did not use the start of destKey, then copy tempKey into destKey.
|
|
// It's technically possible for the source and dest to overlap but with the current container implementations that will not happen.
|
|
if(tempKey.begin() != destKey.begin()) {
|
|
memcpy(mutateString(destKey), tempKey.begin(), tempKey.size());
|
|
}
|
|
|
|
// Now, tempKey's bytes definitely exist in memory at destKey.begin() so update destKey's contents to be a proper KeyRef of the key.
|
|
// This intentionally leaves the Arena alone and doesn't copy anything into it.
|
|
destKey.contents() = KeyRef(destKey.begin(), tempKey.size());
|
|
|
|
// Get the common prefix between this key and the previous one, or 0 if there was no previous one.
|
|
int commonPrefix;
|
|
if(useDelta && SERVER_KNOBS->PREFIX_COMPRESS_KVS_MEM_SNAPSHOTS) {
|
|
commonPrefix = commonPrefixLength(lastSnapshotKeyA, lastSnapshotKeyB);
|
|
}
|
|
else {
|
|
commonPrefix = 0;
|
|
useDelta = true;
|
|
}
|
|
|
|
// If the common prefix is greater than 1, write a delta item. It isn't worth doing for 0 or 1 bytes, it would merely add decode overhead (string copying).
|
|
if(commonPrefix > 1) {
|
|
// Cap the common prefix length to 255. Sorry, ridiculously long keys!
|
|
commonPrefix = std::min<int>(commonPrefix, std::numeric_limits<uint8_t>::max());
|
|
|
|
// We're going to temporarily write a 1-byte integer just before the key suffix to create the log op key and log it, then restore that byte.
|
|
uint8_t &prefixLength = mutateString(destKey)[commonPrefix - 1];
|
|
uint8_t backupByte = prefixLength;
|
|
prefixLength = commonPrefix;
|
|
|
|
opKeySize = opKeySize - commonPrefix + 1;
|
|
KeyRef opKey(&prefixLength, opKeySize);
|
|
self->log_op(OpSnapshotItemDelta, opKey, next.getValue());
|
|
|
|
// Restore the overwritten byte
|
|
prefixLength = backupByte;
|
|
}
|
|
else {
|
|
self->log_op(OpSnapshotItem, tempKey, next.getValue());
|
|
}
|
|
|
|
snapItems++;
|
|
uint64_t opBytes = opKeySize + next.getValue().size() + OP_DISK_OVERHEAD;
|
|
snapshotBytes += opBytes;
|
|
snapshotTotalWrittenBytes += opBytes;
|
|
lastSnapshotKeyUsingA = !lastSnapshotKeyUsingA;
|
|
|
|
// If we're not stopping now, increment next
|
|
if(snapshotTotalWrittenBytes < self->notifiedCommittedWriteBytes.get()) {
|
|
++next;
|
|
}
|
|
else {
|
|
// Otherwise, save state for continuing after the next wait and stop
|
|
nextKey = destKey;
|
|
nextKeyAfter = true;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
ACTOR static Future<Optional<Value>> waitAndReadValue( KeyValueStoreMemory* self, Key key ) {
|
|
wait( self->recovering );
|
|
return self->readValue(key).get();
|
|
}
|
|
ACTOR static Future<Optional<Value>> waitAndReadValuePrefix( KeyValueStoreMemory* self, Key key, int maxLength) {
|
|
wait( self->recovering );
|
|
return self->readValuePrefix(key, maxLength).get();
|
|
}
|
|
ACTOR static Future<Standalone<RangeResultRef>> waitAndReadRange( KeyValueStoreMemory* self, KeyRange keys, int rowLimit, int byteLimit ) {
|
|
wait( self->recovering );
|
|
return self->readRange(keys, rowLimit, byteLimit).get();
|
|
}
|
|
ACTOR static Future<Void> waitAndCommit(KeyValueStoreMemory* self, bool sequential) {
|
|
wait(self->recovering);
|
|
wait(self->commit(sequential));
|
|
return Void();
|
|
}
|
|
ACTOR static Future<Void> commitAndUpdateVersions( KeyValueStoreMemory* self, Future<Void> commit, IDiskQueue::location location ) {
|
|
wait( commit );
|
|
self->log->pop(location);
|
|
return Void();
|
|
}
|
|
};
|
|
|
|
template <typename Container>
|
|
KeyValueStoreMemory<Container>::KeyValueStoreMemory(IDiskQueue* log, UID id, int64_t memoryLimit,
|
|
KeyValueStoreType storeType, bool disableSnapshot,
|
|
bool replaceContent, bool exactRecovery)
|
|
: log(log), id(id), type(storeType), previousSnapshotEnd(-1), currentSnapshotEnd(-1), resetSnapshot(false),
|
|
memoryLimit(memoryLimit), committedWriteBytes(0), overheadWriteBytes(0), committedDataSize(0), transactionSize(0),
|
|
transactionIsLarge(false), disableSnapshot(disableSnapshot), replaceContent(replaceContent), snapshotCount(0),
|
|
firstCommitWithSnapshot(true) {
|
|
// create reserved buffer for radixtree store type
|
|
this->reserved_buffer =
|
|
(storeType == KeyValueStoreType::MEMORY) ? nullptr : new uint8_t[CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT];
|
|
if (this->reserved_buffer != nullptr) memset(this->reserved_buffer, 0, CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT);
|
|
|
|
recovering = recover(this, exactRecovery);
|
|
snapshotting = snapshot(this);
|
|
commitActors = actorCollection(addActor.getFuture());
|
|
}
|
|
|
|
IKeyValueStore* keyValueStoreMemory(std::string const& basename, UID logID, int64_t memoryLimit, std::string ext,
|
|
KeyValueStoreType storeType) {
|
|
TraceEvent("KVSMemOpening", logID)
|
|
.detail("Basename", basename)
|
|
.detail("MemoryLimit", memoryLimit)
|
|
.detail("StoreType", storeType);
|
|
|
|
IDiskQueue *log = openDiskQueue( basename, ext, logID, DiskQueueVersion::V1 );
|
|
if(storeType == KeyValueStoreType::MEMORY_RADIXTREE){
|
|
return new KeyValueStoreMemory<radix_tree>(log, logID, memoryLimit, storeType, false, false, false);
|
|
} else {
|
|
return new KeyValueStoreMemory<IKeyValueContainer>(log, logID, memoryLimit, storeType, false, false, false);
|
|
}
|
|
}
|
|
|
|
IKeyValueStore* keyValueStoreLogSystem( class IDiskQueue* queue, UID logID, int64_t memoryLimit, bool disableSnapshot, bool replaceContent, bool exactRecovery ) {
|
|
return new KeyValueStoreMemory<IKeyValueContainer>(queue, logID, memoryLimit, KeyValueStoreType::MEMORY,
|
|
disableSnapshot, replaceContent, exactRecovery);
|
|
}
|