only serialize a single endpoint for an interface

This commit is contained in:
Evan Tschannen 2020-04-12 16:04:48 -07:00
parent 9b5130194d
commit 0c2e8b9462
4 changed files with 35 additions and 7 deletions

View File

@ -1146,8 +1146,10 @@ void FlowTransport::removePeerReference(const Endpoint& endpoint, bool isStream)
}
}
void FlowTransport::addEndpoint( Endpoint& endpoint, NetworkMessageReceiver* receiver, TaskPriority taskID ) {
endpoint.token = deterministicRandom()->randomUniqueID();
void FlowTransport::addEndpoint( Endpoint& endpoint, NetworkMessageReceiver* receiver, TaskPriority taskID, bool randomizeEndpoint ) {
if(randomizeEndpoint) {
endpoint.token = deterministicRandom()->randomUniqueID();
}
if (receiver->isStream()) {
endpoint.addresses = self->localAddresses;
endpoint.token = UID( endpoint.token.first() | TOKEN_STREAM_FLAG, endpoint.token.second() );

View File

@ -64,6 +64,10 @@ public:
return addresses.getTLSAddress();
}
Endpoint getAdjustedEndpoint( uint32_t index ) {
return Endpoint( addresses, UID(token.first(), (token.second()&0xffffffff00000000LL) | index) );
}
bool operator == (Endpoint const& r) const {
return getPrimaryAddress() == r.getPrimaryAddress() && token == r.token;
}
@ -179,7 +183,7 @@ public:
void removePeerReference(const Endpoint&, bool isStream);
// Signal that a peer connection is no longer being used
void addEndpoint( Endpoint& endpoint, NetworkMessageReceiver*, TaskPriority taskID );
void addEndpoint( Endpoint& endpoint, NetworkMessageReceiver*, TaskPriority taskID, bool randomizeEndpoint );
// Sets endpoint to be a new local endpoint which delivers messages to the given receiver
void removeEndpoint( const Endpoint&, NetworkMessageReceiver* );

View File

@ -55,11 +55,19 @@ struct FlowReceiver : private NetworkMessageReceiver {
const Endpoint& getEndpoint(TaskPriority taskID) {
if (!endpoint.isValid()) {
m_isLocalEndpoint = true;
FlowTransport::transport().addEndpoint(endpoint, this, taskID);
FlowTransport::transport().addEndpoint(endpoint, this, taskID, true);
}
return endpoint;
}
const Endpoint& initEndpoint(Endpoint base, TaskPriority taskID) {
ASSERT(!endpoint.isValid());
m_isLocalEndpoint = true;
endpoint = base;
FlowTransport::transport().addEndpoint(endpoint, this, taskID, false);
return endpoint;
}
void makeWellKnownEndpoint(Endpoint::Token token, TaskPriority taskID) {
ASSERT(!endpoint.isValid());
m_isLocalEndpoint = true;
@ -374,6 +382,8 @@ public:
//queue = (NetNotifiedQueue<T>*)0xdeadbeef;
}
Endpoint initEndpoint(Endpoint base, uint32_t taskID = TaskPriority::DefaultEndpoint) { return queue->initEndpoint(base, taskID); }
Endpoint getEndpoint(TaskPriority taskID = TaskPriority::DefaultEndpoint) const { return queue->getEndpoint(taskID); }
void makeWellKnownEndpoint(Endpoint::Token token, TaskPriority taskID) {
queue->makeWellKnownEndpoint(token, taskID);

View File

@ -33,6 +33,7 @@ typedef uint64_t DBRecoveryCount;
struct MasterInterface {
constexpr static FileIdentifier file_identifier = 5979145;
LocalityData locality;
Endpoint base;
RequestStream< ReplyPromise<Void> > waitFailure;
RequestStream< struct TLogRejoinRequest > tlogRejoin; // sent by tlog (whether or not rebooted) to communicate with a new master
RequestStream< struct ChangeCoordinatorsRequest > changeCoordinators;
@ -48,12 +49,23 @@ struct MasterInterface {
if constexpr (!is_fb_function<Archive>) {
ASSERT(ar.protocolVersion().isValid());
}
serializer(ar, locality, waitFailure, tlogRejoin, changeCoordinators, getCommitVersion, notifyBackupWorkerDone);
serializer(ar, locality, base);
if( Ar::isDeserializing ) {
waitFailure = RequestStream< ReplyPromise<Void> >( base.getAdjustedEndpoint(1) );
tlogRejoin = RequestStream< struct TLogRejoinRequest >( base.getAdjustedEndpoint(2) );
changeCoordinators = RequestStream< struct ChangeCoordinatorsRequest >( base.getAdjustedEndpoint(3) );
getCommitVersion = RequestStream< struct GetCommitVersionRequest >( base.getAdjustedEndpoint(4) );
notifyBackupWorkerDone = RequestStream<struct BackupWorkerDoneRequest>( base.getAdjustedEndpoint(5) );
}
}
void initEndpoints() {
getCommitVersion.getEndpoint( TaskPriority::GetConsistentReadVersion );
tlogRejoin.getEndpoint( TaskPriority::MasterTLogRejoin );
base = Endpoint( g_network->getLocalAddresses(), deterministicRandom()->randomUniqueID() );
waitFailure.initEndpoint( base.getAdjustedEndpoint(1) );
tlogRejoin.initEndpoint( base.getAdjustedEndpoint(2), TaskPriority::MasterTLogRejoin );
changeCoordinators.initEndpoint( base.getAdjustedEndpoint(3) );
getCommitVersion.initEndpoint( base.getAdjustedEndpoint(4), TaskPriority::GetConsistentReadVersion );
notifyBackupWorkerDone.initEndpoint( base.getAdjustedEndpoint(5) );
}
};