foundationdb/flow/ProcessEvents.cpp

269 lines
7.6 KiB
C++

/*
* ProcessEvents.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2023 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "flow/ProcessEvents.h"
#include "flow/UnitTest.h"
#include <unordered_map>
#include <vector>
namespace {
// One registered callback together with the event names it listens on.
//
// The constructor registers the object (via addEvent()) under every name in `names`; the
// registry stores the object's address and uses that address as its id. Because identity is the
// address, the type is non-copyable (which also disables implicit moves): a copy would live at a
// different address that was never registered.
struct EventImpl {
	using Id = intptr_t;
	// Names this callback is registered under. StringRef is used as-is, so the referenced
	// bytes are presumably expected to outlive the registration -- TODO confirm with callers.
	std::vector<StringRef> names;
	ProcessEvents::Callback callback;
	EventImpl(std::vector<StringRef> names, ProcessEvents::Callback callback)
	  : names(std::move(names)), callback(std::move(callback)) {
		addEvent();
	}
	EventImpl(EventImpl const&) = delete;
	EventImpl& operator=(EventImpl const&) = delete;
	// The id is simply this object's address.
	Id id() const { return reinterpret_cast<Id>(this); }
	// Registers this object under all of `names`.
	void addEvent();
	// Unregisters this object from all of `names`. Not called automatically on destruction.
	void removeEvent();
};
// Registry mapping an event name to the callbacks listening on it.
//
// Callbacks may create events, destroy events (including themselves) and call trigger() again
// while a trigger is in progress. To keep the iteration inside trigger() valid, `events` is only
// modified while `triggering` is zero: additions made during a trigger are parked in `toInsert`,
// removals in `toRemove`, and both are merged into `events` when the outermost trigger() returns.
struct ProcessEventsImpl {
	// Scope guard that counts the trigger() calls currently on the stack. When the count drops
	// back to zero the deferred removals and insertions are applied to `events`.
	struct Triggering {
		unsigned& value;
		ProcessEventsImpl& processEvents;
		explicit Triggering(unsigned& value, ProcessEventsImpl& processEvents)
		  : value(value), processEvents(processEvents) {
			++value;
		}
		~Triggering() {
			if (--value == 0) {
				// Merge modifications back into the event map. Removals are applied before
				// insertions so that a new event which happens to reuse the address (and
				// therefore the id) of a removed one is not erased again.
				for (auto const& p : processEvents.toRemove) {
					for (auto const& n : p.second) {
						processEvents.eraseRegistered(n, p.first);
					}
				}
				processEvents.toRemove.clear();
				for (auto const& p : processEvents.toInsert) {
					processEvents.events[p.first].insert(p.second.begin(), p.second.end());
				}
				processEvents.toInsert.clear();
			}
		}
	};
	using EventMap = std::unordered_map<StringRef, std::unordered_map<EventImpl::Id, EventImpl*>>;
	// Number of trigger() calls currently executing (greater than one when re-entered).
	unsigned triggering = 0;
	// The live registrations: event name -> (id -> callback object).
	EventMap events;
	// Removals requested while a trigger was running: id -> names to unregister from.
	std::map<EventImpl::Id, std::vector<StringRef>> toRemove;
	// Additions requested while a trigger was running.
	EventMap toInsert;

	// Invokes every callback registered under `name`, passing `name`, `data` and `e` through.
	// Callbacks added during this call are not invoked by it, and callbacks removed during
	// this call are skipped. A callback that throws terminates via UNSTOPPABLE_ASSERT.
	void trigger(StringRef name, std::any const& data, Error const& e) {
		Triggering _(triggering, *this);
		auto iter = events.find(name);
		// Nobody listens on this name. Strictly speaking this isn't a bug (although it could
		// mean that a name was misspelled), so the trigger is simply a no-op.
		if (iter == events.end()) {
			return;
		}
		std::unordered_map<EventImpl::Id, EventImpl*>& callbacks = iter->second;
		for (auto const& c : callbacks) {
			try {
				// it's possible that the callback has been removed in
				// which case attempting to call it will be undefined
				// behavior.
				if (toRemove.count(c.first) == 0) {
					c.second->callback(name, data, e);
				}
			} catch (...) {
				// callbacks are not allowed to throw
				UNSTOPPABLE_ASSERT(false);
			}
		}
	}

	// Registers `event` under `name`; deferred into `toInsert` while a trigger is running.
	void add(StringRef const& name, EventImpl* event) {
		EventMap& m = triggering ? toInsert : events;
		m[name].emplace(event->id(), event);
	}

	// Registers `event` under every name in `names`.
	void add(std::vector<StringRef> const& names, EventImpl* event) {
		for (auto const& name : names) {
			add(name, event);
		}
	}

	// Unregisters the callback `id` from every name in `names`.
	//
	// While a trigger is running, `events` must not change. If the callback was itself added
	// during the running trigger it only lives in `toInsert` and is erased from there;
	// otherwise the removal is recorded in `toRemove` and applied once the trigger finishes.
	void remove(std::vector<StringRef> names, EventImpl::Id id) {
		if (triggering) {
			// it's possible that the event hasn't been added yet
			bool inInsertMap = false;
			for (auto const& name : names) {
				auto it = toInsert.find(name);
				if (it == toInsert.end()) {
					// either all are in the insert map or none
					ASSERT(!inInsertMap);
					break;
				}
				auto it2 = it->second.find(id);
				if (it2 == it->second.end()) {
					// either all are in the insert map or none
					ASSERT(!inInsertMap);
					break;
				}
				inInsertMap = true;
				it->second.erase(it2);
				if (it->second.empty()) {
					toInsert.erase(it);
				}
			}
			if (!inInsertMap) {
				toRemove.emplace(id, std::move(names));
			}
		} else {
			for (auto const& name : names) {
				eraseRegistered(name, id);
			}
		}
	}

	// Erases callback `id` from the bucket of `name` in `events` and drops the bucket once it
	// is empty, so that `events` does not keep an entry (and its StringRef key) for a name
	// nobody listens on anymore. Must only be called while no trigger is iterating `events`.
	void eraseRegistered(StringRef const& name, EventImpl::Id id) {
		auto bucket = events.find(name);
		if (bucket == events.end()) {
			return;
		}
		bucket->second.erase(id);
		if (bucket->second.empty()) {
			events.erase(bucket);
		}
	}
};
// The single registry instance used by EventImpl::addEvent/removeEvent and by
// ProcessEvents::trigger in this file.
ProcessEventsImpl processEventsImpl;
void EventImpl::addEvent() {
processEventsImpl.add(names, this);
}
void EventImpl::removeEvent() {
processEventsImpl.remove(names, this->id());
}
} // namespace
namespace ProcessEvents {
// Public entry point: forwards to the registry, which invokes the callbacks registered
// under `name` with `name`, `data` and `e`.
void trigger(StringRef name, std::any const& data, Error const& e) {
	ProcessEventsImpl& registry = processEventsImpl;
	registry.trigger(name, data, e);
}
// Registers `callback` under `name` without handing out a handle. The EventImpl registers
// itself in its constructor; the pointer is intentionally not kept, so this registration
// is never removed or freed.
void uncancellableEvent(StringRef name, Callback callback) {
	std::vector<StringRef> names;
	names.push_back(name);
	new EventImpl(std::move(names), std::move(callback));
}
// Registers `callback` under the single event name `name`; the registration lasts until
// this Event is destroyed.
Event::Event(StringRef name, Callback callback) {
	std::vector<StringRef> names;
	names.push_back(std::move(name));
	impl = new EventImpl(std::move(names), std::move(callback));
}
// Registers `callback` under every name in `names`; the registration lasts until this
// Event is destroyed.
Event::Event(std::vector<StringRef> names, Callback callback) {
	auto* registration = new EventImpl(std::move(names), std::move(callback));
	impl = registration;
}
// Unregisters the callback from all of its names and frees the backing EventImpl.
Event::~Event() {
	EventImpl* const registration = reinterpret_cast<EventImpl*>(impl);
	// Unregister first: the registry must not be left holding a pointer to freed memory.
	registration->removeEvent();
	delete registration;
}
// Exercises the ProcessEvents API: plain triggering, adding events from inside a callback,
// an event that deletes itself from inside its own callback, and re-entrant triggering.
TEST_CASE("/flow/ProcessEvents") {
	{
		// Basic test
		unsigned numHits = 0;
		Event _("basic"_sr, [&numHits](StringRef n, std::any const&, Error const& e) {
			ASSERT_EQ(n, "basic"_sr);
			ASSERT_EQ(e.code(), error_code_success);
			++numHits;
		});
		trigger("basic"_sr, ""_sr, success());
		ASSERT_EQ(numHits, 1);
		trigger("basic"_sr, ""_sr, success());
		// every trigger must reach the callback exactly once
		ASSERT_EQ(numHits, 2);
	}
	{
		// Test that Events can be added during a trigger
		unsigned hits1 = 0;
		std::vector<std::shared_ptr<Event>> createdEvents;
		std::vector<unsigned> numHits;
		// The inner callbacks index into numHits; two "create" triggers add two entries.
		numHits.reserve(2);
		Event _("create"_sr, [&](StringRef n, std::any const&, Error const& e) {
			ASSERT_EQ(n, "create"_sr);
			ASSERT_EQ(e.code(), error_code_success);
			++hits1;
			numHits.push_back(0);
			createdEvents.push_back(std::make_shared<Event>(
			    std::vector<StringRef>{ "create"_sr, "secondaries"_sr },
			    [&numHits, idx = numHits.size() - 1](StringRef n, std::any const&, Error const& e) {
				    ASSERT(n == "create"_sr || n == "secondaries"_sr);
				    ASSERT_EQ(e.code(), error_code_success);
				    ++numHits[idx];
			    }));
		});
		trigger("create"_sr, ""_sr, success());
		ASSERT_EQ(hits1, 1);
		ASSERT_EQ(createdEvents.size(), 1);
		ASSERT_EQ(numHits.size(), 1);
		// an event that is created in a callback mustn't be called in the same trigger
		ASSERT_EQ(numHits[0], 0);
		trigger("create"_sr, ""_sr, success());
		ASSERT_EQ(hits1, 2);
		ASSERT_EQ(createdEvents.size(), 2);
		ASSERT_EQ(numHits.size(), 2);
		ASSERT_EQ(numHits[0], 1);
		ASSERT_EQ(numHits[1], 0);
		// "secondaries" only reaches the two events created above, not the creator
		trigger("secondaries"_sr, ""_sr, success());
		ASSERT_EQ(hits1, 2);
		ASSERT_EQ(createdEvents.size(), 2);
		ASSERT_EQ(numHits.size(), 2);
		ASSERT_EQ(numHits[0], 2);
		ASSERT_EQ(numHits[1], 1);
	}
	{
		// Remove self
		unsigned called_self_delete = 0, called_non_delete = 0;
		std::unique_ptr<Event> ev;
		auto callback_del = [&](StringRef n, std::any const&, Error const& e) {
			ASSERT_EQ(n, "deletion"_sr);
			ASSERT_EQ(e.code(), error_code_success);
			++called_self_delete;
			// remove Event
			ev.reset();
		};
		ev.reset(new Event("deletion"_sr, callback_del));
		Event _("deletion"_sr, [&](StringRef n, std::any const&, Error const& e) {
			ASSERT_EQ(n, "deletion"_sr);
			ASSERT_EQ(e.code(), error_code_success);
			++called_non_delete;
		});
		trigger("deletion"_sr, ""_sr, success());
		trigger("deletion"_sr, ""_sr, success());
		// the self-deleting event runs once, the other one on both triggers
		ASSERT_EQ(called_self_delete, 1);
		ASSERT_EQ(called_non_delete, 2);
	}
	{
		// Reentrant safe
		unsigned calls = 0;
		Event ev("reentrant"_sr, [&calls](StringRef, std::any const& data, Error const&) {
			++calls;
			// call depth of 5
			auto v = std::any_cast<int>(data);
			if (v < 5) {
				Event doNotCall("reentrant"_sr, [](StringRef, std::any const&, Error const&) {
					// should never be called
					ASSERT(false);
				});
				trigger("reentrant"_sr, v + 1, success());
			}
		});
		trigger("reentrant"_sr, 0, success());
		// the callback ran once for each depth 0..5
		ASSERT_EQ(calls, 6);
	}
	return Void();
}
} // namespace ProcessEvents