cache the serialization of clientDBInfo
This commit is contained in:
parent
79bb24484c
commit
9b4f7626bb
|
@ -155,7 +155,7 @@ struct OpenDatabaseCoordRequest {
|
|||
UID knownClientInfoID;
|
||||
Key clusterKey;
|
||||
vector<NetworkAddress> coordinators;
|
||||
ReplyPromise< struct ClientDBInfo > reply;
|
||||
ReplyPromise< CachedSerialization<struct ClientDBInfo> > reply;
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
|
|
|
@ -609,11 +609,11 @@ ACTOR Future<Void> getClientInfoFromLeader( Reference<AsyncVar<Optional<ClusterC
|
|||
} else {
|
||||
resetReply(req);
|
||||
}
|
||||
req.knownClientInfoID = clientData->clientInfo->get().id;
|
||||
req.knownClientInfoID = clientData->clientInfo->get().read().id;
|
||||
choose {
|
||||
when( ClientDBInfo ni = wait( brokenPromiseToNever( knownLeader->get().get().clientInterface.openDatabase.getReply( req ) ) ) ) {
|
||||
TraceEvent("MonitorLeaderForProxiesGotClientInfo", knownLeader->get().get().clientInterface.id()).detail("Proxy0", ni.proxies.size() ? ni.proxies[0].id() : UID()).detail("ClientID", ni.id);
|
||||
clientData->clientInfo->set(ni);
|
||||
clientData->clientInfo->set(CachedSerialization<ClientDBInfo>(ni));
|
||||
}
|
||||
when( wait( knownLeader->onChange() ) ) {}
|
||||
}
|
||||
|
@ -649,7 +649,7 @@ ACTOR Future<Void> monitorLeaderForProxies( Key clusterKey, vector<NetworkAddres
|
|||
ClientDBInfo outInfo;
|
||||
outInfo.id = deterministicRandom()->randomUniqueID();
|
||||
outInfo.forward = leader.get().first.serializedInfo;
|
||||
clientData->clientInfo->set(outInfo);
|
||||
clientData->clientInfo->set(CachedSerialization<ClientDBInfo>(outInfo));
|
||||
TraceEvent("MonitorLeaderForProxiesForwarding").detail("NewConnStr", leader.get().first.serializedInfo.toString());
|
||||
return Void();
|
||||
}
|
||||
|
@ -709,11 +709,11 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration( Reference<ClusterCo
|
|||
incorrectTime = Optional<double>();
|
||||
}
|
||||
|
||||
state ErrorOr<ClientDBInfo> rep = wait( clientLeaderServer.openDatabase.tryGetReply( req, TaskPriority::CoordinationReply ) );
|
||||
state ErrorOr<CachedSerialization<ClientDBInfo>> rep = wait( clientLeaderServer.openDatabase.tryGetReply( req, TaskPriority::CoordinationReply ) );
|
||||
if (rep.present()) {
|
||||
if( rep.get().forward.present() ) {
|
||||
TraceEvent("MonitorProxiesForwarding").detail("NewConnStr", rep.get().forward.get().toString()).detail("OldConnStr", info.intermediateConnFile->getConnectionString().toString());
|
||||
info.intermediateConnFile = Reference<ClusterConnectionFile>(new ClusterConnectionFile(connFile->getFilename(), ClusterConnectionString(rep.get().forward.get().toString())));
|
||||
if( rep.get().read().forward.present() ) {
|
||||
TraceEvent("MonitorProxiesForwarding").detail("NewConnStr", rep.get().read().forward.get().toString()).detail("OldConnStr", info.intermediateConnFile->getConnectionString().toString());
|
||||
info.intermediateConnFile = Reference<ClusterConnectionFile>(new ClusterConnectionFile(connFile->getFilename(), ClusterConnectionString(rep.get().read().forward.get().toString())));
|
||||
return info;
|
||||
}
|
||||
if(connFile != info.intermediateConnFile) {
|
||||
|
@ -729,7 +729,7 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration( Reference<ClusterCo
|
|||
info.hasConnected = true;
|
||||
connFile->notifyConnected();
|
||||
|
||||
auto& ni = rep.get();
|
||||
auto& ni = rep.get().mutate();
|
||||
if(ni.proxies.size() > CLIENT_KNOBS->MAX_CLIENT_PROXY_CONNECTIONS) {
|
||||
std::vector<UID> proxyUIDs;
|
||||
for(auto& proxy : ni.proxies) {
|
||||
|
@ -747,7 +747,7 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration( Reference<ClusterCo
|
|||
ni.proxies = lastProxies;
|
||||
}
|
||||
|
||||
clientInfo->set( rep.get() );
|
||||
clientInfo->set( rep.get().read() );
|
||||
successIdx = idx;
|
||||
} else if(idx == successIdx) {
|
||||
wait(delay(CLIENT_KNOBS->COORDINATOR_RECONNECTION_DELAY));
|
||||
|
|
|
@ -42,11 +42,11 @@ struct ClientStatusInfo {
|
|||
|
||||
struct ClientData {
|
||||
std::map<NetworkAddress, ClientStatusInfo> clientStatusInfoMap;
|
||||
Reference<AsyncVar<ClientDBInfo>> clientInfo;
|
||||
Reference<AsyncVar<CachedSerialization<ClientDBInfo>>> clientInfo;
|
||||
|
||||
OpenDatabaseRequest getRequest();
|
||||
|
||||
ClientData() : clientInfo( new AsyncVar<ClientDBInfo>( ClientDBInfo() ) ) {}
|
||||
ClientData() : clientInfo( new AsyncVar<CachedSerialization<ClientDBInfo>>( CachedSerialization<ClientDBInfo>() ) ) {}
|
||||
};
|
||||
|
||||
template <class LeaderInterface>
|
||||
|
|
|
@ -209,7 +209,7 @@ TEST_CASE("/fdbserver/Coordination/localGenerationReg/simple") {
|
|||
}
|
||||
|
||||
ACTOR Future<Void> openDatabase(ClientData* db, int* clientCount, Reference<AsyncVar<bool>> hasConnectedClients, OpenDatabaseCoordRequest req) {
|
||||
if(db->clientInfo->get().id != req.knownClientInfoID && !db->clientInfo->get().forward.present()) {
|
||||
if(db->clientInfo->get().read().id != req.knownClientInfoID && !db->clientInfo->get().read().forward.present()) {
|
||||
req.reply.send( db->clientInfo->get() );
|
||||
return Void();
|
||||
}
|
||||
|
@ -218,7 +218,7 @@ ACTOR Future<Void> openDatabase(ClientData* db, int* clientCount, Reference<Asyn
|
|||
|
||||
db->clientStatusInfoMap[req.reply.getEndpoint().getPrimaryAddress()] = ClientStatusInfo(req.traceLogGroup, req.supportedVersions, req.issues);
|
||||
|
||||
while (db->clientInfo->get().id == req.knownClientInfoID && !db->clientInfo->get().forward.present()) {
|
||||
while (db->clientInfo->get().read().id == req.knownClientInfoID && !db->clientInfo->get().read().forward.present()) {
|
||||
choose {
|
||||
when (wait( db->clientInfo->onChange() )) {}
|
||||
when (wait( delayJittered( SERVER_KNOBS->CLIENT_REGISTER_INTERVAL ) )) { break; } // The client might be long gone!
|
||||
|
@ -315,7 +315,7 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
|
|||
ClientDBInfo outInfo;
|
||||
outInfo.id = deterministicRandom()->randomUniqueID();
|
||||
outInfo.forward = req.conn.toString();
|
||||
clientData.clientInfo->set(outInfo);
|
||||
clientData.clientInfo->set(CachedSerialization<ClientDBInfo>(outInfo));
|
||||
req.reply.send( Void() );
|
||||
ASSERT(!hasConnectedClients->get());
|
||||
return Void();
|
||||
|
@ -478,7 +478,7 @@ ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf, OnDemandStore
|
|||
ClientDBInfo info;
|
||||
info.id = deterministicRandom()->randomUniqueID();
|
||||
info.forward = forward.get().serializedInfo;
|
||||
req.reply.send( info );
|
||||
req.reply.send( CachedSerialization<ClientDBInfo>(info) );
|
||||
} else {
|
||||
regs.getInterface(req.clusterKey, id).openDatabase.send( req );
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue