Merge pull request #1473 from jzhou77/tlog

Add pseudo tags that are mapped 1-to-1 with log router tags
This commit is contained in:
Evan Tschannen 2019-05-03 13:44:33 -07:00 committed by GitHub
commit 99373b0acb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 432 additions and 466 deletions

View File

@ -33,7 +33,15 @@ typedef StringRef KeyRef;
typedef StringRef ValueRef;
typedef int64_t Generation;
enum { tagLocalitySpecial = -1, tagLocalityLogRouter = -2, tagLocalityRemoteLog = -3, tagLocalityUpgraded = -4, tagLocalitySatellite = -5, tagLocalityInvalid = -99 }; //The TLog and LogRouter require these number to be as compact as possible
enum {
tagLocalitySpecial = -1,
tagLocalityLogRouter = -2,
tagLocalityRemoteLog = -3,
tagLocalityUpgraded = -4,
tagLocalitySatellite = -5,
tagLocalityLogRouterMapped = -6,
tagLocalityInvalid = -99
}; //The TLog and LogRouter require these number to be as compact as possible
#pragma pack(push, 1)
struct Tag {
@ -47,6 +55,10 @@ struct Tag {
bool operator != ( const Tag& r ) const { return locality!=r.locality || id!=r.id; }
bool operator < ( const Tag& r ) const { return locality < r.locality || (locality == r.locality && id < r.id); }
int toTagDataIndex() {
return locality >= 0 ? 2 * locality : 1 - (2 * locality);
}
std::string toString() const {
return format("%d:%d", locality, id);
}

View File

@ -94,6 +94,18 @@ struct ServerCacheInfo {
std::vector<Tag> tags;
std::vector<Reference<StorageInfo>> src_info;
std::vector<Reference<StorageInfo>> dest_info;
void populateTags() {
if (tags.size()) return;
for (const auto& info : src_info) {
tags.push_back(info->tag);
}
for (const auto& info : dest_info) {
tags.push_back(info->tag);
}
uniquify(tags);
}
};
struct GetValueReply : public LoadBalancedReply {

View File

@ -835,7 +835,6 @@ void filterLocalityDataForPolicy(Reference<IReplicationPolicy> policy, LocalityD
void filterLocalityDataForPolicy(Reference<IReplicationPolicy> policy, std::vector<LocalityData>* vld) {
if (!policy) return;
std::set<std::string> keys = policy->attributeKeys();
for (LocalityData& ld : *vld) {
filterLocalityDataForPolicy(policy, &ld);
}

View File

@ -42,6 +42,20 @@ struct applyMutationsData {
Reference<KeyRangeMap<Version>> keyVersion;
};
static Reference<StorageInfo> getStorageInfo(UID id, std::map<UID, Reference<StorageInfo>>* storageCache, IKeyValueStore* txnStateStore) {
Reference<StorageInfo> storageInfo;
auto cacheItr = storageCache->find(id);
if(cacheItr == storageCache->end()) {
storageInfo = Reference<StorageInfo>( new StorageInfo() );
storageInfo->tag = decodeServerTagValue( txnStateStore->readValue( serverTagKeyFor(id) ).get().get() );
storageInfo->interf = decodeServerListValue( txnStateStore->readValue( serverListKeyFor(id) ).get().get() );
(*storageCache)[id] = storageInfo;
} else {
storageInfo = cacheItr->second;
}
return storageInfo;
}
// It is incredibly important that any modifications to txnStateStore are done in such a way that
// the same operations will be done on all proxies at the same time. Otherwise, the data stored in
// txnStateStore will become corrupted.
@ -62,36 +76,19 @@ static void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRef<Mut
decodeKeyServersValue(m.param2, src, dest);
ASSERT(storageCache);
Reference<StorageInfo> storageInfo;
ServerCacheInfo info;
info.tags.reserve(src.size() + dest.size());
info.src_info.reserve(src.size());
info.dest_info.reserve(dest.size());
for(auto id : src) {
auto cacheItr = storageCache->find(id);
if(cacheItr == storageCache->end()) {
storageInfo = Reference<StorageInfo>( new StorageInfo() );
storageInfo->tag = decodeServerTagValue( txnStateStore->readValue( serverTagKeyFor(id) ).get().get() );
storageInfo->interf = decodeServerListValue( txnStateStore->readValue( serverListKeyFor(id) ).get().get() );
(*storageCache)[id] = storageInfo;
} else {
storageInfo = cacheItr->second;
}
for (const auto& id : src) {
auto storageInfo = getStorageInfo(id, storageCache, txnStateStore);
ASSERT(storageInfo->tag != invalidTag);
info.tags.push_back( storageInfo->tag );
info.src_info.push_back( storageInfo );
}
for(auto id : dest) {
auto cacheItr = storageCache->find(id);
if(cacheItr == storageCache->end()) {
storageInfo = Reference<StorageInfo>( new StorageInfo() );
storageInfo->tag = decodeServerTagValue( txnStateStore->readValue( serverTagKeyFor(id) ).get().get() );
storageInfo->interf = decodeServerListValue( txnStateStore->readValue( serverListKeyFor(id) ).get().get() );
(*storageCache)[id] = storageInfo;
} else {
storageInfo = cacheItr->second;
}
for (const auto& id : dest) {
auto storageInfo = getStorageInfo(id, storageCache, txnStateStore);
ASSERT(storageInfo->tag != invalidTag);
info.tags.push_back( storageInfo->tag );
info.dest_info.push_back( storageInfo );
@ -258,15 +255,13 @@ static void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRef<Mut
}
if (m.param1 == lastEpochEndKey) {
for (auto t : allTags)
toCommit->addTag(t);
toCommit->addTags(allTags);
toCommit->addTypedMessage(LogProtocolMessage());
}
MutationRef privatized = m;
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
for (auto t : allTags)
toCommit->addTag(t);
toCommit->addTags(allTags);
toCommit->addTypedMessage(privatized);
}
}

View File

@ -20,12 +20,18 @@
#ifndef FDBSERVER_DBCORESTATE_H
#define FDBSERVER_DBCORESTATE_H
#pragma once
#include <set>
#include <vector>
#include "fdbclient/FDBTypes.h"
#include "fdbrpc/ReplicationPolicy.h"
#include "fdbserver/LogSystemConfig.h"
#include "fdbserver/MasterInterface.h"
class LogSet;
struct OldLogData;
// This structure is stored persistently in CoordinatedState and must be versioned carefully!
// It records a synchronous replication topology which can be used in the absence of faults (or under a limited
// number of failures, in the case of less than full write quorums) to durably commit transactions. When faults or
@ -49,15 +55,16 @@ struct CoreTLogSet {
TLogVersion tLogVersion;
CoreTLogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), locality(tagLocalityUpgraded), startVersion(invalidVersion) {}
explicit CoreTLogSet(const LogSet& logset);
bool operator == (CoreTLogSet const& rhs) const {
return tLogs == rhs.tLogs && tLogWriteAntiQuorum == rhs.tLogWriteAntiQuorum && tLogReplicationFactor == rhs.tLogReplicationFactor && isLocal == rhs.isLocal && satelliteTagLocations == rhs.satelliteTagLocations &&
locality == rhs.locality && startVersion == rhs.startVersion && ((!tLogPolicy && !rhs.tLogPolicy) || (tLogPolicy && rhs.tLogPolicy && (tLogPolicy->info() == rhs.tLogPolicy->info())));
locality == rhs.locality && startVersion == rhs.startVersion && ((!tLogPolicy && !rhs.tLogPolicy) || (tLogPolicy && rhs.tLogPolicy && (tLogPolicy->info() == rhs.tLogPolicy->info())));
}
template <class Archive>
void serialize(Archive& ar) {
serializer(ar, tLogs, tLogWriteAntiQuorum, tLogReplicationFactor, tLogPolicy, tLogLocalities, isLocal, locality, startVersion, satelliteTagLocations);
serializer(ar, tLogs, tLogWriteAntiQuorum, tLogReplicationFactor, tLogPolicy, tLogLocalities, isLocal, locality, startVersion, satelliteTagLocations);
if (ar.isDeserializing && ar.protocolVersion() < 0x0FDB00B061030001LL) {
tLogVersion = TLogVersion::V2;
} else {
@ -70,11 +77,13 @@ struct OldTLogCoreData {
std::vector<CoreTLogSet> tLogs;
int32_t logRouterTags;
Version epochEnd;
std::set<int8_t> pseudoLocalities;
OldTLogCoreData() : epochEnd(0), logRouterTags(0) {}
explicit OldTLogCoreData(const OldLogData&);
bool operator == (OldTLogCoreData const& rhs) const {
return tLogs == rhs.tLogs && logRouterTags == rhs.logRouterTags && epochEnd == rhs.epochEnd;
return tLogs == rhs.tLogs && logRouterTags == rhs.logRouterTags && epochEnd == rhs.epochEnd && pseudoLocalities == rhs.pseudoLocalities;
}
template <class Archive>
@ -87,6 +96,9 @@ struct OldTLogCoreData {
serializer(ar, tLogs[0].tLogs, tLogs[0].tLogWriteAntiQuorum, tLogs[0].tLogReplicationFactor, tLogs[0].tLogPolicy, epochEnd, tLogs[0].tLogLocalities);
tLogs[0].tLogVersion = TLogVersion::V2;
}
if (ar.protocolVersion() > 0x0FDB00B061060001LL) {
serializer(ar, pseudoLocalities);
}
}
};
@ -95,9 +107,10 @@ struct DBCoreState {
int32_t logRouterTags;
std::vector<OldTLogCoreData> oldTLogData;
DBRecoveryCount recoveryCount; // Increases with sequential successful recoveries.
int logSystemType;
LogSystemType logSystemType;
std::set<int8_t> pseudoLocalities;
DBCoreState() : logRouterTags(0), recoveryCount(0), logSystemType(0) {}
DBCoreState() : logRouterTags(0), recoveryCount(0), logSystemType(LogSystemType::empty) {}
vector<UID> getPriorCommittedLogServers() {
vector<UID> priorCommittedLogServers;
@ -117,7 +130,7 @@ struct DBCoreState {
}
bool isEqual(DBCoreState const& r) const {
return logSystemType == r.logSystemType && recoveryCount == r.recoveryCount && tLogs == r.tLogs && oldTLogData == r.oldTLogData && logRouterTags == r.logRouterTags;
return logSystemType == r.logSystemType && recoveryCount == r.recoveryCount && tLogs == r.tLogs && oldTLogData == r.oldTLogData && logRouterTags == r.logRouterTags && pseudoLocalities == r.pseudoLocalities;
}
bool operator == ( const DBCoreState& rhs ) const { return isEqual(rhs); }
@ -132,6 +145,9 @@ struct DBCoreState {
ASSERT(ar.protocolVersion() >= 0x0FDB00A460010001LL);
if(ar.protocolVersion() >= 0x0FDB00A560010001LL) {
serializer(ar, tLogs, logRouterTags, oldTLogData, recoveryCount, logSystemType);
if (ar.protocolVersion() > 0x0FDB00B061060001LL) {
serializer(ar, pseudoLocalities);
}
} else if(ar.isDeserializing) {
tLogs.push_back(CoreTLogSet());
serializer(ar, tLogs[0].tLogs, tLogs[0].tLogWriteAntiQuorum, recoveryCount, tLogs[0].tLogReplicationFactor, logSystemType);

View File

@ -103,7 +103,7 @@ struct LogRouterData {
return newTagData;
}
LogRouterData(UID dbgid, InitializeLogRouterRequest req) : dbgid(dbgid), routerTag(req.routerTag), logSystem(new AsyncVar<Reference<ILogSystem>>()), version(req.startVersion-1), minPopped(0), startVersion(req.startVersion), allowPops(false), minKnownCommittedVersion(0), poppedVersion(0), foundEpochEnd(false) {
LogRouterData(UID dbgid, const InitializeLogRouterRequest& req) : dbgid(dbgid), routerTag(req.routerTag), logSystem(new AsyncVar<Reference<ILogSystem>>()), version(req.startVersion-1), minPopped(0), startVersion(req.startVersion), allowPops(false), minKnownCommittedVersion(0), poppedVersion(0), foundEpochEnd(false) {
//setup just enough of a logSet to be able to call getPushLocations
logSet.logServers.resize(req.tLogLocalities.size());
logSet.tLogPolicy = req.tLogPolicy;
@ -145,7 +145,7 @@ void commitMessages( LogRouterData* self, Version version, const std::vector<Tag
for(auto& msg : taggedMessages) {
if(msg.message.size() > block.capacity() - block.size()) {
self->messageBlocks.push_back( std::make_pair(version, block) );
self->messageBlocks.emplace_back(version, block);
block = Standalone<VectorRef<uint8_t>>();
block.reserve(block.arena(), std::max<int64_t>(SERVER_KNOBS->TLOG_MESSAGE_BLOCK_BYTES, msgSize));
}
@ -158,7 +158,7 @@ void commitMessages( LogRouterData* self, Version version, const std::vector<Tag
}
if (version >= tagData->popped) {
tagData->version_messages.push_back(std::make_pair(version, LengthPrefixedStringRef((uint32_t*)(block.end() - msg.message.size()))));
tagData->version_messages.emplace_back(version, LengthPrefixedStringRef((uint32_t*)(block.end() - msg.message.size())));
if(tagData->version_messages.back().second.expectedSize() > SERVER_KNOBS->MAX_MESSAGE_SIZE) {
TraceEvent(SevWarnAlways, "LargeMessage").detail("Size", tagData->version_messages.back().second.expectedSize());
}
@ -167,7 +167,7 @@ void commitMessages( LogRouterData* self, Version version, const std::vector<Tag
msgSize -= msg.message.size();
}
self->messageBlocks.push_back( std::make_pair(version, block) );
self->messageBlocks.emplace_back(version, block);
}
ACTOR Future<Void> waitForVersion( LogRouterData *self, Version ver ) {
@ -259,8 +259,8 @@ ACTOR Future<Void> pullAsyncData( LogRouterData *self ) {
tagAndMsg.message = r->getMessageWithTags();
tags.clear();
self->logSet.getPushLocations(r->getTags(), tags, 0);
for(auto t : tags) {
tagAndMsg.tags.push_back(Tag(tagLocalityRemoteLog, t));
for (const auto& t : tags) {
tagAndMsg.tags.emplace_back(tagLocalityRemoteLog, t);
}
messages.push_back(std::move(tagAndMsg));
@ -378,7 +378,8 @@ ACTOR Future<Void> logRouterPop( LogRouterData* self, TLogPopRequest req ) {
self->poppedVersion = std::min(minKnownCommittedVersion, self->minKnownCommittedVersion);
if(self->logSystem->get() && self->allowPops) {
self->logSystem->get()->pop(self->poppedVersion, self->routerTag);
const Tag popTag = self->logSystem->get()->getPseudoPopTag(self->routerTag, ProcessClass::LogRouterClass);
self->logSystem->get()->pop(self->poppedVersion, popTag);
}
req.reply.send(Void());
self->minPopped.set(std::max(minPopped, self->minPopped.get()));

View File

@ -20,7 +20,9 @@
#ifndef FDBSERVER_LOGSYSTEM_H
#define FDBSERVER_LOGSYSTEM_H
#pragma once
#include <set>
#include <vector>
#include "fdbserver/TLogInterface.h"
#include "fdbserver/WorkerInterface.actor.h"
@ -31,6 +33,8 @@
#include "fdbrpc/Replication.h"
struct DBCoreState;
struct TLogSet;
struct CoreTLogSet;
class LogSet : NonCopyable, public ReferenceCounted<LogSet> {
public:
@ -51,6 +55,8 @@ public:
std::vector<std::vector<int>> satelliteTagLocations;
LogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), locality(tagLocalityInvalid), startVersion(invalidVersion) {}
LogSet(const TLogSet& tlogSet);
LogSet(const CoreTLogSet& coreSet);
std::string logRouterString() {
std::string result;
@ -63,6 +69,15 @@ public:
return result;
}
bool hasLogRouter(UID id) {
for (const auto& router : logRouters) {
if (router->get().id() == id) {
return true;
}
}
return false;
}
std::string logServerString() {
std::string result;
for(int i = 0; i < logServers.size(); i++) {
@ -284,7 +299,7 @@ struct ILogSystem {
virtual bool hasMessage() = 0;
//pre: only callable if hasMessage() returns true
//return the tags associated with the message for teh current sequence
//return the tags associated with the message for the current sequence
virtual const std::vector<Tag>& getTags() = 0;
//pre: only callable if hasMessage() returns true
@ -682,6 +697,14 @@ struct ILogSystem {
virtual Tag getRandomRouterTag() = 0;
virtual void stopRejoins() = 0;
// Returns the pseudo tag to be popped for the given process class. If the
// process class doesn't use pseudo tag, return the same tag.
virtual Tag getPseudoPopTag(Tag tag, ProcessClass::ClassType type) = 0;
virtual bool isPseudoLocality(int8_t locality) = 0;
virtual Version popPseudoLocalityTag(int8_t locality, Version upTo) = 0;
};
struct LengthPrefixedStringRef {
@ -725,6 +748,11 @@ struct LogPushData : NonCopyable {
next_message_tags.push_back( tag );
}
template<class T>
void addTags(T tags) {
next_message_tags.insert(next_message_tags.end(), tags.begin(), tags.end());
}
void addMessage( StringRef rawMessageWithoutLength, bool usePreviousLocations = false ) {
if( !usePreviousLocations ) {
prev_tags.clear();

View File

@ -55,6 +55,9 @@ protected:
Optional<Interface> iface;
};
class LogSet;
struct OldLogData;
struct TLogSet {
std::vector<OptionalInterface<TLogInterface>> tLogs;
std::vector<OptionalInterface<TLogInterface>> logRouters;
@ -68,6 +71,7 @@ struct TLogSet {
std::vector<std::vector<int>> satelliteTagLocations;
TLogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), locality(tagLocalityInvalid), startVersion(invalidVersion) {}
explicit TLogSet(const LogSet& rhs);
std::string toString() const {
return format("anti: %d replication: %d local: %d routers: %d tLogs: %s locality: %d", tLogWriteAntiQuorum, tLogReplicationFactor, isLocal, logRouters.size(), describe(tLogs).c_str(), locality);
@ -126,15 +130,17 @@ struct OldTLogConf {
std::vector<TLogSet> tLogs;
Version epochEnd;
int32_t logRouterTags;
std::set<int8_t> pseudoLocalities;
OldTLogConf() : epochEnd(0), logRouterTags(0) {}
explicit OldTLogConf(const OldLogData&);
std::string toString() const {
return format("end: %d tags: %d %s", epochEnd, logRouterTags, describe(tLogs).c_str());
}
bool operator == ( const OldTLogConf& rhs ) const {
return tLogs == rhs.tLogs && epochEnd == rhs.epochEnd && logRouterTags == rhs.logRouterTags;
return tLogs == rhs.tLogs && epochEnd == rhs.epochEnd && logRouterTags == rhs.logRouterTags && pseudoLocalities == rhs.pseudoLocalities;
}
bool isEqualIds(OldTLogConf const& r) const {
@ -151,12 +157,18 @@ struct OldTLogConf {
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, tLogs, epochEnd, logRouterTags);
serializer(ar, tLogs, epochEnd, logRouterTags, pseudoLocalities);
}
};
enum class LogSystemType {
empty = 0,
tagPartitioned = 2,
};
BINARY_SERIALIZABLE(LogSystemType);
struct LogSystemConfig {
int32_t logSystemType;
LogSystemType logSystemType;
std::vector<TLogSet> tLogs;
int32_t logRouterTags;
std::vector<OldTLogConf> oldTLogs;
@ -164,8 +176,9 @@ struct LogSystemConfig {
UID recruitmentID;
bool stopped;
Optional<Version> recoveredAt;
std::set<int8_t> pseudoLocalities;
LogSystemConfig() : logSystemType(0), logRouterTags(0), expectedLogSets(0), stopped(false) {}
LogSystemConfig() : logSystemType(LogSystemType::empty), logRouterTags(0), expectedLogSets(0), stopped(false) {}
std::string toString() const {
return format("type: %d oldGenerations: %d tags: %d %s", logSystemType, oldTLogs.size(), logRouterTags, describe(tLogs).c_str());
@ -286,7 +299,7 @@ struct LogSystemConfig {
bool operator == ( const LogSystemConfig& rhs ) const { return isEqual(rhs); }
bool isEqual(LogSystemConfig const& r) const {
return logSystemType == r.logSystemType && tLogs == r.tLogs && oldTLogs == r.oldTLogs && expectedLogSets == r.expectedLogSets && logRouterTags == r.logRouterTags && recruitmentID == r.recruitmentID && stopped == r.stopped && recoveredAt == r.recoveredAt;
return logSystemType == r.logSystemType && tLogs == r.tLogs && oldTLogs == r.oldTLogs && expectedLogSets == r.expectedLogSets && logRouterTags == r.logRouterTags && recruitmentID == r.recruitmentID && stopped == r.stopped && recoveredAt == r.recoveredAt && pseudoLocalities == r.pseudoLocalities;
}
bool isEqualIds(LogSystemConfig const& r) const {
@ -306,7 +319,7 @@ struct LogSystemConfig {
}
for( auto& i : r.tLogs ) {
for( auto& j : oldTLogs[0].tLogs ) {
for( auto& j : oldTLogs[0].tLogs ) {
if( i.isEqualIds(j) ) {
return true;
}
@ -317,7 +330,7 @@ struct LogSystemConfig {
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, logSystemType, tLogs, logRouterTags, oldTLogs, expectedLogSets, recruitmentID, stopped, recoveredAt);
serializer(ar, logSystemType, tLogs, logRouterTags, oldTLogs, expectedLogSets, recruitmentID, stopped, recoveredAt, pseudoLocalities);
}
};

View File

@ -470,7 +470,7 @@ ACTOR Future<Void> commitBatch(
for(auto it : versionReply.resolverChanges) {
auto rs = self->keyResolvers.modify(it.range);
for(auto r = rs.begin(); r != rs.end(); ++r)
r->value().push_back(std::make_pair(versionReply.resolverChangesVersion,it.dest));
r->value().emplace_back(versionReply.resolverChangesVersion,it.dest);
}
//TraceEvent("ProxyGotVer", self->dbgid).detail("Commit", commitVersion).detail("Prev", prevVersion);
@ -561,7 +561,7 @@ ACTOR Future<Void> commitBatch(
// These changes to txnStateStore will be committed by the other proxy, so we simply discard the commit message
auto fcm = self->logAdapter->getCommitMessage();
storeCommits.push_back(std::make_pair(fcm, self->txnStateStore->commit()));
storeCommits.emplace_back(fcm, self->txnStateStore->commit());
//discardCommit( dbgid, fcm, txnStateStore->commit() );
if (initialState) {
@ -648,7 +648,7 @@ ACTOR Future<Void> commitBatch(
state Optional<Value> metadataVersionAfter = self->txnStateStore->readValue(metadataVersionKey).get();
auto fcm = self->logAdapter->getCommitMessage();
storeCommits.push_back(std::make_pair(fcm, self->txnStateStore->commit()));
storeCommits.emplace_back(fcm, self->txnStateStore->commit());
self->version = commitVersion;
if (!self->validState.isSet()) self->validState.send(Void());
ASSERT(commitVersion);
@ -700,8 +700,7 @@ ACTOR Future<Void> commitBatch(
if (debugMutation("ProxyCommit", commitVersion, m))
TraceEvent("ProxyCommitTo", self->dbgid).detail("To", describe(tags)).detail("Mutation", m.toString()).detail("Version", commitVersion);
for (auto& tag : tags)
toCommit.addTag(tag);
toCommit.addTags(tags);
toCommit.addTypedMessage(m);
}
else if (m.type == MutationRef::ClearRange) {
@ -712,41 +711,20 @@ ACTOR Future<Void> commitBatch(
// Fast path
if (debugMutation("ProxyCommit", commitVersion, m))
TraceEvent("ProxyCommitTo", self->dbgid).detail("To", describe(ranges.begin().value().tags)).detail("Mutation", m.toString()).detail("Version", commitVersion);
auto& tags = ranges.begin().value().tags;
if(!tags.size()) {
for( auto info : ranges.begin().value().src_info ) {
tags.push_back( info->tag );
}
for( auto info : ranges.begin().value().dest_info ) {
tags.push_back( info->tag );
}
uniquify(tags);
}
for (auto& tag : tags)
toCommit.addTag(tag);
ranges.begin().value().populateTags();
toCommit.addTags(ranges.begin().value().tags);
}
else {
TEST(true); //A clear range extends past a shard boundary
std::set<Tag> allSources;
for (auto r : ranges) {
auto& tags = r.value().tags;
if(!tags.size()) {
for( auto info : r.value().src_info ) {
tags.push_back(info->tag);
}
for( auto info : r.value().dest_info ) {
tags.push_back(info->tag);
}
uniquify(tags);
}
allSources.insert(tags.begin(), tags.end());
r.value().populateTags();
allSources.insert(r.value().tags.begin(), r.value().tags.end());
}
if (debugMutation("ProxyCommit", commitVersion, m))
TraceEvent("ProxyCommitTo", self->dbgid).detail("To", describe(allSources)).detail("Mutation", m.toString()).detail("Version", commitVersion);
for (auto& tag : allSources)
toCommit.addTag(tag);
toCommit.addTags(allSources);
}
toCommit.addTypedMessage(m);
}
@ -837,8 +815,7 @@ ACTOR Future<Void> commitBatch(
ASSERT( backupMutation.param1.startsWith(logRangeMutation.first) ); // We are writing into the configured destination
auto& tags = self->tagsForKey(backupMutation.param1);
for (auto& tag : tags)
toCommit.addTag(tag);
toCommit.addTags(tags);
toCommit.addTypedMessage(backupMutation);
// if (debugMutation("BackupProxyCommit", commitVersion, backupMutation)) {
@ -931,7 +908,7 @@ ACTOR Future<Void> commitBatch(
self->txsPopVersions.pop_front();
}
self->txsPopVersions.push_back(std::make_pair(commitVersion, msg.popTo));
self->txsPopVersions.emplace_back(commitVersion, msg.popTo);
}
self->logSystem->pop(msg.popTo, txsTag);
@ -1451,7 +1428,7 @@ ACTOR Future<Void> masterProxyServerCore(
auto rs = commitData.keyResolvers.modify(allKeys);
for(auto r = rs.begin(); r != rs.end(); ++r)
r->value().push_back(std::make_pair<Version,int>(0,0));
r->value().emplace_back(0,0);
commitData.logSystem = ILogSystem::fromServerDBInfo(proxy.id(), db->get(), false, addActor);
commitData.logAdapter = new LogSystemDiskQueueAdapter(commitData.logSystem, txsTag, Reference<AsyncVar<PeekSpecialInfo>>(), false);
@ -1563,7 +1540,6 @@ ACTOR Future<Void> masterProxyServerCore(
Standalone<VectorRef<MutationRef>> mutations;
std::vector<std::pair<MapPair<Key,ServerCacheInfo>,int>> keyInfoData;
vector<UID> src, dest;
Reference<StorageInfo> storageInfo;
ServerCacheInfo info;
for(auto &kv : data) {
if( kv.key.startsWith(keyServersPrefix) ) {
@ -1573,36 +1549,20 @@ ACTOR Future<Void> masterProxyServerCore(
info.tags.clear();
info.src_info.clear();
info.dest_info.clear();
for(auto& id : src) {
auto cacheItr = commitData.storageCache.find(id);
if(cacheItr == commitData.storageCache.end()) {
storageInfo = Reference<StorageInfo>( new StorageInfo() );
storageInfo->tag = decodeServerTagValue( commitData.txnStateStore->readValue( serverTagKeyFor(id) ).get().get() );
storageInfo->interf = decodeServerListValue( commitData.txnStateStore->readValue( serverListKeyFor(id) ).get().get() );
commitData.storageCache[id] = storageInfo;
} else {
storageInfo = cacheItr->second;
}
for (const auto& id : src) {
auto storageInfo = getStorageInfo(id, &commitData.storageCache, commitData.txnStateStore);
ASSERT(storageInfo->tag != invalidTag);
info.tags.push_back( storageInfo->tag );
info.src_info.push_back( storageInfo );
}
for(auto& id : dest) {
auto cacheItr = commitData.storageCache.find(id);
if(cacheItr == commitData.storageCache.end()) {
storageInfo = Reference<StorageInfo>( new StorageInfo() );
storageInfo->tag = decodeServerTagValue( commitData.txnStateStore->readValue( serverTagKeyFor(id) ).get().get() );
storageInfo->interf = decodeServerListValue( commitData.txnStateStore->readValue( serverListKeyFor(id) ).get().get() );
commitData.storageCache[id] = storageInfo;
} else {
storageInfo = cacheItr->second;
}
for (const auto& id : dest) {
auto storageInfo = getStorageInfo(id, &commitData.storageCache, commitData.txnStateStore);
ASSERT(storageInfo->tag != invalidTag);
info.tags.push_back( storageInfo->tag );
info.dest_info.push_back( storageInfo );
}
uniquify(info.tags);
keyInfoData.push_back( std::make_pair(MapPair<Key,ServerCacheInfo>(k, info), 1) );
keyInfoData.emplace_back(MapPair<Key,ServerCacheInfo>(k, info), 1);
}
} else {
mutations.push_back(mutations.arena(), MutationRef(MutationRef::SetValue, kv.key, kv.value));

View File

@ -373,7 +373,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
int unpoppedRecoveredTags;
Reference<TagData> getTagData(Tag tag) {
int idx = tag.locality >= 0 ? 2*tag.locality : 1-(2*tag.locality);
int idx = tag.toTagDataIndex();
if(idx >= tag_data.size()) {
tag_data.resize(idx+1);
}
@ -389,8 +389,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
popped = recoveredAt + 1;
}
Reference<TagData> newTagData = Reference<TagData>( new TagData(tag, popped, nothingPersistent, poppedRecently, unpoppedRecovered) );
int idx = tag.locality >= 0 ? 2*tag.locality : 1-(2*tag.locality);
tag_data[idx][tag.id] = newTagData;
tag_data[tag.toTagDataIndex()][tag.id] = newTagData;
return newTagData;
}
@ -883,25 +882,32 @@ std::deque<std::pair<Version, LengthPrefixedStringRef>> & getVersionMessages( Re
};
ACTOR Future<Void> tLogPop( TLogData* self, TLogPopRequest req, Reference<LogData> logData ) {
auto tagData = logData->getTagData(req.tag);
state Version upTo = req.to;
int8_t tagLocality = req.tag.locality;
if (logData->logSystem->get().isValid() && logData->logSystem->get()->isPseudoLocality(tagLocality)) {
upTo = logData->logSystem->get()->popPseudoLocalityTag(tagLocality, req.to);
tagLocality = tagLocalityLogRouter;
}
state Tag tag(tagLocality, req.tag.id);
auto tagData = logData->getTagData(tag);
if (!tagData) {
tagData = logData->createTagData(req.tag, req.to, true, true, false);
} else if (req.to > tagData->popped) {
tagData->popped = req.to;
tagData = logData->createTagData(tag, upTo, true, true, false);
} else if (upTo > tagData->popped) {
tagData->popped = upTo;
tagData->poppedRecently = true;
if(tagData->unpoppedRecovered && req.to > logData->recoveredAt) {
if(tagData->unpoppedRecovered && upTo > logData->recoveredAt) {
tagData->unpoppedRecovered = false;
logData->unpoppedRecoveredTags--;
TraceEvent("TLogPoppedTag", logData->logId).detail("Tags", logData->unpoppedRecoveredTags).detail("Tag", req.tag.toString()).detail("DurableKCVer", logData->durableKnownCommittedVersion).detail("RecoveredAt", logData->recoveredAt);
TraceEvent("TLogPoppedTag", logData->logId).detail("Tags", logData->unpoppedRecoveredTags).detail("Tag", tag.toString()).detail("DurableKCVer", logData->durableKnownCommittedVersion).detail("RecoveredAt", logData->recoveredAt);
if(logData->unpoppedRecoveredTags == 0 && logData->durableKnownCommittedVersion >= logData->recoveredAt && logData->recoveryComplete.canBeSet()) {
logData->recoveryComplete.send(Void());
}
}
if ( req.to > logData->persistentDataDurableVersion )
wait(tagData->eraseMessagesBefore( req.to, self, logData, TaskTLogPop ));
//TraceEvent("TLogPop", self->dbgid).detail("Tag", req.tag).detail("To", req.to);
if (upTo > logData->persistentDataDurableVersion)
wait(tagData->eraseMessagesBefore(upTo, self, logData, TaskTLogPop));
//TraceEvent("TLogPop", self->dbgid).detail("Tag", tag.toString()).detail("To", upTo);
}
req.reply.send(Void());
@ -1980,7 +1986,7 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
throw logData->removed.getError();
}
if (req.recoverFrom.logSystemType == 2) {
if (req.recoverFrom.logSystemType == LogSystemType::tagPartitioned) {
logData->unrecoveredBefore = req.startVersion;
logData->recoveredAt = req.recoverAt;
logData->knownCommittedVersion = req.startVersion - 1;

View File

@ -429,7 +429,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
int unpoppedRecoveredTags;
Reference<TagData> getTagData(Tag tag) {
int idx = tag.locality >= 0 ? 2*tag.locality : 1-(2*tag.locality);
int idx = tag.toTagDataIndex();
if(idx >= tag_data.size()) {
tag_data.resize(idx+1);
}
@ -445,8 +445,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
popped = recoveredAt + 1;
}
Reference<TagData> newTagData = Reference<TagData>( new TagData(tag, popped, 0, nothingPersistent, poppedRecently, unpoppedRecovered) );
int idx = tag.locality >= 0 ? 2*tag.locality : 1-(2*tag.locality);
tag_data[idx][tag.id] = newTagData;
tag_data[tag.toTagDataIndex()][tag.id] = newTagData;
return newTagData;
}
@ -1123,26 +1122,33 @@ std::deque<std::pair<Version, LengthPrefixedStringRef>> & getVersionMessages( Re
};
ACTOR Future<Void> tLogPop( TLogData* self, TLogPopRequest req, Reference<LogData> logData ) {
auto tagData = logData->getTagData(req.tag);
state Version upTo = req.to;
int8_t tagLocality = req.tag.locality;
if (logData->logSystem->get().isValid() && logData->logSystem->get()->isPseudoLocality(tagLocality)) {
upTo = logData->logSystem->get()->popPseudoLocalityTag(tagLocality, req.to);
tagLocality = tagLocalityLogRouter;
}
state Tag tag(tagLocality, req.tag.id);
auto tagData = logData->getTagData(tag);
if (!tagData) {
tagData = logData->createTagData(req.tag, req.to, true, true, false);
} else if (req.to > tagData->popped) {
tagData->popped = req.to;
tagData = logData->createTagData(tag, upTo, true, true, false);
} else if (upTo > tagData->popped) {
tagData->popped = upTo;
tagData->poppedRecently = true;
tagData->requiresPoppedLocationUpdate = true;
if(tagData->unpoppedRecovered && req.to > logData->recoveredAt) {
if(tagData->unpoppedRecovered && upTo > logData->recoveredAt) {
tagData->unpoppedRecovered = false;
logData->unpoppedRecoveredTags--;
TraceEvent("TLogPoppedTag", logData->logId).detail("Tags", logData->unpoppedRecoveredTags).detail("Tag", req.tag.toString()).detail("DurableKCVer", logData->durableKnownCommittedVersion).detail("RecoveredAt", logData->recoveredAt);
TraceEvent("TLogPoppedTag", logData->logId).detail("Tags", logData->unpoppedRecoveredTags).detail("Tag", tag.toString()).detail("DurableKCVer", logData->durableKnownCommittedVersion).detail("RecoveredAt", logData->recoveredAt);
if(logData->unpoppedRecoveredTags == 0 && logData->durableKnownCommittedVersion >= logData->recoveredAt && logData->recoveryComplete.canBeSet()) {
logData->recoveryComplete.send(Void());
}
}
if ( req.to > logData->persistentDataDurableVersion )
wait(tagData->eraseMessagesBefore( req.to, self, logData, TaskTLogPop ));
//TraceEvent("TLogPop", self->dbgid).detail("Tag", req.tag).detail("To", req.to);
if (upTo > logData->persistentDataDurableVersion)
wait(tagData->eraseMessagesBefore(upTo, self, logData, TaskTLogPop));
//TraceEvent("TLogPop", self->dbgid).detail("Tag", tag.toString()).detail("To", upTo);
}
req.reply.send(Void());
@ -2367,7 +2373,7 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
throw logData->removed.getError();
}
if (req.recoverFrom.logSystemType == 2) {
if (req.recoverFrom.logSystemType == LogSystemType::tagPartitioned) {
logData->unrecoveredBefore = req.startVersion;
logData->recoveredAt = req.recoverAt;
logData->knownCommittedVersion = req.startVersion - 1;

View File

@ -45,8 +45,22 @@ struct OldLogData {
std::vector<Reference<LogSet>> tLogs;
int32_t logRouterTags;
Version epochEnd;
std::set<int8_t> pseudoLocalities;
OldLogData() : epochEnd(0), logRouterTags(0) {}
// Constructor for T of OldTLogConf and OldTLogCoreData
template<class T>
explicit OldLogData(const T& conf)
: logRouterTags(conf.logRouterTags), epochEnd(conf.epochEnd),
pseudoLocalities(conf.pseudoLocalities)
{
tLogs.resize(conf.tLogs.size());
for (int j = 0; j < conf.tLogs.size(); j++) {
Reference<LogSet> logSet(new LogSet(conf.tLogs[j]));
tLogs[j] = logSet;
}
}
};
struct LogLockInfo {
@ -58,15 +72,101 @@ struct LogLockInfo {
LogLockInfo() : epochEnd(std::numeric_limits<Version>::max()), isCurrent(false) {}
};
LogSet::LogSet(const TLogSet& tLogSet) :
tLogWriteAntiQuorum(tLogSet.tLogWriteAntiQuorum),
tLogReplicationFactor(tLogSet.tLogReplicationFactor),
tLogLocalities(tLogSet.tLogLocalities), tLogVersion(tLogSet.tLogVersion),
tLogPolicy(tLogSet.tLogPolicy), isLocal(tLogSet.isLocal),
locality(tLogSet.locality), startVersion(tLogSet.startVersion),
satelliteTagLocations(tLogSet.satelliteTagLocations)
{
for(const auto& log : tLogSet.tLogs) {
logServers.emplace_back(new AsyncVar<OptionalInterface<TLogInterface>>(log));
}
for(const auto& log : tLogSet.logRouters) {
logRouters.emplace_back(new AsyncVar<OptionalInterface<TLogInterface>>(log));
}
filterLocalityDataForPolicy(tLogPolicy, &tLogLocalities);
updateLocalitySet(tLogLocalities);
}
LogSet::LogSet(const CoreTLogSet& coreSet) :
tLogWriteAntiQuorum(coreSet.tLogWriteAntiQuorum),
tLogReplicationFactor(coreSet.tLogReplicationFactor),
tLogLocalities(coreSet.tLogLocalities), tLogVersion(coreSet.tLogVersion),
tLogPolicy(coreSet.tLogPolicy), isLocal(coreSet.isLocal),
locality(coreSet.locality), startVersion(coreSet.startVersion),
satelliteTagLocations(coreSet.satelliteTagLocations)
{
for(const auto& log : coreSet.tLogs) {
logServers.emplace_back(new AsyncVar<OptionalInterface<TLogInterface>>(OptionalInterface<TLogInterface>(log)));
}
filterLocalityDataForPolicy(tLogPolicy, &tLogLocalities);
updateLocalitySet(tLogLocalities);
}
TLogSet::TLogSet(const LogSet& rhs) :
tLogWriteAntiQuorum(rhs.tLogWriteAntiQuorum),
tLogReplicationFactor(rhs.tLogReplicationFactor),
tLogLocalities(rhs.tLogLocalities), tLogVersion(rhs.tLogVersion),
tLogPolicy(rhs.tLogPolicy), isLocal(rhs.isLocal), locality(rhs.locality),
startVersion(rhs.startVersion),
satelliteTagLocations(rhs.satelliteTagLocations)
{
for (const auto& tlog : rhs.logServers) {
tLogs.push_back(tlog->get());
}
for (const auto& logRouter : rhs.logRouters) {
logRouters.push_back(logRouter->get());
}
}
OldTLogConf::OldTLogConf(const OldLogData& oldLogData) :
logRouterTags(oldLogData.logRouterTags), epochEnd(oldLogData.epochEnd),
pseudoLocalities(oldLogData.pseudoLocalities)
{
for (const Reference<LogSet>& logSet : oldLogData.tLogs) {
tLogs.emplace_back(*logSet);
}
}
CoreTLogSet::CoreTLogSet(const LogSet& logset) :
tLogWriteAntiQuorum(logset.tLogWriteAntiQuorum),
tLogReplicationFactor(logset.tLogReplicationFactor),
tLogLocalities(logset.tLogLocalities),
tLogPolicy(logset.tLogPolicy), isLocal(logset.isLocal),
locality(logset.locality), startVersion(logset.startVersion),
satelliteTagLocations(logset.satelliteTagLocations),
tLogVersion(logset.tLogVersion)
{
for (const auto &log : logset.logServers) {
tLogs.push_back(log->get().id());
}
}
OldTLogCoreData::OldTLogCoreData(const OldLogData& oldData) :
logRouterTags(oldData.logRouterTags), epochEnd(oldData.epochEnd),
pseudoLocalities(oldData.pseudoLocalities)
{
for (const Reference<LogSet>& logSet : oldData.tLogs) {
if (logSet->logServers.size()) {
tLogs.emplace_back(*logSet);
}
}
}
struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogSystem> {
UID dbgid;
int logSystemType;
LogSystemType logSystemType;
std::vector<Reference<LogSet>> tLogs;
int expectedLogSets;
int logRouterTags;
UID recruitmentID;
int repopulateRegionAntiQuorum;
bool stopped;
std::set<int8_t> pseudoLocalities;
std::map<int8_t, Version> pseudoLocalityPopVersion;
// new members
Future<Void> rejoins;
@ -88,7 +188,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
std::vector<OldLogData> oldLogData;
AsyncTrigger logSystemConfigChanged;
TagPartitionedLogSystem( UID dbgid, LocalityData locality, Optional<PromiseStream<Future<Void>>> addActor = Optional<PromiseStream<Future<Void>>>() ) : dbgid(dbgid), locality(locality), addActor(addActor), popActors(false), recoveryCompleteWrittenToCoreState(false), remoteLogsWrittenToCoreState(false), logSystemType(0), logRouterTags(0), expectedLogSets(0), hasRemoteServers(false), stopped(false), repopulateRegionAntiQuorum(0) {}
TagPartitionedLogSystem( UID dbgid, LocalityData locality, Optional<PromiseStream<Future<Void>>> addActor = Optional<PromiseStream<Future<Void>>>() ) : dbgid(dbgid), locality(locality), addActor(addActor), popActors(false), recoveryCompleteWrittenToCoreState(false), remoteLogsWrittenToCoreState(false), logSystemType(LogSystemType::empty), logRouterTags(0), expectedLogSets(0), hasRemoteServers(false), stopped(false), repopulateRegionAntiQuorum(0) {}
virtual void stopRejoins() {
rejoins = Future<Void>();
@ -117,12 +217,47 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
return dbgid;
}
void addPseudoLocality(int8_t locality) {
ASSERT(locality < 0);
pseudoLocalities.insert(locality);
pseudoLocalityPopVersion[locality] = 0;
}
Tag getPseudoPopTag(Tag tag, ProcessClass::ClassType type) override {
switch (type) {
case ProcessClass::LogRouterClass:
if (tag.locality == tagLocalityLogRouter && pseudoLocalities.count(tag.locality) > 0) {
tag.locality = tagLocalityLogRouterMapped;
}
break;
default:
break;
}
return tag;
}
bool isPseudoLocality(int8_t locality) override {
return pseudoLocalities.count(locality) > 0;
}
Version popPseudoLocalityTag(int8_t locality, Version upTo) override {
ASSERT(isPseudoLocality(locality));
auto& localityVersion = pseudoLocalityPopVersion[locality];
localityVersion = std::max(localityVersion, upTo);
Version minVersion = localityVersion;
for (const auto& it : pseudoLocalityPopVersion) {
minVersion = std::min(minVersion, it.second);
}
return minVersion;
}
static Future<Void> recoverAndEndEpoch(Reference<AsyncVar<Reference<ILogSystem>>> const& outLogSystem, UID const& dbgid, DBCoreState const& oldState, FutureStream<TLogRejoinRequest> const& rejoins, LocalityData const& locality, bool* forceRecovery) {
return epochEnd( outLogSystem, dbgid, oldState, rejoins, locality, forceRecovery );
}
static Reference<ILogSystem> fromLogSystemConfig( UID const& dbgid, LocalityData const& locality, LogSystemConfig const& lsConf, bool excludeRemote, bool useRecoveredAt, Optional<PromiseStream<Future<Void>>> addActor ) {
ASSERT( lsConf.logSystemType == 2 || (lsConf.logSystemType == 0 && !lsConf.tLogs.size()) );
ASSERT(lsConf.logSystemType == LogSystemType::tagPartitioned || (lsConf.logSystemType == LogSystemType::empty && !lsConf.tLogs.size()));
//ASSERT(lsConf.epoch == epoch); //< FIXME
Reference<TagPartitionedLogSystem> logSystem( new TagPartitionedLogSystem(dbgid, locality, addActor) );
@ -134,58 +269,15 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
if(useRecoveredAt) {
logSystem->recoveredAt = lsConf.recoveredAt;
}
for( int i = 0; i < lsConf.tLogs.size(); i++ ) {
TLogSet const& tLogSet = lsConf.tLogs[i];
if(!excludeRemote || tLogSet.isLocal) {
Reference<LogSet> logSet = Reference<LogSet>( new LogSet() );
logSystem->tLogs.push_back( logSet );
for( auto& log : tLogSet.tLogs) {
logSet->logServers.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( log ) ) );
}
for( auto& log : tLogSet.logRouters) {
logSet->logRouters.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( log ) ) );
}
logSet->tLogVersion = tLogSet.tLogVersion;
logSet->tLogWriteAntiQuorum = tLogSet.tLogWriteAntiQuorum;
logSet->tLogReplicationFactor = tLogSet.tLogReplicationFactor;
logSet->tLogPolicy = tLogSet.tLogPolicy;
logSet->tLogLocalities = tLogSet.tLogLocalities;
logSet->isLocal = tLogSet.isLocal;
logSet->locality = tLogSet.locality;
logSet->startVersion = tLogSet.startVersion;
logSet->satelliteTagLocations = tLogSet.satelliteTagLocations;
filterLocalityDataForPolicy(logSet->tLogPolicy, &logSet->tLogLocalities);
logSet->updateLocalitySet(logSet->tLogLocalities);
logSystem->pseudoLocalities = lsConf.pseudoLocalities;
for (const TLogSet& tLogSet : lsConf.tLogs) {
if (!excludeRemote || tLogSet.isLocal) {
logSystem->tLogs.emplace_back(new LogSet(tLogSet));
}
}
logSystem->oldLogData.resize(lsConf.oldTLogs.size());
for( int i = 0; i < lsConf.oldTLogs.size(); i++ ) {
logSystem->oldLogData[i].tLogs.resize(lsConf.oldTLogs[i].tLogs.size());
for( int j = 0; j < lsConf.oldTLogs[i].tLogs.size(); j++ ) {
Reference<LogSet> logSet = Reference<LogSet>( new LogSet() );
logSystem->oldLogData[i].tLogs[j] = logSet;
TLogSet const& tLogData = lsConf.oldTLogs[i].tLogs[j];
for( auto & log : tLogData.tLogs) {
logSet->logServers.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( log ) ) );
}
for( auto & log : tLogData.logRouters) {
logSet->logRouters.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( log ) ) );
}
logSet->tLogVersion = tLogData.tLogVersion;
logSet->tLogWriteAntiQuorum = tLogData.tLogWriteAntiQuorum;
logSet->tLogReplicationFactor = tLogData.tLogReplicationFactor;
logSet->tLogPolicy = tLogData.tLogPolicy;
logSet->tLogLocalities = tLogData.tLogLocalities;
logSet->isLocal = tLogData.isLocal;
logSet->locality = tLogData.locality;
logSet->startVersion = tLogData.startVersion;
logSet->satelliteTagLocations = tLogData.satelliteTagLocations;
filterLocalityDataForPolicy(logSet->tLogPolicy, &logSet->tLogLocalities);
logSet->updateLocalitySet(logSet->tLogLocalities);
}
logSystem->oldLogData[i].logRouterTags = lsConf.oldTLogs[i].logRouterTags;
logSystem->oldLogData[i].epochEnd = lsConf.oldTLogs[i].epochEnd;
for (const auto& oldTlogConf : lsConf.oldTLogs) {
logSystem->oldLogData.emplace_back(oldTlogConf);
}
logSystem->logSystemType = lsConf.logSystemType;
@ -193,68 +285,24 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
static Reference<ILogSystem> fromOldLogSystemConfig( UID const& dbgid, LocalityData const& locality, LogSystemConfig const& lsConf ) {
ASSERT( lsConf.logSystemType == 2 || (lsConf.logSystemType == 0 && !lsConf.tLogs.size()) );
ASSERT( lsConf.logSystemType == LogSystemType::tagPartitioned || (lsConf.logSystemType == LogSystemType::empty && !lsConf.tLogs.size()) );
//ASSERT(lsConf.epoch == epoch); //< FIXME
Reference<TagPartitionedLogSystem> logSystem( new TagPartitionedLogSystem(dbgid, locality) );
if(lsConf.oldTLogs.size()) {
logSystem->tLogs.resize( lsConf.oldTLogs[0].tLogs.size());
for( int i = 0; i < lsConf.oldTLogs[0].tLogs.size(); i++ ) {
Reference<LogSet> logSet = Reference<LogSet>( new LogSet() );
logSystem->tLogs[i] = logSet;
TLogSet const& tLogSet = lsConf.oldTLogs[0].tLogs[i];
for( auto & log : tLogSet.tLogs) {
logSet->logServers.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( log ) ) );
}
for( auto & log : tLogSet.logRouters) {
logSet->logRouters.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( log ) ) );
}
logSet->tLogVersion = tLogSet.tLogVersion;
logSet->tLogWriteAntiQuorum = tLogSet.tLogWriteAntiQuorum;
logSet->tLogReplicationFactor = tLogSet.tLogReplicationFactor;
logSet->tLogPolicy = tLogSet.tLogPolicy;
logSet->tLogLocalities = tLogSet.tLogLocalities;
logSet->isLocal = tLogSet.isLocal;
logSet->locality = tLogSet.locality;
logSet->startVersion = tLogSet.startVersion;
logSet->satelliteTagLocations = tLogSet.satelliteTagLocations;
filterLocalityDataForPolicy(logSet->tLogPolicy, &logSet->tLogLocalities);
logSet->updateLocalitySet(logSet->tLogLocalities);
if (lsConf.oldTLogs.size()) {
for (const TLogSet& tLogSet : lsConf.oldTLogs[0].tLogs) {
logSystem->tLogs.emplace_back(new LogSet(tLogSet));
}
logSystem->logRouterTags = lsConf.oldTLogs[0].logRouterTags;
//logSystem->epochEnd = lsConf.oldTLogs[0].epochEnd;
logSystem->oldLogData.resize(lsConf.oldTLogs.size()-1);
for( int i = 1; i < lsConf.oldTLogs.size(); i++ ) {
logSystem->oldLogData[i-1].tLogs.resize(lsConf.oldTLogs[i].tLogs.size());
for( int j = 0; j < lsConf.oldTLogs[i].tLogs.size(); j++ ) {
Reference<LogSet> logSet = Reference<LogSet>( new LogSet() );
logSystem->oldLogData[i-1].tLogs[j] = logSet;
TLogSet const& tLogSet = lsConf.oldTLogs[i].tLogs[j];
for( auto & log : tLogSet.tLogs) {
logSet->logServers.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( log ) ) );
}
for( auto & log : tLogSet.logRouters) {
logSet->logRouters.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( log ) ) );
}
logSet->tLogVersion = tLogSet.tLogVersion;
logSet->tLogWriteAntiQuorum = tLogSet.tLogWriteAntiQuorum;
logSet->tLogReplicationFactor = tLogSet.tLogReplicationFactor;
logSet->tLogPolicy = tLogSet.tLogPolicy;
logSet->tLogLocalities = tLogSet.tLogLocalities;
logSet->isLocal = tLogSet.isLocal;
logSet->locality = tLogSet.locality;
logSet->startVersion = tLogSet.startVersion;
logSet->satelliteTagLocations = tLogSet.satelliteTagLocations;
filterLocalityDataForPolicy(logSet->tLogPolicy, &logSet->tLogLocalities);
logSet->updateLocalitySet(logSet->tLogLocalities);
}
logSystem->oldLogData[i-1].logRouterTags = lsConf.oldTLogs[i].logRouterTags;
logSystem->oldLogData[i-1].epochEnd = lsConf.oldTLogs[i].epochEnd;
for (int i = 1; i < lsConf.oldTLogs.size(); i++ ) {
logSystem->oldLogData.emplace_back(lsConf.oldTLogs[i]);
}
}
logSystem->logSystemType = lsConf.logSystemType;
logSystem->stopped = true;
logSystem->pseudoLocalities = lsConf.pseudoLocalities;
return logSystem;
}
@ -268,49 +316,21 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
newState.tLogs.clear();
newState.logRouterTags = logRouterTags;
for(auto &t : tLogs) {
if(t->logServers.size()) {
CoreTLogSet coreSet;
for(auto &log : t->logServers) {
coreSet.tLogs.push_back(log->get().id());
coreSet.tLogLocalities.push_back(log->get().interf().locality);
newState.pseudoLocalities = pseudoLocalities;
for (const auto &t : tLogs) {
if (t->logServers.size()) {
newState.tLogs.emplace_back(*t);
newState.tLogs.back().tLogLocalities.clear();
for (const auto& log : t->logServers) {
newState.tLogs.back().tLogLocalities.push_back(log->get().interf().locality);
}
coreSet.tLogVersion = t->tLogVersion;
coreSet.tLogWriteAntiQuorum = t->tLogWriteAntiQuorum;
coreSet.tLogReplicationFactor = t->tLogReplicationFactor;
coreSet.tLogPolicy = t->tLogPolicy;
coreSet.isLocal = t->isLocal;
coreSet.locality = t->locality;
coreSet.startVersion = t->startVersion;
coreSet.satelliteTagLocations = t->satelliteTagLocations;
newState.tLogs.push_back(coreSet);
}
}
newState.oldTLogData.clear();
if(!recoveryComplete.isValid() || !recoveryComplete.isReady() || (repopulateRegionAntiQuorum == 0 && (!remoteRecoveryComplete.isValid() || !remoteRecoveryComplete.isReady()))) {
newState.oldTLogData.resize(oldLogData.size());
for(int i = 0; i < oldLogData.size(); i++) {
for(auto &t : oldLogData[i].tLogs) {
if(t->logServers.size()) {
CoreTLogSet coreSet;
for(auto &log : t->logServers) {
coreSet.tLogs.push_back(log->get().id());
}
coreSet.tLogLocalities = t->tLogLocalities;
coreSet.tLogVersion = t->tLogVersion;
coreSet.tLogWriteAntiQuorum = t->tLogWriteAntiQuorum;
coreSet.tLogReplicationFactor = t->tLogReplicationFactor;
coreSet.tLogPolicy = t->tLogPolicy;
coreSet.isLocal = t->isLocal;
coreSet.locality = t->locality;
coreSet.startVersion = t->startVersion;
coreSet.satelliteTagLocations = t->satelliteTagLocations;
newState.oldTLogData[i].tLogs.push_back(coreSet);
}
}
newState.oldTLogData[i].logRouterTags = oldLogData[i].logRouterTags;
newState.oldTLogData[i].epochEnd = oldLogData[i].epochEnd;
for (const auto& oldData : oldLogData) {
newState.oldTLogData.emplace_back(oldData);
}
}
@ -456,7 +476,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
if(lastBegin < end && localSets.size()) {
TraceEvent("TLogPeekAllAddingCurrent", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).detail("BestLogs", localSets[bestSet]->logServerString());
cursors.push_back( Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( localSets, bestSet, localSets[bestSet]->bestLocationFor( tag ), tag, lastBegin, end, parallelGetMore)) );
cursors.emplace_back(new ILogSystem::SetPeekCursor( localSets, bestSet, localSets[bestSet]->bestLocationFor( tag ), tag, lastBegin, end, parallelGetMore));
}
int i = 0;
while(begin < lastBegin) {
@ -501,7 +521,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
if(thisBegin < lastBegin) {
if(thisBegin < end) {
TraceEvent("TLogPeekAllAddingOld", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).detail("BestLogs", localOldSets[bestOldSet]->logServerString()).detail("LastBegin", lastBegin).detail("ThisBegin", thisBegin);
cursors.push_back( Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( localOldSets, bestOldSet, localOldSets[bestOldSet]->bestLocationFor( tag ), tag, thisBegin, std::min(lastBegin, end), parallelGetMore)) );
cursors.emplace_back(new ILogSystem::SetPeekCursor(localOldSets, bestOldSet, localOldSets[bestOldSet]->bestLocationFor( tag ), tag, thisBegin, std::min(lastBegin, end), parallelGetMore));
epochEnds.push_back(LogMessageVersion(std::min(lastBegin, end)));
}
lastBegin = thisBegin;
@ -537,7 +557,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
std::vector< Reference<ILogSystem::IPeekCursor> > cursors;
std::vector< LogMessageVersion > epochEnds;
TraceEvent("TLogPeekRemoteAddingBest", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("BestSet", bestSet).detail("BestSetStart", lastBegin).detail("LogRouterIds", tLogs[bestSet]->logRouterString());
cursors.push_back( Reference<ILogSystem::MergedPeekCursor>( new ILogSystem::MergedPeekCursor( tLogs[bestSet]->logRouters, -1, (int)tLogs[bestSet]->logRouters.size(), tag, lastBegin, getPeekEnd(), false, std::vector<LocalityData>(), Reference<IReplicationPolicy>(), 0 ) ) );
cursors.emplace_back(new ILogSystem::MergedPeekCursor( tLogs[bestSet]->logRouters, -1, (int)tLogs[bestSet]->logRouters.size(), tag, lastBegin, getPeekEnd(), false, std::vector<LocalityData>(), Reference<IReplicationPolicy>(), 0 ) );
int i = 0;
while(begin < lastBegin) {
if(i == oldLogData.size()) {
@ -565,9 +585,9 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
if(thisBegin < lastBegin) {
TraceEvent("TLogPeekRemoteAddingOldBest", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("BestOldSet", bestOldSet).detail("LogRouterIds", oldLogData[i].tLogs[bestOldSet]->logRouterString())
.detail("LastBegin", lastBegin).detail("ThisBegin", thisBegin).detail("BestStartVer", oldLogData[i].tLogs[bestOldSet]->startVersion);
cursors.push_back( Reference<ILogSystem::MergedPeekCursor>( new ILogSystem::MergedPeekCursor( oldLogData[i].tLogs[bestOldSet]->logRouters, -1, (int)oldLogData[i].tLogs[bestOldSet]->logRouters.size(), tag,
thisBegin, lastBegin, false, std::vector<LocalityData>(), Reference<IReplicationPolicy>(), 0 ) ) );
epochEnds.push_back(LogMessageVersion(lastBegin));
cursors.emplace_back(new ILogSystem::MergedPeekCursor(oldLogData[i].tLogs[bestOldSet]->logRouters, -1, (int)oldLogData[i].tLogs[bestOldSet]->logRouters.size(), tag,
thisBegin, lastBegin, false, std::vector<LocalityData>(), Reference<IReplicationPolicy>(), 0));
epochEnds.emplace_back(lastBegin);
lastBegin = thisBegin;
}
i++;
@ -652,10 +672,10 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
if(tLogs[bestSet]->startVersion < end) {
TraceEvent("TLogPeekLocalAddingBest", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).detail("BestSet", bestSet).detail("BestSetStart", tLogs[bestSet]->startVersion).detail("LogId", tLogs[bestSet]->logServers[tLogs[bestSet]->bestLocationFor( tag )]->get().id());
if(useMergePeekCursors) {
cursors.push_back( Reference<ILogSystem::MergedPeekCursor>( new ILogSystem::MergedPeekCursor( tLogs[bestSet]->logServers, tLogs[bestSet]->bestLocationFor( tag ), tLogs[bestSet]->logServers.size() + 1 - tLogs[bestSet]->tLogReplicationFactor, tag,
tLogs[bestSet]->startVersion, end, true, tLogs[bestSet]->tLogLocalities, tLogs[bestSet]->tLogPolicy, tLogs[bestSet]->tLogReplicationFactor) ) );
cursors.emplace_back(new ILogSystem::MergedPeekCursor(tLogs[bestSet]->logServers, tLogs[bestSet]->bestLocationFor( tag ), tLogs[bestSet]->logServers.size() + 1 - tLogs[bestSet]->tLogReplicationFactor, tag,
tLogs[bestSet]->startVersion, end, true, tLogs[bestSet]->tLogLocalities, tLogs[bestSet]->tLogPolicy, tLogs[bestSet]->tLogReplicationFactor));
} else {
cursors.push_back( Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( tLogs[bestSet]->logServers[tLogs[bestSet]->bestLocationFor( tag )], tag, tLogs[bestSet]->startVersion, end, false, false ) ) );
cursors.emplace_back(new ILogSystem::ServerPeekCursor( tLogs[bestSet]->logServers[tLogs[bestSet]->bestLocationFor( tag )], tag, tLogs[bestSet]->startVersion, end, false, false));
}
}
Version lastBegin = tLogs[bestSet]->startVersion;
@ -705,9 +725,9 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
if(thisBegin < end) {
TraceEvent("TLogPeekLocalAddingOldBest", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end)
.detail("LogServers", oldLogData[i].tLogs[bestOldSet]->logServerString()).detail("ThisBegin", thisBegin).detail("LastBegin", lastBegin);
cursors.push_back( Reference<ILogSystem::MergedPeekCursor>( new ILogSystem::MergedPeekCursor( oldLogData[i].tLogs[bestOldSet]->logServers, oldLogData[i].tLogs[bestOldSet]->bestLocationFor( tag ), oldLogData[i].tLogs[bestOldSet]->logServers.size() + 1 - oldLogData[i].tLogs[bestOldSet]->tLogReplicationFactor, tag,
thisBegin, std::min(lastBegin, end), useMergePeekCursors, oldLogData[i].tLogs[bestOldSet]->tLogLocalities, oldLogData[i].tLogs[bestOldSet]->tLogPolicy, oldLogData[i].tLogs[bestOldSet]->tLogReplicationFactor)));
epochEnds.push_back(LogMessageVersion(std::min(lastBegin, end)));
cursors.emplace_back(new ILogSystem::MergedPeekCursor( oldLogData[i].tLogs[bestOldSet]->logServers, oldLogData[i].tLogs[bestOldSet]->bestLocationFor( tag ), oldLogData[i].tLogs[bestOldSet]->logServers.size() + 1 - oldLogData[i].tLogs[bestOldSet]->tLogReplicationFactor, tag,
thisBegin, std::min(lastBegin, end), useMergePeekCursors, oldLogData[i].tLogs[bestOldSet]->tLogLocalities, oldLogData[i].tLogs[bestOldSet]->tLogPolicy, oldLogData[i].tLogs[bestOldSet]->tLogReplicationFactor));
epochEnds.emplace_back(std::min(lastBegin, end));
}
lastBegin = thisBegin;
}
@ -736,7 +756,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
cursors.resize(2);
cursors[1] = peekLocal(dbgid, tag, begin, localEnd, true, peekLocality);
cursors[0] = peekAll(dbgid, localEnd, end, tag, true);
epochEnds.push_back(LogMessageVersion(localEnd));
epochEnds.emplace_back(localEnd);
return Reference<ILogSystem::MultiCursor>( new ILogSystem::MultiCursor(cursors, epochEnds) );
} catch( Error& e ) {
@ -765,7 +785,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
for(int i = 0; i < history.size(); i++) {
TraceEvent("TLogPeekSingleAddingOld", dbgid).detail("Tag", tag.toString()).detail("HistoryTag", history[i].second.toString()).detail("Begin", i+1 == history.size() ? begin : std::max(history[i+1].first, begin)).detail("End", history[i].first);
cursors.push_back( peekLocal(dbgid, history[i].second, i+1 == history.size() ? begin : std::max(history[i+1].first, begin), history[i].first, false) );
epochEnds.push_back(LogMessageVersion(history[i].first));
epochEnds.emplace_back(history[i].first);
}
return Reference<ILogSystem::MultiCursor>( new ILogSystem::MultiCursor(cursors, epochEnds) );
@ -775,12 +795,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
virtual Reference<IPeekCursor> peekLogRouter( UID dbgid, Version begin, Tag tag ) {
bool found = false;
for( auto& log : tLogs ) {
for( auto& router : log->logRouters ) {
if(router->get().id() == dbgid) {
found = true;
break;
}
}
found = log->hasLogRouter(dbgid);
if(found) {
break;
}
@ -815,12 +830,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
for(auto& old : oldLogData) {
found = false;
for( auto& log : old.tLogs ) {
for( auto& router : log->logRouters ) {
if(router->get().id() == dbgid) {
found = true;
break;
}
}
found = log->hasLogRouter(dbgid);
if(found) {
break;
}
@ -1027,59 +1037,15 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSystemConfig.recruitmentID = recruitmentID;
logSystemConfig.stopped = stopped;
logSystemConfig.recoveredAt = recoveredAt;
for( int i = 0; i < tLogs.size(); i++ ) {
Reference<LogSet> logSet = tLogs[i];
if(logSet->isLocal || remoteLogsWrittenToCoreState) {
logSystemConfig.tLogs.push_back(TLogSet());
TLogSet& log = logSystemConfig.tLogs.back();
log.tLogVersion = logSet->tLogVersion;
log.tLogWriteAntiQuorum = logSet->tLogWriteAntiQuorum;
log.tLogReplicationFactor = logSet->tLogReplicationFactor;
log.tLogPolicy = logSet->tLogPolicy;
log.tLogLocalities = logSet->tLogLocalities;
log.isLocal = logSet->isLocal;
log.locality = logSet->locality;
log.startVersion = logSet->startVersion;
log.satelliteTagLocations = logSet->satelliteTagLocations;
for( int i = 0; i < logSet->logServers.size(); i++ ) {
log.tLogs.push_back(logSet->logServers[i]->get());
}
for( int i = 0; i < logSet->logRouters.size(); i++ ) {
log.logRouters.push_back(logSet->logRouters[i]->get());
}
for (const Reference<LogSet>& logSet : tLogs) {
if (logSet->isLocal || remoteLogsWrittenToCoreState) {
logSystemConfig.tLogs.emplace_back(*logSet);
}
}
if(!recoveryCompleteWrittenToCoreState.get()) {
for( int i = 0; i < oldLogData.size(); i++ ) {
logSystemConfig.oldTLogs.push_back(OldTLogConf());
logSystemConfig.oldTLogs[i].tLogs.resize(oldLogData[i].tLogs.size());
for( int j = 0; j < oldLogData[i].tLogs.size(); j++ ) {
TLogSet& log = logSystemConfig.oldTLogs[i].tLogs[j];
Reference<LogSet> logSet = oldLogData[i].tLogs[j];
log.tLogVersion = logSet->tLogVersion;
log.tLogWriteAntiQuorum = logSet->tLogWriteAntiQuorum;
log.tLogReplicationFactor = logSet->tLogReplicationFactor;
log.tLogPolicy = logSet->tLogPolicy;
log.tLogLocalities = logSet->tLogLocalities;
log.isLocal = logSet->isLocal;
log.locality = logSet->locality;
log.startVersion = logSet->startVersion;
log.satelliteTagLocations = logSet->satelliteTagLocations;
for( int i = 0; i < logSet->logServers.size(); i++ ) {
log.tLogs.push_back(logSet->logServers[i]->get());
}
for( int i = 0; i < logSet->logRouters.size(); i++ ) {
log.logRouters.push_back(logSet->logRouters[i]->get());
}
}
logSystemConfig.oldTLogs[i].logRouterTags = oldLogData[i].logRouterTags;
logSystemConfig.oldTLogs[i].epochEnd = oldLogData[i].epochEnd;
for (const auto& oldData : oldLogData) {
logSystemConfig.oldTLogs.emplace_back(oldData);
}
}
return logSystemConfig;
@ -1091,7 +1057,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
for(auto& t : tLogs) {
if(t->isLocal || remoteLogsWrittenToCoreState) {
for( int i = 0; i < t->logServers.size(); i++ ) {
logs.push_back(std::make_pair(t->logServers[i]->get().id(), t->logServers[i]->get().present() ? t->logServers[i]->get().interf().address() : NetworkAddress()));
logs.emplace_back(t->logServers[i]->get().id(), t->logServers[i]->get().present() ? t->logServers[i]->get().interf().address() : NetworkAddress());
}
}
}
@ -1099,7 +1065,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
for( int i = 0; i < oldLogData.size(); i++ ) {
for(auto& t : oldLogData[i].tLogs) {
for( int j = 0; j < t->logServers.size(); j++ ) {
oldLogs.push_back(std::make_pair(t->logServers[j]->get().id(), t->logServers[j]->get().present() ? t->logServers[j]->get().interf().address() : NetworkAddress()));
oldLogs.emplace_back(t->logServers[j]->get().id(), t->logServers[j]->get().present() ? t->logServers[j]->get().interf().address() : NetworkAddress());
}
}
}
@ -1153,7 +1119,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
virtual bool hasRemoteLogs() {
return logRouterTags > 0;
return logRouterTags > 0 || pseudoLocalities.size() > 0;
}
virtual Tag getRandomRouterTag() {
@ -1373,60 +1339,23 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
state std::vector<std::vector<Reference<AsyncVar<bool>>>> logFailed;
state std::vector<Future<Void>> failureTrackers;
logServers.resize(prevState.tLogs.size());
for( int i = 0; i < prevState.tLogs.size(); i++ ) {
Reference<LogSet> logSet = Reference<LogSet>( new LogSet() );
logServers[i] = logSet;
CoreTLogSet const& coreSet = prevState.tLogs[i];
for (const CoreTLogSet& coreSet : prevState.tLogs) {
logServers.emplace_back(new LogSet(coreSet));
std::vector<Reference<AsyncVar<bool>>> failed;
for(int j = 0; j < coreSet.tLogs.size(); j++) {
Reference<AsyncVar<OptionalInterface<TLogInterface>>> logVar = Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( OptionalInterface<TLogInterface>(coreSet.tLogs[j]) ) );
logSet->logServers.push_back( logVar );
allLogServers.push_back( logVar );
failed.push_back( Reference<AsyncVar<bool>>( new AsyncVar<bool>() ) );
failureTrackers.push_back( monitorLog(logVar, failed[j] ) );
for (const auto& logVar : logServers.back()->logServers) {
allLogServers.push_back(logVar);
failed.emplace_back(new AsyncVar<bool>());
failureTrackers.push_back(monitorLog(logVar, failed.back()));
}
logSet->tLogVersion = coreSet.tLogVersion;
logSet->tLogReplicationFactor = coreSet.tLogReplicationFactor;
logSet->tLogWriteAntiQuorum = coreSet.tLogWriteAntiQuorum;
logSet->tLogPolicy = coreSet.tLogPolicy;
logSet->tLogLocalities = coreSet.tLogLocalities;
logSet->isLocal = coreSet.isLocal;
logSet->locality = coreSet.locality;
logSet->startVersion = coreSet.startVersion;
logSet->satelliteTagLocations = coreSet.satelliteTagLocations;
filterLocalityDataForPolicy(logSet->tLogPolicy, &logSet->tLogLocalities);
logSet->updateLocalitySet(logSet->tLogLocalities);
logFailed.push_back(failed);
}
oldLogData.resize(prevState.oldTLogData.size());
for( int i = 0; i < prevState.oldTLogData.size(); i++ ) {
OldLogData& oldData = oldLogData[i];
OldTLogCoreData const& old = prevState.oldTLogData[i];
oldData.tLogs.resize(old.tLogs.size());
for( int j = 0; j < old.tLogs.size(); j++ ) {
Reference<LogSet> logSet = Reference<LogSet>( new LogSet() );
oldData.tLogs[j] = logSet;
CoreTLogSet const& log = old.tLogs[j];
for(int k = 0; k < log.tLogs.size(); k++) {
Reference<AsyncVar<OptionalInterface<TLogInterface>>> logVar = Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( OptionalInterface<TLogInterface>(log.tLogs[k]) ) );
logSet->logServers.push_back( logVar );
allLogServers.push_back( logVar );
}
logSet->tLogVersion = log.tLogVersion;
logSet->tLogReplicationFactor = log.tLogReplicationFactor;
logSet->tLogWriteAntiQuorum = log.tLogWriteAntiQuorum;
logSet->tLogPolicy = log.tLogPolicy;
logSet->tLogLocalities = log.tLogLocalities;
logSet->isLocal = log.isLocal;
logSet->locality = log.locality;
logSet->startVersion = log.startVersion;
logSet->satelliteTagLocations = log.satelliteTagLocations;
filterLocalityDataForPolicy(logSet->tLogPolicy, &logSet->tLogLocalities);
logSet->updateLocalitySet(logSet->tLogLocalities);
for (const auto& oldTlogData : prevState.oldTLogData) {
oldLogData.emplace_back(oldTlogData);
for (const auto& logSet : oldLogData.back().tLogs) {
allLogServers.insert(allLogServers.end(), logSet->logServers.begin(), logSet->logServers.end());
}
oldData.epochEnd = old.epochEnd;
oldData.logRouterTags = old.logRouterTags;
}
state Future<Void> rejoins = trackRejoins( dbgid, allLogServers, rejoinRequests );
@ -1507,7 +1436,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
std::vector<Reference<AsyncVar<bool>>> failed;
for(auto& log : logServers[0]->logServers) {
failed.push_back( Reference<AsyncVar<bool>>( new AsyncVar<bool>() ) );
failed.emplace_back(new AsyncVar<bool>());
failureTrackers.push_back( monitorLog(log, failed.back() ) );
}
ASSERT(logFailed.size() == 1);
@ -1551,6 +1480,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSystem->knownCommittedVersion = knownCommittedVersion;
logSystem->remoteLogsWrittenToCoreState = true;
logSystem->stopped = true;
logSystem->pseudoLocalities = prevState.pseudoLocalities;
outLogSystem->set(logSystem);
}
@ -1567,12 +1497,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
state Version lastStart = std::numeric_limits<Version>::max();
if(!forRemote) {
Version maxStart = 0;
for(auto& logSet : self->tLogs) {
if(logSet->isLocal) {
maxStart = std::max(maxStart, logSet->startVersion);
}
}
Version maxStart = getMaxLocalStartVersion(self->tLogs);
lastStart = std::max(startVersion, maxStart);
if( self->logRouterTags == 0 ) {
@ -1602,7 +1527,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
for(auto& tLogs : self->tLogs) {
//Recruit log routers for old generations of the primary locality
if(tLogs->locality == locality) {
logRouterInitializationReplies.push_back(vector<Future<TLogInterface>>());
logRouterInitializationReplies.emplace_back();
for( int i = 0; i < self->logRouterTags; i++) {
InitializeLogRouterRequest req;
req.recoveryCount = recoveryCount;
@ -1621,12 +1546,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
for(auto& old : self->oldLogData) {
Version maxStart = 0;
for(auto& logSet : old.tLogs) {
if(logSet->isLocal) {
maxStart = std::max(maxStart, logSet->startVersion);
}
}
Version maxStart = getMaxLocalStartVersion(old.tLogs);
if(old.logRouterTags == 0 || maxStart >= lastStart) {
break;
@ -1651,7 +1571,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
for(auto& tLogs : old.tLogs) {
//Recruit log routers for old generations of the primary locality
if(tLogs->locality == locality) {
logRouterInitializationReplies.push_back(vector<Future<TLogInterface>>());
logRouterInitializationReplies.emplace_back();
for( int i = 0; i < old.logRouterTags; i++) {
InitializeLogRouterRequest req;
req.recoveryCount = recoveryCount;
@ -1676,18 +1596,13 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
vector<Future<Void>> failed;
if(!forRemote) {
Version maxStart = 0;
for(auto& logSet : self->tLogs) {
if(logSet->isLocal) {
maxStart = std::max(maxStart, logSet->startVersion);
}
}
Version maxStart = getMaxLocalStartVersion(self->tLogs);
lastStart = std::max(startVersion, maxStart);
for(auto& tLogs : self->tLogs) {
if(tLogs->locality == locality) {
for( int i = 0; i < logRouterInitializationReplies[nextReplies].size(); i++ ) {
tLogs->logRouters.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( OptionalInterface<TLogInterface>(logRouterInitializationReplies[nextReplies][i].get()) ) ) );
tLogs->logRouters.emplace_back(new AsyncVar<OptionalInterface<TLogInterface>>(OptionalInterface<TLogInterface>(logRouterInitializationReplies[nextReplies][i].get())));
failed.push_back( waitFailureClient( logRouterInitializationReplies[nextReplies][i].get().waitFailure, SERVER_KNOBS->TLOG_TIMEOUT, -SERVER_KNOBS->TLOG_TIMEOUT/SERVER_KNOBS->SECONDS_BEFORE_NO_FAILURE_DELAY ) );
}
nextReplies++;
@ -1696,12 +1611,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
for(auto& old : self->oldLogData) {
Version maxStart = 0;
for(auto& logSet : old.tLogs) {
if(logSet->isLocal) {
maxStart = std::max(maxStart, logSet->startVersion);
}
}
Version maxStart = getMaxLocalStartVersion(old.tLogs);
if(old.logRouterTags == 0 || maxStart >= lastStart) {
break;
}
@ -1709,7 +1619,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
for(auto& tLogs : old.tLogs) {
if(tLogs->locality == locality) {
for( int i = 0; i < logRouterInitializationReplies[nextReplies].size(); i++ ) {
tLogs->logRouters.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( OptionalInterface<TLogInterface>(logRouterInitializationReplies[nextReplies][i].get()) ) ) );
tLogs->logRouters.emplace_back(new AsyncVar<OptionalInterface<TLogInterface>>(OptionalInterface<TLogInterface>(logRouterInitializationReplies[nextReplies][i].get())));
if(!forRemote) {
failed.push_back( waitFailureClient( logRouterInitializationReplies[nextReplies][i].get().waitFailure, SERVER_KNOBS->TLOG_TIMEOUT, -SERVER_KNOBS->TLOG_TIMEOUT/SERVER_KNOBS->SECONDS_BEFORE_NO_FAILURE_DELAY ) );
}
@ -1727,11 +1637,31 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
return Void();
}
static Version getMaxLocalStartVersion(std::vector<Reference<LogSet>>& tLogs) {
Version maxStart = 0;
for (const auto& logSet : tLogs) {
if(logSet->isLocal) {
maxStart = std::max(maxStart, logSet->startVersion);
}
}
return maxStart;
}
static std::vector<Tag> getLocalTags(int8_t locality, const std::vector<Tag>& allTags) {
std::vector<Tag> localTags;
for (const auto& tag : allTags) {
if (locality == tagLocalitySpecial || locality == tag.locality || tag.locality < 0) {
localTags.push_back(tag);
}
}
return localTags;
}
ACTOR static Future<Void> newRemoteEpoch( TagPartitionedLogSystem* self, Reference<TagPartitionedLogSystem> oldLogSystem, Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers, DatabaseConfiguration configuration, LogEpoch recoveryCount, int8_t remoteLocality, std::vector<Tag> allTags ) {
TraceEvent("RemoteLogRecruitment_WaitingForWorkers");
state RecruitRemoteFromConfigurationReply remoteWorkers = wait( fRemoteWorkers );
state Reference<LogSet> logSet = Reference<LogSet>( new LogSet() );
state Reference<LogSet> logSet(new LogSet());
logSet->tLogReplicationFactor = configuration.getRemoteTLogReplicationFactor();
logSet->tLogVersion = configuration.tLogVersion;
logSet->tLogPolicy = configuration.getRemoteTLogPolicy();
@ -1779,12 +1709,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logRouterInitializationReplies.push_back( transformErrors( throwErrorOr( remoteWorkers.logRouters[i%remoteWorkers.logRouters.size()].logRouter.getReplyUnlessFailedFor( req, SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) );
}
std::vector<Tag> localTags;
for(auto& tag : allTags) {
if(remoteLocality == tagLocalitySpecial || remoteLocality == tag.locality || tag.locality < 0) {
localTags.push_back(tag);
}
}
std::vector<Tag> localTags = getLocalTags(remoteLocality, allTags);
LogSystemConfig oldLogSystemConfig = oldLogSystem->getLogSystemConfig();
state vector<Future<TLogInterface>> remoteTLogInitializationReplies;
vector< InitializeTLogRequest > remoteTLogReqs( remoteWorkers.remoteTLogs.size() );
@ -1794,7 +1720,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
req.logVersion = configuration.tLogVersion;
req.storeType = configuration.tLogDataStoreType;
req.spillType = configuration.tLogSpillType;
req.recoverFrom = oldLogSystem->getLogSystemConfig();
req.recoverFrom = oldLogSystemConfig;
req.recoverAt = oldLogSystem->recoverAt.get();
req.knownCommittedVersion = oldLogSystem->knownCommittedVersion;
req.epoch = recoveryCount;
@ -1827,7 +1753,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
wait( waitForAll(remoteTLogInitializationReplies) && waitForAll(logRouterInitializationReplies) && oldRouterRecruitment );
for( int i = 0; i < logRouterInitializationReplies.size(); i++ ) {
logSet->logRouters.push_back( Reference<AsyncVar<OptionalInterface<TLogInterface>>>( new AsyncVar<OptionalInterface<TLogInterface>>( OptionalInterface<TLogInterface>(logRouterInitializationReplies[i].get()) ) ) );
logSet->logRouters.emplace_back(new AsyncVar<OptionalInterface<TLogInterface>>(OptionalInterface<TLogInterface>(logRouterInitializationReplies[i].get())));
}
for( int i = 0; i < remoteTLogInitializationReplies.size(); i++ ) {
@ -1850,7 +1776,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
int8_t primaryLocality, int8_t remoteLocality, std::vector<Tag> allTags, Reference<AsyncVar<bool>> recruitmentStalled ) {
state double startTime = now();
state Reference<TagPartitionedLogSystem> logSystem( new TagPartitionedLogSystem(oldLogSystem->getDebugID(), oldLogSystem->locality) );
logSystem->logSystemType = 2;
logSystem->logSystemType = LogSystemType::tagPartitioned;
logSystem->expectedLogSets = 1;
logSystem->recoveredAt = oldLogSystem->recoverAt;
logSystem->repopulateRegionAntiQuorum = configuration.repopulateRegionAntiQuorum;
@ -1860,11 +1786,10 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
if(configuration.usableRegions > 1) {
logSystem->logRouterTags = recr.tLogs.size() * std::max<int>(1, configuration.desiredLogRouterCount / std::max<int>(1,recr.tLogs.size()));
logSystem->expectedLogSets++;
} else {
logSystem->logRouterTags = 0;
logSystem->addPseudoLocality(tagLocalityLogRouterMapped);
}
logSystem->tLogs.push_back( Reference<LogSet>( new LogSet() ) );
logSystem->tLogs.emplace_back(new LogSet());
logSystem->tLogs[0]->tLogVersion = configuration.tLogVersion;
logSystem->tLogs[0]->tLogWriteAntiQuorum = configuration.tLogWriteAntiQuorum;
logSystem->tLogs[0]->tLogReplicationFactor = configuration.tLogReplicationFactor;
@ -1875,7 +1800,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
state RegionInfo region = configuration.getRegion(recr.dcId);
if(region.satelliteTLogReplicationFactor > 0) {
logSystem->tLogs.push_back( Reference<LogSet>( new LogSet() ) );
logSystem->tLogs.emplace_back(new LogSet());
if(recr.satelliteFallback) {
logSystem->tLogs[1]->tLogWriteAntiQuorum = region.satelliteTLogWriteAntiQuorumFallback;
logSystem->tLogs[1]->tLogReplicationFactor = region.satelliteTLogReplicationFactorFallback;
@ -1903,15 +1828,13 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
if(oldLogSystem->tLogs.size()) {
logSystem->oldLogData.push_back(OldLogData());
logSystem->oldLogData.emplace_back();
logSystem->oldLogData[0].tLogs = oldLogSystem->tLogs;
logSystem->oldLogData[0].epochEnd = oldLogSystem->knownCommittedVersion + 1;
logSystem->oldLogData[0].logRouterTags = oldLogSystem->logRouterTags;
logSystem->oldLogData[0].pseudoLocalities = oldLogSystem->pseudoLocalities;
}
for(int i = 0; i < oldLogSystem->oldLogData.size(); i++) {
logSystem->oldLogData.push_back(oldLogSystem->oldLogData[i]);
}
logSystem->oldLogData.insert(logSystem->oldLogData.end(), oldLogSystem->oldLogData.begin(), oldLogSystem->oldLogData.end());
logSystem->tLogs[0]->startVersion = oldLogSystem->knownCommittedVersion + 1;
state int lockNum = 0;
@ -1959,12 +1882,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
oldLogSystem->logSystemConfigChanged.trigger();
}
std::vector<Tag> localTags;
for(auto& tag : allTags) {
if(primaryLocality == tagLocalitySpecial || primaryLocality == tag.locality || tag.locality < 0) {
localTags.push_back(tag);
}
}
std::vector<Tag> localTags = getLocalTags(primaryLocality, allTags);
state LogSystemConfig oldLogSystemConfig = oldLogSystem->getLogSystemConfig();
state vector<Future<TLogInterface>> initializationReplies;
vector< InitializeTLogRequest > reqs( recr.tLogs.size() );
@ -1974,7 +1893,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
req.logVersion = configuration.tLogVersion;
req.storeType = configuration.tLogDataStoreType;
req.spillType = configuration.tLogSpillType;
req.recoverFrom = oldLogSystem->getLogSystemConfig();
req.recoverFrom = oldLogSystemConfig;
req.recoverAt = oldLogSystem->recoverAt.get();
req.knownCommittedVersion = oldLogSystem->knownCommittedVersion;
req.epoch = recoveryCount;
@ -2008,8 +1927,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
state std::vector<Future<Void>> recoveryComplete;
if(region.satelliteTLogReplicationFactor > 0) {
std::vector<Tag> satelliteTags;
satelliteTags.push_back(txsTag);
std::vector<Tag> satelliteTags(1, txsTag);
state vector<Future<TLogInterface>> satelliteInitializationReplies;
vector< InitializeTLogRequest > sreqs( recr.satelliteTLogs.size() );
@ -2019,7 +1937,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
req.logVersion = configuration.tLogVersion;
req.storeType = configuration.tLogDataStoreType;
req.spillType = configuration.tLogSpillType;
req.recoverFrom = oldLogSystem->getLogSystemConfig();
req.recoverFrom = oldLogSystemConfig;
req.recoverAt = oldLogSystem->recoverAt.get();
req.knownCommittedVersion = oldLogSystem->knownCommittedVersion;
req.epoch = recoveryCount;
@ -2318,18 +2236,18 @@ Future<Void> ILogSystem::recoverAndEndEpoch(Reference<AsyncVar<Reference<ILogSys
}
Reference<ILogSystem> ILogSystem::fromLogSystemConfig( UID const& dbgid, struct LocalityData const& locality, struct LogSystemConfig const& conf, bool excludeRemote, bool useRecoveredAt, Optional<PromiseStream<Future<Void>>> addActor ) {
if (conf.logSystemType == 0)
if (conf.logSystemType == LogSystemType::empty)
return Reference<ILogSystem>();
else if (conf.logSystemType == 2)
else if (conf.logSystemType == LogSystemType::tagPartitioned)
return TagPartitionedLogSystem::fromLogSystemConfig( dbgid, locality, conf, excludeRemote, useRecoveredAt, addActor );
else
throw internal_error();
}
Reference<ILogSystem> ILogSystem::fromOldLogSystemConfig( UID const& dbgid, struct LocalityData const& locality, struct LogSystemConfig const& conf ) {
if (conf.logSystemType == 0)
if (conf.logSystemType == LogSystemType::empty)
return Reference<ILogSystem>();
else if (conf.logSystemType == 2)
else if (conf.logSystemType == LogSystemType::tagPartitioned)
return TagPartitionedLogSystem::fromOldLogSystemConfig( dbgid, locality, conf );
else
throw internal_error();

View File

@ -326,12 +326,12 @@ ACTOR Future<Void> newTLogServers( Reference<MasterData> self, RecruitFromConfig
Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers = brokenPromiseToNever( self->clusterController.recruitRemoteFromConfiguration.getReply( RecruitRemoteFromConfigurationRequest( self->configuration, remoteDcId, recr.tLogs.size() * std::max<int>(1, self->configuration.desiredLogRouterCount / std::max<int>(1, recr.tLogs.size())), exclusionWorkerIds) ) );
self->primaryLocality = self->dcId_locality[recr.dcId];
self->logSystem = Reference<ILogSystem>();
self->logSystem = Reference<ILogSystem>(); // Cancels the actors in the previous log system.
Reference<ILogSystem> newLogSystem = wait( oldLogSystem->newEpoch( recr, fRemoteWorkers, self->configuration, self->cstate.myDBState.recoveryCount + 1, self->primaryLocality, self->dcId_locality[remoteDcId], self->allTags, self->recruitmentStalled ) );
self->logSystem = newLogSystem;
} else {
self->primaryLocality = tagLocalitySpecial;
self->logSystem = Reference<ILogSystem>();
self->logSystem = Reference<ILogSystem>(); // Cancels the actors in the previous log system.
Reference<ILogSystem> newLogSystem = wait( oldLogSystem->newEpoch( recr, Never(), self->configuration, self->cstate.myDBState.recoveryCount + 1, self->primaryLocality, tagLocalitySpecial, self->allTags, self->recruitmentStalled ) );
self->logSystem = newLogSystem;
}

View File

@ -55,7 +55,7 @@ using namespace boost::asio::ip;
//
// xyzdev
// vvvv
const uint64_t currentProtocolVersion = 0x0FDB00B061060001LL;
const uint64_t currentProtocolVersion = 0x0FDB00B061070001LL;
const uint64_t compatibleProtocolVersionMask = 0xffffffffffff0000LL;
const uint64_t minValidProtocolVersion = 0x0FDB00A200060001LL;