]> git.itanic.dy.fi Git - rrdd/blobdiff - process.c
onewire_parser.c: Fix compiler warnings about string lengths
[rrdd] / process.c
index 0e5c1c2ce24a7c3d01604b6b3bcf2c3f601c5c63..f7477b532697fa257c79e22927629a7974c717c7 100644 (file)
--- a/process.c
+++ b/process.c
+#include <unistd.h>
+#include <fcntl.h>
+#include <sys/select.h>
+#include <sys/epoll.h>
+#include <stdio.h>
+#include <sys/wait.h>
+#include <sys/signalfd.h>
+#include <sys/resource.h>
+
 #include "process.h"
+#include "debug.h"
+#include "utils.h"
+
+static int epoll_fd;
+static unsigned int max_jobs;
+static unsigned int max_jobs_pending;
+
+struct work_struct {
+       const char *name;
+       int (*work_fn)(void *);
+       void *arg;
+       struct work_struct *next;
+};
+
+struct work_queue {
+       struct work_struct *work;
+       int length;
+       char *name;
+       struct mutex lock;
+};
+
+struct work_queue work_queues[WORK_PRIORITIES_NUM] = {
+       {
+               .name = "high priority",
+               .lock = {
+                       .name = "high_prio_queue",
+                       .lock = PTHREAD_MUTEX_INITIALIZER,
+               },
+       },
+       {
+               .name = "low priority",
+               .lock = {
+                       .name = "low_prio_queue",
+                       .lock = PTHREAD_MUTEX_INITIALIZER,
+               },
+       },
+};
+
+struct mutex work_stats_mutex = {
+       .name = "work_stats",
+       .lock = PTHREAD_MUTEX_INITIALIZER,
+};
+static int workers_active;
+static int worker_count;
+
+static int job_notify_fd[2];
+
+static int run_work_on_queue(struct work_queue *queue)
+{
+       struct work_struct *work;
+
+       mutex_lock(&queue->lock);
+
+       if (!queue->work) {
+               mutex_unlock(&queue->lock);
+               return 0;
+       }
+
+       /* Take next work */
+       work = queue->work;
+       queue->work = work->next;
+       queue->length--;
+
+       mutex_unlock(&queue->lock);
+
+       pr_info("Executing work %s from queue %s, %d still pending\n",
+               work->name, queue->name, queue->length);
+
+       work->work_fn(work->arg);
+       pr_info("Work %s done\n", work->name);
+       free(work);
+
+       return 1;
+}
+
+static void *worker_thread(void *arg)
+{
+       int ret;
+       char name[16];
+
+       mutex_lock(&work_stats_mutex);
+       snprintf(name, sizeof(name), "worker%d", worker_count);
+       worker_count++;
+       mutex_unlock(&work_stats_mutex);
+
+       pthread_setname_np(pthread_self(), name);
+       pthread_detach(pthread_self());
+
+       pr_info("Worker started\n");
+
+       /* Execute all high priority work from the queue */
+       while (run_work_on_queue(&work_queues[WORK_PRIORITY_HIGH]))
+               ;
+       /*
+        * All high priority work is now done, see if we have enough
+        * workers executing low priority worl. Continue from there if
+        * needed.
+        */
+       mutex_lock(&work_stats_mutex);
+       if (workers_active > max_jobs)
+               goto out_unlock;
+
+       mutex_unlock(&work_stats_mutex);
+
+       /*
+        * Start executing the low priority work. Drop the nice value
+        * as this really is low priority stuff
+        */
+       ret = nice(19);
+       pr_info("Worker priority dropped to %d\n", ret);
+
+       while (run_work_on_queue(&work_queues[WORK_PRIORITY_LOW]))
+               ;
+
+       /* All done, exit */
+       mutex_lock(&work_stats_mutex);
+out_unlock:
+       workers_active--;
+       pr_info("Worker exiting, %d left active\n",
+               workers_active);
+
+       /*
+        * Last exiting worker zeroes the worker_count. This
+        * ensures next time we start spawning worker threads
+        * the first thread will have number zero on its name.
+        */
+       if (!workers_active)
+               worker_count = 0;
+
+       /*
+        * Kick the job poller. If all jobs were active at this point
+        * the scheduler thread will wait indefinitely until someone
+        * tells it to do something. We may now know when next job is
+        * available, so it is better for the scheduler to recalculate
+        * its sleep time.
+        */
+       notify_job_request();
+
+       mutex_unlock(&work_stats_mutex);
+
+       return NULL;
+}
+
+int queue_work(unsigned int priority, char *name,
+       int (work_fn)(void *arg), void *arg)
+{
+       pthread_t *thread;
+       struct work_queue *queue;
+       struct work_struct *work, *last_work;
+
+       if (priority >= WORK_PRIORITIES_NUM) {
+               pr_err("Invalid priority: %d\n", priority);
+               return -EINVAL;
+       }
+
+       work = calloc(sizeof(*work), 1);
+
+       work->name = name;
+       work->work_fn = work_fn;
+       work->arg = arg;
+
+       queue = &work_queues[priority];
+
+       /* Insert new work at the end of the work queue */
+       mutex_lock(&queue->lock);
+
+       last_work = queue->work;
+       while (last_work && last_work->next)
+               last_work = last_work->next;
+
+       if (!last_work)
+               queue->work = work;
+       else
+               last_work->next = work;
+
+       pr_info("Inserted work %s in queue %s, with %d pending items\n",
+               work->name, queue->name, queue->length);
+       queue->length++;
+       mutex_unlock(&queue->lock);
+
+       mutex_lock(&work_stats_mutex);
+       pr_info("workers_active: %d, priority: %d\n", workers_active, priority);
+       if (priority != WORK_PRIORITY_HIGH && workers_active >= max_jobs) {
+               mutex_unlock(&work_stats_mutex);
+               return 0;
+       }
+       workers_active++;
+       mutex_unlock(&work_stats_mutex);
+
+       pr_info("Creating new worker thread\n");
+       /* We need a worker thread, create one */
+       thread = calloc(sizeof(*thread), 1);
+       pthread_create(thread, NULL, worker_thread, NULL);
+
+       free(thread);
+
+       return 0;
+}
+
+static int job_notify_handler(struct event_handler *h)
+{
+       int ret;
+       char buf[64];
+
+       ret = read(job_notify_fd[0], buf, sizeof(buf));
+       if (ret < 0)
+               pr_err("Failed to read: %m\n");
+
+       return 0;
+}
+
+/*
+ * Initialize the jobcontrol.
+ *
+ * Create the pipes that are used to grant children execution
+ * permissions. If max_jobs is zero, count the number of CPUs from
+ * /proc/cpuinfo and use that.
+ */
+int init_jobcontrol(int max_jobs_requested)
+{
+       static struct event_handler ev;
+       FILE *file;
+       int ret;
+       char buf[256];
+       char match[8];
+
+       epoll_fd = epoll_create(1);
+       if (epoll_fd == -1) {
+               pr_err("Failed to epoll_create(): %m\n");
+               return -1;
+       }
+
+       if (max_jobs_requested > 0) {
+               max_jobs = max_jobs_requested;
+               goto no_count_cpus;
+       }
+       max_jobs++;
+
+       file = fopen("/proc/cpuinfo", "ro");
+       if (!file) {
+               pr_err("Failed to open /proc/cpuinfo: %m\n");
+               goto open_fail;
+       }
+
+       /*
+        * The CPU count algorithm simply reads the first 8 bytes from
+        * the /proc/cpuinfo and then expects that line to be there as
+        * many times as there are CPUs.
+        */
+       ret = fread(match, 1, sizeof(match), file);
+       if (ret < sizeof(match)) {
+               pr_err("read %d bytes when expecting %zd %m\n",
+                       ret, sizeof(match));
+               goto read_fail;
+       }
+
+       while(fgets(buf, sizeof(buf), file)) {
+               if (!strncmp(buf, match, sizeof(match)))
+                       max_jobs++;
+       }
+
+       ret = pipe(job_notify_fd);
+       if (ret) {
+               pr_err("pipe() failed: %m\n");
+               return ret;
+       }
+
+       ev.fd = job_notify_fd[0];
+       ev.events = EPOLLIN;
+       ev.handle_event = job_notify_handler;
+       ev.name = "job_notify";
+       register_event_handler(&ev, EPOLL_CTL_ADD);
+
+open_fail:
+read_fail:
+       fclose(file);
 
-int child_count = 0;
+no_count_cpus:
+       pr_info("Set maximum number of parallel jobs to %d\n", max_jobs);
+
+       max_jobs_pending = max_jobs * 10 + 25;
+       pr_info("Set maximum number of pending jobs to %d\n", max_jobs_pending);
+
+       return 0;
+}
+
+int poll_job_requests(int timeout)
+{
+       struct epoll_event event;
+       struct event_handler *job_handler;
+       int ret;
+
+       /* Convert positive seconds to milliseconds */
+       timeout = timeout > 0 ? 1000 * timeout : timeout;
+
+       ret = epoll_wait(epoll_fd, &event, 1, timeout);
+
+       if (ret == -1) {
+               if (errno != EINTR) {
+                       pr_err("epoll_wait: %m\n");
+                       return -1;
+               }
+
+               /*
+                * If epoll_wait() was interrupted, better start
+                * everything again from the beginning
+                */
+               return 0;
+       }
+
+       if (ret == 0) {
+               pr_info("Timed out\n");
+               goto out;
+       }
+
+       job_handler = event.data.ptr;
+
+       if (!job_handler || !job_handler->handle_event) {
+               pr_err("Corrupted event handler for fd %d\n",
+                       event.data.fd);
+               goto out;
+       }
+
+       pr_debug("Running handler %s to handle events from fd %d\n",
+               job_handler->name, job_handler->fd);
+       job_handler->handle_event(job_handler);
+
+out:
+       pr_info("Workers active: %u\n", workers_active);
+       return ret;
+}
+
+int notify_job_request(void)
+{
+       int ret;
+       char byte = 0;
+
+       ret = write(job_notify_fd[1], &byte, sizeof(byte));
+       if (ret < 0)
+               pr_err("Failed to write: %m\n");
+
+       return 0;
+}
 
 int do_fork(void)
 {
-       int child, error;
+       int child;
        child = fork();
        if (child < 0) {
-               error = errno;
-               fprintf(stderr, "fork() failed: %s\n", strerror(error));
+               pr_err("fork() failed: %m\n");
                return -1;
        }
 
        if (child) {
-               printf("%d: Forked child %d\n", getpid(), child);
-               child_count++;
+               pr_debug("Fork child %d\n", child);
                return child;
        }
 
-       /* reset child's child count */
-       child_count = 0;
        return 0;
 }
 
