The storage server would block the main thread when processing a single version with a large amount of data
This commit is contained in:
parent
e36b7cd417
commit
0613a34845
|
@ -42,6 +42,8 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
|
|||
init( BUGGIFY_TLOG_STORAGE_MIN_UPDATE_INTERVAL, 30 );
|
||||
init( UNFLUSHED_DATA_RATIO, 0.05 ); if( randomize && BUGGIFY ) UNFLUSHED_DATA_RATIO = 0.0;
|
||||
init( DESIRED_TOTAL_BYTES, 150000 ); if( randomize && BUGGIFY ) DESIRED_TOTAL_BYTES = 10000;
|
||||
init( DESIRED_UPDATE_BYTES, 2*DESIRED_TOTAL_BYTES );
|
||||
init( UPDATE_DELAY, 0.001 );
|
||||
init( MAXIMUM_PEEK_BYTES, 10e6 );
|
||||
init( APPLY_MUTATION_BYTES, 1e6 );
|
||||
init( RECOVERY_DATA_BYTE_LIMIT, 100000 );
|
||||
|
|
|
@ -46,6 +46,8 @@ public:
|
|||
double BUGGIFY_TLOG_STORAGE_MIN_UPDATE_INTERVAL;
|
||||
double UNFLUSHED_DATA_RATIO;
|
||||
int DESIRED_TOTAL_BYTES;
|
||||
int DESIRED_UPDATE_BYTES;
|
||||
double UPDATE_DELAY;
|
||||
int MAXIMUM_PEEK_BYTES;
|
||||
int APPLY_MUTATION_BYTES;
|
||||
int RECOVERY_DATA_BYTE_LIMIT;
|
||||
|
|
|
@ -1824,11 +1824,18 @@ ACTOR Future<Void> fetchKeys( StorageServer *data, AddingShard* shard ) {
|
|||
++data->counters.fetchWaitingCount;
|
||||
data->counters.fetchWaitingMS += 1000*(executeStart - startt);
|
||||
|
||||
Void _ = wait(delay(0));
|
||||
// Fetch keys gets called while the update actor is processing mutations. data->version will not be updated until all mutations for a version
|
||||
// have been processed. We need to take the durableVersionLock to ensure data->version is greater than the version of the mutation which caused
|
||||
// the fetch to be initiated.
|
||||
Void _ = wait( data->durableVersionLock.take() );
|
||||
|
||||
shard->phase = AddingShard::Fetching;
|
||||
state Version fetchVersion = data->version.get();
|
||||
|
||||
data->durableVersionLock.release();
|
||||
|
||||
Void _ = wait(delay(0));
|
||||
|
||||
TraceEvent(SevDebug, "FetchKeysUnblocked", data->thisServerID).detail("FKID", interval.pairID).detail("Version", fetchVersion);
|
||||
|
||||
// Get the history
|
||||
|
@ -2267,6 +2274,7 @@ bool containsRollback( VersionUpdateRef const& changes, Version& rollbackVersion
|
|||
|
||||
class StorageUpdater {
|
||||
public:
|
||||
StorageUpdater() : fromVersion(invalidVersion), currentVersion(invalidVersion), restoredVersion(invalidVersion), processedStartKey(false) {}
|
||||
StorageUpdater(Version fromVersion, Version restoredVersion) : fromVersion(fromVersion), currentVersion(fromVersion), restoredVersion(restoredVersion), processedStartKey(false) {}
|
||||
|
||||
void applyMutation(StorageServer* data, MutationRef const& m, Version ver) {
|
||||
|
@ -2471,26 +2479,43 @@ ACTOR Future<Void> update( StorageServer* data, bool* pReceivedUpdate )
|
|||
data->updateEagerReads = &eager;
|
||||
data->debug_inApplyUpdate = true;
|
||||
|
||||
StorageUpdater updater(data->lastVersionWithData, data->restoredVersion);
|
||||
state StorageUpdater updater(data->lastVersionWithData, data->restoredVersion);
|
||||
|
||||
if (EXPENSIVE_VALIDATION) data->data().atLatest().validate();
|
||||
validate(data);
|
||||
|
||||
state bool injectedChanges = false;
|
||||
for(auto& c : fii.changes) {
|
||||
for(auto& m : c.mutations) {
|
||||
updater.applyMutation(data, m, c.version);
|
||||
state int changeNum = 0;
|
||||
state int mutationBytes = 0;
|
||||
for(; changeNum < fii.changes.size(); changeNum++) {
|
||||
state int mutationNum = 0;
|
||||
state VerUpdateRef* pUpdate = &fii.changes[changeNum];
|
||||
for(; mutationNum < pUpdate->mutations.size(); mutationNum++) {
|
||||
updater.applyMutation(data, pUpdate->mutations[mutationNum], pUpdate->version);
|
||||
mutationBytes += pUpdate->mutations[mutationNum].totalSize();
|
||||
injectedChanges = true;
|
||||
if(mutationBytes > SERVER_KNOBS->DESIRED_UPDATE_BYTES) {
|
||||
mutationBytes = 0;
|
||||
Void _ = wait(delay(SERVER_KNOBS->UPDATE_DELAY));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Version ver = invalidVersion;
|
||||
state Version ver = invalidVersion;
|
||||
cloneCursor2->setProtocolVersion(data->logProtocol);
|
||||
//TraceEvent("SSUpdatePeeked", data->thisServerID).detail("FromEpoch", data->updateEpoch).detail("FromSeq", data->updateSequence).detail("ToEpoch", results.end_epoch).detail("ToSeq", results.end_seq).detail("MsgSize", results.messages.size());
|
||||
for (;cloneCursor2->hasMessage(); cloneCursor2->nextMessage()) {
|
||||
auto &rd = *cloneCursor2->reader();
|
||||
if(mutationBytes > SERVER_KNOBS->DESIRED_UPDATE_BYTES) {
|
||||
mutationBytes = 0;
|
||||
//Instead of just yielding, leave time for the storage server to respond to reads
|
||||
Void _ = wait(delay(SERVER_KNOBS->UPDATE_DELAY));
|
||||
}
|
||||
|
||||
if (cloneCursor2->version().version > ver) ASSERT(cloneCursor2->version().version > data->version.get());
|
||||
if (cloneCursor2->version().version > ver) {
|
||||
ASSERT(cloneCursor2->version().version > data->version.get());
|
||||
}
|
||||
|
||||
auto &rd = *cloneCursor2->reader();
|
||||
|
||||
if (cloneCursor2->version().version > ver && cloneCursor2->version().version > data->version.get()) {
|
||||
++data->counters.updateVersions;
|
||||
|
@ -2514,7 +2539,7 @@ ACTOR Future<Void> update( StorageServer* data, bool* pReceivedUpdate )
|
|||
TraceEvent("SSPeekMutation", data->thisServerID).detail("Mutation", msg.toString()).detail("Version", cloneCursor2->version().toString());
|
||||
|
||||
updater.applyMutation(data, msg, ver);
|
||||
|
||||
mutationBytes += msg.totalSize();
|
||||
data->counters.mutationBytes += msg.totalSize();
|
||||
++data->counters.mutations;
|
||||
switch(msg.type) {
|
||||
|
@ -2645,17 +2670,20 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
|
|||
debug_advanceMinCommittedVersion( data->thisServerID, newOldestVersion );
|
||||
|
||||
// Taking and releasing the durableVersionLock ensures that no eager reads both begin before the commit was effective and
|
||||
// are applied after we change the durable version.
|
||||
// are applied after we change the durable version. Also ensure that we have to lock while calling changeDurableVersion,
|
||||
// because otherwise the latest version of mutableData might be partially loaded.
|
||||
Void _ = wait( data->durableVersionLock.take() );
|
||||
data->durableVersionLock.release();
|
||||
|
||||
Void _ = wait( delay(0, TaskUpdateStorage) );
|
||||
|
||||
data->popVersion( data->durableVersion.get() + 1 );
|
||||
|
||||
while (!changeDurableVersion( data, newOldestVersion )) {
|
||||
Void _ = wait( yield(TaskUpdateStorage) );
|
||||
if(g_network->check_yield(TaskUpdateStorage)) {
|
||||
data->durableVersionLock.release();
|
||||
Void _ = wait(delay(0, TaskUpdateStorage));
|
||||
Void _ = wait( data->durableVersionLock.take() );
|
||||
}
|
||||
}
|
||||
|
||||
data->durableVersionLock.release();
|
||||
|
||||
//TraceEvent("StorageServerDurable", data->thisServerID).detail("Version", newOldestVersion);
|
||||
|
||||
|
|
Loading…
Reference in New Issue