merge 5.2 into 6.0

This commit is contained in:
Evan Tschannen 2018-07-08 20:14:06 -07:00
commit 5a2cb3037b
5 changed files with 38 additions and 10 deletions

View File

@ -264,10 +264,12 @@ struct Peer : NonCopyable {
// Throw away the current unsent list, dropping the reference count on each PacketBuffer that accounts for presence in the unsent list // Throw away the current unsent list, dropping the reference count on each PacketBuffer that accounts for presence in the unsent list
unsent.discardAll(); unsent.discardAll();
// Compact reliable packets into a new unsent range // If there are reliable packets, compact reliable packets into a new unsent range
PacketBuffer* pb = unsent.getWriteBuffer(); if(!reliable.empty()) {
pb = reliable.compact(pb, NULL); PacketBuffer* pb = unsent.getWriteBuffer();
unsent.setWriteBuffer(pb); pb = reliable.compact(pb, NULL);
unsent.setWriteBuffer(pb);
}
} }
void onIncomingConnection( Reference<IConnection> conn, Future<Void> reader ) { void onIncomingConnection( Reference<IConnection> conn, Future<Void> reader ) {

View File

@ -211,19 +211,31 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
loop choose { loop choose {
when ( GetLeaderRequest req = waitNext( interf.getLeader.getFuture() ) ) { when ( GetLeaderRequest req = waitNext( interf.getLeader.getFuture() ) ) {
if (currentNominee.present() && currentNominee.get().changeID != req.knownLeader) if (currentNominee.present() && currentNominee.get().changeID != req.knownLeader) {
req.reply.send( currentNominee.get() ); req.reply.send( currentNominee.get() );
else } else {
notify.push_back( req.reply ); notify.push_back( req.reply );
if(notify.size() > SERVER_KNOBS->MAX_NOTIFICATIONS) {
for(int i=0; i<notify.size(); i++)
notify[i].send( currentNominee.get() );
notify.clear();
}
}
} }
when ( CandidacyRequest req = waitNext( interf.candidacy.getFuture() ) ) { when ( CandidacyRequest req = waitNext( interf.candidacy.getFuture() ) ) {
//TraceEvent("CandidacyRequest").detail("Nominee", req.myInfo.changeID ); //TraceEvent("CandidacyRequest").detail("Nominee", req.myInfo.changeID );
availableCandidates.erase( LeaderInfo(req.prevChangeID) ); availableCandidates.erase( LeaderInfo(req.prevChangeID) );
availableCandidates.insert( req.myInfo ); availableCandidates.insert( req.myInfo );
if (currentNominee.present() && currentNominee.get().changeID != req.knownLeader) if (currentNominee.present() && currentNominee.get().changeID != req.knownLeader) {
req.reply.send( currentNominee.get() ); req.reply.send( currentNominee.get() );
else } else {
notify.push_back( req.reply ); notify.push_back( req.reply );
if(notify.size() > SERVER_KNOBS->MAX_NOTIFICATIONS) {
for(int i=0; i<notify.size(); i++)
notify[i].send( currentNominee.get() );
notify.clear();
}
}
} }
when (LeaderHeartbeatRequest req = waitNext( interf.leaderHeartbeat.getFuture() ) ) { when (LeaderHeartbeatRequest req = waitNext( interf.leaderHeartbeat.getFuture() ) ) {
//TODO: use notify to only send a heartbeat once per interval //TODO: use notify to only send a heartbeat once per interval
@ -237,6 +249,7 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
newInfo.serializedInfo = req.conn.toString(); newInfo.serializedInfo = req.conn.toString();
for(int i=0; i<notify.size(); i++) for(int i=0; i<notify.size(); i++)
notify[i].send( newInfo ); notify[i].send( newInfo );
notify.clear();
req.reply.send( Void() ); req.reply.send( Void() );
return Void(); return Void();
} }
@ -425,4 +438,4 @@ ACTOR Future<Void> coordinationServer(std::string dataFolder) {
TraceEvent("CoordinationServerError", myID).error(e, true); TraceEvent("CoordinationServerError", myID).error(e, true);
throw; throw;
} }
} }

View File

@ -209,6 +209,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
// Leader election // Leader election
bool longLeaderElection = randomize && BUGGIFY; bool longLeaderElection = randomize && BUGGIFY;
init( MAX_NOTIFICATIONS, 100000 );
init( CANDIDATE_MIN_DELAY, 0.05 ); init( CANDIDATE_MIN_DELAY, 0.05 );
init( CANDIDATE_MAX_DELAY, 1.0 ); init( CANDIDATE_MAX_DELAY, 1.0 );
init( CANDIDATE_GROWTH_RATE, 1.2 ); init( CANDIDATE_GROWTH_RATE, 1.2 );

View File

@ -155,6 +155,7 @@ public:
int64_t REPLACE_CONTENTS_BYTES; int64_t REPLACE_CONTENTS_BYTES;
// Leader election // Leader election
int MAX_NOTIFICATIONS;
double CANDIDATE_MIN_DELAY; double CANDIDATE_MIN_DELAY;
double CANDIDATE_MAX_DELAY; double CANDIDATE_MAX_DELAY;
double CANDIDATE_GROWTH_RATE; double CANDIDATE_GROWTH_RATE;

View File

@ -88,10 +88,18 @@ volatile int32_t FastAllocator<Size>::pageCount;
thread_local bool memSample_entered = false; thread_local bool memSample_entered = false;
#endif #endif
#ifdef ALLOC_INSTRUMENTATION_STDOUT
thread_local bool inRecordAllocation = false;
#endif
void recordAllocation( void *ptr, size_t size ) { void recordAllocation( void *ptr, size_t size ) {
#ifdef ALLOC_INSTRUMENTATION_STDOUT #ifdef ALLOC_INSTRUMENTATION_STDOUT
if( inRecordAllocation )
return;
inRecordAllocation = true;
std::string trace = platform::get_backtrace(); std::string trace = platform::get_backtrace();
printf("Alloc\t%p\t%d\t%s\n", ptr, size, platform::get_backtrace().c_str()); printf("Alloc\t%p\t%d\t%s\n", ptr, size, trace.c_str());
inRecordAllocation = false;
#endif #endif
#ifdef ALLOC_INSTRUMENTATION #ifdef ALLOC_INSTRUMENTATION
if( memSample_entered ) if( memSample_entered )
@ -143,7 +151,10 @@ void recordAllocation( void *ptr, size_t size ) {
void recordDeallocation( void *ptr ) { void recordDeallocation( void *ptr ) {
#ifdef ALLOC_INSTRUMENTATION_STDOUT #ifdef ALLOC_INSTRUMENTATION_STDOUT
if( inRecordAllocation )
return;
printf("Dealloc\t%p\n", ptr); printf("Dealloc\t%p\n", ptr);
inRecordAllocation = false;
#endif #endif
#ifdef ALLOC_INSTRUMENTATION #ifdef ALLOC_INSTRUMENTATION
if( memSample_entered ) // could this lead to deallocations not being recorded? if( memSample_entered ) // could this lead to deallocations not being recorded?