Add comments to HA code and loadBalance code

Meng Xu 2021-02-10 13:45:06 -08:00
parent cb52535f46
commit 9122be4d81
9 changed files with 31 additions and 11 deletions

View File

@@ -94,6 +94,7 @@ void fdb_flow_test() {
     g_network->run();
 }
 
+// FDB obj used by bindings
 namespace FDB {
 class DatabaseImpl : public Database, NonCopyable {
 public:

View File

@@ -137,7 +137,7 @@ struct StorageInfo : NonCopyable, public ReferenceCounted<StorageInfo> {
 };
 
 struct ServerCacheInfo {
-    std::vector<Tag> tags;
+    std::vector<Tag> tags; // all tags in both the primary and remote DCs for the key range
     std::vector<Reference<StorageInfo>> src_info;
     std::vector<Reference<StorageInfo>> dest_info;
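
To make the new comment concrete, here is a minimal, self-contained sketch (simplified stand-in types, not FDB's Tag/StorageInfo, and the real ServerCacheInfo stores Reference<StorageInfo>) of the idea that a key range's cache entry carries the tags of every replica in both the primary and the remote DC:

// Sketch only: stand-in types, not FoundationDB code.
#include <cstdint>
#include <vector>

struct FakeTag { int8_t locality; uint16_t id; };   // stand-in for FDB's Tag
struct FakeStorageInfo { FakeTag tag; };            // stand-in for StorageInfo

std::vector<FakeTag> allTags(const std::vector<FakeStorageInfo>& primaryReplicas,
                             const std::vector<FakeStorageInfo>& remoteReplicas) {
    std::vector<FakeTag> tags;
    for (const auto& s : primaryReplicas) tags.push_back(s.tag); // tags in the primary DC
    for (const auto& s : remoteReplicas) tags.push_back(s.tag);  // tags in the remote DC
    return tags;                                                 // "all tags in both DCs"
}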

View File

@ -169,7 +169,9 @@ void addLaggingRequest(Future<Optional<Reply>> reply, Promise<Void> requestFinis
// Keep trying to get a reply from any of servers until success or cancellation; tries to take into account // Keep trying to get a reply from any of servers until success or cancellation; tries to take into account
// failMon's information for load balancing and avoiding failed servers // failMon's information for load balancing and avoiding failed servers
// If ALL the servers are failed and the list of servers is not fresh, throws an exception to let the caller refresh the list of servers // If ALL the servers are failed and the list of servers is not fresh, throws an exception to let the caller refresh the
// list of servers When model is set, load balance among alternatives in the same DC, aiming to balance request queue
// length on these interfaces. If too many interfaces in the same DC are bad, try remote interfaces.
ACTOR template <class Interface, class Request, class Multi> ACTOR template <class Interface, class Request, class Multi>
Future< REPLY_TYPE(Request) > loadBalance( Future< REPLY_TYPE(Request) > loadBalance(
Reference<MultiInterface<Multi>> alternatives, Reference<MultiInterface<Multi>> alternatives,
@@ -206,10 +208,15 @@ Future< REPLY_TYPE(Request) > loadBalance(
         int badServers = 0;
 
         for(int i=0; i<alternatives->size(); i++) {
+            // countBest(): the number of alternatives in the same locality (i.e., DC by default) as alternatives[0].
+            // If the condition below holds, we break and never send requests to the remote alternatives.
             if(badServers < std::min(i, FLOW_KNOBS->LOAD_BALANCE_MAX_BAD_OPTIONS + 1) && i == alternatives->countBest()) {
+                // If there are not enough bad local servers,
+                // we won't even try to send requests to remote servers.
+                // An interface is bad if its endpoint has failed or if its replies carry a high penalty.
                 break;
             }
 
             RequestStream<Request> const* thisStream = &alternatives->get( i, channel );
             if (!IFailureMonitor::failureMonitor().getState( thisStream->getEndpoint() ).failed) {
                 auto& qd = model->getMeasurement(thisStream->getEndpoint().token.first());
@@ -217,9 +224,10 @@ Future< REPLY_TYPE(Request) > loadBalance(
                 double thisMetric = qd.smoothOutstanding.smoothTotal();
                 double thisTime = qd.latency;
                 if(FLOW_KNOBS->LOAD_BALANCE_PENALTY_IS_BAD && qd.penalty > 1.001) {
+                    // The penalty is reported by the server itself (see getPenalty() on the storage server).
                     ++badServers;
                 }
 
                 if(thisMetric < bestMetric) {
                     if(i != bestAlt) {
                         nextAlt = bestAlt;
@@ -249,7 +257,7 @@ Future< REPLY_TYPE(Request) > loadBalance(
             if(now() > qd.failedUntil) {
                 double thisMetric = qd.smoothOutstanding.smoothTotal();
                 double thisTime = qd.latency;
                 if( thisMetric < nextMetric ) {
                     nextAlt = i;
                     nextMetric = thisMetric;
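
For readers unfamiliar with loadBalance(), the following standalone sketch restates the policy described in the new comments (invented names and simplified bookkeeping, not the real actor): walk the alternatives local-first, count "bad" ones (failed endpoint or high server-reported penalty), only cross into the remote group once enough local ones are bad, and otherwise pick the alternative with the smallest outstanding-request metric.

// Simplified sketch of the selection policy; not FoundationDB's loadBalance().
#include <vector>

struct AltStats {
    bool local;          // true if in the same DC as alternatives[0] (i.e., index < countBest())
    bool failed;         // endpoint considered failed by the failure monitor
    double penalty;      // penalty reported by the server (see getPenalty() below)
    double outstanding;  // smoothed outstanding-request count (queue length)
};

// Returns the index of the alternative to try first, or -1 if none is usable.
int pickAlternative(const std::vector<AltStats>& alts, int maxBadOptions) {
    int best = -1;
    double bestMetric = 1e100;
    int badServers = 0;
    for (int i = 0; i < (int)alts.size(); i++) {
        // Do not look at remote alternatives unless enough local ones are bad.
        if (!alts[i].local && badServers < maxBadOptions)
            break;
        if (alts[i].failed || alts[i].penalty > 1.001) {
            badServers++;                        // "bad": failed endpoint or high penalty
            continue;
        }
        if (alts[i].outstanding < bestMetric) {  // balance request queue length
            bestMetric = alts[i].outstanding;
            best = i;
        }
    }
    return best;
}

The real code additionally tracks a second-best alternative, latency, and per-endpoint backoff (failedUntil); the sketch only shows the local-first cutoff and the queue-length metric.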

View File

@@ -43,7 +43,7 @@ std::string describe( KVPair<K,V> const& p ) { return format("%d ", p.k) + descr
 template <class T>
 struct ReferencedInterface : public ReferenceCounted<ReferencedInterface<T>> {
     T interf;
-    int8_t distance;
+    int8_t distance; // one of the LBDistance enum values
     std::string toString() const {
         return interf.toString();
     }
@@ -222,7 +222,8 @@
     }
 private:
     vector<Reference<ReferencedInterface<T>>> alternatives;
-    int16_t bestCount;
+    int16_t bestCount; // The number of interfaces in the same location as alternatives[0]. The location is the DC
+                       // by default, or the machine if more than one alternative is on the same machine.
 };
 
 template <class Ar, class T> void load(Ar& ar, Reference<MultiInterface<T>>&) { ASSERT(false); } //< required for Future<T>
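
A small sketch of how bestCount/countBest() can be derived from the per-interface distance (assumed enum values and invented names; the real MultiInterface keeps richer state): if alternatives are kept sorted by distance, the "best" group is simply the prefix that shares alternatives[0]'s distance.

// Sketch with assumed distance values; not the real MultiInterface implementation.
#include <vector>

enum class FakeLBDistance { SAME_MACHINE = 0, SAME_DC = 1, DISTANT = 2 };

// distances must be sorted ascending (closest first), as the alternatives are assumed to be.
int countBestAlternatives(const std::vector<FakeLBDistance>& distances) {
    int best = 0;
    while (best < (int)distances.size() && distances[best] == distances[0])
        best++;        // count the interfaces in the same location as alternatives[0]
    return best;       // loadBalance() treats indices below this count as local
}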

View File

@@ -945,6 +945,7 @@ public:
         }
     }
 
+    // Check if txn system is recruited successfully in each region
     void checkRegions(const std::vector<RegionInfo>& regions) {
         if(desiredDcIds.get().present() && desiredDcIds.get().get().size() == 2 && desiredDcIds.get().get()[0].get() == regions[0].dcId && desiredDcIds.get().get()[1].get() == regions[1].dcId) {
             return;

View File

@@ -420,7 +420,7 @@ struct ProxyCommitData {
     uint64_t commitVersionRequestNumber;
     uint64_t mostRecentProcessedRequestNumber;
     KeyRangeMap<Deque<std::pair<Version,int>>> keyResolvers;
-    KeyRangeMap<ServerCacheInfo> keyInfo;
+    KeyRangeMap<ServerCacheInfo> keyInfo; // keyrange -> all storage servers in all DCs for the keyrange
     KeyRangeMap<bool> cacheInfo;
     std::map<Key, applyMutationsData> uid_applyMutationsData;
     bool firstProxy;
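
As a rough illustration of how such a range map is consumed (simplified stand-ins, not the real KeyRangeMap or commit-proxy code), a lookup at commit time finds the entry whose range contains a mutation's key and reads the storage-server info from it:

// Sketch: a map keyed by range-begin keys; each entry covers [this key, next key).
#include <map>
#include <string>
#include <vector>

struct FakeServerCacheInfo { std::vector<int> tags; }; // tags of all SSes in all DCs
using FakeKeyRangeMap = std::map<std::string, FakeServerCacheInfo>;

// Assumes the map always contains an entry with begin key "" covering the start of keyspace.
const FakeServerCacheInfo& infoForKey(const FakeKeyRangeMap& keyInfo, const std::string& key) {
    auto it = keyInfo.upper_bound(key); // first entry strictly after key
    --it;                               // step back to the entry whose range contains key
    return it->second;
}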

View File

@@ -1038,7 +1038,12 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
             bestSet = bestSatelliteSet;
         }
-        TraceEvent("TLogPeekLogRouterOldSets", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("OldEpoch", old.epochEnd).detail("RecoveredAt", recoveredAt.present() ? recoveredAt.get() : -1).detail("FirstOld", firstOld);
+        TraceEvent("TLogPeekLogRouterOldSets", dbgid)
+            .detail("Tag", tag.toString())
+            .detail("Begin", begin)
+            .detail("OldEpoch", old.epochEnd)
+            .detail("RecoveredAt", recoveredAt.present() ? recoveredAt.get() : -1)
+            .detail("FirstOld", firstOld);
         //FIXME: do this merge on one of the logs in the other data center to avoid sending multiple copies across the WAN
         return Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( localSets, bestSet, localSets[bestSet]->bestLocationFor( tag ), tag, begin, firstOld && recoveredAt.present() ? recoveredAt.get() + 1 : old.epochEnd, true ) );
     }

View File

@@ -210,8 +210,11 @@ struct RecruitFromConfigurationReply {
     std::vector<WorkerInterface> proxies;
     std::vector<WorkerInterface> resolvers;
     std::vector<WorkerInterface> storageServers;
-    std::vector<WorkerInterface> oldLogRouters;
-    Optional<Key> dcId;
+    std::vector<WorkerInterface> oldLogRouters; // why are oldLogRouters needed?
+    Optional<Key> dcId; // dcId is the DC where the master is recruited. It prefers configuration.primaryDcId, but the
+                        // master can be recruited from configuration.secondaryDc; in that case dcId is the secondaryDcId
+                        // and this generation's primary DC in memory differs from configuration.primaryDcId.
+                        // when is dcId set?
     bool satelliteFallback;
 
     RecruitFromConfigurationReply() : satelliteFallback(false) {}

View File

@@ -691,6 +691,7 @@ public:
         return counters.bytesInput.getValue() - counters.bytesDurable.getValue();
     }
 
+    // Penalty used by loadBalance() to balance requests among storage servers; we prefer servers with smaller write queues.
     double getPenalty() {
         return std::max(std::max(1.0, (queueSize() - (SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER -
                                        2.0 * SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER)) /