395 lines
10 KiB
C++
395 lines
10 KiB
C++
/*
|
|
* MultiVersionAssignmentVars.h
|
|
*
|
|
* This source file is part of the FoundationDB open source project
|
|
*
|
|
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
#ifndef FDBCLIENT_MULTIVERSIONASSIGNMENTVARS_H
|
|
#define FDBCLIENT_MULTIVERSIONASSIGNMENTVARS_H
|
|
#pragma once
|
|
|
|
#include "flow/ThreadHelper.actor.h"
|
|
|
|
template <class T>
|
|
class AbortableSingleAssignmentVar final : public ThreadSingleAssignmentVar<T>, public ThreadCallback {
|
|
public:
|
|
AbortableSingleAssignmentVar(ThreadFuture<T> future, ThreadFuture<Void> abortSignal)
|
|
: future(future), abortSignal(abortSignal), hasBeenSet(false), callbacksCleared(false) {
|
|
int userParam;
|
|
|
|
ThreadSingleAssignmentVar<T>::addref();
|
|
ThreadSingleAssignmentVar<T>::addref();
|
|
|
|
// abortSignal comes first, because otherwise future could immediately call fire/error and attempt to remove
|
|
// this callback from abortSignal prematurely
|
|
abortSignal.callOrSetAsCallback(this, userParam, 0);
|
|
future.callOrSetAsCallback(this, userParam, 0);
|
|
}
|
|
|
|
void cancel() override {
|
|
cancelCallbacks();
|
|
ThreadSingleAssignmentVar<T>::cancel();
|
|
}
|
|
|
|
void cleanupUnsafe() override {
|
|
future.getPtr()->releaseMemory();
|
|
ThreadSingleAssignmentVar<T>::cleanupUnsafe();
|
|
}
|
|
|
|
bool canFire(int notMadeActive) const override { return true; }
|
|
|
|
void fire(const Void& unused, int& userParam) override {
|
|
lock.enter();
|
|
if (!hasBeenSet) {
|
|
hasBeenSet = true;
|
|
lock.leave();
|
|
|
|
if (future.isReady() && !future.isError()) {
|
|
ThreadSingleAssignmentVar<T>::send(future.get());
|
|
} else if (abortSignal.isReady()) {
|
|
ThreadSingleAssignmentVar<T>::sendError(cluster_version_changed());
|
|
} else {
|
|
ASSERT(false);
|
|
}
|
|
} else {
|
|
lock.leave();
|
|
}
|
|
|
|
cancelCallbacks();
|
|
ThreadSingleAssignmentVar<T>::delref();
|
|
}
|
|
|
|
void error(const Error& e, int& userParam) override {
|
|
ASSERT(future.isError());
|
|
lock.enter();
|
|
if (!hasBeenSet) {
|
|
hasBeenSet = true;
|
|
lock.leave();
|
|
|
|
ThreadSingleAssignmentVar<T>::sendError(future.getError());
|
|
} else {
|
|
lock.leave();
|
|
}
|
|
|
|
cancelCallbacks();
|
|
ThreadSingleAssignmentVar<T>::delref();
|
|
}
|
|
|
|
private:
|
|
ThreadFuture<T> future;
|
|
ThreadFuture<Void> abortSignal;
|
|
|
|
ThreadSpinLock lock;
|
|
bool hasBeenSet;
|
|
bool callbacksCleared;
|
|
|
|
void cancelCallbacks() {
|
|
lock.enter();
|
|
|
|
if (!callbacksCleared) {
|
|
callbacksCleared = true;
|
|
lock.leave();
|
|
|
|
future.getPtr()->addref(); // Cancel will delref our future, but we don't want to destroy it until this
|
|
// callback gets destroyed
|
|
future.getPtr()->cancel();
|
|
|
|
if (abortSignal.clearCallback(this)) {
|
|
ThreadSingleAssignmentVar<T>::delref();
|
|
}
|
|
} else {
|
|
lock.leave();
|
|
}
|
|
}
|
|
};
|
|
|
|
template <class T>
|
|
ThreadFuture<T> abortableFuture(ThreadFuture<T> f, ThreadFuture<Void> abortSignal) {
|
|
return ThreadFuture<T>(new AbortableSingleAssignmentVar<T>(f, abortSignal));
|
|
}
|
|
|
|
template <class T>
|
|
class DLThreadSingleAssignmentVar final : public ThreadSingleAssignmentVar<T> {
|
|
public:
|
|
DLThreadSingleAssignmentVar(Reference<FdbCApi> api,
|
|
FdbCApi::FDBFuture* f,
|
|
std::function<T(FdbCApi::FDBFuture*, FdbCApi*)> extractValue)
|
|
: api(api), f(f), extractValue(extractValue), futureRefCount(1) {
|
|
ThreadSingleAssignmentVar<T>::addref();
|
|
api->futureSetCallback(f, &futureCallback, this);
|
|
}
|
|
|
|
~DLThreadSingleAssignmentVar() override {
|
|
lock.assertNotEntered();
|
|
if (f) {
|
|
ASSERT_ABORT(futureRefCount == 1);
|
|
api->futureDestroy(f);
|
|
}
|
|
}
|
|
|
|
bool addFutureRef() {
|
|
lock.enter();
|
|
bool destroyed = futureRefCount == 0;
|
|
if (!destroyed) {
|
|
++futureRefCount;
|
|
}
|
|
lock.leave();
|
|
|
|
return !destroyed;
|
|
}
|
|
|
|
bool delFutureRef() {
|
|
lock.enter();
|
|
if (futureRefCount == 0) {
|
|
lock.leave();
|
|
return true;
|
|
}
|
|
|
|
bool destroyNow = (--futureRefCount == 0);
|
|
lock.leave();
|
|
|
|
if (destroyNow) {
|
|
api->futureDestroy(f);
|
|
f = nullptr;
|
|
}
|
|
|
|
return destroyNow;
|
|
}
|
|
|
|
void cancel() override {
|
|
if (addFutureRef()) {
|
|
api->futureCancel(f);
|
|
delFutureRef();
|
|
}
|
|
|
|
ThreadSingleAssignmentVar<T>::cancel();
|
|
}
|
|
|
|
void cleanupUnsafe() override {
|
|
delFutureRef();
|
|
ThreadSingleAssignmentVar<T>::cleanupUnsafe();
|
|
}
|
|
|
|
void apply() {
|
|
FdbCApi::fdb_error_t error = addFutureRef() ? api->futureGetError(f) : error_code_operation_cancelled;
|
|
if (error != 0) {
|
|
delFutureRef();
|
|
ThreadSingleAssignmentVar<T>::sendError(Error(error));
|
|
} else {
|
|
T val = extractValue(f, api.getPtr());
|
|
delFutureRef();
|
|
ThreadSingleAssignmentVar<T>::send(val);
|
|
}
|
|
|
|
ThreadSingleAssignmentVar<T>::delref();
|
|
}
|
|
|
|
static void futureCallback(FdbCApi::FDBFuture* f, void* param) {
|
|
auto sav = (DLThreadSingleAssignmentVar<T>*)param;
|
|
|
|
if (MultiVersionApi::api->callbackOnMainThread) {
|
|
onMainThreadVoid([sav]() { sav->apply(); }, nullptr);
|
|
} else {
|
|
sav->apply();
|
|
}
|
|
}
|
|
|
|
private:
|
|
const Reference<FdbCApi> api;
|
|
FdbCApi::FDBFuture* f;
|
|
const std::function<T(FdbCApi::FDBFuture* f, FdbCApi* api)> extractValue;
|
|
ThreadSpinLock lock;
|
|
|
|
int futureRefCount;
|
|
};
|
|
|
|
template <class T>
|
|
ThreadFuture<T> toThreadFuture(Reference<FdbCApi> api,
|
|
FdbCApi::FDBFuture* f,
|
|
std::function<T(FdbCApi::FDBFuture* f, FdbCApi* api)> extractValue) {
|
|
return ThreadFuture<T>(new DLThreadSingleAssignmentVar<T>(api, f, extractValue));
|
|
}
|
|
|
|
template <class S, class T>
|
|
class MapSingleAssignmentVar final : public ThreadSingleAssignmentVar<T>, ThreadCallback {
|
|
public:
|
|
MapSingleAssignmentVar(ThreadFuture<S> source, std::function<ErrorOr<T>(ErrorOr<S>)> mapValue)
|
|
: source(source), mapValue(mapValue) {
|
|
ThreadSingleAssignmentVar<T>::addref();
|
|
|
|
int userParam;
|
|
source.callOrSetAsCallback(this, userParam, 0);
|
|
}
|
|
|
|
void cancel() override {
|
|
source.getPtr()->addref(); // Cancel will delref our future, but we don't want to destroy it until this callback
|
|
// gets destroyed
|
|
source.getPtr()->cancel();
|
|
ThreadSingleAssignmentVar<T>::cancel();
|
|
}
|
|
|
|
void cleanupUnsafe() override {
|
|
source.getPtr()->releaseMemory();
|
|
ThreadSingleAssignmentVar<T>::cleanupUnsafe();
|
|
}
|
|
|
|
bool canFire(int notMadeActive) const override { return true; }
|
|
|
|
void fire(const Void& unused, int& userParam) override {
|
|
sendResult(mapValue(source.get()));
|
|
ThreadSingleAssignmentVar<T>::delref();
|
|
}
|
|
|
|
void error(const Error& e, int& userParam) override {
|
|
sendResult(mapValue(source.getError()));
|
|
ThreadSingleAssignmentVar<T>::delref();
|
|
}
|
|
|
|
private:
|
|
ThreadFuture<S> source;
|
|
const std::function<ErrorOr<T>(ErrorOr<S>)> mapValue;
|
|
|
|
void sendResult(ErrorOr<T> result) {
|
|
if (result.isError()) {
|
|
ThreadSingleAssignmentVar<T>::sendError(result.getError());
|
|
} else {
|
|
ThreadSingleAssignmentVar<T>::send(result.get());
|
|
}
|
|
}
|
|
};
|
|
|
|
template <class S, class T>
|
|
ThreadFuture<T> mapThreadFuture(ThreadFuture<S> source, std::function<ErrorOr<T>(ErrorOr<S>)> mapValue) {
|
|
return ThreadFuture<T>(new MapSingleAssignmentVar<S, T>(source, mapValue));
|
|
}
|
|
|
|
template <class S, class T>
|
|
class FlatMapSingleAssignmentVar final : public ThreadSingleAssignmentVar<T>, ThreadCallback {
|
|
public:
|
|
FlatMapSingleAssignmentVar(ThreadFuture<S> source, std::function<ErrorOr<ThreadFuture<T>>(ErrorOr<S>)> mapValue)
|
|
: source(source), mapValue(mapValue), cancelled(false), released(false) {
|
|
ThreadSingleAssignmentVar<T>::addref();
|
|
|
|
int userParam;
|
|
source.callOrSetAsCallback(this, userParam, 0);
|
|
}
|
|
|
|
void cancel() override {
|
|
source.getPtr()->addref(); // Cancel will delref our future, but we don't want to destroy it until this callback
|
|
// gets destroyed
|
|
source.getPtr()->cancel();
|
|
|
|
lock.enter();
|
|
cancelled = true;
|
|
if (mappedFuture.isValid()) {
|
|
lock.leave();
|
|
mappedFuture.getPtr()->addref();
|
|
mappedFuture.getPtr()->cancel();
|
|
} else {
|
|
lock.leave();
|
|
}
|
|
|
|
ThreadSingleAssignmentVar<T>::cancel();
|
|
}
|
|
|
|
void cleanupUnsafe() override {
|
|
source.getPtr()->releaseMemory();
|
|
|
|
lock.enter();
|
|
released = true;
|
|
if (mappedFuture.isValid()) {
|
|
lock.leave();
|
|
mappedFuture.getPtr()->releaseMemory();
|
|
} else {
|
|
lock.leave();
|
|
}
|
|
|
|
ThreadSingleAssignmentVar<T>::cleanupUnsafe();
|
|
}
|
|
|
|
bool canFire(int notMadeActive) const override { return true; }
|
|
|
|
void fire(const Void& unused, int& userParam) override {
|
|
if (mappedFuture.isValid()) {
|
|
sendResult(mappedFuture.get());
|
|
} else {
|
|
setMappedFuture(mapValue(source.get()));
|
|
}
|
|
|
|
ThreadSingleAssignmentVar<T>::delref();
|
|
}
|
|
|
|
void error(const Error& e, int& userParam) override {
|
|
if (mappedFuture.isValid()) {
|
|
sendResult(mappedFuture.getError());
|
|
} else {
|
|
setMappedFuture(mapValue(source.getError()));
|
|
}
|
|
|
|
ThreadSingleAssignmentVar<T>::delref();
|
|
}
|
|
|
|
private:
|
|
ThreadFuture<S> source;
|
|
ThreadFuture<T> mappedFuture;
|
|
bool cancelled;
|
|
bool released;
|
|
const std::function<ErrorOr<ThreadFuture<T>>(ErrorOr<S>)> mapValue;
|
|
|
|
ThreadSpinLock lock;
|
|
|
|
void setMappedFuture(ErrorOr<ThreadFuture<T>> f) {
|
|
if (f.isError()) {
|
|
sendResult(f.getError());
|
|
} else {
|
|
lock.enter();
|
|
mappedFuture = f.get();
|
|
bool doCancel = cancelled;
|
|
bool doRelease = released;
|
|
lock.leave();
|
|
|
|
if (doCancel) {
|
|
mappedFuture.getPtr()->addref();
|
|
mappedFuture.getPtr()->cancel();
|
|
}
|
|
if (doRelease) {
|
|
mappedFuture.getPtr()->releaseMemory();
|
|
}
|
|
|
|
int userParam;
|
|
ThreadSingleAssignmentVar<T>::addref();
|
|
mappedFuture.callOrSetAsCallback(this, userParam, 0);
|
|
}
|
|
}
|
|
|
|
void sendResult(ErrorOr<T> result) {
|
|
if (result.isError()) {
|
|
ThreadSingleAssignmentVar<T>::sendError(result.getError());
|
|
} else {
|
|
ThreadSingleAssignmentVar<T>::send(result.get());
|
|
}
|
|
}
|
|
};
|
|
|
|
template <class S, class T>
|
|
ThreadFuture<T> flatMapThreadFuture(ThreadFuture<S> source,
|
|
std::function<ErrorOr<ThreadFuture<T>>(ErrorOr<S>)> mapValue) {
|
|
return ThreadFuture<T>(new FlatMapSingleAssignmentVar<S, T>(source, mapValue));
|
|
}
|
|
|
|
#endif
|