2017-05-26 04:48:44 +08:00
/*
* LogSystemPeekCursor . actor . cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013 - 2018 Apple Inc . and the FoundationDB project authors
2018-02-22 02:25:11 +08:00
*
2017-05-26 04:48:44 +08:00
* Licensed under the Apache License , Version 2.0 ( the " License " ) ;
* you may not use this file except in compliance with the License .
* You may obtain a copy of the License at
2018-02-22 02:25:11 +08:00
*
2017-05-26 04:48:44 +08:00
* http : //www.apache.org/licenses/LICENSE-2.0
2018-02-22 02:25:11 +08:00
*
2017-05-26 04:48:44 +08:00
* Unless required by applicable law or agreed to in writing , software
* distributed under the License is distributed on an " AS IS " BASIS ,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
* See the License for the specific language governing permissions and
* limitations under the License .
*/
2018-10-20 01:30:13 +08:00
# include "fdbserver/LogSystem.h"
2017-05-26 04:48:44 +08:00
# include "fdbrpc/FailureMonitor.h"
2018-10-20 01:30:13 +08:00
# include "fdbserver/Knobs.h"
2017-11-16 13:05:10 +08:00
# include "fdbrpc/ReplicationUtils.h"
2019-02-18 10:46:59 +08:00
# include "flow/actorcompiler.h" // has to be last include
2017-05-26 04:48:44 +08:00
ILogSystem : : ServerPeekCursor : : ServerPeekCursor ( Reference < AsyncVar < OptionalInterface < TLogInterface > > > const & interf , Tag tag , Version begin , Version end , bool returnIfBlocked , bool parallelGetMore )
: interf ( interf ) , tag ( tag ) , messageVersion ( begin ) , end ( end ) , hasMsg ( false ) , rd ( results . arena , results . messages , Unversioned ( ) ) , randomID ( g_random - > randomUniqueID ( ) ) , poppedVersion ( 0 ) , returnIfBlocked ( returnIfBlocked ) , sequence ( 0 ) , parallelGetMore ( parallelGetMore ) {
this - > results . maxKnownVersion = 0 ;
2018-06-22 06:29:46 +08:00
this - > results . minKnownCommittedVersion = 0 ;
2018-06-09 02:11:08 +08:00
//TraceEvent("SPC_Starting", randomID).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).backtrace();
2017-05-26 04:48:44 +08:00
}
2018-03-17 02:40:21 +08:00
ILogSystem : : ServerPeekCursor : : ServerPeekCursor ( TLogPeekReply const & results , LogMessageVersion const & messageVersion , LogMessageVersion const & end , int32_t messageLength , int32_t rawLength , bool hasMsg , Version poppedVersion , Tag tag )
: results ( results ) , tag ( tag ) , rd ( results . arena , results . messages , Unversioned ( ) ) , messageVersion ( messageVersion ) , end ( end ) , messageLength ( messageLength ) , rawLength ( rawLength ) , hasMsg ( hasMsg ) , randomID ( g_random - > randomUniqueID ( ) ) , poppedVersion ( poppedVersion ) , returnIfBlocked ( false ) , sequence ( 0 ) , parallelGetMore ( false )
2017-05-26 04:48:44 +08:00
{
2018-06-09 02:11:08 +08:00
//TraceEvent("SPC_Clone", randomID);
2017-05-26 04:48:44 +08:00
this - > results . maxKnownVersion = 0 ;
2018-06-22 06:29:46 +08:00
this - > results . minKnownCommittedVersion = 0 ;
2017-05-26 04:48:44 +08:00
if ( hasMsg )
nextMessage ( ) ;
advanceTo ( messageVersion ) ;
}
Reference < ILogSystem : : IPeekCursor > ILogSystem : : ServerPeekCursor : : cloneNoMore ( ) {
2018-03-17 02:40:21 +08:00
return Reference < ILogSystem : : ServerPeekCursor > ( new ILogSystem : : ServerPeekCursor ( results , messageVersion , end , messageLength , rawLength , hasMsg , poppedVersion , tag ) ) ;
2017-05-26 04:48:44 +08:00
}
void ILogSystem : : ServerPeekCursor : : setProtocolVersion ( uint64_t version ) {
rd . setProtocolVersion ( version ) ;
}
Arena & ILogSystem : : ServerPeekCursor : : arena ( ) { return results . arena ; }
ArenaReader * ILogSystem : : ServerPeekCursor : : reader ( ) {
return & rd ;
}
bool ILogSystem : : ServerPeekCursor : : hasMessage ( ) {
2018-06-09 02:11:08 +08:00
//TraceEvent("SPC_HasMessage", randomID).detail("HasMsg", hasMsg);
2017-05-26 04:48:44 +08:00
return hasMsg ;
}
void ILogSystem : : ServerPeekCursor : : nextMessage ( ) {
2018-06-09 02:11:08 +08:00
//TraceEvent("SPC_NextMessage", randomID).detail("MessageVersion", messageVersion.toString());
2017-05-26 04:48:44 +08:00
ASSERT ( hasMsg ) ;
if ( rd . empty ( ) ) {
messageVersion . reset ( std : : min ( results . end , end . version ) ) ;
hasMsg = false ;
return ;
}
if ( * ( int32_t * ) rd . peekBytes ( 4 ) = = - 1 ) {
// A version
int32_t dummy ;
Version ver ;
rd > > dummy > > ver ;
2018-06-09 02:11:08 +08:00
//TraceEvent("SPC_ProcessSeq", randomID).detail("MessageVersion", messageVersion.toString()).detail("Ver", ver).detail("Tag", tag.toString());
2017-05-26 04:48:44 +08:00
//ASSERT( ver >= messageVersion.version );
messageVersion . reset ( ver ) ;
if ( messageVersion > = end ) {
messageVersion = end ;
hasMsg = false ;
return ;
}
ASSERT ( ! rd . empty ( ) ) ;
}
2017-06-30 06:50:19 +08:00
uint16_t tagCount ;
2018-03-17 02:40:21 +08:00
rd . checkpoint ( ) ;
2017-06-30 06:50:19 +08:00
rd > > messageLength > > messageVersion . sub > > tagCount ;
tags . resize ( tagCount ) ;
for ( int i = 0 ; i < tagCount ; i + + ) {
rd > > tags [ i ] ;
}
2018-03-17 02:40:21 +08:00
rawLength = messageLength + sizeof ( messageLength ) ;
messageLength - = ( sizeof ( messageVersion . sub ) + sizeof ( tagCount ) + tagCount * sizeof ( Tag ) ) ;
2017-05-26 04:48:44 +08:00
hasMsg = true ;
2018-06-09 02:11:08 +08:00
//TraceEvent("SPC_NextMessageB", randomID).detail("MessageVersion", messageVersion.toString());
2017-05-26 04:48:44 +08:00
}
StringRef ILogSystem : : ServerPeekCursor : : getMessage ( ) {
2018-06-09 02:11:08 +08:00
//TraceEvent("SPC_GetMessage", randomID);
2017-05-26 04:48:44 +08:00
return StringRef ( ( uint8_t const * ) rd . readBytes ( messageLength ) , messageLength ) ;
}
2018-03-17 02:40:21 +08:00
StringRef ILogSystem : : ServerPeekCursor : : getMessageWithTags ( ) {
rd . rewind ( ) ;
return StringRef ( ( uint8_t const * ) rd . readBytes ( rawLength ) , rawLength ) ;
}
const std : : vector < Tag > & ILogSystem : : ServerPeekCursor : : getTags ( ) {
2017-06-30 06:50:19 +08:00
return tags ;
}
2017-05-26 04:48:44 +08:00
void ILogSystem : : ServerPeekCursor : : advanceTo ( LogMessageVersion n ) {
2018-06-09 02:11:08 +08:00
//TraceEvent("SPC_AdvanceTo", randomID).detail("N", n.toString());
2017-05-26 04:48:44 +08:00
while ( messageVersion < n & & hasMessage ( ) ) {
getMessage ( ) ;
nextMessage ( ) ;
}
if ( hasMessage ( ) )
return ;
//if( more.isValid() && !more.isReady() ) more.cancel();
if ( messageVersion < n ) {
messageVersion = n ;
}
}
2018-01-12 08:09:49 +08:00
ACTOR Future < Void > serverPeekParallelGetMore ( ILogSystem : : ServerPeekCursor * self , int taskID ) {
2017-05-26 04:48:44 +08:00
if ( ! self - > interf | | self - > messageVersion > = self - > end ) {
2018-08-11 04:57:10 +08:00
wait ( Future < Void > ( Never ( ) ) ) ;
2017-05-26 04:48:44 +08:00
throw internal_error ( ) ;
}
2017-08-10 06:58:06 +08:00
if ( ! self - > interfaceChanged . isValid ( ) ) {
self - > interfaceChanged = self - > interf - > onChange ( ) ;
}
2017-05-26 04:48:44 +08:00
loop {
2018-06-26 02:15:49 +08:00
state Version expectedBegin = self - > messageVersion . version ;
2017-05-26 04:48:44 +08:00
try {
while ( self - > futureResults . size ( ) < SERVER_KNOBS - > PARALLEL_GET_MORE_REQUESTS & & self - > interf - > get ( ) . present ( ) ) {
2018-01-12 08:09:49 +08:00
self - > futureResults . push_back ( brokenPromiseToNever ( self - > interf - > get ( ) . interf ( ) . peekMessages . getReply ( TLogPeekRequest ( self - > messageVersion . version , self - > tag , self - > returnIfBlocked , std : : make_pair ( self - > randomID , self - > sequence + + ) ) , taskID ) ) ) ;
2017-05-26 04:48:44 +08:00
}
choose {
when ( TLogPeekReply res = wait ( self - > interf - > get ( ) . present ( ) ? self - > futureResults . front ( ) : Never ( ) ) ) {
2018-06-26 02:15:49 +08:00
if ( res . begin . get ( ) ! = expectedBegin ) {
throw timed_out ( ) ;
}
expectedBegin = res . end ;
2017-05-26 04:48:44 +08:00
self - > futureResults . pop_front ( ) ;
self - > results = res ;
if ( res . popped . present ( ) )
self - > poppedVersion = std : : min ( std : : max ( self - > poppedVersion , res . popped . get ( ) ) , self - > end . version ) ;
self - > rd = ArenaReader ( self - > results . arena , self - > results . messages , Unversioned ( ) ) ;
LogMessageVersion skipSeq = self - > messageVersion ;
self - > hasMsg = true ;
self - > nextMessage ( ) ;
self - > advanceTo ( skipSeq ) ;
2018-06-09 02:11:08 +08:00
//TraceEvent("SPC_GetMoreB", self->randomID).detail("Has", self->hasMessage()).detail("End", res.end).detail("Popped", res.popped.present() ? res.popped.get() : 0);
2017-05-26 04:48:44 +08:00
return Void ( ) ;
}
2018-08-11 04:57:10 +08:00
when ( wait ( self - > interfaceChanged ) ) {
2017-08-10 06:58:06 +08:00
self - > interfaceChanged = self - > interf - > onChange ( ) ;
self - > randomID = g_random - > randomUniqueID ( ) ;
2017-05-26 04:48:44 +08:00
self - > sequence = 0 ;
self - > futureResults . clear ( ) ;
}
}
} catch ( Error & e ) {
if ( e . code ( ) = = error_code_end_of_stream ) {
self - > end . reset ( self - > messageVersion . version ) ;
return Void ( ) ;
} else if ( e . code ( ) = = error_code_timed_out ) {
TraceEvent ( " PeekCursorTimedOut " , self - > randomID ) ;
2017-08-10 06:58:06 +08:00
self - > interfaceChanged = self - > interf - > onChange ( ) ;
2017-05-26 04:48:44 +08:00
self - > randomID = g_random - > randomUniqueID ( ) ;
self - > sequence = 0 ;
self - > futureResults . clear ( ) ;
} else {
throw e ;
}
}
}
}
2018-01-12 08:09:49 +08:00
ACTOR Future < Void > serverPeekGetMore ( ILogSystem : : ServerPeekCursor * self , int taskID ) {
2017-05-26 04:48:44 +08:00
if ( ! self - > interf | | self - > messageVersion > = self - > end ) {
2018-08-11 04:57:10 +08:00
wait ( Future < Void > ( Never ( ) ) ) ;
2017-05-26 04:48:44 +08:00
throw internal_error ( ) ;
}
try {
loop {
choose {
when ( TLogPeekReply res = wait ( self - > interf - > get ( ) . present ( ) ?
2018-01-12 08:09:49 +08:00
brokenPromiseToNever ( self - > interf - > get ( ) . interf ( ) . peekMessages . getReply ( TLogPeekRequest ( self - > messageVersion . version , self - > tag , self - > returnIfBlocked ) , taskID ) ) : Never ( ) ) ) {
2017-05-26 04:48:44 +08:00
self - > results = res ;
if ( res . popped . present ( ) )
self - > poppedVersion = std : : min ( std : : max ( self - > poppedVersion , res . popped . get ( ) ) , self - > end . version ) ;
self - > rd = ArenaReader ( self - > results . arena , self - > results . messages , Unversioned ( ) ) ;
LogMessageVersion skipSeq = self - > messageVersion ;
self - > hasMsg = true ;
self - > nextMessage ( ) ;
self - > advanceTo ( skipSeq ) ;
2018-06-09 02:11:08 +08:00
//TraceEvent("SPC_GetMoreB", self->randomID).detail("Has", self->hasMessage()).detail("End", res.end).detail("Popped", res.popped.present() ? res.popped.get() : 0);
2017-05-26 04:48:44 +08:00
return Void ( ) ;
}
2018-08-11 04:57:10 +08:00
when ( wait ( self - > interf - > onChange ( ) ) ) { }
2017-05-26 04:48:44 +08:00
}
}
} catch ( Error & e ) {
if ( e . code ( ) = = error_code_end_of_stream ) {
self - > end . reset ( self - > messageVersion . version ) ;
return Void ( ) ;
}
throw e ;
}
}
2018-01-12 08:09:49 +08:00
Future < Void > ILogSystem : : ServerPeekCursor : : getMore ( int taskID ) {
2018-06-09 02:11:08 +08:00
//TraceEvent("SPC_GetMore", randomID).detail("HasMessage", hasMessage()).detail("More", !more.isValid() || more.isReady()).detail("MessageVersion", messageVersion.toString()).detail("End", end.toString());
2017-05-26 04:48:44 +08:00
if ( hasMessage ( ) )
return Void ( ) ;
if ( ! more . isValid ( ) | | more . isReady ( ) ) {
2018-01-12 08:09:49 +08:00
more = parallelGetMore ? serverPeekParallelGetMore ( this , taskID ) : serverPeekGetMore ( this , taskID ) ;
2017-05-26 04:48:44 +08:00
}
return more ;
}
ACTOR Future < Void > serverPeekOnFailed ( ILogSystem : : ServerPeekCursor * self ) {
loop {
choose {
2018-08-11 04:57:10 +08:00
when ( wait ( self - > interf - > get ( ) . present ( ) ? IFailureMonitor : : failureMonitor ( ) . onDisconnectOrFailure ( self - > interf - > get ( ) . interf ( ) . peekMessages . getEndpoint ( ) ) : Never ( ) ) ) { return Void ( ) ; }
when ( wait ( self - > interf - > onChange ( ) ) ) { }
2017-05-26 04:48:44 +08:00
}
}
}
Future < Void > ILogSystem : : ServerPeekCursor : : onFailed ( ) {
return serverPeekOnFailed ( this ) ;
}
bool ILogSystem : : ServerPeekCursor : : isActive ( ) {
if ( ! interf - > get ( ) . present ( ) )
return false ;
if ( messageVersion > = end )
return false ;
return IFailureMonitor : : failureMonitor ( ) . getState ( interf - > get ( ) . interf ( ) . peekMessages . getEndpoint ( ) ) . isAvailable ( ) ;
}
2017-07-16 06:15:03 +08:00
bool ILogSystem : : ServerPeekCursor : : isExhausted ( ) {
return messageVersion > = end ;
}
2018-07-13 03:09:48 +08:00
const LogMessageVersion & ILogSystem : : ServerPeekCursor : : version ( ) { return messageVersion ; } // Call only after nextMessage(). The sequence of the current message, or results.end if nextMessage() has returned false.
2017-05-26 04:48:44 +08:00
2018-06-22 06:29:46 +08:00
Version ILogSystem : : ServerPeekCursor : : getMinKnownCommittedVersion ( ) { return results . minKnownCommittedVersion ; }
2017-05-26 04:48:44 +08:00
Version ILogSystem : : ServerPeekCursor : : popped ( ) { return poppedVersion ; }
2018-07-13 03:09:48 +08:00
ILogSystem : : MergedPeekCursor : : MergedPeekCursor ( vector < Reference < ILogSystem : : IPeekCursor > > const & serverCursors , Version begin )
2018-07-12 06:43:55 +08:00
: serverCursors ( serverCursors ) , bestServer ( - 1 ) , readQuorum ( serverCursors . size ( ) ) , tag ( invalidTag ) , currentCursor ( 0 ) , hasNextMessage ( false ) ,
2018-07-13 03:09:48 +08:00
messageVersion ( begin ) , randomID ( g_random - > randomUniqueID ( ) ) , tLogReplicationFactor ( 0 ) {
2018-03-30 06:12:38 +08:00
sortedVersions . resize ( serverCursors . size ( ) ) ;
}
2018-03-30 08:54:08 +08:00
ILogSystem : : MergedPeekCursor : : MergedPeekCursor ( std : : vector < Reference < AsyncVar < OptionalInterface < TLogInterface > > > > const & logServers , int bestServer , int readQuorum , Tag tag , Version begin , Version end ,
bool parallelGetMore , std : : vector < LocalityData > const & tLogLocalities , IRepPolicyRef const tLogPolicy , int tLogReplicationFactor )
2018-07-13 03:09:48 +08:00
: bestServer ( bestServer ) , readQuorum ( readQuorum ) , tag ( tag ) , currentCursor ( 0 ) , hasNextMessage ( false ) , messageVersion ( begin ) , randomID ( g_random - > randomUniqueID ( ) ) , tLogReplicationFactor ( tLogReplicationFactor ) {
2018-07-12 06:43:55 +08:00
if ( tLogPolicy ) {
logSet = Reference < LogSet > ( new LogSet ( ) ) ;
logSet - > tLogPolicy = tLogPolicy ;
logSet - > tLogLocalities = tLogLocalities ;
filterLocalityDataForPolicy ( logSet - > tLogPolicy , & logSet - > tLogLocalities ) ;
logSet - > updateLocalitySet ( logSet - > tLogLocalities ) ;
}
2017-05-26 04:48:44 +08:00
for ( int i = 0 ; i < logServers . size ( ) ; i + + ) {
2017-06-30 06:50:19 +08:00
Reference < ILogSystem : : ServerPeekCursor > cursor ( new ILogSystem : : ServerPeekCursor ( logServers [ i ] , tag , begin , end , bestServer > = 0 , parallelGetMore ) ) ;
2018-06-09 02:11:08 +08:00
//TraceEvent("MPC_Starting", randomID).detail("Cursor", cursor->randomID).detail("End", end);
2017-05-26 04:48:44 +08:00
serverCursors . push_back ( cursor ) ;
}
sortedVersions . resize ( serverCursors . size ( ) ) ;
}
2018-07-12 06:43:55 +08:00
ILogSystem : : MergedPeekCursor : : MergedPeekCursor ( vector < Reference < ILogSystem : : IPeekCursor > > const & serverCursors , LogMessageVersion const & messageVersion , int bestServer , int readQuorum , Optional < LogMessageVersion > nextVersion , Reference < LogSet > logSet , int tLogReplicationFactor )
: serverCursors ( serverCursors ) , bestServer ( bestServer ) , readQuorum ( readQuorum ) , currentCursor ( 0 ) , hasNextMessage ( false ) , messageVersion ( messageVersion ) , nextVersion ( nextVersion ) , logSet ( logSet ) ,
2018-07-13 03:09:48 +08:00
randomID ( g_random - > randomUniqueID ( ) ) , tLogReplicationFactor ( tLogReplicationFactor ) {
2018-03-30 06:12:38 +08:00
sortedVersions . resize ( serverCursors . size ( ) ) ;
2017-05-26 04:48:44 +08:00
calcHasMessage ( ) ;
}
Reference < ILogSystem : : IPeekCursor > ILogSystem : : MergedPeekCursor : : cloneNoMore ( ) {
vector < Reference < ILogSystem : : IPeekCursor > > cursors ;
for ( auto it : serverCursors ) {
cursors . push_back ( it - > cloneNoMore ( ) ) ;
}
2018-07-12 06:43:55 +08:00
return Reference < ILogSystem : : MergedPeekCursor > ( new ILogSystem : : MergedPeekCursor ( cursors , messageVersion , bestServer , readQuorum , nextVersion , logSet , tLogReplicationFactor ) ) ;
2017-05-26 04:48:44 +08:00
}
void ILogSystem : : MergedPeekCursor : : setProtocolVersion ( uint64_t version ) {
for ( auto it : serverCursors )
if ( it - > hasMessage ( ) )
it - > setProtocolVersion ( version ) ;
}
Arena & ILogSystem : : MergedPeekCursor : : arena ( ) { return serverCursors [ currentCursor ] - > arena ( ) ; }
ArenaReader * ILogSystem : : MergedPeekCursor : : reader ( ) { return serverCursors [ currentCursor ] - > reader ( ) ; }
void ILogSystem : : MergedPeekCursor : : calcHasMessage ( ) {
2017-06-30 06:50:19 +08:00
if ( bestServer > = 0 ) {
if ( nextVersion . present ( ) ) serverCursors [ bestServer ] - > advanceTo ( nextVersion . get ( ) ) ;
if ( serverCursors [ bestServer ] - > hasMessage ( ) ) {
messageVersion = serverCursors [ bestServer ] - > version ( ) ;
currentCursor = bestServer ;
hasNextMessage = true ;
2017-05-26 04:48:44 +08:00
2017-06-30 06:50:19 +08:00
for ( auto & c : serverCursors )
c - > advanceTo ( messageVersion ) ;
2017-05-26 04:48:44 +08:00
2017-06-30 06:50:19 +08:00
return ;
}
2017-05-26 04:48:44 +08:00
2017-06-30 06:50:19 +08:00
auto bestVersion = serverCursors [ bestServer ] - > version ( ) ;
for ( auto & c : serverCursors )
c - > advanceTo ( bestVersion ) ;
}
2017-05-26 04:48:44 +08:00
hasNextMessage = false ;
2018-03-30 06:12:38 +08:00
updateMessage ( false ) ;
2018-07-12 06:43:55 +08:00
if ( ! hasNextMessage & & logSet ) {
2018-03-30 06:12:38 +08:00
updateMessage ( true ) ;
}
2017-05-26 04:48:44 +08:00
}
2018-03-30 06:12:38 +08:00
void ILogSystem : : MergedPeekCursor : : updateMessage ( bool usePolicy ) {
2017-05-26 04:48:44 +08:00
loop {
bool advancedPast = false ;
sortedVersions . clear ( ) ;
for ( int i = 0 ; i < serverCursors . size ( ) ; i + + ) {
auto & serverCursor = serverCursors [ i ] ;
if ( nextVersion . present ( ) ) serverCursor - > advanceTo ( nextVersion . get ( ) ) ;
sortedVersions . push_back ( std : : pair < LogMessageVersion , int > ( serverCursor - > version ( ) , i ) ) ;
}
2018-03-30 06:12:38 +08:00
if ( usePolicy ) {
2018-07-12 06:43:55 +08:00
ASSERT ( logSet - > tLogPolicy ) ;
2018-03-30 06:12:38 +08:00
std : : sort ( sortedVersions . begin ( ) , sortedVersions . end ( ) ) ;
2018-07-12 06:43:55 +08:00
locations . clear ( ) ;
2018-03-30 06:12:38 +08:00
for ( auto sortedVersion : sortedVersions ) {
2018-07-12 06:43:55 +08:00
locations . push_back ( logSet - > logEntryArray [ sortedVersion . second ] ) ;
if ( locations . size ( ) > = tLogReplicationFactor & & logSet - > satisfiesPolicy ( locations ) ) {
2018-03-30 06:12:38 +08:00
messageVersion = sortedVersion . first ;
break ;
}
}
} else {
std : : nth_element ( sortedVersions . begin ( ) , sortedVersions . end ( ) - readQuorum , sortedVersions . end ( ) ) ;
messageVersion = sortedVersions [ sortedVersions . size ( ) - readQuorum ] . first ;
}
2017-05-26 04:48:44 +08:00
for ( int i = 0 ; i < serverCursors . size ( ) ; i + + ) {
auto & c = serverCursors [ i ] ;
auto start = c - > version ( ) ;
c - > advanceTo ( messageVersion ) ;
2018-03-30 06:12:38 +08:00
if ( start < = messageVersion & & messageVersion < c - > version ( ) ) {
2017-05-26 04:48:44 +08:00
advancedPast = true ;
TEST ( true ) ; //Merge peek cursor advanced past desired sequence
}
}
if ( ! advancedPast )
break ;
}
for ( int i = 0 ; i < serverCursors . size ( ) ; i + + ) {
auto & c = serverCursors [ i ] ;
ASSERT_WE_THINK ( ! c - > hasMessage ( ) | | c - > version ( ) > = messageVersion ) ; // Seems like the loop above makes this unconditionally true
if ( c - > version ( ) = = messageVersion & & c - > hasMessage ( ) ) {
hasNextMessage = true ;
currentCursor = i ;
2018-07-13 03:09:48 +08:00
break ;
2017-05-26 04:48:44 +08:00
}
}
}
bool ILogSystem : : MergedPeekCursor : : hasMessage ( ) {
return hasNextMessage ;
}
void ILogSystem : : MergedPeekCursor : : nextMessage ( ) {
nextVersion = version ( ) ;
nextVersion . get ( ) . sub + + ;
serverCursors [ currentCursor ] - > nextMessage ( ) ;
calcHasMessage ( ) ;
ASSERT ( hasMessage ( ) | | ! version ( ) . sub ) ;
}
StringRef ILogSystem : : MergedPeekCursor : : getMessage ( ) { return serverCursors [ currentCursor ] - > getMessage ( ) ; }
2018-07-13 03:09:48 +08:00
StringRef ILogSystem : : MergedPeekCursor : : getMessageWithTags ( ) {
return serverCursors [ currentCursor ] - > getMessageWithTags ( ) ;
2018-04-09 12:24:05 +08:00
}
2018-03-17 02:40:21 +08:00
const std : : vector < Tag > & ILogSystem : : MergedPeekCursor : : getTags ( ) {
2017-06-30 06:50:19 +08:00
return serverCursors [ currentCursor ] - > getTags ( ) ;
}
2017-05-26 04:48:44 +08:00
void ILogSystem : : MergedPeekCursor : : advanceTo ( LogMessageVersion n ) {
2018-07-12 06:43:55 +08:00
bool canChange = false ;
for ( auto & c : serverCursors ) {
if ( c - > version ( ) < n ) {
canChange = true ;
c - > advanceTo ( n ) ;
}
}
if ( canChange ) {
calcHasMessage ( ) ;
}
2017-05-26 04:48:44 +08:00
}
2018-01-12 08:09:49 +08:00
ACTOR Future < Void > mergedPeekGetMore ( ILogSystem : : MergedPeekCursor * self , LogMessageVersion startVersion , int taskID ) {
2017-05-26 04:48:44 +08:00
loop {
2018-06-09 02:11:08 +08:00
//TraceEvent("MPC_GetMoreA", self->randomID).detail("Start", startVersion.toString());
2017-06-30 06:50:19 +08:00
if ( self - > bestServer > = 0 & & self - > serverCursors [ self - > bestServer ] - > isActive ( ) ) {
2017-05-26 04:48:44 +08:00
ASSERT ( ! self - > serverCursors [ self - > bestServer ] - > hasMessage ( ) ) ;
2018-08-11 04:57:10 +08:00
wait ( self - > serverCursors [ self - > bestServer ] - > getMore ( taskID ) | | self - > serverCursors [ self - > bestServer ] - > onFailed ( ) ) ;
2017-05-26 04:48:44 +08:00
} else {
vector < Future < Void > > q ;
for ( auto & c : self - > serverCursors )
if ( ! c - > hasMessage ( ) )
2018-01-12 08:09:49 +08:00
q . push_back ( c - > getMore ( taskID ) ) ;
2018-08-11 04:57:10 +08:00
wait ( quorum ( q , 1 ) ) ;
2017-05-26 04:48:44 +08:00
}
self - > calcHasMessage ( ) ;
2018-06-09 02:11:08 +08:00
//TraceEvent("MPC_GetMoreB", self->randomID).detail("HasMessage", self->hasMessage()).detail("Start", startVersion.toString()).detail("Seq", self->version().toString());
2018-04-09 12:24:05 +08:00
if ( self - > hasMessage ( ) | | self - > version ( ) > startVersion ) {
2017-05-26 04:48:44 +08:00
return Void ( ) ;
2018-04-09 12:24:05 +08:00
}
2017-05-26 04:48:44 +08:00
}
}
2018-01-12 08:09:49 +08:00
Future < Void > ILogSystem : : MergedPeekCursor : : getMore ( int taskID ) {
2017-06-30 06:50:19 +08:00
if ( ! serverCursors . size ( ) )
return Never ( ) ;
2017-05-26 04:48:44 +08:00
auto startVersion = version ( ) ;
calcHasMessage ( ) ;
if ( hasMessage ( ) )
return Void ( ) ;
if ( nextVersion . present ( ) )
advanceTo ( nextVersion . get ( ) ) ;
ASSERT ( ! hasMessage ( ) ) ;
if ( version ( ) > startVersion )
return Void ( ) ;
2018-01-12 08:09:49 +08:00
return mergedPeekGetMore ( this , startVersion , taskID ) ;
2017-05-26 04:48:44 +08:00
}
Future < Void > ILogSystem : : MergedPeekCursor : : onFailed ( ) {
ASSERT ( false ) ;
return Never ( ) ;
}
bool ILogSystem : : MergedPeekCursor : : isActive ( ) {
ASSERT ( false ) ;
return false ;
}
2017-07-16 06:15:03 +08:00
bool ILogSystem : : MergedPeekCursor : : isExhausted ( ) {
2018-04-24 13:03:55 +08:00
return serverCursors [ currentCursor ] - > isExhausted ( ) ;
2017-07-16 06:15:03 +08:00
}
2018-07-13 03:09:48 +08:00
const LogMessageVersion & ILogSystem : : MergedPeekCursor : : version ( ) { return messageVersion ; }
2017-05-26 04:48:44 +08:00
2018-06-22 06:29:46 +08:00
Version ILogSystem : : MergedPeekCursor : : getMinKnownCommittedVersion ( ) {
return serverCursors [ currentCursor ] - > getMinKnownCommittedVersion ( ) ;
}
2017-05-26 04:48:44 +08:00
Version ILogSystem : : MergedPeekCursor : : popped ( ) {
Version poppedVersion = 0 ;
for ( auto & c : serverCursors )
poppedVersion = std : : max ( poppedVersion , c - > popped ( ) ) ;
return poppedVersion ;
}
2017-07-12 06:48:10 +08:00
ILogSystem : : SetPeekCursor : : SetPeekCursor ( std : : vector < Reference < LogSet > > const & logSets , int bestSet , int bestServer , Tag tag , Version begin , Version end , bool parallelGetMore )
2017-07-11 08:41:32 +08:00
: logSets ( logSets ) , bestSet ( bestSet ) , bestServer ( bestServer ) , tag ( tag ) , currentCursor ( 0 ) , currentSet ( bestSet ) , hasNextMessage ( false ) , messageVersion ( begin ) , useBestSet ( true ) , randomID ( g_random - > randomUniqueID ( ) ) {
serverCursors . resize ( logSets . size ( ) ) ;
int maxServers = 0 ;
for ( int i = 0 ; i < logSets . size ( ) ; i + + ) {
2017-07-12 06:48:10 +08:00
for ( int j = 0 ; j < logSets [ i ] - > logServers . size ( ) ; j + + ) {
Reference < ILogSystem : : ServerPeekCursor > cursor ( new ILogSystem : : ServerPeekCursor ( logSets [ i ] - > logServers [ j ] , tag , begin , end , true , parallelGetMore ) ) ;
2017-07-11 08:41:32 +08:00
serverCursors [ i ] . push_back ( cursor ) ;
}
maxServers = std : : max < int > ( maxServers , serverCursors [ i ] . size ( ) ) ;
}
sortedVersions . resize ( maxServers ) ;
}
2018-04-26 09:20:28 +08:00
ILogSystem : : SetPeekCursor : : SetPeekCursor ( std : : vector < Reference < LogSet > > const & logSets , std : : vector < std : : vector < Reference < IPeekCursor > > > const & serverCursors , LogMessageVersion const & messageVersion , int bestSet , int bestServer ,
2018-04-27 09:32:12 +08:00
Optional < LogMessageVersion > nextVersion , bool useBestSet ) : logSets ( logSets ) , serverCursors ( serverCursors ) , messageVersion ( messageVersion ) , bestSet ( bestSet ) , bestServer ( bestServer ) , nextVersion ( nextVersion ) , currentSet ( bestSet ) , currentCursor ( 0 ) ,
hasNextMessage ( false ) , useBestSet ( useBestSet ) , randomID ( g_random - > randomUniqueID ( ) ) {
2018-04-26 09:20:28 +08:00
int maxServers = 0 ;
for ( int i = 0 ; i < logSets . size ( ) ; i + + ) {
maxServers = std : : max < int > ( maxServers , serverCursors [ i ] . size ( ) ) ;
}
sortedVersions . resize ( maxServers ) ;
calcHasMessage ( ) ;
}
2017-07-11 08:41:32 +08:00
Reference < ILogSystem : : IPeekCursor > ILogSystem : : SetPeekCursor : : cloneNoMore ( ) {
2018-04-26 09:20:28 +08:00
vector < vector < Reference < ILogSystem : : IPeekCursor > > > cursors ;
cursors . resize ( logSets . size ( ) ) ;
for ( int i = 0 ; i < logSets . size ( ) ; i + + ) {
for ( int j = 0 ; j < logSets [ i ] - > logServers . size ( ) ; j + + ) {
cursors [ i ] . push_back ( serverCursors [ i ] [ j ] - > cloneNoMore ( ) ) ;
}
}
2018-04-27 09:32:12 +08:00
return Reference < ILogSystem : : SetPeekCursor > ( new ILogSystem : : SetPeekCursor ( logSets , cursors , messageVersion , bestSet , bestServer , nextVersion , useBestSet ) ) ;
2017-07-11 08:41:32 +08:00
}
void ILogSystem : : SetPeekCursor : : setProtocolVersion ( uint64_t version ) {
for ( auto & cursors : serverCursors ) {
for ( auto & it : cursors ) {
if ( it - > hasMessage ( ) ) {
it - > setProtocolVersion ( version ) ;
}
}
}
}
Arena & ILogSystem : : SetPeekCursor : : arena ( ) { return serverCursors [ currentSet ] [ currentCursor ] - > arena ( ) ; }
ArenaReader * ILogSystem : : SetPeekCursor : : reader ( ) { return serverCursors [ currentSet ] [ currentCursor ] - > reader ( ) ; }
void ILogSystem : : SetPeekCursor : : calcHasMessage ( ) {
2017-09-08 06:32:08 +08:00
if ( bestSet > = 0 & & bestServer > = 0 ) {
if ( nextVersion . present ( ) ) {
2018-06-09 02:11:08 +08:00
//TraceEvent("LPC_CalcNext").detail("Ver", messageVersion.toString()).detail("Tag", tag.toString()).detail("HasNextMessage", hasNextMessage).detail("NextVersion", nextVersion.get().toString());
2017-09-08 06:32:08 +08:00
serverCursors [ bestSet ] [ bestServer ] - > advanceTo ( nextVersion . get ( ) ) ;
}
if ( serverCursors [ bestSet ] [ bestServer ] - > hasMessage ( ) ) {
messageVersion = serverCursors [ bestSet ] [ bestServer ] - > version ( ) ;
currentSet = bestSet ;
currentCursor = bestServer ;
hasNextMessage = true ;
2017-07-11 08:41:32 +08:00
2018-06-09 02:11:08 +08:00
//TraceEvent("LPC_Calc1").detail("Ver", messageVersion.toString()).detail("Tag", tag.toString()).detail("HasNextMessage", hasNextMessage);
2017-09-08 06:32:08 +08:00
for ( auto & cursors : serverCursors ) {
for ( auto & c : cursors ) {
c - > advanceTo ( messageVersion ) ;
}
2017-07-11 08:41:32 +08:00
}
2017-09-08 06:32:08 +08:00
return ;
}
2017-07-11 08:41:32 +08:00
2017-09-08 06:32:08 +08:00
auto bestVersion = serverCursors [ bestSet ] [ bestServer ] - > version ( ) ;
for ( auto & cursors : serverCursors ) {
for ( auto & c : cursors ) {
c - > advanceTo ( bestVersion ) ;
}
2017-07-11 08:41:32 +08:00
}
}
2017-07-16 06:15:03 +08:00
hasNextMessage = false ;
2017-07-11 08:41:32 +08:00
if ( useBestSet ) {
updateMessage ( bestSet , false ) ; // Use Quorum logic
2018-06-09 02:11:08 +08:00
//TraceEvent("LPC_Calc2").detail("Ver", messageVersion.toString()).detail("Tag", tag.toString()).detail("HasNextMessage", hasNextMessage);
2017-07-11 08:41:32 +08:00
if ( ! hasNextMessage ) {
updateMessage ( bestSet , true ) ;
2018-06-09 02:11:08 +08:00
//TraceEvent("LPC_Calc3").detail("Ver", messageVersion.toString()).detail("Tag", tag.toString()).detail("HasNextMessage", hasNextMessage);
2017-07-11 08:41:32 +08:00
}
} else {
for ( int i = 0 ; i < logSets . size ( ) & & ! hasNextMessage ; i + + ) {
if ( i ! = bestSet ) {
updateMessage ( i , false ) ; // Use Quorum logic
}
}
2018-06-09 02:11:08 +08:00
//TraceEvent("LPC_Calc4").detail("Ver", messageVersion.toString()).detail("Tag", tag.toString()).detail("HasNextMessage", hasNextMessage);
2017-07-11 08:41:32 +08:00
for ( int i = 0 ; i < logSets . size ( ) & & ! hasNextMessage ; i + + ) {
if ( i ! = bestSet ) {
updateMessage ( i , true ) ;
}
}
2018-06-09 02:11:08 +08:00
//TraceEvent("LPC_Calc5").detail("Ver", messageVersion.toString()).detail("Tag", tag.toString()).detail("HasNextMessage", hasNextMessage);
2017-07-11 08:41:32 +08:00
}
}
void ILogSystem : : SetPeekCursor : : updateMessage ( int logIdx , bool usePolicy ) {
loop {
bool advancedPast = false ;
sortedVersions . clear ( ) ;
for ( int i = 0 ; i < serverCursors [ logIdx ] . size ( ) ; i + + ) {
2017-07-14 03:29:21 +08:00
auto & serverCursor = serverCursors [ logIdx ] [ i ] ;
if ( nextVersion . present ( ) ) serverCursor - > advanceTo ( nextVersion . get ( ) ) ;
sortedVersions . push_back ( std : : pair < LogMessageVersion , int > ( serverCursor - > version ( ) , i ) ) ;
2018-06-09 02:11:08 +08:00
//TraceEvent("LPC_Update1").detail("Ver", messageVersion.toString()).detail("Tag", tag.toString()).detail("HasNextMessage", hasNextMessage).detail("ServerVer", serverCursor->version().toString()).detail("I", i);
2017-07-11 08:41:32 +08:00
}
if ( usePolicy ) {
std : : sort ( sortedVersions . begin ( ) , sortedVersions . end ( ) ) ;
2018-07-12 06:43:55 +08:00
locations . clear ( ) ;
2017-07-11 08:41:32 +08:00
for ( auto sortedVersion : sortedVersions ) {
2017-07-12 06:48:10 +08:00
auto & locality = logSets [ logIdx ] - > tLogLocalities [ sortedVersion . second ] ;
2018-07-12 06:43:55 +08:00
locations . push_back ( logSets [ logIdx ] - > logEntryArray [ sortedVersion . second ] ) ;
if ( locations . size ( ) > = logSets [ logIdx ] - > tLogReplicationFactor & & logSets [ logIdx ] - > satisfiesPolicy ( locations ) ) {
2017-07-11 08:41:32 +08:00
messageVersion = sortedVersion . first ;
break ;
}
}
} else {
//(int)oldLogData[i].logServers.size() + 1 - oldLogData[i].tLogReplicationFactor
2017-07-12 06:48:10 +08:00
std : : nth_element ( sortedVersions . begin ( ) , sortedVersions . end ( ) - ( logSets [ logIdx ] - > logServers . size ( ) + 1 - logSets [ logIdx ] - > tLogReplicationFactor ) , sortedVersions . end ( ) ) ;
messageVersion = sortedVersions [ sortedVersions . size ( ) - ( logSets [ logIdx ] - > logServers . size ( ) + 1 - logSets [ logIdx ] - > tLogReplicationFactor ) ] . first ;
2017-07-11 08:41:32 +08:00
}
2017-09-08 06:32:08 +08:00
for ( auto & cursors : serverCursors ) {
for ( auto & c : cursors ) {
auto start = c - > version ( ) ;
c - > advanceTo ( messageVersion ) ;
2018-03-30 06:12:38 +08:00
if ( start < = messageVersion & & messageVersion < c - > version ( ) ) {
2017-09-08 06:32:08 +08:00
advancedPast = true ;
TEST ( true ) ; //Merge peek cursor advanced past desired sequence
}
2017-07-11 08:41:32 +08:00
}
}
if ( ! advancedPast )
break ;
}
for ( int i = 0 ; i < serverCursors [ logIdx ] . size ( ) ; i + + ) {
auto & c = serverCursors [ logIdx ] [ i ] ;
ASSERT_WE_THINK ( ! c - > hasMessage ( ) | | c - > version ( ) > = messageVersion ) ; // Seems like the loop above makes this unconditionally true
if ( c - > version ( ) = = messageVersion & & c - > hasMessage ( ) ) {
hasNextMessage = true ;
currentSet = logIdx ;
currentCursor = i ;
break ;
}
}
}
bool ILogSystem : : SetPeekCursor : : hasMessage ( ) {
return hasNextMessage ;
}
void ILogSystem : : SetPeekCursor : : nextMessage ( ) {
nextVersion = version ( ) ;
nextVersion . get ( ) . sub + + ;
serverCursors [ currentSet ] [ currentCursor ] - > nextMessage ( ) ;
calcHasMessage ( ) ;
ASSERT ( hasMessage ( ) | | ! version ( ) . sub ) ;
}
StringRef ILogSystem : : SetPeekCursor : : getMessage ( ) { return serverCursors [ currentSet ] [ currentCursor ] - > getMessage ( ) ; }
2018-03-17 02:40:21 +08:00
StringRef ILogSystem : : SetPeekCursor : : getMessageWithTags ( ) { return serverCursors [ currentSet ] [ currentCursor ] - > getMessageWithTags ( ) ; }
const std : : vector < Tag > & ILogSystem : : SetPeekCursor : : getTags ( ) {
2017-07-11 08:41:32 +08:00
return serverCursors [ currentSet ] [ currentCursor ] - > getTags ( ) ;
}
void ILogSystem : : SetPeekCursor : : advanceTo ( LogMessageVersion n ) {
2018-07-12 06:43:55 +08:00
bool canChange = false ;
2017-07-11 08:41:32 +08:00
for ( auto & cursors : serverCursors ) {
for ( auto & c : cursors ) {
2018-07-12 06:43:55 +08:00
if ( c - > version ( ) < n ) {
canChange = true ;
c - > advanceTo ( n ) ;
}
2017-07-11 08:41:32 +08:00
}
}
2018-07-12 06:43:55 +08:00
if ( canChange ) {
calcHasMessage ( ) ;
}
2017-07-11 08:41:32 +08:00
}
2018-01-17 02:48:50 +08:00
ACTOR Future < Void > setPeekGetMore ( ILogSystem : : SetPeekCursor * self , LogMessageVersion startVersion , int taskID ) {
2017-07-11 08:41:32 +08:00
loop {
2018-06-09 02:11:08 +08:00
//TraceEvent("LPC_GetMore1", self->randomID).detail("Start", startVersion.toString()).detail("Tag", self->tag);
2017-07-11 08:41:32 +08:00
if ( self - > bestServer > = 0 & & self - > bestSet > = 0 & & self - > serverCursors [ self - > bestSet ] [ self - > bestServer ] - > isActive ( ) ) {
ASSERT ( ! self - > serverCursors [ self - > bestSet ] [ self - > bestServer ] - > hasMessage ( ) ) ;
2018-06-09 02:11:08 +08:00
//TraceEvent("LPC_GetMore2", self->randomID).detail("Start", startVersion.toString()).detail("Tag", self->tag);
2018-08-11 04:57:10 +08:00
wait ( self - > serverCursors [ self - > bestSet ] [ self - > bestServer ] - > getMore ( taskID ) | | self - > serverCursors [ self - > bestSet ] [ self - > bestServer ] - > onFailed ( ) ) ;
2017-07-11 08:41:32 +08:00
self - > useBestSet = true ;
} else {
2017-07-16 06:15:03 +08:00
//FIXME: if best set is exhausted, do not peek remote servers
2017-07-11 08:41:32 +08:00
bool bestSetValid = self - > bestSet > = 0 ;
if ( bestSetValid ) {
2018-07-12 06:43:55 +08:00
self - > locations . clear ( ) ;
2017-07-11 08:41:32 +08:00
for ( int i = 0 ; i < self - > serverCursors [ self - > bestSet ] . size ( ) ; i + + ) {
2018-03-08 04:54:53 +08:00
if ( ! self - > serverCursors [ self - > bestSet ] [ i ] - > isActive ( ) & & self - > serverCursors [ self - > bestSet ] [ i ] - > version ( ) < = self - > messageVersion ) {
2018-07-12 06:43:55 +08:00
self - > locations . push_back ( self - > logSets [ self - > bestSet ] - > logEntryArray [ i ] ) ;
2017-07-11 08:41:32 +08:00
}
}
2018-07-12 06:43:55 +08:00
bestSetValid = self - > locations . size ( ) < self - > logSets [ self - > bestSet ] - > tLogReplicationFactor | | ! self - > logSets [ self - > bestSet ] - > satisfiesPolicy ( self - > locations ) ;
2017-07-11 08:41:32 +08:00
}
2018-01-17 10:12:40 +08:00
if ( bestSetValid | | self - > logSets . size ( ) = = 1 ) {
2017-10-20 06:36:32 +08:00
if ( ! self - > useBestSet ) {
self - > useBestSet = true ;
self - > calcHasMessage ( ) ;
if ( self - > hasMessage ( ) | | self - > version ( ) > startVersion )
return Void ( ) ;
}
2018-07-12 06:43:55 +08:00
2018-06-09 02:11:08 +08:00
//TraceEvent("LPC_GetMore3", self->randomID).detail("Start", startVersion.toString()).detail("Tag", self->tag.toString()).detail("BestSetSize", self->serverCursors[self->bestSet].size());
2017-07-11 08:41:32 +08:00
vector < Future < Void > > q ;
for ( auto & c : self - > serverCursors [ self - > bestSet ] ) {
if ( ! c - > hasMessage ( ) ) {
2018-01-17 02:48:50 +08:00
q . push_back ( c - > getMore ( taskID ) ) ;
2017-10-25 06:09:31 +08:00
if ( c - > isActive ( ) ) {
q . push_back ( c - > onFailed ( ) ) ;
}
2017-07-11 08:41:32 +08:00
}
}
2018-08-11 04:57:10 +08:00
wait ( quorum ( q , 1 ) ) ;
2017-07-11 08:41:32 +08:00
} else {
//FIXME: this will peeking way too many cursors when satellites exist, and does not need to peek bestSet cursors since we cannot get anymore data from them
vector < Future < Void > > q ;
2018-06-09 02:11:08 +08:00
//TraceEvent("LPC_GetMore4", self->randomID).detail("Start", startVersion.toString()).detail("Tag", self->tag);
2017-07-11 08:41:32 +08:00
for ( auto & cursors : self - > serverCursors ) {
for ( auto & c : cursors ) {
if ( ! c - > hasMessage ( ) ) {
2018-01-17 02:48:50 +08:00
q . push_back ( c - > getMore ( taskID ) ) ;
2017-07-11 08:41:32 +08:00
}
}
}
2018-08-11 04:57:10 +08:00
wait ( quorum ( q , 1 ) ) ;
2017-07-11 08:41:32 +08:00
self - > useBestSet = false ;
}
}
self - > calcHasMessage ( ) ;
2018-06-09 02:11:08 +08:00
//TraceEvent("LPC_GetMoreB", self->randomID).detail("HasMessage", self->hasMessage()).detail("Start", startVersion.toString()).detail("Seq", self->version().toString());
2017-07-11 08:41:32 +08:00
if ( self - > hasMessage ( ) | | self - > version ( ) > startVersion )
return Void ( ) ;
}
}
2018-01-17 02:48:50 +08:00
Future < Void > ILogSystem : : SetPeekCursor : : getMore ( int taskID ) {
2017-07-11 08:41:32 +08:00
auto startVersion = version ( ) ;
calcHasMessage ( ) ;
if ( hasMessage ( ) )
return Void ( ) ;
if ( nextVersion . present ( ) )
advanceTo ( nextVersion . get ( ) ) ;
ASSERT ( ! hasMessage ( ) ) ;
if ( version ( ) > startVersion )
return Void ( ) ;
2018-01-17 02:48:50 +08:00
return setPeekGetMore ( this , startVersion , taskID ) ;
2017-07-11 08:41:32 +08:00
}
Future < Void > ILogSystem : : SetPeekCursor : : onFailed ( ) {
ASSERT ( false ) ;
return Never ( ) ;
}
bool ILogSystem : : SetPeekCursor : : isActive ( ) {
ASSERT ( false ) ;
return false ;
}
2017-07-16 06:15:03 +08:00
bool ILogSystem : : SetPeekCursor : : isExhausted ( ) {
2018-04-24 13:03:55 +08:00
return serverCursors [ currentSet ] [ currentCursor ] - > isExhausted ( ) ;
2017-07-16 06:15:03 +08:00
}
2018-07-13 03:09:48 +08:00
const LogMessageVersion & ILogSystem : : SetPeekCursor : : version ( ) { return messageVersion ; }
2017-07-11 08:41:32 +08:00
2018-06-22 06:29:46 +08:00
Version ILogSystem : : SetPeekCursor : : getMinKnownCommittedVersion ( ) {
return serverCursors [ currentSet ] [ currentCursor ] - > getMinKnownCommittedVersion ( ) ;
}
2017-07-11 08:41:32 +08:00
Version ILogSystem : : SetPeekCursor : : popped ( ) {
Version poppedVersion = 0 ;
for ( auto & cursors : serverCursors ) {
for ( auto & c : cursors ) {
poppedVersion = std : : max ( poppedVersion , c - > popped ( ) ) ;
}
}
return poppedVersion ;
}
2018-10-04 04:57:45 +08:00
ILogSystem : : MultiCursor : : MultiCursor ( std : : vector < Reference < IPeekCursor > > cursors , std : : vector < LogMessageVersion > epochEnds ) : cursors ( cursors ) , epochEnds ( epochEnds ) , poppedVersion ( 0 ) {
for ( int i = 0 ; i < std : : min < int > ( cursors . size ( ) , SERVER_KNOBS - > MULTI_CURSOR_PRE_FETCH_LIMIT ) ; i + + ) {
cursors [ cursors . size ( ) - i - 1 ] - > getMore ( ) ;
}
}
2017-05-26 04:48:44 +08:00
Reference < ILogSystem : : IPeekCursor > ILogSystem : : MultiCursor : : cloneNoMore ( ) {
return cursors . back ( ) - > cloneNoMore ( ) ;
}
void ILogSystem : : MultiCursor : : setProtocolVersion ( uint64_t version ) {
cursors . back ( ) - > setProtocolVersion ( version ) ;
}
Arena & ILogSystem : : MultiCursor : : arena ( ) {
return cursors . back ( ) - > arena ( ) ;
}
ArenaReader * ILogSystem : : MultiCursor : : reader ( ) {
return cursors . back ( ) - > reader ( ) ;
}
bool ILogSystem : : MultiCursor : : hasMessage ( ) {
return cursors . back ( ) - > hasMessage ( ) ;
}
void ILogSystem : : MultiCursor : : nextMessage ( ) {
cursors . back ( ) - > nextMessage ( ) ;
}
StringRef ILogSystem : : MultiCursor : : getMessage ( ) {
return cursors . back ( ) - > getMessage ( ) ;
}
2018-03-17 02:40:21 +08:00
StringRef ILogSystem : : MultiCursor : : getMessageWithTags ( ) {
return cursors . back ( ) - > getMessageWithTags ( ) ;
}
const std : : vector < Tag > & ILogSystem : : MultiCursor : : getTags ( ) {
2017-06-30 06:50:19 +08:00
return cursors . back ( ) - > getTags ( ) ;
}
2017-05-26 04:48:44 +08:00
void ILogSystem : : MultiCursor : : advanceTo ( LogMessageVersion n ) {
while ( cursors . size ( ) > 1 & & n > = epochEnds . back ( ) ) {
poppedVersion = std : : max ( poppedVersion , cursors . back ( ) - > popped ( ) ) ;
cursors . pop_back ( ) ;
epochEnds . pop_back ( ) ;
}
cursors . back ( ) - > advanceTo ( n ) ;
}
2018-01-12 08:09:49 +08:00
Future < Void > ILogSystem : : MultiCursor : : getMore ( int taskID ) {
2018-11-05 12:26:23 +08:00
LogMessageVersion startVersion = cursors . back ( ) - > version ( ) ;
2017-05-26 04:48:44 +08:00
while ( cursors . size ( ) > 1 & & cursors . back ( ) - > version ( ) > = epochEnds . back ( ) ) {
poppedVersion = std : : max ( poppedVersion , cursors . back ( ) - > popped ( ) ) ;
cursors . pop_back ( ) ;
epochEnds . pop_back ( ) ;
}
2018-11-03 04:04:09 +08:00
if ( cursors . back ( ) - > version ( ) > startVersion ) {
return Void ( ) ;
2017-05-26 04:48:44 +08:00
}
2018-01-12 08:09:49 +08:00
return cursors . back ( ) - > getMore ( taskID ) ;
2017-05-26 04:48:44 +08:00
}
Future < Void > ILogSystem : : MultiCursor : : onFailed ( ) {
return cursors . back ( ) - > onFailed ( ) ;
}
bool ILogSystem : : MultiCursor : : isActive ( ) {
return cursors . back ( ) - > isActive ( ) ;
}
2017-07-16 06:15:03 +08:00
bool ILogSystem : : MultiCursor : : isExhausted ( ) {
2018-04-24 13:03:55 +08:00
return cursors . back ( ) - > isExhausted ( ) ;
2017-07-16 06:15:03 +08:00
}
2018-07-13 03:09:48 +08:00
const LogMessageVersion & ILogSystem : : MultiCursor : : version ( ) {
2017-05-26 04:48:44 +08:00
return cursors . back ( ) - > version ( ) ;
}
2018-06-22 06:29:46 +08:00
Version ILogSystem : : MultiCursor : : getMinKnownCommittedVersion ( ) {
return cursors . back ( ) - > getMinKnownCommittedVersion ( ) ;
}
2017-05-26 04:48:44 +08:00
Version ILogSystem : : MultiCursor : : popped ( ) {
return std : : max ( poppedVersion , cursors . back ( ) - > popped ( ) ) ;
}
2018-07-13 03:09:48 +08:00
ILogSystem : : BufferedCursor : : BufferedCursor ( std : : vector < Reference < IPeekCursor > > cursors , Version begin , Version end , bool collectTags ) : cursors ( cursors ) , messageVersion ( begin ) , end ( end ) , collectTags ( collectTags ) , hasNextMessage ( false ) , messageIndex ( 0 ) {
messages . reserve ( 10000 ) ;
}
void ILogSystem : : BufferedCursor : : combineMessages ( ) {
if ( ! hasNextMessage ) {
return ;
}
tags . clear ( ) ;
tags . push_back ( messages [ messageIndex ] . tags [ 0 ] ) ;
for ( int i = messageIndex + 1 ; i < messages . size ( ) & & messages [ messageIndex ] . version = = messages [ i ] . version ; i + + ) {
tags . push_back ( messages [ i ] . tags [ 0 ] ) ;
messageIndex = i ;
}
auto & msg = messages [ messageIndex ] ;
BinaryWriter messageWriter ( Unversioned ( ) ) ;
messageWriter < < uint32_t ( msg . message . size ( ) + sizeof ( uint32_t ) + sizeof ( uint16_t ) + tags . size ( ) * sizeof ( Tag ) ) < < msg . version . sub < < uint16_t ( tags . size ( ) ) ;
for ( auto & t : tags ) {
messageWriter < < t ;
}
messageWriter . serializeBytes ( msg . message ) ;
msg . arena = Arena ( ) ;
msg . tags = tags ;
msg . message = StringRef ( msg . arena , messageWriter . toStringRef ( ) ) ;
}
Reference < ILogSystem : : IPeekCursor > ILogSystem : : BufferedCursor : : cloneNoMore ( ) {
ASSERT ( false ) ;
return Reference < ILogSystem : : IPeekCursor > ( ) ;
}
void ILogSystem : : BufferedCursor : : setProtocolVersion ( uint64_t version ) {
for ( auto & c : cursors ) {
c - > setProtocolVersion ( version ) ;
}
}
Arena & ILogSystem : : BufferedCursor : : arena ( ) {
return messages [ messageIndex ] . arena ;
}
ArenaReader * ILogSystem : : BufferedCursor : : reader ( ) {
ASSERT ( false ) ;
return cursors [ 0 ] - > reader ( ) ;
}
bool ILogSystem : : BufferedCursor : : hasMessage ( ) {
return hasNextMessage ;
}
void ILogSystem : : BufferedCursor : : nextMessage ( ) {
messageIndex + + ;
if ( messageIndex = = messages . size ( ) ) {
hasNextMessage = false ;
}
if ( collectTags ) {
combineMessages ( ) ;
}
}
StringRef ILogSystem : : BufferedCursor : : getMessage ( ) {
ASSERT ( false ) ;
return StringRef ( ) ;
}
StringRef ILogSystem : : BufferedCursor : : getMessageWithTags ( ) {
return messages [ messageIndex ] . message ;
}
const std : : vector < Tag > & ILogSystem : : BufferedCursor : : getTags ( ) {
return messages [ messageIndex ] . tags ;
}
void ILogSystem : : BufferedCursor : : advanceTo ( LogMessageVersion n ) {
ASSERT ( false ) ;
}
ACTOR Future < Void > bufferedGetMoreLoader ( ILogSystem : : BufferedCursor * self , Reference < ILogSystem : : IPeekCursor > cursor , Version maxVersion , int taskID ) {
loop {
2018-08-11 04:57:10 +08:00
wait ( yield ( ) ) ;
2018-07-13 03:09:48 +08:00
if ( cursor - > version ( ) . version > = maxVersion ) {
return Void ( ) ;
}
while ( cursor - > hasMessage ( ) ) {
self - > messages . push_back ( ILogSystem : : BufferedCursor : : BufferedMessage ( cursor - > arena ( ) , self - > collectTags ? cursor - > getMessage ( ) : cursor - > getMessageWithTags ( ) , cursor - > getTags ( ) , cursor - > version ( ) ) ) ;
cursor - > nextMessage ( ) ;
if ( cursor - > version ( ) . version > = maxVersion ) {
return Void ( ) ;
}
}
2018-08-11 04:57:10 +08:00
wait ( cursor - > getMore ( taskID ) ) ;
2018-07-13 03:09:48 +08:00
}
}
ACTOR Future < Void > bufferedGetMore ( ILogSystem : : BufferedCursor * self , int taskID ) {
if ( self - > messageVersion . version > = self - > end ) {
2018-08-11 04:57:10 +08:00
wait ( Future < Void > ( Never ( ) ) ) ;
2018-07-13 03:09:48 +08:00
throw internal_error ( ) ;
}
state Version targetVersion = std : : min ( self - > end , self - > messageVersion . version + SERVER_KNOBS - > VERSIONS_PER_BATCH ) ;
self - > messages . clear ( ) ;
std : : vector < Future < Void > > loaders ;
loaders . reserve ( self - > cursors . size ( ) ) ;
for ( auto & cursor : self - > cursors ) {
loaders . push_back ( bufferedGetMoreLoader ( self , cursor , targetVersion , taskID ) ) ;
}
2018-08-11 04:57:10 +08:00
wait ( waitForAll ( loaders ) ) ;
wait ( yield ( ) ) ;
2018-07-13 03:09:48 +08:00
if ( self - > collectTags ) {
std : : sort ( self - > messages . begin ( ) , self - > messages . end ( ) ) ;
} else {
uniquify ( self - > messages ) ;
}
self - > messageIndex = 0 ;
self - > hasNextMessage = self - > messages . size ( ) > 0 ;
self - > messageVersion = LogMessageVersion ( targetVersion ) ;
if ( self - > collectTags ) {
self - > combineMessages ( ) ;
}
2018-07-13 08:47:35 +08:00
2018-08-11 04:57:10 +08:00
wait ( yield ( ) ) ;
2018-07-13 03:09:48 +08:00
return Void ( ) ;
}
Future < Void > ILogSystem : : BufferedCursor : : getMore ( int taskID ) {
if ( hasMessage ( ) )
return Void ( ) ;
return bufferedGetMore ( this , taskID ) ;
}
Future < Void > ILogSystem : : BufferedCursor : : onFailed ( ) {
ASSERT ( false ) ;
return Never ( ) ;
}
bool ILogSystem : : BufferedCursor : : isActive ( ) {
ASSERT ( false ) ;
return false ;
}
bool ILogSystem : : BufferedCursor : : isExhausted ( ) {
ASSERT ( false ) ;
return false ;
}
const LogMessageVersion & ILogSystem : : BufferedCursor : : version ( ) {
if ( hasNextMessage ) {
return messages [ messageIndex ] . version ;
}
return messageVersion ;
}
Version ILogSystem : : BufferedCursor : : getMinKnownCommittedVersion ( ) {
ASSERT ( false ) ;
return invalidVersion ;
}
Version ILogSystem : : BufferedCursor : : popped ( ) {
ASSERT ( false ) ;
return invalidVersion ;
}