Merge pull request #5130 from neethuhaneesha/readCancelRocksDB
Cancelling the timedout reads with rocksdb storage.
This commit is contained in:
commit
41f94e6e3b
|
@ -344,6 +344,9 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
|||
init( ROCKSDB_PREFIX_LEN, 0 );
|
||||
init( ROCKSDB_BLOCK_CACHE_SIZE, 0 );
|
||||
init( ROCKSDB_METRICS_DELAY, 60.0 );
|
||||
init( ROCKSDB_READ_VALUE_TIMEOUT, 5.0 );
|
||||
init( ROCKSDB_READ_VALUE_PREFIX_TIMEOUT, 5.0 );
|
||||
init( ROCKSDB_READ_RANGE_TIMEOUT, 5.0 );
|
||||
|
||||
// Leader election
|
||||
bool longLeaderElection = randomize && BUGGIFY;
|
||||
|
|
|
@ -276,6 +276,9 @@ public:
|
|||
int ROCKSDB_PREFIX_LEN;
|
||||
int64_t ROCKSDB_BLOCK_CACHE_SIZE;
|
||||
double ROCKSDB_METRICS_DELAY;
|
||||
double ROCKSDB_READ_VALUE_TIMEOUT;
|
||||
double ROCKSDB_READ_VALUE_PREFIX_TIMEOUT;
|
||||
double ROCKSDB_READ_RANGE_TIMEOUT;
|
||||
|
||||
// Leader election
|
||||
int MAX_NOTIFICATIONS;
|
||||
|
|
|
@ -309,16 +309,33 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
|||
|
||||
struct Reader : IThreadPoolReceiver {
|
||||
DB& db;
|
||||
double readValueTimeout;
|
||||
double readValuePrefixTimeout;
|
||||
double readRangeTimeout;
|
||||
|
||||
explicit Reader(DB& db) : db(db) {}
|
||||
explicit Reader(DB& db) : db(db) {
|
||||
if (g_network->isSimulated()) {
|
||||
// In simulation, increasing the read operation timeouts to 5 minutes, as some of the tests have
|
||||
// very high load and single read thread cannot process all the load within the timeouts.
|
||||
readValueTimeout = 5 * 60;
|
||||
readValuePrefixTimeout = 5 * 60;
|
||||
readRangeTimeout = 5 * 60;
|
||||
} else {
|
||||
readValueTimeout = SERVER_KNOBS->ROCKSDB_READ_VALUE_TIMEOUT;
|
||||
readValuePrefixTimeout = SERVER_KNOBS->ROCKSDB_READ_VALUE_PREFIX_TIMEOUT;
|
||||
readRangeTimeout = SERVER_KNOBS->ROCKSDB_READ_RANGE_TIMEOUT;
|
||||
}
|
||||
}
|
||||
|
||||
void init() override {}
|
||||
|
||||
struct ReadValueAction : TypedAction<Reader, ReadValueAction> {
|
||||
Key key;
|
||||
Optional<UID> debugID;
|
||||
double startTime;
|
||||
ThreadReturnPromise<Optional<Value>> result;
|
||||
ReadValueAction(KeyRef key, Optional<UID> debugID) : key(key), debugID(debugID) {}
|
||||
ReadValueAction(KeyRef key, Optional<UID> debugID)
|
||||
: key(key), debugID(debugID), startTime(timer_monotonic()) {}
|
||||
double getTimeEstimate() const override { return SERVER_KNOBS->READ_VALUE_TIME_ESTIMATE; }
|
||||
};
|
||||
void action(ReadValueAction& a) {
|
||||
|
@ -327,8 +344,21 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
|||
traceBatch = { TraceBatch{} };
|
||||
traceBatch.get().addEvent("GetValueDebug", a.debugID.get().first(), "Reader.Before");
|
||||
}
|
||||
if (timer_monotonic() - a.startTime > readValueTimeout) {
|
||||
TraceEvent(SevWarn, "RocksDBError")
|
||||
.detail("Error", "Read value request timedout")
|
||||
.detail("Method", "ReadValueAction")
|
||||
.detail("Timeout value", readValueTimeout);
|
||||
a.result.sendError(transaction_too_old());
|
||||
return;
|
||||
}
|
||||
rocksdb::PinnableSlice value;
|
||||
auto s = db->Get(getReadOptions(), db->DefaultColumnFamily(), toSlice(a.key), &value);
|
||||
auto options = getReadOptions();
|
||||
uint64_t deadlineMircos =
|
||||
db->GetEnv()->NowMicros() + (readValueTimeout - (timer_monotonic() - a.startTime)) * 1000000;
|
||||
std::chrono::seconds deadlineSeconds(deadlineMircos / 1000000);
|
||||
options.deadline = std::chrono::duration_cast<std::chrono::microseconds>(deadlineSeconds);
|
||||
auto s = db->Get(options, db->DefaultColumnFamily(), toSlice(a.key), &value);
|
||||
if (a.debugID.present()) {
|
||||
traceBatch.get().addEvent("GetValueDebug", a.debugID.get().first(), "Reader.After");
|
||||
traceBatch.get().dump();
|
||||
|
@ -347,13 +377,13 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
|||
Key key;
|
||||
int maxLength;
|
||||
Optional<UID> debugID;
|
||||
double startTime;
|
||||
ThreadReturnPromise<Optional<Value>> result;
|
||||
ReadValuePrefixAction(Key key, int maxLength, Optional<UID> debugID)
|
||||
: key(key), maxLength(maxLength), debugID(debugID){};
|
||||
: key(key), maxLength(maxLength), debugID(debugID), startTime(timer_monotonic()){};
|
||||
double getTimeEstimate() const override { return SERVER_KNOBS->READ_VALUE_TIME_ESTIMATE; }
|
||||
};
|
||||
void action(ReadValuePrefixAction& a) {
|
||||
rocksdb::PinnableSlice value;
|
||||
Optional<TraceBatch> traceBatch;
|
||||
if (a.debugID.present()) {
|
||||
traceBatch = { TraceBatch{} };
|
||||
|
@ -361,7 +391,21 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
|||
a.debugID.get().first(),
|
||||
"Reader.Before"); //.detail("TaskID", g_network->getCurrentTask());
|
||||
}
|
||||
auto s = db->Get(getReadOptions(), db->DefaultColumnFamily(), toSlice(a.key), &value);
|
||||
if (timer_monotonic() - a.startTime > readValuePrefixTimeout) {
|
||||
TraceEvent(SevWarn, "RocksDBError")
|
||||
.detail("Error", "Read value prefix request timedout")
|
||||
.detail("Method", "ReadValuePrefixAction")
|
||||
.detail("Timeout value", readValuePrefixTimeout);
|
||||
a.result.sendError(transaction_too_old());
|
||||
return;
|
||||
}
|
||||
rocksdb::PinnableSlice value;
|
||||
auto options = getReadOptions();
|
||||
uint64_t deadlineMircos =
|
||||
db->GetEnv()->NowMicros() + (readValuePrefixTimeout - (timer_monotonic() - a.startTime)) * 1000000;
|
||||
std::chrono::seconds deadlineSeconds(deadlineMircos / 1000000);
|
||||
options.deadline = std::chrono::duration_cast<std::chrono::microseconds>(deadlineSeconds);
|
||||
auto s = db->Get(options, db->DefaultColumnFamily(), toSlice(a.key), &value);
|
||||
if (a.debugID.present()) {
|
||||
traceBatch.get().addEvent("GetValuePrefixDebug",
|
||||
a.debugID.get().first(),
|
||||
|
@ -384,12 +428,22 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
|||
struct ReadRangeAction : TypedAction<Reader, ReadRangeAction>, FastAllocated<ReadRangeAction> {
|
||||
KeyRange keys;
|
||||
int rowLimit, byteLimit;
|
||||
double startTime;
|
||||
ThreadReturnPromise<RangeResult> result;
|
||||
ReadRangeAction(KeyRange keys, int rowLimit, int byteLimit)
|
||||
: keys(keys), rowLimit(rowLimit), byteLimit(byteLimit) {}
|
||||
: keys(keys), rowLimit(rowLimit), byteLimit(byteLimit), startTime(timer_monotonic()) {}
|
||||
double getTimeEstimate() const override { return SERVER_KNOBS->READ_RANGE_TIME_ESTIMATE; }
|
||||
};
|
||||
void action(ReadRangeAction& a) {
|
||||
if (timer_monotonic() - a.startTime > readRangeTimeout) {
|
||||
TraceEvent(SevWarn, "RocksDBError")
|
||||
.detail("Error", "Read range request timedout")
|
||||
.detail("Method", "ReadRangeAction")
|
||||
.detail("Timeout value", readRangeTimeout);
|
||||
a.result.sendError(transaction_too_old());
|
||||
return;
|
||||
}
|
||||
|
||||
RangeResult result;
|
||||
if (a.rowLimit == 0 || a.byteLimit == 0) {
|
||||
a.result.send(result);
|
||||
|
@ -397,6 +451,10 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
|||
int accumulatedBytes = 0;
|
||||
rocksdb::Status s;
|
||||
auto options = getReadOptions();
|
||||
uint64_t deadlineMircos =
|
||||
db->GetEnv()->NowMicros() + (readRangeTimeout - (timer_monotonic() - a.startTime)) * 1000000;
|
||||
std::chrono::seconds deadlineSeconds(deadlineMircos / 1000000);
|
||||
options.deadline = std::chrono::duration_cast<std::chrono::microseconds>(deadlineSeconds);
|
||||
// When using a prefix extractor, ensure that keys are returned in order even if they cross
|
||||
// a prefix boundary.
|
||||
options.auto_prefix_mode = (SERVER_KNOBS->ROCKSDB_PREFIX_LEN > 0);
|
||||
|
@ -413,6 +471,14 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
|||
if (result.size() >= a.rowLimit || accumulatedBytes >= a.byteLimit) {
|
||||
break;
|
||||
}
|
||||
if (timer_monotonic() - a.startTime > readRangeTimeout) {
|
||||
TraceEvent(SevWarn, "RocksDBError")
|
||||
.detail("Error", "Read range request timedout")
|
||||
.detail("Method", "ReadRangeAction")
|
||||
.detail("Timeout value", readRangeTimeout);
|
||||
a.result.sendError(transaction_too_old());
|
||||
return;
|
||||
}
|
||||
cursor->Next();
|
||||
}
|
||||
s = cursor->status();
|
||||
|
@ -432,6 +498,14 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
|||
if (result.size() >= -a.rowLimit || accumulatedBytes >= a.byteLimit) {
|
||||
break;
|
||||
}
|
||||
if (timer_monotonic() - a.startTime > readRangeTimeout) {
|
||||
TraceEvent(SevWarn, "RocksDBError")
|
||||
.detail("Error", "Read range request timedout")
|
||||
.detail("Method", "ReadRangeAction")
|
||||
.detail("Timeout value", readRangeTimeout);
|
||||
a.result.sendError(transaction_too_old());
|
||||
return;
|
||||
}
|
||||
cursor->Prev();
|
||||
}
|
||||
s = cursor->status();
|
||||
|
|
Loading…
Reference in New Issue