3 #include <sys/select.h>
7 #include <sys/signalfd.h>
8 #include <sys/resource.h>
/* Number of children this process has forked; harvest_zombies() reads it. */
13 static int child_count;
/* NOTE(review): presumably the value behind get_parent_count(); its updates are not visible here. */
14 static int parent_count;
/*
 * Job control pipes. Children write their (possibly negated) pid into
 * job_request_fd[1] and the master parent reads job_request_fd[0].
 * The parent answers with one byte on job_get_permission_fd[1], which a
 * waiting child reads from job_get_permission_fd[0].
 */
15 static int job_request_fd[2];
16 static int job_get_permission_fd[2];
/* Limit of concurrently running jobs and the current number of jobs. */
18 static unsigned int max_jobs;
19 static unsigned int job_count;
/* Children waiting for permission to run, and the cap on that number. */
20 static unsigned int jobs_pending;
21 static unsigned int max_jobs_pending;
/* Callback that run_work_on_queue() invokes with the work item's arg. */
25 int (*work_fn)(void *);
/* Next item in the queue's singly linked list; NULL marks the tail. */
27 struct work_struct *next;
/* Head of this queue's linked list of pending work items (NULL when empty). */
31 struct work_struct *work;
/*
 * Work queues indexed by priority. worker_thread() scans them starting
 * from index 0, so the "high priority" queue is served first. Each queue
 * has its own named mutex protecting its list.
 */
