Skip to content

Commit

Permalink
use "spawn" in multiprocessing
Browse files Browse the repository at this point in the history
  • Loading branch information
TaiSakuma committed Jun 22, 2022
1 parent 002b75c commit 308b3f7
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 16 deletions.
10 changes: 4 additions & 6 deletions nextline/registrar.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,14 @@


class Registrar:
def __init__(self, registry: MutableMapping):
def __init__(
self, registry: MutableMapping, queue: Queue[Tuple[str, Any, bool]]
):
self._registry = registry
self._queue: Queue[Tuple[str, Any, bool]] = Queue()
self._queue = queue
self._thread = ExcThread(target=self._relay, daemon=True)
self._thread.start()

@property
def queue(self) -> Queue[Tuple[str, Any, bool]]:
return self._queue

def close(self):
self._queue.put(None)
self._thread.join()
Expand Down
19 changes: 13 additions & 6 deletions nextline/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
import signal
import asyncio
from queue import Empty
from multiprocessing import Queue, Process

import multiprocessing as mp
from threading import Event
from concurrent.futures import ThreadPoolExecutor
from tblib import pickling_support # type: ignore
Expand All @@ -16,6 +17,8 @@
from .types import RunNo, TraceNo
from .count import RunNoCounter

_mp = mp.get_context("spawn") # NOTE: monkey patched in tests

pickling_support.install()

SCRIPT_FILE_NAME = "<string>"
Expand Down Expand Up @@ -53,10 +56,11 @@ def __init__(self, statement: str, run_no_start_from=1):
self.registry = SubscribableDict[Any, Any]()
self._run_no = RunNo(run_no_start_from - 1)
self._run_no_count = RunNoCounter(run_no_start_from)
self._registrar = Registrar(self.registry)
queue = _mp.Queue()
self._registrar = Registrar(self.registry, queue)

self.context = RunArg(
statement=statement, filename=filename, queue=self._registrar.queue
statement=statement, filename=filename, queue=queue
)

self._registrar.script_change(script=statement, filename=filename)
Expand Down Expand Up @@ -259,15 +263,15 @@ class Running(State):

def __init__(self, context: RunArg):
self._context = context
self._q_commands: QueueCommands = Queue()
self._q_commands: QueueCommands = _mp.Queue()
self._event = Event()
self._executor = ThreadPoolExecutor(max_workers=1)
self._f = self._executor.submit(self._run)
assert self._event.wait(2.0)

def _run(self):
q_done: QueueDone = Queue()
self._p = Process(
q_done: QueueDone = _mp.Queue()
self._p = _mp.Process(
target=run,
args=(self._context, self._q_commands, q_done),
daemon=True,
Expand All @@ -283,6 +287,8 @@ def _run(self):
async def finish(self):
self.assert_not_obsolete()
ret, exc = await to_thread(self._f.result)
print(type(exc), exc)
print(self._p)
self._executor.shutdown()
finished = Finished(self._context, result=ret, exception=exc)
self.obsolete()
Expand All @@ -293,6 +299,7 @@ def send_pdb_command(self, trace_id: TraceNo, command: str) -> None:

def interrupt(self) -> None:
if self._p.pid:
print(self._p.pid)
os.kill(self._p.pid, signal.SIGINT)

def terminate(self) -> None:
Expand Down
3 changes: 2 additions & 1 deletion tests/scenarios/test_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from itertools import groupby
from collections import Counter
from pathlib import Path
from typing import Optional, Set
from typing import Optional, Set, Tuple

import pytest

Expand Down Expand Up @@ -172,6 +172,7 @@ async def run(nextline: Nextline):
async def control_execution(nextline: Nextline):
prev_ids: Set[int] = set()
agen = agen_with_wait(nextline.subscribe_trace_ids())
pending: Tuple[asyncio.Future] = ()
async for ids_ in agen:
ids = set(ids_)
new_ids, prev_ids = ids - prev_ids, ids # type: ignore
Expand Down
11 changes: 8 additions & 3 deletions tests/state/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@
from unittest.mock import Mock

from nextline.utils import ExcThread
from queue import Queue


@pytest.fixture(autouse=True)
def monkey_patch_process(monkeypatch):
monkeypatch.setattr("nextline.state.Process", ExcThread)
yield
def monkey_patch_mp(monkeypatch):
class mock_context:
Queue = Queue
Process = ExcThread

monkeypatch.setattr("nextline.state._mp", mock_context)
return


@pytest.fixture
Expand Down

0 comments on commit 308b3f7

Please sign in to comment.