]> git.itanic.dy.fi Git - rrdd/blob - process.c
process: sigchild_handler: Only call waitpid()
[rrdd] / process.c
1 #define _GNU_SOURCE
2 #include <unistd.h>
3 #include <fcntl.h>
4 #include <sys/select.h>
5 #include <sys/epoll.h>
6 #include <stdio.h>
7
8 #include "process.h"
9 #include "debug.h"
10
11 static int child_count;
12 static int parent_count;
13 static int job_request_fd[2];
14 static int job_get_permission_fd[2];
15 static unsigned int max_jobs;
16 static unsigned int job_count;
17 static unsigned int jobs_pending;
18
19 int get_child_count(void)
20 {
21         return child_count;
22 }
23
24 int get_parent_count(void)
25 {
26         return parent_count;
27 }
28
29 static void sigchild_handler(int signal)
30 {
31         int status;
32
33         waitpid(0, &status, 0);
34 }
35
36 static int setup_sigchild_handler(void)
37 {
38         struct sigaction sa;
39         int ret;
40
41         sa.sa_handler = sigchild_handler;
42         sa.sa_flags = SA_NOCLDSTOP;
43
44         ret = sigaction(SIGCHLD, &sa, NULL);
45         if (ret)
46                 pr_err("Failed to setup SIGCHLD handler: %m\n");
47
48         return ret;
49 }
50
51 static int cancel_sigchild_handler(void)
52 {
53         struct sigaction sa;
54         int ret;
55
56         sa.sa_handler = SIG_DFL;
57         sa.sa_flags = SA_NOCLDSTOP;
58
59         ret = sigaction(SIGCHLD, &sa, NULL);
60         if (ret)
61                 pr_err("Failed to cancel SIGCHLD handler: %m\n");
62
63         return ret;
64 }
65
66 /*
67  * Initialize the jobcontrol.
68  *
69  * Create the pipes that are used to grant children execution
70  * permissions. If max_jobs is zero, count the CPUs from /proc/cpuinfo
71  * and use that.
72  */
73 int init_max_jobs(int max_jobs_requested)
74 {
75         FILE *file;
76         int ret;
77         char buf[256];
78         char match[8];
79
80         if (pipe2(job_request_fd, O_NONBLOCK | O_CLOEXEC)) {
81                 pr_err("Failed to create pipe: %m\n");
82                 return -1;
83         }
84
85         if (pipe2(job_get_permission_fd, O_CLOEXEC)) {
86                 pr_err("Failed to create pipe: %m\n");
87                 return -1;
88         }
89
90         ret = setup_sigchild_handler();
91         if (ret)
92                 return ret;
93
94         if (max_jobs_requested > 0) {
95                 max_jobs = max_jobs_requested;
96                 goto no_count_cpus;
97         }
98         max_jobs++;
99
100         file = fopen("/proc/cpuinfo", "ro");
101         if (!file) {
102                 pr_err("Failed to open /proc/cpuinfo: %m\n");
103                 goto open_fail;
104         }
105
106         /*
107          * The CPU count algorithm simply reads the first 8 bytes from
108          * the /proc/cpuinfo and then expects that line to be there as
109          * many times as there are CPUs.
110          */
111         ret = fread(match, 1, sizeof(match), file);
112         if (ret < sizeof(match)) {
113                 pr_err("read %d bytes when expecting %zd %m\n",
114                         ret, sizeof(match));
115                 goto read_fail;
116         }
117
118         while(fgets(buf, sizeof(buf), file)) {
119                 if (!strncmp(buf, match, sizeof(match)))
120                         max_jobs++;
121         }
122
123 open_fail:
124 read_fail:
125         fclose(file);
126
127 no_count_cpus:
128         pr_info("Set maximum number of parallel jobs to %d\n", max_jobs);
129
130         return 0;
131 }
132
133 static int grant_new_job(void)
134 {
135         int ret;
136         char byte = 0;
137
138         job_count++;
139         pr_info("Granting new job. %d jobs currently and %d pending\n",
140                 job_count, jobs_pending);
141
142         ret = write(job_get_permission_fd[1], &byte, 1);
143         if (ret != 1) {
144                 pr_err("Failed to write 1 byte: %m\n");
145                 return -1;
146         }
147
148         return 0;
149 }
150
151 int poll_job_requests(int timeout)
152 {
153         struct epoll_event event;
154         static int epollfd;
155         int ret;
156         int pid;
157
158         if (!epollfd) {
159                 struct epoll_event ev;
160
161                 epollfd = epoll_create(1);
162                 if (epollfd == -1) {
163                         pr_err("Failed to epoll_create(): %m\n");
164                         return -1;
165                 }
166
167                 ev.events = EPOLLIN;
168                 ev.data.fd = job_request_fd[0];
169                 epoll_ctl(epollfd, EPOLL_CTL_ADD, job_request_fd[0], &ev);
170         }
171
172         /* Convert positive seconds to milliseconds */
173         timeout = timeout > 0 ? 1000 * timeout : timeout;
174
175         ret = epoll_wait(epollfd, &event, 1, timeout);
176
177         if (ret == -1) {
178                 if (errno != EINTR) {
179                         pr_err("epoll_wait: %m\n");
180                         return -1;
181                 }
182
183                 /*
184                  * If epoll_wait() was interrupted, better start
185                  * everything again from the beginning
186                  */
187                 return 0;
188         }
189
190         if (ret == 0) {
191                 pr_info("Timed out\n");
192                 goto out;
193         }
194
195         ret = read(event.data.fd, &pid, sizeof(pid));
196         if (ret < 0) {
197                 pr_err("Failed to read: %m\n");
198                 return -1;
199         }
200
201         if (ret == 0) {
202                 pr_info("Read zero bytes\n");
203                 return 0;
204         }
205
206         if (pid > 0) {
207                 if (job_count >= max_jobs) {
208                         jobs_pending++;
209                 } else {
210                         ret = grant_new_job();
211                         goto out;
212                 }
213         } else if (pid < 0) {
214                 if (job_count > max_jobs)
215                         pr_err("BUG: Job %u jobs exceeded limit %u\n",
216                                 job_count, max_jobs);
217
218                 pr_info("Job %d finished\n", -pid);
219                 job_count--;
220                 if (jobs_pending) {
221                         jobs_pending--;
222                         ret = grant_new_job();
223                         goto out;
224                 }
225         }
226
227 out:
228         pr_info("Jobs active: %u, pending: %u\n", job_count, jobs_pending);
229         return ret;
230 }
231
232 /*
233  * Per process flag indicating whether this child has requested fork
234  * limiting. If it has, it must also tell the master parent when it
235  * has died so that the parent can give next pending job permission to
236  * go.
237  */
238 static int is_limited_fork;
239
240 int do_fork(void)
241 {
242         int child;
243         child = fork();
244         if (child < 0) {
245                 pr_err("fork() failed: %m\n");
246                 return -1;
247         }
248
249         if (child) {
250                 child_count++;
251                 pr_info("Fork %d, child %d\n", child_count, child);
252                 return child;
253         }
254
255         /*
256          * Child processes may want to use waitpid() for synchronizing
257          * with their sub-childs. Disable the signal handler by
258          * default
259          */
260         cancel_sigchild_handler();
261
262         /*
263          * Also do not notify the master parent the death of this
264          * child. Only childs that have been created with
265          * do_fork_limited() can have this flag set.
266          */
267         is_limited_fork = 0;
268
269         /* reset child's child count */
270         child_count = 0;
271         parent_count++;
272         return 0;
273 }
274
275 static int request_fork(char request)
276 {
277         int pid = getpid();
278
279         pid = request > 0 ? pid : -pid;
280
281         return write(job_request_fd[1], &pid, sizeof(pid));
282 }
283
284 static void limited_fork_exit_handler(void)
285 {
286         if (is_limited_fork)
287                 request_fork(-1);
288 }
289
290 /*
291  * Like do_fork(), but allow the child continue only after the global
292  * job count is low enough.
293  *
294  * We allow the parent to continue other more important activities but
295  * child respects the limit of global active processes.
296  */
297 int do_fork_limited(void)
298 {
299         int child, ret;
300         char byte;
301
302         child = do_fork();
303         if (child)
304                 return child;
305
306         /* Remember to notify the parent when we are done */
307         atexit(limited_fork_exit_handler);
308         is_limited_fork = 1;
309
310         pr_info("Requesting permission to go\n");
311
312         /* Signal the parent that we are here, waiting to go */
313         request_fork(1);
314
315         /*
316          * The parent will tell us when we can continue. If there were
317          * multiple children waiting for their turn to run parent's
318          * write to the pipe will wake up every process reading
319          * it. However, only one will be reading the content and
320          * getting the permission to run. So we will read as many
321          * times as needed until we get our own permission to run.
322          */
323         do {
324                 ret = read(job_get_permission_fd[0], &byte, sizeof(byte));
325                 if (ret == 1)
326                         break;
327
328         } while(1);
329
330         pr_info("Continuing\n");
331         return child;
332 }
333
334 int harvest_zombies(int pid)
335 {
336         int status;
337
338         if (child_count == 0)
339                 return 0;
340
341         if (pid)
342                 pr_info("Waiting on pid %d, children left: %d\n", pid, 
343                         child_count);
344
345         pid = waitpid(pid, &status, 0);
346         if (pid < 0) {
347                 pr_err("Error on wait(): %m\n");
348                 child_count--; /* Decrement child count anyway */
349         }
350         else {
351                 child_count--;
352                 pr_info("pid %d: exit code %d. Children left: %d\n", pid,
353                         status, child_count);
354         }
355
356         return 1;
357 }
358
359 /*
360  * Runs a command cmd with params argv, connects stdin and stdout to
361  * readfd and writefd
362  *
363  * Returns the pid of the executed process
364  */
365 int run_piped(const char *cmd, char *const argv[],
366               int *stdinfd, int *stdoutfd, int *stderrfd)
367 {
368         int ifd[2], ofd[2], efd[2], pid;
369
370         pr_info("Running command %s\n", cmd);
371
372         if (stdinfd && pipe(ifd)) {
373                 pr_err("pipe() failed: %m\n");
374                 return -1;
375         }
376
377         if (stdoutfd && pipe(ofd)) {
378                 pr_err("pipe() failed: %m\n");
379                 return -1;
380         }
381
382         if (stderrfd && pipe(efd)) {
383                 pr_err("pipe() failed: %m\n");
384                 return -1;
385         }
386
387         pid = do_fork();
388         if (pid) { /* Parent side */
389                 if (stdinfd) {
390                         close(ifd[0]);
391                         *stdinfd = ifd[0];
392                 }
393
394                 if (stdoutfd) {
395                         close(ofd[1]);
396                         *stdoutfd = ofd[0];
397                 }
398
399                 if (stderrfd) {
400                         close(efd[1]);
401                         *stderrfd = efd[0];
402                 }
403
404                 return pid;
405         }
406
407         if (stdinfd) {
408                 close(ifd[1]);
409                 dup2(ifd[0], STDIN_FILENO);
410         }
411
412         if (stdoutfd) {
413                 close(ofd[0]);
414                 dup2(ofd[1], STDOUT_FILENO);
415         }
416
417         if (stderrfd) {
418                 close(efd[0]);
419                 dup2(efd[1], STDERR_FILENO);
420         }
421
422         /* Now we have redirected standard streams to parent process */
423         execvp(cmd, argv);
424         pr_err("Failed to execv command %s: %m\n", cmd);
425         exit(1);
426
427         return 0;
428 }
429
430 /*
431  * Runs a command cmd with params argv, connects stdin and stdout to
432  * readfd and writefd
433  *
434  * Returns the pid of the executed process
435  */
436 int run_piped_stream(const char *cmd, char *const argv[],
437                      FILE **stdinf, FILE **stdoutf, FILE **stderrf)
438 {
439         int ifd, ofd, efd, pid;
440         int *i, *o, *e;
441
442         if (stdinf)
443                 i = &ifd;
444         else 
445                 i = 0;
446         if (stdoutf)
447                 o = &ofd;
448         else 
449                 o = 0;
450         if (stderrf)
451                 e = &efd;
452         else 
453                 e = 0;
454
455         pid = run_piped(cmd, argv, i, o, e);
456
457         if (stdinf) {
458                 *stdinf = fdopen(ifd, "r");
459                 if (*stdinf == NULL) {
460                         pr_err("Error opening file stream for fd %d: %m\n",
461                                ifd);
462                         return -1;
463                 }
464         }
465
466         if (stdoutf) {
467                 *stdoutf = fdopen(ofd, "r");
468                 if (*stdoutf == NULL) {
469                         pr_err("Error opening file stream for fd %d: %m\n",
470                                ofd);
471                         return -1;
472                 }
473         }
474
475         if (stderrf) {
476                 *stderrf = fdopen(efd, "r");
477                 if (*stderrf == NULL) {
478                         pr_err("Error opening file stream for fd %d: %m\n",
479                                efd);
480                         return -1;
481                 }
482         }
483
484         return pid;
485 }
486
487 /*
488  * Forks a child and executes a command to run on parallel
489  */
490
491 #define max(a,b) (a) < (b) ? (b) : (a)
492 #define BUF_SIZE (128*1024)
493 int run(const char *cmd, char *const argv[])
494 {
495         int child, error;
496         int ofd, efd;
497         fd_set rfds;
498         int maxfd;
499         int eof = 0;
500         char stdoutstr[32], stderrstr[32], indent[16] = { "                " };
501
502         indent[get_parent_count() + 1] = 0;
503                 
504         if ((child = do_fork()))
505             return child;
506
507         child = run_piped(cmd, argv, NULL, &ofd, &efd);
508         snprintf(stdoutstr, 32, "%sstdout", green_color);
509         snprintf(stderrstr, 32, "%sstderr", red_color);
510
511         FD_ZERO(&rfds);
512         FD_SET(ofd, &rfds);
513         FD_SET(efd, &rfds);
514
515         while (!eof) {
516                 char *sptr , *eptr;
517                 char rbuf[BUF_SIZE];
518                 int bytes;
519                 char *typestr;
520
521                 maxfd = max(ofd, efd);
522                 error = select(maxfd, &rfds, NULL, NULL, NULL);
523
524                 if (error < 0) {
525                         pr_err("Error with select: %m\n");
526                         break;
527                 }
528
529                 if (FD_ISSET(ofd, &rfds)) {
530                         typestr = stdoutstr;
531                         bytes = read(ofd, rbuf, BUF_SIZE);
532                         goto print;
533                 }
534
535                 if (FD_ISSET(efd, &rfds)) {
536                         typestr = stderrstr;
537                         bytes = read(efd, rbuf, BUF_SIZE);
538                         goto print;
539                 }
540
541                 pr_err("select() returned unknown fd\n");
542                 break;
543
544 print:
545                 if (bytes < 0) {
546                         pr_err("read() failed: %m\n");
547                         break;
548                 }
549
550                 /*
551                  * Workaround: When a process had die and it has only
552                  * written to stderr, select() doesn't indicate that
553                  * there might be something to read in stderr fd. To
554                  * work around this issue, we try to read stderr just
555                  * in case in order to ensure everything gets read.
556                  */
557                 if (bytes == 0) {
558                         bytes = read(efd, rbuf, BUF_SIZE);
559                         typestr = stderrstr;
560                         eof = 1;
561                 }
562
563                 sptr = eptr = rbuf;
564                 while(bytes--) {
565                         if (*eptr == '\n') {
566                                 *eptr = 0;
567                                 fprintf(stderr, "%s[%5d %s] %s: %s%s\n", indent,
568                                        child, cmd, typestr, sptr, normal_color);
569                                 sptr = eptr;
570                         }
571                         eptr++;
572                 }
573         }
574
575         close(ofd);
576         close(efd);
577
578         harvest_zombies(child); 
579
580         exit(1);
581         return 0;
582 }