Fix FastAlloc thread cleanup on OSX (#6485)

* Use ThreadData constructor and destructor

Instead of pthread api

* Make threadData a function static thread local variable

* Fix linux build
This commit is contained in:
Andrew Noyes 2022-03-04 12:49:40 -08:00 committed by GitHub
parent f4291af1ab
commit 2bd2ddd779
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 69 additions and 152 deletions

View File

@ -1,7 +1,6 @@
set(FDB_C_SRCS
fdb_c.cpp
foundationdb/fdb_c.h
ThreadCleanup.cpp)
foundationdb/fdb_c.h)
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/foundationdb)
@ -76,10 +75,8 @@ if(WIN32)
set_property(SOURCE ${asm_file} PROPERTY LANGUAGE ASM_MASM)
endif()
# The tests don't build on windows and ARM macs
# doctest doesn't seem to compile on ARM macs, we should
# check later whether this works
if(NOT WIN32 AND NOT IS_ARM_MAC)
# The tests don't build on windows
if(NOT WIN32)
set(MAKO_SRCS
test/mako/mako.c
test/mako/mako.h

View File

@ -1,64 +0,0 @@
/*
* ThreadCleanup.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "flow/Platform.h"
#include "flow/FastAlloc.h"
#if defined(WIN32)
#include <Windows.h>
// DLL entry point. Of the notifications delivered through `reason`, only
// DLL_THREAD_DETACH is acted on: the detaching thread's FastAllocator
// magazines are released via releaseAllThreadMagazines(). `dll` and
// `reserved` are not used.
BOOL WINAPI DllMain(HINSTANCE dll, DWORD reason, LPVOID reserved) {
	const bool threadIsDetaching = (reason == DLL_THREAD_DETACH);
	if (threadIsDetaching) {
		releaseAllThreadMagazines();
	}
	// Success is reported for every notification type.
	return TRUE;
}
#elif defined(__unixish__)
#ifdef __INTEL_COMPILER
#pragma warning(disable : 2415)
#endif
// pthread key used solely to attach threadDestructor to threads as a
// per-thread destructor callback.
static pthread_key_t threadDestructorKey;

// Destructor callback for threadDestructorKey: releases the exiting thread's
// FastAllocator magazines. The value stored under the key is ignored.
static void threadDestructor(void* /*keyValue*/) {
	releaseAllThreadMagazines();
}
// Stores a non-null dummy value under threadDestructorKey for the calling
// thread. POSIX only runs a key's destructor for threads whose value for that
// key is non-NULL, so this is what arms threadDestructor for this thread.
void registerThread() {
	const void* const nonNullMarker = reinterpret_cast<const void*>(1);
	pthread_setspecific(threadDestructorKey, nonNullMarker);
}
// Creates threadDestructorKey with threadDestructor as its destructor. When
// key creation succeeds, the calling thread is registered immediately and
// registerThread is installed as the FastAllocator thread-init function. When
// key creation fails, nothing is registered. Always returns 0; the return
// value exists only so the call can be used as a static initializer.
static int initThreadDestructorKey() {
	const int createResult = pthread_key_create(&threadDestructorKey, &threadDestructor);
	if (createResult != 0) {
		return 0;
	}
	registerThread();
	setFastAllocatorThreadInitFunction(&registerThread);
	return 0;
}

// Triggers initThreadDestructorKey() during static initialization.
static int threadDestructorKeyInit = initThreadDestructorKey();
#else
#error Port me!
#endif

View File

