4 #include <sys/select.h>
11 static int child_count;
12 static int parent_count;
13 static int job_request_fd[2];
14 static int job_get_permission_fd[2];
15 static unsigned int max_jobs;
16 static unsigned int job_count;
17 static unsigned int jobs_pending;
19 int get_child_count(void)
24 int get_parent_count(void)
29 static void sigchild_handler(int signal)
33 waitpid(0, &status, 0);
36 static int setup_sigchild_handler(void)
41 sa.sa_handler = sigchild_handler;
42 sa.sa_flags = SA_NOCLDSTOP;
44 ret = sigaction(SIGCHLD, &sa, NULL);
46 pr_err("Failed to setup SIGCHLD handler: %m\n");
51 static int cancel_sigchild_handler(void)
56 sa.sa_handler = SIG_DFL;
57 sa.sa_flags = SA_NOCLDSTOP;
59 ret = sigaction(SIGCHLD, &sa, NULL);
61 pr_err("Failed to cancel SIGCHLD handler: %m\n");
67 * Initialize the jobcontrol.
69 * Create the pipes that are used to grant children execution
70 * permissions. If max_jobs is zero, count the number of CPUs from
71 * /proc/cpuinfo and use that.
73 int init_jobcontrol(int max_jobs_requested)
75 struct epoll_event ev;
/* Request pipe: created non-blocking and close-on-exec. */
81 if (pipe2(job_request_fd, O_NONBLOCK | O_CLOEXEC)) {
82 pr_err("Failed to create pipe: %m\n");
/* Permission pipe: close-on-exec only, so operations on it may block. */
86 if (pipe2(job_get_permission_fd, O_CLOEXEC)) {
87 pr_err("Failed to create pipe: %m\n");
91 ret = setup_sigchild_handler();
/* A positive request is used as the job limit as-is. */
95 if (max_jobs_requested > 0) {
96 max_jobs = max_jobs_requested;
/*
 * NOTE(review): "ro" is not one of the mode strings defined for fopen();
 * the C standard leaves any other mode string undefined. Some libc
 * implementations ignore the trailing character, but plain "r" is the
 * portable spelling - TODO confirm and fix.
 */
101 file = fopen("/proc/cpuinfo", "ro");
103 pr_err("Failed to open /proc/cpuinfo: %m\n");
108 * The CPU count algorithm simply reads the first 8 bytes from
109 * the /proc/cpuinfo and then expects that line to be there as
110 * many times as there are CPUs.
112 ret = fread(match, 1, sizeof(match), file);
113 if (ret < sizeof(match)) {
/*
 * NOTE(review): %zd is the conversion for ssize_t. The arguments are
 * not visible in this excerpt; if the second one is sizeof(match),
 * which has type size_t, the matching conversion is %zu - TODO confirm.
 */
114 pr_err("read %d bytes when expecting %zd %m\n",
/* Scan the remaining lines, comparing the start of each against match. */
119 while(fgets(buf, sizeof(buf), file)) {
120 if (!strncmp(buf, match, sizeof(match)))
/*
 * NOTE(review): max_jobs is declared unsigned int in this file, so %u
 * would match its type; %d does not.
 */
129 pr_info("Set maximum number of parallel jobs to %d\n", max_jobs);
134 static int grant_new_job(void)
140 pr_info("Granting new job. %d jobs currently and %d pending\n",
141 job_count, jobs_pending);
143 ret = write(job_get_permission_fd[1], &byte, 1);
145 pr_err("Failed to write 1 byte: %m\n");
152 int poll_job_requests(int timeout)
154 struct epoll_event event;
160 struct epoll_event ev;
162 epollfd = epoll_create(1);
164 pr_err("Failed to epoll_create(): %m\n");
169 ev.data.fd = job_request_fd[0];
170 epoll_ctl(epollfd, EPOLL_CTL_ADD, job_request_fd[0], &ev);
173 /* Convert positive seconds to milliseconds */
174 timeout = timeout > 0 ? 1000 * timeout : timeout;
176 ret = epoll_wait(epollfd, &event, 1, timeout);
179 if (errno != EINTR) {
180 pr_err("epoll_wait: %m\n");
185 * If epoll_wait() was interrupted, better start
186 * everything again from the beginning
192 pr_info("Timed out\n");
196 ret = read(event.data.fd, &pid, sizeof(pid));
198 pr_err("Failed to read: %m\n");
203 pr_info("Read zero bytes\n");
208 if (job_count >= max_jobs) {
211 ret = grant_new_job();
214 } else if (pid < 0) {
215 if (job_count > max_jobs)
216 pr_err("BUG: Job %u jobs exceeded limit %u\n",
217 job_count, max_jobs);
219 pr_info("Job %d finished\n", -pid);
223 ret = grant_new_job();
229 pr_info("Jobs active: %u, pending: %u\n", job_count, jobs_pending);
234 * Per process flag indicating whether this child has requested fork
235 * limiting. If it has, it must also tell the master parent when it
236 * has died so that the parent can give the next pending job permission to
239 static int is_limited_fork;
246 pr_err("fork() failed: %m\n");
252 pr_info("Fork %d, child %d\n", child_count, child);
257 * Child processes may want to use waitpid() for synchronizing
258 * with their sub-children. Disable the signal handler by
261 cancel_sigchild_handler();
264 * Also do not notify the master parent of the death of this
265 * child. Only children that have been created with
266 * do_fork_limited() can have this flag set.
270 /* reset child's child count */
276 static int request_fork(char request)
280 pid = request > 0 ? pid : -pid;
282 return write(job_request_fd[1], &pid, sizeof(pid));
285 static void limited_fork_exit_handler(void)
292 * Like do_fork(), but allow the child to continue only after the global
293 * job count is low enough.
295 * We allow the parent to continue with other, more important activities, but
296 * the child respects the global limit on active processes.
298 int do_fork_limited(void)
307 /* Remember to notify the parent when we are done */
308 atexit(limited_fork_exit_handler);
311 pr_info("Requesting permission to go\n");
313 /* Signal the parent that we are here, waiting to go */
317 * The parent will tell us when we can continue. If there are
318 * multiple children waiting for their turn to run, the parent's
319 * write to the pipe will wake up every process reading
320 * it. However, only one will be reading the content and
321 * getting the permission to run. So we will read as many
322 * times as needed until we get our own permission to run.
325 ret = read(job_get_permission_fd[0], &byte, sizeof(byte));
331 pr_info("Continuing\n");
335 int harvest_zombies(int pid)
339 if (child_count == 0)
343 pr_info("Waiting on pid %d, children left: %d\n", pid,
346 pid = waitpid(pid, &status, 0);
348 pr_err("Error on wait(): %m\n");
349 child_count--; /* Decrement child count anyway */
353 pr_info("pid %d: exit code %d. Children left: %d\n", pid,
354 status, child_count);
361 * Runs a command cmd with params argv, connects stdin and stdout to
364 * Returns the pid of the executed process
366 int run_piped(const char *cmd, char *const argv[],
367 int *stdinfd, int *stdoutfd, int *stderrfd)
369 int ifd[2], ofd[2], efd[2], pid;
371 pr_info("Running command %s\n", cmd);
373 if (stdinfd && pipe(ifd)) {
374 pr_err("pipe() failed: %m\n");
378 if (stdoutfd && pipe(ofd)) {
379 pr_err("pipe() failed: %m\n");
383 if (stderrfd && pipe(efd)) {
384 pr_err("pipe() failed: %m\n");
389 if (pid) { /* Parent side */
410 dup2(ifd[0], STDIN_FILENO);
415 dup2(ofd[1], STDOUT_FILENO);
420 dup2(efd[1], STDERR_FILENO);
423 /* Now we have redirected standard streams to parent process */
425 pr_err("Failed to execv command %s: %m\n", cmd);
432 * Runs a command cmd with params argv, connects stdin and stdout to
435 * Returns the pid of the executed process
437 int run_piped_stream(const char *cmd, char *const argv[],
438 FILE **stdinf, FILE **stdoutf, FILE **stderrf)
440 int ifd, ofd, efd, pid;
/*
 * i, o and e are handed to run_piped() as its stdinfd, stdoutfd and
 * stderrfd arguments. Their setup is not visible in this excerpt;
 * presumably each points at ifd, ofd or efd, or is NULL when the caller
 * did not ask for that stream - TODO confirm.
 */
456 pid = run_piped(cmd, argv, i, o, e);
/*
 * NOTE(review): this stream is the parent side of the stdin pipe of the
 * child. run_piped() dup2()s ifd[0], the read end, onto STDIN_FILENO in
 * the child, so the descriptor returned to the parent is presumably the
 * write end. If so, the fdopen() mode here should be "w" rather than
 * "r"; fdopen() requires a mode compatible with how the descriptor was
 * opened - TODO confirm against run_piped().
 */
459 *stdinf = fdopen(ifd, "r");
460 if (*stdinf == NULL) {
461 pr_err("Error opening file stream for fd %d: %m\n",
/* The stdout and stderr streams of the child are wrapped for reading. */
468 *stdoutf = fdopen(ofd, "r");
469 if (*stdoutf == NULL) {
470 pr_err("Error opening file stream for fd %d: %m\n",
477 *stderrf = fdopen(efd, "r");
478 if (*stderrf == NULL) {
479 pr_err("Error opening file stream for fd %d: %m\n",
489 * Forks a child and executes a command to run in parallel
/*
 * Larger of a and b. The whole expansion is parenthesized so the macro can
 * be embedded in a bigger expression (e.g. 2 * max(x, y)) without operator
 * precedence splitting the conditional apart. Each argument may be
 * evaluated twice, so do not pass expressions with side effects.
 */
#define max(a,b) ((a) < (b) ? (b) : (a))
493 #define BUF_SIZE (128*1024)
494 int run(const char *cmd, char *const argv[])
501 char stdoutstr[32], stderrstr[32], indent[16] = { " " };
/*
 * NOTE(review): indent holds 16 bytes and the index below is not
 * bounds-checked in the lines visible here; confirm that
 * get_parent_count() + 1 can never exceed 15.
 */
503 indent[get_parent_count() + 1] = 0;
505 if ((child = do_fork()))
/*
 * No stdin pipe is requested (NULL). ofd and efd are presumably filled
 * in by run_piped() with the descriptors for reading the stdout and
 * stderr of the command - TODO confirm.
 */
508 child = run_piped(cmd, argv, NULL, &ofd, &efd);
/* Output labels: a colour prefix followed by the stream name, capped at 32 bytes. */
509 snprintf(stdoutstr, 32, "%sstdout", green_color);
510 snprintf(stderrstr, 32, "%sstderr", red_color);
522 maxfd = max(ofd, efd);
/*
 * NOTE(review): the first argument of select() must be the highest
 * descriptor in any of the sets PLUS ONE. Passing maxfd itself means
 * the larger of ofd and efd is never examined. The stderr workaround
 * described further down in this function may be a symptom of this -
 * TODO confirm and consider passing maxfd + 1.
 */
523 error = select(maxfd, &rfds, NULL, NULL, NULL);
526 pr_err("Error with select: %m\n");
/* Read from whichever descriptor select() reported as readable. */
530 if (FD_ISSET(ofd, &rfds)) {
532 bytes = read(ofd, rbuf, BUF_SIZE);
536 if (FD_ISSET(efd, &rfds)) {
538 bytes = read(efd, rbuf, BUF_SIZE);
542 pr_err("select() returned unknown fd\n");
547 pr_err("read() failed: %m\n");
552 * Workaround: When a process has died and it has only
553 * written to stderr, select() doesn't indicate that
554 * there might be something to read in stderr fd. To
555 * work around this issue, we try to read stderr just
556 * in case in order to ensure everything gets read.
559 bytes = read(efd, rbuf, BUF_SIZE);
568 fprintf(stderr, "%s[%5d %s] %s: %s%s\n", indent,
569 child, cmd, typestr, sptr, normal_color);
579 harvest_zombies(child);