4 #include <sys/select.h>
/* Number of live children forked by this process (see harvest_zombies). */
11 static int child_count;
/* Apparent fork-nesting depth; used e.g. for log indentation in run() — confirm. */
12 static int parent_count;
/* Pipe: children write their pid here to request / release a job slot. */
13 static int job_request_fd[2];
/* Pipe: master parent writes one byte per granted job slot. */
14 static int job_get_permission_fd[2];
/* Upper bound on concurrently running jobs (0 until init_max_jobs()). */
15 static unsigned int max_jobs;
/* Jobs currently holding a slot. */
16 static unsigned int job_count;
/* Jobs waiting for a slot to become free. */
17 static unsigned int jobs_pending;
/* Accessor — presumably returns child_count; body not visible in this fragment. */
19 int get_child_count(void)
/* Accessor — presumably returns parent_count; body not visible in this fragment. */
24 int get_parent_count(void)
/*
 * SIGCHLD handler: reap an exited child so it does not linger as a
 * zombie. NOTE(review): a single blocking waitpid() per signal can
 * under-reap when several children exit close together (signals
 * coalesce) — a WNOHANG loop is the usual fix; confirm.
 */
29 static void sigchild_handler(int signal)
33 waitpid(0, &status, 0);
/*
 * Install sigchild_handler for SIGCHLD. SA_NOCLDSTOP: do not signal
 * when a child merely stops/continues. Logs and returns sigaction()'s
 * failure; returns 0 on success (tail not visible in fragment).
 */
36 static int setup_sigchild_handler(void)
41 sa.sa_handler = sigchild_handler;
42 sa.sa_flags = SA_NOCLDSTOP;
44 ret = sigaction(SIGCHLD, &sa, NULL);
46 pr_err("Failed to setup SIGCHLD handler: %m\n");
/*
 * Restore the default SIGCHLD disposition (SIG_DFL) so a child process
 * can use waitpid() itself without the auto-reaping handler stealing
 * its children's exit statuses.
 */
