Merge commit 'f773b9460d31d31b7d421860fc647936f31aa1fa'

# Conflicts:
#	tests/fast/SidebandWithStatus.txt
#	tests/rare/LargeApiCorrectnessStatus.txt
#	tests/slow/DDBalanceAndRemoveStatus.txt
This commit is contained in:
Evan Tschannen 2018-03-09 14:47:03 -08:00
commit 91bb8faa45
14 changed files with 104 additions and 56 deletions

View File

@ -564,7 +564,7 @@ public:
// Force is required if there is not a restorable snapshot which both
// - begins at or after expireEndVersion
// - ends at or before restorableBeginVersion
bool forceNeeded = true;
state bool forceNeeded = true;
for(KeyspaceSnapshotFile &s : desc.snapshots) {
if(s.restorable.orDefault(false) && s.beginVersion >= expireEndVersion && s.endVersion <= restorableBeginVersion) {
forceNeeded = false;
@ -572,9 +572,6 @@ public:
}
}
if(forceNeeded && !force)
throw backup_cannot_expire();
// Get metadata
state Optional<Version> expiredEnd;
state Optional<Version> logBegin;
@ -604,52 +601,94 @@ public:
newLogBeginVersion = expireEndVersion;
}
else {
// If the last log overlaps the expiredEnd then use the log's begin version
// If the last log overlaps the expiredEnd then use the log's begin version and move the expiredEnd
// back to match it.
if(last.endVersion > expireEndVersion) {
newLogBeginVersion = last.beginVersion;
logs.pop_back();
expireEndVersion = newLogBeginVersion.get();
}
}
}
// If we have a new log begin version then potentially update the property but definitely set
// expireEndVersion to the new log begin because we will only be deleting files up to but not
// including that version.
if(newLogBeginVersion.present()) {
expireEndVersion = newLogBeginVersion.get();
// If the new version is greater than the existing one or the
// existing one is not present then write the new value
if(logBegin.orDefault(0) < newLogBeginVersion.get()) {
Void _ = wait(bc->logBeginVersion().set(newLogBeginVersion.get()));
}
}
else {
// Otherwise, if the old logBeginVersion is present and older than expireEndVersion then clear it because
// it refers to a version in a range we're about to delete and apparently continuity through
// expireEndVersion is broken.
if(logBegin.present() && logBegin.get() < expireEndVersion)
Void _ = wait(bc->logBeginVersion().clear());
}
// Delete files
state std::vector<Future<Void>> deletes;
// Make a list of files to delete
state std::vector<std::string> toDelete;
// Move filenames out of vector then destroy it to save memory
for(auto const &f : logs) {
deletes.push_back(bc->deleteFile(f.fileName));
toDelete.push_back(std::move(f.fileName));
}
logs.clear();
// Move filenames out of vector then destroy it to save memory
for(auto const &f : ranges) {
// Must recheck version because list returns data up to and including the given endVersion
if(f.version < expireEndVersion)
deletes.push_back(bc->deleteFile(f.fileName));
toDelete.push_back(std::move(f.fileName));
}
ranges.clear();
for(auto const &f : desc.snapshots) {
if(f.endVersion < expireEndVersion)
deletes.push_back(bc->deleteFile(f.fileName));
toDelete.push_back(std::move(f.fileName));
}
desc = BackupDescription();
// If some files to delete were found AND force is needed AND the force option is NOT set, then fail
if(!toDelete.empty() && forceNeeded && !force)
throw backup_cannot_expire();
// We are about to start deleting files, at which point no data prior to the expire end version can be
// safely assumed to exist. The [logBegin, logEnd) range from the container's metadata describes
// a range of log versions which can be assumed to exist, so if the range of data being deleted overlaps
// that range then the metadata range must be updated.
// If we're expiring the entire log range described by the metadata then clear both metadata values
if(logEnd.present() && logEnd.get() < expireEndVersion) {
if(logBegin.present())
Void _ = wait(bc->logBeginVersion().clear());
if(logEnd.present())
Void _ = wait(bc->logEndVersion().clear());
}
else {
// If we are expiring to a point within the metadata range then update the begin if we have a new
// log begin version (which we should!) or clear the metadata range if we do not (which would be
// repairing the metadata from an incorrect state)
if(logBegin.present() && logBegin.get() < expireEndVersion) {
if(newLogBeginVersion.present()) {
Void _ = wait(bc->logBeginVersion().set(newLogBeginVersion.get()));
}
else {
if(logBegin.present())
Void _ = wait(bc->logBeginVersion().clear());
if(logEnd.present())
Void _ = wait(bc->logEndVersion().clear());
}
}
}
Void _ = wait(waitForAll(deletes));
// Delete files, but limit parallelism because the file list could use a lot of memory and the corresponding
// delete actor states would use even more if they all existed at the same time.
state std::list<Future<Void>> deleteFutures;
while(!toDelete.empty() || !deleteFutures.empty()) {
// While there are files to delete and budget in the deleteFutures list, start a delete
while(!toDelete.empty() && deleteFutures.size() < CLIENT_KNOBS->BACKUP_CONCURRENT_DELETES) {
deleteFutures.push_back(bc->deleteFile(toDelete.back()));
toDelete.pop_back();
}
// Wait for deletes to finish until there are only targetDeletesInFlight remaining.
// If there are no files left to start then this value is 0, otherwise it is one less
// than the delete concurrency limit.
state int targetFuturesSize = toDelete.empty() ? 0 : (CLIENT_KNOBS->BACKUP_CONCURRENT_DELETES - 1);
while(deleteFutures.size() > targetFuturesSize) {
Void _ = wait(deleteFutures.front());
deleteFutures.pop_front();
}
}
// Update the expiredEndVersion property.
Void _ = wait(bc->expiredEndVersion().set(expireEndVersion));

