]> git.itanic.dy.fi Git - rrdd/blobdiff - process.c
process: Implement event handling infrastructure
[rrdd] / process.c
index 9864ec756996b1312b723ecac814418127d80e02..070b5f0920091ac393908e3fbd7e516f2f90f0fd 100644 (file)
--- a/process.c
+++ b/process.c
@@ -15,7 +15,6 @@ static int child_count;
 static int parent_count;
 static int job_request_fd[2];
 static int job_get_permission_fd[2];
-static int signal_fd;
 static int epoll_fd;
 static unsigned int max_jobs;
 static unsigned int job_count;
@@ -31,12 +30,12 @@ int get_parent_count(void)
        return parent_count;
 }
 
-static int handle_signals(void)
+static int handle_signals(struct event_handler *h)
 {
        struct signalfd_siginfo siginfo;
        int ret;
 
-       ret = read(signal_fd, &siginfo, sizeof(siginfo));
+       ret = read(h->fd, &siginfo, sizeof(siginfo));
        if (ret < sizeof(siginfo)) {
                pr_err("Expected %zd from read, got %d: %m\n",
                        sizeof(siginfo), ret);
@@ -53,6 +52,75 @@ static int handle_signals(void)
        return 0;
 }
 
+static int grant_new_job(void)
+{
+       int ret;
+       char byte = 0;
+
+       job_count++;
+       pr_info("Granting new job. %d jobs currently and %d pending\n",
+               job_count, jobs_pending);
+
+       ret = write(job_get_permission_fd[1], &byte, 1);
+       if (ret != 1) {
+               pr_err("Failed to write 1 byte: %m\n");
+               return -1;
+       }
+
+       return 0;
+}
+
+static int handle_job_request(struct event_handler *h)
+{
+       int ret, pid;
+
+       ret = read(job_request_fd[0], &pid, sizeof(pid));
+       if (ret < 0) {
+               pr_err("Failed to read: %m\n");
+               return -1;
+       }
+
+       if (ret == 0) {
+               pr_info("Read zero bytes\n");
+               return 0;
+       }
+
+       if (pid > 0) {
+               if (job_count >= max_jobs) {
+                       jobs_pending++;
+               } else {
+                       ret = grant_new_job();
+                       return 0;
+               }
+       } else if (pid < 0) {
+               if (job_count > max_jobs)
+                       pr_err("BUG: Job %u jobs exceeded limit %u\n",
+                               job_count, max_jobs);
+
+               pr_info("Job %d finished\n", -pid);
+               job_count--;
+               if (jobs_pending) {
+                       jobs_pending--;
+                       ret = grant_new_job();
+                       return 0;
+               }
+       }
+
+       return 0;
+}
+
+struct event_handler signal_handler = {
+       .handle_event = handle_signals,
+       .events = EPOLLIN,
+       .name = "signal",
+};
+
+struct event_handler job_request_handler = {
+       .handle_event = handle_job_request,
+       .events = EPOLLIN,
+       .name = "job_request",
+};
+
 /*
  * Initialize the jobcontrol.
  *
@@ -62,7 +130,6 @@ static int handle_signals(void)
  */
 int init_jobcontrol(int max_jobs_requested)
 {
-       struct epoll_event ev;
        FILE *file;
        int ret;
        sigset_t sigmask;
@@ -85,13 +152,8 @@ int init_jobcontrol(int max_jobs_requested)
                return -1;
        }
 
-       ev.events = EPOLLIN;
-       ev.data.fd = job_request_fd[0];
-       ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, job_request_fd[0], &ev);
-       if (ret) {
-               pr_err("Failed to add epoll_fd: %m\n");
-               return -1;
-       }
+       job_request_handler.fd = job_request_fd[0];
+       register_event_handler(&job_request_handler);
 
        sigemptyset(&sigmask);
        sigaddset(&sigmask, SIGCHLD);
@@ -102,19 +164,13 @@ int init_jobcontrol(int max_jobs_requested)
                pr_err("Failed to sigprocmask: %m\n");
        }
 
