]> git.itanic.dy.fi Git - rrdd/blobdiff - process.c
onewire_parser.c: Fix compiler warnings about string lengths
[rrdd] / process.c
index 8d3d18fdd94de8776b5ee7e8882b7159c98a84e2..f7477b532697fa257c79e22927629a7974c717c7 100644 (file)
--- a/process.c
+++ b/process.c
@@ -9,15 +9,10 @@
 
 #include "process.h"
 #include "debug.h"
+#include "utils.h"
 
-static int child_count;
-static int parent_count;
-static int job_request_fd[2];
-static int job_get_permission_fd[2];
 static int epoll_fd;
 static unsigned int max_jobs;
-static unsigned int job_count;
-static unsigned int jobs_pending;
 static unsigned int max_jobs_pending;
 
 struct work_struct {
@@ -51,11 +46,14 @@ struct work_queue work_queues[WORK_PRIORITIES_NUM] = {
        },
 };
 
-struct mutex work_pending_mutex = {
-       .name = "work_pending",
+struct mutex work_stats_mutex = {
+       .name = "work_stats",
        .lock = PTHREAD_MUTEX_INITIALIZER,
 };
-pthread_cond_t work_pending_cond = PTHREAD_COND_INITIALIZER;
+static int workers_active;
+static int worker_count;
+
+static int job_notify_fd[2];
 
 static int run_work_on_queue(struct work_queue *queue)
 {
@@ -64,7 +62,6 @@ static int run_work_on_queue(struct work_queue *queue)
        mutex_lock(&queue->lock);
 
        if (!queue->work) {
-               pr_info("No  work to run on queue %s\n", queue->name);
                mutex_unlock(&queue->lock);
                return 0;
        }
@@ -74,14 +71,6 @@ static int run_work_on_queue(struct work_queue *queue)
        queue->work = work->next;
        queue->length--;
 
-       /*
-        * If queue is not empty, try waking up more workers. It is
-        * possible that when work were queued, the first worker did
-        * not wake up soon enough and
-        */
-       if (queue->length > 0)
-               pthread_cond_signal(&work_pending_cond);
-
        mutex_unlock(&queue->lock);
 
        pr_info("Executing work %s from queue %s, %d still pending\n",
@@ -97,43 +86,67 @@ static int run_work_on_queue(struct work_queue *queue)
 static void *worker_thread(void *arg)
 {
        int ret;
-
        char name[16];
 
-       snprintf(name, sizeof(name), "worker%ld", (long)arg);
+       mutex_lock(&work_stats_mutex);
+       snprintf(name, sizeof(name), "worker%d", worker_count);
+       worker_count++;
+       mutex_unlock(&work_stats_mutex);
+
        pthread_setname_np(pthread_self(), name);
+       pthread_detach(pthread_self());
 
-       while (1) {
-               while (1) {
-                       int prio, work_done = 0;
-
-                       /*
-                        * Execute as many works from the queues as
-                        * there are, starting from highest priority
-                        * queue
-                        */
-                       for (prio = 0; prio < WORK_PRIORITIES_NUM; prio++) {
-                               work_done =
-                                       run_work_on_queue(&work_queues[prio]);
-                               if (work_done)
-                                       break;
-                       }
+       pr_info("Worker started\n");
 
-                       if (!work_done)
-                               break;
-               }
+       /* Execute all high priority work from the queue */
+       while (run_work_on_queue(&work_queues[WORK_PRIORITY_HIGH]))
+               ;
+       /*
+        * All high priority work is now done, see if we have enough
+        * workers executing low priority worl. Continue from there if
+        * needed.
+        */
+       mutex_lock(&work_stats_mutex);
+       if (workers_active > max_jobs)
+               goto out_unlock;
 
-               pr_info("Worker going to sleep\n");
-               ret = pthread_cond_wait(&work_pending_cond,
-                                       &work_pending_mutex.lock);
-               if (ret < 0)
-                       pr_err("Error: %m\n");
+       mutex_unlock(&work_stats_mutex);
 
-               mutex_lock_acquired(&work_pending_mutex);
+       /*
+        * Start executing the low priority work. Drop the nice value
+        * as this really is low priority stuff
+        */
+       ret = nice(19);
+       pr_info("Worker priority dropped to %d\n", ret);
 
-               mutex_unlock(&work_pending_mutex);
+       while (run_work_on_queue(&work_queues[WORK_PRIORITY_LOW]))
+               ;
 
-       }
+       /* All done, exit */
+       mutex_lock(&work_stats_mutex);
+out_unlock:
+       workers_active--;
+       pr_info("Worker exiting, %d left active\n",
+               workers_active);
+
+       /*
+        * Last exiting worker zeroes the worker_count. This
+        * ensures next time we start spawning worker threads
+        * the first thread will have number zero on its name.
+        */
+       if (!workers_active)
+               worker_count = 0;
+
+       /*
+        * Kick the job poller. If all jobs were active at this point
+        * the scheduler thread will wait indefinitely until someone
+        * tells it to do something. We may now know when next job is
+        * available, so it is better for the scheduler to recalculate
+        * its sleep time.
+        */
+       notify_job_request();
+
+       mutex_unlock(&work_stats_mutex);
 
        return NULL;
 }
@@ -141,6 +154,7 @@ static void *worker_thread(void *arg)
 int queue_work(unsigned int priority, char *name,
        int (work_fn)(void *arg), void *arg)
 {
+       pthread_t *thread;
        struct work_queue *queue;
        struct work_struct *work, *last_work;
 
@@ -174,133 +188,37 @@ int queue_work(unsigned int priority, char *name,
        queue->length++;
        mutex_unlock(&queue->lock);
 
-       pthread_cond_signal(&work_pending_cond);
-
-       return 0;
-}
-
-int get_child_count(void)
-{
-       return child_count;
-}
-
-int get_parent_count(void)
-{
-       return parent_count;
-}
-
-static int handle_signals(struct event_handler *h)
-{
-       struct signalfd_siginfo siginfo;
-       int ret;
-
-       ret = read(h->fd, &siginfo, sizeof(siginfo));
-       if (ret < sizeof(siginfo)) {
-               pr_err("Expected %zd from read, got %d: %m\n",
-                       sizeof(siginfo), ret);
-               return -1;
+       mutex_lock(&work_stats_mutex);
+       pr_info("workers_active: %d, priority: %d\n", workers_active, priority);
+       if (priority != WORK_PRIORITY_HIGH && workers_active >= max_jobs) {
+               mutex_unlock(&work_stats_mutex);
+               return 0;
        }
+       workers_active++;
+       mutex_unlock(&work_stats_mutex);
 
-       if (siginfo.ssi_signo != SIGCHLD) {
-               pr_err("Unexpected signal %d, ignoring\n", siginfo.ssi_signo);
-               return -1;
-       }
+       pr_info("Creating new worker thread\n");
+       /* We need a worker thread, create one */
+       thread = calloc(sizeof(*thread), 1);
+       pthread_create(thread, NULL, worker_thread, NULL);
 
-       harvest_zombies(siginfo.ssi_pid);
+       free(thread);
 
        return 0;
 }
 
-static int grant_new_job(void)
+static int job_notify_handler(struct event_handler *h)
 {
        int ret;
-       char byte = 0;
-
-       job_count++;
-       pr_info("Granting new job. %d jobs currently and %d pending\n",
-               job_count, jobs_pending);
+       char buf[64];
 
-       ret = write(job_get_permission_fd[1], &byte, 1);
-       if (ret != 1) {
-               pr_err("Failed to write 1 byte: %m\n");
-               return -1;
-       }
-
-       return 0;
-}
-
-static int deny_job(void)
-{
-       int ret;
-       char byte = -1;
-
-       pr_info("Denying new job. %d jobs currently and %d pending, "
-               "limit of pending jobs is %d\n",
-               job_count, jobs_pending, max_jobs_pending);
-
-       ret = write(job_get_permission_fd[1], &byte, 1);
-       if (ret != 1) {
-               pr_err("Failed to write 1 byte: %m\n");
-               return -1;
-       }
-
-       return 0;
-}
-
-static int handle_job_request(struct event_handler *h)
-{
-       int ret, pid;
-
-       ret = read(job_request_fd[0], &pid, sizeof(pid));
-       if (ret < 0) {
+       ret = read(job_notify_fd[0], buf, sizeof(buf));
+       if (ret < 0)
                pr_err("Failed to read: %m\n");
-               return -1;
-       }
-
-       if (ret == 0) {
-               pr_info("Read zero bytes\n");
-               return 0;
-       }
-
-       if (pid > 0) {
-               if (job_count >= max_jobs) {
-                       if (jobs_pending < max_jobs_pending)
-                               jobs_pending++;
-                       else
-                               deny_job();
-               } else {
-                       ret = grant_new_job();
-                       return 0;
-               }
-       } else if (pid < 0) {
-               if (job_count > max_jobs)
-                       pr_err("BUG: Job %u jobs exceeded limit %u\n",
-                               job_count, max_jobs);
-
-               pr_info("Job %d finished\n", -pid);
-               job_count--;
-               if (jobs_pending) {
-                       jobs_pending--;
-                       ret = grant_new_job();
-                       return 0;
-               }
-       }
 
        return 0;
 }
 
-struct event_handler signal_handler = {
-       .handle_event = handle_signals,
-       .events = EPOLLIN,
-       .name = "signal",
-};
-
-struct event_handler job_request_handler = {
-       .handle_event = handle_job_request,
-       .events = EPOLLIN,
-       .name = "job_request",
-};
-
 /*
  * Initialize the jobcontrol.
  *
@@ -310,23 +228,11 @@ struct event_handler job_request_handler = {
  */
 int init_jobcontrol(int max_jobs_requested)
 {
+       static struct event_handler ev;
        FILE *file;
        int ret;
-       sigset_t sigmask;
        char buf[256];
        char match[8];
-       pthread_t *thread;
-       int i;
-
-       if (pipe2(job_request_fd, O_NONBLOCK | O_CLOEXEC)) {
-               pr_err("Failed to create pipe: %m\n");
-               return -1;
-       }
-
-       if (pipe2(job_get_permission_fd, O_CLOEXEC)) {
-               pr_err("Failed to create pipe: %m\n");
-               return -1;
-       }
 
        epoll_fd = epoll_create(1);
        if (epoll_fd == -1) {
@@ -334,20 +240,6 @@ int init_jobcontrol(int max_jobs_requested)
                return -1;
        }
 
-       job_request_handler.fd = job_request_fd[0];
-       register_event_handler(&job_request_handler);
-
-       sigemptyset(&sigmask);
-       sigaddset(&sigmask, SIGCHLD);
-
-       signal_handler.fd = signalfd(-1, &sigmask, SFD_CLOEXEC);
-       if (job_request_handler.fd < 0) {
-               pr_err("Failed to create signal_fd: %m\n");
-               return -1;
-       }
-
-       register_event_handler(&signal_handler);
-
        if (max_jobs_requested > 0) {
                max_jobs = max_jobs_requested;
                goto no_count_cpus;
@@ -377,6 +269,18 @@ int init_jobcontrol(int max_jobs_requested)
                        max_jobs++;
        }
 
+       ret = pipe(job_notify_fd);
+       if (ret) {
+               pr_err("pipe() failed: %m\n");
+               return ret;
+       }
+
+       ev.fd = job_notify_fd[0];
+       ev.events = EPOLLIN;
+       ev.handle_event = job_notify_handler;
+       ev.name = "job_notify";
+       register_event_handler(&ev, EPOLL_CTL_ADD);
+
 open_fail:
 read_fail:
        fclose(file);
@@ -387,11 +291,6 @@ no_count_cpus:
        max_jobs_pending = max_jobs * 10 + 25;
        pr_info("Set maximum number of pending jobs to %d\n", max_jobs_pending);
 
-       /* Create worker threads */
-       thread = calloc(sizeof(*thread), max_jobs);
-       for (i = 0; i < max_jobs; i++)
-               pthread_create(&thread[i], NULL, worker_thread, (void *)i);
-
        return 0;
 }
 
@@ -437,17 +336,21 @@ int poll_job_requests(int timeout)
        job_handler->handle_event(job_handler);
 
 out:
-       pr_info("Jobs active: %u, pending: %u\n", job_count, jobs_pending);
+       pr_info("Workers active: %u\n", workers_active);
        return ret;
 }
 
-/*
- * Per process flag indicating whether this child has requested fork
- * limiting. If it has, it must also tell the master parent when it
- * has died so that the parent can give next pending job permission to
- * go.
- */
-static int is_limited_fork;
+int notify_job_request(void)
+{
+       int ret;
+       char byte = 0;
+
+       ret = write(job_notify_fd[1], &byte, sizeof(byte));
+       if (ret < 0)
+               pr_err("Failed to write: %m\n");
+
+       return 0;
+}
 
 int do_fork(void)
 {
@@ -459,129 +362,22 @@ int do_fork(void)
        }
 
        if (child) {
-               child_count++;
-               pr_debug("Fork %d, child %d\n", child_count, child);
+               pr_debug("Fork child %d\n", child);
                return child;
        }
 
-       /*
-        * Also do not notify the master parent the death of this
-        * child. Only childs that have been created with
-        * do_fork_limited() can have this flag set.
-        */
-       is_limited_fork = 0;
-
-       /*
-        * Close unused ends of the job control pipes. Only the parent
-        * which controls the jobs may have the write end open of the
-        * job_get_permission_fd and the read end of the
-        * job_request_fd. Failing to close the pipe ends properly
-        * will cause the childs to wait forever for the run
-        * permission in case parent dies prematurely.
-        *
-        * Note! The file descriptor must be closed once and only
-        * once. They are marked to -1 to make it impossible for
-        * subseqent do_fork() calls from closing them again (in which
-        * case some other file descriptor might already be reserved
-        * for the same number) and prevent accidentally closing some
-        * innocent file descriptors that are still in use.
-        */
-       if (job_get_permission_fd[1] >= 0) {
-               close(job_get_permission_fd[1]);
-               job_get_permission_fd[1] = -1;
-       }
-       if (job_request_fd[0] >= 0) {
-               close(job_request_fd[0]);
-               job_request_fd[0] = -1;
-       }
-
-       /* reset child's child count */
-       child_count = 0;
-       parent_count++;
        return 0;
 }
 
-static int request_fork(int request)
-{
-       int pid = getpid();
-
-       pid = request > 0 ? pid : -pid;
-
-       return write(job_request_fd[1], &pid, sizeof(pid));
-}
-
-static void limited_fork_exit_handler(void)
-{
-       if (is_limited_fork)
-               request_fork(-1);
-}
-
-/*
- * Like do_fork(), but allow the child continue only after the global
- * job count is low enough.
- *
- * We allow the parent to continue other more important activities but
- * child respects the limit of global active processes.
- */
-int do_fork_limited(void)
-{
-       int child, ret;
-       char byte;
-
-       child = do_fork();
-       if (child)
-               return child;
-
-       /* Remember to notify the parent when we are done */
-       atexit(limited_fork_exit_handler);
-       is_limited_fork = 1;
-
-       pr_debug("Requesting permission to go\n");
-
-       /* Signal the parent that we are here, waiting to go */
-       request_fork(1);
-
-       /*
-        * The parent will tell us when we can continue. If there were
-        * multiple children waiting for their turn to run only one
-        * will be reading the content byte from the pipe and getting
-        * the permission to run.
-        */
-       ret = read(job_get_permission_fd[0], &byte, sizeof(byte));
-       if (ret == 0)
-               pr_err("Error requesting run, did the parent die?\n");
-
-       if (ret < 0)
-               pr_err("Job control request failure: %m\n");
-
-       if (byte < 0) {
-               pr_info("Did not get permission to execute. Terminating\n");
-
-               /*
-                * Avoid running exit handler, that would tell the
-                * parent we died normally and decrement the job
-                * counters.
-                */
-               raise(SIGKILL);
-       }
-
-       pr_debug("Continuing\n");
-       return child;
-}
-
-int harvest_zombies(int pid)
+int clear_zombie(int pid)
 {
        int status;
        struct rusage rusage;
        char *status_str = NULL;
        int code = 0;
 
-       if (child_count == 0)
-               return 0;
-
        if (pid)
-               pr_debug("Waiting on pid %d, children left: %d\n", pid,
-                       child_count);
+               pr_debug("Waiting on pid %d\n", pid);
 
        do {
                pid = wait4(pid, &status, 0, &rusage);
@@ -592,7 +388,6 @@ int harvest_zombies(int pid)
                /* Wait until the child has become a zombie */
        } while (!WIFEXITED(status) && !WIFSIGNALED(status));
 
-       child_count--;
        if (WIFEXITED(status)) {
                status_str = "exited with status";
                code = WEXITSTATUS(status);
@@ -600,8 +395,8 @@ int harvest_zombies(int pid)
                status_str = "killed by signal";
                code = WTERMSIG(status);
        }
-       pr_debug("pid %d: %s %d. Children left: %d\n", pid,
-               status_str, code, child_count);
+       pr_debug("pid %d: %s %d.\n", pid,
+               status_str, code);
        pr_debug("pid %d: User time: %ld.%03lds, System %ld.%03lds\n", pid,
                (long)rusage.ru_utime.tv_sec, rusage.ru_utime.tv_usec / 1000,
                (long)rusage.ru_stime.tv_sec, rusage.ru_stime.tv_usec / 1000);
@@ -629,12 +424,12 @@ int run_piped(const char *cmd, char *const argv[],
 
        if (stdoutfd && pipe(ofd)) {
                pr_err("pipe() failed: %m\n");
-               return -1;
+               goto close_ifd;
        }
 
        if (stderrfd && pipe(efd)) {
                pr_err("pipe() failed: %m\n");
-               return -1;
+               goto close_ofd;
        }
 
        pid = do_fork();
@@ -678,6 +473,19 @@ int run_piped(const char *cmd, char *const argv[],
        exit(1);
 
        return 0;
+
+close_ofd:
+       if (stdoutfd) {
+               close(ofd[0]);
+               close(ofd[1]);
+       }
+close_ifd:
+       if (stdinfd) {
+               close(ifd[0]);
+               close(ifd[1]);
+       }
+
+       return -1;
 }
 
 /*
@@ -708,7 +516,7 @@ int run_piped_stream(const char *cmd, char *const argv[],
        pid = run_piped(cmd, argv, i, o, e);
 
        if (stdinf) {
-               *stdinf = fdopen(ifd, "r");
+               *stdinf = fdopen(ifd, "w");
                if (*stdinf == NULL) {
                        pr_err("Error opening file stream for fd %d: %m\n",
                               ifd);
@@ -741,7 +549,6 @@ int run_piped_stream(const char *cmd, char *const argv[],
  * Forks a child and executes a command to run on parallel
  */
 
-#define max(a,b) (a) < (b) ? (b) : (a)
 #define BUF_SIZE (128*1024)
 int run(const char *cmd, char *const argv[])
 {
@@ -751,9 +558,6 @@ int run(const char *cmd, char *const argv[])
        int maxfd;
        int eof = 0;
 
-       if ((child = do_fork()))
-           return child;
-
        child = run_piped(cmd, argv, NULL, &ofd, &efd);
 
        FD_ZERO(&rfds);
@@ -817,7 +621,7 @@ print:
                                else
                                        pr_info("%s: stdout: %s\n",
                                                cmd, sptr);
-                               sptr = eptr;
+                               sptr = eptr + 1;
                        }
                        eptr++;
                }
@@ -826,16 +630,16 @@ print:
        close(ofd);
        close(efd);
 
-       harvest_zombies(child);
+       clear_zombie(child);
 
-       exit(1);
        return 0;
 }
 
-int register_event_handler(struct event_handler *handler)
+int register_event_handler(struct event_handler *handler, int op)
 {
        struct epoll_event ev;
        int ret;
+       const char *str;
 
        if (handler->fd <= 0) {
                pr_err("Invalid file descriptor of %d\n", handler->fd);
@@ -847,15 +651,33 @@ int register_event_handler(struct event_handler *handler)
                return -1;
        }
 
-       pr_info("Registering handler for %s, fd %d\n",
+       switch (op) {
+       case EPOLL_CTL_ADD:
+               str = "register";
+               break;
+
+       case EPOLL_CTL_MOD:
+               str = "modify";
+               break;
+
+       case EPOLL_CTL_DEL:
+               str = "deregister";
+               break;
+
+       default:
+               pr_err("Invalid op %d\n", op);
+               return -1;
+       }
+
+       pr_info("Doing a epoll %s for handler %s, fd %d\n", str,
                handler->name, handler->fd);
 
        ev.data.fd = handler->fd;
        ev.data.ptr = handler;
        ev.events = handler->events;
-       ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, handler->fd, &ev);
+       ret = epoll_ctl(epoll_fd, op, handler->fd, &ev);
        if (ret) {
-               pr_err("Failed to add epoll_fd: %m\n");
+               pr_err("Failed to do epoll_ctl %s: %m\n", str);
                return -1;
        }
 
@@ -866,24 +688,32 @@ void _mutex_lock_acquired(struct mutex *lock, char *file, int line)
 {
        lock->line = line;
        lock->file = file;
+       pthread_getname_np(pthread_self(),
+                       lock->owner_name, sizeof(lock->owner_name));
 }
 
 int _mutex_lock(struct mutex *lock, char *file, int line)
 {
        int ret = 0;
+       int contended = 0;
 
        if (!pthread_mutex_trylock(&lock->lock))
                goto out_lock;
 
-       pr_info("Lock contention on lock %s on %s:%d\n",
-               lock->name, lock->file, lock->line);
+       contended = 1;
+       pr_debug("Lock contention at %s:%d on lock %s acquired by %s at %s:%d\n",
+               file, line, lock->name,
+               lock->owner_name, lock->file, lock->line);
 
        ret = pthread_mutex_lock(&lock->lock);
        if (ret)
-               pr_err("Acquirin lock %s failed: %m, acquired %s:%d\n",
+               pr_err("Acquirin lock %s failed: %m, acquired on %s:%d\n",
                        lock->name, lock->file, lock->line);
 
 out_lock:
+       if (contended)
+               pr_debug("Lock %s acquired at %s:%d after contention\n",
+                       lock->name, file, line);
        _mutex_lock_acquired(lock, file, line);
        return ret;
 }