static int workers_active;
static int worker_count;
+static int job_notify_fd[2];
+
static int run_work_on_queue(struct work_queue *queue)
{
struct work_struct *work;
if (!workers_active)
worker_count = 0;
+ /*
+ * Kick the job poller. If all jobs were active at this point
+ * the scheduler thread will wait indefinitely until someone
+ * tells it to do something. We may now know when next job is
+ * available, so it is better for the scheduler to recalculate
+ * its sleep time.
+ */
+ notify_job_request();
+
mutex_unlock(&work_stats_mutex);
return NULL;
return 0;
}
+static int job_notify_handler(struct event_handler *h)
+{
+ int ret;
+ char buf[64];
+
+ ret = read(job_notify_fd[0], buf, sizeof(buf));
+ if (ret < 0)
+ pr_err("Failed to read: %m\n");
+
+ return 0;
+}
+
/*
* Initialize the jobcontrol.
*
*/
int init_jobcontrol(int max_jobs_requested)
{
+ static struct event_handler ev;
FILE *file;
int ret;
char buf[256];
max_jobs++;
}
+ ret = pipe(job_notify_fd);
+ if (ret) {
+ pr_err("pipe() failed: %m\n");
+ return ret;
+ }
+
+ ev.fd = job_notify_fd[0];
+ ev.events = EPOLLIN;
+ ev.handle_event = job_notify_handler;
+ ev.name = "job_notify";
+ register_event_handler(&ev, EPOLL_CTL_ADD);
+
open_fail:
read_fail:
fclose(file);
return ret;
}
+int notify_job_request(void)
+{
+ int ret;
+ char byte = 0;
+
+ ret = write(job_notify_fd[1], &byte, sizeof(byte));
+ if (ret < 0)
+ pr_err("Failed to write: %m\n");
+
+ return 0;
+}
+
int do_fork(void)
{
int child;
if (stdoutfd && pipe(ofd)) {
pr_err("pipe() failed: %m\n");
- return -1;
+ goto close_ifd;
}
if (stderrfd && pipe(efd)) {
pr_err("pipe() failed: %m\n");
- return -1;
+ goto close_ofd;
}
pid = do_fork();
exit(1);
return 0;
+
+close_ofd:
+ if (stdoutfd) {
+ close(ofd[0]);
+ close(ofd[1]);
+ }
+close_ifd:
+ if (stdinfd) {
+ close(ifd[0]);
+ close(ifd[1]);
+ }
+
+ return -1;
}
/*
pid = run_piped(cmd, argv, i, o, e);
if (stdinf) {
- *stdinf = fdopen(ifd, "r");
+ *stdinf = fdopen(ifd, "w");
if (*stdinf == NULL) {
pr_err("Error opening file stream for fd %d: %m\n",
ifd);
}
if (stdoutf) {
- *stdoutf = fdopen(ofd, "w");
+ *stdoutf = fdopen(ofd, "r");
if (*stdoutf == NULL) {
pr_err("Error opening file stream for fd %d: %m\n",
ofd);
goto out_lock;
contended = 1;
- pr_info("Lock contention at %s:%d on lock %s acquired by %s at %s:%d\n",
+ pr_debug("Lock contention at %s:%d on lock %s acquired by %s at %s:%d\n",
file, line, lock->name,
lock->owner_name, lock->file, lock->line);
out_lock:
if (contended)
- pr_info("Lock %s acquired at %s:%d after contention\n",
+ pr_debug("Lock %s acquired at %s:%d after contention\n",
lock->name, file, line);
_mutex_lock_acquired(lock, file, line);
return ret;