2017-05-26 04:48:44 +08:00
/*
* LogSystem . h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013 - 2018 Apple Inc . and the FoundationDB project authors
2018-02-22 02:25:11 +08:00
*
2017-05-26 04:48:44 +08:00
* Licensed under the Apache License , Version 2.0 ( the " License " ) ;
* you may not use this file except in compliance with the License .
* You may obtain a copy of the License at
2018-02-22 02:25:11 +08:00
*
2017-05-26 04:48:44 +08:00
* http : //www.apache.org/licenses/LICENSE-2.0
2018-02-22 02:25:11 +08:00
*
2017-05-26 04:48:44 +08:00
* Unless required by applicable law or agreed to in writing , software
* distributed under the License is distributed on an " AS IS " BASIS ,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
* See the License for the specific language governing permissions and
* limitations under the License .
*/
# ifndef FDBSERVER_LOGSYSTEM_H
# define FDBSERVER_LOGSYSTEM_H
2019-04-11 01:30:34 +08:00
# include <set>
# include <vector>
2018-10-20 01:30:13 +08:00
# include "fdbserver/TLogInterface.h"
2019-02-18 11:13:26 +08:00
# include "fdbserver/WorkerInterface.actor.h"
2018-04-09 12:24:05 +08:00
# include "fdbclient/DatabaseConfiguration.h"
2017-05-26 04:48:44 +08:00
# include "flow/IndexedSet.h"
# include "fdbrpc/ReplicationPolicy.h"
# include "fdbrpc/Locality.h"
# include "fdbrpc/Replication.h"
struct DBCoreState ;
2019-04-11 02:21:27 +08:00
struct TLogSet ;
struct CoreTLogSet ;
2017-05-26 04:48:44 +08:00
2019-07-30 10:17:10 +08:00
// The set of tLog servers and logRouters for a log tag
2017-07-12 06:48:10 +08:00
class LogSet : NonCopyable , public ReferenceCounted < LogSet > {
2017-07-11 08:41:32 +08:00
public :
std : : vector < Reference < AsyncVar < OptionalInterface < TLogInterface > > > > logServers ;
std : : vector < Reference < AsyncVar < OptionalInterface < TLogInterface > > > > logRouters ;
2019-05-21 05:22:31 +08:00
std : : vector < Reference < AsyncVar < OptionalInterface < BackupInterface > > > > backupWorkers ;
2017-07-11 08:41:32 +08:00
int32_t tLogWriteAntiQuorum ;
int32_t tLogReplicationFactor ;
std : : vector < LocalityData > tLogLocalities ; // Stores the localities of the log servers
2019-02-27 08:47:04 +08:00
TLogVersion tLogVersion ;
2019-03-14 04:14:39 +08:00
Reference < IReplicationPolicy > tLogPolicy ;
Reference < LocalitySet > logServerSet ;
2017-07-11 08:41:32 +08:00
std : : vector < int > logIndexArray ;
2018-07-12 06:43:55 +08:00
std : : vector < LocalityEntry > logEntryArray ;
2017-07-11 08:41:32 +08:00
bool isLocal ;
2017-09-08 06:32:08 +08:00
int8_t locality ;
2018-03-30 06:12:38 +08:00
Version startVersion ;
2018-04-09 12:24:05 +08:00
std : : vector < Future < TLogLockResult > > replies ;
2018-06-17 08:39:02 +08:00
std : : vector < std : : vector < int > > satelliteTagLocations ;
2017-07-11 08:41:32 +08:00
2018-06-16 03:36:19 +08:00
LogSet ( ) : tLogWriteAntiQuorum ( 0 ) , tLogReplicationFactor ( 0 ) , isLocal ( true ) , locality ( tagLocalityInvalid ) , startVersion ( invalidVersion ) { }
2019-04-11 02:21:27 +08:00
LogSet ( const TLogSet & tlogSet ) ;
LogSet ( const CoreTLogSet & coreSet ) ;
2017-07-11 08:41:32 +08:00
2018-04-30 09:54:47 +08:00
std : : string logRouterString ( ) {
std : : string result ;
for ( int i = 0 ; i < logRouters . size ( ) ; i + + ) {
if ( i > 0 ) {
result + = " , " ;
}
result + = logRouters [ i ] - > get ( ) . id ( ) . toString ( ) ;
}
return result ;
}
2019-04-19 01:18:11 +08:00
bool hasLogRouter ( UID id ) {
for ( const auto & router : logRouters ) {
if ( router - > get ( ) . id ( ) = = id ) {
return true ;
}
}
return false ;
}
2018-04-30 09:54:47 +08:00
std : : string logServerString ( ) {
std : : string result ;
for ( int i = 0 ; i < logServers . size ( ) ; i + + ) {
if ( i > 0 ) {
result + = " , " ;
}
result + = logServers [ i ] - > get ( ) . id ( ) . toString ( ) ;
}
return result ;
}
2019-06-20 09:15:09 +08:00
void populateSatelliteTagLocations ( int logRouterTags , int oldLogRouterTags , int txsTags , int oldTxsTags ) {
2018-06-17 08:39:02 +08:00
satelliteTagLocations . clear ( ) ;
2019-06-20 09:15:09 +08:00
satelliteTagLocations . resize ( std : : max ( { logRouterTags , oldLogRouterTags , txsTags , oldTxsTags } ) + 1 ) ;
2018-06-28 13:14:50 +08:00
std : : map < int , int > server_usedBest ;
2018-06-18 04:29:17 +08:00
std : : set < std : : pair < int , int > > used_servers ;
2018-06-17 08:39:02 +08:00
for ( int i = 0 ; i < tLogLocalities . size ( ) ; i + + ) {
2018-06-18 04:29:17 +08:00
used_servers . insert ( std : : make_pair ( 0 , i ) ) ;
2018-06-17 08:39:02 +08:00
}
2019-03-14 04:14:39 +08:00
Reference < LocalitySet > serverSet = Reference < LocalitySet > ( new LocalityMap < std : : pair < int , int > > ( ) ) ;
2018-06-17 08:39:02 +08:00
LocalityMap < std : : pair < int , int > > * serverMap = ( LocalityMap < std : : pair < int , int > > * ) serverSet . getPtr ( ) ;
2018-06-18 04:29:17 +08:00
std : : vector < std : : pair < int , int > > resultPairs ;
2018-06-17 08:39:02 +08:00
for ( int loc = 0 ; loc < satelliteTagLocations . size ( ) ; loc + + ) {
int team = loc ;
if ( loc < logRouterTags ) {
team = loc + 1 ;
} else if ( loc = = logRouterTags ) {
team = 0 ;
}
2018-06-18 04:29:17 +08:00
bool teamComplete = false ;
2018-06-17 08:39:02 +08:00
alsoServers . resize ( 1 ) ;
serverMap - > clear ( ) ;
2018-06-18 04:29:17 +08:00
resultPairs . clear ( ) ;
for ( auto & used_idx : used_servers ) {
auto entry = serverMap - > add ( tLogLocalities [ used_idx . second ] , & used_idx ) ;
if ( ! resultPairs . size ( ) ) {
resultPairs . push_back ( used_idx ) ;
alsoServers [ 0 ] = entry ;
2018-06-17 08:39:02 +08:00
}
resultEntries . clear ( ) ;
if ( serverSet - > selectReplicas ( tLogPolicy , alsoServers , resultEntries ) ) {
2018-06-18 04:29:17 +08:00
for ( auto & entry : resultEntries ) {
resultPairs . push_back ( * serverMap - > getObject ( entry ) ) ;
2018-06-17 08:39:02 +08:00
}
2018-06-28 13:14:50 +08:00
int firstBestUsed = server_usedBest [ resultPairs [ 0 ] . second ] ;
for ( int i = 1 ; i < resultPairs . size ( ) ; i + + ) {
int thisBestUsed = server_usedBest [ resultPairs [ i ] . second ] ;
if ( thisBestUsed < firstBestUsed ) {
std : : swap ( resultPairs [ 0 ] , resultPairs [ i ] ) ;
firstBestUsed = thisBestUsed ;
}
}
server_usedBest [ resultPairs [ 0 ] . second ] + + ;
2018-06-18 04:29:17 +08:00
for ( auto & res : resultPairs ) {
satelliteTagLocations [ team ] . push_back ( res . second ) ;
used_servers . erase ( res ) ;
res . first + + ;
used_servers . insert ( res ) ;
}
teamComplete = true ;
2018-06-17 08:39:02 +08:00
break ;
}
}
2018-06-18 04:29:17 +08:00
ASSERT ( teamComplete ) ;
2018-06-17 08:39:02 +08:00
}
checkSatelliteTagLocations ( ) ;
}
void checkSatelliteTagLocations ( ) {
2018-06-28 13:14:50 +08:00
std : : vector < int > usedBest ;
2018-06-17 08:39:02 +08:00
std : : vector < int > used ;
2018-06-28 13:14:50 +08:00
usedBest . resize ( tLogLocalities . size ( ) ) ;
2018-06-17 08:39:02 +08:00
used . resize ( tLogLocalities . size ( ) ) ;
for ( auto team : satelliteTagLocations ) {
2018-06-28 13:14:50 +08:00
usedBest [ team [ 0 ] ] + + ;
2018-06-17 08:39:02 +08:00
for ( auto loc : team ) {
used [ loc ] + + ;
}
}
2018-06-28 13:14:50 +08:00
int minUsedBest = satelliteTagLocations . size ( ) ;
int maxUsedBest = 0 ;
for ( auto i : usedBest ) {
minUsedBest = std : : min ( minUsedBest , i ) ;
maxUsedBest = std : : max ( maxUsedBest , i ) ;
}
2018-06-17 08:39:02 +08:00
int minUsed = satelliteTagLocations . size ( ) ;
int maxUsed = 0 ;
for ( auto i : used ) {
minUsed = std : : min ( minUsed , i ) ;
maxUsed = std : : max ( maxUsed , i ) ;
}
2018-07-06 04:00:13 +08:00
bool foundDuplicate = false ;
std : : set < Optional < Key > > zones ;
2019-03-19 03:17:59 +08:00
std : : set < Optional < Key > > dcs ;
2018-07-06 04:00:13 +08:00
for ( auto & loc : tLogLocalities ) {
if ( zones . count ( loc . zoneId ( ) ) ) {
foundDuplicate = true ;
break ;
}
zones . insert ( loc . zoneId ( ) ) ;
2019-03-21 00:30:11 +08:00
dcs . insert ( loc . dcId ( ) ) ;
2018-07-06 04:00:13 +08:00
}
2019-03-19 03:17:59 +08:00
bool moreThanOneDC = dcs . size ( ) > 1 ? true : false ;
2018-07-06 04:00:13 +08:00
2019-03-20 06:23:14 +08:00
TraceEvent ( ( ( maxUsed - minUsed > 1 ) | | ( maxUsedBest - minUsedBest > 1 ) ) ? ( g_network - > isSimulated ( ) & & ! foundDuplicate & & ! moreThanOneDC ? SevError : SevWarnAlways ) : SevInfo , " CheckSatelliteTagLocations " ) . detail ( " MinUsed " , minUsed ) . detail ( " MaxUsed " , maxUsed ) . detail ( " MinUsedBest " , minUsedBest ) . detail ( " MaxUsedBest " , maxUsedBest ) . detail ( " DuplicateZones " , foundDuplicate ) . detail ( " NumOfDCs " , dcs . size ( ) ) ;
2018-06-17 08:39:02 +08:00
}
2017-07-11 08:41:32 +08:00
int bestLocationFor ( Tag tag ) {
2018-06-17 08:39:02 +08:00
if ( locality = = tagLocalitySatellite ) {
return satelliteTagLocations [ tag = = txsTag ? 0 : tag . id + 1 ] [ 0 ] ;
}
//the following logic supports upgrades from 5.X
2018-06-16 03:36:19 +08:00
if ( tag = = txsTag ) return txsTagOld % logServers . size ( ) ;
return tag . id % logServers . size ( ) ;
2017-07-11 08:41:32 +08:00
}
2019-05-17 04:54:06 +08:00
void updateLocalitySet ( std : : vector < LocalityData > const & localities ) {
2017-07-11 08:41:32 +08:00
LocalityMap < int > * logServerMap ;
2019-03-14 04:14:39 +08:00
logServerSet = Reference < LocalitySet > ( new LocalityMap < int > ( ) ) ;
2017-07-11 08:41:32 +08:00
logServerMap = ( LocalityMap < int > * ) logServerSet . getPtr ( ) ;
2018-07-12 06:43:55 +08:00
logEntryArray . clear ( ) ;
logEntryArray . reserve ( localities . size ( ) ) ;
2017-07-11 08:41:32 +08:00
logIndexArray . clear ( ) ;
2018-04-29 09:04:57 +08:00
logIndexArray . reserve ( localities . size ( ) ) ;
2017-07-11 08:41:32 +08:00
2018-04-29 09:04:57 +08:00
for ( int i = 0 ; i < localities . size ( ) ; i + + ) {
2017-07-11 08:41:32 +08:00
logIndexArray . push_back ( i ) ;
2018-07-12 06:43:55 +08:00
logEntryArray . push_back ( logServerMap - > add ( localities [ i ] , & logIndexArray . back ( ) ) ) ;
2017-07-11 08:41:32 +08:00
}
}
2018-07-12 06:43:55 +08:00
bool satisfiesPolicy ( const std : : vector < LocalityEntry > & locations ) {
resultEntries . clear ( ) ;
// Run the policy, assert if unable to satify
bool result = logServerSet - > selectReplicas ( tLogPolicy , locations , resultEntries ) ;
ASSERT ( result ) ;
return resultEntries . size ( ) = = 0 ;
}
2019-11-06 10:07:30 +08:00
void getPushLocations ( VectorRef < Tag > tags , std : : vector < int > & locations , int locationOffset ,
2019-03-11 22:31:44 +08:00
bool allLocations = false ) {
2018-06-17 08:39:02 +08:00
if ( locality = = tagLocalitySatellite ) {
for ( auto & t : tags ) {
2019-06-20 09:15:09 +08:00
if ( t = = txsTag | | t . locality = = tagLocalityTxs | | t . locality = = tagLocalityLogRouter ) {
2018-06-17 08:39:02 +08:00
for ( int loc : satelliteTagLocations [ t = = txsTag ? 0 : t . id + 1 ] ) {
locations . push_back ( locationOffset + loc ) ;
}
}
}
2018-06-19 06:27:28 +08:00
uniquify ( locations ) ;
2018-06-17 08:39:02 +08:00
return ;
}
2017-07-11 08:41:32 +08:00
newLocations . clear ( ) ;
alsoServers . clear ( ) ;
resultEntries . clear ( ) ;
2019-03-11 22:31:44 +08:00
if ( allLocations ) {
// special handling for allLocations
TraceEvent ( " AllLocationsSet " ) ;
for ( int i = 0 ; i < logServers . size ( ) ; i + + ) {
newLocations . push_back ( i ) ;
}
} else {
for ( auto & t : tags ) {
if ( locality = = tagLocalitySpecial | | t . locality = = locality | | t . locality < 0 ) {
newLocations . push_back ( bestLocationFor ( t ) ) ;
}
2017-07-11 08:41:32 +08:00
}
}
uniquify ( newLocations ) ;
if ( newLocations . size ( ) )
alsoServers . reserve ( newLocations . size ( ) ) ;
// Convert locations to the also servers
for ( auto location : newLocations ) {
locations . push_back ( locationOffset + location ) ;
2018-07-12 06:43:55 +08:00
alsoServers . push_back ( logEntryArray [ location ] ) ;
2017-07-11 08:41:32 +08:00
}
// Run the policy, assert if unable to satify
bool result = logServerSet - > selectReplicas ( tLogPolicy , alsoServers , resultEntries ) ;
ASSERT ( result ) ;
// Add the new servers to the location array
LocalityMap < int > * logServerMap = ( LocalityMap < int > * ) logServerSet . getPtr ( ) ;
for ( auto entry : resultEntries ) {
locations . push_back ( locationOffset + * logServerMap - > getObject ( entry ) ) ;
}
2018-06-09 04:57:00 +08:00
//TraceEvent("GetPushLocations").detail("Policy", tLogPolicy->info())
2017-07-11 08:41:32 +08:00
// .detail("Results", locations.size()).detail("Selection", logServerSet->size())
// .detail("Included", alsoServers.size()).detail("Duration", timer() - t);
}
private :
std : : vector < LocalityEntry > alsoServers , resultEntries ;
std : : vector < int > newLocations ;
} ;
2017-05-26 04:48:44 +08:00
struct ILogSystem {
// Represents a particular (possibly provisional) epoch of the log subsystem
struct IPeekCursor {
//clones the peek cursor, however you cannot call getMore() on the cloned cursor.
virtual Reference < IPeekCursor > cloneNoMore ( ) = 0 ;
2019-06-19 05:49:04 +08:00
virtual void setProtocolVersion ( ProtocolVersion version ) = 0 ;
2017-05-26 04:48:44 +08:00
2018-03-17 02:40:21 +08:00
//if hasMessage() returns true, getMessage(), getMessageWithTags(), or reader() can be called.
2017-05-26 04:48:44 +08:00
//does not modify the cursor
virtual bool hasMessage ( ) = 0 ;
2017-06-30 06:50:19 +08:00
//pre: only callable if hasMessage() returns true
2019-04-02 04:56:45 +08:00
//return the tags associated with the message for the current sequence
2019-11-06 10:07:30 +08:00
virtual VectorRef < Tag > getTags ( ) = 0 ;
2017-06-30 06:50:19 +08:00
2017-05-26 04:48:44 +08:00
//pre: only callable if hasMessage() returns true
2018-03-17 02:40:21 +08:00
//returns the arena containing the contents of getMessage(), getMessageWithTags(), and reader()
2017-05-26 04:48:44 +08:00
virtual Arena & arena ( ) = 0 ;
//pre: only callable if hasMessage() returns true
//returns an arena reader for the next message
2018-03-17 02:40:21 +08:00
//caller cannot call getMessage(), getMessageWithTags(), and reader()
2017-05-26 04:48:44 +08:00
//the caller must advance the reader before calling nextMessage()
virtual ArenaReader * reader ( ) = 0 ;
//pre: only callable if hasMessage() returns true
2018-03-17 02:40:21 +08:00
//caller cannot call getMessage(), getMessageWithTags(), and reader()
2017-05-26 04:48:44 +08:00
//return the contents of the message for the current sequence
virtual StringRef getMessage ( ) = 0 ;
2018-03-17 02:40:21 +08:00
//pre: only callable if hasMessage() returns true
//caller cannot call getMessage(), getMessageWithTags(), and reader()
//return the contents of the message for the current sequence
virtual StringRef getMessageWithTags ( ) = 0 ;
//pre: only callable after getMessage(), getMessageWithTags(), or reader()
2017-05-26 04:48:44 +08:00
//post: hasMessage() and version() have been updated
//hasMessage() will never return false "in the middle" of a version (that is, if it does return false, version().subsequence will be zero) < FIXME: Can we lose this property?
virtual void nextMessage ( ) = 0 ;
//advances the cursor to the supplied LogMessageVersion, and updates hasMessage
virtual void advanceTo ( LogMessageVersion n ) = 0 ;
//returns immediately if hasMessage() returns true.
2019-06-21 08:48:24 +08:00
//returns when either the result of hasMessage() or version() has changed, or a cursor has internally been exhausted.
2019-06-25 17:47:35 +08:00
virtual Future < Void > getMore ( TaskPriority taskID = TaskPriority : : TLogPeekReply ) = 0 ;
2017-05-26 04:48:44 +08:00
//returns when the failure monitor detects that the servers associated with the cursor are failed
virtual Future < Void > onFailed ( ) = 0 ;
//returns false if:
// (1) the failure monitor detects that the servers associated with the cursor is failed
// (2) the interface is not present
// (3) the cursor cannot return any more results
virtual bool isActive ( ) = 0 ;
2017-07-16 06:15:03 +08:00
//returns true if the cursor cannot return any more results
virtual bool isExhausted ( ) = 0 ;
2017-05-26 04:48:44 +08:00
// Returns the smallest possible message version which the current message (if any) or a subsequent message might have
// (If hasMessage(), this is therefore the message version of the current message)
2018-07-13 03:09:48 +08:00
virtual const LogMessageVersion & version ( ) = 0 ;
2017-05-26 04:48:44 +08:00
//So far, the cursor has returned all messages which both satisfy the criteria passed to peek() to create the cursor AND have (popped(),0) <= message version number <= version()
//Other messages might have been skipped
virtual Version popped ( ) = 0 ;
// Returns the maximum version known to have been pushed (not necessarily durably) into the log system (0 is always a possible result!)
virtual Version getMaxKnownVersion ( ) { return 0 ; }
2018-06-22 06:29:46 +08:00
virtual Version getMinKnownCommittedVersion ( ) = 0 ;
2017-05-26 04:48:44 +08:00
virtual void addref ( ) = 0 ;
virtual void delref ( ) = 0 ;
} ;
// IPeekCursor implementation bound to a single (possibly absent) TLogInterface and a single tag.
// Member functions are defined out of line; only the reference counting and getMaxKnownVersion()
// are implemented here.
struct ServerPeekCursor : IPeekCursor, ReferenceCounted<ServerPeekCursor> {
	Reference<AsyncVar<OptionalInterface<TLogInterface>>> interf;
	const Tag tag;

