Cache the storage queue results during the first pass in readRange.

Currently, we may traverse the PTree backing the storage queue more than once
during the readRange operation. This is an attempt to cache the results during
the first traversal and thereby avoid multiple PTree traversals.
This commit is contained in:
negoyal 2019-08-21 17:51:30 -07:00
parent dd8ba6f08b
commit 4ba1725bb5
1 changed file with 58 additions and 26 deletions

View File

@ -1031,9 +1031,11 @@ ACTOR Future<Void> getShardStateQ( StorageServer* data, GetShardStateRequest req
return Void();
}
void merge( Arena& arena, VectorRef<KeyValueRef, VecSerStrategy::String>& output, VectorRef<KeyValueRef> const& base,
void merge( Arena& arena, VectorRef<KeyValueRef, VecSerStrategy::String>& output,
Arena& arenaCache, VectorRef<KeyValueRef, VecSerStrategy::String>& vm_output,
VectorRef<KeyValueRef> const& base,
StorageServer::VersionedData::iterator& start, StorageServer::VersionedData::iterator const& end,
int versionedDataCount, int limit, bool stopAtEndOfBase, int limitBytes = 1<<30 )
int& vCount, int limit, bool stopAtEndOfBase, int& pos, int limitBytes = 1<<30 )
// Combines data from base (at an older version) with sets from newer versions in [start, end) and appends the first (up to) |limit| rows to output
// If limit<0, base and output are in descending order, and start->key()>end->key(), but start is still inclusive and end is exclusive
{
@ -1042,16 +1044,23 @@ void merge( Arena& arena, VectorRef<KeyValueRef, VecSerStrategy::String>& output
bool forward = limit>0;
if (!forward) limit = -limit;
int accumulatedBytes = 0;
//int pos = 0; //forward? 0 : (vm_output.size() -1);
//int vCount = versionedDataCount;
KeyValueRef const* baseStart = base.begin();
KeyValueRef const* baseEnd = base.end();
while (baseStart!=baseEnd && start!=end && --limit>=0 && accumulatedBytes < limitBytes) {
if (forward ? baseStart->key < start.key() : baseStart->key > start.key())
while (baseStart!=baseEnd && /*start!=end */ vCount>0 && --limit>=0 && accumulatedBytes < limitBytes) {
//if (forward ? baseStart->key < start.key() : baseStart->key > start.key())
if (forward ? baseStart->key < vm_output[pos].key : baseStart->key > vm_output[pos].key)
output.push_back_deep( arena, *baseStart++ );
else {
output.push_back_deep( arena, KeyValueRef(start.key(), start->getValue()) );
if (baseStart->key == start.key()) ++baseStart;
if (forward) ++start; else --start;
//output.push_back_deep( arena, KeyValueRef(start.key(), start->getValue()) );
output.push_back_deep( arena, vm_output[pos]);
//if (baseStart->key == start.key()) ++baseStart;
if (baseStart->key == vm_output[pos].key) ++baseStart;
//if (forward) ++start; else --start;
++pos;
//if (forward) ++pos; else --pos;
vCount--;
}
accumulatedBytes += sizeof(KeyValueRef) + output.end()[-1].expectedSize();
}
@ -1060,10 +1069,14 @@ void merge( Arena& arena, VectorRef<KeyValueRef, VecSerStrategy::String>& output
accumulatedBytes += sizeof(KeyValueRef) + output.end()[-1].expectedSize();
}
if( !stopAtEndOfBase ) {
while (start!=end && --limit>=0 && accumulatedBytes < limitBytes) {
output.push_back_deep( arena, KeyValueRef(start.key(), start->getValue()) );
while (/*start!=end*/vCount>0 && --limit>=0 && accumulatedBytes < limitBytes) {
//output.push_back_deep( arena, KeyValueRef(start.key(), start->getValue()) );
output.push_back_deep( arena, vm_output[pos]);
accumulatedBytes += sizeof(KeyValueRef) + output.end()[-1].expectedSize();
if (forward) ++start; else --start;
//if (forward) ++start; else --start;
++pos;
//if (forward) ++pos; else --pos;
vCount--;
}
}
ASSERT( output.size() <= originalLimit );
@ -1074,13 +1087,15 @@ void merge( Arena& arena, VectorRef<KeyValueRef, VecSerStrategy::String>& output
// readRange has O(|result|) + O(log |data|) cost
ACTOR Future<GetKeyValuesReply> readRange( StorageServer* data, Version version, KeyRange range, int limit, int* pLimitBytes ) {
state GetKeyValuesReply result;
state GetKeyValuesReply resultCache;
state StorageServer::VersionedData::ViewAtVersion view = data->data().at(version);
state StorageServer::VersionedData::iterator vStart = view.end();
state StorageServer::VersionedData::iterator vEnd = view.end();
state KeyRef readBegin;
state KeyRef readEnd;
state Key readBeginTemp;
state int vCount;
state int vCount = 0;
state int pos = 0;
//state UID rrid = deterministicRandom()->randomUniqueID();
//state int originalLimit = limit;
//state int originalLimitBytes = *pLimitBytes;
@ -1115,21 +1130,29 @@ ACTOR Future<GetKeyValuesReply> readRange( StorageServer* data, Version version,
while (limit>0 && *pLimitBytes>0 && readBegin < range.end) {
// ASSERT( vStart == view.lower_bound(readBegin) );
ASSERT( !vStart || vStart.key() >= readBegin );
if (vStart) { auto b = vStart; --b; ASSERT( !b || b.key() < readBegin ); }
ASSERT( data->storageVersion() <= version );
if (pos == resultCache.data.size()) {
if (vStart) { auto b = vStart; --b; ASSERT( !b || b.key() < readBegin ); }
// Read up to limit items from the view, stopping at the next clear (or the end of the range)
vEnd = vStart;
vCount = 0;
//vCount = 0;
int vSize = 0;
while (vEnd && vEnd.key() < range.end && !vEnd->isClearTo() && vCount < limit && vSize < *pLimitBytes){
vSize += sizeof(KeyValueRef) + vEnd->getValue().expectedSize() + vEnd.key().expectedSize();
//while (vEnd && vEnd.key() < range.end && !vEnd->isClearTo() && vCount < limit && vSize < *pLimitBytes){
while (vStart && vStart.key() < range.end && !vStart->isClearTo() && vCount < limit &&
vSize < *pLimitBytes) {
resultCache.data.push_back(resultCache.arena, KeyValueRef(vStart.key(), vStart->getValue()));
// vSize += sizeof(KeyValueRef) + vEnd->getValue().expectedSize() + vEnd.key().expectedSize();
vSize += sizeof(KeyValueRef) + resultCache.data.end()[-1].expectedSize();
++vCount;
++vEnd;
//++vEnd;
++vStart;
}
}
// Read the data on disk up to vEnd (or the end of the range)
readEnd = vEnd ? std::min( vEnd.key(), range.end ) : range.end;
//readEnd = vEnd ? std::min( vEnd.key(), range.end ) : range.end;
readEnd = vStart ? std::min( vStart.key(), range.end ) : range.end;
Standalone<VectorRef<KeyValueRef>> atStorageVersion = wait(
data->storage.readRange( KeyRangeRef(readBegin, readEnd), limit, *pLimitBytes ) );
@ -1146,7 +1169,9 @@ ACTOR Future<GetKeyValuesReply> readRange( StorageServer* data, Version version,
// merge the sets in [vStart,vEnd) with the sets on disk, stopping at the last key from disk if there is 'more'
int prevSize = result.data.size();
merge( result.arena, result.data, atStorageVersion, vStart, vEnd, vCount, limit, more, *pLimitBytes );
//merge( result.arena, result.data, atStorageVersion, vStart, vEnd, vCount, limit, more, *pLimitBytes );
merge( result.arena, result.data, resultCache.arena, resultCache.data,
atStorageVersion, vStart, vEnd, vCount, limit, more, pos, *pLimitBytes );
limit -= result.data.size() - prevSize;
for (auto i = &result.data[prevSize]; i != result.data.end(); i++)
@ -1220,27 +1245,34 @@ ACTOR Future<GetKeyValuesReply> readRange( StorageServer* data, Version version,
vEnd = vStart;
vCount = 0;
int vSize=0;
while (vEnd && vEnd.key() >= range.begin && !vEnd->isClearTo() && vCount < -limit && vSize < *pLimitBytes){
vSize += sizeof(KeyValueRef) + vEnd->getValue().expectedSize() + vEnd.key().expectedSize();
//while (vEnd && vEnd.key() >= range.begin && !vEnd->isClearTo() && vCount < -limit && vSize < *pLimitBytes){
while (vStart && vStart.key() >= range.begin && !vStart->isClearTo() && vCount < -limit && vSize < *pLimitBytes){
resultCache.data.push_back(resultCache.arena, KeyValueRef(vStart.key(), vStart->getValue()));
//vSize += sizeof(KeyValueRef) + vEnd->getValue().expectedSize() + vEnd.key().expectedSize();
vSize += sizeof(KeyValueRef) + resultCache.data.end()[-1].expectedSize();
++vCount;
--vEnd;
//--vEnd;
--vStart;
}
readBegin = range.begin;
if (vEnd)
readBegin = std::max( readBegin, vEnd->isClearTo() ? vEnd->getEndKey() : vEnd.key() );
//if (vEnd)
// readBegin = std::max( readBegin, vEnd->isClearTo() ? vEnd->getEndKey() : vEnd.key() );
if (vStart)
readBegin = std::max( readBegin, vStart->isClearTo() ? vStart->getEndKey() : vStart.key() );
Standalone<VectorRef<KeyValueRef>> atStorageVersion = wait( data->storage.readRange( KeyRangeRef(readBegin, readEnd), limit ) );
if (data->storageVersion() > version) throw transaction_too_old();
int prevSize = result.data.size();
merge( result.arena, result.data, atStorageVersion, vStart, vEnd, vCount, limit, false, *pLimitBytes );
merge( result.arena, result.data, resultCache.arena, resultCache.data,
atStorageVersion, vStart, vEnd, vCount, limit, false, pos, *pLimitBytes );
limit += result.data.size() - prevSize;
for (auto i = &result.data[prevSize]; i != result.data.end(); i++)
*pLimitBytes -= sizeof(KeyValueRef) + i->expectedSize();
vStart = vEnd;
//vStart = vEnd;
readEnd = readBegin;
if (vStart && vStart->isClearTo()) {