Merge branch 'release-6.1'

Evan Tschannen 2019-03-30 17:59:28 -07:00
commit a46620fbee
8 changed files with 42 additions and 15 deletions

View File

@@ -57,7 +57,7 @@ ClientKnobs::ClientKnobs(bool randomize) {
init( KEY_SIZE_LIMIT, 1e4 );
init( SYSTEM_KEY_SIZE_LIMIT, 3e4 );
init( VALUE_SIZE_LIMIT, 1e5 );
- init( SPLIT_KEY_SIZE_LIMIT, KEY_SIZE_LIMIT/2 ); if( randomize && BUGGIFY ) SPLIT_KEY_SIZE_LIMIT = KEY_SIZE_LIMIT - serverKeysPrefixFor(UID()).size() - 1;
+ init( SPLIT_KEY_SIZE_LIMIT, KEY_SIZE_LIMIT/2 ); if( randomize && BUGGIFY ) SPLIT_KEY_SIZE_LIMIT = KEY_SIZE_LIMIT - 31;//serverKeysPrefixFor(UID()).size() - 1;
init( METADATA_VERSION_CACHE_SIZE, 1000 );
init( MAX_BATCH_SIZE, 1000 ); if( randomize && BUGGIFY ) MAX_BATCH_SIZE = 1;
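
Note on the knob hunk above: init() assigns the production default, and the trailing BUGGIFY clause lets simulation randomly override the knob to stress boundary conditions, here pushing SPLIT_KEY_SIZE_LIMIT right up against KEY_SIZE_LIMIT; the commit swaps the computed expression for a hard-coded 31 and keeps the original expression as a comment. A minimal sketch of the pattern (plain C++, not FDB's actual Knobs machinery; randomize and buggifyEnabled are illustrative stand-ins):

    #include <cstdio>

    // Illustrative stand-ins: FDB's real BUGGIFY is a simulation-only macro
    // that fires randomly, and these knobs are members of a ClientKnobs object.
    static bool randomize = true;
    static bool buggifyEnabled = true;

    static double KEY_SIZE_LIMIT;
    static double SPLIT_KEY_SIZE_LIMIT;

    // init(): assign the production default; a BUGGIFY clause may then override it.
    static void init(double& knob, double value) { knob = value; }

    int main() {
        init(KEY_SIZE_LIMIT, 1e4);
        init(SPLIT_KEY_SIZE_LIMIT, KEY_SIZE_LIMIT / 2);
        // Under BUGGIFY, push the split limit near the key size limit so that
        // simulation exercises the boundary; 31 stands in for the commented-out
        // serverKeysPrefixFor(UID()).size() + 1 expression.
        if (randomize && buggifyEnabled)
            SPLIT_KEY_SIZE_LIMIT = KEY_SIZE_LIMIT - 31;
        printf("SPLIT_KEY_SIZE_LIMIT = %.0f\n", SPLIT_KEY_SIZE_LIMIT);
    }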

View File

@@ -312,7 +312,6 @@ public:
}
if (logServerSet->size() < (addingDegraded == 0 ? desired : required)) {
TraceEvent(SevWarn,"GWFTADTooFew", id).detail("Fitness", fitness).detail("Processes", logServerSet->size()).detail("Required", required).detail("TLogPolicy", policy->info()).detail("DesiredLogs", desired).detail("AddingDegraded", addingDegraded);
}
else if (logServerSet->size() == required || logServerSet->size() <= desired) {
if (logServerSet->validate(policy)) {
@@ -1925,7 +1924,7 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
if (req.ratekeeperInterf.present()) {
if((self->recruitingRatekeeperID.present() && self->recruitingRatekeeperID.get() != req.ratekeeperInterf.get().id()) ||
self->clusterControllerDcId != w.locality.dcId()) {
TraceEvent("CC_HaltRatekeeper", self->id).detail("RKID", req.ratekeeperInterf.get().id())
TraceEvent("CC_HaltRegisteringRatekeeper", self->id).detail("RKID", req.ratekeeperInterf.get().id())
.detail("DcID", printable(self->clusterControllerDcId))
.detail("ReqDcID", printable(w.locality.dcId()))
.detail("RecruitingRKID", self->recruitingRatekeeperID.present() ? self->recruitingRatekeeperID.get() : UID());
@@ -1935,7 +1934,7 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
const auto& ratekeeper = self->db.serverInfo->get().ratekeeper;
TraceEvent("CC_RegisterRatekeeper", self->id).detail("RKID", rki.id());
if (ratekeeper.present() && ratekeeper.get().id() != rki.id() && self->id_worker.count(ratekeeper.get().locality.processId())) {
TraceEvent("CC_HaltRatekeeper", self->id).detail("RKID", ratekeeper.get().id())
TraceEvent("CC_HaltPreviousRatekeeper", self->id).detail("RKID", ratekeeper.get().id())
.detail("DcID", printable(self->clusterControllerDcId))
.detail("ReqDcID", printable(w.locality.dcId()))
.detail("RecruitingRKID", self->recruitingRatekeeperID.present() ? self->recruitingRatekeeperID.get() : UID());
@@ -2519,8 +2518,8 @@ ACTOR Future<Void> monitorDataDistributor(ClusterControllerData *self) {
loop {
if ( self->db.serverInfo->get().distributor.present() ) {
wait( waitFailureClient( self->db.serverInfo->get().distributor.get().waitFailure, SERVER_KNOBS->DD_FAILURE_TIME ) );
TraceEvent("ClusterController", self->id)
.detail("DataDistributorDied", self->db.serverInfo->get().distributor.get().id());
TraceEvent("CC_DataDistributorDied", self->id)
.detail("DistributorId", self->db.serverInfo->get().distributor.get().id());
self->db.clearInterf(ProcessClass::DataDistributorClass);
} else {
self->recruitingDistributor = true;
@@ -2555,17 +2554,16 @@ ACTOR Future<Void> startRatekeeper(ClusterControllerData *self) {
}
self->recruitingRatekeeperID = req.reqId;
TraceEvent("ClusterController_RecruitRatekeeper", self->id).detail("Addr", worker.interf.address()).detail("RKID", req.reqId);
TraceEvent("CC_RecruitRatekeeper", self->id).detail("Addr", worker.interf.address()).detail("RKID", req.reqId);
ErrorOr<RatekeeperInterface> interf = wait( worker.interf.ratekeeper.getReplyUnlessFailedFor(req, SERVER_KNOBS->WAIT_FOR_RATEKEEPER_JOIN_DELAY, 0) );
if (interf.present()) {
TraceEvent("ClusterController_RatekeeperRecruited", self->id).detail("Addr", worker.interf.address());
self->recruitRatekeeper.set(false);
self->recruitingRatekeeperID = interf.get().id();
const auto& ratekeeper = self->db.serverInfo->get().ratekeeper;
TraceEvent("CC_RegisterRatekeeper", self->id).detail("RKID", interf.get().id());
TraceEvent("CC_RatekeeperRecruited", self->id).detail("Addr", worker.interf.address()).detail("RKID", interf.get().id());
if (ratekeeper.present() && ratekeeper.get().id() != interf.get().id() && self->id_worker.count(ratekeeper.get().locality.processId())) {
TraceEvent("CC_HaltRatekeeper", self->id).detail("RKID", ratekeeper.get().id())
TraceEvent("CC_HaltRatekeeperAfterRecruit", self->id).detail("RKID", ratekeeper.get().id())
.detail("DcID", printable(self->clusterControllerDcId));
self->id_worker[ratekeeper.get().locality.processId()].haltRatekeeper = brokenPromiseToNever(ratekeeper.get().haltRatekeeper.getReply(HaltRatekeeperRequest(self->id)));
}
@@ -2577,7 +2575,7 @@ ACTOR Future<Void> startRatekeeper(ClusterControllerData *self) {
}
}
catch (Error& e) {
TraceEvent("ClusterController_RatekeeperRecruitError", self->id).error(e);
TraceEvent("CC_RatekeeperRecruitError", self->id).error(e);
if ( e.code() != error_code_no_more_servers ) {
throw;
}
@@ -2595,7 +2593,7 @@ ACTOR Future<Void> monitorRatekeeper(ClusterControllerData *self) {
if ( self->db.serverInfo->get().ratekeeper.present() && !self->recruitRatekeeper.get() ) {
choose {
when(wait(waitFailureClient( self->db.serverInfo->get().ratekeeper.get().waitFailure, SERVER_KNOBS->RATEKEEPER_FAILURE_TIME ))) {
TraceEvent("ClusterController_RatekeeperDied", self->id)
TraceEvent("CC_RatekeeperDied", self->id)
.detail("RKID", self->db.serverInfo->get().ratekeeper.get().id());
self->db.clearInterf(ProcessClass::RatekeeperClass);
}
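
A note on this file's changes: the renames split one overloaded CC_HaltRatekeeper event into three distinct ones (CC_HaltRegisteringRatekeeper, CC_HaltPreviousRatekeeper, CC_HaltRatekeeperAfterRecruit), so trace logs now record which code path halted a Ratekeeper, and the DataDistributor/Ratekeeper events gain a uniform CC_ prefix. The surrounding monitorRatekeeper loop is the controller's standard singleton pattern: while a Ratekeeper is registered, wait for it to fail; on failure, clear its interface so recruitment runs again. A bare-bones sketch of that loop (ordinary C++ rather than flow actors; every name below is an illustrative stand-in, not FDB API):

    #include <cstdio>
    #include <optional>

    // Hypothetical stand-ins for the real types.
    struct RatekeeperInterface { int id; };

    std::optional<RatekeeperInterface> registeredRatekeeper;  // db.serverInfo state

    void waitFailure(const RatekeeperInterface&) {
        // In FDB this suspends until the role misses heartbeats for
        // RATEKEEPER_FAILURE_TIME; the sketch just returns immediately.
    }

    RatekeeperInterface recruitRatekeeper() {
        // Stand-in for startRatekeeper(): pick the best worker and register it.
        static int nextId = 1;
        return RatekeeperInterface{nextId++};
    }

    int main() {
        for (int round = 0; round < 4; ++round) {
            if (registeredRatekeeper) {
                waitFailure(*registeredRatekeeper);  // blocks until the role dies
                printf("CC_RatekeeperDied RKID=%d\n", registeredRatekeeper->id);
                registeredRatekeeper.reset();        // clearInterf(...): forget it
            } else {
                registeredRatekeeper = recruitRatekeeper();  // re-recruit
                printf("CC_RatekeeperRecruited RKID=%d\n", registeredRatekeeper->id);
            }
        }
    }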

View File

@@ -1466,7 +1466,7 @@ ACTOR Future<Void> watchDegraded(TLogData* self) {
return Void();
}
- ACTOR Future<Void> doQueueCommit( TLogData* self, Reference<LogData> logData ) {
+ ACTOR Future<Void> doQueueCommit( TLogData* self, Reference<LogData> logData, std::vector<Reference<LogData>> missingFinalCommit ) {
state Version ver = logData->version.get();
state Version commitNumber = self->queueCommitBegin+1;
state Version knownCommittedVersion = logData->knownCommittedVersion;
@@ -1507,6 +1507,11 @@ ACTOR Future<Void> doQueueCommit( TLogData* self, Reference<LogData> logData ) {
logData->queueCommittedVersion.set(ver);
self->queueCommitEnd.set(commitNumber);
+ for(auto& it : missingFinalCommit) {
+ TraceEvent("TLogCommitMissingFinalCommit", self->dbgid).detail("LogId", logData->logId).detail("Version", it->version.get()).detail("QueueVer", it->queueCommittedVersion.get());
+ TEST(true); //A TLog was replaced before having a chance to commit its queue
+ it->queueCommittedVersion.set(it->version.get());
+ }
return Void();
}
@@ -1515,10 +1520,13 @@ ACTOR Future<Void> commitQueue( TLogData* self ) {
loop {
int foundCount = 0;
+ state std::vector<Reference<LogData>> missingFinalCommit;
for(auto it : self->id_data) {
if(!it.second->stopped) {
logData = it.second;
foundCount++;
+ } else if(it.second->version.get() > std::max(it.second->queueCommittingVersion, it.second->queueCommittedVersion.get())) {
+ missingFinalCommit.push_back(it.second);
}
}
@@ -1544,7 +1552,8 @@ ACTOR Future<Void> commitQueue( TLogData* self ) {
while( self->queueCommitBegin != self->queueCommitEnd.get() && !self->largeDiskQueueCommitBytes.get() ) {
wait( self->queueCommitEnd.whenAtLeast(self->queueCommitBegin) || self->largeDiskQueueCommitBytes.onChange() );
}
- self->sharedActors.send(doQueueCommit(self, logData));
+ self->sharedActors.send(doQueueCommit(self, logData, missingFinalCommit));
+ missingFinalCommit.clear();
}
when(wait(self->newLogData.onTrigger())) {}
}

View File

@@ -241,7 +241,7 @@ struct ArenaBlock : NonCopyable, ThreadSafeReferenceCounted<ArenaBlock>
b->bigUsed = sizeof(ArenaBlock);
if(FLOW_KNOBS && g_nondeterministic_random && g_nondeterministic_random->random01() < (reqSize / FLOW_KNOBS->HUGE_ARENA_LOGGING_BYTES)) {
TraceEvent("HugeArenaSample").detail("Size", reqSize).backtrace();
hugeArenaSample(reqSize);
}
g_hugeArenaMemory += reqSize;
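
The guard in this Arena hunk is byte-proportional sampling: an allocation of reqSize bytes passes with probability reqSize / HUGE_ARENA_LOGGING_BYTES, so on average one sample fires per HUGE_ARENA_LOGGING_BYTES (100 MB by default) of huge-arena allocation, regardless of the mix of allocation sizes; the change routes the sample into hugeArenaSample() instead of emitting a TraceEvent inline. A quick self-contained illustration of the sampling rate (assumed values, not FDB code):

    #include <cstdio>
    #include <random>

    int main() {
        const double HUGE_ARENA_LOGGING_BYTES = 100e6;  // knob default from the diff
        std::mt19937_64 rng(42);
        std::uniform_real_distribution<double> random01(0.0, 1.0);

        long long bytes = 0, samples = 0;
        for (int i = 0; i < 200000; ++i) {
            double reqSize = 4096.0 * (1 + i % 512);  // arbitrary size mix
            bytes += (long long)reqSize;
            if (random01(rng) < reqSize / HUGE_ARENA_LOGGING_BYTES)
                ++samples;  // here FDB would call hugeArenaSample(reqSize)
        }
        // Expect roughly one sample per 100 MB allocated.
        printf("allocated %.1f MB, samples %lld (expected %.1f)\n",
               bytes / 1e6, samples, bytes / HUGE_ARENA_LOGGING_BYTES);
    }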

View File

@@ -24,6 +24,7 @@
#include "flow/Trace.h"
#include "flow/Error.h"
#include "flow/Knobs.h"
#include "flow/flow.h"
#include <cstdint>
#include <unordered_map>
@@ -83,6 +84,22 @@ void setFastAllocatorThreadInitFunction( ThreadInitFunction f ) {
int64_t g_hugeArenaMemory = 0;
+ double hugeArenaLastLogged = 0;
+ std::map<std::string, std::pair<int,int>> hugeArenaTraces;
+ void hugeArenaSample(int size) {
+ auto& info = hugeArenaTraces[platform::get_backtrace()];
+ info.first++;
+ info.second+=size;
+ if(now() - hugeArenaLastLogged > FLOW_KNOBS->HUGE_ARENA_LOGGING_INTERVAL) {
+ for(auto& it : hugeArenaTraces) {
+ TraceEvent("HugeArenaSample").detail("Count", it.second.first).detail("Size", it.second.second).detail("Backtrace", it.first);
+ }
+ hugeArenaLastLogged = now();
+ hugeArenaTraces.clear();
+ }
+ }
#ifdef ALLOC_INSTRUMENTATION
INIT_SEG std::map<const char*, AllocInstrInfo> allocInstr;
INIT_SEG std::unordered_map<int64_t, std::pair<uint32_t, size_t>> memSample;
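
hugeArenaSample() batches what used to be one HugeArenaSample event per sampled allocation: samples are bucketed by backtrace, and at most once per HUGE_ARENA_LOGGING_INTERVAL (5 seconds, per the knob added below) the accumulated count and byte totals are emitted, one event per unique backtrace, before the table is cleared. A stripped-down version of the same aggregate-and-flush pattern (self-contained C++; std::chrono stands in for flow's now(), and all names are illustrative):

    #include <chrono>
    #include <cstdio>
    #include <map>
    #include <string>
    #include <utility>

    static double now() {
        using namespace std::chrono;
        return duration<double>(steady_clock::now().time_since_epoch()).count();
    }

    static const double LOGGING_INTERVAL = 5.0;  // seconds, per the knob
    static double lastLogged = 0;
    static std::map<std::string, std::pair<int, long long>> traces;

    // Record one sample keyed by its call site; emit everything at most
    // once per LOGGING_INTERVAL and reset the table.
    void sample(const std::string& backtrace, int size) {
        auto& info = traces[backtrace];
        info.first++;          // occurrences of this backtrace
        info.second += size;   // total bytes attributed to it
        if (now() - lastLogged > LOGGING_INTERVAL) {
            for (auto& it : traces)
                printf("HugeArenaSample Count=%d Size=%lld Backtrace=%s\n",
                       it.second.first, it.second.second, it.first.c_str());
            lastLogged = now();
            traces.clear();
        }
    }

    int main() {
        sample("frameA;frameB", 150 << 20);
        sample("frameA;frameB", 200 << 20);
    }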

View File

@@ -152,6 +152,7 @@ private:
};
extern int64_t g_hugeArenaMemory;
+ void hugeArenaSample(int size);
void releaseAllThreadMagazines();
int64_t getTotalUnusedAllocatedMemory();
void setFastAllocatorThreadInitFunction( void (*)() ); // The given function will be called at least once in each thread that allocates from a FastAllocator. Currently just one such function is tracked.

View File

@@ -49,6 +49,7 @@ FlowKnobs::FlowKnobs(bool randomize, bool isSimulated) {
init( RANDOMSEED_RETRY_LIMIT, 4 );
init( FAST_ALLOC_LOGGING_BYTES, 10e6 );
init( HUGE_ARENA_LOGGING_BYTES, 100e6 );
+ init( HUGE_ARENA_LOGGING_INTERVAL, 5.0 );
//connectionMonitor
init( CONNECTION_MONITOR_LOOP_TIME, isSimulated ? 0.75 : 1.0 ); if( randomize && BUGGIFY ) CONNECTION_MONITOR_LOOP_TIME = 6.0;

View File

@@ -63,6 +63,7 @@ public:
int RANDOMSEED_RETRY_LIMIT;
double FAST_ALLOC_LOGGING_BYTES;
double HUGE_ARENA_LOGGING_BYTES;
+ double HUGE_ARENA_LOGGING_INTERVAL;
//slow task profiling
double SLOWTASK_PROFILING_INTERVAL;