// Copyright 2015 Google Inc. All rights reserved // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #include #include #include #include #include #include #include #include #include #include #include #define CHECK(expr) do { \ if (!(expr)) { \ fprintf(stderr, "%s:%d: " #expr, __FILE__, __LINE__); \ exit(1); \ } \ } while(0) #define PCHECK(expr) do { \ if ((expr) < 0) { \ fprintf(stderr, "%s:%d: ", __FILE__, __LINE__); \ perror(""); \ exit(1); \ } \ } while(0) using namespace std; class Para; struct Task { Task() : echo(false), ignore_error(false), status(-1), signal(-1) { stdout_pipe[0] = -1; stdout_pipe[1] = -1; stderr_pipe[0] = -1; stderr_pipe[1] = -1; } ~Task() { if (stdout_pipe[0] >= 0) PCHECK(close(stdout_pipe[0])); if (stderr_pipe[0] >= 0) PCHECK(close(stderr_pipe[0])); } string output; string shell; string cmd; bool echo; bool ignore_error; pid_t pid; int stdout_pipe[2]; int stderr_pipe[2]; string stdout_buf; string stderr_buf; int status; int signal; }; class TaskProvider { public: virtual ~TaskProvider() {} virtual int GetFD() = 0; virtual void PollFD(Para* para, int fd) = 0; virtual void OnStarted(Task* t) = 0; virtual void OnFinished(Task* t) = 0; }; class Para { public: Para(TaskProvider* provider, int num_jobs); ~Para(); void AddTask(Task* task); void Loop(); void Done(); static void WakeUp(int); private: void WaitChildren(); void RunCommands(); TaskProvider* provider_; size_t num_jobs_; int sig_pipe_[2]; int provider_fd_; queue tasks_; set running_; bool done_; }; class StdinTaskProvider : public TaskProvider { public: virtual ~StdinTaskProvider() {} virtual int GetFD() { return STDIN_FILENO; } virtual void PollFD(Para* para, int fd); virtual void OnStarted(Task* t); virtual void OnFinished(Task* t); private: string buf_; }; class KatiTaskProvider : public TaskProvider { public: virtual ~KatiTaskProvider() {} virtual int GetFD() { return STDIN_FILENO; } virtual void PollFD(Para* para, int fd); virtual void OnStarted(Task* t); virtual void OnFinished(Task* t); private: string buf_; }; Para* g_para_; Para::Para(TaskProvider* provider, int num_jobs) : provider_(provider), num_jobs_(num_jobs), done_(false) { g_para_ = this; PCHECK(pipe(sig_pipe_)); sigset_t sigmask; sigemptyset(&sigmask); sigaddset(&sigmask, SIGCHLD); PCHECK(sigprocmask(SIG_BLOCK, &sigmask, NULL)); CHECK(signal(SIGCHLD, &Para::WakeUp) != SIG_ERR); provider_fd_ = provider_->GetFD(); } Para::~Para() { PCHECK(close(sig_pipe_[0])); PCHECK(close(sig_pipe_[1])); } void Para::AddTask(Task* task) { task->pid = 0; tasks_.push(task); } static void SetFd(int fd, fd_set* fdset, int* nfds) { if (fd < 0) return; FD_SET(fd, fdset); *nfds = max(*nfds, fd); } static void readOutput(int* fd, string* buf) { char b[4096]; ssize_t r = read(*fd, b, sizeof(b)); if (r < 0 && errno != EINTR) return; PCHECK(r); if (r == 0) { PCHECK(close(*fd)); *fd = -1; return; } size_t l = buf->size(); buf->resize(l + r); memcpy(&((*buf)[l]), b, r); } void Para::Loop() { sigset_t sigmask; sigemptyset(&sigmask); while (!done_ || !tasks_.empty() || !running_.empty()) { int nfds = 0; fd_set rd; FD_ZERO(&rd); if (!done_) SetFd(provider_fd_, &rd, &nfds); SetFd(sig_pipe_[0], &rd, &nfds); for (Task* t : running_) { SetFd(t->stdout_pipe[0], &rd, &nfds); SetFd(t->stderr_pipe[0], &rd, &nfds); } int r = pselect(nfds, &rd, NULL, NULL, NULL, &sigmask); PCHECK(r && errno != EINTR); if (FD_ISSET(sig_pipe_[0], &rd)) { WaitChildren(); RunCommands(); continue; } if (FD_ISSET(provider_fd_, &rd)) { provider_->PollFD(this, provider_fd_); RunCommands(); continue; } for (Task* t : running_) { if (t->stdout_pipe[0] >= 0 && FD_ISSET(t->stdout_pipe[0], &rd)) readOutput(&t->stdout_pipe[0], &t->stdout_buf); if (t->stderr_pipe[0] >= 0 && FD_ISSET(t->stderr_pipe[0], &rd)) readOutput(&t->stderr_pipe[0], &t->stderr_buf); } } } void Para::Done() { done_ = true; } void Para::WakeUp(int) { char c = 42; PCHECK(write(g_para_->sig_pipe_[1], &c, 1)); } void Para::WaitChildren() { char c = 0; PCHECK(read(sig_pipe_[0], &c, 1)); CHECK(c == 42); vector finished; for (Task* task : running_) { int status; pid_t pid = waitpid(task->pid, &status, WNOHANG); if (WIFSIGNALED(status)) { task->signal = WTERMSIG(status); } else if (WIFEXITED(status)) { task->status = WEXITSTATUS(status); } else { PCHECK(false); } PCHECK(pid); if (pid == 0) { continue; } CHECK(pid == task->pid); while (task->stdout_pipe[0] >= 0) readOutput(&task->stdout_pipe[0], &task->stdout_buf); while (task->stderr_pipe[0] >= 0) readOutput(&task->stderr_pipe[0], &task->stderr_buf); // TODO: Handle error. finished.push_back(task); } for (Task* task : finished) { running_.erase(task); provider_->OnFinished(task); delete task; } } void Para::RunCommands() { while (!tasks_.empty() && (running_.size() < num_jobs_ || num_jobs_ == 0)) { Task* task = tasks_.front(); tasks_.pop(); provider_->OnStarted(task); PCHECK(pipe(task->stdout_pipe)); PCHECK(pipe(task->stderr_pipe)); task->pid = fork(); if (task->pid == 0) { PCHECK(close(task->stdout_pipe[0])); PCHECK(close(task->stderr_pipe[0])); PCHECK(dup2(task->stdout_pipe[1], STDOUT_FILENO)); PCHECK(dup2(task->stderr_pipe[1], STDERR_FILENO)); PCHECK(close(task->stdout_pipe[1])); PCHECK(close(task->stderr_pipe[1])); const char* args[] = { task->shell.c_str(), "-c", task->cmd.c_str(), NULL }; PCHECK(execvp(args[0], const_cast(args))); abort(); } PCHECK(close(task->stdout_pipe[1])); PCHECK(close(task->stderr_pipe[1])); running_.insert(task); } } void StdinTaskProvider::PollFD(Para* para, int fd) { char buf[4096]; ssize_t r = read(fd, buf, sizeof(buf)); PCHECK(r); if (r == 0) { para->Done(); return; } buf_.append(buf, r); for (;;) { size_t index = buf_.find('\n'); if (index == string::npos) { break; } Task* task = new Task(); task->shell = "/bin/sh"; task->cmd = buf_.substr(0, index); if (task->cmd.empty()) continue; para->AddTask(task); buf_ = buf_.substr(index + 1); } } void StdinTaskProvider::OnStarted(Task*) { } void StdinTaskProvider::OnFinished(Task* t) { fprintf(stdout, "%s", t->stdout_buf.c_str()); fprintf(stderr, "%s", t->stderr_buf.c_str()); } struct Runner { string output; string cmd; string shell; bool echo; bool ignore_error; }; static void recvData(int fd, void* d, size_t sz) { size_t s = 0; while (s < sz) { ssize_t r = read(fd, reinterpret_cast(d) + s, sz - s); if (r < 0 && errno == EINTR) continue; PCHECK(r); if (r == 0) { exit(1); } s += r; } } static int recvInt(int fd) { int v; recvData(fd, &v, sizeof(v)); return v; } static void recvString(int fd, string* s) { int l = recvInt(fd); s->resize(l); recvData(fd, &((*s)[0]), l); } static void recvTasks(int fd, vector* tasks) { int l = recvInt(fd); for (int i = 0; i < l; i++) { Task* r = new Task(); recvString(fd, &r->output); recvString(fd, &r->cmd); recvString(fd, &r->shell); r->echo = recvInt(fd); r->ignore_error = recvInt(fd); tasks->push_back(r); } } void KatiTaskProvider::PollFD(Para* para, int fd) { vector tasks; recvTasks(fd, &tasks); #if 0 for (Task* t : tasks) { para->AddTask(t); } #else Task* task = tasks[0]; for (Task* t : tasks) { if (task == t) continue; task->cmd += " ; "; task->cmd += t->cmd; } para->AddTask(task); #endif } static void sendData(int fd, const void* d, size_t sz) { size_t s = 0; while (s < sz) { ssize_t r = write(fd, reinterpret_cast(d) + s, sz - s); if (r < 0 && errno == EINTR) continue; PCHECK(r); if (r == 0) { exit(1); } s += r; } } static void sendInt(int fd, int v) { sendData(fd, &v, sizeof(v)); } static void sendString(int fd, const string& s) { sendInt(fd, s.size()); sendData(fd, s.data(), s.size()); } static void sendResult(int fd, Task* t) { sendString(fd, t->output); sendString(fd, t->stdout_buf); sendString(fd, t->stderr_buf); sendInt(fd, t->status); sendInt(fd, t->signal); } void KatiTaskProvider::OnStarted(Task* t) { sendResult(STDOUT_FILENO, t); } void KatiTaskProvider::OnFinished(Task* t) { sendResult(STDOUT_FILENO, t); } int GetNumCpus() { // TODO: Implement. return 4; } int main(int argc, char* argv[]) { int num_jobs = -1; bool from_kati = false; for (int i = 1; i < argc; i++) { char* arg = argv[i]; if (!strncmp(arg, "-j", 2)) { num_jobs = atoi(arg + 2); } else if (!strcmp(arg, "--kati")) { from_kati = true; } } if (num_jobs < 0) { num_jobs = GetNumCpus(); } TaskProvider* provider = NULL; if (from_kati) { provider = new KatiTaskProvider(); } else { provider = new StdinTaskProvider(); } Para para(provider, num_jobs); para.Loop(); }