]> git.itanic.dy.fi Git - rrdd/blobdiff - process.c
process: Remove fork based concurrency management
[rrdd] / process.c
index 4b19b97e3ccfa1f589d5f2cc6d9dbd26682b70c6..7e739210f5f9829c1e70bed700935611820cd4cc 100644 (file)
--- a/process.c
+++ b/process.c
-#define _GNU_SOURCE
 #include <unistd.h>
 #include <fcntl.h>
 #include <sys/select.h>
 #include <sys/epoll.h>
 #include <stdio.h>
+#include <sys/wait.h>
+#include <sys/signalfd.h>
+#include <sys/resource.h>
 
 #include "process.h"
 #include "debug.h"
 
-static int child_count;
-static int parent_count;
-static int job_request_fd[2];
-static int job_get_permission_fd[2];
+/* epoll instance created in init_jobcontrol(); handlers are added to it */
 static int epoll_fd;
+/* Number of worker threads started by init_jobcontrol() */
 static unsigned int max_jobs;
+/* Counters printed by poll_job_requests() in its "Jobs active" message */
 static unsigned int job_count;
 static unsigned int jobs_pending;
-
-int get_child_count(void)
+/* Set to max_jobs * 10 + 25 by init_jobcontrol() */
+static unsigned int max_jobs_pending;
+
+/* One queued unit of work; items of a queue form a singly linked list. */
+struct work_struct {
+       const char *name;               /* label used in log messages */
+       int (*work_fn)(void *);         /* callback run by a worker thread */
+       void *arg;                      /* passed unchanged to work_fn */
+       struct work_struct *next;       /* next queued item, NULL at the tail */
+};
+
+/*
+ * List of pending work for one priority level. queue_work() appends
+ * at the tail and run_work_on_queue() removes from the head.
+ */
+struct work_queue {
+       struct work_struct *work;       /* head of the list, NULL when empty */
+       int length;                     /* number of items currently queued */
+       char *name;                     /* queue label used in log messages */
+       struct mutex lock;              /* held while work/length are modified */
+};
+
+/*
+ * Work queues indexed by priority. Worker threads scan this array
+ * from index 0 upwards, so the queue at index 0 is served first.
+ */
+struct work_queue work_queues[WORK_PRIORITIES_NUM] = {
+       {
+               .name = "high priority",
+               .lock = {
+                       .name = "high_prio_queue",
+                       .lock = PTHREAD_MUTEX_INITIALIZER,
+               },
+       },
+       {
+               .name = "low priority",
+               .lock = {
+                       .name = "low_prio_queue",
+                       .lock = PTHREAD_MUTEX_INITIALIZER,
+               },
+       },
+};
+
+/*
+ * Condition variable (and the mutex passed to pthread_cond_wait()
+ * with it) on which idle worker threads sleep. It is signalled by
+ * queue_work() after inserting work and by run_work_on_queue() when
+ * more items remain in a queue.
+ */
+struct mutex work_pending_mutex = {
+       .name = "work_pending",
+       .lock = PTHREAD_MUTEX_INITIALIZER,
+};
+pthread_cond_t work_pending_cond = PTHREAD_COND_INITIALIZER;
+
+/*
+ * Take the first work item off @queue and execute it.
+ *
+ * The item is unlinked while holding the queue lock; the callback
+ * itself runs with the lock released. The work_struct is freed after
+ * the callback returns. The callback's return value is ignored.
+ *
+ * Returns 1 if a work item was executed, 0 if the queue was empty.
+ */
+static int run_work_on_queue(struct work_queue *queue)
 {
-       return child_count;
-}
+       struct work_struct *work;
+       int pending;
 
-int get_parent_count(void)
-{
-       return parent_count;
-}
+       mutex_lock(&queue->lock);
 
