]> git.itanic.dy.fi Git - rrdd/blob - process.c
process: Remove fork based concurrenct management
[rrdd] / process.c
1 #include <unistd.h>
2 #include <fcntl.h>
3 #include <sys/select.h>
4 #include <sys/epoll.h>
5 #include <stdio.h>
6 #include <sys/wait.h>
7 #include <sys/signalfd.h>
8 #include <sys/resource.h>
9
10 #include "process.h"
11 #include "debug.h"
12
13 static int epoll_fd;
14 static unsigned int max_jobs;
15 static unsigned int job_count;
16 static unsigned int jobs_pending;
17 static unsigned int max_jobs_pending;
18
19 struct work_struct {
20         const char *name;
21         int (*work_fn)(void *);
22         void *arg;
23         struct work_struct *next;
24 };
25
26 struct work_queue {
27         struct work_struct *work;
28         int length;
29         char *name;
30         struct mutex lock;
31 };
32
33 struct work_queue work_queues[WORK_PRIORITIES_NUM] = {
34         {
35                 .name = "high priority",
36                 .lock = {
37                         .name = "high_prio_queue",
38                         .lock = PTHREAD_MUTEX_INITIALIZER,
39                 },
40         },
41         {
42                 .name = "low priority",
43                 .lock = {
44                         .name = "low_prio_queue",
45                         .lock = PTHREAD_MUTEX_INITIALIZER,
46                 },
47         },
48 };
49
50 struct mutex work_pending_mutex = {
51         .name = "work_pending",
52         .lock = PTHREAD_MUTEX_INITIALIZER,
53 };
54 pthread_cond_t work_pending_cond = PTHREAD_COND_INITIALIZER;
55
56 static int run_work_on_queue(struct work_queue *queue)
57 {
58         struct work_struct *work;
59
60         mutex_lock(&queue->lock);
61
62         if (!queue->work) {
63                 mutex_unlock(&queue->lock);
64                 return 0;
65         }
66
67         /* Take next work */
68         work = queue->work;
69         queue->work = work->next;
70         queue->length--;
71
72         /*
73          * If queue is not empty, try waking up more workers. It is
74          * possible that when work were queued, the first worker did
75          * not wake up soon enough and
76          */
77         if (queue->length > 0)
78                 pthread_cond_signal(&work_pending_cond);
79
80         mutex_unlock(&queue->lock);
81
82         pr_info("Executing work %s from queue %s, %d still pending\n",
83                 work->name, queue->name, queue->length);
84
85         work->work_fn(work->arg);
86         pr_info("Work %s done\n", work->name);
87         free(work);
88
89         return 1;
90 }
91
92 static void *worker_thread(void *arg)
93 {
94         int ret;
95
96         char name[16];
97
98         snprintf(name, sizeof(name), "worker%ld", (long)arg);
99         pthread_setname_np(pthread_self(), name);
100
101         while (1) {
102                 while (1) {
103                         int prio, work_done = 0;
104
105                         /*
106                          * Execute as many works from the queues as
107                          * there are, starting from highest priority
108                          * queue
109                          */
110                         for (prio = 0; prio < WORK_PRIORITIES_NUM; prio++) {
111                                 work_done =
112                                         run_work_on_queue(&work_queues[prio]);
113                                 if (work_done)
114                                         break;
115                         }
116
117                         if (!work_done)
118                                 break;
119                 }
120
121                 pr_info("Worker going to sleep\n");
122                 ret = pthread_cond_wait(&work_pending_cond,
123                                         &work_pending_mutex.lock);
124                 if (ret < 0)
125                         pr_err("Error: %m\n");
126
127                 mutex_lock_acquired(&work_pending_mutex);
128
129                 mutex_unlock(&work_pending_mutex);
130
131         }
132
133         return NULL;
134 }
135
136 int queue_work(unsigned int priority, char *name,
137         int (work_fn)(void *arg), void *arg)
138 {
139         struct work_queue *queue;
140         struct work_struct *work, *last_work;
141
142         if (priority >= WORK_PRIORITIES_NUM) {
143                 pr_err("Invalid priority: %d\n", priority);
144                 return -EINVAL;
145         }
146
147         work = calloc(sizeof(*work), 1);
148
149         work->name = name;
150         work->work_fn = work_fn;
151         work->arg = arg;
152
153         queue = &work_queues[priority];
154
155         /* Insert new work at the end of the work queue */
156         mutex_lock(&queue->lock);
157
158         last_work = queue->work;
159         while (last_work && last_work->next)
160                 last_work = last_work->next;
161
162         if (!last_work)
163                 queue->work = work;
164         else
165                 last_work->next = work;
166
167         pr_info("Inserted work %s in queue %s, with %d pending items\n",
168                 work->name, queue->name, queue->length);
169         queue->length++;
170         mutex_unlock(&queue->lock);
171
172         pthread_cond_signal(&work_pending_cond);
173
174         return 0;
175 }
176
177 /*
178  * Initialize the jobcontrol.
179  *
180  * Create the pipes that are used to grant children execution
181  * permissions. If max_jobs is zero, count the number of CPUs from
182  * /proc/cpuinfo and use that.
183  */
184 int init_jobcontrol(int max_jobs_requested)
185 {
186         FILE *file;
187         int ret;
188         char buf[256];
189         char match[8];
190         pthread_t *thread;
191         long int i;
192
193         epoll_fd = epoll_create(1);
194         if (epoll_fd == -1) {
195                 pr_err("Failed to epoll_create(): %m\n");
196                 return -1;
197         }
198
199         if (max_jobs_requested > 0) {
200                 max_jobs = max_jobs_requested;
201                 goto no_count_cpus;
202         }
203         max_jobs++;
204
205         file = fopen("/proc/cpuinfo", "ro");
206         if (!file) {
207                 pr_err("Failed to open /proc/cpuinfo: %m\n");
208                 goto open_fail;
209         }
210
211         /*
212          * The CPU count algorithm simply reads the first 8 bytes from
213          * the /proc/cpuinfo and then expects that line to be there as
214          * many times as there are CPUs.
215          */
216         ret = fread(match, 1, sizeof(match), file);
217         if (ret < sizeof(match)) {
218                 pr_err("read %d bytes when expecting %zd %m\n",
219                         ret, sizeof(match));
220                 goto read_fail;
221         }
222
223         while(fgets(buf, sizeof(buf), file)) {
224                 if (!strncmp(buf, match, sizeof(match)))
225                         max_jobs++;
226         }
227
228 open_fail:
229 read_fail:
230         fclose(file);
231
232 no_count_cpus:
233         pr_info("Set maximum number of parallel jobs to %d\n", max_jobs);
234
235         max_jobs_pending = max_jobs * 10 + 25;
236         pr_info("Set maximum number of pending jobs to %d\n", max_jobs_pending);
237
238         /* Create worker threads */
239         thread = calloc(sizeof(*thread), max_jobs);
240         for (i = 0; i < max_jobs; i++)
241                 pthread_create(&thread[i], NULL, worker_thread, (void *)i);
242
243         return 0;
244 }
245
246 int poll_job_requests(int timeout)
247 {
248         struct epoll_event event;
249         struct event_handler *job_handler;
250         int ret;
251
252         /* Convert positive seconds to milliseconds */
253         timeout = timeout > 0 ? 1000 * timeout : timeout;
254
255         ret = epoll_wait(epoll_fd, &event, 1, timeout);
256
257         if (ret == -1) {
258                 if (errno != EINTR) {
259                         pr_err("epoll_wait: %m\n");
260                         return -1;
261                 }
262
263                 /*
264                  * If epoll_wait() was interrupted, better start
265                  * everything again from the beginning
266                  */
267                 return 0;
268         }
269
270         if (ret == 0) {
271                 pr_info("Timed out\n");
272                 goto out;
273         }
274
275         job_handler = event.data.ptr;
276
277         if (!job_handler || !job_handler->handle_event) {
278                 pr_err("Corrupted event handler for fd %d\n",
279                         event.data.fd);
280                 goto out;
281         }
282
283         pr_debug("Running handler %s to handle events from fd %d\n",
284                 job_handler->name, job_handler->fd);
285         job_handler->handle_event(job_handler);
286
287 out:
288         pr_info("Jobs active: %u, pending: %u\n", job_count, jobs_pending);
289         return ret;
290 }
291
292 int do_fork(void)
293 {
294         int child;
295         child = fork();
296         if (child < 0) {
297                 pr_err("fork() failed: %m\n");
298                 return -1;
299         }
300
301         if (child) {
302                 pr_debug("Fork child %d\n", child);
303                 return child;
304         }
305
306         return 0;
307 }
308
309 int clear_zombie(int pid)
310 {
311         int status;
312         struct rusage rusage;
313         char *status_str = NULL;
314         int code = 0;
315
316         if (pid)
317                 pr_debug("Waiting on pid %d\n", pid);
318
319         do {
320                 pid = wait4(pid, &status, 0, &rusage);
321                 if (pid < 0) {
322                         pr_err("Error on waitid(): %m\n");
323                         return 0;
324                 }
325                 /* Wait until the child has become a zombie */
326         } while (!WIFEXITED(status) && !WIFSIGNALED(status));
327
328         if (WIFEXITED(status)) {
329                 status_str = "exited with status";
330                 code = WEXITSTATUS(status);
331         } else if (WIFSIGNALED(status)) {
332                 status_str = "killed by signal";
333                 code = WTERMSIG(status);
334         }
335         pr_debug("pid %d: %s %d.\n", pid,
336                 status_str, code);
337         pr_debug("pid %d: User time: %ld.%03lds, System %ld.%03lds\n", pid,
338                 (long)rusage.ru_utime.tv_sec, rusage.ru_utime.tv_usec / 1000,
339                 (long)rusage.ru_stime.tv_sec, rusage.ru_stime.tv_usec / 1000);
340
341         return 1;
342 }
343
344 /*
345  * Runs a command cmd with params argv, connects stdin and stdout to
346  * readfd and writefd
347  *
348  * Returns the pid of the executed process
349  */
350 int run_piped(const char *cmd, char *const argv[],
351               int *stdinfd, int *stdoutfd, int *stderrfd)
352 {
353         int ifd[2], ofd[2], efd[2], pid;
354
355         pr_info("Running command %s\n", cmd);
356
357         if (stdinfd && pipe(ifd)) {
358                 pr_err("pipe() failed: %m\n");
359                 return -1;
360         }
361
362         if (stdoutfd && pipe(ofd)) {
363                 pr_err("pipe() failed: %m\n");
364                 return -1;
365         }
366
367         if (stderrfd && pipe(efd)) {
368                 pr_err("pipe() failed: %m\n");
369                 return -1;
370         }
371
372         pid = do_fork();
373         if (pid) { /* Parent side */
374                 if (stdinfd) {
375                         close(ifd[0]);
376                         *stdinfd = ifd[0];
377                 }
378
379                 if (stdoutfd) {
380                         close(ofd[1]);
381                         *stdoutfd = ofd[0];
382                 }
383
384                 if (stderrfd) {
385                         close(efd[1]);
386                         *stderrfd = efd[0];
387                 }
388
389                 return pid;
390         }
391
392         if (stdinfd) {
393                 close(ifd[1]);
394                 dup2(ifd[0], STDIN_FILENO);
395         }
396
397         if (stdoutfd) {
398                 close(ofd[0]);
399                 dup2(ofd[1], STDOUT_FILENO);
400         }
401
402         if (stderrfd) {
403                 close(efd[0]);
404                 dup2(efd[1], STDERR_FILENO);
405         }
406
407         /* Now we have redirected standard streams to parent process */
408         execvp(cmd, argv);
409         pr_err("Failed to execv command %s: %m\n", cmd);
410         exit(1);
411
412         return 0;
413 }
414
415 /*
416  * Runs a command cmd with params argv, connects stdin and stdout to
417  * readfd and writefd
418  *
419  * Returns the pid of the executed process
420  */
421 int run_piped_stream(const char *cmd, char *const argv[],
422                      FILE **stdinf, FILE **stdoutf, FILE **stderrf)
423 {
424         int ifd, ofd, efd, pid;
425         int *i, *o, *e;
426
427         if (stdinf)
428                 i = &ifd;
429         else
430                 i = 0;
431         if (stdoutf)
432                 o = &ofd;
433         else
434                 o = 0;
435         if (stderrf)
436                 e = &efd;
437         else
438                 e = 0;
439
440         pid = run_piped(cmd, argv, i, o, e);
441
442         if (stdinf) {
443                 *stdinf = fdopen(ifd, "r");
444                 if (*stdinf == NULL) {
445                         pr_err("Error opening file stream for fd %d: %m\n",
446                                ifd);
447                         return -1;
448                 }
449         }
450
451         if (stdoutf) {
452                 *stdoutf = fdopen(ofd, "r");
453                 if (*stdoutf == NULL) {
454                         pr_err("Error opening file stream for fd %d: %m\n",
455                                ofd);
456                         return -1;
457                 }
458         }
459
460         if (stderrf) {
461                 *stderrf = fdopen(efd, "r");
462                 if (*stderrf == NULL) {
463                         pr_err("Error opening file stream for fd %d: %m\n",
464                                efd);
465                         return -1;
466                 }
467         }
468
469         return pid;
470 }
471
472 /*
473  * Forks a child and executes a command to run on parallel
474  */
475
476 #define max(a,b) (a) < (b) ? (b) : (a)
477 #define BUF_SIZE (128*1024)
478 int run(const char *cmd, char *const argv[])
479 {
480         int child, error;
481         int ofd, efd;
482         fd_set rfds;
483         int maxfd;
484         int eof = 0;
485
486         child = run_piped(cmd, argv, NULL, &ofd, &efd);
487
488         FD_ZERO(&rfds);
489         FD_SET(ofd, &rfds);
490         FD_SET(efd, &rfds);
491
492         while (!eof) {
493                 char *sptr , *eptr;
494                 char rbuf[BUF_SIZE];
495                 int bytes;
496                 int is_stderr = 0;
497
498                 maxfd = max(ofd, efd);
499                 error = select(maxfd, &rfds, NULL, NULL, NULL);
500
501                 if (error < 0) {
502                         pr_err("Error with select: %m\n");
503                         break;
504                 }
505
506                 if (FD_ISSET(ofd, &rfds)) {
507                         bytes = read(ofd, rbuf, BUF_SIZE);
508                         goto print;
509                 }
510
511                 if (FD_ISSET(efd, &rfds)) {
512                         is_stderr = 1;
513                         bytes = read(efd, rbuf, BUF_SIZE);
514                         goto print;
515                 }
516
517                 pr_err("select() returned unknown fd\n");
518                 break;
519
520 print:
521                 if (bytes < 0) {
522                         pr_err("read() failed: %m\n");
523                         break;
524                 }
525
526                 /*
527                  * Workaround: When a process had die and it has only
528                  * written to stderr, select() doesn't indicate that
529                  * there might be something to read in stderr fd. To
530                  * work around this issue, we try to read stderr just
531                  * in case in order to ensure everything gets read.
532                  */
533                 if (bytes == 0) {
534                         bytes = read(efd, rbuf, BUF_SIZE);
535                         is_stderr = 1;
536                         eof = 1;
537                 }
538
539                 sptr = eptr = rbuf;
540                 while (bytes--) {
541                         if (*eptr == '\n') {
542                                 *eptr = 0;
543                                 if (is_stderr)
544                                         pr_err("%s: stderr: %s\n",
545                                                 cmd, sptr);
546                                 else
547                                         pr_info("%s: stdout: %s\n",
548                                                 cmd, sptr);
549                                 sptr = eptr + 1;
550                         }
551                         eptr++;
552                 }
553         }
554
555         close(ofd);
556         close(efd);
557
558         clear_zombie(child);
559
560         return 0;
561 }
562
563 int register_event_handler(struct event_handler *handler)
564 {
565         struct epoll_event ev;
566         int ret;
567
568         if (handler->fd <= 0) {
569                 pr_err("Invalid file descriptor of %d\n", handler->fd);
570                 return -1;
571         }
572
573         if (!handler->handle_event) {
574                 pr_err("Handler callback missing\n");
575                 return -1;
576         }
577
578         pr_info("Registering handler for %s, fd %d\n",
579                 handler->name, handler->fd);
580
581         ev.data.fd = handler->fd;
582         ev.data.ptr = handler;
583         ev.events = handler->events;
584         ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, handler->fd, &ev);
585         if (ret) {
586                 pr_err("Failed to add epoll_fd: %m\n");
587                 return -1;
588         }
589
590         return 0;
591 }
592
593 void _mutex_lock_acquired(struct mutex *lock, char *file, int line)
594 {
595         lock->line = line;
596         lock->file = file;
597 }
598
599 int _mutex_lock(struct mutex *lock, char *file, int line)
600 {
601         int ret = 0;
602
603         if (!pthread_mutex_trylock(&lock->lock))
604                 goto out_lock;
605
606         pr_info("Lock contention on lock %s on %s:%d\n",
607                 lock->name, lock->file, lock->line);
608
609         ret = pthread_mutex_lock(&lock->lock);
610         if (ret)
611                 pr_err("Acquirin lock %s failed: %m, acquired %s:%d\n",
612                         lock->name, lock->file, lock->line);
613
614 out_lock:
615         _mutex_lock_acquired(lock, file, line);
616         return ret;
617 }
618
619 int _mutex_unlock(struct mutex *lock)
620 {
621         lock->line = 0;
622         lock->file = NULL;
623         pthread_mutex_unlock(&lock->lock);
624
625         return 0;
626 }