2017-06-30 06:50:19 +08:00
/*
 * LogRouter.actor.cpp
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
# include "flow/ActorCollection.h"
2019-02-18 07:41:16 +08:00
# include "fdbclient/NativeAPI.actor.h"
2019-02-18 11:13:26 +08:00
# include "fdbserver/WorkerInterface.actor.h"
2018-10-20 01:30:13 +08:00
# include "fdbserver/WaitFailure.h"
# include "fdbserver/Knobs.h"
# include "fdbserver/ServerDBInfo.h"
# include "fdbserver/LogSystem.h"
2017-06-30 06:50:19 +08:00
# include "fdbclient/SystemData.h"
2018-10-20 01:30:13 +08:00
# include "fdbserver/ApplyMetadataMutation.h"
# include "fdbserver/RecoveryState.h"
2017-06-30 06:50:19 +08:00
# include "fdbclient/Atomic.h"
# include "flow/TDMetric.actor.h"
# include "flow/Stats.h"
2018-08-11 06:18:24 +08:00
# include "flow/actorcompiler.h" // This must be the last #include.
2017-06-30 06:50:19 +08:00
struct LogRouterData {
2018-03-18 02:08:37 +08:00
struct TagData : NonCopyable , public ReferenceCounted < TagData > {
2017-06-30 06:50:19 +08:00
std : : deque < std : : pair < Version , LengthPrefixedStringRef > > version_messages ;
Version popped ;
2018-06-27 09:20:28 +08:00
Version durableKnownCommittedVersion ;
2018-03-18 02:08:37 +08:00
Tag tag ;
2017-06-30 06:50:19 +08:00
2018-06-27 09:20:28 +08:00
TagData ( Tag tag , Version popped , Version durableKnownCommittedVersion ) : tag ( tag ) , popped ( popped ) , durableKnownCommittedVersion ( durableKnownCommittedVersion ) { }
2017-06-30 06:50:19 +08:00
2019-01-26 08:49:59 +08:00
TagData ( TagData & & r ) BOOST_NOEXCEPT : version_messages ( std : : move ( r . version_messages ) ) , tag ( r . tag ) , popped ( r . popped ) , durableKnownCommittedVersion ( r . durableKnownCommittedVersion ) { }
void operator = ( TagData & & r ) BOOST_NOEXCEPT {
2017-06-30 06:50:19 +08:00
version_messages = std : : move ( r . version_messages ) ;
2018-03-18 02:08:37 +08:00
tag = r . tag ;
2017-06-30 06:50:19 +08:00
popped = r . popped ;
2018-06-27 09:20:28 +08:00
durableKnownCommittedVersion = r . durableKnownCommittedVersion ;
2017-06-30 06:50:19 +08:00
}
// Erase messages not needed to update *from* versions >= before (thus, messages with toversion <= before)
2019-06-25 17:47:35 +08:00
ACTOR Future < Void > eraseMessagesBefore ( TagData * self , Version before , LogRouterData * tlogData , TaskPriority taskID ) {
2017-06-30 06:50:19 +08:00
while ( ! self - > version_messages . empty ( ) & & self - > version_messages . front ( ) . first < before ) {
Version version = self - > version_messages . front ( ) . first ;
int64_t messagesErased = 0 ;
while ( ! self - > version_messages . empty ( ) & & self - > version_messages . front ( ) . first = = version ) {
+ + messagesErased ;
self - > version_messages . pop_front ( ) ;
}
2018-08-11 04:57:10 +08:00
wait ( yield ( taskID ) ) ;
2017-06-30 06:50:19 +08:00
}
return Void ( ) ;
}
2019-06-25 17:47:35 +08:00
Future < Void > eraseMessagesBefore ( Version before , LogRouterData * tlogData , TaskPriority taskID ) {
2017-06-30 06:50:19 +08:00
return eraseMessagesBefore ( this , before , tlogData , taskID ) ;
}
} ;
2019-07-24 02:45:04 +08:00
const UID dbgid ;
2017-06-30 06:50:19 +08:00
Reference < AsyncVar < Reference < ILogSystem > > > logSystem ;
NotifiedVersion version ;
2018-03-30 06:12:38 +08:00
NotifiedVersion minPopped ;
2019-07-24 02:45:04 +08:00
const Version startVersion ;
2018-06-22 06:29:46 +08:00
Version minKnownCommittedVersion ;
2019-02-19 08:47:38 +08:00
Version poppedVersion ;
2017-06-30 06:50:19 +08:00
Deque < std : : pair < Version , Standalone < VectorRef < uint8_t > > > > messageBlocks ;
Tag routerTag ;
2018-04-18 06:03:22 +08:00
bool allowPops ;
2018-04-29 09:04:57 +08:00
LogSet logSet ;
2019-02-16 06:33:01 +08:00
bool foundEpochEnd ;
2017-06-30 06:50:19 +08:00
2019-09-13 05:26:37 +08:00
struct PeekTrackerData {
std : : map < int , Promise < std : : pair < Version , bool > > > sequence_version ;
double lastUpdate ;
} ;
std : : map < UID , PeekTrackerData > peekTracker ;
2019-05-03 07:43:20 +08:00
CounterCollection cc ;
Future < Void > logger ;
2018-03-18 02:08:37 +08:00
std : : vector < Reference < TagData > > tag_data ; //we only store data for the remote tag locality
Reference < TagData > getTagData ( Tag tag ) {
ASSERT ( tag . locality = = tagLocalityRemoteLog ) ;
if ( tag . id > = tag_data . size ( ) ) {
tag_data . resize ( tag . id + 1 ) ;
}
return tag_data [ tag . id ] ;
}
//only callable after getTagData returns a null reference
2018-04-19 03:07:29 +08:00
Reference < TagData > createTagData ( Tag tag , Version popped , Version knownCommittedVersion ) {
2019-07-24 02:45:04 +08:00
Reference < TagData > newTagData ( new TagData ( tag , popped , knownCommittedVersion ) ) ;
2018-03-18 02:08:37 +08:00
tag_data [ tag . id ] = newTagData ;
return newTagData ;
}
2019-05-09 09:19:35 +08:00
LogRouterData ( UID dbgid , const InitializeLogRouterRequest & req ) : dbgid ( dbgid ) , routerTag ( req . routerTag ) , logSystem ( new AsyncVar < Reference < ILogSystem > > ( ) ) ,
2019-05-03 07:16:25 +08:00
version ( req . startVersion - 1 ) , minPopped ( 0 ) , startVersion ( req . startVersion ) , allowPops ( false ) , minKnownCommittedVersion ( 0 ) , poppedVersion ( 0 ) , foundEpochEnd ( false ) ,
2019-05-03 07:43:20 +08:00
cc ( " LogRouter " , dbgid . toString ( ) ) {
2018-04-29 09:04:57 +08:00
//setup just enough of a logSet to be able to call getPushLocations
logSet . logServers . resize ( req . tLogLocalities . size ( ) ) ;
logSet . tLogPolicy = req . tLogPolicy ;
2018-04-30 04:47:32 +08:00
logSet . locality = req . locality ;
2018-04-29 09:04:57 +08:00
logSet . updateLocalitySet ( req . tLogLocalities ) ;
2018-05-01 01:58:41 +08:00
for ( int i = 0 ; i < req . tLogLocalities . size ( ) ; i + + ) {
Tag tag ( tagLocalityRemoteLog , i ) ;
auto tagData = getTagData ( tag ) ;
if ( ! tagData ) {
tagData = createTagData ( tag , 0 , 0 ) ;
}
}
2019-05-03 07:43:20 +08:00
specialCounter ( cc , " Version " , [ this ] ( ) { return this - > version . get ( ) ; } ) ;
specialCounter ( cc , " MinPopped " , [ this ] ( ) { return this - > minPopped . get ( ) ; } ) ;
specialCounter ( cc , " MinKnownCommittedVersion " , [ this ] ( ) { return this - > minKnownCommittedVersion ; } ) ;
specialCounter ( cc , " PoppedVersion " , [ this ] ( ) { return this - > poppedVersion ; } ) ;
logger = traceCounters ( " LogRouterMetrics " , dbgid , SERVER_KNOBS - > WORKER_LOGGING_INTERVAL , & cc , " LogRouterMetrics " ) ;
2018-04-29 09:04:57 +08:00
}
2017-06-30 06:50:19 +08:00
} ;
2018-03-17 02:40:21 +08:00
// Copies the messages of one version into the router's message blocks and queues a
// reference to each message under every tag it carries. Returns immediately when
// taggedMessages is empty.
void commitMessages(LogRouterData* self, Version version, const std::vector<TagsAndMessage>& taggedMessages) {
	if (taggedMessages.empty()) {
		return;
	}

	// Payload bytes that still have to be placed into a block.
	int pendingBytes = 0;
	for (const auto& entry : taggedMessages) {
		pendingBytes += entry.message.size();
	}

	// Grab the last block in the blocks list so we can share its arena
	// We pop all of the elements of it to create a "fresh" vector that starts at the end of the previous vector
	Standalone<VectorRef<uint8_t>> block;
	if (!self->messageBlocks.empty()) {
		block = self->messageBlocks.back().second;
	} else {
		block.reserve(block.arena(), std::max<int64_t>(SERVER_KNOBS->TLOG_MESSAGE_BLOCK_BYTES, pendingBytes));
	}
	block.pop_front(block.size());

	for (const auto& entry : taggedMessages) {
		const bool fitsInBlock = entry.message.size() <= block.capacity() - block.size();
		if (!fitsInBlock) {
			// Record the block filled so far and continue in a new one sized for the remaining payload.
			self->messageBlocks.emplace_back(version, block);
			block = Standalone<VectorRef<uint8_t>>();
			block.reserve(block.arena(), std::max<int64_t>(SERVER_KNOBS->TLOG_MESSAGE_BLOCK_BYTES, pendingBytes));
		}

		block.append(block.arena(), entry.message.begin(), entry.message.size());

		for (const auto& tag : entry.tags) {
			auto tagData = self->getTagData(tag);
			if (!tagData) {
				tagData = self->createTagData(tag, 0, 0);
			}

			// Versions below the tag's popped version are not queued.
			if (version < tagData->popped) {
				continue;
			}

			// The queued reference points at the bytes just appended to the block.
			tagData->version_messages.emplace_back(
			    version, LengthPrefixedStringRef((uint32_t*)(block.end() - entry.message.size())));
			const auto& queued = tagData->version_messages.back().second;
			if (queued.expectedSize() > SERVER_KNOBS->MAX_MESSAGE_SIZE) {
				TraceEvent(SevWarnAlways, " LargeMessage ").detail(" Size ", queued.expectedSize());
			}
		}

		pendingBytes -= entry.message.size();
	}

	self->messageBlocks.emplace_back(version, block);
}
2019-02-16 06:33:01 +08:00
// Delays the caller until the router may move its version up to `ver`. Progress is gated on self->minPopped.
// Side effects: may advance self->version and may set self->foundEpochEnd.
ACTOR Future < Void > waitForVersion ( LogRouterData * self , Version ver ) {
// The only time the log router should allow a gap in versions larger than MAX_READ_TRANSACTION_LIFE_VERSIONS is when processing epoch end.
// Since one set of log routers is created per generation of transaction logs, the gap caused by epoch end will be within MAX_VERSIONS_IN_FLIGHT of the log routers start version.
2019-02-19 06:58:34 +08:00
// The router has not reached startVersion yet: when ver is past startVersion, jump to startVersion and wait for minPopped to reach it. Return in either case.
if ( self - > version . get ( ) < self - > startVersion ) {
if ( ver > self - > startVersion ) {
self - > version . set ( self - > startVersion ) ;
wait ( self - > minPopped . whenAtLeast ( self - > version . get ( ) ) ) ;
}
return Void ( ) ;
}
2019-02-16 06:33:01 +08:00
if ( ! self - > foundEpochEnd ) {
// Wait until minPopped reaches the smaller of the current version and ver - MAX_READ_TRANSACTION_LIFE_VERSIONS.
wait ( self - > minPopped . whenAtLeast ( std : : min ( self - > version . get ( ) , ver - SERVER_KNOBS - > MAX_READ_TRANSACTION_LIFE_VERSIONS ) ) ) ;
} else {
// Loop until ver is within MAX_READ_TRANSACTION_LIFE_VERSIONS of minPopped.
while ( self - > minPopped . get ( ) + SERVER_KNOBS - > MAX_READ_TRANSACTION_LIFE_VERSIONS < ver ) {
if ( self - > minPopped . get ( ) + SERVER_KNOBS - > MAX_READ_TRANSACTION_LIFE_VERSIONS > self - > version . get ( ) ) {
// Advance version as far as the window above minPopped allows, then yield.
self - > version . set ( self - > minPopped . get ( ) + SERVER_KNOBS - > MAX_READ_TRANSACTION_LIFE_VERSIONS ) ;
2019-06-25 17:47:35 +08:00
wait ( yield ( TaskPriority : : TLogCommit ) ) ;
2019-02-16 06:33:01 +08:00
} else {
// version is already at the edge of the window: wait for minPopped to move forward.
wait ( self - > minPopped . whenAtLeast ( ( self - > minPopped . get ( ) + 1 ) ) ) ;
}
}
}
// Once ver is at least MAX_VERSIONS_IN_FLIGHT past startVersion, later calls take the foundEpochEnd path above.
if ( ver > = self - > startVersion + SERVER_KNOBS - > MAX_VERSIONS_IN_FLIGHT ) {
self - > foundEpochEnd = true ;
}
return Void ( ) ;
}
2018-04-29 09:04:57 +08:00
// Never-returning loop that reads messages for self->routerTag through a peek cursor obtained from the current log system,
// groups them by version, hands each completed version to commitMessages and advances self->version.
ACTOR Future < Void > pullAsyncData ( LogRouterData * self ) {
2017-06-30 06:50:19 +08:00
state Future < Void > dbInfoChange = Void ( ) ;
state Reference < ILogSystem : : IPeekCursor > r ;
2018-04-18 02:16:48 +08:00
// Version at which a newly created cursor starts peeking.
state Version tagAt = self - > version . get ( ) + 1 ;
2017-06-30 06:50:19 +08:00
state Version lastVer = 0 ;
loop {
2019-07-24 02:45:04 +08:00
// Wait for the cursor to have more data. Whenever the log system changes, replace the cursor
// (or clear it when there is no log system) and keep waiting.
loop choose {
when ( wait ( r ? r - > getMore ( TaskPriority : : TLogCommit ) : Never ( ) ) ) { break ; }
when ( wait ( dbInfoChange ) ) { // FIXME: does this actually happen?
if ( self - > logSystem - > get ( ) ) {
r = self - > logSystem - > get ( ) - > peekLogRouter ( self - > dbgid , tagAt , self - > routerTag ) ;
} else {
r = Reference < ILogSystem : : IPeekCursor > ( ) ;
2017-06-30 06:50:19 +08:00
}
2019-07-24 02:45:04 +08:00
dbInfoChange = self - > logSystem - > onChange ( ) ;
2017-06-30 06:50:19 +08:00
}
}
2018-06-22 06:29:46 +08:00
// minKnownCommittedVersion only ever moves forward.
self - > minKnownCommittedVersion = std : : max ( self - > minKnownCommittedVersion , r - > getMinKnownCommittedVersion ( ) ) ;
2018-03-30 06:12:38 +08:00
// ver is the version whose messages are currently being accumulated; 0 means none yet.
state Version ver = 0 ;
state std : : vector < TagsAndMessage > messages ;
2019-11-06 10:07:30 +08:00
// Backs the tag lists of the entries in `messages`; replaced together with messages.clear().
state Arena arena ;
2017-06-30 06:50:19 +08:00
while ( true ) {
2018-03-30 06:12:38 +08:00
state bool foundMessage = r - > hasMessage ( ) ;
2017-06-30 06:50:19 +08:00
// Version boundary or end of available data: flush what was accumulated for ver.
if ( ! foundMessage | | r - > version ( ) . version ! = ver ) {
ASSERT ( r - > version ( ) . version > lastVer ) ;
if ( ver ) {
2019-02-16 06:33:01 +08:00
wait ( waitForVersion ( self , ver ) ) ;
2018-03-17 02:40:21 +08:00
commitMessages ( self , ver , messages ) ;
2017-06-30 06:50:19 +08:00
self - > version . set ( ver ) ;
2019-06-25 17:47:35 +08:00
wait ( yield ( TaskPriority : : TLogCommit ) ) ;
2018-06-09 02:11:08 +08:00
//TraceEvent("LogRouterVersion").detail("Ver",ver);
2017-06-30 06:50:19 +08:00
}
lastVer = ver ;
ver = r - > version ( ) . version ;
2018-03-17 02:40:21 +08:00
messages . clear ( ) ;
2019-11-06 10:07:30 +08:00
arena = Arena ( ) ;
2017-06-30 06:50:19 +08:00
if ( ! foundMessage ) {
ver - - ; //ver is the next possible version we will get data for
// No messages up to ver: advance version without committing anything, then go back to waiting on the cursor.
if ( ver > self - > version . get ( ) ) {
2019-02-16 06:33:01 +08:00
wait ( waitForVersion ( self , ver ) ) ;
2017-06-30 06:50:19 +08:00
self - > version . set ( ver ) ;
2019-06-25 17:47:35 +08:00
wait ( yield ( TaskPriority : : TLogCommit ) ) ;
2017-06-30 06:50:19 +08:00
}
break ;
}
}
2018-03-17 02:40:21 +08:00
// Accumulate the current message; its tags are the push locations computed by logSet, expressed as tagLocalityRemoteLog tags.
TagsAndMessage tagAndMsg ;
tagAndMsg . message = r - > getMessageWithTags ( ) ;
2019-07-24 02:45:04 +08:00
std : : vector < int > tags ;
2018-04-29 09:04:57 +08:00
self - > logSet . getPushLocations ( r - > getTags ( ) , tags , 0 ) ;
2019-11-06 10:07:30 +08:00
tagAndMsg . tags . reserve ( arena , tags . size ( ) ) ;
2019-04-17 13:34:56 +08:00
for ( const auto & t : tags ) {
2019-11-06 10:07:30 +08:00
tagAndMsg . tags . push_back ( arena , Tag ( tagLocalityRemoteLog , t ) ) ;
2017-06-30 06:50:19 +08:00
}
2018-03-17 02:40:21 +08:00
messages . push_back ( std : : move ( tagAndMsg ) ) ;
2017-06-30 06:50:19 +08:00
r - > nextMessage ( ) ;
}
2018-04-20 05:33:31 +08:00
// Remember where a replacement cursor has to resume.
tagAt = std : : max ( r - > version ( ) . version , self - > version . get ( ) + 1 ) ;
2017-06-30 06:50:19 +08:00
}
}
std : : deque < std : : pair < Version , LengthPrefixedStringRef > > & get_version_messages ( LogRouterData * self , Tag tag ) {
2018-03-18 02:08:37 +08:00
auto tagData = self - > getTagData ( tag ) ;
if ( ! tagData ) {
2017-06-30 06:50:19 +08:00
static std : : deque < std : : pair < Version , LengthPrefixedStringRef > > empty ;
return empty ;
}
2018-03-18 02:08:37 +08:00
return tagData - > version_messages ;
2017-06-30 06:50:19 +08:00
} ;
// Serializes the queued messages of req.tag, starting at version req.begin, into `messages`.
// Each new version is preceded by VERSION_HEADER and the version number.
// Once at least DESIRED_TOTAL_BYTES have been written, output stops at the next version
// boundary and endVersion is set to one past the last version written; otherwise endVersion
// is left untouched. `messages` must be empty on entry.
void peekMessagesFromMemory(LogRouterData* self, TLogPeekRequest const& req, BinaryWriter& messages, Version& endVersion) {
	ASSERT(!messages.getLength());

	auto& queue = get_version_messages(self, req.tag);
	//TraceEvent("TLogPeekMem", self->dbgid).detail("Tag", req.tag1).detail("PDS", self->persistentDataSequence).detail("PDDS", self->persistentDataDurableSequence).detail("Oldest", map1.empty() ? 0 : map1.begin()->key ).detail("OldestMsgCount", map1.empty() ? 0 : map1.begin()->value.size());

	// First queued entry whose version is not below req.begin.
	auto cursor = std::lower_bound(queue.begin(),
	                               queue.end(),
	                               std::make_pair(req.begin, LengthPrefixedStringRef()),
	                               CompareFirst<std::pair<Version, LengthPrefixedStringRef>>());

	Version currentVersion = -1;
	while (cursor != queue.end()) {
		const bool startsNewVersion = cursor->first != currentVersion;
		if (startsNewVersion) {
			// The size limit is only checked between versions, so a version is never split.
			if (messages.getLength() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES) {
				endVersion = currentVersion + 1;
				//TraceEvent("TLogPeekMessagesReached2", self->dbgid);
				break;
			}
			currentVersion = cursor->first;
			messages << VERSION_HEADER << currentVersion;
		}

		messages << cursor->second.toStringRef();
		++cursor;
	}
}
// Returns the popped version recorded for `tag`, or 0 when no TagData exists for it.
Version poppedVersion(LogRouterData* self, Tag tag) {
	auto tagData = self->getTagData(tag);
	if (tagData) {
		return tagData->popped;
	}
	return Version(0);
}
// Answers one TLogPeekRequest from the in-memory per-tag queues.
// When req.sequence is present, requests sharing a peekId are chained through self->peekTracker:
// the request with sequence N waits for the (begin, onlySpilled) pair published under N and publishes the pair for N + 1.
ACTOR Future < Void > logRouterPeekMessages ( LogRouterData * self , TLogPeekRequest req ) {
state BinaryWriter messages ( Unversioned ( ) ) ;
2019-09-13 05:26:37 +08:00
state int sequence = - 1 ;
state UID peekId ;
if ( req . sequence . present ( ) ) {
try {
peekId = req . sequence . get ( ) . first ;
sequence = req . sequence . get ( ) . second ;
2019-11-02 05:02:44 +08:00
// A late sequence number for a peekId that has no tracker is rejected instead of creating a new tracker.
if ( sequence > = SERVER_KNOBS - > PARALLEL_GET_MORE_REQUESTS & & self - > peekTracker . find ( peekId ) = = self - > peekTracker . end ( ) ) {
throw timed_out ( ) ;
}
2019-09-13 05:26:37 +08:00
auto & trackerData = self - > peekTracker [ peekId ] ;
// The first request of a chain seeds its own entry with the values it carries.
if ( sequence = = 0 & & trackerData . sequence_version . find ( 0 ) = = trackerData . sequence_version . end ( ) ) {
trackerData . sequence_version [ 0 ] . send ( std : : make_pair ( req . begin , req . onlySpilled ) ) ;
}
auto seqBegin = trackerData . sequence_version . begin ( ) ;
// The peek cursor and this comparison need to agree about the maximum number of in-flight requests.
// Drop entries that are PARALLEL_GET_MORE_REQUESTS or more behind this request, failing any promise that can still be set.
while ( trackerData . sequence_version . size ( ) & & seqBegin - > first < = sequence - SERVER_KNOBS - > PARALLEL_GET_MORE_REQUESTS ) {
if ( seqBegin - > second . canBeSet ( ) ) {
seqBegin - > second . sendError ( timed_out ( ) ) ;
}
trackerData . sequence_version . erase ( seqBegin ) ;
seqBegin = trackerData . sequence_version . begin ( ) ;
}
// This request is older than the oldest entry still tracked.
if ( trackerData . sequence_version . size ( ) & & sequence < seqBegin - > first ) {
throw timed_out ( ) ;
}
trackerData . lastUpdate = now ( ) ;
// Wait for the pair published for this sequence number; it replaces the begin/onlySpilled carried by the request.
std : : pair < Version , bool > prevPeekData = wait ( trackerData . sequence_version [ sequence ] . getFuture ( ) ) ;
req . begin = prevPeekData . first ;
req . onlySpilled = prevPeekData . second ;
wait ( yield ( ) ) ;
} catch ( Error & e ) {
// timed_out is reported to the requester; any other error propagates out of the actor.
if ( e . code ( ) = = error_code_timed_out ) {
req . reply . sendError ( timed_out ( ) ) ;
return Void ( ) ;
} else {
throw ;
}
}
}
2017-06-30 06:50:19 +08:00
2018-10-31 04:44:37 +08:00
//TraceEvent("LogRouterPeek1", self->dbgid).detail("From", req.reply.getEndpoint().getPrimaryAddress()).detail("Ver", self->version.get()).detail("Begin", req.begin);
2017-06-30 06:50:19 +08:00
// Non-blocking peek with nothing available at req.begin yet: reply end_of_stream and
// publish the unchanged (begin, onlySpilled) pair for sequence + 1 unless one is already set.
if ( req . returnIfBlocked & & self - > version . get ( ) < req . begin ) {
//TraceEvent("LogRouterPeek2", self->dbgid);
req . reply . sendError ( end_of_stream ( ) ) ;
2019-09-13 05:26:37 +08:00
if ( req . sequence . present ( ) ) {
auto & trackerData = self - > peekTracker [ peekId ] ;
auto & sequenceData = trackerData . sequence_version [ sequence + 1 ] ;
if ( ! sequenceData . isSet ( ) ) {
sequenceData . send ( std : : make_pair ( req . begin , req . onlySpilled ) ) ;
}
}
2017-06-30 06:50:19 +08:00
return Void ( ) ;
}
// Blocking peek: wait until the router's version reaches req.begin, then delay by TLOG_PEEK_DELAY.
if ( self - > version . get ( ) < req . begin ) {
2018-08-11 04:57:10 +08:00
wait ( self - > version . whenAtLeast ( req . begin ) ) ;
wait ( delay ( SERVER_KNOBS - > TLOG_PEEK_DELAY , g_network - > getCurrentTask ( ) ) ) ;
2017-06-30 06:50:19 +08:00
}
Version poppedVer = poppedVersion ( self , req . tag ) ;
2018-04-13 07:15:17 +08:00
if ( poppedVer > req . begin | | req . begin < self - > startVersion ) {
2018-04-09 12:24:05 +08:00
//This should only happen if a packet is sent multiple times and the reply is not needed.
// Since we are using popped differently, do not send a reply.
2018-06-09 02:11:08 +08:00
TraceEvent ( SevWarnAlways , " LogRouterPeekPopped " , self - > dbgid ) . detail ( " Begin " , req . begin ) . detail ( " Popped " , poppedVer ) . detail ( " Start " , self - > startVersion ) ;
2018-04-09 12:24:05 +08:00
req . reply . send ( Never ( ) ) ;
2019-09-13 05:26:37 +08:00
// The pair for sequence + 1 is still published, unless one is already set.
if ( req . sequence . present ( ) ) {
auto & trackerData = self - > peekTracker [ peekId ] ;
auto & sequenceData = trackerData . sequence_version [ sequence + 1 ] ;
if ( ! sequenceData . isSet ( ) ) {
sequenceData . send ( std : : make_pair ( req . begin , req . onlySpilled ) ) ;
}
}
2017-06-30 06:50:19 +08:00
return Void ( ) ;
}
// endVersion starts at version + 1; peekMessagesFromMemory overwrites it when it stops early at its size limit.
Version endVersion = self - > version . get ( ) + 1 ;
peekMessagesFromMemory ( self , req , messages , endVersion ) ;
TLogPeekReply reply ;
reply . maxKnownVersion = self - > version . get ( ) ;
2019-02-19 08:47:38 +08:00
reply . minKnownCommittedVersion = self - > poppedVersion ;
2019-03-29 02:52:50 +08:00
reply . messages = messages . toValue ( ) ;
2018-05-07 00:32:41 +08:00
// minPopped is only reported once it has reached startVersion; before that 0 is sent.
reply . popped = self - > minPopped . get ( ) > = self - > startVersion ? self - > minPopped . get ( ) : 0 ;
2017-06-30 06:50:19 +08:00
reply . end = endVersion ;
2019-05-15 08:07:49 +08:00
reply . onlySpilled = false ;
2017-06-30 06:50:19 +08:00
2019-09-13 05:26:37 +08:00
if ( req . sequence . present ( ) ) {
auto & trackerData = self - > peekTracker [ peekId ] ;
trackerData . lastUpdate = now ( ) ;
auto & sequenceData = trackerData . sequence_version [ sequence + 1 ] ;
// sequence + 1 is older than the oldest tracked entry: fail this request and the entry for sequence + 1.
if ( trackerData . sequence_version . size ( ) & & sequence + 1 < trackerData . sequence_version . begin ( ) - > first ) {
req . reply . sendError ( timed_out ( ) ) ;
if ( ! sequenceData . isSet ( ) )
sequenceData . sendError ( timed_out ( ) ) ;
return Void ( ) ;
}
if ( sequenceData . isSet ( ) ) {
// A pair was already published for sequence + 1: the reply is only sent when it ends at the same version.
if ( sequenceData . getFuture ( ) . get ( ) . first ! = reply . end ) {
TEST ( true ) ; //tlog peek second attempt ended at a different version
req . reply . sendError ( timed_out ( ) ) ;
return Void ( ) ;
}
} else {
sequenceData . send ( std : : make_pair ( reply . end , reply . onlySpilled ) ) ;
}
reply . begin = req . begin ;
}
2017-06-30 06:50:19 +08:00
req . reply . send ( reply ) ;
//TraceEvent("LogRouterPeek4", self->dbgid);
return Void ( ) ;
}
2019-09-13 07:27:39 +08:00
// Never-returning loop that removes peek trackers whose lastUpdate is older than
// PEEK_TRACKER_EXPIRATION_TIME. Promises of a removed tracker that are not set yet
// are failed with timed_out. After each pass the actor sleeps until the earliest
// remaining tracker is due, or for the full expiration time when none remain.
ACTOR Future<Void> cleanupPeekTrackers(LogRouterData* self) {
	loop {
		double minTimeUntilExpiration = SERVER_KNOBS->PEEK_TRACKER_EXPIRATION_TIME;
		auto it = self->peekTracker.begin();
		while (it != self->peekTracker.end()) {
			double timeUntilExpiration = it->second.lastUpdate + SERVER_KNOBS->PEEK_TRACKER_EXPIRATION_TIME - now();
			if (timeUntilExpiration < 1.0e-6) {
				// Iterate by reference: copying each (sequence, Promise) pair is unnecessary.
				for (auto& seq : it->second.sequence_version) {
					if (!seq.second.isSet()) {
						seq.second.sendError(timed_out());
					}
				}
				it = self->peekTracker.erase(it);
			} else {
				minTimeUntilExpiration = std::min(minTimeUntilExpiration, timeUntilExpiration);
				++it;
			}
		}

		wait(delay(minTimeUntilExpiration));
	}
}
2017-06-30 06:50:19 +08:00
// Handles one TLogPopRequest: records the pop for req.tag, discards queued messages and
// message blocks that are no longer needed, forwards the pop to the log system when pops
// are allowed, replies to the request and finally raises self->minPopped.
ACTOR Future<Void> logRouterPop(LogRouterData* self, TLogPopRequest req) {
	auto tagData = self->getTagData(req.tag);
	if (!tagData) {
		tagData = self->createTagData(req.tag, req.to, req.durableKnownCommittedVersion);
	} else if (req.to > tagData->popped) {
		// Only a pop that moves the tag forward changes its state.
		tagData->popped = req.to;
		tagData->durableKnownCommittedVersion = req.durableKnownCommittedVersion;
		wait(tagData->eraseMessagesBefore(req.to, self, TaskPriority::TLogPop));
	}

	// Minimum over every tag that has data.
	state Version minPopped = std::numeric_limits<Version>::max();
	state Version minKnownCommittedVersion = std::numeric_limits<Version>::max();
	// Bind by const reference: copying a Reference adjusts the reference count on every iteration.
	for (const auto& it : self->tag_data) {
		if (it) {
			minPopped = std::min(it->popped, minPopped);
			minKnownCommittedVersion = std::min(it->durableKnownCommittedVersion, minKnownCommittedVersion);
		}
	}

	// Blocks recorded at versions below minPopped have been popped by every tag.
	while (!self->messageBlocks.empty() && self->messageBlocks.front().first < minPopped) {
		self->messageBlocks.pop_front();
		wait(yield(TaskPriority::TLogPop));
	}

	self->poppedVersion = std::min(minKnownCommittedVersion, self->minKnownCommittedVersion);
	if (self->logSystem->get() && self->allowPops) {
		const Tag popTag = self->logSystem->get()->getPseudoPopTag(self->routerTag, ProcessClass::LogRouterClass);
		self->logSystem->get()->pop(self->poppedVersion, popTag);
	}

	req.reply.send(Void());
	// minPopped never moves backwards.
	self->minPopped.set(std::max(minPopped, self->minPopped.get()));
	return Void();
}
// Main loop of a log router. It owns the LogRouterData, starts the pull and peek-tracker
// cleanup actors, refreshes allowPops and the log system whenever the ServerDBInfo changes,
// and starts one actor per incoming peek or pop request. All started actors are collected
// so that an error from any of them surfaces here.
ACTOR Future<Void> logRouterCore(TLogInterface interf,
                                 InitializeLogRouterRequest req,
                                 Reference<AsyncVar<ServerDBInfo>> db) {
	state LogRouterData routerData(interf.id(), req);
	state PromiseStream<Future<Void>> addActor;
	state Future<Void> actorErrors = actorCollection(addActor.getFuture());
	// Ready immediately so the first pass through the loop picks up the current ServerDBInfo.
	state Future<Void> dbInfoChanged = Void();

	addActor.send(pullAsyncData(&routerData));
	addActor.send(cleanupPeekTrackers(&routerData));

	loop {
		choose {
			when(wait(dbInfoChanged)) {
				dbInfoChanged = db->onChange();
				routerData.allowPops = db->get().recoveryState == RecoveryState::FULLY_RECOVERED;
				routerData.logSystem->set(ILogSystem::fromServerDBInfo(routerData.dbgid, db->get(), true));
			}
			when(TLogPeekRequest peekReq = waitNext(interf.peekMessages.getFuture())) {
				addActor.send(logRouterPeekMessages(&routerData, peekReq));
			}
			when(TLogPopRequest popReq = waitNext(interf.popMessages.getFuture())) {
				addActor.send(logRouterPop(&routerData, popReq));
			}
			when(wait(actorErrors)) {}
		}
	}
}
2018-04-09 12:24:05 +08:00
// Watches the ServerDBInfo and throws worker_removed once this log router has been displaced.
// It counts as displaced when the log system configuration no longer lists
// myInterface.id() as a log router and either
//   - a recovery newer than recoveryCount is past the UNINITIALIZED state, or
//   - the recovery with exactly recoveryCount is FULLY_RECOVERED.
// The actor never returns normally.
ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db, uint64_t recoveryCount, TLogInterface myInterface) {
	loop {
		const bool newerRecoveryStarted =
		    db->get().recoveryCount > recoveryCount && db->get().recoveryState != RecoveryState::UNINITIALIZED;
		const bool ownRecoveryFinished =
		    db->get().recoveryCount == recoveryCount && db->get().recoveryState == RecoveryState::FULLY_RECOVERED;

		// The membership lookup only runs when one of the recovery conditions holds.
		if ((newerRecoveryStarted || ownRecoveryFinished) &&
		    !db->get().logSystemConfig.hasLogRouter(myInterface.id())) {
			throw worker_removed();
		}

		wait(db->onChange());
	}
}
// Entry point of the log router role: runs logRouterCore alongside checkRemoved.
// Returns normally when the core finishes, or when the actor is cancelled or the
// router is removed (worker_removed); every other error is rethrown to the caller.
ACTOR Future<Void> logRouter(TLogInterface interf,
                             InitializeLogRouterRequest req,
                             Reference<AsyncVar<ServerDBInfo>> db) {
	try {
		TraceEvent(" LogRouterStart ", interf.id())
		    .detail(" Start ", req.startVersion)
		    .detail(" Tag ", req.routerTag.toString())
		    .detail(" Localities ", req.tLogLocalities.size())
		    .detail(" Locality ", req.locality);

		state Future<Void> core = logRouterCore(interf, req, db);
		loop {
			choose {
				when(wait(core)) { return Void(); }
				when(wait(checkRemoved(db, req.recoveryCount, interf))) {}
			}
		}
	} catch (Error& e) {
		const bool expectedShutdown =
		    e.code() == error_code_actor_cancelled || e.code() == error_code_worker_removed;
		if (!expectedShutdown) {
			throw;
		}
		TraceEvent(" LogRouterTerminated ", interf.id()).error(e, true);
		return Void();
	}
}