Redwood chunked file growth and low priority IO starvation prevention (#5936)
* Redwood files now growth in large page chunks controlled by a knob to reduce truncate() calls for expansion. PriorityMultiLock has limit on consecutive same-priority lock release. Increased Redwood max priority level to 3 for more separation at higher BTree levels. * Simulation fix, don't mark certain IO timeout errors as injected unless the simulated process has been set to have an unreliable disk. * Pager writes now truncate gradually upward, one chunk at a time, in response to writes, which wait on only the necessary truncate operations. Increased buggified chunk size because truncate can be very slow in simulation. * In simulation, ioTimeoutError() and ioDegradedOrTimeoutError() will wait until at least the target timeout interval past the point when simulation is sped up. * PriorityMultiLock::toString() prints more info and is now public. * Added queued time to PriorityMultiLock. * Bug fix to handle when speedUpSimulation changes later than the configured time. * Refactored mutation application in leaf nodes to do fewer comparisons and do in place value updates if the new value is the same size as the old value. * Renamed updatingInPlace to updatingDeltaTree for clarity. Inlined switchToLinearMerge() since it is only used in one place. * Updated extendToCover to be more clear by passing in the old extension future as a parameter. Fixed initialization warning.
This commit is contained in:
parent
e752fdd69f
commit
508429f30d
|
@ -771,6 +771,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
|||
init( REDWOOD_LAZY_CLEAR_MAX_PAGES, 1e6 );
|
||||
init( REDWOOD_REMAP_CLEANUP_WINDOW, 50 );
|
||||
init( REDWOOD_REMAP_CLEANUP_LAG, 0.1 );
|
||||
init( REDWOOD_PAGEFILE_GROWTH_SIZE_PAGES, 20000 ); if( randomize && BUGGIFY ) { REDWOOD_PAGEFILE_GROWTH_SIZE_PAGES = deterministicRandom()->randomInt(200, 1000); }
|
||||
init( REDWOOD_METRICS_INTERVAL, 5.0 );
|
||||
init( REDWOOD_HISTOGRAM_INTERVAL, 30.0 );
|
||||
|
||||
|
|
|
@ -724,6 +724,7 @@ public:
|
|||
int64_t REDWOOD_REMAP_CLEANUP_WINDOW; // Remap remover lag interval in which to coalesce page writes
|
||||
double REDWOOD_REMAP_CLEANUP_LAG; // Maximum allowed remap remover lag behind the cleanup window as a multiple of
|
||||
// the window size
|
||||
int REDWOOD_PAGEFILE_GROWTH_SIZE_PAGES; // Number of pages to grow page file by
|
||||
double REDWOOD_METRICS_INTERVAL;
|
||||
double REDWOOD_HISTOGRAM_INTERVAL;
|
||||
|
||||
|
|
|
@ -25,9 +25,8 @@
|
|||
#include "flow/actorcompiler.h"
|
||||
|
||||
ACTOR Future<Void> disableConnectionFailuresAfter(double time, std::string context) {
|
||||
wait(delay(time));
|
||||
|
||||
if (g_network->isSimulated()) {
|
||||
wait(delayUntil(time));
|
||||
g_simulator.connectionFailuresDisableDuration = 1e6;
|
||||
g_simulator.speedUpSimulation = true;
|
||||
TraceEvent(SevWarnAlways, ("DisableConnectionFailures_" + context).c_str());
|
||||
|
|
|
@ -128,8 +128,13 @@ public:
|
|||
};
|
||||
|
||||
private:
|
||||
typedef Promise<Lock> Slot;
|
||||
typedef Deque<Slot> Queue;
|
||||
struct Waiter {
|
||||
Waiter() : queuedTime(now()) {}
|
||||
Promise<Lock> lockPromise;
|
||||
double queuedTime;
|
||||
};
|
||||
|
||||
typedef Deque<Waiter> Queue;
|
||||
|
||||
#if PRIORITYMULTILOCK_DEBUG
|
||||
#define prioritylock_printf(...) printf(__VA_ARGS__)
|
||||
|
@ -138,7 +143,8 @@ private:
|
|||
#endif
|
||||
|
||||
public:
|
||||
PriorityMultiLock(int concurrency, int maxPriority) : concurrency(concurrency), available(concurrency), waiting(0) {
|
||||
PriorityMultiLock(int concurrency, int maxPriority, int launchLimit = std::numeric_limits<int>::max())
|
||||
: concurrency(concurrency), available(concurrency), waiting(0), launchLimit(launchLimit) {
|
||||
waiters.resize(maxPriority + 1);
|
||||
fRunner = runner(this);
|
||||
}
|
||||
|
@ -157,11 +163,37 @@ public:
|
|||
return p;
|
||||
}
|
||||
|
||||
Slot s;
|
||||
waiters[priority].push_back(s);
|
||||
Waiter w;
|
||||
waiters[priority].push_back(w);
|
||||
++waiting;
|
||||
prioritylock_printf("lock exit queued %s\n", toString().c_str());
|
||||
return s.getFuture();
|
||||
return w.lockPromise.getFuture();
|
||||
}
|
||||
|
||||
std::string toString() const {
|
||||
int runnersDone = 0;
|
||||
for (int i = 0; i < runners.size(); ++i) {
|
||||
if (runners[i].isReady()) {
|
||||
++runnersDone;
|
||||
}
|
||||
}
|
||||
|
||||
std::string s =
|
||||
format("{ ptr=%p concurrency=%d available=%d running=%d waiting=%d runnersQueue=%d runnersDone=%d ",
|
||||
this,
|
||||
concurrency,
|
||||
available,
|
||||
concurrency - available,
|
||||
waiting,
|
||||
runners.size(),
|
||||
runnersDone);
|
||||
|
||||
for (int i = 0; i < waiters.size(); ++i) {
|
||||
s += format("p%d_waiters=%u ", i, waiters[i].size());
|
||||
}
|
||||
|
||||
s += "}";
|
||||
return s;
|
||||
}
|
||||
|
||||
private:
|
||||
|
@ -181,6 +213,13 @@ private:
|
|||
state Future<Void> error = self->brokenOnDestruct.getFuture();
|
||||
state int maxPriority = self->waiters.size() - 1;
|
||||
|
||||
// Priority to try to run tasks from next
|
||||
state int priority = maxPriority;
|
||||
state Queue* pQueue = &self->waiters[maxPriority];
|
||||
|
||||
// Track the number of waiters unlocked at the same priority in a row
|
||||
state int lastPriorityCount = 0;
|
||||
|
||||
loop {
|
||||
// Cleanup finished runner futures at the front of the runner queue.
|
||||
while (!self->runners.empty() && self->runners.front().isReady()) {
|
||||
|
@ -197,20 +236,22 @@ private:
|
|||
}
|
||||
|
||||
// While there are available slots and there are waiters, launch tasks
|
||||
int priority = maxPriority;
|
||||
|
||||
while (self->available > 0 && self->waiting > 0) {
|
||||
auto& q = self->waiters[priority];
|
||||
prioritylock_printf(
|
||||
"Checking priority=%d prioritySize=%d %s\n", priority, q.size(), self->toString().c_str());
|
||||
prioritylock_printf("Checking priority=%d lastPriorityCount=%d %s\n",
|
||||
priority,
|
||||
lastPriorityCount,
|
||||
self->toString().c_str());
|
||||
|
||||
while (!q.empty()) {
|
||||
Slot s = q.front();
|
||||
q.pop_front();
|
||||
while (!pQueue->empty() && ++lastPriorityCount < self->launchLimit) {
|
||||
Waiter w = pQueue->front();
|
||||
pQueue->pop_front();
|
||||
--self->waiting;
|
||||
Lock lock;
|
||||
prioritylock_printf(" Running waiter priority=%d prioritySize=%d\n", priority, q.size());
|
||||
s.send(lock);
|
||||
prioritylock_printf(" Running waiter priority=%d wait=%f %s\n",
|
||||
priority,
|
||||
now() - w.queuedTime,
|
||||
self->toString().c_str());
|
||||
w.lockPromise.send(lock);
|
||||
|
||||
// Self may have been destructed during the lock callback
|
||||
if (error.isReady()) {
|
||||
|
@ -228,24 +269,28 @@ private:
|
|||
}
|
||||
}
|
||||
|
||||
// Wrap around to highest priority
|
||||
// If there are no more slots available, then don't move to the next priority
|
||||
if (self->available == 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
// Decrease priority, wrapping around to max from 0
|
||||
if (priority == 0) {
|
||||
priority = maxPriority;
|
||||
} else {
|
||||
--priority;
|
||||
}
|
||||
|
||||
pQueue = &self->waiters[priority];
|
||||
lastPriorityCount = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::string toString() const {
|
||||
return format(
|
||||
"{ slots=%d/%d waiting=%d runners=%d }", (concurrency - available), concurrency, waiting, runners.size());
|
||||
}
|
||||
|
||||
int concurrency;
|
||||
int available;
|
||||
int waiting;
|
||||
int launchLimit;
|
||||
std::vector<Queue> waiters;
|
||||
Deque<Future<Void>> runners;
|
||||
Future<Void> fRunner;
|
||||
|
@ -371,7 +416,7 @@ std::string toString(const std::pair<F, S>& o) {
|
|||
|
||||
static constexpr int ioMinPriority = 0;
|
||||
static constexpr int ioLeafPriority = 1;
|
||||
static constexpr int ioMaxPriority = 2;
|
||||
static constexpr int ioMaxPriority = 3;
|
||||
|
||||
// A FIFO queue of T stored as a linked list of pages.
|
||||
// Main operations are pop(), pushBack(), pushFront(), and flush().
|
||||
|
@ -2102,10 +2147,10 @@ public:
|
|||
int concurrentExtentReads,
|
||||
bool memoryOnly = false,
|
||||
Promise<Void> errorPromise = {})
|
||||
: ioLock(FLOW_KNOBS->MAX_OUTSTANDING, ioMaxPriority), pageCacheBytes(pageCacheSizeBytes), pHeader(nullptr),
|
||||
desiredPageSize(desiredPageSize), desiredExtentSize(desiredExtentSize), filename(filename),
|
||||
memoryOnly(memoryOnly), errorPromise(errorPromise), remapCleanupWindow(remapCleanupWindow),
|
||||
concurrentExtentReads(new FlowLock(concurrentExtentReads)) {
|
||||
: ioLock(FLOW_KNOBS->MAX_OUTSTANDING, ioMaxPriority, FLOW_KNOBS->MAX_OUTSTANDING / 2),
|
||||
pageCacheBytes(pageCacheSizeBytes), pHeader(nullptr), desiredPageSize(desiredPageSize),
|
||||
desiredExtentSize(desiredExtentSize), filename(filename), memoryOnly(memoryOnly), errorPromise(errorPromise),
|
||||
remapCleanupWindow(remapCleanupWindow), concurrentExtentReads(new FlowLock(concurrentExtentReads)) {
|
||||
|
||||
if (!g_redwoodMetricsActor.isValid()) {
|
||||
g_redwoodMetricsActor = redwoodMetricsLogger();
|
||||
|
@ -2175,6 +2220,8 @@ public:
|
|||
wait(store(fileSize, self->pageFile->size()));
|
||||
}
|
||||
|
||||
self->fileExtension = Void();
|
||||
|
||||
debug_printf(
|
||||
"DWALPager(%s) recover exists=%d fileSize=%" PRId64 "\n", self->filename.c_str(), exists, fileSize);
|
||||
// TODO: If the file exists but appears to never have been successfully committed is this an error or
|
||||
|
@ -2221,6 +2268,9 @@ public:
|
|||
}
|
||||
|
||||
self->setPageSize(self->pHeader->pageSize);
|
||||
self->filePageCount = fileSize / self->physicalPageSize;
|
||||
self->filePageCountPending = self->filePageCount;
|
||||
|
||||
if (self->logicalPageSize != self->desiredPageSize) {
|
||||
TraceEvent(SevWarn, "RedwoodPageSizeNotDesired")
|
||||
.detail("Filename", self->filename)
|
||||
|
@ -2320,6 +2370,8 @@ public:
|
|||
|
||||
// Now that the header page has been allocated, set page size to desired
|
||||
self->setPageSize(self->desiredPageSize);
|
||||
self->filePageCount = 0;
|
||||
self->filePageCountPending = 0;
|
||||
|
||||
// Now set the extent size, do this always after setting the page size as
|
||||
// extent size is a multiple of page size
|
||||
|
@ -2397,7 +2449,10 @@ public:
|
|||
self->recoveryVersion,
|
||||
self->pHeader->oldestVersion,
|
||||
self->logicalPageSize,
|
||||
self->physicalPageSize);
|
||||
self->physicalPageSize,
|
||||
self->pHeader->pageCount,
|
||||
self->filePageCount);
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -2486,12 +2541,14 @@ public:
|
|||
// Grow the pager file by one page and return it
|
||||
LogicalPageID newLastPageID() {
|
||||
LogicalPageID id = pHeader->pageCount;
|
||||
++pHeader->pageCount;
|
||||
growPager(1);
|
||||
return id;
|
||||
}
|
||||
|
||||
Future<LogicalPageID> newPageID() override { return newPageID_impl(this); }
|
||||
|
||||
void growPager(int64_t pages) { pHeader->pageCount += pages; }
|
||||
|
||||
// Get a new, previously available extent and it's first page ID. The page will be considered in-use after the next
|
||||
// commit regardless of whether or not it was written to, until it is returned to the pager via freePage()
|
||||
ACTOR static Future<LogicalPageID> newExtentPageID_impl(DWALPager* self, QueueID queueID) {
|
||||
|
@ -2521,7 +2578,7 @@ public:
|
|||
// That translates to extentID being same as the return first pageID
|
||||
LogicalPageID newLastExtentID() {
|
||||
LogicalPageID id = pHeader->pageCount;
|
||||
pHeader->pageCount += pagesPerExtent;
|
||||
growPager(pagesPerExtent);
|
||||
return id;
|
||||
}
|
||||
|
||||
|
@ -2541,11 +2598,44 @@ public:
|
|||
if (self->memoryOnly) {
|
||||
return Void();
|
||||
}
|
||||
|
||||
// If a truncation up to include pageID has not yet been completed
|
||||
if (pageID >= self->filePageCount) {
|
||||
// And no extension pending will include pageID
|
||||
if (pageID >= self->filePageCountPending) {
|
||||
// Update extension to a new one that waits on the old one and extends further
|
||||
self->fileExtension = extendToCover(self, pageID, self->fileExtension);
|
||||
}
|
||||
|
||||
// Wait for extension that covers pageID to complete;
|
||||
wait(self->fileExtension);
|
||||
}
|
||||
|
||||
// Note: Not using forwardError here so a write error won't be discovered until commit time.
|
||||
wait(self->pageFile->write(data, blockSize, (int64_t)pageID * blockSize));
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> extendToCover(DWALPager* self, uint64_t pageID, Future<Void> previousExtension) {
|
||||
// Calculate new page count, round up to nearest multiple of growth size > pageID
|
||||
state int64_t newPageCount = pageID + SERVER_KNOBS->REDWOOD_PAGEFILE_GROWTH_SIZE_PAGES -
|
||||
(pageID % SERVER_KNOBS->REDWOOD_PAGEFILE_GROWTH_SIZE_PAGES);
|
||||
|
||||
// Indicate that extension to this new count has been started
|
||||
self->filePageCountPending = newPageCount;
|
||||
|
||||
// Wait for any previous extensions to complete
|
||||
wait(previousExtension);
|
||||
|
||||
// Grow the file
|
||||
wait(self->pageFile->truncate(newPageCount * self->physicalPageSize));
|
||||
|
||||
// Indicate that extension to the new count has been completed
|
||||
self->filePageCount = newPageCount;
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
Future<Void> writePhysicalPage(PagerEventReasons reason,
|
||||
unsigned int level,
|
||||
Standalone<VectorRef<PhysicalPageID>> pageIDs,
|
||||
|
@ -3699,6 +3789,13 @@ private:
|
|||
Reference<ArenaPage> headerPage;
|
||||
Header* pHeader;
|
||||
|
||||
// Pages - pages known to be in the file, truncations complete to that size
|
||||
int64_t filePageCount;
|
||||
// Pages that will be in file once fileExtension is ready
|
||||
int64_t filePageCountPending;
|
||||
// Future representing the end of all pending truncations
|
||||
Future<Void> fileExtension;
|
||||
|
||||
int desiredPageSize;
|
||||
int desiredExtentSize;
|
||||
|
||||
|
@ -7154,7 +7251,7 @@ RedwoodRecordRef VersionedBTree::dbEnd(LiteralStringRef("\xff\xff\xff\xff\xff"))
|
|||
class KeyValueStoreRedwood : public IKeyValueStore {
|
||||
public:
|
||||
KeyValueStoreRedwood(std::string filePrefix, UID logID)
|
||||
: m_filePrefix(filePrefix), m_concurrentReads(SERVER_KNOBS->REDWOOD_KVSTORE_CONCURRENT_READS, 0),
|
||||
: m_filename(filePrefix), m_concurrentReads(SERVER_KNOBS->REDWOOD_KVSTORE_CONCURRENT_READS, 0),
|
||||
prefetch(SERVER_KNOBS->REDWOOD_KVSTORE_RANGE_PREFETCH) {
|
||||
|
||||
int pageSize =
|
||||
|
@ -7184,17 +7281,17 @@ public:
|
|||
Future<Void> init() override { return m_init; }
|
||||
|
||||
ACTOR Future<Void> init_impl(KeyValueStoreRedwood* self) {
|
||||
TraceEvent(SevInfo, "RedwoodInit").detail("FilePrefix", self->m_filePrefix);
|
||||
TraceEvent(SevInfo, "RedwoodInit").detail("FilePrefix", self->m_filename);
|
||||
wait(self->m_tree->init());
|
||||
TraceEvent(SevInfo, "RedwoodInitComplete")
|
||||
.detail("FilePrefix", self->m_filePrefix)
|
||||
.detail("Filename", self->m_filename)
|
||||
.detail("Version", self->m_tree->getLastCommittedVersion());
|
||||
self->m_nextCommitVersion = self->m_tree->getLastCommittedVersion() + 1;
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR void shutdown(KeyValueStoreRedwood* self, bool dispose) {
|
||||
TraceEvent(SevInfo, "RedwoodShutdown").detail("FilePrefix", self->m_filePrefix).detail("Dispose", dispose);
|
||||
TraceEvent(SevInfo, "RedwoodShutdown").detail("Filename", self->m_filename).detail("Dispose", dispose);
|
||||
if (self->m_error.canBeSet()) {
|
||||
self->m_error.sendError(actor_cancelled()); // Ideally this should be shutdown_in_progress
|
||||
}
|
||||
|
@ -7206,9 +7303,7 @@ public:
|
|||
self->m_tree->close();
|
||||
wait(closedFuture);
|
||||
self->m_closed.send(Void());
|
||||
TraceEvent(SevInfo, "RedwoodShutdownComplete")
|
||||
.detail("FilePrefix", self->m_filePrefix)
|
||||
.detail("Dispose", dispose);
|
||||
TraceEvent(SevInfo, "RedwoodShutdownComplete").detail("Filename", self->m_filename).detail("Dispose", dispose);
|
||||
delete self;
|
||||
}
|
||||
|
||||
|
@ -7428,7 +7523,7 @@ public:
|
|||
~KeyValueStoreRedwood() override{};
|
||||
|
||||
private:
|
||||
std::string m_filePrefix;
|
||||
std::string m_filename;
|
||||
VersionedBTree* m_tree;
|
||||
Future<Void> m_init;
|
||||
Promise<Void> m_closed;
|
||||
|
@ -9038,7 +9133,7 @@ TEST_CASE("Lredwood/correctness/btree") {
|
|||
g_redwoodMetricsActor = Void(); // Prevent trace event metrics from starting
|
||||
g_redwoodMetrics.clear();
|
||||
|
||||
state std::string fileName = params.get("fileName").orDefault("unittest_pageFile.redwood");
|
||||
state std::string fileName = params.get("Filename").orDefault("unittest_pageFile.redwood");
|
||||
IPager2* pager;
|
||||
|
||||
state bool serialTest = params.getInt("serialTest").orDefault(deterministicRandom()->random01() < 0.25);
|
||||
|
@ -9628,7 +9723,7 @@ TEST_CASE(":/redwood/performance/extentQueue") {
|
|||
TEST_CASE(":/redwood/performance/set") {
|
||||
state SignalableActorCollection actors;
|
||||
|
||||
state std::string fileName = params.get("fileName").orDefault("unittest.redwood");
|
||||
state std::string fileName = params.get("Filename").orDefault("unittest.redwood");
|
||||
state int pageSize = params.getInt("pageSize").orDefault(SERVER_KNOBS->REDWOOD_DEFAULT_PAGE_SIZE);
|
||||
state int extentSize = params.getInt("extentSize").orDefault(SERVER_KNOBS->REDWOOD_DEFAULT_EXTENT_SIZE);
|
||||
state int64_t pageCacheBytes = params.getInt("pageCacheBytes").orDefault(FLOW_KNOBS->PAGE_CACHE_4K);
|
||||
|
|
|
@ -1094,6 +1094,66 @@ ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
|
|||
|
||||
typedef decltype(&tLog) TLogFn;
|
||||
|
||||
ACTOR template <class T>
|
||||
Future<T> ioTimeoutError(Future<T> what, double time) {
|
||||
// Before simulation is sped up, IO operations can take a very long time so limit timeouts
|
||||
// to not end until at least time after simulation is sped up.
|
||||
if (g_network->isSimulated() && !g_simulator.speedUpSimulation) {
|
||||
time += std::max(0.0, FLOW_KNOBS->SIM_SPEEDUP_AFTER_SECONDS - now());
|
||||
}
|
||||
Future<Void> end = lowPriorityDelay(time);
|
||||
choose {
|
||||
when(T t = wait(what)) { return t; }
|
||||
when(wait(end)) {
|
||||
Error err = io_timeout();
|
||||
if (g_network->isSimulated() && !g_simulator.getCurrentProcess()->isReliable()) {
|
||||
err = err.asInjectedFault();
|
||||
}
|
||||
TraceEvent(SevError, "IoTimeoutError").error(err);
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR template <class T>
|
||||
Future<T> ioDegradedOrTimeoutError(Future<T> what,
|
||||
double errTime,
|
||||
Reference<AsyncVar<bool>> degraded,
|
||||
double degradedTime) {
|
||||
// Before simulation is sped up, IO operations can take a very long time so limit timeouts
|
||||
// to not end until at least time after simulation is sped up.
|
||||
if (g_network->isSimulated() && !g_simulator.speedUpSimulation) {
|
||||
double timeShift = std::max(0.0, FLOW_KNOBS->SIM_SPEEDUP_AFTER_SECONDS - now());
|
||||
errTime += timeShift;
|
||||
degradedTime += timeShift;
|
||||
}
|
||||
|
||||
if (degradedTime < errTime) {
|
||||
Future<Void> degradedEnd = lowPriorityDelay(degradedTime);
|
||||
choose {
|
||||
when(T t = wait(what)) { return t; }
|
||||
when(wait(degradedEnd)) {
|
||||
TEST(true); // TLog degraded
|
||||
TraceEvent(SevWarnAlways, "IoDegraded").log();
|
||||
degraded->set(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Future<Void> end = lowPriorityDelay(errTime - degradedTime);
|
||||
choose {
|
||||
when(T t = wait(what)) { return t; }
|
||||
when(wait(end)) {
|
||||
Error err = io_timeout();
|
||||
if (g_network->isSimulated() && !g_simulator.getCurrentProcess()->isReliable()) {
|
||||
err = err.asInjectedFault();
|
||||
}
|
||||
TraceEvent(SevError, "IoTimeoutError").error(err);
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#include "fdbserver/ServerDBInfo.h"
|
||||
#include "flow/unactorcompiler.h"
|
||||
#endif
|
||||
|
|
|
@ -1439,7 +1439,7 @@ ACTOR Future<Void> runTests(Reference<AsyncVar<Optional<struct ClusterController
|
|||
cx = openDBOnServer(dbInfo);
|
||||
}
|
||||
|
||||
state Future<Void> disabler = disableConnectionFailuresAfter(450, "Tester");
|
||||
state Future<Void> disabler = disableConnectionFailuresAfter(FLOW_KNOBS->SIM_SPEEDUP_AFTER_SECONDS, "Tester");
|
||||
|
||||
// Change the configuration (and/or create the database) if necessary
|
||||
printf("startingConfiguration:%s start\n", startingConfiguration.toString().c_str());
|
||||
|
|
|
@ -203,6 +203,7 @@ void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) {
|
|||
init( MAX_TRACE_FIELD_LENGTH, 495 ); // If the value of this is changed, the corresponding default in Trace.cpp should be changed as well
|
||||
init( MAX_TRACE_EVENT_LENGTH, 4000 ); // If the value of this is changed, the corresponding default in Trace.cpp should be changed as well
|
||||
init( ALLOCATION_TRACING_ENABLED, true );
|
||||
init( SIM_SPEEDUP_AFTER_SECONDS, 450 );
|
||||
|
||||
//TDMetrics
|
||||
init( MAX_METRICS, 600 );
|
||||
|
|
|
@ -245,6 +245,7 @@ public:
|
|||
double MAX_CLOGGING_LATENCY;
|
||||
double MAX_BUGGIFIED_DELAY;
|
||||
int SIM_CONNECT_ERROR_MODE;
|
||||
double SIM_SPEEDUP_AFTER_SECONDS;
|
||||
|
||||
// Tracefiles
|
||||
int ZERO_LENGTH_FILE_PAD;
|
||||
|
|
|
@ -844,53 +844,6 @@ Future<Void> timeoutWarningCollector(FutureStream<Void> const& input,
|
|||
Future<bool> quorumEqualsTrue(std::vector<Future<bool>> const& futures, int const& required);
|
||||
Future<Void> lowPriorityDelay(double const& waitTime);
|
||||
|
||||
ACTOR template <class T>
|
||||
Future<T> ioTimeoutError(Future<T> what, double time) {
|
||||
Future<Void> end = lowPriorityDelay(time);
|
||||
choose {
|
||||
when(T t = wait(what)) { return t; }
|
||||
when(wait(end)) {
|
||||
Error err = io_timeout();
|
||||
if (g_network->isSimulated()) {
|
||||
err = err.asInjectedFault();
|
||||
}
|
||||
TraceEvent(SevError, "IoTimeoutError").error(err);
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR template <class T>
|
||||
Future<T> ioDegradedOrTimeoutError(Future<T> what,
|
||||
double errTime,
|
||||
Reference<AsyncVar<bool>> degraded,
|
||||
double degradedTime) {
|
||||
if (degradedTime < errTime) {
|
||||
Future<Void> degradedEnd = lowPriorityDelay(degradedTime);
|
||||
choose {
|
||||
when(T t = wait(what)) { return t; }
|
||||
when(wait(degradedEnd)) {
|
||||
TEST(true); // TLog degraded
|
||||
TraceEvent(SevWarnAlways, "IoDegraded").log();
|
||||
degraded->set(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Future<Void> end = lowPriorityDelay(errTime - degradedTime);
|
||||
choose {
|
||||
when(T t = wait(what)) { return t; }
|
||||
when(wait(end)) {
|
||||
Error err = io_timeout();
|
||||
if (g_network->isSimulated()) {
|
||||
err = err.asInjectedFault();
|
||||
}
|
||||
TraceEvent(SevError, "IoTimeoutError").error(err);
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR template <class T>
|
||||
Future<Void> streamHelper(PromiseStream<T> output, PromiseStream<Error> errors, Future<T> input) {
|
||||
try {
|
||||
|
|
Loading…
Reference in New Issue