Merge pull request #3591 from apple/release-6.3

Merge Release 6.3 to master
This commit is contained in:
Meng Xu 2020-08-02 12:45:02 -07:00 committed by GitHub
commit 62a6e71626
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 52 additions and 44 deletions

View File

@ -2154,8 +2154,8 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
return false;
} else {
state std::vector<AddressExclusion> addresses;
state std::set<AddressExclusion> exclusions;
state std::vector<AddressExclusion> exclusionVector;
state std::set<AddressExclusion> exclusionSet;
bool force = false;
state bool waitForAllExcluded = true;
state bool markFailed = false;
@ -2174,8 +2174,8 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
printf(" Do not include the `:tls' suffix when naming a process\n");
return true;
}
addresses.push_back( a );
exclusions.insert( a );
exclusionVector.push_back(a);
exclusionSet.insert(a);
}
}
@ -2183,7 +2183,7 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
if (markFailed) {
state bool safe;
try {
bool _safe = wait(makeInterruptable(checkSafeExclusions(db, addresses)));
bool _safe = wait(makeInterruptable(checkSafeExclusions(db, exclusionVector)));
safe = _safe;
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) throw;
@ -2245,7 +2245,8 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
return true;
}
NetworkAddress addr = NetworkAddress::parse(addrStr);
bool excluded = (process.has("excluded") && process.last().get_bool()) || addressExcluded(exclusions, addr);
bool excluded =
(process.has("excluded") && process.last().get_bool()) || addressExcluded(exclusionSet, addr);
ssTotalCount++;
if (excluded)
ssExcludedCount++;
@ -2286,7 +2287,7 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
}
}
wait(makeInterruptable(excludeServers(db, addresses, markFailed)));
wait(makeInterruptable(excludeServers(db, exclusionVector, markFailed)));
if (waitForAllExcluded) {
printf("Waiting for state to be removed from all excluded servers. This may take a while.\n");
@ -2297,7 +2298,7 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
warn.cancel();
state std::set<NetworkAddress> notExcludedServers =
wait(makeInterruptable(checkForExcludingServers(db, addresses, waitForAllExcluded)));
wait(makeInterruptable(checkForExcludingServers(db, exclusionVector, waitForAllExcluded)));
std::vector<ProcessData> workers = wait( makeInterruptable(getWorkers(db)) );
std::map<IPAddress, std::set<uint16_t>> workerPorts;
for(auto addr : workers)
@ -2305,7 +2306,7 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
// Print a list of all excluded addresses that don't have a corresponding worker
std::set<AddressExclusion> absentExclusions;
for(auto addr : addresses) {
for (const auto& addr : exclusionVector) {
auto worker = workerPorts.find(addr.ip);
if(worker == workerPorts.end())
absentExclusions.insert(addr);
@ -2313,43 +2314,46 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
absentExclusions.insert(addr);
}
for (auto addr : addresses) {
NetworkAddress _addr(addr.ip, addr.port);
if (absentExclusions.find(addr) != absentExclusions.end()) {
if(addr.port == 0)
for (const auto& exclusion : exclusionVector) {
if (absentExclusions.find(exclusion) != absentExclusions.end()) {
if (exclusion.port == 0) {
printf(" %s(Whole machine) ---- WARNING: Missing from cluster!Be sure that you excluded the "
"correct machines before removing them from the cluster!\n",
addr.ip.toString().c_str());
else
exclusion.ip.toString().c_str());
} else {
printf(" %s ---- WARNING: Missing from cluster! Be sure that you excluded the correct processes "
"before removing them from the cluster!\n",
addr.toString().c_str());
} else if (notExcludedServers.find(_addr) != notExcludedServers.end()) {
if (addr.port == 0)
exclusion.toString().c_str());
}
} else if (std::any_of(notExcludedServers.begin(), notExcludedServers.end(),
[&](const NetworkAddress& a) { return addressExcluded({ exclusion }, a); })) {
if (exclusion.port == 0) {
printf(" %s(Whole machine) ---- WARNING: Exclusion in progress! It is not safe to remove this "
"machine from the cluster\n",
addr.ip.toString().c_str());
else
exclusion.ip.toString().c_str());
} else {
printf(" %s ---- WARNING: Exclusion in progress! It is not safe to remove this process from the "
"cluster\n",
addr.toString().c_str());
exclusion.toString().c_str());
}
} else {
if (addr.port == 0)
if (exclusion.port == 0) {
printf(" %s(Whole machine) ---- Successfully excluded. It is now safe to remove this machine "
"from the cluster.\n",
addr.ip.toString().c_str());
else
exclusion.ip.toString().c_str());
} else {
printf(
" %s ---- Successfully excluded. It is now safe to remove this process from the cluster.\n",
addr.toString().c_str());
exclusion.toString().c_str());
}
}
}
bool foundCoordinator = false;
auto ccs = ClusterConnectionFile( ccf->getFilename() ).getConnectionString();
for( auto& c : ccs.coordinators()) {
if (std::count( addresses.begin(), addresses.end(), AddressExclusion(c.ip, c.port) ) ||
std::count( addresses.begin(), addresses.end(), AddressExclusion(c.ip) )) {
if (std::count(exclusionVector.begin(), exclusionVector.end(), AddressExclusion(c.ip, c.port)) ||
std::count(exclusionVector.begin(), exclusionVector.end(), AddressExclusion(c.ip))) {
printf("WARNING: %s is a coordinator!\n", c.toString().c_str());
foundCoordinator = true;
}

View File

@ -27,6 +27,7 @@
#include "flow/crc32c.h"
#include "flow/flow.h"
#include <atomic>
#include <cstdint>
#include <unordered_map>
@ -87,9 +88,9 @@ void* FastAllocator<Size>::freelist = nullptr;
typedef void (*ThreadInitFunction)();
ThreadInitFunction threadInitFunction = 0; // See ThreadCleanup.cpp in the C binding
void setFastAllocatorThreadInitFunction( ThreadInitFunction f ) {
void setFastAllocatorThreadInitFunction(ThreadInitFunction f) {
ASSERT( !threadInitFunction );
threadInitFunction = f;
threadInitFunction = f;
}
std::atomic<int64_t> g_hugeArenaMemory(0);
@ -221,28 +222,32 @@ struct FastAllocator<Size>::GlobalData {
CRITICAL_SECTION mutex;
std::vector<void*> magazines; // These magazines are always exactly magazine_size ("full")
std::vector<std::pair<int, void*>> partial_magazines; // Magazines that are not "full" and their counts. Only created by releaseThreadMagazines().
long long totalMemory;
std::atomic<long long> totalMemory;
long long partialMagazineUnallocatedMemory;
long long activeThreads;
GlobalData() : totalMemory(0), partialMagazineUnallocatedMemory(0), activeThreads(0) {
std::atomic<long long> activeThreads;
GlobalData() : totalMemory(0), partialMagazineUnallocatedMemory(0), activeThreads(0) {
InitializeCriticalSection(&mutex);
}
};
template <int Size>
long long FastAllocator<Size>::getTotalMemory() {
return globalData()->totalMemory;
return globalData()->totalMemory.load();
}
// This does not include memory held by various threads that's available for allocation
template <int Size>
long long FastAllocator<Size>::getApproximateMemoryUnused() {
return globalData()->magazines.size() * magazine_size * Size + globalData()->partialMagazineUnallocatedMemory;
EnterCriticalSection(&globalData()->mutex);
long long unused =
globalData()->magazines.size() * magazine_size * Size + globalData()->partialMagazineUnallocatedMemory;
LeaveCriticalSection(&globalData()->mutex);
return unused;
}
template <int Size>
long long FastAllocator<Size>::getActiveThreads() {
return globalData()->activeThreads;
return globalData()->activeThreads.load();
}
#if FAST_ALLOCATOR_DEBUG
@ -411,9 +416,7 @@ void FastAllocator<Size>::initThread() {
threadInitFunction();
}
EnterCriticalSection(&globalData()->mutex);
++globalData()->activeThreads;
LeaveCriticalSection(&globalData()->mutex);
globalData()->activeThreads.fetch_add(1);
threadData.freelist = nullptr;
threadData.alternate = nullptr;
@ -442,7 +445,7 @@ void FastAllocator<Size>::getMagazine() {
threadData.count = p.first;
return;
}
globalData()->totalMemory += magazine_size*Size;
globalData()->totalMemory.fetch_add(magazine_size * Size);
LeaveCriticalSection(&globalData()->mutex);
// Allocate a new page of data from the system allocator
@ -454,8 +457,9 @@ void FastAllocator<Size>::getMagazine() {
#if FAST_ALLOCATOR_DEBUG
#ifdef WIN32
static int alt = 0; alt++;
block = (void**)VirtualAllocEx( GetCurrentProcess(),
(void*)( ((getSizeCode(Size)<<11) + alt) * magazine_size*Size), magazine_size*Size, MEM_COMMIT|MEM_RESERVE, PAGE_READWRITE );
block =
(void**)VirtualAllocEx(GetCurrentProcess(), (void*)(((getSizeCode(Size) << 11) + alt) * magazine_size * Size),
magazine_size * Size, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
static int alt = 0; alt++;
void* desiredBlock = (void*)( ((getSizeCode(Size)<<11) + alt) * magazine_size*Size);
@ -479,7 +483,7 @@ void FastAllocator<Size>::getMagazine() {
block[i*PSize+1] = block[i*PSize] = &block[(i+1)*PSize];
check( &block[i*PSize], false );
}
block[(magazine_size-1)*PSize+1] = block[(magazine_size-1)*PSize] = nullptr;
check( &block[(magazine_size-1)*PSize], false );
threadData.freelist = block;
@ -509,7 +513,7 @@ void FastAllocator<Size>::releaseThreadMagazines() {
globalData()->magazines.push_back(thr.alternate);
}
}
--globalData()->activeThreads;
globalData()->activeThreads.fetch_add(-1);
LeaveCriticalSection(&globalData()->mutex);
thr.count = 0;

View File

@ -133,7 +133,7 @@ private:
};
static thread_local ThreadData threadData;
static thread_local bool threadInitialized;
static GlobalData* globalData() {
static GlobalData* globalData() noexcept {
#ifdef VALGRIND
ANNOTATE_RWLOCK_ACQUIRED(vLock, 1);
#endif