+#define _GNU_SOURCE
#include <unistd.h>
+#include <fcntl.h>
#include <sys/select.h>
+#include <sys/epoll.h>
+#include <stdio.h>
#include "process.h"
#include "debug.h"
-static int child_count = 0;
-static int parent_count = 0;
-static int process_count = 0;
+static int child_count;
+static int parent_count;
+static int job_request_fd[2];
+static int job_get_permission_fd[2];
+static unsigned int max_jobs;
+static unsigned int job_count;
+static unsigned int jobs_pending;
int get_child_count(void)
{
return parent_count;
}
+static void sigchild_handler(int signal)
+{
+ pr_info("Child has died, go harvest the zombie\n");
+ harvest_zombies(0);
+}
+
+static int setup_sigchild_handler(void)
+{
+ struct sigaction sa;
+ int ret;
+
+ sa.sa_handler = sigchild_handler;
+ sa.sa_flags = SA_NOCLDSTOP;
+
+ ret = sigaction(SIGCHLD, &sa, NULL);
+ if (ret)
+ pr_err("Failed to setup SIGCHLD handler: %m\n");
+
+ return ret;
+}
+
+static int cancel_sigchild_handler(void)
+{
+ struct sigaction sa;
+ int ret;
+
+ sa.sa_handler = SIG_DFL;
+ sa.sa_flags = SA_NOCLDSTOP;
+
+ ret = sigaction(SIGCHLD, &sa, NULL);
+ if (ret)
+ pr_err("Failed to cancel SIGCHLD handler: %m\n");
+
+ return ret;
+}
+
+/*
+ * Initialize the jobcontrol.
+ *
+ * Create the pipes that are used to grant children execution
+ * permissions. If max_jobs is zero, count the CPUs from /proc/cpuinfo
+ * and use that.
+ */
+int init_max_jobs(int max_jobs_requested)
+{
+ FILE *file;
+ int ret;
+ char buf[256];
+ char match[8];
+
+ if (pipe2(job_request_fd, O_NONBLOCK | O_CLOEXEC)) {
+ pr_err("Failed to create pipe: %m\n");
+ return -1;
+ }
+
+ if (pipe2(job_get_permission_fd, O_CLOEXEC)) {
+ pr_err("Failed to create pipe: %m\n");
+ return -1;
+ }
+
+ ret = setup_sigchild_handler();
+ if (ret)
+ return ret;
+
+ if (max_jobs_requested > 0) {
+ max_jobs = max_jobs_requested;
+ goto no_count_cpus;
+ }
+ max_jobs++;
+
+ file = fopen("/proc/cpuinfo", "ro");
+ if (!file) {
+ pr_err("Failed to open /proc/cpuinfo: %m\n");
+ goto open_fail;
+ }
+
+ /*
+ * The CPU count algorithm simply reads the first 8 bytes from
+ * the /proc/cpuinfo and then expects that line to be there as
+ * many times as there are CPUs.
+ */
+ ret = fread(match, 1, sizeof(match), file);
+ if (ret < sizeof(match)) {
+ pr_err("read %d bytes when expecting %zd %m\n",
+ ret, sizeof(match));
+ goto read_fail;
+ }
+
+ while(fgets(buf, sizeof(buf), file)) {
+ if (!strncmp(buf, match, sizeof(match)))
+ max_jobs++;
+ }
+
+open_fail:
+read_fail:
+ fclose(file);
+
+no_count_cpus:
+ pr_info("Set maximum number of parallel jobs to %d\n", max_jobs);
+
+ return 0;
+}
+
+static int grant_new_job(void)
+{
+ int ret;
+ char byte = 0;
+
+ job_count++;
+ pr_info("Granting new job. %d jobs currently and %d pending\n",
+ job_count, jobs_pending);
+
+ ret = write(job_get_permission_fd[1], &byte, 1);
+ if (ret != 1) {
+ pr_err("Failed to write 1 byte: %m\n");
+ return -1;
+ }
+
+ return 0;
+}
+
+int poll_job_requests(int timeout)
+{
+ struct epoll_event event;
+ static int epollfd;
+ int ret;
+ int pid;
+
+ if (!epollfd) {
+ struct epoll_event ev;
+
+ epollfd = epoll_create(1);
+ if (epollfd == -1) {
+ pr_err("Failed to epoll_create(): %m\n");
+ return -1;
+ }
+
+ ev.events = EPOLLIN;
+ ev.data.fd = job_request_fd[0];
+ epoll_ctl(epollfd, EPOLL_CTL_ADD, job_request_fd[0], &ev);
+ }
+
+ /* Convert positive seconds to milliseconds */
+ timeout = timeout > 0 ? 1000 * timeout : timeout;
+
+ ret = epoll_wait(epollfd, &event, 1, timeout);
+
+ if (ret == -1) {
+ if (errno != EINTR) {
+ pr_err("epoll_wait: %m\n");
+ return -1;
+ }
+
+ /*
+ * If epoll_wait() was interrupted, better start
+ * everything again from the beginning
+ */
+ return 0;
+ }
+
+ if (ret == 0) {
+ pr_info("Timed out\n");
+ goto out;
+ }
+
+ ret = read(event.data.fd, &pid, sizeof(pid));
+ if (ret < 0) {
+ pr_err("Failed to read: %m\n");
+ return -1;
+ }
+
+ if (ret == 0) {
+ pr_info("Read zero bytes\n");
+ return 0;
+ }
+
+ if (pid > 0) {
+ if (job_count >= max_jobs) {
+ jobs_pending++;
+ } else {
+ ret = grant_new_job();
+ goto out;
+ }
+ } else if (pid < 0) {
+ if (job_count > max_jobs)
+ pr_err("BUG: Job %u jobs exceeded limit %u\n",
+ job_count, max_jobs);
+
+ pr_info("Job %d finished\n", -pid);
+ job_count--;
+ if (jobs_pending) {
+ jobs_pending--;
+ ret = grant_new_job();
+ goto out;
+ }
+ }
+
+out:
+ pr_info("Jobs active: %u, pending: %u\n", job_count, jobs_pending);
+ return ret;
+}
+
+/*
+ * Per process flag indicating whether this child has requested fork
+ * limiting. If it has, it must also tell the master parent when it
+ * has died so that the parent can give next pending job permission to
+ * go.
+ */
+static int is_limited_fork;
+
int do_fork(void)
{
int child;
return child;
}
+ /*
+ * Child processes may want to use waitpid() for synchronizing
+ * with their sub-childs. Disable the signal handler by
+ * default
+ */
+ cancel_sigchild_handler();
+
+ /*
+ * Also do not notify the master parent the death of this
+ * child. Only childs that have been created with
+ * do_fork_limited() can have this flag set.
+ */
+ is_limited_fork = 0;
+
/* reset child's child count */
child_count = 0;
parent_count++;
return 0;
}
+static int request_fork(char request)
+{
+ int pid = getpid();
+
+ pid = request > 0 ? pid : -pid;
+
+ return write(job_request_fd[1], &pid, sizeof(pid));
+}
+
+static void limited_fork_exit_handler(void)
+{
+ if (is_limited_fork)
+ request_fork(-1);
+}
+
+/*
+ * Like do_fork(), but allow the child continue only after the global
+ * job count is low enough.
+ *
+ * We allow the parent to continue other more important activities but
+ * child respects the limit of global active processes.
+ */
+int do_fork_limited(void)
+{
+ int child, ret;
+ char byte;
+
+ child = do_fork();
+ if (child)
+ return child;
+
+ /* Remember to notify the parent when we are done */
+ atexit(limited_fork_exit_handler);
+ is_limited_fork = 1;
+
+ pr_info("Requesting permission to go\n");
+
+ /* Signal the parent that we are here, waiting to go */
+ request_fork(1);
+
+ /*
+ * The parent will tell us when we can continue. If there were
+ * multiple children waiting for their turn to run parent's
+ * write to the pipe will wake up every process reading
+ * it. However, only one will be reading the content and
+ * getting the permission to run. So we will read as many
+ * times as needed until we get our own permission to run.
+ */
+ do {
+ ret = read(job_get_permission_fd[0], &byte, sizeof(byte));
+ if (ret == 1)
+ break;
+
+ } while(1);
+
+ pr_info("Continuing\n");
+ return child;
+}
+
int harvest_zombies(int pid)
{
int status;