completely refactor how we manage blocking and unblocking threads

This commit is contained in:
Ralf Jung 2024-05-26 15:18:59 +02:00
parent 5fa30f7eaa
commit a131243557
18 changed files with 832 additions and 895 deletions

View File

@ -169,7 +169,7 @@ trait EvalContextExtPriv<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
size,
align,
memory_kind,
ecx.get_active_thread(),
ecx.active_thread(),
) {
if let Some(clock) = clock {
ecx.acquire_clock(&clock);
@ -367,7 +367,7 @@ impl<'mir, 'tcx> MiriMachine<'mir, 'tcx> {
// `alloc_id_from_addr` any more.
global_state.exposed.remove(&dead_id);
// Also remember this address for future reuse.
let thread = self.threads.get_active_thread_id();
let thread = self.threads.active_thread();
global_state.reuse.add_addr(rng, addr, size, align, kind, thread, || {
if let Some(data_race) = &self.data_race {
data_race.release_clock(&self.threads).clone()

View File

@ -839,7 +839,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: MiriInterpCxExt<'mir, 'tcx> {
fn acquire_clock(&self, clock: &VClock) {
let this = self.eval_context_ref();
if let Some(data_race) = &this.machine.data_race {
data_race.acquire_clock(clock, this.get_active_thread());
data_race.acquire_clock(clock, &this.machine.threads);
}
}
}
@ -1662,13 +1662,14 @@ impl GlobalState {
/// This should be called strictly before any calls to
/// `thread_joined`.
#[inline]
pub fn thread_terminated(&mut self, thread_mgr: &ThreadManager<'_, '_>, current_span: Span) {
pub fn thread_terminated(&mut self, thread_mgr: &ThreadManager<'_, '_>) {
let current_index = self.active_thread_index(thread_mgr);
// Increment the clock to a unique termination timestamp.
let vector_clocks = self.vector_clocks.get_mut();
let current_clocks = &mut vector_clocks[current_index];
current_clocks.increment_clock(current_index, current_span);
current_clocks
.increment_clock(current_index, thread_mgr.active_thread_ref().current_span());
// Load the current thread id for the executing vector.
let vector_info = self.vector_info.get_mut();
@ -1722,11 +1723,12 @@ impl GlobalState {
format!("thread `{thread_name}`")
}
/// Acquire the given clock into the given thread, establishing synchronization with
/// Acquire the given clock into the current thread, establishing synchronization with
/// the moment when that clock snapshot was taken via `release_clock`.
/// As this is an acquire operation, the thread timestamp is not
/// incremented.
pub fn acquire_clock(&self, clock: &VClock, thread: ThreadId) {
pub fn acquire_clock<'mir, 'tcx>(&self, clock: &VClock, threads: &ThreadManager<'mir, 'tcx>) {
let thread = threads.active_thread();
let (_, mut clocks) = self.thread_state_mut(thread);
clocks.clock.join(clock);
}
@ -1738,7 +1740,7 @@ impl GlobalState {
&self,
threads: &ThreadManager<'mir, 'tcx>,
) -> Ref<'_, VClock> {
let thread = threads.get_active_thread_id();
let thread = threads.active_thread();
let span = threads.active_thread_ref().current_span();
// We increment the clock each time this happens, to ensure no two releases
// can be confused with each other.
@ -1782,7 +1784,7 @@ impl GlobalState {
&self,
thread_mgr: &ThreadManager<'_, '_>,
) -> (VectorIdx, Ref<'_, ThreadClockSet>) {
self.thread_state(thread_mgr.get_active_thread_id())
self.thread_state(thread_mgr.active_thread())
}
/// Load the current vector clock in use and the current set of thread clocks
@ -1792,14 +1794,14 @@ impl GlobalState {
&self,
thread_mgr: &ThreadManager<'_, '_>,
) -> (VectorIdx, RefMut<'_, ThreadClockSet>) {
self.thread_state_mut(thread_mgr.get_active_thread_id())
self.thread_state_mut(thread_mgr.active_thread())
}
/// Return the current thread, should be the same
/// as the data-race active thread.
#[inline]
fn active_thread_index(&self, thread_mgr: &ThreadManager<'_, '_>) -> VectorIdx {
let active_thread_id = thread_mgr.get_active_thread_id();
let active_thread_id = thread_mgr.active_thread();
self.thread_index(active_thread_id)
}

View File

@ -4,29 +4,11 @@ use rustc_index::Idx;
use rustc_middle::ty::layout::TyAndLayout;
use super::sync::EvalContextExtPriv as _;
use super::thread::MachineCallback;
use super::vector_clock::VClock;
use crate::*;
declare_id!(InitOnceId);
/// A thread waiting on an InitOnce object.
struct InitOnceWaiter<'mir, 'tcx> {
/// The thread that is waiting.
thread: ThreadId,
/// The callback that should be executed, after the thread has been woken up.
callback: Box<dyn MachineCallback<'mir, 'tcx> + 'tcx>,
}
impl<'mir, 'tcx> std::fmt::Debug for InitOnceWaiter<'mir, 'tcx> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("InitOnce")
.field("thread", &self.thread)
.field("callback", &"dyn MachineCallback")
.finish()
}
}
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq)]
/// The current status of a one time initialization.
pub enum InitOnceStatus {
@ -38,68 +20,14 @@ pub enum InitOnceStatus {
/// The one time initialization state.
#[derive(Default, Debug)]
pub(super) struct InitOnce<'mir, 'tcx> {
pub(super) struct InitOnce {
status: InitOnceStatus,
waiters: VecDeque<InitOnceWaiter<'mir, 'tcx>>,
waiters: VecDeque<ThreadId>,
clock: VClock,
}
impl<'mir, 'tcx> VisitProvenance for InitOnce<'mir, 'tcx> {
fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
for waiter in self.waiters.iter() {
waiter.callback.visit_provenance(visit);
}
}
}
impl<'mir, 'tcx: 'mir> EvalContextExtPriv<'mir, 'tcx> for crate::MiriInterpCx<'mir, 'tcx> {}
trait EvalContextExtPriv<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
/// Synchronize with the previous initialization attempt of an InitOnce.
#[inline]
fn init_once_observe_attempt(&mut self, id: InitOnceId) {
let this = self.eval_context_mut();
let current_thread = this.get_active_thread();
if let Some(data_race) = &this.machine.data_race {
data_race.acquire_clock(&this.machine.sync.init_onces[id].clock, current_thread);
}
}
#[inline]
fn init_once_wake_waiter(
&mut self,
id: InitOnceId,
waiter: InitOnceWaiter<'mir, 'tcx>,
) -> InterpResult<'tcx> {
let this = self.eval_context_mut();
let current_thread = this.get_active_thread();
this.unblock_thread(waiter.thread, BlockReason::InitOnce(id));
// Call callback, with the woken-up thread as `current`.
this.set_active_thread(waiter.thread);
this.init_once_observe_attempt(id);
waiter.callback.call(this)?;
this.set_active_thread(current_thread);
Ok(())
}
}
impl<'mir, 'tcx: 'mir> EvalContextExt<'mir, 'tcx> for crate::MiriInterpCx<'mir, 'tcx> {}
pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
fn init_once_get_or_create_id(
&mut self,
lock_op: &OpTy<'tcx, Provenance>,
lock_layout: TyAndLayout<'tcx>,
offset: u64,
) -> InterpResult<'tcx, InitOnceId> {
let this = self.eval_context_mut();
this.init_once_get_or_create(|ecx, next_id| {
ecx.get_or_create_id(next_id, lock_op, lock_layout, offset)
})
}
/// Provides the closure with the next InitOnceId. Creates that InitOnce if the closure returns None,
/// otherwise returns the value from the closure.
#[inline]
@ -120,6 +48,21 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
Ok(new_index)
}
}
}
impl<'mir, 'tcx: 'mir> EvalContextExt<'mir, 'tcx> for crate::MiriInterpCx<'mir, 'tcx> {}
pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
fn init_once_get_or_create_id(
&mut self,
lock_op: &OpTy<'tcx, Provenance>,
lock_layout: TyAndLayout<'tcx>,
offset: u64,
) -> InterpResult<'tcx, InitOnceId> {
let this = self.eval_context_mut();
this.init_once_get_or_create(|ecx, next_id| {
ecx.get_or_create_id(next_id, lock_op, lock_layout, offset)
})
}
#[inline]
fn init_once_status(&mut self, id: InitOnceId) -> InitOnceStatus {
@ -132,14 +75,14 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
fn init_once_enqueue_and_block(
&mut self,
id: InitOnceId,
thread: ThreadId,
callback: Box<dyn MachineCallback<'mir, 'tcx> + 'tcx>,
callback: impl UnblockCallback<'mir, 'tcx> + 'tcx,
) {
let this = self.eval_context_mut();
let thread = this.active_thread();
let init_once = &mut this.machine.sync.init_onces[id];
assert_ne!(init_once.status, InitOnceStatus::Complete, "queueing on complete init once");
init_once.waiters.push_back(InitOnceWaiter { thread, callback });
this.block_thread(thread, BlockReason::InitOnce(id));
init_once.waiters.push_back(thread);
this.block_thread(BlockReason::InitOnce(id), None, callback);
}
/// Begin initializing this InitOnce. Must only be called after checking that it is currently
@ -177,7 +120,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
// Wake up everyone.
// need to take the queue to avoid having `this` be borrowed multiple times
for waiter in std::mem::take(&mut init_once.waiters) {
this.init_once_wake_waiter(id, waiter)?;
this.unblock_thread(waiter, BlockReason::InitOnce(id))?;
}
Ok(())
@ -192,6 +135,8 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
InitOnceStatus::Begun,
"failing already completed or uninit init once"
);
// This is again uninitialized.
init_once.status = InitOnceStatus::Uninitialized;
// Each complete happens-before the end of the wait
if let Some(data_race) = &this.machine.data_race {
@ -200,10 +145,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
// Wake up one waiting thread, so they can go ahead and try to init this.
if let Some(waiter) = init_once.waiters.pop_front() {
this.init_once_wake_waiter(id, waiter)?;
} else {
// Nobody there to take this, so go back to 'uninit'
init_once.status = InitOnceStatus::Uninitialized;
this.unblock_thread(waiter, BlockReason::InitOnce(id))?;
}
Ok(())
@ -221,6 +163,6 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
"observing the completion of incomplete init once"
);
this.init_once_observe_attempt(id);
this.acquire_clock(&this.machine.sync.init_onces[id].clock);
}
}

View File