	TLogPeekReply results;
	ArenaReader rd;
	LogMessageVersion messageVersion, end;
	Version poppedVersion;
	TagsAndMessage messageAndTags;
	bool hasMsg;
	Future<Void> more;
	UID randomID;
	bool returnIfBlocked;

	bool onlySpilled;
	bool parallelGetMore;
	int sequence;
	Deque<Future<TLogPeekReply>> futureResults;
	Future<Void> interfaceChanged;

	ServerPeekCursor(Reference<AsyncVar<OptionalInterface<TLogInterface>>> const& interf, Tag tag, Version begin, Version end, bool returnIfBlocked, bool parallelGetMore);
	ServerPeekCursor(TLogPeekReply const& results, LogMessageVersion const& messageVersion, LogMessageVersion const& end, TagsAndMessage const& message, bool hasMsg, Version poppedVersion, Tag tag);

	// IPeekCursor interface. `override` makes the compiler reject any signature that drifts away
	// from the base declaration instead of silently introducing a new virtual.
	Reference<IPeekCursor> cloneNoMore() override;
	void setProtocolVersion(ProtocolVersion version) override;
	Arena& arena() override;
	ArenaReader* reader() override;
	bool hasMessage() override;
	void nextMessage() override;
	StringRef getMessage() override;
	StringRef getMessageWithTags() override;
	VectorRef<Tag> getTags() override;
	void advanceTo(LogMessageVersion n) override;
	Future<Void> getMore(TaskPriority taskID = TaskPriority::TLogPeekReply) override;
	Future<Void> onFailed() override;
	bool isActive() override;
	bool isExhausted() override;
	const LogMessageVersion& version() override;
	Version popped() override;
	Version getMinKnownCommittedVersion() override;

