Add new api to get shared tlogs id and address

This commit is contained in:
Balachandar Namasivayam 2018-03-02 16:50:30 -08:00
parent ac1f3a20e8
commit 11df1aeabf
6 changed files with 36 additions and 15 deletions

View File

@ -126,6 +126,25 @@ struct LogSystemConfig {
return results;
}
std::vector<std::pair<UID, NetworkAddress>> allSharedLogs() const {
typedef std::pair<UID, NetworkAddress> IdAddrPair;
std::vector<IdAddrPair> results;
for (auto tLog : tLogs)
if (tLog.present())
results.push_back(IdAddrPair(tLog.interf().getSharedTLogID(), tLog.interf().address()));
for (auto &oldLog : oldTLogs) {
for (auto &tLog : oldLog.tLogs) {
if (tLog.present())
results.push_back(IdAddrPair(tLog.interf().getSharedTLogID(), tLog.interf().address()));
}
}
uniquify(results);
// This assert depends on the fact that uniquify will sort the elements based on <UID, NetworkAddr> order
ASSERT_WE_THINK(std::unique(results.begin(), results.end(), [](IdAddrPair &x, IdAddrPair &y) { return x.first == y.first; }) == results.end());
return results;
}
bool operator == ( const LogSystemConfig& rhs ) const { return isEqual(rhs); }
bool isEqual(LogSystemConfig const& r) const {

View File

@ -80,6 +80,8 @@ struct TLogRejoinRequest {
DBRecoveryCount recoveryCount;
ReplyPromise<bool> reply; // false means someone else registered, so we should re-register. true means this master is recovered, so don't send again to the same master.
TLogRejoinRequest() { }
explicit TLogRejoinRequest(const TLogInterface &interf) : myInterface(interf) { }
template <class Ar>
void serialize(Ar& ar) {
ar & myInterface & reply;

View File

@ -1417,8 +1417,7 @@ namespace oldTLog {
if (self->dbInfo->get().master.id() != lastMasterID) {
// The TLogRejoinRequest is needed to establish communications with a new master, which doesn't have our TLogInterface
TLogRejoinRequest req;
req.myInterface = tli;
TLogRejoinRequest req(tli);
TraceEvent("TLogRejoining", self->dbgid).detail("Master", self->dbInfo->get().master.id());
choose {
when ( bool success = wait( brokenPromiseToNever( self->dbInfo->get().master.tlogRejoin.getReply( req ) ) ) ) {

View File

@ -32,6 +32,7 @@ struct TLogInterface {
enum { LocationAwareLoadBalance = 1 };
LocalityData locality;
UID uniqueID;
UID sharedTLogID;
RequestStream< struct TLogPeekRequest > peekMessages;
RequestStream< struct TLogPopRequest > popMessages;
@ -42,8 +43,11 @@ struct TLogInterface {
RequestStream<ReplyPromise<Void>> waitFailure;
RequestStream< struct TLogRecoveryFinishedRequest > recoveryFinished;
TLogInterface() : uniqueID( g_random->randomUniqueID() ) {}
TLogInterface() { }
TLogInterface(UID _sharedTLogID, LocalityData _locality) : uniqueID( g_random->randomUniqueID() ), sharedTLogID(_sharedTLogID), locality(_locality) {}
TLogInterface(UID _uniqueID, UID _sharedTLogID, LocalityData _locality) : uniqueID(_uniqueID), sharedTLogID(_sharedTLogID), locality(_locality) {}
UID id() const { return uniqueID; }
UID getSharedTLogID() const { return sharedTLogID; }
std::string toString() const { return id().shortString(); }
bool operator == ( TLogInterface const& r ) const { return id() == r.id(); }
NetworkAddress address() const { return peekMessages.getEndpoint().address; }
@ -57,7 +61,7 @@ struct TLogInterface {
template <class Ar>
void serialize( Ar& ar ) {
ar & uniqueID & locality & peekMessages & popMessages
ar & uniqueID & sharedTLogID & locality & peekMessages & popMessages
& commit & lock & getQueuingMetrics & confirmRunning & waitFailure & recoveryFinished;
}
};

View File

@ -1119,8 +1119,7 @@ ACTOR Future<Void> rejoinMasters( TLogData* self, TLogInterface tli, DBRecoveryC
if( registerWithMaster.isReady() ) {
if ( self->dbInfo->get().master.id() != lastMasterID) {
// The TLogRejoinRequest is needed to establish communications with a new master, which doesn't have our TLogInterface
TLogRejoinRequest req;
req.myInterface = tli;
TLogRejoinRequest req(tli);
TraceEvent("TLogRejoining", self->dbgid).detail("Master", self->dbInfo->get().master.id());
choose {
when ( bool success = wait( brokenPromiseToNever( self->dbInfo->get().master.tlogRejoin.getReply( req ) ) ) ) {
@ -1341,9 +1340,7 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
state std::vector<Future<ErrorOr<Void>>> removed;
if(fFormat.get().get() == LiteralStringRef("FoundationDB/LogServer/2/2")) {
TLogInterface recruited;
recruited.uniqueID = self->dbgid;
recruited.locality = locality;
TLogInterface recruited(self->dbgid, self->dbgid, locality);
recruited.initEndpoints();
DUMPTOKEN( recruited.peekMessages );
@ -1374,9 +1371,7 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
UID id2 = BinaryReader::fromStringRef<UID>( fRecoverCounts.get()[idx].key.removePrefix(persistRecoveryCountKeys.begin), Unversioned() );
ASSERT(id1 == id2);
TLogInterface recruited;
recruited.uniqueID = id1;
recruited.locality = locality;
TLogInterface recruited(id1, id1, locality);
recruited.initEndpoints();
DUMPTOKEN( recruited.peekMessages );
@ -1679,7 +1674,7 @@ ACTOR Future<Void> recoverFromLogSystem( TLogData* self, Reference<LogData> logD
}
ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, LocalityData locality ) {
state TLogInterface recruited;
state TLogInterface recruited(self->dbgid, locality);
recruited.locality = locality;
recruited.initEndpoints();
@ -1758,7 +1753,8 @@ ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQ
state Future<Void> error = actorCollection( self.sharedActors.getFuture() );
TraceEvent("SharedTlog", tlogId);
// FIXME: Pass the worker id instead of stubbing it
startRole(tlogId, UID(), "SharedTLog");
try {
if(restoreFromDisk) {
Void _ = wait( restorePersistentState( &self, locality, oldLog, recovered, tlogRequests ) );
@ -1787,6 +1783,7 @@ ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQ
}
} catch (Error& e) {
TraceEvent("TLogError", tlogId).error(e, true);
endRole(tlogId, "SharedTLog", "Error", true);
if(recovered.canBeSet()) recovered.send(Void());
while(!tlogRequests.isEmpty()) {

View File

@ -51,7 +51,7 @@ using namespace boost::asio::ip;
// These impact both communications and the deserialization of certain database and IKeyValueStore keys
// xyzdev
// vvvv
uint64_t currentProtocolVersion = 0x0FDB00A551040001LL;
uint64_t currentProtocolVersion = 0x0FDB00A551060001LL;
uint64_t compatibleProtocolVersionMask = 0xffffffffffff0000LL;
uint64_t minValidProtocolVersion = 0x0FDB00A200060001LL;