]> git.itanic.dy.fi Git - rrdd/commitdiff
process: Remove fork based concurrenct management
authorTimo Kokkonen <timo.t.kokkonen@iki.fi>
Sat, 9 Jul 2016 14:47:30 +0000 (17:47 +0300)
committerTimo Kokkonen <timo.t.kokkonen@iki.fi>
Sat, 9 Jul 2016 14:47:30 +0000 (17:47 +0300)
The thread based concurrency system replaces the fork based
system. Remove all deprecated code out of the way.

Signed-off-by: Timo Kokkonen <timo.t.kokkonen@iki.fi>
built_in_parsers.c
debug.c
process.c
process.h

index 8d7bdb15c1ca9760246e8bd83630b9b0318cac51..f86da28efa9317762b1c6cc904d318e60ad5fb88 100644 (file)
@@ -147,7 +147,7 @@ static int digitemp_parser(char *data, const char **p)
        /* Read whatever the process might be still printing out */
        while (fgets(buf, 1024, readf));
 
        /* Read whatever the process might be still printing out */
        while (fgets(buf, 1024, readf));
 
-       harvest_zombies(pid);
+       clear_zombie(pid);
        snprintf(data, RRD_DATA_MAX_LEN, "%.2f:%.2f", t3, t2);
        return 0;
 }
        snprintf(data, RRD_DATA_MAX_LEN, "%.2f:%.2f", t3, t2);
        return 0;
 }
@@ -184,7 +184,7 @@ static int script_parser(char *rrd_data, const char **parser_data)
 err_read:
        fclose(readf);
 
 err_read:
        fclose(readf);
 
-       harvest_zombies(pid);
+       clear_zombie(pid);
 
        return 0;
 }
 
        return 0;
 }
diff --git a/debug.c b/debug.c
index fe0fc9b139ba661d1c42df4adf7966f47020c985..437ce5eb87f02963da2639e1e44b58023b741f16 100644 (file)
--- a/debug.c
+++ b/debug.c
@@ -60,9 +60,9 @@ void print_trace(const char *file, int line, int color, int l,
        pthread_getname_np(pthread_self(), thread_name, sizeof(thread_name));
 
        ret = snprintf(all_trace, sizeof(all_trace),
        pthread_getname_np(pthread_self(), thread_name, sizeof(thread_name));
 
        ret = snprintf(all_trace, sizeof(all_trace),
-               "%s%s [%5d.%d] %s %s:%d %s %s",
+               "%s%s [%5d] %s %s:%d %s %s",
                color_str, time,
                color_str, time,
-               getpid(), get_parent_count(),
+               getpid(),
                thread_name,
                file, line, normal_color, trace);
 
                thread_name,
                file, line, normal_color, trace);
 
index 19d8ded186e9c7976761ef9b8da63a9ebc7a8b15..7e739210f5f9829c1e70bed700935611820cd4cc 100644 (file)
--- a/process.c
+++ b/process.c
 #include "process.h"
 #include "debug.h"
 
 #include "process.h"
 #include "debug.h"
 
-static int child_count;
-static int parent_count;
-static int job_request_fd[2];
-static int job_get_permission_fd[2];
 static int epoll_fd;
 static unsigned int max_jobs;
 static unsigned int job_count;
 static int epoll_fd;
 static unsigned int max_jobs;
 static unsigned int job_count;
@@ -178,100 +174,6 @@ int queue_work(unsigned int priority, char *name,
        return 0;
 }
 
        return 0;
 }
 
