3 #include <sys/select.h>
7 #include <sys/signalfd.h>
8 #include <sys/resource.h>
/*
 * Child bookkeeping counters. child_count is logged as the number of
 * children left and makes harvest_zombies() bail out when it is zero.
 */
13 static int child_count;
14 static int parent_count;
/*
 * Pipe carrying job messages to the controlling parent: request_fork()
 * writes a pid to [1], handle_job_request() reads it from [0].
 */
15 static int job_request_fd[2];
/*
 * Pipe carrying the parent's answers: grant_new_job()/deny_job() write one
 * byte to [1], do_fork_limited() reads one byte from [0].
 */
16 static int job_get_permission_fd[2];
/* Limit on parallel jobs and, presumably, the number currently running. */
18 static unsigned int max_jobs;
19 static unsigned int job_count;
/*
 * Jobs waiting for permission and the cap on them; the cap is set to
 * max_jobs * 10 + 25 in init_jobcontrol().
 */
20 static unsigned int jobs_pending;
21 static unsigned int max_jobs_pending;
23 int get_child_count(void)
28 int get_parent_count(void)
/*
 * Event handler for the signalfd: reads one signalfd_siginfo record from
 * h->fd. Signals other than SIGCHLD are logged as unexpected; for SIGCHLD
 * the reported pid is handed to harvest_zombies().
 */
33 static int handle_signals(struct event_handler *h)
35 struct signalfd_siginfo siginfo;
38 ret = read(h->fd, &siginfo, sizeof(siginfo));
/*
 * NOTE(review): sizeof() is unsigned, so if ret is a signed type (its
 * declaration is not visible here) a failed read() returning -1 is
 * converted to a huge unsigned value and this check does not fire.
 * Confirm and consider testing ret < 0 separately.
 */
