llvm-svn: 344771
Douglas Yung 2018-10-19 04:09:32 +00:00
parent 392e006129
commit e61c8eb98f
4 changed files with 62 additions and 233 deletions

View File

@@ -13,9 +13,7 @@
 #include "xray_buffer_queue.h"
 #include "gtest/gtest.h"
-#include <atomic>
 #include <future>
 #include <thread>
-#include <unistd.h>
 
 namespace __xray {
@@ -57,7 +55,6 @@ TEST(BufferQueueTest, ReleaseUnknown) {
   BufferQueue::Buffer Buf;
   Buf.Data = reinterpret_cast<void *>(0xdeadbeef);
   Buf.Size = kSize;
-  Buf.Generation = Buffers.generation();
   EXPECT_EQ(BufferQueue::ErrorCode::UnrecognizedBuffer,
             Buffers.releaseBuffer(Buf));
 }
@@ -73,7 +70,8 @@ TEST(BufferQueueTest, ErrorsWhenFinalising) {
   BufferQueue::Buffer OtherBuf;
   ASSERT_EQ(BufferQueue::ErrorCode::QueueFinalizing,
             Buffers.getBuffer(OtherBuf));
-  ASSERT_EQ(BufferQueue::ErrorCode::QueueFinalizing, Buffers.finalize());
+  ASSERT_EQ(BufferQueue::ErrorCode::QueueFinalizing,
+            Buffers.finalize());
   ASSERT_EQ(Buffers.releaseBuffer(Buf), BufferQueue::ErrorCode::Ok);
 }
@@ -113,114 +111,4 @@ TEST(BufferQueueTest, Apply) {
   ASSERT_EQ(Count, 10);
 }
 
-TEST(BufferQueueTest, GenerationalSupport) {
-  bool Success = false;
-  BufferQueue Buffers(kSize, 10, Success);
-  ASSERT_TRUE(Success);
-  BufferQueue::Buffer B0;
-  ASSERT_EQ(Buffers.getBuffer(B0), BufferQueue::ErrorCode::Ok);
-  ASSERT_EQ(Buffers.finalize(),
-            BufferQueue::ErrorCode::Ok); // No more new buffers.
-
-  // Re-initialise the queue.
-  ASSERT_EQ(Buffers.init(kSize, 10), BufferQueue::ErrorCode::Ok);
-
-  BufferQueue::Buffer B1;
-  ASSERT_EQ(Buffers.getBuffer(B1), BufferQueue::ErrorCode::Ok);
-
-  // Validate that the buffers come from different generations.
-  ASSERT_NE(B0.Generation, B1.Generation);
-
-  // We stash the current generation, for use later.
-  auto PrevGen = B1.Generation;
-
-  // At this point, we want to ensure that a buffer from the first
-  // "generation" is still accepted in the new generation...
-  EXPECT_EQ(Buffers.releaseBuffer(B0), BufferQueue::ErrorCode::Ok);
-
-  // ... and that the new buffer is also accepted.
-  EXPECT_EQ(Buffers.releaseBuffer(B1), BufferQueue::ErrorCode::Ok);
-
-  // The next round does the same, ensuring that we are able to do multiple
-  // rounds of finalize/init.
-  ASSERT_EQ(Buffers.finalize(), BufferQueue::ErrorCode::Ok);
-  ASSERT_EQ(Buffers.init(kSize, 10), BufferQueue::ErrorCode::Ok);
-  EXPECT_EQ(Buffers.getBuffer(B0), BufferQueue::ErrorCode::Ok);
-  EXPECT_EQ(Buffers.getBuffer(B1), BufferQueue::ErrorCode::Ok);
-
-  // Here we ensure that the new generation is different from the previous
-  // generation, and that both buffers come from the same (new) generation.
-  EXPECT_NE(B0.Generation, PrevGen);
-  EXPECT_EQ(B0.Generation, B1.Generation);
-
-  ASSERT_EQ(Buffers.finalize(), BufferQueue::ErrorCode::Ok);
-  EXPECT_EQ(Buffers.releaseBuffer(B0), BufferQueue::ErrorCode::Ok);
-  EXPECT_EQ(Buffers.releaseBuffer(B1), BufferQueue::ErrorCode::Ok);
-}
-
-TEST(BufferQueueTest, GenerationalSupportAcrossThreads) {
-  bool Success = false;
-  BufferQueue Buffers(kSize, 10, Success);
-  ASSERT_TRUE(Success);
-  std::atomic<int> Counter{0};
-
-  // This function uses thread-local storage to isolate the instances of the
-  // buffers each thread works with. It also allows us to signal the threads
-  // of a new generation, and allow those to get new buffers. This is
-  // representative of how we expect the buffer queue to be used by the XRay
-  // runtime.
-  auto Process = [&] {
-    thread_local BufferQueue::Buffer B;
-    ASSERT_EQ(Buffers.getBuffer(B), BufferQueue::ErrorCode::Ok);
-    auto FirstGen = B.Generation;
-
-    // Signal that we've gotten a buffer in the thread.
-    Counter.fetch_add(1, std::memory_order_acq_rel);
-    while (!Buffers.finalizing()) {
-      Buffers.releaseBuffer(B);
-      Buffers.getBuffer(B);
-    }
-
-    // Signal that we've exited the get/release buffer loop.
-    Counter.fetch_sub(1, std::memory_order_acq_rel);
-    if (B.Data != nullptr)
-      Buffers.releaseBuffer(B);
-
-    // Spin until we find that the buffer queue is no longer finalizing.
-    while (Buffers.getBuffer(B) != BufferQueue::ErrorCode::Ok)
-      ;
-
-    // Signal that we've successfully gotten a buffer in the thread.
-    Counter.fetch_add(1, std::memory_order_acq_rel);
-    EXPECT_NE(FirstGen, B.Generation);
-    EXPECT_EQ(Buffers.releaseBuffer(B), BufferQueue::ErrorCode::Ok);
-
-    // Signal that we've successfully exited.
-    Counter.fetch_sub(1, std::memory_order_acq_rel);
-  };
-
-  // Spawn two threads running Process.
-  std::thread T0(Process), T1(Process);
-
-  // Spin until the counter is up to 2.
-  while (Counter.load(std::memory_order_acquire) != 2)
-    ;
-
-  // Then we finalize...
-  Buffers.finalize();
-
-  // ... and spin until the counter is down to 0.
-  while (Counter.load(std::memory_order_acquire) != 0)
-    ;
-
-  // Then we re-initialize, starting a new generation.
-  EXPECT_EQ(Buffers.init(kSize, 10), BufferQueue::ErrorCode::Ok);
-
-  T0.join();
-  T1.join();
-  ASSERT_EQ(Counter.load(std::memory_order_acquire), 0);
-}
-
 } // namespace __xray

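The deleted tests boil down to a simple discipline: get buffers from a fixed ring, release them in FIFO order, and stop handing out new ones once finalized. The following single-threaded sketch condenses that discipline; SimpleBufferQueue and every name in it are invented for illustration and are not the XRay implementation.

// buffer_ring_sketch.cc -- build with: c++ -std=c++11 buffer_ring_sketch.cc
#include <cassert>
#include <cstddef>
#include <vector>

// Error codes mirror the ones in the diff; everything else is a stand-in.
enum class ErrorCode { Ok, NotEnoughMemory, QueueFinalizing };

class SimpleBufferQueue {
  std::vector<char> BackingStore; // one contiguous allocation, like the real queue
  size_t BufferSize, BufferCount;
  size_t Next = 0, First = 0;     // ring cursors for get and release
  size_t LiveBuffers = 0;         // buffers currently handed out
  bool Finalizing = false;

public:
  SimpleBufferQueue(size_t BS, size_t BC)
      : BackingStore(BS * BC), BufferSize(BS), BufferCount(BC) {}

  ErrorCode getBuffer(char *&Data, size_t &Size) {
    if (Finalizing)
      return ErrorCode::QueueFinalizing;
    if (LiveBuffers == BufferCount)
      return ErrorCode::NotEnoughMemory;
    Data = BackingStore.data() + (BufferSize * Next);
    Size = BufferSize;
    ++LiveBuffers;
    if (++Next == BufferCount) // wrap the ring cursor
      Next = 0;
    return ErrorCode::Ok;
  }

  ErrorCode releaseBuffer(char *&Data) {
    if (LiveBuffers == 0) // same "released more than we got" signal as the diff
      return ErrorCode::NotEnoughMemory;
    --LiveBuffers;
    if (++First == BufferCount)
      First = 0;
    Data = nullptr;
    return ErrorCode::Ok;
  }

  void finalize() { Finalizing = true; } // no new buffers after this
};

int main() {
  // Mirrors the ErrorsWhenFinalising flow above: gets succeed before
  // finalize, fail after, while releases still succeed.
  SimpleBufferQueue Q(/*BS=*/4096, /*BC=*/10);
  char *Data = nullptr;
  size_t Size = 0;
  assert(Q.getBuffer(Data, Size) == ErrorCode::Ok);
  Q.finalize();
  char *Other = nullptr;
  assert(Q.getBuffer(Other, Size) == ErrorCode::QueueFinalizing);
  assert(Q.releaseBuffer(Data) == ErrorCode::Ok);
  return 0;
}
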
View File

@@ -24,85 +24,58 @@
 using namespace __xray;
 using namespace __sanitizer;
 
-BufferQueue::ErrorCode BufferQueue::init(size_t BS, size_t BC) {
-  SpinMutexLock Guard(&Mutex);
-
-  if (!finalizing())
-    return BufferQueue::ErrorCode::AlreadyInitialized;
-
-  bool Success = false;
-  BufferSize = BS;
-  BufferCount = BC;
-  BackingStore = allocateBuffer(BufferSize * BufferCount);
-  if (BackingStore == nullptr)
-    return BufferQueue::ErrorCode::NotEnoughMemory;
-
-  auto CleanupBackingStore = __sanitizer::at_scope_exit([&, this] {
-    if (Success)
-      return;
-    deallocateBuffer(BackingStore, BufferSize * BufferCount);
-  });
-
-  Buffers = initArray<BufferRep>(BufferCount);
-  if (Buffers == nullptr)
-    return BufferQueue::ErrorCode::NotEnoughMemory;
-
-  // At this point we increment the generation number to associate the buffers
-  // with the new generation.
-  atomic_fetch_add(&Generation, 1, memory_order_acq_rel);
-
-  Success = true;
-  for (size_t i = 0; i < BufferCount; ++i) {
-    auto &T = Buffers[i];
-    auto &Buf = T.Buff;
-    atomic_store(&Buf.Extents, 0, memory_order_release);
-    Buf.Generation = generation();
-    Buf.Data = reinterpret_cast<char *>(BackingStore) + (BufferSize * i);
-    Buf.Size = BufferSize;
-    T.Used = false;
-  }
-
-  Next = Buffers;
-  First = Buffers;
-  LiveBuffers = 0;
-  atomic_store(&Finalizing, 0, memory_order_release);
-  return BufferQueue::ErrorCode::Ok;
-}
-
 BufferQueue::BufferQueue(size_t B, size_t N,
                          bool &Success) XRAY_NEVER_INSTRUMENT
     : BufferSize(B),
       BufferCount(N),
       Mutex(),
-      Finalizing{1},
-      BackingStore(nullptr),
-      Buffers(nullptr),
+      Finalizing{0},
+      BackingStore(allocateBuffer(B * N)),
+      Buffers(initArray<BufferQueue::BufferRep>(N)),
       Next(Buffers),
       First(Buffers),
-      LiveBuffers(0),
-      Generation{0} {
-  Success = init(B, N) == BufferQueue::ErrorCode::Ok;
+      LiveBuffers(0) {
+  if (BackingStore == nullptr) {
+    Success = false;
+    return;
+  }
+  if (Buffers == nullptr) {
+    deallocateBuffer(BackingStore, BufferSize * BufferCount);
+    Success = false;
+    return;
+  }
+  for (size_t i = 0; i < N; ++i) {
+    auto &T = Buffers[i];
+    auto &Buf = T.Buff;
+    Buf.Data = reinterpret_cast<char *>(BackingStore) + (BufferSize * i);
+    Buf.Size = B;
+    atomic_store(&Buf.Extents, 0, memory_order_release);
+    T.Used = false;
+  }
+  Success = true;
 }
 
 BufferQueue::ErrorCode BufferQueue::getBuffer(Buffer &Buf) {
   if (atomic_load(&Finalizing, memory_order_acquire))
     return ErrorCode::QueueFinalizing;
-  BufferRep *B = nullptr;
-  {
-    SpinMutexLock Guard(&Mutex);
-    if (LiveBuffers == BufferCount)
-      return ErrorCode::NotEnoughMemory;
-    B = Next++;
-    if (Next == (Buffers + BufferCount))
-      Next = Buffers;
-    ++LiveBuffers;
-  }
-
-  Buf.Data = B->Buff.Data;
-  Buf.Generation = generation();
-  Buf.Size = B->Buff.Size;
-  B->Used = true;
+  SpinMutexLock Guard(&Mutex);
+  if (LiveBuffers == BufferCount)
+    return ErrorCode::NotEnoughMemory;
+
+  auto &T = *Next;
+  auto &B = T.Buff;
+  auto Extents = atomic_load(&B.Extents, memory_order_acquire);
+  atomic_store(&Buf.Extents, Extents, memory_order_release);
+  Buf.Data = B.Data;
+  Buf.Size = B.Size;
+  T.Used = true;
+  ++LiveBuffers;
+  if (++Next == (Buffers + BufferCount))
+    Next = Buffers;
+
   return ErrorCode::Ok;
 }
@@ -111,42 +84,29 @@ BufferQueue::ErrorCode BufferQueue::releaseBuffer(Buffer &Buf) {
   // backing store's range.
   if (Buf.Data < BackingStore ||
       Buf.Data >
-          reinterpret_cast<char *>(BackingStore) + (BufferCount * BufferSize)) {
-    if (Buf.Generation != generation()) {
-      Buf.Data = nullptr;
-      Buf.Size = 0;
-      Buf.Generation = 0;
-      return BufferQueue::ErrorCode::Ok;
-    }
-    return BufferQueue::ErrorCode::UnrecognizedBuffer;
-  }
+          reinterpret_cast<char *>(BackingStore) + (BufferCount * BufferSize))
+    return ErrorCode::UnrecognizedBuffer;
 
-  BufferRep *B = nullptr;
-  {
-    SpinMutexLock Guard(&Mutex);
-
-    // This points to a semantic bug: we really ought not to release more
-    // buffers than we actually get.
-    if (LiveBuffers == 0)
-      return ErrorCode::NotEnoughMemory;
-
-    --LiveBuffers;
-    B = First++;
-    if (First == (Buffers + BufferCount))
-      First = Buffers;
-  }
+  SpinMutexLock Guard(&Mutex);
 
-  // Now that the buffer has been released, we mark it as "used".
-  B->Buff.Data = Buf.Data;
-  B->Buff.Size = Buf.Size;
-  B->Buff.Generation = Buf.Generation;
-  B->Used = true;
-  atomic_store(&B->Buff.Extents,
-               atomic_load(&Buf.Extents, memory_order_acquire),
-               memory_order_release);
+  // This points to a semantic bug: we really ought not to release more
+  // buffers than we actually get.
+  if (LiveBuffers == 0)
+    return ErrorCode::NotEnoughMemory;
+
+  auto Extents = atomic_load(&Buf.Extents, memory_order_acquire);
+  atomic_store(&First->Buff.Extents, Extents, memory_order_release);
+  First->Buff.Data = Buf.Data;
+  First->Buff.Size = Buf.Size;
+  First->Used = true;
   Buf.Data = nullptr;
   Buf.Size = 0;
-  Buf.Generation = 0;
   atomic_store(&Buf.Extents, 0, memory_order_release);
+  --LiveBuffers;
+  if (++First == (Buffers + BufferCount))
+    First = Buffers;
   return ErrorCode::Ok;
 }

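Both restored bodies above use the same cursor idiom: Next and First are raw pointers into the Buffers array that advance by one slot per call and wrap to the start when they run off the end. A minimal standalone demonstration of just that idiom (the array and loop below are invented for illustration):

// ring_cursor_sketch.cc -- build with: c++ -std=c++11 ring_cursor_sketch.cc
#include <cstdio>

int main() {
  int Slots[4] = {0, 1, 2, 3};
  int *Begin = Slots, *End = Slots + 4;
  int *Next = Begin;
  for (int i = 0; i < 10; ++i) {
    std::printf("%d ", *Next);
    if (++Next == End) // same wrap test as the restored getBuffer/releaseBuffer
      Next = Begin;
  }
  std::printf("\n"); // prints: 0 1 2 3 0 1 2 3 0 1
  return 0;
}
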
View File

@@ -33,7 +33,6 @@ class BufferQueue {
  public:
   struct Buffer {
     atomic_uint64_t Extents{0};
-    uint64_t Generation{0};
     void *Data = nullptr;
     size_t Size = 0;
   };
@@ -131,10 +130,6 @@ private:
   // Count of buffers that have been handed out through 'getBuffer'.
   size_t LiveBuffers;
 
-  // We use a generation number to identify buffers and which generation they're
-  // associated with.
-  atomic_uint64_t Generation;
-
  public:
   enum class ErrorCode : unsigned {
     Ok,
@@ -142,7 +137,6 @@ public:
     QueueFinalizing,
     UnrecognizedBuffer,
     AlreadyFinalized,
-    AlreadyInitialized,
   };
 
   static const char *getErrorString(ErrorCode E) {
@@ -157,8 +151,6 @@ public:
       return "buffer being returned not owned by buffer queue";
     case ErrorCode::AlreadyFinalized:
       return "queue already finalized";
-    case ErrorCode::AlreadyInitialized:
-      return "queue already initialized";
     }
     return "unknown error";
   }
@@ -189,23 +181,10 @@ public:
   /// the buffer being released.
   ErrorCode releaseBuffer(Buffer &Buf);
 
-  /// Initializes the buffer queue, starting a new generation. We can re-set the
-  /// size of buffers with |BS| along with the buffer count with |BC|.
-  ///
-  /// Returns:
-  ///   - ErrorCode::Ok when we successfully initialize the buffer. This
-  ///     requires that the buffer queue is previously finalized.
-  ///   - ErrorCode::AlreadyInitialized when the buffer queue is not finalized.
-  ErrorCode init(size_t BS, size_t BC);
-
   bool finalizing() const {
     return atomic_load(&Finalizing, memory_order_acquire);
   }
 
-  uint64_t generation() const {
-    return atomic_load(&Generation, memory_order_acquire);
-  }
-
   /// Returns the configured size of the buffers in the buffer queue.
   size_t ConfiguredBufferSize() const { return BufferSize; }

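What the header loses in this revert is the generational handle check: each Buffer carried the Generation in which it was handed out, the queue bumped its counter on re-initialization, and a buffer returned from an older generation could be quietly retired rather than rejected as unrecognized. A rough standalone sketch of that idea, with all names invented for illustration (this is not the XRay BufferQueue API):

// generation_sketch.cc -- build with: c++ -std=c++11 generation_sketch.cc
#include <cassert>
#include <cstdint>

struct Handle {
  uint64_t Generation;
};

class GenerationalQueue {
  uint64_t Generation = 0;

public:
  Handle get() const { return Handle{Generation}; } // stamp with current generation
  void reinit() { ++Generation; }                   // each re-init starts a new generation

  // A handle stamped with an older generation is stale; the removed code
  // retired such handles with Ok instead of reporting UnrecognizedBuffer.
  bool isStale(const Handle &H) const { return H.Generation != Generation; }
};

int main() {
  GenerationalQueue Q;
  Handle Old = Q.get();
  Q.reinit();
  Handle Fresh = Q.get();
  assert(Q.isStale(Old));
  assert(!Q.isStale(Fresh));
  return 0;
}
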
View File

@@ -1056,7 +1056,8 @@ void fdrLoggingHandleTypedEvent(
   endBufferIfFull();
 }
 
-XRayLogInitStatus fdrLoggingInit(size_t, size_t, void *Options,
+XRayLogInitStatus fdrLoggingInit(UNUSED size_t BufferSize,
+                                 UNUSED size_t BufferMax, void *Options,
                                  size_t OptionsSize) XRAY_NEVER_INSTRUMENT {
   if (Options == nullptr)
     return XRayLogInitStatus::XRAY_LOG_UNINITIALIZED;
@@ -1103,8 +1104,9 @@ XRayLogInitStatus fdrLoggingInit(size_t, size_t, void *Options,
   // environment-variable defined options.
   FDRParser.ParseString(static_cast<const char *>(Options));
   *fdrFlags() = FDRFlags;
-  auto BufferSize = FDRFlags.buffer_size;
-  auto BufferMax = FDRFlags.buffer_max;
+  BufferSize = FDRFlags.buffer_size;
+  BufferMax = FDRFlags.buffer_max;
   bool Success = false;
 
   if (BQ != nullptr) {
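The hunk above restores named but UNUSED size parameters whose values the body immediately overwrites with the parsed flag values, so buffer_size/buffer_max from the option string always win. A throwaway sketch of that shadow-then-override pattern; parseSizeOption is a stand-in invented here, not the FDRParser API:

// options_override_sketch.cc -- build with: c++ -std=c++11 options_override_sketch.cc
#include <cassert>
#include <cstdlib>
#include <cstring>

// Crude "key=value" scanner standing in for a real options parser.
static size_t parseSizeOption(const char *Options, const char *Key,
                              size_t Default) {
  const char *P = std::strstr(Options, Key);
  if (P == nullptr)
    return Default;
  // Skip past "Key=" and parse the number that follows.
  return static_cast<size_t>(
      std::strtoull(P + std::strlen(Key) + 1, nullptr, 10));
}

int main() {
  size_t BufferSize = 0, BufferMax = 0; // named but ignored inputs, like the UNUSED params
  const char *Options = "buffer_size=16384:buffer_max=20";
  BufferSize = parseSizeOption(Options, "buffer_size", 4096);
  BufferMax = parseSizeOption(Options, "buffer_max", 10);
  assert(BufferSize == 16384 && BufferMax == 20); // parsed options win
  return 0;
}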