don't allow double recruitement for storages
This commit is contained in:
parent
b2271f2176
commit
dc17be093c
|
@ -743,7 +743,21 @@ ACTOR Future<Void> monitorHighMemory(int64_t threshold) {
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> storageServerRollbackRebooter(Future<Void> prevStorageServer,
|
||||
struct TrackRunningStorage {
|
||||
UID self;
|
||||
KeyValueStoreType storeType;
|
||||
std::set<std::pair<UID, KeyValueStoreType>>* runningStorages;
|
||||
TrackRunningStorage(UID self,
|
||||
KeyValueStoreType storeType,
|
||||
std::set<std::pair<UID, KeyValueStoreType>>* runningStorages)
|
||||
: self(self), storeType(storeType), runningStorages(runningStorages) {
|
||||
runningStorages->emplace(self, storeType);
|
||||
}
|
||||
~TrackRunningStorage() { runningStorages->erase(std::make_pair(self, storeType)); };
|
||||
};
|
||||
|
||||
ACTOR Future<Void> storageServerRollbackRebooter(std::set<std::pair<UID, KeyValueStoreType>>* runningStorages,
|
||||
Future<Void> prevStorageServer,
|
||||
KeyValueStoreType storeType,
|
||||
std::string filename,
|
||||
UID id,
|
||||
|
@ -754,6 +768,7 @@ ACTOR Future<Void> storageServerRollbackRebooter(Future<Void> prevStorageServer,
|
|||
ActorCollection* filesClosed,
|
||||
int64_t memoryLimit,
|
||||
IKeyValueStore* store) {
|
||||
state TrackRunningStorage _(id, storeType, runningStorages);
|
||||
loop {
|
||||
ErrorOr<Void> e = wait(errorOr(prevStorageServer));
|
||||
if (!e.isError())
|
||||
|
@ -999,6 +1014,13 @@ struct SharedLogsValue {
|
|||
: actor(actor), uid(uid), requests(requests) {}
|
||||
};
|
||||
|
||||
ACTOR template <class T>
|
||||
Future<Void> sendDelayedError(ReplyPromise<T> reply, Error e, double d) {
|
||||
wait(delay(d));
|
||||
reply.sendError(e);
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
|
||||
Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface,
|
||||
LocalityData locality,
|
||||
|
@ -1038,6 +1060,7 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
|
|||
state std::string coordFolder = abspath(_coordFolder);
|
||||
|
||||
state WorkerInterface interf(locality);
|
||||
state std::set<std::pair<UID, KeyValueStoreType>> runningStorages;
|
||||
interf.initEndpoints();
|
||||
|
||||
state Reference<AsyncVar<std::set<std::string>>> issues(new AsyncVar<std::set<std::string>>());
|
||||
|
@ -1150,7 +1173,8 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
|
|||
Future<Void> f = storageServer(kv, recruited, dbInfo, folder, recovery, connFile);
|
||||
recoveries.push_back(recovery.getFuture());
|
||||
f = handleIOErrors(f, kv, s.storeID, kvClosed);
|
||||
f = storageServerRollbackRebooter(f,
|
||||
f = storageServerRollbackRebooter(&runningStorages,
|
||||
f,
|
||||
s.storeType,
|
||||
s.filename,
|
||||
recruited.id(),
|
||||
|
@ -1512,7 +1536,11 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
|
|||
activeSharedTLog->set(logData.uid);
|
||||
}
|
||||
when(InitializeStorageRequest req = waitNext(interf.storage.getFuture())) {
|
||||
if (!storageCache.exists(req.reqId)) {
|
||||
if (!storageCache.exists(req.reqId) &&
|
||||
(std::all_of(runningStorages.begin(),
|
||||
runningStorages.end(),
|
||||
[&req](const auto& p) { return p.second != req.storeType; }) ||
|
||||
req.seedTag != invalidTag)) {
|
||||
|
||||
bool isTss = req.tssPairIDAndVersion.present();
|
||||
|
||||
|
@ -1561,7 +1589,8 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
|
|||
folder);
|
||||
s = handleIOErrors(s, data, recruited.id(), kvClosed);
|
||||
s = storageCache.removeOnReady(req.reqId, s);
|
||||
s = storageServerRollbackRebooter(s,
|
||||
s = storageServerRollbackRebooter(&runningStorages,
|
||||
s,
|
||||
req.storeType,
|
||||
filename,
|
||||
recruited.id(),
|
||||
|
@ -1573,8 +1602,12 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
|
|||
memoryLimit,
|
||||
data);
|
||||
errorForwarders.add(forwardError(errors, ssRole, recruited.id(), s));
|
||||
} else
|
||||
} else if (storageCache.exists(req.reqId)) {
|
||||
forwardPromise(req.reply, storageCache.get(req.reqId));
|
||||
} else {
|
||||
TraceEvent("AttemptedDoubleRecruitement", interf.id()).detail("ForRole", "StorageServer");
|
||||
errorForwarders.add(sendDelayedError(req.reply, recruitment_failed(), 0.5));
|
||||
}
|
||||
}
|
||||
when(InitializeCommitProxyRequest req = waitNext(interf.commitProxy.getFuture())) {
|
||||
CommitProxyInterface recruited;
|
||||
|
|
Loading…
Reference in New Issue