Refactored Dymamics to be weak + drop bug fixes

After adding weak_clone, I realized that all map functions should use
weak references anyways. In the process of implementing this, I ran
tests a lot and found other edge cases where I hadn't properly reasoned
about Drop behavior.

While some of the state cleanup is a bit overkill at the moment, this is
in anticipation of wanting a WeakDynamicReader-like type.
This commit is contained in:
Jonathan Johnson 2024-01-01 12:35:21 -08:00
parent e4dfd9320d
commit a9a41a1582
No known key found for this signature in database
GPG key ID: A66D6A34D6620579
5 changed files with 90 additions and 56 deletions

View file

@ -31,12 +31,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- A potential edge case where a `DynamicReader` would not return after being
disconnected has been removed.
- [#120][120]: Dividing a `ZeroToOne` now properly checks for `NaN` and `0.`.
- Removed a possible deadlock when using `DynamicReader::block_until_updated`.
- Removed an edge case ensuring `Waker`s are signaled for `DynamicReader`s that
are waiting for value when the last `Dynamic` is dropped.
### Changed
- `WidgetCacheKey` now includes the `KludgineId` of the context it was created
from. This ensures if a `WidgetInstance` moves or is shared between windows,
the cache is invalidated.
- All `Dynamic` mapping functions now utilize weak references, and clean up as
necessary if a value is not able to be upgraded.
### Added
@ -57,10 +62,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `Window::titled` allows setting a window's title, and can be provided a
string-type or a `Dynamic<String>` to allow updating the title while the
window is open.
- `Dynamic::weak_clone` returns a new dynamic that is updated from the original
dynamic, but is careful to not add any strong references to the source
`Dynamic`. This allows breaking dynamic graphs into independent "sections"
that can be deallocated independently of other graphs it is connected with.
- `DynamicReader::on_disconnect` allows attaching a callback that is invoked
once the final source `Dynamic` is dropped.
- `Dynamic::instances()` returns the number of clones the dynamic has in

4
Cargo.lock generated
View file

@ -2208,9 +2208,9 @@ checksum = "d66dc143e6b11c1eddc06d5c423cfc97062865baf299914ab64caa38182078fe"
[[package]]
name = "similar"
version = "2.3.0"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2aeaf503862c419d66959f5d7ca015337d864e9c49485d771b732e2a20453597"
checksum = "32fea41aca09ee824cc9724996433064c89f7777e60762749a4170a14abbfa21"
[[package]]
name = "siphasher"

View file

@ -1,4 +1,5 @@
[workspace]
members = ["cushy-macros"]
[package]
name = "cushy"

View file

@ -53,7 +53,7 @@ impl DebugContext {
section.values.lock().push(Box::new(RegisteredValue {
label: label.into(),
value: reader.clone(),
widget: make_observer(value.weak_clone()).make_widget(),
widget: make_observer(value.clone()).make_widget(),
}))
});
let this = self.clone();

View file

@ -261,28 +261,6 @@ impl<T> Dynamic<T> {
self.map_each(|value| U::from(value))
}
/// Returns a new dynamic that contains a weak clone of `self`'s contents.
///
/// The returned `Dynamic` does not use any strong references, ensuring the
/// returned dynamic does not extend the lifetime of `self`.
#[must_use]
pub fn weak_clone(&self) -> Self
where
T: Clone + Send + 'static,
{
let weak_source = self.downgrade();
let weak_out = Dynamic::new(self.get());
weak_out.set_source(self.0.for_each({
let weak_out = weak_out.clone();
move || {
if let Some(source) = weak_source.upgrade() {
*weak_out.lock() = source.get();
}
}
}));
weak_out
}
/// Attaches `for_each` to this value so that it is invoked each time the
/// value's contents are updated.
pub fn for_each<F>(&self, mut for_each: F) -> CallbackHandle
@ -290,9 +268,11 @@ impl<T> Dynamic<T> {
T: Send + 'static,
F: for<'a> FnMut(&'a T) + Send + 'static,
{
let this = self.clone();
let this = self.downgrade();
self.0.for_each(move || {
let this = this.upgrade().ok_or(CallbackDisconnected)?;
this.map_ref(&mut for_each);
Ok(())
})
}
@ -303,9 +283,11 @@ impl<T> Dynamic<T> {
T: Send + 'static,
F: for<'a> FnMut(&'a GenerationalValue<T>) + Send + 'static,
{
let this = self.clone();
let this = self.downgrade();
self.0.for_each(move || {
let this = this.upgrade().ok_or(CallbackDisconnected)?;
this.map_generational(&mut for_each);
Ok(())
})
}
@ -316,9 +298,11 @@ impl<T> Dynamic<T> {
T: Clone + Send + 'static,
F: FnMut(T) + Send + 'static,
{
let this = self.clone();
let this = self.downgrade();
self.0.for_each(move || {
let this = this.upgrade().ok_or(CallbackDisconnected)?;
for_each(this.get());
Ok(())
})
}
@ -342,8 +326,11 @@ impl<T> Dynamic<T> {
F: for<'a> FnMut(&'a T) -> R + Send + 'static,
R: PartialEq + Send + 'static,
{
let this = self.clone();
self.0.map_each(move || this.map_ref(&mut map))
let this = self.downgrade();
self.0.map_each(move || {
let this = this.upgrade().ok_or(CallbackDisconnected)?;
Ok(this.map_ref(&mut map))
})
}
/// Creates a new dynamic value that contains the result of invoking `map`
@ -354,8 +341,11 @@ impl<T> Dynamic<T> {
F: for<'a> FnMut(&'a GenerationalValue<T>) -> R + Send + 'static,
R: PartialEq + Send + 'static,
{
let this = self.clone();
self.0.map_each(move || this.map_generational(&mut map))
let this = self.downgrade();
self.0.map_each(move || {
let this = this.upgrade().ok_or(CallbackDisconnected)?;
Ok(this.map_generational(&mut map))
})
}
/// Creates a new dynamic value that contains the result of invoking `map`
@ -366,8 +356,11 @@ impl<T> Dynamic<T> {
F: FnMut(T) -> R + Send + 'static,
R: PartialEq + Send + 'static,
{
let this = self.clone();
self.0.map_each(move || map(this.get()))
let this = self.downgrade();
self.0.map_each(move || {
let this = this.upgrade().ok_or(CallbackDisconnected)?;
Ok(map(this.get()))
})
}
/// A helper function that invokes `with_clone` with a clone of self. This
@ -811,12 +804,9 @@ impl<T> Drop for Dynamic<T> {
// `Dynamic`.
if let Ok(mut state) = self.state() {
if Arc::strong_count(&self.0) == state.readers + 1 {
let on_disconnect = state.on_disconnect.take();
let cleanup = state.cleanup();
drop(state);
for on_disconnect in on_disconnect.into_iter().flatten() {
on_disconnect.invoke(());
}
drop(cleanup);
self.0.sync.notify_all();
}
@ -962,7 +952,7 @@ impl<T> DynamicData<T> {
pub fn for_each<F>(&self, map: F) -> CallbackHandle
where
F: for<'a> FnMut() + Send + 'static,
F: for<'a> FnMut() -> Result<(), CallbackDisconnected> + Send + 'static,
{
let state = self.state().expect("deadlocked");
let mut data = state.callbacks.callbacks.lock().ignore_poison();
@ -974,15 +964,16 @@ impl<T> DynamicData<T> {
pub fn map_each<R, F>(&self, mut map: F) -> Dynamic<R>
where
F: for<'a> FnMut() -> R + Send + 'static,
F: for<'a> FnMut() -> Result<R, CallbackDisconnected> + Send + 'static,
R: PartialEq + Send + 'static,
{
let initial_value = map();
let initial_value = map().expect("initial map returned Disconnected");
let mapped_value = Dynamic::new(initial_value);
let returned = mapped_value.clone();
let callback = self.for_each(move || {
mapped_value.set(map());
mapped_value.set(map()?);
Ok(())
});
returned.set_source(callback);
@ -991,6 +982,10 @@ impl<T> DynamicData<T> {
}
}
/// A callback function is no longer connected to its source.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct CallbackDisconnected;
struct DebugDynamicData<'a, T>(&'a Arc<DynamicData<T>>);
impl<T> Debug for DebugDynamicData<'_, T>
@ -1215,6 +1210,39 @@ impl<T> State<T> {
.field("generation", &self.wrapped.generation.0)
.finish()
}
#[must_use]
fn cleanup(&mut self) -> StateCleanup {
StateCleanup {
on_disconnect: self.on_disconnect.take(),
wakers: std::mem::take(&mut self.wakers),
}
}
}
impl<T> Drop for State<T> {
fn drop(&mut self) {
// Ensure any disconnections that didn't fire due to deadlocking still
// are invoked.
drop(self.cleanup());
}
}
struct StateCleanup {
on_disconnect: Option<Vec<OnceCallback>>,
wakers: Vec<Waker>,
}
impl Drop for StateCleanup {
fn drop(&mut self) {
for on_disconnect in self.on_disconnect.take().into_iter().flatten() {
on_disconnect.invoke(());
}
for waker in self.wakers.drain(..) {
waker.wake();
}
}
}
impl<T> Debug for State<T>
@ -1275,9 +1303,11 @@ impl Drop for ChangeCallbacks {
// thread wrote, we can skip the callbacks.
if state.invoked_at < self.changed_at {
state.invoked_at = Instant::now();
for callback in &mut state.callbacks {
callback.changed();
}
// Invoke all callbacks, removing those that report an
// error.
state
.callbacks
.drain_filter(|callback| callback.changed().is_err());
}
drop(state);
@ -1309,15 +1339,15 @@ impl Drop for ChangeCallbacks {
}
trait ValueCallback: Send {
fn changed(&mut self);
fn changed(&mut self) -> Result<(), CallbackDisconnected>;
}
impl<F> ValueCallback for F
where
F: for<'a> FnMut() + Send + 'static,
F: for<'a> FnMut() -> Result<(), CallbackDisconnected> + Send + 'static,
{
fn changed(&mut self) {
self();
fn changed(&mut self) -> Result<(), CallbackDisconnected> {
self()
}
}
@ -2144,7 +2174,7 @@ macro_rules! impl_tuple_for_each {
}
};
($self:ident $for_each:ident $handles:ident [] [$type:ident $field:tt $var:ident]) => {
$handles += $self.$field.for_each(move |field: &$type| $for_each((field,)));
$handles += $self.$field.for_each(move |field| $for_each((field,)));
};
($self:ident $for_each:ident $handles:ident [] [$($type:ident $field:tt $var:ident),+]) => {
let $for_each = Arc::new(Mutex::new($for_each));
@ -2221,7 +2251,9 @@ impl_all_tuples!(impl_tuple_for_each);
/// function.
pub trait MapEach<T, U> {
/// The borrowed representation of `T` passed into the mapping function.
type Ref<'a>;
type Ref<'a>
where
T: 'a;
/// Apply `map_each` to each value in `self`, storing the result in the
/// returned dynamic.