disable streaming peek for localities < 0

This commit is contained in:
Xiaoxi Wang 2021-07-28 14:11:25 -07:00
parent c6b0de1264
commit 12d4f5c261
9 changed files with 53 additions and 25 deletions

View File

@ -41,8 +41,8 @@ typedef UID SpanID;
enum {
tagLocalitySpecial = -1, // tag with this locality means it is invalidTag (id=0), txsTag (id=1), or cacheTag (id=2)
tagLocalityLogRouter = -2,
tagLocalityRemoteLog = -3, // tag created by log router for remote tLogs
tagLocalityUpgraded = -4,
tagLocalityRemoteLog = -3, // tag created by log router for remote (aka. not in Primary DC) tLogs
tagLocalityUpgraded = -4, // tlogs with old log format
tagLocalitySatellite = -5,
tagLocalityLogRouterMapped = -6, // The pseudo tag used by log routers to pop the real LogRouter tag (i.e., -2)
tagLocalityTxs = -7,

View File

@ -446,6 +446,7 @@ ACTOR Future<TLogPeekReply> peekLogRouter(LogRouterData* self,
Tag tag,
bool returnIfBlocked = false,
bool reqOnlySpilled = false,
bool streamReply = false,
Optional<std::pair<UID, int>> sequence = Optional<std::pair<UID, int>>()) {
state BinaryWriter messages(Unversioned());
state int sequenceNum = -1;
@ -518,7 +519,12 @@ ACTOR Future<TLogPeekReply> peekLogRouter(LogRouterData* self,
sequenceData.send(std::make_pair(begin, reqOnlySpilled));
}
}
throw no_action_needed(); // we've already replied in the past
if (streamReply) {
// for streaming reply, we skip the popped part
begin = std::min(poppedVer, self->startVersion);
} else {
throw no_action_needed(); // we've already replied in the past
}
}
Version endVersion = self->version.get() + 1;
@ -568,18 +574,20 @@ ACTOR Future<Void> logRouterPeekStream(LogRouterData* self, TLogPeekStreamReques
state TLogPeekStreamReply reply;
try {
wait(req.reply.onReady() &&
store(reply.rep, peekLogRouter(self, req.begin, req.tag, req.returnIfBlocked, onlySpilled)));
store(reply.rep, peekLogRouter(self, req.begin, req.tag, req.returnIfBlocked, onlySpilled, true)));
req.reply.send(reply);
begin = reply.rep.end;
onlySpilled = reply.rep.onlySpilled;
wait(delay(0, g_network->getCurrentTask()));
if (reply.rep.end > self->version.get()) {
wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask()));
} else {
wait(delay(0, g_network->getCurrentTask()));
}
} catch (Error& e) {
self->activePeekStreams--;
TraceEvent(SevDebug, "LogRouterPeekStreamEnd", self->dbgid).error(e, true);
if (e.code() == error_code_no_action_needed) {
req.reply.sendError(end_of_stream());
} else if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) {
TraceEvent(SevDebug, "LogRouterPeekStreamEnd", self->dbgid).error(e, true);
if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) {
req.reply.sendError(e);
return Void();
} else {
@ -592,7 +600,7 @@ ACTOR Future<Void> logRouterPeekStream(LogRouterData* self, TLogPeekStreamReques
ACTOR Future<Void> logRouterPeekMessages(LogRouterData* self, TLogPeekRequest req) {
try {
TLogPeekReply reply =
wait(peekLogRouter(self, req.begin, req.tag, req.returnIfBlocked, req.onlySpilled, req.sequence));
wait(peekLogRouter(self, req.begin, req.tag, req.returnIfBlocked, req.onlySpilled, false, req.sequence));
req.reply.send(reply);
} catch (Error& e) {
if (e.code() == error_code_no_action_needed) {
@ -689,9 +697,11 @@ ACTOR Future<Void> logRouterCore(TLogInterface interf,
addActor.send(logRouterPeekMessages(&logRouterData, req));
}
when(TLogPeekStreamRequest req = waitNext(interf.peekStreamMessages.getFuture())) {
TraceEvent(SevDebug, "LogRouterPeekStream", logRouterData.dbgid)
// addActor.send(logRouterPeekStream(&logRouterData, req));
// FIXME: temporarily disable streaming peek from LogRouter
TraceEvent(SevError, "LogRouterPeekStream", logRouterData.dbgid)
.detail("Token", interf.peekStreamMessages.getEndpoint().token);
addActor.send(logRouterPeekStream(&logRouterData, req));
req.reply.sendError(operation_failed());
}
when(TLogPopRequest req = waitNext(interf.popMessages.getFuture())) {
// Request from remote tLog to pop data from LR

View File

@ -358,7 +358,7 @@ ACTOR Future<Void> serverPeekStreamGetMore(ILogSystem::ServerPeekCursor* self, T
// NOTE: delay is needed here since TLog need to be scheduled to response if there are TLog and SS
// on the same machine
wait(delay(0));
wait(delay(0, taskID));
return Void();
}
}
@ -417,12 +417,14 @@ Future<Void> ILogSystem::ServerPeekCursor::getMore(TaskPriority taskID) {
if (hasMessage() && !parallelGetMore)
return Void();
if (!more.isValid() || more.isReady()) {
more = serverPeekStreamGetMore(this, taskID);
/*if (parallelGetMore || onlySpilled || futureResults.size()) {
more = serverPeekParallelGetMore(this, taskID);
// TODO: remove locality check when log router support streaming peek
if (usePeekStream && tag.locality >= 0) {
more = serverPeekStreamGetMore(this, taskID);
} else if (parallelGetMore || onlySpilled || futureResults.size()) {
more = serverPeekParallelGetMore(this, taskID);
} else {
more = serverPeekGetMore(this, taskID);
}*/
more = serverPeekGetMore(this, taskID);
}
}
return more;
}

View File

@ -1138,7 +1138,11 @@ ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref
req.reply.send(reply);
begin = reply.rep.end;
onlySpilled = reply.rep.onlySpilled;
wait(delay(0, g_network->getCurrentTask()));
if (reply.rep.end > logData->version.get()) {
wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask()));
} else {
wait(delay(0, g_network->getCurrentTask()));
}
} catch (Error& e) {
self->activePeekStreams--;
TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId).error(e, true);

View File

@ -1460,7 +1460,11 @@ ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref
req.reply.send(reply);
begin = reply.rep.end;
onlySpilled = reply.rep.onlySpilled;
wait(delay(0, g_network->getCurrentTask()));
if (reply.rep.end > logData->version.get()) {
wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask()));
} else {
wait(delay(0, g_network->getCurrentTask()));
}
} catch (Error& e) {
self->activePeekStreams--;
TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId).error(e, true);

