Step 5 of fixing storage server range reads: update the logic of reverse range reads to match forward range reads
This commit is contained in:
parent
16167b07d5
commit
fa920a6cef
|
@ -1139,31 +1139,36 @@ ACTOR Future<GetKeyValuesReply> readRange( StorageServer* data, Version version,
|
||||||
if (atStorageVersion.more && limit > 0 && *pLimitBytes > 0) {
|
if (atStorageVersion.more && limit > 0 && *pLimitBytes > 0) {
|
||||||
ASSERT(result.data.end()[-1].key == atStorageVersion.end()[-1].key);
|
ASSERT(result.data.end()[-1].key == atStorageVersion.end()[-1].key);
|
||||||
readBegin = readBeginTemp = keyAfter(result.data.end()[-1].key);
|
readBegin = readBeginTemp = keyAfter(result.data.end()[-1].key);
|
||||||
} else if (vEnd && vEnd->isClearTo()) {
|
} else if (vEnd && vEnd->isClearTo() && limit > 0 && *pLimitBytes > 0) {
|
||||||
|
ASSERT(vStart == vEnd); // vStart will have been advanced by merge()
|
||||||
ASSERT(vEnd->getEndKey() > readBegin);
|
ASSERT(vEnd->getEndKey() > readBegin);
|
||||||
readBegin = vEnd->getEndKey();
|
readBegin = vEnd->getEndKey();
|
||||||
vStart = vEnd;
|
|
||||||
++vStart;
|
++vStart;
|
||||||
} else {
|
} else {
|
||||||
ASSERT(limit<=0 || *pLimitBytes<=0 || readEnd == range.end);
|
ASSERT(limit<=0 || *pLimitBytes<=0 || readEnd == range.end);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// all but the last item are less than *pLimitBytes
|
|
||||||
ASSERT( result.data.size() == 0 || *pLimitBytes + result.data.end()[-1].expectedSize() + sizeof(KeyValueRef) > 0 );
|
|
||||||
} else {
|
} else {
|
||||||
// Reverse read - abandon hope alle ye who enter here
|
vStart = view.lastLess(range.end);
|
||||||
readEnd = range.end;
|
|
||||||
|
|
||||||
vStart = view.lastLess(readEnd);
|
|
||||||
|
|
||||||
// A clear might extend all the way to range.end
|
// A clear might extend all the way to range.end
|
||||||
if (vStart && vStart->isClearTo() && vStart->getEndKey() >= readEnd) {
|
if (vStart && vStart->isClearTo() && vStart->getEndKey() >= range.end) {
|
||||||
readEnd = vStart.key();
|
readEnd = vStart.key();
|
||||||
--vStart;
|
--vStart;
|
||||||
|
} else {
|
||||||
|
readEnd = range.end;
|
||||||
}
|
}
|
||||||
|
|
||||||
while (limit < 0 && *pLimitBytes > 0 && readEnd > range.begin) {
|
while (limit < 0 && *pLimitBytes > 0 && readEnd > range.begin) {
|
||||||
|
ASSERT(!vStart || vStart.key() < readEnd);
|
||||||
|
if (vStart) {
|
||||||
|
auto b = vStart;
|
||||||
|
++b;
|
||||||
|
ASSERT(!b || b.key() >= readEnd);
|
||||||
|
}
|
||||||
|
ASSERT(data->storageVersion() <= version);
|
||||||
|
|
||||||
vEnd = vStart;
|
vEnd = vStart;
|
||||||
vCount = 0;
|
vCount = 0;
|
||||||
int vSize=0;
|
int vSize=0;
|
||||||
|
@ -1173,30 +1178,38 @@ ACTOR Future<GetKeyValuesReply> readRange( StorageServer* data, Version version,
|
||||||
--vEnd;
|
--vEnd;
|
||||||
}
|
}
|
||||||
|
|
||||||
readBegin = range.begin;
|
readBegin = vEnd ? std::max(vEnd->isClearTo() ? vEnd->getEndKey() : vEnd.key(), range.begin) : range.begin;
|
||||||
if (vEnd)
|
Standalone<RangeResultRef> atStorageVersion =
|
||||||
readBegin = std::max( readBegin, vEnd->isClearTo() ? vEnd->getEndKey() : vEnd.key() );
|
wait(data->storage.readRange(KeyRangeRef(readBegin, readEnd), limit, *pLimitBytes));
|
||||||
|
|
||||||
Standalone<RangeResultRef> atStorageVersion = wait( data->storage.readRange( KeyRangeRef(readBegin, readEnd), limit, *pLimitBytes ) );
|
ASSERT(atStorageVersion.size() <= -limit);
|
||||||
if (data->storageVersion() > version) throw transaction_too_old();
|
if (data->storageVersion() > version) throw transaction_too_old();
|
||||||
|
|
||||||
int prevSize = result.data.size();
|
int prevSize = result.data.size();
|
||||||
merge( result.arena, result.data, atStorageVersion, vStart, vEnd, vCount, limit, false, *pLimitBytes );
|
merge(result.arena, result.data, atStorageVersion, vStart, vEnd, vCount, limit, atStorageVersion.more, *pLimitBytes);
|
||||||
limit += result.data.size() - prevSize;
|
limit += result.data.size() - prevSize;
|
||||||
|
|
||||||
for (auto i = result.data.begin() + prevSize; i != result.data.end(); i++)
|
for (auto i = result.data.begin() + prevSize; i != result.data.end(); i++)
|
||||||
*pLimitBytes -= sizeof(KeyValueRef) + i->expectedSize();
|
*pLimitBytes -= sizeof(KeyValueRef) + i->expectedSize();
|
||||||
|
|
||||||
vStart = vEnd;
|
if (atStorageVersion.more && limit < 0 && *pLimitBytes > 0) {
|
||||||
readEnd = readBegin;
|
ASSERT(result.data.end()[-1].key == atStorageVersion.end()[-1].key);
|
||||||
|
readEnd = result.data.end()[-1].key;
|
||||||
if (vStart && vStart->isClearTo()) {
|
} else if (vEnd && vEnd->isClearTo() && limit < 0 && *pLimitBytes > 0) {
|
||||||
ASSERT( vStart.key() < readEnd );
|
ASSERT(vStart == vEnd);
|
||||||
readEnd = vStart.key();
|
ASSERT(vEnd.key() < readEnd)
|
||||||
|
readEnd = vEnd.key();
|
||||||
--vStart;
|
--vStart;
|
||||||
|
} else {
|
||||||
|
ASSERT(limit>=0 || *pLimitBytes<=0 || readBegin == range.begin);
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// all but the last item are less than *pLimitBytes
|
||||||
|
ASSERT(result.data.size() == 0 || *pLimitBytes + result.data.end()[-1].expectedSize() + sizeof(KeyValueRef) > 0);
|
||||||
|
|
||||||
result.more = limit == 0 || *pLimitBytes<=0; // FIXME: Does this have to be exact?
|
result.more = limit == 0 || *pLimitBytes<=0; // FIXME: Does this have to be exact?
|
||||||
result.version = version;
|
result.version = version;
|
||||||
return result;
|
return result;
|
||||||
|
|
Loading…
Reference in New Issue