Bug fixes involving use of AFCached rate control in simulation.

This commit is contained in:
Steve Atherton 2021-01-24 01:50:09 -08:00
parent 1c2fe4e6ea
commit 0d32bd0c44
5 changed files with 70 additions and 17 deletions

View File

@ -235,7 +235,7 @@ public:
void setRateControl(Reference<IRateControl> const& rc) override { rateControl = rc; }
Reference<IRateControl> const& getRateControl() { return rateControl; }
Reference<IRateControl> const& getRateControl() override { return rateControl; }
virtual void addref() {
ReferenceCounted<AsyncFileCached>::addref();
@ -245,6 +245,9 @@ public:
if (delref_no_destroy()) {
// If this is ever ThreadSafeReferenceCounted...
// setrefCountUnsafe(0);
if(rateControl) {
rateControl->killWaiters(io_error());
}
auto f = quiesce();
//TraceEvent("AsyncFileCachedDel").detail("Filename", filename)

View File

@ -84,6 +84,7 @@ public:
virtual int64_t debugFD() = 0;
// Used for rate control, at present, only AsyncFileCached supports it
virtual Reference<IRateControl> const& getRateControl() { throw unsupported_operation(); }
virtual void setRateControl(Reference<IRateControl> const& rc) { throw unsupported_operation(); }
};

View File

@ -30,6 +30,8 @@ public:
// If all of the allowance is not used the unused units can be given back.
// For convenience, n can safely be negative.
virtual void returnUnused(int n) = 0;
virtual void killWaiters(const Error &e) = 0;
virtual void wakeWaiters() = 0;
virtual void addref() = 0;
virtual void delref() = 0;
};
@ -40,12 +42,14 @@ public:
SpeedLimit(int windowLimit, double windowSeconds) : m_limit(windowLimit), m_seconds(windowSeconds), m_last_update(0), m_budget(0) {
m_last_update = now();
}
virtual ~SpeedLimit() {}
virtual ~SpeedLimit() {
m_stop.send(Never());
}
virtual void addref() { ReferenceCounted<SpeedLimit>::addref(); }
virtual void delref() { ReferenceCounted<SpeedLimit>::delref(); }
void addref() override { ReferenceCounted<SpeedLimit>::addref(); }
void delref() override { ReferenceCounted<SpeedLimit>::delref(); }
virtual Future<Void> getAllowance(unsigned int n) {
Future<Void> getAllowance(unsigned int n) override {
// Replenish budget based on time since last update
double ts = now();
// returnUnused happens to do exactly what we want here
@ -56,20 +60,33 @@ public:
if(m_budget >= 0)
return Void();
// Otherise return the amount of time it will take for the budget to rise to 0.
return delay(m_seconds * -m_budget / m_limit);
return m_stop.getFuture() || delay(m_seconds * -m_budget / m_limit);
}
virtual void returnUnused(int n) {
void returnUnused(int n) override {
if(n < 0)
return;
m_budget = std::min<int64_t>(m_budget + n, m_limit);
}
void wakeWaiters() override {
Promise<Void> p;
p.swap(m_stop);
p.send(Void());
}
void killWaiters(const Error &e) override {
Promise<Void> p;
p.swap(m_stop);
p.sendError(e);
}
private:
int m_limit;
double m_seconds;
double m_last_update;
int64_t m_budget;
Promise<Void> m_stop;
};
// An IRateControl implemenation that enforces no limit
@ -77,9 +94,11 @@ class Unlimited : public IRateControl, ReferenceCounted<Unlimited> {
public:
Unlimited() {}
virtual ~Unlimited() {}
virtual void addref() { ReferenceCounted<Unlimited>::addref(); }
virtual void delref() { ReferenceCounted<Unlimited>::delref(); }
void addref() override { ReferenceCounted<Unlimited>::addref(); }
void delref() override { ReferenceCounted<Unlimited>::delref(); }
virtual Future<Void> getAllowance(unsigned int n) { return Void(); }
virtual void returnUnused(int n) {}
Future<Void> getAllowance(unsigned int n) override { return Void(); }
void returnUnused(int n) override {}
void wakeWaiters() override {}
void killWaiters(const Error &e) override {}
};

View File

@ -1323,6 +1323,7 @@ int SQLiteDB::checkAllPageChecksums() {
.detail("TotalErrors", totalErrors);
ASSERT(!vfsAsyncIsOpen(filename));
ASSERT(!vfsAsyncIsOpen(filename + "-wal"));
return totalErrors;
}
@ -1375,10 +1376,29 @@ void SQLiteDB::open(bool writable) {
// Set Rate control if FLOW_KNOBS are positive
if (FLOW_KNOBS->FLOW_CACHEDFILE_WRITE_WINDOW_LIMIT > 0 && FLOW_KNOBS->FLOW_CACHEDFILE_WRITE_WINDOW_SECONDS > 0) {
Reference<SpeedLimit> rc(new SpeedLimit(FLOW_KNOBS->FLOW_CACHEDFILE_WRITE_WINDOW_LIMIT,
FLOW_KNOBS->FLOW_CACHEDFILE_WRITE_WINDOW_SECONDS));
dbFile.get()->setRateControl(rc);
walFile.get()->setRateControl(rc);
// The writer thread is created before the readers, so it should initialize the rate controls.
if(writable) {
// If either rate control is already initialized, then its file is still being held open by a previous
// usage, likely because ~AsyncFileCached() launched a background actor that is waiting for outstanding
// operations to finish. In case any of those are waiting for time to pass due to the rate control,
// send them an io_error.
if(dbFile.get()->getRateControl()) {
dbFile.get()->getRateControl()->killWaiters(io_error());
}
if(walFile.get()->getRateControl()) {
walFile.get()->getRateControl()->killWaiters(io_error());
}
// Create a new rate control and assign it to both files.
Reference<SpeedLimit> rc(new SpeedLimit(FLOW_KNOBS->FLOW_CACHEDFILE_WRITE_WINDOW_LIMIT,
FLOW_KNOBS->FLOW_CACHEDFILE_WRITE_WINDOW_SECONDS));
dbFile.get()->setRateControl(rc);
walFile.get()->setRateControl(rc);
} else {
// When a reader thread is opened, the rate controls should already be equal and not null
ASSERT(dbFile.get()->getRateControl() == walFile.get()->getRateControl());
ASSERT(dbFile.get()->getRateControl());
}
}
//TraceEvent("KVThreadInitStage").detail("Stage",2).detail("Filename", filename).detail("Writable", writable);
@ -1984,6 +2004,7 @@ KeyValueStoreSQLite::KeyValueStoreSQLite(std::string const& filename, UID id, Ke
//The DB file should not already be open
ASSERT(!vfsAsyncIsOpen(filename));
ASSERT(!vfsAsyncIsOpen(filename + "-wal"));
readCursors.resize(SERVER_KNOBS->SQLITE_READER_THREADS); //< number of read threads

View File

@ -113,8 +113,17 @@ void FlowKnobs::initialize(bool randomize, bool isSimulated) {
init( MAX_EVICT_ATTEMPTS, 100 ); if( randomize && BUGGIFY ) MAX_EVICT_ATTEMPTS = 2;
init( CACHE_EVICTION_POLICY, "random" );
init( PAGE_CACHE_TRUNCATE_LOOKUP_FRACTION, 0.1 ); if( randomize && BUGGIFY ) PAGE_CACHE_TRUNCATE_LOOKUP_FRACTION = 0.0; else if( randomize && BUGGIFY ) PAGE_CACHE_TRUNCATE_LOOKUP_FRACTION = 1.0;
init( FLOW_CACHEDFILE_WRITE_WINDOW_LIMIT, 500 ); if( randomize && BUGGIFY ) FLOW_CACHEDFILE_WRITE_WINDOW_LIMIT = 1e6; // 0 - auto(TODO); Negative - no limit
init( FLOW_CACHEDFILE_WRITE_WINDOW_SECONDS, 0.05 ); if( randomize && BUGGIFY ) FLOW_CACHEDFILE_WRITE_WINDOW_SECONDS = 1; // 0 - auto(TODO); Negative - no limit
init( FLOW_CACHEDFILE_WRITE_WINDOW_SECONDS, -1 );
init( FLOW_CACHEDFILE_WRITE_WINDOW_LIMIT, -1 );
if( randomize && BUGGIFY ) {
// Choose an window between .01 and 1.01 seconds.
FLOW_CACHEDFILE_WRITE_WINDOW_SECONDS = 0.01 + deterministicRandom()->random01();
// Choose 10k to 50k operations per second
int opsPerSecond = deterministicRandom()->randomInt(10000, 50000);
// Set window limit to opsPerSecond scaled down to window size
FLOW_CACHEDFILE_WRITE_WINDOW_LIMIT = opsPerSecond * FLOW_CACHEDFILE_WRITE_WINDOW_SECONDS;
}
//AsyncFileEIO
init( EIO_MAX_PARALLELISM, 4 );