foundationdb/fdbclient/MultiVersionAssignmentVars.h

396 lines
9.8 KiB
C++

/*
* MultiVersionAssignmentVars.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FDBCLIENT_MULTIVERSIONASSIGNMENTVARS_H
#define FDBCLIENT_MULTIVERSIONASSIGNMENTVARS_H
#pragma once
#include "flow/ThreadHelper.actor.h"
// A single-assignment variable that mirrors `future` but can be completed early by `abortSignal`.
//
// The object registers itself as a callback on both futures. The first callback to run decides
// the result:
//   - fire(): if `future` is ready without error, its value is sent; otherwise, if `abortSignal`
//     is ready, cluster_version_changed() is sent as an error.
//   - error(): the error held by `future` is sent.
// Any later callback finds `hasBeenSet` already true and only performs cleanup.
template<class T>
class AbortableSingleAssignmentVar : public ThreadSingleAssignmentVar<T>, public ThreadCallback {
public:
	AbortableSingleAssignmentVar(ThreadFuture<T> future, ThreadFuture<Void> abortSignal)
	  : future(future), abortSignal(abortSignal), hasBeenSet(false), callbacksCleared(false) {
		// One extra reference is taken for each of the two callback registrations below. Every
		// fire()/error() invocation drops one, and cancelCallbacks() drops one when clearCallback()
		// on abortSignal returns true.
		ThreadSingleAssignmentVar<T>::addref();
		ThreadSingleAssignmentVar<T>::addref();

		int userParam;

		// abortSignal comes first, because otherwise future could immediately call fire/error and attempt to remove this callback from abortSignal prematurely
		abortSignal.callOrSetAsCallback(this, userParam, 0);
		future.callOrSetAsCallback(this, userParam, 0);
	}

	// Cancels the wrapped future (and detaches from abortSignal) before cancelling this variable.
	virtual void cancel() {
		cancelCallbacks();
		ThreadSingleAssignmentVar<T>::cancel();
	}

	// Asks the wrapped future to release its memory, then defers to the base class cleanup.
	virtual void cleanupUnsafe() {
		future.getPtr()->releaseMemory();
		ThreadSingleAssignmentVar<T>::cleanupUnsafe();
	}

	bool canFire(int notMadeActive) { return true; }

	// Callback invoked when one of the two futures becomes ready without error.
	void fire(const Void &unused, int& userParam) {
		if(markOnce(hasBeenSet)) {
			// The lock is no longer held here; only the first caller reaches this point.
			const bool valueAvailable = future.isReady() && !future.isError();
			if(valueAvailable) {
				ThreadSingleAssignmentVar<T>::send(future.get());
			}
			else if(abortSignal.isReady()) {
				ThreadSingleAssignmentVar<T>::sendError(cluster_version_changed());
			}
			else {
				// fire() was invoked although neither future can supply a result
				ASSERT(false);
			}
		}

		cancelCallbacks();
		ThreadSingleAssignmentVar<T>::delref();
	}

	// Callback invoked on error; the code asserts that the error belongs to `future`.
	void error(const Error& e, int& userParam) {
		ASSERT(future.isError());

		if(markOnce(hasBeenSet)) {
			ThreadSingleAssignmentVar<T>::sendError(future.getError());
		}

		cancelCallbacks();
		ThreadSingleAssignmentVar<T>::delref();
	}

private:
	ThreadFuture<T> future;
	ThreadFuture<Void> abortSignal;
	ThreadSpinLock lock; // guards hasBeenSet and callbacksCleared
	bool hasBeenSet; // true once a value or error has been claimed for sending
	bool callbacksCleared; // true once cancelCallbacks() has done its work

	// Sets `flag` to true while holding `lock`. Returns true only to the caller that found it false.
	bool markOnce(bool &flag) {
		lock.enter();
		const bool wasUnset = !flag;
		flag = true;
		lock.leave();
		return wasUnset;
	}

	// Cancels `future` and removes this object from abortSignal's callbacks. Only the first call has any effect.
	void cancelCallbacks() {
		if(!markOnce(callbacksCleared)) {
			return;
		}

		future.getPtr()->addref(); // Cancel will delref our future, but we don't want to destroy it until this callback gets destroyed
		future.getPtr()->cancel();

		// When the abortSignal callback was cleared, drop the reference that was taken for it.
		if(abortSignal.clearCallback(this)) {
			ThreadSingleAssignmentVar<T>::delref();
		}
	}
};
// Builds a new AbortableSingleAssignmentVar<T> over `f` and `abortSignal` and returns it wrapped
// in a ThreadFuture<T>.
template<class T>
ThreadFuture<T> abortableFuture(ThreadFuture<T> f, ThreadFuture<Void> abortSignal) {
	auto *abortable = new AbortableSingleAssignmentVar<T>(f, abortSignal);
	return ThreadFuture<T>(abortable);
}
// Adapts an FdbCApi::FDBFuture to a ThreadSingleAssignmentVar<T>.
//
// A callback is installed on the FDBFuture at construction. When it runs, apply() reads the
// future's error code and either sends that error or sends the value produced by `extractValue`.
//
// Access to the FDBFuture is guarded by `futureRefCount` (starting at 1): callers that want to
// touch `f` first take a reference with addFutureRef(), and the future is destroyed by whichever
// delFutureRef() call brings the count to zero.
template<class T>
class DLThreadSingleAssignmentVar : public ThreadSingleAssignmentVar<T> {
public:
	DLThreadSingleAssignmentVar(Reference<FdbCApi> api, FdbCApi::FDBFuture *f, std::function<T(FdbCApi::FDBFuture*, FdbCApi*)> extractValue)
	  : api(api), f(f), extractValue(extractValue), futureRefCount(1) {
		// Extra reference released at the end of apply()
		ThreadSingleAssignmentVar<T>::addref();
		api->futureSetCallback(f, &futureCallback, this);
	}

	~DLThreadSingleAssignmentVar() {
		lock.assertNotEntered();
		if(f) {
			// f is still set, so delFutureRef() never destroyed it; only the initial reference may remain
			ASSERT_ABORT(futureRefCount == 1);
			api->futureDestroy(f);
		}
	}

	// Takes a reference on the underlying future.
	// Returns false (and takes nothing) if the count has already reached zero.
	bool addFutureRef() {
		lock.enter();
		const bool stillAlive = (futureRefCount != 0);
		if(stillAlive) {
			++futureRefCount;
		}
		lock.leave();

		return stillAlive;
	}

	// Drops a reference on the underlying future, destroying it when the count reaches zero.
	// Returns true if the future was destroyed by this call or the count was already zero.
	bool delFutureRef() {
		lock.enter();
		const bool alreadyGone = (futureRefCount == 0);
		const bool lastReference = !alreadyGone && (--futureRefCount == 0);
		lock.leave();

		if(lastReference) {
			api->futureDestroy(f);
			f = NULL;
		}

		return alreadyGone || lastReference;
	}

	// Cancels the underlying future (if it still exists) and then this variable.
	virtual void cancel() {
		const bool holdsRef = addFutureRef();
		if(holdsRef) {
			api->futureCancel(f);
			delFutureRef();
		}

		ThreadSingleAssignmentVar<T>::cancel();
	}

	// Drops a future reference before running the base class cleanup.
	virtual void cleanupUnsafe() {
		delFutureRef();
		ThreadSingleAssignmentVar<T>::cleanupUnsafe();
	}

	// Transfers the outcome of the underlying future into this variable. If no future reference
	// can be taken, error_code_operation_cancelled is reported.
	void apply() {
		const FdbCApi::fdb_error_t err = addFutureRef() ? api->futureGetError(f) : error_code_operation_cancelled;

		if(err == 0) {
			// Extract the value while the future reference is still held
			T val = extractValue(f, api.getPtr());
			delFutureRef();
			ThreadSingleAssignmentVar<T>::send(val);
		}
		else {
			// Also reached when addFutureRef() failed; delFutureRef() does nothing when the count is already zero
			delFutureRef();
			ThreadSingleAssignmentVar<T>::sendError(Error(err));
		}

		// Matches the addref() in the constructor
		ThreadSingleAssignmentVar<T>::delref();
	}

	// Callback registered with the C API; `param` is the DLThreadSingleAssignmentVar passed to futureSetCallback.
	static void futureCallback(FdbCApi::FDBFuture *f, void *param) {
		auto *self = static_cast<DLThreadSingleAssignmentVar<T>*>(param);

		if(!MultiVersionApi::api->callbackOnMainThread) {
			self->apply();
			return;
		}

		onMainThreadVoid([self](){ self->apply(); }, NULL);
	}

private:
	const Reference<FdbCApi> api;
	FdbCApi::FDBFuture *f; // set to NULL once delFutureRef() has destroyed it
	const std::function<T(FdbCApi::FDBFuture *f, FdbCApi *api)> extractValue;
	ThreadSpinLock lock; // guards futureRefCount
	int futureRefCount;
};
// Wraps the C API future `f` in a new DLThreadSingleAssignmentVar<T> and returns it as a ThreadFuture<T>.
// `extractValue` is handed to the wrapper, which uses it to read the result out of `f`.
template<class T>
ThreadFuture<T> toThreadFuture(Reference<FdbCApi> api, FdbCApi::FDBFuture *f, std::function<T(FdbCApi::FDBFuture *f, FdbCApi *api)> extractValue) {
	auto *wrapped = new DLThreadSingleAssignmentVar<T>(api, f, extractValue);
	return ThreadFuture<T>(wrapped);
}
// A single-assignment variable whose result is `mapValue` applied to the outcome of `source`.
//
// `mapValue` receives either the value or the error of `source` (as an ErrorOr<S>) and returns
// either a value or an error (as an ErrorOr<T>), which is then sent on this variable.
template<class S, class T>
class MapSingleAssignmentVar : public ThreadSingleAssignmentVar<T>, ThreadCallback {
public:
	MapSingleAssignmentVar(ThreadFuture<S> source, std::function<ErrorOr<T>(ErrorOr<S>)> mapValue)
	  : source(source), mapValue(mapValue) {
		// Extra reference released once the source callback has been handled (see complete())
		ThreadSingleAssignmentVar<T>::addref();

		int userParam;
		source.callOrSetAsCallback(this, userParam, 0);
	}

	// Cancels the source future and then this variable.
	virtual void cancel() {
		auto *sourceVar = source.getPtr();
		sourceVar->addref(); // Cancel will delref our future, but we don't want to destroy it until this callback gets destroyed
		sourceVar->cancel();

		ThreadSingleAssignmentVar<T>::cancel();
	}

	// Asks the source future to release its memory, then runs the base class cleanup.
	virtual void cleanupUnsafe() {
		source.getPtr()->releaseMemory();
		ThreadSingleAssignmentVar<T>::cleanupUnsafe();
	}

	bool canFire(int notMadeActive) { return true; }

	// Source completed with a value: map it and publish the result.
	void fire(const Void &unused, int& userParam) {
		complete(mapValue(source.get()));
	}

	// Source completed with an error: give mapValue the error and publish whatever it returns.
	void error(const Error& e, int& userParam) {
		complete(mapValue(source.getError()));
	}

private:
	ThreadFuture<S> source;
	const std::function<ErrorOr<T>(ErrorOr<S>)> mapValue;

	// Sends `result` as this variable's value or error, then drops the reference taken in the constructor.
	void complete(ErrorOr<T> result) {
		if(!result.isError()) {
			ThreadSingleAssignmentVar<T>::send(result.get());
		}
		else {
			ThreadSingleAssignmentVar<T>::sendError(result.getError());
		}

		ThreadSingleAssignmentVar<T>::delref();
	}
};
// Returns a ThreadFuture<T> backed by a new MapSingleAssignmentVar<S, T>, i.e. the outcome of
// `source` passed through `mapValue`.
template<class S, class T>
ThreadFuture<T> mapThreadFuture(ThreadFuture<S> source, std::function<ErrorOr<T>(ErrorOr<S>)> mapValue) {
	auto *mapped = new MapSingleAssignmentVar<S, T>(source, mapValue);
	return ThreadFuture<T>(mapped);
}
// A single-assignment variable that chains two futures.
//
// When `source` completes, its value or error is passed to `mapValue`, which returns either an
// error (sent immediately) or a second future (`mappedFuture`). This object then registers itself
// on `mappedFuture` and forwards that future's value or error as its own result.
//
// fire()/error() are therefore invoked up to twice: first for `source`, then for `mappedFuture`.
// The two cases are told apart by whether `mappedFuture` is valid yet.
//
// cancel() and cleanupUnsafe() may run before `mappedFuture` exists; the `cancelled` and
// `released` flags record that so setMappedFuture() can apply them to the new future.
template<class S, class T>
class FlatMapSingleAssignmentVar : public ThreadSingleAssignmentVar<T>, ThreadCallback {
public:
	// The mem-initializer list follows the member declaration order (source, cancelled, released,
	// mapValue), which is the order C++ actually initializes them in.
	FlatMapSingleAssignmentVar(ThreadFuture<S> source, std::function<ErrorOr<ThreadFuture<T>>(ErrorOr<S>)> mapValue)
	  : source(source), cancelled(false), released(false), mapValue(mapValue) {
		// Extra reference released by the fire()/error() call made for `source`
		ThreadSingleAssignmentVar<T>::addref();

		int userParam;
		source.callOrSetAsCallback(this, userParam, 0);
	}

	// Cancels `source`, then `mappedFuture` if it already exists, then this variable.
	virtual void cancel() {
		source.getPtr()->addref(); // Cancel will delref our future, but we don't want to destroy it until this callback gets destroyed
		source.getPtr()->cancel();

		lock.enter();
		cancelled = true; // remembered so setMappedFuture() cancels a future that arrives later
		if(mappedFuture.isValid()) {
			lock.leave();
			mappedFuture.getPtr()->addref();
			mappedFuture.getPtr()->cancel();
		}
		else {
			lock.leave();
		}

		ThreadSingleAssignmentVar<T>::cancel();
	}

	// Releases the memory of `source` and of `mappedFuture` if it already exists, then runs the base class cleanup.
	virtual void cleanupUnsafe() {
		source.getPtr()->releaseMemory();

		lock.enter();
		released = true; // remembered so setMappedFuture() releases a future that arrives later
		if(mappedFuture.isValid()) {
			lock.leave();
			mappedFuture.getPtr()->releaseMemory();
		}
		else {
			lock.leave();
		}

		ThreadSingleAssignmentVar<T>::cleanupUnsafe();
	}

	bool canFire(int notMadeActive) { return true; }

	// Value callback. With a valid mappedFuture its value is forwarded; otherwise the value of
	// `source` is passed to mapValue to obtain mappedFuture.
	void fire(const Void &unused, int& userParam) {
		if(mappedFuture.isValid()) {
			sendResult(mappedFuture.get());
		}
		else {
			setMappedFuture(mapValue(source.get()));
		}

		// Drops the reference taken before the corresponding callback registration
		ThreadSingleAssignmentVar<T>::delref();
	}

	// Error callback. With a valid mappedFuture its error is forwarded; otherwise the error of
	// `source` is passed to mapValue, which may still return a future.
	void error(const Error& e, int& userParam) {
		if(mappedFuture.isValid()) {
			sendResult(mappedFuture.getError());
		}
		else {
			setMappedFuture(mapValue(source.getError()));
		}

		// Drops the reference taken before the corresponding callback registration
		ThreadSingleAssignmentVar<T>::delref();
	}

private:
	ThreadFuture<S> source;
	ThreadFuture<T> mappedFuture; // invalid until mapValue has produced a future
	bool cancelled; // cancel() has been called
	bool released; // cleanupUnsafe() has been called
	const std::function<ErrorOr<ThreadFuture<T>>(ErrorOr<S>)> mapValue;
	ThreadSpinLock lock; // held while cancelled/released are written and mappedFuture is assigned

	// Handles the result of mapValue: an error is sent right away; a future is stored in
	// mappedFuture, given any pending cancel/release, and then subscribed to.
	void setMappedFuture(ErrorOr<ThreadFuture<T>> f) {
		if(f.isError()) {
			sendResult(f.getError());
		}
		else {
			// Store the future and snapshot the flags under the lock, so a cancel()/cleanupUnsafe()
			// that already ran and saw no mappedFuture is applied to it below.
			lock.enter();
			mappedFuture = f.get();
			bool doCancel = cancelled;
			bool doRelease = released;
			lock.leave();

			if(doCancel) {
				mappedFuture.getPtr()->addref();
				mappedFuture.getPtr()->cancel();
			}
			if(doRelease) {
				mappedFuture.getPtr()->releaseMemory();
			}

			int userParam;
			// Extra reference released by the fire()/error() call made for mappedFuture
			ThreadSingleAssignmentVar<T>::addref();
			mappedFuture.callOrSetAsCallback(this, userParam, 0);
		}
	}

	// Sends `result` on this variable as either a value or an error.
	void sendResult(ErrorOr<T> result) {
		if(result.isError()) {
			ThreadSingleAssignmentVar<T>::sendError(result.getError());
		}
		else {
			ThreadSingleAssignmentVar<T>::send(result.get());
		}
	}
};
// Returns a ThreadFuture<T> backed by a new FlatMapSingleAssignmentVar<S, T>: `mapValue` turns the
// outcome of `source` into an error or a second future, whose result becomes the returned future's result.
template<class S, class T>
ThreadFuture<T> flatMapThreadFuture(ThreadFuture<S> source, std::function<ErrorOr<ThreadFuture<T>>(ErrorOr<S>)> mapValue) {
	auto *chained = new FlatMapSingleAssignmentVar<S, T>(source, mapValue);
	return ThreadFuture<T>(chained);
}
#endif