Merge pull request #2746 from etschannen/release-6.2
A variety of small fixes
This commit is contained in:
commit
a5cfc1b038
|
@ -2277,6 +2277,7 @@ public:
|
||||||
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
|
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
|
||||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||||
state std::string statusText;
|
state std::string statusText;
|
||||||
|
state int retries = 0;
|
||||||
|
|
||||||
loop{
|
loop{
|
||||||
try {
|
try {
|
||||||
|
@ -2294,27 +2295,33 @@ public:
|
||||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||||
|
|
||||||
state Future<Optional<Value>> fPaused = tr->get(backupAgent->taskBucket->getPauseKey());
|
state Future<Optional<Value>> fPaused = tr->get(backupAgent->taskBucket->getPauseKey());
|
||||||
|
state Future<Standalone<RangeResultRef>> fErrorValues = errorLimit > 0 ? tr->getRange(backupAgent->errors.get(BinaryWriter::toValue(logUid, Unversioned())).range(), errorLimit, false, true) : Future<Standalone<RangeResultRef>>();
|
||||||
|
state Future<Optional<Value>> fBackupUid = tr->get(backupAgent->states.get(BinaryWriter::toValue(logUid, Unversioned())).pack(DatabaseBackupAgent::keyFolderId));
|
||||||
|
state Future<Optional<Value>> fBackupVerison = tr->get(BinaryWriter::toValue(logUid, Unversioned()).withPrefix(applyMutationsBeginRange.begin));
|
||||||
|
state Future<Optional<Key>> fTagName = tr->get(backupAgent->states.get(BinaryWriter::toValue(logUid, Unversioned())).pack(BackupAgentBase::keyConfigBackupTag));
|
||||||
|
state Future<Optional<Value>> fStopVersionKey = tr->get(backupAgent->states.get(BinaryWriter::toValue(logUid, Unversioned())).pack(BackupAgentBase::keyStateStop));
|
||||||
|
state Future<Optional<Key>> fBackupKeysPacked = tr->get(backupAgent->config.get(BinaryWriter::toValue(logUid, Unversioned())).pack(BackupAgentBase::keyConfigBackupRanges));
|
||||||
|
|
||||||
int backupStateInt = wait(backupAgent->getStateValue(tr, logUid));
|
int backupStateInt = wait(backupAgent->getStateValue(tr, logUid));
|
||||||
state BackupAgentBase::enumState backupState = (BackupAgentBase::enumState)backupStateInt;
|
state BackupAgentBase::enumState backupState = (BackupAgentBase::enumState)backupStateInt;
|
||||||
|
|
||||||
if (backupState == DatabaseBackupAgent::STATE_NEVERRAN) {
|
if (backupState == DatabaseBackupAgent::STATE_NEVERRAN) {
|
||||||
statusText += "No previous backups found.\n";
|
statusText += "No previous backups found.\n";
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
state std::string tagNameDisplay;
|
state std::string tagNameDisplay;
|
||||||
Optional<Key> tagName = wait(tr->get(backupAgent->states.get(BinaryWriter::toValue(logUid, Unversioned())).pack(BackupAgentBase::keyConfigBackupTag)));
|
Optional<Key> tagName = wait(fTagName);
|
||||||
|
|
||||||
// Define the display tag name
|
// Define the display tag name
|
||||||
if (tagName.present()) {
|
if (tagName.present()) {
|
||||||
tagNameDisplay = tagName.get().toString();
|
tagNameDisplay = tagName.get().toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
state Optional<Value> uid = wait(tr->get(backupAgent->config.get(BinaryWriter::toValue(logUid, Unversioned())).pack(BackupAgentBase::keyFolderId)));
|
state Optional<Value> stopVersionKey = wait(fStopVersionKey);
|
||||||
state Optional<Value> stopVersionKey = wait(tr->get(backupAgent->states.get(BinaryWriter::toValue(logUid, Unversioned())).pack(BackupAgentBase::keyStateStop)));
|
|
||||||
|
Optional<Key> backupKeysPacked = wait(fBackupKeysPacked);
|
||||||
|
|
||||||
state Standalone<VectorRef<KeyRangeRef>> backupRanges;
|
state Standalone<VectorRef<KeyRangeRef>> backupRanges;
|
||||||
Optional<Key> backupKeysPacked = wait(tr->get(backupAgent->config.get(BinaryWriter::toValue(logUid, Unversioned())).pack(BackupAgentBase::keyConfigBackupRanges)));
|
|
||||||
|
|
||||||
if (backupKeysPacked.present()) {
|
if (backupKeysPacked.present()) {
|
||||||
BinaryReader br(backupKeysPacked.get(), IncludeVersion());
|
BinaryReader br(backupKeysPacked.get(), IncludeVersion());
|
||||||
br >> backupRanges;
|
br >> backupRanges;
|
||||||
|
@ -2350,7 +2357,7 @@ public:
|
||||||
|
|
||||||
// Append the errors, if requested
|
// Append the errors, if requested
|
||||||
if (errorLimit > 0) {
|
if (errorLimit > 0) {
|
||||||
Standalone<RangeResultRef> values = wait(tr->getRange(backupAgent->errors.get(BinaryWriter::toValue(logUid, Unversioned())).range(), errorLimit, false, true));
|
Standalone<RangeResultRef> values = wait( fErrorValues );
|
||||||
|
|
||||||
// Display the errors, if any
|
// Display the errors, if any
|
||||||
if (values.size() > 0) {
|
if (values.size() > 0) {
|
||||||
|
@ -2367,10 +2374,9 @@ public:
|
||||||
|
|
||||||
|
|
||||||
//calculate time differential
|
//calculate time differential
|
||||||
state Optional<Value> backupUid = wait(tr->get(backupAgent->states.get(BinaryWriter::toValue(logUid, Unversioned())).pack(DatabaseBackupAgent::keyFolderId)));
|
Optional<Value> backupUid = wait(fBackupUid);
|
||||||
if(backupUid.present()) {
|
if(backupUid.present()) {
|
||||||
Optional<Value> v = wait(tr->get(BinaryWriter::toValue(logUid, Unversioned()).withPrefix(applyMutationsBeginRange.begin)));
|
Optional<Value> v = wait(fBackupVerison);
|
||||||
|
|
||||||
if (v.present()) {
|
if (v.present()) {
|
||||||
state Version destApplyBegin = BinaryReader::fromStringRef<Version>(v.get(), Unversioned());
|
state Version destApplyBegin = BinaryReader::fromStringRef<Version>(v.get(), Unversioned());
|
||||||
Version sourceVersion = wait(srcReadVersion);
|
Version sourceVersion = wait(srcReadVersion);
|
||||||
|
@ -2387,6 +2393,11 @@ public:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
catch (Error &e) {
|
catch (Error &e) {
|
||||||
|
retries++;
|
||||||
|
if(retries > 5) {
|
||||||
|
statusText += format("\nWARNING: Could not fetch full DR status: %s\n", e.name());
|
||||||
|
return statusText;
|
||||||
|
}
|
||||||
wait(tr->onError(e));
|
wait(tr->onError(e));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3552,6 +3552,7 @@ public:
|
||||||
ACTOR static Future<Void> submitBackup(FileBackupAgent* backupAgent, Reference<ReadYourWritesTransaction> tr, Key outContainer, int snapshotIntervalSeconds, std::string tagName, Standalone<VectorRef<KeyRangeRef>> backupRanges, bool stopWhenDone) {
|
ACTOR static Future<Void> submitBackup(FileBackupAgent* backupAgent, Reference<ReadYourWritesTransaction> tr, Key outContainer, int snapshotIntervalSeconds, std::string tagName, Standalone<VectorRef<KeyRangeRef>> backupRanges, bool stopWhenDone) {
|
||||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||||
|
tr->setOption(FDBTransactionOptions::COMMIT_ON_FIRST_PROXY);
|
||||||
|
|
||||||
TraceEvent(SevInfo, "FBA_SubmitBackup")
|
TraceEvent(SevInfo, "FBA_SubmitBackup")
|
||||||
.detail("TagName", tagName.c_str())
|
.detail("TagName", tagName.c_str())
|
||||||
|
|
|
@ -754,9 +754,9 @@ public:
|
||||||
// Everything actually network related is delegated to the Sim2Net class; Sim2 is only concerned with simulating machines and time
|
// Everything actually network related is delegated to the Sim2Net class; Sim2 is only concerned with simulating machines and time
|
||||||
virtual double now() { return time; }
|
virtual double now() { return time; }
|
||||||
|
|
||||||
// timer() can be up to one second ahead of now()
|
// timer() can be up to 0.1 seconds ahead of now()
|
||||||
virtual double timer() {
|
virtual double timer() {
|
||||||
timerTime += deterministicRandom()->random01()*(time+1.0-timerTime)/2.0;
|
timerTime += deterministicRandom()->random01()*(time+0.1-timerTime)/2.0;
|
||||||
return timerTime;
|
return timerTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -240,7 +240,7 @@ public:
|
||||||
int64_t physicalBytes = getLoadAverage();
|
int64_t physicalBytes = getLoadAverage();
|
||||||
double minAvailableSpaceRatio = getMinAvailableSpaceRatio(includeInFlight);
|
double minAvailableSpaceRatio = getMinAvailableSpaceRatio(includeInFlight);
|
||||||
int64_t inFlightBytes = includeInFlight ? getDataInFlightToTeam() / servers.size() : 0;
|
int64_t inFlightBytes = includeInFlight ? getDataInFlightToTeam() / servers.size() : 0;
|
||||||
double availableSpaceMultiplier = SERVER_KNOBS->FREE_SPACE_RATIO_CUTOFF / ( std::max( std::min( SERVER_KNOBS->FREE_SPACE_RATIO_CUTOFF, minAvailableSpaceRatio ), 0.000001 ) );
|
double availableSpaceMultiplier = SERVER_KNOBS->AVAILABLE_SPACE_RATIO_CUTOFF / ( std::max( std::min( SERVER_KNOBS->AVAILABLE_SPACE_RATIO_CUTOFF, minAvailableSpaceRatio ), 0.000001 ) );
|
||||||
if(servers.size()>2) {
|
if(servers.size()>2) {
|
||||||
//make sure in triple replication the penalty is high enough that you will always avoid a team with a member at 20% free space
|
//make sure in triple replication the penalty is high enough that you will always avoid a team with a member at 20% free space
|
||||||
availableSpaceMultiplier = availableSpaceMultiplier * availableSpaceMultiplier;
|
availableSpaceMultiplier = availableSpaceMultiplier * availableSpaceMultiplier;
|
||||||
|
|
|
@ -184,7 +184,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula
|
||||||
init( DD_MERGE_COALESCE_DELAY, isSimulated ? 30.0 : 300.0 ); if( randomize && BUGGIFY ) DD_MERGE_COALESCE_DELAY = 0.001;
|
init( DD_MERGE_COALESCE_DELAY, isSimulated ? 30.0 : 300.0 ); if( randomize && BUGGIFY ) DD_MERGE_COALESCE_DELAY = 0.001;
|
||||||
init( STORAGE_METRICS_POLLING_DELAY, 2.0 ); if( randomize && BUGGIFY ) STORAGE_METRICS_POLLING_DELAY = 15.0;
|
init( STORAGE_METRICS_POLLING_DELAY, 2.0 ); if( randomize && BUGGIFY ) STORAGE_METRICS_POLLING_DELAY = 15.0;
|
||||||
init( STORAGE_METRICS_RANDOM_DELAY, 0.2 );
|
init( STORAGE_METRICS_RANDOM_DELAY, 0.2 );
|
||||||
init( FREE_SPACE_RATIO_CUTOFF, 0.35 );
|
init( AVAILABLE_SPACE_RATIO_CUTOFF, 0.05 );
|
||||||
init( DESIRED_TEAMS_PER_SERVER, 5 ); if( randomize && BUGGIFY ) DESIRED_TEAMS_PER_SERVER = 1;
|
init( DESIRED_TEAMS_PER_SERVER, 5 ); if( randomize && BUGGIFY ) DESIRED_TEAMS_PER_SERVER = 1;
|
||||||
init( MAX_TEAMS_PER_SERVER, 5*DESIRED_TEAMS_PER_SERVER );
|
init( MAX_TEAMS_PER_SERVER, 5*DESIRED_TEAMS_PER_SERVER );
|
||||||
init( DD_SHARD_SIZE_GRANULARITY, 5000000 );
|
init( DD_SHARD_SIZE_GRANULARITY, 5000000 );
|
||||||
|
@ -318,6 +318,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula
|
||||||
init( ALWAYS_CAUSAL_READ_RISKY, false );
|
init( ALWAYS_CAUSAL_READ_RISKY, false );
|
||||||
init( MAX_COMMIT_UPDATES, 2000 ); if( randomize && BUGGIFY ) MAX_COMMIT_UPDATES = 1;
|
init( MAX_COMMIT_UPDATES, 2000 ); if( randomize && BUGGIFY ) MAX_COMMIT_UPDATES = 1;
|
||||||
init( MIN_PROXY_COMPUTE, 0.001 );
|
init( MIN_PROXY_COMPUTE, 0.001 );
|
||||||
|
init( MAX_PROXY_COMPUTE, 2.0 );
|
||||||
init( PROXY_COMPUTE_BUCKETS, 20000 );
|
init( PROXY_COMPUTE_BUCKETS, 20000 );
|
||||||
init( PROXY_COMPUTE_GROWTH_RATE, 0.01 );
|
init( PROXY_COMPUTE_GROWTH_RATE, 0.01 );
|
||||||
|
|
||||||
|
|
|
@ -149,8 +149,7 @@ public:
|
||||||
double DD_MERGE_COALESCE_DELAY;
|
double DD_MERGE_COALESCE_DELAY;
|
||||||
double STORAGE_METRICS_POLLING_DELAY;
|
double STORAGE_METRICS_POLLING_DELAY;
|
||||||
double STORAGE_METRICS_RANDOM_DELAY;
|
double STORAGE_METRICS_RANDOM_DELAY;
|
||||||
double FREE_SPACE_RATIO_CUTOFF;
|
double AVAILABLE_SPACE_RATIO_CUTOFF;
|
||||||
double FREE_SPACE_CUTOFF_PENALTY;
|
|
||||||
int DESIRED_TEAMS_PER_SERVER;
|
int DESIRED_TEAMS_PER_SERVER;
|
||||||
int MAX_TEAMS_PER_SERVER;
|
int MAX_TEAMS_PER_SERVER;
|
||||||
int64_t DD_SHARD_SIZE_GRANULARITY;
|
int64_t DD_SHARD_SIZE_GRANULARITY;
|
||||||
|
@ -264,6 +263,7 @@ public:
|
||||||
bool ALWAYS_CAUSAL_READ_RISKY;
|
bool ALWAYS_CAUSAL_READ_RISKY;
|
||||||
int MAX_COMMIT_UPDATES;
|
int MAX_COMMIT_UPDATES;
|
||||||
double MIN_PROXY_COMPUTE;
|
double MIN_PROXY_COMPUTE;
|
||||||
|
double MAX_PROXY_COMPUTE;
|
||||||
int PROXY_COMPUTE_BUCKETS;
|
int PROXY_COMPUTE_BUCKETS;
|
||||||
double PROXY_COMPUTE_GROWTH_RATE;
|
double PROXY_COMPUTE_GROWTH_RATE;
|
||||||
|
|
||||||
|
|
|
@ -534,7 +534,7 @@ ACTOR Future<Void> commitBatch(
|
||||||
/////// Phase 1: Pre-resolution processing (CPU bound except waiting for a version # which is separately pipelined and *should* be available by now (unless empty commit); ordered; currently atomic but could yield)
|
/////// Phase 1: Pre-resolution processing (CPU bound except waiting for a version # which is separately pipelined and *should* be available by now (unless empty commit); ordered; currently atomic but could yield)
|
||||||
TEST(self->latestLocalCommitBatchResolving.get() < localBatchNumber-1); // Queuing pre-resolution commit processing
|
TEST(self->latestLocalCommitBatchResolving.get() < localBatchNumber-1); // Queuing pre-resolution commit processing
|
||||||
wait(self->latestLocalCommitBatchResolving.whenAtLeast(localBatchNumber-1));
|
wait(self->latestLocalCommitBatchResolving.whenAtLeast(localBatchNumber-1));
|
||||||
state Future<Void> releaseDelay = delay(batchOperations*self->commitComputePerOperation[latencyBucket], TaskPriority::ProxyMasterVersionReply);
|
state Future<Void> releaseDelay = delay(std::min(SERVER_KNOBS->MAX_PROXY_COMPUTE, batchOperations*self->commitComputePerOperation[latencyBucket]), TaskPriority::ProxyMasterVersionReply);
|
||||||
|
|
||||||
if (debugID.present())
|
if (debugID.present())
|
||||||
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.GettingCommitVersion");
|
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.GettingCommitVersion");
|
||||||
|
|
Loading…
Reference in New Issue