From 383b7c14f291f7e169852a7c94c3f46286f56f94 Mon Sep 17 00:00:00 2001 From: Timo Kokkonen Date: Thu, 11 Oct 2012 21:30:20 +0300 Subject: [PATCH] process: Implement event handling infrastructure Implement proper event registration infrastucture for handling the events coming in through the file descriptors. This makes it possible to expand the polling code to arbitrary number of file descriptors. All current users of the epoll have been converted to the new infra. Some functions were moved higher in the file in order to make it possible to reference them in the init_jobcontrol() function. Signed-off-by: Timo Kokkonen --- process.c | 201 ++++++++++++++++++++++++++++++++---------------------- process.h | 14 ++++ 2 files changed, 132 insertions(+), 83 deletions(-) diff --git a/process.c b/process.c index 9864ec7..070b5f0 100644 --- a/process.c +++ b/process.c @@ -15,7 +15,6 @@ static int child_count; static int parent_count; static int job_request_fd[2]; static int job_get_permission_fd[2]; -static int signal_fd; static int epoll_fd; static unsigned int max_jobs; static unsigned int job_count; @@ -31,12 +30,12 @@ int get_parent_count(void) return parent_count; } -static int handle_signals(void) +static int handle_signals(struct event_handler *h) { struct signalfd_siginfo siginfo; int ret; - ret = read(signal_fd, &siginfo, sizeof(siginfo)); + ret = read(h->fd, &siginfo, sizeof(siginfo)); if (ret < sizeof(siginfo)) { pr_err("Expected %zd from read, got %d: %m\n", sizeof(siginfo), ret); @@ -53,6 +52,75 @@ static int handle_signals(void) return 0; } +static int grant_new_job(void) +{ + int ret; + char byte = 0; + + job_count++; + pr_info("Granting new job. %d jobs currently and %d pending\n", + job_count, jobs_pending); + + ret = write(job_get_permission_fd[1], &byte, 1); + if (ret != 1) { + pr_err("Failed to write 1 byte: %m\n"); + return -1; + } + + return 0; +} + +static int handle_job_request(struct event_handler *h) +{ + int ret, pid; + + ret = read(job_request_fd[0], &pid, sizeof(pid)); + if (ret < 0) { + pr_err("Failed to read: %m\n"); + return -1; + } + + if (ret == 0) { + pr_info("Read zero bytes\n"); + return 0; + } + + if (pid > 0) { + if (job_count >= max_jobs) { + jobs_pending++; + } else { + ret = grant_new_job(); + return 0; + } + } else if (pid < 0) { + if (job_count > max_jobs) + pr_err("BUG: Job %u jobs exceeded limit %u\n", + job_count, max_jobs); + + pr_info("Job %d finished\n", -pid); + job_count--; + if (jobs_pending) { + jobs_pending--; + ret = grant_new_job(); + return 0; + } + } + + return 0; +} + +struct event_handler signal_handler = { + .handle_event = handle_signals, + .events = EPOLLIN, + .name = "signal", +}; + +struct event_handler job_request_handler = { + .handle_event = handle_job_request, + .events = EPOLLIN, + .name = "job_request", +}; + /* * Initialize the jobcontrol. * @@ -62,7 +130,6 @@ static int handle_signals(void) */ int init_jobcontrol(int max_jobs_requested) { - struct epoll_event ev; FILE *file; int ret; sigset_t sigmask; @@ -85,13 +152,8 @@ int init_jobcontrol(int max_jobs_requested) return -1; } - ev.events = EPOLLIN; - ev.data.fd = job_request_fd[0]; - ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, job_request_fd[0], &ev); - if (ret) { - pr_err("Failed to add epoll_fd: %m\n"); - return -1; - } + job_request_handler.fd = job_request_fd[0]; + register_event_handler(&job_request_handler); sigemptyset(&sigmask); sigaddset(&sigmask, SIGCHLD); @@ -102,19 +164,13 @@ int init_jobcontrol(int max_jobs_requested) pr_err("Failed to sigprocmask: %m\n"); } - signal_fd = signalfd(-1, &sigmask, SFD_CLOEXEC); - if (signal_fd < 0) { + signal_handler.fd = signalfd(-1, &sigmask, SFD_CLOEXEC); + if (job_request_handler.fd < 0) { pr_err("Failed to create signal_fd: %m\n"); return -1; } - ev.data.fd = signal_fd; - ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, signal_fd, &ev); - if (ret) { - pr_err("Failed to add epoll_fd: %m\n"); - return -1; - } - + register_event_handler(&signal_handler); if (max_jobs_requested > 0) { max_jobs = max_jobs_requested; @@ -155,66 +211,10 @@ no_count_cpus: return 0; } -static int grant_new_job(void) -{ - int ret; - char byte = 0; - - job_count++; - pr_info("Granting new job. %d jobs currently and %d pending\n", - job_count, jobs_pending); - - ret = write(job_get_permission_fd[1], &byte, 1); - if (ret != 1) { - pr_err("Failed to write 1 byte: %m\n"); - return -1; - } - - return 0; -} - -static int handle_job_request(void) -{ - int ret, pid; - - ret = read(job_request_fd[0], &pid, sizeof(pid)); - if (ret < 0) { - pr_err("Failed to read: %m\n"); - return -1; - } - - if (ret == 0) { - pr_info("Read zero bytes\n"); - return 0; - } - - if (pid > 0) { - if (job_count >= max_jobs) { - jobs_pending++; - } else { - ret = grant_new_job(); - return 0; - } - } else if (pid < 0) { - if (job_count > max_jobs) - pr_err("BUG: Job %u jobs exceeded limit %u\n", - job_count, max_jobs); - - pr_info("Job %d finished\n", -pid); - job_count--; - if (jobs_pending) { - jobs_pending--; - ret = grant_new_job(); - return 0; - } - } - - return 0; -} - int poll_job_requests(int timeout) { struct epoll_event event; + struct event_handler *job_handler; int ret; /* Convert positive seconds to milliseconds */ @@ -240,12 +240,17 @@ int poll_job_requests(int timeout) goto out; } - if (event.data.fd == job_request_fd[0]) - handle_job_request(); - else if (event.data.fd == signal_fd) - handle_signals(); - else - pr_err("Unknown fd: %d\n", event.data.fd); + job_handler = event.data.ptr; + + if (!job_handler || !job_handler->handle_event) { + pr_err("Corrupted event handler for fd %d\n", + event.data.fd); + goto out; + } + + pr_info("Running handler %s to handle events from fd %d\n", + job_handler->name, job_handler->fd); + job_handler->handle_event(job_handler); out: pr_info("Jobs active: %u, pending: %u\n", job_count, jobs_pending); @@ -631,3 +636,33 @@ print: exit(1); return 0; } + +int register_event_handler(struct event_handler *handler) +{ + struct epoll_event ev; + int ret; + + if (handler->fd <= 0) { + pr_err("Invalid file descriptor of %d\n", handler->fd); + return -1; + } + + if (!handler->handle_event) { + pr_err("Handler callback missing\n"); + return -1; + } + + pr_info("Registering handler for %s, fd %d\n", + handler->name, handler->fd); + + ev.data.fd = handler->fd; + ev.data.ptr = handler; + ev.events = handler->events; + ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, handler->fd, &ev); + if (ret) { + pr_err("Failed to add epoll_fd: %m\n"); + return -1; + } + + return 0; +} diff --git a/process.h b/process.h index 7d0e45a..ae9db12 100644 --- a/process.h +++ b/process.h @@ -8,6 +8,20 @@ #include #include #include +#include + +struct event_handler; + +typedef int (handle_event_fn_t)(struct event_handler *); + +struct event_handler { + int fd; + uint32_t events; + handle_event_fn_t *handle_event; + char *name; +}; + +int register_event_handler(struct event_handler *handler); int get_child_count(void); int get_parent_count(void); -- 2.45.0