Merge branch 'release-5.0'
# Conflicts: # versions.target
This commit is contained in:
commit
50fb44be92
|
@ -115,6 +115,7 @@ struct ILogSystem {
|
||||||
bool parallelGetMore;
|
bool parallelGetMore;
|
||||||
int sequence;
|
int sequence;
|
||||||
Deque<Future<TLogPeekReply>> futureResults;
|
Deque<Future<TLogPeekReply>> futureResults;
|
||||||
|
Future<Void> interfaceChanged;
|
||||||
|
|
||||||
ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>> const& interf, Tag tag, Version begin, Version end, bool returnIfBlocked, bool parallelGetMore );
|
ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>> const& interf, Tag tag, Version begin, Version end, bool returnIfBlocked, bool parallelGetMore );
|
||||||
|
|
||||||
|
|
|
@ -119,6 +119,10 @@ ACTOR Future<Void> serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self
|
||||||
throw internal_error();
|
throw internal_error();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if(!self->interfaceChanged.isValid()) {
|
||||||
|
self->interfaceChanged = self->interf->onChange();
|
||||||
|
}
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
try {
|
try {
|
||||||
while(self->futureResults.size() < SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && self->interf->get().present()) {
|
while(self->futureResults.size() < SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && self->interf->get().present()) {
|
||||||
|
@ -139,7 +143,9 @@ ACTOR Future<Void> serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self
|
||||||
//TraceEvent("SPC_getMoreB", self->randomID).detail("has", self->hasMessage()).detail("end", res.end).detail("popped", res.popped.present() ? res.popped.get() : 0);
|
//TraceEvent("SPC_getMoreB", self->randomID).detail("has", self->hasMessage()).detail("end", res.end).detail("popped", res.popped.present() ? res.popped.get() : 0);
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
when( Void _ = wait( self->interf->onChange() ) ) {
|
when( Void _ = wait( self->interfaceChanged ) ) {
|
||||||
|
self->interfaceChanged = self->interf->onChange();
|
||||||
|
self->randomID = g_random->randomUniqueID();
|
||||||
self->sequence = 0;
|
self->sequence = 0;
|
||||||
self->futureResults.clear();
|
self->futureResults.clear();
|
||||||
}
|
}
|
||||||
|
@ -150,6 +156,7 @@ ACTOR Future<Void> serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self
|
||||||
return Void();
|
return Void();
|
||||||
} else if(e.code() == error_code_timed_out) {
|
} else if(e.code() == error_code_timed_out) {
|
||||||
TraceEvent("PeekCursorTimedOut", self->randomID);
|
TraceEvent("PeekCursorTimedOut", self->randomID);
|
||||||
|
self->interfaceChanged = self->interf->onChange();
|
||||||
self->randomID = g_random->randomUniqueID();
|
self->randomID = g_random->randomUniqueID();
|
||||||
self->sequence = 0;
|
self->sequence = 0;
|
||||||
self->futureResults.clear();
|
self->futureResults.clear();
|
||||||
|
|
|
@ -1129,11 +1129,11 @@ ACTOR Future<Void> respondToRecovered( TLogInterface tli, Future<Void> recovery
|
||||||
|
|
||||||
ACTOR Future<Void> cleanupPeekTrackers( TLogData* self ) {
|
ACTOR Future<Void> cleanupPeekTrackers( TLogData* self ) {
|
||||||
loop {
|
loop {
|
||||||
double minExpireTime = SERVER_KNOBS->PEEK_TRACKER_EXPIRATION_TIME;
|
double minTimeUntilExpiration = SERVER_KNOBS->PEEK_TRACKER_EXPIRATION_TIME;
|
||||||
auto it = self->peekTracker.begin();
|
auto it = self->peekTracker.begin();
|
||||||
while(it != self->peekTracker.end()) {
|
while(it != self->peekTracker.end()) {
|
||||||
double expireTime = SERVER_KNOBS->PEEK_TRACKER_EXPIRATION_TIME - now()-it->second.lastUpdate;
|
double timeUntilExpiration = it->second.lastUpdate + SERVER_KNOBS->PEEK_TRACKER_EXPIRATION_TIME - now();
|
||||||
if(expireTime < 1.0e-6) {
|
if(timeUntilExpiration < 1.0e-6) {
|
||||||
for(auto seq : it->second.sequence_version) {
|
for(auto seq : it->second.sequence_version) {
|
||||||
if(!seq.second.isSet()) {
|
if(!seq.second.isSet()) {
|
||||||
seq.second.sendError(timed_out());
|
seq.second.sendError(timed_out());
|
||||||
|
@ -1141,12 +1141,12 @@ ACTOR Future<Void> cleanupPeekTrackers( TLogData* self ) {
|
||||||
}
|
}
|
||||||
it = self->peekTracker.erase(it);
|
it = self->peekTracker.erase(it);
|
||||||
} else {
|
} else {
|
||||||
minExpireTime = std::min(minExpireTime, expireTime);
|
minTimeUntilExpiration = std::min(minTimeUntilExpiration, timeUntilExpiration);
|
||||||
++it;
|
++it;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Void _ = wait( delay(minExpireTime) );
|
Void _ = wait( delay(minTimeUntilExpiration) );
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -32,7 +32,7 @@
|
||||||
|
|
||||||
<Wix xmlns='http://schemas.microsoft.com/wix/2006/wi'>
|
<Wix xmlns='http://schemas.microsoft.com/wix/2006/wi'>
|
||||||
<Product Name='$(var.Title)'
|
<Product Name='$(var.Title)'
|
||||||
Id='{6B9B840B-A6E2-4E07-B41A-86910C574923}'
|
Id='{137B53CD-E95D-450D-B16C-268F2855EA13}'
|
||||||
UpgradeCode='{A95EA002-686E-4164-8356-C715B7F8B1C8}'
|
UpgradeCode='{A95EA002-686E-4164-8356-C715B7F8B1C8}'
|
||||||
Version='$(var.Version)'
|
Version='$(var.Version)'
|
||||||
Manufacturer='$(var.Manufacturer)'
|
Manufacturer='$(var.Manufacturer)'
|
||||||
|
|
Loading…
Reference in New Issue