In the SQLite storage engine, destroy and create new cursors after SQLITE_CURSOR_MAX_LIFETIME_BYTES of KV read usage because cursor usage increases page cache memory usage in SQLite (internal page cache, not AsyncFileCached) by pinning pages which are not freed until the cursor is destroyed.

This commit is contained in:
Steve Atherton 2022-03-07 08:41:44 -08:00
parent 58f5cea39b
commit 8f8f95931b
3 changed files with 11 additions and 1 deletions

View File

@ -290,6 +290,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( SQLITE_CHUNK_SIZE_PAGES_SIM, 1024 ); // 4MB
init( SQLITE_READER_THREADS, 64 ); // number of read threads
init( SQLITE_WRITE_WINDOW_SECONDS, -1 );
init( SQLITE_CURSOR_MAX_LIFETIME_BYTES, 1e6 ); if( randomize && BUGGIFY ) SQLITE_CURSOR_MAX_LIFETIME_BYTES = 0;
init( SQLITE_WRITE_WINDOW_LIMIT, -1 );
if( randomize && BUGGIFY ) {
// Choose an window between .01 and 1.01 seconds.

View File

@ -254,6 +254,7 @@ public:
int SQLITE_READER_THREADS;
int SQLITE_WRITE_WINDOW_LIMIT;
double SQLITE_WRITE_WINDOW_SECONDS;
int64_t SQLITE_CURSOR_MAX_LIFETIME_BYTES;
// KeyValueStoreSqlite spring cleaning
double SPRING_CLEANING_NO_ACTION_INTERVAL;

View File

@ -706,6 +706,7 @@ struct RawCursor {
BtCursor* cursor;
KeyInfo keyInfo;
bool valid;
int64_t kvBytesRead = 0;
operator bool() const { return valid; }
@ -1105,6 +1106,7 @@ struct RawCursor {
if (r == 0) {
Value result;
((ValueRef&)result) = decodeKVFragment(getEncodedRow(result.arena())).get().value;
kvBytesRead += key.size() + result.size();
return result;
}
@ -1115,12 +1117,14 @@ struct RawCursor {
DefragmentingReader i(*this, m, true);
if (i.peek() == key) {
Optional<KeyValueRef> kv = i.getNext();
kvBytesRead += key.size() + kv.get().value.size();
return Value(kv.get().value, m);
}
} else if (r == 0) {
Value result;
KeyValueRef kv = decodeKV(getEncodedRow(result.arena()));
((ValueRef&)result) = kv.value;
kvBytesRead += key.size() + result.size();
return result;
}
@ -1135,6 +1139,7 @@ struct RawCursor {
DefragmentingReader i(*this, m, getEncodedKVFragmentSize(key.size(), maxLength));
if (i.peek() == key) {
Optional<KeyValueRef> kv = i.getNext();
kvBytesRead += key.size() + kv.get().value.size();
return Value(kv.get().value, m);
}
} else if (!moveTo(key)) {
@ -1145,6 +1150,7 @@ struct RawCursor {
int maxEncodedSize = getEncodedSize(key.size(), maxLength);
KeyValueRef kv = decodeKVPrefix(getEncodedRowPrefix(result.arena(), maxEncodedSize), maxLength);
((ValueRef&)result) = kv.value;
kvBytesRead += key.size() + result.size();
return result;
}
return Optional<Value>();
@ -1221,6 +1227,8 @@ struct RawCursor {
ASSERT(result.size() > 0);
result.readThrough = result[result.size() - 1].key;
}
// AccumulatedBytes includes KeyValueRef overhead so subtract it
kvBytesRead += (accumulatedBytes - result.size() * sizeof(KeyValueRef));
return result;
}
@ -1639,7 +1647,7 @@ private:
Reference<ReadCursor> getCursor() {
Reference<ReadCursor> cursor = *ppReadCursor;
if (!cursor) {
if (!cursor || cursor->get().kvBytesRead > SERVER_KNOBS->SQLITE_CURSOR_MAX_LIFETIME_BYTES) {
*ppReadCursor = cursor = makeReference<ReadCursor>();
cursor->init(conn);
}