From 2403d63fa6dc212ccba833629120ac97436c99be Mon Sep 17 00:00:00 2001 From: Lalith Kota Date: Mon, 11 Dec 2023 18:25:13 +0530 Subject: [PATCH 1/8] Modified GCTB according to change on Common G2PConnect Module --- .../services/id_translate_service.py | 9 +++- .../payment_backend.py | 51 +++++++++--------- .../payment_backend.py | 52 ++++++++++--------- .../services/g2p_connect_id_translate.py | 28 ++++++++-- 4 files changed, 86 insertions(+), 54 deletions(-) diff --git a/g2p-cash-transfer-bridge-core/src/g2p_cash_transfer_bridge_core/services/id_translate_service.py b/g2p-cash-transfer-bridge-core/src/g2p_cash_transfer_bridge_core/services/id_translate_service.py index acad929..2ea665c 100644 --- a/g2p-cash-transfer-bridge-core/src/g2p_cash_transfer_bridge_core/services/id_translate_service.py +++ b/g2p-cash-transfer-bridge-core/src/g2p_cash_transfer_bridge_core/services/id_translate_service.py @@ -4,7 +4,14 @@ class IdTranslateService(BaseService): - async def translate(self, ids: List[str]) -> List[str]: + async def translate(self, ids: List[str], **kwargs) -> List[str]: + """ + Get an ID and return it. + This method should be implemented in concrete subclasses. + """ + raise NotImplementedError() + + def translate_sync(self, ids: List[str], **kwargs) -> List[str]: """ Get an ID and return it. This method should be implemented in concrete subclasses. diff --git a/gctb-mojaloop-sdk-payment-backend/payment_backend.py b/gctb-mojaloop-sdk-payment-backend/payment_backend.py index 46a19a4..383b777 100755 --- a/gctb-mojaloop-sdk-payment-backend/payment_backend.py +++ b/gctb-mojaloop-sdk-payment-backend/payment_backend.py @@ -1,9 +1,8 @@ #!/usr/bin/env python3 # ruff: noqa: E402 -import asyncio import logging -from contextlib import asynccontextmanager +import time from datetime import datetime from typing import List @@ -18,12 +17,12 @@ IdTranslateService, ) from openg2p_fastapi_common.config import Settings as BaseSettings -from openg2p_fastapi_common.context import dbengine from openg2p_fastapi_common.service import BaseService +from openg2p_fastapi_common.utils.ctx_thread import CTXThread from pydantic import BaseModel from pydantic_settings import SettingsConfigDict -from sqlalchemy import and_, select -from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker +from sqlalchemy import and_, create_engine, select +from sqlalchemy.orm import Session class Settings(BaseSettings): @@ -71,6 +70,8 @@ class MojaloopSdkPaymentBackendService(BaseService): def __init__(self, name="", **kwargs): super().__init__(name if name else _config.payment_backend_name, **kwargs) self._id_translate_service = IdTranslateService.get_component() + self.disburse_loop_killed = False + self.disburse_loop_thread: CTXThread = None @property def id_translate_service(self): @@ -79,13 +80,14 @@ def id_translate_service(self): return self._id_translate_service def post_init(self): - asyncio.create_task(self.disburse_loop()) + self.disburse_loop_thread = CTXThread(target=self.disburse_loop) + self.disburse_loop_thread.start() - async def disburse_loop(self): - while True: + def disburse_loop(self): + while not self.disburse_loop_killed: db_response = [] - async_session_maker = async_sessionmaker(dbengine.get()) - async with async_session_maker() as session: + dbengine = create_engine(_config.db_datasource, echo=_config.db_logging) + with Session(dbengine, expire_on_commit=False) as session: stmt = select(PaymentListItem) if ( _config.dsbmt_loop_filter_backend_name @@ -117,31 +119,32 @@ async def disburse_loop(self): ) ) stmt = stmt.order_by(PaymentListItem.id.asc()) - result = await session.execute(stmt) + result = session.execute(stmt) db_response = list(result.scalars()) if db_response: _logger.info( "GCTB Mojaloop - processing payment from payment list." ) - await self.disburse(db_response, session) + self.disburse(db_response, session) else: _logger.info( "GCTB Mojaloop - no records found in payment list table." ) - await asyncio.sleep(_config.dsbmt_loop_interval_secs) + time.sleep(_config.dsbmt_loop_interval_secs) - async def disburse(self, payments: List[PaymentListItem], session: AsyncSession): + def disburse(self, payments: List[PaymentListItem], session: Session): for payment in payments: payee_acc_no = "" if _config.translate_id_to_fa: try: - payee_acc_no = await self.id_translate_service.translate( + payee_acc_no = self.id_translate_service.translate_sync( [ payment.to_fa, ], - max_retries=10, + loop_sleep=0, + max_retries=100, ) if payee_acc_no: payee_acc_no = payee_acc_no[0] @@ -158,9 +161,7 @@ async def disburse(self, payments: List[PaymentListItem], session: AsyncSession) }, "to": { "idType": _config.payee_id_type, - "idValue": await self.get_payee_id_value_from_payee_fa( - payee_acc_no - ), + "idValue": self.get_payee_id_value_from_payee_fa(payee_acc_no), }, "currency": payment.currency, "amount": float(payment.amount), @@ -189,9 +190,9 @@ async def disburse(self, payments: List[PaymentListItem], session: AsyncSession) payment.error_code = SingleDisburseStatusEnum.rjct_payment_failed payment.error_msg = "Mojaloop Payment Failed with unknown reason" - await session.commit() + session.commit() - async def get_payee_id_value_from_payee_fa(self, fa: str) -> str: + def get_payee_id_value_from_payee_fa(self, fa: str) -> str: return fa[fa.find(":") + 1 : fa.rfind("@")] @@ -208,11 +209,11 @@ def initialize(self, **kwargs): super().initialize(**kwargs) self.payment_backend = MojaloopSdkPaymentBackendService() - @asynccontextmanager - async def fastapi_app_lifespan(self, app: FastAPI): + async def fastapi_app_startup(self, app: FastAPI): self.payment_backend.post_init() - yield - await dbengine.get().dispose() + + async def fastapi_app_shutdown(self, app: FastAPI): + self.payment_backend.disburse_loop_killed = True if __name__ == "__main__": diff --git a/gctb-simple-mpesa-payment-backend/payment_backend.py b/gctb-simple-mpesa-payment-backend/payment_backend.py index 0179b74..791d19b 100755 --- a/gctb-simple-mpesa-payment-backend/payment_backend.py +++ b/gctb-simple-mpesa-payment-backend/payment_backend.py @@ -1,9 +1,8 @@ #!/usr/bin/env python3 # ruff: noqa: E402 -import asyncio import logging -from contextlib import asynccontextmanager +import time from datetime import datetime from typing import List @@ -18,12 +17,12 @@ IdTranslateService, ) from openg2p_fastapi_common.config import Settings as BaseSettings -from openg2p_fastapi_common.context import dbengine from openg2p_fastapi_common.service import BaseService +from openg2p_fastapi_common.utils.ctx_thread import CTXThread from pydantic import BaseModel from pydantic_settings import SettingsConfigDict -from sqlalchemy import and_, select -from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker +from sqlalchemy import and_, create_engine, select +from sqlalchemy.orm import Session class Settings(BaseSettings): @@ -68,8 +67,9 @@ class ReferenceIdStatus(BaseModel): class SimpleMpesaPaymentBackendService(BaseService): def __init__(self, name="", **kwargs): super().__init__(name if name else _config.payment_backend_name, **kwargs) - self._id_translate_service = IdTranslateService.get_component() + self.disburse_loop_killed = False + self.disburse_loop_thread: CTXThread = None @property def id_translate_service(self): @@ -78,13 +78,14 @@ def id_translate_service(self): return self._id_translate_service def post_init(self): - asyncio.create_task(self.disburse_loop()) + self.disburse_loop_thread = CTXThread(target=self.disburse_loop) + self.disburse_loop_thread.start() - async def disburse_loop(self): - while True: + def disburse_loop(self): + while not self.disburse_loop_killed: db_response = [] - async_session_maker = async_sessionmaker(dbengine.get()) - async with async_session_maker() as session: + dbengine = create_engine(_config.db_datasource, echo=_config.db_logging) + with Session(dbengine, expire_on_commit=False) as session: stmt = select(PaymentListItem) if ( _config.dsbmt_loop_filter_backend_name @@ -116,22 +117,22 @@ async def disburse_loop(self): ) ) stmt = stmt.order_by(PaymentListItem.id.asc()) - result = await session.execute(stmt) + result = session.execute(stmt) db_response = list(result.scalars()) if db_response: _logger.info( "GCTB Simple Mpesa - processing payment from payment list." ) - await self.disburse(db_response, session) + self.disburse(db_response, session) else: _logger.info( "GCTB Simple Mpesa - no records found in payment list table." ) - await asyncio.sleep(_config.dsbmt_loop_interval_secs) + time.sleep(_config.dsbmt_loop_interval_secs) - async def disburse(self, payments: List[PaymentListItem], session: AsyncSession): + def disburse(self, payments: List[PaymentListItem], session: Session): try: # from_fa field will be ignored in this payment_backend data = {"email": _config.agent_email, "password": _config.agent_password} @@ -152,18 +153,19 @@ async def disburse(self, payments: List[PaymentListItem], session: AsyncSession) payment.status = MsgStatusEnum.rjct payment.error_code = SingleDisburseStatusEnum.rjct_payment_failed payment.error_msg = "Mpesa Payment Failed during authentication" - await session.commit() + session.commit() return for payment in payments: payee_acc_no = "" if _config.translate_id_to_fa: try: - payee_acc_no = await self.id_translate_service.translate( + payee_acc_no = self.id_translate_service.translate_sync( [ payment.to_fa, ], - max_retries=10, + loop_sleep=0, + max_retries=100, ) if payee_acc_no: payee_acc_no = payee_acc_no[0] @@ -176,7 +178,7 @@ async def disburse(self, payments: List[PaymentListItem], session: AsyncSession) } data = { "amount": int(float(payment.amount)), - "accountNo": await self.get_account_no_from_payee_fa(payee_acc_no), + "accountNo": self.get_account_no_from_payee_fa(payee_acc_no), "customerType": _config.customer_type, } try: @@ -199,9 +201,9 @@ async def disburse(self, payments: List[PaymentListItem], session: AsyncSession) payment.error_code = SingleDisburseStatusEnum.rjct_payment_failed payment.error_msg = "Mpesa Payment Failed with unknown reason" - await session.commit() + session.commit() - async def get_account_no_from_payee_fa(self, fa: str) -> str: + def get_account_no_from_payee_fa(self, fa: str) -> str: return fa[fa.find(":") + 1 : fa.rfind(".")] @@ -218,11 +220,11 @@ def initialize(self, **kwargs): super().initialize(**kwargs) self.payment_backend = SimpleMpesaPaymentBackendService() - @asynccontextmanager - async def fastapi_app_lifespan(self, app: FastAPI): + async def fastapi_app_startup(self, app: FastAPI): self.payment_backend.post_init() - yield - await dbengine.get().dispose() + + async def fastapi_app_shutdown(self, app: FastAPI): + self.payment_backend.disburse_loop_killed = True if __name__ == "__main__": diff --git a/gctb-translate-id-fa/src/gctb_translate_id_fa/services/g2p_connect_id_translate.py b/gctb-translate-id-fa/src/gctb_translate_id_fa/services/g2p_connect_id_translate.py index a1f183a..26b242a 100644 --- a/gctb-translate-id-fa/src/gctb_translate_id_fa/services/g2p_connect_id_translate.py +++ b/gctb-translate-id-fa/src/gctb_translate_id_fa/services/g2p_connect_id_translate.py @@ -21,9 +21,31 @@ def id_mapper_service(self): self._id_mapper_service = IDMapperResolveService.get_component() return self._id_mapper_service - async def translate(self, ids: List[str], max_retries=10) -> List[str]: - res = await self.id_mapper_service.resolve_request_sync( - [MapperValue(id=id) for id in ids], max_retries=max_retries + async def translate( + self, ids: List[str], loop_sleep=1, max_retries=10 + ) -> List[str]: + res = await self.id_mapper_service.resolve_request( + [MapperValue(id=id) for id in ids], + loop_sleep=loop_sleep, + max_retries=max_retries, + ) + if not res: + raise BaseAppException( + "GCTB-IMS-300", + "ID Mapper Resolve Id: No response received", + ) + if not res.refs: + raise BaseAppException( + "G2P-IMS-301", + "ID Mapper Resolve Id: Invalid Txn without any requests received", + ) + return [res.refs[key].fa for key in res.refs] + + def translate_sync(self, ids: List[str], loop_sleep=1, max_retries=10) -> List[str]: + res = self.id_mapper_service.resolve_request_sync( + [MapperValue(id=id) for id in ids], + loop_sleep=loop_sleep, + max_retries=max_retries, ) if not res: raise BaseAppException( From 5aa5deb466a90d427855c723b434565bcd75599a Mon Sep 17 00:00:00 2001 From: Lalith Kota Date: Mon, 11 Dec 2023 23:07:43 +0530 Subject: [PATCH 2/8] Payment backends: Fixed problem with psycopg2 --- .../k8s-mojaloop-payment-backend.yaml | 2 ++ gctb-mojaloop-sdk-payment-backend/payment_backend.py | 4 ++++ .../k8s-simple-mpesa-payment-backend.yaml | 2 ++ gctb-simple-mpesa-payment-backend/payment_backend.py | 4 ++++ payment-backend.Dockerfile | 3 +++ 5 files changed, 15 insertions(+) diff --git a/gctb-mojaloop-sdk-payment-backend/k8s-mojaloop-payment-backend.yaml b/gctb-mojaloop-sdk-payment-backend/k8s-mojaloop-payment-backend.yaml index aa42aae..0c681eb 100644 --- a/gctb-mojaloop-sdk-payment-backend/k8s-mojaloop-payment-backend.yaml +++ b/gctb-mojaloop-sdk-payment-backend/k8s-mojaloop-payment-backend.yaml @@ -91,6 +91,8 @@ spec: secretKeyRef: key: password name: postgres-postgresql + - name: GCTB_ID_TRANSLATE_QUEUE_REDIS_HOST + value: gctb-redis-master volumes: - name: payment-backend-scripts configMap: diff --git a/gctb-mojaloop-sdk-payment-backend/payment_backend.py b/gctb-mojaloop-sdk-payment-backend/payment_backend.py index 383b777..4391b19 100755 --- a/gctb-mojaloop-sdk-payment-backend/payment_backend.py +++ b/gctb-mojaloop-sdk-payment-backend/payment_backend.py @@ -40,6 +40,7 @@ class Settings(BaseSettings): openapi_version: str = "0.1.0" payment_backend_name: str = "mojaloop" db_dbname: str = "gctbdb" + db_driver: str = "postgresql" dsbmt_loop_interval_secs: int = 10 dsbmt_loop_filter_backend_name: bool = True @@ -209,6 +210,9 @@ def initialize(self, **kwargs): super().initialize(**kwargs) self.payment_backend = MojaloopSdkPaymentBackendService() + def init_db(self): + pass + async def fastapi_app_startup(self, app: FastAPI): self.payment_backend.post_init() diff --git a/gctb-simple-mpesa-payment-backend/k8s-simple-mpesa-payment-backend.yaml b/gctb-simple-mpesa-payment-backend/k8s-simple-mpesa-payment-backend.yaml index 5c1ccd1..15f0ce3 100644 --- a/gctb-simple-mpesa-payment-backend/k8s-simple-mpesa-payment-backend.yaml +++ b/gctb-simple-mpesa-payment-backend/k8s-simple-mpesa-payment-backend.yaml @@ -87,6 +87,8 @@ spec: secretKeyRef: key: password name: postgres-postgresql + - name: GCTB_ID_TRANSLATE_QUEUE_REDIS_HOST + value: gctb-redis-master volumes: - name: payment-backend-scripts configMap: diff --git a/gctb-simple-mpesa-payment-backend/payment_backend.py b/gctb-simple-mpesa-payment-backend/payment_backend.py index 791d19b..33287db 100755 --- a/gctb-simple-mpesa-payment-backend/payment_backend.py +++ b/gctb-simple-mpesa-payment-backend/payment_backend.py @@ -40,6 +40,7 @@ class Settings(BaseSettings): openapi_version: str = "0.1.0" payment_backend_name: str = "mpesa" db_dbname: str = "gctbdb" + db_driver: str = "postgresql" dsbmt_loop_interval_secs: int = 10 dsbmt_loop_filter_backend_name: bool = True @@ -220,6 +221,9 @@ def initialize(self, **kwargs): super().initialize(**kwargs) self.payment_backend = SimpleMpesaPaymentBackendService() + def init_db(self): + pass + async def fastapi_app_startup(self, app: FastAPI): self.payment_backend.post_init() diff --git a/payment-backend.Dockerfile b/payment-backend.Dockerfile index 3c47d94..8f1abc4 100644 --- a/payment-backend.Dockerfile +++ b/payment-backend.Dockerfile @@ -11,6 +11,9 @@ RUN groupadd -g ${container_user_gid} ${container_user_group} \ WORKDIR /app +RUN install_packages libpq-dev \ + && apt-get clean && rm -rf /var/lib/apt/lists /var/cache/apt/archives + RUN chown -R ${container_user}:${container_user_group} /app USER ${container_user} From 9b06bdfd4fc7d9a238951881d0e75e3190ff957c Mon Sep 17 00:00:00 2001 From: Lalith Kota Date: Tue, 12 Dec 2023 00:12:32 +0530 Subject: [PATCH 3/8] Payment backends: Fixed wrong env vars --- .../k8s-mojaloop-payment-backend.yaml | 4 ++-- .../k8s-simple-mpesa-payment-backend.yaml | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/gctb-mojaloop-sdk-payment-backend/k8s-mojaloop-payment-backend.yaml b/gctb-mojaloop-sdk-payment-backend/k8s-mojaloop-payment-backend.yaml index 0c681eb..7511ae2 100644 --- a/gctb-mojaloop-sdk-payment-backend/k8s-mojaloop-payment-backend.yaml +++ b/gctb-mojaloop-sdk-payment-backend/k8s-mojaloop-payment-backend.yaml @@ -34,6 +34,8 @@ spec: value: http://gctb-mojaloop-sdk-payment-backend.gctb/internal/callback/mapper - name: GCTB_ID_TRANSLATE_MAPPER_RESOLVE_URL value: http://mapper.spar/v0.1.0/mapper/resolve + - name: GCTB_ID_TRANSLATE_QUEUE_REDIS_HOST + value: gctb-redis-master - name: GCTB_MOJALOOP_SDK_TRANSFER_URL value: http://ml-simulators-sim-dfsp1-scheme-adapter.ml:4001/transfers - name: GCTB_MOJALOOP_SDK_PAYER_ID_VALUE @@ -91,8 +93,6 @@ spec: secretKeyRef: key: password name: postgres-postgresql - - name: GCTB_ID_TRANSLATE_QUEUE_REDIS_HOST - value: gctb-redis-master volumes: - name: payment-backend-scripts configMap: diff --git a/gctb-simple-mpesa-payment-backend/k8s-simple-mpesa-payment-backend.yaml b/gctb-simple-mpesa-payment-backend/k8s-simple-mpesa-payment-backend.yaml index 15f0ce3..75348d0 100644 --- a/gctb-simple-mpesa-payment-backend/k8s-simple-mpesa-payment-backend.yaml +++ b/gctb-simple-mpesa-payment-backend/k8s-simple-mpesa-payment-backend.yaml @@ -34,6 +34,8 @@ spec: value: http://gctb-simple-mpesa-payment-backend.gctb/internal/callback/mapper - name: GCTB_ID_TRANSLATE_MAPPER_RESOLVE_URL value: http://mapper.spar/v0.1.0/mapper/resolve + - name: GCTB_ID_TRANSLATE_QUEUE_REDIS_HOST + value: gctb-redis-master - name: GCTB_SIMPLE_MPESA_DB_PASSWORD valueFrom: secretKeyRef: @@ -87,8 +89,6 @@ spec: secretKeyRef: key: password name: postgres-postgresql - - name: GCTB_ID_TRANSLATE_QUEUE_REDIS_HOST - value: gctb-redis-master volumes: - name: payment-backend-scripts configMap: From cf2610780dc2f4a81b538a26c8053320371fdf05 Mon Sep 17 00:00:00 2001 From: Lalith Kota Date: Tue, 12 Dec 2023 15:26:43 +0530 Subject: [PATCH 4/8] Backends: Added health check and initiali delay for loop. Fixed multiple sessions issue. reduced readiness and liveness times --- .../k8s-mojaloop-payment-backend.yaml | 14 +++++----- .../payment_backend.py | 27 +++++++++++++------ .../k8s-simple-mpesa-payment-backend.yaml | 14 +++++----- .../payment_backend.py | 27 +++++++++++++------ 4 files changed, 52 insertions(+), 30 deletions(-) diff --git a/gctb-mojaloop-sdk-payment-backend/k8s-mojaloop-payment-backend.yaml b/gctb-mojaloop-sdk-payment-backend/k8s-mojaloop-payment-backend.yaml index 7511ae2..8bb8013 100644 --- a/gctb-mojaloop-sdk-payment-backend/k8s-mojaloop-payment-backend.yaml +++ b/gctb-mojaloop-sdk-payment-backend/k8s-mojaloop-payment-backend.yaml @@ -50,25 +50,25 @@ spec: name: http protocol: TCP livenessProbe: - failureThreshold: 5 + failureThreshold: 2 httpGet: path: /ping port: 8000 scheme: HTTP - initialDelaySeconds: 15 - periodSeconds: 60 + initialDelaySeconds: 5 + periodSeconds: 30 successThreshold: 1 timeoutSeconds: 1 readinessProbe: - failureThreshold: 20 + failureThreshold: 2 httpGet: path: /ping port: 8000 scheme: HTTP - initialDelaySeconds: 30 - periodSeconds: 30 + initialDelaySeconds: 2 + periodSeconds: 10 successThreshold: 1 - timeoutSeconds: 10 + timeoutSeconds: 1 volumeMounts: - name: payment-backend-scripts mountPath: /app/payment_backend diff --git a/gctb-mojaloop-sdk-payment-backend/payment_backend.py b/gctb-mojaloop-sdk-payment-backend/payment_backend.py index 4391b19..bfcbc2b 100755 --- a/gctb-mojaloop-sdk-payment-backend/payment_backend.py +++ b/gctb-mojaloop-sdk-payment-backend/payment_backend.py @@ -17,6 +17,7 @@ IdTranslateService, ) from openg2p_fastapi_common.config import Settings as BaseSettings +from openg2p_fastapi_common.errors import BaseAppException from openg2p_fastapi_common.service import BaseService from openg2p_fastapi_common.utils.ctx_thread import CTXThread from pydantic import BaseModel @@ -42,6 +43,7 @@ class Settings(BaseSettings): db_dbname: str = "gctbdb" db_driver: str = "postgresql" + dsbmt_loop_intial_delay_secs: int = 30 dsbmt_loop_interval_secs: int = 10 dsbmt_loop_filter_backend_name: bool = True dsbmt_loop_filter_status: List[str] = ["rcvd", "fail"] @@ -85,10 +87,11 @@ def post_init(self): self.disburse_loop_thread.start() def disburse_loop(self): - while not self.disburse_loop_killed: - db_response = [] - dbengine = create_engine(_config.db_datasource, echo=_config.db_logging) - with Session(dbengine, expire_on_commit=False) as session: + time.sleep(_config.dsbmt_loop_intial_delay_secs) + dbengine = create_engine(_config.db_datasource, echo=_config.db_logging) + with Session(dbengine, expire_on_commit=False) as session: + while not self.disburse_loop_killed: + db_response = [] stmt = select(PaymentListItem) if ( _config.dsbmt_loop_filter_backend_name @@ -132,8 +135,7 @@ def disburse_loop(self): _logger.info( "GCTB Mojaloop - no records found in payment list table." ) - - time.sleep(_config.dsbmt_loop_interval_secs) + time.sleep(_config.dsbmt_loop_interval_secs) def disburse(self, payments: List[PaymentListItem], session: Session): for payment in payments: @@ -202,13 +204,23 @@ def get_payee_id_value_from_payee_fa(self, fa: str) -> str: Initializer as G2pConnectMapperInitializer, ) from openg2p_fastapi_common.app import Initializer -from openg2p_fastapi_common.ping import PingInitializer +from openg2p_fastapi_common.ping import PingController + + +class PingController(PingController): + async def get_ping(self): + res = await super().get_ping() + payment_backend = MojaloopSdkPaymentBackendService.get_component() + if not payment_backend.disburse_loop_thread.is_alive(): + raise BaseAppException("GCTB-MLP-700", "Disbursement Loop is not running") + return res class PaymentBackendInitializer(Initializer): def initialize(self, **kwargs): super().initialize(**kwargs) self.payment_backend = MojaloopSdkPaymentBackendService() + PingController().post_init() def init_db(self): pass @@ -224,5 +236,4 @@ async def fastapi_app_shutdown(self, app: FastAPI): main_init = PaymentBackendInitializer() G2pConnectMapperInitializer() TranslateIdInitializer() - PingInitializer() main_init.main() diff --git a/gctb-simple-mpesa-payment-backend/k8s-simple-mpesa-payment-backend.yaml b/gctb-simple-mpesa-payment-backend/k8s-simple-mpesa-payment-backend.yaml index 75348d0..4e84d8f 100644 --- a/gctb-simple-mpesa-payment-backend/k8s-simple-mpesa-payment-backend.yaml +++ b/gctb-simple-mpesa-payment-backend/k8s-simple-mpesa-payment-backend.yaml @@ -46,25 +46,25 @@ spec: name: http protocol: TCP livenessProbe: - failureThreshold: 5 + failureThreshold: 2 httpGet: path: /ping port: 8000 scheme: HTTP - initialDelaySeconds: 15 - periodSeconds: 60 + initialDelaySeconds: 5 + periodSeconds: 30 successThreshold: 1 timeoutSeconds: 1 readinessProbe: - failureThreshold: 20 + failureThreshold: 2 httpGet: path: /ping port: 8000 scheme: HTTP - initialDelaySeconds: 30 - periodSeconds: 30 + initialDelaySeconds: 2 + periodSeconds: 10 successThreshold: 1 - timeoutSeconds: 10 + timeoutSeconds: 1 volumeMounts: - name: payment-backend-scripts mountPath: /app/payment_backend diff --git a/gctb-simple-mpesa-payment-backend/payment_backend.py b/gctb-simple-mpesa-payment-backend/payment_backend.py index 33287db..83edcab 100755 --- a/gctb-simple-mpesa-payment-backend/payment_backend.py +++ b/gctb-simple-mpesa-payment-backend/payment_backend.py @@ -17,6 +17,7 @@ IdTranslateService, ) from openg2p_fastapi_common.config import Settings as BaseSettings +from openg2p_fastapi_common.errors import BaseAppException from openg2p_fastapi_common.service import BaseService from openg2p_fastapi_common.utils.ctx_thread import CTXThread from pydantic import BaseModel @@ -42,6 +43,7 @@ class Settings(BaseSettings): db_dbname: str = "gctbdb" db_driver: str = "postgresql" + dsbmt_loop_intial_delay_secs: int = 30 dsbmt_loop_interval_secs: int = 10 dsbmt_loop_filter_backend_name: bool = True dsbmt_loop_filter_status: List[str] = ["rcvd", "fail"] @@ -83,10 +85,11 @@ def post_init(self): self.disburse_loop_thread.start() def disburse_loop(self): - while not self.disburse_loop_killed: - db_response = [] - dbengine = create_engine(_config.db_datasource, echo=_config.db_logging) - with Session(dbengine, expire_on_commit=False) as session: + time.sleep(_config.dsbmt_loop_intial_delay_secs) + dbengine = create_engine(_config.db_datasource, echo=_config.db_logging) + with Session(dbengine, expire_on_commit=False) as session: + while not self.disburse_loop_killed: + db_response = [] stmt = select(PaymentListItem) if ( _config.dsbmt_loop_filter_backend_name @@ -130,8 +133,7 @@ def disburse_loop(self): _logger.info( "GCTB Simple Mpesa - no records found in payment list table." ) - - time.sleep(_config.dsbmt_loop_interval_secs) + time.sleep(_config.dsbmt_loop_interval_secs) def disburse(self, payments: List[PaymentListItem], session: Session): try: @@ -213,13 +215,23 @@ def get_account_no_from_payee_fa(self, fa: str) -> str: Initializer as G2pConnectMapperInitializer, ) from openg2p_fastapi_common.app import Initializer -from openg2p_fastapi_common.ping import PingInitializer +from openg2p_fastapi_common.ping import PingController + + +class PingController(PingController): + async def get_ping(self): + res = await super().get_ping() + payment_backend = SimpleMpesaPaymentBackendService.get_component() + if not payment_backend.disburse_loop_thread.is_alive(): + raise BaseAppException("GCTB-MLP-700", "Disbursement Loop is not running") + return res class PaymentBackendInitializer(Initializer): def initialize(self, **kwargs): super().initialize(**kwargs) self.payment_backend = SimpleMpesaPaymentBackendService() + PingController().post_init() def init_db(self): pass @@ -235,5 +247,4 @@ async def fastapi_app_shutdown(self, app: FastAPI): main_init = PaymentBackendInitializer() G2pConnectMapperInitializer() TranslateIdInitializer() - PingInitializer() main_init.main() From 73ca7ae686fd8a1d4c5b80bef64dbf93e8b967b3 Mon Sep 17 00:00:00 2001 From: Lalith Kota Date: Thu, 21 Dec 2023 18:22:22 +0530 Subject: [PATCH 5/8] Fixed pq-dev package dependency in docker build --- Dockerfile | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Dockerfile b/Dockerfile index c6bedbc..7ea5eef 100644 --- a/Dockerfile +++ b/Dockerfile @@ -10,6 +10,9 @@ RUN groupadd -g ${container_user_gid} ${container_user_group} \ WORKDIR /app +RUN install_packages libpq-dev \ + && apt-get clean && rm -rf /var/lib/apt/lists /var/cache/apt/archives + RUN chown -R ${container_user}:${container_user_group} /app USER ${container_user} From 104721ee987a2e77f1d01f779076f3cdeed08417 Mon Sep 17 00:00:00 2001 From: Lalith Kota Date: Thu, 21 Dec 2023 23:41:04 +0530 Subject: [PATCH 6/8] Removed legacy mapping models --- .../src/g2p_cash_transfer_bridge_api/config.py | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/g2p-cash-transfer-bridge-api/src/g2p_cash_transfer_bridge_api/config.py b/g2p-cash-transfer-bridge-api/src/g2p_cash_transfer_bridge_api/config.py index 39f9184..0e60dc2 100644 --- a/g2p-cash-transfer-bridge-api/src/g2p_cash_transfer_bridge_api/config.py +++ b/g2p-cash-transfer-bridge-api/src/g2p_cash_transfer_bridge_api/config.py @@ -4,23 +4,6 @@ from pydantic import BaseModel, model_validator -class PayerFaPayeeFaMapping(BaseModel): - order: int - """ - Order of payer fa mapping - """ - - regex: str - """ - regex to match the payee fa - """ - - payer_fa: str - """ - Payer Fa , if the payee fa matches the given regex - """ - - class FaBackendMapping(BaseModel): order: int """ From 101eb370a204f7a8e9fb3ecf53dbdd280ec13b8c Mon Sep 17 00:00:00 2001 From: Lalith Kota Date: Thu, 21 Dec 2023 23:56:39 +0530 Subject: [PATCH 7/8] Added initial Db scripts --- db_scripts/0.1.0/ddl/01.types.sql | 0 db_scripts/0.1.0/ddl/02.tables.sql | 18 ++++++ db_scripts/0.1.0/ddl/03.sequences.sql | 9 +++ db_scripts/0.1.0/ddl/04.defaults.sql | 1 + db_scripts/0.1.0/ddl/05.constraints.sql | 2 + db_scripts/0.1.0/dml/01.data.sql | 0 db_scripts/README.md | 25 +++++++ db_scripts/deploy.sh | 86 +++++++++++++++++++++++++ 8 files changed, 141 insertions(+) create mode 100644 db_scripts/0.1.0/ddl/01.types.sql create mode 100644 db_scripts/0.1.0/ddl/02.tables.sql create mode 100644 db_scripts/0.1.0/ddl/03.sequences.sql create mode 100644 db_scripts/0.1.0/ddl/04.defaults.sql create mode 100644 db_scripts/0.1.0/ddl/05.constraints.sql create mode 100644 db_scripts/0.1.0/dml/01.data.sql create mode 100644 db_scripts/README.md create mode 100755 db_scripts/deploy.sh diff --git a/db_scripts/0.1.0/ddl/01.types.sql b/db_scripts/0.1.0/ddl/01.types.sql new file mode 100644 index 0000000..e69de29 diff --git a/db_scripts/0.1.0/ddl/02.tables.sql b/db_scripts/0.1.0/ddl/02.tables.sql new file mode 100644 index 0000000..b3ff41d --- /dev/null +++ b/db_scripts/0.1.0/ddl/02.tables.sql @@ -0,0 +1,18 @@ +CREATE TABLE payment_list ( + id integer NOT NULL, + batch_id character varying NOT NULL, + request_id character varying NOT NULL, + request_timestamp timestamp without time zone NOT NULL, + from_fa character varying, + to_fa character varying NOT NULL, + amount character varying NOT NULL, + currency character varying NOT NULL, + status character varying(4) NOT NULL, + file character varying, + error_code character varying(27), + error_msg character varying, + backend_name character varying, + created_at timestamp without time zone NOT NULL, + updated_at timestamp without time zone, + active boolean NOT NULL +); diff --git a/db_scripts/0.1.0/ddl/03.sequences.sql b/db_scripts/0.1.0/ddl/03.sequences.sql new file mode 100644 index 0000000..0c672bc --- /dev/null +++ b/db_scripts/0.1.0/ddl/03.sequences.sql @@ -0,0 +1,9 @@ +CREATE SEQUENCE payment_list_id_seq + AS integer + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; + +ALTER SEQUENCE payment_list_id_seq OWNED BY payment_list.id; diff --git a/db_scripts/0.1.0/ddl/04.defaults.sql b/db_scripts/0.1.0/ddl/04.defaults.sql new file mode 100644 index 0000000..5190d77 --- /dev/null +++ b/db_scripts/0.1.0/ddl/04.defaults.sql @@ -0,0 +1 @@ +ALTER TABLE ONLY payment_list ALTER COLUMN id SET DEFAULT nextval('payment_list_id_seq'::regclass); diff --git a/db_scripts/0.1.0/ddl/05.constraints.sql b/db_scripts/0.1.0/ddl/05.constraints.sql new file mode 100644 index 0000000..df61f1d --- /dev/null +++ b/db_scripts/0.1.0/ddl/05.constraints.sql @@ -0,0 +1,2 @@ +ALTER TABLE ONLY payment_list + ADD CONSTRAINT payment_list_pkey PRIMARY KEY (id); diff --git a/db_scripts/0.1.0/dml/01.data.sql b/db_scripts/0.1.0/dml/01.data.sql new file mode 100644 index 0000000..e69de29 diff --git a/db_scripts/README.md b/db_scripts/README.md new file mode 100644 index 0000000..f41f9b4 --- /dev/null +++ b/db_scripts/README.md @@ -0,0 +1,25 @@ +## Database Initialization Scripts + +### PostgreSQL + +- Create a new role/user called "gctbuser" and create a new database called "gctbdb", + with "gctbuser" as the owner. + No need to run this step if Postgres was installed through openg2p's deployment script. + ```sql + CREATE ROLE gctbuser WITH LOGIN NOSUPERUSER CREATEDB CREATEROLE INHERIT REPLICATION CONNECTION LIMIT -1 PASSWORD 'xxxxxx'; + CREATE DATABASE gctbdb WITH OWNER = gctbuser CONNECTION LIMIT = -1; + ``` +- Then run + ```sh + DB_HOST="openg2p.sandbox.net" \ + DB_USER_PASSWORD="xxxxxx" \ + ./deploy.sh + ``` + - The following optional Env vars can also be passed: + - `VERSION="1.0.0"` Do not set this if you want latest version. + - `DB_PORT="5432"` Default is 5432. + - `DB_NAME="mydb"` Default is gctbdb. + - `DB_USER="myuser"` Default is gctbuser. + - `DEPLOY_DDL="false"` Default is true. If false, will not run DDL scripts. + - `DEPLOY_DML="false"` Default is true. If false, will not run DML scripts. + - `LOG_DB_QUERY="true"` Default is false. Logs all Db queries. diff --git a/db_scripts/deploy.sh b/db_scripts/deploy.sh new file mode 100755 index 0000000..3d29227 --- /dev/null +++ b/db_scripts/deploy.sh @@ -0,0 +1,86 @@ +#!/usr/bin/env bash + +echoerr() { + echo "$@" 1>&2 +} + +get_scripts_path() { + dirname "$0" +} + +get_default_version() { + basename $(ls -d1 $(get_scripts_path)/*/ | tail -n 1) +} + +execute_script_in_folder() { + folder_path=$1 + for file in $folder_path/* ; do + if [ -d "$file" ]; then + execute_script_in_folder $file + elif [[ $file == *.sh ]]; then + bash $file + elif [[ $file == *.sql || $file == *.psql ]]; then + if [[ $LOG_DB_QUERY == "true" ]]; then + PGPASSWORD="$DB_USER_PASSWORD" \ + psql \ + -h $DB_HOST \ + -p $DB_PORT \ + -d $DB_NAME \ + -U $DB_USER \ + -a -c "$(envsubst < $file)" + else + PGPASSWORD="$DB_USER_PASSWORD" \ + psql \ + -h $DB_HOST \ + -p $DB_PORT \ + -d $DB_NAME \ + -U $DB_USER \ + -c "$(envsubst < $file)" + fi + fi + done +} + +if [ -z "$VERSION" ]; then + export VERSION=$(get_default_version) +else + export VERSION="${VERSION%/}" +fi +if [ -z "$DB_HOST" ]; then + echoerr "DB_HOST not given!" + exit 1 +fi +if [ -z "$DB_PORT" ]; then + export DB_PORT=5432 +fi +if [ -z "$DB_NAME" ]; then + export DB_NAME="gctbdb" +fi +if [ -z "$DB_USER" ]; then + export DB_USER="gctbuser" +fi +if [ -z "$DB_USER_PASSWORD" ]; then + echoerr "DB_USER_PASSWORD not given!" + exit 1 +fi +if [ -z "$DEPLOY_DDL" ]; then + export DEPLOY_DDL="true" +fi +if [ -z "$DEPLOY_DML" ]; then + export DEPLOY_DML="true" +fi +if [ -z "$LOG_DB_QUERY" ]; then + export LOG_DB_QUERY="false" +fi + +if ! [ -d "$(get_scripts_path)/$VERSION" ]; then + echoerr "Given Version not found!" + exit 1; +fi + +if [[ "$DEPLOY_DDL" == "true" ]]; then + execute_script_in_folder $(get_scripts_path)/$VERSION/ddl +fi +if [[ "$DEPLOY_DML" == "true" ]]; then + execute_script_in_folder $(get_scripts_path)/$VERSION/dml +fi From 2e6fc19ae3e39284c36b7f6bd7084eb3fe39c9cc Mon Sep 17 00:00:00 2001 From: Lalith Kota Date: Fri, 22 Dec 2023 00:36:22 +0530 Subject: [PATCH 8/8] Added basic docs for gctb apis --- db_scripts/0.1.0/ddl/05.constraints.sql | 4 +-- db_scripts/deploy.sh | 2 +- .../controllers/disbursement_controller.py | 30 +++++++++++++++++++ 3 files changed, 33 insertions(+), 3 deletions(-) diff --git a/db_scripts/0.1.0/ddl/05.constraints.sql b/db_scripts/0.1.0/ddl/05.constraints.sql index df61f1d..fdbb45a 100644 --- a/db_scripts/0.1.0/ddl/05.constraints.sql +++ b/db_scripts/0.1.0/ddl/05.constraints.sql @@ -1,2 +1,2 @@ -ALTER TABLE ONLY payment_list - ADD CONSTRAINT payment_list_pkey PRIMARY KEY (id); +ALTER TABLE ONLY payment_list + ADD CONSTRAINT payment_list_pkey PRIMARY KEY (id); diff --git a/db_scripts/deploy.sh b/db_scripts/deploy.sh index 3d29227..d28ca13 100755 --- a/db_scripts/deploy.sh +++ b/db_scripts/deploy.sh @@ -1,6 +1,6 @@ #!/usr/bin/env bash -echoerr() { +echoerr() { echo "$@" 1>&2 } diff --git a/g2p-cash-transfer-bridge-api/src/g2p_cash_transfer_bridge_api/controllers/disbursement_controller.py b/g2p-cash-transfer-bridge-api/src/g2p_cash_transfer_bridge_api/controllers/disbursement_controller.py index 4b46a66..f837856 100644 --- a/g2p-cash-transfer-bridge-api/src/g2p_cash_transfer_bridge_api/controllers/disbursement_controller.py +++ b/g2p-cash-transfer-bridge-api/src/g2p_cash_transfer_bridge_api/controllers/disbursement_controller.py @@ -42,6 +42,20 @@ def __init__(self, **kwargs): ) async def disburse_sync_disburse(self, request: DisburseHttpRequest): + """ + Make a disbursement request. (G2P Connect compliant API - sync). + - This API does NOT perform the entire disursement process synchronously. + It only receives the disbubrsement request and returns acknowledgement synchronously. + Use the status API to get the actual status of disbursement. + - The payee_fa field in message->disbursements[] can either be FA or ID of the payee, + depending on the bridge configuration. + - If bridge is configured to receive ID in payee_fa, then the bridge will translate ID + to FA using a G2P Connect ID Mapper before making payment + (Depends on the payment backend). + - The payer_fa field in message->disbursements[] is optional in this impl of bridge. + If payer_fa is not given, the bridge will take the default values configured + (Depends on the payment backend). + """ # Perform any extra validations here if not request.message.transaction_id: request.message.transaction_id = str(uuid.uuid4()) @@ -83,6 +97,22 @@ async def process_disbursement(): ) async def disburse_sync_txn_status(self, request: DisburseTxnStatusHttpRequest): + """ + Get status of a disbursement request. (G2P Connect compliant API - sync). + - The current supported value for txn_type in message->txnstatus_request is "disburse". + - The current supported values for attribute_type in message->txnstatus_request are + "transaction_id" and "reference_id_list". + - To get the status of a particular transaction, pass attribute_type as "transaction_id". + Then attribute_value in message->txnstatus_request expects a transaction id (string). + - To get the status of individual payments within transactions, pass attribute_type is + "reference_id_list". + Then attribute_value in message->txnstatus_request expects a list of reference + ids (payment ids, list of strings). + + Errors: + - Code: GCTB-PMS-350. HTTP: 400. Message: attribute_value is supposed to be a string. + - Code: GCTB-PMS-350. HTTP: 400. Message: attribute_value is supposed to be a list. + """ disburse_status_response = await self.payment_multiplexer.disbursement_status( request.message )