This commit is contained in:
parent
e5728fcb84
commit
92fc089eb0
|
@ -239,8 +239,10 @@ ACTOR Future<Void> forwardError(PromiseStream<ErrorInfo> errors, Role role, UID
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> handleIOErrors(Future<Void> actor, IClosable* store, UID id, Future<Void> onClosed = Void()) {
|
||||
state Future<ErrorOr<Void>> storeError = actor.isReady() ? Never() : errorOr(store->getError());
|
||||
ACTOR Future<Void> handleIOErrors(Future<Void> actor,
|
||||
Future<ErrorOr<Void>> storeError,
|
||||
UID id,
|
||||
Future<Void> onClosed = Void()) {
|
||||
choose {
|
||||
when(state ErrorOr<Void> e = wait(errorOr(actor))) {
|
||||
if (e.isError() && e.getError().code() == error_code_please_reboot) {
|
||||
|
@ -277,6 +279,11 @@ ACTOR Future<Void> handleIOErrors(Future<Void> actor, IClosable* store, UID id,
|
|||
}
|
||||
}
|
||||
|
||||
Future<Void> handleIOErrors(Future<Void> actor, IClosable* store, UID id, Future<Void> onClosed = Void()) {
|
||||
Future<ErrorOr<Void>> storeError = actor.isReady() ? Never() : errorOr(store->getError());
|
||||
return handleIOErrors(actor, storeError, id, onClosed);
|
||||
}
|
||||
|
||||
ACTOR Future<Void> workerHandleErrors(FutureStream<ErrorInfo> errors) {
|
||||
loop choose {
|
||||
when(ErrorInfo _err = waitNext(errors)) {
|
||||
|
@ -1624,9 +1631,10 @@ ACTOR Future<Void> storageServerRollbackRebooter(std::set<std::pair<UID, KeyValu
|
|||
DUMPTOKEN(recruited.changeFeedPop);
|
||||
DUMPTOKEN(recruited.changeFeedVersionUpdate);
|
||||
|
||||
Future<ErrorOr<Void>> storeError = errorOr(store->getError());
|
||||
prevStorageServer =
|
||||
storageServer(store, recruited, db, folder, Promise<Void>(), Reference<IClusterConnectionRecord>(nullptr));
|
||||
prevStorageServer = handleIOErrors(prevStorageServer, store, id, store->onClosed());
|
||||
prevStorageServer = handleIOErrors(prevStorageServer, storeError, id, store->onClosed());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2230,10 +2238,12 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
|
|||
DUMPTOKEN(recruited.changeFeedPop);
|
||||
DUMPTOKEN(recruited.changeFeedVersionUpdate);
|
||||
|
||||
Future<ErrorOr<Void>> storeError = errorOr(kv->getError());
|
||||
Promise<Void> recovery;
|
||||
Future<Void> f = storageServer(kv, recruited, dbInfo, folder, recovery, connRecord);
|
||||
recoveries.push_back(recovery.getFuture());
|
||||
f = handleIOErrors(f, kv, s.storeID, kvClosed);
|
||||
|
||||
f = handleIOErrors(f, storeError, s.storeID, kvClosed);
|
||||
f = storageServerRollbackRebooter(&runningStorages,
|
||||
&storageCleaners,
|
||||
f,
|
||||
|
@ -2949,6 +2959,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
|
|||
filesClosed.add(kvClosed);
|
||||
ReplyPromise<InitializeStorageReply> storageReady = req.reply;
|
||||
storageCache.set(req.reqId, storageReady.getFuture());
|
||||
Future<ErrorOr<Void>> storeError = errorOr(data->getError());
|
||||
Future<Void> s = storageServer(data,
|
||||
recruited,
|
||||
req.seedTag,
|
||||
|
@ -2957,7 +2968,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
|
|||
storageReady,
|
||||
dbInfo,
|
||||
folder);
|
||||
s = handleIOErrors(s, data, recruited.id(), kvClosed);
|
||||
s = handleIOErrors(s, storeError, recruited.id(), kvClosed);
|
||||
s = storageCache.removeOnReady(req.reqId, s);
|
||||
s = storageServerRollbackRebooter(&runningStorages,
|
||||
&storageCleaners,
|
||||
|
|
Loading…
Reference in New Issue