Commit fddfb3a
feat(apm): add custom trace span
OrenZhang committed Dec 15, 2024
1 parent 68390b2 commit fddfb3a
Showing 8 changed files with 204 additions and 152 deletions.
23 changes: 16 additions & 7 deletions apps/chat/client/base.py
@@ -6,8 +6,11 @@
 from django.contrib.auth import get_user_model
 from django.shortcuts import get_object_or_404
 from django.utils import timezone
+from opentelemetry import trace
+from opentelemetry.sdk.trace import Span
+from opentelemetry.trace import SpanKind

-from apps.chat.constants import OpenAIRole
+from apps.chat.constants import OpenAIRole, SpanType
 from apps.chat.models import AIModel, ChatLog

 USER_MODEL = get_user_model()
@@ -37,18 +40,20 @@ def __init__(self, user: str, model: str, messages: List[dict], temperature: flo
             model=self.model,
             created_at=int(datetime.datetime.now().timestamp() * 1000),
         )
+        self.tracer = trace.get_tracer(self.__class__.__name__)

     async def chat(self, *args, **kwargs) -> any:
         """
         Chat
         """

-        try:
-            async for text in self._chat(*args, **kwargs):
-                yield text
-        except Exception as e:
-            await self.record()
-            raise e
+        with self.start_span(SpanType.CHAT, SpanKind.SERVER):
+            try:
+                async for text in self._chat(*args, **kwargs):
+                    yield text
+            except Exception as e:
+                await self.record()
+                raise e

     @abc.abstractmethod
     async def _chat(self, *args, **kwargs) -> any:
@@ -70,3 +75,7 @@ async def record(self, prompt_tokens: int = 0, completion_tokens: int = 0) -> No
         # save
         self.log.finished_at = int(timezone.now().timestamp() * 1000)
         await database_sync_to_async(self.log.save)()
+
+    def start_span(self, name: str, kind: SpanKind, **kwargs) -> Span:
+        span: Span = self.tracer.start_as_current_span(name=name, kind=kind, **kwargs)
+        return span
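
Note: the SpanType constants referenced throughout this commit come from apps/chat/constants.py, presumably one of the changed files not expanded in this excerpt. A minimal sketch of what the call sites imply — only the four member names are attested by the diff; the values and the plain-class form are assumptions:

    # Hypothetical reconstruction of the SpanType constants (not shown in this
    # excerpt). Only the names CHAT/API/CHUNK/FETCH appear in the diff; the
    # string values below are invented for illustration.
    class SpanType:
        CHAT = "chat"    # whole chat() call, outermost span
        API = "api"      # outbound request to the model provider
        CHUNK = "chunk"  # consuming and re-yielding streamed chunks
        FETCH = "fetch"  # polling an async job result (image generation)

Also worth knowing when reading the call sites: Tracer.start_as_current_span returns a context manager whose __enter__ yields the active Span, which is why start_span is always used in a with block — entering it is what actually activates the span.
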
56 changes: 32 additions & 24 deletions apps/chat/client/claude.py
@@ -10,10 +10,16 @@
 from django.conf import settings
 from django.utils.translation import gettext
 from httpx import Client
+from opentelemetry.trace import SpanKind
 from ovinc_client.core.logger import logger

 from apps.chat.client.base import BaseClient
-from apps.chat.constants import ClaudeMessageType, MessageContentType, OpenAIRole
+from apps.chat.constants import (
+    ClaudeMessageType,
+    MessageContentType,
+    OpenAIRole,
+    SpanType,
+)
 from apps.chat.exceptions import FileExtractFailed, GenerateFailed


@@ -30,35 +36,37 @@ async def _chat(self, *args, **kwargs) -> any:
         )
         system, messages = self.parse_messages()
         try:
-            response = client.messages.create(
-                max_tokens=settings.ANTHROPIC_MAX_TOKENS,
-                system=system,
-                messages=messages,
-                model=self.model,
-                temperature=self.temperature,
-                top_p=self.top_p,
-                stream=True,
-                timeout=settings.ANTHROPIC_TIMEOUT,
-            )
+            with self.start_span(SpanType.API, SpanKind.CLIENT):
+                response = client.messages.create(
+                    max_tokens=settings.ANTHROPIC_MAX_TOKENS,
+                    system=system,
+                    messages=messages,
+                    model=self.model,
+                    temperature=self.temperature,
+                    top_p=self.top_p,
+                    stream=True,
+                    timeout=settings.ANTHROPIC_TIMEOUT,
+                )
         except Exception as err:  # pylint: disable=W0718
             logger.exception("[GenerateContentFailed] %s", err)
             yield str(GenerateFailed())
             response = []
         prompt_tokens = 0
         completion_tokens = 0
-        # pylint: disable=E1133
-        for chunk in response:
-            match chunk.type:
-                case ClaudeMessageType.MESSAGE_START:
-                    chunk: RawMessageStartEvent
-                    prompt_tokens = chunk.message.usage.input_tokens
-                    self.log.chat_id = chunk.message.id
-                case ClaudeMessageType.MESSAGE_DELTA:
-                    chunk: RawMessageDeltaEvent
-                    completion_tokens = chunk.usage.output_tokens
-                case ClaudeMessageType.CONTENT_BLOCK_DELTA:
-                    chunk: RawContentBlockDeltaEvent
-                    yield chunk.delta.text
+        with self.start_span(SpanType.CHUNK, SpanKind.SERVER):
+            # pylint: disable=E1133
+            for chunk in response:
+                match chunk.type:
+                    case ClaudeMessageType.MESSAGE_START:
+                        chunk: RawMessageStartEvent
+                        prompt_tokens = chunk.message.usage.input_tokens
+                        self.log.chat_id = chunk.message.id
+                    case ClaudeMessageType.MESSAGE_DELTA:
+                        chunk: RawMessageDeltaEvent
+                        completion_tokens = chunk.usage.output_tokens
+                    case ClaudeMessageType.CONTENT_BLOCK_DELTA:
+                        chunk: RawContentBlockDeltaEvent
+                        yield chunk.delta.text
         await self.record(prompt_tokens=prompt_tokens, completion_tokens=completion_tokens)

     def parse_messages(self) -> (str, List[dict]):
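
The same two-span pattern recurs in every client in this commit: a CLIENT-kind span around the outbound API call, and a SERVER-kind span around the loop that re-yields streamed chunks, both nested under the CHAT span opened in BaseClient.chat. A self-contained sketch of the resulting nesting — the provider/exporter setup, tracer name, and span names are illustrative, not part of the commit:

    # Illustrative only: shows how the commit's three spans nest. The commit
    # itself only creates spans; exporter configuration happens elsewhere.
    from opentelemetry import trace
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor
    from opentelemetry.trace import SpanKind

    trace.set_tracer_provider(TracerProvider())
    trace.get_tracer_provider().add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
    tracer = trace.get_tracer("ClaudeClient")

    def fake_stream():
        # Stand-in for the provider's streaming response
        yield from ["Hel", "lo"]

    with tracer.start_as_current_span("chat", kind=SpanKind.SERVER):       # SpanType.CHAT
        with tracer.start_as_current_span("api", kind=SpanKind.CLIENT):    # SpanType.API
            response = fake_stream()
        with tracer.start_as_current_span("chunk", kind=SpanKind.SERVER):  # SpanType.CHUNK
            for text in response:
                print(text, end="")

One consequence of this placement: with stream=True the API span only covers request initiation — chunks are pulled off the wire inside the loop, so most of the upstream latency is attributed to the CHUNK span.
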
37 changes: 20 additions & 17 deletions apps/chat/client/gemini.py
@@ -7,10 +7,11 @@
 from django.utils.translation import gettext
 from httpx import Client
 from openai import OpenAI
+from opentelemetry.trace import SpanKind
 from ovinc_client.core.logger import logger

 from apps.chat.client.openai import BaseClient
-from apps.chat.constants import MessageContentType
+from apps.chat.constants import MessageContentType, SpanType
 from apps.chat.exceptions import FileExtractFailed, GenerateFailed


@@ -40,28 +41,30 @@ def post_init(self) -> None:

     async def _chat(self, *args, **kwargs) -> any:
         try:
-            response = self.client.chat.completions.create(
-                model=self.model,
-                messages=self.messages,
-                temperature=self.temperature,
-                top_p=self.top_p,
-                stream=True,
-                timeout=settings.GEMINI_CHAT_TIMEOUT,
-                stream_options={"include_usage": True},
-            )
+            with self.start_span(SpanType.API, SpanKind.CLIENT):
+                response = self.client.chat.completions.create(
+                    model=self.model,
+                    messages=self.messages,
+                    temperature=self.temperature,
+                    top_p=self.top_p,
+                    stream=True,
+                    timeout=settings.GEMINI_CHAT_TIMEOUT,
+                    stream_options={"include_usage": True},
+                )
         except Exception as err:  # pylint: disable=W0718
             logger.exception("[GenerateContentFailed] %s", err)
             yield str(GenerateFailed())
             response = []
         prompt_tokens = 0
         completion_tokens = 0
-        # pylint: disable=E1133
-        for chunk in response:
-            if chunk.choices:
-                yield chunk.choices[0].delta.content or ""
-            if chunk.usage:
-                prompt_tokens = chunk.usage.prompt_tokens
-                completion_tokens = chunk.usage.completion_tokens
+        with self.start_span(SpanType.CHUNK, SpanKind.SERVER):
+            # pylint: disable=E1133
+            for chunk in response:
+                if chunk.choices:
+                    yield chunk.choices[0].delta.content or ""
+                if chunk.usage:
+                    prompt_tokens = chunk.usage.prompt_tokens
+                    completion_tokens = chunk.usage.completion_tokens
         await self.record(prompt_tokens=prompt_tokens, completion_tokens=completion_tokens)

     def convert_url_to_base64(self, url: str) -> str:
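
A side note on the surrounding code: Gemini is driven through the OpenAI-compatible client here, and stream_options={"include_usage": True} asks the API to append one final chunk with an empty choices list and a populated usage object — which is why the loop guards on chunk.choices and chunk.usage separately instead of assuming every chunk carries content.
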
66 changes: 37 additions & 29 deletions apps/chat/client/hunyuan.py
@@ -8,6 +8,7 @@

 import httpx
 from django.conf import settings
+from opentelemetry.trace import SpanKind
 from ovinc_client.core.logger import logger
 from rest_framework import status
 from tencentcloud.common import credential
@@ -21,6 +22,7 @@
     HunyuanLogoControl,
     HunyuanReviseControl,
     MessageContentType,
+    SpanType,
 )
 from apps.chat.exceptions import GenerateFailed, LoadImageFailed
 from apps.chat.models import HunYuanChuck
@@ -36,7 +38,8 @@ class HunYuanClient(BaseClient):
     async def _chat(self, *args, **kwargs) -> any:
         # call hunyuan api
         try:
-            response = self.call_api()
+            with self.start_span(SpanType.API, SpanKind.CLIENT):
+                response = self.call_api()
         except Exception as err:  # pylint: disable=W0718
             logger.exception("[GenerateContentFailed] %s", err)
             yield str(GenerateFailed())
@@ -45,13 +48,14 @@ async def _chat(self, *args, **kwargs) -> any:
         prompt_tokens = 0
         completion_tokens = 0
         # explain completion
-        for chunk in response:
-            chunk = json.loads(chunk["data"])
-            chunk = HunYuanChuck.create(chunk)
-            self.log.chat_id = chunk.Id
-            prompt_tokens = chunk.Usage.PromptTokens
-            completion_tokens = chunk.Usage.CompletionTokens
-            yield chunk.Choices[0].Delta.Content
+        with self.start_span(SpanType.CHUNK, SpanKind.SERVER):
+            for chunk in response:
+                chunk = json.loads(chunk["data"])
+                chunk = HunYuanChuck.create(chunk)
+                self.log.chat_id = chunk.Id
+                prompt_tokens = chunk.Usage.PromptTokens
+                completion_tokens = chunk.Usage.CompletionTokens
+                yield chunk.Choices[0].Delta.Content
         await self.record(prompt_tokens=prompt_tokens, completion_tokens=completion_tokens)

     def call_api(self) -> models.ChatCompletionsResponse:
@@ -110,41 +114,45 @@ async def _chat(self, *args, **kwargs) -> any:
         # call hunyuan api
         try:
             # submit job
-            response = self.call_api(client)
+            with self.start_span(SpanType.API, SpanKind.CLIENT):
+                response = self.call_api(client)
             # wait for result
             start_time = time.time()
             while time.time() - start_time < settings.HUNYUAN_IMAGE_JOB_TIMEOUT:
-                result = self.call_result_api(client, response.JobId)
+                with self.start_span(SpanType.FETCH, SpanKind.CLIENT):
+                    result = self.call_result_api(client, response.JobId)
                 # if not finished, continue loop
                 if result.JobStatusCode in [HunyuanJobStatusCode.RUNNING, HunyuanJobStatusCode.WAITING]:
                     yield ""
                     await asyncio.sleep(settings.HUNYUAN_IMAGE_JOB_INTERVAL)
                     continue
                 # if finished, check result
                 if result.JobStatusCode == HunyuanJobStatusCode.FINISHED:
+                    await self.record(completion_tokens=len(result.ResultImage))
                     # all failed
                     if all(i != HUNYUAN_SUCCESS_DETAIL for i in result.ResultDetails):
                         yield str(GenerateFailed())
                         await self.record()
                         break
-                    # record
-                    self.log.chat_id = response.JobId
-                    await self.record(completion_tokens=len(result.ResultImage))
-                    # use first success picture
-                    message_index = min(
-                        index for (index, detail) in enumerate(result.ResultDetails) if detail == HUNYUAN_SUCCESS_DETAIL
-                    )
-                    message_url = result.ResultImage[message_index]
-                    httpx_client = httpx.AsyncClient(http2=True)
-                    image_resp = await httpx_client.get(message_url)
-                    await httpx_client.aclose()
-                    if image_resp.status_code != status.HTTP_200_OK:
-                        raise LoadImageFailed()
-                    url = await COSClient().put_object(
-                        file=image_resp.content,
-                        file_name=f"{uuid.uuid4().hex}.{image_resp.headers['content-type'].split('/')[-1]}",
-                    )
-                    yield f"![output]({TCloudUrlParser(url).url})"
+                    with self.start_span(SpanType.CHUNK, SpanKind.SERVER):
+                        # record
+                        self.log.chat_id = response.JobId
+                        # use first success picture
+                        message_index = min(
+                            index
+                            for (index, detail) in enumerate(result.ResultDetails)
+                            if detail == HUNYUAN_SUCCESS_DETAIL
+                        )
+                        message_url = result.ResultImage[message_index]
+                        httpx_client = httpx.AsyncClient(http2=True)
+                        image_resp = await httpx_client.get(message_url)
+                        await httpx_client.aclose()
+                        if image_resp.status_code != status.HTTP_200_OK:
+                            raise LoadImageFailed()
+                        url = await COSClient().put_object(
+                            file=image_resp.content,
+                            file_name=f"{uuid.uuid4().hex}.{image_resp.headers['content-type'].split('/')[-1]}",
+                        )
+                        yield f"![output]({TCloudUrlParser(url).url})"
                 else:
                     yield f"{result.JobErrorMsg}({result.JobErrorCode})"
                     await self.record()
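
In the image client the pattern gains a third span: each iteration of the polling loop is wrapped in a FETCH span, so a slow generation shows up in a trace as a string of short FETCH spans rather than one opaque wait. Since start_span hands back the object from start_as_current_span, call sites could also attach attributes to these spans; a hedged sketch (the attribute keys are invented — the commit records usage in ChatLog, not on spans):

    # Illustrative only: annotating a FETCH-style span with per-poll metadata.
    from opentelemetry import trace
    from opentelemetry.trace import SpanKind

    tracer = trace.get_tracer("HunYuanImageClient")

    with tracer.start_as_current_span("fetch", kind=SpanKind.CLIENT) as span:
        span.set_attribute("chat.job_id", "hypothetical-job-id")  # invented key
        span.set_attribute("chat.poll.interval_s", 1.0)           # invented key
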
36 changes: 20 additions & 16 deletions apps/chat/client/kimi.py
@@ -4,9 +4,11 @@

 from django.conf import settings
 from openai import OpenAI
+from opentelemetry.trace import SpanKind
 from ovinc_client.core.logger import logger

 from apps.chat.client.base import BaseClient
+from apps.chat.constants import SpanType
 from apps.chat.exceptions import GenerateFailed
@@ -25,26 +27,28 @@ def __init__(self, user: str, model: str, messages: List[dict], temperature: flo

     async def _chat(self, *args, **kwargs) -> any:
         try:
-            response = self.client.chat.completions.create(
-                model=self.model,
-                messages=self.messages,
-                temperature=self.temperature,
-                top_p=self.top_p,
-                stream=True,
-                timeout=settings.KIMI_CHAT_TIMEOUT,
-            )
+            with self.start_span(SpanType.API, SpanKind.CLIENT):
+                response = self.client.chat.completions.create(
+                    model=self.model,
+                    messages=self.messages,
+                    temperature=self.temperature,
+                    top_p=self.top_p,
+                    stream=True,
+                    timeout=settings.KIMI_CHAT_TIMEOUT,
+                )
         except Exception as err:  # pylint: disable=W0718
             logger.exception("[GenerateContentFailed] %s", err)
             yield str(GenerateFailed())
             response = []
         prompt_tokens = 0
         completion_tokens = 0
-        # pylint: disable=E1133
-        for chunk in response:
-            self.log.chat_id = chunk.id
-            usage = chunk.choices[0].model_extra.get("usage") or {}
-            if usage:
-                prompt_tokens = usage.get("prompt_tokens", prompt_tokens)
-                completion_tokens = usage.get("completion_tokens", completion_tokens)
-            yield chunk.choices[0].delta.content or ""
+        with self.start_span(SpanType.CHUNK, SpanKind.SERVER):
+            # pylint: disable=E1133
+            for chunk in response:
+                self.log.chat_id = chunk.id
+                usage = chunk.choices[0].model_extra.get("usage") or {}
+                if usage:
+                    prompt_tokens = usage.get("prompt_tokens", prompt_tokens)
+                    completion_tokens = usage.get("completion_tokens", completion_tokens)
+                yield chunk.choices[0].delta.content or ""
         await self.record(prompt_tokens=prompt_tokens, completion_tokens=completion_tokens)
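
Kimi's loop differs slightly from Gemini's: Moonshot appears to report streaming usage as an extra field inside the choice (hence chunk.choices[0].model_extra.get("usage")) rather than as a top-level usage object, so the span placement is the same but the token bookkeeping is probed per chunk.
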
