]> git.itanic.dy.fi Git - rrdd/blob - process.c
init_job_control: Suppress compiler warning
[rrdd] / process.c
1 #include <unistd.h>
2 #include <fcntl.h>
3 #include <sys/select.h>
4 #include <sys/epoll.h>
5 #include <stdio.h>
6 #include <sys/wait.h>
7 #include <sys/signalfd.h>
8 #include <sys/resource.h>
9
10 #include "process.h"
11 #include "debug.h"
12
13 static int child_count;
14 static int parent_count;
15 static int job_request_fd[2];
16 static int job_get_permission_fd[2];
17 static int epoll_fd;
18 static unsigned int max_jobs;
19 static unsigned int job_count;
20 static unsigned int jobs_pending;
21 static unsigned int max_jobs_pending;
22
23 struct work_struct {
24         const char *name;
25         int (*work_fn)(void *);
26         void *arg;
27         struct work_struct *next;
28 };
29
30 struct work_queue {
31         struct work_struct *work;
32         int length;
33         char *name;
34         struct mutex lock;
35 };
36
37 struct work_queue work_queues[WORK_PRIORITIES_NUM] = {
38         {
39                 .name = "high priority",
40                 .lock = {
41                         .name = "high_prio_queue",
42                         .lock = PTHREAD_MUTEX_INITIALIZER,
43                 },
44         },
45         {
46                 .name = "low priority",
47                 .lock = {
48                         .name = "low_prio_queue",
49                         .lock = PTHREAD_MUTEX_INITIALIZER,
50                 },
51         },
52 };
53
54 struct mutex work_pending_mutex = {
55         .name = "work_pending",
56         .lock = PTHREAD_MUTEX_INITIALIZER,
57 };
58 pthread_cond_t work_pending_cond = PTHREAD_COND_INITIALIZER;
59
60 static int run_work_on_queue(struct work_queue *queue)
61 {
62         struct work_struct *work;
63
64         mutex_lock(&queue->lock);
65
66         if (!queue->work) {
67                 pr_info("No  work to run on queue %s\n", queue->name);
68                 mutex_unlock(&queue->lock);
69                 return 0;
70         }
71
72         /* Take next work */
73         work = queue->work;
74         queue->work = work->next;
75         queue->length--;
76
77         /*
78          * If queue is not empty, try waking up more workers. It is
79          * possible that when work were queued, the first worker did
80          * not wake up soon enough and
81          */
82         if (queue->length > 0)
83                 pthread_cond_signal(&work_pending_cond);
84
85         mutex_unlock(&queue->lock);
86
87         pr_info("Executing work %s from queue %s, %d still pending\n",
88                 work->name, queue->name, queue->length);
89
90         work->work_fn(work->arg);
91         pr_info("Work %s done\n", work->name);
92         free(work);
93
94         return 1;
95 }
96
97 static void *worker_thread(void *arg)
98 {
99         int ret;
100
101         char name[16];
102
103         snprintf(name, sizeof(name), "worker%ld", (long)arg);
104         pthread_setname_np(pthread_self(), name);
105
106         while (1) {
107                 while (1) {
108                         int prio, work_done = 0;
109
110                         /*
111                          * Execute as many works from the queues as
112                          * there are, starting from highest priority
113                          * queue
114                          */
115                         for (prio = 0; prio < WORK_PRIORITIES_NUM; prio++) {
116                                 work_done =
117                                         run_work_on_queue(&work_queues[prio]);
118                                 if (work_done)
119                                         break;
120                         }
121
122                         if (!work_done)
123                                 break;
124                 }
125
126                 pr_info("Worker going to sleep\n");
127                 ret = pthread_cond_wait(&work_pending_cond,
128                                         &work_pending_mutex.lock);
129                 if (ret < 0)
130                         pr_err("Error: %m\n");
131
132                 mutex_lock_acquired(&work_pending_mutex);
133
134                 mutex_unlock(&work_pending_mutex);
135
136         }
137
138         return NULL;
139 }
140
141 int queue_work(unsigned int priority, char *name,
142         int (work_fn)(void *arg), void *arg)
143 {
144         struct work_queue *queue;
145         struct work_struct *work, *last_work;
146
147         if (priority >= WORK_PRIORITIES_NUM) {
148                 pr_err("Invalid priority: %d\n", priority);
149                 return -EINVAL;
150         }
151
152         work = calloc(sizeof(*work), 1);
153
154         work->name = name;
155         work->work_fn = work_fn;
156         work->arg = arg;
157
158         queue = &work_queues[priority];
159
160         /* Insert new work at the end of the work queue */
161         mutex_lock(&queue->lock);
162
163         last_work = queue->work;
164         while (last_work && last_work->next)
165                 last_work = last_work->next;
166
167         if (!last_work)
168                 queue->work = work;
169         else
170                 last_work->next = work;
171
172         pr_info("Inserted work %s in queue %s, with %d pending items\n",
173                 work->name, queue->name, queue->length);
174         queue->length++;
175         mutex_unlock(&queue->lock);
176
177         pthread_cond_signal(&work_pending_cond);
178
179         return 0;
180 }
181
182 int get_child_count(void)
183 {
184         return child_count;
185 }
186
187 int get_parent_count(void)
188 {
189         return parent_count;
190 }
191
192 static int grant_new_job(void)
193 {
194         int ret;
195         char byte = 0;
196
197         job_count++;
198         pr_info("Granting new job. %d jobs currently and %d pending\n",
199                 job_count, jobs_pending);
200
201         ret = write(job_get_permission_fd[1], &byte, 1);
202         if (ret != 1) {
203                 pr_err("Failed to write 1 byte: %m\n");
204                 return -1;
205         }
206
207         return 0;
208 }
209
210 static int deny_job(void)
211 {
212         int ret;
213         char byte = -1;
214
215         pr_info("Denying new job. %d jobs currently and %d pending, "
216                 "limit of pending jobs is %d\n",
217                 job_count, jobs_pending, max_jobs_pending);
218
219         ret = write(job_get_permission_fd[1], &byte, 1);
220         if (ret != 1) {
221                 pr_err("Failed to write 1 byte: %m\n");
222                 return -1;
223         }
224
225         return 0;
226 }
227
228 static int handle_job_request(struct event_handler *h)
229 {
230         int ret, pid;
231
232         ret = read(job_request_fd[0], &pid, sizeof(pid));
233         if (ret < 0) {
234                 pr_err("Failed to read: %m\n");
235                 return -1;
236         }
237
238         if (ret == 0) {
239                 pr_info("Read zero bytes\n");
240                 return 0;
241         }
242
243         if (pid > 0) {
244                 if (job_count >= max_jobs) {
245                         if (jobs_pending < max_jobs_pending)
246                                 jobs_pending++;
247                         else
248                                 deny_job();
249                 } else {
250                         ret = grant_new_job();
251                         return 0;
252                 }
253         } else if (pid < 0) {
254                 if (job_count > max_jobs)
255                         pr_err("BUG: Job %u jobs exceeded limit %u\n",
256                                 job_count, max_jobs);
257
258                 pr_info("Job %d finished\n", -pid);
259                 job_count--;
260                 if (jobs_pending) {
261                         jobs_pending--;
262                         ret = grant_new_job();
263                         return 0;
264                 }
265         }
266
267         return 0;
268 }
269
270 struct event_handler job_request_handler = {
271         .handle_event = handle_job_request,
272         .events = EPOLLIN,
273         .name = "job_request",
274 };
275
276 /*
277  * Initialize the jobcontrol.
278  *
279  * Create the pipes that are used to grant children execution
280  * permissions. If max_jobs is zero, count the number of CPUs from
281  * /proc/cpuinfo and use that.
282  */
283 int init_jobcontrol(int max_jobs_requested)
284 {
285         FILE *file;
286         int ret;
287         sigset_t sigmask;
288         char buf[256];
289         char match[8];
290         pthread_t *thread;
291         long int i;
292
293         if (pipe2(job_request_fd, O_NONBLOCK | O_CLOEXEC)) {
294                 pr_err("Failed to create pipe: %m\n");
295                 return -1;
296         }
297
298         if (pipe2(job_get_permission_fd, O_CLOEXEC)) {
299                 pr_err("Failed to create pipe: %m\n");
300                 return -1;
301         }
302
303         epoll_fd = epoll_create(1);
304         if (epoll_fd == -1) {
305                 pr_err("Failed to epoll_create(): %m\n");
306                 return -1;
307         }
308
309         job_request_handler.fd = job_request_fd[0];
310         register_event_handler(&job_request_handler);
311
312         sigemptyset(&sigmask);
313         sigaddset(&sigmask, SIGCHLD);
314
315         if (max_jobs_requested > 0) {
316                 max_jobs = max_jobs_requested;
317                 goto no_count_cpus;
318         }
319         max_jobs++;
320
321         file = fopen("/proc/cpuinfo", "ro");
322         if (!file) {
323                 pr_err("Failed to open /proc/cpuinfo: %m\n");
324                 goto open_fail;
325         }
326
327         /*
328          * The CPU count algorithm simply reads the first 8 bytes from
329          * the /proc/cpuinfo and then expects that line to be there as
330          * many times as there are CPUs.
331          */
332         ret = fread(match, 1, sizeof(match), file);
333         if (ret < sizeof(match)) {
334                 pr_err("read %d bytes when expecting %zd %m\n",
335                         ret, sizeof(match));
336                 goto read_fail;
337         }
338
339         while(fgets(buf, sizeof(buf), file)) {
340                 if (!strncmp(buf, match, sizeof(match)))
341                         max_jobs++;
342         }
343
344 open_fail:
345 read_fail:
346         fclose(file);
347
348 no_count_cpus:
349         pr_info("Set maximum number of parallel jobs to %d\n", max_jobs);
350
351         max_jobs_pending = max_jobs * 10 + 25;
352         pr_info("Set maximum number of pending jobs to %d\n", max_jobs_pending);
353
354         /* Create worker threads */
355         thread = calloc(sizeof(*thread), max_jobs);
356         for (i = 0; i < max_jobs; i++)
357                 pthread_create(&thread[i], NULL, worker_thread, (void *)i);
358
359         return 0;
360 }
361
362 int poll_job_requests(int timeout)
363 {
364         struct epoll_event event;
365         struct event_handler *job_handler;
366         int ret;
367
368         /* Convert positive seconds to milliseconds */
369         timeout = timeout > 0 ? 1000 * timeout : timeout;
370
371         ret = epoll_wait(epoll_fd, &event, 1, timeout);
372
373         if (ret == -1) {
374                 if (errno != EINTR) {
375                         pr_err("epoll_wait: %m\n");
376                         return -1;
377                 }
378
379                 /*
380                  * If epoll_wait() was interrupted, better start
381                  * everything again from the beginning
382                  */
383                 return 0;
384         }
385
386         if (ret == 0) {
387                 pr_info("Timed out\n");
388                 goto out;
389         }
390
391         job_handler = event.data.ptr;
392
393         if (!job_handler || !job_handler->handle_event) {
394                 pr_err("Corrupted event handler for fd %d\n",
395                         event.data.fd);
396                 goto out;
397         }
398
399         pr_debug("Running handler %s to handle events from fd %d\n",
400                 job_handler->name, job_handler->fd);
401         job_handler->handle_event(job_handler);
402
403 out:
404         pr_info("Jobs active: %u, pending: %u\n", job_count, jobs_pending);
405         return ret;
406 }
407
408 /*
409  * Per process flag indicating whether this child has requested fork
410  * limiting. If it has, it must also tell the master parent when it
411  * has died so that the parent can give next pending job permission to
412  * go.
413  */
414 static int is_limited_fork;
415
416 int do_fork(void)
417 {
418         int child;
419         child = fork();
420         if (child < 0) {
421                 pr_err("fork() failed: %m\n");
422                 return -1;
423         }
424
425         if (child) {
426                 child_count++;
427                 pr_debug("Fork %d, child %d\n", child_count, child);
428                 return child;
429         }
430
431         /*
432          * Also do not notify the master parent the death of this
433          * child. Only childs that have been created with
434          * do_fork_limited() can have this flag set.
435          */
436         is_limited_fork = 0;
437
438         /*
439          * Close unused ends of the job control pipes. Only the parent
440          * which controls the jobs may have the write end open of the
441          * job_get_permission_fd and the read end of the
442          * job_request_fd. Failing to close the pipe ends properly
443          * will cause the childs to wait forever for the run
444          * permission in case parent dies prematurely.
445          *
446          * Note! The file descriptor must be closed once and only
447          * once. They are marked to -1 to make it impossible for
448          * subseqent do_fork() calls from closing them again (in which
449          * case some other file descriptor might already be reserved
450          * for the same number) and prevent accidentally closing some
451          * innocent file descriptors that are still in use.
452          */
453         if (job_get_permission_fd[1] >= 0) {
454                 close(job_get_permission_fd[1]);
455                 job_get_permission_fd[1] = -1;
456         }
457         if (job_request_fd[0] >= 0) {
458                 close(job_request_fd[0]);
459                 job_request_fd[0] = -1;
460         }
461
462         /* reset child's child count */
463         child_count = 0;
464         parent_count++;
465         return 0;
466 }
467
468 static int request_fork(int request)
469 {
470         int pid = getpid();
471
472         pid = request > 0 ? pid : -pid;
473
474         return write(job_request_fd[1], &pid, sizeof(pid));
475 }
476
477 static void limited_fork_exit_handler(void)
478 {
479         if (is_limited_fork)
480                 request_fork(-1);
481 }
482
483 /*
484  * Like do_fork(), but allow the child continue only after the global
485  * job count is low enough.
486  *
487  * We allow the parent to continue other more important activities but
488  * child respects the limit of global active processes.
489  */
490 int do_fork_limited(void)
491 {
492         int child, ret;
493         char byte;
494
495         child = do_fork();
496         if (child)
497                 return child;
498
499         /* Remember to notify the parent when we are done */
500         atexit(limited_fork_exit_handler);
501         is_limited_fork = 1;
502
503         pr_debug("Requesting permission to go\n");
504
505         /* Signal the parent that we are here, waiting to go */
506         request_fork(1);
507
508         /*
509          * The parent will tell us when we can continue. If there were
510          * multiple children waiting for their turn to run only one
511          * will be reading the content byte from the pipe and getting
512          * the permission to run.
513          */
514         ret = read(job_get_permission_fd[0], &byte, sizeof(byte));
515         if (ret == 0)
516                 pr_err("Error requesting run, did the parent die?\n");
517
518         if (ret < 0)
519                 pr_err("Job control request failure: %m\n");
520
521         if (byte < 0) {
522                 pr_info("Did not get permission to execute. Terminating\n");
523
524                 /*
525                  * Avoid running exit handler, that would tell the
526                  * parent we died normally and decrement the job
527                  * counters.
528                  */
529                 raise(SIGKILL);
530         }
531
532         pr_debug("Continuing\n");
533         return child;
534 }
535
536 int harvest_zombies(int pid)
537 {
538         int status;
539         struct rusage rusage;
540         char *status_str = NULL;
541         int code = 0;
542
543         if (child_count == 0)
544                 return 0;
545
546         if (pid)
547                 pr_debug("Waiting on pid %d, children left: %d\n", pid,
548                         child_count);
549
550         do {
551                 pid = wait4(pid, &status, 0, &rusage);
552                 if (pid < 0) {
553                         pr_err("Error on waitid(): %m\n");
554                         return 0;
555                 }
556                 /* Wait until the child has become a zombie */
557         } while (!WIFEXITED(status) && !WIFSIGNALED(status));
558
559         child_count--;
560         if (WIFEXITED(status)) {
561                 status_str = "exited with status";
562                 code = WEXITSTATUS(status);
563         } else if (WIFSIGNALED(status)) {
564                 status_str = "killed by signal";
565                 code = WTERMSIG(status);
566         }
567         pr_debug("pid %d: %s %d. Children left: %d\n", pid,
568                 status_str, code, child_count);
569         pr_debug("pid %d: User time: %ld.%03lds, System %ld.%03lds\n", pid,
570                 (long)rusage.ru_utime.tv_sec, rusage.ru_utime.tv_usec / 1000,
571                 (long)rusage.ru_stime.tv_sec, rusage.ru_stime.tv_usec / 1000);
572
573         return 1;
574 }
575
576 /*
577  * Runs a command cmd with params argv, connects stdin and stdout to
578  * readfd and writefd
579  *
580  * Returns the pid of the executed process
581  */
582 int run_piped(const char *cmd, char *const argv[],
583               int *stdinfd, int *stdoutfd, int *stderrfd)
584 {
585         int ifd[2], ofd[2], efd[2], pid;
586
587         pr_info("Running command %s\n", cmd);
588
589         if (stdinfd && pipe(ifd)) {
590                 pr_err("pipe() failed: %m\n");
591                 return -1;
592         }
593
594         if (stdoutfd && pipe(ofd)) {
595                 pr_err("pipe() failed: %m\n");
596                 return -1;
597         }
598
599         if (stderrfd && pipe(efd)) {
600                 pr_err("pipe() failed: %m\n");
601                 return -1;
602         }
603
604         pid = do_fork();
605         if (pid) { /* Parent side */
606                 if (stdinfd) {
607                         close(ifd[0]);
608                         *stdinfd = ifd[0];
609                 }
610
611                 if (stdoutfd) {
612                         close(ofd[1]);
613                         *stdoutfd = ofd[0];
614                 }
615
616                 if (stderrfd) {
617                         close(efd[1]);
618                         *stderrfd = efd[0];
619                 }
620
621                 return pid;
622         }
623
624         if (stdinfd) {
625                 close(ifd[1]);
626                 dup2(ifd[0], STDIN_FILENO);
627         }
628
629         if (stdoutfd) {
630                 close(ofd[0]);
631                 dup2(ofd[1], STDOUT_FILENO);
632         }
633
634         if (stderrfd) {
635                 close(efd[0]);
636                 dup2(efd[1], STDERR_FILENO);
637         }
638
639         /* Now we have redirected standard streams to parent process */
640         execvp(cmd, argv);
641         pr_err("Failed to execv command %s: %m\n", cmd);
642         exit(1);
643
644         return 0;
645 }
646
647 /*
648  * Runs a command cmd with params argv, connects stdin and stdout to
649  * readfd and writefd
650  *
651  * Returns the pid of the executed process
652  */
653 int run_piped_stream(const char *cmd, char *const argv[],
654                      FILE **stdinf, FILE **stdoutf, FILE **stderrf)
655 {
656         int ifd, ofd, efd, pid;
657         int *i, *o, *e;
658
659         if (stdinf)
660                 i = &ifd;
661         else
662                 i = 0;
663         if (stdoutf)
664                 o = &ofd;
665         else
666                 o = 0;
667         if (stderrf)
668                 e = &efd;
669         else
670                 e = 0;
671
672         pid = run_piped(cmd, argv, i, o, e);
673
674         if (stdinf) {
675                 *stdinf = fdopen(ifd, "r");
676                 if (*stdinf == NULL) {
677                         pr_err("Error opening file stream for fd %d: %m\n",
678                                ifd);
679                         return -1;
680                 }
681         }
682
683         if (stdoutf) {
684                 *stdoutf = fdopen(ofd, "r");
685                 if (*stdoutf == NULL) {
686                         pr_err("Error opening file stream for fd %d: %m\n",
687                                ofd);
688                         return -1;
689                 }
690         }
691
692         if (stderrf) {
693                 *stderrf = fdopen(efd, "r");
694                 if (*stderrf == NULL) {
695                         pr_err("Error opening file stream for fd %d: %m\n",
696                                efd);
697                         return -1;
698                 }
699         }
700
701         return pid;
702 }
703
704 /*
705  * Forks a child and executes a command to run on parallel
706  */
707
708 #define max(a,b) (a) < (b) ? (b) : (a)
709 #define BUF_SIZE (128*1024)
710 int run(const char *cmd, char *const argv[])
711 {
712         int child, error;
713         int ofd, efd;
714         fd_set rfds;
715         int maxfd;
716         int eof = 0;
717
718         child = run_piped(cmd, argv, NULL, &ofd, &efd);
719
720         FD_ZERO(&rfds);
721         FD_SET(ofd, &rfds);
722         FD_SET(efd, &rfds);
723
724         while (!eof) {
725                 char *sptr , *eptr;
726                 char rbuf[BUF_SIZE];
727                 int bytes;
728                 int is_stderr = 0;
729
730                 maxfd = max(ofd, efd);
731                 error = select(maxfd, &rfds, NULL, NULL, NULL);
732
733                 if (error < 0) {
734                         pr_err("Error with select: %m\n");
735                         break;
736                 }
737
738                 if (FD_ISSET(ofd, &rfds)) {
739                         bytes = read(ofd, rbuf, BUF_SIZE);
740                         goto print;
741                 }
742
743                 if (FD_ISSET(efd, &rfds)) {
744                         is_stderr = 1;
745                         bytes = read(efd, rbuf, BUF_SIZE);
746                         goto print;
747                 }
748
749                 pr_err("select() returned unknown fd\n");
750                 break;
751
752 print:
753                 if (bytes < 0) {
754                         pr_err("read() failed: %m\n");
755                         break;
756                 }
757
758                 /*
759                  * Workaround: When a process had die and it has only
760                  * written to stderr, select() doesn't indicate that
761                  * there might be something to read in stderr fd. To
762                  * work around this issue, we try to read stderr just
763                  * in case in order to ensure everything gets read.
764                  */
765                 if (bytes == 0) {
766                         bytes = read(efd, rbuf, BUF_SIZE);
767                         is_stderr = 1;
768                         eof = 1;
769                 }
770
771                 sptr = eptr = rbuf;
772                 while (bytes--) {
773                         if (*eptr == '\n') {
774                                 *eptr = 0;
775                                 if (is_stderr)
776                                         pr_err("%s: stderr: %s\n",
777                                                 cmd, sptr);
778                                 else
779                                         pr_info("%s: stdout: %s\n",
780                                                 cmd, sptr);
781                                 sptr = eptr + 1;
782                         }
783                         eptr++;
784                 }
785         }
786
787         close(ofd);
788         close(efd);
789
790         harvest_zombies(child);
791
792         return 0;
793 }
794
795 int register_event_handler(struct event_handler *handler)
796 {
797         struct epoll_event ev;
798         int ret;
799
800         if (handler->fd <= 0) {
801                 pr_err("Invalid file descriptor of %d\n", handler->fd);
802                 return -1;
803         }
804
805         if (!handler->handle_event) {
806                 pr_err("Handler callback missing\n");
807                 return -1;
808         }
809
810         pr_info("Registering handler for %s, fd %d\n",
811                 handler->name, handler->fd);
812
813         ev.data.fd = handler->fd;
814         ev.data.ptr = handler;
815         ev.events = handler->events;
816         ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, handler->fd, &ev);
817         if (ret) {
818                 pr_err("Failed to add epoll_fd: %m\n");
819                 return -1;
820         }
821
822         return 0;
823 }
824
825 void _mutex_lock_acquired(struct mutex *lock, char *file, int line)
826 {
827         lock->line = line;
828         lock->file = file;
829 }
830
831 int _mutex_lock(struct mutex *lock, char *file, int line)
832 {
833         int ret = 0;
834
835         if (!pthread_mutex_trylock(&lock->lock))
836                 goto out_lock;
837
838         pr_info("Lock contention on lock %s on %s:%d\n",
839                 lock->name, lock->file, lock->line);
840
841         ret = pthread_mutex_lock(&lock->lock);
842         if (ret)
843                 pr_err("Acquirin lock %s failed: %m, acquired %s:%d\n",
844                         lock->name, lock->file, lock->line);
845
846 out_lock:
847         _mutex_lock_acquired(lock, file, line);
848         return ret;
849 }
850
851 int _mutex_unlock(struct mutex *lock)
852 {
853         lock->line = 0;
854         lock->file = NULL;
855         pthread_mutex_unlock(&lock->lock);
856
857         return 0;
858 }