3 #include <sys/select.h>
7 #include <sys/signalfd.h>
8 #include <sys/resource.h>
/* Count of live children forked by this process; parent_count tracks how
 * many ancestors this process has (incremented in forked children). */
13 static int child_count;
14 static int parent_count;
/* Pipe: children write job requests (their pid, or -pid on completion);
 * the master parent reads them. */
15 static int job_request_fd[2];
/* Pipe: the master parent writes one byte per granted run permission;
 * waiting children read exactly one byte each. */
16 static int job_get_permission_fd[2];
/* Job-control accounting: hard limit of concurrent jobs, current active
 * jobs, jobs queued waiting for a slot, and the cap on that queue. */
18 static unsigned int max_jobs;
19 static unsigned int job_count;
20 static unsigned int jobs_pending;
21 static unsigned int max_jobs_pending;
/* struct work_struct members: the callback to run and the next link of a
 * singly linked pending-work list. */
25 int (*work_fn)(void *);
27 struct work_struct *next;
/* struct work_queue member: head of the queue's pending-work list. */
31 struct work_struct *work;
/* One queue per priority level; workers drain index 0 (high) first. */
37 struct work_queue work_queues[WORK_PRIORITIES_NUM] = {
39 .name = "high priority",
/* Each queue embeds a named mutex wrapper for contention diagnostics. */
41 .name = "high_prio_queue",
42 .lock = PTHREAD_MUTEX_INITIALIZER,
46 .name = "low priority",
48 .name = "low_prio_queue",
49 .lock = PTHREAD_MUTEX_INITIALIZER,
/* Mutex + condition variable pair used to put idle workers to sleep and
 * wake them when new work is queued. */
54 struct mutex work_pending_mutex = {
55 .name = "work_pending",
56 .lock = PTHREAD_MUTEX_INITIALIZER,
58 pthread_cond_t work_pending_cond = PTHREAD_COND_INITIALIZER;
/*
 * Pop the first work item off @queue and execute it.
 *
 * The item is removed under the queue lock, but the callback itself runs
 * with the lock dropped so other workers can keep draining the queue.
 */
60 static int run_work_on_queue(struct work_queue *queue)
62 	struct work_struct *work;
64 	mutex_lock(&queue->lock);
/* Empty queue: nothing to do, release the lock and bail out. */
67 	mutex_unlock(&queue->lock);
/* Detach the head item from the list. */
73 	queue->work = work->next;
77 	 * If queue is not empty, try waking up more workers. It is
78 	 * possible that when work were queued, the first worker did
79 	 * not wake up soon enough and
81 	if (queue->length > 0)
82 		pthread_cond_signal(&work_pending_cond);
84 	mutex_unlock(&queue->lock);
86 	pr_info("Executing work %s from queue %s, %d still pending\n",
87 		work->name, queue->name, queue->length);
/* Run the callback outside the lock; the work item was allocated in
 * queue_work() — presumably freed after execution (elided here). */
89 	work->work_fn(work->arg);
90 	pr_info("Work %s done\n", work->name);
/*
 * Worker thread main loop: drain all queues in priority order, then sleep
 * on work_pending_cond until queue_work() signals new items.
 *
 * @arg carries the worker's index (cast to long), used only for the
 * thread name shown in ps/top.
 */
96 static void *worker_thread(void *arg)
102 	snprintf(name, sizeof(name), "worker%ld", (long)arg);
103 	pthread_setname_np(pthread_self(), name);
107 		int prio, work_done = 0;
110 		 * Execute as many works from the queues as
111 		 * there are, starting from highest priority
114 		for (prio = 0; prio < WORK_PRIORITIES_NUM; prio++) {
116 			run_work_on_queue(&work_queues[prio]);
/* No work found on any queue: block until someone signals the condvar.
 * pthread_cond_wait() atomically releases work_pending_mutex.lock and
 * reacquires it before returning. */
125 		pr_info("Worker going to sleep\n");
126 		ret = pthread_cond_wait(&work_pending_cond,
127 			&work_pending_mutex.lock);
129 			pr_err("Error: %m\n");
/* Bookkeeping: record that the wrapped mutex is held again after the
 * wait, so the contention diagnostics stay accurate. */
131 		mutex_lock_acquired(&work_pending_mutex);
133 		mutex_unlock(&work_pending_mutex);
/*
 * Allocate a work item running @work_fn(@arg) and append it to the tail
 * of the queue selected by @priority, then wake one sleeping worker.
 *
 * Returns nonzero on invalid priority (exact error paths elided).
 */
140 int queue_work(unsigned int priority, char *name,
141 	       int (work_fn)(void *arg), void *arg)
143 	struct work_queue *queue;
144 	struct work_struct *work, *last_work;
146 	if (priority >= WORK_PRIORITIES_NUM) {
147 		pr_err("Invalid priority: %d\n", priority);
/* NOTE(review): calloc takes (nmemb, size); arguments are swapped here.
 * The byte count is identical, but calloc(1, sizeof(*work)) is the
 * conventional order. */
151 	work = calloc(sizeof(*work), 1);
154 	work->work_fn = work_fn;
157 	queue = &work_queues[priority];
159 	/* Insert new work at the end of the work queue */
160 	mutex_lock(&queue->lock);
/* O(n) walk to the tail; fine for short queues, a tail pointer would
 * make this O(1) if queues grow long. */
162 	last_work = queue->work;
163 	while (last_work && last_work->next)
164 		last_work = last_work->next;
169 		last_work->next = work;
171 	pr_info("Inserted work %s in queue %s, with %d pending items\n",
172 		work->name, queue->name, queue->length);
174 	mutex_unlock(&queue->lock);
/* Wake one sleeping worker to pick the item up. */
176 	pthread_cond_signal(&work_pending_cond);
/* Accessors for the per-process fork counters (bodies elided). */
181 int get_child_count(void)
186 int get_parent_count(void)
/*
 * Grant one waiting child permission to run by writing a single byte to
 * the permission pipe; exactly one blocked reader will consume it.
 */
191 static int grant_new_job(void)
197 	pr_info("Granting new job. %d jobs currently and %d pending\n",
198 		job_count, jobs_pending);
200 	ret = write(job_get_permission_fd[1], &byte, 1);
202 		pr_err("Failed to write 1 byte: %m\n");
/*
 * Refuse a job request: the pending queue is full. Writes a (denial)
 * byte to the permission pipe so the waiting child unblocks and can
 * terminate itself (see do_fork_limited()).
 */
209 static int deny_job(void)
214 	pr_info("Denying new job. %d jobs currently and %d pending, "
215 		"limit of pending jobs is %d\n",
216 		job_count, jobs_pending, max_jobs_pending);
218 	ret = write(job_get_permission_fd[1], &byte, 1);
220 		pr_err("Failed to write 1 byte: %m\n");
/*
 * Event handler for job_request_fd: each message is a pid_t. A positive
 * pid asks permission to start a job; a negative pid announces that job
 * -pid has finished, freeing a slot for a pending requester.
 */
227 static int handle_job_request(struct event_handler *h)
231 	ret = read(job_request_fd[0], &pid, sizeof(pid));
233 		pr_err("Failed to read: %m\n");
/* Zero-length read: writer end closed, nothing to handle. */
238 		pr_info("Read zero bytes\n");
/* At capacity: queue the request if the pending list has room,
 * otherwise deny it outright. */
243 	if (job_count >= max_jobs) {
244 		if (jobs_pending < max_jobs_pending)
/* Below the limit: grant immediately. */
249 		ret = grant_new_job();
252 	} else if (pid < 0) {
/* Completion message: sanity-check the counter before decrementing. */
253 		if (job_count > max_jobs)
254 			pr_err("BUG: Job %u jobs exceeded limit %u\n",
255 			       job_count, max_jobs);
257 		pr_info("Job %d finished\n", -pid);
/* A slot opened up; hand it to a pending requester if any. */
261 			ret = grant_new_job();
/* Event handler registered on the read end of job_request_fd; its .fd is
 * filled in by init_jobcontrol(). */
269 struct event_handler job_request_handler = {
270 	.handle_event = handle_job_request,
272 	.name = "job_request",
276 * Initialize the jobcontrol.
278 * Create the pipes that are used to grant children execution
279 * permissions. If max_jobs is zero, count the number of CPUs from
280 * /proc/cpuinfo and use that.
282 int init_jobcontrol(int max_jobs_requested)
/* job_request_fd is non-blocking so the epoll-driven parent never
 * stalls on it; both pipes are CLOEXEC so exec'd children do not
 * inherit the control channel. */
292 	if (pipe2(job_request_fd, O_NONBLOCK | O_CLOEXEC)) {
293 		pr_err("Failed to create pipe: %m\n");
297 	if (pipe2(job_get_permission_fd, O_CLOEXEC)) {
298 		pr_err("Failed to create pipe: %m\n");
302 	epoll_fd = epoll_create(1);
303 	if (epoll_fd == -1) {
304 		pr_err("Failed to epoll_create(): %m\n");
308 	job_request_handler.fd = job_request_fd[0];
309 	register_event_handler(&job_request_handler);
311 	sigemptyset(&sigmask);
312 	sigaddset(&sigmask, SIGCHLD);
/* Caller-supplied limit wins; otherwise fall through to CPU counting. */
314 	if (max_jobs_requested > 0) {
315 		max_jobs = max_jobs_requested;
/* NOTE(review): "ro" is not a standard fopen() mode string — the mode
 * must begin with "r", "w" or "a" and "ro" only works by accident on
 * glibc (extra chars ignored). Should be "r". */
320 	file = fopen("/proc/cpuinfo", "ro");
322 		pr_err("Failed to open /proc/cpuinfo: %m\n");
327 	 * The CPU count algorithm simply reads the first 8 bytes from
328 	 * the /proc/cpuinfo and then expects that line to be there as
329 	 * many times as there are CPUs.
/* NOTE(review): ret is a signed int compared against the unsigned
 * sizeof(match); a negative ret would be promoted and pass the check. */
331 	ret = fread(match, 1, sizeof(match), file);
332 	if (ret < sizeof(match)) {
333 		pr_err("read %d bytes when expecting %zd %m\n",
338 	while(fgets(buf, sizeof(buf), file)) {
339 		if (!strncmp(buf, match, sizeof(match)))
348 	pr_info("Set maximum number of parallel jobs to %d\n", max_jobs);
/* Heuristic headroom for the pending-request queue. */
350 	max_jobs_pending = max_jobs * 10 + 25;
351 	pr_info("Set maximum number of pending jobs to %d\n", max_jobs_pending);
353 	/* Create worker threads */
/* NOTE(review): calloc takes (nmemb, size) — arguments are swapped
 * (same byte count, but calloc(max_jobs, sizeof(*thread)) is the
 * correct order); the result is also not checked for NULL. */
354 	thread = calloc(sizeof(*thread), max_jobs);
355 	for (i = 0; i < max_jobs; i++)
356 		pthread_create(&thread[i], NULL, worker_thread, (void *)i);
/*
 * Wait up to @timeout seconds (0 = non-blocking, negative = forever,
 * per epoll_wait() semantics after the ms conversion) for one event on
 * epoll_fd and dispatch it to the registered handler.
 */
361 int poll_job_requests(int timeout)
363 	struct epoll_event event;
364 	struct event_handler *job_handler;
367 	/* Convert positive seconds to milliseconds */
368 	timeout = timeout > 0 ? 1000 * timeout : timeout;
370 	ret = epoll_wait(epoll_fd, &event, 1, timeout);
/* EINTR is expected (e.g. SIGCHLD); anything else is a real error. */
373 		if (errno != EINTR) {
374 			pr_err("epoll_wait: %m\n");
379 		 * If epoll_wait() was interrupted, better start
380 		 * everything again from the beginning
/* ret == 0: epoll_wait() timed out with no events. */
386 		pr_info("Timed out\n");
/* event.data.ptr was set to the handler in register_event_handler(). */
390 	job_handler = event.data.ptr;
392 	if (!job_handler || !job_handler->handle_event) {
393 		pr_err("Corrupted event handler for fd %d\n",
398 	pr_debug("Running handler %s to handle events from fd %d\n",
399 		 job_handler->name, job_handler->fd);
400 	job_handler->handle_event(job_handler);
403 	pr_info("Jobs active: %u, pending: %u\n", job_count, jobs_pending);
408 * Per process flag indicating whether this child has requested fork
409 * limiting. If it has, it must also tell the master parent when it
410 * has died so that the parent can give next pending job permission to
/* Set in do_fork_limited() children; checked by the exit handler. */
413 static int is_limited_fork;
/* (Interior of do_fork(); the signature and fork() call are elided
 * above this excerpt.) */
420 		pr_err("fork() failed: %m\n");
426 	pr_debug("Fork %d, child %d\n", child_count, child);
431 	 * Also do not notify the master parent the death of this
432 	 * child. Only childs that have been created with
433 	 * do_fork_limited() can have this flag set.
438 	 * Close unused ends of the job control pipes. Only the parent
439 	 * which controls the jobs may have the write end open of the
440 	 * job_get_permission_fd and the read end of the
441 	 * job_request_fd. Failing to close the pipe ends properly
442 	 * will cause the childs to wait forever for the run
443 	 * permission in case parent dies prematurely.
445 	 * Note! The file descriptor must be closed once and only
446 	 * once. They are marked to -1 to make it impossible for
447 	 * subseqent do_fork() calls from closing them again (in which
448 	 * case some other file descriptor might already be reserved
449 	 * for the same number) and prevent accidentally closing some
450 	 * innocent file descriptors that are still in use.
452 	if (job_get_permission_fd[1] >= 0) {
453 		close(job_get_permission_fd[1]);
454 		job_get_permission_fd[1] = -1;
456 	if (job_request_fd[0] >= 0) {
457 		close(job_request_fd[0]);
458 		job_request_fd[0] = -1;
461 	/* reset child's child count */
/*
 * Send our pid over the job-request pipe: positive pid asks for a run
 * slot, negative pid reports that this job has finished (see
 * handle_job_request() for the decoding).
 */
467 static int request_fork(int request)
471 	pid = request > 0 ? pid : -pid;
473 	return write(job_request_fd[1], &pid, sizeof(pid));
/* atexit() hook for do_fork_limited() children: notifies the master
 * parent of normal termination so it can grant a pending job (body
 * elided). */
476 static void limited_fork_exit_handler(void)
483 * Like do_fork(), but allow the child continue only after the global
484 * job count is low enough.
486 * We allow the parent to continue other more important activities but
487 * child respects the limit of global active processes.
489 int do_fork_limited(void)
498 	/* Remember to notify the parent when we are done */
499 	atexit(limited_fork_exit_handler);
502 	pr_debug("Requesting permission to go\n");
504 	/* Signal the parent that we are here, waiting to go */
508 	 * The parent will tell us when we can continue. If there were
509 	 * multiple children waiting for their turn to run only one
510 	 * will be reading the content byte from the pipe and getting
511 	 * the permission to run.
/* Blocking read: one byte == our turn. A 0-byte read means the parent
 * closed the pipe (likely died); negative means read error. */
513 	ret = read(job_get_permission_fd[0], &byte, sizeof(byte));
515 		pr_err("Error requesting run, did the parent die?\n");
518 		pr_err("Job control request failure: %m\n");
/* The byte carried a denial (queue full): terminate this child. */
521 		pr_info("Did not get permission to execute. Terminating\n");
524 		 * Avoid running exit handler, that would tell the
525 		 * parent we died normally and decrement the job
531 	pr_debug("Continuing\n");
/*
 * Reap child @pid (or any child, depending on the wait4() pid
 * semantics) and log its exit status and resource usage.
 */
535 int harvest_zombies(int pid)
538 	struct rusage rusage;
539 	char *status_str = NULL;
/* Nothing to reap if we have no children. */
542 	if (child_count == 0)
546 	pr_debug("Waiting on pid %d, children left: %d\n", pid,
550 		pid = wait4(pid, &status, 0, &rusage);
/* NOTE(review): the message says waitid() but the call above is
 * wait4() — misleading log text. */
552 			pr_err("Error on waitid(): %m\n");
555 		/* Wait until the child has become a zombie */
556 	} while (!WIFEXITED(status) && !WIFSIGNALED(status));
559 	if (WIFEXITED(status)) {
560 		status_str = "exited with status";
561 		code = WEXITSTATUS(status);
562 	} else if (WIFSIGNALED(status)) {
563 		status_str = "killed by signal";
564 		code = WTERMSIG(status);
566 	pr_debug("pid %d: %s %d. Children left: %d\n", pid,
567 		 status_str, code, child_count);
/* tv_usec / 1000 converts microseconds to the milliseconds printed by
 * the %03ld field. */
568 	pr_debug("pid %d: User time: %ld.%03lds, System %ld.%03lds\n", pid,
569 		 (long)rusage.ru_utime.tv_sec, rusage.ru_utime.tv_usec / 1000,
570 		 (long)rusage.ru_stime.tv_sec, rusage.ru_stime.tv_usec / 1000);
576 * Runs a command cmd with params argv, connects stdin and stdout to
579 * Returns the pid of the executed process
581 int run_piped(const char *cmd, char *const argv[],
582 	      int *stdinfd, int *stdoutfd, int *stderrfd)
584 	int ifd[2], ofd[2], efd[2], pid;
586 	pr_info("Running command %s\n", cmd);
/* Create a pipe only for each stream the caller asked for (non-NULL
 * out-parameter). */
588 	if (stdinfd && pipe(ifd)) {
589 		pr_err("pipe() failed: %m\n");
593 	if (stdoutfd && pipe(ofd)) {
594 		pr_err("pipe() failed: %m\n");
598 	if (stderrfd && pipe(efd)) {
599 		pr_err("pipe() failed: %m\n");
604 	if (pid) { /* Parent side */
/* Child side: wire the pipe ends onto the standard descriptors before
 * exec'ing the command. */
625 		dup2(ifd[0], STDIN_FILENO);
630 		dup2(ofd[1], STDOUT_FILENO);
635 		dup2(efd[1], STDERR_FILENO);
638 	/* Now we have redirected standard streams to parent process */
/* Reached only if execv() itself failed. */
640 	pr_err("Failed to execv command %s: %m\n", cmd);
647 * Runs a command cmd with params argv, connects stdin and stdout to
650 * Returns the pid of the executed process
652 int run_piped_stream(const char *cmd, char *const argv[],
653 		     FILE **stdinf, FILE **stdoutf, FILE **stderrf)
655 	int ifd, ofd, efd, pid;
671 	pid = run_piped(cmd, argv, i, o, e);
/* NOTE(review): this is the parent's write end of the child's stdin,
 * so mode "w" looks intended rather than "r" — verify against callers. */
674 		*stdinf = fdopen(ifd, "r");
675 		if (*stdinf == NULL) {
676 			pr_err("Error opening file stream for fd %d: %m\n",
/* stdout/stderr of the child are read by the parent: "r" is correct. */
683 		*stdoutf = fdopen(ofd, "r");
684 		if (*stdoutf == NULL) {
685 			pr_err("Error opening file stream for fd %d: %m\n",
692 		*stderrf = fdopen(efd, "r");
693 		if (*stderrf == NULL) {
694 			pr_err("Error opening file stream for fd %d: %m\n",
704 * Forks a child and executes a command to run on parallel
/*
 * Return the larger of two values.
 *
 * The expansion is fully parenthesized so the macro is safe inside a
 * larger expression; the previous unparenthesized form misparsed under
 * higher-precedence operators (e.g. `2 * max(a, b)` became
 * `(2 * a) < b ? b : a`).
 * NOTE: both arguments are evaluated twice — do not pass expressions
 * with side effects.
 */
#define max(a, b) ((a) < (b) ? (b) : (a))
/* Scratch read-buffer size used by run() when draining child output. */
#define BUF_SIZE (128*1024)
/*
 * Fork and exec @cmd, multiplex its stdout/stderr into our logs with
 * select(), then reap the child.
 */
709 int run(const char *cmd, char *const argv[])
717 	child = run_piped(cmd, argv, NULL, &ofd, &efd);
/* NOTE(review): POSIX requires nfds to be the highest fd PLUS ONE;
 * this should be select(maxfd + 1, ...) — as written the highest
 * descriptor is never polled. */
729 		maxfd = max(ofd, efd);
730 		error = select(maxfd, &rfds, NULL, NULL, NULL);
733 			pr_err("Error with select: %m\n");
737 		if (FD_ISSET(ofd, &rfds)) {
738 			bytes = read(ofd, rbuf, BUF_SIZE);
742 		if (FD_ISSET(efd, &rfds)) {
744 			bytes = read(efd, rbuf, BUF_SIZE);
748 			pr_err("select() returned unknown fd\n");
753 			pr_err("read() failed: %m\n");
758 		 * Workaround: When a process had die and it has only
759 		 * written to stderr, select() doesn't indicate that
760 		 * there might be something to read in stderr fd. To
761 		 * work around this issue, we try to read stderr just
762 		 * in case in order to ensure everything gets read.
765 	bytes = read(efd, rbuf, BUF_SIZE);
775 			pr_err("%s: stderr: %s\n",
778 			pr_info("%s: stdout: %s\n",
/* Reap the child so it does not linger as a zombie. */
789 	harvest_zombies(child);
/*
 * Add @handler's fd to the module epoll set; the handler pointer rides
 * in the epoll event payload and is recovered in poll_job_requests().
 */
794 int register_event_handler(struct event_handler *handler)
796 	struct epoll_event ev;
/* NOTE(review): fd 0 is a valid descriptor; this check should probably
 * be `handler->fd < 0`. */
799 	if (handler->fd <= 0) {
800 		pr_err("Invalid file descriptor of %d\n", handler->fd);
804 	if (!handler->handle_event) {
805 		pr_err("Handler callback missing\n");
809 	pr_info("Registering handler for %s, fd %d\n",
810 		handler->name, handler->fd);
/* NOTE(review): epoll_data is a union — the .fd assignment below is
 * immediately overwritten by .ptr and is dead code. */
812 	ev.data.fd = handler->fd;
813 	ev.data.ptr = handler;
814 	ev.events = handler->events;
815 	ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, handler->fd, &ev);
817 		pr_err("Failed to add epoll_fd: %m\n");
/* Record @file:@line as the current holder of @lock, for the contention
 * diagnostics printed by _mutex_lock() (body elided). */
824 void _mutex_lock_acquired(struct mutex *lock, char *file, int line)
/*
 * Lock the wrapped mutex, logging contention: a trylock probe first,
 * and if that fails, report who currently holds the lock before
 * blocking on the real pthread_mutex_lock().
 */
830 int _mutex_lock(struct mutex *lock, char *file, int line)
/* Fast path: uncontended trylock succeeded (returns 0). */
834 	if (!pthread_mutex_trylock(&lock->lock))
/* Contended: lock->file/lock->line identify the current holder. */
837 	pr_info("Lock contention on lock %s on %s:%d\n",
838 		lock->name, lock->file, lock->line);
840 	ret = pthread_mutex_lock(&lock->lock);
/* NOTE(review): "Acquirin" -> "Acquiring" typo in the error string. */
842 		pr_err("Acquirin lock %s failed: %m, acquired %s:%d\n",
843 		       lock->name, lock->file, lock->line);
/* Record the new holder's location. */
846 	_mutex_lock_acquired(lock, file, line);
/* Release the wrapped mutex (holder bookkeeping presumably cleared in
 * the elided lines). */
850 int _mutex_unlock(struct mutex *lock)
854 	pthread_mutex_unlock(&lock->lock);