Implementation of OpenMP 5.0 mutexinoutset task dependency type.

Differential Revision: https://reviews.llvm.org/D53380

llvm-svn: 346307
Andrey Churbanov 2018-11-07 12:19:57 +00:00
parent 8cc5cf2ee9
commit c334434550
6 changed files with 683 additions and 200 deletions
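For context, OpenMP 5.0's mutexinoutset dependence type makes tasks that name the same list item mutually exclusive with one another, while still ordering them against earlier out/inout tasks and later in tasks. A hedged user-level sketch of the semantics this commit implements (it needs a compiler that already accepts the 5.0 depend type; the example is illustrative, not part of the commit):

// Illustrative only -- not part of this commit.
#include <stdio.h>
#include <omp.h>

int main(void) {
  int x = 0;
  #pragma omp parallel
  #pragma omp single
  {
    #pragma omp task depend(out: x)            // runs first
    { x = 1; }
    #pragma omp task depend(mutexinoutset: x)  // these two may run in any
    { x += 10; }                               // order, but never concurrently
    #pragma omp task depend(mutexinoutset: x)
    { x += 100; }
    #pragma omp task depend(in: x)             // waits for both mtx tasks
    { printf("x = %d\n", x); }                 // prints 111
  }
  return 0;
}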


@@ -2160,30 +2160,35 @@ typedef union kmp_depnode kmp_depnode_t;
typedef struct kmp_depnode_list kmp_depnode_list_t;
typedef struct kmp_dephash_entry kmp_dephash_entry_t;
// Compiler sends us this info:
typedef struct kmp_depend_info {
kmp_intptr_t base_addr;
size_t len;
struct {
bool in : 1;
bool out : 1;
bool mtx : 1;
} flags;
} kmp_depend_info_t;
// Internal structures to work with task dependencies:
struct kmp_depnode_list {
kmp_depnode_t *node;
kmp_depnode_list_t *next;
};
// Max number of mutexinoutset dependencies per node
#define MAX_MTX_DEPS 4
typedef struct kmp_base_depnode {
kmp_depnode_list_t *successors;
kmp_task_t *task;
kmp_lock_t lock;
kmp_depnode_list_t *successors; /* used under lock */
kmp_task_t *task; /* non-NULL if depnode is active, used under lock */
kmp_lock_t *mtx_locks[MAX_MTX_DEPS]; /* lock mutexinoutset dependent tasks */
kmp_int32 mtx_num_locks; /* number of locks in mtx_locks array */
kmp_lock_t lock; /* guards shared fields: task, successors */
#if KMP_SUPPORT_GRAPH_OUTPUT
kmp_uint32 id;
#endif
std::atomic<kmp_int32> npredecessors;
std::atomic<kmp_int32> nrefs;
} kmp_base_depnode_t;
@@ -2198,6 +2203,9 @@ struct kmp_dephash_entry {
kmp_intptr_t addr;
kmp_depnode_t *last_out;
kmp_depnode_list_t *last_ins;
kmp_depnode_list_t *last_mtxs;
kmp_int32 last_flag;
kmp_lock_t *mtx_lock; /* is referenced by depnodes w/mutexinoutset dep */
kmp_dephash_entry_t *next_in_bucket;
};
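The new mtx bit extends the dependence descriptor the compiler hands to __kmpc_omp_task_with_deps(). A hedged reading of the encoding, following the order of the bitfield members above and the raw values the new tests pass:

// Illustrative only -- the enum names are hypothetical; the bit values
// follow the bitfield order above and the tests' "flags = 4; // mx".
enum {
  DEP_FLAG_IN  = 0x1, // flags.in
  DEP_FLAG_OUT = 0x2, // flags.out
  DEP_FLAG_MTX = 0x4, // flags.mtx (mutexinoutset)
};
// depend(in: x)            -> 0x1
// depend(inout: x)         -> 0x3
// depend(mutexinoutset: x) -> 0x4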


@@ -37,12 +37,14 @@ static std::atomic<kmp_int32> kmp_node_id_seed = ATOMIC_VAR_INIT(0);
#endif
static void __kmp_init_node(kmp_depnode_t *node) {
node->dn.task = NULL; // set to null initially, it will point to the right
// task once dependences have been processed
node->dn.successors = NULL;
node->dn.task = NULL; // will point to the right task
// once dependences have been processed
for (int i = 0; i < MAX_MTX_DEPS; ++i)
node->dn.mtx_locks[i] = NULL;
node->dn.mtx_num_locks = 0;
__kmp_init_lock(&node->dn.lock);
KMP_ATOMIC_ST_RLX(&node->dn.nrefs,
1); // init creates the first reference to the node
KMP_ATOMIC_ST_RLX(&node->dn.nrefs, 1); // init creates the first reference
#ifdef KMP_SUPPORT_GRAPH_OUTPUT
node->dn.id = KMP_ATOMIC_INC(&kmp_node_id_seed);
#endif
@@ -94,6 +96,9 @@ static kmp_dephash_t *__kmp_dephash_create(kmp_info_t *thread,
return h;
}
#define ENTRY_LAST_INS 0
#define ENTRY_LAST_MTXS 1
static kmp_dephash_entry *
__kmp_dephash_find(kmp_info_t *thread, kmp_dephash_t *h, kmp_intptr_t addr) {
kmp_int32 bucket = __kmp_dephash_hash(addr, h->size);
@@ -115,6 +120,9 @@ __kmp_dephash_find(kmp_info_t *thread, kmp_dephash_t *h, kmp_intptr_t addr) {
entry->addr = addr;
entry->last_out = NULL;
entry->last_ins = NULL;
entry->last_mtxs = NULL;
entry->last_flag = ENTRY_LAST_INS;
entry->mtx_lock = NULL;
entry->next_in_bucket = h->buckets[bucket];
h->buckets[bucket] = entry;
#ifdef KMP_DEBUG
@@ -173,6 +181,58 @@ static inline void __kmp_track_dependence(kmp_depnode_t *source,
#endif /* OMPT_SUPPORT && OMPT_OPTIONAL */
}
static inline kmp_int32
__kmp_depnode_link_successor(kmp_int32 gtid, kmp_info_t *thread,
kmp_task_t *task, kmp_depnode_t *node,
kmp_depnode_list_t *plist) {
if (!plist)
return 0;
kmp_int32 npredecessors = 0;
// link node as successor of list elements
for (kmp_depnode_list_t *p = plist; p; p = p->next) {
kmp_depnode_t *dep = p->node;
if (dep->dn.task) {
KMP_ACQUIRE_DEPNODE(gtid, dep);
if (dep->dn.task) {
__kmp_track_dependence(dep, node, task);
dep->dn.successors = __kmp_add_node(thread, dep->dn.successors, node);
KA_TRACE(40, ("__kmp_process_deps: T#%d adding dependence from %p to "
"%p\n",
gtid, KMP_TASK_TO_TASKDATA(dep->dn.task),
KMP_TASK_TO_TASKDATA(task)));
npredecessors++;
}
KMP_RELEASE_DEPNODE(gtid, dep);
}
}
return npredecessors;
}
static inline kmp_int32 __kmp_depnode_link_successor(kmp_int32 gtid,
kmp_info_t *thread,
kmp_task_t *task,
kmp_depnode_t *source,
kmp_depnode_t *sink) {
if (!sink)
return 0;
kmp_int32 npredecessors = 0;
if (sink->dn.task) {
// synchronously add source to sink's list of successors
KMP_ACQUIRE_DEPNODE(gtid, sink);
if (sink->dn.task) {
__kmp_track_dependence(sink, source, task);
sink->dn.successors = __kmp_add_node(thread, sink->dn.successors, source);
KA_TRACE(40, ("__kmp_process_deps: T#%d adding dependence from %p to "
"%p\n",
gtid, KMP_TASK_TO_TASKDATA(sink->dn.task),
KMP_TASK_TO_TASKDATA(task)));
npredecessors++;
}
KMP_RELEASE_DEPNODE(gtid, sink);
}
return npredecessors;
}
template <bool filter>
static inline kmp_int32
__kmp_process_deps(kmp_int32 gtid, kmp_depnode_t *node, kmp_dephash_t *hash,
@@ -187,72 +247,106 @@ __kmp_process_deps(kmp_int32 gtid, kmp_depnode_t *node, kmp_dephash_t *hash,
for (kmp_int32 i = 0; i < ndeps; i++) {
const kmp_depend_info_t *dep = &dep_list[i];
KMP_DEBUG_ASSERT(dep->flags.in);
if (filter && dep->base_addr == 0)
continue; // skip filtered entries
kmp_dephash_entry_t *info =
__kmp_dephash_find(thread, hash, dep->base_addr);
kmp_depnode_t *last_out = info->last_out;
kmp_depnode_list_t *last_ins = info->last_ins;
kmp_depnode_list_t *last_mtxs = info->last_mtxs;
if (dep->flags.out && info->last_ins) {
for (kmp_depnode_list_t *p = info->last_ins; p; p = p->next) {
kmp_depnode_t *indep = p->node;
if (indep->dn.task) {
KMP_ACQUIRE_DEPNODE(gtid, indep);
if (indep->dn.task) {
__kmp_track_dependence(indep, node, task);
indep->dn.successors =
__kmp_add_node(thread, indep->dn.successors, node);
KA_TRACE(40, ("__kmp_process_deps<%d>: T#%d adding dependence from "
"%p to %p\n",
filter, gtid, KMP_TASK_TO_TASKDATA(indep->dn.task),
KMP_TASK_TO_TASKDATA(task)));
npredecessors++;
if (dep->flags.out) { // out --> clean lists of ins and mtxs if any
if (last_ins || last_mtxs) {
if (info->last_flag == ENTRY_LAST_INS) { // INS were last
npredecessors +=
__kmp_depnode_link_successor(gtid, thread, task, node, last_ins);
} else { // MTXS were last
npredecessors +=
__kmp_depnode_link_successor(gtid, thread, task, node, last_mtxs);
}
__kmp_depnode_list_free(thread, last_ins);
__kmp_depnode_list_free(thread, last_mtxs);
info->last_ins = NULL;
info->last_mtxs = NULL;
} else {
npredecessors +=
__kmp_depnode_link_successor(gtid, thread, task, node, last_out);
}
__kmp_node_deref(thread, last_out);
if (dep_barrier) {
// if this is a sync point in the serial sequence, then the previous
// outputs are guaranteed to be completed after the execution of this
// task so the previous output nodes can be cleared.
info->last_out = NULL;
} else {
info->last_out = __kmp_node_ref(node);
}
} else if (dep->flags.in) {
// in --> link node to either last_out or last_mtxs, clean earlier deps
if (last_mtxs) {
npredecessors +=
__kmp_depnode_link_successor(gtid, thread, task, node, last_mtxs);
__kmp_node_deref(thread, last_out);
info->last_out = NULL;
if (info->last_flag == ENTRY_LAST_MTXS && last_ins) { // MTXS were last
// clean old INS before creating new list
__kmp_depnode_list_free(thread, last_ins);
info->last_ins = NULL;
}
} else {
// link node as successor of the last_out if any
npredecessors +=
__kmp_depnode_link_successor(gtid, thread, task, node, last_out);
}
info->last_flag = ENTRY_LAST_INS;
info->last_ins = __kmp_add_node(thread, info->last_ins, node);
} else {
KMP_DEBUG_ASSERT(dep->flags.mtx == 1);
// mtx --> link node to either last_out or last_ins, clean earlier deps
if (last_ins) {
npredecessors +=
__kmp_depnode_link_successor(gtid, thread, task, node, last_ins);
__kmp_node_deref(thread, last_out);
info->last_out = NULL;
if (info->last_flag == ENTRY_LAST_INS && last_mtxs) { // INS were last
// clean old MTXS before creating new list
__kmp_depnode_list_free(thread, last_mtxs);
info->last_mtxs = NULL;
}
} else {
// link node as successor of the last_out if any
npredecessors +=
__kmp_depnode_link_successor(gtid, thread, task, node, last_out);
}
info->last_flag = ENTRY_LAST_MTXS;
info->last_mtxs = __kmp_add_node(thread, info->last_mtxs, node);
if (info->mtx_lock == NULL) {
info->mtx_lock = (kmp_lock_t *)__kmp_allocate(sizeof(kmp_lock_t));
__kmp_init_lock(info->mtx_lock);
}
KMP_DEBUG_ASSERT(node->dn.mtx_num_locks < MAX_MTX_DEPS);
kmp_int32 m;
// Save lock in node's array
for (m = 0; m < MAX_MTX_DEPS; ++m) {
// sort pointers in decreasing order to avoid potential livelock
if (node->dn.mtx_locks[m] < info->mtx_lock) {
KMP_DEBUG_ASSERT(node->dn.mtx_locks[node->dn.mtx_num_locks] == NULL);
for (int n = node->dn.mtx_num_locks; n > m; --n) {
// shift right all lesser non-NULL pointers
KMP_DEBUG_ASSERT(node->dn.mtx_locks[n - 1] != NULL);
node->dn.mtx_locks[n] = node->dn.mtx_locks[n - 1];
}
KMP_RELEASE_DEPNODE(gtid, indep);
node->dn.mtx_locks[m] = info->mtx_lock;
break;
}
}
__kmp_depnode_list_free(thread, info->last_ins);
info->last_ins = NULL;
} else if (last_out && last_out->dn.task) {
KMP_ACQUIRE_DEPNODE(gtid, last_out);
if (last_out->dn.task) {
__kmp_track_dependence(last_out, node, task);
last_out->dn.successors =
__kmp_add_node(thread, last_out->dn.successors, node);
KA_TRACE(
40,
("__kmp_process_deps<%d>: T#%d adding dependence from %p to %p\n",
filter, gtid, KMP_TASK_TO_TASKDATA(last_out->dn.task),
KMP_TASK_TO_TASKDATA(task)));
npredecessors++;
}
KMP_RELEASE_DEPNODE(gtid, last_out);
}
if (dep_barrier) {
// if this is a sync point in the serial sequence, then the previous
// outputs are guaranteed to be completed after
// the execution of this task so the previous output nodes can be cleared.
__kmp_node_deref(thread, last_out);
info->last_out = NULL;
} else {
if (dep->flags.out) {
__kmp_node_deref(thread, last_out);
info->last_out = __kmp_node_ref(node);
} else
info->last_ins = __kmp_add_node(thread, info->last_ins, node);
KMP_DEBUG_ASSERT(m < MAX_MTX_DEPS); // must break from loop
node->dn.mtx_num_locks++;
}
}
KA_TRACE(30, ("__kmp_process_deps<%d>: T#%d found %d predecessors\n", filter,
gtid, npredecessors));
return npredecessors;
}
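The "sort pointers in decreasing order" insertion above is the usual lock-ordering discipline: each depnode keeps its mutexinoutset locks sorted by address, so acquisition later walks them in one global order and two tasks that share several locks cannot each hold one while waiting for the other. A minimal standalone sketch of that insertion step (hypothetical helper, plain pointers instead of kmp_lock_t):

// Illustrative only -- mirrors the sorted insert into dn.mtx_locks[] above.
// Keeps locks[0..n-1] sorted by descending address; returns the new count.
static int insert_lock_sorted(void *locks[], int n, void *lk) {
  int m = 0;
  while (m < n && locks[m] > lk) // find the first slot with a smaller pointer
    ++m;
  for (int i = n; i > m; --i)    // shift smaller pointers one slot right
    locks[i] = locks[i - 1];
  locks[m] = lk;
  return n + 1;
}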
@@ -266,8 +360,7 @@ static bool __kmp_check_deps(kmp_int32 gtid, kmp_depnode_t *node,
kmp_depend_info_t *dep_list,
kmp_int32 ndeps_noalias,
kmp_depend_info_t *noalias_dep_list) {
int i;
int i, n_mtxs = 0;
#if KMP_DEBUG
kmp_taskdata_t *taskdata = KMP_TASK_TO_TASKDATA(task);
#endif
@@ -279,13 +372,31 @@ static bool __kmp_check_deps(kmp_int32 gtid, kmp_depnode_t *node,
// Filter deps in dep_list
// TODO: Different algorithm for large dep_list ( > 10 ? )
for (i = 0; i < ndeps; i++) {
if (dep_list[i].base_addr != 0)
for (int j = i + 1; j < ndeps; j++)
if (dep_list[i].base_addr != 0) {
for (int j = i + 1; j < ndeps; j++) {
if (dep_list[i].base_addr == dep_list[j].base_addr) {
dep_list[i].flags.in |= dep_list[j].flags.in;
dep_list[i].flags.out |= dep_list[j].flags.out;
dep_list[i].flags.out |=
(dep_list[j].flags.out ||
(dep_list[i].flags.in && dep_list[j].flags.mtx) ||
(dep_list[i].flags.mtx && dep_list[j].flags.in));
dep_list[i].flags.mtx =
dep_list[i].flags.mtx | dep_list[j].flags.mtx &&
!dep_list[i].flags.out;
dep_list[j].base_addr = 0; // Mark j element as void
}
}
if (dep_list[i].flags.mtx) {
// limit number of mtx deps to MAX_MTX_DEPS per node
if (n_mtxs < MAX_MTX_DEPS && task != NULL) {
++n_mtxs;
} else {
dep_list[i].flags.in = 1; // downgrade mutexinoutset to inout
dep_list[i].flags.out = 1;
dep_list[i].flags.mtx = 0;
}
}
}
}
// doesn't need to be atomic as no other thread is going to be accessing this
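A worked example of the merge rules above, assuming the in=1/out=2/mtx=4 encoding sketched earlier, for two dep_list entries that name the same address:

// Illustrative outcomes of the duplicate-address merge (hedged reading of
// the flag algebra in the loop above):
//   in  + in   -> in     (0x1)  duplicate entry voided
//   in  + out  -> inout  (0x3)
//   in  + mtx  -> inout  (0x3)  mixing mtx with in drops the mtx bit
//   out + mtx  -> out    (0x2)  out already subsumes the exclusion
//   mtx + mtx  -> mtx    (0x4)  one entry survives, still mutexinoutset
// Independently, at most MAX_MTX_DEPS (4) mtx dependences are kept per task;
// any further mutexinoutset entry is downgraded to inout.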
@@ -469,7 +580,7 @@ kmp_int32 __kmpc_omp_task_with_deps(ident_t *loc_ref, kmp_int32 gtid,
KA_TRACE(10, ("__kmpc_omp_task_with_deps(exit): T#%d task had no blocking "
"dependencies : "
"loc=%p task=%p, transferring to __kmpc_omp_task\n",
"loc=%p task=%p, transferring to __kmp_omp_task\n",
gtid, loc_ref, new_taskdata));
kmp_int32 ret = __kmp_omp_task(gtid, new_task, true);


@@ -62,7 +62,12 @@ static inline void __kmp_dephash_free_entries(kmp_info_t *thread,
for (kmp_dephash_entry_t *entry = h->buckets[i]; entry; entry = next) {
next = entry->next_in_bucket;
__kmp_depnode_list_free(thread, entry->last_ins);
__kmp_depnode_list_free(thread, entry->last_mtxs);
__kmp_node_deref(thread, entry->last_out);
if (entry->mtx_lock) {
__kmp_destroy_lock(entry->mtx_lock);
__kmp_free(entry->mtx_lock);
}
#if USE_FAST_MEMORY
__kmp_fast_free(thread, entry);
#else


@@ -32,7 +32,7 @@ static void __kmp_alloc_task_deque(kmp_info_t *thread,
static int __kmp_realloc_task_threads_data(kmp_info_t *thread,
kmp_task_team_t *task_team);
#ifdef OMP_45_ENABLED
#if OMP_45_ENABLED
static void __kmp_bottom_half_finish_proxy(kmp_int32 gtid, kmp_task_t *ptask);
#endif
@@ -251,6 +251,79 @@ static void __kmp_pop_task_stack(kmp_int32 gtid, kmp_info_t *thread,
}
#endif /* BUILD_TIED_TASK_STACK */
// returns 1 if new task is allowed to execute, 0 otherwise
// checks Task Scheduling constraint (if requested) and
// mutexinoutset dependencies if any
static bool __kmp_task_is_allowed(int gtid, const kmp_int32 is_constrained,
const kmp_taskdata_t *tasknew,
const kmp_taskdata_t *taskcurr) {
if (is_constrained && (tasknew->td_flags.tiedness == TASK_TIED)) {
// Check if the candidate obeys the Task Scheduling Constraints (TSC)
// only descendant of all deferred tied tasks can be scheduled, checking
// the last one is enough, as it in turn is the descendant of all others
kmp_taskdata_t *current = taskcurr->td_last_tied;
KMP_DEBUG_ASSERT(current != NULL);
// check if the task is not suspended on barrier
if (current->td_flags.tasktype == TASK_EXPLICIT ||
current->td_taskwait_thread > 0) { // <= 0 on barrier
kmp_int32 level = current->td_level;
kmp_taskdata_t *parent = tasknew->td_parent;
while (parent != current && parent->td_level > level) {
// check generation up to the level of the current task
parent = parent->td_parent;
KMP_DEBUG_ASSERT(parent != NULL);
}
if (parent != current)
return false;
}
}
// Check mutexinoutset dependencies, acquire locks
kmp_depnode_t *node = tasknew->td_depnode;
if (node && (node->dn.mtx_num_locks > 0)) {
for (int i = 0; i < node->dn.mtx_num_locks; ++i) {
KMP_DEBUG_ASSERT(node->dn.mtx_locks[i] != NULL);
if (__kmp_test_lock(node->dn.mtx_locks[i], gtid))
continue;
// could not get the lock, release previous locks
for (int j = i - 1; j >= 0; --j)
__kmp_release_lock(node->dn.mtx_locks[j], gtid);
return false;
}
// negative num_locks means all locks acquired successfully
node->dn.mtx_num_locks = -node->dn.mtx_num_locks;
}
return true;
}
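The acquisition above is an all-or-nothing try-lock: either every mutexinoutset lock is taken, or whatever was already taken is rolled back and the task stays queued for a later attempt. The same pattern in a minimal, self-contained form (pthreads stand in for the runtime's __kmp_test_lock/__kmp_release_lock):

// Illustrative only -- acquire-all-or-none, as in __kmp_task_is_allowed().
#include <pthread.h>
#include <stdbool.h>

static bool try_acquire_all(pthread_mutex_t *locks[], int n) {
  for (int i = 0; i < n; ++i) {
    if (pthread_mutex_trylock(locks[i]) == 0)
      continue;                      // got this lock, try the next one
    for (int j = i - 1; j >= 0; --j) // failed: roll back everything taken
      pthread_mutex_unlock(locks[j]);
    return false;                    // task stays queued, retried later
  }
  return true;                       // all mutexinoutset locks held
}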
// __kmp_realloc_task_deque:
// Re-allocates a task deque for a particular thread, copies the content from
// the old deque and adjusts the necessary data structures relating to the
// deque. This operation must be done with the deque_lock being held
static void __kmp_realloc_task_deque(kmp_info_t *thread,
kmp_thread_data_t *thread_data) {
kmp_int32 size = TASK_DEQUE_SIZE(thread_data->td);
kmp_int32 new_size = 2 * size;
KE_TRACE(10, ("__kmp_realloc_task_deque: T#%d reallocating deque[from %d to "
"%d] for thread_data %p\n",
__kmp_gtid_from_thread(thread), size, new_size, thread_data));
kmp_taskdata_t **new_deque =
(kmp_taskdata_t **)__kmp_allocate(new_size * sizeof(kmp_taskdata_t *));
int i, j;
for (i = thread_data->td.td_deque_head, j = 0; j < size;
i = (i + 1) & TASK_DEQUE_MASK(thread_data->td), j++)
new_deque[j] = thread_data->td.td_deque[i];
__kmp_free(thread_data->td.td_deque);
thread_data->td.td_deque_head = 0;
thread_data->td.td_deque_tail = size;
thread_data->td.td_deque = new_deque;
thread_data->td.td_deque_size = new_size;
}
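Because the deque size stays a power of two, TASK_DEQUE_MASK(td) is simply size - 1, and the copy loop above linearizes the ring buffer starting at the head. A hedged sketch of the same index arithmetic on a plain int ring:

// Illustrative only -- doubling a power-of-two ring buffer the same way
// __kmp_realloc_task_deque() does; the caller resets head = 0, tail = size.
#include <stdlib.h>

static int *grow_ring(int *old, int size /* power of two */, int head) {
  int *nbuf = (int *)malloc(2 * size * sizeof(int));
  for (int i = head, j = 0; j < size; i = (i + 1) & (size - 1), j++)
    nbuf[j] = old[i]; // copy live elements in order, wrapping with the mask
  free(old);
  return nbuf;
}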
// __kmp_push_task: Add a task to the thread's deque
static kmp_int32 __kmp_push_task(kmp_int32 gtid, kmp_task_t *task) {
kmp_info_t *thread = __kmp_threads[gtid];
@@ -298,33 +371,47 @@ static kmp_int32 __kmp_push_task(kmp_int32 gtid, kmp_task_t *task) {
__kmp_alloc_task_deque(thread, thread_data);
}
int locked = 0;
// Check if deque is full
if (TCR_4(thread_data->td.td_deque_ntasks) >=
TASK_DEQUE_SIZE(thread_data->td)) {
KA_TRACE(20, ("__kmp_push_task: T#%d deque is full; returning "
"TASK_NOT_PUSHED for task %p\n",
gtid, taskdata));
return TASK_NOT_PUSHED;
if (__kmp_task_is_allowed(gtid, __kmp_task_stealing_constraint, taskdata,
thread->th.th_current_task)) {
KA_TRACE(20, ("__kmp_push_task: T#%d deque is full; returning "
"TASK_NOT_PUSHED for task %p\n",
gtid, taskdata));
return TASK_NOT_PUSHED;
} else {
__kmp_acquire_bootstrap_lock(&thread_data->td.td_deque_lock);
locked = 1;
// expand deque to push the task which is not allowed to execute
__kmp_realloc_task_deque(thread, thread_data);
}
}
// Lock the deque for the task push operation
__kmp_acquire_bootstrap_lock(&thread_data->td.td_deque_lock);
if (!locked) {
__kmp_acquire_bootstrap_lock(&thread_data->td.td_deque_lock);
#if OMP_45_ENABLED
// Need to recheck as we can get a proxy task from a thread outside of OpenMP
if (TCR_4(thread_data->td.td_deque_ntasks) >=
TASK_DEQUE_SIZE(thread_data->td)) {
__kmp_release_bootstrap_lock(&thread_data->td.td_deque_lock);
KA_TRACE(20, ("__kmp_push_task: T#%d deque is full on 2nd check; returning "
"TASK_NOT_PUSHED for task %p\n",
gtid, taskdata));
return TASK_NOT_PUSHED;
// Need to recheck as we can get a proxy task from thread outside of OpenMP
if (TCR_4(thread_data->td.td_deque_ntasks) >=
TASK_DEQUE_SIZE(thread_data->td)) {
if (__kmp_task_is_allowed(gtid, __kmp_task_stealing_constraint, taskdata,
thread->th.th_current_task)) {
__kmp_release_bootstrap_lock(&thread_data->td.td_deque_lock);
KA_TRACE(20, ("__kmp_push_task: T#%d deque is full on 2nd check; "
"returning TASK_NOT_PUSHED for task %p\n",
gtid, taskdata));
return TASK_NOT_PUSHED;
} else {
// expand deque to push the task which is not allowed to execute
__kmp_realloc_task_deque(thread, thread_data);
}
}
#endif
}
#else
// Must have room since no thread can add tasks but calling thread
KMP_DEBUG_ASSERT(TCR_4(thread_data->td.td_deque_ntasks) <
TASK_DEQUE_SIZE(thread_data->td));
#endif
thread_data->td.td_deque[thread_data->td.td_deque_tail] =
taskdata; // Push taskdata
@@ -678,11 +765,31 @@ static void __kmp_free_task_and_ancestors(kmp_int32 gtid,
taskdata = parent_taskdata;
if (team_serial)
return;
// Stop checking ancestors at implicit task instead of walking up ancestor
// tree to avoid premature deallocation of ancestors.
if (team_serial || taskdata->td_flags.tasktype == TASK_IMPLICIT)
if (taskdata->td_flags.tasktype == TASK_IMPLICIT) {
if (taskdata->td_dephash) { // do we need to cleanup dephash?
int children = KMP_ATOMIC_LD_ACQ(&taskdata->td_incomplete_child_tasks);
kmp_tasking_flags_t flags_old = taskdata->td_flags;
if (children == 0 && flags_old.complete == 1) {
kmp_tasking_flags_t flags_new = flags_old;
flags_new.complete = 0;
if (KMP_COMPARE_AND_STORE_ACQ32(
RCAST(kmp_int32 *, &taskdata->td_flags),
*RCAST(kmp_int32 *, &flags_old),
*RCAST(kmp_int32 *, &flags_new))) {
KA_TRACE(100, ("__kmp_free_task_and_ancestors: T#%d cleans "
"dephash of implicit task %p\n",
gtid, taskdata));
// cleanup dephash of finished implicit task
__kmp_dephash_free_entries(thread, taskdata->td_dephash);
}
}
}
return;
}
// Predecrement simulated by "- 1" calculation
children = KMP_ATOMIC_DEC(&taskdata->td_allocated_child_tasks) - 1;
KMP_DEBUG_ASSERT(children >= 0);
@@ -750,6 +857,17 @@ static void __kmp_task_finish(kmp_int32 gtid, kmp_task_t *task,
__ompt_task_finish(task, resumed_task);
#endif
// Check mutexinoutset dependencies, release locks
kmp_depnode_t *node = taskdata->td_depnode;
if (node && (node->dn.mtx_num_locks < 0)) {
// negative num_locks means all locks were acquired
node->dn.mtx_num_locks = -node->dn.mtx_num_locks;
for (int i = node->dn.mtx_num_locks - 1; i >= 0; --i) {
KMP_DEBUG_ASSERT(node->dn.mtx_locks[i] != NULL);
__kmp_release_lock(node->dn.mtx_locks[i], gtid);
}
}
KMP_DEBUG_ASSERT(taskdata->td_flags.complete == 0);
taskdata->td_flags.complete = 1; // mark the task as completed
KMP_DEBUG_ASSERT(taskdata->td_flags.started == 1);
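For reference, a hedged summary of how the sign of dn.mtx_num_locks is used as a state flag by this code and by __kmp_task_is_allowed():

// Illustrative summary, not part of the commit:
//   mtx_num_locks > 0 : locks recorded in mtx_locks[], not currently held
//   mtx_num_locks < 0 : all locks held (negated by __kmp_task_is_allowed)
//   mtx_num_locks == 0: task has no mutexinoutset dependences
// __kmp_task_finish() restores the positive count, then releases the locks
// in reverse order of acquisition.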
@@ -976,8 +1094,24 @@ void __kmp_init_implicit_task(ident_t *loc_ref, kmp_info_t *this_thr,
// thread: thread data structure corresponding to implicit task
void __kmp_finish_implicit_task(kmp_info_t *thread) {
kmp_taskdata_t *task = thread->th.th_current_task;
if (task->td_dephash)
__kmp_dephash_free_entries(thread, task->td_dephash);
if (task->td_dephash) {
int children;
task->td_flags.complete = 1;
children = KMP_ATOMIC_LD_ACQ(&task->td_incomplete_child_tasks);
kmp_tasking_flags_t flags_old = task->td_flags;
if (children == 0 && flags_old.complete == 1) {
kmp_tasking_flags_t flags_new = flags_old;
flags_new.complete = 0;
if (KMP_COMPARE_AND_STORE_ACQ32(RCAST(kmp_int32 *, &task->td_flags),
*RCAST(kmp_int32 *, &flags_old),
*RCAST(kmp_int32 *, &flags_new))) {
KA_TRACE(100, ("__kmp_finish_implicit_task: T#%d cleans "
"dephash of implicit task %p\n",
thread->th.th_info.ds.ds_gtid, task));
__kmp_dephash_free_entries(thread, task->td_dephash);
}
}
}
}
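Both this path and the one added to __kmp_free_task_and_ancestors() above rely on the same compare-and-swap guard on td_flags, so that when the implicit task and its last completing child race, exactly one of them frees the dephash. A minimal hedged sketch of that idiom with C11 atomics (names are illustrative):

// Illustrative only -- one-shot cleanup ownership via compare-and-swap.
#include <stdatomic.h>
#include <stdbool.h>

static bool claim_cleanup(atomic_int *complete_flag) {
  int expected = 1; // "task is complete, dephash not yet freed"
  return atomic_compare_exchange_strong(complete_flag, &expected, 0);
}
// usage sketch:
//   if (children == 0 && claim_cleanup(&flag))
//     free_dephash(task); // executed by exactly one thread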
// __kmp_free_implicit_task: Release resources associated to implicit tasks
@@ -2229,33 +2363,16 @@ static kmp_task_t *__kmp_remove_my_task(kmp_info_t *thread, kmp_int32 gtid,
TASK_DEQUE_MASK(thread_data->td); // Wrap index.
taskdata = thread_data->td.td_deque[tail];
if (is_constrained && (taskdata->td_flags.tiedness == TASK_TIED)) {
// we need to check if the candidate obeys task scheduling constraint (TSC)
// only descendant of all deferred tied tasks can be scheduled, checking
// the last one is enough, as it in turn is the descendant of all others
kmp_taskdata_t *current = thread->th.th_current_task->td_last_tied;
KMP_DEBUG_ASSERT(current != NULL);
// check if last tied task is not suspended on barrier
if (current->td_flags.tasktype == TASK_EXPLICIT ||
current->td_taskwait_thread > 0) { // <= 0 on barrier
kmp_int32 level = current->td_level;
kmp_taskdata_t *parent = taskdata->td_parent;
while (parent != current && parent->td_level > level) {
parent = parent->td_parent; // check generation up to the level of the
// current task
KMP_DEBUG_ASSERT(parent != NULL);
}
if (parent != current) {
// The TSC does not allow to steal victim task
__kmp_release_bootstrap_lock(&thread_data->td.td_deque_lock);
KA_TRACE(10, ("__kmp_remove_my_task(exit #2): T#%d No tasks to remove: "
"ntasks=%d head=%u tail=%u\n",
gtid, thread_data->td.td_deque_ntasks,
thread_data->td.td_deque_head,
thread_data->td.td_deque_tail));
return NULL;
}
}
if (!__kmp_task_is_allowed(gtid, is_constrained, taskdata,
thread->th.th_current_task)) {
// The TSC does not allow to steal victim task
__kmp_release_bootstrap_lock(&thread_data->td.td_deque_lock);
KA_TRACE(10,
("__kmp_remove_my_task(exit #3): T#%d TSC blocks tail task: "
"ntasks=%d head=%u tail=%u\n",
gtid, thread_data->td.td_deque_ntasks,
thread_data->td.td_deque_head, thread_data->td.td_deque_tail));
return NULL;
}
thread_data->td.td_deque_tail = tail;
@@ -2263,7 +2380,7 @@ static kmp_task_t *__kmp_remove_my_task(kmp_info_t *thread, kmp_int32 gtid,
__kmp_release_bootstrap_lock(&thread_data->td.td_deque_lock);
KA_TRACE(10, ("__kmp_remove_my_task(exit #2): T#%d task %p removed: "
KA_TRACE(10, ("__kmp_remove_my_task(exit #4): T#%d task %p removed: "
"ntasks=%d head=%u tail=%u\n",
gtid, taskdata, thread_data->td.td_deque_ntasks,
thread_data->td.td_deque_head, thread_data->td.td_deque_tail));
@@ -2284,7 +2401,7 @@ static kmp_task_t *__kmp_steal_task(kmp_info_t *victim_thr, kmp_int32 gtid,
kmp_taskdata_t *taskdata;
kmp_taskdata_t *current;
kmp_thread_data_t *victim_td, *threads_data;
kmp_int32 level, target;
kmp_int32 target;
kmp_int32 victim_tid;
KMP_DEBUG_ASSERT(__kmp_tasking_mode != tskm_immediate_exec);
@@ -2324,69 +2441,33 @@ static kmp_task_t *__kmp_steal_task(kmp_info_t *victim_thr, kmp_int32 gtid,
}
KMP_DEBUG_ASSERT(victim_td->td.td_deque != NULL);
current = __kmp_threads[gtid]->th.th_current_task;
taskdata = victim_td->td.td_deque[victim_td->td.td_deque_head];
if (is_constrained && (taskdata->td_flags.tiedness == TASK_TIED)) {
// we need to check if the candidate obeys task scheduling constraint (TSC)
// only descendant of all deferred tied tasks can be scheduled, checking
// the last one is enough, as it in turn is the descendant of all others
current = __kmp_threads[gtid]->th.th_current_task->td_last_tied;
KMP_DEBUG_ASSERT(current != NULL);
// check if last tied task is not suspended on barrier
if (current->td_flags.tasktype == TASK_EXPLICIT ||
current->td_taskwait_thread > 0) { // <= 0 on barrier
level = current->td_level;
kmp_taskdata_t *parent = taskdata->td_parent;
while (parent != current && parent->td_level > level) {
parent = parent->td_parent; // check generation up to the level of the
// current task
KMP_DEBUG_ASSERT(parent != NULL);
}
if (parent != current) {
if (!task_team->tt.tt_untied_task_encountered) {
// The TSC does not allow to steal victim task
__kmp_release_bootstrap_lock(&victim_td->td.td_deque_lock);
KA_TRACE(10,
("__kmp_steal_task(exit #3): T#%d could not steal from "
"T#%d: task_team=%p ntasks=%d head=%u tail=%u\n",
gtid, __kmp_gtid_from_thread(victim_thr), task_team, ntasks,
victim_td->td.td_deque_head, victim_td->td.td_deque_tail));
return NULL;
}
taskdata = NULL; // will check other tasks in victim's deque
}
}
}
if (taskdata != NULL) {
if (__kmp_task_is_allowed(gtid, is_constrained, taskdata, current)) {
// Bump head pointer and Wrap.
victim_td->td.td_deque_head =
(victim_td->td.td_deque_head + 1) & TASK_DEQUE_MASK(victim_td->td);
} else {
if (!task_team->tt.tt_untied_task_encountered) {
// The TSC does not allow to steal victim task
__kmp_release_bootstrap_lock(&victim_td->td.td_deque_lock);
KA_TRACE(10, ("__kmp_steal_task(exit #3): T#%d could not steal from "
"T#%d: task_team=%p ntasks=%d head=%u tail=%u\n",
gtid, __kmp_gtid_from_thread(victim_thr), task_team, ntasks,
victim_td->td.td_deque_head, victim_td->td.td_deque_tail));
return NULL;
}
int i;
// walk through victim's deque trying to steal any task
target = victim_td->td.td_deque_head;
taskdata = NULL;
for (i = 1; i < ntasks; ++i) {
target = (target + 1) & TASK_DEQUE_MASK(victim_td->td);
taskdata = victim_td->td.td_deque[target];
if (taskdata->td_flags.tiedness == TASK_TIED) {
// check if the candidate obeys the TSC
kmp_taskdata_t *parent = taskdata->td_parent;
// check generation up to the level of the current task
while (parent != current && parent->td_level > level) {
parent = parent->td_parent;
KMP_DEBUG_ASSERT(parent != NULL);
}
if (parent != current) {
// The TSC does not allow to steal the candidate
taskdata = NULL;
continue;
} else {
// found victim tied task
break;
}
if (__kmp_task_is_allowed(gtid, is_constrained, taskdata, current)) {
break; // found victim task
} else {
// found victim untied task
break;
taskdata = NULL;
}
}
if (taskdata == NULL) {
@@ -2834,35 +2915,6 @@ static void __kmp_alloc_task_deque(kmp_info_t *thread,
thread_data->td.td_deque_size = INITIAL_TASK_DEQUE_SIZE;
}
// __kmp_realloc_task_deque:
// Re-allocates a task deque for a particular thread, copies the content from
// the old deque and adjusts the necessary data structures relating to the
// deque. This operation must be done with a the deque_lock being held
static void __kmp_realloc_task_deque(kmp_info_t *thread,
kmp_thread_data_t *thread_data) {
kmp_int32 size = TASK_DEQUE_SIZE(thread_data->td);
kmp_int32 new_size = 2 * size;
KE_TRACE(10, ("__kmp_realloc_task_deque: T#%d reallocating deque[from %d to "
"%d] for thread_data %p\n",
__kmp_gtid_from_thread(thread), size, new_size, thread_data));
kmp_taskdata_t **new_deque =
(kmp_taskdata_t **)__kmp_allocate(new_size * sizeof(kmp_taskdata_t *));
int i, j;
for (i = thread_data->td.td_deque_head, j = 0; j < size;
i = (i + 1) & TASK_DEQUE_MASK(thread_data->td), j++)
new_deque[j] = thread_data->td.td_deque[i];
__kmp_free(thread_data->td.td_deque);
thread_data->td.td_deque_head = 0;
thread_data->td.td_deque_tail = size;
thread_data->td.td_deque = new_deque;
thread_data->td.td_deque_size = new_size;
}
// __kmp_free_task_deque:
// Deallocates a task deque for a particular thread. Happens at library
// deallocation so don't need to reset all thread data fields.
@@ -3422,7 +3474,7 @@ release_and_exit:
/* The finish of the proxy tasks is divided in two pieces:
- the top half is the one that can be done from a thread outside the team
- the bottom half must be run from a them within the team
- the bottom half must be run from a thread within the team
In order to run the bottom half the task gets queued back into one of the
threads of the team. Once the td_incomplete_child_task counter of the parent


@@ -0,0 +1,152 @@
// RUN: %libomp-compile-and-run
// Tests OMP 5.0 task dependences "mutexinoutset", emulates compiler codegen
// Mutually exclusive tasks get same input dependency info array
//
// Task tree created:
//      task0    task1
//         \     /  \
//         task2      task5
//          /   \
//      task3    task4
//        /         \
//    task6 <--> task7   (these two are mutually exclusive)
//         \     /
//          task8
//
#include <stdio.h>
#include <omp.h>
#ifdef _WIN32
#include <windows.h>
#define mysleep(n) Sleep(n)
#else
#include <unistd.h>
#define mysleep(n) usleep((n)*1000)
#endif
static int checker = 0; // to check if two tasks run simultaneously
static int err = 0;
#ifndef DELAY
#define DELAY 100
#endif
// ---------------------------------------------------------------------------
// internal data to emulate compiler codegen
typedef int(*entry_t)(int, int**);
typedef struct DEP {
size_t addr;
size_t len;
int flags;
} dep;
typedef struct ID {
int reserved_1;
int flags;
int reserved_2;
int reserved_3;
char *psource;
} id;
int thunk(int gtid, int** pshareds) {
int t = **pshareds;
int th = omp_get_thread_num();
#pragma omp atomic
++checker;
printf("task __%d, th %d\n", t, th);
if (checker != 1) {
err++;
printf("Error1, checker %d != 1\n", checker);
}
mysleep(DELAY);
if (checker != 1) {
err++;
printf("Error2, checker %d != 1\n", checker);
}
#pragma omp atomic
--checker;
return 0;
}
#ifdef __cplusplus
extern "C" {
#endif
int __kmpc_global_thread_num(id*);
extern int** __kmpc_omp_task_alloc(id *loc, int gtid, int flags,
size_t sz, size_t shar, entry_t rtn);
int
__kmpc_omp_task_with_deps(id *loc, int gtid, int **task, int nd, dep *dep_lst,
int nd_noalias, dep *noalias_dep_lst);
static id loc = {0, 2, 0, 0, ";file;func;0;0;;"};
#ifdef __cplusplus
} // extern "C"
#endif
// End of internal data
// ---------------------------------------------------------------------------
int main()
{
int i1,i2,i3,i4;
omp_set_num_threads(2);
#pragma omp parallel
{
#pragma omp single nowait
{
dep sdep[2];
int **ptr;
int gtid = __kmpc_global_thread_num(&loc);
int t = omp_get_thread_num();
#pragma omp task depend(in: i1, i2)
{ int th = omp_get_thread_num();
printf("task 0_%d, th %d\n", t, th);
mysleep(DELAY); }
#pragma omp task depend(in: i1, i3)
{ int th = omp_get_thread_num();
printf("task 1_%d, th %d\n", t, th);
mysleep(DELAY); }
#pragma omp task depend(in: i2) depend(out: i1)
{ int th = omp_get_thread_num();
printf("task 2_%d, th %d\n", t, th);
mysleep(DELAY); }
#pragma omp task depend(in: i1)
{ int th = omp_get_thread_num();
printf("task 3_%d, th %d\n", t, th);
mysleep(DELAY); }
#pragma omp task depend(out: i2)
{ int th = omp_get_thread_num();
printf("task 4_%d, th %d\n", t, th);
mysleep(DELAY+5); } // wait a bit longer than task 3
#pragma omp task depend(out: i3)
{ int th = omp_get_thread_num();
printf("task 5_%d, th %d\n", t, th);
mysleep(DELAY); }
// compiler codegen start
// task1
ptr = __kmpc_omp_task_alloc(&loc, gtid, 0, 28, 16, thunk);
sdep[0].addr = (size_t)&i1;
sdep[0].len = 0; // not used
sdep[0].flags = 4; // mx
sdep[1].addr = (size_t)&i4;
sdep[1].len = 0; // not used
sdep[1].flags = 4; // mx
**ptr = t + 10; // init single shared variable
__kmpc_omp_task_with_deps(&loc, gtid, ptr, 2, sdep, 0, 0);
// task2
ptr = __kmpc_omp_task_alloc(&loc, gtid, 0, 28, 16, thunk);
**ptr = t + 20; // init single shared variable
__kmpc_omp_task_with_deps(&loc, gtid, ptr, 2, sdep, 0, 0);
// compiler codegen end
#pragma omp task depend(in: i1)
{ int th = omp_get_thread_num();
printf("task 8_%d, th %d\n", t, th);
mysleep(DELAY); }
} // single
} // parallel
if (err == 0) {
printf("passed\n");
return 0;
} else {
printf("failed\n");
return 1;
}
}
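The hand-rolled codegen above (two __kmpc_omp_task_alloc calls sharing a dep array with flags == 4) emulates roughly the following source-level tasks 6 and 7; kept as a hedged sketch, since the test deliberately bypasses the compiler:

// Illustrative only -- what the emulated tasks stand for at the source level
// (needs a compiler that accepts the OpenMP 5.0 dependence type).
void tasks_6_and_7(void) {
  int i1, i4;
  #pragma omp task depend(mutexinoutset: i1, i4)
  { /* body of thunk(): enter the mutually exclusive region */ }
  #pragma omp task depend(mutexinoutset: i1, i4)
  { /* never overlaps with the task above */ }
}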


@@ -0,0 +1,155 @@
// RUN: %libomp-compile-and-run
// Tests OMP 5.0 task dependences "mutexinoutset", emulates compiler codegen
// Mutually exclusive tasks get input dependency info array sorted differently
//
// Task tree created:
//      task0    task1
//         \     /  \
//         task2      task5
//          /   \
//      task3    task4
//        /         \
//    task6 <--> task7   (these two are mutually exclusive)
//         \     /
//          task8
//
#include <stdio.h>
#include <omp.h>
#ifdef _WIN32
#include <windows.h>
#define mysleep(n) Sleep(n)
#else
#include <unistd.h>
#define mysleep(n) usleep((n)*1000)
#endif
static int checker = 0; // to check if two tasks run simultaneously
static int err = 0;
#ifndef DELAY
#define DELAY 100
#endif
// ---------------------------------------------------------------------------
// internal data to emulate compiler codegen
typedef int(*entry_t)(int, int**);
typedef struct DEP {
size_t addr;
size_t len;
int flags;
} dep;
typedef struct ID {
int reserved_1;
int flags;
int reserved_2;
int reserved_3;
char *psource;
} id;
int thunk(int gtid, int** pshareds) {
int t = **pshareds;
int th = omp_get_thread_num();
#pragma omp atomic
++checker;
printf("task __%d, th %d\n", t, th);
if (checker != 1) {
err++;
printf("Error1, checker %d != 1\n", checker);
}
mysleep(DELAY);
if (checker != 1) {
err++;
printf("Error2, checker %d != 1\n", checker);
}
#pragma omp atomic
--checker;
return 0;
}
#ifdef __cplusplus
extern "C" {
#endif
int __kmpc_global_thread_num(id*);
extern int** __kmpc_omp_task_alloc(id *loc, int gtid, int flags,
size_t sz, size_t shar, entry_t rtn);
int
__kmpc_omp_task_with_deps(id *loc, int gtid, int **task, int nd, dep *dep_lst,
int nd_noalias, dep *noalias_dep_lst);
static id loc = {0, 2, 0, 0, ";file;func;0;0;;"};
#ifdef __cplusplus
} // extern "C"
#endif
// End of internal data
// ---------------------------------------------------------------------------
int main()
{
int i1,i2,i3,i4;
omp_set_num_threads(2);
#pragma omp parallel
{
#pragma omp single nowait
{
dep sdep[2];
int **ptr;
int gtid = __kmpc_global_thread_num(&loc);
int t = omp_get_thread_num();
#pragma omp task depend(in: i1, i2)
{ int th = omp_get_thread_num();
printf("task 0_%d, th %d\n", t, th);
mysleep(DELAY); }
#pragma omp task depend(in: i1, i3)
{ int th = omp_get_thread_num();
printf("task 1_%d, th %d\n", t, th);
mysleep(DELAY); }
#pragma omp task depend(in: i2) depend(out: i1)
{ int th = omp_get_thread_num();
printf("task 2_%d, th %d\n", t, th);
mysleep(DELAY); }
#pragma omp task depend(in: i1)
{ int th = omp_get_thread_num();
printf("task 3_%d, th %d\n", t, th);
mysleep(DELAY); }
#pragma omp task depend(out: i2)
{ int th = omp_get_thread_num();
printf("task 4_%d, th %d\n", t, th);
mysleep(DELAY+5); } // wait a bit longer than task 3
#pragma omp task depend(out: i3)
{ int th = omp_get_thread_num();
printf("task 5_%d, th %d\n", t, th);
mysleep(DELAY); }
// compiler codegen start
// task1
ptr = __kmpc_omp_task_alloc(&loc, gtid, 0, 28, 16, thunk);
sdep[0].addr = (size_t)&i1;
sdep[0].len = 0; // not used
sdep[0].flags = 4; // mx
sdep[1].addr = (size_t)&i4;
sdep[1].len = 0; // not used
sdep[1].flags = 4; // mx
**ptr = t + 10; // init single shared variable
__kmpc_omp_task_with_deps(&loc, gtid, ptr, 2, sdep, 0, 0);
// task2
ptr = __kmpc_omp_task_alloc(&loc, gtid, 0, 28, 16, thunk);
// reverse pointers - library should sort them uniquely
sdep[0].addr = (size_t)&i4;
sdep[1].addr = (size_t)&i1;
**ptr = t + 20; // init single shared variable
__kmpc_omp_task_with_deps(&loc, gtid, ptr, 2, sdep, 0, 0);
// compiler codegen end
#pragma omp task depend(in: i1)
{ int th = omp_get_thread_num();
printf("task 8_%d, th %d\n", t, th);
mysleep(DELAY); }
} // single
} // parallel
if (err == 0) {
printf("passed\n");
return 0;
} else {
printf("failed\n");
return 1;
}
}