]> git.itanic.dy.fi Git - rrdd/blobdiff - process.c
Merge branch 'master' of /home/git/rrdd
[rrdd] / process.c
index 7de41b9abc2262908c543f36619bb871e73fffed..ae13277269dd4f68016fe7995a859bbd166ff70f 100644 (file)
--- a/process.c
+++ b/process.c
@@ -1,4 +1,3 @@
-#define _GNU_SOURCE
 #include <unistd.h>
 #include <fcntl.h>
 #include <sys/select.h>
@@ -21,6 +20,165 @@ static unsigned int job_count;
 static unsigned int jobs_pending;
 static unsigned int max_jobs_pending;
 
+struct work_struct {
+       const char *name;
+       int (*work_fn)(void *);
+       void *arg;
+       struct work_struct *next;
+};
+
+struct work_queue {
+       struct work_struct *work;
+       int length;
+       char *name;
+       struct mutex lock;
+};
+
+struct work_queue work_queues[WORK_PRIORITIES_NUM] = {
+       {
+               .name = "high priority",
+               .lock = {
+                       .name = "high_prio_queue",
+                       .lock = PTHREAD_MUTEX_INITIALIZER,
+               },
+       },
+       {
+               .name = "low priority",
+               .lock = {
+                       .name = "low_prio_queue",
+                       .lock = PTHREAD_MUTEX_INITIALIZER,
+               },
+       },
+};
+
+struct mutex work_pending_mutex = {
+       .name = "work_pending",
+       .lock = PTHREAD_MUTEX_INITIALIZER,
+};
+pthread_cond_t work_pending_cond = PTHREAD_COND_INITIALIZER;
+
+static int run_work_on_queue(struct work_queue *queue)
+{
+       struct work_struct *work;
+
+       mutex_lock(&queue->lock);
+
+       if (!queue->work) {
+               pr_info("No  work to run on queue %s\n", queue->name);
+               mutex_unlock(&queue->lock);
+               return 0;
+       }
+
+       /* Take next work */
+       work = queue->work;
+       queue->work = work->next;
+       queue->length--;
+
+       /*
+        * If queue is not empty, try waking up more workers. It is
+        * possible that when work were queued, the first worker did
+        * not wake up soon enough and
+        */
+       if (queue->length > 0)
+               pthread_cond_signal(&work_pending_cond);
+
+       mutex_unlock(&queue->lock);
+
+       pr_info("Executing work %s from queue %s, %d still pending\n",
+               work->name, queue->name, queue->length);
+
+       work->work_fn(work->arg);
+       pr_info("Work %s done\n", work->name);
+       free(work);
+
+       return 1;
+}
+
+static void *worker_thread(void *arg)
+{
+       int ret;
+
+       char name[16];
+
+       snprintf(name, sizeof(name), "worker%ld", (long)arg);
+       pthread_setname_np(pthread_self(), name);
+
+       while (1) {
+               while (1) {
+                       int prio, work_done = 0;
+
+                       /*
+                        * Execute as many works from the queues as
+                        * there are, starting from highest priority
+                        * queue
+                        */
+                       for (prio = 0; prio < WORK_PRIORITIES_NUM; prio++) {
+                               work_done =
+                                       run_work_on_queue(&work_queues[prio]);
+                               if (work_done)
+                                       break;
+                       }
+
+                       if (!work_done)
+                               break;
+               }
+
+               pr_info("Worker going to sleep\n");
+               ret = pthread_cond_wait(&work_pending_cond,
+                                       &work_pending_mutex.lock);
+               if (ret < 0)
+                       pr_err("Error: %m\n");
+
+               mutex_lock_acquired(&work_pending_mutex);
+
+               mutex_unlock(&work_pending_mutex);
+
+       }
+
+       return NULL;
+}
+
+int queue_work(unsigned int priority, char *name,
+       int (work_fn)(void *arg), void *arg)
+{
+       struct work_queue *queue;
+       struct work_struct *work, *last_work;
+
+       if (priority >= WORK_PRIORITIES_NUM) {
+               pr_err("Invalid priority: %d\n", priority);
+               return -EINVAL;
+       }
+
+       work = calloc(sizeof(*work), 1);
+
+       work->name = name;
+       work->work_fn = work_fn;
+       work->arg = arg;
+
+       queue = &work_queues[priority];
+
+       /* Insert new work at the end of the work queue */
+       mutex_lock(&queue->lock);
+
+       last_work = queue->work;
+       while (last_work && last_work->next)
+               last_work = last_work->next;
+
+       if (!last_work)
+               queue->work = work;
+       else
+               last_work->next = work;
+
+       pr_info("Inserted work %s in queue %s, with %d pending items\n",
+               work->name, queue->name, queue->length);
+       queue->length++;
+       mutex_unlock(&queue->lock);
+
+       pthread_cond_signal(&work_pending_cond);
+
+       return 0;
+}
+
 int get_child_count(void)
 {
        return child_count;
@@ -157,6 +315,8 @@ int init_jobcontrol(int max_jobs_requested)
        sigset_t sigmask;
        char buf[256];
        char match[8];
+       pthread_t *thread;
+       int i;
 
        if (pipe2(job_request_fd, O_NONBLOCK | O_CLOEXEC)) {
                pr_err("Failed to create pipe: %m\n");
@@ -180,12 +340,6 @@ int init_jobcontrol(int max_jobs_requested)
        sigemptyset(&sigmask);
        sigaddset(&sigmask, SIGCHLD);
 
-       /* Block SIGCHLD so that it becomes readable via signalfd */
-       ret = sigprocmask(SIG_BLOCK, &sigmask, NULL);
-       if (ret < 0) {
-               pr_err("Failed to sigprocmask: %m\n");
-       }
-
        signal_handler.fd = signalfd(-1, &sigmask, SFD_CLOEXEC);
        if (job_request_handler.fd < 0) {
                pr_err("Failed to create signal_fd: %m\n");
@@ -233,6 +387,11 @@ no_count_cpus:
        max_jobs_pending = max_jobs * 10 + 25;
        pr_info("Set maximum number of pending jobs to %d\n", max_jobs_pending);
 
+       /* Create worker threads */
+       thread = calloc(sizeof(*thread), max_jobs);
+       for (i = 0; i < max_jobs; i++)
+               pthread_create(&thread[i], NULL, worker_thread, (void *)i);
+
        return 0;
 }
 
@@ -421,7 +580,7 @@ int harvest_zombies(int pid)
                return 0;
 
        if (pid)
-               pr_debug("Waiting on pid %d, children left: %d\n", pid, 
+               pr_debug("Waiting on pid %d, children left: %d\n", pid,
                        child_count);
 
        do {
@@ -535,15 +694,15 @@ int run_piped_stream(const char *cmd, char *const argv[],
 
        if (stdinf)
                i = &ifd;
-       else 
+       else
                i = 0;
        if (stdoutf)
                o = &ofd;
-       else 
+       else
                o = 0;
        if (stderrf)
                e = &efd;
-       else 
+       else
                e = 0;
 
        pid = run_piped(cmd, argv, i, o, e);
@@ -592,9 +751,6 @@ int run(const char *cmd, char *const argv[])
        int maxfd;
        int eof = 0;
 
-       if ((child = do_fork()))
-           return child;
-
        child = run_piped(cmd, argv, NULL, &ofd, &efd);
 
        FD_ZERO(&rfds);
@@ -649,7 +805,7 @@ print:
                }
 
                sptr = eptr = rbuf;
-               while(bytes--) {
+               while (bytes--) {
                        if (*eptr == '\n') {
                                *eptr = 0;
                                if (is_stderr)
@@ -667,9 +823,8 @@ print:
        close(ofd);
        close(efd);
 
-       harvest_zombies(child); 
+       harvest_zombies(child);
 
-       exit(1);
        return 0;
 }
 
@@ -702,3 +857,38 @@ int register_event_handler(struct event_handler *handler)
 
        return 0;
 }
+
+void _mutex_lock_acquired(struct mutex *lock, char *file, int line)
+{
+       lock->line = line;
+       lock->file = file;
+}
+
+int _mutex_lock(struct mutex *lock, char *file, int line)
+{
+       int ret = 0;
+
+       if (!pthread_mutex_trylock(&lock->lock))
+               goto out_lock;
+
+       pr_info("Lock contention on lock %s on %s:%d\n",
+               lock->name, lock->file, lock->line);
+
+       ret = pthread_mutex_lock(&lock->lock);
+       if (ret)
+               pr_err("Acquirin lock %s failed: %m, acquired %s:%d\n",
+                       lock->name, lock->file, lock->line);
+
+out_lock:
+       _mutex_lock_acquired(lock, file, line);
+       return ret;
+}
+
+int _mutex_unlock(struct mutex *lock)
+{
+       lock->line = 0;
+       lock->file = NULL;
+       pthread_mutex_unlock(&lock->lock);
+
+       return 0;
+}