documentation

This commit is contained in:
Markus Pilman 2021-03-19 18:08:09 -06:00
parent 7718ea5cfc
commit 99ac47e96c
2 changed files with 64 additions and 7 deletions

View File

@ -109,12 +109,14 @@ template class WriteOnlySet<ActorLineage, unsigned, 1024>;
// testing code // testing code
namespace { namespace {
// Some statistics
std::atomic<unsigned long> instanceCounter = 0; std::atomic<unsigned long> instanceCounter = 0;
std::atomic<unsigned long> numInserts = 0; std::atomic<unsigned long> numInserts = 0;
std::atomic<unsigned long> numErase = 0; std::atomic<unsigned long> numErase = 0;
std::atomic<unsigned long> numLockedErase = 0; std::atomic<unsigned long> numLockedErase = 0;
std::atomic<unsigned long> numCopied = 0; std::atomic<unsigned long> numCopied = 0;
// A simple object that counts the number of its instances. This is used to detect memory leaks.
struct TestObject { struct TestObject {
mutable std::atomic<unsigned> _refCount = 1; mutable std::atomic<unsigned> _refCount = 1;
TestObject() { instanceCounter.fetch_add(1); } TestObject() { instanceCounter.fetch_add(1); }
@ -130,6 +132,7 @@ struct TestObject {
using TestSet = WriteOnlySet<TestObject, unsigned, 128>; using TestSet = WriteOnlySet<TestObject, unsigned, 128>;
using Clock = std::chrono::steady_clock; using Clock = std::chrono::steady_clock;
// An actor that can join a set of threads in an async way.
ACTOR Future<Void> threadjoiner(std::shared_ptr<std::vector<std::thread>> threads, std::shared_ptr<TestSet> set) { ACTOR Future<Void> threadjoiner(std::shared_ptr<std::vector<std::thread>> threads, std::shared_ptr<TestSet> set) {
loop { loop {
wait(delay(0.1)); wait(delay(0.1));
@ -156,6 +159,7 @@ ACTOR Future<Void> threadjoiner(std::shared_ptr<std::vector<std::thread>> thread
} }
} }
// occasionally copy the contents of the past set.
void testCopier(std::shared_ptr<TestSet> set, std::chrono::seconds runFor) { void testCopier(std::shared_ptr<TestSet> set, std::chrono::seconds runFor) {
auto start = Clock::now(); auto start = Clock::now();
while (true) { while (true) {
@ -168,6 +172,7 @@ void testCopier(std::shared_ptr<TestSet> set, std::chrono::seconds runFor) {
} }
} }
// In a loop adds and removes a set of objects to the set
void writer(std::shared_ptr<TestSet> set, std::chrono::seconds runFor) { void writer(std::shared_ptr<TestSet> set, std::chrono::seconds runFor) {
auto start = Clock::now(); auto start = Clock::now();
std::random_device rDev; std::random_device rDev;
@ -203,6 +208,7 @@ void writer(std::shared_ptr<TestSet> set, std::chrono::seconds runFor) {
} }
} }
// This unit test creates 5 writer threads and one copier thread.
TEST_CASE("/flow/WriteOnlySet") { TEST_CASE("/flow/WriteOnlySet") {
if (g_network->isSimulated()) { if (g_network->isSimulated()) {
// This test is not deterministic, so we shouldn't run it in simulation // This test is not deterministic, so we shouldn't run it in simulation

View File

@ -24,6 +24,21 @@
#include "flow/Trace.h" #include "flow/Trace.h"
#include <boost/lockfree/queue.hpp> #include <boost/lockfree/queue.hpp>
/**
* This is a Write-Only set that supports copying the whole content. This data structure is lock-free and allows a user
* to insert and remove objects up to a given capacity (passed by a template).
*
* Template parameters:
* \param T The type to store.
* \param IndexType The type used as an index
* \param CAPACITY The maximum number of object this structure can store (if a user tries to store more, insert will
* fail gracefully)
* \pre T implements `void addref() const` and `void delref() const`
* \pre IndexType must have a copy constructor
* \pre IndexType must have a trivial assignment operator
* \pre IndexType must have a trivial destructor
* \pre IndexType can be used as an index into a std::vector
*/
template <class T, class IndexType, IndexType CAPACITY> template <class T, class IndexType, IndexType CAPACITY>
class WriteOnlySet { class WriteOnlySet {
public: public:
@ -37,25 +52,61 @@ public:
WriteOnlySet(const WriteOnlySet&) = delete; WriteOnlySet(const WriteOnlySet&) = delete;
WriteOnlySet& operator=(const WriteOnlySet&) = delete; WriteOnlySet& operator=(const WriteOnlySet&) = delete;
// Returns the number of elements at the time of calling. Keep in mind that this is a lockfree data structure, so /**
// the actual size might change anytime after or even during the call. This function only guarantees that the size * Attempts to insert \p lineage into the set. This method can fail if the set is full (its size is equal to its
// was whatever the method returns at one point between the start and the end of the function call. The safest way * capacity). Calling insert on a full set is safe but the method will return \ref npos if the operation fails.
// to handle this is by assuming that this returns an estimate. *
unsigned size(); * \param lineage A reference to the object the user wants to insert.
* \ret An index that can later be used to erase the value again or \ref npos if the insert failed.
* \pre lineage.getPtr() % 2 == 0 (the memory for lineage has to be at least 2 byte aligned)
*/
[[nodiscard]] Index insert(const Reference<T>& lineage);
Index insert(const Reference<T>& lineage); /**
* Erases the object associated with \p idx from the set.
*
* \ret Whether the reference count was decremented. Usually the return value is only interesting for testing and
* benchmarking purposes and will in most cases be ignored. If \ref delref wasn't called, it will be called
* later. Note that at the time the return value is checked, \ref delref might already have been called.
*/
bool erase(Index idx); bool erase(Index idx);
/**
* Copies all elements that are stored in the set into a vector. This copy operation does NOT provide a snapshot of
* the data structure. The contract is weak:
* - All object that were in the set before copy is called and weren't removed until after copy returned are
* guaranteed to be in the result.
* - Any object that was inserted while copy is running might be in the result.
* - Any object that was erased while copy is running might be in the result.
*/
std::vector<Reference<T>> copy(); std::vector<Reference<T>> copy();
private: private:
// the implementation of erase -- the wrapper just makes the function a bit more readable.
bool eraseImpl(Index idx); bool eraseImpl(Index idx);
// the last bit of a pointer within the set is used like a boolean and true means that the object is locked. Locking
// an object is only relevant for memory management. A locked pointer can still be erased from the set, but the
// erase won't call delref on the object. Instead it will push the pointer into the \ref freeList and copy will call
// delref later.
static constexpr uintptr_t LOCK = 0b1; static constexpr uintptr_t LOCK = 0b1;
std::atomic<Index> _size = 0;
// The actual memory
std::vector<std::atomic<std::uintptr_t>> _set; std::vector<std::atomic<std::uintptr_t>> _set;
static_assert(std::atomic<Index>::is_always_lock_free, "Index type can't be used as a lock-free type"); static_assert(std::atomic<Index>::is_always_lock_free, "Index type can't be used as a lock-free type");
static_assert(std::atomic<Index>::is_always_lock_free, "uintptr_t can't be used as a lock-free type"); static_assert(std::atomic<Index>::is_always_lock_free, "uintptr_t can't be used as a lock-free type");
// The freeQueue. On creation all indexes (0..capacity-1) are pushed into this queue. On insert one element from
// this queue is consumed and the resulting number is used as an index into the set. On erase the index is given
// back to the freeQueue.
boost::lockfree::queue<Index, boost::lockfree::fixed_sized<true>, boost::lockfree::capacity<CAPACITY>> freeQueue; boost::lockfree::queue<Index, boost::lockfree::fixed_sized<true>, boost::lockfree::capacity<CAPACITY>> freeQueue;
// The freeList is used for memory management. Generally copying a shared pointer can't be done in a lock-free way.
// Instead, when we copy the data structure we first copy the address, then attempt to set the last bit to 1 and
// only if that succeeds we will increment the reference count. Whenever we attempt to remove an object
// in \ref erase we remove the object from the set (using an atomic compare and swap) and only decrement the
// reference count if the last bit is 0. If it's not we'll push the pointer into this free list.
// \ref copy will consume all elements from this freeList each time it runs and decrements the refcount for each
// element.
boost::lockfree::queue<T*, boost::lockfree::fixed_sized<true>, boost::lockfree::capacity<CAPACITY>> freeList; boost::lockfree::queue<T*, boost::lockfree::fixed_sized<true>, boost::lockfree::capacity<CAPACITY>> freeList;
}; };