4 #include <sys/select.h>
8 #include <sys/resource.h>
/*
 * Process bookkeeping. child_count is logged whenever this process forks
 * and reported as "children left" by harvest_zombies(); parent_count is
 * presumably what get_parent_count() returns (run() uses that value to
 * size its output indentation) — confirm against get_parent_count().
 */
static int child_count, parent_count;

/*
 * Job-control pipes created by init_jobcontrol(). Children send job
 * requests by writing to job_request_fd[1] and the master reads them from
 * job_request_fd[0]. The master grants a job by writing one byte to
 * job_get_permission_fd[1], which waiting children read from
 * job_get_permission_fd[0].
 */
static int job_request_fd[2], job_get_permission_fd[2];

/* Limit on parallel jobs, plus the active and pending job counters. */
static unsigned int max_jobs, job_count, jobs_pending;
22 int get_child_count(void)
27 int get_parent_count(void)
32 static void sigchild_handler(int signal)
36 waitpid(0, &status, 0);
39 static int setup_sigchild_handler(void)
44 sa.sa_handler = sigchild_handler;
45 sa.sa_flags = SA_NOCLDSTOP;
47 ret = sigaction(SIGCHLD, &sa, NULL);
49 pr_err("Failed to setup SIGCHLD handler: %m\n");
54 static int cancel_sigchild_handler(void)
59 sa.sa_handler = SIG_DFL;
60 sa.sa_flags = SA_NOCLDSTOP;
62 ret = sigaction(SIGCHLD, &sa, NULL);
64 pr_err("Failed to cancel SIGCHLD handler: %m\n");
70 * Initialize the jobcontrol.
72 * Create the pipes that are used to grant children execution
73 * permissions. If max_jobs is zero, count the number of CPUs from
74 * /proc/cpuinfo and use that.
76 int init_jobcontrol(int max_jobs_requested)
78 struct epoll_event ev;
84 if (pipe2(job_request_fd, O_NONBLOCK | O_CLOEXEC)) {
85 pr_err("Failed to create pipe: %m\n");
89 if (pipe2(job_get_permission_fd, O_CLOEXEC)) {
90 pr_err("Failed to create pipe: %m\n");
94 ret = setup_sigchild_handler();
98 epoll_fd = epoll_create(1);
100 pr_err("Failed to epoll_create(): %m\n");
105 ev.data.fd = job_request_fd[0];
106 ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, job_request_fd[0], &ev);
108 pr_err("Failed to add epoll_fd: %m\n");
112 if (max_jobs_requested > 0) {
113 max_jobs = max_jobs_requested;
118 file = fopen("/proc/cpuinfo", "ro");
120 pr_err("Failed to open /proc/cpuinfo: %m\n");
125 * The CPU count algorithm simply reads the first 8 bytes from
126 * the /proc/cpuinfo and then expects that line to be there as
127 * many times as there are CPUs.
129 ret = fread(match, 1, sizeof(match), file);
130 if (ret < sizeof(match)) {
131 pr_err("read %d bytes when expecting %zd %m\n",
136 while(fgets(buf, sizeof(buf), file)) {
137 if (!strncmp(buf, match, sizeof(match)))
146 pr_info("Set maximum number of parallel jobs to %d\n", max_jobs);
151 static int grant_new_job(void)
157 pr_info("Granting new job. %d jobs currently and %d pending\n",
158 job_count, jobs_pending);
160 ret = write(job_get_permission_fd[1], &byte, 1);
162 pr_err("Failed to write 1 byte: %m\n");
169 static int handle_job_request(void)
173 ret = read(job_request_fd[0], &pid, sizeof(pid));
175 pr_err("Failed to read: %m\n");
180 pr_info("Read zero bytes\n");
185 if (job_count >= max_jobs) {
188 ret = grant_new_job();
191 } else if (pid < 0) {
192 if (job_count > max_jobs)
193 pr_err("BUG: Job %u jobs exceeded limit %u\n",
194 job_count, max_jobs);
196 pr_info("Job %d finished\n", -pid);
200 ret = grant_new_job();
208 int poll_job_requests(int timeout)
210 struct epoll_event event;
213 /* Convert positive seconds to milliseconds */
214 timeout = timeout > 0 ? 1000 * timeout : timeout;
216 ret = epoll_wait(epoll_fd, &event, 1, timeout);
219 if (errno != EINTR) {
220 pr_err("epoll_wait: %m\n");
225 * If epoll_wait() was interrupted, better start
226 * everything again from the beginning
232 pr_info("Timed out\n");
236 if (event.data.fd == job_request_fd[0])
237 handle_job_request();
239 pr_err("Unknown fd: %d\n", event.data.fd);
242 pr_info("Jobs active: %u, pending: %u\n", job_count, jobs_pending);
247 * Per process flag indicating whether this child has requested fork
248 * limiting. If it has, it must also tell the master parent when it
249 * has died so that the parent can give next pending job permission to
static int is_limited_fork = 0;	/* only children created by do_fork_limited() may set this */
259 pr_err("fork() failed: %m\n");
265 pr_info("Fork %d, child %d\n", child_count, child);
270 * Child processes may want to use waitpid() for synchronizing
271 * with their sub-childs. Disable the signal handler by
274 cancel_sigchild_handler();
277 * Also do not notify the master parent the death of this
278 * child. Only childs that have been created with
279 * do_fork_limited() can have this flag set.
283 /* reset child's child count */
289 static int request_fork(char request)
293 pid = request > 0 ? pid : -pid;
295 return write(job_request_fd[1], &pid, sizeof(pid));
298 static void limited_fork_exit_handler(void)
305 * Like do_fork(), but allow the child continue only after the global
306 * job count is low enough.
308 * We allow the parent to continue other more important activities but
309 * child respects the limit of global active processes.
311 int do_fork_limited(void)
320 /* Remember to notify the parent when we are done */
321 atexit(limited_fork_exit_handler);
324 pr_info("Requesting permission to go\n");
326 /* Signal the parent that we are here, waiting to go */
330 * The parent will tell us when we can continue. If there were
331 * multiple children waiting for their turn to run parent's
332 * write to the pipe will wake up every process reading
333 * it. However, only one will be reading the content and
334 * getting the permission to run. So we will read as many
335 * times as needed until we get our own permission to run.
338 ret = read(job_get_permission_fd[0], &byte, sizeof(byte));
344 pr_info("Continuing\n");
348 int harvest_zombies(int pid)
351 struct rusage rusage;
352 char *status_str = NULL;
355 if (child_count == 0)
359 pr_info("Waiting on pid %d, children left: %d\n", pid,
363 pid = wait4(pid, &status, 0, &rusage);
365 pr_err("Error on waitid(): %m\n");
368 /* Wait until the child has become a zombie */
369 } while (!WIFEXITED(status) && !WIFSIGNALED(status));
372 if (WIFEXITED(status)) {
373 status_str = "exited with status";
374 code = WEXITSTATUS(status);
375 } else if (WIFSIGNALED(status)) {
376 status_str = "killed by signal";
377 code = WTERMSIG(status);
379 pr_info("pid %d: %s %d. Children left: %d\n", pid,
380 status_str, code, child_count);
381 pr_info("pid %d: User time: %ld.%03lds, System %ld.%03lds\n", pid,
382 (long)rusage.ru_utime.tv_sec, rusage.ru_utime.tv_usec / 1000,
383 (long)rusage.ru_stime.tv_sec, rusage.ru_stime.tv_usec / 1000);
389 * Runs a command cmd with params argv, connects stdin and stdout to
392 * Returns the pid of the executed process
394 int run_piped(const char *cmd, char *const argv[],
395 int *stdinfd, int *stdoutfd, int *stderrfd)
397 int ifd[2], ofd[2], efd[2], pid;
399 pr_info("Running command %s\n", cmd);
401 if (stdinfd && pipe(ifd)) {
402 pr_err("pipe() failed: %m\n");
406 if (stdoutfd && pipe(ofd)) {
407 pr_err("pipe() failed: %m\n");
411 if (stderrfd && pipe(efd)) {
412 pr_err("pipe() failed: %m\n");
417 if (pid) { /* Parent side */
438 dup2(ifd[0], STDIN_FILENO);
443 dup2(ofd[1], STDOUT_FILENO);
448 dup2(efd[1], STDERR_FILENO);
451 /* Now we have redirected standard streams to parent process */
453 pr_err("Failed to execv command %s: %m\n", cmd);
460 * Runs a command cmd with params argv, connects stdin and stdout to
463 * Returns the pid of the executed process
465 int run_piped_stream(const char *cmd, char *const argv[],
466 FILE **stdinf, FILE **stdoutf, FILE **stderrf)
468 int ifd, ofd, efd, pid;
484 pid = run_piped(cmd, argv, i, o, e);
487 *stdinf = fdopen(ifd, "r");
488 if (*stdinf == NULL) {
489 pr_err("Error opening file stream for fd %d: %m\n",
496 *stdoutf = fdopen(ofd, "r");
497 if (*stdoutf == NULL) {
498 pr_err("Error opening file stream for fd %d: %m\n",
505 *stderrf = fdopen(efd, "r");
506 if (*stderrf == NULL) {
507 pr_err("Error opening file stream for fd %d: %m\n",
517 * Forks a child and executes a command to run on parallel
/*
 * Larger of two values. The whole expansion is parenthesized so it binds
 * as a single expression when used inside a larger one (e.g. "1 + max(x, y)").
 * Note: the selected argument is evaluated twice, so do not pass
 * expressions with side effects.
 */
#define max(a,b) ((a) < (b) ? (b) : (a))
/* Size of the buffer used by run() when reading a child's stdout/stderr. */
#define BUF_SIZE (128*1024)
522 int run(const char *cmd, char *const argv[])
529 char stdoutstr[32], stderrstr[32], indent[16] = { " " };
531 indent[get_parent_count() + 1] = 0;
533 if ((child = do_fork()))
536 child = run_piped(cmd, argv, NULL, &ofd, &efd);
537 snprintf(stdoutstr, 32, "%sstdout", green_color);
538 snprintf(stderrstr, 32, "%sstderr", red_color);
550 maxfd = max(ofd, efd);
551 error = select(maxfd, &rfds, NULL, NULL, NULL);
554 pr_err("Error with select: %m\n");
558 if (FD_ISSET(ofd, &rfds)) {
560 bytes = read(ofd, rbuf, BUF_SIZE);
564 if (FD_ISSET(efd, &rfds)) {
566 bytes = read(efd, rbuf, BUF_SIZE);
570 pr_err("select() returned unknown fd\n");
575 pr_err("read() failed: %m\n");
580 * Workaround: When a process had die and it has only
581 * written to stderr, select() doesn't indicate that
582 * there might be something to read in stderr fd. To
583 * work around this issue, we try to read stderr just
584 * in case in order to ensure everything gets read.
587 bytes = read(efd, rbuf, BUF_SIZE);
596 fprintf(stderr, "%s[%5d %s] %s: %s%s\n", indent,
597 child, cmd, typestr, sptr, normal_color);
607 harvest_zombies(child);