Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AsyncSemaphore and AsyncPriorityQueue. #147

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions chronos/asyncloop.nim
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ when defined(windows) or defined(nimdoc):
result.trackers = initTable[string, TrackerBase]()
initAPI(result)

var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
var gDisp{.threadvar.}: PDispatcher ## Thread's dispatcher

proc setThreadDispatcher*(disp: PDispatcher) {.gcsafe, raises: [Defect].}
proc getThreadDispatcher*(): PDispatcher {.gcsafe, raises: [Defect].}
Expand Down Expand Up @@ -515,7 +515,7 @@ elif unixPlatform:
result.trackers = initTable[string, TrackerBase]()
initAPI(result)

var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
var gDisp{.threadvar.}: PDispatcher ## Thread's dispatcher

proc setThreadDispatcher*(disp: PDispatcher) {.gcsafe, raises: [Defect].}
proc getThreadDispatcher*(): PDispatcher {.gcsafe, raises: [Defect].}
Expand Down Expand Up @@ -1009,7 +1009,7 @@ proc wait*[T](fut: Future[T], timeout = -1): Future[T] {.
include asyncmacro2

proc runForever*() =
## Begins a never ending global dispatcher poll loop.
## Begins a never ending thread's dispatcher poll loop.
while true:
poll()

Expand Down
182 changes: 161 additions & 21 deletions chronos/asyncsync.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
# MIT license (LICENSE-MIT)

## This module implements some core synchronization primitives
import std/[sequtils, deques]
import std/[sequtils, deques, heapqueue]
import ./asyncloop

type
Expand Down Expand Up @@ -51,10 +51,37 @@ type
queue: Deque[T]
maxsize: int

AsyncSemaphore* = ref object of RootRef
## A semaphore manages an internal counter which is decremented by each
## ``acquire()`` call and incremented by each ``release()`` call. The
## counter can never go below zero; when ``acquire()`` finds that it is
## zero, it blocks, waiting until some other task calls ``release()``.
##
## The ``size`` argument gives the initial value for the internal
## counter; it defaults to ``1``. If the value given is less than 0,
## ``AssertionError`` is raised.
counter: int
waiters: seq[Future[void]]
maxcounter: int

AsyncPriorityQueue*[T] = ref object of RootRef
## A priority queue, useful for coordinating producer and consumer
## coroutines, but with priority in mind. Entries with lowest priority will
## be obtained first.
##
## If ``maxsize`` is less than or equal to zero, the queue size is
## infinite. If it is an integer greater than ``0``, then "await put()"
## will block when the queue reaches ``maxsize``, until an item is
## removed by "await get()".
getters: seq[Future[void]]
putters: seq[Future[void]]
queue: HeapQueue[T]
maxsize: int

AsyncQueueEmptyError* = object of CatchableError
## ``AsyncQueue`` is empty.
## ``AsyncQueue`` or ``AsyncPriorityQueue`` is empty.
AsyncQueueFullError* = object of CatchableError
## ``AsyncQueue`` is full.
## ``AsyncQueue`` or ``AsyncPriorityQueue`` is full.
AsyncLockError* = object of CatchableError
## ``AsyncLock`` is either locked or unlocked.

Expand Down Expand Up @@ -180,6 +207,55 @@ proc isSet*(event: AsyncEvent): bool =
## Return `true` if and only if the internal flag of ``event`` is `true`.
event.flag

proc wakeupNext(waiters: var seq[Future[void]]) {.inline.} =
var i = 0
while i < len(waiters):
var waiter = waiters[i]
inc(i)

if not(waiter.finished()):
waiter.complete()
break

if i > 0:
waiters.delete(0, i - 1)

proc newAsyncSemaphore*(value: int = 1): AsyncSemaphore =
## Creates a new asynchronous bounded semaphore ``AsyncSemaphore`` with
## internal counter set to ``value``.
doAssert(value >= 0, "AsyncSemaphore initial value must be bigger or equal 0")
discard getThreadDispatcher()
AsyncSemaphore(waiters: newSeq[Future[void]](), counter: value,
maxcounter: value)

proc locked*(asem: AsyncSemaphore): bool =
## Returns ``true`` if semaphore can not be acquired immediately
(asem.counter == 0)

proc acquire*(asem: AsyncSemaphore) {.async.} =
## Acquire a semaphore.
##
## If the internal counter is larger than zero on entry, decrement it by one
## and return immediately. If its zero on entry, block and wait until some
## other task has called ``release()`` to make it larger than 0.
while asem.counter <= 0:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the loop should not be needed here - there will be exactly one queue item per count below zero so they should line up perfectly

let waiter = newFuture[void]("AsyncSemaphore.acquire")
asem.waiters.add(waiter)
try:
await waiter
except CatchableError as exc:
if asem.counter > 0 and not(waiter.cancelled()):
asem.waiters.wakeupNext()
raise exc
dec(asem.counter)

proc release*(asem: AsyncSemaphore) =
## Release a semaphore, incrementing internal counter by one.
if asem.counter >= asem.maxcounter:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doAssert?

raiseAssert("AsyncSemaphore released too many times")
inc(asem.counter)
asem.waiters.wakeupNext()

proc newAsyncQueue*[T](maxsize: int = 0): AsyncQueue[T] =
## Creates a new asynchronous queue ``AsyncQueue``.

Expand All @@ -193,20 +269,23 @@ proc newAsyncQueue*[T](maxsize: int = 0): AsyncQueue[T] =
maxsize: maxsize
)

proc wakeupNext(waiters: var seq[Future[void]]) {.inline.} =
var i = 0
while i < len(waiters):
var waiter = waiters[i]
inc(i)

