]> git.itanic.dy.fi Git - rrdd/commitdiff
Introduce network parser
authorTimo Kokkonen <timo.t.kokkonen@iki.fi>
Sun, 11 Oct 2020 11:05:38 +0000 (14:05 +0300)
committerTimo Kokkonen <timo.t.kokkonen@iki.fi>
Sun, 11 Oct 2020 11:05:38 +0000 (14:05 +0300)
This is a first multiparser. It will fetch rrd data from a network address via tcp.

The communication protocol is simple. At first, the client (rrdd) will
send the last timestamp it had on its rrd database. Then the server
will respond with stream of rrd data, one entry per line. This data is
then fed to rrd database.

Signed-off-by: Timo Kokkonen <timo.t.kokkonen@iki.fi>
Makefile
built_in_parsers.h
main.c
network_parser.c [new file with mode: 0644]

index 2cafd3d0345f3f32484fae90ff650f7890c63142..fe34614c59a8ae3b1880d49ab1f2a5a726289c65 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -3,7 +3,7 @@ LD=ld
 CFLAGS=-Wall -O2 -g -fPIC -D_GNU_SOURCE
 
 RRDD_OBJS= main.o process.o rrdtool.o parser.o built_in_parsers.o string.o \
-               debug.o config.o plugin_manager.o
+               debug.o config.o plugin_manager.o network_parser.o
 
 ONEWIRE_PARSER_OBJS = onewire_parser.o
 
index 17022893772a85ae4facc767205b07896a869fbf..c6d841bceac809155b31bdc886d70de23e1b612e 100644 (file)
@@ -2,5 +2,6 @@
 #define _BUILT_IN_PARSERS_H
 
 int register_built_in_parsers(void);
+int register_network_parser(void);
 
 #endif
diff --git a/main.c b/main.c
index eb19bb9b77a9d407136702a59b962c32e86e22ef..d57fb0b646b36983d67da88a08cc2068435c5a78 100644 (file)
--- a/main.c
+++ b/main.c
@@ -187,6 +187,7 @@ int main(int argc, char *argv[])
 
        init_plugin_manager(argv[0]);
        register_built_in_parsers();
