3 #include <sys/select.h>
7 #include <sys/signalfd.h>
8 #include <sys/resource.h>
14 static unsigned int max_jobs;
15 static unsigned int max_jobs_pending;
19 int (*work_fn)(void *);
21 struct work_struct *next;
25 struct work_struct *work;
31 struct work_queue work_queues[WORK_PRIORITIES_NUM] = {
33 .name = "high priority",
35 .name = "high_prio_queue",
36 .lock = PTHREAD_MUTEX_INITIALIZER,
40 .name = "low priority",
42 .name = "low_prio_queue",
43 .lock = PTHREAD_MUTEX_INITIALIZER,
48 struct mutex work_stats_mutex = {
50 .lock = PTHREAD_MUTEX_INITIALIZER,
52 static int workers_active;
53 static int worker_count;
55 static int run_work_on_queue(struct work_queue *queue)
57 struct work_struct *work;
59 mutex_lock(&queue->lock);
62 mutex_unlock(&queue->lock);
68 queue->work = work->next;
71 mutex_unlock(&queue->lock);
73 pr_info("Executing work %s from queue %s, %d still pending\n",
74 work->name, queue->name, queue->length);
76 work->work_fn(work->arg);
77 pr_info("Work %s done\n", work->name);
83 static void *worker_thread(void *arg)
89 mutex_lock(&work_stats_mutex);
90 snprintf(name, sizeof(name), "worker%d", worker_count);
92 mutex_unlock(&work_stats_mutex);
94 pthread_setname_np(pthread_self(), name);
95 pthread_detach(pthread_self());
97 pr_info("Worker started\n");
99 while (!stop_working) {
104 * Execute as much work from the high priority
105 * queue as possible. Once there are no more
106 * high prio work left, break out the loop and
107 * see if we still need this many workers.
109 for (prio = 0; prio < WORK_PRIORITIES_NUM; prio++) {
111 run_work_on_queue(&work_queues[prio]);
116 if (!work_done || prio != WORK_PRIORITY_HIGH)
120 mutex_lock(&work_stats_mutex);
121 if (workers_active > max_jobs || !work_done) {
123 pr_info("Worker exiting, %d left active\n",
129 mutex_unlock(&work_stats_mutex);
135 int queue_work(unsigned int priority, char *name,
136 int (work_fn)(void *arg), void *arg)
139 struct work_queue *queue;
140 struct work_struct *work, *last_work;
142 if (priority >= WORK_PRIORITIES_NUM) {
143 pr_err("Invalid priority: %d\n", priority);
147 work = calloc(sizeof(*work), 1);
150 work->work_fn = work_fn;
153 queue = &work_queues[priority];
155 /* Insert new work at the end of the work queue */
156 mutex_lock(&queue->lock);
158 last_work = queue->work;
159 while (last_work && last_work->next)
160 last_work = last_work->next;
165 last_work->next = work;
167 pr_info("Inserted work %s in queue %s, with %d pending items\n",
168 work->name, queue->name, queue->length);
170 mutex_unlock(&queue->lock);
172 mutex_lock(&work_stats_mutex);
173 pr_info("workers_active: %d, priority: %d\n", workers_active, priority);
174 if (priority != WORK_PRIORITY_HIGH && workers_active >= max_jobs) {
175 mutex_unlock(&work_stats_mutex);
179 mutex_unlock(&work_stats_mutex);
181 pr_info("Creating new worker thread\n");
182 /* We need a worker thread, create one */
183 thread = calloc(sizeof(*thread), 1);
184 pthread_create(thread, NULL, worker_thread, NULL);
192 * Initialize the jobcontrol.
194 * Create the pipes that are used to grant children execution
195 * permissions. If max_jobs is zero, count the number of CPUs from
196 * /proc/cpuinfo and use that.
198 int init_jobcontrol(int max_jobs_requested)
205 epoll_fd = epoll_create(1);
206 if (epoll_fd == -1) {
207 pr_err("Failed to epoll_create(): %m\n");
211 if (max_jobs_requested > 0) {
212 max_jobs = max_jobs_requested;
217 file = fopen("/proc/cpuinfo", "ro");
219 pr_err("Failed to open /proc/cpuinfo: %m\n");
224 * The CPU count algorithm simply reads the first 8 bytes from
225 * the /proc/cpuinfo and then expects that line to be there as
226 * many times as there are CPUs.
228 ret = fread(match, 1, sizeof(match), file);
229 if (ret < sizeof(match)) {
230 pr_err("read %d bytes when expecting %zd %m\n",
235 while(fgets(buf, sizeof(buf), file)) {
236 if (!strncmp(buf, match, sizeof(match)))
245 pr_info("Set maximum number of parallel jobs to %d\n", max_jobs);
247 max_jobs_pending = max_jobs * 10 + 25;
248 pr_info("Set maximum number of pending jobs to %d\n", max_jobs_pending);
253 int poll_job_requests(int timeout)
255 struct epoll_event event;
256 struct event_handler *job_handler;
259 /* Convert positive seconds to milliseconds */
260 timeout = timeout > 0 ? 1000 * timeout : timeout;
262 ret = epoll_wait(epoll_fd, &event, 1, timeout);
265 if (errno != EINTR) {
266 pr_err("epoll_wait: %m\n");
271 * If epoll_wait() was interrupted, better start
272 * everything again from the beginning
278 pr_info("Timed out\n");
282 job_handler = event.data.ptr;
284 if (!job_handler || !job_handler->handle_event) {
285 pr_err("Corrupted event handler for fd %d\n",
290 pr_debug("Running handler %s to handle events from fd %d\n",
291 job_handler->name, job_handler->fd);
292 job_handler->handle_event(job_handler);
295 pr_info("Workers active: %u\n", workers_active);
304 pr_err("fork() failed: %m\n");
309 pr_debug("Fork child %d\n", child);
316 int clear_zombie(int pid)
319 struct rusage rusage;
320 char *status_str = NULL;
324 pr_debug("Waiting on pid %d\n", pid);
327 pid = wait4(pid, &status, 0, &rusage);
329 pr_err("Error on waitid(): %m\n");
332 /* Wait until the child has become a zombie */
333 } while (!WIFEXITED(status) && !WIFSIGNALED(status));
335 if (WIFEXITED(status)) {
336 status_str = "exited with status";
337 code = WEXITSTATUS(status);
338 } else if (WIFSIGNALED(status)) {
339 status_str = "killed by signal";
340 code = WTERMSIG(status);
342 pr_debug("pid %d: %s %d.\n", pid,
344 pr_debug("pid %d: User time: %ld.%03lds, System %ld.%03lds\n", pid,
345 (long)rusage.ru_utime.tv_sec, rusage.ru_utime.tv_usec / 1000,
346 (long)rusage.ru_stime.tv_sec, rusage.ru_stime.tv_usec / 1000);
352 * Runs a command cmd with params argv, connects stdin and stdout to
355 * Returns the pid of the executed process
357 int run_piped(const char *cmd, char *const argv[],
358 int *stdinfd, int *stdoutfd, int *stderrfd)
360 int ifd[2], ofd[2], efd[2], pid;
362 pr_info("Running command %s\n", cmd);
364 if (stdinfd && pipe(ifd)) {
365 pr_err("pipe() failed: %m\n");
369 if (stdoutfd && pipe(ofd)) {
370 pr_err("pipe() failed: %m\n");
374 if (stderrfd && pipe(efd)) {
375 pr_err("pipe() failed: %m\n");
380 if (pid) { /* Parent side */
401 dup2(ifd[0], STDIN_FILENO);
406 dup2(ofd[1], STDOUT_FILENO);
411 dup2(efd[1], STDERR_FILENO);
414 /* Now we have redirected standard streams to parent process */
416 pr_err("Failed to execv command %s: %m\n", cmd);
423 * Runs a command cmd with params argv, connects stdin and stdout to
426 * Returns the pid of the executed process
428 int run_piped_stream(const char *cmd, char *const argv[],
429 FILE **stdinf, FILE **stdoutf, FILE **stderrf)
431 int ifd, ofd, efd, pid;
447 pid = run_piped(cmd, argv, i, o, e);
450 *stdinf = fdopen(ifd, "r");
451 if (*stdinf == NULL) {
452 pr_err("Error opening file stream for fd %d: %m\n",
459 *stdoutf = fdopen(ofd, "r");
460 if (*stdoutf == NULL) {
461 pr_err("Error opening file stream for fd %d: %m\n",
468 *stderrf = fdopen(efd, "r");
469 if (*stderrf == NULL) {
470 pr_err("Error opening file stream for fd %d: %m\n",
480 * Forks a child and executes a command to run on parallel
483 #define max(a,b) (a) < (b) ? (b) : (a)
484 #define BUF_SIZE (128*1024)
485 int run(const char *cmd, char *const argv[])
493 child = run_piped(cmd, argv, NULL, &ofd, &efd);
505 maxfd = max(ofd, efd);
506 error = select(maxfd, &rfds, NULL, NULL, NULL);
509 pr_err("Error with select: %m\n");
513 if (FD_ISSET(ofd, &rfds)) {
514 bytes = read(ofd, rbuf, BUF_SIZE);
518 if (FD_ISSET(efd, &rfds)) {
520 bytes = read(efd, rbuf, BUF_SIZE);
524 pr_err("select() returned unknown fd\n");
529 pr_err("read() failed: %m\n");
534 * Workaround: When a process had die and it has only
535 * written to stderr, select() doesn't indicate that
536 * there might be something to read in stderr fd. To
537 * work around this issue, we try to read stderr just
538 * in case in order to ensure everything gets read.
541 bytes = read(efd, rbuf, BUF_SIZE);
551 pr_err("%s: stderr: %s\n",
554 pr_info("%s: stdout: %s\n",
570 int register_event_handler(struct event_handler *handler)
572 struct epoll_event ev;
575 if (handler->fd <= 0) {
576 pr_err("Invalid file descriptor of %d\n", handler->fd);
580 if (!handler->handle_event) {
581 pr_err("Handler callback missing\n");
585 pr_info("Registering handler for %s, fd %d\n",
586 handler->name, handler->fd);
588 ev.data.fd = handler->fd;
589 ev.data.ptr = handler;
590 ev.events = handler->events;
591 ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, handler->fd, &ev);
593 pr_err("Failed to add epoll_fd: %m\n");
600 void _mutex_lock_acquired(struct mutex *lock, char *file, int line)
606 int _mutex_lock(struct mutex *lock, char *file, int line)
610 if (!pthread_mutex_trylock(&lock->lock))
613 pr_info("Lock contention on lock %s on %s:%d\n",
614 lock->name, lock->file, lock->line);
616 ret = pthread_mutex_lock(&lock->lock);
618 pr_err("Acquirin lock %s failed: %m, acquired %s:%d\n",
619 lock->name, lock->file, lock->line);
622 _mutex_lock_acquired(lock, file, line);
626 int _mutex_unlock(struct mutex *lock)
630 pthread_mutex_unlock(&lock->lock);