]> git.itanic.dy.fi Git - rrdd/blob - process.c
process: Improve harvest_zombies
[rrdd] / process.c
1 #define _GNU_SOURCE
2 #include <unistd.h>
3 #include <fcntl.h>
4 #include <sys/select.h>
5 #include <sys/epoll.h>
6 #include <stdio.h>
7 #include <sys/wait.h>
8 #include <sys/resource.h>
9
10 #include "process.h"
11 #include "debug.h"
12
13 static int child_count;
14 static int parent_count;
15 static int job_request_fd[2];
16 static int job_get_permission_fd[2];
17 static int epoll_fd;
18 static unsigned int max_jobs;
19 static unsigned int job_count;
20 static unsigned int jobs_pending;
21
22 int get_child_count(void)
23 {
24         return child_count;
25 }
26
27 int get_parent_count(void)
28 {
29         return parent_count;
30 }
31
32 static void sigchild_handler(int signal)
33 {
34         int status;
35
36         waitpid(0, &status, 0);
37 }
38
39 static int setup_sigchild_handler(void)
40 {
41         struct sigaction sa;
42         int ret;
43
44         sa.sa_handler = sigchild_handler;
45         sa.sa_flags = SA_NOCLDSTOP;
46
47         ret = sigaction(SIGCHLD, &sa, NULL);
48         if (ret)
49                 pr_err("Failed to setup SIGCHLD handler: %m\n");
50
51         return ret;
52 }
53
54 static int cancel_sigchild_handler(void)
55 {
56         struct sigaction sa;
57         int ret;
58
59         sa.sa_handler = SIG_DFL;
60         sa.sa_flags = SA_NOCLDSTOP;
61
62         ret = sigaction(SIGCHLD, &sa, NULL);
63         if (ret)
64                 pr_err("Failed to cancel SIGCHLD handler: %m\n");
65
66         return ret;
67 }
68
69 /*
70  * Initialize the jobcontrol.
71  *
72  * Create the pipes that are used to grant children execution
73  * permissions. If max_jobs is zero, count the number of CPUs from
74  * /proc/cpuinfo and use that.
75  */
76 int init_jobcontrol(int max_jobs_requested)
77 {
78         struct epoll_event ev;
79         FILE *file;
80         int ret;
81         char buf[256];
82         char match[8];
83
84         if (pipe2(job_request_fd, O_NONBLOCK | O_CLOEXEC)) {
85                 pr_err("Failed to create pipe: %m\n");
86                 return -1;
87         }
88
89         if (pipe2(job_get_permission_fd, O_CLOEXEC)) {
90                 pr_err("Failed to create pipe: %m\n");
91                 return -1;
92         }
93
94         ret = setup_sigchild_handler();
95         if (ret)
96                 return ret;
97
98         epoll_fd = epoll_create(1);
99         if (epoll_fd == -1) {
100                 pr_err("Failed to epoll_create(): %m\n");
101                 return -1;
102         }
103
104         ev.events = EPOLLIN;
105         ev.data.fd = job_request_fd[0];
106         ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, job_request_fd[0], &ev);
107         if (ret) {
108                 pr_err("Failed to add epoll_fd: %m\n");
109                 return -1;
110         }
111
112         if (max_jobs_requested > 0) {
113                 max_jobs = max_jobs_requested;
114                 goto no_count_cpus;
115         }
116         max_jobs++;
117
118         file = fopen("/proc/cpuinfo", "ro");
119         if (!file) {
120                 pr_err("Failed to open /proc/cpuinfo: %m\n");
121                 goto open_fail;
122         }
123
124         /*
125          * The CPU count algorithm simply reads the first 8 bytes from
126          * the /proc/cpuinfo and then expects that line to be there as
127          * many times as there are CPUs.
128          */
129         ret = fread(match, 1, sizeof(match), file);
130         if (ret < sizeof(match)) {
131                 pr_err("read %d bytes when expecting %zd %m\n",
132                         ret, sizeof(match));
133                 goto read_fail;
134         }
135
136         while(fgets(buf, sizeof(buf), file)) {
137                 if (!strncmp(buf, match, sizeof(match)))
138                         max_jobs++;
139         }
140
141 open_fail:
142 read_fail:
143         fclose(file);
144
145 no_count_cpus:
146         pr_info("Set maximum number of parallel jobs to %d\n", max_jobs);
147
148         return 0;
149 }
150
151 static int grant_new_job(void)
152 {
153         int ret;
154         char byte = 0;
155
156         job_count++;
157         pr_info("Granting new job. %d jobs currently and %d pending\n",
158                 job_count, jobs_pending);
159
160         ret = write(job_get_permission_fd[1], &byte, 1);
161         if (ret != 1) {
162                 pr_err("Failed to write 1 byte: %m\n");
163                 return -1;
164         }
165
166         return 0;
167 }
168
169 static int handle_job_request(void)
170 {
171         int ret, pid;
172
173         ret = read(job_request_fd[0], &pid, sizeof(pid));
174         if (ret < 0) {
175                 pr_err("Failed to read: %m\n");
176                 return -1;
177         }
178
179         if (ret == 0) {
180                 pr_info("Read zero bytes\n");
181                 return 0;
182         }
183
184         if (pid > 0) {
185                 if (job_count >= max_jobs) {
186                         jobs_pending++;
187                 } else {
188                         ret = grant_new_job();
189                         return 0;
190                 }
191         } else if (pid < 0) {
192                 if (job_count > max_jobs)
193                         pr_err("BUG: Job %u jobs exceeded limit %u\n",
194                                 job_count, max_jobs);
195
196                 pr_info("Job %d finished\n", -pid);
197                 job_count--;
198                 if (jobs_pending) {
199                         jobs_pending--;
200                         ret = grant_new_job();
201                         return 0;
202                 }
203         }
204
205         return 0;
206 }
207
208 int poll_job_requests(int timeout)
209 {
210         struct epoll_event event;
211         int ret;
212
213         /* Convert positive seconds to milliseconds */
214         timeout = timeout > 0 ? 1000 * timeout : timeout;
215
216         ret = epoll_wait(epoll_fd, &event, 1, timeout);
217
218         if (ret == -1) {
219                 if (errno != EINTR) {
220                         pr_err("epoll_wait: %m\n");
221                         return -1;
222                 }
223
224                 /*
225                  * If epoll_wait() was interrupted, better start
226                  * everything again from the beginning
227                  */
228                 return 0;
229         }
230
231         if (ret == 0) {
232                 pr_info("Timed out\n");
233                 goto out;
234         }
235
236         if (event.data.fd == job_request_fd[0])
237                 handle_job_request();
238         else
239                 pr_err("Unknown fd: %d\n", event.data.fd);
240
241 out:
242         pr_info("Jobs active: %u, pending: %u\n", job_count, jobs_pending);
243         return ret;
244 }
245
246 /*
247  * Per process flag indicating whether this child has requested fork
248  * limiting. If it has, it must also tell the master parent when it
249  * has died so that the parent can give next pending job permission to
250  * go.
251  */
252 static int is_limited_fork;
253
254 int do_fork(void)
255 {
256         int child;
257         child = fork();
258         if (child < 0) {
259                 pr_err("fork() failed: %m\n");
260                 return -1;
261         }
262
263         if (child) {
264                 child_count++;
265                 pr_info("Fork %d, child %d\n", child_count, child);
266                 return child;
267         }
268
269         /*
270          * Child processes may want to use waitpid() for synchronizing
271          * with their sub-childs. Disable the signal handler by
272          * default
273          */
274         cancel_sigchild_handler();
275
276         /*
277          * Also do not notify the master parent the death of this
278          * child. Only childs that have been created with
279          * do_fork_limited() can have this flag set.
280          */
281         is_limited_fork = 0;
282
283         /* reset child's child count */
284         child_count = 0;
285         parent_count++;
286         return 0;
287 }
288
289 static int request_fork(char request)
290 {
291         int pid = getpid();
292
293         pid = request > 0 ? pid : -pid;
294
295         return write(job_request_fd[1], &pid, sizeof(pid));
296 }
297
298 static void limited_fork_exit_handler(void)
299 {
300         if (is_limited_fork)
301                 request_fork(-1);
302 }
303
304 /*
305  * Like do_fork(), but allow the child continue only after the global
306  * job count is low enough.
307  *
308  * We allow the parent to continue other more important activities but
309  * child respects the limit of global active processes.
310  */
311 int do_fork_limited(void)
312 {
313         int child, ret;
314         char byte;
315
316         child = do_fork();
317         if (child)
318                 return child;
319
320         /* Remember to notify the parent when we are done */
321         atexit(limited_fork_exit_handler);
322         is_limited_fork = 1;
323
324         pr_info("Requesting permission to go\n");
325
326         /* Signal the parent that we are here, waiting to go */
327         request_fork(1);
328
329         /*
330          * The parent will tell us when we can continue. If there were
331          * multiple children waiting for their turn to run parent's
332          * write to the pipe will wake up every process reading
333          * it. However, only one will be reading the content and
334          * getting the permission to run. So we will read as many
335          * times as needed until we get our own permission to run.
336          */
337         do {
338                 ret = read(job_get_permission_fd[0], &byte, sizeof(byte));
339                 if (ret == 1)
340                         break;
341
342         } while(1);
343
344         pr_info("Continuing\n");
345         return child;
346 }
347
348 int harvest_zombies(int pid)
349 {
350         int status;
351         struct rusage rusage;
352         char *status_str = NULL;
353         int code = 0;
354
355         if (child_count == 0)
356                 return 0;
357
358         if (pid)
359                 pr_info("Waiting on pid %d, children left: %d\n", pid, 
360                         child_count);
361
362         do {
363                 pid = wait4(pid, &status, 0, &rusage);
364                 if (pid < 0) {
365                         pr_err("Error on waitid(): %m\n");
366                         return 0;
367                 }
368                 /* Wait until the child has become a zombie */
369         } while (!WIFEXITED(status) && !WIFSIGNALED(status));
370
371         child_count--;
372         if (WIFEXITED(status)) {
373                 status_str = "exited with status";
374                 code = WEXITSTATUS(status);
375         } else if (WIFSIGNALED(status)) {
376                 status_str = "killed by signal";
377                 code = WTERMSIG(status);
378         }
379         pr_info("pid %d: %s %d. Children left: %d\n", pid,
380                 status_str, code, child_count);
381         pr_info("pid %d: User time: %ld.%03lds, System %ld.%03lds\n", pid,
382                 (long)rusage.ru_utime.tv_sec, rusage.ru_utime.tv_usec / 1000,
383                 (long)rusage.ru_stime.tv_sec, rusage.ru_stime.tv_usec / 1000);
384
385         return 1;
386 }
387
388 /*
389  * Runs a command cmd with params argv, connects stdin and stdout to
390  * readfd and writefd
391  *
392  * Returns the pid of the executed process
393  */
394 int run_piped(const char *cmd, char *const argv[],
395               int *stdinfd, int *stdoutfd, int *stderrfd)
396 {
397         int ifd[2], ofd[2], efd[2], pid;
398
399         pr_info("Running command %s\n", cmd);
400
401         if (stdinfd && pipe(ifd)) {
402                 pr_err("pipe() failed: %m\n");
403                 return -1;
404         }
405
406         if (stdoutfd && pipe(ofd)) {
407                 pr_err("pipe() failed: %m\n");
408                 return -1;
409         }
410
411         if (stderrfd && pipe(efd)) {
412                 pr_err("pipe() failed: %m\n");
413                 return -1;
414         }
415
416         pid = do_fork();
417         if (pid) { /* Parent side */
418                 if (stdinfd) {
419                         close(ifd[0]);
420                         *stdinfd = ifd[0];
421                 }
422
423                 if (stdoutfd) {
424                         close(ofd[1]);
425                         *stdoutfd = ofd[0];
426                 }
427
428                 if (stderrfd) {
429                         close(efd[1]);
430                         *stderrfd = efd[0];
431                 }
432
433                 return pid;
434         }
435
436         if (stdinfd) {
437                 close(ifd[1]);
438                 dup2(ifd[0], STDIN_FILENO);
439         }
440
441         if (stdoutfd) {
442                 close(ofd[0]);
443                 dup2(ofd[1], STDOUT_FILENO);
444         }
445
446         if (stderrfd) {
447                 close(efd[0]);
448                 dup2(efd[1], STDERR_FILENO);
449         }
450
451         /* Now we have redirected standard streams to parent process */
452         execvp(cmd, argv);
453         pr_err("Failed to execv command %s: %m\n", cmd);
454         exit(1);
455
456         return 0;
457 }
458
459 /*
460  * Runs a command cmd with params argv, connects stdin and stdout to
461  * readfd and writefd
462  *
463  * Returns the pid of the executed process
464  */
465 int run_piped_stream(const char *cmd, char *const argv[],
466                      FILE **stdinf, FILE **stdoutf, FILE **stderrf)
467 {
468         int ifd, ofd, efd, pid;
469         int *i, *o, *e;
470
471         if (stdinf)
472                 i = &ifd;
473         else 
474                 i = 0;
475         if (stdoutf)
476                 o = &ofd;
477         else 
478                 o = 0;
479         if (stderrf)
480                 e = &efd;
481         else 
482                 e = 0;
483
484         pid = run_piped(cmd, argv, i, o, e);
485
486         if (stdinf) {
487                 *stdinf = fdopen(ifd, "r");
488                 if (*stdinf == NULL) {
489                         pr_err("Error opening file stream for fd %d: %m\n",
490                                ifd);
491                         return -1;
492                 }
493         }
494
495         if (stdoutf) {
496                 *stdoutf = fdopen(ofd, "r");
497                 if (*stdoutf == NULL) {
498                         pr_err("Error opening file stream for fd %d: %m\n",
499                                ofd);
500                         return -1;
501                 }
502         }
503
504         if (stderrf) {
505                 *stderrf = fdopen(efd, "r");
506                 if (*stderrf == NULL) {
507                         pr_err("Error opening file stream for fd %d: %m\n",
508                                efd);
509                         return -1;
510                 }
511         }
512
513         return pid;
514 }
515
516 /*
517  * Forks a child and executes a command to run on parallel
518  */
519
520 #define max(a,b) (a) < (b) ? (b) : (a)
521 #define BUF_SIZE (128*1024)
522 int run(const char *cmd, char *const argv[])
523 {
524         int child, error;
525         int ofd, efd;
526         fd_set rfds;
527         int maxfd;
528         int eof = 0;
529         char stdoutstr[32], stderrstr[32], indent[16] = { "                " };
530
531         indent[get_parent_count() + 1] = 0;
532                 
533         if ((child = do_fork()))
534             return child;
535
536         child = run_piped(cmd, argv, NULL, &ofd, &efd);
537         snprintf(stdoutstr, 32, "%sstdout", green_color);
538         snprintf(stderrstr, 32, "%sstderr", red_color);
539
540         FD_ZERO(&rfds);
541         FD_SET(ofd, &rfds);
542         FD_SET(efd, &rfds);
543
544         while (!eof) {
545                 char *sptr , *eptr;
546                 char rbuf[BUF_SIZE];
547                 int bytes;
548                 char *typestr;
549
550                 maxfd = max(ofd, efd);
551                 error = select(maxfd, &rfds, NULL, NULL, NULL);
552
553                 if (error < 0) {
554                         pr_err("Error with select: %m\n");
555                         break;
556                 }
557
558                 if (FD_ISSET(ofd, &rfds)) {
559                         typestr = stdoutstr;
560                         bytes = read(ofd, rbuf, BUF_SIZE);
561                         goto print;
562                 }
563
564                 if (FD_ISSET(efd, &rfds)) {
565                         typestr = stderrstr;
566                         bytes = read(efd, rbuf, BUF_SIZE);
567                         goto print;
568                 }
569
570                 pr_err("select() returned unknown fd\n");
571                 break;
572
573 print:
574                 if (bytes < 0) {
575                         pr_err("read() failed: %m\n");
576                         break;
577                 }
578
579                 /*
580                  * Workaround: When a process had die and it has only
581                  * written to stderr, select() doesn't indicate that
582                  * there might be something to read in stderr fd. To
583                  * work around this issue, we try to read stderr just
584                  * in case in order to ensure everything gets read.
585                  */
586                 if (bytes == 0) {
587                         bytes = read(efd, rbuf, BUF_SIZE);
588                         typestr = stderrstr;
589                         eof = 1;
590                 }
591
592                 sptr = eptr = rbuf;
593                 while(bytes--) {
594                         if (*eptr == '\n') {
595                                 *eptr = 0;
596                                 fprintf(stderr, "%s[%5d %s] %s: %s%s\n", indent,
597                                        child, cmd, typestr, sptr, normal_color);
598                                 sptr = eptr;
599                         }
600                         eptr++;
601                 }
602         }
603
604         close(ofd);
605         close(efd);
606
607         harvest_zombies(child); 
608
609         exit(1);
610         return 0;
611 }