Wording improvements for the TaskScheduler (#17992)
As I found the current docstrings a bit unclear while trying to wrap my
head around this class.
anoadragon453 authored Dec 18, 2024
1 parent f1b0f9a commit 09f377f
Showing 2 changed files with 54 additions and 37 deletions.
1 change: 1 addition & 0 deletions changelog.d/17992.doc
@@ -0,0 +1 @@
+Improve documentation for the `TaskScheduler` class.
90 changes: 53 additions & 37 deletions synapse/util/task_scheduler.py
@@ -46,33 +46,43 @@

class TaskScheduler:
"""
-This is a simple task sheduler aimed at resumable tasks: usually we use `run_in_background`
-to launch a background task, or Twisted `deferLater` if we want to do so later on.
-The problem with that is that the tasks will just stop and never be resumed if synapse
-is stopped for whatever reason.
-How this works:
-- A function mapped to a named action should first be registered with `register_action`.
-This function will be called when trying to resuming tasks after a synapse shutdown,
-so this registration should happen when synapse is initialised, NOT right before scheduling
-a task.
-- A task can then be launched using this named action with `schedule_task`. A `params` dict
-can be passed, and it will be available to the registered function when launched. This task
-can be launch either now-ish, or later on by giving a `timestamp` parameter.
-The function may call `update_task` at any time to update the `result` of the task,
-and this can be used to resume the task at a specific point and/or to convey a result to
-the code launching the task.
-You can also specify the `result` (and/or an `error`) when returning from the function.
-The reconciliation loop runs every minute, so this is not a precise scheduler.
-There is a limit of 10 concurrent tasks, so tasks may be delayed if the pool is already
-full. In this regard, please take great care that scheduled tasks can actually finished.
-For now there is no mechanism to stop a running task if it is stuck.
-Tasks will be run on the worker specified with `run_background_tasks_on` config,
-or the main one by default.
+This is a simple task scheduler designed for resumable tasks. Normally,
+you'd use `run_in_background` to start a background task or Twisted's
+`deferLater` if you want to run it later.
+The issue is that these tasks stop completely and won't resume if Synapse is
+shut down for any reason.
+Here's how it works:
+- Register an Action: First, you need to register a function to a named
+action using `register_action`. This function will be called to resume tasks
+after a Synapse shutdown. Make sure to register it when Synapse initializes,
+not right before scheduling the task.
+- Schedule a Task: You can launch a task linked to the named action
+using `schedule_task`. You can pass a `params` dictionary, which will be
+passed to the registered function when it's executed. Tasks can be scheduled
+to run either immediately or later by specifying a `timestamp`.
+- Update Task: The function handling the task can call `update_task` at
+any point to update the task's `result`. This lets you resume the task from
+a specific point or pass results back to the code that scheduled it. When
+the function completes, you can also return a `result` or an `error`.
+Things to keep in mind:
+- The reconciliation loop runs every minute, so this is not a high-precision
+scheduler.
+- Only 10 tasks can run at the same time. If the pool is full, tasks may be
+delayed. Make sure your scheduled tasks can actually finish.
+- Currently, there's no way to stop a task if it gets stuck.
+- Tasks will run on the worker defined by the `run_background_tasks_on`
+setting in your configuration. If no worker is specified, they'll run on
+the main one by default.
"""

# Precision of the scheduler, evaluation of tasks to run will only happen
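
Note on the hunk above: the register-then-schedule flow described in the new docstring can be sketched with a small in-memory stand-in. Everything here is hypothetical and only mirrors the names used in the docstring: `ToyScheduler`, `run_pending`, `count_to`, and the `(status, result, error)` return shape are assumptions for illustration, not Synapse's implementation. The toy also rejects unregistered actions at scheduling time, which is a choice made for this sketch.

```python
import asyncio
from enum import Enum
from typing import Any, Awaitable, Callable, Dict, Optional, Tuple


class TaskStatus(Enum):
    SCHEDULED = "scheduled"
    ACTIVE = "active"
    COMPLETE = "complete"
    FAILED = "failed"


Task = Dict[str, Any]
# Assumed return shape of an action: (status, result, error).
ActionReturn = Tuple[TaskStatus, Optional[Dict[str, Any]], Optional[str]]
Action = Callable[[Task], Awaitable[ActionReturn]]


class ToyScheduler:
    """Hypothetical in-memory stand-in; NOT Synapse's TaskScheduler."""

    def __init__(self) -> None:
        self._actions: Dict[str, Action] = {}
        self.tasks: Dict[str, Task] = {}

    def register_action(self, function: Action, action_name: str) -> None:
        # Done once at start-up, so leftover tasks can find their function again.
        self._actions[action_name] = function

    def schedule_task(
        self, action: str, params: Optional[Dict[str, Any]] = None
    ) -> str:
        if action not in self._actions:
            raise KeyError(f"no function registered for action {action!r}")
        task_id = f"task-{len(self.tasks) + 1}"
        self.tasks[task_id] = {
            "id": task_id,
            "action": action,
            "params": params or {},
            "status": TaskStatus.SCHEDULED,
            "result": None,
            "error": None,
        }
        return task_id

    async def run_pending(self) -> None:
        # Run every task that has not yet reached a terminal status.
        for task in self.tasks.values():
            if task["status"] in (TaskStatus.SCHEDULED, TaskStatus.ACTIVE):
                task["status"] = TaskStatus.ACTIVE
                function = self._actions[task["action"]]
                status, result, error = await function(task)
                task.update(status=status, result=result, error=error)


async def count_to(task: Task) -> ActionReturn:
    # The params given to schedule_task are available on the task.
    target = task["params"]["target"]
    return TaskStatus.COMPLETE, {"counted": target}, None


scheduler = ToyScheduler()
scheduler.register_action(count_to, "count_to")  # at initialisation time
task_id = scheduler.schedule_task("count_to", params={"target": 3})
asyncio.run(scheduler.run_pending())
print(scheduler.tasks[task_id]["status"], scheduler.tasks[task_id]["result"])
```

The point of registering at start-up is visible in `run_pending`: a task only stores the action name, so the function must already be in the registry whenever a pending task is picked up.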
@@ -157,7 +167,7 @@ async def schedule_task(
params: Optional[JsonMapping] = None,
) -> str:
"""Schedule a new potentially resumable task. A function matching the specified
-`action` should have be registered with `register_action` before the task is run.
+`action` should've been registered with `register_action` before the task is run.
Args:
action: the name of a previously registered action
@@ -210,22 +220,28 @@ async def update_task(
result: Optional[JsonMapping] = None,
error: Optional[str] = None,
) -> bool:
-"""Update some task associated values. This is exposed publicly so it can
-be used inside task functions, mainly to update the result and be able to
-resume a task at a specific step after a restart of synapse.
+"""Update some task-associated values. This is exposed publicly so it can
+be used inside task functions, mainly to update the result or resume
+a task at a specific step after a restart of synapse.
It can also be used to stage a task, by setting the `status` to `SCHEDULED` with
a new timestamp.
-The `status` can only be set to `ACTIVE` or `SCHEDULED`, `COMPLETE` and `FAILED`
-are terminal status and can only be set by returning it in the function.
+The `status` can only be set to `ACTIVE` or `SCHEDULED`. `COMPLETE` and `FAILED`
+are terminal statuses and can only be set by returning them from the function.
Args:
id: the id of the task to update
timestamp: useful to schedule a new stage of the task at a later date
status: the new `TaskStatus` of the task
result: the new result of the task
error: the new error of the task
+Returns:
+True if the update was successful, False otherwise.
+Raises:
+Exception: If a status other than `ACTIVE` or `SCHEDULED` was passed.
"""
if status == TaskStatus.COMPLETE or status == TaskStatus.FAILED:
raise Exception(
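
Note on the hunk above: the two behaviours documented for `update_task` (checkpointing progress in `result` so a task can resume at a specific step, and rejecting terminal statuses) can be sketched as follows. `ToyTaskStore`, `process_items`, the `next_index` key, and the `fail_at` parameter are hypothetical names used only for this illustration; the sketch assumes nothing about Synapse's storage layer.

```python
import asyncio
from enum import Enum
from typing import Any, Dict, List, Optional


class TaskStatus(Enum):
    SCHEDULED = "scheduled"
    ACTIVE = "active"
    COMPLETE = "complete"
    FAILED = "failed"


class ToyTaskStore:
    """Hypothetical in-memory store that only mimics the documented rules."""

    def __init__(self) -> None:
        self.tasks: Dict[str, Dict[str, Any]] = {}

    async def update_task(
        self,
        id: str,
        *,
        status: Optional[TaskStatus] = None,
        result: Optional[Dict[str, Any]] = None,
    ) -> bool:
        # Terminal statuses may only come from the task function's return value.
        if status in (TaskStatus.COMPLETE, TaskStatus.FAILED):
            raise Exception("COMPLETE and FAILED cannot be set via update_task")
        task = self.tasks.get(id)
        if task is None:
            return False
        if status is not None:
            task["status"] = status
        if result is not None:
            task["result"] = result
        return True


async def process_items(
    store: ToyTaskStore,
    task_id: str,
    items: List[str],
    fail_at: Optional[int] = None,
) -> TaskStatus:
    # Resume from the last checkpoint, if one was stored in `result`.
    start = (store.tasks[task_id]["result"] or {}).get("next_index", 0)
    for index in range(start, len(items)):
        if index == fail_at:
            raise RuntimeError("simulated shutdown")
        # ... real work on items[index] would happen here ...
        await store.update_task(task_id, result={"next_index": index + 1})
    return TaskStatus.COMPLETE


async def main() -> tuple:
    items = ["a", "b", "c"]
    try:
        await process_items(store, "t1", items, fail_at=2)
    except RuntimeError:
        pass  # the task stopped part-way through
    checkpoint = dict(store.tasks["t1"]["result"])
    final_status = await process_items(store, "t1", items)  # resumes at index 2
    return checkpoint, final_status


store = ToyTaskStore()
store.tasks["t1"] = {"status": TaskStatus.ACTIVE, "result": None}
checkpoint, final_status = asyncio.run(main())
```

In this toy, the second call to `process_items` skips the two items already handled because the checkpoint survived the interruption, which is the resumption pattern the docstring describes.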
@@ -263,9 +279,9 @@ async def get_tasks(
max_timestamp: Optional[int] = None,
limit: Optional[int] = None,
) -> List[ScheduledTask]:
-"""Get a list of tasks. Returns all the tasks if no args is provided.
+"""Get a list of tasks. Returns all the tasks if no args are provided.
-If an arg is `None` all tasks matching the other args will be selected.
+If an arg is `None`, all tasks matching the other args will be selected.
If an arg is an empty list, the corresponding value of the task needs
to be `None` to be selected.
@@ -277,8 +293,8 @@ async def get_tasks(
a timestamp inferior to the specified one
limit: Only return `limit` number of rows if set.
-Returns
-A list of `ScheduledTask`, ordered by increasing timestamps
+Returns:
+A list of `ScheduledTask`, ordered by increasing timestamps.
"""
return await self._store.get_scheduled_tasks(
actions=actions,
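
Note on the hunk above: the filter rules in the `get_tasks` docstring (`None` means "do not filter on this argument", an empty list means "the task's value must be `None`") are easy to misread, so here is a minimal sketch of them over plain dicts. `field_matches`, `select_tasks`, and the sample tasks are hypothetical; the real method delegates to the store, which this sketch does not model.

```python
from typing import Any, Dict, List, Optional

Task = Dict[str, Any]


def field_matches(value: Optional[str], wanted: Optional[List[str]]) -> bool:
    if wanted is None:
        return True  # no filter on this field
    if not wanted:
        return value is None  # empty list: only tasks with no value match
    return value in wanted


def select_tasks(
    tasks: List[Task],
    actions: Optional[List[str]] = None,
    limit: Optional[int] = None,
) -> List[Task]:
    selected = [task for task in tasks if field_matches(task["action"], actions)]
    selected.sort(key=lambda task: task["timestamp"])  # increasing timestamps
    return selected if limit is None else selected[:limit]


# Hypothetical sample data; a task whose action is None exists only to
# exercise the empty-list rule.
tasks = [
    {"id": "a", "action": "purge", "timestamp": 30},
    {"id": "b", "action": None, "timestamp": 10},
    {"id": "c", "action": "redact", "timestamp": 20},
]

all_ids = [task["id"] for task in select_tasks(tasks)]
none_ids = [task["id"] for task in select_tasks(tasks, actions=[])]
named_ids = [task["id"] for task in select_tasks(tasks, actions=["purge", "redact"])]
```

Calling `select_tasks(tasks)` with no arguments returns every task in timestamp order, matching the first sentence of the docstring.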
