]> git.itanic.dy.fi Git - rrdd/blobdiff - network_parser.c
Introduce network parser
[rrdd] / network_parser.c
diff --git a/network_parser.c b/network_parser.c
new file mode 100644 (file)
index 0000000..0e8475a
--- /dev/null
@@ -0,0 +1,293 @@
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+#include "parser.h"
+#include "debug.h"
+#include "utils.h"
+#include "process.h"
+
+struct network_parser_data {
+       struct event_handler ev; /* Must be first entry */
+       char buf[4096];
+       char last_line[4096];  /* Last complete line, without timestamp */
+       struct sockaddr_in addr;
+       int idx;        /* Index to end of buffer */
+       time_t last_time; /* time stamp of last complete line */
+       int fd;
+       int entries;
+       struct mutex lock;
+       pthread_cond_t cond;
+};
+
+static char *data_addrstr(struct network_parser_data *data)
+{
+       static char str[32];
+
+       snprintf(str, sizeof(str), "%s:%d",
+               inet_ntoa(data->addr.sin_addr), ntohs(data->addr.sin_port));
+       str[sizeof(str) - 1] = '\0';
+
+       return str;
+}
+
+static int parse_ip_port(const char *str, struct network_parser_data *data)
+{
+       char ip[16];
+       char *s;
+       long int len;
+       int ret;
+
+       s = strstr(str, ":");
+       len = s - str;
+       if (len >= sizeof(ip) || len < 0 || !s) {
+               pr_err("Unable to parse ip:port from string %s, %ld\n", str, len);
+               return -1;
+       }
+
+       strncpy(ip, str, len);
+       ip[len] = '\0';
+
+       ret = inet_aton(ip, &data->addr.sin_addr);
+       if (!ret) {
+               pr_err("Invalid address: %s\n", ip);
+               return -1;
+       }
+
+       s++;
+       data->addr.sin_port = htons(atoi(s));
+
+       pr_info("Parsed %s from %s: \n", data_addrstr(data), str);
+
+       data->addr.sin_family = AF_INET;
+       return 0;
+}
+
+static int parse_config(const char **parser_data, struct network_parser_data *data)
+{
+       int num, ret;
+
+       /* Count entries */
+       for (num = 0; parser_data[num]; num++)
+               ;
+
+       if (num != 1) {
+               pr_err("Too many entries in config\n");
+               return -1;
+       }
+
+       ret = parse_ip_port(*parser_data, data);
+       if (ret)
+               return ret;
+
+       data->fd = -1;
+       data->idx = 0;
+       mutex_init(&data->lock);
+
+       pthread_cond_init(&data->cond, NULL);
+
+       return 0;
+}
+
+static int network_parser_recv_data(struct event_handler *ev)
+{
+       struct network_parser_data *data = (struct network_parser_data *)ev;
+       int ret;
+
+       mutex_lock(&data->lock);
+       if (data->fd == -1) {
+               pr_err("Invalid fd on ev %p\n", ev);
+               mutex_unlock(&data->lock);
+               return 0;
+       }
+
+       ret = recv(data->fd, data->buf + data->idx, sizeof(data->buf) - data->idx - 1, 0);
+       if (ret < 0) {
+               pr_err("recv: %m\n");
+               if (ret != EAGAIN) {
+                       register_event_handler(&data->ev, EPOLL_CTL_DEL);
+                       close(data->fd);
+                       data->fd = -1;
+               }
+
+               mutex_unlock(&data->lock);
+               return ret;
+       }
+
+       if (!ret) {
+               pr_info("Closing fd %d\n", data->fd);
+               register_event_handler(&data->ev, EPOLL_CTL_DEL);
+               close(data->fd);
+               data->fd = -1;
+       } else {
+               data->idx += ret;
+               data->buf[data->idx] = '\0';
+       }
+       mutex_unlock(&data->lock);
+
+       pthread_cond_signal(&data->cond);
+
+       return ret;
+}
+
+static int parse_line(struct network_parser_data *data)
+{
+       char *s;
+       int len, entries, i;
+
+       /* Already parsed? */
+       if (data->last_time)
+               return 1;
+
+       s = strstr(data->buf, "\n");
+       if (!s)
+               return 0; /* No complete line yet */
+
+
+       len = s - data->buf;
+
+       /* Simple data validation */
+       for (entries = 0, i = 0; i < len; i++)
+               if (data->buf[i] == ':')
+                       entries++;
+
+       if (entries < 1) {
+               pr_err("Got garbled data\n");
+               return 0;
+       }
+
+       data->last_time = atol(data->buf);
+
+       /* Copy the line into last_line */
+       memcpy(data->last_line, data->buf, len);
+       pr_info("Parsed line: %s\n", data->last_line);
+
+       /* Take out line from the buffer */
+       s++; /* Consume the newline */
+       len++;
+       memmove(data->buf, s, sizeof(data->buf) - len);
+       data->idx -= len;
+
+       return 1;
+}
+
+static int network_multi_parser(char ***rrd_data, const char **parser_data,
+                               void **s, time_t last_update)
+{
+       struct timespec maxwait;
+       struct network_parser_data *data = *s;
+       int ret, d;
+       int max_data = 256;
+       char **rdata;
+
+       if (!data) {
+               data = calloc(1, sizeof(*data));
+
+               ret = parse_config(parser_data, data);
+               if (ret) {
+                       free(data);
+                       return ret;
+               }
+               *s = data;
+       }
+
+       *rrd_data = rdata = calloc(max_data, sizeof(*data));
+
+       data->fd = socket(AF_INET, SOCK_STREAM, 0);
+       if (data->fd == -1) {
+               printf("Failed to create socket: %m\n");
+               return -1;
+       }
+
+       ret = connect(data->fd, (struct sockaddr *)&data->addr, sizeof(data->addr));
+       if (ret < 0) {
+               pr_err("Failed to connect to %s: %m\n", data_addrstr(data));
+               return -1;
+       }
+
+       /* Send the last update timestamp to every host */
+       ret = dprintf(data->fd, "%ld\n", last_update);
+       if (ret < 0) {
+               pr_err("Failed to send timestamp to host %s: %m\n",
+                       data_addrstr(data));
+               goto out_close;
+       }
+
+       data->ev.fd = data->fd;
+       data->ev.events = EPOLLIN;
+       data->ev.handle_event = network_parser_recv_data;
+       data->ev.name = "network_parser_recv_data";
+       register_event_handler(&data->ev, EPOLL_CTL_ADD);
+
+       /*
+        * Try to get as much data in 10 seconds from all of the hosts
+        * as we can. If some host fails to return data us in the 10
+        * seconds, it probably doesn't return anything at all.
+        */
+       clock_gettime(CLOCK_REALTIME, &maxwait);
+       maxwait.tv_sec += 10;
+
+       /* Start processing all of the data */
+       for (d = 0; d < max_data - 1; d++) {
+               mutex_lock(&data->lock);
+
+               /* Parse lines until we have a complete line in buffer */
+               while (1) {
+                       if (parse_line(data))
+                               break;
+
+                       if (data->fd == -1)
+                               break;
+
+                       ret = pthread_cond_timedwait(&data->cond, &data->lock.lock, &maxwait);
+                       if (ret == ETIMEDOUT) {
+                               pr_info("Timed out\n");
+                               break;
+                       }
+               }
+
+               /* Did we get a line? */
+               if (!parse_line(data)) {
+                       mutex_unlock(&data->lock);
+                       break;
+               }
+
+               rdata[d] = calloc(1, RRD_DATA_MAX_LEN);
+
+               strncpy(rdata[d], data->last_line, RRD_DATA_MAX_LEN);
+               data->last_time = 0;
+               mutex_unlock(&data->lock);
+       }
+
+       pr_info("Finished at line %d, idx: %d\n", d, data->idx);
+
+       ret = d;
+
+out_close:
+       /*
+        * Close the socket if needed. We'll reconnect on the next
+        * call
+        */
+       if (data->fd != -1) {
+               register_event_handler(&data->ev, EPOLL_CTL_DEL);
+               close(data->fd);
+               data->fd = -1;
+       }
+
+       return ret;
+}
+
+static struct parser_info network_parser = {
+       .name = "network",
+       .parse_multi = network_multi_parser,
+};
+
+int register_network_parser(void)
+{
+       return register_parser(&network_parser);
+}