#include #include #include #include #include #include #include #include #include "process.h" #include "debug.h" static int child_count; static int parent_count; static int job_request_fd[2]; static int job_get_permission_fd[2]; static int epoll_fd; static unsigned int max_jobs; static unsigned int job_count; static unsigned int jobs_pending; static unsigned int max_jobs_pending; struct work_struct { const char *name; int (*work_fn)(void *); void *arg; struct work_struct *next; }; struct work_queue { struct work_struct *work; int length; char *name; struct mutex lock; }; struct work_queue work_queues[WORK_PRIORITIES_NUM] = { { .name = "high priority", .lock = { .name = "high_prio_queue", .lock = PTHREAD_MUTEX_INITIALIZER, }, }, { .name = "low priority", .lock = { .name = "low_prio_queue", .lock = PTHREAD_MUTEX_INITIALIZER, }, }, }; struct mutex work_pending_mutex = { .name = "work_pending", .lock = PTHREAD_MUTEX_INITIALIZER, }; pthread_cond_t work_pending_cond = PTHREAD_COND_INITIALIZER; static int run_work_on_queue(struct work_queue *queue) { struct work_struct *work; mutex_lock(&queue->lock); if (!queue->work) { pr_info("No work to run on queue %s\n", queue->name); mutex_unlock(&queue->lock); return 0; } /* Take next work */ work = queue->work; queue->work = work->next; queue->length--; /* * If queue is not empty, try waking up more workers. It is * possible that when work were queued, the first worker did * not wake up soon enough and */ if (queue->length > 0) pthread_cond_signal(&work_pending_cond); mutex_unlock(&queue->lock); pr_info("Executing work %s from queue %s, %d still pending\n", work->name, queue->name, queue->length); work->work_fn(work->arg); pr_info("Work %s done\n", work->name); free(work); return 1; } static void *worker_thread(void *arg) { int ret; char name[16]; snprintf(name, sizeof(name), "worker%ld", (long)arg); pthread_setname_np(pthread_self(), name); while (1) { while (1) { int prio, work_done = 0; /* * Execute as many works from the queues as * there are, starting from highest priority * queue */ for (prio = 0; prio < WORK_PRIORITIES_NUM; prio++) { work_done = run_work_on_queue(&work_queues[prio]); if (work_done) break; } if (!work_done) break; } pr_info("Worker going to sleep\n"); ret = pthread_cond_wait(&work_pending_cond, &work_pending_mutex.lock); if (ret < 0) pr_err("Error: %m\n"); mutex_lock_acquired(&work_pending_mutex); mutex_unlock(&work_pending_mutex); } return NULL; } int queue_work(unsigned int priority, char *name, int (work_fn)(void *arg), void *arg) { struct work_queue *queue; struct work_struct *work, *last_work; if (priority >= WORK_PRIORITIES_NUM) { pr_err("Invalid priority: %d\n", priority); return -EINVAL; } work = calloc(sizeof(*work), 1); work->name = name; work->work_fn = work_fn; work->arg = arg; queue = &work_queues[priority]; /* Insert new work at the end of the work queue */ mutex_lock(&queue->lock); last_work = queue->work; while (last_work && last_work->next) last_work = last_work->next; if (!last_work) queue->work = work; else last_work->next = work; pr_info("Inserted work %s in queue %s, with %d pending items\n", work->name, queue->name, queue->length); queue->length++; mutex_unlock(&queue->lock); pthread_cond_signal(&work_pending_cond); return 0; } int get_child_count(void) { return child_count; } int get_parent_count(void) { return parent_count; } static int handle_signals(struct event_handler *h) { struct signalfd_siginfo siginfo; int ret; ret = read(h->fd, &siginfo, sizeof(siginfo)); if (ret < sizeof(siginfo)) { pr_err("Expected %zd from read, got %d: %m\n", sizeof(siginfo), ret); return -1; } if (siginfo.ssi_signo != SIGCHLD) { pr_err("Unexpected signal %d, ignoring\n", siginfo.ssi_signo); return -1; } harvest_zombies(siginfo.ssi_pid); return 0; } static int grant_new_job(void) { int ret; char byte = 0; job_count++; pr_info("Granting new job. %d jobs currently and %d pending\n", job_count, jobs_pending); ret = write(job_get_permission_fd[1], &byte, 1); if (ret != 1) { pr_err("Failed to write 1 byte: %m\n"); return -1; } return 0; } static int deny_job(void) { int ret; char byte = -1; pr_info("Denying new job. %d jobs currently and %d pending, " "limit of pending jobs is %d\n", job_count, jobs_pending, max_jobs_pending); ret = write(job_get_permission_fd[1], &byte, 1); if (ret != 1) { pr_err("Failed to write 1 byte: %m\n"); return -1; } return 0; } static int handle_job_request(struct event_handler *h) { int ret, pid; ret = read(job_request_fd[0], &pid, sizeof(pid)); if (ret < 0) { pr_err("Failed to read: %m\n"); return -1; } if (ret == 0) { pr_info("Read zero bytes\n"); return 0; } if (pid > 0) { if (job_count >= max_jobs) { if (jobs_pending < max_jobs_pending) jobs_pending++; else deny_job(); } else { ret = grant_new_job(); return 0; } } else if (pid < 0) { if (job_count > max_jobs) pr_err("BUG: Job %u jobs exceeded limit %u\n", job_count, max_jobs); pr_info("Job %d finished\n", -pid); job_count--; if (jobs_pending) { jobs_pending--; ret = grant_new_job(); return 0; } } return 0; } struct event_handler signal_handler = { .handle_event = handle_signals, .events = EPOLLIN, .name = "signal", }; struct event_handler job_request_handler = { .handle_event = handle_job_request, .events = EPOLLIN, .name = "job_request", }; /* * Initialize the jobcontrol. * * Create the pipes that are used to grant children execution * permissions. If max_jobs is zero, count the number of CPUs from * /proc/cpuinfo and use that. */ int init_jobcontrol(int max_jobs_requested) { FILE *file; int ret; sigset_t sigmask; char buf[256]; char match[8]; pthread_t *thread; int i; if (pipe2(job_request_fd, O_NONBLOCK | O_CLOEXEC)) { pr_err("Failed to create pipe: %m\n"); return -1; } if (pipe2(job_get_permission_fd, O_CLOEXEC)) { pr_err("Failed to create pipe: %m\n"); return -1; } epoll_fd = epoll_create(1); if (epoll_fd == -1) { pr_err("Failed to epoll_create(): %m\n"); return -1; } job_request_handler.fd = job_request_fd[0]; register_event_handler(&job_request_handler); sigemptyset(&sigmask); sigaddset(&sigmask, SIGCHLD); /* Block SIGCHLD so that it becomes readable via signalfd */ ret = sigprocmask(SIG_BLOCK, &sigmask, NULL); if (ret < 0) { pr_err("Failed to sigprocmask: %m\n"); } signal_handler.fd = signalfd(-1, &sigmask, SFD_CLOEXEC); if (job_request_handler.fd < 0) { pr_err("Failed to create signal_fd: %m\n"); return -1; } register_event_handler(&signal_handler); if (max_jobs_requested > 0) { max_jobs = max_jobs_requested; goto no_count_cpus; } max_jobs++; file = fopen("/proc/cpuinfo", "ro"); if (!file) { pr_err("Failed to open /proc/cpuinfo: %m\n"); goto open_fail; } /* * The CPU count algorithm simply reads the first 8 bytes from * the /proc/cpuinfo and then expects that line to be there as * many times as there are CPUs. */ ret = fread(match, 1, sizeof(match), file); if (ret < sizeof(match)) { pr_err("read %d bytes when expecting %zd %m\n", ret, sizeof(match)); goto read_fail; } while(fgets(buf, sizeof(buf), file)) { if (!strncmp(buf, match, sizeof(match))) max_jobs++; } open_fail: read_fail: fclose(file); no_count_cpus: pr_info("Set maximum number of parallel jobs to %d\n", max_jobs); max_jobs_pending = max_jobs * 10 + 25; pr_info("Set maximum number of pending jobs to %d\n", max_jobs_pending); /* Create worker threads */ thread = calloc(sizeof(*thread), max_jobs); for (i = 0; i < max_jobs; i++) pthread_create(&thread[i], NULL, worker_thread, (void *)i); /* * Magic sleep. There are too many fork() calls at the moment * so we must ensure our threads don't print anything out * while a fork() is executed. Otherwise the child will * inherit glibc internal locks while they are locked, and * function calls such as printf will deadlock. */ sleep(1); return 0; } int poll_job_requests(int timeout) { struct epoll_event event; struct event_handler *job_handler; int ret; /* Convert positive seconds to milliseconds */ timeout = timeout > 0 ? 1000 * timeout : timeout; ret = epoll_wait(epoll_fd, &event, 1, timeout); if (ret == -1) { if (errno != EINTR) { pr_err("epoll_wait: %m\n"); return -1; } /* * If epoll_wait() was interrupted, better start * everything again from the beginning */ return 0; } if (ret == 0) { pr_info("Timed out\n"); goto out; } job_handler = event.data.ptr; if (!job_handler || !job_handler->handle_event) { pr_err("Corrupted event handler for fd %d\n", event.data.fd); goto out; } pr_debug("Running handler %s to handle events from fd %d\n", job_handler->name, job_handler->fd); job_handler->handle_event(job_handler); out: pr_info("Jobs active: %u, pending: %u\n", job_count, jobs_pending); return ret; } /* * Per process flag indicating whether this child has requested fork * limiting. If it has, it must also tell the master parent when it * has died so that the parent can give next pending job permission to * go. */ static int is_limited_fork; int do_fork(void) { int child; child = fork(); if (child < 0) { pr_err("fork() failed: %m\n"); return -1; } if (child) { child_count++; pr_debug("Fork %d, child %d\n", child_count, child); return child; } /* * Also do not notify the master parent the death of this * child. Only childs that have been created with * do_fork_limited() can have this flag set. */ is_limited_fork = 0; /* * Close unused ends of the job control pipes. Only the parent * which controls the jobs may have the write end open of the * job_get_permission_fd and the read end of the * job_request_fd. Failing to close the pipe ends properly * will cause the childs to wait forever for the run * permission in case parent dies prematurely. * * Note! The file descriptor must be closed once and only * once. They are marked to -1 to make it impossible for * subseqent do_fork() calls from closing them again (in which * case some other file descriptor might already be reserved * for the same number) and prevent accidentally closing some * innocent file descriptors that are still in use. */ if (job_get_permission_fd[1] >= 0) { close(job_get_permission_fd[1]); job_get_permission_fd[1] = -1; } if (job_request_fd[0] >= 0) { close(job_request_fd[0]); job_request_fd[0] = -1; } /* reset child's child count */ child_count = 0; parent_count++; return 0; } static int request_fork(int request) { int pid = getpid(); pid = request > 0 ? pid : -pid; return write(job_request_fd[1], &pid, sizeof(pid)); } static void limited_fork_exit_handler(void) { if (is_limited_fork) request_fork(-1); } /* * Like do_fork(), but allow the child continue only after the global * job count is low enough. * * We allow the parent to continue other more important activities but * child respects the limit of global active processes. */ int do_fork_limited(void) { int child, ret; char byte; child = do_fork(); if (child) return child; /* Remember to notify the parent when we are done */ atexit(limited_fork_exit_handler); is_limited_fork = 1; pr_debug("Requesting permission to go\n"); /* Signal the parent that we are here, waiting to go */ request_fork(1); /* * The parent will tell us when we can continue. If there were * multiple children waiting for their turn to run only one * will be reading the content byte from the pipe and getting * the permission to run. */ ret = read(job_get_permission_fd[0], &byte, sizeof(byte)); if (ret == 0) pr_err("Error requesting run, did the parent die?\n"); if (ret < 0) pr_err("Job control request failure: %m\n"); if (byte < 0) { pr_info("Did not get permission to execute. Terminating\n"); /* * Avoid running exit handler, that would tell the * parent we died normally and decrement the job * counters. */ raise(SIGKILL); } pr_debug("Continuing\n"); return child; } int harvest_zombies(int pid) { int status; struct rusage rusage; char *status_str = NULL; int code = 0; if (child_count == 0) return 0; if (pid) pr_debug("Waiting on pid %d, children left: %d\n", pid, child_count); do { pid = wait4(pid, &status, 0, &rusage); if (pid < 0) { pr_err("Error on waitid(): %m\n"); return 0; } /* Wait until the child has become a zombie */ } while (!WIFEXITED(status) && !WIFSIGNALED(status)); child_count--; if (WIFEXITED(status)) { status_str = "exited with status"; code = WEXITSTATUS(status); } else if (WIFSIGNALED(status)) { status_str = "killed by signal"; code = WTERMSIG(status); } pr_debug("pid %d: %s %d. Children left: %d\n", pid, status_str, code, child_count); pr_debug("pid %d: User time: %ld.%03lds, System %ld.%03lds\n", pid, (long)rusage.ru_utime.tv_sec, rusage.ru_utime.tv_usec / 1000, (long)rusage.ru_stime.tv_sec, rusage.ru_stime.tv_usec / 1000); return 1; } /* * Runs a command cmd with params argv, connects stdin and stdout to * readfd and writefd * * Returns the pid of the executed process */ int run_piped(const char *cmd, char *const argv[], int *stdinfd, int *stdoutfd, int *stderrfd) { int ifd[2], ofd[2], efd[2], pid; pr_info("Running command %s\n", cmd); if (stdinfd && pipe(ifd)) { pr_err("pipe() failed: %m\n"); return -1; } if (stdoutfd && pipe(ofd)) { pr_err("pipe() failed: %m\n"); return -1; } if (stderrfd && pipe(efd)) { pr_err("pipe() failed: %m\n"); return -1; } pid = do_fork(); if (pid) { /* Parent side */ if (stdinfd) { close(ifd[0]); *stdinfd = ifd[0]; } if (stdoutfd) { close(ofd[1]); *stdoutfd = ofd[0]; } if (stderrfd) { close(efd[1]); *stderrfd = efd[0]; } return pid; } if (stdinfd) { close(ifd[1]); dup2(ifd[0], STDIN_FILENO); } if (stdoutfd) { close(ofd[0]); dup2(ofd[1], STDOUT_FILENO); } if (stderrfd) { close(efd[0]); dup2(efd[1], STDERR_FILENO); } /* Now we have redirected standard streams to parent process */ execvp(cmd, argv); pr_err("Failed to execv command %s: %m\n", cmd); exit(1); return 0; } /* * Runs a command cmd with params argv, connects stdin and stdout to * readfd and writefd * * Returns the pid of the executed process */ int run_piped_stream(const char *cmd, char *const argv[], FILE **stdinf, FILE **stdoutf, FILE **stderrf) { int ifd, ofd, efd, pid; int *i, *o, *e; if (stdinf) i = &ifd; else i = 0; if (stdoutf) o = &ofd; else o = 0; if (stderrf) e = &efd; else e = 0; pid = run_piped(cmd, argv, i, o, e); if (stdinf) { *stdinf = fdopen(ifd, "r"); if (*stdinf == NULL) { pr_err("Error opening file stream for fd %d: %m\n", ifd); return -1; } } if (stdoutf) { *stdoutf = fdopen(ofd, "r"); if (*stdoutf == NULL) { pr_err("Error opening file stream for fd %d: %m\n", ofd); return -1; } } if (stderrf) { *stderrf = fdopen(efd, "r"); if (*stderrf == NULL) { pr_err("Error opening file stream for fd %d: %m\n", efd); return -1; } } return pid; } /* * Forks a child and executes a command to run on parallel */ #define max(a,b) (a) < (b) ? (b) : (a) #define BUF_SIZE (128*1024) int run(const char *cmd, char *const argv[]) { int child, error; int ofd, efd; fd_set rfds; int maxfd; int eof = 0; if ((child = do_fork())) return child; child = run_piped(cmd, argv, NULL, &ofd, &efd); FD_ZERO(&rfds); FD_SET(ofd, &rfds); FD_SET(efd, &rfds); while (!eof) { char *sptr , *eptr; char rbuf[BUF_SIZE]; int bytes; int is_stderr = 0; maxfd = max(ofd, efd); error = select(maxfd, &rfds, NULL, NULL, NULL); if (error < 0) { pr_err("Error with select: %m\n"); break; } if (FD_ISSET(ofd, &rfds)) { bytes = read(ofd, rbuf, BUF_SIZE); goto print; } if (FD_ISSET(efd, &rfds)) { is_stderr = 1; bytes = read(efd, rbuf, BUF_SIZE); goto print; } pr_err("select() returned unknown fd\n"); break; print: if (bytes < 0) { pr_err("read() failed: %m\n"); break; } /* * Workaround: When a process had die and it has only * written to stderr, select() doesn't indicate that * there might be something to read in stderr fd. To * work around this issue, we try to read stderr just * in case in order to ensure everything gets read. */ if (bytes == 0) { bytes = read(efd, rbuf, BUF_SIZE); is_stderr = 1; eof = 1; } sptr = eptr = rbuf; while (bytes--) { if (*eptr == '\n') { *eptr = 0; if (is_stderr) pr_err("%s: stderr: %s\n", cmd, sptr); else pr_info("%s: stdout: %s\n", cmd, sptr); sptr = eptr; } eptr++; } } close(ofd); close(efd); harvest_zombies(child); exit(1); return 0; } int register_event_handler(struct event_handler *handler) { struct epoll_event ev; int ret; if (handler->fd <= 0) { pr_err("Invalid file descriptor of %d\n", handler->fd); return -1; } if (!handler->handle_event) { pr_err("Handler callback missing\n"); return -1; } pr_info("Registering handler for %s, fd %d\n", handler->name, handler->fd); ev.data.fd = handler->fd; ev.data.ptr = handler; ev.events = handler->events; ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, handler->fd, &ev); if (ret) { pr_err("Failed to add epoll_fd: %m\n"); return -1; } return 0; } void _mutex_lock_acquired(struct mutex *lock, char *file, int line) { lock->line = line; lock->file = file; } int _mutex_lock(struct mutex *lock, char *file, int line) { int ret = 0; if (!pthread_mutex_trylock(&lock->lock)) goto out_lock; pr_info("Lock contention on lock %s on %s:%d\n", lock->name, lock->file, lock->line); ret = pthread_mutex_lock(&lock->lock); if (ret) pr_err("Acquirin lock %s failed: %m, acquired %s:%d\n", lock->name, lock->file, lock->line); out_lock: _mutex_lock_acquired(lock, file, line); return ret; } int _mutex_unlock(struct mutex *lock) { lock->line = 0; lock->file = NULL; pthread_mutex_unlock(&lock->lock); return 0; }