diff --git a/fdbrpc/AsyncFileCached.actor.h b/fdbrpc/AsyncFileCached.actor.h index 1fe7d9b69f..8acbda1f12 100644 --- a/fdbrpc/AsyncFileCached.actor.h +++ b/fdbrpc/AsyncFileCached.actor.h @@ -235,7 +235,7 @@ public: void setRateControl(Reference const& rc) override { rateControl = rc; } - Reference const& getRateControl() { return rateControl; } + Reference const& getRateControl() override { return rateControl; } virtual void addref() { ReferenceCounted::addref(); @@ -245,6 +245,9 @@ public: if (delref_no_destroy()) { // If this is ever ThreadSafeReferenceCounted... // setrefCountUnsafe(0); + if(rateControl) { + rateControl->killWaiters(io_error()); + } auto f = quiesce(); //TraceEvent("AsyncFileCachedDel").detail("Filename", filename) diff --git a/fdbrpc/IAsyncFile.h b/fdbrpc/IAsyncFile.h index b8fe40ebfd..8656d542bb 100644 --- a/fdbrpc/IAsyncFile.h +++ b/fdbrpc/IAsyncFile.h @@ -84,6 +84,7 @@ public: virtual int64_t debugFD() = 0; // Used for rate control, at present, only AsyncFileCached supports it + virtual Reference const& getRateControl() { throw unsupported_operation(); } virtual void setRateControl(Reference const& rc) { throw unsupported_operation(); } }; diff --git a/fdbrpc/IRateControl.h b/fdbrpc/IRateControl.h index c2316a8fbb..4896edab31 100644 --- a/fdbrpc/IRateControl.h +++ b/fdbrpc/IRateControl.h @@ -30,6 +30,8 @@ public: // If all of the allowance is not used the unused units can be given back. // For convenience, n can safely be negative. virtual void returnUnused(int n) = 0; + virtual void killWaiters(const Error &e) = 0; + virtual void wakeWaiters() = 0; virtual void addref() = 0; virtual void delref() = 0; }; @@ -40,12 +42,14 @@ public: SpeedLimit(int windowLimit, double windowSeconds) : m_limit(windowLimit), m_seconds(windowSeconds), m_last_update(0), m_budget(0) { m_last_update = now(); } - virtual ~SpeedLimit() {} + virtual ~SpeedLimit() { + m_stop.send(Never()); + } - virtual void addref() { ReferenceCounted::addref(); } - virtual void delref() { ReferenceCounted::delref(); } + void addref() override { ReferenceCounted::addref(); } + void delref() override { ReferenceCounted::delref(); } - virtual Future getAllowance(unsigned int n) { + Future getAllowance(unsigned int n) override { // Replenish budget based on time since last update double ts = now(); // returnUnused happens to do exactly what we want here @@ -56,20 +60,33 @@ public: if(m_budget >= 0) return Void(); // Otherise return the amount of time it will take for the budget to rise to 0. - return delay(m_seconds * -m_budget / m_limit); + return m_stop.getFuture() || delay(m_seconds * -m_budget / m_limit); } - virtual void returnUnused(int n) { + void returnUnused(int n) override { if(n < 0) return; m_budget = std::min(m_budget + n, m_limit); } + void wakeWaiters() override { + Promise p; + p.swap(m_stop); + p.send(Void()); + } + + void killWaiters(const Error &e) override { + Promise p; + p.swap(m_stop); + p.sendError(e); + } + private: int m_limit; double m_seconds; double m_last_update; int64_t m_budget; + Promise m_stop; }; // An IRateControl implemenation that enforces no limit @@ -77,9 +94,11 @@ class Unlimited : public IRateControl, ReferenceCounted { public: Unlimited() {} virtual ~Unlimited() {} - virtual void addref() { ReferenceCounted::addref(); } - virtual void delref() { ReferenceCounted::delref(); } + void addref() override { ReferenceCounted::addref(); } + void delref() override { ReferenceCounted::delref(); } - virtual Future getAllowance(unsigned int n) { return Void(); } - virtual void returnUnused(int n) {} + Future getAllowance(unsigned int n) override { return Void(); } + void returnUnused(int n) override {} + void wakeWaiters() override {} + void killWaiters(const Error &e) override {} }; diff --git a/fdbserver/KeyValueStoreSQLite.actor.cpp b/fdbserver/KeyValueStoreSQLite.actor.cpp index 8c592771e5..d1ad923f9c 100644 --- a/fdbserver/KeyValueStoreSQLite.actor.cpp +++ b/fdbserver/KeyValueStoreSQLite.actor.cpp @@ -1323,6 +1323,7 @@ int SQLiteDB::checkAllPageChecksums() { .detail("TotalErrors", totalErrors); ASSERT(!vfsAsyncIsOpen(filename)); + ASSERT(!vfsAsyncIsOpen(filename + "-wal")); return totalErrors; } @@ -1375,10 +1376,29 @@ void SQLiteDB::open(bool writable) { // Set Rate control if FLOW_KNOBS are positive if (FLOW_KNOBS->FLOW_CACHEDFILE_WRITE_WINDOW_LIMIT > 0 && FLOW_KNOBS->FLOW_CACHEDFILE_WRITE_WINDOW_SECONDS > 0) { - Reference rc(new SpeedLimit(FLOW_KNOBS->FLOW_CACHEDFILE_WRITE_WINDOW_LIMIT, - FLOW_KNOBS->FLOW_CACHEDFILE_WRITE_WINDOW_SECONDS)); - dbFile.get()->setRateControl(rc); - walFile.get()->setRateControl(rc); + // The writer thread is created before the readers, so it should initialize the rate controls. + if(writable) { + // If either rate control is already initialized, then its file is still being held open by a previous + // usage, likely because ~AsyncFileCached() launched a background actor that is waiting for outstanding + // operations to finish. In case any of those are waiting for time to pass due to the rate control, + // send them an io_error. + if(dbFile.get()->getRateControl()) { + dbFile.get()->getRateControl()->killWaiters(io_error()); + } + if(walFile.get()->getRateControl()) { + walFile.get()->getRateControl()->killWaiters(io_error()); + } + + // Create a new rate control and assign it to both files. + Reference rc(new SpeedLimit(FLOW_KNOBS->FLOW_CACHEDFILE_WRITE_WINDOW_LIMIT, + FLOW_KNOBS->FLOW_CACHEDFILE_WRITE_WINDOW_SECONDS)); + dbFile.get()->setRateControl(rc); + walFile.get()->setRateControl(rc); + } else { + // When a reader thread is opened, the rate controls should already be equal and not null + ASSERT(dbFile.get()->getRateControl() == walFile.get()->getRateControl()); + ASSERT(dbFile.get()->getRateControl()); + } } //TraceEvent("KVThreadInitStage").detail("Stage",2).detail("Filename", filename).detail("Writable", writable); @@ -1984,6 +2004,7 @@ KeyValueStoreSQLite::KeyValueStoreSQLite(std::string const& filename, UID id, Ke //The DB file should not already be open ASSERT(!vfsAsyncIsOpen(filename)); + ASSERT(!vfsAsyncIsOpen(filename + "-wal")); readCursors.resize(SERVER_KNOBS->SQLITE_READER_THREADS); //< number of read threads diff --git a/flow/Knobs.cpp b/flow/Knobs.cpp index 40b8dda23a..0a1867d1a0 100644 --- a/flow/Knobs.cpp +++ b/flow/Knobs.cpp @@ -113,8 +113,17 @@ void FlowKnobs::initialize(bool randomize, bool isSimulated) { init( MAX_EVICT_ATTEMPTS, 100 ); if( randomize && BUGGIFY ) MAX_EVICT_ATTEMPTS = 2; init( CACHE_EVICTION_POLICY, "random" ); init( PAGE_CACHE_TRUNCATE_LOOKUP_FRACTION, 0.1 ); if( randomize && BUGGIFY ) PAGE_CACHE_TRUNCATE_LOOKUP_FRACTION = 0.0; else if( randomize && BUGGIFY ) PAGE_CACHE_TRUNCATE_LOOKUP_FRACTION = 1.0; - init( FLOW_CACHEDFILE_WRITE_WINDOW_LIMIT, 500 ); if( randomize && BUGGIFY ) FLOW_CACHEDFILE_WRITE_WINDOW_LIMIT = 1e6; // 0 - auto(TODO); Negative - no limit - init( FLOW_CACHEDFILE_WRITE_WINDOW_SECONDS, 0.05 ); if( randomize && BUGGIFY ) FLOW_CACHEDFILE_WRITE_WINDOW_SECONDS = 1; // 0 - auto(TODO); Negative - no limit + + init( FLOW_CACHEDFILE_WRITE_WINDOW_SECONDS, -1 ); + init( FLOW_CACHEDFILE_WRITE_WINDOW_LIMIT, -1 ); + if( randomize && BUGGIFY ) { + // Choose an window between .01 and 1.01 seconds. + FLOW_CACHEDFILE_WRITE_WINDOW_SECONDS = 0.01 + deterministicRandom()->random01(); + // Choose 10k to 50k operations per second + int opsPerSecond = deterministicRandom()->randomInt(10000, 50000); + // Set window limit to opsPerSecond scaled down to window size + FLOW_CACHEDFILE_WRITE_WINDOW_LIMIT = opsPerSecond * FLOW_CACHEDFILE_WRITE_WINDOW_SECONDS; + } //AsyncFileEIO init( EIO_MAX_PARALLELISM, 4 );