+       register_network_parser();
 
        if (!opts.config_file) {
                pr_err("No database config file given. Nothing to do\n");
diff --git a/network_parser.c b/network_parser.c
new file mode 100644 (file)
index 0000000..0e8475a
--- /dev/null
@@ -0,0 +1,293 @@
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+#include "parser.h"
+#include "debug.h"
+#include "utils.h"
+#include "process.h"
+
+struct network_parser_data {
+       struct event_handler ev; /* Must be first entry */
+       char buf[4096];
+       char last_line[4096];  /* Last complete line, without timestamp */
+       struct sockaddr_in addr;
+       int idx;        /* Index to end of buffer */
+       time_t last_time; /* time stamp of last complete line */
+       int fd;
+       int entries;
+       struct mutex lock;
+       pthread_cond_t cond;
+};
+
+static char *data_addrstr(struct network_parser_data *data)
+{
+       static char str[32];
+
+       snprintf(str, sizeof(str), "%s:%d",
+               inet_ntoa(data->addr.sin_addr), ntohs(data->addr.sin_port));
+       str[sizeof(str) - 1] = '\0';
+
+       return str;
+}
+
+static int parse_ip_port(const char *str, struct network_parser_data *data)
+{
+       char ip[16];
+       char *s;
+       long int len;
+       int ret;
+
+       s = strstr(str, ":");
+       len = s - str;
+       if (len >= sizeof(ip) || len < 0 || !s) {
+               pr_err("Unable to parse ip:port from string %s, %ld\n", str, len);
+               return -1;
+       }
+
+       strncpy(ip, str, len);
+       ip[len] = '\0';
+
+       ret = inet_aton(ip, &data->addr.sin_addr);
+       if (!ret) {
+               pr_err("Invalid address: %s\n", ip);
+               return -1;
+       }
+
+       s++;
+       data->addr.sin_port = htons(atoi(s));
+
+       pr_info("Parsed %s from %s: \n", data_addrstr(data), str);
+
+       data->addr.sin_family = AF_INET;
+       return 0;
+}
+
+static int parse_config(const char **parser_data, struct network_parser_data *data)
+{
+       int num, ret;
+
+       /* Count entries */
+       for (num = 0; parser_data[num]; num++)
+               ;
+
+       if (num != 1) {
+               pr_err("Too many entries in config\n");
+               return -1;
+       }
+
+       ret = parse_ip_port(*parser_data, data);
+       if (ret)
+               return ret;
+
+       data->fd = -1;
+       data->idx = 0;
+       mutex_init(&data->lock);
+
+       pthread_cond_init(&data->cond, NULL);
+
+       return 0;
+}
+
+static int network_parser_recv_data(struct event_handler *ev)
+{
+       struct network_parser_data *data = (struct network_parser_data *)ev;
+       int ret;
+
+       mutex_lock(&data->lock);
+       if (data->fd == -1) {
+               pr_err("Invalid fd on ev %p\n", ev);
+               mutex_unlock(&data->lock);
+               return 0;
+       }
+
+       ret = recv(data->fd, data->buf + data->idx, sizeof(data->buf) - data->idx - 1, 0);
+       if (ret < 0) {
+               pr_err("recv: %m\n");
+               if (ret != EAGAIN) {
+                       register_event_handler(&data->ev, EPOLL_CTL_DEL);
+                       close(data->fd);
+                       data->fd = -1;
+               }
+
+               mutex_unlock(&data->lock);
+               return ret;
+       }
+
+       if (!ret) {
+               pr_info("Closing fd %d\n", data->fd);
+               register_event_handler(&data->ev, EPOLL_CTL_DEL);
+               close(data->fd);
+               data->fd = -1;
+       } else {
+               data->idx += ret;
+               data->buf[data->idx] = '\0';
+       }
+       mutex_unlock(&data->lock);
+
+       pthread_cond_signal(&data->cond);
+
+       return ret;
+}
+
+static int parse_line(struct network_parser_data *data)
+{
+       char *s;
+       int len, entries, i;
+
+       /* Already parsed? */
+       if (data->last_time)
+               return 1;
+
+       s = strstr(data->buf, "\n");
+       if (!s)
+               return 0; /* No complete line yet */
+
+
+       len = s - data->buf;
+
+       /* Simple data validation */
+       for (entries = 0, i = 0; i < len; i++)
+               if (data->buf[i] == ':')
+                       entries++;
+
+       if (entries < 1) {
+               pr_err("Got garbled data\n");
+               return 0;
+       }
+
+       data->last_time = atol(data->buf);
+
+       /* Copy the line into last_line */
+       memcpy(data->last_line, data->buf, len);
+       pr_info("Parsed line: %s\n", data->last_line);
+
+       /* Take out line from the buffer */
+       s++; /* Consume the newline */
+       len++;
+       memmove(data->buf, s, sizeof(data->buf) - len);
+       data->idx -= len;
+
+       return 1;
+}
+
+static int network_multi_parser(char ***rrd_data, const char **parser_data,
+                               void **s, time_t last_update)
+{
+       struct timespec maxwait;
+       struct network_parser_data *data = *s;
+       int ret, d;
+       int max_data = 256;
+       char **rdata;
+
+       if (!data) {
+               data = calloc(1, sizeof(*data));
+
+               ret = parse_config(parser_data, data);
+               if (ret) {
+                       free(data);
+                       return ret;
+               }
+               *s = data;
+       }
+
+       *rrd_data = rdata = calloc(max_data, sizeof(*data));
+
+       data->fd = socket(AF_INET, SOCK_STREAM, 0);
+       if (data->fd == -1) {
+               printf("Failed to create socket: %m\n");
+               return -1;
+       }
+
+       ret = connect(data->fd, (struct sockaddr *)&data->addr, sizeof(data->addr));
+       if (ret < 0) {
+               pr_err("Failed to connect to %s: %m\n", data_addrstr(data));
+               return -1;
+       }
+
+       /* Send the last update timestamp to every host */
+       ret = dprintf(data->fd, "%ld\n", last_update);
+       if (ret < 0) {
+               pr_err("Failed to send timestamp to host %s: %m\n",
+                       data_addrstr(data));
+               goto out_close;
+       }
+
+       data->ev.fd = data->fd;
+       data->ev.events = EPOLLIN;
+       data->ev.handle_event = network_parser_recv_data;
+       data->ev.name = "network_parser_recv_data";
+       register_event_handler(&data->ev, EPOLL_CTL_ADD);
+
+       /*
+        * Try to get as much data in 10 seconds from all of the hosts
+        * as we can. If some host fails to return data us in the 10
+        * seconds, it probably doesn't return anything at all.
+        */
+       clock_gettime(CLOCK_REALTIME, &maxwait);
+       maxwait.tv_sec += 10;
+
+       /* Start processing all of the data */
+       for (d = 0; d < max_data - 1; d++) {
+               mutex_lock(&data->lock);
+
+               /* Parse lines until we have a complete line in buffer */
+               while (1) {
+                       if (parse_line(data))
+                               break;
+
+                       if (data->fd == -1)
+                               break;
+
+                       ret = pthread_cond_timedwait(&data->cond, &data->lock.lock, &maxwait);
+                       if (ret == ETIMEDOUT) {
+                               pr_info("Timed out\n");
+                               break;
+                       }
+               }
+
+               /* Did we get a line? */
+               if (!parse_line(data)) {
+                       mutex_unlock(&data->lock);
+                       break;
+               }
+
+               rdata[d] = calloc(1, RRD_DATA_MAX_LEN);
+
+               strncpy(rdata[d], data->last_line, RRD_DATA_MAX_LEN);
+               data->last_time = 0;
+               mutex_unlock(&data->lock);
+       }
+
+       pr_info("Finished at line %d, idx: %d\n", d, data->idx);
+
+       ret = d;
+
+out_close:
+       /*
+        * Close the socket if needed. We'll reconnect on the next
+        * call
+        */
+       if (data->fd != -1) {
+               register_event_handler(&data->ev, EPOLL_CTL_DEL);
+               close(data->fd);
+               data->fd = -1;
+       }
+
+       return ret;
+}
+
+static struct parser_info network_parser = {
+       .name = "network",
+       .parse_multi = network_multi_parser,
+};
+
+int register_network_parser(void)
+{
+       return register_parser(&network_parser);
+}