From: Timo Kokkonen Date: Sat, 9 Jul 2016 14:47:30 +0000 (+0300) Subject: process: Remove fork based concurrency management X-Git-Url: http://git.itanic.dy.fi/?p=rrdd;a=commitdiff_plain;h=b258d4904d5e89dd6b05100c4e0e0c52424be22e process: Remove fork based concurrency management The thread based concurrency system replaces the fork based system. Remove all deprecated code out of the way. Signed-off-by: Timo Kokkonen --- diff --git a/built_in_parsers.c b/built_in_parsers.c index 8d7bdb1..f86da28 100644 --- a/built_in_parsers.c +++ b/built_in_parsers.c @@ -147,7 +147,7 @@ static int digitemp_parser(char *data, const char **p) /* Read whatever the process might be still printing out */ while (fgets(buf, 1024, readf)); - harvest_zombies(pid); + clear_zombie(pid); snprintf(data, RRD_DATA_MAX_LEN, "%.2f:%.2f", t3, t2); return 0; } @@ -184,7 +184,7 @@ static int script_parser(char *rrd_data, const char **parser_data) err_read: fclose(readf); - harvest_zombies(pid); + clear_zombie(pid); return 0; } diff --git a/debug.c b/debug.c index fe0fc9b..437ce5e 100644 --- a/debug.c +++ b/debug.c @@ -60,9 +60,9 @@ void print_trace(const char *file, int line, int color, int l, pthread_getname_np(pthread_self(), thread_name, sizeof(thread_name)); ret = snprintf(all_trace, sizeof(all_trace), - "%s%s [%5d.%d] %s %s:%d %s %s", + "%s%s [%5d] %s %s:%d %s %s", color_str, time, - getpid(), get_parent_count(), + getpid(), thread_name, file, line, normal_color, trace); diff --git a/process.c b/process.c index 19d8ded..7e73921 100644 --- a/process.c +++ b/process.c @@ -10,10 +10,6 @@ #include "process.h" #include "debug.h" -static int child_count; -static int parent_count; -static int job_request_fd[2]; -static int job_get_permission_fd[2]; static int epoll_fd; static unsigned int max_jobs; static unsigned int job_count; @@ -178,100 +174,6 @@ int queue_work(unsigned int priority, char *name, return 0; } -int get_child_count(void) -{ - return child_count; -} - -int get_parent_count(void) -{ - 
return parent_count; -} - -static int grant_new_job(void) -{ - int ret; - char byte = 0; - - job_count++; - pr_info("Granting new job. %d jobs currently and %d pending\n", - job_count, jobs_pending); - - ret = write(job_get_permission_fd[1], &byte, 1); - if (ret != 1) { - pr_err("Failed to write 1 byte: %m\n"); - return -1; - } - - return 0; -} - -static int deny_job(void) -{ - int ret; - char byte = -1; - - pr_info("Denying new job. %d jobs currently and %d pending, " - "limit of pending jobs is %d\n", - job_count, jobs_pending, max_jobs_pending); - - ret = write(job_get_permission_fd[1], &byte, 1); - if (ret != 1) { - pr_err("Failed to write 1 byte: %m\n"); - return -1; - } - - return 0; -} - -static int handle_job_request(struct event_handler *h) -{ - int ret, pid; - - ret = read(job_request_fd[0], &pid, sizeof(pid)); - if (ret < 0) { - pr_err("Failed to read: %m\n"); - return -1; - } - - if (ret == 0) { - pr_info("Read zero bytes\n"); - return 0; - } - - if (pid > 0) { - if (job_count >= max_jobs) { - if (jobs_pending < max_jobs_pending) - jobs_pending++; - else - deny_job(); - } else { - ret = grant_new_job(); - return 0; - } - } else if (pid < 0) { - if (job_count > max_jobs) - pr_err("BUG: Job %u jobs exceeded limit %u\n", - job_count, max_jobs); - - pr_info("Job %d finished\n", -pid); - job_count--; - if (jobs_pending) { - jobs_pending--; - ret = grant_new_job(); - return 0; - } - } - - return 0; -} - -struct event_handler job_request_handler = { - .handle_event = handle_job_request, - .events = EPOLLIN, - .name = "job_request", -}; - /* * Initialize the jobcontrol. 
* @@ -283,34 +185,17 @@ int init_jobcontrol(int max_jobs_requested) { FILE *file; int ret; - sigset_t sigmask; char buf[256]; char match[8]; pthread_t *thread; long int i; - if (pipe2(job_request_fd, O_NONBLOCK | O_CLOEXEC)) { - pr_err("Failed to create pipe: %m\n"); - return -1; - } - - if (pipe2(job_get_permission_fd, O_CLOEXEC)) { - pr_err("Failed to create pipe: %m\n"); - return -1; - } - epoll_fd = epoll_create(1); if (epoll_fd == -1) { pr_err("Failed to epoll_create(): %m\n"); return -1; } - job_request_handler.fd = job_request_fd[0]; - register_event_handler(&job_request_handler); - - sigemptyset(&sigmask); - sigaddset(&sigmask, SIGCHLD); - if (max_jobs_requested > 0) { max_jobs = max_jobs_requested; goto no_count_cpus; @@ -404,14 +289,6 @@ out: return ret; } -/* - * Per process flag indicating whether this child has requested fork - * limiting. If it has, it must also tell the master parent when it - * has died so that the parent can give next pending job permission to - * go. - */ -static int is_limited_fork; - int do_fork(void) { int child; @@ -422,129 +299,22 @@ int do_fork(void) } if (child) { - child_count++; - pr_debug("Fork %d, child %d\n", child_count, child); + pr_debug("Fork child %d\n", child); return child; } - /* - * Also do not notify the master parent the death of this - * child. Only childs that have been created with - * do_fork_limited() can have this flag set. - */ - is_limited_fork = 0; - - /* - * Close unused ends of the job control pipes. Only the parent - * which controls the jobs may have the write end open of the - * job_get_permission_fd and the read end of the - * job_request_fd. Failing to close the pipe ends properly - * will cause the childs to wait forever for the run - * permission in case parent dies prematurely. - * - * Note! The file descriptor must be closed once and only - * once. 
They are marked to -1 to make it impossible for - * subseqent do_fork() calls from closing them again (in which - * case some other file descriptor might already be reserved - * for the same number) and prevent accidentally closing some - * innocent file descriptors that are still in use. - */ - if (job_get_permission_fd[1] >= 0) { - close(job_get_permission_fd[1]); - job_get_permission_fd[1] = -1; - } - if (job_request_fd[0] >= 0) { - close(job_request_fd[0]); - job_request_fd[0] = -1; - } - - /* reset child's child count */ - child_count = 0; - parent_count++; return 0; } -static int request_fork(int request) -{ - int pid = getpid(); - - pid = request > 0 ? pid : -pid; - - return write(job_request_fd[1], &pid, sizeof(pid)); -} - -static void limited_fork_exit_handler(void) -{ - if (is_limited_fork) - request_fork(-1); -} - -/* - * Like do_fork(), but allow the child continue only after the global - * job count is low enough. - * - * We allow the parent to continue other more important activities but - * child respects the limit of global active processes. - */ -int do_fork_limited(void) -{ - int child, ret; - char byte; - - child = do_fork(); - if (child) - return child; - - /* Remember to notify the parent when we are done */ - atexit(limited_fork_exit_handler); - is_limited_fork = 1; - - pr_debug("Requesting permission to go\n"); - - /* Signal the parent that we are here, waiting to go */ - request_fork(1); - - /* - * The parent will tell us when we can continue. If there were - * multiple children waiting for their turn to run only one - * will be reading the content byte from the pipe and getting - * the permission to run. - */ - ret = read(job_get_permission_fd[0], &byte, sizeof(byte)); - if (ret == 0) - pr_err("Error requesting run, did the parent die?\n"); - - if (ret < 0) - pr_err("Job control request failure: %m\n"); - - if (byte < 0) { - pr_info("Did not get permission to execute. 
Terminating\n"); - - /* - * Avoid running exit handler, that would tell the - * parent we died normally and decrement the job - * counters. - */ - raise(SIGKILL); - } - - pr_debug("Continuing\n"); - return child; -} - -int harvest_zombies(int pid) +int clear_zombie(int pid) { int status; struct rusage rusage; char *status_str = NULL; int code = 0; - if (child_count == 0) - return 0; - if (pid) - pr_debug("Waiting on pid %d, children left: %d\n", pid, - child_count); + pr_debug("Waiting on pid %d\n", pid); do { pid = wait4(pid, &status, 0, &rusage); @@ -555,7 +325,6 @@ int harvest_zombies(int pid) /* Wait until the child has become a zombie */ } while (!WIFEXITED(status) && !WIFSIGNALED(status)); - child_count--; if (WIFEXITED(status)) { status_str = "exited with status"; code = WEXITSTATUS(status); @@ -563,8 +332,8 @@ int harvest_zombies(int pid) status_str = "killed by signal"; code = WTERMSIG(status); } - pr_debug("pid %d: %s %d. Children left: %d\n", pid, - status_str, code, child_count); + pr_debug("pid %d: %s %d.\n", pid, + status_str, code); pr_debug("pid %d: User time: %ld.%03lds, System %ld.%03lds\n", pid, (long)rusage.ru_utime.tv_sec, rusage.ru_utime.tv_usec / 1000, (long)rusage.ru_stime.tv_sec, rusage.ru_stime.tv_usec / 1000); @@ -786,7 +555,7 @@ print: close(ofd); close(efd); - harvest_zombies(child); + clear_zombie(child); return 0; } diff --git a/process.h b/process.h index c3b5c23..c72830b 100644 --- a/process.h +++ b/process.h @@ -32,15 +32,11 @@ struct mutex { int register_event_handler(struct event_handler *handler); -int get_child_count(void); -int get_parent_count(void); - int init_jobcontrol(int max_jobs_requested); int poll_job_requests(int timeout); int do_fork(void); -int do_fork_limited(void); int run(const char *p, char *const argv[]); -int harvest_zombies(int pid); +int clear_zombie(int pid); int run_piped(const char *cmd, char *const argv[], int *stdinfd, int *stdoutfd, int *stderrfd); int run_piped_stream(const char *cmd, char *const 
argv[],