]> git.itanic.dy.fi Git - rrdd/commitdiff
Merge branch 'master' of /home/git/rrdd
authorTimo Kokkonen <timo.t.kokkonen@iki.fi>
Sat, 9 Jul 2016 07:05:36 +0000 (10:05 +0300)
committerTimo Kokkonen <timo.t.kokkonen@iki.fi>
Sat, 9 Jul 2016 07:05:36 +0000 (10:05 +0300)
Signed-off-by: Timo Kokkonen <timo.t.kokkonen@iki.fi>
Makefile
debug.c
process.c
process.h
rrdtool.c

index cdf6dd746339baed0c5d5b9a8909b2f7fdd5421d..2cafd3d0345f3f32484fae90ff650f7890c63142 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -1,6 +1,6 @@
 CC=gcc
 LD=ld
-CFLAGS=-Wall -O2 -g -fPIC
+CFLAGS=-Wall -O2 -g -fPIC -D_GNU_SOURCE
 
 RRDD_OBJS= main.o process.o rrdtool.o parser.o built_in_parsers.o string.o \
                debug.o config.o plugin_manager.o
@@ -27,7 +27,8 @@ default: rrdd
 all: rrdd $(ALL_PARSERS)
 
 rrdd: $(RRDD_OBJS)
-       $(QUIET_LINK)$(CC) -o rrdd $(RRDD_OBJS) -lconfig -ldl -rdynamic
+       $(QUIET_LINK)$(CC) -o rrdd $(RRDD_OBJS) -lconfig -ldl -rdynamic \
+               -lpthread
 
 onewire_parser.so: $(ONEWIRE_PARSER_OBJS)
        $(QUIET_LINK)$(CC) $(CFLAGS) -lownet -shared -fPIC $< -o $@
diff --git a/debug.c b/debug.c
index 8bc70fc605bcb499a61c7dcaf7b12a5c11f92324..fe0fc9b139ba661d1c42df4adf7966f47020c985 100644 (file)
--- a/debug.c
+++ b/debug.c
@@ -4,8 +4,9 @@
 #include <time.h>
 #include <unistd.h>
 #include <stdarg.h>
-#include "process.h"
+#include <pthread.h>
 