37 struct work_queue work_queues[WORK_PRIORITIES_NUM] = {
39 .name = "high priority",
41 .name = "high_prio_queue",
42 .lock = PTHREAD_MUTEX_INITIALIZER,
46 .name = "low priority",
48 .name = "low_prio_queue",
49 .lock = PTHREAD_MUTEX_INITIALIZER,
/*
 * Mutex/condition pair on which idle worker threads sleep.
 * queue_work() and run_work_on_queue() signal work_pending_cond when
 * there is work to pick up.
 */
54 struct mutex work_pending_mutex = {
55 .name = "work_pending",
56 .lock = PTHREAD_MUTEX_INITIALIZER,
58 pthread_cond_t work_pending_cond = PTHREAD_COND_INITIALIZER;
/*
 * Pop the first work item from @queue and execute it.
 *
 * The item is unlinked while queue->lock is held; its callback runs
 * after the lock has been released.
 */
60 static int run_work_on_queue(struct work_queue *queue)
62 struct work_struct *work;
64 mutex_lock(&queue->lock);
67 pr_info("No work to run on queue %s\n", queue->name);
68 mutex_unlock(&queue->lock);
74 queue->work = work->next;
78 * If queue is not empty, try waking up more workers. It is
79 * possible that when work was queued, the first worker did
80 * not wake up soon enough and
82 if (queue->length > 0)
83 pthread_cond_signal(&work_pending_cond);
/*
 * Log before unlocking: queue->length is accessed under queue->lock
 * elsewhere (e.g. queue_work()), so reading it after the unlock would
 * be a data race.
 */
87 pr_info("Executing work %s from queue %s, %d still pending\n",
88 work->name, queue->name, queue->length);
85 mutex_unlock(&queue->lock);
90 work->work_fn(work->arg);
91 pr_info("Work %s done\n", work->name);
/*
 * Worker thread main loop. @arg carries the worker index, used only to
 * name the thread "worker<N>".
 *
 * The thread runs work from the queues in priority order and then sleeps
 * on work_pending_cond until more work is signalled.
 */
97 static void *worker_thread(void *arg)
103 snprintf(name, sizeof(name), "worker%ld", (long)arg);
104 pthread_setname_np(pthread_self(), name);
108 int prio, work_done = 0;
111 * Execute as much work from the queues as
112 * there is, starting from the highest priority
115 for (prio = 0; prio < WORK_PRIORITIES_NUM; prio++) {
117 run_work_on_queue(&work_queues[prio]);
126 pr_info("Worker going to sleep\n");
127 ret = pthread_cond_wait(&work_pending_cond,
128 &work_pending_mutex.lock);
/* pthread functions return the error number; they do not set errno */
130 pr_err("Error: %s\n", strerror(ret));
/* Record the new owner after pthread_cond_wait() re-acquired the mutex */
132 mutex_lock_acquired(&work_pending_mutex);
134 mutex_unlock(&work_pending_mutex);
/*
 * Append a new work item to the queue of the given priority and wake up
 * one sleeping worker thread.
 *
 * @priority: index into work_queues[]; values >= WORK_PRIORITIES_NUM are
 *            rejected
 * @name:     name of the work item, shown in log messages
 * @work_fn:  callback a worker thread later invokes with @arg
 * @arg:      opaque argument passed to @work_fn
 */
141 int queue_work(unsigned int priority, char *name,
142 int (work_fn)(void *arg), void *arg)
144 struct work_queue *queue;
145 struct work_struct *work, *last_work;
147 if (priority >= WORK_PRIORITIES_NUM) {
148 pr_err("Invalid priority: %u\n", priority);
/* calloc(count, size); zeroing leaves work->next NULL for the tail */
/* NOTE(review): confirm the allocation result is checked before use */
152 work = calloc(1, sizeof(*work));
155 work->work_fn = work_fn;
158 queue = &work_queues[priority];
160 /* Insert new work at the end of the work queue */
161 mutex_lock(&queue->lock);
163 last_work = queue->work;
164 while (last_work && last_work->next)
165 last_work = last_work->next;
170 last_work->next = work;
172 pr_info("Inserted work %s in queue %s, with %d pending items\n",
173 work->name, queue->name, queue->length);
175 mutex_unlock(&queue->lock);
177 pthread_cond_signal(&work_pending_cond);
/* NOTE(review): body not visible here; presumably returns child_count. */
182 int get_child_count(void)
/* NOTE(review): body not visible here; presumably returns parent_count. */
187 int get_parent_count(void)
/*
 * Event handler for the SIGCHLD signalfd: read one signalfd_siginfo
 * record from h->fd and reap the child that raised the signal.
 * Signals other than SIGCHLD are logged and ignored.
 */
192 static int handle_signals(struct event_handler *h)
194 struct signalfd_siginfo siginfo;
197 ret = read(h->fd, &siginfo, sizeof(siginfo));
/*
 * Test for a negative result first: comparing a signed -1 against the
 * size_t sizeof converts it to a huge unsigned value, so a failed
 * read() would otherwise go unnoticed.
 */
198 if (ret < 0 || (size_t)ret < sizeof(siginfo)) {
199 pr_err("Expected %zu from read, got %d: %m\n",
200 sizeof(siginfo), ret);
204 if (siginfo.ssi_signo != SIGCHLD) {
205 pr_err("Unexpected signal %u, ignoring\n", siginfo.ssi_signo);
209 harvest_zombies(siginfo.ssi_pid);
/*
 * Give one waiting child permission to run by writing a single byte to
 * job_get_permission_fd[1]. Whichever child's read() on the other end
 * wins receives the byte.
 */
214 static int grant_new_job(void)
220 pr_info("Granting new job. %u jobs currently and %u pending\n",
221 job_count, jobs_pending);
223 ret = write(job_get_permission_fd[1], &byte, 1);
225 pr_err("Failed to write 1 byte: %m\n");
/*
 * Refuse a job request because too many jobs are already pending: write
 * one byte to job_get_permission_fd[1] that the waiting child interprets
 * as a denial.
 */
/* NOTE(review): byte values for grant vs. deny are assigned on lines not visible here */
232 static int deny_job(void)
237 pr_info("Denying new job. %u jobs currently and %u pending, "
238 "limit of pending jobs is %u\n",
239 job_count, jobs_pending, max_jobs_pending);
241 ret = write(job_get_permission_fd[1], &byte, 1);
243 pr_err("Failed to write 1 byte: %m\n");
/*
 * Event handler for job_request_fd. Reads one pid value:
 * - a negative value means the job -pid finished, so a pending job can
 *   be granted;
 * - otherwise a new job is granted, queued or denied depending on
 *   max_jobs and max_jobs_pending.
 */
250 static int handle_job_request(struct event_handler *h)
254 ret = read(job_request_fd[0], &pid, sizeof(pid));
256 pr_err("Failed to read: %m\n");
261 pr_info("Read zero bytes\n");
266 if (job_count >= max_jobs) {
267 if (jobs_pending < max_jobs_pending)
272 ret = grant_new_job();
275 } else if (pid < 0) {
276 if (job_count > max_jobs)
277 pr_err("BUG: %u jobs exceeded limit %u\n",
278 job_count, max_jobs);
280 pr_info("Job %d finished\n", -pid);
284 ret = grant_new_job();
/*
 * Dispatches readable events on the SIGCHLD signalfd to handle_signals().
 * Its fd is filled in by init_jobcontrol().
 */
292 struct event_handler signal_handler = {
293 .handle_event = handle_signals,
/*
 * Dispatches readable events on job_request_fd[0] to handle_job_request().
 * Its fd is filled in by init_jobcontrol().
 */
298 struct event_handler job_request_handler = {
299 .handle_event = handle_job_request,
301 .name = "job_request",
305 * Initialize job control.
307 * Create the pipes that are used to grant children execution
308 * permissions. If max_jobs_requested is not positive, count the
309 * number of CPUs from /proc/cpuinfo and use that.
311 int init_jobcontrol(int max_jobs_requested)
321 if (pipe2(job_request_fd, O_NONBLOCK | O_CLOEXEC)) {
322 pr_err("Failed to create pipe: %m\n");
326 if (pipe2(job_get_permission_fd, O_CLOEXEC)) {
327 pr_err("Failed to create pipe: %m\n");
/* Close-on-exec like the pipes above, so exec'd commands do not inherit it */
331 epoll_fd = epoll_create1(EPOLL_CLOEXEC);
332 if (epoll_fd == -1) {
333 pr_err("Failed to epoll_create1(): %m\n");
337 job_request_handler.fd = job_request_fd[0];
338 register_event_handler(&job_request_handler);
340 sigemptyset(&sigmask);
341 sigaddset(&sigmask, SIGCHLD);
343 /* Block SIGCHLD so that it becomes readable via signalfd */
344 ret = sigprocmask(SIG_BLOCK, &sigmask, NULL);
346 pr_err("Failed to sigprocmask: %m\n");
349 signal_handler.fd = signalfd(-1, &sigmask, SFD_CLOEXEC);
/* Check the descriptor signalfd() just returned, not the job request pipe */
350 if (signal_handler.fd < 0) {
351 pr_err("Failed to create signal_fd: %m\n");
355 register_event_handler(&signal_handler);
357 if (max_jobs_requested > 0) {
358 max_jobs = max_jobs_requested;
/* "r" is the ISO C read-only mode; "ro" only works because glibc ignores the 'o' */
363 file = fopen("/proc/cpuinfo", "r");
365 pr_err("Failed to open /proc/cpuinfo: %m\n");
370 * The CPU count algorithm simply reads the first 8 bytes from
371 * the /proc/cpuinfo and then expects that line to be there as
372 * many times as there are CPUs.
374 ret = fread(match, 1, sizeof(match), file);
375 if (ret < sizeof(match)) {
376 pr_err("read %d bytes when expecting %zd %m\n",
381 while(fgets(buf, sizeof(buf), file)) {
382 if (!strncmp(buf, match, sizeof(match)))
391 pr_info("Set maximum number of parallel jobs to %u\n", max_jobs);
393 max_jobs_pending = max_jobs * 10 + 25;
394 pr_info("Set maximum number of pending jobs to %u\n", max_jobs_pending);
396 /* Create worker threads */
/* NOTE(review): the calloc() and pthread_create() results are not checked */
397 thread = calloc(max_jobs, sizeof(*thread));
398 for (i = 0; i < max_jobs; i++)
/* Widen through long: worker_thread() converts the pointer back with (long)arg */
399 pthread_create(&thread[i], NULL, worker_thread, (void *)(long)i);
402 * Magic sleep. There are too many fork() calls at the moment
403 * so we must ensure our threads don't print anything out
404 * while a fork() is executed. Otherwise the child will
405 * inherit glibc internal locks while they are locked, and
406 * function calls such as printf will deadlock.
/*
 * Wait for a single event on epoll_fd and dispatch it to the
 * event_handler stored in the event's data.ptr.
 *
 * @timeout: seconds to wait. Positive values are converted to
 *           milliseconds; other values are passed to epoll_wait()
 *           unchanged (epoll_wait(): -1 blocks indefinitely, 0 returns
 *           immediately).
 */
413 int poll_job_requests(int timeout)
415 struct epoll_event event;
416 struct event_handler *job_handler;
419 /* Convert positive seconds to milliseconds */
/* NOTE(review): 1000 * timeout overflows int for timeouts above ~24.8 days */
420 timeout = timeout > 0 ? 1000 * timeout : timeout;
422 ret = epoll_wait(epoll_fd, &event, 1, timeout);
425 if (errno != EINTR) {
426 pr_err("epoll_wait: %m\n");
431 * If epoll_wait() was interrupted, better start
432 * everything again from the beginning
438 pr_info("Timed out\n");
442 job_handler = event.data.ptr;
444 if (!job_handler || !job_handler->handle_event) {
/*
 * NOTE(review): epoll_event.data is a union and register_event_handler()
 * stores a pointer in it; if the argument on the next (not visible) line
 * is event.data.fd, it prints pointer bits rather than a descriptor.
 */
445 pr_err("Corrupted event handler for fd %d\n",
450 pr_debug("Running handler %s to handle events from fd %d\n",
451 job_handler->name, job_handler->fd);
452 job_handler->handle_event(job_handler);
455 pr_info("Jobs active: %u, pending: %u\n", job_count, jobs_pending);
460 * Per-process flag indicating whether this child has requested fork
461 * limiting. If it has, it must also tell the master parent when it
462 * has died so that the parent can give the next pending job permission to
465 static int is_limited_fork;
/*
 * NOTE(review): this is the body of do_fork() (called from run() and
 * named in the comment below); its signature and several lines are not
 * visible here.
 */
472 pr_err("fork() failed: %m\n");
478 pr_debug("Fork %d, child %d\n", child_count, child);
483 * Also do not notify the master parent of the death of this
484 * child. Only children that have been created with
485 * do_fork_limited() can have this flag set.
490 * Close unused ends of the job control pipes. Only the parent
491 * which controls the jobs may keep open the write end of the
492 * job_get_permission_fd and the read end of the
493 * job_request_fd. Failing to close the pipe ends properly
494 * will cause the children to wait forever for the run
495 * permission in case the parent dies prematurely.
497 * Note! The file descriptors must be closed once and only
498 * once. They are set to -1 to make it impossible for
499 * subsequent do_fork() calls to close them again (in which
500 * case some other file descriptor might already be reserved
501 * for the same number) and prevent accidentally closing some
502 * innocent file descriptors that are still in use.
504 if (job_get_permission_fd[1] >= 0) {
505 close(job_get_permission_fd[1]);
506 job_get_permission_fd[1] = -1;
508 if (job_request_fd[0] >= 0) {
509 close(job_request_fd[0]);
510 job_request_fd[0] = -1;
513 /* reset child's child count */
/*
 * Send one job control message to the master parent through
 * job_request_fd[1]. The value is pid when @request is positive and -pid
 * otherwise; handle_job_request() treats negative values as "job finished".
 * Returns write()'s result.
 */
/* NOTE(review): pid's initial assignment is not visible; presumably getpid() */
519 static int request_fork(int request)
523 pid = request > 0 ? pid : -pid;
525 return write(job_request_fd[1], &pid, sizeof(pid));
/*
 * atexit() handler registered by do_fork_limited() so the master parent
 * is notified when a limited child terminates. Body not visible here.
 */
528 static void limited_fork_exit_handler(void)
535 * Like do_fork(), but allow the child to continue only after the global
536 * job count is low enough.
538 * We allow the parent to continue with other, more important activities but
539 * the child respects the limit of global active processes.
541 int do_fork_limited(void)
550 /* Remember to notify the parent when we are done */
551 atexit(limited_fork_exit_handler);
554 pr_debug("Requesting permission to go\n");
556 /* Signal the parent that we are here, waiting to go */
560 * The parent will tell us when we can continue. If there were
561 * multiple children waiting for their turn to run only one
562 * will be reading the content byte from the pipe and getting
563 * the permission to run.
/* Blocks until the parent writes a grant/deny byte (see grant_new_job()/deny_job()) */
565 ret = read(job_get_permission_fd[0], &byte, sizeof(byte));
567 pr_err("Error requesting run, did the parent die?\n");
570 pr_err("Job control request failure: %m\n");
573 pr_info("Did not get permission to execute. Terminating\n");
576 * Avoid running the exit handler, which would tell the
577 * parent we died normally and decrement the job
583 pr_debug("Continuing\n");
587 int harvest_zombies(int pid)
590 struct rusage rusage;
591 char *status_str = NULL;
594 if (child_count == 0)
598 pr_debug("Waiting on pid %d, children left: %d\n", pid,
602 pid = wait4(pid, &status, 0, &rusage);
604 pr_err("Error on waitid(): %m\n");
607 /* Wait until the child has become a zombie */
608 } while (!WIFEXITED(status) && !WIFSIGNALED(status));
611 if (WIFEXITED(status)) {
612 status_str = "exited with status";
613 code = WEXITSTATUS(status);
614 } else if (WIFSIGNALED(status)) {
615 status_str = "killed by signal";
616 code = WTERMSIG(status);
618 pr_debug("pid %d: %s %d. Children left: %d\n", pid,
619 status_str, code, child_count);
620 pr_debug("pid %d: User time: %ld.%03lds, System %ld.%03lds\n", pid,
621 (long)rusage.ru_utime.tv_sec, rusage.ru_utime.tv_usec / 1000,
622 (long)rusage.ru_stime.tv_sec, rusage.ru_stime.tv_usec / 1000);
628 * Runs a command cmd with params argv, connects stdin and stdout to
631 * Returns the pid of the executed process
/*
 * A pipe is created for each of stdinfd/stdoutfd/stderrfd that is
 * non-NULL. In the child, the pipe ends are dup2()'d onto
 * STDIN/STDOUT/STDERR before the command is exec'd.
 * NOTE(review): the parent-side code (what is stored through the
 * pointers) and the cleanup on a later pipe() failure are not visible
 * here; confirm earlier pipes are closed on those error paths.
 */
633 int run_piped(const char *cmd, char *const argv[],
634 int *stdinfd, int *stdoutfd, int *stderrfd)
636 int ifd[2], ofd[2], efd[2], pid;
638 pr_info("Running command %s\n", cmd);
640 if (stdinfd && pipe(ifd)) {
641 pr_err("pipe() failed: %m\n");
645 if (stdoutfd && pipe(ofd)) {
646 pr_err("pipe() failed: %m\n");
650 if (stderrfd && pipe(efd)) {
651 pr_err("pipe() failed: %m\n");
656 if (pid) { /* Parent side */
/* Child side: the child reads stdin from ifd[0] and writes stdout/stderr into ofd[1]/efd[1] */
677 dup2(ifd[0], STDIN_FILENO);
682 dup2(ofd[1], STDOUT_FILENO);
687 dup2(efd[1], STDERR_FILENO);
690 /* Now we have redirected standard streams to parent process */
/* Only reached if exec failed */
692 pr_err("Failed to execv command %s: %m\n", cmd);
699 * Runs a command cmd with params argv, connects stdin and stdout to
702 * Returns the pid of the executed process
/*
 * Stream-based wrapper around run_piped(): each non-NULL FILE ** receives
 * a stdio stream opened on the corresponding parent-side pipe end.
 */
704 int run_piped_stream(const char *cmd, char *const argv[],
705 FILE **stdinf, FILE **stdoutf, FILE **stderrf)
707 int ifd, ofd, efd, pid;
723 pid = run_piped(cmd, argv, i, o, e);
/*
 * The child reads its stdin from this pipe, so the parent holds the
 * write end; fdopen() with "r" on a write-only descriptor fails (EINVAL).
 */
726 *stdinf = fdopen(ifd, "w");
727 if (*stdinf == NULL) {
728 pr_err("Error opening file stream for fd %d: %m\n",
735 *stdoutf = fdopen(ofd, "r");
736 if (*stdoutf == NULL) {
737 pr_err("Error opening file stream for fd %d: %m\n",
744 *stderrf = fdopen(efd, "r");
745 if (*stderrf == NULL) {
746 pr_err("Error opening file stream for fd %d: %m\n",
756 * Forks a child and executes a command to run on parallel
759 #define max(a,b) (a) < (b) ? (b) : (a)
760 #define BUF_SIZE (128*1024)
761 int run(const char *cmd, char *const argv[])
769 if ((child = do_fork()))
772 child = run_piped(cmd, argv, NULL, &ofd, &efd);
784 maxfd = max(ofd, efd);
785 error = select(maxfd, &rfds, NULL, NULL, NULL);
788 pr_err("Error with select: %m\n");
792 if (FD_ISSET(ofd, &rfds)) {
793 bytes = read(ofd, rbuf, BUF_SIZE);
797 if (FD_ISSET(efd, &rfds)) {
799 bytes = read(efd, rbuf, BUF_SIZE);
803 pr_err("select() returned unknown fd\n");
808 pr_err("read() failed: %m\n");
813 * Workaround: When a process had die and it has only
814 * written to stderr, select() doesn't indicate that
815 * there might be something to read in stderr fd. To
816 * work around this issue, we try to read stderr just
817 * in case in order to ensure everything gets read.
820 bytes = read(efd, rbuf, BUF_SIZE);
830 pr_err("%s: stderr: %s\n",
833 pr_info("%s: stdout: %s\n",
844 harvest_zombies(child);
/*
 * Add handler->fd to epoll_fd, watching handler->events. The handler
 * pointer is stored in the epoll event data so poll_job_requests() can
 * dispatch back to handler->handle_event().
 */
850 int register_event_handler(struct event_handler *handler)
852 struct epoll_event ev = { 0 };
/* fd 0 is rejected too, presumably to catch static handlers whose fd was never set */
855 if (handler->fd <= 0) {
856 pr_err("Invalid file descriptor of %d\n", handler->fd);
860 if (!handler->handle_event) {
861 pr_err("Handler callback missing\n");
865 pr_info("Registering handler for %s, fd %d\n",
866 handler->name, handler->fd);
/*
 * epoll_event.data is a union, so only one member can be used; storing
 * data.fd first was immediately overwritten. poll_job_requests() reads
 * data.ptr back.
 */
869 ev.data.ptr = handler;
870 ev.events = handler->events;
871 ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, handler->fd, &ev);
873 pr_err("Failed to add epoll_fd: %m\n");
/*
 * Called once @lock is held (from _mutex_lock() and, after
 * pthread_cond_wait(), from worker_thread()).
 * NOTE(review): body not visible; presumably records @file/@line into
 * lock->file/lock->line, which _mutex_lock() prints on contention.
 */
880 void _mutex_lock_acquired(struct mutex *lock, char *file, int line)
/*
 * Lock @lock, logging when it is contended, then record the caller's
 * @file/@line through _mutex_lock_acquired().
 */
886 int _mutex_lock(struct mutex *lock, char *file, int line)
/* Fast path: take the lock without blocking if it is free */
890 if (!pthread_mutex_trylock(&lock->lock))
/* NOTE(review): lock->file/lock->line presumably describe the current holder */
893 pr_info("Lock contention on lock %s on %s:%d\n",
894 lock->name, lock->file, lock->line);
896 ret = pthread_mutex_lock(&lock->lock);
/* pthread functions return the error number and do not set errno, so %m would be stale */
898 pr_err("Acquiring lock %s failed: %s, acquired %s:%d\n",
899 lock->name, strerror(ret), lock->file, lock->line);
902 _mutex_lock_acquired(lock, file, line);
/*
 * Release @lock.
 * NOTE(review): the visible line discards pthread_mutex_unlock()'s
 * return value; the function's own return value is not visible here.
 */
906 int _mutex_unlock(struct mutex *lock)
910 pthread_mutex_unlock(&lock->lock);