diff --git a/kolibri/core/auth/management/commands/resumesync.py b/kolibri/core/auth/management/commands/resumesync.py
index c95000c188c..d1fefd0b9f4 100644
--- a/kolibri/core/auth/management/commands/resumesync.py
+++ b/kolibri/core/auth/management/commands/resumesync.py
@@ -8,7 +8,9 @@ class Command(MorangoSyncCommand):
def add_arguments(self, parser):
parser.add_argument(
- "--id", type=str, help="ID of an incomplete session to resume sync"
+ "--sync-session-id",
+ type=str,
+ help="ID of an incomplete session to resume sync",
)
parser.add_argument(
"--baseurl", type=str, default=DATA_PORTAL_SYNCING_BASE_URL, dest="baseurl"
@@ -45,7 +47,7 @@ def add_arguments(self, parser):
def handle_async(self, *args, **options):
(baseurl, sync_session_id, chunk_size,) = (
options["baseurl"],
- options["id"],
+ options["sync_session_id"],
options["chunk_size"],
)
diff --git a/kolibri/core/auth/management/utils.py b/kolibri/core/auth/management/utils.py
index 5d36b0d04d5..c64529e75f9 100644
--- a/kolibri/core/auth/management/utils.py
+++ b/kolibri/core/auth/management/utils.py
@@ -418,6 +418,13 @@ def _sync(self, sync_session_client, **options): # noqa: C901
dataset_cache.clear()
dataset_cache.activate()
+ # add the sync session ID to the job (task) if it exists for retrying it
+ if self.job:
+ self.job.extra_metadata.update(
+ sync_session_id=sync_session_client.sync_session.id
+ )
+ self.job.save_meta()
+
if not noninteractive:
# output session ID for CLI user
logger.info("Session ID: {}".format(sync_session_client.sync_session.id))
@@ -435,7 +442,7 @@ def _sync(self, sync_session_client, **options): # noqa: C901
noninteractive,
pull_filter,
)
- # and push our own data to server
+ # and push our own data to server
if not no_push:
self._push(
sync_session_client,
diff --git a/kolibri/core/auth/tasks.py b/kolibri/core/auth/tasks.py
index b0cb8ab2677..163b023460c 100644
--- a/kolibri/core/auth/tasks.py
+++ b/kolibri/core/auth/tasks.py
@@ -13,6 +13,7 @@
from django.utils import timezone
from morango.errors import MorangoResumeSyncError
from morango.models import InstanceIDModel
+from morango.models.core import SyncSession
from requests.exceptions import ConnectionError
from rest_framework import serializers
from rest_framework import status
@@ -261,6 +262,26 @@ def validate(self, data):
"args": [data["command"]],
}
+ def validate_for_restart(self, job):
+ data = super(SyncJobValidator, self).validate_for_restart(job)
+
+ # find the sync_session_id the command added to the job metadata when it ran
+ sync_session_id = job.extra_metadata.get("sync_session_id")
+ if sync_session_id:
+ try:
+ SyncSession.objects.get(pk=sync_session_id, active=True)
+ except SyncSession.DoesNotExist:
+ sync_session_id = None
+
+ # if we didn't get an existing active sync_session_id,
+ # we'll fall back to default functionality
+ if sync_session_id:
+ kwargs = data.get("kwargs")
+ kwargs.update(sync_session_id=sync_session_id)
+ data.update(args=("resumesync",), kwargs=kwargs)
+
+ return data
+
facility_task_queue = "facility_task"
diff --git a/kolibri/core/auth/test/test_auth_tasks.py b/kolibri/core/auth/test/test_auth_tasks.py
index 2720e04bd30..403b50c6b7a 100644
--- a/kolibri/core/auth/test/test_auth_tasks.py
+++ b/kolibri/core/auth/test/test_auth_tasks.py
@@ -5,6 +5,7 @@
from django.urls import reverse
from mock import Mock
from mock import patch
+from morango.models.core import SyncSession
from requests.exceptions import ConnectionError
from rest_framework import serializers
from rest_framework.exceptions import AuthenticationFailed
@@ -29,6 +30,7 @@
from kolibri.core.public.constants.user_sync_statuses import QUEUED
from kolibri.core.public.constants.user_sync_statuses import SYNC
from kolibri.core.tasks.job import Job
+from kolibri.core.tasks.job import State
DUMMY_PASSWORD = "password"
@@ -41,6 +43,7 @@
traceback="",
percentage_progress=0,
cancellable=False,
+ track_progress=True,
extra_metadata={},
func="",
)
@@ -668,6 +671,46 @@ def test_validate_and_create_sync_credentials_no_credentials(
with self.assertRaises(PermissionDenied):
PeerFacilitySyncJobValidator(data=data).is_valid(raise_exception=True)
+ def test_validate_for_restart__not_restartable(self):
+ job = fake_job(state=State.RUNNING)
+ with self.assertRaises(serializers.ValidationError):
+ PeerFacilitySyncJobValidator(instance=job).data
+
+ def test_validate_for_restart__missing_sync_session(self):
+ job = fake_job(state=State.FAILED, args=("sync",), kwargs={"test": True})
+ new_job_data = PeerFacilitySyncJobValidator(instance=job).data
+ self.assertEqual(new_job_data["args"], ("sync",))
+ self.assertEqual(new_job_data["kwargs"], {"test": True})
+
+ @patch("kolibri.core.auth.tasks.SyncSession.objects.get")
+ def test_validate_for_restart__inactive_sync_session(self, mock_get):
+ job = fake_job(
+ state=State.FAILED,
+ args=("sync",),
+ kwargs={"test": True},
+ extra_metadata={"sync_session_id": "abc123"},
+ )
+ mock_get.side_effect = SyncSession.DoesNotExist
+ new_job_data = PeerFacilitySyncJobValidator(instance=job).data
+ mock_get.assert_called_once_with(pk="abc123", active=True)
+ self.assertEqual(new_job_data["args"], ("sync",))
+ self.assertEqual(new_job_data["kwargs"], {"test": True})
+
+ @patch("kolibri.core.auth.tasks.SyncSession.objects.get")
+ def test_validate_for_restart__resume(self, mock_get):
+ job = fake_job(
+ state=State.FAILED,
+ args=("sync",),
+ kwargs={"test": True},
+ extra_metadata={"sync_session_id": "abc123"},
+ )
+ new_job_data = PeerFacilitySyncJobValidator(instance=job).data
+ mock_get.assert_called_once_with(pk="abc123", active=True)
+ self.assertEqual(new_job_data["args"], ("resumesync",))
+ self.assertEqual(
+ new_job_data["kwargs"], {"test": True, "sync_session_id": "abc123"}
+ )
+
class TestRequestSoUDSync(TestCase):
def setUp(self):
diff --git a/kolibri/core/tasks/api.py b/kolibri/core/tasks/api.py
index ea7bb27ca70..53c90720674 100644
--- a/kolibri/core/tasks/api.py
+++ b/kolibri/core/tasks/api.py
@@ -9,7 +9,6 @@
from six import string_types
from kolibri.core.tasks.exceptions import JobNotFound
-from kolibri.core.tasks.exceptions import JobNotRestartable
from kolibri.core.tasks.job import State
from kolibri.core.tasks.main import job_storage
from kolibri.core.tasks.registry import TaskRegistry
@@ -185,13 +184,15 @@ def restart(self, request, pk=None):
job_to_restart = self._get_job_for_pk(request, pk)
- try:
- restarted_job_id = job_storage.restart_job(job_id=job_to_restart.job_id)
- except JobNotRestartable:
- raise serializers.ValidationError(
- "Cannot restart job with state: {}".format(job_to_restart.state)
- )
+ registered_task = TaskRegistry[job_to_restart.func]
+ job = registered_task.validate_job_restart(request.user, job_to_restart)
+ # delete existing task after validation
+ job_storage.clear(job_id=job_to_restart.job_id, force=False)
+
+ restarted_job_id = job_storage.enqueue_job(
+ job, queue=registered_task.queue, priority=registered_task.priority
+ )
job_response = self._job_to_response(
job_storage.get_job(job_id=restarted_job_id)
)
diff --git a/kolibri/core/tasks/exceptions.py b/kolibri/core/tasks/exceptions.py
index 10bfbb168ab..8f23bd541f3 100644
--- a/kolibri/core/tasks/exceptions.py
+++ b/kolibri/core/tasks/exceptions.py
@@ -11,7 +11,3 @@ class UserCancelledError(CancelledError):
class JobNotFound(Exception):
pass
-
-
-class JobNotRestartable(Exception):
- pass
diff --git a/kolibri/core/tasks/job.py b/kolibri/core/tasks/job.py
index 882eaac407f..bcb4470c15b 100644
--- a/kolibri/core/tasks/job.py
+++ b/kolibri/core/tasks/job.py
@@ -182,18 +182,6 @@ def from_json(cls, json_string):
return Job(func, **working_dictionary)
- @classmethod
- def from_job(cls, job, **kwargs):
- if not isinstance(job, cls):
- raise TypeError("job must be an instance of {}".format(cls))
- kwargs["args"] = copy.copy(job.args)
- kwargs["kwargs"] = copy.copy(job.kwargs)
- kwargs["track_progress"] = job.track_progress
- kwargs["cancellable"] = job.cancellable
- kwargs["extra_metadata"] = job.extra_metadata.copy()
- kwargs["facility_id"] = job.facility_id
- return cls(job.func, **kwargs)
-
def __init__(
self,
func,
diff --git a/kolibri/core/tasks/registry.py b/kolibri/core/tasks/registry.py
index 8cb76fdbb57..2a45ae518cd 100644
--- a/kolibri/core/tasks/registry.py
+++ b/kolibri/core/tasks/registry.py
@@ -229,6 +229,24 @@ def validate_job_data(self, user, data):
return job
+ def validate_job_restart(self, user, job):
+ """
+ :type user: kolibri.core.auth.models.FacilityUser
+ :type job: kolibri.core.tasks.job.Job
+ :return: A new job object for restarting
+ :rtype: kolibri.core.tasks.job.Job
+ """
+ validator = self.validator(instance=job, context={"user": user})
+
+ try:
+ job = self._ready_job(**validator.data)
+ except TypeError:
+ raise serializers.ValidationError(
+ "Invalid job data returned from validator."
+ )
+
+ return job
+
def enqueue(self, job=None, **job_kwargs):
"""
Enqueue the function with arguments passed to this method.
diff --git a/kolibri/core/tasks/storage.py b/kolibri/core/tasks/storage.py
index 589f0a04ee4..3f3e0934ee9 100644
--- a/kolibri/core/tasks/storage.py
+++ b/kolibri/core/tasks/storage.py
@@ -16,7 +16,6 @@
from kolibri.core.tasks.constants import DEFAULT_QUEUE
from kolibri.core.tasks.exceptions import JobNotFound
-from kolibri.core.tasks.exceptions import JobNotRestartable
from kolibri.core.tasks.job import Job
from kolibri.core.tasks.job import Priority
from kolibri.core.tasks.job import State
@@ -241,33 +240,6 @@ def get_job(self, job_id):
job, _ = self._get_job_and_orm_job(job_id, session)
return job
- def restart_job(self, job_id):
- """
- First deletes the job with id = job_id then enqueues a new job with the same
- job_id as the one we deleted, with same args and kwargs.
-
- Returns the job_id of enqueued job.
-
- Raises `JobNotRestartable` exception if the job with id = job_id state is
- not in CANCELED or FAILED.
- """
- with self.session_scope() as session:
- job_to_restart, orm_job = self._get_job_and_orm_job(job_id, session)
- queue = orm_job.queue
- priority = orm_job.priority
-
- if job_to_restart.state in [State.CANCELED, State.FAILED]:
- self.clear(job_id=job_to_restart.job_id, force=False)
- job = Job.from_job(
- job_to_restart,
- job_id=job_to_restart.job_id,
- )
- return self.enqueue_job(job, queue=queue, priority=priority)
- else:
- raise JobNotRestartable(
- "Cannot restart job with state={}".format(job_to_restart.state)
- )
-
def check_job_canceled(self, job_id):
job = self.get_job(job_id)
return job.state == State.CANCELED or job.state == State.CANCELING
diff --git a/kolibri/core/tasks/test/taskrunner/test_storage.py b/kolibri/core/tasks/test/taskrunner/test_storage.py
index 0a2da1d3f91..416c0fc34c2 100644
--- a/kolibri/core/tasks/test/taskrunner/test_storage.py
+++ b/kolibri/core/tasks/test/taskrunner/test_storage.py
@@ -2,10 +2,8 @@
import time
import pytest
-from mock import patch
from kolibri.core.tasks.decorators import register_task
-from kolibri.core.tasks.exceptions import JobNotRestartable
from kolibri.core.tasks.job import Job
from kolibri.core.tasks.job import Priority
from kolibri.core.tasks.job import State
@@ -142,27 +140,3 @@ def test_gets_oldest_high_priority_job_first(self, defaultbackend, simplejob):
defaultbackend.enqueue_job(simplejob, QUEUE, Priority.HIGH)
assert defaultbackend.get_next_queued_job().job_id == job_id
-
- def test_restart_job(self, defaultbackend, simplejob):
- with patch("kolibri.core.tasks.main.job_storage", wraps=defaultbackend):
- job_id = defaultbackend.enqueue_job(simplejob, QUEUE)
-
- for state in [
- State.COMPLETED,
- State.RUNNING,
- State.QUEUED,
- State.SCHEDULED,
- State.CANCELING,
- ]:
- defaultbackend._update_job(job_id, state)
- with pytest.raises(JobNotRestartable):
- defaultbackend.restart_job(job_id)
-
- for state in [State.CANCELED, State.FAILED]:
- defaultbackend._update_job(job_id, state)
-
- restarted_job_id = defaultbackend.restart_job(job_id)
- restarted_job = defaultbackend.get_job(restarted_job_id)
-
- assert restarted_job_id == job_id
- assert restarted_job.state == State.QUEUED
diff --git a/kolibri/core/tasks/test/test_api.py b/kolibri/core/tasks/test/test_api.py
index 3d8d12b250b..4f8f961dfef 100644
--- a/kolibri/core/tasks/test/test_api.py
+++ b/kolibri/core/tasks/test/test_api.py
@@ -726,15 +726,33 @@ def test_retrieval_404(self, mock_job_storage):
def test_restart_task(self, mock_job_storage):
self.client.login(username=self.facility2user.username, password=DUMMY_PASSWORD)
- mock_job_storage.restart_job.return_value = self.jobs[2].job_id
+ self.jobs[2].state = State.FAILED
mock_job_storage.get_job.return_value = self.jobs[2]
+ def _clear(**kwargs):
+ self.jobs[2].state = State.QUEUED
+
+ mock_job_storage.clear.side_effect = _clear
+
response = self.client.post(
reverse("kolibri:core:task-restart", kwargs={"pk": "2"}), format="json"
)
self.assertEqual(response.data, self.jobs_response[2])
- mock_job_storage.restart_job.assert_called_once_with(job_id="2")
+ mock_job_storage.clear.assert_called_once_with(job_id="2", force=False)
+
+ def test_restart_task__not_restartable(self, mock_job_storage):
+ self.client.login(username=self.facility2user.username, password=DUMMY_PASSWORD)
+
+ mock_job_storage.get_job.return_value = self.jobs[2]
+
+ response = self.client.post(
+ reverse("kolibri:core:task-restart", kwargs={"pk": "2"}), format="json"
+ )
+
+ self.assertEqual(response.status_code, 400)
+ self.assertEqual(str(response.data[0]), "Cannot restart job with state=QUEUED")
+ mock_job_storage.clear.assert_not_called()
def test_restart_task_respect_permissions(self, mock_job_storage):
self.client.login(username=self.facility2user.username, password=DUMMY_PASSWORD)
diff --git a/kolibri/core/tasks/test/test_job.py b/kolibri/core/tasks/test/test_job.py
index fceeed5e315..e1147947f79 100644
--- a/kolibri/core/tasks/test/test_job.py
+++ b/kolibri/core/tasks/test/test_job.py
@@ -53,10 +53,15 @@ def test_job_save_as_cancellable__no_storage(self):
self.job.save_as_cancellable(cancellable=cancellable)
+class TestingJobValidator(JobValidator):
+ pass
+
+
class TestRegisteredTask(TestCase):
def setUp(self):
self.registered_task = RegisteredTask(
int,
+ validator=TestingJobValidator,
priority=Priority.HIGH,
queue="test",
permission_classes=[IsSuperAdmin],
@@ -67,7 +72,7 @@ def setUp(self):
def test_constructor_sets_required_params(self):
self.assertEqual(self.registered_task.func, int)
- self.assertEqual(self.registered_task.validator, JobValidator)
+ self.assertEqual(self.registered_task.validator, TestingJobValidator)
self.assertEqual(self.registered_task.priority, Priority.HIGH)
self.assertTrue(isinstance(self.registered_task.permissions[0], IsSuperAdmin))
self.assertEqual(self.registered_task.job_id, "test")
@@ -153,3 +158,20 @@ def test_enqueue(self, job_storage_mock, _ready_job_mock):
queue=self.registered_task.queue,
priority=self.registered_task.priority,
)
+
+ @mock.patch("kolibri.core.tasks.registry.RegisteredTask._ready_job")
+ def test_validate_job_restart(self, _ready_job_mock):
+ mock_user = mock.MagicMock(spec="kolibri.core.auth.models.FacilityUser")
+ mock_job = mock.MagicMock(spec="kolibri.core.tasks.registry.Job")
+
+ _ready_job_mock.return_value = "job"
+
+ with mock.patch.object(
+ TestingJobValidator, "validate_for_restart"
+ ) as mock_validate_for_restart:
+ mock_validate_for_restart.return_value = {"test": True}
+ result = self.registered_task.validate_job_restart(mock_user, mock_job)
+ mock_validate_for_restart.assert_called_once_with(mock_job)
+
+ self.assertEqual(result, "job")
+ _ready_job_mock.assert_called_once_with(test=True)
diff --git a/kolibri/core/tasks/test/test_validation.py b/kolibri/core/tasks/test/test_validation.py
new file mode 100644
index 00000000000..a5244ca9247
--- /dev/null
+++ b/kolibri/core/tasks/test/test_validation.py
@@ -0,0 +1,48 @@
+from django.test import SimpleTestCase
+from rest_framework import serializers
+
+from kolibri.core.tasks.job import Job
+from kolibri.core.tasks.job import State
+from kolibri.core.tasks.validation import JobValidator
+
+
+class JobValidatorTestCase(SimpleTestCase):
+ def setUp(self):
+ def add(x, y):
+ return x + y
+
+ self.job = Job(
+ add,
+ job_id="123",
+ state=State.PENDING,
+ args=("test",),
+ kwargs={"test": True},
+ track_progress=True,
+ cancellable=False,
+ extra_metadata={"extra": True},
+ )
+
+ def test_validate_for_restart(self):
+ for state in [State.CANCELED, State.FAILED]:
+ self.job.state = state
+ validator = JobValidator(instance=self.job)
+ self.assertEqual(
+ validator.data,
+ dict(
+ job_id="123",
+ args=("test",),
+ kwargs={"test": True},
+ track_progress=True,
+ cancellable=False,
+ extra_metadata={"extra": True},
+ facility_id=None,
+ ),
+ )
+
+ def test_validate_for_restart__not_restartable(self):
+ for state in [State.QUEUED, State.COMPLETED, State.SCHEDULED, State.RUNNING]:
+ self.job.state = state
+ validator = JobValidator(instance=self.job)
+
+ with self.assertRaises(serializers.ValidationError):
+ self.assertFalse(validator.data)
diff --git a/kolibri/core/tasks/validation.py b/kolibri/core/tasks/validation.py
index af9186c72e5..a99046019dc 100644
--- a/kolibri/core/tasks/validation.py
+++ b/kolibri/core/tasks/validation.py
@@ -1,5 +1,9 @@
+import copy
+
from rest_framework import serializers
+from kolibri.core.tasks.job import State
+
class JobValidator(serializers.Serializer):
"""
@@ -19,6 +23,31 @@ def validate(self, data):
"extra_metadata": {},
}
+ def validate_for_restart(self, job):
+ """
+ :param job: The job for which to restart
+ :type job: kolibri.core.tasks.job.Job
+ :return: A dictionary of data for instantiating a new job
+ """
+ if job.state not in [State.CANCELED, State.FAILED]:
+ raise serializers.ValidationError(
+ "Cannot restart job with state={}".format(job.state)
+ )
+
+ return {
+ # default behavior is to retain the same job ID, so the existing job requires deletion
+ "job_id": job.job_id,
+ "args": copy.copy(job.args),
+ "kwargs": copy.copy(job.kwargs),
+ "track_progress": job.track_progress,
+ "cancellable": job.cancellable,
+ "extra_metadata": job.extra_metadata.copy(),
+ "facility_id": job.facility_id,
+ }
+
+ def to_representation(self, instance):
+ return self.validate_for_restart(instance or self.instance)
+
def run_validation(self, data):
value = super(JobValidator, self).run_validation(data)
if not isinstance(value, dict):
diff --git a/kolibri/plugins/device/assets/src/constants.js b/kolibri/plugins/device/assets/src/constants.js
index b9d57a09f3a..d67d7d41f57 100644
--- a/kolibri/plugins/device/assets/src/constants.js
+++ b/kolibri/plugins/device/assets/src/constants.js
@@ -55,6 +55,21 @@ export const TaskStatuses = Object.freeze({
CANCELING: 'CANCELING',
});
+export const SyncTaskStatuses = {
+ SESSION_CREATION: 'SESSION_CREATION',
+ REMOTE_QUEUING: 'REMOTE_QUEUING',
+ PULLING: 'PULLING',
+ LOCAL_DEQUEUING: 'LOCAL_DEQUEUING',
+ LOCAL_QUEUING: 'LOCAL_QUEUING',
+ PUSHING: 'PUSHING',
+ REMOTE_DEQUEUING: 'REMOTE_DEQUEUING',
+ REMOVING_FACILITY: 'REMOVING_FACILITY',
+ PENDING: 'PENDING',
+ COMPLETED: 'COMPLETED',
+ CANCELLED: 'CANCELLED',
+ FAILED: 'FAILED',
+};
+
export const TransferTypes = {
LOCALEXPORT: 'localexport',
LOCALIMPORT: 'localimport',
diff --git a/kolibri/plugins/device/assets/src/views/FacilitiesPage/FacilityTaskPanel.vue b/kolibri/plugins/device/assets/src/views/FacilitiesPage/FacilityTaskPanel.vue
index 500c42f5726..5bdabdd2592 100644
--- a/kolibri/plugins/device/assets/src/views/FacilitiesPage/FacilityTaskPanel.vue
+++ b/kolibri/plugins/device/assets/src/views/FacilitiesPage/FacilityTaskPanel.vue
@@ -1,13 +1,13 @@