Skip to content

Commit

Permalink
Add TokenPool monitoring to SubprocessSet::DoWork()
Browse files Browse the repository at this point in the history
Improve on the original jobserver client implementation. This makes
ninja a more aggressive GNU make jobserver client.

- add monitor interface to TokenPool
- TokenPool passed down when plan indicates more work is ready to start
- posix: update DoWork() to monitor TokenPool read file descriptor
- WaitForCommand() exits when DoWork() sets token flag
- Main loop starts over when WaitForCommand() sets token exit status
  • Loading branch information
Stefan Becker committed May 30, 2016
1 parent 60db09f commit 22faa20
Show file tree
Hide file tree
Showing 10 changed files with 108 additions and 29 deletions.
27 changes: 20 additions & 7 deletions src/build.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ struct DryRunCommandRunner : public CommandRunner {
// Overridden from CommandRunner:
virtual bool CanRunMore();
virtual bool StartCommand(Edge* edge);
virtual bool WaitForCommand(Result* result);
virtual bool WaitForCommand(Result* result, bool more_ready);

private:
queue<Edge*> finished_;
Expand All @@ -60,7 +60,7 @@ bool DryRunCommandRunner::StartCommand(Edge* edge) {
return true;
}

bool DryRunCommandRunner::WaitForCommand(Result* result) {
bool DryRunCommandRunner::WaitForCommand(Result* result, bool more_ready) {
if (finished_.empty())
return false;

Expand Down Expand Up @@ -507,7 +507,7 @@ struct RealCommandRunner : public CommandRunner {
virtual ~RealCommandRunner();
virtual bool CanRunMore();
virtual bool StartCommand(Edge* edge);
virtual bool WaitForCommand(Result* result);
virtual bool WaitForCommand(Result* result, bool more_ready);
virtual vector<Edge*> GetActiveEdges();
virtual void Abort();

Expand Down Expand Up @@ -561,14 +561,23 @@ bool RealCommandRunner::StartCommand(Edge* edge) {
return true;
}

bool RealCommandRunner::WaitForCommand(Result* result) {
bool RealCommandRunner::WaitForCommand(Result* result, bool more_ready) {
Subprocess* subproc;
while ((subproc = subprocs_.NextFinished()) == NULL) {
bool interrupted = subprocs_.DoWork();
subprocs_.ResetTokenAvailable();
while (((subproc = subprocs_.NextFinished()) == NULL) &&
!subprocs_.IsTokenAvailable()) {
bool interrupted = subprocs_.DoWork(more_ready ? tokens_ : NULL);
if (interrupted)
return false;
}

// token became available
if (subproc == NULL) {
result->status = ExitTokenAvailable;
return true;
}

// command completed
if (tokens_)
tokens_->Release();

Expand Down Expand Up @@ -698,14 +707,18 @@ bool Builder::Build(string* err) {
// See if we can reap any finished commands.
if (pending_commands) {
CommandRunner::Result result;
if (!command_runner_->WaitForCommand(&result) ||
if (!command_runner_->WaitForCommand(&result, plan_.more_ready()) ||
result.status == ExitInterrupted) {
Cleanup();
status_->BuildFinished();
*err = "interrupted by user";
return false;
}

// We might be able to start another command; start the main loop over.
if (result.status == ExitTokenAvailable)
continue;

--pending_commands;
if (!FinishCommand(&result, err)) {
Cleanup();
Expand Down
2 changes: 1 addition & 1 deletion src/build.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ struct CommandRunner {
bool success() const { return status == ExitSuccess; }
};
/// Wait for a command to complete, or return false if interrupted.
virtual bool WaitForCommand(Result* result) = 0;
virtual bool WaitForCommand(Result* result, bool more_ready) = 0;

virtual vector<Edge*> GetActiveEdges() { return vector<Edge*>(); }
virtual void Abort() {}
Expand Down
4 changes: 2 additions & 2 deletions src/build_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ struct FakeCommandRunner : public CommandRunner {
// CommandRunner impl
virtual bool CanRunMore();
virtual bool StartCommand(Edge* edge);
virtual bool WaitForCommand(Result* result);
virtual bool WaitForCommand(Result* result, bool more_ready);
virtual vector<Edge*> GetActiveEdges();
virtual void Abort();

Expand Down Expand Up @@ -627,7 +627,7 @@ bool FakeCommandRunner::StartCommand(Edge* edge) {
return true;
}

bool FakeCommandRunner::WaitForCommand(Result* result) {
bool FakeCommandRunner::WaitForCommand(Result* result, bool more_ready) {
if (!last_command_)
return false;

Expand Down
3 changes: 2 additions & 1 deletion src/exit_status.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
enum ExitStatus {
ExitSuccess,
ExitFailure,
ExitInterrupted
ExitTokenAvailable,
ExitInterrupted,
};

#endif // NINJA_EXIT_STATUS_H_
33 changes: 31 additions & 2 deletions src/subprocess-posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include "subprocess.h"
#include "tokenpool.h"

#include <assert.h>
#include <errno.h>
Expand Down Expand Up @@ -216,7 +217,7 @@ Subprocess *SubprocessSet::Add(const string& command, bool use_console) {
}

#ifdef USE_PPOLL
bool SubprocessSet::DoWork() {
bool SubprocessSet::DoWork(struct TokenPool* tokens) {
vector<pollfd> fds;
nfds_t nfds = 0;

Expand All @@ -230,6 +231,12 @@ bool SubprocessSet::DoWork() {
++nfds;
}

if (tokens) {
pollfd pfd = { tokens->GetMonitorFd(), POLLIN | POLLPRI, 0 };
fds.push_back(pfd);
++nfds;
}

interrupted_ = 0;
int ret = ppoll(&fds.front(), nfds, NULL, &old_mask_);
if (ret == -1) {
Expand Down Expand Up @@ -262,11 +269,20 @@ bool SubprocessSet::DoWork() {
++i;
}

if (tokens) {
pollfd *pfd = &fds[nfds - 1];
if (pfd->fd >= 0) {
assert(pfd->fd == tokens->GetMonitorFd());
if (pfd->revents != 0)
token_available_ = true;
}
}

return IsInterrupted();
}

#else // !defined(USE_PPOLL)
bool SubprocessSet::DoWork() {
bool SubprocessSet::DoWork(struct TokenPool* tokens) {
fd_set set;
int nfds = 0;
FD_ZERO(&set);
Expand All @@ -281,6 +297,13 @@ bool SubprocessSet::DoWork() {
}
}

if (tokens) {
int fd = tokens->GetMonitorFd();
FD_SET(fd, &set);
if (nfds < fd+1)
nfds = fd+1;
}

interrupted_ = 0;
int ret = pselect(nfds, &set, 0, 0, 0, &old_mask_);
if (ret == -1) {
Expand Down Expand Up @@ -309,6 +332,12 @@ bool SubprocessSet::DoWork() {
++i;
}

if (tokens) {
int fd = tokens->GetMonitorFd();
if ((fd >= 0) && FD_ISSET(fd, &set))
token_available_ = true;
}

return IsInterrupted();
}
#endif // !defined(USE_PPOLL)
Expand Down
2 changes: 1 addition & 1 deletion src/subprocess-win32.cc
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ Subprocess *SubprocessSet::Add(const string& command, bool use_console) {
return subprocess;
}

bool SubprocessSet::DoWork() {
bool SubprocessSet::DoWork(struct TokenPool* tokens) {
DWORD bytes_read;
Subprocess* subproc;
OVERLAPPED* overlapped;
Expand Down
8 changes: 7 additions & 1 deletion src/subprocess.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ struct Subprocess {
friend struct SubprocessSet;
};

struct TokenPool;

/// SubprocessSet runs a ppoll/pselect() loop around a set of Subprocesses.
/// DoWork() waits for any state change in subprocesses; finished_
/// is a queue of subprocesses as they finish.
Expand All @@ -77,13 +79,17 @@ struct SubprocessSet {
~SubprocessSet();

Subprocess* Add(const string& command, bool use_console = false);
bool DoWork();
bool DoWork(struct TokenPool* tokens);
Subprocess* NextFinished();
void Clear();

vector<Subprocess*> running_;
queue<Subprocess*> finished_;

bool token_available_;
bool IsTokenAvailable() { return token_available_; }
void ResetTokenAvailable() { token_available_ = false; }

#ifdef _WIN32
static BOOL WINAPI NotifyInterrupted(DWORD dwCtrlType);
static HANDLE ioport_;
Expand Down
47 changes: 33 additions & 14 deletions src/subprocess_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ TEST_F(SubprocessTest, BadCommandStderr) {
Subprocess* subproc = subprocs_.Add("cmd /c ninja_no_such_command");
ASSERT_NE((Subprocess *) 0, subproc);

subprocs_.ResetTokenAvailable();
while (!subproc->Done()) {
// Pretend we discovered that stderr was ready for writing.
subprocs_.DoWork();
subprocs_.DoWork(NULL);
}
ASSERT_EQ(false, subprocs_.IsTokenAvailable());

EXPECT_EQ(ExitFailure, subproc->Finish());
EXPECT_NE("", subproc->GetOutput());
Expand All @@ -57,10 +59,12 @@ TEST_F(SubprocessTest, NoSuchCommand) {
Subprocess* subproc = subprocs_.Add("ninja_no_such_command");
ASSERT_NE((Subprocess *) 0, subproc);

subprocs_.ResetTokenAvailable();
while (!subproc->Done()) {
// Pretend we discovered that stderr was ready for writing.
subprocs_.DoWork();
subprocs_.DoWork(NULL);
}
ASSERT_EQ(false, subprocs_.IsTokenAvailable());

EXPECT_EQ(ExitFailure, subproc->Finish());
EXPECT_NE("", subproc->GetOutput());
Expand All @@ -76,9 +80,11 @@ TEST_F(SubprocessTest, InterruptChild) {
Subprocess* subproc = subprocs_.Add("kill -INT $$");
ASSERT_NE((Subprocess *) 0, subproc);

subprocs_.ResetTokenAvailable();
while (!subproc->Done()) {
subprocs_.DoWork();
subprocs_.DoWork(NULL);
}
ASSERT_EQ(false, subprocs_.IsTokenAvailable());

EXPECT_EQ(ExitInterrupted, subproc->Finish());
}
Expand All @@ -88,7 +94,7 @@ TEST_F(SubprocessTest, InterruptParent) {
ASSERT_NE((Subprocess *) 0, subproc);

while (!subproc->Done()) {
bool interrupted = subprocs_.DoWork();
bool interrupted = subprocs_.DoWork(NULL);
if (interrupted)
return;
}
Expand All @@ -100,9 +106,11 @@ TEST_F(SubprocessTest, InterruptChildWithSigTerm) {
Subprocess* subproc = subprocs_.Add("kill -TERM $$");
ASSERT_NE((Subprocess *) 0, subproc);

subprocs_.ResetTokenAvailable();
while (!subproc->Done()) {
subprocs_.DoWork();
subprocs_.DoWork(NULL);
}
ASSERT_EQ(false, subprocs_.IsTokenAvailable());

EXPECT_EQ(ExitInterrupted, subproc->Finish());
}
Expand All @@ -112,7 +120,7 @@ TEST_F(SubprocessTest, InterruptParentWithSigTerm) {
ASSERT_NE((Subprocess *) 0, subproc);

while (!subproc->Done()) {
bool interrupted = subprocs_.DoWork();
bool interrupted = subprocs_.DoWork(NULL);
if (interrupted)
return;
}
Expand All @@ -124,9 +132,11 @@ TEST_F(SubprocessTest, InterruptChildWithSigHup) {
Subprocess* subproc = subprocs_.Add("kill -HUP $$");
ASSERT_NE((Subprocess *) 0, subproc);

subprocs_.ResetTokenAvailable();
while (!subproc->Done()) {
subprocs_.DoWork();
subprocs_.DoWork(NULL);
}
ASSERT_EQ(false, subprocs_.IsTokenAvailable());

EXPECT_EQ(ExitInterrupted, subproc->Finish());
}
Expand All @@ -136,7 +146,7 @@ TEST_F(SubprocessTest, InterruptParentWithSigHup) {
ASSERT_NE((Subprocess *) 0, subproc);

while (!subproc->Done()) {
bool interrupted = subprocs_.DoWork();
bool interrupted = subprocs_.DoWork(NULL);
if (interrupted)
return;
}
Expand All @@ -151,9 +161,11 @@ TEST_F(SubprocessTest, Console) {
subprocs_.Add("test -t 0 -a -t 1 -a -t 2", /*use_console=*/true);
ASSERT_NE((Subprocess*)0, subproc);

subprocs_.ResetTokenAvailable();
while (!subproc->Done()) {
subprocs_.DoWork();
subprocs_.DoWork(NULL);
}
ASSERT_EQ(false, subprocs_.IsTokenAvailable());

EXPECT_EQ(ExitSuccess, subproc->Finish());
}
Expand All @@ -165,9 +177,11 @@ TEST_F(SubprocessTest, SetWithSingle) {
Subprocess* subproc = subprocs_.Add(kSimpleCommand);
ASSERT_NE((Subprocess *) 0, subproc);

subprocs_.ResetTokenAvailable();
while (!subproc->Done()) {
subprocs_.DoWork();
subprocs_.DoWork(NULL);
}
ASSERT_EQ(false, subprocs_.IsTokenAvailable());
ASSERT_EQ(ExitSuccess, subproc->Finish());
ASSERT_NE("", subproc->GetOutput());

Expand Down Expand Up @@ -198,12 +212,13 @@ TEST_F(SubprocessTest, SetWithMulti) {
ASSERT_EQ("", processes[i]->GetOutput());
}

subprocs_.ResetTokenAvailable();
while (!processes[0]->Done() || !processes[1]->Done() ||
!processes[2]->Done()) {
ASSERT_GT(subprocs_.running_.size(), 0u);
subprocs_.DoWork();
subprocs_.DoWork(NULL);
}

ASSERT_EQ(false, subprocs_.IsTokenAvailable());
ASSERT_EQ(0u, subprocs_.running_.size());
ASSERT_EQ(3u, subprocs_.finished_.size());

Expand Down Expand Up @@ -237,8 +252,10 @@ TEST_F(SubprocessTest, SetWithLots) {
ASSERT_NE((Subprocess *) 0, subproc);
procs.push_back(subproc);
}
subprocs_.ResetTokenAvailable();
while (!subprocs_.running_.empty())
subprocs_.DoWork();
subprocs_.DoWork(NULL);
ASSERT_EQ(false, subprocs_.IsTokenAvailable());
for (size_t i = 0; i < procs.size(); ++i) {
ASSERT_EQ(ExitSuccess, procs[i]->Finish());
ASSERT_NE("", procs[i]->GetOutput());
Expand All @@ -254,9 +271,11 @@ TEST_F(SubprocessTest, SetWithLots) {
// that stdin is closed.
TEST_F(SubprocessTest, ReadStdin) {
Subprocess* subproc = subprocs_.Add("cat -");
subprocs_.ResetTokenAvailable();
while (!subproc->Done()) {
subprocs_.DoWork();
subprocs_.DoWork(NULL);
}
ASSERT_EQ(false, subprocs_.IsTokenAvailable());
ASSERT_EQ(ExitSuccess, subproc->Finish());
ASSERT_EQ(1u, subprocs_.finished_.size());
}
Expand Down
Loading

0 comments on commit 22faa20

Please sign in to comment.