Skip to content

Commit

Permalink
Add tests for callbacks.
Browse files Browse the repository at this point in the history
  • Loading branch information
wmedrano committed Sep 14, 2024
1 parent 5663356 commit 92a3403
Show file tree
Hide file tree
Showing 8 changed files with 264 additions and 49 deletions.
4 changes: 2 additions & 2 deletions .config/nextest.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[profile.default]
test-threads = 1
fail-fast = false
slow-timeout = { period = "5s", terminate-after = 4 }
retries = { backoff = "fixed", count = 2, delay = "100ms" }
slow-timeout = { period = "2s", terminate-after = 2 }
retries = { backoff = "fixed", count = 3, delay = "1s" }
19 changes: 9 additions & 10 deletions src/client/async_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ where
client,
notification: notification_handler,
process: process_handler,
is_valid: AtomicBool::new(true),
is_valid_for_callback: AtomicBool::new(true),
has_panic: AtomicBool::new(false),
});
CallbackContext::register_callbacks(&mut callback_context)?;
let res = j::jack_activate(callback_context.client.raw());
Expand Down Expand Up @@ -109,23 +110,21 @@ impl<N, P> AsyncClient<N, P> {
return Err(Error::ClientIsNoLongerAlive);
}
let cb = self.callback.take().ok_or(Error::ClientIsNoLongerAlive)?;
let client = cb.client.raw();
let client_ptr = cb.client.raw();

// deactivate
if j::jack_deactivate(client) != 0 {
if j::jack_deactivate(client_ptr) != 0 {
return Err(Error::ClientDeactivationError);
}

// clear the callbacks
clear_callbacks(client)?;
clear_callbacks(client_ptr)?;
// done, take ownership of callback
if cb.is_valid.load(std::sync::atomic::Ordering::Relaxed) {
Ok(cb)
} else {
std::mem::forget(cb.notification);
std::mem::forget(cb.process);
Err(Error::ClientIsNoLongerAlive)
if cb.has_panic.load(std::sync::atomic::Ordering::Relaxed) {
std::mem::forget(cb);
return Err(Error::ClientPanicked);
}
Ok(cb)
}
}