51 static int cancel_sigchild_handler(void)
56 sa.sa_handler = SIG_DFL;
57 sa.sa_flags = SA_NOCLDSTOP;
59 ret = sigaction(SIGCHLD, &sa, NULL);
61 pr_err("Failed to cancel SIGCHLD handler: %m\n");
67 * Initialize the jobcontrol.
69 * Create the pipes that are used to grant children execution
70 * permissions. If max_jobs is zero, count the CPUs from /proc/cpuinfo
73 int init_max_jobs(int max_jobs_requested)
80 if (pipe2(job_request_fd, O_NONBLOCK | O_CLOEXEC)) {
81 pr_err("Failed to create pipe: %m\n");
85 if (pipe2(job_get_permission_fd, O_CLOEXEC)) {
86 pr_err("Failed to create pipe: %m\n");
90 ret = setup_sigchild_handler();
94 if (max_jobs_requested > 0) {
95 max_jobs = max_jobs_requested;
100 file = fopen("/proc/cpuinfo", "ro");
102 pr_err("Failed to open /proc/cpuinfo: %m\n");
107 * The CPU count algorithm simply reads the first 8 bytes from
108 * the /proc/cpuinfo and then expects that line to be there as
109 * many times as there are CPUs.
111 ret = fread(match, 1, sizeof(match), file);
112 if (ret < sizeof(match)) {
113 pr_err("read %d bytes when expecting %zd %m\n",
118 while(fgets(buf, sizeof(buf), file)) {
119 if (!strncmp(buf, match, sizeof(match)))
128 pr_info("Set maximum number of parallel jobs to %d\n", max_jobs);
/*
 * Release one pending child: write a single byte to the permission
 * pipe; exactly one child blocked in do_fork_limited()'s read() will
 * consume it and proceed.
 */
133 static int grant_new_job(void)
139 pr_info("Granting new job. %d jobs currently and %d pending\n",
140 job_count, jobs_pending);
142 ret = write(job_get_permission_fd[1], &byte, 1);
144 pr_err("Failed to write 1 byte: %m\n");
/*
 * Wait up to `timeout` seconds (non-positive values pass through to
 * epoll_wait: 0 = poll, -1 = block) for traffic on job_request_fd[0].
 * Protocol (see request_fork): a positive pid is a child asking for a
 * job slot; a negative pid means child -pid has finished. Grants new
 * jobs while job_count is below max_jobs.
 */
151 int poll_job_requests(int timeout)
153 struct epoll_event event;
159 struct epoll_event ev;
161 epollfd = epoll_create(1);
163 pr_err("Failed to epoll_create(): %m\n");
168 ev.data.fd = job_request_fd[0];
/* NOTE(review): epoll_ctl() result is not checked on this line — confirm
 * the hidden lines handle it, and that epollfd is close()d on all paths. */
169 epoll_ctl(epollfd, EPOLL_CTL_ADD, job_request_fd[0], &ev);
172 /* Convert positive seconds to milliseconds */
173 timeout = timeout > 0 ? 1000 * timeout : timeout;
175 ret = epoll_wait(epollfd, &event, 1, timeout);
178 if (errno != EINTR) {
179 pr_err("epoll_wait: %m\n");
184 * If epoll_wait() was interrupted, better start
185 * everything again from the beginning
191 pr_info("Timed out\n");
195 ret = read(event.data.fd, &pid, sizeof(pid));
197 pr_err("Failed to read: %m\n");
202 pr_info("Read zero bytes\n");
207 if (job_count >= max_jobs) {
210 ret = grant_new_job();
213 } else if (pid < 0) {
214 if (job_count > max_jobs)
215 pr_err("BUG: Job %u jobs exceeded limit %u\n",
216 job_count, max_jobs);
218 pr_info("Job %d finished\n", -pid);
222 ret = grant_new_job();
228 pr_info("Jobs active: %u, pending: %u\n", job_count, jobs_pending);
233 * Per process flag indicating whether this child has requested fork
234 * limiting. If it has, it must also tell the master parent when it
235 * has died so that the parent can give next pending job permission to
238 static int is_limited_fork;
/* Fragment below appears to be the body of do_fork(): a fork() wrapper
 * with book-keeping — the definition line itself is not visible; confirm. */
245 pr_err("fork() failed: %m\n");
251 pr_info("Fork %d, child %d\n", child_count, child);
256 * Child processes may want to use waitpid() for synchronizing
257 * with their sub-childs. Disable the signal handler by
260 cancel_sigchild_handler();
263 * Also do not notify the master parent the death of this
264 * child. Only childs that have been created with
265 * do_fork_limited() can have this flag set.
269 /* reset child's child count */
/*
 * Send this process's pid over the job-request pipe to the master
 * parent: positive pid = asking permission to run, negative pid =
 * reporting completion (encoded by the sign flip below).
 */
275 static int request_fork(char request)
279 pid = request > 0 ? pid : -pid;
281 return write(job_request_fd[1], &pid, sizeof(pid));
/* atexit() hook for limited forks — presumably notifies the master
 * parent of this child's exit so a pending job can be granted; body
 * not visible in this fragment. */
284 static void limited_fork_exit_handler(void)
291 * Like do_fork(), but allow the child continue only after the global
292 * job count is low enough.
294 * We allow the parent to continue other more important activities but
295 * child respects the limit of global active processes.
297 int do_fork_limited(void)
306 /* Remember to notify the parent when we are done */
/* Registered in the child so the slot is released on any exit path. */
307 atexit(limited_fork_exit_handler);
310 pr_info("Requesting permission to go\n");
312 /* Signal the parent that we are here, waiting to go */
316 * The parent will tell us when we can continue. If there were
317 * multiple children waiting for their turn to run parent's
318 * write to the pipe will wake up every process reading
319 * it. However, only one will be reading the content and
320 * getting the permission to run. So we will read as many
321 * times as needed until we get our own permission to run.
/* Blocking read: one byte consumed = one job slot granted. */
324 ret = read(job_get_permission_fd[0], &byte, sizeof(byte));
330 pr_info("Continuing\n");
/*
 * Reap child `pid` (waitpid semantics: 0/-1 widen the scope) and
 * decrement child_count. Returns early when there are no children.
 */
334 int harvest_zombies(int pid)
338 if (child_count == 0)
342 pr_info("Waiting on pid %d, children left: %d\n", pid,
345 pid = waitpid(pid, &status, 0);
347 pr_err("Error on wait(): %m\n");
348 child_count--; /* Decrement child count anyway */
/* NOTE(review): this logs the raw status word, not WEXITSTATUS(status)
 * — "exit code" in the message is misleading; confirm intent. */
352 pr_info("pid %d: exit code %d. Children left: %d\n", pid,
353 status, child_count);
360 * Runs a command cmd with params argv, connects stdin and stdout to
363 * Returns the pid of the executed process
/* Each pipe is created only when the caller passed a non-NULL fd slot.
 * NOTE(review): if a later pipe() fails, fds from earlier successful
 * pipe() calls appear to leak — the hidden error paths should close
 * them; confirm. */
365 int run_piped(const char *cmd, char *const argv[],
366 int *stdinfd, int *stdoutfd, int *stderrfd)
368 int ifd[2], ofd[2], efd[2], pid;
370 pr_info("Running command %s\n", cmd);
372 if (stdinfd && pipe(ifd)) {
373 pr_err("pipe() failed: %m\n");
377 if (stdoutfd && pipe(ofd)) {
378 pr_err("pipe() failed: %m\n");
382 if (stderrfd && pipe(efd)) {
383 pr_err("pipe() failed: %m\n");
388 if (pid) { /* Parent side */
/* Child side: wire the pipe ends onto the standard streams, then exec. */
409 dup2(ifd[0], STDIN_FILENO);
414 dup2(ofd[1], STDOUT_FILENO);
419 dup2(efd[1], STDERR_FILENO);
422 /* Now we have redirected standard streams to parent process */
424 pr_err("Failed to execv command %s: %m\n", cmd);
431 * Runs a command cmd with params argv, connects stdin and stdout to
434 * Returns the pid of the executed process
/* Stream-returning wrapper around run_piped(): wraps each requested fd
 * in a FILE* via fdopen(). */
436 int run_piped_stream(const char *cmd, char *const argv[],
437 FILE **stdinf, FILE **stdoutf, FILE **stderrf)
439 int ifd, ofd, efd, pid;
455 pid = run_piped(cmd, argv, i, o, e);
/* NOTE(review): the child's stdin pipe is fdopen()ed with mode "r" —
 * a stream the caller writes INTO the child should be "w"; confirm. */
458 *stdinf = fdopen(ifd, "r");
459 if (*stdinf == NULL) {
460 pr_err("Error opening file stream for fd %d: %m\n",
467 *stdoutf = fdopen(ofd, "r");
468 if (*stdoutf == NULL) {
469 pr_err("Error opening file stream for fd %d: %m\n",
476 *stderrf = fdopen(efd, "r");
477 if (*stderrf == NULL) {
478 pr_err("Error opening file stream for fd %d: %m\n",
488 * Forks a child and executes a command to run in parallel
/*
 * BUG FIX: the expansion must be fully parenthesized — without the
 * outer parentheses, `max(a, b) + 1` expands to
 * `(a) < (b) ? (b) : (a) + 1`, which mis-groups. Note the arguments
 * are still evaluated twice; do not pass expressions with side effects.
 */
#define max(a, b) ((a) < (b) ? (b) : (a))
/* Read-buffer size for draining the child's stdout/stderr pipes. */
#define BUF_SIZE (128*1024)
/*
 * Fork, exec `cmd` via run_piped(), and multiplex the child's stdout
 * and stderr to this process's stderr with a colored, indented prefix.
 * The parent returns immediately after do_fork(); the forked helper
 * does the draining and finally reaps the command child.
 */
493 int run(const char *cmd, char *const argv[])
500 char stdoutstr[32], stderrstr[32], indent[16] = { " " };
/* NOTE(review): indent[16] is indexed with get_parent_count() + 1 —
 * overflows once nesting depth exceeds 14; confirm an upper bound. */
502 indent[get_parent_count() + 1] = 0;
504 if ((child = do_fork()))
507 child = run_piped(cmd, argv, NULL, &ofd, &efd);
508 snprintf(stdoutstr, 32, "%sstdout", green_color);
509 snprintf(stderrstr, 32, "%sstderr", red_color);
521 maxfd = max(ofd, efd);
/* BUG(review): per POSIX, select()'s first argument is the highest fd
 * PLUS ONE; passing maxfd excludes the largest of ofd/efd from the
 * watched set — should be select(maxfd + 1, ...). */
522 error = select(maxfd, &rfds, NULL, NULL, NULL);
525 pr_err("Error with select: %m\n");
529 if (FD_ISSET(ofd, &rfds)) {
531 bytes = read(ofd, rbuf, BUF_SIZE);
535 if (FD_ISSET(efd, &rfds)) {
537 bytes = read(efd, rbuf, BUF_SIZE);
541 pr_err("select() returned unknown fd\n");
546 pr_err("read() failed: %m\n");
551 * Workaround: When a process had die and it has only
552 * written to stderr, select() doesn't indicate that
553 * there might be something to read in stderr fd. To
554 * work around this issue, we try to read stderr just
555 * in case in order to ensure everything gets read.
558 bytes = read(efd, rbuf, BUF_SIZE);
567 fprintf(stderr, "%s[%5d %s] %s: %s%s\n", indent,
568 child, cmd, typestr, sptr, normal_color);
/* Reap the exec'd child before the helper process exits. */
578 harvest_zombies(child);