diff --git a/doc/source/user_guide/ProgressReporter.ipynb b/doc/source/user_guide/ProgressReporter.ipynb new file mode 100644 index 00000000..f399459f --- /dev/null +++ b/doc/source/user_guide/ProgressReporter.ipynb @@ -0,0 +1,97 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Progress reporting for long running tasks" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "from oda_api.api import ProgressReporter\n", + "import time" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Initialise ProgressReporter" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "pr = ProgressReporter()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Split your task into subtasks to enable progress report\n", + "use ProgressReporter.report_progress method to send progress reports" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "n_steps = 5\n", + "for step in range(n_steps):\n", + " stage = f\"simulation stage {step}\"\n", + " progress = 100 * step // n_steps\n", + " n_substeps = 10\n", + " \n", + " # optionally define subtasks\n", + " for substep in range(n_substeps):\n", + " substage = f\"subtask {substep}\"\n", + " subtask_progress = 100 * substep // n_substeps\n", + "\n", + " time.sleep(0.001) # replace this by actual calculation\n", + "\n", + " # report progress, optionally adding extra message\n", + " message='some message'\n", + " \n", + " pr.report_progress(stage=stage, progress=progress, substage=substage, subprogress=subtask_progress, message=message)" + ] + } + ], + "metadata": { + "kernel_info": { + "name": "python2" + }, + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.18" + }, + "nteract": { + "version": "0.15.0" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/doc/source/user_guide/tutorial_main.rst b/doc/source/user_guide/tutorial_main.rst index 514b5dbd..841de97f 100644 --- a/doc/source/user_guide/tutorial_main.rst +++ b/doc/source/user_guide/tutorial_main.rst @@ -25,6 +25,8 @@ The following tutorial covers a large fraction of the code features. The example Upload a product to the Product Gallery + Progress reporting for long running tasks + Examples of workflows diff --git a/oda_api/api.py b/oda_api/api.py index f5b5d6d6..4dc2dc2c 100644 --- a/oda_api/api.py +++ b/oda_api/api.py @@ -36,6 +36,7 @@ import requests import ast import json +import re try: # compatibility in some remaining environments @@ -1339,3 +1340,49 @@ def from_response_json(cls, res_json, instrument, product): p.meta_data = p.meta return d + +class ProgressReporter(object): + """ + The class allows to report task progress to end user + """ + def __init__(self): + self._callback = None + callback_file = ".oda_api_callback" # perhaps it would be better to define this constant in a common lib + if not os.path.isfile(callback_file): + return + with open(callback_file, 'r') as file: + self._callback = file.read().strip() + + @property + def enabled(self): + return self._callback is not None + + def report_progress(self, stage: str=None, progress: int=50, substage: str=None, subprogress: int=None, message:str=None): + """ + Report progress via callback URL + :param stage: current stage description string + :param progress: current stage progress in % + :param substage: current substage description string + :param subprogress: current substage progress in % + :param message: message to pass + """ + callback_payload = dict(stage=stage, progress=progress, substage=substage, subprogress=subprogress, message=message) + callback_payload = {k: v for k, v in callback_payload.items() if v is not None} + callback_payload['action'] = 'progress' + if not self.enabled: + logger.info('no callback registered, skipping') + return + + logger.info('will perform callback: %s', self._callback) + + if re.match('^file://', self._callback): + with open(self._callback.replace('file://', ''), "w") as f: + json.dump(callback_payload, f) + logger.info('stored callback in a file %s', self._callback) + + elif re.match('^https?://', self._callback): + r = requests.get(self._callback, params=callback_payload) + logger.info('callback %s returns %s : %s', self._callback, r, r.text) + + else: + raise NotImplementedError diff --git a/tests/test_progress_report.py b/tests/test_progress_report.py new file mode 100644 index 00000000..4c3079dd --- /dev/null +++ b/tests/test_progress_report.py @@ -0,0 +1,43 @@ +from oda_api.api import ProgressReporter +import os +import json + +callback_file = ".oda_api_callback" +request_params = dict(stage='simulation', progress=50, substage='spectra', subprogress=30, message='some message') + +def test_progress_reporter_disabled(): + if os.path.isfile(callback_file): + os.remove(callback_file) + # if callback is not available + pr = ProgressReporter() + assert not pr.enabled + # the call below should not produce exception + try: + pr.report_progress(**request_params) + except: + assert False, 'report_progress raises exception in case of disabled ProgressReporter' + +def test_progress_reporter_enabled(): + # if callback is available + try: + dump_file = 'callback' + with open(callback_file, 'w') as file: + print(f'file://{os.getcwd()}/{dump_file}', file=file) + + pr = ProgressReporter() + assert pr.enabled + + pr.report_progress(**request_params) + + # verify that params passed to report_progress were saved to dump_file + with open(dump_file) as json_file: + saved_params = json.load(json_file) + # append extra param befor check + request_params['action'] = 'progress' # this key is added by report_progress + assert saved_params == request_params + finally: + if os.path.isfile(callback_file): + os.remove(callback_file) + if os.path.isfile(dump_file): + os.remove(dump_file) +