]> git.itanic.dy.fi Git - rrdd/blob - process.c
process: Add debug wrappers for pthread mutex operations
[rrdd] / process.c
1 #include <unistd.h>
2 #include <fcntl.h>
3 #include <sys/select.h>
4 #include <sys/epoll.h>
5 #include <stdio.h>
6 #include <sys/wait.h>
7 #include <sys/signalfd.h>
8 #include <sys/resource.h>
9
10 #include "process.h"
11 #include "debug.h"
12
13 static int child_count;
14 static int parent_count;
15 static int job_request_fd[2];
16 static int job_get_permission_fd[2];
17 static int epoll_fd;
18 static unsigned int max_jobs;
19 static unsigned int job_count;
20 static unsigned int jobs_pending;
21 static unsigned int max_jobs_pending;
22
23 int get_child_count(void)
24 {
25         return child_count;
26 }
27
28 int get_parent_count(void)
29 {
30         return parent_count;
31 }
32
33 static int handle_signals(struct event_handler *h)
34 {
35         struct signalfd_siginfo siginfo;
36         int ret;
37
38         ret = read(h->fd, &siginfo, sizeof(siginfo));
39         if (ret < sizeof(siginfo)) {
40                 pr_err("Expected %zd from read, got %d: %m\n",
41                         sizeof(siginfo), ret);
42                 return -1;
43         }
44
45         if (siginfo.ssi_signo != SIGCHLD) {
46                 pr_err("Unexpected signal %d, ignoring\n", siginfo.ssi_signo);
47                 return -1;
48         }
49
50         harvest_zombies(siginfo.ssi_pid);
51
52         return 0;
53 }
54
55 static int grant_new_job(void)
56 {
57         int ret;
58         char byte = 0;
59
60         job_count++;
61         pr_info("Granting new job. %d jobs currently and %d pending\n",
62                 job_count, jobs_pending);
63
64         ret = write(job_get_permission_fd[1], &byte, 1);
65         if (ret != 1) {
66                 pr_err("Failed to write 1 byte: %m\n");
67                 return -1;
68         }
69
70         return 0;
71 }
72
73 static int deny_job(void)
74 {
75         int ret;
76         char byte = -1;
77
78         pr_info("Denying new job. %d jobs currently and %d pending, "
79                 "limit of pending jobs is %d\n",
80                 job_count, jobs_pending, max_jobs_pending);
81
82         ret = write(job_get_permission_fd[1], &byte, 1);
83         if (ret != 1) {
84                 pr_err("Failed to write 1 byte: %m\n");
85                 return -1;
86         }
87
88         return 0;
89 }
90
91 static int handle_job_request(struct event_handler *h)
92 {
93         int ret, pid;
94
95         ret = read(job_request_fd[0], &pid, sizeof(pid));
96         if (ret < 0) {
97                 pr_err("Failed to read: %m\n");
98                 return -1;
99         }
100
101         if (ret == 0) {
102                 pr_info("Read zero bytes\n");
103                 return 0;
104         }
105
106         if (pid > 0) {
107                 if (job_count >= max_jobs) {
108                         if (jobs_pending < max_jobs_pending)
109                                 jobs_pending++;
110                         else
111                                 deny_job();
112                 } else {
113                         ret = grant_new_job();
114                         return 0;
115                 }
116         } else if (pid < 0) {
117                 if (job_count > max_jobs)
118                         pr_err("BUG: Job %u jobs exceeded limit %u\n",
119                                 job_count, max_jobs);
120
121                 pr_info("Job %d finished\n", -pid);
122                 job_count--;
123                 if (jobs_pending) {
124                         jobs_pending--;
125                         ret = grant_new_job();
126                         return 0;
127                 }
128         }
129
130         return 0;
131 }
132
133 struct event_handler signal_handler = {
134         .handle_event = handle_signals,
135         .events = EPOLLIN,
136         .name = "signal",
137 };
138
139 struct event_handler job_request_handler = {
140         .handle_event = handle_job_request,
141         .events = EPOLLIN,
142         .name = "job_request",
143 };
144
145 /*
146  * Initialize the jobcontrol.
147  *
148  * Create the pipes that are used to grant children execution
149  * permissions. If max_jobs is zero, count the number of CPUs from
150  * /proc/cpuinfo and use that.
151  */
152 int init_jobcontrol(int max_jobs_requested)
153 {
154         FILE *file;
155         int ret;
156         sigset_t sigmask;
157         char buf[256];
158         char match[8];
159
160         if (pipe2(job_request_fd, O_NONBLOCK | O_CLOEXEC)) {
161                 pr_err("Failed to create pipe: %m\n");
162                 return -1;
163         }
164
165         if (pipe2(job_get_permission_fd, O_CLOEXEC)) {
166                 pr_err("Failed to create pipe: %m\n");
167                 return -1;
168         }
169
170         epoll_fd = epoll_create(1);
171         if (epoll_fd == -1) {
172                 pr_err("Failed to epoll_create(): %m\n");
173                 return -1;
174         }
175
176         job_request_handler.fd = job_request_fd[0];
177         register_event_handler(&job_request_handler);
178
179         sigemptyset(&sigmask);
180         sigaddset(&sigmask, SIGCHLD);
181
182         /* Block SIGCHLD so that it becomes readable via signalfd */
183         ret = sigprocmask(SIG_BLOCK, &sigmask, NULL);
184         if (ret < 0) {
185                 pr_err("Failed to sigprocmask: %m\n");
186         }
187
188         signal_handler.fd = signalfd(-1, &sigmask, SFD_CLOEXEC);
189         if (job_request_handler.fd < 0) {
190                 pr_err("Failed to create signal_fd: %m\n");
191                 return -1;
192         }
193
194         register_event_handler(&signal_handler);
195
196         if (max_jobs_requested > 0) {
197                 max_jobs = max_jobs_requested;
198                 goto no_count_cpus;
199         }
200         max_jobs++;
201
202         file = fopen("/proc/cpuinfo", "ro");
203         if (!file) {
204                 pr_err("Failed to open /proc/cpuinfo: %m\n");
205                 goto open_fail;
206         }
207
208         /*
209          * The CPU count algorithm simply reads the first 8 bytes from
210          * the /proc/cpuinfo and then expects that line to be there as
211          * many times as there are CPUs.
212          */
213         ret = fread(match, 1, sizeof(match), file);
214         if (ret < sizeof(match)) {
215                 pr_err("read %d bytes when expecting %zd %m\n",
216                         ret, sizeof(match));
217                 goto read_fail;
218         }
219
220         while(fgets(buf, sizeof(buf), file)) {
221                 if (!strncmp(buf, match, sizeof(match)))
222                         max_jobs++;
223         }
224
225 open_fail:
226 read_fail:
227         fclose(file);
228
229 no_count_cpus:
230         pr_info("Set maximum number of parallel jobs to %d\n", max_jobs);
231
232         max_jobs_pending = max_jobs * 10 + 25;
233         pr_info("Set maximum number of pending jobs to %d\n", max_jobs_pending);
234
235         return 0;
236 }
237
238 int poll_job_requests(int timeout)
239 {
240         struct epoll_event event;
241         struct event_handler *job_handler;
242         int ret;
243
244         /* Convert positive seconds to milliseconds */
245         timeout = timeout > 0 ? 1000 * timeout : timeout;
246
247         ret = epoll_wait(epoll_fd, &event, 1, timeout);
248
249         if (ret == -1) {
250                 if (errno != EINTR) {
251                         pr_err("epoll_wait: %m\n");
252                         return -1;
253                 }
254
255                 /*
256                  * If epoll_wait() was interrupted, better start
257                  * everything again from the beginning
258                  */
259                 return 0;
260         }
261
262         if (ret == 0) {
263                 pr_info("Timed out\n");
264                 goto out;
265         }
266
267         job_handler = event.data.ptr;
268
269         if (!job_handler || !job_handler->handle_event) {
270                 pr_err("Corrupted event handler for fd %d\n",
271                         event.data.fd);
272                 goto out;
273         }
274
275         pr_debug("Running handler %s to handle events from fd %d\n",
276                 job_handler->name, job_handler->fd);
277         job_handler->handle_event(job_handler);
278
279 out:
280         pr_info("Jobs active: %u, pending: %u\n", job_count, jobs_pending);
281         return ret;
282 }
283
284 /*
285  * Per process flag indicating whether this child has requested fork
286  * limiting. If it has, it must also tell the master parent when it
287  * has died so that the parent can give next pending job permission to
288  * go.
289  */
290 static int is_limited_fork;
291
292 int do_fork(void)
293 {
294         int child;
295         child = fork();
296         if (child < 0) {
297                 pr_err("fork() failed: %m\n");
298                 return -1;
299         }
300
301         if (child) {
302                 child_count++;
303                 pr_debug("Fork %d, child %d\n", child_count, child);
304                 return child;
305         }
306
307         /*
308          * Also do not notify the master parent the death of this
309          * child. Only childs that have been created with
310          * do_fork_limited() can have this flag set.
311          */
312         is_limited_fork = 0;
313
314         /*
315          * Close unused ends of the job control pipes. Only the parent
316          * which controls the jobs may have the write end open of the
317          * job_get_permission_fd and the read end of the
318          * job_request_fd. Failing to close the pipe ends properly
319          * will cause the childs to wait forever for the run
320          * permission in case parent dies prematurely.
321          *
322          * Note! The file descriptor must be closed once and only
323          * once. They are marked to -1 to make it impossible for
324          * subseqent do_fork() calls from closing them again (in which
325          * case some other file descriptor might already be reserved
326          * for the same number) and prevent accidentally closing some
327          * innocent file descriptors that are still in use.
328          */
329         if (job_get_permission_fd[1] >= 0) {
330                 close(job_get_permission_fd[1]);
331                 job_get_permission_fd[1] = -1;
332         }
333         if (job_request_fd[0] >= 0) {
334                 close(job_request_fd[0]);
335                 job_request_fd[0] = -1;
336         }
337
338         /* reset child's child count */
339         child_count = 0;
340         parent_count++;
341         return 0;
342 }
343
344 static int request_fork(int request)
345 {
346         int pid = getpid();
347
348         pid = request > 0 ? pid : -pid;
349
350         return write(job_request_fd[1], &pid, sizeof(pid));
351 }
352
353 static void limited_fork_exit_handler(void)
354 {
355         if (is_limited_fork)
356                 request_fork(-1);
357 }
358
359 /*
360  * Like do_fork(), but allow the child continue only after the global
361  * job count is low enough.
362  *
363  * We allow the parent to continue other more important activities but
364  * child respects the limit of global active processes.
365  */
366 int do_fork_limited(void)
367 {
368         int child, ret;
369         char byte;
370
371         child = do_fork();
372         if (child)
373                 return child;
374
375         /* Remember to notify the parent when we are done */
376         atexit(limited_fork_exit_handler);
377         is_limited_fork = 1;
378
379         pr_debug("Requesting permission to go\n");
380
381         /* Signal the parent that we are here, waiting to go */
382         request_fork(1);
383
384         /*
385          * The parent will tell us when we can continue. If there were
386          * multiple children waiting for their turn to run only one
387          * will be reading the content byte from the pipe and getting
388          * the permission to run.
389          */
390         ret = read(job_get_permission_fd[0], &byte, sizeof(byte));
391         if (ret == 0)
392                 pr_err("Error requesting run, did the parent die?\n");
393
394         if (ret < 0)
395                 pr_err("Job control request failure: %m\n");
396
397         if (byte < 0) {
398                 pr_info("Did not get permission to execute. Terminating\n");
399
400                 /*
401                  * Avoid running exit handler, that would tell the
402                  * parent we died normally and decrement the job
403                  * counters.
404                  */
405                 raise(SIGKILL);
406         }
407
408         pr_debug("Continuing\n");
409         return child;
410 }
411
412 int harvest_zombies(int pid)
413 {
414         int status;
415         struct rusage rusage;
416         char *status_str = NULL;
417         int code = 0;
418
419         if (child_count == 0)
420                 return 0;
421
422         if (pid)
423                 pr_debug("Waiting on pid %d, children left: %d\n", pid,
424                         child_count);
425
426         do {
427                 pid = wait4(pid, &status, 0, &rusage);
428                 if (pid < 0) {
429                         pr_err("Error on waitid(): %m\n");
430                         return 0;
431                 }
432                 /* Wait until the child has become a zombie */
433         } while (!WIFEXITED(status) && !WIFSIGNALED(status));
434
435         child_count--;
436         if (WIFEXITED(status)) {
437                 status_str = "exited with status";
438                 code = WEXITSTATUS(status);
439         } else if (WIFSIGNALED(status)) {
440                 status_str = "killed by signal";
441                 code = WTERMSIG(status);
442         }
443         pr_debug("pid %d: %s %d. Children left: %d\n", pid,
444                 status_str, code, child_count);
445         pr_debug("pid %d: User time: %ld.%03lds, System %ld.%03lds\n", pid,
446                 (long)rusage.ru_utime.tv_sec, rusage.ru_utime.tv_usec / 1000,
447                 (long)rusage.ru_stime.tv_sec, rusage.ru_stime.tv_usec / 1000);
448
449         return 1;
450 }
451
452 /*
453  * Runs a command cmd with params argv, connects stdin and stdout to
454  * readfd and writefd
455  *
456  * Returns the pid of the executed process
457  */
458 int run_piped(const char *cmd, char *const argv[],
459               int *stdinfd, int *stdoutfd, int *stderrfd)
460 {
461         int ifd[2], ofd[2], efd[2], pid;
462
463         pr_info("Running command %s\n", cmd);
464
465         if (stdinfd && pipe(ifd)) {
466                 pr_err("pipe() failed: %m\n");
467                 return -1;
468         }
469
470         if (stdoutfd && pipe(ofd)) {
471                 pr_err("pipe() failed: %m\n");
472                 return -1;
473         }
474
475         if (stderrfd && pipe(efd)) {
476                 pr_err("pipe() failed: %m\n");
477                 return -1;
478         }
479
480         pid = do_fork();
481         if (pid) { /* Parent side */
482                 if (stdinfd) {
483                         close(ifd[0]);
484                         *stdinfd = ifd[0];
485                 }
486
487                 if (stdoutfd) {
488                         close(ofd[1]);
489                         *stdoutfd = ofd[0];
490                 }
491
492                 if (stderrfd) {
493                         close(efd[1]);
494                         *stderrfd = efd[0];
495                 }
496
497                 return pid;
498         }
499
500         if (stdinfd) {
501                 close(ifd[1]);
502                 dup2(ifd[0], STDIN_FILENO);
503         }
504
505         if (stdoutfd) {
506                 close(ofd[0]);
507                 dup2(ofd[1], STDOUT_FILENO);
508         }
509
510         if (stderrfd) {
511                 close(efd[0]);
512                 dup2(efd[1], STDERR_FILENO);
513         }
514
515         /* Now we have redirected standard streams to parent process */
516         execvp(cmd, argv);
517         pr_err("Failed to execv command %s: %m\n", cmd);
518         exit(1);
519
520         return 0;
521 }
522
523 /*
524  * Runs a command cmd with params argv, connects stdin and stdout to
525  * readfd and writefd
526  *
527  * Returns the pid of the executed process
528  */
529 int run_piped_stream(const char *cmd, char *const argv[],
530                      FILE **stdinf, FILE **stdoutf, FILE **stderrf)
531 {
532         int ifd, ofd, efd, pid;
533         int *i, *o, *e;
534
535         if (stdinf)
536                 i = &ifd;
537         else
538                 i = 0;
539         if (stdoutf)
540                 o = &ofd;
541         else
542                 o = 0;
543         if (stderrf)
544                 e = &efd;
545         else
546                 e = 0;
547
548         pid = run_piped(cmd, argv, i, o, e);
549
550         if (stdinf) {
551                 *stdinf = fdopen(ifd, "r");
552                 if (*stdinf == NULL) {
553                         pr_err("Error opening file stream for fd %d: %m\n",
554                                ifd);
555                         return -1;
556                 }
557         }
558
559         if (stdoutf) {
560                 *stdoutf = fdopen(ofd, "r");
561                 if (*stdoutf == NULL) {
562                         pr_err("Error opening file stream for fd %d: %m\n",
563                                ofd);
564                         return -1;
565                 }
566         }
567
568         if (stderrf) {
569                 *stderrf = fdopen(efd, "r");
570                 if (*stderrf == NULL) {
571                         pr_err("Error opening file stream for fd %d: %m\n",
572                                efd);
573                         return -1;
574                 }
575         }
576
577         return pid;
578 }
579
580 /*
581  * Forks a child and executes a command to run on parallel
582  */
583
584 #define max(a,b) (a) < (b) ? (b) : (a)
585 #define BUF_SIZE (128*1024)
586 int run(const char *cmd, char *const argv[])
587 {
588         int child, error;
589         int ofd, efd;
590         fd_set rfds;
591         int maxfd;
592         int eof = 0;
593
594         if ((child = do_fork()))
595             return child;
596
597         child = run_piped(cmd, argv, NULL, &ofd, &efd);
598
599         FD_ZERO(&rfds);
600         FD_SET(ofd, &rfds);
601         FD_SET(efd, &rfds);
602
603         while (!eof) {
604                 char *sptr , *eptr;
605                 char rbuf[BUF_SIZE];
606                 int bytes;
607                 int is_stderr = 0;
608
609                 maxfd = max(ofd, efd);
610                 error = select(maxfd, &rfds, NULL, NULL, NULL);
611
612                 if (error < 0) {
613                         pr_err("Error with select: %m\n");
614                         break;
615                 }
616
617                 if (FD_ISSET(ofd, &rfds)) {
618                         bytes = read(ofd, rbuf, BUF_SIZE);
619                         goto print;
620                 }
621
622                 if (FD_ISSET(efd, &rfds)) {
623                         is_stderr = 1;
624                         bytes = read(efd, rbuf, BUF_SIZE);
625                         goto print;
626                 }
627
628                 pr_err("select() returned unknown fd\n");
629                 break;
630
631 print:
632                 if (bytes < 0) {
633                         pr_err("read() failed: %m\n");
634                         break;
635                 }
636
637                 /*
638                  * Workaround: When a process had die and it has only
639                  * written to stderr, select() doesn't indicate that
640                  * there might be something to read in stderr fd. To
641                  * work around this issue, we try to read stderr just
642                  * in case in order to ensure everything gets read.
643                  */
644                 if (bytes == 0) {
645                         bytes = read(efd, rbuf, BUF_SIZE);
646                         is_stderr = 1;
647                         eof = 1;
648                 }
649
650                 sptr = eptr = rbuf;
651                 while (bytes--) {
652                         if (*eptr == '\n') {
653                                 *eptr = 0;
654                                 if (is_stderr)
655                                         pr_err("%s: stderr: %s\n",
656                                                 cmd, sptr);
657                                 else
658                                         pr_info("%s: stdout: %s\n",
659                                                 cmd, sptr);
660                                 sptr = eptr;
661                         }
662                         eptr++;
663                 }
664         }
665
666         close(ofd);
667         close(efd);
668
669         harvest_zombies(child);
670
671         exit(1);
672         return 0;
673 }
674
675 int register_event_handler(struct event_handler *handler)
676 {
677         struct epoll_event ev;
678         int ret;
679
680         if (handler->fd <= 0) {
681                 pr_err("Invalid file descriptor of %d\n", handler->fd);
682                 return -1;
683         }
684
685         if (!handler->handle_event) {
686                 pr_err("Handler callback missing\n");
687                 return -1;
688         }
689
690         pr_info("Registering handler for %s, fd %d\n",
691                 handler->name, handler->fd);
692
693         ev.data.fd = handler->fd;
694         ev.data.ptr = handler;
695         ev.events = handler->events;
696         ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, handler->fd, &ev);
697         if (ret) {
698                 pr_err("Failed to add epoll_fd: %m\n");
699                 return -1;
700         }
701
702         return 0;
703 }
704
705 void _mutex_lock_acquired(struct mutex *lock, char *file, int line)
706 {
707         lock->line = line;
708         lock->file = file;
709 }
710
711 int _mutex_lock(struct mutex *lock, char *file, int line)
712 {
713         int ret = 0;
714
715         if (!pthread_mutex_trylock(&lock->lock))
716                 goto out_lock;
717
718         pr_info("Lock contention on lock %s on %s:%d\n",
719                 lock->name, lock->file, lock->line);
720
721         ret = pthread_mutex_lock(&lock->lock);
722         if (ret)
723                 pr_err("Acquirin lock %s failed: %m, acquired %s:%d\n",
724                         lock->name, lock->file, lock->line);
725
726 out_lock:
727         _mutex_lock_acquired(lock, file, line);
728         return ret;
729 }
730
731 int _mutex_unlock(struct mutex *lock)
732 {
733         lock->line = 0;
734         lock->file = NULL;
735         pthread_mutex_unlock(&lock->lock);
736
737         return 0;
738 }