-#define _GNU_SOURCE
#include <unistd.h>
#include <fcntl.h>
#include <sys/select.h>
#include "process.h"
#include "debug.h"
+#include "utils.h"
-static int child_count;
-static int parent_count;
-static int job_request_fd[2];
-static int job_get_permission_fd[2];
static int epoll_fd;
static unsigned int max_jobs;
-static unsigned int job_count;
-static unsigned int jobs_pending;
static unsigned int max_jobs_pending;
-int get_child_count(void)
-{
- return child_count;
-}
+struct work_struct {
+ const char *name;
+ int (*work_fn)(void *);
+ void *arg;
+ struct work_struct *next;
+};
-int get_parent_count(void)
-{
- return parent_count;
-}
+struct work_queue {
+ struct work_struct *work;
+ int length;
+ char *name;
+ struct mutex lock;
+};
+
+struct work_queue work_queues[WORK_PRIORITIES_NUM] = {
+ {
+ .name = "high priority",
+ .lock = {
+ .name = "high_prio_queue",
+ .lock = PTHREAD_MUTEX_INITIALIZER,
+ },
+ },
+ {
+ .name = "low priority",
+ .lock = {
+ .name = "low_prio_queue",
+ .lock = PTHREAD_MUTEX_INITIALIZER,
+ },
+ },
+};
+
+struct mutex work_stats_mutex = {
+ .name = "work_stats",
+ .lock = PTHREAD_MUTEX_INITIALIZER,
+};
+static int workers_active;
+static int worker_count;
+
+static int job_notify_fd[2];
-static int handle_signals(struct event_handler *h)
+static int run_work_on_queue(struct work_queue *queue)
{
- struct signalfd_siginfo siginfo;
- int ret;
+ struct work_struct *work;
- ret = read(h->fd, &siginfo, sizeof(siginfo));
- if (ret < sizeof(siginfo)) {
- pr_err("Expected %zd from read, got %d: %m\n",
- sizeof(siginfo), ret);
- return -1;
- }
+ mutex_lock(&queue->lock);
- if (siginfo.ssi_signo != SIGCHLD) {
- pr_err("Unexpected signal %d, ignoring\n", siginfo.ssi_signo);
- return -1;
+ if (!queue->work) {
+ mutex_unlock(&queue->lock);
+ return 0;
}
- harvest_zombies(siginfo.ssi_pid);
+ /* Take next work */
+ work = queue->work;
+ queue->work = work->next;
+ queue->length--;
- return 0;
+ mutex_unlock(&queue->lock);
+
+ pr_info("Executing work %s from queue %s, %d still pending\n",
+ work->name, queue->name, queue->length);
+
+ work->work_fn(work->arg);
+ pr_info("Work %s done\n", work->name);
+ free(work);
+
+ return 1;
}
-static int grant_new_job(void)
+static void *worker_thread(void *arg)
{
int ret;
- char byte = 0;
+ char name[16];
- job_count++;
- pr_info("Granting new job. %d jobs currently and %d pending\n",
- job_count, jobs_pending);
+ mutex_lock(&work_stats_mutex);
+ snprintf(name, sizeof(name), "worker%d", worker_count);
+ worker_count++;
+ mutex_unlock(&work_stats_mutex);
- ret = write(job_get_permission_fd[1], &byte, 1);
- if (ret != 1) {
- pr_err("Failed to write 1 byte: %m\n");
- return -1;
- }
+ pthread_setname_np(pthread_self(), name);
+ pthread_detach(pthread_self());
- return 0;
-}
+ pr_info("Worker started\n");
-static int deny_job(void)
-{
- int ret;
- char byte = -1;
+ /* Execute all high priority work from the queue */
+ while (run_work_on_queue(&work_queues[WORK_PRIORITY_HIGH]))
+ ;
+ /*
+ * All high priority work is now done, see if we have enough
+ * workers executing low priority worl. Continue from there if
+ * needed.
+ */
+ mutex_lock(&work_stats_mutex);
+ if (workers_active > max_jobs)
+ goto out_unlock;
- pr_info("Denying new job. %d jobs currently and %d pending, "
- "limit of pending jobs is %d\n",
- job_count, jobs_pending, max_jobs_pending);
+ mutex_unlock(&work_stats_mutex);
- ret = write(job_get_permission_fd[1], &byte, 1);
- if (ret != 1) {
- pr_err("Failed to write 1 byte: %m\n");
- return -1;
- }
+ /*
+ * Start executing the low priority work. Drop the nice value
+ * as this really is low priority stuff
+ */
+ ret = nice(19);
+ pr_info("Worker priority dropped to %d\n", ret);
- return 0;
+ while (run_work_on_queue(&work_queues[WORK_PRIORITY_LOW]))
+ ;
+
+ /* All done, exit */
+ mutex_lock(&work_stats_mutex);
+out_unlock:
+ workers_active--;
+ pr_info("Worker exiting, %d left active\n",
+ workers_active);
+
+ /*
+ * Last exiting worker zeroes the worker_count. This
+ * ensures next time we start spawning worker threads
+ * the first thread will have number zero on its name.
+ */
+ if (!workers_active)
+ worker_count = 0;
+
+ /*
+ * Kick the job poller. If all jobs were active at this point
+ * the scheduler thread will wait indefinitely until someone
+ * tells it to do something. We may now know when next job is
+ * available, so it is better for the scheduler to recalculate
+ * its sleep time.
+ */
+ notify_job_request();
+
+ mutex_unlock(&work_stats_mutex);
+
+ return NULL;
}
-static int handle_job_request(struct event_handler *h)
+int queue_work(unsigned int priority, char *name,
+ int (work_fn)(void *arg), void *arg)
{
- int ret, pid;
+ pthread_t *thread;
+ struct work_queue *queue;
+ struct work_struct *work, *last_work;
- ret = read(job_request_fd[0], &pid, sizeof(pid));
- if (ret < 0) {
- pr_err("Failed to read: %m\n");
- return -1;
+ if (priority >= WORK_PRIORITIES_NUM) {
+ pr_err("Invalid priority: %d\n", priority);
+ return -EINVAL;
}
- if (ret == 0) {
- pr_info("Read zero bytes\n");
+ work = calloc(sizeof(*work), 1);
+
+ work->name = name;
+ work->work_fn = work_fn;
+ work->arg = arg;
+
+ queue = &work_queues[priority];
+
+ /* Insert new work at the end of the work queue */
+ mutex_lock(&queue->lock);
+
+ last_work = queue->work;
+ while (last_work && last_work->next)
+ last_work = last_work->next;
+
+ if (!last_work)
+ queue->work = work;
+ else
+ last_work->next = work;
+
+ pr_info("Inserted work %s in queue %s, with %d pending items\n",
+ work->name, queue->name, queue->length);
+ queue->length++;
+ mutex_unlock(&queue->lock);
+
+ mutex_lock(&work_stats_mutex);
+ pr_info("workers_active: %d, priority: %d\n", workers_active, priority);
+ if (priority != WORK_PRIORITY_HIGH && workers_active >= max_jobs) {
+ mutex_unlock(&work_stats_mutex);
return 0;
}
+ workers_active++;
+ mutex_unlock(&work_stats_mutex);
- if (pid > 0) {
- if (job_count >= max_jobs) {
- if (jobs_pending < max_jobs_pending)
- jobs_pending++;
- else
- deny_job();
- } else {
- ret = grant_new_job();
- return 0;
- }
- } else if (pid < 0) {
- if (job_count > max_jobs)
- pr_err("BUG: Job %u jobs exceeded limit %u\n",
- job_count, max_jobs);
-
- pr_info("Job %d finished\n", -pid);
- job_count--;
- if (jobs_pending) {
- jobs_pending--;
- ret = grant_new_job();
- return 0;
- }
- }
+ pr_info("Creating new worker thread\n");
+ /* We need a worker thread, create one */
+ thread = calloc(sizeof(*thread), 1);
+ pthread_create(thread, NULL, worker_thread, NULL);
+
+ free(thread);
return 0;
}
-struct event_handler signal_handler = {
- .handle_event = handle_signals,
- .events = EPOLLIN,
- .name = "signal",
-};
+static int job_notify_handler(struct event_handler *h)
+{
+ int ret;
+ char buf[64];
-struct event_handler job_request_handler = {
- .handle_event = handle_job_request,
- .events = EPOLLIN,
- .name = "job_request",
-};
+ ret = read(job_notify_fd[0], buf, sizeof(buf));
+ if (ret < 0)
+ pr_err("Failed to read: %m\n");
+
+ return 0;
+}
/*
* Initialize the jobcontrol.
*/
int init_jobcontrol(int max_jobs_requested)
{
+ static struct event_handler ev;
FILE *file;
int ret;
- sigset_t sigmask;
char buf[256];
char match[8];
- if (pipe2(job_request_fd, O_NONBLOCK | O_CLOEXEC)) {
- pr_err("Failed to create pipe: %m\n");
- return -1;
- }
-
- if (pipe2(job_get_permission_fd, O_CLOEXEC)) {
- pr_err("Failed to create pipe: %m\n");
- return -1;
- }
-
epoll_fd = epoll_create(1);
if (epoll_fd == -1) {
pr_err("Failed to epoll_create(): %m\n");
return -1;
}
- job_request_handler.fd = job_request_fd[0];
- register_event_handler(&job_request_handler);
-
- sigemptyset(&sigmask);
- sigaddset(&sigmask, SIGCHLD);
-
- /* Block SIGCHLD so that it becomes readable via signalfd */
- ret = sigprocmask(SIG_BLOCK, &sigmask, NULL);
- if (ret < 0) {
- pr_err("Failed to sigprocmask: %m\n");
- }
-
- signal_handler.fd = signalfd(-1, &sigmask, SFD_CLOEXEC);
- if (job_request_handler.fd < 0) {
- pr_err("Failed to create signal_fd: %m\n");
- return -1;
- }
-
- register_event_handler(&signal_handler);
-
if (max_jobs_requested > 0) {
max_jobs = max_jobs_requested;
goto no_count_cpus;
max_jobs++;
}
+ ret = pipe(job_notify_fd);
+ if (ret) {
+ pr_err("pipe() failed: %m\n");
+ return ret;
+ }
+
+ ev.fd = job_notify_fd[0];
+ ev.events = EPOLLIN;
+ ev.handle_event = job_notify_handler;
+ ev.name = "job_notify";
+ register_event_handler(&ev, EPOLL_CTL_ADD);
+
open_fail:
read_fail:
fclose(file);
no_count_cpus:
pr_info("Set maximum number of parallel jobs to %d\n", max_jobs);
- max_jobs_pending = max_jobs * 50 + 25;
+ max_jobs_pending = max_jobs * 10 + 25;
pr_info("Set maximum number of pending jobs to %d\n", max_jobs_pending);
return 0;
job_handler->handle_event(job_handler);
out:
- pr_info("Jobs active: %u, pending: %u\n", job_count, jobs_pending);
+ pr_info("Workers active: %u\n", workers_active);
return ret;
}
-/*
- * Per process flag indicating whether this child has requested fork
- * limiting. If it has, it must also tell the master parent when it
- * has died so that the parent can give next pending job permission to
- * go.
- */
-static int is_limited_fork;
+int notify_job_request(void)
+{
+ int ret;
+ char byte = 0;
+
+ ret = write(job_notify_fd[1], &byte, sizeof(byte));
+ if (ret < 0)
+ pr_err("Failed to write: %m\n");
+
+ return 0;
+}
int do_fork(void)
{
}
if (child) {
- child_count++;
- pr_debug("Fork %d, child %d\n", child_count, child);
+ pr_debug("Fork child %d\n", child);
return child;
}
- /*
- * Also do not notify the master parent the death of this
- * child. Only childs that have been created with
- * do_fork_limited() can have this flag set.
- */
- is_limited_fork = 0;
-
- /*
- * Close unused ends of the job control pipes. Only the parent
- * which controls the jobs may have the write end open of the
- * job_get_permission_fd and the read end of the
- * job_request_fd. Failing to close the pipe ends properly
- * will cause the childs to wait forever for the run
- * permission in case parent dies prematurely.
- *
- * Note! The file descriptor must be closed once and only
- * once. They are marked to -1 to make it impossible for
- * subseqent do_fork() calls from closing them again (in which
- * case some other file descriptor might already be reserved
- * for the same number) and prevent accidentally closing some
- * innocent file descriptors that are still in use.
- */
- if (job_get_permission_fd[1] >= 0) {
- close(job_get_permission_fd[1]);
- job_get_permission_fd[1] = -1;
- }
- if (job_request_fd[0] >= 0) {
- close(job_request_fd[0]);
- job_request_fd[0] = -1;
- }
-
- /* reset child's child count */
- child_count = 0;
- parent_count++;
return 0;
}
-static int request_fork(int request)
-{
- int pid = getpid();
-
- pid = request > 0 ? pid : -pid;
-
- return write(job_request_fd[1], &pid, sizeof(pid));
-}
-
-static void limited_fork_exit_handler(void)
-{
- if (is_limited_fork)
- request_fork(-1);
-}
-
-/*
- * Like do_fork(), but allow the child continue only after the global
- * job count is low enough.
- *
- * We allow the parent to continue other more important activities but
- * child respects the limit of global active processes.
- */
-int do_fork_limited(void)
-{
- int child, ret;
- char byte;
-
- child = do_fork();
- if (child)
- return child;
-
- /* Remember to notify the parent when we are done */
- atexit(limited_fork_exit_handler);
- is_limited_fork = 1;
-
- pr_debug("Requesting permission to go\n");
-
- /* Signal the parent that we are here, waiting to go */
- request_fork(1);
-
- /*
- * The parent will tell us when we can continue. If there were
- * multiple children waiting for their turn to run only one
- * will be reading the content byte from the pipe and getting
- * the permission to run.
- */
- ret = read(job_get_permission_fd[0], &byte, sizeof(byte));
- if (ret == 0)
- pr_err("Error requesting run, did the parent die?\n");
-
- if (ret < 0)
- pr_err("Job control request failure: %m\n");
-
- if (byte < 0) {
- pr_info("Did not get permission to execute. Terminating\n");
-
- /*
- * Avoid running exit handler, that would tell the
- * parent we died normally and decrement the job
- * counters.
- */
- raise(SIGKILL);
- }
-
- pr_debug("Continuing\n");
- return child;
-}
-
-int harvest_zombies(int pid)
+int clear_zombie(int pid)
{
int status;
struct rusage rusage;
char *status_str = NULL;
int code = 0;
- if (child_count == 0)
- return 0;
-
if (pid)
- pr_debug("Waiting on pid %d, children left: %d\n", pid,
- child_count);
+ pr_debug("Waiting on pid %d\n", pid);
do {
pid = wait4(pid, &status, 0, &rusage);
/* Wait until the child has become a zombie */
} while (!WIFEXITED(status) && !WIFSIGNALED(status));
- child_count--;
if (WIFEXITED(status)) {
status_str = "exited with status";
code = WEXITSTATUS(status);
status_str = "killed by signal";
code = WTERMSIG(status);
}
- pr_debug("pid %d: %s %d. Children left: %d\n", pid,
- status_str, code, child_count);
+ pr_debug("pid %d: %s %d.\n", pid,
+ status_str, code);
pr_debug("pid %d: User time: %ld.%03lds, System %ld.%03lds\n", pid,
(long)rusage.ru_utime.tv_sec, rusage.ru_utime.tv_usec / 1000,
(long)rusage.ru_stime.tv_sec, rusage.ru_stime.tv_usec / 1000);
if (stdoutfd && pipe(ofd)) {
pr_err("pipe() failed: %m\n");
- return -1;
+ goto close_ifd;
}
if (stderrfd && pipe(efd)) {
pr_err("pipe() failed: %m\n");
- return -1;
+ goto close_ofd;
}
pid = do_fork();
exit(1);
return 0;
+
+close_ofd:
+ if (stdoutfd) {
+ close(ofd[0]);
+ close(ofd[1]);
+ }
+close_ifd:
+ if (stdinfd) {
+ close(ifd[0]);
+ close(ifd[1]);
+ }
+
+ return -1;
}
/*
if (stdinf)
i = &ifd;
- else
+ else
i = 0;
if (stdoutf)
o = &ofd;
- else
+ else
o = 0;
if (stderrf)
e = &efd;
- else
+ else
e = 0;
pid = run_piped(cmd, argv, i, o, e);
if (stdinf) {
- *stdinf = fdopen(ifd, "r");
+ *stdinf = fdopen(ifd, "w");
if (*stdinf == NULL) {
pr_err("Error opening file stream for fd %d: %m\n",
ifd);
* Forks a child and executes a command to run on parallel
*/
-#define max(a,b) (a) < (b) ? (b) : (a)
#define BUF_SIZE (128*1024)
int run(const char *cmd, char *const argv[])
{
int maxfd;
int eof = 0;
- if ((child = do_fork()))
- return child;
-
child = run_piped(cmd, argv, NULL, &ofd, &efd);
FD_ZERO(&rfds);
}
sptr = eptr = rbuf;
- while(bytes--) {
+ while (bytes--) {
if (*eptr == '\n') {
*eptr = 0;
if (is_stderr)
else
pr_info("%s: stdout: %s\n",
cmd, sptr);
- sptr = eptr;
+ sptr = eptr + 1;
}
eptr++;
}
close(ofd);
close(efd);
- harvest_zombies(child);
+ clear_zombie(child);
- exit(1);
return 0;
}
-int register_event_handler(struct event_handler *handler)
+int register_event_handler(struct event_handler *handler, int op)
{
struct epoll_event ev;
int ret;
+ const char *str;
if (handler->fd <= 0) {
pr_err("Invalid file descriptor of %d\n", handler->fd);
return -1;
}
- pr_info("Registering handler for %s, fd %d\n",
+ switch (op) {
+ case EPOLL_CTL_ADD:
+ str = "register";
+ break;
+
+ case EPOLL_CTL_MOD:
+ str = "modify";
+ break;
+
+ case EPOLL_CTL_DEL:
+ str = "deregister";
+ break;
+
+ default:
+ pr_err("Invalid op %d\n", op);
+ return -1;
+ }
+
+ pr_info("Doing a epoll %s for handler %s, fd %d\n", str,
handler->name, handler->fd);
ev.data.fd = handler->fd;
ev.data.ptr = handler;
ev.events = handler->events;
- ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, handler->fd, &ev);
+ ret = epoll_ctl(epoll_fd, op, handler->fd, &ev);
if (ret) {
- pr_err("Failed to add epoll_fd: %m\n");
+ pr_err("Failed to do epoll_ctl %s: %m\n", str);
return -1;
}
return 0;
}
+
+void _mutex_lock_acquired(struct mutex *lock, char *file, int line)
+{
+ lock->line = line;
+ lock->file = file;
+ pthread_getname_np(pthread_self(),
+ lock->owner_name, sizeof(lock->owner_name));
+}
+
+int _mutex_lock(struct mutex *lock, char *file, int line)
+{
+ int ret = 0;
+ int contended = 0;
+
+ if (!pthread_mutex_trylock(&lock->lock))
+ goto out_lock;
+
+ contended = 1;
+ pr_debug("Lock contention at %s:%d on lock %s acquired by %s at %s:%d\n",
+ file, line, lock->name,
+ lock->owner_name, lock->file, lock->line);
+
+ ret = pthread_mutex_lock(&lock->lock);
+ if (ret)
+ pr_err("Acquirin lock %s failed: %m, acquired on %s:%d\n",
+ lock->name, lock->file, lock->line);
+
+out_lock:
+ if (contended)
+ pr_debug("Lock %s acquired at %s:%d after contention\n",
+ lock->name, file, line);
+ _mutex_lock_acquired(lock, file, line);
+ return ret;
+}
+
+int _mutex_unlock(struct mutex *lock)
+{
+ lock->line = 0;
+ lock->file = NULL;
+ pthread_mutex_unlock(&lock->lock);
+
+ return 0;
+}