foundationdb/fdbserver/worker.actor.cpp

2486 lines
99 KiB
C++
Raw Normal View History

2017-05-26 04:48:44 +08:00
/*
* worker.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
2017-05-26 04:48:44 +08:00
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
2017-05-26 04:48:44 +08:00
* http://www.apache.org/licenses/LICENSE-2.0
*
2017-05-26 04:48:44 +08:00
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <tuple>
#include <boost/lexical_cast.hpp>
2020-02-26 07:00:18 +08:00
#include "fdbrpc/Locality.h"
#include "fdbclient/GlobalConfig.actor.h"
#include "fdbclient/ProcessInterface.h"
2020-02-26 07:00:18 +08:00
#include "fdbclient/StorageServerInterface.h"
2020-05-09 08:14:42 +08:00
#include "fdbserver/Knobs.h"
2017-05-26 04:48:44 +08:00
#include "flow/ActorCollection.h"
2020-10-06 01:07:51 +08:00
#include "flow/ProtocolVersion.h"
2017-05-26 04:48:44 +08:00
#include "flow/SystemMonitor.h"
#include "flow/TDMetric.actor.h"
#include "fdbrpc/simulator.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/MetricLogger.actor.h"
#include "fdbserver/BackupInterface.h"
#include "fdbserver/RoleLineage.actor.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbserver/IKeyValueStore.h"
#include "fdbserver/WaitFailure.h"
#include "fdbserver/TesterInterface.actor.h" // for poisson()
#include "fdbserver/IDiskQueue.h"
2017-05-26 04:48:44 +08:00
#include "fdbclient/DatabaseContext.h"
#include "fdbserver/DataDistributorInterface.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/FDBExecHelper.actor.h"
2017-05-26 04:48:44 +08:00
#include "fdbserver/CoordinationInterface.h"
#include "fdbserver/LocalConfiguration.h"
2017-05-26 04:48:44 +08:00
#include "fdbclient/MonitorLeader.h"
#include "fdbclient/ClientWorkerInterface.h"
#include "flow/Profiler.h"
#include "flow/ThreadHelper.actor.h"
#include "flow/Trace.h"
#include "flow/flow.h"
#include "flow/network.h"
2017-05-26 04:48:44 +08:00
#ifdef __linux__
#include <fcntl.h>
#include <stdio.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
2020-02-02 02:00:06 +08:00
#endif
#if defined(__linux__) || defined(__FreeBSD__)
2017-05-26 04:48:44 +08:00
#ifdef USE_GPERFTOOLS
#include "gperftools/profiler.h"
#include "gperftools/heap-profiler.h"
2017-05-26 04:48:44 +08:00
#endif
#include <unistd.h>
#include <thread>
#include <execinfo.h>
#endif
#include "flow/actorcompiler.h" // This must be the last #include.
2017-05-26 04:48:44 +08:00
#if CENABLED(0, NOT_IN_CLEAN)
extern IKeyValueStore* keyValueStoreCompressTestData(IKeyValueStore* store);
#define KV_STORE(filename, uid) keyValueStoreCompressTestData(keyValueStoreSQLite(filename, uid))
2017-05-26 04:48:44 +08:00
#elif CENABLED(0, NOT_IN_CLEAN)
#define KV_STORE(filename, uid) keyValueStoreSQLite(filename, uid)
2017-05-26 04:48:44 +08:00
#else
#define KV_STORE(filename, uid) keyValueStoreMemory(filename, uid)
2017-05-26 04:48:44 +08:00
#endif
namespace {
RoleLineageCollector roleLineageCollector;
}
ACTOR Future<std::vector<Endpoint>> tryDBInfoBroadcast(RequestStream<UpdateServerDBInfoRequest> stream,
UpdateServerDBInfoRequest req) {
ErrorOr<std::vector<Endpoint>> rep =
wait(stream.getReplyUnlessFailedFor(req, SERVER_KNOBS->DBINFO_FAILED_DELAY, 0));
if (rep.present()) {
2020-04-11 08:02:11 +08:00
return rep.get();
}
2020-04-11 08:02:11 +08:00
req.broadcastInfo.push_back(stream.getEndpoint());
return req.broadcastInfo;
}
ACTOR Future<std::vector<Endpoint>> broadcastDBInfoRequest(UpdateServerDBInfoRequest req,
int sendAmount,
Optional<Endpoint> sender,
bool sendReply) {
state std::vector<Future<std::vector<Endpoint>>> replies;
state ReplyPromise<std::vector<Endpoint>> reply = req.reply;
resetReply(req);
int currentStream = 0;
2020-04-11 04:45:16 +08:00
std::vector<Endpoint> broadcastEndpoints = req.broadcastInfo;
for (int i = 0; i < sendAmount && currentStream < broadcastEndpoints.size(); i++) {
2020-04-07 11:58:43 +08:00
std::vector<Endpoint> endpoints;
2020-04-11 04:45:16 +08:00
RequestStream<UpdateServerDBInfoRequest> cur(broadcastEndpoints[currentStream++]);
while (currentStream < broadcastEndpoints.size() * (i + 1) / sendAmount) {
2020-04-11 04:45:16 +08:00
endpoints.push_back(broadcastEndpoints[currentStream++]);
}
2020-04-07 11:58:43 +08:00
req.broadcastInfo = endpoints;
replies.push_back(tryDBInfoBroadcast(cur, req));
resetReply(req);
}
wait(waitForAll(replies));
std::vector<Endpoint> notUpdated;
if (sender.present()) {
notUpdated.push_back(sender.get());
}
for (auto& it : replies) {
notUpdated.insert(notUpdated.end(), it.get().begin(), it.get().end());
}
if (sendReply) {
reply.send(notUpdated);
}
return notUpdated;
}
2021-07-12 12:11:21 +08:00
ACTOR static Future<Void> extractClientInfo(Reference<AsyncVar<ServerDBInfo> const> db,
Reference<AsyncVar<ClientDBInfo>> info) {
2020-09-11 08:44:15 +08:00
state std::vector<UID> lastCommitProxyUIDs;
state std::vector<CommitProxyInterface> lastCommitProxies;
2020-07-15 15:37:41 +08:00
state std::vector<UID> lastGrvProxyUIDs;
state std::vector<GrvProxyInterface> lastGrvProxies;
2017-05-26 04:48:44 +08:00
loop {
ClientDBInfo ni = db->get().client;
2020-09-11 08:44:15 +08:00
shrinkProxyList(ni, lastCommitProxyUIDs, lastCommitProxies, lastGrvProxyUIDs, lastGrvProxies);
info->set(ni);
wait(db->onChange());
2017-05-26 04:48:44 +08:00
}
}
2021-07-12 12:11:21 +08:00
Database openDBOnServer(Reference<AsyncVar<ServerDBInfo> const> const& db,
TaskPriority taskID,
LockAware lockAware,
EnableLocalityLoadBalance enableLocalityLoadBalance) {
2020-11-07 15:50:55 +08:00
auto info = makeReference<AsyncVar<ClientDBInfo>>();
auto cx = DatabaseContext::create(info,
extractClientInfo(db, info),
enableLocalityLoadBalance ? db->get().myLocality : LocalityData(),
enableLocalityLoadBalance,
taskID,
lockAware);
GlobalConfig::create(cx, db, std::addressof(db->get().client));
2021-06-08 09:01:14 +08:00
GlobalConfig::globalConfig().trigger(samplingFrequency, samplingProfilerUpdateFrequency);
GlobalConfig::globalConfig().trigger(samplingWindow, samplingProfilerUpdateWindow);
return cx;
2017-05-26 04:48:44 +08:00
}
struct ErrorInfo {
Error error;
const Role& role;
2017-05-26 04:48:44 +08:00
UID id;
ErrorInfo(Error e, const Role& role, UID id) : error(e), role(role), id(id) {}
template <class Ar>
void serialize(Ar&) {
ASSERT(false);
}
2017-05-26 04:48:44 +08:00
};
Error checkIOTimeout(Error const& e) {
// Convert all_errors to io_timeout if global timeout bool was set
bool timeoutOccurred = (bool)g_network->global(INetwork::enASIOTimedOut);
// In simulation, have to check global timed out flag for both this process and the machine process on which IO is
// done
if (g_network->isSimulated() && !timeoutOccurred)
timeoutOccurred = g_pSimulator->getCurrentProcess()->machine->machineProcess->global(INetwork::enASIOTimedOut);
if (timeoutOccurred) {
TEST(true); // Timeout occurred
Error timeout = io_timeout();
// Preserve injectedness of error
if (e.isInjectedFault())
timeout = timeout.asInjectedFault();
return timeout;
}
return e;
}
ACTOR Future<Void> forwardError(PromiseStream<ErrorInfo> errors, Role role, UID id, Future<Void> process) {
2017-05-26 04:48:44 +08:00
try {
wait(process);
errors.send(ErrorInfo(success(), role, id));
2017-05-26 04:48:44 +08:00
return Void();
} catch (Error& e) {
errors.send(ErrorInfo(e, role, id));
2017-05-26 04:48:44 +08:00
return Void();
}
}
ACTOR Future<Void> handleIOErrors(Future<Void> actor, IClosable* store, UID id, Future<Void> onClosed = Void()) {
state Future<ErrorOr<Void>> storeError = actor.isReady() ? Never() : errorOr(store->getError());
2017-05-26 04:48:44 +08:00
choose {
when(state ErrorOr<Void> e = wait(errorOr(actor))) {
2018-10-20 09:55:35 +08:00
if (e.isError() && e.getError().code() == error_code_please_reboot) {
// no need to wait.
} else {
wait(onClosed);
}
if (e.isError() && e.getError().code() == error_code_broken_promise && !storeError.isReady()) {
wait(delay(0.00001 + FLOW_KNOBS->MAX_BUGGIFIED_DELAY));
}
if (storeError.isReady())
throw storeError.get().getError();
if (e.isError())
throw e.getError();
else
return e.get();
2017-05-26 04:48:44 +08:00
}
when(ErrorOr<Void> e = wait(storeError)) {
2017-05-26 04:48:44 +08:00
TraceEvent("WorkerTerminatingByIOError", id).error(e.getError(), true);
actor.cancel();
// file_not_found can occur due to attempting to open a partially deleted DiskQueue, which should not be
// reported SevError.
2017-05-26 04:48:44 +08:00
if (e.getError().code() == error_code_file_not_found) {
TEST(true); // Worker terminated with file_not_found error
return Void();
}
throw e.getError();
}
}
}
ACTOR Future<Void> workerHandleErrors(FutureStream<ErrorInfo> errors) {
2017-05-26 04:48:44 +08:00
loop choose {
when(ErrorInfo _err = waitNext(errors)) {
ErrorInfo err = _err;
bool ok = err.error.code() == error_code_success || err.error.code() == error_code_please_reboot ||
err.error.code() == error_code_actor_cancelled ||
err.error.code() == error_code_coordinators_changed || // The worker server was cancelled
err.error.code() == error_code_shutdown_in_progress;
2017-05-26 04:48:44 +08:00
if (!ok) {
err.error = checkIOTimeout(err.error); // Possibly convert error to io_timeout
}
endRole(err.role, err.id, "Error", ok, err.error);
2017-05-26 04:48:44 +08:00
if (err.error.code() == error_code_please_reboot ||
(err.role == Role::SHARED_TRANSACTION_LOG &&
(err.error.code() == error_code_io_error || err.error.code() == error_code_io_timeout)))
throw err.error;
2017-05-26 04:48:44 +08:00
}
}
}
// Improve simulation code coverage by sometimes deferring the destruction of workerInterface (and therefore "endpoint
// not found" responses to clients
2017-05-26 04:48:44 +08:00
// for an extra second, so that clients are more likely to see broken_promise errors
ACTOR template <class T>
Future<Void> zombie(T workerInterface, Future<Void> worker) {
2017-05-26 04:48:44 +08:00
try {
wait(worker);
2017-05-26 04:48:44 +08:00
if (BUGGIFY)
wait(delay(1.0));
2017-05-26 04:48:44 +08:00
return Void();
} catch (Error& e) {
throw;
}
}
ACTOR Future<Void> loadedPonger(FutureStream<LoadedPingRequest> pings) {
state Standalone<StringRef> payloadBack(std::string(20480, '.'));
2017-05-26 04:48:44 +08:00
loop {
LoadedPingRequest pong = waitNext(pings);
2017-05-26 04:48:44 +08:00
LoadedReply rep;
rep.payload = (pong.loadReply ? payloadBack : LiteralStringRef(""));
rep.id = pong.id;
pong.reply.send(rep);
2017-05-26 04:48:44 +08:00
}
}
StringRef fileStoragePrefix = LiteralStringRef("storage-");
2021-03-06 03:28:15 +08:00
StringRef testingStoragePrefix = LiteralStringRef("testingstorage-");
2017-05-26 04:48:44 +08:00
StringRef fileLogDataPrefix = LiteralStringRef("log-");
StringRef fileVersionedLogDataPrefix = LiteralStringRef("log2-");
2017-05-26 04:48:44 +08:00
StringRef fileLogQueuePrefix = LiteralStringRef("logqueue-");
StringRef tlogQueueExtension = LiteralStringRef("fdq");
enum class FilesystemCheck {
FILES_ONLY,
DIRECTORIES_ONLY,
FILES_AND_DIRECTORIES,
};
struct KeyValueStoreSuffix {
KeyValueStoreType type;
std::string suffix;
FilesystemCheck check;
};
KeyValueStoreSuffix bTreeV1Suffix = { KeyValueStoreType::SSD_BTREE_V1, ".fdb", FilesystemCheck::FILES_ONLY };
KeyValueStoreSuffix bTreeV2Suffix = { KeyValueStoreType::SSD_BTREE_V2, ".sqlite", FilesystemCheck::FILES_ONLY };
KeyValueStoreSuffix memorySuffix = { KeyValueStoreType::MEMORY, "-0.fdq", FilesystemCheck::FILES_ONLY };
KeyValueStoreSuffix memoryRTSuffix = { KeyValueStoreType::MEMORY_RADIXTREE, "-0.fdr", FilesystemCheck::FILES_ONLY };
KeyValueStoreSuffix redwoodSuffix = { KeyValueStoreType::SSD_REDWOOD_V1, ".redwood", FilesystemCheck::FILES_ONLY };
KeyValueStoreSuffix rocksdbSuffix = { KeyValueStoreType::SSD_ROCKSDB_V1,
".rocksdb",
FilesystemCheck::DIRECTORIES_ONLY };
2017-05-26 04:48:44 +08:00
std::string validationFilename = "_validate";
std::string filenameFromSample(KeyValueStoreType storeType, std::string folder, std::string sample_filename) {
if (storeType == KeyValueStoreType::SSD_BTREE_V1)
return joinPath(folder, sample_filename);
else if (storeType == KeyValueStoreType::SSD_BTREE_V2)
return joinPath(folder, sample_filename);
else if (storeType == KeyValueStoreType::MEMORY || storeType == KeyValueStoreType::MEMORY_RADIXTREE)
return joinPath(folder, sample_filename.substr(0, sample_filename.size() - 5));
else if (storeType == KeyValueStoreType::SSD_REDWOOD_V1)
return joinPath(folder, sample_filename);
2020-06-16 00:45:36 +08:00
else if (storeType == KeyValueStoreType::SSD_ROCKSDB_V1)
return joinPath(folder, sample_filename);
2017-05-26 04:48:44 +08:00
UNREACHABLE();
}
std::string filenameFromId(KeyValueStoreType storeType, std::string folder, std::string prefix, UID id) {
2021-03-06 03:28:15 +08:00
if (storeType == KeyValueStoreType::SSD_BTREE_V1)
return joinPath(folder, prefix + id.toString() + ".fdb");
2017-05-26 04:48:44 +08:00
else if (storeType == KeyValueStoreType::SSD_BTREE_V2)
return joinPath(folder, prefix + id.toString() + ".sqlite");
else if (storeType == KeyValueStoreType::MEMORY || storeType == KeyValueStoreType::MEMORY_RADIXTREE)
return joinPath(folder, prefix + id.toString() + "-");
else if (storeType == KeyValueStoreType::SSD_REDWOOD_V1)
return joinPath(folder, prefix + id.toString() + ".redwood");
2020-06-16 00:45:36 +08:00
else if (storeType == KeyValueStoreType::SSD_ROCKSDB_V1)
return joinPath(folder, prefix + id.toString() + ".rocksdb");
2017-05-26 04:48:44 +08:00
2021-05-13 02:53:20 +08:00
TraceEvent(SevError, "UnknownStoreType").detail("StoreType", storeType.toString());
2017-05-26 04:48:44 +08:00
UNREACHABLE();
}
struct TLogOptions {
TLogOptions() = default;
TLogOptions(TLogVersion v, TLogSpillType s) : version(v), spillType(s) {}
TLogVersion version = TLogVersion::DEFAULT;
TLogSpillType spillType = TLogSpillType::UNSET;
static ErrorOr<TLogOptions> FromStringRef(StringRef s) {
TLogOptions options;
for (StringRef key = s.eat("_"), value = s.eat("_"); s.size() != 0 || key.size();
key = s.eat("_"), value = s.eat("_")) {
if (key.size() != 0 && value.size() == 0)
return default_error_or();
if (key == LiteralStringRef("V")) {
ErrorOr<TLogVersion> tLogVersion = TLogVersion::FromStringRef(value);
if (tLogVersion.isError())
return tLogVersion.getError();
options.version = tLogVersion.get();
} else if (key == LiteralStringRef("LS")) {
ErrorOr<TLogSpillType> tLogSpillType = TLogSpillType::FromStringRef(value);
if (tLogSpillType.isError())
return tLogSpillType.getError();
options.spillType = tLogSpillType.get();
} else {
return default_error_or();
}
}
return options;
}
bool operator==(const TLogOptions& o) {
return version == o.version && (spillType == o.spillType || version >= TLogVersion::V5);
}
std::string toPrefix() const {
std::string toReturn = "";
switch (version) {
case TLogVersion::UNSET:
ASSERT(false);
case TLogVersion::V2:
return "";
case TLogVersion::V3:
case TLogVersion::V4:
toReturn =
"V_" + boost::lexical_cast<std::string>(version) + "_LS_" + boost::lexical_cast<std::string>(spillType);
break;
case TLogVersion::V5:
2020-09-05 07:57:36 +08:00
case TLogVersion::V6:
toReturn = "V_" + boost::lexical_cast<std::string>(version);
break;
}
ASSERT_WE_THINK(FromStringRef(toReturn).get() == *this);
return toReturn + "-";
}
};
TLogFn tLogFnForOptions(TLogOptions options) {
switch (options.version) {
case TLogVersion::V2:
if (options.spillType == TLogSpillType::REFERENCE)
ASSERT(false);
return oldTLog_6_0::tLog;
case TLogVersion::V3:
case TLogVersion::V4:
if (options.spillType == TLogSpillType::VALUE)
return oldTLog_6_0::tLog;
else
return oldTLog_6_2::tLog;
case TLogVersion::V5:
case TLogVersion::V6:
return tLog;
default:
ASSERT(false);
}
return tLog;
}
2017-05-26 04:48:44 +08:00
struct DiskStore {
enum COMPONENT { TLogData, Storage, UNSET };
2017-05-26 04:48:44 +08:00
UID storeID = UID();
std::string filename = ""; // For KVStoreMemory just the base filename to be passed to IDiskQueue
COMPONENT storedComponent = UNSET;
KeyValueStoreType storeType = KeyValueStoreType::END;
TLogOptions tLogOptions;
2017-05-26 04:48:44 +08:00
};
std::vector<DiskStore> getDiskStores(std::string folder,
std::string suffix,
KeyValueStoreType type,
FilesystemCheck check) {
std::vector<DiskStore> result;
vector<std::string> files;
if (check == FilesystemCheck::FILES_ONLY || check == FilesystemCheck::FILES_AND_DIRECTORIES) {
files = platform::listFiles(folder, suffix);
}
if (check == FilesystemCheck::DIRECTORIES_ONLY || check == FilesystemCheck::FILES_AND_DIRECTORIES) {
for (const auto& directory : platform::listDirectories(folder)) {
if (StringRef(directory).endsWith(suffix)) {
files.push_back(directory);
}
2020-06-19 01:21:14 +08:00
}
}
2017-05-26 04:48:44 +08:00
for (int idx = 0; idx < files.size(); idx++) {
2017-05-26 04:48:44 +08:00
DiskStore store;
store.storeType = type;
StringRef filename = StringRef(files[idx]);
Standalone<StringRef> prefix;
if (filename.startsWith(fileStoragePrefix)) {
2017-05-26 04:48:44 +08:00
store.storedComponent = DiskStore::Storage;
prefix = fileStoragePrefix;
2021-03-06 03:28:15 +08:00
} else if (filename.startsWith(testingStoragePrefix)) {
store.storedComponent = DiskStore::Storage;
prefix = testingStoragePrefix;
} else if (filename.startsWith(fileVersionedLogDataPrefix)) {
store.storedComponent = DiskStore::TLogData;
// Use the option string that's in the file rather than tLogOptions.toPrefix(),
// because they might be different if a new option was introduced in this version.
StringRef optionsString = filename.removePrefix(fileVersionedLogDataPrefix).eat("-");
TraceEvent("DiskStoreVersioned").detail("Filename", filename);
ErrorOr<TLogOptions> tLogOptions = TLogOptions::FromStringRef(optionsString);
if (tLogOptions.isError()) {
TraceEvent(SevWarn, "DiskStoreMalformedFilename").detail("Filename", filename);
continue;
}
TraceEvent("DiskStoreVersionedSuccess").detail("Filename", filename);
store.tLogOptions = tLogOptions.get();
prefix = filename.substr(0, fileVersionedLogDataPrefix.size() + optionsString.size() + 1);
} else if (filename.startsWith(fileLogDataPrefix)) {
TraceEvent("DiskStoreUnversioned").detail("Filename", filename);
2017-05-26 04:48:44 +08:00
store.storedComponent = DiskStore::TLogData;
store.tLogOptions.version = TLogVersion::V2;
store.tLogOptions.spillType = TLogSpillType::VALUE;
2017-05-26 04:48:44 +08:00
prefix = fileLogDataPrefix;
} else {
2017-05-26 04:48:44 +08:00
continue;
}
2017-05-26 04:48:44 +08:00
store.storeID = UID::fromString(files[idx].substr(prefix.size(), 32));
store.filename = filenameFromSample(type, folder, files[idx]);
result.push_back(store);
2017-05-26 04:48:44 +08:00
}
return result;
}
std::vector<DiskStore> getDiskStores(std::string folder) {
auto result = getDiskStores(folder, bTreeV1Suffix.suffix, bTreeV1Suffix.type, bTreeV1Suffix.check);
auto result1 = getDiskStores(folder, bTreeV2Suffix.suffix, bTreeV2Suffix.type, bTreeV2Suffix.check);
result.insert(result.end(), result1.begin(), result1.end());
auto result2 = getDiskStores(folder, memorySuffix.suffix, memorySuffix.type, memorySuffix.check);
result.insert(result.end(), result2.begin(), result2.end());
auto result3 = getDiskStores(folder, redwoodSuffix.suffix, redwoodSuffix.type, redwoodSuffix.check);
result.insert(result.end(), result3.begin(), result3.end());
auto result4 = getDiskStores(folder, memoryRTSuffix.suffix, memoryRTSuffix.type, memoryRTSuffix.check);
result.insert(result.end(), result4.begin(), result4.end());
auto result5 = getDiskStores(folder, rocksdbSuffix.suffix, rocksdbSuffix.type, rocksdbSuffix.check);
result.insert(result.end(), result5.begin(), result5.end());
2017-05-26 04:48:44 +08:00
return result;
}
// Register the worker interf to cluster controller (cc) and
// re-register the worker when key roles interface, e.g., cc, dd, ratekeeper, change.
ACTOR Future<Void> registrationClient(Reference<AsyncVar<Optional<ClusterControllerFullInterface>> const> ccInterface,
WorkerInterface interf,
Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo,
ProcessClass initialClass,
Reference<AsyncVar<Optional<DataDistributorInterface>> const> ddInterf,
Reference<AsyncVar<Optional<RatekeeperInterface>> const> rkInterf,
Reference<AsyncVar<bool> const> degraded,
Reference<ClusterConnectionFile> connFile,
2021-08-06 09:50:11 +08:00
Reference<AsyncVar<std::set<std::string>> const> issues,
LocalConfiguration* localConfig) {
2017-05-26 04:48:44 +08:00
// Keeps the cluster controller (as it may be re-elected) informed that this worker exists
// The cluster controller uses waitFailureClient to find out if we die, and returns from registrationReply
// (requiring us to re-register) The registration request piggybacks optional distributor interface if it exists.
2017-05-26 04:48:44 +08:00
state Generation requestGeneration = 0;
state ProcessClass processClass = initialClass;
state Reference<AsyncVar<Optional<std::pair<uint16_t, StorageServerInterface>>>> scInterf(
new AsyncVar<Optional<std::pair<uint16_t, StorageServerInterface>>>());
2019-11-13 05:01:29 +08:00
state Future<Void> cacheProcessFuture;
state Future<Void> cacheErrorsFuture;
state Optional<double> incorrectTime;
2017-05-26 04:48:44 +08:00
loop {
RegisterWorkerRequest request(interf,
initialClass,
processClass,
asyncPriorityInfo->get(),
requestGeneration++,
ddInterf->get(),
rkInterf->get(),
2021-08-06 09:50:11 +08:00
degraded->get(),
localConfig->lastSeenVersion(),
localConfig->configClassSet());
for (auto const& i : issues->get()) {
request.issues.push_back_deep(request.issues.arena(), i);
}
ClusterConnectionString fileConnectionString;
if (connFile && !connFile->fileContentsUpToDate(fileConnectionString)) {
request.issues.push_back_deep(request.issues.arena(), LiteralStringRef("incorrect_cluster_file_contents"));
std::string connectionString = connFile->getConnectionString().toString();
if (!incorrectTime.present()) {
incorrectTime = now();
}
if (connFile->canGetFilename()) {
// Don't log a SevWarnAlways initially to account for transient issues (e.g. someone else changing the
// file right before us)
TraceEvent(now() - incorrectTime.get() > 300 ? SevWarnAlways : SevWarn, "IncorrectClusterFileContents")
.detail("Filename", connFile->getFilename())
.detail("ConnectionStringFromFile", fileConnectionString.toString())
.detail("CurrentConnectionString", connectionString);
}
} else {
incorrectTime = Optional<double>();
}
auto peers = FlowTransport::transport().getIncompatiblePeers();
for (auto it = peers->begin(); it != peers->end();) {
if (now() - it->second.second > FLOW_KNOBS->INCOMPATIBLE_PEER_DELAY_BEFORE_LOGGING) {
2020-04-07 11:58:43 +08:00
request.incompatiblePeers.push_back(it->first);
it = peers->erase(it);
} else {
it++;
}
}
state bool ccInterfacePresent = ccInterface->get().present();
state Future<RegisterWorkerReply> registrationReply =
ccInterfacePresent ? brokenPromiseToNever(ccInterface->get().get().registerWorker.getReply(request))
: Never();
state double startTime = now();
loop choose {
when(RegisterWorkerReply reply = wait(registrationReply)) {
processClass = reply.processClass;
asyncPriorityInfo->set(reply.priorityInfo);
TraceEvent("WorkerRegisterReply")
.detail("CCID", ccInterface->get().get().id())
.detail("ProcessClass", reply.processClass.toString());
break;
}
when(wait(delay(SERVER_KNOBS->UNKNOWN_CC_TIMEOUT))) {
if (!ccInterfacePresent) {
TraceEvent(SevWarn, "WorkerRegisterTimeout").detail("WaitTime", now() - startTime);
}
}
when(wait(ccInterface->onChange())) { break; }
when(wait(ddInterf->onChange())) { break; }
when(wait(rkInterf->onChange())) { break; }
when(wait(degraded->onChange())) { break; }
when(wait(FlowTransport::transport().onIncompatibleChanged())) { break; }
when(wait(issues->onChange())) { break; }
}
2017-05-26 04:48:44 +08:00
}
}
// Returns true if `address` is used in the db (indicated by `dbInfo`) transaction system and in the db's primary DC.
bool addressInDbAndPrimaryDc(const NetworkAddress& address, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
const auto& dbi = dbInfo->get();
2021-06-18 01:13:08 +08:00
if (dbi.master.addresses().contains(address)) {
return true;
}
if (dbi.distributor.present() && dbi.distributor.get().address() == address) {
return true;
}
if (dbi.ratekeeper.present() && dbi.ratekeeper.get().address() == address) {
return true;
}
for (const auto& resolver : dbi.resolvers) {
if (resolver.address() == address) {
return true;
}
}
for (const auto& grvProxy : dbi.client.grvProxies) {
if (grvProxy.addresses().contains(address)) {
return true;
}
}
for (const auto& commitProxy : dbi.client.commitProxies) {
if (commitProxy.addresses().contains(address)) {
return true;
}
}
auto localityIsInPrimaryDc = [&dbInfo](const LocalityData& locality) {
return locality.dcId() == dbInfo->get().master.locality.dcId();
};
for (const auto& logSet : dbi.logSystemConfig.tLogs) {
for (const auto& tlog : logSet.tLogs) {
if (!tlog.present()) {
continue;
}
if (!localityIsInPrimaryDc(tlog.interf().filteredLocality)) {
continue;
}
2021-06-18 01:13:08 +08:00
if (tlog.interf().addresses().contains(address)) {
return true;
}
}
}
return false;
}
bool addressesInDbAndPrimaryDc(const NetworkAddressList& addresses, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
return addressInDbAndPrimaryDc(addresses.address, dbInfo) ||
(addresses.secondaryAddress.present() && addressInDbAndPrimaryDc(addresses.secondaryAddress.get(), dbInfo));
}
namespace {
TEST_CASE("/fdbserver/worker/addressInDbAndPrimaryDc") {
// Setup a ServerDBInfo for test.
ServerDBInfo testDbInfo;
LocalityData testLocal;
testLocal.set(LiteralStringRef("dcid"), StringRef(std::to_string(1)));
testDbInfo.master.locality = testLocal;
// Manually set up a master address.
NetworkAddress testAddress(IPAddress(0x13131313), 1);
testDbInfo.master.changeCoordinators =
RequestStream<struct ChangeCoordinatorsRequest>(Endpoint({ testAddress }, UID(1, 2)));
// First, create an empty TLogInterface, and check that it shouldn't be considered as in primary DC.
testDbInfo.logSystemConfig.tLogs.push_back(TLogSet());
testDbInfo.logSystemConfig.tLogs.back().tLogs.push_back(OptionalInterface<TLogInterface>());
ASSERT(!addressInDbAndPrimaryDc(g_network->getLocalAddress(), makeReference<AsyncVar<ServerDBInfo>>(testDbInfo)));
// Create a remote TLog. Although the remote TLog also uses the local address, it shouldn't be considered as
// in primary DC given the remote locality.
LocalityData fakeRemote;
fakeRemote.set(LiteralStringRef("dcid"), StringRef(std::to_string(2)));
TLogInterface remoteTlog(fakeRemote);
remoteTlog.initEndpoints();
testDbInfo.logSystemConfig.tLogs.back().tLogs.push_back(OptionalInterface(remoteTlog));
ASSERT(!addressInDbAndPrimaryDc(g_network->getLocalAddress(), makeReference<AsyncVar<ServerDBInfo>>(testDbInfo)));
// Next, create a local TLog. Now, the local address should be considered as in local DC.
TLogInterface localTlog(testLocal);
localTlog.initEndpoints();
testDbInfo.logSystemConfig.tLogs.back().tLogs.push_back(OptionalInterface(localTlog));
ASSERT(addressInDbAndPrimaryDc(g_network->getLocalAddress(), makeReference<AsyncVar<ServerDBInfo>>(testDbInfo)));
// Use the master's address to test, which should be considered as in local DC.
testDbInfo.logSystemConfig.tLogs.clear();
ASSERT(addressInDbAndPrimaryDc(testAddress, makeReference<AsyncVar<ServerDBInfo>>(testDbInfo)));
// Last, tests that proxies included in the ClientDbInfo are considered as local.
NetworkAddress grvProxyAddress(IPAddress(0x26262626), 1);
GrvProxyInterface grvProxyInterf;
grvProxyInterf.getConsistentReadVersion = RequestStream<struct GetReadVersionRequest>(Endpoint({ grvProxyAddress }, UID(1, 2)));
testDbInfo.client.grvProxies.push_back(grvProxyInterf);
ASSERT(addressInDbAndPrimaryDc(grvProxyAddress, makeReference<AsyncVar<ServerDBInfo>>(testDbInfo)));
NetworkAddress commitProxyAddress(IPAddress(0x37373737), 1);
CommitProxyInterface commitProxyInterf;
commitProxyInterf.commit = RequestStream<struct CommitTransactionRequest>(Endpoint({ commitProxyAddress }, UID(1, 2)));
testDbInfo.client.commitProxies.push_back(commitProxyInterf);
ASSERT(addressInDbAndPrimaryDc(commitProxyAddress, makeReference<AsyncVar<ServerDBInfo>>(testDbInfo)));
return Void();
}
} // namespace
// The actor that actively monitors the health of local and peer servers, and reports anomaly to the cluster controller.
ACTOR Future<Void> healthMonitor(Reference<AsyncVar<Optional<ClusterControllerFullInterface>> const> ccInterface,
WorkerInterface interf,
LocalityData locality,
Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
loop {
Future<Void> nextHealthCheckDelay = Never();
if (dbInfo->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS &&
2021-06-18 01:13:08 +08:00
addressesInDbAndPrimaryDc(interf.addresses(), dbInfo) && ccInterface->get().present()) {
nextHealthCheckDelay = delay(SERVER_KNOBS->WORKER_HEALTH_MONITOR_INTERVAL);
2021-06-18 01:13:08 +08:00
const auto& allPeers = FlowTransport::transport().getAllPeers();
UpdateWorkerHealthRequest req;
2021-06-18 01:13:08 +08:00
for (const auto& [address, peer] : allPeers) {
if (peer->pingLatencies.getPopulationSize() < SERVER_KNOBS->PEER_LATENCY_CHECK_MIN_POPULATION) {
// Ignore peers that don't have enough samples.
// TODO(zhewu): Currently, FlowTransport latency monitor clears ping latency samples on a regular
// basis, which may affect the measurement count. Currently,
// WORKER_HEALTH_MONITOR_INTERVAL is much smaller than the ping clearance interval, so
// it may be ok. If this ends to be a problem, we need to consider keep track of last
// ping latencies logged.
continue;
}
if (!addressInDbAndPrimaryDc(address, dbInfo)) {
// Ignore the servers that are not in the database's transaction system and not in the primary DC.
// Note that currently we are not monitor storage servers, since lagging in storage servers today
// already can trigger server exclusion by data distributor.
continue;
}
if (peer->pingLatencies.percentile(SERVER_KNOBS->PEER_LATENCY_DEGRADATION_PERCENTILE) >
SERVER_KNOBS->PEER_LATENCY_DEGRADATION_THRESHOLD ||
peer->timeoutCount / (double)(peer->pingLatencies.getPopulationSize()) >
SERVER_KNOBS->PEER_TIMEOUT_PERCENTAGE_DEGRADATION_THRESHOLD) {
// This is a degraded peer.
TraceEvent("HealthMonitorDetectDegradedPeer")
2021-06-18 01:13:08 +08:00
.suppressFor(30)
.detail("Peer", address)
.detail("Elapsed", now() - peer->lastLoggedTime)
.detail("MinLatency", peer->pingLatencies.min())
.detail("MaxLatency", peer->pingLatencies.max())
.detail("MeanLatency", peer->pingLatencies.mean())
.detail("MedianLatency", peer->pingLatencies.median())
.detail("CheckedPercentile", SERVER_KNOBS->PEER_LATENCY_DEGRADATION_PERCENTILE)
.detail("CheckedPercentileLatency",
peer->pingLatencies.percentile(SERVER_KNOBS->PEER_LATENCY_DEGRADATION_PERCENTILE))
.detail("Count", peer->pingLatencies.getPopulationSize())
.detail("TimeoutCount", peer->timeoutCount);
req.degradedPeers.push_back(address);
}
}
if (!req.degradedPeers.empty()) {
req.address = FlowTransport::transport().getLocalAddress();
ccInterface->get().get().updateWorkerHealth.send(req);
}
}
choose {
when(wait(nextHealthCheckDelay)) {}
when(wait(ccInterface->onChange())) {}
when(wait(dbInfo->onChange())) {}
}
}
}
2020-02-02 02:00:06 +08:00
#if (defined(__linux__) || defined(__FreeBSD__)) && defined(USE_GPERFTOOLS)
// A set of threads that should be profiled
2017-05-26 04:48:44 +08:00
std::set<std::thread::id> profiledThreads;
// Returns whether or not a given thread should be profiled
int filter_in_thread(void* arg) {
2017-05-26 04:48:44 +08:00
return profiledThreads.count(std::this_thread::get_id()) > 0 ? 1 : 0;
}
#endif
// Enables the calling thread to be profiled
2017-05-26 04:48:44 +08:00
void registerThreadForProfiling() {
2020-02-02 02:00:06 +08:00
#if (defined(__linux__) || defined(__FreeBSD__)) && defined(USE_GPERFTOOLS)
// Not sure if this is actually needed, but a call to backtrace was advised here:
// http://groups.google.com/group/google-perftools/browse_thread/thread/0dfd74532e038eb8/2686d9f24ac4365f?pli=1
2017-05-26 04:48:44 +08:00
profiledThreads.insert(std::this_thread::get_id());
const int num_levels = 100;
void* pc[num_levels];
2017-05-26 04:48:44 +08:00
backtrace(pc, num_levels);
#endif
}
// Starts or stops the CPU profiler
2017-05-26 04:48:44 +08:00
void updateCpuProfiler(ProfilerRequest req) {
switch (req.type) {
case ProfilerRequest::Type::GPROF:
2020-02-02 02:00:06 +08:00
#if (defined(__linux__) || defined(__FreeBSD__)) && defined(USE_GPERFTOOLS) && !defined(VALGRIND)
switch (req.action) {
case ProfilerRequest::Action::ENABLE: {
const char* path = (const char*)req.outputFile.begin();
ProfilerOptions* options = new ProfilerOptions();
options->filter_in_thread = &filter_in_thread;
2020-08-19 05:18:50 +08:00
options->filter_in_thread_arg = nullptr;
ProfilerStartWithOptions(path, options);
break;
}
case ProfilerRequest::Action::DISABLE:
ProfilerStop();
break;
case ProfilerRequest::Action::RUN:
ASSERT(false); // User should have called runProfiler.
break;
}
2017-05-26 04:48:44 +08:00
#endif
break;
case ProfilerRequest::Type::FLOW:
switch (req.action) {
case ProfilerRequest::Action::ENABLE:
startProfiling(g_network, {}, req.outputFile);
break;
case ProfilerRequest::Action::DISABLE:
stopProfiling();
break;
case ProfilerRequest::Action::RUN:
ASSERT(false); // User should have called runProfiler.
break;
}
break;
default:
ASSERT(false);
break;
2017-05-26 04:48:44 +08:00
}
}
ACTOR Future<Void> runCpuProfiler(ProfilerRequest req) {
if (req.action == ProfilerRequest::Action::RUN) {
req.action = ProfilerRequest::Action::ENABLE;
updateCpuProfiler(req);
wait(delay(req.duration));
req.action = ProfilerRequest::Action::DISABLE;
updateCpuProfiler(req);
return Void();
} else {
updateCpuProfiler(req);
return Void();
2017-05-26 04:48:44 +08:00
}
}
void runHeapProfiler(const char* msg) {
#if defined(__linux__) && defined(USE_GPERFTOOLS) && !defined(VALGRIND)
if (IsHeapProfilerRunning()) {
HeapProfilerDump(msg);
} else {
TraceEvent("ProfilerError").detail("Message", "HeapProfiler not running");
}
#else
TraceEvent("ProfilerError").detail("Message", "HeapProfiler Unsupported");
#endif
}
ACTOR Future<Void> runProfiler(ProfilerRequest req) {
if (req.type == ProfilerRequest::Type::GPROF_HEAP) {
runHeapProfiler("User triggered heap dump");
} else {
wait(runCpuProfiler(req));
}
return Void();
}
bool checkHighMemory(int64_t threshold, bool* error) {
#if defined(__linux__) && defined(USE_GPERFTOOLS) && !defined(VALGRIND)
*error = false;
uint64_t page_size = sysconf(_SC_PAGESIZE);
int fd = open("/proc/self/statm", O_RDONLY | O_CLOEXEC);
if (fd < 0) {
TraceEvent("OpenStatmFileFailure").log();
*error = true;
return false;
}
const int buf_sz = 256;
char stat_buf[buf_sz];
ssize_t stat_nread = read(fd, stat_buf, buf_sz);
if (stat_nread < 0) {
TraceEvent("ReadStatmFileFailure").log();
*error = true;
return false;
}
uint64_t vmsize, rss;
sscanf(stat_buf, "%lu %lu", &vmsize, &rss);
rss *= page_size;
if (rss >= threshold) {
return true;
}
#else
TraceEvent("CheckHighMemoryUnsupported").log();
*error = true;
#endif
return false;
}
// Runs heap profiler when RSS memory usage is high.
ACTOR Future<Void> monitorHighMemory(int64_t threshold) {
if (threshold <= 0)
return Void();
loop {
bool err = false;
bool highmem = checkHighMemory(threshold, &err);
if (err)
break;
if (highmem)
runHeapProfiler("Highmem heap dump");
wait(delay(SERVER_KNOBS->HEAP_PROFILER_INTERVAL));
}
return Void();
}
struct TrackRunningStorage {
UID self;
KeyValueStoreType storeType;
std::set<std::pair<UID, KeyValueStoreType>>* runningStorages;
TrackRunningStorage(UID self,
KeyValueStoreType storeType,
std::set<std::pair<UID, KeyValueStoreType>>* runningStorages)
: self(self), storeType(storeType), runningStorages(runningStorages) {
runningStorages->emplace(self, storeType);
}
~TrackRunningStorage() { runningStorages->erase(std::make_pair(self, storeType)); };
};
ACTOR Future<Void> storageServerRollbackRebooter(std::set<std::pair<UID, KeyValueStoreType>>* runningStorages,
Future<Void> prevStorageServer,
KeyValueStoreType storeType,
std::string filename,
UID id,
LocalityData locality,
2021-03-06 03:28:15 +08:00
bool isTss,
Reference<AsyncVar<ServerDBInfo>> db,
std::string folder,
ActorCollection* filesClosed,
int64_t memoryLimit,
IKeyValueStore* store) {
state TrackRunningStorage _(id, storeType, runningStorages);
2017-05-26 04:48:44 +08:00
loop {
ErrorOr<Void> e = wait(errorOr(prevStorageServer));
if (!e.isError())
return Void();
else if (e.getError().code() != error_code_please_reboot)
throw e.getError();
2017-05-26 04:48:44 +08:00
TraceEvent("StorageServerRequestedReboot", id).log();
2017-05-26 04:48:44 +08:00
StorageServerInterface recruited;
recruited.uniqueID = id;
recruited.locality = locality;
2021-05-13 02:53:20 +08:00
recruited.tssPairID =
isTss ? Optional<UID>(UID()) : Optional<UID>(); // set this here since we use its presence to determine
// whether this server is a tss or not
recruited.initEndpoints();
DUMPTOKEN(recruited.getValue);
DUMPTOKEN(recruited.getKey);
DUMPTOKEN(recruited.getKeyValues);
DUMPTOKEN(recruited.getShardState);
DUMPTOKEN(recruited.waitMetrics);
DUMPTOKEN(recruited.splitMetrics);
DUMPTOKEN(recruited.getReadHotRanges);
DUMPTOKEN(recruited.getRangeSplitPoints);
2019-07-26 07:27:32 +08:00
DUMPTOKEN(recruited.getStorageMetrics);
DUMPTOKEN(recruited.waitFailure);
DUMPTOKEN(recruited.getQueuingMetrics);
DUMPTOKEN(recruited.getKeyValueStoreType);
DUMPTOKEN(recruited.watchValue);
DUMPTOKEN(recruited.getKeyValuesStream);
prevStorageServer =
storageServer(store, recruited, db, folder, Promise<Void>(), Reference<ClusterConnectionFile>(nullptr));
2018-10-20 09:55:35 +08:00
prevStorageServer = handleIOErrors(prevStorageServer, store, id, store->onClosed());
2017-05-26 04:48:44 +08:00
}
}
ACTOR Future<Void> storageCacheRollbackRebooter(Future<Void> prevStorageCache,
UID id,
LocalityData locality,
Reference<AsyncVar<ServerDBInfo>> db) {
2020-03-03 09:11:23 +08:00
loop {
ErrorOr<Void> e = wait(errorOr(prevStorageCache));
2020-03-03 09:11:23 +08:00
if (!e.isError()) {
TraceEvent("StorageCacheRequestedReboot1", id).log();
2020-03-03 09:11:23 +08:00
return Void();
} else if (e.getError().code() != error_code_please_reboot &&
e.getError().code() != error_code_worker_removed) {
TraceEvent("StorageCacheRequestedReboot2", id).detail("Code", e.getError().code());
2020-03-03 09:11:23 +08:00
throw e.getError();
}
TraceEvent("StorageCacheRequestedReboot", id).log();
2020-03-03 09:11:23 +08:00
StorageServerInterface recruited;
recruited.uniqueID = deterministicRandom()->randomUniqueID(); // id;
2020-03-03 09:11:23 +08:00
recruited.locality = locality;
recruited.initEndpoints();
DUMPTOKEN(recruited.getValue);
DUMPTOKEN(recruited.getKey);
DUMPTOKEN(recruited.getKeyValues);
DUMPTOKEN(recruited.getShardState);
DUMPTOKEN(recruited.waitMetrics);
DUMPTOKEN(recruited.splitMetrics);
DUMPTOKEN(recruited.getStorageMetrics);
DUMPTOKEN(recruited.waitFailure);
DUMPTOKEN(recruited.getQueuingMetrics);
DUMPTOKEN(recruited.getKeyValueStoreType);
DUMPTOKEN(recruited.watchValue);
prevStorageCache = storageCacheServer(recruited, 0, db);
}
}
2017-05-26 04:48:44 +08:00
// FIXME: This will not work correctly in simulation as all workers would share the same roles map
std::set<std::pair<std::string, std::string>> g_roles;
Standalone<StringRef> roleString(std::set<std::pair<std::string, std::string>> roles, bool with_ids) {
std::string result;
for (auto& r : roles) {
if (!result.empty())
2017-05-26 04:48:44 +08:00
result.append(",");
result.append(r.first);
if (with_ids) {
2017-05-26 04:48:44 +08:00
result.append(":");
result.append(r.second);
}
}
return StringRef(result);
}
void startRole(const Role& role,
UID roleId,
UID workerId,
const std::map<std::string, std::string>& details,
const std::string& origination) {
if (role.includeInTraceRoles) {
addTraceRole(role.abbreviation);
}
2017-05-26 04:48:44 +08:00
TraceEvent ev("Role", roleId);
ev.detail("As", role.roleName)
.detail("Transition", "Begin")
.detail("Origination", origination)
.detail("OnWorker", workerId);
for (auto it = details.begin(); it != details.end(); it++)
2017-05-26 04:48:44 +08:00
ev.detail(it->first.c_str(), it->second);
ev.trackLatest(roleId.shortString() + ".Role");
2017-05-26 04:48:44 +08:00
// Update roles map, log Roles metrics
g_roles.insert({ role.roleName, roleId.shortString() });
2017-05-26 04:48:44 +08:00
StringMetricHandle(LiteralStringRef("Roles")) = roleString(g_roles, false);
StringMetricHandle(LiteralStringRef("RolesWithIDs")) = roleString(g_roles, true);
if (g_network->isSimulated())
g_simulator.addRole(g_network->getLocalAddress(), role.roleName);
2017-05-26 04:48:44 +08:00
}
void endRole(const Role& role, UID id, std::string reason, bool ok, Error e) {
2017-05-26 04:48:44 +08:00
{
TraceEvent ev("Role", id);
if (e.code() != invalid_error_code)
2017-05-26 04:48:44 +08:00
ev.error(e, true);
ev.detail("Transition", "End").detail("As", role.roleName).detail("Reason", reason);
2017-05-26 04:48:44 +08:00
ev.trackLatest(id.shortString() + ".Role");
2017-05-26 04:48:44 +08:00
}
if (!ok) {
std::string type = role.roleName + "Failed";
2017-05-26 04:48:44 +08:00
TraceEvent err(SevError, type.c_str(), id);
if (e.code() != invalid_error_code) {
2017-05-26 04:48:44 +08:00
err.error(e, true);
}
err.detail("Reason", reason);
2017-05-26 04:48:44 +08:00
}
latestEventCache.clear(id.shortString());
2017-05-26 04:48:44 +08:00
// Update roles map, log Roles metrics
g_roles.erase({ role.roleName, id.shortString() });
2017-05-26 04:48:44 +08:00
StringMetricHandle(LiteralStringRef("Roles")) = roleString(g_roles, false);
StringMetricHandle(LiteralStringRef("RolesWithIDs")) = roleString(g_roles, true);
if (g_network->isSimulated())
g_simulator.removeRole(g_network->getLocalAddress(), role.roleName);
if (role.includeInTraceRoles) {
removeTraceRole(role.abbreviation);
}
2017-05-26 04:48:44 +08:00
}
ACTOR Future<Void> traceRole(Role role, UID roleId) {
loop {
2020-05-09 08:14:42 +08:00
wait(delay(SERVER_KNOBS->WORKER_LOGGING_INTERVAL));
TraceEvent("Role", roleId).detail("Transition", "Refresh").detail("As", role.roleName);
}
}
2021-03-16 08:34:56 +08:00
ACTOR Future<Void> workerSnapCreate(WorkerSnapRequest snapReq, Standalone<StringRef> snapFolder) {
state ExecCmdValueString snapArg(snapReq.snapPayload);
try {
int err = wait(execHelper(&snapArg, snapReq.snapUID, snapFolder.toString(), snapReq.role.toString()));
std::string uidStr = snapReq.snapUID.toString();
TraceEvent("ExecTraceWorker")
.detail("Uid", uidStr)
.detail("Status", err)
.detail("Role", snapReq.role)
.detail("Value", snapFolder)
.detail("ExecPayload", snapReq.snapPayload);
2019-07-13 07:26:28 +08:00
if (err != 0) {
throw operation_failed();
}
if (snapReq.role.toString() == "storage") {
printStorageVersionInfo();
}
snapReq.reply.send(Void());
} catch (Error& e) {
2019-07-04 08:15:23 +08:00
TraceEvent("ExecHelperError").error(e, true /*includeCancelled*/);
if (e.code() != error_code_operation_cancelled) {
2019-07-04 08:15:23 +08:00
snapReq.reply.sendError(e);
} else {
throw e;
2019-07-04 08:15:23 +08:00
}
}
return Void();
}
// TODO: `issues` is right now only updated by `monitorTraceLogIssues` and thus is being `set` on every update.
// It could be changed to `insert` and `trigger` later if we want to use it as a generic way for the caller of this
// function to report issues to cluster controller.
ACTOR Future<Void> monitorTraceLogIssues(Reference<AsyncVar<std::set<std::string>>> issues) {
state bool pingTimeout = false;
loop {
wait(delay(SERVER_KNOBS->TRACE_LOG_FLUSH_FAILURE_CHECK_INTERVAL_SECONDS));
Future<Void> pingAck = pingTraceLogWriterThread();
try {
wait(timeoutError(pingAck, SERVER_KNOBS->TRACE_LOG_PING_TIMEOUT_SECONDS));
} catch (Error& e) {
if (e.code() == error_code_timed_out) {
pingTimeout = true;
} else {
throw;
}
}
std::set<std::string> _issues;
retrieveTraceLogIssues(_issues);
if (pingTimeout) {
// Ping trace log writer thread timeout.
_issues.insert("trace_log_writer_thread_unresponsive");
pingTimeout = false;
2017-05-26 04:48:44 +08:00
}
issues->set(_issues);
2017-05-26 04:48:44 +08:00
}
}
class SharedLogsKey {
TLogVersion logVersion;
TLogSpillType spillType;
KeyValueStoreType storeType;
public:
SharedLogsKey(const TLogOptions& options, KeyValueStoreType kvst)
: logVersion(options.version), spillType(options.spillType), storeType(kvst) {
if (logVersion >= TLogVersion::V5)
spillType = TLogSpillType::UNSET;
}
bool operator<(const SharedLogsKey& other) const {
return std::tie(logVersion, spillType, storeType) <
std::tie(other.logVersion, other.spillType, other.storeType);
}
};
struct SharedLogsValue {
Future<Void> actor = Void();
UID uid = UID();
PromiseStream<InitializeTLogRequest> requests;
SharedLogsValue() = default;
SharedLogsValue(Future<Void> actor, UID uid, PromiseStream<InitializeTLogRequest> requests)
: actor(actor), uid(uid), requests(requests) {}
};
ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
Reference<AsyncVar<Optional<ClusterControllerFullInterface>> const> ccInterface,
LocalityData locality,
Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo,
ProcessClass initialClass,
std::string folder,
int64_t memoryLimit,
std::string metricsConnFile,
std::string metricsPrefix,
Promise<Void> recoveredDiskFiles,
int64_t memoryProfileThreshold,
std::string _coordFolder,
std::string whitelistBinPaths,
2021-08-06 09:50:11 +08:00
Reference<AsyncVar<ServerDBInfo>> dbInfo,
UseConfigDB useConfigDB,
LocalConfiguration* localConfig) {
state PromiseStream<ErrorInfo> errors;
state Reference<AsyncVar<Optional<DataDistributorInterface>>> ddInterf(
new AsyncVar<Optional<DataDistributorInterface>>());
state Reference<AsyncVar<Optional<RatekeeperInterface>>> rkInterf(new AsyncVar<Optional<RatekeeperInterface>>());
state Future<Void> handleErrors = workerHandleErrors(errors.getFuture()); // Needs to be stopped last
2017-05-26 04:48:44 +08:00
state ActorCollection errorForwarders(false);
state Future<Void> loggingTrigger = Void();
state double loggingDelay = SERVER_KNOBS->WORKER_LOGGING_INTERVAL;
state ActorCollection filesClosed(true);
state Promise<Void> stopping;
state WorkerCache<InitializeStorageReply> storageCache;
2017-05-26 04:48:44 +08:00
state Future<Void> metricsLogger;
state Reference<AsyncVar<bool>> degraded = FlowTransport::transport().getDegraded();
// tLogFnForOptions() can return a function that doesn't correspond with the FDB version that the
// TLogVersion represents. This can be done if the newer TLog doesn't support a requested option.
// As (store type, spill type) can map to the same TLogFn across multiple TLogVersions, we need to
// decide if we should collapse them into the same SharedTLog instance as well. The answer
// here is no, so that when running with log_version==3, all files should say V=3.
state std::map<SharedLogsKey, SharedLogsValue> sharedLogs;
state Reference<AsyncVar<UID>> activeSharedTLog(new AsyncVar<UID>());
state WorkerCache<InitializeBackupReply> backupWorkerCache;
2019-04-23 21:55:55 +08:00
state std::string coordFolder = abspath(_coordFolder);
2017-05-26 04:48:44 +08:00
state WorkerInterface interf(locality);
state std::set<std::pair<UID, KeyValueStoreType>> runningStorages;
interf.initEndpoints();
2017-05-26 04:48:44 +08:00
state Reference<AsyncVar<std::set<std::string>>> issues(new AsyncVar<std::set<std::string>>());
folder = abspath(folder);
if (metricsPrefix.size() > 0) {
if (metricsConnFile.size() > 0) {
2017-05-26 04:48:44 +08:00
try {
state Database db =
Database::createDatabase(metricsConnFile, Database::API_VERSION_LATEST, IsInternal::True, locality);
metricsLogger = runMetrics(db, KeyRef(metricsPrefix));
} catch (Error& e) {
2017-05-26 04:48:44 +08:00
TraceEvent(SevWarnAlways, "TDMetricsBadClusterFile").error(e).detail("ConnFile", metricsConnFile);
}
} else {
auto lockAware = metricsPrefix.size() && metricsPrefix[0] == '\xff' ? LockAware::True : LockAware::False;
metricsLogger =
runMetrics(openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, lockAware), KeyRef(metricsPrefix));
2017-05-26 04:48:44 +08:00
}
GlobalConfig::globalConfig().trigger(samplingFrequency, samplingProfilerUpdateFrequency);
2017-05-26 04:48:44 +08:00
}
errorForwarders.add(resetAfter(degraded,
SERVER_KNOBS->DEGRADED_RESET_INTERVAL,
false,
SERVER_KNOBS->DEGRADED_WARNING_LIMIT,
SERVER_KNOBS->DEGRADED_WARNING_RESET_DELAY,
"DegradedReset"));
errorForwarders.add(loadedPonger(interf.debugPing.getFuture()));
errorForwarders.add(waitFailureServer(interf.waitFailure.getFuture()));
errorForwarders.add(monitorTraceLogIssues(issues));
errorForwarders.add(testerServerCore(interf.testerInterface, connFile, dbInfo, locality));
errorForwarders.add(monitorHighMemory(memoryProfileThreshold));
2017-05-26 04:48:44 +08:00
filesClosed.add(stopping.getFuture());
initializeSystemMonitorMachineState(SystemMonitorMachineState(
folder, locality.dcId(), locality.zoneId(), locality.machineId(), g_network->getLocalAddress().ip));
2017-05-26 04:48:44 +08:00
{
2020-12-03 11:52:57 +08:00
auto recruited = interf;
2017-05-26 04:48:44 +08:00
DUMPTOKEN(recruited.clientInterface.reboot);
DUMPTOKEN(recruited.clientInterface.profiler);
2017-05-26 04:48:44 +08:00
DUMPTOKEN(recruited.tLog);
DUMPTOKEN(recruited.master);
2020-09-11 08:44:15 +08:00
DUMPTOKEN(recruited.commitProxy);
2020-07-15 15:37:41 +08:00
DUMPTOKEN(recruited.grvProxy);
2017-05-26 04:48:44 +08:00
DUMPTOKEN(recruited.resolver);
DUMPTOKEN(recruited.storage);
DUMPTOKEN(recruited.debugPing);
DUMPTOKEN(recruited.coordinationPing);
DUMPTOKEN(recruited.waitFailure);
DUMPTOKEN(recruited.setMetricsRate);
DUMPTOKEN(recruited.eventLogRequest);
DUMPTOKEN(recruited.traceBatchDumpRequest);
2020-04-11 04:45:16 +08:00
DUMPTOKEN(recruited.updateServerDBInfo);
2017-05-26 04:48:44 +08:00
}
state std::vector<Future<Void>> recoveries;
2017-05-26 04:48:44 +08:00
try {
std::vector<DiskStore> stores = getDiskStores(folder);
2017-05-26 04:48:44 +08:00
bool validateDataFiles = deleteFile(joinPath(folder, validationFilename));
for (int f = 0; f < stores.size(); f++) {
2017-05-26 04:48:44 +08:00
DiskStore s = stores[f];
// FIXME: Error handling
if (s.storedComponent == DiskStore::Storage) {
2021-06-14 09:23:59 +08:00
LocalLineage _;
2021-05-28 02:25:00 +08:00
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::Storage;
IKeyValueStore* kv =
openKVStore(s.storeType, s.filename, s.storeID, memoryLimit, false, validateDataFiles);
2017-05-26 04:48:44 +08:00
Future<Void> kvClosed = kv->onClosed();
filesClosed.add(kvClosed);
2017-05-26 04:48:44 +08:00
2021-03-06 03:28:15 +08:00
// std::string doesn't have startsWith
std::string tssPrefix = testingStoragePrefix.toString();
// TODO might be more efficient to mark a boolean on DiskStore in getDiskStores, but that kind of breaks
// the abstraction since DiskStore also applies to storage cache + tlog
bool isTss = s.filename.find(tssPrefix) != std::string::npos;
Role ssRole = isTss ? Role::TESTING_STORAGE_SERVER : Role::STORAGE_SERVER;
2017-05-26 04:48:44 +08:00
StorageServerInterface recruited;
recruited.uniqueID = s.storeID;
recruited.locality = locality;
2021-05-13 02:53:20 +08:00
recruited.tssPairID =
isTss ? Optional<UID>(UID())
: Optional<UID>(); // presence of optional is used as source of truth for tss vs not. Value
// gets overridden later in restoreDurableState
2017-05-26 04:48:44 +08:00
recruited.initEndpoints();
std::map<std::string, std::string> details;
details["StorageEngine"] = s.storeType.toString();
2021-03-06 03:28:15 +08:00
details["IsTSS"] = isTss ? "Yes" : "No";
startRole(ssRole, recruited.id(), interf.id(), details, "Restored");
2017-05-26 04:48:44 +08:00
DUMPTOKEN(recruited.getValue);
DUMPTOKEN(recruited.getKey);
DUMPTOKEN(recruited.getKeyValues);
DUMPTOKEN(recruited.getShardState);
DUMPTOKEN(recruited.waitMetrics);
DUMPTOKEN(recruited.splitMetrics);
DUMPTOKEN(recruited.getReadHotRanges);
DUMPTOKEN(recruited.getRangeSplitPoints);
2019-07-26 07:27:32 +08:00
DUMPTOKEN(recruited.getStorageMetrics);
2017-05-26 04:48:44 +08:00
DUMPTOKEN(recruited.waitFailure);
DUMPTOKEN(recruited.getQueuingMetrics);
DUMPTOKEN(recruited.getKeyValueStoreType);
DUMPTOKEN(recruited.watchValue);
DUMPTOKEN(recruited.getKeyValuesStream);
2017-05-26 04:48:44 +08:00
Promise<Void> recovery;
Future<Void> f = storageServer(kv, recruited, dbInfo, folder, recovery, connFile);
recoveries.push_back(recovery.getFuture());
f = handleIOErrors(f, kv, s.storeID, kvClosed);
f = storageServerRollbackRebooter(&runningStorages,
f,
s.storeType,
s.filename,
recruited.id(),
recruited.locality,
2021-03-06 03:28:15 +08:00
isTss,
dbInfo,
folder,
&filesClosed,
memoryLimit,
kv);
2021-03-06 03:28:15 +08:00
errorForwarders.add(forwardError(errors, ssRole, recruited.id(), f));
} else if (s.storedComponent == DiskStore::TLogData) {
2021-06-14 09:23:59 +08:00
LocalLineage _;
2021-05-28 02:25:00 +08:00
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::TLog;
std::string logQueueBasename;
const std::string filename = basename(s.filename);
if (StringRef(filename).startsWith(fileLogDataPrefix)) {
logQueueBasename = fileLogQueuePrefix.toString();
} else {
StringRef optionsString = StringRef(filename).removePrefix(fileVersionedLogDataPrefix).eat("-");
logQueueBasename = fileLogQueuePrefix.toString() + optionsString.toString() + "-";
}
ASSERT_WE_THINK(abspath(parentDirectory(s.filename)) == folder);
IKeyValueStore* kv = openKVStore(s.storeType, s.filename, s.storeID, memoryLimit, validateDataFiles);
const DiskQueueVersion dqv =
s.tLogOptions.version >= TLogVersion::V3 ? DiskQueueVersion::V1 : DiskQueueVersion::V0;
const int64_t diskQueueWarnSize =
s.tLogOptions.spillType == TLogSpillType::VALUE ? 10 * SERVER_KNOBS->TARGET_BYTES_PER_TLOG : -1;
IDiskQueue* queue = openDiskQueue(joinPath(folder, logQueueBasename + s.storeID.toString() + "-"),
tlogQueueExtension.toString(),
s.storeID,
dqv,
diskQueueWarnSize);
filesClosed.add(kv->onClosed());
filesClosed.add(queue->onClosed());
2017-05-26 04:48:44 +08:00
std::map<std::string, std::string> details;
details["StorageEngine"] = s.storeType.toString();
startRole(Role::SHARED_TRANSACTION_LOG, s.storeID, interf.id(), details, "Restored");
2017-05-26 04:48:44 +08:00
Promise<Void> oldLog;
Promise<Void> recovery;
TLogFn tLogFn = tLogFnForOptions(s.tLogOptions);
auto& logData = sharedLogs[SharedLogsKey(s.tLogOptions, s.storeType)];
// FIXME: Shouldn't if logData.first isValid && !isReady, shouldn't we
// be sending a fake InitializeTLogRequest rather than calling tLog() ?
Future<Void> tl =
tLogFn(kv,
queue,
dbInfo,
locality,
!logData.actor.isValid() || logData.actor.isReady() ? logData.requests
: PromiseStream<InitializeTLogRequest>(),
s.storeID,
interf.id(),
true,
oldLog,
recovery,
folder,
degraded,
activeSharedTLog);
recoveries.push_back(recovery.getFuture());
activeSharedTLog->set(s.storeID);
tl = handleIOErrors(tl, kv, s.storeID);
tl = handleIOErrors(tl, queue, s.storeID);
if (!logData.actor.isValid() || logData.actor.isReady()) {
logData.actor = oldLog.getFuture() || tl;
logData.uid = s.storeID;
2017-05-26 04:48:44 +08:00
}
errorForwarders.add(forwardError(errors, Role::SHARED_TRANSACTION_LOG, s.storeID, tl));
2017-05-26 04:48:44 +08:00
}
}
bool hasCache = false;
// start cache role if we have the right process class
if (initialClass.classType() == ProcessClass::StorageCacheClass) {
hasCache = true;
StorageServerInterface recruited;
recruited.locality = locality;
recruited.initEndpoints();
std::map<std::string, std::string> details;
startRole(Role::STORAGE_CACHE, recruited.id(), interf.id(), details);
// DUMPTOKEN(recruited.getVersion);
DUMPTOKEN(recruited.getValue);
DUMPTOKEN(recruited.getKey);
DUMPTOKEN(recruited.getKeyValues);
DUMPTOKEN(recruited.getShardState);
DUMPTOKEN(recruited.waitMetrics);
DUMPTOKEN(recruited.splitMetrics);
DUMPTOKEN(recruited.getStorageMetrics);
DUMPTOKEN(recruited.waitFailure);
DUMPTOKEN(recruited.getQueuingMetrics);
DUMPTOKEN(recruited.getKeyValueStoreType);
DUMPTOKEN(recruited.watchValue);
2020-01-25 03:00:50 +08:00
auto f = storageCacheServer(recruited, 0, dbInfo);
f = storageCacheRollbackRebooter(f, recruited.id(), recruited.locality, dbInfo);
errorForwarders.add(forwardError(errors, Role::STORAGE_CACHE, recruited.id(), f));
}
2017-05-26 04:48:44 +08:00
std::map<std::string, std::string> details;
details["Locality"] = locality.toString();
details["DataFolder"] = folder;
details["StoresPresent"] = format("%d", stores.size());
details["CachePresent"] = hasCache ? "true" : "false";
startRole(Role::WORKER, interf.id(), interf.id(), details);
errorForwarders.add(traceRole(Role::WORKER, interf.id()));
2017-05-26 04:48:44 +08:00
wait(waitForAll(recoveries));
recoveredDiskFiles.send(Void());
errorForwarders.add(registrationClient(ccInterface,
interf,
asyncPriorityInfo,
initialClass,
ddInterf,
rkInterf,
degraded,
connFile,
2021-08-06 09:50:11 +08:00
issues,
localConfig));
if (useConfigDB != UseConfigDB::DISABLED) {
errorForwarders.add(localConfig->consume(interf.configBroadcastInterface));
}
if (SERVER_KNOBS->ENABLE_WORKER_HEALTH_MONITOR) {
errorForwarders.add(healthMonitor(ccInterface, interf, locality, dbInfo));
}
TraceEvent("RecoveriesComplete", interf.id());
2017-05-26 04:48:44 +08:00
loop choose {
when(UpdateServerDBInfoRequest req = waitNext(interf.updateServerDBInfo.getFuture())) {
ServerDBInfo localInfo = BinaryReader::fromStringRef<ServerDBInfo>(
req.serializedDbInfo, AssumeVersion(g_network->protocolVersion()));
localInfo.myLocality = locality;
if (localInfo.infoGeneration < dbInfo->get().infoGeneration &&
localInfo.clusterInterface == dbInfo->get().clusterInterface) {
2020-04-12 11:05:03 +08:00
std::vector<Endpoint> rep = req.broadcastInfo;
rep.push_back(interf.updateServerDBInfo.getEndpoint());
req.reply.send(rep);
} else {
Optional<Endpoint> notUpdated;
if (!ccInterface->get().present() || localInfo.clusterInterface != ccInterface->get().get()) {
2020-04-12 11:05:03 +08:00
notUpdated = interf.updateServerDBInfo.getEndpoint();
} else if (localInfo.infoGeneration > dbInfo->get().infoGeneration ||
dbInfo->get().clusterInterface != ccInterface->get().get()) {
TraceEvent("GotServerDBInfoChange")
.detail("ChangeID", localInfo.id)
.detail("MasterID", localInfo.master.id())
.detail("RatekeeperID",
localInfo.ratekeeper.present() ? localInfo.ratekeeper.get().id() : UID())
.detail("DataDistributorID",
localInfo.distributor.present() ? localInfo.distributor.get().id() : UID());
2020-04-12 11:05:03 +08:00
dbInfo->set(localInfo);
}
errorForwarders.add(
success(broadcastDBInfoRequest(req, SERVER_KNOBS->DBINFO_SEND_AMOUNT, notUpdated, true)));
}
}
when(RebootRequest req = waitNext(interf.clientInterface.reboot.getFuture())) {
2017-05-26 04:48:44 +08:00
state RebootRequest rebootReq = req;
// If suspendDuration is INT_MAX, the trace will not be logged if it was inside the next block
// Also a useful trace to have even if suspendDuration is 0
TraceEvent("RebootRequestSuspendingProcess").detail("Duration", req.waitForDuration);
if (req.waitForDuration) {
flushTraceFileVoid();
setProfilingEnabled(0);
g_network->stop();
threadSleep(req.waitForDuration);
}
if (rebootReq.checkData) {
Reference<IAsyncFile> checkFile =
wait(IAsyncFileSystem::filesystem()->open(joinPath(folder, validationFilename),
IAsyncFile::OPEN_CREATE | IAsyncFile::OPEN_READWRITE,
0600));
wait(checkFile->sync());
2017-05-26 04:48:44 +08:00
}
if (g_network->isSimulated()) {
TraceEvent("SimulatedReboot").detail("Deletion", rebootReq.deleteData);
if (rebootReq.deleteData) {
2017-05-26 04:48:44 +08:00
throw please_reboot_delete();
}
throw please_reboot();
} else {
TraceEvent("ProcessReboot").log();
2017-05-26 04:48:44 +08:00
ASSERT(!rebootReq.deleteData);
flushAndExit(0);
}
}
when(ProfilerRequest req = waitNext(interf.clientInterface.profiler.getFuture())) {
state ProfilerRequest profilerReq = req;
// There really isn't a great "filepath sanitizer" or "filepath escape" function available,
// thus we instead enforce a different requirement. One can only write to a file that's
// beneath the working directory, and we remove the ability to do any symlink or ../..
// tricks by resolving all paths through `abspath` first.
try {
std::string realLogDir = abspath(SERVER_KNOBS->LOG_DIRECTORY);
std::string realOutPath = abspath(realLogDir + "/" + profilerReq.outputFile.toString());
if (realLogDir.size() < realOutPath.size() &&
strncmp(realLogDir.c_str(), realOutPath.c_str(), realLogDir.size()) == 0) {
profilerReq.outputFile = realOutPath;
uncancellable(runProfiler(profilerReq));
profilerReq.reply.send(Void());
} else {
profilerReq.reply.sendError(client_invalid_operation());
}
} catch (Error& e) {
profilerReq.reply.sendError(e);
}
}
when(RecruitMasterRequest req = waitNext(interf.master.getFuture())) {
2021-06-14 09:23:59 +08:00
LocalLineage _;
2021-05-28 02:25:00 +08:00
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::Master;
2017-05-26 04:48:44 +08:00
MasterInterface recruited;
recruited.locality = locality;
recruited.initEndpoints();
startRole(Role::MASTER, recruited.id(), interf.id());
2017-05-26 04:48:44 +08:00
DUMPTOKEN(recruited.waitFailure);
DUMPTOKEN(recruited.tlogRejoin);
DUMPTOKEN(recruited.changeCoordinators);
DUMPTOKEN(recruited.getCommitVersion);
DUMPTOKEN(recruited.getLiveCommittedVersion);
DUMPTOKEN(recruited.reportLiveCommittedVersion);
DUMPTOKEN(recruited.notifyBackupWorkerDone);
// printf("Recruited as masterServer\n");
Future<Void> masterProcess = masterServer(
recruited, dbInfo, ccInterface, ServerCoordinators(connFile), req.lifetime, req.forceRecovery);
errorForwarders.add(
zombie(recruited, forwardError(errors, Role::MASTER, recruited.id(), masterProcess)));
2017-05-26 04:48:44 +08:00
req.reply.send(recruited);
}
when(InitializeDataDistributorRequest req = waitNext(interf.dataDistributor.getFuture())) {
2021-06-14 09:23:59 +08:00
LocalLineage _;
2021-05-28 02:25:00 +08:00
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::DataDistributor;
DataDistributorInterface recruited(locality);
2019-02-01 02:10:41 +08:00
recruited.initEndpoints();
if (ddInterf->get().present()) {
recruited = ddInterf->get().get();
TEST(true); // Recruited while already a data distributor.
} else {
startRole(Role::DATA_DISTRIBUTOR, recruited.id(), interf.id());
DUMPTOKEN(recruited.waitFailure);
Future<Void> dataDistributorProcess = dataDistributor(recruited, dbInfo);
errorForwarders.add(forwardError(
errors,
Role::DATA_DISTRIBUTOR,
recruited.id(),
setWhenDoneOrError(dataDistributorProcess, ddInterf, Optional<DataDistributorInterface>())));
ddInterf->set(Optional<DataDistributorInterface>(recruited));
}
TraceEvent("DataDistributorReceived", req.reqId).detail("DataDistributorId", recruited.id());
req.reply.send(recruited);
}
when(InitializeRatekeeperRequest req = waitNext(interf.ratekeeper.getFuture())) {
2021-06-14 09:23:59 +08:00
LocalLineage _;
2021-05-28 02:25:00 +08:00
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::Ratekeeper;
RatekeeperInterface recruited(locality, req.reqId);
recruited.initEndpoints();
if (rkInterf->get().present()) {
recruited = rkInterf->get().get();
TEST(true); // Recruited while already a ratekeeper.
} else {
startRole(Role::RATEKEEPER, recruited.id(), interf.id());
DUMPTOKEN(recruited.waitFailure);
DUMPTOKEN(recruited.getRateInfo);
DUMPTOKEN(recruited.haltRatekeeper);
DUMPTOKEN(recruited.reportCommitCostEstimation);
2019-03-13 02:34:16 +08:00
Future<Void> ratekeeperProcess = ratekeeper(recruited, dbInfo);
errorForwarders.add(
forwardError(errors,
Role::RATEKEEPER,
recruited.id(),
setWhenDoneOrError(ratekeeperProcess, rkInterf, Optional<RatekeeperInterface>())));
rkInterf->set(Optional<RatekeeperInterface>(recruited));
}
TraceEvent("Ratekeeper_InitRequest", req.reqId).detail("RatekeeperId", recruited.id());
req.reply.send(recruited);
}
when(InitializeBackupRequest req = waitNext(interf.backup.getFuture())) {
if (!backupWorkerCache.exists(req.reqId)) {
2021-06-14 09:23:59 +08:00
LocalLineage _;
2021-05-28 02:25:00 +08:00
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::Backup;
BackupInterface recruited(locality);
recruited.initEndpoints();
startRole(Role::BACKUP, recruited.id(), interf.id());
DUMPTOKEN(recruited.waitFailure);
ReplyPromise<InitializeBackupReply> backupReady = req.reply;
backupWorkerCache.set(req.reqId, backupReady.getFuture());
Future<Void> backupProcess = backupWorker(recruited, req, dbInfo);
2020-03-24 03:47:42 +08:00
backupProcess = storageCache.removeOnReady(req.reqId, backupProcess);
errorForwarders.add(forwardError(errors, Role::BACKUP, recruited.id(), backupProcess));
TraceEvent("BackupInitRequest", req.reqId).detail("BackupId", recruited.id());
InitializeBackupReply reply(recruited, req.backupEpoch);
backupReady.send(reply);
} else {
forwardPromise(req.reply, backupWorkerCache.get(req.reqId));
}
}
when(InitializeTLogRequest req = waitNext(interf.tLog.getFuture())) {
// For now, there's a one-to-one mapping of spill type to TLogVersion.
// With future work, a particular version of the TLog can support multiple
// different spilling strategies, at which point SpillType will need to be
// plumbed down into tLogFn.
if (req.logVersion < TLogVersion::MIN_RECRUITABLE) {
TraceEvent(SevError, "InitializeTLogInvalidLogVersion")
.detail("Version", req.logVersion)
.detail("MinRecruitable", TLogVersion::MIN_RECRUITABLE);
req.reply.sendError(internal_error());
}
2021-06-14 09:23:59 +08:00
LocalLineage _;
2021-05-28 02:25:00 +08:00
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::TLog;
TLogOptions tLogOptions(req.logVersion, req.spillType);
TLogFn tLogFn = tLogFnForOptions(tLogOptions);
auto& logData = sharedLogs[SharedLogsKey(tLogOptions, req.storeType)];
logData.requests.send(req);
if (!logData.actor.isValid() || logData.actor.isReady()) {
UID logId = deterministicRandom()->randomUniqueID();
2017-05-26 04:48:44 +08:00
std::map<std::string, std::string> details;
details["ForMaster"] = req.recruitmentID.shortString();
details["StorageEngine"] = req.storeType.toString();
// FIXME: start role for every tlog instance, rather that just for the shared actor, also use a
// different role type for the shared actor
startRole(Role::SHARED_TRANSACTION_LOG, logId, interf.id(), details);
const StringRef prefix =
req.logVersion > TLogVersion::V2 ? fileVersionedLogDataPrefix : fileLogDataPrefix;
std::string filename =
filenameFromId(req.storeType, folder, prefix.toString() + tLogOptions.toPrefix(), logId);
IKeyValueStore* data = openKVStore(req.storeType, filename, logId, memoryLimit);
const DiskQueueVersion dqv =
tLogOptions.version >= TLogVersion::V3 ? DiskQueueVersion::V1 : DiskQueueVersion::V0;
IDiskQueue* queue = openDiskQueue(
joinPath(folder,
fileLogQueuePrefix.toString() + tLogOptions.toPrefix() + logId.toString() + "-"),
tlogQueueExtension.toString(),
logId,
dqv);
filesClosed.add(data->onClosed());
filesClosed.add(queue->onClosed());
Future<Void> tLogCore = tLogFn(data,
queue,
dbInfo,
locality,
logData.requests,
logId,
interf.id(),
false,
Promise<Void>(),
Promise<Void>(),
folder,
degraded,
activeSharedTLog);
tLogCore = handleIOErrors(tLogCore, data, logId);
tLogCore = handleIOErrors(tLogCore, queue, logId);
errorForwarders.add(forwardError(errors, Role::SHARED_TRANSACTION_LOG, logId, tLogCore));
logData.actor = tLogCore;
logData.uid = logId;
2017-05-26 04:48:44 +08:00
}
activeSharedTLog->set(logData.uid);
2017-05-26 04:48:44 +08:00
}
when(InitializeStorageRequest req = waitNext(interf.storage.getFuture())) {
2021-06-16 06:49:27 +08:00
// We want to prevent double recruiting on a worker unless we try to recruit something
// with a different storage engine (otherwise storage migration won't work for certain
// configuration). Additionally we also need to allow double recruitment for seed servers.
// The reason for this is that a storage will only remove itself if after it was able
// to read the system key space. But if recovery fails right after a `configure new ...`
// was run it won't be able to do so.
if (!storageCache.exists(req.reqId) &&
(std::all_of(runningStorages.begin(),
runningStorages.end(),
[&req](const auto& p) { return p.second != req.storeType; }) ||
req.seedTag != invalidTag)) {
2021-06-14 09:23:59 +08:00
LocalLineage _;
2021-05-28 02:25:00 +08:00
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::Storage;
2021-05-13 02:53:20 +08:00
bool isTss = req.tssPairIDAndVersion.present();
2017-05-26 04:48:44 +08:00
StorageServerInterface recruited(req.interfaceId);
recruited.locality = locality;
2021-05-13 02:53:20 +08:00
recruited.tssPairID = isTss ? req.tssPairIDAndVersion.get().first : Optional<UID>();
2017-05-26 04:48:44 +08:00
recruited.initEndpoints();
std::map<std::string, std::string> details;
details["StorageEngine"] = req.storeType.toString();
2021-05-13 02:53:20 +08:00
details["IsTSS"] = std::to_string(isTss);
Role ssRole = isTss ? Role::TESTING_STORAGE_SERVER : Role::STORAGE_SERVER;
2021-03-06 03:28:15 +08:00
startRole(ssRole, recruited.id(), interf.id(), details);
2017-05-26 04:48:44 +08:00
DUMPTOKEN(recruited.getValue);
DUMPTOKEN(recruited.getKey);
DUMPTOKEN(recruited.getKeyValues);
DUMPTOKEN(recruited.getShardState);
DUMPTOKEN(recruited.waitMetrics);
DUMPTOKEN(recruited.splitMetrics);
DUMPTOKEN(recruited.getReadHotRanges);
DUMPTOKEN(recruited.getRangeSplitPoints);
2019-07-26 07:27:32 +08:00
DUMPTOKEN(recruited.getStorageMetrics);
2017-05-26 04:48:44 +08:00
DUMPTOKEN(recruited.waitFailure);
DUMPTOKEN(recruited.getQueuingMetrics);
DUMPTOKEN(recruited.getKeyValueStoreType);
DUMPTOKEN(recruited.watchValue);
DUMPTOKEN(recruited.getKeyValuesStream);
// printf("Recruited as storageServer\n");
2017-05-26 04:48:44 +08:00
std::string filename =
2021-03-06 03:28:15 +08:00
filenameFromId(req.storeType,
folder,
2021-05-13 02:53:20 +08:00
isTss ? testingStoragePrefix.toString() : fileStoragePrefix.toString(),
2021-03-06 03:28:15 +08:00
recruited.id());
IKeyValueStore* data = openKVStore(req.storeType, filename, recruited.id(), memoryLimit);
2017-05-26 04:48:44 +08:00
Future<Void> kvClosed = data->onClosed();
filesClosed.add(kvClosed);
ReplyPromise<InitializeStorageReply> storageReady = req.reply;
storageCache.set(req.reqId, storageReady.getFuture());
2021-05-13 02:53:20 +08:00
Future<Void> s = storageServer(data,
recruited,
req.seedTag,
isTss ? req.tssPairIDAndVersion.get().second : 0,
storageReady,
dbInfo,
folder);
2017-05-26 04:48:44 +08:00
s = handleIOErrors(s, data, recruited.id(), kvClosed);
s = storageCache.removeOnReady(req.reqId, s);
s = storageServerRollbackRebooter(&runningStorages,
s,
req.storeType,
filename,
recruited.id(),
recruited.locality,
2021-05-13 02:53:20 +08:00
isTss,
dbInfo,
folder,
&filesClosed,
memoryLimit,
data);
2021-03-06 03:28:15 +08:00
errorForwarders.add(forwardError(errors, ssRole, recruited.id(), s));
} else if (storageCache.exists(req.reqId)) {
forwardPromise(req.reply, storageCache.get(req.reqId));
} else {
TraceEvent("AttemptedDoubleRecruitement", interf.id()).detail("ForRole", "StorageServer");
2021-06-16 07:37:05 +08:00
errorForwarders.add(map(delay(0.5), [reply = req.reply](Void) {
reply.sendError(recruitment_failed());
return Void();
}));
}
2017-05-26 04:48:44 +08:00
}
2020-09-11 08:44:15 +08:00
when(InitializeCommitProxyRequest req = waitNext(interf.commitProxy.getFuture())) {
2021-06-14 09:23:59 +08:00
LocalLineage _;
2021-05-28 02:25:00 +08:00
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::CommitProxy;
2020-09-11 08:44:15 +08:00
CommitProxyInterface recruited;
recruited.processId = locality.processId();
recruited.provisional = false;
2017-05-26 04:48:44 +08:00
recruited.initEndpoints();
std::map<std::string, std::string> details;
details["ForMaster"] = req.master.id().shortString();
2020-09-11 08:44:15 +08:00
startRole(Role::COMMIT_PROXY, recruited.id(), interf.id(), details);
2017-05-26 04:48:44 +08:00
DUMPTOKEN(recruited.commit);
DUMPTOKEN(recruited.getConsistentReadVersion);
DUMPTOKEN(recruited.getKeyServersLocations);
DUMPTOKEN(recruited.getStorageServerRejoinInfo);
DUMPTOKEN(recruited.waitFailure);
DUMPTOKEN(recruited.txnState);
2020-09-11 08:44:15 +08:00
// printf("Recruited as commitProxyServer\n");
errorForwarders.add(zombie(recruited,
forwardError(errors,
Role::COMMIT_PROXY,
recruited.id(),
commitProxyServer(recruited, req, dbInfo, whitelistBinPaths))));
2017-05-26 04:48:44 +08:00
req.reply.send(recruited);
}
when(InitializeGrvProxyRequest req = waitNext(interf.grvProxy.getFuture())) {
2021-06-14 09:23:59 +08:00
LocalLineage _;
2021-05-28 02:25:00 +08:00
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::GrvProxy;
2020-07-15 15:37:41 +08:00
GrvProxyInterface recruited;
recruited.processId = locality.processId();
recruited.provisional = false;
recruited.initEndpoints();
std::map<std::string, std::string> details;
details["ForMaster"] = req.master.id().shortString();
startRole(Role::GRV_PROXY, recruited.id(), interf.id(), details);
2020-07-15 15:37:41 +08:00
DUMPTOKEN(recruited.getConsistentReadVersion);
DUMPTOKEN(recruited.waitFailure);
DUMPTOKEN(recruited.getHealthMetrics);
// printf("Recruited as grvProxyServer\n");
errorForwarders.add(zombie(
recruited,
forwardError(errors, Role::GRV_PROXY, recruited.id(), grvProxyServer(recruited, req, dbInfo))));
2020-07-15 15:37:41 +08:00
req.reply.send(recruited);
}
when(InitializeResolverRequest req = waitNext(interf.resolver.getFuture())) {
2021-06-14 09:23:59 +08:00
LocalLineage _;
2021-05-28 02:25:00 +08:00
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::Resolver;
2017-05-26 04:48:44 +08:00
ResolverInterface recruited;
recruited.locality = locality;
recruited.initEndpoints();
std::map<std::string, std::string> details;
startRole(Role::RESOLVER, recruited.id(), interf.id(), details);
2017-05-26 04:48:44 +08:00
DUMPTOKEN(recruited.resolve);
DUMPTOKEN(recruited.metrics);
DUMPTOKEN(recruited.split);
DUMPTOKEN(recruited.waitFailure);
errorForwarders.add(zombie(
recruited, forwardError(errors, Role::RESOLVER, recruited.id(), resolver(recruited, req, dbInfo))));
2017-05-26 04:48:44 +08:00
req.reply.send(recruited);
}
when(InitializeLogRouterRequest req = waitNext(interf.logRouter.getFuture())) {
2021-06-14 09:23:59 +08:00
LocalLineage _;
2021-05-28 02:25:00 +08:00
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::LogRouter;
TLogInterface recruited(locality);
recruited.initEndpoints();
std::map<std::string, std::string> details;
startRole(Role::LOG_ROUTER, recruited.id(), interf.id(), details);
DUMPTOKEN(recruited.peekMessages);
DUMPTOKEN(recruited.popMessages);
DUMPTOKEN(recruited.commit);
DUMPTOKEN(recruited.lock);
DUMPTOKEN(recruited.getQueuingMetrics);
DUMPTOKEN(recruited.confirmRunning);
DUMPTOKEN(recruited.waitFailure);
DUMPTOKEN(recruited.recoveryFinished);
DUMPTOKEN(recruited.disablePopRequest);
DUMPTOKEN(recruited.enablePopRequest);
DUMPTOKEN(recruited.snapRequest);
errorForwarders.add(
zombie(recruited,
forwardError(errors, Role::LOG_ROUTER, recruited.id(), logRouter(recruited, req, dbInfo))));
req.reply.send(recruited);
}
when(CoordinationPingMessage m = waitNext(interf.coordinationPing.getFuture())) {
TraceEvent("CoordinationPing", interf.id())
.detail("CCID", m.clusterControllerId)
.detail("TimeStep", m.timeStep);
2017-05-26 04:48:44 +08:00
}
when(SetMetricsLogRateRequest req = waitNext(interf.setMetricsRate.getFuture())) {
TraceEvent("LoggingRateChange", interf.id())
.detail("OldDelay", loggingDelay)
.detail("NewLogPS", req.metricsLogsPerSecond);
if (req.metricsLogsPerSecond != 0) {
2017-05-26 04:48:44 +08:00
loggingDelay = 1.0 / req.metricsLogsPerSecond;
loggingTrigger = Void();
}
}
when(EventLogRequest req = waitNext(interf.eventLogRequest.getFuture())) {
TraceEventFields e;
if (req.getLastError)
e = latestEventCache.getLatestError();
2017-05-26 04:48:44 +08:00
else
e = latestEventCache.get(req.eventName.toString());
2017-05-26 04:48:44 +08:00
req.reply.send(e);
}
when(TraceBatchDumpRequest req = waitNext(interf.traceBatchDumpRequest.getFuture())) {
2017-05-26 04:48:44 +08:00
g_traceBatch.dump();
req.reply.send(Void());
}
when(DiskStoreRequest req = waitNext(interf.diskStoreRequest.getFuture())) {
2017-05-26 04:48:44 +08:00
Standalone<VectorRef<UID>> ids;
for (DiskStore d : getDiskStores(folder)) {
2017-05-26 04:48:44 +08:00
bool included = true;
if (!req.includePartialStores) {
if (d.storeType == KeyValueStoreType::SSD_BTREE_V1) {
2017-05-26 04:48:44 +08:00
included = fileExists(d.filename + ".fdb-wal");
} else if (d.storeType == KeyValueStoreType::SSD_BTREE_V2) {
2017-05-26 04:48:44 +08:00
included = fileExists(d.filename + ".sqlite-wal");
} else if (d.storeType == KeyValueStoreType::SSD_REDWOOD_V1) {
included = fileExists(d.filename + "0.pagerlog") && fileExists(d.filename + "1.pagerlog");
} else if (d.storeType == KeyValueStoreType::SSD_ROCKSDB_V1) {
included = fileExists(joinPath(d.filename, "CURRENT")) &&
fileExists(joinPath(d.filename, "IDENTITY"));
} else if (d.storeType == KeyValueStoreType::MEMORY) {
2017-05-26 04:48:44 +08:00
included = fileExists(d.filename + "1.fdq");
} else {
ASSERT(d.storeType == KeyValueStoreType::MEMORY_RADIXTREE);
included = fileExists(d.filename + "1.fdr");
2017-05-26 04:48:44 +08:00
}
if (d.storedComponent == DiskStore::COMPONENT::TLogData && included) {
included = false;
// The previous code assumed that d.filename is a filename. But that is not true.
// d.filename is a path. Removing a prefix and adding a new one just makes a broken
// directory name. So fileExists would always return false.
// Weirdly, this doesn't break anything, as tested by taking a clean check of FDB,
// setting included to false always, and then running correctness. So I'm just
// improving the situation by actually marking it as broken.
// FIXME: this whole thing
/*
std::string logDataBasename;
StringRef filename = d.filename;
if (filename.startsWith(fileLogDataPrefix)) {
logDataBasename = fileLogQueuePrefix.toString() +
d.filename.substr(fileLogDataPrefix.size()); } else { StringRef optionsString =
filename.removePrefix(fileVersionedLogDataPrefix).eat("-"); logDataBasename =
fileLogQueuePrefix.toString() + optionsString.toString() + "-";
}
TraceEvent("DiskStoreRequest").detail("FilenameBasename", logDataBasename);
if (fileExists(logDataBasename + "0.fdq") && fileExists(logDataBasename + "1.fdq")) {
included = true;
}
*/
2017-05-26 04:48:44 +08:00
}
}
if (included) {
2017-05-26 04:48:44 +08:00
ids.push_back(ids.arena(), d.storeID);
}
}
req.reply.send(ids);
}
when(wait(loggingTrigger)) {
2017-05-26 04:48:44 +08:00
systemMonitor();
loggingTrigger = delay(loggingDelay, TaskPriority::FlushTrace);
2017-05-26 04:48:44 +08:00
}
when(state WorkerSnapRequest snapReq = waitNext(interf.workerSnapReq.getFuture())) {
Standalone<StringRef> snapFolder = StringRef(folder);
if (snapReq.role.toString() == "coord") {
snapFolder = coordFolder;
}
errorForwarders.add(workerSnapCreate(snapReq, snapFolder));
}
when(wait(errorForwarders.getResult())) {}
when(wait(handleErrors)) {}
2017-05-26 04:48:44 +08:00
}
} catch (Error& err) {
// Make sure actors are cancelled before "recovery" promises are destructed.
for (auto f : recoveries)
f.cancel();
2017-05-26 04:48:44 +08:00
state Error e = err;
bool ok = e.code() == error_code_please_reboot || e.code() == error_code_actor_cancelled ||
e.code() == error_code_please_reboot_delete;
2017-05-26 04:48:44 +08:00
endRole(Role::WORKER, interf.id(), "WorkerError", ok, e);
2017-05-26 04:48:44 +08:00
errorForwarders.clear(false);
sharedLogs.clear();
2017-05-26 04:48:44 +08:00
if (e.code() !=
error_code_actor_cancelled) { // We get cancelled e.g. when an entire simulation times out, but in that case
// we won't be restarted and don't need to wait for shutdown
2017-05-26 04:48:44 +08:00
stopping.send(Void());
wait(filesClosed.getResult()); // Wait for complete shutdown of KV stores
wait(delay(0.0)); // Unwind the callstack to make sure that IAsyncFile references are all gone
2017-05-26 04:48:44 +08:00
TraceEvent(SevInfo, "WorkerShutdownComplete", interf.id());
}
throw e;
}
}
ACTOR Future<Void> extractClusterInterface(Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> a,
Reference<AsyncVar<Optional<ClusterInterface>>> b) {
2017-05-26 04:48:44 +08:00
loop {
if (a->get().present())
2017-05-26 04:48:44 +08:00
b->set(a->get().get().clientInterface);
else
b->set(Optional<ClusterInterface>());
wait(a->onChange());
2017-05-26 04:48:44 +08:00
}
}
static std::set<int> const& normalWorkerErrors() {
static std::set<int> s;
if (s.empty()) {
s.insert(error_code_please_reboot);
s.insert(error_code_please_reboot_delete);
2017-05-26 04:48:44 +08:00
}
return s;
}
ACTOR Future<Void> fileNotFoundToNever(Future<Void> f) {
2017-05-26 04:48:44 +08:00
try {
wait(f);
2017-05-26 04:48:44 +08:00
return Void();
} catch (Error& e) {
if (e.code() == error_code_file_not_found) {
TraceEvent(SevWarn, "ClusterCoordinatorFailed").error(e);
2017-05-26 04:48:44 +08:00
return Never();
}
throw;
}
}
ACTOR Future<Void> printTimeout() {
wait(delay(5));
if (!g_network->isSimulated()) {
2017-05-26 04:48:44 +08:00
fprintf(stderr, "Warning: FDBD has not joined the cluster after 5 seconds.\n");
fprintf(stderr, " Check configuration and availability using the 'status' command with the fdbcli\n");
}
return Void();
}
ACTOR Future<Void> printOnFirstConnected(Reference<AsyncVar<Optional<ClusterInterface>>> ci) {
2017-05-26 04:48:44 +08:00
state Future<Void> timeoutFuture = printTimeout();
loop {
choose {
when(wait(ci->get().present() ? IFailureMonitor::failureMonitor().onStateEqual(
ci->get().get().openDatabase.getEndpoint(), FailureStatus(false))
: Never())) {
2017-05-26 04:48:44 +08:00
printf("FDBD joined cluster.\n");
TraceEvent("FDBDConnected").log();
2017-05-26 04:48:44 +08:00
return Void();
}
when(wait(ci->onChange())) {}
2017-05-26 04:48:44 +08:00
}
}
}
2018-06-09 02:51:51 +08:00
ClusterControllerPriorityInfo getCCPriorityInfo(std::string filePath, ProcessClass processClass) {
if (!fileExists(filePath))
return ClusterControllerPriorityInfo(ProcessClass(processClass.classType(), ProcessClass::CommandLineSource)
.machineClassFitness(ProcessClass::ClusterController),
false,
ClusterControllerPriorityInfo::FitnessUnknown);
std::string contents(readFileBytes(filePath, 1000));
BinaryReader br(StringRef(contents), IncludeVersion());
ClusterControllerPriorityInfo priorityInfo(
ProcessClass::UnsetFit, false, ClusterControllerPriorityInfo::FitnessUnknown);
br >> priorityInfo;
if (!br.empty()) {
if (g_network->isSimulated()) {
ASSERT(false);
} else {
TraceEvent(SevWarnAlways, "FitnessFileCorrupted").detail("filePath", filePath);
return ClusterControllerPriorityInfo(ProcessClass(processClass.classType(), ProcessClass::CommandLineSource)
.machineClassFitness(ProcessClass::ClusterController),
false,
ClusterControllerPriorityInfo::FitnessUnknown);
}
}
return priorityInfo;
}
ACTOR Future<Void> monitorAndWriteCCPriorityInfo(std::string filePath,
Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo) {
loop {
wait(asyncPriorityInfo->onChange());
std::string contents(BinaryWriter::toValue(asyncPriorityInfo->get(),
IncludeVersion(ProtocolVersion::withClusterControllerPriorityInfo()))
.toString());
atomicReplace(filePath, contents, false);
}
}
ACTOR Future<UID> createAndLockProcessIdFile(std::string folder) {
state UID processIDUid;
platform::createDirectory(folder);
loop {
try {
state std::string lockFilePath = joinPath(folder, "processId");
state ErrorOr<Reference<IAsyncFile>> lockFile = wait(errorOr(IAsyncFileSystem::filesystem(g_network)->open(
lockFilePath, IAsyncFile::OPEN_READWRITE | IAsyncFile::OPEN_LOCK, 0600)));
if (lockFile.isError() && lockFile.getError().code() == error_code_file_not_found &&
!fileExists(lockFilePath)) {
Reference<IAsyncFile> _lockFile = wait(IAsyncFileSystem::filesystem()->open(
lockFilePath,
IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_CREATE | IAsyncFile::OPEN_LOCK |
IAsyncFile::OPEN_READWRITE,
0600));
lockFile = _lockFile;
processIDUid = deterministicRandom()->randomUniqueID();
2020-05-23 08:16:59 +08:00
BinaryWriter wr(IncludeVersion(ProtocolVersion::withProcessIDFile()));
wr << processIDUid;
wait(lockFile.get()->write(wr.getData(), wr.getLength(), 0));
wait(lockFile.get()->sync());
} else {
if (lockFile.isError())
throw lockFile.getError(); // If we've failed to open the file, throw an exception
int64_t fileSize = wait(lockFile.get()->size());
state Key fileData = makeString(fileSize);
wait(success(lockFile.get()->read(mutateString(fileData), fileSize, 0)));
try {
processIDUid = BinaryReader::fromStringRef<UID>(fileData, IncludeVersion());
return processIDUid;
} catch (Error& e) {
if (!g_network->isSimulated()) {
throw;
}
lockFile = ErrorOr<Reference<IAsyncFile>>();
wait(IAsyncFileSystem::filesystem()->deleteFile(lockFilePath, true));
}
}
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
throw;
}
if (!e.isInjectedFault()) {
fprintf(stderr,
"ERROR: error creating or opening process id file `%s'.\n",
joinPath(folder, "processId").c_str());
}
TraceEvent(SevError, "OpenProcessIdError").error(e);
throw;
}
}
}
ACTOR Future<MonitorLeaderInfo> monitorLeaderRemotelyOneGeneration(Reference<ClusterConnectionFile> connFile,
Reference<AsyncVar<Value>> result,
MonitorLeaderInfo info) {
state ClusterConnectionString ccf = info.intermediateConnFile->getConnectionString();
state vector<NetworkAddress> addrs = ccf.coordinators();
state ElectionResultRequest request;
state int index = 0;
state int successIndex = 0;
request.key = ccf.clusterKey();
request.coordinators = ccf.coordinators();
deterministicRandom()->randomShuffle(addrs);
loop {
LeaderElectionRegInterface interf(addrs[index]);
request.reply = ReplyPromise<Optional<LeaderInfo>>();
ErrorOr<Optional<LeaderInfo>> leader = wait(interf.electionResult.tryGetReply(request));
if (leader.present()) {
if (leader.get().present()) {
if (leader.get().get().forward) {
2020-11-07 15:50:55 +08:00
info.intermediateConnFile = makeReference<ClusterConnectionFile>(
connFile->getFilename(), ClusterConnectionString(leader.get().get().serializedInfo.toString()));
return info;
}
if (connFile != info.intermediateConnFile) {
if (!info.hasConnected) {
TraceEvent(SevWarnAlways, "IncorrectClusterFileContentsAtConnection")
.detail("Filename", connFile->getFilename())
.detail("ConnectionStringFromFile", connFile->getConnectionString().toString())
.detail("CurrentConnectionString",
info.intermediateConnFile->getConnectionString().toString());
}
connFile->setConnectionString(info.intermediateConnFile->getConnectionString());
info.intermediateConnFile = connFile;
}
info.hasConnected = true;
connFile->notifyConnected();
request.knownLeader = leader.get().get().changeID;
ClusterControllerPriorityInfo info = leader.get().get().getPriorityInfo();
if (leader.get().get().serializedInfo.size() && !info.isExcluded &&
(info.dcFitness == ClusterControllerPriorityInfo::FitnessPrimary ||
info.dcFitness == ClusterControllerPriorityInfo::FitnessPreferred ||
info.dcFitness == ClusterControllerPriorityInfo::FitnessUnknown)) {
result->set(leader.get().get().serializedInfo);
} else {
result->set(Value());
}
}
successIndex = index;
} else {
index = (index + 1) % addrs.size();
if (index == successIndex) {
wait(delay(CLIENT_KNOBS->COORDINATOR_RECONNECTION_DELAY));
}
}
}
}
ACTOR Future<Void> monitorLeaderRemotelyInternal(Reference<ClusterConnectionFile> connFile,
Reference<AsyncVar<Value>> outSerializedLeaderInfo) {
state MonitorLeaderInfo info(connFile);
loop {
MonitorLeaderInfo _info = wait(monitorLeaderRemotelyOneGeneration(connFile, outSerializedLeaderInfo, info));
info = _info;
}
}
template <class LeaderInterface>
Future<Void> monitorLeaderRemotely(Reference<ClusterConnectionFile> const& connFile,
Reference<AsyncVar<Optional<LeaderInterface>>> const& outKnownLeader) {
LeaderDeserializer<LeaderInterface> deserializer;
2020-11-07 15:50:55 +08:00
auto serializedInfo = makeReference<AsyncVar<Value>>();
Future<Void> m = monitorLeaderRemotelyInternal(connFile, serializedInfo);
return m || deserializer(serializedInfo, outKnownLeader);
}
ACTOR Future<Void> monitorLeaderRemotelyWithDelayedCandidacy(
Reference<ClusterConnectionFile> connFile,
Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> currentCC,
Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo,
Future<Void> recoveredDiskFiles,
LocalityData locality,
Reference<AsyncVar<ServerDBInfo>> dbInfo,
2021-06-11 11:57:50 +08:00
UseConfigDB useConfigDB) {
state Future<Void> monitor = monitorLeaderRemotely(connFile, currentCC);
state Future<Void> timeout;
wait(recoveredDiskFiles);
loop {
if (currentCC->get().present() && dbInfo->get().clusterInterface == currentCC->get().get() &&
IFailureMonitor::failureMonitor()
.getState(currentCC->get().get().registerWorker.getEndpoint())
.isAvailable()) {
timeout = Future<Void>();
} else if (!timeout.isValid()) {
timeout =
delay(SERVER_KNOBS->MIN_DELAY_CC_WORST_FIT_CANDIDACY_SECONDS +
(deterministicRandom()->random01() * (SERVER_KNOBS->MAX_DELAY_CC_WORST_FIT_CANDIDACY_SECONDS -
SERVER_KNOBS->MIN_DELAY_CC_WORST_FIT_CANDIDACY_SECONDS)));
}
choose {
when(wait(currentCC->onChange())) {}
when(wait(dbInfo->onChange())) {}
when(wait(currentCC->get().present() ? IFailureMonitor::failureMonitor().onStateChanged(
currentCC->get().get().registerWorker.getEndpoint())
: Never())) {}
when(wait(timeout.isValid() ? timeout : Never())) {
monitor.cancel();
wait(clusterController(
2021-06-11 11:57:50 +08:00
connFile, currentCC, asyncPriorityInfo, recoveredDiskFiles, locality, useConfigDB));
return Void();
}
}
}
}
extern void setupStackSignal();
ACTOR Future<Void> serveProtocolInfo() {
state RequestStream<ProtocolInfoRequest> protocolInfo(
PeerCompatibilityPolicy{ RequirePeer::AtLeast, ProtocolVersion::withStableInterfaces() });
protocolInfo.makeWellKnownEndpoint(WLTOKEN_PROTOCOL_INFO, TaskPriority::DefaultEndpoint);
loop {
state ProtocolInfoRequest req = waitNext(protocolInfo.getFuture());
req.reply.send(ProtocolInfoReply{ g_network->protocolVersion() });
}
}
// Handles requests from ProcessInterface, an interface meant for direct
// communication between the client and FDB processes.
ACTOR Future<Void> serveProcess() {
state ProcessInterface process;
process.getInterface.makeWellKnownEndpoint(WLTOKEN_PROCESS, TaskPriority::DefaultEndpoint);
loop {
choose {
when(GetProcessInterfaceRequest req = waitNext(process.getInterface.getFuture())) {
req.reply.send(process);
}
when(ActorLineageRequest req = waitNext(process.actorLineage.getFuture())) {
state SampleCollection sampleCollector;
auto samples = sampleCollector->get(req.timeStart, req.timeEnd);
std::vector<SerializedSample> serializedSamples;
for (const auto& samplePtr : samples) {
auto serialized = SerializedSample{ .time = samplePtr->time };
for (const auto& [waitState, pair] : samplePtr->data) {
if (waitState >= req.waitStateStart && waitState <= req.waitStateEnd) {
serialized.data[waitState] = std::string(pair.first, pair.second);
}
}
serializedSamples.push_back(std::move(serialized));
}
ActorLineageReply reply{ serializedSamples };
req.reply.send(reply);
}
}
}
}
ACTOR Future<Void> fdbd(Reference<ClusterConnectionFile> connFile,
LocalityData localities,
ProcessClass processClass,
std::string dataFolder,
std::string coordFolder,
int64_t memoryLimit,
std::string metricsConnFile,
std::string metricsPrefix,
int64_t memoryProfileThreshold,
std::string whitelistBinPaths,
std::string configPath,
std::map<std::string, std::string> manualKnobOverrides,
2021-06-11 11:57:50 +08:00
UseConfigDB useConfigDB) {
state vector<Future<Void>> actors;
state Promise<Void> recoveredDiskFiles;
state LocalConfiguration localConfig(dataFolder, configPath, manualKnobOverrides);
2021-07-28 01:07:18 +08:00
// setupStackSignal();
2021-05-28 02:25:00 +08:00
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::Worker;
2021-06-11 11:57:50 +08:00
if (useConfigDB != UseConfigDB::DISABLED) {
wait(localConfig.initialize());
}
actors.push_back(serveProtocolInfo());
actors.push_back(serveProcess());
try {
ServerCoordinators coordinators(connFile);
if (g_network->isSimulated()) {
whitelistBinPaths = ",, random_path, /bin/snap_create.sh,,";
}
TraceEvent("StartingFDBD")
.detail("ZoneID", localities.zoneId())
.detail("MachineId", localities.machineId())
.detail("DiskPath", dataFolder)
.detail("CoordPath", coordFolder)
.detail("WhiteListBinPath", whitelistBinPaths);
// SOMEDAY: start the services on the machine in a staggered fashion in simulation?
// Endpoints should be registered first before any process trying to connect to it.
// So coordinationServer actor should be the first one executed before any other.
if (coordFolder.size()) {
// SOMEDAY: remove the fileNotFound wrapper and make DiskQueue construction safe from errors setting up
// their files
2021-06-11 11:57:50 +08:00
actors.push_back(fileNotFoundToNever(coordinationServer(coordFolder, coordinators.ccf, useConfigDB)));
}
state UID processIDUid = wait(createAndLockProcessIdFile(dataFolder));
localities.set(LocalityData::keyProcessId, processIDUid.toString());
// Only one process can execute on a dataFolder from this point onwards
2018-06-09 02:51:51 +08:00
std::string fitnessFilePath = joinPath(dataFolder, "fitness");
2020-11-07 15:50:55 +08:00
auto cc = makeReference<AsyncVar<Optional<ClusterControllerFullInterface>>>();
auto ci = makeReference<AsyncVar<Optional<ClusterInterface>>>();
auto asyncPriorityInfo =
makeReference<AsyncVar<ClusterControllerPriorityInfo>>(getCCPriorityInfo(fitnessFilePath, processClass));
auto dbInfo = makeReference<AsyncVar<ServerDBInfo>>();
actors.push_back(reportErrors(monitorAndWriteCCPriorityInfo(fitnessFilePath, asyncPriorityInfo),
"MonitorAndWriteCCPriorityInfo"));
2020-06-06 07:27:04 +08:00
if (processClass.machineClassFitness(ProcessClass::ClusterController) == ProcessClass::NeverAssign) {
actors.push_back(reportErrors(monitorLeader(connFile, cc), "ClusterController"));
} else if (processClass.machineClassFitness(ProcessClass::ClusterController) == ProcessClass::WorstFit &&
SERVER_KNOBS->MAX_DELAY_CC_WORST_FIT_CANDIDACY_SECONDS > 0) {
2021-06-11 11:57:50 +08:00
actors.push_back(reportErrors(
monitorLeaderRemotelyWithDelayedCandidacy(
connFile, cc, asyncPriorityInfo, recoveredDiskFiles.getFuture(), localities, dbInfo, useConfigDB),
"ClusterController"));
} else {
actors.push_back(reportErrors(
clusterController(
2021-06-11 11:57:50 +08:00
connFile, cc, asyncPriorityInfo, recoveredDiskFiles.getFuture(), localities, useConfigDB),
"ClusterController"));
}
actors.push_back(reportErrors(extractClusterInterface(cc, ci), "ExtractClusterInterface"));
actors.push_back(reportErrorsExcept(workerServer(connFile,
cc,
localities,
asyncPriorityInfo,
processClass,
dataFolder,
memoryLimit,
metricsConnFile,
metricsPrefix,
recoveredDiskFiles,
memoryProfileThreshold,
coordFolder,
whitelistBinPaths,
2021-08-06 09:50:11 +08:00
dbInfo,
useConfigDB,
&localConfig),
"WorkerServer",
UID(),
&normalWorkerErrors()));
state Future<Void> firstConnect = reportErrors(printOnFirstConnected(ci), "ClusterFirstConnectedError");
wait(quorum(actors, 1));
ASSERT(false); // None of these actors should terminate normally
throw internal_error();
} catch (Error& e) {
// Make sure actors are cancelled before recoveredDiskFiles is destructed.
// Otherwise, these actors may get a broken promise error.
for (auto f : actors)
f.cancel();
Error err = checkIOTimeout(e);
throw err;
}
2017-05-26 04:48:44 +08:00
}
2018-09-06 06:53:12 +08:00
const Role Role::WORKER("Worker", "WK", false);
const Role Role::STORAGE_SERVER("StorageServer", "SS");
const Role Role::TESTING_STORAGE_SERVER("TestingStorageServer", "ST");
const Role Role::TRANSACTION_LOG("TLog", "TL");
2018-09-06 06:53:12 +08:00
const Role Role::SHARED_TRANSACTION_LOG("SharedTLog", "SL", false);
2020-09-11 08:44:15 +08:00
const Role Role::COMMIT_PROXY("CommitProxyServer", "CP");
2020-07-15 15:37:41 +08:00
const Role Role::GRV_PROXY("GrvProxyServer", "GP");
const Role Role::MASTER("MasterServer", "MS");
const Role Role::RESOLVER("Resolver", "RV");
const Role Role::CLUSTER_CONTROLLER("ClusterController", "CC");
const Role Role::TESTER("Tester", "TS");
const Role Role::LOG_ROUTER("LogRouter", "LR");
const Role Role::DATA_DISTRIBUTOR("DataDistributor", "DD");
const Role Role::RATEKEEPER("Ratekeeper", "RK");
2019-11-13 05:01:29 +08:00
const Role Role::STORAGE_CACHE("StorageCache", "SC");
2019-07-04 02:09:36 +08:00
const Role Role::COORDINATOR("Coordinator", "CD");
const Role Role::BACKUP("Backup", "BK");