Checksum DiskQueue pages on read, but at a lower priority.
If a server has its data spilled, then it is behind the 5s window. Feeding it data is less important than committing, so we can hide the extra CPU usage from checksumming the read-amplified disk queue pages.
This commit is contained in:
parent
ee4721a63f
commit
7f5bc2981f
|
@ -31,6 +31,7 @@
|
|||
#include "fdbserver/IKeyValueStore.h"
|
||||
#include "flow/ActorCollection.h"
|
||||
#include "fdbrpc/FailureMonitor.h"
|
||||
#include "fdbrpc/crc32c.h"
|
||||
#include "fdbserver/IDiskQueue.h"
|
||||
#include "fdbrpc/sim_validation.h"
|
||||
#include "fdbrpc/simulator.h"
|
||||
|
@ -663,7 +664,7 @@ ACTOR Future<Void> updatePersistentData( TLogData* self, Reference<LogData> logD
|
|||
uint32_t length = static_cast<uint32_t>(end.lo - begin.lo);
|
||||
refSpilledTagCount++;
|
||||
|
||||
uint32_t size = msg->second.expectedSize();
|
||||
uint32_t size = 0;
|
||||
for(; msg != tagData->versionMessages.end() && msg->first == currentVersion; ++msg) {
|
||||
// Fast forward until we find a new version.
|
||||
size += msg->second.expectedSize();
|
||||
|
@ -1201,6 +1202,11 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
|
|||
else
|
||||
messages.serializeBytes( messages2.toStringRef() );
|
||||
} else {
|
||||
// Calculating checksums of read pages is potentially expensive, and storage servers with
|
||||
// spilled data are likely behind and not contributing usefully to the cluster anyway.
|
||||
// Thus, we penalize their priority slightly to make sure that commits have a higher priority
|
||||
// than catching up old storage servers.
|
||||
wait(delay(0, TaskTLogSpilledPeekReply));
|
||||
// FIXME: Limit to approximately DESIRED_TOTAL_BYTES somehow.
|
||||
Standalone<VectorRef<KeyValueRef>> kvrefs = wait(
|
||||
self->persistentData->readRange(KeyRangeRef(
|
||||
|
@ -1233,12 +1239,12 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
|
|||
}
|
||||
if (earlyEnd) break;
|
||||
}
|
||||
wait( self->peekMemoryLimiter.take(TaskTLogPeekReply, commitBytes) );
|
||||
wait( self->peekMemoryLimiter.take(TaskTLogSpilledPeekReply, commitBytes) );
|
||||
state FlowLock::Releaser memoryReservation(self->peekMemoryLimiter, commitBytes);
|
||||
state std::vector<Future<Standalone<StringRef>>> messageReads;
|
||||
messageReads.reserve( commitLocations.size() );
|
||||
for (const auto& pair : commitLocations) {
|
||||
messageReads.push_back( self->rawPersistentQueue->read(pair.first, pair.second, CheckHashes::NO ) );
|
||||
messageReads.push_back( self->rawPersistentQueue->read(pair.first, pair.second, CheckHashes::YES ) );
|
||||
}
|
||||
commitLocations.clear();
|
||||
wait( waitForAll( messageReads ) );
|
||||
|
@ -1260,8 +1266,10 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
|
|||
messages << int32_t(-1) << entry.version;
|
||||
|
||||
std::vector<StringRef> parsedMessages = wait(parseMessagesForTag(entry.messages, req.tag, logData->logRouterTags));
|
||||
uint32_t checksum = 0;
|
||||
for (StringRef msg : parsedMessages) {
|
||||
messages << msg;
|
||||
checksum = crc32c_append(checksum, msg.begin(), msg.size());
|
||||
}
|
||||
|
||||
lastRefMessageVersion = entry.version;
|
||||
|
|
|
@ -50,6 +50,7 @@ enum {
|
|||
TaskTLogPeek = 8590,
|
||||
TaskTLogCommitReply = 8580,
|
||||
TaskTLogCommit = 8570,
|
||||
TaskTLogSpilledPeekReply = 8567,
|
||||
TaskProxyGetRawCommittedVersion = 8565,
|
||||
TaskProxyResolverReply = 8560,
|
||||
TaskProxyCommitBatcher = 8550,
|
||||
|
|
Loading…
Reference in New Issue