[OpenMP][Plugin] Introduce generic resource pool

Currently CUDA streams are managed by `StreamManagerTy`. It works very well. Now
we have the need that some resources, such as CUDA stream and event, will be
hold by `libomptarget`. It is always good to buffer those resources. What's more
important, given the way that `libomptarget` and plugins are connected, we cannot
make sure whether plugins are still alive when `libomptarget` is destroyed. That
leads to an issue that those resouces hold by `libomptarget` might not be
released correctly. As a result, we need an unified management of all the resources
that can be shared between `libomptarget` and plugins.

`ResourcePoolTy` is designed to manage the type of resource for one device.
It has to work with an allocator which is supposed to provide `create` and
`destroy`. In this way, when the plugin is destroyed, we can make sure that
all resources allocated from native runtime library will be released correctly,
no matter whether `libomptarget` starts its destroy.

Reviewed By: ye-luo

Differential Revision: https://reviews.llvm.org/D111954
This commit is contained in:
Shilei Tian 2021-12-27 11:31:59 -05:00
parent 7171af7445
commit a697a0a4b6
1 changed files with 133 additions and 131 deletions

View File

@ -187,137 +187,125 @@ struct DeviceDataTy {
int NumThreads = 0;
};
class StreamManagerTy {
int NumberOfDevices;
// The initial size of stream pool
int EnvNumInitialStreams;
// Per-device stream mutex
std::vector<std::unique_ptr<std::mutex>> StreamMtx;
// Per-device stream Id indicates the next available stream in the pool
std::vector<int> NextStreamId;
// Per-device stream pool
std::vector<std::vector<CUstream>> StreamPool;
// Reference to per-device data
std::vector<DeviceDataTy> &DeviceData;
/// Resource allocator where \p T is the resource type.
/// Functions \p create and \p destroy return OFFLOAD_SUCCESS and OFFLOAD_FAIL
/// accordingly. The implementation should not raise any exception.
template <typename T> class AllocatorTy {
public:
/// Create a resource and assign to R.
int create(T &R) noexcept;
/// Destroy the resource.
int destroy(T) noexcept;
};
// If there is no CUstream left in the pool, we will resize the pool to
// allocate more CUstream. This function should be called with device mutex,
// and we do not resize to smaller one.
void resizeStreamPool(const int DeviceId, const size_t NewSize) {
std::vector<CUstream> &Pool = StreamPool[DeviceId];
const size_t CurrentSize = Pool.size();
assert(NewSize > CurrentSize && "new size is not larger than current size");
/// Allocator for CUstream.
template <> class AllocatorTy<CUstream> {
CUcontext Context;
CUresult Err = cuCtxSetCurrent(DeviceData[DeviceId].Context);
if (!checkResult(Err, "Error returned from cuCtxSetCurrent\n")) {
// We will return if cannot switch to the right context in case of
// creating bunch of streams that are not corresponding to the right
// device. The offloading will fail later because selected CUstream is
// nullptr.
return;
}
Pool.resize(NewSize, nullptr);
for (size_t I = CurrentSize; I < NewSize; ++I) {
checkResult(cuStreamCreate(&Pool[I], CU_STREAM_NON_BLOCKING),
"Error returned from cuStreamCreate\n");
public:
AllocatorTy(CUcontext C) noexcept : Context(C) {}
/// See AllocatorTy<T>::create.
int create(CUstream &Stream) noexcept {
if (!checkResult(cuCtxSetCurrent(Context),
"Error returned from cuCtxSetCurrent\n"))
return OFFLOAD_FAIL;
if (!checkResult(cuStreamCreate(&Stream, CU_STREAM_NON_BLOCKING),
"Error returned from cuStreamCreate\n"))
return OFFLOAD_FAIL;
return OFFLOAD_SUCCESS;
}
/// See AllocatorTy<T>::destroy.
int destroy(CUstream Stream) noexcept {
if (!checkResult(cuCtxSetCurrent(Context),
"Error returned from cuCtxSetCurrent\n"))
return OFFLOAD_FAIL;
if (!checkResult(cuStreamDestroy(Stream),
"Error returned from cuStreamDestroy\n"))
return OFFLOAD_FAIL;
return OFFLOAD_SUCCESS;
}
};
/// A generic pool of resources where \p T is the resource type.
/// \p T should be copyable as the object is stored in \p std::vector .
template <typename T> class ResourcePoolTy {
/// Index of the next available resource.
size_t Next = 0;
/// Mutex to guard the pool.
std::mutex Mutex;
/// Pool of resources.
std::vector<T> Resources;
/// A reference to the corresponding allocator.
AllocatorTy<T> Allocator;
/// If `Resources` is used up, we will fill in more resources. It assumes that
/// the new size `Size` should be always larger than the current size.
bool resize(size_t Size) {
auto CurSize = Resources.size();
assert(Size > CurSize && "Unexpected smaller size");
Resources.reserve(Size);
for (auto I = CurSize; I < Size; ++I) {
T NewItem;
int Ret = Allocator.create(NewItem);
if (Ret != OFFLOAD_SUCCESS)
return false;
Resources.push_back(NewItem);
}
return true;
}
public:
StreamManagerTy(const int NumberOfDevices,
std::vector<DeviceDataTy> &DeviceData)
: NumberOfDevices(NumberOfDevices), EnvNumInitialStreams(32),
DeviceData(DeviceData) {
StreamPool.resize(NumberOfDevices);
NextStreamId.resize(NumberOfDevices);
StreamMtx.resize(NumberOfDevices);
if (const char *EnvStr = getenv("LIBOMPTARGET_NUM_INITIAL_STREAMS"))
EnvNumInitialStreams = std::stoi(EnvStr);
// Initialize the next stream id
std::fill(NextStreamId.begin(), NextStreamId.end(), 0);
// Initialize stream mutex
for (std::unique_ptr<std::mutex> &Ptr : StreamMtx)
Ptr = std::make_unique<std::mutex>();
ResourcePoolTy(AllocatorTy<T> &&A, size_t Size = 0) noexcept
: Allocator(std::move(A)) {
(void)resize(Size);
}
~StreamManagerTy() {
// Destroy streams
for (int I = 0; I < NumberOfDevices; ++I) {
checkResult(cuCtxSetCurrent(DeviceData[I].Context),
"Error returned from cuCtxSetCurrent\n");
for (CUstream &S : StreamPool[I]) {
if (S)
checkResult(cuStreamDestroy(S),
"Error returned from cuStreamDestroy\n");
}
}
~ResourcePoolTy() noexcept {
for (auto &R : Resources)
(void)Allocator.destroy(R);
}
// Get a CUstream from pool. Per-device next stream id always points to the
// next available CUstream. That means, CUstreams [0, id-1] have been
// assigned, and [id,] are still available. If there is no CUstream left, we
// will ask more CUstreams from CUDA RT. Each time a CUstream is assigned,
// the id will increase one.
// xxxxxs+++++++++
// ^
// id
// After assignment, the pool becomes the following and s is assigned.
// xxxxxs+++++++++
// ^
// id
CUstream getStream(const int DeviceId) {
const std::lock_guard<std::mutex> Lock(*StreamMtx[DeviceId]);
int &Id = NextStreamId[DeviceId];
// No CUstream left in the pool, we need to request from CUDA RT
if (Id == static_cast<int>(StreamPool[DeviceId].size())) {
// By default we double the stream pool every time
resizeStreamPool(DeviceId, Id * 2);
}
return StreamPool[DeviceId][Id++];
/// Get a resource from pool. `Next` always points to the next available
/// resource. That means, `[0, next-1]` have been assigned, and `[id,]` are
/// still available. If there is no resource left, we will ask for more. Each
/// time a resource is assigned, the id will increase one.
/// xxxxxs+++++++++
/// ^
/// Next
/// After assignment, the pool becomes the following and s is assigned.
/// xxxxxs+++++++++
/// ^
/// Next
int acquire(T &R) noexcept {
std::lock_guard<std::mutex> LG(Mutex);
if (Next == Resources.size() && !resize(Resources.size() * 2))
return OFFLOAD_FAIL;
R = Resources[Next++];
return OFFLOAD_SUCCESS;
}
// Return a CUstream back to pool. As mentioned above, per-device next
// stream is always points to the next available CUstream, so when we return
// a CUstream, we need to first decrease the id, and then copy the CUstream
// back.
// It is worth noting that, the order of streams return might be different
// from that they're assigned, that saying, at some point, there might be
// two identical CUstreams.
// xxax+a+++++
// ^
// id
// However, it doesn't matter, because they're always on the two sides of
// id. The left one will in the end be overwritten by another CUstream.
// Therefore, after several execution, the order of pool might be different
// from its initial state.
void returnStream(const int DeviceId, CUstream Stream) {
const std::lock_guard<std::mutex> Lock(*StreamMtx[DeviceId]);
int &Id = NextStreamId[DeviceId];
assert(Id > 0 && "Wrong stream ID");
StreamPool[DeviceId][--Id] = Stream;
}
bool initializeDeviceStreamPool(const int DeviceId) {
assert(StreamPool[DeviceId].empty() && "stream pool has been initialized");
resizeStreamPool(DeviceId, EnvNumInitialStreams);
// Check the size of stream pool
if (static_cast<int>(StreamPool[DeviceId].size()) != EnvNumInitialStreams)
return false;
// Check whether each stream is valid
for (CUstream &S : StreamPool[DeviceId])
if (!S)
return false;
return true;
/// Return the resource back to the pool. When we return a resource, we need
/// to first decrease `Next`, and then copy the resource back. It is worth
/// noting that, the order of resources return might be different from that
/// they're assigned, that saying, at some point, there might be two identical
/// resources.
/// xxax+a+++++
/// ^
/// Next
/// However, it doesn't matter, because they're always on the two sides of
/// `Next`. The left one will in the end be overwritten by another resource.
/// Therefore, after several execution, the order of pool might be different
/// from its initial state.
void release(T R) noexcept {
std::lock_guard<std::mutex> LG(Mutex);
Resources[--Next] = R;
}
};
@ -331,13 +319,18 @@ class DeviceRTLTy {
int64_t RequiresFlags;
// Amount of dynamic shared memory to use at launch.
uint64_t DynamicMemorySize;
// Number of initial streams for each device.
int NumInitialStreams = 32;
static constexpr const int HardTeamLimit = 1U << 16U; // 64k
static constexpr const int HardThreadLimit = 1024;
static constexpr const int DefaultNumTeams = 128;
static constexpr const int DefaultNumThreads = 128;
std::unique_ptr<StreamManagerTy> StreamManager;
using StreamPoolTy = ResourcePoolTy<CUstream>;
using StreamAllocatorTy = AllocatorTy<CUstream>;
std::vector<std::unique_ptr<StreamPoolTy>> StreamPool;
std::vector<DeviceDataTy> DeviceData;
std::vector<CUmodule> Modules;
@ -471,8 +464,13 @@ class DeviceRTLTy {
CUstream getStream(const int DeviceId, __tgt_async_info *AsyncInfo) const {
assert(AsyncInfo && "AsyncInfo is nullptr");
if (!AsyncInfo->Queue)
AsyncInfo->Queue = StreamManager->getStream(DeviceId);
if (!AsyncInfo->Queue) {
CUstream S;
if (StreamPool[DeviceId]->acquire(S) != OFFLOAD_SUCCESS)
return nullptr;
AsyncInfo->Queue = S;
}
return reinterpret_cast<CUstream>(AsyncInfo->Queue);
}
@ -509,6 +507,7 @@ public:
}
DeviceData.resize(NumberOfDevices);
StreamPool.resize(NumberOfDevices);
// Get environment variables regarding teams
if (const char *EnvStr = getenv("OMP_TEAM_LIMIT")) {
@ -532,9 +531,11 @@ public:
DP("Parsed LIBOMPTARGET_SHARED_MEMORY_SIZE = %" PRIu64 "\n",
DynamicMemorySize);
}
StreamManager =
std::make_unique<StreamManagerTy>(NumberOfDevices, DeviceData);
if (const char *EnvStr = getenv("LIBOMPTARGET_NUM_INITIAL_STREAMS")) {
// LIBOMPTARGET_NUM_INITIAL_STREAMS has been set
NumInitialStreams = std::stoi(EnvStr);
DP("Parsed LIBOMPTARGET_NUM_INITIAL_STREAMS=%d\n", NumInitialStreams);
}
for (int I = 0; I < NumberOfDevices; ++I)
DeviceAllocators.emplace_back(I, DeviceData);
@ -556,13 +557,14 @@ public:
for (auto &M : MemoryManagers)
M.release();
StreamManager = nullptr;
for (CUmodule &M : Modules)
// Close module
if (M)
checkResult(cuModuleUnload(M), "Error returned from cuModuleUnload\n");
for (auto &S : StreamPool)
S = nullptr;
for (DeviceDataTy &D : DeviceData) {
// Destroy context
if (D.Context) {
@ -627,8 +629,9 @@ public:
return OFFLOAD_FAIL;
// Initialize stream pool
if (!StreamManager->initializeDeviceStreamPool(DeviceId))
return OFFLOAD_FAIL;
if (!StreamPool[DeviceId])
StreamPool[DeviceId] = std::make_unique<StreamPoolTy>(
StreamAllocatorTy(DeviceData[DeviceId].Context), NumInitialStreams);
// Query attributes to determine number of threads/block and blocks/grid.
int MaxGridDimX;
@ -1195,8 +1198,7 @@ public:
// Once the stream is synchronized, return it to stream pool and reset
// AsyncInfo. This is to make sure the synchronization only works for its
// own tasks.
StreamManager->returnStream(DeviceId,
reinterpret_cast<CUstream>(AsyncInfo->Queue));
StreamPool[DeviceId]->release(reinterpret_cast<CUstream>(AsyncInfo->Queue));
AsyncInfo->Queue = nullptr;
if (Err != CUDA_SUCCESS) {