forked from OSchip/llvm-project
[Orc][RPC] Lock the pending results data structure when installing new result
handlers, make abandonPendingResults public API. This should make installing asynchronous result handlers thread safe. The abandonPendingResults method is made public so that clients can disconnect from a remote even if they have asynchronous handlers awaing results from that remote. The asynchronous handlers will all receive "abandoned result" errors as their argument. llvm-svn: 291399
This commit is contained in:
parent
1d84d9ac48
commit
ec97c706cb
|
@ -788,15 +788,21 @@ public:
|
|||
return FnIdOrErr.takeError();
|
||||
}
|
||||
|
||||
// Allocate a sequence number.
|
||||
auto SeqNo = SequenceNumberMgr.getSequenceNumber();
|
||||
assert(!PendingResponses.count(SeqNo) &&
|
||||
"Sequence number already allocated");
|
||||
SequenceNumberT SeqNo; // initialized in locked scope below.
|
||||
{
|
||||
// Lock the pending responses map and sequence number manager.
|
||||
std::lock_guard<std::mutex> Lock(ResponsesMutex);
|
||||
|
||||
// Install the user handler.
|
||||
PendingResponses[SeqNo] =
|
||||
// Allocate a sequence number.
|
||||
SeqNo = SequenceNumberMgr.getSequenceNumber();
|
||||
assert(!PendingResponses.count(SeqNo) &&
|
||||
"Sequence number already allocated");
|
||||
|
||||
// Install the user handler.
|
||||
PendingResponses[SeqNo] =
|
||||
detail::createResponseHandler<ChannelT, typename Func::ReturnType>(
|
||||
std::move(Handler));
|
||||
}
|
||||
|
||||
// Open the function call message.
|
||||
if (auto Err = C.startSendMessage(FnId, SeqNo)) {
|
||||
|
@ -863,6 +869,24 @@ public:
|
|||
return detail::ReadArgs<ArgTs...>(Args...);
|
||||
}
|
||||
|
||||
/// Abandon all outstanding result handlers.
|
||||
///
|
||||
/// This will call all currently registered result handlers to receive an
|
||||
/// "abandoned" error as their argument. This is used internally by the RPC
|
||||
/// in error situations, but can also be called directly by clients who are
|
||||
/// disconnecting from the remote and don't or can't expect responses to their
|
||||
/// outstanding calls. (Especially for outstanding blocking calls, calling
|
||||
/// this function may be necessary to avoid dead threads).
|
||||
void abandonPendingResponses() {
|
||||
// Lock the pending responses map and sequence number manager.
|
||||
std::lock_guard<std::mutex> Lock(ResponsesMutex);
|
||||
|
||||
for (auto &KV : PendingResponses)
|
||||
KV.second->abandon();
|
||||
PendingResponses.clear();
|
||||
SequenceNumberMgr.reset();
|
||||
}
|
||||
|
||||
protected:
|
||||
// The LaunchPolicy type allows a launch policy to be specified when adding
|
||||
// a function handler. See addHandlerImpl.
|
||||
|
@ -888,28 +912,32 @@ protected:
|
|||
wrapHandler<Func>(std::move(Handler), std::move(Launch));
|
||||
}
|
||||
|
||||
// Abandon all outstanding results.
|
||||
void abandonPendingResponses() {
|
||||
for (auto &KV : PendingResponses)
|
||||
KV.second->abandon();
|
||||
PendingResponses.clear();
|
||||
SequenceNumberMgr.reset();
|
||||
}
|
||||
|
||||
Error handleResponse(SequenceNumberT SeqNo) {
|
||||
auto I = PendingResponses.find(SeqNo);
|
||||
if (I == PendingResponses.end()) {
|
||||
abandonPendingResponses();
|
||||
return orcError(OrcErrorCode::UnexpectedRPCResponse);
|
||||
using Handler = typename decltype(PendingResponses)::mapped_type;
|
||||
Handler PRHandler;
|
||||
|
||||
{
|
||||
// Lock the pending responses map and sequence number manager.
|
||||
std::unique_lock<std::mutex> Lock(ResponsesMutex);
|
||||
auto I = PendingResponses.find(SeqNo);
|
||||
|
||||
if (I != PendingResponses.end()) {
|
||||
PRHandler = std::move(I->second);
|
||||
PendingResponses.erase(I);
|
||||
SequenceNumberMgr.releaseSequenceNumber(SeqNo);
|
||||
} else {
|
||||
// Unlock the pending results map to prevent recursive lock.
|
||||
Lock.unlock();
|
||||
abandonPendingResponses();
|
||||
return orcError(OrcErrorCode::UnexpectedRPCResponse);
|
||||
}
|
||||
}
|
||||
|
||||
auto PRHandler = std::move(I->second);
|
||||
PendingResponses.erase(I);
|
||||
SequenceNumberMgr.releaseSequenceNumber(SeqNo);
|
||||
assert(PRHandler &&
|
||||
"If we didn't find a response handler we should have bailed out");
|
||||
|
||||
if (auto Err = PRHandler->handleResponse(C)) {
|
||||
abandonPendingResponses();
|
||||
SequenceNumberMgr.reset();
|
||||
return Err;
|
||||
}
|
||||
|
||||
|
@ -1016,6 +1044,7 @@ protected:
|
|||
|
||||
std::map<FunctionIdT, WrappedHandlerFn> Handlers;
|
||||
|
||||
std::mutex ResponsesMutex;
|
||||
detail::SequenceNumberManager<SequenceNumberT> SequenceNumberMgr;
|
||||
std::map<SequenceNumberT, std::unique_ptr<detail::ResponseHandler<ChannelT>>>
|
||||
PendingResponses;
|
||||
|
|
Loading…
Reference in New Issue