[OpenMP] libomp: fix dynamic loop dispatcher

Restructured dynamic loop dispatcher code.
Fixed use of dispatch buffers for nonmonotonic dynamic (static_steal) schedule:
- eliminated possibility of stealing iterations of the wrong loop when victim
  thread changed its buffer to work on another loop;
- fixed race when victim thread changed its buffer to work in nested parallel;
- eliminated "static" property of the schedule, that is now a single thread can
  execute whole loop.

Differential Revision: https://reviews.llvm.org/D103648
This commit is contained in:
AndreyChurbanov 2021-06-22 16:29:01 +03:00
parent 82c1fb5750
commit 5dd4d0d46f
9 changed files with 333 additions and 215 deletions

View File

@ -1675,14 +1675,12 @@ typedef struct KMP_ALIGN_CACHE dispatch_private_info32 {
kmp_int32 lb;
kmp_int32 st;
kmp_int32 tc;
kmp_int32 static_steal_counter; /* for static_steal only; maybe better to put
after ub */
kmp_lock_t *th_steal_lock; // lock used for chunk stealing
// KMP_ALIGN( 16 ) ensures ( if the KMP_ALIGN macro is turned on )
kmp_lock_t *steal_lock; // lock used for chunk stealing
// KMP_ALIGN(32) ensures (if the KMP_ALIGN macro is turned on)
// a) parm3 is properly aligned and
// b) all parm1-4 are in the same cache line.
// b) all parm1-4 are on the same cache line.
// Because of parm1-4 are used together, performance seems to be better
// if they are in the same line (not measured though).
// if they are on the same cache line (not measured though).
struct KMP_ALIGN(32) { // AC: changed 16 to 32 in order to simplify template
kmp_int32 parm1; // structures in kmp_dispatch.cpp. This should
@ -1694,9 +1692,6 @@ typedef struct KMP_ALIGN_CACHE dispatch_private_info32 {
kmp_uint32 ordered_lower;
kmp_uint32 ordered_upper;
#if KMP_OS_WINDOWS
// This var can be placed in the hole between 'tc' and 'parm1', instead of
// 'static_steal_counter'. It would be nice to measure execution times.
// Conditional if/endif can be removed at all.
kmp_int32 last_upper;
#endif /* KMP_OS_WINDOWS */
} dispatch_private_info32_t;
@ -1708,9 +1703,7 @@ typedef struct KMP_ALIGN_CACHE dispatch_private_info64 {
kmp_int64 lb; /* lower-bound */
kmp_int64 st; /* stride */
kmp_int64 tc; /* trip count (number of iterations) */
kmp_int64 static_steal_counter; /* for static_steal only; maybe better to put
after ub */
kmp_lock_t *th_steal_lock; // lock used for chunk stealing
kmp_lock_t *steal_lock; // lock used for chunk stealing
/* parm[1-4] are used in different ways by different scheduling algorithms */
// KMP_ALIGN( 32 ) ensures ( if the KMP_ALIGN macro is turned on )
@ -1729,9 +1722,6 @@ typedef struct KMP_ALIGN_CACHE dispatch_private_info64 {
kmp_uint64 ordered_lower;
kmp_uint64 ordered_upper;
#if KMP_OS_WINDOWS
// This var can be placed in the hole between 'tc' and 'parm1', instead of
// 'static_steal_counter'. It would be nice to measure execution times.
// Conditional if/endif can be removed at all.
kmp_int64 last_upper;
#endif /* KMP_OS_WINDOWS */
} dispatch_private_info64_t;
@ -1785,9 +1775,8 @@ typedef struct KMP_ALIGN_CACHE dispatch_private_info {
} u;
enum sched_type schedule; /* scheduling algorithm */
kmp_sched_flags_t flags; /* flags (e.g., ordered, nomerge, etc.) */
std::atomic<kmp_uint32> steal_flag; // static_steal only, state of a buffer
kmp_int32 ordered_bumped;
// To retain the structure size after making ordered_iteration scalar
kmp_int32 ordered_dummy[KMP_MAX_ORDERED - 3];
// Stack of buffers for nest of serial regions
struct dispatch_private_info *next;
kmp_int32 type_size; /* the size of types in private_info */
@ -1802,7 +1791,7 @@ typedef struct dispatch_shared_info32 {
/* chunk index under dynamic, number of idle threads under static-steal;
iteration index otherwise */
volatile kmp_uint32 iteration;
volatile kmp_uint32 num_done;
volatile kmp_int32 num_done;
volatile kmp_uint32 ordered_iteration;
// Dummy to retain the structure size after making ordered_iteration scalar
kmp_int32 ordered_dummy[KMP_MAX_ORDERED - 1];
@ -1812,7 +1801,7 @@ typedef struct dispatch_shared_info64 {
/* chunk index under dynamic, number of idle threads under static-steal;
iteration index otherwise */
volatile kmp_uint64 iteration;
volatile kmp_uint64 num_done;
volatile kmp_int64 num_done;
volatile kmp_uint64 ordered_iteration;
// Dummy to retain the structure size after making ordered_iteration scalar
kmp_int64 ordered_dummy[KMP_MAX_ORDERED - 3];
@ -1848,7 +1837,7 @@ typedef struct kmp_disp {
dispatch_private_info_t *th_dispatch_pr_current;
dispatch_private_info_t *th_disp_buffer;
kmp_int32 th_disp_index;
kmp_uint32 th_disp_index;
kmp_int32 th_doacross_buf_idx; // thread's doacross buffer index
volatile kmp_uint32 *th_doacross_flags; // pointer to shared array of flags
kmp_int64 *th_doacross_info; // info on loop bounds

View File

@ -90,6 +90,22 @@ static inline int __kmp_get_monotonicity(ident_t *loc, enum sched_type schedule,
return monotonicity;
}
#if KMP_STATIC_STEAL_ENABLED
enum { // values for steal_flag (possible states of private per-loop buffer)
UNUSED = 0,
CLAIMED = 1, // owner thread started initialization
READY = 2, // available for stealing
THIEF = 3 // finished by owner, or claimed by thief
// possible state changes:
// 0 -> 1 owner only, sync
// 0 -> 3 thief only, sync
// 1 -> 2 owner only, async
// 2 -> 3 owner only, async
// 3 -> 2 owner only, async
// 3 -> 0 last thread finishing the loop, async
};
#endif
// Initialize a dispatch_private_info_template<T> buffer for a particular
// type of schedule,chunk. The loop description is found in lb (lower bound),
// ub (upper bound), and st (stride). nproc is the number of threads relevant
@ -187,6 +203,8 @@ void __kmp_dispatch_init_algorithm(ident_t *loc, int gtid,
schedule = team->t.t_sched.r_sched_type;
monotonicity = __kmp_get_monotonicity(loc, schedule, use_hier);
schedule = SCHEDULE_WITHOUT_MODIFIERS(schedule);
if (pr->flags.ordered) // correct monotonicity for ordered loop if needed
monotonicity = SCHEDULE_MONOTONIC;
// Detail the schedule if needed (global controls are differentiated
// appropriately)
if (schedule == kmp_sch_guided_chunked) {
@ -346,7 +364,7 @@ void __kmp_dispatch_init_algorithm(ident_t *loc, int gtid,
}
switch (schedule) {
#if (KMP_STATIC_STEAL_ENABLED)
#if KMP_STATIC_STEAL_ENABLED
case kmp_sch_static_steal: {
T ntc, init;
@ -359,32 +377,37 @@ void __kmp_dispatch_init_algorithm(ident_t *loc, int gtid,
KMP_COUNT_BLOCK(OMP_LOOP_STATIC_STEAL);
T id = tid;
T small_chunk, extras;
kmp_uint32 old = UNUSED;
int claimed = pr->steal_flag.compare_exchange_strong(old, CLAIMED);
if (traits_t<T>::type_size > 4) {
// AC: TODO: check if 16-byte CAS available and use it to
// improve performance (probably wait for explicit request
// before spending time on this).
// For now use dynamically allocated per-private-buffer lock,
// free memory in __kmp_dispatch_next when status==0.
pr->u.p.steal_lock = (kmp_lock_t *)__kmp_allocate(sizeof(kmp_lock_t));
__kmp_init_lock(pr->u.p.steal_lock);
}
small_chunk = ntc / nproc;
extras = ntc % nproc;
init = id * small_chunk + (id < extras ? id : extras);
pr->u.p.count = init;
pr->u.p.ub = init + small_chunk + (id < extras ? 1 : 0);
pr->u.p.parm2 = lb;
// parm3 is the number of times to attempt stealing which is
// proportional to the number of chunks per thread up until
// the maximum value of nproc.
pr->u.p.parm3 = KMP_MIN(small_chunk + extras, nproc);
pr->u.p.parm4 = (id + 1) % nproc; // remember neighbour tid
pr->u.p.st = st;
if (traits_t<T>::type_size > 4) {
// AC: TODO: check if 16-byte CAS available and use it to
// improve performance (probably wait for explicit request
// before spending time on this).
// For now use dynamically allocated per-thread lock,
// free memory in __kmp_dispatch_next when status==0.
KMP_DEBUG_ASSERT(pr->u.p.th_steal_lock == NULL);
pr->u.p.th_steal_lock =
(kmp_lock_t *)__kmp_allocate(sizeof(kmp_lock_t));
__kmp_init_lock(pr->u.p.th_steal_lock);
if (claimed) { // are we succeeded in claiming own buffer?
pr->u.p.ub = init + small_chunk + (id < extras ? 1 : 0);
// Other threads will inspect steal_flag when searching for a victim.
// READY means other threads may steal from this thread from now on.
KMP_ATOMIC_ST_REL(&pr->steal_flag, READY);
} else {
// other thread has stolen whole our range
KMP_DEBUG_ASSERT(pr->steal_flag == THIEF);
pr->u.p.ub = init; // mark there is no iterations to work on
}
pr->u.p.parm2 = ntc; // save number of chunks
// parm3 is the number of times to attempt stealing which is
// nproc (just a heuristics, could be optimized later on).
pr->u.p.parm3 = nproc;
pr->u.p.parm4 = (id + 1) % nproc; // remember neighbour tid
break;
} else {
/* too few chunks: switching to kmp_sch_dynamic_chunked */
@ -881,6 +904,18 @@ __kmp_dispatch_init(ident_t *loc, int gtid, enum sched_type schedule, T lb,
&team->t.t_disp_buffer[my_buffer_index % __kmp_dispatch_num_buffers]);
KD_TRACE(10, ("__kmp_dispatch_init: T#%d my_buffer_index:%d\n", gtid,
my_buffer_index));
if (sh->buffer_index != my_buffer_index) { // too many loops in progress?
KD_TRACE(100, ("__kmp_dispatch_init: T#%d before wait: my_buffer_index:%d"
" sh->buffer_index:%d\n",
gtid, my_buffer_index, sh->buffer_index));
__kmp_wait<kmp_uint32>(&sh->buffer_index, my_buffer_index,
__kmp_eq<kmp_uint32> USE_ITT_BUILD_ARG(NULL));
// Note: KMP_WAIT() cannot be used there: buffer index and
// my_buffer_index are *always* 32-bit integers.
KD_TRACE(100, ("__kmp_dispatch_init: T#%d after wait: my_buffer_index:%d "
"sh->buffer_index:%d\n",
gtid, my_buffer_index, sh->buffer_index));
}
}
__kmp_dispatch_init_algorithm(loc, gtid, pr, schedule, lb, ub, st,
@ -897,24 +932,6 @@ __kmp_dispatch_init(ident_t *loc, int gtid, enum sched_type schedule, T lb,
th->th.th_dispatch->th_deo_fcn = __kmp_dispatch_deo<UT>;
th->th.th_dispatch->th_dxo_fcn = __kmp_dispatch_dxo<UT>;
}
}
if (active) {
/* The name of this buffer should be my_buffer_index when it's free to use
* it */
KD_TRACE(100, ("__kmp_dispatch_init: T#%d before wait: my_buffer_index:%d "
"sh->buffer_index:%d\n",
gtid, my_buffer_index, sh->buffer_index));
__kmp_wait<kmp_uint32>(&sh->buffer_index, my_buffer_index,
__kmp_eq<kmp_uint32> USE_ITT_BUILD_ARG(NULL));
// Note: KMP_WAIT() cannot be used there: buffer index and
// my_buffer_index are *always* 32-bit integers.
KMP_MB(); /* is this necessary? */
KD_TRACE(100, ("__kmp_dispatch_init: T#%d after wait: my_buffer_index:%d "
"sh->buffer_index:%d\n",
gtid, my_buffer_index, sh->buffer_index));
th->th.th_dispatch->th_dispatch_pr_current = (dispatch_private_info_t *)pr;
th->th.th_dispatch->th_dispatch_sh_current =
CCAST(dispatch_shared_info_t *, (volatile dispatch_shared_info_t *)sh);
@ -978,21 +995,6 @@ __kmp_dispatch_init(ident_t *loc, int gtid, enum sched_type schedule, T lb,
__kmp_str_free(&buff);
}
#endif
#if (KMP_STATIC_STEAL_ENABLED)
// It cannot be guaranteed that after execution of a loop with some other
// schedule kind all the parm3 variables will contain the same value. Even if
// all parm3 will be the same, it still exists a bad case like using 0 and 1
// rather than program life-time increment. So the dedicated variable is
// required. The 'static_steal_counter' is used.
if (pr->schedule == kmp_sch_static_steal) {
// Other threads will inspect this variable when searching for a victim.
// This is a flag showing that other threads may steal from this thread
// since then.
volatile T *p = &pr->u.p.static_steal_counter;
*p = *p + 1;
}
#endif // ( KMP_STATIC_STEAL_ENABLED )
#if OMPT_SUPPORT && OMPT_OPTIONAL
if (ompt_enabled.ompt_callback_work) {
ompt_team_info_t *team_info = __ompt_get_teaminfo(0, NULL);
@ -1082,7 +1084,6 @@ static void __kmp_dispatch_finish_chunk(int gtid, ident_t *loc) {
KD_TRACE(100, ("__kmp_dispatch_finish_chunk: T#%d called\n", gtid));
if (!th->th.th_team->t.t_serialized) {
// int cid;
dispatch_private_info_template<UT> *pr =
reinterpret_cast<dispatch_private_info_template<UT> *>(
th->th.th_dispatch->th_dispatch_pr_current);
@ -1094,7 +1095,6 @@ static void __kmp_dispatch_finish_chunk(int gtid, ident_t *loc) {
KMP_DEBUG_ASSERT(th->th.th_dispatch ==
&th->th.th_team->t.t_dispatch[th->th.th_info.ds.ds_tid]);
// for (cid = 0; cid < KMP_MAX_ORDERED; ++cid) {
UT lower = pr->u.p.ordered_lower;
UT upper = pr->u.p.ordered_upper;
UT inc = upper - lower + 1;
@ -1200,10 +1200,10 @@ int __kmp_dispatch_next_algorithm(int gtid,
}
switch (pr->schedule) {
#if (KMP_STATIC_STEAL_ENABLED)
#if KMP_STATIC_STEAL_ENABLED
case kmp_sch_static_steal: {
T chunk = pr->u.p.parm1;
UT nchunks = pr->u.p.parm2;
KD_TRACE(100,
("__kmp_dispatch_next_algorithm: T#%d kmp_sch_static_steal case\n",
gtid));
@ -1211,11 +1211,12 @@ int __kmp_dispatch_next_algorithm(int gtid,
trip = pr->u.p.tc - 1;
if (traits_t<T>::type_size > 4) {
// use lock for 8-byte and CAS for 4-byte induction
// variable. TODO (optional): check and use 16-byte CAS
kmp_lock_t *lck = pr->u.p.th_steal_lock;
// use lock for 8-byte induction variable.
// TODO (optional): check presence and use 16-byte CAS
kmp_lock_t *lck = pr->u.p.steal_lock;
KMP_DEBUG_ASSERT(lck != NULL);
if (pr->u.p.count < (UT)pr->u.p.ub) {
KMP_DEBUG_ASSERT(pr->steal_flag == READY);
__kmp_acquire_lock(lck, gtid);
// try to get own chunk of iterations
init = (pr->u.p.count)++;
@ -1225,76 +1226,122 @@ int __kmp_dispatch_next_algorithm(int gtid,
status = 0; // no own chunks
}
if (!status) { // try to steal
kmp_info_t **other_threads = team->t.t_threads;
kmp_lock_t *lckv; // victim buffer's lock
T while_limit = pr->u.p.parm3;
T while_index = 0;
T id = pr->u.p.static_steal_counter; // loop id
int idx = (th->th.th_dispatch->th_disp_index - 1) %
__kmp_dispatch_num_buffers; // current loop index
// note: victim thread can potentially execute another loop
// TODO: algorithm of searching for a victim
// should be cleaned up and measured
KMP_ATOMIC_ST_REL(&pr->steal_flag, THIEF); // mark self buffer inactive
while ((!status) && (while_limit != ++while_index)) {
dispatch_private_info_template<T> *victim;
dispatch_private_info_template<T> *v;
T remaining;
T victimIdx = pr->u.p.parm4;
T oldVictimIdx = victimIdx ? victimIdx - 1 : nproc - 1;
victim = reinterpret_cast<dispatch_private_info_template<T> *>(
&other_threads[victimIdx]->th.th_dispatch->th_disp_buffer[idx]);
KMP_DEBUG_ASSERT(victim);
while ((victim == pr || id != victim->u.p.static_steal_counter) &&
oldVictimIdx != victimIdx) {
victimIdx = (victimIdx + 1) % nproc;
victim = reinterpret_cast<dispatch_private_info_template<T> *>(
&other_threads[victimIdx]->th.th_dispatch->th_disp_buffer[idx]);
KMP_DEBUG_ASSERT(victim);
T victimId = pr->u.p.parm4;
T oldVictimId = victimId ? victimId - 1 : nproc - 1;
v = reinterpret_cast<dispatch_private_info_template<T> *>(
&team->t.t_dispatch[victimId].th_disp_buffer[idx]);
KMP_DEBUG_ASSERT(v);
while ((v == pr || KMP_ATOMIC_LD_RLX(&v->steal_flag) == THIEF) &&
oldVictimId != victimId) {
victimId = (victimId + 1) % nproc;
v = reinterpret_cast<dispatch_private_info_template<T> *>(
&team->t.t_dispatch[victimId].th_disp_buffer[idx]);
KMP_DEBUG_ASSERT(v);
}
if (victim == pr || id != victim->u.p.static_steal_counter) {
if (v == pr || KMP_ATOMIC_LD_RLX(&v->steal_flag) == THIEF) {
continue; // try once more (nproc attempts in total)
// no victim is ready yet to participate in stealing
// because no victim passed kmp_init_dispatch yet
}
if (victim->u.p.count + 2 > (UT)victim->u.p.ub) {
pr->u.p.parm4 = (victimIdx + 1) % nproc; // shift start tid
continue; // not enough chunks to steal, goto next victim
if (KMP_ATOMIC_LD_RLX(&v->steal_flag) == UNUSED) {
kmp_uint32 old = UNUSED;
// try to steal whole range from inactive victim
status = v->steal_flag.compare_exchange_strong(old, THIEF);
if (status) {
// initialize self buffer with victim's whole range of chunks
T id = victimId;
T small_chunk, extras;
small_chunk = nchunks / nproc; // chunks per thread
extras = nchunks % nproc;
init = id * small_chunk + (id < extras ? id : extras);
__kmp_acquire_lock(lck, gtid);
pr->u.p.count = init + 1; // exclude one we execute immediately
pr->u.p.ub = init + small_chunk + (id < extras ? 1 : 0);
__kmp_release_lock(lck, gtid);
pr->u.p.parm4 = (id + 1) % nproc; // remember neighbour tid
// no need to reinitialize other thread invariants: lb, st, etc.
#ifdef KMP_DEBUG
{
char *buff;
// create format specifiers before the debug output
buff = __kmp_str_format(
"__kmp_dispatch_next: T#%%d stolen chunks from T#%%d, "
"count:%%%s ub:%%%s\n",
traits_t<UT>::spec, traits_t<T>::spec);
KD_TRACE(10, (buff, gtid, id, pr->u.p.count, pr->u.p.ub));
__kmp_str_free(&buff);
}
#endif
// activate non-empty buffer and let others steal from us
if (pr->u.p.count < (UT)pr->u.p.ub)
KMP_ATOMIC_ST_REL(&pr->steal_flag, READY);
break;
}
}
if (KMP_ATOMIC_LD_RLX(&v->steal_flag) != READY ||
v->u.p.count >= (UT)v->u.p.ub) {
pr->u.p.parm4 = (victimId + 1) % nproc; // shift start victim tid
continue; // no chunks to steal, try next victim
}
lckv = v->u.p.steal_lock;
KMP_ASSERT(lckv != NULL);
__kmp_acquire_lock(lckv, gtid);
limit = v->u.p.ub; // keep initial ub
if (v->u.p.count >= limit) {
__kmp_release_lock(lckv, gtid);
pr->u.p.parm4 = (victimId + 1) % nproc; // shift start victim tid
continue; // no chunks to steal, try next victim
}
lck = victim->u.p.th_steal_lock;
KMP_ASSERT(lck != NULL);
__kmp_acquire_lock(lck, gtid);
limit = victim->u.p.ub; // keep initial ub
if (victim->u.p.count >= limit ||
(remaining = limit - victim->u.p.count) < 2) {
__kmp_release_lock(lck, gtid);
pr->u.p.parm4 = (victimIdx + 1) % nproc; // next victim
continue; // not enough chunks to steal
}
// stealing succeeded, reduce victim's ub by 1/4 of undone chunks or
// by 1
if (remaining > 3) {
// stealing succeded, reduce victim's ub by 1/4 of undone chunks
// TODO: is this heuristics good enough??
remaining = limit - v->u.p.count;
if (remaining > 7) {
// steal 1/4 of remaining
KMP_COUNT_DEVELOPER_VALUE(FOR_static_steal_stolen, remaining >> 2);
init = (victim->u.p.ub -= (remaining >> 2));
init = (v->u.p.ub -= (remaining >> 2));
} else {
// steal 1 chunk of 2 or 3 remaining
// steal 1 chunk of 1..7 remaining
KMP_COUNT_DEVELOPER_VALUE(FOR_static_steal_stolen, 1);
init = (victim->u.p.ub -= 1);
init = (v->u.p.ub -= 1);
}
__kmp_release_lock(lck, gtid);
__kmp_release_lock(lckv, gtid);
#ifdef KMP_DEBUG
{
char *buff;
// create format specifiers before the debug output
buff = __kmp_str_format(
"__kmp_dispatch_next: T#%%d stolen chunks from T#%%d, "
"count:%%%s ub:%%%s\n",
traits_t<UT>::spec, traits_t<UT>::spec);
KD_TRACE(10, (buff, gtid, victimId, init, limit));
__kmp_str_free(&buff);
}
#endif
KMP_DEBUG_ASSERT(init + 1 <= limit);
pr->u.p.parm4 = victimIdx; // remember victim to steal from
pr->u.p.parm4 = victimId; // remember victim to steal from
status = 1;
while_index = 0;
// now update own count and ub with stolen range but init chunk
__kmp_acquire_lock(pr->u.p.th_steal_lock, gtid);
// now update own count and ub with stolen range excluding init chunk
__kmp_acquire_lock(lck, gtid);
pr->u.p.count = init + 1;
pr->u.p.ub = limit;
__kmp_release_lock(pr->u.p.th_steal_lock, gtid);
__kmp_release_lock(lck, gtid);
// activate non-empty buffer and let others steal from us
if (init + 1 < limit)
KMP_ATOMIC_ST_REL(&pr->steal_flag, READY);
} // while (search for victim)
} // if (try to find victim and steal)
} else {
// 4-byte induction variable, use 8-byte CAS for pair (count, ub)
// as all operations on pair (count, ub) must be done atomically
typedef union {
struct {
UT count;
@ -1302,86 +1349,129 @@ int __kmp_dispatch_next_algorithm(int gtid,
} p;
kmp_int64 b;
} union_i4;
// All operations on 'count' or 'ub' must be combined atomically
// together.
{
union_i4 vold, vnew;
union_i4 vold, vnew;
if (pr->u.p.count < (UT)pr->u.p.ub) {
KMP_DEBUG_ASSERT(pr->steal_flag == READY);
vold.b = *(volatile kmp_int64 *)(&pr->u.p.count);
vnew = vold;
vnew.p.count++;
while (!KMP_COMPARE_AND_STORE_ACQ64(
vnew.b = vold.b;
vnew.p.count++; // get chunk from head of self range
while (!KMP_COMPARE_AND_STORE_REL64(
(volatile kmp_int64 *)&pr->u.p.count,
*VOLATILE_CAST(kmp_int64 *) & vold.b,
*VOLATILE_CAST(kmp_int64 *) & vnew.b)) {
KMP_CPU_PAUSE();
vold.b = *(volatile kmp_int64 *)(&pr->u.p.count);
vnew = vold;
vnew.b = vold.b;
vnew.p.count++;
}
vnew = vold;
init = vnew.p.count;
status = (init < (UT)vnew.p.ub);
init = vold.p.count;
status = (init < (UT)vold.p.ub);
} else {
status = 0; // no own chunks
}
if (!status) {
kmp_info_t **other_threads = team->t.t_threads;
if (!status) { // try to steal
T while_limit = pr->u.p.parm3;
T while_index = 0;
T id = pr->u.p.static_steal_counter; // loop id
int idx = (th->th.th_dispatch->th_disp_index - 1) %
__kmp_dispatch_num_buffers; // current loop index
// note: victim thread can potentially execute another loop
// TODO: algorithm of searching for a victim
// should be cleaned up and measured
KMP_ATOMIC_ST_REL(&pr->steal_flag, THIEF); // mark self buffer inactive
while ((!status) && (while_limit != ++while_index)) {
dispatch_private_info_template<T> *victim;
union_i4 vold, vnew;
dispatch_private_info_template<T> *v;
T remaining;
T victimIdx = pr->u.p.parm4;
T oldVictimIdx = victimIdx ? victimIdx - 1 : nproc - 1;
victim = reinterpret_cast<dispatch_private_info_template<T> *>(
&other_threads[victimIdx]->th.th_dispatch->th_disp_buffer[idx]);
KMP_DEBUG_ASSERT(victim);
while ((victim == pr || id != victim->u.p.static_steal_counter) &&
oldVictimIdx != victimIdx) {
victimIdx = (victimIdx + 1) % nproc;
victim = reinterpret_cast<dispatch_private_info_template<T> *>(
&other_threads[victimIdx]->th.th_dispatch->th_disp_buffer[idx]);
KMP_DEBUG_ASSERT(victim);
T victimId = pr->u.p.parm4;
T oldVictimId = victimId ? victimId - 1 : nproc - 1;
v = reinterpret_cast<dispatch_private_info_template<T> *>(
&team->t.t_dispatch[victimId].th_disp_buffer[idx]);
KMP_DEBUG_ASSERT(v);
while ((v == pr || KMP_ATOMIC_LD_RLX(&v->steal_flag) == THIEF) &&
oldVictimId != victimId) {
victimId = (victimId + 1) % nproc;
v = reinterpret_cast<dispatch_private_info_template<T> *>(
&team->t.t_dispatch[victimId].th_disp_buffer[idx]);
KMP_DEBUG_ASSERT(v);
}
if (victim == pr || id != victim->u.p.static_steal_counter) {
if (v == pr || KMP_ATOMIC_LD_RLX(&v->steal_flag) == THIEF) {
continue; // try once more (nproc attempts in total)
// no victim is ready yet to participate in stealing
// because no victim passed kmp_init_dispatch yet
}
pr->u.p.parm4 = victimIdx; // new victim found
while (1) { // CAS loop if victim has enough chunks to steal
vold.b = *(volatile kmp_int64 *)(&victim->u.p.count);
vnew = vold;
KMP_DEBUG_ASSERT((vnew.p.ub - 1) * (UT)chunk <= trip);
if (vnew.p.count >= (UT)vnew.p.ub ||
(remaining = vnew.p.ub - vnew.p.count) < 2) {
pr->u.p.parm4 = (victimIdx + 1) % nproc; // shift start victim id
break; // not enough chunks to steal, goto next victim
if (KMP_ATOMIC_LD_RLX(&v->steal_flag) == UNUSED) {
kmp_uint32 old = UNUSED;
// try to steal whole range from inactive victim
status = v->steal_flag.compare_exchange_strong(old, THIEF);
if (status) {
// initialize self buffer with victim's whole range of chunks
T id = victimId;
T small_chunk, extras;
small_chunk = nchunks / nproc; // chunks per thread
extras = nchunks % nproc;
init = id * small_chunk + (id < extras ? id : extras);
vnew.p.count = init + 1;
vnew.p.ub = init + small_chunk + (id < extras ? 1 : 0);
// write pair (count, ub) at once atomically
#if KMP_ARCH_X86
KMP_XCHG_FIXED64((volatile kmp_int64 *)(&pr->u.p.count), vnew.b);
#else
*(volatile kmp_int64 *)(&pr->u.p.count) = vnew.b;
#endif
pr->u.p.parm4 = (id + 1) % nproc; // remember neighbour tid
// no need to initialize other thread invariants: lb, st, etc.
#ifdef KMP_DEBUG
{
char *buff;
// create format specifiers before the debug output
buff = __kmp_str_format(
"__kmp_dispatch_next: T#%%d stolen chunks from T#%%d, "
"count:%%%s ub:%%%s\n",
traits_t<UT>::spec, traits_t<T>::spec);
KD_TRACE(10, (buff, gtid, id, pr->u.p.count, pr->u.p.ub));
__kmp_str_free(&buff);
}
#endif
// activate non-empty buffer and let others steal from us
if (pr->u.p.count < (UT)pr->u.p.ub)
KMP_ATOMIC_ST_REL(&pr->steal_flag, READY);
break;
}
if (remaining > 3) {
// try to steal 1/4 of remaining
vnew.p.ub -= remaining >> 2;
}
while (1) { // CAS loop with check if victim still has enough chunks
// many threads may be stealing concurrently from same victim
vold.b = *(volatile kmp_int64 *)(&v->u.p.count);
if (KMP_ATOMIC_LD_ACQ(&v->steal_flag) != READY ||
vold.p.count >= (UT)vold.p.ub) {
pr->u.p.parm4 = (victimId + 1) % nproc; // shift start victim id
break; // no chunks to steal, try next victim
}
vnew.b = vold.b;
remaining = vold.p.ub - vold.p.count;
// try to steal 1/4 of remaining
// TODO: is this heuristics good enough??
if (remaining > 7) {
vnew.p.ub -= remaining >> 2; // steal from tail of victim's range
} else {
vnew.p.ub -= 1; // steal 1 chunk of 2 or 3 remaining
vnew.p.ub -= 1; // steal 1 chunk of 1..7 remaining
}
KMP_DEBUG_ASSERT((vnew.p.ub - 1) * (UT)chunk <= trip);
// TODO: Should this be acquire or release?
if (KMP_COMPARE_AND_STORE_ACQ64(
(volatile kmp_int64 *)&victim->u.p.count,
if (KMP_COMPARE_AND_STORE_REL64(
(volatile kmp_int64 *)&v->u.p.count,
*VOLATILE_CAST(kmp_int64 *) & vold.b,
*VOLATILE_CAST(kmp_int64 *) & vnew.b)) {
// stealing succeeded
// stealing succedded
#ifdef KMP_DEBUG
{
char *buff;
// create format specifiers before the debug output
buff = __kmp_str_format(
"__kmp_dispatch_next: T#%%d stolen chunks from T#%%d, "
"count:%%%s ub:%%%s\n",
traits_t<T>::spec, traits_t<T>::spec);
KD_TRACE(10, (buff, gtid, victimId, vnew.p.ub, vold.p.ub));
__kmp_str_free(&buff);
}
#endif
KMP_COUNT_DEVELOPER_VALUE(FOR_static_steal_stolen,
vold.p.ub - vnew.p.ub);
status = 1;
while_index = 0;
pr->u.p.parm4 = victimId; // keep victim id
// now update own count and ub
init = vnew.p.ub;
vold.p.count = init + 1;
@ -1390,6 +1480,9 @@ int __kmp_dispatch_next_algorithm(int gtid,
#else
*(volatile kmp_int64 *)(&pr->u.p.count) = vold.b;
#endif
// activate non-empty buffer and let others steal from us
if (vold.p.count < (UT)vold.p.ub)
KMP_ATOMIC_ST_REL(&pr->steal_flag, READY);
break;
} // if (check CAS result)
KMP_CPU_PAUSE(); // CAS failed, repeatedly attempt
@ -1403,13 +1496,16 @@ int __kmp_dispatch_next_algorithm(int gtid,
if (p_st != NULL)
*p_st = 0;
} else {
start = pr->u.p.parm2;
start = pr->u.p.lb;
init *= chunk;
limit = chunk + init - 1;
incr = pr->u.p.st;
KMP_COUNT_DEVELOPER_VALUE(FOR_static_steal_chunks, 1);
KMP_DEBUG_ASSERT(init <= trip);
// keep track of done chunks for possible early exit from stealing
// TODO: count executed chunks locally with rare update of shared location
// test_then_inc<ST>((volatile ST *)&sh->u.s.iteration);
if ((last = (limit >= trip)) != 0)
limit = trip;
if (p_st != NULL)
@ -1422,15 +1518,10 @@ int __kmp_dispatch_next_algorithm(int gtid,
*p_lb = start + init * incr;
*p_ub = start + limit * incr;
}
if (pr->flags.ordered) {
pr->u.p.ordered_lower = init;
pr->u.p.ordered_upper = limit;
} // if
} // if
break;
} // case
#endif // ( KMP_STATIC_STEAL_ENABLED )
#endif // KMP_STATIC_STEAL_ENABLED
case kmp_sch_static_balanced: {
KD_TRACE(
10,
@ -2075,16 +2166,15 @@ static int __kmp_dispatch_next(ident_t *loc, int gtid, kmp_int32 *p_last,
th->th.th_info.ds.ds_tid);
// status == 0: no more iterations to execute
if (status == 0) {
UT num_done;
num_done = test_then_inc<ST>((volatile ST *)&sh->u.s.num_done);
ST num_done;
num_done = test_then_inc<ST>(&sh->u.s.num_done);
#ifdef KMP_DEBUG
{
char *buff;
// create format specifiers before the debug output
buff = __kmp_str_format(
"__kmp_dispatch_next: T#%%d increment num_done:%%%s\n",
traits_t<UT>::spec);
traits_t<ST>::spec);
KD_TRACE(10, (buff, gtid, sh->u.s.num_done));
__kmp_str_free(&buff);
}
@ -2093,28 +2183,31 @@ static int __kmp_dispatch_next(ident_t *loc, int gtid, kmp_int32 *p_last,
#if KMP_USE_HIER_SCHED
pr->flags.use_hier = FALSE;
#endif
if ((ST)num_done == th->th.th_team_nproc - 1) {
#if (KMP_STATIC_STEAL_ENABLED)
if (pr->schedule == kmp_sch_static_steal &&
traits_t<T>::type_size > 4) {
if (num_done == th->th.th_team_nproc - 1) {
#if KMP_STATIC_STEAL_ENABLED
if (pr->schedule == kmp_sch_static_steal) {
int i;
int idx = (th->th.th_dispatch->th_disp_index - 1) %
__kmp_dispatch_num_buffers; // current loop index
kmp_info_t **other_threads = team->t.t_threads;
// loop complete, safe to destroy locks used for stealing
for (i = 0; i < th->th.th_team_nproc; ++i) {
dispatch_private_info_template<T> *buf =
reinterpret_cast<dispatch_private_info_template<T> *>(
&other_threads[i]->th.th_dispatch->th_disp_buffer[idx]);
kmp_lock_t *lck = buf->u.p.th_steal_lock;
KMP_ASSERT(lck != NULL);
__kmp_destroy_lock(lck);
__kmp_free(lck);
buf->u.p.th_steal_lock = NULL;
&team->t.t_dispatch[i].th_disp_buffer[idx]);
KMP_ASSERT(buf->steal_flag == THIEF); // buffer must be inactive
KMP_ATOMIC_ST_RLX(&buf->steal_flag, UNUSED);
if (traits_t<T>::type_size > 4) {
// destroy locks used for stealing
kmp_lock_t *lck = buf->u.p.steal_lock;
KMP_ASSERT(lck != NULL);
__kmp_destroy_lock(lck);
__kmp_free(lck);
buf->u.p.steal_lock = NULL;
}
}
}
#endif
/* NOTE: release this buffer to be reused */
/* NOTE: release shared buffer to be reused */
KMP_MB(); /* Flush all pending memory write invalidates. */
@ -2126,8 +2219,6 @@ static int __kmp_dispatch_next(ident_t *loc, int gtid, kmp_int32 *p_last,
sh->u.s.ordered_iteration = 0;
}
KMP_MB(); /* Flush all pending memory write invalidates. */
sh->buffer_index += __kmp_dispatch_num_buffers;
KD_TRACE(100, ("__kmp_dispatch_next: T#%d change buffer_index:%d\n",
gtid, sh->buffer_index));

View File

@ -74,8 +74,7 @@ template <typename T> struct dispatch_private_infoXX_template {
T lb;
ST st; // signed
UT tc; // unsigned
T static_steal_counter; // for static_steal only; maybe better to put after ub
kmp_lock_t *th_steal_lock; // lock used for chunk stealing
kmp_lock_t *steal_lock; // lock used for chunk stealing
/* parm[1-4] are used in different ways by different scheduling algorithms */
// KMP_ALIGN( 32 ) ensures ( if the KMP_ALIGN macro is turned on )
@ -134,9 +133,8 @@ template <typename T> struct KMP_ALIGN_CACHE dispatch_private_info_template {
} u;
enum sched_type schedule; /* scheduling algorithm */
kmp_sched_flags_t flags; /* flags (e.g., ordered, nomerge, etc.) */
std::atomic<kmp_uint32> steal_flag; // static_steal only, state of a buffer
kmp_uint32 ordered_bumped;
// to retain the structure size after making order
kmp_int32 ordered_dummy[KMP_MAX_ORDERED - 3];
dispatch_private_info *next; /* stack of buffers for nest of serial regions */
kmp_uint32 type_size;
#if KMP_USE_HIER_SCHED
@ -153,10 +151,11 @@ template <typename T> struct KMP_ALIGN_CACHE dispatch_private_info_template {
// dispatch_shared_info{32,64}_t types
template <typename T> struct dispatch_shared_infoXX_template {
typedef typename traits_t<T>::unsigned_t UT;
typedef typename traits_t<T>::signed_t ST;
/* chunk index under dynamic, number of idle threads under static-steal;
iteration index otherwise */
volatile UT iteration;
volatile UT num_done;
volatile ST num_done;
volatile UT ordered_iteration;
// to retain the structure size making ordered_iteration scalar
UT ordered_dummy[KMP_MAX_ORDERED - 3];

View File

@ -924,7 +924,7 @@ void __kmp_dispatch_init_hierarchy(ident_t *loc, int n,
T lb, T ub,
typename traits_t<T>::signed_t st) {
int tid, gtid, num_hw_threads, num_threads_per_layer1, active;
int my_buffer_index;
unsigned int my_buffer_index;
kmp_info_t *th;
kmp_team_t *team;
dispatch_private_info_template<T> *pr;

View File

@ -4022,8 +4022,11 @@ static const char *__kmp_parse_single_omp_schedule(const char *name,
else if (!__kmp_strcasecmp_with_sentinel("static", ptr, *delim))
sched = kmp_sch_static;
#if KMP_STATIC_STEAL_ENABLED
else if (!__kmp_strcasecmp_with_sentinel("static_steal", ptr, *delim))
sched = kmp_sch_static_steal;
else if (!__kmp_strcasecmp_with_sentinel("static_steal", ptr, *delim)) {
// replace static_steal with dynamic to better cope with ordered loops
sched = kmp_sch_dynamic_chunked;
sched_modifier = sched_type::kmp_sch_modifier_nonmonotonic;
}
#endif
else {
// If there is no proper schedule kind, then this schedule is invalid

View File

@ -9,7 +9,7 @@
// RUN: env KMP_DISP_NUM_BUFFERS=3 %libomp-run
// RUN: env KMP_DISP_NUM_BUFFERS=4 %libomp-run
// RUN: env KMP_DISP_NUM_BUFFERS=7 %libomp-run
// UNSUPPORTED: clang-11, clang-12, clang-13
// UNSUPPORTED: clang-11, clang-12
#include <stdio.h>
#include <omp.h>
#include <stdlib.h>
@ -78,5 +78,9 @@ int main(int argc, char** argv)
num_failed++;
}
}
if (num_failed == 0)
printf("passed\n");
else
printf("failed %d\n", num_failed);
return num_failed;
}

View File

@ -3,7 +3,7 @@
// RUN: %libomp-run 1 && %libomp-run 2 && %libomp-run 5
// RUN: %libomp-compile -DMY_SCHEDULE=guided && %libomp-run 7
// RUN: %libomp-run 1 && %libomp-run 2 && %libomp-run 5
// UNSUPPORTED: clang-11, clang-12, clang-13
// UNSUPPORTED: clang-11, clang-12
#include <stdio.h>
#include <omp.h>
#include <stdlib.h>
@ -88,5 +88,9 @@ int main(int argc, char** argv)
num_failed++;
}
}
if (num_failed == 0)
printf("passed\n");
else
printf("failed %d\n", num_failed);
return num_failed;
}

View File

@ -8,8 +8,8 @@
// RUN: env OMP_SCHEDULE=auto %libomp-run 4 1
// RUN: env OMP_SCHEDULE=trapezoidal %libomp-run 101 1
// RUN: env OMP_SCHEDULE=trapezoidal,13 %libomp-run 101 13
// RUN: env OMP_SCHEDULE=static_steal %libomp-run 102 1
// RUN: env OMP_SCHEDULE=static_steal,14 %libomp-run 102 14
// RUN: env OMP_SCHEDULE=static_steal %libomp-run 2 1
// RUN: env OMP_SCHEDULE=static_steal,14 %libomp-run 2 14
#include <stdio.h>
#include <stdlib.h>

View File

@ -0,0 +1,28 @@
// RUN: %libomp-compile-and-run
//
#include <stdlib.h>
#include <stdio.h>
#include <math.h>
#include <omp.h>
#define TYPE long
#define MAX_ITER (TYPE)((TYPE)1000000)
#define EVERY (TYPE)((TYPE)100000)
int main(int argc, char* argv[]) {
TYPE x = MAX_ITER;
omp_set_max_active_levels(2);
omp_set_num_threads(2);
#pragma omp parallel for schedule(nonmonotonic:dynamic,1)
for (TYPE i = 0; i < x; i++) {
int tid = omp_get_thread_num();
omp_set_num_threads(1);
#pragma omp parallel proc_bind(spread)
{
if (i % EVERY == (TYPE)0)
printf("Outer thread %d at iter %ld\n", tid, i);
}
}
printf("passed\n");
return 0;
}