Skip to content

Commit

Permalink
217 add functionality to send callback messages (#225)
Browse files Browse the repository at this point in the history
* ProgressReporter class added

* ProgressReporter.report_progress signature was changed

* help string updated for report_progress

* test_progress_report was added

* report_progress refactored, extra test added

* Update oda_api/api.py

Co-authored-by: Denys Savchenko <[email protected]>

* tests updated to address reviewer's comments

* ProgressReporter documentation was added

* bug fixing tests/test_progress_report.py

---------

Co-authored-by: Denys Savchenko <[email protected]>
  • Loading branch information
okolo and dsavchenko authored Jan 10, 2024
1 parent f8ce41a commit 961a316
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 0 deletions.
97 changes: 97 additions & 0 deletions doc/source/user_guide/ProgressReporter.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Progress reporting for long running tasks"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"from oda_api.api import ProgressReporter\n",
"import time"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Initialise ProgressReporter"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"pr = ProgressReporter()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Split your task into subtasks to enable progress report\n",
"use ProgressReporter.report_progress method to send progress reports"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"n_steps = 5\n",
"for step in range(n_steps):\n",
" stage = f\"simulation stage {step}\"\n",
" progress = 100 * step // n_steps\n",
" n_substeps = 10\n",
" \n",
" # optionally define subtasks\n",
" for substep in range(n_substeps):\n",
" substage = f\"subtask {substep}\"\n",
" subtask_progress = 100 * substep // n_substeps\n",
"\n",
" time.sleep(0.001) # replace this by actual calculation\n",
"\n",
" # report progress, optionally adding extra message\n",
" message='some message'\n",
" \n",
" pr.report_progress(stage=stage, progress=progress, substage=substage, subprogress=subtask_progress, message=message)"
]
}
],
"metadata": {
"kernel_info": {
"name": "python2"
},
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.18"
},
"nteract": {
"version": "0.15.0"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
2 changes: 2 additions & 0 deletions doc/source/user_guide/tutorial_main.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ The following tutorial covers a large fraction of the code features. The example

Upload a product to the Product Gallery <UploadToGallery.ipynb>

Progress reporting for long running tasks <ProgressReporter.ipynb>

Examples of workflows <https://github.com/cdcihub/oda_api_benchmark>


Expand Down
47 changes: 47 additions & 0 deletions oda_api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import requests
import ast
import json
import re

try:
# compatibility in some remaining environments
Expand Down Expand Up @@ -1339,3 +1340,49 @@ def from_response_json(cls, res_json, instrument, product):
p.meta_data = p.meta

return d

class ProgressReporter(object):
"""
The class allows to report task progress to end user
"""
def __init__(self):
self._callback = None
callback_file = ".oda_api_callback" # perhaps it would be better to define this constant in a common lib
if not os.path.isfile(callback_file):
return
with open(callback_file, 'r') as file:
self._callback = file.read().strip()

@property
def enabled(self):
return self._callback is not None

def report_progress(self, stage: str=None, progress: int=50, substage: str=None, subprogress: int=None, message:str=None):
"""
Report progress via callback URL
:param stage: current stage description string
:param progress: current stage progress in %
:param substage: current substage description string
:param subprogress: current substage progress in %
:param message: message to pass
"""
callback_payload = dict(stage=stage, progress=progress, substage=substage, subprogress=subprogress, message=message)
callback_payload = {k: v for k, v in callback_payload.items() if v is not None}
callback_payload['action'] = 'progress'
if not self.enabled:
logger.info('no callback registered, skipping')
return

logger.info('will perform callback: %s', self._callback)

if re.match('^file://', self._callback):
with open(self._callback.replace('file://', ''), "w") as f:
json.dump(callback_payload, f)
logger.info('stored callback in a file %s', self._callback)

elif re.match('^https?://', self._callback):
r = requests.get(self._callback, params=callback_payload)
logger.info('callback %s returns %s : %s', self._callback, r, r.text)

else:
raise NotImplementedError
43 changes: 43 additions & 0 deletions tests/test_progress_report.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from oda_api.api import ProgressReporter
import os
import json

callback_file = ".oda_api_callback"
request_params = dict(stage='simulation', progress=50, substage='spectra', subprogress=30, message='some message')

def test_progress_reporter_disabled():
if os.path.isfile(callback_file):
os.remove(callback_file)
# if callback is not available
pr = ProgressReporter()
assert not pr.enabled
# the call below should not produce exception
try:
pr.report_progress(**request_params)
except:
assert False, 'report_progress raises exception in case of disabled ProgressReporter'

def test_progress_reporter_enabled():
# if callback is available
try:
dump_file = 'callback'
with open(callback_file, 'w') as file:
print(f'file://{os.getcwd()}/{dump_file}', file=file)

pr = ProgressReporter()
assert pr.enabled

pr.report_progress(**request_params)

# verify that params passed to report_progress were saved to dump_file
with open(dump_file) as json_file:
saved_params = json.load(json_file)
# append extra param befor check
request_params['action'] = 'progress' # this key is added by report_progress
assert saved_params == request_params
finally:
if os.path.isfile(callback_file):
os.remove(callback_file)
if os.path.isfile(dump_file):
os.remove(dump_file)

0 comments on commit 961a316

Please sign in to comment.