]> git.itanic.dy.fi Git - rrdd/blob - process.c
19d8ded186e9c7976761ef9b8da63a9ebc7a8b15
[rrdd] / process.c
1 #include <unistd.h>
2 #include <fcntl.h>
3 #include <sys/select.h>
4 #include <sys/epoll.h>
5 #include <stdio.h>
6 #include <sys/wait.h>
7 #include <sys/signalfd.h>
8 #include <sys/resource.h>
9
10 #include "process.h"
11 #include "debug.h"
12
13 static int child_count;
14 static int parent_count;
15 static int job_request_fd[2];
16 static int job_get_permission_fd[2];
17 static int epoll_fd;
18 static unsigned int max_jobs;
19 static unsigned int job_count;
20 static unsigned int jobs_pending;
21 static unsigned int max_jobs_pending;
22
23 struct work_struct {
24         const char *name;
25         int (*work_fn)(void *);
26         void *arg;
27         struct work_struct *next;
28 };
29
30 struct work_queue {
31         struct work_struct *work;
32         int length;
33         char *name;
34         struct mutex lock;
35 };
36
37 struct work_queue work_queues[WORK_PRIORITIES_NUM] = {
38         {
39                 .name = "high priority",
40                 .lock = {
41                         .name = "high_prio_queue",
42                         .lock = PTHREAD_MUTEX_INITIALIZER,
43                 },
44         },
45         {
46                 .name = "low priority",
47                 .lock = {
48                         .name = "low_prio_queue",
49                         .lock = PTHREAD_MUTEX_INITIALIZER,
50                 },
51         },
52 };
53
54 struct mutex work_pending_mutex = {
55         .name = "work_pending",
56         .lock = PTHREAD_MUTEX_INITIALIZER,
57 };
58 pthread_cond_t work_pending_cond = PTHREAD_COND_INITIALIZER;
59
60 static int run_work_on_queue(struct work_queue *queue)
61 {
62         struct work_struct *work;
63
64         mutex_lock(&queue->lock);
65
66         if (!queue->work) {
67                 mutex_unlock(&queue->lock);
68                 return 0;
69         }
70
71         /* Take next work */
72         work = queue->work;
73         queue->work = work->next;
74         queue->length--;
75
76         /*
77          * If queue is not empty, try waking up more workers. It is
78          * possible that when work were queued, the first worker did
79          * not wake up soon enough and
80          */
81         if (queue->length > 0)
82                 pthread_cond_signal(&work_pending_cond);
83
84         mutex_unlock(&queue->lock);
85
86         pr_info("Executing work %s from queue %s, %d still pending\n",
87                 work->name, queue->name, queue->length);
88
89         work->work_fn(work->arg);
90         pr_info("Work %s done\n", work->name);
91         free(work);
92
93         return 1;
94 }
95
96 static void *worker_thread(void *arg)
97 {
98         int ret;
99
100         char name[16];
101
102         snprintf(name, sizeof(name), "worker%ld", (long)arg);
103         pthread_setname_np(pthread_self(), name);
104
105         while (1) {
106                 while (1) {
107                         int prio, work_done = 0;
108
109                         /*
110                          * Execute as many works from the queues as
111                          * there are, starting from highest priority
112                          * queue
113                          */
114                         for (prio = 0; prio < WORK_PRIORITIES_NUM; prio++) {
115                                 work_done =
116                                         run_work_on_queue(&work_queues[prio]);
117                                 if (work_done)
118                                         break;
119                         }
120
121                         if (!work_done)
122                                 break;
123                 }
124
125                 pr_info("Worker going to sleep\n");
126                 ret = pthread_cond_wait(&work_pending_cond,
127                                         &work_pending_mutex.lock);
128                 if (ret < 0)
129                         pr_err("Error: %m\n");
130
131                 mutex_lock_acquired(&work_pending_mutex);
132
133                 mutex_unlock(&work_pending_mutex);
134
135         }
136
137         return NULL;
138 }
139
140 int queue_work(unsigned int priority, char *name,
141         int (work_fn)(void *arg), void *arg)
142 {
143         struct work_queue *queue;
144         struct work_struct *work, *last_work;
145
146         if (priority >= WORK_PRIORITIES_NUM) {
147                 pr_err("Invalid priority: %d\n", priority);
148                 return -EINVAL;
149         }
150
151         work = calloc(sizeof(*work), 1);
152
153         work->name = name;
154         work->work_fn = work_fn;
155         work->arg = arg;
156
157         queue = &work_queues[priority];
158
159         /* Insert new work at the end of the work queue */
160         mutex_lock(&queue->lock);
161
162         last_work = queue->work;
163         while (last_work && last_work->next)
164                 last_work = last_work->next;
165
166         if (!last_work)
167                 queue->work = work;
168         else
169                 last_work->next = work;
170
171         pr_info("Inserted work %s in queue %s, with %d pending items\n",
172                 work->name, queue->name, queue->length);
173         queue->length++;
174         mutex_unlock(&queue->lock);
175
176         pthread_cond_signal(&work_pending_cond);
177
178         return 0;
179 }
180
181 int get_child_count(void)
182 {
183         return child_count;
184 }
185
186 int get_parent_count(void)
187 {
188         return parent_count;
189 }
190
191 static int grant_new_job(void)
192 {
193         int ret;
194         char byte = 0;
195
196         job_count++;
197         pr_info("Granting new job. %d jobs currently and %d pending\n",
198                 job_count, jobs_pending);
199
200         ret = write(job_get_permission_fd[1], &byte, 1);
201         if (ret != 1) {
202                 pr_err("Failed to write 1 byte: %m\n");
203                 return -1;
204         }
205
206         return 0;
207 }
208
209 static int deny_job(void)
210 {
211         int ret;
212         char byte = -1;
213
214         pr_info("Denying new job. %d jobs currently and %d pending, "
215                 "limit of pending jobs is %d\n",
216                 job_count, jobs_pending, max_jobs_pending);
217
218         ret = write(job_get_permission_fd[1], &byte, 1);
219         if (ret != 1) {
220                 pr_err("Failed to write 1 byte: %m\n");
221                 return -1;
222         }
223
224         return 0;
225 }
226
227 static int handle_job_request(struct event_handler *h)
228 {
229         int ret, pid;
230
231         ret = read(job_request_fd[0], &pid, sizeof(pid));
232         if (ret < 0) {
233                 pr_err("Failed to read: %m\n");
234                 return -1;
235         }
236
237         if (ret == 0) {
238                 pr_info("Read zero bytes\n");
239                 return 0;
240         }
241
242         if (pid > 0) {
243                 if (job_count >= max_jobs) {
244                         if (jobs_pending < max_jobs_pending)
245                                 jobs_pending++;
246                         else
247                                 deny_job();
248                 } else {
249                         ret = grant_new_job();
250                         return 0;
251                 }
252         } else if (pid < 0) {
253                 if (job_count > max_jobs)
254                         pr_err("BUG: Job %u jobs exceeded limit %u\n",
255                                 job_count, max_jobs);
256
257                 pr_info("Job %d finished\n", -pid);
258                 job_count--;
259                 if (jobs_pending) {
260                         jobs_pending--;
261                         ret = grant_new_job();
262                         return 0;
263                 }
264         }
265
266         return 0;
267 }
268
269 struct event_handler job_request_handler = {
270         .handle_event = handle_job_request,
271         .events = EPOLLIN,
272         .name = "job_request",
273 };
274
275 /*
276  * Initialize the jobcontrol.
277  *
278  * Create the pipes that are used to grant children execution
279  * permissions. If max_jobs is zero, count the number of CPUs from
280  * /proc/cpuinfo and use that.
281  */
282 int init_jobcontrol(int max_jobs_requested)
283 {
284         FILE *file;
285         int ret;
286         sigset_t sigmask;
287         char buf[256];
288         char match[8];
289         pthread_t *thread;
290         long int i;
291
292         if (pipe2(job_request_fd, O_NONBLOCK | O_CLOEXEC)) {
293                 pr_err("Failed to create pipe: %m\n");
294                 return -1;
295         }
296
297         if (pipe2(job_get_permission_fd, O_CLOEXEC)) {
298                 pr_err("Failed to create pipe: %m\n");
299                 return -1;
300         }
301
302         epoll_fd = epoll_create(1);
303         if (epoll_fd == -1) {
304                 pr_err("Failed to epoll_create(): %m\n");
305                 return -1;
306         }
307
308         job_request_handler.fd = job_request_fd[0];
309         register_event_handler(&job_request_handler);
310
311         sigemptyset(&sigmask);
312         sigaddset(&sigmask, SIGCHLD);
313
314         if (max_jobs_requested > 0) {
315                 max_jobs = max_jobs_requested;
316                 goto no_count_cpus;
317         }
318         max_jobs++;
319
320         file = fopen("/proc/cpuinfo", "ro");
321         if (!file) {
322                 pr_err("Failed to open /proc/cpuinfo: %m\n");
323                 goto open_fail;
324         }
325
326         /*
327          * The CPU count algorithm simply reads the first 8 bytes from
328          * the /proc/cpuinfo and then expects that line to be there as
329          * many times as there are CPUs.
330          */
331         ret = fread(match, 1, sizeof(match), file);
332         if (ret < sizeof(match)) {
333                 pr_err("read %d bytes when expecting %zd %m\n",
334                         ret, sizeof(match));
335                 goto read_fail;
336         }
337
338         while(fgets(buf, sizeof(buf), file)) {
339                 if (!strncmp(buf, match, sizeof(match)))
340                         max_jobs++;
341         }
342
343 open_fail:
344 read_fail:
345         fclose(file);
346
347 no_count_cpus:
348         pr_info("Set maximum number of parallel jobs to %d\n", max_jobs);
349
350         max_jobs_pending = max_jobs * 10 + 25;
351         pr_info("Set maximum number of pending jobs to %d\n", max_jobs_pending);
352
353         /* Create worker threads */
354         thread = calloc(sizeof(*thread), max_jobs);
355         for (i = 0; i < max_jobs; i++)
356                 pthread_create(&thread[i], NULL, worker_thread, (void *)i);
357
358         return 0;
359 }
360
361 int poll_job_requests(int timeout)
362 {
363         struct epoll_event event;
364         struct event_handler *job_handler;
365         int ret;
366
367         /* Convert positive seconds to milliseconds */
368         timeout = timeout > 0 ? 1000 * timeout : timeout;
369
370         ret = epoll_wait(epoll_fd, &event, 1, timeout);
371
372         if (ret == -1) {
373                 if (errno != EINTR) {
374                         pr_err("epoll_wait: %m\n");
375                         return -1;
376                 }
377
378                 /*
379                  * If epoll_wait() was interrupted, better start
380                  * everything again from the beginning
381                  */
382                 return 0;
383         }
384
385         if (ret == 0) {
386                 pr_info("Timed out\n");
387                 goto out;
388         }
389
390         job_handler = event.data.ptr;
391
392         if (!job_handler || !job_handler->handle_event) {
393                 pr_err("Corrupted event handler for fd %d\n",
394                         event.data.fd);
395                 goto out;
396         }
397
398         pr_debug("Running handler %s to handle events from fd %d\n",
399                 job_handler->name, job_handler->fd);
400         job_handler->handle_event(job_handler);
401
402 out:
403         pr_info("Jobs active: %u, pending: %u\n", job_count, jobs_pending);
404         return ret;
405 }
406
407 /*
408  * Per process flag indicating whether this child has requested fork
409  * limiting. If it has, it must also tell the master parent when it
410  * has died so that the parent can give next pending job permission to
411  * go.
412  */
413 static int is_limited_fork;
414
415 int do_fork(void)
416 {
417         int child;
418         child = fork();
419         if (child < 0) {
420                 pr_err("fork() failed: %m\n");
421                 return -1;
422         }
423
424         if (child) {
425                 child_count++;
426                 pr_debug("Fork %d, child %d\n", child_count, child);
427                 return child;
428         }
429
430         /*
431          * Also do not notify the master parent the death of this
432          * child. Only childs that have been created with
433          * do_fork_limited() can have this flag set.
434          */
435         is_limited_fork = 0;
436
437         /*
438          * Close unused ends of the job control pipes. Only the parent
439          * which controls the jobs may have the write end open of the
440          * job_get_permission_fd and the read end of the
441          * job_request_fd. Failing to close the pipe ends properly
442          * will cause the childs to wait forever for the run
443          * permission in case parent dies prematurely.
444          *
445          * Note! The file descriptor must be closed once and only
446          * once. They are marked to -1 to make it impossible for
447          * subseqent do_fork() calls from closing them again (in which
448          * case some other file descriptor might already be reserved
449          * for the same number) and prevent accidentally closing some
450          * innocent file descriptors that are still in use.
451          */
452         if (job_get_permission_fd[1] >= 0) {
453                 close(job_get_permission_fd[1]);
454                 job_get_permission_fd[1] = -1;
455         }
456         if (job_request_fd[0] >= 0) {
457                 close(job_request_fd[0]);
458                 job_request_fd[0] = -1;
459         }
460
461         /* reset child's child count */
462         child_count = 0;
463         parent_count++;
464         return 0;
465 }
466
467 static int request_fork(int request)
468 {
469         int pid = getpid();
470
471         pid = request > 0 ? pid : -pid;
472
473         return write(job_request_fd[1], &pid, sizeof(pid));
474 }
475
476 static void limited_fork_exit_handler(void)
477 {
478         if (is_limited_fork)
479                 request_fork(-1);
480 }
481
482 /*
483  * Like do_fork(), but allow the child continue only after the global
484  * job count is low enough.
485  *
486  * We allow the parent to continue other more important activities but
487  * child respects the limit of global active processes.
488  */
489 int do_fork_limited(void)
490 {
491         int child, ret;
492         char byte;
493
494         child = do_fork();
495         if (child)
496                 return child;
497
498         /* Remember to notify the parent when we are done */
499         atexit(limited_fork_exit_handler);
500         is_limited_fork = 1;
501
502         pr_debug("Requesting permission to go\n");
503
504         /* Signal the parent that we are here, waiting to go */
505         request_fork(1);
506
507         /*
508          * The parent will tell us when we can continue. If there were
509          * multiple children waiting for their turn to run only one
510          * will be reading the content byte from the pipe and getting
511          * the permission to run.
512          */
513         ret = read(job_get_permission_fd[0], &byte, sizeof(byte));
514         if (ret == 0)
515                 pr_err("Error requesting run, did the parent die?\n");
516
517         if (ret < 0)
518                 pr_err("Job control request failure: %m\n");
519
520         if (byte < 0) {
521                 pr_info("Did not get permission to execute. Terminating\n");
522
523                 /*
524                  * Avoid running exit handler, that would tell the
525                  * parent we died normally and decrement the job
526                  * counters.
527                  */
528                 raise(SIGKILL);
529         }
530
531         pr_debug("Continuing\n");
532         return child;
533 }
534
535 int harvest_zombies(int pid)
536 {
537         int status;
538         struct rusage rusage;
539         char *status_str = NULL;
540         int code = 0;
541
542         if (child_count == 0)
543                 return 0;
544
545         if (pid)
546                 pr_debug("Waiting on pid %d, children left: %d\n", pid,
547                         child_count);
548
549         do {
550                 pid = wait4(pid, &status, 0, &rusage);
551                 if (pid < 0) {
552                         pr_err("Error on waitid(): %m\n");
553                         return 0;
554                 }
555                 /* Wait until the child has become a zombie */
556         } while (!WIFEXITED(status) && !WIFSIGNALED(status));
557
558         child_count--;
559         if (WIFEXITED(status)) {
560                 status_str = "exited with status";
561                 code = WEXITSTATUS(status);
562         } else if (WIFSIGNALED(status)) {
563                 status_str = "killed by signal";
564                 code = WTERMSIG(status);
565         }
566         pr_debug("pid %d: %s %d. Children left: %d\n", pid,
567                 status_str, code, child_count);
568         pr_debug("pid %d: User time: %ld.%03lds, System %ld.%03lds\n", pid,
569                 (long)rusage.ru_utime.tv_sec, rusage.ru_utime.tv_usec / 1000,
570                 (long)rusage.ru_stime.tv_sec, rusage.ru_stime.tv_usec / 1000);
571
572         return 1;
573 }
574
575 /*
576  * Runs a command cmd with params argv, connects stdin and stdout to
577  * readfd and writefd
578  *
579  * Returns the pid of the executed process
580  */
581 int run_piped(const char *cmd, char *const argv[],
582               int *stdinfd, int *stdoutfd, int *stderrfd)
583 {
584         int ifd[2], ofd[2], efd[2], pid;
585
586         pr_info("Running command %s\n", cmd);
587
588         if (stdinfd && pipe(ifd)) {
589                 pr_err("pipe() failed: %m\n");
590                 return -1;
591         }
592
593         if (stdoutfd && pipe(ofd)) {
594                 pr_err("pipe() failed: %m\n");
595                 return -1;
596         }
597
598         if (stderrfd && pipe(efd)) {
599                 pr_err("pipe() failed: %m\n");
600                 return -1;
601         }
602
603         pid = do_fork();
604         if (pid) { /* Parent side */
605                 if (stdinfd) {
606                         close(ifd[0]);
607                         *stdinfd = ifd[0];
608                 }
609
610                 if (stdoutfd) {
611                         close(ofd[1]);
612                         *stdoutfd = ofd[0];
613                 }
614
615                 if (stderrfd) {
616                         close(efd[1]);
617                         *stderrfd = efd[0];
618                 }
619
620                 return pid;
621         }
622
623         if (stdinfd) {
624                 close(ifd[1]);
625                 dup2(ifd[0], STDIN_FILENO);
626         }
627
628         if (stdoutfd) {
629                 close(ofd[0]);
630                 dup2(ofd[1], STDOUT_FILENO);
631         }
632
633         if (stderrfd) {
634                 close(efd[0]);
635                 dup2(efd[1], STDERR_FILENO);
636         }
637
638         /* Now we have redirected standard streams to parent process */
639         execvp(cmd, argv);
640         pr_err("Failed to execv command %s: %m\n", cmd);
641         exit(1);
642
643         return 0;
644 }
645
646 /*
647  * Runs a command cmd with params argv, connects stdin and stdout to
648  * readfd and writefd
649  *
650  * Returns the pid of the executed process
651  */
652 int run_piped_stream(const char *cmd, char *const argv[],
653                      FILE **stdinf, FILE **stdoutf, FILE **stderrf)
654 {
655         int ifd, ofd, efd, pid;
656         int *i, *o, *e;
657
658         if (stdinf)
659                 i = &ifd;
660         else
661                 i = 0;
662         if (stdoutf)
663                 o = &ofd;
664         else
665                 o = 0;
666         if (stderrf)
667                 e = &efd;
668         else
669                 e = 0;
670
671         pid = run_piped(cmd, argv, i, o, e);
672
673         if (stdinf) {
674                 *stdinf = fdopen(ifd, "r");
675                 if (*stdinf == NULL) {
676                         pr_err("Error opening file stream for fd %d: %m\n",
677                                ifd);
678                         return -1;
679                 }
680         }
681
682         if (stdoutf) {
683                 *stdoutf = fdopen(ofd, "r");
684                 if (*stdoutf == NULL) {
685                         pr_err("Error opening file stream for fd %d: %m\n",
686                                ofd);
687                         return -1;
688                 }
689         }
690
691         if (stderrf) {
692                 *stderrf = fdopen(efd, "r");
693                 if (*stderrf == NULL) {
694                         pr_err("Error opening file stream for fd %d: %m\n",
695                                efd);
696                         return -1;
697                 }
698         }
699
700         return pid;
701 }
702
703 /*
704  * Forks a child and executes a command to run on parallel
705  */
706
707 #define max(a,b) (a) < (b) ? (b) : (a)
708 #define BUF_SIZE (128*1024)
709 int run(const char *cmd, char *const argv[])
710 {
711         int child, error;
712         int ofd, efd;
713         fd_set rfds;
714         int maxfd;
715         int eof = 0;
716
717         child = run_piped(cmd, argv, NULL, &ofd, &efd);
718
719         FD_ZERO(&rfds);
720         FD_SET(ofd, &rfds);
721         FD_SET(efd, &rfds);
722
723         while (!eof) {
724                 char *sptr , *eptr;
725                 char rbuf[BUF_SIZE];
726                 int bytes;
727                 int is_stderr = 0;
728
729                 maxfd = max(ofd, efd);
730                 error = select(maxfd, &rfds, NULL, NULL, NULL);
731
732                 if (error < 0) {
733                         pr_err("Error with select: %m\n");
734                         break;
735                 }
736
737                 if (FD_ISSET(ofd, &rfds)) {
738                         bytes = read(ofd, rbuf, BUF_SIZE);
739                         goto print;
740                 }
741
742                 if (FD_ISSET(efd, &rfds)) {
743                         is_stderr = 1;
744                         bytes = read(efd, rbuf, BUF_SIZE);
745                         goto print;
746                 }
747
748                 pr_err("select() returned unknown fd\n");
749                 break;
750
751 print:
752                 if (bytes < 0) {
753                         pr_err("read() failed: %m\n");
754                         break;
755                 }
756
757                 /*
758                  * Workaround: When a process had die and it has only
759                  * written to stderr, select() doesn't indicate that
760                  * there might be something to read in stderr fd. To
761                  * work around this issue, we try to read stderr just
762                  * in case in order to ensure everything gets read.
763                  */
764                 if (bytes == 0) {
765                         bytes = read(efd, rbuf, BUF_SIZE);
766                         is_stderr = 1;
767                         eof = 1;
768                 }
769
770                 sptr = eptr = rbuf;
771                 while (bytes--) {
772                         if (*eptr == '\n') {
773                                 *eptr = 0;
774                                 if (is_stderr)
775                                         pr_err("%s: stderr: %s\n",
776                                                 cmd, sptr);
777                                 else
778                                         pr_info("%s: stdout: %s\n",
779                                                 cmd, sptr);
780                                 sptr = eptr + 1;
781                         }
782                         eptr++;
783                 }
784         }
785
786         close(ofd);
787         close(efd);
788
789         harvest_zombies(child);
790
791         return 0;
792 }
793
794 int register_event_handler(struct event_handler *handler)
795 {
796         struct epoll_event ev;
797         int ret;
798
799         if (handler->fd <= 0) {
800                 pr_err("Invalid file descriptor of %d\n", handler->fd);
801                 return -1;
802         }
803
804         if (!handler->handle_event) {
805                 pr_err("Handler callback missing\n");
806                 return -1;
807         }
808
809         pr_info("Registering handler for %s, fd %d\n",
810                 handler->name, handler->fd);
811
812         ev.data.fd = handler->fd;
813         ev.data.ptr = handler;
814         ev.events = handler->events;
815         ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, handler->fd, &ev);
816         if (ret) {
817                 pr_err("Failed to add epoll_fd: %m\n");
818                 return -1;
819         }
820
821         return 0;
822 }
823
824 void _mutex_lock_acquired(struct mutex *lock, char *file, int line)
825 {
826         lock->line = line;
827         lock->file = file;
828 }
829
830 int _mutex_lock(struct mutex *lock, char *file, int line)
831 {
832         int ret = 0;
833
834         if (!pthread_mutex_trylock(&lock->lock))
835                 goto out_lock;
836
837         pr_info("Lock contention on lock %s on %s:%d\n",
838                 lock->name, lock->file, lock->line);
839
840         ret = pthread_mutex_lock(&lock->lock);
841         if (ret)
842                 pr_err("Acquirin lock %s failed: %m, acquired %s:%d\n",
843                         lock->name, lock->file, lock->line);
844
845 out_lock:
846         _mutex_lock_acquired(lock, file, line);
847         return ret;
848 }
849
850 int _mutex_unlock(struct mutex *lock)
851 {
852         lock->line = 0;
853         lock->file = NULL;
854         pthread_mutex_unlock(&lock->lock);
855
856         return 0;
857 }