@ -6,7 +6,7 @@ ExternalProject_Add(
doctest
PREFIX ${CMAKE_BINARY_DIR}/doctest
GIT_REPOSITORY https://github.com/onqtam/doctest.git
GIT_TAG 1c8da00c978c19e00a434b2b1f854fcffc9fba35 # v2.4.0
GIT_TAG 8424be522357e68d8c6178375546bb0cf9d5f6b3 # v2.4.1
TIMEOUT 10
CONFIGURE_COMMAND ""
BUILD_COMMAND ""

View File

@ -2358,6 +2358,19 @@ TEST_CASE("commit_does_not_reset") {
}
}
TEST_CASE("Fast alloc thread cleanup") {
	// Start and join many short-lived threads, one at a time. Each thread sets
	// values whose sizes double from 8 bytes up to 8 KiB in its own
	// transaction. The aim is to provoke an OOM if memory held per thread is
	// not cleaned up when the thread exits.
	for (int round = 0; round != 50000; ++round) {
		std::thread worker([] {
			fdb::Transaction txn(db);
			for (int shift = 0; shift <= 10; ++shift) {
				const int valueSize = 8 << shift;
				txn.set(key("foo"), std::string(valueSize, '\x00'));
			}
		});
		worker.join();
	}
}
int main(int argc, char** argv) {
if (argc < 3) {
std::cout << "Unit tests for the FoundationDB C API.\n"

View File

@ -72,10 +72,13 @@
#endif
template <int Size>
INIT_SEG thread_local typename FastAllocator<Size>::ThreadData FastAllocator<Size>::threadData;
INIT_SEG thread_local typename FastAllocator<Size>::ThreadDataInit FastAllocator<Size>::threadDataInit;
template <int Size>
thread_local bool FastAllocator<Size>::threadInitialized = false;
// Accessor for the calling thread's ThreadData for this allocator size. The
// object is a function-local thread_local, so it is constructed the first
// time the thread calls threadData() and every call on that thread returns a
// reference to the same object.
typename FastAllocator<Size>::ThreadData& FastAllocator<Size>::threadData() noexcept {
static thread_local ThreadData threadData;
return threadData;
}
#ifdef VALGRIND
template <int Size>
@ -108,14 +111,6 @@ bool valgrindPrecise() {
template <int Size>
void* FastAllocator<Size>::freelist = nullptr;
// Signature of the hook that can be registered to run during FastAllocator
// thread initialization: takes no arguments and returns nothing.
typedef void (*ThreadInitFunction)();
// The currently registered hook, or null when none has been set.
ThreadInitFunction threadInitFunction = 0; // See ThreadCleanup.cpp in the C binding
// Registers f as the thread-init hook. Only a single hook is tracked:
// calling this while a hook is already set trips the ASSERT below.
void setFastAllocatorThreadInitFunction(ThreadInitFunction f) {
ASSERT(!threadInitFunction);
threadInitFunction = f;
}
std::atomic<int64_t> g_hugeArenaMemory(0);
double hugeArenaLastLogged = 0;
@ -310,9 +305,6 @@ static int64_t getSizeCode(int i) {
template <int Size>
void* FastAllocator<Size>::allocate() {
if (!threadInitialized) {
initThread();
}
#if defined(USE_GPERFTOOLS) || defined(ADDRESS_SANITIZER)
// Some usages of FastAllocator require 4096 byte alignment.
@ -327,7 +319,7 @@ void* FastAllocator<Size>::allocate() {
#endif
#if FASTALLOC_THREAD_SAFE
ThreadData& thr = threadData;
ThreadData& thr = threadData();
if (!thr.freelist) {
ASSERT(thr.count == 0);
if (thr.alternate) {
@ -366,9 +358,6 @@ void* FastAllocator<Size>::allocate() {
template <int Size>
void FastAllocator<Size>::release(void* ptr) {
if (!threadInitialized) {
initThread();
}
#if defined(USE_GPERFTOOLS) || defined(ADDRESS_SANITIZER)
return aligned_free(ptr);
@ -381,7 +370,7 @@ void FastAllocator<Size>::release(void* ptr) {
#endif
#if FASTALLOC_THREAD_SAFE
ThreadData& thr = threadData;
ThreadData& thr = threadData();
if (thr.count == magazine_size) {
if (thr.alternate) // Two full magazines, return one
releaseMagazine(thr.alternate);
@ -462,39 +451,33 @@ void FastAllocator<Size>::check(void* ptr, bool alloc) {
}
template <int Size>
void FastAllocator<Size>::initThread() {
threadInitialized = true;
if (threadInitFunction) {
threadInitFunction();
}
FastAllocator<Size>::ThreadData::ThreadData() {
globalData()->activeThreads.fetch_add(1);
threadData.freelist = nullptr;
threadData.alternate = nullptr;
threadData.count = 0;
freelist = nullptr;
alternate = nullptr;
count = 0;
}
template <int Size>
void FastAllocator<Size>::getMagazine() {
ASSERT(threadInitialized);
ASSERT(!threadData.freelist && !threadData.alternate && threadData.count == 0);
ThreadData& thr = threadData();
ASSERT(!thr.freelist && !thr.alternate && thr.count == 0);
EnterCriticalSection(&globalData()->mutex);
if (globalData()->magazines.size()) {
void* m = globalData()->magazines.back();
globalData()->magazines.pop_back();
LeaveCriticalSection(&globalData()->mutex);
threadData.freelist = m;
threadData.count = magazine_size;
thr.freelist = m;
thr.count = magazine_size;
return;
} else if (globalData()->partial_magazines.size()) {
std::pair<int, void*> p = globalData()->partial_magazines.back();
globalData()->partial_magazines.pop_back();
globalData()->partialMagazineUnallocatedMemory -= p.first * Size;
LeaveCriticalSection(&globalData()->mutex);
threadData.freelist = p.second;
threadData.count = p.first;
thr.freelist = p.second;
thr.count = p.first;
return;
}
globalData()->totalMemory.fetch_add(magazine_size * Size);
@ -546,55 +529,32 @@ void FastAllocator<Size>::getMagazine() {
block[(magazine_size - 1) * PSize + 1] = block[(magazine_size - 1) * PSize] = nullptr;
check(&block[(magazine_size - 1) * PSize], false);
threadData.freelist = block;
threadData.count = magazine_size;
thr.freelist = block;
thr.count = magazine_size;
}
// Appends the magazine `mag` to the global magazines list. The global mutex
// is held only around the push.
template <int Size>
void FastAllocator<Size>::releaseMagazine(void* mag) {
ASSERT(threadInitialized);
EnterCriticalSection(&globalData()->mutex);
globalData()->magazines.push_back(mag);
LeaveCriticalSection(&globalData()->mutex);
}
template <int Size>
void FastAllocator<Size>::releaseThreadMagazines() {
if (threadInitialized) {
threadInitialized = false;
ThreadData& thr = threadData;
EnterCriticalSection(&globalData()->mutex);
if (thr.freelist || thr.alternate) {
if (thr.freelist) {
ASSERT(thr.count > 0 && thr.count <= magazine_size);
globalData()->partial_magazines.emplace_back(thr.count, thr.freelist);
globalData()->partialMagazineUnallocatedMemory += thr.count * Size;
}
if (thr.alternate) {
globalData()->magazines.push_back(thr.alternate);
}
}
globalData()->activeThreads.fetch_add(-1);
LeaveCriticalSection(&globalData()->mutex);
thr.count = 0;
thr.alternate = nullptr;
thr.freelist = nullptr;
FastAllocator<Size>::ThreadData::~ThreadData() {
EnterCriticalSection(&globalData()->mutex);
if (freelist) {
ASSERT_ABORT(count > 0 && count <= magazine_size);
globalData()->partial_magazines.emplace_back(count, freelist);
globalData()->partialMagazineUnallocatedMemory += count * Size;
}
}
if (alternate) {
globalData()->magazines.push_back(alternate);
}
globalData()->activeThreads.fetch_add(-1);
LeaveCriticalSection(&globalData()->mutex);
void releaseAllThreadMagazines() {
FastAllocator<16>::releaseThreadMagazines();
FastAllocator<32>::releaseThreadMagazines();
FastAllocator<64>::releaseThreadMagazines();
FastAllocator<96>::releaseThreadMagazines();
FastAllocator<128>::releaseThreadMagazines();
FastAllocator<256>::releaseThreadMagazines();
FastAllocator<512>::releaseThreadMagazines();
FastAllocator<1024>::releaseThreadMagazines();
FastAllocator<2048>::releaseThreadMagazines();
FastAllocator<4096>::releaseThreadMagazines();
FastAllocator<8192>::releaseThreadMagazines();
FastAllocator<16384>::releaseThreadMagazines();
count = 0;
alternate = nullptr;
freelist = nullptr;
}
int64_t getTotalUnusedAllocatedMemory() {

View File

@ -114,8 +114,6 @@ public:
static long long getApproximateMemoryUnused();
static long long getActiveThreads();
static void releaseThreadMagazines();
#ifdef ALLOC_INSTRUMENTATION
static volatile int32_t pageCount;
#endif
@ -134,9 +132,26 @@ private:
void* freelist;
int count; // there are count items on freelist
void* alternate; // alternate is either a full magazine, or an empty one
ThreadData();
~ThreadData();
};
static thread_local ThreadData threadData;
static thread_local bool threadInitialized;
// Helper type whose constructor does nothing but call threadData(), so
// constructing a ThreadDataInit forces the calling thread's ThreadData to be
// constructed.
struct ThreadDataInit {
ThreadDataInit() { threadData(); }
};
// Used to try to initialize threadData as early as possible. It's still
// possible that a static thread local variable (that owns fast-allocated
// memory) could be constructed before threadData, in which case threadData
// would already be destroyed by the time that variable's destructor attempts
// to free its memory. That would be undefined behavior, which is why we want
// to initialize threadData as early as possible.
static thread_local ThreadDataInit threadDataInit;
// Used to access threadData. Returning a reference to a function-level
// static guarantees that threadData will be constructed before it's
// accessed here. Furthermore, if accessing threadData from a static thread
// local variable's constructor, this guarantees that threadData will
// outlive this object, since destruction order is the reverse of
// construction order.
static ThreadData& threadData() noexcept;
static GlobalData* globalData() noexcept {
#ifdef VALGRIND
ANNOTATE_RWLOCK_ACQUIRED(vLock, 1);
@ -151,7 +166,6 @@ private:
}
static void* freelist;
static void initThread();
static void getMagazine();
static void releaseMagazine(void*);
};
@ -160,9 +174,6 @@ extern std::atomic<int64_t> g_hugeArenaMemory;
void hugeArenaSample(int size);
void releaseAllThreadMagazines();
int64_t getTotalUnusedAllocatedMemory();
void setFastAllocatorThreadInitFunction(
void (*)()); // The given function will be called at least once in each thread that allocates from a FastAllocator.
// Currently just one such function is tracked.
inline constexpr int nextFastAllocatedSize(int x) {
assert(x > 0 && x <= 8192);