Add Redis lock implementation and Excel export
susilnem committed Sep 13, 2024
1 parent c8b2a3b commit da70bd9
Showing 10 changed files with 120 additions and 49 deletions.
4 changes: 2 additions & 2 deletions api/test_views.py
@@ -53,6 +53,8 @@ def test_guest_user_permission(self):
             "/api/v2/language/",
             f"/api/v2/language/{id}/",
             "/api/v2/event/",
+            "/api/v2/ops-learning/",
+            f"/api/v2/ops-learning/{id}/",
         ]
 
         go_post_apis = [
@@ -98,8 +100,6 @@ def test_guest_user_permission(self):
             f"/api/v2/flash-update/{id}/",
             "/api/v2/local-units/",
             f"/api/v2/local-units/{id}/",
-            "/api/v2/ops-learning/",
-            f"/api/v2/ops-learning/{id}/",
             f"/api/v2/pdf-export/{id}/",
             "/api/v2/per-assessment/",
             f"/api/v2/per-assessment/{id}/",
48 changes: 48 additions & 0 deletions main/lock.py
@@ -0,0 +1,48 @@
+import time
+import typing
+from contextlib import contextmanager
+
+from django.conf import settings
+from django.core.cache import caches
+from django.db import models
+from django_redis.client import DefaultClient
+
+cache: DefaultClient = caches["default"]
+
+
+class RedisLockKey(models.TextChoices):
+    """
+    Registry of key templates used to generate Redis lock ids
+    """
+
+    _BASE = "dj-lock"
+
+    OPERATION_LEARNING_SUMMARY = _BASE + "-operation-learning-summary-{0}"
+    OPERATION_LEARNING_SUMMARY_EXPORT = _BASE + "-operation-learning-summary-export-{0}"
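+    # e.g. RedisLockKey.OPERATION_LEARNING_SUMMARY.format(42)
+    # -> "dj-lock-operation-learning-summary-42"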
+
+
+@contextmanager
+def redis_lock(
+    key: RedisLockKey,
+    id: typing.Union[int, str],
+    lock_expire: int = settings.REDIS_DEFAULT_LOCK_EXPIRE,
+):
+    """
+    Locking mechanism using Redis
+    """
+    lock_id = key.format(id)
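+    # small safety margin: never release a key that has already expired
+    # (it may have been re-acquired by another worker)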
+    timeout_at = time.monotonic() + lock_expire - 3
+    # cache.add fails if the key already exists
+    status = cache.add(lock_id, 1, lock_expire)
+
+    try:
+        yield status
+    finally:
+        # cache.add() is atomic, which is what makes it usable as a lock
+        # acquire; the lock is released by deleting the key
+        if time.monotonic() < timeout_at and status:
+            # don't release the lock if we exceeded the timeout, to
+            # lessen the chance of releasing an expired lock now owned
+            # by someone else; also don't release a lock we never acquired
+            cache.delete(lock_id)
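
A minimal usage sketch (not part of this commit, assuming a configured django-redis cache): acquiring the same key twice yields False for the second holder, so callers can skip duplicate work.

from main.lock import RedisLockKey, redis_lock

with redis_lock(key=RedisLockKey.OPERATION_LEARNING_SUMMARY, id=42) as acquired:
    assert acquired  # first holder: cache.add() set the key
    with redis_lock(key=RedisLockKey.OPERATION_LEARNING_SUMMARY, id=42) as acquired_again:
        assert not acquired_again  # key already held, so skip the work
# leaving the outer block deletes the key, releasing the lock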
3 changes: 3 additions & 0 deletions main/settings.py
@@ -668,6 +668,9 @@ def log_render_extra_context(record):
     }
 }
 
+# Redis locking
+REDIS_DEFAULT_LOCK_EXPIRE = 60 * 10  # Lock expires in 10 min (in seconds)
+
 if env("CACHE_MIDDLEWARE_SECONDS"):
     CACHE_MIDDLEWARE_SECONDS = env("CACHE_MIDDLEWARE_SECONDS")  # Planned: 600 for staging, 60 for prod
     DISABLE_API_CACHE = env("DISABLE_API_CACHE")
8 changes: 4 additions & 4 deletions per/drf_views.py
@@ -20,7 +20,7 @@

 from api.models import Country
 from deployments.models import SectorTag
-from main.permissions import DenyGuestUserPermission
+from main.permissions import DenyGuestUserMutationPermission, DenyGuestUserPermission
 from main.utils import SpreadSheetContentNegotiation
 from per.cache import OpslearningSummaryCacheHelper
 from per.filter_set import (
@@ -751,8 +751,8 @@ class OpsLearningViewset(viewsets.ModelViewSet):
     A simple ViewSet for viewing and editing OpsLearning records.
     """
 
-    queryset = OpsLearning.objects.all()
-    permission_classes = [DenyGuestUserPermission, OpsLearningPermission]
+    queryset = OpsLearning.objects.all().distinct()
+    permission_classes = [DenyGuestUserMutationPermission, OpsLearningPermission]
     filterset_class = OpsLearningFilter
     search_fields = (
         "learning",
@@ -871,7 +871,7 @@ def get_renderer_context(self):
     @action(
         detail=False,
         methods=["GET"],
-        permission_classes=[permissions.AllowAny],
+        permission_classes=[DenyGuestUserMutationPermission, OpsLearningPermission],
         url_path="summary",
     )
     def summary(self, request):
15 changes: 14 additions & 1 deletion per/migrations/0121_opslearningcacheresponse_and_more.py
@@ -1,4 +1,4 @@
-# Generated by Django 4.2.15 on 2024-08-23 03:54
+# Generated by Django 4.2.15 on 2024-09-04 07:58
 
 import django.db.models.deletion
 from django.db import migrations, models
@@ -47,6 +47,19 @@ class Migration(migrations.Migration):
                 ("contradictory_reports", models.TextField(blank=True, null=True, verbose_name="contradictory reports")),
                 ("modified_at", models.DateTimeField(auto_now=True, verbose_name="modified_at")),
                 ("created_at", models.DateTimeField(auto_now_add=True, verbose_name="created at")),
+                (
+                    "export_status",
+                    models.IntegerField(
+                        choices=[(1, "Pending"), (2, "Success"), (3, "Failed")], default=1, verbose_name="export status"
+                    ),
+                ),
+                (
+                    "exported_file",
+                    models.FileField(
+                        blank=True, null=True, upload_to="ops-learning/summary/export/", verbose_name="exported file"
+                    ),
+                ),
+                ("exported_at", models.DateTimeField(blank=True, null=True, verbose_name="exported at")),
                 ("used_ops_learning", models.ManyToManyField(related_name="+", to="per.opslearning")),
             ],
         ),
16 changes: 16 additions & 0 deletions per/models.py
@@ -774,6 +774,11 @@ class Status(models.IntegerChoices):
     NO_EXTRACT_AVAILABLE = 4, _("No extract available")
     FAILED = 5, _("Failed")
 
+    class ExportStatus(models.IntegerChoices):
+        PENDING = 1, _("Pending")
+        SUCCESS = 2, _("Success")
+        FAILED = 3, _("Failed")
+
     used_filters_hash = models.CharField(verbose_name=_("used filters hash"), max_length=32)
     used_filters = models.JSONField(verbose_name=_("used filters"), default=dict)
 
@@ -805,6 +810,17 @@ class Status(models.IntegerChoices):
     modified_at = models.DateTimeField(verbose_name=_("modified_at"), auto_now=True)
     created_at = models.DateTimeField(verbose_name=_("created at"), auto_now_add=True)
 
+    # Caching for the exported file
+    export_status = models.IntegerField(
+        verbose_name=_("export status"),
+        choices=ExportStatus.choices,
+        default=ExportStatus.PENDING,
+    )
+    exported_file = models.FileField(
+        verbose_name=_("exported file"), upload_to="ops-learning/summary/export/", blank=True, null=True
+    )
+    exported_at = models.DateTimeField(verbose_name=_("exported at"), blank=True, null=True)
+
     def __str__(self) -> str:
         return self.used_filters_hash

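A short sketch of how an export task might drive these new fields once a workbook is built (mark_export_done and the file name are illustrative, not part of this commit):

from django.core.files.base import ContentFile
from django.utils import timezone

from per.models import OpsLearningCacheResponse


def mark_export_done(instance: OpsLearningCacheResponse, content: bytes) -> None:
    # attach the generated workbook and flip the export bookkeeping fields
    instance.exported_file.save("summary.xlsx", ContentFile(content), save=False)
    instance.export_status = OpsLearningCacheResponse.ExportStatus.SUCCESS
    instance.exported_at = timezone.now()
    instance.save(update_fields=["exported_file", "export_status", "exported_at"])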
18 changes: 6 additions & 12 deletions per/ops_learning_summary.py
@@ -6,8 +6,6 @@
 import pandas as pd
 import tiktoken
 from django.conf import settings
-
-# from django.db import transaction
 from django.db.models import F
 from django.utils.functional import cached_property
 from openai import AzureOpenAI
@@ -205,8 +203,8 @@ def add_used_ops_learnings_component(
     @classmethod
     def fetch_ops_learnings(self, filter_data):
         """Fetches the OPS learnings from the database."""
-        ops_learning_qs = (
-            OpsLearning.objects.filter(is_validated=True)
+        ops_learning_filtered_qs = (
+            OpsLearning.objects.filter(is_validated=True, **filter_data)
             .select_related(
                 "per_component_validated", "sector_validated", "appeal_code__country", "appeal_code__region", "appeal_code__dtype"
             )
@@ -223,9 +221,6 @@
                 dtype_name=F("appeal_code__dtype__name"),
             )
         )
-        from per.drf_views import OpsLearningFilter
-
-        ops_learning_filtered_qs = OpsLearningFilter(filter_data, queryset=ops_learning_qs).qs
         if not ops_learning_filtered_qs.exists():
             logger.info("No OPS learnings found for the given filter.")
             ops_learning_df = pd.DataFrame(
@@ -462,7 +457,8 @@ def _contextualize_learnings(df):
             prioritized_learnings = self.prioritize(
                 ops_learning_df, components_countries, components_regions, global_list, type_prioritization
             )
-            prioritized_learnings = ops_learning_df
+        else:
+            prioritized_learnings = ops_learning_df
         logger.info("Prioritization of components completed.")
         return prioritized_learnings
 
@@ -493,7 +489,7 @@ def primary_prioritize_excerpts(self, df: pd.DataFrame):
             df.drop_duplicates(subset="learning").sort_values(by="appeal_year", ascending=False).reset_index(drop=True)
         )
 
-        # Slice the Primary and secondary dataframes
+        # Slice the Primary DataFrame
         sliced_primary_learning_df = self.slice_dataframe(primary_learning_df, self.PROMPT_DATA_LENGTH_LIMIT, self.ENCODING_NAME)
         logger.info("Primary excerpts prioritized within token limit.")
         return sliced_primary_learning_df
@@ -517,7 +513,7 @@ def seconday_prioritize_excerpts(self, df: pd.DataFrame):
             pd.DataFrame(interleaved, columns=secondary_learning_df.columns).dropna(subset=["component"]).reset_index(drop=True)
         )
 
-        # Slice secondary dataframes
+        # Slice secondary DataFrame
         sliced_secondary_learning_df = self.slice_dataframe(result, self.PROMPT_DATA_LENGTH_LIMIT, self.ENCODING_NAME)
         logger.info("Excerpts prioritized within token limit.")
         return sliced_secondary_learning_df
@@ -531,8 +527,6 @@ def _build_intro_section(self):
             + "\n\n\n\n"
         )
 
-    # Adding the used extracts in primary insights
-
     @classmethod
     def _build_instruction_section(self, request_filter: dict, df: pd.DataFrame, instruction: str):
         """Builds the instruction section of the prompt based on the request filter and DataFrame."""
4 changes: 3 additions & 1 deletion per/serializers.py
@@ -1215,7 +1215,9 @@ class Meta:
         ]
 
     def get_total_extracts_count(self, obj) -> int:
-        return OpsLearning.objects.filter(is_validated=True).values("id").count()
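+        # local import (likely avoiding a circular import with per.drf_views)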
+        from per.drf_views import OpsLearningFilter
+
+        return OpsLearningFilter(obj.used_filters, queryset=OpsLearning.objects.filter(is_validated=True)).qs.count()
 
     def get_used_extracts_count(self, obj) -> int:
         return OpsLearning.objects.filter(id__in=obj.used_ops_learning.all()).values("id").count()
51 changes: 23 additions & 28 deletions per/task.py
@@ -1,24 +1,12 @@
 from celery import shared_task
 
 from api.logger import logger
+from main.lock import RedisLockKey, redis_lock
 from per.models import OpsLearningCacheResponse
 from per.ops_learning_summary import OpsLearningSummaryTask
 
 
-def validate_primary_summary_generation(filter_data: dict) -> bool:
-    """
-    Validates if primary summary generation is required or not
-    """
-    keys = {"appeal_code__country", "appeal_code__region"}
-    if any(key in filter_data for key in keys):
-        for key in keys:
-            filter_data.pop(key, None)
-        return bool(filter_data)
-    return False
-
-
 @shared_task
-def generate_summary(ops_learning_summary_id: int, filter_data: dict):
+def generate_ops_learning_summary(ops_learning_summary_id: int, filter_data: dict):
     ops_learning_summary_instance = OpsLearningCacheResponse.objects.filter(id=ops_learning_summary_id).first()
     if not ops_learning_summary_instance:
         logger.error("Ops learning summary not found", exc_info=True)
@@ -43,20 +31,18 @@ def generate_summary(ops_learning_summary_id: int, filter_data: dict):
         ops_learning_df=ops_learning_df, regional_list=regional_list, global_list=global_list, country_list=country_list
     )
 
-    # NOTE: Primary summary generation is only required if region, country and any other filter is provided
-    if validate_primary_summary_generation(filter_data):
-        # Prioritize excerpts for primary insights
-        primary_learning_df = OpsLearningSummaryTask.primary_prioritize_excerpts(prioritized_learnings)
-        # Format primary prompt
-        primary_learning_prompt = OpsLearningSummaryTask.format_primary_prompt(
-            ops_learning_summary_instance=ops_learning_summary_instance,
-            primary_learning_df=primary_learning_df,
-            filter_data=filter_data,
-        )
-        # Generate primary summary
-        OpsLearningSummaryTask.get_or_create_primary_summary(
-            ops_learning_summary_instance=ops_learning_summary_instance, primary_learning_prompt=primary_learning_prompt
-        )
+    # Prioritize excerpts for primary insights
+    primary_learning_df = OpsLearningSummaryTask.primary_prioritize_excerpts(prioritized_learnings)
+    # Format primary prompt
+    primary_learning_prompt = OpsLearningSummaryTask.format_primary_prompt(
+        ops_learning_summary_instance=ops_learning_summary_instance,
+        primary_learning_df=primary_learning_df,
+        filter_data=filter_data,
+    )
+    # Generate primary summary
+    OpsLearningSummaryTask.get_or_create_primary_summary(
+        ops_learning_summary_instance=ops_learning_summary_instance, primary_learning_prompt=primary_learning_prompt
+    )
 
     # Prioritize excerpts for secondary insights
     secondary_learning_df = OpsLearningSummaryTask.seconday_prioritize_excerpts(prioritized_learnings)
@@ -84,3 +70,12 @@ def generate_summary(ops_learning_summary_id: int, filter_data: dict):
     )
     logger.error("No extracts found", exc_info=True)
     return False
+
+
+@shared_task
+def generate_summary(ops_learning_summary_id: int, filter_data: dict):
+    with redis_lock(key=RedisLockKey.OPERATION_LEARNING_SUMMARY, id=ops_learning_summary_id) as acquired:
+        if not acquired:
+            logger.warning("Ops learning summary generation is already in progress")
+            return False
+        return generate_ops_learning_summary(ops_learning_summary_id, filter_data)
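
The commit registers RedisLockKey.OPERATION_LEARNING_SUMMARY_EXPORT but does not use it yet; a sketch of how an Excel-export task could mirror the same lock pattern (the task name and the export step are assumptions, not part of this commit):

@shared_task
def generate_summary_export(ops_learning_summary_id: int):
    with redis_lock(key=RedisLockKey.OPERATION_LEARNING_SUMMARY_EXPORT, id=ops_learning_summary_id) as acquired:
        if not acquired:
            logger.warning("Ops learning summary export is already in progress")
            return False
        # build the Excel file here, then update export_status,
        # exported_file and exported_at on the cache row
        ...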
2 changes: 1 addition & 1 deletion poetry.lock

Some generated files are not rendered by default.
