]> git.itanic.dy.fi Git - rrdd/blob - process.c
process: request_fork: Ensure function parameter is always signed
[rrdd] / process.c
1 #define _GNU_SOURCE
2 #include <unistd.h>
3 #include <fcntl.h>
4 #include <sys/select.h>
5 #include <sys/epoll.h>
6 #include <stdio.h>
7 #include <sys/wait.h>
8 #include <sys/signalfd.h>
9 #include <sys/resource.h>
10
11 #include "process.h"
12 #include "debug.h"
13
14 static int child_count;
15 static int parent_count;
16 static int job_request_fd[2];
17 static int job_get_permission_fd[2];
18 static int signal_fd;
19 static int epoll_fd;
20 static unsigned int max_jobs;
21 static unsigned int job_count;
22 static unsigned int jobs_pending;
23
24 int get_child_count(void)
25 {
26         return child_count;
27 }
28
29 int get_parent_count(void)
30 {
31         return parent_count;
32 }
33
34 static int handle_signals(void)
35 {
36         struct signalfd_siginfo siginfo;
37         int ret;
38
39         ret = read(signal_fd, &siginfo, sizeof(siginfo));
40         if (ret < sizeof(siginfo)) {
41                 pr_err("Expected %zd from read, got %d: %m\n",
42                         sizeof(siginfo), ret);
43                 return -1;
44         }
45
46         if (siginfo.ssi_signo != SIGCHLD) {
47                 pr_err("Unexpected signal %d, ignoring\n", siginfo.ssi_signo);
48                 return -1;
49         }
50
51         harvest_zombies(siginfo.ssi_pid);
52
53         return 0;
54 }
55
56 /*
57  * Initialize the jobcontrol.
58  *
59  * Create the pipes that are used to grant children execution
60  * permissions. If max_jobs is zero, count the number of CPUs from
61  * /proc/cpuinfo and use that.
62  */
63 int init_jobcontrol(int max_jobs_requested)
64 {
65         struct epoll_event ev;
66         FILE *file;
67         int ret;
68         sigset_t sigmask;
69         char buf[256];
70         char match[8];
71
72         if (pipe2(job_request_fd, O_NONBLOCK | O_CLOEXEC)) {
73                 pr_err("Failed to create pipe: %m\n");
74                 return -1;
75         }
76
77         if (pipe2(job_get_permission_fd, O_CLOEXEC)) {
78                 pr_err("Failed to create pipe: %m\n");
79                 return -1;
80         }
81
82         epoll_fd = epoll_create(1);
83         if (epoll_fd == -1) {
84                 pr_err("Failed to epoll_create(): %m\n");
85                 return -1;
86         }
87
88         ev.events = EPOLLIN;
89         ev.data.fd = job_request_fd[0];
90         ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, job_request_fd[0], &ev);
91         if (ret) {
92                 pr_err("Failed to add epoll_fd: %m\n");
93                 return -1;
94         }
95
96         sigemptyset(&sigmask);
97         sigaddset(&sigmask, SIGCHLD);
98
99         /* Block SIGCHLD so that it becomes readable via signalfd */
100         ret = sigprocmask(SIG_BLOCK, &sigmask, NULL);
101         if (ret < 0) {
102                 pr_err("Failed to sigprocmask: %m\n");
103         }
104
105         signal_fd = signalfd(-1, &sigmask, SFD_CLOEXEC);
106         if (signal_fd < 0) {
107                 pr_err("Failed to create signal_fd: %m\n");
108                 return -1;
109         }
110
111         ev.data.fd = signal_fd;
112         ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, signal_fd, &ev);
113         if (ret) {
114                 pr_err("Failed to add epoll_fd: %m\n");
115                 return -1;
116         }
117
118
119         if (max_jobs_requested > 0) {
120                 max_jobs = max_jobs_requested;
121                 goto no_count_cpus;
122         }
123         max_jobs++;
124
125         file = fopen("/proc/cpuinfo", "ro");
126         if (!file) {
127                 pr_err("Failed to open /proc/cpuinfo: %m\n");
128                 goto open_fail;
129         }
130
131         /*
132          * The CPU count algorithm simply reads the first 8 bytes from
133          * the /proc/cpuinfo and then expects that line to be there as
134          * many times as there are CPUs.
135          */
136         ret = fread(match, 1, sizeof(match), file);
137         if (ret < sizeof(match)) {
138                 pr_err("read %d bytes when expecting %zd %m\n",
139                         ret, sizeof(match));
140                 goto read_fail;
141         }
142
143         while(fgets(buf, sizeof(buf), file)) {
144                 if (!strncmp(buf, match, sizeof(match)))
145                         max_jobs++;
146         }
147
148 open_fail:
149 read_fail:
150         fclose(file);
151
152 no_count_cpus:
153         pr_info("Set maximum number of parallel jobs to %d\n", max_jobs);
154
155         return 0;
156 }
157
158 static int grant_new_job(void)
159 {
160         int ret;
161         char byte = 0;
162
163         job_count++;
164         pr_info("Granting new job. %d jobs currently and %d pending\n",
165                 job_count, jobs_pending);
166
167         ret = write(job_get_permission_fd[1], &byte, 1);
168         if (ret != 1) {
169                 pr_err("Failed to write 1 byte: %m\n");
170                 return -1;
171         }
172
173         return 0;
174 }
175
176 static int handle_job_request(void)
177 {
178         int ret, pid;
179
180         ret = read(job_request_fd[0], &pid, sizeof(pid));
181         if (ret < 0) {
182                 pr_err("Failed to read: %m\n");
183                 return -1;
184         }
185
186         if (ret == 0) {
187                 pr_info("Read zero bytes\n");
188                 return 0;
189         }
190
191         if (pid > 0) {
192                 if (job_count >= max_jobs) {
193                         jobs_pending++;
194                 } else {
195                         ret = grant_new_job();
196                         return 0;
197                 }
198         } else if (pid < 0) {
199                 if (job_count > max_jobs)
200                         pr_err("BUG: Job %u jobs exceeded limit %u\n",
201                                 job_count, max_jobs);
202
203                 pr_info("Job %d finished\n", -pid);
204                 job_count--;
205                 if (jobs_pending) {
206                         jobs_pending--;
207                         ret = grant_new_job();
208                         return 0;
209                 }
210         }
211
212         return 0;
213 }
214
215 int poll_job_requests(int timeout)
216 {
217         struct epoll_event event;
218         int ret;
219
220         /* Convert positive seconds to milliseconds */
221         timeout = timeout > 0 ? 1000 * timeout : timeout;
222
223         ret = epoll_wait(epoll_fd, &event, 1, timeout);
224
225         if (ret == -1) {
226                 if (errno != EINTR) {
227                         pr_err("epoll_wait: %m\n");
228                         return -1;
229                 }
230
231                 /*
232                  * If epoll_wait() was interrupted, better start
233                  * everything again from the beginning
234                  */
235                 return 0;
236         }
237
238         if (ret == 0) {
239                 pr_info("Timed out\n");
240                 goto out;
241         }
242
243         if (event.data.fd == job_request_fd[0])
244                 handle_job_request();
245         else if (event.data.fd == signal_fd)
246                 handle_signals();
247         else
248                 pr_err("Unknown fd: %d\n", event.data.fd);
249
250 out:
251         pr_info("Jobs active: %u, pending: %u\n", job_count, jobs_pending);
252         return ret;
253 }
254
255 /*
256  * Per process flag indicating whether this child has requested fork
257  * limiting. If it has, it must also tell the master parent when it
258  * has died so that the parent can give next pending job permission to
259  * go.
260  */
261 static int is_limited_fork;
262
263 int do_fork(void)
264 {
265         int child;
266         child = fork();
267         if (child < 0) {
268                 pr_err("fork() failed: %m\n");
269                 return -1;
270         }
271
272         if (child) {
273                 child_count++;
274                 pr_info("Fork %d, child %d\n", child_count, child);
275                 return child;
276         }
277
278         /*
279          * Also do not notify the master parent the death of this
280          * child. Only childs that have been created with
281          * do_fork_limited() can have this flag set.
282          */
283         is_limited_fork = 0;
284
285         /*
286          * Close unused ends of the job control pipes. Only the parent
287          * which controls the jobs may have the write end open of the
288          * job_get_permission_fd and the read end of the
289          * job_request_fd. Failing to close the pipe ends properly
290          * will cause the childs to wait forever for the run
291          * permission in case parent dies prematurely.
292          *
293          * Note! The file descriptor must be closed once and only
294          * once. They are marked to -1 to make it impossible for
295          * subseqent do_fork() calls from closing them again (in which
296          * case some other file descriptor might already be reserved
297          * for the same number) and prevent accidentally closing some
298          * innocent file descriptors that are still in use.
299          */
300         if (job_get_permission_fd[1] >= 0) {
301                 close(job_get_permission_fd[1]);
302                 job_get_permission_fd[1] = -1;
303         }
304         if (job_request_fd[0] >= 0) {
305                 close(job_request_fd[0]);
306                 job_request_fd[0] = -1;
307         }
308
309         /* reset child's child count */
310         child_count = 0;
311         parent_count++;
312         return 0;
313 }
314
315 static int request_fork(int request)
316 {
317         int pid = getpid();
318
319         pid = request > 0 ? pid : -pid;
320
321         return write(job_request_fd[1], &pid, sizeof(pid));
322 }
323
324 static void limited_fork_exit_handler(void)
325 {
326         if (is_limited_fork)
327                 request_fork(-1);
328 }
329
330 /*
331  * Like do_fork(), but allow the child continue only after the global
332  * job count is low enough.
333  *
334  * We allow the parent to continue other more important activities but
335  * child respects the limit of global active processes.
336  */
337 int do_fork_limited(void)
338 {
339         int child, ret;
340         char byte;
341
342         child = do_fork();
343         if (child)
344                 return child;
345
346         /* Remember to notify the parent when we are done */
347         atexit(limited_fork_exit_handler);
348         is_limited_fork = 1;
349
350         pr_info("Requesting permission to go\n");
351
352         /* Signal the parent that we are here, waiting to go */
353         request_fork(1);
354
355         /*
356          * The parent will tell us when we can continue. If there were
357          * multiple children waiting for their turn to run only one
358          * will be reading the content byte from the pipe and getting
359          * the permission to run.
360          */
361         ret = read(job_get_permission_fd[0], &byte, sizeof(byte));
362         if (ret == 0)
363                 pr_err("Error requesting run, did the parent die?\n");
364
365         if (ret < 0)
366                 pr_err("Job control request failure: %m\n");
367
368         pr_info("Continuing\n");
369         return child;
370 }
371
372 int harvest_zombies(int pid)
373 {
374         int status;
375         struct rusage rusage;
376         char *status_str = NULL;
377         int code = 0;
378
379         if (child_count == 0)
380                 return 0;
381
382         if (pid)
383                 pr_info("Waiting on pid %d, children left: %d\n", pid, 
384                         child_count);
385
386         do {
387                 pid = wait4(pid, &status, 0, &rusage);
388                 if (pid < 0) {
389                         pr_err("Error on waitid(): %m\n");
390                         return 0;
391                 }
392                 /* Wait until the child has become a zombie */
393         } while (!WIFEXITED(status) && !WIFSIGNALED(status));
394
395         child_count--;
396         if (WIFEXITED(status)) {
397                 status_str = "exited with status";
398                 code = WEXITSTATUS(status);
399         } else if (WIFSIGNALED(status)) {
400                 status_str = "killed by signal";
401                 code = WTERMSIG(status);
402         }
403         pr_info("pid %d: %s %d. Children left: %d\n", pid,
404                 status_str, code, child_count);
405         pr_info("pid %d: User time: %ld.%03lds, System %ld.%03lds\n", pid,
406                 (long)rusage.ru_utime.tv_sec, rusage.ru_utime.tv_usec / 1000,
407                 (long)rusage.ru_stime.tv_sec, rusage.ru_stime.tv_usec / 1000);
408
409         return 1;
410 }
411
412 /*
413  * Runs a command cmd with params argv, connects stdin and stdout to
414  * readfd and writefd
415  *
416  * Returns the pid of the executed process
417  */
418 int run_piped(const char *cmd, char *const argv[],
419               int *stdinfd, int *stdoutfd, int *stderrfd)
420 {
421         int ifd[2], ofd[2], efd[2], pid;
422
423         pr_info("Running command %s\n", cmd);
424
425         if (stdinfd && pipe(ifd)) {
426                 pr_err("pipe() failed: %m\n");
427                 return -1;
428         }
429
430         if (stdoutfd && pipe(ofd)) {
431                 pr_err("pipe() failed: %m\n");
432                 return -1;
433         }
434
435         if (stderrfd && pipe(efd)) {
436                 pr_err("pipe() failed: %m\n");
437                 return -1;
438         }
439
440         pid = do_fork();
441         if (pid) { /* Parent side */
442                 if (stdinfd) {
443                         close(ifd[0]);
444                         *stdinfd = ifd[0];
445                 }
446
447                 if (stdoutfd) {
448                         close(ofd[1]);
449                         *stdoutfd = ofd[0];
450                 }
451
452                 if (stderrfd) {
453                         close(efd[1]);
454                         *stderrfd = efd[0];
455                 }
456
457                 return pid;
458         }
459
460         if (stdinfd) {
461                 close(ifd[1]);
462                 dup2(ifd[0], STDIN_FILENO);
463         }
464
465         if (stdoutfd) {
466                 close(ofd[0]);
467                 dup2(ofd[1], STDOUT_FILENO);
468         }
469
470         if (stderrfd) {
471                 close(efd[0]);
472                 dup2(efd[1], STDERR_FILENO);
473         }
474
475         /* Now we have redirected standard streams to parent process */
476         execvp(cmd, argv);
477         pr_err("Failed to execv command %s: %m\n", cmd);
478         exit(1);
479
480         return 0;
481 }
482
483 /*
484  * Runs a command cmd with params argv, connects stdin and stdout to
485  * readfd and writefd
486  *
487  * Returns the pid of the executed process
488  */
489 int run_piped_stream(const char *cmd, char *const argv[],
490                      FILE **stdinf, FILE **stdoutf, FILE **stderrf)
491 {
492         int ifd, ofd, efd, pid;
493         int *i, *o, *e;
494
495         if (stdinf)
496                 i = &ifd;
497         else 
498                 i = 0;
499         if (stdoutf)
500                 o = &ofd;
501         else 
502                 o = 0;
503         if (stderrf)
504                 e = &efd;
505         else 
506                 e = 0;
507
508         pid = run_piped(cmd, argv, i, o, e);
509
510         if (stdinf) {
511                 *stdinf = fdopen(ifd, "r");
512                 if (*stdinf == NULL) {
513                         pr_err("Error opening file stream for fd %d: %m\n",
514                                ifd);
515                         return -1;
516                 }
517         }
518
519         if (stdoutf) {
520                 *stdoutf = fdopen(ofd, "r");
521                 if (*stdoutf == NULL) {
522                         pr_err("Error opening file stream for fd %d: %m\n",
523                                ofd);
524                         return -1;
525                 }
526         }
527
528         if (stderrf) {
529                 *stderrf = fdopen(efd, "r");
530                 if (*stderrf == NULL) {
531                         pr_err("Error opening file stream for fd %d: %m\n",
532                                efd);
533                         return -1;
534                 }
535         }
536
537         return pid;
538 }
539
540 /*
541  * Forks a child and executes a command to run on parallel
542  */
543
544 #define max(a,b) (a) < (b) ? (b) : (a)
545 #define BUF_SIZE (128*1024)
546 int run(const char *cmd, char *const argv[])
547 {
548         int child, error;
549         int ofd, efd;
550         fd_set rfds;
551         int maxfd;
552         int eof = 0;
553
554         if ((child = do_fork()))
555             return child;
556
557         child = run_piped(cmd, argv, NULL, &ofd, &efd);
558
559         FD_ZERO(&rfds);
560         FD_SET(ofd, &rfds);
561         FD_SET(efd, &rfds);
562
563         while (!eof) {
564                 char *sptr , *eptr;
565                 char rbuf[BUF_SIZE];
566                 int bytes;
567                 int is_stderr = 0;
568
569                 maxfd = max(ofd, efd);
570                 error = select(maxfd, &rfds, NULL, NULL, NULL);
571
572                 if (error < 0) {
573                         pr_err("Error with select: %m\n");
574                         break;
575                 }
576
577                 if (FD_ISSET(ofd, &rfds)) {
578                         bytes = read(ofd, rbuf, BUF_SIZE);
579                         goto print;
580                 }
581
582                 if (FD_ISSET(efd, &rfds)) {
583                         is_stderr = 1;
584                         bytes = read(efd, rbuf, BUF_SIZE);
585                         goto print;
586                 }
587
588                 pr_err("select() returned unknown fd\n");
589                 break;
590
591 print:
592                 if (bytes < 0) {
593                         pr_err("read() failed: %m\n");
594                         break;
595                 }
596
597                 /*
598                  * Workaround: When a process had die and it has only
599                  * written to stderr, select() doesn't indicate that
600                  * there might be something to read in stderr fd. To
601                  * work around this issue, we try to read stderr just
602                  * in case in order to ensure everything gets read.
603                  */
604                 if (bytes == 0) {
605                         bytes = read(efd, rbuf, BUF_SIZE);
606                         is_stderr = 1;
607                         eof = 1;
608                 }
609
610                 sptr = eptr = rbuf;
611                 while(bytes--) {
612                         if (*eptr == '\n') {
613                                 *eptr = 0;
614                                 if (is_stderr)
615                                         pr_err("%s: stderr: %s\n",
616                                                 cmd, sptr);
617                                 else
618                                         pr_info("%s: stdout: %s\n",
619                                                 cmd, sptr);
620                                 sptr = eptr;
621                         }
622                         eptr++;
623                 }
624         }
625
626         close(ofd);
627         close(efd);
628
629         harvest_zombies(child); 
630
631         exit(1);
632         return 0;
633 }