foundationdb/fdbclient/MultiVersionAssignmentVars.h

395 lines
10 KiB
C
Raw Normal View History

2017-05-26 04:48:44 +08:00
/*
* MultiVersionAssignmentVars.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
2017-05-26 04:48:44 +08:00
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
2017-05-26 04:48:44 +08:00
* http://www.apache.org/licenses/LICENSE-2.0
*
2017-05-26 04:48:44 +08:00
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FDBCLIENT_MULTIVERSIONASSIGNMENTVARS_H
#define FDBCLIENT_MULTIVERSIONASSIGNMENTVARS_H
#pragma once
#include "flow/ThreadHelper.actor.h"
template <class T>
class AbortableSingleAssignmentVar final : public ThreadSingleAssignmentVar<T>, public ThreadCallback {
2017-05-26 04:48:44 +08:00
public:
AbortableSingleAssignmentVar(ThreadFuture<T> future, ThreadFuture<Void> abortSignal)
: future(future), abortSignal(abortSignal), hasBeenSet(false), callbacksCleared(false) {
2017-05-26 04:48:44 +08:00
int userParam;
ThreadSingleAssignmentVar<T>::addref();
ThreadSingleAssignmentVar<T>::addref();
// abortSignal comes first, because otherwise future could immediately call fire/error and attempt to remove
// this callback from abortSignal prematurely
2017-05-26 04:48:44 +08:00
abortSignal.callOrSetAsCallback(this, userParam, 0);
future.callOrSetAsCallback(this, userParam, 0);
}
2017-05-26 04:48:44 +08:00
void cancel() override {
2017-05-26 04:48:44 +08:00
cancelCallbacks();
ThreadSingleAssignmentVar<T>::cancel();
}
void cleanupUnsafe() override {
2017-05-26 04:48:44 +08:00
future.getPtr()->releaseMemory();
ThreadSingleAssignmentVar<T>::cleanupUnsafe();
}
bool canFire(int notMadeActive) const override { return true; }
2017-05-26 04:48:44 +08:00
void fire(const Void& unused, int& userParam) override {
2017-05-26 04:48:44 +08:00
lock.enter();
if (!hasBeenSet) {
2017-05-26 04:48:44 +08:00
hasBeenSet = true;
lock.leave();
if (future.isReady() && !future.isError()) {
2017-05-26 04:48:44 +08:00
ThreadSingleAssignmentVar<T>::send(future.get());
} else if (abortSignal.isReady()) {
2017-05-26 04:48:44 +08:00
ThreadSingleAssignmentVar<T>::sendError(cluster_version_changed());
} else {
2017-05-26 04:48:44 +08:00
ASSERT(false);
}
} else {
2017-05-26 04:48:44 +08:00
lock.leave();
}
cancelCallbacks();
ThreadSingleAssignmentVar<T>::delref();
}
void error(const Error& e, int& userParam) override {
2017-05-26 04:48:44 +08:00
ASSERT(future.isError());
lock.enter();
if (!hasBeenSet) {
2017-05-26 04:48:44 +08:00
hasBeenSet = true;
lock.leave();
ThreadSingleAssignmentVar<T>::sendError(future.getError());
} else {
2017-05-26 04:48:44 +08:00
lock.leave();
}
cancelCallbacks();
ThreadSingleAssignmentVar<T>::delref();
}
private:
ThreadFuture<T> future;
ThreadFuture<Void> abortSignal;
ThreadSpinLock lock;
bool hasBeenSet;
bool callbacksCleared;
void cancelCallbacks() {
lock.enter();
if (!callbacksCleared) {
2017-05-26 04:48:44 +08:00
callbacksCleared = true;
lock.leave();
future.getPtr()->addref(); // Cancel will delref our future, but we don't want to destroy it until this
// callback gets destroyed
2017-05-26 04:48:44 +08:00
future.getPtr()->cancel();
if (abortSignal.clearCallback(this)) {
2017-05-26 04:48:44 +08:00
ThreadSingleAssignmentVar<T>::delref();
}
} else {
2017-05-26 04:48:44 +08:00
lock.leave();
}
}
};
template <class T>
2017-05-26 04:48:44 +08:00
ThreadFuture<T> abortableFuture(ThreadFuture<T> f, ThreadFuture<Void> abortSignal) {
return ThreadFuture<T>(new AbortableSingleAssignmentVar<T>(f, abortSignal));
}
template <class T>
class DLThreadSingleAssignmentVar final : public ThreadSingleAssignmentVar<T> {
2017-05-26 04:48:44 +08:00
public:
DLThreadSingleAssignmentVar(Reference<FdbCApi> api,
FdbCApi::FDBFuture* f,
std::function<T(FdbCApi::FDBFuture*, FdbCApi*)> extractValue)
: api(api), f(f), extractValue(extractValue), futureRefCount(1) {
2017-05-26 04:48:44 +08:00
ThreadSingleAssignmentVar<T>::addref();
api->futureSetCallback(f, &futureCallback, this);
}
~DLThreadSingleAssignmentVar() override {
2017-05-26 04:48:44 +08:00
lock.assertNotEntered();
if (f) {
2018-04-23 17:02:13 +08:00
ASSERT_ABORT(futureRefCount == 1);
2017-05-26 04:48:44 +08:00
api->futureDestroy(f);
}
}
bool addFutureRef() {
lock.enter();
bool destroyed = futureRefCount == 0;
if (!destroyed) {
2017-05-26 04:48:44 +08:00
++futureRefCount;
}
lock.leave();
return !destroyed;
}
bool delFutureRef() {
lock.enter();
if (futureRefCount == 0) {
2017-05-26 04:48:44 +08:00
lock.leave();
return true;
}
bool destroyNow = (--futureRefCount == 0);
2017-05-26 04:48:44 +08:00
lock.leave();
if (destroyNow) {
2017-05-26 04:48:44 +08:00
api->futureDestroy(f);
2020-09-21 02:33:09 +08:00
f = nullptr;
2017-05-26 04:48:44 +08:00
}
return destroyNow;
}
void cancel() override {
if (addFutureRef()) {
2017-05-26 04:48:44 +08:00
api->futureCancel(f);
delFutureRef();
}
ThreadSingleAssignmentVar<T>::cancel();
}
void cleanupUnsafe() override {
2017-05-26 04:48:44 +08:00
delFutureRef();
ThreadSingleAssignmentVar<T>::cleanupUnsafe();
}
void apply() {
FdbCApi::fdb_error_t error = addFutureRef() ? api->futureGetError(f) : error_code_operation_cancelled;
if (error != 0) {
2017-05-26 04:48:44 +08:00
delFutureRef();
ThreadSingleAssignmentVar<T>::sendError(Error(error));
} else {
2017-05-26 04:48:44 +08:00
T val = extractValue(f, api.getPtr());
delFutureRef();
ThreadSingleAssignmentVar<T>::send(val);
}
ThreadSingleAssignmentVar<T>::delref();
}
static void futureCallback(FdbCApi::FDBFuture* f, void* param) {
2017-05-26 04:48:44 +08:00
auto sav = (DLThreadSingleAssignmentVar<T>*)param;
if (MultiVersionApi::api->callbackOnMainThread) {
onMainThreadVoid([sav]() { sav->apply(); }, nullptr);
} else {
2017-05-26 04:48:44 +08:00
sav->apply();
}
}
private:
const Reference<FdbCApi> api;
FdbCApi::FDBFuture* f;
const std::function<T(FdbCApi::FDBFuture* f, FdbCApi* api)> extractValue;
2017-05-26 04:48:44 +08:00
ThreadSpinLock lock;
int futureRefCount;
};
template <class T>
ThreadFuture<T> toThreadFuture(Reference<FdbCApi> api,
FdbCApi::FDBFuture* f,
std::function<T(FdbCApi::FDBFuture* f, FdbCApi* api)> extractValue) {
2017-05-26 04:48:44 +08:00
return ThreadFuture<T>(new DLThreadSingleAssignmentVar<T>(api, f, extractValue));
}
template <class S, class T>
class MapSingleAssignmentVar final : public ThreadSingleAssignmentVar<T>, ThreadCallback {
2017-05-26 04:48:44 +08:00
public:
MapSingleAssignmentVar(ThreadFuture<S> source, std::function<ErrorOr<T>(ErrorOr<S>)> mapValue)
: source(source), mapValue(mapValue) {
2017-05-26 04:48:44 +08:00
ThreadSingleAssignmentVar<T>::addref();
int userParam;
source.callOrSetAsCallback(this, userParam, 0);
}
void cancel() override {
source.getPtr()->addref(); // Cancel will delref our future, but we don't want to destroy it until this callback
// gets destroyed
2017-05-26 04:48:44 +08:00
source.getPtr()->cancel();
ThreadSingleAssignmentVar<T>::cancel();
}
void cleanupUnsafe() override {
2017-05-26 04:48:44 +08:00
source.getPtr()->releaseMemory();
ThreadSingleAssignmentVar<T>::cleanupUnsafe();
}
bool canFire(int notMadeActive) const override { return true; }
2017-05-26 04:48:44 +08:00
void fire(const Void& unused, int& userParam) override {
2017-05-26 04:48:44 +08:00
sendResult(mapValue(source.get()));
ThreadSingleAssignmentVar<T>::delref();
}
void error(const Error& e, int& userParam) override {
2017-05-26 04:48:44 +08:00
sendResult(mapValue(source.getError()));
ThreadSingleAssignmentVar<T>::delref();
}
private:
ThreadFuture<S> source;
const std::function<ErrorOr<T>(ErrorOr<S>)> mapValue;
void sendResult(ErrorOr<T> result) {
if (result.isError()) {
2017-05-26 04:48:44 +08:00
ThreadSingleAssignmentVar<T>::sendError(result.getError());
} else {
2017-05-26 04:48:44 +08:00
ThreadSingleAssignmentVar<T>::send(result.get());
}
}
};
template <class S, class T>
2017-05-26 04:48:44 +08:00
ThreadFuture<T> mapThreadFuture(ThreadFuture<S> source, std::function<ErrorOr<T>(ErrorOr<S>)> mapValue) {
return ThreadFuture<T>(new MapSingleAssignmentVar<S, T>(source, mapValue));
}
template <class S, class T>
class FlatMapSingleAssignmentVar final : public ThreadSingleAssignmentVar<T>, ThreadCallback {
2017-05-26 04:48:44 +08:00
public:
FlatMapSingleAssignmentVar(ThreadFuture<S> source, std::function<ErrorOr<ThreadFuture<T>>(ErrorOr<S>)> mapValue)
: source(source), mapValue(mapValue), cancelled(false), released(false) {
2017-05-26 04:48:44 +08:00
ThreadSingleAssignmentVar<T>::addref();
int userParam;
source.callOrSetAsCallback(this, userParam, 0);
}
void cancel() override {
source.getPtr()->addref(); // Cancel will delref our future, but we don't want to destroy it until this callback
// gets destroyed
2017-05-26 04:48:44 +08:00
source.getPtr()->cancel();
lock.enter();
cancelled = true;
if (mappedFuture.isValid()) {
2017-05-26 04:48:44 +08:00
lock.leave();
mappedFuture.getPtr()->addref();
mappedFuture.getPtr()->cancel();
} else {
2017-05-26 04:48:44 +08:00
lock.leave();
}
ThreadSingleAssignmentVar<T>::cancel();
}
void cleanupUnsafe() override {
2017-05-26 04:48:44 +08:00
source.getPtr()->releaseMemory();
lock.enter();
released = true;
if (mappedFuture.isValid()) {
2017-05-26 04:48:44 +08:00
lock.leave();
mappedFuture.getPtr()->releaseMemory();
} else {
2017-05-26 04:48:44 +08:00
lock.leave();
}
ThreadSingleAssignmentVar<T>::cleanupUnsafe();
}
bool canFire(int notMadeActive) const override { return true; }
2017-05-26 04:48:44 +08:00
void fire(const Void& unused, int& userParam) override {
if (mappedFuture.isValid()) {
2017-05-26 04:48:44 +08:00
sendResult(mappedFuture.get());
} else {
2017-05-26 04:48:44 +08:00
setMappedFuture(mapValue(source.get()));
}
ThreadSingleAssignmentVar<T>::delref();
}
void error(const Error& e, int& userParam) override {
if (mappedFuture.isValid()) {
2017-05-26 04:48:44 +08:00
sendResult(mappedFuture.getError());
} else {
2017-05-26 04:48:44 +08:00
setMappedFuture(mapValue(source.getError()));
}
ThreadSingleAssignmentVar<T>::delref();
}
private:
ThreadFuture<S> source;
ThreadFuture<T> mappedFuture;
bool cancelled;
bool released;
const std::function<ErrorOr<ThreadFuture<T>>(ErrorOr<S>)> mapValue;
ThreadSpinLock lock;
void setMappedFuture(ErrorOr<ThreadFuture<T>> f) {
if (f.isError()) {
2017-05-26 04:48:44 +08:00
sendResult(f.getError());
} else {
2017-05-26 04:48:44 +08:00
lock.enter();
mappedFuture = f.get();
bool doCancel = cancelled;
bool doRelease = released;
lock.leave();
if (doCancel) {
2017-05-26 04:48:44 +08:00
mappedFuture.getPtr()->addref();
mappedFuture.getPtr()->cancel();
}
if (doRelease) {
2017-05-26 04:48:44 +08:00
mappedFuture.getPtr()->releaseMemory();
}
int userParam;
ThreadSingleAssignmentVar<T>::addref();
mappedFuture.callOrSetAsCallback(this, userParam, 0);
}
}
void sendResult(ErrorOr<T> result) {
if (result.isError()) {
2017-05-26 04:48:44 +08:00
ThreadSingleAssignmentVar<T>::sendError(result.getError());
} else {
2017-05-26 04:48:44 +08:00
ThreadSingleAssignmentVar<T>::send(result.get());
}
}
};
template <class S, class T>
ThreadFuture<T> flatMapThreadFuture(ThreadFuture<S> source,
std::function<ErrorOr<ThreadFuture<T>>(ErrorOr<S>)> mapValue) {
2017-05-26 04:48:44 +08:00
return ThreadFuture<T>(new FlatMapSingleAssignmentVar<S, T>(source, mapValue));
}
#endif