]> git.itanic.dy.fi Git - rrdd/commitdiff
process.c: Implement support for limiting number of active processes
authorTimo Kokkonen <timo.t.kokkonen@iki.fi>
Sun, 15 Apr 2012 18:08:26 +0000 (21:08 +0300)
committerTimo Kokkonen <timo.t.kokkonen@iki.fi>
Sun, 15 Apr 2012 18:17:31 +0000 (21:17 +0300)
This will make it possible to have only limited number of active job
processes runnint at given time. These can be requested by calling
do_fork_limited(), which works otherwise similarly to do_fork() but
the child process will not start running until the main parent has
given the child permission to run.

The job controlling is implemented via pipes between the parent and
the children. The child which wish to limit the number of processes
will send its pid to the parent. The master parent will keep count of
all the processes running. If the number of active processes grows too
high, no new jobs are granted until fewer processes are running. Once
the parent decides that a new job can become active, it will write one
byte to a pipe. The child which reads the byte out is the one who has
the right to execute. Other children reading the same pipe are left
waiting for their turn.

Signed-off-by: Timo Kokkonen <timo.t.kokkonen@iki.fi>
process.c
process.h

index 01f8a7aa666995f5a0e624d5f7480e0bcfd7ccb3..ac43a34a27e19458ffec9f94a8cd7a2e4df6696e 100644 (file)
--- a/process.c
+++ b/process.c
@@ -1,12 +1,20 @@
+#define _GNU_SOURCE
 #include <unistd.h>
+#include <fcntl.h>
 #include <sys/select.h>
+#include <sys/epoll.h>
+#include <stdio.h>
 
 #include "process.h"
 #include "debug.h"
 
-static int child_count = 0;
-static int parent_count = 0;
-static int process_count = 0;
+static int child_count;
+static int parent_count;
+static int job_request_fd[2];
+static int job_get_permission_fd[2];
+static unsigned int max_jobs;
+static unsigned int job_count;
+static unsigned int jobs_pending;
 
 int get_child_count(void)
 {
@@ -18,6 +26,216 @@ int get_parent_count(void)
        return parent_count;
 }
 
+static void sigchild_handler(int signal)
+{
+       pr_info("Child has died, go harvest the zombie\n");
+       harvest_zombies(0);
+}
+
+static int setup_sigchild_handler(void)
+{
+       struct sigaction sa;
+       int ret;
+
+       sa.sa_handler = sigchild_handler;
+       sa.sa_flags = SA_NOCLDSTOP;
+
+       ret = sigaction(SIGCHLD, &sa, NULL);
+       if (ret)
+               pr_err("Failed to setup SIGCHLD handler: %m\n");
+
+       return ret;
+}
+
+static int cancel_sigchild_handler(void)
+{
+       struct sigaction sa;
+       int ret;
+
+       sa.sa_handler = SIG_DFL;
+       sa.sa_flags = SA_NOCLDSTOP;
+
+       ret = sigaction(SIGCHLD, &sa, NULL);
+       if (ret)
+               pr_err("Failed to cancel SIGCHLD handler: %m\n");
+
+       return ret;
+}
+
+/*
+ * Initialize the jobcontrol.
+ *
+ * Create the pipes that are used to grant children execution
+ * permissions. If max_jobs is zero, count the CPUs from /proc/cpuinfo
+ * and use that.
+ */
+int init_max_jobs(int max_jobs_requested)
+{
+       FILE *file;
+       int ret;
+       char buf[256];
+       char match[8];
+
+       if (pipe2(job_request_fd, O_NONBLOCK | O_CLOEXEC)) {
+               pr_err("Failed to create pipe: %m\n");
+               return -1;
+       }
+
+       if (pipe2(job_get_permission_fd, O_CLOEXEC)) {
+               pr_err("Failed to create pipe: %m\n");
+               return -1;
+       }
+
+       ret = setup_sigchild_handler();
+       if (ret)
+               return ret;
+
+       if (max_jobs_requested > 0) {
+               max_jobs = max_jobs_requested;
+               goto no_count_cpus;
+       }
+       max_jobs++;
+
+       file = fopen("/proc/cpuinfo", "ro");
+       if (!file) {
+               pr_err("Failed to open /proc/cpuinfo: %m\n");
+               goto open_fail;
+       }
+
+       /*
+        * The CPU count algorithm simply reads the first 8 bytes from
+        * the /proc/cpuinfo and then expects that line to be there as
+        * many times as there are CPUs.
+        */
+       ret = fread(match, 1, sizeof(match), file);
+       if (ret < sizeof(match)) {
+               pr_err("read %d bytes when expecting %zd %m\n",
+                       ret, sizeof(match));
+               goto read_fail;
+       }
+
+       while(fgets(buf, sizeof(buf), file)) {
+               if (!strncmp(buf, match, sizeof(match)))
+                       max_jobs++;
+       }
+
+open_fail:
+read_fail:
+       fclose(file);
+
+no_count_cpus:
+       pr_info("Set maximum number of parallel jobs to %d\n", max_jobs);
+
+       return 0;
+}
+
+static int grant_new_job(void)
+{
+       int ret;
+       char byte = 0;
+
+       job_count++;
+       pr_info("Granting new job. %d jobs currently and %d pending\n",
+               job_count, jobs_pending);
+
+       ret = write(job_get_permission_fd[1], &byte, 1);
+       if (ret != 1) {
+               pr_err("Failed to write 1 byte: %m\n");
+               return -1;
+       }
+
+       return 0;
+}
+
+int poll_job_requests(int timeout)
+{
+       struct epoll_event event;
+       static int epollfd;
+       int ret;
+       int pid;
+
+       if (!epollfd) {
+               struct epoll_event ev;
+
+               epollfd = epoll_create(1);
+               if (epollfd == -1) {
+                       pr_err("Failed to epoll_create(): %m\n");
+                       return -1;
+               }
+
+               ev.events = EPOLLIN;
+               ev.data.fd = job_request_fd[0];
+               epoll_ctl(epollfd, EPOLL_CTL_ADD, job_request_fd[0], &ev);
+       }
+
+       /* Convert positive seconds to milliseconds */
+       timeout = timeout > 0 ? 1000 * timeout : timeout;
+
+       ret = epoll_wait(epollfd, &event, 1, timeout);
+
+       if (ret == -1) {
+               if (errno != EINTR) {
+                       pr_err("epoll_wait: %m\n");
+                       return -1;
+               }
+
+               /*
+                * If epoll_wait() was interrupted, better start
+                * everything again from the beginning
+                */
+               return 0;
+       }
+
+       if (ret == 0) {
+               pr_info("Timed out\n");
+               goto out;
+       }
+
+       ret = read(event.data.fd, &pid, sizeof(pid));
+       if (ret < 0) {
+               pr_err("Failed to read: %m\n");
+               return -1;
+       }
+
+       if (ret == 0) {
+               pr_info("Read zero bytes\n");
+               return 0;
+       }
+
+       if (pid > 0) {
+               if (job_count >= max_jobs) {
+                       jobs_pending++;
+               } else {
+                       ret = grant_new_job();
+                       goto out;
+               }
+       } else if (pid < 0) {
+               if (job_count > max_jobs)
+                       pr_err("BUG: Job %u jobs exceeded limit %u\n",
+                               job_count, max_jobs);
+
+               pr_info("Job %d finished\n", -pid);
+               job_count--;
+               if (jobs_pending) {
+                       jobs_pending--;
+                       ret = grant_new_job();
+                       goto out;
+               }
+       }
+
+out:
+       pr_info("Jobs active: %u, pending: %u\n", job_count, jobs_pending);
+       return ret;
+}
+
+/*
+ * Per process flag indicating whether this child has requested fork
+ * limiting. If it has, it must also tell the master parent when it
+ * has died so that the parent can give next pending job permission to
+ * go.
+ */
+static int is_limited_fork;
+
 int do_fork(void)
 {
        int child;
@@ -33,12 +251,85 @@ int do_fork(void)
                return child;
        }
 