-static void sigchild_handler(int signal)
-{
-       int status;
+       if (!queue->work) {
+               mutex_unlock(&queue->lock);
+               return 0;
+       }
 
-       waitpid(0, &status, 0);
+       /* Take next work */
+       work = queue->work;
+       queue->work = work->next;
+       queue->length--;
+
+       /*
+        * Snapshot the length while the lock is still held so that
+        * the log message below does not read it unlocked.
+        */
+       pending = queue->length;
+
+       /*
+        * If the queue is not empty, try waking up more workers. It
+        * is possible that when the work was queued, the first worker
+        * did not wake up soon enough and several items are now
+        * waiting to be picked up.
+        */
+       if (pending > 0)
+               pthread_cond_signal(&work_pending_cond);
+
+       mutex_unlock(&queue->lock);
+
+       pr_info("Executing work %s from queue %s, %d still pending\n",
+               work->name, queue->name, pending);
+
+       work->work_fn(work->arg);
+       pr_info("Work %s done\n", work->name);
+       free(work);
+
+       return 1;
 }
 
-static int setup_sigchild_handler(void)
+/*
+ * Main loop of a worker thread.
+ *
+ * @arg carries the worker's index (cast to long); it is only used to
+ * build the thread name "worker<index>".
+ *
+ * The thread repeatedly drains the work queues, always restarting
+ * the scan from the highest priority queue after each executed item,
+ * and sleeps on work_pending_cond once every queue is empty. The
+ * outer loop never terminates, so the final return is not reached.
+ */
+static void *worker_thread(void *arg)
 {
-       struct sigaction sa;
        int ret;
 
-       sa.sa_handler = sigchild_handler;
-       sa.sa_flags = SA_NOCLDSTOP;
+       char name[16];
+
+       snprintf(name, sizeof(name), "worker%ld", (long)arg);
+       pthread_setname_np(pthread_self(), name);
+
+       while (1) {
+               while (1) {
+                       int prio, work_done = 0;
+
+                       /*
+                        * Execute as many works from the queues as
+                        * there are, starting from highest priority
+                        * queue
+                        */
+                       for (prio = 0; prio < WORK_PRIORITIES_NUM; prio++) {
+                               work_done =
+                                       run_work_on_queue(&work_queues[prio]);
+                               if (work_done)
+                                       break;
+                       }
 
-       ret = sigaction(SIGCHLD, &sa, NULL);
-       if (ret)
-               pr_err("Failed to setup SIGCHLD handler: %m\n");
+                       if (!work_done)
+                               break;
+               }
 
-       return ret;
+               pr_info("Worker going to sleep\n");
+
+               /*
+                * pthread_cond_wait() must be called with the mutex
+                * locked by the calling thread; waiting on an
+                * unlocked mutex is undefined behaviour.
+                *
+                * NOTE(review): queue_work() signals without holding
+                * work_pending_mutex, so a signal sent between the
+                * queue scan above and this wait can still be missed.
+                */
+               mutex_lock(&work_pending_mutex);
+               ret = pthread_cond_wait(&work_pending_cond,
+                                       &work_pending_mutex.lock);
+               if (ret) {
+                       /*
+                        * pthread functions return the error number
+                        * instead of setting errno; store it so that
+                        * %m prints the right message.
+                        */
+                       errno = ret;
+                       pr_err("Error: %m\n");
+               }
+
+               /* The wait re-acquired the mutex; record this call site */
+               mutex_lock_acquired(&work_pending_mutex);
+
+               mutex_unlock(&work_pending_mutex);
+
+       }
+
+       return NULL;
 }
 
