Merge pull request #502 from etschannen/feature-remote-logs
fixed a few performance issues with multiple DC deployments
commit abf0e68364
@@ -484,7 +484,7 @@ Future<Void> storageServerTracker(
 	MoveKeysLock const& lock,
 	UID const& masterId,
 	std::map<UID, Reference<TCServerInfo>>* const& other_servers,
-	PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > const& changes,
+	Optional<PromiseStream< std::pair<UID, Optional<StorageServerInterface>> >> const& changes,
 	Promise<Void> const& errorOut,
 	Version const& addedVersion);
 
@@ -513,7 +513,7 @@ struct DDTeamCollection {
 	PromiseStream<UID> removedServers;
 	std::set<UID> recruitingIds; // The IDs of the SS which are being recruited
 	std::set<NetworkAddress> recruitingLocalities;
-	PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > serverChanges;
+	Optional<PromiseStream< std::pair<UID, Optional<StorageServerInterface>> >> serverChanges;
 	Future<Void> initialFailureReactionDelay;
 	Future<Void> initializationDoneActor;
 	Promise<Void> serverTrackerErrorOut;
@@ -544,7 +544,7 @@ struct DDTeamCollection {
 	DatabaseConfiguration configuration,
 	std::vector<Optional<Key>> includedDCs,
 	Optional<std::vector<Optional<Key>>> otherTrackedDCs,
-	PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > const& serverChanges,
+	Optional<PromiseStream< std::pair<UID, Optional<StorageServerInterface>> >> const& serverChanges,
 	Future<Void> readyToStart, Reference<AsyncVar<bool>> zeroHealthyTeams, bool primary,
 	Reference<AsyncVar<bool>> processingUnhealthy)
 	:cx(cx), masterId(masterId), lock(lock), output(output), shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), doBuildTeams( true ), teamBuilder( Void() ),
@@ -1578,7 +1578,7 @@ ACTOR Future<Void> storageServerTracker(
 	MoveKeysLock lock,
 	UID masterId,
 	std::map<UID, Reference<TCServerInfo>>* other_servers,
-	PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > changes,
+	Optional<PromiseStream< std::pair<UID, Optional<StorageServerInterface>> >> changes,
 	Promise<Void> errorOut,
 	Version addedVersion)
 {
@@ -1593,7 +1593,9 @@ ACTOR Future<Void> storageServerTracker(
 	state Future<KeyValueStoreType> storeTracker = keyValueStoreTypeTracker( self, server );
 	state bool hasWrongStoreTypeOrDC = false;
 
-	changes.send( std::make_pair(server->id, server->lastKnownInterface) );
+	if(changes.present()) {
+		changes.get().send( std::make_pair(server->id, server->lastKnownInterface) );
+	}
 
 	try {
 		loop {
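Note: the pattern in this hunk repeats throughout the PR. `changes` is now an `Optional<PromiseStream<...>>`, so every unconditional `send()` becomes a `present()` check around a `get().send()`. The likely motivation is that only the primary region's team collection has a consumer for these server-change events, so the remote region can pass an empty Optional instead of sending into a stream nobody drains. A minimal standalone sketch of the guard pattern, using std::optional and a toy stream type in place of flow's Optional and PromiseStream (all names below are illustrative, not from the FDB codebase):

    #include <functional>
    #include <iostream>
    #include <optional>
    #include <string>
    #include <utility>

    // Toy stand-in for flow's PromiseStream: delivers each value to a callback.
    template <class T>
    struct ToyStream {
        std::function<void(const T&)> onSend;
        void send(const T& value) const { if (onSend) onSend(value); }
    };

    using ServerChange = std::pair<int, std::string>; // (server id, interface description)

    // Tracker that reports interface changes only when a consumer asked for them.
    void trackServer(int serverId, const std::optional<ToyStream<ServerChange>>& changes) {
        if (changes) {                                  // mirrors changes.present()
            changes->send({serverId, "interface-v1"});  // mirrors changes.get().send(...)
        }
        // ...the rest of the tracking work is identical either way...
    }

    int main() {
        ToyStream<ServerChange> stream{[](const ServerChange& c) {
            std::cout << "server " << c.first << " -> " << c.second << "\n";
        }};
        trackServer(1, stream);        // primary region: someone is listening
        trackServer(2, std::nullopt);  // remote region: nothing queued, nothing leaked
    }

The remaining `changes.send(...)` call sites below follow the same guard-then-send shape.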
@@ -1680,7 +1682,9 @@ ACTOR Future<Void> storageServerTracker(
 			when( Void _ = wait( failureTracker ) ) {
 				// The server is failed AND all data has been removed from it, so permanently remove it.
 				TraceEvent("StatusMapChange", masterId).detail("ServerID", server->id).detail("Status", "Removing");
-				changes.send( std::make_pair(server->id, Optional<StorageServerInterface>()) );
+				if(changes.present()) {
+					changes.get().send( std::make_pair(server->id, Optional<StorageServerInterface>()) );
+				}
 
 				// Remove server from FF/serverList
 				Void _ = wait( removeStorageServer( cx, server->id, lock ) );
@@ -1699,7 +1703,9 @@ ACTOR Future<Void> storageServerTracker(
 				server->lastKnownInterface = newInterface.first;
 				server->lastKnownClass = newInterface.second;
 				interfaceChanged = server->onInterfaceChanged;
-				changes.send( std::make_pair(server->id, server->lastKnownInterface) );
+				if(changes.present()) {
+					changes.get().send( std::make_pair(server->id, server->lastKnownInterface) );
+				}
 				// We rely on the old failureTracker being actorCancelled since the old actor now has a pointer to an invalid location
 				status = ServerStatus( status.isFailed, status.isUndesired, server->lastKnownInterface.locality );
 
@@ -1918,7 +1924,7 @@ ACTOR Future<Void> dataDistributionTeamCollection(
 	DatabaseConfiguration configuration,
 	std::vector<Optional<Key>> includedDCs,
 	Optional<std::vector<Optional<Key>>> otherTrackedDCs,
-	PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > serverChanges,
+	Optional<PromiseStream< std::pair<UID, Optional<StorageServerInterface>> >> serverChanges,
 	Future<Void> readyToStart,
 	Reference<AsyncVar<bool>> zeroHealthyTeams,
 	bool primary,
@@ -2208,7 +2214,7 @@ ACTOR Future<Void> dataDistribution(
 			actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, getShardMetrics, processingUnhealthy, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, mi, storageTeamSize, lastLimited, recoveryCommitVersion ), "DDQueue", mi.id(), &normalDDQueueErrors() ) );
 			actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( initData, tcis[0], cx, db, shardsAffectedByTeamFailure, lock, output, mi.id(), configuration, primaryDcId, configuration.usableRegions > 1 ? remoteDcIds : std::vector<Optional<Key>>(), serverChanges, readyToStart.getFuture(), zeroHealthyTeams[0], true, processingUnhealthy ), "DDTeamCollectionPrimary", mi.id(), &normalDDQueueErrors() ) );
 			if (configuration.usableRegions > 1) {
-				actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( initData, tcis[1], cx, db, shardsAffectedByTeamFailure, lock, output, mi.id(), configuration, remoteDcIds, Optional<std::vector<Optional<Key>>>(), serverChanges, readyToStart.getFuture() && remoteRecovered, zeroHealthyTeams[1], false, processingUnhealthy ), "DDTeamCollectionSecondary", mi.id(), &normalDDQueueErrors() ) );
+				actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( initData, tcis[1], cx, db, shardsAffectedByTeamFailure, lock, output, mi.id(), configuration, remoteDcIds, Optional<std::vector<Optional<Key>>>(), Optional<PromiseStream< std::pair<UID, Optional<StorageServerInterface>> >>(), readyToStart.getFuture() && remoteRecovered, zeroHealthyTeams[1], false, processingUnhealthy ), "DDTeamCollectionSecondary", mi.id(), &normalDDQueueErrors() ) );
 			}
 
 			Void _ = wait( waitForAll( actors ) );
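The secondary (remote-region) team collection is the only call site whose argument changes: it now passes an explicitly empty `Optional<PromiseStream<...>>()`, while the primary keeps forwarding `serverChanges`. Presumably only the primary's storage servers should be reported to the stream's consumer, and previously both regions' collections were sending into it.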
@@ -319,6 +319,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
 	init( TARGET_BYTES_PER_TLOG, 2000e6 ); if( smallTlogTarget ) TARGET_BYTES_PER_TLOG = 2000e3;
 	init( SPRING_BYTES_TLOG, 400e6 ); if( smallTlogTarget ) SPRING_BYTES_TLOG = 200e3;
 	init( TLOG_SPILL_THRESHOLD, 1500e6 ); if( smallTlogTarget ) TLOG_SPILL_THRESHOLD = 1500e3; if( randomize && BUGGIFY ) TLOG_SPILL_THRESHOLD = 0;
+	init( TLOG_HARD_LIMIT_BYTES, 3000e6 ); if( smallTlogTarget ) TLOG_HARD_LIMIT_BYTES = 3000e3;
 
 	init( MAX_TRANSACTIONS_PER_BYTE, 1000 );
 
@@ -257,6 +257,7 @@ public:
 	int64_t TARGET_BYTES_PER_TLOG;
 	double SPRING_BYTES_TLOG;
 	int64_t TLOG_SPILL_THRESHOLD;
+	int64_t TLOG_HARD_LIMIT_BYTES;
 
 	double MAX_TRANSACTIONS_PER_BYTE;
 
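The new knob pairs a declaration in the knobs header with an `init(...)` in the constructor. Note the chosen values: 3000e6 (about 3 GB) in production, double TLOG_SPILL_THRESHOLD, and scaled down by 1000x under `smallTlogTarget` like its neighbors so simulation hits the limit quickly. A simplified model of what this pattern computes (assumption: the real `init()` helper also registers each knob by name for runtime overrides, which is omitted here):

    #include <cstdint>
    #include <iostream>

    // Simplified model of the ServerKnobs pattern above.
    struct MiniKnobs {
        int64_t TLOG_SPILL_THRESHOLD;
        int64_t TLOG_HARD_LIMIT_BYTES;

        explicit MiniKnobs(bool smallTlogTarget) {
            TLOG_SPILL_THRESHOLD  = smallTlogTarget ? 1500e3 : 1500e6;  // start spilling to disk here...
            TLOG_HARD_LIMIT_BYTES = smallTlogTarget ? 3000e3 : 3000e6;  // ...hard-stop commits here
        }
    };

    int main() {
        MiniKnobs prod(false), sim(true);
        std::cout << prod.TLOG_HARD_LIMIT_BYTES << " vs " << sim.TLOG_HARD_LIMIT_BYTES << "\n";
    }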
@@ -179,7 +179,7 @@ ACTOR Future<Void> pullAsyncData( LogRouterData *self ) {
 	loop {
 		loop {
 			choose {
-				when(Void _ = wait( r ? r->getMore() : Never() ) ) {
+				when(Void _ = wait( r ? r->getMore(TaskTLogCommit) : Never() ) ) {
 					break;
 				}
 				when( Void _ = wait( dbInfoChange ) ) { //FIXME: does this actually happen?
@@ -336,7 +336,7 @@ ACTOR Future<Void> logRouterPop( LogRouterData* self, TLogPopRequest req ) {
 
 	while(!self->messageBlocks.empty() && self->messageBlocks.front().first < minPopped) {
 		self->messageBlocks.pop_front();
-		Void _ = wait(yield(TaskUpdateStorage));
+		Void _ = wait(yield(TaskTLogPop));
 	}
 
 	if(self->logSystem->get() && self->allowPops) {
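Both log-router hunks adjust task priorities rather than logic: the peek loop's `getMore()` now runs at `TaskTLogCommit`, and the yield taken while discarding popped message blocks moves from `TaskUpdateStorage` to `TaskTLogPop`. In flow, the priority passed to a delay or yield determines where the actor's continuation is queued in the run loop, so tagging log-router work with TLog-level priorities should keep remote-log traffic appropriately ordered relative to commit work in a multi-DC cluster. A toy model of a prioritized run loop (the numeric values and their ordering are assumptions for illustration; only the task names come from the diff):

    #include <iostream>
    #include <queue>
    #include <vector>

    // Toy model of a prioritized run loop: higher value runs first.
    enum TaskPriority { TaskUpdateStorage = 1, TaskTLogPop = 2, TaskTLogCommit = 3 };

    struct Task { TaskPriority pri; const char* name; };
    struct ByPriority {
        bool operator()(const Task& a, const Task& b) const { return a.pri < b.pri; }
    };

    int main() {
        std::priority_queue<Task, std::vector<Task>, ByPriority> runLoop;
        runLoop.push({TaskUpdateStorage, "storage housekeeping"});
        runLoop.push({TaskTLogPop, "log router pop"});
        runLoop.push({TaskTLogCommit, "tlog commit / peek"});
        while (!runLoop.empty()) {                    // resumes in priority order:
            std::cout << runLoop.top().name << "\n";  // commit, then pop, then housekeeping
            runLoop.pop();
        }
    }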
@@ -1180,6 +1180,18 @@ ACTOR Future<Void> tLogCommit(
 		return Void();
 	}
 
+	state double waitStartT = 0;
+	while( self->bytesInput - self->bytesDurable >= SERVER_KNOBS->TLOG_HARD_LIMIT_BYTES ) {
+		if (now() - waitStartT >= 1) {
+			TraceEvent(SevWarn, "TLogUpdateLag", logData->logId)
+				.detail("Version", logData->version.get())
+				.detail("PersistentDataVersion", logData->persistentDataVersion)
+				.detail("PersistentDataDurableVersion", logData->persistentDataDurableVersion).suppressFor(1.0);
+			waitStartT = now();
+		}
+		Void _ = wait( delayJittered(.005, TaskTLogCommit) );
+	}
+
 	if (logData->version.get() == req.prevVersion) { // Not a duplicate (check relies on no waiting between here and self->version.set() below!)
 		if(req.debugID.present())
 			g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.Before");
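This block is the TLog counterpart of the storage server e-brake touched in the last hunk of this diff: while more than TLOG_HARD_LIMIT_BYTES of committed-but-not-yet-durable data sits in memory, the commit path waits on a short jittered delay, emitting `TLogUpdateLag` at most about once per second, until spilling drains the backlog. A self-contained sketch of the loop's shape with a simulated clock and byte counters (the harness and the drain rate are invented for illustration; only the loop structure mirrors the diff):

    #include <cstdint>
    #include <cstdio>
    #include <random>

    // Illustrative stand-ins for the flow runtime pieces used by the new block.
    static double g_now = 100;  // start the fake clock past zero so the first warning fires
    double now() { return g_now; }
    void delayJittered(double seconds) {  // the real flow call also takes a task priority
        static std::mt19937 rng(42);
        std::uniform_real_distribution<double> jitter(0.8, 1.2);
        g_now += seconds * jitter(rng);
    }

    int main() {
        const int64_t TLOG_HARD_LIMIT_BYTES = 3000e6;
        int64_t bytesInput = 3200e6, bytesDurable = 0;  // 200 MB over the limit

        double waitStartT = 0;
        while (bytesInput - bytesDurable >= TLOG_HARD_LIMIT_BYTES) {
            if (now() - waitStartT >= 1) {  // warn at most roughly once per second
                std::printf("TLogUpdateLag at t=%.2f\n", now());
                waitStartT = now();
            }
            delayJittered(.005);   // brief pause instead of admitting more commits
            bytesDurable += 1e6;   // simulate spilling draining the backlog
        }
        std::printf("commits resume at t=%.2f\n", now());
    }

The 5 ms jittered delay bounds how long a recovered backlog keeps commits blocked without busy-waiting, and the jitter avoids synchronized wakeups across many waiting commits.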
@@ -1448,7 +1460,7 @@ ACTOR Future<Void> pullAsyncData( TLogData* self, Reference<LogData> logData, st
 	while (!endVersion.present() || logData->version.get() < endVersion.get()) {
 		loop {
 			choose {
-				when(Void _ = wait( r ? r->getMore() : Never() ) ) {
+				when(Void _ = wait( r ? r->getMore(TaskTLogCommit) : Never() ) ) {
 					if(poppedIsKnownCommitted) {
 						logData->knownCommittedVersion = std::max(logData->knownCommittedVersion, r->popped());
 					}
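This is the same priority bump as in the log router's pullAsyncData above: the TLog's peek from upstream logs now runs at `TaskTLogCommit`.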
|
@ -1469,6 +1481,18 @@ ACTOR Future<Void> pullAsyncData( TLogData* self, Reference<LogData> logData, st
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
state double waitStartT = 0;
|
||||||
|
while( self->bytesInput - self->bytesDurable >= SERVER_KNOBS->TLOG_HARD_LIMIT_BYTES ) {
|
||||||
|
if (now() - waitStartT >= 1) {
|
||||||
|
TraceEvent(SevWarn, "TLogUpdateLag", logData->logId)
|
||||||
|
.detail("Version", logData->version.get())
|
||||||
|
.detail("PersistentDataVersion", logData->persistentDataVersion)
|
||||||
|
.detail("PersistentDataDurableVersion", logData->persistentDataDurableVersion).suppressFor(1.0);
|
||||||
|
waitStartT = now();
|
||||||
|
}
|
||||||
|
Void _ = wait( delayJittered(.005, TaskTLogCommit) );
|
||||||
|
}
|
||||||
|
|
||||||
Version ver = 0;
|
Version ver = 0;
|
||||||
std::vector<TagsAndMessage> messages;
|
std::vector<TagsAndMessage> messages;
|
||||||
while (true) {
|
while (true) {
|
||||||
|
|
|
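And the same hard-limit guard as in `tLogCommit`, applied to the pull path: a TLog that is catching up by peeking another log, as remote-region logs do, also stops consuming new data while its in-memory backlog exceeds TLOG_HARD_LIMIT_BYTES.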
@@ -2323,9 +2323,8 @@ ACTOR Future<Void> update( StorageServer* data, bool* pReceivedUpdate )
 		// If we are disk bound and durableVersion is very old, we need to block updates or we could run out of memory
 		// This is often referred to as the storage server e-brake (emergency brake)
 		state double waitStartT = 0;
-		while ( data->queueSize() >= SERVER_KNOBS->STORAGE_HARD_LIMIT_BYTES && data->durableVersion.get() < data->desiredOldestVersion.get() )
-		{
-			if (now() - waitStartT >= .1) {
+		while ( data->queueSize() >= SERVER_KNOBS->STORAGE_HARD_LIMIT_BYTES && data->durableVersion.get() < data->desiredOldestVersion.get() ) {
+			if (now() - waitStartT >= 1) {
 				TraceEvent(SevWarn, "StorageServerUpdateLag", data->thisServerID)
 					.detail("Version", data->version.get())
 					.detail("DurableVersion", data->durableVersion.get()).suppressFor(1.0);
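The only behavioral change in this last hunk is the warning cadence: `StorageServerUpdateLag` now fires after the e-brake has been engaged for one second rather than 100 ms, matching the new `TLogUpdateLag` events; the opening brace also moves onto the `while` line. The throttling condition itself is unchanged.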