Skip to content

Commit

Permalink
feat(thread): use custom executor
Browse files Browse the repository at this point in the history
  • Loading branch information
OrenZhang committed Dec 2, 2024
1 parent 7c63c3e commit cf7bfb7
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 16 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
default_stages: [commit]
default_stages: [pre-commit]
repos:
- repo: https://github.com/asottile/pyupgrade
rev: v3.3.1
Expand Down
29 changes: 18 additions & 11 deletions apps/account/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
WeChatLoginReqSerializer,
)
from core.auth import ApplicationAuthenticate
from core.threadpool import db_executor
from core.utils import is_wechat

USER_MODEL: User = get_user_model()
Expand Down Expand Up @@ -81,7 +82,7 @@ async def sign_in(self, request, *args, **kwargs):
request_data = request_serializer.validated_data

# login
user: User = await database_sync_to_async(auth.authenticate)(request, **request_data)
user: User = await database_sync_to_async(auth.authenticate, executor=db_executor)(request, **request_data)
if not user:
raise WrongSignInParam()

Expand All @@ -90,7 +91,7 @@ async def sign_in(self, request, *args, **kwargs):
await self.update_user_by_wechat(user, request_data["wechat_code"])

# auth session
await database_sync_to_async(auth.login)(request, user)
await database_sync_to_async(auth.login, executor=db_executor)(request, user)