-static int cancel_sigchild_handler(void)
+/*
+ * Append a work item to the queue of the given priority and wake up
+ * one worker thread.
+ *
+ * @priority: index into work_queues, must be below WORK_PRIORITIES_NUM
+ * @name:     label for log messages; stored by pointer, not copied,
+ *            so it must stay valid until the work has been executed
+ * @work_fn:  callback executed by a worker thread
+ * @arg:      passed unchanged to @work_fn
+ *
+ * Returns 0 on success, -EINVAL for an invalid priority and -ENOMEM
+ * if the work item cannot be allocated.
+ */
+int queue_work(unsigned int priority, char *name,
+       int (work_fn)(void *arg), void *arg)
 {
-       struct sigaction sa;
-       int ret;
+       struct work_queue *queue;
+       struct work_struct *work, *last_work;
 
-       sa.sa_handler = SIG_DFL;
-       sa.sa_flags = SA_NOCLDSTOP;
+       if (priority >= WORK_PRIORITIES_NUM) {
+               pr_err("Invalid priority: %u\n", priority);
+               return -EINVAL;
+       }
 
-       ret = sigaction(SIGCHLD, &sa, NULL);
-       if (ret)
-               pr_err("Failed to cancel SIGCHLD handler: %m\n");
+       /* Zeroed allocation leaves work->next NULL, marking the list tail */
+       work = calloc(1, sizeof(*work));
+       if (!work) {
+               pr_err("Failed to allocate work item: %m\n");
+               return -ENOMEM;
+       }
 
-       return ret;
+       work->name = name;
+       work->work_fn = work_fn;
+       work->arg = arg;
+
+       queue = &work_queues[priority];
+
+       /* Insert new work at the end of the work queue */
+       mutex_lock(&queue->lock);
+
+       last_work = queue->work;
+       while (last_work && last_work->next)
+               last_work = last_work->next;
+
+       if (!last_work)
+               queue->work = work;
+       else
+               last_work->next = work;
+
+       /* The logged count does not yet include the item just inserted */
+       pr_info("Inserted work %s in queue %s, with %d pending items\n",
+               work->name, queue->name, queue->length);
+       queue->length++;
+       mutex_unlock(&queue->lock);
+
+       pthread_cond_signal(&work_pending_cond);
+
+       return 0;
 }
 
 /*
@@ -73,25 +183,12 @@ static int cancel_sigchild_handler(void)
  */
 int init_jobcontrol(int max_jobs_requested)
 {
-       struct epoll_event ev;
        FILE *file;
        int ret;
        char buf[256];
        char match[8];
-
-       if (pipe2(job_request_fd, O_NONBLOCK | O_CLOEXEC)) {
-               pr_err("Failed to create pipe: %m\n");
-               return -1;
-       }
-
-       if (pipe2(job_get_permission_fd, O_CLOEXEC)) {
-               pr_err("Failed to create pipe: %m\n");
-               return -1;
-       }
-
-       ret = setup_sigchild_handler();
-       if (ret)
-               return ret;
+       pthread_t *thread;
+       long int i;
 
        epoll_fd = epoll_create(1);
        if (epoll_fd == -1) {
@@ -99,14 +196,6 @@ int init_jobcontrol(int max_jobs_requested)
                return -1;
        }
 
-       ev.events = EPOLLIN;
-       ev.data.fd = job_request_fd[0];
-       ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, job_request_fd[0], &ev);
-       if (ret) {
-               pr_err("Failed to add epoll_fd: %m\n");
-               return -1;
-       }
-
        if (max_jobs_requested > 0) {
                max_jobs = max_jobs_requested;
                goto no_count_cpus;
@@ -143,23 +232,13 @@ read_fail:
 no_count_cpus:
        pr_info("Set maximum number of parallel jobs to %d\n", max_jobs);
 
-       return 0;
-}
-
-static int grant_new_job(void)
-{
-       int ret;
-       char byte = 0;
-
-       job_count++;
-       pr_info("Granting new job. %d jobs currently and %d pending\n",
-               job_count, jobs_pending);
+       max_jobs_pending = max_jobs * 10 + 25;
+       pr_info("Set maximum number of pending jobs to %d\n", max_jobs_pending);
 
-       ret = write(job_get_permission_fd[1], &byte, 1);
-       if (ret != 1) {
-               pr_err("Failed to write 1 byte: %m\n");
-               return -1;
-       }
+       /* Create worker threads */
+       thread = calloc(sizeof(*thread), max_jobs);
+       for (i = 0; i < max_jobs; i++)
+               pthread_create(&thread[i], NULL, worker_thread, (void *)i);
 
        return 0;
 }
@@ -167,8 +246,8 @@ static int grant_new_job(void)
 int poll_job_requests(int timeout)
 {
        struct epoll_event event;
+       struct event_handler *job_handler;
        int ret;
-       int pid;
 
        /* Convert positive seconds to milliseconds */
        timeout = timeout > 0 ? 1000 * timeout : timeout;
@@ -193,51 +272,23 @@ int poll_job_requests(int timeout)
                goto out;
        }
 
-       ret = read(event.data.fd, &pid, sizeof(pid));
-       if (ret < 0) {
-               pr_err("Failed to read: %m\n");
-               return -1;
-       }
+       job_handler = event.data.ptr;
 
-       if (ret == 0) {
-               pr_info("Read zero bytes\n");
-               return 0;
+       if (!job_handler || !job_handler->handle_event) {
+               pr_err("Corrupted event handler for fd %d\n",
+                       event.data.fd);
+               goto out;
        }
 
-       if (pid > 0) {
-               if (job_count >= max_jobs) {
-                       jobs_pending++;
-               } else {
-                       ret = grant_new_job();
-                       goto out;
-               }
-       } else if (pid < 0) {
-               if (job_count > max_jobs)
-                       pr_err("BUG: Job %u jobs exceeded limit %u\n",
-                               job_count, max_jobs);
-
-               pr_info("Job %d finished\n", -pid);
-               job_count--;
-               if (jobs_pending) {
-                       jobs_pending--;
-                       ret = grant_new_job();
-                       goto out;
-               }
-       }
+       pr_debug("Running handler %s to handle events from fd %d\n",
+               job_handler->name, job_handler->fd);
+       job_handler->handle_event(job_handler);
 
 out:
        pr_info("Jobs active: %u, pending: %u\n", job_count, jobs_pending);
        return ret;
 }
 
-/*
- * Per process flag indicating whether this child has requested fork
- * limiting. If it has, it must also tell the master parent when it
- * has died so that the parent can give next pending job permission to
- * go.
- */
-static int is_limited_fork;
-
 int do_fork(void)
 {
        int child;
@@ -248,111 +299,44 @@ int do_fork(void)
        }
 
        if (child) {
-               child_count++;
-               pr_info("Fork %d, child %d\n", child_count, child);
+               pr_debug("Fork child %d\n", child);
                return child;
        }
 
-       /*
-        * Child processes may want to use waitpid() for synchronizing
-        * with their sub-childs. Disable the signal handler by
-        * default
-        */
-       cancel_sigchild_handler();
-
-       /*
-        * Also do not notify the master parent the death of this
-        * child. Only childs that have been created with
-        * do_fork_limited() can have this flag set.
-        */
-       is_limited_fork = 0;
-
-       /* reset child's child count */
-       child_count = 0;
-       parent_count++;
        return 0;
 }
 
