Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add job task service #11

Merged
merged 1 commit into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions src/spaceone/inventory_v2/interface/grpc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@
from .region import Region
from .collector import Collector
from .job import Job

# from .job_task import JobTask
from .job_task import JobTask

_all_ = ["app"]

app = GRPCServer()
app.add_service(Region)
app.add_service(Collector)
app.add_service(Job)
# app.add_service(JobTask)
app.add_service(JobTask)
39 changes: 39 additions & 0 deletions src/spaceone/inventory_v2/interface/grpc/job_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from spaceone.api.inventory_v2.v1 import job_task_pb2, job_task_pb2_grpc
from spaceone.core.pygrpc import BaseAPI

from spaceone.inventory_v2.service.job_task_service import JobTaskService


class JobTask(BaseAPI, job_task_pb2_grpc.JobTaskServicer):
pb2 = job_task_pb2
pb2_grpc = job_task_pb2_grpc

def delete(self, request, context):
params, metadata = self.parse_request(request, context)
job_task_svc = JobTaskService(metadata)
job_task_svc.delete(params)
return self.empty()

def get(self, request, context):
params, metadata = self.parse_request(request, context)
job_task_svc = JobTaskService(metadata)
response = job_task_svc.get(params)
return self.dict_to_message(response)

def get_detail(self, request, context):
params, metadata = self.parse_request(request, context)
job_task_svc = JobTaskService(metadata)
response = job_task_svc.get_detail(params)
return self.dict_to_message(response)

def list(self, request, context):
params, metadata = self.parse_request(request, context)
job_task_svc = JobTaskService(metadata)
response = job_task_svc.list(params)
return self.dict_to_message(response)

def stat(self, request, context):
params, metadata = self.parse_request(request, context)
job_task_svc = JobTaskService(metadata)
response = job_task_svc.stat(params)
return self.dict_to_message(response)
60 changes: 60 additions & 0 deletions src/spaceone/inventory_v2/manager/job_task_detail_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import logging
from typing import Tuple

from spaceone.core.manager import BaseManager
from spaceone.core.model.mongo_model import QuerySet

from spaceone.inventory_v2.model.job_task.database import JobTask, JobTaskDetail

_LOGGER = logging.getLogger(__name__)


class JobTaskDetailManager(BaseManager):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.job_task_detail_model = JobTaskDetail
self.job_task_model = JobTask

def create_job_task_detail_by_task_vo(self, job_task_vo: JobTask) -> JobTaskDetail:
def _rollback(vo: JobTaskDetail):
_LOGGER.info(f"[ROLLBACK] Delete job task detail: {vo.job_task_id}")
vo.delete()

params = {
"job_task_id": job_task_vo.job_task_id,
"job_id": job_task_vo.job_id,
}
job_task_detail_vo: JobTaskDetail = self.job_task_detail_model.create(params)

self.transaction.add_rollback(_rollback, job_task_vo)

return job_task_detail_vo

def get_job_task_detail(
self,
job_task_id: str,
domain_id: str,
workspace_id: str = None,
user_projects: list = None,
) -> JobTaskDetail:
conditions = {
"job_task_id": job_task_id,
"domain_id": domain_id,
}

if workspace_id:
conditions["workspace_id"] = workspace_id

if user_projects:
conditions["project_id"] = user_projects

return self.job_task_detail_model.get(**conditions)

def filter_job_task_details(self, **conditions) -> QuerySet:
return self.job_task_detail_model.filter(**conditions)

def list_job_task_details(self, query: dict) -> Tuple[QuerySet, int]:
return self.job_task_detail_model.query(**query)

def stat_job_task_details(self, query: dict) -> dict:
return self.job_task_detail_model.stat(**query)
24 changes: 18 additions & 6 deletions src/spaceone/inventory_v2/manager/job_task_manager.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
import copy
import logging
import json
from typing import Tuple, Union
from jsonschema import validate
from datetime import datetime

from spaceone.core import config, queue, utils
from spaceone.core.manager import BaseManager
from spaceone.core.scheduler.task_schema import SPACEONE_TASK_SCHEMA
from spaceone.core.model.mongo_model import QuerySet

from spaceone.inventory_v2.manager.cleanup_manager import CleanupManager
from spaceone.inventory_v2.manager.job_manager import JobManager

# from spaceone.inventory.manager.cleanup_manager import CleanupManager
from spaceone.inventory_v2.model.job_task.database import JobTask
from spaceone.inventory_v2.model.job_task.database import JobTask, JobTaskDetail

_LOGGER = logging.getLogger(__name__)

Expand All @@ -22,6 +19,7 @@ class JobTaskManager(BaseManager):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.job_task_model = JobTask
self.job_task_detail_model = JobTaskDetail

def create_job_task(self, params: dict) -> JobTask:
def _rollback(vo: JobTask):
Expand All @@ -32,7 +30,21 @@ def _rollback(vo: JobTask):
self.transaction.add_rollback(_rollback, job_task_vo)
return job_task_vo

def get(
def create_job_task_detail(self, job_task_vo: JobTask) -> JobTaskDetail:
def _rollback(vo: JobTaskDetail):
_LOGGER.info(f"[ROLLBACK] Delete job task detail: {vo.job_task_id}")
vo.delete()

params = {
"job_task_id": job_task_vo.job_task_id,
"job_id": job_task_vo.job_id,
}
job_task_detail_vo: JobTaskDetail = self.job_task_detail_model.create(params)
self.transaction.add_rollback(_rollback, job_task_vo)

return job_task_detail_vo

def get_job_task(
self,
job_task_id: str,
domain_id: str,
Expand Down
2 changes: 1 addition & 1 deletion src/spaceone/inventory_v2/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@
from spaceone.inventory_v2.model.metric_data.database import MetricData
from spaceone.inventory_v2.model.metric_example.database import MetricExample
from spaceone.inventory_v2.model.job.database import Job
from spaceone.inventory_v2.model.job_task.database import JobTask
from spaceone.inventory_v2.model.job_task.database import JobTask, JobTaskDetail
2 changes: 1 addition & 1 deletion src/spaceone/inventory_v2/model/collector/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,6 @@ class CollectorStatQueryRequest(BaseModel):

class CollectorCollectRequest(BaseModel):
collector_id: str
secret_id: str
secret_id: Union[str, None] = None
workspace_id: Union[list, str, None] = None
domain_id: str
43 changes: 33 additions & 10 deletions src/spaceone/inventory_v2/model/job_task/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,13 @@
from spaceone.core.model.mongo_model import MongoModel


class Error(EmbeddedDocument):
error_code = StringField()
message = StringField()
additional = DictField()


class JobTask(MongoModel):
job_task_id = StringField(max_length=40, generate_id="job-task", unique=True)
status = StringField(
max_length=20,
default="PENDING",
choices=("PENDING", "CANCELED", "IN_PROGRESS", "SUCCESS", "FAILURE"),
)
provider = StringField(max_length=40, default=None, null=True)
total_sub_tasks = IntField(default=0)
remained_sub_tasks = IntField(default=0)
created_count = IntField(default=0)
Expand All @@ -24,7 +17,6 @@ class JobTask(MongoModel):
disconnected_count = IntField(default=0)
failure_count = IntField(default=0)
total_count = IntField(default=0)
errors = ListField(EmbeddedDocumentField(Error, default=None, null=True))
job_id = StringField(max_length=40)
secret_id = StringField(max_length=40)
collector_id = StringField(max_length=40)
Expand All @@ -33,21 +25,21 @@ class JobTask(MongoModel):
workspace_id = StringField(max_length=40)
domain_id = StringField(max_length=40)
created_at = DateTimeField(auto_now_add=True)
updated_at = DateTimeField(auto_now=True)
started_at = DateTimeField(default=None, null=True)
finished_at = DateTimeField(default=None, null=True)

meta = {
"updatable_fields": [
"status",
"provider",
"remained_sub_tasks",
"created_count",
"updated_count",
"deleted_count",
"disconnected_count",
"failure_count",
"errors",
"started_at",
"updated_at",
"finished_at",
],
"minimal_fields": [
Expand Down Expand Up @@ -90,3 +82,34 @@ class JobTask(MongoModel):
"domain_id",
],
}


class JobTaskDetail(MongoModel):
job_task_id = StringField(max_length=40, unique_with=["job_task_id", "job_id"])
created_info = DictField(default=None, null=True)
updated_info = DictField(default=None, null=True)
failure_info = DictField(default=None, null=True)
deleted_info = DictField(default=None, null=True)
disconnected_info = DictField(default=None, null=True)
job_id = StringField(max_length=40)
project_id = StringField(max_length=40)
workspace_id = StringField(max_length=40)
domain_id = StringField(max_length=40)
created_at = DateTimeField(auto_now_add=True)

meta = {
"updatable_fields": [
"created_info",
"updated_info",
"deleted_info",
"disconnected_info",
],
"minimal_fields": ["job_task_id", "created_info", "job_id"],
"ordering": ["-created_at"],
"indexes": [
"job_task_id",
"job_id",
"project_id",
"workspace_id",
],
}
53 changes: 53 additions & 0 deletions src/spaceone/inventory_v2/model/job_task/request.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from typing import Union, Literal, List
from pydantic import BaseModel

__all__ = [
"JobTaskDeleteRequest",
"JobTaskGetRequest",
"JobTaskGetDetailRequest",
"JobTaskSearchQueryRequest",
"JobTaskStatQueryRequest",
"Status",
]

Status = Literal["PENDING", "IN_PROGRESS", "SUCCESS", "FAILURE", "CANCELLED"]


class JobTaskDeleteRequest(BaseModel):
job_task_id: str
workspace_id: Union[str, None]
domain_id: str


class JobTaskGetRequest(BaseModel):
job_task_id: str
workspace_id: Union[str, None]
domain_id: str
user_projects: Union[List[str], None]


class JobTaskGetDetailRequest(BaseModel):
job_task_id: str
workspace_id: Union[str, None]
domain_id: str
user_projects: Union[List[str], None]


class JobTaskSearchQueryRequest(BaseModel):
query: dict
job_task_id: Union[str, None]
status: Union[Status, None]
provider: Union[str, None]
job_id: Union[str, None]
secret_id: Union[str, None]
service_account_id: Union[str, None]
project_id: Union[str, None]
workspace_id: Union[str, None]
domain_id: str


class JobTaskStatQueryRequest(BaseModel):
query: dict
workspace_id: Union[str, None] = None
domain_id: str
user_projects: Union[List[str], None] = None
62 changes: 62 additions & 0 deletions src/spaceone/inventory_v2/model/job_task/response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from datetime import datetime
from typing import Union, Literal, List
from pydantic import BaseModel

from spaceone.core import utils

from spaceone.inventory_v2.model.job_task.request import Status

__all__ = ["JobTaskResponse", "JobTasksResponse", "JobTaskDetailResponse"]


class JobTaskResponse(BaseModel):
job_task_id: Union[str, None] = None
status: Union[Status, None] = None
provider: Union[str, None] = None
created_count: Union[int, None] = None
updated_count: Union[int, None] = None
failure_count: Union[int, None] = None
deleted_count: Union[int, None] = None
disconnected_count: Union[int, None] = None
job_id: Union[str, None] = None
secret_id: Union[str, None] = None
service_account_id: Union[str, None] = None
collector_id: Union[str, None] = None
project_id: Union[str, None] = None
workspace_id: Union[str, None] = None
domain_id: Union[str, None] = None
created_at: Union[datetime, None] = None
started_at: Union[datetime, None] = None
updated_at: Union[datetime, None] = None
finished_at: Union[datetime, None] = None

def dict(self, *args, **kwargs):
data = super().dict(*args, **kwargs)
data["created_at"] = utils.datetime_to_iso8601(data["created_at"])
data["started_at"] = utils.datetime_to_iso8601(data.get("started_at"))
data["updated_at"] = utils.datetime_to_iso8601(data.get("updated_at"))
data["finished_at"] = utils.datetime_to_iso8601(data.get("finished_at"))
return data


class JobTaskDetailResponse(BaseModel):
job_task_id: Union[str, None] = None
created_info: Union[dict, None] = None
updated_info: Union[dict, None] = None
failure_info: Union[dict, None] = None
deleted_info: Union[dict, None] = None
disconnected_info: Union[dict, None] = None
job_id: Union[str, None] = None
project_id: Union[str, None] = None
workspace_id: Union[str, None] = None
domain_id: Union[str, None] = None
created_at: Union[datetime, None] = None

def dict(self, *args, **kwargs):
data = super().dict(*args, **kwargs)
data["created_at"] = utils.datetime_to_iso8601(data["created_at"])


class JobTasksResponse(BaseModel):
results: List[JobTaskResponse]
total_count: Union[int, None] = None
Loading
Loading