static int epoll_fd;
static unsigned int max_jobs;
-static unsigned int job_count;
-static unsigned int jobs_pending;
static unsigned int max_jobs_pending;
struct work_struct {
},
};
-struct mutex work_pending_mutex = {
- .name = "work_pending",
+struct mutex work_stats_mutex = {
+ .name = "work_stats",
.lock = PTHREAD_MUTEX_INITIALIZER,
};
-pthread_cond_t work_pending_cond = PTHREAD_COND_INITIALIZER;
+static int workers_active;
+static int worker_count;
static int run_work_on_queue(struct work_queue *queue)
{
queue->work = work->next;
queue->length--;
- /*
- * If queue is not empty, try waking up more workers. It is
- * possible that when work were queued, the first worker did
- * not wake up soon enough and
- */
- if (queue->length > 0)
- pthread_cond_signal(&work_pending_cond);
-
mutex_unlock(&queue->lock);
pr_info("Executing work %s from queue %s, %d still pending\n",
static void *worker_thread(void *arg)
{
- int ret;
-
+ int stop_working = 0;
+ int work_done = 0;
char name[16];
- snprintf(name, sizeof(name), "worker%ld", (long)arg);
+ mutex_lock(&work_stats_mutex);
+ snprintf(name, sizeof(name), "worker%d", worker_count);
+ worker_count++;
+ mutex_unlock(&work_stats_mutex);
+
pthread_setname_np(pthread_self(), name);
+ pthread_detach(pthread_self());
+
+ pr_info("Worker started\n");
- while (1) {
+ while (!stop_working) {
while (1) {
- int prio, work_done = 0;
+ int prio;
/*
- * Execute as many works from the queues as
- * there are, starting from highest priority
- * queue
+ * Execute as much work from the high priority
+ * queue as possible. Once there are no more
+ * high prio work left, break out the loop and
+ * see if we still need this many workers.
*/
for (prio = 0; prio < WORK_PRIORITIES_NUM; prio++) {
work_done =
break;
}
- if (!work_done)
+ if (!work_done || prio != WORK_PRIORITY_HIGH)
break;
}
- pr_info("Worker going to sleep\n");
- ret = pthread_cond_wait(&work_pending_cond,
- &work_pending_mutex.lock);
- if (ret < 0)
- pr_err("Error: %m\n");
-
- mutex_lock_acquired(&work_pending_mutex);
-
- mutex_unlock(&work_pending_mutex);
-
+ mutex_lock(&work_stats_mutex);
+ if (workers_active > max_jobs || !work_done) {
+ workers_active--;
+ pr_info("Worker exiting, %d left active\n",
+ workers_active);
+ if (!workers_active)
+ worker_count = 0;
+ stop_working = 1;
+ }
+ mutex_unlock(&work_stats_mutex);
}
return NULL;
int queue_work(unsigned int priority, char *name,
int (work_fn)(void *arg), void *arg)
{
+ pthread_t *thread;
struct work_queue *queue;
struct work_struct *work, *last_work;
queue->length++;
mutex_unlock(&queue->lock);
- pthread_cond_signal(&work_pending_cond);
+ mutex_lock(&work_stats_mutex);
+ pr_info("workers_active: %d, priority: %d\n", workers_active, priority);
+ if (priority != WORK_PRIORITY_HIGH && workers_active >= max_jobs) {
+ mutex_unlock(&work_stats_mutex);
+ return 0;
+ }
+ workers_active++;
+ mutex_unlock(&work_stats_mutex);
+
+ pr_info("Creting new worker thread\n");
+ /* We need a worker thread, create one */
+ thread = calloc(sizeof(*thread), 1);
+ pthread_create(thread, NULL, worker_thread, NULL);
+
+ free(thread);
return 0;
}
int ret;
char buf[256];
char match[8];
- pthread_t *thread;
- long int i;
epoll_fd = epoll_create(1);
if (epoll_fd == -1) {
max_jobs_pending = max_jobs * 10 + 25;
pr_info("Set maximum number of pending jobs to %d\n", max_jobs_pending);
- /* Create worker threads */
- thread = calloc(sizeof(*thread), max_jobs);
- for (i = 0; i < max_jobs; i++)
- pthread_create(&thread[i], NULL, worker_thread, (void *)i);
-
return 0;
}
job_handler->handle_event(job_handler);
out:
- pr_info("Jobs active: %u, pending: %u\n", job_count, jobs_pending);
+ pr_info("Workers active: %u\n", workers_active);
return ret;
}