	// Route the interface's reference counting to this class's ReferenceCounted base.
	void addref() override {
		ReferenceCounted<ServerPeekCursor>::addref();
	}

	void delref() override {
		ReferenceCounted<ServerPeekCursor>::delref();
	}

	// Reports the maxKnownVersion carried by the most recent peek reply held in `results`.
	Version getMaxKnownVersion() override { return results.maxKnownVersion; }
};
// IPeekCursor implementation that holds one IPeekCursor per server (serverCursors) and presents
// them through a single cursor. Member functions are defined out of line; only the reference
// counting is implemented here.
struct MergedPeekCursor : IPeekCursor, ReferenceCounted<MergedPeekCursor> {
	Reference<LogSet> logSet;
	std::vector<Reference<IPeekCursor>> serverCursors;
	std::vector<LocalityEntry> locations;
	std::vector<std::pair<LogMessageVersion, int>> sortedVersions;
	Tag tag;
	int bestServer, currentCursor, readQuorum;
	Optional<LogMessageVersion> nextVersion;
	LogMessageVersion messageVersion;
	bool hasNextMessage;
	UID randomID;
	int tLogReplicationFactor;
	Future<Void> more;

	MergedPeekCursor(std::vector<Reference<ILogSystem::IPeekCursor>> const& serverCursors, Version begin);
	MergedPeekCursor(std::vector<Reference<AsyncVar<OptionalInterface<TLogInterface>>>> const& logServers, int bestServer, int readQuorum, Tag tag, Version begin, Version end, bool parallelGetMore, std::vector<LocalityData> const& tLogLocalities, Reference<IReplicationPolicy> const tLogPolicy, int tLogReplicationFactor);
	MergedPeekCursor(std::vector<Reference<IPeekCursor>> const& serverCursors, LogMessageVersion const& messageVersion, int bestServer, int readQuorum, Optional<LogMessageVersion> nextVersion, Reference<LogSet> logSet, int tLogReplicationFactor);

