Fixed callback invocation from multiple threads

Closes #97

There was a potential race condition described in #97 that I realized I
had seen occasionally when interacting with an element that was
currently being animated. These were in complex situations, so I thought
I had a situation that could have legitimately caused the warning.

However, this warning is preventing a very specific coding "error", and
that program did not have it. The existing implementation would
potentially prevent one thread's change from invoking its callbacks
because another thread was already executing its callbacks.

This change moves that state into a Mutex/Condvar pair that allows
detecting reentry while allowing other threads to block until its their
turn. When it becomes their turn, they can check whether the callbacks
were invoked with the current value or not to prevent callbacks from
being invoked in quick succeession with the same value by multiple
threads.
This commit is contained in:
Jonathan Johnson 2023-12-19 11:21:55 -08:00
parent 63fd92eea6
commit 4959296e07
No known key found for this signature in database
GPG key ID: A66D6A34D6620579

View file

@ -7,11 +7,10 @@ use std::hash::{BuildHasher, Hash};
use std::ops::{Deref, DerefMut, Not};
use std::panic::UnwindSafe;
use std::str::FromStr;
use std::sync::atomic::{self, AtomicBool};
use std::sync::{Arc, Mutex, MutexGuard, TryLockError, Weak};
use std::task::{Poll, Waker};
use std::thread::ThreadId;
use std::time::Duration;
use std::thread::{self, ThreadId};
use std::time::{Duration, Instant};
use ahash::AHashSet;
use alot::{LotId, Lots};
@ -834,9 +833,9 @@ impl<T> DynamicData<T> {
F: for<'a> FnMut() + Send + 'static,
{
let state = self.state().expect("deadlocked");
let mut callbacks = state.callbacks.callbacks.lock().ignore_poison();
let mut data = state.callbacks.callbacks.lock().ignore_poison();
CallbackHandle {
id: Some(callbacks.push(Box::new(map))),
id: Some(data.callbacks.push(Box::new(map))),
callbacks: state.callbacks.clone(),
}
}
@ -936,8 +935,8 @@ impl PartialEq for CallbackHandle {
impl Drop for CallbackHandle {
fn drop(&mut self) {
if let Some(id) = self.id {
let mut callbacks = self.callbacks.callbacks.lock().ignore_poison();
callbacks.remove(id);
let mut data = self.callbacks.callbacks.lock().ignore_poison();
data.callbacks.remove(id);
}
}
}
@ -966,7 +965,10 @@ impl<T> State<T> {
waker.wake();
}
ChangeCallbacks(self.callbacks.clone())
ChangeCallbacks {
data: self.callbacks.clone(),
changed_at: Instant::now(),
}
}
fn debug(&self, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result
@ -994,34 +996,79 @@ where
#[derive(Default)]
struct ChangeCallbacksData {
callbacks: Mutex<Lots<Box<dyn ValueCallback>>>,
currently_executing: AtomicBool,
callbacks: Mutex<CallbacksList>,
currently_executing: Mutex<Option<ThreadId>>,
sync: UnwindsafeCondvar,
}
struct ChangeCallbacks(Arc<ChangeCallbacksData>);
struct CallbacksList {
callbacks: Lots<Box<dyn ValueCallback>>,
invoked_at: Instant,
}
impl Default for CallbacksList {
fn default() -> Self {
Self {
callbacks: Lots::new(),
invoked_at: Instant::now(),
}
}
}
struct ChangeCallbacks {
data: Arc<ChangeCallbacksData>,
changed_at: Instant,
}
impl Drop for ChangeCallbacks {
fn drop(&mut self) {
if self
.0
.currently_executing
.compare_exchange(
false,
true,
atomic::Ordering::Release,
atomic::Ordering::Acquire,
)
.is_ok()
{
let mut callbacks = self.0.callbacks.lock().ignore_poison();
for callback in &mut *callbacks {
callback.changed();
let mut currently_executing = self.data.currently_executing.lock().expect("lock poisoned");
let current_thread = thread::current().id();
loop {
match &*currently_executing {
None => {
// No other thread is executing these callbacks. Set this
// thread as the current executor so that we can prevent
// infinite cycles.
*currently_executing = Some(current_thread);
drop(currently_executing);
// Invoke the callbacks
let mut state = self.data.callbacks.lock().ignore_poison();
// If the callbacks have already been invoked by another
// thread such that the callbacks observed the value our
// thread wrote, we can skip the callbacks.
if state.invoked_at < self.changed_at {
state.invoked_at = Instant::now();
for callback in &mut state.callbacks {
callback.changed();
}
}
drop(state);
// Remove ourselves as the current executor, notifying any
// other threads that are waiting.
currently_executing =
self.data.currently_executing.lock().expect("lock poisoned");
*currently_executing = None;
drop(currently_executing);
self.data.sync.notify_all();
return;
}
Some(executing) if executing == &current_thread => {
tracing::warn!("Could not invoke dynamic callbacks because they are already running on this thread");
return;
}
Some(_) => {
currently_executing = self
.data
.sync
.wait(currently_executing)
.expect("lock poisoned");
}
}
self.0
.currently_executing
.store(false, atomic::Ordering::Release);
} else {
tracing::warn!("Could not invoke dynamic callbacks because they are already running on this thread");
}
}
}
@ -2447,3 +2494,25 @@ where
}
}
}
#[test]
fn map_cycle_is_finite() {
crate::initialize_tracing();
let a = Dynamic::new(0_usize);
// This callback updates a each time a is updated with a + 1, causing an
// infinite cycle if not broken by Gooey.
a.for_each_cloned({
let a = a.clone();
move |current| {
a.set(current + 1);
}
})
.persist();
// Gooey will invoke the callback for the first set call, but the set call
// within the callback will not cause the callback to be invoked again.
// Thus, we expect setting the value to 1 to result in `a` containing 2.
a.set(1);
assert_eq!(a.get(), 2);
}