]> git.itanic.dy.fi Git - rrdd/commitdiff
process: Handle threads dynamically
authorTimo Kokkonen <timo.t.kokkonen@iki.fi>
Sat, 9 Jul 2016 17:18:13 +0000 (20:18 +0300)
committerTimo Kokkonen <timo.t.kokkonen@iki.fi>
Sat, 9 Jul 2016 17:50:42 +0000 (20:50 +0300)
Instead of hard coding thread number on start up, handle them
dynamically as needed.

The max_jobs variable determines how many workers there may be
running. However, there are also high priority works that should be
executed even though thread count is already maxed. Thus we start a
new worker whenever a high priority work is queued.

As soon as work is finished and there are no more work to do or there
are too many threads to do low priority work, threads will exit.

Signed-off-by: Timo Kokkonen <timo.t.kokkonen@iki.fi>
process.c

index 7e739210f5f9829c1e70bed700935611820cd4cc..d4581065bc34bd416427dd636189d6e97bfdaa32 100644 (file)
--- a/process.c
+++ b/process.c
@@ -12,8 +12,6 @@
 
 static int epoll_fd;
 static unsigned int max_jobs;
-static unsigned int job_count;
-static unsigned int jobs_pending;
 static unsigned int max_jobs_pending;
 
 struct work_struct {
@@ -47,11 +45,12 @@ struct work_queue work_queues[WORK_PRIORITIES_NUM] = {
        },
 };
 
-struct mutex work_pending_mutex = {
-       .name = "work_pending",
+struct mutex work_stats_mutex = {
+       .name = "work_stats",
        .lock = PTHREAD_MUTEX_INITIALIZER,
 };
-pthread_cond_t work_pending_cond = PTHREAD_COND_INITIALIZER;
+static int workers_active;
+static int worker_count;
 
 static int run_work_on_queue(struct work_queue *queue)
 {
@@ -69,14 +68,6 @@ static int run_work_on_queue(struct work_queue *queue)
        queue->work = work->next;
        queue->length--;
 
-       /*
-        * If queue is not empty, try waking up more workers. It is
-        * possible that when work were queued, the first worker did
-        * not wake up soon enough and
-        */
-       if (queue->length > 0)
-               pthread_cond_signal(&work_pending_cond);
-
        mutex_unlock(&queue->lock);
 
        pr_info("Executing work %s from queue %s, %d still pending\n",
@@ -91,21 +82,29 @@ static int run_work_on_queue(struct work_queue *queue)
 
 static void *worker_thread(void *arg)
 {
-       int ret;
-
+       int stop_working = 0;
+       int work_done = 0;
        char name[16];
 
-       snprintf(name, sizeof(name), "worker%ld", (long)arg);
+       mutex_lock(&work_stats_mutex);
+       snprintf(name, sizeof(name), "worker%d", worker_count);
+       worker_count++;
+       mutex_unlock(&work_stats_mutex);
+
        pthread_setname_np(pthread_self(), name);
+       pthread_detach(pthread_self());
+
+       pr_info("Worker started\n");
 
-       while (1) {
+       while (!stop_working) {
                while (1) {
-                       int prio, work_done = 0;
+                       int prio;
 
                        /*
-                        * Execute as many works from the queues as
-                        * there are, starting from highest priority
-                        * queue
+                        * Execute as much work from the high priority
+                        * queue as possible. Once there are no more
+                        * high prio work left, break out the loop and
+                        * see if we still need this many workers.
                         */
                        for (prio = 0; prio < WORK_PRIORITIES_NUM; prio++) {
                                work_done =
@@ -114,20 +113,20 @@ static void *worker_thread(void *arg)
                                        break;
                        }
 
-                       if (!work_done)
+                       if (!work_done || prio != WORK_PRIORITY_HIGH)
                                break;
                }
 
-               pr_info("Worker going to sleep\n");
-               ret = pthread_cond_wait(&work_pending_cond,
-                                       &work_pending_mutex.lock);
-               if (ret < 0)
-                       pr_err("Error: %m\n");
-
-               mutex_lock_acquired(&work_pending_mutex);
-
-               mutex_unlock(&work_pending_mutex);
-
+               mutex_lock(&work_stats_mutex);
+               if (workers_active > max_jobs || !work_done) {
+                       workers_active--;
+                       pr_info("Worker exiting, %d left active\n",
+                               workers_active);
+                       if (!workers_active)
+                               worker_count = 0;
+                       stop_working = 1;
+               }
+               mutex_unlock(&work_stats_mutex);
        }
 
        return NULL;
@@ -136,6 +135,7 @@ static void *worker_thread(void *arg)
 int queue_work(unsigned int priority, char *name,
        int (work_fn)(void *arg), void *arg)
 {
+       pthread_t *thread;
        struct work_queue *queue;
        struct work_struct *work, *last_work;
 
@@ -169,7 +169,21 @@ int queue_work(unsigned int priority, char *name,
        queue->length++;
        mutex_unlock(&queue->lock);
 
-       pthread_cond_signal(&work_pending_cond);
+       mutex_lock(&work_stats_mutex);
+       pr_info("workers_active: %d, priority: %d\n", workers_active, priority);
+       if (priority != WORK_PRIORITY_HIGH && workers_active >= max_jobs) {
+               mutex_unlock(&work_stats_mutex);
+               return 0;
+       }
+       workers_active++;
+       mutex_unlock(&work_stats_mutex);
+
+       pr_info("Creting new worker thread\n");
+       /* We need a worker thread, create one */
+       thread = calloc(sizeof(*thread), 1);
+       pthread_create(thread, NULL, worker_thread, NULL);
+
+       free(thread);
 
        return 0;
 }
@@ -187,8 +201,6 @@ int init_jobcontrol(int max_jobs_requested)
        int ret;
        char buf[256];
        char match[8];
-       pthread_t *thread;
-       long int i;
 
        epoll_fd = epoll_create(1);
        if (epoll_fd == -1) {
@@ -235,11 +247,6 @@ no_count_cpus:
        max_jobs_pending = max_jobs * 10 + 25;
        pr_info("Set maximum number of pending jobs to %d\n", max_jobs_pending);
 
-       /* Create worker threads */
-       thread = calloc(sizeof(*thread), max_jobs);
-       for (i = 0; i < max_jobs; i++)
-               pthread_create(&thread[i], NULL, worker_thread, (void *)i);
-
        return 0;
 }
 
@@ -285,7 +292,7 @@ int poll_job_requests(int timeout)
        job_handler->handle_event(job_handler);
 
 out:
-       pr_info("Jobs active: %u, pending: %u\n", job_count, jobs_pending);
+       pr_info("Workers active: %u\n", workers_active);
        return ret;
 }