Getting from a dictionary isn't blocked until SYSTEM RELOAD is finished.

This commit is contained in:
Vitaly Baranov 2019-12-15 16:22:04 +03:00
parent d0d5c72e4d
commit 68187b5e08
1 changed files with 180 additions and 156 deletions

View File

@ -388,11 +388,11 @@ public:
infos.clear(); /// We clear this map to tell the threads that we don't want any load results anymore.
/// Wait for all the threads to finish.
while (!loading_ids.empty())
while (!loading_threads.empty())
{
auto it = loading_ids.begin();
auto it = loading_threads.begin();
auto thread = std::move(it->second);
loading_ids.erase(it);
loading_threads.erase(it);
lock.unlock();
event.notify_all();
thread.join();
@ -425,14 +425,13 @@ public:
if (!config_is_same)
{
/// Configuration has been changed.
info.config_changed = true;
info.object_config = new_config;
if (info.triedToLoad())
{
/// The object has been tried to load before, so it is currently in use or was in use
/// and we should try to reload it with the new config.
cancelLoading(info);
startLoading(name, info);
startLoading(info, true);
}
}
}
@ -445,7 +444,7 @@ public:
{
Info & info = infos.emplace(name, Info{name, config}).first->second;
if (always_load_everything)
startLoading(name, info);
startLoading(info);
}
}
@ -473,7 +472,7 @@ public:
/// Start loading all the objects which were not loaded yet.
for (auto & [name, info] : infos)
if (!info.triedToLoad())
startLoading(name, info);
startLoading(info);
}
}
@ -551,7 +550,7 @@ public:
ReturnType tryLoad(const String & name, Duration timeout)
{
std::unique_lock lock{mutex};
Info * info = loadImpl(name, timeout, lock);
Info * info = loadImpl(name, timeout, false, lock);
if (!info)
return notExists<ReturnType>(name);
return info->getLoadResult<ReturnType>();
@ -561,7 +560,7 @@ public:
ReturnType tryLoad(const FilterByNameFunction & filter, Duration timeout)
{
std::unique_lock lock{mutex};
loadImpl(filter, timeout, lock);
loadImpl(filter, timeout, false, lock);
return collectLoadResults<ReturnType>(filter);
}
@ -570,13 +569,7 @@ public:
ReturnType tryLoadOrReload(const String & name, Duration timeout)
{
std::unique_lock lock{mutex};
Info * info = getInfo(name);
if (!info)
return notExists<ReturnType>(name);
cancelLoading(*info);
info->forced_to_reload = true;
info = loadImpl(name, timeout, lock);
Info * info = loadImpl(name, timeout, true, lock);
if (!info)
return notExists<ReturnType>(name);
return info->getLoadResult<ReturnType>();
@ -586,16 +579,7 @@ public:
ReturnType tryLoadOrReload(const FilterByNameFunction & filter, Duration timeout)
{
std::unique_lock lock{mutex};
for (auto & [name, info] : infos)
{
if (filter(name))
{
cancelLoading(info);
info.forced_to_reload = true;
}
}
loadImpl(filter, timeout, lock);
loadImpl(filter, timeout, true, lock);
return collectLoadResults<ReturnType>(filter);
}
@ -611,7 +595,7 @@ public:
for (const auto & name_and_info : infos)
{
const auto & info = name_and_info.second;
if ((now >= info.next_update_time) && !info.loading() && info.loaded())
if ((now >= info.next_update_time) && !info.is_loading() && info.loaded())
should_update_map.emplace(info.object, info.failedToReload());
}
}
@ -641,7 +625,7 @@ public:
TimePoint now = std::chrono::system_clock::now();
for (auto & [name, info] : infos)
{
if ((now >= info.next_update_time) && !info.loading())
if ((now >= info.next_update_time) && !info.is_loading())
{
if (info.loaded())
{
@ -657,12 +641,12 @@ public:
}
/// Object was modified or it was failed to reload last time, so it should be reloaded.
startLoading(name, info);
startLoading(info);
}
else if (info.failed())
{
/// Object was never loaded successfully and should be reloaded.
startLoading(name, info);
startLoading(info);
}
}
}
@ -676,24 +660,24 @@ private:
bool loaded() const { return object != nullptr; }
bool failed() const { return !object && exception; }
bool loading() const { return loading_id != 0; }
bool triedToLoad() const { return loaded() || failed() || loading(); }
bool ready() const { return (loaded() || failed()) && !forced_to_reload; }
bool loadedOrFailed() const { return loaded() || failed(); }
bool triedToLoad() const { return loaded() || failed() || is_loading(); }
bool failedToReload() const { return loaded() && exception != nullptr; }
bool is_loading() const { return loading_id > state_id; }
Status status() const
{
if (object)
return loading() ? Status::LOADED_AND_RELOADING : Status::LOADED;
return is_loading() ? Status::LOADED_AND_RELOADING : Status::LOADED;
else if (exception)
return loading() ? Status::FAILED_AND_RELOADING : Status::FAILED;
return is_loading() ? Status::FAILED_AND_RELOADING : Status::FAILED;
else
return loading() ? Status::LOADING : Status::NOT_LOADED;
return is_loading() ? Status::LOADING : Status::NOT_LOADED;
}
Duration loadingDuration() const
{
if (loading())
if (is_loading())
return std::chrono::duration_cast<Duration>(std::chrono::system_clock::now() - loading_start_time);
return std::chrono::duration_cast<Duration>(loading_end_time - loading_start_time);
}
@ -726,7 +710,8 @@ private:
ObjectConfig object_config;
TimePoint loading_start_time;
TimePoint loading_end_time;
size_t loading_id = 0; /// Non-zero if it's loading right now.
size_t state_id = 0; /// Index of the current state of this `info`, this index is incremented every loading.
size_t loading_id = 0; /// The value which will be stored in `state_id` after finishing the current loading.
size_t error_count = 0; /// Numbers of errors since last successful loading.
std::exception_ptr exception; /// Last error occurred.
TimePoint next_update_time = TimePoint::max(); /// Time of the next update, `TimePoint::max()` means "never".
@ -769,17 +754,25 @@ private:
return results;
}
Info * loadImpl(const String & name, Duration timeout, std::unique_lock<std::mutex> & lock)
Info * loadImpl(const String & name, Duration timeout, bool forced_to_reload, std::unique_lock<std::mutex> & lock)
{
Info * info;
auto pred = [&]()
std::optional<size_t> min_id;
Info * info = nullptr;
auto pred = [&]
{
info = getInfo(name);
if (!info || info->ready())
return true;
if (!info->loading())
startLoading(name, *info);
return info->ready();
if (!info)
return true; /// stop
if (!min_id)
min_id = getMinIDToFinishLoading(forced_to_reload);
if (info->state_id >= min_id)
return true; /// stop
if (info->loading_id < min_id)
startLoading(*info, forced_to_reload, *min_id);
return false; /// wait for the next event
};
if (timeout == WAIT)
@ -790,19 +783,26 @@ private:
return info;
}
void loadImpl(const FilterByNameFunction & filter, Duration timeout, std::unique_lock<std::mutex> & lock)
void loadImpl(const FilterByNameFunction & filter, Duration timeout, bool forced_to_reload, std::unique_lock<std::mutex> & lock)
{
auto pred = [&]()
std::optional<size_t> min_id;
auto pred = [&]
{
if (!min_id)
min_id = getMinIDToFinishLoading(forced_to_reload);
bool all_ready = true;
for (auto & [name, info] : infos)
{
if (info.ready() || !filter(name))
if (!filter(name))
continue;
if (!info.loading())
startLoading(name, info);
if (!info.ready())
all_ready = false;
if (info.state_id >= min_id)
continue;
all_ready = false;
if (info.loading_id < min_id)
startLoading(info, forced_to_reload, *min_id);
}
return all_ready;
};
@ -813,13 +813,39 @@ private:
event.wait_for(lock, timeout, pred);
}
void startLoading(const String & name, Info & info)
/// When state_id >= getMinIDToFinishLoading() the loading is considered as finished.
size_t getMinIDToFinishLoading(bool forced_to_reload) const
{
if (info.loading())
return;
if (forced_to_reload)
{
/// We need to force reloading, that's why we return next_id_counter here
/// (because info.state_id < next_id_counter for any info).
return next_id_counter;
}
/// The loading of an object can cause the loading of another object.
/// We use the same "min_id" in this case to allows reloading multiple objects at once
/// taking into account their dependencies.
auto it = min_id_to_finish_loading_dependencies.find(std::this_thread::get_id());
if (it != min_id_to_finish_loading_dependencies.end())
return it->second;
/// We just need the first loading to be finished, that's why we return 1 here
/// (because info.state_id >= 1 since the first loading is finished, successfully or not).
return 1;
}
void startLoading(Info & info, bool forced_to_reload = false, size_t min_id_to_finish_loading_dependencies_ = 1)
{
if (info.is_loading())
{
if (!forced_to_reload)
return;
cancelLoading(info);
}
/// All loadings have unique loading IDs.
size_t loading_id = next_loading_id++;
size_t loading_id = next_id_counter++;
info.loading_id = loading_id;
info.loading_start_time = std::chrono::system_clock::now();
info.loading_end_time = TimePoint{};
@ -827,24 +853,88 @@ private:
if (enable_async_loading)
{
/// Put a job to the thread pool for the loading.
auto thread = ThreadFromGlobalPool{&LoadingDispatcher::doLoading, this, name, loading_id, true};
loading_ids.try_emplace(loading_id, std::move(thread));
auto thread = ThreadFromGlobalPool{&LoadingDispatcher::doLoading, this, info.name, loading_id, forced_to_reload, min_id_to_finish_loading_dependencies_, true};
loading_threads.try_emplace(loading_id, std::move(thread));
}
else
{
/// Perform the loading immediately.
doLoading(name, loading_id, false);
doLoading(info.name, loading_id, forced_to_reload, min_id_to_finish_loading_dependencies_, false);
}
}
/// Load one object, returns object ptr or exception
/// Do not require locking
std::pair<LoadablePtr, std::exception_ptr> loadOneObject(
const String & name,
const ObjectConfig & config,
LoadablePtr previous_version)
void cancelLoading(Info & info)
{
if (!info.is_loading())
return;
/// In fact we cannot actually CANCEL the loading (because it's possibly already being performed in another thread).
/// But we can reset the `loading_id` and doLoading() will understand it as a signal to stop loading.
info.loading_id = info.state_id;
info.loading_end_time = std::chrono::system_clock::now();
}
/// Does the loading, possibly in the separate thread.
void doLoading(const String & name, size_t loading_id, bool forced_to_reload, size_t min_id_to_finish_loading_dependencies_, bool async)
{
try
{
/// Prepare for loading.
std::optional<Info> info;
{
LoadingGuardForAsyncLoad lock(async, mutex);
info = prepareToLoadSingleObject(name, loading_id, min_id_to_finish_loading_dependencies_, lock);
if (!info)
return;
}
/// Previous version can be used as the base for new loading, enabling loading only part of data.
auto previous_version_as_base_for_loading = info->object;
if (forced_to_reload)
previous_version_as_base_for_loading = nullptr; /// Need complete reloading, cannot use the previous version.
/// Loading.
auto [new_object, new_exception] = loadSingleObject(name, info->object_config, previous_version_as_base_for_loading);
if (!new_object && !new_exception)
throw Exception("No object created and no exception raised for " + type_name, ErrorCodes::LOGICAL_ERROR);
/// Saving the result of the loading.
{
LoadingGuardForAsyncLoad lock(async, mutex);
saveResultOfLoadingSingleObject(name, loading_id, info->object, new_object, new_exception, info->error_count, lock);
finishLoadingSingleObject(name, loading_id, lock);
}
event.notify_all();
}
catch (...)
{
LoadingGuardForAsyncLoad lock(async, mutex);
finishLoadingSingleObject(name, loading_id, lock);
throw;
}
}
/// Returns single object info, checks loading_id and name.
std::optional<Info> prepareToLoadSingleObject(
const String & name, size_t loading_id, size_t min_id_to_finish_loading_dependencies_, const LoadingGuardForAsyncLoad &)
{
Info * info = getInfo(name);
/// We check here if this is exactly the same loading as we planned to perform.
/// This check is necessary because the object could be removed or load with another config before this thread even starts.
if (!info || !info->is_loading() || (info->loading_id != loading_id))
return {};
min_id_to_finish_loading_dependencies[std::this_thread::get_id()] = min_id_to_finish_loading_dependencies_;
return *info;
}
/// Load one object, returns object ptr or exception.
std::pair<LoadablePtr, std::exception_ptr>
loadSingleObject(const String & name, const ObjectConfig & config, LoadablePtr previous_version)
{
/// Use `create_function` to perform the actual loading.
/// It's much better to do it with `mutex` unlocked because the loading can take a lot of time
/// and require access to other objects.
LoadablePtr new_object;
std::exception_ptr new_exception;
try
@ -856,44 +946,18 @@ private:
new_exception = std::current_exception();
}
return std::make_pair(new_object, new_exception);
}
}
/// Return single object info, checks loading_id and name
std::optional<Info> getSingleObjectInfo(const String & name, size_t loading_id, bool async)
{
LoadingGuardForAsyncLoad lock(async, mutex);
Info * info = getInfo(name);
if (!info || !info->loading() || (info->loading_id != loading_id))
return {};
return *info;
}
/// Removes object loading_id from loading_ids if it present
/// in other case do nothin should by done with lock
void finishObjectLoading(size_t loading_id, const LoadingGuardForAsyncLoad &)
{
auto it = loading_ids.find(loading_id);
if (it != loading_ids.end())
{
it->second.detach();
loading_ids.erase(it);
}
}
/// Process loading result
/// Calculates next update time and process errors
void processLoadResult(
/// Saves the result of the loading, calculates the time of the next update, and handles errors.
void saveResultOfLoadingSingleObject(
const String & name,
size_t loading_id,
LoadablePtr previous_version,
LoadablePtr new_object,
std::exception_ptr new_exception,
size_t error_count,
bool async)
const LoadingGuardForAsyncLoad &)
{
LoadingGuardForAsyncLoad lock(async, mutex);
/// Calculate a new update time.
TimePoint next_update_time;
try
@ -918,9 +982,9 @@ private:
Info * info = getInfo(name);
/// And again we should check if this is still the same loading as we were doing.
/// We should check if this is still the same loading as we were doing.
/// This is necessary because the object could be removed or load with another config while the `mutex` was unlocked.
if (!info || !info->loading() || (info->loading_id != loading_id))
if (!info || !info->is_loading() || (info->loading_id != loading_id))
return;
if (new_exception)
@ -944,66 +1008,25 @@ private:
info->exception = new_exception;
info->error_count = error_count;
info->loading_end_time = std::chrono::system_clock::now();
info->loading_id = 0;
info->state_id = info->loading_id;
info->next_update_time = next_update_time;
info->forced_to_reload = false;
if (new_object)
info->config_changed = false;
finishObjectLoading(loading_id, lock);
}
/// Does the loading, possibly in the separate thread.
void doLoading(const String & name, size_t loading_id, bool async)
{
try
{
/// We check here if this is exactly the same loading as we planned to perform.
/// This check is necessary because the object could be removed or load with another config before this thread even starts.
std::optional<Info> info = getSingleObjectInfo(name, loading_id, async);
if (!info)
return;
/// Use `create_function` to perform the actual loading.
/// It's much better to do it with `mutex` unlocked because the loading can take a lot of time
/// and require access to other objects.
auto previous_version_to_use = info->object;
bool need_complete_reloading = !info->object || info->config_changed || info->forced_to_reload;
if (need_complete_reloading)
previous_version_to_use = nullptr; /// Need complete reloading, cannot use the previous version.
auto [new_object, new_exception] = loadOneObject(name, info->object_config, previous_version_to_use);
if (!new_object && !new_exception)
throw Exception("No object created and no exception raised for " + type_name, ErrorCodes::LOGICAL_ERROR);
processLoadResult(name, loading_id, info->object, new_object, new_exception, info->error_count, async);
event.notify_all();
}
catch (...)
{
LoadingGuardForAsyncLoad lock(async, mutex);
finishObjectLoading(loading_id, lock);
throw;
}
}
void cancelLoading(const String & name)
/// Removes the references to the loading thread from the maps.
void finishLoadingSingleObject(const String & name, size_t loading_id, const LoadingGuardForAsyncLoad &)
{
Info * info = getInfo(name);
if (info)
cancelLoading(*info);
}
if (info && (info->loading_id == loading_id))
info->loading_id = info->state_id;
void cancelLoading(Info & info)
{
if (!info.loading())
return;
min_id_to_finish_loading_dependencies.erase(std::this_thread::get_id());
/// In fact we cannot actually CANCEL the loading (because it's possibly already being performed in another thread).
/// But we can reset the `loading_id` and doLoading() will understand it as a signal to stop loading.
info.loading_id = 0;
info.loading_end_time = std::chrono::system_clock::now();
auto it = loading_threads.find(loading_id);
if (it != loading_threads.end())
{
it->second.detach();
loading_threads.erase(it);
}
}
/// Calculate next update time for loaded_object. Can be called without mutex locking,
@ -1042,8 +1065,9 @@ private:
std::unordered_map<String, Info> infos;
bool always_load_everything = false;
std::atomic<bool> enable_async_loading = false;
std::unordered_map<size_t, ThreadFromGlobalPool> loading_ids;
size_t next_loading_id = 1; /// should always be > 0
std::unordered_map<size_t, ThreadFromGlobalPool> loading_threads;
std::unordered_map<std::thread::id, size_t> min_id_to_finish_loading_dependencies;
size_t next_id_counter = 1; /// should always be > 0
mutable pcg64 rnd_engine{randomSeed()};
};
@ -1263,7 +1287,7 @@ void ExternalLoader::checkLoaded(const ExternalLoader::LoadResult & result,
if (result.status == ExternalLoader::Status::LOADING)
throw Exception(type_name + " '" + result.name + "' is still loading", ErrorCodes::BAD_ARGUMENTS);
if (result.exception)
std::rethrow_exceptiozn(result.exception);
std::rethrow_exception(result.exception);
if (result.status == ExternalLoader::Status::NOT_EXIST)
throw Exception(type_name + " '" + result.name + "' not found", ErrorCodes::BAD_ARGUMENTS);
if (result.status == ExternalLoader::Status::NOT_LOADED)