Merge pull request #11600 from jzhou77/coroutine

Refactor LogRouter and Watch workload using coroutines
This commit is contained in:
Jingyu Zhou 2024-08-26 16:52:23 -07:00 committed by GitHub
commit 9a914d9400
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 417 additions and 383 deletions

View File

@ -100,7 +100,7 @@ co_await Choose()
})
.When(foo(), [](Foo const& f) {
// do something else
}).Run();
}).run();
```
While `Choose` and `choose` behave very similarly, there are some minor differences between
@ -589,16 +589,16 @@ Luckily, with coroutines, we can do one better: generalize the retry loop. The a
```c++
co_await db.run([&](ReadYourWritesTransaction* tr) -> Future<Void> {
Value v = wait(tr.get(key));
tr.set(key2, val2);
wait(tr.commit());
Value v = wait(tr->get(key));
tr->set(key2, val2);
wait(tr->commit());
});
```
A possible implementation of `Database::run` would be:
```c++
template <std:invocable<ReadYourWritesTransaction*> Fun>
template <std::invocable<ReadYourWritesTransaction*> Fun>
Future<Void> Database::run(Fun fun) {
ReadYourWritesTransaction tr(*this);
Future<Void> onError;
@ -609,6 +609,7 @@ Future<Void> Database::run(Fun fun) {
}
try {
co_await fun(&tr);
co_return;
} catch (Error& e) {
onError = tr.onError(e);
}

View File

@ -119,6 +119,9 @@ public:
const UniqueOrderedOptionList<FDBTransactionOptions>& getTransactionDefaults() const;
template <std::invocable<Transaction*> Fun>
Future<Void> run(Fun fun);
private:
Reference<DatabaseContext> db;
};
@ -578,6 +581,24 @@ private:
Future<Void> committing;
};
// Runs `fun` against a single Transaction in a retry loop.
// After a failed attempt, the future produced by tr.onError(e) is awaited
// before re-invoking `fun`; presumably onError() performs the usual
// retryability check and back-off, and rethrows for non-retryable errors,
// which then propagate out of this coroutine. Completes once `fun`
// finishes without throwing.
template <std::invocable<Transaction*> Fun>
Future<Void> Database::run(Fun fun) {
    Transaction tr(*this);
    Future<Void> retryBarrier;
    for (;;) {
        // Honor the back-off from the previous failed attempt, if any,
        // before trying again.
        if (retryBarrier.isValid()) {
            co_await retryBarrier;
            retryBarrier = Future<Void>();
        }
        try {
            co_await fun(&tr);
            co_return;
        } catch (Error& e) {
            // Await it at the top of the next iteration rather than here,
            // matching the documented retry-loop shape for this API.
            retryBarrier = tr.onError(e);
        }
    }
}
ACTOR Future<Version> waitForCommittedVersion(Database cx, Version version, SpanContext spanContext);
ACTOR Future<Standalone<VectorRef<DDMetricsRef>>> waitDataDistributionMetricsList(Database cx,
KeyRange keys,

View File

@ -605,7 +605,7 @@ public:
// Must be called on the server before using a ReplyPromiseStream to limit the amount of outstanding bytes to the
// client
void setByteLimit(int64_t byteLimit) { queue->acknowledgements.bytesLimit = byteLimit; }
void setByteLimit(int64_t byteLimit) const { queue->acknowledgements.bytesLimit = byteLimit; }
void operator=(const ReplyPromiseStream& rhs) {
rhs.queue->addPromiseRef();

View File

@ -1,5 +1,5 @@
/*
* LogRouter.actor.cpp
* LogRouter.cpp
*
* This source file is part of the FoundationDB open source project
*
@ -28,6 +28,7 @@
#include "flow/ActorCollection.h"
#include "flow/Arena.h"
#include "flow/CodeProbe.h"
#include "flow/Coroutines.h"
#include "flow/Histogram.h"
#include "flow/Trace.h"
#include "flow/network.h"
@ -55,25 +56,16 @@ struct LogRouterData {
}
// Erase messages not needed to update *from* versions >= before (thus, messages with to-version <= before)
ACTOR Future<Void> eraseMessagesBefore(TagData* self,
Version before,
LogRouterData* tlogData,
TaskPriority taskID) {
while (!self->version_messages.empty() && self->version_messages.front().first < before) {
Version version = self->version_messages.front().first;
Future<Void> eraseMessagesBefore(Version before, TaskPriority taskID) {
while (!version_messages.empty() && version_messages.front().first < before) {
Version version = version_messages.front().first;
while (!self->version_messages.empty() && self->version_messages.front().first == version) {
self->version_messages.pop_front();
while (!version_messages.empty() && version_messages.front().first == version) {
version_messages.pop_front();
}
wait(yield(taskID));
co_await yield(taskID);
}
return Void();
}
Future<Void> eraseMessagesBefore(Version before, LogRouterData* tlogData, TaskPriority taskID) {
return eraseMessagesBefore(this, before, tlogData, taskID);
}
};
@ -199,9 +191,56 @@ struct LogRouterData {
te.detail("RouterTag", this->routerTag.toString());
});
}
// Returns the queued (version, message) deque for `tag`.
// When no TagData exists for the tag, a function-local static empty deque
// is returned instead of allocating one.
// NOTE(review): the returned reference to `empty` is non-const, so a caller
// could in principle mutate the shared static — confirm all callers only read.
std::deque<std::pair<Version, LengthPrefixedStringRef>>& get_version_messages(Tag tag) {
    static std::deque<std::pair<Version, LengthPrefixedStringRef>> empty;
    auto tagData = getTagData(tag);
    return tagData ? tagData->version_messages : empty;
}
// Returns the popped version recorded for `tag`, or Version(0) when the
// tag has no TagData yet.
Version getTagPopVersion(Tag tag) {
    auto tagData = getTagData(tag);
    return tagData ? tagData->popped : Version(0);
}
// Copy pulled messages into memory blocks owned by each tag, i.e., tag_data.
void commitMessages(Version version, const std::vector<TagsAndMessage>& taggedMessages);
Future<Void> waitForVersion(Version ver);
Future<Void> waitForVersionAndLog(Version ver);
void peekMessagesFromMemory(Tag tag, Version begin, BinaryWriter& messages, Version& endVersion);
// Common logic to peek TLog and create a TLogPeekReply that serves both streaming peek and normal peek requests
template <typename PromiseType>
Future<Void> logRouterPeekMessages(PromiseType replyPromise,
Version reqBegin,
Tag reqTag,
bool reqReturnIfBlocked = false,
bool reqOnlySpilled = false,
Optional<std::pair<UID, int>> reqSequence = Optional<std::pair<UID, int>>());
// Keeps pushing TLogPeekStreamReply until it's removed from the cluster or should recover
Future<Void> logRouterPeekStream(TLogPeekStreamRequest req);
// Log router (LR) asynchronously pull data from satellite tLogs (preferred) or primary tLogs at tag
// (self->routerTag) for the version range from the LR's current version (exclusive) to its epoch's end version or
// recovery version.
Future<Void> pullAsyncData();
Future<Reference<ILogSystem::IPeekCursor>> getPeekCursorData(Reference<ILogSystem::IPeekCursor> r,
Version beginVersion);
// Future<Void> logRouterPop(const TLogPopRequest& req);
Future<Void> cleanupPeekTrackers();
};
void commitMessages(LogRouterData* self, Version version, const std::vector<TagsAndMessage>& taggedMessages) {
void LogRouterData::commitMessages(Version version, const std::vector<TagsAndMessage>& taggedMessages) {
if (!taggedMessages.size()) {
return;
}
@ -214,27 +253,27 @@ void commitMessages(LogRouterData* self, Version version, const std::vector<Tags
// Grab the last block in the blocks list so we can share its arena
// We pop all of the elements of it to create a "fresh" vector that starts at the end of the previous vector
Standalone<VectorRef<uint8_t>> block;
if (self->messageBlocks.empty()) {
if (messageBlocks.empty()) {
block = Standalone<VectorRef<uint8_t>>();
block.reserve(block.arena(), std::max<int64_t>(SERVER_KNOBS->TLOG_MESSAGE_BLOCK_BYTES, msgSize));
} else {
block = self->messageBlocks.back().second;
block = messageBlocks.back().second;
}
block.pop_front(block.size());
for (const auto& msg : taggedMessages) {
if (msg.message.size() > block.capacity() - block.size()) {
self->messageBlocks.emplace_back(version, block);
messageBlocks.emplace_back(version, block);
block = Standalone<VectorRef<uint8_t>>();
block.reserve(block.arena(), std::max<int64_t>(SERVER_KNOBS->TLOG_MESSAGE_BLOCK_BYTES, msgSize));
}
block.append(block.arena(), msg.message.begin(), msg.message.size());
for (const auto& tag : msg.tags) {
auto tagData = self->getTagData(tag);
auto tagData = getTagData(tag);
if (!tagData) {
tagData = self->createTagData(tag, 0, 0);
tagData = createTagData(tag, 0, 0);
}
if (version >= tagData->popped) {
@ -249,162 +288,164 @@ void commitMessages(LogRouterData* self, Version version, const std::vector<Tags
msgSize -= msg.message.size();
}
self->messageBlocks.emplace_back(version, block);
messageBlocks.emplace_back(version, block);
}
ACTOR Future<Void> waitForVersion(LogRouterData* self, Version ver) {
Future<Void> LogRouterData::waitForVersion(Version ver) {
// The only time the log router should allow a gap in versions larger than MAX_READ_TRANSACTION_LIFE_VERSIONS is
// when processing epoch end. Since one set of log routers is created per generation of transaction logs, the gap
// caused by epoch end will be within MAX_VERSIONS_IN_FLIGHT of the log routers start version.
state double startTime = now();
if (self->version.get() < self->startVersion) {
double startTime = now();
if (version.get() < startVersion) {
// Log router needs to wait for remote tLogs to process data, whose version is less than self->startVersion,
// before the log router can pull more data (i.e., data after self->startVersion) from satellite tLog;
// This prevents LR from getting OOM due to it pulls too much data from satellite tLog at once;
// Note: each commit writes data to both primary tLog and satellite tLog. Satellite tLog can be viewed as
// a part of primary tLogs.
if (ver > self->startVersion) {
self->version.set(self->startVersion);
if (ver > startVersion) {
version.set(startVersion);
// Wait for remote tLog to peek and pop from LR,
// so that LR's minPopped version can increase to self->startVersion
wait(self->minPopped.whenAtLeast(self->version.get()));
co_await minPopped.whenAtLeast(version.get());
}
self->waitForVersionTime += now() - startTime;
self->maxWaitForVersionTime = std::max(self->maxWaitForVersionTime, now() - startTime);
return Void();
waitForVersionTime += now() - startTime;
maxWaitForVersionTime = std::max(maxWaitForVersionTime, now() - startTime);
co_return;
}
if (!self->foundEpochEnd) {
if (!foundEpochEnd) {
// Similar to proxy that does not keep more than MAX_READ_TRANSACTION_LIFE_VERSIONS transactions outstanding;
// Log router does not keep more than MAX_READ_TRANSACTION_LIFE_VERSIONS transactions outstanding because
// remote SS cannot roll back to more than MAX_READ_TRANSACTION_LIFE_VERSIONS ago.
wait(self->minPopped.whenAtLeast(
std::min(self->version.get(), ver - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS)));
co_await minPopped.whenAtLeast(std::min(version.get(), ver - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS));
} else {
while (self->minPopped.get() + SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS < ver) {
if (self->minPopped.get() + SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS > self->version.get()) {
self->version.set(self->minPopped.get() + SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS);
wait(yield(TaskPriority::TLogCommit));
while (minPopped.get() + SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS < ver) {
if (minPopped.get() + SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS > version.get()) {
version.set(minPopped.get() + SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS);
co_await yield(TaskPriority::TLogCommit);
} else {
wait(self->minPopped.whenAtLeast((self->minPopped.get() + 1)));
co_await minPopped.whenAtLeast((minPopped.get() + 1));
}
}
}
if (ver >= self->startVersion + SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT) {
self->foundEpochEnd = true;
if (ver >= startVersion + SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT) {
foundEpochEnd = true;
}
self->waitForVersionTime += now() - startTime;
self->maxWaitForVersionTime = std::max(self->maxWaitForVersionTime, now() - startTime);
return Void();
waitForVersionTime += now() - startTime;
maxWaitForVersionTime = std::max(maxWaitForVersionTime, now() - startTime);
}
ACTOR Future<Void> waitForVersionAndLog(LogRouterData* self, Version ver) {
state Future<Void> f = waitForVersion(self, ver);
state double emitInterval = 60.0;
Future<Void> LogRouterData::waitForVersionAndLog(Version ver) {
Future<Void> f = waitForVersion(ver);
double emitInterval = 60.0;
loop {
choose {
when(wait(f)) {
return Void();
}
when(wait(delay(emitInterval))) {
TraceEvent("LogRouterWaitForVersionLongDelay", self->dbgid)
.detail("WaitForVersion", ver)
.detail("StartVersion", self->startVersion)
.detail("Version", self->version.get())
.detail("MinPopped", self->minPopped.get())
.detail("FoundEpochEnd", self->foundEpochEnd);
}
bool shouldExit = false;
co_await Choose()
.When(f, [&](const Void&) { shouldExit = true; })
.When(delay(emitInterval),
[&](const Void&) {
TraceEvent("LogRouterWaitForVersionLongDelay", dbgid)
.detail("WaitForVersion", ver)
.detail("StartVersion", startVersion)
.detail("Version", version.get())
.detail("MinPopped", minPopped.get())
.detail("FoundEpochEnd", foundEpochEnd);
})
.run();
if (shouldExit) {
break;
}
}
}
ACTOR Future<Reference<ILogSystem::IPeekCursor>> getPeekCursorData(LogRouterData* self,
Reference<ILogSystem::IPeekCursor> r,
Version startVersion) {
state Reference<ILogSystem::IPeekCursor> result = r;
state bool useSatellite = SERVER_KNOBS->LOG_ROUTER_PEEK_FROM_SATELLITES_PREFERRED;
state uint32_t noPrimaryPeekLocation = 0;
Future<Reference<ILogSystem::IPeekCursor>> LogRouterData::getPeekCursorData(Reference<ILogSystem::IPeekCursor> r,
Version beginVersion) {
Reference<ILogSystem::IPeekCursor> result = r;
bool useSatellite = SERVER_KNOBS->LOG_ROUTER_PEEK_FROM_SATELLITES_PREFERRED;
uint32_t noPrimaryPeekLocation = 0;
loop {
Future<Void> getMoreF = Never();
if (result) {
getMoreF = result->getMore(TaskPriority::TLogCommit);
++self->getMoreCount;
++getMoreCount;
if (!getMoreF.isReady()) {
++self->getMoreBlockedCount;
++getMoreBlockedCount;
}
}
state double startTime = now();
choose {
when(wait(getMoreF)) {
double peekTime = now() - startTime;
self->peekLatencyDist->sampleSeconds(peekTime);
self->getMoreTime += peekTime;
self->maxGetMoreTime = std::max(self->maxGetMoreTime, peekTime);
return result;
}
when(wait(self->logSystemChanged)) {
if (self->logSystem->get()) {
result =
self->logSystem->get()->peekLogRouter(self->dbgid, startVersion, self->routerTag, useSatellite);
self->primaryPeekLocation = result->getPrimaryPeekLocation();
TraceEvent("LogRouterPeekLocation", self->dbgid)
.detail("LogID", result->getPrimaryPeekLocation())
.trackLatest(self->eventCacheHolder->trackingKey);
} else {
result = Reference<ILogSystem::IPeekCursor>();
}
self->logSystemChanged = self->logSystem->onChange();
}
when(wait(result ? delay(SERVER_KNOBS->LOG_ROUTER_PEEK_SWITCH_DC_TIME) : Never())) {
// Peek has become stuck for a while, trying switching between primary DC and satellite
CODE_PROBE(true, "Detect log router slow peeks");
TraceEvent(SevWarnAlways, "LogRouterSlowPeek", self->dbgid).detail("NextTrySatellite", !useSatellite);
useSatellite = !useSatellite;
result =
self->logSystem->get()->peekLogRouter(self->dbgid, startVersion, self->routerTag, useSatellite);
self->primaryPeekLocation = result->getPrimaryPeekLocation();
TraceEvent("LogRouterPeekLocation", self->dbgid)
.detail("LogID", result->getPrimaryPeekLocation())
.trackLatest(self->eventCacheHolder->trackingKey);
// If no primary peek location after many tries, flag an error for manual intervention.
// The LR may become a bottleneck on the system and need to be excluded.
noPrimaryPeekLocation = self->primaryPeekLocation.present() ? 0 : ++noPrimaryPeekLocation;
if (!(noPrimaryPeekLocation % 4)) {
TraceEvent(SevWarnAlways, "NoPrimaryPeekLocationForLR", self->dbgid);
}
}
double startTime = now();
bool shouldExit = false;
co_await Choose()
.When(getMoreF,
[&](const Void&) {
double peekTime = now() - startTime;
peekLatencyDist->sampleSeconds(peekTime);
getMoreTime += peekTime;
maxGetMoreTime = std::max(maxGetMoreTime, peekTime);
shouldExit = true;
})
.When(logSystemChanged,
[&](const Void&) {
if (logSystem->get()) {
result = logSystem->get()->peekLogRouter(dbgid, beginVersion, routerTag, useSatellite);
primaryPeekLocation = result->getPrimaryPeekLocation();
TraceEvent("LogRouterPeekLocation", dbgid)
.detail("LogID", result->getPrimaryPeekLocation())
.trackLatest(eventCacheHolder->trackingKey);
} else {
result = Reference<ILogSystem::IPeekCursor>();
}
logSystemChanged = logSystem->onChange();
})
.When(result ? delay(SERVER_KNOBS->LOG_ROUTER_PEEK_SWITCH_DC_TIME) : Never(),
[&](const Void&) {
// Peek has become stuck for a while, trying switching between primary DC and satellite
CODE_PROBE(true, "Detect log router slow peeks");
TraceEvent(SevWarnAlways, "LogRouterSlowPeek", dbgid).detail("NextTrySatellite", !useSatellite);
useSatellite = !useSatellite;
result = logSystem->get()->peekLogRouter(dbgid, beginVersion, routerTag, useSatellite);
primaryPeekLocation = result->getPrimaryPeekLocation();
TraceEvent("LogRouterPeekLocation", dbgid)
.detail("LogID", result->getPrimaryPeekLocation())
.trackLatest(eventCacheHolder->trackingKey);
// If no primary peek location after many tries, flag an error for manual intervention.
// The LR may become a bottleneck on the system and need to be excluded.
noPrimaryPeekLocation = primaryPeekLocation.present() ? 0 : ++noPrimaryPeekLocation;
if (!(noPrimaryPeekLocation % 4)) {
TraceEvent(SevWarnAlways, "NoPrimaryPeekLocationForLR", dbgid);
}
})
.run();
if (shouldExit) {
co_return result;
}
}
}
// Log router (LR) asynchronously pull data from satellite tLogs (preferred) or primary tLogs at tag (self->routerTag)
// for the version range from the LR's current version (exclusive) to its epoch's end version or recovery version.
ACTOR Future<Void> pullAsyncData(LogRouterData* self) {
state Reference<ILogSystem::IPeekCursor> r;
state Version tagAt = self->version.get() + 1;
state Version lastVer = 0;
state std::vector<int> tags; // an optimization to avoid reallocating vector memory in every loop
Future<Void> LogRouterData::pullAsyncData() {
Reference<ILogSystem::IPeekCursor> r;
Version tagAt = version.get() + 1;
Version lastVer = 0;
std::vector<int> tags; // an optimization to avoid reallocating vector memory in every loop
loop {
Reference<ILogSystem::IPeekCursor> _r = wait(getPeekCursorData(self, r, tagAt));
r = _r;
r = co_await getPeekCursorData(r, tagAt);
self->minKnownCommittedVersion = std::max(self->minKnownCommittedVersion, r->getMinKnownCommittedVersion());
minKnownCommittedVersion = std::max(minKnownCommittedVersion, r->getMinKnownCommittedVersion());
state Version ver = 0;
state std::vector<TagsAndMessage> messages;
state Arena arena;
Version ver = 0;
std::vector<TagsAndMessage> messages;
Arena arena;
while (true) {
state bool foundMessage = r->hasMessage();
bool foundMessage = r->hasMessage();
if (!foundMessage || r->version().version != ver) {
ASSERT(r->version().version > lastVer);
if (ver) {
wait(waitForVersionAndLog(self, ver));
co_await waitForVersionAndLog(ver);
commitMessages(self, ver, messages);
self->version.set(ver);
wait(yield(TaskPriority::TLogCommit));
commitMessages(ver, messages);
version.set(ver);
co_await yield(TaskPriority::TLogCommit);
//TraceEvent("LogRouterVersion").detail("Ver",ver);
}
lastVer = ver;
@ -414,11 +455,11 @@ ACTOR Future<Void> pullAsyncData(LogRouterData* self) {
if (!foundMessage) {
ver--; // ver is the next possible version we will get data for
if (ver > self->version.get() && ver >= r->popped()) {
wait(waitForVersionAndLog(self, ver));
if (ver > version.get() && ver >= r->popped()) {
co_await waitForVersionAndLog(ver);
self->version.set(ver);
wait(yield(TaskPriority::TLogCommit));
version.set(ver);
co_await yield(TaskPriority::TLogCommit);
}
break;
}
@ -427,7 +468,7 @@ ACTOR Future<Void> pullAsyncData(LogRouterData* self) {
TagsAndMessage tagAndMsg;
tagAndMsg.message = r->getMessageWithTags();
tags.clear();
self->logSet.getPushLocations(r->getTags(), tags, 0);
logSet.getPushLocations(r->getTags(), tags, 0);
tagAndMsg.tags.reserve(arena, tags.size());
for (const auto& t : tags) {
tagAndMsg.tags.push_back(arena, Tag(tagLocalityRemoteLog, t));
@ -437,24 +478,15 @@ ACTOR Future<Void> pullAsyncData(LogRouterData* self) {
r->nextMessage();
}
tagAt = std::max(r->version().version, self->version.get() + 1);
tagAt = std::max(r->version().version, version.get() + 1);
}
}
std::deque<std::pair<Version, LengthPrefixedStringRef>>& get_version_messages(LogRouterData* self, Tag tag) {
auto tagData = self->getTagData(tag);
if (!tagData) {
static std::deque<std::pair<Version, LengthPrefixedStringRef>> empty;
return empty;
}
return tagData->version_messages;
};
void peekMessagesFromMemory(LogRouterData* self, Tag tag, Version begin, BinaryWriter& messages, Version& endVersion) {
void LogRouterData::peekMessagesFromMemory(Tag tag, Version begin, BinaryWriter& messages, Version& endVersion) {
ASSERT(!messages.getLength());
auto& deque = get_version_messages(self, tag);
//TraceEvent("TLogPeekMem", self->dbgid).detail("Tag", req.tag1).detail("PDS", self->persistentDataSequence).detail("PDDS", self->persistentDataDurableSequence).detail("Oldest", map1.empty() ? 0 : map1.begin()->key ).detail("OldestMsgCount", map1.empty() ? 0 : map1.begin()->value.size());
auto& deque = get_version_messages(tag);
//TraceEvent("TLogPeekMem", dbgid).detail("Tag", req.tag1).detail("PDS", persistentDataSequence).detail("PDDS", persistentDataDurableSequence).detail("Oldest", map1.empty() ? 0 : map1.begin()->key ).detail("OldestMsgCount", map1.empty() ? 0 : map1.begin()->value.size());
auto it = std::lower_bound(deque.begin(),
deque.end(),
@ -478,43 +510,33 @@ void peekMessagesFromMemory(LogRouterData* self, Tag tag, Version begin, BinaryW
}
}
Version poppedVersion(LogRouterData* self, Tag tag) {
auto tagData = self->getTagData(tag);
if (!tagData)
return Version(0);
return tagData->popped;
}
template <typename PromiseType>
Future<Void> LogRouterData::logRouterPeekMessages(PromiseType replyPromise,
Version reqBegin,
Tag reqTag,
bool reqReturnIfBlocked,
bool reqOnlySpilled,
Optional<std::pair<UID, int>> reqSequence) {
BinaryWriter messages(Unversioned());
int sequence = -1;
UID peekId;
// Common logics to peek TLog and create TLogPeekReply that serves both streaming peek or normal peek request
ACTOR template <typename PromiseType>
Future<Void> logRouterPeekMessages(PromiseType replyPromise,
LogRouterData* self,
Version reqBegin,
Tag reqTag,
bool reqReturnIfBlocked = false,
bool reqOnlySpilled = false,
Optional<std::pair<UID, int>> reqSequence = Optional<std::pair<UID, int>>()) {
state BinaryWriter messages(Unversioned());
state int sequence = -1;
state UID peekId;
DebugLogTraceEvent("LogRouterPeek0", self->dbgid)
DebugLogTraceEvent("LogRouterPeek0", dbgid)
.detail("ReturnIfBlocked", reqReturnIfBlocked)
.detail("Tag", reqTag.toString())
.detail("Seq", reqSequence.present() ? reqSequence.get().second : -1)
.detail("SeqCursor", reqSequence.present() ? reqSequence.get().first : UID())
.detail("Ver", self->version.get())
.detail("Ver", version.get())
.detail("Begin", reqBegin);
if (reqSequence.present()) {
try {
peekId = reqSequence.get().first;
sequence = reqSequence.get().second;
if (sequence >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS &&
self->peekTracker.find(peekId) == self->peekTracker.end()) {
if (sequence >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && peekTracker.find(peekId) == peekTracker.end()) {
throw operation_obsolete();
}
auto& trackerData = self->peekTracker[peekId];
auto& trackerData = peekTracker[peekId];
if (sequence == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) {
trackerData.sequence_version[0].send(std::make_pair(reqBegin, reqOnlySpilled));
}
@ -534,12 +556,12 @@ Future<Void> logRouterPeekMessages(PromiseType replyPromise,
}
trackerData.lastUpdate = now();
std::pair<Version, bool> prevPeekData = wait(trackerData.sequence_version[sequence].getFuture());
std::pair<Version, bool> prevPeekData = co_await trackerData.sequence_version[sequence].getFuture();
reqBegin = prevPeekData.first;
reqOnlySpilled = prevPeekData.second;
wait(yield());
co_await yield();
} catch (Error& e) {
DebugLogTraceEvent("LogRouterPeekError", self->dbgid)
DebugLogTraceEvent("LogRouterPeekError", dbgid)
.error(e)
.detail("Tag", reqTag.toString())
.detail("Seq", reqSequence.present() ? reqSequence.get().second : -1)
@ -548,49 +570,49 @@ Future<Void> logRouterPeekMessages(PromiseType replyPromise,
if (e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) {
replyPromise.sendError(e);
return Void();
co_return;
} else {
throw;
}
}
}
if (reqReturnIfBlocked && self->version.get() < reqBegin) {
if (reqReturnIfBlocked && version.get() < reqBegin) {
replyPromise.sendError(end_of_stream());
if (reqSequence.present()) {
auto& trackerData = self->peekTracker[peekId];
auto& trackerData = peekTracker[peekId];
auto& sequenceData = trackerData.sequence_version[sequence + 1];
if (!sequenceData.isSet()) {
sequenceData.send(std::make_pair(reqBegin, reqOnlySpilled));
}
}
return Void();
co_return;
}
if (self->version.get() < reqBegin) {
wait(self->version.whenAtLeast(reqBegin));
wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask()));
if (version.get() < reqBegin) {
co_await version.whenAtLeast(reqBegin);
co_await delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask());
}
state double startTime = now();
state Version poppedVer;
state Version endVersion;
double startTime = now();
Version poppedVer;
Version endVersion;
// Run the peek logic in a loop to account for the case where there is no data to return to the caller, and we may
// want to wait a little bit instead of just sending back an empty message. This feature is controlled by a knob.
loop {
poppedVer = poppedVersion(self, reqTag);
poppedVer = getTagPopVersion(reqTag);
if (poppedVer > reqBegin || reqBegin < self->startVersion) {
if (poppedVer > reqBegin || reqBegin < startVersion) {
// This should only happen if a packet is sent multiple times and the reply is not needed.
// Since we are using popped differently, do not send a reply.
TraceEvent(SevWarnAlways, "LogRouterPeekPopped", self->dbgid)
TraceEvent(SevWarnAlways, "LogRouterPeekPopped", dbgid)
.detail("Begin", reqBegin)
.detail("Popped", poppedVer)
.detail("Tag", reqTag.toString())
.detail("Seq", reqSequence.present() ? reqSequence.get().second : -1)
.detail("SeqCursor", reqSequence.present() ? reqSequence.get().first : UID())
.detail("Start", self->startVersion);
.detail("Start", startVersion);
if (std::is_same<PromiseType, Promise<TLogPeekReply>>::value) {
// kills logRouterPeekStream actor, otherwise that actor becomes stuck
throw operation_obsolete();
@ -603,13 +625,13 @@ Future<Void> logRouterPeekMessages(PromiseType replyPromise,
replyPromise.send(Never());
}
return Void();
co_return;
}
ASSERT(reqBegin >= poppedVersion(self, reqTag) && reqBegin >= self->startVersion);
ASSERT(reqBegin >= getTagPopVersion(reqTag) && reqBegin >= startVersion);
endVersion = self->version.get() + 1;
peekMessagesFromMemory(self, reqTag, reqBegin, messages, endVersion);
endVersion = version.get() + 1;
peekMessagesFromMemory(reqTag, reqBegin, messages, endVersion);
// Reply the peek request when
// - Have data return to the caller, or
@ -620,43 +642,44 @@ Future<Void> logRouterPeekMessages(PromiseType replyPromise,
break;
}
state Version waitUntilVersion = self->version.get() + 1;
Version waitUntilVersion = version.get() + 1;
// Currently, from `reqBegin` to self->version are all empty peeks. Wait for more version, or the empty batching
// interval has expired.
wait(self->version.whenAtLeast(waitUntilVersion) ||
delay(SERVER_KNOBS->PEEK_BATCHING_EMPTY_MSG_INTERVAL - (now() - startTime)));
if (self->version.get() < waitUntilVersion) {
auto ready = version.whenAtLeast(waitUntilVersion) ||
delay(SERVER_KNOBS->PEEK_BATCHING_EMPTY_MSG_INTERVAL - (now() - startTime));
co_await ready;
if (version.get() < waitUntilVersion) {
break; // We know that from `reqBegin` to self->version are all empty messages. Skip re-executing the peek
// logic.
}
}
TLogPeekReply reply;
reply.maxKnownVersion = self->version.get();
reply.minKnownCommittedVersion = self->poppedVersion;
reply.maxKnownVersion = version.get();
reply.minKnownCommittedVersion = poppedVersion;
auto messagesValue = messages.toValue();
reply.arena.dependsOn(messagesValue.arena());
reply.messages = messagesValue;
reply.popped = self->minPopped.get() >= self->startVersion ? self->minPopped.get() : 0;
reply.popped = minPopped.get() >= startVersion ? minPopped.get() : 0;
reply.end = endVersion;
reply.onlySpilled = false;
if (reqSequence.present()) {
auto& trackerData = self->peekTracker[peekId];
auto& trackerData = peekTracker[peekId];
trackerData.lastUpdate = now();
auto& sequenceData = trackerData.sequence_version[sequence + 1];
if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) {
replyPromise.sendError(operation_obsolete());
if (!sequenceData.isSet())
sequenceData.sendError(operation_obsolete());
return Void();
co_return;
}
if (sequenceData.isSet()) {
if (sequenceData.getFuture().get().first != reply.end) {
CODE_PROBE(true, "tlog peek second attempt ended at a different version");
replyPromise.sendError(operation_obsolete());
return Void();
co_return;
}
} else {
sequenceData.send(std::make_pair(reply.end, reply.onlySpilled));
@ -665,42 +688,41 @@ Future<Void> logRouterPeekMessages(PromiseType replyPromise,
}
replyPromise.send(reply);
DebugLogTraceEvent("LogRouterPeek4", self->dbgid)
DebugLogTraceEvent("LogRouterPeek4", dbgid)
.detail("Tag", reqTag.toString())
.detail("ReqBegin", reqBegin)
.detail("End", reply.end)
.detail("MessageSize", reply.messages.size())
.detail("PoppedVersion", self->poppedVersion);
return Void();
.detail("PoppedVersion", poppedVersion);
}
// This actor keep pushing TLogPeekStreamReply until it's removed from the cluster or should recover
ACTOR Future<Void> logRouterPeekStream(LogRouterData* self, TLogPeekStreamRequest req) {
self->activePeekStreams++;
Future<Void> LogRouterData::logRouterPeekStream(TLogPeekStreamRequest req) {
activePeekStreams++;
state Version begin = req.begin;
state bool onlySpilled = false;
Version begin = req.begin;
bool onlySpilled = false;
req.reply.setByteLimit(std::min(SERVER_KNOBS->MAXIMUM_PEEK_BYTES, req.limitBytes));
loop {
state TLogPeekStreamReply reply;
state Promise<TLogPeekReply> promise;
state Future<TLogPeekReply> future(promise.getFuture());
TLogPeekStreamReply reply;
Promise<TLogPeekReply> promise;
Future<TLogPeekReply> future(promise.getFuture());
try {
wait(req.reply.onReady() && store(reply.rep, future) &&
logRouterPeekMessages(promise, self, begin, req.tag, req.returnIfBlocked, onlySpilled));
auto ready = req.reply.onReady() && store(reply.rep, future) &&
logRouterPeekMessages(promise, begin, req.tag, req.returnIfBlocked, onlySpilled);
co_await ready;
reply.rep.begin = begin;
req.reply.send(reply);
begin = reply.rep.end;
onlySpilled = reply.rep.onlySpilled;
if (reply.rep.end > self->version.get()) {
wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask()));
if (reply.rep.end > version.get()) {
co_await delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask());
} else {
wait(delay(0, g_network->getCurrentTask()));
co_await delay(0, g_network->getCurrentTask());
}
} catch (Error& e) {
self->activePeekStreams--;
TraceEvent(SevDebug, "LogRouterPeekStreamEnd", self->dbgid)
activePeekStreams--;
TraceEvent(SevDebug, "LogRouterPeekStreamEnd", dbgid)
.errorUnsuppressed(e)
.detail("Tag", req.tag)
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress())
@ -708,7 +730,7 @@ ACTOR Future<Void> logRouterPeekStream(LogRouterData* self, TLogPeekStreamReques
if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) {
req.reply.sendError(e);
return Void();
co_return;
} else {
throw;
}
@ -716,11 +738,11 @@ ACTOR Future<Void> logRouterPeekStream(LogRouterData* self, TLogPeekStreamReques
}
}
ACTOR Future<Void> cleanupPeekTrackers(LogRouterData* self) {
Future<Void> LogRouterData::cleanupPeekTrackers() {
loop {
double minTimeUntilExpiration = SERVER_KNOBS->PEEK_TRACKER_EXPIRATION_TIME;
auto it = self->peekTracker.begin();
while (it != self->peekTracker.end()) {
auto it = peekTracker.begin();
while (it != peekTracker.end()) {
double timeUntilExpiration = it->second.lastUpdate + SERVER_KNOBS->PEEK_TRACKER_EXPIRATION_TIME - now();
if (timeUntilExpiration < 1.0e-6) {
for (auto seq : it->second.sequence_version) {
@ -728,18 +750,18 @@ ACTOR Future<Void> cleanupPeekTrackers(LogRouterData* self) {
seq.second.sendError(timed_out());
}
}
it = self->peekTracker.erase(it);
it = peekTracker.erase(it);
} else {
minTimeUntilExpiration = std::min(minTimeUntilExpiration, timeUntilExpiration);
++it;
}
}
wait(delay(minTimeUntilExpiration));
co_await delay(minTimeUntilExpiration);
}
}
ACTOR Future<Void> logRouterPop(LogRouterData* self, TLogPopRequest req) {
Future<Void> logRouterPop(LogRouterData* self, TLogPopRequest req) {
auto tagData = self->getTagData(req.tag);
if (!tagData) {
tagData = self->createTagData(req.tag, req.to, req.durableKnownCommittedVersion);
@ -747,11 +769,11 @@ ACTOR Future<Void> logRouterPop(LogRouterData* self, TLogPopRequest req) {
DebugLogTraceEvent("LogRouterPop", self->dbgid).detail("Tag", req.tag.toString()).detail("PopVersion", req.to);
tagData->popped = req.to;
tagData->durableKnownCommittedVersion = req.durableKnownCommittedVersion;
wait(tagData->eraseMessagesBefore(req.to, self, TaskPriority::TLogPop));
co_await tagData->eraseMessagesBefore(req.to, TaskPriority::TLogPop);
}
state Version minPopped = std::numeric_limits<Version>::max();
state Version minKnownCommittedVersion = std::numeric_limits<Version>::max();
Version minPopped = std::numeric_limits<Version>::max();
Version minKnownCommittedVersion = std::numeric_limits<Version>::max();
for (auto it : self->tag_data) {
if (it) {
minPopped = std::min(it->popped, minPopped);
@ -761,7 +783,7 @@ ACTOR Future<Void> logRouterPop(LogRouterData* self, TLogPopRequest req) {
while (!self->messageBlocks.empty() && self->messageBlocks.front().first < minPopped) {
self->messageBlocks.pop_front();
wait(yield(TaskPriority::TLogPop));
co_await yield(TaskPriority::TLogPop);
}
self->poppedVersion = std::min(minKnownCommittedVersion, self->minKnownCommittedVersion);
@ -771,49 +793,54 @@ ACTOR Future<Void> logRouterPop(LogRouterData* self, TLogPopRequest req) {
}
req.reply.send(Void());
self->minPopped.set(std::max(minPopped, self->minPopped.get()));
return Void();
}
ACTOR Future<Void> logRouterCore(TLogInterface interf,
InitializeLogRouterRequest req,
Reference<AsyncVar<ServerDBInfo> const> db) {
state LogRouterData logRouterData(interf.id(), req);
state PromiseStream<Future<Void>> addActor;
state Future<Void> error = actorCollection(addActor.getFuture());
state Future<Void> dbInfoChange = Void();
Future<Void> logRouterCore(TLogInterface interf,
InitializeLogRouterRequest req,
Reference<AsyncVar<ServerDBInfo> const> db) {
LogRouterData logRouterData(interf.id(), req);
PromiseStream<Future<Void>> addActor;
Future<Void> error = actorCollection(addActor.getFuture());
Future<Void> dbInfoChange = Void();
addActor.send(pullAsyncData(&logRouterData));
addActor.send(cleanupPeekTrackers(&logRouterData));
addActor.send(logRouterData.pullAsyncData());
addActor.send(logRouterData.cleanupPeekTrackers());
addActor.send(traceRole(Role::LOG_ROUTER, interf.id()));
loop choose {
when(wait(dbInfoChange)) {
dbInfoChange = db->onChange();
logRouterData.allowPops = db->get().recoveryState == RecoveryState::FULLY_RECOVERED &&
db->get().recoveryCount >= req.recoveryCount;
logRouterData.logSystem->set(ILogSystem::fromServerDBInfo(logRouterData.dbgid, db->get(), true));
}
when(TLogPeekRequest req = waitNext(interf.peekMessages.getFuture())) {
addActor.send(logRouterPeekMessages(
req.reply, &logRouterData, req.begin, req.tag, req.returnIfBlocked, req.onlySpilled, req.sequence));
}
when(TLogPeekStreamRequest req = waitNext(interf.peekStreamMessages.getFuture())) {
TraceEvent(SevDebug, "LogRouterPeekStream", logRouterData.dbgid)
.detail("Tag", req.tag)
.detail("Token", interf.peekStreamMessages.getEndpoint().token);
addActor.send(logRouterPeekStream(&logRouterData, req));
}
when(TLogPopRequest req = waitNext(interf.popMessages.getFuture())) {
// Request from remote tLog to pop data from LR
addActor.send(logRouterPop(&logRouterData, req));
}
when(wait(error)) {}
loop {
co_await Choose()
.When(dbInfoChange,
[&](const Void&) {
dbInfoChange = db->onChange();
logRouterData.allowPops = db->get().recoveryState == RecoveryState::FULLY_RECOVERED &&
db->get().recoveryCount >= req.recoveryCount;
logRouterData.logSystem->set(ILogSystem::fromServerDBInfo(logRouterData.dbgid, db->get(), true));
})
.When(interf.peekMessages.getFuture(),
[&](const TLogPeekRequest& req) {
addActor.send(logRouterData.logRouterPeekMessages(
req.reply, req.begin, req.tag, req.returnIfBlocked, req.onlySpilled, req.sequence));
})
.When(interf.peekStreamMessages.getFuture(),
[&](const TLogPeekStreamRequest& req) {
TraceEvent(SevDebug, "LogRouterPeekStream", logRouterData.dbgid)
.detail("Tag", req.tag)
.detail("Token", interf.peekStreamMessages.getEndpoint().token);
addActor.send(logRouterData.logRouterPeekStream(req));
})
.When(interf.popMessages.getFuture(),
[&](const TLogPopRequest& req) {
// Request from remote tLog to pop data from LR
addActor.send(logRouterPop(&logRouterData, req));
})
.When(error, [](const Void&) {})
.run();
}
}
ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo> const> db,
uint64_t recoveryCount,
TLogInterface myInterface) {
Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo> const> db,
uint64_t recoveryCount,
TLogInterface myInterface) {
loop {
bool isDisplaced =
((db->get().recoveryCount > recoveryCount && db->get().recoveryState != RecoveryState::UNINITIALIZED) ||
@ -822,30 +849,34 @@ ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo> const> db,
if (isDisplaced) {
throw worker_removed();
}
wait(db->onChange());
co_await db->onChange();
}
}
ACTOR Future<Void> logRouter(TLogInterface interf,
InitializeLogRouterRequest req,
Reference<AsyncVar<ServerDBInfo> const> db) {
Future<Void> logRouter(TLogInterface interf,
InitializeLogRouterRequest req,
Reference<AsyncVar<ServerDBInfo> const> db) {
try {
TraceEvent("LogRouterStart", interf.id())
.detail("Start", req.startVersion)
.detail("Tag", req.routerTag.toString())
.detail("Localities", req.tLogLocalities.size())
.detail("Locality", req.locality);
state Future<Void> core = logRouterCore(interf, req, db);
loop choose {
when(wait(core)) {
return Void();
Future<Void> core = logRouterCore(interf, req, db);
loop {
bool shouldExit = false;
co_await Choose()
.When(core, [&](const Void&) { shouldExit = true; })
.When(checkRemoved(db, req.recoveryCount, interf), [](const Void&) { /* do nothing */ })
.run();
if (shouldExit) {
co_return;
}
when(wait(checkRemoved(db, req.recoveryCount, interf))) {}
}
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled || e.code() == error_code_worker_removed) {
TraceEvent("LogRouterTerminated", interf.id()).errorUnsuppressed(e);
return Void();
co_return;
}
throw;
}

