Bug fixes involving use of AFCached rate control in simulation.
This commit is contained in:
parent
e56fe02a25
commit
52c2695f59
|
@ -234,7 +234,7 @@ public:
|
|||
|
||||
void setRateControl(Reference<IRateControl> const& rc) override { rateControl = rc; }
|
||||
|
||||
Reference<IRateControl> const& getRateControl() { return rateControl; }
|
||||
Reference<IRateControl> const& getRateControl() override { return rateControl; }
|
||||
|
||||
virtual void addref() {
|
||||
ReferenceCounted<AsyncFileCached>::addref();
|
||||
|
@ -244,6 +244,9 @@ public:
|
|||
if (delref_no_destroy()) {
|
||||
// If this is ever ThreadSafeReferenceCounted...
|
||||
// setrefCountUnsafe(0);
|
||||
if(rateControl) {
|
||||
rateControl->killWaiters(io_error());
|
||||
}
|
||||
|
||||
auto f = quiesce();
|
||||
//TraceEvent("AsyncFileCachedDel").detail("Filename", filename)
|
||||
|
|
|
@ -84,6 +84,7 @@ public:
|
|||
virtual int64_t debugFD() = 0;
|
||||
|
||||
// Used for rate control, at present, only AsyncFileCached supports it
|
||||
virtual Reference<IRateControl> const& getRateControl() { throw unsupported_operation(); }
|
||||
virtual void setRateControl(Reference<IRateControl> const& rc) { throw unsupported_operation(); }
|
||||
};
|
||||
|
||||
|
|
|
@ -30,6 +30,8 @@ public:
|
|||
// If all of the allowance is not used the unused units can be given back.
|
||||
// For convenience, n can safely be negative.
|
||||
virtual void returnUnused(int n) = 0;
|
||||
virtual void killWaiters(const Error &e) = 0;
|
||||
virtual void wakeWaiters() = 0;
|
||||
virtual void addref() = 0;
|
||||
virtual void delref() = 0;
|
||||
};
|
||||
|
@ -40,12 +42,14 @@ public:
|
|||
SpeedLimit(int windowLimit, double windowSeconds) : m_limit(windowLimit), m_seconds(windowSeconds), m_last_update(0), m_budget(0) {
|
||||
m_last_update = now();
|
||||
}
|
||||
virtual ~SpeedLimit() {}
|
||||
virtual ~SpeedLimit() {
|
||||
m_stop.send(Never());
|
||||
}
|
||||
|
||||
virtual void addref() { ReferenceCounted<SpeedLimit>::addref(); }
|
||||
virtual void delref() { ReferenceCounted<SpeedLimit>::delref(); }
|
||||
void addref() override { ReferenceCounted<SpeedLimit>::addref(); }
|
||||
void delref() override { ReferenceCounted<SpeedLimit>::delref(); }
|
||||
|
||||
virtual Future<Void> getAllowance(unsigned int n) {
|
||||
Future<Void> getAllowance(unsigned int n) override {
|
||||
// Replenish budget based on time since last update
|
||||
double ts = now();
|
||||
// returnUnused happens to do exactly what we want here
|
||||
|
@ -56,20 +60,33 @@ public:
|
|||
if(m_budget >= 0)
|
||||
return Void();
|
||||
// Otherise return the amount of time it will take for the budget to rise to 0.
|
||||
return delay(m_seconds * -m_budget / m_limit);
|
||||
return m_stop.getFuture() || delay(m_seconds * -m_budget / m_limit);
|
||||
}
|
||||
|
||||
virtual void returnUnused(int n) {
|
||||
void returnUnused(int n) override {
|
||||
if(n < 0)
|
||||
return;
|
||||
m_budget = std::min<int64_t>(m_budget + n, m_limit);
|
||||
}
|
||||
|
||||
void wakeWaiters() override {
|
||||
Promise<Void> p;
|
||||
p.swap(m_stop);
|
||||
p.send(Void());
|
||||
}
|
||||
|
||||
void killWaiters(const Error &e) override {
|
||||
Promise<Void> p;
|
||||
p.swap(m_stop);
|
||||
p.sendError(e);
|
||||
}
|
||||
|
||||
private:
|
||||
int m_limit;
|
||||
double m_seconds;
|
||||
double m_last_update;
|
||||
int64_t m_budget;
|
||||
Promise<Void> m_stop;
|
||||
};
|
||||
|
||||
// An IRateControl implemenation that enforces no limit
|
||||
|
@ -77,9 +94,11 @@ class Unlimited : public IRateControl, ReferenceCounted<Unlimited> {
|
|||
public:
|
||||
Unlimited() {}
|
||||
virtual ~Unlimited() {}
|
||||
virtual void addref() { ReferenceCounted<Unlimited>::addref(); }
|
||||
virtual void delref() { ReferenceCounted<Unlimited>::delref(); }
|
||||
void addref() override { ReferenceCounted<Unlimited>::addref(); }
|
||||
void delref() override { ReferenceCounted<Unlimited>::delref(); }
|
||||
|
||||
virtual Future<Void> getAllowance(unsigned int n) { return Void(); }
|
||||
virtual void returnUnused(int n) {}
|
||||
Future<Void> getAllowance(unsigned int n) override { return Void(); }
|
||||
void returnUnused(int n) override {}
|
||||
void wakeWaiters() override {}
|
||||
void killWaiters(const Error &e) override {}
|
||||
};
|
||||
|
|
|
@ -1298,6 +1298,7 @@ int SQLiteDB::checkAllPageChecksums() {
|
|||
.detail("TotalErrors", totalErrors);
|
||||
|
||||
ASSERT(!vfsAsyncIsOpen(filename));
|
||||
ASSERT(!vfsAsyncIsOpen(filename + "-wal"));
|
||||
|
||||
return totalErrors;
|
||||
}
|
||||
|
@ -1350,10 +1351,29 @@ void SQLiteDB::open(bool writable) {
|
|||
|
||||
// Set Rate control if FLOW_KNOBS are positive
|
||||
if (FLOW_KNOBS->FLOW_CACHEDFILE_WRITE_WINDOW_LIMIT > 0 && FLOW_KNOBS->FLOW_CACHEDFILE_WRITE_WINDOW_SECONDS > 0) {
|
||||
Reference<SpeedLimit> rc(new SpeedLimit(FLOW_KNOBS->FLOW_CACHEDFILE_WRITE_WINDOW_LIMIT,
|
||||
FLOW_KNOBS->FLOW_CACHEDFILE_WRITE_WINDOW_SECONDS));
|
||||
dbFile.get()->setRateControl(rc);
|
||||
walFile.get()->setRateControl(rc);
|
||||
// The writer thread is created before the readers, so it should initialize the rate controls.
|
||||
if(writable) {
|
||||
// If either rate control is already initialized, then its file is still being held open by a previous
|
||||
// usage, likely because ~AsyncFileCached() launched a background actor that is waiting for outstanding
|
||||
// operations to finish. In case any of those are waiting for time to pass due to the rate control,
|
||||
// send them an io_error.
|
||||
if(dbFile.get()->getRateControl()) {
|
||||
dbFile.get()->getRateControl()->killWaiters(io_error());
|
||||
}
|
||||
if(walFile.get()->getRateControl()) {
|
||||
walFile.get()->getRateControl()->killWaiters(io_error());
|
||||
}
|
||||
|
||||
// Create a new rate control and assign it to both files.
|
||||
Reference<SpeedLimit> rc(new SpeedLimit(FLOW_KNOBS->FLOW_CACHEDFILE_WRITE_WINDOW_LIMIT,
|
||||
FLOW_KNOBS->FLOW_CACHEDFILE_WRITE_WINDOW_SECONDS));
|
||||
dbFile.get()->setRateControl(rc);
|
||||
walFile.get()->setRateControl(rc);
|
||||
} else {
|
||||
// When a reader thread is opened, the rate controls should already be equal and not null
|
||||
ASSERT(dbFile.get()->getRateControl() == walFile.get()->getRateControl());
|
||||
ASSERT(dbFile.get()->getRateControl());
|
||||
}
|
||||
}
|
||||
|
||||
//TraceEvent("KVThreadInitStage").detail("Stage",2).detail("Filename", filename).detail("Writable", writable);
|
||||
|
@ -1957,6 +1977,7 @@ KeyValueStoreSQLite::KeyValueStoreSQLite(std::string const& filename, UID id, Ke
|
|||
|
||||
//The DB file should not already be open
|
||||
ASSERT(!vfsAsyncIsOpen(filename));
|
||||
ASSERT(!vfsAsyncIsOpen(filename + "-wal"));
|
||||
|
||||
readCursors.resize(SERVER_KNOBS->SQLITE_READER_THREADS); //< number of read threads
|
||||
|
||||
|
|
|
@ -92,8 +92,16 @@ FlowKnobs::FlowKnobs(bool randomize, bool isSimulated) {
|
|||
init( MAX_EVICT_ATTEMPTS, 100 ); if( randomize && BUGGIFY ) MAX_EVICT_ATTEMPTS = 2;
|
||||
init( CACHE_EVICTION_POLICY, "random" );
|
||||
init( PAGE_CACHE_TRUNCATE_LOOKUP_FRACTION, 0.1 ); if( randomize && BUGGIFY ) PAGE_CACHE_TRUNCATE_LOOKUP_FRACTION = 0.0; else if( randomize && BUGGIFY ) PAGE_CACHE_TRUNCATE_LOOKUP_FRACTION = 1.0;
|
||||
init( FLOW_CACHEDFILE_WRITE_WINDOW_LIMIT, -1 ); if( randomize && BUGGIFY ) FLOW_CACHEDFILE_WRITE_WINDOW_LIMIT = 1e6; // 0 - auto(TODO); Negative - no limit
|
||||
init( FLOW_CACHEDFILE_WRITE_WINDOW_SECONDS, -1 ); if( randomize && BUGGIFY ) FLOW_CACHEDFILE_WRITE_WINDOW_SECONDS = 1; // 0 - auto(TODO); Negative - no limit
|
||||
init( FLOW_CACHEDFILE_WRITE_WINDOW_SECONDS, -1 );
|
||||
init( FLOW_CACHEDFILE_WRITE_WINDOW_LIMIT, -1 );
|
||||
if( randomize && BUGGIFY ) {
|
||||
// Choose an window between .01 and 1.01 seconds.
|
||||
FLOW_CACHEDFILE_WRITE_WINDOW_SECONDS = 0.01 + deterministicRandom()->random01();
|
||||
// Choose 10k to 50k operations per second
|
||||
int opsPerSecond = deterministicRandom()->randomInt(10000, 50000);
|
||||
// Set window limit to opsPerSecond scaled down to window size
|
||||
FLOW_CACHEDFILE_WRITE_WINDOW_LIMIT = opsPerSecond * FLOW_CACHEDFILE_WRITE_WINDOW_SECONDS;
|
||||
}
|
||||
|
||||
//AsyncFileEIO
|
||||
init( EIO_MAX_PARALLELISM, 4 );
|
||||
|
|
Loading…
Reference in New Issue