- Block a peek request on a tlog until the tlog has a commit version

that is relevant to the requester

Code extracted from https://github.com/apple/foundationdb/pull/5058
This commit is contained in:
Sreenath Bodagala 2021-07-13 20:21:12 +00:00
parent f77093ea61
commit 5f504d2148
1 changed files with 27 additions and 4 deletions

View File

@ -516,6 +516,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
Deque<std::pair<Version, Standalone<VectorRef<uint8_t>>>> messageBlocks;
std::vector<std::vector<Reference<TagData>>> tag_data; // tag.locality | tag.id
int unpoppedRecoveredTags;
std::map<Tag, Promise<Void>> waitingTags;
Reference<TagData> getTagData(Tag tag) {
int idx = tag.toTagDataIndex();
@ -1452,6 +1453,13 @@ void commitMessages(TLogData* self,
txsBytes += tagData->versionMessages.back().second.expectedSize();
}
auto iter = logData->waitingTags.find(tag);
if (iter != logData->waitingTags.end()) {
auto promise = iter->second;
logData->waitingTags.erase(iter);
promise.send(Void());
}
// The factor of VERSION_MESSAGES_OVERHEAD is intended to be an overestimate of the actual memory used
// to store this data in a std::deque. In practice, this number is probably something like 528/512
// ~= 1.03, but this could vary based on the implementation. There will also be a fixed overhead per
@ -1506,6 +1514,17 @@ std::deque<std::pair<Version, LengthPrefixedStringRef>>& getVersionMessages(Refe
return tagData->versionMessages;
};
ACTOR Future<Void> waitForMessagesForTag(Reference<LogData> self, TLogPeekRequest* req) {
auto tagData = self->getTagData(req->tag);
if (tagData.isValid()) {
return Void();
}
wait(self->waitingTags[req->tag].getFuture());
// we want the caller to finish first, otherwise the data structure it is building might not be complete
wait(delay(0.0));
return Void();
}
void peekMessagesFromMemory(Reference<LogData> self,
TLogPeekRequest const& req,
BinaryWriter& messages,
@ -1835,6 +1854,10 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
if (req.onlySpilled) {
endVersion = logData->persistentDataDurableVersion + 1;
} else {
if (req.tag.locality >= 0 && !req.returnIfBlocked) {
// wait for at most 1 second
wait(waitForMessagesForTag(logData, &req) || delay(1.0));
}
peekMessagesFromMemory(logData, req, messages, endVersion);
}
@ -2655,7 +2678,7 @@ ACTOR Future<Void> tLogCore(TLogData* self,
SERVER_KNOBS->STORAGE_LOGGING_DELAY,
&logData->cc,
logData->logId.toString() + "/TLogMetrics",
[self=self](TraceEvent& te) {
[self = self](TraceEvent& te) {
StorageBytes sbTlog = self->persistentData->getStorageBytes();
te.detail("KvstoreBytesUsed", sbTlog.used);
te.detail("KvstoreBytesFree", sbTlog.free);
@ -2885,9 +2908,9 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
logsByVersion.emplace_back(ver, id1);
TraceEvent("TLogPersistentStateRestore", self->dbgid)
.detail("LogId", logData->logId)
.detail("Ver", ver)
.detail("RecoveryCount", logData->recoveryCount);
.detail("LogId", logData->logId)
.detail("Ver", ver)
.detail("RecoveryCount", logData->recoveryCount);
// Restore popped keys. Pop operations that took place after the last (committed) updatePersistentDataVersion
// might be lost, but that is fine because we will get the corresponding data back, too.
tagKeys = prefixRange(rawId.withPrefix(persistTagPoppedKeys.begin));