tlogs serve reads to log routers at a low priority, to prevent them from using all their resources catching up a remote dc that has been down for a long time
increase the amount of memory ratekeeper budgets for tlogs so that there is a gap after the spill threshold to prevent temporarily overshooting the budget
This commit is contained in:
parent
71f89f372f
commit
be1a4d74c7
|
@ -2069,7 +2069,7 @@ ACTOR Future<Void> updateDatacenterVersionDifference( ClusterControllerData *sel
|
|||
|
||||
state Optional<TLogInterface> primaryLog;
|
||||
state Optional<TLogInterface> remoteLog;
|
||||
if(self->db.serverInfo->get().recoveryState == RecoveryState::FULLY_RECOVERED) {
|
||||
if(self->db.serverInfo->get().recoveryState >= RecoveryState::ALL_LOGS_RECRUITED) {
|
||||
for(auto& logSet : self->db.serverInfo->get().logSystemConfig.tLogs) {
|
||||
if(logSet.isLocal && logSet.locality != tagLocalitySatellite) {
|
||||
for(auto& tLog : logSet.tLogs) {
|
||||
|
|
|
@ -63,6 +63,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
|
|||
init( PARALLEL_GET_MORE_REQUESTS, 32 ); if( randomize && BUGGIFY ) PARALLEL_GET_MORE_REQUESTS = 2;
|
||||
init( MAX_QUEUE_COMMIT_BYTES, 15e6 ); if( randomize && BUGGIFY ) MAX_QUEUE_COMMIT_BYTES = 5000;
|
||||
init( VERSIONS_PER_BATCH, VERSIONS_PER_SECOND/20 ); if( randomize && BUGGIFY ) VERSIONS_PER_BATCH = std::max<int64_t>(1,VERSIONS_PER_SECOND/1000);
|
||||
init( CONCURRENT_LOG_ROUTER_READS, 1 );
|
||||
|
||||
// Data distribution queue
|
||||
init( HEALTH_POLL_TIME, 1.0 );
|
||||
|
@ -322,7 +323,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
|
|||
init( STORAGE_HARD_LIMIT_BYTES, 1500e6 ); if( smallStorageTarget ) STORAGE_HARD_LIMIT_BYTES = 4500e3;
|
||||
|
||||
bool smallTlogTarget = randomize && BUGGIFY;
|
||||
init( TARGET_BYTES_PER_TLOG, 2000e6 ); if( smallTlogTarget ) TARGET_BYTES_PER_TLOG = 2000e3;
|
||||
init( TARGET_BYTES_PER_TLOG, 2400e6 ); if( smallTlogTarget ) TARGET_BYTES_PER_TLOG = 2000e3;
|
||||
init( SPRING_BYTES_TLOG, 400e6 ); if( smallTlogTarget ) SPRING_BYTES_TLOG = 200e3;
|
||||
init( TLOG_SPILL_THRESHOLD, 1500e6 ); if( smallTlogTarget ) TLOG_SPILL_THRESHOLD = 1500e3; if( randomize && BUGGIFY ) TLOG_SPILL_THRESHOLD = 0;
|
||||
init( TLOG_HARD_LIMIT_BYTES, 3000e6 ); if( smallTlogTarget ) TLOG_HARD_LIMIT_BYTES = 3000e3;
|
||||
|
|
|
@ -67,6 +67,7 @@ public:
|
|||
int PARALLEL_GET_MORE_REQUESTS;
|
||||
int64_t MAX_QUEUE_COMMIT_BYTES;
|
||||
int64_t VERSIONS_PER_BATCH;
|
||||
int CONCURRENT_LOG_ROUTER_READS;
|
||||
|
||||
// Data distribution queue
|
||||
double HEALTH_POLL_TIME;
|
||||
|
|
|
@ -270,13 +270,14 @@ struct TLogData : NonCopyable {
|
|||
|
||||
PromiseStream<Future<Void>> sharedActors;
|
||||
bool terminated;
|
||||
FlowLock concurrentLogRouterReads;
|
||||
|
||||
TLogData(UID dbgid, IKeyValueStore* persistentData, IDiskQueue * persistentQueue, Reference<AsyncVar<ServerDBInfo>> const& dbInfo)
|
||||
: dbgid(dbgid), instanceID(g_random->randomUniqueID().first()),
|
||||
persistentData(persistentData), rawPersistentQueue(persistentQueue), persistentQueue(new TLogQueue(persistentQueue, dbgid)),
|
||||
dbInfo(dbInfo), queueCommitBegin(0), queueCommitEnd(0), prevVersion(0),
|
||||
diskQueueCommitBytes(0), largeDiskQueueCommitBytes(false),
|
||||
bytesInput(0), bytesDurable(0), updatePersist(Void()), terminated(false)
|
||||
bytesInput(0), bytesDurable(0), updatePersist(Void()), terminated(false), concurrentLogRouterReads(SERVER_KNOBS->CONCURRENT_LOG_ROUTER_READS)
|
||||
{
|
||||
}
|
||||
};
|
||||
|
@ -991,6 +992,12 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
|
|||
Void _ = wait( delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask()) );
|
||||
}
|
||||
|
||||
if( req.tag.locality == tagLocalityLogRouter ) {
|
||||
Void _ = wait( self->concurrentLogRouterReads.take() );
|
||||
state FlowLock::Releaser globalReleaser(self->concurrentLogRouterReads);
|
||||
Void _ = wait( delay(0.0, TaskLowPriority) );
|
||||
}
|
||||
|
||||
Version poppedVer = poppedVersion(logData, req.tag);
|
||||
if(poppedVer > req.begin) {
|
||||
TLogPeekReply rep;
|
||||
|
|
|
@ -68,7 +68,6 @@ enum {
|
|||
TaskDataDistribution = 3500,
|
||||
TaskDiskWrite = 3010,
|
||||
TaskUpdateStorage = 3000,
|
||||
TaskBatchCopy = 2900,
|
||||
TaskLowPriority = 2000,
|
||||
|
||||
TaskMinPriority = 1000
|
||||
|
|
Loading…
Reference in New Issue