]> git.itanic.dy.fi Git - rrdd/blobdiff - process.c
onewire_parser.c: Fix compiler warnings about string lengths
[rrdd] / process.c
index a54f3dd0ffb5dd00e00823d07b2c6f700ccdb7b0..f7477b532697fa257c79e22927629a7974c717c7 100644 (file)
--- a/process.c
+++ b/process.c
@@ -53,6 +53,8 @@ struct mutex work_stats_mutex = {
 static int workers_active;
 static int worker_count;
 
+static int job_notify_fd[2];
+
 static int run_work_on_queue(struct work_queue *queue)
 {
        struct work_struct *work;
@@ -135,6 +137,15 @@ out_unlock:
        if (!workers_active)
                worker_count = 0;
 
+       /*
+        * Kick the job poller. If all jobs were active at this point
+        * the scheduler thread will wait indefinitely until someone
+        * tells it to do something. We may now know when next job is
+        * available, so it is better for the scheduler to recalculate
+        * its sleep time.
+        */
+       notify_job_request();
+
        mutex_unlock(&work_stats_mutex);
 
        return NULL;
@@ -196,6 +207,18 @@ int queue_work(unsigned int priority, char *name,
        return 0;
 }
 
+static int job_notify_handler(struct event_handler *h)
+{
+       int ret;
+       char buf[64];
+
+       ret = read(job_notify_fd[0], buf, sizeof(buf));
+       if (ret < 0)
+               pr_err("Failed to read: %m\n");
+
+       return 0;
+}
+
 /*
  * Initialize the jobcontrol.
  *
@@ -205,6 +228,7 @@ int queue_work(unsigned int priority, char *name,
  */
 int init_jobcontrol(int max_jobs_requested)
 {
+       static struct event_handler ev;
        FILE *file;
        int ret;
        char buf[256];
@@ -245,6 +269,18 @@ int init_jobcontrol(int max_jobs_requested)
                        max_jobs++;
        }
 
+       ret = pipe(job_notify_fd);
+       if (ret) {
+               pr_err("pipe() failed: %m\n");
+               return ret;
+       }
+
+       ev.fd = job_notify_fd[0];
+       ev.events = EPOLLIN;
+       ev.handle_event = job_notify_handler;
+       ev.name = "job_notify";
+       register_event_handler(&ev, EPOLL_CTL_ADD);
+
 open_fail:
 read_fail:
        fclose(file);
@@ -304,6 +340,18 @@ out:
        return ret;
 }
 
+int notify_job_request(void)
+{
+       int ret;
+       char byte = 0;
+
+       ret = write(job_notify_fd[1], &byte, sizeof(byte));
+       if (ret < 0)
+               pr_err("Failed to write: %m\n");
+
+       return 0;
+}
+
 int do_fork(void)
 {
        int child;
@@ -376,12 +424,12 @@ int run_piped(const char *cmd, char *const argv[],
 
        if (stdoutfd && pipe(ofd)) {
                pr_err("pipe() failed: %m\n");
-               return -1;
+               goto close_ifd;
        }
 
        if (stderrfd && pipe(efd)) {
                pr_err("pipe() failed: %m\n");
-               return -1;
+               goto close_ofd;
        }
 
        pid = do_fork();
@@ -425,6 +473,19 @@ int run_piped(const char *cmd, char *const argv[],
        exit(1);
 
        return 0;
+
+close_ofd:
+       if (stdoutfd) {
+               close(ofd[0]);
+               close(ofd[1]);
+       }
+close_ifd:
+       if (stdinfd) {
+               close(ifd[0]);
+               close(ifd[1]);
+       }
+
+       return -1;
 }
 
 /*
@@ -455,7 +516,7 @@ int run_piped_stream(const char *cmd, char *const argv[],
        pid = run_piped(cmd, argv, i, o, e);
 
        if (stdinf) {
-               *stdinf = fdopen(ifd, "r");
+               *stdinf = fdopen(ifd, "w");
                if (*stdinf == NULL) {
                        pr_err("Error opening file stream for fd %d: %m\n",
                               ifd);
@@ -574,10 +635,11 @@ print:
        return 0;
 }
 
-int register_event_handler(struct event_handler *handler)
+int register_event_handler(struct event_handler *handler, int op)
 {
        struct epoll_event ev;
        int ret;
+       const char *str;
 
        if (handler->fd <= 0) {
                pr_err("Invalid file descriptor of %d\n", handler->fd);
@@ -589,15 +651,33 @@ int register_event_handler(struct event_handler *handler)
                return -1;
        }
 
-       pr_info("Registering handler for %s, fd %d\n",
+       switch (op) {
+       case EPOLL_CTL_ADD:
+               str = "register";
+               break;
+
+       case EPOLL_CTL_MOD:
+               str = "modify";
+               break;
+
+       case EPOLL_CTL_DEL:
+               str = "deregister";
+               break;
+
+       default:
+               pr_err("Invalid op %d\n", op);
+               return -1;
+       }
+
+       pr_info("Doing a epoll %s for handler %s, fd %d\n", str,
                handler->name, handler->fd);
 
        ev.data.fd = handler->fd;
        ev.data.ptr = handler;
        ev.events = handler->events;
-       ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, handler->fd, &ev);
+       ret = epoll_ctl(epoll_fd, op, handler->fd, &ev);
        if (ret) {
-               pr_err("Failed to add epoll_fd: %m\n");
+               pr_err("Failed to do epoll_ctl %s: %m\n", str);
                return -1;
        }
 
@@ -621,7 +701,7 @@ int _mutex_lock(struct mutex *lock, char *file, int line)
                goto out_lock;
 
        contended = 1;
-       pr_info("Lock contention at %s:%d on lock %s acquired by %s at %s:%d\n",
+       pr_debug("Lock contention at %s:%d on lock %s acquired by %s at %s:%d\n",
                file, line, lock->name,
                lock->owner_name, lock->file, lock->line);
 
@@ -632,7 +712,7 @@ int _mutex_lock(struct mutex *lock, char *file, int line)
 
 out_lock:
        if (contended)
-               pr_info("Lock %s acquired at %s:%d after contention\n",
+               pr_debug("Lock %s acquired at %s:%d after contention\n",
                        lock->name, file, line);
        _mutex_lock_acquired(lock, file, line);
        return ret;