Skip to content

Commit

Permalink
esp-idf-svc#56
Browse files Browse the repository at this point in the history
  • Loading branch information
ivmarkov committed Dec 30, 2023
1 parent 0e2957f commit 26731ac
Showing 1 changed file with 30 additions and 34 deletions.
64 changes: 30 additions & 34 deletions src/utils/asyncify/event_bus.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use core::fmt::Debug;
use core::future::Future;
use core::future::poll_fn;
use core::marker::PhantomData;
use core::pin::Pin;
use core::task::{Context, Poll, Waker};
use core::time::Duration;

Expand Down Expand Up @@ -107,56 +106,53 @@ where
P: Clone + Send,
{
pub async fn recv(&self) -> Result<P, E> {
NextFuture(self).await
let _defer = defer(|| self.cleanup());

poll_fn(|ctx| self.poll(ctx)).await
}
}

struct NextFuture<'a, CV, P, S, E>(&'a AsyncSubscription<CV, P, S, E>)
where
CV: RawCondvar + Send + Sync,
CV::RawMutex: Send + Sync,
P: Clone + Send,
S: Send;
pub async fn recv_mut(&mut self) -> Result<P, E> {
let _defer = {
let state = self.state.clone();
defer(move || state.0.lock().waker = None)
};

impl<'a, CV, P, S, E> Drop for NextFuture<'a, CV, P, S, E>
where
CV: RawCondvar + Send + Sync,
CV::RawMutex: Send + Sync,
P: Clone + Send,
S: Send,
{
fn drop(&mut self) {
let mut state = self.0.state.0.lock();
state.waker = None;
poll_fn(move |ctx| self.poll(ctx)).await
}
}

impl<'a, CV, P, S, E> Future for NextFuture<'a, CV, P, S, E>
where
CV: RawCondvar + Send + Sync,
CV::RawMutex: Send + Sync,
P: Clone + Send,
S: Send,
{
type Output = Result<P, E>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut state = self.0.state.0.lock();
fn poll(&self, cx: &mut Context<'_>) -> Poll<Result<P, E>> {
let mut state = self.state.0.lock();

let value = state.value.take();

if let Some(value) = value {
self.0.state.1.notify_all();
self.state.1.notify_all();

Poll::Ready(Ok(value))
} else {
state.waker = Some(cx.waker().clone());

self.0.state.1.notify_all();
self.state.1.notify_all();

Poll::Pending
}
}

fn cleanup(&self) {
self.state.0.lock().waker = None;
}
}

fn defer<F: FnOnce()>(f: F) -> impl Drop {
struct Defer<F: FnOnce()>(Option<F>);

impl<F: FnOnce()> Drop for Defer<F> {
fn drop(&mut self) {
self.0.take().map(|f| f());
}
}

Defer(Some(f))
}

pub struct AsyncEventBus<U, CV, E> {
Expand Down

0 comments on commit 26731ac

Please sign in to comment.