diff --git a/src/prefect/client/orchestration/__init__.py b/src/prefect/client/orchestration/__init__.py index 484944de1185..e288f2574bbd 100644 --- a/src/prefect/client/orchestration/__init__.py +++ b/src/prefect/client/orchestration/__init__.py @@ -25,6 +25,10 @@ ArtifactCollectionClient, ArtifactCollectionAsyncClient, ) +from prefect.client.orchestration._variables.client import ( + VariableClient, + VariableAsyncClient, +) import prefect import prefect.exceptions @@ -54,8 +58,6 @@ LogCreate, TaskRunCreate, TaskRunUpdate, - VariableCreate, - VariableUpdate, WorkPoolCreate, WorkPoolUpdate, WorkQueueCreate, @@ -89,7 +91,6 @@ Parameter, TaskRunPolicy, TaskRunResult, - Variable, Worker, WorkerMetadata, WorkPool, @@ -244,7 +245,11 @@ def get_client( ) -class PrefectClient(ArtifactAsyncClient, ArtifactCollectionAsyncClient): +class PrefectClient( + ArtifactAsyncClient, + ArtifactCollectionAsyncClient, + VariableAsyncClient, +): """ An asynchronous client for interacting with the [Prefect REST API](/api-ref/rest-api/). @@ -2953,61 +2958,6 @@ async def get_scheduled_flow_runs_for_work_pool( response.json() ) - async def create_variable(self, variable: VariableCreate) -> Variable: - """ - Creates an variable with the provided configuration. - - Args: - variable: Desired configuration for the new variable. - Returns: - Information about the newly created variable. - """ - response = await self._client.post( - "/variables/", - json=variable.model_dump(mode="json", exclude_unset=True), - ) - return Variable(**response.json()) - - async def update_variable(self, variable: VariableUpdate) -> None: - """ - Updates a variable with the provided configuration. - - Args: - variable: Desired configuration for the updated variable. - Returns: - Information about the updated variable. - """ - await self._client.patch( - f"/variables/name/{variable.name}", - json=variable.model_dump(mode="json", exclude_unset=True), - ) - - async def read_variable_by_name(self, name: str) -> Optional[Variable]: - """Reads a variable by name. Returns None if no variable is found.""" - try: - response = await self._client.get(f"/variables/name/{name}") - return Variable(**response.json()) - except httpx.HTTPStatusError as e: - if e.response.status_code == status.HTTP_404_NOT_FOUND: - return None - else: - raise - - async def delete_variable_by_name(self, name: str) -> None: - """Deletes a variable by name.""" - try: - await self._client.delete(f"/variables/name/{name}") - except httpx.HTTPStatusError as e: - if e.response.status_code == 404: - raise prefect.exceptions.ObjectNotFound(http_exc=e) from e - else: - raise - - async def read_variables(self, limit: Optional[int] = None) -> list[Variable]: - """Reads all variables.""" - response = await self._client.post("/variables/filter", json={"limit": limit}) - return pydantic.TypeAdapter(list[Variable]).validate_python(response.json()) - async def read_worker_metadata(self) -> dict[str, Any]: """Reads worker metadata stored in Prefect collection registry.""" response = await self._client.get("collections/views/aggregate-worker-metadata") @@ -3436,7 +3386,11 @@ def __exit__(self, *_: object) -> NoReturn: assert False, "This should never be called but must be defined for __enter__" -class SyncPrefectClient(ArtifactClient, ArtifactCollectionClient): +class SyncPrefectClient( + ArtifactClient, + ArtifactCollectionClient, + VariableClient, +): """ A synchronous client for interacting with the [Prefect REST API](/api-ref/rest-api/). @@ -4316,53 +4270,3 @@ def read_block_document_by_name( else: raise return BlockDocument.model_validate(response.json()) - - def create_variable(self, variable: VariableCreate) -> Variable: - """ - Creates an variable with the provided configuration. - - Args: - variable: Desired configuration for the new variable. - Returns: - Information about the newly created variable. - """ - response = self._client.post( - "/variables/", - json=variable.model_dump(mode="json", exclude_unset=True), - ) - return Variable(**response.json()) - - def update_variable(self, variable: VariableUpdate) -> None: - """ - Updates a variable with the provided configuration. - - Args: - variable: Desired configuration for the updated variable. - Returns: - Information about the updated variable. - """ - self._client.patch( - f"/variables/name/{variable.name}", - json=variable.model_dump(mode="json", exclude_unset=True), - ) - - def read_variable_by_name(self, name: str) -> Optional[Variable]: - """Reads a variable by name. Returns None if no variable is found.""" - try: - response = self._client.get(f"/variables/name/{name}") - return Variable(**response.json()) - except httpx.HTTPStatusError as e: - if e.response.status_code == status.HTTP_404_NOT_FOUND: - return None - else: - raise - - def delete_variable_by_name(self, name: str) -> None: - """Deletes a variable by name.""" - try: - self._client.delete(f"/variables/name/{name}") - except httpx.HTTPStatusError as e: - if e.response.status_code == 404: - raise prefect.exceptions.ObjectNotFound(http_exc=e) from e - else: - raise diff --git a/src/prefect/client/orchestration/_variables/__init__.py b/src/prefect/client/orchestration/_variables/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/src/prefect/client/orchestration/_variables/client.py b/src/prefect/client/orchestration/_variables/client.py new file mode 100644 index 000000000000..7381d29fa33f --- /dev/null +++ b/src/prefect/client/orchestration/_variables/client.py @@ -0,0 +1,157 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +import httpx + +from prefect.client.orchestration.base import BaseAsyncClient, BaseClient +from prefect.exceptions import ObjectNotFound + +if TYPE_CHECKING: + from prefect.client.schemas.actions import ( + VariableCreate, + VariableUpdate, + ) + from prefect.client.schemas.objects import ( + Variable, + ) + + +class VariableClient(BaseClient): + def create_variable(self, variable: "VariableCreate") -> "Variable": + """ + Creates an variable with the provided configuration. + + Args: + variable: Desired configuration for the new variable. + Returns: + Information about the newly created variable. + """ + response = self._client.post( + "/variables/", + json=variable.model_dump(mode="json", exclude_unset=True), + ) + from prefect.client.schemas.objects import Variable + + return Variable.model_validate(response.json()) + + def read_variable_by_name(self, name: str) -> "Variable | None": + """Reads a variable by name. Returns None if no variable is found.""" + try: + response = self.request( + "GET", "/variables/name/{name}", path_params={"name": name} + ) + from prefect.client.schemas.objects import Variable + + return Variable(**response.json()) + except httpx.HTTPStatusError as e: + if e.response.status_code == 404: + return None + else: + raise + + def read_variables(self, limit: int | None = None) -> list["Variable"]: + """Reads all variables.""" + response = self.request("POST", "/variables/filter", json={"limit": limit}) + from prefect.client.schemas.objects import Variable + + return Variable.model_validate_list(response.json()) + + def update_variable(self, variable: "VariableUpdate") -> None: + """ + Updates a variable with the provided configuration. + + Args: + variable: Desired configuration for the updated variable. + Returns: + Information about the updated variable. + """ + self._client.patch( + f"/variables/name/{variable.name}", + json=variable.model_dump(mode="json", exclude_unset=True), + ) + return None + + def delete_variable_by_name(self, name: str) -> None: + """Deletes a variable by name.""" + try: + self.request( + "DELETE", + "/variables/name/{name}", + path_params={"name": name}, + ) + return None + except httpx.HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + +class VariableAsyncClient(BaseAsyncClient): + async def create_variable(self, variable: "VariableCreate") -> "Variable": + """Creates a variable with the provided configuration.""" + response = await self._client.post( + "/variables/", + json=variable.model_dump(mode="json", exclude_unset=True), + ) + from prefect.client.schemas.objects import Variable + + return Variable.model_validate(response.json()) + + async def read_variable_by_name(self, name: str) -> "Variable | None": + """Reads a variable by name. Returns None if no variable is found.""" + try: + response = await self.request( + "GET", + "/variables/name/{name}", + path_params={"name": name}, + ) + from prefect.client.schemas.objects import Variable + + return Variable.model_validate(response.json()) + except httpx.HTTPStatusError as e: + if e.response.status_code == 404: + return None + else: + raise + + async def read_variables(self, limit: int | None = None) -> list["Variable"]: + """Reads all variables.""" + response = await self.request( + "POST", "/variables/filter", json={"limit": limit} + ) + from prefect.client.schemas.objects import Variable + + return Variable.model_validate_list(response.json()) + + async def update_variable(self, variable: "VariableUpdate") -> None: + """ + Updates a variable with the provided configuration. + + Args: + variable: Desired configuration for the updated variable. + Returns: + Information about the updated variable. + """ + await self.request( + "PATCH", + "/variables/name/{name}", + path_params={"name": variable.name}, + json=variable.model_dump(mode="json", exclude_unset=True), + ) + return None + + async def delete_variable_by_name(self, name: str) -> None: + """Deletes a variable by name.""" + try: + await self.request( + "DELETE", + "/variables/name/{name}", + path_params={"name": name}, + ) + except httpx.HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise