#include "process.h"
#include "debug.h"
+#include "utils.h"
static int epoll_fd;
static unsigned int max_jobs;
-static unsigned int job_count;
-static unsigned int jobs_pending;
static unsigned int max_jobs_pending;
struct work_struct {
},
};
-struct mutex work_pending_mutex = {
- .name = "work_pending",
+struct mutex work_stats_mutex = {
+ .name = "work_stats",
.lock = PTHREAD_MUTEX_INITIALIZER,
};
-pthread_cond_t work_pending_cond = PTHREAD_COND_INITIALIZER;
+static int workers_active;
+static int worker_count;
+
+static int job_notify_fd[2];
static int run_work_on_queue(struct work_queue *queue)
{
queue->work = work->next;
queue->length--;
- /*
- * If queue is not empty, try waking up more workers. It is
- * possible that when work were queued, the first worker did
- * not wake up soon enough and
- */
- if (queue->length > 0)
- pthread_cond_signal(&work_pending_cond);
-
mutex_unlock(&queue->lock);
pr_info("Executing work %s from queue %s, %d still pending\n",
static void *worker_thread(void *arg)
{
int ret;
-
char name[16];
- snprintf(name, sizeof(name), "worker%ld", (long)arg);
+ mutex_lock(&work_stats_mutex);
+ snprintf(name, sizeof(name), "worker%d", worker_count);
+ worker_count++;
+ mutex_unlock(&work_stats_mutex);
+
pthread_setname_np(pthread_self(), name);
+ pthread_detach(pthread_self());
- while (1) {
- while (1) {
- int prio, work_done = 0;
-
- /*
- * Execute as many works from the queues as
- * there are, starting from highest priority
- * queue
- */
- for (prio = 0; prio < WORK_PRIORITIES_NUM; prio++) {
- work_done =
- run_work_on_queue(&work_queues[prio]);
- if (work_done)
- break;
- }
+ pr_info("Worker started\n");
- if (!work_done)
- break;
- }
+ /* Execute all high priority work from the queue */
+ while (run_work_on_queue(&work_queues[WORK_PRIORITY_HIGH]))
+ ;
+ /*
+ * All high priority work is now done, see if we have enough
+ * workers executing low priority worl. Continue from there if
+ * needed.
+ */
+ mutex_lock(&work_stats_mutex);
+ if (workers_active > max_jobs)
+ goto out_unlock;
- pr_info("Worker going to sleep\n");
- ret = pthread_cond_wait(&work_pending_cond,
- &work_pending_mutex.lock);
- if (ret < 0)
- pr_err("Error: %m\n");
+ mutex_unlock(&work_stats_mutex);
- mutex_lock_acquired(&work_pending_mutex);
+ /*
+ * Start executing the low priority work. Drop the nice value
+ * as this really is low priority stuff
+ */
+ ret = nice(19);
+ pr_info("Worker priority dropped to %d\n", ret);
- mutex_unlock(&work_pending_mutex);
+ while (run_work_on_queue(&work_queues[WORK_PRIORITY_LOW]))
+ ;
- }
+ /* All done, exit */
+ mutex_lock(&work_stats_mutex);
+out_unlock:
+ workers_active--;
+ pr_info("Worker exiting, %d left active\n",
+ workers_active);
+
+ /*
+ * Last exiting worker zeroes the worker_count. This
+ * ensures next time we start spawning worker threads
+ * the first thread will have number zero on its name.
+ */
+ if (!workers_active)
+ worker_count = 0;
+
+ /*
+ * Kick the job poller. If all jobs were active at this point
+ * the scheduler thread will wait indefinitely until someone
+ * tells it to do something. We may now know when next job is
+ * available, so it is better for the scheduler to recalculate
+ * its sleep time.
+ */
+ notify_job_request();
+
+ mutex_unlock(&work_stats_mutex);
return NULL;
}
int queue_work(unsigned int priority, char *name,
int (work_fn)(void *arg), void *arg)
{
+ pthread_t *thread;
struct work_queue *queue;
struct work_struct *work, *last_work;
queue->length++;
mutex_unlock(&queue->lock);
- pthread_cond_signal(&work_pending_cond);
+ mutex_lock(&work_stats_mutex);
+ pr_info("workers_active: %d, priority: %d\n", workers_active, priority);
+ if (priority != WORK_PRIORITY_HIGH && workers_active >= max_jobs) {
+ mutex_unlock(&work_stats_mutex);
+ return 0;
+ }
+ workers_active++;
+ mutex_unlock(&work_stats_mutex);
+
+ pr_info("Creating new worker thread\n");
+ /* We need a worker thread, create one */
+ thread = calloc(sizeof(*thread), 1);
+ pthread_create(thread, NULL, worker_thread, NULL);
+
+ free(thread);
+
+ return 0;
+}
+
+static int job_notify_handler(struct event_handler *h)
+{
+ int ret;
+ char buf[64];
+
+ ret = read(job_notify_fd[0], buf, sizeof(buf));
+ if (ret < 0)
+ pr_err("Failed to read: %m\n");
return 0;
}
*/
int init_jobcontrol(int max_jobs_requested)
{
+ static struct event_handler ev;
FILE *file;
int ret;
char buf[256];
char match[8];
- pthread_t *thread;
- long int i;
epoll_fd = epoll_create(1);
if (epoll_fd == -1) {
max_jobs++;
}
+ ret = pipe(job_notify_fd);
+ if (ret) {
+ pr_err("pipe() failed: %m\n");
+ return ret;
+ }
+
+ ev.fd = job_notify_fd[0];
+ ev.events = EPOLLIN;
+ ev.handle_event = job_notify_handler;
+ ev.name = "job_notify";
+ register_event_handler(&ev, EPOLL_CTL_ADD);
+
open_fail:
read_fail:
fclose(file);
max_jobs_pending = max_jobs * 10 + 25;
pr_info("Set maximum number of pending jobs to %d\n", max_jobs_pending);
- /* Create worker threads */
- thread = calloc(sizeof(*thread), max_jobs);
- for (i = 0; i < max_jobs; i++)
- pthread_create(&thread[i], NULL, worker_thread, (void *)i);
-
return 0;
}
job_handler->handle_event(job_handler);
out:
- pr_info("Jobs active: %u, pending: %u\n", job_count, jobs_pending);
+ pr_info("Workers active: %u\n", workers_active);
return ret;
}
+int notify_job_request(void)
+{
+ int ret;
+ char byte = 0;
+
+ ret = write(job_notify_fd[1], &byte, sizeof(byte));
+ if (ret < 0)
+ pr_err("Failed to write: %m\n");
+
+ return 0;
+}
+
int do_fork(void)
{
int child;
if (stdoutfd && pipe(ofd)) {
pr_err("pipe() failed: %m\n");
- return -1;
+ goto close_ifd;
}
if (stderrfd && pipe(efd)) {
pr_err("pipe() failed: %m\n");
- return -1;
+ goto close_ofd;
}
pid = do_fork();
exit(1);
return 0;
+
+close_ofd:
+ if (stdoutfd) {
+ close(ofd[0]);
+ close(ofd[1]);
+ }
+close_ifd:
+ if (stdinfd) {
+ close(ifd[0]);
+ close(ifd[1]);
+ }
+
+ return -1;
}
/*
pid = run_piped(cmd, argv, i, o, e);
if (stdinf) {
- *stdinf = fdopen(ifd, "r");
+ *stdinf = fdopen(ifd, "w");
if (*stdinf == NULL) {
pr_err("Error opening file stream for fd %d: %m\n",
ifd);
* Forks a child and executes a command to run on parallel
*/
-#define max(a,b) (a) < (b) ? (b) : (a)
#define BUF_SIZE (128*1024)
int run(const char *cmd, char *const argv[])
{
return 0;
}
-int register_event_handler(struct event_handler *handler)
+int register_event_handler(struct event_handler *handler, int op)
{
struct epoll_event ev;
int ret;
+ const char *str;
if (handler->fd <= 0) {
pr_err("Invalid file descriptor of %d\n", handler->fd);
return -1;
}
- pr_info("Registering handler for %s, fd %d\n",
+ switch (op) {
+ case EPOLL_CTL_ADD:
+ str = "register";
+ break;
+
+ case EPOLL_CTL_MOD:
+ str = "modify";
+ break;
+
+ case EPOLL_CTL_DEL:
+ str = "deregister";
+ break;
+
+ default:
+ pr_err("Invalid op %d\n", op);
+ return -1;
+ }
+
+ pr_info("Doing a epoll %s for handler %s, fd %d\n", str,
handler->name, handler->fd);
ev.data.fd = handler->fd;
ev.data.ptr = handler;
ev.events = handler->events;
- ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, handler->fd, &ev);
+ ret = epoll_ctl(epoll_fd, op, handler->fd, &ev);
if (ret) {
- pr_err("Failed to add epoll_fd: %m\n");
+ pr_err("Failed to do epoll_ctl %s: %m\n", str);
return -1;
}
{
lock->line = line;
lock->file = file;
+ pthread_getname_np(pthread_self(),
+ lock->owner_name, sizeof(lock->owner_name));
}
int _mutex_lock(struct mutex *lock, char *file, int line)
{
int ret = 0;
+ int contended = 0;
if (!pthread_mutex_trylock(&lock->lock))
goto out_lock;
- pr_info("Lock contention on lock %s on %s:%d\n",
- lock->name, lock->file, lock->line);
+ contended = 1;
+ pr_debug("Lock contention at %s:%d on lock %s acquired by %s at %s:%d\n",
+ file, line, lock->name,
+ lock->owner_name, lock->file, lock->line);
ret = pthread_mutex_lock(&lock->lock);
if (ret)
- pr_err("Acquirin lock %s failed: %m, acquired %s:%d\n",
+ pr_err("Acquirin lock %s failed: %m, acquired on %s:%d\n",
lock->name, lock->file, lock->line);
out_lock:
+ if (contended)
+ pr_debug("Lock %s acquired at %s:%d after contention\n",
+ lock->name, file, line);
_mutex_lock_acquired(lock, file, line);
return ret;
}