added logging to TaskBucket

This commit is contained in:
Evan Tschannen 2019-11-12 19:15:56 -08:00
parent fd5c57d4e3
commit 5fbd9f2ed5
4 changed files with 41 additions and 2 deletions

View File

@ -97,6 +97,7 @@ ClientKnobs::ClientKnobs(bool randomize) {
init( MUTATION_BLOCK_SIZE, 10000 );
// TaskBucket
init( TASKBUCKET_LOGGING_DELAY, 5.0 );
init( TASKBUCKET_MAX_PRIORITY, 1 );
init( TASKBUCKET_CHECK_TIMEOUT_CHANCE, 0.02 ); if( randomize && BUGGIFY ) TASKBUCKET_CHECK_TIMEOUT_CHANCE = 1.0;
init( TASKBUCKET_TIMEOUT_JITTER_OFFSET, 0.9 );

View File

@ -99,6 +99,7 @@ public:
int MUTATION_BLOCK_SIZE;
// Taskbucket
double TASKBUCKET_LOGGING_DELAY;
int TASKBUCKET_MAX_PRIORITY;
double TASKBUCKET_CHECK_TIMEOUT_CHANCE;
double TASKBUCKET_TIMEOUT_JITTER_OFFSET;

View File

@ -316,6 +316,7 @@ public:
ACTOR static Future<Void> extendTimeoutRepeatedly(Database cx, Reference<TaskBucket> taskBucket, Reference<Task> task) {
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
state double start = now();
state Version versionNow = wait(runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) {
taskBucket->setOptions(tr);
return map(tr->getReadVersion(), [=](Version v) {
@ -329,6 +330,13 @@ public:
// Wait until we are half way to the timeout version of this task
wait(delay(0.8 * (BUGGIFY ? (2 * deterministicRandom()->random01()) : 1.0) * (double)(task->timeoutVersion - (uint64_t)versionNow) / CLIENT_KNOBS->CORE_VERSIONSPERSECOND));
if(now() - start > 300) {
TraceEvent(SevWarnAlways, "TaskBucketLongExtend")
.detail("Duration", now() - start)
.detail("TaskUID", task->key)
.detail("TaskType", task->params[Task::reservedTaskParamKeyType])
.detail("Priority", task->getPriority());
}
// Take the extendMutex lock until we either succeed or stop trying to extend due to failure
wait(task->extendMutex.take());
releaser = FlowLock::Releaser(task->extendMutex, 1);
@ -430,6 +438,7 @@ public:
loop {
// Start running tasks while slots are available and we keep finding work to do
++taskBucket->dispatchSlotChecksStarted;
while(!availableSlots.empty()) {
getTasks.clear();
for(int i = 0, imax = std::min<unsigned int>(getBatchSize, availableSlots.size()); i < imax; ++i)
@ -439,18 +448,22 @@ public:
bool done = false;
for(int i = 0; i < getTasks.size(); ++i) {
if(getTasks[i].isError()) {
++taskBucket->dispatchErrors;
done = true;
continue;
}
Reference<Task> task = getTasks[i].get();
if(task) {
// Start the task
++taskBucket->dispatchDoTasks;
int slot = availableSlots.back();
availableSlots.pop_back();
tasks[slot] = taskBucket->doTask(cx, futureBucket, task);
}
else
else {
++taskBucket->dispatchEmptyTasks;
done = true;
}
}
if(done) {
@ -460,11 +473,16 @@ public:
else
getBatchSize = std::min<unsigned int>(getBatchSize * 2, maxConcurrentTasks);
}
++taskBucket->dispatchSlotChecksComplete;
// Wait for a task to be done. Also, if we have any slots available then stop waiting after pollDelay at the latest.
Future<Void> w = ready(waitForAny(tasks));
if(!availableSlots.empty())
if(!availableSlots.empty()) {
if(*pollDelay > 600) {
TraceEvent(SevWarnAlways, "TaskBucketLongPollDelay").suppressFor(1.0).detail("Delay", *pollDelay);
}
w = w || delay(*pollDelay * (0.9 + deterministicRandom()->random01() / 5)); // Jittered by 20 %, so +/- 10%
}
wait(w);
// Check all of the task slots, any that are finished should be replaced with Never() and their slots added back to availableSlots
@ -783,7 +801,15 @@ TaskBucket::TaskBucket(const Subspace& subspace, bool sysAccess, bool priorityBa
, system_access(sysAccess)
, priority_batch(priorityBatch)
, lock_aware(lockAware)
, cc("TaskBucket")
, dbgid( deterministicRandom()->randomUniqueID() )
, dispatchSlotChecksStarted("DispatchSlotChecksStarted", cc)
, dispatchErrors("DispatchErrors", cc)
, dispatchDoTasks("DispatchDoTasks", cc)
, dispatchEmptyTasks("DispatchEmptyTasks", cc)
, dispatchSlotChecksComplete("DispatchSlotChecksComplete", cc)
{
metricLogger = traceCounters("TaskBucketMetrics", dbgid, CLIENT_KNOBS->TASKBUCKET_LOGGING_DELAY, &cc);
}
TaskBucket::~TaskBucket() {

View File

@ -228,12 +228,23 @@ public:
Database src;
Map<Key, Future<Reference<KeyRangeMap<Version>>>> key_version;
CounterCollection cc;
Counter dispatchSlotChecksStarted;
Counter dispatchErrors;
Counter dispatchDoTasks;
Counter dispatchEmptyTasks;
Counter dispatchSlotChecksComplete;
UID dbgid;
double getTimeoutSeconds() const {
return (double)timeout / CLIENT_KNOBS->CORE_VERSIONSPERSECOND;
}
private:
friend class TaskBucketImpl;
Future<Void> metricLogger;
Subspace prefix;
Subspace active;
Key pauseKey;