+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+#include "parser.h"
+#include "debug.h"
+#include "utils.h"
+#include "process.h"
+
+struct network_parser_data {
+ struct event_handler ev; /* Must be first entry */
+ char buf[4096];
+ char last_line[4096]; /* Last complete line, without timestamp */
+ struct sockaddr_in addr;
+ int idx; /* Index to end of buffer */
+ time_t last_time; /* time stamp of last complete line */
+ int fd;
+ int entries;
+ struct mutex lock;
+ pthread_cond_t cond;
+};
+
+static char *data_addrstr(struct network_parser_data *data)
+{
+ static char str[32];
+
+ snprintf(str, sizeof(str), "%s:%d",
+ inet_ntoa(data->addr.sin_addr), ntohs(data->addr.sin_port));
+ str[sizeof(str) - 1] = '\0';
+
+ return str;
+}
+
+static int parse_ip_port(const char *str, struct network_parser_data *data)
+{
+ char ip[16];
+ char *s;
+ long int len;
+ int ret;
+
+ s = strstr(str, ":");
+ len = s - str;
+ if (len >= sizeof(ip) || len < 0 || !s) {
+ pr_err("Unable to parse ip:port from string %s, %ld\n", str, len);
+ return -1;
+ }
+
+ strncpy(ip, str, len);
+ ip[len] = '\0';
+
+ ret = inet_aton(ip, &data->addr.sin_addr);
+ if (!ret) {
+ pr_err("Invalid address: %s\n", ip);
+ return -1;
+ }
+
+ s++;
+ data->addr.sin_port = htons(atoi(s));
+
+ pr_info("Parsed %s from %s: \n", data_addrstr(data), str);
+
+ data->addr.sin_family = AF_INET;
+ return 0;
+}
+
+static int parse_config(const char **parser_data, struct network_parser_data *data)
+{
+ int num, ret;
+
+ /* Count entries */
+ for (num = 0; parser_data[num]; num++)
+ ;
+
+ if (num != 1) {
+ pr_err("Too many entries in config\n");
+ return -1;
+ }
+
+ ret = parse_ip_port(*parser_data, data);
+ if (ret)
+ return ret;
+
+ data->fd = -1;
+ data->idx = 0;
+ mutex_init(&data->lock);
+
+ pthread_cond_init(&data->cond, NULL);
+
+ return 0;
+}
+
+static int network_parser_recv_data(struct event_handler *ev)
+{
+ struct network_parser_data *data = (struct network_parser_data *)ev;
+ int ret;
+
+ mutex_lock(&data->lock);
+ if (data->fd == -1) {
+ pr_err("Invalid fd on ev %p\n", ev);
+ mutex_unlock(&data->lock);
+ return 0;
+ }
+
+ ret = recv(data->fd, data->buf + data->idx, sizeof(data->buf) - data->idx - 1, 0);
+ if (ret < 0) {
+ pr_err("recv: %m\n");
+ if (ret != EAGAIN) {
+ register_event_handler(&data->ev, EPOLL_CTL_DEL);
+ close(data->fd);
+ data->fd = -1;
+ }
+
+ mutex_unlock(&data->lock);
+ return ret;
+ }
+
+ if (!ret) {
+ pr_info("Closing fd %d\n", data->fd);
+ register_event_handler(&data->ev, EPOLL_CTL_DEL);
+ close(data->fd);
+ data->fd = -1;
+ } else {
+ data->idx += ret;
+ data->buf[data->idx] = '\0';
+ }
+ mutex_unlock(&data->lock);
+
+ pthread_cond_signal(&data->cond);
+
+ return ret;
+}
+
+static int parse_line(struct network_parser_data *data)
+{
+ char *s;
+ int len, entries, i;
+
+ /* Already parsed? */
+ if (data->last_time)
+ return 1;
+
+ s = strstr(data->buf, "\n");
+ if (!s)
+ return 0; /* No complete line yet */
+
+
+ len = s - data->buf;
+
+ /* Simple data validation */
+ for (entries = 0, i = 0; i < len; i++)
+ if (data->buf[i] == ':')
+ entries++;
+
+ if (entries < 1) {
+ pr_err("Got garbled data\n");
+ return 0;
+ }
+
+ data->last_time = atol(data->buf);
+
+ /* Copy the line into last_line */
+ memcpy(data->last_line, data->buf, len);
+ pr_info("Parsed line: %s\n", data->last_line);
+
+ /* Take out line from the buffer */
+ s++; /* Consume the newline */
+ len++;
+ memmove(data->buf, s, sizeof(data->buf) - len);
+ data->idx -= len;
+
+ return 1;
+}
+
+static int network_multi_parser(char ***rrd_data, const char **parser_data,
+ void **s, time_t last_update)
+{
+ struct timespec maxwait;
+ struct network_parser_data *data = *s;
+ int ret, d;
+ int max_data = 256;
+ char **rdata;
+
+ if (!data) {
+ data = calloc(1, sizeof(*data));
+
+ ret = parse_config(parser_data, data);
+ if (ret) {
+ free(data);
+ return ret;
+ }
+ *s = data;
+ }
+
+ *rrd_data = rdata = calloc(max_data, sizeof(*data));
+
+ data->fd = socket(AF_INET, SOCK_STREAM, 0);
+ if (data->fd == -1) {
+ printf("Failed to create socket: %m\n");
+ return -1;
+ }
+
+ ret = connect(data->fd, (struct sockaddr *)&data->addr, sizeof(data->addr));
+ if (ret < 0) {
+ pr_err("Failed to connect to %s: %m\n", data_addrstr(data));
+ return -1;
+ }
+
+ /* Send the last update timestamp to every host */
+ ret = dprintf(data->fd, "%ld\n", last_update);
+ if (ret < 0) {
+ pr_err("Failed to send timestamp to host %s: %m\n",
+ data_addrstr(data));
+ goto out_close;
+ }
+
+ data->ev.fd = data->fd;
+ data->ev.events = EPOLLIN;
+ data->ev.handle_event = network_parser_recv_data;
+ data->ev.name = "network_parser_recv_data";
+ register_event_handler(&data->ev, EPOLL_CTL_ADD);
+
+ /*
+ * Try to get as much data in 10 seconds from all of the hosts
+ * as we can. If some host fails to return data us in the 10
+ * seconds, it probably doesn't return anything at all.
+ */
+ clock_gettime(CLOCK_REALTIME, &maxwait);
+ maxwait.tv_sec += 10;
+
+ /* Start processing all of the data */
+ for (d = 0; d < max_data - 1; d++) {
+ mutex_lock(&data->lock);
+
+ /* Parse lines until we have a complete line in buffer */
+ while (1) {
+ if (parse_line(data))
+ break;
+
+ if (data->fd == -1)
+ break;
+
+ ret = pthread_cond_timedwait(&data->cond, &data->lock.lock, &maxwait);
+ if (ret == ETIMEDOUT) {
+ pr_info("Timed out\n");
+ break;
+ }
+ }
+
+ /* Did we get a line? */
+ if (!parse_line(data)) {
+ mutex_unlock(&data->lock);
+ break;
+ }
+
+ rdata[d] = calloc(1, RRD_DATA_MAX_LEN);
+
+ strncpy(rdata[d], data->last_line, RRD_DATA_MAX_LEN);
+ data->last_time = 0;
+ mutex_unlock(&data->lock);
+ }
+
+ pr_info("Finished at line %d, idx: %d\n", d, data->idx);
+
+ ret = d;
+
+out_close:
+ /*
+ * Close the socket if needed. We'll reconnect on the next
+ * call
+ */
+ if (data->fd != -1) {
+ register_event_handler(&data->ev, EPOLL_CTL_DEL);
+ close(data->fd);
+ data->fd = -1;
+ }
+
+ return ret;
+}
+
+static struct parser_info network_parser = {
+ .name = "network",
+ .parse_multi = network_multi_parser,
+};
+
+int register_network_parser(void)
+{
+ return register_parser(&network_parser);
+}