Add an option to get minKnownCommittedVersion from Proxies

The backup worker needs to use this version for popping when running in a NOOP
mode. This option is added to GetReadVersionRequest and proxies will send back
minKnownCommittedVersion if the option is set.

Also add a couple of knobs for backup workers.
This commit is contained in:
Jingyu Zhou 2020-01-07 14:15:29 -08:00
parent 7989f3f015
commit 1311fec45a
5 changed files with 40 additions and 31 deletions

View File

@ -173,6 +173,7 @@ struct GetReadVersionRequest : TimedRequest {
PRIORITY_BATCH = 1 << 24
};
enum {
FLAG_USE_MIN_KNOWN_COMMITTED_VERSION = 4,
FLAG_USE_PROVISIONAL_PROXIES = 2,
FLAG_CAUSAL_READ_RISKY = 1,
FLAG_PRIORITY_MASK = PRIORITY_SYSTEM_IMMEDIATE,

View File

@ -206,7 +206,7 @@ static Value makePadding(int size) {
// Saves messages in the range of [0, numMsg) to a file and then remove these
// messages. The file format is a sequence of (Version, sub#, msgSize, message),
ACTOR Future<Void> saveMutationsToFile(BackupData* self, Version popVersion, int numMsg) {
state int blockSize = 1 << 20; // TODO: make this a knob.
state int blockSize = SERVER_KNOBS->BACKUP_FILE_BLOCK_BYTES;
state Reference<IBackupFile> logFile =
wait(self->container->writeTaggedLogFile(self->messages[0].getVersion(), popVersion, blockSize, self->tag.id));
TraceEvent("OpenMutationFile", self->myId)
@ -357,37 +357,31 @@ ACTOR Future<Void> pullAsyncData(BackupData* self) {
}
}
// Get a recently committed version from proxies, which is actually a read version.
ACTOR Future<Version> getCommittedVersion(Database cx) {
state Transaction tr(cx);
loop {
try {
Version v = wait(tr.getReadVersion());
return v;
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
ACTOR Future<Void> monitorBackupKeyOrPullData(BackupData* self) {
state Future<Void> started = monitorBackupStarted(self);
loop {
state Future<Void> started = monitorBackupStarted(self);
loop choose {
when(wait(started)) { break; }
when(Version version = wait(getCommittedVersion(self->cx))) {
// Get a committed version and pop it so that TLog can remove the data.
self->savedVersion = std::max(version - 5000000, self->savedVersion);
self->minKnownCommittedVersion = std::max(version - 5000000, self->minKnownCommittedVersion);
wait(delay(2.0, self->cx->taskID)); // TODO: make delay a knob
self->pop(); // Pop while the worker is in this NOOP state.
loop {
GetReadVersionRequest request(1, GetReadVersionRequest::PRIORITY_DEFAULT |
GetReadVersionRequest::FLAG_USE_MIN_KNOWN_COMMITTED_VERSION);
choose {
when(wait(started)) { break; }
when(wait(self->cx->onMasterProxiesChanged())) {}
when(GetReadVersionReply reply = wait(loadBalance(self->cx->getMasterProxies(false),
&MasterProxyInterface::getConsistentReadVersion,
request, self->cx->taskID))) {
self->savedVersion = std::max(reply.version, self->savedVersion);
self->minKnownCommittedVersion = std::max(reply.version, self->minKnownCommittedVersion);
self->pop(); // Pop while the worker is in this NOOP state.
wait(delay(SERVER_KNOBS->BACKUP_NOOP_POP_DELAY, self->cx->taskID));
}
}
}
}
TraceEvent("BackupWorkerStartPullData", self->myId);
wait(pullAsyncData(self));
return Void();
TraceEvent("BackupWorkerStartPullData", self->myId);
wait(pullAsyncData(self));
}
}
ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db, LogEpoch recoveryCount,

View File

@ -356,6 +356,8 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula
// Backup Worker
init( BACKUP_TIMEOUT, 0.4 );
init( BACKUP_NOOP_POP_DELAY, 5.0 );
init( BACKUP_FILE_BLOCK_BYTES, 1024 * 1024 );
//Cluster Controller
init( CLUSTER_CONTROLLER_LOGGING_DELAY, 5.0 );

View File

@ -294,6 +294,8 @@ public:
// Backup Worker
double BACKUP_TIMEOUT; // master's reaction time for backup failure
double BACKUP_NOOP_POP_DELAY;
int BACKUP_FILE_BLOCK_BYTES;
//Cluster Controller
double CLUSTER_CONTROLLER_LOGGING_DELAY;

View File

@ -1174,14 +1174,23 @@ struct TransactionRateInfo {
}
};
ACTOR Future<Void> sendGrvReplies(Future<GetReadVersionReply> replyFuture, std::vector<GetReadVersionRequest> requests, ProxyStats *stats) {
ACTOR Future<Void> sendGrvReplies(Future<GetReadVersionReply> replyFuture, std::vector<GetReadVersionRequest> requests,
ProxyStats* stats, Version minKnownCommittedVersion) {
GetReadVersionReply reply = wait(replyFuture);
GetReadVersionReply minKCVReply = reply;
minKCVReply.version = minKnownCommittedVersion;
double end = timer();
for(GetReadVersionRequest const& request : requests) {
if(request.priority() >= GetReadVersionRequest::PRIORITY_DEFAULT) {
stats->grvLatencyBands.addMeasurement(end - request.requestTime());
}
request.reply.send(reply);
if (request.flags & GetReadVersionRequest::FLAG_USE_MIN_KNOWN_COMMITTED_VERSION) {
request.reply.send(minKCVReply);
} else {
request.reply.send(reply);
}
}
return Void();
@ -1299,7 +1308,8 @@ ACTOR static Future<Void> transactionStarter(
for (int i = 0; i < start.size(); i++) {
if (start[i].size()) {
Future<GetReadVersionReply> readVersionReply = getLiveCommittedVersion(commitData, i, &otherProxies, debugID, transactionsStarted[i], systemTransactionsStarted[i], defaultPriTransactionsStarted[i], batchPriTransactionsStarted[i]);
addActor.send(sendGrvReplies(readVersionReply, start[i], &commitData->stats));
addActor.send(sendGrvReplies(readVersionReply, start[i], &commitData->stats,
commitData->minKnownCommittedVersion));
// for now, base dynamic batching on the time for normal requests (not read_risky)
if (i == 0) {