-
Notifications
You must be signed in to change notification settings - Fork 52
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
AsyncSemaphore and AsyncPriorityQueue. #147
Open
cheatfate
wants to merge
3
commits into
master
Choose a base branch
from
semaphore
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,7 +9,7 @@ | |
# MIT license (LICENSE-MIT) | ||
|
||
## This module implements some core synchronization primitives | ||
import std/[sequtils, deques] | ||
import std/[sequtils, deques, heapqueue] | ||
import ./asyncloop | ||
|
||
type | ||
|
@@ -51,10 +51,37 @@ type | |
queue: Deque[T] | ||
maxsize: int | ||
|
||
AsyncSemaphore* = ref object of RootRef | ||
## A semaphore manages an internal counter which is decremented by each | ||
## ``acquire()`` call and incremented by each ``release()`` call. The | ||
## counter can never go below zero; when ``acquire()`` finds that it is | ||
## zero, it blocks, waiting until some other task calls ``release()``. | ||
## | ||
## The ``size`` argument gives the initial value for the internal | ||
## counter; it defaults to ``1``. If the value given is less than 0, | ||
## ``AssertionError`` is raised. | ||
counter: int | ||
waiters: seq[Future[void]] | ||
maxcounter: int | ||
|
||
AsyncPriorityQueue*[T] = ref object of RootRef | ||
## A priority queue, useful for coordinating producer and consumer | ||
## coroutines, but with priority in mind. Entries with lowest priority will | ||
## be obtained first. | ||
## | ||
## If ``maxsize`` is less than or equal to zero, the queue size is | ||
## infinite. If it is an integer greater than ``0``, then "await put()" | ||
## will block when the queue reaches ``maxsize``, until an item is | ||
## removed by "await get()". | ||
getters: seq[Future[void]] | ||
putters: seq[Future[void]] | ||
queue: HeapQueue[T] | ||
maxsize: int | ||
|
||
AsyncQueueEmptyError* = object of CatchableError | ||
## ``AsyncQueue`` is empty. | ||
## ``AsyncQueue`` or ``AsyncPriorityQueue`` is empty. | ||
AsyncQueueFullError* = object of CatchableError | ||
## ``AsyncQueue`` is full. | ||
## ``AsyncQueue`` or ``AsyncPriorityQueue`` is full. | ||
AsyncLockError* = object of CatchableError | ||
## ``AsyncLock`` is either locked or unlocked. | ||
|
||
|
@@ -180,6 +207,55 @@ proc isSet*(event: AsyncEvent): bool = | |
## Return `true` if and only if the internal flag of ``event`` is `true`. | ||
event.flag | ||
|
||
proc wakeupNext(waiters: var seq[Future[void]]) {.inline.} = | ||
var i = 0 | ||
while i < len(waiters): | ||
var waiter = waiters[i] | ||
inc(i) | ||
|
||
if not(waiter.finished()): | ||
waiter.complete() | ||
break | ||
|
||
if i > 0: | ||
waiters.delete(0, i - 1) | ||
|
||
proc newAsyncSemaphore*(value: int = 1): AsyncSemaphore = | ||
## Creates a new asynchronous bounded semaphore ``AsyncSemaphore`` with | ||
## internal counter set to ``value``. | ||
doAssert(value >= 0, "AsyncSemaphore initial value must be bigger or equal 0") | ||
discard getThreadDispatcher() | ||
AsyncSemaphore(waiters: newSeq[Future[void]](), counter: value, | ||
maxcounter: value) | ||
|
||
proc locked*(asem: AsyncSemaphore): bool = | ||
## Returns ``true`` if semaphore can not be acquired immediately | ||
(asem.counter == 0) | ||
|
||
proc acquire*(asem: AsyncSemaphore) {.async.} = | ||
## Acquire a semaphore. | ||
## | ||
## If the internal counter is larger than zero on entry, decrement it by one | ||
## and return immediately. If its zero on entry, block and wait until some | ||
## other task has called ``release()`` to make it larger than 0. | ||
while asem.counter <= 0: | ||
let waiter = newFuture[void]("AsyncSemaphore.acquire") | ||
asem.waiters.add(waiter) | ||
try: | ||
await waiter | ||
except CatchableError as exc: | ||
if asem.counter > 0 and not(waiter.cancelled()): | ||
asem.waiters.wakeupNext() | ||
raise exc | ||
dec(asem.counter) | ||
|
||
proc release*(asem: AsyncSemaphore) = | ||
## Release a semaphore, incrementing internal counter by one. | ||
if asem.counter >= asem.maxcounter: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
raiseAssert("AsyncSemaphore released too many times") | ||
inc(asem.counter) | ||
asem.waiters.wakeupNext() | ||
|
||
proc newAsyncQueue*[T](maxsize: int = 0): AsyncQueue[T] = | ||
## Creates a new asynchronous queue ``AsyncQueue``. | ||
|
||
|
@@ -193,20 +269,23 @@ proc newAsyncQueue*[T](maxsize: int = 0): AsyncQueue[T] = | |
maxsize: maxsize | ||
) | ||
|
||
proc wakeupNext(waiters: var seq[Future[void]]) {.inline.} = | ||
var i = 0 | ||
while i < len(waiters): | ||
var waiter = waiters[i] | ||
inc(i) | ||
|
||
if not(waiter.finished()): | ||
waiter.complete() | ||
break | ||
proc newAsyncPriorityQueue*[T](maxsize: int = 0): AsyncPriorityQueue[T] = | ||
## Creates new asynchronous priority queue ``AsyncPriorityQueue``. | ||
## | ||
## To use a ``AsyncPriorityQueue` with a custom object, the ``<`` operator | ||
## must be implemented. | ||
|
||
if i > 0: | ||
waiters.delete(0, i - 1) | ||
# Workaround for callSoon() not worked correctly before | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
# getThreadDispatcher() call. | ||
discard getThreadDispatcher() | ||
AsyncPriorityQueue[T]( | ||
getters: newSeq[Future[void]](), | ||
putters: newSeq[Future[void]](), | ||
queue: initHeapQueue[T](), | ||
maxsize: maxsize | ||
) | ||
|
||
proc full*[T](aq: AsyncQueue[T]): bool {.inline.} = | ||
proc full*[T](aq: AsyncQueue[T] | AsyncPriorityQueue[T]): bool {.inline.} = | ||
## Return ``true`` if there are ``maxsize`` items in the queue. | ||
## | ||
## Note: If the ``aq`` was initialized with ``maxsize = 0`` (default), | ||
|
@@ -216,7 +295,7 @@ proc full*[T](aq: AsyncQueue[T]): bool {.inline.} = | |
else: | ||
(len(aq.queue) >= aq.maxsize) | ||
|
||
proc empty*[T](aq: AsyncQueue[T]): bool {.inline.} = | ||
proc empty*[T](aq: AsyncQueue[T] | AsyncPriorityQueue[T]): bool {.inline.} = | ||
## Return ``true`` if the queue is empty, ``false`` otherwise. | ||
(len(aq.queue) == 0) | ||
|
||
|
@@ -330,24 +409,76 @@ proc get*[T](aq: AsyncQueue[T]): Future[T] {.inline.} = | |
## Alias of ``popFirst()``. | ||
aq.popFirst() | ||
|
||
proc clear*[T](aq: AsyncQueue[T]) {.inline.} = | ||
proc pushNoWait*[T](aq: AsyncPriorityQueue[T], item: T) {.inline.} = | ||
## Push ``item`` onto the queue ``aq`` immediately, maintaining the heap | ||
## invariant. | ||
## | ||
## If queue ``aq`` is full, then ``AsyncQueueFullError`` exception raised. | ||
if aq.full(): | ||
raise newException(AsyncQueueFullError, "AsyncPriorityQueue is full!") | ||
aq.queue.push(item) | ||
aq.getters.wakeupNext() | ||
|
||
proc popNoWait*[T](aq: AsyncPriorityQueue[T]): T {.inline.} = | ||
## Pop and return the item with lowest priority from queue ``aq``, | ||
## maintaining the heap invariant. | ||
## | ||
## If queue ``aq`` is empty, then ``AsyncQueueEmptyError`` exception raised. | ||
if aq.empty(): | ||
raise newException(AsyncQueueEmptyError, "AsyncPriorityQueue is empty!") | ||
let res = aq.queue.pop() | ||
aq.putters.wakeupNext() | ||
res | ||
|
||
proc push*[T](aq: AsyncPriorityQueue[T], item: T) {.async.} = | ||
## Push ``item`` onto the queue ``aq``. If the queue is full, wait until a | ||
## free slot is available. | ||
while aq.full(): | ||
var putter = newFuture[void]("AsyncPriorityQueue.push") | ||
aq.putters.add(putter) | ||
try: | ||
await putter | ||
except CatchableError as exc: | ||
if not(aq.full()) and not(putter.cancelled()): | ||
aq.putters.wakeupNext() | ||
raise exc | ||
aq.pushNoWait(item) | ||
|
||
proc pop*[T](aq: AsyncPriorityQueue[T]): Future[T] {.async.} = | ||
## Remove and return an ``item`` with lowest priority from the queue ``aq``. | ||
## If the queue is empty, wait until an item is available. | ||
while aq.empty(): | ||
var getter = newFuture[void]("AsyncPriorityQueue.pop") | ||
aq.getters.add(getter) | ||
try: | ||
await getter | ||
except CatchableError as exc: | ||
if not(aq.empty()) and not(getter.cancelled()): | ||
aq.getters.wakeupNext() | ||
raise exc | ||
return aq.popNoWait() | ||
|
||
proc clear*[T](aq: AsyncQueue[T]) {. | ||
inline, deprecated: "Procedure clear() can lead to hangs!".} = | ||
## Clears all elements of queue ``aq``. | ||
aq.queue.clear() | ||
|
||
proc len*[T](aq: AsyncQueue[T]): int {.inline.} = | ||
proc len*[T](aq: AsyncQueue[T] | AsyncPriorityQueue[T]): int {.inline.} = | ||
## Return the number of elements in ``aq``. | ||
len(aq.queue) | ||
|
||
proc size*[T](aq: AsyncQueue[T]): int {.inline.} = | ||
proc size*[T](aq: AsyncQueue[T] | AsyncPriorityQueue[T]): int {.inline.} = | ||
## Return the maximum number of elements in ``aq``. | ||
len(aq.maxsize) | ||
|
||
proc `[]`*[T](aq: AsyncQueue[T], i: Natural) : T {.inline.} = | ||
proc `[]`*[T](aq: AsyncQueue[T] | AsyncPriorityQueue[T], | ||
i: Natural) : T {.inline.} = | ||
## Access the i-th element of ``aq`` by order from first to last. | ||
## ``aq[0]`` is the first element, ``aq[^1]`` is the last element. | ||
aq.queue[i] | ||
|
||
proc `[]`*[T](aq: AsyncQueue[T], i: BackwardsIndex) : T {.inline.} = | ||
proc `[]`*[T](aq: AsyncQueue[T] | AsyncPriorityQueue[T], | ||
i: BackwardsIndex) : T {.inline.} = | ||
## Access the i-th element of ``aq`` by order from first to last. | ||
## ``aq[0]`` is the first element, ``aq[^1]`` is the last element. | ||
aq.queue[len(aq.queue) - int(i)] | ||
|
@@ -390,3 +521,12 @@ proc `$`*[T](aq: AsyncQueue[T]): string = | |
res.addQuoted(item) | ||
res.add("]") | ||
res | ||
|
||
proc `$`*[T](aq: AsyncPriorityQueue[T]): string = | ||
## Turn on AsyncPriorityQueue ``aq`` into its string representation. | ||
var res = "[" | ||
for i in 0 ..< len(aq.queue): | ||
if len(res) > 1: res.add(", ") | ||
res.addQuoted(aq.queue[i]) | ||
res.add("]") | ||
res |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the loop should not be needed here - there will be exactly one queue item per count below zero so they should line up perfectly