4 #include <sys/select.h>
8 #include <sys/signalfd.h>
9 #include <sys/resource.h>
14 static int child_count;
15 static int parent_count;
16 static int job_request_fd[2];
17 static int job_get_permission_fd[2];
19 static unsigned int max_jobs;
20 static unsigned int job_count;
21 static unsigned int jobs_pending;
22 static unsigned int max_jobs_pending;
24 int get_child_count(void)
29 int get_parent_count(void)
34 static int handle_signals(struct event_handler *h)
36 struct signalfd_siginfo siginfo;
39 ret = read(h->fd, &siginfo, sizeof(siginfo));
40 if (ret < sizeof(siginfo)) {
41 pr_err("Expected %zd from read, got %d: %m\n",
42 sizeof(siginfo), ret);
46 if (siginfo.ssi_signo != SIGCHLD) {
47 pr_err("Unexpected signal %d, ignoring\n", siginfo.ssi_signo);
51 harvest_zombies(siginfo.ssi_pid);
56 static int grant_new_job(void)
62 pr_info("Granting new job. %d jobs currently and %d pending\n",
63 job_count, jobs_pending);
65 ret = write(job_get_permission_fd[1], &byte, 1);
67 pr_err("Failed to write 1 byte: %m\n");
74 static int deny_job(void)
79 pr_info("Denying new job. %d jobs currently and %d pending, "
80 "limit of pending jobs is %d\n",
81 job_count, jobs_pending, max_jobs_pending);
83 ret = write(job_get_permission_fd[1], &byte, 1);
85 pr_err("Failed to write 1 byte: %m\n");
92 static int handle_job_request(struct event_handler *h)
96 ret = read(job_request_fd[0], &pid, sizeof(pid));
98 pr_err("Failed to read: %m\n");
103 pr_info("Read zero bytes\n");
108 if (job_count >= max_jobs) {
109 if (jobs_pending < max_jobs_pending)
114 ret = grant_new_job();
117 } else if (pid < 0) {
118 if (job_count > max_jobs)
119 pr_err("BUG: Job %u jobs exceeded limit %u\n",
120 job_count, max_jobs);
122 pr_info("Job %d finished\n", -pid);
126 ret = grant_new_job();
134 struct event_handler signal_handler = {
135 .handle_event = handle_signals,
140 struct event_handler job_request_handler = {
141 .handle_event = handle_job_request,
143 .name = "job_request",
147 * Initialize the jobcontrol.
149 * Create the pipes that are used to grant children execution
150 * permissions. If max_jobs_requested is not positive, count the
151 * number of CPUs from /proc/cpuinfo and use that.
153 int init_jobcontrol(int max_jobs_requested)
161 if (pipe2(job_request_fd, O_NONBLOCK | O_CLOEXEC)) {
162 pr_err("Failed to create pipe: %m\n");
166 if (pipe2(job_get_permission_fd, O_CLOEXEC)) {
167 pr_err("Failed to create pipe: %m\n");
171 epoll_fd = epoll_create(1);
172 if (epoll_fd == -1) {
173 pr_err("Failed to epoll_create(): %m\n");
177 job_request_handler.fd = job_request_fd[0];
178 register_event_handler(&job_request_handler);
180 sigemptyset(&sigmask);
181 sigaddset(&sigmask, SIGCHLD);
183 /* Block SIGCHLD so that it becomes readable via signalfd */
184 ret = sigprocmask(SIG_BLOCK, &sigmask, NULL);
186 pr_err("Failed to sigprocmask: %m\n");
189 signal_handler.fd = signalfd(-1, &sigmask, SFD_CLOEXEC);
190 if (job_request_handler.fd < 0) {
191 pr_err("Failed to create signal_fd: %m\n");
195 register_event_handler(&signal_handler);
197 if (max_jobs_requested > 0) {
198 max_jobs = max_jobs_requested;
203 file = fopen("/proc/cpuinfo", "ro");
205 pr_err("Failed to open /proc/cpuinfo: %m\n");
210 * The CPU count algorithm simply reads the first 8 bytes of
211 * /proc/cpuinfo and then expects a line starting with those
212 * bytes to appear once for each CPU.
214 ret = fread(match, 1, sizeof(match), file);
215 if (ret < sizeof(match)) {
216 pr_err("read %d bytes when expecting %zd %m\n",
221 while(fgets(buf, sizeof(buf), file)) {
222 if (!strncmp(buf, match, sizeof(match)))
231 pr_info("Set maximum number of parallel jobs to %d\n", max_jobs);
233 max_jobs_pending = max_jobs * 50 + 25;
234 pr_info("Set maximum number of pending jobs to %d\n", max_jobs_pending);
239 int poll_job_requests(int timeout)
241 struct epoll_event event;
242 struct event_handler *job_handler;
245 /* Convert positive seconds to milliseconds */
246 timeout = timeout > 0 ? 1000 * timeout : timeout;
248 ret = epoll_wait(epoll_fd, &event, 1, timeout);
251 if (errno != EINTR) {
252 pr_err("epoll_wait: %m\n");
257 * If epoll_wait() was interrupted, better start
258 * everything again from the beginning
264 pr_info("Timed out\n");
268 job_handler = event.data.ptr;
270 if (!job_handler || !job_handler->handle_event) {
271 pr_err("Corrupted event handler for fd %d\n",
276 pr_info("Running handler %s to handle events from fd %d\n",
277 job_handler->name, job_handler->fd);
278 job_handler->handle_event(job_handler);
281 pr_info("Jobs active: %u, pending: %u\n", job_count, jobs_pending);
286 * Per-process flag indicating whether this child has requested fork
287 * limiting. If it has, it must also tell the master parent when it
288 * has died so that the parent can give the next pending job permission to
291 static int is_limited_fork;
298 pr_err("fork() failed: %m\n");
304 pr_info("Fork %d, child %d\n", child_count, child);
309 * Also do not notify the master parent of the death of this
310 * child. Only children that have been created with
311 * do_fork_limited() can have this flag set.
316 * Close unused ends of the job control pipes. Only the parent
317 * that controls the jobs may keep open the write end of
318 * job_get_permission_fd and the read end of
319 * job_request_fd. Failing to close the pipe ends properly
320 * will cause the children to wait forever for the run
321 * permission if the parent dies prematurely.
323 * Note! The file descriptors must be closed once and only
324 * once. They are set to -1 after closing so that subsequent
325 * do_fork() calls cannot close them again (by which time
326 * the same descriptor number might already have been reused
327 * for something else) and accidentally close some
328 * innocent file descriptors that are still in use.
330 if (job_get_permission_fd[1] >= 0) {
331 close(job_get_permission_fd[1]);
332 job_get_permission_fd[1] = -1;
334 if (job_request_fd[0] >= 0) {
335 close(job_request_fd[0]);
336 job_request_fd[0] = -1;
339 /* reset child's child count */
345 static int request_fork(int request)
349 pid = request > 0 ? pid : -pid;
351 return write(job_request_fd[1], &pid, sizeof(pid));
354 static void limited_fork_exit_handler(void)
361 * Like do_fork(), but allow the child to continue only after the global
362 * job count is low enough.
364 * We allow the parent to continue with other, more important activities,
365 * while the child respects the limit on globally active processes.
367 int do_fork_limited(void)
376 /* Remember to notify the parent when we are done */
377 atexit(limited_fork_exit_handler);
380 pr_info("Requesting permission to go\n");
382 /* Signal the parent that we are here, waiting to go */
386 * The parent will tell us when we can continue. If multiple
387 * children are waiting for their turn to run, only one of them
388 * will read the byte from the pipe and get permission to
389 * run.
391 ret = read(job_get_permission_fd[0], &byte, sizeof(byte));
393 pr_err("Error requesting run, did the parent die?\n");
396 pr_err("Job control request failure: %m\n");
399 pr_info("Did not get permission to execute. Terminating\n");
402 * Avoid running the exit handler, which would tell the
403 * parent we died normally and decrement the job
409 pr_info("Continuing\n");
413 int harvest_zombies(int pid)
416 struct rusage rusage;
417 char *status_str = NULL;
420 if (child_count == 0)
424 pr_info("Waiting on pid %d, children left: %d\n", pid,
428 pid = wait4(pid, &status, 0, &rusage);
430 pr_err("Error on waitid(): %m\n");
433 /* Wait until the child has become a zombie */
434 } while (!WIFEXITED(status) && !WIFSIGNALED(status));
437 if (WIFEXITED(status)) {
438 status_str = "exited with status";
439 code = WEXITSTATUS(status);
440 } else if (WIFSIGNALED(status)) {
441 status_str = "killed by signal";
442 code = WTERMSIG(status);
444 pr_info("pid %d: %s %d. Children left: %d\n", pid,
445 status_str, code, child_count);
446 pr_info("pid %d: User time: %ld.%03lds, System %ld.%03lds\n", pid,
447 (long)rusage.ru_utime.tv_sec, rusage.ru_utime.tv_usec / 1000,
448 (long)rusage.ru_stime.tv_sec, rusage.ru_stime.tv_usec / 1000);
454 * Runs a command cmd with params argv, connects stdin and stdout to
457 * Returns the pid of the executed process
459 int run_piped(const char *cmd, char *const argv[],
460 int *stdinfd, int *stdoutfd, int *stderrfd)
462 int ifd[2], ofd[2], efd[2], pid;
464 pr_info("Running command %s\n", cmd);
466 if (stdinfd && pipe(ifd)) {
467 pr_err("pipe() failed: %m\n");
471 if (stdoutfd && pipe(ofd)) {
472 pr_err("pipe() failed: %m\n");
476 if (stderrfd && pipe(efd)) {
477 pr_err("pipe() failed: %m\n");
482 if (pid) { /* Parent side */
503 dup2(ifd[0], STDIN_FILENO);
508 dup2(ofd[1], STDOUT_FILENO);
513 dup2(efd[1], STDERR_FILENO);
516 /* Now we have redirected standard streams to parent process */
518 pr_err("Failed to execv command %s: %m\n", cmd);
525 * Runs a command cmd with params argv, connects stdin and stdout to
528 * Returns the pid of the executed process
530 int run_piped_stream(const char *cmd, char *const argv[],
531 FILE **stdinf, FILE **stdoutf, FILE **stderrf)
533 int ifd, ofd, efd, pid;
549 pid = run_piped(cmd, argv, i, o, e);
552 *stdinf = fdopen(ifd, "r");
553 if (*stdinf == NULL) {
554 pr_err("Error opening file stream for fd %d: %m\n",
561 *stdoutf = fdopen(ofd, "r");
562 if (*stdoutf == NULL) {
563 pr_err("Error opening file stream for fd %d: %m\n",
570 *stderrf = fdopen(efd, "r");
571 if (*stderrf == NULL) {
572 pr_err("Error opening file stream for fd %d: %m\n",
582 * Forks a child and executes a command to run in parallel
585 #define max(a,b) (a) < (b) ? (b) : (a)
586 #define BUF_SIZE (128*1024)
587 int run(const char *cmd, char *const argv[])
595 if ((child = do_fork()))
598 child = run_piped(cmd, argv, NULL, &ofd, &efd);
610 maxfd = max(ofd, efd);
611 error = select(maxfd, &rfds, NULL, NULL, NULL);
614 pr_err("Error with select: %m\n");
618 if (FD_ISSET(ofd, &rfds)) {
619 bytes = read(ofd, rbuf, BUF_SIZE);
623 if (FD_ISSET(efd, &rfds)) {
625 bytes = read(efd, rbuf, BUF_SIZE);
629 pr_err("select() returned unknown fd\n");
634 pr_err("read() failed: %m\n");
639 * Workaround: When a process has died and it has only
640 * written to stderr, select() doesn't indicate that
641 * there might be something to read from the stderr fd. To
642 * work around this issue, we try to read stderr just
643 * in case, in order to ensure everything gets read.
646 bytes = read(efd, rbuf, BUF_SIZE);
656 pr_err("%s: stderr: %s\n",
659 pr_info("%s: stdout: %s\n",
670 harvest_zombies(child);
676 int register_event_handler(struct event_handler *handler)
678 struct epoll_event ev;
681 if (handler->fd <= 0) {
682 pr_err("Invalid file descriptor of %d\n", handler->fd);
686 if (!handler->handle_event) {
687 pr_err("Handler callback missing\n");
691 pr_info("Registering handler for %s, fd %d\n",
692 handler->name, handler->fd);
694 ev.data.fd = handler->fd;
695 ev.data.ptr = handler;
696 ev.events = handler->events;
697 ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, handler->fd, &ev);
699 pr_err("Failed to add epoll_fd: %m\n");