3 #include <sys/select.h>
7 #include <sys/signalfd.h>
8 #include <sys/resource.h>
/*
 * Module-wide job-control state.  (Interior lines of this listing are
 * elided, so some struct headers/fields are not visible here.)
 *
 * max_jobs / max_jobs_pending: limits on parallel workers and queued
 * work, set in init_jobcontrol().
 */
15 static unsigned int max_jobs;
16 static unsigned int max_jobs_pending;
/* Fields of struct work_struct: callback, its argument (elided), and
 * the singly linked list pointer to the next queued item. */
20 int (*work_fn)(void *);
22 struct work_struct *next;
/* Head of a work_queue's pending-work list. */
26 struct work_struct *work;
/*
 * The two priority queues.  Each has a human-readable .name used in
 * log messages, a named lock, and (in elided lines) a length counter.
 */
32 struct work_queue work_queues[WORK_PRIORITIES_NUM] = {
34 .name = "high priority",
36 .name = "high_prio_queue",
37 .lock = PTHREAD_MUTEX_INITIALIZER,
41 .name = "low priority",
43 .name = "low_prio_queue",
44 .lock = PTHREAD_MUTEX_INITIALIZER,
/* Protects workers_active and worker_count below. */
49 struct mutex work_stats_mutex = {
51 .lock = PTHREAD_MUTEX_INITIALIZER,
53 static int workers_active;
54 static int worker_count;
/* Self-pipe used to wake the job poller; [0] = read end registered
 * with epoll, [1] = write end used by notify_job_request(). */
56 static int job_notify_fd[2];
/*
 * Pop one work item off @queue and execute its callback.
 *
 * queue->lock is held only while inspecting/unlinking the list head;
 * the callback itself runs unlocked so other threads can queue or
 * consume work concurrently.  The return value (set in elided lines)
 * tells callers whether to keep looping — see worker_thread().
 */
58 static int run_work_on_queue(struct work_queue *queue)
60 struct work_struct *work;
62 mutex_lock(&queue->lock);
/* Empty-queue early exit path (condition elided). */
65 mutex_unlock(&queue->lock);
/* Unlink the head item; queue->length bookkeeping happens in the
 * elided lines around here. */
71 queue->work = work->next;
74 mutex_unlock(&queue->lock);
76 pr_info("Executing work %s from queue %s, %d still pending\n",
77 work->name, queue->name, queue->length);
/* Run the callback outside the queue lock. */
79 work->work_fn(work->arg);
80 pr_info("Work %s done\n", work->name);
/*
 * Worker thread main loop.
 *
 * Drains the high-priority queue first; then, unless enough workers
 * are already active, drops its scheduling priority and drains the
 * low-priority queue.  worker_count / workers_active bookkeeping is
 * serialized by work_stats_mutex.  Threads are detached — nobody
 * joins them.
 */
86 static void *worker_thread(void *arg)
/* Name the thread "workerN" for debugging; worker_count is shared
 * state, hence the lock around reading (and, presumably in elided
 * lines, incrementing) it. */
91 mutex_lock(&work_stats_mutex);
92 snprintf(name, sizeof(name), "worker%d", worker_count);
94 mutex_unlock(&work_stats_mutex);
96 pthread_setname_np(pthread_self(), name);
97 pthread_detach(pthread_self());
99 pr_info("Worker started\n");
101 /* Execute all high priority work from the queue */
102 while (run_work_on_queue(&work_queues[WORK_PRIORITY_HIGH]))
105 * All high priority work is now done, see if we have enough
106 * workers executing low priority work. Continue from there if
/* NOTE(review): workers_active is int while max_jobs is unsigned int,
 * so this comparison promotes to unsigned.  Harmless while
 * workers_active is non-negative, but worth confirming. */
109 mutex_lock(&work_stats_mutex);
110 if (workers_active > max_jobs)
113 mutex_unlock(&work_stats_mutex);
116 * Start executing the low priority work. Drop the nice value
117 * as this really is low priority stuff
120 pr_info("Worker priority dropped to %d\n", ret);
122 while (run_work_on_queue(&work_queues[WORK_PRIORITY_LOW]))
/* Exit path: drop our slot from the shared counters. */
126 mutex_lock(&work_stats_mutex);
129 pr_info("Worker exiting, %d left active\n",
133 * Last exiting worker zeroes the worker_count. This
134 * ensures next time we start spawning worker threads
135 * the first thread will have number zero on its name.
141 * Kick the job poller. If all jobs were active at this point
142 * the scheduler thread will wait indefinitely until someone
143 * tells it to do something. We may now know when next job is
144 * available, so it is better for the scheduler to recalculate
147 notify_job_request();
149 mutex_unlock(&work_stats_mutex);
/*
 * Queue a named work item at the given priority and, when capacity
 * allows, spawn a new worker thread to service it.
 *
 * @priority: WORK_PRIORITY_HIGH or WORK_PRIORITY_LOW (validated below)
 * @name:     human-readable label used in log messages
 * @work_fn:  callback to execute, @arg passed through to it
 */
154 int queue_work(unsigned int priority, char *name,
155 int (work_fn)(void *arg), void *arg)
158 struct work_queue *queue;
159 struct work_struct *work, *last_work;
/* Reject out-of-range priorities up front. */
161 if (priority >= WORK_PRIORITIES_NUM) {
162 pr_err("Invalid priority: %d\n", priority);
/* NOTE(review): calloc's conventional argument order is
 * (nmemb, size), i.e. calloc(1, sizeof(*work)); the swapped order
 * still allocates the same zeroed block.  Allocation-failure handling
 * is presumably in the elided lines — confirm. */
166 work = calloc(sizeof(*work), 1);
169 work->work_fn = work_fn;
172 queue = &work_queues[priority];
174 /* Insert new work at the end of the work queue */
175 mutex_lock(&queue->lock);
/* O(n) walk to the tail under the lock; fine for short queues, a
 * cached tail pointer would make this O(1). */
177 last_work = queue->work;
178 while (last_work && last_work->next)
179 last_work = last_work->next;
184 last_work->next = work;
186 pr_info("Inserted work %s in queue %s, with %d pending items\n",
187 work->name, queue->name, queue->length);
189 mutex_unlock(&queue->lock);
191 mutex_lock(&work_stats_mutex);
192 pr_info("workers_active: %d, priority: %d\n", workers_active, priority);
/* High-priority work always gets a worker; low-priority work only
 * spawns one while fewer than max_jobs workers are active. */
193 if (priority != WORK_PRIORITY_HIGH && workers_active >= max_jobs) {
194 mutex_unlock(&work_stats_mutex);
198 mutex_unlock(&work_stats_mutex);
200 pr_info("Creating new worker thread\n");
201 /* We need a worker thread, create one */
/* NOTE(review): the heap-allocated pthread_t is never visibly freed
 * and pthread_create()'s return value is not checked here — verify
 * against the elided lines. */
202 thread = calloc(sizeof(*thread), 1);
203 pthread_create(thread, NULL, worker_thread, NULL);
/*
 * Event-handler callback for the job_notify self-pipe: drain the
 * pending notification byte(s) so epoll stops reporting readiness.
 */
210 static int job_notify_handler(struct event_handler *h)
215 ret = read(job_notify_fd[0], buf, sizeof(buf));
217 pr_err("Failed to read: %m\n");
223 * Initialize the jobcontrol.
225 * Create the pipes that are used to grant children execution
226 * permissions. If max_jobs is zero, count the number of CPUs from
227 * /proc/cpuinfo and use that.
229 int init_jobcontrol(int max_jobs_requested)
231 static struct event_handler ev;
/* Create the epoll instance used by poll_job_requests(). */
237 epoll_fd = epoll_create(1);
238 if (epoll_fd == -1) {
239 pr_err("Failed to epoll_create(): %m\n");
/* Caller-supplied limit wins; otherwise autodetect from CPU count. */
243 if (max_jobs_requested > 0) {
244 max_jobs = max_jobs_requested;
/* NOTE(review): "ro" is not a valid fopen() mode string — the mode
 * should be "r".  glibc happens to ignore trailing characters, but
 * this is not portable. */
249 file = fopen("/proc/cpuinfo", "ro");
251 pr_err("Failed to open /proc/cpuinfo: %m\n");
256 * The CPU count algorithm simply reads the first 8 bytes from
257 * the /proc/cpuinfo and then expects that line to be there as
258 * many times as there are CPUs.
/* NOTE(review): if ret is a signed int, `ret < sizeof(match)` compares
 * signed against unsigned (size_t) — a negative ret would convert to a
 * huge unsigned value.  fread() cannot return negative, so this works,
 * but an explicit cast would be clearer. */
260 ret = fread(match, 1, sizeof(match), file);
261 if (ret < sizeof(match)) {
262 pr_err("read %d bytes when expecting %zd %m\n",
/* One CPU per line matching the first line's prefix. */
267 while(fgets(buf, sizeof(buf), file)) {
268 if (!strncmp(buf, match, sizeof(match)))
/* Self-pipe for waking the scheduler from notify_job_request(). */
272 ret = pipe(job_notify_fd);
274 pr_err("pipe() failed: %m\n");
278 ev.fd = job_notify_fd[0];
280 ev.handle_event = job_notify_handler;
281 ev.name = "job_notify";
282 register_event_handler(&ev, EPOLL_CTL_ADD);
289 pr_info("Set maximum number of parallel jobs to %d\n", max_jobs);
/* Allow a generous backlog of queued-but-not-running jobs. */
291 max_jobs_pending = max_jobs * 10 + 25;
292 pr_info("Set maximum number of pending jobs to %d\n", max_jobs_pending);
/*
 * Wait up to @timeout seconds (negative = block forever, 0 = poll)
 * for one registered event and dispatch its handler callback.
 */
297 int poll_job_requests(int timeout)
299 struct epoll_event event;
300 struct event_handler *job_handler;
303 /* Convert positive seconds to milliseconds */
304 timeout = timeout > 0 ? 1000 * timeout : timeout;
306 ret = epoll_wait(epoll_fd, &event, 1, timeout);
/* EINTR is expected (signals); anything else is a real error. */
309 if (errno != EINTR) {
310 pr_err("epoll_wait: %m\n");
315 * If epoll_wait() was interrupted, better start
316 * everything again from the beginning
322 pr_info("Timed out\n");
/* event.data.ptr was set to the event_handler in
 * register_event_handler(). */
326 job_handler = event.data.ptr;
328 if (!job_handler || !job_handler->handle_event) {
329 pr_err("Corrupted event handler for fd %d\n",
334 pr_debug("Running handler %s to handle events from fd %d\n",
335 job_handler->name, job_handler->fd);
336 job_handler->handle_event(job_handler);
339 pr_info("Workers active: %u\n", workers_active);
/*
 * Wake the job poller by writing one byte into the self-pipe; the
 * read end is drained by job_notify_handler() via epoll.
 */
343 int notify_job_request(void)
348 ret = write(job_notify_fd[1], &byte, sizeof(byte));
350 pr_err("Failed to write: %m\n");
/* Fragment of a fork helper whose definition line is elided from this
 * view: reports fork() failure and logs the child pid on success. */
360 pr_err("fork() failed: %m\n");
365 pr_debug("Fork child %d\n", child);
/*
 * Reap a terminated child.  Blocks in wait4() until @pid has actually
 * exited or been killed by a signal, then logs its exit status and
 * resource usage.
 */
372 int clear_zombie(int pid)
375 struct rusage rusage;
376 char *status_str = NULL;
380 pr_debug("Waiting on pid %d\n", pid);
/* NOTE(review): the call is wait4() but the error message says
 * "waitid()" — the message is misleading. */
383 pid = wait4(pid, &status, 0, &rusage);
385 pr_err("Error on waitid(): %m\n");
388 /* Wait until the child has become a zombie */
/* Loop again on stop/continue events; only exit/signal ends the wait. */
389 } while (!WIFEXITED(status) && !WIFSIGNALED(status));
391 if (WIFEXITED(status)) {
392 status_str = "exited with status";
393 code = WEXITSTATUS(status);
394 } else if (WIFSIGNALED(status)) {
395 status_str = "killed by signal";
396 code = WTERMSIG(status);
398 pr_debug("pid %d: %s %d.\n", pid,
/* tv_sec is cast to long for portability; tv_usec/1000 gives
 * milliseconds for the %03ld fields. */
400 pr_debug("pid %d: User time: %ld.%03lds, System %ld.%03lds\n", pid,
401 (long)rusage.ru_utime.tv_sec, rusage.ru_utime.tv_usec / 1000,
402 (long)rusage.ru_stime.tv_sec, rusage.ru_stime.tv_usec / 1000);
408 * Runs a command cmd with params argv, connects stdin and stdout to
411 * Returns the pid of the executed process
/*
 * Any of stdinfd/stdoutfd/stderrfd may be NULL, in which case the
 * corresponding stream is not redirected.  On success the parent-side
 * pipe ends are returned through these pointers (in elided lines).
 */
413 int run_piped(const char *cmd, char *const argv[],
414 int *stdinfd, int *stdoutfd, int *stderrfd)
416 int ifd[2], ofd[2], efd[2], pid;
418 pr_info("Running command %s\n", cmd);
/* NOTE(review): if a later pipe() fails, the earlier pipes' fds are
 * not visibly closed on these error paths — confirm cleanup exists in
 * the elided lines. */
420 if (stdinfd && pipe(ifd)) {
421 pr_err("pipe() failed: %m\n");
425 if (stdoutfd && pipe(ofd)) {
426 pr_err("pipe() failed: %m\n");
430 if (stderrfd && pipe(efd)) {
431 pr_err("pipe() failed: %m\n");
436 if (pid) { /* Parent side */
/* Child side: wire the pipe ends onto the standard fds. */
457 dup2(ifd[0], STDIN_FILENO);
462 dup2(ofd[1], STDOUT_FILENO);
467 dup2(efd[1], STDERR_FILENO);
470 /* Now we have redirected standard streams to parent process */
/* Only reached if execv() itself fails. */
472 pr_err("Failed to execv command %s: %m\n", cmd);
492 * Runs a command cmd with params argv, connects stdin and stdout to
495 * Returns the pid of the executed process
/*
 * FILE* convenience wrapper around run_piped(): wraps the returned
 * raw fds in stdio streams via fdopen().
 */
497 int run_piped_stream(const char *cmd, char *const argv[],
498 FILE **stdinf, FILE **stdoutf, FILE **stderrf)
500 int ifd, ofd, efd, pid;
516 pid = run_piped(cmd, argv, i, o, e);
/* NOTE(review): the fdopen() modes look swapped.  The parent WRITES
 * to the child's stdin and READS from its stdout, so stdinf should be
 * opened "w" and stdoutf "r", not the other way around.  Confirm
 * against run_piped()'s elided fd assignments before relying on
 * these streams. */
519 *stdinf = fdopen(ifd, "r");
520 if (*stdinf == NULL) {
521 pr_err("Error opening file stream for fd %d: %m\n",
528 *stdoutf = fdopen(ofd, "w");
529 if (*stdoutf == NULL) {
530 pr_err("Error opening file stream for fd %d: %m\n",
537 *stderrf = fdopen(efd, "r");
538 if (*stderrf == NULL) {
539 pr_err("Error opening file stream for fd %d: %m\n",
549 * Forks a child and executes a command to run on parallel
/*
 * Captures the child's stdout/stderr via select() and forwards them
 * to the logging functions, tagged per stream.
 */
552 #define BUF_SIZE (128*1024)
553 int run(const char *cmd, char *const argv[])
561 child = run_piped(cmd, argv, NULL, &ofd, &efd);
/* NOTE(review): select()'s first argument must be the highest fd
 * PLUS ONE; passing maxfd here means events on the highest of
 * ofd/efd are never reported.  Should be select(maxfd + 1, ...). */
573 maxfd = max(ofd, efd);
574 error = select(maxfd, &rfds, NULL, NULL, NULL);
577 pr_err("Error with select: %m\n");
581 if (FD_ISSET(ofd, &rfds)) {
582 bytes = read(ofd, rbuf, BUF_SIZE);
586 if (FD_ISSET(efd, &rfds)) {
588 bytes = read(efd, rbuf, BUF_SIZE);
592 pr_err("select() returned unknown fd\n");
597 pr_err("read() failed: %m\n");
602 * Workaround: When a process had die and it has only
603 * written to stderr, select() doesn't indicate that
604 * there might be something to read in stderr fd. To
605 * work around this issue, we try to read stderr just
606 * in case in order to ensure everything gets read.
609 bytes = read(efd, rbuf, BUF_SIZE);
619 pr_err("%s: stderr: %s\n",
622 pr_info("%s: stdout: %s\n",
/*
 * Register (or modify/remove, per @op) an event handler with the
 * module's epoll instance.  @op is an EPOLL_CTL_* constant.
 */
638 int register_event_handler(struct event_handler *handler, int op)
640 struct epoll_event ev;
/* NOTE(review): fd 0 is a valid descriptor; `<= 0` rejects it.
 * Probably intentional here (stdin is never an event source), but
 * worth confirming. */
644 if (handler->fd <= 0) {
645 pr_err("Invalid file descriptor of %d\n", handler->fd);
649 if (!handler->handle_event) {
650 pr_err("Handler callback missing\n");
668 pr_err("Invalid op %d\n", op);
672 pr_info("Doing a epoll %s for handler %s, fd %d\n", str,
673 handler->name, handler->fd);
/* epoll_data is a union: the data.ptr assignment below overwrites
 * data.fd — only the handler pointer actually reaches epoll_wait()
 * consumers (see poll_job_requests()). */
675 ev.data.fd = handler->fd;
676 ev.data.ptr = handler;
677 ev.events = handler->events;
678 ret = epoll_ctl(epoll_fd, op, handler->fd, &ev);
680 pr_err("Failed to do epoll_ctl %s: %m\n", str);
/*
 * Record lock-ownership diagnostics after a lock is taken: stores the
 * acquiring thread's name (and, in elided lines, file/line) so lock
 * contention reports can name the current owner.
 */
687 void _mutex_lock_acquired(struct mutex *lock, char *file, int line)
691 pthread_getname_np(pthread_self(),
692 lock->owner_name, sizeof(lock->owner_name));
/*
 * Instrumented mutex lock: try the fast uncontended path first; on
 * contention, log who currently holds the lock (and where it was
 * taken) before blocking in pthread_mutex_lock().
 */
695 int _mutex_lock(struct mutex *lock, char *file, int line)
/* Fast path: trylock returns 0 on success. */
700 if (!pthread_mutex_trylock(&lock->lock))
704 pr_debug("Lock contention at %s:%d on lock %s acquired by %s at %s:%d\n",
705 file, line, lock->name,
706 lock->owner_name, lock->file, lock->line);
/* Slow path: block until the lock is released. */
708 ret = pthread_mutex_lock(&lock->lock);
/* NOTE(review): "Acquirin" in the message below is a typo for
 * "Acquiring" (runtime string — left untouched here). */
710 pr_err("Acquirin lock %s failed: %m, acquired on %s:%d\n",
711 lock->name, lock->file, lock->line);
715 pr_debug("Lock %s acquired at %s:%d after contention\n",
716 lock->name, file, line);
717 _mutex_lock_acquired(lock, file, line);
/*
 * Instrumented mutex unlock; ownership-diagnostic clearing, if any,
 * happens in lines elided from this view.
 */
721 int _mutex_unlock(struct mutex *lock)
725 pthread_mutex_unlock(&lock->lock);