Merge branch 'master' into feature-remote-logs

# Conflicts:
#	fdbserver/DBCoreState.h
#	fdbserver/LogSystem.h
#	fdbserver/LogSystemPeekCursor.actor.cpp
#	fdbserver/TLogServer.actor.cpp
This commit is contained in:
Evan Tschannen 2018-01-14 13:40:24 -08:00
commit 21482a45e1
23 changed files with 151 additions and 108 deletions

View File

@ -2302,22 +2302,6 @@ namespace fileBackup {
return Void();
tr->reset();
} catch(Error &e) {
TraceEvent(SevWarn, "FileRestoreErrorRangeWrite")
.detail("RestoreUID", restore.getUid())
.detail("FileName", rangeFile.fileName)
.detail("FileVersion", rangeFile.version)
.detail("FileSize", rangeFile.fileSize)
.detail("ReadOffset", readOffset)
.detail("ReadLen", readLen)
.detail("BeginRange", printable(trRange.begin))
.detail("EndRange", printable(trRange.end))
.detail("StartIndex", start)
.detail("EndIndex", i)
.detail("DataSize", data.size())
.detail("Bytes", txBytes)
.error(e)
.detail("TaskInstance", (uint64_t)this);
if(e.code() == error_code_transaction_too_large)
dataSizeLimit /= 2;
else
@ -2477,21 +2461,6 @@ namespace fileBackup {
start = i;
tr->reset();
} catch(Error &e) {
TraceEvent(SevWarn, "FileRestoreErrorLogWrite")
.detail("RestoreUID", restore.getUid())
.detail("FileName", logFile.fileName)
.detail("FileBeginVersion", logFile.version)
.detail("FileEndVersion", logFile.endVersion)
.detail("FileSize", logFile.fileSize)
.detail("ReadOffset", readOffset)
.detail("ReadLen", readLen)
.detail("StartIndex", start)
.detail("EndIndex", i)
.detail("DataSize", data.size())
.detail("Bytes", txBytes)
.error(e)
.detail("TaskInstance", (uint64_t)this);
if(e.code() == error_code_transaction_too_large)
dataSizeLimit /= 2;
else
@ -2593,7 +2562,8 @@ namespace fileBackup {
state std::string beginFile = Params.beginFile().getOrDefault(task);
// Get a batch of files. We're targeting batchSize blocks being dispatched so query for batchSize files (each of which is 0 or more blocks).
state RestoreConfig::FileSetT::Values files = wait(restore.fileSet().getRange(tr, {beginVersion, beginFile}, {}, CLIENT_KNOBS->RESTORE_DISPATCH_ADDTASK_SIZE));
state int taskBatchSize = BUGGIFY ? 1 : CLIENT_KNOBS->RESTORE_DISPATCH_ADDTASK_SIZE;
state RestoreConfig::FileSetT::Values files = wait(restore.fileSet().getRange(tr, {beginVersion, beginFile}, {}, taskBatchSize));
// allPartsDone will be set once all block tasks in the current batch are finished.
state Reference<TaskFuture> allPartsDone;
@ -2705,7 +2675,7 @@ namespace fileBackup {
// For each block of the file
for(; j < f.fileSize; j += f.blockSize) {
// Stop if we've reached the addtask limit
if(blocksDispatched == CLIENT_KNOBS->RESTORE_DISPATCH_ADDTASK_SIZE)
if(blocksDispatched == taskBatchSize)
break;
if(f.isRange) {
@ -2726,7 +2696,7 @@ namespace fileBackup {
}
// Stop if we've reached the addtask limit
if(blocksDispatched == CLIENT_KNOBS->RESTORE_DISPATCH_ADDTASK_SIZE)
if(blocksDispatched == taskBatchSize)
break;
// We just completed an entire file so the next task should start at the file after this one within endVersion (or later)

View File

@ -62,8 +62,8 @@ ClientKnobs::ClientKnobs(bool randomize) {
init( LOCATION_CACHE_EVICTION_SIZE_SIM, 10 ); if( randomize && BUGGIFY ) LOCATION_CACHE_EVICTION_SIZE_SIM = 3;
init( GET_RANGE_SHARD_LIMIT, 2 );
init( WARM_RANGE_SHARD_LIMIT, 10000 );
init( STORAGE_METRICS_SHARD_LIMIT, 10000 ); if( randomize && BUGGIFY ) STORAGE_METRICS_SHARD_LIMIT = 3;
init( WARM_RANGE_SHARD_LIMIT, 100 );
init( STORAGE_METRICS_SHARD_LIMIT, 100 ); if( randomize && BUGGIFY ) STORAGE_METRICS_SHARD_LIMIT = 3;
init( STORAGE_METRICS_UNFAIR_SPLIT_LIMIT, 2.0/3.0 );
init( STORAGE_METRICS_TOO_MANY_SHARDS_DELAY, 15.0 );
@ -119,7 +119,7 @@ ClientKnobs::ClientKnobs(bool randomize) {
init( SIM_BACKUP_TASKS_PER_AGENT, 10 );
init( BACKUP_RANGEFILE_BLOCK_SIZE, 1024 * 1024);
init( BACKUP_LOGFILE_BLOCK_SIZE, 1024 * 1024);
init( RESTORE_DISPATCH_ADDTASK_SIZE, 1000 ); if( randomize && BUGGIFY ) RESTORE_DISPATCH_ADDTASK_SIZE = 1;
init( RESTORE_DISPATCH_ADDTASK_SIZE, 1000 );
init( RESTORE_DISPATCH_BATCH_SIZE, 30000 ); if( randomize && BUGGIFY ) RESTORE_DISPATCH_BATCH_SIZE = 1;
init( RESTORE_WRITE_TX_SIZE, 256 * 1024 );
init( APPLY_MAX_LOCK_BYTES, 1e9 );

View File

@ -56,6 +56,7 @@ struct MasterProxyInterface {
getConsistentReadVersion.getEndpoint(TaskProxyGetConsistentReadVersion);
getRawCommittedVersion.getEndpoint(TaskProxyGetRawCommittedVersion);
commit.getEndpoint(TaskProxyCommit);
getKeyServersLocations.getEndpoint(TaskProxyGetKeyServersLocations);
}
};

View File

@ -1076,7 +1076,7 @@ ACTOR Future< pair<KeyRange,Reference<LocationInfo>> > getKeyLocation( Database
loop {
choose {
when ( Void _ = wait( cx->onMasterProxiesChanged() ) ) {}
when ( GetKeyServerLocationsReply rep = wait( loadBalance( cx->getMasterProxies(), &MasterProxyInterface::getKeyServersLocations, GetKeyServerLocationsRequest(key, Optional<KeyRef>(), 1000, isBackward, key.arena()), TaskDefaultPromiseEndpoint ) ) ) {
when ( GetKeyServerLocationsReply rep = wait( loadBalance( cx->getMasterProxies(), &MasterProxyInterface::getKeyServersLocations, GetKeyServerLocationsRequest(key, Optional<KeyRef>(), 100, isBackward, key.arena()), TaskDefaultPromiseEndpoint ) ) ) {
if( info.debugID.present() )
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocation.After");
ASSERT( rep.results.size() == 1 );
@ -1117,13 +1117,30 @@ ACTOR Future< vector< pair<KeyRange,Reference<LocationInfo>> > > getKeyRangeLoca
ACTOR Future<Void> warmRange_impl( Transaction *self, Database cx, KeyRange keys ) {
state int totalRanges = 0;
state int totalRequests = 0;
loop {
vector<pair<KeyRange, Reference<LocationInfo>>> locations = wait(getKeyRangeLocations(cx, keys, CLIENT_KNOBS->WARM_RANGE_SHARD_LIMIT, false, self->info));
totalRanges += CLIENT_KNOBS->WARM_RANGE_SHARD_LIMIT;
totalRequests++;
if(locations.size() == 0 || totalRanges >= cx->locationCacheSize || locations[locations.size()-1].first.end >= keys.end)
break;
keys = KeyRangeRef(locations[locations.size()-1].first.end, keys.end);
if(totalRequests%20 == 0) {
//To avoid blocking the proxies from starting other transactions, occasionally get a read version.
state Transaction tr(cx);
loop {
try {
tr.setOption( FDBTransactionOptions::LOCK_AWARE );
tr.setOption( FDBTransactionOptions::CAUSAL_READ_RISKY );
Version _ = wait( tr.getReadVersion() );
break;
} catch( Error &e ) {
Void _ = wait( tr.onError(e) );
}
}
}
}
return Void();

View File

@ -38,6 +38,7 @@
<EnableCompile Condition="'$(Configuration)|$(Platform)'=='Debug|X64'">false</EnableCompile>
<EnableCompile Condition="'$(Configuration)|$(Platform)'=='Release|X64'">false</EnableCompile>
</ActorCompiler>
<ClInclude Include="KeyBackedTypes.h" />
<ClInclude Include="MetricLogger.h" />
<ActorCompiler Include="MetricLogger.actor.cpp" />
<ClInclude Include="FailureMonitorClient.h" />

View File

@ -534,7 +534,9 @@ ACTOR Future<Void> listBucketStream_impl(Reference<BlobStoreEndpoint> bstore, st
state FlowLock::Releaser listReleaser(bstore->concurrentLists, 1);
HTTP::Headers headers;
Reference<HTTP::Response> r = wait(bstore->doRequest("GET", resource + HTTP::urlEncode(lastFile), headers, NULL, 0, {200}));
state std::string fullResource = resource + HTTP::urlEncode(lastFile);
lastFile.clear();
Reference<HTTP::Response> r = wait(bstore->doRequest("GET", fullResource, headers, NULL, 0, {200}));
listReleaser.release();
try {
@ -590,23 +592,32 @@ ACTOR Future<Void> listBucketStream_impl(Reference<BlobStoreEndpoint> bstore, st
std::string p;
objectDoc.get("Prefix", p);
// If recursing, queue a sub-request, otherwise add the common prefix to the result.
if(maxDepth > 0)
if(maxDepth > 0) {
subLists.push_back(bstore->listBucketStream(bucket, results, p, delimiter, maxDepth - 1));
if(more)
lastFile = std::move(p);
}
else
result.commonPrefixes.push_back(p);
result.commonPrefixes.push_back(std::move(p));
}
}
results.send(result);
if(more) {
if(!result.objects.empty())
// lastFile will be the last commonprefix for which a sublist was started, if any
if(!result.objects.empty() && lastFile < result.objects.back().name)
lastFile = result.objects.back().name;
if(!result.commonPrefixes.empty() && lastFile < result.commonPrefixes.back())
lastFile = result.commonPrefixes.back();
}
results.send(result);
if(lastFile.empty()) {
TraceEvent(SevWarn, "BlobStoreEndpointListNoNextMarker").detail("Resource", fullResource).suppressFor(60, true);
throw backup_error();
}
}
} catch(Error &e) {
TraceEvent(SevWarn, "BlobStoreEndpointListResultParseError").detail("Resource", resource + HTTP::urlEncode(lastFile)).suppressFor(60, true);
TraceEvent(SevWarn, "BlobStoreEndpointListResultParseError").detail("Resource", fullResource).error(e).suppressFor(60, true);
throw http_bad_response();
}
}

View File

@ -222,7 +222,7 @@ struct Peer : NonCopyable {
ReliablePacketList reliable;
AsyncTrigger dataToSend; // Triggered when unsent.empty() becomes false
Future<Void> connect;
AsyncVar<bool> incompatibleDataRead;
AsyncTrigger incompatibleDataRead;
bool compatible;
bool outgoingConnectionIdle; // We don't actually have a connection open and aren't trying to open one because we don't have anything to send
double lastConnectTime;
@ -313,11 +313,10 @@ struct Peer : NonCopyable {
state ReplyPromise<Void> reply;
FlowTransport::transport().sendUnreliable( SerializeSource<ReplyPromise<Void>>(reply), remotePing.getEndpoint() );
peer->incompatibleDataRead.set(false);
choose {
when (Void _ = wait( delay( FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT ) )) { TraceEvent("ConnectionTimeout").detail("WithAddr", peer->destination); throw connection_failed(); }
when (Void _ = wait( reply.getFuture() )) {}
when (Void _ = wait( peer->incompatibleDataRead.onChange())) {}
when (Void _ = wait( peer->incompatibleDataRead.onTrigger())) {}
}
}
}
@ -673,7 +672,7 @@ ACTOR static Future<Void> connectionReader(
}
else if(!expectConnectPacket) {
unprocessed_begin = unprocessed_end;
peer->incompatibleDataRead.set(true);
peer->incompatibleDataRead.trigger();
}
if (readWillBlock)

View File

@ -37,10 +37,10 @@
<ActorCompiler Include="sim2.actor.cpp" />
<ClCompile Include="Net2FileSystem.cpp" />
<ClCompile Include="QueueModel.cpp" />
<ClCompile Include="Replication.cpp" />
<ClCompile Include="ReplicationUtils.cpp" />
<ClCompile Include="ReplicationTypes.cpp" />
<ClCompile Include="ReplicationPolicy.cpp" />
<ClCompile Include="Replication.cpp" />
<ClCompile Include="ReplicationUtils.cpp" />
<ClCompile Include="ReplicationTypes.cpp" />
<ClCompile Include="ReplicationPolicy.cpp" />
<ClCompile Include="sim_validation.cpp" />
<ActorCompiler Include="TLSConnection.actor.cpp" />
<ClCompile Include="TraceFileIO.cpp" />
@ -84,6 +84,7 @@
<ActorCompiler Include="batcher.actor.h">
<EnableCompile>false</EnableCompile>
</ActorCompiler>
<ClInclude Include="AsyncFileWriteChecker.h" />
<ClInclude Include="ContinuousSample.h" />
<ClInclude Include="crc32c.h" />
<ClInclude Include="EndpointGroup.h" />
@ -94,6 +95,9 @@
<ActorCompiler Include="genericactors.actor.h">
<EnableCompile>false</EnableCompile>
</ActorCompiler>
<ClInclude Include="JSONDoc.h" />
<ClInclude Include="linux_kaio.h" />
<ClInclude Include="LoadPlugin.h" />
<ClInclude Include="sha1\SHA1.h" />
<ClInclude Include="libb64\encode.h" />
<ClInclude Include="libb64\cencode.h" />
@ -116,17 +120,18 @@
<ClInclude Include="MultiInterface.h" />
<ClInclude Include="Net2FileSystem.h" />
<ClInclude Include="PerfMetric.h" />
<ClInclude Include="QueueModel.h" />
<ClInclude Include="Replication.h" />
<ClInclude Include="ReplicationUtils.h" />
<ClInclude Include="ReplicationTypes.h" />
<ClInclude Include="ReplicationPolicy.h" />
<ClInclude Include="QueueModel.h" />
<ClInclude Include="Replication.h" />
<ClInclude Include="ReplicationUtils.h" />
<ClInclude Include="ReplicationTypes.h" />
<ClInclude Include="ReplicationPolicy.h" />
<ClInclude Include="RangeMap.h" />
<ClInclude Include="simulator.h" />
<ClInclude Include="sim_validation.h" />
<ClInclude Include="Smoother.h" />
<ClInclude Include="TLSConnection.h" />
<ClInclude Include="TraceFileIO.h" />
<ClInclude Include="xml2json.hpp" />
<ClInclude Include="zlib\zlib.h" />
<ClInclude Include="zlib\deflate.h" />
<ClInclude Include="zlib\gzguts.h" />

View File

@ -87,6 +87,8 @@
<ClCompile Include="ReplicationPolicy.cpp" />
<ClCompile Include="crc32c.cpp" />
<ClCompile Include="generated-constants.cpp" />
<ClCompile Include="AsyncFileWriteChecker.cpp" />
<ClCompile Include="ReplicationUtils.cpp" />
</ItemGroup>
<ItemGroup>
<ClInclude Include="LoadBalance.h" />
@ -165,6 +167,12 @@
<ClInclude Include="ReplicationTypes.h" />
<ClInclude Include="ReplicationPolicy.h" />
<ClInclude Include="crc32c.h" />
<ClInclude Include="ReplicationUtils.h" />
<ClInclude Include="xml2json.hpp" />
<ClInclude Include="AsyncFileWriteChecker.h" />
<ClInclude Include="JSONDoc.h" />
<ClInclude Include="linux_kaio.h" />
<ClInclude Include="LoadPlugin.h" />
</ItemGroup>
<ItemGroup>
<Filter Include="libcoroutine">

View File

@ -1568,6 +1568,7 @@ ACTOR Future<Void> timeKeeperSetVersion(ClusterControllerData *self) {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->set(timeKeeperVersionKey, TIME_KEEPER_VERSION);
Void _ = wait(tr->commit());
break;
@ -1595,6 +1596,7 @@ ACTOR Future<Void> timeKeeper(ClusterControllerData *self) {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
Optional<Value> disableValue = wait( tr->get(timeKeeperDisableKey) );
if(disableValue.present()) {

View File

@ -1402,7 +1402,7 @@ ACTOR Future<vector<std::pair<StorageServerInterface, ProcessClass>>> getServerL
}
ACTOR Future<Void> waitServerListChange( DDTeamCollection *self, Database cx, FutureStream<Void> serverRemoved ) {
state Future<Void> checkSignal = delay(SERVER_KNOBS->SERVER_LIST_DELAY, TaskDataDistribution);
state Future<Void> checkSignal = delay(SERVER_KNOBS->SERVER_LIST_DELAY);
state Future<vector<std::pair<StorageServerInterface, ProcessClass>>> serverListAndProcessClasses = Never();
state bool isFetchingResults = false;
state Transaction tr(cx);
@ -1440,7 +1440,7 @@ ACTOR Future<Void> waitServerListChange( DDTeamCollection *self, Database cx, Fu
}
tr = Transaction(cx);
checkSignal = delay(SERVER_KNOBS->SERVER_LIST_DELAY, TaskDataDistribution);
checkSignal = delay(SERVER_KNOBS->SERVER_LIST_DELAY);
}
when( Void _ = waitNext( serverRemoved ) ) {
if( isFetchingResults ) {

View File

@ -59,6 +59,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( MAX_READ_TRANSACTION_LIFE_VERSIONS, 5 * VERSIONS_PER_SECOND ); if (randomize && BUGGIFY) MAX_READ_TRANSACTION_LIFE_VERSIONS=std::max<int>(1, 0.1 * VERSIONS_PER_SECOND); else if( randomize && BUGGIFY ) MAX_READ_TRANSACTION_LIFE_VERSIONS = 10 * VERSIONS_PER_SECOND;
init( MAX_WRITE_TRANSACTION_LIFE_VERSIONS, 5 * VERSIONS_PER_SECOND ); if (randomize && BUGGIFY) MAX_WRITE_TRANSACTION_LIFE_VERSIONS=std::max<int>(1, 1 * VERSIONS_PER_SECOND);
init( MAX_COMMIT_BATCH_INTERVAL, 0.5 ); if( randomize && BUGGIFY ) MAX_COMMIT_BATCH_INTERVAL = 2.0; // Each master proxy generates a CommitTransactionBatchRequest at least this often, so that versions always advance smoothly
MAX_COMMIT_BATCH_INTERVAL = std::min(MAX_COMMIT_BATCH_INTERVAL, MAX_READ_TRANSACTION_LIFE_VERSIONS/double(2*VERSIONS_PER_SECOND)); // Ensure that the proxy commits 2 times every MAX_READ_TRANSACTION_LIFE_VERSIONS, otherwise the master will not give out versions fast enough
// Data distribution queue
init( HEALTH_POLL_TIME, 1.0 );

View File

@ -175,7 +175,7 @@ struct ILogSystem {
//returns immediately if hasMessage() returns true.
//returns when either the result of hasMessage() or version() has changed.
virtual Future<Void> getMore() = 0;
virtual Future<Void> getMore(int taskID = TaskTLogPeekReply) = 0;
//returns when the failure monitor detects that the servers associated with the cursor are failed
virtual Future<Void> onFailed() = 0;
@ -247,7 +247,7 @@ struct ILogSystem {
virtual void advanceTo(LogMessageVersion n);
virtual Future<Void> getMore();
virtual Future<Void> getMore(int taskID = TaskTLogPeekReply);
virtual Future<Void> onFailed();
@ -310,7 +310,7 @@ struct ILogSystem {
virtual void advanceTo(LogMessageVersion n);
virtual Future<Void> getMore();
virtual Future<Void> getMore(int taskID = TaskTLogPeekReply);
virtual Future<Void> onFailed();
@ -414,7 +414,7 @@ struct ILogSystem {
virtual void advanceTo(LogMessageVersion n);
virtual Future<Void> getMore();
virtual Future<Void> getMore(int taskID = TaskTLogPeekReply);
virtual Future<Void> onFailed();

View File

@ -123,7 +123,7 @@ void ILogSystem::ServerPeekCursor::advanceTo(LogMessageVersion n) {
}
}
ACTOR Future<Void> serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self ) {
ACTOR Future<Void> serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self, int taskID ) {
if( !self->interf || self->messageVersion >= self->end ) {
Void _ = wait( Future<Void>(Never()));
throw internal_error();
@ -136,7 +136,7 @@ ACTOR Future<Void> serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self
loop {
try {
while(self->futureResults.size() < SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && self->interf->get().present()) {
self->futureResults.push_back( brokenPromiseToNever( self->interf->get().interf().peekMessages.getReply(TLogPeekRequest(self->messageVersion.version,self->tag,self->returnIfBlocked, std::make_pair(self->randomID, self->sequence++)), TaskTLogPeekReply) ) );
self->futureResults.push_back( brokenPromiseToNever( self->interf->get().interf().peekMessages.getReply(TLogPeekRequest(self->messageVersion.version,self->tag,self->returnIfBlocked, std::make_pair(self->randomID, self->sequence++)), taskID) ) );
}
choose {
@ -177,7 +177,7 @@ ACTOR Future<Void> serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self
}
}
ACTOR Future<Void> serverPeekGetMore( ILogSystem::ServerPeekCursor* self ) {
ACTOR Future<Void> serverPeekGetMore( ILogSystem::ServerPeekCursor* self, int taskID ) {
if( !self->interf || self->messageVersion >= self->end ) {
Void _ = wait( Future<Void>(Never()));
throw internal_error();
@ -186,7 +186,7 @@ ACTOR Future<Void> serverPeekGetMore( ILogSystem::ServerPeekCursor* self ) {
loop {
choose {
when( TLogPeekReply res = wait( self->interf->get().present() ?
brokenPromiseToNever( self->interf->get().interf().peekMessages.getReply(TLogPeekRequest(self->messageVersion.version,self->tag,self->returnIfBlocked), TaskTLogPeekReply) ) : Never() ) ) {
brokenPromiseToNever( self->interf->get().interf().peekMessages.getReply(TLogPeekRequest(self->messageVersion.version,self->tag,self->returnIfBlocked), taskID) ) : Never() ) ) {
self->results = res;
if(res.popped.present())
self->poppedVersion = std::min( std::max(self->poppedVersion, res.popped.get()), self->end.version );
@ -210,12 +210,12 @@ ACTOR Future<Void> serverPeekGetMore( ILogSystem::ServerPeekCursor* self ) {
}
}
Future<Void> ILogSystem::ServerPeekCursor::getMore() {
Future<Void> ILogSystem::ServerPeekCursor::getMore(int taskID) {
//TraceEvent("SPC_getMore", randomID).detail("hasMessage", hasMessage()).detail("more", !more.isValid() || more.isReady()).detail("messageVersion", messageVersion.toString()).detail("end", end.toString());
if( hasMessage() )
return Void();
if( !more.isValid() || more.isReady() ) {
more = parallelGetMore ? serverPeekParallelGetMore(this) : serverPeekGetMore(this);
more = parallelGetMore ? serverPeekParallelGetMore(this, taskID) : serverPeekGetMore(this, taskID);
}
return more;
}
@ -368,17 +368,17 @@ void ILogSystem::MergedPeekCursor::advanceTo(LogMessageVersion n) {
calcHasMessage();
}
ACTOR Future<Void> mergedPeekGetMore(ILogSystem::MergedPeekCursor* self, LogMessageVersion startVersion) {
ACTOR Future<Void> mergedPeekGetMore(ILogSystem::MergedPeekCursor* self, LogMessageVersion startVersion, int taskID) {
loop {
//TraceEvent("MPC_getMoreA", self->randomID).detail("start", startVersion.toString());
if(self->bestServer >= 0 && self->serverCursors[self->bestServer]->isActive()) {
ASSERT(!self->serverCursors[self->bestServer]->hasMessage());
Void _ = wait( self->serverCursors[self->bestServer]->getMore() || self->serverCursors[self->bestServer]->onFailed() );
Void _ = wait( self->serverCursors[self->bestServer]->getMore(taskID) || self->serverCursors[self->bestServer]->onFailed() );
} else {
vector<Future<Void>> q;
for (auto& c : self->serverCursors)
if (!c->hasMessage())
q.push_back(c->getMore());
q.push_back(c->getMore(taskID));
Void _ = wait(quorum(q, 1));
}
self->calcHasMessage();
@ -388,7 +388,7 @@ ACTOR Future<Void> mergedPeekGetMore(ILogSystem::MergedPeekCursor* self, LogMess
}
}
Future<Void> ILogSystem::MergedPeekCursor::getMore() {
Future<Void> ILogSystem::MergedPeekCursor::getMore(int taskID) {
if(!serverCursors.size())
return Never();
@ -402,7 +402,7 @@ Future<Void> ILogSystem::MergedPeekCursor::getMore() {
if (version() > startVersion)
return Void();
return mergedPeekGetMore(this, startVersion);
return mergedPeekGetMore(this, startVersion, taskID);
}
Future<Void> ILogSystem::MergedPeekCursor::onFailed() {
@ -748,13 +748,13 @@ void ILogSystem::MultiCursor::advanceTo(LogMessageVersion n) {
cursors.back()->advanceTo(n);
}
Future<Void> ILogSystem::MultiCursor::getMore() {
Future<Void> ILogSystem::MultiCursor::getMore(int taskID) {
while( cursors.size() > 1 && cursors.back()->version() >= epochEnds.back() ) {
poppedVersion = std::max(poppedVersion, cursors.back()->popped());
cursors.pop_back();
epochEnds.pop_back();
}
return cursors.back()->getMore();
return cursors.back()->getMore(taskID);
}
Future<Void> ILogSystem::MultiCursor::onFailed() {

View File

@ -292,7 +292,7 @@ ACTOR Future<Void> waitForQuietDatabase( Database cx, Reference<AsyncVar<ServerD
if(g_network->isSimulated())
Void _ = wait(delay(5.0));
//Require 2 consecutive successful quiet database checks spaced 1 second apart
//Require 3 consecutive successful quiet database checks spaced 2 second apart
state int numSuccesses = 0;
loop {
@ -322,7 +322,7 @@ ACTOR Future<Void> waitForQuietDatabase( Database cx, Reference<AsyncVar<ServerD
Void _ = wait( delay( 1.0 ) );
numSuccesses = 0;
} else {
if(++numSuccesses == 2) {
if(++numSuccesses == 3) {
TraceEvent(("QuietDatabase" + phase + "Done").c_str());
break;
}

View File

@ -269,7 +269,7 @@ struct StorageServerMetrics {
// This function can run on untrusted user data. We must validate all divisions carefully.
KeyRef getSplitKey( int64_t remaining, int64_t estimated, int64_t limits, int64_t used, int64_t infinity,
bool isLastShard, StorageMetricSample& sample, double divisor, KeyRef const& lastKey, KeyRef const& key )
bool isLastShard, StorageMetricSample& sample, double divisor, KeyRef const& lastKey, KeyRef const& key, bool hasUsed )
{
ASSERT(remaining >= 0);
ASSERT(limits > 0);
@ -290,7 +290,7 @@ struct StorageServerMetrics {
// This does the conversion from native units to bytes using the divisor.
double offset = (expectedSize - used) / divisor;
if( offset <= 0 )
return lastKey;
return hasUsed ? lastKey : key;
return sample.splitEstimate( KeyRangeRef(lastKey, key), offset * ( ( 1.0 - SERVER_KNOBS->SPLIT_JITTER_AMOUNT ) + 2 * g_random->random01() * SERVER_KNOBS->SPLIT_JITTER_AMOUNT ) );
}
}
@ -312,16 +312,16 @@ struct StorageServerMetrics {
if( remaining.bytes < 2*SERVER_KNOBS->MIN_SHARD_BYTES )
break;
KeyRef key = req.keys.end;
bool hasUsed = used.bytes != 0 || used.bytesPerKSecond != 0 || used.iosPerKSecond != 0;
key = getSplitKey( remaining.bytes, estimated.bytes, req.limits.bytes, used.bytes,
req.limits.infinity, req.isLastShard, byteSample, 1, lastKey, key );
req.limits.infinity, req.isLastShard, byteSample, 1, lastKey, key, hasUsed );
if( used.bytes < SERVER_KNOBS->MIN_SHARD_BYTES )
key = std::max( key, byteSample.splitEstimate( KeyRangeRef(lastKey, req.keys.end), SERVER_KNOBS->MIN_SHARD_BYTES - used.bytes ) );
key = getSplitKey( remaining.iosPerKSecond, estimated.iosPerKSecond, req.limits.iosPerKSecond, used.iosPerKSecond,
req.limits.infinity, req.isLastShard, iopsSample, SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS, lastKey, key );
req.limits.infinity, req.isLastShard, iopsSample, SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS, lastKey, key, hasUsed );
key = getSplitKey( remaining.bytesPerKSecond, estimated.bytesPerKSecond, req.limits.bytesPerKSecond, used.bytesPerKSecond,
req.limits.infinity, req.isLastShard, bandwidthSample, SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS, lastKey, key );
ASSERT( key != lastKey || used.bytes != 0 || used.bytesPerKSecond != 0 || used.iosPerKSecond != 0);
req.limits.infinity, req.isLastShard, bandwidthSample, SERVER_KNOBS->STORAGE_METRICS_AVERAGE_INTERVAL_PER_KSECONDS, lastKey, key, hasUsed );
ASSERT( key != lastKey || hasUsed);
if( key == req.keys.end )
break;
reply.splits.push_back_deep( reply.splits.arena(), key );

View File

@ -433,7 +433,6 @@ ACTOR Future<Void> tLogLock( TLogData* self, ReplyPromise< TLogLockResult > repl
TraceEvent("TLogStop", logData->logId).detail("Ver", stopVersion).detail("isStopped", logData->stopped).detail("queueCommitted", logData->queueCommittedVersion.get());
logData->stopped = true;
logData->recovery = Void();
if(!logData->recoveryComplete.isSet()) {
logData->recoveryComplete.sendError(end_of_stream());
}
@ -1205,19 +1204,29 @@ ACTOR Future<Void> rejoinMasters( TLogData* self, TLogInterface tli, DBRecoveryC
}
}
ACTOR Future<Void> respondToRecovered( TLogInterface tli, Future<Void> recovery ) {
ACTOR Future<Void> respondToRecovered( TLogInterface tli, Promise<Void> recoveryComplete, Future<Void> recovery ) {
state bool finishedRecovery = true;
try {
Void _ = wait( recovery );
Void _ = wait( recoveryComplete.getFuture() || recovery );
} catch( Error &e ) {
if(e.code() == error_code_end_of_stream) {
return Void();
if(e.code() != error_code_end_of_stream) {
throw;
}
throw;
finishedRecovery = false;
}
ASSERT(recoveryComplete.isSet());
if(!finishedRecovery) {
recovery = Void();
}
loop {
TLogRecoveryFinishedRequest req = waitNext( tli.recoveryFinished.getFuture() );
req.reply.send(Void());
if(finishedRecovery) {
req.reply.send(Void());
} else {
req.reply.send(Never());
}
}
}
@ -1317,7 +1326,6 @@ ACTOR Future<Void> serveTLogInterface( TLogData* self, TLogInterface tli, Refere
void removeLog( TLogData* self, Reference<LogData> logData ) {
TraceEvent("TLogRemoved", logData->logId).detail("input", logData->bytesInput.getValue()).detail("durable", logData->bytesDurable.getValue());
logData->stopped = true;
logData->recovery = Void();
if(!logData->recoveryComplete.isSet()) {
logData->recoveryComplete.sendError(end_of_stream());
}
@ -1468,8 +1476,8 @@ ACTOR Future<Void> tLogCore( TLogData* self, Reference<LogData> logData, TLogInt
state Future<Void> warningCollector = timeoutWarningCollector( warningCollectorInput.getFuture(), 1.0, "TLogQueueCommitSlow", self->dbgid );
state Future<Void> error = actorCollection( logData->addActor.getFuture() );
logData->addActor.send( waitFailureServer(tli.waitFailure.getFuture()) );
logData->addActor.send( respondToRecovered(tli, logData->recoveryComplete.getFuture()) );
logData->addActor.send( logData->recovery );
logData->addActor.send( waitFailureServer( tli.waitFailure.getFuture()) );
logData->addActor.send( logData->removed );
//FIXME: update tlogMetrics to include new information, or possibly only have one copy for the shared instance
logData->addActor.send( traceCounters("TLogMetrics", logData->logId, SERVER_KNOBS->STORAGE_LOGGING_DELAY, &logData->cc, logData->logId.toString() + "/TLogMetrics"));
@ -1711,7 +1719,7 @@ bool tlogTerminated( TLogData* self, IKeyValueStore* persistentData, TLogQueue*
return false;
}
ACTOR Future<Void> recoverTagFromLogSystem( TLogData* self, Reference<LogData> logData, Version beginVersion, Version endVersion, Tag tag, Reference<AsyncVar<int>> uncommittedBytes, Reference<AsyncVar<Reference<ILogSystem>>> logSystem ) {
ACTOR Future<Void> recoverTagFromLogSystem( TLogData* self, Reference<LogData> logData, Version beginVersion, Version endVersion, Tag tag, Reference<AsyncVar<int>> uncommittedBytes, Reference<AsyncVar<Reference<ILogSystem>>> logSystem, int taskID ) {
state Future<Void> dbInfoChange = Void();
state Reference<ILogSystem::IPeekCursor> r;
state Version tagAt = beginVersion;
@ -1723,7 +1731,7 @@ ACTOR Future<Void> recoverTagFromLogSystem( TLogData* self, Reference<LogData> l
while (tagAt <= endVersion) {
loop {
choose {
when(Void _ = wait( r ? r->getMore() : Never() ) ) {
when(Void _ = wait( r ? r->getMore(taskID) : Never() ) ) {
break;
}
when( Void _ = wait( dbInfoChange ) ) {
@ -1825,7 +1833,7 @@ ACTOR Future<Void> recoverFromLogSystem( TLogData* self, Reference<LogData> logD
state Future<Void> updater = updateLogSystem(self, logData, recoverFrom, logSystem);
for(auto tag : recoverTags )
recoverFutures.push_back(recoverTagFromLogSystem(self, logData, knownCommittedVersion, recoverAt, tag, uncommittedBytes, logSystem));
recoverFutures.push_back(recoverTagFromLogSystem(self, logData, knownCommittedVersion, recoverAt, tag, uncommittedBytes, logSystem, TaskTLogPeekReply));
state Future<Void> copyDone = waitForAll(recoverFutures);
state Future<Void> recoveryDone = Never();
@ -1837,7 +1845,7 @@ ACTOR Future<Void> recoverFromLogSystem( TLogData* self, Reference<LogData> logD
when(Void _ = wait(copyDone)) {
recoverFutures.clear();
for(auto tag : recoverTags )
recoverFutures.push_back(recoverTagFromLogSystem(self, logData, 0, knownCommittedVersion, tag, uncommittedBytes, logSystem));
recoverFutures.push_back(recoverTagFromLogSystem(self, logData, 0, knownCommittedVersion, tag, uncommittedBytes, logSystem, TaskBatchCopy));
copyDone = Never();
recoveryDone = waitForAll(recoverFutures);
@ -1884,6 +1892,7 @@ ACTOR Future<Void> recoverFromLogSystem( TLogData* self, Reference<LogData> logD
return Void();
} catch( Error &e ) {
TraceEvent("TLogRecoveryError", logData->logId).error(e,true);
ASSERT(e.code() != error_code_end_of_stream); //respondToRecovered would not handle the error properly if this function throws end_of_stream
if(!copyComplete.isSet())
copyComplete.sendError(worker_removed());
throw;
@ -1910,7 +1919,6 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
}
}
it.second->stopped = true;
it.second->recovery = Void();
if(!it.second->recoveryComplete.isSet()) {
it.second->recoveryComplete.sendError(end_of_stream());
}
@ -1947,7 +1955,7 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
throw worker_removed();
}
logData->recovery = recoverFromLogSystem( self, logData, req.recoverFrom, req.recoverAt, req.knownCommittedVersion, req.recoverTags, copyComplete );
logData->recovery = respondToRecovered( recruited, logData->recoveryComplete, recoverFromLogSystem( self, logData, req.recoverFrom, req.recoverAt, req.knownCommittedVersion, req.recoverTags, copyComplete ) );
Void _ = wait(copyComplete.getFuture() || logData->removed );
} else {
// Brand new tlog, initialization has already been done by caller
@ -2019,7 +2027,6 @@ ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQ
}
for( auto& it : self.id_data ) {
it.second->recovery = Void();
if(!it.second->recoveryComplete.isSet()) {
it.second->recoveryComplete.sendError(end_of_stream());
}

View File

@ -183,6 +183,7 @@
<ClInclude Include="sqlite\sqliteLimit.h" />
<ClInclude Include="Status.h" />
<ClInclude Include="StorageMetrics.h" />
<ClInclude Include="template_fdb.h" />
<ClInclude Include="TLogInterface.h" />
<ClInclude Include="WaitFailure.h" />
<ClInclude Include="TesterInterface.h" />

View File

@ -154,7 +154,6 @@
<Filter>workloads</Filter>
</ActorCompiler>
<ActorCompiler Include="networktest.actor.cpp" />
<ActorCompiler Include="MetricLogger.actor.cpp" />
<ActorCompiler Include="workloads\SaveAndKill.actor.cpp">
<Filter>workloads</Filter>
</ActorCompiler>
@ -202,7 +201,6 @@
<ActorCompiler Include="workloads\UnitTests.actor.cpp">
<Filter>workloads</Filter>
</ActorCompiler>
<ActorCompiler Include="workloads\WorkerErrors.actor.cpp" />
<ActorCompiler Include="workloads\FuzzApiCorrectness.actor.cpp">
<Filter>workloads</Filter>
</ActorCompiler>
@ -265,6 +263,15 @@
<ActorCompiler Include="workloads\LowLatency.actor.cpp">
<Filter>workloads</Filter>
</ActorCompiler>
<ActorCompiler Include="workloads\SlowTaskWorkload.actor.cpp">
<Filter>workloads</Filter>
</ActorCompiler>
<ActorCompiler Include="workloads\WorkerErrors.actor.cpp">
<Filter>workloads</Filter>
</ActorCompiler>
<ActorCompiler Include="workloads\DiskDurability.actor.cpp">
<Filter>workloads</Filter>
</ActorCompiler>
</ItemGroup>
<ItemGroup>
<ClCompile Include="SkipList.cpp" />
@ -348,6 +355,7 @@
<ClInclude Include="ApplyMetadataMutation.h" />
<ClInclude Include="RecoveryState.h" />
<ClInclude Include="LogProtocolMessage.h" />
<ClInclude Include="template_fdb.h" />
</ItemGroup>
<ItemGroup>
<Filter Include="workloads">

View File

@ -1063,7 +1063,7 @@ ACTOR Future<Void> runTests( Reference<AsyncVar<Optional<struct ClusterControlle
if(tests.empty() || useDB && false ) { //FIXME: re-enable quiescence
if(waitForQuiescenceEnd) {
try {
Void _ = wait( quietDatabase( cx, dbInfo, "End", 1e6, 2e6, 2e6 ) ||
Void _ = wait( quietDatabase( cx, dbInfo, "End", 0, 2e6, 2e6 ) ||
( databasePingDelay == 0.0 ? Never() : testDatabaseLiveness( cx, databasePingDelay, "QuietDatabaseEnd" ) ) );
} catch( Error& e ) {
if( e.code() != error_code_actor_cancelled )

View File

@ -12,12 +12,13 @@
</ItemGroup>
<ItemGroup>
<ActorCompiler Include="ActorCollection.actor.cpp" />
<ActorCompiler Include="CompressedInt.actor.cpp" />
<ActorCompiler Include="CompressedInt.actor.cpp" />
<ClCompile Include="boost.cpp" />
<ClCompile Include="Deque.cpp" />
<ClCompile Include="Error.cpp" />
<ClCompile Include="FastAlloc.cpp" />
<ClCompile Include="FaultInjection.cpp" />
<ClInclude Include="MetricSample.h" />
<ClInclude Include="Profiler.h" />
<ActorCompiler Include="Profiler.actor.cpp" />
<ActorCompiler Include="Net2.actor.cpp" />
@ -75,6 +76,7 @@
<ClInclude Include="Net2Packet.h" />
<ClInclude Include="serialize.h" />
<ClInclude Include="SimpleOpt.h" />
<ClInclude Include="stacktrace.h" />
<ClInclude Include="Stats.h" />
<ClInclude Include="SystemMonitor.h" />
<ClInclude Include="ThreadPrimitives.h" />

View File

@ -10,6 +10,8 @@
<ActorCompiler Include="genericactors.actor.h" />
<ActorCompiler Include="Profiler.actor.cpp" />
<ActorCompiler Include="EventTypes.actor.h" />
<ActorCompiler Include="CompressedInt.actor.cpp" />
<ActorCompiler Include="TDMetric.actor.h" />
</ItemGroup>
<ItemGroup>
<ClCompile Include="boost.cpp" />
@ -32,6 +34,8 @@
<ClCompile Include="Net2Packet.cpp" />
<ClCompile Include="ThreadHelper.cpp" />
<ClCompile Include="version.cpp" />
<ClCompile Include="stacktrace.amalgamation.cpp" />
<ClCompile Include="SignalSafeUnwind.cpp" />
</ItemGroup>
<ItemGroup>
<ClInclude Include="ActorCollection.h" />
@ -54,7 +58,6 @@
<ClInclude Include="Trace.h" />
<ClInclude Include="ThreadSafeQueue.h" />
<ClInclude Include="Knobs.h" />
<ClInclude Include="TDMetric.h" />
<ClInclude Include="UnitTest.h" />
<ClInclude Include="Stats.h" />
<ClInclude Include="Deque.h" />
@ -64,6 +67,11 @@
<ClInclude Include="network.h" />
<ClInclude Include="AsioReactor.h" />
<ClInclude Include="Net2Packet.h" />
<ClInclude Include="Profiler.h" />
<ClInclude Include="CompressedInt.h" />
<ClInclude Include="SignalSafeUnwind.h" />
<ClInclude Include="MetricSample.h" />
<ClInclude Include="stacktrace.h" />
</ItemGroup>
<ItemGroup>
<None Include="no_intellisense.opt" />

View File

@ -53,6 +53,7 @@ enum {
TaskProxyCommit = 8540,
TaskTLogConfirmRunningReply = 8530,
TaskTLogConfirmRunning = 8520,
TaskProxyGetKeyServersLocations = 8515,
TaskProxyGRVTimer = 8510,
TaskProxyGetConsistentReadVersion = 8500,
TaskDefaultPromiseEndpoint = 8000,
@ -67,6 +68,7 @@ enum {
TaskDataDistribution = 3500,
TaskDiskWrite = 3010,
TaskUpdateStorage = 3000,
TaskBatchCopy = 2900,
TaskLowPriority = 2000,
TaskMinPriority = 1000