added comments

This commit is contained in:
Markus Pilman 2021-06-15 16:49:27 -06:00
parent dc17be093c
commit 56eaf1bc83
3 changed files with 18 additions and 4 deletions

View File

@ -284,6 +284,8 @@ ACTOR Future<vector<WorkerInterface>> getStorageWorkers(Database cx,
return result; return result;
} }
// Helper function to extract he maximum SQ size of all provided messages. All futures in the
// messages vector have to be ready.
int64_t extractMaxQueueSize(const std::vector<Future<TraceEventFields>>& messages, int64_t extractMaxQueueSize(const std::vector<Future<TraceEventFields>>& messages,
const std::vector<StorageServerInterface>& servers) { const std::vector<StorageServerInterface>& servers) {
@ -315,6 +317,7 @@ int64_t extractMaxQueueSize(const std::vector<Future<TraceEventFields>>& message
return maxQueueSize; return maxQueueSize;
} }
// Timeout wrapper when getting the storage metrics. This will do some additional tracing
ACTOR Future<TraceEventFields> getStorageMetricsTimeout(UID storage, WorkerInterface wi) { ACTOR Future<TraceEventFields> getStorageMetricsTimeout(UID storage, WorkerInterface wi) {
state Future<TraceEventFields> result = state Future<TraceEventFields> result =
wi.eventLogRequest.getReply(EventLogRequest(StringRef(storage.toString() + "/StorageMetrics"))); wi.eventLogRequest.getReply(EventLogRequest(StringRef(storage.toString() + "/StorageMetrics")));
@ -608,6 +611,8 @@ ACTOR Future<Void> reconfigureAfter(Database cx,
return Void(); return Void();
} }
// Waits until a database quiets down (no data in flight, small tlog queue, low SQ, no active data distribution). This
// requires the database to be available and healthy in order to succeed.
ACTOR Future<Void> waitForQuietDatabase(Database cx, ACTOR Future<Void> waitForQuietDatabase(Database cx,
Reference<AsyncVar<ServerDBInfo>> dbInfo, Reference<AsyncVar<ServerDBInfo>> dbInfo,
std::string phase, std::string phase,

View File

@ -53,17 +53,19 @@ const int MACHINE_REBOOT_TIME = 10;
bool destructed = false; bool destructed = false;
// given a std::variant T, this templated class will define type which will be a variant of all types in T plus Args...
template <class T, class... Args> template <class T, class... Args>
struct concatenate_variant_t; struct variant_add_t;
template <class... Args1, class... Args2> template <class... Args1, class... Args2>
struct concatenate_variant_t<std::variant<Args1...>, Args2...> { struct variant_add_t<std::variant<Args1...>, Args2...> {
using type = std::variant<Args1..., Args2...>; using type = std::variant<Args1..., Args2...>;
}; };
template <class T, class... Args> template <class T, class... Args>
using concatenate_variant = typename concatenate_variant_t<T, Args...>::type; using variant_add = typename variant_add_t<T, Args...>::type;
// variant_with_optional_t::type will be a std::variant<Args..., Optional<Args>...>
template <class... Args> template <class... Args>
struct variant_with_optional_t; struct variant_with_optional_t;
@ -74,12 +76,13 @@ struct variant_with_optional_t<Single> {
template <class Head, class... Tail> template <class Head, class... Tail>
struct variant_with_optional_t<Head, Tail...> { struct variant_with_optional_t<Head, Tail...> {
using type = concatenate_variant<typename variant_with_optional_t<Tail...>::type, Head, Optional<Head>>; using type = variant_add<typename variant_with_optional_t<Tail...>::type, Head, Optional<Head>>;
}; };
template <class... Args> template <class... Args>
using variant_with_optional = typename variant_with_optional_t<Args...>::type; using variant_with_optional = typename variant_with_optional_t<Args...>::type;
// Expects a std::variant and will make a pointer out of each argument
template <class T> template <class T>
struct add_pointers_t; struct add_pointers_t;

View File

@ -1536,6 +1536,12 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
activeSharedTLog->set(logData.uid); activeSharedTLog->set(logData.uid);
} }
when(InitializeStorageRequest req = waitNext(interf.storage.getFuture())) { when(InitializeStorageRequest req = waitNext(interf.storage.getFuture())) {
// We want to prevent double recruiting on a worker unless we try to recruit something
// with a different storage engine (otherwise storage migration won't work for certain
// configuration). Additionally we also need to allow double recruitment for seed servers.
// The reason for this is that a storage will only remove itself if after it was able
// to read the system key space. But if recovery fails right after a `configure new ...`
// was run it won't be able to do so.
if (!storageCache.exists(req.reqId) && if (!storageCache.exists(req.reqId) &&
(std::all_of(runningStorages.begin(), (std::all_of(runningStorages.begin(),
runningStorages.end(), runningStorages.end(),