-       signal_fd = signalfd(-1, &sigmask, SFD_CLOEXEC);
-       if (signal_fd < 0) {
+       signal_handler.fd = signalfd(-1, &sigmask, SFD_CLOEXEC);
+       if (job_request_handler.fd < 0) {
                pr_err("Failed to create signal_fd: %m\n");
                return -1;
        }
 
-       ev.data.fd = signal_fd;
-       ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, signal_fd, &ev);
-       if (ret) {
-               pr_err("Failed to add epoll_fd: %m\n");
-               return -1;
-       }
-
+       register_event_handler(&signal_handler);
 
        if (max_jobs_requested > 0) {
                max_jobs = max_jobs_requested;
@@ -155,66 +211,10 @@ no_count_cpus:
        return 0;
 }
 
-static int grant_new_job(void)
-{
-       int ret;
-       char byte = 0;
-
-       job_count++;
-       pr_info("Granting new job. %d jobs currently and %d pending\n",
-               job_count, jobs_pending);
-
-       ret = write(job_get_permission_fd[1], &byte, 1);
-       if (ret != 1) {
-               pr_err("Failed to write 1 byte: %m\n");
-               return -1;
-       }
-
-       return 0;
-}
-
-static int handle_job_request(void)
-{
-       int ret, pid;
-
-       ret = read(job_request_fd[0], &pid, sizeof(pid));
-       if (ret < 0) {
-               pr_err("Failed to read: %m\n");
-               return -1;
-       }
-
-       if (ret == 0) {
-               pr_info("Read zero bytes\n");
-               return 0;
-       }
-
-       if (pid > 0) {
-               if (job_count >= max_jobs) {
-                       jobs_pending++;
-               } else {
-                       ret = grant_new_job();
-                       return 0;
-               }
-       } else if (pid < 0) {
-               if (job_count > max_jobs)
-                       pr_err("BUG: Job %u jobs exceeded limit %u\n",
-                               job_count, max_jobs);
-
-               pr_info("Job %d finished\n", -pid);
-               job_count--;
-               if (jobs_pending) {
-                       jobs_pending--;
-                       ret = grant_new_job();
-                       return 0;
-               }
-       }
-
-       return 0;
-}
-
 int poll_job_requests(int timeout)
 {
        struct epoll_event event;
+       struct event_handler *job_handler;
        int ret;
 
        /* Convert positive seconds to milliseconds */
@@ -240,12 +240,17 @@ int poll_job_requests(int timeout)
                goto out;
        }
 
-       if (event.data.fd == job_request_fd[0])
-               handle_job_request();
-       else if (event.data.fd == signal_fd)
-               handle_signals();
-       else
-               pr_err("Unknown fd: %d\n", event.data.fd);
+       job_handler = event.data.ptr;
+
+       if (!job_handler || !job_handler->handle_event) {
+               pr_err("Corrupted event handler for fd %d\n",
+                       event.data.fd);
+               goto out;
+       }
+
+       pr_info("Running handler %s to handle events from fd %d\n",
+               job_handler->name, job_handler->fd);
+       job_handler->handle_event(job_handler);
 
 out:
        pr_info("Jobs active: %u, pending: %u\n", job_count, jobs_pending);
@@ -631,3 +636,33 @@ print:
        exit(1);
        return 0;
 }
+
+int register_event_handler(struct event_handler *handler)
+{
+       struct epoll_event ev;
+       int ret;
+
+       if (handler->fd <= 0) {
+               pr_err("Invalid file descriptor of %d\n", handler->fd);
+               return -1;
+       }
+
+       if (!handler->handle_event) {
+               pr_err("Handler callback missing\n");
+               return -1;
+       }
+
+       pr_info("Registering handler for %s, fd %d\n",
+               handler->name, handler->fd);
+
+       ev.data.fd = handler->fd;
+       ev.data.ptr = handler;
+       ev.events = handler->events;
+       ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, handler->fd, &ev);
+       if (ret) {
+               pr_err("Failed to add epoll_fd: %m\n");
+               return -1;
+       }
+
+       return 0;
+}