]> git.itanic.dy.fi Git - rrdd/blob - process.c
process.c: Implement support for limiting number of active processes
[rrdd] / process.c
1 #define _GNU_SOURCE
2 #include <unistd.h>
3 #include <fcntl.h>
4 #include <sys/select.h>
5 #include <sys/epoll.h>
6 #include <stdio.h>
7
8 #include "process.h"
9 #include "debug.h"
10
11 static int child_count;
12 static int parent_count;
13 static int job_request_fd[2];
14 static int job_get_permission_fd[2];
15 static unsigned int max_jobs;
16 static unsigned int job_count;
17 static unsigned int jobs_pending;
18
19 int get_child_count(void)
20 {
21         return child_count;
22 }
23
24 int get_parent_count(void)
25 {
26         return parent_count;
27 }
28
29 static void sigchild_handler(int signal)
30 {
31         pr_info("Child has died, go harvest the zombie\n");
32         harvest_zombies(0);
33 }
34
35 static int setup_sigchild_handler(void)
36 {
37         struct sigaction sa;
38         int ret;
39
40         sa.sa_handler = sigchild_handler;
41         sa.sa_flags = SA_NOCLDSTOP;
42
43         ret = sigaction(SIGCHLD, &sa, NULL);
44         if (ret)
45                 pr_err("Failed to setup SIGCHLD handler: %m\n");
46
47         return ret;
48 }
49
50 static int cancel_sigchild_handler(void)
51 {
52         struct sigaction sa;
53         int ret;
54
55         sa.sa_handler = SIG_DFL;
56         sa.sa_flags = SA_NOCLDSTOP;
57
58         ret = sigaction(SIGCHLD, &sa, NULL);
59         if (ret)
60                 pr_err("Failed to cancel SIGCHLD handler: %m\n");
61
62         return ret;
63 }
64
65 /*
66  * Initialize the jobcontrol.
67  *
68  * Create the pipes that are used to grant children execution
69  * permissions. If max_jobs is zero, count the CPUs from /proc/cpuinfo
70  * and use that.
71  */
72 int init_max_jobs(int max_jobs_requested)
73 {
74         FILE *file;
75         int ret;
76         char buf[256];
77         char match[8];
78
79         if (pipe2(job_request_fd, O_NONBLOCK | O_CLOEXEC)) {
80                 pr_err("Failed to create pipe: %m\n");
81                 return -1;
82         }
83
84         if (pipe2(job_get_permission_fd, O_CLOEXEC)) {
85                 pr_err("Failed to create pipe: %m\n");
86                 return -1;
87         }
88
89         ret = setup_sigchild_handler();
90         if (ret)
91                 return ret;
92
93         if (max_jobs_requested > 0) {
94                 max_jobs = max_jobs_requested;
95                 goto no_count_cpus;
96         }
97         max_jobs++;
98
99         file = fopen("/proc/cpuinfo", "ro");
100         if (!file) {
101                 pr_err("Failed to open /proc/cpuinfo: %m\n");
102                 goto open_fail;
103         }
104
105         /*
106          * The CPU count algorithm simply reads the first 8 bytes from
107          * the /proc/cpuinfo and then expects that line to be there as
108          * many times as there are CPUs.
109          */
110         ret = fread(match, 1, sizeof(match), file);
111         if (ret < sizeof(match)) {
112                 pr_err("read %d bytes when expecting %zd %m\n",
113                         ret, sizeof(match));
114                 goto read_fail;
115         }
116
117         while(fgets(buf, sizeof(buf), file)) {
118                 if (!strncmp(buf, match, sizeof(match)))
119                         max_jobs++;
120         }
121
122 open_fail:
123 read_fail:
124         fclose(file);
125
126 no_count_cpus:
127         pr_info("Set maximum number of parallel jobs to %d\n", max_jobs);
128
129         return 0;
130 }
131
132 static int grant_new_job(void)
133 {
134         int ret;
135         char byte = 0;
136
137         job_count++;
138         pr_info("Granting new job. %d jobs currently and %d pending\n",
139                 job_count, jobs_pending);
140
141         ret = write(job_get_permission_fd[1], &byte, 1);
142         if (ret != 1) {
143                 pr_err("Failed to write 1 byte: %m\n");
144                 return -1;
145         }
146
147         return 0;
148 }
149
150 int poll_job_requests(int timeout)
151 {
152         struct epoll_event event;
153         static int epollfd;
154         int ret;
155         int pid;
156
157         if (!epollfd) {
158                 struct epoll_event ev;
159
160                 epollfd = epoll_create(1);
161                 if (epollfd == -1) {
162                         pr_err("Failed to epoll_create(): %m\n");
163                         return -1;
164                 }
165
166                 ev.events = EPOLLIN;
167                 ev.data.fd = job_request_fd[0];
168                 epoll_ctl(epollfd, EPOLL_CTL_ADD, job_request_fd[0], &ev);
169         }
170
171         /* Convert positive seconds to milliseconds */
172         timeout = timeout > 0 ? 1000 * timeout : timeout;
173
174         ret = epoll_wait(epollfd, &event, 1, timeout);
175
176         if (ret == -1) {
177                 if (errno != EINTR) {
178                         pr_err("epoll_wait: %m\n");
179                         return -1;
180                 }
181
182                 /*
183                  * If epoll_wait() was interrupted, better start
184                  * everything again from the beginning
185                  */
186                 return 0;
187         }
188
189         if (ret == 0) {
190                 pr_info("Timed out\n");
191                 goto out;
192         }
193
194         ret = read(event.data.fd, &pid, sizeof(pid));
195         if (ret < 0) {
196                 pr_err("Failed to read: %m\n");
197                 return -1;
198         }
199
200         if (ret == 0) {
201                 pr_info("Read zero bytes\n");
202                 return 0;
203         }
204
205         if (pid > 0) {
206                 if (job_count >= max_jobs) {
207                         jobs_pending++;
208                 } else {
209                         ret = grant_new_job();
210                         goto out;
211                 }
212         } else if (pid < 0) {
213                 if (job_count > max_jobs)
214                         pr_err("BUG: Job %u jobs exceeded limit %u\n",
215                                 job_count, max_jobs);
216
217                 pr_info("Job %d finished\n", -pid);
218                 job_count--;
219                 if (jobs_pending) {
220                         jobs_pending--;
221                         ret = grant_new_job();
222                         goto out;
223                 }
224         }
225
226 out:
227         pr_info("Jobs active: %u, pending: %u\n", job_count, jobs_pending);
228         return ret;
229 }
230
231 /*
232  * Per process flag indicating whether this child has requested fork
233  * limiting. If it has, it must also tell the master parent when it
234  * has died so that the parent can give next pending job permission to
235  * go.
236  */
237 static int is_limited_fork;
238
239 int do_fork(void)
240 {
241         int child;
242         child = fork();
243         if (child < 0) {
244                 pr_err("fork() failed: %m\n");
245                 return -1;
246         }
247
248         if (child) {
249                 child_count++;
250                 pr_info("Fork %d, child %d\n", child_count, child);
251                 return child;
252         }
253
254         /*
255          * Child processes may want to use waitpid() for synchronizing
256          * with their sub-childs. Disable the signal handler by
257          * default
258          */
259         cancel_sigchild_handler();
260
261         /*
262          * Also do not notify the master parent the death of this
263          * child. Only childs that have been created with
264          * do_fork_limited() can have this flag set.
265          */
266         is_limited_fork = 0;
267
268         /* reset child's child count */
269         child_count = 0;
270         parent_count++;
271         return 0;
272 }
273
274 static int request_fork(char request)
275 {
276         int pid = getpid();
277
278         pid = request > 0 ? pid : -pid;
279
280         return write(job_request_fd[1], &pid, sizeof(pid));
281 }
282
283 static void limited_fork_exit_handler(void)
284 {
285         if (is_limited_fork)
286                 request_fork(-1);
287 }
288
289 /*
290  * Like do_fork(), but allow the child continue only after the global
291  * job count is low enough.
292  *
293  * We allow the parent to continue other more important activities but
294  * child respects the limit of global active processes.
295  */
296 int do_fork_limited(void)
297 {
298         int child, ret;
299         char byte;
300
301         child = do_fork();
302         if (child)
303                 return child;
304
305         /* Remember to notify the parent when we are done */
306         atexit(limited_fork_exit_handler);
307         is_limited_fork = 1;
308
309         pr_info("Requesting permission to go\n");
310
311         /* Signal the parent that we are here, waiting to go */
312         request_fork(1);
313
314         /*
315          * The parent will tell us when we can continue. If there were
316          * multiple children waiting for their turn to run parent's
317          * write to the pipe will wake up every process reading
318          * it. However, only one will be reading the content and
319          * getting the permission to run. So we will read as many
320          * times as needed until we get our own permission to run.
321          */
322         do {
323                 ret = read(job_get_permission_fd[0], &byte, sizeof(byte));
324                 if (ret == 1)
325                         break;
326
327         } while(1);
328
329         pr_info("Continuing\n");
330         return child;
331 }
332
333 int harvest_zombies(int pid)
334 {
335         int status;
336
337         if (child_count == 0)
338                 return 0;
339
340         if (pid)
341                 pr_info("Waiting on pid %d, children left: %d\n", pid, 
342                         child_count);
343
344         pid = waitpid(pid, &status, 0);
345         if (pid < 0) {
346                 pr_err("Error on wait(): %m\n");
347                 child_count--; /* Decrement child count anyway */
348         }
349         else {
350                 child_count--;
351                 pr_info("pid %d: exit code %d. Children left: %d\n", pid,
352                         status, child_count);
353         }
354
355         return 1;
356 }
357
358 /*
359  * Runs a command cmd with params argv, connects stdin and stdout to
360  * readfd and writefd
361  *
362  * Returns the pid of the executed process
363  */
364 int run_piped(const char *cmd, char *const argv[],
365               int *stdinfd, int *stdoutfd, int *stderrfd)
366 {
367         int ifd[2], ofd[2], efd[2], pid;
368
369         pr_info("Running command %s\n", cmd);
370
371         if (stdinfd && pipe(ifd)) {
372                 pr_err("pipe() failed: %m\n");
373                 return -1;
374         }
375
376         if (stdoutfd && pipe(ofd)) {
377                 pr_err("pipe() failed: %m\n");
378                 return -1;
379         }
380
381         if (stderrfd && pipe(efd)) {
382                 pr_err("pipe() failed: %m\n");
383                 return -1;
384         }
385
386         pid = do_fork();
387         if (pid) { /* Parent side */
388                 if (stdinfd) {
389                         close(ifd[0]);
390                         *stdinfd = ifd[0];
391                 }
392
393                 if (stdoutfd) {
394                         close(ofd[1]);
395                         *stdoutfd = ofd[0];
396                 }
397
398                 if (stderrfd) {
399                         close(efd[1]);
400                         *stderrfd = efd[0];
401                 }
402
403                 return pid;
404         }
405
406         if (stdinfd) {
407                 close(ifd[1]);
408                 dup2(ifd[0], STDIN_FILENO);
409         }
410
411         if (stdoutfd) {
412                 close(ofd[0]);
413                 dup2(ofd[1], STDOUT_FILENO);
414         }
415
416         if (stderrfd) {
417                 close(efd[0]);
418                 dup2(efd[1], STDERR_FILENO);
419         }
420
421         /* Now we have redirected standard streams to parent process */
422         execvp(cmd, argv);
423         pr_err("Failed to execv command %s: %m\n", cmd);
424         exit(1);
425
426         return 0;
427 }
428
429 /*
430  * Runs a command cmd with params argv, connects stdin and stdout to
431  * readfd and writefd
432  *
433  * Returns the pid of the executed process
434  */
435 int run_piped_stream(const char *cmd, char *const argv[],
436                      FILE **stdinf, FILE **stdoutf, FILE **stderrf)
437 {
438         int ifd, ofd, efd, pid;
439         int *i, *o, *e;
440
441         if (stdinf)
442                 i = &ifd;
443         else 
444                 i = 0;
445         if (stdoutf)
446                 o = &ofd;
447         else 
448                 o = 0;
449         if (stderrf)
450                 e = &efd;
451         else 
452                 e = 0;
453
454         pid = run_piped(cmd, argv, i, o, e);
455
456         if (stdinf) {
457                 *stdinf = fdopen(ifd, "r");
458                 if (*stdinf == NULL) {
459                         pr_err("Error opening file stream for fd %d: %m\n",
460                                ifd);
461                         return -1;
462                 }
463         }
464
465         if (stdoutf) {
466                 *stdoutf = fdopen(ofd, "r");
467                 if (*stdoutf == NULL) {
468                         pr_err("Error opening file stream for fd %d: %m\n",
469                                ofd);
470                         return -1;
471                 }
472         }
473
474         if (stderrf) {
475                 *stderrf = fdopen(efd, "r");
476                 if (*stderrf == NULL) {
477                         pr_err("Error opening file stream for fd %d: %m\n",
478                                efd);
479                         return -1;
480                 }
481         }
482
483         return pid;
484 }
485
486 /*
487  * Forks a child and executes a command to run on parallel
488  */
489
490 #define max(a,b) (a) < (b) ? (b) : (a)
491 #define BUF_SIZE (128*1024)
492 int run(const char *cmd, char *const argv[])
493 {
494         int child, error;
495         int ofd, efd;
496         fd_set rfds;
497         int maxfd;
498         int eof = 0;
499         char stdoutstr[32], stderrstr[32], indent[16] = { "                " };
500
501         indent[get_parent_count() + 1] = 0;
502                 
503         if ((child = do_fork()))
504             return child;
505
506         child = run_piped(cmd, argv, NULL, &ofd, &efd);
507         snprintf(stdoutstr, 32, "%sstdout", green_color);
508         snprintf(stderrstr, 32, "%sstderr", red_color);
509
510         FD_ZERO(&rfds);
511         FD_SET(ofd, &rfds);
512         FD_SET(efd, &rfds);
513
514         while (!eof) {
515                 char *sptr , *eptr;
516                 char rbuf[BUF_SIZE];
517                 int bytes;
518                 char *typestr;
519
520                 maxfd = max(ofd, efd);
521                 error = select(maxfd, &rfds, NULL, NULL, NULL);
522
523                 if (error < 0) {
524                         pr_err("Error with select: %m\n");
525                         break;
526                 }
527
528                 if (FD_ISSET(ofd, &rfds)) {
529                         typestr = stdoutstr;
530                         bytes = read(ofd, rbuf, BUF_SIZE);
531                         goto print;
532                 }
533
534                 if (FD_ISSET(efd, &rfds)) {
535                         typestr = stderrstr;
536                         bytes = read(efd, rbuf, BUF_SIZE);
537                         goto print;
538                 }
539
540                 pr_err("select() returned unknown fd\n");
541                 break;
542
543 print:
544                 if (bytes < 0) {
545                         pr_err("read() failed: %m\n");
546                         break;
547                 }
548
549                 /*
550                  * Workaround: When a process had die and it has only
551                  * written to stderr, select() doesn't indicate that
552                  * there might be something to read in stderr fd. To
553                  * work around this issue, we try to read stderr just
554                  * in case in order to ensure everything gets read.
555                  */
556                 if (bytes == 0) {
557                         bytes = read(efd, rbuf, BUF_SIZE);
558                         typestr = stderrstr;
559                         eof = 1;
560                 }
561
562                 sptr = eptr = rbuf;
563                 while(bytes--) {
564                         if (*eptr == '\n') {
565                                 *eptr = 0;
566                                 fprintf(stderr, "%s[%5d %s] %s: %s%s\n", indent,
567                                        child, cmd, typestr, sptr, normal_color);
568                                 sptr = eptr;
569                         }
570                         eptr++;
571                 }
572         }
573
574         close(ofd);
575         close(efd);
576
577         harvest_zombies(child); 
578
579         exit(1);
580         return 0;
581 }