]> git.itanic.dy.fi Git - rrdd/blob - process.c
Rename init_max_jobs() to init_jobcontrol()
[rrdd] / process.c
1 #define _GNU_SOURCE
2 #include <unistd.h>
3 #include <fcntl.h>
4 #include <sys/select.h>
5 #include <sys/epoll.h>
6 #include <stdio.h>
7
8 #include "process.h"
9 #include "debug.h"
10
11 static int child_count;
12 static int parent_count;
13 static int job_request_fd[2];
14 static int job_get_permission_fd[2];
15 static unsigned int max_jobs;
16 static unsigned int job_count;
17 static unsigned int jobs_pending;
18
19 int get_child_count(void)
20 {
21         return child_count;
22 }
23
24 int get_parent_count(void)
25 {
26         return parent_count;
27 }
28
29 static void sigchild_handler(int signal)
30 {
31         int status;
32
33         waitpid(0, &status, 0);
34 }
35
36 static int setup_sigchild_handler(void)
37 {
38         struct sigaction sa;
39         int ret;
40
41         sa.sa_handler = sigchild_handler;
42         sa.sa_flags = SA_NOCLDSTOP;
43
44         ret = sigaction(SIGCHLD, &sa, NULL);
45         if (ret)
46                 pr_err("Failed to setup SIGCHLD handler: %m\n");
47
48         return ret;
49 }
50
51 static int cancel_sigchild_handler(void)
52 {
53         struct sigaction sa;
54         int ret;
55
56         sa.sa_handler = SIG_DFL;
57         sa.sa_flags = SA_NOCLDSTOP;
58
59         ret = sigaction(SIGCHLD, &sa, NULL);
60         if (ret)
61                 pr_err("Failed to cancel SIGCHLD handler: %m\n");
62
63         return ret;
64 }
65
66 /*
67  * Initialize the jobcontrol.
68  *
69  * Create the pipes that are used to grant children execution
70  * permissions. If max_jobs is zero, count the number of CPUs from
71  * /proc/cpuinfo and use that.
72  */
73 int init_jobcontrol(int max_jobs_requested)
74 {
75         struct epoll_event ev;
76         FILE *file;
77         int ret;
78         char buf[256];
79         char match[8];
80
81         if (pipe2(job_request_fd, O_NONBLOCK | O_CLOEXEC)) {
82                 pr_err("Failed to create pipe: %m\n");
83                 return -1;
84         }
85
86         if (pipe2(job_get_permission_fd, O_CLOEXEC)) {
87                 pr_err("Failed to create pipe: %m\n");
88                 return -1;
89         }
90
91         ret = setup_sigchild_handler();
92         if (ret)
93                 return ret;
94
95         if (max_jobs_requested > 0) {
96                 max_jobs = max_jobs_requested;
97                 goto no_count_cpus;
98         }
99         max_jobs++;
100
101         file = fopen("/proc/cpuinfo", "ro");
102         if (!file) {
103                 pr_err("Failed to open /proc/cpuinfo: %m\n");
104                 goto open_fail;
105         }
106
107         /*
108          * The CPU count algorithm simply reads the first 8 bytes from
109          * the /proc/cpuinfo and then expects that line to be there as
110          * many times as there are CPUs.
111          */
112         ret = fread(match, 1, sizeof(match), file);
113         if (ret < sizeof(match)) {
114                 pr_err("read %d bytes when expecting %zd %m\n",
115                         ret, sizeof(match));
116                 goto read_fail;
117         }
118
119         while(fgets(buf, sizeof(buf), file)) {
120                 if (!strncmp(buf, match, sizeof(match)))
121                         max_jobs++;
122         }
123
124 open_fail:
125 read_fail:
126         fclose(file);
127
128 no_count_cpus:
129         pr_info("Set maximum number of parallel jobs to %d\n", max_jobs);
130
131         return 0;
132 }
133
134 static int grant_new_job(void)
135 {
136         int ret;
137         char byte = 0;
138
139         job_count++;
140         pr_info("Granting new job. %d jobs currently and %d pending\n",
141                 job_count, jobs_pending);
142
143         ret = write(job_get_permission_fd[1], &byte, 1);
144         if (ret != 1) {
145                 pr_err("Failed to write 1 byte: %m\n");
146                 return -1;
147         }
148
149         return 0;
150 }
151
152 int poll_job_requests(int timeout)
153 {
154         struct epoll_event event;
155         static int epollfd;
156         int ret;
157         int pid;
158
159         if (!epollfd) {
160                 struct epoll_event ev;
161
162                 epollfd = epoll_create(1);
163                 if (epollfd == -1) {
164                         pr_err("Failed to epoll_create(): %m\n");
165                         return -1;
166                 }
167
168                 ev.events = EPOLLIN;
169                 ev.data.fd = job_request_fd[0];
170                 epoll_ctl(epollfd, EPOLL_CTL_ADD, job_request_fd[0], &ev);
171         }
172
173         /* Convert positive seconds to milliseconds */
174         timeout = timeout > 0 ? 1000 * timeout : timeout;
175
176         ret = epoll_wait(epollfd, &event, 1, timeout);
177
178         if (ret == -1) {
179                 if (errno != EINTR) {
180                         pr_err("epoll_wait: %m\n");
181                         return -1;
182                 }
183
184                 /*
185                  * If epoll_wait() was interrupted, better start
186                  * everything again from the beginning
187                  */
188                 return 0;
189         }
190
191         if (ret == 0) {
192                 pr_info("Timed out\n");
193                 goto out;
194         }
195
196         ret = read(event.data.fd, &pid, sizeof(pid));
197         if (ret < 0) {
198                 pr_err("Failed to read: %m\n");
199                 return -1;
200         }
201
202         if (ret == 0) {
203                 pr_info("Read zero bytes\n");
204                 return 0;
205         }
206
207         if (pid > 0) {
208                 if (job_count >= max_jobs) {
209                         jobs_pending++;
210                 } else {
211                         ret = grant_new_job();
212                         goto out;
213                 }
214         } else if (pid < 0) {
215                 if (job_count > max_jobs)
216                         pr_err("BUG: Job %u jobs exceeded limit %u\n",
217                                 job_count, max_jobs);
218
219                 pr_info("Job %d finished\n", -pid);
220                 job_count--;
221                 if (jobs_pending) {
222                         jobs_pending--;
223                         ret = grant_new_job();
224                         goto out;
225                 }
226         }
227
228 out:
229         pr_info("Jobs active: %u, pending: %u\n", job_count, jobs_pending);
230         return ret;
231 }
232
233 /*
234  * Per process flag indicating whether this child has requested fork
235  * limiting. If it has, it must also tell the master parent when it
236  * has died so that the parent can give next pending job permission to
237  * go.
238  */
239 static int is_limited_fork;
240
241 int do_fork(void)
242 {
243         int child;
244         child = fork();
245         if (child < 0) {
246                 pr_err("fork() failed: %m\n");
247                 return -1;
248         }
249
250         if (child) {
251                 child_count++;
252                 pr_info("Fork %d, child %d\n", child_count, child);
253                 return child;
254         }
255
256         /*
257          * Child processes may want to use waitpid() for synchronizing
258          * with their sub-childs. Disable the signal handler by
259          * default
260          */
261         cancel_sigchild_handler();
262
263         /*
264          * Also do not notify the master parent the death of this
265          * child. Only childs that have been created with
266          * do_fork_limited() can have this flag set.
267          */
268         is_limited_fork = 0;
269
270         /* reset child's child count */
271         child_count = 0;
272         parent_count++;
273         return 0;
274 }
275
276 static int request_fork(char request)
277 {
278         int pid = getpid();
279
280         pid = request > 0 ? pid : -pid;
281
282         return write(job_request_fd[1], &pid, sizeof(pid));
283 }
284
285 static void limited_fork_exit_handler(void)
286 {
287         if (is_limited_fork)
288                 request_fork(-1);
289 }
290
291 /*
292  * Like do_fork(), but allow the child continue only after the global
293  * job count is low enough.
294  *
295  * We allow the parent to continue other more important activities but
296  * child respects the limit of global active processes.
297  */
298 int do_fork_limited(void)
299 {
300         int child, ret;
301         char byte;
302
303         child = do_fork();
304         if (child)
305                 return child;
306
307         /* Remember to notify the parent when we are done */
308         atexit(limited_fork_exit_handler);
309         is_limited_fork = 1;
310
311         pr_info("Requesting permission to go\n");
312
313         /* Signal the parent that we are here, waiting to go */
314         request_fork(1);
315
316         /*
317          * The parent will tell us when we can continue. If there were
318          * multiple children waiting for their turn to run parent's
319          * write to the pipe will wake up every process reading
320          * it. However, only one will be reading the content and
321          * getting the permission to run. So we will read as many
322          * times as needed until we get our own permission to run.
323          */
324         do {
325                 ret = read(job_get_permission_fd[0], &byte, sizeof(byte));
326                 if (ret == 1)
327                         break;
328
329         } while(1);
330
331         pr_info("Continuing\n");
332         return child;
333 }
334
335 int harvest_zombies(int pid)
336 {
337         int status;
338
339         if (child_count == 0)
340                 return 0;
341
342         if (pid)
343                 pr_info("Waiting on pid %d, children left: %d\n", pid, 
344                         child_count);
345
346         pid = waitpid(pid, &status, 0);
347         if (pid < 0) {
348                 pr_err("Error on wait(): %m\n");
349                 child_count--; /* Decrement child count anyway */
350         }
351         else {
352                 child_count--;
353                 pr_info("pid %d: exit code %d. Children left: %d\n", pid,
354                         status, child_count);
355         }
356
357         return 1;
358 }
359
360 /*
361  * Runs a command cmd with params argv, connects stdin and stdout to
362  * readfd and writefd
363  *
364  * Returns the pid of the executed process
365  */
366 int run_piped(const char *cmd, char *const argv[],
367               int *stdinfd, int *stdoutfd, int *stderrfd)
368 {
369         int ifd[2], ofd[2], efd[2], pid;
370
371         pr_info("Running command %s\n", cmd);
372
373         if (stdinfd && pipe(ifd)) {
374                 pr_err("pipe() failed: %m\n");
375                 return -1;
376         }
377
378         if (stdoutfd && pipe(ofd)) {
379                 pr_err("pipe() failed: %m\n");
380                 return -1;
381         }
382
383         if (stderrfd && pipe(efd)) {
384                 pr_err("pipe() failed: %m\n");
385                 return -1;
386         }
387
388         pid = do_fork();
389         if (pid) { /* Parent side */
390                 if (stdinfd) {
391                         close(ifd[0]);
392                         *stdinfd = ifd[0];
393                 }
394
395                 if (stdoutfd) {
396                         close(ofd[1]);
397                         *stdoutfd = ofd[0];
398                 }
399
400                 if (stderrfd) {
401                         close(efd[1]);
402                         *stderrfd = efd[0];
403                 }
404
405                 return pid;
406         }
407
408         if (stdinfd) {
409                 close(ifd[1]);
410                 dup2(ifd[0], STDIN_FILENO);
411         }
412
413         if (stdoutfd) {
414                 close(ofd[0]);
415                 dup2(ofd[1], STDOUT_FILENO);
416         }
417
418         if (stderrfd) {
419                 close(efd[0]);
420                 dup2(efd[1], STDERR_FILENO);
421         }
422
423         /* Now we have redirected standard streams to parent process */
424         execvp(cmd, argv);
425         pr_err("Failed to execv command %s: %m\n", cmd);
426         exit(1);
427
428         return 0;
429 }
430
431 /*
432  * Runs a command cmd with params argv, connects stdin and stdout to
433  * readfd and writefd
434  *
435  * Returns the pid of the executed process
436  */
437 int run_piped_stream(const char *cmd, char *const argv[],
438                      FILE **stdinf, FILE **stdoutf, FILE **stderrf)
439 {
440         int ifd, ofd, efd, pid;
441         int *i, *o, *e;
442
443         if (stdinf)
444                 i = &ifd;
445         else 
446                 i = 0;
447         if (stdoutf)
448                 o = &ofd;
449         else 
450                 o = 0;
451         if (stderrf)
452                 e = &efd;
453         else 
454                 e = 0;
455
456         pid = run_piped(cmd, argv, i, o, e);
457
458         if (stdinf) {
459                 *stdinf = fdopen(ifd, "r");
460                 if (*stdinf == NULL) {
461                         pr_err("Error opening file stream for fd %d: %m\n",
462                                ifd);
463                         return -1;
464                 }
465         }
466
467         if (stdoutf) {
468                 *stdoutf = fdopen(ofd, "r");
469                 if (*stdoutf == NULL) {
470                         pr_err("Error opening file stream for fd %d: %m\n",
471                                ofd);
472                         return -1;
473                 }
474         }
475
476         if (stderrf) {
477                 *stderrf = fdopen(efd, "r");
478                 if (*stderrf == NULL) {
479                         pr_err("Error opening file stream for fd %d: %m\n",
480                                efd);
481                         return -1;
482                 }
483         }
484
485         return pid;
486 }
487
488 /*
489  * Forks a child and executes a command to run on parallel
490  */
491
492 #define max(a,b) (a) < (b) ? (b) : (a)
493 #define BUF_SIZE (128*1024)
494 int run(const char *cmd, char *const argv[])
495 {
496         int child, error;
497         int ofd, efd;
498         fd_set rfds;
499         int maxfd;
500         int eof = 0;
501         char stdoutstr[32], stderrstr[32], indent[16] = { "                " };
502
503         indent[get_parent_count() + 1] = 0;
504                 
505         if ((child = do_fork()))
506             return child;
507
508         child = run_piped(cmd, argv, NULL, &ofd, &efd);
509         snprintf(stdoutstr, 32, "%sstdout", green_color);
510         snprintf(stderrstr, 32, "%sstderr", red_color);
511
512         FD_ZERO(&rfds);
513         FD_SET(ofd, &rfds);
514         FD_SET(efd, &rfds);
515
516         while (!eof) {
517                 char *sptr , *eptr;
518                 char rbuf[BUF_SIZE];
519                 int bytes;
520                 char *typestr;
521
522                 maxfd = max(ofd, efd);
523                 error = select(maxfd, &rfds, NULL, NULL, NULL);
524
525                 if (error < 0) {
526                         pr_err("Error with select: %m\n");
527                         break;
528                 }
529
530                 if (FD_ISSET(ofd, &rfds)) {
531                         typestr = stdoutstr;
532                         bytes = read(ofd, rbuf, BUF_SIZE);
533                         goto print;
534                 }
535
536                 if (FD_ISSET(efd, &rfds)) {
537                         typestr = stderrstr;
538                         bytes = read(efd, rbuf, BUF_SIZE);
539                         goto print;
540                 }
541
542                 pr_err("select() returned unknown fd\n");
543                 break;
544
545 print:
546                 if (bytes < 0) {
547                         pr_err("read() failed: %m\n");
548                         break;
549                 }
550
551                 /*
552                  * Workaround: When a process had die and it has only
553                  * written to stderr, select() doesn't indicate that
554                  * there might be something to read in stderr fd. To
555                  * work around this issue, we try to read stderr just
556                  * in case in order to ensure everything gets read.
557                  */
558                 if (bytes == 0) {
559                         bytes = read(efd, rbuf, BUF_SIZE);
560                         typestr = stderrstr;
561                         eof = 1;
562                 }
563
564                 sptr = eptr = rbuf;
565                 while(bytes--) {
566                         if (*eptr == '\n') {
567                                 *eptr = 0;
568                                 fprintf(stderr, "%s[%5d %s] %s: %s%s\n", indent,
569                                        child, cmd, typestr, sptr, normal_color);
570                                 sptr = eptr;
571                         }
572                         eptr++;
573                 }
574         }
575
576         close(ofd);
577         close(efd);
578
579         harvest_zombies(child); 
580
581         exit(1);
582         return 0;
583 }