Switch to signalling storageIntefaceReg actor with an Optional<Future<Void>>

This commit is contained in:
Bharadwaj V.R 2022-03-29 11:50:46 -07:00
parent dd3a453f5b
commit 2f7b68d06f
1 changed files with 8 additions and 12 deletions

View File

@ -836,7 +836,7 @@ public:
Promise<Void> coreStarted; Promise<Void> coreStarted;
bool shuttingDown; bool shuttingDown;
Promise<bool> registerInterfaceAcceptingRequests; Promise<Void> registerInterfaceAcceptingRequests;
Future<Void> interfaceRegistered; Future<Void> interfaceRegistered;
bool behind; bool behind;
@ -6803,12 +6803,10 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
if ((data->lastTLogVersion - data->version.get()) < SERVER_KNOBS->STORAGE_RECOVERY_VERSION_LAG_LIMIT) { if ((data->lastTLogVersion - data->version.get()) < SERVER_KNOBS->STORAGE_RECOVERY_VERSION_LAG_LIMIT) {
if (data->registerInterfaceAcceptingRequests.canBeSet()) { if (data->registerInterfaceAcceptingRequests.canBeSet()) {
data->registerInterfaceAcceptingRequests.send(true); data->registerInterfaceAcceptingRequests.send(Void());
ErrorOr<Void> e = wait(errorOr(data->interfaceRegistered)); ErrorOr<Void> e = wait(errorOr(data->interfaceRegistered));
if (e.isError()) { if (e.isError()) {
TraceEvent(SevWarn, "StorageInterfaceRegistrationFailed") TraceEvent(SevWarn, "StorageInterfaceRegistrationFailed", data->thisServerID).error(e.getError());
.detail("ServerID", data->thisServerID)
.detail("Error", e.getError().code());
} }
} }
} }
@ -8623,10 +8621,10 @@ ACTOR Future<Void> replaceTSSInterface(StorageServer* self, StorageServerInterfa
ACTOR Future<Void> storageInterfaceRegistration(StorageServer* self, ACTOR Future<Void> storageInterfaceRegistration(StorageServer* self,
StorageServerInterface ssi, StorageServerInterface ssi,
Future<bool> interfaceAcceptingRequests) { Optional<Future<Void>> readyToAcceptRequests) {
bool acceptingRequests = wait(interfaceAcceptingRequests); if (readyToAcceptRequests.present()) {
if (acceptingRequests) { wait(readyToAcceptRequests.get());
ssi.startAcceptingRequests(); ssi.startAcceptingRequests();
} else { } else {
ssi.stopAcceptingRequests(); ssi.stopAcceptingRequests();
@ -8673,7 +8671,7 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
if (seedTag == invalidTag) { if (seedTag == invalidTag) {
ssi.startAcceptingRequests(); ssi.startAcceptingRequests();
self.registerInterfaceAcceptingRequests.send(true); self.registerInterfaceAcceptingRequests.send(Void());
// Might throw recruitment_failed in case of simultaneous master failure // Might throw recruitment_failed in case of simultaneous master failure
std::pair<Version, Tag> verAndTag = wait(addStorageServer(self.cx, ssi)); std::pair<Version, Tag> verAndTag = wait(addStorageServer(self.cx, ssi));
@ -8789,10 +8787,8 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
if (recovered.canBeSet()) if (recovered.canBeSet())
recovered.send(Void()); recovered.send(Void());
state Promise<bool> registerInterface; state Future<Void> f = storageInterfaceRegistration(&self, ssi, {});
state Future<Void> f = storageInterfaceRegistration(&self, ssi, registerInterface.getFuture());
wait(delay(0)); wait(delay(0));
registerInterface.send(false);
ErrorOr<Void> e = wait(errorOr(f)); ErrorOr<Void> e = wait(errorOr(f));
if (e.isError()) { if (e.isError()) {
Error e = f.getError(); Error e = f.getError();