diff --git a/src/value.rs b/src/value.rs index 879098b..9ee0a0e 100644 --- a/src/value.rs +++ b/src/value.rs @@ -7,11 +7,10 @@ use std::hash::{BuildHasher, Hash}; use std::ops::{Deref, DerefMut, Not}; use std::panic::UnwindSafe; use std::str::FromStr; -use std::sync::atomic::{self, AtomicBool}; use std::sync::{Arc, Mutex, MutexGuard, TryLockError, Weak}; use std::task::{Poll, Waker}; -use std::thread::ThreadId; -use std::time::Duration; +use std::thread::{self, ThreadId}; +use std::time::{Duration, Instant}; use ahash::AHashSet; use alot::{LotId, Lots}; @@ -834,9 +833,9 @@ impl DynamicData { F: for<'a> FnMut() + Send + 'static, { let state = self.state().expect("deadlocked"); - let mut callbacks = state.callbacks.callbacks.lock().ignore_poison(); + let mut data = state.callbacks.callbacks.lock().ignore_poison(); CallbackHandle { - id: Some(callbacks.push(Box::new(map))), + id: Some(data.callbacks.push(Box::new(map))), callbacks: state.callbacks.clone(), } } @@ -936,8 +935,8 @@ impl PartialEq for CallbackHandle { impl Drop for CallbackHandle { fn drop(&mut self) { if let Some(id) = self.id { - let mut callbacks = self.callbacks.callbacks.lock().ignore_poison(); - callbacks.remove(id); + let mut data = self.callbacks.callbacks.lock().ignore_poison(); + data.callbacks.remove(id); } } } @@ -966,7 +965,10 @@ impl State { waker.wake(); } - ChangeCallbacks(self.callbacks.clone()) + ChangeCallbacks { + data: self.callbacks.clone(), + changed_at: Instant::now(), + } } fn debug(&self, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result @@ -994,34 +996,79 @@ where #[derive(Default)] struct ChangeCallbacksData { - callbacks: Mutex>>, - currently_executing: AtomicBool, + callbacks: Mutex, + currently_executing: Mutex>, + sync: UnwindsafeCondvar, } -struct ChangeCallbacks(Arc); +struct CallbacksList { + callbacks: Lots>, + invoked_at: Instant, +} + +impl Default for CallbacksList { + fn default() -> Self { + Self { + callbacks: Lots::new(), + invoked_at: Instant::now(), + } + } +} + +struct ChangeCallbacks { + data: Arc, + changed_at: Instant, +} impl Drop for ChangeCallbacks { fn drop(&mut self) { - if self - .0 - .currently_executing - .compare_exchange( - false, - true, - atomic::Ordering::Release, - atomic::Ordering::Acquire, - ) - .is_ok() - { - let mut callbacks = self.0.callbacks.lock().ignore_poison(); - for callback in &mut *callbacks { - callback.changed(); + let mut currently_executing = self.data.currently_executing.lock().expect("lock poisoned"); + let current_thread = thread::current().id(); + loop { + match &*currently_executing { + None => { + // No other thread is executing these callbacks. Set this + // thread as the current executor so that we can prevent + // infinite cycles. + *currently_executing = Some(current_thread); + drop(currently_executing); + + // Invoke the callbacks + let mut state = self.data.callbacks.lock().ignore_poison(); + // If the callbacks have already been invoked by another + // thread such that the callbacks observed the value our + // thread wrote, we can skip the callbacks. + if state.invoked_at < self.changed_at { + state.invoked_at = Instant::now(); + for callback in &mut state.callbacks { + callback.changed(); + } + } + drop(state); + + // Remove ourselves as the current executor, notifying any + // other threads that are waiting. + currently_executing = + self.data.currently_executing.lock().expect("lock poisoned"); + *currently_executing = None; + drop(currently_executing); + self.data.sync.notify_all(); + + return; + } + Some(executing) if executing == ¤t_thread => { + tracing::warn!("Could not invoke dynamic callbacks because they are already running on this thread"); + + return; + } + Some(_) => { + currently_executing = self + .data + .sync + .wait(currently_executing) + .expect("lock poisoned"); + } } - self.0 - .currently_executing - .store(false, atomic::Ordering::Release); - } else { - tracing::warn!("Could not invoke dynamic callbacks because they are already running on this thread"); } } } @@ -2447,3 +2494,25 @@ where } } } + +#[test] +fn map_cycle_is_finite() { + crate::initialize_tracing(); + let a = Dynamic::new(0_usize); + + // This callback updates a each time a is updated with a + 1, causing an + // infinite cycle if not broken by Gooey. + a.for_each_cloned({ + let a = a.clone(); + move |current| { + a.set(current + 1); + } + }) + .persist(); + + // Gooey will invoke the callback for the first set call, but the set call + // within the callback will not cause the callback to be invoked again. + // Thus, we expect setting the value to 1 to result in `a` containing 2. + a.set(1); + assert_eq!(a.get(), 2); +}