]> git.itanic.dy.fi Git - rrdd/commitdiff
process: Implement event handling infrastructure
authorTimo Kokkonen <timo.t.kokkonen@iki.fi>
Thu, 11 Oct 2012 18:30:20 +0000 (21:30 +0300)
committerTimo Kokkonen <timo.t.kokkonen@iki.fi>
Thu, 11 Oct 2012 18:30:20 +0000 (21:30 +0300)
Implement proper event registration infrastucture for handling the
events coming in through the file descriptors. This makes it possible
to expand the polling code to arbitrary number of file descriptors.

All current users of the epoll have been converted to the new
infra. Some functions were moved higher in the file in order to make
it possible to reference them in the init_jobcontrol() function.

Signed-off-by: Timo Kokkonen <timo.t.kokkonen@iki.fi>
process.c
process.h

index 9864ec756996b1312b723ecac814418127d80e02..070b5f0920091ac393908e3fbd7e516f2f90f0fd 100644 (file)
--- a/process.c
+++ b/process.c
@@ -15,7 +15,6 @@ static int child_count;
 static int parent_count;
 static int job_request_fd[2];
 static int job_get_permission_fd[2];
-static int signal_fd;
 static int epoll_fd;
 static unsigned int max_jobs;
 static unsigned int job_count;
@@ -31,12 +30,12 @@ int get_parent_count(void)
        return parent_count;
 }
 
-static int handle_signals(void)
+static int handle_signals(struct event_handler *h)
 {
        struct signalfd_siginfo siginfo;
        int ret;
 
-       ret = read(signal_fd, &siginfo, sizeof(siginfo));
+       ret = read(h->fd, &siginfo, sizeof(siginfo));
        if (ret < sizeof(siginfo)) {
                pr_err("Expected %zd from read, got %d: %m\n",
                        sizeof(siginfo), ret);
@@ -53,6 +52,75 @@ static int handle_signals(void)
        return 0;
 }
 
+static int grant_new_job(void)
+{
+       int ret;
+       char byte = 0;
+
+       job_count++;
+       pr_info("Granting new job. %d jobs currently and %d pending\n",
+               job_count, jobs_pending);
+
+       ret = write(job_get_permission_fd[1], &byte, 1);
+       if (ret != 1) {
+               pr_err("Failed to write 1 byte: %m\n");
+               return -1;
+       }
+
+       return 0;
+}
+
+static int handle_job_request(struct event_handler *h)
+{
+       int ret, pid;
+
+       ret = read(job_request_fd[0], &pid, sizeof(pid));
+       if (ret < 0) {
+               pr_err("Failed to read: %m\n");
+               return -1;
+       }
+
+       if (ret == 0) {
+               pr_info("Read zero bytes\n");
+               return 0;
+       }
+
+       if (pid > 0) {
+               if (job_count >= max_jobs) {
+                       jobs_pending++;
+               } else {
+                       ret = grant_new_job();
+                       return 0;
+               }
+       } else if (pid < 0) {
+               if (job_count > max_jobs)
+                       pr_err("BUG: Job %u jobs exceeded limit %u\n",
+                               job_count, max_jobs);
+
+               pr_info("Job %d finished\n", -pid);
+               job_count--;
+               if (jobs_pending) {
+                       jobs_pending--;
+                       ret = grant_new_job();
+                       return 0;
+               }
+       }
+
+       return 0;
+}
+
+struct event_handler signal_handler = {
+       .handle_event = handle_signals,
+       .events = EPOLLIN,
+       .name = "signal",
+};
+
+struct event_handler job_request_handler = {
+       .handle_event = handle_job_request,
+       .events = EPOLLIN,
+       .name = "job_request",
+};
+
 /*
  * Initialize the jobcontrol.
  *
@@ -62,7 +130,6 @@ static int handle_signals(void)
  */
 int init_jobcontrol(int max_jobs_requested)
 {
-       struct epoll_event ev;
        FILE *file;
        int ret;
        sigset_t sigmask;
@@ -85,13 +152,8 @@ int init_jobcontrol(int max_jobs_requested)
                return -1;
        }
 
-       ev.events = EPOLLIN;
-       ev.data.fd = job_request_fd[0];
-       ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, job_request_fd[0], &ev);
-       if (ret) {
-               pr_err("Failed to add epoll_fd: %m\n");
-               return -1;
-       }
+       job_request_handler.fd = job_request_fd[0];
+       register_event_handler(&job_request_handler);
 
        sigemptyset(&sigmask);
        sigaddset(&sigmask, SIGCHLD);
@@ -102,19 +164,13 @@ int init_jobcontrol(int max_jobs_requested)
                pr_err("Failed to sigprocmask: %m\n");
        }
 
