Skip to content

Commit

Permalink
[python,aiohttp] Don't create persistent aiohttp.ClientSession in __i…
Browse files Browse the repository at this point in the history
…nit__

aiohttp's `ClientSession` & `TCPConnector` used to obtain an event loop in
__init__ (via `asyncio.get_event_loop`). However, as of aio-libs/aiohttp#8512 both
classes now obtain the running event loop and won't potentially create one. This
makes it impossible to create `ClientSession` and `TCPConnector` objects outside
of coroutines, as `get_running_loop` must be called from a coroutine.

Thus we defer the creation of a `ClientSession` into the actual request and
cache it for later usage. Thereby we pay only a very small price on the first
request, but subsequent requests will not be any more expensive.
  • Loading branch information
dcermak committed Dec 11, 2024
1 parent b218e23 commit 7d33482
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,51 +44,31 @@ class RESTClientObject:
def __init__(self, configuration) -> None:

# maxsize is number of requests to host that are allowed in parallel
maxsize = configuration.connection_pool_maxsize
self.maxsize = configuration.connection_pool_maxsize

ssl_context = ssl.create_default_context(
self.ssl_context = ssl.create_default_context(
cafile=configuration.ssl_ca_cert
)
if configuration.cert_file:
ssl_context.load_cert_chain(
self.ssl_context.load_cert_chain(
configuration.cert_file, keyfile=configuration.key_file
)

if not configuration.verify_ssl:
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE

connector = aiohttp.TCPConnector(
limit=maxsize,
ssl=ssl_context
)
self.ssl_context.check_hostname = False
self.ssl_context.verify_mode = ssl.CERT_NONE

self.proxy = configuration.proxy
self.proxy_headers = configuration.proxy_headers

# https pool manager
self.pool_manager = aiohttp.ClientSession(
connector=connector,
trust_env=True
)
self.retries = configuration.retries

retries = configuration.retries
self.retry_client: Optional[aiohttp_retry.RetryClient]
if retries is not None:
self.retry_client = aiohttp_retry.RetryClient(
client_session=self.pool_manager,
retry_options=aiohttp_retry.ExponentialRetry(
attempts=retries,
factor=2.0,
start_timeout=0.1,
max_timeout=120.0
)
)
else:
self.retry_client = None
self.pool_manager: Optional[aiohttp.ClientSession] = None
self.retry_client: Optional[aiohttp_retry.RetryClient] = None

async def close(self):
await self.pool_manager.close()
async def close(self) -> None:
if self.pool_manager:
await self.pool_manager.close()
if self.retry_client is not None:
await self.retry_client.close()

Expand Down Expand Up @@ -195,10 +175,27 @@ class RESTClientObject:
raise ApiException(status=0, reason=msg)

pool_manager: Union[aiohttp.ClientSession, aiohttp_retry.RetryClient]
if self.retry_client is not None and method in ALLOW_RETRY_METHODS:

# https pool manager
if self.pool_manager is None:
self.pool_manager = aiohttp.ClientSession(
connector=aiohttp.TCPConnector(limit=self.maxsize, ssl=self.ssl_context),
trust_env=True,
)
pool_manager = self.pool_manager

if self.retries is not None and method in ALLOW_RETRY_METHODS:
if self.retry_client is None:
self.retry_client = aiohttp_retry.RetryClient(
client_session=self.pool_manager,
retry_options=aiohttp_retry.ExponentialRetry(
attempts=self.retries,
factor=2.0,
start_timeout=0.1,
max_timeout=120.0
)
)
pool_manager = self.retry_client
else:
pool_manager = self.pool_manager

r = await pool_manager.request(**args)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,51 +54,31 @@ class RESTClientObject:
def __init__(self, configuration) -> None:

# maxsize is number of requests to host that are allowed in parallel
maxsize = configuration.connection_pool_maxsize
self.maxsize = configuration.connection_pool_maxsize

ssl_context = ssl.create_default_context(
self.ssl_context = ssl.create_default_context(
cafile=configuration.ssl_ca_cert
)
if configuration.cert_file:
ssl_context.load_cert_chain(
self.ssl_context.load_cert_chain(
configuration.cert_file, keyfile=configuration.key_file
)

if not configuration.verify_ssl:
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE

connector = aiohttp.TCPConnector(
limit=maxsize,
ssl=ssl_context
)
self.ssl_context.check_hostname = False
self.ssl_context.verify_mode = ssl.CERT_NONE

self.proxy = configuration.proxy
self.proxy_headers = configuration.proxy_headers

# https pool manager
self.pool_manager = aiohttp.ClientSession(
connector=connector,
trust_env=True
)
self.retries = configuration.retries

retries = configuration.retries
self.retry_client: Optional[aiohttp_retry.RetryClient]
if retries is not None:
self.retry_client = aiohttp_retry.RetryClient(
client_session=self.pool_manager,
retry_options=aiohttp_retry.ExponentialRetry(
attempts=retries,
factor=2.0,
start_timeout=0.1,
max_timeout=120.0
)
)
else:
self.retry_client = None
self.pool_manager: Optional[aiohttp.ClientSession] = None
self.retry_client: Optional[aiohttp_retry.RetryClient] = None

async def close(self):
await self.pool_manager.close()
async def close(self) -> None:
if self.pool_manager:
await self.pool_manager.close()
if self.retry_client is not None:
await self.retry_client.close()

Expand Down Expand Up @@ -205,10 +185,27 @@ async def request(
raise ApiException(status=0, reason=msg)

pool_manager: Union[aiohttp.ClientSession, aiohttp_retry.RetryClient]
if self.retry_client is not None and method in ALLOW_RETRY_METHODS:

# https pool manager
if self.pool_manager is None:
self.pool_manager = aiohttp.ClientSession(
connector=aiohttp.TCPConnector(limit=self.maxsize, ssl=self.ssl_context),
trust_env=True,
)
pool_manager = self.pool_manager

if self.retries is not None and method in ALLOW_RETRY_METHODS:
if self.retry_client is None:
self.retry_client = aiohttp_retry.RetryClient(
client_session=self.pool_manager,
retry_options=aiohttp_retry.ExponentialRetry(
attempts=self.retries,
factor=2.0,
start_timeout=0.1,
max_timeout=120.0
)
)
pool_manager = self.retry_client
else:
pool_manager = self.pool_manager

r = await pool_manager.request(**args)

Expand Down

0 comments on commit 7d33482

Please sign in to comment.