if not(waiter.finished()):
waiter.complete()
break
proc newAsyncPriorityQueue*[T](maxsize: int = 0): AsyncPriorityQueue[T] =
## Creates new asynchronous priority queue ``AsyncPriorityQueue``.
##
## To use a ``AsyncPriorityQueue` with a custom object, the ``<`` operator
## must be implemented.

if i > 0:
waiters.delete(0, i - 1)
# Workaround for callSoon() not worked correctly before
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

callSoon calls getThreadDispatcher, why should this be needed here?

# getThreadDispatcher() call.
discard getThreadDispatcher()
AsyncPriorityQueue[T](
getters: newSeq[Future[void]](),
putters: newSeq[Future[void]](),
queue: initHeapQueue[T](),
maxsize: maxsize
)

proc full*[T](aq: AsyncQueue[T]): bool {.inline.} =
proc full*[T](aq: AsyncQueue[T] | AsyncPriorityQueue[T]): bool {.inline.} =
## Return ``true`` if there are ``maxsize`` items in the queue.
##
## Note: If the ``aq`` was initialized with ``maxsize = 0`` (default),
Expand All @@ -216,7 +295,7 @@ proc full*[T](aq: AsyncQueue[T]): bool {.inline.} =
else:
(len(aq.queue) >= aq.maxsize)

proc empty*[T](aq: AsyncQueue[T]): bool {.inline.} =
proc empty*[T](aq: AsyncQueue[T] | AsyncPriorityQueue[T]): bool {.inline.} =
## Return ``true`` if the queue is empty, ``false`` otherwise.
(len(aq.queue) == 0)

Expand Down Expand Up @@ -330,24 +409,76 @@ proc get*[T](aq: AsyncQueue[T]): Future[T] {.inline.} =
## Alias of ``popFirst()``.
aq.popFirst()

proc clear*[T](aq: AsyncQueue[T]) {.inline.} =
proc pushNoWait*[T](aq: AsyncPriorityQueue[T], item: T) {.inline.} =
## Push ``item`` onto the queue ``aq`` immediately, maintaining the heap
## invariant.
##
## If queue ``aq`` is full, then ``AsyncQueueFullError`` exception raised.
if aq.full():
raise newException(AsyncQueueFullError, "AsyncPriorityQueue is full!")
aq.queue.push(item)
aq.getters.wakeupNext()

proc popNoWait*[T](aq: AsyncPriorityQueue[T]): T {.inline.} =
## Pop and return the item with lowest priority from queue ``aq``,
## maintaining the heap invariant.
##
## If queue ``aq`` is empty, then ``AsyncQueueEmptyError`` exception raised.
if aq.empty():
raise newException(AsyncQueueEmptyError, "AsyncPriorityQueue is empty!")
let res = aq.queue.pop()
aq.putters.wakeupNext()
res

proc push*[T](aq: AsyncPriorityQueue[T], item: T) {.async.} =
## Push ``item`` onto the queue ``aq``. If the queue is full, wait until a
## free slot is available.
while aq.full():
var putter = newFuture[void]("AsyncPriorityQueue.push")
aq.putters.add(putter)
try:
await putter
except CatchableError as exc:
if not(aq.full()) and not(putter.cancelled()):
aq.putters.wakeupNext()
raise exc
aq.pushNoWait(item)

proc pop*[T](aq: AsyncPriorityQueue[T]): Future[T] {.async.} =
## Remove and return an ``item`` with lowest priority from the queue ``aq``.
## If the queue is empty, wait until an item is available.
while aq.empty():
var getter = newFuture[void]("AsyncPriorityQueue.pop")
aq.getters.add(getter)
try:
await getter
except CatchableError as exc:
if not(aq.empty()) and not(getter.cancelled()):
aq.getters.wakeupNext()
raise exc
return aq.popNoWait()

proc clear*[T](aq: AsyncQueue[T]) {.
inline, deprecated: "Procedure clear() can lead to hangs!".} =
## Clears all elements of queue ``aq``.
aq.queue.clear()

proc len*[T](aq: AsyncQueue[T]): int {.inline.} =
proc len*[T](aq: AsyncQueue[T] | AsyncPriorityQueue[T]): int {.inline.} =
## Return the number of elements in ``aq``.
len(aq.queue)

proc size*[T](aq: AsyncQueue[T]): int {.inline.} =
proc size*[T](aq: AsyncQueue[T] | AsyncPriorityQueue[T]): int {.inline.} =
## Return the maximum number of elements in ``aq``.
len(aq.maxsize)

proc `[]`*[T](aq: AsyncQueue[T], i: Natural) : T {.inline.} =
proc `[]`*[T](aq: AsyncQueue[T] | AsyncPriorityQueue[T],
i: Natural) : T {.inline.} =
## Access the i-th element of ``aq`` by order from first to last.
## ``aq[0]`` is the first element, ``aq[^1]`` is the last element.
aq.queue[i]

proc `[]`*[T](aq: AsyncQueue[T], i: BackwardsIndex) : T {.inline.} =
proc `[]`*[T](aq: AsyncQueue[T] | AsyncPriorityQueue[T],
i: BackwardsIndex) : T {.inline.} =
## Access the i-th element of ``aq`` by order from first to last.
## ``aq[0]`` is the first element, ``aq[^1]`` is the last element.
aq.queue[len(aq.queue) - int(i)]
Expand Down Expand Up @@ -390,3 +521,12 @@ proc `$`*[T](aq: AsyncQueue[T]): string =
res.addQuoted(item)
res.add("]")
res

proc `$`*[T](aq: AsyncPriorityQueue[T]): string =
## Turn on AsyncPriorityQueue ``aq`` into its string representation.
var res = "["
for i in 0 ..< len(aq.queue):
if len(res) > 1: res.add(", ")
res.addQuoted(aq.queue[i])
res.add("]")
res
Loading