2017-05-26 04:48:44 +08:00
/*
* LogSystemPeekCursor . actor . cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013 - 2018 Apple Inc . and the FoundationDB project authors
2018-02-22 02:25:11 +08:00
*
2017-05-26 04:48:44 +08:00
* Licensed under the Apache License , Version 2.0 ( the " License " ) ;
* you may not use this file except in compliance with the License .
* You may obtain a copy of the License at
2018-02-22 02:25:11 +08:00
*
2017-05-26 04:48:44 +08:00
* http : //www.apache.org/licenses/LICENSE-2.0
2018-02-22 02:25:11 +08:00
*
2017-05-26 04:48:44 +08:00
* Unless required by applicable law or agreed to in writing , software
* distributed under the License is distributed on an " AS IS " BASIS ,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
* See the License for the specific language governing permissions and
* limitations under the License .
*/
2018-10-20 01:30:13 +08:00
# include "fdbserver/LogSystem.h"
2017-05-26 04:48:44 +08:00
# include "fdbrpc/FailureMonitor.h"
2018-10-20 01:30:13 +08:00
# include "fdbserver/Knobs.h"
2017-11-16 13:05:10 +08:00
# include "fdbrpc/ReplicationUtils.h"
2019-02-18 10:46:59 +08:00
# include "flow/actorcompiler.h" // has to be last include
2017-05-26 04:48:44 +08:00
// Creates a cursor that peeks messages for `tag` through the TLog interface published by `interf`.
// The cursor position starts at version `begin` and is bounded by `end`; it starts without a message
// (hasMsg == false) and with its reader bound to the member `results`.
ILogSystem::ServerPeekCursor::ServerPeekCursor(
    Reference<AsyncVar<OptionalInterface<TLogInterface>>> const& interf, Tag tag, Version begin, Version end,
    bool returnIfBlocked, bool parallelGetMore)
  : interf(interf), tag(tag), messageVersion(begin), end(end), hasMsg(false),
    rd(results.arena, results.messages, Unversioned()), randomID(deterministicRandom()->randomUniqueID()),
    poppedVersion(0), returnIfBlocked(returnIfBlocked), sequence(0), onlySpilled(false),
    parallelGetMore(parallelGetMore) {
    // No constructor parameter is called `results`, so these lines address the member directly.
    results.maxKnownVersion = 0;
    results.minKnownCommittedVersion = 0;
    //TraceEvent("SPC_Starting", randomID).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).backtrace();
}
2018-03-17 02:40:21 +08:00
// Clone constructor (used by cloneNoMore()): copies an already fetched reply and cursor position.
// No TLog interface is attached (`interf` is left default-constructed), and returnIfBlocked,
// onlySpilled and parallelGetMore are all disabled.
//
// Several parameters share their name with a member. Inside this constructor the unqualified names
// `results`, `hasMsg` and `messageVersion` therefore denote the PARAMETERS, not the members.
ILogSystem::ServerPeekCursor::ServerPeekCursor(TLogPeekReply const& results,
                                               LogMessageVersion const& messageVersion,
                                               LogMessageVersion const& end, int32_t messageLength,
                                               int32_t rawLength, bool hasMsg, Version poppedVersion, Tag tag)
  : results(results), tag(tag), rd(results.arena, results.messages, Unversioned()),
    messageVersion(messageVersion), end(end), messageLength(messageLength), rawLength(rawLength),
    hasMsg(hasMsg), randomID(deterministicRandom()->randomUniqueID()), poppedVersion(poppedVersion),
    returnIfBlocked(false), sequence(0), onlySpilled(false), parallelGetMore(false) {
    //TraceEvent("SPC_Clone", randomID);
    this->results.maxKnownVersion = 0;
    this->results.minKnownCommittedVersion = 0;

    // The reader was freshly created at the start of the buffer: parse the first message (if the
    // source cursor had one), then skip forward to the position the source cursor was at.
    if (hasMsg) {
        nextMessage();
    }
    advanceTo(messageVersion);
}
// Returns a new cursor built from this cursor's current reply and position via the clone constructor.
Reference<ILogSystem::IPeekCursor> ILogSystem::ServerPeekCursor::cloneNoMore() {
    auto* copy = new ILogSystem::ServerPeekCursor(results, messageVersion, end, messageLength, rawLength,
                                                  hasMsg, poppedVersion, tag);
    return Reference<ILogSystem::ServerPeekCursor>(copy);
}
2019-06-19 05:49:04 +08:00
void ILogSystem : : ServerPeekCursor : : setProtocolVersion ( ProtocolVersion version ) {
2017-05-26 04:48:44 +08:00
rd . setProtocolVersion ( version ) ;
}
// Exposes the arena of the peek reply currently held in `results`.
Arena& ILogSystem::ServerPeekCursor::arena() {
    return results.arena;
}
// Gives callers direct access to the reader positioned inside the current reply buffer.
ArenaReader* ILogSystem::ServerPeekCursor::reader() {
    ArenaReader* current = &rd;
    return current;
}
bool ILogSystem : : ServerPeekCursor : : hasMessage ( ) {
2018-06-09 02:11:08 +08:00
//TraceEvent("SPC_HasMessage", randomID).detail("HasMsg", hasMsg);
2017-05-26 04:48:44 +08:00
return hasMsg ;
}
void ILogSystem : : ServerPeekCursor : : nextMessage ( ) {
2018-06-09 02:11:08 +08:00
//TraceEvent("SPC_NextMessage", randomID).detail("MessageVersion", messageVersion.toString());
2017-05-26 04:48:44 +08:00
ASSERT ( hasMsg ) ;
if ( rd . empty ( ) ) {
messageVersion . reset ( std : : min ( results . end , end . version ) ) ;
hasMsg = false ;
return ;
}
if ( * ( int32_t * ) rd . peekBytes ( 4 ) = = - 1 ) {
// A version
int32_t dummy ;
Version ver ;
rd > > dummy > > ver ;
2018-06-09 02:11:08 +08:00
//TraceEvent("SPC_ProcessSeq", randomID).detail("MessageVersion", messageVersion.toString()).detail("Ver", ver).detail("Tag", tag.toString());
2017-05-26 04:48:44 +08:00
//ASSERT( ver >= messageVersion.version );
messageVersion . reset ( ver ) ;
if ( messageVersion > = end ) {
messageVersion = end ;
hasMsg = false ;
return ;
}
ASSERT ( ! rd . empty ( ) ) ;
}
2017-06-30 06:50:19 +08:00
uint16_t tagCount ;
2018-03-17 02:40:21 +08:00
rd . checkpoint ( ) ;
2017-06-30 06:50:19 +08:00
rd > > messageLength > > messageVersion . sub > > tagCount ;
2019-11-06 10:07:30 +08:00
tags = VectorRef < Tag > ( ( Tag * ) rd . readBytes ( tagCount * sizeof ( Tag ) ) , tagCount ) ;
2018-03-17 02:40:21 +08:00
rawLength = messageLength + sizeof ( messageLength ) ;
messageLength - = ( sizeof ( messageVersion . sub ) + sizeof ( tagCount ) + tagCount * sizeof ( Tag ) ) ;
2017-05-26 04:48:44 +08:00
hasMsg = true ;
2018-06-09 02:11:08 +08:00
//TraceEvent("SPC_NextMessageB", randomID).detail("MessageVersion", messageVersion.toString());
2017-05-26 04:48:44 +08:00
}
// Consumes `messageLength` bytes from the reader and returns them as the current message payload.
StringRef ILogSystem::ServerPeekCursor::getMessage() {
    //TraceEvent("SPC_GetMessage", randomID);
    uint8_t const* payload = (uint8_t const*)rd.readBytes(messageLength);
    return StringRef(payload, messageLength);
}
2018-03-17 02:40:21 +08:00
// Rewinds the reader to the checkpoint taken in nextMessage() and returns the full `rawLength`-byte
// record (header included) instead of only the payload.
StringRef ILogSystem::ServerPeekCursor::getMessageWithTags() {
    rd.rewind();
    uint8_t const* record = (uint8_t const*)rd.readBytes(rawLength);
    return StringRef(record, rawLength);
}
2019-11-06 10:07:30 +08:00
// Tags recorded for the current message by the last nextMessage() call.
VectorRef<Tag> ILogSystem::ServerPeekCursor::getTags() {
    return tags;
}
2017-05-26 04:48:44 +08:00
void ILogSystem : : ServerPeekCursor : : advanceTo ( LogMessageVersion n ) {
2018-06-09 02:11:08 +08:00
//TraceEvent("SPC_AdvanceTo", randomID).detail("N", n.toString());
2017-05-26 04:48:44 +08:00
while ( messageVersion < n & & hasMessage ( ) ) {
getMessage ( ) ;
nextMessage ( ) ;
}
if ( hasMessage ( ) )
return ;
//if( more.isValid() && !more.isReady() ) more.cancel();
if ( messageVersion < n ) {
messageVersion = n ;
}
}
2019-06-25 17:47:35 +08:00
// Fetches the next peek reply for `self` while keeping several peek requests in flight.
//
// Each request is identified by (self->randomID, self->sequence++). Replies are consumed in order
// from self->futureResults, and each reply's `begin` must match the version this cursor expects;
// a mismatch is handled like a timeout: all outstanding requests are dropped and a new
// randomID / sequence is started.
// Returns once a reply has been installed into the cursor, when the cursor already has a message,
// or (end_of_stream) after setting self->end to the current version.
ACTOR Future < Void > serverPeekParallelGetMore ( ILogSystem : : ServerPeekCursor * self , TaskPriority taskID ) {
2017-05-26 04:48:44 +08:00
// Without an interface, or once the cursor has reached `end`, there is nothing more to fetch:
// report an already-available message, otherwise wait forever.
if ( ! self - > interf | | self - > messageVersion > = self - > end ) {
2019-09-13 07:22:28 +08:00
if ( self - > hasMessage ( ) )
return Void ( ) ;
2018-08-11 04:57:10 +08:00
wait ( Future < Void > ( Never ( ) ) ) ;
2017-05-26 04:48:44 +08:00
throw internal_error ( ) ;
}
2017-08-10 06:58:06 +08:00
// interfaceChanged is stored on the cursor, so it is only armed here if no valid future exists yet.
if ( ! self - > interfaceChanged . isValid ( ) ) {
self - > interfaceChanged = self - > interf - > onChange ( ) ;
}
2017-05-26 04:48:44 +08:00
loop {
2018-06-26 02:15:49 +08:00
// Version the next reply must begin at; checked against res.begin below.
state Version expectedBegin = self - > messageVersion . version ;
2017-05-26 04:48:44 +08:00
try {
2019-06-19 16:30:49 +08:00
if ( self - > parallelGetMore | | self - > onlySpilled ) {
// Top up the pipeline to PARALLEL_GET_MORE_REQUESTS outstanding requests while an interface is present.
while ( self - > futureResults . size ( ) < SERVER_KNOBS - > PARALLEL_GET_MORE_REQUESTS & & self - > interf - > get ( ) . present ( ) ) {
self - > futureResults . push_back ( brokenPromiseToNever ( self - > interf - > get ( ) . interf ( ) . peekMessages . getReply ( TLogPeekRequest ( self - > messageVersion . version , self - > tag , self - > returnIfBlocked , self - > onlySpilled , std : : make_pair ( self - > randomID , self - > sequence + + ) ) , taskID ) ) ) ;
}
2019-10-23 08:04:57 +08:00
// Sequence counter hit its maximum: reuse the timed_out path below to restart with a new randomID.
if ( self - > sequence = = std : : numeric_limits < decltype ( self - > sequence ) > : : max ( ) ) {
throw timed_out ( ) ;
}
2019-10-15 09:03:12 +08:00
// Neither parallel mode nor spilled-only: no new requests are issued on this path.
// With exactly one request still queued, pick a new randomID and restart the sequence.
} else if ( self - > futureResults . size ( ) = = 1 ) {
self - > randomID = deterministicRandom ( ) - > randomUniqueID ( ) ;
self - > sequence = 0 ;
2019-06-19 16:30:49 +08:00
// Nothing queued and nothing to issue: return to the caller.
} else if ( self - > futureResults . size ( ) = = 0 ) {
return Void ( ) ;
2017-05-26 04:48:44 +08:00
}
2019-09-13 07:22:28 +08:00
// A message is already available; the queued requests stay in futureResults.
if ( self - > hasMessage ( ) )
return Void ( ) ;
2017-05-26 04:48:44 +08:00
choose {
when ( TLogPeekReply res = wait ( self - > interf - > get ( ) . present ( ) ? self - > futureResults . front ( ) : Never ( ) ) ) {
2018-06-26 02:15:49 +08:00
// A reply that does not start where we expect is handled by the timed_out branch below.
if ( res . begin . get ( ) ! = expectedBegin ) {
throw timed_out ( ) ;
}
expectedBegin = res . end ;
2017-05-26 04:48:44 +08:00
self - > futureResults . pop_front ( ) ;
self - > results = res ;
2019-05-15 08:07:49 +08:00
self - > onlySpilled = res . onlySpilled ;
2017-05-26 04:48:44 +08:00
// poppedVersion only grows, and is capped at the cursor's end version.
if ( res . popped . present ( ) )
self - > poppedVersion = std : : min ( std : : max ( self - > poppedVersion , res . popped . get ( ) ) , self - > end . version ) ;
// Point the reader at the new reply, parse its first message, then skip to where the cursor was.
self - > rd = ArenaReader ( self - > results . arena , self - > results . messages , Unversioned ( ) ) ;
LogMessageVersion skipSeq = self - > messageVersion ;
self - > hasMsg = true ;
self - > nextMessage ( ) ;
self - > advanceTo ( skipSeq ) ;
2018-06-09 02:11:08 +08:00
//TraceEvent("SPC_GetMoreB", self->randomID).detail("Has", self->hasMessage()).detail("End", res.end).detail("Popped", res.popped.present() ? res.popped.get() : 0);
2017-05-26 04:48:44 +08:00
return Void ( ) ;
}
2018-08-11 04:57:10 +08:00
// The interface changed: re-arm the change future, drop every outstanding request and
// restart with a new randomID, sequence 0 and onlySpilled cleared.
when ( wait ( self - > interfaceChanged ) ) {
2017-08-10 06:58:06 +08:00
self - > interfaceChanged = self - > interf - > onChange ( ) ;
2019-05-11 05:01:52 +08:00
self - > randomID = deterministicRandom ( ) - > randomUniqueID ( ) ;
2017-05-26 04:48:44 +08:00
self - > sequence = 0 ;
2019-05-15 08:07:49 +08:00
self - > onlySpilled = false ;
2017-05-26 04:48:44 +08:00
self - > futureResults . clear ( ) ;
}
}
} catch ( Error & e ) {
// end_of_stream: clamp the cursor's end to its current version and finish.
if ( e . code ( ) = = error_code_end_of_stream ) {
self - > end . reset ( self - > messageVersion . version ) ;
return Void ( ) ;
// timed_out (also thrown above): log, reset the request pipeline and retry the loop.
// Unlike the interface-change branch, onlySpilled is left untouched here.
} else if ( e . code ( ) = = error_code_timed_out ) {
TraceEvent ( " PeekCursorTimedOut " , self - > randomID ) ;
2017-08-10 06:58:06 +08:00
self - > interfaceChanged = self - > interf - > onChange ( ) ;
2019-05-11 05:01:52 +08:00
self - > randomID = deterministicRandom ( ) - > randomUniqueID ( ) ;
2017-05-26 04:48:44 +08:00
self - > sequence = 0 ;
self - > futureResults . clear ( ) ;
} else {
throw e ;
}
}
}
}
2019-06-25 17:47:35 +08:00
// Fetches the next peek reply for `self` with a single outstanding request.
//
// Waits forever if the cursor has no interface or has already reached `end`. Otherwise it issues
// one peek at a time against the currently published interface, re-issuing whenever the interface
// changes. Returns after a reply has been installed into the cursor, or (end_of_stream) after
// clamping self->end to the current version. Any other error propagates to the caller.
ACTOR Future<Void> serverPeekGetMore(ILogSystem::ServerPeekCursor* self, TaskPriority taskID) {
    if (!self->interf || self->messageVersion >= self->end) {
        wait(Future<Void>(Never()));
        throw internal_error();
    }

    try {
        loop {
            choose {
                when(TLogPeekReply reply = wait(
                         self->interf->get().present()
                             ? brokenPromiseToNever(self->interf->get().interf().peekMessages.getReply(
                                   TLogPeekRequest(self->messageVersion.version, self->tag, self->returnIfBlocked,
                                                   self->onlySpilled),
                                   taskID))
                             : Never())) {
                    self->results = reply;
                    self->onlySpilled = reply.onlySpilled;

                    // poppedVersion only grows, and is capped at the cursor's end version.
                    if (reply.popped.present()) {
                        self->poppedVersion =
                            std::min(std::max(self->poppedVersion, reply.popped.get()), self->end.version);
                    }

                    // Re-point the reader at the new buffer, parse its first message, then skip
                    // forward to the position the cursor held before the reply arrived.
                    self->rd = ArenaReader(self->results.arena, self->results.messages, Unversioned());
                    LogMessageVersion resumeAt = self->messageVersion;
                    self->hasMsg = true;
                    self->nextMessage();
                    self->advanceTo(resumeAt);
                    //TraceEvent("SPC_GetMoreB", self->randomID).detail("Has", self->hasMessage()).detail("End", res.end).detail("Popped", res.popped.present() ? res.popped.get() : 0);
                    return Void();
                }
                when(wait(self->interf->onChange())) {
                    // New interface: clear the spilled-only hint before the next request.
                    self->onlySpilled = false;
                }
            }
        }
    } catch (Error& e) {
        if (e.code() != error_code_end_of_stream) {
            throw e;
        }
        self->end.reset(self->messageVersion.version);
        return Void();
    }
}
2019-06-25 17:47:35 +08:00
// Returns a future that becomes ready once more data may be available on this cursor.
// A non-parallel cursor that already has a message returns immediately. Otherwise the in-flight
// fetch (if any) is reused; a new one is started only when none exists or the previous one finished.
Future<Void> ILogSystem::ServerPeekCursor::getMore(TaskPriority taskID) {
    //TraceEvent("SPC_GetMore", randomID).detail("HasMessage", hasMessage()).detail("More", !more.isValid() || more.isReady()).detail("MessageVersion", messageVersion.toString()).detail("End", end.toString());
    if (hasMessage() && !parallelGetMore) {
        return Void();
    }

    const bool fetchInFlight = more.isValid() && !more.isReady();
    if (fetchInFlight) {
        return more;
    }

    // The parallel actor is also used to drain requests that are still queued in futureResults.
    const bool useParallelActor = parallelGetMore || onlySpilled || futureResults.size();
    if (useParallelActor) {
        more = serverPeekParallelGetMore(this, taskID);
    } else {
        more = serverPeekGetMore(this, taskID);
    }
    return more;
}
// Completes when the failure monitor reports the state FailureStatus() for the peekMessages
// endpoint of the currently published interface. The wait is restarted whenever the interface
// changes; while no interface is present only the change is awaited.
ACTOR Future<Void> serverPeekOnFailed(ILogSystem::ServerPeekCursor* self) {
    loop {
        choose {
            when(wait(self->interf->get().present()
                          ? IFailureMonitor::failureMonitor().onStateEqual(
                                self->interf->get().interf().peekMessages.getEndpoint(), FailureStatus())
                          : Never())) {
                return Void();
            }
            when(wait(self->interf->onChange())) {
                // Interface changed: loop around and watch the new endpoint.
            }
        }
    }
}
// Starts a serverPeekOnFailed actor for this cursor and returns its future.
Future<Void> ILogSystem::ServerPeekCursor::onFailed() {
    Future<Void> failed = serverPeekOnFailed(this);
    return failed;
}
bool ILogSystem : : ServerPeekCursor : : isActive ( ) {
if ( ! interf - > get ( ) . present ( ) )
return false ;
if ( messageVersion > = end )
return false ;
return IFailureMonitor : : failureMonitor ( ) . getState ( interf - > get ( ) . interf ( ) . peekMessages . getEndpoint ( ) ) . isAvailable ( ) ;
}
2017-07-16 06:15:03 +08:00
bool ILogSystem : : ServerPeekCursor : : isExhausted ( ) {
return messageVersion > = end ;
}
2018-07-13 03:09:48 +08:00
// Current position of the cursor. Call only after nextMessage(): while a message is available this
// is that message's version; otherwise it is the position nextMessage()/advanceTo() left behind.
const LogMessageVersion& ILogSystem::ServerPeekCursor::version() {
    return messageVersion;
}
2017-05-26 04:48:44 +08:00
2018-06-22 06:29:46 +08:00
// minKnownCommittedVersion as stored in the reply currently held by this cursor.
Version ILogSystem::ServerPeekCursor::getMinKnownCommittedVersion() {
    return results.minKnownCommittedVersion;
}
2017-05-26 04:48:44 +08:00
// Highest popped version seen in any reply so far (tracked by the getMore actors).
Version ILogSystem::ServerPeekCursor::popped() {
    return poppedVersion;
}
2018-07-13 03:09:48 +08:00
// Wraps an existing set of cursors. There is no preferred server (bestServer == -1), every cursor
// participates in the read quorum, and no replication policy is attached.
ILogSystem::MergedPeekCursor::MergedPeekCursor(vector<Reference<ILogSystem::IPeekCursor>> const& serverCursors,
                                               Version begin)
  : serverCursors(serverCursors), bestServer(-1), readQuorum(serverCursors.size()), tag(invalidTag),
    currentCursor(0), hasNextMessage(false), messageVersion(begin),
    randomID(deterministicRandom()->randomUniqueID()), tLogReplicationFactor(0) {
    // One scratch slot per cursor for updateMessage().
    sortedVersions.resize(serverCursors.size());
}
2018-03-30 08:54:08 +08:00
// Creates one ServerPeekCursor per entry of `logServers` and merges them.
// When a replication policy is supplied, a private LogSet is built from it and the given
// localities so updateMessage(true) can evaluate the policy.
ILogSystem::MergedPeekCursor::MergedPeekCursor(
    std::vector<Reference<AsyncVar<OptionalInterface<TLogInterface>>>> const& logServers, int bestServer,
    int readQuorum, Tag tag, Version begin, Version end, bool parallelGetMore,
    std::vector<LocalityData> const& tLogLocalities, Reference<IReplicationPolicy> const tLogPolicy,
    int tLogReplicationFactor)
  : bestServer(bestServer), readQuorum(readQuorum), tag(tag), currentCursor(0), hasNextMessage(false),
    messageVersion(begin), randomID(deterministicRandom()->randomUniqueID()),
    tLogReplicationFactor(tLogReplicationFactor) {
    if (tLogPolicy) {
        logSet = Reference<LogSet>(new LogSet());
        logSet->tLogPolicy = tLogPolicy;
        logSet->tLogLocalities = tLogLocalities;
        filterLocalityDataForPolicy(logSet->tLogPolicy, &logSet->tLogLocalities);
        logSet->updateLocalitySet(logSet->tLogLocalities);
    }

    // The per-server cursors get returnIfBlocked set exactly when a best server was designated.
    const bool returnIfBlocked = (bestServer >= 0);
    for (auto const& logServer : logServers) {
        Reference<ILogSystem::ServerPeekCursor> cursor(
            new ILogSystem::ServerPeekCursor(logServer, tag, begin, end, returnIfBlocked, parallelGetMore));
        //TraceEvent("MPC_Starting", randomID).detail("Cursor", cursor->randomID).detail("End", end);
        serverCursors.push_back(cursor);
    }
    sortedVersions.resize(serverCursors.size());
}
2018-07-12 06:43:55 +08:00
// Clone constructor (used by cloneNoMore()): adopts already-cloned cursors together with the merge
// state of the source cursor, then recomputes which cursor holds the next message.
// `tag` is not part of the initializer list and is therefore default-initialized.
ILogSystem::MergedPeekCursor::MergedPeekCursor(vector<Reference<ILogSystem::IPeekCursor>> const& serverCursors,
                                               LogMessageVersion const& messageVersion, int bestServer,
                                               int readQuorum, Optional<LogMessageVersion> nextVersion,
                                               Reference<LogSet> logSet, int tLogReplicationFactor)
  : serverCursors(serverCursors), bestServer(bestServer), readQuorum(readQuorum), currentCursor(0),
    hasNextMessage(false), messageVersion(messageVersion), nextVersion(nextVersion), logSet(logSet),
    randomID(deterministicRandom()->randomUniqueID()), tLogReplicationFactor(tLogReplicationFactor) {
    sortedVersions.resize(serverCursors.size());
    calcHasMessage();
}
// Returns a new merged cursor built from clones of every underlying cursor plus this cursor's
// current merge state (position, best server, quorum, next version, log set, replication factor).
Reference<ILogSystem::IPeekCursor> ILogSystem::MergedPeekCursor::cloneNoMore() {
    vector<Reference<ILogSystem::IPeekCursor>> cursors;
    cursors.reserve(serverCursors.size()); // size is known up front; avoid regrowth
    // Iterate by const reference: copying a Reference per element only churns the refcount.
    for (auto const& cursor : serverCursors) {
        cursors.push_back(cursor->cloneNoMore());
    }
    return Reference<ILogSystem::MergedPeekCursor>(new ILogSystem::MergedPeekCursor(
        cursors, messageVersion, bestServer, readQuorum, nextVersion, logSet, tLogReplicationFactor));
}
2019-06-19 05:49:04 +08:00
void ILogSystem : : MergedPeekCursor : : setProtocolVersion ( ProtocolVersion version ) {
2017-05-26 04:48:44 +08:00
for ( auto it : serverCursors )
if ( it - > hasMessage ( ) )
it - > setProtocolVersion ( version ) ;
}
// Arena of the cursor that currently supplies the merged message.
Arena& ILogSystem::MergedPeekCursor::arena() {
    return serverCursors[currentCursor]->arena();
}
// Reader of the cursor that currently supplies the merged message.
ArenaReader* ILogSystem::MergedPeekCursor::reader() {
    return serverCursors[currentCursor]->reader();
}
void ILogSystem : : MergedPeekCursor : : calcHasMessage ( ) {
2017-06-30 06:50:19 +08:00
if ( bestServer > = 0 ) {
if ( nextVersion . present ( ) ) serverCursors [ bestServer ] - > advanceTo ( nextVersion . get ( ) ) ;
if ( serverCursors [ bestServer ] - > hasMessage ( ) ) {
messageVersion = serverCursors [ bestServer ] - > version ( ) ;
currentCursor = bestServer ;
hasNextMessage = true ;
2017-05-26 04:48:44 +08:00
2017-06-30 06:50:19 +08:00
for ( auto & c : serverCursors )
c - > advanceTo ( messageVersion ) ;
2017-05-26 04:48:44 +08:00
2017-06-30 06:50:19 +08:00
return ;
}
2017-05-26 04:48:44 +08:00
2017-06-30 06:50:19 +08:00
auto bestVersion = serverCursors [ bestServer ] - > version ( ) ;
for ( auto & c : serverCursors )
c - > advanceTo ( bestVersion ) ;
}
2017-05-26 04:48:44 +08:00
hasNextMessage = false ;
2018-03-30 06:12:38 +08:00
updateMessage ( false ) ;
2018-07-12 06:43:55 +08:00
if ( ! hasNextMessage & & logSet ) {
2018-03-30 06:12:38 +08:00
updateMessage ( true ) ;
}
2017-05-26 04:48:44 +08:00
}
2018-03-30 06:12:38 +08:00
void ILogSystem : : MergedPeekCursor : : updateMessage ( bool usePolicy ) {
2017-05-26 04:48:44 +08:00
loop {
bool advancedPast = false ;
sortedVersions . clear ( ) ;
for ( int i = 0 ; i < serverCursors . size ( ) ; i + + ) {
auto & serverCursor = serverCursors [ i ] ;
if ( nextVersion . present ( ) ) serverCursor - > advanceTo ( nextVersion . get ( ) ) ;
sortedVersions . push_back ( std : : pair < LogMessageVersion , int > ( serverCursor - > version ( ) , i ) ) ;
}
2018-03-30 06:12:38 +08:00
if ( usePolicy ) {
2018-07-12 06:43:55 +08:00
ASSERT ( logSet - > tLogPolicy ) ;
2018-03-30 06:12:38 +08:00
std : : sort ( sortedVersions . begin ( ) , sortedVersions . end ( ) ) ;
2018-07-12 06:43:55 +08:00
locations . clear ( ) ;
2018-03-30 06:12:38 +08:00
for ( auto sortedVersion : sortedVersions ) {
2018-07-12 06:43:55 +08:00
locations . push_back ( logSet - > logEntryArray [ sortedVersion . second ] ) ;
if ( locations . size ( ) > = tLogReplicationFactor & & logSet - > satisfiesPolicy ( locations ) ) {
2018-03-30 06:12:38 +08:00
messageVersion = sortedVersion . first ;
break ;
}
}
} else {
std : : nth_element ( sortedVersions . begin ( ) , sortedVersions . end ( ) - readQuorum , sortedVersions . end ( ) ) ;
messageVersion = sortedVersions [ sortedVersions . size ( ) - readQuorum ] . first ;
}
2017-05-26 04:48:44 +08:00
for ( int i = 0 ; i < serverCursors . size ( ) ; i + + ) {
auto & c = serverCursors [ i ] ;
auto start = c - > version ( ) ;
c - > advanceTo ( messageVersion ) ;
2018-03-30 06:12:38 +08:00
if ( start < = messageVersion & & messageVersion < c - > version ( ) ) {
2017-05-26 04:48:44 +08:00
advancedPast = true ;
TEST ( true ) ; //Merge peek cursor advanced past desired sequence
}
}
if ( ! advancedPast )
break ;
}
for ( int i = 0 ; i < serverCursors . size ( ) ; i + + ) {
auto & c = serverCursors [ i ] ;
ASSERT_WE_THINK ( ! c - > hasMessage ( ) | | c - > version ( ) > = messageVersion ) ; // Seems like the loop above makes this unconditionally true
if ( c - > version ( ) = = messageVersion & & c - > hasMessage ( ) ) {
hasNextMessage = true ;
currentCursor = i ;
2018-07-13 03:09:48 +08:00
break ;
2017-05-26 04:48:44 +08:00
}
}
}
bool ILogSystem : : MergedPeekCursor : : hasMessage ( ) {
return hasNextMessage ;
}
void ILogSystem : : MergedPeekCursor : : nextMessage ( ) {
nextVersion = version ( ) ;
nextVersion . get ( ) . sub + + ;
serverCursors [ currentCursor ] - > nextMessage ( ) ;
calcHasMessage ( ) ;
ASSERT ( hasMessage ( ) | | ! version ( ) . sub ) ;
}
// Payload of the current message, read from the cursor that supplies it.
StringRef ILogSystem::MergedPeekCursor::getMessage() {
    return serverCursors[currentCursor]->getMessage();
}
2018-07-13 03:09:48 +08:00
// Delegates to getMessageWithTags() of the cursor that supplies the current message.
StringRef ILogSystem::MergedPeekCursor::getMessageWithTags() {
    auto& supplier = serverCursors[currentCursor];
    return supplier->getMessageWithTags();
}
2018-03-17 02:40:21 +08:00
2019-11-06 10:07:30 +08:00
// Tags of the current message, as reported by the cursor that supplies it.
VectorRef<Tag> ILogSystem::MergedPeekCursor::getTags() {
    auto& supplier = serverCursors[currentCursor];
    return supplier->getTags();
}
2017-05-26 04:48:44 +08:00
void ILogSystem : : MergedPeekCursor : : advanceTo ( LogMessageVersion n ) {
2018-07-12 06:43:55 +08:00
bool canChange = false ;
for ( auto & c : serverCursors ) {
if ( c - > version ( ) < n ) {
canChange = true ;
c - > advanceTo ( n ) ;
}
}
if ( canChange ) {
calcHasMessage ( ) ;
}
2017-05-26 04:48:44 +08:00
}
2019-06-25 17:47:35 +08:00
// Waits until the merged cursor either has a message or has moved past `startVersion`.
//
// While the best server is designated and active, only that server is awaited (its getMore or its
// failure). Otherwise getMore is issued on every cursor without a message and the first one to
// complete wakes the loop. After each wake-up the merge state is recomputed.
ACTOR Future<Void> mergedPeekGetMore(ILogSystem::MergedPeekCursor* self, LogMessageVersion startVersion,
                                     TaskPriority taskID) {
    loop {
        //TraceEvent("MPC_GetMoreA", self->randomID).detail("Start", startVersion.toString());
        if (self->bestServer >= 0 && self->serverCursors[self->bestServer]->isActive()) {
            ASSERT(!self->serverCursors[self->bestServer]->hasMessage());
            wait(self->serverCursors[self->bestServer]->getMore(taskID) ||
                 self->serverCursors[self->bestServer]->onFailed());
        } else {
            vector<Future<Void>> pending;
            for (auto& cursor : self->serverCursors) {
                if (!cursor->hasMessage()) {
                    pending.push_back(cursor->getMore(taskID));
                }
            }
            wait(quorum(pending, 1));
        }

        self->calcHasMessage();
        //TraceEvent("MPC_GetMoreB", self->randomID).detail("HasMessage", self->hasMessage()).detail("Start", startVersion.toString()).detail("Seq", self->version().toString());
        if (self->hasMessage() || self->version() > startVersion) {
            return Void();
        }
    }
}
2019-06-25 17:47:35 +08:00
// Returns a future that becomes ready when the merged cursor has a message or has advanced.
// An unfinished previous fetch is reused. With no underlying cursors the future never fires.
// If recomputing the merge state already yields a message or progress, returns immediately.
Future<Void> ILogSystem::MergedPeekCursor::getMore(TaskPriority taskID) {
    const bool fetchInFlight = more.isValid() && !more.isReady();
    if (fetchInFlight) {
        return more;
    }

    if (!serverCursors.size()) {
        return Never();
    }

    // Copy the position before recomputing so progress can be detected afterwards.
    const LogMessageVersion startVersion = version();
    calcHasMessage();
    if (hasMessage()) {
        return Void();
    }

    if (nextVersion.present()) {
        advanceTo(nextVersion.get());
    }
    ASSERT(!hasMessage());
    if (version() > startVersion) {
        return Void();
    }

    more = mergedPeekGetMore(this, startVersion, taskID);
    return more;
}
// Not supported on a merged cursor: calling this trips ASSERT(false).
Future<Void> ILogSystem::MergedPeekCursor::onFailed() {
    ASSERT(false);
    // Only reached if the assertion does not stop execution.
    return Never();
}
bool ILogSystem : : MergedPeekCursor : : isActive ( ) {
ASSERT ( false ) ;
return false ;
}
2017-07-16 06:15:03 +08:00
bool ILogSystem : : MergedPeekCursor : : isExhausted ( ) {
2018-04-24 13:03:55 +08:00
return serverCursors [ currentCursor ] - > isExhausted ( ) ;
2017-07-16 06:15:03 +08:00
}
2018-07-13 03:09:48 +08:00
// Current merged position, as last computed by calcHasMessage()/updateMessage().
const LogMessageVersion& ILogSystem::MergedPeekCursor::version() {
    return messageVersion;
}
2017-05-26 04:48:44 +08:00
2018-06-22 06:29:46 +08:00
// minKnownCommittedVersion as reported by the cursor currently selected as `currentCursor`.
Version ILogSystem::MergedPeekCursor::getMinKnownCommittedVersion() {
    auto& selected = serverCursors[currentCursor];
    return selected->getMinKnownCommittedVersion();
}
2017-05-26 04:48:44 +08:00
// Largest popped version reported by any underlying cursor (0 when there are none).
Version ILogSystem::MergedPeekCursor::popped() {
    Version maxPopped = 0;
    for (auto& cursor : serverCursors) {
        maxPopped = std::max(maxPopped, cursor->popped());
    }
    return maxPopped;
}
2017-07-12 06:48:10 +08:00
// Creates one ServerPeekCursor (with returnIfBlocked enabled) for every log server of every log
// set. The cursor starts out preferring `bestSet` (useBestSet == true, currentSet == bestSet).
ILogSystem::SetPeekCursor::SetPeekCursor(std::vector<Reference<LogSet>> const& logSets, int bestSet,
                                         int bestServer, Tag tag, Version begin, Version end,
                                         bool parallelGetMore)
  : logSets(logSets), bestSet(bestSet), bestServer(bestServer), tag(tag), currentCursor(0),
    currentSet(bestSet), hasNextMessage(false), messageVersion(begin), useBestSet(true),
    randomID(deterministicRandom()->randomUniqueID()) {
    serverCursors.resize(logSets.size());

    int widestSet = 0;
    for (int setIdx = 0; setIdx < logSets.size(); setIdx++) {
        auto& setCursors = serverCursors[setIdx];
        for (auto const& logServer : logSets[setIdx]->logServers) {
            Reference<ILogSystem::ServerPeekCursor> cursor(
                new ILogSystem::ServerPeekCursor(logServer, tag, begin, end, true, parallelGetMore));
            setCursors.push_back(cursor);
        }
        widestSet = std::max<int>(widestSet, setCursors.size());
    }

    // Scratch space for updateMessage(), sized for the largest set.
    sortedVersions.resize(widestSet);
}
2018-04-26 09:20:28 +08:00
// Clone constructor (used by cloneNoMore()): adopts already-cloned per-set cursors and the source
// cursor's selection state, then recomputes which cursor holds the next message.
// In the body, `logSets` and `serverCursors` denote the constructor parameters.
ILogSystem::SetPeekCursor::SetPeekCursor(std::vector<Reference<LogSet>> const& logSets,
                                         std::vector<std::vector<Reference<IPeekCursor>>> const& serverCursors,
                                         LogMessageVersion const& messageVersion, int bestSet, int bestServer,
                                         Optional<LogMessageVersion> nextVersion, bool useBestSet)
  : logSets(logSets), serverCursors(serverCursors), messageVersion(messageVersion), bestSet(bestSet),
    bestServer(bestServer), nextVersion(nextVersion), currentSet(bestSet), currentCursor(0),
    hasNextMessage(false), useBestSet(useBestSet), randomID(deterministicRandom()->randomUniqueID()) {
    int widestSet = 0;
    for (int setIdx = 0; setIdx < logSets.size(); setIdx++) {
        widestSet = std::max<int>(widestSet, serverCursors[setIdx].size());
    }
    sortedVersions.resize(widestSet);

    calcHasMessage();
}
2017-07-11 08:41:32 +08:00
// Returns a new SetPeekCursor built from clones of the per-set cursors plus this cursor's current
// selection state. The number of cursors cloned per set follows that set's logServers count.
Reference<ILogSystem::IPeekCursor> ILogSystem::SetPeekCursor::cloneNoMore() {
    vector<vector<Reference<ILogSystem::IPeekCursor>>> cursors;
    cursors.resize(logSets.size());

    for (int setIdx = 0; setIdx < logSets.size(); setIdx++) {
        auto& clonedSet = cursors[setIdx];
        for (int serverIdx = 0; serverIdx < logSets[setIdx]->logServers.size(); serverIdx++) {
            clonedSet.push_back(serverCursors[setIdx][serverIdx]->cloneNoMore());
        }
    }

    auto* copy = new ILogSystem::SetPeekCursor(logSets, cursors, messageVersion, bestSet, bestServer,
                                               nextVersion, useBestSet);
    return Reference<ILogSystem::SetPeekCursor>(copy);
}
2019-06-19 05:49:04 +08:00
void ILogSystem : : SetPeekCursor : : setProtocolVersion ( ProtocolVersion version ) {
2017-07-11 08:41:32 +08:00
for ( auto & cursors : serverCursors ) {
for ( auto & it : cursors ) {
if ( it - > hasMessage ( ) ) {
it - > setProtocolVersion ( version ) ;
}
}
}
}
// Arena of the cursor selected by (currentSet, currentCursor).
Arena& ILogSystem::SetPeekCursor::arena() {
    return serverCursors[currentSet][currentCursor]->arena();
}
// Reader of the cursor selected by (currentSet, currentCursor).
ArenaReader* ILogSystem::SetPeekCursor::reader() {
    return serverCursors[currentSet][currentCursor]->reader();
}
void ILogSystem : : SetPeekCursor : : calcHasMessage ( ) {
2017-09-08 06:32:08 +08:00
if ( bestSet > = 0 & & bestServer > = 0 ) {
if ( nextVersion . present ( ) ) {
2018-06-09 02:11:08 +08:00
//TraceEvent("LPC_CalcNext").detail("Ver", messageVersion.toString()).detail("Tag", tag.toString()).detail("HasNextMessage", hasNextMessage).detail("NextVersion", nextVersion.get().toString());
2017-09-08 06:32:08 +08:00
serverCursors [ bestSet ] [ bestServer ] - > advanceTo ( nextVersion . get ( ) ) ;
}
if ( serverCursors [ bestSet ] [ bestServer ] - > hasMessage ( ) ) {
messageVersion = serverCursors [ bestSet ] [ bestServer ] - > version ( ) ;
currentSet = bestSet ;
currentCursor = bestServer ;
hasNextMessage = true ;
2017-07-11 08:41:32 +08:00
2018-06-09 02:11:08 +08:00
//TraceEvent("LPC_Calc1").detail("Ver", messageVersion.toString()).detail("Tag", tag.toString()).detail("HasNextMessage", hasNextMessage);
2017-09-08 06:32:08 +08:00
for ( auto & cursors : serverCursors ) {
for ( auto & c : cursors ) {
c - > advanceTo ( messageVersion ) ;
}
2017-07-11 08:41:32 +08:00
}
2017-09-08 06:32:08 +08:00
return ;
}
2017-07-11 08:41:32 +08:00
2017-09-08 06:32:08 +08:00
auto bestVersion = serverCursors [ bestSet ] [ bestServer ] - > version ( ) ;
for ( auto & cursors : serverCursors ) {
for ( auto & c : cursors ) {
c - > advanceTo ( bestVersion ) ;
}
2017-07-11 08:41:32 +08:00
}
}
2017-07-16 06:15:03 +08:00
hasNextMessage = false ;
2017-07-11 08:41:32 +08:00
if ( useBestSet ) {
updateMessage ( bestSet , false ) ; // Use Quorum logic
2018-06-09 02:11:08 +08:00
//TraceEvent("LPC_Calc2").detail("Ver", messageVersion.toString()).detail("Tag", tag.toString()).detail("HasNextMessage", hasNextMessage);
2017-07-11 08:41:32 +08:00
if ( ! hasNextMessage ) {
updateMessage ( bestSet , true ) ;
2018-06-09 02:11:08 +08:00
//TraceEvent("LPC_Calc3").detail("Ver", messageVersion.toString()).detail("Tag", tag.toString()).detail("HasNextMessage", hasNextMessage);
2017-07-11 08:41:32 +08:00
}
} else {
for ( int i = 0 ; i < logSets . size ( ) & & ! hasNextMessage ; i + + ) {
if ( i ! = bestSet ) {
updateMessage ( i , false ) ; // Use Quorum logic
}
}
2018-06-09 02:11:08 +08:00
//TraceEvent("LPC_Calc4").detail("Ver", messageVersion.toString()).detail("Tag", tag.toString()).detail("HasNextMessage", hasNextMessage);
2017-07-11 08:41:32 +08:00
for ( int i = 0 ; i < logSets . size ( ) & & ! hasNextMessage ; i + + ) {
if ( i ! = bestSet ) {
updateMessage ( i , true ) ;
}
}
2018-06-09 02:11:08 +08:00
//TraceEvent("LPC_Calc5").detail("Ver", messageVersion.toString()).detail("Tag", tag.toString()).detail("HasNextMessage", hasNextMessage);
2017-07-11 08:41:32 +08:00
}
}
void ILogSystem : : SetPeekCursor : : updateMessage ( int logIdx , bool usePolicy ) {
loop {
bool advancedPast = false ;
sortedVersions . clear ( ) ;
for ( int i = 0 ; i < serverCursors [ logIdx ] . size ( ) ; i + + ) {
2017-07-14 03:29:21 +08:00
auto & serverCursor = serverCursors [ logIdx ] [ i ] ;
if ( nextVersion . present ( ) ) serverCursor - > advanceTo ( nextVersion . get ( ) ) ;
sortedVersions . push_back ( std : : pair < LogMessageVersion , int > ( serverCursor - > version ( ) , i ) ) ;
2018-06-09 02:11:08 +08:00
//TraceEvent("LPC_Update1").detail("Ver", messageVersion.toString()).detail("Tag", tag.toString()).detail("HasNextMessage", hasNextMessage).detail("ServerVer", serverCursor->version().toString()).detail("I", i);
2017-07-11 08:41:32 +08:00
}
if ( usePolicy ) {
std : : sort ( sortedVersions . begin ( ) , sortedVersions . end ( ) ) ;
2018-07-12 06:43:55 +08:00
locations . clear ( ) ;
2017-07-11 08:41:32 +08:00
for ( auto sortedVersion : sortedVersions ) {
2018-07-12 06:43:55 +08:00
locations . push_back ( logSets [ logIdx ] - > logEntryArray [ sortedVersion . second ] ) ;
if ( locations . size ( ) > = logSets [ logIdx ] - > tLogReplicationFactor & & logSets [ logIdx ] - > satisfiesPolicy ( locations ) ) {
2017-07-11 08:41:32 +08:00
messageVersion = sortedVersion . first ;
break ;
}
}
} else {
//(int)oldLogData[i].logServers.size() + 1 - oldLogData[i].tLogReplicationFactor
2017-07-12 06:48:10 +08:00
std : : nth_element ( sortedVersions . begin ( ) , sortedVersions . end ( ) - ( logSets [ logIdx ] - > logServers . size ( ) + 1 - logSets [ logIdx ] - > tLogReplicationFactor ) , sortedVersions . end ( ) ) ;
messageVersion = sortedVersions [ sortedVersions . size ( ) - ( logSets [ logIdx ] - > logServers . size ( ) + 1 - logSets [ logIdx ] - > tLogReplicationFactor ) ] . first ;
2017-07-11 08:41:32 +08:00
}
2017-09-08 06:32:08 +08:00
for ( auto & cursors : serverCursors ) {
for ( auto & c : cursors ) {
auto start = c - > version ( ) ;
c - > advanceTo ( messageVersion ) ;
2018-03-30 06:12:38 +08:00
if ( start < = messageVersion & & messageVersion < c - > version ( ) ) {
2017-09-08 06:32:08 +08:00
advancedPast = true ;
TEST ( true ) ; //Merge peek cursor advanced past desired sequence
}
2017-07-11 08:41:32 +08:00
}
}
if ( ! advancedPast )
break ;
}
for ( int i = 0 ; i < serverCursors [ logIdx ] . size ( ) ; i + + ) {
auto & c = serverCursors [ logIdx ] [ i ] ;
ASSERT_WE_THINK ( ! c - > hasMessage ( ) | | c - > version ( ) > = messageVersion ) ; // Seems like the loop above makes this unconditionally true
if ( c - > version ( ) = = messageVersion & & c - > hasMessage ( ) ) {
hasNextMessage = true ;
currentSet = logIdx ;
currentCursor = i ;
break ;
}
}
}
bool ILogSystem : : SetPeekCursor : : hasMessage ( ) {
return hasNextMessage ;
}
void ILogSystem : : SetPeekCursor : : nextMessage ( ) {
nextVersion = version ( ) ;
nextVersion . get ( ) . sub + + ;
serverCursors [ currentSet ] [ currentCursor ] - > nextMessage ( ) ;
calcHasMessage ( ) ;
ASSERT ( hasMessage ( ) | | ! version ( ) . sub ) ;
}
// Returns the message of the currently selected server cursor.
StringRef ILogSystem::SetPeekCursor::getMessage() {
	auto& current = serverCursors[currentSet][currentCursor];
	return current->getMessage();
}
2018-03-17 02:40:21 +08:00
// Returns the message, including its tags, of the currently selected server cursor.
StringRef ILogSystem::SetPeekCursor::getMessageWithTags() {
	auto& current = serverCursors[currentSet][currentCursor];
	return current->getMessageWithTags();
}
2019-11-06 10:07:30 +08:00
// Returns the tags reported by the currently selected server cursor.
VectorRef<Tag> ILogSystem::SetPeekCursor::getTags() {
	auto& current = serverCursors[currentSet][currentCursor];
	return current->getTags();
}
void ILogSystem : : SetPeekCursor : : advanceTo ( LogMessageVersion n ) {
2018-07-12 06:43:55 +08:00
bool canChange = false ;
2017-07-11 08:41:32 +08:00
for ( auto & cursors : serverCursors ) {
for ( auto & c : cursors ) {
2018-07-12 06:43:55 +08:00
if ( c - > version ( ) < n ) {
canChange = true ;
c - > advanceTo ( n ) ;
}
2017-07-11 08:41:32 +08:00
}
}
2018-07-12 06:43:55 +08:00
if ( canChange ) {
calcHasMessage ( ) ;
}
2017-07-11 08:41:32 +08:00
}
2019-06-25 17:47:35 +08:00
// Actor behind SetPeekCursor::getMore(). Loops until the cursor has a message or its
// version has moved past `startVersion`.
//
// Each iteration picks one of three ways to wait for data:
//  1. The best server of the best set is active: wait on its getMore() or onFailed().
//  2. Otherwise, if the inactive best-set cursors at or below messageVersion are too few or
//     do not satisfy the policy (or there is only one log set): wait for any best-set cursor
//     without a message.
//  3. Otherwise: wait for any cursor of any set without a message and stop using the best set.
// NOTE: no local is held across a wait() here; only `state` variables survive a wait in an actor.
ACTOR Future<Void> setPeekGetMore(ILogSystem::SetPeekCursor* self, LogMessageVersion startVersion, TaskPriority taskID) {
	loop {
		//TraceEvent("LPC_GetMore1", self->randomID).detail("Start", startVersion.toString()).detail("Tag", self->tag);
		if (self->bestServer >= 0 && self->bestSet >= 0 && self->serverCursors[self->bestSet][self->bestServer]->isActive()) {
			ASSERT(!self->serverCursors[self->bestSet][self->bestServer]->hasMessage());
			//TraceEvent("LPC_GetMore2", self->randomID).detail("Start", startVersion.toString()).detail("Tag", self->tag);
			wait(self->serverCursors[self->bestSet][self->bestServer]->getMore(taskID) || self->serverCursors[self->bestSet][self->bestServer]->onFailed());
			self->useBestSet = true;
		} else {
			//FIXME: if best set is exhausted, do not peek remote servers
			bool bestSetUsable = self->bestSet >= 0;
			if (bestSetUsable) {
				// Collect the locations of inactive best-set cursors that are not ahead of messageVersion.
				self->locations.clear();
				for (int idx = 0; idx < self->serverCursors[self->bestSet].size(); idx++) {
					if (!self->serverCursors[self->bestSet][idx]->isActive() && self->serverCursors[self->bestSet][idx]->version() <= self->messageVersion) {
						self->locations.push_back(self->logSets[self->bestSet]->logEntryArray[idx]);
					}
				}
				bestSetUsable = self->locations.size() < self->logSets[self->bestSet]->tLogReplicationFactor || !self->logSets[self->bestSet]->satisfiesPolicy(self->locations);
			}

			if (!bestSetUsable && self->logSets.size() != 1) {
				//FIXME: this will peeking way too many cursors when satellites exist, and does not need to peek bestSet cursors since we cannot get anymore data from them
				vector<Future<Void>> pending;
				//TraceEvent("LPC_GetMore4", self->randomID).detail("Start", startVersion.toString()).detail("Tag", self->tag);
				for (auto& group : self->serverCursors) {
					for (auto& cursor : group) {
						if (!cursor->hasMessage()) {
							pending.push_back(cursor->getMore(taskID));
						}
					}
				}
				wait(quorum(pending, 1));
				self->useBestSet = false;
			} else {
				if (!self->useBestSet) {
					self->useBestSet = true;
					self->calcHasMessage();
					if (self->hasMessage() || self->version() > startVersion) {
						return Void();
					}
				}

				//TraceEvent("LPC_GetMore3", self->randomID).detail("Start", startVersion.toString()).detail("Tag", self->tag.toString()).detail("BestSetSize", self->serverCursors[self->bestSet].size());
				vector<Future<Void>> pending;
				for (auto& cursor : self->serverCursors[self->bestSet]) {
					if (!cursor->hasMessage()) {
						pending.push_back(cursor->getMore(taskID));
						if (cursor->isActive()) {
							pending.push_back(cursor->onFailed());
						}
					}
				}
				wait(quorum(pending, 1));
			}
		}

		self->calcHasMessage();
		//TraceEvent("LPC_GetMoreB", self->randomID).detail("HasMessage", self->hasMessage()).detail("Start", startVersion.toString()).detail("Seq", self->version().toString());
		if (self->hasMessage() || self->version() > startVersion) {
			return Void();
		}
	}
}
2019-06-25 17:47:35 +08:00
// Returns a future for more data on this cursor.
// - If a previous request (`more`) is still outstanding, that same future is returned.
// - If a message is already available after recalculation, or advancing to nextVersion moved
//   version() past its value on entry, returns an already-ready future.
// - Otherwise starts setPeekGetMore() and remembers its future in `more`.
Future<Void> ILogSystem::SetPeekCursor::getMore(TaskPriority taskID) {
	const bool requestOutstanding = more.isValid() && !more.isReady();
	if (requestOutstanding) {
		return more;
	}

	const LogMessageVersion startVersion = version();
	calcHasMessage();
	if (hasMessage()) {
		return Void();
	}

	if (nextVersion.present()) {
		advanceTo(nextVersion.get());
	}
	ASSERT(!hasMessage());
	if (version() > startVersion) {
		return Void();
	}

	more = setPeekGetMore(this, startVersion, taskID);
	return more;
}
// Not supported on SetPeekCursor: always trips ASSERT(false).
Future<Void> ILogSystem::SetPeekCursor::onFailed() {
	ASSERT(false);
	return Never();
}
bool ILogSystem : : SetPeekCursor : : isActive ( ) {
ASSERT ( false ) ;
return false ;
}
2017-07-16 06:15:03 +08:00
bool ILogSystem : : SetPeekCursor : : isExhausted ( ) {
2018-04-24 13:03:55 +08:00
return serverCursors [ currentSet ] [ currentCursor ] - > isExhausted ( ) ;
2017-07-16 06:15:03 +08:00
}
2018-07-13 03:09:48 +08:00
// Returns a reference to this cursor's messageVersion member.
const LogMessageVersion& ILogSystem::SetPeekCursor::version() {
	return messageVersion;
}
2017-07-11 08:41:32 +08:00
2018-06-22 06:29:46 +08:00
// Returns the min known committed version reported by the currently selected server cursor.
Version ILogSystem::SetPeekCursor::getMinKnownCommittedVersion() {
	auto& current = serverCursors[currentSet][currentCursor];
	return current->getMinKnownCommittedVersion();
}
2017-07-11 08:41:32 +08:00
// Returns the largest popped() value across all server cursors of all sets (0 if there are none).
Version ILogSystem::SetPeekCursor::popped() {
	Version highest = 0;
	for (auto& group : serverCursors) {
		for (auto& cursor : group) {
			const Version candidate = cursor->popped();
			if (highest < candidate) {
				highest = candidate;
			}
		}
	}
	return highest;
}
2019-07-30 12:19:47 +08:00
// Builds a cursor over a stack of cursors with matching epochEnds; all delegating methods
// operate on cursors.back(). On construction, getMore() is called on up to
// MULTI_CURSOR_PRE_FETCH_LIMIT cursors, starting from the back, and the returned futures
// are discarded.
ILogSystem::MultiCursor::MultiCursor(std::vector<Reference<IPeekCursor>> cursors, std::vector<LogMessageVersion> epochEnds)
  : cursors(cursors), epochEnds(epochEnds), poppedVersion(0) {
	const int prefetchCount = std::min<int>(cursors.size(), SERVER_KNOBS->MULTI_CURSOR_PRE_FETCH_LIMIT);
	auto it = cursors.rbegin();
	for (int fetched = 0; fetched < prefetchCount; ++fetched, ++it) {
		(*it)->getMore();
	}
}
2017-05-26 04:48:44 +08:00
// Delegates to cloneNoMore() of the last cursor in the stack.
Reference<ILogSystem::IPeekCursor> ILogSystem::MultiCursor::cloneNoMore() {
	auto& active = cursors.back();
	return active->cloneNoMore();
}
2019-06-19 05:49:04 +08:00
void ILogSystem : : MultiCursor : : setProtocolVersion ( ProtocolVersion version ) {
2017-05-26 04:48:44 +08:00
cursors . back ( ) - > setProtocolVersion ( version ) ;
}
// Returns the arena of the last cursor in the stack.
Arena& ILogSystem::MultiCursor::arena() {
	auto& active = cursors.back();
	return active->arena();
}
// Returns the reader of the last cursor in the stack.
ArenaReader* ILogSystem::MultiCursor::reader() {
	auto& active = cursors.back();
	return active->reader();
}
bool ILogSystem : : MultiCursor : : hasMessage ( ) {
return cursors . back ( ) - > hasMessage ( ) ;
}
void ILogSystem : : MultiCursor : : nextMessage ( ) {
cursors . back ( ) - > nextMessage ( ) ;
}
// Returns the message of the last cursor in the stack.
StringRef ILogSystem::MultiCursor::getMessage() {
	auto& active = cursors.back();
	return active->getMessage();
}
2018-03-17 02:40:21 +08:00
// Returns the message, including its tags, of the last cursor in the stack.
StringRef ILogSystem::MultiCursor::getMessageWithTags() {
	auto& active = cursors.back();
	return active->getMessageWithTags();
}
2019-11-06 10:07:30 +08:00
// Returns the tags reported by the last cursor in the stack.
VectorRef<Tag> ILogSystem::MultiCursor::getTags() {
	auto& active = cursors.back();
	return active->getTags();
}
2017-05-26 04:48:44 +08:00
void ILogSystem : : MultiCursor : : advanceTo ( LogMessageVersion n ) {
while ( cursors . size ( ) > 1 & & n > = epochEnds . back ( ) ) {
2019-07-30 12:19:47 +08:00
poppedVersion = std : : max ( poppedVersion , cursors . back ( ) - > popped ( ) ) ;
2017-05-26 04:48:44 +08:00
cursors . pop_back ( ) ;
epochEnds . pop_back ( ) ;
}
cursors . back ( ) - > advanceTo ( n ) ;
}
2019-06-25 17:47:35 +08:00
// Drops cursors from the back of the stack whose version has reached the matching epoch end
// (always keeping at least one), folding their popped() values into poppedVersion. If the
// new last cursor is already past the version seen on entry, returns a ready future;
// otherwise delegates to that cursor's getMore().
Future<Void> ILogSystem::MultiCursor::getMore(TaskPriority taskID) {
	const LogMessageVersion startVersion = cursors.back()->version();

	for (;;) {
		const bool canDrop = cursors.size() > 1 && cursors.back()->version() >= epochEnds.back();
		if (!canDrop) {
			break;
		}
		poppedVersion = std::max(poppedVersion, cursors.back()->popped());
		cursors.pop_back();
		epochEnds.pop_back();
	}

	auto& active = cursors.back();
	if (active->version() > startVersion) {
		return Void();
	}
	return active->getMore(taskID);
}
// Returns the onFailed() future of the last cursor in the stack.
Future<Void> ILogSystem::MultiCursor::onFailed() {
	auto& active = cursors.back();
	return active->onFailed();
}
bool ILogSystem : : MultiCursor : : isActive ( ) {
return cursors . back ( ) - > isActive ( ) ;
}
2017-07-16 06:15:03 +08:00
bool ILogSystem : : MultiCursor : : isExhausted ( ) {
2018-04-24 13:03:55 +08:00
return cursors . back ( ) - > isExhausted ( ) ;
2017-07-16 06:15:03 +08:00
}
2018-07-13 03:09:48 +08:00
// Returns the version of the last cursor in the stack.
const LogMessageVersion& ILogSystem::MultiCursor::version() {
	auto& active = cursors.back();
	return active->version();
}
2018-06-22 06:29:46 +08:00
// Returns the min known committed version of the last cursor in the stack.
Version ILogSystem::MultiCursor::getMinKnownCommittedVersion() {
	auto& active = cursors.back();
	return active->getMinKnownCommittedVersion();
}
2017-05-26 04:48:44 +08:00
// Returns the larger of the accumulated poppedVersion (from cursors already dropped) and
// the popped() value of the last cursor in the stack.
Version ILogSystem::MultiCursor::popped() {
	const Version activePopped = cursors.back()->popped();
	return std::max(poppedVersion, activePopped);
}
2018-07-13 03:09:48 +08:00
2019-11-06 10:07:30 +08:00
// Builds a buffered cursor over an existing list of peek cursors.
//
// cursors:          underlying cursors; one message queue (cursorMessages entry) is kept per cursor.
// begin / end:      initial messageVersion and the end version used by the loaders.
// withTags:         selects getMessageWithTags()/getTags() over getMessage() (see the ASSERTs there).
// collectTags:      when set, messages with equal versions are merged by combineMessages().
// canDiscardPopped: enables the popped-data discard path in bufferedGetMore().
//
// targetQueueSize is DESIRED_OUTSTANDING_MESSAGES split evenly across the cursors. The divisor
// is clamped to at least 1 so that an empty cursor list does not cause an integer division by
// zero (undefined behaviour).
ILogSystem::BufferedCursor::BufferedCursor(std::vector<Reference<IPeekCursor>> cursors, Version begin, Version end, bool withTags, bool collectTags, bool canDiscardPopped)
  : cursors(cursors), messageVersion(begin), end(end), withTags(withTags), collectTags(collectTags), hasNextMessage(false), messageIndex(0), poppedVersion(0), initialPoppedVersion(0), canDiscardPopped(canDiscardPopped), knownUnique(false), minKnownCommittedVersion(0), randomID(deterministicRandom()->randomUniqueID()) {
	const int queueCount = std::max<int>(1, cursors.size());
	targetQueueSize = SERVER_KNOBS->DESIRED_OUTSTANDING_MESSAGES / queueCount;
	messages.reserve(SERVER_KNOBS->DESIRED_OUTSTANDING_MESSAGES);
	cursorMessages.resize(cursors.size());
}
2019-11-06 10:07:30 +08:00
// Builds a buffered cursor that creates one ServerPeekCursor per entry of `logServers`, all
// peeking `tag` over [begin, end) with returnIfBlocked = false and the given parallelGetMore.
// This variant always uses withTags = true, collectTags = false, canDiscardPopped = false and
// knownUnique = true.
//
// targetQueueSize is DESIRED_OUTSTANDING_MESSAGES split evenly across the log servers. The
// divisor is clamped to at least 1 so that an empty `logServers` does not cause an integer
// division by zero (undefined behaviour).
ILogSystem::BufferedCursor::BufferedCursor(std::vector<Reference<AsyncVar<OptionalInterface<TLogInterface>>>> const& logServers, Tag tag, Version begin, Version end, bool parallelGetMore)
  : messageVersion(begin), end(end), withTags(true), collectTags(false), hasNextMessage(false), messageIndex(0), poppedVersion(0), initialPoppedVersion(0), canDiscardPopped(false), knownUnique(true), minKnownCommittedVersion(0), randomID(deterministicRandom()->randomUniqueID()) {
	const int queueCount = std::max<int>(1, logServers.size());
	targetQueueSize = SERVER_KNOBS->DESIRED_OUTSTANDING_MESSAGES / queueCount;
	messages.reserve(SERVER_KNOBS->DESIRED_OUTSTANDING_MESSAGES);
	cursorMessages.resize(logServers.size());

	cursors.reserve(logServers.size());
	for (int i = 0; i < logServers.size(); i++) {
		Reference<ILogSystem::ServerPeekCursor> cursor(new ILogSystem::ServerPeekCursor(logServers[i], tag, begin, end, false, parallelGetMore));
		cursors.push_back(cursor);
	}
}
void ILogSystem : : BufferedCursor : : combineMessages ( ) {
if ( ! hasNextMessage ) {
return ;
}
2019-11-06 10:32:15 +08:00
std : : vector < Tag > tags ;
2019-11-06 10:22:25 +08:00
tags . push_back ( messages [ messageIndex ] . tags [ 0 ] ) ;
2018-07-13 03:09:48 +08:00
for ( int i = messageIndex + 1 ; i < messages . size ( ) & & messages [ messageIndex ] . version = = messages [ i ] . version ; i + + ) {
2019-11-06 10:22:25 +08:00
tags . push_back ( messages [ i ] . tags [ 0 ] ) ;
2018-07-13 03:09:48 +08:00
messageIndex = i ;
}
auto & msg = messages [ messageIndex ] ;
BinaryWriter messageWriter ( Unversioned ( ) ) ;
2019-11-06 10:22:25 +08:00
messageWriter < < uint32_t ( msg . message . size ( ) + sizeof ( uint32_t ) + sizeof ( uint16_t ) + tags . size ( ) * sizeof ( Tag ) ) < < msg . version . sub < < uint16_t ( tags . size ( ) ) ;
for ( auto t : tags ) {
2018-07-13 03:09:48 +08:00
messageWriter < < t ;
}
messageWriter . serializeBytes ( msg . message ) ;
2019-03-29 02:52:50 +08:00
Standalone < StringRef > val = messageWriter . toValue ( ) ;
msg . arena = val . arena ( ) ;
msg . message = val ;
2019-11-06 10:44:30 +08:00
msg . tags = VectorRef < Tag > ( ) ;
for ( auto t : tags ) {
msg . tags . push_back ( msg . arena , t ) ;
}
2018-07-13 03:09:48 +08:00
}
// Not supported on BufferedCursor: always trips ASSERT(false).
Reference<ILogSystem::IPeekCursor> ILogSystem::BufferedCursor::cloneNoMore() {
	ASSERT(false);
	return Reference<ILogSystem::IPeekCursor>();
}
2019-06-19 05:49:04 +08:00
void ILogSystem : : BufferedCursor : : setProtocolVersion ( ProtocolVersion version ) {
2018-07-13 03:09:48 +08:00
for ( auto & c : cursors ) {
c - > setProtocolVersion ( version ) ;
}
}
// Returns the arena of the buffered message at messageIndex.
Arena& ILogSystem::BufferedCursor::arena() {
	auto& current = messages[messageIndex];
	return current.arena;
}
// Not supported on BufferedCursor: always trips ASSERT(false).
ArenaReader* ILogSystem::BufferedCursor::reader() {
	ASSERT(false);
	return cursors[0]->reader();
}
bool ILogSystem : : BufferedCursor : : hasMessage ( ) {
return hasNextMessage ;
}
void ILogSystem : : BufferedCursor : : nextMessage ( ) {
messageIndex + + ;
if ( messageIndex = = messages . size ( ) ) {
hasNextMessage = false ;
}
if ( collectTags ) {
combineMessages ( ) ;
}
}
// Returns the current buffered message. Only valid when the cursor was built with withTags == false.
StringRef ILogSystem::BufferedCursor::getMessage() {
	ASSERT(!withTags);
	auto& current = messages[messageIndex];
	return current.message;
}
// Returns the current buffered message. Only valid when the cursor was built with withTags == true.
StringRef ILogSystem::BufferedCursor::getMessageWithTags() {
	ASSERT(withTags);
	auto& current = messages[messageIndex];
	return current.message;
}
2019-11-06 10:07:30 +08:00
// Returns the tags of the current buffered message. Only valid when withTags == true.
VectorRef<Tag> ILogSystem::BufferedCursor::getTags() {
	ASSERT(withTags);
	auto& current = messages[messageIndex];
	return current.tags;
}
void ILogSystem : : BufferedCursor : : advanceTo ( LogMessageVersion n ) {
ASSERT ( false ) ;
}
2019-11-05 11:47:45 +08:00
// Actor that drains one underlying cursor into self->cursorMessages[idx].
//
// Each round yields, stops if the cursor has reached self->end or the queue already holds
// more than targetQueueSize messages, then waits for more data. After each getMore() it
// folds the cursor's popped() and getMinKnownCommittedVersion() into the buffered cursor
// (and popped() into initialPoppedVersion while canDiscardPopped is set), stops if the
// cursor has reached self->end, and otherwise appends every available message to the queue.
ACTOR Future<Void> bufferedGetMoreLoader(ILogSystem::BufferedCursor* self, Reference<ILogSystem::IPeekCursor> cursor, int idx, TaskPriority taskID) {
	loop {
		wait(yield());
		if (cursor->version().version >= self->end || self->cursorMessages[idx].size() > self->targetQueueSize) {
			return Void();
		}

		wait(cursor->getMore(taskID));
		self->poppedVersion = std::max(self->poppedVersion, cursor->popped());
		self->minKnownCommittedVersion = std::max(self->minKnownCommittedVersion, cursor->getMinKnownCommittedVersion());
		if (self->canDiscardPopped) {
			self->initialPoppedVersion = std::max(self->initialPoppedVersion, cursor->popped());
		}
		if (cursor->version().version >= self->end) {
			return Void();
		}

		while (cursor->hasMessage()) {
			// Tags are stripped from the payload when tags are unwanted or will be re-collected later.
			StringRef payload = (!self->withTags || self->collectTags) ? cursor->getMessage() : cursor->getMessageWithTags();
			VectorRef<Tag> messageTags = !self->withTags ? VectorRef<Tag>() : cursor->getTags();
			self->cursorMessages[idx].push_back(ILogSystem::BufferedCursor::BufferedMessage(cursor->arena(), payload, messageTags, cursor->version()));
			cursor->nextMessage();
		}
	}
}
2019-06-25 17:47:35 +08:00
// Actor behind BufferedCursor::getMore(): refills self->messages from the per-cursor queues.
//
// Steps, in order:
//  1. If messageVersion has already reached self->end, wait forever (never completes).
//  2. Clear the message buffer and start one bufferedGetMoreLoader per underlying cursor.
//  3. Wait (re-checking every DESIRED_GET_MORE_DELAY, or when all loaders finish) until the
//     minimum version across all cursors, capped at self->end, exceeds messageVersion.
//  4. Move every queued message with version below that minimum into self->messages, then
//     sort (collectTags or knownUnique) or uniquify the buffer.
//  5. Reset the read position, optionally combine equal-version messages, and handle the
//     discard-popped path.
ACTOR Future < Void > bufferedGetMore ( ILogSystem : : BufferedCursor * self , TaskPriority taskID ) {
2018-07-13 03:09:48 +08:00
// Nothing more can be produced once messageVersion has reached end; block forever.
if ( self - > messageVersion . version > = self - > end ) {
2018-08-11 04:57:10 +08:00
wait ( Future < Void > ( Never ( ) ) ) ;
2018-07-13 03:09:48 +08:00
throw internal_error ( ) ;
}
self - > messages . clear ( ) ;
std : : vector < Future < Void > > loaders ;
loaders . reserve ( self - > cursors . size ( ) ) ;
2019-11-05 11:47:45 +08:00
// One loader actor per underlying cursor; loader i fills cursorMessages[i].
for ( int i = 0 ; i < self - > cursors . size ( ) ; i + + ) {
loaders . push_back ( bufferedGetMoreLoader ( self , self - > cursors [ i ] , i , taskID ) ) ;
2018-07-13 03:09:48 +08:00
}
2019-11-05 11:47:45 +08:00
// Declared `state` because both are used after the wait() calls below.
state Future < Void > allLoaders = waitForAll ( loaders ) ;
state Version minVersion ;
loop {
2019-11-05 12:21:38 +08:00
wait ( allLoaders | | delay ( SERVER_KNOBS - > DESIRED_GET_MORE_DELAY , taskID ) ) ;
2019-11-05 11:47:45 +08:00
// minVersion = min(end, version of every underlying cursor).
minVersion = self - > end ;
2019-11-06 10:07:30 +08:00
for ( auto cursor : self - > cursors ) {
2019-11-05 11:47:45 +08:00
minVersion = std : : min ( minVersion , cursor - > version ( ) . version ) ;
}
// Progress was made past the current messageVersion: stop waiting.
if ( minVersion > self - > messageVersion . version ) {
break ;
}
// All loaders finished without any progress: block forever.
if ( allLoaders . isReady ( ) ) {
wait ( Future < Void > ( Never ( ) ) ) ;
}
}
wait ( yield ( ) ) ;
// Drain from the front of each per-cursor queue everything strictly below minVersion.
for ( auto & it : self - > cursorMessages ) {
while ( ! it . empty ( ) & & it . front ( ) . version . version < minVersion ) {
self - > messages . push_back ( it . front ( ) ) ;
it . pop_front ( ) ;
}
}
if ( self - > collectTags | | self - > knownUnique ) {
2018-07-13 03:09:48 +08:00
std : : sort ( self - > messages . begin ( ) , self - > messages . end ( ) ) ;
} else {
uniquify ( self - > messages ) ;
}
2019-11-05 11:47:45 +08:00
// Reset the read position to the start of the freshly filled buffer.
self - > messageVersion = LogMessageVersion ( minVersion ) ;
2018-07-13 03:09:48 +08:00
self - > messageIndex = 0 ;
self - > hasNextMessage = self - > messages . size ( ) > 0 ;
2019-11-05 11:47:45 +08:00
2018-07-13 03:09:48 +08:00
if ( self - > collectTags ) {
self - > combineMessages ( ) ;
}
2018-07-13 08:47:35 +08:00
2018-08-11 04:57:10 +08:00
wait ( yield ( ) ) ;
2019-07-31 03:38:44 +08:00
// Discard path: the popped version is ahead of the cursor, so skip to poppedVersion.
if ( self - > canDiscardPopped & & self - > poppedVersion > self - > version ( ) . version ) {
2019-11-05 11:47:45 +08:00
TraceEvent ( SevWarn , " DiscardingPoppedData " , self - > randomID ) . detail ( " Version " , self - > version ( ) . version ) . detail ( " Popped " , self - > poppedVersion ) ;
2019-07-31 03:38:44 +08:00
self - > messageVersion = std : : max ( self - > messageVersion , LogMessageVersion ( self - > poppedVersion ) ) ;
2019-11-06 10:07:30 +08:00
for ( auto cursor : self - > cursors ) {
2019-07-31 05:42:05 +08:00
cursor - > advanceTo ( self - > messageVersion ) ;
}
2019-07-31 03:21:48 +08:00
// Default to "past the end"; corrected below if a buffered message at or after poppedVersion exists.
self - > messageIndex = self - > messages . size ( ) ;
2019-08-07 07:31:05 +08:00
if ( self - > messages . size ( ) > 0 & & self - > messages [ self - > messages . size ( ) - 1 ] . version . version < self - > poppedVersion ) {
2019-07-31 03:21:48 +08:00
self - > hasNextMessage = false ;
} else {
// Position at the first buffered message not ordered before BufferedMessage(poppedVersion).
auto iter = std : : lower_bound ( self - > messages . begin ( ) , self - > messages . end ( ) ,
2019-07-31 03:38:44 +08:00
ILogSystem : : BufferedCursor : : BufferedMessage ( self - > poppedVersion ) ) ;
2019-07-31 03:21:48 +08:00
self - > hasNextMessage = iter ! = self - > messages . end ( ) ;
if ( self - > hasNextMessage ) {
self - > messageIndex = iter - self - > messages . begin ( ) ;
}
2019-07-30 12:19:47 +08:00
}
}
2019-07-31 04:44:44 +08:00
// Once a message is available, the discard path is disabled for later calls.
if ( self - > hasNextMessage ) {
self - > canDiscardPopped = false ;
}
2018-07-13 03:09:48 +08:00
return Void ( ) ;
}
2019-06-25 17:47:35 +08:00
// Returns a future for more buffered data. Ready immediately when a message is already
// available. Otherwise reuses the outstanding bufferedGetMore() future in `more`, starting
// a new one only if none exists or the previous one has completed.
Future<Void> ILogSystem::BufferedCursor::getMore(TaskPriority taskID) {
	if (hasMessage()) {
		return Void();
	}

	const bool needNewFetch = !more.isValid() || more.isReady();
	if (needNewFetch) {
		more = bufferedGetMore(this, taskID);
	}
	return more;
}
// Not supported on BufferedCursor: always trips ASSERT(false).
Future<Void> ILogSystem::BufferedCursor::onFailed() {
	ASSERT(false);
	return Never();
}
bool ILogSystem : : BufferedCursor : : isActive ( ) {
ASSERT ( false ) ;
return false ;
}
bool ILogSystem : : BufferedCursor : : isExhausted ( ) {
ASSERT ( false ) ;
return false ;
}
// Returns the version of the current buffered message when there is one, otherwise messageVersion.
const LogMessageVersion& ILogSystem::BufferedCursor::version() {
	if (!hasNextMessage) {
		return messageVersion;
	}
	return messages[messageIndex].version;
}
// Returns the cached minKnownCommittedVersion member.
Version ILogSystem::BufferedCursor::getMinKnownCommittedVersion() {
	return minKnownCommittedVersion;
}
// Returns poppedVersion, or 0 while poppedVersion still equals initialPoppedVersion.
Version ILogSystem::BufferedCursor::popped() {
	const bool unchangedSinceInitial = (initialPoppedVersion == poppedVersion);
	if (unchangedSinceInitial) {
		return 0;
	}
	return poppedVersion;
}