]> git.itanic.dy.fi Git - rrdd/blobdiff - process.c
process.c: Introduce work queues
[rrdd] / process.c
index ad47196501ecc3a70b773f252e170ba2dfabe9bb..77ec174945b109872163a96f2786f421f244bed6 100644 (file)
--- a/process.c
+++ b/process.c
@@ -20,6 +20,165 @@ static unsigned int job_count;
 static unsigned int jobs_pending;
 static unsigned int max_jobs_pending;
 
+struct work_struct {
+       const char *name;
+       int (*work_fn)(void *);
+       void *arg;
+       struct work_struct *next;
+};
+
+struct work_queue {
+       struct work_struct *work;
+       int length;
+       char *name;
+       struct mutex lock;
+};
+
+struct work_queue work_queues[WORK_PRIORITIES_NUM] = {
+       {
+               .name = "high priority",
+               .lock = {
+                       .name = "high_prio_queue",
+                       .lock = PTHREAD_MUTEX_INITIALIZER,
+               },
+       },
+       {
+               .name = "low priority",
+               .lock = {
+                       .name = "low_prio_queue",
+                       .lock = PTHREAD_MUTEX_INITIALIZER,
+               },
+       },
+};
+
+struct mutex work_pending_mutex = {
+       .name = "work_pending",
+       .lock = PTHREAD_MUTEX_INITIALIZER,
+};
+pthread_cond_t work_pending_cond = PTHREAD_COND_INITIALIZER;
+
+static int run_work_on_queue(struct work_queue *queue)
+{
+       struct work_struct *work;
+
+       mutex_lock(&queue->lock);
+
+       if (!queue->work) {
+               pr_info("No  work to run on queue %s\n", queue->name);
+               mutex_unlock(&queue->lock);
+               return 0;
+       }
+
+       /* Take next work */
+       work = queue->work;
+       queue->work = work->next;
+       queue->length--;
+
+       /*
+        * If queue is not empty, try waking up more workers. It is
+        * possible that when work were queued, the first worker did
+        * not wake up soon enough and
+        */
+       if (queue->length > 0)
+               pthread_cond_signal(&work_pending_cond);
+
+       mutex_unlock(&queue->lock);
+
+       pr_info("Executing work %s from queue %s, %d still pending\n",
+               work->name, queue->name, queue->length);
+
+       work->work_fn(work->arg);
+       pr_info("Work %s done\n", work->name);
+       free(work);
+
+       return 1;
+}
+
+static void *worker_thread(void *arg)
+{
+       int ret;
+
+       char name[16];
+
+       snprintf(name, sizeof(name), "worker%ld", (long)arg);
+       pthread_setname_np(pthread_self(), name);
+
+       while (1) {
+               while (1) {
+                       int prio, work_done = 0;
+
+                       /*
+                        * Execute as many works from the queues as
+                        * there are, starting from highest priority
+                        * queue
+                        */
+                       for (prio = 0; prio < WORK_PRIORITIES_NUM; prio++) {
+                               work_done =
+                                       run_work_on_queue(&work_queues[prio]);
+                               if (work_done)
+                                       break;
+                       }
+
+                       if (!work_done)
+                               break;
+               }
+
+               pr_info("Worker going to sleep\n");
+               ret = pthread_cond_wait(&work_pending_cond,
+                                       &work_pending_mutex.lock);
+               if (ret < 0)
+                       pr_err("Error: %m\n");
+
+               mutex_lock_acquired(&work_pending_mutex);
+
+               mutex_unlock(&work_pending_mutex);
+
+       }
+
+       return NULL;
+}
+
+int queue_work(unsigned int priority, char *name,
+       int (work_fn)(void *arg), void *arg)
+{
+       struct work_queue *queue;
+       struct work_struct *work, *last_work;
+
+       if (priority >= WORK_PRIORITIES_NUM) {
+               pr_err("Invalid priority: %d\n", priority);
+               return -EINVAL;
+       }
+
+       work = calloc(sizeof(*work), 1);
+
+       work->name = name;
+       work->work_fn = work_fn;
+       work->arg = arg;
+
+       queue = &work_queues[priority];
+
+       /* Insert new work at the end of the work queue */
+       mutex_lock(&queue->lock);
+
+       last_work = queue->work;
+       while (last_work && last_work->next)
+               last_work = last_work->next;
+
+       if (!last_work)
+               queue->work = work;
+       else
+               last_work->next = work;
+
+       pr_info("Inserted work %s in queue %s, with %d pending items\n",
+               work->name, queue->name, queue->length);
+       queue->length++;
+       mutex_unlock(&queue->lock);
+
+       pthread_cond_signal(&work_pending_cond);
+
+       return 0;
+}
+
 int get_child_count(void)
 {
        return child_count;
@@ -156,6 +315,8 @@ int init_jobcontrol(int max_jobs_requested)
        sigset_t sigmask;
        char buf[256];
        char match[8];
+       pthread_t *thread;
+       int i;
 
        if (pipe2(job_request_fd, O_NONBLOCK | O_CLOEXEC)) {
                pr_err("Failed to create pipe: %m\n");
@@ -232,6 +393,20 @@ no_count_cpus:
        max_jobs_pending = max_jobs * 10 + 25;
        pr_info("Set maximum number of pending jobs to %d\n", max_jobs_pending);
 
+       /* Create worker threads */
+       thread = calloc(sizeof(*thread), max_jobs);
+       for (i = 0; i < max_jobs; i++)
+               pthread_create(&thread[i], NULL, worker_thread, (void *)i);
+
+       /*
+        * Magic sleep. There are too many fork() calls at the moment
+        * so we must ensure our threads don't print anything out
+        * while a fork() is executed. Otherwise the child will
+        * inherit glibc internal locks while they are locked, and
+        * function calls such as printf will deadlock.
+        */
+       sleep(1);
+
        return 0;
 }