Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

217 add functionality to send callback messages #225

Merged
merged 11 commits into from
Jan 10, 2024
97 changes: 97 additions & 0 deletions doc/source/user_guide/ProgressReporter.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Progress reporting for long running tasks"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"from oda_api.api import ProgressReporter\n",
"import time"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Initialise ProgressReporter"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"pr = ProgressReporter()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Split your task into subtasks to enable progress report\n",
"use ProgressReporter.report_progress method to send progress reports"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"n_steps = 5\n",
"for step in range(n_steps):\n",
" stage = f\"simulation stage {step}\"\n",
" progress = 100 * step // n_steps\n",
" n_substeps = 10\n",
" \n",
" # optionally define subtasks\n",
" for substep in range(n_substeps):\n",
" substage = f\"subtask {substep}\"\n",
" subtask_progress = 100 * substep // n_substeps\n",
"\n",
" time.sleep(0.001) # replace this by actual calculation\n",
"\n",
" # report progress, optionally adding extra message\n",
" message='some message'\n",
" \n",
" pr.report_progress(stage=stage, progress=progress, substage=substage, subprogress=subtask_progress, message=message)"
]
}
],
"metadata": {
"kernel_info": {
"name": "python2"
},
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.18"
},
"nteract": {
"version": "0.15.0"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
2 changes: 2 additions & 0 deletions doc/source/user_guide/tutorial_main.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ The following tutorial covers a large fraction of the code features. The example

Upload a product to the Product Gallery <UploadToGallery.ipynb>

Progress reporting for long running tasks <ProgressReporter.ipynb>

Examples of workflows <https://github.com/cdcihub/oda_api_benchmark>


Expand Down
47 changes: 47 additions & 0 deletions oda_api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import requests
import ast
import json
import re

try:
# compatibility in some remaining environments
Expand Down Expand Up @@ -1339,3 +1340,49 @@
p.meta_data = p.meta

return d

class ProgressReporter(object):
"""
The class allows to report task progress to end user
"""
def __init__(self):
self._callback = None
callback_file = ".oda_api_callback" # perhaps it would be better to define this constant in a common lib
volodymyrss marked this conversation as resolved.
Show resolved Hide resolved
if not os.path.isfile(callback_file):
return
with open(callback_file, 'r') as file:
self._callback = file.read().strip()

Check warning on line 1354 in oda_api/api.py

View check run for this annotation

Codecov / codecov/patch

oda_api/api.py#L1349-L1354

Added lines #L1349 - L1354 were not covered by tests

@property
def enabled(self):
return self._callback is not None

Check warning on line 1358 in oda_api/api.py

View check run for this annotation

Codecov / codecov/patch

oda_api/api.py#L1358

Added line #L1358 was not covered by tests

def report_progress(self, stage: str=None, progress: int=50, substage: str=None, subprogress: int=None, message:str=None):
"""
Report progress via callback URL
:param stage: current stage description string
:param progress: current stage progress in %
:param substage: current substage description string
:param subprogress: current substage progress in %
:param message: message to pass
"""
callback_payload = dict(stage=stage, progress=progress, substage=substage, subprogress=subprogress, message=message)
callback_payload = {k: v for k, v in callback_payload.items() if v is not None}
callback_payload['action'] = 'progress'
if not self.enabled:
logger.info('no callback registered, skipping')
return

Check warning on line 1374 in oda_api/api.py

View check run for this annotation

Codecov / codecov/patch

oda_api/api.py#L1369-L1374

Added lines #L1369 - L1374 were not covered by tests

logger.info('will perform callback: %s', self._callback)

Check warning on line 1376 in oda_api/api.py

View check run for this annotation

Codecov / codecov/patch

oda_api/api.py#L1376

Added line #L1376 was not covered by tests

if re.match('^file://', self._callback):
with open(self._callback.replace('file://', ''), "w") as f:
dsavchenko marked this conversation as resolved.
Show resolved Hide resolved
json.dump(callback_payload, f)
logger.info('stored callback in a file %s', self._callback)

Check warning on line 1381 in oda_api/api.py

View check run for this annotation

Codecov / codecov/patch

oda_api/api.py#L1378-L1381

Added lines #L1378 - L1381 were not covered by tests

elif re.match('^https?://', self._callback):
r = requests.get(self._callback, params=callback_payload)
volodymyrss marked this conversation as resolved.
Show resolved Hide resolved
logger.info('callback %s returns %s : %s', self._callback, r, r.text)

Check warning on line 1385 in oda_api/api.py

View check run for this annotation

Codecov / codecov/patch

oda_api/api.py#L1383-L1385

Added lines #L1383 - L1385 were not covered by tests

else:
raise NotImplementedError

Check warning on line 1388 in oda_api/api.py

View check run for this annotation

Codecov / codecov/patch

oda_api/api.py#L1388

Added line #L1388 was not covered by tests
41 changes: 41 additions & 0 deletions tests/test_progress_report.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from oda_api.api import ProgressReporter
import os
import json

callback_file = ".oda_api_callback"
request_params = dict(stage='simulation', progress=50, substage='spectra', subprogress=30, message='some message')

def test_progress_reporter_disabled():
if os.path.isfile(callback_file):
os.remove(callback_file)

Check warning on line 10 in tests/test_progress_report.py

View check run for this annotation

Codecov / codecov/patch

tests/test_progress_report.py#L9-L10

Added lines #L9 - L10 were not covered by tests
# if callback is not available
pr = ProgressReporter()
assert not pr.enabled

Check warning on line 13 in tests/test_progress_report.py

View check run for this annotation

Codecov / codecov/patch

tests/test_progress_report.py#L12-L13

Added lines #L12 - L13 were not covered by tests
# the call below should not produce exception
try:
pr.report_progress(**request_params)
except:
assert False, 'report_progress raises exception in case of disabled ProgressReporter'

Check warning on line 18 in tests/test_progress_report.py

View check run for this annotation

Codecov / codecov/patch

tests/test_progress_report.py#L15-L18

Added lines #L15 - L18 were not covered by tests

def test_progress_reporter_enabled():
# if callback is available
try:
dump_file = 'callback'
with open(callback_file, 'w') as file:
print(f'file://{os.getcwd()}/{dump_file}', file=file)

Check warning on line 25 in tests/test_progress_report.py

View check run for this annotation

Codecov / codecov/patch

tests/test_progress_report.py#L22-L25

Added lines #L22 - L25 were not covered by tests

pr = ProgressReporter()
assert pr.enabled

Check warning on line 28 in tests/test_progress_report.py

View check run for this annotation

Codecov / codecov/patch

tests/test_progress_report.py#L27-L28

Added lines #L27 - L28 were not covered by tests

pr.report_progress(**request_params)

Check warning on line 30 in tests/test_progress_report.py

View check run for this annotation

Codecov / codecov/patch

tests/test_progress_report.py#L30

Added line #L30 was not covered by tests

# verify that params passed to report_progress were saved to dump_file
with open(dump_file) as json_file:
saved_params = json.load(json_file)
assert saved_params == request_params

Check warning on line 35 in tests/test_progress_report.py

View check run for this annotation

Codecov / codecov/patch

tests/test_progress_report.py#L33-L35

Added lines #L33 - L35 were not covered by tests
finally:
if os.path.isfile(callback_file):
os.remove(callback_file)
if os.path.isfile(dump_file):
os.remove(dump_file)

Check warning on line 40 in tests/test_progress_report.py

View check run for this annotation

Codecov / codecov/patch

tests/test_progress_report.py#L37-L40

Added lines #L37 - L40 were not covered by tests

Loading