Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add peek to AsyncQueue #557

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions chronos/asyncsync.nim
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,12 @@ proc popLastImpl[T](aq: AsyncQueue[T]): T =
aq.putters.wakeupNext()
res

proc peekFirstImpl[T](aq: AsyncQueue[T]): T =
aq.queue.peekFirst()

proc peekLastImpl[T](aq: AsyncQueue[T]): T =
aq.queue.peekLast()

proc addFirstNoWait*[T](aq: AsyncQueue[T], item: T) {.
raises: [AsyncQueueFullError].} =
## Put an item ``item`` to the beginning of the queue ``aq`` immediately.
Expand Down Expand Up @@ -293,6 +299,26 @@ proc popLastNoWait*[T](aq: AsyncQueue[T]): T {.
raise newException(AsyncQueueEmptyError, "AsyncQueue is empty!")
aq.popLastImpl()

proc peekFirstNoWait*[T](aq: AsyncQueue[T]): T {.
raises: [AsyncQueueEmptyError].} =
## Get an item from the beginning of the queue ``aq`` immediately but without
## removing it.
##
## If queue ``aq`` is empty, then ``AsyncQueueEmptyError`` exception raised.
if aq.empty():
raise newException(AsyncQueueEmptyError, "AsyncQueue is empty!")
aq.peekFirstImpl()

proc peekLastNoWait*[T](aq: AsyncQueue[T]): T {.
raises: [AsyncQueueEmptyError].} =
## Get an item from the end of the queue ``aq`` immediately but without
## removing it.
##
## If queue ``aq`` is empty, then ``AsyncQueueEmptyError`` exception raised.
if aq.empty():
raise newException(AsyncQueueEmptyError, "AsyncQueue is empty!")
aq.peekLastImpl()

proc addFirst*[T](aq: AsyncQueue[T], item: T) {.
async: (raises: [CancelledError]).} =
## Put an ``item`` to the beginning of the queue ``aq``. If the queue is full,
Expand Down Expand Up @@ -357,6 +383,42 @@ proc popLast*[T](aq: AsyncQueue[T]): Future[T] {.
raise exc
aq.popLastImpl()

proc peekFirst*[T](aq: AsyncQueue[T]): Future[T] {.
async: (raises: [CancelledError]).} =
## Return an ``item`` without removing it from the beginning of the queue
## ``aq``. If the queue is empty, wait until an item is available.
while aq.empty():
let getter =
Future[void].Raising([CancelledError]).init("AsyncQueue.peekFirst")
aq.getters.add(getter)
try:
await getter
except CancelledError as exc:
if not(aq.empty()) and not(getter.cancelled()):
aq.getters.wakeupNext()
raise exc
let res = aq.peekFirstImpl()
aq.getters.wakeupNext()
res

proc peekLast*[T](aq: AsyncQueue[T]): Future[T] {.
async: (raises: [CancelledError]).} =
## Return an ``item`` without removing it from the end of the queue ``aq``.
## If the queue is empty, wait until an item is available.
while aq.empty():
let getter =
Future[void].Raising([CancelledError]).init("AsyncQueue.peekLast")
aq.getters.add(getter)
try:
await getter
except CancelledError as exc:
if not(aq.empty()) and not(getter.cancelled()):
aq.getters.wakeupNext()
raise exc
let res = aq.peekLastImpl()
aq.getters.wakeupNext()
res

proc putNoWait*[T](aq: AsyncQueue[T], item: T) {.
raises: [AsyncQueueFullError].} =
## Alias of ``addLastNoWait()``.
Expand All @@ -367,6 +429,11 @@ proc getNoWait*[T](aq: AsyncQueue[T]): T {.
## Alias of ``popFirstNoWait()``.
aq.popFirstNoWait()

proc peekNoWait*[T](aq: AsyncQueue[T]): T {.
raises: [AsyncQueueEmptyError].} =
## Alias of ``peekFirstNoWait()``.
aq.peekFirstNoWait()

proc put*[T](aq: AsyncQueue[T], item: T): Future[void] {.
async: (raw: true, raises: [CancelledError]).} =
## Alias of ``addLast()``.
Expand All @@ -377,6 +444,11 @@ proc get*[T](aq: AsyncQueue[T]): Future[T] {.
## Alias of ``popFirst()``.
aq.popFirst()

proc peek*[T](aq: AsyncQueue[T]): Future[T] {.
async: (raw: true, raises: [CancelledError]).} =
## Alias of ``peekFirst()``.
aq.peekFirst()

proc clear*[T](aq: AsyncQueue[T]) {.inline.} =
## Clears all elements of queue ``aq``.
aq.queue.clear()
Expand Down
51 changes: 51 additions & 0 deletions tests/testsync.nim
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,57 @@ suite "Asynchronous sync primitives test suite":
test "AsyncQueue() contains test":
check test9() == true

test "AsyncQueue() peek test":
let q = newAsyncQueue[int]()
q.putNoWait(1)
q.putNoWait(2)

check:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there needs to be a test here that ensures that the following also works:

let a = q.peekFirst()
let b = q.popFirst()

both in that order and in the reverse order - where peek-after-pop should probably not trigger

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added more tests as suggested.

q.peekNoWait() == 1
q.peekFirstNoWait() == 1
q.peekLastNoWait() == 2
(waitFor q.peek()) == 1
(waitFor q.peekFirst()) == 1
(waitFor q.peekLast()) == 2

test "AsyncQueue() peek before pop test":
let q = newAsyncQueue[int]()
q.putNoWait(1)

let
a = q.peekFirst()
b = q.popFirst()

check:
a.completed() == true
b.completed() == true
a.read() == 1
b.read() == 1
q.len() == 0

test "AsyncQueue() peek after pop test":
let q = newAsyncQueue[int]()
q.putNoWait(1)

let
a = q.popFirst()
b = q.peekFirst()

check:
a.completed() == true
b.completed() == false
a.read() == 1
q.len() == 0

q.putNoWait(2)
poll()

check:
a.completed() == true
b.completed() == true
b.read() == 2
q.len() == 1

test "AsyncEventQueue() behavior test":
let eventQueue = newAsyncEventQueue[int]()
let key = eventQueue.register()
Expand Down