Skip to content

Commit

Permalink
OpenConceptLab/ocl_issues#2035 Fix new bulk import queuing
Browse files Browse the repository at this point in the history
  • Loading branch information
rkorytkowski committed Dec 13, 2024
1 parent 9548e2d commit 689fc03
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 17 deletions.
2 changes: 1 addition & 1 deletion core/code_systems/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ def update(self, instance, validated_data):
concepts = validated_data.pop('concepts', [])
new_version = validated_data.get('version')

if instance.version == new_version:
if instance.active_versions.filter(version=new_version):
self._errors.update({'version': f'Version {new_version} already exists for CodeSystem'
f' {instance.mnemonic}.'})
return instance
Expand Down
2 changes: 1 addition & 1 deletion core/concept_maps/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ def update(self, instance, validated_data):
mappings = validated_data.pop('mappings', [])
new_version = validated_data.get('version')

if instance.version == new_version:
if instance.active_versions.filter(version=new_version):
self._errors.update({'version': f'Version {new_version} already exists for CodeSystem'
f' {instance.mnemonic}.'})
return instance
Expand Down
65 changes: 51 additions & 14 deletions core/importers/importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from zipfile import ZipFile

from celery.result import AsyncResult, result_from_tuple
from celery import group, chain
from celery import group, chain, chord

import ijson
import requests
Expand All @@ -20,6 +20,7 @@
from pydantic import BaseModel, computed_field, PrivateAttr

from django.utils import timezone
from rest_framework.exceptions import ValidationError

from core import settings
from core.common.serializers import IdentifierSerializer
Expand All @@ -30,6 +31,7 @@
from core.importers.models import SourceImporter, SourceVersionImporter, ConceptImporter, OrganizationImporter, \
CollectionImporter, CollectionVersionImporter, MappingImporter, ReferenceImporter, CREATED, UPDATED, FAILED, \
DELETED, NOT_FOUND, PERMISSION_DENIED, UNCHANGED
from core.orgs.models import Organization
from core.sources.models import Source
from core.users.models import UserProfile
from core.collections.models import Collection
Expand Down Expand Up @@ -422,10 +424,12 @@ def schedule_tasks(self, tasks):
group_task['resource_type'], group_task['files'])
.set(queue='concurrent'))
if len(group_tasks) == 1: # Prevent celery from converting group to a single task
group_tasks.append(bulk_import_subtask_empty.si())
chained_tasks |= group(group_tasks)
group_tasks.append(bulk_import_subtask_empty.si().set(queue='concurrent'))

chained_tasks |= chord(group(group_tasks), bulk_import_subtask_empty.si().set(queue='concurrent'))
chained_tasks |= import_finisher.si(self.task_id).set(queue='concurrent')
final_task = chained_tasks.apply_async()

final_task = chained_tasks.apply_async(queue='concurrent')
return final_task

def is_importable_file(self, file_name):
Expand Down Expand Up @@ -507,10 +511,7 @@ def import_resource(self, resource, username, owner_type, owner):
# pylint: disable=too-many-arguments
@staticmethod
def import_concept_map(owner, owner_type, resource, resource_type, url, username):
source = Source.objects.filter(canonical_url=url)
if not source:
url = IdentifierSerializer.convert_fhir_url_to_ocl_uri(url, 'sources')
source = Source.objects.filter(uri=url)
source = ResourceImporter.find_existing_source(owner, owner_type, url)
context = {
'request': ImportRequest(owner_type, owner, username, resource_type)
}
Expand All @@ -524,13 +525,52 @@ def import_concept_map(owner, owner_type, resource, resource_type, url, username
serializer.save()
return serializer.errors if serializer.errors else result

@staticmethod
def find_existing_source(owner, owner_type, url):
org, user = None, None
if owner_type.lower() in ['orgs', 'organization']:
org = Organization.objects.filter(mnemonic=owner).first()
else:
user = UserProfile.objects.filter(username=owner).first()

if not org and not user:
raise ValidationError(f"Cannot find owner of type {owner_type} and id {owner}")

if org:
source = Source.objects.filter(canonical_url=url, organization=org)
else:
source = Source.objects.filter(canonical_url=url, user=user)
if not source:
url = IdentifierSerializer.convert_fhir_url_to_ocl_uri(url, 'sources')
if org:
source = Source.objects.filter(uri=url, organization=org)
else:
source = Source.objects.filter(uri=url, user=user)
return source

# pylint: disable=too-many-arguments
@staticmethod
def import_value_set(owner, owner_type, resource, resource_type, url, username):
collection = Collection.objects.filter(canonical_url=url)
org, user = None, None
if owner_type.lower() in ['orgs', 'organization']:
org = Organization.objects.filter(mnemonic=owner).first()
else:
user = UserProfile.objects.filter(username=owner).first()

if not org and not user:
raise ValidationError(f"Cannot find owner of type {owner_type} and id {owner}")

if org:
collection = Collection.objects.filter(canonical_url=url, organization=org)
else:
collection = Collection.objects.filter(canonical_url=url, user=user)

if not collection:
url = IdentifierSerializer.convert_fhir_url_to_ocl_uri(url, 'collections')
collection = Collection.objects.filter(uri=url)
if org:
collection = Collection.objects.filter(uri=url, organization=org)
else:
collection = Collection.objects.filter(uri=url, user=user)
context = {
'request': ImportRequest(owner_type, owner, username, resource_type)
}
Expand All @@ -547,10 +587,7 @@ def import_value_set(owner, owner_type, resource, resource_type, url, username):
# pylint: disable=too-many-arguments
@staticmethod
def import_code_system(owner, owner_type, resource, resource_type, url, username):
source = Source.objects.filter(canonical_url=url)
if not source:
url = IdentifierSerializer.convert_fhir_url_to_ocl_uri(url, 'sources')
source = Source.objects.filter(uri=url)
source = ResourceImporter.find_existing_source(owner, owner_type, url)
context = {
'request': ImportRequest(owner_type, owner, username, resource_type)
}
Expand Down
2 changes: 1 addition & 1 deletion core/value_sets/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ def update(self, instance, validated_data):
head_collection = instance.head
new_version = validated_data.get('version')

if instance.version == new_version:
if instance.active_versions.filter(version=new_version):
self._errors.update({'version': f'Version {new_version} already exists for CodeSystem'
f' {instance.mnemonic}.'})
return instance
Expand Down

0 comments on commit 689fc03

Please sign in to comment.