+#include <unistd.h>
+#include <fcntl.h>
+#include <sys/select.h>
+#include <sys/epoll.h>
+#include <stdio.h>
+#include <sys/wait.h>
+#include <sys/signalfd.h>
+#include <sys/resource.h>
+
#include "process.h"
#include "debug.h"
+#include "utils.h"
-int child_count = 0;
+static int epoll_fd;
+static unsigned int max_jobs;
+static unsigned int max_jobs_pending;
-int do_fork(void)
+struct work_struct {
+ const char *name;
+ int (*work_fn)(void *);
+ void *arg;
+ struct work_struct *next;
+};
+
+struct work_queue {
+ struct work_struct *work;
+ int length;
+ char *name;
+ struct mutex lock;
+};
+
+struct work_queue work_queues[WORK_PRIORITIES_NUM] = {
+ {
+ .name = "high priority",
+ .lock = {
+ .name = "high_prio_queue",
+ .lock = PTHREAD_MUTEX_INITIALIZER,
+ },
+ },
+ {
+ .name = "low priority",
+ .lock = {
+ .name = "low_prio_queue",
+ .lock = PTHREAD_MUTEX_INITIALIZER,
+ },
+ },
+};
+
+struct mutex work_stats_mutex = {
+ .name = "work_stats",
+ .lock = PTHREAD_MUTEX_INITIALIZER,
+};
+static int workers_active;
+static int worker_count;
+
+static int job_notify_fd[2];
+
+static int run_work_on_queue(struct work_queue *queue)
{
- int child, error;
- child = fork();
- if (child < 0) {
- error = errno;
- pr_err("fork() failed: %s\n", strerror(error));
- return -1;
+ struct work_struct *work;
+
+ mutex_lock(&queue->lock);
+
+ if (!queue->work) {
+ mutex_unlock(&queue->lock);
+ return 0;
}
- if (child) {
- child_count++;
- pr_info("Fork %d, child %d\n", child_count, child);
- return child;
+ /* Take next work */
+ work = queue->work;
+ queue->work = work->next;
+ queue->length--;
+
+ mutex_unlock(&queue->lock);
+
+ pr_info("Executing work %s from queue %s, %d still pending\n",
+ work->name, queue->name, queue->length);
+
+ work->work_fn(work->arg);
+ pr_info("Work %s done\n", work->name);
+ free(work);
+
+ return 1;
+}
+
+static void *worker_thread(void *arg)
+{
+ int ret;
+ char name[16];
+
+ mutex_lock(&work_stats_mutex);
+ snprintf(name, sizeof(name), "worker%d", worker_count);
+ worker_count++;
+ mutex_unlock(&work_stats_mutex);
+
+ pthread_setname_np(pthread_self(), name);
+ pthread_detach(pthread_self());
+
+ pr_info("Worker started\n");
+
+ /* Execute all high priority work from the queue */
+ while (run_work_on_queue(&work_queues[WORK_PRIORITY_HIGH]))
+ ;
+ /*
+ * All high priority work is now done, see if we have enough
+ * workers executing low priority worl. Continue from there if
+ * needed.
+ */
+ mutex_lock(&work_stats_mutex);
+ if (workers_active > max_jobs)
+ goto out_unlock;
+
+ mutex_unlock(&work_stats_mutex);
+
+ /*
+ * Start executing the low priority work. Drop the nice value
+ * as this really is low priority stuff
+ */
+ ret = nice(19);
+ pr_info("Worker priority dropped to %d\n", ret);
+
+ while (run_work_on_queue(&work_queues[WORK_PRIORITY_LOW]))
+ ;
+
+ /* All done, exit */
+ mutex_lock(&work_stats_mutex);
+out_unlock:
+ workers_active--;
+ pr_info("Worker exiting, %d left active\n",
+ workers_active);
+
+ /*
+ * Last exiting worker zeroes the worker_count. This
+ * ensures next time we start spawning worker threads
+ * the first thread will have number zero on its name.
+ */
+ if (!workers_active) {
+ worker_count = 0;
+
+ /*
+ * Kick the job poller, just to print the time of next
+ * update on the logs
+ */
+ notify_job_request();
+ }
+
+ mutex_unlock(&work_stats_mutex);
+
+ return NULL;
+}
+
+int queue_work(unsigned int priority, char *name,
+ int (work_fn)(void *arg), void *arg)
+{
+ pthread_t *thread;
+ struct work_queue *queue;
+ struct work_struct *work, *last_work;
+
+ if (priority >= WORK_PRIORITIES_NUM) {
+ pr_err("Invalid priority: %d\n", priority);
+ return -EINVAL;
}
- /* reset child's child count */
- child_count = 0;
+ work = calloc(sizeof(*work), 1);
+
+ work->name = name;
+ work->work_fn = work_fn;
+ work->arg = arg;
+
+ queue = &work_queues[priority];
+
+ /* Insert new work at the end of the work queue */
+ mutex_lock(&queue->lock);
+
+ last_work = queue->work;
+ while (last_work && last_work->next)
+ last_work = last_work->next;
+
+ if (!last_work)
+ queue->work = work;
+ else
+ last_work->next = work;
+
+ pr_info("Inserted work %s in queue %s, with %d pending items\n",
+ work->name, queue->name, queue->length);
+ queue->length++;
+ mutex_unlock(&queue->lock);
+
+ mutex_lock(&work_stats_mutex);
+ pr_info("workers_active: %d, priority: %d\n", workers_active, priority);
+ if (priority != WORK_PRIORITY_HIGH && workers_active >= max_jobs) {
+ mutex_unlock(&work_stats_mutex);
+ return 0;
+ }
+ workers_active++;
+ mutex_unlock(&work_stats_mutex);
+
+ pr_info("Creating new worker thread\n");
+ /* We need a worker thread, create one */
+ thread = calloc(sizeof(*thread), 1);
+ pthread_create(thread, NULL, worker_thread, NULL);
+
+ free(thread);
+
return 0;
}
-int run(const char *cmd, char *const argv[])
+static int job_notify_handler(struct event_handler *h)
{
- int child, error;
- if ((child = do_fork()))
- return child;
+ int ret;
+ char buf[64];
+
+ ret = read(job_notify_fd[0], buf, sizeof(buf));
+ if (ret < 0)
+ pr_err("Failed to read: %m\n");
- execvp(cmd, argv);
- error = errno;
- pr_err("Failed to execv command %s: %s\n", cmd, strerror(error));
- exit(1);
return 0;
}
-int harvest_zombies(int pid)
+/*
+ * Initialize the jobcontrol.
+ *
+ * Create the pipes that are used to grant children execution
+ * permissions. If max_jobs is zero, count the number of CPUs from
+ * /proc/cpuinfo and use that.
+ */
+int init_jobcontrol(int max_jobs_requested)
{
- int status, error;
+ static struct event_handler ev;
+ FILE *file;
+ int ret;
+ char buf[256];
+ char match[8];
+
+ epoll_fd = epoll_create(1);
+ if (epoll_fd == -1) {
+ pr_err("Failed to epoll_create(): %m\n");
+ return -1;
+ }
+
+ if (max_jobs_requested > 0) {
+ max_jobs = max_jobs_requested;
+ goto no_count_cpus;
+ }
+ max_jobs++;
+
+ file = fopen("/proc/cpuinfo", "ro");
+ if (!file) {
+ pr_err("Failed to open /proc/cpuinfo: %m\n");
+ goto open_fail;
+ }
+
+ /*
+ * The CPU count algorithm simply reads the first 8 bytes from
+ * the /proc/cpuinfo and then expects that line to be there as
+ * many times as there are CPUs.
+ */
+ ret = fread(match, 1, sizeof(match), file);
+ if (ret < sizeof(match)) {
+ pr_err("read %d bytes when expecting %zd %m\n",
+ ret, sizeof(match));
+ goto read_fail;
+ }
+
+ while(fgets(buf, sizeof(buf), file)) {
+ if (!strncmp(buf, match, sizeof(match)))
+ max_jobs++;
+ }
+
+ ret = pipe(job_notify_fd);
+ if (ret) {
+ pr_err("pipe() failed: %m\n");
+ return ret;
+ }
+
+ ev.fd = job_notify_fd[0];
+ ev.events = EPOLLIN;
+ ev.handle_event = job_notify_handler;
+ ev.name = "job_notify";
+ register_event_handler(&ev, EPOLL_CTL_ADD);
+
+open_fail:
+read_fail:
+ fclose(file);
+
+no_count_cpus:
+ pr_info("Set maximum number of parallel jobs to %d\n", max_jobs);
+
+ max_jobs_pending = max_jobs * 10 + 25;
+ pr_info("Set maximum number of pending jobs to %d\n", max_jobs_pending);
+
+ return 0;
+}
- if (child_count == 0)
+int poll_job_requests(int timeout)
+{
+ struct epoll_event event;
+ struct event_handler *job_handler;
+ int ret;
+
+ /* Convert positive seconds to milliseconds */
+ timeout = timeout > 0 ? 1000 * timeout : timeout;
+
+ ret = epoll_wait(epoll_fd, &event, 1, timeout);
+
+ if (ret == -1) {
+ if (errno != EINTR) {
+ pr_err("epoll_wait: %m\n");
+ return -1;
+ }
+
+ /*
+ * If epoll_wait() was interrupted, better start
+ * everything again from the beginning
+ */
return 0;
+ }
- if (pid)
- pr_info("Waiting on pid %d, children left: %d\n", pid,
- child_count);
+ if (ret == 0) {
+ pr_info("Timed out\n");
+ goto out;
+ }
- pid = waitpid(pid, &status, 0);
- if (pid < 0) {
- error = errno;
- pr_err("Error on wait(): %s\n", strerror(error));
- child_count--; /* Decrement child count anyway */
+ job_handler = event.data.ptr;
+
+ if (!job_handler || !job_handler->handle_event) {
+ pr_err("Corrupted event handler for fd %d\n",
+ event.data.fd);
+ goto out;
}
- else {
- child_count--;
- pr_info("pid %d: exit code %d. Children left: %d\n", pid,
- status, child_count);
+
+ pr_debug("Running handler %s to handle events from fd %d\n",
+ job_handler->name, job_handler->fd);
+ job_handler->handle_event(job_handler);
+
+out:
+ pr_info("Workers active: %u\n", workers_active);
+ return ret;
+}
+
+int notify_job_request(void)
+{
+ int ret;
+ char byte = 0;
+
+ ret = write(job_notify_fd[1], &byte, sizeof(byte));
+ if (ret < 0)
+ pr_err("Failed to write: %m\n");
+
+ return 0;
+}
+
+int do_fork(void)
+{
+ int child;
+ child = fork();
+ if (child < 0) {
+ pr_err("fork() failed: %m\n");
+ return -1;
}
+ if (child) {
+ pr_debug("Fork child %d\n", child);
+ return child;
+ }
+
+ return 0;
+}
+
+int clear_zombie(int pid)
+{
+ int status;
+ struct rusage rusage;
+ char *status_str = NULL;
+ int code = 0;
+
+ if (pid)
+ pr_debug("Waiting on pid %d\n", pid);
+
+ do {
+ pid = wait4(pid, &status, 0, &rusage);
+ if (pid < 0) {
+ pr_err("Error on waitid(): %m\n");
+ return 0;
+ }
+ /* Wait until the child has become a zombie */
+ } while (!WIFEXITED(status) && !WIFSIGNALED(status));
+
+ if (WIFEXITED(status)) {
+ status_str = "exited with status";
+ code = WEXITSTATUS(status);
+ } else if (WIFSIGNALED(status)) {
+ status_str = "killed by signal";
+ code = WTERMSIG(status);
+ }
+ pr_debug("pid %d: %s %d.\n", pid,
+ status_str, code);
+ pr_debug("pid %d: User time: %ld.%03lds, System %ld.%03lds\n", pid,
+ (long)rusage.ru_utime.tv_sec, rusage.ru_utime.tv_usec / 1000,
+ (long)rusage.ru_stime.tv_sec, rusage.ru_stime.tv_usec / 1000);
+
return 1;
}
*
* Returns the pid of the executed process
*/
-int run_piped(const char *cmd, char *const argv[], int *readfd, int *writefd)
+int run_piped(const char *cmd, char *const argv[],
+ int *stdinfd, int *stdoutfd, int *stderrfd)
{
- int rfd[2], wfd[2], error, pid;
+ int ifd[2], ofd[2], efd[2], pid;
+
+ pr_info("Running command %s\n", cmd);
- if (pipe(rfd)) {
- error = errno;
- pr_err("pipe() failed: %s\n", strerror(error));
+ if (stdinfd && pipe(ifd)) {
+ pr_err("pipe() failed: %m\n");
return -1;
}
- if (pipe(wfd)) {
- error = errno;
- pr_err("pipe() failed: %s\n", strerror(error));
- return -1;
+ if (stdoutfd && pipe(ofd)) {
+ pr_err("pipe() failed: %m\n");
+ goto close_ifd;
+ }
+
+ if (stderrfd && pipe(efd)) {
+ pr_err("pipe() failed: %m\n");
+ goto close_ofd;
}
pid = do_fork();
- if (pid) {
- close(rfd[1]);
- close(wfd[0]);
- *readfd = rfd[0];
- *writefd = wfd[1];
+ if (pid) { /* Parent side */
+ if (stdinfd) {
+ close(ifd[0]);
+ *stdinfd = ifd[0];
+ }
+
+ if (stdoutfd) {
+ close(ofd[1]);
+ *stdoutfd = ofd[0];
+ }
+
+ if (stderrfd) {
+ close(efd[1]);
+ *stderrfd = efd[0];
+ }
+
return pid;
}
- close(rfd[0]);
- close(wfd[1]);
- dup2(wfd[0], STDIN_FILENO);
- dup2(rfd[1], STDOUT_FILENO);
+ if (stdinfd) {
+ close(ifd[1]);
+ dup2(ifd[0], STDIN_FILENO);
+ }
- /* Now we have redirected both stdin and stdout to parent process */
+ if (stdoutfd) {
+ close(ofd[0]);
+ dup2(ofd[1], STDOUT_FILENO);
+ }
+
+ if (stderrfd) {
+ close(efd[0]);
+ dup2(efd[1], STDERR_FILENO);
+ }
+
+ /* Now we have redirected standard streams to parent process */
execvp(cmd, argv);
- error = errno;
- pr_err("Failed to execv command %s: %s\n", cmd, strerror(error));
+ pr_err("Failed to execv command %s: %m\n", cmd);
exit(1);
+
return 0;
+
+close_ofd:
+ if (stdoutfd) {
+ close(ofd[0]);
+ close(ofd[1]);
+ }
+close_ifd:
+ if (stdinfd) {
+ close(ifd[0]);
+ close(ifd[1]);
+ }
+
+ return -1;
}
/*
* Returns the pid of the executed process
*/
int run_piped_stream(const char *cmd, char *const argv[],
- FILE **readf, FILE **writef)
+ FILE **stdinf, FILE **stdoutf, FILE **stderrf)
{
- int rfd, wfd, pid, error;
+ int ifd, ofd, efd, pid;
+ int *i, *o, *e;
+
+ if (stdinf)
+ i = &ifd;
+ else
+ i = 0;
+ if (stdoutf)
+ o = &ofd;
+ else
+ o = 0;
+ if (stderrf)
+ e = &efd;
+ else
+ e = 0;
+
+ pid = run_piped(cmd, argv, i, o, e);
- pid = run_piped(cmd, argv, &rfd, &wfd);
+ if (stdinf) {
+ *stdinf = fdopen(ifd, "r");
+ if (*stdinf == NULL) {
+ pr_err("Error opening file stream for fd %d: %m\n",
+ ifd);
+ return -1;
+ }
+ }
- if (readf) {
- *readf = fdopen(rfd, "r");
- if (*readf == NULL) {
- error = errno;
- pr_err("Error opening file stream for fd %d: %s\n",
- rfd, strerror(error));
+ if (stdoutf) {
+ *stdoutf = fdopen(ofd, "w");
+ if (*stdoutf == NULL) {
+ pr_err("Error opening file stream for fd %d: %m\n",
+ ofd);
return -1;
}
}
- if (writef) {
- *writef = fdopen(wfd, "w");
- if (*writef == NULL) {
- error = errno;
- pr_err("Error opening file stream for fd %d: %s\n",
- wfd, strerror(error));
+ if (stderrf) {
+ *stderrf = fdopen(efd, "r");
+ if (*stderrf == NULL) {
+ pr_err("Error opening file stream for fd %d: %m\n",
+ efd);
return -1;
}
}
return pid;
}
+
+/*
+ * Forks a child and executes a command to run on parallel
+ */
+
+#define BUF_SIZE (128*1024)
+int run(const char *cmd, char *const argv[])
+{
+ int child, error;
+ int ofd, efd;
+ fd_set rfds;
+ int maxfd;
+ int eof = 0;
+
+ child = run_piped(cmd, argv, NULL, &ofd, &efd);
+
+ FD_ZERO(&rfds);
+ FD_SET(ofd, &rfds);
+ FD_SET(efd, &rfds);
+
+ while (!eof) {
+ char *sptr , *eptr;
+ char rbuf[BUF_SIZE];
+ int bytes;
+ int is_stderr = 0;
+
+ maxfd = max(ofd, efd);
+ error = select(maxfd, &rfds, NULL, NULL, NULL);
+
+ if (error < 0) {
+ pr_err("Error with select: %m\n");
+ break;
+ }
+
+ if (FD_ISSET(ofd, &rfds)) {
+ bytes = read(ofd, rbuf, BUF_SIZE);
+ goto print;
+ }
+
+ if (FD_ISSET(efd, &rfds)) {
+ is_stderr = 1;
+ bytes = read(efd, rbuf, BUF_SIZE);
+ goto print;
+ }
+
+ pr_err("select() returned unknown fd\n");
+ break;
+
+print:
+ if (bytes < 0) {
+ pr_err("read() failed: %m\n");
+ break;
+ }
+
+ /*
+ * Workaround: When a process had die and it has only
+ * written to stderr, select() doesn't indicate that
+ * there might be something to read in stderr fd. To
+ * work around this issue, we try to read stderr just
+ * in case in order to ensure everything gets read.
+ */
+ if (bytes == 0) {
+ bytes = read(efd, rbuf, BUF_SIZE);
+ is_stderr = 1;
+ eof = 1;
+ }
+
+ sptr = eptr = rbuf;
+ while (bytes--) {
+ if (*eptr == '\n') {
+ *eptr = 0;
+ if (is_stderr)
+ pr_err("%s: stderr: %s\n",
+ cmd, sptr);
+ else
+ pr_info("%s: stdout: %s\n",
+ cmd, sptr);
+ sptr = eptr + 1;
+ }
+ eptr++;
+ }
+ }
+
+ close(ofd);
+ close(efd);
+
+ clear_zombie(child);
+
+ return 0;
+}
+
+int register_event_handler(struct event_handler *handler, int op)
+{
+ struct epoll_event ev;
+ int ret;
+ const char *str;
+
+ if (handler->fd <= 0) {
+ pr_err("Invalid file descriptor of %d\n", handler->fd);
+ return -1;
+ }
+
+ if (!handler->handle_event) {
+ pr_err("Handler callback missing\n");
+ return -1;
+ }
+
+ switch (op) {
+ case EPOLL_CTL_ADD:
+ str = "register";
+ break;
+
+ case EPOLL_CTL_MOD:
+ str = "modify";
+ break;
+
+ case EPOLL_CTL_DEL:
+ str = "deregister";
+ break;
+
+ default:
+ pr_err("Invalid op %d\n", op);
+ return -1;
+ }
+
+ pr_info("Doing a epoll %s for handler %s, fd %d\n", str,
+ handler->name, handler->fd);
+
+ ev.data.fd = handler->fd;
+ ev.data.ptr = handler;
+ ev.events = handler->events;
+ ret = epoll_ctl(epoll_fd, op, handler->fd, &ev);
+ if (ret) {
+ pr_err("Failed to do epoll_ctl %s: %m\n", str);
+ return -1;
+ }
+
+ return 0;
+}
+
+void _mutex_lock_acquired(struct mutex *lock, char *file, int line)
+{
+ lock->line = line;
+ lock->file = file;
+ pthread_getname_np(pthread_self(),
+ lock->owner_name, sizeof(lock->owner_name));
+}
+
+int _mutex_lock(struct mutex *lock, char *file, int line)
+{
+ int ret = 0;
+ int contended = 0;
+
+ if (!pthread_mutex_trylock(&lock->lock))
+ goto out_lock;
+
+ contended = 1;
+ pr_debug("Lock contention at %s:%d on lock %s acquired by %s at %s:%d\n",
+ file, line, lock->name,
+ lock->owner_name, lock->file, lock->line);
+
+ ret = pthread_mutex_lock(&lock->lock);
+ if (ret)
+ pr_err("Acquirin lock %s failed: %m, acquired on %s:%d\n",
+ lock->name, lock->file, lock->line);
+
+out_lock:
+ if (contended)
+ pr_debug("Lock %s acquired at %s:%d after contention\n",
+ lock->name, file, line);
+ _mutex_lock_acquired(lock, file, line);
+ return ret;
+}
+
+int _mutex_unlock(struct mutex *lock)
+{
+ lock->line = 0;
+ lock->file = NULL;
+ pthread_mutex_unlock(&lock->lock);
+
+ return 0;
+}