From aab2583665223257ab1667cdecf429eb7aad2d39 Mon Sep 17 00:00:00 2001 From: Adam Azzam <33043305+AAAZZAM@users.noreply.github.com> Date: Tue, 31 Dec 2024 17:35:04 -0600 Subject: [PATCH 1/4] init --- src/prefect/client/orchestration/__init__.py | 130 ++-------------- .../orchestration/_variables/__init__.py | 0 .../client/orchestration/_variables/client.py | 139 ++++++++++++++++++ 3 files changed, 155 insertions(+), 114 deletions(-) create mode 100644 src/prefect/client/orchestration/_variables/__init__.py create mode 100644 src/prefect/client/orchestration/_variables/client.py diff --git a/src/prefect/client/orchestration/__init__.py b/src/prefect/client/orchestration/__init__.py index 484944de1185..4976f5f5f47b 100644 --- a/src/prefect/client/orchestration/__init__.py +++ b/src/prefect/client/orchestration/__init__.py @@ -25,6 +25,10 @@ ArtifactCollectionClient, ArtifactCollectionAsyncClient, ) +from prefect.client.orchestration._variables.client import ( + VariableClient, + VariableAsyncClient, +) import prefect import prefect.exceptions @@ -54,8 +58,6 @@ LogCreate, TaskRunCreate, TaskRunUpdate, - VariableCreate, - VariableUpdate, WorkPoolCreate, WorkPoolUpdate, WorkQueueCreate, @@ -89,7 +91,6 @@ Parameter, TaskRunPolicy, TaskRunResult, - Variable, Worker, WorkerMetadata, WorkPool, @@ -152,15 +153,13 @@ def get_client( *, httpx_settings: Optional[dict[str, Any]] = ..., sync_client: Literal[False] = False, -) -> "PrefectClient": - ... +) -> "PrefectClient": ... @overload def get_client( *, httpx_settings: Optional[dict[str, Any]] = ..., sync_client: Literal[True] = ... -) -> "SyncPrefectClient": - ... +) -> "SyncPrefectClient": ... def get_client( @@ -244,7 +243,11 @@ def get_client( ) -class PrefectClient(ArtifactAsyncClient, ArtifactCollectionAsyncClient): +class PrefectClient( + ArtifactAsyncClient, + ArtifactCollectionAsyncClient, + VariableAsyncClient, +): """ An asynchronous client for interacting with the [Prefect REST API](/api-ref/rest-api/). @@ -2953,61 +2956,6 @@ async def get_scheduled_flow_runs_for_work_pool( response.json() ) - async def create_variable(self, variable: VariableCreate) -> Variable: - """ - Creates an variable with the provided configuration. - - Args: - variable: Desired configuration for the new variable. - Returns: - Information about the newly created variable. - """ - response = await self._client.post( - "/variables/", - json=variable.model_dump(mode="json", exclude_unset=True), - ) - return Variable(**response.json()) - - async def update_variable(self, variable: VariableUpdate) -> None: - """ - Updates a variable with the provided configuration. - - Args: - variable: Desired configuration for the updated variable. - Returns: - Information about the updated variable. - """ - await self._client.patch( - f"/variables/name/{variable.name}", - json=variable.model_dump(mode="json", exclude_unset=True), - ) - - async def read_variable_by_name(self, name: str) -> Optional[Variable]: - """Reads a variable by name. Returns None if no variable is found.""" - try: - response = await self._client.get(f"/variables/name/{name}") - return Variable(**response.json()) - except httpx.HTTPStatusError as e: - if e.response.status_code == status.HTTP_404_NOT_FOUND: - return None - else: - raise - - async def delete_variable_by_name(self, name: str) -> None: - """Deletes a variable by name.""" - try: - await self._client.delete(f"/variables/name/{name}") - except httpx.HTTPStatusError as e: - if e.response.status_code == 404: - raise prefect.exceptions.ObjectNotFound(http_exc=e) from e - else: - raise - - async def read_variables(self, limit: Optional[int] = None) -> list[Variable]: - """Reads all variables.""" - response = await self._client.post("/variables/filter", json={"limit": limit}) - return pydantic.TypeAdapter(list[Variable]).validate_python(response.json()) - async def read_worker_metadata(self) -> dict[str, Any]: """Reads worker metadata stored in Prefect collection registry.""" response = await self._client.get("collections/views/aggregate-worker-metadata") @@ -3436,7 +3384,11 @@ def __exit__(self, *_: object) -> NoReturn: assert False, "This should never be called but must be defined for __enter__" -class SyncPrefectClient(ArtifactClient, ArtifactCollectionClient): +class SyncPrefectClient( + ArtifactClient, + ArtifactCollectionClient, + VariableClient, +): """ A synchronous client for interacting with the [Prefect REST API](/api-ref/rest-api/). @@ -4316,53 +4268,3 @@ def read_block_document_by_name( else: raise return BlockDocument.model_validate(response.json()) - - def create_variable(self, variable: VariableCreate) -> Variable: - """ - Creates an variable with the provided configuration. - - Args: - variable: Desired configuration for the new variable. - Returns: - Information about the newly created variable. - """ - response = self._client.post( - "/variables/", - json=variable.model_dump(mode="json", exclude_unset=True), - ) - return Variable(**response.json()) - - def update_variable(self, variable: VariableUpdate) -> None: - """ - Updates a variable with the provided configuration. - - Args: - variable: Desired configuration for the updated variable. - Returns: - Information about the updated variable. - """ - self._client.patch( - f"/variables/name/{variable.name}", - json=variable.model_dump(mode="json", exclude_unset=True), - ) - - def read_variable_by_name(self, name: str) -> Optional[Variable]: - """Reads a variable by name. Returns None if no variable is found.""" - try: - response = self._client.get(f"/variables/name/{name}") - return Variable(**response.json()) - except httpx.HTTPStatusError as e: - if e.response.status_code == status.HTTP_404_NOT_FOUND: - return None - else: - raise - - def delete_variable_by_name(self, name: str) -> None: - """Deletes a variable by name.""" - try: - self._client.delete(f"/variables/name/{name}") - except httpx.HTTPStatusError as e: - if e.response.status_code == 404: - raise prefect.exceptions.ObjectNotFound(http_exc=e) from e - else: - raise diff --git a/src/prefect/client/orchestration/_variables/__init__.py b/src/prefect/client/orchestration/_variables/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/src/prefect/client/orchestration/_variables/client.py b/src/prefect/client/orchestration/_variables/client.py new file mode 100644 index 000000000000..ac9d84136ee2 --- /dev/null +++ b/src/prefect/client/orchestration/_variables/client.py @@ -0,0 +1,139 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +import httpx + +from prefect.client.orchestration.base import BaseAsyncClient, BaseClient +from prefect.exceptions import ObjectNotFound + +if TYPE_CHECKING: + from prefect.client.schemas.actions import ( + VariableCreate, + VariableUpdate, + ) + from prefect.client.schemas.objects import ( + Variable, + ) + + +class VariableClient(BaseClient): + def create_variable(self, variable: "VariableCreate") -> "Variable": + """ + Creates an variable with the provided configuration. + + Args: + variable: Desired configuration for the new variable. + Returns: + Information about the newly created variable. + """ + response = self._client.post( + "/variables/", + json=variable.model_dump(mode="json", exclude_unset=True), + ) + from prefect.client.schemas.objects import Variable + + return Variable.model_validate(response.json()) + + def read_variable_by_name(self, name: str) -> "Variable | None": + """Reads a variable by name. Returns None if no variable is found.""" + try: + response = self._client.get(f"/variables/name/{name}") + from prefect.client.schemas.objects import Variable + + return Variable(**response.json()) + except httpx.HTTPStatusError as e: + if e.response.status_code == status.HTTP_404_NOT_FOUND: + return None + else: + raise + + def read_variables(self, limit: int | None = None) -> list["Variable"]: + """Reads all variables.""" + response = self._client.post("/variables/filter", json={"limit": limit}) + from prefect.client.schemas.objects import Variable + + return Variable.model_validate_list(response.json()) + + def update_variable(self, variable: "VariableUpdate") -> None: + """ + Updates a variable with the provided configuration. + + Args: + variable: Desired configuration for the updated variable. + Returns: + Information about the updated variable. + """ + self._client.patch( + f"/variables/name/{variable.name}", + json=variable.model_dump(mode="json", exclude_unset=True), + ) + return None + + def delete_variable_by_name(self, name: str) -> None: + """Deletes a variable by name.""" + try: + self._client.delete(f"/variables/name/{name}") + return None + except httpx.HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + +class VariableAsyncClient(BaseAsyncClient): + async def create_variable(self, variable: "VariableCreate") -> "Variable": + """Creates a variable with the provided configuration.""" + response = await self._client.post( + "/variables/", + json=variable.model_dump(mode="json", exclude_unset=True), + ) + from prefect.client.schemas.objects import Variable + + return Variable.model_validate(response.json()) + + async def read_variable_by_name(self, name: str) -> "Variable | None": + """Reads a variable by name. Returns None if no variable is found.""" + try: + response = await self._client.get(f"/variables/name/{name}") + from prefect.client.schemas.objects import Variable + + return Variable.model_validate(response.json()) + except httpx.HTTPStatusError as e: + if e.response.status_code == 404: + return None + else: + raise + + async def read_variables(self, limit: int | None = None) -> list["Variable"]: + """Reads all variables.""" + response = await self._client.post("/variables/filter", json={"limit": limit}) + from prefect.client.schemas.objects import Variable + + return Variable.model_validate_list(response.json()) + + async def update_variable(self, variable: "VariableUpdate") -> None: + """ + Updates a variable with the provided configuration. + + Args: + variable: Desired configuration for the updated variable. + Returns: + Information about the updated variable. + """ + await self._client.patch( + f"/variables/name/{variable.name}", + json=variable.model_dump(mode="json", exclude_unset=True), + ) + return None + + async def delete_variable_by_name(self, name: str) -> None: + """Deletes a variable by name.""" + try: + await self._client.delete(f"/variables/name/{name}") + except httpx.HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise From 9bb3e7fa2409d416398f986eeae20bc549b83912 Mon Sep 17 00:00:00 2001 From: Adam Azzam <33043305+AAAZZAM@users.noreply.github.com> Date: Tue, 31 Dec 2024 17:58:12 -0600 Subject: [PATCH 2/4] Update client.py --- src/prefect/client/orchestration/_variables/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/prefect/client/orchestration/_variables/client.py b/src/prefect/client/orchestration/_variables/client.py index ac9d84136ee2..dcdfed45a581 100644 --- a/src/prefect/client/orchestration/_variables/client.py +++ b/src/prefect/client/orchestration/_variables/client.py @@ -43,7 +43,7 @@ def read_variable_by_name(self, name: str) -> "Variable | None": return Variable(**response.json()) except httpx.HTTPStatusError as e: - if e.response.status_code == status.HTTP_404_NOT_FOUND: + if e.response.status_code == 404: return None else: raise From decba838838ea619c381c8706d466916551e827d Mon Sep 17 00:00:00 2001 From: Adam Azzam <33043305+AAAZZAM@users.noreply.github.com> Date: Tue, 31 Dec 2024 18:04:25 -0600 Subject: [PATCH 3/4] linit --- src/prefect/client/orchestration/__init__.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/prefect/client/orchestration/__init__.py b/src/prefect/client/orchestration/__init__.py index 4976f5f5f47b..e288f2574bbd 100644 --- a/src/prefect/client/orchestration/__init__.py +++ b/src/prefect/client/orchestration/__init__.py @@ -153,13 +153,15 @@ def get_client( *, httpx_settings: Optional[dict[str, Any]] = ..., sync_client: Literal[False] = False, -) -> "PrefectClient": ... +) -> "PrefectClient": + ... @overload def get_client( *, httpx_settings: Optional[dict[str, Any]] = ..., sync_client: Literal[True] = ... -) -> "SyncPrefectClient": ... +) -> "SyncPrefectClient": + ... def get_client( From e5e49448096532acff89eebcdcebf3fb32bcdbdc Mon Sep 17 00:00:00 2001 From: Adam Azzam <33043305+AAAZZAM@users.noreply.github.com> Date: Wed, 1 Jan 2025 14:56:56 -0600 Subject: [PATCH 4/4] actually use the things we build --- .../client/orchestration/_variables/client.py | 34 ++++++++++++++----- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/src/prefect/client/orchestration/_variables/client.py b/src/prefect/client/orchestration/_variables/client.py index dcdfed45a581..7381d29fa33f 100644 --- a/src/prefect/client/orchestration/_variables/client.py +++ b/src/prefect/client/orchestration/_variables/client.py @@ -38,7 +38,9 @@ def create_variable(self, variable: "VariableCreate") -> "Variable": def read_variable_by_name(self, name: str) -> "Variable | None": """Reads a variable by name. Returns None if no variable is found.""" try: - response = self._client.get(f"/variables/name/{name}") + response = self.request( + "GET", "/variables/name/{name}", path_params={"name": name} + ) from prefect.client.schemas.objects import Variable return Variable(**response.json()) @@ -50,7 +52,7 @@ def read_variable_by_name(self, name: str) -> "Variable | None": def read_variables(self, limit: int | None = None) -> list["Variable"]: """Reads all variables.""" - response = self._client.post("/variables/filter", json={"limit": limit}) + response = self.request("POST", "/variables/filter", json={"limit": limit}) from prefect.client.schemas.objects import Variable return Variable.model_validate_list(response.json()) @@ -73,7 +75,11 @@ def update_variable(self, variable: "VariableUpdate") -> None: def delete_variable_by_name(self, name: str) -> None: """Deletes a variable by name.""" try: - self._client.delete(f"/variables/name/{name}") + self.request( + "DELETE", + "/variables/name/{name}", + path_params={"name": name}, + ) return None except httpx.HTTPStatusError as e: if e.response.status_code == 404: @@ -96,7 +102,11 @@ async def create_variable(self, variable: "VariableCreate") -> "Variable": async def read_variable_by_name(self, name: str) -> "Variable | None": """Reads a variable by name. Returns None if no variable is found.""" try: - response = await self._client.get(f"/variables/name/{name}") + response = await self.request( + "GET", + "/variables/name/{name}", + path_params={"name": name}, + ) from prefect.client.schemas.objects import Variable return Variable.model_validate(response.json()) @@ -108,7 +118,9 @@ async def read_variable_by_name(self, name: str) -> "Variable | None": async def read_variables(self, limit: int | None = None) -> list["Variable"]: """Reads all variables.""" - response = await self._client.post("/variables/filter", json={"limit": limit}) + response = await self.request( + "POST", "/variables/filter", json={"limit": limit} + ) from prefect.client.schemas.objects import Variable return Variable.model_validate_list(response.json()) @@ -122,8 +134,10 @@ async def update_variable(self, variable: "VariableUpdate") -> None: Returns: Information about the updated variable. """ - await self._client.patch( - f"/variables/name/{variable.name}", + await self.request( + "PATCH", + "/variables/name/{name}", + path_params={"name": variable.name}, json=variable.model_dump(mode="json", exclude_unset=True), ) return None @@ -131,7 +145,11 @@ async def update_variable(self, variable: "VariableUpdate") -> None: async def delete_variable_by_name(self, name: str) -> None: """Deletes a variable by name.""" try: - await self._client.delete(f"/variables/name/{name}") + await self.request( + "DELETE", + "/variables/name/{name}", + path_params={"name": name}, + ) except httpx.HTTPStatusError as e: if e.response.status_code == 404: raise ObjectNotFound(http_exc=e) from e