	// IPeekCursor interface. `override` makes the compiler reject any signature that drifts away
	// from the base declaration instead of silently introducing a new virtual.
	Reference<IPeekCursor> cloneNoMore() override;
	void setProtocolVersion(ProtocolVersion version) override;
	Arena& arena() override;
	ArenaReader* reader() override;

	// Non-virtual helpers specific to this cursor (not part of IPeekCursor).
	void calcHasMessage();
	void updateMessage(bool usePolicy);

	bool hasMessage() override;
	void nextMessage() override;
	StringRef getMessage() override;
	StringRef getMessageWithTags() override;
	VectorRef<Tag> getTags() override;
	void advanceTo(LogMessageVersion n) override;
	Future<Void> getMore(TaskPriority taskID = TaskPriority::TLogPeekReply) override;
	Future<Void> onFailed() override;
	bool isActive() override;
	bool isExhausted() override;
	const LogMessageVersion& version() override;
	Version popped() override;
	Version getMinKnownCommittedVersion() override;

	// Route the interface's reference counting to this class's ReferenceCounted base.
	void addref() override {
		ReferenceCounted<MergedPeekCursor>::addref();
	}

	void delref() override {
		ReferenceCounted<MergedPeekCursor>::delref();
	}
};
2017-07-11 08:41:32 +08:00
// IPeekCursor implementation over several LogSets: serverCursors[s][i] is the cursor for server i
// of logSets[s] (presumably - the pairing is established by the out-of-line constructors; confirm).
// Member functions are defined out of line; only the reference counting is implemented here.
struct SetPeekCursor : IPeekCursor, ReferenceCounted<SetPeekCursor> {
	std::vector<Reference<LogSet>> logSets;
	std::vector<std::vector<Reference<IPeekCursor>>> serverCursors;
	Tag tag;
	int bestSet, bestServer, currentSet, currentCursor;
	std::vector<LocalityEntry> locations;
	std::vector<std::pair<LogMessageVersion, int>> sortedVersions;
	Optional<LogMessageVersion> nextVersion;
	LogMessageVersion messageVersion;
	bool hasNextMessage;
	bool useBestSet;
	UID randomID;
	Future<Void> more;

	SetPeekCursor(std::vector<Reference<LogSet>> const& logSets, int bestSet, int bestServer, Tag tag, Version begin, Version end, bool parallelGetMore);
	SetPeekCursor(std::vector<Reference<LogSet>> const& logSets, std::vector<std::vector<Reference<IPeekCursor>>> const& serverCursors, LogMessageVersion const& messageVersion, int bestSet, int bestServer, Optional<LogMessageVersion> nextVersion, bool useBestSet);

	// IPeekCursor interface. `override` makes the compiler reject any signature that drifts away
	// from the base declaration instead of silently introducing a new virtual.
	Reference<IPeekCursor> cloneNoMore() override;
	void setProtocolVersion(ProtocolVersion version) override;
	Arena& arena() override;
	ArenaReader* reader() override;

	// Non-virtual helpers specific to this cursor (not part of IPeekCursor).
	void calcHasMessage();
	void updateMessage(int logIdx, bool usePolicy);

	bool hasMessage() override;
	void nextMessage() override;
	StringRef getMessage() override;
	StringRef getMessageWithTags() override;
	VectorRef<Tag> getTags() override;
	void advanceTo(LogMessageVersion n) override;
	Future<Void> getMore(TaskPriority taskID = TaskPriority::TLogPeekReply) override;
	Future<Void> onFailed() override;
	bool isActive() override;
	bool isExhausted() override;
	const LogMessageVersion& version() override;
	Version popped() override;
	Version getMinKnownCommittedVersion() override;

	// Route the interface's reference counting to this class's ReferenceCounted base.
	void addref() override {
		ReferenceCounted<SetPeekCursor>::addref();
	}

	void delref() override {
		ReferenceCounted<SetPeekCursor>::delref();
	}
};
2017-05-26 04:48:44 +08:00
// IPeekCursor implementation built from a list of cursors and a parallel list of epoch end
// versions. Member functions are defined out of line; only the reference counting is implemented here.
struct MultiCursor : IPeekCursor, ReferenceCounted<MultiCursor> {
	std::vector<Reference<IPeekCursor>> cursors;
	std::vector<LogMessageVersion> epochEnds;
	Version poppedVersion;

	MultiCursor(std::vector<Reference<IPeekCursor>> cursors, std::vector<LogMessageVersion> epochEnds);

	// IPeekCursor interface. `override` makes the compiler reject any signature that drifts away
	// from the base declaration instead of silently introducing a new virtual.
	Reference<IPeekCursor> cloneNoMore() override;
	void setProtocolVersion(ProtocolVersion version) override;
	Arena& arena() override;
	ArenaReader* reader() override;
	bool hasMessage() override;
	void nextMessage() override;
	StringRef getMessage() override;
	StringRef getMessageWithTags() override;
	VectorRef<Tag> getTags() override;
	void advanceTo(LogMessageVersion n) override;
	Future<Void> getMore(TaskPriority taskID = TaskPriority::TLogPeekReply) override;
	Future<Void> onFailed() override;
	bool isActive() override;
	bool isExhausted() override;
	const LogMessageVersion& version() override;
	Version popped() override;
	Version getMinKnownCommittedVersion() override;

	// Route the interface's reference counting to this class's ReferenceCounted base.
	void addref() override {
		ReferenceCounted<MultiCursor>::addref();
	}

	void delref() override {
		ReferenceCounted<MultiCursor>::delref();
	}
};
2018-07-13 03:09:48 +08:00
// IPeekCursor implementation that buffers messages read from a list of cursors.
// Member functions are defined out of line; only BufferedMessage and the reference counting are
// implemented here.
struct BufferedCursor : IPeekCursor, ReferenceCounted<BufferedCursor> {
	// One buffered message with its tags and version. Ordering and equality consider only `version`.
	struct BufferedMessage {
		Arena arena;
		StringRef message;
		VectorRef<Tag> tags;
		LogMessageVersion version;

		BufferedMessage() {}
		explicit BufferedMessage(Version version) : version(version) {}
		BufferedMessage(Arena arena, StringRef message, const VectorRef<Tag>& tags, const LogMessageVersion& version) : arena(arena), message(message), tags(tags), version(version) {}

		bool operator<(BufferedMessage const& r) const {
			return version < r.version;
		}

		bool operator==(BufferedMessage const& r) const {
			return version == r.version;
		}
	};

	std::vector<Reference<IPeekCursor>> cursors;
	std::vector<Deque<BufferedMessage>> cursorMessages;
	std::vector<BufferedMessage> messages;
	int messageIndex;
	LogMessageVersion messageVersion;
	Version end;
	bool hasNextMessage;
	bool withTags;
	bool knownUnique;
	Version minKnownCommittedVersion;
	Version poppedVersion;
	Version initialPoppedVersion;
	bool canDiscardPopped;
	Future<Void> more;
	int targetQueueSize;
	UID randomID;

