From 4959296e076d479cd4c38d49ea00a5841f6230a6 Mon Sep 17 00:00:00 2001 From: Jonathan Johnson Date: Tue, 19 Dec 2023 11:21:55 -0800 Subject: [PATCH] Fixed callback invocation from multiple threads Closes #97 There was a potential race condition described in #97 that I realized I had seen occasionally when interacting with an element that was currently being animated. These were in complex situations, so I thought I had a situation that could have legitimately caused the warning. However, this warning is preventing a very specific coding "error", and that program did not have it. The existing implementation would potentially prevent one thread's change from invoking its callbacks because another thread was already executing its callbacks. This change moves that state into a Mutex/Condvar pair that allows detecting reentry while allowing other threads to block until its their turn. When it becomes their turn, they can check whether the callbacks were invoked with the current value or not to prevent callbacks from being invoked in quick succeession with the same value by multiple threads. --- src/value.rs | 129 +++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 99 insertions(+), 30 deletions(-) diff --git a/src/value.rs b/src/value.rs index 879098b..9ee0a0e 100644 --- a/src/value.rs +++ b/src/value.rs @@ -7,11 +7,10 @@ use std::hash::{BuildHasher, Hash}; use std::ops::{Deref, DerefMut, Not}; use std::panic::UnwindSafe; use std::str::FromStr; -use std::sync::atomic::{self, AtomicBool}; use std::sync::{Arc, Mutex, MutexGuard, TryLockError, Weak}; use std::task::{Poll, Waker}; -use std::thread::ThreadId; -use std::time::Duration; +use std::thread::{self, ThreadId}; +use std::time::{Duration, Instant}; use ahash::AHashSet; use alot::{LotId, Lots}; @@ -834,9 +833,9 @@ impl DynamicData { F: for<'a> FnMut() + Send + 'static, { let state = self.state().expect("deadlocked"); - let mut callbacks = state.callbacks.callbacks.lock().ignore_poison(); + let mut data = state.callbacks.callbacks.lock().ignore_poison(); CallbackHandle { - id: Some(callbacks.push(Box::new(map))), + id: Some(data.callbacks.push(Box::new(map))), callbacks: state.callbacks.clone(), } } @@ -936,8 +935,8 @@ impl PartialEq for CallbackHandle { impl Drop for CallbackHandle { fn drop(&mut self) { if let Some(id) = self.id { - let mut callbacks = self.callbacks.callbacks.lock().ignore_poison(); - callbacks.remove(id); + let mut data = self.callbacks.callbacks.lock().ignore_poison(); + data.callbacks.remove(id); } } } @@ -966,7 +965,10 @@ impl State { waker.wake(); } - ChangeCallbacks(self.callbacks.clone()) + ChangeCallbacks { + data: self.callbacks.clone(), + changed_at: Instant::now(), + } } fn debug(&self, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result @@ -994,34 +996,79 @@ where #[derive(Default)] struct ChangeCallbacksData { - callbacks: Mutex>>, - currently_executing: AtomicBool, + callbacks: Mutex, + currently_executing: Mutex>, + sync: UnwindsafeCondvar, } -struct ChangeCallbacks(Arc); +struct CallbacksList { + callbacks: Lots>, + invoked_at: Instant, +} + +impl Default for CallbacksList { + fn default() -> Self { + Self { + callbacks: Lots::new(), + invoked_at: Instant::now(), + } + } +} + +struct ChangeCallbacks { + data: Arc, + changed_at: Instant, +} impl Drop for ChangeCallbacks { fn drop(&mut self) { - if self - .0 - .currently_executing - .compare_exchange( - false, - true, - atomic::Ordering::Release, - atomic::Ordering::Acquire, - ) - .is_ok() - { - let mut callbacks = self.0.callbacks.lock().ignore_poison(); - for callback in &mut *callbacks { - callback.changed(); + let mut currently_executing = self.data.currently_executing.lock().expect("lock poisoned"); + let current_thread = thread::current().id(); + loop { + match &*currently_executing { + None => { + // No other thread is executing these callbacks. Set this + // thread as the current executor so that we can prevent + // infinite cycles. + *currently_executing = Some(current_thread); + drop(currently_executing); + + // Invoke the callbacks + let mut state = self.data.callbacks.lock().ignore_poison(); + // If the callbacks have already been invoked by another + // thread such that the callbacks observed the value our + // thread wrote, we can skip the callbacks. + if state.invoked_at < self.changed_at { + state.invoked_at = Instant::now(); + for callback in &mut state.callbacks { + callback.changed(); + } + } + drop(state); + + // Remove ourselves as the current executor, notifying any + // other threads that are waiting. + currently_executing = + self.data.currently_executing.lock().expect("lock poisoned"); + *currently_executing = None; + drop(currently_executing); + self.data.sync.notify_all(); + + return; + } + Some(executing) if executing == ¤t_thread => { + tracing::warn!("Could not invoke dynamic callbacks because they are already running on this thread"); + + return; + } + Some(_) => { + currently_executing = self + .data + .sync + .wait(currently_executing) + .expect("lock poisoned"); + } } - self.0 - .currently_executing - .store(false, atomic::Ordering::Release); - } else { - tracing::warn!("Could not invoke dynamic callbacks because they are already running on this thread"); } } } @@ -2447,3 +2494,25 @@ where } } } + +#[test] +fn map_cycle_is_finite() { + crate::initialize_tracing(); + let a = Dynamic::new(0_usize); + + // This callback updates a each time a is updated with a + 1, causing an + // infinite cycle if not broken by Gooey. + a.for_each_cloned({ + let a = a.clone(); + move |current| { + a.set(current + 1); + } + }) + .persist(); + + // Gooey will invoke the callback for the first set call, but the set call + // within the callback will not cause the callback to be invoked again. + // Thus, we expect setting the value to 1 to result in `a` containing 2. + a.set(1); + assert_eq!(a.get(), 2); +}