235 lines
6.8 KiB
C++
235 lines
6.8 KiB
C++
/*
|
|
* genericactors.actor.h
|
|
*
|
|
* This source file is part of the FoundationDB open source project
|
|
*
|
|
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
|
|
// When actually compiled (NO_INTELLISENSE), include the generated version of this file. In intellisense use the source version.
|
|
#if defined(NO_INTELLISENSE) && !defined(FDBRPC_GENERICACTORS_ACTOR_G_H)
|
|
#define FDBRPC_GENERICACTORS_ACTOR_G_H
|
|
#include "fdbrpc/genericactors.actor.g.h"
|
|
#elif !defined(RPCGENERICACTORS_ACTOR_H)
|
|
#define RPCGENERICACTORS_ACTOR_H
|
|
|
|
#include "flow/genericactors.actor.h"
|
|
#include "fdbrpc/fdbrpc.h"
|
|
#include "flow/actorcompiler.h" // This must be the last #include.
|
|
|
|
ACTOR template <class Req>
|
|
Future<REPLY_TYPE(Req)> retryBrokenPromise( RequestStream<Req> to, Req request ) {
|
|
// Like to.getReply(request), except that a broken_promise exception results in retrying request immediately.
|
|
// Suitable for use with well known endpoints, which are likely to return to existence after the other process restarts.
|
|
// Not normally useful for ordinary endpoints, which conventionally are permanently destroyed after replying with broken_promise.
|
|
loop {
|
|
try {
|
|
REPLY_TYPE(Req) reply = wait( to.getReply( request ) );
|
|
return reply;
|
|
} catch( Error& e ) {
|
|
if (e.code() != error_code_broken_promise)
|
|
throw;
|
|
resetReply( request );
|
|
wait( delayJittered(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY) );
|
|
TEST(true); // retryBrokenPromise
|
|
}
|
|
}
|
|
}
|
|
|
|
ACTOR template <class Req>
|
|
Future<REPLY_TYPE(Req)> retryBrokenPromise( RequestStream<Req> to, Req request, TaskPriority taskID ) {
|
|
// Like to.getReply(request), except that a broken_promise exception results in retrying request immediately.
|
|
// Suitable for use with well known endpoints, which are likely to return to existence after the other process restarts.
|
|
// Not normally useful for ordinary endpoints, which conventionally are permanently destroyed after replying with broken_promise.
|
|
loop {
|
|
try {
|
|
REPLY_TYPE(Req) reply = wait( to.getReply( request, taskID ) );
|
|
return reply;
|
|
} catch( Error& e ) {
|
|
if (e.code() != error_code_broken_promise)
|
|
throw;
|
|
resetReply( request );
|
|
wait( delayJittered(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY, taskID) );
|
|
TEST(true); // retryBrokenPromise
|
|
}
|
|
}
|
|
}
|
|
|
|
ACTOR template <class T>
|
|
Future<T> timeoutWarning( Future<T> what, double time, PromiseStream<Void> output ) {
|
|
state Future<Void> end = delay( time );
|
|
loop choose {
|
|
when ( T t = wait( what ) ) { return t; }
|
|
when ( wait( end ) ) {
|
|
output.send( Void() );
|
|
end = delay( time );
|
|
}
|
|
}
|
|
}
|
|
|
|
ACTOR template <class T>
|
|
void forwardPromise( Promise<T> output, Future<T> input ) {
|
|
try {
|
|
T value = wait(input);
|
|
output.send(value);
|
|
} catch (Error& err) {
|
|
output.sendError(err);
|
|
}
|
|
}
|
|
|
|
ACTOR template <class T>
|
|
void forwardPromise(ReplyPromise<T> output, Future<T> input) {
|
|
try {
|
|
T value = wait(input);
|
|
output.send(value);
|
|
}
|
|
catch (Error& err) {
|
|
output.sendError(err);
|
|
}
|
|
}
|
|
|
|
ACTOR template <class T>
|
|
void forwardPromise( PromiseStream<T> output, Future<T> input ) {
|
|
try{
|
|
T value = wait(input);
|
|
output.send(value);
|
|
} catch (Error& e) {
|
|
output.sendError(e);
|
|
}
|
|
}
|
|
|
|
ACTOR template <class T> Future<Void> broadcast(Future<T> input, std::vector<Promise<T>> output) {
|
|
T value = wait(input);
|
|
for (int i = 0; i<output.size(); i++)
|
|
output[i].send(value);
|
|
return Void();
|
|
}
|
|
|
|
ACTOR template <class T> Future<Void> broadcast( Future<T> input, std::vector<ReplyPromise<T>> output ) {
|
|
T value = wait( input );
|
|
for(int i=0; i<output.size(); i++)
|
|
output[i].send(value);
|
|
return Void();
|
|
}
|
|
|
|
ACTOR template <class T> Future<Void> incrementalBroadcast(Future<T> input, std::vector<Promise<T>> output, int batchSize) {
|
|
state T value = wait(input);
|
|
state int i = 0;
|
|
for (; i<output.size(); i++) {
|
|
output[i].send(value);
|
|
if((i+1)%batchSize==0) {
|
|
wait(delay(0));
|
|
}
|
|
}
|
|
return Void();
|
|
}
|
|
|
|
ACTOR template <class T> Future<Void> incrementalBroadcast( Future<T> input, std::vector<ReplyPromise<T>> output, int batchSize) {
|
|
state T value = wait( input );
|
|
state int i = 0;
|
|
for(; i<output.size(); i++) {
|
|
output[i].send(value);
|
|
if((i+1)%batchSize==0) {
|
|
wait(delay(0));
|
|
}
|
|
}
|
|
return Void();
|
|
}
|
|
|
|
// Needed for the call to endpointNotFound()
|
|
#include "fdbrpc/FailureMonitor.h"
|
|
|
|
struct PeerHolder {
|
|
Reference<Peer> peer;
|
|
explicit PeerHolder(Reference<Peer> peer) : peer(peer) {
|
|
if(peer) {
|
|
peer->outstandingReplies++;
|
|
}
|
|
}
|
|
~PeerHolder() {
|
|
if(peer) {
|
|
peer->outstandingReplies--;
|
|
}
|
|
}
|
|
};
|
|
|
|
// Implements tryGetReply, getReplyUnlessFailedFor
|
|
ACTOR template <class X>
|
|
Future<ErrorOr<X>> waitValueOrSignal( Future<X> value, Future<Void> signal, Endpoint endpoint, ReplyPromise<X> holdme = ReplyPromise<X>(), Reference<Peer> peer = Reference<Peer>() ) {
|
|
state PeerHolder holder = PeerHolder(peer);
|
|
loop {
|
|
try {
|
|
choose {
|
|
when ( X x = wait(value) ) {
|
|
return x;
|
|
}
|
|
when ( wait(signal) ) {
|
|
return ErrorOr<X>(request_maybe_delivered());
|
|
}
|
|
}
|
|
} catch (Error& e) {
|
|
if (signal.isError()) {
|
|
TraceEvent(SevError, "WaitValueOrSignalError").error(signal.getError());
|
|
return ErrorOr<X>(internal_error());
|
|
}
|
|
|
|
if( e.code() == error_code_actor_cancelled )
|
|
throw e;
|
|
|
|
// broken_promise error normally means an endpoint failure, which in tryGetReply has the same semantics as receiving the failure signal
|
|
if (e.code() != error_code_broken_promise || signal.isError())
|
|
return ErrorOr<X>(e);
|
|
|
|
IFailureMonitor::failureMonitor().endpointNotFound( endpoint );
|
|
value = Never();
|
|
}
|
|
}
|
|
}
|
|
|
|
ACTOR template <class T>
|
|
Future<T> sendCanceler( ReplyPromise<T> reply, ReliablePacket* send, Endpoint endpoint ) {
|
|
try {
|
|
T t = wait( reply.getFuture() );
|
|
FlowTransport::transport().cancelReliable(send);
|
|
return t;
|
|
} catch (Error& e) {
|
|
FlowTransport::transport().cancelReliable(send);
|
|
if (e.code() == error_code_broken_promise) {
|
|
IFailureMonitor::failureMonitor().endpointNotFound( endpoint );
|
|
}
|
|
throw;
|
|
}
|
|
}
|
|
|
|
ACTOR template <class X>
|
|
Future<X> reportEndpointFailure( Future<X> value, Endpoint endpoint ) {
|
|
try {
|
|
X x = wait(value);
|
|
return x;
|
|
} catch (Error& e) {
|
|
if (e.code() == error_code_broken_promise) {
|
|
IFailureMonitor::failureMonitor().endpointNotFound( endpoint );
|
|
}
|
|
throw;
|
|
}
|
|
}
|
|
|
|
Future<Void> disableConnectionFailuresAfter( double const& time, std::string const& context );
|
|
|
|
#include "flow/unactorcompiler.h"
|
|
|
|
#endif
|