4 #include <sys/select.h>
/* Jobcontrol state (file scope). NOTE(review): this is a sparse excerpt of
 * the file; cross-references below point at the visible call sites. */
/* Number of children forked by this process; decremented in harvest_zombies(). */
11 static int child_count;
/* Fork depth of this process; used for log indentation in run(). */
12 static int parent_count;
/* Pipe children use to ask the master parent for a job slot: children write
 * their pid to [1] (request_fork()), parent reads [0] (poll_job_requests()).
 * Created O_NONBLOCK | O_CLOEXEC in init_max_jobs(). */
13 static int job_request_fd[2];
/* Pipe the parent grants slots on: parent writes one byte to [1]
 * (grant_new_job()), waiting children block reading [0] (do_fork_limited()). */
14 static int job_get_permission_fd[2];
/* Maximum number of parallel jobs; set in init_max_jobs() (CPU count if the
 * caller requested 0). */
15 static unsigned int max_jobs;
/* Jobs currently granted and running. */
16 static unsigned int job_count;
/* Jobs that requested a slot and are still waiting for permission. */
17 static unsigned int jobs_pending;
/* Accessors for the fork bookkeeping counters above. Their bodies fall on
 * lines missing from this excerpt; presumably they simply return
 * child_count / parent_count — TODO confirm against the full file. */
19 int get_child_count(void)
24 int get_parent_count(void)
/* SIGCHLD handler: only logs; the actual reaping is done by
 * harvest_zombies(). NOTE(review): if pr_info() ends up in stdio/printf it
 * is not async-signal-safe inside a signal handler — verify pr_info's
 * implementation. */
29 static void sigchild_handler(int signal)
31 pr_info("Child has died, go harvest the zombie\n");
/* Install sigchild_handler() for SIGCHLD. SA_NOCLDSTOP suppresses signals
 * for merely-stopped children. Returns sigaction()'s result (0 on success,
 * -1 with errno set on failure).
 * NOTE(review): initialization of sa.sa_mask (sigemptyset) falls on lines
 * missing from this excerpt — confirm the struct is not passed with an
 * indeterminate mask. */
35 static int setup_sigchild_handler(void)
40 sa.sa_handler = sigchild_handler;
41 sa.sa_flags = SA_NOCLDSTOP;
43 ret = sigaction(SIGCHLD, &sa, NULL);
45 pr_err("Failed to setup SIGCHLD handler: %m\n");
/* Restore the default SIGCHLD disposition (SIG_DFL) so that children may
 * use waitpid() themselves without our handler interfering. Mirror image of
 * setup_sigchild_handler(); same sa_mask caveat applies. */
50 static int cancel_sigchild_handler(void)
55 sa.sa_handler = SIG_DFL;
56 sa.sa_flags = SA_NOCLDSTOP;
58 ret = sigaction(SIGCHLD, &sa, NULL);
60 pr_err("Failed to cancel SIGCHLD handler: %m\n");
66 * Initialize the jobcontrol.
68 * Create the pipes that are used to grant children execution
69 * permissions. If max_jobs is zero, count the CPUs from /proc/cpuinfo
72 int init_max_jobs(int max_jobs_requested)
79 if (pipe2(job_request_fd, O_NONBLOCK | O_CLOEXEC)) {
80 pr_err("Failed to create pipe: %m\n");
84 if (pipe2(job_get_permission_fd, O_CLOEXEC)) {
85 pr_err("Failed to create pipe: %m\n");
89 ret = setup_sigchild_handler();
93 if (max_jobs_requested > 0) {
94 max_jobs = max_jobs_requested;
/* BUG(review): "ro" is not a standard fopen() mode string — only 'r' is
 * meaningful and the trailing 'o' is at best ignored (glibc) and at worst
 * rejected on other libcs. Should be "r". */
99 file = fopen("/proc/cpuinfo", "ro");
101 pr_err("Failed to open /proc/cpuinfo: %m\n");
106 * The CPU count algorithm simply reads the first 8 bytes from
107 * the /proc/cpuinfo and then expects that line to be there as
108 * many times as there are CPUs.
110 ret = fread(match, 1, sizeof(match), file);
/* NOTE(review): fread() returns size_t; comparing an int ret against
 * sizeof(match) is a signed/unsigned comparison, and the "%zd" below is the
 * wrong specifier for size_t — should be "%zu". */
111 if (ret < sizeof(match)) {
112 pr_err("read %d bytes when expecting %zd %m\n",
117 while(fgets(buf, sizeof(buf), file)) {
/* Count one CPU per line whose first sizeof(match) bytes repeat the
 * signature read above (no NUL-termination needed for strncmp here). */
118 if (!strncmp(buf, match, sizeof(match)))
/* NOTE(review): max_jobs is unsigned — "%u" would be the correct
 * specifier rather than "%d". */
127 pr_info("Set maximum number of parallel jobs to %d\n", max_jobs);
/* Grant one pending child permission to run by writing a single byte to the
 * permission pipe; exactly one blocked reader in do_fork_limited() will
 * consume it. NOTE(review): job_count/jobs_pending are unsigned, so the
 * "%d" specifiers below should be "%u". */
132 static int grant_new_job(void)
138 pr_info("Granting new job. %d jobs currently and %d pending\n",
139 job_count, jobs_pending);
141 ret = write(job_get_permission_fd[1], &byte, 1);
143 pr_err("Failed to write 1 byte: %m\n");
/* Wait up to `timeout` seconds (negative => block forever, epoll semantics)
 * for job traffic on job_request_fd[0]. A positive pid means a child is
 * requesting a slot (grant immediately if under max_jobs, else queue); a
 * negative pid means job -pid finished, so a pending job may be granted. */
150 int poll_job_requests(int timeout)
152 struct epoll_event event;
158 struct epoll_event ev;
160 epollfd = epoll_create(1);
162 pr_err("Failed to epoll_create(): %m\n");
/* NOTE(review): the line setting ev.events (presumably EPOLLIN) is not in
 * this excerpt — confirm it is initialized before epoll_ctl(). */
167 ev.data.fd = job_request_fd[0];
/* NOTE(review): epoll_ctl() return value is ignored here. */
168 epoll_ctl(epollfd, EPOLL_CTL_ADD, job_request_fd[0], &ev);
171 /* Convert positive seconds to milliseconds */
172 timeout = timeout > 0 ? 1000 * timeout : timeout;
174 ret = epoll_wait(epollfd, &event, 1, timeout);
177 if (errno != EINTR) {
178 pr_err("epoll_wait: %m\n");
183 * If epoll_wait() was interrupted, better start
184 * everything again from the beginning
190 pr_info("Timed out\n");
/* NOTE(review): a short read (< sizeof(pid)) from the non-blocking pipe is
 * not visibly handled in this excerpt — confirm. */
194 ret = read(event.data.fd, &pid, sizeof(pid));
196 pr_err("Failed to read: %m\n");
201 pr_info("Read zero bytes\n");
206 if (job_count >= max_jobs) {
209 ret = grant_new_job();
212 } else if (pid < 0) {
213 if (job_count > max_jobs)
214 pr_err("BUG: Job %u jobs exceeded limit %u\n",
215 job_count, max_jobs);
217 pr_info("Job %d finished\n", -pid);
221 ret = grant_new_job();
227 pr_info("Jobs active: %u, pending: %u\n", job_count, jobs_pending);
232 * Per process flag indicating whether this child has requested fork
233 * limiting. If it has, it must also tell the master parent when it
234 * has died so that the parent can give next pending job permission to
/* Set in do_fork_limited(), cleared for children made with plain do_fork();
 * checked by the atexit handler so only limited forks notify the parent. */
237 static int is_limited_fork;
/* Fragment of do_fork() — the signature and fork() call fall on lines
 * missing from this excerpt. Visible behavior: log failures, log the new
 * child, and on the child side detach from the parent's job accounting. */
244 pr_err("fork() failed: %m\n");
250 pr_info("Fork %d, child %d\n", child_count, child);
255 * Child processes may want to use waitpid() for synchronizing
256 * with their sub-children. Disable the signal handler by
259 cancel_sigchild_handler();
262 * Also do not notify the master parent of the death of this
263 * child. Only children that have been created with
264 * do_fork_limited() can have this flag set.
268 /* reset child's child count */
/* Send this process's pid to the master parent over the request pipe:
 * positive pid = "please grant me a slot", negative pid = "job done"
 * (request <= 0 negates the pid). The pid initialization (presumably
 * getpid()) falls on a line missing from this excerpt — TODO confirm.
 * NOTE(review): write() returns ssize_t, narrowed to int here, and a short
 * write of the pid payload is not handled. */
274 static int request_fork(char request)
278 pid = request > 0 ? pid : -pid;
280 return write(job_request_fd[1], &pid, sizeof(pid));
283 static void limited_fork_exit_handler(void)
290 * Like do_fork(), but allow the child to continue only after the global
291 * job count is low enough.
293 * We allow the parent to continue other more important activities but
294 * the child respects the limit of global active processes.
296 int do_fork_limited(void)
305 /* Remember to notify the parent when we are done */
306 atexit(limited_fork_exit_handler);
309 pr_info("Requesting permission to go\n");
311 /* Signal the parent that we are here, waiting to go */
315 * The parent will tell us when we can continue. If there were
316 * multiple children waiting for their turn to run, the parent's
317 * write to the pipe will wake up every process reading
318 * it. However, only one will be reading the content and
319 * getting the permission to run. So we will read as many
320 * times as needed until we get our own permission to run.
/* Blocking read: one granted byte from grant_new_job() releases one child. */
323 ret = read(job_get_permission_fd[0], &byte, sizeof(byte));
329 pr_info("Continuing\n");
/* Reap terminated children with waitpid() and keep child_count in sync.
 * pid selects which child to wait for (waitpid semantics; -1 = any).
 * Returns early if this process has no children. */
333 int harvest_zombies(int pid)
337 if (child_count == 0)
341 pr_info("Waiting on pid %d, children left: %d\n", pid,
344 pid = waitpid(pid, &status, 0);
346 pr_err("Error on wait(): %m\n");
347 child_count--; /* Decrement child count anyway */
/* NOTE(review): this prints the raw wait status, not the exit code —
 * it should go through WIFEXITED()/WEXITSTATUS(status). */
351 pr_info("pid %d: exit code %d. Children left: %d\n", pid,
352 status, child_count);
359 * Runs a command cmd with params argv, connects stdin and stdout to
362 * Returns the pid of the executed process
/* Each of stdinfd/stdoutfd/stderrfd may be NULL to skip redirecting that
 * stream; when non-NULL it receives the parent's end of a pipe to the
 * child's corresponding standard stream. */
364 int run_piped(const char *cmd, char *const argv[],
365 int *stdinfd, int *stdoutfd, int *stderrfd)
367 int ifd[2], ofd[2], efd[2], pid;
369 pr_info("Running command %s\n", cmd);
371 if (stdinfd && pipe(ifd)) {
372 pr_err("pipe() failed: %m\n");
/* NOTE(review): whether earlier pipes are closed when a later pipe() call
 * fails is not visible in this excerpt — potential fd leak, confirm. */
376 if (stdoutfd && pipe(ofd)) {
377 pr_err("pipe() failed: %m\n");
381 if (stderrfd && pipe(efd)) {
382 pr_err("pipe() failed: %m\n");
387 if (pid) { /* Parent side */
/* Child side: wire the pipe ends onto the standard fds. NOTE(review):
 * dup2() return values are unchecked, and closing of the now-duplicated
 * original fds falls on lines missing from this excerpt. */
408 dup2(ifd[0], STDIN_FILENO);
413 dup2(ofd[1], STDOUT_FILENO);
418 dup2(efd[1], STDERR_FILENO);
421 /* Now we have redirected standard streams to the parent process */
423 pr_err("Failed to execv command %s: %m\n", cmd);
430 * Runs a command cmd with params argv, connects stdin and stdout to
433 * Returns the pid of the executed process
/* FILE* wrapper over run_piped(): each non-NULL out-parameter receives a
 * stdio stream fdopen()ed onto the corresponding pipe fd. */
435 int run_piped_stream(const char *cmd, char *const argv[],
436 FILE **stdinf, FILE **stdoutf, FILE **stderrf)
438 int ifd, ofd, efd, pid;
454 pid = run_piped(cmd, argv, i, o, e);
/* BUG(review)?: the stream for the child's *stdin* is opened "r"; the
 * parent writes to the child's stdin, so this should presumably be "w" —
 * confirm against callers. */
457 *stdinf = fdopen(ifd, "r");
458 if (*stdinf == NULL) {
459 pr_err("Error opening file stream for fd %d: %m\n",
466 *stdoutf = fdopen(ofd, "r");
467 if (*stdoutf == NULL) {
/* NOTE(review): on fdopen() failure the underlying fd is not visibly
 * closed in this excerpt — potential fd leak, confirm. */
468 pr_err("Error opening file stream for fd %d: %m\n",
475 *stderrf = fdopen(efd, "r");
476 if (*stderrf == NULL) {
477 pr_err("Error opening file stream for fd %d: %m\n",
487 * Forks a child and executes a command to run in parallel
/*
 * max(a, b): larger of the two values.
 * Fixed: the original expansion `(a) < (b) ? (b) : (a)` was not wrapped in
 * outer parentheses, so an expression like `2 + max(1, 3)` parsed as
 * `(2 + 1) < (3) ? (3) : (1)` and silently evaluated to 1 instead of 5.
 * NOTE: arguments are still evaluated twice — do not pass expressions with
 * side effects (e.g. max(i++, j)).
 */
#define max(a, b) ((a) < (b) ? (b) : (a))
/* Read-buffer size used by run() when draining the child's stdout/stderr. */
#define BUF_SIZE (128*1024)
/* Fork a child that runs cmd via run_piped(), then multiplex the child's
 * stdout/stderr through select(), echoing each line to our stderr with an
 * indentation prefix reflecting the fork depth. */
492 int run(const char *cmd, char *const argv[])
499 char stdoutstr[32], stderrstr[32], indent[16] = { " " };
/* NOTE(review): indent[] has 16 bytes; get_parent_count() + 1 >= 16 would
 * write out of bounds at deep fork nesting — confirm the depth is bounded. */
501 indent[get_parent_count() + 1] = 0;
503 if ((child = do_fork()))
506 child = run_piped(cmd, argv, NULL, &ofd, &efd);
507 snprintf(stdoutstr, 32, "%sstdout", green_color);
508 snprintf(stderrstr, 32, "%sstderr", red_color);
520 maxfd = max(ofd, efd);
/* BUG(review): select()'s first argument is the highest fd PLUS ONE; as
 * written the highest of ofd/efd is never examined. Should be maxfd + 1. */
521 error = select(maxfd, &rfds, NULL, NULL, NULL);
524 pr_err("Error with select: %m\n");
528 if (FD_ISSET(ofd, &rfds)) {
530 bytes = read(ofd, rbuf, BUF_SIZE);
534 if (FD_ISSET(efd, &rfds)) {
536 bytes = read(efd, rbuf, BUF_SIZE);
540 pr_err("select() returned unknown fd\n");
545 pr_err("read() failed: %m\n");
550 * Workaround: When a process has died and it has only
551 * written to stderr, select() doesn't indicate that
552 * there might be something to read in the stderr fd. To
553 * work around this issue, we try to read stderr just
554 * in case in order to ensure everything gets read.
557 bytes = read(efd, rbuf, BUF_SIZE);
566 fprintf(stderr, "%s[%5d %s] %s: %s%s\n", indent,
567 child, cmd, typestr, sptr, normal_color);
577 harvest_zombies(child);