Fixes to ss e-brake, tlog streaming, and their interaction
Parent: 9730f670e1
Commit: 8dd7f8f447
@@ -55,7 +55,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
 	init( BUGGIFY_RECOVER_MEMORY_LIMIT, 1e6 );
 	init( BUGGIFY_WORKER_REMOVED_MAX_LAG, 30 );
 	init( UPDATE_STORAGE_BYTE_LIMIT, 1e6 );
-	init( TLOG_PEEK_DELAY, 0.00005 );
+	init( TLOG_PEEK_DELAY, 0.0005 );
 	init( LEGACY_TLOG_UPGRADE_ENTRIES_PER_VERSION, 100 );
 	init( VERSION_MESSAGES_OVERHEAD_FACTOR_1024THS, 1072 ); // Based on a naive interpretation of the gcc version of std::deque, we would expect this to be 16 bytes overhead per 512 bytes data. In practice, it seems to be 24 bytes overhead per 512.
 	init( VERSION_MESSAGES_ENTRY_BYTES_WITH_OVERHEAD, std::ceil(16.0 * VERSION_MESSAGES_OVERHEAD_FACTOR_1024THS / 1024) );
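Note on this hunk: the only change is a 10x increase of TLOG_PEEK_DELAY, from 0.00005 s (50 µs) to 0.0005 s (500 µs). For the unchanged context lines, the overhead arithmetic works out as follows: VERSION_MESSAGES_OVERHEAD_FACTOR_1024THS expresses per-entry overhead in 1024ths, so VERSION_MESSAGES_ENTRY_BYTES_WITH_OVERHEAD = ceil(16.0 * 1072 / 1024) = ceil(16.765625) = 17 bytes per version-message entry.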
@@ -64,7 +64,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
 	init( TLOG_MESSAGE_BLOCK_BYTES, 10e6 );
 	init( TLOG_MESSAGE_BLOCK_OVERHEAD_FACTOR, double(TLOG_MESSAGE_BLOCK_BYTES) / (TLOG_MESSAGE_BLOCK_BYTES - MAX_MESSAGE_SIZE) ); //1.0121466709838096006362758832473
 	init( PEEK_TRACKER_EXPIRATION_TIME, 600 ); if( randomize && BUGGIFY ) PEEK_TRACKER_EXPIRATION_TIME = deterministicRandom()->coinflip() ? 0.1 : 120;
-	init( PEEK_USING_STREAMING, true );
+	init( PEEK_USING_STREAMING, true ); if( randomize && BUGGIFY ) PEEK_USING_STREAMING = false;
 	init( PARALLEL_GET_MORE_REQUESTS, 32 ); if( randomize && BUGGIFY ) PARALLEL_GET_MORE_REQUESTS = 2;
 	init( MULTI_CURSOR_PRE_FETCH_LIMIT, 10 );
 	init( MAX_QUEUE_COMMIT_BYTES, 15e6 ); if( randomize && BUGGIFY ) MAX_QUEUE_COMMIT_BYTES = 5000;
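Two things worth noting here. First, PEEK_USING_STREAMING now gets a BUGGIFY clause, so simulation runs will sometimes disable streaming peeks and exercise the non-streaming peek path as well; this appears to be the "tlog streaming" part of the commit title. Second, the commented value of TLOG_MESSAGE_BLOCK_OVERHEAD_FACTOR follows directly from the formula on that line: with TLOG_MESSAGE_BLOCK_BYTES = 10e6, a factor of 10e6 / (10e6 - MAX_MESSAGE_SIZE) ≈ 1.01215 back-solves to MAX_MESSAGE_SIZE of roughly 120e3 bytes at this block size.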
@@ -534,6 +534,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
 	init( TARGET_BYTES_PER_STORAGE_SERVER_BATCH, 750e6 ); if( smallStorageTarget ) TARGET_BYTES_PER_STORAGE_SERVER_BATCH = 1500e3;
 	init( SPRING_BYTES_STORAGE_SERVER_BATCH, 100e6 ); if( smallStorageTarget ) SPRING_BYTES_STORAGE_SERVER_BATCH = 150e3;
 	init( STORAGE_HARD_LIMIT_BYTES, 1500e6 ); if( smallStorageTarget ) STORAGE_HARD_LIMIT_BYTES = 4500e3;
+	init( STORAGE_HARD_LIMIT_BYTES_OVERAGE, 5000e3 ); if( smallStorageTarget ) STORAGE_HARD_LIMIT_BYTES_OVERAGE = 100e3; // byte+version overage ensures storage server makes enough progress on freeing up storage queue memory at hard limit by ensuring it advances desiredOldestVersion enough per commit cycle.
+	init( STORAGE_HARD_LIMIT_VERSION_OVERAGE, VERSIONS_PER_SECOND / 4.0 );
 	init( STORAGE_DURABILITY_LAG_HARD_MAX, 2000e6 ); if( smallStorageTarget ) STORAGE_DURABILITY_LAG_HARD_MAX = 100e6;
 	init( STORAGE_DURABILITY_LAG_SOFT_MAX, 250e6 ); if( smallStorageTarget ) STORAGE_DURABILITY_LAG_SOFT_MAX = 10e6;
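With the defaults added here, once the e-brake (see the update() hunk below) has engaged and released, it will not engage again until the storage server has either ingested more than STORAGE_HARD_LIMIT_BYTES_OVERAGE = 5000e3 bytes of new input (about 5 MB; 100e3 with the small-storage target), or seen desiredOldestVersion advance more than STORAGE_HARD_LIMIT_VERSION_OVERAGE = VERSIONS_PER_SECOND / 4 versions past the durable version recorded at release. With the usual VERSIONS_PER_SECOND of 1e6, that is 250e3 versions, i.e. roughly a quarter second of version time.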
@@ -471,6 +471,8 @@ public:
 	int64_t TARGET_BYTES_PER_STORAGE_SERVER_BATCH;
 	int64_t SPRING_BYTES_STORAGE_SERVER_BATCH;
 	int64_t STORAGE_HARD_LIMIT_BYTES;
+	int64_t STORAGE_HARD_LIMIT_BYTES_OVERAGE;
+	int64_t STORAGE_HARD_LIMIT_VERSION_OVERAGE;
 	int64_t STORAGE_DURABILITY_LAG_HARD_MAX;
 	int64_t STORAGE_DURABILITY_LAG_SOFT_MAX;
@@ -670,6 +670,9 @@ public:
 	bool debug_inApplyUpdate;
 	double debug_lastValidateTime;
 
+	int64_t lastBytesInputEBrake;
+	Version lastDurableVersionEBrake;
+
 	int maxQueryQueue;
 	int getAndResetMaxQueryQueueSize() {
 		int val = maxQueryQueue;
@@ -869,7 +872,7 @@ public:
 	    fetchKeysBytesBudget(SERVER_KNOBS->STORAGE_FETCH_BYTES), fetchKeysBudgetUsed(false),
 	    instanceID(deterministicRandom()->randomUniqueID().first()), shuttingDown(false), behind(false),
 	    versionBehind(false), debug_inApplyUpdate(false), debug_lastValidateTime(0), maxQueryQueue(0),
-	    transactionTagCounter(ssi.id()), counters(this),
+	    lastBytesInputEBrake(0), lastDurableVersionEBrake(0), transactionTagCounter(ssi.id()), counters(this),
 	    storageServerSourceTLogIDEventHolder(
 	        makeReference<EventCacheHolder>(ssi.id().toString() + "/StorageServerSourceTLogID")) {
 		version.initMetric(LiteralStringRef("StorageServer.Version"), counters.cc.id);
@@ -3675,18 +3678,36 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
 	try {
 		// If we are disk bound and durableVersion is very old, we need to block updates or we could run out of memory
 		// This is often referred to as the storage server e-brake (emergency brake)
-		state double waitStartT = 0;
-		while (data->queueSize() >= SERVER_KNOBS->STORAGE_HARD_LIMIT_BYTES &&
-		       data->durableVersion.get() < data->desiredOldestVersion.get()) {
-			if (now() - waitStartT >= 1) {
-				TraceEvent(SevWarn, "StorageServerUpdateLag", data->thisServerID)
-				    .detail("Version", data->version.get())
-				    .detail("DurableVersion", data->durableVersion.get());
-				waitStartT = now();
-			}
-
-			data->behind = true;
-			wait(delayJittered(.005, TaskPriority::TLogPeekReply));
+		// We allow the storage server to make some progress between e-brake periods, referred to as "overage", in
+		// order to ensure that it advances desiredOldestVersion enough for updateStorage to make enough progress on
+		// freeing up queue size.
+		state double waitStartT = 0;
+		if (data->queueSize() >= SERVER_KNOBS->STORAGE_HARD_LIMIT_BYTES &&
+		    data->durableVersion.get() < data->desiredOldestVersion.get() &&
+		    ((data->desiredOldestVersion.get() - SERVER_KNOBS->STORAGE_HARD_LIMIT_VERSION_OVERAGE >
+		      data->lastDurableVersionEBrake) ||
+		     (data->counters.bytesInput.getValue() - SERVER_KNOBS->STORAGE_HARD_LIMIT_BYTES_OVERAGE >
+		      data->lastBytesInputEBrake))) {
+
+			while (data->queueSize() >= SERVER_KNOBS->STORAGE_HARD_LIMIT_BYTES &&
+			       data->durableVersion.get() < data->desiredOldestVersion.get()) {
+				if (now() - waitStartT >= 1) {
+					TraceEvent(SevWarn, "StorageServerUpdateLag", data->thisServerID)
+					    .detail("Version", data->version.get())
+					    .detail("DurableVersion", data->durableVersion.get())
+					    .detail("DesiredOldestVersion", data->desiredOldestVersion.get())
+					    .detail("QueueSize", data->queueSize())
+					    .detail("LastBytesInputEBrake", data->lastBytesInputEBrake)
+					    .detail("LastDurableVersionEBrake", data->lastDurableVersionEBrake);
+					waitStartT = now();
+				}
+
+				data->behind = true;
+				wait(delayJittered(.005, TaskPriority::TLogPeekReply));
+			}
+			data->lastBytesInputEBrake = data->counters.bytesInput.getValue();
+			data->lastDurableVersionEBrake = data->durableVersion.get();
 		}
 
 		if (g_network->isSimulated() && data->isTss() && g_simulator.tssMode == ISimulator::TSSMode::EnabledAddDelay &&
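For readers following the control flow, here is a minimal, self-contained sketch of the gating predicate this hunk introduces. It is an illustration only, not the FoundationDB code: the struct, the function name shouldEngageEBrake, and the inlined constants are stand-ins for StorageServer state and SERVER_KNOBS.

#include <cstdint>

// Stand-in for the two fields added to StorageServer in this commit.
struct EBrakeState {
	int64_t lastBytesInputEBrake = 0;     // bytesInput counter at the end of the last braking period
	int64_t lastDurableVersionEBrake = 0; // durableVersion at the end of the last braking period
};

// Inlined knob defaults from the ServerKnobs hunks above (illustrative constants, not SERVER_KNOBS).
constexpr int64_t STORAGE_HARD_LIMIT_BYTES = 1500000000;       // 1500e6
constexpr int64_t STORAGE_HARD_LIMIT_BYTES_OVERAGE = 5000000;  // 5000e3
constexpr int64_t STORAGE_HARD_LIMIT_VERSION_OVERAGE = 250000; // VERSIONS_PER_SECOND / 4, assuming 1e6 versions/s

// The e-brake only re-engages once the server has made some "overage" progress since it last
// released: either enough new input bytes, or enough advance of desiredOldestVersion past the
// durable version recorded at release.
bool shouldEngageEBrake(int64_t queueSize,
                        int64_t durableVersion,
                        int64_t desiredOldestVersion,
                        int64_t bytesInput,
                        const EBrakeState& s) {
	return queueSize >= STORAGE_HARD_LIMIT_BYTES && durableVersion < desiredOldestVersion &&
	       ((desiredOldestVersion - STORAGE_HARD_LIMIT_VERSION_OVERAGE > s.lastDurableVersionEBrake) ||
	        (bytesInput - STORAGE_HARD_LIMIT_BYTES_OVERAGE > s.lastBytesInputEBrake));
}

In the actual actor, when this condition holds the inner loop waits with delayJittered(.005) and marks the server behind until either the queue drains below the hard limit or durableVersion catches up to desiredOldestVersion, and the two EBrake fields are refreshed when the loop exits.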