forked from OSchip/llvm-project
[XRay] Move buffer extents back to the heap
Summary: This change addresses an issue which shows up with the synchronised race between threads writing into a buffer, and another thread reading the buffer. In a lot of cases, we cannot guarantee that threads will always see the signal to finalise their buffers in time despite the grace periods and state machine maintained through atomic variables. This change addresses it by ensuring that the same instance being updated to indicate how much of the buffer is "used" by the writing thread is the same instance being read by the thread processing the buffer to be written out to disk or handled through the iterators. To do this, we ensure that all the "extents" instances live in their own the backing store, in a different contiguous page from the buffer-specific backing store. We also take precautions to ensure that the atomic variables are cache-line-sized to prevent false-sharing from unnecessarily causing cache contention on unrelated writes/reads. It's feasible that we may in the future be able to move the storage of the extents objects into the single backing store, slightly changing the way to compute the size(s) of the buffers, but in the meantime we'll settle for the isolation afforded by having a different backing store for the extents instances. Reviewers: mboerger Subscribers: jfb, llvm-commits Differential Revision: https://reviews.llvm.org/D54684 llvm-svn: 347280
This commit is contained in:
parent
8e0e35a3f5
commit
ba02cb58cf
|
@ -82,7 +82,7 @@ std::string serialize(BufferQueue &Buffers, int32_t Version) {
|
|||
Serialized.append(reinterpret_cast<const char *>(&HeaderStorage),
|
||||
sizeof(XRayFileHeader));
|
||||
Buffers.apply([&](const BufferQueue::Buffer &B) {
|
||||
auto Size = atomic_load_relaxed(&B.Extents);
|
||||
auto Size = atomic_load_relaxed(B.Extents);
|
||||
auto Extents =
|
||||
createMetadataRecord<MetadataRecord::RecordKinds::BufferExtents>(Size);
|
||||
Serialized.append(reinterpret_cast<const char *>(&Extents),
|
||||
|
|
|
@ -23,7 +23,6 @@
|
|||
#include <sys/mman.h>
|
||||
|
||||
using namespace __xray;
|
||||
using namespace __sanitizer;
|
||||
|
||||
namespace {
|
||||
|
||||
|
@ -53,6 +52,18 @@ void incRefCount(BufferQueue::ControlBlock *C) {
|
|||
atomic_fetch_add(&C->RefCount, 1, memory_order_acq_rel);
|
||||
}
|
||||
|
||||
// We use a struct to ensure that we are allocating one atomic_uint64_t per
|
||||
// cache line. This allows us to not worry about false-sharing among atomic
|
||||
// objects being updated (constantly) by different threads.
|
||||
struct ExtentsPadded {
|
||||
union {
|
||||
atomic_uint64_t Extents;
|
||||
unsigned char Storage[kCacheLineSize];
|
||||
};
|
||||
};
|
||||
|
||||
constexpr size_t kExtentsSize = sizeof(ExtentsPadded);
|
||||
|
||||
} // namespace
|
||||
|
||||
BufferQueue::ErrorCode BufferQueue::init(size_t BS, size_t BC) {
|
||||
|
@ -71,13 +82,25 @@ BufferQueue::ErrorCode BufferQueue::init(size_t BS, size_t BC) {
|
|||
if (BackingStore == nullptr)
|
||||
return BufferQueue::ErrorCode::NotEnoughMemory;
|
||||
|
||||
auto CleanupBackingStore = __sanitizer::at_scope_exit([&, this] {
|
||||
auto CleanupBackingStore = at_scope_exit([&, this] {
|
||||
if (Success)
|
||||
return;
|
||||
deallocControlBlock(BackingStore, BufferSize, BufferCount);
|
||||
BackingStore = nullptr;
|
||||
});
|
||||
|
||||
// Initialize enough atomic_uint64_t instances, each
|
||||
ExtentsBackingStore = allocControlBlock(kExtentsSize, BufferCount);
|
||||
if (ExtentsBackingStore == nullptr)
|
||||
return BufferQueue::ErrorCode::NotEnoughMemory;
|
||||
|
||||
auto CleanupExtentsBackingStore = at_scope_exit([&, this] {
|
||||
if (Success)
|
||||
return;
|
||||
deallocControlBlock(ExtentsBackingStore, kExtentsSize, BufferCount);
|
||||
ExtentsBackingStore = nullptr;
|
||||
});
|
||||
|
||||
Buffers = initArray<BufferRep>(BufferCount);
|
||||
if (Buffers == nullptr)
|
||||
return BufferQueue::ErrorCode::NotEnoughMemory;
|
||||
|
@ -89,6 +112,7 @@ BufferQueue::ErrorCode BufferQueue::init(size_t BS, size_t BC) {
|
|||
// First, we initialize the refcount in the ControlBlock, which we treat as
|
||||
// being at the start of the BackingStore pointer.
|
||||
atomic_store(&BackingStore->RefCount, 1, memory_order_release);
|
||||
atomic_store(&ExtentsBackingStore->RefCount, 1, memory_order_release);
|
||||
|
||||
// Then we initialise the individual buffers that sub-divide the whole backing
|
||||
// store. Each buffer will start at the `Data` member of the ControlBlock, and
|
||||
|
@ -96,11 +120,15 @@ BufferQueue::ErrorCode BufferQueue::init(size_t BS, size_t BC) {
|
|||
for (size_t i = 0; i < BufferCount; ++i) {
|
||||
auto &T = Buffers[i];
|
||||
auto &Buf = T.Buff;
|
||||
atomic_store(&Buf.Extents, 0, memory_order_release);
|
||||
auto *E = reinterpret_cast<ExtentsPadded *>(&ExtentsBackingStore->Data +
|
||||
(kExtentsSize * i));
|
||||
Buf.Extents = &E->Extents;
|
||||
atomic_store(Buf.Extents, 0, memory_order_release);
|
||||
Buf.Generation = generation();
|
||||
Buf.Data = &BackingStore->Data + (BufferSize * i);
|
||||
Buf.Size = BufferSize;
|
||||
Buf.BackingStore = BackingStore;
|
||||
Buf.ExtentsBackingStore = ExtentsBackingStore;
|
||||
Buf.Count = BufferCount;
|
||||
T.Used = false;
|
||||
}
|
||||
|
@ -120,6 +148,7 @@ BufferQueue::BufferQueue(size_t B, size_t N,
|
|||
Mutex(),
|
||||
Finalizing{1},
|
||||
BackingStore(nullptr),
|
||||
ExtentsBackingStore(nullptr),
|
||||
Buffers(nullptr),
|
||||
Next(Buffers),
|
||||
First(Buffers),
|
||||
|
@ -144,6 +173,7 @@ BufferQueue::ErrorCode BufferQueue::getBuffer(Buffer &Buf) {
|
|||
}
|
||||
|
||||
incRefCount(BackingStore);
|
||||
incRefCount(ExtentsBackingStore);
|
||||
Buf = B->Buff;
|
||||
Buf.Generation = generation();
|
||||
B->Used = true;
|
||||
|
@ -159,6 +189,7 @@ BufferQueue::ErrorCode BufferQueue::releaseBuffer(Buffer &Buf) {
|
|||
if (Buf.Generation != generation() || LiveBuffers == 0) {
|
||||
Buf = {};
|
||||
decRefCount(Buf.BackingStore, Buf.Size, Buf.Count);
|
||||
decRefCount(Buf.ExtentsBackingStore, kExtentsSize, Buf.Count);
|
||||
return BufferQueue::ErrorCode::Ok;
|
||||
}
|
||||
|
||||
|
@ -176,8 +207,8 @@ BufferQueue::ErrorCode BufferQueue::releaseBuffer(Buffer &Buf) {
|
|||
B->Buff = Buf;
|
||||
B->Used = true;
|
||||
decRefCount(Buf.BackingStore, Buf.Size, Buf.Count);
|
||||
atomic_store(&B->Buff.Extents,
|
||||
atomic_load(&Buf.Extents, memory_order_acquire),
|
||||
decRefCount(Buf.ExtentsBackingStore, kExtentsSize, Buf.Count);
|
||||
atomic_store(B->Buff.Extents, atomic_load(Buf.Extents, memory_order_acquire),
|
||||
memory_order_release);
|
||||
Buf = {};
|
||||
return ErrorCode::Ok;
|
||||
|
@ -194,7 +225,9 @@ void BufferQueue::cleanupBuffers() {
|
|||
B->~BufferRep();
|
||||
deallocateBuffer(Buffers, BufferCount);
|
||||
decRefCount(BackingStore, BufferSize, BufferCount);
|
||||
decRefCount(ExtentsBackingStore, kExtentsSize, BufferCount);
|
||||
BackingStore = nullptr;
|
||||
ExtentsBackingStore = nullptr;
|
||||
Buffers = nullptr;
|
||||
BufferCount = 0;
|
||||
BufferSize = 0;
|
||||
|
|
|
@ -32,10 +32,11 @@ namespace __xray {
|
|||
class BufferQueue {
|
||||
public:
|
||||
/// ControlBlock represents the memory layout of how we interpret the backing
|
||||
/// store for all buffers managed by a BufferQueue instance. The ControlBlock
|
||||
/// has the reference count as the first member, sized according to
|
||||
/// platform-specific cache-line size. We never use the Buffer member of the
|
||||
/// union, which is only there for compiler-supported alignment and sizing.
|
||||
/// store for all buffers and extents managed by a BufferQueue instance. The
|
||||
/// ControlBlock has the reference count as the first member, sized according
|
||||
/// to platform-specific cache-line size. We never use the Buffer member of
|
||||
/// the union, which is only there for compiler-supported alignment and
|
||||
/// sizing.
|
||||
///
|
||||
/// This ensures that the `Data` member will be placed at least kCacheLineSize
|
||||
/// bytes from the beginning of the structure.
|
||||
|
@ -52,7 +53,7 @@ public:
|
|||
};
|
||||
|
||||
struct Buffer {
|
||||
atomic_uint64_t Extents{0};
|
||||
atomic_uint64_t *Extents = nullptr;
|
||||
uint64_t Generation{0};
|
||||
void *Data = nullptr;
|
||||
size_t Size = 0;
|
||||
|
@ -60,6 +61,7 @@ public:
|
|||
private:
|
||||
friend class BufferQueue;
|
||||
ControlBlock *BackingStore = nullptr;
|
||||
ControlBlock *ExtentsBackingStore = nullptr;
|
||||
size_t Count = 0;
|
||||
};
|
||||
|
||||
|
@ -142,6 +144,9 @@ private:
|
|||
// The collocated ControlBlock and buffer storage.
|
||||
ControlBlock *BackingStore;
|
||||
|
||||
// The collocated ControlBlock and extents storage.
|
||||
ControlBlock *ExtentsBackingStore;
|
||||
|
||||
// A dynamically allocated array of BufferRep instances.
|
||||
BufferRep *Buffers;
|
||||
|
||||
|
|
|
@ -64,7 +64,7 @@ template <size_t Version = 5> class FDRController {
|
|||
First = true;
|
||||
UndoableFunctionEnters = 0;
|
||||
UndoableTailExits = 0;
|
||||
atomic_store(&B.Extents, 0, memory_order_release);
|
||||
atomic_store(B.Extents, 0, memory_order_release);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -123,7 +123,7 @@ template <size_t Version = 5> class FDRController {
|
|||
if (First) {
|
||||
First = false;
|
||||
W.resetRecord();
|
||||
atomic_store(&B.Extents, 0, memory_order_release);
|
||||
atomic_store(B.Extents, 0, memory_order_release);
|
||||
return setupNewBuffer();
|
||||
}
|
||||
|
||||
|
|
|
@ -86,7 +86,7 @@ class FDRLogWriter {
|
|||
// read the bytes in the buffer will see the writes committed before the
|
||||
// extents are updated.
|
||||
atomic_thread_fence(memory_order_release);
|
||||
atomic_fetch_add(&Buffer.Extents, sizeof(T), memory_order_acq_rel);
|
||||
atomic_fetch_add(Buffer.Extents, sizeof(T), memory_order_acq_rel);
|
||||
}
|
||||
|
||||
public:
|
||||
|
@ -116,7 +116,7 @@ public:
|
|||
// read the bytes in the buffer will see the writes committed before the
|
||||
// extents are updated.
|
||||
atomic_thread_fence(memory_order_release);
|
||||
atomic_fetch_add(&Buffer.Extents, Size, memory_order_acq_rel);
|
||||
atomic_fetch_add(Buffer.Extents, Size, memory_order_acq_rel);
|
||||
return Size;
|
||||
}
|
||||
|
||||
|
@ -160,7 +160,7 @@ public:
|
|||
// read the bytes in the buffer will see the writes committed before the
|
||||
// extents are updated.
|
||||
atomic_thread_fence(memory_order_release);
|
||||
atomic_fetch_add(&Buffer.Extents, sizeof(R) + sizeof(A),
|
||||
atomic_fetch_add(Buffer.Extents, sizeof(R) + sizeof(A),
|
||||
memory_order_acq_rel);
|
||||
return true;
|
||||
}
|
||||
|
@ -185,7 +185,7 @@ public:
|
|||
// read the bytes in the buffer will see the writes committed before the
|
||||
// extents are updated.
|
||||
atomic_thread_fence(memory_order_release);
|
||||
atomic_fetch_add(&Buffer.Extents, sizeof(R) + EventSize,
|
||||
atomic_fetch_add(Buffer.Extents, sizeof(R) + EventSize,
|
||||
memory_order_acq_rel);
|
||||
return true;
|
||||
}
|
||||
|
@ -208,7 +208,7 @@ public:
|
|||
// read the bytes in the buffer will see the writes committed before the
|
||||
// extents are updated.
|
||||
atomic_thread_fence(memory_order_release);
|
||||
atomic_fetch_add(&Buffer.Extents, EventSize, memory_order_acq_rel);
|
||||
atomic_fetch_add(Buffer.Extents, EventSize, memory_order_acq_rel);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -216,13 +216,13 @@ public:
|
|||
|
||||
void resetRecord() {
|
||||
NextRecord = reinterpret_cast<char *>(Buffer.Data);
|
||||
atomic_store(&Buffer.Extents, 0, memory_order_release);
|
||||
atomic_store(Buffer.Extents, 0, memory_order_release);
|
||||
}
|
||||
|
||||
void undoWrites(size_t B) {
|
||||
DCHECK_GE(NextRecord - B, reinterpret_cast<char *>(Buffer.Data));
|
||||
NextRecord -= B;
|
||||
atomic_fetch_sub(&Buffer.Extents, B, memory_order_acq_rel);
|
||||
atomic_fetch_sub(Buffer.Extents, B, memory_order_acq_rel);
|
||||
}
|
||||
|
||||
}; // namespace __xray
|
||||
|
|
|
@ -250,7 +250,7 @@ XRayBuffer fdrIterator(const XRayBuffer B) {
|
|||
// fence ordering to ensure that writes we expect to have been completed
|
||||
// before the fence are fully committed before we read the extents.
|
||||
atomic_thread_fence(memory_order_acquire);
|
||||
auto BufferSize = atomic_load(&It->Extents, memory_order_acquire);
|
||||
auto BufferSize = atomic_load(It->Extents, memory_order_acquire);
|
||||
SerializedBufferSize = BufferSize + sizeof(MetadataRecord);
|
||||
CurrentBuffer = allocateBuffer(SerializedBufferSize);
|
||||
if (CurrentBuffer == nullptr)
|
||||
|
@ -364,7 +364,7 @@ XRayLogFlushStatus fdrLoggingFlush() XRAY_NEVER_INSTRUMENT {
|
|||
// still use a Metadata record, but fill in the extents instead for the
|
||||
// data.
|
||||
MetadataRecord ExtentsRecord;
|
||||
auto BufferExtents = atomic_load(&B.Extents, memory_order_acquire);
|
||||
auto BufferExtents = atomic_load(B.Extents, memory_order_acquire);
|
||||
DCHECK(BufferExtents <= B.Size);
|
||||
ExtentsRecord.Type = uint8_t(RecordType::Metadata);
|
||||
ExtentsRecord.RecordKind =
|
||||
|
|
Loading…
Reference in New Issue