]> git.itanic.dy.fi Git - rrdd/commitdiff
Merge branch 'master' of /home/git/rrdd
authorTimo Kokkonen <timo.t.kokkonen@iki.fi>
Sat, 9 Jul 2016 07:05:36 +0000 (10:05 +0300)
committerTimo Kokkonen <timo.t.kokkonen@iki.fi>
Sat, 9 Jul 2016 07:05:36 +0000 (10:05 +0300)
Signed-off-by: Timo Kokkonen <timo.t.kokkonen@iki.fi>
1  2 
process.c
rrdtool.c

diff --combined process.c
index b4319fe869aa92188c90762e987670c8362419ba,7de41b9abc2262908c543f36619bb871e73fffed..ae13277269dd4f68016fe7995a859bbd166ff70f
+++ b/process.c
@@@ -1,3 -1,4 +1,3 @@@
 -#define _GNU_SOURCE
  #include <unistd.h>
  #include <fcntl.h>
  #include <sys/select.h>
@@@ -20,165 -21,6 +20,165 @@@ static unsigned int job_count
  static unsigned int jobs_pending;
  static unsigned int max_jobs_pending;
  
 +struct work_struct {
 +      const char *name;
 +      int (*work_fn)(void *);
 +      void *arg;
 +      struct work_struct *next;
 +};
 +
 +struct work_queue {
 +      struct work_struct *work;
 +      int length;
 +      char *name;
 +      struct mutex lock;
 +};
 +
 +struct work_queue work_queues[WORK_PRIORITIES_NUM] = {
 +      {
 +              .name = "high priority",
 +              .lock = {
 +                      .name = "high_prio_queue",
 +                      .lock = PTHREAD_MUTEX_INITIALIZER,
 +              },
 +      },
 +      {
 +              .name = "low priority",
 +              .lock = {
 +                      .name = "low_prio_queue",
 +                      .lock = PTHREAD_MUTEX_INITIALIZER,
 +              },
 +      },
 +};
 +
 +struct mutex work_pending_mutex = {
 +      .name = "work_pending",
 +      .lock = PTHREAD_MUTEX_INITIALIZER,
 +};
 +pthread_cond_t work_pending_cond = PTHREAD_COND_INITIALIZER;
 +
 +static int run_work_on_queue(struct work_queue *queue)
 +{
 +      struct work_struct *work;
 +
 +      mutex_lock(&queue->lock);
 +
 +      if (!queue->work) {
 +              pr_info("No  work to run on queue %s\n", queue->name);
 +              mutex_unlock(&queue->lock);
 +              return 0;
 +      }
 +
 +      /* Take next work */
 +      work = queue->work;
 +      queue->work = work->next;
 +      queue->length--;
 +
 +      /*
 +       * If queue is not empty, try waking up more workers. It is
 +       * possible that when work were queued, the first worker did
 +       * not wake up soon enough and
 +       */
 +      if (queue->length > 0)
 +              pthread_cond_signal(&work_pending_cond);
 +
 +      mutex_unlock(&queue->lock);
 +
 +      pr_info("Executing work %s from queue %s, %d still pending\n",
 +              work->name, queue->name, queue->length);
 +
 +      work->work_fn(work->arg);
 +      pr_info("Work %s done\n", work->name);
 +      free(work);
 +
 +      return 1;
 +}
 +
 +static void *worker_thread(void *arg)
 +{
 +      int ret;
 +
 +      char name[16];
 +
 +      snprintf(name, sizeof(name), "worker%ld", (long)arg);
 +      pthread_setname_np(pthread_self(), name);
 +
 +      while (1) {
 +              while (1) {
 +                      int prio, work_done = 0;
 +
 +                      /*
 +                       * Execute as many works from the queues as
 +                       * there are, starting from highest priority
 +                       * queue
 +                       */
 +                      for (prio = 0; prio < WORK_PRIORITIES_NUM; prio++) {
 +                              work_done =
 +                                      run_work_on_queue(&work_queues[prio]);
 +                              if (work_done)
 +                                      break;
 +                      }
 +
 +                      if (!work_done)
 +                              break;
 +              }
 +
 +              pr_info("Worker going to sleep\n");
 +              ret = pthread_cond_wait(&work_pending_cond,
 +                                      &work_pending_mutex.lock);
 +              if (ret < 0)
 +                      pr_err("Error: %m\n");
 +
 +              mutex_lock_acquired(&work_pending_mutex);
 +
 +              mutex_unlock(&work_pending_mutex);
 +
 +      }
 +
 +      return NULL;
 +}
 +
 +int queue_work(unsigned int priority, char *name,
 +      int (work_fn)(void *arg), void *arg)
 +{
 +      struct work_queue *queue;
 +      struct work_struct *work, *last_work;
 +
 +      if (priority >= WORK_PRIORITIES_NUM) {
 +              pr_err("Invalid priority: %d\n", priority);
 +              return -EINVAL;
 +      }
 +
 +      work = calloc(sizeof(*work), 1);
 +
 +      work->name = name;
 +      work->work_fn = work_fn;
 +      work->arg = arg;
 +
 +      queue = &work_queues[priority];
 +
 +      /* Insert new work at the end of the work queue */
 +      mutex_lock(&queue->lock);
 +
 +      last_work = queue->work;
 +      while (last_work && last_work->next)
 +              last_work = last_work->next;
 +
 +      if (!last_work)
 +              queue->work = work;
 +      else
 +              last_work->next = work;
 +
 +      pr_info("Inserted work %s in queue %s, with %d pending items\n",
 +              work->name, queue->name, queue->length);
 +      queue->length++;
 +      mutex_unlock(&queue->lock);
 +
 +      pthread_cond_signal(&work_pending_cond);
 +
 +      return 0;
 +}
 +
  int get_child_count(void)
  {
        return child_count;
@@@ -315,8 -157,6 +315,8 @@@ int init_jobcontrol(int max_jobs_reques
        sigset_t sigmask;
        char buf[256];
        char match[8];
 +      pthread_t *thread;
 +      int i;
  
        if (pipe2(job_request_fd, O_NONBLOCK | O_CLOEXEC)) {
                pr_err("Failed to create pipe: %m\n");
        sigemptyset(&sigmask);
        sigaddset(&sigmask, SIGCHLD);
  
 -      /* Block SIGCHLD so that it becomes readable via signalfd */
 -      ret = sigprocmask(SIG_BLOCK, &sigmask, NULL);
 -      if (ret < 0) {
 -              pr_err("Failed to sigprocmask: %m\n");
 -      }
 -
        signal_handler.fd = signalfd(-1, &sigmask, SFD_CLOEXEC);
        if (job_request_handler.fd < 0) {
                pr_err("Failed to create signal_fd: %m\n");
@@@ -387,11 -233,6 +387,11 @@@ no_count_cpus
        max_jobs_pending = max_jobs * 10 + 25;
        pr_info("Set maximum number of pending jobs to %d\n", max_jobs_pending);
  
 +      /* Create worker threads */
 +      thread = calloc(sizeof(*thread), max_jobs);
 +      for (i = 0; i < max_jobs; i++)
 +              pthread_create(&thread[i], NULL, worker_thread, (void *)i);
 +
        return 0;
  }
  
@@@ -580,7 -421,7 +580,7 @@@ int harvest_zombies(int pid
                return 0;
  
        if (pid)
 -              pr_debug("Waiting on pid %d, children left: %d\n", pid, 
 +              pr_debug("Waiting on pid %d, children left: %d\n", pid,
                        child_count);
  
        do {
@@@ -694,15 -535,15 +694,15 @@@ int run_piped_stream(const char *cmd, c
  
        if (stdinf)
                i = &ifd;
 -      else 
 +      else
                i = 0;
        if (stdoutf)
                o = &ofd;
 -      else 
 +      else
                o = 0;
        if (stderrf)
                e = &efd;
 -      else 
 +      else
                e = 0;
  
        pid = run_piped(cmd, argv, i, o, e);
@@@ -751,6 -592,9 +751,6 @@@ int run(const char *cmd, char *const ar
        int maxfd;
        int eof = 0;
  
 -      if ((child = do_fork()))
 -          return child;
 -
        child = run_piped(cmd, argv, NULL, &ofd, &efd);
  
        FD_ZERO(&rfds);
@@@ -805,7 -649,7 +805,7 @@@ print
                }
  
                sptr = eptr = rbuf;
 -              while(bytes--) {
 +              while (bytes--) {
                        if (*eptr == '\n') {
                                *eptr = 0;
                                if (is_stderr)
                                else
                                        pr_info("%s: stdout: %s\n",
                                                cmd, sptr);
-                               sptr = eptr;
+                               sptr = eptr + 1;
                        }
                        eptr++;
                }
        close(ofd);
        close(efd);
  
 -      harvest_zombies(child); 
 +      harvest_zombies(child);
  
 -      exit(1);
        return 0;
  }
  
@@@ -857,38 -702,3 +857,38 @@@ int register_event_handler(struct event
  
        return 0;
  }
 +
 +void _mutex_lock_acquired(struct mutex *lock, char *file, int line)
 +{
 +      lock->line = line;
 +      lock->file = file;
 +}
 +
 +int _mutex_lock(struct mutex *lock, char *file, int line)
 +{
 +      int ret = 0;
 +
 +      if (!pthread_mutex_trylock(&lock->lock))
 +              goto out_lock;
 +
 +      pr_info("Lock contention on lock %s on %s:%d\n",
 +              lock->name, lock->file, lock->line);
 +
 +      ret = pthread_mutex_lock(&lock->lock);
 +      if (ret)
 +              pr_err("Acquirin lock %s failed: %m, acquired %s:%d\n",
 +                      lock->name, lock->file, lock->line);
 +
 +out_lock:
 +      _mutex_lock_acquired(lock, file, line);
 +      return ret;
 +}
 +
 +int _mutex_unlock(struct mutex *lock)
 +{
 +      lock->line = 0;
 +      lock->file = NULL;
 +      pthread_mutex_unlock(&lock->lock);
 +
 +      return 0;
 +}
diff --combined rrdtool.c
index be6232177f909d8217e1f0fef8509b764f80b68b,909e1233ce43becfe2ac8b8ef475ec3ddd086705..c8754176fd66ac6855422356914103e7af6b1b47
+++ b/rrdtool.c
@@@ -45,6 -45,10 +45,6 @@@ int rrdtool_draw_image(struct rrd_imag
        time_t t = time(0);
        const char *updatestr = "Last update %d.%m.%Y %T (%Z)";
  
 -      pid = do_fork_limited();
 -      if (pid)
 -              return pid;
 -
        pr_info("Drawing image %s\n", image->image_filename);
  
        tmpfile[0] = 0;
  
        add_arg(args, argcnt, argstr, idx, "COMMENT: %s\\c", timestamp);
  
 -      pid = run(cmd, args);
 -      harvest_zombies(pid);
 +      run(cmd, args);
  
        rename(tmpfile, image->image_filename);
  
 -      exit(0);
 +      return 0;
  }
  
  int rrdtool_draw_images(struct rrd_image **image)
  {
        int i;
        for (i = 0; image[i]; i++)
 -              rrdtool_draw_image(image[i]);
 +              queue_work(WORK_PRIORITY_LOW, "rrdtool_draw_image",
 +                      rrdtool_draw_image, image[i]);
  
        return 0;
  }
@@@ -251,7 -255,7 +251,7 @@@ static int write_to_logfile(struct rrd_
        return ret < 0 ? ret : 0;
  }
  
 -int rrdtool_update_data(struct rrd_database *rrd)
 +static int do_rrdtool_update_data(struct rrd_database *rrd)
  {
        int pid;
        char data[RRD_DATA_MAX_LEN + 3]; /* 3 == "N:" + NULL termination */
        };
        int l;
  
 -      rrd->last_update = time(0);
 -      if (do_fork())
 -              return 0;
 -
        l = sprintf(data, "N:");
  
        if (rrd->parser && rrd->parser->parse) {
                sanitize_rrd_update_data(data + l);
                write_to_logfile(rrd, data);
  
 -              pid = run(cmd, cmdline);
 -              harvest_zombies(pid);
 +              run(cmd, cmdline);
        }
  
        if (rrd->pre_draw_cmd && !strcmp(rrd->pre_draw_cmd[0], "shell")) {
 -              pid = run(rrd->pre_draw_cmd[1], &rrd->pre_draw_cmd[1]);
 -              harvest_zombies(pid);
 +              run(rrd->pre_draw_cmd[1], &rrd->pre_draw_cmd[1]);
        }
  
        if (rrd->images)
                rrdtool_draw_images(rrd->images);
  
 -      while (harvest_zombies(0));
++      if (rrd->post_draw_cmd && !strcmp(rrd->post_draw_cmd[0], "shell"))
++              run(rrd->post_draw_cmd[1], &rrd->post_draw_cmd[1]);
 -      if (rrd->post_draw_cmd && !strcmp(rrd->post_draw_cmd[0], "shell")) {
 -              pid = run(rrd->post_draw_cmd[1], &rrd->post_draw_cmd[1]);
 -              harvest_zombies(pid);
 -      }
 +      return 0;
 +}
  
 -      exit(0);
 +int rrdtool_update_data(struct rrd_database *rrd)
 +{
 +      rrd->last_update = time(0);
 +
 +      return queue_work(WORK_PRIORITY_HIGH, "rrdtool_update_data",
 +                      do_rrdtool_update_data, rrd);
  }
  
  /*
@@@ -359,7 -368,7 +362,7 @@@ static int create_database(struct rrd_d
  //    char cmd[] = "echo";
        char *args[512], argstr[ARGSTR_LEN];
        int idx = 0, argcnt = 0;
 -      int child, i;
 +      int i;
  
        if (!db->filename) {
                pr_err("Database %s missing database filename\n", db->name);
                        db->archives[i].rows);
        }
  
 -      child = run(cmd, args);
 -
 -      harvest_zombies(child);
 +      run(cmd, args);
  
        return 0;
  }