# oauth
if request_data["is_oauth"]:
Expand All @@ -104,7 +105,7 @@ async def sign_out(self, request, *args, **kwargs):
Sign out
"""

await database_sync_to_async(auth.logout)(request)
await database_sync_to_async(auth.logout, executor=db_executor)(request)
return Response()

@action(methods=["POST"], detail=False, authentication_classes=[SessionAuthenticate])
Expand All @@ -119,7 +120,7 @@ async def sign_up(self, request, *args, **kwargs):
request_data = request_serializer.validated_data

# save
user = await database_sync_to_async(USER_MODEL.objects.create_user)(
user = await database_sync_to_async(USER_MODEL.objects.create_user, executor=db_executor)(
last_login=datetime.datetime.now(),
username=request_data["username"],
password=request_data["password"],
Expand All @@ -132,7 +133,7 @@ async def sign_up(self, request, *args, **kwargs):
await self.update_user_by_wechat(user, request_data["wechat_code"])

# login session
await database_sync_to_async(auth.login)(request, user)
await database_sync_to_async(auth.login, executor=db_executor)(request, user)

# oauth
if request_data["is_oauth"]:
Expand Down Expand Up @@ -183,7 +184,9 @@ async def verify_code(self, request, *args, **kwargs):
request_data = request_serializer.validated_data

# load user
is_success, user = await database_sync_to_async(USER_MODEL.check_oauth_code)(request_data["code"])
is_success, user = await database_sync_to_async(USER_MODEL.check_oauth_code, executor=db_executor)(
request_data["code"]
)
if is_success:
return Response(await UserInfoSerializer(instance=user).adata)
raise WrongToken()
Expand Down Expand Up @@ -274,10 +277,12 @@ async def wechat_login(self, request, *args, **kwargs):
cache.set(cache_key, json.dumps(user_info, ensure_ascii=False), timeout=settings.WECHAT_SCOPE_TIMEOUT)

# load user
user: User = await database_sync_to_async(USER_MODEL.load_user_by_union_id)(union_id=user_info["unionid"])
user: User = await database_sync_to_async(USER_MODEL.load_user_by_union_id, executor=db_executor)(
union_id=user_info["unionid"]
)
if user:
await self.update_user_by_wechat(user, code)
await database_sync_to_async(auth.login)(request, user)
await database_sync_to_async(auth.login, executor=db_executor)(request, user)
return Response({"code": user.generate_oauth_code() if request_data["is_oauth"] else ""})

# need registry
Expand All @@ -299,7 +304,9 @@ async def update_user_by_wechat(self, user: User, wechat_code: str) -> None:
user.wechat_union_id = user_info["unionid"]
user.wechat_open_id = user_info["openid"]
user.avatar = user_info["headimgurl"]
await database_sync_to_async(user.save)(update_fields=["wechat_union_id", "wechat_open_id", "avatar"])
await database_sync_to_async(user.save, executor=db_executor)(
update_fields=["wechat_union_id", "wechat_open_id", "avatar"]
)

@action(methods=["POST"], detail=False, authentication_classes=[SessionAuthenticate])
async def reset_password(self, request, *args, **kwargs) -> Response:
Expand All @@ -314,14 +321,14 @@ async def reset_password(self, request, *args, **kwargs) -> Response:

# load user
try:
user: User = await database_sync_to_async(USER_MODEL.objects.get)(
user: User = await database_sync_to_async(USER_MODEL.objects.get, executor=db_executor)(
username=request_data["username"], phone_number=request_data["phone_number"]
)
except USER_MODEL.DoesNotExist as err:
raise UserNotExist() from err

# set new password
await database_sync_to_async(user.reset_password)(request_data["password"])
await database_sync_to_async(user.reset_password, executor=db_executor)(request_data["password"])

return Response()

Expand Down
3 changes: 2 additions & 1 deletion apps/notice/utils/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from apps.cel.tasks import send_notice
from apps.notice.constants import NoticeWayChoices
from apps.notice.models import NoticeLog
from core.threadpool import db_executor

USER_MODEL: User = get_user_model()

Expand Down Expand Up @@ -61,7 +62,7 @@ async def send(self) -> None:
msg = traceback.format_exc()
logger.error("[%s SendNoticeFailed] Err => %s; Detail => %s", self.__class__.__name__, err, msg)
result = {"err": str(err)}
await database_sync_to_async(NoticeLog.objects.create)(
await database_sync_to_async(NoticeLog.objects.create, executor=db_executor)(
receivers=self.receivers, content=self.content, extra_params=self.kwargs, result=str(result)
)
return result
Expand Down
5 changes: 4 additions & 1 deletion apps/notice/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
)
from apps.notice.utils import NoticeBase
from core.auth import ApplicationAuthenticate
from core.threadpool import db_executor


class NoticeViewSet(MainViewSet):
Expand Down Expand Up @@ -80,7 +81,9 @@ async def registry_robot(self, request, *args, **kwargs):
# get instance
instance = None
if request.data.get("id"):
instance = await database_sync_to_async(get_object_or_404)(Robot, pk=request.data.pop("id"))
instance = await database_sync_to_async(get_object_or_404, executor=db_executor)(
Robot, pk=request.data.pop("id")
)

# validate request
request_serializer = RegistryRobotSerializer(instance=instance, data=request.data, partial=bool(instance))
Expand Down
5 changes: 4 additions & 1 deletion apps/tcloud/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from apps.notice.constants import NoticeWayChoices
from apps.tcloud.models import AuditCallback
from apps.tcloud.serializers import TCICallbackSerializer
from core.threadpool import db_executor


class AuditCallbackViewSet(MainViewSet):
Expand All @@ -27,7 +28,9 @@ async def create(self, request: Request, *args, **kwargs) -> Response:
req_slz = TCICallbackSerializer(data=request.data)
req_slz.is_valid(raise_exception=True)
# save
callback = await database_sync_to_async(AuditCallback.add_callback)(req_slz.validated_data)
callback = await database_sync_to_async(AuditCallback.add_callback, executor=db_executor)(
req_slz.validated_data
)
if callback.is_sensitive:
send_notice.delay(
notice_type=NoticeWayChoices.ROBOT,
Expand Down
3 changes: 2 additions & 1 deletion core/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from apps.application.models import Application
from core.constants import APP_AUTH_HEADER_KEY
from core.exceptions import AppAuthFailed
from core.threadpool import db_executor

USER_MODEL = get_user_model()

Expand All @@ -34,7 +35,7 @@ async def authenticate(self, request) -> (Application, None):
raise AppAuthFailed(gettext("App Auth Params Not Exist"))
# varify app
try:
app = await database_sync_to_async(Application.objects.get)(pk=app_code)
app = await database_sync_to_async(Application.objects.get, executor=db_executor)(pk=app_code)
except Application.DoesNotExist as err: # pylint: disable=E1101
raise AppAuthFailed(gettext("App Not Exist")) from err
# verify secret
Expand Down
5 changes: 5 additions & 0 deletions core/threadpool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from concurrent.futures.thread import ThreadPoolExecutor

from django.conf import settings

db_executor = ThreadPoolExecutor(max_workers=settings.DB_EXECUTOR_SIZE)
3 changes: 3 additions & 0 deletions entry/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,3 +255,6 @@
CAPTCHA_APP_ID = int(os.getenv("CAPTCHA_APP_ID", "0"))
CAPTCHA_APP_SECRET = os.getenv("CAPTCHA_APP_SECRET", "")
CAPTCHA_APP_INFO_TIMEOUT = int(os.getenv("CAPTCHA_APP_INFO_TIMEOUT", str(60 * 10)))

# Thread
DB_EXECUTOR_SIZE = int(os.getenv("DB_EXECUTOR_SIZE", str(10)))

0 comments on commit cf7bfb7

Please sign in to comment.