diff --git a/openmp/runtime/src/kmp.h b/openmp/runtime/src/kmp.h index fbb5ab81bf14..9f462a2d85e3 100644 --- a/openmp/runtime/src/kmp.h +++ b/openmp/runtime/src/kmp.h @@ -2160,30 +2160,35 @@ typedef union kmp_depnode kmp_depnode_t; typedef struct kmp_depnode_list kmp_depnode_list_t; typedef struct kmp_dephash_entry kmp_dephash_entry_t; +// Compiler sends us this info: typedef struct kmp_depend_info { kmp_intptr_t base_addr; size_t len; struct { bool in : 1; bool out : 1; + bool mtx : 1; } flags; } kmp_depend_info_t; +// Internal structures to work with task dependencies: struct kmp_depnode_list { kmp_depnode_t *node; kmp_depnode_list_t *next; }; +// Max number of mutexinoutset dependencies per node +#define MAX_MTX_DEPS 4 + typedef struct kmp_base_depnode { - kmp_depnode_list_t *successors; - kmp_task_t *task; - - kmp_lock_t lock; - + kmp_depnode_list_t *successors; /* used under lock */ + kmp_task_t *task; /* non-NULL if depnode is active, used under lock */ + kmp_lock_t *mtx_locks[MAX_MTX_DEPS]; /* lock mutexinoutset dependent tasks */ + kmp_int32 mtx_num_locks; /* number of locks in mtx_locks array */ + kmp_lock_t lock; /* guards shared fields: task, successors */ #if KMP_SUPPORT_GRAPH_OUTPUT kmp_uint32 id; #endif - std::atomic<kmp_int32> npredecessors; std::atomic<kmp_int32> nrefs; } kmp_base_depnode_t; @@ -2198,6 +2203,9 @@ struct kmp_dephash_entry { kmp_intptr_t addr; kmp_depnode_t *last_out; kmp_depnode_list_t *last_ins; + kmp_depnode_list_t *last_mtxs; + kmp_int32 last_flag; + kmp_lock_t *mtx_lock; /* is referenced by depnodes w/mutexinoutset dep */ kmp_dephash_entry_t *next_in_bucket; }; diff --git a/openmp/runtime/src/kmp_taskdeps.cpp b/openmp/runtime/src/kmp_taskdeps.cpp index b5d53f1ae688..b48c5b633ce6 100644 --- a/openmp/runtime/src/kmp_taskdeps.cpp +++ b/openmp/runtime/src/kmp_taskdeps.cpp @@ -37,12 +37,14 @@ static std::atomic<kmp_int32> kmp_node_id_seed = ATOMIC_VAR_INIT(0); #endif static void __kmp_init_node(kmp_depnode_t *node) { - node->dn.task = NULL; // set to null initially, it will point to the right - // task once dependences have been processed node->dn.successors = NULL; + node->dn.task = NULL; // will point to the right task + // once dependences have been processed + for (int i = 0; i < MAX_MTX_DEPS; ++i) + node->dn.mtx_locks[i] = NULL; + node->dn.mtx_num_locks = 0; __kmp_init_lock(&node->dn.lock); - KMP_ATOMIC_ST_RLX(&node->dn.nrefs, - 1); // init creates the first reference to the node + KMP_ATOMIC_ST_RLX(&node->dn.nrefs, 1); // init creates the first reference #ifdef KMP_SUPPORT_GRAPH_OUTPUT node->dn.id = KMP_ATOMIC_INC(&kmp_node_id_seed); #endif @@ -94,6 +96,9 @@ static kmp_dephash_t *__kmp_dephash_create(kmp_info_t *thread, return h; } +#define ENTRY_LAST_INS 0 +#define ENTRY_LAST_MTXS 1 + static kmp_dephash_entry * __kmp_dephash_find(kmp_info_t *thread, kmp_dephash_t *h, kmp_intptr_t addr) { kmp_int32 bucket = __kmp_dephash_hash(addr, h->size); @@ -115,6 +120,9 @@ __kmp_dephash_find(kmp_info_t *thread, kmp_dephash_t *h, kmp_intptr_t addr) { entry->addr = addr; entry->last_out = NULL; entry->last_ins = NULL; + entry->last_mtxs = NULL; + entry->last_flag = ENTRY_LAST_INS; + entry->mtx_lock = NULL; entry->next_in_bucket = h->buckets[bucket]; h->buckets[bucket] = entry; #ifdef KMP_DEBUG @@ -173,6 +181,58 @@ static inline void __kmp_track_dependence(kmp_depnode_t *source, #endif /* OMPT_SUPPORT && OMPT_OPTIONAL */ } +static inline kmp_int32 +__kmp_depnode_link_successor(kmp_int32 gtid, kmp_info_t *thread, + kmp_task_t *task, kmp_depnode_t *node, + kmp_depnode_list_t *plist) { + if (!plist) +
return 0; + kmp_int32 npredecessors = 0; + // link node as successor of list elements + for (kmp_depnode_list_t *p = plist; p; p = p->next) { + kmp_depnode_t *dep = p->node; + if (dep->dn.task) { + KMP_ACQUIRE_DEPNODE(gtid, dep); + if (dep->dn.task) { + __kmp_track_dependence(dep, node, task); + dep->dn.successors = __kmp_add_node(thread, dep->dn.successors, node); + KA_TRACE(40, ("__kmp_process_deps: T#%d adding dependence from %p to " + "%p\n", + gtid, KMP_TASK_TO_TASKDATA(dep->dn.task), + KMP_TASK_TO_TASKDATA(task))); + npredecessors++; + } + KMP_RELEASE_DEPNODE(gtid, dep); + } + } + return npredecessors; +} + +static inline kmp_int32 __kmp_depnode_link_successor(kmp_int32 gtid, + kmp_info_t *thread, + kmp_task_t *task, + kmp_depnode_t *source, + kmp_depnode_t *sink) { + if (!sink) + return 0; + kmp_int32 npredecessors = 0; + if (sink->dn.task) { + // synchronously add source to sink's list of successors + KMP_ACQUIRE_DEPNODE(gtid, sink); + if (sink->dn.task) { + __kmp_track_dependence(sink, source, task); + sink->dn.successors = __kmp_add_node(thread, sink->dn.successors, source); + KA_TRACE(40, ("__kmp_process_deps: T#%d adding dependence from %p to " + "%p\n", + gtid, KMP_TASK_TO_TASKDATA(sink->dn.task), + KMP_TASK_TO_TASKDATA(task))); + npredecessors++; + } + KMP_RELEASE_DEPNODE(gtid, sink); + } + return npredecessors; +} + template <bool filter> static inline kmp_int32 __kmp_process_deps(kmp_int32 gtid, kmp_depnode_t *node, kmp_dephash_t *hash, @@ -187,72 +247,106 @@ __kmp_process_deps(kmp_int32 gtid, kmp_depnode_t *node, kmp_dephash_t *hash, for (kmp_int32 i = 0; i < ndeps; i++) { const kmp_depend_info_t *dep = &dep_list[i]; - KMP_DEBUG_ASSERT(dep->flags.in); - if (filter && dep->base_addr == 0) continue; // skip filtered entries kmp_dephash_entry_t *info = __kmp_dephash_find(thread, hash, dep->base_addr); kmp_depnode_t *last_out = info->last_out; + kmp_depnode_list_t *last_ins = info->last_ins; + kmp_depnode_list_t *last_mtxs = info->last_mtxs; - if (dep->flags.out && info->last_ins) { - for (kmp_depnode_list_t *p = info->last_ins; p; p = p->next) { - kmp_depnode_t *indep = p->node; - if (indep->dn.task) { - KMP_ACQUIRE_DEPNODE(gtid, indep); - if (indep->dn.task) { - __kmp_track_dependence(indep, node, task); - indep->dn.successors = - __kmp_add_node(thread, indep->dn.successors, node); - KA_TRACE(40, ("__kmp_process_deps<%d>: T#%d adding dependence from " - "%p to %p\n", - filter, gtid, KMP_TASK_TO_TASKDATA(indep->dn.task), - KMP_TASK_TO_TASKDATA(task))); - npredecessors++; + if (dep->flags.out) { // out --> clean lists of ins and mtxs if any + if (last_ins || last_mtxs) { + if (info->last_flag == ENTRY_LAST_INS) { // INS were last + npredecessors += + __kmp_depnode_link_successor(gtid, thread, task, node, last_ins); + } else { // MTXS were last + npredecessors += + __kmp_depnode_link_successor(gtid, thread, task, node, last_mtxs); + } + __kmp_depnode_list_free(thread, last_ins); + __kmp_depnode_list_free(thread, last_mtxs); + info->last_ins = NULL; + info->last_mtxs = NULL; + } else { + npredecessors += + __kmp_depnode_link_successor(gtid, thread, task, node, last_out); + } + __kmp_node_deref(thread, last_out); + if (dep_barrier) { + // if this is a sync point in the serial sequence, then the previous + // outputs are guaranteed to be completed after the execution of this + // task so the previous output nodes can be cleared.
+ info->last_out = NULL; + } else { + info->last_out = __kmp_node_ref(node); + } + } else if (dep->flags.in) { + // in --> link node to either last_out or last_mtxs, clean earlier deps + if (last_mtxs) { + npredecessors += + __kmp_depnode_link_successor(gtid, thread, task, node, last_mtxs); + __kmp_node_deref(thread, last_out); + info->last_out = NULL; + if (info->last_flag == ENTRY_LAST_MTXS && last_ins) { // MTXS were last + // clean old INS before creating new list + __kmp_depnode_list_free(thread, last_ins); + info->last_ins = NULL; + } + } else { + // link node as successor of the last_out if any + npredecessors += + __kmp_depnode_link_successor(gtid, thread, task, node, last_out); + } + info->last_flag = ENTRY_LAST_INS; + info->last_ins = __kmp_add_node(thread, info->last_ins, node); + } else { + KMP_DEBUG_ASSERT(dep->flags.mtx == 1); + // mtx --> link node to either last_out or last_ins, clean earlier deps + if (last_ins) { + npredecessors += + __kmp_depnode_link_successor(gtid, thread, task, node, last_ins); + __kmp_node_deref(thread, last_out); + info->last_out = NULL; + if (info->last_flag == ENTRY_LAST_INS && last_mtxs) { // INS were last + // clean old MTXS before creating new list + __kmp_depnode_list_free(thread, last_mtxs); + info->last_mtxs = NULL; + } + } else { + // link node as successor of the last_out if any + npredecessors += + __kmp_depnode_link_successor(gtid, thread, task, node, last_out); + } + info->last_flag = ENTRY_LAST_MTXS; + info->last_mtxs = __kmp_add_node(thread, info->last_mtxs, node); + if (info->mtx_lock == NULL) { + info->mtx_lock = (kmp_lock_t *)__kmp_allocate(sizeof(kmp_lock_t)); + __kmp_init_lock(info->mtx_lock); + } + KMP_DEBUG_ASSERT(node->dn.mtx_num_locks < MAX_MTX_DEPS); + kmp_int32 m; + // Save lock in node's array + for (m = 0; m < MAX_MTX_DEPS; ++m) { + // sort pointers in decreasing order to avoid potential livelock + if (node->dn.mtx_locks[m] < info->mtx_lock) { + KMP_DEBUG_ASSERT(node->dn.mtx_locks[node->dn.mtx_num_locks] == NULL); + for (int n = node->dn.mtx_num_locks; n > m; --n) { + // shift right all lesser non-NULL pointers + KMP_DEBUG_ASSERT(node->dn.mtx_locks[n - 1] != NULL); + node->dn.mtx_locks[n] = node->dn.mtx_locks[n - 1]; } - KMP_RELEASE_DEPNODE(gtid, indep); + node->dn.mtx_locks[m] = info->mtx_lock; + break; } } - - __kmp_depnode_list_free(thread, info->last_ins); - info->last_ins = NULL; - - } else if (last_out && last_out->dn.task) { - KMP_ACQUIRE_DEPNODE(gtid, last_out); - if (last_out->dn.task) { - __kmp_track_dependence(last_out, node, task); - last_out->dn.successors = - __kmp_add_node(thread, last_out->dn.successors, node); - KA_TRACE( - 40, - ("__kmp_process_deps<%d>: T#%d adding dependence from %p to %p\n", - filter, gtid, KMP_TASK_TO_TASKDATA(last_out->dn.task), - KMP_TASK_TO_TASKDATA(task))); - - npredecessors++; - } - KMP_RELEASE_DEPNODE(gtid, last_out); - } - - if (dep_barrier) { - // if this is a sync point in the serial sequence, then the previous - // outputs are guaranteed to be completed after - // the execution of this task so the previous output nodes can be cleared. 
- __kmp_node_deref(thread, last_out); - info->last_out = NULL; - } else { - if (dep->flags.out) { - __kmp_node_deref(thread, last_out); - info->last_out = __kmp_node_ref(node); - } else - info->last_ins = __kmp_add_node(thread, info->last_ins, node); + KMP_DEBUG_ASSERT(m < MAX_MTX_DEPS); // must break from loop + node->dn.mtx_num_locks++; } } - KA_TRACE(30, ("__kmp_process_deps<%d>: T#%d found %d predecessors\n", filter, gtid, npredecessors)); - return npredecessors; } @@ -266,8 +360,7 @@ static bool __kmp_check_deps(kmp_int32 gtid, kmp_depnode_t *node, kmp_depend_info_t *dep_list, kmp_int32 ndeps_noalias, kmp_depend_info_t *noalias_dep_list) { - int i; - + int i, n_mtxs = 0; #if KMP_DEBUG kmp_taskdata_t *taskdata = KMP_TASK_TO_TASKDATA(task); #endif @@ -279,13 +372,31 @@ static bool __kmp_check_deps(kmp_int32 gtid, kmp_depnode_t *node, // Filter deps in dep_list // TODO: Different algorithm for large dep_list ( > 10 ? ) for (i = 0; i < ndeps; i++) { - if (dep_list[i].base_addr != 0) - for (int j = i + 1; j < ndeps; j++) + if (dep_list[i].base_addr != 0) { + for (int j = i + 1; j < ndeps; j++) { if (dep_list[i].base_addr == dep_list[j].base_addr) { dep_list[i].flags.in |= dep_list[j].flags.in; - dep_list[i].flags.out |= dep_list[j].flags.out; + dep_list[i].flags.out |= + (dep_list[j].flags.out || + (dep_list[i].flags.in && dep_list[j].flags.mtx) || + (dep_list[i].flags.mtx && dep_list[j].flags.in)); + dep_list[i].flags.mtx = + dep_list[i].flags.mtx | dep_list[j].flags.mtx && + !dep_list[i].flags.out; dep_list[j].base_addr = 0; // Mark j element as void } + } + if (dep_list[i].flags.mtx) { + // limit number of mtx deps to MAX_MTX_DEPS per node + if (n_mtxs < MAX_MTX_DEPS && task != NULL) { + ++n_mtxs; + } else { + dep_list[i].flags.in = 1; // downgrade mutexinoutset to inout + dep_list[i].flags.out = 1; + dep_list[i].flags.mtx = 0; + } + } + } } // doesn't need to be atomic as no other thread is going to be accessing this @@ -469,7 +580,7 @@ kmp_int32 __kmpc_omp_task_with_deps(ident_t *loc_ref, kmp_int32 gtid, KA_TRACE(10, ("__kmpc_omp_task_with_deps(exit): T#%d task had no blocking " "dependencies : " - "loc=%p task=%p, transferring to __kmpc_omp_task\n", + "loc=%p task=%p, transferring to __kmp_omp_task\n", gtid, loc_ref, new_taskdata)); kmp_int32 ret = __kmp_omp_task(gtid, new_task, true); diff --git a/openmp/runtime/src/kmp_taskdeps.h b/openmp/runtime/src/kmp_taskdeps.h index 2e79b1cafc91..8496884d1c3a 100644 --- a/openmp/runtime/src/kmp_taskdeps.h +++ b/openmp/runtime/src/kmp_taskdeps.h @@ -62,7 +62,12 @@ static inline void __kmp_dephash_free_entries(kmp_info_t *thread, for (kmp_dephash_entry_t *entry = h->buckets[i]; entry; entry = next) { next = entry->next_in_bucket; __kmp_depnode_list_free(thread, entry->last_ins); + __kmp_depnode_list_free(thread, entry->last_mtxs); __kmp_node_deref(thread, entry->last_out); + if (entry->mtx_lock) { + __kmp_destroy_lock(entry->mtx_lock); + __kmp_free(entry->mtx_lock); + } #if USE_FAST_MEMORY __kmp_fast_free(thread, entry); #else diff --git a/openmp/runtime/src/kmp_tasking.cpp b/openmp/runtime/src/kmp_tasking.cpp index d2b5ddeb2ffb..2d7468659c58 100644 --- a/openmp/runtime/src/kmp_tasking.cpp +++ b/openmp/runtime/src/kmp_tasking.cpp @@ -32,7 +32,7 @@ static void __kmp_alloc_task_deque(kmp_info_t *thread, static int __kmp_realloc_task_threads_data(kmp_info_t *thread, kmp_task_team_t *task_team); -#ifdef OMP_45_ENABLED +#if OMP_45_ENABLED static void __kmp_bottom_half_finish_proxy(kmp_int32 gtid, kmp_task_t *ptask); #endif @@ -251,6 +251,79 @@ static 
void __kmp_pop_task_stack(kmp_int32 gtid, kmp_info_t *thread, } #endif /* BUILD_TIED_TASK_STACK */ +// returns 1 if new task is allowed to execute, 0 otherwise +// checks Task Scheduling constraint (if requested) and +// mutexinoutset dependencies if any +static bool __kmp_task_is_allowed(int gtid, const kmp_int32 is_constrained, + const kmp_taskdata_t *tasknew, + const kmp_taskdata_t *taskcurr) { + if (is_constrained && (tasknew->td_flags.tiedness == TASK_TIED)) { + // Check if the candidate obeys the Task Scheduling Constraints (TSC) + // only descendant of all deferred tied tasks can be scheduled, checking + // the last one is enough, as it in turn is the descendant of all others + kmp_taskdata_t *current = taskcurr->td_last_tied; + KMP_DEBUG_ASSERT(current != NULL); + // check if the task is not suspended on barrier + if (current->td_flags.tasktype == TASK_EXPLICIT || + current->td_taskwait_thread > 0) { // <= 0 on barrier + kmp_int32 level = current->td_level; + kmp_taskdata_t *parent = tasknew->td_parent; + while (parent != current && parent->td_level > level) { + // check generation up to the level of the current task + parent = parent->td_parent; + KMP_DEBUG_ASSERT(parent != NULL); + } + if (parent != current) + return false; + } + } + // Check mutexinoutset dependencies, acquire locks + kmp_depnode_t *node = tasknew->td_depnode; + if (node && (node->dn.mtx_num_locks > 0)) { + for (int i = 0; i < node->dn.mtx_num_locks; ++i) { + KMP_DEBUG_ASSERT(node->dn.mtx_locks[i] != NULL); + if (__kmp_test_lock(node->dn.mtx_locks[i], gtid)) + continue; + // could not get the lock, release previous locks + for (int j = i - 1; j >= 0; --j) + __kmp_release_lock(node->dn.mtx_locks[j], gtid); + return false; + } + // negative num_locks means all locks acquired successfully + node->dn.mtx_num_locks = -node->dn.mtx_num_locks; + } + return true; +} + +// __kmp_realloc_task_deque: +// Re-allocates a task deque for a particular thread, copies the content from +// the old deque and adjusts the necessary data structures relating to the +// deque. 
This operation must be done with the deque_lock being held +static void __kmp_realloc_task_deque(kmp_info_t *thread, + kmp_thread_data_t *thread_data) { + kmp_int32 size = TASK_DEQUE_SIZE(thread_data->td); + kmp_int32 new_size = 2 * size; + + KE_TRACE(10, ("__kmp_realloc_task_deque: T#%d reallocating deque[from %d to " + "%d] for thread_data %p\n", + __kmp_gtid_from_thread(thread), size, new_size, thread_data)); + + kmp_taskdata_t **new_deque = + (kmp_taskdata_t **)__kmp_allocate(new_size * sizeof(kmp_taskdata_t *)); + + int i, j; + for (i = thread_data->td.td_deque_head, j = 0; j < size; + i = (i + 1) & TASK_DEQUE_MASK(thread_data->td), j++) + new_deque[j] = thread_data->td.td_deque[i]; + + __kmp_free(thread_data->td.td_deque); + + thread_data->td.td_deque_head = 0; + thread_data->td.td_deque_tail = size; + thread_data->td.td_deque = new_deque; + thread_data->td.td_deque_size = new_size; +} + // __kmp_push_task: Add a task to the thread's deque static kmp_int32 __kmp_push_task(kmp_int32 gtid, kmp_task_t *task) { kmp_info_t *thread = __kmp_threads[gtid]; @@ -298,33 +371,47 @@ static kmp_int32 __kmp_push_task(kmp_int32 gtid, kmp_task_t *task) { __kmp_alloc_task_deque(thread, thread_data); } + int locked = 0; // Check if deque is full if (TCR_4(thread_data->td.td_deque_ntasks) >= TASK_DEQUE_SIZE(thread_data->td)) { - KA_TRACE(20, ("__kmp_push_task: T#%d deque is full; returning " - "TASK_NOT_PUSHED for task %p\n", - gtid, taskdata)); - return TASK_NOT_PUSHED; + if (__kmp_task_is_allowed(gtid, __kmp_task_stealing_constraint, taskdata, + thread->th.th_current_task)) { + KA_TRACE(20, ("__kmp_push_task: T#%d deque is full; returning " + "TASK_NOT_PUSHED for task %p\n", + gtid, taskdata)); + return TASK_NOT_PUSHED; + } else { + __kmp_acquire_bootstrap_lock(&thread_data->td.td_deque_lock); + locked = 1; + // expand deque to push the task which is not allowed to execute + __kmp_realloc_task_deque(thread, thread_data); + } } - // Lock the deque for the task push operation - __kmp_acquire_bootstrap_lock(&thread_data->td.td_deque_lock); - + if (!locked) { + __kmp_acquire_bootstrap_lock(&thread_data->td.td_deque_lock); #if OMP_45_ENABLED - // Need to recheck as we can get a proxy task from a thread outside of OpenMP - if (TCR_4(thread_data->td.td_deque_ntasks) >= - TASK_DEQUE_SIZE(thread_data->td)) { - __kmp_release_bootstrap_lock(&thread_data->td.td_deque_lock); - KA_TRACE(20, ("__kmp_push_task: T#%d deque is full on 2nd check; returning " - "TASK_NOT_PUSHED for task %p\n", - gtid, taskdata)); - return TASK_NOT_PUSHED; + // Need to recheck as we can get a proxy task from thread outside of OpenMP + if (TCR_4(thread_data->td.td_deque_ntasks) >= + TASK_DEQUE_SIZE(thread_data->td)) { + if (__kmp_task_is_allowed(gtid, __kmp_task_stealing_constraint, taskdata, + thread->th.th_current_task)) { + __kmp_release_bootstrap_lock(&thread_data->td.td_deque_lock); + KA_TRACE(20, ("__kmp_push_task: T#%d deque is full on 2nd check; " + "returning TASK_NOT_PUSHED for task %p\n", + gtid, taskdata)); + return TASK_NOT_PUSHED; + } else { + // expand deque to push the task which is not allowed to execute + __kmp_realloc_task_deque(thread, thread_data); + } + } +#endif } -#else // Must have room since no thread can add tasks but calling thread KMP_DEBUG_ASSERT(TCR_4(thread_data->td.td_deque_ntasks) < TASK_DEQUE_SIZE(thread_data->td)); -#endif thread_data->td.td_deque[thread_data->td.td_deque_tail] = taskdata; // Push taskdata @@ -678,11 +765,31 @@ static void __kmp_free_task_and_ancestors(kmp_int32 gtid, taskdata = 
parent_taskdata; + if (team_serial) + return; // Stop checking ancestors at implicit task instead of walking up ancestor // tree to avoid premature deallocation of ancestors. - if (team_serial || taskdata->td_flags.tasktype == TASK_IMPLICIT) + if (taskdata->td_flags.tasktype == TASK_IMPLICIT) { + if (taskdata->td_dephash) { // do we need to cleanup dephash? + int children = KMP_ATOMIC_LD_ACQ(&taskdata->td_incomplete_child_tasks); + kmp_tasking_flags_t flags_old = taskdata->td_flags; + if (children == 0 && flags_old.complete == 1) { + kmp_tasking_flags_t flags_new = flags_old; + flags_new.complete = 0; + if (KMP_COMPARE_AND_STORE_ACQ32( + RCAST(kmp_int32 *, &taskdata->td_flags), + *RCAST(kmp_int32 *, &flags_old), + *RCAST(kmp_int32 *, &flags_new))) { + KA_TRACE(100, ("__kmp_free_task_and_ancestors: T#%d cleans " + "dephash of implicit task %p\n", + gtid, taskdata)); + // cleanup dephash of finished implicit task + __kmp_dephash_free_entries(thread, taskdata->td_dephash); + } + } + } return; - + } // Predecrement simulated by "- 1" calculation children = KMP_ATOMIC_DEC(&taskdata->td_allocated_child_tasks) - 1; KMP_DEBUG_ASSERT(children >= 0); @@ -750,6 +857,17 @@ static void __kmp_task_finish(kmp_int32 gtid, kmp_task_t *task, __ompt_task_finish(task, resumed_task); #endif + // Check mutexinoutset dependencies, release locks + kmp_depnode_t *node = taskdata->td_depnode; + if (node && (node->dn.mtx_num_locks < 0)) { + // negative num_locks means all locks were acquired + node->dn.mtx_num_locks = -node->dn.mtx_num_locks; + for (int i = node->dn.mtx_num_locks - 1; i >= 0; --i) { + KMP_DEBUG_ASSERT(node->dn.mtx_locks[i] != NULL); + __kmp_release_lock(node->dn.mtx_locks[i], gtid); + } + } + KMP_DEBUG_ASSERT(taskdata->td_flags.complete == 0); taskdata->td_flags.complete = 1; // mark the task as completed KMP_DEBUG_ASSERT(taskdata->td_flags.started == 1); @@ -976,8 +1094,24 @@ void __kmp_init_implicit_task(ident_t *loc_ref, kmp_info_t *this_thr, // thread: thread data structure corresponding to implicit task void __kmp_finish_implicit_task(kmp_info_t *thread) { kmp_taskdata_t *task = thread->th.th_current_task; - if (task->td_dephash) - __kmp_dephash_free_entries(thread, task->td_dephash); + if (task->td_dephash) { + int children; + task->td_flags.complete = 1; + children = KMP_ATOMIC_LD_ACQ(&task->td_incomplete_child_tasks); + kmp_tasking_flags_t flags_old = task->td_flags; + if (children == 0 && flags_old.complete == 1) { + kmp_tasking_flags_t flags_new = flags_old; + flags_new.complete = 0; + if (KMP_COMPARE_AND_STORE_ACQ32(RCAST(kmp_int32 *, &task->td_flags), + *RCAST(kmp_int32 *, &flags_old), + *RCAST(kmp_int32 *, &flags_new))) { + KA_TRACE(100, ("__kmp_finish_implicit_task: T#%d cleans " + "dephash of implicit task %p\n", + thread->th.th_info.ds.ds_gtid, task)); + __kmp_dephash_free_entries(thread, task->td_dephash); + } + } + } } // __kmp_free_implicit_task: Release resources associated to implicit tasks @@ -2229,33 +2363,16 @@ static kmp_task_t *__kmp_remove_my_task(kmp_info_t *thread, kmp_int32 gtid, TASK_DEQUE_MASK(thread_data->td); // Wrap index. 
taskdata = thread_data->td.td_deque[tail]; - if (is_constrained && (taskdata->td_flags.tiedness == TASK_TIED)) { - // we need to check if the candidate obeys task scheduling constraint (TSC) - // only descendant of all deferred tied tasks can be scheduled, checking - // the last one is enough, as it in turn is the descendant of all others - kmp_taskdata_t *current = thread->th.th_current_task->td_last_tied; - KMP_DEBUG_ASSERT(current != NULL); - // check if last tied task is not suspended on barrier - if (current->td_flags.tasktype == TASK_EXPLICIT || - current->td_taskwait_thread > 0) { // <= 0 on barrier - kmp_int32 level = current->td_level; - kmp_taskdata_t *parent = taskdata->td_parent; - while (parent != current && parent->td_level > level) { - parent = parent->td_parent; // check generation up to the level of the - // current task - KMP_DEBUG_ASSERT(parent != NULL); - } - if (parent != current) { - // The TSC does not allow to steal victim task - __kmp_release_bootstrap_lock(&thread_data->td.td_deque_lock); - KA_TRACE(10, ("__kmp_remove_my_task(exit #2): T#%d No tasks to remove: " - "ntasks=%d head=%u tail=%u\n", - gtid, thread_data->td.td_deque_ntasks, - thread_data->td.td_deque_head, - thread_data->td.td_deque_tail)); - return NULL; - } - } + if (!__kmp_task_is_allowed(gtid, is_constrained, taskdata, + thread->th.th_current_task)) { + // The TSC does not allow to steal victim task + __kmp_release_bootstrap_lock(&thread_data->td.td_deque_lock); + KA_TRACE(10, + ("__kmp_remove_my_task(exit #3): T#%d TSC blocks tail task: " + "ntasks=%d head=%u tail=%u\n", + gtid, thread_data->td.td_deque_ntasks, + thread_data->td.td_deque_head, thread_data->td.td_deque_tail)); + return NULL; } thread_data->td.td_deque_tail = tail; @@ -2263,7 +2380,7 @@ static kmp_task_t *__kmp_remove_my_task(kmp_info_t *thread, kmp_int32 gtid, __kmp_release_bootstrap_lock(&thread_data->td.td_deque_lock); - KA_TRACE(10, ("__kmp_remove_my_task(exit #2): T#%d task %p removed: " + KA_TRACE(10, ("__kmp_remove_my_task(exit #4): T#%d task %p removed: " "ntasks=%d head=%u tail=%u\n", gtid, taskdata, thread_data->td.td_deque_ntasks, thread_data->td.td_deque_head, thread_data->td.td_deque_tail)); @@ -2284,7 +2401,7 @@ static kmp_task_t *__kmp_steal_task(kmp_info_t *victim_thr, kmp_int32 gtid, kmp_taskdata_t *taskdata; kmp_taskdata_t *current; kmp_thread_data_t *victim_td, *threads_data; - kmp_int32 level, target; + kmp_int32 target; kmp_int32 victim_tid; KMP_DEBUG_ASSERT(__kmp_tasking_mode != tskm_immediate_exec); @@ -2324,69 +2441,33 @@ static kmp_task_t *__kmp_steal_task(kmp_info_t *victim_thr, kmp_int32 gtid, } KMP_DEBUG_ASSERT(victim_td->td.td_deque != NULL); - + current = __kmp_threads[gtid]->th.th_current_task; taskdata = victim_td->td.td_deque[victim_td->td.td_deque_head]; - if (is_constrained && (taskdata->td_flags.tiedness == TASK_TIED)) { - // we need to check if the candidate obeys task scheduling constraint (TSC) - // only descendant of all deferred tied tasks can be scheduled, checking - // the last one is enough, as it in turn is the descendant of all others - current = __kmp_threads[gtid]->th.th_current_task->td_last_tied; - KMP_DEBUG_ASSERT(current != NULL); - // check if last tied task is not suspended on barrier - if (current->td_flags.tasktype == TASK_EXPLICIT || - current->td_taskwait_thread > 0) { // <= 0 on barrier - level = current->td_level; - kmp_taskdata_t *parent = taskdata->td_parent; - while (parent != current && parent->td_level > level) { - parent = parent->td_parent; // check generation up to 
the level of the - // current task - KMP_DEBUG_ASSERT(parent != NULL); - } - if (parent != current) { - if (!task_team->tt.tt_untied_task_encountered) { - // The TSC does not allow to steal victim task - __kmp_release_bootstrap_lock(&victim_td->td.td_deque_lock); - KA_TRACE(10, - ("__kmp_steal_task(exit #3): T#%d could not steal from " - "T#%d: task_team=%p ntasks=%d head=%u tail=%u\n", - gtid, __kmp_gtid_from_thread(victim_thr), task_team, ntasks, - victim_td->td.td_deque_head, victim_td->td.td_deque_tail)); - return NULL; - } - taskdata = NULL; // will check other tasks in victim's deque - } - } - } - if (taskdata != NULL) { + if (__kmp_task_is_allowed(gtid, is_constrained, taskdata, current)) { // Bump head pointer and Wrap. victim_td->td.td_deque_head = (victim_td->td.td_deque_head + 1) & TASK_DEQUE_MASK(victim_td->td); } else { + if (!task_team->tt.tt_untied_task_encountered) { + // The TSC does not allow to steal victim task + __kmp_release_bootstrap_lock(&victim_td->td.td_deque_lock); + KA_TRACE(10, ("__kmp_steal_task(exit #3): T#%d could not steal from " + "T#%d: task_team=%p ntasks=%d head=%u tail=%u\n", + gtid, __kmp_gtid_from_thread(victim_thr), task_team, ntasks, + victim_td->td.td_deque_head, victim_td->td.td_deque_tail)); + return NULL; + } int i; // walk through victim's deque trying to steal any task target = victim_td->td.td_deque_head; + taskdata = NULL; for (i = 1; i < ntasks; ++i) { target = (target + 1) & TASK_DEQUE_MASK(victim_td->td); taskdata = victim_td->td.td_deque[target]; - if (taskdata->td_flags.tiedness == TASK_TIED) { - // check if the candidate obeys the TSC - kmp_taskdata_t *parent = taskdata->td_parent; - // check generation up to the level of the current task - while (parent != current && parent->td_level > level) { - parent = parent->td_parent; - KMP_DEBUG_ASSERT(parent != NULL); - } - if (parent != current) { - // The TSC does not allow to steal the candidate - taskdata = NULL; - continue; - } else { - // found victim tied task - break; - } + if (__kmp_task_is_allowed(gtid, is_constrained, taskdata, current)) { + break; // found victim task } else { - // found victim untied task - break; + taskdata = NULL; } } if (taskdata == NULL) { @@ -2834,35 +2915,6 @@ static void __kmp_alloc_task_deque(kmp_info_t *thread, thread_data->td.td_deque_size = INITIAL_TASK_DEQUE_SIZE; } -// __kmp_realloc_task_deque: -// Re-allocates a task deque for a particular thread, copies the content from -// the old deque and adjusts the necessary data structures relating to the -// deque. This operation must be done with a the deque_lock being held -static void __kmp_realloc_task_deque(kmp_info_t *thread, - kmp_thread_data_t *thread_data) { - kmp_int32 size = TASK_DEQUE_SIZE(thread_data->td); - kmp_int32 new_size = 2 * size; - - KE_TRACE(10, ("__kmp_realloc_task_deque: T#%d reallocating deque[from %d to " - "%d] for thread_data %p\n", - __kmp_gtid_from_thread(thread), size, new_size, thread_data)); - - kmp_taskdata_t **new_deque = - (kmp_taskdata_t **)__kmp_allocate(new_size * sizeof(kmp_taskdata_t *)); - - int i, j; - for (i = thread_data->td.td_deque_head, j = 0; j < size; - i = (i + 1) & TASK_DEQUE_MASK(thread_data->td), j++) - new_deque[j] = thread_data->td.td_deque[i]; - - __kmp_free(thread_data->td.td_deque); - - thread_data->td.td_deque_head = 0; - thread_data->td.td_deque_tail = size; - thread_data->td.td_deque = new_deque; - thread_data->td.td_deque_size = new_size; -} - // __kmp_free_task_deque: // Deallocates a task deque for a particular thread. 
Happens at library // deallocation so don't need to reset all thread data fields. @@ -3422,7 +3474,7 @@ release_and_exit: /* The finish of the proxy tasks is divided in two pieces: - the top half is the one that can be done from a thread outside the team - - the bottom half must be run from a them within the team + - the bottom half must be run from a thread within the team In order to run the bottom half the task gets queued back into one of the threads of the team. Once the td_incomplete_child_task counter of the parent diff --git a/openmp/runtime/test/tasking/omp50_task_depend_mtx.c b/openmp/runtime/test/tasking/omp50_task_depend_mtx.c new file mode 100644 index 000000000000..79c270e94a7d --- /dev/null +++ b/openmp/runtime/test/tasking/omp50_task_depend_mtx.c @@ -0,0 +1,152 @@ +// RUN: %libomp-compile-and-run + +// Tests OMP 5.0 task dependences "mutexinoutset", emulates compiler codegen +// Mutually exclusive tasks get same input dependency info array +// +// Task tree created: +// task0 task1 +// \ / \ +// task2 task5 +// / \ +// task3 task4 +// / \ +// task6 <-->task7 (these two are mutually exclusive) +// \ / +// task8 +// +#include <stdio.h> +#include <omp.h> + +#ifdef _WIN32 +#include <windows.h> +#define mysleep(n) Sleep(n) +#else +#include <unistd.h> +#define mysleep(n) usleep((n)*1000) +#endif + +static int checker = 0; // to check if two tasks run simultaneously +static int err = 0; +#ifndef DELAY +#define DELAY 100 +#endif + +// --------------------------------------------------------------------------- +// internal data to emulate compiler codegen +typedef int(*entry_t)(int, int**); +typedef struct DEP { + size_t addr; + size_t len; + int flags; +} dep; +typedef struct ID { + int reserved_1; + int flags; + int reserved_2; + int reserved_3; + char *psource; +} id; + +int thunk(int gtid, int** pshareds) { + int t = **pshareds; + int th = omp_get_thread_num(); + #pragma omp atomic + ++checker; + printf("task __%d, th %d\n", t, th); + if (checker != 1) { + err++; + printf("Error1, checker %d != 1\n", checker); + } + mysleep(DELAY); + if (checker != 1) { + err++; + printf("Error2, checker %d != 1\n", checker); + } + #pragma omp atomic + --checker; + return 0; +} + +#ifdef __cplusplus +extern "C" { +#endif +int __kmpc_global_thread_num(id*); +extern int** __kmpc_omp_task_alloc(id *loc, int gtid, int flags, + size_t sz, size_t shar, entry_t rtn); +int +__kmpc_omp_task_with_deps(id *loc, int gtid, int **task, int nd, dep *dep_lst, + int nd_noalias, dep *noalias_dep_lst); +static id loc = {0, 2, 0, 0, ";file;func;0;0;;"}; +#ifdef __cplusplus +} // extern "C" +#endif +// End of internal data +// --------------------------------------------------------------------------- + +int main() +{ + int i1,i2,i3,i4; + omp_set_num_threads(2); + #pragma omp parallel + { + #pragma omp single nowait + { + dep sdep[2]; + int **ptr; + int gtid = __kmpc_global_thread_num(&loc); + int t = omp_get_thread_num(); + #pragma omp task depend(in: i1, i2) + { int th = omp_get_thread_num(); + printf("task 0_%d, th %d\n", t, th); + mysleep(DELAY); } + #pragma omp task depend(in: i1, i3) + { int th = omp_get_thread_num(); + printf("task 1_%d, th %d\n", t, th); + mysleep(DELAY); } + #pragma omp task depend(in: i2) depend(out: i1) + { int th = omp_get_thread_num(); + printf("task 2_%d, th %d\n", t, th); + mysleep(DELAY); } + #pragma omp task depend(in: i1) + { int th = omp_get_thread_num(); + printf("task 3_%d, th %d\n", t, th); + mysleep(DELAY); } + #pragma omp task depend(out: i2) + { int th = omp_get_thread_num(); + printf("task 4_%d, th %d\n", t, th); 
+ mysleep(DELAY+5); } // wait a bit longer than task 3 + #pragma omp task depend(out: i3) + { int th = omp_get_thread_num(); + printf("task 5_%d, th %d\n", t, th); + mysleep(DELAY); } +// compiler codegen start + // task1 + ptr = __kmpc_omp_task_alloc(&loc, gtid, 0, 28, 16, thunk); + sdep[0].addr = (size_t)&i1; + sdep[0].len = 0; // not used + sdep[0].flags = 4; // mx + sdep[1].addr = (size_t)&i4; + sdep[1].len = 0; // not used + sdep[1].flags = 4; // mx + **ptr = t + 10; // init single shared variable + __kmpc_omp_task_with_deps(&loc, gtid, ptr, 2, sdep, 0, 0); + + // task2 + ptr = __kmpc_omp_task_alloc(&loc, gtid, 0, 28, 16, thunk); + **ptr = t + 20; // init single shared variable + __kmpc_omp_task_with_deps(&loc, gtid, ptr, 2, sdep, 0, 0); +// compiler codegen end + #pragma omp task depend(in: i1) + { int th = omp_get_thread_num(); + printf("task 8_%d, th %d\n", t, th); + mysleep(DELAY); } + } // single + } // parallel + if (err == 0) { + printf("passed\n"); + return 0; + } else { + printf("failed\n"); + return 1; + } +} diff --git a/openmp/runtime/test/tasking/omp50_task_depend_mtx2.c b/openmp/runtime/test/tasking/omp50_task_depend_mtx2.c new file mode 100644 index 000000000000..ec8a7d1cab69 --- /dev/null +++ b/openmp/runtime/test/tasking/omp50_task_depend_mtx2.c @@ -0,0 +1,155 @@ +// RUN: %libomp-compile-and-run + +// Tests OMP 5.0 task dependences "mutexinoutset", emulates compiler codegen +// Mutually exclusive tasks get input dependency info array sorted differently +// +// Task tree created: +// task0 task1 +// \ / \ +// task2 task5 +// / \ +// task3 task4 +// / \ +// task6 <-->task7 (these two are mutually exclusive) +// \ / +// task8 +// +#include <stdio.h> +#include <omp.h> + +#ifdef _WIN32 +#include <windows.h> +#define mysleep(n) Sleep(n) +#else +#include <unistd.h> +#define mysleep(n) usleep((n)*1000) +#endif + +static int checker = 0; // to check if two tasks run simultaneously +static int err = 0; +#ifndef DELAY +#define DELAY 100 +#endif + +// --------------------------------------------------------------------------- +// internal data to emulate compiler codegen +typedef int(*entry_t)(int, int**); +typedef struct DEP { + size_t addr; + size_t len; + int flags; +} dep; +typedef struct ID { + int reserved_1; + int flags; + int reserved_2; + int reserved_3; + char *psource; +} id; + +int thunk(int gtid, int** pshareds) { + int t = **pshareds; + int th = omp_get_thread_num(); + #pragma omp atomic + ++checker; + printf("task __%d, th %d\n", t, th); + if (checker != 1) { + err++; + printf("Error1, checker %d != 1\n", checker); + } + mysleep(DELAY); + if (checker != 1) { + err++; + printf("Error2, checker %d != 1\n", checker); + } + #pragma omp atomic + --checker; + return 0; +} + +#ifdef __cplusplus +extern "C" { +#endif +int __kmpc_global_thread_num(id*); +extern int** __kmpc_omp_task_alloc(id *loc, int gtid, int flags, + size_t sz, size_t shar, entry_t rtn); +int +__kmpc_omp_task_with_deps(id *loc, int gtid, int **task, int nd, dep *dep_lst, + int nd_noalias, dep *noalias_dep_lst); +static id loc = {0, 2, 0, 0, ";file;func;0;0;;"}; +#ifdef __cplusplus +} // extern "C" +#endif +// End of internal data +// --------------------------------------------------------------------------- + +int main() +{ + int i1,i2,i3,i4; + omp_set_num_threads(2); + #pragma omp parallel + { + #pragma omp single nowait + { + dep sdep[2]; + int **ptr; + int gtid = __kmpc_global_thread_num(&loc); + int t = omp_get_thread_num(); + #pragma omp task depend(in: i1, i2) + { int th = omp_get_thread_num(); + printf("task 0_%d, th %d\n", t, th); + 
mysleep(DELAY); } + #pragma omp task depend(in: i1, i3) + { int th = omp_get_thread_num(); + printf("task 1_%d, th %d\n", t, th); + mysleep(DELAY); } + #pragma omp task depend(in: i2) depend(out: i1) + { int th = omp_get_thread_num(); + printf("task 2_%d, th %d\n", t, th); + mysleep(DELAY); } + #pragma omp task depend(in: i1) + { int th = omp_get_thread_num(); + printf("task 3_%d, th %d\n", t, th); + mysleep(DELAY); } + #pragma omp task depend(out: i2) + { int th = omp_get_thread_num(); + printf("task 4_%d, th %d\n", t, th); + mysleep(DELAY+5); } // wait a bit longer than task 3 + #pragma omp task depend(out: i3) + { int th = omp_get_thread_num(); + printf("task 5_%d, th %d\n", t, th); + mysleep(DELAY); } +// compiler codegen start + // task1 + ptr = __kmpc_omp_task_alloc(&loc, gtid, 0, 28, 16, thunk); + sdep[0].addr = (size_t)&i1; + sdep[0].len = 0; // not used + sdep[0].flags = 4; // mx + sdep[1].addr = (size_t)&i4; + sdep[1].len = 0; // not used + sdep[1].flags = 4; // mx + **ptr = t + 10; // init single shared variable + __kmpc_omp_task_with_deps(&loc, gtid, ptr, 2, sdep, 0, 0); + + // task2 + ptr = __kmpc_omp_task_alloc(&loc, gtid, 0, 28, 16, thunk); + // reverse pointers - library should sort them uniquely + sdep[0].addr = (size_t)&i4; + sdep[1].addr = (size_t)&i1; + **ptr = t + 20; // init single shared variable + __kmpc_omp_task_with_deps(&loc, gtid, ptr, 2, sdep, 0, 0); +// compiler codegen end + #pragma omp task depend(in: i1) + { int th = omp_get_thread_num(); + printf("task 8_%d, th %d\n", t, th); + mysleep(DELAY); } + } // single + } // parallel + if (err == 0) { + printf("passed\n"); + return 0; + } else { + printf("failed\n"); + return 1; + } +}
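
The manual __kmpc_omp_task_alloc/__kmpc_omp_task_with_deps sequences in the two tests above emulate the code a compiler would generate for the new dependence type, since the tests cannot yet rely on compiler support for it. For reference, here is a minimal source-level sketch of the same pattern, assuming a compiler that lowers the OpenMP 5.0 depend(mutexinoutset: ...) clause to the mtx dependence flag added by this patch (illustration only, not part of the patch):

// Tasks 6 and 7 carry a mutexinoutset dependence on i1 and i4: they may run
// in either order but never concurrently, and the later "in" dependence on i1
// (task 8) waits for both of them to complete.
#include <stdio.h>

int main() {
  int i1 = 0, i4 = 0; // used only as dependence objects
  int x = 0;
  #pragma omp parallel num_threads(2)
  #pragma omp single nowait
  {
    #pragma omp task depend(mutexinoutset: i1, i4) shared(x)
    { // task 6
      #pragma omp atomic
      x++;
    }
    #pragma omp task depend(mutexinoutset: i1, i4) shared(x)
    { // task 7, mutually exclusive with task 6
      #pragma omp atomic
      x++;
    }
    #pragma omp task depend(in: i1) shared(x)
    { printf("x = %d\n", x); } // task 8: runs after both, prints 2
  }
  return 0;
}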