39 if (ret < sizeof(siginfo)) {
40 pr_err("Expected %zd from read, got %d: %m\n",
41 sizeof(siginfo), ret);
45 if (siginfo.ssi_signo != SIGCHLD) {
46 pr_err("Unexpected signal %d, ignoring\n", siginfo.ssi_signo);
/* Reap the child that the SIGCHLD record refers to. */
50 harvest_zombies(siginfo.ssi_pid);
/*
 * Answer a waiting child: logs the current job counters and writes a single
 * byte to the write end of job_get_permission_fd. The value of that byte
 * and the counter updates are not visible in this excerpt.
 */
55 static int grant_new_job(void)
61 pr_info("Granting new job. %d jobs currently and %d pending\n",
62 job_count, jobs_pending);
64 ret = write(job_get_permission_fd[1], &byte, 1);
66 pr_err("Failed to write 1 byte: %m\n");
/*
 * Counterpart of grant_new_job() for refusals: logs the counters together
 * with the pending limit and writes a single byte to the write end of
 * job_get_permission_fd. The byte value is not visible in this excerpt;
 * presumably it differs from the one used for a grant.
 */
73 static int deny_job(void)
78 pr_info("Denying new job. %d jobs currently and %d pending, "
79 "limit of pending jobs is %d\n",
80 job_count, jobs_pending, max_jobs_pending);
82 ret = write(job_get_permission_fd[1], &byte, 1);
84 pr_err("Failed to write 1 byte: %m\n");
/*
 * Event handler for the job request pipe: reads one pid-sized message from
 * job_request_fd[0] and acts on it. Judging from the visible branches, a
 * negative pid reports a finished job, while other requests are weighed
 * against max_jobs and max_jobs_pending before grant_new_job() is called.
 * NOTE(review): the read uses job_request_fd[0] directly rather than h->fd;
 * both are the same descriptor after init_jobcontrol().
 */
91 static int handle_job_request(struct event_handler *h)
95 ret = read(job_request_fd[0], &pid, sizeof(pid));
97 pr_err("Failed to read: %m\n");
102 pr_info("Read zero bytes\n");
/* The job limit has been reached; see whether the request may still queue. */
107 if (job_count >= max_jobs) {
108 if (jobs_pending < max_jobs_pending)
113 ret = grant_new_job();
/* request_fork() sends a negated pid when the request is not positive. */
116 } else if (pid < 0) {
117 if (job_count > max_jobs)
118 pr_err("BUG: Job %u jobs exceeded limit %u\n",
119 job_count, max_jobs);
121 pr_info("Job %d finished\n", -pid);
125 ret = grant_new_job();
/*
 * Event handler descriptor for SIGCHLD delivery; its fd is filled in with a
 * signalfd() descriptor by init_jobcontrol().
 */
133 struct event_handler signal_handler = {
134 .handle_event = handle_signals,
/*
 * Event handler descriptor for the job request pipe; its fd is set to
 * job_request_fd[0] by init_jobcontrol().
 */
139 struct event_handler job_request_handler = {
140 .handle_event = handle_job_request,
142 .name = "job_request",
146 * Initialize the jobcontrol.
148 * Create the pipes that are used to grant children execution
149 * permissions. If max_jobs_requested is not positive, count the number
150 * of CPUs from /proc/cpuinfo and use that.
152 int init_jobcontrol(int max_jobs_requested)
/* The request pipe is non-blocking; the permission pipe below is blocking. */
160 if (pipe2(job_request_fd, O_NONBLOCK | O_CLOEXEC)) {
161 pr_err("Failed to create pipe: %m\n");
165 if (pipe2(job_get_permission_fd, O_CLOEXEC)) {
166 pr_err("Failed to create pipe: %m\n");
170 epoll_fd = epoll_create(1);
171 if (epoll_fd == -1) {
172 pr_err("Failed to epoll_create(): %m\n");
/* Job requests arrive on the read end of the request pipe. */
176 job_request_handler.fd = job_request_fd[0];
177 register_event_handler(&job_request_handler);
179 sigemptyset(&sigmask);
180 sigaddset(&sigmask, SIGCHLD);
182 /* Block SIGCHLD so that it becomes readable via signalfd */
183 ret = sigprocmask(SIG_BLOCK, &sigmask, NULL);
185 pr_err("Failed to sigprocmask: %m\n");
188 signal_handler.fd = signalfd(-1, &sigmask, SFD_CLOEXEC);
/*
 * NOTE(review): this tests job_request_handler.fd, but the descriptor that
 * was just assigned is signal_handler.fd, so a signalfd() failure is not
 * detected here. This most likely should test signal_handler.fd.
 */
189 if (job_request_handler.fd < 0) {
190 pr_err("Failed to create signal_fd: %m\n");
194 register_event_handler(&signal_handler);
/* An explicit positive request overrides CPU counting. */
196 if (max_jobs_requested > 0) {
197 max_jobs = max_jobs_requested;
/*
 * NOTE(review): "ro" is not one of the standard fopen() mode strings; "r"
 * is the portable spelling for read-only access.
 */
202 file = fopen("/proc/cpuinfo", "ro");
204 pr_err("Failed to open /proc/cpuinfo: %m\n");
209 * The CPU count algorithm simply reads the first 8 bytes from
210 * the /proc/cpuinfo and then expects that line to be there as
211 * many times as there are CPUs.
213 ret = fread(match, 1, sizeof(match), file);
214 if (ret < sizeof(match)) {
215 pr_err("read %d bytes when expecting %zd %m\n",
/* Scan the remaining lines for ones starting with the same bytes as match. */
220 while(fgets(buf, sizeof(buf), file)) {
221 if (!strncmp(buf, match, sizeof(match)))
230 pr_info("Set maximum number of parallel jobs to %d\n", max_jobs);
/* The pending queue limit is derived from the parallel job limit. */
232 max_jobs_pending = max_jobs * 10 + 25;
233 pr_info("Set maximum number of pending jobs to %d\n", max_jobs_pending);
/*
 * Wait for a single event on epoll_fd and dispatch it to the event_handler
 * stored in the event's data pointer.
 *
 * timeout is given in seconds when positive; zero and negative values are
 * passed to epoll_wait() unchanged.
 */
238 int poll_job_requests(int timeout)
240 struct epoll_event event;
241 struct event_handler *job_handler;
244 /* Convert positive seconds to milliseconds */
245 timeout = timeout > 0 ? 1000 * timeout : timeout;
247 ret = epoll_wait(epoll_fd, &event, 1, timeout);
/* EINTR is not reported as an error. */
250 if (errno != EINTR) {
251 pr_err("epoll_wait: %m\n");
256 * If epoll_wait() was interrupted, better start
257 * everything again from the beginning
263 pr_info("Timed out\n");
/* register_event_handler() stored the handler pointer in the event data. */
267 job_handler = event.data.ptr;
269 if (!job_handler || !job_handler->handle_event) {
270 pr_err("Corrupted event handler for fd %d\n",
275 pr_debug("Running handler %s to handle events from fd %d\n",
276 job_handler->name, job_handler->fd);
/* NOTE(review): the handler's return value is not used on this line. */
277 job_handler->handle_event(job_handler);
280 pr_info("Jobs active: %u, pending: %u\n", job_count, jobs_pending);
285 * Per process flag indicating whether this child has requested fork
286 * limiting. If it has, it must also tell the master parent when it
287 * has died so that the parent can give next pending job permission to
290 static int is_limited_fork;
297 pr_err("fork() failed: %m\n");
303 pr_debug("Fork %d, child %d\n", child_count, child);
308 * Also do not notify the master parent of the death of this
309 * child. Only children that have been created with
310 * do_fork_limited() can have this flag set.
315 * Close unused ends of the job control pipes. Only the parent
316 * which controls the jobs may have the write end open of the
317 * job_get_permission_fd and the read end of the
318 * job_request_fd. Failing to close the pipe ends properly
319 * will cause the children to wait forever for the run
320 * permission in case parent dies prematurely.
322 * Note! The file descriptor must be closed once and only
323 * once. They are set to -1 to prevent
324 * subsequent do_fork() calls from closing them again (in which
325 * case some other file descriptor might already be reserved
326 * for the same number) and prevent accidentally closing some
327 * innocent file descriptors that are still in use.
329 if (job_get_permission_fd[1] >= 0) {
330 close(job_get_permission_fd[1]);
331 job_get_permission_fd[1] = -1;
333 if (job_request_fd[0] >= 0) {
334 close(job_request_fd[0]);
335 job_request_fd[0] = -1;
338 /* reset child's child count */
/*
 * Send a job control message to the parent over job_request_fd[1]. The
 * message is a pid value, negated when request is not positive; where pid
 * is obtained is not visible in this excerpt.
 * Returns the result of write().
 */
344 static int request_fork(int request)
348 pid = request > 0 ? pid : -pid;
350 return write(job_request_fd[1], &pid, sizeof(pid));
353 static void limited_fork_exit_handler(void)
360 * Like do_fork(), but allow the child to continue only after the global
361 * job count is low enough.
363 * We allow the parent to continue other more important activities but
364 * child respects the limit of global active processes.
366 int do_fork_limited(void)
375 /* Remember to notify the parent when we are done */
376 atexit(limited_fork_exit_handler);
379 pr_debug("Requesting permission to go\n");
381 /* Signal the parent that we are here, waiting to go */
385 * The parent will tell us when we can continue. If there were
386 * multiple children waiting for their turn to run only one
387 * will be reading the content byte from the pipe and getting
388 * the permission to run.
/* Blocks until the parent writes an answer byte to the permission pipe. */
390 ret = read(job_get_permission_fd[0], &byte, sizeof(byte));
/*
 * The messages below distinguish three failure outcomes: no answer at all,
 * a read error, and an answer that denies permission.
 */
392 pr_err("Error requesting run, did the parent die?\n");
395 pr_err("Job control request failure: %m\n");
398 pr_info("Did not get permission to execute. Terminating\n");
401 * Avoid running exit handler, that would tell the
402 * parent we died normally and decrement the job
408 pr_debug("Continuing\n");
/*
 * Reap a child with wait4() and log how it ended and how much CPU time it
 * used. pid is passed to wait4() unchanged and is overwritten with the pid
 * that wait4() returns. Nothing is waited for when child_count is zero.
 */
412 int harvest_zombies(int pid)
415 struct rusage rusage;
416 char *status_str = NULL;
419 if (child_count == 0)
423 pr_debug("Waiting on pid %d, children left: %d\n", pid,
427 pid = wait4(pid, &status, 0, &rusage);
/* NOTE(review): the message names waitid(), but the call above is wait4(). */
429 pr_err("Error on waitid(): %m\n");
432 /* Wait until the child has become a zombie */
433 } while (!WIFEXITED(status) && !WIFSIGNALED(status));
/* Translate the wait status into a description and a numeric code. */
436 if (WIFEXITED(status)) {
437 status_str = "exited with status";
438 code = WEXITSTATUS(status);
439 } else if (WIFSIGNALED(status)) {
440 status_str = "killed by signal";
441 code = WTERMSIG(status);
443 pr_debug("pid %d: %s %d. Children left: %d\n", pid,
444 status_str, code, child_count);
/* The microsecond fields are printed as milliseconds. */
445 pr_debug("pid %d: User time: %ld.%03lds, System %ld.%03lds\n", pid,
446 (long)rusage.ru_utime.tv_sec, rusage.ru_utime.tv_usec / 1000,
447 (long)rusage.ru_stime.tv_sec, rusage.ru_stime.tv_usec / 1000);
453 * Runs a command cmd with params argv, connects stdin and stdout to
456 * Returns the pid of the executed process
458 int run_piped(const char *cmd, char *const argv[],
459 int *stdinfd, int *stdoutfd, int *stderrfd)
461 int ifd[2], ofd[2], efd[2], pid;
463 pr_info("Running command %s\n", cmd);
/*
 * A pipe is created only for the streams the caller asked for (non-NULL
 * pointer). NOTE(review): no cleanup of an already created pipe is visible
 * when a later pipe() fails; confirm against the full source.
 */
465 if (stdinfd && pipe(ifd)) {
466 pr_err("pipe() failed: %m\n");
470 if (stdoutfd && pipe(ofd)) {
471 pr_err("pipe() failed: %m\n");
475 if (stderrfd && pipe(efd)) {
476 pr_err("pipe() failed: %m\n");
481 if (pid) { /* Parent side */
/*
 * Child side: the read end of the stdin pipe and the write ends of the
 * stdout/stderr pipes replace the standard descriptors.
 */
502 dup2(ifd[0], STDIN_FILENO);
507 dup2(ofd[1], STDOUT_FILENO);
512 dup2(efd[1], STDERR_FILENO);
515 /* Now we have redirected standard streams to parent process */
/* Logged when exec of the command failed. */
517 pr_err("Failed to execv command %s: %m\n", cmd);
524 * Runs a command cmd with params argv, connects stdin and stdout to
527 * Returns the pid of the executed process
529 int run_piped_stream(const char *cmd, char *const argv[],
530 FILE **stdinf, FILE **stdoutf, FILE **stderrf)
532 int ifd, ofd, efd, pid;
/* i, o and e presumably point at ifd/ofd/efd for the requested streams. */
548 pid = run_piped(cmd, argv, i, o, e);
/*
 * NOTE(review): this stream wraps the descriptor for the child's stdin,
 * which the parent would normally write to, yet it is opened with mode "r"
 * like the two output streams below. Confirm whether "w" was intended.
 */
551 *stdinf = fdopen(ifd, "r");
552 if (*stdinf == NULL) {
553 pr_err("Error opening file stream for fd %d: %m\n",
560 *stdoutf = fdopen(ofd, "r");
561 if (*stdoutf == NULL) {
562 pr_err("Error opening file stream for fd %d: %m\n",
569 *stderrf = fdopen(efd, "r");
570 if (*stderrf == NULL) {
571 pr_err("Error opening file stream for fd %d: %m\n",
581 * Forks a child and executes a command to run in parallel
/*
 * max() - yield the larger of two values.
 *
 * The whole expansion is parenthesized so the macro can be used inside a
 * larger expression (e.g. max(a, b) + 1) without operator precedence
 * changing the result. Each argument may be evaluated more than once, so
 * do not pass expressions with side effects.
 */
#define max(a,b) ((a) < (b) ? (b) : (a))
/* Number of bytes requested per read() from the child's output pipes. */
#define BUF_SIZE (128*1024)
586 int run(const char *cmd, char *const argv[])
/* A non-zero do_fork() result takes the branch below; the rest runs otherwise. */
594 if ((child = do_fork()))
/* Run the command with stdout and stderr captured; stdin is not piped. */
597 child = run_piped(cmd, argv, NULL, &ofd, &efd);
/*
 * NOTE(review): select() examines descriptors 0..nfds-1, so nfds is
 * normally the highest descriptor plus one. Passing maxfd as is leaves the
 * larger of ofd/efd outside the examined range. This may be what the
 * stderr workaround further down compensates for; confirm.
 */
609 maxfd = max(ofd, efd);
610 error = select(maxfd, &rfds, NULL, NULL, NULL);
613 pr_err("Error with select: %m\n");
617 if (FD_ISSET(ofd, &rfds)) {
618 bytes = read(ofd, rbuf, BUF_SIZE);
622 if (FD_ISSET(efd, &rfds)) {
624 bytes = read(efd, rbuf, BUF_SIZE);
628 pr_err("select() returned unknown fd\n");
633 pr_err("read() failed: %m\n");
638 * Workaround: When a process had die and it has only
639 * written to stderr, select() doesn't indicate that
640 * there might be something to read in stderr fd. To
641 * work around this issue, we try to read stderr just
642 * in case in order to ensure everything gets read.
645 bytes = read(efd, rbuf, BUF_SIZE);
/* Child output is relayed to the log: stderr as errors, stdout as info. */
655 pr_err("%s: stderr: %s\n",
658 pr_info("%s: stdout: %s\n",
/* Reap the command process started by run_piped(). */
669 harvest_zombies(child);
/*
 * Add handler->fd to the epoll set epoll_fd, watching handler->events, with
 * the handler pointer stored as the event's user data so that
 * poll_job_requests() can dispatch to it.
 */
675 int register_event_handler(struct event_handler *handler)
677 struct epoll_event ev;
/*
 * NOTE(review): this also rejects descriptor 0, which is a valid file
 * descriptor; confirm that this is intended.
 */
680 if (handler->fd <= 0) {
681 pr_err("Invalid file descriptor of %d\n", handler->fd);
685 if (!handler->handle_event) {
686 pr_err("Handler callback missing\n");
690 pr_info("Registering handler for %s, fd %d\n",
691 handler->name, handler->fd);
/*
 * NOTE(review): the data member of struct epoll_event is a union, so the
 * ptr assignment overwrites the fd stored on the line before it; only ptr
 * is read back in poll_job_requests().
 */
693 ev.data.fd = handler->fd;
694 ev.data.ptr = handler;
695 ev.events = handler->events;
696 ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, handler->fd, &ev);
698 pr_err("Failed to add epoll_fd: %m\n");
705 void _mutex_lock_acquired(struct mutex *lock, char *file, int line)
/*
 * Lock lock->lock, logging contention: a non-blocking trylock is attempted
 * first, and only when that fails is the contention reported and a blocking
 * pthread_mutex_lock() issued. file and line identify the call site and are
 * passed on to _mutex_lock_acquired().
 */
711 int _mutex_lock(struct mutex *lock, char *file, int line)
/* pthread_mutex_trylock() returns zero when the lock was taken. */
715 if (!pthread_mutex_trylock(&lock->lock))
/* lock->file and lock->line presumably describe the current holder. */
718 pr_info("Lock contention on lock %s on %s:%d\n",
719 lock->name, lock->file, lock->line);
721 ret = pthread_mutex_lock(&lock->lock);
/*
 * NOTE(review): pthread_mutex_lock() reports failure through its return
 * value rather than errno, so %m here may print an unrelated error text.
 */
723 pr_err("Acquirin lock %s failed: %m, acquired %s:%d\n",
724 lock->name, lock->file, lock->line);
727 _mutex_lock_acquired(lock, file, line);
/*
 * Release lock->lock. The result of pthread_mutex_unlock() is not used on
 * the visible line.
 */
731 int _mutex_unlock(struct mutex *lock)
735 pthread_mutex_unlock(&lock->lock);