Skip to content

Commit

Permalink
Add experiment repository to experiment PYTHONPATH
Browse files Browse the repository at this point in the history
Implements m-labs#648. The repository must be a valid module (i.e. have
__init__.py). Using the git backend one can 'import repository'.
Using the file-system backend the repository module name is the
name of your repository directory.
  • Loading branch information
cjbe committed Feb 3, 2017
1 parent be0953d commit acc01ed
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 12 deletions.
24 changes: 17 additions & 7 deletions artiq/master/experiments.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,20 @@ async def _scan(self, root, subdir=""):
exc_info=not isinstance(exc, WorkerInternalException))
# restart worker
await self.worker.close()
self.worker = Worker(self.worker_handlers)
self.worker = self._create_worker()
if de.is_dir():
subentries = await self._scan(
root, os.path.join(subdir, de.name))
entries = {de.name + "/" + k: v for k, v in subentries.items()}
entry_dict.update(entries)
return entry_dict

async def scan(self, root):
self.worker = Worker(self.worker_handlers)
async def scan(self, repo_path):
self._create_worker = lambda: Worker(self.worker_handlers,
repo_path=repo_path)
self.worker = self._create_worker()
try:
r = await self._scan(root)
r = await self._scan(repo_path)
finally:
await self.worker.close()
return r
Expand All @@ -96,7 +98,7 @@ def __init__(self, repo_backend, worker_handlers):
self.worker_handlers = worker_handlers

self.cur_rev = self.repo_backend.get_head_rev()
self.repo_backend.request_rev(self.cur_rev)
self.repo_path, _ = self.repo_backend.request_rev(self.cur_rev)
self.explist = Notifier(dict())
self._scanning = False

Expand Down Expand Up @@ -140,7 +142,10 @@ async def examine(self, filename, use_repository=True, revision=None):
revision = self.cur_rev
wd, _ = self.repo_backend.request_rev(revision)
filename = os.path.join(wd, filename)
worker = Worker(self.worker_handlers)
repo_path = wd
else:
repo_path = self.repo_path
worker = Worker(self.worker_handlers, repo_path=repo_path)
try:
description = await worker.examine("examine", filename)
finally:
Expand Down Expand Up @@ -183,7 +188,12 @@ def release_rev(self, rev):

class _GitCheckout:
def __init__(self, git, rev):
self.path = tempfile.mkdtemp()
parent_path = tempfile.mkdtemp()
# Checkout into a subdirectory 'repository' of our temp dir to ensure
# that there are no other modules unintentionally added to the python
# path
self.path = parent_path+os.path.sep+'repository'
os.mkdir(self.path)
commit = git.get(rev)
git.checkout_tree(commit, directory=self.path)
self.message = commit.message.strip()
Expand Down
10 changes: 6 additions & 4 deletions artiq/master/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ async def worker_method(self, *args, **kwargs):

class Run:
def __init__(self, rid, pipeline_name,
wd, expid, priority, due_date, flush,
wd, repo_path, expid, priority, due_date, flush,
pool, **kwargs):
# called through pool
self.rid = rid
Expand All @@ -56,7 +56,7 @@ def __init__(self, rid, pipeline_name,
self.due_date = due_date
self.flush = flush

self.worker = Worker(pool.worker_handlers)
self.worker = Worker(pool.worker_handlers, repo_path=repo_path)
self.termination_requested = False

self._status = RunStatus.pending
Expand Down Expand Up @@ -135,10 +135,12 @@ def submit(self, expid, priority, due_date, flush, pipeline_name):
expid["repo_rev"] = self.experiment_db.cur_rev
wd, repo_msg = self.experiment_db.repo_backend.request_rev(
expid["repo_rev"])
repo_path = wd
else:
wd, repo_msg = None, None
run = Run(rid, pipeline_name, wd, expid, priority, due_date, flush,
self, repo_msg=repo_msg)
repo_path = self.experiment_db.repo_backend.request_rev(None)
run = Run(rid, pipeline_name, wd, repo_path, expid, priority, due_date,
flush, self, repo_msg=repo_msg)
self.runs[rid] = run
self.state_changed.notify()
return rid
Expand Down
10 changes: 9 additions & 1 deletion artiq/master/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,15 @@ def log_worker_exception():


class Worker:
def __init__(self, handlers=dict(), send_timeout=10.0):
def __init__(self, handlers=dict(), send_timeout=10.0, repo_path=None):
self.handlers = handlers
self.send_timeout = send_timeout

self.rid = None
self.filename = None
self.ipc = None
self.watchdogs = dict() # wid -> expiration (using time.monotonic)
self.repo_path = repo_path

self.io_lock = asyncio.Lock()
self.closed = asyncio.Event()
Expand Down Expand Up @@ -85,6 +86,13 @@ async def _create_process(self, log_level):
self.ipc = pipe_ipc.AsyncioParentComm()
env = os.environ.copy()
env["PYTHONUNBUFFERED"] = "1"
if self.repo_path:
# Add parent of repository directory to PYTHONPATH
repo_parent_path = os.path.dirname(self.repo_path)
if "PYTHONPATH" in env:
env["PYTHONPATH"] += os.pathsep+repo_parent_path
else:
env["PYTHONPATH"] = repo_parent_path
await self.ipc.create_subprocess(
sys.executable, "-m", "artiq.master.worker_impl",
self.ipc.get_address(), str(log_level),
Expand Down

0 comments on commit acc01ed

Please sign in to comment.