+#include "process.h"
 #include "debug.h"
 
 const char red_color[] = {
@@ -43,6 +44,7 @@ void print_trace(const char *file, int line, int color, int l,
        char time[32];
        char trace[1024];
        char all_trace[1538];
+       char thread_name[16];
        const char *color_str = assign_color(color);
        int ret;
 
@@ -55,10 +57,13 @@ void print_trace(const char *file, int line, int color, int l,
 
        strftime(time, sizeof(time), "%d.%m.%Y %T", localtime(&t));
 
+       pthread_getname_np(pthread_self(), thread_name, sizeof(thread_name));
+
        ret = snprintf(all_trace, sizeof(all_trace),
-               "%s%s [%5d.%d] %s:%d %s %s",
+               "%s%s [%5d.%d] %s %s:%d %s %s",
                color_str, time,
                getpid(), get_parent_count(),
+               thread_name,
                file, line, normal_color, trace);
 
        ret = write(logfile_fd, all_trace, ret);
index 7de41b9abc2262908c543f36619bb871e73fffed..ae13277269dd4f68016fe7995a859bbd166ff70f 100644 (file)
--- a/process.c
+++ b/process.c
@@ -1,4 +1,3 @@
-#define _GNU_SOURCE
 #include <unistd.h>
 #include <fcntl.h>
 #include <sys/select.h>
@@ -21,6 +20,165 @@ static unsigned int job_count;
 static unsigned int jobs_pending;
 static unsigned int max_jobs_pending;
 
+struct work_struct {
+       const char *name;
+       int (*work_fn)(void *);
+       void *arg;
+       struct work_struct *next;
+};
+
+struct work_queue {
+       struct work_struct *work;
+       int length;
+       char *name;
+       struct mutex lock;
+};
+
+struct work_queue work_queues[WORK_PRIORITIES_NUM] = {
+       {
+               .name = "high priority",
+               .lock = {
+                       .name = "high_prio_queue",
+                       .lock = PTHREAD_MUTEX_INITIALIZER,
+               },
+       },
+       {
+               .name = "low priority",
+               .lock = {
+                       .name = "low_prio_queue",
+                       .lock = PTHREAD_MUTEX_INITIALIZER,
+               },
+       },
+};
+
+struct mutex work_pending_mutex = {
+       .name = "work_pending",
+       .lock = PTHREAD_MUTEX_INITIALIZER,
+};
+pthread_cond_t work_pending_cond = PTHREAD_COND_INITIALIZER;
+
+static int run_work_on_queue(struct work_queue *queue)
+{
+       struct work_struct *work;
+
+       mutex_lock(&queue->lock);
+
+       if (!queue->work) {
+               pr_info("No  work to run on queue %s\n", queue->name);
+               mutex_unlock(&queue->lock);
+               return 0;
+       }
+
+       /* Take next work */
+       work = queue->work;
+       queue->work = work->next;
+       queue->length--;
+
+       /*
+        * If queue is not empty, try waking up more workers. It is
+        * possible that when work were queued, the first worker did
+        * not wake up soon enough and
+        */
+       if (queue->length > 0)
+               pthread_cond_signal(&work_pending_cond);
+
+       mutex_unlock(&queue->lock);
+
+       pr_info("Executing work %s from queue %s, %d still pending\n",
+               work->name, queue->name, queue->length);
+
+       work->work_fn(work->arg);
+       pr_info("Work %s done\n", work->name);
+       free(work);
+
+       return 1;
+}
+
+static void *worker_thread(void *arg)
+{
+       int ret;
+
+       char name[16];
+
+       snprintf(name, sizeof(name), "worker%ld", (long)arg);
+       pthread_setname_np(pthread_self(), name);
+
+       while (1) {
+               while (1) {
+                       int prio, work_done = 0;
+
+                       /*
+                        * Execute as many works from the queues as
+                        * there are, starting from highest priority
+                        * queue
+                        */
+                       for (prio = 0; prio < WORK_PRIORITIES_NUM; prio++) {
+                               work_done =
+                                       run_work_on_queue(&work_queues[prio]);
+                               if (work_done)
+                                       break;
+                       }
+
+                       if (!work_done)
+                               break;
+               }
+
+               pr_info("Worker going to sleep\n");
+               ret = pthread_cond_wait(&work_pending_cond,
+                                       &work_pending_mutex.lock);
+               if (ret < 0)
+                       pr_err("Error: %m\n");
+
+               mutex_lock_acquired(&work_pending_mutex);
+
+               mutex_unlock(&work_pending_mutex);
+
+       }
+
+       return NULL;
+}
+
+int queue_work(unsigned int priority, char *name,
+       int (work_fn)(void *arg), void *arg)
+{
+       struct work_queue *queue;
+       struct work_struct *work, *last_work;
+
+       if (priority >= WORK_PRIORITIES_NUM) {
+               pr_err("Invalid priority: %d\n", priority);
+               return -EINVAL;
+       }
+
+       work = calloc(sizeof(*work), 1);
+
+       work->name = name;
+       work->work_fn = work_fn;
+       work->arg = arg;
+
+       queue = &work_queues[priority];
+
+       /* Insert new work at the end of the work queue */
+       mutex_lock(&queue->lock);
+
+       last_work = queue->work;
+       while (last_work && last_work->next)
+               last_work = last_work->next;
+
+       if (!last_work)
+               queue->work = work;
+       else
+               last_work->next = work;
+
+       pr_info("Inserted work %s in queue %s, with %d pending items\n",
+               work->name, queue->name, queue->length);
+       queue->length++;
+       mutex_unlock(&queue->lock);
+
+       pthread_cond_signal(&work_pending_cond);
+
+       return 0;
+}
+
 int get_child_count(void)
 {
        return child_count;
@@ -157,6 +315,8 @@ int init_jobcontrol(int max_jobs_requested)
        sigset_t sigmask;
        char buf[256];
        char match[8];
+       pthread_t *thread;
+       int i;
 
        if (pipe2(job_request_fd, O_NONBLOCK | O_CLOEXEC)) {
                pr_err("Failed to create pipe: %m\n");
@@ -180,12 +340,6 @@ int init_jobcontrol(int max_jobs_requested)
        sigemptyset(&sigmask);
        sigaddset(&sigmask, SIGCHLD);
 
-       /* Block SIGCHLD so that it becomes readable via signalfd */
-       ret = sigprocmask(SIG_BLOCK, &sigmask, NULL);
-       if (ret < 0) {
-               pr_err("Failed to sigprocmask: %m\n");
-       }
-
        signal_handler.fd = signalfd(-1, &sigmask, SFD_CLOEXEC);
        if (job_request_handler.fd < 0) {
                pr_err("Failed to create signal_fd: %m\n");
@@ -233,6 +387,11 @@ no_count_cpus:
        max_jobs_pending = max_jobs * 10 + 25;
        pr_info("Set maximum number of pending jobs to %d\n", max_jobs_pending);
 
+       /* Create worker threads */
+       thread = calloc(sizeof(*thread), max_jobs);
+       for (i = 0; i < max_jobs; i++)
+               pthread_create(&thread[i], NULL, worker_thread, (void *)i);
+
        return 0;
 }
 
@@ -421,7 +580,7 @@ int harvest_zombies(int pid)
                return 0;
 
        if (pid)
-               pr_debug("Waiting on pid %d, children left: %d\n", pid, 
+               pr_debug("Waiting on pid %d, children left: %d\n", pid,
                        child_count);
 
        do {
@@ -535,15 +694,15 @@ int run_piped_stream(const char *cmd, char *const argv[],
 
        if (stdinf)
                i = &ifd;
-       else 
+       else
                i = 0;
        if (stdoutf)
                o = &ofd;
-       else 
+       else
                o = 0;
        if (stderrf)
                e = &efd;
-       else 
+       else
                e = 0;
 
        pid = run_piped(cmd, argv, i, o, e);
@@ -592,9 +751,6 @@ int run(const char *cmd, char *const argv[])
        int maxfd;
        int eof = 0;
 
-       if ((child = do_fork()))
-           return child;
-
        child = run_piped(cmd, argv, NULL, &ofd, &efd);
 
        FD_ZERO(&rfds);
@@ -649,7 +805,7 @@ print:
                }
 
                sptr = eptr = rbuf;
-               while(bytes--) {
+               while (bytes--) {
                        if (*eptr == '\n') {
                                *eptr = 0;
                                if (is_stderr)
@@ -667,9 +823,8 @@ print:
        close(ofd);
        close(efd);
 
-       harvest_zombies(child); 
+       harvest_zombies(child);
 
-       exit(1);
        return 0;
 }
 
@@ -702,3 +857,38 @@ int register_event_handler(struct event_handler *handler)
 
        return 0;
 }
+
+void _mutex_lock_acquired(struct mutex *lock, char *file, int line)
+{
+       lock->line = line;
+       lock->file = file;
+}
+
+int _mutex_lock(struct mutex *lock, char *file, int line)
+{
+       int ret = 0;
+
+       if (!pthread_mutex_trylock(&lock->lock))
+               goto out_lock;
+
+       pr_info("Lock contention on lock %s on %s:%d\n",
+               lock->name, lock->file, lock->line);
+
+       ret = pthread_mutex_lock(&lock->lock);
+       if (ret)
+               pr_err("Acquirin lock %s failed: %m, acquired %s:%d\n",
+                       lock->name, lock->file, lock->line);
+
+out_lock:
+       _mutex_lock_acquired(lock, file, line);
+       return ret;
+}
+
+int _mutex_unlock(struct mutex *lock)
+{
+       lock->line = 0;
+       lock->file = NULL;
+       pthread_mutex_unlock(&lock->lock);
+
+       return 0;
+}
index ae9db12b97457f90e5d4256c0e42e3ba5ffdb506..c3b5c23653381297400e88d6cad5b3498cd82ef8 100644 (file)
--- a/process.h
+++ b/process.h
@@ -9,6 +9,7 @@
 #include <error.h>
 #include <errno.h>
 #include <stdint.h>
+#include <pthread.h>
 
 struct event_handler;
 
@@ -21,6 +22,14 @@ struct event_handler {
        char *name;
 };
 
+struct mutex {
+       pthread_mutex_t lock;
+       int line;
+       char *file;
+       time_t lock_time;
+       char *name;
+};
+
 int register_event_handler(struct event_handler *handler);
 
 int get_child_count(void);
@@ -37,5 +46,21 @@ int run_piped(const char *cmd, char *const argv[],
 int run_piped_stream(const char *cmd, char *const argv[],
                     FILE **stdinf, FILE **stdoutf, FILE **stderrf);
 
+void _mutex_lock_acquired(struct mutex *lock, char *file, int line);
+int _mutex_lock(struct mutex *lock, char *file, int line);
+int _mutex_unlock(struct mutex *lock);
+
+#define mutex_lock(lock) _mutex_lock(lock, __FILE__, __LINE__)
+#define mutex_unlock(lock) _mutex_unlock(lock)
+#define mutex_lock_acquired(lock) _mutex_lock_acquired(lock, __FILE__, __LINE__)
+
+enum {
+       WORK_PRIORITY_HIGH,
+       WORK_PRIORITY_LOW,
+       WORK_PRIORITIES_NUM,
+};
+
+int queue_work(unsigned int priority, char *name,
+       int (work_fn)(void *arg), void *arg);
 
 #endif
index 909e1233ce43becfe2ac8b8ef475ec3ddd086705..c8754176fd66ac6855422356914103e7af6b1b47 100644 (file)
--- a/rrdtool.c
+++ b/rrdtool.c
@@ -45,10 +45,6 @@ int rrdtool_draw_image(struct rrd_image *image)
        time_t t = time(0);
        const char *updatestr = "Last update %d.%m.%Y %T (%Z)";
 
-       pid = do_fork_limited();
-       if (pid)
-               return pid;
-
        pr_info("Drawing image %s\n", image->image_filename);
 
        tmpfile[0] = 0;
@@ -95,19 +91,19 @@ int rrdtool_draw_image(struct rrd_image *image)
 
        add_arg(args, argcnt, argstr, idx, "COMMENT: %s\\c", timestamp);
 
-       pid = run(cmd, args);
-       harvest_zombies(pid);
+       run(cmd, args);
 
        rename(tmpfile, image->image_filename);
 
-       exit(0);
+       return 0;
 }
 
 int rrdtool_draw_images(struct rrd_image **image)
 {
        int i;
        for (i = 0; image[i]; i++)
-               rrdtool_draw_image(image[i]);
+               queue_work(WORK_PRIORITY_LOW, "rrdtool_draw_image",
+                       rrdtool_draw_image, image[i]);
 
        return 0;
 }
@@ -255,7 +251,7 @@ static int write_to_logfile(struct rrd_database *rrd, const char *data)
        return ret < 0 ? ret : 0;
 }
 
-int rrdtool_update_data(struct rrd_database *rrd)
+static int do_rrdtool_update_data(struct rrd_database *rrd)
 {
        int pid;
        char data[RRD_DATA_MAX_LEN + 3]; /* 3 == "N:" + NULL termination */
@@ -270,10 +266,6 @@ int rrdtool_update_data(struct rrd_database *rrd)
        };
        int l;
 
-       rrd->last_update = time(0);
-       if (do_fork())
-               return 0;
-
        l = sprintf(data, "N:");
 
        if (rrd->parser && rrd->parser->parse) {
@@ -285,26 +277,28 @@ int rrdtool_update_data(struct rrd_database *rrd)
                sanitize_rrd_update_data(data + l);
                write_to_logfile(rrd, data);
 
-               pid = run(cmd, cmdline);
-               harvest_zombies(pid);
+               run(cmd, cmdline);
        }
 
        if (rrd->pre_draw_cmd && !strcmp(rrd->pre_draw_cmd[0], "shell")) {
-               pid = run(rrd->pre_draw_cmd[1], &rrd->pre_draw_cmd[1]);
-               harvest_zombies(pid);
+               run(rrd->pre_draw_cmd[1], &rrd->pre_draw_cmd[1]);
        }
 
        if (rrd->images)
                rrdtool_draw_images(rrd->images);
 
-       while (harvest_zombies(0));
+       if (rrd->post_draw_cmd && !strcmp(rrd->post_draw_cmd[0], "shell"))
+               run(rrd->post_draw_cmd[1], &rrd->post_draw_cmd[1]);
 
-       if (rrd->post_draw_cmd && !strcmp(rrd->post_draw_cmd[0], "shell")) {
-               pid = run(rrd->post_draw_cmd[1], &rrd->post_draw_cmd[1]);
-               harvest_zombies(pid);
-       }
+       return 0;
+}
 
-       exit(0);
+int rrdtool_update_data(struct rrd_database *rrd)
+{
+       rrd->last_update = time(0);
+
+       return queue_work(WORK_PRIORITY_HIGH, "rrdtool_update_data",
+                       do_rrdtool_update_data, rrd);
 }
 
 /*
@@ -368,7 +362,7 @@ static int create_database(struct rrd_database *db)
 //     char cmd[] = "echo";
        char *args[512], argstr[ARGSTR_LEN];
        int idx = 0, argcnt = 0;
-       int child, i;
+       int i;
 
        if (!db->filename) {
                pr_err("Database %s missing database filename\n", db->name);
@@ -404,9 +398,7 @@ static int create_database(struct rrd_database *db)
                        db->archives[i].rows);
        }
 
-       child = run(cmd, args);
-
-       harvest_zombies(child);
+       run(cmd, args);
 
        return 0;
 }