diff --git a/backend/onyx/background/celery/configs/monitoring.py b/backend/onyx/background/celery/configs/monitoring.py index 90f7b889cce..5116fbd8c0a 100644 --- a/backend/onyx/background/celery/configs/monitoring.py +++ b/backend/onyx/background/celery/configs/monitoring.py @@ -17,5 +17,5 @@ # Monitoring worker specific settings worker_concurrency = 1 # Single worker is sufficient for monitoring -worker_pool = "solo" +worker_pool = "threads" worker_prefetch_multiplier = 1 diff --git a/backend/onyx/background/celery/tasks/monitoring/tasks.py b/backend/onyx/background/celery/tasks/monitoring/tasks.py index a78939ef036..d087f17440b 100644 --- a/backend/onyx/background/celery/tasks/monitoring/tasks.py +++ b/backend/onyx/background/celery/tasks/monitoring/tasks.py @@ -5,16 +5,18 @@ from celery import shared_task from celery import Task +from celery.exceptions import SoftTimeLimitExceeded from pydantic import BaseModel from redis import Redis +from redis.lock import Lock as RedisLock from sqlalchemy import select from sqlalchemy.orm import Session from onyx.background.celery.apps.app_base import task_logger from onyx.background.celery.tasks.vespa.tasks import celery_get_queue_length -from onyx.configs.app_configs import JOB_TIMEOUT from onyx.configs.constants import OnyxCeleryQueues from onyx.configs.constants import OnyxCeleryTask +from onyx.configs.constants import OnyxRedisLocks from onyx.db.engine import get_db_current_time from onyx.db.engine import get_session_with_tenant from onyx.db.enums import IndexingStatus @@ -28,6 +30,8 @@ from onyx.utils.telemetry import optional_telemetry from onyx.utils.telemetry import RecordType +_MONITORING_SOFT_TIME_LIMIT = 60 * 5 # 5 minutes +_MONITORING_TIME_LIMIT = _MONITORING_SOFT_TIME_LIMIT + 60 # 6 minutes _CONNECTOR_INDEX_ATTEMPT_START_LATENCY_KEY_FMT = ( "monitoring_connector_index_attempt_start_latency:{cc_pair_id}:{index_attempt_id}" @@ -385,7 +389,8 @@ def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric] @shared_task( name=OnyxCeleryTask.MONITOR_BACKGROUND_PROCESSES, - soft_time_limit=JOB_TIMEOUT, + soft_time_limit=_MONITORING_SOFT_TIME_LIMIT, + time_limit=_MONITORING_TIME_LIMIT, queue=OnyxCeleryQueues.MONITORING, bind=True, ) @@ -397,7 +402,18 @@ def monitor_background_processes(self: Task, *, tenant_id: str | None) -> None: - Syncing speed metrics - Worker status and task counts """ - task_logger.info("Starting background process monitoring") + task_logger.info("Starting background monitoring") + r = get_redis_client(tenant_id=tenant_id) + + lock_monitoring: RedisLock = r.lock( + OnyxRedisLocks.MONITOR_BACKGROUND_PROCESSES_LOCK, + timeout=_MONITORING_SOFT_TIME_LIMIT, + ) + + # these tasks should never overlap + if not lock_monitoring.acquire(blocking=False): + task_logger.info("Skipping monitoring task because it is already running") + return None try: # Get Redis client for Celery broker @@ -420,8 +436,16 @@ def monitor_background_processes(self: Task, *, tenant_id: str | None) -> None: if metric.key: _mark_metric_as_emitted(redis_std, metric.key) - task_logger.info("Successfully collected background process metrics") - + task_logger.info("Successfully collected background metrics") + except SoftTimeLimitExceeded: + task_logger.info( + "Soft time limit exceeded, task is being terminated gracefully." + ) except Exception as e: task_logger.exception("Error collecting background process metrics") raise e + finally: + if lock_monitoring.owned(): + lock_monitoring.release() + + task_logger.info("Background monitoring task finished") diff --git a/backend/onyx/configs/constants.py b/backend/onyx/configs/constants.py index a3d21bdc724..9f8cb97174b 100644 --- a/backend/onyx/configs/constants.py +++ b/backend/onyx/configs/constants.py @@ -278,6 +278,7 @@ class OnyxRedisLocks: "da_lock:check_connector_external_group_sync_beat" ) MONITOR_VESPA_SYNC_BEAT_LOCK = "da_lock:monitor_vespa_sync_beat" + MONITOR_BACKGROUND_PROCESSES_LOCK = "da_lock:monitor_background_processes" CONNECTOR_DOC_PERMISSIONS_SYNC_LOCK_PREFIX = ( "da_lock:connector_doc_permissions_sync" diff --git a/deployment/cloud_kubernetes/hpa/workers_hpa.yaml b/deployment/cloud_kubernetes/hpa/workers_hpa.yaml index fd24b9eeac3..5e35da9f088 100644 --- a/deployment/cloud_kubernetes/hpa/workers_hpa.yaml +++ b/deployment/cloud_kubernetes/hpa/workers_hpa.yaml @@ -54,3 +54,22 @@ spec: target: type: Utilization averageUtilization: 70 +--- +apiVersion: autoscaling/v2 +kind: HorizontalPodAutoscaler +metadata: + name: celery-worker-monitoring-hpa +spec: + scaleTargetRef: + apiVersion: apps/v1 + kind: Deployment + name: celery-worker-indexing + minReplicas: 1 + maxReplicas: 4 + metrics: + - type: Resource + resource: + name: cpu + target: + type: Utilization + averageUtilization: 70 diff --git a/deployment/cloud_kubernetes/workers/monitoring.yaml b/deployment/cloud_kubernetes/workers/monitoring.yaml new file mode 100644 index 00000000000..6ccb7c99745 --- /dev/null +++ b/deployment/cloud_kubernetes/workers/monitoring.yaml @@ -0,0 +1,62 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: celery-worker-monitoring +spec: + replicas: 2 + selector: + matchLabels: + app: celery-worker-monitoring + template: + metadata: + labels: + app: celery-worker-monitoring + spec: + containers: + - name: celery-worker-monitoring + image: onyxdotapp/onyx-backend-cloud:v0.14.0-cloud.beta.21 + imagePullPolicy: IfNotPresent + command: + [ + "celery", + "-A", + "onyx.background.celery.versioned_apps.monitoring", + "worker", + "--loglevel=INFO", + "--hostname=monitoring@%n", + "-Q", + "monitoring", + "--prefetch-multiplier=8", + "--concurrency=8", + ] + env: + - name: REDIS_PASSWORD + valueFrom: + secretKeyRef: + name: onyx-secrets + key: redis_password + - name: ONYX_VERSION + value: "v0.11.0-cloud.beta.8" + envFrom: + - configMapRef: + name: env-configmap + volumeMounts: + - name: vespa-certificates + mountPath: "/app/certs" + readOnly: true + resources: + requests: + cpu: "1000m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "1Gi" + volumes: + - name: vespa-certificates + secret: + secretName: vespa-certificates + items: + - key: cert.pem + path: cert.pem + - key: key.pem + path: key.pem