View File

@ -1274,9 +1274,9 @@ ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
ACTOR Future<Void> resolver(ResolverInterface resolver,
InitializeResolverRequest initReq,
Reference<AsyncVar<ServerDBInfo> const> db);
ACTOR Future<Void> logRouter(TLogInterface interf,
InitializeLogRouterRequest req,
Reference<AsyncVar<ServerDBInfo> const> db);
Future<Void> logRouter(TLogInterface interf,
InitializeLogRouterRequest req,
Reference<AsyncVar<ServerDBInfo> const> db);
Future<Void> dataDistributor(DataDistributorInterface ddi, Reference<AsyncVar<ServerDBInfo> const> db);
ACTOR Future<Void> ratekeeper(RatekeeperInterface rki, Reference<AsyncVar<ServerDBInfo> const> db);
ACTOR Future<Void> consistencyScan(ConsistencyScanInterface csInterf, Reference<AsyncVar<ServerDBInfo> const> dbInfo);

View File

@ -21,6 +21,8 @@
#include "fdbrpc/DDSketch.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "flow/CodeProbe.h"
#include "flow/Coroutines.h"
#include "flow/DeterministicRandom.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
@ -47,7 +49,22 @@ struct WatchesWorkload : TestWorkload {
tempRand.randomShuffle(nodeOrder);
}
Future<Void> setup(Database const& cx) override { return _setup(cx, this); }
Future<Void> setup(Database const& cx) override {
// return _setup(cx, this);
std::vector<Future<Void>> setupActors;
for (int i = 0; i < nodes; i++)
if (i % clientCount == clientId)
setupActors.push_back(
watcherInit(cx, keyForIndex(nodeOrder[i]), keyForIndex(nodeOrder[i + 1]), extraPerNode));
co_await waitForAll(setupActors);
for (int i = 0; i < nodes; i++)
if (i % clientCount == clientId)
clients.push_back(watcher(cx, keyForIndex(nodeOrder[i]), keyForIndex(nodeOrder[i + 1]), extraPerNode));
co_return;
}
Future<Void> start(Database const& cx) override {
if (clientId == 0)
@ -82,47 +99,22 @@ struct WatchesWorkload : TestWorkload {
return result;
}
ACTOR Future<Void> _setup(Database cx, WatchesWorkload* self) {
std::vector<Future<Void>> setupActors;
for (int i = 0; i < self->nodes; i++)
if (i % self->clientCount == self->clientId)
setupActors.push_back(self->watcherInit(cx,
self->keyForIndex(self->nodeOrder[i]),
self->keyForIndex(self->nodeOrder[i + 1]),
self->extraPerNode));
wait(waitForAll(setupActors));
for (int i = 0; i < self->nodes; i++)
if (i % self->clientCount == self->clientId)
self->clients.push_back(self->watcher(cx,
self->keyForIndex(self->nodeOrder[i]),
self->keyForIndex(self->nodeOrder[i + 1]),
self->extraPerNode));
return Void();
}
ACTOR static Future<Void> watcherInit(Database cx, Key watchKey, Key setKey, int extraNodes) {
state Transaction tr(cx);
state int extraLoc = 0;
Future<Void> watcherInit(Database cx, Key watchKey, Key setKey, int extraNodes) {
int extraLoc = 0;
while (extraLoc < extraNodes) {
try {
co_await cx.run([&](Transaction* tr) -> Future<Void> {
for (int i = 0; i < 1000 && extraLoc + i < extraNodes; i++) {
Key extraKey = KeyRef(watchKey.toString() + format("%d", extraLoc + i));
Value extraValue = ValueRef(std::string(100, '.'));
tr.set(extraKey, extraValue);
//TraceEvent("WatcherInitialSetupExtra").detail("Key", printable(extraKey)).detail("Value", printable(extraValue));
tr->set(extraKey, extraValue);
// TraceEvent("WatcherInitialSetupExtra").detail("Key", extraKey).detail("Value", extraValue);
}
wait(tr.commit());
co_await tr->commit();
extraLoc += 1000;
//TraceEvent("WatcherInitialSetup").detail("Watch", printable(watchKey)).detail("Ver", tr.getCommittedVersion());
} catch (Error& e) {
//TraceEvent("WatcherInitialSetupError").error(e).detail("ExtraLoc", extraLoc);
wait(tr.onError(e));
}
CODE_PROBE(true, "Watches workload initial setup");
// TraceEvent("WatcherInitialSetup").detail("Watch", watchKey).detail("Ver", tr->getCommittedVersion());
});
}
return Void();
}
ACTOR static Future<Void> watcher(Database cx, Key watchKey, Key setKey, int extraNodes) {
@ -177,87 +169,76 @@ struct WatchesWorkload : TestWorkload {
}
}
ACTOR static Future<Void> watchesWorker(Database cx, WatchesWorkload* self) {
state Key startKey = self->keyForIndex(self->nodeOrder[0]);
state Key endKey = self->keyForIndex(self->nodeOrder[self->nodes]);
state Optional<Value> expectedValue;
state Optional<Value> startValue;
state double startTime = now();
state double chainStartTime;
Future<Void> watchesWorker(Database cx, WatchesWorkload* self) {
Key startKey = self->keyForIndex(self->nodeOrder[0]);
Key endKey = self->keyForIndex(self->nodeOrder[self->nodes]);
Optional<Value> expectedValue;
Optional<Value> startValue;
double startTime = now();
double chainStartTime;
loop {
state Transaction tr(cx);
state bool isValue = deterministicRandom()->random01() > 0.5;
state Value assignedValue = Value(deterministicRandom()->randomUniqueID().toString());
state bool firstAttempt = true;
loop {
try {
wait(success(tr.getReadVersion()));
Optional<Value> _startValue = wait(tr.get(startKey));
if (firstAttempt) {
startValue = _startValue;
firstAttempt = false;
}
expectedValue = Optional<Value>();
if (startValue.present()) {
if (isValue)
expectedValue = assignedValue;
} else
expectedValue = assignedValue;
if (expectedValue.present())
tr.set(startKey, expectedValue.get());
else
tr.clear(startKey);
wait(tr.commit());
break;
} catch (Error& e) {
wait(tr.onError(e));
bool isValue = deterministicRandom()->random01() > 0.5;
Value assignedValue = Value(deterministicRandom()->randomUniqueID().toString());
bool firstAttempt = true;
co_await cx.run([&](Transaction* tr) -> Future<Void> {
co_await tr->getReadVersion();
Optional<Value> _startValue = co_await tr->get(startKey);
if (firstAttempt) {
startValue = _startValue;
firstAttempt = false;
}
}
expectedValue = Optional<Value>();
if (startValue.present()) {
if (isValue)
expectedValue = assignedValue;
} else
expectedValue = assignedValue;
if (expectedValue.present())
tr->set(startKey, expectedValue.get());
else
tr->clear(startKey);
co_await tr->commit();
CODE_PROBE(expectedValue.present(), "watches workload set a key");
CODE_PROBE(!expectedValue.present(), "watches workload clear a key");
co_return;
});
chainStartTime = now();
firstAttempt = true;
loop {
state Transaction tr2(cx);
state bool finished = false;
loop {
try {
state Optional<Value> endValue = wait(tr2.get(endKey));
if (endValue == expectedValue) {
finished = true;
break;
}
if (!firstAttempt || endValue != startValue) {
TraceEvent(SevError, "WatcherError")
.detail("FirstAttempt", firstAttempt)
.detail("StartValue", printable(startValue))
.detail("EndValue", printable(endValue))
.detail("ExpectedValue", printable(expectedValue))
.detail("EndVersion", tr2.getReadVersion().get());
}
state Future<Void> watchFuture = tr2.watch(makeReference<Watch>(endKey, startValue));
wait(tr2.commit());
wait(watchFuture);
firstAttempt = false;
break;
} catch (Error& e) {
wait(tr2.onError(e));
bool finished = false;
while (!finished) {
co_await cx.run([&](Transaction* tr2) -> Future<Void> {
Optional<Value> endValue = co_await tr2->get(endKey);
if (endValue == expectedValue) {
finished = true;
co_return;
}
}
if (finished)
break;
if (!firstAttempt || endValue != startValue) {
TraceEvent(SevError, "WatcherError")
.detail("FirstAttempt", firstAttempt)
.detail("StartValue", printable(startValue))
.detail("EndValue", printable(endValue))
.detail("ExpectedValue", printable(expectedValue))
.detail("EndVersion", tr2->getReadVersion().get());
}
Future<Void> watchFuture = tr2->watch(makeReference<Watch>(endKey, startValue));
co_await tr2->commit();
co_await watchFuture;
CODE_PROBE(true, "watcher workload watch fired");
firstAttempt = false;
});
}
self->cycleLatencies.addSample(now() - chainStartTime);
++self->cycles;
if (g_network->isSimulated())
wait(delay(deterministicRandom()->random01() < 0.5 ? 0 : deterministicRandom()->random01() * 60));
co_await delay(deterministicRandom()->random01() < 0.5 ? 0 : deterministicRandom()->random01() * 60);
if (now() - startTime > self->testDuration)
break;
}
return Void();
}
};