We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
https://github.com/dmannion1972/axis-websocket-metadata-python/blob/main/getsesssionid.py
https://github.com/dmannion1972/axis-websocket-metadata-python
encode/httpx#304 https://gist.github.com/tomchristie/3293d5b118b5646ce79cc074976744b0
import trio async def main(): async with ws_connect("ws://127.0.0.1:8765") as websockets: await websockets.send("Hello, world.") message = await websockets.recv() print(message) trio.run(main)
import base64 import contextlib import os import httpx import wsproto class ConnectionClosed(Exception): pass class WebsocketConnection: def __init__(self, network_steam): self._ws_connection_state = wsproto.Connection(wsproto.ConnectionType.CLIENT) self._network_stream = network_steam self._events = [] async def send(self, text): """ Send a text frame over the websocket connection. """ event = wsproto.events.TextMessage(text) data = self._ws_connection_state.send(event) await self._network_stream.write(data) async def recv(self): """ Receive the next text frame from the websocket connection. """ while not self._events: data = await self._network_stream.read(max_bytes=4096) self._ws_connection_state.receive_data(data) self._events = list(self._ws_connection_state.events()) event = self._events.pop(0) if isinstance(event, wsproto.events.TextMessage): return event.data elif isinstance(event, wsproto.events.CloseConnection): raise ConnectionClosed() @contextlib.asynccontextmanager async def ws_connect(url): headers = { "connection": "upgrade", "upgrade": "websocket", "sec-websocket-key": base64.b64encode(os.urandom(16)), "sec-websocket-version": "13", } async with httpx.AsyncClient() as client: async with client.stream("GET", url, headers=headers) as response: network_steam = response.extensions["network_stream"] yield WebsocketConnection(network_steam)
The text was updated successfully, but these errors were encountered:
https://github.com/frankie567/httpx-ws/tree/main
Sorry, something went wrong.
No branches or pull requests
https://github.com/dmannion1972/axis-websocket-metadata-python/blob/main/getsesssionid.py
https://github.com/dmannion1972/axis-websocket-metadata-python
encode/httpx#304
https://gist.github.com/tomchristie/3293d5b118b5646ce79cc074976744b0
The text was updated successfully, but these errors were encountered: