FastRestore: Clear RestoreRole key in DB at finishRestore
This commit is the one that passes correctness tests after refactoring the fast restore.
This commit is contained in:
parent
879bf8dc7b
commit
32c030b7d6
|
@ -165,6 +165,7 @@ struct RestoreWorkerData : NonCopyable, public ReferenceCounted<RestoreWorkerDa
|
|||
|
||||
|
||||
// Restore worker
|
||||
// MX: This function is not used for now. Will change it to only clear restoreWorkerKey later.
|
||||
ACTOR Future<Void> handlerTerminateWorkerRequest(RestoreSimpleRequest req, Reference<RestoreWorkerData> self, RestoreWorkerInterface workerInterf, Database cx) {
|
||||
state Transaction tr(cx);
|
||||
|
||||
|
|
|
@ -96,7 +96,7 @@ ACTOR Future<Void> restoreApplierCore(Reference<RestoreApplierData> self, Restor
|
|||
}
|
||||
when ( RestoreSimpleRequest req = waitNext(applierInterf.finishRestore.getFuture()) ) {
|
||||
requestTypeStr = "finishRestore";
|
||||
req.reply.send(RestoreCommonReply(self->id(), req.cmdID));
|
||||
wait( handlerFinishRestoreRequest(req, self, cx) );
|
||||
break;
|
||||
}
|
||||
// TODO: To modify the interface for the following 2 when condition
|
||||
|
|
|
@ -92,11 +92,11 @@ ACTOR Future<Void> restoreLoaderCore(Reference<RestoreLoaderData> self, RestoreL
|
|||
|
||||
when ( RestoreVersionBatchRequest req = waitNext(loaderInterf.initVersionBatch.getFuture()) ) {
|
||||
requestTypeStr = "initVersionBatch";
|
||||
wait(handleInitVersionBatchRequest(req, self));
|
||||
wait( handleInitVersionBatchRequest(req, self) );
|
||||
}
|
||||
when ( RestoreSimpleRequest req = waitNext(loaderInterf.finishRestore.getFuture()) ) {
|
||||
requestTypeStr = "finishRestore";
|
||||
req.reply.send(RestoreCommonReply(self->id(), req.cmdID));
|
||||
wait( handlerFinishRestoreRequest(req, self, cx) );
|
||||
break;
|
||||
}
|
||||
// TODO: To modify the following when conditions
|
||||
|
|
|
@ -32,7 +32,7 @@
|
|||
class Database;
|
||||
struct RestoreWorkerData;
|
||||
|
||||
// id is the id of the worker to be monitored
|
||||
// id is the id of the worker to be monitored
|
||||
// This actor is used for both restore loader and restore applier
|
||||
ACTOR Future<Void> handleHeartbeat(RestoreSimpleRequest req, UID id) {
|
||||
wait( delay(0.1) ); // To avoid warning
|
||||
|
@ -41,6 +41,34 @@ ACTOR Future<Void> handleHeartbeat(RestoreSimpleRequest req, UID id) {
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> handlerFinishRestoreRequest(RestoreSimpleRequest req, Reference<RestoreRoleData> self, Database cx) {
|
||||
state Transaction tr(cx);
|
||||
|
||||
loop {
|
||||
try {
|
||||
tr.reset();
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
if ( self->role == RestoreRole::Loader ) {
|
||||
tr.clear(restoreLoaderKeyFor(self->id()));
|
||||
} else if ( self->role == RestoreRole::Applier ) {
|
||||
tr.clear(restoreApplierKeyFor(self->id()));
|
||||
} else {
|
||||
UNREACHABLE();
|
||||
}
|
||||
wait( tr.commit() ) ;
|
||||
printf("Node:%s finish restore, clear the interface keys for all roles on the worker (id:%s) and the worker itself. Then exit\n", self->describeNode().c_str(), self->id().toString().c_str());
|
||||
req.reply.send( RestoreCommonReply(self->id(), req.cmdID) );
|
||||
break;
|
||||
} catch( Error &e ) {
|
||||
printf("[WARNING] Node:%s finishRestoreHandler() transaction error:%s\n", self->describeNode().c_str(), e.what());
|
||||
wait( tr.onError(e) );
|
||||
}
|
||||
};
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Restore Worker: collect restore role interfaces locally by reading the specific system keys
|
||||
ACTOR Future<Void> _collectRestoreRoleInterfaces(Reference<RestoreRoleData> self, Database cx) {
|
||||
state Transaction tr(cx);
|
||||
|
|
|
@ -53,6 +53,7 @@ struct RestoreSimpleRequest;
|
|||
ACTOR Future<Void> handleHeartbeat(RestoreSimpleRequest req, UID id);
|
||||
ACTOR Future<Void> handleCollectRestoreRoleInterfaceRequest(RestoreSimpleRequest req, Reference<RestoreRoleData> self, Database cx);
|
||||
ACTOR Future<Void> handleInitVersionBatchRequest(RestoreVersionBatchRequest req, Reference<RestoreRoleData> self);
|
||||
ACTOR Future<Void> handlerFinishRestoreRequest(RestoreSimpleRequest req, Reference<RestoreRoleData> self, Database cx);
|
||||
|
||||
ACTOR Future<Void> _collectRestoreRoleInterfaces(Reference<RestoreRoleData> self, Database cx);
|
||||
|
||||
|
|
Loading…
Reference in New Issue