Expand Down
56 changes: 30 additions & 26 deletions src/client/callbacks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ where
ctx.notification.thread_init(&ctx.client);
});
if let Err(err) = res {
CallbackContext::<N, P>::from_raw(data).map(CallbackContext::mark_invalid);
CallbackContext::<N, P>::from_raw(data).map(|ctx| ctx.mark_invalid(true));
eprintln!("{err:?}");
std::mem::forget(err);
}
Expand All @@ -160,7 +160,7 @@ unsafe extern "C" fn shutdown<N, P>(
);
});
if let Err(err) = res {
CallbackContext::<N, P>::from_raw(data).map(CallbackContext::mark_invalid);
CallbackContext::<N, P>::from_raw(data).map(|ctx| ctx.mark_invalid(true));
eprintln!("{err:?}");
std::mem::forget(err);
}
Expand All @@ -178,14 +178,15 @@ where
let scope = ProcessScope::from_raw(n_frames, ctx.client.raw());
let c = ctx.process.process(&ctx.client, &scope);
if c == Control::Quit {
ctx.mark_invalid();
ctx.mark_invalid(false);
}
c
});
match res {
Ok(res) => res.to_ffi(),
Err(err) => {
CallbackContext::<N, P>::from_raw(data).map(CallbackContext::mark_invalid);
eprintln!("harhar");
CallbackContext::<N, P>::from_raw(data).map(|ctx| ctx.mark_invalid(true));
eprintln!("{err:?}");
std::mem::forget(err);
Control::Quit.to_ffi()
Expand All @@ -212,15 +213,15 @@ where
&*(pos as *mut crate::TransportPosition),
);
if !is_ready {
ctx.mark_invalid();
ctx.mark_invalid(false);
}
is_ready
});
match res {
Ok(true) => 1,
Ok(false) => 0,
Err(err) => {
CallbackContext::<N, P>::from_raw(data).map(CallbackContext::mark_invalid);
CallbackContext::<N, P>::from_raw(data).map(|ctx| ctx.mark_invalid(true));
eprintln!("{err:?}");
std::mem::forget(err);
0
Expand All @@ -241,7 +242,7 @@ where
ctx.notification.freewheel(&ctx.client, is_starting);
});
if let Err(err) = res {
CallbackContext::<N, P>::from_raw(data).map(CallbackContext::mark_invalid);
CallbackContext::<N, P>::from_raw(data).map(|ctx| ctx.mark_invalid(true));
eprintln!("{err:?}");
std::mem::forget(err);
}
Expand All @@ -258,14 +259,14 @@ where
};
let c = ctx.process.buffer_size(&ctx.client, n_frames);
if c == Control::Quit {
ctx.mark_invalid();
ctx.mark_invalid(false);
}
c
});
match res {
Ok(c) => c.to_ffi(),
Err(err) => {
CallbackContext::<N, P>::from_raw(data).map(CallbackContext::mark_invalid);
CallbackContext::<N, P>::from_raw(data).map(|ctx| ctx.mark_invalid(true));
eprintln!("{err:?}");
std::mem::forget(err);
Control::Quit.to_ffi()
Expand All @@ -284,14 +285,14 @@ where
};
let c = ctx.notification.sample_rate(&ctx.client, n_frames);
if c == Control::Quit {
ctx.mark_invalid();
ctx.mark_invalid(false);
}
c
});
match res {
Ok(c) => c.to_ffi(),
Err(err) => {
CallbackContext::<N, P>::from_raw(data).map(CallbackContext::mark_invalid);
CallbackContext::<N, P>::from_raw(data).map(|ctx| ctx.mark_invalid(true));
eprintln!("{err:?}");
std::mem::forget(err);
Control::Quit.to_ffi()
Expand All @@ -317,7 +318,7 @@ unsafe extern "C" fn client_registration<N, P>(
.client_registration(&ctx.client, name, register);
});
if let Err(err) = res {
CallbackContext::<N, P>::from_raw(data).map(CallbackContext::mark_invalid);
CallbackContext::<N, P>::from_raw(data).map(|ctx| ctx.mark_invalid(true));
eprintln!("{err:?}");
std::mem::forget(err);
}
Expand All @@ -340,7 +341,7 @@ unsafe extern "C" fn port_registration<N, P>(
.port_registration(&ctx.client, port_id, register);
});
if let Err(err) = res {
CallbackContext::<N, P>::from_raw(data).map(CallbackContext::mark_invalid);
CallbackContext::<N, P>::from_raw(data).map(|ctx| ctx.mark_invalid(true));
eprintln!("{err:?}");
std::mem::forget(err);
}
Expand All @@ -367,14 +368,14 @@ where
.notification
.port_rename(&ctx.client, port_id, old_name, new_name);
if c == Control::Quit {
ctx.mark_invalid();
ctx.mark_invalid(false);
}
c
});
match res {
Ok(c) => c.to_ffi(),
Err(err) => {
CallbackContext::<N, P>::from_raw(data).map(CallbackContext::mark_invalid);
CallbackContext::<N, P>::from_raw(data).map(|ctx| ctx.mark_invalid(true));
eprintln!("{err:?}");
std::mem::forget(err);
Control::Quit.to_ffi()
Expand All @@ -400,7 +401,7 @@ unsafe extern "C" fn port_connect<N, P>(
.ports_connected(&ctx.client, port_id_a, port_id_b, are_connected);
});
if let Err(err) = res {
CallbackContext::<N, P>::from_raw(data).map(CallbackContext::mark_invalid);
CallbackContext::<N, P>::from_raw(data).map(|ctx| ctx.mark_invalid(true));
eprintln!("{err:?}");
std::mem::forget(err);
}
Expand All @@ -417,14 +418,14 @@ where
};
let c = ctx.notification.graph_reorder(&ctx.client);
if c == Control::Quit {
ctx.mark_invalid();
ctx.mark_invalid(false);
}
c
});
match res {
Ok(c) => c.to_ffi(),
Err(err) => {
CallbackContext::<N, P>::from_raw(data).map(CallbackContext::mark_invalid);
CallbackContext::<N, P>::from_raw(data).map(|ctx| ctx.mark_invalid(true));
eprintln!("{err:?}");
std::mem::forget(err);
Control::Quit.to_ffi()
Expand All @@ -443,14 +444,14 @@ where
};
let c = ctx.notification.xrun(&ctx.client);
if c == Control::Quit {
ctx.mark_invalid();
ctx.mark_invalid(false);
}
c
});
match res {
Ok(c) => c.to_ffi(),
Err(err) => {
CallbackContext::<N, P>::from_raw(data).map(CallbackContext::mark_invalid);
CallbackContext::<N, P>::from_raw(data).map(|ctx| ctx.mark_invalid(true));
eprintln!("{err:?}");
std::mem::forget(err);
Control::Quit.to_ffi()
Expand Down Expand Up @@ -487,10 +488,12 @@ pub struct CallbackContext<N, P> {
pub notification: N,
/// The handler for processing.
pub process: P,
/// True if the callback is valid.
/// True if the callback is valid for callbacks.
///
/// This becomes false after a panic.
pub is_valid: AtomicBool,
/// This becomes false after quit an event that causes processing to quit.
pub is_valid_for_callback: AtomicBool,
/// True if the callback has panicked.
pub has_panic: AtomicBool,
}

impl<N, P> CallbackContext<N, P>
Expand All @@ -502,7 +505,7 @@ where
debug_assert!(!ptr.is_null());
let obj_ptr = ptr as *mut CallbackContext<N, P>;
let obj_ref = &mut *obj_ptr;
if obj_ref.is_valid.load(Ordering::Relaxed) {
if obj_ref.is_valid_for_callback.load(Ordering::Relaxed) {
Some(obj_ref)
} else {
None
Expand All @@ -513,8 +516,9 @@ where
///
/// This usually happens after a panic.
#[cold]
pub fn mark_invalid(&mut self) {
self.is_valid.store(true, Ordering::Relaxed);
pub fn mark_invalid(&mut self, did_panic: bool) {
self.is_valid_for_callback.store(true, Ordering::Relaxed);
self.has_panic.store(did_panic, Ordering::Relaxed);
}

fn raw(b: &mut Box<Self>) -> *mut libc::c_void {
Expand Down
35 changes: 28 additions & 7 deletions src/client/client_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::sync::Arc;
use std::{ffi, fmt, ptr};

use crate::client::common::{sleep_on_test, CREATE_OR_DESTROY_CLIENT_MUTEX};
use crate::jack_enums::CodeOrMessage;
use crate::jack_utils::collect_strs;
use crate::properties::PropertyChangeHandler;
use crate::transport::Transport;
Expand Down Expand Up @@ -496,16 +497,14 @@ impl Client {
/// 4. Both ports must be owned by active clients.
///
/// # Panics
/// Panics if it is not possible to convert `source_port` or
/// `destination_port` to a `CString`.
/// Panics if it is not possible to convert `source_port` or `destination_port` to a `CString`.
pub fn connect_ports_by_name(
&self,
source_port: &str,
destination_port: &str,
) -> Result<(), Error> {
let source_cstr = ffi::CString::new(source_port).unwrap();
let destination_cstr = ffi::CString::new(destination_port).unwrap();

let res =
unsafe { j::jack_connect(self.raw(), source_cstr.as_ptr(), destination_cstr.as_ptr()) };
match res {
Expand All @@ -514,10 +513,32 @@ impl Client {
source_port.to_string(),
destination_port.to_string(),
)),
_ => Err(Error::PortConnectionError(
source_port.to_string(),
destination_port.to_string(),
)),
code => {
let code_or_message = if self
.port_by_name(&source_port)
.map(|p| p.flags().contains(PortFlags::IS_INPUT))
.unwrap_or(false)
{
CodeOrMessage::Message(
"source port does not produce a signal, it is not an input port",
)
} else if self
.port_by_name(&destination_port)
.map(|p| p.flags().contains(PortFlags::IS_OUTPUT))
.unwrap_or(false)
{
CodeOrMessage::Message(
"destination port cannot be written to, it is not an output port",
)
} else {
CodeOrMessage::Code(code)
};
Err(Error::PortConnectionError {
source: source_port.to_string(),
destination: destination_port.to_string(),
code_or_message,
})
}
}
}

Expand Down
Loading

0 comments on commit 92a3403

Please sign in to comment.