diff --git a/artiq/master/experiments.py b/artiq/master/experiments.py index fc27874762..bf5a905b07 100644 --- a/artiq/master/experiments.py +++ b/artiq/master/experiments.py @@ -64,7 +64,7 @@ async def _scan(self, root, subdir=""): exc_info=not isinstance(exc, WorkerInternalException)) # restart worker await self.worker.close() - self.worker = Worker(self.worker_handlers) + self.worker = self._create_worker() if de.is_dir(): subentries = await self._scan( root, os.path.join(subdir, de.name)) @@ -72,10 +72,12 @@ async def _scan(self, root, subdir=""): entry_dict.update(entries) return entry_dict - async def scan(self, root): - self.worker = Worker(self.worker_handlers) + async def scan(self, repo_path): + self._create_worker = lambda: Worker(self.worker_handlers, + repo_path=repo_path) + self.worker = self._create_worker() try: - r = await self._scan(root) + r = await self._scan(repo_path) finally: await self.worker.close() return r @@ -96,7 +98,7 @@ def __init__(self, repo_backend, worker_handlers): self.worker_handlers = worker_handlers self.cur_rev = self.repo_backend.get_head_rev() - self.repo_backend.request_rev(self.cur_rev) + self.repo_path, _ = self.repo_backend.request_rev(self.cur_rev) self.explist = Notifier(dict()) self._scanning = False @@ -140,7 +142,10 @@ async def examine(self, filename, use_repository=True, revision=None): revision = self.cur_rev wd, _ = self.repo_backend.request_rev(revision) filename = os.path.join(wd, filename) - worker = Worker(self.worker_handlers) + repo_path = wd + else: + repo_path = self.repo_path + worker = Worker(self.worker_handlers, repo_path=repo_path) try: description = await worker.examine("examine", filename) finally: @@ -183,7 +188,12 @@ def release_rev(self, rev): class _GitCheckout: def __init__(self, git, rev): - self.path = tempfile.mkdtemp() + parent_path = tempfile.mkdtemp() + # Checkout into a subdirectory 'repository' of our temp dir to ensure + # that there are no other modules unintentionally added to the python + # path + self.path = parent_path+os.path.sep+'repository' + os.mkdir(self.path) commit = git.get(rev) git.checkout_tree(commit, directory=self.path) self.message = commit.message.strip() diff --git a/artiq/master/scheduler.py b/artiq/master/scheduler.py index 53d06cac9f..c8e812b619 100644 --- a/artiq/master/scheduler.py +++ b/artiq/master/scheduler.py @@ -45,7 +45,7 @@ async def worker_method(self, *args, **kwargs): class Run: def __init__(self, rid, pipeline_name, - wd, expid, priority, due_date, flush, + wd, repo_path, expid, priority, due_date, flush, pool, **kwargs): # called through pool self.rid = rid @@ -56,7 +56,7 @@ def __init__(self, rid, pipeline_name, self.due_date = due_date self.flush = flush - self.worker = Worker(pool.worker_handlers) + self.worker = Worker(pool.worker_handlers, repo_path=repo_path) self.termination_requested = False self._status = RunStatus.pending @@ -135,10 +135,12 @@ def submit(self, expid, priority, due_date, flush, pipeline_name): expid["repo_rev"] = self.experiment_db.cur_rev wd, repo_msg = self.experiment_db.repo_backend.request_rev( expid["repo_rev"]) + repo_path = wd else: wd, repo_msg = None, None - run = Run(rid, pipeline_name, wd, expid, priority, due_date, flush, - self, repo_msg=repo_msg) + repo_path = self.experiment_db.repo_backend.request_rev(None) + run = Run(rid, pipeline_name, wd, repo_path, expid, priority, due_date, + flush, self, repo_msg=repo_msg) self.runs[rid] = run self.state_changed.notify() return rid diff --git a/artiq/master/worker.py b/artiq/master/worker.py index 69b6d9a28a..81e12e0732 100644 --- a/artiq/master/worker.py +++ b/artiq/master/worker.py @@ -41,7 +41,7 @@ def log_worker_exception(): class Worker: - def __init__(self, handlers=dict(), send_timeout=10.0): + def __init__(self, handlers=dict(), send_timeout=10.0, repo_path=None): self.handlers = handlers self.send_timeout = send_timeout @@ -49,6 +49,7 @@ def __init__(self, handlers=dict(), send_timeout=10.0): self.filename = None self.ipc = None self.watchdogs = dict() # wid -> expiration (using time.monotonic) + self.repo_path = repo_path self.io_lock = asyncio.Lock() self.closed = asyncio.Event() @@ -85,6 +86,13 @@ async def _create_process(self, log_level): self.ipc = pipe_ipc.AsyncioParentComm() env = os.environ.copy() env["PYTHONUNBUFFERED"] = "1" + if self.repo_path: + # Add parent of repository directory to PYTHONPATH + repo_parent_path = os.path.dirname(self.repo_path) + if "PYTHONPATH" in env: + env["PYTHONPATH"] += os.pathsep+repo_parent_path + else: + env["PYTHONPATH"] = repo_parent_path await self.ipc.create_subprocess( sys.executable, "-m", "artiq.master.worker_impl", self.ipc.get_address(), str(log_level),