3 #include <sys/select.h>
7 #include <sys/signalfd.h>
8 #include <sys/resource.h>
/* Maximum number of parallel jobs; assigned in init_jobcontrol(). */
14 static unsigned int max_jobs;
/* Reported as "Jobs active" in the "Jobs active: %u, pending: %u" message. */
15 static unsigned int job_count;
/* Reported as "pending" in the "Jobs active: %u, pending: %u" message. */
16 static unsigned int jobs_pending;
/* Maximum number of pending jobs; init_jobcontrol() sets it to max_jobs * 10 + 25. */
17 static unsigned int max_jobs_pending;
/* Callback of a work item; run_work_on_queue() calls it as work_fn(arg). */
21 int (*work_fn)(void *);
/* Next work item in the same queue; queue_work() appends new items at the tail. */
23 struct work_struct *next;
/* Head of a queue's list of work items (accessed as queue->work). */
27 struct work_struct *work;
/*
 * The work queues, one per priority level. worker_thread() walks them
 * starting from index 0 and queue_work() selects one by its priority
 * argument.
 *
 * NOTE(review): two .name initializers are visible per entry; presumably
 * one names the queue and the other names the queue's embedded lock --
 * confirm against the full initializer.
 */
33 struct work_queue work_queues[WORK_PRIORITIES_NUM] = {
35 .name = "high priority",
37 .name = "high_prio_queue",
38 .lock = PTHREAD_MUTEX_INITIALIZER,
42 .name = "low priority",
44 .name = "low_prio_queue",
45 .lock = PTHREAD_MUTEX_INITIALIZER,
/*
 * Mutex whose underlying pthread mutex is handed to pthread_cond_wait()
 * together with work_pending_cond in worker_thread().
 */
50 struct mutex work_pending_mutex = {
51 .name = "work_pending",
52 .lock = PTHREAD_MUTEX_INITIALIZER,
/*
 * Condition variable the workers sleep on. It is signalled by queue_work()
 * after inserting work and by run_work_on_queue() when a queue still has
 * items left.
 */
54 pthread_cond_t work_pending_cond = PTHREAD_COND_INITIALIZER;
/*
 * Take the work item at the head of the given queue and execute its
 * callback. The item is unlinked while queue->lock is held; the callback
 * itself is invoked after the lock has been released.
 *
 * Review notes (only part of this function is visible in this view):
 *  - The empty-queue path and the return values are not shown here.
 *  - queue->length is read for the log message after queue->lock has been
 *    dropped, so the printed value may already be stale.
 *  - queue->length is printed with %d; confirm the field is an int.
 */
56 static int run_work_on_queue(struct work_queue *queue)
58 struct work_struct *work;
60 mutex_lock(&queue->lock);
63 mutex_unlock(&queue->lock);
/* Unlink the head item: its successor becomes the new head of the queue. */
69 queue->work = work->next;
73 * If queue is not empty, try waking up more workers. It is
74 * possible that when work were queued, the first worker did
75 * not wake up soon enough and
77 if (queue->length > 0)
78 pthread_cond_signal(&work_pending_cond);
80 mutex_unlock(&queue->lock);
/* From here on the queue lock is no longer held. */
82 pr_info("Executing work %s from queue %s, %d still pending\n",
83 work->name, queue->name, queue->length);
85 work->work_fn(work->arg);
86 pr_info("Work %s done\n", work->name);
/*
 * Entry point of a worker thread. The thread names itself "worker<N>",
 * where N is the integer carried in arg, runs work from the queues in
 * ascending index order and sleeps on work_pending_cond.
 *
 * Review notes (only part of this function is visible in this view):
 *  - pthread_cond_wait() reports failure through its return value and does
 *    not set errno, so the "%m" in the error message below may describe an
 *    unrelated error; consider printing strerror(ret) instead.
 *  - The loop structure around the sleep and the use of work_done are not
 *    visible here.
 */
92 static void *worker_thread(void *arg)
98 snprintf(name, sizeof(name), "worker%ld", (long)arg);
99 pthread_setname_np(pthread_self(), name);
103 int prio, work_done = 0;
106 * Execute as many works from the queues as
107 * there are, starting from highest priority
110 for (prio = 0; prio < WORK_PRIORITIES_NUM; prio++) {
112 run_work_on_queue(&work_queues[prio]);
121 pr_info("Worker going to sleep\n");
122 ret = pthread_cond_wait(&work_pending_cond,
123 &work_pending_mutex.lock);
125 pr_err("Error: %m\n");
/*
 * pthread_cond_wait() returns with the raw pthread mutex locked, bypassing
 * the mutex_lock() wrapper; presumably this call brings the wrapper's
 * bookkeeping up to date -- confirm against _mutex_lock_acquired().
 */
127 mutex_lock_acquired(&work_pending_mutex);
129 mutex_unlock(&work_pending_mutex);
/*
 * Allocate a work item, append it to the tail of the queue selected by
 * priority and signal work_pending_cond so that a worker can pick it up.
 *
 * priority: index into work_queues; values >= WORK_PRIORITIES_NUM are
 *           rejected with an error message.
 * name:     label for the work item (it appears in the log messages).
 * work_fn:  callback stored in the work item.
 * arg:      NOTE(review): presumably stored as work->arg and handed to
 *           work_fn later -- the assignment is not visible in this view.
 *
 * Review notes (only part of this function is visible in this view):
 *  - calloc() is declared as calloc(nmemb, size); the arguments below are
 *    in the opposite order. The product is the same, so this is cosmetic.
 *  - The tail is found by walking the whole list, so insertion is linear
 *    in the queue length.
 *  - last_work is NULL when the queue is empty; the assignment to
 *    last_work->next must be guarded by lines not visible here -- confirm.
 *  - The return values are not visible here.
 */
136 int queue_work(unsigned int priority, char *name,
137 int (work_fn)(void *arg), void *arg)
139 struct work_queue *queue;
140 struct work_struct *work, *last_work;
142 if (priority >= WORK_PRIORITIES_NUM) {
143 pr_err("Invalid priority: %d\n", priority);
147 work = calloc(sizeof(*work), 1);
150 work->work_fn = work_fn;
153 queue = &work_queues[priority];
155 /* Insert new work at the end of the work queue */
156 mutex_lock(&queue->lock);
158 last_work = queue->work;
159 while (last_work && last_work->next)
160 last_work = last_work->next;
165 last_work->next = work;
167 pr_info("Inserted work %s in queue %s, with %d pending items\n",
168 work->name, queue->name, queue->length);
170 mutex_unlock(&queue->lock);
/* Signalled after the queue lock is released. */
172 pthread_cond_signal(&work_pending_cond);
178 * Initialize the jobcontrol.
180 * Create the pipes that are used to grant children execution
181 * permissions. If max_jobs_requested is not positive, count the number of
182 * CPUs from /proc/cpuinfo and use that.
/*
 * Set up job control. The visible code creates the epoll instance, picks
 * max_jobs (max_jobs_requested when it is positive, otherwise presumably
 * the CPU count derived from /proc/cpuinfo), sets max_jobs_pending to
 * max_jobs * 10 + 25 and starts max_jobs worker threads.
 *
 * Review notes (only part of this function is visible in this view):
 *  - "ro" is not a standard fopen() mode string; plain "r" is the portable
 *    spelling for read-only access.
 *  - calloc() is declared as calloc(nmemb, size); the arguments used for
 *    the thread array are in the opposite order (same product). No NULL
 *    check on the result is visible before it is used.
 *  - The return value of pthread_create() is ignored in the visible line.
 *  - The format specifiers in the short-read message depend on the types
 *    of ret and sizeof(match); confirm %d and %zd match them.
 *  - The return values are not visible here.
 */
184 int init_jobcontrol(int max_jobs_requested)
193 epoll_fd = epoll_create(1);
194 if (epoll_fd == -1) {
195 pr_err("Failed to epoll_create(): %m\n");
199 if (max_jobs_requested > 0) {
200 max_jobs = max_jobs_requested;
205 file = fopen("/proc/cpuinfo", "ro");
207 pr_err("Failed to open /proc/cpuinfo: %m\n");
212 * The CPU count algorithm simply reads the first 8 bytes from
213 * the /proc/cpuinfo and then expects that line to be there as
214 * many times as there are CPUs.
216 ret = fread(match, 1, sizeof(match), file);
217 if (ret < sizeof(match)) {
218 pr_err("read %d bytes when expecting %zd %m\n",
223 while(fgets(buf, sizeof(buf), file)) {
224 if (!strncmp(buf, match, sizeof(match)))
233 pr_info("Set maximum number of parallel jobs to %d\n", max_jobs);
235 max_jobs_pending = max_jobs * 10 + 25;
236 pr_info("Set maximum number of pending jobs to %d\n", max_jobs_pending);
238 /* Create worker threads */
239 thread = calloc(sizeof(*thread), max_jobs);
240 for (i = 0; i < max_jobs; i++)
/* The worker index travels by value inside the pointer argument. */
241 pthread_create(&thread[i], NULL, worker_thread, (void *)i);
/*
 * Wait for a single event on epoll_fd and dispatch it to the handler
 * stored in the event's data pointer.
 *
 * timeout: a positive value is treated as seconds and converted to
 *          milliseconds; zero and negative values are passed to
 *          epoll_wait() unchanged.
 *
 * Review notes (only part of this function is visible in this view):
 *  - 1000 * timeout is computed in int and can overflow for very large
 *    timeouts.
 *  - The return values are not visible here.
 */
246 int poll_job_requests(int timeout)
248 struct epoll_event event;
249 struct event_handler *job_handler;
252 /* Convert positive seconds to milliseconds */
253 timeout = timeout > 0 ? 1000 * timeout : timeout;
255 ret = epoll_wait(epoll_fd, &event, 1, timeout);
258 if (errno != EINTR) {
259 pr_err("epoll_wait: %m\n");
264 * If epoll_wait() was interrupted, better start
265 * everything again from the beginning
271 pr_info("Timed out\n");
/* register_event_handler() stores the handler pointer in ev.data.ptr. */
275 job_handler = event.data.ptr;
277 if (!job_handler || !job_handler->handle_event) {
278 pr_err("Corrupted event handler for fd %d\n",
283 pr_debug("Running handler %s to handle events from fd %d\n",
284 job_handler->name, job_handler->fd);
285 job_handler->handle_event(job_handler);
288 pr_info("Jobs active: %u, pending: %u\n", job_count, jobs_pending);
297 pr_err("fork() failed: %m\n");
302 pr_debug("Fork child %d\n", child);
/*
 * Wait with wait4() until the child identified by pid has exited or has
 * been killed by a signal, then log how it terminated and its user and
 * system CPU time.
 *
 * Review notes (only part of this function is visible in this view):
 *  - The pid parameter is overwritten with the return value of wait4().
 *  - The error message names waitid() although the call is wait4().
 *  - tv_usec / 1000 is printed with %ld without a cast, unlike tv_sec;
 *    confirm that suseconds_t is long on all targets or add a cast.
 *  - The return values are not visible here.
 */
309 int clear_zombie(int pid)
312 struct rusage rusage;
313 char *status_str = NULL;
317 pr_debug("Waiting on pid %d\n", pid);
320 pid = wait4(pid, &status, 0, &rusage);
322 pr_err("Error on waitid(): %m\n");
325 /* Wait until the child has become a zombie */
326 } while (!WIFEXITED(status) && !WIFSIGNALED(status));
328 if (WIFEXITED(status)) {
329 status_str = "exited with status";
330 code = WEXITSTATUS(status);
331 } else if (WIFSIGNALED(status)) {
332 status_str = "killed by signal";
333 code = WTERMSIG(status);
335 pr_debug("pid %d: %s %d.\n", pid,
337 pr_debug("pid %d: User time: %ld.%03lds, System %ld.%03lds\n", pid,
338 (long)rusage.ru_utime.tv_sec, rusage.ru_utime.tv_usec / 1000,
339 (long)rusage.ru_stime.tv_sec, rusage.ru_stime.tv_usec / 1000);
345 * Runs a command cmd with params argv, connects stdin and stdout to
348 * Returns the pid of the executed process
/*
 * Run cmd with argv in a child process. A pipe is created for each of
 * stdinfd, stdoutfd and stderrfd that is non-NULL. In the child, the read
 * end of the stdin pipe is duplicated onto STDIN_FILENO and the write ends
 * of the stdout and stderr pipes onto STDOUT_FILENO and STDERR_FILENO.
 *
 * Review notes (only part of this function is visible in this view):
 *  - The parent-side handling (which pipe ends are handed back through
 *    stdinfd/stdoutfd/stderrfd and which are closed) is not visible here.
 *  - When a later pipe() call fails, no cleanup of the pipes created
 *    earlier is visible -- confirm they are closed on that path.
 *  - The dup2() return values are not checked in the visible lines.
 */
350 int run_piped(const char *cmd, char *const argv[],
351 int *stdinfd, int *stdoutfd, int *stderrfd)
353 int ifd[2], ofd[2], efd[2], pid;
355 pr_info("Running command %s\n", cmd);
357 if (stdinfd && pipe(ifd)) {
358 pr_err("pipe() failed: %m\n");
362 if (stdoutfd && pipe(ofd)) {
363 pr_err("pipe() failed: %m\n");
367 if (stderrfd && pipe(efd)) {
368 pr_err("pipe() failed: %m\n");
373 if (pid) { /* Parent side */
394 dup2(ifd[0], STDIN_FILENO);
399 dup2(ofd[1], STDOUT_FILENO);
404 dup2(efd[1], STDERR_FILENO);
407 /* Now we have redirected standard streams to parent process */
409 pr_err("Failed to execv command %s: %m\n", cmd);
416 * Runs a command cmd with params argv, connects stdin and stdout to
419 * Returns the pid of the executed process
/*
 * Variant of run_piped() that hands the pipes back as stdio streams: the
 * command is started through run_piped() and the resulting descriptors
 * are wrapped with fdopen().
 *
 * Review notes (only part of this function is visible in this view):
 *  - The stream for the child's stdin is opened with mode "r". The child
 *    reads from that pipe, so the parent presumably holds the write end
 *    and would need "w" -- confirm against run_piped()'s parent side.
 *  - How i, o and e are derived from stdinf/stdoutf/stderrf, and the
 *    cleanup after a failed fdopen(), are not visible here.
 */
421 int run_piped_stream(const char *cmd, char *const argv[],
422 FILE **stdinf, FILE **stdoutf, FILE **stderrf)
424 int ifd, ofd, efd, pid;
440 pid = run_piped(cmd, argv, i, o, e);
443 *stdinf = fdopen(ifd, "r");
444 if (*stdinf == NULL) {
445 pr_err("Error opening file stream for fd %d: %m\n",
452 *stdoutf = fdopen(ofd, "r");
453 if (*stdoutf == NULL) {
454 pr_err("Error opening file stream for fd %d: %m\n",
461 *stderrf = fdopen(efd, "r");
462 if (*stderrf == NULL) {
463 pr_err("Error opening file stream for fd %d: %m\n",
473 * Forks a child and executes a command to run in parallel
/*
 * Evaluate to the larger of a and b. The whole expansion is parenthesized
 * so that the macro can be embedded in a larger expression, for example
 * 2 * max(x, y) or max(x, y) + 1. Either argument may be evaluated twice,
 * so do not pass expressions with side effects.
 */
#define max(a,b) ((a) < (b) ? (b) : (a))
/* Byte count passed to read() when draining the child's pipes (128 KiB). */
#define BUF_SIZE (128*1024)
/*
 * Run cmd with argv as a child process through run_piped(), with the
 * child's stdout and stderr connected to pipes. Both pipes are monitored
 * with select() and read with read() in chunks of up to BUF_SIZE bytes;
 * what was read is logged as "stdout" via pr_info() or "stderr" via
 * pr_err().
 *
 * Review notes (only part of this function is visible in this view):
 *  - select() expects its first argument to be the highest-numbered
 *    descriptor plus one. It is called with maxfd itself, so the higher of
 *    ofd and efd is never examined; this looks like it should be
 *    maxfd + 1.
 *  - The stderr workaround described in the comment further down may be
 *    a consequence of that off-by-one -- confirm once nfds is corrected.
 *  - The return values are not visible here.
 */
478 int run(const char *cmd, char *const argv[])
486 child = run_piped(cmd, argv, NULL, &ofd, &efd);
498 maxfd = max(ofd, efd);
499 error = select(maxfd, &rfds, NULL, NULL, NULL);
502 pr_err("Error with select: %m\n");
506 if (FD_ISSET(ofd, &rfds)) {
507 bytes = read(ofd, rbuf, BUF_SIZE);
511 if (FD_ISSET(efd, &rfds)) {
513 bytes = read(efd, rbuf, BUF_SIZE);
517 pr_err("select() returned unknown fd\n");
522 pr_err("read() failed: %m\n");
527 * Workaround: When a process had die and it has only
528 * written to stderr, select() doesn't indicate that
529 * there might be something to read in stderr fd. To
530 * work around this issue, we try to read stderr just
531 * in case in order to ensure everything gets read.
534 bytes = read(efd, rbuf, BUF_SIZE);
544 pr_err("%s: stderr: %s\n",
547 pr_info("%s: stdout: %s\n",
/*
 * Register handler->fd with the epoll instance for the events in
 * handler->events. The handler pointer is stored as the event's user
 * data, which is where poll_job_requests() reads it back from.
 *
 * Review notes (only part of this function is visible in this view):
 *  - epoll_event.data is a union, so the assignment to ev.data.ptr
 *    overwrites the ev.data.fd value stored on the line before it; only
 *    the pointer survives.
 *  - The fd <= 0 check also rejects descriptor 0, which is a valid
 *    descriptor number; confirm whether "< 0" was intended.
 *  - The return values are not visible here.
 */
563 int register_event_handler(struct event_handler *handler)
565 struct epoll_event ev;
568 if (handler->fd <= 0) {
569 pr_err("Invalid file descriptor of %d\n", handler->fd);
573 if (!handler->handle_event) {
574 pr_err("Handler callback missing\n");
578 pr_info("Registering handler for %s, fd %d\n",
579 handler->name, handler->fd);
581 ev.data.fd = handler->fd;
582 ev.data.ptr = handler;
583 ev.events = handler->events;
584 ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, handler->fd, &ev);
586 pr_err("Failed to add epoll_fd: %m\n");
/*
 * Called once lock has been taken: by _mutex_lock() after a successful
 * acquisition, and through mutex_lock_acquired() in worker_thread() after
 * pthread_cond_wait() returns. file and line identify the call site.
 *
 * NOTE(review): the body is not visible in this view; presumably it
 * stores file and line in the lock, since _mutex_lock() prints lock->file
 * and lock->line on contention -- confirm.
 */
593 void _mutex_lock_acquired(struct mutex *lock, char *file, int line)
/*
 * Lock the given mutex. A non-blocking pthread_mutex_trylock() is tried
 * first; when that does not succeed, a contention message with lock->file
 * and lock->line is logged and the call falls back to the blocking
 * pthread_mutex_lock(). file and line are forwarded to
 * _mutex_lock_acquired().
 *
 * Review notes (only part of this function is visible in this view):
 *  - pthread_mutex_lock() reports failure through its return value and
 *    does not set errno, so "%m" in the error message may describe an
 *    unrelated error; consider printing strerror(ret) instead.
 *  - lock->file and lock->line are read without holding the lock, so the
 *    contention message can race with another thread updating them.
 *  - The error message text contains the typo "Acquirin".
 *  - The return values are not visible here.
 */
599 int _mutex_lock(struct mutex *lock, char *file, int line)
603 if (!pthread_mutex_trylock(&lock->lock))
606 pr_info("Lock contention on lock %s on %s:%d\n",
607 lock->name, lock->file, lock->line);
609 ret = pthread_mutex_lock(&lock->lock);
611 pr_err("Acquirin lock %s failed: %m, acquired %s:%d\n",
612 lock->name, lock->file, lock->line);
615 _mutex_lock_acquired(lock, file, line);
/*
 * Release the pthread mutex embedded in lock.
 *
 * NOTE(review): only part of this function is visible in this view. The
 * result of pthread_mutex_unlock() is not captured on the visible line,
 * and the value returned to the caller is not shown.
 */
619 int _mutex_unlock(struct mutex *lock)
623 pthread_mutex_unlock(&lock->lock);