2017-05-26 04:48:44 +08:00
|
|
|
/*
 * ThreadSafeTransaction.cpp
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
|
|
|
|
|
2021-11-04 02:19:07 +08:00
|
|
|
#include "fdbclient/BlobGranuleFiles.h"
|
2021-10-11 11:44:56 +08:00
|
|
|
#include "fdbclient/ClusterConnectionFile.h"
|
2018-10-20 01:30:13 +08:00
|
|
|
#include "fdbclient/ThreadSafeTransaction.h"
|
|
|
|
#include "fdbclient/DatabaseContext.h"
|
2020-05-23 00:25:32 +08:00
|
|
|
#include "fdbclient/versions.h"
|
2020-10-20 00:57:11 +08:00
|
|
|
#include "fdbclient/NativeAPI.actor.h"
|
2017-05-26 04:48:44 +08:00
|
|
|
|
2021-03-11 02:06:03 +08:00
|
|
|
// Users of ThreadSafeTransaction might share Reference<ThreadSafe...> between different threads as long as they don't
// call addRef (e.g. C API follows this). Therefore, it is unsafe to call (explicitly or implicitly) this->addRef in any
// of these functions.
|
2017-05-26 04:48:44 +08:00
|
|
|
|
2018-09-22 06:58:14 +08:00
|
|
|
// Returns a thread future for DatabaseContext::onConnected().
// The work is dispatched with onMainThread(); the callback first calls checkDeferredError() on the
// database and then forwards to onConnected().
ThreadFuture<Void> ThreadSafeDatabase::onConnected() {
	// Capture the raw context pointer, not `this`, so the callback never touches this wrapper's refcount.
	DatabaseContext* cx = this->db;
	return onMainThread([cx]() -> Future<Void> {
		cx->checkDeferredError();
		return cx->onConnected();
	});
}
|
|
|
|
|
|
|
|
// Wraps an already-constructed Database in a ThreadSafeDatabase.
// The callback runs via onMainThread(): it checks the database's deferred error, takes one additional
// reference on the underlying DatabaseContext, and hands the raw pointer to a new ThreadSafeDatabase.
ThreadFuture<Reference<IDatabase>> ThreadSafeDatabase::createFromExistingDatabase(Database db) {
	return onMainThread([db]() -> Future<Reference<IDatabase>> {
		db->checkDeferredError();

		// The new wrapper is given a raw pointer, so add a reference for it explicitly.
		// NOTE(review): presumably this is the reference that ~ThreadSafeDatabase releases with delref() - confirm.
		DatabaseContext* rawContext = db.getPtr();
		rawContext->addref();

		return Future<Reference<IDatabase>>(Reference<IDatabase>(new ThreadSafeDatabase(rawContext)));
	});
}
|
|
|
|
|
2017-05-26 04:48:44 +08:00
|
|
|
// Creates a new ThreadSafeTransaction on this database.
// The underlying single-threaded transaction type is SIMPLE_CONFIG when isConfigDB is set, RYW otherwise.
Reference<ITransaction> ThreadSafeDatabase::createTransaction() {
	ISingleThreadTransaction::Type type = ISingleThreadTransaction::Type::RYW;
	if (isConfigDB) {
		type = ISingleThreadTransaction::Type::SIMPLE_CONFIG;
	}
	return Reference<ITransaction>(new ThreadSafeTransaction(db, type));
}
|
|
|
|
|
2021-03-11 02:06:03 +08:00
|
|
|
// Sets a database option.
// Unknown options are traced ("UnknownDatabaseOption") and rejected with invalid_option(); known options
// are traced ("SetDatabaseOption") and forwarded to DatabaseContext::setOption() on the main thread.
// USE_CONFIG_DATABASE additionally sets isConfigDB on this wrapper.
void ThreadSafeDatabase::setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value) {
	auto optionEntry = FDBDatabaseOptions::optionInfo.find(option);
	if (optionEntry == FDBDatabaseOptions::optionInfo.end()) {
		TraceEvent("UnknownDatabaseOption").detail("Option", option);
		throw invalid_option();
	}
	TraceEvent("SetDatabaseOption").detail("Option", optionEntry->second.name);

	if (optionEntry->first == FDBDatabaseOptions::USE_CONFIG_DATABASE) {
		isConfigDB = true;
	}

	DatabaseContext* cx = this->db;
	// Copy the value into a Standalone so the callback owns its own copy.
	Standalone<Optional<StringRef>> ownedValue = value;

	// ThreadSafeDatabase is not allowed to do anything with options except pass them through to RYW.
	// NOTE(review): the deferredError pointer is presumably where onMainThreadVoid records a callback failure - confirm.
	onMainThreadVoid(
	    [cx, option, ownedValue]() {
		    cx->checkDeferredError();
		    cx->setOption(option, ownedValue.contents());
	    },
	    &cx->deferredError);
}
|
|
|
|
|
2021-01-04 14:45:09 +08:00
|
|
|
// Forwards to DatabaseContext::rebootWorker(address, check, duration) on the main thread and returns
// its result as a thread future.
ThreadFuture<int64_t> ThreadSafeDatabase::rebootWorker(const StringRef& address, bool check, int duration) {
	DatabaseContext* cx = this->db;
	// Copy the address into a Key so the callback owns its own copy.
	Key ownedAddress = address;
	return onMainThread([cx, ownedAddress, check, duration]() -> Future<int64_t> {
		return cx->rebootWorker(ownedAddress, check, duration);
	});
}
|
|
|
|
|
2021-03-11 02:06:03 +08:00
|
|
|
// Forwards to DatabaseContext::forceRecoveryWithDataLoss(dcid) on the main thread.
ThreadFuture<Void> ThreadSafeDatabase::forceRecoveryWithDataLoss(const StringRef& dcid) {
	DatabaseContext* cx = this->db;
	// Copy the id into a Key so the callback owns its own copy.
	Key ownedDcid = dcid;
	return onMainThread([cx, ownedDcid]() -> Future<Void> {
		return cx->forceRecoveryWithDataLoss(ownedDcid);
	});
}
|
|
|
|
|
2021-02-09 14:17:16 +08:00
|
|
|
// Forwards to DatabaseContext::createSnapshot(uid, snapshot_command) on the main thread.
ThreadFuture<Void> ThreadSafeDatabase::createSnapshot(const StringRef& uid, const StringRef& snapshot_command) {
	DatabaseContext* cx = this->db;
	// Copy both arguments into Keys so the callback owns its own copies.
	Key ownedUid = uid;
	Key ownedCommand = snapshot_command;
	return onMainThread([cx, ownedUid, ownedCommand]() -> Future<Void> {
		return cx->createSnapshot(ownedUid, ownedCommand);
	});
}
|
|
|
|
|
2021-03-24 04:56:37 +08:00
|
|
|
// Return the main network thread busyness
|
2021-03-16 07:23:56 +08:00
|
|
|
double ThreadSafeDatabase::getMainThreadBusyness() {
|
2021-03-17 07:29:02 +08:00
|
|
|
ASSERT(g_network);
|
2021-03-16 07:23:56 +08:00
|
|
|
return g_network->networkInfo.metrics.networkBusyness;
|
|
|
|
}
|
|
|
|
|
2021-04-16 02:45:14 +08:00
|
|
|
// Returns the protocol version reported by the coordinator this client is connected to
|
2021-04-15 03:50:30 +08:00
|
|
|
// If an expected version is given, the future won't return until the protocol version is different than expected
|
2021-04-27 05:39:27 +08:00
|
|
|
// Note: this will never return if the server is running a protocol from FDB 5.0 or older
|
2021-04-15 03:50:30 +08:00
|
|
|
ThreadFuture<ProtocolVersion> ThreadSafeDatabase::getServerProtocol(Optional<ProtocolVersion> expectedVersion) {
|
|
|
|
DatabaseContext* db = this->db;
|
2021-04-16 02:45:14 +08:00
|
|
|
return onMainThread(
|
|
|
|
[db, expectedVersion]() -> Future<ProtocolVersion> { return db->getClusterProtocol(expectedVersion); });
|
2021-04-15 03:50:30 +08:00
|
|
|
}
|
|
|
|
|
2018-09-27 01:27:55 +08:00
|
|
|
// Constructs a ThreadSafeDatabase from a cluster file name.
// The cluster file path is resolved with ClusterConnectionFile::lookupClusterFileName(). Storage for the
// DatabaseContext is allocated on the calling thread, while Database::createDatabase() runs later on the
// main thread. If creation throws, a DatabaseContext holding the error is constructed in that storage.
ThreadSafeDatabase::ThreadSafeDatabase(std::string connFilename, int apiVersion) {
	auto resolved = ClusterConnectionFile::lookupClusterFileName(connFilename);
	ClusterConnectionFile* connFile = new ClusterConnectionFile(resolved.first);

	// Allocate memory for the Database from this thread (so the pointer is known for subsequent method calls)
	// but run its constructor on the main thread
	this->db = DatabaseContext::allocateOnForeignThread();
	DatabaseContext* preallocated = this->db;

	onMainThreadVoid(
	    [preallocated, connFile, apiVersion]() {
		    try {
			    // extractPtr() releases the returned Database handle without dropping its reference;
			    // the context itself is reachable through `preallocated`.
			    Database::createDatabase(Reference<ClusterConnectionFile>(connFile),
			                             apiVersion,
			                             IsInternal::False,
			                             LocalityData(),
			                             preallocated)
			        .extractPtr();
		    } catch (Error& e) {
			    new (preallocated) DatabaseContext(e);
		    } catch (...) {
			    new (preallocated) DatabaseContext(unknown_error());
		    }
	    },
	    nullptr);
}
|
|
|
|
|
|
|
|
// Releases this wrapper's reference on the DatabaseContext.
// The delref() is dispatched with onMainThreadVoid() rather than performed on the destroying thread.
ThreadSafeDatabase::~ThreadSafeDatabase() {
	DatabaseContext* cx = this->db;
	onMainThreadVoid([cx]() { cx->delref(); }, nullptr);
}
|
|
|
|
|
2021-05-11 09:15:56 +08:00
|
|
|
// Creates a transaction of the requested type on database context `cx`.
ThreadSafeTransaction::ThreadSafeTransaction(DatabaseContext* cx, ISingleThreadTransaction::Type type) {
	// Allocate memory for the transaction from this thread (so the pointer is known for subsequent method calls)
	// but run its constructor on the main thread.
	//
	// Deferring DatabaseContext::addref into the onMainThreadVoid callback looks odd but is safe: the
	// DatabaseContext reference count is managed only from the main thread. If cx is destructed right
	// after this call, its DatabaseContext::delref is deferred the same way, and onMainThread preserves
	// the order of these operations.
	this->tr = ISingleThreadTransaction::allocateOnForeignThread(type);
	auto txn = this->tr;

	// No deferred error -- if the construction of the RYW transaction fails, we have no where to put it
	onMainThreadVoid(
	    [txn, cx]() {
		    cx->addref();
		    txn->setDatabase(Database(cx));
	    },
	    nullptr);
}
|
|
|
|
|
2021-04-23 16:32:30 +08:00
|
|
|
// This constructor is only used while refactoring fdbcli and only called from the main thread
|
|
|
|
ThreadSafeTransaction::ThreadSafeTransaction(ReadYourWritesTransaction* ryw) : tr(ryw) {
|
|
|
|
if (tr)
|
|
|
|
tr->addref();
|
|
|
|
}
|
|
|
|
|
2017-05-26 04:48:44 +08:00
|
|
|
// Releases the reference on the underlying transaction, if there is one.
// `tr` is null after this object has been moved from; in that case nothing is scheduled.
ThreadSafeTransaction::~ThreadSafeTransaction() {
	ISingleThreadTransaction* txn = this->tr;
	if (!txn) {
		return;
	}
	onMainThreadVoid([txn]() { txn->delref(); }, nullptr);
}
|
|
|
|
|
|
|
|
void ThreadSafeTransaction::cancel() {
|
2021-05-11 06:00:01 +08:00
|
|
|
ISingleThreadTransaction* tr = this->tr;
|
|
|
|
onMainThreadVoid([tr]() { tr->cancel(); }, nullptr);
|
2017-05-26 04:48:44 +08:00
|
|
|
}
|
|
|
|
|
2021-03-11 02:06:03 +08:00
|
|
|
// Schedules setVersion(v) on the underlying transaction on the main thread.
// NOTE(review): a failure in the callback is presumably stored in the transaction's deferredError - confirm.
void ThreadSafeTransaction::setVersion(Version v) {
	ISingleThreadTransaction* txn = this->tr;
	onMainThreadVoid([txn, v]() { txn->setVersion(v); }, &txn->deferredError);
}
|
|
|
|
|
|
|
|
// Returns the transaction's read version as a thread future.
// On the main thread, checkDeferredError() is called first and then getReadVersion().
ThreadFuture<Version> ThreadSafeTransaction::getReadVersion() {
	ISingleThreadTransaction* txn = this->tr;
	return onMainThread([txn]() -> Future<Version> {
		txn->checkDeferredError();
		return txn->getReadVersion();
	});
}
|
|
|
|
|
2021-03-11 02:06:03 +08:00
|
|
|
// Reads the value of `key`; `snapshot` is forwarded as a Snapshot flag.
// On the main thread, checkDeferredError() is called first and then get().
ThreadFuture<Optional<Value>> ThreadSafeTransaction::get(const KeyRef& key, bool snapshot) {
	// Copy the key so the callback owns its own copy.
	Key ownedKey = key;

	ISingleThreadTransaction* txn = this->tr;
	return onMainThread([txn, ownedKey, snapshot]() -> Future<Optional<Value>> {
		txn->checkDeferredError();
		return txn->get(ownedKey, Snapshot{ snapshot });
	});
}
|
|
|
|
|
2021-03-11 02:06:03 +08:00
|
|
|
// Resolves a key selector to a key; `snapshot` is forwarded as a Snapshot flag.
// On the main thread, checkDeferredError() is called first and then getKey().
ThreadFuture<Key> ThreadSafeTransaction::getKey(const KeySelectorRef& key, bool snapshot) {
	// Copy the selector so the callback owns its own copy.
	KeySelector ownedSelector = key;

	ISingleThreadTransaction* txn = this->tr;
	return onMainThread([txn, ownedSelector, snapshot]() -> Future<Key> {
		txn->checkDeferredError();
		return txn->getKey(ownedSelector, Snapshot{ snapshot });
	});
}
|
|
|
|
|
2021-03-11 02:06:03 +08:00
|
|
|
// Forwards to getEstimatedRangeSizeBytes(keys) on the underlying transaction.
// On the main thread, checkDeferredError() is called first.
ThreadFuture<int64_t> ThreadSafeTransaction::getEstimatedRangeSizeBytes(const KeyRangeRef& keys) {
	// Copy the range so the callback owns its own copy.
	KeyRange ownedRange = keys;

	ISingleThreadTransaction* txn = this->tr;
	return onMainThread([txn, ownedRange]() -> Future<int64_t> {
		txn->checkDeferredError();
		return txn->getEstimatedRangeSizeBytes(ownedRange);
	});
}
|
|
|
|
|
2020-06-19 00:41:50 +08:00
|
|
|
// Forwards to getRangeSplitPoints(range, chunkSize) on the underlying transaction.
// On the main thread, checkDeferredError() is called first.
ThreadFuture<Standalone<VectorRef<KeyRef>>> ThreadSafeTransaction::getRangeSplitPoints(const KeyRangeRef& range,
                                                                                      int64_t chunkSize) {
	// Copy the range so the callback owns its own copy.
	KeyRange ownedRange = range;

	ISingleThreadTransaction* txn = this->tr;
	return onMainThread([txn, ownedRange, chunkSize]() -> Future<Standalone<VectorRef<KeyRef>>> {
		txn->checkDeferredError();
		return txn->getRangeSplitPoints(ownedRange, chunkSize);
	});
}
|
2020-01-14 07:45:56 +08:00
|
|
|
|
2021-05-04 04:14:16 +08:00
|
|
|
// Range read bounded by a row-count `limit`.
// `snapshot` and `reverse` are forwarded as Snapshot / Reverse flags. On the main thread,
// checkDeferredError() is called first and then getRange().
ThreadFuture<RangeResult> ThreadSafeTransaction::getRange(const KeySelectorRef& begin,
                                                          const KeySelectorRef& end,
                                                          int limit,
                                                          bool snapshot,
                                                          bool reverse) {
	// Copy both selectors so the callback owns its own copies.
	KeySelector ownedBegin = begin;
	KeySelector ownedEnd = end;

	ISingleThreadTransaction* txn = this->tr;
	return onMainThread([txn, ownedBegin, ownedEnd, limit, snapshot, reverse]() -> Future<RangeResult> {
		txn->checkDeferredError();
		return txn->getRange(ownedBegin, ownedEnd, limit, Snapshot{ snapshot }, Reverse{ reverse });
	});
}
|
|
|
|
|
2021-05-04 04:14:16 +08:00
|
|
|
// Range read bounded by a GetRangeLimits value.
// `snapshot` and `reverse` are forwarded as Snapshot / Reverse flags. On the main thread,
// checkDeferredError() is called first and then getRange().
ThreadFuture<RangeResult> ThreadSafeTransaction::getRange(const KeySelectorRef& begin,
                                                          const KeySelectorRef& end,
                                                          GetRangeLimits limits,
                                                          bool snapshot,
                                                          bool reverse) {
	// Copy both selectors so the callback owns its own copies.
	KeySelector ownedBegin = begin;
	KeySelector ownedEnd = end;

	ISingleThreadTransaction* txn = this->tr;
	return onMainThread([txn, ownedBegin, ownedEnd, limits, snapshot, reverse]() -> Future<RangeResult> {
		txn->checkDeferredError();
		return txn->getRange(ownedBegin, ownedEnd, limits, Snapshot{ snapshot }, Reverse{ reverse });
	});
}
|
|
|
|
|
2021-11-10 03:07:09 +08:00
|
|
|
// Range read with a mapper string, forwarded to getRangeAndFlatMap() on the underlying transaction.
// `snapshot` and `reverse` are forwarded as Snapshot / Reverse flags. On the main thread,
// checkDeferredError() is called first.
ThreadFuture<RangeResult> ThreadSafeTransaction::getRangeAndFlatMap(const KeySelectorRef& begin,
                                                                    const KeySelectorRef& end,
                                                                    const StringRef& mapper,
                                                                    GetRangeLimits limits,
                                                                    bool snapshot,
                                                                    bool reverse) {
	// Copy the selectors and the mapper so the callback owns its own copies.
	KeySelector ownedBegin = begin;
	KeySelector ownedEnd = end;
	Key ownedMapper = mapper;

	ISingleThreadTransaction* txn = this->tr;
	return onMainThread(
	    [txn, ownedBegin, ownedEnd, ownedMapper, limits, snapshot, reverse]() -> Future<RangeResult> {
		    txn->checkDeferredError();
		    return txn->getRangeAndFlatMap(
		        ownedBegin, ownedEnd, ownedMapper, limits, Snapshot{ snapshot }, Reverse{ reverse });
	    });
}
|
|
|
|
|
2021-03-11 02:06:03 +08:00
|
|
|
// Forwards to getAddressesForKey(key) on the underlying transaction.
// On the main thread, checkDeferredError() is called first.
ThreadFuture<Standalone<VectorRef<const char*>>> ThreadSafeTransaction::getAddressesForKey(const KeyRef& key) {
	// Copy the key so the callback owns its own copy.
	Key ownedKey = key;

	ISingleThreadTransaction* txn = this->tr;
	return onMainThread([txn, ownedKey]() -> Future<Standalone<VectorRef<const char*>>> {
		txn->checkDeferredError();
		return txn->getAddressesForKey(ownedKey);
	});
}
|
|
|
|
|
2021-11-09 02:24:45 +08:00
|
|
|
// Forwards to getBlobGranuleRanges(keyRange) on the underlying transaction.
// On the main thread, checkDeferredError() is called first.
ThreadFuture<Standalone<VectorRef<KeyRangeRef>>> ThreadSafeTransaction::getBlobGranuleRanges(
    const KeyRangeRef& keyRange) {
	ISingleThreadTransaction* txn = this->tr;
	// Copy the range so the callback owns its own copy.
	KeyRange ownedRange = keyRange;

	return onMainThread([txn, ownedRange]() -> Future<Standalone<VectorRef<KeyRangeRef>>> {
		txn->checkDeferredError();
		return txn->getBlobGranuleRanges(ownedRange);
	});
}
|
|
|
|
|
|
|
|
// Reads the rows of `keyRange` from blob granules.
//
// Steps:
//   1. On the main thread, ask the underlying transaction for the list of granule chunks
//      (readBlobGranules), which also fills in the read version actually used.
//   2. Block the calling thread until that list is ready; an error is returned to the caller as an
//      errored future.
//   3. For each chunk (unless granule_context.debugNoMaterialize is set): start loading the snapshot file
//      and every delta file through granule_context.start_load_f, fetch the loaded bytes with
//      get_load_f, materialize the rows with materializeBlobGranule, and release every load with
//      free_load_f.
//
// Parameters:
//   keyRange        - range to read.
//   beginVersion    - must be 0 (asserted); the field exists only for forward compatibility.
//   readVersion     - optional read version, forwarded to the underlying transaction.
//   granule_context - user-supplied load callbacks, their userContext and the debugNoMaterialize flag.
//
// Returns the concatenated rows of all chunks, or an errored future if fetching the chunk list fails or
// an Error is thrown while materializing.
ThreadFuture<RangeResult> ThreadSafeTransaction::readBlobGranules(const KeyRangeRef& keyRange,
                                                                  Version beginVersion,
                                                                  Optional<Version> readVersion,
                                                                  ReadBlobGranuleContext granule_context) {
	// In V1 of api this is required, field is just for forward compatibility
	ASSERT(beginVersion == 0);

	bool doMaterialize = !granule_context.debugNoMaterialize;

	ISingleThreadTransaction* tr = this->tr;
	KeyRange r = keyRange;

	// Captured by reference below; this function blocks until the future is ready, so the variable is
	// still alive whenever the callback's work writes to it.
	int64_t readVersionOut;
	ThreadFuture<Standalone<VectorRef<BlobGranuleChunkRef>>> getFilesFuture = onMainThread(
	    [tr, r, beginVersion, readVersion, &readVersionOut]() -> Future<Standalone<VectorRef<BlobGranuleChunkRef>>> {
		    tr->checkDeferredError();
		    return tr->readBlobGranules(r, beginVersion, readVersion, &readVersionOut);
	    });

	// FIXME: can this safely avoid another main thread jump?
	getFilesFuture.blockUntilReadyCheckOnMainThread();

	// propagate error to client
	if (getFilesFuture.isError()) {
		return ThreadFuture<RangeResult>(getFilesFuture.getError());
	}

	Standalone<VectorRef<BlobGranuleChunkRef>> files = getFilesFuture.get();

	try {
		RangeResult results;
		// FIXME: could submit multiple chunks to start_load_f in parallel?
		for (BlobGranuleChunkRef& chunk : files) {
			RangeResult chunkRows;

			// Load ids live at loop-body scope because they are produced in the "start" section and
			// consumed again in the "free" section below. They must NOT be re-declared inside the
			// materialize branch: a shadowing array there would leave this one uninitialized, and
			// free_load_f would then be called with garbage ids.
			int64_t snapshotLoadId;
			int64_t deltaLoadIds[chunk.deltaFiles.size()];

			// FIXME: move to transactions?
			if (doMaterialize) {
				// Start load process for all files in chunk
				// In V1 of api snapshot is required, optional is just for forward compatibility
				ASSERT(chunk.snapshotFile.present());
				std::string snapshotFname = chunk.snapshotFile.get().filename.toString();
				snapshotLoadId = granule_context.start_load_f(snapshotFname.c_str(),
				                                              snapshotFname.size(),
				                                              chunk.snapshotFile.get().offset,
				                                              chunk.snapshotFile.get().length,
				                                              granule_context.userContext);

				StringRef deltaData[chunk.deltaFiles.size()];
				for (int deltaFileIdx = 0; deltaFileIdx < chunk.deltaFiles.size(); deltaFileIdx++) {
					std::string deltaFName = chunk.deltaFiles[deltaFileIdx].filename.toString();
					deltaLoadIds[deltaFileIdx] = granule_context.start_load_f(deltaFName.c_str(),
					                                                          deltaFName.size(),
					                                                          chunk.deltaFiles[deltaFileIdx].offset,
					                                                          chunk.deltaFiles[deltaFileIdx].length,
					                                                          granule_context.userContext);
				}

				// once all loads kicked off, load data for chunk
				StringRef snapshotData(granule_context.get_load_f(snapshotLoadId, granule_context.userContext),
				                       chunk.snapshotFile.get().length);
				for (int i = 0; i < chunk.deltaFiles.size(); i++) {
					deltaData[i] = StringRef(granule_context.get_load_f(deltaLoadIds[i], granule_context.userContext),
					                         chunk.deltaFiles[i].length);
				}

				// materialize rows from chunk
				chunkRows = materializeBlobGranule(chunk, keyRange, readVersionOut, snapshotData, deltaData);
			}

			// Keep the chunk's memory alive for as long as the combined result references its rows.
			results.arena().dependsOn(chunkRows.arena());
			results.append(results.arena(), chunkRows.begin(), chunkRows.size());

			if (doMaterialize) {
				// Release every load started above, using the same ids that start_load_f returned.
				granule_context.free_load_f(snapshotLoadId, granule_context.userContext);
				for (int i = 0; i < chunk.deltaFiles.size(); i++) {
					granule_context.free_load_f(deltaLoadIds[i], granule_context.userContext);
				}
			}
		}
		return results;
	} catch (Error& e) {
		return ThreadFuture<RangeResult>(e);
	}
}
|
|
|
|
|
2021-03-11 02:06:03 +08:00
|
|
|
void ThreadSafeTransaction::addReadConflictRange(const KeyRangeRef& keys) {
|
2017-05-26 04:48:44 +08:00
|
|
|
KeyRange r = keys;
|
|
|
|
|
2021-05-11 06:00:01 +08:00
|
|
|
ISingleThreadTransaction* tr = this->tr;
|
|
|
|
onMainThreadVoid([tr, r]() { tr->addReadConflictRange(r); }, &tr->deferredError);
|
2017-05-26 04:48:44 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
void ThreadSafeTransaction::makeSelfConflicting() {
|
2021-05-11 06:00:01 +08:00
|
|
|
ISingleThreadTransaction* tr = this->tr;
|
|
|
|
onMainThreadVoid([tr]() { tr->makeSelfConflicting(); }, &tr->deferredError);
|
2017-05-26 04:48:44 +08:00
|
|
|
}
|
|
|
|
|
2021-03-11 02:06:03 +08:00
|
|
|
void ThreadSafeTransaction::atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType) {
|
2017-05-26 04:48:44 +08:00
|
|
|
Key k = key;
|
|
|
|
Value v = value;
|
|
|
|
|
2021-05-11 06:00:01 +08:00
|
|
|
ISingleThreadTransaction* tr = this->tr;
|
|
|
|
onMainThreadVoid([tr, k, v, operationType]() { tr->atomicOp(k, v, operationType); }, &tr->deferredError);
|
2017-05-26 04:48:44 +08:00
|
|
|
}
|
|
|
|
|
2021-03-11 02:06:03 +08:00
|
|
|
void ThreadSafeTransaction::set(const KeyRef& key, const ValueRef& value) {
|
2017-05-26 04:48:44 +08:00
|
|
|
Key k = key;
|
|
|
|
Value v = value;
|
|
|
|
|
2021-05-11 06:00:01 +08:00
|
|
|
ISingleThreadTransaction* tr = this->tr;
|
|
|
|
onMainThreadVoid([tr, k, v]() { tr->set(k, v); }, &tr->deferredError);
|
2017-05-26 04:48:44 +08:00
|
|
|
}
|
|
|
|
|
2021-03-11 02:06:03 +08:00
|
|
|
void ThreadSafeTransaction::clear(const KeyRangeRef& range) {
|
2017-05-26 04:48:44 +08:00
|
|
|
KeyRange r = range;
|
|
|
|
|
2021-05-11 06:00:01 +08:00
|
|
|
ISingleThreadTransaction* tr = this->tr;
|
|
|
|
onMainThreadVoid([tr, r]() { tr->clear(r); }, &tr->deferredError);
|
2017-05-26 04:48:44 +08:00
|
|
|
}
|
|
|
|
|
2021-03-11 02:06:03 +08:00
|
|
|
void ThreadSafeTransaction::clear(const KeyRef& begin, const KeyRef& end) {
|
2017-05-26 04:48:44 +08:00
|
|
|
Key b = begin;
|
|
|
|
Key e = end;
|
|
|
|
|
2021-05-11 06:00:01 +08:00
|
|
|
ISingleThreadTransaction* tr = this->tr;
|
2021-03-11 02:06:03 +08:00
|
|
|
onMainThreadVoid(
|
2021-05-11 06:00:01 +08:00
|
|
|
[tr, b, e]() {
|
2021-03-11 02:06:03 +08:00
|
|
|
if (b > e)
|
|
|
|
throw inverted_range();
|
2017-05-26 04:48:44 +08:00
|
|
|
|
2021-03-11 02:06:03 +08:00
|
|
|
tr->clear(KeyRangeRef(b, e));
|
|
|
|
},
|
2021-05-11 05:09:08 +08:00
|
|
|
&tr->deferredError);
|
2017-05-26 04:48:44 +08:00
|
|
|
}
|
|
|
|
|
2021-03-11 02:06:03 +08:00
|
|
|
void ThreadSafeTransaction::clear(const KeyRef& key) {
|
2017-05-26 04:48:44 +08:00
|
|
|
Key k = key;
|
|
|
|
|
2021-05-11 06:00:01 +08:00
|
|
|
ISingleThreadTransaction* tr = this->tr;
|
|
|
|
onMainThreadVoid([tr, k]() { tr->clear(k); }, &tr->deferredError);
|
2017-05-26 04:48:44 +08:00
|
|
|
}
|
|
|
|
|
2021-03-11 02:06:03 +08:00
|
|
|
// Forwards to watch(key) on the underlying transaction.
// On the main thread, checkDeferredError() is called first.
ThreadFuture<Void> ThreadSafeTransaction::watch(const KeyRef& key) {
	// Copy the key so the callback owns its own copy.
	Key ownedKey = key;

	ISingleThreadTransaction* txn = this->tr;
	return onMainThread([txn, ownedKey]() -> Future<Void> {
		txn->checkDeferredError();
		return txn->watch(ownedKey);
	});
}
|
|
|
|
|
2021-03-11 02:06:03 +08:00
|
|
|
void ThreadSafeTransaction::addWriteConflictRange(const KeyRangeRef& keys) {
|
2017-05-26 04:48:44 +08:00
|
|
|
KeyRange r = keys;
|
|
|
|
|
2021-05-11 06:00:01 +08:00
|
|
|
ISingleThreadTransaction* tr = this->tr;
|
|
|
|
onMainThreadVoid([tr, r]() { tr->addWriteConflictRange(r); }, &tr->deferredError);
|
2017-05-26 04:48:44 +08:00
|
|
|
}
|
|
|
|
|
2021-03-11 02:06:03 +08:00
|
|
|
// Commits the transaction.
// On the main thread, checkDeferredError() is called first and then commit().
ThreadFuture<Void> ThreadSafeTransaction::commit() {
	ISingleThreadTransaction* txn = this->tr;
	return onMainThread([txn]() -> Future<Void> {
		txn->checkDeferredError();
		return txn->commit();
	});
}
|
|
|
|
|
|
|
|
// Returns the committed version by calling the underlying transaction directly on the calling thread
// (no main-thread hop).
Version ThreadSafeTransaction::getCommittedVersion() {
	// This should be thread safe when called legally, but it is fragile
	ISingleThreadTransaction* txn = this->tr;
	return txn->getCommittedVersion();
}
|
|
|
|
|
2019-06-29 01:15:37 +08:00
|
|
|
// Forwards to getApproximateSize() on the underlying transaction, on the main thread.
// No deferred-error check is performed first.
ThreadFuture<int64_t> ThreadSafeTransaction::getApproximateSize() {
	ISingleThreadTransaction* txn = this->tr;
	return onMainThread([txn]() -> Future<int64_t> {
		return txn->getApproximateSize();
	});
}
|
|
|
|
|
|
|
|
// Forwards to getVersionstamp() on the underlying transaction, on the main thread.
// No deferred-error check is performed first.
ThreadFuture<Standalone<StringRef>> ThreadSafeTransaction::getVersionstamp() {
	ISingleThreadTransaction* txn = this->tr;
	return onMainThread([txn]() -> Future<Standalone<StringRef>> {
		return txn->getVersionstamp();
	});
}
|
|
|
|
|
2021-03-11 02:06:03 +08:00
|
|
|
// Sets a transaction option.
// Unknown options are traced ("UnknownTransactionOption") and rejected with invalid_option(); known
// options are forwarded unchanged to the underlying transaction on the main thread.
void ThreadSafeTransaction::setOption(FDBTransactionOptions::Option option, Optional<StringRef> value) {
	const bool isKnownOption =
	    FDBTransactionOptions::optionInfo.find(option) != FDBTransactionOptions::optionInfo.end();
	if (!isKnownOption) {
		TraceEvent("UnknownTransactionOption").detail("Option", option);
		throw invalid_option();
	}

	ISingleThreadTransaction* txn = this->tr;
	// Copy the value into a Standalone so the callback owns its own copy.
	Standalone<Optional<StringRef>> ownedValue = value;

	// ThreadSafeTransaction is not allowed to do anything with options except pass them through to RYW.
	onMainThreadVoid([txn, option, ownedValue]() { txn->setOption(option, ownedValue.contents()); },
	                 &txn->deferredError);
}
|
|
|
|
|
|
|
|
// Reports the transaction's deferred error, if any, and clears it.
// On the main thread: if checkDeferredError() throws an Error, deferredError is reset to a
// default-constructed Error and the caught error becomes the future's result; otherwise the future
// completes with Void.
ThreadFuture<Void> ThreadSafeTransaction::checkDeferredError() {
	ISingleThreadTransaction* txn = this->tr;
	return onMainThread([txn]() -> Future<Void> {
		try {
			txn->checkDeferredError();
		} catch (Error& deferred) {
			// Reset the stored error so it is reported only once.
			txn->deferredError = Error();
			return Future<Void>(deferred);
		}
		return Future<Void>(Void());
	});
}
|
|
|
|
|
2021-03-11 02:06:03 +08:00
|
|
|
// Forwards to onError(e) on the underlying transaction, on the main thread.
// The error is captured by value, so the callback does not depend on the caller's reference.
ThreadFuture<Void> ThreadSafeTransaction::onError(Error const& e) {
	ISingleThreadTransaction* txn = this->tr;
	return onMainThread([txn, e]() {
		return txn->onError(e);
	});
}
|
|
|
|
|
2020-06-10 08:33:41 +08:00
|
|
|
// Move assignment: takes over r's underlying transaction and leaves r empty (tr == nullptr).
//
// The reference this object held before the assignment is released, mirroring the destructor: the
// delref() is scheduled with onMainThreadVoid() instead of being performed on the calling thread.
// Self-assignment is a no-op, so the object never ends up with a null transaction by accident.
void ThreadSafeTransaction::operator=(ThreadSafeTransaction&& r) noexcept {
	if (this == &r) {
		return;
	}

	ISingleThreadTransaction* previous = this->tr;

	tr = r.tr;
	r.tr = nullptr;

	// Without this, the transaction previously owned by *this would never be released.
	if (previous) {
		onMainThreadVoid([previous]() { previous->delref(); }, nullptr);
	}
}
|
|
|
|
|
2020-06-10 08:33:41 +08:00
|
|
|
// Move constructor: takes over r's underlying transaction pointer and leaves r empty (tr == nullptr),
// so r's destructor schedules no delref().
ThreadSafeTransaction::ThreadSafeTransaction(ThreadSafeTransaction&& r) noexcept : tr(r.tr) {
	r.tr = nullptr;
}
|
|
|
|
|
|
|
|
void ThreadSafeTransaction::reset() {
|
2021-05-11 06:00:01 +08:00
|
|
|
ISingleThreadTransaction* tr = this->tr;
|
|
|
|
onMainThreadVoid([tr]() { tr->reset(); }, nullptr);
|
2017-05-26 04:48:44 +08:00
|
|
|
}
|
|
|
|
|
2019-11-16 04:26:51 +08:00
|
|
|
extern const char* getSourceVersion();
|
2017-05-26 04:48:44 +08:00
|
|
|
|
2021-03-11 02:06:03 +08:00
|
|
|
ThreadSafeApi::ThreadSafeApi()
|
|
|
|
: apiVersion(-1), clientVersion(format("%s,%s,%llx", FDB_VT_VERSION, getSourceVersion(), currentProtocolVersion)),
|
|
|
|
transportId(0) {}
|
2017-05-26 04:48:44 +08:00
|
|
|
|
|
|
|
// Records the API version; it is later passed to every ThreadSafeDatabase built by createDatabase().
void ThreadSafeApi::selectApiVersion(int apiVersion) {
	// The parameter shadows the member, hence the explicit this->.
	this->apiVersion = apiVersion;
}
|
|
|
|
|
|
|
|
const char* ThreadSafeApi::getClientVersion() {
|
|
|
|
// There is only one copy of the ThreadSafeAPI, and it never gets deleted. Also, clientVersion is never modified.
|
|
|
|
return clientVersion.c_str();
|
|
|
|
}
|
|
|
|
|
|
|
|
// Sets a network option.
// EXTERNAL_CLIENT_TRANSPORT_ID is handled here: when a value is present it is parsed with std::stoull
// and stored in transportId; when absent nothing happens. Every other option is forwarded to the
// global ::setNetworkOption().
void ThreadSafeApi::setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value) {
	if (option != FDBNetworkOptions::EXTERNAL_CLIENT_TRANSPORT_ID) {
		::setNetworkOption(option, value);
		return;
	}

	if (value.present()) {
		transportId = std::stoull(value.get().toString().c_str());
	}
}
|
|
|
|
|
|
|
|
void ThreadSafeApi::setupNetwork() {
|
|
|
|
::setupNetwork(transportId);
|
|
|
|
}
|
|
|
|
|
|
|
|
void ThreadSafeApi::runNetwork() {
|
2018-05-09 07:33:43 +08:00
|
|
|
Optional<Error> runErr;
|
|
|
|
try {
|
|
|
|
::runNetwork();
|
2021-03-11 02:06:03 +08:00
|
|
|
} catch (Error& e) {
|
2021-10-14 04:36:17 +08:00
|
|
|
TraceEvent(SevError, "RunNetworkError").error(e);
|
2018-05-09 07:33:43 +08:00
|
|
|
runErr = e;
|
2021-10-14 04:36:17 +08:00
|
|
|
} catch (std::exception& e) {
|
|
|
|
runErr = unknown_error();
|
|
|
|
TraceEvent(SevError, "RunNetworkError").error(unknown_error()).detail("RootException", e.what());
|
|
|
|
} catch (...) {
|
|
|
|
runErr = unknown_error();
|
|
|
|
TraceEvent(SevError, "RunNetworkError").error(unknown_error());
|
2018-05-09 07:33:43 +08:00
|
|
|
}
|
|
|
|
|
2021-03-11 02:06:03 +08:00
|
|
|
for (auto& hook : threadCompletionHooks) {
|
2018-05-09 07:33:43 +08:00
|
|
|
try {
|
|
|
|
hook.first(hook.second);
|
2021-03-11 02:06:03 +08:00
|
|
|
} catch (Error& e) {
|
2018-05-09 07:33:43 +08:00
|
|
|
TraceEvent(SevError, "NetworkShutdownHookError").error(e);
|
2021-10-14 04:36:17 +08:00
|
|
|
} catch (std::exception& e) {
|
|
|
|
TraceEvent(SevError, "NetworkShutdownHookError").error(unknown_error()).detail("RootException", e.what());
|
2021-03-11 02:06:03 +08:00
|
|
|
} catch (...) {
|
2018-05-09 07:33:43 +08:00
|
|
|
TraceEvent(SevError, "NetworkShutdownHookError").error(unknown_error());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-03-11 02:06:03 +08:00
|
|
|
if (runErr.present()) {
|
2018-05-09 07:33:43 +08:00
|
|
|
throw runErr.get();
|
|
|
|
}
|
2021-10-14 04:36:17 +08:00
|
|
|
|
|
|
|
TraceEvent("RunNetworkTerminating");
|
2017-05-26 04:48:44 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
// Forwards to the global ::stopNetwork(); the leading :: selects the free function, not this member.
void ThreadSafeApi::stopNetwork() {
	::stopNetwork();
}
|
|
|
|
|
2021-03-11 02:06:03 +08:00
|
|
|
// Creates a ThreadSafeDatabase for the given cluster file path, using the API version stored on this
// object.
Reference<IDatabase> ThreadSafeApi::createDatabase(const char* clusterFilePath) {
	return Reference<IDatabase>(new ThreadSafeDatabase(clusterFilePath, this->apiVersion));
}
|
|
|
|
|
2021-03-11 02:06:03 +08:00
|
|
|
void ThreadSafeApi::addNetworkThreadCompletionHook(void (*hook)(void*), void* hookParameter) {
|
2018-05-09 07:33:43 +08:00
|
|
|
if (!g_network) {
|
|
|
|
throw network_not_setup();
|
|
|
|
}
|
|
|
|
|
2021-03-11 02:06:03 +08:00
|
|
|
MutexHolder holder(lock); // We could use the network thread to protect this action, but then we can't guarantee
|
|
|
|
// upon return that the hook is set.
|
2021-05-11 07:32:02 +08:00
|
|
|
threadCompletionHooks.emplace_back(hook, hookParameter);
|
2018-05-09 07:33:43 +08:00
|
|
|
}
|