-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathqueue.py
73 lines (55 loc) · 2.34 KB
/
queue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# queue.py: adapted from uasyncio V2
# Copyright (c) 2018-2020 Peter Hinch
# Released under the MIT License (MIT) - see LICENSE file
# Code is based on Paul Sokolovsky's work.
# This is a temporary solution until uasyncio V3 gets an efficient official version
import asyncio
class QueueEmpty(Exception):
    """Raised by Queue.get_nowait() when the queue holds no items."""
class QueueFull(Exception):
    """Raised by Queue.put_nowait() when the queue is already full."""
class Queue:
    """Simple FIFO queue shared between cooperating asyncio tasks.

    Items are held in a plain list. ``maxsize`` bounds the number of
    stored items; zero (the default) or a negative value means the queue
    is never considered full.
    """

    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._queue = []
        # Pulsed (set, then immediately cleared) by every put; get() waits on it.
        self._evput = asyncio.Event()
        # Pulsed by every get; put() waits on it.
        self._evget = asyncio.Event()

    def _get(self):
        """Pulse the "get" event, then remove and return the oldest item."""
        got = self._evget
        got.set()  # Wake tasks blocked in put() waiting for free space
        got.clear()  # Later waiters must block until the next get
        return self._queue.pop(0)

    async def get(self):
        """Wait until an item is available, then remove and return it.

        Usage: ``item = await queue.get()``
        """
        while True:
            if not self.empty():
                return self._get()
            # Several tasks may be woken by one put: the first to run takes
            # the item, the others find the queue empty and wait again.
            await self._evput.wait()

    def get_nowait(self):
        """Remove and return an item without waiting.

        Raises QueueEmpty if no item is immediately available.
        """
        if not self.empty():
            return self._get()
        raise QueueEmpty()

    def _put(self, val):
        """Pulse the "put" event, then append ``val`` to the queue."""
        added = self._evput
        added.set()  # Wake tasks blocked in get() waiting for an item
        added.clear()  # Later waiters must block until the next put
        self._queue.append(val)

    async def put(self, val):
        """Wait until the queue has room, then append ``val``.

        Usage: ``await queue.put(item)``
        """
        while True:
            if not self.full():
                self._put(val)
                return
            # Queue is full: wait for a get, then re-check, since another
            # waiting producer may have taken the freed slot first.
            await self._evget.wait()

    def put_nowait(self, val):
        """Append ``val`` without waiting.

        Raises QueueFull if the queue already holds ``maxsize`` items.
        """
        if not self.full():
            self._put(val)
            return
        raise QueueFull()

    def qsize(self):
        """Return the number of items currently in the queue."""
        return len(self._queue)

    def empty(self):
        """Return True if the queue holds no items, False otherwise."""
        return not self._queue

    def full(self):
        """Return True if the queue holds at least ``maxsize`` items.

        Always False when ``maxsize`` is zero (the default) or negative.
        """
        limit = self.maxsize
        if limit > 0:
            return self.qsize() >= limit
        return False