	//FIXME: collectTags is needed to support upgrades from 5.X to 6.0. Remove this code when we no longer support that upgrade.
	bool collectTags;
	void combineMessages();

	BufferedCursor(std::vector<Reference<IPeekCursor>> cursors, Version begin, Version end, bool withTags, bool collectTags, bool canDiscardPopped);
	BufferedCursor(std::vector<Reference<AsyncVar<OptionalInterface<TLogInterface>>>> const& logServers, Tag tag, Version begin, Version end, bool parallelGetMore);

	// IPeekCursor interface. `override` makes the compiler reject any signature that drifts away
	// from the base declaration instead of silently introducing a new virtual.
	Reference<IPeekCursor> cloneNoMore() override;
	void setProtocolVersion(ProtocolVersion version) override;
	Arena& arena() override;
	ArenaReader* reader() override;
	bool hasMessage() override;
	void nextMessage() override;
	StringRef getMessage() override;
	StringRef getMessageWithTags() override;
	VectorRef<Tag> getTags() override;
	void advanceTo(LogMessageVersion n) override;
	Future<Void> getMore(TaskPriority taskID = TaskPriority::TLogPeekReply) override;
	Future<Void> onFailed() override;
	bool isActive() override;
	bool isExhausted() override;
	const LogMessageVersion& version() override;
	Version popped() override;
	Version getMinKnownCommittedVersion() override;

	// Route the interface's reference counting to this class's ReferenceCounted base.
	void addref() override {
		ReferenceCounted<BufferedCursor>::addref();
	}

