4 #include <sys/select.h>
8 #include <sys/signalfd.h>
9 #include <sys/resource.h>
/*
 * File-scope job-control state.
 *
 * child_count is logged by do_fork() and harvest_zombies() as the number
 * of children of this process; parent_count is presumably the value
 * reported by get_parent_count() (body not visible here).
 */
14 static int child_count;
15 static int parent_count;
/*
 * Request pipe, created non-blocking in init_jobcontrol(): request_fork()
 * writes a signed pid to [1], handle_job_request() reads it from [0].
 */
16 static int job_request_fd[2];
/*
 * Permission pipe: grant_new_job() writes one byte to [1], a child in
 * do_fork_limited() reads one byte from [0] before continuing.
 */
17 static int job_get_permission_fd[2];
/* Job limit chosen in init_jobcontrol(). */
20 static unsigned int max_jobs;
/* Logged as the number of jobs currently active. */
21 static unsigned int job_count;
/* Logged as the number of jobs waiting for permission to run. */
22 static unsigned int jobs_pending;
24 int get_child_count(void)
29 int get_parent_count(void)
/*
 * Read one struct signalfd_siginfo from signal_fd and act on it.
 *
 * A short read is reported with pr_err(). Any signal other than SIGCHLD
 * is logged and ignored. For SIGCHLD the sending pid (ssi_pid) is passed
 * to harvest_zombies(). The return statements are not visible here.
 */
34 static int handle_signals(void)
36 struct signalfd_siginfo siginfo;
39 ret = read(signal_fd, &siginfo, sizeof(siginfo));
/*
 * NOTE(review): ret's declaration is not visible; the %d conversion below
 * suggests it is an int. If so, this comparison converts ret to an
 * unsigned type, and a -1 from read() is NOT less than sizeof(siginfo),
 * so a failed read skips this error branch. Consider checking ret < 0
 * separately. sizeof() yields size_t, for which the conversion is %zu.
 */
