-
Notifications
You must be signed in to change notification settings - Fork 97
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
'await connection.close()' returns once connection thread has also forwarded _STOP_RUNNING_SENTINEL #305
base: main
Are you sure you want to change the base?
'await connection.close()' returns once connection thread has also forwarded _STOP_RUNNING_SENTINEL #305
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -72,10 +72,15 @@ def __init__( | |
DeprecationWarning, | ||
) | ||
|
||
def _stop_running(self): | ||
async def _stop_running(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
self._running = False | ||
# PEP 661 is not accepted yet, so we cannot type a sentinel | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
self._tx.put_nowait(_STOP_RUNNING_SENTINEL) # type: ignore[arg-type] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
function = partial(lambda: _STOP_RUNNING_SENTINEL) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
future = asyncio.get_event_loop().create_future() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
self._tx.put_nowait((future, function)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
return await future | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. return await future |
||
|
||
@property | ||
def _conn(self) -> sqlite3.Connection: | ||
|
@@ -105,16 +110,16 @@ def run(self) -> None: | |
# futures) | ||
|
||
tx_item = self._tx.get() | ||
if tx_item is _STOP_RUNNING_SENTINEL: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
break | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
future, function = tx_item | ||
|
||
try: | ||
LOG.debug("executing %s", function) | ||
result = function() | ||
LOG.debug("operation %s completed", function) | ||
future.get_loop().call_soon_threadsafe(set_result, future, result) | ||
|
||
if result is _STOP_RUNNING_SENTINEL: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
break | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
except BaseException as e: # noqa B036 | ||
LOG.debug("returning exception %s", e) | ||
future.get_loop().call_soon_threadsafe(set_exception, future, e) | ||
|
@@ -139,7 +144,7 @@ async def _connect(self) -> "Connection": | |
self._tx.put_nowait((future, self._connector)) | ||
self._connection = await future | ||
except Exception: | ||
self._stop_running() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
await self._stop_running() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
self._connection = None | ||
raise | ||
|
||
|
@@ -180,7 +185,7 @@ async def close(self) -> None: | |
LOG.info("exception occurred while closing connection") | ||
raise | ||
finally: | ||
self._stop_running() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
await self._stop_running() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
self._connection = None | ||
|
||
@contextmanager | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def _stop_running(self):
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All review points are the same type: verbatim copy of a line change.
If you are not some kind of AI, would you mind expanding with clarifying comments ?