	void delref() override {
		ReferenceCounted<BufferedCursor>::delref();
	}
};
2017-05-26 04:48:44 +08:00
virtual void addref ( ) = 0 ;
virtual void delref ( ) = 0 ;
virtual std : : string describe ( ) = 0 ;
virtual UID getDebugID ( ) = 0 ;
virtual void toCoreState ( DBCoreState & ) = 0 ;
2018-07-15 07:26:45 +08:00
virtual bool remoteStorageRecovered ( ) = 0 ;
2017-05-26 04:48:44 +08:00
virtual Future < Void > onCoreStateChanged ( ) = 0 ;
// Returns if and when the output of toCoreState() would change (for example, when older logs can be discarded from the state)
virtual void coreStateWritten ( DBCoreState const & newState ) = 0 ;
// Called when a core state has been written to the coordinators
virtual Future < Void > onError ( ) = 0 ;
// Never returns normally, but throws an error if the subsystem stops working
//Future<Void> push( UID bundle, int64_t seq, VectorRef<TaggedMessageRef> messages );
2018-06-22 06:29:46 +08:00
virtual Future < Version > push ( Version prevVersion , Version version , Version knownCommittedVersion , Version minKnownCommittedVersion , struct LogPushData & data , Optional < UID > debugID = Optional < UID > ( ) ) = 0 ;
2017-05-26 04:48:44 +08:00
// Waits for the version number of the bundle (in this epoch) to be prevVersion (i.e. for all pushes ordered earlier)
// Puts the given messages into the bundle, each with the given tags, and with message versions (version, 0) - (version, N)
// Changes the version number of the bundle to be version (unblocking the next push)
// Returns when the preceding changes are durable. (Later we will need multiple return signals for different durability levels)
// If the current epoch has ended, push will not return, and the pushed messages will not be visible in any subsequent epoch (but may become visible in this epoch)
2019-11-05 11:47:45 +08:00
virtual Reference < IPeekCursor > peek ( UID dbgid , Version begin , Optional < Version > end , Tag tag , bool parallelGetMore = false ) = 0 ;
2017-05-26 04:48:44 +08:00
// Returns (via cursor interface) a stream of messages with the given tag and message versions >= (begin, 0), ordered by message version
// If pop was previously or concurrently called with upTo > begin, the cursor may not return all such messages. In that case cursor->popped() will
// be greater than begin to reflect that.
2018-07-13 03:09:48 +08:00
virtual Reference < IPeekCursor > peek ( UID dbgid , Version begin , Optional < Version > end , std : : vector < Tag > tags , bool parallelGetMore = false ) = 0 ;
2018-03-30 06:12:38 +08:00
// Same contract as peek(), but for a set of tags
2019-05-17 04:54:06 +08:00
virtual Reference < IPeekCursor > peekSingle ( UID dbgid , Version begin , Tag tag , std : : vector < std : : pair < Version , Tag > > history = std : : vector < std : : pair < Version , Tag > > ( ) ) = 0 ;
2017-05-26 04:48:44 +08:00
// Same contract as peek(), but blocks until the preferred log server(s) for the given tag are available (and is correspondingly less expensive)
2018-05-01 04:36:35 +08:00
virtual Reference < IPeekCursor > peekLogRouter ( UID dbgid , Version begin , Tag tag ) = 0 ;
2018-06-22 06:29:46 +08:00
// Same contract as peek(), but can only peek from the logs elected in the same generation.
2018-03-30 06:12:38 +08:00
// If the preferred log server is down, a different log from the same generation will merge results locally before sending them to the log router.
2019-07-31 04:25:25 +08:00
virtual Reference < IPeekCursor > peekTxs ( UID dbgid , Version begin , int8_t peekLocality , Version localEnd , bool canDiscardPopped ) = 0 ;
2019-06-20 09:15:09 +08:00
// Same contract as peek(), but only for peeking the txsLocality. It allows specifying a preferred peek locality.
2018-09-29 03:21:08 +08:00
2019-08-06 08:01:48 +08:00
virtual Future < Version > getTxsPoppedVersion ( ) = 0 ;
2019-02-22 08:52:27 +08:00
virtual Version getKnownCommittedVersion ( ) = 0 ;
2018-10-03 08:44:14 +08:00
2019-02-22 08:52:27 +08:00
virtual Future < Void > onKnownCommittedVersionChange ( ) = 0 ;
2018-10-03 08:44:14 +08:00
2019-06-20 09:15:09 +08:00
virtual void popTxs ( Version upTo , int8_t popLocality = tagLocalityInvalid ) = 0 ;
2019-08-01 09:27:36 +08:00
virtual void pop ( Version upTo , Tag tag , Version knownCommittedVersion = 0 , int8_t popLocality = tagLocalityInvalid ) = 0 ;
2017-05-26 04:48:44 +08:00
// Permits, but does not require, the log subsystem to strip `tag` from any or all messages with message versions < (upTo,0)
// The popping of any given message may be arbitrarily delayed.
virtual Future < Void > confirmEpochLive ( Optional < UID > debugID = Optional < UID > ( ) ) = 0 ;
// Returns success after confirming that pushes in the current epoch are still possible
2017-08-04 07:16:36 +08:00
virtual Future < Void > endEpoch ( ) = 0 ;
// Ends the current epoch without starting a new one
2018-06-27 09:20:28 +08:00
static Reference < ILogSystem > fromServerDBInfo ( UID const & dbgid , struct ServerDBInfo const & db , bool useRecoveredAt = false , Optional < PromiseStream < Future < Void > > > addActor = Optional < PromiseStream < Future < Void > > > ( ) ) ;
static Reference < ILogSystem > fromLogSystemConfig ( UID const & dbgid , struct LocalityData const & , struct LogSystemConfig const & , bool excludeRemote = false , bool useRecoveredAt = false , Optional < PromiseStream < Future < Void > > > addActor = Optional < PromiseStream < Future < Void > > > ( ) ) ;
2017-05-26 04:48:44 +08:00
// Constructs a new ILogSystem implementation from the given ServerDBInfo/LogSystemConfig. Might return a null reference if there isn't a fully recovered log system available.
// The caller can peek() the returned log system and can push() if it has version numbers reserved for it and prevVersions
static Reference < ILogSystem > fromOldLogSystemConfig ( UID const & dbgid , struct LocalityData const & , struct LogSystemConfig const & ) ;
// Constructs a new ILogSystem implementation from the old log data within a ServerDBInfo/LogSystemConfig. Might return a null reference if there isn't a fully recovered log system available.
2019-02-19 07:13:18 +08:00
static Future < Void > recoverAndEndEpoch ( Reference < AsyncVar < Reference < ILogSystem > > > const & outLogSystem , UID const & dbgid , DBCoreState const & oldState , FutureStream < TLogRejoinRequest > const & rejoins , LocalityData const & locality , bool * forceRecovery ) ;
2017-05-26 04:48:44 +08:00
// Constructs a new ILogSystem implementation based on the given oldState and rejoining log servers
// Ensures that any calls to push or confirmEpochLive in the current epoch but strictly later than change_epoch will not return
// Whenever changes in the set of available log servers require restarting recovery with a different end sequence, outLogSystem will be changed to a new ILogSystem
virtual Version getEnd ( ) = 0 ;
// Call only on an ILogSystem obtained from recoverAndEndEpoch()
// Returns the first unreadable version number of the recovered epoch (i.e. message version numbers < (getEnd(), 0) will be readable)
2018-06-14 09:14:14 +08:00
virtual Future < Reference < ILogSystem > > newEpoch ( struct RecruitFromConfigurationReply const & recr , Future < struct RecruitRemoteFromConfigurationReply > const & fRemoteWorkers , DatabaseConfiguration const & config ,
LogEpoch recoveryCount , int8_t primaryLocality , int8_t remoteLocality , std : : vector < Tag > const & allTags , Reference < AsyncVar < bool > > const & recruitmentStalled ) = 0 ;
2017-05-26 04:48:44 +08:00
// Call only on an ILogSystem obtained from recoverAndEndEpoch()
// Returns an ILogSystem representing a new epoch immediately following this one. The new epoch is only provisional until the caller updates the coordinated DBCoreState
virtual LogSystemConfig getLogSystemConfig ( ) = 0 ;
// Returns the physical configuration of this LogSystem, that could be used to construct an equivalent LogSystem using fromLogSystemConfig()
virtual Standalone < StringRef > getLogsValue ( ) = 0 ;
virtual Future < Void > onLogSystemConfigChange ( ) = 0 ;
// Returns when the log system configuration has changed due to a tlog rejoin.
2019-11-06 10:07:30 +08:00
virtual void getPushLocations ( VectorRef < Tag > tags , std : : vector < int > & locations , bool allLocations = false ) = 0 ;
void getPushLocations ( std : : vector < Tag > const & tags , std : : vector < int > & locations , bool allLocations = false ) {
getPushLocations ( VectorRef < Tag > ( ( Tag * ) & tags . front ( ) , tags . size ( ) ) , locations , allLocations ) ;
}
2017-05-26 04:48:44 +08:00
2019-07-17 10:09:09 +08:00
virtual bool hasRemoteLogs ( ) const = 0 ;
2018-01-09 04:04:19 +08:00
2019-07-17 10:09:09 +08:00
virtual Tag getRandomRouterTag ( ) const = 0 ;
2017-06-30 06:50:19 +08:00
2019-07-17 10:09:09 +08:00
virtual Tag getRandomTxsTag ( ) const = 0 ;
2019-06-20 09:15:09 +08:00
2019-07-17 10:09:53 +08:00
// Returns the TLogVersion of the current generation of TLogs.
// (This only exists because getLogSystemConfig is a significantly more expensive call.)
virtual TLogVersion getTLogVersion ( ) const = 0 ;
2019-06-20 09:15:09 +08:00
2017-05-26 04:48:44 +08:00
virtual void stopRejoins ( ) = 0 ;
2019-04-24 06:39:26 +08:00
// Returns the pseudo tag to be popped for the given process class. If the
// process class doesn't use pseudo tag, return the same tag.
virtual Tag getPseudoPopTag ( Tag tag , ProcessClass : : ClassType type ) = 0 ;
virtual bool isPseudoLocality ( int8_t locality ) = 0 ;
2019-04-30 07:44:11 +08:00
virtual Version popPseudoLocalityTag ( int8_t locality , Version upTo ) = 0 ;
2017-05-26 04:48:44 +08:00
} ;
2017-06-30 06:50:19 +08:00
// Handle to a string that is stored with a 4-byte length directly in front of its bytes.
// The handle itself is a single pointer (8 bytes, versus 12 bytes for a StringRef), at the cost of the
// stored string being 4 bytes larger and of substring operations not being as cheap as with StringRef.
// It pays off when many references point at the very same string.
struct LengthPrefixedStringRef {
	// A default-constructed handle refers to nothing; toStringRef() and expectedSize() assert on it.
	LengthPrefixedStringRef() : length(nullptr) {}
	LengthPrefixedStringRef(uint32_t* length) : length(length) {}

	// The raw pointer to the length prefix (null for a default-constructed handle).
	uint32_t* getLengthPtr() const { return length; }

	// The value of the length prefix.
	int expectedSize() const {
		ASSERT(length);
		return *length;
	}

	// A StringRef over the *length bytes that start immediately after the prefix.
	StringRef toStringRef() const {
		ASSERT(length);
		uint8_t* firstByte = (uint8_t*)(length + 1);
		return StringRef(firstByte, *length);
	}

	// Points at the 4-byte length prefix.
	uint32_t* length;
};
// Strict-weak-ordering functor that orders pair-like values by their `first` member only;
// `second` (or any other member) never influences the result.
template <class T>
struct CompareFirst {
	bool operator()(T const& a, T const& b) const { return a.first < b.first; }
};
2017-05-26 04:48:44 +08:00
struct LogPushData : NonCopyable {
// Log subsequences have to start at 1 (the MergedPeekCursor relies on this to make sure we never have !hasMessage() in the middle of data for a version
2019-07-23 06:44:49 +08:00
explicit LogPushData ( Reference < ILogSystem > logSystem ) : logSystem ( logSystem ) , subsequence ( 1 ) {
2017-07-12 06:48:10 +08:00
for ( auto & log : logSystem - > getLogSystemConfig ( ) . tLogs ) {
if ( log . isLocal ) {
2018-03-17 07:47:05 +08:00
for ( int i = 0 ; i < log . tLogs . size ( ) ; i + + ) {
messagesWriter . push_back ( BinaryWriter ( AssumeVersion ( currentProtocolVersion ) ) ) ;
}
2017-07-12 06:48:10 +08:00
}
}
2017-05-26 04:48:44 +08:00
}
2019-06-20 09:15:09 +08:00
void addTxsTag ( ) {
2019-07-17 10:09:53 +08:00
if ( logSystem - > getTLogVersion ( ) > = TLogVersion : : V4 ) {
next_message_tags . push_back ( logSystem - > getRandomTxsTag ( ) ) ;
} else {
next_message_tags . push_back ( txsTag ) ;
}
2019-06-20 09:15:09 +08:00
}
2017-05-26 04:48:44 +08:00
// addTag() adds a tag for the *next* message to be added
void addTag ( Tag tag ) {
next_message_tags . push_back ( tag ) ;
}
2019-04-11 01:30:34 +08:00
template < class T >
void addTags ( T tags ) {
next_message_tags . insert ( next_message_tags . end ( ) , tags . begin ( ) , tags . end ( ) ) ;
}
2019-07-31 03:42:50 +08:00
void addMessage ( StringRef rawMessageWithoutLength , bool usePreviousLocations ) {
2017-05-26 04:48:44 +08:00
if ( ! usePreviousLocations ) {
2017-06-30 06:50:19 +08:00
prev_tags . clear ( ) ;
2018-01-09 04:04:19 +08:00
if ( logSystem - > hasRemoteLogs ( ) ) {
prev_tags . push_back ( logSystem - > getRandomRouterTag ( ) ) ;
}
2017-06-30 06:50:19 +08:00
for ( auto & tag : next_message_tags ) {
prev_tags . push_back ( tag ) ;
}
2017-07-14 03:29:21 +08:00
msg_locations . clear ( ) ;
logSystem - > getPushLocations ( prev_tags , msg_locations ) ;
2017-06-30 06:50:19 +08:00
next_message_tags . clear ( ) ;
2017-05-26 04:48:44 +08:00
}
uint32_t subseq = this - > subsequence + + ;
2019-07-30 12:19:47 +08:00
uint32_t msgsize = rawMessageWithoutLength . size ( ) + sizeof ( subseq ) + sizeof ( uint16_t ) + sizeof ( Tag ) * prev_tags . size ( ) ;
2017-05-26 04:48:44 +08:00
for ( int loc : msg_locations ) {
2019-07-30 12:19:47 +08:00
messagesWriter [ loc ] < < msgsize < < subseq < < uint16_t ( prev_tags . size ( ) ) ;
2017-06-30 06:50:19 +08:00
for ( auto & tag : prev_tags )
messagesWriter [ loc ] < < tag ;
2017-05-26 04:48:44 +08:00
messagesWriter [ loc ] . serializeBytes ( rawMessageWithoutLength ) ;
}
}
template < class T >
2019-03-11 22:31:44 +08:00
void addTypedMessage ( T const & item , bool allLocations = false ) {
2017-06-30 06:50:19 +08:00
prev_tags . clear ( ) ;
2018-01-09 04:04:19 +08:00
if ( logSystem - > hasRemoteLogs ( ) ) {
prev_tags . push_back ( logSystem - > getRandomRouterTag ( ) ) ;
}
2017-06-30 06:50:19 +08:00
for ( auto & tag : next_message_tags ) {
prev_tags . push_back ( tag ) ;
}
2017-07-14 03:29:21 +08:00
msg_locations . clear ( ) ;
2019-03-11 22:31:44 +08:00
logSystem - > getPushLocations ( prev_tags , msg_locations , allLocations ) ;
2017-05-26 04:48:44 +08:00
uint32_t subseq = this - > subsequence + + ;
for ( int loc : msg_locations ) {
// FIXME: memcpy after the first time
BinaryWriter & wr = messagesWriter [ loc ] ;
int offset = wr . getLength ( ) ;
2017-06-30 06:50:19 +08:00
wr < < uint32_t ( 0 ) < < subseq < < uint16_t ( prev_tags . size ( ) ) ;
for ( auto & tag : prev_tags )
wr < < tag ;
wr < < item ;
2017-05-26 04:48:44 +08:00
* ( uint32_t * ) ( ( uint8_t * ) wr . getData ( ) + offset ) = wr . getLength ( ) - offset - sizeof ( uint32_t ) ;
}
next_message_tags . clear ( ) ;
}
2019-03-29 02:52:50 +08:00
Standalone < StringRef > getMessages ( int loc ) {
return messagesWriter [ loc ] . toValue ( ) ;
2017-05-26 04:48:44 +08:00
}
private :
Reference < ILogSystem > logSystem ;
2019-05-17 04:54:06 +08:00
std : : vector < Tag > next_message_tags ;
std : : vector < Tag > prev_tags ;
std : : vector < BinaryWriter > messagesWriter ;
std : : vector < int > msg_locations ;
2017-05-26 04:48:44 +08:00
uint32_t subsequence ;
} ;
# endif