]> git.itanic.dy.fi Git - rrdd/blob - process.c
onewire_parser.c: Fix compiler warnings about string lengths
[rrdd] / process.c
1 #include <unistd.h>
2 #include <fcntl.h>
3 #include <sys/select.h>
4 #include <sys/epoll.h>
5 #include <stdio.h>
6 #include <sys/wait.h>
7 #include <sys/signalfd.h>
8 #include <sys/resource.h>
9
10 #include "process.h"
11 #include "debug.h"
12 #include "utils.h"
13
14 static int epoll_fd;
15 static unsigned int max_jobs;
16 static unsigned int max_jobs_pending;
17
18 struct work_struct {
19         const char *name;
20         int (*work_fn)(void *);
21         void *arg;
22         struct work_struct *next;
23 };
24
25 struct work_queue {
26         struct work_struct *work;
27         int length;
28         char *name;
29         struct mutex lock;
30 };
31
32 struct work_queue work_queues[WORK_PRIORITIES_NUM] = {
33         {
34                 .name = "high priority",
35                 .lock = {
36                         .name = "high_prio_queue",
37                         .lock = PTHREAD_MUTEX_INITIALIZER,
38                 },
39         },
40         {
41                 .name = "low priority",
42                 .lock = {
43                         .name = "low_prio_queue",
44                         .lock = PTHREAD_MUTEX_INITIALIZER,
45                 },
46         },
47 };
48
49 struct mutex work_stats_mutex = {
50         .name = "work_stats",
51         .lock = PTHREAD_MUTEX_INITIALIZER,
52 };
53 static int workers_active;
54 static int worker_count;
55
56 static int job_notify_fd[2];
57
58 static int run_work_on_queue(struct work_queue *queue)
59 {
60         struct work_struct *work;
61
62         mutex_lock(&queue->lock);
63
64         if (!queue->work) {
65                 mutex_unlock(&queue->lock);
66                 return 0;
67         }
68
69         /* Take next work */
70         work = queue->work;
71         queue->work = work->next;
72         queue->length--;
73
74         mutex_unlock(&queue->lock);
75
76         pr_info("Executing work %s from queue %s, %d still pending\n",
77                 work->name, queue->name, queue->length);
78
79         work->work_fn(work->arg);
80         pr_info("Work %s done\n", work->name);
81         free(work);
82
83         return 1;
84 }
85
86 static void *worker_thread(void *arg)
87 {
88         int ret;
89         char name[16];
90
91         mutex_lock(&work_stats_mutex);
92         snprintf(name, sizeof(name), "worker%d", worker_count);
93         worker_count++;
94         mutex_unlock(&work_stats_mutex);
95
96         pthread_setname_np(pthread_self(), name);
97         pthread_detach(pthread_self());
98
99         pr_info("Worker started\n");
100
101         /* Execute all high priority work from the queue */
102         while (run_work_on_queue(&work_queues[WORK_PRIORITY_HIGH]))
103                 ;
104         /*
105          * All high priority work is now done, see if we have enough
106          * workers executing low priority worl. Continue from there if
107          * needed.
108          */
109         mutex_lock(&work_stats_mutex);
110         if (workers_active > max_jobs)
111                 goto out_unlock;
112
113         mutex_unlock(&work_stats_mutex);
114
115         /*
116          * Start executing the low priority work. Drop the nice value
117          * as this really is low priority stuff
118          */
119         ret = nice(19);
120         pr_info("Worker priority dropped to %d\n", ret);
121
122         while (run_work_on_queue(&work_queues[WORK_PRIORITY_LOW]))
123                 ;
124
125         /* All done, exit */
126         mutex_lock(&work_stats_mutex);
127 out_unlock:
128         workers_active--;
129         pr_info("Worker exiting, %d left active\n",
130                 workers_active);
131
132         /*
133          * Last exiting worker zeroes the worker_count. This
134          * ensures next time we start spawning worker threads
135          * the first thread will have number zero on its name.
136          */
137         if (!workers_active)
138                 worker_count = 0;
139
140         /*
141          * Kick the job poller. If all jobs were active at this point
142          * the scheduler thread will wait indefinitely until someone
143          * tells it to do something. We may now know when next job is
144          * available, so it is better for the scheduler to recalculate
145          * its sleep time.
146          */
147         notify_job_request();
148
149         mutex_unlock(&work_stats_mutex);
150
151         return NULL;
152 }
153
154 int queue_work(unsigned int priority, char *name,
155         int (work_fn)(void *arg), void *arg)
156 {
157         pthread_t *thread;
158         struct work_queue *queue;
159         struct work_struct *work, *last_work;
160
161         if (priority >= WORK_PRIORITIES_NUM) {
162                 pr_err("Invalid priority: %d\n", priority);
163                 return -EINVAL;
164         }
165
166         work = calloc(sizeof(*work), 1);
167
168         work->name = name;
169         work->work_fn = work_fn;
170         work->arg = arg;
171
172         queue = &work_queues[priority];
173
174         /* Insert new work at the end of the work queue */
175         mutex_lock(&queue->lock);
176
177         last_work = queue->work;
178         while (last_work && last_work->next)
179                 last_work = last_work->next;
180
181         if (!last_work)
182                 queue->work = work;
183         else
184                 last_work->next = work;
185
186         pr_info("Inserted work %s in queue %s, with %d pending items\n",
187                 work->name, queue->name, queue->length);
188         queue->length++;
189         mutex_unlock(&queue->lock);
190
191         mutex_lock(&work_stats_mutex);
192         pr_info("workers_active: %d, priority: %d\n", workers_active, priority);
193         if (priority != WORK_PRIORITY_HIGH && workers_active >= max_jobs) {
194                 mutex_unlock(&work_stats_mutex);
195                 return 0;
196         }
197         workers_active++;
198         mutex_unlock(&work_stats_mutex);
199
200         pr_info("Creating new worker thread\n");
201         /* We need a worker thread, create one */
202         thread = calloc(sizeof(*thread), 1);
203         pthread_create(thread, NULL, worker_thread, NULL);
204
205         free(thread);
206
207         return 0;
208 }
209
210 static int job_notify_handler(struct event_handler *h)
211 {
212         int ret;
213         char buf[64];
214
215         ret = read(job_notify_fd[0], buf, sizeof(buf));
216         if (ret < 0)
217                 pr_err("Failed to read: %m\n");
218
219         return 0;
220 }
221
222 /*
223  * Initialize the jobcontrol.
224  *
225  * Create the pipes that are used to grant children execution
226  * permissions. If max_jobs is zero, count the number of CPUs from
227  * /proc/cpuinfo and use that.
228  */
229 int init_jobcontrol(int max_jobs_requested)
230 {
231         static struct event_handler ev;
232         FILE *file;
233         int ret;
234         char buf[256];
235         char match[8];
236
237         epoll_fd = epoll_create(1);
238         if (epoll_fd == -1) {
239                 pr_err("Failed to epoll_create(): %m\n");
240                 return -1;
241         }
242
243         if (max_jobs_requested > 0) {
244                 max_jobs = max_jobs_requested;
245                 goto no_count_cpus;
246         }
247         max_jobs++;
248
249         file = fopen("/proc/cpuinfo", "ro");
250         if (!file) {
251                 pr_err("Failed to open /proc/cpuinfo: %m\n");
252                 goto open_fail;
253         }
254
255         /*
256          * The CPU count algorithm simply reads the first 8 bytes from
257          * the /proc/cpuinfo and then expects that line to be there as
258          * many times as there are CPUs.
259          */
260         ret = fread(match, 1, sizeof(match), file);
261         if (ret < sizeof(match)) {
262                 pr_err("read %d bytes when expecting %zd %m\n",
263                         ret, sizeof(match));
264                 goto read_fail;
265         }
266
267         while(fgets(buf, sizeof(buf), file)) {
268                 if (!strncmp(buf, match, sizeof(match)))
269                         max_jobs++;
270         }
271
272         ret = pipe(job_notify_fd);
273         if (ret) {
274                 pr_err("pipe() failed: %m\n");
275                 return ret;
276         }
277
278         ev.fd = job_notify_fd[0];
279         ev.events = EPOLLIN;
280         ev.handle_event = job_notify_handler;
281         ev.name = "job_notify";
282         register_event_handler(&ev, EPOLL_CTL_ADD);
283
284 open_fail:
285 read_fail:
286         fclose(file);
287
288 no_count_cpus:
289         pr_info("Set maximum number of parallel jobs to %d\n", max_jobs);
290
291         max_jobs_pending = max_jobs * 10 + 25;
292         pr_info("Set maximum number of pending jobs to %d\n", max_jobs_pending);
293
294         return 0;
295 }
296
297 int poll_job_requests(int timeout)
298 {
299         struct epoll_event event;
300         struct event_handler *job_handler;
301         int ret;
302
303         /* Convert positive seconds to milliseconds */
304         timeout = timeout > 0 ? 1000 * timeout : timeout;
305
306         ret = epoll_wait(epoll_fd, &event, 1, timeout);
307
308         if (ret == -1) {
309                 if (errno != EINTR) {
310                         pr_err("epoll_wait: %m\n");
311                         return -1;
312                 }
313
314                 /*
315                  * If epoll_wait() was interrupted, better start
316                  * everything again from the beginning
317                  */
318                 return 0;
319         }
320
321         if (ret == 0) {
322                 pr_info("Timed out\n");
323                 goto out;
324         }
325
326         job_handler = event.data.ptr;
327
328         if (!job_handler || !job_handler->handle_event) {
329                 pr_err("Corrupted event handler for fd %d\n",
330                         event.data.fd);
331                 goto out;
332         }
333
334         pr_debug("Running handler %s to handle events from fd %d\n",
335                 job_handler->name, job_handler->fd);
336         job_handler->handle_event(job_handler);
337
338 out:
339         pr_info("Workers active: %u\n", workers_active);
340         return ret;
341 }
342
343 int notify_job_request(void)
344 {
345         int ret;
346         char byte = 0;
347
348         ret = write(job_notify_fd[1], &byte, sizeof(byte));
349         if (ret < 0)
350                 pr_err("Failed to write: %m\n");
351
352         return 0;
353 }
354
355 int do_fork(void)
356 {
357         int child;
358         child = fork();
359         if (child < 0) {
360                 pr_err("fork() failed: %m\n");
361                 return -1;
362         }
363
364         if (child) {
365                 pr_debug("Fork child %d\n", child);
366                 return child;
367         }
368
369         return 0;
370 }
371
372 int clear_zombie(int pid)
373 {
374         int status;
375         struct rusage rusage;
376         char *status_str = NULL;
377         int code = 0;
378
379         if (pid)
380                 pr_debug("Waiting on pid %d\n", pid);
381
382         do {
383                 pid = wait4(pid, &status, 0, &rusage);
384                 if (pid < 0) {
385                         pr_err("Error on waitid(): %m\n");
386                         return 0;
387                 }
388                 /* Wait until the child has become a zombie */
389         } while (!WIFEXITED(status) && !WIFSIGNALED(status));
390
391         if (WIFEXITED(status)) {
392                 status_str = "exited with status";
393                 code = WEXITSTATUS(status);
394         } else if (WIFSIGNALED(status)) {
395                 status_str = "killed by signal";
396                 code = WTERMSIG(status);
397         }
398         pr_debug("pid %d: %s %d.\n", pid,
399                 status_str, code);
400         pr_debug("pid %d: User time: %ld.%03lds, System %ld.%03lds\n", pid,
401                 (long)rusage.ru_utime.tv_sec, rusage.ru_utime.tv_usec / 1000,
402                 (long)rusage.ru_stime.tv_sec, rusage.ru_stime.tv_usec / 1000);
403
404         return 1;
405 }
406
407 /*
408  * Runs a command cmd with params argv, connects stdin and stdout to
409  * readfd and writefd
410  *
411  * Returns the pid of the executed process
412  */
413 int run_piped(const char *cmd, char *const argv[],
414               int *stdinfd, int *stdoutfd, int *stderrfd)
415 {
416         int ifd[2], ofd[2], efd[2], pid;
417
418         pr_info("Running command %s\n", cmd);
419
420         if (stdinfd && pipe(ifd)) {
421                 pr_err("pipe() failed: %m\n");
422                 return -1;
423         }
424
425         if (stdoutfd && pipe(ofd)) {
426                 pr_err("pipe() failed: %m\n");
427                 goto close_ifd;
428         }
429
430         if (stderrfd && pipe(efd)) {
431                 pr_err("pipe() failed: %m\n");
432                 goto close_ofd;
433         }
434
435         pid = do_fork();
436         if (pid) { /* Parent side */
437                 if (stdinfd) {
438                         close(ifd[0]);
439                         *stdinfd = ifd[0];
440                 }
441
442                 if (stdoutfd) {
443                         close(ofd[1]);
444                         *stdoutfd = ofd[0];
445                 }
446
447                 if (stderrfd) {
448                         close(efd[1]);
449                         *stderrfd = efd[0];
450                 }
451
452                 return pid;
453         }
454
455         if (stdinfd) {
456                 close(ifd[1]);
457                 dup2(ifd[0], STDIN_FILENO);
458         }
459
460         if (stdoutfd) {
461                 close(ofd[0]);
462                 dup2(ofd[1], STDOUT_FILENO);
463         }
464
465         if (stderrfd) {
466                 close(efd[0]);
467                 dup2(efd[1], STDERR_FILENO);
468         }
469
470         /* Now we have redirected standard streams to parent process */
471         execvp(cmd, argv);
472         pr_err("Failed to execv command %s: %m\n", cmd);
473         exit(1);
474
475         return 0;
476
477 close_ofd:
478         if (stdoutfd) {
479                 close(ofd[0]);
480                 close(ofd[1]);
481         }
482 close_ifd:
483         if (stdinfd) {
484                 close(ifd[0]);
485                 close(ifd[1]);
486         }
487
488         return -1;
489 }
490
491 /*
492  * Runs a command cmd with params argv, connects stdin and stdout to
493  * readfd and writefd
494  *
495  * Returns the pid of the executed process
496  */
497 int run_piped_stream(const char *cmd, char *const argv[],
498                      FILE **stdinf, FILE **stdoutf, FILE **stderrf)
499 {
500         int ifd, ofd, efd, pid;
501         int *i, *o, *e;
502
503         if (stdinf)
504                 i = &ifd;
505         else
506                 i = 0;
507         if (stdoutf)
508                 o = &ofd;
509         else
510                 o = 0;
511         if (stderrf)
512                 e = &efd;
513         else
514                 e = 0;
515
516         pid = run_piped(cmd, argv, i, o, e);
517
518         if (stdinf) {
519                 *stdinf = fdopen(ifd, "r");
520                 if (*stdinf == NULL) {
521                         pr_err("Error opening file stream for fd %d: %m\n",
522                                ifd);
523                         return -1;
524                 }
525         }
526
527         if (stdoutf) {
528                 *stdoutf = fdopen(ofd, "w");
529                 if (*stdoutf == NULL) {
530                         pr_err("Error opening file stream for fd %d: %m\n",
531                                ofd);
532                         return -1;
533                 }
534         }
535
536         if (stderrf) {
537                 *stderrf = fdopen(efd, "r");
538                 if (*stderrf == NULL) {
539                         pr_err("Error opening file stream for fd %d: %m\n",
540                                efd);
541                         return -1;
542                 }
543         }
544
545         return pid;
546 }
547
548 /*
549  * Forks a child and executes a command to run on parallel
550  */
551
552 #define BUF_SIZE (128*1024)
553 int run(const char *cmd, char *const argv[])
554 {
555         int child, error;
556         int ofd, efd;
557         fd_set rfds;
558         int maxfd;
559         int eof = 0;
560
561         child = run_piped(cmd, argv, NULL, &ofd, &efd);
562
563         FD_ZERO(&rfds);
564         FD_SET(ofd, &rfds);
565         FD_SET(efd, &rfds);
566
567         while (!eof) {
568                 char *sptr , *eptr;
569                 char rbuf[BUF_SIZE];
570                 int bytes;
571                 int is_stderr = 0;
572
573                 maxfd = max(ofd, efd);
574                 error = select(maxfd, &rfds, NULL, NULL, NULL);
575
576                 if (error < 0) {
577                         pr_err("Error with select: %m\n");
578                         break;
579                 }
580
581                 if (FD_ISSET(ofd, &rfds)) {
582                         bytes = read(ofd, rbuf, BUF_SIZE);
583                         goto print;
584                 }
585
586                 if (FD_ISSET(efd, &rfds)) {
587                         is_stderr = 1;
588                         bytes = read(efd, rbuf, BUF_SIZE);
589                         goto print;
590                 }
591
592                 pr_err("select() returned unknown fd\n");
593                 break;
594
595 print:
596                 if (bytes < 0) {
597                         pr_err("read() failed: %m\n");
598                         break;
599                 }
600
601                 /*
602                  * Workaround: When a process had die and it has only
603                  * written to stderr, select() doesn't indicate that
604                  * there might be something to read in stderr fd. To
605                  * work around this issue, we try to read stderr just
606                  * in case in order to ensure everything gets read.
607                  */
608                 if (bytes == 0) {
609                         bytes = read(efd, rbuf, BUF_SIZE);
610                         is_stderr = 1;
611                         eof = 1;
612                 }
613
614                 sptr = eptr = rbuf;
615                 while (bytes--) {
616                         if (*eptr == '\n') {
617                                 *eptr = 0;
618                                 if (is_stderr)
619                                         pr_err("%s: stderr: %s\n",
620                                                 cmd, sptr);
621                                 else
622                                         pr_info("%s: stdout: %s\n",
623                                                 cmd, sptr);
624                                 sptr = eptr + 1;
625                         }
626                         eptr++;
627                 }
628         }
629
630         close(ofd);
631         close(efd);
632
633         clear_zombie(child);
634
635         return 0;
636 }
637
638 int register_event_handler(struct event_handler *handler, int op)
639 {
640         struct epoll_event ev;
641         int ret;
642         const char *str;
643
644         if (handler->fd <= 0) {
645                 pr_err("Invalid file descriptor of %d\n", handler->fd);
646                 return -1;
647         }
648
649         if (!handler->handle_event) {
650                 pr_err("Handler callback missing\n");
651                 return -1;
652         }
653
654         switch (op) {
655         case EPOLL_CTL_ADD:
656                 str = "register";
657                 break;
658
659         case EPOLL_CTL_MOD:
660                 str = "modify";
661                 break;
662
663         case EPOLL_CTL_DEL:
664                 str = "deregister";
665                 break;
666
667         default:
668                 pr_err("Invalid op %d\n", op);
669                 return -1;
670         }
671
672         pr_info("Doing a epoll %s for handler %s, fd %d\n", str,
673                 handler->name, handler->fd);
674
675         ev.data.fd = handler->fd;
676         ev.data.ptr = handler;
677         ev.events = handler->events;
678         ret = epoll_ctl(epoll_fd, op, handler->fd, &ev);
679         if (ret) {
680                 pr_err("Failed to do epoll_ctl %s: %m\n", str);
681                 return -1;
682         }
683
684         return 0;
685 }
686
687 void _mutex_lock_acquired(struct mutex *lock, char *file, int line)
688 {
689         lock->line = line;
690         lock->file = file;
691         pthread_getname_np(pthread_self(),
692                         lock->owner_name, sizeof(lock->owner_name));
693 }
694
695 int _mutex_lock(struct mutex *lock, char *file, int line)
696 {
697         int ret = 0;
698         int contended = 0;
699
700         if (!pthread_mutex_trylock(&lock->lock))
701                 goto out_lock;
702
703         contended = 1;
704         pr_debug("Lock contention at %s:%d on lock %s acquired by %s at %s:%d\n",
705                 file, line, lock->name,
706                 lock->owner_name, lock->file, lock->line);
707
708         ret = pthread_mutex_lock(&lock->lock);
709         if (ret)
710                 pr_err("Acquirin lock %s failed: %m, acquired on %s:%d\n",
711                         lock->name, lock->file, lock->line);
712
713 out_lock:
714         if (contended)
715                 pr_debug("Lock %s acquired at %s:%d after contention\n",
716                         lock->name, file, line);
717         _mutex_lock_acquired(lock, file, line);
718         return ret;
719 }
720
721 int _mutex_unlock(struct mutex *lock)
722 {
723         lock->line = 0;
724         lock->file = NULL;
725         pthread_mutex_unlock(&lock->lock);
726
727         return 0;
728 }