-int get_child_count(void)
-{
-       return child_count;
-}
-
-int get_parent_count(void)
-{
-       return parent_count;
-}
-
-static int grant_new_job(void)
-{
-       int ret;
-       char byte = 0;
-
-       job_count++;
-       pr_info("Granting new job. %d jobs currently and %d pending\n",
-               job_count, jobs_pending);
-
-       ret = write(job_get_permission_fd[1], &byte, 1);
-       if (ret != 1) {
-               pr_err("Failed to write 1 byte: %m\n");
-               return -1;
-       }
-
-       return 0;
-}
-
-static int deny_job(void)
-{
-       int ret;
-       char byte = -1;
-
-       pr_info("Denying new job. %d jobs currently and %d pending, "
-               "limit of pending jobs is %d\n",
-               job_count, jobs_pending, max_jobs_pending);
-
-       ret = write(job_get_permission_fd[1], &byte, 1);
-       if (ret != 1) {
-               pr_err("Failed to write 1 byte: %m\n");
-               return -1;
-       }
-
-       return 0;
-}
-
-static int handle_job_request(struct event_handler *h)
-{
-       int ret, pid;
-
-       ret = read(job_request_fd[0], &pid, sizeof(pid));
-       if (ret < 0) {
-               pr_err("Failed to read: %m\n");
-               return -1;
-       }
-
-       if (ret == 0) {
-               pr_info("Read zero bytes\n");
-               return 0;
-       }
-
-       if (pid > 0) {
-               if (job_count >= max_jobs) {
-                       if (jobs_pending < max_jobs_pending)
-                               jobs_pending++;
-                       else
-                               deny_job();
-               } else {
-                       ret = grant_new_job();
-                       return 0;
-               }
-       } else if (pid < 0) {
-               if (job_count > max_jobs)
-                       pr_err("BUG: Job %u jobs exceeded limit %u\n",
-                               job_count, max_jobs);
-
-               pr_info("Job %d finished\n", -pid);
-               job_count--;
-               if (jobs_pending) {
-                       jobs_pending--;
-                       ret = grant_new_job();
-                       return 0;
-               }
-       }
-
-       return 0;
-}
-
-struct event_handler job_request_handler = {
-       .handle_event = handle_job_request,
-       .events = EPOLLIN,
-       .name = "job_request",
-};
-
 /*
  * Initialize the jobcontrol.
  *
 /*
  * Initialize the jobcontrol.
  *
@@ -283,34 +185,17 @@ int init_jobcontrol(int max_jobs_requested)
 {
        FILE *file;
        int ret;
 {
        FILE *file;
        int ret;
-       sigset_t sigmask;
        char buf[256];
        char match[8];
        pthread_t *thread;
        long int i;
 
        char buf[256];
        char match[8];
        pthread_t *thread;
        long int i;
 
-       if (pipe2(job_request_fd, O_NONBLOCK | O_CLOEXEC)) {
-               pr_err("Failed to create pipe: %m\n");
-               return -1;
-       }
-
-       if (pipe2(job_get_permission_fd, O_CLOEXEC)) {
-               pr_err("Failed to create pipe: %m\n");
-               return -1;
-       }
-
        epoll_fd = epoll_create(1);
        if (epoll_fd == -1) {
                pr_err("Failed to epoll_create(): %m\n");
                return -1;
        }
 
        epoll_fd = epoll_create(1);
        if (epoll_fd == -1) {
                pr_err("Failed to epoll_create(): %m\n");
                return -1;
        }
 
-       job_request_handler.fd = job_request_fd[0];
-       register_event_handler(&job_request_handler);
-
-       sigemptyset(&sigmask);
-       sigaddset(&sigmask, SIGCHLD);
-
        if (max_jobs_requested > 0) {
                max_jobs = max_jobs_requested;
                goto no_count_cpus;
        if (max_jobs_requested > 0) {
                max_jobs = max_jobs_requested;
                goto no_count_cpus;
@@ -404,14 +289,6 @@ out:
        return ret;
 }
 
        return ret;
 }
 
-/*
- * Per process flag indicating whether this child has requested fork
- * limiting. If it has, it must also tell the master parent when it
- * has died so that the parent can give next pending job permission to
- * go.
- */
-static int is_limited_fork;
-
 int do_fork(void)
 {
        int child;
 int do_fork(void)
 {
        int child;
@@ -422,129 +299,22 @@ int do_fork(void)
        }
 
        if (child) {
        }
 
        if (child) {
-               child_count++;
-               pr_debug("Fork %d, child %d\n", child_count, child);
+               pr_debug("Fork child %d\n", child);
                return child;
        }
 
                return child;
        }
 
-       /*
-        * Also do not notify the master parent the death of this
-        * child. Only childs that have been created with
-        * do_fork_limited() can have this flag set.
-        */
-       is_limited_fork = 0;
-
-       /*
-        * Close unused ends of the job control pipes. Only the parent
-        * which controls the jobs may have the write end open of the
-        * job_get_permission_fd and the read end of the
-        * job_request_fd. Failing to close the pipe ends properly
-        * will cause the childs to wait forever for the run
-        * permission in case parent dies prematurely.
-        *
-        * Note! The file descriptor must be closed once and only
-        * once. They are marked to -1 to make it impossible for
-        * subseqent do_fork() calls from closing them again (in which
-        * case some other file descriptor might already be reserved
-        * for the same number) and prevent accidentally closing some
-        * innocent file descriptors that are still in use.
-        */
-       if (job_get_permission_fd[1] >= 0) {
-               close(job_get_permission_fd[1]);
-               job_get_permission_fd[1] = -1;
-       }
-       if (job_request_fd[0] >= 0) {
-               close(job_request_fd[0]);
-               job_request_fd[0] = -1;
-       }
-
-       /* reset child's child count */
-       child_count = 0;
-       parent_count++;
        return 0;
 }
 
        return 0;
 }
 
