Merge pull request #2697 from etschannen/feature-correctness-fixes

A variety of correctness fixes
This commit is contained in:
A.J. Beamon 2020-02-20 13:32:18 -08:00 committed by GitHub
commit 5586e6f6d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 47 additions and 32 deletions

View File

@ -1118,7 +1118,7 @@ void FlowTransport::removePeerReference(const Endpoint& endpoint, bool isStream)
.detail("Address", endpoint.getPrimaryAddress())
.detail("Token", endpoint.token);
}
if(peer->peerReferences == 0 && peer->reliable.empty() && peer->unsent.empty() && peer->outstandingReplies==0) {
if(peer->peerReferences == 0 && peer->reliable.empty() && peer->unsent.empty() && peer->outstandingReplies==0 && peer->lastDataPacketSentTime < now() - FLOW_KNOBS->CONNECTION_MONITOR_UNREFERENCED_CLOSE_DELAY) {
peer->resetPing.trigger();
}
}

View File

@ -1718,7 +1718,7 @@ ACTOR void doReboot( ISimulator::ProcessInfo *p, ISimulator::KillType kt ) {
TEST( kt == ISimulator::RebootAndDelete ); // Simulated machine rebooted with data and coordination state deletion
TEST( kt == ISimulator::RebootProcessAndDelete ); // Simulated process rebooted with data and coordination state deletion
if( p->rebooting )
if( p->rebooting || !p->isReliable() )
return;
TraceEvent("RebootingProcess").detail("KillType", kt).detail("Address", p->address).detail("ZoneId", p->locality.zoneId()).detail("DataHall", p->locality.dataHallId()).detail("Locality", p->locality.toString()).detail("Failed", p->failed).detail("Excluded", p->excluded).detail("Cleared", p->cleared).backtrace();
p->rebooting = true;

View File

@ -166,7 +166,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula
If this value is too small relative to SHARD_MIN_BYTES_PER_KSEC immediate merging work will be generated.
*/
init( STORAGE_METRIC_TIMEOUT, 600.0 ); if( randomize && BUGGIFY ) STORAGE_METRIC_TIMEOUT = deterministicRandom()->coinflip() ? 10.0 : 60.0;
init( STORAGE_METRIC_TIMEOUT, isSimulated ? 60.0 : 600.0 ); if( randomize && BUGGIFY ) STORAGE_METRIC_TIMEOUT = deterministicRandom()->coinflip() ? 10.0 : 30.0;
init( METRIC_DELAY, 0.1 ); if( randomize && BUGGIFY ) METRIC_DELAY = 1.0;
init( ALL_DATA_REMOVED_DELAY, 1.0 );
init( INITIAL_FAILURE_REACTION_DELAY, 30.0 ); if( randomize && BUGGIFY ) INITIAL_FAILURE_REACTION_DELAY = 0.0;
@ -181,8 +181,8 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula
init( DATA_DISTRIBUTION_LOGGING_INTERVAL, 5.0 );
init( DD_ENABLED_CHECK_DELAY, 1.0 );
init( DD_STALL_CHECK_DELAY, 0.4 ); //Must be larger than 2*MAX_BUGGIFIED_DELAY
init( DD_LOW_BANDWIDTH_DELAY, isSimulated ? 90.0 : 240.0 ); if( randomize && BUGGIFY ) DD_LOW_BANDWIDTH_DELAY = 0; //Because of delayJitter, this should be less than 0.9 * DD_MERGE_COALESCE_DELAY
init( DD_MERGE_COALESCE_DELAY, isSimulated ? 120.0 : 300.0 ); if( randomize && BUGGIFY ) DD_MERGE_COALESCE_DELAY = 0.001;
init( DD_LOW_BANDWIDTH_DELAY, isSimulated ? 15.0 : 240.0 ); if( randomize && BUGGIFY ) DD_LOW_BANDWIDTH_DELAY = 0; //Because of delayJitter, this should be less than 0.9 * DD_MERGE_COALESCE_DELAY
init( DD_MERGE_COALESCE_DELAY, isSimulated ? 30.0 : 300.0 ); if( randomize && BUGGIFY ) DD_MERGE_COALESCE_DELAY = 0.001;
init( STORAGE_METRICS_POLLING_DELAY, 2.0 ); if( randomize && BUGGIFY ) STORAGE_METRICS_POLLING_DELAY = 15.0;
init( STORAGE_METRICS_RANDOM_DELAY, 0.2 );
init( FREE_SPACE_RATIO_CUTOFF, 0.1 );
@ -476,7 +476,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula
init( BEHIND_CHECK_DELAY, 2.0 );
init( BEHIND_CHECK_COUNT, 2 );
init( BEHIND_CHECK_VERSIONS, 5 * VERSIONS_PER_SECOND );
init( WAIT_METRICS_WRONG_SHARD_CHANCE, 0.1 );
init( WAIT_METRICS_WRONG_SHARD_CHANCE, isSimulated ? 1.0 : 0.1 );
//Wait Failure
init( MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS, 250 ); if( randomize && BUGGIFY ) MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS = 2;

View File

@ -1054,7 +1054,12 @@ ACTOR Future<Void> bufferedGetMore( ILogSystem::BufferedCursor* self, TaskPriori
loop {
wait( allLoaders || delay(SERVER_KNOBS->DESIRED_GET_MORE_DELAY, taskID) );
minVersion = self->end;
for(auto cursor : self->cursors) {
for(int i = 0; i < self->cursors.size(); i++) {
auto cursor = self->cursors[i];
while(cursor->hasMessage()) {
self->cursorMessages[i].push_back(ILogSystem::BufferedCursor::BufferedMessage(cursor->arena(), (!self->withTags || self->collectTags) ? cursor->getMessage() : cursor->getMessageWithTags(), !self->withTags ? VectorRef<Tag>() : cursor->getTags(), cursor->version()));
cursor->nextMessage();
}
minVersion = std::min(minVersion, cursor->version().version);
}
if(minVersion > self->messageVersion.version) {

View File

@ -3320,6 +3320,7 @@ ACTOR Future<Void> waitMetrics( StorageServerMetrics* self, WaitMetricsRequest r
if( timedout ) {
TEST( true ); // ShardWaitMetrics return on timeout
//FIXME: instead of using random chance, send wrong_shard_server when the call in from waitMetricsMultiple (requires additional information in the request)
if(deterministicRandom()->random01() < SERVER_KNOBS->WAIT_METRICS_WRONG_SHARD_CHANCE) {
req.reply.sendError( wrong_shard_server() );
} else {

View File

@ -1383,37 +1383,46 @@ ACTOR Future<UID> createAndLockProcessIdFile(std::string folder) {
state UID processIDUid;
platform::createDirectory(folder);
try {
state std::string lockFilePath = joinPath(folder, "processId");
state ErrorOr<Reference<IAsyncFile>> lockFile = wait(errorOr(IAsyncFileSystem::filesystem(g_network)->open(lockFilePath, IAsyncFile::OPEN_READWRITE | IAsyncFile::OPEN_LOCK, 0600)));
loop {
try {
state std::string lockFilePath = joinPath(folder, "processId");
state ErrorOr<Reference<IAsyncFile>> lockFile = wait(errorOr(IAsyncFileSystem::filesystem(g_network)->open(lockFilePath, IAsyncFile::OPEN_READWRITE | IAsyncFile::OPEN_LOCK, 0600)));
if (lockFile.isError() && lockFile.getError().code() == error_code_file_not_found && !fileExists(lockFilePath)) {
Reference<IAsyncFile> _lockFile = wait(IAsyncFileSystem::filesystem()->open(lockFilePath, IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_CREATE | IAsyncFile::OPEN_LOCK | IAsyncFile::OPEN_READWRITE, 0600));
lockFile = _lockFile;
processIDUid = deterministicRandom()->randomUniqueID();
BinaryWriter wr(IncludeVersion());
wr << processIDUid;
wait(lockFile.get()->write(wr.getData(), wr.getLength(), 0));
wait(lockFile.get()->sync());
}
else {
if (lockFile.isError()) throw lockFile.getError(); // If we've failed to open the file, throw an exception
if (lockFile.isError() && lockFile.getError().code() == error_code_file_not_found && !fileExists(lockFilePath)) {
Reference<IAsyncFile> _lockFile = wait(IAsyncFileSystem::filesystem()->open(lockFilePath, IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_CREATE | IAsyncFile::OPEN_LOCK | IAsyncFile::OPEN_READWRITE, 0600));
lockFile = _lockFile;
processIDUid = deterministicRandom()->randomUniqueID();
BinaryWriter wr(IncludeVersion());
wr << processIDUid;
wait(lockFile.get()->write(wr.getData(), wr.getLength(), 0));
wait(lockFile.get()->sync());
}
else {
if (lockFile.isError()) throw lockFile.getError(); // If we've failed to open the file, throw an exception
int64_t fileSize = wait(lockFile.get()->size());
state Key fileData = makeString(fileSize);
wait(success(lockFile.get()->read(mutateString(fileData), fileSize, 0)));
processIDUid = BinaryReader::fromStringRef<UID>(fileData, IncludeVersion());
int64_t fileSize = wait(lockFile.get()->size());
state Key fileData = makeString(fileSize);
wait(success(lockFile.get()->read(mutateString(fileData), fileSize, 0)));
processIDUid = BinaryReader::fromStringRef<UID>(fileData, IncludeVersion());
}
return processIDUid;
}
}
catch (Error& e) {
if (e.code() != error_code_actor_cancelled) {
if (!e.isInjectedFault())
catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
throw;
}
if (!e.isInjectedFault()) {
fprintf(stderr, "ERROR: error creating or opening process id file `%s'.\n", joinPath(folder, "processId").c_str());
}
TraceEvent(SevError, "OpenProcessIdError").error(e);
if(!g_network->isSimulated()) {
throw;
}
deleteFile(lockFilePath);
}
throw;
}
return processIDUid;
}
ACTOR Future<Void> fdbd(

View File

@ -282,7 +282,7 @@ struct _IncludeVersion {
ar >> v;
if (!v.isValid()) {
auto err = incompatible_protocol_version();
TraceEvent(SevError, "InvalidSerializationVersion").error(err).detailf("Version", "%llx", v);
TraceEvent(SevWarnAlways, "InvalidSerializationVersion").error(err).detailf("Version", "%llx", v);
throw err;
}
if (v > currentProtocolVersion) {