Merge pull request #903 from ajbeamon/move-batcher-into-proxy
Move the sort of generic batcher from fdbrpc and make it specific to …
This commit is contained in:
commit
6f4ad84777
|
@ -1,114 +0,0 @@
|
|||
/*
|
||||
* batcher.actor.h
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
// When actually compiled (NO_INTELLISENSE), include the generated version of this file. In intellisense use the source version.
|
||||
#if defined(NO_INTELLISENSE) && !defined(FLOW_BATCHER_ACTOR_G_H)
|
||||
#define FLOW_BATCHER_ACTOR_G_H
|
||||
#include "batcher.actor.g.h"
|
||||
#elif !defined(FLOW_BATCHER_ACTOR_H)
|
||||
#define FLOW_BATCHER_ACTOR_H
|
||||
|
||||
#include "flow/actorcompiler.h"
|
||||
#include "flow/flow.h"
|
||||
#include "flow/Stats.h"
|
||||
|
||||
template <class X>
|
||||
void logOnReceive(X x) { }
|
||||
|
||||
void logOnReceive(CommitTransactionRequest x) {
|
||||
if(x.debugID.present())
|
||||
g_traceBatch.addEvent("CommitDebug", x.debugID.get().first(), "MasterProxyServer.batcher");
|
||||
}
|
||||
|
||||
template <class X>
|
||||
bool firstInBatch(X x) { return false; }
|
||||
|
||||
bool firstInBatch(CommitTransactionRequest x) {
|
||||
return x.firstInBatch();
|
||||
}
|
||||
|
||||
ACTOR template <class X>
|
||||
Future<Void> batcher(PromiseStream<std::pair<std::vector<X>, int> > out, FutureStream<X> in, double avgMinDelay, double* avgMaxDelay, double emptyBatchTimeout, int maxCount, int desiredBytes, int maxBytes, Optional<PromiseStream<Void>> batchStartedStream, int64_t *commitBatchesMemBytesCount, int64_t commitBatchesMemBytesLimit, int taskID = TaskDefaultDelay, Counter* counter = 0)
|
||||
{
|
||||
Void _ = wait( delayJittered(*avgMaxDelay, taskID) ); // smooth out
|
||||
// This is set up to deliver even zero-size batches if emptyBatchTimeout elapses, because that's what master proxy wants. The source control history
|
||||
// contains a version that does not.
|
||||
|
||||
state double lastBatch = 0;
|
||||
|
||||
loop {
|
||||
state Future<Void> timeout;
|
||||
state std::vector<X> batch;
|
||||
state int batchBytes = 0;
|
||||
|
||||
if(emptyBatchTimeout <= 0)
|
||||
timeout = Never();
|
||||
else
|
||||
timeout = delayJittered(emptyBatchTimeout, taskID);
|
||||
|
||||
while (!timeout.isReady() && !(batch.size() == maxCount || batchBytes >= desiredBytes)) {
|
||||
choose {
|
||||
when ( X x = waitNext(in) ) {
|
||||
int bytes = getBytes(x);
|
||||
// Drop requests if memory is under severe pressure
|
||||
if (*commitBatchesMemBytesCount + bytes > commitBatchesMemBytesLimit) {
|
||||
x.reply.sendError(proxy_memory_limit_exceeded());
|
||||
TraceEvent(SevWarnAlways, "ProxyCommitBatchMemoryThresholdExceeded").suppressFor(60).detail("CommitBatchesMemBytesCount", *commitBatchesMemBytesCount).detail("CommitBatchesMemLimit", commitBatchesMemBytesLimit);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Process requests in the normal case
|
||||
if (counter) ++*counter;
|
||||
logOnReceive(x);
|
||||
if (!batch.size()) {
|
||||
if(batchStartedStream.present())
|
||||
batchStartedStream.get().send(Void());
|
||||
if (now() - lastBatch > *avgMaxDelay)
|
||||
timeout = delayJittered(avgMinDelay, taskID);
|
||||
else
|
||||
timeout = delayJittered(*avgMaxDelay - (now() - lastBatch), taskID);
|
||||
}
|
||||
|
||||
bool first = firstInBatch( x );
|
||||
if((batchBytes + bytes > maxBytes || first) && batch.size()) {
|
||||
out.send({ batch, batchBytes });
|
||||
lastBatch = now();
|
||||
if(batchStartedStream.present())
|
||||
batchStartedStream.get().send(Void());
|
||||
timeout = delayJittered(*avgMaxDelay, taskID);
|
||||
batch = std::vector<X>();
|
||||
batchBytes = 0;
|
||||
}
|
||||
|
||||
batch.push_back(x);
|
||||
batchBytes += bytes;
|
||||
*commitBatchesMemBytesCount += bytes;
|
||||
}
|
||||
when ( Void _ = wait( timeout ) ) {}
|
||||
}
|
||||
}
|
||||
out.send({std::move(batch), batchBytes});
|
||||
lastBatch = now();
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
|
@ -71,9 +71,6 @@
|
|||
<ActorCompiler Include="AsyncFileNonDurable.actor.h">
|
||||
<EnableCompile>false</EnableCompile>
|
||||
</ActorCompiler>
|
||||
<ActorCompiler Include="batcher.actor.h">
|
||||
<EnableCompile>false</EnableCompile>
|
||||
</ActorCompiler>
|
||||
<ClInclude Include="AsyncFileWriteChecker.h" />
|
||||
<ClInclude Include="ContinuousSample.h" />
|
||||
<ClInclude Include="crc32c.h" />
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
<ActorCompiler Include="FailureMonitor.actor.cpp" />
|
||||
<ActorCompiler Include="FlowTransport.actor.cpp" />
|
||||
<ActorCompiler Include="ActorFuzz.actor.cpp" />
|
||||
<ActorCompiler Include="batcher.actor.h" />
|
||||
<ActorCompiler Include="AsyncFileReadAhead.actor.h" />
|
||||
<ActorCompiler Include="IAsyncFile.actor.cpp" />
|
||||
</ItemGroup>
|
||||
|
|
|
@ -32,7 +32,6 @@
|
|||
#include "IKeyValueStore.h"
|
||||
#include "fdbclient/SystemData.h"
|
||||
#include "fdbrpc/sim_validation.h"
|
||||
#include "fdbrpc/batcher.actor.h"
|
||||
#include "fdbclient/Notified.h"
|
||||
#include "fdbclient/KeyRangeMap.h"
|
||||
#include "ConflictSet.h"
|
||||
|
@ -110,12 +109,13 @@ ACTOR Future<Void> getRate(UID myID, MasterInterface master, int64_t* inTransact
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> queueTransactionStartRequests(std::priority_queue< std::pair<GetReadVersionRequest, int64_t>,
|
||||
std::vector< std::pair<GetReadVersionRequest, int64_t> > > *transactionQueue,
|
||||
ACTOR Future<Void> queueTransactionStartRequests(
|
||||
std::priority_queue< std::pair<GetReadVersionRequest, int64_t>, std::vector< std::pair<GetReadVersionRequest, int64_t> > > *transactionQueue,
|
||||
FutureStream<GetReadVersionRequest> readVersionRequests,
|
||||
PromiseStream<Void> GRVTimer, double *lastGRVTime,
|
||||
double *GRVBatchTime, FutureStream<double> replyTimes,
|
||||
ProxyStats* stats) {
|
||||
ProxyStats* stats)
|
||||
{
|
||||
state int64_t counter = 0;
|
||||
loop choose{
|
||||
when(GetReadVersionRequest req = waitNext(readVersionRequests)) {
|
||||
|
@ -189,6 +189,7 @@ struct ProxyCommitData {
|
|||
bool firstProxy;
|
||||
double lastCoalesceTime;
|
||||
bool locked;
|
||||
double commitBatchInterval;
|
||||
|
||||
int64_t localCommitBatchesStarted;
|
||||
NotifiedVersion latestLocalCommitBatchResolving;
|
||||
|
@ -231,9 +232,9 @@ struct ProxyCommitData {
|
|||
committedVersion(recoveryTransactionVersion), version(0), minKnownCommittedVersion(0),
|
||||
lastVersionTime(0), commitVersionRequestNumber(1), mostRecentProcessedRequestNumber(0),
|
||||
getConsistentReadVersion(getConsistentReadVersion), commit(commit), lastCoalesceTime(0),
|
||||
localCommitBatchesStarted(0), locked(false), firstProxy(firstProxy),
|
||||
cx(openDBOnServer(db, TaskDefaultEndpoint, true, true)), singleKeyMutationEvent(LiteralStringRef("SingleKeyMutation")),
|
||||
commitBatchesMemBytesCount(0), lastTxsPop(0)
|
||||
localCommitBatchesStarted(0), locked(false), commitBatchInterval(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_MIN),
|
||||
firstProxy(firstProxy), cx(openDBOnServer(db, TaskDefaultEndpoint, true, true)),
|
||||
singleKeyMutationEvent(LiteralStringRef("SingleKeyMutation")), commitBatchesMemBytesCount(0), lastTxsPop(0)
|
||||
{}
|
||||
};
|
||||
|
||||
|
@ -318,10 +319,75 @@ struct ResolutionRequestBuilder {
|
|||
}
|
||||
};
|
||||
|
||||
ACTOR Future<Void> commitBatcher(ProxyCommitData *commitData, PromiseStream<std::pair<std::vector<CommitTransactionRequest>, int> > out, FutureStream<CommitTransactionRequest> in, int desiredBytes, int64_t memBytesLimit) {
|
||||
Void _ = wait(delayJittered(commitData->commitBatchInterval, TaskProxyCommitBatcher));
|
||||
|
||||
state double lastBatch = 0;
|
||||
|
||||
loop{
|
||||
state Future<Void> timeout;
|
||||
state std::vector<CommitTransactionRequest> batch;
|
||||
state int batchBytes = 0;
|
||||
|
||||
if(SERVER_KNOBS->MAX_COMMIT_BATCH_INTERVAL <= 0) {
|
||||
timeout = Never();
|
||||
}
|
||||
else {
|
||||
timeout = delayJittered(SERVER_KNOBS->MAX_COMMIT_BATCH_INTERVAL, TaskProxyCommitBatcher);
|
||||
}
|
||||
|
||||
while(!timeout.isReady() && !(batch.size() == SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_COUNT_MAX || batchBytes >= desiredBytes)) {
|
||||
choose{
|
||||
when(CommitTransactionRequest req = waitNext(in)) {
|
||||
int bytes = getBytes(req);
|
||||
|
||||
// Drop requests if memory is under severe pressure
|
||||
if(commitData->commitBatchesMemBytesCount + bytes > memBytesLimit) {
|
||||
req.reply.sendError(proxy_memory_limit_exceeded());
|
||||
TraceEvent(SevWarnAlways, "ProxyCommitBatchMemoryThresholdExceeded").suppressFor(60).detail("MemBytesCount", commitData->commitBatchesMemBytesCount).detail("MemLimit", memBytesLimit);
|
||||
continue;
|
||||
}
|
||||
|
||||
++commitData->stats.txnCommitIn;
|
||||
|
||||
if(req.debugID.present()) {
|
||||
g_traceBatch.addEvent("CommitDebug", req.debugID.get().first(), "MasterProxyServer.batcher");
|
||||
}
|
||||
|
||||
if(!batch.size()) {
|
||||
commitData->commitBatchStartNotifications.send(Void());
|
||||
if(now() - lastBatch > commitData->commitBatchInterval) {
|
||||
timeout = delayJittered(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_FROM_IDLE, TaskProxyCommitBatcher);
|
||||
}
|
||||
else {
|
||||
timeout = delayJittered(commitData->commitBatchInterval - (now() - lastBatch), TaskProxyCommitBatcher);
|
||||
}
|
||||
}
|
||||
|
||||
if((batchBytes + bytes > CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT || req.firstInBatch()) && batch.size()) {
|
||||
out.send({ batch, batchBytes });
|
||||
lastBatch = now();
|
||||
commitData->commitBatchStartNotifications.send(Void());
|
||||
timeout = delayJittered(commitData->commitBatchInterval, TaskProxyCommitBatcher);
|
||||
batch = std::vector<CommitTransactionRequest>();
|
||||
batchBytes = 0;
|
||||
}
|
||||
|
||||
batch.push_back(req);
|
||||
batchBytes += bytes;
|
||||
commitData->commitBatchesMemBytesCount += bytes;
|
||||
}
|
||||
when(Void _ = wait(timeout)) {}
|
||||
}
|
||||
}
|
||||
out.send({ std::move(batch), batchBytes });
|
||||
lastBatch = now();
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> commitBatch(
|
||||
ProxyCommitData* self,
|
||||
vector<CommitTransactionRequest> trs,
|
||||
double *commitBatchTime,
|
||||
int currentBatchMemBytesCount)
|
||||
{
|
||||
state int64_t localBatchNumber = ++self->localCommitBatchesStarted;
|
||||
|
@ -920,10 +986,10 @@ ACTOR Future<Void> commitBatch(
|
|||
|
||||
// Dynamic batching for commits
|
||||
double target_latency = (now() - t1) * SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_LATENCY_FRACTION;
|
||||
*commitBatchTime =
|
||||
self->commitBatchInterval =
|
||||
std::max(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_MIN,
|
||||
std::min(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_MAX,
|
||||
target_latency * SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_SMOOTHER_ALPHA + *commitBatchTime * (1-SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_SMOOTHER_ALPHA)));
|
||||
target_latency * SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_SMOOTHER_ALPHA + self->commitBatchInterval * (1-SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_SMOOTHER_ALPHA)));
|
||||
|
||||
|
||||
self->commitBatchesMemBytesCount -= currentBatchMemBytesCount;
|
||||
|
@ -1276,7 +1342,7 @@ ACTOR Future<Void> masterProxyServerCore(
|
|||
|
||||
state Future<Sequence> sequenceFuture = (Sequence)0;
|
||||
state PromiseStream< std::pair<vector<CommitTransactionRequest>, int> > batchedCommits;
|
||||
state Future<Void> commitBatcher;
|
||||
state Future<Void> commitBatcherActor;
|
||||
state Future<Void> lastCommitComplete = Void();
|
||||
|
||||
state PromiseStream<Future<Void>> addActor;
|
||||
|
@ -1284,7 +1350,6 @@ ACTOR Future<Void> masterProxyServerCore(
|
|||
state double lastCommit = 0;
|
||||
state std::set<Sequence> txnSequences;
|
||||
state Sequence maxSequence = std::numeric_limits<Sequence>::max();
|
||||
state double commitBatchInterval = SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_MIN;
|
||||
|
||||
addActor.send( fetchVersions(&commitData) );
|
||||
addActor.send( waitFailureServer(proxy.waitFailure.getFuture()) );
|
||||
|
@ -1326,7 +1391,8 @@ ACTOR Future<Void> masterProxyServerCore(
|
|||
(int)std::min<double>(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_BYTES_MAX,
|
||||
std::max<double>(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_BYTES_MIN,
|
||||
SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_BYTES_SCALE_BASE * pow(db->get().client.proxies.size(), SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_BYTES_SCALE_POWER)));
|
||||
commitBatcher = batcher(batchedCommits, proxy.commit.getFuture(), SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_FROM_IDLE, &commitBatchInterval, SERVER_KNOBS->MAX_COMMIT_BATCH_INTERVAL, SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_COUNT_MAX, commitBatchByteLimit, CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT, commitData.commitBatchStartNotifications, &commitData.commitBatchesMemBytesCount, commitBatchesMemoryLimit, TaskProxyCommitBatcher, &commitData.stats.txnCommitIn);
|
||||
|
||||
commitBatcherActor = commitBatcher(&commitData, batchedCommits, proxy.commit.getFuture(), commitBatchByteLimit, commitBatchesMemoryLimit);
|
||||
loop choose{
|
||||
when( Void _ = wait( dbInfoChange ) ) {
|
||||
dbInfoChange = db->onChange();
|
||||
|
@ -1347,7 +1413,7 @@ ACTOR Future<Void> masterProxyServerCore(
|
|||
lastCommit = now();
|
||||
|
||||
if (trs.size() || lastCommitComplete.isReady()) {
|
||||
lastCommitComplete = commitBatch(&commitData, trs, &commitBatchInterval, batchBytes);
|
||||
lastCommitComplete = commitBatch(&commitData, trs, batchBytes);
|
||||
addActor.send(lastCommitComplete);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue