diff --git a/apps/chat/client/base.py b/apps/chat/client/base.py index 09a19f6..57ab072 100644 --- a/apps/chat/client/base.py +++ b/apps/chat/client/base.py @@ -6,8 +6,11 @@ from django.contrib.auth import get_user_model from django.shortcuts import get_object_or_404 from django.utils import timezone +from opentelemetry import trace +from opentelemetry.sdk.trace import Span +from opentelemetry.trace import SpanKind -from apps.chat.constants import OpenAIRole +from apps.chat.constants import OpenAIRole, SpanType from apps.chat.models import AIModel, ChatLog USER_MODEL = get_user_model() @@ -37,18 +40,20 @@ def __init__(self, user: str, model: str, messages: List[dict], temperature: flo model=self.model, created_at=int(datetime.datetime.now().timestamp() * 1000), ) + self.tracer = trace.get_tracer(self.__class__.__name__) async def chat(self, *args, **kwargs) -> any: """ Chat """ - try: - async for text in self._chat(*args, **kwargs): - yield text - except Exception as e: - await self.record() - raise e + with self.start_span(SpanType.CHAT, SpanKind.SERVER): + try: + async for text in self._chat(*args, **kwargs): + yield text + except Exception as e: + await self.record() + raise e @abc.abstractmethod async def _chat(self, *args, **kwargs) -> any: @@ -70,3 +75,7 @@ async def record(self, prompt_tokens: int = 0, completion_tokens: int = 0) -> No # save self.log.finished_at = int(timezone.now().timestamp() * 1000) await database_sync_to_async(self.log.save)() + + def start_span(self, name: str, kind: SpanKind, **kwargs) -> Span: + span: Span = self.tracer.start_as_current_span(name=name, kind=kind, **kwargs) + return span diff --git a/apps/chat/client/claude.py b/apps/chat/client/claude.py index 2c35040..8a28535 100644 --- a/apps/chat/client/claude.py +++ b/apps/chat/client/claude.py @@ -10,10 +10,16 @@ from django.conf import settings from django.utils.translation import gettext from httpx import Client +from opentelemetry.trace import SpanKind from ovinc_client.core.logger import logger from apps.chat.client.base import BaseClient -from apps.chat.constants import ClaudeMessageType, MessageContentType, OpenAIRole +from apps.chat.constants import ( + ClaudeMessageType, + MessageContentType, + OpenAIRole, + SpanType, +) from apps.chat.exceptions import FileExtractFailed, GenerateFailed @@ -30,35 +36,37 @@ async def _chat(self, *args, **kwargs) -> any: ) system, messages = self.parse_messages() try: - response = client.messages.create( - max_tokens=settings.ANTHROPIC_MAX_TOKENS, - system=system, - messages=messages, - model=self.model, - temperature=self.temperature, - top_p=self.top_p, - stream=True, - timeout=settings.ANTHROPIC_TIMEOUT, - ) + with self.start_span(SpanType.API, SpanKind.CLIENT): + response = client.messages.create( + max_tokens=settings.ANTHROPIC_MAX_TOKENS, + system=system, + messages=messages, + model=self.model, + temperature=self.temperature, + top_p=self.top_p, + stream=True, + timeout=settings.ANTHROPIC_TIMEOUT, + ) except Exception as err: # pylint: disable=W0718 logger.exception("[GenerateContentFailed] %s", err) yield str(GenerateFailed()) response = [] prompt_tokens = 0 completion_tokens = 0 - # pylint: disable=E1133 - for chunk in response: - match chunk.type: - case ClaudeMessageType.MESSAGE_START: - chunk: RawMessageStartEvent - prompt_tokens = chunk.message.usage.input_tokens - self.log.chat_id = chunk.message.id - case ClaudeMessageType.MESSAGE_DELTA: - chunk: RawMessageDeltaEvent - completion_tokens = chunk.usage.output_tokens - case ClaudeMessageType.CONTENT_BLOCK_DELTA: - chunk: RawContentBlockDeltaEvent - yield chunk.delta.text + with self.start_span(SpanType.CHUNK, SpanKind.SERVER): + # pylint: disable=E1133 + for chunk in response: + match chunk.type: + case ClaudeMessageType.MESSAGE_START: + chunk: RawMessageStartEvent + prompt_tokens = chunk.message.usage.input_tokens + self.log.chat_id = chunk.message.id + case ClaudeMessageType.MESSAGE_DELTA: + chunk: RawMessageDeltaEvent + completion_tokens = chunk.usage.output_tokens + case ClaudeMessageType.CONTENT_BLOCK_DELTA: + chunk: RawContentBlockDeltaEvent + yield chunk.delta.text await self.record(prompt_tokens=prompt_tokens, completion_tokens=completion_tokens) def parse_messages(self) -> (str, List[dict]): diff --git a/apps/chat/client/gemini.py b/apps/chat/client/gemini.py index 03656ce..fbce0eb 100644 --- a/apps/chat/client/gemini.py +++ b/apps/chat/client/gemini.py @@ -7,10 +7,11 @@ from django.utils.translation import gettext from httpx import Client from openai import OpenAI +from opentelemetry.trace import SpanKind from ovinc_client.core.logger import logger from apps.chat.client.openai import BaseClient -from apps.chat.constants import MessageContentType +from apps.chat.constants import MessageContentType, SpanType from apps.chat.exceptions import FileExtractFailed, GenerateFailed @@ -40,28 +41,30 @@ def post_init(self) -> None: async def _chat(self, *args, **kwargs) -> any: try: - response = self.client.chat.completions.create( - model=self.model, - messages=self.messages, - temperature=self.temperature, - top_p=self.top_p, - stream=True, - timeout=settings.GEMINI_CHAT_TIMEOUT, - stream_options={"include_usage": True}, - ) + with self.start_span(SpanType.API, SpanKind.CLIENT): + response = self.client.chat.completions.create( + model=self.model, + messages=self.messages, + temperature=self.temperature, + top_p=self.top_p, + stream=True, + timeout=settings.GEMINI_CHAT_TIMEOUT, + stream_options={"include_usage": True}, + ) except Exception as err: # pylint: disable=W0718 logger.exception("[GenerateContentFailed] %s", err) yield str(GenerateFailed()) response = [] prompt_tokens = 0 completion_tokens = 0 - # pylint: disable=E1133 - for chunk in response: - if chunk.choices: - yield chunk.choices[0].delta.content or "" - if chunk.usage: - prompt_tokens = chunk.usage.prompt_tokens - completion_tokens = chunk.usage.completion_tokens + with self.start_span(SpanType.CHUNK, SpanKind.SERVER): + # pylint: disable=E1133 + for chunk in response: + if chunk.choices: + yield chunk.choices[0].delta.content or "" + if chunk.usage: + prompt_tokens = chunk.usage.prompt_tokens + completion_tokens = chunk.usage.completion_tokens await self.record(prompt_tokens=prompt_tokens, completion_tokens=completion_tokens) def convert_url_to_base64(self, url: str) -> str: diff --git a/apps/chat/client/hunyuan.py b/apps/chat/client/hunyuan.py index 8495123..be7ee3e 100644 --- a/apps/chat/client/hunyuan.py +++ b/apps/chat/client/hunyuan.py @@ -8,6 +8,7 @@ import httpx from django.conf import settings +from opentelemetry.trace import SpanKind from ovinc_client.core.logger import logger from rest_framework import status from tencentcloud.common import credential @@ -21,6 +22,7 @@ HunyuanLogoControl, HunyuanReviseControl, MessageContentType, + SpanType, ) from apps.chat.exceptions import GenerateFailed, LoadImageFailed from apps.chat.models import HunYuanChuck @@ -36,7 +38,8 @@ class HunYuanClient(BaseClient): async def _chat(self, *args, **kwargs) -> any: # call hunyuan api try: - response = self.call_api() + with self.start_span(SpanType.API, SpanKind.CLIENT): + response = self.call_api() except Exception as err: # pylint: disable=W0718 logger.exception("[GenerateContentFailed] %s", err) yield str(GenerateFailed()) @@ -45,13 +48,14 @@ async def _chat(self, *args, **kwargs) -> any: prompt_tokens = 0 completion_tokens = 0 # explain completion - for chunk in response: - chunk = json.loads(chunk["data"]) - chunk = HunYuanChuck.create(chunk) - self.log.chat_id = chunk.Id - prompt_tokens = chunk.Usage.PromptTokens - completion_tokens = chunk.Usage.CompletionTokens - yield chunk.Choices[0].Delta.Content + with self.start_span(SpanType.CHUNK, SpanKind.SERVER): + for chunk in response: + chunk = json.loads(chunk["data"]) + chunk = HunYuanChuck.create(chunk) + self.log.chat_id = chunk.Id + prompt_tokens = chunk.Usage.PromptTokens + completion_tokens = chunk.Usage.CompletionTokens + yield chunk.Choices[0].Delta.Content await self.record(prompt_tokens=prompt_tokens, completion_tokens=completion_tokens) def call_api(self) -> models.ChatCompletionsResponse: @@ -110,11 +114,13 @@ async def _chat(self, *args, **kwargs) -> any: # call hunyuan api try: # submit job - response = self.call_api(client) + with self.start_span(SpanType.API, SpanKind.CLIENT): + response = self.call_api(client) # wait for result start_time = time.time() while time.time() - start_time < settings.HUNYUAN_IMAGE_JOB_TIMEOUT: - result = self.call_result_api(client, response.JobId) + with self.start_span(SpanType.FETCH, SpanKind.CLIENT): + result = self.call_result_api(client, response.JobId) # if not finished, continue loop if result.JobStatusCode in [HunyuanJobStatusCode.RUNNING, HunyuanJobStatusCode.WAITING]: yield "" @@ -122,29 +128,31 @@ async def _chat(self, *args, **kwargs) -> any: continue # if finished, check result if result.JobStatusCode == HunyuanJobStatusCode.FINISHED: + await self.record(completion_tokens=len(result.ResultImage)) # all failed if all(i != HUNYUAN_SUCCESS_DETAIL for i in result.ResultDetails): yield str(GenerateFailed()) - await self.record() break - # record - self.log.chat_id = response.JobId - await self.record(completion_tokens=len(result.ResultImage)) - # use first success picture - message_index = min( - index for (index, detail) in enumerate(result.ResultDetails) if detail == HUNYUAN_SUCCESS_DETAIL - ) - message_url = result.ResultImage[message_index] - httpx_client = httpx.AsyncClient(http2=True) - image_resp = await httpx_client.get(message_url) - await httpx_client.aclose() - if image_resp.status_code != status.HTTP_200_OK: - raise LoadImageFailed() - url = await COSClient().put_object( - file=image_resp.content, - file_name=f"{uuid.uuid4().hex}.{image_resp.headers['content-type'].split('/')[-1]}", - ) - yield f"![output]({TCloudUrlParser(url).url})" + with self.start_span(SpanType.CHUNK, SpanKind.SERVER): + # record + self.log.chat_id = response.JobId + # use first success picture + message_index = min( + index + for (index, detail) in enumerate(result.ResultDetails) + if detail == HUNYUAN_SUCCESS_DETAIL + ) + message_url = result.ResultImage[message_index] + httpx_client = httpx.AsyncClient(http2=True) + image_resp = await httpx_client.get(message_url) + await httpx_client.aclose() + if image_resp.status_code != status.HTTP_200_OK: + raise LoadImageFailed() + url = await COSClient().put_object( + file=image_resp.content, + file_name=f"{uuid.uuid4().hex}.{image_resp.headers['content-type'].split('/')[-1]}", + ) + yield f"![output]({TCloudUrlParser(url).url})" else: yield f"{result.JobErrorMsg}({result.JobErrorCode})" await self.record() diff --git a/apps/chat/client/kimi.py b/apps/chat/client/kimi.py index 38aed0c..a054934 100644 --- a/apps/chat/client/kimi.py +++ b/apps/chat/client/kimi.py @@ -4,9 +4,11 @@ from django.conf import settings from openai import OpenAI +from opentelemetry.trace import SpanKind from ovinc_client.core.logger import logger from apps.chat.client.base import BaseClient +from apps.chat.constants import SpanType from apps.chat.exceptions import GenerateFailed @@ -25,26 +27,28 @@ def __init__(self, user: str, model: str, messages: List[dict], temperature: flo async def _chat(self, *args, **kwargs) -> any: try: - response = self.client.chat.completions.create( - model=self.model, - messages=self.messages, - temperature=self.temperature, - top_p=self.top_p, - stream=True, - timeout=settings.KIMI_CHAT_TIMEOUT, - ) + with self.start_span(SpanType.API, SpanKind.CLIENT): + response = self.client.chat.completions.create( + model=self.model, + messages=self.messages, + temperature=self.temperature, + top_p=self.top_p, + stream=True, + timeout=settings.KIMI_CHAT_TIMEOUT, + ) except Exception as err: # pylint: disable=W0718 logger.exception("[GenerateContentFailed] %s", err) yield str(GenerateFailed()) response = [] prompt_tokens = 0 completion_tokens = 0 - # pylint: disable=E1133 - for chunk in response: - self.log.chat_id = chunk.id - usage = chunk.choices[0].model_extra.get("usage") or {} - if usage: - prompt_tokens = usage.get("prompt_tokens", prompt_tokens) - completion_tokens = usage.get("completion_tokens", completion_tokens) - yield chunk.choices[0].delta.content or "" + with self.start_span(SpanType.CHUNK, SpanKind.SERVER): + # pylint: disable=E1133 + for chunk in response: + self.log.chat_id = chunk.id + usage = chunk.choices[0].model_extra.get("usage") or {} + if usage: + prompt_tokens = usage.get("prompt_tokens", prompt_tokens) + completion_tokens = usage.get("completion_tokens", completion_tokens) + yield chunk.choices[0].delta.content or "" await self.record(prompt_tokens=prompt_tokens, completion_tokens=completion_tokens) diff --git a/apps/chat/client/midjourney.py b/apps/chat/client/midjourney.py index 861196f..c5e6b48 100644 --- a/apps/chat/client/midjourney.py +++ b/apps/chat/client/midjourney.py @@ -4,11 +4,12 @@ from django.conf import settings from httpx import AsyncClient +from opentelemetry.trace import SpanKind from ovinc_client.core.logger import logger from rest_framework import status from apps.chat.client.base import BaseClient -from apps.chat.constants import MidjourneyResult +from apps.chat.constants import MidjourneyResult, SpanType from apps.chat.exceptions import GenerateFailed, LoadImageFailed from apps.cos.client import COSClient from apps.cos.utils import TCloudUrlParser @@ -29,10 +30,11 @@ async def _chat(self, *args, **kwargs) -> any: ) # call midjourney api try: - # submit job - response = await client.post( - url=settings.MIDJOURNEY_IMAGINE_API_PATH, json={"prompt": self.messages[-1]["content"]} - ) + with self.start_span(SpanType.API, SpanKind.CLIENT): + # submit job + response = await client.post( + url=settings.MIDJOURNEY_IMAGINE_API_PATH, json={"prompt": self.messages[-1]["content"]} + ) result_id = response.json()["result"] # wait for result start_time = time.time() @@ -49,18 +51,19 @@ async def _chat(self, *args, **kwargs) -> any: yield str(result_data.get("failReason") or GenerateFailed()) await self.record() break - # record - await self.record(completion_tokens=1) - # use first success picture - message_url = result_data["imageUrl"] - image_resp = await client.get(message_url) - if image_resp.status_code != status.HTTP_200_OK: - raise LoadImageFailed() - url = await COSClient().put_object( - file=image_resp.content, - file_name=f"{uuid.uuid4().hex}.{image_resp.headers['content-type'].split('/')[-1]}", - ) - yield f"![output]({TCloudUrlParser(url).url})" + with self.start_span(SpanType.CHUNK, SpanKind.SERVER): + # record + await self.record(completion_tokens=1) + # use first success picture + message_url = result_data["imageUrl"] + image_resp = await client.get(message_url) + if image_resp.status_code != status.HTTP_200_OK: + raise LoadImageFailed() + url = await COSClient().put_object( + file=image_resp.content, + file_name=f"{uuid.uuid4().hex}.{image_resp.headers['content-type'].split('/')[-1]}", + ) + yield f"![output]({TCloudUrlParser(url).url})" break except Exception as err: # pylint: disable=W0718 logger.exception("[GenerateContentFailed] %s", err) diff --git a/apps/chat/client/openai.py b/apps/chat/client/openai.py index a62ec49..84dbf74 100644 --- a/apps/chat/client/openai.py +++ b/apps/chat/client/openai.py @@ -8,10 +8,12 @@ from django.conf import settings from httpx import AsyncClient, Client from openai import OpenAI +from opentelemetry.trace import SpanKind from ovinc_client.core.logger import logger from rest_framework import status from apps.chat.client.base import BaseClient +from apps.chat.constants import SpanType from apps.chat.exceptions import GenerateFailed, LoadImageFailed from apps.cos.client import COSClient from apps.cos.utils import TCloudUrlParser @@ -41,15 +43,16 @@ class OpenAIClient(OpenAIMixin, BaseClient): async def _chat(self, *args, **kwargs) -> any: try: - response = self.client.chat.completions.create( - model=self.model.replace(".", ""), - messages=self.messages, - temperature=self.temperature, - top_p=self.top_p, - stream=True, - timeout=settings.OPENAI_CHAT_TIMEOUT, - stream_options={"include_usage": True}, - ) + with self.start_span(SpanType.API, SpanKind.CLIENT): + response = self.client.chat.completions.create( + model=self.model.replace(".", ""), + messages=self.messages, + temperature=self.temperature, + top_p=self.top_p, + stream=True, + timeout=settings.OPENAI_CHAT_TIMEOUT, + stream_options={"include_usage": True}, + ) except Exception as err: # pylint: disable=W0718 logger.exception("[GenerateContentFailed] %s", err) yield str(GenerateFailed()) @@ -57,15 +60,16 @@ async def _chat(self, *args, **kwargs) -> any: content = "" prompt_tokens = 0 completion_tokens = 0 - # pylint: disable=E1133 - for chunk in response: - self.log.chat_id = chunk.id - if chunk.choices: - content += chunk.choices[0].delta.content or "" - yield chunk.choices[0].delta.content or "" - if chunk.usage: - prompt_tokens = chunk.usage.prompt_tokens - completion_tokens = chunk.usage.completion_tokens + with self.start_span(SpanType.CHUNK, SpanKind.SERVER): + # pylint: disable=E1133 + for chunk in response: + self.log.chat_id = chunk.id + if chunk.choices: + content += chunk.choices[0].delta.content or "" + yield chunk.choices[0].delta.content or "" + if chunk.usage: + prompt_tokens = chunk.usage.prompt_tokens + completion_tokens = chunk.usage.completion_tokens await self.record(prompt_tokens=prompt_tokens, completion_tokens=completion_tokens) @@ -76,31 +80,33 @@ class OpenAIVisionClient(OpenAIMixin, BaseClient): async def _chat(self, *args, **kwargs) -> any: try: - # noinspection PyTypeChecker - response = self.client.images.generate( - model=self.model.replace(".", ""), - prompt=self.messages[-1]["content"], - n=1, - size=self.model_inst.vision_size, - quality=self.model_inst.vision_quality, - style=self.model_inst.vision_style, - ) + with self.start_span(SpanType.API, SpanKind.CLIENT): + # noinspection PyTypeChecker + response = self.client.images.generate( + model=self.model.replace(".", ""), + prompt=self.messages[-1]["content"], + n=1, + size=self.model_inst.vision_size, + quality=self.model_inst.vision_quality, + style=self.model_inst.vision_style, + ) except Exception as err: # pylint: disable=W0718 logger.exception("[GenerateContentFailed] %s", err) yield str(GenerateFailed()) return - # record - await self.record(completion_tokens=1) - # image - if not settings.ENABLE_IMAGE_PROXY: - yield f"![{self.messages[-1]['content']}]({response.data[0].url})" - httpx_client = AsyncClient(http2=True, proxy=settings.OPENAI_HTTP_PROXY_URL) - image_resp = await httpx_client.get(response.data[0].url) - await httpx_client.aclose() - if image_resp.status_code != status.HTTP_200_OK: - raise LoadImageFailed() - url = await COSClient().put_object( - file=image_resp.content, - file_name=f"{uuid.uuid4().hex}.{urlparse(response.data[0].url).path.split('.')[-1]}", - ) - yield f"![output]({TCloudUrlParser(url).url})" + with self.start_span(SpanType.CHUNK, SpanKind.SERVER): + # record + await self.record(completion_tokens=1) + # image + if not settings.ENABLE_IMAGE_PROXY: + yield f"![{self.messages[-1]['content']}]({response.data[0].url})" + httpx_client = AsyncClient(http2=True, proxy=settings.OPENAI_HTTP_PROXY_URL) + image_resp = await httpx_client.get(response.data[0].url) + await httpx_client.aclose() + if image_resp.status_code != status.HTTP_200_OK: + raise LoadImageFailed() + url = await COSClient().put_object( + file=image_resp.content, + file_name=f"{uuid.uuid4().hex}.{urlparse(response.data[0].url).path.split('.')[-1]}", + ) + yield f"![output]({TCloudUrlParser(url).url})" diff --git a/apps/chat/constants.py b/apps/chat/constants.py index c7ca549..4432550 100644 --- a/apps/chat/constants.py +++ b/apps/chat/constants.py @@ -147,3 +147,14 @@ class ClaudeMessageType(TextChoices): MESSAGE_START = "message_start", gettext_lazy("Message Start") MESSAGE_DELTA = "message_delta", gettext_lazy("Message Delta") CONTENT_BLOCK_DELTA = "content_block_delta", gettext_lazy("Content Block Delta") + + +class SpanType(TextChoices): + """ + Span Type + """ + + API = "api", gettext_lazy("API") + CHUNK = "chunk", gettext_lazy("Chunk") + FETCH = "fetch", gettext_lazy("Fetch") + CHAT = "chat", gettext_lazy("Chat")