4 #include <sys/select.h>
11 static int child_count;
12 static int parent_count;
13 static int job_request_fd[2];
14 static int job_get_permission_fd[2];
16 static unsigned int max_jobs;
17 static unsigned int job_count;
18 static unsigned int jobs_pending;
20 int get_child_count(void)
25 int get_parent_count(void)
30 static void sigchild_handler(int signal)
34 waitpid(0, &status, 0);
37 static int setup_sigchild_handler(void)
42 sa.sa_handler = sigchild_handler;
43 sa.sa_flags = SA_NOCLDSTOP;
45 ret = sigaction(SIGCHLD, &sa, NULL);
47 pr_err("Failed to setup SIGCHLD handler: %m\n");
52 static int cancel_sigchild_handler(void)
57 sa.sa_handler = SIG_DFL;
58 sa.sa_flags = SA_NOCLDSTOP;
60 ret = sigaction(SIGCHLD, &sa, NULL);
62 pr_err("Failed to cancel SIGCHLD handler: %m\n");
68 * Initialize the jobcontrol.
70 * Create the pipes that are used to grant children execution
71 * permissions. If max_jobs is zero, count the number of CPUs from
72 * /proc/cpuinfo and use that.
74 int init_jobcontrol(int max_jobs_requested)
76 struct epoll_event ev;
82 if (pipe2(job_request_fd, O_NONBLOCK | O_CLOEXEC)) {
83 pr_err("Failed to create pipe: %m\n");
87 if (pipe2(job_get_permission_fd, O_CLOEXEC)) {
88 pr_err("Failed to create pipe: %m\n");
92 ret = setup_sigchild_handler();
96 epoll_fd = epoll_create(1);
98 pr_err("Failed to epoll_create(): %m\n");
103 ev.data.fd = job_request_fd[0];
104 ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, job_request_fd[0], &ev);
106 pr_err("Failed to add epoll_fd: %m\n");
110 if (max_jobs_requested > 0) {
111 max_jobs = max_jobs_requested;
116 file = fopen("/proc/cpuinfo", "ro");
118 pr_err("Failed to open /proc/cpuinfo: %m\n");
123 * The CPU count algorithm simply reads the first 8 bytes from
124 * the /proc/cpuinfo and then expects that line to be there as
125 * many times as there are CPUs.
127 ret = fread(match, 1, sizeof(match), file);
128 if (ret < sizeof(match)) {
129 pr_err("read %d bytes when expecting %zd %m\n",
134 while(fgets(buf, sizeof(buf), file)) {
135 if (!strncmp(buf, match, sizeof(match)))
144 pr_info("Set maximum number of parallel jobs to %d\n", max_jobs);
149 static int grant_new_job(void)
155 pr_info("Granting new job. %d jobs currently and %d pending\n",
156 job_count, jobs_pending);
158 ret = write(job_get_permission_fd[1], &byte, 1);
160 pr_err("Failed to write 1 byte: %m\n");
167 static int handle_job_request(void)
171 ret = read(job_request_fd[0], &pid, sizeof(pid));
173 pr_err("Failed to read: %m\n");
178 pr_info("Read zero bytes\n");
183 if (job_count >= max_jobs) {
186 ret = grant_new_job();
189 } else if (pid < 0) {
190 if (job_count > max_jobs)
191 pr_err("BUG: Job %u jobs exceeded limit %u\n",
192 job_count, max_jobs);
194 pr_info("Job %d finished\n", -pid);
198 ret = grant_new_job();
206 int poll_job_requests(int timeout)
208 struct epoll_event event;
211 /* Convert positive seconds to milliseconds */
212 timeout = timeout > 0 ? 1000 * timeout : timeout;
214 ret = epoll_wait(epoll_fd, &event, 1, timeout);
217 if (errno != EINTR) {
218 pr_err("epoll_wait: %m\n");
223 * If epoll_wait() was interrupted, better start
224 * everything again from the beginning
230 pr_info("Timed out\n");
234 if (event.data.fd == job_request_fd[0])
235 handle_job_request();
237 pr_err("Unknown fd: %d\n", event.data.fd);
240 pr_info("Jobs active: %u, pending: %u\n", job_count, jobs_pending);
245 * Per process flag indicating whether this child has requested fork
246 * limiting. If it has, it must also tell the master parent when it
247 * has died so that the parent can give next pending job permission to
250 static int is_limited_fork;
257 pr_err("fork() failed: %m\n");
263 pr_info("Fork %d, child %d\n", child_count, child);
268 * Child processes may want to use waitpid() for synchronizing
269 * with their sub-childs. Disable the signal handler by
272 cancel_sigchild_handler();
275 * Also do not notify the master parent the death of this
276 * child. Only childs that have been created with
277 * do_fork_limited() can have this flag set.
281 /* reset child's child count */
287 static int request_fork(char request)
291 pid = request > 0 ? pid : -pid;
293 return write(job_request_fd[1], &pid, sizeof(pid));
296 static void limited_fork_exit_handler(void)
303 * Like do_fork(), but allow the child continue only after the global
304 * job count is low enough.
306 * We allow the parent to continue other more important activities but
307 * child respects the limit of global active processes.
309 int do_fork_limited(void)
318 /* Remember to notify the parent when we are done */
319 atexit(limited_fork_exit_handler);
322 pr_info("Requesting permission to go\n");
324 /* Signal the parent that we are here, waiting to go */
328 * The parent will tell us when we can continue. If there were
329 * multiple children waiting for their turn to run parent's
330 * write to the pipe will wake up every process reading
331 * it. However, only one will be reading the content and
332 * getting the permission to run. So we will read as many
333 * times as needed until we get our own permission to run.
336 ret = read(job_get_permission_fd[0], &byte, sizeof(byte));
342 pr_info("Continuing\n");
346 int harvest_zombies(int pid)
350 if (child_count == 0)
354 pr_info("Waiting on pid %d, children left: %d\n", pid,
357 pid = waitpid(pid, &status, 0);
359 pr_err("Error on wait(): %m\n");
360 child_count--; /* Decrement child count anyway */
364 pr_info("pid %d: exit code %d. Children left: %d\n", pid,
365 status, child_count);
372 * Runs a command cmd with params argv, connects stdin and stdout to
375 * Returns the pid of the executed process
377 int run_piped(const char *cmd, char *const argv[],
378 int *stdinfd, int *stdoutfd, int *stderrfd)
380 int ifd[2], ofd[2], efd[2], pid;
382 pr_info("Running command %s\n", cmd);
384 if (stdinfd && pipe(ifd)) {
385 pr_err("pipe() failed: %m\n");
389 if (stdoutfd && pipe(ofd)) {
390 pr_err("pipe() failed: %m\n");
394 if (stderrfd && pipe(efd)) {
395 pr_err("pipe() failed: %m\n");
400 if (pid) { /* Parent side */
421 dup2(ifd[0], STDIN_FILENO);
426 dup2(ofd[1], STDOUT_FILENO);
431 dup2(efd[1], STDERR_FILENO);
434 /* Now we have redirected standard streams to parent process */
436 pr_err("Failed to execv command %s: %m\n", cmd);
443 * Runs a command cmd with params argv, connects stdin and stdout to
446 * Returns the pid of the executed process
448 int run_piped_stream(const char *cmd, char *const argv[],
449 FILE **stdinf, FILE **stdoutf, FILE **stderrf)
451 int ifd, ofd, efd, pid;
467 pid = run_piped(cmd, argv, i, o, e);
470 *stdinf = fdopen(ifd, "r");
471 if (*stdinf == NULL) {
472 pr_err("Error opening file stream for fd %d: %m\n",
479 *stdoutf = fdopen(ofd, "r");
480 if (*stdoutf == NULL) {
481 pr_err("Error opening file stream for fd %d: %m\n",
488 *stderrf = fdopen(efd, "r");
489 if (*stderrf == NULL) {
490 pr_err("Error opening file stream for fd %d: %m\n",
500 * Forks a child and executes a command to run on parallel
503 #define max(a,b) (a) < (b) ? (b) : (a)
504 #define BUF_SIZE (128*1024)
505 int run(const char *cmd, char *const argv[])
512 char stdoutstr[32], stderrstr[32], indent[16] = { " " };
514 indent[get_parent_count() + 1] = 0;
516 if ((child = do_fork()))
519 child = run_piped(cmd, argv, NULL, &ofd, &efd);
520 snprintf(stdoutstr, 32, "%sstdout", green_color);
521 snprintf(stderrstr, 32, "%sstderr", red_color);
533 maxfd = max(ofd, efd);
534 error = select(maxfd, &rfds, NULL, NULL, NULL);
537 pr_err("Error with select: %m\n");
541 if (FD_ISSET(ofd, &rfds)) {
543 bytes = read(ofd, rbuf, BUF_SIZE);
547 if (FD_ISSET(efd, &rfds)) {
549 bytes = read(efd, rbuf, BUF_SIZE);
553 pr_err("select() returned unknown fd\n");
558 pr_err("read() failed: %m\n");
563 * Workaround: When a process had die and it has only
564 * written to stderr, select() doesn't indicate that
565 * there might be something to read in stderr fd. To
566 * work around this issue, we try to read stderr just
567 * in case in order to ensure everything gets read.
570 bytes = read(efd, rbuf, BUF_SIZE);
579 fprintf(stderr, "%s[%5d %s] %s: %s%s\n", indent,
580 child, cmd, typestr, sptr, normal_color);
590 harvest_zombies(child);