use the first RequestStream as the base endpoint

This commit is contained in:
Evan Tschannen 2020-05-20 13:52:22 -07:00
parent 459186dad0
commit 6a006c4c1f
5 changed files with 32 additions and 38 deletions

View File

@ -40,7 +40,6 @@ struct MasterProxyInterface {
Optional<Key> processId;
bool provisional;
Endpoint base;
RequestStream< struct CommitTransactionRequest > commit;
RequestStream< struct GetReadVersionRequest > getConsistentReadVersion; // Returns a version which (1) is committed, and (2) is >= the latest version reported committed (by a commit response) when this request was sent
// (at some point between when this request is sent and when its response is received, the latest version reported committed)
@ -63,18 +62,17 @@ struct MasterProxyInterface {
template <class Archive>
void serialize(Archive& ar) {
serializer(ar, processId, provisional, base);
serializer(ar, processId, provisional, commit);
if( Archive::isDeserializing ) {
commit = RequestStream< struct CommitTransactionRequest >( base.getAdjustedEndpoint(0) );
getConsistentReadVersion = RequestStream< struct GetReadVersionRequest >( base.getAdjustedEndpoint(1) );
getKeyServersLocations = RequestStream< struct GetKeyServerLocationsRequest >( base.getAdjustedEndpoint(2) );
getStorageServerRejoinInfo = RequestStream< struct GetStorageServerRejoinInfoRequest >( base.getAdjustedEndpoint(3) );
waitFailure = RequestStream<ReplyPromise<Void>>( base.getAdjustedEndpoint(4) );
getRawCommittedVersion = RequestStream< struct GetRawCommittedVersionRequest >( base.getAdjustedEndpoint(5) );
txnState = RequestStream< struct TxnStateRequest >( base.getAdjustedEndpoint(6) );
getHealthMetrics = RequestStream< struct GetHealthMetricsRequest >( base.getAdjustedEndpoint(7) );
proxySnapReq = RequestStream< struct ProxySnapRequest >( base.getAdjustedEndpoint(8) );
exclusionSafetyCheckReq = RequestStream< struct ExclusionSafetyCheckRequest >( base.getAdjustedEndpoint(9) );
getConsistentReadVersion = RequestStream< struct GetReadVersionRequest >( commit.getEndpoint().getAdjustedEndpoint(1) );
getKeyServersLocations = RequestStream< struct GetKeyServerLocationsRequest >( commit.getEndpoint().getAdjustedEndpoint(2) );
getStorageServerRejoinInfo = RequestStream< struct GetStorageServerRejoinInfoRequest >( commit.getEndpoint().getAdjustedEndpoint(3) );
waitFailure = RequestStream<ReplyPromise<Void>>( commit.getEndpoint().getAdjustedEndpoint(4) );
getRawCommittedVersion = RequestStream< struct GetRawCommittedVersionRequest >( commit.getEndpoint().getAdjustedEndpoint(5) );
txnState = RequestStream< struct TxnStateRequest >( commit.getEndpoint().getAdjustedEndpoint(6) );
getHealthMetrics = RequestStream< struct GetHealthMetricsRequest >( commit.getEndpoint().getAdjustedEndpoint(7) );
proxySnapReq = RequestStream< struct ProxySnapRequest >( commit.getEndpoint().getAdjustedEndpoint(8) );
exclusionSafetyCheckReq = RequestStream< struct ExclusionSafetyCheckRequest >( commit.getEndpoint().getAdjustedEndpoint(9) );
}
}
@ -90,7 +88,7 @@ struct MasterProxyInterface {
streams.push_back(getHealthMetrics.getReceiver());
streams.push_back(proxySnapReq.getReceiver());
streams.push_back(exclusionSafetyCheckReq.getReceiver());
base = FlowTransport::transport().addEndpoints(streams);
FlowTransport::transport().addEndpoints(streams);
}
};

View File

@ -1277,8 +1277,8 @@ void FlowTransport::addEndpoint( Endpoint& endpoint, NetworkMessageReceiver* rec
self->endpoints.insert( receiver, endpoint.token, taskID );
}
const Endpoint& FlowTransport::addEndpoints( std::vector<std::pair<FlowReceiver*, TaskPriority>> const& streams ) {
return self->endpoints.insert( self->localAddresses, streams );
void FlowTransport::addEndpoints( std::vector<std::pair<FlowReceiver*, TaskPriority>> const& streams ) {
self->endpoints.insert( self->localAddresses, streams );
}
void FlowTransport::removeEndpoint( const Endpoint& endpoint, NetworkMessageReceiver* receiver ) {

View File

@ -192,7 +192,7 @@ public:
void addEndpoint( Endpoint& endpoint, NetworkMessageReceiver*, TaskPriority taskID );
// Sets endpoint to be a new local endpoint which delivers messages to the given receiver
const Endpoint& addEndpoints( std::vector<std::pair<struct FlowReceiver*, TaskPriority>> const& streams );
void addEndpoints( std::vector<std::pair<struct FlowReceiver*, TaskPriority>> const& streams );
void removeEndpoint( const Endpoint&, NetworkMessageReceiver* );
// The given local endpoint no longer delivers messages to the given receiver or uses resources

View File

@ -33,7 +33,6 @@ typedef uint64_t DBRecoveryCount;
struct MasterInterface {
constexpr static FileIdentifier file_identifier = 5979145;
LocalityData locality;
Endpoint base;
RequestStream< ReplyPromise<Void> > waitFailure;
RequestStream< struct TLogRejoinRequest > tlogRejoin; // sent by tlog (whether or not rebooted) to communicate with a new master
RequestStream< struct ChangeCoordinatorsRequest > changeCoordinators;
@ -49,13 +48,12 @@ struct MasterInterface {
if constexpr (!is_fb_function<Archive>) {
ASSERT(ar.protocolVersion().isValid());
}
serializer(ar, locality, base);
serializer(ar, locality, waitFailure);
if( Archive::isDeserializing ) {
waitFailure = RequestStream< ReplyPromise<Void> >( base.getAdjustedEndpoint(0) );
tlogRejoin = RequestStream< struct TLogRejoinRequest >( base.getAdjustedEndpoint(1) );
changeCoordinators = RequestStream< struct ChangeCoordinatorsRequest >( base.getAdjustedEndpoint(2) );
getCommitVersion = RequestStream< struct GetCommitVersionRequest >( base.getAdjustedEndpoint(3) );
notifyBackupWorkerDone = RequestStream<struct BackupWorkerDoneRequest>( base.getAdjustedEndpoint(4) );
tlogRejoin = RequestStream< struct TLogRejoinRequest >( waitFailure.getEndpoint().getAdjustedEndpoint(1) );
changeCoordinators = RequestStream< struct ChangeCoordinatorsRequest >( waitFailure.getEndpoint().getAdjustedEndpoint(2) );
getCommitVersion = RequestStream< struct GetCommitVersionRequest >( waitFailure.getEndpoint().getAdjustedEndpoint(3) );
notifyBackupWorkerDone = RequestStream<struct BackupWorkerDoneRequest>( waitFailure.getEndpoint().getAdjustedEndpoint(4) );
}
}
@ -66,7 +64,7 @@ struct MasterInterface {
streams.push_back(changeCoordinators.getReceiver());
streams.push_back(getCommitVersion.getReceiver(TaskPriority::GetConsistentReadVersion));
streams.push_back(notifyBackupWorkerDone.getReceiver());
base = FlowTransport::transport().addEndpoints(streams);
FlowTransport::transport().addEndpoints(streams);
}
};

View File

@ -36,7 +36,6 @@ struct TLogInterface {
LocalityData filteredLocality;
UID uniqueID;
UID sharedTLogID;
Endpoint base;
RequestStream< struct TLogPeekRequest > peekMessages;
RequestStream< struct TLogPopRequest > popMessages;
@ -75,7 +74,7 @@ struct TLogInterface {
streams.push_back(disablePopRequest.getReceiver());
streams.push_back(enablePopRequest.getReceiver());
streams.push_back(snapRequest.getReceiver());
base = FlowTransport::transport().addEndpoints(streams);
FlowTransport::transport().addEndpoints(streams);
}
template <class Ar>
@ -83,19 +82,18 @@ struct TLogInterface {
if constexpr (!is_fb_function<Ar>) {
ASSERT(ar.isDeserializing || uniqueID != UID());
}
serializer(ar, uniqueID, sharedTLogID, filteredLocality, base);
serializer(ar, uniqueID, sharedTLogID, filteredLocality, peekMessages);
if( Ar::isDeserializing ) {
peekMessages = RequestStream< struct TLogPeekRequest >( base.getAdjustedEndpoint(0) );
popMessages = RequestStream< struct TLogPopRequest >( base.getAdjustedEndpoint(1) );
commit = RequestStream< struct TLogCommitRequest >( base.getAdjustedEndpoint(2) );
lock = RequestStream< ReplyPromise< struct TLogLockResult > >( base.getAdjustedEndpoint(3) );
getQueuingMetrics = RequestStream< struct TLogQueuingMetricsRequest >( base.getAdjustedEndpoint(4) );
confirmRunning = RequestStream< struct TLogConfirmRunningRequest >( base.getAdjustedEndpoint(5) );
waitFailure = RequestStream< ReplyPromise<Void> >( base.getAdjustedEndpoint(6) );
recoveryFinished = RequestStream< struct TLogRecoveryFinishedRequest >( base.getAdjustedEndpoint(7) );
disablePopRequest = RequestStream< struct TLogDisablePopRequest >( base.getAdjustedEndpoint(8) );
enablePopRequest = RequestStream< struct TLogEnablePopRequest >( base.getAdjustedEndpoint(9) );
snapRequest = RequestStream< struct TLogSnapRequest >( base.getAdjustedEndpoint(10) );
popMessages = RequestStream< struct TLogPopRequest >( peekMessages.getEndpoint().getAdjustedEndpoint(1) );
commit = RequestStream< struct TLogCommitRequest >( peekMessages.getEndpoint().getAdjustedEndpoint(2) );
lock = RequestStream< ReplyPromise< struct TLogLockResult > >( peekMessages.getEndpoint().getAdjustedEndpoint(3) );
getQueuingMetrics = RequestStream< struct TLogQueuingMetricsRequest >( peekMessages.getEndpoint().getAdjustedEndpoint(4) );
confirmRunning = RequestStream< struct TLogConfirmRunningRequest >( peekMessages.getEndpoint().getAdjustedEndpoint(5) );
waitFailure = RequestStream< ReplyPromise<Void> >( peekMessages.getEndpoint().getAdjustedEndpoint(6) );
recoveryFinished = RequestStream< struct TLogRecoveryFinishedRequest >( peekMessages.getEndpoint().getAdjustedEndpoint(7) );
disablePopRequest = RequestStream< struct TLogDisablePopRequest >( peekMessages.getEndpoint().getAdjustedEndpoint(8) );
enablePopRequest = RequestStream< struct TLogEnablePopRequest >( peekMessages.getEndpoint().getAdjustedEndpoint(9) );
snapRequest = RequestStream< struct TLogSnapRequest >( peekMessages.getEndpoint().getAdjustedEndpoint(10) );
}
}
};