+       /*
+        * Child processes may want to use waitpid() for synchronizing
+        * with their sub-childs. Disable the signal handler by
+        * default
+        */
+       cancel_sigchild_handler();
+
+       /*
+        * Also do not notify the master parent the death of this
+        * child. Only childs that have been created with
+        * do_fork_limited() can have this flag set.
+        */
+       is_limited_fork = 0;
+
        /* reset child's child count */
        child_count = 0;
        parent_count++;
        return 0;
 }
 
+static int request_fork(char request)
+{
+       int pid = getpid();
+
+       pid = request > 0 ? pid : -pid;
+
+       return write(job_request_fd[1], &pid, sizeof(pid));
+}
+
+static void limited_fork_exit_handler(void)
+{
+       if (is_limited_fork)
+               request_fork(-1);
+}
+
+/*
+ * Like do_fork(), but allow the child continue only after the global
+ * job count is low enough.
+ *
+ * We allow the parent to continue other more important activities but
+ * child respects the limit of global active processes.
+ */
+int do_fork_limited(void)
+{
+       int child, ret;
+       char byte;
+
+       child = do_fork();
+       if (child)
+               return child;
+
+       /* Remember to notify the parent when we are done */
+       atexit(limited_fork_exit_handler);
+       is_limited_fork = 1;
+
+       pr_info("Requesting permission to go\n");
+
+       /* Signal the parent that we are here, waiting to go */
+       request_fork(1);
+
+       /*
+        * The parent will tell us when we can continue. If there were
+        * multiple children waiting for their turn to run parent's
+        * write to the pipe will wake up every process reading
+        * it. However, only one will be reading the content and
+        * getting the permission to run. So we will read as many
+        * times as needed until we get our own permission to run.
+        */
+       do {
+               ret = read(job_get_permission_fd[0], &byte, sizeof(byte));
+               if (ret == 1)
+                       break;
+
+       } while(1);
+
+       pr_info("Continuing\n");
+       return child;
+}
+
 int harvest_zombies(int pid)
 {
        int status;
index 9b8e40c0f272d6b5c41976c1eab93a8dee130392..6b5e33e7e7cfb8e80f496b8605dc4c3f48b61dcd 100644 (file)
--- a/process.h
+++ b/process.h
 int get_child_count(void);
 int get_parent_count(void);
 
+int init_max_jobs(int max_jobs_requested);
+int poll_job_requests(int timeout);
 int do_fork(void);
+int do_fork_limited(void);
 int run(const char *p, char *const argv[]);
 int harvest_zombies(int pid);
 int run_piped(const char *cmd, char *const argv[],