static unsigned int jobs_pending;
static unsigned int max_jobs_pending;
+struct work_struct {
+ const char *name;
+ int (*work_fn)(void *);
+ void *arg;
+ struct work_struct *next;
+};
+
+struct work_queue {
+ struct work_struct *work;
+ int length;
+ char *name;
+ struct mutex lock;
+};
+
+struct work_queue work_queues[WORK_PRIORITIES_NUM] = {
+ {
+ .name = "high priority",
+ .lock = {
+ .name = "high_prio_queue",
+ .lock = PTHREAD_MUTEX_INITIALIZER,
+ },
+ },
+ {
+ .name = "low priority",
+ .lock = {
+ .name = "low_prio_queue",
+ .lock = PTHREAD_MUTEX_INITIALIZER,
+ },
+ },
+};
+
+struct mutex work_pending_mutex = {
+ .name = "work_pending",
+ .lock = PTHREAD_MUTEX_INITIALIZER,
+};
+pthread_cond_t work_pending_cond = PTHREAD_COND_INITIALIZER;
+
+static int run_work_on_queue(struct work_queue *queue)
+{
+ struct work_struct *work;
+
+ mutex_lock(&queue->lock);
+
+ if (!queue->work) {
+ pr_info("No work to run on queue %s\n", queue->name);
+ mutex_unlock(&queue->lock);
+ return 0;
+ }
+
+ /* Take next work */
+ work = queue->work;
+ queue->work = work->next;
+ queue->length--;
+
+ /*
+ * If queue is not empty, try waking up more workers. It is
+ * possible that when work were queued, the first worker did
+ * not wake up soon enough and
+ */
+ if (queue->length > 0)
+ pthread_cond_signal(&work_pending_cond);
+
+ mutex_unlock(&queue->lock);
+
+ pr_info("Executing work %s from queue %s, %d still pending\n",
+ work->name, queue->name, queue->length);
+
+ work->work_fn(work->arg);
+ pr_info("Work %s done\n", work->name);
+ free(work);
+
+ return 1;
+}
+
+static void *worker_thread(void *arg)
+{
+ int ret;
+
+ char name[16];
+
+ snprintf(name, sizeof(name), "worker%ld", (long)arg);
+ pthread_setname_np(pthread_self(), name);
+
+ while (1) {
+ while (1) {
+ int prio, work_done = 0;
+
+ /*
+ * Execute as many works from the queues as
+ * there are, starting from highest priority
+ * queue
+ */
+ for (prio = 0; prio < WORK_PRIORITIES_NUM; prio++) {
+ work_done =
+ run_work_on_queue(&work_queues[prio]);
+ if (work_done)
+ break;
+ }
+
+ if (!work_done)
+ break;
+ }
+
+ pr_info("Worker going to sleep\n");
+ ret = pthread_cond_wait(&work_pending_cond,
+ &work_pending_mutex.lock);
+ if (ret < 0)
+ pr_err("Error: %m\n");
+
+ mutex_lock_acquired(&work_pending_mutex);
+
+ mutex_unlock(&work_pending_mutex);
+
+ }
+
+ return NULL;
+}
+
+int queue_work(unsigned int priority, char *name,
+ int (work_fn)(void *arg), void *arg)
+{
+ struct work_queue *queue;
+ struct work_struct *work, *last_work;
+
+ if (priority >= WORK_PRIORITIES_NUM) {
+ pr_err("Invalid priority: %d\n", priority);
+ return -EINVAL;
+ }
+
+ work = calloc(sizeof(*work), 1);
+
+ work->name = name;
+ work->work_fn = work_fn;
+ work->arg = arg;
+
+ queue = &work_queues[priority];
+
+ /* Insert new work at the end of the work queue */
+ mutex_lock(&queue->lock);
+
+ last_work = queue->work;
+ while (last_work && last_work->next)
+ last_work = last_work->next;
+
+ if (!last_work)
+ queue->work = work;
+ else
+ last_work->next = work;
+
+ pr_info("Inserted work %s in queue %s, with %d pending items\n",
+ work->name, queue->name, queue->length);
+ queue->length++;
+ mutex_unlock(&queue->lock);
+
+ pthread_cond_signal(&work_pending_cond);
+
+ return 0;
+}
+
int get_child_count(void)
{
return child_count;
sigset_t sigmask;
char buf[256];
char match[8];
+ pthread_t *thread;
+ int i;
if (pipe2(job_request_fd, O_NONBLOCK | O_CLOEXEC)) {
pr_err("Failed to create pipe: %m\n");
max_jobs_pending = max_jobs * 10 + 25;
pr_info("Set maximum number of pending jobs to %d\n", max_jobs_pending);
+ /* Create worker threads */
+ thread = calloc(sizeof(*thread), max_jobs);
+ for (i = 0; i < max_jobs; i++)
+ pthread_create(&thread[i], NULL, worker_thread, (void *)i);
+
+ /*
+ * Magic sleep. There are too many fork() calls at the moment
+ * so we must ensure our threads don't print anything out
+ * while a fork() is executed. Otherwise the child will
+ * inherit glibc internal locks while they are locked, and
+ * function calls such as printf will deadlock.
+ */
+ sleep(1);
+
return 0;
}