foundationdb/flow/genericactors.actor.h

1817 lines
47 KiB
C
Raw Normal View History

2017-05-26 04:48:44 +08:00
/*
* genericactors.actor.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
2017-05-26 04:48:44 +08:00
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
2017-05-26 04:48:44 +08:00
* http://www.apache.org/licenses/LICENSE-2.0
*
2017-05-26 04:48:44 +08:00
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
// When actually compiled (NO_INTELLISENSE), include the generated version of this file. In intellisense use the source version.
#if defined(NO_INTELLISENSE) && !defined(FLOW_GENERICACTORS_ACTOR_G_H)
#define FLOW_GENERICACTORS_ACTOR_G_H
#include "flow/genericactors.actor.g.h"
2017-05-26 04:48:44 +08:00
#elif !defined(GENERICACTORS_ACTOR_H)
#define GENERICACTORS_ACTOR_H
#include <list>
#include "flow/flow.h"
#include "flow/Knobs.h"
#include "flow/Util.h"
#include "flow/IndexedSet.h"
#include "flow/actorcompiler.h" // This must be the last #include.
2017-05-26 04:48:44 +08:00
#pragma warning( disable: 4355 ) // 'this' : used in base member initializer list
// Waits for `what`. On success, emits a TraceEvent named `type` carrying the (key, value) detail and
// returns the result. On failure the error is always rethrown; it is also traced (with the same
// detail) only when traceErrors is set.
ACTOR template<class T, class X>
Future<T> traceAfter(Future<T> what, const char* type, const char* key, X value, bool traceErrors = false)
{
	try {
		T result = wait(what);
		TraceEvent(type).detail(key, value);
		return result;
	} catch( Error &e ) {
		if (traceErrors) {
			TraceEvent(type).error(e,true).detail(key, value);
		}
		throw;
	}
}
// Like traceAfter, but the traced detail is computed from the result: on success a TraceEvent named
// `type` is emitted with detail (key, func(result)). An Error thrown while building that event is
// reported as a SevError "TraceAfterCallError" event and otherwise ignored, so the result is still returned.
// An error from `what` is rethrown, and traced first when traceErrors is set.
ACTOR template<class T, class X>
Future<T> traceAfterCall(Future<T> what, const char* type, const char* key, X func, bool traceErrors = false)
{
	try {
		state T result = wait(what);
		try {
			TraceEvent(type).detail(key, func(result));
		} catch( Error &e ) {
			TraceEvent(SevError, "TraceAfterCallError").error(e);
		}
		return result;
	} catch( Error &e ) {
		if (traceErrors) {
			TraceEvent(type).error(e,true);
		}
		throw;
	}
}
// Waits for `what` and then calls g_network->stop(), whether `what` succeeded or failed.
// The returned Optional is:
//   - the value produced by `what` on success;
//   - the initial T() when the error is please_reboot, please_reboot_delete or actor_cancelled
//     (the error is logged at SevInfo);
//   - empty for any other error (the error is logged at SevError and printed to stderr).
ACTOR template <class T>
Future<Optional<T>> stopAfter( Future<T> what ) {
	state Optional<T> outcome = T();
	try {
		T value = wait(what);
		outcome = Optional<T>(value);
	} catch (Error& e) {
		bool expected = e.code() == error_code_please_reboot || e.code() == error_code_please_reboot_delete || e.code() == error_code_actor_cancelled;
		TraceEvent(expected ? SevInfo : SevError, "StopAfterError").error(e);
		if (!expected) {
			fprintf(stderr, "Fatal Error: %s\n", e.what());
			outcome = Optional<T>();
		}
	}
	g_network->stop();
	return outcome;
}
// Returns a sorted copy of `range` (ascending, using std::sort's default ordering).
// The argument is taken by value, so the caller's container is left untouched.
template <class T>
T sorted(T range) {
	auto first = range.begin();
	auto last = range.end();
	std::sort(first, last);
	return range;
}
// Wraps an already available value in an ErrorOr<T>.
template <class T>
ErrorOr<T> errorOr( T t ) {
	ErrorOr<T> wrapped(t);
	return wrapped;
}
// Converts the outcome of `f` into a value: the returned future is set to an ErrorOr<T> holding either
// the result of `f` or the Error that `f` failed with, instead of propagating that error.
ACTOR template <class T>
Future<ErrorOr<T>> errorOr( Future<T> f ) {
	try {
		T value = wait(f);
		return ErrorOr<T>(value);
	} catch (Error& e) {
		return ErrorOr<T>(e);
	}
}
// Inverse of errorOr(Future<T>): waits for the ErrorOr<T>, then throws the contained error if there is
// one and otherwise returns the contained value.
ACTOR template <class T>
Future<T> throwErrorOr( Future<ErrorOr<T>> f ) {
	ErrorOr<T> outcome = wait(f);
	if (outcome.isError()) {
		throw outcome.getError();
	}
	return outcome.get();
}
// Forwards the value of `f`. Any error other than actor_cancelled is replaced by `err`;
// actor_cancelled is rethrown unchanged.
ACTOR template <class T>
Future<T> transformErrors( Future<T> f, Error err ) {
	try {
		T value = wait( f );
		return value;
	} catch( Error &e ) {
		if( e.code() != error_code_actor_cancelled )
			throw err;
		throw e;
	}
}
// Forwards the value of `f`. An error whose code equals inErr's code is replaced by outErr;
// every other error is rethrown unchanged.
ACTOR template <class T>
Future<T> transformError( Future<T> f, Error inErr, Error outErr ) {
	try {
		T value = wait( f );
		return value;
	} catch( Error &e ) {
		if( e.code() != inErr.code() )
			throw e;
		throw outErr;
	}
}
2017-05-26 04:48:44 +08:00
// Note that the RequestStream<T> version of forwardPromise doesn't exist, because what to do with errors?
// Sets the event `ev` once `input` is ready. Both the value and any error of `input` are discarded:
// the event is set in either case.
ACTOR template <class T>
void forwardEvent( Event* ev, Future<T> input ) {
	try {
		T ignored = wait(input);
	} catch (Error&) {
		// Deliberately swallowed; only completion matters here.
	}
	ev->set();
}
// Waits for the next element of `input`. On success the element is moved into *t; on failure the error
// is copied into *err. The event `ev` is set afterwards in both cases.
ACTOR template <class T>
void forwardEvent( Event* ev, T* t, Error* err, FutureStream<T> input ) {
	try {
		T next = waitNext(input);
		*t = std::move(next);
		ev->set();
	} catch (Error& e) {
		*err = e;
		ev->set();
	}
}
// Completes once every future in `results` has finished, waiting on them one after another in order.
// Whatever an individual future throws is swallowed, so a failed future counts as finished.
ACTOR template <class T>
Future<Void> waitForAllReady( std::vector<Future<T>> results ) {
	state int idx = 0;
	loop {
		if (idx == results.size()) {
			return Void();
		}
		try {
			wait(success(results[idx]));
		} catch (...) {
			// Ignored on purpose: readiness is all that is being waited for.
		}
		idx++;
	}
}
// Returns the value of `what` if it arrives before a delay of `time` (scheduled at priority taskID)
// fires; otherwise returns timedoutValue.
ACTOR template <class T>
Future<T> timeout( Future<T> what, double time, T timedoutValue, TaskPriority taskID = TaskPriority::DefaultDelay ) {
	Future<Void> deadline = delay( time, taskID );
	choose {
		when( T value = wait( what ) ) {
			return value;
		}
		when( wait( deadline ) ) {
			return timedoutValue;
		}
	}
}
// Returns the value of `what` if it arrives before a delay of `time` fires; otherwise returns an
// empty Optional.
ACTOR template <class T>
Future<Optional<T>> timeout( Future<T> what, double time ) {
	Future<Void> deadline = delay( time );
	choose {
		when( T value = wait( what ) ) {
			return value;
		}
		when( wait( deadline ) ) {
			return Optional<T>();
		}
	}
}
// Returns the value of `what` if it arrives before a delay of `time` (scheduled at priority taskID)
// fires; otherwise throws timed_out().
ACTOR template <class T>
Future<T> timeoutError( Future<T> what, double time, TaskPriority taskID = TaskPriority::DefaultDelay ) {
	Future<Void> deadline = delay( time, taskID );
	choose {
		when( T value = wait( what ) ) {
			return value;
		}
		when( wait( deadline ) ) {
			throw timed_out();
		}
	}
}
// Postpones the outcome of `what` by `time`: after `what` finishes, waits delay(time, taskID) and only
// then returns its value or rethrows its error.
ACTOR template <class T>
Future<T> delayed( Future<T> what, double time = 0.0, TaskPriority taskID = TaskPriority::DefaultDelay ) {
	try {
		state T value = wait( what );
		wait( delay( time, taskID ) );
		return value;
	} catch( Error &e ) {
		// Keep a copy of the error in actor state so it survives the wait below.
		state Error saved = e;
		wait( delay( time, taskID ) );
		throw saved;
	}
}
// Calls what() every `interval` seconds (each delay scheduled at priority taskID), forever.
// The first call happens after the first interval has elapsed.
ACTOR template<class Func>
Future<Void> recurring( Func what, double interval, TaskPriority taskID = TaskPriority::DefaultDelay ) {
	loop {
		wait( delay( interval, taskID ) );
		what();
	}
}
// Calls what() once `signal` has completed successfully. If `signal` fails, the error propagates and
// what() is not called.
ACTOR template<class Func>
Future<Void> trigger( Func what, Future<Void> signal ) {
	wait( signal );

	what();
	return Void();
}
// Calls what() only if `signal` fails with an Error. The error itself is consumed: the returned
// future completes successfully whether or not `signal` failed.
ACTOR template<class Func>
Future<Void> triggerOnError( Func what, Future<Void> signal ) {
	try {
		wait( signal );
	} catch (Error &e) {
		what();
	}

	return Void();
}
// Waits for a future to complete and cannot be cancelled.
// The outcome of `what` (value or error) is forwarded to the promise `result`.
// Most situations will use the overload below, which does not require a promise.
ACTOR template<class T>
void uncancellable(Future<T> what, Promise<T> result)
{
	try {
		T value = wait(what);
		result.send(value);
	} catch( Error &e ) {
		result.sendError(e);
	}
}
2019-08-10 07:51:34 +08:00
// Waits for a future to complete and cannot be cancelled.
// Implemented by handing `what` to the promise-taking overload and then waiting on that promise's future.
ACTOR template <class T>
[[flow_allow_discard]] Future<T> uncancellable(Future<T> what) {
	Promise<T> forwarded;
	Future<T> outcome = forwarded.getFuture();

	uncancellable(what, forwarded);

	T value = wait(outcome);
	return value;
}
// Keeps `object` alive until `what` either completes or is cancelled, then forwards the value of `what`.
// Used to prevent the object from being reclaimed.
//
// NOTE: the order of the arguments is important. The arguments will be destructed in
// reverse order, and we need the object to be destructed last.
ACTOR template<class T, class X>
Future<T> holdWhile(X object, Future<T> what)
{
	T value = wait(what);
	return value;
}
// Same as holdWhile, except that the value produced by `what` is discarded and Void is returned.
ACTOR template<class T, class X>
Future<Void> holdWhileVoid(X object, Future<T> what)
{
	T discarded = wait(what);
	return Void();
}
// Assigns the future value of `what` to `out`.
// `out` is captured by reference, so it must still be alive when `what` becomes ready.
template<class T>
Future<Void> store(T &out, Future<T> what) {
	auto assign = [&out](T const &v) {
		out = v;
		return Void();
	};
	return map(what, assign);
}
// Assigns the future value of `what` to `out` when that Optional is present; throws `e`
// (key_not_found() by default) when it is not.
// `out` is captured by reference, so it must still be alive when `what` becomes ready.
template<class T>
Future<Void> storeOrThrow(T &out, Future<Optional<T>> what, Error e = key_not_found()) {
	auto assignOrThrow = [&out,e](Optional<T> const &o) {
		if (!o.present()) {
			throw e;
		}
		out = o.get();
		return Void();
	};
	return map(what, assignOrThrow);
}
2017-05-26 04:48:44 +08:00
// Waits for `what` to be ready, applies the asynchronous function actorFunc to its value, and then
// waits for and returns that function's result.
ACTOR template<class T, class F, class U = decltype( fake<F>()(fake<T>()).getValue() )>
Future<U> mapAsync(Future<T> what, F actorFunc)
{
	T input = wait(what);
	U mapped = wait(actorFunc(input));
	return mapped;
}
//maps a vector of futures with an asynchronous function
template<class T, class F>
std::vector<Future<std::invoke_result_t<F, T>>> mapAsync(std::vector<Future<T>> const& what, F const& actorFunc)
2017-05-26 04:48:44 +08:00
{
std::vector<std::invoke_result_t<F, T>> ret;
2020-04-25 03:21:35 +08:00
ret.reserve(what.size());
for (const auto& f : what) ret.push_back(mapAsync(f, actorFunc));
2017-05-26 04:48:44 +08:00
return ret;
}
// Maps a stream with an asynchronous function.
// Every element read from `input` is passed to actorFunc and the resulting future is queued; results
// are sent to `output` in input order, as the future at the front of the queue becomes ready.
// When `input` ends (end_of_stream) the remaining queued futures are drained and end_of_stream is sent
// to `output`. Any other error caught in the main loop is sent to `output` and then rethrown.
ACTOR template<class T, class F, class U = decltype( fake<F>()(fake<T>()).getValue() )>
Future<Void> mapAsync( FutureStream<T> input, F actorFunc, PromiseStream<U> output ) {
	state Deque<Future<U>> pending;

	loop {
		try {
			choose {
				when( T item = waitNext( input ) ) {
					pending.push_back( actorFunc(item) );
				}
				// With nothing queued this arm waits on Never(), i.e. it cannot fire.
				when( U result = wait( pending.size() == 0 ? Never() : pending.front() ) ) {
					output.send( result );
					pending.pop_front();
				}
			}
		}
		catch ( Error& e ) {
			if( e.code() != error_code_end_of_stream ) {
				output.sendError( e );
				throw e;
			}
			break;
		}
	}

	// The input is exhausted: flush what is still in flight, preserving order.
	while (pending.size()) {
		U result = wait( pending.front() );
		output.send( result );
		pending.pop_front();
	}

	output.sendError(end_of_stream());

	return Void();
}
// Waits for `what` to be ready and returns func applied to its value.
ACTOR template<class T, class F>
Future<std::invoke_result_t<F, T>> map(Future<T> what, F func)
{
	T input = wait(what);
	return func(input);
}
//maps a vector of futures
template<class T, class F>
2019-08-02 00:56:49 +08:00
std::vector<Future<std::invoke_result_t<F, T>>> map(std::vector<Future<T>> const& what, F const& func)
2017-05-26 04:48:44 +08:00
{
2019-08-02 00:56:49 +08:00
std::vector<Future<std::invoke_result_t<F, T>>> ret;
2020-04-25 03:21:35 +08:00
ret.reserve(what.size());
for (const auto& f : what) ret.push_back(map(f, func));
2017-05-26 04:48:44 +08:00
return ret;
}
// Maps a stream: sends func(x) to `output` for every x read from `input`.
// When `input` ends (end_of_stream), end_of_stream is sent to `output` and the actor returns.
// Any other error is rethrown without being forwarded to `output`.
ACTOR template<class T, class F>
Future<Void> map( FutureStream<T> input, F func, PromiseStream<std::invoke_result_t<F, T>> output )
{
	loop {
		try {
			T item = waitNext( input );
			output.send(func(item));
		}
		catch ( Error& e ) {
			if( e.code() != error_code_end_of_stream )
				throw;
			break;
		}
	}

	output.sendError(end_of_stream());

	return Void();
}
//Returns if the future returns true, otherwise waits forever.
2019-06-21 00:29:01 +08:00
ACTOR Future<Void> returnIfTrue( Future<bool> f );
2017-05-26 04:48:44 +08:00
// Returns if the future, when waited on and then evaluated with the predicate, returns true;
// otherwise waits forever. Implemented by mapping `what` through `pred` and delegating to the
// Future<bool> overload.
template<class T, class F>
Future<Void> returnIfTrue( Future<T> what, F pred)
{
	auto verdict = map( what, pred );
	return returnIfTrue( verdict );
}
// Filters a stream: every element read from `input` for which pred(element) is true is sent to
// `output`; the others are dropped.
// When `input` ends (end_of_stream), end_of_stream is sent to `output` and the actor returns.
// Any other error is rethrown without being forwarded to `output`.
ACTOR template<class T, class F>
Future<Void> filter( FutureStream<T> input, F pred, PromiseStream<T> output )
{
	loop {
		try {
			T nextInput = waitNext( input );
			// The predicate parameter is `pred`; the body used to call an undeclared `func`.
			if(pred(nextInput))
				output.send(nextInput);
		}
		catch ( Error& e ) {
			if( e.code() == error_code_end_of_stream ) {
				break;
			}
			else
				throw;
		}
	}

	output.sendError(end_of_stream());

	return Void();
}
// Filters a stream asynchronously.
// Every element read from `input` is queued together with the Future<bool> returned by
// actorPred(element). Verdicts are consumed in input order; an element is sent to `output` when its
// verdict is true. When `input` ends (end_of_stream) the queue is drained and end_of_stream is sent
// to `output`. Any other error is rethrown without being forwarded to `output`.
ACTOR template<class T, class F>
Future<Void> asyncFilter( FutureStream<T> input, F actorPred, PromiseStream<T> output )
{
	state Deque<std::pair<T, Future<bool>>> pending;
	state std::pair<T, Future<bool>> head;

	loop {
		try {
			choose {
				when ( T item = waitNext(input) ) {
					pending.push_back( std::pair<T, Future<bool>>(item, actorPred(item)) );
				}
				// With nothing queued this arm waits on Never(), i.e. it cannot fire.
				when ( bool keep = wait( pending.size() == 0 ? Never() : pending.front().second ) ) {
					if (keep) {
						output.send(pending.front().first);
					}
					pending.pop_front();
				}
			}
		}
		catch ( Error& e ) {
			if( e.code() != error_code_end_of_stream ) {
				throw e;
			}
			break;
		}
	}

	// The input is exhausted: resolve the verdicts that are still outstanding, in order.
	while (pending.size()) {
		head = pending.front();
		bool keep = wait( head.second );
		if (keep) {
			output.send(head.first);
		}
		pending.pop_front();
	}

	output.sendError(end_of_stream());

	return Void();
}
// Associates a UID with a Future<T>. An entry may be registered once (set), looked up (get), and
// removed automatically when a given future finishes (removeOnReady).
template <class T>
struct WorkerCache {
	// SOMEDAY: Would we do better to use "unreliable" (at most once) transport for the initialize requests and get rid of this?
	// It doesn't provide true at most once behavior because things are removed from the cache after they have terminated.

	// True if an entry for `id` is currently stored.
	bool exists( UID id ) {
		return id_interface.count( id ) != 0;
	}

	// Registers `onReady` under `id`; asserts that no entry for `id` exists yet.
	void set( UID id, const Future<T>& onReady ) {
		ASSERT( !exists( id ) );
		id_interface[ id ] = onReady;
	}

	// Returns the future stored under `id`; asserts that the entry exists.
	Future<T> get( UID id ) {
		ASSERT( exists( id ) );
		return id_interface[ id ];
	}

	// Erases the entry for `id` once `ready` finishes, whether it succeeded or failed.
	// The returned future carries the outcome of `ready`.
	Future<Void> removeOnReady( UID id, Future<Void> const& ready ) {
		return removeOnReady( this, id, ready );
	}

private:
	// Actor behind the public removeOnReady(): erases on success and on error, rethrowing the error.
	ACTOR static Future<Void> removeOnReady( WorkerCache* self, UID id, Future<Void> ready ) {
		try {
			wait(ready);
			self->id_interface.erase(id);
			return Void();
		} catch ( Error &e ) {
			self->id_interface.erase(id);
			throw;
		}
	}

	std::map<UID, Future<T>> id_interface;
};
// A map from K to V in which each key can be watched for changes through onChange().
template <class K, class V>
class AsyncMap : NonCopyable {
public:
	// Represents a complete function from keys to values (K -> V)
	// All values not explicitly inserted map to V()
	// If this isn't appropriate, use V=Optional<X>

	AsyncMap() : defaultValue(), destructing(false) {}

	virtual ~AsyncMap() {
		// destroyOnCancel() checks this flag and does not erase from the map while it is set.
		destructing = true;
		items.clear();
	}

	// Stores v under k and notifies watchers of k, but only if v differs from the current value.
	void set( K const& k, V const& v ) {
		P& entry = items[k];
		if (entry.value != v)
			setUnconditional(k, v, entry);
	}

	// Stores v under k and notifies watchers of k even if the value is unchanged.
	void setUnconditional( K const& k, V const& v ) {
		setUnconditional(k, v, items[k]);
	}

	// Fires the change notification of every key currently in the map, leaving the values as they are.
	void triggerAll() {
		std::vector<Promise<Void>> ps;
		for (auto& entry : items) {
			ps.resize(ps.size() + 1);
			ps.back().swap( entry.second.change );
		}
		std::vector<Promise<Void>> noDestroy = ps; // See explanation of noDestroy in setUnconditional()
		for (auto& p : ps)
			p.send(Void());
	}

	// Fires the change notification of every key k in the map with begin <= k < end.
	void triggerRange( K const& begin, K const& end ) {
		std::vector<Promise<Void>> ps;
		auto it = items.lower_bound(begin);
		while (it != items.end() && it->first < end) {
			ps.resize(ps.size() + 1);
			ps.back().swap( it->second.change );
			++it;
		}
		std::vector<Promise<Void>> noDestroy = ps; // See explanation of noDestroy in setUnconditional()
		for (auto& p : ps)
			p.send(Void());
	}

	// Fires the change notification of `key` if the map has an entry for it. An entry still holding
	// the default value is erased before the notification is sent.
	void trigger( K const& key ) {
		auto found = items.find(key);
		if (found == items.end())
			return;

		P& entry = found->second;
		Promise<Void> trigger;
		entry.change.swap(trigger);
		Promise<Void> noDestroy = trigger; // See explanation of noDestroy in setUnconditional()

		if (entry.value == defaultValue)
			items.erase(key);

		trigger.send(Void());
	}

	// Resets k to the default value V().
	void clear( K const& k ) { set(k, V()); }

	// Returns the value stored under k, or the default value when k has no entry.
	V const& get( K const& k ) {
		auto found = items.find(k);
		return found != items.end() ? found->second.value : defaultValue;
	}

	// Returns 1 if the map has an entry for k, otherwise 0.
	int count( K const& k ) {
		return items.find(k) != items.end() ? 1 : 0;
	}

	// Returns a future that fires the next time k is set or triggered. Creates an entry for k if
	// there is none; for an entry holding the default value the future is wrapped in destroyOnCancel().
	virtual Future<Void> onChange( K const& k ) { // throws broken_promise if this is destroyed
		P& entry = items[k];
		if (entry.value == defaultValue)
			return destroyOnCancel( this, k, entry.change.getFuture() );
		return entry.change.getFuture();
	}

	// Returns every key that currently has an entry, in map order.
	std::vector<K> getKeys() {
		std::vector<K> keys;
		keys.reserve(items.size());
		for (auto const& entry : items)
			keys.push_back( entry.first );
		return keys;
	}

	// Removes all entries, asserting first that the only reference to each entry's change future
	// is the temporary taken here.
	void resetNoWaiting() {
		for (auto& entry : items)
			ASSERT( entry.second.change.getFuture().getFutureReferenceCount() == 1 );
		items.clear();
	}

protected:
	// Invariant: Every item in the map either has value!=defaultValue xor a destroyOnCancel actor waiting on change.getFuture()
	struct P {
		V value;
		Promise<Void> change;
		P() : value() {}
	};
	std::map<K, P> items;
	const V defaultValue;
	bool destructing;

	// Shared implementation of set()/setUnconditional(); `i` must be the entry for k inside `items`.
	void setUnconditional( K const& k, V const& v, P& i ) {
		Promise<Void> trigger;
		i.change.swap(trigger);
		Promise<Void> noDestroy = trigger; // The send(Void()) or even V::operator= could cause destroyOnCancel,
			// which could undo the change to i.value here. Keeping the promise reference count >= 2
			// prevents destroyOnCancel from erasing anything from the map.

		if (v == defaultValue)
			items.erase(k);
		else
			i.value = v;

		trigger.send(Void());
	}

	// Waits on `change`. If the wait is cancelled while the map is not being destructed and `change`
	// has exactly one future reference and one promise reference, the entry for `key` is erased.
	// The error is rethrown in every case.
	ACTOR Future<Void> destroyOnCancel( AsyncMap* self, K key, Future<Void> change ) {
		try {
			wait(change);
			return Void();
		} catch (Error& e) {
			if (e.code() == error_code_actor_cancelled &&
			    !self->destructing &&
			    change.getFutureReferenceCount()==1 &&
			    change.getPromiseReferenceCount()==1) {
				if(EXPENSIVE_VALIDATION) {
					auto& p = self->items[key];
					ASSERT(p.change.getFuture() == change);
				}
				self->items.erase(key);
			}
			throw;
		}
	}
};
// A reference-counted holder for a single value of type V.
template <class V>
class ReferencedObject : NonCopyable, public ReferenceCounted<ReferencedObject<V>> {
public:
	ReferencedObject() : value() {}
	ReferencedObject(V const& v) : value(v) {}
	ReferencedObject(V&& v) : value(std::move(v)) {}
	ReferencedObject(ReferencedObject&& r) : value(std::move(r.value)) {}

	void operator=(ReferencedObject&& r) { value = std::move(r.value); }

	// Read-only access to the held value.
	V const& get() const { return value; }

	// Mutable access to the held value.
	V& mutate() { return value; }

	// Replaces the held value (copying or moving from the argument).
	void set(V const& v) { value = v; }
	void set(V&& v) { value = std::move(v); }

	// Allocates a new ReferencedObject holding v and returns a Reference to it.
	static Reference<ReferencedObject<V>> from(V const& v) {
		ReferencedObject<V>* created = new ReferencedObject<V>(v);
		return Reference<ReferencedObject<V>>(created);
	}

	static Reference<ReferencedObject<V>> from(V&& v) {
		ReferencedObject<V>* created = new ReferencedObject<V>(std::move(v));
		return Reference<ReferencedObject<V>>(created);
	}

private:
	V value;
};
2017-05-26 04:48:44 +08:00
// A reference-counted variable of type V whose changes can be awaited via onChange().
template <class V>
class AsyncVar : NonCopyable, public ReferenceCounted<AsyncVar<V>> {
public:
	AsyncVar() : value() {}
	AsyncVar( V const& v ) : value(v) {}
	AsyncVar(AsyncVar&& av) : value(std::move(av.value)), nextChange(std::move(av.nextChange)) {}

	void operator=(AsyncVar&& av) {
		value = std::move(av.value);
		nextChange = std::move(av.nextChange);
	}

	// Current value.
	V const& get() const { return value; }

	// Future that fires on the next setUnconditional() or trigger() (and so on the next set() that
	// actually changes the value).
	Future<Void> onChange() const { return nextChange.getFuture(); }

	// Assigns v and notifies waiters, but only if v differs from the current value.
	void set( V const& v ) {
		if (v != value)
			setUnconditional(v);
	}

	// Assigns v and notifies waiters even if the value is unchanged.
	void setUnconditional( V const& v ) {
		// A fresh promise is swapped in before the old one is fired, so onChange() calls made from
		// here on receive the new promise's future.
		Promise<Void> fired;
		this->nextChange.swap(fired);
		this->value = v;
		fired.send(Void());
	}

	// Notifies waiters without touching the value.
	void trigger() {
		Promise<Void> fired;
		this->nextChange.swap(fired);
		fired.send(Void());
	}

private:
	V value;
	Promise<Void> nextChange;
};
// A value-less signal built on AsyncVar<Void>: onTrigger() returns a future that fires on the next
// call to trigger().
class AsyncTrigger : NonCopyable {
public:
	AsyncTrigger() {}
	AsyncTrigger(AsyncTrigger&& at) : v(std::move(at.v)) {}

	void operator=(AsyncTrigger&& at) { v = std::move(at.v); }

	// Future that fires on the next trigger().
	Future<Void> onTrigger() { return v.onChange(); }

	// Fires the futures previously handed out by onTrigger().
	void trigger() { v.trigger(); }

private:
	AsyncVar<Void> v;
};
// Coalesces bursts of trigger() calls: onTrigger() fires only after `delay` seconds have passed
// without a further trigger() call.
//
// NOTE(review): the move constructor and move assignment transfer `input` and `output` but not
// `worker`, and the worker started by the constructor keeps the `self` pointer it was created with.
// Confirm that moving a live Debouncer is actually safe/intended.
class Debouncer : NonCopyable {
public:
	explicit Debouncer( double delay ) { worker = debounceWorker(this, delay); }
	Debouncer(Debouncer&& at) : input(std::move(at.input)), output(std::move(at.output)) {}

	void operator=(Debouncer&& at) {
		input = std::move(at.input);
		output = std::move(at.output);
	}

	// Future that fires the next time the debounced output is signalled.
	Future<Void> onTrigger() { return output.onChange(); }

	// Records an input event; it (re)starts the quiet period observed by the worker.
	void trigger() { input.setUnconditional(Void()); }

private:
	AsyncVar<Void> input;
	AsyncVar<Void> output;
	Future<Void> worker;

	// Waits for an input event, then keeps waiting until bounceTime elapses with no further input
	// event, then signals `output`; repeats forever.
	ACTOR Future<Void> debounceWorker( Debouncer* self, double bounceTime ) {
		loop {
			wait( self->input.onChange() );

			loop {
				choose {
					// Another input arrived: go round again, which creates a new delay.
					when (wait( self->input.onChange() )) {}
					when (wait( delay(bounceTime) )) {
						break;
					}
				}
			}

			self->output.setUnconditional(Void());
		}
	}
};
2020-02-13 02:41:52 +08:00
// Keeps `output` in sync with the serialized bytes held by `input`, forever.
// On start and after every change of `input`: if the bytes are non-empty they are deserialized into a
// T with an ObjectReader (IncludeVersion) and stored in `output`; if they are empty `output` is set
// to an empty Optional.
ACTOR template <class T>
Future<Void> asyncDeserialize(Reference<AsyncVar<Standalone<StringRef>>> input,
                              Reference<AsyncVar<Optional<T>>> output) {
	loop {
		if (input->get().size()) {
			ObjectReader reader(input->get().begin(), IncludeVersion());
			T decoded;
			reader.deserialize(decoded);
			output->set(decoded);
		} else {
			output->set( Optional<T>() );
		}

		wait( input->onChange() );
	}
}
// Waits for the container produced by `values` and sends its i-th element to out[i].
// Asserts that the container and `out` have the same size.
ACTOR template <class V, class T>
void forwardVector( Future<V> values, std::vector<Promise<T>> out ) {
	V in = wait( values );
	ASSERT (in.size() == out.size());

	for (int idx = 0; idx < out.size(); idx++) {
		out[idx].send( in[idx] );
	}
}
// Mirrors `in` into `out` with a lag: repeatedly waits `time` seconds, copies the current value of
// `in` to `out`, then waits for `in` to change. If the actor ends with an error, `out` is brought up
// to date with `in` one last time before the error is rethrown.
ACTOR template <class T>
Future<Void> delayedAsyncVar(Reference<AsyncVar<T>> in, Reference<AsyncVar<T>> out, double time) {
	try {
		loop {
			wait( delay( time ) );
			out->set( in->get() );
			wait( in->onChange() );
		}
	} catch (Error& e) {
		// Final synchronisation so `out` does not keep a stale value.
		out->set( in->get() );
		throw;
	}
}
// Sets `var` to `val` once `time` seconds have elapsed.
ACTOR template <class T>
Future<Void> setAfter(Reference<AsyncVar<T>> var, double time, T val) {
	wait( delay( time ) );

	var->set( val );
	return Void();
}
// Forces `var` back to `val`: whenever `var` starts to differ from `val`, a timer of `time` seconds is
// started, and if `var` has not returned to `val` by itself when the timer fires, it is set to `val`.
// Runs forever.
//
// Reset accounting: resetCount counts consecutive resets and starts over when more than
// warningResetDelay seconds passed since the previous reset. If `context` is given and warningLimit is
// non-negative, a SevWarnAlways event named `context` is traced for every reset beyond warningLimit.
ACTOR template <class T>
Future<Void> resetAfter( Reference<AsyncVar<T>> var, double time, T val, int warningLimit = -1, double warningResetDelay = 0, const char* context = NULL ) {
	state bool matches = var->get() == val;
	state Future<Void> pendingReset = matches ? Never() : delay(time);
	state int resetCount = 0;
	state double lastReset = now();

	loop {
		choose {
			when( wait( pendingReset ) ) {
				var->set( val );

				if (now() - lastReset > warningResetDelay) {
					resetCount = 0;
				}
				resetCount++;

				if (context && warningLimit >= 0 && resetCount > warningLimit) {
					TraceEvent(SevWarnAlways, context).detail("ResetCount", resetCount).detail("LastReset", now() - lastReset);
				}

				lastReset = now();
				matches = true;
				pendingReset = Never();
			}
			when( wait( var->onChange() ) ) {}
		}

		// Re-evaluate after either event: arm the timer on a fresh deviation...
		if( matches && var->get() != val ) {
			matches = false;
			pendingReset = delay(time);
		}
		// ...and disarm it when the variable is back at `val`.
		if( !matches && var->get() == val ) {
			matches = true;
			pendingReset = Never();
		}
	}
}
2019-02-01 02:10:41 +08:00
// Sets `var` to `val` once `condition` has finished, whether it succeeded or failed.
// The one exception is cancellation: actor_cancelled is rethrown and `var` is left untouched.
ACTOR template <class T>
Future<Void> setWhenDoneOrError( Future<Void> condition, Reference<AsyncVar<T>> var, T val ) {
	try {
		wait( condition );
	}
	catch ( Error& e ) {
		if (e.code() == error_code_actor_cancelled) {
			throw;
		}
	}

	var->set( val );
	return Void();
}
2017-05-26 04:48:44 +08:00
Future<bool> allTrue( const std::vector<Future<bool>>& all );
Future<Void> anyTrue( std::vector<Reference<AsyncVar<bool>>> const& input, Reference<AsyncVar<bool>> const& output );
2017-05-26 04:48:44 +08:00
Future<Void> cancelOnly( std::vector<Future<Void>> const& futures );
Future<Void> timeoutWarningCollector( FutureStream<Void> const& input, double const& logDelay, const char* const& context, UID const& id );
Future<bool> quorumEqualsTrue( std::vector<Future<bool>> const& futures, int const& required );
Future<Void> lowPriorityDelay( double const& waitTime );
2017-05-26 04:48:44 +08:00
// Returns the value of `what` if it arrives before lowPriorityDelay(time) fires. Otherwise traces a
// SevError "IoTimeoutError" event and throws io_timeout(); when the network is simulated the error is
// first converted with asInjectedFault().
ACTOR template <class T>
Future<T> ioTimeoutError( Future<T> what, double time ) {
	Future<Void> deadline = lowPriorityDelay( time );
	choose {
		when( T value = wait( what ) ) {
			return value;
		}
		when( wait( deadline ) ) {
			Error timeoutErr = io_timeout();
			if (g_network->isSimulated()) {
				timeoutErr = timeoutErr.asInjectedFault();
			}
			TraceEvent(SevError, "IoTimeoutError").error(timeoutErr);
			throw timeoutErr;
		}
	}
}
2017-05-26 04:48:44 +08:00
// Forwards the outcome of `input` into one of two streams: its value goes to `output`, its error goes
// to `errors`. Cancellation (actor_cancelled) is rethrown instead of being forwarded.
ACTOR template <class T>
Future<Void> streamHelper( PromiseStream<T> output, PromiseStream<Error> errors, Future<T> input ) {
	try {
		T value = wait(input);
		output.send(value);
	} catch (Error& e) {
		if (e.code() == error_code_actor_cancelled) {
			throw;
		}
		errors.send(e);
	}

	return Void();
}
// Starts one streamHelper per future so that values arrive on `stream` and errors on `errors`.
// The helper actors are handed to cancelOnly(), whose result is returned to the caller.
template <class T>
Future<Void> makeStream( const std::vector<Future<T>>& futures, PromiseStream<T>& stream, PromiseStream<Error>& errors ) {
	std::vector<Future<Void>> forwarders;
	forwarders.reserve(futures.size());
	for (const auto& fut : futures) {
		forwarders.push_back( streamHelper( stream, errors, fut ) );
	}
	return cancelOnly(forwarders);
}
template <class T>
class QuorumCallback;
// Shared state behind quorum(): a SAV<Void> followed in the same allocation by `count`
// QuorumCallback<T> slots (see sizeFor() and callbacks()).
// The SAV starts with `count` promise references, and each finished input gives one up. The future is
// set by the success that finds exactly `antiQuorum` (= count - quorum + 1) promise references left,
// or failed by the first error.
template <class T>
struct Quorum : SAV<Void> {
	int antiQuorum;
	int count;

	// Bytes needed for a Quorum plus `count` trailing callback slots.
	static inline int sizeFor(int count) {
		return sizeof(Quorum<T>) + sizeof(QuorumCallback<T>)*count;
	}

	virtual void destroy() {
		// Read the size before running the destructor; `count` must not be touched afterwards.
		int size = sizeFor(this->count);
		this->~Quorum();
		freeFast(size, this);
	}

	// Detaches every callback that is still registered, fails the future with actor_cancelled if it
	// can still be set, and then drops one promise reference per detached callback.
	virtual void cancel() {
		int detached = 0;
		for (int i = 0; i < count; i++) {
			QuorumCallback<T>& cb = callbacks()[i];
			if (cb.next) {
				cb.remove();
				cb.next = 0;
				++detached;
			}
		}
		if (canBeSet())
			sendError(actor_cancelled());
		for (int i = 0; i < detached; i++)
			delPromiseRef();
	}

	explicit Quorum(int quorum, int count) : SAV<Void>(1, count), antiQuorum(count - quorum + 1), count(count) {
		// A quorum of zero is satisfied immediately.
		if (!quorum) this->send(Void());
	}

	// Called once per successful input.
	void oneSuccess() {
		if (getPromiseReferenceCount() == antiQuorum && canBeSet())
			this->sendAndDelPromiseRef(Void());
		else
			delPromiseRef();
	}

	// Called once per failed input.
	void oneError(Error err) {
		if (canBeSet())
			this->sendErrorAndDelPromiseRef(err);
		else
			delPromiseRef();
	}

	// The callback slots live directly after this object.
	QuorumCallback<T>* callbacks() { return (QuorumCallback<T>*)(this + 1); }
};
// Callback attached to one input future of quorum(). When the future fires or fails, the callback
// unlinks itself and reports the outcome to the owning Quorum.
template <class T>
class QuorumCallback : public Callback<T> {
public:
	virtual void fire(const T& value) {
		Callback<T>::remove();
		Callback<T>::next = 0;
		head->oneSuccess();
	}

	virtual void error(Error error) {
		Callback<T>::remove();
		Callback<T>::next = 0;
		head->oneError(error);
	}

private:
	// Only quorum() constructs callbacks.
	template <class U>
	friend Future<Void> quorum(std::vector<Future<U>> const& results, int n);

	Quorum<T>* head;

	// Used for inputs that are already ready: nothing is registered.
	QuorumCallback() = default;

	// Registers this callback on `future` on behalf of `head`.
	QuorumCallback(Future<T> future, Quorum<T>* head) : head(head) {
		future.addCallbackAndClear(this);
	}
};
// Returns a future that is set once `n` of the futures in `results` have succeeded, and that fails
// as soon as one of them fails while the result can still be set.
// Requires 0 <= n <= results.size().
template <class T>
Future<Void> quorum(std::vector<Future<T>> const& results, int n) {
	ASSERT(n >= 0 && n <= results.size());

	// One allocation holds the Quorum followed by a callback slot per input.
	int size = Quorum<T>::sizeFor(results.size());
	Quorum<T>* q = new (allocateFast(size)) Quorum<T>(n, results.size());

	QuorumCallback<T>* slot = q->callbacks();
	for (auto& r : results) {
		if (!r.isReady()) {
			new (slot) QuorumCallback<T>(r, q);
		} else {
			// Already finished: build an unregistered slot and report the outcome right away.
			new (slot) QuorumCallback<T>();
			slot->next = 0;
			if (r.isError())
				q->oneError(r.getError());
			else
				q->oneSuccess();
		}
		++slot;
	}

	return Future<Void>(q);
}
// Waits until `required` of the futures have succeeded, then gives the remaining ones up to
// extraSeconds (delay scheduled at priority taskID) to finish as well, returning as soon as either
// all of them are done or the extra time has elapsed.
ACTOR template <class T>
Future<Void> smartQuorum( std::vector<Future<T>> results, int required, double extraSeconds, TaskPriority taskID = TaskPriority::DefaultDelay ) {
	if (results.empty() && required == 0) {
		return Void();
	}

	wait(quorum(results, required));

	choose {
		when (wait(quorum(results, (int)results.size()))) {
			return Void();
		}
		when (wait(delay(extraSeconds, taskID))) {
			return Void();
		}
	}
}
// Set once every future in `results` has succeeded (immediately for an empty vector);
// implemented as a quorum of size results.size().
template <class T>
Future<Void> waitForAll( std::vector<Future<T>> const& results ) {
	if (results.empty()) {
		return Void();
	}
	const int all = (int)results.size();
	return quorum( results, all );
}
// Set once any one future in `results` has succeeded (immediately for an empty vector);
// implemented as a quorum of size 1.
template <class T>
Future<Void> waitForAny( std::vector<Future<T>> const& results ) {
	if (results.empty()) {
		return Void();
	}
	return quorum( results, 1 );
}
2019-06-21 00:29:01 +08:00
ACTOR Future<bool> shortCircuitAny( std::vector<Future<bool>> f );
2017-05-26 04:48:44 +08:00
// Waits for every future in `input` and returns their values in the same order.
// An empty input yields an empty vector without waiting.
ACTOR template <class T>
Future<std::vector<T>> getAll( std::vector<Future<T>> input ) {
	if (input.empty()) {
		return std::vector<T>();
	}

	wait( quorum( input, input.size() ) );

	std::vector<T> values;
	values.reserve(input.size());
	for (const auto& fut : input) {
		values.push_back( fut.get() );
	}
	return values;
}
// Waits for every future in `input` and returns the concatenation of their vectors, in input order.
ACTOR template <class T>
Future<std::vector<T>> appendAll( std::vector<Future<std::vector<T>>> input ) {
	wait( quorum( input, input.size() ) );

	// First pass: total size, so the result is allocated once.
	size_t total = 0;
	for (const auto& part : input) {
		total += part.get().size();
	}

	std::vector<T> combined;
	combined.reserve(total);

	// Second pass: append each part.
	for (const auto& part : input) {
		auto const& chunk = part.get();
		combined.insert( combined.end(), chunk.begin(), chunk.end() );
	}
	return combined;
}
// Returns once `in` produces a value equal to equalTo; if the value differs, never returns.
ACTOR template <class T> Future<Void> onEqual( Future<T> in, T equalTo ) {
	T actual = wait(in);
	if ( actual == equalTo ) {
		return Void();
	}

	wait(Never()); // never return
	throw internal_error(); // does not happen
}
// Converts a Future<T> into a Future<Void>: completes when `of` succeeds, discarding its value.
// An error from `of` propagates.
ACTOR template <class T>
Future<Void> success( Future<T> of ) {
	T discarded = wait( of );
	(void)discarded;
	return Void();
}
// Completes when `f` is finished, whether it succeeded or failed; whatever `f` throws is swallowed.
ACTOR template <class T>
Future<Void> ready( Future<T> f ) {
	try {
		wait(success(f));
	} catch (...) {
		// Ignored on purpose: only completion is of interest.
	}
	return Void();
}
// Returns the next element of the stream `input` as a plain Future<T>.
ACTOR template <class T>
Future<T> waitAndForward( FutureStream<T> input ) {
	T next = waitNext( input );
	return next;
}
// Forwards the outcome of `in`. Before an error is rethrown it is traced as a SevError event named
// `context` with `id`, unless it is actor_cancelled or its code is contained in *pExceptErrors
// (pExceptErrors may be null, meaning no exceptions).
ACTOR template <class T>
Future<T> reportErrorsExcept( Future<T> in, const char* context, UID id, std::set<int> const* pExceptErrors ) {
	try {
		T value = wait( in );
		return value;
	} catch (Error& e) {
		const bool cancelled = e.code() == error_code_actor_cancelled;
		const bool exempt = pExceptErrors && pExceptErrors->count(e.code());
		if (!cancelled && !exempt) {
			TraceEvent(SevError, context, id).error(e);
		}
		throw;
	}
}
// reportErrorsExcept() with no exempted error codes.
template <class T>
Future<T> reportErrors( Future<T> const& in, const char* context, UID id = UID() ) {
	std::set<int> const* noExceptions = NULL;
	return reportErrorsExcept(in, context, id, noExceptions);
}
// Unwraps a Future<Optional<T>>: returns the contained value, or throws Error(errorCode) when the
// Optional is empty.
ACTOR template <class T>
Future<T> require( Future<Optional<T>> in, int errorCode ) {
	Optional<T> maybe = wait(in);
	if (!maybe.present()) {
		throw Error(errorCode);
	}
	return maybe.get();
}
// Returns the outcome of whichever future in `items` finishes first: its value, or its error rethrown.
// The forwarding actors created by makeStream() are released as soon as that first outcome is known.
ACTOR template <class T>
Future<T> waitForFirst( std::vector<Future<T>> items ) {
	state PromiseStream<T> resultStream;
	state PromiseStream<Error> errorStream;

	state Future<Void> forCancellation = makeStream( items, resultStream, errorStream );

	state FutureStream<T> resultFutureStream = resultStream.getFuture();
	state FutureStream<Error> errorFutureStream = errorStream.getFuture();

	choose {
		when (T first = waitNext( resultFutureStream )) {
			forCancellation = Future<Void>();
			return first;
		}
		when (Error failure = waitNext( errorFutureStream )) {
			forCancellation = Future<Void>();
			throw failure;
		}
	}
}
// Returns `what` once `future` has completed successfully; an error from `future` propagates.
ACTOR template <class T>
Future<T> tag( Future<Void> future, T what ) {
	wait(future);

	return what;
}
// Sends `what` to `stream` once `future` has completed successfully; an error from `future`
// propagates and nothing is sent.
ACTOR template <class T>
Future<Void> tag( Future<Void> future, T what, PromiseStream<T> stream ) {
	wait( future );

	stream.send( what );
	return Void();
}
// Throws `e` once `future` has completed successfully; an error from `future` itself propagates instead.
ACTOR template <class T>
Future<T> tagError( Future<Void> future, Error e) {
	wait(future);

	throw e;
}
// If the future is ready, yields and then delivers its value or error. Otherwise returns the future
// itself, which completes whenever it is set.
template <class T>
Future<T> orYield( Future<T> f ) {
	if (!f.isReady()) {
		return f;
	}
	if (f.isError()) {
		return tagError<T>(yield(), f.getError());
	}
	return tag(yield(), f.get());
}
2019-06-21 00:29:01 +08:00
Future<Void> orYield( Future<Void> f );
2017-05-26 04:48:44 +08:00
// Returns the value of whichever of `lhs` and `rhs` produces one first.
ACTOR template <class T> Future<T> chooseActor( Future<T> lhs, Future<T> rhs ) {
	choose {
		when ( T fromLeft = wait(lhs) ) {
			return fromLeft;
		}
		when ( T fromRight = wait(rhs) ) {
			return fromRight;
		}
	}
}
// set && set -> set
// error && x -> error
// all others -> unset
2019-06-21 00:29:01 +08:00
inline Future<Void> operator &&( Future<Void> const& lhs, Future<Void> const& rhs ) {
2017-05-26 04:48:44 +08:00
if(lhs.isReady()) {
if(lhs.isError()) return lhs;
else return rhs;
}
if(rhs.isReady()) {
if(rhs.isError()) return rhs;
else return lhs;
}
2020-04-25 03:21:35 +08:00
return waitForAll(std::vector<Future<Void>>{ lhs, rhs });
2017-05-26 04:48:44 +08:00
}
// error || unset -> error
// unset || unset -> unset
// all others -> set
//
// If lhs is not ready yet the decision is left to chooseActor(). Otherwise an lhs error wins,
// then a ready rhs is returned, and failing that lhs (which is set) is returned.
inline Future<Void> operator ||( Future<Void> const& lhs, Future<Void> const& rhs ) {
	if (!lhs.isReady()) {
		return chooseActor( lhs, rhs );
	}
	if (lhs.isError()) {
		return lhs;
	}
	return rhs.isReady() ? rhs : lhs;
}
ACTOR template <class T> Future<T> brokenPromiseToNever( Future<T> in ) {
try {
T t = wait(in);
return t;
} catch (Error& e) {
if (e.code() != error_code_broken_promise)
Fix VersionStamp problems by instead adding a COMMIT_ON_FIRST_PROXY transaction option. Simulation identified the fact that we can violate the VersionStamps-are-always-increasing promise via the following series of events: 1. On proxy 0, dumpData adds commit requests to proxy 0's commit promise stream 2. To any proxy, a client submits the first transaction of abortBackup, which stops further dumpData calls on proxy 0. 3. To any proxy that is not proxy 0, submit a transaction that checks if it needs to upgrade the destination version. 4. The transaction from (3) is committed 5. Transactions from (1) are committed This is possible because the dumpData transactions have no read conflict ranges, and thus it's impossible to make them abort due to "conflicting" transactions. There's also no promise that if client C sends a commit to proxy A, and later a client D sends a commit to proxy B, that B must log its commit after A. (We only promise that if C is told it was committed before D is told it was committed, then A committed before B.) There was a failed attempt to fix this problem. We tried to add read conflict ranges to dumpData transactions so that they could be aborted by "conflicting" transactions. However, this failed because this now means that dumpData transactions require conflict resolution, and the stale read version that they use can cause them to be aborted with a transaction_too_old error. (Transactions that don't have read conflict ranges will never return transaction_too_old, because with no reads, the read snapshot version is effectively meaningless.) This was never previously possible, so the existing code doesn't retry commits, and to make things more complicated, the dumpData commits must be applied in order. 
This would require either adding dependencies to transactions (if A is going to commit then B must also be/have committed), which would be complicated, or submitting transactions with a fixed read version, and replaying the failed commits with a higher read version once we get a transaction_too_old error, which would unacceptably slow down the maximum throughput of dumpData. Thus, we've instead elected to add a special transaction option that bypasses proxy load balancing for commits, and always commits against proxy 0. We can know for certain that after the transaction from (2) is committed, all of the dumpData transactions that will be committed have been added to the commit promise stream on proxy 0. Thus, if we enqueue another transaction against proxy 0, we can know that it will be placed into the promise stream after all of the dumpData transactions, thus providing the semantics that we require: no dumpData transaction can commit after the destination version upgrade transaction.
2017-12-20 08:44:07 +08:00
throw;
wait(Never()); // never return
2017-05-26 04:48:44 +08:00
throw internal_error(); // does not happen
}
}
Fix VersionStamp problems by instead adding a COMMIT_ON_FIRST_PROXY transaction option. Simulation identified the fact that we can violate the VersionStamps-are-always-increasing promise via the following series of events: 1. On proxy 0, dumpData adds commit requests to proxy 0's commit promise stream 2. To any proxy, a client submits the first transaction of abortBackup, which stops further dumpData calls on proxy 0. 3. To any proxy that is not proxy 0, submit a transaction that checks if it needs to upgrade the destination version. 4. The transaction from (3) is committed 5. Transactions from (1) are committed This is possible because the dumpData transactions have no read conflict ranges, and thus it's impossible to make them abort due to "conflicting" transactions. There's also no promise that if client C sends a commit to proxy A, and later a client D sends a commit to proxy B, that B must log its commit after A. (We only promise that if C is told it was committed before D is told it was committed, then A committed before B.) There was a failed attempt to fix this problem. We tried to add read conflict ranges to dumpData transactions so that they could be aborted by "conflicting" transactions. However, this failed because this now means that dumpData transactions require conflict resolution, and the stale read version that they use can cause them to be aborted with a transaction_too_old error. (Transactions that don't have read conflict ranges will never return transaction_too_old, because with no reads, the read snapshot version is effectively meaningless.) This was never previously possible, so the existing code doesn't retry commits, and to make things more complicated, the dumpData commits must be applied in order. 
This would require either adding dependencies to transactions (if A is going to commit then B must also be/have committed), which would be complicated, or submitting transactions with a fixed read version, and replaying the failed commits with a higher read version once we get a transaction_too_old error, which would unacceptably slow down the maximum throughput of dumpData. Thus, we've instead elected to add a special transaction option that bypasses proxy load balancing for commits, and always commits against proxy 0. We can know for certain that after the transaction from (2) is committed, all of the dumpData transactions that will be committed have been added to the commit promise stream on proxy 0. Thus, if we enqueue another transaction against proxy 0, we can know that it will be placed into the promise stream after all of the dumpData transactions, thus providing the semantics that we require: no dumpData transaction can commit after the destination version upgrade transaction.
2017-12-20 08:44:07 +08:00
// Forwards the result of `in`, translating a broken_promise error into request_maybe_delivered.
// Any other error is rethrown unchanged.
ACTOR template <class T> Future<T> brokenPromiseToMaybeDelivered( Future<T> in ) {
	try {
		T result = wait(in);
		return result;
	} catch (Error& err) {
		// Only broken_promise is remapped; everything else passes straight through.
		if (err.code() != error_code_broken_promise) {
			throw;
		}
		throw request_maybe_delivered();
	}
}
2017-05-26 04:48:44 +08:00
// Moves the promise out of *pOutputPromise, waits for `signal`, then sends `value` on the promise.
// The promise is moved into actor state before the wait, so *pOutputPromise is left moved-from
// as soon as this actor starts.
ACTOR template <class T> void tagAndForward( Promise<T>* pOutputPromise, T value, Future<Void> signal ) {
	state Promise<T> out( std::move(*pOutputPromise) );
	wait( signal );
	out.send(value);
}
// Moves the promise out of *pOutputPromise, waits for `signal`, then fails the promise with `value`.
// The promise is moved into actor state before the wait, so *pOutputPromise is left moved-from
// as soon as this actor starts.
ACTOR template <class T> void tagAndForwardError( Promise<T>* pOutputPromise, Error value, Future<Void> signal ) {
	state Promise<T> out( std::move(*pOutputPromise) );
	wait( signal );
	out.sendError(value);
}
// Returns the value of `f`, while also watching `errorSignal`.
// `errorSignal` is expected to only ever fail: if it becomes ready with a value instead,
// that is treated as a programming error (ASSERT(false), then internal_error).
// An error from either future propagates out of the choose block to the returned future.
ACTOR template <class T> Future<T> waitOrError(Future<T> f, Future<Void> errorSignal) {
	choose {
		when(T result = wait(f)) { return result; }
		when(wait(errorSignal)) {
			// Reaching this point means errorSignal was set rather than failed.
			ASSERT(false);
			throw internal_error();
		}
	}
}
2017-05-26 04:48:44 +08:00
struct FlowLock : NonCopyable, public ReferenceCounted<FlowLock> {
	// FlowLock implements a nonblocking critical section: there can be only a limited number of clients executing code between
	// wait(take()) and release(). Not thread safe. take() returns only when the number of holders of the lock is fewer than the
	// number of permits, and release() makes the caller no longer a holder of the lock. release() only runs waiting take()rs
	// after the caller wait()s

	// Scope guard that gives back `remaining` permits to `lock` when destroyed or overwritten.
	struct Releaser : NonCopyable {
		FlowLock* lock;
		// Same width as every other permit count in FlowLock, so large amounts are not truncated.
		int64_t remaining;

		Releaser() : lock(0), remaining(0) {}
		Releaser( FlowLock& lock, int64_t amount = 1 ) : lock(&lock), remaining(amount) {}
		// Moving transfers responsibility for the permits; the source is left holding none.
		Releaser(Releaser&& r) noexcept : lock(r.lock), remaining(r.remaining) { r.remaining = 0; }
		// Releases whatever this guard currently holds before taking over r's permits.
		void operator=(Releaser&& r) { if (remaining) lock->release(remaining); lock = r.lock; remaining = r.remaining; r.remaining = 0; }

		// Releases `amount` permits early. -1, or anything larger than what is held, means "all remaining".
		void release( int64_t amount = -1 ) {
			if( amount == -1 || amount > remaining )
				amount = remaining;

			if (remaining)
				lock->release( amount );
			remaining -= amount;
		}

		~Releaser() { if (remaining) lock->release(remaining); }
	};

	FlowLock() : permits(1), active(0) {}
	explicit FlowLock(int64_t permits) : permits(permits), active(0) {}

	// Acquires `amount` permits. The returned future becomes ready once the permits are held.
	// A request is granted immediately when it fits, or when nobody holds the lock at all
	// (so a request larger than `permits` can still make progress); otherwise it is queued.
	Future<Void> take(TaskPriority taskID = TaskPriority::DefaultYield, int64_t amount = 1) {
		if (active + amount <= permits || active == 0) {
			active += amount;
			return safeYieldActor(this, taskID, amount);
		}
		return takeActor(this, taskID, amount);
	}

	// Gives back `amount` permits and then grants queued takers, in FIFO order, for as long as
	// the head of the queue fits (or the lock is completely idle).
	void release( int64_t amount = 1 ) {
		ASSERT( (active > 0 || amount == 0) && active - amount >= 0 );
		active -= amount;

		while( !takers.empty() ) {
			if( active + takers.begin()->second <= permits || active == 0 ) {
				std::pair< Promise<Void>, int64_t > next = std::move( *takers.begin() );
				active += next.second;
				takers.pop_front();
				next.first.send(Void());
			} else {
				break;
			}
		}
	}

	// Releases `amount` permits once `signal` becomes ready.
	// `amount` is int64_t like every other permit count, so it is not truncated on the way to release().
	Future<Void> releaseWhen( Future<Void> const& signal, int64_t amount = 1 ) { return releaseWhenActor( this, signal, amount ); }

	// returns when any permits are available, having taken as many as possible up to the given amount, and modifies amount to the number of permits taken
	Future<Void> takeUpTo(int64_t& amount) {
		return takeMoreActor(this, &amount);
	}

	int64_t available() const { return permits - active; }
	int64_t activePermits() const { return active; }
	// Number of take() calls currently queued.
	int waiters() const { return static_cast<int>(takers.size()); }

private:
	// Queued takers in arrival order: the promise to fire and the number of permits requested.
	std::list< std::pair< Promise<Void>, int64_t > > takers;
	const int64_t permits;
	int64_t active;
	// Never sent to. Its future is waited on alongside the yield/delay in the actors below; per its
	// name, the intent is that destroying the FlowLock breaks the promise and fails those waits.
	Promise<Void> broken_on_destruct;

	// Slow path of take(): queue up, wait to be granted by release(), then delay before returning.
	ACTOR static Future<Void> takeActor(FlowLock* lock, TaskPriority taskID, int64_t amount) {
		state std::list<std::pair<Promise<Void>, int64_t>>::iterator it = lock->takers.insert(lock->takers.end(), std::make_pair(Promise<Void>(), amount));

		try {
			wait( it->first.getFuture() );
		} catch (Error& e) {
			if (e.code() == error_code_actor_cancelled) {
				// Cancelled while still queued: drop our entry, then let release(0) re-examine
				// the queue in case the taker behind us can now be granted.
				lock->takers.erase(it);
				lock->release(0);
			}
			throw;
		}
		try {
			double duration = BUGGIFY_WITH_PROB(.001) ? deterministicRandom()->random01()*FLOW_KNOBS->BUGGIFY_FLOW_LOCK_RELEASE_DELAY : 0.0;
			choose {
				when(wait(delay(duration, taskID))) {} // So release()ing the lock doesn't cause arbitrary code to run on the stack
				when(wait(lock->broken_on_destruct.getFuture())) {}
			}
			return Void();
		} catch (...) {
			TEST(true); // If we get cancelled here, we are holding the lock but the caller doesn't know, so release it
			lock->release(amount);
			throw;
		}
	}

	// Implementation of takeUpTo(): take one permit, then grab as many more as are immediately
	// available, up to *amount in total, and report the total through *amount.
	ACTOR static Future<Void> takeMoreActor(FlowLock* lock, int64_t* amount) {
		wait(lock->take());
		int64_t extra = std::min( lock->available(), *amount-1 );
		lock->active += extra;
		*amount = 1 + extra;
		return Void();
	}

	// Fast path of take(): the permits are already counted in `active`; yield before returning.
	// If the wait fails, the permits are handed back because the caller never learns it held them.
	ACTOR static Future<Void> safeYieldActor(FlowLock* lock, TaskPriority taskID, int64_t amount) {
		try {
			choose {
				when(wait(yield(taskID))) {}
				when(wait(lock->broken_on_destruct.getFuture())) {}
			}
			return Void();
		} catch (Error& e) {
			lock->release(amount);
			throw;
		}
	}

	// Implementation of releaseWhen().
	ACTOR static Future<Void> releaseWhenActor( FlowLock* self, Future<Void> signal, int64_t amount ) {
		wait(signal);
		self->release(amount);
		return Void();
	}
};
// A non-decreasing int64_t value with futures that fire once the value reaches a given limit.
struct NotifiedInt {
	NotifiedInt( int64_t val = 0 ) : val(val) {}

	// Returns a future that is ready as soon as the value is >= limit.
	Future<Void> whenAtLeast( int64_t limit ) {
		if (val >= limit)
			return Void();
		Promise<Void> p;
		waiting.push( std::make_pair(limit,p) );
		return p.getFuture();
	}

	int64_t get() const { return val; }

	// Raises the value to v (v must not be smaller than the current value) and fires every
	// waiter whose limit has now been reached.
	void set( int64_t v ) {
		ASSERT( v >= val );
		if (v != val) {
			val = v;

			// Collect the promises first and fire them only after the queue is consistent,
			// so that whatever runs on send() never observes a half-updated `waiting`.
			std::vector<Promise<Void>> toSend;
			while ( waiting.size() && v >= waiting.top().first ) {
				// top() is const, so this copies the promise handle rather than moving it.
				Promise<Void> p = waiting.top().second;
				waiting.pop();
				toSend.push_back(std::move(p));
			}
			for(auto& p : toSend) {
				p.send(Void());
			}
		}
	}

	void operator=( int64_t v ) {
		set( v );
	}

	NotifiedInt(NotifiedInt&& r) noexcept : waiting(std::move(r.waiting)), val(r.val) {}
	void operator=(NotifiedInt&& r) noexcept {
		waiting = std::move(r.waiting);
		val = r.val;
	}

private:
	typedef std::pair<int64_t,Promise<Void>> Item;
	// Orders the priority queue so that the smallest limit is at the top.
	struct ItemCompare {
		bool operator()(const Item& a, const Item& b) const { return a.first > b.first; }
	};
	std::priority_queue<Item, std::vector<Item>, ItemCompare> waiting;
	int64_t val;
};
struct BoundedFlowLock : NonCopyable, public ReferenceCounted<BoundedFlowLock> {
	// BoundedFlowLock is different from a FlowLock in that it has a bound on how many locks can be taken from the oldest outstanding lock.
	// For instance, with a FlowLock that has two permits, if one permit is taken but never released, the other permit can be reused an unlimited
	// amount of times, but with a BoundedFlowLock, it can only be reused a fixed number of times.

	// Scope guard that releases one numbered permit when destroyed or overwritten.
	// permitNumber == 0 means "holds nothing" (real permit numbers start at 1, see takeActor).
	struct Releaser : NonCopyable {
		BoundedFlowLock* lock;
		int64_t permitNumber;

		Releaser() : lock(nullptr), permitNumber(0) {}
		Releaser( BoundedFlowLock* lock, int64_t permitNumber ) : lock(lock), permitNumber(permitNumber) {}
		// Moving transfers responsibility for the permit; the source is left holding none.
		Releaser(Releaser&& r) noexcept : lock(r.lock), permitNumber(r.permitNumber) { r.permitNumber = 0; }
		// Releases the currently held permit, if any, before taking over r's permit.
		void operator=(Releaser&& r) { if (permitNumber) lock->release(permitNumber); lock = r.lock; permitNumber = r.permitNumber; r.permitNumber = 0; }

		void release() {
			if (permitNumber) {
				lock->release(permitNumber);
			}
			permitNumber = 0;
		}

		~Releaser() { if (permitNumber) lock->release(permitNumber); }
	};

	// Initializers are listed in member declaration order, which is the order they actually run in.
	BoundedFlowLock() : minOutstanding(0), nextPermitNumber(0), unrestrictedPermits(1), boundedPermits(0) {}
	explicit BoundedFlowLock(int64_t unrestrictedPermits, int64_t boundedPermits) : minOutstanding(0), nextPermitNumber(0), unrestrictedPermits(unrestrictedPermits), boundedPermits(boundedPermits) {}

	// Acquires a permit; the future resolves to the permit number that must later be passed to release().
	Future<int64_t> take() {
		return takeActor(this);
	}

	// Returns the permit identified by permitNumber and re-evaluates which waiters may proceed.
	void release( int64_t permitNumber ) {
		outstanding.erase(permitNumber);
		updateMinOutstanding();
	}

private:
	// Permit numbers that have been handed out (or requested) and not yet released.
	IndexedSet<int64_t, int64_t> outstanding;
	// Threshold that takeActor() waits on; only ever moved through NotifiedInt::set().
	NotifiedInt minOutstanding;
	int64_t nextPermitNumber;
	const int64_t unrestrictedPermits;
	const int64_t boundedPermits;

	// Recomputes minOutstanding from the entry at index (unrestrictedPermits - 1) of `outstanding`,
	// or from nextPermitNumber when there is no such entry.
	// NOTE(review): assumes IndexedSet::index(n) yields the n-th smallest entry - confirm against flow/IndexedSet.h.
	void updateMinOutstanding() {
		auto it = outstanding.index(unrestrictedPermits-1);
		if(it == outstanding.end()) {
			minOutstanding.set(nextPermitNumber);
		} else {
			minOutstanding.set(*it);
		}
	}

	// Allocates the next permit number, records it as outstanding, and waits until minOutstanding
	// has advanced to within boundedPermits of that number.
	ACTOR static Future<int64_t> takeActor(BoundedFlowLock* lock) {
		state int64_t permitNumber = ++lock->nextPermitNumber;
		lock->outstanding.insert(permitNumber, 1);
		lock->updateMinOutstanding();
		wait( lock->minOutstanding.whenAtLeast(std::max<int64_t>(0, permitNumber - lock->boundedPermits)) );
		return permitNumber;
	}
};
2017-05-26 04:48:44 +08:00
// Forwards every item from `input` to `output`, yielding at priority `taskID` after each item.
// The loop has no exit of its own: the actor ends only when waitNext() or yield() throws
// or when the actor is cancelled.
ACTOR template <class T>
Future<Void> yieldPromiseStream( FutureStream<T> input, PromiseStream<T> output, TaskPriority taskID = TaskPriority::DefaultYield ) {
	loop {
		T f = waitNext( input );
		output.send( f );
		wait( yield( taskID ) );
	}
}
// Hand-written actor behind yieldedFuture(). It registers itself as a yielded callback on a
// Future<Void>. When that future fires (value or error) it either completes its own SAV<Void>
// immediately, or, when check_yield() returns true, first re-links itself onto a delay(0) future
// and completes when that delay fires.
struct YieldedFutureActor : SAV<Void>, ActorCallback<YieldedFutureActor, 1, Void>, FastAllocated<YieldedFutureActor> {
// Tri-state slot: UNSET_ERROR_CODE until the source future has fired, SET_ERROR_CODE once it
// fired with a value, otherwise the error the source future failed with.
Error in_error_state;
typedef ActorCallback<YieldedFutureActor, 1, Void> CB1;
using FastAllocated<YieldedFutureActor>::operator new;
using FastAllocated<YieldedFutureActor>::operator delete;
// Installs this object as a yielded callback on f; f is cleared by the call.
YieldedFutureActor(Future<Void> && f) : SAV<Void>(1, 1), in_error_state(Error::fromCode(UNSET_ERROR_CODE)) {
f.addYieldedCallbackAndClear(static_cast< ActorCallback< YieldedFutureActor, 1, Void >* >(this));
}
// Unhooks from whatever future we are currently waiting on and fails our own SAV with actor_cancelled.
void cancel()
{
if (!SAV<Void>::canBeSet()) return; // Cancel could be invoked *by* a callback within finish(). Otherwise it's guaranteed that we are waiting either on the original future or on a delay().
ActorCallback<YieldedFutureActor, 1, Void>::remove();
SAV<Void>::sendErrorAndDelPromiseRef(actor_cancelled());
}
virtual void destroy() {
delete this;
}
// Value callback. The first firing comes from the source future and may decide to yield;
// a second firing can only come from the delay() installed by doYield().
void a_callback_fire(ActorCallback<YieldedFutureActor, 1, Void>*, Void) {
if (int16_t(in_error_state.code()) == UNSET_ERROR_CODE) {
in_error_state = Error::fromCode(SET_ERROR_CODE);
if (check_yield())
doYield();
else
finish();
} else {
// We hit this case when and only when the delay() created by a previous doYield() fires. Then we want to get at least one task done, regardless of what check_yield() would say.
finish();
}
}
// Error callback; asserted to run only while nothing has been recorded yet. The error is
// remembered so finish() can deliver it, possibly after a yield.
void a_callback_error(ActorCallback<YieldedFutureActor, 1, Void>*, Error const& err) {
ASSERT(int16_t(in_error_state.code()) == UNSET_ERROR_CODE);
in_error_state = err;
if (check_yield())
doYield();
else
finish();
}
// Removes this callback from its ring and completes our SAV with the recorded outcome.
void finish() {
ActorCallback<YieldedFutureActor, 1, Void>::remove();
if (int16_t(in_error_state.code()) == SET_ERROR_CODE)
SAV<Void>::sendAndDelPromiseRef(Void());
else
SAV<Void>::sendErrorAndDelPromiseRef(in_error_state);
}
// Detaches the whole callback ring from the source future and re-attaches it to a delay(0)
// at the current task priority, so that this and all following callbacks are fired again later.
void doYield() {
// Since we are being fired, we are the first callback in the ring, and `prev` is the source future
Callback<Void>* source = CB1::prev;
ASSERT(source->next == static_cast<CB1*>(this));
// Remove the source future from the ring. All the remaining callbacks in the ring should be yielded, since yielded callbacks are installed at the end
CB1::prev = source->prev;
CB1::prev->next = static_cast<CB1*>(this);
// The source future's ring is now empty, since we have removed all the callbacks
source->next = source->prev = source;
source->unwait();
// Link all the callbacks, including this one, into the ring of a delay future so that after a short time they will be fired again
delay(0, g_network->getCurrentTask()).addCallbackChainAndClear(static_cast< CB1* >(this));
}
};
2019-06-21 00:29:01 +08:00
// Wraps `f` so that its completion is delivered through YieldedFutureActor.
// When `f` is already ready, yield() is returned instead of wrapping it.
inline Future<Void> yieldedFuture(Future<Void> f) {
	if (f.isReady())
		return yield();
	else
		return Future<Void>(new YieldedFutureActor(std::move(f)));
}
//An AsyncMap that uses a yieldedFuture in its onChange method.
template <class K, class V>
class YieldedAsyncMap : public AsyncMap<K, V> {
public:
	// Returns a future that fires when the entry for `k` changes, delivered through yieldedFuture().
	// Looking the key up with operator[] creates the entry if it does not exist yet; entries that
	// still hold the default value go through destroyOnCancelYield(), which may erase them again
	// when the waiter is cancelled.
	Future<Void> onChange(K const& k) { // throws broken_promise if this is destroyed
		auto &item = AsyncMap<K, V>::items[k];
		if (item.value == AsyncMap<K, V>::defaultValue)
			return destroyOnCancelYield(this, k, item.change.getFuture());
		return yieldedFuture(item.change.getFuture());
	}

	// Waits for `change` via yieldedFuture(). On cancellation, erases the map entry for `key` when
	// the map is not being destroyed and the reference counts show this actor holds the only
	// future reference and there is a single promise reference.
	ACTOR static Future<Void> destroyOnCancelYield( YieldedAsyncMap* self, K key, Future<Void> change ) {
		try {
			wait(yieldedFuture(change));
			return Void();
		} catch (Error& e) {
			if (e.code() == error_code_actor_cancelled && !self->destructing && change.getFutureReferenceCount() == 1 && change.getPromiseReferenceCount() == 1) {
				if(EXPENSIVE_VALIDATION) {
					// Sanity check: the entry about to be erased is the one we were waiting on.
					auto& p = self->items[key];
					ASSERT(p.change.getFuture() == change);
				}
				self->items.erase(key);
			}
			throw;
		}
	}
};
// Waits for delayJittered(time) and only then waits for `what`, returning its value.
// `what` is passed in as an existing future, so this delays when its result is observed.
ACTOR template <class T>
Future<T> delayActionJittered( Future<T> what, double time ) {
	wait( delayJittered( time ) );
	T t = wait( what );
	return t;
}
class AndFuture {
public:
AndFuture() { }
AndFuture(AndFuture const& f) {
futures = f.futures;
}
2020-06-10 08:33:41 +08:00
AndFuture(AndFuture&& f) noexcept { futures = std::move(f.futures); }
2017-05-26 04:48:44 +08:00
AndFuture(Future<Void> const& f) {
futures.push_back(f);
}
AndFuture(Error const& e) {
futures.push_back(e);
}
operator Future<Void>() {
return getFuture();
}
void operator=(AndFuture const& f) {
futures = f.futures;
}
2020-06-10 08:33:41 +08:00
void operator=(AndFuture&& f) noexcept { futures = std::move(f.futures); }
2017-05-26 04:48:44 +08:00
void operator=(Future<Void> const& f) {
futures.push_back(f);
}
void operator=(Error const& e) {
futures.push_back(e);
}
Future<Void> getFuture() {
if(futures.empty())
return Void();
if(futures.size() == 1)
return futures[0];
Future<Void> f = waitForAll(futures);
2020-04-25 03:21:35 +08:00
futures = std::vector<Future<Void>>{ f };
2017-05-26 04:48:44 +08:00
return f;
}
bool isReady() {
for( int i = futures.size() - 1; i >= 0; --i ) {
if( !futures[i].isReady() ) {
return false;
}
else if(!futures[i].isError()) {
swapAndPop(&futures, i);
2017-05-26 04:48:44 +08:00
}
}
return true;
}
bool isError() {
for( int i = 0; i < futures.size(); i++ )
if( futures[i].isError() )
return true;
return false;
}
void cleanup() {
for( int i = 0; i < futures.size(); i++ ) {
if( futures[i].isReady() && !futures[i].isError() ) {
swapAndPop(&futures, i--);
2017-05-26 04:48:44 +08:00
}
}
}
void add(Future<Void> const& f) {
if(!f.isReady() || f.isError())
futures.push_back(f);
}
void add(AndFuture f) {
add(f.getFuture());
}
private:
std::vector<Future<Void>> futures;
};
// Performs an unordered merge of a and b: every item received from either stream is forwarded to
// `output` as it arrives. A stream that reports end_of_stream is dropped from the merge; once both
// have ended, end_of_stream is sent on `output`. Any other error is sent on `output` right away.
// In every case the actor then returns.
ACTOR template <class T>
Future<Void> unorderedMergeStreams(FutureStream<T> a, FutureStream<T> b, PromiseStream<T> output) {
	state Future<T> nextFromA = waitAndForward(a);
	state Future<T> nextFromB = waitAndForward(b);
	state bool aOpen = true;
	state bool bOpen = true;

	loop {
		try {
			choose {
				when(T item = wait(nextFromA)) {
					output.send(item);
					nextFromA = waitAndForward(a);
				}
				when(T item = wait(nextFromB)) {
					output.send(item);
					nextFromB = waitAndForward(b);
				}
			}
		} catch (Error& err) {
			// Anything other than a stream ending is fatal for the merge.
			if (err.code() != error_code_end_of_stream) {
				output.sendError(err);
				break;
			}

			ASSERT(!nextFromA.isError() || !nextFromB.isError() || nextFromA.getError().code() == nextFromB.getError().code());

			// Park each finished side on Never() so the choose above only sees the live stream.
			if (nextFromA.isError()) {
				nextFromA = Never();
				aOpen = false;
			}
			if (nextFromB.isError()) {
				nextFromB = Never();
				bOpen = false;
			}

			// Both inputs exhausted: pass end_of_stream downstream and stop.
			if (!aOpen && !bOpen) {
				output.sendError(err);
				break;
			}
		}
	}

	return Void();
}
// Returns the ordered merge of a and b, assuming that a and b are both already ordered (prefer a over b if keys are equal). T must be a class that implements compare()
// When both inputs compare equal, only a's item is emitted and b's item is discarded.
// Once both streams have reported end_of_stream, that error is forwarded on `output` and the actor
// returns. Any other error from either stream is forwarded immediately and also ends the actor.
ACTOR template <class T>
Future<Void> orderedMergeStreams( FutureStream<T> a, FutureStream<T> b, PromiseStream<T> output ) {
	state Optional<T> savedKVa;
	state bool aOpen;
	state Optional<T> savedKVb;
	state bool bOpen;

	aOpen = bOpen = true;

	loop {
		// Refill a's lookahead slot if a is still open and the slot is empty.
		if ( aOpen && !savedKVa.present() ) {
			try {
				T KVa = waitNext( a );
				savedKVa = Optional<T>( KVa );
			} catch ( Error& e ) {
				if ( e.code() == error_code_end_of_stream ) {
					aOpen = false;
					if (!bOpen) {
						// Both inputs are exhausted. Stop here: falling through would call get()
						// on an empty lookahead slot below.
						output.sendError(e);
						break;
					}
				} else {
					output.sendError(e);
					break;
				}
			}
		}

		// Refill b's lookahead slot if b is still open and the slot is empty.
		if ( bOpen && !savedKVb.present() ) {
			try {
				T KVb = waitNext( b );
				savedKVb = Optional<T>( KVb );
			} catch ( Error& e ) {
				if ( e.code() == error_code_end_of_stream ) {
					bOpen = false;
					if (!aOpen) {
						// Both inputs are exhausted (see above).
						output.sendError(e);
						break;
					}
				} else {
					output.sendError(e);
					break;
				}
			}
		}

		// At this point at least one stream is open, and every open stream has a lookahead item.
		if (!aOpen) {
			output.send( savedKVb.get() );
			savedKVb = Optional<T>();
		} else if (!bOpen) {
			output.send( savedKVa.get() );
			savedKVa = Optional<T>();
		} else {
			int cmp = savedKVa.get().compare( savedKVb.get() );

			if ( cmp == 0 ) {
				// prefer a
				output.send( savedKVa.get() );
				savedKVa = Optional<T>();
				savedKVb = Optional<T>();
			} else if ( cmp < 0 ) {
				output.send( savedKVa.get() );
				savedKVa = Optional<T>();
			} else {
				output.send( savedKVb.get() );
				savedKVb = Optional<T>();
			}
		}
	}

	return Void();
}
// Measures how long `replyToTime` takes to complete, counted from the moment this actor starts,
// and sends the elapsed time (now() - start, taken after an extra delay(0)) on `timeOutput`.
// A broken_promise error is swallowed and nothing is sent; every other error is rethrown.
ACTOR template<class T>
Future<Void> timeReply(Future<T> replyToTime, PromiseStream<double> timeOutput){
	state double startTime = now();
	try {
		T _ = wait(replyToTime);
		wait( delay(0) );
		timeOutput.send(now() - startTime);
	} catch( Error &e ) {
		// Ignore broken promises. They typically occur during shutdown and our callers don't want to have to create brokenPromiseToNever actors to ignore them. For what it's worth we are breaking timeOutput to pass the pain along.
		if( e.code() != error_code_broken_promise )
			throw;
	}
	return Void();
}
#include "flow/unactorcompiler.h"
2017-05-26 04:48:44 +08:00
#endif