-int run(const char *cmd, char *const argv[])
+int clear_zombie(int pid)
 {
-       int child, error;
-       if ((child = do_fork()))
-           return child;
-
-       execvp(cmd, argv);
-       error = errno;
-       printf("Failed to execv command %s: %s\n", cmd, strerror(error));
-       exit(1);
-       return 0;
-}
+       int status;
+       struct rusage rusage;
+       char *status_str = NULL;
+       int code = 0;
 
-int harvest_zombies(int pid)
-{
-       int status, error;
+       if (pid)
+               pr_debug("Waiting on pid %d\n", pid);
 
-       if (child_count == 0)
-               return 0;
+       do {
+               pid = wait4(pid, &status, 0, &rusage);
+               if (pid < 0) {
+                       pr_err("Error on waitid(): %m\n");
+                       return 0;
+               }
+               /* Wait until the child has become a zombie */
+       } while (!WIFEXITED(status) && !WIFSIGNALED(status));
 
-       pid = waitpid(pid, &status, 0);
-       if (pid < 0) {
-               error = errno;
-               fprintf(stderr, "%d: Error on wait(): %s\n", getpid(),
-                       strerror(error));
+       if (WIFEXITED(status)) {
+               status_str = "exited with status";
+               code = WEXITSTATUS(status);
+       } else if (WIFSIGNALED(status)) {
+               status_str = "killed by signal";
+               code = WTERMSIG(status);
        }
-       else
-               printf("pid %d: exit %d. %d still running\n", pid, status,
-                      child_count--);
+       pr_debug("pid %d: %s %d.\n", pid,
+               status_str, code);
+       pr_debug("pid %d: User time: %ld.%03lds, System %ld.%03lds\n", pid,
+               (long)rusage.ru_utime.tv_sec, rusage.ru_utime.tv_usec / 1000,
+               (long)rusage.ru_stime.tv_sec, rusage.ru_stime.tv_usec / 1000);
 
        return 1;
 }
@@ -62,43 +410,82 @@ int harvest_zombies(int pid)
  *
  * Returns the pid of the executed process
  */
-int run_piped(const char *cmd, char *const argv[], int *readfd, int *writefd)
+int run_piped(const char *cmd, char *const argv[],
+             int *stdinfd, int *stdoutfd, int *stderrfd)
 {
-       int rfd[2], wfd[2], error, pid;
+       int ifd[2], ofd[2], efd[2], pid;
+
+       pr_info("Running command %s\n", cmd);
 
-       if (pipe(rfd)) {
-               error = errno;
-               fprintf(stderr, "pipe() failed: %s\n", strerror(error));
+       if (stdinfd && pipe(ifd)) {
+               pr_err("pipe() failed: %m\n");
                return -1;
        }
 
-       if (pipe(wfd)) {
-               error = errno;
-               fprintf(stderr, "pipe() failed: %s\n", strerror(error));
-               return -1;
+       if (stdoutfd && pipe(ofd)) {
+               pr_err("pipe() failed: %m\n");
+               goto close_ifd;
+       }
+
+       if (stderrfd && pipe(efd)) {
+               pr_err("pipe() failed: %m\n");
+               goto close_ofd;
        }
 
        pid = do_fork();
-       if (pid) {
-               close(rfd[1]);
-               close(wfd[0]);
-               *readfd = rfd[0];
-               *writefd = wfd[1];
-               child_count++;
+       if (pid) { /* Parent side */
+               if (stdinfd) {
+                       close(ifd[0]);
+                       *stdinfd = ifd[0];
+               }
+
+               if (stdoutfd) {
+                       close(ofd[1]);
+                       *stdoutfd = ofd[0];
+               }
+
+               if (stderrfd) {
+                       close(efd[1]);
+                       *stderrfd = efd[0];
+               }
+
                return pid;
        }
 
-       close(rfd[0]);
-       close(wfd[1]);
-       dup2(wfd[0], STDIN_FILENO);
-       dup2(rfd[1], STDOUT_FILENO);
+       if (stdinfd) {
+               close(ifd[1]);
+               dup2(ifd[0], STDIN_FILENO);
+       }
+
+       if (stdoutfd) {
+               close(ofd[0]);
+               dup2(ofd[1], STDOUT_FILENO);
+       }
+
+       if (stderrfd) {
+               close(efd[0]);
+               dup2(efd[1], STDERR_FILENO);
+       }
 
-       /* Now we have redirected both stdin and stdout to parent process */
+       /* Now we have redirected standard streams to parent process */
        execvp(cmd, argv);
-       error = errno;
-       printf("Failed to execv command %s: %s\n", cmd, strerror(error));
+       pr_err("Failed to execv command %s: %m\n", cmd);
        exit(1);
+
        return 0;
+
+close_ofd:
+       if (stdoutfd) {
+               close(ofd[0]);
+               close(ofd[1]);
+       }
+close_ifd:
+       if (stdinfd) {
+               close(ifd[0]);
+               close(ifd[1]);
+       }
+
+       return -1;
 }
 
 /*
@@ -108,33 +495,234 @@ int run_piped(const char *cmd, char *const argv[], int *readfd, int *writefd)
  * Returns the pid of the executed process
  */
 int run_piped_stream(const char *cmd, char *const argv[],
-                    FILE **readf, FILE **writef)
+                    FILE **stdinf, FILE **stdoutf, FILE **stderrf)
 {
-       int rfd, wfd, pid, error;
+       int ifd, ofd, efd, pid;
+       int *i, *o, *e;
+
+       if (stdinf)
+               i = &ifd;
+       else
+               i = 0;
+       if (stdoutf)
+               o = &ofd;
+       else
+               o = 0;
+       if (stderrf)
+               e = &efd;
+       else
+               e = 0;
+
+       pid = run_piped(cmd, argv, i, o, e);
 
-       pid = run_piped(cmd, argv, &rfd, &wfd);
+       if (stdinf) {
+               *stdinf = fdopen(ifd, "w");
+               if (*stdinf == NULL) {
+                       pr_err("Error opening file stream for fd %d: %m\n",
+                              ifd);
+                       return -1;
+               }
+       }
 
-       if (readf) {
-               *readf = fdopen(rfd, "r");
-               if (*readf == NULL) {
-                       error = errno;
-                       fprintf(stderr,
-                               "Error opening file stream for fd %d: %s\n",
-                               rfd, strerror(error));
+       if (stdoutf) {
+               *stdoutf = fdopen(ofd, "r");
+               if (*stdoutf == NULL) {
+                       pr_err("Error opening file stream for fd %d: %m\n",
+                              ofd);
                        return -1;
                }
        }
 
-       if (writef) {
-               *writef = fdopen(wfd, "w");
-               if (*writef == NULL) {
-                       error = errno;
-                       fprintf(stderr,
-                               "Error opening file stream for fd %d: %s\n",
-                               wfd, strerror(error));
+       if (stderrf) {
+               *stderrf = fdopen(efd, "r");
+               if (*stderrf == NULL) {
+                       pr_err("Error opening file stream for fd %d: %m\n",
+                              efd);
                        return -1;
                }
        }
 
        return pid;
 }
+
+/*
+ * Forks a child and executes a command to run on parallel
+ */
+
+#define BUF_SIZE (128*1024)
+int run(const char *cmd, char *const argv[])
+{
+       int child, error;
+       int ofd, efd;
+       fd_set rfds;
+       int maxfd;
+       int eof = 0;
+
+       child = run_piped(cmd, argv, NULL, &ofd, &efd);
+
+       FD_ZERO(&rfds);
+       FD_SET(ofd, &rfds);
+       FD_SET(efd, &rfds);
+
+       while (!eof) {
+               char *sptr , *eptr;
+               char rbuf[BUF_SIZE];
+               int bytes;
+               int is_stderr = 0;
+
+               maxfd = max(ofd, efd);
+               error = select(maxfd, &rfds, NULL, NULL, NULL);
+
+               if (error < 0) {
+                       pr_err("Error with select: %m\n");
+                       break;
+               }
+
+               if (FD_ISSET(ofd, &rfds)) {
+                       bytes = read(ofd, rbuf, BUF_SIZE);
+                       goto print;
+               }
+
+               if (FD_ISSET(efd, &rfds)) {
+                       is_stderr = 1;
+                       bytes = read(efd, rbuf, BUF_SIZE);
+                       goto print;
+               }
+
+               pr_err("select() returned unknown fd\n");
+               break;
+
+print:
+               if (bytes < 0) {
+                       pr_err("read() failed: %m\n");
+                       break;
+               }
+
+               /*
+                * Workaround: When a process had die and it has only
+                * written to stderr, select() doesn't indicate that
+                * there might be something to read in stderr fd. To
+                * work around this issue, we try to read stderr just
+                * in case in order to ensure everything gets read.
+                */
+               if (bytes == 0) {
+                       bytes = read(efd, rbuf, BUF_SIZE);
+                       is_stderr = 1;
+                       eof = 1;
+               }
+
+               sptr = eptr = rbuf;
+               while (bytes--) {
+                       if (*eptr == '\n') {
+                               *eptr = 0;
+                               if (is_stderr)
+                                       pr_err("%s: stderr: %s\n",
+                                               cmd, sptr);
+                               else
+                                       pr_info("%s: stdout: %s\n",
+                                               cmd, sptr);
+                               sptr = eptr + 1;
+                       }
+                       eptr++;
+               }
+       }
+
+       close(ofd);
+       close(efd);
+
+       clear_zombie(child);
+
+       return 0;
+}
+
+int register_event_handler(struct event_handler *handler, int op)
+{
+       struct epoll_event ev;
+       int ret;
+       const char *str;
+
+       if (handler->fd <= 0) {
+               pr_err("Invalid file descriptor of %d\n", handler->fd);
+               return -1;
+       }
+
+       if (!handler->handle_event) {
+               pr_err("Handler callback missing\n");
+               return -1;
+       }
+
+       switch (op) {
+       case EPOLL_CTL_ADD:
+               str = "register";
+               break;
+
+       case EPOLL_CTL_MOD:
+               str = "modify";
+               break;
+
+       case EPOLL_CTL_DEL:
+               str = "deregister";
+               break;
+
+       default:
+               pr_err("Invalid op %d\n", op);
+               return -1;
+       }
+
+       pr_info("Doing a epoll %s for handler %s, fd %d\n", str,
+               handler->name, handler->fd);
+
+       ev.data.fd = handler->fd;
+       ev.data.ptr = handler;
+       ev.events = handler->events;
+       ret = epoll_ctl(epoll_fd, op, handler->fd, &ev);
+       if (ret) {
+               pr_err("Failed to do epoll_ctl %s: %m\n", str);
+               return -1;
+       }
+
+       return 0;
+}
+
+void _mutex_lock_acquired(struct mutex *lock, char *file, int line)
+{
+       lock->line = line;
+       lock->file = file;
+       pthread_getname_np(pthread_self(),
+                       lock->owner_name, sizeof(lock->owner_name));
+}
+
+int _mutex_lock(struct mutex *lock, char *file, int line)
+{
+       int ret = 0;
+       int contended = 0;
+
+       if (!pthread_mutex_trylock(&lock->lock))
+               goto out_lock;
+
+       contended = 1;
+       pr_debug("Lock contention at %s:%d on lock %s acquired by %s at %s:%d\n",
+               file, line, lock->name,
+               lock->owner_name, lock->file, lock->line);
+
+       ret = pthread_mutex_lock(&lock->lock);
+       if (ret)
+               pr_err("Acquirin lock %s failed: %m, acquired on %s:%d\n",
+                       lock->name, lock->file, lock->line);
+
+out_lock:
+       if (contended)
+               pr_debug("Lock %s acquired at %s:%d after contention\n",
+                       lock->name, file, line);
+       _mutex_lock_acquired(lock, file, line);
+       return ret;
+}
+
+int _mutex_unlock(struct mutex *lock)
+{
+       lock->line = 0;
+       lock->file = NULL;
+       pthread_mutex_unlock(&lock->lock);
+
+       return 0;
+}