From a0e159f7aa6c521940b170984e146b468840d7c8 Mon Sep 17 00:00:00 2001 From: Jonathan Peyton Date: Thu, 8 Oct 2015 18:23:38 +0000 Subject: [PATCH] OpenMP Wait/release improvements. These changes improve the wait/release mechanism for threads spinning in barriers that are handling tasks while spinnin by providing feedback to the barriers about any task stealing that occurs. Differential Revision: http://reviews.llvm.org/D13353 llvm-svn: 249711 --- openmp/runtime/src/kmp_barrier.cpp | 7 +-- openmp/runtime/src/kmp_os.h | 4 +- openmp/runtime/src/kmp_runtime.c | 11 ++--- openmp/runtime/src/kmp_tasking.c | 9 ++-- openmp/runtime/src/kmp_wait_release.h | 61 +++++++++++++++------------ 5 files changed, 51 insertions(+), 41 deletions(-) diff --git a/openmp/runtime/src/kmp_barrier.cpp b/openmp/runtime/src/kmp_barrier.cpp index 237f2984f2cc..997c21f39192 100644 --- a/openmp/runtime/src/kmp_barrier.cpp +++ b/openmp/runtime/src/kmp_barrier.cpp @@ -17,6 +17,8 @@ #include "kmp_wait_release.h" #include "kmp_stats.h" #include "kmp_itt.h" +#include "kmp_os.h" + #if KMP_MIC #include @@ -886,8 +888,7 @@ __kmp_hierarchical_barrier_release(enum barrier_type bt, kmp_info_t *this_thr, i kmp_flag_oncore flag(&thr_bar->parent_bar->b_go, KMP_BARRIER_STATE_BUMP, thr_bar->offset, bt, this_thr USE_ITT_BUILD_ARG(itt_sync_obj) ); - flag.wait(this_thr, TRUE - USE_ITT_BUILD_ARG(itt_sync_obj) ); + flag.wait(this_thr, TRUE); if (thr_bar->wait_flag == KMP_BARRIER_SWITCHING) { // Thread was switched to own b_go TCW_8(thr_bar->b_go, KMP_INIT_BARRIER_STATE); // Reset my b_go flag for next time } @@ -909,6 +910,7 @@ __kmp_hierarchical_barrier_release(enum barrier_type bt, kmp_info_t *this_thr, i KMP_MB(); // Flush all pending memory write invalidates. } + nproc = this_thr->th.th_team_nproc; int level = team->t.t_level; #if OMP_40_ENABLED if (team->t.t_threads[0]->th.th_teams_microtask ) { // are we inside the teams construct? @@ -920,7 +922,6 @@ __kmp_hierarchical_barrier_release(enum barrier_type bt, kmp_info_t *this_thr, i #endif if (level == 1) thr_bar->use_oncore_barrier = 1; else thr_bar->use_oncore_barrier = 0; // Do not use oncore barrier when nested - nproc = this_thr->th.th_team_nproc; // If the team size has increased, we still communicate with old leaves via oncore barrier. unsigned short int old_leaf_kids = thr_bar->leaf_kids; diff --git a/openmp/runtime/src/kmp_os.h b/openmp/runtime/src/kmp_os.h index 0ad099623b9b..54416bc3c5be 100644 --- a/openmp/runtime/src/kmp_os.h +++ b/openmp/runtime/src/kmp_os.h @@ -397,7 +397,7 @@ extern kmp_real64 __kmp_xchg_real64( volatile kmp_real64 *p, kmp_real64 v ); //# define KMP_COMPARE_AND_STORE_RET32(p, cv, sv) __kmp_compare_and_store_ret32( (p), (cv), (sv) ) # define KMP_COMPARE_AND_STORE_RET64(p, cv, sv) __kmp_compare_and_store_ret64( (p), (cv), (sv) ) -# define KMP_XCHG_FIXED8(p, v) __kmp_xchg_fixed8( (p), (v) ); +# define KMP_XCHG_FIXED8(p, v) __kmp_xchg_fixed8( (volatile kmp_int8*)(p), (kmp_int8)(v) ); # define KMP_XCHG_FIXED16(p, v) __kmp_xchg_fixed16( (p), (v) ); //# define KMP_XCHG_FIXED32(p, v) __kmp_xchg_fixed32( (p), (v) ); //# define KMP_XCHG_FIXED64(p, v) __kmp_xchg_fixed64( (p), (v) ); @@ -534,7 +534,7 @@ extern kmp_real64 __kmp_xchg_real64( volatile kmp_real64 *p, kmp_real64 v ); # define KMP_COMPARE_AND_STORE_RET32(p, cv, sv) __kmp_compare_and_store_ret32( (p), (cv), (sv) ) # define KMP_COMPARE_AND_STORE_RET64(p, cv, sv) __kmp_compare_and_store_ret64( (p), (cv), (sv) ) -# define KMP_XCHG_FIXED8(p, v) __kmp_xchg_fixed8( (p), (v) ); +# define KMP_XCHG_FIXED8(p, v) __kmp_xchg_fixed8( (volatile kmp_int8*)(p), (kmp_int8)(v) ); # define KMP_XCHG_FIXED16(p, v) __kmp_xchg_fixed16( (p), (v) ); # define KMP_XCHG_FIXED32(p, v) __kmp_xchg_fixed32( (p), (v) ); # define KMP_XCHG_FIXED64(p, v) __kmp_xchg_fixed64( (p), (v) ); diff --git a/openmp/runtime/src/kmp_runtime.c b/openmp/runtime/src/kmp_runtime.c index e5619d17c56f..b1d5e06bfd13 100644 --- a/openmp/runtime/src/kmp_runtime.c +++ b/openmp/runtime/src/kmp_runtime.c @@ -5080,14 +5080,15 @@ __kmp_allocate_team( kmp_root_t *root, int new_nproc, int max_nproc, __kmp_initialize_team( team, new_nproc, new_icvs, root->r.r_uber_thread->th.th_ident ); if ( __kmp_tasking_mode != tskm_immediate_exec ) { + // Signal the worker threads to stop looking for tasks while spin waiting. + // The task teams are reference counted and will be deallocated by the last worker thread. int tt_idx; for (tt_idx=0; tt_idx<2; ++tt_idx) { + // We don't know which of the two task teams workers are waiting on, so deactivate both. kmp_task_team_t *task_team = team->t.t_task_team[tt_idx]; - if ( task_team != NULL ) { - KMP_DEBUG_ASSERT( ! TCR_4(task_team->tt.tt_found_tasks) ); - task_team->tt.tt_nproc = new_nproc; - task_team->tt.tt_unfinished_threads = new_nproc; - task_team->tt.tt_ref_ct = new_nproc - 1; + if ( (task_team != NULL) && TCR_SYNC_4(task_team->tt.tt_active) ) { + TCW_SYNC_4( task_team->tt.tt_active, FALSE ); + team->t.t_task_team[tt_idx] = NULL; } } } diff --git a/openmp/runtime/src/kmp_tasking.c b/openmp/runtime/src/kmp_tasking.c index abbfe3e2e765..20ac242e3a73 100644 --- a/openmp/runtime/src/kmp_tasking.c +++ b/openmp/runtime/src/kmp_tasking.c @@ -38,6 +38,7 @@ static void __kmp_bottom_half_finish_proxy( kmp_int32 gtid, kmp_task_t * ptask ) #endif static inline void __kmp_null_resume_wrapper(int gtid, volatile void *flag) { + if (!flag) return; switch (((kmp_flag_64 *)flag)->get_type()) { case flag32: __kmp_resume_32(gtid, NULL); break; case flag64: __kmp_resume_64(gtid, NULL); break; @@ -2538,11 +2539,9 @@ __kmp_task_team_setup( kmp_info_t *this_thr, kmp_team_t *team, int both, int alw __kmp_gtid_from_thread(this_thr), team->t.t_task_team[this_thr->th.th_task_state], ((team != NULL) ? team->t.t_id : -1)) ); } - //else - // All threads have reported in, and no tasks were spawned - // for this release->gather region. Leave the old task - // team struct in place for the upcoming region. No task - // teams are formed for serialized teams. + // else: Either all threads have reported in, and no tasks were spawned for this release->gather region + // Leave the old task team struct in place for the upcoming region. + // No task teams are formed for serialized teams. if (both) { int other_team = 1 - this_thr->th.th_task_state; if ( ( team->t.t_task_team[other_team] == NULL ) && ( team->t.t_nproc > 1 ) ) { // setup other team as well diff --git a/openmp/runtime/src/kmp_wait_release.h b/openmp/runtime/src/kmp_wait_release.h index 261df2762f64..97949efdd5c9 100644 --- a/openmp/runtime/src/kmp_wait_release.h +++ b/openmp/runtime/src/kmp_wait_release.h @@ -55,6 +55,10 @@ class kmp_flag { * @result the pointer to the actual flag */ volatile P * get() { return loc; } + /*! + * @param new_loc in set loc to point at new_loc + */ + void set(volatile P *new_loc) { loc = new_loc; } /*! * @result the flag_type */ @@ -67,10 +71,15 @@ class kmp_flag { bool done_check_val(P old_loc); bool notdone_check(); P internal_release(); + void suspend(int th_gtid); + void resume(int th_gtid); P set_sleeping(); P unset_sleeping(); bool is_sleeping(); + bool is_any_sleeping(); bool is_sleeping_val(P old_loc); + int execute_tasks(kmp_info_t *this_thr, kmp_int32 gtid, int final_spin, int *thread_finished + USE_ITT_BUILD_ARG(void * itt_sync_obj), kmp_int32 is_constrained); */ }; @@ -281,26 +290,24 @@ static inline void __kmp_release_template(C *flag) KF_TRACE(20, ("__kmp_release: T#%d releasing T#%d spin(%p)\n", gtid, target_gtid, flag->get())); KMP_DEBUG_ASSERT(flag->get()); KMP_FSYNC_RELEASING(flag->get()); - - typename C::flag_t old_spin = flag->internal_release(); - - KF_TRACE(100, ("__kmp_release: T#%d old spin(%p)=%d, set new spin=%d\n", - gtid, flag->get(), old_spin, *(flag->get()))); - + + flag->internal_release(); + + KF_TRACE(100, ("__kmp_release: T#%d set new spin=%d\n", gtid, flag->get(), *(flag->get()))); + if (__kmp_dflt_blocktime != KMP_MAX_BLOCKTIME) { // Only need to check sleep stuff if infinite block time not set - if (flag->is_sleeping_val(old_spin)) { + if (flag->is_any_sleeping()) { // Are *any* of the threads that wait on this flag sleeping? for (unsigned int i=0; iget_num_waiters(); ++i) { - kmp_info_t * waiter = flag->get_waiter(i); - int wait_gtid = waiter->th.th_info.ds.ds_gtid; - // Wake up thread if needed - KF_TRACE(50, ("__kmp_release: T#%d waking up thread T#%d since sleep spin(%p) set\n", - gtid, wait_gtid, flag->get())); - flag->resume(wait_gtid); + kmp_info_t * waiter = flag->get_waiter(i); // if a sleeping waiter exists at i, sets current_waiter to i inside the flag + if (waiter) { + int wait_gtid = waiter->th.th_info.ds.ds_gtid; + // Wake up thread if needed + KF_TRACE(50, ("__kmp_release: T#%d waking up thread T#%d since sleep flag(%p) set\n", + gtid, wait_gtid, flag->get())); + flag->resume(wait_gtid); // unsets flag's current_waiter when done + } } - } else { - KF_TRACE(50, ("__kmp_release: T#%d don't wake up thread T#%d since sleep spin(%p) not set\n", - gtid, target_gtid, flag->get())); } } } @@ -382,8 +389,8 @@ public: * @result Actual flag value before release was applied. * Trigger all waiting threads to run by modifying flag to release state. */ - FlagType internal_release() { - return traits_type::test_then_add4((volatile FlagType *)this->get()); + void internal_release() { + (void) traits_type::test_then_add4((volatile FlagType *)this->get()); } /*! * @result Actual flag value before sleep bit(s) set. @@ -408,6 +415,9 @@ public: * Test whether there are threads sleeping on the flag. */ bool is_sleeping() { return is_sleeping_val(*(this->get())); } + bool is_any_sleeping() { return is_sleeping_val(*(this->get())); } + kmp_uint8 *get_stolen() { return NULL; } + enum barrier_type get_bt() { return bs_last_barrier; } }; class kmp_flag_32 : public kmp_basic_flag { @@ -508,18 +518,15 @@ public: } return false; } - kmp_uint64 internal_release() { - kmp_uint64 old_val; + void internal_release() { if (__kmp_dflt_blocktime == KMP_MAX_BLOCKTIME) { - old_val = *get(); byteref(get(),offset) = 1; } else { kmp_uint64 mask=0; byteref(&mask,offset) = 1; - old_val = KMP_TEST_THEN_OR64((volatile kmp_int64 *)get(), mask); + (void) KMP_TEST_THEN_OR64((volatile kmp_int64 *)get(), mask); } - return old_val; } kmp_uint64 set_sleeping() { return KMP_TEST_THEN_OR64((kmp_int64 volatile *)get(), KMP_BARRIER_SLEEP_STATE); @@ -529,9 +536,9 @@ public: } bool is_sleeping_val(kmp_uint64 old_loc) { return old_loc & KMP_BARRIER_SLEEP_STATE; } bool is_sleeping() { return is_sleeping_val(*get()); } - void wait(kmp_info_t *this_thr, int final_spin - USE_ITT_BUILD_ARG(void * itt_sync_obj)) { - __kmp_wait_template(this_thr, this, final_spin + bool is_any_sleeping() { return is_sleeping_val(*get()); } + void wait(kmp_info_t *this_thr, int final_spin) { + __kmp_wait_template(this_thr, this, final_spin USE_ITT_BUILD_ARG(itt_sync_obj)); } void release() { __kmp_release_template(this); } @@ -542,6 +549,8 @@ public: return __kmp_execute_tasks_oncore(this_thr, gtid, this, final_spin, thread_finished USE_ITT_BUILD_ARG(itt_sync_obj), is_constrained); } + kmp_uint8 *get_stolen() { return NULL; } + enum barrier_type get_bt() { return bt; } }; /*!