#include #include #include #include #include #include #include #include #include "process.h" #include "debug.h" #include "utils.h" static int epoll_fd; static unsigned int max_jobs; static unsigned int max_jobs_pending; struct work_struct { const char *name; int (*work_fn)(void *); void *arg; struct work_struct *next; }; struct work_queue { struct work_struct *work; int length; char *name; struct mutex lock; }; struct work_queue work_queues[WORK_PRIORITIES_NUM] = { { .name = "high priority", .lock = { .name = "high_prio_queue", .lock = PTHREAD_MUTEX_INITIALIZER, }, }, { .name = "low priority", .lock = { .name = "low_prio_queue", .lock = PTHREAD_MUTEX_INITIALIZER, }, }, }; struct mutex work_stats_mutex = { .name = "work_stats", .lock = PTHREAD_MUTEX_INITIALIZER, }; static int workers_active; static int worker_count; static int job_notify_fd[2]; static int run_work_on_queue(struct work_queue *queue) { struct work_struct *work; mutex_lock(&queue->lock); if (!queue->work) { mutex_unlock(&queue->lock); return 0; } /* Take next work */ work = queue->work; queue->work = work->next; queue->length--; mutex_unlock(&queue->lock); pr_info("Executing work %s from queue %s, %d still pending\n", work->name, queue->name, queue->length); work->work_fn(work->arg); pr_info("Work %s done\n", work->name); free(work); return 1; } static void *worker_thread(void *arg) { int ret; char name[16]; mutex_lock(&work_stats_mutex); snprintf(name, sizeof(name), "worker%d", worker_count); worker_count++; mutex_unlock(&work_stats_mutex); pthread_setname_np(pthread_self(), name); pthread_detach(pthread_self()); pr_info("Worker started\n"); /* Execute all high priority work from the queue */ while (run_work_on_queue(&work_queues[WORK_PRIORITY_HIGH])) ; /* * All high priority work is now done, see if we have enough * workers executing low priority worl. Continue from there if * needed. */ mutex_lock(&work_stats_mutex); if (workers_active > max_jobs) goto out_unlock; mutex_unlock(&work_stats_mutex); /* * Start executing the low priority work. Drop the nice value * as this really is low priority stuff */ ret = nice(19); pr_info("Worker priority dropped to %d\n", ret); while (run_work_on_queue(&work_queues[WORK_PRIORITY_LOW])) ; /* All done, exit */ mutex_lock(&work_stats_mutex); out_unlock: workers_active--; pr_info("Worker exiting, %d left active\n", workers_active); /* * Last exiting worker zeroes the worker_count. This * ensures next time we start spawning worker threads * the first thread will have number zero on its name. */ if (!workers_active) worker_count = 0; /* * Kick the job poller. If all jobs were active at this point * the scheduler thread will wait indefinitely until someone * tells it to do something. We may now know when next job is * available, so it is better for the scheduler to recalculate * its sleep time. */ notify_job_request(); mutex_unlock(&work_stats_mutex); return NULL; } int queue_work(unsigned int priority, char *name, int (work_fn)(void *arg), void *arg) { pthread_t *thread; struct work_queue *queue; struct work_struct *work, *last_work; if (priority >= WORK_PRIORITIES_NUM) { pr_err("Invalid priority: %d\n", priority); return -EINVAL; } work = calloc(sizeof(*work), 1); work->name = name; work->work_fn = work_fn; work->arg = arg; queue = &work_queues[priority]; /* Insert new work at the end of the work queue */ mutex_lock(&queue->lock); last_work = queue->work; while (last_work && last_work->next) last_work = last_work->next; if (!last_work) queue->work = work; else last_work->next = work; pr_info("Inserted work %s in queue %s, with %d pending items\n", work->name, queue->name, queue->length); queue->length++; mutex_unlock(&queue->lock); mutex_lock(&work_stats_mutex); pr_info("workers_active: %d, priority: %d\n", workers_active, priority); if (priority != WORK_PRIORITY_HIGH && workers_active >= max_jobs) { mutex_unlock(&work_stats_mutex); return 0; } workers_active++; mutex_unlock(&work_stats_mutex); pr_info("Creating new worker thread\n"); /* We need a worker thread, create one */ thread = calloc(sizeof(*thread), 1); pthread_create(thread, NULL, worker_thread, NULL); free(thread); return 0; } static int job_notify_handler(struct event_handler *h) { int ret; char buf[64]; ret = read(job_notify_fd[0], buf, sizeof(buf)); if (ret < 0) pr_err("Failed to read: %m\n"); return 0; } /* * Initialize the jobcontrol. * * Create the pipes that are used to grant children execution * permissions. If max_jobs is zero, count the number of CPUs from * /proc/cpuinfo and use that. */ int init_jobcontrol(int max_jobs_requested) { static struct event_handler ev; FILE *file; int ret; char buf[256]; char match[8]; epoll_fd = epoll_create(1); if (epoll_fd == -1) { pr_err("Failed to epoll_create(): %m\n"); return -1; } if (max_jobs_requested > 0) { max_jobs = max_jobs_requested; goto no_count_cpus; } max_jobs++; file = fopen("/proc/cpuinfo", "ro"); if (!file) { pr_err("Failed to open /proc/cpuinfo: %m\n"); goto open_fail; } /* * The CPU count algorithm simply reads the first 8 bytes from * the /proc/cpuinfo and then expects that line to be there as * many times as there are CPUs. */ ret = fread(match, 1, sizeof(match), file); if (ret < sizeof(match)) { pr_err("read %d bytes when expecting %zd %m\n", ret, sizeof(match)); goto read_fail; } while(fgets(buf, sizeof(buf), file)) { if (!strncmp(buf, match, sizeof(match))) max_jobs++; } ret = pipe(job_notify_fd); if (ret) { pr_err("pipe() failed: %m\n"); return ret; } ev.fd = job_notify_fd[0]; ev.events = EPOLLIN; ev.handle_event = job_notify_handler; ev.name = "job_notify"; register_event_handler(&ev, EPOLL_CTL_ADD); open_fail: read_fail: fclose(file); no_count_cpus: pr_info("Set maximum number of parallel jobs to %d\n", max_jobs); max_jobs_pending = max_jobs * 10 + 25; pr_info("Set maximum number of pending jobs to %d\n", max_jobs_pending); return 0; } int poll_job_requests(int timeout) { struct epoll_event event; struct event_handler *job_handler; int ret; /* Convert positive seconds to milliseconds */ timeout = timeout > 0 ? 1000 * timeout : timeout; ret = epoll_wait(epoll_fd, &event, 1, timeout); if (ret == -1) { if (errno != EINTR) { pr_err("epoll_wait: %m\n"); return -1; } /* * If epoll_wait() was interrupted, better start * everything again from the beginning */ return 0; } if (ret == 0) { pr_info("Timed out\n"); goto out; } job_handler = event.data.ptr; if (!job_handler || !job_handler->handle_event) { pr_err("Corrupted event handler for fd %d\n", event.data.fd); goto out; } pr_debug("Running handler %s to handle events from fd %d\n", job_handler->name, job_handler->fd); job_handler->handle_event(job_handler); out: pr_info("Workers active: %u\n", workers_active); return ret; } int notify_job_request(void) { int ret; char byte = 0; ret = write(job_notify_fd[1], &byte, sizeof(byte)); if (ret < 0) pr_err("Failed to write: %m\n"); return 0; } int do_fork(void) { int child; child = fork(); if (child < 0) { pr_err("fork() failed: %m\n"); return -1; } if (child) { pr_debug("Fork child %d\n", child); return child; } return 0; } int clear_zombie(int pid) { int status; struct rusage rusage; char *status_str = NULL; int code = 0; if (pid) pr_debug("Waiting on pid %d\n", pid); do { pid = wait4(pid, &status, 0, &rusage); if (pid < 0) { pr_err("Error on waitid(): %m\n"); return 0; } /* Wait until the child has become a zombie */ } while (!WIFEXITED(status) && !WIFSIGNALED(status)); if (WIFEXITED(status)) { status_str = "exited with status"; code = WEXITSTATUS(status); } else if (WIFSIGNALED(status)) { status_str = "killed by signal"; code = WTERMSIG(status); } pr_debug("pid %d: %s %d.\n", pid, status_str, code); pr_debug("pid %d: User time: %ld.%03lds, System %ld.%03lds\n", pid, (long)rusage.ru_utime.tv_sec, rusage.ru_utime.tv_usec / 1000, (long)rusage.ru_stime.tv_sec, rusage.ru_stime.tv_usec / 1000); return 1; } /* * Runs a command cmd with params argv, connects stdin and stdout to * readfd and writefd * * Returns the pid of the executed process */ int run_piped(const char *cmd, char *const argv[], int *stdinfd, int *stdoutfd, int *stderrfd) { int ifd[2], ofd[2], efd[2], pid; pr_info("Running command %s\n", cmd); if (stdinfd && pipe(ifd)) { pr_err("pipe() failed: %m\n"); return -1; } if (stdoutfd && pipe(ofd)) { pr_err("pipe() failed: %m\n"); goto close_ifd; } if (stderrfd && pipe(efd)) { pr_err("pipe() failed: %m\n"); goto close_ofd; } pid = do_fork(); if (pid) { /* Parent side */ if (stdinfd) { close(ifd[0]); *stdinfd = ifd[0]; } if (stdoutfd) { close(ofd[1]); *stdoutfd = ofd[0]; } if (stderrfd) { close(efd[1]); *stderrfd = efd[0]; } return pid; } if (stdinfd) { close(ifd[1]); dup2(ifd[0], STDIN_FILENO); } if (stdoutfd) { close(ofd[0]); dup2(ofd[1], STDOUT_FILENO); } if (stderrfd) { close(efd[0]); dup2(efd[1], STDERR_FILENO); } /* Now we have redirected standard streams to parent process */ execvp(cmd, argv); pr_err("Failed to execv command %s: %m\n", cmd); exit(1); return 0; close_ofd: if (stdoutfd) { close(ofd[0]); close(ofd[1]); } close_ifd: if (stdinfd) { close(ifd[0]); close(ifd[1]); } return -1; } /* * Runs a command cmd with params argv, connects stdin and stdout to * readfd and writefd * * Returns the pid of the executed process */ int run_piped_stream(const char *cmd, char *const argv[], FILE **stdinf, FILE **stdoutf, FILE **stderrf) { int ifd, ofd, efd, pid; int *i, *o, *e; if (stdinf) i = &ifd; else i = 0; if (stdoutf) o = &ofd; else o = 0; if (stderrf) e = &efd; else e = 0; pid = run_piped(cmd, argv, i, o, e); if (stdinf) { *stdinf = fdopen(ifd, "w"); if (*stdinf == NULL) { pr_err("Error opening file stream for fd %d: %m\n", ifd); return -1; } } if (stdoutf) { *stdoutf = fdopen(ofd, "r"); if (*stdoutf == NULL) { pr_err("Error opening file stream for fd %d: %m\n", ofd); return -1; } } if (stderrf) { *stderrf = fdopen(efd, "r"); if (*stderrf == NULL) { pr_err("Error opening file stream for fd %d: %m\n", efd); return -1; } } return pid; } /* * Forks a child and executes a command to run on parallel */ #define BUF_SIZE (128*1024) int run(const char *cmd, char *const argv[]) { int child, error; int ofd, efd; fd_set rfds; int maxfd; int eof = 0; child = run_piped(cmd, argv, NULL, &ofd, &efd); FD_ZERO(&rfds); FD_SET(ofd, &rfds); FD_SET(efd, &rfds); while (!eof) { char *sptr , *eptr; char rbuf[BUF_SIZE]; int bytes; int is_stderr = 0; maxfd = max(ofd, efd); error = select(maxfd, &rfds, NULL, NULL, NULL); if (error < 0) { pr_err("Error with select: %m\n"); break; } if (FD_ISSET(ofd, &rfds)) { bytes = read(ofd, rbuf, BUF_SIZE); goto print; } if (FD_ISSET(efd, &rfds)) { is_stderr = 1; bytes = read(efd, rbuf, BUF_SIZE); goto print; } pr_err("select() returned unknown fd\n"); break; print: if (bytes < 0) { pr_err("read() failed: %m\n"); break; } /* * Workaround: When a process had die and it has only * written to stderr, select() doesn't indicate that * there might be something to read in stderr fd. To * work around this issue, we try to read stderr just * in case in order to ensure everything gets read. */ if (bytes == 0) { bytes = read(efd, rbuf, BUF_SIZE); is_stderr = 1; eof = 1; } sptr = eptr = rbuf; while (bytes--) { if (*eptr == '\n') { *eptr = 0; if (is_stderr) pr_err("%s: stderr: %s\n", cmd, sptr); else pr_info("%s: stdout: %s\n", cmd, sptr); sptr = eptr + 1; } eptr++; } } close(ofd); close(efd); clear_zombie(child); return 0; } int register_event_handler(struct event_handler *handler, int op) { struct epoll_event ev; int ret; const char *str; if (handler->fd <= 0) { pr_err("Invalid file descriptor of %d\n", handler->fd); return -1; } if (!handler->handle_event) { pr_err("Handler callback missing\n"); return -1; } switch (op) { case EPOLL_CTL_ADD: str = "register"; break; case EPOLL_CTL_MOD: str = "modify"; break; case EPOLL_CTL_DEL: str = "deregister"; break; default: pr_err("Invalid op %d\n", op); return -1; } pr_info("Doing a epoll %s for handler %s, fd %d\n", str, handler->name, handler->fd); ev.data.fd = handler->fd; ev.data.ptr = handler; ev.events = handler->events; ret = epoll_ctl(epoll_fd, op, handler->fd, &ev); if (ret) { pr_err("Failed to do epoll_ctl %s: %m\n", str); return -1; } return 0; } void _mutex_lock_acquired(struct mutex *lock, char *file, int line) { lock->line = line; lock->file = file; pthread_getname_np(pthread_self(), lock->owner_name, sizeof(lock->owner_name)); } int _mutex_lock(struct mutex *lock, char *file, int line) { int ret = 0; int contended = 0; if (!pthread_mutex_trylock(&lock->lock)) goto out_lock; contended = 1; pr_debug("Lock contention at %s:%d on lock %s acquired by %s at %s:%d\n", file, line, lock->name, lock->owner_name, lock->file, lock->line); ret = pthread_mutex_lock(&lock->lock); if (ret) pr_err("Acquirin lock %s failed: %m, acquired on %s:%d\n", lock->name, lock->file, lock->line); out_lock: if (contended) pr_debug("Lock %s acquired at %s:%d after contention\n", lock->name, file, line); _mutex_lock_acquired(lock, file, line); return ret; } int _mutex_unlock(struct mutex *lock) { lock->line = 0; lock->file = NULL; pthread_mutex_unlock(&lock->lock); return 0; }