]> git.itanic.dy.fi Git - rrdd/blob - process.c
onewire_parser: Implement simultaneous reading
[rrdd] / process.c
1 #include <unistd.h>
2 #include <fcntl.h>
3 #include <sys/select.h>
4 #include <sys/epoll.h>
5 #include <stdio.h>
6 #include <sys/wait.h>
7 #include <sys/signalfd.h>
8 #include <sys/resource.h>
9
10 #include "process.h"
11 #include "debug.h"
12 #include "utils.h"
13
14 static int epoll_fd;
15 static unsigned int max_jobs;
16 static unsigned int max_jobs_pending;
17
18 struct work_struct {
19         const char *name;
20         int (*work_fn)(void *);
21         void *arg;
22         struct work_struct *next;
23 };
24
25 struct work_queue {
26         struct work_struct *work;
27         int length;
28         char *name;
29         struct mutex lock;
30 };
31
32 struct work_queue work_queues[WORK_PRIORITIES_NUM] = {
33         {
34                 .name = "high priority",
35                 .lock = {
36                         .name = "high_prio_queue",
37                         .lock = PTHREAD_MUTEX_INITIALIZER,
38                 },
39         },
40         {
41                 .name = "low priority",
42                 .lock = {
43                         .name = "low_prio_queue",
44                         .lock = PTHREAD_MUTEX_INITIALIZER,
45                 },
46         },
47 };
48
49 struct mutex work_stats_mutex = {
50         .name = "work_stats",
51         .lock = PTHREAD_MUTEX_INITIALIZER,
52 };
53 static int workers_active;
54 static int worker_count;
55
56 static int run_work_on_queue(struct work_queue *queue)
57 {
58         struct work_struct *work;
59
60         mutex_lock(&queue->lock);
61
62         if (!queue->work) {
63                 mutex_unlock(&queue->lock);
64                 return 0;
65         }
66
67         /* Take next work */
68         work = queue->work;
69         queue->work = work->next;
70         queue->length--;
71
72         mutex_unlock(&queue->lock);
73
74         pr_info("Executing work %s from queue %s, %d still pending\n",
75                 work->name, queue->name, queue->length);
76
77         work->work_fn(work->arg);
78         pr_info("Work %s done\n", work->name);
79         free(work);
80
81         return 1;
82 }
83
84 static void *worker_thread(void *arg)
85 {
86         int ret;
87         char name[16];
88
89         mutex_lock(&work_stats_mutex);
90         snprintf(name, sizeof(name), "worker%d", worker_count);
91         worker_count++;
92         mutex_unlock(&work_stats_mutex);
93
94         pthread_setname_np(pthread_self(), name);
95         pthread_detach(pthread_self());
96
97         pr_info("Worker started\n");
98
99         /* Execute all high priority work from the queue */
100         while (run_work_on_queue(&work_queues[WORK_PRIORITY_HIGH]))
101                 ;
102         /*
103          * All high priority work is now done, see if we have enough
104          * workers executing low priority worl. Continue from there if
105          * needed.
106          */
107         mutex_lock(&work_stats_mutex);
108         if (workers_active > max_jobs)
109                 goto out_unlock;
110
111         mutex_unlock(&work_stats_mutex);
112
113         /*
114          * Start executing the low priority work. Drop the nice value
115          * as this really is low priority stuff
116          */
117         ret = nice(19);
118         pr_info("Worker priority dropped to %d\n", ret);
119
120         while (run_work_on_queue(&work_queues[WORK_PRIORITY_LOW]))
121                 ;
122
123         /* All done, exit */
124         mutex_lock(&work_stats_mutex);
125 out_unlock:
126         workers_active--;
127         pr_info("Worker exiting, %d left active\n",
128                 workers_active);
129
130         /*
131          * Last exiting worker zeroes the worker_count. This
132          * ensures next time we start spawning worker threads
133          * the first thread will have number zero on its name.
134          */
135         if (!workers_active)
136                 worker_count = 0;
137
138         mutex_unlock(&work_stats_mutex);
139
140         return NULL;
141 }
142
143 int queue_work(unsigned int priority, char *name,
144         int (work_fn)(void *arg), void *arg)
145 {
146         pthread_t *thread;
147         struct work_queue *queue;
148         struct work_struct *work, *last_work;
149
150         if (priority >= WORK_PRIORITIES_NUM) {
151                 pr_err("Invalid priority: %d\n", priority);
152                 return -EINVAL;
153         }
154
155         work = calloc(sizeof(*work), 1);
156
157         work->name = name;
158         work->work_fn = work_fn;
159         work->arg = arg;
160
161         queue = &work_queues[priority];
162
163         /* Insert new work at the end of the work queue */
164         mutex_lock(&queue->lock);
165
166         last_work = queue->work;
167         while (last_work && last_work->next)
168                 last_work = last_work->next;
169
170         if (!last_work)
171                 queue->work = work;
172         else
173                 last_work->next = work;
174
175         pr_info("Inserted work %s in queue %s, with %d pending items\n",
176                 work->name, queue->name, queue->length);
177         queue->length++;
178         mutex_unlock(&queue->lock);
179
180         mutex_lock(&work_stats_mutex);
181         pr_info("workers_active: %d, priority: %d\n", workers_active, priority);
182         if (priority != WORK_PRIORITY_HIGH && workers_active >= max_jobs) {
183                 mutex_unlock(&work_stats_mutex);
184                 return 0;
185         }
186         workers_active++;
187         mutex_unlock(&work_stats_mutex);
188
189         pr_info("Creating new worker thread\n");
190         /* We need a worker thread, create one */
191         thread = calloc(sizeof(*thread), 1);
192         pthread_create(thread, NULL, worker_thread, NULL);
193
194         free(thread);
195
196         return 0;
197 }
198
199 /*
200  * Initialize the jobcontrol.
201  *
202  * Create the pipes that are used to grant children execution
203  * permissions. If max_jobs is zero, count the number of CPUs from
204  * /proc/cpuinfo and use that.
205  */
206 int init_jobcontrol(int max_jobs_requested)
207 {
208         FILE *file;
209         int ret;
210         char buf[256];
211         char match[8];
212
213         epoll_fd = epoll_create(1);
214         if (epoll_fd == -1) {
215                 pr_err("Failed to epoll_create(): %m\n");
216                 return -1;
217         }
218
219         if (max_jobs_requested > 0) {
220                 max_jobs = max_jobs_requested;
221                 goto no_count_cpus;
222         }
223         max_jobs++;
224
225         file = fopen("/proc/cpuinfo", "ro");
226         if (!file) {
227                 pr_err("Failed to open /proc/cpuinfo: %m\n");
228                 goto open_fail;
229         }
230
231         /*
232          * The CPU count algorithm simply reads the first 8 bytes from
233          * the /proc/cpuinfo and then expects that line to be there as
234          * many times as there are CPUs.
235          */
236         ret = fread(match, 1, sizeof(match), file);
237         if (ret < sizeof(match)) {
238                 pr_err("read %d bytes when expecting %zd %m\n",
239                         ret, sizeof(match));
240                 goto read_fail;
241         }
242
243         while(fgets(buf, sizeof(buf), file)) {
244                 if (!strncmp(buf, match, sizeof(match)))
245                         max_jobs++;
246         }
247
248 open_fail:
249 read_fail:
250         fclose(file);
251
252 no_count_cpus:
253         pr_info("Set maximum number of parallel jobs to %d\n", max_jobs);
254
255         max_jobs_pending = max_jobs * 10 + 25;
256         pr_info("Set maximum number of pending jobs to %d\n", max_jobs_pending);
257
258         return 0;
259 }
260
261 int poll_job_requests(int timeout)
262 {
263         struct epoll_event event;
264         struct event_handler *job_handler;
265         int ret;
266
267         /* Convert positive seconds to milliseconds */
268         timeout = timeout > 0 ? 1000 * timeout : timeout;
269
270         ret = epoll_wait(epoll_fd, &event, 1, timeout);
271
272         if (ret == -1) {
273                 if (errno != EINTR) {
274                         pr_err("epoll_wait: %m\n");
275                         return -1;
276                 }
277
278                 /*
279                  * If epoll_wait() was interrupted, better start
280                  * everything again from the beginning
281                  */
282                 return 0;
283         }
284
285         if (ret == 0) {
286                 pr_info("Timed out\n");
287                 goto out;
288         }
289
290         job_handler = event.data.ptr;
291
292         if (!job_handler || !job_handler->handle_event) {
293                 pr_err("Corrupted event handler for fd %d\n",
294                         event.data.fd);
295                 goto out;
296         }
297
298         pr_debug("Running handler %s to handle events from fd %d\n",
299                 job_handler->name, job_handler->fd);
300         job_handler->handle_event(job_handler);
301
302 out:
303         pr_info("Workers active: %u\n", workers_active);
304         return ret;
305 }
306
307 int do_fork(void)
308 {
309         int child;
310         child = fork();
311         if (child < 0) {
312                 pr_err("fork() failed: %m\n");
313                 return -1;
314         }
315
316         if (child) {
317                 pr_debug("Fork child %d\n", child);
318                 return child;
319         }
320
321         return 0;
322 }
323
324 int clear_zombie(int pid)
325 {
326         int status;
327         struct rusage rusage;
328         char *status_str = NULL;
329         int code = 0;
330
331         if (pid)
332                 pr_debug("Waiting on pid %d\n", pid);
333
334         do {
335                 pid = wait4(pid, &status, 0, &rusage);
336                 if (pid < 0) {
337                         pr_err("Error on waitid(): %m\n");
338                         return 0;
339                 }
340                 /* Wait until the child has become a zombie */
341         } while (!WIFEXITED(status) && !WIFSIGNALED(status));
342
343         if (WIFEXITED(status)) {
344                 status_str = "exited with status";
345                 code = WEXITSTATUS(status);
346         } else if (WIFSIGNALED(status)) {
347                 status_str = "killed by signal";
348                 code = WTERMSIG(status);
349         }
350         pr_debug("pid %d: %s %d.\n", pid,
351                 status_str, code);
352         pr_debug("pid %d: User time: %ld.%03lds, System %ld.%03lds\n", pid,
353                 (long)rusage.ru_utime.tv_sec, rusage.ru_utime.tv_usec / 1000,
354                 (long)rusage.ru_stime.tv_sec, rusage.ru_stime.tv_usec / 1000);
355
356         return 1;
357 }
358
359 /*
360  * Runs a command cmd with params argv, connects stdin and stdout to
361  * readfd and writefd
362  *
363  * Returns the pid of the executed process
364  */
365 int run_piped(const char *cmd, char *const argv[],
366               int *stdinfd, int *stdoutfd, int *stderrfd)
367 {
368         int ifd[2], ofd[2], efd[2], pid;
369
370         pr_info("Running command %s\n", cmd);
371
372         if (stdinfd && pipe(ifd)) {
373                 pr_err("pipe() failed: %m\n");
374                 return -1;
375         }
376
377         if (stdoutfd && pipe(ofd)) {
378                 pr_err("pipe() failed: %m\n");
379                 return -1;
380         }
381
382         if (stderrfd && pipe(efd)) {
383                 pr_err("pipe() failed: %m\n");
384                 return -1;
385         }
386
387         pid = do_fork();
388         if (pid) { /* Parent side */
389                 if (stdinfd) {
390                         close(ifd[0]);
391                         *stdinfd = ifd[0];
392                 }
393
394                 if (stdoutfd) {
395                         close(ofd[1]);
396                         *stdoutfd = ofd[0];
397                 }
398
399                 if (stderrfd) {
400                         close(efd[1]);
401                         *stderrfd = efd[0];
402                 }
403
404                 return pid;
405         }
406
407         if (stdinfd) {
408                 close(ifd[1]);
409                 dup2(ifd[0], STDIN_FILENO);
410         }
411
412         if (stdoutfd) {
413                 close(ofd[0]);
414                 dup2(ofd[1], STDOUT_FILENO);
415         }
416
417         if (stderrfd) {
418                 close(efd[0]);
419                 dup2(efd[1], STDERR_FILENO);
420         }
421
422         /* Now we have redirected standard streams to parent process */
423         execvp(cmd, argv);
424         pr_err("Failed to execv command %s: %m\n", cmd);
425         exit(1);
426
427         return 0;
428 }
429
430 /*
431  * Runs a command cmd with params argv, connects stdin and stdout to
432  * readfd and writefd
433  *
434  * Returns the pid of the executed process
435  */
436 int run_piped_stream(const char *cmd, char *const argv[],
437                      FILE **stdinf, FILE **stdoutf, FILE **stderrf)
438 {
439         int ifd, ofd, efd, pid;
440         int *i, *o, *e;
441
442         if (stdinf)
443                 i = &ifd;
444         else
445                 i = 0;
446         if (stdoutf)
447                 o = &ofd;
448         else
449                 o = 0;
450         if (stderrf)
451                 e = &efd;
452         else
453                 e = 0;
454
455         pid = run_piped(cmd, argv, i, o, e);
456
457         if (stdinf) {
458                 *stdinf = fdopen(ifd, "r");
459                 if (*stdinf == NULL) {
460                         pr_err("Error opening file stream for fd %d: %m\n",
461                                ifd);
462                         return -1;
463                 }
464         }
465
466         if (stdoutf) {
467                 *stdoutf = fdopen(ofd, "r");
468                 if (*stdoutf == NULL) {
469                         pr_err("Error opening file stream for fd %d: %m\n",
470                                ofd);
471                         return -1;
472                 }
473         }
474
475         if (stderrf) {
476                 *stderrf = fdopen(efd, "r");
477                 if (*stderrf == NULL) {
478                         pr_err("Error opening file stream for fd %d: %m\n",
479                                efd);
480                         return -1;
481                 }
482         }
483
484         return pid;
485 }
486
487 /*
488  * Forks a child and executes a command to run on parallel
489  */
490
491 #define BUF_SIZE (128*1024)
492 int run(const char *cmd, char *const argv[])
493 {
494         int child, error;
495         int ofd, efd;
496         fd_set rfds;
497         int maxfd;
498         int eof = 0;
499
500         child = run_piped(cmd, argv, NULL, &ofd, &efd);
501
502         FD_ZERO(&rfds);
503         FD_SET(ofd, &rfds);
504         FD_SET(efd, &rfds);
505
506         while (!eof) {
507                 char *sptr , *eptr;
508                 char rbuf[BUF_SIZE];
509                 int bytes;
510                 int is_stderr = 0;
511
512                 maxfd = max(ofd, efd);
513                 error = select(maxfd, &rfds, NULL, NULL, NULL);
514
515                 if (error < 0) {
516                         pr_err("Error with select: %m\n");
517                         break;
518                 }
519
520                 if (FD_ISSET(ofd, &rfds)) {
521                         bytes = read(ofd, rbuf, BUF_SIZE);
522                         goto print;
523                 }
524
525                 if (FD_ISSET(efd, &rfds)) {
526                         is_stderr = 1;
527                         bytes = read(efd, rbuf, BUF_SIZE);
528                         goto print;
529                 }
530
531                 pr_err("select() returned unknown fd\n");
532                 break;
533
534 print:
535                 if (bytes < 0) {
536                         pr_err("read() failed: %m\n");
537                         break;
538                 }
539
540                 /*
541                  * Workaround: When a process had die and it has only
542                  * written to stderr, select() doesn't indicate that
543                  * there might be something to read in stderr fd. To
544                  * work around this issue, we try to read stderr just
545                  * in case in order to ensure everything gets read.
546                  */
547                 if (bytes == 0) {
548                         bytes = read(efd, rbuf, BUF_SIZE);
549                         is_stderr = 1;
550                         eof = 1;
551                 }
552
553                 sptr = eptr = rbuf;
554                 while (bytes--) {
555                         if (*eptr == '\n') {
556                                 *eptr = 0;
557                                 if (is_stderr)
558                                         pr_err("%s: stderr: %s\n",
559                                                 cmd, sptr);
560                                 else
561                                         pr_info("%s: stdout: %s\n",
562                                                 cmd, sptr);
563                                 sptr = eptr + 1;
564                         }
565                         eptr++;
566                 }
567         }
568
569         close(ofd);
570         close(efd);
571
572         clear_zombie(child);
573
574         return 0;
575 }
576
577 int register_event_handler(struct event_handler *handler)
578 {
579         struct epoll_event ev;
580         int ret;
581
582         if (handler->fd <= 0) {
583                 pr_err("Invalid file descriptor of %d\n", handler->fd);
584                 return -1;
585         }
586
587         if (!handler->handle_event) {
588                 pr_err("Handler callback missing\n");
589                 return -1;
590         }
591
592         pr_info("Registering handler for %s, fd %d\n",
593                 handler->name, handler->fd);
594
595         ev.data.fd = handler->fd;
596         ev.data.ptr = handler;
597         ev.events = handler->events;
598         ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, handler->fd, &ev);
599         if (ret) {
600                 pr_err("Failed to add epoll_fd: %m\n");
601                 return -1;
602         }
603
604         return 0;
605 }
606
607 void _mutex_lock_acquired(struct mutex *lock, char *file, int line)
608 {
609         lock->line = line;
610         lock->file = file;
611         pthread_getname_np(pthread_self(),
612                         lock->owner_name, sizeof(lock->owner_name));
613 }
614
615 int _mutex_lock(struct mutex *lock, char *file, int line)
616 {
617         int ret = 0;
618         int contended = 0;
619
620         if (!pthread_mutex_trylock(&lock->lock))
621                 goto out_lock;
622
623         contended = 1;
624         pr_info("Lock contention at %s:%d on lock %s acquired by %s at %s:%d\n",
625                 file, line, lock->name,
626                 lock->owner_name, lock->file, lock->line);
627
628         ret = pthread_mutex_lock(&lock->lock);
629         if (ret)
630                 pr_err("Acquirin lock %s failed: %m, acquired on %s:%d\n",
631                         lock->name, lock->file, lock->line);
632
633 out_lock:
634         if (contended)
635                 pr_info("Lock %s acquired at %s:%d after contention\n",
636                         lock->name, file, line);
637         _mutex_lock_acquired(lock, file, line);
638         return ret;
639 }
640
641 int _mutex_unlock(struct mutex *lock)
642 {
643         lock->line = 0;
644         lock->file = NULL;
645         pthread_mutex_unlock(&lock->lock);
646
647         return 0;
648 }