format code
This commit is contained in:
parent
2df0474fec
commit
fd74a16f35
|
@ -64,8 +64,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
|||
init( TLOG_MESSAGE_BLOCK_BYTES, 10e6 );
|
||||
init( TLOG_MESSAGE_BLOCK_OVERHEAD_FACTOR, double(TLOG_MESSAGE_BLOCK_BYTES) / (TLOG_MESSAGE_BLOCK_BYTES - MAX_MESSAGE_SIZE) ); //1.0121466709838096006362758832473
|
||||
init( PEEK_TRACKER_EXPIRATION_TIME, 600 ); if( randomize && BUGGIFY ) PEEK_TRACKER_EXPIRATION_TIME = deterministicRandom()->coinflip() ? 0.1 : 120;
|
||||
init( PEEK_USEING_STREAMING, true );
|
||||
init( PARALLEL_GET_MORE_REQUESTS, 32 ); if( randomize && BUGGIFY ) PARALLEL_GET_MORE_REQUESTS = 2;
|
||||
init( PEEK_USEING_STREAMING, true );
|
||||
init( PARALLEL_GET_MORE_REQUESTS, 32 ); if( randomize && BUGGIFY ) PARALLEL_GET_MORE_REQUESTS = 2;
|
||||
init( MULTI_CURSOR_PRE_FETCH_LIMIT, 10 );
|
||||
init( MAX_QUEUE_COMMIT_BYTES, 15e6 ); if( randomize && BUGGIFY ) MAX_QUEUE_COMMIT_BYTES = 5000;
|
||||
init( DESIRED_OUTSTANDING_MESSAGES, 5000 ); if( randomize && BUGGIFY ) DESIRED_OUTSTANDING_MESSAGES = deterministicRandom()->randomInt(0,100);
|
||||
|
|
|
@ -41,8 +41,8 @@ public:
|
|||
// often, so that versions always advance smoothly
|
||||
|
||||
// TLogs
|
||||
bool PEEK_USEING_STREAMING;
|
||||
double TLOG_TIMEOUT; // tlog OR commit proxy failure - master's reaction time
|
||||
bool PEEK_USEING_STREAMING;
|
||||
double TLOG_TIMEOUT; // tlog OR commit proxy failure - master's reaction time
|
||||
double TLOG_SLOW_REJOIN_WARN_TIMEOUT_SECS; // Warns if a tlog takes too long to rejoin
|
||||
double RECOVERY_TLOG_SMART_QUORUM_DELAY; // smaller might be better for bug amplification
|
||||
double TLOG_STORAGE_MIN_UPDATE_INTERVAL;
|
||||
|
|
|
@ -441,10 +441,11 @@ Version poppedVersion(LogRouterData* self, Tag tag) {
|
|||
return tagData->popped;
|
||||
}
|
||||
|
||||
// TODO: enable streaming peek log from log router
|
||||
// This actor keep pushing TLogPeekStreamReply until it's removed from the cluster or should recover
|
||||
ACTOR Future<Void> logRouterPeekStream(LogRouterData* self, TLogPeekStreamRequest req) {
|
||||
return Void();
|
||||
}
|
||||
// ACTOR Future<Void> logRouterPeekStream(LogRouterData* self, TLogPeekStreamRequest req) {
|
||||
// return Void();
|
||||
// }
|
||||
|
||||
ACTOR Future<Void> logRouterPeekMessages(LogRouterData* self, TLogPeekRequest req) {
|
||||
state BinaryWriter messages(Unversioned());
|
||||
|
@ -652,7 +653,7 @@ ACTOR Future<Void> logRouterCore(TLogInterface interf,
|
|||
}
|
||||
when(TLogPeekStreamRequest req = waitNext(interf.peekStreamMessages.getFuture())) {
|
||||
// addActor.send(logRouterPeekStream(&logRouterData, req));
|
||||
// FIXME: temporarily disable streaming peek from LogRouter
|
||||
// FIXME: currently LogRouter doesn't support streaming peek request
|
||||
TraceEvent(SevError, "LogRouterPeekStream", logRouterData.dbgid)
|
||||
.detail("Token", interf.peekStreamMessages.getEndpoint().token);
|
||||
req.reply.sendError(operation_failed());
|
||||
|
|
|
@ -52,7 +52,8 @@ ILogSystem::ServerPeekCursor::ServerPeekCursor(Reference<AsyncVar<OptionalInterf
|
|||
: interf(interf), tag(tag), rd(results.arena, results.messages, Unversioned()), messageVersion(begin), end(end),
|
||||
poppedVersion(0), hasMsg(false), randomID(deterministicRandom()->randomUniqueID()),
|
||||
returnIfBlocked(returnIfBlocked), onlySpilled(false), parallelGetMore(parallelGetMore), sequence(0), lastReset(0),
|
||||
resetCheck(Void()), slowReplies(0), fastReplies(0), unknownReplies(0), usePeekStream(SERVER_KNOBS->PEEK_USEING_STREAMING) {
|
||||
resetCheck(Void()), slowReplies(0), fastReplies(0), unknownReplies(0),
|
||||
usePeekStream(SERVER_KNOBS->PEEK_USEING_STREAMING) {
|
||||
this->results.maxKnownVersion = 0;
|
||||
this->results.minKnownCommittedVersion = 0;
|
||||
TraceEvent("SPC_Starting", randomID)
|
||||
|
@ -361,8 +362,8 @@ ACTOR Future<Void> serverPeekStreamGetMore(ILogSystem::ServerPeekCursor* self, T
|
|||
.detail("End", res.end)
|
||||
.detail("Popped", res.popped.present() ? res.popped.get() : 0);
|
||||
|
||||
// NOTE: delay is needed here since TLog need to be scheduled to response if there are TLog and SS
|
||||
// on the same machine
|
||||
// NOTE: delay is necessary here since ReplyPromiseStream delivers reply on high priority. Here we
|
||||
// change the priority to the intended one.
|
||||
wait(delay(0, taskID));
|
||||
return Void();
|
||||
}
|
||||
|
|
|
@ -83,8 +83,8 @@ struct TLogInterface {
|
|||
streams.push_back(disablePopRequest.getReceiver());
|
||||
streams.push_back(enablePopRequest.getReceiver());
|
||||
streams.push_back(snapRequest.getReceiver());
|
||||
streams.push_back(peekStreamMessages.getReceiver(TaskPriority::TLogPeek));
|
||||
FlowTransport::transport().addEndpoints(streams);
|
||||
streams.push_back(peekStreamMessages.getReceiver(TaskPriority::TLogPeek));
|
||||
FlowTransport::transport().addEndpoints(streams);
|
||||
}
|
||||
|
||||
template <class Ar>
|
||||
|
|
|
@ -33,7 +33,6 @@
|
|||
// clang-format off
|
||||
ERROR( success, 0, "Success" )
|
||||
ERROR( end_of_stream, 1, "End of stream" )
|
||||
ERROR( no_action_needed, 2, "No action needed" )
|
||||
ERROR( operation_failed, 1000, "Operation failed")
|
||||
ERROR( wrong_shard_server, 1001, "Shard is not available from this server")
|
||||
ERROR( operation_obsolete, 1002, "Operation result no longer necessary")
|
||||
|
|
Loading…
Reference in New Issue