Implemented concurrent read limit in IKeyValueStore interface for Redwood. Added knobs for Redwood page size, concurrent read limit, and page fill factor. Changed commitSubtree() recursion back to use a vector and waitForAll() because it seems to be lower overhead than ActorCollection.
This commit is contained in:
parent
ffbed1d84c
commit
43f9e4dfad
|
@ -606,6 +606,10 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
|
|||
init( FASTRESTORE_TXN_CLEAR_MAX, 1000 ); if( randomize && BUGGIFY ) { FASTRESTORE_TXN_CLEAR_MAX = deterministicRandom()->random01() * 100 + 1; }
|
||||
init( FASTRESTORE_TXN_RETRY_MAX, 10 ); if( randomize && BUGGIFY ) { FASTRESTORE_TXN_RETRY_MAX = deterministicRandom()->random01() * 100 + 1; }
|
||||
|
||||
init( REDWOOD_DEFAULT_PAGE_SIZE, 4096 );
|
||||
init( REDWOOD_KVSTORE_CONCURRENT_READS, 64 );
|
||||
init( REDWOOD_PAGE_REBUILD_FILL_FACTOR, 0.66 );
|
||||
|
||||
// clang-format on
|
||||
|
||||
if(clientKnobs)
|
||||
|
|
|
@ -537,6 +537,10 @@ public:
|
|||
int FASTRESTORE_TXN_CLEAR_MAX; // threshold to start tracking each clear op in a txn
|
||||
int FASTRESTORE_TXN_RETRY_MAX; // threshold to start output error on too many retries
|
||||
|
||||
int REDWOOD_DEFAULT_PAGE_SIZE; // Page size for new Redwood files
|
||||
int REDWOOD_KVSTORE_CONCURRENT_READS; // Max number of simultaneous point or range reads in progress.
|
||||
double REDWOOD_PAGE_REBUILD_FILL_FACTOR; // When rebuilding pages, start a new page after this capacity
|
||||
|
||||
ServerKnobs();
|
||||
void initialize(bool randomize = false, ClientKnobs* clientKnobs = NULL, bool isSimulated = false);
|
||||
};
|
||||
|
|
|
@ -944,10 +944,10 @@ public:
|
|||
|
||||
void setPageSize(int size) {
|
||||
logicalPageSize = size;
|
||||
physicalPageSize = smallestPhysicalBlock;
|
||||
while (logicalPageSize > physicalPageSize) {
|
||||
physicalPageSize += smallestPhysicalBlock;
|
||||
}
|
||||
// Physical page size is the total size of the smallest number of physical blocks needed to store
|
||||
// logicalPageSize bytes
|
||||
int blocks = 1 + ((logicalPageSize - 1) / smallestPhysicalBlock);
|
||||
physicalPageSize = blocks * smallestPhysicalBlock;
|
||||
if (pHeader != nullptr) {
|
||||
pHeader->pageSize = logicalPageSize;
|
||||
}
|
||||
|
@ -3186,8 +3186,7 @@ private:
|
|||
// This is how much space for the binary tree exists in the page, after the header
|
||||
state int blockSize = self->m_pager->getUsablePageSize();
|
||||
state int pageSize = blockSize - sizeof(BTreePage);
|
||||
state float fillFactor = 0.66; // TODO: Make this a knob
|
||||
state int pageFillTarget = pageSize * fillFactor;
|
||||
state int pageFillTarget = pageSize * SERVER_KNOBS->REDWOOD_PAGE_REBUILD_FILL_FACTOR;
|
||||
state int blockCount = 1;
|
||||
|
||||
state int kvBytes = 0;
|
||||
|
@ -3259,7 +3258,7 @@ private:
|
|||
|
||||
blockCount += newBlocks;
|
||||
pageSize = newPageSize;
|
||||
pageFillTarget = pageSize * fillFactor;
|
||||
pageFillTarget = pageSize * SERVER_KNOBS->REDWOOD_PAGE_REBUILD_FILL_FACTOR;
|
||||
}
|
||||
|
||||
kvBytes += keySize + valueSize;
|
||||
|
@ -4117,8 +4116,7 @@ private:
|
|||
return Void();
|
||||
} else {
|
||||
// Internal Page
|
||||
SignalableActorCollection recursions;
|
||||
state int recursionsCount = 0;
|
||||
std::vector<Future<Void>> recursions;
|
||||
state std::vector<InternalPageSliceUpdate*> slices;
|
||||
state Arena arena;
|
||||
|
||||
|
@ -4269,15 +4267,17 @@ private:
|
|||
}
|
||||
|
||||
// If this page has height of 2 then its children are leaf nodes
|
||||
++recursionsCount;
|
||||
recursions.add(self->commitSubtree(self, snapshot, mutationBuffer, pageID, btPage->height == 2, mBegin,
|
||||
mEnd, slices.back()));
|
||||
recursions.push_back(self->commitSubtree(self, snapshot, mutationBuffer, pageID, btPage->height == 2,
|
||||
mBegin, mEnd, slices.back()));
|
||||
}
|
||||
|
||||
wait(recursions.signal());
|
||||
debug_printf(
|
||||
"%s Recursions done. Processing child range updates. level=%d children=%d slices=%d recursions=%d\n",
|
||||
context.c_str(), btPage->height, btPage->tree().numItems, slices.size(), recursionsCount);
|
||||
"%s Recursions from internal page started. pageSize=%d level=%d children=%d slices=%d recursions=%d\n",
|
||||
context.c_str(), btPage->size(), btPage->height, btPage->tree().numItems, slices.size(),
|
||||
recursions.size());
|
||||
|
||||
wait(waitForAll(recursions));
|
||||
debug_printf("%s Recursions done, processing slice updates.\n", context.c_str());
|
||||
|
||||
state InternalPageModifier m(cursor.mirror, tryToUpdate);
|
||||
|
||||
|
@ -4905,9 +4905,10 @@ VersionedBTree::Counts VersionedBTree::counts;
|
|||
|
||||
class KeyValueStoreRedwoodUnversioned : public IKeyValueStore {
|
||||
public:
|
||||
KeyValueStoreRedwoodUnversioned(std::string filePrefix, UID logID) : m_filePrefix(filePrefix) {
|
||||
KeyValueStoreRedwoodUnversioned(std::string filePrefix, UID logID)
|
||||
: m_filePrefix(filePrefix), m_concurrentReads(SERVER_KNOBS->REDWOOD_KVSTORE_CONCURRENT_READS) {
|
||||
// TODO: This constructor should really just take an IVersionedStore
|
||||
IPager2* pager = new DWALPager(4096, filePrefix, 0);
|
||||
IPager2* pager = new DWALPager(SERVER_KNOBS->REDWOOD_DEFAULT_PAGE_SIZE, filePrefix, 0);
|
||||
m_tree = new VersionedBTree(pager, filePrefix);
|
||||
m_init = catchError(init_impl(this));
|
||||
}
|
||||
|
@ -4978,6 +4979,9 @@ public:
|
|||
|
||||
ACTOR static Future<Standalone<RangeResultRef>> readRange_impl(KeyValueStoreRedwoodUnversioned* self, KeyRange keys,
|
||||
int rowLimit, int byteLimit) {
|
||||
wait(self->m_concurrentReads.take());
|
||||
state FlowLock::Releaser releaser(self->m_concurrentReads);
|
||||
|
||||
self->m_tree->counts.getRanges++;
|
||||
state Standalone<RangeResultRef> result;
|
||||
state int accumulatedBytes = 0;
|
||||
|
@ -5027,6 +5031,9 @@ public:
|
|||
|
||||
ACTOR static Future<Optional<Value>> readValue_impl(KeyValueStoreRedwoodUnversioned* self, Key key,
|
||||
Optional<UID> debugID) {
|
||||
wait(self->m_concurrentReads.take());
|
||||
state FlowLock::Releaser releaser(self->m_concurrentReads);
|
||||
|
||||
self->m_tree->counts.gets++;
|
||||
state Reference<IStoreCursor> cur = self->m_tree->readAtVersion(self->m_tree->getLastCommittedVersion());
|
||||
|
||||
|
@ -5043,6 +5050,9 @@ public:
|
|||
|
||||
ACTOR static Future<Optional<Value>> readValuePrefix_impl(KeyValueStoreRedwoodUnversioned* self, Key key,
|
||||
int maxLength, Optional<UID> debugID) {
|
||||
wait(self->m_concurrentReads.take());
|
||||
state FlowLock::Releaser releaser(self->m_concurrentReads);
|
||||
|
||||
self->m_tree->counts.gets++;
|
||||
state Reference<IStoreCursor> cur = self->m_tree->readAtVersion(self->m_tree->getLastCommittedVersion());
|
||||
|
||||
|
@ -5067,6 +5077,7 @@ private:
|
|||
Future<Void> m_init;
|
||||
Promise<Void> m_closed;
|
||||
Promise<Void> m_error;
|
||||
FlowLock m_concurrentReads;
|
||||
|
||||
template <typename T>
|
||||
inline Future<T> catchError(Future<T> f) {
|
||||
|
|
Loading…
Reference in New Issue