]> git.itanic.dy.fi Git - rrdd/blob - process.c
config: Fix copy paste errors in error messages
[rrdd] / process.c
1 #define _GNU_SOURCE
2 #include <unistd.h>
3 #include <fcntl.h>
4 #include <sys/select.h>
5 #include <sys/epoll.h>
6 #include <stdio.h>
7 #include <sys/wait.h>
8 #include <sys/signalfd.h>
9 #include <sys/resource.h>
10
11 #include "process.h"
12 #include "debug.h"
13
14 static int child_count;
15 static int parent_count;
16 static int job_request_fd[2];
17 static int job_get_permission_fd[2];
18 static int epoll_fd;
19 static unsigned int max_jobs;
20 static unsigned int job_count;
21 static unsigned int jobs_pending;
22
23 int get_child_count(void)
24 {
25         return child_count;
26 }
27
28 int get_parent_count(void)
29 {
30         return parent_count;
31 }
32
33 static int handle_signals(struct event_handler *h)
34 {
35         struct signalfd_siginfo siginfo;
36         int ret;
37
38         ret = read(h->fd, &siginfo, sizeof(siginfo));
39         if (ret < sizeof(siginfo)) {
40                 pr_err("Expected %zd from read, got %d: %m\n",
41                         sizeof(siginfo), ret);
42                 return -1;
43         }
44
45         if (siginfo.ssi_signo != SIGCHLD) {
46                 pr_err("Unexpected signal %d, ignoring\n", siginfo.ssi_signo);
47                 return -1;
48         }
49
50         harvest_zombies(siginfo.ssi_pid);
51
52         return 0;
53 }
54
55 static int grant_new_job(void)
56 {
57         int ret;
58         char byte = 0;
59
60         job_count++;
61         pr_info("Granting new job. %d jobs currently and %d pending\n",
62                 job_count, jobs_pending);
63
64         ret = write(job_get_permission_fd[1], &byte, 1);
65         if (ret != 1) {
66                 pr_err("Failed to write 1 byte: %m\n");
67                 return -1;
68         }
69
70         return 0;
71 }
72
73 static int handle_job_request(struct event_handler *h)
74 {
75         int ret, pid;
76
77         ret = read(job_request_fd[0], &pid, sizeof(pid));
78         if (ret < 0) {
79                 pr_err("Failed to read: %m\n");
80                 return -1;
81         }
82
83         if (ret == 0) {
84                 pr_info("Read zero bytes\n");
85                 return 0;
86         }
87
88         if (pid > 0) {
89                 if (job_count >= max_jobs) {
90                         jobs_pending++;
91                 } else {
92                         ret = grant_new_job();
93                         return 0;
94                 }
95         } else if (pid < 0) {
96                 if (job_count > max_jobs)
97                         pr_err("BUG: Job %u jobs exceeded limit %u\n",
98                                 job_count, max_jobs);
99
100                 pr_info("Job %d finished\n", -pid);
101                 job_count--;
102                 if (jobs_pending) {
103                         jobs_pending--;
104                         ret = grant_new_job();
105                         return 0;
106                 }
107         }
108
109         return 0;
110 }
111
112 struct event_handler signal_handler = {
113         .handle_event = handle_signals,
114         .events = EPOLLIN,
115         .name = "signal",
116 };
117
118 struct event_handler job_request_handler = {
119         .handle_event = handle_job_request,
120         .events = EPOLLIN,
121         .name = "job_request",
122 };
123
124 /*
125  * Initialize the jobcontrol.
126  *
127  * Create the pipes that are used to grant children execution
128  * permissions. If max_jobs is zero, count the number of CPUs from
129  * /proc/cpuinfo and use that.
130  */
131 int init_jobcontrol(int max_jobs_requested)
132 {
133         FILE *file;
134         int ret;
135         sigset_t sigmask;
136         char buf[256];
137         char match[8];
138
139         if (pipe2(job_request_fd, O_NONBLOCK | O_CLOEXEC)) {
140                 pr_err("Failed to create pipe: %m\n");
141                 return -1;
142         }
143
144         if (pipe2(job_get_permission_fd, O_CLOEXEC)) {
145                 pr_err("Failed to create pipe: %m\n");
146                 return -1;
147         }
148
149         epoll_fd = epoll_create(1);
150         if (epoll_fd == -1) {
151                 pr_err("Failed to epoll_create(): %m\n");
152                 return -1;
153         }
154
155         job_request_handler.fd = job_request_fd[0];
156         register_event_handler(&job_request_handler);
157
158         sigemptyset(&sigmask);
159         sigaddset(&sigmask, SIGCHLD);
160
161         /* Block SIGCHLD so that it becomes readable via signalfd */
162         ret = sigprocmask(SIG_BLOCK, &sigmask, NULL);
163         if (ret < 0) {
164                 pr_err("Failed to sigprocmask: %m\n");
165         }
166
167         signal_handler.fd = signalfd(-1, &sigmask, SFD_CLOEXEC);
168         if (job_request_handler.fd < 0) {
169                 pr_err("Failed to create signal_fd: %m\n");
170                 return -1;
171         }
172
173         register_event_handler(&signal_handler);
174
175         if (max_jobs_requested > 0) {
176                 max_jobs = max_jobs_requested;
177                 goto no_count_cpus;
178         }
179         max_jobs++;
180
181         file = fopen("/proc/cpuinfo", "ro");
182         if (!file) {
183                 pr_err("Failed to open /proc/cpuinfo: %m\n");
184                 goto open_fail;
185         }
186
187         /*
188          * The CPU count algorithm simply reads the first 8 bytes from
189          * the /proc/cpuinfo and then expects that line to be there as
190          * many times as there are CPUs.
191          */
192         ret = fread(match, 1, sizeof(match), file);
193         if (ret < sizeof(match)) {
194                 pr_err("read %d bytes when expecting %zd %m\n",
195                         ret, sizeof(match));
196                 goto read_fail;
197         }
198
199         while(fgets(buf, sizeof(buf), file)) {
200                 if (!strncmp(buf, match, sizeof(match)))
201                         max_jobs++;
202         }
203
204 open_fail:
205 read_fail:
206         fclose(file);
207
208 no_count_cpus:
209         pr_info("Set maximum number of parallel jobs to %d\n", max_jobs);
210
211         return 0;
212 }
213
214 int poll_job_requests(int timeout)
215 {
216         struct epoll_event event;
217         struct event_handler *job_handler;
218         int ret;
219
220         /* Convert positive seconds to milliseconds */
221         timeout = timeout > 0 ? 1000 * timeout : timeout;
222
223         ret = epoll_wait(epoll_fd, &event, 1, timeout);
224
225         if (ret == -1) {
226                 if (errno != EINTR) {
227                         pr_err("epoll_wait: %m\n");
228                         return -1;
229                 }
230
231                 /*
232                  * If epoll_wait() was interrupted, better start
233                  * everything again from the beginning
234                  */
235                 return 0;
236         }
237
238         if (ret == 0) {
239                 pr_info("Timed out\n");
240                 goto out;
241         }
242
243         job_handler = event.data.ptr;
244
245         if (!job_handler || !job_handler->handle_event) {
246                 pr_err("Corrupted event handler for fd %d\n",
247                         event.data.fd);
248                 goto out;
249         }
250
251         pr_info("Running handler %s to handle events from fd %d\n",
252                 job_handler->name, job_handler->fd);
253         job_handler->handle_event(job_handler);
254
255 out:
256         pr_info("Jobs active: %u, pending: %u\n", job_count, jobs_pending);
257         return ret;
258 }
259
260 /*
261  * Per process flag indicating whether this child has requested fork
262  * limiting. If it has, it must also tell the master parent when it
263  * has died so that the parent can give next pending job permission to
264  * go.
265  */
266 static int is_limited_fork;
267
268 int do_fork(void)
269 {
270         int child;
271         child = fork();
272         if (child < 0) {
273                 pr_err("fork() failed: %m\n");
274                 return -1;
275         }
276
277         if (child) {
278                 child_count++;
279                 pr_info("Fork %d, child %d\n", child_count, child);
280                 return child;
281         }
282
283         /*
284          * Also do not notify the master parent the death of this
285          * child. Only childs that have been created with
286          * do_fork_limited() can have this flag set.
287          */
288         is_limited_fork = 0;
289
290         /*
291          * Close unused ends of the job control pipes. Only the parent
292          * which controls the jobs may have the write end open of the
293          * job_get_permission_fd and the read end of the
294          * job_request_fd. Failing to close the pipe ends properly
295          * will cause the childs to wait forever for the run
296          * permission in case parent dies prematurely.
297          *
298          * Note! The file descriptor must be closed once and only
299          * once. They are marked to -1 to make it impossible for
300          * subseqent do_fork() calls from closing them again (in which
301          * case some other file descriptor might already be reserved
302          * for the same number) and prevent accidentally closing some
303          * innocent file descriptors that are still in use.
304          */
305         if (job_get_permission_fd[1] >= 0) {
306                 close(job_get_permission_fd[1]);
307                 job_get_permission_fd[1] = -1;
308         }
309         if (job_request_fd[0] >= 0) {
310                 close(job_request_fd[0]);
311                 job_request_fd[0] = -1;
312         }
313
314         /* reset child's child count */
315         child_count = 0;
316         parent_count++;
317         return 0;
318 }
319
320 static int request_fork(int request)
321 {
322         int pid = getpid();
323
324         pid = request > 0 ? pid : -pid;
325
326         return write(job_request_fd[1], &pid, sizeof(pid));
327 }
328
329 static void limited_fork_exit_handler(void)
330 {
331         if (is_limited_fork)
332                 request_fork(-1);
333 }
334
335 /*
336  * Like do_fork(), but allow the child continue only after the global
337  * job count is low enough.
338  *
339  * We allow the parent to continue other more important activities but
340  * child respects the limit of global active processes.
341  */
342 int do_fork_limited(void)
343 {
344         int child, ret;
345         char byte;
346
347         child = do_fork();
348         if (child)
349                 return child;
350
351         /* Remember to notify the parent when we are done */
352         atexit(limited_fork_exit_handler);
353         is_limited_fork = 1;
354
355         pr_info("Requesting permission to go\n");
356
357         /* Signal the parent that we are here, waiting to go */
358         request_fork(1);
359
360         /*
361          * The parent will tell us when we can continue. If there were
362          * multiple children waiting for their turn to run only one
363          * will be reading the content byte from the pipe and getting
364          * the permission to run.
365          */
366         ret = read(job_get_permission_fd[0], &byte, sizeof(byte));
367         if (ret == 0)
368                 pr_err("Error requesting run, did the parent die?\n");
369
370         if (ret < 0)
371                 pr_err("Job control request failure: %m\n");
372
373         pr_info("Continuing\n");
374         return child;
375 }
376
377 int harvest_zombies(int pid)
378 {
379         int status;
380         struct rusage rusage;
381         char *status_str = NULL;
382         int code = 0;
383
384         if (child_count == 0)
385                 return 0;
386
387         if (pid)
388                 pr_info("Waiting on pid %d, children left: %d\n", pid, 
389                         child_count);
390
391         do {
392                 pid = wait4(pid, &status, 0, &rusage);
393                 if (pid < 0) {
394                         pr_err("Error on waitid(): %m\n");
395                         return 0;
396                 }
397                 /* Wait until the child has become a zombie */
398         } while (!WIFEXITED(status) && !WIFSIGNALED(status));
399
400         child_count--;
401         if (WIFEXITED(status)) {
402                 status_str = "exited with status";
403                 code = WEXITSTATUS(status);
404         } else if (WIFSIGNALED(status)) {
405                 status_str = "killed by signal";
406                 code = WTERMSIG(status);
407         }
408         pr_info("pid %d: %s %d. Children left: %d\n", pid,
409                 status_str, code, child_count);
410         pr_info("pid %d: User time: %ld.%03lds, System %ld.%03lds\n", pid,
411                 (long)rusage.ru_utime.tv_sec, rusage.ru_utime.tv_usec / 1000,
412                 (long)rusage.ru_stime.tv_sec, rusage.ru_stime.tv_usec / 1000);
413
414         return 1;
415 }
416
417 /*
418  * Runs a command cmd with params argv, connects stdin and stdout to
419  * readfd and writefd
420  *
421  * Returns the pid of the executed process
422  */
423 int run_piped(const char *cmd, char *const argv[],
424               int *stdinfd, int *stdoutfd, int *stderrfd)
425 {
426         int ifd[2], ofd[2], efd[2], pid;
427
428         pr_info("Running command %s\n", cmd);
429
430         if (stdinfd && pipe(ifd)) {
431                 pr_err("pipe() failed: %m\n");
432                 return -1;
433         }
434
435         if (stdoutfd && pipe(ofd)) {
436                 pr_err("pipe() failed: %m\n");
437                 return -1;
438         }
439
440         if (stderrfd && pipe(efd)) {
441                 pr_err("pipe() failed: %m\n");
442                 return -1;
443         }
444
445         pid = do_fork();
446         if (pid) { /* Parent side */
447                 if (stdinfd) {
448                         close(ifd[0]);
449                         *stdinfd = ifd[0];
450                 }
451
452                 if (stdoutfd) {
453                         close(ofd[1]);
454                         *stdoutfd = ofd[0];
455                 }
456
457                 if (stderrfd) {
458                         close(efd[1]);
459                         *stderrfd = efd[0];
460                 }
461
462                 return pid;
463         }
464
465         if (stdinfd) {
466                 close(ifd[1]);
467                 dup2(ifd[0], STDIN_FILENO);
468         }
469
470         if (stdoutfd) {
471                 close(ofd[0]);
472                 dup2(ofd[1], STDOUT_FILENO);
473         }
474
475         if (stderrfd) {
476                 close(efd[0]);
477                 dup2(efd[1], STDERR_FILENO);
478         }
479
480         /* Now we have redirected standard streams to parent process */
481         execvp(cmd, argv);
482         pr_err("Failed to execv command %s: %m\n", cmd);
483         exit(1);
484
485         return 0;
486 }
487
488 /*
489  * Runs a command cmd with params argv, connects stdin and stdout to
490  * readfd and writefd
491  *
492  * Returns the pid of the executed process
493  */
494 int run_piped_stream(const char *cmd, char *const argv[],
495                      FILE **stdinf, FILE **stdoutf, FILE **stderrf)
496 {
497         int ifd, ofd, efd, pid;
498         int *i, *o, *e;
499
500         if (stdinf)
501                 i = &ifd;
502         else 
503                 i = 0;
504         if (stdoutf)
505                 o = &ofd;
506         else 
507                 o = 0;
508         if (stderrf)
509                 e = &efd;
510         else 
511                 e = 0;
512
513         pid = run_piped(cmd, argv, i, o, e);
514
515         if (stdinf) {
516                 *stdinf = fdopen(ifd, "r");
517                 if (*stdinf == NULL) {
518                         pr_err("Error opening file stream for fd %d: %m\n",
519                                ifd);
520                         return -1;
521                 }
522         }
523
524         if (stdoutf) {
525                 *stdoutf = fdopen(ofd, "r");
526                 if (*stdoutf == NULL) {
527                         pr_err("Error opening file stream for fd %d: %m\n",
528                                ofd);
529                         return -1;
530                 }
531         }
532
533         if (stderrf) {
534                 *stderrf = fdopen(efd, "r");
535                 if (*stderrf == NULL) {
536                         pr_err("Error opening file stream for fd %d: %m\n",
537                                efd);
538                         return -1;
539                 }
540         }
541
542         return pid;
543 }
544
545 /*
546  * Forks a child and executes a command to run on parallel
547  */
548
549 #define max(a,b) (a) < (b) ? (b) : (a)
550 #define BUF_SIZE (128*1024)
551 int run(const char *cmd, char *const argv[])
552 {
553         int child, error;
554         int ofd, efd;
555         fd_set rfds;
556         int maxfd;
557         int eof = 0;
558
559         if ((child = do_fork()))
560             return child;
561
562         child = run_piped(cmd, argv, NULL, &ofd, &efd);
563
564         FD_ZERO(&rfds);
565         FD_SET(ofd, &rfds);
566         FD_SET(efd, &rfds);
567
568         while (!eof) {
569                 char *sptr , *eptr;
570                 char rbuf[BUF_SIZE];
571                 int bytes;
572                 int is_stderr = 0;
573
574                 maxfd = max(ofd, efd);
575                 error = select(maxfd, &rfds, NULL, NULL, NULL);
576
577                 if (error < 0) {
578                         pr_err("Error with select: %m\n");
579                         break;
580                 }
581
582                 if (FD_ISSET(ofd, &rfds)) {
583                         bytes = read(ofd, rbuf, BUF_SIZE);
584                         goto print;
585                 }
586
587                 if (FD_ISSET(efd, &rfds)) {
588                         is_stderr = 1;
589                         bytes = read(efd, rbuf, BUF_SIZE);
590                         goto print;
591                 }
592
593                 pr_err("select() returned unknown fd\n");
594                 break;
595
596 print:
597                 if (bytes < 0) {
598                         pr_err("read() failed: %m\n");
599                         break;
600                 }
601
602                 /*
603                  * Workaround: When a process had die and it has only
604                  * written to stderr, select() doesn't indicate that
605                  * there might be something to read in stderr fd. To
606                  * work around this issue, we try to read stderr just
607                  * in case in order to ensure everything gets read.
608                  */
609                 if (bytes == 0) {
610                         bytes = read(efd, rbuf, BUF_SIZE);
611                         is_stderr = 1;
612                         eof = 1;
613                 }
614
615                 sptr = eptr = rbuf;
616                 while(bytes--) {
617                         if (*eptr == '\n') {
618                                 *eptr = 0;
619                                 if (is_stderr)
620                                         pr_err("%s: stderr: %s\n",
621                                                 cmd, sptr);
622                                 else
623                                         pr_info("%s: stdout: %s\n",
624                                                 cmd, sptr);
625                                 sptr = eptr;
626                         }
627                         eptr++;
628                 }
629         }
630
631         close(ofd);
632         close(efd);
633
634         harvest_zombies(child); 
635
636         exit(1);
637         return 0;
638 }
639
640 int register_event_handler(struct event_handler *handler)
641 {
642         struct epoll_event ev;
643         int ret;
644
645         if (handler->fd <= 0) {
646                 pr_err("Invalid file descriptor of %d\n", handler->fd);
647                 return -1;
648         }
649
650         if (!handler->handle_event) {
651                 pr_err("Handler callback missing\n");
652                 return -1;
653         }
654
655         pr_info("Registering handler for %s, fd %d\n",
656                 handler->name, handler->fd);
657
658         ev.data.fd = handler->fd;
659         ev.data.ptr = handler;
660         ev.events = handler->events;
661         ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, handler->fd, &ev);
662         if (ret) {
663                 pr_err("Failed to add epoll_fd: %m\n");
664                 return -1;
665         }
666
667         return 0;
668 }