-static int request_fork(char request)
-{
-       int pid = getpid();
-
-       pid = request > 0 ? pid : -pid;
-
-       return write(job_request_fd[1], &pid, sizeof(pid));
-}
-
-static void limited_fork_exit_handler(void)
-{
-       if (is_limited_fork)
-               request_fork(-1);
-}
-
-/*
- * Like do_fork(), but allow the child continue only after the global
- * job count is low enough.
- *
- * We allow the parent to continue other more important activities but
- * child respects the limit of global active processes.
- */
-int do_fork_limited(void)
-{
-       int child, ret;
-       char byte;
-
-       child = do_fork();
-       if (child)
-               return child;
-
-       /* Remember to notify the parent when we are done */
-       atexit(limited_fork_exit_handler);
-       is_limited_fork = 1;
-
-       pr_info("Requesting permission to go\n");
-
-       /* Signal the parent that we are here, waiting to go */
-       request_fork(1);
-
-       /*
-        * The parent will tell us when we can continue. If there were
-        * multiple children waiting for their turn to run parent's
-        * write to the pipe will wake up every process reading
-        * it. However, only one will be reading the content and
-        * getting the permission to run. So we will read as many
-        * times as needed until we get our own permission to run.
-        */
-       do {
-               ret = read(job_get_permission_fd[0], &byte, sizeof(byte));
-               if (ret == 1)
-                       break;
-
-       } while(1);
-
-       pr_info("Continuing\n");
-       return child;
-}
-
-int harvest_zombies(int pid)
+/*
+ * Wait for a child process to terminate and log how it ended
+ * together with its CPU usage.
+ *
+ * @pid is passed directly to wait4(), so its special values (for
+ * example 0) keep the meaning wait4() gives them.
+ *
+ * Returns 1 when a child was reaped, 0 if wait4() failed.
+ */
+int clear_zombie(int pid)
 {
        int status;
-
-       if (child_count == 0)
-               return 0;
+       struct rusage rusage;
+       const char *status_str = NULL;
+       int code = 0;
 
        if (pid)
-               pr_info("Waiting on pid %d, children left: %d\n", pid, 
-                       child_count);
+               pr_debug("Waiting on pid %d\n", pid);
 
-       pid = waitpid(pid, &status, 0);
-       if (pid < 0) {
-               pr_err("Error on wait(): %m\n");
-               child_count--; /* Decrement child count anyway */
-       }
-       else {
-               child_count--;
-               pr_info("pid %d: exit code %d. Children left: %d\n", pid,
-                       status, child_count);
+       do {
+               pid = wait4(pid, &status, 0, &rusage);
+               if (pid < 0) {
+                       /* Name the call that actually failed */
+                       pr_err("Error on wait4(): %m\n");
+                       return 0;
+               }
+               /* Wait until the child has become a zombie */
+       } while (!WIFEXITED(status) && !WIFSIGNALED(status));
+
+       if (WIFEXITED(status)) {
+               status_str = "exited with status";
+               code = WEXITSTATUS(status);
+       } else if (WIFSIGNALED(status)) {
+               status_str = "killed by signal";
+               code = WTERMSIG(status);
        }
+       pr_debug("pid %d: %s %d.\n", pid,
+               status_str, code);
+       /* tv_sec/tv_usec types vary by platform; cast to match %ld */
+       pr_debug("pid %d: User time: %ld.%03lds, System %ld.%03lds\n", pid,
+               (long)rusage.ru_utime.tv_sec,
+               (long)(rusage.ru_utime.tv_usec / 1000),
+               (long)rusage.ru_stime.tv_sec,
+               (long)(rusage.ru_stime.tv_usec / 1000));
 
        return 1;
 }
@@ -442,15 +426,15 @@ int run_piped_stream(const char *cmd, char *const argv[],
 
        if (stdinf)
                i = &ifd;
-       else 
+       else
                i = 0;
        if (stdoutf)
                o = &ofd;
-       else 
+       else
                o = 0;
        if (stderrf)
                e = &efd;
-       else 
+       else
                e = 0;
 
        pid = run_piped(cmd, argv, i, o, e);
@@ -498,16 +482,8 @@ int run(const char *cmd, char *const argv[])
        fd_set rfds;
        int maxfd;
        int eof = 0;
-       char stdoutstr[32], stderrstr[32], indent[16] = { "                " };
-
-       indent[get_parent_count() + 1] = 0;
-               
-       if ((child = do_fork()))
-           return child;
 
        child = run_piped(cmd, argv, NULL, &ofd, &efd);
-       snprintf(stdoutstr, 32, "%sstdout", green_color);
-       snprintf(stderrstr, 32, "%sstderr", red_color);
 
        FD_ZERO(&rfds);
        FD_SET(ofd, &rfds);
@@ -517,7 +493,7 @@ int run(const char *cmd, char *const argv[])
                char *sptr , *eptr;
                char rbuf[BUF_SIZE];
                int bytes;
-               char *typestr;
+               int is_stderr = 0;
 
                maxfd = max(ofd, efd);
                error = select(maxfd, &rfds, NULL, NULL, NULL);
@@ -528,13 +504,12 @@ int run(const char *cmd, char *const argv[])
                }
 
                if (FD_ISSET(ofd, &rfds)) {
-                       typestr = stdoutstr;
                        bytes = read(ofd, rbuf, BUF_SIZE);
                        goto print;
                }
 
                if (FD_ISSET(efd, &rfds)) {
-                       typestr = stderrstr;
+                       is_stderr = 1;
                        bytes = read(efd, rbuf, BUF_SIZE);
                        goto print;
                }
@@ -557,17 +532,21 @@ print:
                 */
                if (bytes == 0) {
                        bytes = read(efd, rbuf, BUF_SIZE);
-                       typestr = stderrstr;
+                       is_stderr = 1;
                        eof = 1;
                }
 
                sptr = eptr = rbuf;
-               while(bytes--) {
+               while (bytes--) {
                        if (*eptr == '\n') {
                                *eptr = 0;
-                               fprintf(stderr, "%s[%5d %s] %s: %s%s\n", indent,
-                                      child, cmd, typestr, sptr, normal_color);
-                               sptr = eptr;
+                               if (is_stderr)
+                                       pr_err("%s: stderr: %s\n",
+                                               cmd, sptr);
+                               else
+                                       pr_info("%s: stdout: %s\n",
+                                               cmd, sptr);
+                               sptr = eptr + 1;
                        }
                        eptr++;
                }
@@ -576,8 +555,72 @@ print:
        close(ofd);
        close(efd);
 
-       harvest_zombies(child); 
+       clear_zombie(child);
+
+       return 0;
+}
+
+/*
+ * Add @handler's file descriptor to the module's epoll instance.
+ *
+ * The handler pointer itself is stored as the epoll user data and is
+ * read back from event.data.ptr by poll_job_requests(), so @handler
+ * must stay valid for as long as it is registered.
+ *
+ * Returns 0 on success, -1 if the handler is invalid or epoll_ctl()
+ * fails.
+ */
+int register_event_handler(struct event_handler *handler)
+{
+       struct epoll_event ev = { 0 };
+       int ret;
+
+       /* NOTE(review): this also rejects fd 0, which is a valid descriptor */
+       if (handler->fd <= 0) {
+               pr_err("Invalid file descriptor of %d\n", handler->fd);
+               return -1;
+       }
+
+       if (!handler->handle_event) {
+               pr_err("Handler callback missing\n");
+               return -1;
+       }
+
+       pr_info("Registering handler for %s, fd %d\n",
+               handler->name, handler->fd);
+
+       /*
+        * epoll_data is a union, so only one of .fd and .ptr can be
+        * stored. Keep the handler pointer; the descriptor remains
+        * available through handler->fd.
+        */
+       ev.events = handler->events;
+       ev.data.ptr = handler;
+       ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, handler->fd, &ev);
+       if (ret) {
+               pr_err("Failed to add epoll_fd: %m\n");
+               return -1;
+       }
+
+       return 0;
+}
+
+/*
+ * Remember the source location (@file, @line) of the caller that now
+ * holds @lock. _mutex_lock() prints this location when it reports
+ * contention.
+ */
+void _mutex_lock_acquired(struct mutex *lock, char *file, int line)
+{
+       lock->file = file;
+       lock->line = line;
+}
+
+/*
+ * Lock @lock and record @file:@line as the location that acquired it.
+ *
+ * An uncontended lock is taken with pthread_mutex_trylock(). If that
+ * fails, the location recorded by the current holder is logged before
+ * blocking in pthread_mutex_lock().
+ *
+ * Returns 0 on success or the error number returned by
+ * pthread_mutex_lock(); on failure the lock is not held and the
+ * recorded location is left untouched.
+ */
+int _mutex_lock(struct mutex *lock, char *file, int line)
+{
+       int ret;
+
+       if (!pthread_mutex_trylock(&lock->lock))
+               goto out_lock;
+
+       pr_info("Lock contention on lock %s on %s:%d\n",
+               lock->name, lock->file, lock->line);
+
+       ret = pthread_mutex_lock(&lock->lock);
+       if (ret) {
+               /*
+                * pthread functions return the error number instead
+                * of setting errno; store it so that %m is accurate.
+                */
+               errno = ret;
+               pr_err("Acquirin lock %s failed: %m, acquired %s:%d\n",
+                       lock->name, lock->file, lock->line);
+               /* Do not claim ownership of a lock we failed to take */
+               return ret;
+       }
+
+out_lock:
+       _mutex_lock_acquired(lock, file, line);
+       return 0;
+}
+
+/*
+ * Clear the recorded acquisition location of @lock and release it.
+ *
+ * The return value of pthread_mutex_unlock() is not checked; this
+ * function always returns 0.
+ */
+int _mutex_unlock(struct mutex *lock)
+{
+       /* Reset the owner information before the mutex is released */
+       lock->line = 0;
+       lock->file = NULL;
+       pthread_mutex_unlock(&lock->lock);
 
-       exit(1);
        return 0;
 }