added a knob for how many bytes are read from disk
This commit is contained in:
parent
a2c5554de7
commit
88eed268c3
|
@ -86,6 +86,7 @@ void ClientKnobs::initialize(Randomize randomize) {
|
|||
init( CHANGE_FEED_COALESCE_LOCATIONS, true ); if( randomize && BUGGIFY ) CHANGE_FEED_COALESCE_LOCATIONS = false;
|
||||
init( CHANGE_FEED_CACHE_FLUSH_BYTES, 10e6 ); if( randomize && BUGGIFY ) CHANGE_FEED_CACHE_FLUSH_BYTES = deterministicRandom()->randomInt64(1, 1e6);
|
||||
init( CHANGE_FEED_CACHE_EXPIRE_TIME, 60.0 ); if( randomize && BUGGIFY ) CHANGE_FEED_CACHE_EXPIRE_TIME = 1.0;
|
||||
init( CHANGE_FEED_CACHE_LIMIT_BYTES, 500000 ); if( randomize && BUGGIFY ) CHANGE_FEED_CACHE_LIMIT_BYTES = 50000;
|
||||
|
||||
init( MAX_BATCH_SIZE, 1000 ); if( randomize && BUGGIFY ) MAX_BATCH_SIZE = 1;
|
||||
init( GRV_BATCH_TIMEOUT, 0.005 ); if( randomize && BUGGIFY ) GRV_BATCH_TIMEOUT = 0.1;
|
||||
|
|
|
@ -9567,7 +9567,9 @@ ACTOR Future<Void> cleanupChangeFeedCache(DatabaseContext* db) {
|
|||
ACTOR Future<Void> initializeCFCache(DatabaseContext* db) {
|
||||
state Key beginKey = changeFeedCacheFeedKeys.begin;
|
||||
loop {
|
||||
RangeResult res = wait(db->storage->readRange(KeyRangeRef(beginKey, changeFeedCacheFeedKeys.end), 5e5, 5e5));
|
||||
RangeResult res = wait(db->storage->readRange(KeyRangeRef(beginKey, changeFeedCacheFeedKeys.end),
|
||||
CLIENT_KNOBS->CHANGE_FEED_CACHE_LIMIT_BYTES,
|
||||
CLIENT_KNOBS->CHANGE_FEED_CACHE_LIMIT_BYTES));
|
||||
if (res.size()) {
|
||||
beginKey = keyAfter(res.back().key);
|
||||
} else {
|
||||
|
@ -10697,7 +10699,9 @@ ACTOR Future<bool> getChangeFeedStreamFromDisk(Reference<DatabaseContext> db,
|
|||
loop {
|
||||
Key beginKey = changeFeedCacheKey(tenantPrefix, rangeID, range, *begin);
|
||||
Key endKey = changeFeedCacheKey(tenantPrefix, rangeID, range, MAX_VERSION);
|
||||
state RangeResult res = wait(db->storage->readRange(KeyRangeRef(beginKey, endKey), 5e5, 5e5));
|
||||
state RangeResult res = wait(db->storage->readRange(KeyRangeRef(beginKey, endKey),
|
||||
CLIENT_KNOBS->CHANGE_FEED_CACHE_LIMIT_BYTES,
|
||||
CLIENT_KNOBS->CHANGE_FEED_CACHE_LIMIT_BYTES));
|
||||
state int idx = 0;
|
||||
|
||||
while (!foundEnd && idx < res.size()) {
|
||||
|
|
|
@ -82,6 +82,7 @@ public:
|
|||
bool CHANGE_FEED_COALESCE_LOCATIONS;
|
||||
int64_t CHANGE_FEED_CACHE_FLUSH_BYTES;
|
||||
double CHANGE_FEED_CACHE_EXPIRE_TIME;
|
||||
int64_t CHANGE_FEED_CACHE_LIMIT_BYTES;
|
||||
|
||||
int MAX_BATCH_SIZE;
|
||||
double GRV_BATCH_TIMEOUT;
|
||||
|
|
Loading…
Reference in New Issue