Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Variable CRUD methods in client #16564

Merged
merged 4 commits into from
Jan 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 14 additions & 110 deletions src/prefect/client/orchestration/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
ArtifactCollectionClient,
ArtifactCollectionAsyncClient,
)
from prefect.client.orchestration._variables.client import (
VariableClient,
VariableAsyncClient,
)

import prefect
import prefect.exceptions
Expand Down Expand Up @@ -54,8 +58,6 @@
LogCreate,
TaskRunCreate,
TaskRunUpdate,
VariableCreate,
VariableUpdate,
WorkPoolCreate,
WorkPoolUpdate,
WorkQueueCreate,
Expand Down Expand Up @@ -89,7 +91,6 @@
Parameter,
TaskRunPolicy,
TaskRunResult,
Variable,
Worker,
WorkerMetadata,
WorkPool,
Expand Down Expand Up @@ -244,7 +245,11 @@ def get_client(
)


class PrefectClient(ArtifactAsyncClient, ArtifactCollectionAsyncClient):
class PrefectClient(
ArtifactAsyncClient,
ArtifactCollectionAsyncClient,
VariableAsyncClient,
):
"""
An asynchronous client for interacting with the [Prefect REST API](/api-ref/rest-api/).

Expand Down Expand Up @@ -2953,61 +2958,6 @@ async def get_scheduled_flow_runs_for_work_pool(
response.json()
)

async def create_variable(self, variable: VariableCreate) -> Variable:
"""
Creates an variable with the provided configuration.

Args:
variable: Desired configuration for the new variable.
Returns:
Information about the newly created variable.
"""
response = await self._client.post(
"/variables/",
json=variable.model_dump(mode="json", exclude_unset=True),
)
return Variable(**response.json())

async def update_variable(self, variable: VariableUpdate) -> None:
"""
Updates a variable with the provided configuration.

Args:
variable: Desired configuration for the updated variable.
Returns:
Information about the updated variable.
"""
await self._client.patch(
f"/variables/name/{variable.name}",
json=variable.model_dump(mode="json", exclude_unset=True),
)

async def read_variable_by_name(self, name: str) -> Optional[Variable]:
"""Reads a variable by name. Returns None if no variable is found."""
try:
response = await self._client.get(f"/variables/name/{name}")
return Variable(**response.json())
except httpx.HTTPStatusError as e:
if e.response.status_code == status.HTTP_404_NOT_FOUND:
return None
else:
raise

async def delete_variable_by_name(self, name: str) -> None:
"""Deletes a variable by name."""
try:
await self._client.delete(f"/variables/name/{name}")
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
else:
raise

async def read_variables(self, limit: Optional[int] = None) -> list[Variable]:
"""Reads all variables."""
response = await self._client.post("/variables/filter", json={"limit": limit})
return pydantic.TypeAdapter(list[Variable]).validate_python(response.json())

async def read_worker_metadata(self) -> dict[str, Any]:
"""Reads worker metadata stored in Prefect collection registry."""
response = await self._client.get("collections/views/aggregate-worker-metadata")
Expand Down Expand Up @@ -3436,7 +3386,11 @@ def __exit__(self, *_: object) -> NoReturn:
assert False, "This should never be called but must be defined for __enter__"


class SyncPrefectClient(ArtifactClient, ArtifactCollectionClient):
class SyncPrefectClient(
ArtifactClient,
ArtifactCollectionClient,
VariableClient,
):
"""
A synchronous client for interacting with the [Prefect REST API](/api-ref/rest-api/).

Expand Down Expand Up @@ -4316,53 +4270,3 @@ def read_block_document_by_name(
else:
raise
return BlockDocument.model_validate(response.json())

def create_variable(self, variable: VariableCreate) -> Variable:
"""
Creates an variable with the provided configuration.

Args:
variable: Desired configuration for the new variable.
Returns:
Information about the newly created variable.
"""
response = self._client.post(
"/variables/",
json=variable.model_dump(mode="json", exclude_unset=True),
)
return Variable(**response.json())

def update_variable(self, variable: VariableUpdate) -> None:
"""
Updates a variable with the provided configuration.

Args:
variable: Desired configuration for the updated variable.
Returns:
Information about the updated variable.
"""
self._client.patch(
f"/variables/name/{variable.name}",
json=variable.model_dump(mode="json", exclude_unset=True),
)

def read_variable_by_name(self, name: str) -> Optional[Variable]:
"""Reads a variable by name. Returns None if no variable is found."""
try:
response = self._client.get(f"/variables/name/{name}")
return Variable(**response.json())
except httpx.HTTPStatusError as e:
if e.response.status_code == status.HTTP_404_NOT_FOUND:
return None
else:
raise

def delete_variable_by_name(self, name: str) -> None:
"""Deletes a variable by name."""
try:
self._client.delete(f"/variables/name/{name}")
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
else:
raise
Empty file.
157 changes: 157 additions & 0 deletions src/prefect/client/orchestration/_variables/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
from __future__ import annotations

from typing import TYPE_CHECKING

import httpx

from prefect.client.orchestration.base import BaseAsyncClient, BaseClient
from prefect.exceptions import ObjectNotFound

if TYPE_CHECKING:
from prefect.client.schemas.actions import (
VariableCreate,
VariableUpdate,
)
from prefect.client.schemas.objects import (
Variable,
)


class VariableClient(BaseClient):
def create_variable(self, variable: "VariableCreate") -> "Variable":
"""
Creates an variable with the provided configuration.

Args:
variable: Desired configuration for the new variable.
Returns:
Information about the newly created variable.
"""
response = self._client.post(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to use self.request for type safety here and in the other methods?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

woof, yep!

"/variables/",
json=variable.model_dump(mode="json", exclude_unset=True),
)
from prefect.client.schemas.objects import Variable

return Variable.model_validate(response.json())

def read_variable_by_name(self, name: str) -> "Variable | None":
"""Reads a variable by name. Returns None if no variable is found."""
try:
response = self.request(
"GET", "/variables/name/{name}", path_params={"name": name}
)
from prefect.client.schemas.objects import Variable

return Variable(**response.json())
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
return None
else:
raise

def read_variables(self, limit: int | None = None) -> list["Variable"]:
"""Reads all variables."""
response = self.request("POST", "/variables/filter", json={"limit": limit})
from prefect.client.schemas.objects import Variable

return Variable.model_validate_list(response.json())

def update_variable(self, variable: "VariableUpdate") -> None:
"""
Updates a variable with the provided configuration.

Args:
variable: Desired configuration for the updated variable.
Returns:
Information about the updated variable.
"""
self._client.patch(
f"/variables/name/{variable.name}",
json=variable.model_dump(mode="json", exclude_unset=True),
)
return None

def delete_variable_by_name(self, name: str) -> None:
"""Deletes a variable by name."""
try:
self.request(
"DELETE",
"/variables/name/{name}",
path_params={"name": name},
)
return None
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
raise ObjectNotFound(http_exc=e) from e
else:
raise


class VariableAsyncClient(BaseAsyncClient):
async def create_variable(self, variable: "VariableCreate") -> "Variable":
"""Creates a variable with the provided configuration."""
response = await self._client.post(
"/variables/",
json=variable.model_dump(mode="json", exclude_unset=True),
)
from prefect.client.schemas.objects import Variable

return Variable.model_validate(response.json())

async def read_variable_by_name(self, name: str) -> "Variable | None":
"""Reads a variable by name. Returns None if no variable is found."""
try:
response = await self.request(
"GET",
"/variables/name/{name}",
path_params={"name": name},
)
from prefect.client.schemas.objects import Variable

return Variable.model_validate(response.json())
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
return None
else:
raise

async def read_variables(self, limit: int | None = None) -> list["Variable"]:
"""Reads all variables."""
response = await self.request(
"POST", "/variables/filter", json={"limit": limit}
)
from prefect.client.schemas.objects import Variable

return Variable.model_validate_list(response.json())

async def update_variable(self, variable: "VariableUpdate") -> None:
"""
Updates a variable with the provided configuration.

Args:
variable: Desired configuration for the updated variable.
Returns:
Information about the updated variable.
"""
await self.request(
"PATCH",
"/variables/name/{name}",
path_params={"name": variable.name},
json=variable.model_dump(mode="json", exclude_unset=True),
)
return None

async def delete_variable_by_name(self, name: str) -> None:
"""Deletes a variable by name."""
try:
await self.request(
"DELETE",
"/variables/name/{name}",
path_params={"name": name},
)
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
raise ObjectNotFound(http_exc=e) from e
else:
raise
Loading