diff --git a/kolibri/core/auth/management/commands/resumesync.py b/kolibri/core/auth/management/commands/resumesync.py index c95000c188c..d1fefd0b9f4 100644 --- a/kolibri/core/auth/management/commands/resumesync.py +++ b/kolibri/core/auth/management/commands/resumesync.py @@ -8,7 +8,9 @@ class Command(MorangoSyncCommand): def add_arguments(self, parser): parser.add_argument( - "--id", type=str, help="ID of an incomplete session to resume sync" + "--sync-session-id", + type=str, + help="ID of an incomplete session to resume sync", ) parser.add_argument( "--baseurl", type=str, default=DATA_PORTAL_SYNCING_BASE_URL, dest="baseurl" @@ -45,7 +47,7 @@ def add_arguments(self, parser): def handle_async(self, *args, **options): (baseurl, sync_session_id, chunk_size,) = ( options["baseurl"], - options["id"], + options["sync_session_id"], options["chunk_size"], ) diff --git a/kolibri/core/auth/management/utils.py b/kolibri/core/auth/management/utils.py index 5d36b0d04d5..c64529e75f9 100644 --- a/kolibri/core/auth/management/utils.py +++ b/kolibri/core/auth/management/utils.py @@ -418,6 +418,13 @@ def _sync(self, sync_session_client, **options): # noqa: C901 dataset_cache.clear() dataset_cache.activate() + # add the sync session ID to the job (task) if it exists for retrying it + if self.job: + self.job.extra_metadata.update( + sync_session_id=sync_session_client.sync_session.id + ) + self.job.save_meta() + if not noninteractive: # output session ID for CLI user logger.info("Session ID: {}".format(sync_session_client.sync_session.id)) @@ -435,7 +442,7 @@ def _sync(self, sync_session_client, **options): # noqa: C901 noninteractive, pull_filter, ) - # and push our own data to server + # and push our own data to server if not no_push: self._push( sync_session_client, diff --git a/kolibri/core/auth/tasks.py b/kolibri/core/auth/tasks.py index b0cb8ab2677..163b023460c 100644 --- a/kolibri/core/auth/tasks.py +++ b/kolibri/core/auth/tasks.py @@ -13,6 +13,7 @@ from django.utils import timezone from morango.errors import MorangoResumeSyncError from morango.models import InstanceIDModel +from morango.models.core import SyncSession from requests.exceptions import ConnectionError from rest_framework import serializers from rest_framework import status @@ -261,6 +262,26 @@ def validate(self, data): "args": [data["command"]], } + def validate_for_restart(self, job): + data = super(SyncJobValidator, self).validate_for_restart(job) + + # find the sync_session_id the command added to the job metadata when it ran + sync_session_id = job.extra_metadata.get("sync_session_id") + if sync_session_id: + try: + SyncSession.objects.get(pk=sync_session_id, active=True) + except SyncSession.DoesNotExist: + sync_session_id = None + + # if we didn't get an existing active sync_session_id, + # we'll fall back to default functionality + if sync_session_id: + kwargs = data.get("kwargs") + kwargs.update(sync_session_id=sync_session_id) + data.update(args=("resumesync",), kwargs=kwargs) + + return data + facility_task_queue = "facility_task" diff --git a/kolibri/core/auth/test/test_auth_tasks.py b/kolibri/core/auth/test/test_auth_tasks.py index 2720e04bd30..403b50c6b7a 100644 --- a/kolibri/core/auth/test/test_auth_tasks.py +++ b/kolibri/core/auth/test/test_auth_tasks.py @@ -5,6 +5,7 @@ from django.urls import reverse from mock import Mock from mock import patch +from morango.models.core import SyncSession from requests.exceptions import ConnectionError from rest_framework import serializers from rest_framework.exceptions import AuthenticationFailed @@ -29,6 +30,7 @@ from kolibri.core.public.constants.user_sync_statuses import QUEUED from kolibri.core.public.constants.user_sync_statuses import SYNC from kolibri.core.tasks.job import Job +from kolibri.core.tasks.job import State DUMMY_PASSWORD = "password" @@ -41,6 +43,7 @@ traceback="", percentage_progress=0, cancellable=False, + track_progress=True, extra_metadata={}, func="", ) @@ -668,6 +671,46 @@ def test_validate_and_create_sync_credentials_no_credentials( with self.assertRaises(PermissionDenied): PeerFacilitySyncJobValidator(data=data).is_valid(raise_exception=True) + def test_validate_for_restart__not_restartable(self): + job = fake_job(state=State.RUNNING) + with self.assertRaises(serializers.ValidationError): + PeerFacilitySyncJobValidator(instance=job).data + + def test_validate_for_restart__missing_sync_session(self): + job = fake_job(state=State.FAILED, args=("sync",), kwargs={"test": True}) + new_job_data = PeerFacilitySyncJobValidator(instance=job).data + self.assertEqual(new_job_data["args"], ("sync",)) + self.assertEqual(new_job_data["kwargs"], {"test": True}) + + @patch("kolibri.core.auth.tasks.SyncSession.objects.get") + def test_validate_for_restart__inactive_sync_session(self, mock_get): + job = fake_job( + state=State.FAILED, + args=("sync",), + kwargs={"test": True}, + extra_metadata={"sync_session_id": "abc123"}, + ) + mock_get.side_effect = SyncSession.DoesNotExist + new_job_data = PeerFacilitySyncJobValidator(instance=job).data + mock_get.assert_called_once_with(pk="abc123", active=True) + self.assertEqual(new_job_data["args"], ("sync",)) + self.assertEqual(new_job_data["kwargs"], {"test": True}) + + @patch("kolibri.core.auth.tasks.SyncSession.objects.get") + def test_validate_for_restart__resume(self, mock_get): + job = fake_job( + state=State.FAILED, + args=("sync",), + kwargs={"test": True}, + extra_metadata={"sync_session_id": "abc123"}, + ) + new_job_data = PeerFacilitySyncJobValidator(instance=job).data + mock_get.assert_called_once_with(pk="abc123", active=True) + self.assertEqual(new_job_data["args"], ("resumesync",)) + self.assertEqual( + new_job_data["kwargs"], {"test": True, "sync_session_id": "abc123"} + ) + class TestRequestSoUDSync(TestCase): def setUp(self): diff --git a/kolibri/core/tasks/api.py b/kolibri/core/tasks/api.py index ea7bb27ca70..53c90720674 100644 --- a/kolibri/core/tasks/api.py +++ b/kolibri/core/tasks/api.py @@ -9,7 +9,6 @@ from six import string_types from kolibri.core.tasks.exceptions import JobNotFound -from kolibri.core.tasks.exceptions import JobNotRestartable from kolibri.core.tasks.job import State from kolibri.core.tasks.main import job_storage from kolibri.core.tasks.registry import TaskRegistry @@ -185,13 +184,15 @@ def restart(self, request, pk=None): job_to_restart = self._get_job_for_pk(request, pk) - try: - restarted_job_id = job_storage.restart_job(job_id=job_to_restart.job_id) - except JobNotRestartable: - raise serializers.ValidationError( - "Cannot restart job with state: {}".format(job_to_restart.state) - ) + registered_task = TaskRegistry[job_to_restart.func] + job = registered_task.validate_job_restart(request.user, job_to_restart) + # delete existing task after validation + job_storage.clear(job_id=job_to_restart.job_id, force=False) + + restarted_job_id = job_storage.enqueue_job( + job, queue=registered_task.queue, priority=registered_task.priority + ) job_response = self._job_to_response( job_storage.get_job(job_id=restarted_job_id) ) diff --git a/kolibri/core/tasks/exceptions.py b/kolibri/core/tasks/exceptions.py index 10bfbb168ab..8f23bd541f3 100644 --- a/kolibri/core/tasks/exceptions.py +++ b/kolibri/core/tasks/exceptions.py @@ -11,7 +11,3 @@ class UserCancelledError(CancelledError): class JobNotFound(Exception): pass - - -class JobNotRestartable(Exception): - pass diff --git a/kolibri/core/tasks/job.py b/kolibri/core/tasks/job.py index 882eaac407f..bcb4470c15b 100644 --- a/kolibri/core/tasks/job.py +++ b/kolibri/core/tasks/job.py @@ -182,18 +182,6 @@ def from_json(cls, json_string): return Job(func, **working_dictionary) - @classmethod - def from_job(cls, job, **kwargs): - if not isinstance(job, cls): - raise TypeError("job must be an instance of {}".format(cls)) - kwargs["args"] = copy.copy(job.args) - kwargs["kwargs"] = copy.copy(job.kwargs) - kwargs["track_progress"] = job.track_progress - kwargs["cancellable"] = job.cancellable - kwargs["extra_metadata"] = job.extra_metadata.copy() - kwargs["facility_id"] = job.facility_id - return cls(job.func, **kwargs) - def __init__( self, func, diff --git a/kolibri/core/tasks/registry.py b/kolibri/core/tasks/registry.py index 8cb76fdbb57..2a45ae518cd 100644 --- a/kolibri/core/tasks/registry.py +++ b/kolibri/core/tasks/registry.py @@ -229,6 +229,24 @@ def validate_job_data(self, user, data): return job + def validate_job_restart(self, user, job): + """ + :type user: kolibri.core.auth.models.FacilityUser + :type job: kolibri.core.tasks.job.Job + :return: A new job object for restarting + :rtype: kolibri.core.tasks.job.Job + """ + validator = self.validator(instance=job, context={"user": user}) + + try: + job = self._ready_job(**validator.data) + except TypeError: + raise serializers.ValidationError( + "Invalid job data returned from validator." + ) + + return job + def enqueue(self, job=None, **job_kwargs): """ Enqueue the function with arguments passed to this method. diff --git a/kolibri/core/tasks/storage.py b/kolibri/core/tasks/storage.py index 589f0a04ee4..3f3e0934ee9 100644 --- a/kolibri/core/tasks/storage.py +++ b/kolibri/core/tasks/storage.py @@ -16,7 +16,6 @@ from kolibri.core.tasks.constants import DEFAULT_QUEUE from kolibri.core.tasks.exceptions import JobNotFound -from kolibri.core.tasks.exceptions import JobNotRestartable from kolibri.core.tasks.job import Job from kolibri.core.tasks.job import Priority from kolibri.core.tasks.job import State @@ -241,33 +240,6 @@ def get_job(self, job_id): job, _ = self._get_job_and_orm_job(job_id, session) return job - def restart_job(self, job_id): - """ - First deletes the job with id = job_id then enqueues a new job with the same - job_id as the one we deleted, with same args and kwargs. - - Returns the job_id of enqueued job. - - Raises `JobNotRestartable` exception if the job with id = job_id state is - not in CANCELED or FAILED. - """ - with self.session_scope() as session: - job_to_restart, orm_job = self._get_job_and_orm_job(job_id, session) - queue = orm_job.queue - priority = orm_job.priority - - if job_to_restart.state in [State.CANCELED, State.FAILED]: - self.clear(job_id=job_to_restart.job_id, force=False) - job = Job.from_job( - job_to_restart, - job_id=job_to_restart.job_id, - ) - return self.enqueue_job(job, queue=queue, priority=priority) - else: - raise JobNotRestartable( - "Cannot restart job with state={}".format(job_to_restart.state) - ) - def check_job_canceled(self, job_id): job = self.get_job(job_id) return job.state == State.CANCELED or job.state == State.CANCELING diff --git a/kolibri/core/tasks/test/taskrunner/test_storage.py b/kolibri/core/tasks/test/taskrunner/test_storage.py index 0a2da1d3f91..416c0fc34c2 100644 --- a/kolibri/core/tasks/test/taskrunner/test_storage.py +++ b/kolibri/core/tasks/test/taskrunner/test_storage.py @@ -2,10 +2,8 @@ import time import pytest -from mock import patch from kolibri.core.tasks.decorators import register_task -from kolibri.core.tasks.exceptions import JobNotRestartable from kolibri.core.tasks.job import Job from kolibri.core.tasks.job import Priority from kolibri.core.tasks.job import State @@ -142,27 +140,3 @@ def test_gets_oldest_high_priority_job_first(self, defaultbackend, simplejob): defaultbackend.enqueue_job(simplejob, QUEUE, Priority.HIGH) assert defaultbackend.get_next_queued_job().job_id == job_id - - def test_restart_job(self, defaultbackend, simplejob): - with patch("kolibri.core.tasks.main.job_storage", wraps=defaultbackend): - job_id = defaultbackend.enqueue_job(simplejob, QUEUE) - - for state in [ - State.COMPLETED, - State.RUNNING, - State.QUEUED, - State.SCHEDULED, - State.CANCELING, - ]: - defaultbackend._update_job(job_id, state) - with pytest.raises(JobNotRestartable): - defaultbackend.restart_job(job_id) - - for state in [State.CANCELED, State.FAILED]: - defaultbackend._update_job(job_id, state) - - restarted_job_id = defaultbackend.restart_job(job_id) - restarted_job = defaultbackend.get_job(restarted_job_id) - - assert restarted_job_id == job_id - assert restarted_job.state == State.QUEUED diff --git a/kolibri/core/tasks/test/test_api.py b/kolibri/core/tasks/test/test_api.py index 3d8d12b250b..4f8f961dfef 100644 --- a/kolibri/core/tasks/test/test_api.py +++ b/kolibri/core/tasks/test/test_api.py @@ -726,15 +726,33 @@ def test_retrieval_404(self, mock_job_storage): def test_restart_task(self, mock_job_storage): self.client.login(username=self.facility2user.username, password=DUMMY_PASSWORD) - mock_job_storage.restart_job.return_value = self.jobs[2].job_id + self.jobs[2].state = State.FAILED mock_job_storage.get_job.return_value = self.jobs[2] + def _clear(**kwargs): + self.jobs[2].state = State.QUEUED + + mock_job_storage.clear.side_effect = _clear + response = self.client.post( reverse("kolibri:core:task-restart", kwargs={"pk": "2"}), format="json" ) self.assertEqual(response.data, self.jobs_response[2]) - mock_job_storage.restart_job.assert_called_once_with(job_id="2") + mock_job_storage.clear.assert_called_once_with(job_id="2", force=False) + + def test_restart_task__not_restartable(self, mock_job_storage): + self.client.login(username=self.facility2user.username, password=DUMMY_PASSWORD) + + mock_job_storage.get_job.return_value = self.jobs[2] + + response = self.client.post( + reverse("kolibri:core:task-restart", kwargs={"pk": "2"}), format="json" + ) + + self.assertEqual(response.status_code, 400) + self.assertEqual(str(response.data[0]), "Cannot restart job with state=QUEUED") + mock_job_storage.clear.assert_not_called() def test_restart_task_respect_permissions(self, mock_job_storage): self.client.login(username=self.facility2user.username, password=DUMMY_PASSWORD) diff --git a/kolibri/core/tasks/test/test_job.py b/kolibri/core/tasks/test/test_job.py index fceeed5e315..e1147947f79 100644 --- a/kolibri/core/tasks/test/test_job.py +++ b/kolibri/core/tasks/test/test_job.py @@ -53,10 +53,15 @@ def test_job_save_as_cancellable__no_storage(self): self.job.save_as_cancellable(cancellable=cancellable) +class TestingJobValidator(JobValidator): + pass + + class TestRegisteredTask(TestCase): def setUp(self): self.registered_task = RegisteredTask( int, + validator=TestingJobValidator, priority=Priority.HIGH, queue="test", permission_classes=[IsSuperAdmin], @@ -67,7 +72,7 @@ def setUp(self): def test_constructor_sets_required_params(self): self.assertEqual(self.registered_task.func, int) - self.assertEqual(self.registered_task.validator, JobValidator) + self.assertEqual(self.registered_task.validator, TestingJobValidator) self.assertEqual(self.registered_task.priority, Priority.HIGH) self.assertTrue(isinstance(self.registered_task.permissions[0], IsSuperAdmin)) self.assertEqual(self.registered_task.job_id, "test") @@ -153,3 +158,20 @@ def test_enqueue(self, job_storage_mock, _ready_job_mock): queue=self.registered_task.queue, priority=self.registered_task.priority, ) + + @mock.patch("kolibri.core.tasks.registry.RegisteredTask._ready_job") + def test_validate_job_restart(self, _ready_job_mock): + mock_user = mock.MagicMock(spec="kolibri.core.auth.models.FacilityUser") + mock_job = mock.MagicMock(spec="kolibri.core.tasks.registry.Job") + + _ready_job_mock.return_value = "job" + + with mock.patch.object( + TestingJobValidator, "validate_for_restart" + ) as mock_validate_for_restart: + mock_validate_for_restart.return_value = {"test": True} + result = self.registered_task.validate_job_restart(mock_user, mock_job) + mock_validate_for_restart.assert_called_once_with(mock_job) + + self.assertEqual(result, "job") + _ready_job_mock.assert_called_once_with(test=True) diff --git a/kolibri/core/tasks/test/test_validation.py b/kolibri/core/tasks/test/test_validation.py new file mode 100644 index 00000000000..a5244ca9247 --- /dev/null +++ b/kolibri/core/tasks/test/test_validation.py @@ -0,0 +1,48 @@ +from django.test import SimpleTestCase +from rest_framework import serializers + +from kolibri.core.tasks.job import Job +from kolibri.core.tasks.job import State +from kolibri.core.tasks.validation import JobValidator + + +class JobValidatorTestCase(SimpleTestCase): + def setUp(self): + def add(x, y): + return x + y + + self.job = Job( + add, + job_id="123", + state=State.PENDING, + args=("test",), + kwargs={"test": True}, + track_progress=True, + cancellable=False, + extra_metadata={"extra": True}, + ) + + def test_validate_for_restart(self): + for state in [State.CANCELED, State.FAILED]: + self.job.state = state + validator = JobValidator(instance=self.job) + self.assertEqual( + validator.data, + dict( + job_id="123", + args=("test",), + kwargs={"test": True}, + track_progress=True, + cancellable=False, + extra_metadata={"extra": True}, + facility_id=None, + ), + ) + + def test_validate_for_restart__not_restartable(self): + for state in [State.QUEUED, State.COMPLETED, State.SCHEDULED, State.RUNNING]: + self.job.state = state + validator = JobValidator(instance=self.job) + + with self.assertRaises(serializers.ValidationError): + self.assertFalse(validator.data) diff --git a/kolibri/core/tasks/validation.py b/kolibri/core/tasks/validation.py index af9186c72e5..a99046019dc 100644 --- a/kolibri/core/tasks/validation.py +++ b/kolibri/core/tasks/validation.py @@ -1,5 +1,9 @@ +import copy + from rest_framework import serializers +from kolibri.core.tasks.job import State + class JobValidator(serializers.Serializer): """ @@ -19,6 +23,31 @@ def validate(self, data): "extra_metadata": {}, } + def validate_for_restart(self, job): + """ + :param job: The job for which to restart + :type job: kolibri.core.tasks.job.Job + :return: A dictionary of data for instantiating a new job + """ + if job.state not in [State.CANCELED, State.FAILED]: + raise serializers.ValidationError( + "Cannot restart job with state={}".format(job.state) + ) + + return { + # default behavior is to retain the same job ID, so the existing job requires deletion + "job_id": job.job_id, + "args": copy.copy(job.args), + "kwargs": copy.copy(job.kwargs), + "track_progress": job.track_progress, + "cancellable": job.cancellable, + "extra_metadata": job.extra_metadata.copy(), + "facility_id": job.facility_id, + } + + def to_representation(self, instance): + return self.validate_for_restart(instance or self.instance) + def run_validation(self, data): value = super(JobValidator, self).run_validation(data) if not isinstance(value, dict): diff --git a/kolibri/plugins/device/assets/src/constants.js b/kolibri/plugins/device/assets/src/constants.js index b9d57a09f3a..d67d7d41f57 100644 --- a/kolibri/plugins/device/assets/src/constants.js +++ b/kolibri/plugins/device/assets/src/constants.js @@ -55,6 +55,21 @@ export const TaskStatuses = Object.freeze({ CANCELING: 'CANCELING', }); +export const SyncTaskStatuses = { + SESSION_CREATION: 'SESSION_CREATION', + REMOTE_QUEUING: 'REMOTE_QUEUING', + PULLING: 'PULLING', + LOCAL_DEQUEUING: 'LOCAL_DEQUEUING', + LOCAL_QUEUING: 'LOCAL_QUEUING', + PUSHING: 'PUSHING', + REMOTE_DEQUEUING: 'REMOTE_DEQUEUING', + REMOVING_FACILITY: 'REMOVING_FACILITY', + PENDING: 'PENDING', + COMPLETED: 'COMPLETED', + CANCELLED: 'CANCELLED', + FAILED: 'FAILED', +}; + export const TransferTypes = { LOCALEXPORT: 'localexport', LOCALIMPORT: 'localimport', diff --git a/kolibri/plugins/device/assets/src/views/FacilitiesPage/FacilityTaskPanel.vue b/kolibri/plugins/device/assets/src/views/FacilitiesPage/FacilityTaskPanel.vue index 500c42f5726..5bdabdd2592 100644 --- a/kolibri/plugins/device/assets/src/views/FacilitiesPage/FacilityTaskPanel.vue +++ b/kolibri/plugins/device/assets/src/views/FacilitiesPage/FacilityTaskPanel.vue @@ -1,13 +1,13 @@