Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Client's Artifact CRUD methods into Mixin #16496

Merged
merged 20 commits into from
Dec 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/prefect/_internal/schemas/bases.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from typing_extensions import Self

from prefect.types import DateTime
from prefect.utilities.generics import validate_list

T = TypeVar("T")

Expand Down Expand Up @@ -56,6 +57,17 @@ def __eq__(self, other: Any) -> bool:
else:
return copy_dict == other

@classmethod
def model_validate_list(
cls,
obj: Any,
*,
strict: Optional[bool] = None,
from_attributes: Optional[bool] = None,
context: Optional[Any] = None,
) -> list[Self]:
return validate_list(cls, obj)

def __rich_repr__(self) -> RichReprResult:
# Display all of the fields in the model if they differ from the default value
for name, field in self.model_fields.items():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,20 @@
from starlette import status
from typing_extensions import ParamSpec, Self, TypeVar

from prefect.client.orchestration._artifacts.client import (
ArtifactClient,
ArtifactAsyncClient,
ArtifactCollectionClient,
ArtifactCollectionAsyncClient,
)

import prefect
import prefect.exceptions
import prefect.settings
import prefect.states
from prefect.client.constants import SERVER_API_VERSION
from prefect.client.schemas import FlowRun, OrchestrationResult, TaskRun, sorting
from prefect.client.schemas.actions import (
ArtifactCreate,
ArtifactUpdate,
BlockDocumentCreate,
BlockDocumentUpdate,
BlockSchemaCreate,
Expand Down Expand Up @@ -57,8 +62,6 @@
WorkQueueUpdate,
)
from prefect.client.schemas.filters import (
ArtifactCollectionFilter,
ArtifactFilter,
DeploymentFilter,
FlowFilter,
FlowRunFilter,
Expand All @@ -71,8 +74,6 @@
WorkQueueFilterName,
)
from prefect.client.schemas.objects import (
Artifact,
ArtifactCollection,
BlockDocument,
BlockSchema,
BlockType,
Expand Down Expand Up @@ -103,8 +104,6 @@
)
from prefect.client.schemas.schedules import SCHEDULE_TYPES
from prefect.client.schemas.sorting import (
ArtifactCollectionSort,
ArtifactSort,
DeploymentSort,
FlowRunSort,
FlowSort,
Expand Down Expand Up @@ -245,7 +244,7 @@ def get_client(
)


class PrefectClient:
class PrefectClient(ArtifactAsyncClient, ArtifactCollectionAsyncClient):
"""
An asynchronous client for interacting with the [Prefect REST API](/api-ref/rest-api/).

Expand Down Expand Up @@ -2952,142 +2951,6 @@ async def get_scheduled_flow_runs_for_work_pool(
response.json()
)

async def create_artifact(
self,
artifact: ArtifactCreate,
) -> Artifact:
"""
Creates an artifact with the provided configuration.

Args:
artifact: Desired configuration for the new artifact.
Returns:
Information about the newly created artifact.
"""

response = await self._client.post(
"/artifacts/",
json=artifact.model_dump(mode="json", exclude_unset=True),
)

return Artifact.model_validate(response.json())

async def update_artifact(
self,
artifact_id: UUID,
artifact: ArtifactUpdate,
) -> None:
"""
Updates an artifact

Args:
artifact: Desired values for the updated artifact.
Returns:
Information about the updated artifact.
"""

await self._client.patch(
f"/artifacts/{artifact_id}",
json=artifact.model_dump(mode="json", exclude_unset=True),
)

async def read_artifacts(
self,
*,
artifact_filter: Optional[ArtifactFilter] = None,
flow_run_filter: Optional[FlowRunFilter] = None,
task_run_filter: Optional[TaskRunFilter] = None,
sort: Optional[ArtifactSort] = None,
limit: Optional[int] = None,
offset: int = 0,
) -> list[Artifact]:
"""
Query the Prefect API for artifacts. Only artifacts matching all criteria will
be returned.
Args:
artifact_filter: filter criteria for artifacts
flow_run_filter: filter criteria for flow runs
task_run_filter: filter criteria for task runs
sort: sort criteria for the artifacts
limit: limit for the artifact query
offset: offset for the artifact query
Returns:
a list of Artifact model representations of the artifacts
"""
body: dict[str, Any] = {
"artifacts": (
artifact_filter.model_dump(mode="json") if artifact_filter else None
),
"flow_runs": (
flow_run_filter.model_dump(mode="json") if flow_run_filter else None
),
"task_runs": (
task_run_filter.model_dump(mode="json") if task_run_filter else None
),
"sort": sort,
"limit": limit,
"offset": offset,
}
response = await self._client.post("/artifacts/filter", json=body)
return pydantic.TypeAdapter(list[Artifact]).validate_python(response.json())

async def read_latest_artifacts(
self,
*,
artifact_filter: Optional[ArtifactCollectionFilter] = None,
flow_run_filter: Optional[FlowRunFilter] = None,
task_run_filter: Optional[TaskRunFilter] = None,
sort: Optional[ArtifactCollectionSort] = None,
limit: Optional[int] = None,
offset: int = 0,
) -> list[ArtifactCollection]:
"""
Query the Prefect API for artifacts. Only artifacts matching all criteria will
be returned.
Args:
artifact_filter: filter criteria for artifacts
flow_run_filter: filter criteria for flow runs
task_run_filter: filter criteria for task runs
sort: sort criteria for the artifacts
limit: limit for the artifact query
offset: offset for the artifact query
Returns:
a list of Artifact model representations of the artifacts
"""
body: dict[str, Any] = {
"artifacts": (
artifact_filter.model_dump(mode="json") if artifact_filter else None
),
"flow_runs": (
flow_run_filter.model_dump(mode="json") if flow_run_filter else None
),
"task_runs": (
task_run_filter.model_dump(mode="json") if task_run_filter else None
),
"sort": sort,
"limit": limit,
"offset": offset,
}
response = await self._client.post("/artifacts/latest/filter", json=body)
return pydantic.TypeAdapter(list[ArtifactCollection]).validate_python(
response.json()
)

async def delete_artifact(self, artifact_id: UUID) -> None:
"""
Deletes an artifact with the provided id.

Args:
artifact_id: The id of the artifact to delete.
"""
try:
await self._client.delete(f"/artifacts/{artifact_id}")
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
else:
raise

async def create_variable(self, variable: VariableCreate) -> Variable:
"""
Creates an variable with the provided configuration.
Expand Down Expand Up @@ -3571,7 +3434,7 @@ def __exit__(self, *_: object) -> NoReturn:
assert False, "This should never be called but must be defined for __enter__"


class SyncPrefectClient:
class SyncPrefectClient(ArtifactClient, ArtifactCollectionClient):
"""
A synchronous client for interacting with the [Prefect REST API](/api-ref/rest-api/).

Expand Down Expand Up @@ -4354,26 +4217,6 @@ def read_deployment_by_name(

return DeploymentResponse.model_validate(response.json())

def create_artifact(
self,
artifact: ArtifactCreate,
) -> Artifact:
"""
Creates an artifact with the provided configuration.

Args:
artifact: Desired configuration for the new artifact.
Returns:
Information about the newly created artifact.
"""

response = self._client.post(
"/artifacts/",
json=artifact.model_dump(mode="json", exclude_unset=True),
)

return Artifact.model_validate(response.json())

def release_concurrency_slots(
self, names: list[str], slots: int, occupancy_seconds: float
) -> httpx.Response:
Expand Down
Empty file.
Loading
Loading