2017-05-26 04:48:44 +08:00
/*
* LogSystem . h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013 - 2018 Apple Inc . and the FoundationDB project authors
2018-02-22 02:25:11 +08:00
*
2017-05-26 04:48:44 +08:00
* Licensed under the Apache License , Version 2.0 ( the " License " ) ;
* you may not use this file except in compliance with the License .
* You may obtain a copy of the License at
2018-02-22 02:25:11 +08:00
*
2017-05-26 04:48:44 +08:00
* http : //www.apache.org/licenses/LICENSE-2.0
2018-02-22 02:25:11 +08:00
*
2017-05-26 04:48:44 +08:00
* Unless required by applicable law or agreed to in writing , software
* distributed under the License is distributed on an " AS IS " BASIS ,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
* See the License for the specific language governing permissions and
* limitations under the License .
*/
# ifndef FDBSERVER_LOGSYSTEM_H
# define FDBSERVER_LOGSYSTEM_H
# pragma once
# include "TLogInterface.h"
# include "WorkerInterface.h"
2018-04-09 12:24:05 +08:00
# include "fdbclient/DatabaseConfiguration.h"
2017-05-26 04:48:44 +08:00
# include "flow/IndexedSet.h"
# include "fdbrpc/ReplicationPolicy.h"
# include "fdbrpc/Locality.h"
# include "fdbrpc/Replication.h"
struct DBCoreState ;
2017-07-12 06:48:10 +08:00
class LogSet : NonCopyable , public ReferenceCounted < LogSet > {
2017-07-11 08:41:32 +08:00
public :
std : : vector < Reference < AsyncVar < OptionalInterface < TLogInterface > > > > logServers ;
std : : vector < Reference < AsyncVar < OptionalInterface < TLogInterface > > > > logRouters ;
int32_t tLogWriteAntiQuorum ;
int32_t tLogReplicationFactor ;
std : : vector < LocalityData > tLogLocalities ; // Stores the localities of the log servers
IRepPolicyRef tLogPolicy ;
LocalitySetRef logServerSet ;
std : : vector < int > logIndexArray ;
std : : map < int , LocalityEntry > logEntryMap ;
bool isLocal ;
2017-09-08 06:32:08 +08:00
int8_t locality ;
2018-03-30 06:12:38 +08:00
Version startVersion ;
2018-04-09 12:24:05 +08:00
std : : vector < Future < TLogLockResult > > replies ;
2018-06-17 08:39:02 +08:00
std : : vector < std : : vector < int > > satelliteTagLocations ;
2017-07-11 08:41:32 +08:00
2018-06-16 03:36:19 +08:00
LogSet ( ) : tLogWriteAntiQuorum ( 0 ) , tLogReplicationFactor ( 0 ) , isLocal ( true ) , locality ( tagLocalityInvalid ) , startVersion ( invalidVersion ) { }
2017-07-11 08:41:32 +08:00
2018-04-30 09:54:47 +08:00
std : : string logRouterString ( ) {
std : : string result ;
for ( int i = 0 ; i < logRouters . size ( ) ; i + + ) {
if ( i > 0 ) {
result + = " , " ;
}
result + = logRouters [ i ] - > get ( ) . id ( ) . toString ( ) ;
}
return result ;
}
std : : string logServerString ( ) {
std : : string result ;
for ( int i = 0 ; i < logServers . size ( ) ; i + + ) {
if ( i > 0 ) {
result + = " , " ;
}
result + = logServers [ i ] - > get ( ) . id ( ) . toString ( ) ;
}
return result ;
}
2018-06-17 08:39:02 +08:00
void populateSatelliteTagLocations ( int logRouterTags , int oldLogRouterTags ) {
satelliteTagLocations . clear ( ) ;
satelliteTagLocations . resize ( std : : max ( logRouterTags , oldLogRouterTags ) + 1 ) ;
2018-06-18 04:29:17 +08:00
std : : set < std : : pair < int , int > > used_servers ;
2018-06-17 08:39:02 +08:00
for ( int i = 0 ; i < tLogLocalities . size ( ) ; i + + ) {
2018-06-18 04:29:17 +08:00
used_servers . insert ( std : : make_pair ( 0 , i ) ) ;
2018-06-17 08:39:02 +08:00
}
LocalitySetRef serverSet = Reference < LocalitySet > ( new LocalityMap < std : : pair < int , int > > ( ) ) ;
LocalityMap < std : : pair < int , int > > * serverMap = ( LocalityMap < std : : pair < int , int > > * ) serverSet . getPtr ( ) ;
2018-06-18 04:29:17 +08:00
std : : vector < std : : pair < int , int > > resultPairs ;
2018-06-17 08:39:02 +08:00
for ( int loc = 0 ; loc < satelliteTagLocations . size ( ) ; loc + + ) {
int team = loc ;
if ( loc < logRouterTags ) {
team = loc + 1 ;
} else if ( loc = = logRouterTags ) {
team = 0 ;
}
2018-06-18 04:29:17 +08:00
bool teamComplete = false ;
2018-06-17 08:39:02 +08:00
alsoServers . resize ( 1 ) ;
serverMap - > clear ( ) ;
2018-06-18 04:29:17 +08:00
resultPairs . clear ( ) ;
for ( auto & used_idx : used_servers ) {
auto entry = serverMap - > add ( tLogLocalities [ used_idx . second ] , & used_idx ) ;
if ( ! resultPairs . size ( ) ) {
resultPairs . push_back ( used_idx ) ;
alsoServers [ 0 ] = entry ;
2018-06-17 08:39:02 +08:00
}
resultEntries . clear ( ) ;
if ( serverSet - > selectReplicas ( tLogPolicy , alsoServers , resultEntries ) ) {
2018-06-18 04:29:17 +08:00
for ( auto & entry : resultEntries ) {
resultPairs . push_back ( * serverMap - > getObject ( entry ) ) ;
2018-06-17 08:39:02 +08:00
}
2018-06-18 04:29:17 +08:00
for ( auto & res : resultPairs ) {
satelliteTagLocations [ team ] . push_back ( res . second ) ;
used_servers . erase ( res ) ;
res . first + + ;
used_servers . insert ( res ) ;
}
teamComplete = true ;
2018-06-17 08:39:02 +08:00
break ;
}
}
2018-06-18 04:29:17 +08:00
ASSERT ( teamComplete ) ;
2018-06-17 08:39:02 +08:00
}
checkSatelliteTagLocations ( ) ;
}
void checkSatelliteTagLocations ( ) {
std : : vector < int > used ;
used . resize ( tLogLocalities . size ( ) ) ;
for ( auto team : satelliteTagLocations ) {
for ( auto loc : team ) {
used [ loc ] + + ;
}
}
int minUsed = satelliteTagLocations . size ( ) ;
int maxUsed = 0 ;
for ( auto i : used ) {
minUsed = std : : min ( minUsed , i ) ;
maxUsed = std : : max ( maxUsed , i ) ;
}
TraceEvent ( maxUsed - minUsed > 1 ? ( g_network - > isSimulated ( ) ? SevError : SevWarnAlways ) : SevInfo , " CheckSatelliteTagLocations " ) . detail ( " MinUsed " , minUsed ) . detail ( " MaxUsed " , maxUsed ) ;
}
2017-07-11 08:41:32 +08:00
int bestLocationFor ( Tag tag ) {
2018-06-17 08:39:02 +08:00
if ( locality = = tagLocalitySatellite ) {
return satelliteTagLocations [ tag = = txsTag ? 0 : tag . id + 1 ] [ 0 ] ;
}
//the following logic supports upgrades from 5.X
2018-06-16 03:36:19 +08:00
if ( tag = = txsTag ) return txsTagOld % logServers . size ( ) ;
return tag . id % logServers . size ( ) ;
2017-07-11 08:41:32 +08:00
}
void updateLocalitySet ( ) {
LocalityMap < int > * logServerMap ;
logServerSet = LocalitySetRef ( new LocalityMap < int > ( ) ) ;
logServerMap = ( LocalityMap < int > * ) logServerSet . getPtr ( ) ;
logEntryMap . clear ( ) ;
logIndexArray . clear ( ) ;
logIndexArray . reserve ( logServers . size ( ) ) ;
for ( int i = 0 ; i < logServers . size ( ) ; i + + ) {
if ( logServers [ i ] - > get ( ) . present ( ) ) {
logIndexArray . push_back ( i ) ;
ASSERT ( logEntryMap . find ( i ) = = logEntryMap . end ( ) ) ;
logEntryMap [ logIndexArray . back ( ) ] = logServerMap - > add ( logServers [ i ] - > get ( ) . interf ( ) . locality , & logIndexArray . back ( ) ) ;
}
}
}
2018-04-29 09:04:57 +08:00
void updateLocalitySet ( vector < LocalityData > const & localities ) {
2017-07-11 08:41:32 +08:00
LocalityMap < int > * logServerMap ;
logServerSet = LocalitySetRef ( new LocalityMap < int > ( ) ) ;
logServerMap = ( LocalityMap < int > * ) logServerSet . getPtr ( ) ;
logEntryMap . clear ( ) ;
logIndexArray . clear ( ) ;
2018-04-29 09:04:57 +08:00
logIndexArray . reserve ( localities . size ( ) ) ;
2017-07-11 08:41:32 +08:00
2018-04-29 09:04:57 +08:00
for ( int i = 0 ; i < localities . size ( ) ; i + + ) {
2017-07-11 08:41:32 +08:00
ASSERT ( logEntryMap . find ( i ) = = logEntryMap . end ( ) ) ;
logIndexArray . push_back ( i ) ;
2018-04-29 09:04:57 +08:00
logEntryMap [ logIndexArray . back ( ) ] = logServerMap - > add ( localities [ i ] , & logIndexArray . back ( ) ) ;
2017-07-11 08:41:32 +08:00
}
}
void getPushLocations ( std : : vector < Tag > const & tags , std : : vector < int > & locations , int locationOffset ) {
2018-06-17 08:39:02 +08:00
if ( locality = = tagLocalitySatellite ) {
for ( auto & t : tags ) {
2018-06-18 04:29:17 +08:00
if ( t = = txsTag | | t . locality = = tagLocalityLogRouter ) {
2018-06-17 08:39:02 +08:00
for ( int loc : satelliteTagLocations [ t = = txsTag ? 0 : t . id + 1 ] ) {
locations . push_back ( locationOffset + loc ) ;
}
}
}
2018-06-19 06:27:28 +08:00
uniquify ( locations ) ;
2018-06-17 08:39:02 +08:00
return ;
}
2017-07-11 08:41:32 +08:00
newLocations . clear ( ) ;
alsoServers . clear ( ) ;
resultEntries . clear ( ) ;
2018-06-16 03:36:19 +08:00
for ( auto & t : tags ) {
if ( locality = = tagLocalitySpecial | | t . locality = = locality | | t . locality < 0 ) {
newLocations . push_back ( bestLocationFor ( t ) ) ;
2017-07-11 08:41:32 +08:00
}
}
uniquify ( newLocations ) ;
if ( newLocations . size ( ) )
alsoServers . reserve ( newLocations . size ( ) ) ;
// Convert locations to the also servers
for ( auto location : newLocations ) {
ASSERT ( logEntryMap [ location ] . _id = = location ) ;
locations . push_back ( locationOffset + location ) ;
alsoServers . push_back ( logEntryMap [ location ] ) ;
}
// Run the policy, assert if unable to satify
bool result = logServerSet - > selectReplicas ( tLogPolicy , alsoServers , resultEntries ) ;
ASSERT ( result ) ;
// Add the new servers to the location array
LocalityMap < int > * logServerMap = ( LocalityMap < int > * ) logServerSet . getPtr ( ) ;
for ( auto entry : resultEntries ) {
locations . push_back ( locationOffset + * logServerMap - > getObject ( entry ) ) ;
}
2018-06-09 04:57:00 +08:00
//TraceEvent("GetPushLocations").detail("Policy", tLogPolicy->info())
2017-07-11 08:41:32 +08:00
// .detail("Results", locations.size()).detail("Selection", logServerSet->size())
// .detail("Included", alsoServers.size()).detail("Duration", timer() - t);
}
private :
std : : vector < LocalityEntry > alsoServers , resultEntries ;
std : : vector < int > newLocations ;
} ;
2017-05-26 04:48:44 +08:00
struct ILogSystem {
// Represents a particular (possibly provisional) epoch of the log subsystem
struct IPeekCursor {
//clones the peek cursor, however you cannot call getMore() on the cloned cursor.
virtual Reference < IPeekCursor > cloneNoMore ( ) = 0 ;
virtual void setProtocolVersion ( uint64_t version ) = 0 ;
2018-03-17 02:40:21 +08:00
//if hasMessage() returns true, getMessage(), getMessageWithTags(), or reader() can be called.
2017-05-26 04:48:44 +08:00
//does not modify the cursor
virtual bool hasMessage ( ) = 0 ;
2017-06-30 06:50:19 +08:00
//pre: only callable if hasMessage() returns true
//return the tags associated with the message for teh current sequence
2018-03-17 02:40:21 +08:00
virtual const std : : vector < Tag > & getTags ( ) = 0 ;
2017-06-30 06:50:19 +08:00
2017-05-26 04:48:44 +08:00
//pre: only callable if hasMessage() returns true
2018-03-17 02:40:21 +08:00
//returns the arena containing the contents of getMessage(), getMessageWithTags(), and reader()
2017-05-26 04:48:44 +08:00
virtual Arena & arena ( ) = 0 ;
//pre: only callable if hasMessage() returns true
//returns an arena reader for the next message
2018-03-17 02:40:21 +08:00
//caller cannot call getMessage(), getMessageWithTags(), and reader()
2017-05-26 04:48:44 +08:00
//the caller must advance the reader before calling nextMessage()
virtual ArenaReader * reader ( ) = 0 ;
//pre: only callable if hasMessage() returns true
2018-03-17 02:40:21 +08:00
//caller cannot call getMessage(), getMessageWithTags(), and reader()
2017-05-26 04:48:44 +08:00
//return the contents of the message for the current sequence
virtual StringRef getMessage ( ) = 0 ;
2018-03-17 02:40:21 +08:00
//pre: only callable if hasMessage() returns true
//caller cannot call getMessage(), getMessageWithTags(), and reader()
//return the contents of the message for the current sequence
virtual StringRef getMessageWithTags ( ) = 0 ;
//pre: only callable after getMessage(), getMessageWithTags(), or reader()
2017-05-26 04:48:44 +08:00
//post: hasMessage() and version() have been updated
//hasMessage() will never return false "in the middle" of a version (that is, if it does return false, version().subsequence will be zero) < FIXME: Can we lose this property?
virtual void nextMessage ( ) = 0 ;
//advances the cursor to the supplied LogMessageVersion, and updates hasMessage
virtual void advanceTo ( LogMessageVersion n ) = 0 ;
//returns immediately if hasMessage() returns true.
//returns when either the result of hasMessage() or version() has changed.
2018-01-12 08:09:49 +08:00
virtual Future < Void > getMore ( int taskID = TaskTLogPeekReply ) = 0 ;
2017-05-26 04:48:44 +08:00
//returns when the failure monitor detects that the servers associated with the cursor are failed
virtual Future < Void > onFailed ( ) = 0 ;
//returns false if:
// (1) the failure monitor detects that the servers associated with the cursor is failed
// (2) the interface is not present
// (3) the cursor cannot return any more results
virtual bool isActive ( ) = 0 ;
2017-07-16 06:15:03 +08:00
//returns true if the cursor cannot return any more results
virtual bool isExhausted ( ) = 0 ;
2017-05-26 04:48:44 +08:00
// Returns the smallest possible message version which the current message (if any) or a subsequent message might have
// (If hasMessage(), this is therefore the message version of the current message)
virtual LogMessageVersion version ( ) = 0 ;
//So far, the cursor has returned all messages which both satisfy the criteria passed to peek() to create the cursor AND have (popped(),0) <= message version number <= version()
//Other messages might have been skipped
virtual Version popped ( ) = 0 ;
// Returns the maximum version known to have been pushed (not necessarily durably) into the log system (0 is always a possible result!)
virtual Version getMaxKnownVersion ( ) { return 0 ; }
2018-06-22 06:29:46 +08:00
virtual Version getMinKnownCommittedVersion ( ) = 0 ;
2017-05-26 04:48:44 +08:00
virtual void addref ( ) = 0 ;
virtual void delref ( ) = 0 ;
} ;
struct ServerPeekCursor : IPeekCursor , ReferenceCounted < ServerPeekCursor > {
Reference < AsyncVar < OptionalInterface < TLogInterface > > > interf ;
Tag tag ;
TLogPeekReply results ;
ArenaReader rd ;
LogMessageVersion messageVersion , end ;
Version poppedVersion ;
2018-03-17 02:40:21 +08:00
int32_t messageLength , rawLength ;
2017-06-30 06:50:19 +08:00
std : : vector < Tag > tags ;
2017-05-26 04:48:44 +08:00
bool hasMsg ;
Future < Void > more ;
UID randomID ;
bool returnIfBlocked ;
bool parallelGetMore ;
int sequence ;
Deque < Future < TLogPeekReply > > futureResults ;
2017-08-10 06:58:06 +08:00
Future < Void > interfaceChanged ;
2017-05-26 04:48:44 +08:00
ServerPeekCursor ( Reference < AsyncVar < OptionalInterface < TLogInterface > > > const & interf , Tag tag , Version begin , Version end , bool returnIfBlocked , bool parallelGetMore ) ;
2018-03-17 02:40:21 +08:00
ServerPeekCursor ( TLogPeekReply const & results , LogMessageVersion const & messageVersion , LogMessageVersion const & end , int32_t messageLength , int32_t rawLength , bool hasMsg , Version poppedVersion , Tag tag ) ;
2017-05-26 04:48:44 +08:00
virtual Reference < IPeekCursor > cloneNoMore ( ) ;
virtual void setProtocolVersion ( uint64_t version ) ;
virtual Arena & arena ( ) ;
virtual ArenaReader * reader ( ) ;
virtual bool hasMessage ( ) ;
virtual void nextMessage ( ) ;
virtual StringRef getMessage ( ) ;
2018-03-17 02:40:21 +08:00
virtual StringRef getMessageWithTags ( ) ;
virtual const std : : vector < Tag > & getTags ( ) ;
2017-05-26 04:48:44 +08:00
virtual void advanceTo ( LogMessageVersion n ) ;
2018-01-12 08:09:49 +08:00
virtual Future < Void > getMore ( int taskID = TaskTLogPeekReply ) ;
2017-05-26 04:48:44 +08:00
virtual Future < Void > onFailed ( ) ;
virtual bool isActive ( ) ;
2017-07-16 06:15:03 +08:00
virtual bool isExhausted ( ) ;
2017-05-26 04:48:44 +08:00
virtual LogMessageVersion version ( ) ;
virtual Version popped ( ) ;
2018-06-22 06:29:46 +08:00
virtual Version getMinKnownCommittedVersion ( ) ;
2017-05-26 04:48:44 +08:00
virtual void addref ( ) {
ReferenceCounted < ServerPeekCursor > : : addref ( ) ;
}
virtual void delref ( ) {
ReferenceCounted < ServerPeekCursor > : : delref ( ) ;
}
virtual Version getMaxKnownVersion ( ) { return results . maxKnownVersion ; }
} ;
struct MergedPeekCursor : IPeekCursor , ReferenceCounted < MergedPeekCursor > {
2018-03-30 06:12:38 +08:00
LocalityGroup localityGroup ;
2017-05-26 04:48:44 +08:00
vector < Reference < IPeekCursor > > serverCursors ;
2017-07-11 08:41:32 +08:00
std : : vector < std : : pair < LogMessageVersion , int > > sortedVersions ;
2017-05-26 04:48:44 +08:00
Tag tag ;
int bestServer , currentCursor , readQuorum ;
Optional < LogMessageVersion > nextVersion ;
LogMessageVersion messageVersion ;
bool hasNextMessage ;
UID randomID ;
2017-06-30 06:50:19 +08:00
int tLogReplicationFactor ;
IRepPolicyRef tLogPolicy ;
2018-03-30 06:12:38 +08:00
std : : vector < LocalityData > tLogLocalities ;
2018-04-09 12:24:05 +08:00
Arena messageArena ;
2018-03-30 09:19:29 +08:00
//FIXME: collectTags is needed to support upgrades from 5.X to 6.0. Remove this code when we no longer support that upgrade.
2018-03-30 08:54:08 +08:00
bool collectTags ;
std : : vector < Tag > tags ;
2017-05-26 04:48:44 +08:00
2018-03-30 08:54:08 +08:00
MergedPeekCursor ( vector < Reference < ILogSystem : : IPeekCursor > > const & serverCursors , Version begin , bool collectTags ) ;
2018-03-30 06:12:38 +08:00
MergedPeekCursor ( std : : vector < Reference < AsyncVar < OptionalInterface < TLogInterface > > > > const & logServers , int bestServer , int readQuorum , Tag tag , Version begin , Version end , bool parallelGetMore , std : : vector < LocalityData > const & tLogLocalities , IRepPolicyRef const tLogPolicy , int tLogReplicationFactor ) ;
MergedPeekCursor ( vector < Reference < IPeekCursor > > const & serverCursors , LogMessageVersion const & messageVersion , int bestServer , int readQuorum , Optional < LogMessageVersion > nextVersion , std : : vector < LocalityData > const & tLogLocalities , IRepPolicyRef const tLogPolicy , int tLogReplicationFactor ) ;
2017-05-26 04:48:44 +08:00
virtual Reference < IPeekCursor > cloneNoMore ( ) ;
virtual void setProtocolVersion ( uint64_t version ) ;
virtual Arena & arena ( ) ;
virtual ArenaReader * reader ( ) ;
void calcHasMessage ( ) ;
2018-03-30 06:12:38 +08:00
void updateMessage ( bool usePolicy ) ;
2017-05-26 04:48:44 +08:00
virtual bool hasMessage ( ) ;
virtual void nextMessage ( ) ;
virtual StringRef getMessage ( ) ;
2018-03-17 02:40:21 +08:00
virtual StringRef getMessageWithTags ( ) ;
virtual const std : : vector < Tag > & getTags ( ) ;
2017-05-26 04:48:44 +08:00
virtual void advanceTo ( LogMessageVersion n ) ;
2018-01-12 08:09:49 +08:00
virtual Future < Void > getMore ( int taskID = TaskTLogPeekReply ) ;
2017-05-26 04:48:44 +08:00
virtual Future < Void > onFailed ( ) ;
virtual bool isActive ( ) ;
2017-07-16 06:15:03 +08:00
virtual bool isExhausted ( ) ;
2017-05-26 04:48:44 +08:00
virtual LogMessageVersion version ( ) ;
virtual Version popped ( ) ;
2018-06-22 06:29:46 +08:00
virtual Version getMinKnownCommittedVersion ( ) ;
2017-05-26 04:48:44 +08:00
virtual void addref ( ) {
ReferenceCounted < MergedPeekCursor > : : addref ( ) ;
}
virtual void delref ( ) {
ReferenceCounted < MergedPeekCursor > : : delref ( ) ;
}
} ;
2017-07-11 08:41:32 +08:00
struct SetPeekCursor : IPeekCursor , ReferenceCounted < SetPeekCursor > {
2017-07-12 06:48:10 +08:00
std : : vector < Reference < LogSet > > logSets ;
2017-07-11 08:41:32 +08:00
std : : vector < std : : vector < Reference < IPeekCursor > > > serverCursors ;
Tag tag ;
int bestSet , bestServer , currentSet , currentCursor ;
LocalityGroup localityGroup ;
std : : vector < std : : pair < LogMessageVersion , int > > sortedVersions ;
Optional < LogMessageVersion > nextVersion ;
LogMessageVersion messageVersion ;
bool hasNextMessage ;
bool useBestSet ;
UID randomID ;
2017-07-12 06:48:10 +08:00
SetPeekCursor ( std : : vector < Reference < LogSet > > const & logSets , int bestSet , int bestServer , Tag tag , Version begin , Version end , bool parallelGetMore ) ;
2018-04-27 09:32:12 +08:00
SetPeekCursor ( std : : vector < Reference < LogSet > > const & logSets , std : : vector < std : : vector < Reference < IPeekCursor > > > const & serverCursors , LogMessageVersion const & messageVersion , int bestSet , int bestServer , Optional < LogMessageVersion > nextVersion , bool useBestSet ) ;
2017-07-11 08:41:32 +08:00
virtual Reference < IPeekCursor > cloneNoMore ( ) ;
virtual void setProtocolVersion ( uint64_t version ) ;
virtual Arena & arena ( ) ;
virtual ArenaReader * reader ( ) ;
void calcHasMessage ( ) ;
void updateMessage ( int logIdx , bool usePolicy ) ;
virtual bool hasMessage ( ) ;
virtual void nextMessage ( ) ;
virtual StringRef getMessage ( ) ;
2018-03-17 02:40:21 +08:00
virtual StringRef getMessageWithTags ( ) ;
virtual const std : : vector < Tag > & getTags ( ) ;
2017-07-11 08:41:32 +08:00
virtual void advanceTo ( LogMessageVersion n ) ;
2018-01-17 02:48:50 +08:00
virtual Future < Void > getMore ( int taskID = TaskTLogPeekReply ) ;
2017-07-11 08:41:32 +08:00
virtual Future < Void > onFailed ( ) ;
virtual bool isActive ( ) ;
2017-07-16 06:15:03 +08:00
virtual bool isExhausted ( ) ;
2017-07-11 08:41:32 +08:00
virtual LogMessageVersion version ( ) ;
virtual Version popped ( ) ;
2018-06-22 06:29:46 +08:00
virtual Version getMinKnownCommittedVersion ( ) ;
2017-07-11 08:41:32 +08:00
virtual void addref ( ) {
ReferenceCounted < SetPeekCursor > : : addref ( ) ;
}
virtual void delref ( ) {
ReferenceCounted < SetPeekCursor > : : delref ( ) ;
}
} ;
2017-05-26 04:48:44 +08:00
struct MultiCursor : IPeekCursor , ReferenceCounted < MultiCursor > {
std : : vector < Reference < IPeekCursor > > cursors ;
std : : vector < LogMessageVersion > epochEnds ;
Version poppedVersion ;
MultiCursor ( std : : vector < Reference < IPeekCursor > > cursors , std : : vector < LogMessageVersion > epochEnds ) ;
virtual Reference < IPeekCursor > cloneNoMore ( ) ;
virtual void setProtocolVersion ( uint64_t version ) ;
virtual Arena & arena ( ) ;
virtual ArenaReader * reader ( ) ;
virtual bool hasMessage ( ) ;
virtual void nextMessage ( ) ;
virtual StringRef getMessage ( ) ;
2018-03-17 02:40:21 +08:00
virtual StringRef getMessageWithTags ( ) ;
virtual const std : : vector < Tag > & getTags ( ) ;
2017-05-26 04:48:44 +08:00
virtual void advanceTo ( LogMessageVersion n ) ;
2018-01-12 08:09:49 +08:00
virtual Future < Void > getMore ( int taskID = TaskTLogPeekReply ) ;
2017-05-26 04:48:44 +08:00
virtual Future < Void > onFailed ( ) ;
virtual bool isActive ( ) ;
2017-07-16 06:15:03 +08:00
virtual bool isExhausted ( ) ;
2017-05-26 04:48:44 +08:00
virtual LogMessageVersion version ( ) ;
virtual Version popped ( ) ;
2018-06-22 06:29:46 +08:00
virtual Version getMinKnownCommittedVersion ( ) ;
2017-05-26 04:48:44 +08:00
virtual void addref ( ) {
ReferenceCounted < MultiCursor > : : addref ( ) ;
}
virtual void delref ( ) {
ReferenceCounted < MultiCursor > : : delref ( ) ;
}
} ;
virtual void addref ( ) = 0 ;
virtual void delref ( ) = 0 ;
virtual std : : string describe ( ) = 0 ;
virtual UID getDebugID ( ) = 0 ;
virtual void toCoreState ( DBCoreState & ) = 0 ;
virtual Future < Void > onCoreStateChanged ( ) = 0 ;
// Returns if and when the output of toCoreState() would change (for example, when older logs can be discarded from the state)
virtual void coreStateWritten ( DBCoreState const & newState ) = 0 ;
// Called when a core state has been written to the coordinators
virtual Future < Void > onError ( ) = 0 ;
// Never returns normally, but throws an error if the subsystem stops working
//Future<Void> push( UID bundle, int64_t seq, VectorRef<TaggedMessageRef> messages );
2018-06-22 06:29:46 +08:00
virtual Future < Version > push ( Version prevVersion , Version version , Version knownCommittedVersion , Version minKnownCommittedVersion , struct LogPushData & data , Optional < UID > debugID = Optional < UID > ( ) ) = 0 ;
2017-05-26 04:48:44 +08:00
// Waits for the version number of the bundle (in this epoch) to be prevVersion (i.e. for all pushes ordered earlier)
// Puts the given messages into the bundle, each with the given tags, and with message versions (version, 0) - (version, N)
// Changes the version number of the bundle to be version (unblocking the next push)
// Returns when the preceding changes are durable. (Later we will need multiple return signals for diffferent durability levels)
// If the current epoch has ended, push will not return, and the pushed messages will not be visible in any subsequent epoch (but may become visible in this epoch)
2018-05-01 04:36:35 +08:00
virtual Reference < IPeekCursor > peek ( UID dbgid , Version begin , Tag tag , bool parallelGetMore = false ) = 0 ;
2017-05-26 04:48:44 +08:00
// Returns (via cursor interface) a stream of messages with the given tag and message versions >= (begin, 0), ordered by message version
// If pop was previously or concurrently called with upTo > begin, the cursor may not return all such messages. In that case cursor->popped() will
// be greater than begin to reflect that.
2018-05-01 04:36:35 +08:00
virtual Reference < IPeekCursor > peek ( UID dbgid , Version begin , std : : vector < Tag > tags , bool parallelGetMore = false ) = 0 ;
2018-03-30 06:12:38 +08:00
// Same contract as peek(), but for a set of tags
2018-05-01 04:36:35 +08:00
virtual Reference < IPeekCursor > peekSingle ( UID dbgid , Version begin , Tag tag , vector < pair < Version , Tag > > history = vector < pair < Version , Tag > > ( ) ) = 0 ;
2017-05-26 04:48:44 +08:00
// Same contract as peek(), but blocks until the preferred log server(s) for the given tag are available (and is correspondingly less expensive)
2018-05-01 04:36:35 +08:00
virtual Reference < IPeekCursor > peekLogRouter ( UID dbgid , Version begin , Tag tag ) = 0 ;
2018-06-22 06:29:46 +08:00
// Same contract as peek(), but can only peek from the logs elected in the same generation.
2018-03-30 06:12:38 +08:00
// If the preferred log server is down, a different log from the same generation will merge results locally before sending them to the log router.
2018-04-19 03:07:29 +08:00
virtual void pop ( Version upTo , Tag tag , Version knownCommittedVersion = 0 , int8_t popLocality = tagLocalityInvalid ) = 0 ;
2017-05-26 04:48:44 +08:00
// Permits, but does not require, the log subsystem to strip `tag` from any or all messages with message versions < (upTo,0)
// The popping of any given message may be arbitrarily delayed.
virtual Future < Void > confirmEpochLive ( Optional < UID > debugID = Optional < UID > ( ) ) = 0 ;
// Returns success after confirming that pushes in the current epoch are still possible
2017-08-04 07:16:36 +08:00
virtual Future < Void > endEpoch ( ) = 0 ;
// Ends the current epoch without starting a new one
2018-06-27 09:20:28 +08:00
static Reference < ILogSystem > fromServerDBInfo ( UID const & dbgid , struct ServerDBInfo const & db , bool useRecoveredAt = false , Optional < PromiseStream < Future < Void > > > addActor = Optional < PromiseStream < Future < Void > > > ( ) ) ;
static Reference < ILogSystem > fromLogSystemConfig ( UID const & dbgid , struct LocalityData const & , struct LogSystemConfig const & , bool excludeRemote = false , bool useRecoveredAt = false , Optional < PromiseStream < Future < Void > > > addActor = Optional < PromiseStream < Future < Void > > > ( ) ) ;
2017-05-26 04:48:44 +08:00
// Constructs a new ILogSystem implementation from the given ServerDBInfo/LogSystemConfig. Might return a null reference if there isn't a fully recovered log system available.
// The caller can peek() the returned log system and can push() if it has version numbers reserved for it and prevVersions
static Reference < ILogSystem > fromOldLogSystemConfig ( UID const & dbgid , struct LocalityData const & , struct LogSystemConfig const & ) ;
// Constructs a new ILogSystem implementation from the old log data within a ServerDBInfo/LogSystemConfig. Might return a null reference if there isn't a fully recovered log system available.
static Future < Void > recoverAndEndEpoch ( Reference < AsyncVar < Reference < ILogSystem > > > const & outLogSystem , UID const & dbgid , DBCoreState const & oldState , FutureStream < TLogRejoinRequest > const & rejoins , LocalityData const & locality ) ;
// Constructs a new ILogSystem implementation based on the given oldState and rejoining log servers
// Ensures that any calls to push or confirmEpochLive in the current epoch but strictly later than change_epoch will not return
// Whenever changes in the set of available log servers require restarting recovery with a different end sequence, outLogSystem will be changed to a new ILogSystem
virtual Version getEnd ( ) = 0 ;
// Call only on an ILogSystem obtained from recoverAndEndEpoch()
// Returns the first unreadable version number of the recovered epoch (i.e. message version numbers < (get_end(), 0) will be readable)
2018-06-14 09:14:14 +08:00
virtual Future < Reference < ILogSystem > > newEpoch ( struct RecruitFromConfigurationReply const & recr , Future < struct RecruitRemoteFromConfigurationReply > const & fRemoteWorkers , DatabaseConfiguration const & config ,
LogEpoch recoveryCount , int8_t primaryLocality , int8_t remoteLocality , std : : vector < Tag > const & allTags , Reference < AsyncVar < bool > > const & recruitmentStalled ) = 0 ;
2017-05-26 04:48:44 +08:00
// Call only on an ILogSystem obtained from recoverAndEndEpoch()
// Returns an ILogSystem representing a new epoch immediately following this one. The new epoch is only provisional until the caller updates the coordinated DBCoreState
virtual LogSystemConfig getLogSystemConfig ( ) = 0 ;
// Returns the physical configuration of this LogSystem, that could be used to construct an equivalent LogSystem using fromLogSystemConfig()
virtual Standalone < StringRef > getLogsValue ( ) = 0 ;
virtual Future < Void > onLogSystemConfigChange ( ) = 0 ;
// Returns when the log system configuration has changed due to a tlog rejoin.
virtual void getPushLocations ( std : : vector < Tag > const & tags , vector < int > & locations ) = 0 ;
2018-01-09 04:04:19 +08:00
virtual bool hasRemoteLogs ( ) = 0 ;
2017-06-30 06:50:19 +08:00
virtual Tag getRandomRouterTag ( ) = 0 ;
2017-05-26 04:48:44 +08:00
virtual void stopRejoins ( ) = 0 ;
} ;
2017-06-30 06:50:19 +08:00
struct LengthPrefixedStringRef {
// Represents a pointer to a string which is prefixed by a 4-byte length
// A LengthPrefixedStringRef is only pointer-sized (8 bytes vs 12 bytes for StringRef), but the corresponding string is 4 bytes bigger, and
// substring operations aren't efficient as they are with StringRef. It's a good choice when there might be lots of references to the same
// exact string.
uint32_t * length ;
StringRef toStringRef ( ) const { ASSERT ( length ) ; return StringRef ( ( uint8_t * ) ( length + 1 ) , * length ) ; }
int expectedSize ( ) const { ASSERT ( length ) ; return * length ; }
uint32_t * getLengthPtr ( ) const { return length ; }
LengthPrefixedStringRef ( ) : length ( NULL ) { }
LengthPrefixedStringRef ( uint32_t * length ) : length ( length ) { }
} ;
template < class T >
struct CompareFirst {
bool operator ( ) ( T const & lhs , T const & rhs ) const {
return lhs . first < rhs . first ;
}
} ;
2017-05-26 04:48:44 +08:00
struct LogPushData : NonCopyable {
// Log subsequences have to start at 1 (the MergedPeekCursor relies on this to make sure we never have !hasMessage() in the middle of data for a version
explicit LogPushData ( Reference < ILogSystem > logSystem ) : logSystem ( logSystem ) , subsequence ( 1 ) {
2017-07-12 06:48:10 +08:00
for ( auto & log : logSystem - > getLogSystemConfig ( ) . tLogs ) {
if ( log . isLocal ) {
2018-03-17 07:47:05 +08:00
for ( int i = 0 ; i < log . tLogs . size ( ) ; i + + ) {
messagesWriter . push_back ( BinaryWriter ( AssumeVersion ( currentProtocolVersion ) ) ) ;
}
2017-07-12 06:48:10 +08:00
}
}
2017-05-26 04:48:44 +08:00
}
// addTag() adds a tag for the *next* message to be added
void addTag ( Tag tag ) {
next_message_tags . push_back ( tag ) ;
}
void addMessage ( StringRef rawMessageWithoutLength , bool usePreviousLocations = false ) {
if ( ! usePreviousLocations ) {
2017-06-30 06:50:19 +08:00
prev_tags . clear ( ) ;
2018-01-09 04:04:19 +08:00
if ( logSystem - > hasRemoteLogs ( ) ) {
prev_tags . push_back ( logSystem - > getRandomRouterTag ( ) ) ;
}
2017-06-30 06:50:19 +08:00
for ( auto & tag : next_message_tags ) {
prev_tags . push_back ( tag ) ;
}
2017-07-14 03:29:21 +08:00
msg_locations . clear ( ) ;
logSystem - > getPushLocations ( prev_tags , msg_locations ) ;
2017-06-30 06:50:19 +08:00
next_message_tags . clear ( ) ;
2017-05-26 04:48:44 +08:00
}
uint32_t subseq = this - > subsequence + + ;
for ( int loc : msg_locations ) {
2017-06-30 06:50:19 +08:00
messagesWriter [ loc ] < < uint32_t ( rawMessageWithoutLength . size ( ) + sizeof ( subseq ) + sizeof ( uint16_t ) + sizeof ( Tag ) * prev_tags . size ( ) ) < < subseq < < uint16_t ( prev_tags . size ( ) ) ;
for ( auto & tag : prev_tags )
messagesWriter [ loc ] < < tag ;
2017-05-26 04:48:44 +08:00
messagesWriter [ loc ] . serializeBytes ( rawMessageWithoutLength ) ;
}
}
template < class T >
void addTypedMessage ( T const & item ) {
2017-06-30 06:50:19 +08:00
prev_tags . clear ( ) ;
2018-01-09 04:04:19 +08:00
if ( logSystem - > hasRemoteLogs ( ) ) {
prev_tags . push_back ( logSystem - > getRandomRouterTag ( ) ) ;
}
2017-06-30 06:50:19 +08:00
for ( auto & tag : next_message_tags ) {
prev_tags . push_back ( tag ) ;
}
2017-07-14 03:29:21 +08:00
msg_locations . clear ( ) ;
logSystem - > getPushLocations ( prev_tags , msg_locations ) ;
2017-05-26 04:48:44 +08:00
uint32_t subseq = this - > subsequence + + ;
for ( int loc : msg_locations ) {
// FIXME: memcpy after the first time
BinaryWriter & wr = messagesWriter [ loc ] ;
int offset = wr . getLength ( ) ;
2017-06-30 06:50:19 +08:00
wr < < uint32_t ( 0 ) < < subseq < < uint16_t ( prev_tags . size ( ) ) ;
for ( auto & tag : prev_tags )
wr < < tag ;
wr < < item ;
2017-05-26 04:48:44 +08:00
* ( uint32_t * ) ( ( uint8_t * ) wr . getData ( ) + offset ) = wr . getLength ( ) - offset - sizeof ( uint32_t ) ;
}
next_message_tags . clear ( ) ;
}
Arena getArena ( ) { return arena ; }
StringRef getMessages ( int loc ) {
return StringRef ( arena , messagesWriter [ loc ] . toStringRef ( ) ) ; // FIXME: Unnecessary copy!
}
private :
Reference < ILogSystem > logSystem ;
Arena arena ;
vector < Tag > next_message_tags ;
2017-06-30 06:50:19 +08:00
vector < Tag > prev_tags ;
2017-05-26 04:48:44 +08:00
vector < BinaryWriter > messagesWriter ;
vector < int > msg_locations ;
uint32_t subsequence ;
} ;
# endif