Merge pull request #1422 from etschannen/feature-network-degraded

Degrade processes which show signs of asymmetric network failures
This commit is contained in:
A.J. Beamon 2019-04-08 09:58:37 -07:00 committed by GitHub
commit bb0bb597ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 86 additions and 18 deletions

View File

@ -434,7 +434,8 @@
"triple",
"three_datacenter",
"three_datacenter_fallback",
"three_data_hall"
"three_data_hall",
"three_data_hall_fallback"
]},
"regions":[{
"datacenters":[{

View File

@ -239,6 +239,8 @@ StatusObject DatabaseConfiguration::toJSON(bool noPolicies) const {
result["redundancy_mode"] = "triple";
} else if( tLogReplicationFactor == 4 && storageTeamSize == 3 && tlogInfo == "data_hall^2 x zoneid^2 x 1" && storageInfo == "data_hall^3 x 1" ) {
result["redundancy_mode"] = "three_data_hall";
} else if( tLogReplicationFactor == 4 && storageTeamSize == 2 && tlogInfo == "data_hall^2 x zoneid^2 x 1" && storageInfo == "data_hall^2 x 1" ) {
result["redundancy_mode"] = "three_data_hall_fallback";
} else {
customRedundancy = true;
}

View File

@ -145,6 +145,13 @@ std::map<std::string, std::string> configForToken( std::string const& mode ) {
tLogPolicy = Reference<IReplicationPolicy>(new PolicyAcross(2, "data_hall",
Reference<IReplicationPolicy>(new PolicyAcross(2, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())))
));
} else if(mode == "three_data_hall_fallback") {
redundancy="2";
log_replicas="4";
storagePolicy = Reference<IReplicationPolicy>(new PolicyAcross(2, "data_hall", Reference<IReplicationPolicy>(new PolicyOne())));
tLogPolicy = Reference<IReplicationPolicy>(new PolicyAcross(2, "data_hall",
Reference<IReplicationPolicy>(new PolicyAcross(2, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())))
));
} else
redundancySpecified = false;
if (redundancySpecified) {
@ -510,6 +517,12 @@ ConfigureAutoResult parseConfig( StatusObject const& status ) {
} else if( result.old_replication == "three_datacenter_fallback" ) {
storage_replication = 4;
log_replication = 4;
} else if( result.old_replication == "three_data_hall" ) {
storage_replication = 3;
log_replication = 4;
} else if( result.old_replication == "three_data_hall_fallback" ) {
storage_replication = 2;
log_replication = 4;
} else
return ConfigureAutoResult();

View File

@ -458,7 +458,8 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"triple",
"three_datacenter",
"three_datacenter_fallback",
"three_data_hall"
"three_data_hall",
"three_data_hall_fallback"
]},
"regions":[{
"datacenters":[{
@ -667,7 +668,8 @@ const KeyRef JSONSchemas::clusterConfigurationSchema = LiteralStringRef(R"config
"triple",
"three_datacenter",
"three_datacenter_fallback",
"three_data_hall"
"three_data_hall",
"three_data_hall_fallback"
]},
"regions":[{
"datacenters":[{

View File

@ -138,6 +138,12 @@ FailureStatus SimpleFailureMonitor::getState( Endpoint const& endpoint ) {
}
}
FailureStatus SimpleFailureMonitor::getState( NetworkAddress const& address ) {
auto a = addressStatus.find(address);
if (a == addressStatus.end()) return FailureStatus();
else return a->second;
}
bool SimpleFailureMonitor::onlyEndpointFailed( Endpoint const& endpoint ) {
if(!endpointKnownFailed.get(endpoint))
return false;

View File

@ -84,6 +84,9 @@ public:
// Returns the currently known status for the endpoint
virtual FailureStatus getState( Endpoint const& endpoint ) = 0;
// Returns the currently known status for the address
virtual FailureStatus getState( NetworkAddress const& address ) = 0;
// Only use this function when the endpoint is known to be failed
virtual void endpointNotFound( Endpoint const& ) = 0;
@ -130,6 +133,7 @@ public:
virtual Future<Void> onStateChanged( Endpoint const& endpoint );
virtual FailureStatus getState( Endpoint const& endpoint );
virtual FailureStatus getState( NetworkAddress const& address );
virtual Future<Void> onDisconnectOrFailure( Endpoint const& endpoint );
virtual bool onlyEndpointFailed( Endpoint const& endpoint );
virtual bool permanentlyFailed( Endpoint const& endpoint );

View File

@ -149,7 +149,9 @@ public:
lastIncompatibleMessage(0),
transportId(transportId),
numIncompatibleConnections(0)
{}
{
degraded = Reference<AsyncVar<bool>>( new AsyncVar<bool>(false) );
}
~TransportData();
@ -170,6 +172,8 @@ public:
NetworkAddressList localAddresses;
std::vector<Future<Void>> listeners;
std::unordered_map<NetworkAddress, struct Peer*> peers;
std::unordered_map<NetworkAddress, std::pair<double, double>> closedPeers;
Reference<AsyncVar<bool>> degraded;
bool warnAlwaysForLargePacket;
// These declarations must be in exactly this order
@ -483,6 +487,17 @@ struct Peer : NonCopyable {
TraceEvent(ok ? SevInfo : SevWarnAlways, "IncompatibleConnectionClosed", conn ? conn->getDebugID() : UID()).error(e, true).suppressFor(1.0).detail("PeerAddr", self->destination);
}
if(self->destination.isPublic() && IFailureMonitor::failureMonitor().getState(self->destination).isAvailable()) {
auto& it = self->transport->closedPeers[self->destination];
if(now() - it.second > FLOW_KNOBS->TOO_MANY_CONNECTIONS_CLOSED_RESET_DELAY) {
it.first = now();
} else if(now() - it.first > FLOW_KNOBS->TOO_MANY_CONNECTIONS_CLOSED_TIMEOUT) {
TraceEvent(SevWarnAlways, "TooManyConnectionsClosed", conn ? conn->getDebugID() : UID()).suppressFor(5.0).detail("PeerAddr", self->destination);
self->transport->degraded->set(true);
}
it.second = now();
}
if (conn) {
conn->close();
conn = Reference<IConnection>();
@ -1100,6 +1115,10 @@ int FlowTransport::getEndpointCount() {
return -1;
}
Reference<AsyncVar<bool>> FlowTransport::getDegraded() {
return self->degraded;
}
bool FlowTransport::incompatibleOutgoingConnectionsPresent() {
return self->numIncompatibleConnections > 0;
}

View File

@ -143,6 +143,9 @@ public:
// Makes PacketID "unreliable" (either the data or a connection close event will be delivered
// eventually). It can still be used safely to send a reply to a "reliable" request.
Reference<AsyncVar<bool>> getDegraded();
// This async var will be set to true when the process cannot connect to a public network address that the failure monitor thinks is healthy.
void sendUnreliable( ISerializeSource const& what, const Endpoint& destination, bool openConnection = true );// { cancelReliable(sendReliable(what,destination)); }
int getEndpointCount();

View File

@ -75,7 +75,6 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( DISK_QUEUE_FILE_SHRINK_BYTES, 100<<20 ); // BUGGIFYd per file within the DiskQueue
init( TLOG_DEGRADED_DELAY_COUNT, 5 );
init( TLOG_DEGRADED_DURATION, 5.0 );
init( TLOG_DEGRADED_RESET_INTERVAL, 48*60*60 ); if ( randomize && BUGGIFY ) TLOG_DEGRADED_RESET_INTERVAL = 10;
// Data distribution queue
init( HEALTH_POLL_TIME, 1.0 );
@ -417,6 +416,9 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
//Worker
init( WORKER_LOGGING_INTERVAL, 5.0 );
init( INCOMPATIBLE_PEER_DELAY_BEFORE_LOGGING, 5.0 );
init( DEGRADED_RESET_INTERVAL, 24*60*60 ); if ( randomize && BUGGIFY ) DEGRADED_RESET_INTERVAL = 10;
init( DEGRADED_WARNING_LIMIT, 1 );
init( DEGRADED_WARNING_RESET_DELAY, 7*24*60*60 );
// Test harness
init( WORKER_POLL_DELAY, 1.0 );

View File

@ -79,7 +79,6 @@ public:
int64_t DISK_QUEUE_FILE_SHRINK_BYTES; // When we shrink the disk queue, by how many bytes should it shrink?
int TLOG_DEGRADED_DELAY_COUNT;
double TLOG_DEGRADED_DURATION;
double TLOG_DEGRADED_RESET_INTERVAL;
// Data distribution queue
double HEALTH_POLL_TIME;
@ -356,6 +355,9 @@ public:
//Worker
double WORKER_LOGGING_INTERVAL;
double INCOMPATIBLE_PEER_DELAY_BEFORE_LOGGING;
double DEGRADED_RESET_INTERVAL;
double DEGRADED_WARNING_LIMIT;
double DEGRADED_WARNING_RESET_DELAY;
// Test harness
double WORKER_POLL_DELAY;

View File

@ -625,7 +625,7 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
state WorkerCache<InitializeStorageReply> storageCache;
state Reference<AsyncVar<ServerDBInfo>> dbInfo( new AsyncVar<ServerDBInfo>(ServerDBInfo()) );
state Future<Void> metricsLogger;
state Reference<AsyncVar<bool>> degraded( new AsyncVar<bool>(false) );
state Reference<AsyncVar<bool>> degraded = FlowTransport::transport().getDegraded();
// tLogFnForOptions() can return a function that doesn't correspond with the FDB version that the
// TLogVersion represents. This can be done if the newer TLog doesn't support a requested option.
// As (store type, spill type) can map to the same TLogFn across multiple TLogVersions, we need to
@ -652,7 +652,7 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
}
}
errorForwarders.add( resetAfter(degraded, SERVER_KNOBS->TLOG_DEGRADED_RESET_INTERVAL, false));
errorForwarders.add( resetAfter(degraded, SERVER_KNOBS->DEGRADED_RESET_INTERVAL, false, SERVER_KNOBS->DEGRADED_WARNING_LIMIT, SERVER_KNOBS->DEGRADED_WARNING_RESET_DELAY, "DegradedReset"));
errorForwarders.add( loadedPonger( interf.debugPing.getFuture() ) );
errorForwarders.add( waitFailureServer( interf.waitFailure.getFuture() ) );
errorForwarders.add( monitorServerDBInfo( ccInterface, connFile, locality, dbInfo ) );

View File

@ -64,6 +64,8 @@ FlowKnobs::FlowKnobs(bool randomize, bool isSimulated) {
init( RECONNECTION_TIME_GROWTH_RATE, 1.2 );
init( RECONNECTION_RESET_TIME, 5.0 );
init( CONNECTION_ACCEPT_DELAY, 0.01 );
init( TOO_MANY_CONNECTIONS_CLOSED_RESET_DELAY, 5.0 );
init( TOO_MANY_CONNECTIONS_CLOSED_TIMEOUT, 20.0 );
init( TLS_CERT_REFRESH_DELAY_SECONDS, 12*60*60 );

View File

@ -95,6 +95,8 @@ public:
int64_t BUGGIFY_SIM_PAGE_CACHE_64K;
int MAX_EVICT_ATTEMPTS;
double PAGE_CACHE_TRUNCATE_LOOKUP_FRACTION;
double TOO_MANY_CONNECTIONS_CLOSED_RESET_DELAY;
int TOO_MANY_CONNECTIONS_CLOSED_TIMEOUT;
//AsyncFileKAIO
int MAX_OUTSTANDING;

View File

@ -615,7 +615,7 @@ void getNetworkTraffic(const IPAddress& ip, uint64_t& bytesSent, uint64_t& bytes
snmp_stream >> retransSegs;
}
void getMachineLoad(uint64_t& idleTime, uint64_t& totalTime) {
void getMachineLoad(uint64_t& idleTime, uint64_t& totalTime, bool logDetails) {
INJECT_FAULT( platform_error, "getMachineLoad" ); // Even though this function doesn't throw errors, the equivalents for other platforms do, and since all of our simulation testing is on Linux...
std::ifstream stat_stream("/proc/stat", std::ifstream::in);
@ -628,7 +628,7 @@ void getMachineLoad(uint64_t& idleTime, uint64_t& totalTime) {
totalTime = t_user+t_nice+t_system+t_idle+t_iowait+t_irq+t_softirq+t_steal+t_guest;
idleTime = t_idle+t_iowait;
if( !DEBUG_DETERMINISM )
if( !DEBUG_DETERMINISM && logDetails )
TraceEvent("MachineLoadDetail").detail("User", t_user).detail("Nice", t_nice).detail("System", t_system).detail("Idle", t_idle).detail("IOWait", t_iowait).detail("IRQ", t_irq).detail("SoftIRQ", t_softirq).detail("Steal", t_steal).detail("Guest", t_guest);
}
@ -818,7 +818,7 @@ void getNetworkTraffic(const IPAddress& ip, uint64_t& bytesSent, uint64_t& bytes
free(buf);
}
void getMachineLoad(uint64_t& idleTime, uint64_t& totalTime) {
void getMachineLoad(uint64_t& idleTime, uint64_t& totalTime, bool logDetails) {
INJECT_FAULT( platform_error, "getMachineLoad" );
mach_msg_type_number_t count = HOST_CPU_LOAD_INFO_COUNT;
host_cpu_load_info_data_t r_load;
@ -1103,7 +1103,7 @@ void initPdhStrings(SystemStatisticsState *state, std::string dataFolder) {
}
#endif
SystemStatistics getSystemStatistics(std::string dataFolder, const IPAddress* ip, SystemStatisticsState** statState) {
SystemStatistics getSystemStatistics(std::string dataFolder, const IPAddress* ip, SystemStatisticsState** statState, bool logDetails) {
if( (*statState) == NULL )
(*statState) = new SystemStatisticsState();
SystemStatistics returnStats;
@ -1238,7 +1238,7 @@ SystemStatistics getSystemStatistics(std::string dataFolder, const IPAddress* ip
uint64_t clockIdleTime = (*statState)->lastClockIdleTime;
uint64_t clockTotalTime = (*statState)->lastClockTotalTime;
getMachineLoad(clockIdleTime, clockTotalTime);
getMachineLoad(clockIdleTime, clockTotalTime, logDetails);
returnStats.machineCPUSeconds = clockTotalTime - (*statState)->lastClockTotalTime != 0 ? ( 1 - ((clockIdleTime - (*statState)->lastClockIdleTime) / ((double)(clockTotalTime - (*statState)->lastClockTotalTime)))) * returnStats.elapsed : 0;
(*statState)->lastClockIdleTime = clockIdleTime;
(*statState)->lastClockTotalTime = clockTotalTime;

View File

@ -247,7 +247,7 @@ struct SystemStatisticsState;
struct IPAddress;
SystemStatistics getSystemStatistics(std::string dataFolder, const IPAddress* ip, SystemStatisticsState **statState);
SystemStatistics getSystemStatistics(std::string dataFolder, const IPAddress* ip, SystemStatisticsState **statState, bool logDetails);
double getProcessorTimeThread();
@ -272,7 +272,7 @@ void getNetworkTraffic(uint64_t& bytesSent, uint64_t& bytesReceived, uint64_t& o
void getDiskStatistics(std::string const& directory, uint64_t& currentIOs, uint64_t& busyTicks, uint64_t& reads, uint64_t& writes, uint64_t& writeSectors);
void getMachineLoad(uint64_t& idleTime, uint64_t& totalTime);
void getMachineLoad(uint64_t& idleTime, uint64_t& totalTime, bool logDetails);
double timer(); // Returns the system real time clock with high precision. May jump around when system time is adjusted!
double timer_monotonic(); // Returns a high precision monotonic clock which is adjusted to be kind of similar to timer() at startup, but might not be a globally accurate time.

View File

@ -45,7 +45,7 @@ SystemStatistics getSystemStatistics() {
static StatisticsState statState = StatisticsState();
const IPAddress ipAddr = machineState.ip.present() ? machineState.ip.get() : IPAddress();
return getSystemStatistics(
machineState.folder.present() ? machineState.folder.get() : "", &ipAddr, &statState.systemState);
machineState.folder.present() ? machineState.folder.get() : "", &ipAddr, &statState.systemState, false);
}
#define TRACEALLOCATOR( size ) TraceEvent("MemSample").detail("Count", FastAllocator<size>::getApproximateMemoryUnused()/size).detail("TotalSize", FastAllocator<size>::getApproximateMemoryUnused()).detail("SampleCount", 1).detail("Hash", "FastAllocatedUnused" #size ).detail("Bt", "na")
@ -54,7 +54,7 @@ SystemStatistics getSystemStatistics() {
SystemStatistics customSystemMonitor(std::string eventName, StatisticsState *statState, bool machineMetrics) {
const IPAddress ipAddr = machineState.ip.present() ? machineState.ip.get() : IPAddress();
SystemStatistics currentStats = getSystemStatistics(machineState.folder.present() ? machineState.folder.get() : "",
&ipAddr, &statState->systemState);
&ipAddr, &statState->systemState, true);
NetworkData netData;
netData.init();
if (!DEBUG_DETERMINISM && currentStats.initialized) {

View File

@ -776,13 +776,23 @@ Future<Void> setAfter( Reference<AsyncVar<T>> var, double time, T val ) {
}
ACTOR template <class T>
Future<Void> resetAfter( Reference<AsyncVar<T>> var, double time, T val ) {
Future<Void> resetAfter( Reference<AsyncVar<T>> var, double time, T val, int warningLimit = -1, double warningResetDelay = 0, const char* context = NULL ) {
state bool isEqual = var->get() == val;
state Future<Void> resetDelay = isEqual ? Never() : delay(time);
state int resetCount = 0;
state double lastReset = now();
loop {
choose {
when( wait( resetDelay ) ) {
var->set( val );
if(now() - lastReset > warningResetDelay) {
resetCount = 0;
}
resetCount++;
if(context && warningLimit >= 0 && resetCount > warningLimit) {
TraceEvent(SevWarnAlways, context).detail("ResetCount", resetCount).detail("LastReset", now() - lastReset);
}
lastReset = now();
isEqual = true;
resetDelay = Never();
}