Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ NEW: Add ProcessLauncher.process_cache #213

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions plumpy/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,7 @@ class PersistenceError(Exception):

class ClosedError(Exception):
"""Raised when an mutable operation is attempted on a closed process"""


class DuplicateProcess(Exception):
"""Raised when an ProcessLauncher is asked to launch a process it is already running."""
42 changes: 38 additions & 4 deletions plumpy/process_comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
import copy
import logging
from typing import Any, cast, Dict, Optional, Sequence, TYPE_CHECKING, Union
from weakref import WeakValueDictionary

import kiwipy

from . import loaders
from . import exceptions
from . import communications
from . import futures
from . import loaders
from . import persistence
from .utils import PID_TYPE

Expand All @@ -27,6 +29,7 @@

if TYPE_CHECKING:
from .processes import Process # pylint: disable=cyclic-import
ProcessCacheType = WeakValueDictionary[PID_TYPE, Process] # pylint: disable=unsubscriptable-object

ProcessResult = Any
ProcessStatus = Any
Expand Down Expand Up @@ -527,6 +530,20 @@ def __init__(
else:
self._loader = loaders.get_object_loader()

# using a weak reference ensures the processes can be garbage cleaned on completion
self._process_cache: 'ProcessCacheType' = WeakValueDictionary()

@property
def process_cache(self) -> 'ProcessCacheType':
"""Return a dictionary mapping PIDs to launched processes that are still in memory.

The mapping uses a `WeakValueDictionary`, meaning that processes can be removed,
once they are no longer referenced anywhere else.
This means the dictionary will always contain all processes still running,
but potentially also processes that have terminated but have not yet been garbage collected.
"""
return copy.copy(self._process_cache)

async def __call__(self, communicator: kiwipy.Communicator, task: Dict[str, Any]) -> Union[PID_TYPE, ProcessResult]:
"""
Receive a task.
Expand Down Expand Up @@ -571,15 +588,24 @@ async def _launch(
init_kwargs = {}

proc_class = self._loader.load_object(process_class)
proc = proc_class(*init_args, **init_kwargs)
proc: Process = proc_class(*init_args, **init_kwargs)

if proc.pid in self._process_cache and not self._process_cache[proc.pid].has_terminated():
raise exceptions.DuplicateProcess(f'Process<{proc.pid}> is already running')
Comment on lines +591 to +594
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we actually use the "launch" part of the communicator but exclusively the "continue" so I don't think this will matter for our use-case, but isn't it a bit weird to be able to hit a duplicate process when launching it? When you launch it, it is the first time you are creating it and so the pid shouldn't already running as it can when continuing an existing process. I can see how there can still be a clash in generated process ids, but I think that the exception type or at the very least the message should be different from the one in _continue.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeh as you mention, we don't use it, so this is only for completeness. I would say though, whether you continue or launch, if there are two processes with the same PID they are duplicates, so I disagree that the message should be any difference.
(if you launch twice with the same PID, this is no different to continuing twice)

Copy link
Collaborator

@sphuber sphuber Mar 5, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not quite sure I agree. I agree that the result of having a duplicate PID is the same, however, the origin would be very different and I think that is important to reflect. When you launch a new PID is created and so it should be unique. If that is not the case, then the ID generating algorithm is fundamentally broken, which is completely different from the case in continue where can simply have requested to continue the same process twice.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just to have the lat word lol

however, the origin would be very different and I think that is important to reflect

but then you could see the origin in the traceback

When you launch a new PID is created

this would not be the case if you specifically set the pid in init_args or init_kwargs


if persist and self._persister is not None:
self._persister.save_checkpoint(proc)

self._process_cache[proc.pid] = proc

if nowait:
asyncio.ensure_future(proc.step_until_terminated())
return proc.pid

await proc.step_until_terminated()
try:
await proc.step_until_terminated()
finally:
self._process_cache.pop(proc.pid, None)

return proc.future().result()

Expand All @@ -602,15 +628,23 @@ async def _continue(
LOGGER.warning('rejecting task: cannot continue process<%d> because no persister is available', pid)
raise communications.TaskRejected('Cannot continue process, no persister')

if pid in self._process_cache and not self._process_cache[pid].has_terminated():
raise exceptions.DuplicateProcess(f'Process<{pid}> is already running')

# Do not catch exceptions here, because if these operations fail, the continue task should except and bubble up
saved_state = self._persister.load_checkpoint(pid, tag)
proc = cast('Process', saved_state.unbundle(self._load_context))

self._process_cache[proc.pid] = proc

if nowait:
asyncio.ensure_future(proc.step_until_terminated())
return proc.pid

await proc.step_until_terminated()
try:
await proc.step_until_terminated()
finally:
self._process_cache.pop(proc.pid, None)

return proc.future().result()

Expand Down
14 changes: 13 additions & 1 deletion test/rmq/test_communicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
import asyncio
import shortuuid

from kiwipy import BroadcastFilter, RemoteException, rmq
import pytest
from kiwipy import BroadcastFilter, rmq

import plumpy
from plumpy import communications, process_comms
Expand Down Expand Up @@ -204,3 +204,15 @@ async def test_continue(self, loop_communicator, async_controller, persister):
# Let the process run to the end
result = await async_controller.continue_process(pid)
assert result, utils.DummyProcessWithOutput.EXPECTED_OUTPUTS

@pytest.mark.asyncio
async def test_duplicate_process(self, loop_communicator, async_controller, persister):
loop = asyncio.get_event_loop()
launcher = plumpy.ProcessLauncher(loop, persister=persister)
loop_communicator.add_task_subscriber(launcher)
process = utils.DummyProcessWithOutput()
persister.save_checkpoint(process)
launcher._process_cache[process.pid] = process
assert process.pid in launcher.process_cache
with pytest.raises(RemoteException, match='already running'):
await async_controller.continue_process(process.pid)