-       signal_fd = signalfd(-1, &sigmask, SFD_CLOEXEC);
-       if (signal_fd < 0) {
+       signal_handler.fd = signalfd(-1, &sigmask, SFD_CLOEXEC);
+       if (job_request_handler.fd < 0) {
                pr_err("Failed to create signal_fd: %m\n");
                return -1;
        }
 
-       ev.data.fd = signal_fd;
-       ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, signal_fd, &ev);
-       if (ret) {
-               pr_err("Failed to add epoll_fd: %m\n");
-               return -1;
-       }
-
+       register_event_handler(&signal_handler);
 
        if (max_jobs_requested > 0) {
                max_jobs = max_jobs_requested;
@@ -155,66 +211,10 @@ no_count_cpus:
        return 0;
 }
 
-static int grant_new_job(void)
-{
-       int ret;
-       char byte = 0;
-
-       job_count++;
-       pr_info("Granting new job. %d jobs currently and %d pending\n",
-               job_count, jobs_pending);
-
-       ret = write(job_get_permission_fd[1], &byte, 1);
-       if (ret != 1) {
-               pr_err("Failed to write 1 byte: %m\n");
-               return -1;
-       }
-
-       return 0;
-}
-
-static int handle_job_request(void)
-{
-       int ret, pid;
-
-       ret = read(job_request_fd[0], &pid, sizeof(pid));
-       if (ret < 0) {
-               pr_err("Failed to read: %m\n");
-               return -1;
-       }
-
-       if (ret == 0) {
-               pr_info("Read zero bytes\n");
-               return 0;
-       }
-
-       if (pid > 0) {
-               if (job_count >= max_jobs) {
-                       jobs_pending++;
-               } else {
-                       ret = grant_new_job();
-                       return 0;
-               }
-       } else if (pid < 0) {
-               if (job_count > max_jobs)
-                       pr_err("BUG: Job %u jobs exceeded limit %u\n",
-                               job_count, max_jobs);
-
-               pr_info("Job %d finished\n", -pid);
-               job_count--;
-               if (jobs_pending) {
-                       jobs_pending--;
-                       ret = grant_new_job();
-                       return 0;
-               }
-       }
-
-       return 0;
-}
-
 int poll_job_requests(int timeout)
 {
        struct epoll_event event;
+       struct event_handler *job_handler;
        int ret;
 
        /* Convert positive seconds to milliseconds */
@@ -240,12 +240,17 @@ int poll_job_requests(int timeout)
                goto out;
        }
 
-       if (event.data.fd == job_request_fd[0])
-               handle_job_request();
-       else if (event.data.fd == signal_fd)
-               handle_signals();
-       else
-               pr_err("Unknown fd: %d\n", event.data.fd);
+       job_handler = event.data.ptr;
+
+       if (!job_handler || !job_handler->handle_event) {
+               pr_err("Corrupted event handler for fd %d\n",
+                       event.data.fd);
+               goto out;
+       }
+
+       pr_info("Running handler %s to handle events from fd %d\n",
+               job_handler->name, job_handler->fd);
+       job_handler->handle_event(job_handler);
 
 out:
        pr_info("Jobs active: %u, pending: %u\n", job_count, jobs_pending);
@@ -631,3 +636,33 @@ print:
        exit(1);
        return 0;
 }
+
+int register_event_handler(struct event_handler *handler)
+{
+       struct epoll_event ev;
+       int ret;
+
+       if (handler->fd <= 0) {
+               pr_err("Invalid file descriptor of %d\n", handler->fd);
+               return -1;
+       }
+
+       if (!handler->handle_event) {
+               pr_err("Handler callback missing\n");
+               return -1;
+       }
+
+       pr_info("Registering handler for %s, fd %d\n",
+               handler->name, handler->fd);
+
+       ev.data.fd = handler->fd;
+       ev.data.ptr = handler;
+       ev.events = handler->events;
+       ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, handler->fd, &ev);
+       if (ret) {
+               pr_err("Failed to add epoll_fd: %m\n");
+               return -1;
+       }
+
+       return 0;
+}
index 7d0e45a543ebc21ecd2b5e74b23bffc7a751138d..ae9db12b97457f90e5d4256c0e42e3ba5ffdb506 100644 (file)
--- a/process.h
+++ b/process.h
@@ -8,6 +8,20 @@
 #include <sys/wait.h>
 #include <error.h>
 #include <errno.h>
+#include <stdint.h>
+
+struct event_handler;
+
+typedef int (handle_event_fn_t)(struct event_handler *);
+
+struct event_handler {
+       int fd;
+       uint32_t events;
+       handle_event_fn_t *handle_event;
+       char *name;
+};
+
+int register_event_handler(struct event_handler *handler);
 
 int get_child_count(void);
 int get_parent_count(void);