From d44dbcf36f5761db54da8ed56f154afa5ee11d5b Mon Sep 17 00:00:00 2001 From: Adam Azzam <33043305+AAAZZAM@users.noreply.github.com> Date: Wed, 1 Jan 2025 15:20:34 -0600 Subject: [PATCH 01/11] init --- src/prefect/client/orchestration/__init__.py | 228 +-------- .../_concurrency_limits/__init__.py | 0 .../_concurrency_limits/client.py | 454 ++++++++++++++++++ 3 files changed, 467 insertions(+), 215 deletions(-) create mode 100644 src/prefect/client/orchestration/_concurrency_limits/__init__.py create mode 100644 src/prefect/client/orchestration/_concurrency_limits/client.py diff --git a/src/prefect/client/orchestration/__init__.py b/src/prefect/client/orchestration/__init__.py index 484944de1185..b4c305e76b0f 100644 --- a/src/prefect/client/orchestration/__init__.py +++ b/src/prefect/client/orchestration/__init__.py @@ -26,6 +26,11 @@ ArtifactCollectionAsyncClient, ) +from prefect.client.orchestration._concurrency_limits.client import ( + ConcurrencyLimitAsyncClient, + ConcurrencyLimitClient, +) + import prefect import prefect.exceptions import prefect.settings @@ -38,7 +43,6 @@ BlockSchemaCreate, BlockTypeCreate, BlockTypeUpdate, - ConcurrencyLimitCreate, DeploymentCreate, DeploymentFlowRunCreate, DeploymentScheduleCreate, @@ -77,7 +81,6 @@ BlockDocument, BlockSchema, BlockType, - ConcurrencyLimit, ConcurrencyOptions, Constant, DeploymentSchedule, @@ -152,15 +155,13 @@ def get_client( *, httpx_settings: Optional[dict[str, Any]] = ..., sync_client: Literal[False] = False, -) -> "PrefectClient": - ... +) -> "PrefectClient": ... @overload def get_client( *, httpx_settings: Optional[dict[str, Any]] = ..., sync_client: Literal[True] = ... -) -> "SyncPrefectClient": - ... +) -> "SyncPrefectClient": ... def get_client( @@ -244,7 +245,9 @@ def get_client( ) -class PrefectClient(ArtifactAsyncClient, ArtifactCollectionAsyncClient): +class PrefectClient( + ArtifactAsyncClient, ArtifactCollectionAsyncClient, ConcurrencyLimitAsyncClient +): """ An asynchronous client for interacting with the [Prefect REST API](/api-ref/rest-api/). @@ -811,213 +814,6 @@ async def delete_flow_run( else: raise - async def create_concurrency_limit( - self, - tag: str, - concurrency_limit: int, - ) -> UUID: - """ - Create a tag concurrency limit in the Prefect API. These limits govern concurrently - running tasks. - - Args: - tag: a tag the concurrency limit is applied to - concurrency_limit: the maximum number of concurrent task runs for a given tag - - Raises: - httpx.RequestError: if the concurrency limit was not created for any reason - - Returns: - the ID of the concurrency limit in the backend - """ - - concurrency_limit_create = ConcurrencyLimitCreate( - tag=tag, - concurrency_limit=concurrency_limit, - ) - response = await self._client.post( - "/concurrency_limits/", - json=concurrency_limit_create.model_dump(mode="json"), - ) - - concurrency_limit_id = response.json().get("id") - - if not concurrency_limit_id: - raise httpx.RequestError(f"Malformed response: {response}") - - return UUID(concurrency_limit_id) - - async def read_concurrency_limit_by_tag( - self, - tag: str, - ) -> ConcurrencyLimit: - """ - Read the concurrency limit set on a specific tag. - - Args: - tag: a tag the concurrency limit is applied to - - Raises: - prefect.exceptions.ObjectNotFound: If request returns 404 - httpx.RequestError: if the concurrency limit was not created for any reason - - Returns: - the concurrency limit set on a specific tag - """ - try: - response = await self._client.get( - f"/concurrency_limits/tag/{tag}", - ) - except httpx.HTTPStatusError as e: - if e.response.status_code == status.HTTP_404_NOT_FOUND: - raise prefect.exceptions.ObjectNotFound(http_exc=e) from e - else: - raise - - concurrency_limit_id = response.json().get("id") - - if not concurrency_limit_id: - raise httpx.RequestError(f"Malformed response: {response}") - - concurrency_limit = ConcurrencyLimit.model_validate(response.json()) - return concurrency_limit - - async def read_concurrency_limits( - self, - limit: int, - offset: int, - ) -> list[ConcurrencyLimit]: - """ - Lists concurrency limits set on task run tags. - - Args: - limit: the maximum number of concurrency limits returned - offset: the concurrency limit query offset - - Returns: - a list of concurrency limits - """ - - body = { - "limit": limit, - "offset": offset, - } - - response = await self._client.post("/concurrency_limits/filter", json=body) - return pydantic.TypeAdapter(list[ConcurrencyLimit]).validate_python( - response.json() - ) - - async def reset_concurrency_limit_by_tag( - self, - tag: str, - slot_override: Optional[list[Union[UUID, str]]] = None, - ) -> None: - """ - Resets the concurrency limit slots set on a specific tag. - - Args: - tag: a tag the concurrency limit is applied to - slot_override: a list of task run IDs that are currently using a - concurrency slot, please check that any task run IDs included in - `slot_override` are currently running, otherwise those concurrency - slots will never be released. - - Raises: - prefect.exceptions.ObjectNotFound: If request returns 404 - httpx.RequestError: If request fails - - """ - if slot_override is not None: - slot_override = [str(slot) for slot in slot_override] - - try: - await self._client.post( - f"/concurrency_limits/tag/{tag}/reset", - json=dict(slot_override=slot_override), - ) - except httpx.HTTPStatusError as e: - if e.response.status_code == status.HTTP_404_NOT_FOUND: - raise prefect.exceptions.ObjectNotFound(http_exc=e) from e - else: - raise - - async def delete_concurrency_limit_by_tag( - self, - tag: str, - ) -> None: - """ - Delete the concurrency limit set on a specific tag. - - Args: - tag: a tag the concurrency limit is applied to - - Raises: - prefect.exceptions.ObjectNotFound: If request returns 404 - httpx.RequestError: If request fails - - """ - try: - await self._client.delete( - f"/concurrency_limits/tag/{tag}", - ) - except httpx.HTTPStatusError as e: - if e.response.status_code == status.HTTP_404_NOT_FOUND: - raise prefect.exceptions.ObjectNotFound(http_exc=e) from e - else: - raise - - async def increment_v1_concurrency_slots( - self, - names: list[str], - task_run_id: UUID, - ) -> httpx.Response: - """ - Increment concurrency limit slots for the specified limits. - - Args: - names (List[str]): A list of limit names for which to increment limits. - task_run_id (UUID): The task run ID incrementing the limits. - """ - data: dict[str, Any] = { - "names": names, - "task_run_id": str(task_run_id), - } - - return await self._client.post( - "/concurrency_limits/increment", - json=data, - ) - - async def decrement_v1_concurrency_slots( - self, - names: list[str], - task_run_id: UUID, - occupancy_seconds: float, - ) -> httpx.Response: - """ - Decrement concurrency limit slots for the specified limits. - - Args: - names (List[str]): A list of limit names to decrement. - task_run_id (UUID): The task run ID that incremented the limits. - occupancy_seconds (float): The duration in seconds that the limits - were held. - - Returns: - httpx.Response: The HTTP response from the server. - """ - data: dict[str, Any] = { - "names": names, - "task_run_id": str(task_run_id), - "occupancy_seconds": occupancy_seconds, - } - - return await self._client.post( - "/concurrency_limits/decrement", - json=data, - ) - async def create_work_queue( self, name: str, @@ -3436,7 +3232,9 @@ def __exit__(self, *_: object) -> NoReturn: assert False, "This should never be called but must be defined for __enter__" -class SyncPrefectClient(ArtifactClient, ArtifactCollectionClient): +class SyncPrefectClient( + ArtifactClient, ArtifactCollectionClient, ConcurrencyLimitClient +): """ A synchronous client for interacting with the [Prefect REST API](/api-ref/rest-api/). diff --git a/src/prefect/client/orchestration/_concurrency_limits/__init__.py b/src/prefect/client/orchestration/_concurrency_limits/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/src/prefect/client/orchestration/_concurrency_limits/client.py b/src/prefect/client/orchestration/_concurrency_limits/client.py new file mode 100644 index 000000000000..847dcb321075 --- /dev/null +++ b/src/prefect/client/orchestration/_concurrency_limits/client.py @@ -0,0 +1,454 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from httpx import HTTPStatusError, RequestError + +from prefect.client.orchestration.base import BaseAsyncClient, BaseClient +from prefect.exceptions import ObjectNotFound + +if TYPE_CHECKING: + from uuid import UUID + + from httpx import Response + + from prefect.client.schemas.actions import ConcurrencyLimitCreate + from prefect.client.schemas.objects import ConcurrencyLimit + + +class ConcurrencyLimitClient(BaseClient): + def create_concurrency_limit( + self, + tag: str, + concurrency_limit: int, + ) -> "UUID": + """ + Create a tag concurrency limit in the Prefect API. These limits govern concurrently + running tasks. + + Args: + tag: a tag the concurrency limit is applied to + concurrency_limit: the maximum number of concurrent task runs for a given tag + + Raises: + httpx.RequestError: if the concurrency limit was not created for any reason + + Returns: + the ID of the concurrency limit in the backend + """ + + concurrency_limit_create = ConcurrencyLimitCreate( + tag=tag, + concurrency_limit=concurrency_limit, + ) + response = self.request( + "POST", + "/concurrency_limits/", + json=concurrency_limit_create.model_dump(mode="json"), + ) + + concurrency_limit_id = response.json().get("id") + + if not concurrency_limit_id: + raise RequestError(f"Malformed response: {response}") + from uuid import UUID + + return UUID(concurrency_limit_id) + + def read_concurrency_limit_by_tag( + self, + tag: str, + ) -> "ConcurrencyLimit": + """ + Read the concurrency limit set on a specific tag. + + Args: + tag: a tag the concurrency limit is applied to + + Raises: + prefect.exceptions.ObjectNotFound: If request returns 404 + httpx.RequestError: if the concurrency limit was not created for any reason + + Returns: + the concurrency limit set on a specific tag + """ + try: + response = self.request( + "GET", + "/concurrency_limits/tag/{tag}", + path_params={"tag": tag}, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + concurrency_limit_id = response.json().get("id") + + if not concurrency_limit_id: + raise RequestError(f"Malformed response: {response}") + from prefect.client.schemas.objects import ConcurrencyLimit + + return ConcurrencyLimit.model_validate(response.json()) + + def read_concurrency_limits( + self, + limit: int, + offset: int, + ) -> list["ConcurrencyLimit"]: + """ + Lists concurrency limits set on task run tags. + + Args: + limit: the maximum number of concurrency limits returned + offset: the concurrency limit query offset + + Returns: + a list of concurrency limits + """ + + body = { + "limit": limit, + "offset": offset, + } + + response = self.request("POST", "/concurrency_limits/filter", json=body) + from prefect.client.schemas.objects import ConcurrencyLimit + + return ConcurrencyLimit.model_validate_list(response.json()) + + def reset_concurrency_limit_by_tag( + self, + tag: str, + slot_override: list["UUID | str"] | None = None, + ) -> None: + """ + Resets the concurrency limit slots set on a specific tag. + + Args: + tag: a tag the concurrency limit is applied to + slot_override: a list of task run IDs that are currently using a + concurrency slot, please check that any task run IDs included in + `slot_override` are currently running, otherwise those concurrency + slots will never be released. + + Raises: + prefect.exceptions.ObjectNotFound: If request returns 404 + httpx.RequestError: If request fails + + """ + if slot_override is not None: + slot_override = [str(slot) for slot in slot_override] + + try: + self.request( + "POST", + "/concurrency_limits/tag/{tag}/reset", + path_params={"tag": tag}, + json=dict(slot_override=slot_override), + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + def delete_concurrency_limit_by_tag( + self, + tag: str, + ) -> None: + """ + Delete the concurrency limit set on a specific tag. + + Args: + tag: a tag the concurrency limit is applied to + + Raises: + prefect.exceptions.ObjectNotFound: If request returns 404 + httpx.RequestError: If request fails + + """ + try: + self.request( + "DELETE", + "/concurrency_limits/tag/{tag}", + path_params={"tag": tag}, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + def increment_v1_concurrency_slots( + self, + names: list[str], + task_run_id: "UUID", + ) -> "Response": + """ + Increment concurrency limit slots for the specified limits. + + Args: + names (List[str]): A list of limit names for which to increment limits. + task_run_id (UUID): The task run ID incrementing the limits. + """ + data: dict[str, Any] = { + "names": names, + "task_run_id": str(task_run_id), + } + + return self.request( + "POST", + "/concurrency_limits/increment", + json=data, + ) + + def decrement_v1_concurrency_slots( + self, + names: list[str], + task_run_id: "UUID", + occupancy_seconds: float, + ) -> "Response": + """ + Decrement concurrency limit slots for the specified limits. + + Args: + names (List[str]): A list of limit names to decrement. + task_run_id (UUID): The task run ID that incremented the limits. + occupancy_seconds (float): The duration in seconds that the limits + were held. + + Returns: + httpx.Response: The HTTP response from the server. + """ + data: dict[str, Any] = { + "names": names, + "task_run_id": str(task_run_id), + "occupancy_seconds": occupancy_seconds, + } + + return self.request( + "POST", + "/concurrency_limits/decrement", + json=data, + ) + + +class ConcurrencyLimitAsyncClient(BaseAsyncClient): + async def create_concurrency_limit( + self, + tag: str, + concurrency_limit: int, + ) -> "UUID": + """ + Create a tag concurrency limit in the Prefect API. These limits govern concurrently + running tasks. + + Args: + tag: a tag the concurrency limit is applied to + concurrency_limit: the maximum number of concurrent task runs for a given tag + + Raises: + httpx.RequestError: if the concurrency limit was not created for any reason + + Returns: + the ID of the concurrency limit in the backend + """ + + concurrency_limit_create = ConcurrencyLimitCreate( + tag=tag, + concurrency_limit=concurrency_limit, + ) + response = await self.request( + "POST", + "/concurrency_limits/", + json=concurrency_limit_create.model_dump(mode="json"), + ) + + concurrency_limit_id = response.json().get("id") + + if not concurrency_limit_id: + raise RequestError(f"Malformed response: {response}") + from uuid import UUID + + return UUID(concurrency_limit_id) + + async def read_concurrency_limit_by_tag( + self, + tag: str, + ) -> "ConcurrencyLimit": + """ + Read the concurrency limit set on a specific tag. + + Args: + tag: a tag the concurrency limit is applied to + + Raises: + prefect.exceptions.ObjectNotFound: If request returns 404 + httpx.RequestError: if the concurrency limit was not created for any reason + + Returns: + the concurrency limit set on a specific tag + """ + try: + response = await self.request( + "GET", + "/concurrency_limits/tag/{tag}", + path_params={"tag": tag}, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + concurrency_limit_id = response.json().get("id") + + if not concurrency_limit_id: + raise RequestError(f"Malformed response: {response}") + from prefect.client.schemas.objects import ConcurrencyLimit + + return ConcurrencyLimit.model_validate(response.json()) + + async def read_concurrency_limits( + self, + limit: int, + offset: int, + ) -> list["ConcurrencyLimit"]: + """ + Lists concurrency limits set on task run tags. + + Args: + limit: the maximum number of concurrency limits returned + offset: the concurrency limit query offset + + Returns: + a list of concurrency limits + """ + + body = { + "limit": limit, + "offset": offset, + } + + response = await self.request("POST", "/concurrency_limits/filter", json=body) + from prefect.client.schemas.objects import ConcurrencyLimit + + return ConcurrencyLimit.model_validate_list(response.json()) + + async def reset_concurrency_limit_by_tag( + self, + tag: str, + slot_override: list["UUID | str"] | None = None, + ) -> None: + """ + Resets the concurrency limit slots set on a specific tag. + + Args: + tag: a tag the concurrency limit is applied to + slot_override: a list of task run IDs that are currently using a + concurrency slot, please check that any task run IDs included in + `slot_override` are currently running, otherwise those concurrency + slots will never be released. + + Raises: + prefect.exceptions.ObjectNotFound: If request returns 404 + httpx.RequestError: If request fails + + """ + if slot_override is not None: + slot_override = [str(slot) for slot in slot_override] + + try: + await self.request( + "POST", + "/concurrency_limits/tag/{tag}/reset", + path_params={"tag": tag}, + json=dict(slot_override=slot_override), + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + async def delete_concurrency_limit_by_tag( + self, + tag: str, + ) -> None: + """ + Delete the concurrency limit set on a specific tag. + + Args: + tag: a tag the concurrency limit is applied to + + Raises: + prefect.exceptions.ObjectNotFound: If request returns 404 + httpx.RequestError: If request fails + + """ + try: + await self.request( + "DELETE", + "/concurrency_limits/tag/{tag}", + path_params={"tag": tag}, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + async def increment_v1_concurrency_slots( + self, + names: list[str], + task_run_id: "UUID", + ) -> "Response": + """ + Increment concurrency limit slots for the specified limits. + + Args: + names (List[str]): A list of limit names for which to increment limits. + task_run_id (UUID): The task run ID incrementing the limits. + """ + data: dict[str, Any] = { + "names": names, + "task_run_id": str(task_run_id), + } + + return await self.request( + "POST", + "/concurrency_limits/increment", + json=data, + ) + + async def decrement_v1_concurrency_slots( + self, + names: list[str], + task_run_id: "UUID", + occupancy_seconds: float, + ) -> "Response": + """ + Decrement concurrency limit slots for the specified limits. + + Args: + names (List[str]): A list of limit names to decrement. + task_run_id (UUID): The task run ID that incremented the limits. + occupancy_seconds (float): The duration in seconds that the limits + were held. + + Returns: + httpx.Response: The HTTP response from the server. + """ + data: dict[str, Any] = { + "names": names, + "task_run_id": str(task_run_id), + "occupancy_seconds": occupancy_seconds, + } + + return await self.request( + "POST", + "/concurrency_limits/decrement", + json=data, + ) From 4e13b7237ffd22d7a91ef9e7634df80d5cae8823 Mon Sep 17 00:00:00 2001 From: Adam Azzam <33043305+AAAZZAM@users.noreply.github.com> Date: Wed, 1 Jan 2025 15:37:34 -0600 Subject: [PATCH 02/11] also tackle gcls --- src/prefect/client/orchestration/__init__.py | 181 ---------- .../_concurrency_limits/client.py | 319 +++++++++++++++++- 2 files changed, 310 insertions(+), 190 deletions(-) diff --git a/src/prefect/client/orchestration/__init__.py b/src/prefect/client/orchestration/__init__.py index b4c305e76b0f..3935de981e68 100644 --- a/src/prefect/client/orchestration/__init__.py +++ b/src/prefect/client/orchestration/__init__.py @@ -53,8 +53,6 @@ FlowRunNotificationPolicyCreate, FlowRunNotificationPolicyUpdate, FlowRunUpdate, - GlobalConcurrencyLimitCreate, - GlobalConcurrencyLimitUpdate, LogCreate, TaskRunCreate, TaskRunUpdate, @@ -102,7 +100,6 @@ from prefect.client.schemas.responses import ( DeploymentResponse, FlowRunResponse, - GlobalConcurrencyLimitResponse, WorkerFlowRunResponse, ) from prefect.client.schemas.schedules import SCHEDULE_TYPES @@ -2810,136 +2807,6 @@ async def read_worker_metadata(self) -> dict[str, Any]: response.raise_for_status() return response.json() - async def increment_concurrency_slots( - self, - names: list[str], - slots: int, - mode: str, - create_if_missing: Optional[bool] = None, - ) -> httpx.Response: - return await self._client.post( - "/v2/concurrency_limits/increment", - json={ - "names": names, - "slots": slots, - "mode": mode, - "create_if_missing": create_if_missing if create_if_missing else False, - }, - ) - - async def release_concurrency_slots( - self, names: list[str], slots: int, occupancy_seconds: float - ) -> httpx.Response: - """ - Release concurrency slots for the specified limits. - - Args: - names (List[str]): A list of limit names for which to release slots. - slots (int): The number of concurrency slots to release. - occupancy_seconds (float): The duration in seconds that the slots - were occupied. - - Returns: - httpx.Response: The HTTP response from the server. - """ - - return await self._client.post( - "/v2/concurrency_limits/decrement", - json={ - "names": names, - "slots": slots, - "occupancy_seconds": occupancy_seconds, - }, - ) - - async def create_global_concurrency_limit( - self, concurrency_limit: GlobalConcurrencyLimitCreate - ) -> UUID: - response = await self._client.post( - "/v2/concurrency_limits/", - json=concurrency_limit.model_dump(mode="json", exclude_unset=True), - ) - return UUID(response.json()["id"]) - - async def update_global_concurrency_limit( - self, name: str, concurrency_limit: GlobalConcurrencyLimitUpdate - ) -> httpx.Response: - try: - response = await self._client.patch( - f"/v2/concurrency_limits/{name}", - json=concurrency_limit.model_dump(mode="json", exclude_unset=True), - ) - return response - except httpx.HTTPStatusError as e: - if e.response.status_code == status.HTTP_404_NOT_FOUND: - raise prefect.exceptions.ObjectNotFound(http_exc=e) from e - else: - raise - - async def delete_global_concurrency_limit_by_name( - self, name: str - ) -> httpx.Response: - try: - response = await self._client.delete(f"/v2/concurrency_limits/{name}") - return response - except httpx.HTTPStatusError as e: - if e.response.status_code == status.HTTP_404_NOT_FOUND: - raise prefect.exceptions.ObjectNotFound(http_exc=e) from e - else: - raise - - async def read_global_concurrency_limit_by_name( - self, name: str - ) -> GlobalConcurrencyLimitResponse: - try: - response = await self._client.get(f"/v2/concurrency_limits/{name}") - return GlobalConcurrencyLimitResponse.model_validate(response.json()) - except httpx.HTTPStatusError as e: - if e.response.status_code == status.HTTP_404_NOT_FOUND: - raise prefect.exceptions.ObjectNotFound(http_exc=e) from e - else: - raise - - async def upsert_global_concurrency_limit_by_name( - self, name: str, limit: int - ) -> None: - """Creates a global concurrency limit with the given name and limit if one does not already exist. - - If one does already exist matching the name then update it's limit if it is different. - - Note: This is not done atomically. - """ - try: - existing_limit = await self.read_global_concurrency_limit_by_name(name) - except prefect.exceptions.ObjectNotFound: - existing_limit = None - - if not existing_limit: - await self.create_global_concurrency_limit( - GlobalConcurrencyLimitCreate( - name=name, - limit=limit, - ) - ) - elif existing_limit.limit != limit: - await self.update_global_concurrency_limit( - name, GlobalConcurrencyLimitUpdate(limit=limit) - ) - - async def read_global_concurrency_limits( - self, limit: int = 10, offset: int = 0 - ) -> list[GlobalConcurrencyLimitResponse]: - response = await self._client.post( - "/v2/concurrency_limits/filter", - json={ - "limit": limit, - "offset": offset, - }, - ) - return pydantic.TypeAdapter( - list[GlobalConcurrencyLimitResponse] - ).validate_python(response.json()) - async def create_flow_run_input( self, flow_run_id: UUID, key: str, value: str, sender: Optional[str] = None ) -> None: @@ -4017,54 +3884,6 @@ def read_deployment_by_name( return DeploymentResponse.model_validate(response.json()) - def release_concurrency_slots( - self, names: list[str], slots: int, occupancy_seconds: float - ) -> httpx.Response: - """ - Release concurrency slots for the specified limits. - - Args: - names (List[str]): A list of limit names for which to release slots. - slots (int): The number of concurrency slots to release. - occupancy_seconds (float): The duration in seconds that the slots - were occupied. - - Returns: - httpx.Response: The HTTP response from the server. - """ - return self._client.post( - "/v2/concurrency_limits/decrement", - json={ - "names": names, - "slots": slots, - "occupancy_seconds": occupancy_seconds, - }, - ) - - def decrement_v1_concurrency_slots( - self, names: list[str], occupancy_seconds: float, task_run_id: UUID - ) -> httpx.Response: - """ - Release the specified concurrency limits. - - Args: - names (List[str]): A list of limit names to decrement. - occupancy_seconds (float): The duration in seconds that the slots - were held. - task_run_id (UUID): The task run ID that incremented the limits. - - Returns: - httpx.Response: The HTTP response from the server. - """ - return self._client.post( - "/concurrency_limits/decrement", - json={ - "names": names, - "occupancy_seconds": occupancy_seconds, - "task_run_id": str(task_run_id), - }, - ) - def update_flow_run_labels( self, flow_run_id: UUID, labels: KeyValueLabelsField ) -> None: diff --git a/src/prefect/client/orchestration/_concurrency_limits/client.py b/src/prefect/client/orchestration/_concurrency_limits/client.py index 847dcb321075..bddbe691bc44 100644 --- a/src/prefect/client/orchestration/_concurrency_limits/client.py +++ b/src/prefect/client/orchestration/_concurrency_limits/client.py @@ -12,8 +12,13 @@ from httpx import Response - from prefect.client.schemas.actions import ConcurrencyLimitCreate + from prefect.client.schemas.actions import ( + ConcurrencyLimitCreate, + GlobalConcurrencyLimitCreate, + GlobalConcurrencyLimitUpdate, + ) from prefect.client.schemas.objects import ConcurrencyLimit + from prefect.client.schemas.responses import GlobalConcurrencyLimitResponse class ConcurrencyLimitClient(BaseClient): @@ -66,7 +71,7 @@ def read_concurrency_limit_by_tag( tag: a tag the concurrency limit is applied to Raises: - prefect.exceptions.ObjectNotFound: If request returns 404 + ObjectNotFound: If request returns 404 httpx.RequestError: if the concurrency limit was not created for any reason Returns: @@ -134,7 +139,7 @@ def reset_concurrency_limit_by_tag( slots will never be released. Raises: - prefect.exceptions.ObjectNotFound: If request returns 404 + ObjectNotFound: If request returns 404 httpx.RequestError: If request fails """ @@ -165,7 +170,7 @@ def delete_concurrency_limit_by_tag( tag: a tag the concurrency limit is applied to Raises: - prefect.exceptions.ObjectNotFound: If request returns 404 + ObjectNotFound: If request returns 404 httpx.RequestError: If request fails """ @@ -220,7 +225,7 @@ def decrement_v1_concurrency_slots( were held. Returns: - httpx.Response: The HTTP response from the server. + "Response": The HTTP response from the server. """ data: dict[str, Any] = { "names": names, @@ -234,6 +239,153 @@ def decrement_v1_concurrency_slots( json=data, ) + def increment_concurrency_slots( + self, + names: list[str], + slots: int, + mode: str, + create_if_missing: bool | None = None, + ) -> "Response": + return self.request( + "POST", + "/v2/concurrency_limits/increment", + json={ + "names": names, + "slots": slots, + "mode": mode, + "create_if_missing": create_if_missing if create_if_missing else False, + }, + ) + + def release_concurrency_slots( + self, names: list[str], slots: int, occupancy_seconds: float + ) -> "Response": + """ + Release concurrency slots for the specified limits. + + Args: + names (List[str]): A list of limit names for which to release slots. + slots (int): The number of concurrency slots to release. + occupancy_seconds (float): The duration in seconds that the slots + were occupied. + + Returns: + "Response": The HTTP response from the server. + """ + + return self.request( + "POST", + "/v2/concurrency_limits/decrement", + json={ + "names": names, + "slots": slots, + "occupancy_seconds": occupancy_seconds, + }, + ) + + def create_global_concurrency_limit( + self, concurrency_limit: "GlobalConcurrencyLimitCreate" + ) -> "UUID": + response = self.request( + "POST", + "/v2/concurrency_limits/", + json=concurrency_limit.model_dump(mode="json", exclude_unset=True), + ) + from uuid import UUID + + return UUID(response.json()["id"]) + + def update_global_concurrency_limit( + self, name: str, concurrency_limit: "GlobalConcurrencyLimitUpdate" + ) -> "Response": + try: + response = self.request( + "PATCH", + "/v2/concurrency_limits/{id_or_name}", + path_params={"id_or_name": name}, + json=concurrency_limit.model_dump(mode="json", exclude_unset=True), + ) + return response + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + def delete_global_concurrency_limit_by_name(self, name: str) -> "Response": + try: + response = self.request( + "DELETE", + "/v2/concurrency_limits/{id_or_name}", + path_params={"id_or_name": name}, + ) + return response + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + def read_global_concurrency_limit_by_name( + self, name: str + ) -> "GlobalConcurrencyLimitResponse": + try: + response = self.request( + "GET", + "/v2/concurrency_limits/{id_or_name}", + path_params={"id_or_name": name}, + ) + from prefect.client.schemas.responses import GlobalConcurrencyLimitResponse + + return GlobalConcurrencyLimitResponse.model_validate(response.json()) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + def upsert_global_concurrency_limit_by_name(self, name: str, limit: int) -> None: + """Creates a global concurrency limit with the given name and limit if one does not already exist. + + If one does already exist matching the name then update it's limit if it is different. + + Note: This is not done atomically. + """ + from prefect.client.schemas.actions import GlobalConcurrencyLimitCreate + + try: + existing_limit = self.read_global_concurrency_limit_by_name(name) + except ObjectNotFound: + existing_limit = None + + if not existing_limit: + self.create_global_concurrency_limit( + GlobalConcurrencyLimitCreate( + name=name, + limit=limit, + ) + ) + elif existing_limit.limit != limit: + self.update_global_concurrency_limit( + name, GlobalConcurrencyLimitUpdate(limit=limit) + ) + + def read_global_concurrency_limits( + self, limit: int = 10, offset: int = 0 + ) -> list["GlobalConcurrencyLimitResponse"]: + response = self.request( + "POST", + "/v2/concurrency_limits/filter", + json={ + "limit": limit, + "offset": offset, + }, + ) + + from prefect.client.schemas.responses import GlobalConcurrencyLimitResponse + + return GlobalConcurrencyLimitResponse.model_validate_list(response.json()) + class ConcurrencyLimitAsyncClient(BaseAsyncClient): async def create_concurrency_limit( @@ -285,7 +437,7 @@ async def read_concurrency_limit_by_tag( tag: a tag the concurrency limit is applied to Raises: - prefect.exceptions.ObjectNotFound: If request returns 404 + ObjectNotFound: If request returns 404 httpx.RequestError: if the concurrency limit was not created for any reason Returns: @@ -353,7 +505,7 @@ async def reset_concurrency_limit_by_tag( slots will never be released. Raises: - prefect.exceptions.ObjectNotFound: If request returns 404 + ObjectNotFound: If request returns 404 httpx.RequestError: If request fails """ @@ -384,7 +536,7 @@ async def delete_concurrency_limit_by_tag( tag: a tag the concurrency limit is applied to Raises: - prefect.exceptions.ObjectNotFound: If request returns 404 + ObjectNotFound: If request returns 404 httpx.RequestError: If request fails """ @@ -439,7 +591,7 @@ async def decrement_v1_concurrency_slots( were held. Returns: - httpx.Response: The HTTP response from the server. + "Response": The HTTP response from the server. """ data: dict[str, Any] = { "names": names, @@ -452,3 +604,152 @@ async def decrement_v1_concurrency_slots( "/concurrency_limits/decrement", json=data, ) + + async def increment_concurrency_slots( + self, + names: list[str], + slots: int, + mode: str, + create_if_missing: bool | None = None, + ) -> "Response": + return await self.request( + "POST", + "/v2/concurrency_limits/increment", + json={ + "names": names, + "slots": slots, + "mode": mode, + "create_if_missing": create_if_missing if create_if_missing else False, + }, + ) + + async def release_concurrency_slots( + self, names: list[str], slots: int, occupancy_seconds: float + ) -> "Response": + """ + Release concurrency slots for the specified limits. + + Args: + names (List[str]): A list of limit names for which to release slots. + slots (int): The number of concurrency slots to release. + occupancy_seconds (float): The duration in seconds that the slots + were occupied. + + Returns: + "Response": The HTTP response from the server. + """ + + return await self.request( + "POST", + "/v2/concurrency_limits/decrement", + json={ + "names": names, + "slots": slots, + "occupancy_seconds": occupancy_seconds, + }, + ) + + async def create_global_concurrency_limit( + self, concurrency_limit: "GlobalConcurrencyLimitCreate" + ) -> "UUID": + response = await self.request( + "POST", + "/v2/concurrency_limits/", + json=concurrency_limit.model_dump(mode="json", exclude_unset=True), + ) + from uuid import UUID + + return UUID(response.json()["id"]) + + async def update_global_concurrency_limit( + self, name: str, concurrency_limit: "GlobalConcurrencyLimitUpdate" + ) -> "Response": + try: + response = await self.request( + "PATCH", + "/v2/concurrency_limits/{id_or_name}", + path_params={"id_or_name": name}, + json=concurrency_limit.model_dump(mode="json", exclude_unset=True), + ) + return response + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + async def delete_global_concurrency_limit_by_name(self, name: str) -> "Response": + try: + response = await self.request( + "DELETE", + "/v2/concurrency_limits/{id_or_name}", + path_params={"id_or_name": name}, + ) + return response + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + async def read_global_concurrency_limit_by_name( + self, name: str + ) -> "GlobalConcurrencyLimitResponse": + try: + response = await self.request( + "GET", + "/v2/concurrency_limits/{id_or_name}", + path_params={"id_or_name": name}, + ) + from prefect.client.schemas.responses import GlobalConcurrencyLimitResponse + + return GlobalConcurrencyLimitResponse.model_validate(response.json()) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + async def upsert_global_concurrency_limit_by_name( + self, name: str, limit: int + ) -> None: + """Creates a global concurrency limit with the given name and limit if one does not already exist. + + If one does already exist matching the name then update it's limit if it is different. + + Note: This is not done atomically. + """ + from prefect.client.schemas.actions import GlobalConcurrencyLimitCreate + + try: + existing_limit = await self.read_global_concurrency_limit_by_name(name) + except ObjectNotFound: + existing_limit = None + + if not existing_limit: + await self.create_global_concurrency_limit( + GlobalConcurrencyLimitCreate( + name=name, + limit=limit, + ) + ) + elif existing_limit.limit != limit: + await self.update_global_concurrency_limit( + name, GlobalConcurrencyLimitUpdate(limit=limit) + ) + + async def read_global_concurrency_limits( + self, limit: int = 10, offset: int = 0 + ) -> list["GlobalConcurrencyLimitResponse"]: + response = await self.request( + "POST", + "/v2/concurrency_limits/filter", + json={ + "limit": limit, + "offset": offset, + }, + ) + + from prefect.client.schemas.responses import GlobalConcurrencyLimitResponse + + return GlobalConcurrencyLimitResponse.model_validate_list(response.json()) From 95225c3f3b3cf2d1479e55529743390a5ef23486 Mon Sep 17 00:00:00 2001 From: Adam Azzam <33043305+AAAZZAM@users.noreply.github.com> Date: Wed, 1 Jan 2025 15:37:49 -0600 Subject: [PATCH 03/11] lint --- src/prefect/client/orchestration/__init__.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/prefect/client/orchestration/__init__.py b/src/prefect/client/orchestration/__init__.py index 3935de981e68..f31706910013 100644 --- a/src/prefect/client/orchestration/__init__.py +++ b/src/prefect/client/orchestration/__init__.py @@ -152,13 +152,15 @@ def get_client( *, httpx_settings: Optional[dict[str, Any]] = ..., sync_client: Literal[False] = False, -) -> "PrefectClient": ... +) -> "PrefectClient": + ... @overload def get_client( *, httpx_settings: Optional[dict[str, Any]] = ..., sync_client: Literal[True] = ... -) -> "SyncPrefectClient": ... +) -> "SyncPrefectClient": + ... def get_client( From 2a082012ff3350330b4ac4dd5386acc5042015f2 Mon Sep 17 00:00:00 2001 From: Adam Azzam <33043305+AAAZZAM@users.noreply.github.com> Date: Wed, 1 Jan 2025 16:23:05 -0600 Subject: [PATCH 04/11] Update client.py --- .../client/orchestration/_concurrency_limits/client.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/prefect/client/orchestration/_concurrency_limits/client.py b/src/prefect/client/orchestration/_concurrency_limits/client.py index bddbe691bc44..21d494d8dd9d 100644 --- a/src/prefect/client/orchestration/_concurrency_limits/client.py +++ b/src/prefect/client/orchestration/_concurrency_limits/client.py @@ -41,6 +41,7 @@ def create_concurrency_limit( Returns: the ID of the concurrency limit in the backend """ + from prefect.client.schemas.actions import ConcurrencyLimitCreate concurrency_limit_create = ConcurrencyLimitCreate( tag=tag, @@ -351,7 +352,10 @@ def upsert_global_concurrency_limit_by_name(self, name: str, limit: int) -> None Note: This is not done atomically. """ - from prefect.client.schemas.actions import GlobalConcurrencyLimitCreate + from prefect.client.schemas.actions import ( + GlobalConcurrencyLimitCreate, + GlobalConcurrencyLimitUpdate, + ) try: existing_limit = self.read_global_concurrency_limit_by_name(name) From 9d74d9c5f91e9db85bf06597c058fce8d167fa9d Mon Sep 17 00:00:00 2001 From: Adam Azzam <33043305+AAAZZAM@users.noreply.github.com> Date: Wed, 1 Jan 2025 16:38:23 -0600 Subject: [PATCH 05/11] Update client.py --- .../client/orchestration/_concurrency_limits/client.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/prefect/client/orchestration/_concurrency_limits/client.py b/src/prefect/client/orchestration/_concurrency_limits/client.py index 21d494d8dd9d..46093387f8c8 100644 --- a/src/prefect/client/orchestration/_concurrency_limits/client.py +++ b/src/prefect/client/orchestration/_concurrency_limits/client.py @@ -13,7 +13,6 @@ from httpx import Response from prefect.client.schemas.actions import ( - ConcurrencyLimitCreate, GlobalConcurrencyLimitCreate, GlobalConcurrencyLimitUpdate, ) @@ -411,6 +410,7 @@ async def create_concurrency_limit( Returns: the ID of the concurrency limit in the backend """ + from prefect.client.schemas.actions import ConcurrencyLimitCreate concurrency_limit_create = ConcurrencyLimitCreate( tag=tag, @@ -723,7 +723,10 @@ async def upsert_global_concurrency_limit_by_name( Note: This is not done atomically. """ - from prefect.client.schemas.actions import GlobalConcurrencyLimitCreate + from prefect.client.schemas.actions import ( + GlobalConcurrencyLimitCreate, + GlobalConcurrencyLimitUpdate, + ) try: existing_limit = await self.read_global_concurrency_limit_by_name(name) From 7d416c3791e1cf9c79601b225adb57ffe566261f Mon Sep 17 00:00:00 2001 From: Adam Azzam <33043305+AAAZZAM@users.noreply.github.com> Date: Wed, 1 Jan 2025 17:20:39 -0600 Subject: [PATCH 06/11] lint --- src/prefect/client/orchestration/__init__.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/prefect/client/orchestration/__init__.py b/src/prefect/client/orchestration/__init__.py index 955245ea424c..9103d12d5b5d 100644 --- a/src/prefect/client/orchestration/__init__.py +++ b/src/prefect/client/orchestration/__init__.py @@ -29,6 +29,8 @@ from prefect.client.orchestration._concurrency_limits.client import ( ConcurrencyLimitAsyncClient, ConcurrencyLimitClient, +) + from prefect.client.orchestration._logs.client import ( LogClient, LogAsyncClient, @@ -252,7 +254,7 @@ class PrefectClient( ArtifactCollectionAsyncClient, LogAsyncClient, VariableAsyncClient, - ConcurrencyLimitAsyncClient + ConcurrencyLimitAsyncClient, ): """ An asynchronous client for interacting with the [Prefect REST API](/api-ref/rest-api/). @@ -3023,7 +3025,7 @@ class SyncPrefectClient( ArtifactCollectionClient, LogClient, VariableClient, - ConcurrencyLimitClient + ConcurrencyLimitClient, ): """ A synchronous client for interacting with the [Prefect REST API](/api-ref/rest-api/). From 40f857ec8451c8f25d1409b5f38da4c04128b8d5 Mon Sep 17 00:00:00 2001 From: Adam Azzam <33043305+aaazzam@users.noreply.github.com> Date: Wed, 1 Jan 2025 18:23:58 -0600 Subject: [PATCH 07/11] Update src/prefect/client/orchestration/_concurrency_limits/client.py Co-authored-by: nate nowack --- .../client/orchestration/_concurrency_limits/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/prefect/client/orchestration/_concurrency_limits/client.py b/src/prefect/client/orchestration/_concurrency_limits/client.py index 46093387f8c8..7983bf71ad8a 100644 --- a/src/prefect/client/orchestration/_concurrency_limits/client.py +++ b/src/prefect/client/orchestration/_concurrency_limits/client.py @@ -565,8 +565,8 @@ async def increment_v1_concurrency_slots( Increment concurrency limit slots for the specified limits. Args: - names (List[str]): A list of limit names for which to increment limits. - task_run_id (UUID): The task run ID incrementing the limits. + names: A list of limit names for which to increment limits. + task_run_id: The task run ID incrementing the limits. """ data: dict[str, Any] = { "names": names, From 79f6997a188dd07eb45cd9976ed93a27e5fcaaf2 Mon Sep 17 00:00:00 2001 From: Adam Azzam <33043305+aaazzam@users.noreply.github.com> Date: Wed, 1 Jan 2025 18:24:06 -0600 Subject: [PATCH 08/11] Update src/prefect/client/orchestration/_concurrency_limits/client.py Co-authored-by: nate nowack --- .../client/orchestration/_concurrency_limits/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/prefect/client/orchestration/_concurrency_limits/client.py b/src/prefect/client/orchestration/_concurrency_limits/client.py index 7983bf71ad8a..b4852935d953 100644 --- a/src/prefect/client/orchestration/_concurrency_limits/client.py +++ b/src/prefect/client/orchestration/_concurrency_limits/client.py @@ -589,8 +589,8 @@ async def decrement_v1_concurrency_slots( Decrement concurrency limit slots for the specified limits. Args: - names (List[str]): A list of limit names to decrement. - task_run_id (UUID): The task run ID that incremented the limits. + names: A list of limit names to decrement. + task_run_id: The task run ID that incremented the limits. occupancy_seconds (float): The duration in seconds that the limits were held. From 7792364067509685d5cf8b71fda593c05a05a060 Mon Sep 17 00:00:00 2001 From: Adam Azzam <33043305+aaazzam@users.noreply.github.com> Date: Wed, 1 Jan 2025 18:24:12 -0600 Subject: [PATCH 09/11] Update src/prefect/client/orchestration/_concurrency_limits/client.py Co-authored-by: nate nowack --- .../client/orchestration/_concurrency_limits/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/prefect/client/orchestration/_concurrency_limits/client.py b/src/prefect/client/orchestration/_concurrency_limits/client.py index b4852935d953..394a390effbc 100644 --- a/src/prefect/client/orchestration/_concurrency_limits/client.py +++ b/src/prefect/client/orchestration/_concurrency_limits/client.py @@ -634,8 +634,8 @@ async def release_concurrency_slots( Release concurrency slots for the specified limits. Args: - names (List[str]): A list of limit names for which to release slots. - slots (int): The number of concurrency slots to release. + names: A list of limit names for which to release slots. + slots: The number of concurrency slots to release. occupancy_seconds (float): The duration in seconds that the slots were occupied. From 5195dbf4c2ffd58a96022a74284097cdaebd8d41 Mon Sep 17 00:00:00 2001 From: Adam Azzam <33043305+aaazzam@users.noreply.github.com> Date: Wed, 1 Jan 2025 18:24:19 -0600 Subject: [PATCH 10/11] Update src/prefect/client/orchestration/_concurrency_limits/client.py Co-authored-by: nate nowack --- .../client/orchestration/_concurrency_limits/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/prefect/client/orchestration/_concurrency_limits/client.py b/src/prefect/client/orchestration/_concurrency_limits/client.py index 394a390effbc..21a4114aec5d 100644 --- a/src/prefect/client/orchestration/_concurrency_limits/client.py +++ b/src/prefect/client/orchestration/_concurrency_limits/client.py @@ -219,8 +219,8 @@ def decrement_v1_concurrency_slots( Decrement concurrency limit slots for the specified limits. Args: - names (List[str]): A list of limit names to decrement. - task_run_id (UUID): The task run ID that incremented the limits. + names: A list of limit names to decrement. + task_run_id: The task run ID that incremented the limits. occupancy_seconds (float): The duration in seconds that the limits were held. From e6ec6a9a49fd20c6c9b15a1bdf8b6724b8b7acbc Mon Sep 17 00:00:00 2001 From: Adam Azzam <33043305+aaazzam@users.noreply.github.com> Date: Wed, 1 Jan 2025 18:24:26 -0600 Subject: [PATCH 11/11] Update src/prefect/client/orchestration/_concurrency_limits/client.py Co-authored-by: nate nowack --- .../client/orchestration/_concurrency_limits/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/prefect/client/orchestration/_concurrency_limits/client.py b/src/prefect/client/orchestration/_concurrency_limits/client.py index 21a4114aec5d..5872fceae766 100644 --- a/src/prefect/client/orchestration/_concurrency_limits/client.py +++ b/src/prefect/client/orchestration/_concurrency_limits/client.py @@ -264,8 +264,8 @@ def release_concurrency_slots( Release concurrency slots for the specified limits. Args: - names (List[str]): A list of limit names for which to release slots. - slots (int): The number of concurrency slots to release. + names: A list of limit names for which to release slots. + slots: The number of concurrency slots to release. occupancy_seconds (float): The duration in seconds that the slots were occupied.