Moved frame mapping to its own thread

This ensures frames render consistently.

This change only affects the animation recorder. The regular virtual
recorder interface blocks when refresh is called.
This commit is contained in:
Jonathan Johnson 2024-01-03 20:15:00 -08:00
parent bc52be440f
commit 36b80e8f34
No known key found for this signature in database
GPG key ID: A66D6A34D6620579
2 changed files with 184 additions and 103 deletions

View file

@ -10,7 +10,6 @@ fn ui() -> impl MakeWidget {
}
fn main() {
// The default recorder generated solid, rgb images.
let mut recorder = ui()
.build_recorder()
.size(Size::new(320, 240))
@ -32,7 +31,5 @@ fn main() {
.animate_cursor_to(initial_point, Duration::from_millis(250), EaseInOutSine)
.unwrap();
animation.wait_for(Duration::from_millis(500)).unwrap();
animation
.write_to("examples/offscreen-animated.png")
.unwrap();
animation.write_to("examples/offscreen-apng.png").unwrap();
}

View file

@ -4,12 +4,13 @@ use std::cell::RefCell;
use std::collections::hash_map;
use std::ffi::OsStr;
use std::hash::Hash;
use std::io;
use std::marker::PhantomData;
use std::num::TryFromIntError;
use std::ops::{Deref, DerefMut, Not};
use std::path::Path;
use std::string::ToString;
use std::sync::{Arc, Condvar, Mutex, MutexGuard, OnceLock};
use std::sync::{mpsc, Arc, Mutex, MutexGuard, OnceLock};
use std::time::{Duration, Instant};
use ahash::AHashMap;
@ -2793,18 +2794,75 @@ where
}
struct Capture {
bytes: u64,
bytes_per_row: u32,
buffer: wgpu::Buffer,
texture: Texture,
multisample: Texture,
}
impl Capture {
fn map_into<Format>(
&self,
buffer: &mut Vec<u8>,
device: &wgpu::Device,
queue: &wgpu::Queue,
) -> Result<(), wgpu::BufferAsyncError>
where
Format: CaptureFormat,
{
let mut encoder = device.create_command_encoder(&wgpu::CommandEncoderDescriptor::default());
self.texture.copy_to_buffer(
wgpu::ImageCopyBuffer {
buffer: &self.buffer,
layout: wgpu::ImageDataLayout {
offset: 0,
bytes_per_row: Some(self.bytes_per_row),
rows_per_image: None,
},
},
&mut encoder,
);
queue.submit([encoder.finish()]);
let map_result = Arc::new(Mutex::new(None));
let slice = self.buffer.slice(0..self.bytes);
slice.map_async(wgpu::MapMode::Read, {
let map_result = map_result.clone();
move |result| {
*map_result.lock().assert("thread panicked") = Some(result);
}
});
buffer.clear();
buffer.reserve(self.bytes.cast());
loop {
device.poll(wgpu::Maintain::Poll);
let mut result = map_result.lock().assert("thread panicked");
if let Some(result) = result.take() {
result?;
break;
}
}
buffer.extend_from_slice(&slice.get_mapped_range());
self.buffer.unmap();
Format::convert_rgba(buffer, self.texture.size().width.get(), self.bytes_per_row);
Ok(())
}
}
/// A recorder of a [`VirtualWindow`].
pub struct VirtualRecorder<Format = Rgb8> {
/// The virtual window being recorded.
pub window: VirtualWindow,
device: wgpu::Device,
queue: wgpu::Queue,
capture: Option<Capture>,
device: Arc<wgpu::Device>,
queue: Arc<wgpu::Queue>,
capture: Option<Box<Capture>>,
data: Vec<u8>,
cursor: Dynamic<Point<Px>>,
cursor_graphic: Drawing,
@ -2848,16 +2906,18 @@ where
&queue,
);
Ok(Self {
let mut recorder = Self {
window,
device,
queue,
device: Arc::new(device),
queue: Arc::new(queue),
cursor: Dynamic::default(),
cursor_graphic: Drawing::default(),
capture: None,
data: Vec::new(),
format: PhantomData,
})
};
recorder.refresh()?;
Ok(recorder)
}
/// Returns the tightly-packed captured bytes.
@ -2867,7 +2927,7 @@ where
&self.data
}
fn recreate_buffers_if_needed(&mut self, size: Size<UPx>, bytes: u64) {
fn recreate_buffers_if_needed(&mut self, size: Size<UPx>, bytes: u64, bytes_per_row: u32) {
if self
.capture
.as_ref()
@ -2896,20 +2956,21 @@ where
usage: wgpu::BufferUsages::COPY_DST | wgpu::BufferUsages::MAP_READ,
mapped_at_creation: false,
});
self.capture = Some(Capture {
self.capture = Some(Box::new(Capture {
bytes,
bytes_per_row,
buffer,
texture,
multisample,
});
}));
}
}
/// Redraws the contents.
pub fn refresh(&mut self) -> Result<(), wgpu::BufferAsyncError> {
fn redraw(&mut self) {
let render_size = self.window.kludgine.size().ceil();
let bytes_per_row = copy_buffer_aligned_bytes_per_row(render_size.width.get() * 4);
let size = u64::from(bytes_per_row) * u64::from(render_size.height.get());
self.recreate_buffers_if_needed(render_size, size);
self.recreate_buffers_if_needed(render_size, size, bytes_per_row);
let capture = self.capture.as_ref().assert("always initialized above");
@ -2942,69 +3003,15 @@ where
&self.queue,
Some(&self.cursor_graphic),
);
}
let mut encoder = self
.device
.create_command_encoder(&wgpu::CommandEncoderDescriptor::default());
capture.texture.copy_to_buffer(
wgpu::ImageCopyBuffer {
buffer: &capture.buffer,
layout: wgpu::ImageDataLayout {
offset: 0,
bytes_per_row: Some(bytes_per_row),
rows_per_image: None,
},
},
&mut encoder,
);
self.queue.submit([encoder.finish()]);
/// Redraws the contents.
pub fn refresh(&mut self) -> Result<(), wgpu::BufferAsyncError> {
self.redraw();
let map_result = Arc::new(Mutex::new(None));
let condvar = Arc::new(Condvar::new());
let slice = capture.buffer.slice(0..size);
let capture = self.capture.as_ref().assert("always initialized above");
std::thread::scope(|scope| {
scope.spawn({
let map_result = map_result.clone();
let condvar = condvar.clone();
move || {
slice.map_async(wgpu::MapMode::Read, {
move |result| {
*map_result.lock().assert("thread panicked") = Some(result);
condvar.notify_one();
}
});
}
});
// Now that we've queued up the data mapping thread, let's make sure
// our vec is allocated. Since an allocation can take a moment, this
// is the perfect to do it.
self.data.clear();
self.data.reserve(size.cast());
// Wait for the buffer to have been mapped.
loop {
self.device.poll(wgpu::Maintain::Poll);
let mut result = map_result.lock().assert("thread panicked");
if let Some(result) = result.take() {
result?;
break;
}
let _guard = condvar
.wait_timeout(result, Duration::from_millis(1))
.assert("thread panicked");
}
Ok(())
})?;
self.data.extend_from_slice(&slice.get_mapped_range());
capture.buffer.unmap();
Format::convert_rgba(&mut self.data, render_size.width.get(), bytes_per_row);
capture.map_into::<Format>(&mut self.data, &self.device, &self.queue)?;
Ok(())
}
@ -3017,9 +3024,9 @@ where
/// Begins recording an animated png.
pub fn record_animated_png(&mut self, target_fps: u8) -> AnimationRecorder<'_, Format> {
AnimationRecorder {
recorder: self,
target_fps,
frames: Vec::new(),
assembler: FrameAssembler::spawn::<Format>(self.device.clone(), self.queue.clone()),
recorder: self,
}
}
}
@ -3033,7 +3040,7 @@ fn copy_buffer_aligned_bytes_per_row(width: u32) -> u32 {
pub struct AnimationRecorder<'a, Format> {
recorder: &'a mut VirtualRecorder<Format>,
target_fps: u8,
frames: Vec<Frame>,
assembler: FrameAssembler,
}
impl<Format> AnimationRecorder<'_, Format>
@ -3081,19 +3088,17 @@ where
};
if final_frame || next_frame == now {
// Try to reuse an existing capture instead of forcing an
// allocation.
if let Ok(capture) = self.assembler.resuable_captures.try_recv() {
self.recorder.capture = Some(capture);
}
let elapsed = now.saturating_duration_since(last_frame);
last_frame = now;
self.recorder.refresh()?;
match self.frames.last_mut() {
Some(frame) if frame.data == self.recorder.bytes() => {
frame.duration += elapsed;
}
_ => {
self.frames.push(Frame {
data: self.recorder.bytes().to_vec(),
duration: elapsed,
});
}
self.recorder.redraw();
let capture = self.recorder.capture.take().assert("always present");
if self.assembler.sender.send((capture, elapsed)).is_err() {
break;
}
}
@ -3109,7 +3114,8 @@ where
}
/// Encodes the currently recorded frames into a new file at `path`.
pub fn write_to(&self, path: impl AsRef<Path>) -> Result<(), png::EncodingError> {
pub fn write_to(self, path: impl AsRef<Path>) -> Result<(), VirtualRecorderError> {
let frames = self.assembler.finish()?;
let mut file = std::fs::OpenOptions::new()
.create(true)
.truncate(true)
@ -3122,17 +3128,10 @@ where
);
encoder.set_color(png::ColorType::Rgb);
encoder.set_adaptive_filter(png::AdaptiveFilterType::Adaptive);
encoder.set_animated(
u32::try_from(self.frames.len()).assert("too many frames"),
0,
)?;
encoder.set_animated(u32::try_from(frames.len()).assert("too many frames"), 0)?;
encoder.set_compression(png::Compression::Best);
let mut current_frame_delay = self
.frames
.first()
.assert("always at least one frame")
.duration;
let mut current_frame_delay = frames.first().assert("always at least one frame").duration;
encoder.set_frame_delay(
current_frame_delay
.as_millis()
@ -3141,7 +3140,7 @@ where
1_000,
)?;
let mut writer = encoder.write_header()?;
for frame in &self.frames {
for frame in &frames {
if current_frame_delay != frame.duration {
current_frame_delay = frame.duration;
writer.set_frame_delay(
@ -3181,6 +3180,14 @@ pub enum VirtualRecorderError {
TooLarge,
/// An error occurred trying to read a buffer.
MapBuffer(wgpu::BufferAsyncError),
/// An error occurred encoding a png image.
PngEncode(png::EncodingError),
}
impl From<png::EncodingError> for VirtualRecorderError {
fn from(value: png::EncodingError) -> Self {
Self::PngEncode(value)
}
}
impl From<wgpu::RequestDeviceError> for VirtualRecorderError {
@ -3201,6 +3208,12 @@ impl From<TryFromIntError> for VirtualRecorderError {
}
}
impl From<io::Error> for VirtualRecorderError {
fn from(value: io::Error) -> Self {
Self::PngEncode(value.into())
}
}
/// A unique identifier of an input device.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
pub enum DeviceId {
@ -3215,3 +3228,74 @@ impl From<winit::event::DeviceId> for DeviceId {
Self::Winit(value)
}
}
struct FrameAssembler {
sender: mpsc::SyncSender<(Box<Capture>, Duration)>,
result: mpsc::Receiver<Result<Vec<Frame>, VirtualRecorderError>>,
resuable_captures: mpsc::Receiver<Box<Capture>>,
}
impl FrameAssembler {
fn spawn<Format>(device: Arc<wgpu::Device>, queue: Arc<wgpu::Queue>) -> Self
where
Format: CaptureFormat,
{
let (frame_sender, frame_receiver) = mpsc::sync_channel(1000);
let (finished_frame_sender, finished_frame_receiver) = mpsc::sync_channel(600);
let (result_sender, result_receiver) = mpsc::sync_channel(1);
std::thread::spawn(move || {
Self::assembler_thread::<Format>(
&frame_receiver,
&result_sender,
&finished_frame_sender,
&device,
&queue,
);
});
Self {
sender: frame_sender,
result: result_receiver,
resuable_captures: finished_frame_receiver,
}
}
fn finish(self) -> Result<Vec<Frame>, VirtualRecorderError> {
drop(self.sender);
self.result.recv().assert("thread panicked")
}
fn assembler_thread<Format>(
frames: &mpsc::Receiver<(Box<Capture>, Duration)>,
result: &mpsc::SyncSender<Result<Vec<Frame>, VirtualRecorderError>>,
reusable: &mpsc::SyncSender<Box<Capture>>,
device: &wgpu::Device,
queue: &wgpu::Queue,
) where
Format: CaptureFormat,
{
let mut assembled = Vec::<Frame>::new();
let mut buffer = Vec::new();
while let Ok((capture, elapsed)) = frames.recv() {
if let Err(err) = capture.map_into::<Format>(&mut buffer, device, queue) {
let _result = result.send(Err(err.into()));
return;
}
match assembled.last_mut() {
Some(frame) if frame.data == buffer => {
frame.duration += elapsed;
}
_ => {
assembled.push(Frame {
data: std::mem::take(&mut buffer),
duration: elapsed,
});
}
}
let _result = reusable.try_send(capture);
}
let _result = result.send(Ok(assembled));
}
}