@ -111,19 +111,10 @@ struct RwLock {
declare_id!(CondvarId);
/// A thread waiting on a conditional variable.
#[derive(Debug)]
struct CondvarWaiter {
/// The thread that is waiting on this variable.
thread: ThreadId,
/// The mutex on which the thread is waiting.
lock: MutexId,
}
/// The conditional variable state.
#[derive(Default, Debug)]
struct Condvar {
waiters: VecDeque<CondvarWaiter>,
waiters: VecDeque<ThreadId>,
/// Tracks the happens-before relationship
/// between a cond-var signal and a cond-var
/// wait during a non-spurious signal event.
@ -155,20 +146,12 @@ struct FutexWaiter {
/// The state of all synchronization objects.
#[derive(Default, Debug)]
pub struct SynchronizationObjects<'mir, 'tcx> {
pub struct SynchronizationObjects {
mutexes: IndexVec<MutexId, Mutex>,
rwlocks: IndexVec<RwLockId, RwLock>,
condvars: IndexVec<CondvarId, Condvar>,
futexes: FxHashMap<u64, Futex>,
pub(super) init_onces: IndexVec<InitOnceId, InitOnce<'mir, 'tcx>>,
}
impl<'mir, 'tcx> VisitProvenance for SynchronizationObjects<'mir, 'tcx> {
fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
for init_once in self.init_onces.iter() {
init_once.visit_provenance(visit);
}
}
pub(super) init_onces: IndexVec<InitOnceId, InitOnce>,
}
// Private extension trait for local helper methods
@ -210,45 +193,69 @@ pub(super) trait EvalContextExtPriv<'mir, 'tcx: 'mir>:
})
}
/// Take a reader out of the queue waiting for the lock.
/// Returns `true` if some thread got the rwlock.
/// Provides the closure with the next MutexId. Creates that mutex if the closure returns None,
/// otherwise returns the value from the closure.
#[inline]
fn rwlock_dequeue_and_lock_reader(&mut self, id: RwLockId) -> bool {
fn mutex_get_or_create<F>(&mut self, existing: F) -> InterpResult<'tcx, MutexId>
where
F: FnOnce(&mut MiriInterpCx<'mir, 'tcx>, MutexId) -> InterpResult<'tcx, Option<MutexId>>,
{
let this = self.eval_context_mut();
if let Some(reader) = this.machine.sync.rwlocks[id].reader_queue.pop_front() {
this.unblock_thread(reader, BlockReason::RwLock(id));
this.rwlock_reader_lock(id, reader);
true
let next_index = this.machine.sync.mutexes.next_index();
if let Some(old) = existing(this, next_index)? {
if this.machine.sync.mutexes.get(old).is_none() {
throw_ub_format!("mutex has invalid ID");
}
Ok(old)
} else {
false
let new_index = this.machine.sync.mutexes.push(Default::default());
assert_eq!(next_index, new_index);
Ok(new_index)
}
}
/// Take the writer out of the queue waiting for the lock.
/// Returns `true` if some thread got the rwlock.
/// Provides the closure with the next RwLockId. Creates that RwLock if the closure returns None,
/// otherwise returns the value from the closure.
#[inline]
fn rwlock_dequeue_and_lock_writer(&mut self, id: RwLockId) -> bool {
fn rwlock_get_or_create<F>(&mut self, existing: F) -> InterpResult<'tcx, RwLockId>
where
F: FnOnce(&mut MiriInterpCx<'mir, 'tcx>, RwLockId) -> InterpResult<'tcx, Option<RwLockId>>,
{
let this = self.eval_context_mut();
if let Some(writer) = this.machine.sync.rwlocks[id].writer_queue.pop_front() {
this.unblock_thread(writer, BlockReason::RwLock(id));
this.rwlock_writer_lock(id, writer);
true
let next_index = this.machine.sync.rwlocks.next_index();
if let Some(old) = existing(this, next_index)? {
if this.machine.sync.rwlocks.get(old).is_none() {
throw_ub_format!("rwlock has invalid ID");
}
Ok(old)
} else {
false
let new_index = this.machine.sync.rwlocks.push(Default::default());
assert_eq!(next_index, new_index);
Ok(new_index)
}
}
/// Take a thread out of the queue waiting for the mutex, and lock
/// the mutex for it. Returns `true` if some thread has the mutex now.
/// Provides the closure with the next CondvarId. Creates that Condvar if the closure returns None,
/// otherwise returns the value from the closure.
#[inline]
fn mutex_dequeue_and_lock(&mut self, id: MutexId) -> bool {
fn condvar_get_or_create<F>(&mut self, existing: F) -> InterpResult<'tcx, CondvarId>
where
F: FnOnce(
&mut MiriInterpCx<'mir, 'tcx>,
CondvarId,
) -> InterpResult<'tcx, Option<CondvarId>>,
{
let this = self.eval_context_mut();
if let Some(thread) = this.machine.sync.mutexes[id].queue.pop_front() {
this.unblock_thread(thread, BlockReason::Mutex(id));
this.mutex_lock(id, thread);
true
let next_index = this.machine.sync.condvars.next_index();
if let Some(old) = existing(this, next_index)? {
if this.machine.sync.condvars.get(old).is_none() {
throw_ub_format!("condvar has invalid ID");
}
Ok(old)
} else {
false
let new_index = this.machine.sync.condvars.push(Default::default());
assert_eq!(next_index, new_index);
Ok(new_index)
}
}
}
@ -295,27 +302,6 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
})
}
#[inline]
/// Provides the closure with the next MutexId. Creates that mutex if the closure returns None,
/// otherwise returns the value from the closure
fn mutex_get_or_create<F>(&mut self, existing: F) -> InterpResult<'tcx, MutexId>
where
F: FnOnce(&mut MiriInterpCx<'mir, 'tcx>, MutexId) -> InterpResult<'tcx, Option<MutexId>>,
{
let this = self.eval_context_mut();
let next_index = this.machine.sync.mutexes.next_index();
if let Some(old) = existing(this, next_index)? {
if this.machine.sync.mutexes.get(old).is_none() {
throw_ub_format!("mutex has invalid ID");
}
Ok(old)
} else {
let new_index = this.machine.sync.mutexes.push(Default::default());
assert_eq!(next_index, new_index);
Ok(new_index)
}
}
#[inline]
/// Get the id of the thread that currently owns this lock.
fn mutex_get_owner(&mut self, id: MutexId) -> ThreadId {
@ -331,8 +317,9 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
}
/// Lock by setting the mutex owner and increasing the lock count.
fn mutex_lock(&mut self, id: MutexId, thread: ThreadId) {
fn mutex_lock(&mut self, id: MutexId) {
let this = self.eval_context_mut();
let thread = this.active_thread();
let mutex = &mut this.machine.sync.mutexes[id];
if let Some(current_owner) = mutex.owner {
assert_eq!(thread, current_owner, "mutex already locked by another thread");
@ -345,7 +332,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
}
mutex.lock_count = mutex.lock_count.checked_add(1).unwrap();
if let Some(data_race) = &this.machine.data_race {
data_race.acquire_clock(&mutex.clock, thread);
data_race.acquire_clock(&mutex.clock, &this.machine.threads);
}
}
@ -353,14 +340,14 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
/// count. If the lock count reaches 0, release the lock and potentially
/// give to a new owner. If the lock was not locked by the current thread,
/// return `None`.
fn mutex_unlock(&mut self, id: MutexId) -> Option<usize> {
fn mutex_unlock(&mut self, id: MutexId) -> InterpResult<'tcx, Option<usize>> {
let this = self.eval_context_mut();
let mutex = &mut this.machine.sync.mutexes[id];
if let Some(current_owner) = mutex.owner {
Ok(if let Some(current_owner) = mutex.owner {
// Mutex is locked.
if current_owner != this.machine.threads.get_active_thread_id() {
if current_owner != this.machine.threads.active_thread() {
// Only the owner can unlock the mutex.
return None;
return Ok(None);
}
let old_lock_count = mutex.lock_count;
mutex.lock_count = old_lock_count
@ -373,42 +360,52 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
if let Some(data_race) = &this.machine.data_race {
mutex.clock.clone_from(&data_race.release_clock(&this.machine.threads));
}
this.mutex_dequeue_and_lock(id);
if let Some(thread) = this.machine.sync.mutexes[id].queue.pop_front() {
this.unblock_thread(thread, BlockReason::Mutex(id))?;
}
}
Some(old_lock_count)
} else {
// Mutex is not locked.
None
}
})
}
/// Put the thread into the queue waiting for the mutex.
/// Once the Mutex becomes available, `retval` will be written to `dest`.
#[inline]
fn mutex_enqueue_and_block(&mut self, id: MutexId, thread: ThreadId) {
fn mutex_enqueue_and_block(
&mut self,
id: MutexId,
retval: Scalar<Provenance>,
dest: MPlaceTy<'tcx, Provenance>,
) {
let this = self.eval_context_mut();
assert!(this.mutex_is_locked(id), "queing on unlocked mutex");
let thread = this.active_thread();
this.machine.sync.mutexes[id].queue.push_back(thread);
this.block_thread(thread, BlockReason::Mutex(id));
}
this.block_thread(BlockReason::Mutex(id), None, Callback { id, retval, dest });
/// Provides the closure with the next RwLockId. Creates that RwLock if the closure returns None,
/// otherwise returns the value from the closure
#[inline]
fn rwlock_get_or_create<F>(&mut self, existing: F) -> InterpResult<'tcx, RwLockId>
where
F: FnOnce(&mut MiriInterpCx<'mir, 'tcx>, RwLockId) -> InterpResult<'tcx, Option<RwLockId>>,
{
let this = self.eval_context_mut();
let next_index = this.machine.sync.rwlocks.next_index();
if let Some(old) = existing(this, next_index)? {
if this.machine.sync.rwlocks.get(old).is_none() {
throw_ub_format!("rwlock has invalid ID");
struct Callback<'tcx> {
id: MutexId,
retval: Scalar<Provenance>,
dest: MPlaceTy<'tcx, Provenance>,
}
impl<'tcx> VisitProvenance for Callback<'tcx> {
fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
let Callback { id: _, retval, dest } = self;
retval.visit_provenance(visit);
dest.visit_provenance(visit);
}
}
impl<'mir, 'tcx: 'mir> UnblockCallback<'mir, 'tcx> for Callback<'tcx> {
fn unblock(self: Box<Self>, this: &mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx> {
assert!(!this.mutex_is_locked(self.id));
this.mutex_lock(self.id);
this.write_scalar(self.retval, &self.dest)?;
Ok(())
}
Ok(old)
} else {
let new_index = this.machine.sync.rwlocks.push(Default::default());
assert_eq!(next_index, new_index);
Ok(new_index)
}
}
@ -437,23 +434,24 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
/// Read-lock the lock by adding the `reader` the list of threads that own
/// this lock.
fn rwlock_reader_lock(&mut self, id: RwLockId, reader: ThreadId) {
fn rwlock_reader_lock(&mut self, id: RwLockId) {
let this = self.eval_context_mut();
let thread = this.active_thread();
assert!(!this.rwlock_is_write_locked(id), "the lock is write locked");
trace!("rwlock_reader_lock: {:?} now also held (one more time) by {:?}", id, reader);
trace!("rwlock_reader_lock: {:?} now also held (one more time) by {:?}", id, thread);
let rwlock = &mut this.machine.sync.rwlocks[id];
let count = rwlock.readers.entry(reader).or_insert(0);
let count = rwlock.readers.entry(thread).or_insert(0);
*count = count.checked_add(1).expect("the reader counter overflowed");
if let Some(data_race) = &this.machine.data_race {
data_race.acquire_clock(&rwlock.clock_unlocked, reader);
data_race.acquire_clock(&rwlock.clock_unlocked, &this.machine.threads);
}
}
/// Try read-unlock the lock for the current threads and potentially give the lock to a new owner.
/// Returns `true` if succeeded, `false` if this `reader` did not hold the lock.
fn rwlock_reader_unlock(&mut self, id: RwLockId) -> bool {
fn rwlock_reader_unlock(&mut self, id: RwLockId) -> InterpResult<'tcx, bool> {
let this = self.eval_context_mut();
let thread = this.get_active_thread();
let thread = this.active_thread();
let rwlock = &mut this.machine.sync.rwlocks[id];
match rwlock.readers.entry(thread) {
Entry::Occupied(mut entry) => {
@ -467,7 +465,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
trace!("rwlock_reader_unlock: {:?} held one less time by {:?}", id, thread);
}
}
Entry::Vacant(_) => return false, // we did not even own this lock
Entry::Vacant(_) => return Ok(false), // we did not even own this lock
}
if let Some(data_race) = &this.machine.data_race {
// Add this to the shared-release clock of all concurrent readers.
@ -481,48 +479,79 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
// happen-before the writers
let rwlock = &mut this.machine.sync.rwlocks[id];
rwlock.clock_unlocked.clone_from(&rwlock.clock_current_readers);
this.rwlock_dequeue_and_lock_writer(id);
// See if there is a thread to unblock.
if let Some(writer) = rwlock.writer_queue.pop_front() {
this.unblock_thread(writer, BlockReason::RwLock(id))?;
}
}
true
Ok(true)
}
/// Put the reader in the queue waiting for the lock and block it.
/// Once the lock becomes available, `retval` will be written to `dest`.
#[inline]
fn rwlock_enqueue_and_block_reader(&mut self, id: RwLockId, reader: ThreadId) {
fn rwlock_enqueue_and_block_reader(
&mut self,
id: RwLockId,
retval: Scalar<Provenance>,
dest: MPlaceTy<'tcx, Provenance>,
) {
let this = self.eval_context_mut();
let thread = this.active_thread();
assert!(this.rwlock_is_write_locked(id), "read-queueing on not write locked rwlock");
this.machine.sync.rwlocks[id].reader_queue.push_back(reader);
this.block_thread(reader, BlockReason::RwLock(id));
this.machine.sync.rwlocks[id].reader_queue.push_back(thread);
this.block_thread(BlockReason::RwLock(id), None, Callback { id, retval, dest });
struct Callback<'tcx> {
id: RwLockId,
retval: Scalar<Provenance>,
dest: MPlaceTy<'tcx, Provenance>,
}
impl<'tcx> VisitProvenance for Callback<'tcx> {
fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
let Callback { id: _, retval, dest } = self;
retval.visit_provenance(visit);
dest.visit_provenance(visit);
}
}
impl<'mir, 'tcx: 'mir> UnblockCallback<'mir, 'tcx> for Callback<'tcx> {
fn unblock(self: Box<Self>, this: &mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx> {
this.rwlock_reader_lock(self.id);
this.write_scalar(self.retval, &self.dest)?;
Ok(())
}
}
}
/// Lock by setting the writer that owns the lock.
#[inline]
fn rwlock_writer_lock(&mut self, id: RwLockId, writer: ThreadId) {
fn rwlock_writer_lock(&mut self, id: RwLockId) {
let this = self.eval_context_mut();
let thread = this.active_thread();
assert!(!this.rwlock_is_locked(id), "the rwlock is already locked");
trace!("rwlock_writer_lock: {:?} now held by {:?}", id, writer);
trace!("rwlock_writer_lock: {:?} now held by {:?}", id, thread);
let rwlock = &mut this.machine.sync.rwlocks[id];
rwlock.writer = Some(writer);
rwlock.writer = Some(thread);
if let Some(data_race) = &this.machine.data_race {
data_race.acquire_clock(&rwlock.clock_unlocked, writer);
data_race.acquire_clock(&rwlock.clock_unlocked, &this.machine.threads);
}
}
/// Try to unlock an rwlock held by the current thread.
/// Return `false` if it is held by another thread.
#[inline]
fn rwlock_writer_unlock(&mut self, id: RwLockId) -> bool {
fn rwlock_writer_unlock(&mut self, id: RwLockId) -> InterpResult<'tcx, bool> {
let this = self.eval_context_mut();
let thread = this.get_active_thread();
let thread = this.active_thread();
let rwlock = &mut this.machine.sync.rwlocks[id];
if let Some(current_writer) = rwlock.writer {
Ok(if let Some(current_writer) = rwlock.writer {
if current_writer != thread {
// Only the owner can unlock the rwlock.
return false;
return Ok(false);
}
rwlock.writer = None;
trace!("rwlock_writer_unlock: {:?} unlocked by {:?}", id, thread);
// Release memory to next lock holder.
// Record release clock for next lock holder.
if let Some(data_race) = &this.machine.data_race {
rwlock.clock_unlocked.clone_from(&*data_race.release_clock(&this.machine.threads));
}
@ -531,50 +560,54 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
// We are prioritizing writers here against the readers. As a
// result, not only readers can starve writers, but also writers can
// starve readers.
if this.rwlock_dequeue_and_lock_writer(id) {
// Someone got the write lock, nice.
if let Some(writer) = rwlock.writer_queue.pop_front() {
this.unblock_thread(writer, BlockReason::RwLock(id))?;
} else {
// Give the lock to all readers.
while this.rwlock_dequeue_and_lock_reader(id) {
// Rinse and repeat.
// Take the entire read queue and wake them all up.
let readers = std::mem::take(&mut rwlock.reader_queue);
for reader in readers {
this.unblock_thread(reader, BlockReason::RwLock(id))?;
}
}
true
} else {
false
}
})
}
/// Put the writer in the queue waiting for the lock.
/// Once the lock becomes available, `retval` will be written to `dest`.
#[inline]
fn rwlock_enqueue_and_block_writer(&mut self, id: RwLockId, writer: ThreadId) {
fn rwlock_enqueue_and_block_writer(
&mut self,
id: RwLockId,
retval: Scalar<Provenance>,
dest: MPlaceTy<'tcx, Provenance>,
) {
let this = self.eval_context_mut();
assert!(this.rwlock_is_locked(id), "write-queueing on unlocked rwlock");
this.machine.sync.rwlocks[id].writer_queue.push_back(writer);
this.block_thread(writer, BlockReason::RwLock(id));
}
let thread = this.active_thread();
this.machine.sync.rwlocks[id].writer_queue.push_back(thread);
this.block_thread(BlockReason::RwLock(id), None, Callback { id, retval, dest });
/// Provides the closure with the next CondvarId. Creates that Condvar if the closure returns None,
/// otherwise returns the value from the closure
#[inline]
fn condvar_get_or_create<F>(&mut self, existing: F) -> InterpResult<'tcx, CondvarId>
where
F: FnOnce(
&mut MiriInterpCx<'mir, 'tcx>,
CondvarId,
) -> InterpResult<'tcx, Option<CondvarId>>,
{
let this = self.eval_context_mut();
let next_index = this.machine.sync.condvars.next_index();
if let Some(old) = existing(this, next_index)? {
if this.machine.sync.condvars.get(old).is_none() {
throw_ub_format!("condvar has invalid ID");
struct Callback<'tcx> {
id: RwLockId,
retval: Scalar<Provenance>,
dest: MPlaceTy<'tcx, Provenance>,
}
impl<'tcx> VisitProvenance for Callback<'tcx> {
fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
let Callback { id: _, retval, dest } = self;
retval.visit_provenance(visit);
dest.visit_provenance(visit);
}
}
impl<'mir, 'tcx: 'mir> UnblockCallback<'mir, 'tcx> for Callback<'tcx> {
fn unblock(self: Box<Self>, this: &mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx> {
this.rwlock_writer_lock(self.id);
this.write_scalar(self.retval, &self.dest)?;
Ok(())
}
Ok(old)
} else {
let new_index = this.machine.sync.condvars.push(Default::default());
assert_eq!(next_index, new_index);
Ok(new_index)
}
}
@ -585,17 +618,106 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
!this.machine.sync.condvars[id].waiters.is_empty()
}
/// Mark that the thread is waiting on the conditional variable.
fn condvar_wait(&mut self, id: CondvarId, thread: ThreadId, lock: MutexId) {
/// Release the mutex and let the current thread wait on the given condition variable.
/// Once it is signaled, the mutex will be acquired and `retval_succ` will be written to `dest`.
/// If the timeout happens first, `retval_timeout` will be written to `dest`.
fn condvar_wait(
&mut self,
condvar: CondvarId,
mutex: MutexId,
timeout: Option<Timeout>,
retval_succ: Scalar<Provenance>,
retval_timeout: Scalar<Provenance>,
dest: MPlaceTy<'tcx, Provenance>,
) -> InterpResult<'tcx> {
let this = self.eval_context_mut();
let waiters = &mut this.machine.sync.condvars[id].waiters;
assert!(waiters.iter().all(|waiter| waiter.thread != thread), "thread is already waiting");
waiters.push_back(CondvarWaiter { thread, lock });
if let Some(old_locked_count) = this.mutex_unlock(mutex)? {
if old_locked_count != 1 {
throw_unsup_format!(
"awaiting a condvar on a mutex acquired multiple times is not supported"
);
}
} else {
throw_ub_format!(
"awaiting a condvar on a mutex that is unlocked or owned by a different thread"
);
}
let thread = this.active_thread();
let waiters = &mut this.machine.sync.condvars[condvar].waiters;
waiters.push_back(thread);
this.block_thread(
BlockReason::Condvar(condvar),
timeout,
Callback { condvar, mutex, retval_succ, retval_timeout, dest },
);
return Ok(());
struct Callback<'tcx> {
condvar: CondvarId,
mutex: MutexId,
retval_succ: Scalar<Provenance>,
retval_timeout: Scalar<Provenance>,
dest: MPlaceTy<'tcx, Provenance>,
}
impl<'tcx> VisitProvenance for Callback<'tcx> {
fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
let Callback { condvar: _, mutex: _, retval_succ, retval_timeout, dest } = self;
retval_succ.visit_provenance(visit);
retval_timeout.visit_provenance(visit);
dest.visit_provenance(visit);
}
}
impl<'tcx, 'mir> Callback<'tcx> {
#[allow(clippy::boxed_local)]
fn reacquire_mutex(
self: Box<Self>,
this: &mut MiriInterpCx<'mir, 'tcx>,
retval: Scalar<Provenance>,
) -> InterpResult<'tcx> {
if this.mutex_is_locked(self.mutex) {
assert_ne!(this.mutex_get_owner(self.mutex), this.active_thread());
this.mutex_enqueue_and_block(self.mutex, retval, self.dest);
} else {
// We can have it right now!
this.mutex_lock(self.mutex);
// Don't forget to write the return value.
this.write_scalar(retval, &self.dest)?;
}
Ok(())
}
}
impl<'mir, 'tcx: 'mir> UnblockCallback<'mir, 'tcx> for Callback<'tcx> {
fn unblock(self: Box<Self>, this: &mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx> {
// The condvar was signaled. Make sure we get the clock for that.
if let Some(data_race) = &this.machine.data_race {
data_race.acquire_clock(
&this.machine.sync.condvars[self.condvar].clock,
&this.machine.threads,
);
}
// Try to acquire the mutex.
// The timeout only applies to the first wait (until the signal), not for mutex acquisition.
let retval = self.retval_succ;
self.reacquire_mutex(this, retval)
}
fn timeout(
self: Box<Self>,
this: &mut InterpCx<'mir, 'tcx, MiriMachine<'mir, 'tcx>>,
) -> InterpResult<'tcx> {
// We have to remove the waiter from the queue again.
let thread = this.active_thread();
let waiters = &mut this.machine.sync.condvars[self.condvar].waiters;
waiters.retain(|waiter| *waiter != thread);
// Now get back the lock.
let retval = self.retval_timeout;
self.reacquire_mutex(this, retval)
}
}
}
/// Wake up some thread (if there is any) sleeping on the conditional
/// variable.
fn condvar_signal(&mut self, id: CondvarId) -> Option<(ThreadId, MutexId)> {
/// variable. Returns `true` iff any thread was woken up.
fn condvar_signal(&mut self, id: CondvarId) -> InterpResult<'tcx, bool> {
let this = self.eval_context_mut();
let condvar = &mut this.machine.sync.condvars[id];
let data_race = &this.machine.data_race;
@ -604,32 +726,87 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
if let Some(data_race) = data_race {
condvar.clock.clone_from(&*data_race.release_clock(&this.machine.threads));
}
condvar.waiters.pop_front().map(|waiter| {
if let Some(data_race) = data_race {
data_race.acquire_clock(&condvar.clock, waiter.thread);
}
(waiter.thread, waiter.lock)
})
let Some(waiter) = condvar.waiters.pop_front() else {
return Ok(false);
};
this.unblock_thread(waiter, BlockReason::Condvar(id))?;
Ok(true)
}
#[inline]
/// Remove the thread from the queue of threads waiting on this conditional variable.
fn condvar_remove_waiter(&mut self, id: CondvarId, thread: ThreadId) {
let this = self.eval_context_mut();
this.machine.sync.condvars[id].waiters.retain(|waiter| waiter.thread != thread);
}
fn futex_wait(&mut self, addr: u64, thread: ThreadId, bitset: u32) {
/// Wait for the futex to be signaled, or a timeout.
/// On a signal, `retval_succ` is written to `dest`.
/// On a timeout, `retval_timeout` is written to `dest` and `errno_timeout` is set as the last error.
fn futex_wait(
&mut self,
addr: u64,
bitset: u32,
timeout: Option<Timeout>,
retval_succ: Scalar<Provenance>,
retval_timeout: Scalar<Provenance>,
dest: MPlaceTy<'tcx, Provenance>,
errno_timeout: Scalar<Provenance>,
) {
let this = self.eval_context_mut();
let thread = this.active_thread();
let futex = &mut this.machine.sync.futexes.entry(addr).or_default();
let waiters = &mut futex.waiters;
assert!(waiters.iter().all(|waiter| waiter.thread != thread), "thread is already waiting");
waiters.push_back(FutexWaiter { thread, bitset });
this.block_thread(
BlockReason::Futex { addr },
timeout,
Callback { addr, retval_succ, retval_timeout, dest, errno_timeout },
);
struct Callback<'tcx> {
addr: u64,
retval_succ: Scalar<Provenance>,
retval_timeout: Scalar<Provenance>,
dest: MPlaceTy<'tcx, Provenance>,
errno_timeout: Scalar<Provenance>,
}
impl<'tcx> VisitProvenance for Callback<'tcx> {
fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
let Callback { addr: _, retval_succ, retval_timeout, dest, errno_timeout } = self;
retval_succ.visit_provenance(visit);
retval_timeout.visit_provenance(visit);
dest.visit_provenance(visit);
errno_timeout.visit_provenance(visit);
}
}
impl<'mir, 'tcx: 'mir> UnblockCallback<'mir, 'tcx> for Callback<'tcx> {
fn unblock(self: Box<Self>, this: &mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx> {
let futex = this.machine.sync.futexes.get(&self.addr).unwrap();
// Acquire the clock of the futex.
if let Some(data_race) = &this.machine.data_race {
data_race.acquire_clock(&futex.clock, &this.machine.threads);
}
// Write the return value.
this.write_scalar(self.retval_succ, &self.dest)?;
Ok(())
}
fn timeout(
self: Box<Self>,
this: &mut InterpCx<'mir, 'tcx, MiriMachine<'mir, 'tcx>>,
) -> InterpResult<'tcx> {
// Remove the waiter from the futex.
let thread = this.active_thread();
let futex = this.machine.sync.futexes.get_mut(&self.addr).unwrap();
futex.waiters.retain(|waiter| waiter.thread != thread);
// Set errno and write return value.
this.set_last_error(self.errno_timeout)?;
this.write_scalar(self.retval_timeout, &self.dest)?;
Ok(())
}
}
}
fn futex_wake(&mut self, addr: u64, bitset: u32) -> Option<ThreadId> {
/// Returns whether anything was woken.
fn futex_wake(&mut self, addr: u64, bitset: u32) -> InterpResult<'tcx, bool> {
let this = self.eval_context_mut();
let futex = &mut this.machine.sync.futexes.get_mut(&addr)?;
let Some(futex) = this.machine.sync.futexes.get_mut(&addr) else {
return Ok(false);
};
let data_race = &this.machine.data_race;
// Each futex-wake happens-before the end of the futex wait
@ -638,19 +815,11 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
}
// Wake up the first thread in the queue that matches any of the bits in the bitset.
futex.waiters.iter().position(|w| w.bitset & bitset != 0).map(|i| {
let waiter = futex.waiters.remove(i).unwrap();
if let Some(data_race) = data_race {
data_race.acquire_clock(&futex.clock, waiter.thread);
}
waiter.thread
})
}
fn futex_remove_waiter(&mut self, addr: u64, thread: ThreadId) {
let this = self.eval_context_mut();
if let Some(futex) = this.machine.sync.futexes.get_mut(&addr) {
futex.waiters.retain(|waiter| waiter.thread != thread);
}
let Some(i) = futex.waiters.iter().position(|w| w.bitset & bitset != 0) else {
return Ok(false);
};
let waiter = futex.waiters.remove(i).unwrap();
this.unblock_thread(waiter.thread, BlockReason::Futex { addr })?;
Ok(true)
}
}

View File

@ -1,7 +1,6 @@
//! Implements threads.
use std::cell::RefCell;
use std::collections::hash_map::Entry;
use std::mem;
use std::num::TryFromIntError;
use std::sync::atomic::Ordering::Relaxed;
use std::task::Poll;
@ -41,12 +40,23 @@ pub enum TlsAllocAction {
Leak,
}
/// Trait for callbacks that can be executed when some event happens, such as after a timeout.
pub trait MachineCallback<'mir, 'tcx>: VisitProvenance {
fn call(&self, ecx: &mut InterpCx<'mir, 'tcx, MiriMachine<'mir, 'tcx>>) -> InterpResult<'tcx>;
}
/// Trait for callbacks that are executed when a thread gets unblocked.
pub trait UnblockCallback<'mir, 'tcx>: VisitProvenance {
fn unblock(
self: Box<Self>,
ecx: &mut InterpCx<'mir, 'tcx, MiriMachine<'mir, 'tcx>>,
) -> InterpResult<'tcx>;
type TimeoutCallback<'mir, 'tcx> = Box<dyn MachineCallback<'mir, 'tcx> + 'tcx>;
fn timeout(
self: Box<Self>,
_ecx: &mut InterpCx<'mir, 'tcx, MiriMachine<'mir, 'tcx>>,
) -> InterpResult<'tcx> {
unreachable!(
"timeout on a thread that was blocked without a timeout (or someone forgot to overwrite this method)"
)
}
}
type DynUnblockCallback<'mir, 'tcx> = Box<dyn UnblockCallback<'mir, 'tcx> + 'tcx>;
/// A thread identifier.
#[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
@ -117,17 +127,45 @@ pub enum BlockReason {
}
/// The state of a thread.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ThreadState {
enum ThreadState<'mir, 'tcx> {
/// The thread is enabled and can be executed.
Enabled,
/// The thread is blocked on something.
Blocked(BlockReason),
Blocked {
reason: BlockReason,
timeout: Option<Timeout>,
callback: DynUnblockCallback<'mir, 'tcx>,
},
/// The thread has terminated its execution. We do not delete terminated
/// threads (FIXME: why?).
Terminated,
}
impl<'mir, 'tcx> std::fmt::Debug for ThreadState<'mir, 'tcx> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Enabled => write!(f, "Enabled"),
Self::Blocked { reason, timeout, .. } =>
f.debug_struct("Blocked").field("reason", reason).field("timeout", timeout).finish(),
Self::Terminated => write!(f, "Terminated"),
}
}
}
impl<'mir, 'tcx> ThreadState<'mir, 'tcx> {
fn is_enabled(&self) -> bool {
matches!(self, ThreadState::Enabled)
}
fn is_terminated(&self) -> bool {
matches!(self, ThreadState::Terminated)
}
fn is_blocked_on(&self, reason: BlockReason) -> bool {
matches!(*self, ThreadState::Blocked { reason: actual_reason, .. } if actual_reason == reason)
}
}
/// The join status of a thread.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum ThreadJoinStatus {
@ -142,7 +180,7 @@ enum ThreadJoinStatus {
/// A thread.
pub struct Thread<'mir, 'tcx> {
state: ThreadState,
state: ThreadState<'mir, 'tcx>,
/// Name of the thread.
thread_name: Option<Vec<u8>>,
@ -323,41 +361,24 @@ impl VisitProvenance for Frame<'_, '_, Provenance, FrameExtra<'_>> {
}
}
/// A specific moment in time.
/// The moment in time when a blocked thread should be woken up.
#[derive(Debug)]
pub enum CallbackTime {
pub enum Timeout {
Monotonic(Instant),
RealTime(SystemTime),
}
impl CallbackTime {
impl Timeout {
/// How long do we have to wait from now until the specified time?
fn get_wait_time(&self, clock: &Clock) -> Duration {
match self {
CallbackTime::Monotonic(instant) => instant.duration_since(clock.now()),
CallbackTime::RealTime(time) =>
time.duration_since(SystemTime::now()).unwrap_or(Duration::new(0, 0)),
Timeout::Monotonic(instant) => instant.duration_since(clock.now()),
Timeout::RealTime(time) =>
time.duration_since(SystemTime::now()).unwrap_or(Duration::ZERO),
}
}
}
/// Callbacks are used to implement timeouts. For example, waiting on a
/// conditional variable with a timeout creates a callback that is called after
/// the specified time and unblocks the thread. If another thread signals on the
/// conditional variable, the signal handler deletes the callback.
struct TimeoutCallbackInfo<'mir, 'tcx> {
/// The callback should be called no earlier than this time.
call_time: CallbackTime,
/// The called function.
callback: TimeoutCallback<'mir, 'tcx>,
}
impl<'mir, 'tcx> std::fmt::Debug for TimeoutCallbackInfo<'mir, 'tcx> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "TimeoutCallback({:?})", self.call_time)
}
}
/// A set of threads.
#[derive(Debug)]
pub struct ThreadManager<'mir, 'tcx> {
@ -369,11 +390,9 @@ pub struct ThreadManager<'mir, 'tcx> {
threads: IndexVec<ThreadId, Thread<'mir, 'tcx>>,
/// A mapping from a thread-local static to an allocation id of a thread
/// specific allocation.
thread_local_alloc_ids: RefCell<FxHashMap<(DefId, ThreadId), Pointer<Provenance>>>,
thread_local_alloc_ids: FxHashMap<(DefId, ThreadId), Pointer<Provenance>>,
/// A flag that indicates that we should change the active thread.
yield_active_thread: bool,
/// Callbacks that are called once the specified time passes.
timeout_callbacks: FxHashMap<ThreadId, TimeoutCallbackInfo<'mir, 'tcx>>,
}
impl VisitProvenance for ThreadManager<'_, '_> {
@ -381,7 +400,6 @@ impl VisitProvenance for ThreadManager<'_, '_> {
let ThreadManager {
threads,
thread_local_alloc_ids,
timeout_callbacks,
active_thread: _,
yield_active_thread: _,
} = self;
@ -389,12 +407,9 @@ impl VisitProvenance for ThreadManager<'_, '_> {
for thread in threads {
thread.visit_provenance(visit);
}
for ptr in thread_local_alloc_ids.borrow().values() {
for ptr in thread_local_alloc_ids.values() {
ptr.visit_provenance(visit);
}
for callback in timeout_callbacks.values() {
callback.callback.visit_provenance(visit);
}
}
}
@ -408,7 +423,6 @@ impl<'mir, 'tcx> Default for ThreadManager<'mir, 'tcx> {
threads,
thread_local_alloc_ids: Default::default(),
yield_active_thread: false,
timeout_callbacks: FxHashMap::default(),
}
}
}
@ -430,18 +444,15 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
/// Check if we have an allocation for the given thread local static for the
/// active thread.
fn get_thread_local_alloc_id(&self, def_id: DefId) -> Option<Pointer<Provenance>> {
self.thread_local_alloc_ids.borrow().get(&(def_id, self.active_thread)).cloned()
self.thread_local_alloc_ids.get(&(def_id, self.active_thread)).cloned()
}
/// Set the pointer for the allocation of the given thread local
/// static for the active thread.
///
/// Panics if a thread local is initialized twice for the same thread.
fn set_thread_local_alloc(&self, def_id: DefId, ptr: Pointer<Provenance>) {
self.thread_local_alloc_ids
.borrow_mut()
.try_insert((def_id, self.active_thread), ptr)
.unwrap();
fn set_thread_local_alloc(&mut self, def_id: DefId, ptr: Pointer<Provenance>) {
self.thread_local_alloc_ids.try_insert((def_id, self.active_thread), ptr).unwrap();
}
/// Borrow the stack of the active thread.
@ -480,7 +491,7 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
}
/// Get the id of the currently active thread.
pub fn get_active_thread_id(&self) -> ThreadId {
pub fn active_thread(&self) -> ThreadId {
self.active_thread
}
@ -492,17 +503,17 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
/// Get the total of threads that are currently live, i.e., not yet terminated.
/// (They might be blocked.)
pub fn get_live_thread_count(&self) -> usize {
self.threads.iter().filter(|t| !matches!(t.state, ThreadState::Terminated)).count()
self.threads.iter().filter(|t| !t.state.is_terminated()).count()
}
/// Has the given thread terminated?
fn has_terminated(&self, thread_id: ThreadId) -> bool {
self.threads[thread_id].state == ThreadState::Terminated
self.threads[thread_id].state.is_terminated()
}
/// Have all threads terminated?
fn have_all_terminated(&self) -> bool {
self.threads.iter().all(|thread| thread.state == ThreadState::Terminated)
self.threads.iter().all(|thread| thread.state.is_terminated())
}
/// Enable the thread for execution. The thread must be terminated.
@ -532,8 +543,7 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
fn detach_thread(&mut self, id: ThreadId, allow_terminated_joined: bool) -> InterpResult<'tcx> {
trace!("detaching {:?}", id);
let is_ub = if allow_terminated_joined && self.threads[id].state == ThreadState::Terminated
{
let is_ub = if allow_terminated_joined && self.threads[id].state.is_terminated() {
// "Detached" in particular means "not yet joined". Redundant detaching is still UB.
self.threads[id].join_status == ThreadJoinStatus::Detached
} else {
@ -561,15 +571,41 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
// Mark the joined thread as being joined so that we detect if other
// threads try to join it.
self.threads[joined_thread_id].join_status = ThreadJoinStatus::Joined;
if self.threads[joined_thread_id].state != ThreadState::Terminated {
// The joined thread is still running, we need to wait for it.
self.active_thread_mut().state =
ThreadState::Blocked(BlockReason::Join(joined_thread_id));
if !self.threads[joined_thread_id].state.is_terminated() {
trace!(
"{:?} blocked on {:?} when trying to join",
self.active_thread,
joined_thread_id
);
// The joined thread is still running, we need to wait for it.
// Unce we get unblocked, perform the appropriate synchronization.
self.block_thread(
BlockReason::Join(joined_thread_id),
None,
Callback { joined_thread_id },
);
struct Callback {
joined_thread_id: ThreadId,
}
impl VisitProvenance for Callback {
fn visit_provenance(&self, _visit: &mut VisitWith<'_>) {}
}
impl<'mir, 'tcx: 'mir> UnblockCallback<'mir, 'tcx> for Callback {
fn unblock(
self: Box<Self>,
this: &mut MiriInterpCx<'mir, 'tcx>,
) -> InterpResult<'tcx> {
if let Some(data_race) = &mut this.machine.data_race {
data_race.thread_joined(
&this.machine.threads,
this.machine.threads.active_thread(),
self.joined_thread_id,
);
}
Ok(())
}
}
} else {
// The thread has already terminated - mark join happens-before
if let Some(data_race) = data_race {
@ -596,9 +632,9 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
// Sanity check `join_status`.
assert!(
self.threads.iter().all(|thread| {
thread.state != ThreadState::Blocked(BlockReason::Join(joined_thread_id))
}),
self.threads
.iter()
.all(|thread| { !thread.state.is_blocked_on(BlockReason::Join(joined_thread_id)) }),
"this thread already has threads waiting for its termination"
);
@ -620,18 +656,15 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
}
/// Put the thread into the blocked state.
fn block_thread(&mut self, thread: ThreadId, reason: BlockReason) {
let state = &mut self.threads[thread].state;
assert_eq!(*state, ThreadState::Enabled);
*state = ThreadState::Blocked(reason);
}
/// Put the blocked thread into the enabled state.
/// Sanity-checks that the thread previously was blocked for the right reason.
fn unblock_thread(&mut self, thread: ThreadId, reason: BlockReason) {
let state = &mut self.threads[thread].state;
assert_eq!(*state, ThreadState::Blocked(reason));
*state = ThreadState::Enabled;
fn block_thread(
&mut self,
reason: BlockReason,
timeout: Option<Timeout>,
callback: impl UnblockCallback<'mir, 'tcx> + 'tcx,
) {
let state = &mut self.threads[self.active_thread].state;
assert!(state.is_enabled());
*state = ThreadState::Blocked { reason, timeout, callback: Box::new(callback) }
}
/// Change the active thread to some enabled thread.
@ -642,87 +675,18 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
self.yield_active_thread = true;
}
/// Register the given `callback` to be called once the `call_time` passes.
///
/// The callback will be called with `thread` being the active thread, and
/// the callback may not change the active thread.
fn register_timeout_callback(
&mut self,
thread: ThreadId,
call_time: CallbackTime,
callback: TimeoutCallback<'mir, 'tcx>,
) {
self.timeout_callbacks
.try_insert(thread, TimeoutCallbackInfo { call_time, callback })
.unwrap();
}
/// Unregister the callback for the `thread`.
fn unregister_timeout_callback_if_exists(&mut self, thread: ThreadId) {
self.timeout_callbacks.remove(&thread);
}
/// Get a callback that is ready to be called.
fn get_ready_callback(
&mut self,
clock: &Clock,
) -> Option<(ThreadId, TimeoutCallback<'mir, 'tcx>)> {
// We iterate over all threads in the order of their indices because
// this allows us to have a deterministic scheduler.
for thread in self.threads.indices() {
match self.timeout_callbacks.entry(thread) {
Entry::Occupied(entry) => {
if entry.get().call_time.get_wait_time(clock) == Duration::new(0, 0) {
return Some((thread, entry.remove().callback));
}
/// Get the wait time for the next timeout, or `None` if no timeout is pending.
fn next_callback_wait_time(&self, clock: &Clock) -> Option<Duration> {
self.threads
.iter()
.filter_map(|t| {
match &t.state {
ThreadState::Blocked { timeout: Some(timeout), .. } =>
Some(timeout.get_wait_time(clock)),
_ => None,
}
Entry::Vacant(_) => {}
}
}
None
}
/// Wakes up threads joining on the active one and deallocates thread-local statics.
/// The `AllocId` that can now be freed are returned.
fn thread_terminated(
&mut self,
mut data_race: Option<&mut data_race::GlobalState>,
current_span: Span,
) -> Vec<Pointer<Provenance>> {
let mut free_tls_statics = Vec::new();
{
let mut thread_local_statics = self.thread_local_alloc_ids.borrow_mut();
thread_local_statics.retain(|&(_def_id, thread), &mut alloc_id| {
if thread != self.active_thread {
// Keep this static around.
return true;
}
// Delete this static from the map and from memory.
// We cannot free directly here as we cannot use `?` in this context.
free_tls_statics.push(alloc_id);
false
});
}
// Set the thread into a terminated state in the data-race detector.
if let Some(ref mut data_race) = data_race {
data_race.thread_terminated(self, current_span);
}
// Check if we need to unblock any threads.
let mut joined_threads = vec![]; // store which threads joined, we'll need it
for (i, thread) in self.threads.iter_enumerated_mut() {
if thread.state == ThreadState::Blocked(BlockReason::Join(self.active_thread)) {
// The thread has terminated, mark happens-before edge to joining thread
if data_race.is_some() {
joined_threads.push(i);
}
trace!("unblocking {:?} because {:?} terminated", i, self.active_thread);
thread.state = ThreadState::Enabled;
}
}
for &i in &joined_threads {
data_race.as_mut().unwrap().thread_joined(self, i, self.active_thread);
}
free_tls_statics
})
.min()
}
/// Decide which action to take next and on which thread.
@ -733,9 +697,7 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
/// blocked, terminated, or has explicitly asked to be preempted).
fn schedule(&mut self, clock: &Clock) -> InterpResult<'tcx, SchedulingAction> {
// This thread and the program can keep going.
if self.threads[self.active_thread].state == ThreadState::Enabled
&& !self.yield_active_thread
{
if self.threads[self.active_thread].state.is_enabled() && !self.yield_active_thread {
// The currently active thread is still enabled, just continue with it.
return Ok(SchedulingAction::ExecuteStep);
}
@ -745,9 +707,8 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
// `pthread_cond_timedwait`, "an error is returned if [...] the absolute time specified by
// abstime has already been passed at the time of the call".
// <https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_cond_timedwait.html>
let potential_sleep_time =
self.timeout_callbacks.values().map(|info| info.call_time.get_wait_time(clock)).min();
if potential_sleep_time == Some(Duration::new(0, 0)) {
let potential_sleep_time = self.next_callback_wait_time(clock);
if potential_sleep_time == Some(Duration::ZERO) {
return Ok(SchedulingAction::ExecuteTimeoutCallback);
}
// No callbacks immediately scheduled, pick a regular thread to execute.
@ -765,7 +726,7 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
.chain(self.threads.iter_enumerated().take(self.active_thread.index()));
for (id, thread) in threads {
debug_assert_ne!(self.active_thread, id);
if thread.state == ThreadState::Enabled {
if thread.state.is_enabled() {
info!(
"---------- Now executing on thread `{}` (previous: `{}`) ----------------------------------------",
self.get_thread_display_name(id),
@ -776,11 +737,11 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
}
}
self.yield_active_thread = false;
if self.threads[self.active_thread].state == ThreadState::Enabled {
if self.threads[self.active_thread].state.is_enabled() {
return Ok(SchedulingAction::ExecuteStep);
}
// We have not found a thread to execute.
if self.threads.iter().all(|thread| thread.state == ThreadState::Terminated) {
if self.threads.iter().all(|thread| thread.state.is_terminated()) {
unreachable!("all threads terminated without the main thread terminating?!");
} else if let Some(sleep_time) = potential_sleep_time {
// All threads are currently blocked, but we have unexecuted
@ -799,29 +760,40 @@ trait EvalContextPrivExt<'mir, 'tcx: 'mir>: MiriInterpCxExt<'mir, 'tcx> {
#[inline]
fn run_timeout_callback(&mut self) -> InterpResult<'tcx> {
let this = self.eval_context_mut();
let (thread, callback) = if let Some((thread, callback)) =
this.machine.threads.get_ready_callback(&this.machine.clock)
{
(thread, callback)
} else {
// get_ready_callback can return None if the computer's clock
// was shifted after calling the scheduler and before the call
// to get_ready_callback (see issue
// https://github.com/rust-lang/miri/issues/1763). In this case,
// just do nothing, which effectively just returns to the
// scheduler.
return Ok(());
};
// This back-and-forth with `set_active_thread` is here because of two
// design decisions:
// 1. Make the caller and not the callback responsible for changing
// thread.
// 2. Make the scheduler the only place that can change the active
// thread.
let old_thread = this.set_active_thread(thread);
callback.call(this)?;
this.set_active_thread(old_thread);
Ok(())
let mut found_callback = None;
// Find a blocked thread that has timed out.
for (id, thread) in this.machine.threads.threads.iter_enumerated_mut() {
match &thread.state {
ThreadState::Blocked { timeout: Some(timeout), .. }
if timeout.get_wait_time(&this.machine.clock) == Duration::ZERO =>
{
let old_state = mem::replace(&mut thread.state, ThreadState::Enabled);
let ThreadState::Blocked { callback, .. } = old_state else { unreachable!() };
found_callback = Some((id, callback));
// Run the fallback (after the loop because borrow-checking).
break;
}
_ => {}
}
}
if let Some((thread, callback)) = found_callback {
// This back-and-forth with `set_active_thread` is here because of two
// design decisions:
// 1. Make the caller and not the callback responsible for changing
// thread.
// 2. Make the scheduler the only place that can change the active
// thread.
let old_thread = this.machine.threads.set_active_thread_id(thread);
callback.timeout(this)?;
this.machine.threads.set_active_thread_id(old_thread);
}
// found_callback can remain None if the computer's clock
// was shifted after calling the scheduler and before the call
// to get_ready_callback (see issue
// https://github.com/rust-lang/miri/issues/1763). In this case,
// just do nothing, which effectively just returns to the
// scheduler.
return Ok(());
}
#[inline]
@ -904,7 +876,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
// Finally switch to new thread so that we can push the first stackframe.
// After this all accesses will be treated as occurring in the new thread.
let old_thread_id = this.set_active_thread(new_thread_id);
let old_thread_id = this.machine.threads.set_active_thread_id(new_thread_id);
// Perform the function pointer load in the new thread frame.
let instance = this.get_ptr_fn(start_routine)?.as_instance()?;
@ -923,11 +895,110 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
)?;
// Restore the old active thread frame.
this.set_active_thread(old_thread_id);
this.machine.threads.set_active_thread_id(old_thread_id);
Ok(new_thread_id)
}
/// Handles thread termination of the active thread: wakes up threads joining on this one,
/// and deals with the thread's thread-local statics according to `tls_alloc_action`.
///
/// This is called by the eval loop when a thread's on_stack_empty returns `Ready`.
fn terminate_active_thread(&mut self, tls_alloc_action: TlsAllocAction) -> InterpResult<'tcx> {
let this = self.eval_context_mut();
// Mark thread as terminated.
let thread = this.active_thread_mut();
assert!(thread.stack.is_empty(), "only threads with an empty stack can be terminated");
thread.state = ThreadState::Terminated;
if let Some(ref mut data_race) = this.machine.data_race {
data_race.thread_terminated(&this.machine.threads);
}
// Deallocate TLS.
let gone_thread = this.active_thread();
{
let mut free_tls_statics = Vec::new();
this.machine.threads.thread_local_alloc_ids.retain(
|&(_def_id, thread), &mut alloc_id| {
if thread != gone_thread {
// A different thread, keep this static around.
return true;
}
// Delete this static from the map and from memory.
// We cannot free directly here as we cannot use `?` in this context.
free_tls_statics.push(alloc_id);
false
},
);
// Now free the TLS statics.
for ptr in free_tls_statics {
match tls_alloc_action {
TlsAllocAction::Deallocate =>
this.deallocate_ptr(ptr.into(), None, MiriMemoryKind::Tls.into())?,
TlsAllocAction::Leak =>
if let Some(alloc) = ptr.provenance.get_alloc_id() {
trace!(
"Thread-local static leaked and stored as static root: {:?}",
alloc
);
this.machine.static_roots.push(alloc);
},
}
}
}
// Unblock joining threads.
let unblock_reason = BlockReason::Join(gone_thread);
let threads = &this.machine.threads.threads;
let joining_threads = threads
.iter_enumerated()
.filter(|(_, thread)| thread.state.is_blocked_on(unblock_reason))
.map(|(id, _)| id)
.collect::<Vec<_>>();
for thread in joining_threads {
this.unblock_thread(thread, unblock_reason)?;
}
Ok(())
}
/// Block the current thread, with an optional timeout.
/// The callback will be invoked when the thread gets unblocked.
#[inline]
fn block_thread(
&mut self,
reason: BlockReason,
timeout: Option<Timeout>,
callback: impl UnblockCallback<'mir, 'tcx> + 'tcx,
) {
let this = self.eval_context_mut();
if !this.machine.communicate() && matches!(timeout, Some(Timeout::RealTime(..))) {
panic!("cannot have `RealTime` callback with isolation enabled!")
}
this.machine.threads.block_thread(reason, timeout, callback);
}
/// Put the blocked thread into the enabled state.
/// Sanity-checks that the thread previously was blocked for the right reason.
fn unblock_thread(&mut self, thread: ThreadId, reason: BlockReason) -> InterpResult<'tcx> {
let this = self.eval_context_mut();
let old_state =
mem::replace(&mut this.machine.threads.threads[thread].state, ThreadState::Enabled);
let callback = match old_state {
ThreadState::Blocked { reason: actual_reason, callback, .. } => {
assert_eq!(
reason, actual_reason,
"unblock_thread: thread was blocked for the wrong reason"
);
callback
}
_ => panic!("unblock_thread: thread was not blocked"),
};
// The callback must be executed in the previously blocked thread.
let old_thread = this.machine.threads.set_active_thread_id(thread);
callback.unblock(this)?;
this.machine.threads.set_active_thread_id(old_thread);
Ok(())
}
#[inline]
fn detach_thread(
&mut self,
@ -955,15 +1026,9 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
}
#[inline]
fn set_active_thread(&mut self, thread_id: ThreadId) -> ThreadId {
let this = self.eval_context_mut();
this.machine.threads.set_active_thread_id(thread_id)
}
#[inline]
fn get_active_thread(&self) -> ThreadId {
fn active_thread(&self) -> ThreadId {
let this = self.eval_context_ref();
this.machine.threads.get_active_thread_id()
this.machine.threads.active_thread()
}
#[inline]
@ -1025,16 +1090,6 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
self.eval_context_ref().machine.threads.get_thread_name(thread)
}
#[inline]
fn block_thread(&mut self, thread: ThreadId, reason: BlockReason) {
self.eval_context_mut().machine.threads.block_thread(thread, reason);
}
#[inline]
fn unblock_thread(&mut self, thread: ThreadId, reason: BlockReason) {
self.eval_context_mut().machine.threads.unblock_thread(thread, reason);
}
#[inline]
fn yield_active_thread(&mut self) {
self.eval_context_mut().machine.threads.yield_active_thread();
@ -1050,26 +1105,6 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
}
}
#[inline]
fn register_timeout_callback(
&mut self,
thread: ThreadId,
call_time: CallbackTime,
callback: TimeoutCallback<'mir, 'tcx>,
) {
let this = self.eval_context_mut();
if !this.machine.communicate() && matches!(call_time, CallbackTime::RealTime(..)) {
panic!("cannot have `RealTime` callback with isolation enabled!")
}
this.machine.threads.register_timeout_callback(thread, call_time, callback);
}
#[inline]
fn unregister_timeout_callback_if_exists(&mut self, thread: ThreadId) {
let this = self.eval_context_mut();
this.machine.threads.unregister_timeout_callback_if_exists(thread);
}
/// Run the core interpreter loop. Returns only when an interrupt occurs (an error or program
/// termination).
fn run_threads(&mut self) -> InterpResult<'tcx, !> {
@ -1099,32 +1134,4 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
}
}
}
/// Handles thread termination of the active thread: wakes up threads joining on this one,
/// and deals with the thread's thread-local statics according to `tls_alloc_action`.
///
/// This is called by the eval loop when a thread's on_stack_empty returns `Ready`.
#[inline]
fn terminate_active_thread(&mut self, tls_alloc_action: TlsAllocAction) -> InterpResult<'tcx> {
let this = self.eval_context_mut();
let thread = this.active_thread_mut();
assert!(thread.stack.is_empty(), "only threads with an empty stack can be terminated");
thread.state = ThreadState::Terminated;
let current_span = this.machine.current_span();
let thread_local_allocations =
this.machine.threads.thread_terminated(this.machine.data_race.as_mut(), current_span);
for ptr in thread_local_allocations {
match tls_alloc_action {
TlsAllocAction::Deallocate =>
this.deallocate_ptr(ptr.into(), None, MiriMemoryKind::Tls.into())?,
TlsAllocAction::Leak =>
if let Some(alloc) = ptr.provenance.get_alloc_id() {
trace!("Thread-local static leaked and stored as static root: {:?}", alloc);
this.machine.static_roots.push(alloc);
},
}
}
Ok(())
}
}

View File

@ -411,7 +411,7 @@ pub fn report_error<'tcx, 'mir>(
vec![],
helps,
&stacktrace,
Some(ecx.get_active_thread()),
Some(ecx.active_thread()),
&ecx.machine,
);
@ -419,7 +419,7 @@ pub fn report_error<'tcx, 'mir>(
if show_all_threads {
for (thread, stack) in ecx.machine.threads.all_stacks() {
if thread != ecx.get_active_thread() {
if thread != ecx.active_thread() {
let stacktrace = Frame::generate_stacktrace_from_stack(stack);
let (stacktrace, was_pruned) = prune_stacktrace(stacktrace, &ecx.machine);
any_pruned |= was_pruned;
@ -684,7 +684,7 @@ impl<'mir, 'tcx> MiriMachine<'mir, 'tcx> {
notes,
helps,
&stacktrace,
Some(self.threads.get_active_thread_id()),
Some(self.threads.active_thread()),
self,
);
}
@ -712,7 +712,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
vec![],
vec![],
&stacktrace,
Some(this.get_active_thread()),
Some(this.active_thread()),
&this.machine,
);
}

View File

@ -123,7 +123,8 @@ pub use crate::concurrency::{
init_once::{EvalContextExt as _, InitOnceId},
sync::{CondvarId, EvalContextExt as _, MutexId, RwLockId, SynchronizationObjects},
thread::{
BlockReason, CallbackTime, EvalContextExt as _, StackEmptyCallback, ThreadId, ThreadManager,
BlockReason, EvalContextExt as _, StackEmptyCallback, ThreadId, ThreadManager, Timeout,
UnblockCallback,
},
};
pub use crate::diagnostics::{

View File

@ -473,7 +473,7 @@ pub struct MiriMachine<'mir, 'tcx> {
/// The set of threads.
pub(crate) threads: ThreadManager<'mir, 'tcx>,
/// The state of the primitive synchronization objects.
pub(crate) sync: SynchronizationObjects<'mir, 'tcx>,
pub(crate) sync: SynchronizationObjects,
/// Precomputed `TyLayout`s for primitive data types that are commonly used inside Miri.
pub(crate) layouts: PrimitiveLayouts<'tcx>,
@ -770,7 +770,7 @@ impl VisitProvenance for MiriMachine<'_, '_> {
#[rustfmt::skip]
let MiriMachine {
threads,
sync,
sync: _,
tls,
env_vars,
main_fn_ret_place,
@ -819,7 +819,6 @@ impl VisitProvenance for MiriMachine<'_, '_> {
} = self;
threads.visit_provenance(visit);
sync.visit_provenance(visit);
tls.visit_provenance(visit);
env_vars.visit_provenance(visit);
dirs.visit_provenance(visit);
@ -1371,7 +1370,7 @@ impl<'mir, 'tcx> Machine<'mir, 'tcx> for MiriMachine<'mir, 'tcx> {
Some(profiler.start_recording_interval_event_detached(
*name,
measureme::EventId::from_label(*name),
ecx.get_active_thread().to_u32(),
ecx.active_thread().to_u32(),
))
} else {
None

View File

@ -6,7 +6,6 @@ use std::time::{Duration, SystemTime};
use chrono::{DateTime, Datelike, Offset, Timelike, Utc};
use chrono_tz::Tz;
use crate::concurrency::thread::MachineCallback;
use crate::*;
/// Returns the time elapsed between the provided time and the unix epoch as a `Duration`.
@ -336,16 +335,9 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
let timeout_time = now
.checked_add(duration)
.unwrap_or_else(|| now.checked_add(Duration::from_secs(3600)).unwrap());
let timeout_time = Timeout::Monotonic(timeout_time);
let active_thread = this.get_active_thread();
this.block_thread(active_thread, BlockReason::Sleep);
this.register_timeout_callback(
active_thread,
CallbackTime::Monotonic(timeout_time),
Box::new(UnblockCallback { thread_to_unblock: active_thread }),
);
this.block_thread(BlockReason::Sleep, Some(timeout_time), SleepCallback);
Ok(0)
}
@ -359,31 +351,25 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
let duration = Duration::from_millis(timeout_ms.into());
let timeout_time = this.machine.clock.now().checked_add(duration).unwrap();
let timeout_time = Timeout::Monotonic(timeout_time);
let active_thread = this.get_active_thread();
this.block_thread(active_thread, BlockReason::Sleep);
this.register_timeout_callback(
active_thread,
CallbackTime::Monotonic(timeout_time),
Box::new(UnblockCallback { thread_to_unblock: active_thread }),
);
this.block_thread(BlockReason::Sleep, Some(timeout_time), SleepCallback);
Ok(())
}
}
struct UnblockCallback {
thread_to_unblock: ThreadId,
}
impl VisitProvenance for UnblockCallback {
struct SleepCallback;
impl VisitProvenance for SleepCallback {
fn visit_provenance(&self, _visit: &mut VisitWith<'_>) {}
}
impl<'mir, 'tcx: 'mir> MachineCallback<'mir, 'tcx> for UnblockCallback {
fn call(&self, ecx: &mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx> {
ecx.unblock_thread(self.thread_to_unblock, BlockReason::Sleep);
impl<'mir, 'tcx: 'mir> UnblockCallback<'mir, 'tcx> for SleepCallback {
fn timeout(self: Box<Self>, _this: &mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx> {
Ok(())
}
fn unblock(
self: Box<Self>,
_this: &mut InterpCx<'mir, 'tcx, MiriMachine<'mir, 'tcx>>,
) -> InterpResult<'tcx> {
panic!("a sleeping thread should only ever be woken up via the timeout")
}
}

View File

@ -282,7 +282,7 @@ impl<'tcx> TlsDtorsState<'tcx> {
}
}
Done => {
this.machine.tls.delete_all_thread_tls(this.get_active_thread());
this.machine.tls.delete_all_thread_tls(this.active_thread());
return Ok(Poll::Ready(()));
}
}
@ -332,7 +332,7 @@ trait EvalContextPrivExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
/// executed.
fn schedule_macos_tls_dtor(&mut self) -> InterpResult<'tcx> {
let this = self.eval_context_mut();
let thread_id = this.get_active_thread();
let thread_id = this.active_thread();
if let Some((instance, data)) = this.machine.tls.macos_thread_dtors.remove(&thread_id) {
trace!("Running macos dtor {:?} on {:?} at {:?}", instance, data, thread_id);
@ -354,7 +354,7 @@ trait EvalContextPrivExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
state: &mut RunningDtorState,
) -> InterpResult<'tcx, Poll<()>> {
let this = self.eval_context_mut();
let active_thread = this.get_active_thread();
let active_thread = this.active_thread();
// Fetch next dtor after `key`.
let dtor = match this.machine.tls.fetch_tls_dtor(state.last_key, active_thread) {

View File

@ -388,14 +388,14 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
"pthread_getspecific" => {
let [key] = this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?;
let key = this.read_scalar(key)?.to_bits(key.layout.size)?;
let active_thread = this.get_active_thread();
let active_thread = this.active_thread();
let ptr = this.machine.tls.load_tls(key, active_thread, this)?;
this.write_scalar(ptr, dest)?;
}
"pthread_setspecific" => {
let [key, new_ptr] = this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?;
let key = this.read_scalar(key)?.to_bits(key.layout.size)?;
let active_thread = this.get_active_thread();
let active_thread = this.active_thread();
let new_data = this.read_scalar(new_ptr)?;
this.machine.tls.store_tls(key, active_thread, new_data, &*this.tcx)?;
@ -426,8 +426,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
}
"pthread_mutex_lock" => {
let [mutex] = this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?;
let result = this.pthread_mutex_lock(mutex)?;
this.write_scalar(Scalar::from_i32(result), dest)?;
this.pthread_mutex_lock(mutex, dest)?;
}
"pthread_mutex_trylock" => {
let [mutex] = this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?;
@ -446,8 +445,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
}
"pthread_rwlock_rdlock" => {
let [rwlock] = this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?;
let result = this.pthread_rwlock_rdlock(rwlock)?;
this.write_scalar(Scalar::from_i32(result), dest)?;
this.pthread_rwlock_rdlock(rwlock, dest)?;
}
"pthread_rwlock_tryrdlock" => {
let [rwlock] = this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?;
@ -456,8 +454,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
}
"pthread_rwlock_wrlock" => {
let [rwlock] = this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?;
let result = this.pthread_rwlock_wrlock(rwlock)?;
this.write_scalar(Scalar::from_i32(result), dest)?;
this.pthread_rwlock_wrlock(rwlock, dest)?;
}
"pthread_rwlock_trywrlock" => {
let [rwlock] = this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?;
@ -513,8 +510,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
}
"pthread_cond_wait" => {
let [cond, mutex] = this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?;
let result = this.pthread_cond_wait(cond, mutex)?;
this.write_scalar(Scalar::from_i32(result), dest)?;
this.pthread_cond_wait(cond, mutex, dest)?;
}
"pthread_cond_timedwait" => {
let [cond, mutex, abstime] = this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?;

View File

@ -1,6 +1,5 @@
use std::time::SystemTime;
use crate::concurrency::thread::MachineCallback;
use crate::*;
/// Implementation of the SYS_futex syscall.
@ -32,7 +31,6 @@ pub fn futex<'tcx>(
let op = this.read_scalar(&args[1])?.to_i32()?;
let val = this.read_scalar(&args[2])?.to_i32()?;
let thread = this.get_active_thread();
// This is a vararg function so we have to bring our own type for this pointer.
let addr = this.ptr_to_mplace(addr, this.machine.layouts.i32);
let addr_usize = addr.ptr().addr().bytes();
@ -107,22 +105,18 @@ pub fn futex<'tcx>(
Some(if wait_bitset {
// FUTEX_WAIT_BITSET uses an absolute timestamp.
if realtime {
CallbackTime::RealTime(
SystemTime::UNIX_EPOCH.checked_add(duration).unwrap(),
)
Timeout::RealTime(SystemTime::UNIX_EPOCH.checked_add(duration).unwrap())
} else {
CallbackTime::Monotonic(
Timeout::Monotonic(
this.machine.clock.anchor().checked_add(duration).unwrap(),
)
}
} else {
// FUTEX_WAIT uses a relative timestamp.
if realtime {
CallbackTime::RealTime(SystemTime::now().checked_add(duration).unwrap())
Timeout::RealTime(SystemTime::now().checked_add(duration).unwrap())
} else {
CallbackTime::Monotonic(
this.machine.clock.now().checked_add(duration).unwrap(),
)
Timeout::Monotonic(this.machine.clock.now().checked_add(duration).unwrap())
}
})
};
@ -158,7 +152,7 @@ pub fn futex<'tcx>(
// to see an up-to-date value.
//
// The above case distinction is valid since both FUTEX_WAIT and FUTEX_WAKE
// contain a SeqCst fence, therefore inducting a total order between the operations.
// contain a SeqCst fence, therefore inducing a total order between the operations.
// It is also critical that the fence, the atomic load, and the comparison in FUTEX_WAIT
// altogether happen atomically. If the other thread's fence in FUTEX_WAKE
// gets interleaved after our fence, then we lose the guarantee on the
@ -174,48 +168,16 @@ pub fn futex<'tcx>(
// It's not uncommon for `addr` to be passed as another type than `*mut i32`, such as `*const AtomicI32`.
let futex_val = this.read_scalar_atomic(&addr, AtomicReadOrd::Relaxed)?.to_i32()?;
if val == futex_val {
// The value still matches, so we block the thread make it wait for FUTEX_WAKE.
this.block_thread(thread, BlockReason::Futex { addr: addr_usize });
this.futex_wait(addr_usize, thread, bitset);
// Succesfully waking up from FUTEX_WAIT always returns zero.
this.write_scalar(Scalar::from_target_isize(0, this), dest)?;
// Register a timeout callback if a timeout was specified.
// This callback will override the return value when the timeout triggers.
if let Some(timeout_time) = timeout_time {
struct Callback<'tcx> {
thread: ThreadId,
addr_usize: u64,
dest: MPlaceTy<'tcx, Provenance>,
}
impl<'tcx> VisitProvenance for Callback<'tcx> {
fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
let Callback { thread: _, addr_usize: _, dest } = self;
dest.visit_provenance(visit);
}
}
impl<'mir, 'tcx: 'mir> MachineCallback<'mir, 'tcx> for Callback<'tcx> {
fn call(&self, this: &mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx> {
this.unblock_thread(
self.thread,
BlockReason::Futex { addr: self.addr_usize },
);
this.futex_remove_waiter(self.addr_usize, self.thread);
let etimedout = this.eval_libc("ETIMEDOUT");
this.set_last_error(etimedout)?;
this.write_scalar(Scalar::from_target_isize(-1, this), &self.dest)?;
Ok(())
}
}
this.register_timeout_callback(
thread,
timeout_time,
Box::new(Callback { thread, addr_usize, dest: dest.clone() }),
);
}
// The value still matches, so we block the thread and make it wait for FUTEX_WAKE.
this.futex_wait(
addr_usize,
bitset,
timeout_time,
Scalar::from_target_isize(0, this), // retval_succ
Scalar::from_target_isize(-1, this), // retval_timeout
dest.clone(),
this.eval_libc("ETIMEDOUT"),
);
} else {
// The futex value doesn't match the expected value, so we return failure
// right away without sleeping: -1 and errno set to EAGAIN.
@ -257,9 +219,7 @@ pub fn futex<'tcx>(
let mut n = 0;
#[allow(clippy::arithmetic_side_effects)]
for _ in 0..val {
if let Some(thread) = this.futex_wake(addr_usize, bitset) {
this.unblock_thread(thread, BlockReason::Futex { addr: addr_usize });
this.unregister_timeout_callback_if_exists(thread);
if this.futex_wake(addr_usize, bitset)? {
n += 1;
} else {
break;

View File

@ -131,7 +131,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
let dtor = this.read_pointer(dtor)?;
let dtor = this.get_ptr_fn(dtor)?.as_instance()?;
let data = this.read_scalar(data)?;
let active_thread = this.get_active_thread();
let active_thread = this.active_thread();
this.machine.tls.set_macos_thread_dtor(active_thread, dtor, data)?;
}

View File

@ -3,7 +3,6 @@ use std::time::SystemTime;
use rustc_target::abi::Size;
use crate::concurrency::thread::MachineCallback;
use crate::*;
// pthread_mutexattr_t is either 4 or 8 bytes, depending on the platform.
@ -373,59 +372,6 @@ fn cond_set_clock_id<'mir, 'tcx: 'mir>(
)
}
/// Try to reacquire the mutex associated with the condition variable after we
/// were signaled.
fn reacquire_cond_mutex<'mir, 'tcx: 'mir>(
ecx: &mut MiriInterpCx<'mir, 'tcx>,
thread: ThreadId,
condvar: CondvarId,
mutex: MutexId,
) -> InterpResult<'tcx> {
ecx.unblock_thread(thread, BlockReason::Condvar(condvar));
if ecx.mutex_is_locked(mutex) {
ecx.mutex_enqueue_and_block(mutex, thread);
} else {
ecx.mutex_lock(mutex, thread);
}
Ok(())
}
/// After a thread waiting on a condvar was signaled:
/// Reacquire the conditional variable and remove the timeout callback if any
/// was registered.
fn post_cond_signal<'mir, 'tcx: 'mir>(
ecx: &mut MiriInterpCx<'mir, 'tcx>,
thread: ThreadId,
condvar: CondvarId,
mutex: MutexId,
) -> InterpResult<'tcx> {
reacquire_cond_mutex(ecx, thread, condvar, mutex)?;
// Waiting for the mutex is not included in the waiting time because we need
// to acquire the mutex always even if we get a timeout.
ecx.unregister_timeout_callback_if_exists(thread);
Ok(())
}
/// Release the mutex associated with the condition variable because we are
/// entering the waiting state.
fn release_cond_mutex_and_block<'mir, 'tcx: 'mir>(
ecx: &mut MiriInterpCx<'mir, 'tcx>,
thread: ThreadId,
condvar: CondvarId,
mutex: MutexId,
) -> InterpResult<'tcx> {
assert_eq!(ecx.get_active_thread(), thread);
if let Some(old_locked_count) = ecx.mutex_unlock(mutex) {
if old_locked_count != 1 {
throw_unsup_format!("awaiting on a lock acquired multiple times is not supported");
}
} else {
throw_ub_format!("awaiting on unlocked or owned by a different thread mutex");
}
ecx.block_thread(thread, BlockReason::Condvar(condvar));
Ok(())
}
impl<'mir, 'tcx> EvalContextExt<'mir, 'tcx> for crate::MiriInterpCx<'mir, 'tcx> {}
pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
fn pthread_mutexattr_init(
@ -531,19 +477,21 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
Ok(0)
}
fn pthread_mutex_lock(&mut self, mutex_op: &OpTy<'tcx, Provenance>) -> InterpResult<'tcx, i32> {
fn pthread_mutex_lock(
&mut self,
mutex_op: &OpTy<'tcx, Provenance>,
dest: &MPlaceTy<'tcx, Provenance>,
) -> InterpResult<'tcx> {
let this = self.eval_context_mut();
let kind = mutex_get_kind(this, mutex_op)?;
let id = mutex_get_id(this, mutex_op)?;
let active_thread = this.get_active_thread();
if this.mutex_is_locked(id) {
let ret = if this.mutex_is_locked(id) {
let owner_thread = this.mutex_get_owner(id);
if owner_thread != active_thread {
// Enqueue the active thread.
this.mutex_enqueue_and_block(id, active_thread);
Ok(0)
if owner_thread != this.active_thread() {
this.mutex_enqueue_and_block(id, Scalar::from_i32(0), dest.clone());
return Ok(());
} else {
// Trying to acquire the same mutex again.
if is_mutex_kind_default(this, kind)? {
@ -551,10 +499,10 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
} else if is_mutex_kind_normal(this, kind)? {
throw_machine_stop!(TerminationInfo::Deadlock);
} else if kind == this.eval_libc_i32("PTHREAD_MUTEX_ERRORCHECK") {
Ok(this.eval_libc_i32("EDEADLK"))
this.eval_libc_i32("EDEADLK")
} else if kind == this.eval_libc_i32("PTHREAD_MUTEX_RECURSIVE") {
this.mutex_lock(id, active_thread);
Ok(0)
this.mutex_lock(id);
0
} else {
throw_unsup_format!(
"called pthread_mutex_lock on an unsupported type of mutex"
@ -563,9 +511,11 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
}
} else {
// The mutex is unlocked. Let's lock it.
this.mutex_lock(id, active_thread);
Ok(0)
}
this.mutex_lock(id);
0
};
this.write_scalar(Scalar::from_i32(ret), dest)?;
Ok(())
}
fn pthread_mutex_trylock(
@ -576,11 +526,10 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
let kind = mutex_get_kind(this, mutex_op)?;
let id = mutex_get_id(this, mutex_op)?;
let active_thread = this.get_active_thread();
if this.mutex_is_locked(id) {
let owner_thread = this.mutex_get_owner(id);
if owner_thread != active_thread {
if owner_thread != this.active_thread() {
Ok(this.eval_libc_i32("EBUSY"))
} else {
if is_mutex_kind_default(this, kind)?
@ -589,7 +538,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
{
Ok(this.eval_libc_i32("EBUSY"))
} else if kind == this.eval_libc_i32("PTHREAD_MUTEX_RECURSIVE") {
this.mutex_lock(id, active_thread);
this.mutex_lock(id);
Ok(0)
} else {
throw_unsup_format!(
@ -599,7 +548,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
}
} else {
// The mutex is unlocked. Let's lock it.
this.mutex_lock(id, active_thread);
this.mutex_lock(id);
Ok(0)
}
}
@ -613,7 +562,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
let kind = mutex_get_kind(this, mutex_op)?;
let id = mutex_get_id(this, mutex_op)?;
if let Some(_old_locked_count) = this.mutex_unlock(id) {
if let Some(_old_locked_count) = this.mutex_unlock(id)? {
// The mutex was locked by the current thread.
Ok(0)
} else {
@ -666,19 +615,20 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
fn pthread_rwlock_rdlock(
&mut self,
rwlock_op: &OpTy<'tcx, Provenance>,
) -> InterpResult<'tcx, i32> {
dest: &MPlaceTy<'tcx, Provenance>,
) -> InterpResult<'tcx> {
let this = self.eval_context_mut();
let id = rwlock_get_id(this, rwlock_op)?;
let active_thread = this.get_active_thread();
if this.rwlock_is_write_locked(id) {
this.rwlock_enqueue_and_block_reader(id, active_thread);
Ok(0)
this.rwlock_enqueue_and_block_reader(id, Scalar::from_i32(0), dest.clone());
} else {
this.rwlock_reader_lock(id, active_thread);
Ok(0)
this.rwlock_reader_lock(id);
this.write_null(dest)?;
}
Ok(())
}
fn pthread_rwlock_tryrdlock(
@ -688,12 +638,11 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
let this = self.eval_context_mut();
let id = rwlock_get_id(this, rwlock_op)?;
let active_thread = this.get_active_thread();
if this.rwlock_is_write_locked(id) {
Ok(this.eval_libc_i32("EBUSY"))
} else {
this.rwlock_reader_lock(id, active_thread);
this.rwlock_reader_lock(id);
Ok(0)
}
}
@ -701,11 +650,11 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
fn pthread_rwlock_wrlock(
&mut self,
rwlock_op: &OpTy<'tcx, Provenance>,
) -> InterpResult<'tcx, i32> {
dest: &MPlaceTy<'tcx, Provenance>,
) -> InterpResult<'tcx> {
let this = self.eval_context_mut();
let id = rwlock_get_id(this, rwlock_op)?;
let active_thread = this.get_active_thread();
if this.rwlock_is_locked(id) {
// Note: this will deadlock if the lock is already locked by this
@ -720,12 +669,13 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
// report the deadlock only when no thread can continue execution,
// but we could detect that this lock is already locked and report
// an error.)
this.rwlock_enqueue_and_block_writer(id, active_thread);
this.rwlock_enqueue_and_block_writer(id, Scalar::from_i32(0), dest.clone());
} else {
this.rwlock_writer_lock(id, active_thread);
this.rwlock_writer_lock(id);
this.write_null(dest)?;
}
Ok(0)
Ok(())
}
fn pthread_rwlock_trywrlock(
@ -735,12 +685,11 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
let this = self.eval_context_mut();
let id = rwlock_get_id(this, rwlock_op)?;
let active_thread = this.get_active_thread();
if this.rwlock_is_locked(id) {
Ok(this.eval_libc_i32("EBUSY"))
} else {
this.rwlock_writer_lock(id, active_thread);
this.rwlock_writer_lock(id);
Ok(0)
}
}
@ -754,9 +703,9 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
let id = rwlock_get_id(this, rwlock_op)?;
#[allow(clippy::if_same_then_else)]
if this.rwlock_reader_unlock(id) {
if this.rwlock_reader_unlock(id)? {
Ok(0)
} else if this.rwlock_writer_unlock(id) {
} else if this.rwlock_writer_unlock(id)? {
Ok(0)
} else {
throw_ub_format!("unlocked an rwlock that was not locked by the active thread");
@ -885,10 +834,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
fn pthread_cond_signal(&mut self, cond_op: &OpTy<'tcx, Provenance>) -> InterpResult<'tcx, i32> {
let this = self.eval_context_mut();
let id = cond_get_id(this, cond_op)?;
if let Some((thread, mutex)) = this.condvar_signal(id) {
post_cond_signal(this, thread, id, mutex)?;
}
this.condvar_signal(id)?;
Ok(0)
}
@ -898,11 +844,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
) -> InterpResult<'tcx, i32> {
let this = self.eval_context_mut();
let id = cond_get_id(this, cond_op)?;
while let Some((thread, mutex)) = this.condvar_signal(id) {
post_cond_signal(this, thread, id, mutex)?;
}
while this.condvar_signal(id)? {}
Ok(0)
}
@ -910,17 +852,23 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
&mut self,
cond_op: &OpTy<'tcx, Provenance>,
mutex_op: &OpTy<'tcx, Provenance>,
) -> InterpResult<'tcx, i32> {
dest: &MPlaceTy<'tcx, Provenance>,
) -> InterpResult<'tcx> {
let this = self.eval_context_mut();
let id = cond_get_id(this, cond_op)?;
let mutex_id = mutex_get_id(this, mutex_op)?;
let active_thread = this.get_active_thread();
release_cond_mutex_and_block(this, active_thread, id, mutex_id)?;
this.condvar_wait(id, active_thread, mutex_id);
this.condvar_wait(
id,
mutex_id,
None, // no timeout
Scalar::from_i32(0),
Scalar::from_i32(0), // retval_timeout -- unused
dest.clone(),
)?;
Ok(0)
Ok(())
}
fn pthread_cond_timedwait(
@ -934,7 +882,6 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
let id = cond_get_id(this, cond_op)?;
let mutex_id = mutex_get_id(this, mutex_op)?;
let active_thread = this.get_active_thread();
// Extract the timeout.
let clock_id = cond_get_clock_id(this, cond_op)?;
@ -948,61 +895,23 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
return Ok(());
}
};
let timeout_time = if is_cond_clock_realtime(this, clock_id) {
this.check_no_isolation("`pthread_cond_timedwait` with `CLOCK_REALTIME`")?;
CallbackTime::RealTime(SystemTime::UNIX_EPOCH.checked_add(duration).unwrap())
Timeout::RealTime(SystemTime::UNIX_EPOCH.checked_add(duration).unwrap())
} else if clock_id == this.eval_libc_i32("CLOCK_MONOTONIC") {
CallbackTime::Monotonic(this.machine.clock.anchor().checked_add(duration).unwrap())
Timeout::Monotonic(this.machine.clock.anchor().checked_add(duration).unwrap())
} else {
throw_unsup_format!("unsupported clock id: {}", clock_id);
};
release_cond_mutex_and_block(this, active_thread, id, mutex_id)?;
this.condvar_wait(id, active_thread, mutex_id);
// We return success for now and override it in the timeout callback.
this.write_scalar(Scalar::from_i32(0), dest)?;
struct Callback<'tcx> {
active_thread: ThreadId,
mutex_id: MutexId,
id: CondvarId,
dest: MPlaceTy<'tcx, Provenance>,
}
impl<'tcx> VisitProvenance for Callback<'tcx> {
fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
let Callback { active_thread: _, mutex_id: _, id: _, dest } = self;
dest.visit_provenance(visit);
}
}
impl<'mir, 'tcx: 'mir> MachineCallback<'mir, 'tcx> for Callback<'tcx> {
fn call(&self, ecx: &mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx> {
assert_eq!(self.active_thread, ecx.get_active_thread());
// We are not waiting for the condvar any more, wait for the
// mutex instead.
reacquire_cond_mutex(ecx, self.active_thread, self.id, self.mutex_id)?;
// Remove the thread from the conditional variable.
ecx.condvar_remove_waiter(self.id, self.active_thread);
// Set the return value: we timed out.
let etimedout = ecx.eval_libc("ETIMEDOUT");
ecx.write_scalar(etimedout, &self.dest)?;
Ok(())
}
}
// Register the timeout callback.
let dest = dest.clone();
this.register_timeout_callback(
active_thread,
timeout_time,
Box::new(Callback { active_thread, mutex_id, id, dest }),
);
this.condvar_wait(
id,
mutex_id,
Some(timeout_time),
Scalar::from_i32(0),
this.eval_libc("ETIMEDOUT"), // retval_timeout
dest.clone(),
)?;
Ok(())
}

View File

@ -63,7 +63,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
fn pthread_self(&mut self) -> InterpResult<'tcx, Scalar<Provenance>> {
let this = self.eval_context_mut();
let thread_id = this.get_active_thread();
let thread_id = this.active_thread();
Ok(Scalar::from_uint(thread_id.to_u32(), this.libc_ty_layout("pthread_t").size))
}

View File

@ -354,7 +354,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
"TlsGetValue" => {
let [key] = this.check_shim(abi, Abi::System { unwind: false }, link_name, args)?;
let key = u128::from(this.read_scalar(key)?.to_u32()?);
let active_thread = this.get_active_thread();
let active_thread = this.active_thread();
let ptr = this.machine.tls.load_tls(key, active_thread, this)?;
this.write_scalar(ptr, dest)?;
}
@ -362,7 +362,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
let [key, new_ptr] =
this.check_shim(abi, Abi::System { unwind: false }, link_name, args)?;
let key = u128::from(this.read_scalar(key)?.to_u32()?);
let active_thread = this.get_active_thread();
let active_thread = this.active_thread();
let new_data = this.read_scalar(new_ptr)?;
this.machine.tls.store_tls(key, active_thread, new_data, &*this.tcx)?;
@ -423,8 +423,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
"InitOnceBeginInitialize" => {
let [ptr, flags, pending, context] =
this.check_shim(abi, Abi::System { unwind: false }, link_name, args)?;
let result = this.InitOnceBeginInitialize(ptr, flags, pending, context)?;
this.write_scalar(result, dest)?;
this.InitOnceBeginInitialize(ptr, flags, pending, context, dest)?;
}
"InitOnceComplete" => {
let [ptr, flags, context] =
@ -502,7 +501,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
let thread = match Handle::from_scalar(handle, this)? {
Some(Handle::Thread(thread)) => thread,
Some(Handle::Pseudo(PseudoHandle::CurrentThread)) => this.get_active_thread(),
Some(Handle::Pseudo(PseudoHandle::CurrentThread)) => this.active_thread(),
_ => this.invalid_handle("SetThreadDescription")?,
};
@ -520,7 +519,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
let thread = match Handle::from_scalar(handle, this)? {
Some(Handle::Thread(thread)) => thread,
Some(Handle::Pseudo(PseudoHandle::CurrentThread)) => this.get_active_thread(),
Some(Handle::Pseudo(PseudoHandle::CurrentThread)) => this.active_thread(),
_ => this.invalid_handle("SetThreadDescription")?,
};
// Looks like the default thread name is empty.

View File

@ -3,7 +3,6 @@ use std::time::Duration;
use rustc_target::abi::Size;
use crate::concurrency::init_once::InitOnceStatus;
use crate::concurrency::thread::MachineCallback;
use crate::*;
impl<'mir, 'tcx> EvalContextExtPriv<'mir, 'tcx> for crate::MiriInterpCx<'mir, 'tcx> {}
@ -18,6 +17,31 @@ trait EvalContextExtPriv<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
let this = self.eval_context_mut();
this.init_once_get_or_create_id(init_once_op, this.windows_ty_layout("INIT_ONCE"), 0)
}
/// Returns `true` if we were succssful, `false` if we would block.
fn init_once_try_begin(
&mut self,
id: InitOnceId,
pending_place: &MPlaceTy<'tcx, Provenance>,
dest: &MPlaceTy<'tcx, Provenance>,
) -> InterpResult<'tcx, bool> {
let this = self.eval_context_mut();
Ok(match this.init_once_status(id) {
InitOnceStatus::Uninitialized => {
this.init_once_begin(id);
this.write_scalar(this.eval_windows("c", "TRUE"), pending_place)?;
this.write_scalar(this.eval_windows("c", "TRUE"), dest)?;
true
}
InitOnceStatus::Complete => {
this.init_once_observe_completed(id);
this.write_scalar(this.eval_windows("c", "FALSE"), pending_place)?;
this.write_scalar(this.eval_windows("c", "TRUE"), dest)?;
true
}
InitOnceStatus::Begun => false,
})
}
}
impl<'mir, 'tcx> EvalContextExt<'mir, 'tcx> for crate::MiriInterpCx<'mir, 'tcx> {}
@ -29,9 +53,9 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
flags_op: &OpTy<'tcx, Provenance>,
pending_op: &OpTy<'tcx, Provenance>,
context_op: &OpTy<'tcx, Provenance>,
) -> InterpResult<'tcx, Scalar<Provenance>> {
dest: &MPlaceTy<'tcx, Provenance>,
) -> InterpResult<'tcx> {
let this = self.eval_context_mut();
let active_thread = this.get_active_thread();
let id = this.init_once_get_id(init_once_op)?;
let flags = this.read_scalar(flags_op)?.to_u32()?;
@ -46,58 +70,34 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
throw_unsup_format!("non-null `lpContext` in `InitOnceBeginInitialize`");
}
match this.init_once_status(id) {
InitOnceStatus::Uninitialized => {
this.init_once_begin(id);
this.write_scalar(this.eval_windows("c", "TRUE"), &pending_place)?;
}
InitOnceStatus::Begun => {
// Someone else is already on it.
// Block this thread until they are done.
// When we are woken up, set the `pending` flag accordingly.
struct Callback<'tcx> {
init_once_id: InitOnceId,
pending_place: MPlaceTy<'tcx, Provenance>,
}
impl<'tcx> VisitProvenance for Callback<'tcx> {
fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
let Callback { init_once_id: _, pending_place } = self;
pending_place.visit_provenance(visit);
}
}
impl<'mir, 'tcx> MachineCallback<'mir, 'tcx> for Callback<'tcx> {
fn call(&self, this: &mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx> {
let pending = match this.init_once_status(self.init_once_id) {
InitOnceStatus::Uninitialized =>
unreachable!(
"status should have either been set to begun or complete"
),
InitOnceStatus::Begun => this.eval_windows("c", "TRUE"),
InitOnceStatus::Complete => this.eval_windows("c", "FALSE"),
};
this.write_scalar(pending, &self.pending_place)?;
Ok(())
}
}
this.init_once_enqueue_and_block(
id,
active_thread,
Box::new(Callback { init_once_id: id, pending_place }),
)
}
InitOnceStatus::Complete => {
this.init_once_observe_completed(id);
this.write_scalar(this.eval_windows("c", "FALSE"), &pending_place)?;
}
if this.init_once_try_begin(id, &pending_place, dest)? {
// Done!
return Ok(());
}
// This always succeeds (even if the thread is blocked, we will succeed if we ever unblock).
Ok(this.eval_windows("c", "TRUE"))
// We have to block, and then try again when we are woken up.
this.init_once_enqueue_and_block(id, Callback { id, pending_place, dest: dest.clone() });
return Ok(());
struct Callback<'tcx> {
id: InitOnceId,
pending_place: MPlaceTy<'tcx, Provenance>,
dest: MPlaceTy<'tcx, Provenance>,
}
impl<'tcx> VisitProvenance for Callback<'tcx> {
fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
let Callback { id: _, dest, pending_place } = self;
pending_place.visit_provenance(visit);
dest.visit_provenance(visit);
}
}
impl<'mir, 'tcx> UnblockCallback<'mir, 'tcx> for Callback<'tcx> {
fn unblock(self: Box<Self>, this: &mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx> {
let ret = this.init_once_try_begin(self.id, &self.pending_place, &self.dest)?;
assert!(ret, "we were woken up but init_once_try_begin still failed");
Ok(())
}
}
}
fn InitOnceComplete(
@ -155,7 +155,6 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
let size = this.read_target_usize(size_op)?;
let timeout_ms = this.read_scalar(timeout_op)?.to_u32()?;
let thread = this.get_active_thread();
let addr = ptr.addr().bytes();
if size > 8 || !size.is_power_of_two() {
@ -170,7 +169,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
None
} else {
let duration = Duration::from_millis(timeout_ms.into());
Some(CallbackTime::Monotonic(this.machine.clock.now().checked_add(duration).unwrap()))
Some(Timeout::Monotonic(this.machine.clock.now().checked_add(duration).unwrap()))
};
// See the Linux futex implementation for why this fence exists.
@ -183,41 +182,15 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
if futex_val == compare_val {
// If the values are the same, we have to block.
this.block_thread(thread, BlockReason::Futex { addr });
this.futex_wait(addr, thread, u32::MAX);
if let Some(timeout_time) = timeout_time {
struct Callback<'tcx> {
thread: ThreadId,
addr: u64,
dest: MPlaceTy<'tcx, Provenance>,
}
impl<'tcx> VisitProvenance for Callback<'tcx> {
fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
let Callback { thread: _, addr: _, dest } = self;
dest.visit_provenance(visit);
}
}
impl<'mir, 'tcx: 'mir> MachineCallback<'mir, 'tcx> for Callback<'tcx> {
fn call(&self, this: &mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx> {
this.unblock_thread(self.thread, BlockReason::Futex { addr: self.addr });
this.futex_remove_waiter(self.addr, self.thread);
let error_timeout = this.eval_windows("c", "ERROR_TIMEOUT");
this.set_last_error(error_timeout)?;
this.write_scalar(Scalar::from_i32(0), &self.dest)?;
Ok(())
}
}
this.register_timeout_callback(
thread,
timeout_time,
Box::new(Callback { thread, addr, dest: dest.clone() }),
);
}
this.futex_wait(
addr,
u32::MAX, // bitset
timeout_time,
Scalar::from_i32(1), // retval_succ
Scalar::from_i32(0), // retval_timeout
dest.clone(),
this.eval_windows("c", "ERROR_TIMEOUT"),
);
}
this.write_scalar(Scalar::from_i32(1), dest)?;
@ -234,10 +207,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
this.atomic_fence(AtomicFenceOrd::SeqCst)?;
let addr = ptr.addr().bytes();
if let Some(thread) = this.futex_wake(addr, u32::MAX) {
this.unblock_thread(thread, BlockReason::Futex { addr });
this.unregister_timeout_callback_if_exists(thread);
}
this.futex_wake(addr, u32::MAX)?;
Ok(())
}
@ -250,10 +220,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
this.atomic_fence(AtomicFenceOrd::SeqCst)?;
let addr = ptr.addr().bytes();
while let Some(thread) = this.futex_wake(addr, u32::MAX) {
this.unblock_thread(thread, BlockReason::Futex { addr });
this.unregister_timeout_callback_if_exists(thread);
}
while this.futex_wake(addr, u32::MAX)? {}
Ok(())
}

View File

@ -69,7 +69,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
Some(Handle::Thread(thread)) => thread,
// Unlike on posix, the outcome of joining the current thread is not documented.
// On current Windows, it just deadlocks.
Some(Handle::Pseudo(PseudoHandle::CurrentThread)) => this.get_active_thread(),
Some(Handle::Pseudo(PseudoHandle::CurrentThread)) => this.active_thread(),
_ => this.invalid_handle("WaitForSingleObject")?,
};