From 5f5e86612232a4056f34d67eb13fdac08b8dd2b4 Mon Sep 17 00:00:00 2001 From: Timo Kokkonen Date: Fri, 8 Jul 2016 22:44:39 +0300 Subject: [PATCH] process.c: Introduce work queues Work queues are a way to execute function calls asynchronously. Each work is placed on a fifo queue where worker threads pick them up one by one. Priority levels are also supported, right now two levels are available. All high priority works are executed before workers pick up low priority works. Signed-off-by: Timo Kokkonen --- process.c | 175 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ process.h | 9 +++ 2 files changed, 184 insertions(+) diff --git a/process.c b/process.c index ad47196..77ec174 100644 --- a/process.c +++ b/process.c @@ -20,6 +20,165 @@ static unsigned int job_count; static unsigned int jobs_pending; static unsigned int max_jobs_pending; +struct work_struct { + const char *name; + int (*work_fn)(void *); + void *arg; + struct work_struct *next; +}; + +struct work_queue { + struct work_struct *work; + int length; + char *name; + struct mutex lock; +}; + +struct work_queue work_queues[WORK_PRIORITIES_NUM] = { + { + .name = "high priority", + .lock = { + .name = "high_prio_queue", + .lock = PTHREAD_MUTEX_INITIALIZER, + }, + }, + { + .name = "low priority", + .lock = { + .name = "low_prio_queue", + .lock = PTHREAD_MUTEX_INITIALIZER, + }, + }, +}; + +struct mutex work_pending_mutex = { + .name = "work_pending", + .lock = PTHREAD_MUTEX_INITIALIZER, +}; +pthread_cond_t work_pending_cond = PTHREAD_COND_INITIALIZER; + +static int run_work_on_queue(struct work_queue *queue) +{ + struct work_struct *work; + + mutex_lock(&queue->lock); + + if (!queue->work) { + pr_info("No work to run on queue %s\n", queue->name); + mutex_unlock(&queue->lock); + return 0; + } + + /* Take next work */ + work = queue->work; + queue->work = work->next; + queue->length--; + + /* + * If queue is not empty, try waking up more workers. It is + * possible that when work were queued, the first worker did + * not wake up soon enough and + */ + if (queue->length > 0) + pthread_cond_signal(&work_pending_cond); + + mutex_unlock(&queue->lock); + + pr_info("Executing work %s from queue %s, %d still pending\n", + work->name, queue->name, queue->length); + + work->work_fn(work->arg); + pr_info("Work %s done\n", work->name); + free(work); + + return 1; +} + +static void *worker_thread(void *arg) +{ + int ret; + + char name[16]; + + snprintf(name, sizeof(name), "worker%ld", (long)arg); + pthread_setname_np(pthread_self(), name); + + while (1) { + while (1) { + int prio, work_done = 0; + + /* + * Execute as many works from the queues as + * there are, starting from highest priority + * queue + */ + for (prio = 0; prio < WORK_PRIORITIES_NUM; prio++) { + work_done = + run_work_on_queue(&work_queues[prio]); + if (work_done) + break; + } + + if (!work_done) + break; + } + + pr_info("Worker going to sleep\n"); + ret = pthread_cond_wait(&work_pending_cond, + &work_pending_mutex.lock); + if (ret < 0) + pr_err("Error: %m\n"); + + mutex_lock_acquired(&work_pending_mutex); + + mutex_unlock(&work_pending_mutex); + + } + + return NULL; +} + +int queue_work(unsigned int priority, char *name, + int (work_fn)(void *arg), void *arg) +{ + struct work_queue *queue; + struct work_struct *work, *last_work; + + if (priority >= WORK_PRIORITIES_NUM) { + pr_err("Invalid priority: %d\n", priority); + return -EINVAL; + } + + work = calloc(sizeof(*work), 1); + + work->name = name; + work->work_fn = work_fn; + work->arg = arg; + + queue = &work_queues[priority]; + + /* Insert new work at the end of the work queue */ + mutex_lock(&queue->lock); + + last_work = queue->work; + while (last_work && last_work->next) + last_work = last_work->next; + + if (!last_work) + queue->work = work; + else + last_work->next = work; + + pr_info("Inserted work %s in queue %s, with %d pending items\n", + work->name, queue->name, queue->length); + queue->length++; + mutex_unlock(&queue->lock); + + pthread_cond_signal(&work_pending_cond); + + return 0; +} + int get_child_count(void) { return child_count; @@ -156,6 +315,8 @@ int init_jobcontrol(int max_jobs_requested) sigset_t sigmask; char buf[256]; char match[8]; + pthread_t *thread; + int i; if (pipe2(job_request_fd, O_NONBLOCK | O_CLOEXEC)) { pr_err("Failed to create pipe: %m\n"); @@ -232,6 +393,20 @@ no_count_cpus: max_jobs_pending = max_jobs * 10 + 25; pr_info("Set maximum number of pending jobs to %d\n", max_jobs_pending); + /* Create worker threads */ + thread = calloc(sizeof(*thread), max_jobs); + for (i = 0; i < max_jobs; i++) + pthread_create(&thread[i], NULL, worker_thread, (void *)i); + + /* + * Magic sleep. There are too many fork() calls at the moment + * so we must ensure our threads don't print anything out + * while a fork() is executed. Otherwise the child will + * inherit glibc internal locks while they are locked, and + * function calls such as printf will deadlock. + */ + sleep(1); + return 0; } diff --git a/process.h b/process.h index 2589ea9..c3b5c23 100644 --- a/process.h +++ b/process.h @@ -54,4 +54,13 @@ int _mutex_unlock(struct mutex *lock); #define mutex_unlock(lock) _mutex_unlock(lock) #define mutex_lock_acquired(lock) _mutex_lock_acquired(lock, __FILE__, __LINE__) +enum { + WORK_PRIORITY_HIGH, + WORK_PRIORITY_LOW, + WORK_PRIORITIES_NUM, +}; + +int queue_work(unsigned int priority, char *name, + int (work_fn)(void *arg), void *arg); + #endif -- 2.45.0