40 if (ret < sizeof(siginfo)) {
41 pr_err("Expected %zd from read, got %d: %m\n",
42 sizeof(siginfo), ret);
46 if (siginfo.ssi_signo != SIGCHLD) {
47 pr_err("Unexpected signal %d, ignoring\n", siginfo.ssi_signo);
51 harvest_zombies(siginfo.ssi_pid);
57 * Initialize the jobcontrol.
59 * Create the pipes that are used to grant children execution
60 * permissions. If max_jobs is zero, count the number of CPUs from
61 * /proc/cpuinfo and use that.
/*
 * Set-up order, as far as visible here:
 *  1. job_request_fd pipe (non-blocking, close-on-exec),
 *  2. job_get_permission_fd pipe (blocking, close-on-exec),
 *  3. epoll instance watching job_request_fd[0],
 *  4. SIGCHLD blocked and routed to a signalfd, which is also added to
 *     the epoll instance,
 *  5. max_jobs taken from max_jobs_requested when that is > 0; otherwise
 *     derived from /proc/cpuinfo.
 *
 * NOTE(review): any value <= 0 (not only zero) selects the CPU-count path.
 */
63 int init_jobcontrol(int max_jobs_requested)
65 struct epoll_event ev;
72 if (pipe2(job_request_fd, O_NONBLOCK | O_CLOEXEC)) {
73 pr_err("Failed to create pipe: %m\n");
77 if (pipe2(job_get_permission_fd, O_CLOEXEC)) {
78 pr_err("Failed to create pipe: %m\n");
82 epoll_fd = epoll_create(1);
84 pr_err("Failed to epoll_create(): %m\n");
89 ev.data.fd = job_request_fd[0];
90 ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, job_request_fd[0], &ev);
92 pr_err("Failed to add epoll_fd: %m\n");
96 sigemptyset(&sigmask);
97 sigaddset(&sigmask, SIGCHLD);
99 /* Block SIGCHLD so that it becomes readable via signalfd */
100 ret = sigprocmask(SIG_BLOCK, &sigmask, NULL);
102 pr_err("Failed to sigprocmask: %m\n");
105 signal_fd = signalfd(-1, &sigmask, SFD_CLOEXEC);
107 pr_err("Failed to create signal_fd: %m\n");
111 ev.data.fd = signal_fd;
112 ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, signal_fd, &ev);
114 pr_err("Failed to add epoll_fd: %m\n");
119 if (max_jobs_requested > 0) {
120 max_jobs = max_jobs_requested;
/*
 * NOTE(review): "ro" is not a mode string defined by the C standard; the
 * portable spelling for read-only is "r".
 */
125 file = fopen("/proc/cpuinfo", "ro");
127 pr_err("Failed to open /proc/cpuinfo: %m\n");
132 * The CPU count algorithm simply reads the first 8 bytes from
133 * the /proc/cpuinfo and then expects that line to be there as
134 * many times as there are CPUs.
136 ret = fread(match, 1, sizeof(match), file);
137 if (ret < sizeof(match)) {
138 pr_err("read %d bytes when expecting %zd %m\n",
143 while(fgets(buf, sizeof(buf), file)) {
144 if (!strncmp(buf, match, sizeof(match)))
/* NOTE(review): max_jobs is unsigned int; the matching conversion is %u. */
153 pr_info("Set maximum number of parallel jobs to %d\n", max_jobs);
/*
 * Let one waiting child proceed: log the current counters and write a
 * single byte to the write end of the permission pipe. A child blocked in
 * do_fork_limited() reads that byte from job_get_permission_fd[0].
 *
 * Counter updates and the return value are not visible here.
 * NOTE(review): job_count and jobs_pending are unsigned int; the matching
 * conversion in the log message is %u.
 */
158 static int grant_new_job(void)
164 pr_info("Granting new job. %d jobs currently and %d pending\n",
165 job_count, jobs_pending);
167 ret = write(job_get_permission_fd[1], &byte, 1);
169 pr_err("Failed to write 1 byte: %m\n");
/*
 * Consume one message from the request pipe.
 *
 * The message is a pid-sized value written by request_fork(), whose sign
 * carries the meaning: a negative value is logged as "Job <pid> finished".
 * The branch before `else if (pid < 0)` is presumably the pid > 0 "new
 * job" case - confirm against the elided lines. Both visible branches can
 * end in grant_new_job().
 */
176 static int handle_job_request(void)
180 ret = read(job_request_fd[0], &pid, sizeof(pid));
182 pr_err("Failed to read: %m\n");
187 pr_info("Read zero bytes\n");
192 if (job_count >= max_jobs) {
195 ret = grant_new_job();
198 } else if (pid < 0) {
199 if (job_count > max_jobs)
200 pr_err("BUG: Job %u jobs exceeded limit %u\n",
201 job_count, max_jobs);
203 pr_info("Job %d finished\n", -pid);
207 ret = grant_new_job();
/*
 * Wait for at most one event on epoll_fd and dispatch it.
 *
 * timeout is in seconds when positive and is converted to milliseconds;
 * zero and negative values are handed to epoll_wait() unchanged.
 *
 * An event on job_request_fd[0] goes to handle_job_request(); an event on
 * signal_fd is presumably passed to handle_signals() (call elided here);
 * any other fd is reported as unknown. The active/pending counters are
 * logged at the end.
 *
 * NOTE(review): the result of handle_job_request() is discarded at the
 * visible call site.
 */
215 int poll_job_requests(int timeout)
217 struct epoll_event event;
220 /* Convert positive seconds to milliseconds */
221 timeout = timeout > 0 ? 1000 * timeout : timeout;
223 ret = epoll_wait(epoll_fd, &event, 1, timeout);
226 if (errno != EINTR) {
227 pr_err("epoll_wait: %m\n");
232 * If epoll_wait() was interrupted, better start
233 * everything again from the beginning
239 pr_info("Timed out\n");
243 if (event.data.fd == job_request_fd[0])
244 handle_job_request();
245 else if (event.data.fd == signal_fd)
248 pr_err("Unknown fd: %d\n", event.data.fd);
251 pr_info("Jobs active: %u, pending: %u\n", job_count, jobs_pending);
256 * Per process flag indicating whether this child has requested fork
257 * limiting. If it has, it must also tell the master parent when it
258 * has died so that the parent can give next pending job permission to
261 static int is_limited_fork;
268 pr_err("fork() failed: %m\n");
274 pr_info("Fork %d, child %d\n", child_count, child);
279 * Also do not notify the master parent of the death of this
280 * child. Only children that have been created with
281 * do_fork_limited() can have this flag set.
285 /* reset child's child count */
/*
 * Send this process's request to the master parent over the request pipe.
 *
 * The message is a pid whose sign encodes the request: request > 0 sends
 * the pid as is, any other value sends it negated. Where pid is obtained
 * is not visible here (presumably getpid() - confirm).
 *
 * Returns the result of write(): bytes written, or -1 on error.
 */
291 static int request_fork(char request)
295 pid = request > 0 ? pid : -pid;
297 return write(job_request_fd[1], &pid, sizeof(pid));
300 static void limited_fork_exit_handler(void)
307 * Like do_fork(), but allow the child to continue only after the global
308 * job count is low enough.
310 * We allow the parent to continue other more important activities but
311 * the child respects the limit of global active processes.
313 int do_fork_limited(void)
322 /* Remember to notify the parent when we are done */
323 atexit(limited_fork_exit_handler);
326 pr_info("Requesting permission to go\n");
328 /* Signal the parent that we are here, waiting to go */
332 * The parent will tell us when we can continue. If there were
333 * multiple children waiting for their turn to run, the parent's
334 * write to the pipe will wake up every process reading
335 * it. However, only one will be reading the content and
336 * getting the permission to run. So we will read as many
337 * times as needed until we get our own permission to run.
340 ret = read(job_get_permission_fd[0], &byte, sizeof(byte));
346 pr_info("Continuing\n");
/*
 * Reap a child and log how it ended.
 *
 * pid is passed straight to wait4(), and is then overwritten with
 * wait4()'s return value. The call uses options == 0, so it blocks. It is
 * repeated until the status reports a normal exit or death by signal.
 * The exit status or terminating signal is logged together with the user
 * and system CPU time taken from the rusage filled in by wait4().
 *
 * When child_count is zero the function takes an early path whose body is
 * not visible here.
 */
350 int harvest_zombies(int pid)
353 struct rusage rusage;
354 char *status_str = NULL;
357 if (child_count == 0)
361 pr_info("Waiting on pid %d, children left: %d\n", pid,
365 pid = wait4(pid, &status, 0, &rusage);
/* NOTE(review): the message names waitid() but the call above is wait4(). */
367 pr_err("Error on waitid(): %m\n");
370 /* Wait until the child has become a zombie */
371 } while (!WIFEXITED(status) && !WIFSIGNALED(status));
374 if (WIFEXITED(status)) {
375 status_str = "exited with status";
376 code = WEXITSTATUS(status);
377 } else if (WIFSIGNALED(status)) {
378 status_str = "killed by signal";
379 code = WTERMSIG(status);
381 pr_info("pid %d: %s %d. Children left: %d\n", pid,
382 status_str, code, child_count);
/*
 * NOTE(review): tv_sec is cast to long for %ld but tv_usec / 1000 is not;
 * add the same cast if suseconds_t may differ from long on a target.
 */
383 pr_info("pid %d: User time: %ld.%03lds, System %ld.%03lds\n", pid,
384 (long)rusage.ru_utime.tv_sec, rusage.ru_utime.tv_usec / 1000,
385 (long)rusage.ru_stime.tv_sec, rusage.ru_stime.tv_usec / 1000);
391 * Runs a command cmd with params argv, connects stdin and stdout to
394 * Returns the pid of the executed process
/*
 * A pipe is created only for each of stdinfd / stdoutfd / stderrfd that
 * is non-NULL. In the child the read end of the stdin pipe is dup2()'d
 * onto STDIN_FILENO and the write ends of the stdout/stderr pipes onto
 * STDOUT_FILENO / STDERR_FILENO before cmd is executed. Reaching the
 * final pr_err() means the exec call failed.
 *
 * NOTE(review): whether pipes that were already created are closed when a
 * later pipe() fails is not visible here - confirm. The dup2() results
 * are not checked on the visible lines.
 */
396 int run_piped(const char *cmd, char *const argv[],
397 int *stdinfd, int *stdoutfd, int *stderrfd)
399 int ifd[2], ofd[2], efd[2], pid;
401 pr_info("Running command %s\n", cmd);
403 if (stdinfd && pipe(ifd)) {
404 pr_err("pipe() failed: %m\n");
408 if (stdoutfd && pipe(ofd)) {
409 pr_err("pipe() failed: %m\n");
413 if (stderrfd && pipe(efd)) {
414 pr_err("pipe() failed: %m\n");
419 if (pid) { /* Parent side */
440 dup2(ifd[0], STDIN_FILENO);
445 dup2(ofd[1], STDOUT_FILENO);
450 dup2(efd[1], STDERR_FILENO);
453 /* Now we have redirected standard streams to parent process */
455 pr_err("Failed to execv command %s: %m\n", cmd);
462 * Runs a command cmd with params argv, connects stdin and stdout to
465 * Returns the pid of the executed process
/*
 * Stream flavour of run_piped(): runs cmd through run_piped() and wraps
 * each returned descriptor in a FILE stream with fdopen(), storing the
 * result through stdinf / stdoutf / stderrf. A failing fdopen() is logged
 * with the offending descriptor.
 */
467 int run_piped_stream(const char *cmd, char *const argv[],
468 FILE **stdinf, FILE **stdoutf, FILE **stderrf)
470 int ifd, ofd, efd, pid;
486 pid = run_piped(cmd, argv, i, o, e);
/*
 * NOTE(review): in run_piped() the child reads its stdin from the pipe's
 * read end, so the descriptor handed back to the parent is presumably the
 * write end. Opening it with mode "r" then looks wrong ("w" expected) -
 * confirm against the parent-side code of run_piped().
 */
489 *stdinf = fdopen(ifd, "r");
490 if (*stdinf == NULL) {
491 pr_err("Error opening file stream for fd %d: %m\n",
498 *stdoutf = fdopen(ofd, "r");
499 if (*stdoutf == NULL) {
500 pr_err("Error opening file stream for fd %d: %m\n",
507 *stderrf = fdopen(efd, "r");
508 if (*stderrf == NULL) {
509 pr_err("Error opening file stream for fd %d: %m\n",
519 * Forks a child and executes a command to run in parallel
/*
 * Larger of a and b.
 *
 * The whole expansion is parenthesized so the macro can be used inside a
 * larger expression: without the outer parentheses `2 * max(1, 5)` parses
 * as `(2 * (1) < (5)) ? (5) : (1)`. Each argument may be evaluated twice,
 * so do not pass expressions with side effects.
 */
#define max(a, b) ((a) < (b) ? (b) : (a))
/* Number of bytes requested per read() of the child's output in run(). */
#define BUF_SIZE (128*1024)
/*
 * Visible outline: fork with do_fork(); the branch taken when do_fork()
 * returns non-zero is elided. Otherwise cmd is started through
 * run_piped() with its stdout and stderr captured, and both descriptors
 * are multiplexed with select(), reading up to BUF_SIZE bytes at a time.
 *
 * NOTE(review): indent has 16 elements and is indexed with
 * get_parent_count() + 1 without a visible bounds check; a parent count
 * of 15 or more would write past the array.
 */
524 int run(const char *cmd, char *const argv[])
531 char stdoutstr[32], stderrstr[32], indent[16] = { " " };
533 indent[get_parent_count() + 1] = 0;
535 if ((child = do_fork()))
538 child = run_piped(cmd, argv, NULL, &ofd, &efd);
539 snprintf(stdoutstr, 32, "%sstdout", green_color);
540 snprintf(stderrstr, 32, "%sstderr", red_color);
552 maxfd = max(ofd, efd);
/*
 * NOTE(review): select()'s first argument is the highest descriptor
 * number plus one. Passing maxfd itself leaves the higher of ofd / efd
 * outside the checked range; this should be maxfd + 1.
 */
553 error = select(maxfd, &rfds, NULL, NULL, NULL);
556 pr_err("Error with select: %m\n");
560 if (FD_ISSET(ofd, &rfds)) {
562 bytes = read(ofd, rbuf, BUF_SIZE);
566 if (FD_ISSET(efd, &rfds)) {
568 bytes = read(efd, rbuf, BUF_SIZE);
572 pr_err("select() returned unknown fd\n");
577 pr_err("read() failed: %m\n");
582 * Workaround: When a process has died and it has only
583 * written to stderr, select() doesn't indicate that
584 * there might be something to read in stderr fd. To
585 * work around this issue, we try to read stderr just
586 * in case in order to ensure everything gets read.
589 bytes = read(efd, rbuf, BUF_SIZE);
598 fprintf(stderr, "%s[%5d %s] %s: %s%s\n", indent,
599 child, cmd, typestr, sptr, normal_color);
609 harvest_zombies(child);