From: Timo Kokkonen Date: Sun, 11 Oct 2020 11:05:38 +0000 (+0300) Subject: Introduce network parser X-Git-Url: http://git.itanic.dy.fi/?p=rrdd;a=commitdiff_plain;h=6aa4a171029a0d2a2299d1216bfbbb3446d32420 Introduce network parser This is a first multiparser. It will fetch rrd data from a network address via tcp. The communication protocol is simple. At first, the client (rrdd) will send the last timestamp it had on its rrd database. Then the server will respond with stream of rrd data, one entry per line. This data is then fed to rrd database. Signed-off-by: Timo Kokkonen --- diff --git a/Makefile b/Makefile index 2cafd3d..fe34614 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,7 @@ LD=ld CFLAGS=-Wall -O2 -g -fPIC -D_GNU_SOURCE RRDD_OBJS= main.o process.o rrdtool.o parser.o built_in_parsers.o string.o \ - debug.o config.o plugin_manager.o + debug.o config.o plugin_manager.o network_parser.o ONEWIRE_PARSER_OBJS = onewire_parser.o diff --git a/built_in_parsers.h b/built_in_parsers.h index 1702289..c6d841b 100644 --- a/built_in_parsers.h +++ b/built_in_parsers.h @@ -2,5 +2,6 @@ #define _BUILT_IN_PARSERS_H int register_built_in_parsers(void); +int register_network_parser(void); #endif diff --git a/main.c b/main.c index eb19bb9..d57fb0b 100644 --- a/main.c +++ b/main.c @@ -187,6 +187,7 @@ int main(int argc, char *argv[]) init_plugin_manager(argv[0]); register_built_in_parsers(); + register_network_parser(); if (!opts.config_file) { pr_err("No database config file given. Nothing to do\n"); diff --git a/network_parser.c b/network_parser.c new file mode 100644 index 0000000..0e8475a --- /dev/null +++ b/network_parser.c @@ -0,0 +1,293 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#include "parser.h" +#include "debug.h" +#include "utils.h" +#include "process.h" + +struct network_parser_data { + struct event_handler ev; /* Must be first entry */ + char buf[4096]; + char last_line[4096]; /* Last complete line, without timestamp */ + struct sockaddr_in addr; + int idx; /* Index to end of buffer */ + time_t last_time; /* time stamp of last complete line */ + int fd; + int entries; + struct mutex lock; + pthread_cond_t cond; +}; + +static char *data_addrstr(struct network_parser_data *data) +{ + static char str[32]; + + snprintf(str, sizeof(str), "%s:%d", + inet_ntoa(data->addr.sin_addr), ntohs(data->addr.sin_port)); + str[sizeof(str) - 1] = '\0'; + + return str; +} + +static int parse_ip_port(const char *str, struct network_parser_data *data) +{ + char ip[16]; + char *s; + long int len; + int ret; + + s = strstr(str, ":"); + len = s - str; + if (len >= sizeof(ip) || len < 0 || !s) { + pr_err("Unable to parse ip:port from string %s, %ld\n", str, len); + return -1; + } + + strncpy(ip, str, len); + ip[len] = '\0'; + + ret = inet_aton(ip, &data->addr.sin_addr); + if (!ret) { + pr_err("Invalid address: %s\n", ip); + return -1; + } + + s++; + data->addr.sin_port = htons(atoi(s)); + + pr_info("Parsed %s from %s: \n", data_addrstr(data), str); + + data->addr.sin_family = AF_INET; + return 0; +} + +static int parse_config(const char **parser_data, struct network_parser_data *data) +{ + int num, ret; + + /* Count entries */ + for (num = 0; parser_data[num]; num++) + ; + + if (num != 1) { + pr_err("Too many entries in config\n"); + return -1; + } + + ret = parse_ip_port(*parser_data, data); + if (ret) + return ret; + + data->fd = -1; + data->idx = 0; + mutex_init(&data->lock); + + pthread_cond_init(&data->cond, NULL); + + return 0; +} + +static int network_parser_recv_data(struct event_handler *ev) +{ + struct network_parser_data *data = (struct network_parser_data *)ev; + int ret; + + mutex_lock(&data->lock); + if (data->fd == -1) { + pr_err("Invalid fd on ev %p\n", ev); + mutex_unlock(&data->lock); + return 0; + } + + ret = recv(data->fd, data->buf + data->idx, sizeof(data->buf) - data->idx - 1, 0); + if (ret < 0) { + pr_err("recv: %m\n"); + if (ret != EAGAIN) { + register_event_handler(&data->ev, EPOLL_CTL_DEL); + close(data->fd); + data->fd = -1; + } + + mutex_unlock(&data->lock); + return ret; + } + + if (!ret) { + pr_info("Closing fd %d\n", data->fd); + register_event_handler(&data->ev, EPOLL_CTL_DEL); + close(data->fd); + data->fd = -1; + } else { + data->idx += ret; + data->buf[data->idx] = '\0'; + } + mutex_unlock(&data->lock); + + pthread_cond_signal(&data->cond); + + return ret; +} + +static int parse_line(struct network_parser_data *data) +{ + char *s; + int len, entries, i; + + /* Already parsed? */ + if (data->last_time) + return 1; + + s = strstr(data->buf, "\n"); + if (!s) + return 0; /* No complete line yet */ + + + len = s - data->buf; + + /* Simple data validation */ + for (entries = 0, i = 0; i < len; i++) + if (data->buf[i] == ':') + entries++; + + if (entries < 1) { + pr_err("Got garbled data\n"); + return 0; + } + + data->last_time = atol(data->buf); + + /* Copy the line into last_line */ + memcpy(data->last_line, data->buf, len); + pr_info("Parsed line: %s\n", data->last_line); + + /* Take out line from the buffer */ + s++; /* Consume the newline */ + len++; + memmove(data->buf, s, sizeof(data->buf) - len); + data->idx -= len; + + return 1; +} + +static int network_multi_parser(char ***rrd_data, const char **parser_data, + void **s, time_t last_update) +{ + struct timespec maxwait; + struct network_parser_data *data = *s; + int ret, d; + int max_data = 256; + char **rdata; + + if (!data) { + data = calloc(1, sizeof(*data)); + + ret = parse_config(parser_data, data); + if (ret) { + free(data); + return ret; + } + *s = data; + } + + *rrd_data = rdata = calloc(max_data, sizeof(*data)); + + data->fd = socket(AF_INET, SOCK_STREAM, 0); + if (data->fd == -1) { + printf("Failed to create socket: %m\n"); + return -1; + } + + ret = connect(data->fd, (struct sockaddr *)&data->addr, sizeof(data->addr)); + if (ret < 0) { + pr_err("Failed to connect to %s: %m\n", data_addrstr(data)); + return -1; + } + + /* Send the last update timestamp to every host */ + ret = dprintf(data->fd, "%ld\n", last_update); + if (ret < 0) { + pr_err("Failed to send timestamp to host %s: %m\n", + data_addrstr(data)); + goto out_close; + } + + data->ev.fd = data->fd; + data->ev.events = EPOLLIN; + data->ev.handle_event = network_parser_recv_data; + data->ev.name = "network_parser_recv_data"; + register_event_handler(&data->ev, EPOLL_CTL_ADD); + + /* + * Try to get as much data in 10 seconds from all of the hosts + * as we can. If some host fails to return data us in the 10 + * seconds, it probably doesn't return anything at all. + */ + clock_gettime(CLOCK_REALTIME, &maxwait); + maxwait.tv_sec += 10; + + /* Start processing all of the data */ + for (d = 0; d < max_data - 1; d++) { + mutex_lock(&data->lock); + + /* Parse lines until we have a complete line in buffer */ + while (1) { + if (parse_line(data)) + break; + + if (data->fd == -1) + break; + + ret = pthread_cond_timedwait(&data->cond, &data->lock.lock, &maxwait); + if (ret == ETIMEDOUT) { + pr_info("Timed out\n"); + break; + } + } + + /* Did we get a line? */ + if (!parse_line(data)) { + mutex_unlock(&data->lock); + break; + } + + rdata[d] = calloc(1, RRD_DATA_MAX_LEN); + + strncpy(rdata[d], data->last_line, RRD_DATA_MAX_LEN); + data->last_time = 0; + mutex_unlock(&data->lock); + } + + pr_info("Finished at line %d, idx: %d\n", d, data->idx); + + ret = d; + +out_close: + /* + * Close the socket if needed. We'll reconnect on the next + * call + */ + if (data->fd != -1) { + register_event_handler(&data->ev, EPOLL_CTL_DEL); + close(data->fd); + data->fd = -1; + } + + return ret; +} + +static struct parser_info network_parser = { + .name = "network", + .parse_multi = network_multi_parser, +}; + +int register_network_parser(void) +{ + return register_parser(&network_parser); +}