]> git.itanic.dy.fi Git - rrdd/blob - process.c
rrdtool: get_last_update: Fix file descriptor leak
[rrdd] / process.c
1 #include <unistd.h>
2 #include <fcntl.h>
3 #include <sys/select.h>
4 #include <sys/epoll.h>
5 #include <stdio.h>
6 #include <sys/wait.h>
7 #include <sys/signalfd.h>
8 #include <sys/resource.h>
9
10 #include "process.h"
11 #include "debug.h"
12 #include "utils.h"
13
14 static int epoll_fd;
15 static unsigned int max_jobs;
16 static unsigned int max_jobs_pending;
17
18 struct work_struct {
19         const char *name;
20         int (*work_fn)(void *);
21         void *arg;
22         struct work_struct *next;
23 };
24
25 struct work_queue {
26         struct work_struct *work;
27         int length;
28         char *name;
29         struct mutex lock;
30 };
31
32 struct work_queue work_queues[WORK_PRIORITIES_NUM] = {
33         {
34                 .name = "high priority",
35                 .lock = {
36                         .name = "high_prio_queue",
37                         .lock = PTHREAD_MUTEX_INITIALIZER,
38                 },
39         },
40         {
41                 .name = "low priority",
42                 .lock = {
43                         .name = "low_prio_queue",
44                         .lock = PTHREAD_MUTEX_INITIALIZER,
45                 },
46         },
47 };
48
49 struct mutex work_stats_mutex = {
50         .name = "work_stats",
51         .lock = PTHREAD_MUTEX_INITIALIZER,
52 };
53 static int workers_active;
54 static int worker_count;
55
56 static int job_notify_fd[2];
57
58 static int run_work_on_queue(struct work_queue *queue)
59 {
60         struct work_struct *work;
61
62         mutex_lock(&queue->lock);
63
64         if (!queue->work) {
65                 mutex_unlock(&queue->lock);
66                 return 0;
67         }
68
69         /* Take next work */
70         work = queue->work;
71         queue->work = work->next;
72         queue->length--;
73
74         mutex_unlock(&queue->lock);
75
76         pr_info("Executing work %s from queue %s, %d still pending\n",
77                 work->name, queue->name, queue->length);
78
79         work->work_fn(work->arg);
80         pr_info("Work %s done\n", work->name);
81         free(work);
82
83         return 1;
84 }
85
86 static void *worker_thread(void *arg)
87 {
88         int ret;
89         char name[16];
90
91         mutex_lock(&work_stats_mutex);
92         snprintf(name, sizeof(name), "worker%d", worker_count);
93         worker_count++;
94         mutex_unlock(&work_stats_mutex);
95
96         pthread_setname_np(pthread_self(), name);
97         pthread_detach(pthread_self());
98
99         pr_info("Worker started\n");
100
101         /* Execute all high priority work from the queue */
102         while (run_work_on_queue(&work_queues[WORK_PRIORITY_HIGH]))
103                 ;
104         /*
105          * All high priority work is now done, see if we have enough
106          * workers executing low priority worl. Continue from there if
107          * needed.
108          */
109         mutex_lock(&work_stats_mutex);
110         if (workers_active > max_jobs)
111                 goto out_unlock;
112
113         mutex_unlock(&work_stats_mutex);
114
115         /*
116          * Start executing the low priority work. Drop the nice value
117          * as this really is low priority stuff
118          */
119         ret = nice(19);
120         pr_info("Worker priority dropped to %d\n", ret);
121
122         while (run_work_on_queue(&work_queues[WORK_PRIORITY_LOW]))
123                 ;
124
125         /* All done, exit */
126         mutex_lock(&work_stats_mutex);
127 out_unlock:
128         workers_active--;
129         pr_info("Worker exiting, %d left active\n",
130                 workers_active);
131
132         /*
133          * Last exiting worker zeroes the worker_count. This
134          * ensures next time we start spawning worker threads
135          * the first thread will have number zero on its name.
136          */
137         if (!workers_active) {
138                 worker_count = 0;
139
140                 /*
141                  * Kick the job poller, just to print the time of next
142                  * update on the logs
143                  */
144                 notify_job_request();
145         }
146
147         mutex_unlock(&work_stats_mutex);
148
149         return NULL;
150 }
151
152 int queue_work(unsigned int priority, char *name,
153         int (work_fn)(void *arg), void *arg)
154 {
155         pthread_t *thread;
156         struct work_queue *queue;
157         struct work_struct *work, *last_work;
158
159         if (priority >= WORK_PRIORITIES_NUM) {
160                 pr_err("Invalid priority: %d\n", priority);
161                 return -EINVAL;
162         }
163
164         work = calloc(sizeof(*work), 1);
165
166         work->name = name;
167         work->work_fn = work_fn;
168         work->arg = arg;
169
170         queue = &work_queues[priority];
171
172         /* Insert new work at the end of the work queue */
173         mutex_lock(&queue->lock);
174
175         last_work = queue->work;
176         while (last_work && last_work->next)
177                 last_work = last_work->next;
178
179         if (!last_work)
180                 queue->work = work;
181         else
182                 last_work->next = work;
183
184         pr_info("Inserted work %s in queue %s, with %d pending items\n",
185                 work->name, queue->name, queue->length);
186         queue->length++;
187         mutex_unlock(&queue->lock);
188
189         mutex_lock(&work_stats_mutex);
190         pr_info("workers_active: %d, priority: %d\n", workers_active, priority);
191         if (priority != WORK_PRIORITY_HIGH && workers_active >= max_jobs) {
192                 mutex_unlock(&work_stats_mutex);
193                 return 0;
194         }
195         workers_active++;
196         mutex_unlock(&work_stats_mutex);
197
198         pr_info("Creating new worker thread\n");
199         /* We need a worker thread, create one */
200         thread = calloc(sizeof(*thread), 1);
201         pthread_create(thread, NULL, worker_thread, NULL);
202
203         free(thread);
204
205         return 0;
206 }
207
208 static int job_notify_handler(struct event_handler *h)
209 {
210         int ret;
211         char buf[64];
212
213         ret = read(job_notify_fd[0], buf, sizeof(buf));
214         if (ret < 0)
215                 pr_err("Failed to read: %m\n");
216
217         return 0;
218 }
219
220 /*
221  * Initialize the jobcontrol.
222  *
223  * Create the pipes that are used to grant children execution
224  * permissions. If max_jobs is zero, count the number of CPUs from
225  * /proc/cpuinfo and use that.
226  */
227 int init_jobcontrol(int max_jobs_requested)
228 {
229         static struct event_handler ev;
230         FILE *file;
231         int ret;
232         char buf[256];
233         char match[8];
234
235         epoll_fd = epoll_create(1);
236         if (epoll_fd == -1) {
237                 pr_err("Failed to epoll_create(): %m\n");
238                 return -1;
239         }
240
241         if (max_jobs_requested > 0) {
242                 max_jobs = max_jobs_requested;
243                 goto no_count_cpus;
244         }
245         max_jobs++;
246
247         file = fopen("/proc/cpuinfo", "ro");
248         if (!file) {
249                 pr_err("Failed to open /proc/cpuinfo: %m\n");
250                 goto open_fail;
251         }
252
253         /*
254          * The CPU count algorithm simply reads the first 8 bytes from
255          * the /proc/cpuinfo and then expects that line to be there as
256          * many times as there are CPUs.
257          */
258         ret = fread(match, 1, sizeof(match), file);
259         if (ret < sizeof(match)) {
260                 pr_err("read %d bytes when expecting %zd %m\n",
261                         ret, sizeof(match));
262                 goto read_fail;
263         }
264
265         while(fgets(buf, sizeof(buf), file)) {
266                 if (!strncmp(buf, match, sizeof(match)))
267                         max_jobs++;
268         }
269
270         ret = pipe(job_notify_fd);
271         if (ret) {
272                 pr_err("pipe() failed: %m\n");
273                 return ret;
274         }
275
276         ev.fd = job_notify_fd[0];
277         ev.events = EPOLLIN;
278         ev.handle_event = job_notify_handler;
279         ev.name = "job_notify";
280         register_event_handler(&ev, EPOLL_CTL_ADD);
281
282 open_fail:
283 read_fail:
284         fclose(file);
285
286 no_count_cpus:
287         pr_info("Set maximum number of parallel jobs to %d\n", max_jobs);
288
289         max_jobs_pending = max_jobs * 10 + 25;
290         pr_info("Set maximum number of pending jobs to %d\n", max_jobs_pending);
291
292         return 0;
293 }
294
295 int poll_job_requests(int timeout)
296 {
297         struct epoll_event event;
298         struct event_handler *job_handler;
299         int ret;
300
301         /* Convert positive seconds to milliseconds */
302         timeout = timeout > 0 ? 1000 * timeout : timeout;
303
304         ret = epoll_wait(epoll_fd, &event, 1, timeout);
305
306         if (ret == -1) {
307                 if (errno != EINTR) {
308                         pr_err("epoll_wait: %m\n");
309                         return -1;
310                 }
311
312                 /*
313                  * If epoll_wait() was interrupted, better start
314                  * everything again from the beginning
315                  */
316                 return 0;
317         }
318
319         if (ret == 0) {
320                 pr_info("Timed out\n");
321                 goto out;
322         }
323
324         job_handler = event.data.ptr;
325
326         if (!job_handler || !job_handler->handle_event) {
327                 pr_err("Corrupted event handler for fd %d\n",
328                         event.data.fd);
329                 goto out;
330         }
331
332         pr_debug("Running handler %s to handle events from fd %d\n",
333                 job_handler->name, job_handler->fd);
334         job_handler->handle_event(job_handler);
335
336 out:
337         pr_info("Workers active: %u\n", workers_active);
338         return ret;
339 }
340
341 int notify_job_request(void)
342 {
343         int ret;
344         char byte = 0;
345
346         ret = write(job_notify_fd[1], &byte, sizeof(byte));
347         if (ret < 0)
348                 pr_err("Failed to write: %m\n");
349
350         return 0;
351 }
352
353 int do_fork(void)
354 {
355         int child;
356         child = fork();
357         if (child < 0) {
358                 pr_err("fork() failed: %m\n");
359                 return -1;
360         }
361
362         if (child) {
363                 pr_debug("Fork child %d\n", child);
364                 return child;
365         }
366
367         return 0;
368 }
369
370 int clear_zombie(int pid)
371 {
372         int status;
373         struct rusage rusage;
374         char *status_str = NULL;
375         int code = 0;
376
377         if (pid)
378                 pr_debug("Waiting on pid %d\n", pid);
379
380         do {
381                 pid = wait4(pid, &status, 0, &rusage);
382                 if (pid < 0) {
383                         pr_err("Error on waitid(): %m\n");
384                         return 0;
385                 }
386                 /* Wait until the child has become a zombie */
387         } while (!WIFEXITED(status) && !WIFSIGNALED(status));
388
389         if (WIFEXITED(status)) {
390                 status_str = "exited with status";
391                 code = WEXITSTATUS(status);
392         } else if (WIFSIGNALED(status)) {
393                 status_str = "killed by signal";
394                 code = WTERMSIG(status);
395         }
396         pr_debug("pid %d: %s %d.\n", pid,
397                 status_str, code);
398         pr_debug("pid %d: User time: %ld.%03lds, System %ld.%03lds\n", pid,
399                 (long)rusage.ru_utime.tv_sec, rusage.ru_utime.tv_usec / 1000,
400                 (long)rusage.ru_stime.tv_sec, rusage.ru_stime.tv_usec / 1000);
401
402         return 1;
403 }
404
405 /*
406  * Runs a command cmd with params argv, connects stdin and stdout to
407  * readfd and writefd
408  *
409  * Returns the pid of the executed process
410  */
411 int run_piped(const char *cmd, char *const argv[],
412               int *stdinfd, int *stdoutfd, int *stderrfd)
413 {
414         int ifd[2], ofd[2], efd[2], pid;
415
416         pr_info("Running command %s\n", cmd);
417
418         if (stdinfd && pipe(ifd)) {
419                 pr_err("pipe() failed: %m\n");
420                 return -1;
421         }
422
423         if (stdoutfd && pipe(ofd)) {
424                 pr_err("pipe() failed: %m\n");
425                 goto close_ifd;
426         }
427
428         if (stderrfd && pipe(efd)) {
429                 pr_err("pipe() failed: %m\n");
430                 goto close_ofd;
431         }
432
433         pid = do_fork();
434         if (pid) { /* Parent side */
435                 if (stdinfd) {
436                         close(ifd[0]);
437                         *stdinfd = ifd[0];
438                 }
439
440                 if (stdoutfd) {
441                         close(ofd[1]);
442                         *stdoutfd = ofd[0];
443                 }
444
445                 if (stderrfd) {
446                         close(efd[1]);
447                         *stderrfd = efd[0];
448                 }
449
450                 return pid;
451         }
452
453         if (stdinfd) {
454                 close(ifd[1]);
455                 dup2(ifd[0], STDIN_FILENO);
456         }
457
458         if (stdoutfd) {
459                 close(ofd[0]);
460                 dup2(ofd[1], STDOUT_FILENO);
461         }
462
463         if (stderrfd) {
464                 close(efd[0]);
465                 dup2(efd[1], STDERR_FILENO);
466         }
467
468         /* Now we have redirected standard streams to parent process */
469         execvp(cmd, argv);
470         pr_err("Failed to execv command %s: %m\n", cmd);
471         exit(1);
472
473         return 0;
474
475 close_ofd:
476         if (stdoutfd) {
477                 close(ofd[0]);
478                 close(ofd[1]);
479         }
480 close_ifd:
481         if (stdinfd) {
482                 close(ifd[0]);
483                 close(ifd[1]);
484         }
485
486         return -1;
487 }
488
489 /*
490  * Runs a command cmd with params argv, connects stdin and stdout to
491  * readfd and writefd
492  *
493  * Returns the pid of the executed process
494  */
495 int run_piped_stream(const char *cmd, char *const argv[],
496                      FILE **stdinf, FILE **stdoutf, FILE **stderrf)
497 {
498         int ifd, ofd, efd, pid;
499         int *i, *o, *e;
500
501         if (stdinf)
502                 i = &ifd;
503         else
504                 i = 0;
505         if (stdoutf)
506                 o = &ofd;
507         else
508                 o = 0;
509         if (stderrf)
510                 e = &efd;
511         else
512                 e = 0;
513
514         pid = run_piped(cmd, argv, i, o, e);
515
516         if (stdinf) {
517                 *stdinf = fdopen(ifd, "r");
518                 if (*stdinf == NULL) {
519                         pr_err("Error opening file stream for fd %d: %m\n",
520                                ifd);
521                         return -1;
522                 }
523         }
524
525         if (stdoutf) {
526                 *stdoutf = fdopen(ofd, "w");
527                 if (*stdoutf == NULL) {
528                         pr_err("Error opening file stream for fd %d: %m\n",
529                                ofd);
530                         return -1;
531                 }
532         }
533
534         if (stderrf) {
535                 *stderrf = fdopen(efd, "r");
536                 if (*stderrf == NULL) {
537                         pr_err("Error opening file stream for fd %d: %m\n",
538                                efd);
539                         return -1;
540                 }
541         }
542
543         return pid;
544 }
545
546 /*
547  * Forks a child and executes a command to run on parallel
548  */
549
550 #define BUF_SIZE (128*1024)
551 int run(const char *cmd, char *const argv[])
552 {
553         int child, error;
554         int ofd, efd;
555         fd_set rfds;
556         int maxfd;
557         int eof = 0;
558
559         child = run_piped(cmd, argv, NULL, &ofd, &efd);
560
561         FD_ZERO(&rfds);
562         FD_SET(ofd, &rfds);
563         FD_SET(efd, &rfds);
564
565         while (!eof) {
566                 char *sptr , *eptr;
567                 char rbuf[BUF_SIZE];
568                 int bytes;
569                 int is_stderr = 0;
570
571                 maxfd = max(ofd, efd);
572                 error = select(maxfd, &rfds, NULL, NULL, NULL);
573
574                 if (error < 0) {
575                         pr_err("Error with select: %m\n");
576                         break;
577                 }
578
579                 if (FD_ISSET(ofd, &rfds)) {
580                         bytes = read(ofd, rbuf, BUF_SIZE);
581                         goto print;
582                 }
583
584                 if (FD_ISSET(efd, &rfds)) {
585                         is_stderr = 1;
586                         bytes = read(efd, rbuf, BUF_SIZE);
587                         goto print;
588                 }
589
590                 pr_err("select() returned unknown fd\n");
591                 break;
592
593 print:
594                 if (bytes < 0) {
595                         pr_err("read() failed: %m\n");
596                         break;
597                 }
598
599                 /*
600                  * Workaround: When a process had die and it has only
601                  * written to stderr, select() doesn't indicate that
602                  * there might be something to read in stderr fd. To
603                  * work around this issue, we try to read stderr just
604                  * in case in order to ensure everything gets read.
605                  */
606                 if (bytes == 0) {
607                         bytes = read(efd, rbuf, BUF_SIZE);
608                         is_stderr = 1;
609                         eof = 1;
610                 }
611
612                 sptr = eptr = rbuf;
613                 while (bytes--) {
614                         if (*eptr == '\n') {
615                                 *eptr = 0;
616                                 if (is_stderr)
617                                         pr_err("%s: stderr: %s\n",
618                                                 cmd, sptr);
619                                 else
620                                         pr_info("%s: stdout: %s\n",
621                                                 cmd, sptr);
622                                 sptr = eptr + 1;
623                         }
624                         eptr++;
625                 }
626         }
627
628         close(ofd);
629         close(efd);
630
631         clear_zombie(child);
632
633         return 0;
634 }
635
636 int register_event_handler(struct event_handler *handler, int op)
637 {
638         struct epoll_event ev;
639         int ret;
640         const char *str;
641
642         if (handler->fd <= 0) {
643                 pr_err("Invalid file descriptor of %d\n", handler->fd);
644                 return -1;
645         }
646
647         if (!handler->handle_event) {
648                 pr_err("Handler callback missing\n");
649                 return -1;
650         }
651
652         switch (op) {
653         case EPOLL_CTL_ADD:
654                 str = "register";
655                 break;
656
657         case EPOLL_CTL_MOD:
658                 str = "modify";
659                 break;
660
661         case EPOLL_CTL_DEL:
662                 str = "deregister";
663                 break;
664
665         default:
666                 pr_err("Invalid op %d\n", op);
667                 return -1;
668         }
669
670         pr_info("Doing a epoll %s for handler %s, fd %d\n", str,
671                 handler->name, handler->fd);
672
673         ev.data.fd = handler->fd;
674         ev.data.ptr = handler;
675         ev.events = handler->events;
676         ret = epoll_ctl(epoll_fd, op, handler->fd, &ev);
677         if (ret) {
678                 pr_err("Failed to do epoll_ctl %s: %m\n", str);
679                 return -1;
680         }
681
682         return 0;
683 }
684
685 void _mutex_lock_acquired(struct mutex *lock, char *file, int line)
686 {
687         lock->line = line;
688         lock->file = file;
689         pthread_getname_np(pthread_self(),
690                         lock->owner_name, sizeof(lock->owner_name));
691 }
692
693 int _mutex_lock(struct mutex *lock, char *file, int line)
694 {
695         int ret = 0;
696         int contended = 0;
697
698         if (!pthread_mutex_trylock(&lock->lock))
699                 goto out_lock;
700
701         contended = 1;
702         pr_debug("Lock contention at %s:%d on lock %s acquired by %s at %s:%d\n",
703                 file, line, lock->name,
704                 lock->owner_name, lock->file, lock->line);
705
706         ret = pthread_mutex_lock(&lock->lock);
707         if (ret)
708                 pr_err("Acquirin lock %s failed: %m, acquired on %s:%d\n",
709                         lock->name, lock->file, lock->line);
710
711 out_lock:
712         if (contended)
713                 pr_debug("Lock %s acquired at %s:%d after contention\n",
714                         lock->name, file, line);
715         _mutex_lock_acquired(lock, file, line);
716         return ret;
717 }
718
719 int _mutex_unlock(struct mutex *lock)
720 {
721         lock->line = 0;
722         lock->file = NULL;
723         pthread_mutex_unlock(&lock->lock);
724
725         return 0;
726 }