]> git.itanic.dy.fi Git - rrdd/blob - process.c
plugins: Add version checking
[rrdd] / process.c
1 #define _GNU_SOURCE
2 #include <unistd.h>
3 #include <fcntl.h>
4 #include <sys/select.h>
5 #include <sys/epoll.h>
6 #include <stdio.h>
7 #include <sys/wait.h>
8 #include <sys/signalfd.h>
9 #include <sys/resource.h>
10
11 #include "process.h"
12 #include "debug.h"
13
14 static int child_count;
15 static int parent_count;
16 static int job_request_fd[2];
17 static int job_get_permission_fd[2];
18 static int epoll_fd;
19 static unsigned int max_jobs;
20 static unsigned int job_count;
21 static unsigned int jobs_pending;
22 static unsigned int max_jobs_pending;
23
24 int get_child_count(void)
25 {
26         return child_count;
27 }
28
29 int get_parent_count(void)
30 {
31         return parent_count;
32 }
33
34 static int handle_signals(struct event_handler *h)
35 {
36         struct signalfd_siginfo siginfo;
37         int ret;
38
39         ret = read(h->fd, &siginfo, sizeof(siginfo));
40         if (ret < sizeof(siginfo)) {
41                 pr_err("Expected %zd from read, got %d: %m\n",
42                         sizeof(siginfo), ret);
43                 return -1;
44         }
45
46         if (siginfo.ssi_signo != SIGCHLD) {
47                 pr_err("Unexpected signal %d, ignoring\n", siginfo.ssi_signo);
48                 return -1;
49         }
50
51         harvest_zombies(siginfo.ssi_pid);
52
53         return 0;
54 }
55
56 static int grant_new_job(void)
57 {
58         int ret;
59         char byte = 0;
60
61         job_count++;
62         pr_info("Granting new job. %d jobs currently and %d pending\n",
63                 job_count, jobs_pending);
64
65         ret = write(job_get_permission_fd[1], &byte, 1);
66         if (ret != 1) {
67                 pr_err("Failed to write 1 byte: %m\n");
68                 return -1;
69         }
70
71         return 0;
72 }
73
74 static int deny_job(void)
75 {
76         int ret;
77         char byte = -1;
78
79         pr_info("Denying new job. %d jobs currently and %d pending, "
80                 "limit of pending jobs is %d\n",
81                 job_count, jobs_pending, max_jobs_pending);
82
83         ret = write(job_get_permission_fd[1], &byte, 1);
84         if (ret != 1) {
85                 pr_err("Failed to write 1 byte: %m\n");
86                 return -1;
87         }
88
89         return 0;
90 }
91
92 static int handle_job_request(struct event_handler *h)
93 {
94         int ret, pid;
95
96         ret = read(job_request_fd[0], &pid, sizeof(pid));
97         if (ret < 0) {
98                 pr_err("Failed to read: %m\n");
99                 return -1;
100         }
101
102         if (ret == 0) {
103                 pr_info("Read zero bytes\n");
104                 return 0;
105         }
106
107         if (pid > 0) {
108                 if (job_count >= max_jobs) {
109                         if (jobs_pending < max_jobs_pending)
110                                 jobs_pending++;
111                         else
112                                 deny_job();
113                 } else {
114                         ret = grant_new_job();
115                         return 0;
116                 }
117         } else if (pid < 0) {
118                 if (job_count > max_jobs)
119                         pr_err("BUG: Job %u jobs exceeded limit %u\n",
120                                 job_count, max_jobs);
121
122                 pr_info("Job %d finished\n", -pid);
123                 job_count--;
124                 if (jobs_pending) {
125                         jobs_pending--;
126                         ret = grant_new_job();
127                         return 0;
128                 }
129         }
130
131         return 0;
132 }
133
134 struct event_handler signal_handler = {
135         .handle_event = handle_signals,
136         .events = EPOLLIN,
137         .name = "signal",
138 };
139
140 struct event_handler job_request_handler = {
141         .handle_event = handle_job_request,
142         .events = EPOLLIN,
143         .name = "job_request",
144 };
145
146 /*
147  * Initialize the jobcontrol.
148  *
149  * Create the pipes that are used to grant children execution
150  * permissions. If max_jobs is zero, count the number of CPUs from
151  * /proc/cpuinfo and use that.
152  */
153 int init_jobcontrol(int max_jobs_requested)
154 {
155         FILE *file;
156         int ret;
157         sigset_t sigmask;
158         char buf[256];
159         char match[8];
160
161         if (pipe2(job_request_fd, O_NONBLOCK | O_CLOEXEC)) {
162                 pr_err("Failed to create pipe: %m\n");
163                 return -1;
164         }
165
166         if (pipe2(job_get_permission_fd, O_CLOEXEC)) {
167                 pr_err("Failed to create pipe: %m\n");
168                 return -1;
169         }
170
171         epoll_fd = epoll_create(1);
172         if (epoll_fd == -1) {
173                 pr_err("Failed to epoll_create(): %m\n");
174                 return -1;
175         }
176
177         job_request_handler.fd = job_request_fd[0];
178         register_event_handler(&job_request_handler);
179
180         sigemptyset(&sigmask);
181         sigaddset(&sigmask, SIGCHLD);
182
183         /* Block SIGCHLD so that it becomes readable via signalfd */
184         ret = sigprocmask(SIG_BLOCK, &sigmask, NULL);
185         if (ret < 0) {
186                 pr_err("Failed to sigprocmask: %m\n");
187         }
188
189         signal_handler.fd = signalfd(-1, &sigmask, SFD_CLOEXEC);
190         if (job_request_handler.fd < 0) {
191                 pr_err("Failed to create signal_fd: %m\n");
192                 return -1;
193         }
194
195         register_event_handler(&signal_handler);
196
197         if (max_jobs_requested > 0) {
198                 max_jobs = max_jobs_requested;
199                 goto no_count_cpus;
200         }
201         max_jobs++;
202
203         file = fopen("/proc/cpuinfo", "ro");
204         if (!file) {
205                 pr_err("Failed to open /proc/cpuinfo: %m\n");
206                 goto open_fail;
207         }
208
209         /*
210          * The CPU count algorithm simply reads the first 8 bytes from
211          * the /proc/cpuinfo and then expects that line to be there as
212          * many times as there are CPUs.
213          */
214         ret = fread(match, 1, sizeof(match), file);
215         if (ret < sizeof(match)) {
216                 pr_err("read %d bytes when expecting %zd %m\n",
217                         ret, sizeof(match));
218                 goto read_fail;
219         }
220
221         while(fgets(buf, sizeof(buf), file)) {
222                 if (!strncmp(buf, match, sizeof(match)))
223                         max_jobs++;
224         }
225
226 open_fail:
227 read_fail:
228         fclose(file);
229
230 no_count_cpus:
231         pr_info("Set maximum number of parallel jobs to %d\n", max_jobs);
232
233         max_jobs_pending = max_jobs * 50 + 25;
234         pr_info("Set maximum number of pending jobs to %d\n", max_jobs_pending);
235
236         return 0;
237 }
238
239 int poll_job_requests(int timeout)
240 {
241         struct epoll_event event;
242         struct event_handler *job_handler;
243         int ret;
244
245         /* Convert positive seconds to milliseconds */
246         timeout = timeout > 0 ? 1000 * timeout : timeout;
247
248         ret = epoll_wait(epoll_fd, &event, 1, timeout);
249
250         if (ret == -1) {
251                 if (errno != EINTR) {
252                         pr_err("epoll_wait: %m\n");
253                         return -1;
254                 }
255
256                 /*
257                  * If epoll_wait() was interrupted, better start
258                  * everything again from the beginning
259                  */
260                 return 0;
261         }
262
263         if (ret == 0) {
264                 pr_info("Timed out\n");
265                 goto out;
266         }
267
268         job_handler = event.data.ptr;
269
270         if (!job_handler || !job_handler->handle_event) {
271                 pr_err("Corrupted event handler for fd %d\n",
272                         event.data.fd);
273                 goto out;
274         }
275
276         pr_info("Running handler %s to handle events from fd %d\n",
277                 job_handler->name, job_handler->fd);
278         job_handler->handle_event(job_handler);
279
280 out:
281         pr_info("Jobs active: %u, pending: %u\n", job_count, jobs_pending);
282         return ret;
283 }
284
285 /*
286  * Per process flag indicating whether this child has requested fork
287  * limiting. If it has, it must also tell the master parent when it
288  * has died so that the parent can give next pending job permission to
289  * go.
290  */
291 static int is_limited_fork;
292
293 int do_fork(void)
294 {
295         int child;
296         child = fork();
297         if (child < 0) {
298                 pr_err("fork() failed: %m\n");
299                 return -1;
300         }
301
302         if (child) {
303                 child_count++;
304                 pr_info("Fork %d, child %d\n", child_count, child);
305                 return child;
306         }
307
308         /*
309          * Also do not notify the master parent the death of this
310          * child. Only childs that have been created with
311          * do_fork_limited() can have this flag set.
312          */
313         is_limited_fork = 0;
314
315         /*
316          * Close unused ends of the job control pipes. Only the parent
317          * which controls the jobs may have the write end open of the
318          * job_get_permission_fd and the read end of the
319          * job_request_fd. Failing to close the pipe ends properly
320          * will cause the childs to wait forever for the run
321          * permission in case parent dies prematurely.
322          *
323          * Note! The file descriptor must be closed once and only
324          * once. They are marked to -1 to make it impossible for
325          * subseqent do_fork() calls from closing them again (in which
326          * case some other file descriptor might already be reserved
327          * for the same number) and prevent accidentally closing some
328          * innocent file descriptors that are still in use.
329          */
330         if (job_get_permission_fd[1] >= 0) {
331                 close(job_get_permission_fd[1]);
332                 job_get_permission_fd[1] = -1;
333         }
334         if (job_request_fd[0] >= 0) {
335                 close(job_request_fd[0]);
336                 job_request_fd[0] = -1;
337         }
338
339         /* reset child's child count */
340         child_count = 0;
341         parent_count++;
342         return 0;
343 }
344
345 static int request_fork(int request)
346 {
347         int pid = getpid();
348
349         pid = request > 0 ? pid : -pid;
350
351         return write(job_request_fd[1], &pid, sizeof(pid));
352 }
353
354 static void limited_fork_exit_handler(void)
355 {
356         if (is_limited_fork)
357                 request_fork(-1);
358 }
359
360 /*
361  * Like do_fork(), but allow the child continue only after the global
362  * job count is low enough.
363  *
364  * We allow the parent to continue other more important activities but
365  * child respects the limit of global active processes.
366  */
367 int do_fork_limited(void)
368 {
369         int child, ret;
370         char byte;
371
372         child = do_fork();
373         if (child)
374                 return child;
375
376         /* Remember to notify the parent when we are done */
377         atexit(limited_fork_exit_handler);
378         is_limited_fork = 1;
379
380         pr_info("Requesting permission to go\n");
381
382         /* Signal the parent that we are here, waiting to go */
383         request_fork(1);
384
385         /*
386          * The parent will tell us when we can continue. If there were
387          * multiple children waiting for their turn to run only one
388          * will be reading the content byte from the pipe and getting
389          * the permission to run.
390          */
391         ret = read(job_get_permission_fd[0], &byte, sizeof(byte));
392         if (ret == 0)
393                 pr_err("Error requesting run, did the parent die?\n");
394
395         if (ret < 0)
396                 pr_err("Job control request failure: %m\n");
397
398         if (byte < 0) {
399                 pr_info("Did not get permission to execute. Terminating\n");
400
401                 /*
402                  * Avoid running exit handler, that would tell the
403                  * parent we died normally and decrement the job
404                  * counters.
405                  */
406                 raise(SIGKILL);
407         }
408
409         pr_info("Continuing\n");
410         return child;
411 }
412
413 int harvest_zombies(int pid)
414 {
415         int status;
416         struct rusage rusage;
417         char *status_str = NULL;
418         int code = 0;
419
420         if (child_count == 0)
421                 return 0;
422
423         if (pid)
424                 pr_info("Waiting on pid %d, children left: %d\n", pid, 
425                         child_count);
426
427         do {
428                 pid = wait4(pid, &status, 0, &rusage);
429                 if (pid < 0) {
430                         pr_err("Error on waitid(): %m\n");
431                         return 0;
432                 }
433                 /* Wait until the child has become a zombie */
434         } while (!WIFEXITED(status) && !WIFSIGNALED(status));
435
436         child_count--;
437         if (WIFEXITED(status)) {
438                 status_str = "exited with status";
439                 code = WEXITSTATUS(status);
440         } else if (WIFSIGNALED(status)) {
441                 status_str = "killed by signal";
442                 code = WTERMSIG(status);
443         }
444         pr_info("pid %d: %s %d. Children left: %d\n", pid,
445                 status_str, code, child_count);
446         pr_info("pid %d: User time: %ld.%03lds, System %ld.%03lds\n", pid,
447                 (long)rusage.ru_utime.tv_sec, rusage.ru_utime.tv_usec / 1000,
448                 (long)rusage.ru_stime.tv_sec, rusage.ru_stime.tv_usec / 1000);
449
450         return 1;
451 }
452
453 /*
454  * Runs a command cmd with params argv, connects stdin and stdout to
455  * readfd and writefd
456  *
457  * Returns the pid of the executed process
458  */
459 int run_piped(const char *cmd, char *const argv[],
460               int *stdinfd, int *stdoutfd, int *stderrfd)
461 {
462         int ifd[2], ofd[2], efd[2], pid;
463
464         pr_info("Running command %s\n", cmd);
465
466         if (stdinfd && pipe(ifd)) {
467                 pr_err("pipe() failed: %m\n");
468                 return -1;
469         }
470
471         if (stdoutfd && pipe(ofd)) {
472                 pr_err("pipe() failed: %m\n");
473                 return -1;
474         }
475
476         if (stderrfd && pipe(efd)) {
477                 pr_err("pipe() failed: %m\n");
478                 return -1;
479         }
480
481         pid = do_fork();
482         if (pid) { /* Parent side */
483                 if (stdinfd) {
484                         close(ifd[0]);
485                         *stdinfd = ifd[0];
486                 }
487
488                 if (stdoutfd) {
489                         close(ofd[1]);
490                         *stdoutfd = ofd[0];
491                 }
492
493                 if (stderrfd) {
494                         close(efd[1]);
495                         *stderrfd = efd[0];
496                 }
497
498                 return pid;
499         }
500
501         if (stdinfd) {
502                 close(ifd[1]);
503                 dup2(ifd[0], STDIN_FILENO);
504         }
505
506         if (stdoutfd) {
507                 close(ofd[0]);
508                 dup2(ofd[1], STDOUT_FILENO);
509         }
510
511         if (stderrfd) {
512                 close(efd[0]);
513                 dup2(efd[1], STDERR_FILENO);
514         }
515
516         /* Now we have redirected standard streams to parent process */
517         execvp(cmd, argv);
518         pr_err("Failed to execv command %s: %m\n", cmd);
519         exit(1);
520
521         return 0;
522 }
523
524 /*
525  * Runs a command cmd with params argv, connects stdin and stdout to
526  * readfd and writefd
527  *
528  * Returns the pid of the executed process
529  */
530 int run_piped_stream(const char *cmd, char *const argv[],
531                      FILE **stdinf, FILE **stdoutf, FILE **stderrf)
532 {
533         int ifd, ofd, efd, pid;
534         int *i, *o, *e;
535
536         if (stdinf)
537                 i = &ifd;
538         else 
539                 i = 0;
540         if (stdoutf)
541                 o = &ofd;
542         else 
543                 o = 0;
544         if (stderrf)
545                 e = &efd;
546         else 
547                 e = 0;
548
549         pid = run_piped(cmd, argv, i, o, e);
550
551         if (stdinf) {
552                 *stdinf = fdopen(ifd, "r");
553                 if (*stdinf == NULL) {
554                         pr_err("Error opening file stream for fd %d: %m\n",
555                                ifd);
556                         return -1;
557                 }
558         }
559
560         if (stdoutf) {
561                 *stdoutf = fdopen(ofd, "r");
562                 if (*stdoutf == NULL) {
563                         pr_err("Error opening file stream for fd %d: %m\n",
564                                ofd);
565                         return -1;
566                 }
567         }
568
569         if (stderrf) {
570                 *stderrf = fdopen(efd, "r");
571                 if (*stderrf == NULL) {
572                         pr_err("Error opening file stream for fd %d: %m\n",
573                                efd);
574                         return -1;
575                 }
576         }
577
578         return pid;
579 }
580
581 /*
582  * Forks a child and executes a command to run on parallel
583  */
584
585 #define max(a,b) (a) < (b) ? (b) : (a)
586 #define BUF_SIZE (128*1024)
587 int run(const char *cmd, char *const argv[])
588 {
589         int child, error;
590         int ofd, efd;
591         fd_set rfds;
592         int maxfd;
593         int eof = 0;
594
595         if ((child = do_fork()))
596             return child;
597
598         child = run_piped(cmd, argv, NULL, &ofd, &efd);
599
600         FD_ZERO(&rfds);
601         FD_SET(ofd, &rfds);
602         FD_SET(efd, &rfds);
603
604         while (!eof) {
605                 char *sptr , *eptr;
606                 char rbuf[BUF_SIZE];
607                 int bytes;
608                 int is_stderr = 0;
609
610                 maxfd = max(ofd, efd);
611                 error = select(maxfd, &rfds, NULL, NULL, NULL);
612
613                 if (error < 0) {
614                         pr_err("Error with select: %m\n");
615                         break;
616                 }
617
618                 if (FD_ISSET(ofd, &rfds)) {
619                         bytes = read(ofd, rbuf, BUF_SIZE);
620                         goto print;
621                 }
622
623                 if (FD_ISSET(efd, &rfds)) {
624                         is_stderr = 1;
625                         bytes = read(efd, rbuf, BUF_SIZE);
626                         goto print;
627                 }
628
629                 pr_err("select() returned unknown fd\n");
630                 break;
631
632 print:
633                 if (bytes < 0) {
634                         pr_err("read() failed: %m\n");
635                         break;
636                 }
637
638                 /*
639                  * Workaround: When a process had die and it has only
640                  * written to stderr, select() doesn't indicate that
641                  * there might be something to read in stderr fd. To
642                  * work around this issue, we try to read stderr just
643                  * in case in order to ensure everything gets read.
644                  */
645                 if (bytes == 0) {
646                         bytes = read(efd, rbuf, BUF_SIZE);
647                         is_stderr = 1;
648                         eof = 1;
649                 }
650
651                 sptr = eptr = rbuf;
652                 while(bytes--) {
653                         if (*eptr == '\n') {
654                                 *eptr = 0;
655                                 if (is_stderr)
656                                         pr_err("%s: stderr: %s\n",
657                                                 cmd, sptr);
658                                 else
659                                         pr_info("%s: stdout: %s\n",
660                                                 cmd, sptr);
661                                 sptr = eptr;
662                         }
663                         eptr++;
664                 }
665         }
666
667         close(ofd);
668         close(efd);
669
670         harvest_zombies(child); 
671
672         exit(1);
673         return 0;
674 }
675
676 int register_event_handler(struct event_handler *handler)
677 {
678         struct epoll_event ev;
679         int ret;
680
681         if (handler->fd <= 0) {
682                 pr_err("Invalid file descriptor of %d\n", handler->fd);
683                 return -1;
684         }
685
686         if (!handler->handle_event) {
687                 pr_err("Handler callback missing\n");
688                 return -1;
689         }
690
691         pr_info("Registering handler for %s, fd %d\n",
692                 handler->name, handler->fd);
693
694         ev.data.fd = handler->fd;
695         ev.data.ptr = handler;
696         ev.events = handler->events;
697         ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, handler->fd, &ev);
698         if (ret) {
699                 pr_err("Failed to add epoll_fd: %m\n");
700                 return -1;
701         }
702
703         return 0;
704 }