]> git.itanic.dy.fi Git - rrdd/blob - process.c
Allow parsers to store private data to databases
[rrdd] / process.c
1 #include <unistd.h>
2 #include <fcntl.h>
3 #include <sys/select.h>
4 #include <sys/epoll.h>
5 #include <stdio.h>
6 #include <sys/wait.h>
7 #include <sys/signalfd.h>
8 #include <sys/resource.h>
9
10 #include "process.h"
11 #include "debug.h"
12
13 static int epoll_fd;
14 static unsigned int max_jobs;
15 static unsigned int max_jobs_pending;
16
17 struct work_struct {
18         const char *name;
19         int (*work_fn)(void *);
20         void *arg;
21         struct work_struct *next;
22 };
23
24 struct work_queue {
25         struct work_struct *work;
26         int length;
27         char *name;
28         struct mutex lock;
29 };
30
31 struct work_queue work_queues[WORK_PRIORITIES_NUM] = {
32         {
33                 .name = "high priority",
34                 .lock = {
35                         .name = "high_prio_queue",
36                         .lock = PTHREAD_MUTEX_INITIALIZER,
37                 },
38         },
39         {
40                 .name = "low priority",
41                 .lock = {
42                         .name = "low_prio_queue",
43                         .lock = PTHREAD_MUTEX_INITIALIZER,
44                 },
45         },
46 };
47
48 struct mutex work_stats_mutex = {
49         .name = "work_stats",
50         .lock = PTHREAD_MUTEX_INITIALIZER,
51 };
52 static int workers_active;
53 static int worker_count;
54
55 static int run_work_on_queue(struct work_queue *queue)
56 {
57         struct work_struct *work;
58
59         mutex_lock(&queue->lock);
60
61         if (!queue->work) {
62                 mutex_unlock(&queue->lock);
63                 return 0;
64         }
65
66         /* Take next work */
67         work = queue->work;
68         queue->work = work->next;
69         queue->length--;
70
71         mutex_unlock(&queue->lock);
72
73         pr_info("Executing work %s from queue %s, %d still pending\n",
74                 work->name, queue->name, queue->length);
75
76         work->work_fn(work->arg);
77         pr_info("Work %s done\n", work->name);
78         free(work);
79
80         return 1;
81 }
82
83 static void *worker_thread(void *arg)
84 {
85         int stop_working = 0;
86         int work_done = 0;
87         char name[16];
88
89         mutex_lock(&work_stats_mutex);
90         snprintf(name, sizeof(name), "worker%d", worker_count);
91         worker_count++;
92         mutex_unlock(&work_stats_mutex);
93
94         pthread_setname_np(pthread_self(), name);
95         pthread_detach(pthread_self());
96
97         pr_info("Worker started\n");
98
99         while (!stop_working) {
100                 while (1) {
101                         int prio;
102
103                         /*
104                          * Execute as much work from the high priority
105                          * queue as possible. Once there are no more
106                          * high prio work left, break out the loop and
107                          * see if we still need this many workers.
108                          */
109                         for (prio = 0; prio < WORK_PRIORITIES_NUM; prio++) {
110                                 work_done =
111                                         run_work_on_queue(&work_queues[prio]);
112                                 if (work_done)
113                                         break;
114                         }
115
116                         if (!work_done || prio != WORK_PRIORITY_HIGH)
117                                 break;
118                 }
119
120                 mutex_lock(&work_stats_mutex);
121                 if (workers_active > max_jobs || !work_done) {
122                         workers_active--;
123                         pr_info("Worker exiting, %d left active\n",
124                                 workers_active);
125                         if (!workers_active)
126                                 worker_count = 0;
127                         stop_working = 1;
128                 }
129                 mutex_unlock(&work_stats_mutex);
130         }
131
132         return NULL;
133 }
134
135 int queue_work(unsigned int priority, char *name,
136         int (work_fn)(void *arg), void *arg)
137 {
138         pthread_t *thread;
139         struct work_queue *queue;
140         struct work_struct *work, *last_work;
141
142         if (priority >= WORK_PRIORITIES_NUM) {
143                 pr_err("Invalid priority: %d\n", priority);
144                 return -EINVAL;
145         }
146
147         work = calloc(sizeof(*work), 1);
148
149         work->name = name;
150         work->work_fn = work_fn;
151         work->arg = arg;
152
153         queue = &work_queues[priority];
154
155         /* Insert new work at the end of the work queue */
156         mutex_lock(&queue->lock);
157
158         last_work = queue->work;
159         while (last_work && last_work->next)
160                 last_work = last_work->next;
161
162         if (!last_work)
163                 queue->work = work;
164         else
165                 last_work->next = work;
166
167         pr_info("Inserted work %s in queue %s, with %d pending items\n",
168                 work->name, queue->name, queue->length);
169         queue->length++;
170         mutex_unlock(&queue->lock);
171
172         mutex_lock(&work_stats_mutex);
173         pr_info("workers_active: %d, priority: %d\n", workers_active, priority);
174         if (priority != WORK_PRIORITY_HIGH && workers_active >= max_jobs) {
175                 mutex_unlock(&work_stats_mutex);
176                 return 0;
177         }
178         workers_active++;
179         mutex_unlock(&work_stats_mutex);
180
181         pr_info("Creating new worker thread\n");
182         /* We need a worker thread, create one */
183         thread = calloc(sizeof(*thread), 1);
184         pthread_create(thread, NULL, worker_thread, NULL);
185
186         free(thread);
187
188         return 0;
189 }
190
191 /*
192  * Initialize the jobcontrol.
193  *
194  * Create the pipes that are used to grant children execution
195  * permissions. If max_jobs is zero, count the number of CPUs from
196  * /proc/cpuinfo and use that.
197  */
198 int init_jobcontrol(int max_jobs_requested)
199 {
200         FILE *file;
201         int ret;
202         char buf[256];
203         char match[8];
204
205         epoll_fd = epoll_create(1);
206         if (epoll_fd == -1) {
207                 pr_err("Failed to epoll_create(): %m\n");
208                 return -1;
209         }
210
211         if (max_jobs_requested > 0) {
212                 max_jobs = max_jobs_requested;
213                 goto no_count_cpus;
214         }
215         max_jobs++;
216
217         file = fopen("/proc/cpuinfo", "ro");
218         if (!file) {
219                 pr_err("Failed to open /proc/cpuinfo: %m\n");
220                 goto open_fail;
221         }
222
223         /*
224          * The CPU count algorithm simply reads the first 8 bytes from
225          * the /proc/cpuinfo and then expects that line to be there as
226          * many times as there are CPUs.
227          */
228         ret = fread(match, 1, sizeof(match), file);
229         if (ret < sizeof(match)) {
230                 pr_err("read %d bytes when expecting %zd %m\n",
231                         ret, sizeof(match));
232                 goto read_fail;
233         }
234
235         while(fgets(buf, sizeof(buf), file)) {
236                 if (!strncmp(buf, match, sizeof(match)))
237                         max_jobs++;
238         }
239
240 open_fail:
241 read_fail:
242         fclose(file);
243
244 no_count_cpus:
245         pr_info("Set maximum number of parallel jobs to %d\n", max_jobs);
246
247         max_jobs_pending = max_jobs * 10 + 25;
248         pr_info("Set maximum number of pending jobs to %d\n", max_jobs_pending);
249
250         return 0;
251 }
252
253 int poll_job_requests(int timeout)
254 {
255         struct epoll_event event;
256         struct event_handler *job_handler;
257         int ret;
258
259         /* Convert positive seconds to milliseconds */
260         timeout = timeout > 0 ? 1000 * timeout : timeout;
261
262         ret = epoll_wait(epoll_fd, &event, 1, timeout);
263
264         if (ret == -1) {
265                 if (errno != EINTR) {
266                         pr_err("epoll_wait: %m\n");
267                         return -1;
268                 }
269
270                 /*
271                  * If epoll_wait() was interrupted, better start
272                  * everything again from the beginning
273                  */
274                 return 0;
275         }
276
277         if (ret == 0) {
278                 pr_info("Timed out\n");
279                 goto out;
280         }
281
282         job_handler = event.data.ptr;
283
284         if (!job_handler || !job_handler->handle_event) {
285                 pr_err("Corrupted event handler for fd %d\n",
286                         event.data.fd);
287                 goto out;
288         }
289
290         pr_debug("Running handler %s to handle events from fd %d\n",
291                 job_handler->name, job_handler->fd);
292         job_handler->handle_event(job_handler);
293
294 out:
295         pr_info("Workers active: %u\n", workers_active);
296         return ret;
297 }
298
299 int do_fork(void)
300 {
301         int child;
302         child = fork();
303         if (child < 0) {
304                 pr_err("fork() failed: %m\n");
305                 return -1;
306         }
307
308         if (child) {
309                 pr_debug("Fork child %d\n", child);
310                 return child;
311         }
312
313         return 0;
314 }
315
316 int clear_zombie(int pid)
317 {
318         int status;
319         struct rusage rusage;
320         char *status_str = NULL;
321         int code = 0;
322
323         if (pid)
324                 pr_debug("Waiting on pid %d\n", pid);
325
326         do {
327                 pid = wait4(pid, &status, 0, &rusage);
328                 if (pid < 0) {
329                         pr_err("Error on waitid(): %m\n");
330                         return 0;
331                 }
332                 /* Wait until the child has become a zombie */
333         } while (!WIFEXITED(status) && !WIFSIGNALED(status));
334
335         if (WIFEXITED(status)) {
336                 status_str = "exited with status";
337                 code = WEXITSTATUS(status);
338         } else if (WIFSIGNALED(status)) {
339                 status_str = "killed by signal";
340                 code = WTERMSIG(status);
341         }
342         pr_debug("pid %d: %s %d.\n", pid,
343                 status_str, code);
344         pr_debug("pid %d: User time: %ld.%03lds, System %ld.%03lds\n", pid,
345                 (long)rusage.ru_utime.tv_sec, rusage.ru_utime.tv_usec / 1000,
346                 (long)rusage.ru_stime.tv_sec, rusage.ru_stime.tv_usec / 1000);
347
348         return 1;
349 }
350
351 /*
352  * Runs a command cmd with params argv, connects stdin and stdout to
353  * readfd and writefd
354  *
355  * Returns the pid of the executed process
356  */
357 int run_piped(const char *cmd, char *const argv[],
358               int *stdinfd, int *stdoutfd, int *stderrfd)
359 {
360         int ifd[2], ofd[2], efd[2], pid;
361
362         pr_info("Running command %s\n", cmd);
363
364         if (stdinfd && pipe(ifd)) {
365                 pr_err("pipe() failed: %m\n");
366                 return -1;
367         }
368
369         if (stdoutfd && pipe(ofd)) {
370                 pr_err("pipe() failed: %m\n");
371                 return -1;
372         }
373
374         if (stderrfd && pipe(efd)) {
375                 pr_err("pipe() failed: %m\n");
376                 return -1;
377         }
378
379         pid = do_fork();
380         if (pid) { /* Parent side */
381                 if (stdinfd) {
382                         close(ifd[0]);
383                         *stdinfd = ifd[0];
384                 }
385
386                 if (stdoutfd) {
387                         close(ofd[1]);
388                         *stdoutfd = ofd[0];
389                 }
390
391                 if (stderrfd) {
392                         close(efd[1]);
393                         *stderrfd = efd[0];
394                 }
395
396                 return pid;
397         }
398
399         if (stdinfd) {
400                 close(ifd[1]);
401                 dup2(ifd[0], STDIN_FILENO);
402         }
403
404         if (stdoutfd) {
405                 close(ofd[0]);
406                 dup2(ofd[1], STDOUT_FILENO);
407         }
408
409         if (stderrfd) {
410                 close(efd[0]);
411                 dup2(efd[1], STDERR_FILENO);
412         }
413
414         /* Now we have redirected standard streams to parent process */
415         execvp(cmd, argv);
416         pr_err("Failed to execv command %s: %m\n", cmd);
417         exit(1);
418
419         return 0;
420 }
421
422 /*
423  * Runs a command cmd with params argv, connects stdin and stdout to
424  * readfd and writefd
425  *
426  * Returns the pid of the executed process
427  */
428 int run_piped_stream(const char *cmd, char *const argv[],
429                      FILE **stdinf, FILE **stdoutf, FILE **stderrf)
430 {
431         int ifd, ofd, efd, pid;
432         int *i, *o, *e;
433
434         if (stdinf)
435                 i = &ifd;
436         else
437                 i = 0;
438         if (stdoutf)
439                 o = &ofd;
440         else
441                 o = 0;
442         if (stderrf)
443                 e = &efd;
444         else
445                 e = 0;
446
447         pid = run_piped(cmd, argv, i, o, e);
448
449         if (stdinf) {
450                 *stdinf = fdopen(ifd, "r");
451                 if (*stdinf == NULL) {
452                         pr_err("Error opening file stream for fd %d: %m\n",
453                                ifd);
454                         return -1;
455                 }
456         }
457
458         if (stdoutf) {
459                 *stdoutf = fdopen(ofd, "r");
460                 if (*stdoutf == NULL) {
461                         pr_err("Error opening file stream for fd %d: %m\n",
462                                ofd);
463                         return -1;
464                 }
465         }
466
467         if (stderrf) {
468                 *stderrf = fdopen(efd, "r");
469                 if (*stderrf == NULL) {
470                         pr_err("Error opening file stream for fd %d: %m\n",
471                                efd);
472                         return -1;
473                 }
474         }
475
476         return pid;
477 }
478
479 /*
480  * Forks a child and executes a command to run on parallel
481  */
482
483 #define max(a,b) (a) < (b) ? (b) : (a)
484 #define BUF_SIZE (128*1024)
485 int run(const char *cmd, char *const argv[])
486 {
487         int child, error;
488         int ofd, efd;
489         fd_set rfds;
490         int maxfd;
491         int eof = 0;
492
493         child = run_piped(cmd, argv, NULL, &ofd, &efd);
494
495         FD_ZERO(&rfds);
496         FD_SET(ofd, &rfds);
497         FD_SET(efd, &rfds);
498
499         while (!eof) {
500                 char *sptr , *eptr;
501                 char rbuf[BUF_SIZE];
502                 int bytes;
503                 int is_stderr = 0;
504
505                 maxfd = max(ofd, efd);
506                 error = select(maxfd, &rfds, NULL, NULL, NULL);
507
508                 if (error < 0) {
509                         pr_err("Error with select: %m\n");
510                         break;
511                 }
512
513                 if (FD_ISSET(ofd, &rfds)) {
514                         bytes = read(ofd, rbuf, BUF_SIZE);
515                         goto print;
516                 }
517
518                 if (FD_ISSET(efd, &rfds)) {
519                         is_stderr = 1;
520                         bytes = read(efd, rbuf, BUF_SIZE);
521                         goto print;
522                 }
523
524                 pr_err("select() returned unknown fd\n");
525                 break;
526
527 print:
528                 if (bytes < 0) {
529                         pr_err("read() failed: %m\n");
530                         break;
531                 }
532
533                 /*
534                  * Workaround: When a process had die and it has only
535                  * written to stderr, select() doesn't indicate that
536                  * there might be something to read in stderr fd. To
537                  * work around this issue, we try to read stderr just
538                  * in case in order to ensure everything gets read.
539                  */
540                 if (bytes == 0) {
541                         bytes = read(efd, rbuf, BUF_SIZE);
542                         is_stderr = 1;
543                         eof = 1;
544                 }
545
546                 sptr = eptr = rbuf;
547                 while (bytes--) {
548                         if (*eptr == '\n') {
549                                 *eptr = 0;
550                                 if (is_stderr)
551                                         pr_err("%s: stderr: %s\n",
552                                                 cmd, sptr);
553                                 else
554                                         pr_info("%s: stdout: %s\n",
555                                                 cmd, sptr);
556                                 sptr = eptr + 1;
557                         }
558                         eptr++;
559                 }
560         }
561
562         close(ofd);
563         close(efd);
564
565         clear_zombie(child);
566
567         return 0;
568 }
569
570 int register_event_handler(struct event_handler *handler)
571 {
572         struct epoll_event ev;
573         int ret;
574
575         if (handler->fd <= 0) {
576                 pr_err("Invalid file descriptor of %d\n", handler->fd);
577                 return -1;
578         }
579
580         if (!handler->handle_event) {
581                 pr_err("Handler callback missing\n");
582                 return -1;
583         }
584
585         pr_info("Registering handler for %s, fd %d\n",
586                 handler->name, handler->fd);
587
588         ev.data.fd = handler->fd;
589         ev.data.ptr = handler;
590         ev.events = handler->events;
591         ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, handler->fd, &ev);
592         if (ret) {
593                 pr_err("Failed to add epoll_fd: %m\n");
594                 return -1;
595         }
596
597         return 0;
598 }
599
600 void _mutex_lock_acquired(struct mutex *lock, char *file, int line)
601 {
602         lock->line = line;
603         lock->file = file;
604 }
605
606 int _mutex_lock(struct mutex *lock, char *file, int line)
607 {
608         int ret = 0;
609
610         if (!pthread_mutex_trylock(&lock->lock))
611                 goto out_lock;
612
613         pr_info("Lock contention on lock %s on %s:%d\n",
614                 lock->name, lock->file, lock->line);
615
616         ret = pthread_mutex_lock(&lock->lock);
617         if (ret)
618                 pr_err("Acquirin lock %s failed: %m, acquired %s:%d\n",
619                         lock->name, lock->file, lock->line);
620
621 out_lock:
622         _mutex_lock_acquired(lock, file, line);
623         return ret;
624 }
625
626 int _mutex_unlock(struct mutex *lock)
627 {
628         lock->line = 0;
629         lock->file = NULL;
630         pthread_mutex_unlock(&lock->lock);
631
632         return 0;
633 }