View File

@ -1892,7 +1892,11 @@ ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref
req.reply.send(reply);
begin = reply.rep.end;
onlySpilled = reply.rep.onlySpilled;
wait(delay(0, g_network->getCurrentTask()));
if (reply.rep.end > logData->version.get()) {
wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask()));
} else {
wait(delay(0, g_network->getCurrentTask()));
}
} catch (Error& e) {
self->activePeekStreams--;
TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId).error(e, true);

View File

@ -639,9 +639,9 @@ ACTOR Future<Void> waitForQuietDatabase(Database cx,
wait(delay(5.0));
// The quiet database check (which runs at the end of every test) will always time out due to active data movement.
// To get around this, quiet Database will disable the perpetual wiggle in the setup phase.
printf("------- 1 -------\n");
printf("Set perpetual_storage_wiggle=0 ...\n");
wait(setPerpetualStorageWiggle(cx, false, LockAware::True));
printf("------- 2 -------\n");
printf("Set perpetual_storage_wiggle=0 Done.\n");
// Require 3 consecutive successful quiet database checks spaced 2 second apart
state int numSuccesses = 0;

View File

@ -253,7 +253,7 @@ public:
// Refer to FDBTypes.h::TLogVersion. Defaults to the maximum supported version.
int maxTLogVersion = TLogVersion::MAX_SUPPORTED;
// Set true to simplify simulation configs for easier debugging
bool simpleConfig = true;
bool simpleConfig = false;
Optional<bool> generateFearless, buggify;
Optional<int> datacenters, desiredTLogCount, commitProxyCount, grvProxyCount, resolverCount, storageEngineType,
stderrSeverity, machineCount, processesPerMachine, coordinators;

View File

@ -1935,7 +1935,11 @@ ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref
req.reply.send(reply);
begin = reply.rep.end;
onlySpilled = reply.rep.onlySpilled;
wait(delay(0, g_network->getCurrentTask()));
if (reply.rep.end > logData->version.get()) {
wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask()));
} else {
wait(delay(0, g_network->getCurrentTask()));
}
} catch (Error& e) {
self->activePeekStreams--;
TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId).error(e, true);