]> git.itanic.dy.fi Git - rrdd/blob - process.c
process.c: Factor out job request handling
[rrdd] / process.c
1 #define _GNU_SOURCE
2 #include <unistd.h>
3 #include <fcntl.h>
4 #include <sys/select.h>
5 #include <sys/epoll.h>
6 #include <stdio.h>
7
8 #include "process.h"
9 #include "debug.h"
10
11 static int child_count;
12 static int parent_count;
13 static int job_request_fd[2];
14 static int job_get_permission_fd[2];
15 static int epoll_fd;
16 static unsigned int max_jobs;
17 static unsigned int job_count;
18 static unsigned int jobs_pending;
19
20 int get_child_count(void)
21 {
22         return child_count;
23 }
24
25 int get_parent_count(void)
26 {
27         return parent_count;
28 }
29
30 static void sigchild_handler(int signal)
31 {
32         int status;
33
34         waitpid(0, &status, 0);
35 }
36
37 static int setup_sigchild_handler(void)
38 {
39         struct sigaction sa;
40         int ret;
41
42         sa.sa_handler = sigchild_handler;
43         sa.sa_flags = SA_NOCLDSTOP;
44
45         ret = sigaction(SIGCHLD, &sa, NULL);
46         if (ret)
47                 pr_err("Failed to setup SIGCHLD handler: %m\n");
48
49         return ret;
50 }
51
52 static int cancel_sigchild_handler(void)
53 {
54         struct sigaction sa;
55         int ret;
56
57         sa.sa_handler = SIG_DFL;
58         sa.sa_flags = SA_NOCLDSTOP;
59
60         ret = sigaction(SIGCHLD, &sa, NULL);
61         if (ret)
62                 pr_err("Failed to cancel SIGCHLD handler: %m\n");
63
64         return ret;
65 }
66
67 /*
68  * Initialize the jobcontrol.
69  *
70  * Create the pipes that are used to grant children execution
71  * permissions. If max_jobs is zero, count the number of CPUs from
72  * /proc/cpuinfo and use that.
73  */
74 int init_jobcontrol(int max_jobs_requested)
75 {
76         struct epoll_event ev;
77         FILE *file;
78         int ret;
79         char buf[256];
80         char match[8];
81
82         if (pipe2(job_request_fd, O_NONBLOCK | O_CLOEXEC)) {
83                 pr_err("Failed to create pipe: %m\n");
84                 return -1;
85         }
86
87         if (pipe2(job_get_permission_fd, O_CLOEXEC)) {
88                 pr_err("Failed to create pipe: %m\n");
89                 return -1;
90         }
91
92         ret = setup_sigchild_handler();
93         if (ret)
94                 return ret;
95
96         epoll_fd = epoll_create(1);
97         if (epoll_fd == -1) {
98                 pr_err("Failed to epoll_create(): %m\n");
99                 return -1;
100         }
101
102         ev.events = EPOLLIN;
103         ev.data.fd = job_request_fd[0];
104         ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, job_request_fd[0], &ev);
105         if (ret) {
106                 pr_err("Failed to add epoll_fd: %m\n");
107                 return -1;
108         }
109
110         if (max_jobs_requested > 0) {
111                 max_jobs = max_jobs_requested;
112                 goto no_count_cpus;
113         }
114         max_jobs++;
115
116         file = fopen("/proc/cpuinfo", "ro");
117         if (!file) {
118                 pr_err("Failed to open /proc/cpuinfo: %m\n");
119                 goto open_fail;
120         }
121
122         /*
123          * The CPU count algorithm simply reads the first 8 bytes from
124          * the /proc/cpuinfo and then expects that line to be there as
125          * many times as there are CPUs.
126          */
127         ret = fread(match, 1, sizeof(match), file);
128         if (ret < sizeof(match)) {
129                 pr_err("read %d bytes when expecting %zd %m\n",
130                         ret, sizeof(match));
131                 goto read_fail;
132         }
133
134         while(fgets(buf, sizeof(buf), file)) {
135                 if (!strncmp(buf, match, sizeof(match)))
136                         max_jobs++;
137         }
138
139 open_fail:
140 read_fail:
141         fclose(file);
142
143 no_count_cpus:
144         pr_info("Set maximum number of parallel jobs to %d\n", max_jobs);
145
146         return 0;
147 }
148
149 static int grant_new_job(void)
150 {
151         int ret;
152         char byte = 0;
153
154         job_count++;
155         pr_info("Granting new job. %d jobs currently and %d pending\n",
156                 job_count, jobs_pending);
157
158         ret = write(job_get_permission_fd[1], &byte, 1);
159         if (ret != 1) {
160                 pr_err("Failed to write 1 byte: %m\n");
161                 return -1;
162         }
163
164         return 0;
165 }
166
167 static int handle_job_request(void)
168 {
169         int ret, pid;
170
171         ret = read(job_request_fd[0], &pid, sizeof(pid));
172         if (ret < 0) {
173                 pr_err("Failed to read: %m\n");
174                 return -1;
175         }
176
177         if (ret == 0) {
178                 pr_info("Read zero bytes\n");
179                 return 0;
180         }
181
182         if (pid > 0) {
183                 if (job_count >= max_jobs) {
184                         jobs_pending++;
185                 } else {
186                         ret = grant_new_job();
187                         return 0;
188                 }
189         } else if (pid < 0) {
190                 if (job_count > max_jobs)
191                         pr_err("BUG: Job %u jobs exceeded limit %u\n",
192                                 job_count, max_jobs);
193
194                 pr_info("Job %d finished\n", -pid);
195                 job_count--;
196                 if (jobs_pending) {
197                         jobs_pending--;
198                         ret = grant_new_job();
199                         return 0;
200                 }
201         }
202
203         return 0;
204 }
205
206 int poll_job_requests(int timeout)
207 {
208         struct epoll_event event;
209         int ret;
210
211         /* Convert positive seconds to milliseconds */
212         timeout = timeout > 0 ? 1000 * timeout : timeout;
213
214         ret = epoll_wait(epoll_fd, &event, 1, timeout);
215
216         if (ret == -1) {
217                 if (errno != EINTR) {
218                         pr_err("epoll_wait: %m\n");
219                         return -1;
220                 }
221
222                 /*
223                  * If epoll_wait() was interrupted, better start
224                  * everything again from the beginning
225                  */
226                 return 0;
227         }
228
229         if (ret == 0) {
230                 pr_info("Timed out\n");
231                 goto out;
232         }
233
234         if (event.data.fd == job_request_fd[0])
235                 handle_job_request();
236         else
237                 pr_err("Unknown fd: %d\n", event.data.fd);
238
239 out:
240         pr_info("Jobs active: %u, pending: %u\n", job_count, jobs_pending);
241         return ret;
242 }
243
244 /*
245  * Per process flag indicating whether this child has requested fork
246  * limiting. If it has, it must also tell the master parent when it
247  * has died so that the parent can give next pending job permission to
248  * go.
249  */
250 static int is_limited_fork;
251
252 int do_fork(void)
253 {
254         int child;
255         child = fork();
256         if (child < 0) {
257                 pr_err("fork() failed: %m\n");
258                 return -1;
259         }
260
261         if (child) {
262                 child_count++;
263                 pr_info("Fork %d, child %d\n", child_count, child);
264                 return child;
265         }
266
267         /*
268          * Child processes may want to use waitpid() for synchronizing
269          * with their sub-childs. Disable the signal handler by
270          * default
271          */
272         cancel_sigchild_handler();
273
274         /*
275          * Also do not notify the master parent the death of this
276          * child. Only childs that have been created with
277          * do_fork_limited() can have this flag set.
278          */
279         is_limited_fork = 0;
280
281         /* reset child's child count */
282         child_count = 0;
283         parent_count++;
284         return 0;
285 }
286
287 static int request_fork(char request)
288 {
289         int pid = getpid();
290
291         pid = request > 0 ? pid : -pid;
292
293         return write(job_request_fd[1], &pid, sizeof(pid));
294 }
295
296 static void limited_fork_exit_handler(void)
297 {
298         if (is_limited_fork)
299                 request_fork(-1);
300 }
301
302 /*
303  * Like do_fork(), but allow the child continue only after the global
304  * job count is low enough.
305  *
306  * We allow the parent to continue other more important activities but
307  * child respects the limit of global active processes.
308  */
309 int do_fork_limited(void)
310 {
311         int child, ret;
312         char byte;
313
314         child = do_fork();
315         if (child)
316                 return child;
317
318         /* Remember to notify the parent when we are done */
319         atexit(limited_fork_exit_handler);
320         is_limited_fork = 1;
321
322         pr_info("Requesting permission to go\n");
323
324         /* Signal the parent that we are here, waiting to go */
325         request_fork(1);
326
327         /*
328          * The parent will tell us when we can continue. If there were
329          * multiple children waiting for their turn to run parent's
330          * write to the pipe will wake up every process reading
331          * it. However, only one will be reading the content and
332          * getting the permission to run. So we will read as many
333          * times as needed until we get our own permission to run.
334          */
335         do {
336                 ret = read(job_get_permission_fd[0], &byte, sizeof(byte));
337                 if (ret == 1)
338                         break;
339
340         } while(1);
341
342         pr_info("Continuing\n");
343         return child;
344 }
345
346 int harvest_zombies(int pid)
347 {
348         int status;
349
350         if (child_count == 0)
351                 return 0;
352
353         if (pid)
354                 pr_info("Waiting on pid %d, children left: %d\n", pid, 
355                         child_count);
356
357         pid = waitpid(pid, &status, 0);
358         if (pid < 0) {
359                 pr_err("Error on wait(): %m\n");
360                 child_count--; /* Decrement child count anyway */
361         }
362         else {
363                 child_count--;
364                 pr_info("pid %d: exit code %d. Children left: %d\n", pid,
365                         status, child_count);
366         }
367
368         return 1;
369 }
370
371 /*
372  * Runs a command cmd with params argv, connects stdin and stdout to
373  * readfd and writefd
374  *
375  * Returns the pid of the executed process
376  */
377 int run_piped(const char *cmd, char *const argv[],
378               int *stdinfd, int *stdoutfd, int *stderrfd)
379 {
380         int ifd[2], ofd[2], efd[2], pid;
381
382         pr_info("Running command %s\n", cmd);
383
384         if (stdinfd && pipe(ifd)) {
385                 pr_err("pipe() failed: %m\n");
386                 return -1;
387         }
388
389         if (stdoutfd && pipe(ofd)) {
390                 pr_err("pipe() failed: %m\n");
391                 return -1;
392         }
393
394         if (stderrfd && pipe(efd)) {
395                 pr_err("pipe() failed: %m\n");
396                 return -1;
397         }
398
399         pid = do_fork();
400         if (pid) { /* Parent side */
401                 if (stdinfd) {
402                         close(ifd[0]);
403                         *stdinfd = ifd[0];
404                 }
405
406                 if (stdoutfd) {
407                         close(ofd[1]);
408                         *stdoutfd = ofd[0];
409                 }
410
411                 if (stderrfd) {
412                         close(efd[1]);
413                         *stderrfd = efd[0];
414                 }
415
416                 return pid;
417         }
418
419         if (stdinfd) {
420                 close(ifd[1]);
421                 dup2(ifd[0], STDIN_FILENO);
422         }
423
424         if (stdoutfd) {
425                 close(ofd[0]);
426                 dup2(ofd[1], STDOUT_FILENO);
427         }
428
429         if (stderrfd) {
430                 close(efd[0]);
431                 dup2(efd[1], STDERR_FILENO);
432         }
433
434         /* Now we have redirected standard streams to parent process */
435         execvp(cmd, argv);
436         pr_err("Failed to execv command %s: %m\n", cmd);
437         exit(1);
438
439         return 0;
440 }
441
442 /*
443  * Runs a command cmd with params argv, connects stdin and stdout to
444  * readfd and writefd
445  *
446  * Returns the pid of the executed process
447  */
448 int run_piped_stream(const char *cmd, char *const argv[],
449                      FILE **stdinf, FILE **stdoutf, FILE **stderrf)
450 {
451         int ifd, ofd, efd, pid;
452         int *i, *o, *e;
453
454         if (stdinf)
455                 i = &ifd;
456         else 
457                 i = 0;
458         if (stdoutf)
459                 o = &ofd;
460         else 
461                 o = 0;
462         if (stderrf)
463                 e = &efd;
464         else 
465                 e = 0;
466
467         pid = run_piped(cmd, argv, i, o, e);
468
469         if (stdinf) {
470                 *stdinf = fdopen(ifd, "r");
471                 if (*stdinf == NULL) {
472                         pr_err("Error opening file stream for fd %d: %m\n",
473                                ifd);
474                         return -1;
475                 }
476         }
477
478         if (stdoutf) {
479                 *stdoutf = fdopen(ofd, "r");
480                 if (*stdoutf == NULL) {
481                         pr_err("Error opening file stream for fd %d: %m\n",
482                                ofd);
483                         return -1;
484                 }
485         }
486
487         if (stderrf) {
488                 *stderrf = fdopen(efd, "r");
489                 if (*stderrf == NULL) {
490                         pr_err("Error opening file stream for fd %d: %m\n",
491                                efd);
492                         return -1;
493                 }
494         }
495
496         return pid;
497 }
498
499 /*
500  * Forks a child and executes a command to run on parallel
501  */
502
503 #define max(a,b) (a) < (b) ? (b) : (a)
504 #define BUF_SIZE (128*1024)
505 int run(const char *cmd, char *const argv[])
506 {
507         int child, error;
508         int ofd, efd;
509         fd_set rfds;
510         int maxfd;
511         int eof = 0;
512         char stdoutstr[32], stderrstr[32], indent[16] = { "                " };
513
514         indent[get_parent_count() + 1] = 0;
515                 
516         if ((child = do_fork()))
517             return child;
518
519         child = run_piped(cmd, argv, NULL, &ofd, &efd);
520         snprintf(stdoutstr, 32, "%sstdout", green_color);
521         snprintf(stderrstr, 32, "%sstderr", red_color);
522
523         FD_ZERO(&rfds);
524         FD_SET(ofd, &rfds);
525         FD_SET(efd, &rfds);
526
527         while (!eof) {
528                 char *sptr , *eptr;
529                 char rbuf[BUF_SIZE];
530                 int bytes;
531                 char *typestr;
532
533                 maxfd = max(ofd, efd);
534                 error = select(maxfd, &rfds, NULL, NULL, NULL);
535
536                 if (error < 0) {
537                         pr_err("Error with select: %m\n");
538                         break;
539                 }
540
541                 if (FD_ISSET(ofd, &rfds)) {
542                         typestr = stdoutstr;
543                         bytes = read(ofd, rbuf, BUF_SIZE);
544                         goto print;
545                 }
546
547                 if (FD_ISSET(efd, &rfds)) {
548                         typestr = stderrstr;
549                         bytes = read(efd, rbuf, BUF_SIZE);
550                         goto print;
551                 }
552
553                 pr_err("select() returned unknown fd\n");
554                 break;
555
556 print:
557                 if (bytes < 0) {
558                         pr_err("read() failed: %m\n");
559                         break;
560                 }
561
562                 /*
563                  * Workaround: When a process had die and it has only
564                  * written to stderr, select() doesn't indicate that
565                  * there might be something to read in stderr fd. To
566                  * work around this issue, we try to read stderr just
567                  * in case in order to ensure everything gets read.
568                  */
569                 if (bytes == 0) {
570                         bytes = read(efd, rbuf, BUF_SIZE);
571                         typestr = stderrstr;
572                         eof = 1;
573                 }
574
575                 sptr = eptr = rbuf;
576                 while(bytes--) {
577                         if (*eptr == '\n') {
578                                 *eptr = 0;
579                                 fprintf(stderr, "%s[%5d %s] %s: %s%s\n", indent,
580                                        child, cmd, typestr, sptr, normal_color);
581                                 sptr = eptr;
582                         }
583                         eptr++;
584                 }
585         }
586
587         close(ofd);
588         close(efd);
589
590         harvest_zombies(child); 
591
592         exit(1);
593         return 0;
594 }