Skip to content

Commit

Permalink
--wip-- [skip ci]
Browse files Browse the repository at this point in the history
  • Loading branch information
loyd committed Jun 4, 2024
1 parent 43b0b4e commit b6d107a
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 13 deletions.
1 change: 1 addition & 0 deletions elfo-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ thread_local = { version = "1.1.3", optional = true }
unicycle = "0.9.3"
rmp-serde = { version = "1.1.0", optional = true }
humantime-serde = "1"
padded-semaphore = { path = "../../padded-semaphore" }

[dev-dependencies]
elfo-utils = { version = "0.2.5", path = "../elfo-utils", features = ["test-util"] }
Expand Down
2 changes: 1 addition & 1 deletion elfo-core/src/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl Envelope {

let header = EnvelopeHeader {
link: <_>::default(),
created_time: Instant::now(),
created_time: metrics::try_recorder().map_or(Instant::ZERO, |_| Instant::now()),
trace_id,
kind,
message_offset,
Expand Down
22 changes: 10 additions & 12 deletions elfo-core/src/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ use cordyceps::{
mpsc_queue::{Links, MpscQueue},
Linked,
};
use padded_semaphore::{RawSemaphore, TryAcquireError};
use parking_lot::Mutex;
use tokio::sync::{Notify, Semaphore, TryAcquireError};
use tokio::sync::Notify;

use elfo_utils::CachePadded;

Expand Down Expand Up @@ -49,7 +50,7 @@ const LIMIT: usize = 100_000;

pub(crate) struct Mailbox {
queue: MpscQueue<EnvelopeHeader>,
tx_semaphore: Semaphore,
tx_semaphore: RawSemaphore,
rx_notify: CachePadded<Notify>,
closed_trace_id: Mutex<Option<TraceId>>,
}
Expand All @@ -58,28 +59,25 @@ impl Mailbox {
pub(crate) fn new() -> Self {
Self {
queue: MpscQueue::new_with_stub(Envelope::stub()),
tx_semaphore: Semaphore::new(LIMIT),
tx_semaphore: RawSemaphore::new(LIMIT),
rx_notify: CachePadded(Notify::new()),
closed_trace_id: Mutex::new(None),
}
}

pub(crate) async fn send(&self, envelope: Envelope) -> Result<(), SendError<Envelope>> {
let permit = match self.tx_semaphore.acquire().await {
Ok(permit) => permit,
Err(_) => return Err(SendError(envelope)),
};
if self.tx_semaphore.acquire().await.is_err() {
return Err(SendError(envelope));
}

permit.forget();
self.queue.enqueue(envelope);
self.rx_notify.notify_one();
Ok(())
}

pub(crate) fn try_send(&self, envelope: Envelope) -> Result<(), TrySendError<Envelope>> {
match self.tx_semaphore.try_acquire() {
Ok(permit) => {
permit.forget();
Ok(()) => {
self.queue.enqueue(envelope);
self.rx_notify.notify_one();
Ok(())
Expand All @@ -93,7 +91,7 @@ impl Mailbox {
loop {
if let Some(envelope) = self.queue.dequeue() {
// TODO: try_dequeue?
self.tx_semaphore.add_permits(1);
self.tx_semaphore.release();
return RecvResult::Data(envelope);
}

Expand All @@ -108,7 +106,7 @@ impl Mailbox {
pub(crate) fn try_recv(&self) -> Option<RecvResult> {
match self.queue.dequeue() {
Some(envelope) => {
self.tx_semaphore.add_permits(1);
self.tx_semaphore.release();
Some(RecvResult::Data(envelope))
}
None if self.tx_semaphore.is_closed() => Some(self.on_close()),
Expand Down
2 changes: 2 additions & 0 deletions elfo-utils/src/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use quanta::Clock;
pub struct Instant(u64); // TODO: make it `NonZeroU64`?

impl Instant {
pub const ZERO: Self = Self(0);

/// Returns the current time.
#[inline]
pub fn now() -> Self {
Expand Down

0 comments on commit b6d107a

Please sign in to comment.