Merge pull request #2650 from ajbeamon/fix-reverse-range-read-byte-limit-bug

Fix reverse range read performance bug
A.J. Beamon 2020-02-20 12:47:17 -08:00 committed by GitHub
commit fcbdcda490
15 changed files with 174 additions and 173 deletions

View File

@ -5,6 +5,11 @@ Release Notes
6.2.16
======
Performance
-----------
* Reverse range reads could read too much data from disk, resulting in poor performance relative to forward range reads. `(PR #2650) <https://github.com/apple/foundationdb/pull/2650>`_.
Fixes
-----

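At a high level, the bug this note describes is that the reverse path did not forward the caller's byte budget to the on-disk read, so byteLimit stayed at its 1<<30 default (see the storage server diff further down, where the reverse readRange call gains the *pLimitBytes argument). A minimal, self-contained sketch of that failure mode, using plain std containers rather than FoundationDB types; the readRange helper, key sizes, and budgets here are illustrative assumptions:

#include <cstdio>
#include <map>
#include <string>
#include <vector>

struct KV { std::string key, value; };

// Reads up to |rowLimit| rows (reverse if rowLimit < 0), stopping once the byte
// budget is spent; this mirrors the readRange contract quoted in the interface
// diffs below.
std::vector<KV> readRange(const std::map<std::string, std::string>& data,
                          int rowLimit, int byteLimit = 1 << 30) {
    std::vector<KV> out;
    if (rowLimit >= 0) {
        for (auto it = data.begin(); it != data.end() && rowLimit-- && byteLimit > 0; ++it) {
            byteLimit -= int(it->first.size() + it->second.size());
            out.push_back({it->first, it->second});
        }
    } else {
        for (auto it = data.rbegin(); it != data.rend() && rowLimit++ && byteLimit > 0; ++it) {
            byteLimit -= int(it->first.size() + it->second.size());
            out.push_back({it->first, it->second});
        }
    }
    return out;
}

int main() {
    std::map<std::string, std::string> data;
    for (int i = 0; i < 10000; ++i)
        data["key" + std::to_string(i)] = std::string(1000, 'x');

    // Forward read: the caller's ~10KB byte budget is passed through and honored.
    auto fwd = readRange(data, 1 << 30, 10 * 1000);
    // Reverse read as on the old code path: byteLimit is left at its 1<<30
    // default, so the whole range is pulled even though the caller wanted ~10KB.
    auto rev = readRange(data, -(1 << 30));
    std::printf("forward rows: %zu, reverse rows: %zu\n", fwd.size(), rev.size());
    return 0;
}

With the byte budget forwarded, the reverse read stops after roughly the same handful of rows as the forward read.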
View File

@ -401,7 +401,7 @@ struct LeaderRegisterCollection {
if( !self->pStore->exists() )
return Void();
OnDemandStore &store = *self->pStore;
Standalone<VectorRef<KeyValueRef>> forwardingInfo = wait( store->readRange( fwdKeys ) );
Standalone<RangeResultRef> forwardingInfo = wait( store->readRange( fwdKeys ) );
for( int i = 0; i < forwardingInfo.size(); i++ ) {
LeaderInfo forwardInfo;
forwardInfo.forward = true;

View File

@ -49,7 +49,7 @@ public:
// If rowLimit>=0, reads first rows sorted ascending, otherwise reads last rows sorted descending
// The total size of the returned value (less the last entry) will be less than byteLimit
virtual Future<Standalone<VectorRef<KeyValueRef>>> readRange( KeyRangeRef keys, int rowLimit = 1<<30, int byteLimit = 1<<30 ) = 0;
virtual Future<Standalone<RangeResultRef>> readRange( KeyRangeRef keys, int rowLimit = 1<<30, int byteLimit = 1<<30 ) = 0;
//Returns the amount of free and total space for this store, in bytes
virtual StorageBytes getStorageBytes() = 0;

View File

@ -77,12 +77,12 @@ struct KeyValueStoreCompressTestData : IKeyValueStore {
// If rowLimit>=0, reads first rows sorted ascending, otherwise reads last rows sorted descending
// The total size of the returned value (less the last entry) will be less than byteLimit
virtual Future<Standalone<VectorRef<KeyValueRef>>> readRange( KeyRangeRef keys, int rowLimit = 1<<30, int byteLimit = 1<<30 ) {
virtual Future<Standalone<RangeResultRef>> readRange( KeyRangeRef keys, int rowLimit = 1<<30, int byteLimit = 1<<30 ) {
return doReadRange(store, keys, rowLimit, byteLimit);
}
ACTOR Future<Standalone<VectorRef<KeyValueRef>>> doReadRange( IKeyValueStore* store, KeyRangeRef keys, int rowLimit, int byteLimit ) {
Standalone<VectorRef<KeyValueRef>> _vs = wait( store->readRange(keys, rowLimit, byteLimit) );
Standalone<VectorRef<KeyValueRef>> vs = _vs; // Get rid of implicit const& from wait statement
ACTOR Future<Standalone<RangeResultRef>> doReadRange( IKeyValueStore* store, KeyRangeRef keys, int rowLimit, int byteLimit ) {
Standalone<RangeResultRef> _vs = wait( store->readRange(keys, rowLimit, byteLimit) );
Standalone<RangeResultRef> vs = _vs; // Get rid of implicit const& from wait statement
Arena& a = vs.arena();
for(int i=0; i<vs.size(); i++)
vs[i].value = ValueRef( a, (ValueRef const&)unpack(vs[i].value) );

View File

@ -216,14 +216,18 @@ public:
// If rowLimit>=0, reads first rows sorted ascending, otherwise reads last rows sorted descending
// The total size of the returned value (less the last entry) will be less than byteLimit
virtual Future<Standalone<VectorRef<KeyValueRef>>> readRange( KeyRangeRef keys, int rowLimit = 1<<30, int byteLimit = 1<<30 ) {
virtual Future<Standalone<RangeResultRef>> readRange( KeyRangeRef keys, int rowLimit = 1<<30, int byteLimit = 1<<30 ) {
if(recovering.isError()) throw recovering.getError();
if (!recovering.isReady()) return waitAndReadRange(this, keys, rowLimit, byteLimit);
Standalone<VectorRef<KeyValueRef>> result;
if (rowLimit >= 0) {
Standalone<RangeResultRef> result;
if (rowLimit == 0) {
return result;
}
if (rowLimit > 0) {
auto it = data.lower_bound(keys.begin);
while (it!=data.end() && it->key < keys.end && rowLimit && byteLimit>=0) {
while (it!=data.end() && it->key < keys.end && rowLimit && byteLimit>0) {
byteLimit -= sizeof(KeyValueRef) + it->key.size() + it->value.size();
result.push_back_deep( result.arena(), KeyValueRef(it->key, it->value) );
++it;
@ -232,13 +236,19 @@ public:
} else {
rowLimit = -rowLimit;
auto it = data.previous( data.lower_bound(keys.end) );
while (it!=data.end() && it->key >= keys.begin && rowLimit && byteLimit>=0) {
while (it!=data.end() && it->key >= keys.begin && rowLimit && byteLimit>0) {
byteLimit -= sizeof(KeyValueRef) + it->key.size() + it->value.size();
result.push_back_deep( result.arena(), KeyValueRef(it->key, it->value) );
it = data.previous(it);
--rowLimit;
}
}
result.more = rowLimit == 0 || byteLimit <= 0;
if(result.more) {
ASSERT(result.size() > 0);
result.readThrough = result[result.size()-1].key;
}
return result;
}
@ -694,7 +704,7 @@ private:
wait( self->recovering );
return self->readValuePrefix(key, maxLength).get();
}
ACTOR static Future<Standalone<VectorRef<KeyValueRef>>> waitAndReadRange( KeyValueStoreMemory* self, KeyRange keys, int rowLimit, int byteLimit ) {
ACTOR static Future<Standalone<RangeResultRef>> waitAndReadRange( KeyValueStoreMemory* self, KeyRange keys, int rowLimit, int byteLimit ) {
wait( self->recovering );
return self->readRange(keys, rowLimit, byteLimit).get();
}
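A pattern worth noting in the readRange change above, and repeated in the SQLite and Redwood stores below, is that a store-level range read now reports how it ended: more is set exactly when a row or byte limit was hit, and readThrough records the last key returned so the caller can resume without gaps. This is also why the new early return for rowLimit == 0 matters; without it an empty result would be flagged as more and trip the assertion. A hedged sketch of that contract with simplified stand-in types (KV, RangeResult, and finishRead are assumptions, not the real arena-backed RangeResultRef):

#include <cassert>
#include <string>
#include <utility>
#include <vector>

struct KV { std::string key, value; };

struct RangeResult {
    std::vector<KV> rows;
    bool more = false;          // true iff the read stopped at a row or byte limit
    std::string readThrough;    // last key returned, set only when more is true
};

RangeResult finishRead(std::vector<KV> rows, int rowLimitLeft, int byteLimitLeft) {
    RangeResult r;
    r.rows = std::move(rows);
    // Mirrors the logic added to KeyValueStoreMemory::readRange: 'more' means a
    // limit was exhausted, and readThrough tells the caller where to resume.
    r.more = rowLimitLeft == 0 || byteLimitLeft <= 0;
    if (r.more) {
        assert(!r.rows.empty()); // a limit can only be hit after returning at least one row
        r.readThrough = r.rows.back().key;
    }
    return r;
}

int main() {
    RangeResult r = finishRead({ {"a", "1"}, {"b", "2"} }, /*rowLimitLeft=*/0, /*byteLimitLeft=*/100);
    assert(r.more && r.readThrough == "b");
    return 0;
}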

View File

@ -1076,21 +1076,26 @@ struct RawCursor {
}
return Optional<Value>();
}
Standalone<VectorRef<KeyValueRef>> getRange( KeyRangeRef keys, int rowLimit, int byteLimit ) {
Standalone<VectorRef<KeyValueRef>> result;
Standalone<RangeResultRef> getRange( KeyRangeRef keys, int rowLimit, int byteLimit ) {
Standalone<RangeResultRef> result;
int accumulatedBytes = 0;
ASSERT( byteLimit > 0 );
if(rowLimit == 0) {
return result;
}
if(db.fragment_values) {
if(rowLimit >= 0) {
if(rowLimit > 0) {
int r = moveTo(keys.begin);
if (r < 0)
moveNext();
DefragmentingReader i(*this, result.arena(), true);
Optional<KeyRef> nextKey = i.peek();
while(nextKey.present() && nextKey.get() < keys.end && rowLimit-- && accumulatedBytes < byteLimit) {
while(nextKey.present() && nextKey.get() < keys.end && rowLimit != 0 && accumulatedBytes < byteLimit) {
Optional<KeyValueRef> kv = i.getNext();
result.push_back(result.arena(), kv.get());
--rowLimit;
accumulatedBytes += sizeof(KeyValueRef) + kv.get().expectedSize();
nextKey = i.peek();
}
@ -1101,37 +1106,45 @@ struct RawCursor {
movePrevious();
DefragmentingReader i(*this, result.arena(), false);
Optional<KeyRef> nextKey = i.peek();
while(nextKey.present() && nextKey.get() >= keys.begin && rowLimit++ && accumulatedBytes < byteLimit) {
while(nextKey.present() && nextKey.get() >= keys.begin && rowLimit != 0 && accumulatedBytes < byteLimit) {
Optional<KeyValueRef> kv = i.getNext();
result.push_back(result.arena(), kv.get());
++rowLimit;
accumulatedBytes += sizeof(KeyValueRef) + kv.get().expectedSize();
nextKey = i.peek();
}
}
}
else {
if (rowLimit >= 0) {
if (rowLimit > 0) {
int r = moveTo( keys.begin );
if (r < 0) moveNext();
while (this->valid && rowLimit-- && accumulatedBytes < byteLimit) {
while (this->valid && rowLimit != 0 && accumulatedBytes < byteLimit) {
KeyValueRef kv = decodeKV( getEncodedRow( result.arena() ) );
accumulatedBytes += sizeof(KeyValueRef) + kv.expectedSize();
if (kv.key >= keys.end) break;
--rowLimit;
accumulatedBytes += sizeof(KeyValueRef) + kv.expectedSize();
result.push_back( result.arena(), kv );
moveNext();
}
} else {
int r = moveTo( keys.end );
if (r >= 0) movePrevious();
while (this->valid && rowLimit++ && accumulatedBytes < byteLimit) {
while (this->valid && rowLimit != 0 && accumulatedBytes < byteLimit) {
KeyValueRef kv = decodeKV( getEncodedRow( result.arena() ) );
accumulatedBytes += sizeof(KeyValueRef) + kv.expectedSize();
if (kv.key < keys.begin) break;
++rowLimit;
accumulatedBytes += sizeof(KeyValueRef) + kv.expectedSize();
result.push_back( result.arena(), kv );
movePrevious();
}
}
}
result.more = rowLimit == 0 || accumulatedBytes >= byteLimit;
if(result.more) {
ASSERT(result.size() > 0);
result.readThrough = result[result.size()-1].key;
}
return result;
}
@ -1451,7 +1464,7 @@ public:
virtual Future<Optional<Value>> readValue( KeyRef key, Optional<UID> debugID );
virtual Future<Optional<Value>> readValuePrefix( KeyRef key, int maxLength, Optional<UID> debugID );
virtual Future<Standalone<VectorRef<KeyValueRef>>> readRange( KeyRangeRef keys, int rowLimit = 1<<30, int byteLimit = 1<<30 );
virtual Future<Standalone<RangeResultRef>> readRange( KeyRangeRef keys, int rowLimit = 1<<30, int byteLimit = 1<<30 );
KeyValueStoreSQLite(std::string const& filename, UID logID, KeyValueStoreType type, bool checkChecksums, bool checkIntegrity);
~KeyValueStoreSQLite();
@ -1550,7 +1563,7 @@ private:
struct ReadRangeAction : TypedAction<Reader, ReadRangeAction>, FastAllocated<ReadRangeAction> {
KeyRange keys;
int rowLimit, byteLimit;
ThreadReturnPromise<Standalone<VectorRef<KeyValueRef>>> result;
ThreadReturnPromise<Standalone<RangeResultRef>> result;
ReadRangeAction(KeyRange keys, int rowLimit, int byteLimit) : keys(keys), rowLimit(rowLimit), byteLimit(byteLimit) {}
virtual double getTimeEstimate() { return SERVER_KNOBS->READ_RANGE_TIME_ESTIMATE; }
};
@ -2000,7 +2013,7 @@ Future<Optional<Value>> KeyValueStoreSQLite::readValuePrefix( KeyRef key, int ma
readThreads->post(p);
return f;
}
Future<Standalone<VectorRef<KeyValueRef>>> KeyValueStoreSQLite::readRange( KeyRangeRef keys, int rowLimit, int byteLimit ) {
Future<Standalone<RangeResultRef>> KeyValueStoreSQLite::readRange( KeyRangeRef keys, int rowLimit, int byteLimit ) {
++readsRequested;
auto p = new Reader::ReadRangeAction(keys, rowLimit, byteLimit);
auto f = p->result.getFuture();

View File

@ -1345,7 +1345,7 @@ ACTOR static Future<Void> rejoinServer( MasterProxyInterface proxy, ProxyCommitD
GetStorageServerRejoinInfoReply rep;
rep.version = commitData->version;
rep.tag = decodeServerTagValue( commitData->txnStateStore->readValue(serverTagKeyFor(req.id)).get().get() );
Standalone<VectorRef<KeyValueRef>> history = commitData->txnStateStore->readRange(serverTagHistoryRangeFor(req.id)).get();
Standalone<RangeResultRef> history = commitData->txnStateStore->readRange(serverTagHistoryRangeFor(req.id)).get();
for(int i = history.size()-1; i >= 0; i-- ) {
rep.history.push_back(std::make_pair(decodeServerTagHistoryKey(history[i].key), decodeServerTagValue(history[i].value)));
}
@ -1696,7 +1696,7 @@ ACTOR Future<Void> masterProxyServerCore(
state KeyRange txnKeys = allKeys;
loop {
wait(yield());
Standalone<VectorRef<KeyValueRef>> data = commitData.txnStateStore->readRange(txnKeys, SERVER_KNOBS->BUGGIFIED_ROW_LIMIT, SERVER_KNOBS->APPLY_MUTATION_BYTES).get();
Standalone<RangeResultRef> data = commitData.txnStateStore->readRange(txnKeys, SERVER_KNOBS->BUGGIFIED_ROW_LIMIT, SERVER_KNOBS->APPLY_MUTATION_BYTES).get();
if(!data.size()) break;
((KeyRangeRef&)txnKeys) = KeyRangeRef( keyAfter(data.back().key, txnKeys.arena()), txnKeys.end );

View File

@ -955,7 +955,7 @@ namespace oldTLog_4_6 {
peekMessagesFromMemory( logData, req, messages2, endVersion );
Standalone<VectorRef<KeyValueRef>> kvs = wait(
Standalone<RangeResultRef> kvs = wait(
self->persistentData->readRange(KeyRangeRef(
persistTagMessagesKey(logData->logId, oldTag, req.begin),
persistTagMessagesKey(logData->logId, oldTag, logData->persistentDataDurableVersion + 1)), SERVER_KNOBS->DESIRED_TOTAL_BYTES, SERVER_KNOBS->DESIRED_TOTAL_BYTES));
@ -1269,8 +1269,8 @@ namespace oldTLog_4_6 {
IKeyValueStore *storage = self->persistentData;
state Future<Optional<Value>> fFormat = storage->readValue(persistFormat.key);
state Future<Standalone<VectorRef<KeyValueRef>>> fVers = storage->readRange(persistCurrentVersionKeys);
state Future<Standalone<VectorRef<KeyValueRef>>> fRecoverCounts = storage->readRange(persistRecoveryCountKeys);
state Future<Standalone<RangeResultRef>> fVers = storage->readRange(persistCurrentVersionKeys);
state Future<Standalone<RangeResultRef>> fRecoverCounts = storage->readRange(persistRecoveryCountKeys);
// FIXME: metadata in queue?
@ -1283,7 +1283,7 @@ namespace oldTLog_4_6 {
}
if (!fFormat.get().present()) {
Standalone<VectorRef<KeyValueRef>> v = wait( self->persistentData->readRange( KeyRangeRef(StringRef(), LiteralStringRef("\xff")), 1 ) );
Standalone<RangeResultRef> v = wait( self->persistentData->readRange( KeyRangeRef(StringRef(), LiteralStringRef("\xff")), 1 ) );
if (!v.size()) {
TEST(true); // The DB is completely empty, so it was never initialized. Delete it.
throw worker_removed();
@ -1336,7 +1336,7 @@ namespace oldTLog_4_6 {
tagKeys = prefixRange( rawId.withPrefix(persistTagPoppedKeys.begin) );
loop {
if(logData->removed.isReady()) break;
Standalone<VectorRef<KeyValueRef>> data = wait( self->persistentData->readRange( tagKeys, BUGGIFY ? 3 : 1<<30, 1<<20 ) );
Standalone<RangeResultRef> data = wait( self->persistentData->readRange( tagKeys, BUGGIFY ? 3 : 1<<30, 1<<20 ) );
if (!data.size()) break;
((KeyRangeRef&)tagKeys) = KeyRangeRef( keyAfter(data.back().key, tagKeys.arena()), tagKeys.end );

View File

@ -1158,7 +1158,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
peekMessagesFromMemory( logData, req, messages2, endVersion );
}
Standalone<VectorRef<KeyValueRef>> kvs = wait(
Standalone<RangeResultRef> kvs = wait(
self->persistentData->readRange(KeyRangeRef(
persistTagMessagesKey(logData->logId, req.tag, req.begin),
persistTagMessagesKey(logData->logId, req.tag, logData->persistentDataDurableVersion + 1)), SERVER_KNOBS->DESIRED_TOTAL_BYTES, SERVER_KNOBS->DESIRED_TOTAL_BYTES));
@ -1929,12 +1929,12 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
state IKeyValueStore *storage = self->persistentData;
wait(storage->init());
state Future<Optional<Value>> fFormat = storage->readValue(persistFormat.key);
state Future<Standalone<VectorRef<KeyValueRef>>> fVers = storage->readRange(persistCurrentVersionKeys);
state Future<Standalone<VectorRef<KeyValueRef>>> fKnownCommitted = storage->readRange(persistKnownCommittedVersionKeys);
state Future<Standalone<VectorRef<KeyValueRef>>> fLocality = storage->readRange(persistLocalityKeys);
state Future<Standalone<VectorRef<KeyValueRef>>> fLogRouterTags = storage->readRange(persistLogRouterTagsKeys);
state Future<Standalone<VectorRef<KeyValueRef>>> fTxsTags = storage->readRange(persistTxsTagsKeys);
state Future<Standalone<VectorRef<KeyValueRef>>> fRecoverCounts = storage->readRange(persistRecoveryCountKeys);
state Future<Standalone<RangeResultRef>> fVers = storage->readRange(persistCurrentVersionKeys);
state Future<Standalone<RangeResultRef>> fKnownCommitted = storage->readRange(persistKnownCommittedVersionKeys);
state Future<Standalone<RangeResultRef>> fLocality = storage->readRange(persistLocalityKeys);
state Future<Standalone<RangeResultRef>> fLogRouterTags = storage->readRange(persistLogRouterTagsKeys);
state Future<Standalone<RangeResultRef>> fTxsTags = storage->readRange(persistTxsTagsKeys);
state Future<Standalone<RangeResultRef>> fRecoverCounts = storage->readRange(persistRecoveryCountKeys);
// FIXME: metadata in queue?
@ -1953,7 +1953,7 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
}
if (!fFormat.get().present()) {
Standalone<VectorRef<KeyValueRef>> v = wait( self->persistentData->readRange( KeyRangeRef(StringRef(), LiteralStringRef("\xff")), 1 ) );
Standalone<RangeResultRef> v = wait( self->persistentData->readRange( KeyRangeRef(StringRef(), LiteralStringRef("\xff")), 1 ) );
if (!v.size()) {
TEST(true); // The DB is completely empty, so it was never initialized. Delete it.
throw worker_removed();
@ -2043,7 +2043,7 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
tagKeys = prefixRange( rawId.withPrefix(persistTagPoppedKeys.begin) );
loop {
if(logData->removed.isReady()) break;
Standalone<VectorRef<KeyValueRef>> data = wait( self->persistentData->readRange( tagKeys, BUGGIFY ? 3 : 1<<30, 1<<20 ) );
Standalone<RangeResultRef> data = wait( self->persistentData->readRange( tagKeys, BUGGIFY ? 3 : 1<<30, 1<<20 ) );
if (!data.size()) break;
((KeyRangeRef&)tagKeys) = KeyRangeRef( keyAfter(data.back().key, tagKeys.arena()), tagKeys.end );

View File

@ -685,7 +685,7 @@ ACTOR Future<Void> updatePoppedLocation( TLogData* self, Reference<LogData> logD
if (data->popped <= logData->persistentDataVersion) {
// Recover the next needed location in the Disk Queue from the index.
Standalone<VectorRef<KeyValueRef>> kvrefs = wait(
Standalone<RangeResultRef> kvrefs = wait(
self->persistentData->readRange(KeyRangeRef(
persistTagMessageRefsKey(logData->logId, data->tag, data->popped),
persistTagMessageRefsKey(logData->logId, data->tag, logData->persistentDataVersion + 1)), 1));
@ -1463,7 +1463,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
}
if (req.tag.locality == tagLocalityTxs || req.tag == txsTag) {
Standalone<VectorRef<KeyValueRef>> kvs = wait(
Standalone<RangeResultRef> kvs = wait(
self->persistentData->readRange(KeyRangeRef(
persistTagMessagesKey(logData->logId, req.tag, req.begin),
persistTagMessagesKey(logData->logId, req.tag, logData->persistentDataDurableVersion + 1)), SERVER_KNOBS->DESIRED_TOTAL_BYTES, SERVER_KNOBS->DESIRED_TOTAL_BYTES));
@ -1482,7 +1482,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
}
} else {
// FIXME: Limit to approximately DESIRED_TOTATL_BYTES somehow.
Standalone<VectorRef<KeyValueRef>> kvrefs = wait(
Standalone<RangeResultRef> kvrefs = wait(
self->persistentData->readRange(KeyRangeRef(
persistTagMessageRefsKey(logData->logId, req.tag, req.begin),
persistTagMessageRefsKey(logData->logId, req.tag, logData->persistentDataDurableVersion + 1)),
@ -2335,13 +2335,13 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
wait(storage->init());
state Future<Optional<Value>> fFormat = storage->readValue(persistFormat.key);
state Future<Optional<Value>> fRecoveryLocation = storage->readValue(persistRecoveryLocationKey);
state Future<Standalone<VectorRef<KeyValueRef>>> fVers = storage->readRange(persistCurrentVersionKeys);
state Future<Standalone<VectorRef<KeyValueRef>>> fKnownCommitted = storage->readRange(persistKnownCommittedVersionKeys);
state Future<Standalone<VectorRef<KeyValueRef>>> fLocality = storage->readRange(persistLocalityKeys);
state Future<Standalone<VectorRef<KeyValueRef>>> fLogRouterTags = storage->readRange(persistLogRouterTagsKeys);
state Future<Standalone<VectorRef<KeyValueRef>>> fTxsTags = storage->readRange(persistTxsTagsKeys);
state Future<Standalone<VectorRef<KeyValueRef>>> fRecoverCounts = storage->readRange(persistRecoveryCountKeys);
state Future<Standalone<VectorRef<KeyValueRef>>> fProtocolVersions = storage->readRange(persistProtocolVersionKeys);
state Future<Standalone<RangeResultRef>> fVers = storage->readRange(persistCurrentVersionKeys);
state Future<Standalone<RangeResultRef>> fKnownCommitted = storage->readRange(persistKnownCommittedVersionKeys);
state Future<Standalone<RangeResultRef>> fLocality = storage->readRange(persistLocalityKeys);
state Future<Standalone<RangeResultRef>> fLogRouterTags = storage->readRange(persistLogRouterTagsKeys);
state Future<Standalone<RangeResultRef>> fTxsTags = storage->readRange(persistTxsTagsKeys);
state Future<Standalone<RangeResultRef>> fRecoverCounts = storage->readRange(persistRecoveryCountKeys);
state Future<Standalone<RangeResultRef>> fProtocolVersions = storage->readRange(persistProtocolVersionKeys);
// FIXME: metadata in queue?
@ -2360,7 +2360,7 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
}
if (!fFormat.get().present()) {
Standalone<VectorRef<KeyValueRef>> v = wait( self->persistentData->readRange( KeyRangeRef(StringRef(), LiteralStringRef("\xff")), 1 ) );
Standalone<RangeResultRef> v = wait( self->persistentData->readRange( KeyRangeRef(StringRef(), LiteralStringRef("\xff")), 1 ) );
if (!v.size()) {
TEST(true); // The DB is completely empty, so it was never initialized. Delete it.
throw worker_removed();
@ -2448,7 +2448,7 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
tagKeys = prefixRange( rawId.withPrefix(persistTagPoppedKeys.begin) );
loop {
if(logData->removed.isReady()) break;
Standalone<VectorRef<KeyValueRef>> data = wait( self->persistentData->readRange( tagKeys, BUGGIFY ? 3 : 1<<30, 1<<20 ) );
Standalone<RangeResultRef> data = wait( self->persistentData->readRange( tagKeys, BUGGIFY ? 3 : 1<<30, 1<<20 ) );
if (!data.size()) break;
((KeyRangeRef&)tagKeys) = KeyRangeRef( keyAfter(data.back().key, tagKeys.arena()), tagKeys.end );

View File

@ -4859,22 +4859,26 @@ public:
m_tree->set(keyValue);
}
Future< Standalone< VectorRef< KeyValueRef > > > readRange(KeyRangeRef keys, int rowLimit = 1<<30, int byteLimit = 1<<30) {
Future< Standalone< RangeResultRef > > readRange(KeyRangeRef keys, int rowLimit = 1<<30, int byteLimit = 1<<30) {
debug_printf("READRANGE %s\n", printable(keys).c_str());
return catchError(readRange_impl(this, keys, rowLimit, byteLimit));
}
ACTOR static Future< Standalone< VectorRef< KeyValueRef > > > readRange_impl(KeyValueStoreRedwoodUnversioned *self, KeyRange keys, int rowLimit, int byteLimit) {
ACTOR static Future< Standalone< RangeResultRef > > readRange_impl(KeyValueStoreRedwoodUnversioned *self, KeyRange keys, int rowLimit, int byteLimit) {
self->m_tree->counts.getRanges++;
state Standalone<VectorRef<KeyValueRef>> result;
state Standalone<RangeResultRef> result;
state int accumulatedBytes = 0;
ASSERT( byteLimit > 0 );
if(rowLimit == 0) {
return result;
}
state Reference<IStoreCursor> cur = self->m_tree->readAtVersion(self->m_tree->getLastCommittedVersion());
// Prefetch is currently only done in the forward direction
state int prefetchBytes = rowLimit > 1 ? byteLimit : 0;
if(rowLimit >= 0) {
if(rowLimit > 0) {
wait(cur->findFirstEqualOrGreater(keys.begin, prefetchBytes));
while(cur->isValid() && cur->getKey() < keys.end) {
KeyValueRef kv(KeyRef(result.arena(), cur->getKey()), ValueRef(result.arena(), cur->getValue()));
@ -4900,6 +4904,12 @@ public:
wait(cur->prev());
}
}
result.more = rowLimit == 0 || accumulatedBytes >= byteLimit;
if(result.more) {
ASSERT(result.size() > 0);
result.readThrough = result[result.size()-1].key;
}
return result;
}

View File

@ -665,8 +665,8 @@ ACTOR Future<Void> readTransactionSystemState( Reference<MasterData> self, Refer
TraceEvent("MasterRecovering", self->dbgid).detail("LastEpochEnd", self->lastEpochEnd).detail("RecoveryTransactionVersion", self->recoveryTransactionVersion);
Standalone<VectorRef<KeyValueRef>> rawConf = wait( self->txnStateStore->readRange( configKeys ) );
self->configuration.fromKeyValues( rawConf );
Standalone<RangeResultRef> rawConf = wait( self->txnStateStore->readRange( configKeys ) );
self->configuration.fromKeyValues( rawConf.castTo<VectorRef<KeyValueRef>>() );
self->originalConfiguration = self->configuration;
self->hasConfiguration = true;
@ -676,13 +676,13 @@ ACTOR Future<Void> readTransactionSystemState( Reference<MasterData> self, Refer
.detail("Conf", self->configuration.toString())
.trackLatest("RecoveredConfig");
Standalone<VectorRef<KeyValueRef>> rawLocalities = wait( self->txnStateStore->readRange( tagLocalityListKeys ) );
Standalone<RangeResultRef> rawLocalities = wait( self->txnStateStore->readRange( tagLocalityListKeys ) );
self->dcId_locality.clear();
for(auto& kv : rawLocalities) {
self->dcId_locality[decodeTagLocalityListKey(kv.key)] = decodeTagLocalityListValue(kv.value);
}
Standalone<VectorRef<KeyValueRef>> rawTags = wait( self->txnStateStore->readRange( serverTagKeys ) );
Standalone<RangeResultRef> rawTags = wait( self->txnStateStore->readRange( serverTagKeys ) );
self->allTags.clear();
if(self->forceRecovery) {
@ -699,7 +699,7 @@ ACTOR Future<Void> readTransactionSystemState( Reference<MasterData> self, Refer
}
}
Standalone<VectorRef<KeyValueRef>> rawHistoryTags = wait( self->txnStateStore->readRange( serverTagHistoryKeys ) );
Standalone<RangeResultRef> rawHistoryTags = wait( self->txnStateStore->readRange( serverTagHistoryKeys ) );
for(auto& kv : rawHistoryTags) {
self->allTags.push_back(decodeServerTagValue( kv.value ));
}
@ -722,13 +722,13 @@ ACTOR Future<Void> sendInitialCommitToResolvers( Reference<MasterData> self ) {
state Sequence txnSequence = 0;
ASSERT(self->recoveryTransactionVersion);
state Standalone<VectorRef<KeyValueRef>> data = self->txnStateStore->readRange(txnKeys, BUGGIFY ? 3 : SERVER_KNOBS->DESIRED_TOTAL_BYTES, SERVER_KNOBS->DESIRED_TOTAL_BYTES).get();
state Standalone<RangeResultRef> data = self->txnStateStore->readRange(txnKeys, BUGGIFY ? 3 : SERVER_KNOBS->DESIRED_TOTAL_BYTES, SERVER_KNOBS->DESIRED_TOTAL_BYTES).get();
state vector<Future<Void>> txnReplies;
state int64_t dataOutstanding = 0;
loop {
if(!data.size()) break;
((KeyRangeRef&)txnKeys) = KeyRangeRef( keyAfter(data.back().key, txnKeys.arena()), txnKeys.end );
Standalone<VectorRef<KeyValueRef>> nextData = self->txnStateStore->readRange(txnKeys, BUGGIFY ? 3 : SERVER_KNOBS->DESIRED_TOTAL_BYTES, SERVER_KNOBS->DESIRED_TOTAL_BYTES).get();
Standalone<RangeResultRef> nextData = self->txnStateStore->readRange(txnKeys, BUGGIFY ? 3 : SERVER_KNOBS->DESIRED_TOTAL_BYTES, SERVER_KNOBS->DESIRED_TOTAL_BYTES).get();
for(auto& r : self->proxies) {
TxnStateRequest req;

View File

@ -185,7 +185,7 @@ struct StorageServerDisk {
Future<Key> readNextKeyInclusive( KeyRef key ) { return readFirstKey(storage, KeyRangeRef(key, allKeys.end)); }
Future<Optional<Value>> readValue( KeyRef key, Optional<UID> debugID = Optional<UID>() ) { return storage->readValue(key, debugID); }
Future<Optional<Value>> readValuePrefix( KeyRef key, int maxLength, Optional<UID> debugID = Optional<UID>() ) { return storage->readValuePrefix(key, maxLength, debugID); }
Future<Standalone<VectorRef<KeyValueRef>>> readRange( KeyRangeRef keys, int rowLimit = 1<<30, int byteLimit = 1<<30 ) { return storage->readRange(keys, rowLimit, byteLimit); }
Future<Standalone<RangeResultRef>> readRange( KeyRangeRef keys, int rowLimit = 1<<30, int byteLimit = 1<<30 ) { return storage->readRange(keys, rowLimit, byteLimit); }
KeyValueStoreType getKeyValueStoreType() { return storage->getType(); }
StorageBytes getStorageBytes() { return storage->getStorageBytes(); }
@ -197,7 +197,7 @@ private:
void writeMutations( MutationListRef mutations, Version debugVersion, const char* debugContext );
ACTOR static Future<Key> readFirstKey( IKeyValueStore* storage, KeyRangeRef range ) {
Standalone<VectorRef<KeyValueRef>> r = wait( storage->readRange( range, 1 ) );
Standalone<RangeResultRef> r = wait( storage->readRange( range, 1 ) );
if (r.size()) return r[0].key;
else return range.end;
}
@ -1045,17 +1045,19 @@ void merge( Arena& arena, VectorRef<KeyValueRef, VecSerStrategy::String>& output
// Combines data from base (at an older version) with sets from newer versions in [start, end) and appends the first (up to) |limit| rows to output
// If limit<0, base and output are in descending order, and start->key()>end->key(), but start is still inclusive and end is exclusive
{
if (limit==0) return;
int originalLimit = abs(limit) + output.size();
ASSERT(limit != 0);
bool forward = limit>0;
if (!forward) limit = -limit;
int adjustedLimit = limit + output.size();
int accumulatedBytes = 0;
KeyValueRef const* baseStart = base.begin();
KeyValueRef const* baseEnd = base.end();
while (baseStart!=baseEnd && start!=end && --limit>=0 && accumulatedBytes < limitBytes) {
if (forward ? baseStart->key < start.key() : baseStart->key > start.key())
while (baseStart!=baseEnd && start!=end && output.size() < adjustedLimit && accumulatedBytes < limitBytes) {
if (forward ? baseStart->key < start.key() : baseStart->key > start.key()) {
output.push_back_deep( arena, *baseStart++ );
}
else {
output.push_back_deep( arena, KeyValueRef(start.key(), start->getValue()) );
if (baseStart->key == start.key()) ++baseStart;
@ -1063,18 +1065,17 @@ void merge( Arena& arena, VectorRef<KeyValueRef, VecSerStrategy::String>& output
}
accumulatedBytes += sizeof(KeyValueRef) + output.end()[-1].expectedSize();
}
while (baseStart!=baseEnd && --limit>=0 && accumulatedBytes < limitBytes) {
while (baseStart!=baseEnd && output.size() < adjustedLimit && accumulatedBytes < limitBytes) {
output.push_back_deep( arena, *baseStart++ );
accumulatedBytes += sizeof(KeyValueRef) + output.end()[-1].expectedSize();
}
if( !stopAtEndOfBase ) {
while (start!=end && --limit>=0 && accumulatedBytes < limitBytes) {
while (start!=end && output.size() < adjustedLimit && accumulatedBytes < limitBytes) {
output.push_back_deep( arena, KeyValueRef(start.key(), start->getValue()) );
accumulatedBytes += sizeof(KeyValueRef) + output.end()[-1].expectedSize();
if (forward) ++start; else --start;
}
}
ASSERT( output.size() <= originalLimit );
}
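The bookkeeping in merge() now enforces the row cap directly against output.size() via adjustedLimit (|limit| plus the rows already in output) rather than by decrementing limit inside each of the three loops. A hedged sketch of that bounded-merge pattern on plain vectors; the KV type, tie-breaking rule, and byte accounting here are simplified assumptions, not the server's versioned-data merge:

#include <string>
#include <vector>

struct KV { std::string key, value; };

// Appends to 'output' a merge of two ascending runs, newer values winning ties,
// stopping once output reaches its row cap or the byte budget is spent. The cap
// is checked against output.size(), as in the updated merge() above.
void mergeBounded(std::vector<KV>& output, const std::vector<KV>& base,
                  const std::vector<KV>& newer, int limit, int limitBytes) {
    if (limit == 0) return;
    int adjustedLimit = limit + int(output.size());
    int accumulatedBytes = 0;
    size_t b = 0, n = 0;
    while (b < base.size() && n < newer.size() &&
           int(output.size()) < adjustedLimit && accumulatedBytes < limitBytes) {
        if (base[b].key < newer[n].key) {
            output.push_back(base[b++]);
        } else {
            if (base[b].key == newer[n].key) ++b;  // newer version shadows the base row
            output.push_back(newer[n++]);
        }
        accumulatedBytes += int(output.back().key.size() + output.back().value.size());
    }
    while (b < base.size() && int(output.size()) < adjustedLimit && accumulatedBytes < limitBytes) {
        output.push_back(base[b++]);
        accumulatedBytes += int(output.back().key.size() + output.back().value.size());
    }
    while (n < newer.size() && int(output.size()) < adjustedLimit && accumulatedBytes < limitBytes) {
        output.push_back(newer[n++]);
        accumulatedBytes += int(output.back().key.size() + output.back().value.size());
    }
}

int main() {
    std::vector<KV> out;
    mergeBounded(out, { {"a", "1"}, {"c", "3"} }, { {"b", "2"}, {"c", "3b"} },
                 /*limit=*/3, /*limitBytes=*/1000);
    // out == {a:1, b:2, c:3b}; the row cap of 3 is enforced via out.size().
    return 0;
}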
// readRange reads up to |limit| rows from the given range and version, combining data->storage and data->versionedData.
@ -1089,14 +1090,8 @@ ACTOR Future<GetKeyValuesReply> readRange( StorageServer* data, Version version,
state KeyRef readEnd;
state Key readBeginTemp;
state int vCount;
//state UID rrid = deterministicRandom()->randomUniqueID();
//state int originalLimit = limit;
//state int originalLimitBytes = *pLimitBytes;
//state bool track = rrid.first() == 0x1bc134c2f752187cLL;
// FIXME: Review pLimitBytes behavior
// if (limit >= 0) we are reading forward, else backward
if (limit >= 0) {
// We might care about a clear beginning before start that
// runs into range
@ -1108,20 +1103,7 @@ ACTOR Future<GetKeyValuesReply> readRange( StorageServer* data, Version version,
vStart = view.lower_bound(readBegin);
/*if (track) {
printf("readRange(%llx, @%lld, '%s'-'%s')\n", data->thisServerID.first(), version, printable(range.begin).c_str(), printable(range.end).c_str());
printf("mvcc:\n");
vEnd = view.upper_bound(range.end);
for(auto r=vStart; r != vEnd; ++r) {
if (r->isClearTo())
printf(" '%s'-'%s' cleared\n", printable(r.key()).c_str(), printable(r->getEndKey()).c_str());
else
printf(" '%s' := '%s'\n", printable(r.key()).c_str(), printable(r->getValue()).c_str());
}
}*/
while (limit>0 && *pLimitBytes>0 && readBegin < range.end) {
// ASSERT( vStart == view.lower_bound(readBegin) );
ASSERT( !vStart || vStart.key() >= readBegin );
if (vStart) { auto b = vStart; --b; ASSERT( !b || b.key() < readBegin ); }
ASSERT( data->storageVersion() <= version );
@ -1138,93 +1120,58 @@ ACTOR Future<GetKeyValuesReply> readRange( StorageServer* data, Version version,
// Read the data on disk up to vEnd (or the end of the range)
readEnd = vEnd ? std::min( vEnd.key(), range.end ) : range.end;
Standalone<VectorRef<KeyValueRef>> atStorageVersion = wait(
Standalone<RangeResultRef> atStorageVersion = wait(
data->storage.readRange( KeyRangeRef(readBegin, readEnd), limit, *pLimitBytes ) );
/*if (track) {
printf("read [%s,%s): %d rows\n", printable(readBegin).c_str(), printable(readEnd).c_str(), atStorageVersion.size());
for(auto r=atStorageVersion.begin(); r != atStorageVersion.end(); ++r)
printf(" '%s' := '%s'\n", printable(r->key).c_str(), printable(r->value).c_str());
}*/
ASSERT( atStorageVersion.size() <= limit );
if (data->storageVersion() > version) throw transaction_too_old();
bool more = atStorageVersion.size()!=0;
// merge the sets in [vStart,vEnd) with the sets on disk, stopping at the last key from disk if there is 'more'
// merge the sets in [vStart,vEnd) with the sets on disk, stopping at the last key from disk if we were limited
int prevSize = result.data.size();
merge( result.arena, result.data, atStorageVersion, vStart, vEnd, vCount, limit, more, *pLimitBytes );
merge( result.arena, result.data, atStorageVersion, vStart, vEnd, vCount, limit, atStorageVersion.more, *pLimitBytes );
limit -= result.data.size() - prevSize;
for (auto i = result.data.begin() + prevSize; i != result.data.end(); i++)
*pLimitBytes -= sizeof(KeyValueRef) + i->expectedSize();
// Setup for the next iteration
if (more) { // if there might be more data, begin reading right after what we already found to find out
//if (track) printf("more\n");
if (!(limit<=0 || *pLimitBytes<=0 || result.data.end()[-1].key == atStorageVersion.end()[-1].key))
TraceEvent(SevError, "ReadRangeIssue", data->thisServerID).detail("ReadBegin", readBegin).detail("ReadEnd", readEnd)
.detail("VStart", vStart ? vStart.key() : LiteralStringRef("nil")).detail("VEnd", vEnd ? vEnd.key() : LiteralStringRef("nil"))
.detail("AtStorageVersionBack", atStorageVersion.end()[-1].key).detail("ResultBack", result.data.end()[-1].key)
.detail("Limit", limit).detail("LimitBytes", *pLimitBytes).detail("ResultSize", result.data.size()).detail("PrevSize", prevSize);
readBegin = readBeginTemp = keyAfter( result.data.end()[-1].key );
ASSERT( limit<=0 || *pLimitBytes<=0 || result.data.end()[-1].key == atStorageVersion.end()[-1].key );
} else if (vStart && vStart->isClearTo()){ // if vStart is a clear, skip it.
//if (track) printf("skip clear\n");
readBegin = vStart->getEndKey(); // next disk read should start at the end of the clear
++vStart;
} else { // Otherwise, continue at readEnd
//if (track) printf("continue\n");
readBegin = readEnd;
}
}
// all but the last item are less than *pLimitBytes
ASSERT( result.data.size() == 0 || *pLimitBytes + result.data.end()[-1].expectedSize() + sizeof(KeyValueRef) > 0 );
/*if (*pLimitBytes <= 0)
TraceEvent(SevWarn, "ReadRangeLimitExceeded")
.detail("Version", version)
.detail("Begin", range.begin )
.detail("End", range.end )
.detail("LimitReamin", limit)
.detail("LimitBytesRemain", *pLimitBytes); */
if (limit <=0 || *pLimitBytes <= 0) {
break;
}
/*GetKeyValuesReply correct = wait( readRangeOld(data, version, range, originalLimit, originalLimitBytes) );
bool prefix_equal = true;
int totalsize = 0;
int first_difference = -1;
for(int i=0; i<result.data.size() && i<correct.data.size(); i++) {
if (result.data[i] != correct.data[i]) {
first_difference = i;
prefix_equal = false;
// If we hit our limits reading from disk but then combining with MVCC gave us back more room
if (atStorageVersion.more) {
ASSERT(result.data.end()[-1].key == atStorageVersion.end()[-1].key);
readBegin = readBeginTemp = keyAfter(result.data.end()[-1].key);
} else if (vEnd && vEnd->isClearTo()) {
ASSERT(vStart == vEnd); // vStart will have been advanced by merge()
ASSERT(vEnd->getEndKey() > readBegin);
readBegin = vEnd->getEndKey();
++vStart;
} else {
ASSERT(readEnd == range.end);
break;
}
totalsize += result.data[i].expectedSize() + sizeof(KeyValueRef);
}
// for the following check
result.more = limit == 0 || *pLimitBytes<=0; // FIXME: Does this have to be exact?
result.version = version;
if ( !(totalsize>originalLimitBytes ? prefix_equal : result.data==correct.data) || correct.more != result.more ) {
TraceEvent(SevError, "IncorrectResult", rrid).detail("Server", data->thisServerID).detail("CorrectRows", correct.data.size())
.detail("FirstDifference", first_difference).detail("OriginalLimit", originalLimit)
.detail("ResultRows", result.data.size()).detail("Result0", result.data[0].key).detail("Correct0", correct.data[0].key)
.detail("ResultN", result.data.size() ? result.data[std::min(correct.data.size(),result.data.size())-1].key : "nil")
.detail("CorrectN", correct.data.size() ? correct.data[std::min(correct.data.size(),result.data.size())-1].key : "nil");
}*/
} else {
// Reverse read - abandon hope alle ye who enter here
readEnd = range.end;
vStart = view.lastLess(readEnd);
vStart = view.lastLess(range.end);
// A clear might extend all the way to range.end
if (vStart && vStart->isClearTo() && vStart->getEndKey() >= readEnd) {
if (vStart && vStart->isClearTo() && vStart->getEndKey() >= range.end) {
readEnd = vStart.key();
--vStart;
} else {
readEnd = range.end;
}
while (limit < 0 && *pLimitBytes > 0 && readEnd > range.begin) {
ASSERT(!vStart || vStart.key() < readEnd);
if (vStart) {
auto b = vStart;
++b;
ASSERT(!b || b.key() >= readEnd);
}
ASSERT(data->storageVersion() <= version);
vEnd = vStart;
vCount = 0;
int vSize=0;
@ -1234,30 +1181,42 @@ ACTOR Future<GetKeyValuesReply> readRange( StorageServer* data, Version version,
--vEnd;
}
readBegin = range.begin;
if (vEnd)
readBegin = std::max( readBegin, vEnd->isClearTo() ? vEnd->getEndKey() : vEnd.key() );
readBegin = vEnd ? std::max(vEnd->isClearTo() ? vEnd->getEndKey() : vEnd.key(), range.begin) : range.begin;
Standalone<RangeResultRef> atStorageVersion =
wait(data->storage.readRange(KeyRangeRef(readBegin, readEnd), limit, *pLimitBytes));
Standalone<VectorRef<KeyValueRef>> atStorageVersion = wait( data->storage.readRange( KeyRangeRef(readBegin, readEnd), limit ) );
ASSERT(atStorageVersion.size() <= -limit);
if (data->storageVersion() > version) throw transaction_too_old();
int prevSize = result.data.size();
merge( result.arena, result.data, atStorageVersion, vStart, vEnd, vCount, limit, false, *pLimitBytes );
merge(result.arena, result.data, atStorageVersion, vStart, vEnd, vCount, limit, atStorageVersion.more, *pLimitBytes);
limit += result.data.size() - prevSize;
for (auto i = result.data.begin() + prevSize; i != result.data.end(); i++)
*pLimitBytes -= sizeof(KeyValueRef) + i->expectedSize();
vStart = vEnd;
readEnd = readBegin;
if (limit >=0 || *pLimitBytes <= 0) {
break;
}
if (vStart && vStart->isClearTo()) {
ASSERT( vStart.key() < readEnd );
readEnd = vStart.key();
if (atStorageVersion.more) {
ASSERT(result.data.end()[-1].key == atStorageVersion.end()[-1].key);
readEnd = result.data.end()[-1].key;
} else if (vEnd && vEnd->isClearTo()) {
ASSERT(vStart == vEnd);
ASSERT(vEnd.key() < readEnd);
readEnd = vEnd.key();
--vStart;
} else {
ASSERT(readBegin == range.begin);
break;
}
}
}
// all but the last item are less than *pLimitBytes
ASSERT(result.data.size() == 0 || *pLimitBytes + result.data.end()[-1].expectedSize() + sizeof(KeyValueRef) > 0);
result.more = limit == 0 || *pLimitBytes<=0; // FIXME: Does this have to be exact?
result.version = version;
return result;
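The new reverse-path resume logic above hinges on one invariant: when the disk read reports more, the next reverse read must use the last key already returned as its new exclusive end, so nothing is skipped or re-read. A hedged sketch of that resume pattern with a plain std::map standing in for the on-disk store; the page size and key set are illustrative assumptions:

#include <cstdio>
#include <map>
#include <string>
#include <vector>

int main() {
    std::map<std::string, std::string> data;
    for (int i = 0; i < 25; ++i)
        data["key" + std::to_string(100 + i)] = "v";

    const int pageRows = 10;
    std::string end = "key200";   // exclusive upper bound of the scan
    std::vector<std::string> seen;
    while (true) {
        // One reverse "page": walk down from the largest key below 'end'.
        std::vector<std::string> page;
        for (auto it = data.lower_bound(end); it != data.begin() && int(page.size()) < pageRows;) {
            --it;
            page.push_back(it->first);
        }
        bool more = int(page.size()) == pageRows;   // stopped because of the row limit
        seen.insert(seen.end(), page.begin(), page.end());
        if (!more) break;
        end = page.back();        // resume strictly below the last key returned
    }
    std::printf("rows scanned in reverse: %zu\n", seen.size()); // 25, none skipped or repeated
    return 0;
}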
@ -3036,8 +2995,8 @@ ACTOR Future<Void> applyByteSampleResult( StorageServer* data, IKeyValueStore* s
state int totalKeys = 0;
state int totalBytes = 0;
loop {
Standalone<VectorRef<KeyValueRef>> bs = wait( storage->readRange( KeyRangeRef(begin, end), SERVER_KNOBS->STORAGE_LIMIT_BYTES, SERVER_KNOBS->STORAGE_LIMIT_BYTES ) );
if(results) results->push_back(bs);
Standalone<RangeResultRef> bs = wait( storage->readRange( KeyRangeRef(begin, end), SERVER_KNOBS->STORAGE_LIMIT_BYTES, SERVER_KNOBS->STORAGE_LIMIT_BYTES ) );
if(results) results->push_back(bs.castTo<VectorRef<KeyValueRef>>());
int rangeSize = bs.expectedSize();
totalFetches++;
totalKeys += bs.size();
@ -3118,8 +3077,8 @@ ACTOR Future<bool> restoreDurableState( StorageServer* data, IKeyValueStore* sto
state Future<Optional<Value>> fVersion = storage->readValue(persistVersion);
state Future<Optional<Value>> fLogProtocol = storage->readValue(persistLogProtocol);
state Future<Optional<Value>> fPrimaryLocality = storage->readValue(persistPrimaryLocality);
state Future<Standalone<VectorRef<KeyValueRef>>> fShardAssigned = storage->readRange(persistShardAssignedKeys);
state Future<Standalone<VectorRef<KeyValueRef>>> fShardAvailable = storage->readRange(persistShardAvailableKeys);
state Future<Standalone<RangeResultRef>> fShardAssigned = storage->readRange(persistShardAssignedKeys);
state Future<Standalone<RangeResultRef>> fShardAvailable = storage->readRange(persistShardAvailableKeys);
state Promise<Void> byteSampleSampleRecovered;
state Promise<Void> startByteSampleRestore;
@ -3156,7 +3115,7 @@ ACTOR Future<bool> restoreDurableState( StorageServer* data, IKeyValueStore* sto
debug_checkRestoredVersion( data->thisServerID, version, "StorageServer" );
data->setInitialVersion( version );
state Standalone<VectorRef<KeyValueRef>> available = fShardAvailable.get();
state Standalone<RangeResultRef> available = fShardAvailable.get();
state int availableLoc;
for(availableLoc=0; availableLoc<available.size(); availableLoc++) {
KeyRangeRef keys(
@ -3170,7 +3129,7 @@ ACTOR Future<bool> restoreDurableState( StorageServer* data, IKeyValueStore* sto
wait(yield());
}
state Standalone<VectorRef<KeyValueRef>> assigned = fShardAssigned.get();
state Standalone<RangeResultRef> assigned = fShardAssigned.get();
state int assignedLoc;
for(assignedLoc=0; assignedLoc<assigned.size(); assignedLoc++) {
KeyRangeRef keys(

View File

@ -270,7 +270,7 @@ ACTOR Future<Void> testKVStoreMain( KVStoreTestWorkload* workload, KVTest* ptest
state Key k;
state double cst = timer();
while (true) {
Standalone<VectorRef<KeyValueRef>> kv = wait( test.store->readRange( KeyRangeRef(k, LiteralStringRef("\xff\xff\xff\xff")), 1000 ) );
Standalone<RangeResultRef> kv = wait( test.store->readRange( KeyRangeRef(k, LiteralStringRef("\xff\xff\xff\xff")), 1000 ) );
count += kv.size();
if (kv.size() < 1000) break;
k = keyAfter( kv[ kv.size()-1 ].key );
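For comparison with the reverse sketch earlier, the workload loop above pages forward by resuming just after the last key returned and stopping when a short page comes back; with the new result type the same loop could also stop on the more flag. A hedged sketch of that forward paging pattern with a plain std::map; keyAfter is modeled by appending a zero byte, and the 1000-row page size matches the workload:

#include <cstdio>
#include <map>
#include <string>

int main() {
    std::map<std::string, std::string> data;
    for (int i = 0; i < 2500; ++i)
        data["k" + std::to_string(10000 + i)] = "v";

    std::string begin;            // the empty string sorts before every key
    int count = 0;
    while (true) {
        int rows = 0;
        std::string last;
        for (auto it = data.lower_bound(begin); it != data.end() && rows < 1000; ++it, ++rows)
            last = it->first;
        count += rows;
        if (rows < 1000) break;   // a short page means the range is exhausted
        begin = last + '\0';      // strictly after the last key, like keyAfter()
    }
    std::printf("total rows: %d\n", count); // 2500
    return 0;
}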

View File

@ -519,6 +519,10 @@ public:
}
#endif
template <class U> Standalone<U> castTo() const {
return Standalone<U>(*this, arena());
}
template <class Archive>
void serialize(Archive& ar) {
// FIXME: something like BinaryReader(ar) >> arena >> *(T*)this; to guarantee standalone arena???
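The castTo helper added here is what keeps existing VectorRef<KeyValueRef> consumers working: a RangeResultRef result can be re-wrapped as its base vector type without copying, still holding the same arena, as at the two call sites above (rawConf.castTo<VectorRef<KeyValueRef>>() in the master and bs.castTo<VectorRef<KeyValueRef>>() in the byte-sample restore). A minimal sketch of that idea with simplified stand-ins (Arena, WithArena, KVView, and RangeView are illustrative, not the real Standalone or Arena types):

#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <vector>

// Illustrative stand-ins: 'Arena' is the shared allocation, and WithArena<T>
// plays the role of Standalone<T>: a value plus a handle keeping its arena alive.
struct Arena { std::vector<std::string> storage; };

template <class T>
struct WithArena {
    T value;
    std::shared_ptr<Arena> arena;

    // The castTo idea from the diff: re-wrap the same data as a related type U
    // (here, a base-class view) without copying the payload, keeping the arena.
    template <class U>
    WithArena<U> castTo() const { return WithArena<U>{ U(value), arena }; }
};

struct KVView { const std::string* data = nullptr; std::size_t count = 0; };
struct RangeView : KVView { bool more = false; };   // RangeResultRef extends the plain view

int main() {
    auto arena = std::make_shared<Arena>();
    arena->storage = { "a", "b", "c" };

    RangeView rv;
    rv.data = arena->storage.data();
    rv.count = arena->storage.size();
    rv.more = true;
    WithArena<RangeView> range{ rv, arena };

    // Consumers that only understand the plain view keep working unchanged:
    WithArena<KVView> plain = range.castTo<KVView>();
    assert(plain.value.count == 3 && plain.arena == range.arena);
    return 0;
}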