]> git.itanic.dy.fi Git - rrdd/blobdiff - process.c
process: Handle threads dynamically
[rrdd] / process.c
index 7e739210f5f9829c1e70bed700935611820cd4cc..d4581065bc34bd416427dd636189d6e97bfdaa32 100644 (file)
--- a/process.c
+++ b/process.c
@@ -12,8 +12,6 @@
 
 static int epoll_fd;
 static unsigned int max_jobs;
-static unsigned int job_count;
-static unsigned int jobs_pending;
 static unsigned int max_jobs_pending;
 
 struct work_struct {
@@ -47,11 +45,12 @@ struct work_queue work_queues[WORK_PRIORITIES_NUM] = {
        },
 };
 
-struct mutex work_pending_mutex = {
-       .name = "work_pending",
+struct mutex work_stats_mutex = {
+       .name = "work_stats",
        .lock = PTHREAD_MUTEX_INITIALIZER,
 };
-pthread_cond_t work_pending_cond = PTHREAD_COND_INITIALIZER;
+static int workers_active;
+static int worker_count;
 
 static int run_work_on_queue(struct work_queue *queue)
 {
@@ -69,14 +68,6 @@ static int run_work_on_queue(struct work_queue *queue)
        queue->work = work->next;
        queue->length--;
 
-       /*
-        * If queue is not empty, try waking up more workers. It is
-        * possible that when work were queued, the first worker did
-        * not wake up soon enough and
-        */
-       if (queue->length > 0)
-               pthread_cond_signal(&work_pending_cond);
-
        mutex_unlock(&queue->lock);
 
        pr_info("Executing work %s from queue %s, %d still pending\n",
@@ -91,21 +82,29 @@ static int run_work_on_queue(struct work_queue *queue)
 
 static void *worker_thread(void *arg)
 {
-       int ret;
-
+       int stop_working = 0;
+       int work_done = 0;
        char name[16];
 
-       snprintf(name, sizeof(name), "worker%ld", (long)arg);
+       mutex_lock(&work_stats_mutex);
+       snprintf(name, sizeof(name), "worker%d", worker_count);
+       worker_count++;
+       mutex_unlock(&work_stats_mutex);
+
        pthread_setname_np(pthread_self(), name);
+       pthread_detach(pthread_self());
+
+       pr_info("Worker started\n");
 
-       while (1) {
+       while (!stop_working) {
                while (1) {
-                       int prio, work_done = 0;
+                       int prio;
 
                        /*
-                        * Execute as many works from the queues as
-                        * there are, starting from highest priority
-                        * queue
+                        * Execute as much work from the high priority
+                        * queue as possible. Once there are no more
+                        * high prio work left, break out the loop and
+                        * see if we still need this many workers.
                         */
                        for (prio = 0; prio < WORK_PRIORITIES_NUM; prio++) {
                                work_done =
@@ -114,20 +113,20 @@ static void *worker_thread(void *arg)
                                        break;
                        }
 
-                       if (!work_done)
+                       if (!work_done || prio != WORK_PRIORITY_HIGH)
                                break;
                }
 
-               pr_info("Worker going to sleep\n");
-               ret = pthread_cond_wait(&work_pending_cond,
-                                       &work_pending_mutex.lock);
-               if (ret < 0)
-                       pr_err("Error: %m\n");
-
-               mutex_lock_acquired(&work_pending_mutex);
-
-               mutex_unlock(&work_pending_mutex);
-
+               mutex_lock(&work_stats_mutex);
+               if (workers_active > max_jobs || !work_done) {
+                       workers_active--;
+                       pr_info("Worker exiting, %d left active\n",
+                               workers_active);
+                       if (!workers_active)
+                               worker_count = 0;
+                       stop_working = 1;
+               }
+               mutex_unlock(&work_stats_mutex);
        }
 
        return NULL;
@@ -136,6 +135,7 @@ static void *worker_thread(void *arg)
 int queue_work(unsigned int priority, char *name,
        int (work_fn)(void *arg), void *arg)
 {
+       pthread_t *thread;
        struct work_queue *queue;
        struct work_struct *work, *last_work;
 
@@ -169,7 +169,21 @@ int queue_work(unsigned int priority, char *name,
        queue->length++;
        mutex_unlock(&queue->lock);
 
-       pthread_cond_signal(&work_pending_cond);
+       mutex_lock(&work_stats_mutex);
+       pr_info("workers_active: %d, priority: %d\n", workers_active, priority);
+       if (priority != WORK_PRIORITY_HIGH && workers_active >= max_jobs) {
+               mutex_unlock(&work_stats_mutex);
+               return 0;
+       }
+       workers_active++;
+       mutex_unlock(&work_stats_mutex);
+
+       pr_info("Creting new worker thread\n");
+       /* We need a worker thread, create one */
+       thread = calloc(sizeof(*thread), 1);
+       pthread_create(thread, NULL, worker_thread, NULL);
+
+       free(thread);
 
        return 0;
 }
@@ -187,8 +201,6 @@ int init_jobcontrol(int max_jobs_requested)
        int ret;
        char buf[256];
        char match[8];
-       pthread_t *thread;
-       long int i;
 
        epoll_fd = epoll_create(1);
        if (epoll_fd == -1) {
@@ -235,11 +247,6 @@ no_count_cpus:
        max_jobs_pending = max_jobs * 10 + 25;
        pr_info("Set maximum number of pending jobs to %d\n", max_jobs_pending);
 
-       /* Create worker threads */
-       thread = calloc(sizeof(*thread), max_jobs);
-       for (i = 0; i < max_jobs; i++)
-               pthread_create(&thread[i], NULL, worker_thread, (void *)i);
-
        return 0;
 }
 
@@ -285,7 +292,7 @@ int poll_job_requests(int timeout)
        job_handler->handle_event(job_handler);
 
 out:
-       pr_info("Jobs active: %u, pending: %u\n", job_count, jobs_pending);
+       pr_info("Workers active: %u\n", workers_active);
        return ret;
 }