Extend RebootRequest API to include time to suspend the process before reboot. This is intended to be used for testing purposes to simulate failures.
This commit is contained in:
parent
6ececa94ce
commit
5eb833759e
|
@ -779,7 +779,6 @@ const KeyRef exeRestore = LiteralStringRef("fdbrestore");
|
|||
const KeyRef exeDatabaseAgent = LiteralStringRef("dr_agent");
|
||||
const KeyRef exeDatabaseBackup = LiteralStringRef("fdbdr");
|
||||
|
||||
extern void flushTraceFileVoid();
|
||||
extern const char* getHGVersion();
|
||||
|
||||
#ifdef _WIN32
|
||||
|
|
|
@ -49,12 +49,14 @@ struct RebootRequest {
|
|||
constexpr static FileIdentifier file_identifier = 11913957;
|
||||
bool deleteData;
|
||||
bool checkData;
|
||||
uint32_t waitForDuration;
|
||||
|
||||
explicit RebootRequest(bool deleteData = false, bool checkData = false) : deleteData(deleteData), checkData(checkData) {}
|
||||
explicit RebootRequest(bool deleteData = false, bool checkData = false, uint32_t waitForDuration = 0)
|
||||
: deleteData(deleteData), checkData(checkData), waitForDuration(waitForDuration) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, deleteData, checkData);
|
||||
serializer(ar, deleteData, checkData, waitForDuration);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -107,6 +107,7 @@ set(FDBSERVER_SRCS
|
|||
workloads/ChangeConfig.actor.cpp
|
||||
workloads/ClientTransactionProfileCorrectness.actor.cpp
|
||||
workloads/TriggerRecovery.actor.cpp
|
||||
workloads/SuspendProcesses.actor.cpp
|
||||
workloads/CommitBugCheck.actor.cpp
|
||||
workloads/ConfigureDatabase.actor.cpp
|
||||
workloads/ConflictRange.actor.cpp
|
||||
|
|
|
@ -107,6 +107,7 @@
|
|||
<ActorCompiler Include="workloads\AtomicOpsApiCorrectness.actor.cpp" />
|
||||
<ActorCompiler Include="workloads\ClientTransactionProfileCorrectness.actor.cpp" />
|
||||
<ActorCompiler Include="workloads\TriggerRecovery.actor.cpp" />
|
||||
<ActorCompiler Include="workloads\SuspendProcesses.actor.cpp" />
|
||||
<ActorCompiler Include="workloads\BackupToDBAbort.actor.cpp" />
|
||||
<ActorCompiler Include="workloads\BackupToDBCorrectness.actor.cpp" />
|
||||
<ActorCompiler Include="workloads\BackupToDBUpgrade.actor.cpp" />
|
||||
|
|
|
@ -234,6 +234,9 @@
|
|||
<ActorCompiler Include="workloads\TriggerRecovery.actor.cpp">
|
||||
<Filter>workloads</Filter>
|
||||
</ActorCompiler>
|
||||
<ActorCompiler Include="workloads\SuspendProcesses.actor.cpp">
|
||||
<Filter>workloads</Filter>
|
||||
</ActorCompiler>
|
||||
<ActorCompiler Include="workloads\StatusWorkload.actor.cpp">
|
||||
<Filter>workloads</Filter>
|
||||
</ActorCompiler>
|
||||
|
|
|
@ -866,6 +866,13 @@ ACTOR Future<Void> workerServer(
|
|||
|
||||
when( RebootRequest req = waitNext( interf.clientInterface.reboot.getFuture() ) ) {
|
||||
state RebootRequest rebootReq = req;
|
||||
if(req.waitForDuration) {
|
||||
TraceEvent("RebootRequestSuspendingProcess").detail("Duration", req.waitForDuration);
|
||||
flushTraceFileVoid();
|
||||
setProfilingEnabled(0);
|
||||
g_network->stop();
|
||||
threadSleep(req.waitForDuration);
|
||||
}
|
||||
if(rebootReq.checkData) {
|
||||
Reference<IAsyncFile> checkFile = wait( IAsyncFileSystem::filesystem()->open( joinPath(folder, validationFilename), IAsyncFile::OPEN_CREATE | IAsyncFile::OPEN_READWRITE, 0600 ) );
|
||||
wait( checkFile->sync() );
|
||||
|
|
|
@ -0,0 +1,68 @@
|
|||
#include <boost/algorithm/string/predicate.hpp>
|
||||
|
||||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
#include "fdbserver/ServerDBInfo.h"
|
||||
#include "fdbclient/Status.h"
|
||||
#include "fdbclient/StatusClient.h"
|
||||
#include "fdbclient/ManagementAPI.actor.h"
|
||||
#include "fdbclient/RunTransaction.actor.h"
|
||||
#include "flow/actorcompiler.h" // has to be last include
|
||||
|
||||
struct SuspendProcessesWorkload : TestWorkload {
|
||||
std::vector<std::string> prefixSuspendProcesses;
|
||||
double suspendTimeDuration;
|
||||
double waitTimeDuration;
|
||||
|
||||
SuspendProcessesWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
|
||||
prefixSuspendProcesses = getOption(options, LiteralStringRef("prefixesSuspendProcesses"), std::vector<std::string>());
|
||||
waitTimeDuration = getOption(options, LiteralStringRef("waitTimeDuration"), 0);
|
||||
suspendTimeDuration = getOption(options, LiteralStringRef("suspendTimeDuration"), 0);
|
||||
}
|
||||
|
||||
virtual std::string description() { return "SuspendProcesses"; }
|
||||
|
||||
virtual Future<Void> setup(Database const& cx) { return Void(); }
|
||||
|
||||
ACTOR Future<Void> _start(Database cx, SuspendProcessesWorkload* self) {
|
||||
wait(delay(self->waitTimeDuration));
|
||||
state ReadYourWritesTransaction tr(cx);
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
Standalone<RangeResultRef> kvs = wait(tr.getRange(
|
||||
KeyRangeRef(LiteralStringRef("\xff\xff/worker_interfaces"), LiteralStringRef("\xff\xff\xff")), 1));
|
||||
std::vector<Standalone<StringRef>> suspendProcessInterfaces;
|
||||
for (auto it : kvs) {
|
||||
auto ip_port = it.key.endsWith(LiteralStringRef(":tls"))
|
||||
? it.key.removeSuffix(LiteralStringRef(":tls"))
|
||||
: it.key;
|
||||
for (auto& killProcess : self->prefixSuspendProcesses) {
|
||||
if (boost::starts_with(ip_port.toString().c_str(), killProcess.c_str())) {
|
||||
suspendProcessInterfaces.push_back(it.value);
|
||||
TraceEvent("SuspendProcessSelectedProcess").detail("IpPort", printable(ip_port));
|
||||
}
|
||||
}
|
||||
}
|
||||
for (auto& interf : suspendProcessInterfaces) {
|
||||
BinaryReader::fromStringRef<ClientWorkerInterface>(interf, IncludeVersion())
|
||||
.reboot.send(RebootRequest(false, false, self->suspendTimeDuration));
|
||||
}
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
virtual Future<Void> start(Database const& cx) {
|
||||
if (clientId != 0) return Void();
|
||||
return _start(cx, this);
|
||||
}
|
||||
|
||||
virtual Future<bool> check(Database const& cx) { return true; }
|
||||
|
||||
virtual void getMetrics(vector<PerfMetric>& m) {}
|
||||
};
|
||||
|
||||
WorkloadFactory<SuspendProcessesWorkload> SuspendProcessesWorkloadFactory("SuspendProcesses");
|
|
@ -35,8 +35,6 @@ std::map<int, int>& Error::errorCounts() {
|
|||
|
||||
#include <iostream>
|
||||
|
||||
extern void flushTraceFileVoid();
|
||||
|
||||
Error Error::fromUnvalidatedCode(int code) {
|
||||
if (code < 0 || code > 30000) {
|
||||
Error e = Error::fromCode(error_code_unknown_error);
|
||||
|
|
|
@ -537,6 +537,7 @@ void openTraceFile(const NetworkAddress& na, uint64_t rollsize, uint64_t maxLogs
|
|||
void initTraceEventMetrics();
|
||||
void closeTraceFile();
|
||||
bool traceFileIsOpen();
|
||||
void flushTraceFileVoid();
|
||||
|
||||
// Changes the format of trace files. Returns false if the format is unrecognized. No longer safe to call after a call
|
||||
// to openTraceFile.
|
||||
|
|
Loading…
Reference in New Issue