-static int request_fork(int request)
-{
-       int pid = getpid();
-
-       pid = request > 0 ? pid : -pid;
-
-       return write(job_request_fd[1], &pid, sizeof(pid));
-}
-
-static void limited_fork_exit_handler(void)
-{
-       if (is_limited_fork)
-               request_fork(-1);
-}
-
-/*
- * Like do_fork(), but allow the child continue only after the global
- * job count is low enough.
- *
- * We allow the parent to continue other more important activities but
- * child respects the limit of global active processes.
- */
-int do_fork_limited(void)
-{
-       int child, ret;
-       char byte;
-
-       child = do_fork();
-       if (child)
-               return child;
-
-       /* Remember to notify the parent when we are done */
-       atexit(limited_fork_exit_handler);
-       is_limited_fork = 1;
-
-       pr_debug("Requesting permission to go\n");
-
-       /* Signal the parent that we are here, waiting to go */
-       request_fork(1);
-
-       /*
-        * The parent will tell us when we can continue. If there were
-        * multiple children waiting for their turn to run only one
-        * will be reading the content byte from the pipe and getting
-        * the permission to run.
-        */
-       ret = read(job_get_permission_fd[0], &byte, sizeof(byte));
-       if (ret == 0)
-               pr_err("Error requesting run, did the parent die?\n");
-
-       if (ret < 0)
-               pr_err("Job control request failure: %m\n");
-
-       if (byte < 0) {
-               pr_info("Did not get permission to execute. Terminating\n");
-
-               /*
-                * Avoid running exit handler, that would tell the
-                * parent we died normally and decrement the job
-                * counters.
-                */
-               raise(SIGKILL);
-       }
-
-       pr_debug("Continuing\n");
-       return child;
-}
-
-int harvest_zombies(int pid)
+int clear_zombie(int pid)
 {
        int status;
        struct rusage rusage;
        char *status_str = NULL;
        int code = 0;
 
 {
        int status;
        struct rusage rusage;
        char *status_str = NULL;
        int code = 0;
 
-       if (child_count == 0)
-               return 0;
-
        if (pid)
        if (pid)
-               pr_debug("Waiting on pid %d, children left: %d\n", pid,
-                       child_count);
+               pr_debug("Waiting on pid %d\n", pid);
 
        do {
                pid = wait4(pid, &status, 0, &rusage);
 
        do {
                pid = wait4(pid, &status, 0, &rusage);
@@ -555,7 +325,6 @@ int harvest_zombies(int pid)
                /* Wait until the child has become a zombie */
        } while (!WIFEXITED(status) && !WIFSIGNALED(status));
 
                /* Wait until the child has become a zombie */
        } while (!WIFEXITED(status) && !WIFSIGNALED(status));
 
-       child_count--;
        if (WIFEXITED(status)) {
                status_str = "exited with status";
                code = WEXITSTATUS(status);
        if (WIFEXITED(status)) {
                status_str = "exited with status";
                code = WEXITSTATUS(status);
@@ -563,8 +332,8 @@ int harvest_zombies(int pid)
                status_str = "killed by signal";
                code = WTERMSIG(status);
        }
                status_str = "killed by signal";
                code = WTERMSIG(status);
        }
-       pr_debug("pid %d: %s %d. Children left: %d\n", pid,
-               status_str, code, child_count);
+       pr_debug("pid %d: %s %d.\n", pid,
+               status_str, code);
        pr_debug("pid %d: User time: %ld.%03lds, System %ld.%03lds\n", pid,
                (long)rusage.ru_utime.tv_sec, rusage.ru_utime.tv_usec / 1000,
                (long)rusage.ru_stime.tv_sec, rusage.ru_stime.tv_usec / 1000);
        pr_debug("pid %d: User time: %ld.%03lds, System %ld.%03lds\n", pid,
                (long)rusage.ru_utime.tv_sec, rusage.ru_utime.tv_usec / 1000,
                (long)rusage.ru_stime.tv_sec, rusage.ru_stime.tv_usec / 1000);
@@ -786,7 +555,7 @@ print:
        close(ofd);
        close(efd);
 
        close(ofd);
        close(efd);
 
-       harvest_zombies(child);
+       clear_zombie(child);
 
        return 0;
 }
 
        return 0;
 }
index c3b5c23653381297400e88d6cad5b3498cd82ef8..c72830b72ee5c1ff74ebad870a39f88ca1ebf687 100644 (file)
--- a/process.h
+++ b/process.h
@@ -32,15 +32,11 @@ struct mutex {
 
 int register_event_handler(struct event_handler *handler);
 
 
 int register_event_handler(struct event_handler *handler);
 
-int get_child_count(void);
-int get_parent_count(void);
-
 int init_jobcontrol(int max_jobs_requested);
 int poll_job_requests(int timeout);
 int do_fork(void);
 int init_jobcontrol(int max_jobs_requested);
 int poll_job_requests(int timeout);
 int do_fork(void);
-int do_fork_limited(void);
 int run(const char *p, char *const argv[]);
 int run(const char *p, char *const argv[]);
-int harvest_zombies(int pid);
+int clear_zombie(int pid);
 int run_piped(const char *cmd, char *const argv[],
              int *stdinfd, int *stdoutfd, int *stderrfd);
 int run_piped_stream(const char *cmd, char *const argv[],
 int run_piped(const char *cmd, char *const argv[],
              int *stdinfd, int *stdoutfd, int *stderrfd);
 int run_piped_stream(const char *cmd, char *const argv[],