Merge pull request #3891 from etschannen/feature-reset-proxy-connections

Reset a proxy's network connection with the master or resolvers if it is too far behind
This commit is contained in:
Meng Xu 2020-10-12 11:21:24 -07:00 committed by GitHub
commit 89469921bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 38 additions and 4 deletions

View File

@ -229,9 +229,11 @@ ACTOR Future<Void> pingLatencyLogger(TransportData* self) {
.detail("MedianLatency", peer->pingLatencies.median()) .detail("MedianLatency", peer->pingLatencies.median())
.detail("P90Latency", peer->pingLatencies.percentile(0.90)) .detail("P90Latency", peer->pingLatencies.percentile(0.90))
.detail("Count", peer->pingLatencies.getPopulationSize()) .detail("Count", peer->pingLatencies.getPopulationSize())
.detail("BytesReceived", peer->bytesReceived - peer->lastLoggedBytesReceived); .detail("BytesReceived", peer->bytesReceived - peer->lastLoggedBytesReceived)
.detail("BytesSent", peer->bytesSent - peer->lastLoggedBytesSent);
peer->pingLatencies.clear(); peer->pingLatencies.clear();
peer->lastLoggedBytesReceived = peer->bytesReceived; peer->lastLoggedBytesReceived = peer->bytesReceived;
peer->lastLoggedBytesSent = peer->bytesSent;
wait(delay(FLOW_KNOBS->PING_LOGGING_INTERVAL)); wait(delay(FLOW_KNOBS->PING_LOGGING_INTERVAL));
} else if(it == self->orderedAddresses.begin()) { } else if(it == self->orderedAddresses.begin()) {
wait(delay(FLOW_KNOBS->PING_LOGGING_INTERVAL)); wait(delay(FLOW_KNOBS->PING_LOGGING_INTERVAL));
@ -427,6 +429,7 @@ ACTOR Future<Void> connectionWriter( Reference<Peer> self, Reference<IConnection
int sent = conn->write(self->unsent.getUnsent(), /* limit= */ FLOW_KNOBS->MAX_PACKET_SEND_BYTES); int sent = conn->write(self->unsent.getUnsent(), /* limit= */ FLOW_KNOBS->MAX_PACKET_SEND_BYTES);
if (sent) { if (sent) {
self->bytesSent += sent;
self->transport->bytesSent += sent; self->transport->bytesSent += sent;
self->unsent.sent(sent); self->unsent.sent(sent);
} }

View File

@ -123,16 +123,19 @@ struct Peer : public ReferenceCounted<Peer> {
int peerReferences; int peerReferences;
bool incompatibleProtocolVersionNewer; bool incompatibleProtocolVersionNewer;
int64_t bytesReceived; int64_t bytesReceived;
int64_t bytesSent;
double lastDataPacketSentTime; double lastDataPacketSentTime;
int outstandingReplies; int outstandingReplies;
ContinuousSample<double> pingLatencies; ContinuousSample<double> pingLatencies;
int64_t lastLoggedBytesReceived; int64_t lastLoggedBytesReceived;
int64_t lastLoggedBytesSent;
explicit Peer(TransportData* transport, NetworkAddress const& destination) explicit Peer(TransportData* transport, NetworkAddress const& destination)
: transport(transport), destination(destination), outgoingConnectionIdle(true), lastConnectTime(0.0), : transport(transport), destination(destination), outgoingConnectionIdle(true), lastConnectTime(0.0),
reconnectionDelay(FLOW_KNOBS->INITIAL_RECONNECTION_TIME), compatible(true), outstandingReplies(0), reconnectionDelay(FLOW_KNOBS->INITIAL_RECONNECTION_TIME), compatible(true), outstandingReplies(0),
incompatibleProtocolVersionNewer(false), peerReferences(-1), bytesReceived(0), lastDataPacketSentTime(now()), incompatibleProtocolVersionNewer(false), peerReferences(-1), bytesReceived(0), lastDataPacketSentTime(now()),
pingLatencies(destination.isPublic() ? FLOW_KNOBS->PING_SAMPLE_AMOUNT : 1), lastLoggedBytesReceived(0) {} pingLatencies(destination.isPublic() ? FLOW_KNOBS->PING_SAMPLE_AMOUNT : 1), lastLoggedBytesReceived(0),
bytesSent(0), lastLoggedBytesSent(0) {}
void send(PacketBuffer* pb, ReliablePacket* rp, bool firstUnsent); void send(PacketBuffer* pb, ReliablePacket* rp, bool firstUnsent);

View File

@ -342,6 +342,11 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula
init( PROXY_COMPUTE_BUCKETS, 20000 ); init( PROXY_COMPUTE_BUCKETS, 20000 );
init( PROXY_COMPUTE_GROWTH_RATE, 0.01 ); init( PROXY_COMPUTE_GROWTH_RATE, 0.01 );
init( RESET_MASTER_BATCHES, 200 );
init( RESET_RESOLVER_BATCHES, 200 );
init( RESET_MASTER_DELAY, 300.0 );
init( RESET_RESOLVER_DELAY, 300.0 );
// Master Server // Master Server
// masterCommitter() in the master server will allow lower priority tasks (e.g. DataDistibution) // masterCommitter() in the master server will allow lower priority tasks (e.g. DataDistibution)
// by delay()ing for this amount of time between accepted batches of TransactionRequests. // by delay()ing for this amount of time between accepted batches of TransactionRequests.

View File

@ -287,6 +287,11 @@ public:
int PROXY_COMPUTE_BUCKETS; int PROXY_COMPUTE_BUCKETS;
double PROXY_COMPUTE_GROWTH_RATE; double PROXY_COMPUTE_GROWTH_RATE;
int RESET_MASTER_BATCHES;
int RESET_RESOLVER_BATCHES;
double RESET_MASTER_DELAY;
double RESET_RESOLVER_DELAY;
// Master Server // Master Server
double COMMIT_SLEEP_TIME; double COMMIT_SLEEP_TIME;
double MIN_BALANCE_TIME; double MIN_BALANCE_TIME;

View File

@ -258,6 +258,8 @@ struct ProxyCommitData {
NotifiedDouble lastCommitTime; NotifiedDouble lastCommitTime;
vector<double> commitComputePerOperation; vector<double> commitComputePerOperation;
double lastMasterReset;
double lastResolverReset;
//The tag related to a storage server rarely change, so we keep a vector of tags for each key range to be slightly more CPU efficient. //The tag related to a storage server rarely change, so we keep a vector of tags for each key range to be slightly more CPU efficient.
//When a tag related to a storage server does change, we empty out all of these vectors to signify they must be repopulated. //When a tag related to a storage server does change, we empty out all of these vectors to signify they must be repopulated.
@ -314,7 +316,9 @@ struct ProxyCommitData {
getConsistentReadVersion(getConsistentReadVersion), commit(commit), lastCoalesceTime(0), getConsistentReadVersion(getConsistentReadVersion), commit(commit), lastCoalesceTime(0),
localCommitBatchesStarted(0), locked(false), commitBatchInterval(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_MIN), localCommitBatchesStarted(0), locked(false), commitBatchInterval(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_MIN),
firstProxy(firstProxy), cx(openDBOnServer(db, TaskPriority::DefaultEndpoint, true, true)), db(db), firstProxy(firstProxy), cx(openDBOnServer(db, TaskPriority::DefaultEndpoint, true, true)), db(db),
singleKeyMutationEvent(LiteralStringRef("SingleKeyMutation")), commitBatchesMemBytesCount(0), lastTxsPop(0), lastStartCommit(0), lastCommitLatency(SERVER_KNOBS->REQUIRED_MIN_RECOVERY_DURATION), lastCommitTime(0) singleKeyMutationEvent(LiteralStringRef("SingleKeyMutation")), commitBatchesMemBytesCount(0),
lastTxsPop(0), lastStartCommit(0), lastCommitLatency(SERVER_KNOBS->REQUIRED_MIN_RECOVERY_DURATION),
lastCommitTime(0), lastMasterReset(now()), lastResolverReset(now())
{ {
commitComputePerOperation.resize(SERVER_KNOBS->PROXY_COMPUTE_BUCKETS,0.0); commitComputePerOperation.resize(SERVER_KNOBS->PROXY_COMPUTE_BUCKETS,0.0);
} }
@ -562,6 +566,12 @@ ACTOR Future<Void> commitBatch(
if (debugID.present()) if (debugID.present())
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.Before"); g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.Before");
if(localBatchNumber-self->latestLocalCommitBatchResolving.get()>SERVER_KNOBS->RESET_MASTER_BATCHES && now()-self->lastMasterReset>SERVER_KNOBS->RESET_MASTER_DELAY) {
TraceEvent(SevWarnAlways, "ResetMasterNetwork").detail("CurrentBatch", localBatchNumber).detail("InProcessBatch", self->latestLocalCommitBatchResolving.get());
FlowTransport::transport().resetConnection(self->master.address());
self->lastMasterReset=now();
}
/////// Phase 1: Pre-resolution processing (CPU bound except waiting for a version # which is separately pipelined and *should* be available by now (unless empty commit); ordered; currently atomic but could yield) /////// Phase 1: Pre-resolution processing (CPU bound except waiting for a version # which is separately pipelined and *should* be available by now (unless empty commit); ordered; currently atomic but could yield)
TEST(self->latestLocalCommitBatchResolving.get() < localBatchNumber-1); // Queuing pre-resolution commit processing TEST(self->latestLocalCommitBatchResolving.get() < localBatchNumber-1); // Queuing pre-resolution commit processing
wait(self->latestLocalCommitBatchResolving.whenAtLeast(localBatchNumber-1)); wait(self->latestLocalCommitBatchResolving.whenAtLeast(localBatchNumber-1));
@ -618,6 +628,14 @@ ACTOR Future<Void> commitBatch(
state vector<vector<int>> transactionResolverMap = std::move( requests.transactionResolverMap ); state vector<vector<int>> transactionResolverMap = std::move( requests.transactionResolverMap );
state Future<Void> releaseFuture = releaseResolvingAfter(self, releaseDelay, localBatchNumber); state Future<Void> releaseFuture = releaseResolvingAfter(self, releaseDelay, localBatchNumber);
if(localBatchNumber-self->latestLocalCommitBatchLogging.get()>SERVER_KNOBS->RESET_RESOLVER_BATCHES && now()-self->lastResolverReset>SERVER_KNOBS->RESET_RESOLVER_DELAY) {
TraceEvent(SevWarnAlways, "ResetResolverNetwork").detail("CurrentBatch", localBatchNumber).detail("InProcessBatch", self->latestLocalCommitBatchLogging.get());
for (int r = 0; r<self->resolvers.size(); r++) {
FlowTransport::transport().resetConnection(self->resolvers[r].address());
}
self->lastResolverReset=now();
}
/////// Phase 2: Resolution (waiting on the network; pipelined) /////// Phase 2: Resolution (waiting on the network; pipelined)
state vector<ResolveTransactionBatchReply> resolution = wait( getAll(replies) ); state vector<ResolveTransactionBatchReply> resolution = wait( getAll(replies) );

View File

@ -72,7 +72,7 @@ FlowKnobs::FlowKnobs(bool randomize, bool isSimulated) {
init( USE_OBJECT_SERIALIZER, 1 ); init( USE_OBJECT_SERIALIZER, 1 );
init( TOO_MANY_CONNECTIONS_CLOSED_RESET_DELAY, 5.0 ); init( TOO_MANY_CONNECTIONS_CLOSED_RESET_DELAY, 5.0 );
init( TOO_MANY_CONNECTIONS_CLOSED_TIMEOUT, 20.0 ); init( TOO_MANY_CONNECTIONS_CLOSED_TIMEOUT, 20.0 );
init( PING_LOGGING_INTERVAL, 1.0 ); init( PING_LOGGING_INTERVAL, 3.0 );
init( PING_SAMPLE_AMOUNT, 100 ); init( PING_SAMPLE_AMOUNT, 100 );
init( TLS_CERT_REFRESH_DELAY_SECONDS, 12*60*60 ); init( TLS_CERT_REFRESH_DELAY_SECONDS, 12*60*60 );