diff --git a/fdbclient/FDBTypes.h b/fdbclient/FDBTypes.h index 25e31d1134..1aa1c31273 100644 --- a/fdbclient/FDBTypes.h +++ b/fdbclient/FDBTypes.h @@ -41,8 +41,8 @@ typedef UID SpanID; enum { tagLocalitySpecial = -1, // tag with this locality means it is invalidTag (id=0), txsTag (id=1), or cacheTag (id=2) tagLocalityLogRouter = -2, - tagLocalityRemoteLog = -3, // tag created by log router for remote tLogs - tagLocalityUpgraded = -4, + tagLocalityRemoteLog = -3, // tag created by log router for remote (aka. not in Primary DC) tLogs + tagLocalityUpgraded = -4, // tlogs with old log format tagLocalitySatellite = -5, tagLocalityLogRouterMapped = -6, // The pseudo tag used by log routers to pop the real LogRouter tag (i.e., -2) tagLocalityTxs = -7, diff --git a/fdbserver/LogRouter.actor.cpp b/fdbserver/LogRouter.actor.cpp index f0c634dfae..945fca63be 100644 --- a/fdbserver/LogRouter.actor.cpp +++ b/fdbserver/LogRouter.actor.cpp @@ -446,6 +446,7 @@ ACTOR Future peekLogRouter(LogRouterData* self, Tag tag, bool returnIfBlocked = false, bool reqOnlySpilled = false, + bool streamReply = false, Optional> sequence = Optional>()) { state BinaryWriter messages(Unversioned()); state int sequenceNum = -1; @@ -518,7 +519,12 @@ ACTOR Future peekLogRouter(LogRouterData* self, sequenceData.send(std::make_pair(begin, reqOnlySpilled)); } } - throw no_action_needed(); // we've already replied in the past + if (streamReply) { + // for streaming reply, we skip the popped part + begin = std::min(poppedVer, self->startVersion); + } else { + throw no_action_needed(); // we've already replied in the past + } } Version endVersion = self->version.get() + 1; @@ -568,18 +574,20 @@ ACTOR Future logRouterPeekStream(LogRouterData* self, TLogPeekStreamReques state TLogPeekStreamReply reply; try { wait(req.reply.onReady() && - store(reply.rep, peekLogRouter(self, req.begin, req.tag, req.returnIfBlocked, onlySpilled))); + store(reply.rep, peekLogRouter(self, req.begin, req.tag, req.returnIfBlocked, onlySpilled, true))); req.reply.send(reply); begin = reply.rep.end; onlySpilled = reply.rep.onlySpilled; - wait(delay(0, g_network->getCurrentTask())); + if (reply.rep.end > self->version.get()) { + wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask())); + } else { + wait(delay(0, g_network->getCurrentTask())); + } } catch (Error& e) { self->activePeekStreams--; - TraceEvent(SevDebug, "LogRouterPeekStreamEnd", self->dbgid).error(e, true); - if (e.code() == error_code_no_action_needed) { - req.reply.sendError(end_of_stream()); - } else if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) { + TraceEvent(SevDebug, "LogRouterPeekStreamEnd", self->dbgid).error(e, true); + if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) { req.reply.sendError(e); return Void(); } else { @@ -592,7 +600,7 @@ ACTOR Future logRouterPeekStream(LogRouterData* self, TLogPeekStreamReques ACTOR Future logRouterPeekMessages(LogRouterData* self, TLogPeekRequest req) { try { TLogPeekReply reply = - wait(peekLogRouter(self, req.begin, req.tag, req.returnIfBlocked, req.onlySpilled, req.sequence)); + wait(peekLogRouter(self, req.begin, req.tag, req.returnIfBlocked, req.onlySpilled, false, req.sequence)); req.reply.send(reply); } catch (Error& e) { if (e.code() == error_code_no_action_needed) { @@ -689,9 +697,11 @@ ACTOR Future logRouterCore(TLogInterface interf, addActor.send(logRouterPeekMessages(&logRouterData, req)); } when(TLogPeekStreamRequest req = waitNext(interf.peekStreamMessages.getFuture())) { - TraceEvent(SevDebug, "LogRouterPeekStream", logRouterData.dbgid) + // addActor.send(logRouterPeekStream(&logRouterData, req)); + // FIXME: temporarily disable streaming peek from LogRouter + TraceEvent(SevError, "LogRouterPeekStream", logRouterData.dbgid) .detail("Token", interf.peekStreamMessages.getEndpoint().token); - addActor.send(logRouterPeekStream(&logRouterData, req)); + req.reply.sendError(operation_failed()); } when(TLogPopRequest req = waitNext(interf.popMessages.getFuture())) { // Request from remote tLog to pop data from LR diff --git a/fdbserver/LogSystemPeekCursor.actor.cpp b/fdbserver/LogSystemPeekCursor.actor.cpp index 3668b52d08..b097d88c8f 100644 --- a/fdbserver/LogSystemPeekCursor.actor.cpp +++ b/fdbserver/LogSystemPeekCursor.actor.cpp @@ -358,7 +358,7 @@ ACTOR Future serverPeekStreamGetMore(ILogSystem::ServerPeekCursor* self, T // NOTE: delay is needed here since TLog need to be scheduled to response if there are TLog and SS // on the same machine - wait(delay(0)); + wait(delay(0, taskID)); return Void(); } } @@ -417,12 +417,14 @@ Future ILogSystem::ServerPeekCursor::getMore(TaskPriority taskID) { if (hasMessage() && !parallelGetMore) return Void(); if (!more.isValid() || more.isReady()) { - more = serverPeekStreamGetMore(this, taskID); - /*if (parallelGetMore || onlySpilled || futureResults.size()) { - more = serverPeekParallelGetMore(this, taskID); + // TODO: remove locality check when log router support streaming peek + if (usePeekStream && tag.locality >= 0) { + more = serverPeekStreamGetMore(this, taskID); + } else if (parallelGetMore || onlySpilled || futureResults.size()) { + more = serverPeekParallelGetMore(this, taskID); } else { - more = serverPeekGetMore(this, taskID); - }*/ + more = serverPeekGetMore(this, taskID); + } } return more; } diff --git a/fdbserver/OldTLogServer_4_6.actor.cpp b/fdbserver/OldTLogServer_4_6.actor.cpp index 97d9b8efa2..e5645223c8 100644 --- a/fdbserver/OldTLogServer_4_6.actor.cpp +++ b/fdbserver/OldTLogServer_4_6.actor.cpp @@ -1138,7 +1138,11 @@ ACTOR Future tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref req.reply.send(reply); begin = reply.rep.end; onlySpilled = reply.rep.onlySpilled; - wait(delay(0, g_network->getCurrentTask())); + if (reply.rep.end > logData->version.get()) { + wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask())); + } else { + wait(delay(0, g_network->getCurrentTask())); + } } catch (Error& e) { self->activePeekStreams--; TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId).error(e, true); diff --git a/fdbserver/OldTLogServer_6_0.actor.cpp b/fdbserver/OldTLogServer_6_0.actor.cpp index c9ec74354d..bdd2a64bc3 100644 --- a/fdbserver/OldTLogServer_6_0.actor.cpp +++ b/fdbserver/OldTLogServer_6_0.actor.cpp @@ -1460,7 +1460,11 @@ ACTOR Future tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref req.reply.send(reply); begin = reply.rep.end; onlySpilled = reply.rep.onlySpilled; - wait(delay(0, g_network->getCurrentTask())); + if (reply.rep.end > logData->version.get()) { + wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask())); + } else { + wait(delay(0, g_network->getCurrentTask())); + } } catch (Error& e) { self->activePeekStreams--; TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId).error(e, true); diff --git a/fdbserver/OldTLogServer_6_2.actor.cpp b/fdbserver/OldTLogServer_6_2.actor.cpp index d4fcc595d0..16077d648f 100644 --- a/fdbserver/OldTLogServer_6_2.actor.cpp +++ b/fdbserver/OldTLogServer_6_2.actor.cpp @@ -1892,7 +1892,11 @@ ACTOR Future tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref req.reply.send(reply); begin = reply.rep.end; onlySpilled = reply.rep.onlySpilled; - wait(delay(0, g_network->getCurrentTask())); + if (reply.rep.end > logData->version.get()) { + wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask())); + } else { + wait(delay(0, g_network->getCurrentTask())); + } } catch (Error& e) { self->activePeekStreams--; TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId).error(e, true); diff --git a/fdbserver/QuietDatabase.actor.cpp b/fdbserver/QuietDatabase.actor.cpp index f5e5443ca0..305543f8c9 100644 --- a/fdbserver/QuietDatabase.actor.cpp +++ b/fdbserver/QuietDatabase.actor.cpp @@ -639,9 +639,9 @@ ACTOR Future waitForQuietDatabase(Database cx, wait(delay(5.0)); // The quiet database check (which runs at the end of every test) will always time out due to active data movement. // To get around this, quiet Database will disable the perpetual wiggle in the setup phase. - printf("------- 1 -------\n"); + printf("Set perpetual_storage_wiggle=0 ...\n"); wait(setPerpetualStorageWiggle(cx, false, LockAware::True)); - printf("------- 2 -------\n"); + printf("Set perpetual_storage_wiggle=0 Done.\n"); // Require 3 consecutive successful quiet database checks spaced 2 second apart state int numSuccesses = 0; diff --git a/fdbserver/SimulatedCluster.actor.cpp b/fdbserver/SimulatedCluster.actor.cpp index bcab98b9fc..5f656f13f1 100644 --- a/fdbserver/SimulatedCluster.actor.cpp +++ b/fdbserver/SimulatedCluster.actor.cpp @@ -253,7 +253,7 @@ public: // Refer to FDBTypes.h::TLogVersion. Defaults to the maximum supported version. int maxTLogVersion = TLogVersion::MAX_SUPPORTED; // Set true to simplify simulation configs for easier debugging - bool simpleConfig = true; + bool simpleConfig = false; Optional generateFearless, buggify; Optional datacenters, desiredTLogCount, commitProxyCount, grvProxyCount, resolverCount, storageEngineType, stderrSeverity, machineCount, processesPerMachine, coordinators; diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index 665a5cfc53..87f7770b5c 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -1935,7 +1935,11 @@ ACTOR Future tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref req.reply.send(reply); begin = reply.rep.end; onlySpilled = reply.rep.onlySpilled; - wait(delay(0, g_network->getCurrentTask())); + if (reply.rep.end > logData->version.get()) { + wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask())); + } else { + wait(delay(0, g_network->getCurrentTask())); + } } catch (Error& e) { self->activePeekStreams--; TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId).error(e, true);