]> git.itanic.dy.fi Git - rrdd/commitdiff
process.c: Introduce work queues
authorTimo Kokkonen <timo.t.kokkonen@iki.fi>
Fri, 8 Jul 2016 19:44:39 +0000 (22:44 +0300)
committerTimo Kokkonen <timo.t.kokkonen@iki.fi>
Fri, 8 Jul 2016 19:51:33 +0000 (22:51 +0300)
Work queues are a way to execute function calls asynchronously. Each
work is placed on a fifo queue where worker threads pick them up one
by one.

Priority levels are also supported, right now two levels are
available. All high priority works are executed before workers pick up
low priority works.

Signed-off-by: Timo Kokkonen <timo.t.kokkonen@iki.fi>
process.c
process.h

index ad47196501ecc3a70b773f252e170ba2dfabe9bb..77ec174945b109872163a96f2786f421f244bed6 100644 (file)
--- a/process.c
+++ b/process.c
@@ -20,6 +20,165 @@ static unsigned int job_count;
 static unsigned int jobs_pending;
 static unsigned int max_jobs_pending;
 
+struct work_struct {
+       const char *name;
+       int (*work_fn)(void *);
+       void *arg;
+       struct work_struct *next;
+};
+
+struct work_queue {
+       struct work_struct *work;
+       int length;
+       char *name;
+       struct mutex lock;
+};
+
+struct work_queue work_queues[WORK_PRIORITIES_NUM] = {
+       {
+               .name = "high priority",
+               .lock = {
+                       .name = "high_prio_queue",
+                       .lock = PTHREAD_MUTEX_INITIALIZER,
+               },
+       },
+       {
+               .name = "low priority",
+               .lock = {
+                       .name = "low_prio_queue",
+                       .lock = PTHREAD_MUTEX_INITIALIZER,
+               },
+       },
+};
+
+struct mutex work_pending_mutex = {
+       .name = "work_pending",
+       .lock = PTHREAD_MUTEX_INITIALIZER,
+};
+pthread_cond_t work_pending_cond = PTHREAD_COND_INITIALIZER;
+
+static int run_work_on_queue(struct work_queue *queue)
+{
+       struct work_struct *work;
+
+       mutex_lock(&queue->lock);
+
+       if (!queue->work) {
+               pr_info("No  work to run on queue %s\n", queue->name);
+               mutex_unlock(&queue->lock);
+               return 0;
+       }
+
+       /* Take next work */
+       work = queue->work;
+       queue->work = work->next;
+       queue->length--;
+
+       /*
+        * If queue is not empty, try waking up more workers. It is
+        * possible that when work were queued, the first worker did
+        * not wake up soon enough and
+        */
+       if (queue->length > 0)
+               pthread_cond_signal(&work_pending_cond);
+
+       mutex_unlock(&queue->lock);
+
+       pr_info("Executing work %s from queue %s, %d still pending\n",
+               work->name, queue->name, queue->length);
+
+       work->work_fn(work->arg);
+       pr_info("Work %s done\n", work->name);
+       free(work);
+
+       return 1;
+}
+
+static void *worker_thread(void *arg)
+{
+       int ret;
+
+       char name[16];
+
+       snprintf(name, sizeof(name), "worker%ld", (long)arg);
+       pthread_setname_np(pthread_self(), name);
+
+       while (1) {
+               while (1) {
+                       int prio, work_done = 0;
+
+                       /*
+                        * Execute as many works from the queues as
+                        * there are, starting from highest priority
+                        * queue
+                        */
+                       for (prio = 0; prio < WORK_PRIORITIES_NUM; prio++) {
+                               work_done =
+                                       run_work_on_queue(&work_queues[prio]);
+                               if (work_done)
+                                       break;
+                       }
+
+                       if (!work_done)
+                               break;
+               }
+
+               pr_info("Worker going to sleep\n");
+               ret = pthread_cond_wait(&work_pending_cond,
+                                       &work_pending_mutex.lock);
+               if (ret < 0)
+                       pr_err("Error: %m\n");
+
+               mutex_lock_acquired(&work_pending_mutex);
+
+               mutex_unlock(&work_pending_mutex);
+
+       }
+
+       return NULL;
+}
+
+int queue_work(unsigned int priority, char *name,
+       int (work_fn)(void *arg), void *arg)
+{
+       struct work_queue *queue;
+       struct work_struct *work, *last_work;
+
+       if (priority >= WORK_PRIORITIES_NUM) {
+               pr_err("Invalid priority: %d\n", priority);
+               return -EINVAL;
+       }
+
+       work = calloc(sizeof(*work), 1);
+
+       work->name = name;
+       work->work_fn = work_fn;
+       work->arg = arg;
+
+       queue = &work_queues[priority];
+
+       /* Insert new work at the end of the work queue */
+       mutex_lock(&queue->lock);
+
+       last_work = queue->work;
+       while (last_work && last_work->next)
+               last_work = last_work->next;
+
+       if (!last_work)
+               queue->work = work;
+       else
+               last_work->next = work;
+
+       pr_info("Inserted work %s in queue %s, with %d pending items\n",
+               work->name, queue->name, queue->length);
+       queue->length++;
+       mutex_unlock(&queue->lock);
+
+       pthread_cond_signal(&work_pending_cond);
+
+       return 0;
+}
+
 int get_child_count(void)
 {
        return child_count;
@@ -156,6 +315,8 @@ int init_jobcontrol(int max_jobs_requested)
        sigset_t sigmask;
        char buf[256];
        char match[8];
+       pthread_t *thread;
+       int i;
 
        if (pipe2(job_request_fd, O_NONBLOCK | O_CLOEXEC)) {
                pr_err("Failed to create pipe: %m\n");
@@ -232,6 +393,20 @@ no_count_cpus:
        max_jobs_pending = max_jobs * 10 + 25;
        pr_info("Set maximum number of pending jobs to %d\n", max_jobs_pending);
 
+       /* Create worker threads */
+       thread = calloc(sizeof(*thread), max_jobs);
+       for (i = 0; i < max_jobs; i++)
+               pthread_create(&thread[i], NULL, worker_thread, (void *)i);
+
+       /*
+        * Magic sleep. There are too many fork() calls at the moment
+        * so we must ensure our threads don't print anything out
+        * while a fork() is executed. Otherwise the child will
+        * inherit glibc internal locks while they are locked, and
+        * function calls such as printf will deadlock.
+        */
+       sleep(1);
+
        return 0;
 }
 
index 2589ea9a5251174c6632bf0b87620912c54e1cc3..c3b5c23653381297400e88d6cad5b3498cd82ef8 100644 (file)
--- a/process.h
+++ b/process.h
@@ -54,4 +54,13 @@ int _mutex_unlock(struct mutex *lock);
 #define mutex_unlock(lock) _mutex_unlock(lock)
 #define mutex_lock_acquired(lock) _mutex_lock_acquired(lock, __FILE__, __LINE__)
 
+enum {
+       WORK_PRIORITY_HIGH,
+       WORK_PRIORITY_LOW,
+       WORK_PRIORITIES_NUM,
+};
+
+int queue_work(unsigned int priority, char *name,
+       int (work_fn)(void *arg), void *arg);
+
 #endif