View File

@ -94,6 +94,7 @@ ClientKnobs::ClientKnobs(bool randomize) {
init( TASKBUCKET_MAX_TASK_KEYS, 1000 ); if( randomize && BUGGIFY ) TASKBUCKET_MAX_TASK_KEYS = 20;
//Backup
init( BACKUP_CONCURRENT_DELETES, 100 );
init( BACKUP_SIMULATED_LIMIT_BYTES, 1e6 ); if( randomize && BUGGIFY ) BACKUP_SIMULATED_LIMIT_BYTES = 1000;
init( BACKUP_GET_RANGE_LIMIT_BYTES, 1e6 );
init( BACKUP_LOCK_BYTES, 1e8 );

View File

@ -96,6 +96,7 @@ public:
int TASKBUCKET_MAX_TASK_KEYS;
// Backup
int BACKUP_CONCURRENT_DELETES;
int BACKUP_SIMULATED_LIMIT_BYTES;
int BACKUP_GET_RANGE_LIMIT_BYTES;
int BACKUP_LOCK_BYTES;

View File

@ -83,7 +83,7 @@ public:
TraceEvent(notFound ? SevWarn : SevWarnAlways, "FileOpenError").error(e).GetLastError().detail("File", filename).detail("Flags", flags).detail("Mode", mode);
throw e;
}
TraceEvent("AsyncFileOpened").detail("Filename", filename).detail("fd", r->result).detail("Flags", flags);
TraceEvent("AsyncFileOpened").detail("Filename", filename).detail("fd", r->result).detail("Flags", flags).suppressFor(1.0);
if ((flags & OPEN_LOCK) && !lock_fd(r->result)) {
TraceEvent(SevError, "UnableToLockFile").detail("filename", filename).GetLastError();
@ -264,7 +264,7 @@ private:
state eio_req* r = eio_close(fd, 0, eio_callback, &p);
Void _ = wait( p.getFuture() );
if (r->result) error( "CloseError", fd, r );
TraceEvent("AsyncFileClosed").detail("fd", fd);
TraceEvent("AsyncFileClosed").detail("fd", fd).suppressFor(1.0);
}
ACTOR static Future<int> read_impl( int fd, void* data, int length, int64_t offset ) {

View File

@ -161,7 +161,7 @@ public:
};
bool workerAvailable( WorkerInfo const& worker, bool checkStable ) {
return IFailureMonitor::failureMonitor().getState(worker.interf.storage.getEndpoint()).isAvailable() && ( !checkStable || worker.reboots < 2 );
return ( now() - startTime < 2 * FLOW_KNOBS->SERVER_REQUEST_INTERVAL ) || ( IFailureMonitor::failureMonitor().getState(worker.interf.storage.getEndpoint()).isAvailable() && ( !checkStable || worker.reboots < 2 ) );
}
std::pair<WorkerInterface, ProcessClass> getStorageWorker( RecruitStorageRequest const& req ) {

View File

@ -623,7 +623,6 @@ struct DDTeamCollection {
}
if( foundExact || (req.wantsTrueBest && bestOption.present() ) ) {
TraceEvent("getTeam").detail("wantsVariety", req.wantsNewServers).detail("bestOption", bestOption.get()->getDesc());
ASSERT( bestOption.present() );
req.reply.send( bestOption );
return Void();

View File

@ -917,7 +917,7 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
//FIXME: do not add data in flight to servers that were already in the src.
destination.addDataInFlightToTeam(+metrics.bytes);
TraceEvent("RelocateShardHasDestination", masterId)
TraceEvent(relocateShardInterval.severity, "RelocateShardHasDestination", masterId)
.detail("PairId", relocateShardInterval.pairID)
.detail("DestinationTeam", destination.getDesc());

View File

@ -82,6 +82,10 @@ ACTOR Future<Void> tryBecomeLeaderInternal( ServerCoordinators coordinators, Val
state bool iAmLeader = false;
state UID prevChangeID;
if( asyncProcessClass->get().machineClassFitness(ProcessClass::ClusterController) > ProcessClass::UnsetFit || asyncIsExcluded->get() ) {
Void _ = wait( delay(SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY) );
}
nominees->set( vector<Optional<LeaderInfo>>( coordinators.clientLeaderServers.size() ) );
myInfo.serializedInfo = proposedSerializedInterface;

View File

@ -792,8 +792,21 @@ ACTOR static Future<StatusObject> processStatusFetcher(
if (programStarts.count(address)) {
auto const& psxml = programStarts.at(address);
int64_t memLimit = parseInt64(extractAttribute(psxml, "MemoryLimit"));
memoryObj["limit_bytes"] = memLimit;
if(psxml.size() > 0) {
int64_t memLimit = parseInt64(extractAttribute(psxml, "MemoryLimit"));
memoryObj["limit_bytes"] = memLimit;
std::string version;
if (tryExtractAttribute(psxml, LiteralStringRef("Version"), version)) {
statusObj["version"] = version;
}
std::string commandLine;
if (tryExtractAttribute(psxml, LiteralStringRef("CommandLine"), commandLine)) {
statusObj["command_line"] = commandLine;
}
}
}
// if this process address is in the machine metrics
@ -813,9 +826,10 @@ ACTOR static Future<StatusObject> processStatusFetcher(
StatusArray messages;
if (errors.count(address) && errors[address].size())
if (errors.count(address) && errors[address].size()) {
// returns status object with type and time of error
messages.push_back(getError(errors.at(address)));
}
// string of address used so that other fields of a NetworkAddress are not compared
std::string strAddress = address.toString();
@ -840,18 +854,6 @@ ACTOR static Future<StatusObject> processStatusFetcher(
// Get roles for the worker's address as an array of objects
statusObj["roles"] = roles.getStatusForAddress(address);
if (programStarts.count(address)) {
auto const& psxml = programStarts.at(address);
std::string version;
if (tryExtractAttribute(psxml, LiteralStringRef("Version"), version))
statusObj["version"] = version;
std::string commandLine;
if (tryExtractAttribute(psxml, LiteralStringRef("CommandLine"), commandLine))
statusObj["command_line"] = commandLine;
}
if (configuration.present()){
statusObj["excluded"] = configuration.get().isExcludedServer(address);
}

View File

@ -838,7 +838,7 @@ struct ConsistencyCheckWorkload : TestWorkload
else if(!isRelocating)
{
TraceEvent("ConsistencyCheck_StorageServerUnavailable").detail("StorageServer", storageServers[j]).detail("ShardBegin", printable(range.begin)).detail("ShardEnd", printable(range.end))
.detail("Address", storageServerInterfaces[j].address()).detail("GetKeyValuesToken", storageServerInterfaces[j].getKeyValues.getEndpoint().token);
.detail("Address", storageServerInterfaces[j].address()).detail("GetKeyValuesToken", storageServerInterfaces[j].getKeyValues.getEndpoint().token).suppressFor(1.0);
//All shards should be available in quiscence
if(self->performQuiescentChecks)
@ -978,7 +978,9 @@ struct ConsistencyCheckWorkload : TestWorkload
}
}
TraceEvent("ConsistencyCheck_ReadRange").detail("range", printable(range)).detail("bytesRead", bytesReadInRange);
if(bytesReadInRange > 0) {
TraceEvent("ConsistencyCheck_ReadRange").detail("range", printable(range)).detail("bytesRead", bytesReadInRange);
}
}
//SOMEDAY: when background data distribution is implemented, include this test

View File

@ -32,7 +32,7 @@
<Wix xmlns='http://schemas.microsoft.com/wix/2006/wi'>
<Product Name='$(var.Title)'
Id='{61C46988-7589-4B8A-9BB9-D850FD5B8B05}'
Id='{4B030686-EEAE-40E1-B69E-1394537456B2}'
UpgradeCode='{A95EA002-686E-4164-8356-C715B7F8B1C8}'
Version='$(var.Version)'
Manufacturer='$(var.Manufacturer)'

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long