Merge branch 'add-throttling-on-AsyncfileCached' of github.com:sfc-gh-clin/foundationdb into afcache-write-limit
This commit is contained in:
parent
14af5857ed
commit
e56fe02a25
|
@ -230,6 +230,12 @@ public:
|
|||
return filename;
|
||||
}
|
||||
|
||||
std::vector<AFCPage*> const& getFlushable() { return flushable; }
|
||||
|
||||
void setRateControl(Reference<IRateControl> const& rc) override { rateControl = rc; }
|
||||
|
||||
Reference<IRateControl> const& getRateControl() { return rateControl; }
|
||||
|
||||
virtual void addref() {
|
||||
ReferenceCounted<AsyncFileCached>::addref();
|
||||
//TraceEvent("AsyncFileCachedAddRef").detail("Filename", filename).detail("Refcount", debugGetReferenceCount()).backtrace();
|
||||
|
@ -262,6 +268,7 @@ private:
|
|||
Reference<EvictablePageCache> pageCache;
|
||||
Future<Void> currentTruncate;
|
||||
int64_t currentTruncateSize;
|
||||
Reference<IRateControl> rateControl;
|
||||
|
||||
// Map of pointers which hold page buffers for pages which have been overwritten
|
||||
// but at the time of write there were still readZeroCopy holders.
|
||||
|
@ -287,8 +294,10 @@ private:
|
|||
Int64MetricHandle countCachePageReadsMerged;
|
||||
Int64MetricHandle countCacheReadBytes;
|
||||
|
||||
AsyncFileCached( Reference<IAsyncFile> uncached, const std::string& filename, int64_t length, Reference<EvictablePageCache> pageCache )
|
||||
: uncached(uncached), filename(filename), length(length), prevLength(length), pageCache(pageCache), currentTruncate(Void()), currentTruncateSize(0) {
|
||||
AsyncFileCached(Reference<IAsyncFile> uncached, const std::string& filename, int64_t length,
|
||||
Reference<EvictablePageCache> pageCache)
|
||||
: uncached(uncached), filename(filename), length(length), prevLength(length), pageCache(pageCache),
|
||||
currentTruncate(Void()), currentTruncateSize(0), rateControl(nullptr) {
|
||||
if( !g_network->isSimulated() ) {
|
||||
countFileCacheWrites.init(LiteralStringRef("AsyncFile.CountFileCacheWrites"), filename);
|
||||
countFileCacheReads.init(LiteralStringRef("AsyncFile.CountFileCacheReads"), filename);
|
||||
|
@ -503,6 +512,10 @@ struct AFCPage : public EvictablePage, public FastAllocated<AFCPage> {
|
|||
wait( self->notReading && self->notFlushing );
|
||||
|
||||
if (dirty) {
|
||||
// Wait for rate control if it is set
|
||||
if (self->owner->getRateControl())
|
||||
wait(self->owner->getRateControl()->getAllowance(1));
|
||||
|
||||
if ( self->pageOffset + self->pageCache->pageSize > self->owner->length ) {
|
||||
ASSERT(self->pageOffset < self->owner->length);
|
||||
memset( static_cast<uint8_t *>(self->data) + self->owner->length - self->pageOffset, 0, self->pageCache->pageSize - (self->owner->length - self->pageOffset) );
|
||||
|
|
|
@ -24,6 +24,7 @@
|
|||
|
||||
#include <ctime>
|
||||
#include "flow/flow.h"
|
||||
#include "fdbrpc/IRateControl.h"
|
||||
|
||||
// All outstanding operations must be cancelled before the destructor of IAsyncFile is called.
|
||||
// The desirability of the above semantic is disputed. Some classes (AsyncFileBlobStore,
|
||||
|
@ -81,6 +82,9 @@ public:
|
|||
virtual void releaseZeroCopy( void* data, int length, int64_t offset ) {}
|
||||
|
||||
virtual int64_t debugFD() = 0;
|
||||
|
||||
// Used for rate control, at present, only AsyncFileCached supports it
|
||||
virtual void setRateControl(Reference<IRateControl> const& rc) { throw unsupported_operation(); }
|
||||
};
|
||||
|
||||
typedef void (*runCycleFuncPtr)();
|
||||
|
|
|
@ -37,9 +37,8 @@ public:
|
|||
// An IRateControl implemenation that allows at most hands out at most windowLimit units of 'credit' in windowSeconds seconds
|
||||
class SpeedLimit : public IRateControl, ReferenceCounted<SpeedLimit> {
|
||||
public:
|
||||
SpeedLimit(int windowLimit, int windowSeconds) : m_limit(windowLimit), m_seconds(windowSeconds), m_last_update(0), m_budget(0) {
|
||||
m_budget_max = m_limit * m_seconds;
|
||||
m_last_update = timer();
|
||||
SpeedLimit(int windowLimit, double windowSeconds) : m_limit(windowLimit), m_seconds(windowSeconds), m_last_update(0), m_budget(0) {
|
||||
m_last_update = now();
|
||||
}
|
||||
virtual ~SpeedLimit() {}
|
||||
|
||||
|
@ -48,7 +47,7 @@ public:
|
|||
|
||||
virtual Future<Void> getAllowance(unsigned int n) {
|
||||
// Replenish budget based on time since last update
|
||||
double ts = timer();
|
||||
double ts = now();
|
||||
// returnUnused happens to do exactly what we want here
|
||||
returnUnused((ts - m_last_update) / m_seconds * m_limit);
|
||||
m_last_update = ts;
|
||||
|
@ -63,7 +62,7 @@ public:
|
|||
virtual void returnUnused(int n) {
|
||||
if(n < 0)
|
||||
return;
|
||||
m_budget = std::min<int64_t>(m_budget + n, m_budget_max);
|
||||
m_budget = std::min<int64_t>(m_budget + n, m_limit);
|
||||
}
|
||||
|
||||
private:
|
||||
|
@ -71,7 +70,6 @@ private:
|
|||
double m_seconds;
|
||||
double m_last_update;
|
||||
int64_t m_budget;
|
||||
int64_t m_budget_max;
|
||||
};
|
||||
|
||||
// An IRateControl implemenation that enforces no limit
|
||||
|
|
|
@ -1348,6 +1348,14 @@ void SQLiteDB::open(bool writable) {
|
|||
if (dbFile.isError()) throw dbFile.getError(); // If we've failed to open the file, throw an exception
|
||||
if (walFile.isError()) throw walFile.getError(); // If we've failed to open the file, throw an exception
|
||||
|
||||
// Set Rate control if FLOW_KNOBS are positive
|
||||
if (FLOW_KNOBS->FLOW_CACHEDFILE_WRITE_WINDOW_LIMIT > 0 && FLOW_KNOBS->FLOW_CACHEDFILE_WRITE_WINDOW_SECONDS > 0) {
|
||||
Reference<SpeedLimit> rc(new SpeedLimit(FLOW_KNOBS->FLOW_CACHEDFILE_WRITE_WINDOW_LIMIT,
|
||||
FLOW_KNOBS->FLOW_CACHEDFILE_WRITE_WINDOW_SECONDS));
|
||||
dbFile.get()->setRateControl(rc);
|
||||
walFile.get()->setRateControl(rc);
|
||||
}
|
||||
|
||||
//TraceEvent("KVThreadInitStage").detail("Stage",2).detail("Filename", filename).detail("Writable", writable);
|
||||
|
||||
// Now that the file itself is open and locked, let sqlite open the database
|
||||
|
@ -1950,7 +1958,7 @@ KeyValueStoreSQLite::KeyValueStoreSQLite(std::string const& filename, UID id, Ke
|
|||
//The DB file should not already be open
|
||||
ASSERT(!vfsAsyncIsOpen(filename));
|
||||
|
||||
readCursors.resize(64); //< number of read threads
|
||||
readCursors.resize(SERVER_KNOBS->SQLITE_READER_THREADS); //< number of read threads
|
||||
|
||||
sqlite3_soft_heap_limit64( SERVER_KNOBS->SOFT_HEAP_LIMIT ); // SOMEDAY: Is this a performance issue? Should we drop the cache sizes for individual threads?
|
||||
TaskPriority taskId = g_network->getCurrentTask();
|
||||
|
|
|
@ -250,6 +250,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula
|
|||
init( SQLITE_BTREE_PAGE_USABLE, 4096 - 8); // pageSize - reserveSize for page checksum
|
||||
init( SQLITE_CHUNK_SIZE_PAGES, 25600 ); // 100MB
|
||||
init( SQLITE_CHUNK_SIZE_PAGES_SIM, 1024 ); // 4MB
|
||||
init( SQLITE_READER_THREADS, 64 ); // number of read threads
|
||||
|
||||
// Maximum and minimum cell payload bytes allowed on primary page as calculated in SQLite.
|
||||
// These formulas are copied from SQLite, using its hardcoded constants, so if you are
|
||||
|
|
|
@ -222,6 +222,7 @@ public:
|
|||
double SQLITE_FRAGMENT_MIN_SAVINGS;
|
||||
int SQLITE_CHUNK_SIZE_PAGES;
|
||||
int SQLITE_CHUNK_SIZE_PAGES_SIM;
|
||||
int SQLITE_READER_THREADS;
|
||||
|
||||
// KeyValueStoreSqlite spring cleaning
|
||||
double SPRING_CLEANING_NO_ACTION_INTERVAL;
|
||||
|
|
|
@ -339,7 +339,7 @@ public:
|
|||
const double currentTime = now();
|
||||
double longest = 0;
|
||||
UID UIDofLongest;
|
||||
for (const auto kv: startTimeMap) {
|
||||
for (const auto& kv: startTimeMap) {
|
||||
const double currentRunningTime = currentTime - kv.second;
|
||||
if (longest < currentRunningTime) {
|
||||
longest = currentRunningTime;
|
||||
|
|
|
@ -92,6 +92,8 @@ FlowKnobs::FlowKnobs(bool randomize, bool isSimulated) {
|
|||
init( MAX_EVICT_ATTEMPTS, 100 ); if( randomize && BUGGIFY ) MAX_EVICT_ATTEMPTS = 2;
|
||||
init( CACHE_EVICTION_POLICY, "random" );
|
||||
init( PAGE_CACHE_TRUNCATE_LOOKUP_FRACTION, 0.1 ); if( randomize && BUGGIFY ) PAGE_CACHE_TRUNCATE_LOOKUP_FRACTION = 0.0; else if( randomize && BUGGIFY ) PAGE_CACHE_TRUNCATE_LOOKUP_FRACTION = 1.0;
|
||||
init( FLOW_CACHEDFILE_WRITE_WINDOW_LIMIT, -1 ); if( randomize && BUGGIFY ) FLOW_CACHEDFILE_WRITE_WINDOW_LIMIT = 1e6; // 0 - auto(TODO); Negative - no limit
|
||||
init( FLOW_CACHEDFILE_WRITE_WINDOW_SECONDS, -1 ); if( randomize && BUGGIFY ) FLOW_CACHEDFILE_WRITE_WINDOW_SECONDS = 1; // 0 - auto(TODO); Negative - no limit
|
||||
|
||||
//AsyncFileEIO
|
||||
init( EIO_MAX_PARALLELISM, 4 );
|
||||
|
|
|
@ -112,6 +112,9 @@ public:
|
|||
double PAGE_CACHE_TRUNCATE_LOOKUP_FRACTION;
|
||||
double TOO_MANY_CONNECTIONS_CLOSED_RESET_DELAY;
|
||||
int TOO_MANY_CONNECTIONS_CLOSED_TIMEOUT;
|
||||
int PEER_UNAVAILABLE_FOR_LONG_TIME_TIMEOUT;
|
||||
int FLOW_CACHEDFILE_WRITE_WINDOW_LIMIT;
|
||||
double FLOW_CACHEDFILE_WRITE_WINDOW_SECONDS;
|
||||
|
||||
//AsyncFileEIO
|
||||
int EIO_MAX_PARALLELISM;
|
||||
|
|
Loading…
Reference in New Issue