#include #include #include #include #include #include #include #include #include "parser.h" #include "debug.h" #include "utils.h" #include "process.h" struct network_parser_data { struct event_handler ev; /* Must be first entry */ char buf[4096]; char last_line[4096]; /* Last complete line, without timestamp */ struct sockaddr_in addr; int idx; /* Index to end of buffer */ time_t last_time; /* time stamp of last complete line */ int fd; int entries; struct mutex lock; pthread_cond_t cond; }; static char *data_addrstr(struct network_parser_data *data) { static char str[32]; snprintf(str, sizeof(str), "%s:%d", inet_ntoa(data->addr.sin_addr), ntohs(data->addr.sin_port)); str[sizeof(str) - 1] = '\0'; return str; } static int parse_ip_port(const char *str, struct network_parser_data *data) { char ip[16]; char *s; long int len; int ret; s = strstr(str, ":"); len = s - str; if (len >= sizeof(ip) || len < 0 || !s) { pr_err("Unable to parse ip:port from string %s, %ld\n", str, len); return -1; } strncpy(ip, str, len); ip[len] = '\0'; ret = inet_aton(ip, &data->addr.sin_addr); if (!ret) { pr_err("Invalid address: %s\n", ip); return -1; } s++; data->addr.sin_port = htons(atoi(s)); pr_info("Parsed %s from %s: \n", data_addrstr(data), str); data->addr.sin_family = AF_INET; return 0; } static int parse_config(const char **parser_data, struct network_parser_data *data) { int num, ret; /* Count entries */ for (num = 0; parser_data[num]; num++) ; if (num != 1) { pr_err("Too many entries in config\n"); return -1; } ret = parse_ip_port(*parser_data, data); if (ret) return ret; data->fd = -1; data->idx = 0; mutex_init(&data->lock); pthread_cond_init(&data->cond, NULL); return 0; } static int network_parser_recv_data(struct event_handler *ev) { struct network_parser_data *data = (struct network_parser_data *)ev; int ret; mutex_lock(&data->lock); if (data->fd == -1) { pr_err("Invalid fd on ev %p\n", ev); mutex_unlock(&data->lock); return 0; } ret = recv(data->fd, data->buf + data->idx, sizeof(data->buf) - data->idx - 1, 0); if (ret < 0) { pr_err("recv: %m\n"); if (ret != EAGAIN) { register_event_handler(&data->ev, EPOLL_CTL_DEL); close(data->fd); data->fd = -1; } mutex_unlock(&data->lock); return ret; } if (!ret) { pr_info("Closing fd %d\n", data->fd); register_event_handler(&data->ev, EPOLL_CTL_DEL); close(data->fd); data->fd = -1; } else { data->idx += ret; data->buf[data->idx] = '\0'; } mutex_unlock(&data->lock); pthread_cond_signal(&data->cond); return ret; } static int parse_line(struct network_parser_data *data) { char *s; int len, entries, i; /* Already parsed? */ if (data->last_time) return 1; s = strstr(data->buf, "\n"); if (!s) return 0; /* No complete line yet */ len = s - data->buf; /* Simple data validation */ for (entries = 0, i = 0; i < len; i++) if (data->buf[i] == ':') entries++; if (entries < 1) { pr_err("Got garbled data\n"); return 0; } data->last_time = atol(data->buf); /* Copy the line into last_line */ memcpy(data->last_line, data->buf, len); pr_info("Parsed line: %s\n", data->last_line); /* Take out line from the buffer */ s++; /* Consume the newline */ len++; memmove(data->buf, s, sizeof(data->buf) - len); data->idx -= len; bzero(data->buf + data->idx, sizeof(data->buf) - data->idx); return 1; } static int network_multi_parser(char ***rrd_data, const char **parser_data, void **s, time_t last_update) { struct timespec maxwait; struct network_parser_data *data = *s; int ret, d; int max_data = 256; char **rdata; if (!data) { data = calloc(1, sizeof(*data)); ret = parse_config(parser_data, data); if (ret) { free(data); return ret; } *s = data; } *rrd_data = rdata = calloc(max_data, sizeof(*data)); data->fd = socket(AF_INET, SOCK_STREAM, 0); if (data->fd == -1) { pr_err("Failed to create socket: %m\n"); return -1; } ret = connect(data->fd, (struct sockaddr *)&data->addr, sizeof(data->addr)); if (ret < 0) { pr_err("Failed to connect to %s: %m\n", data_addrstr(data)); goto out_close_noderegister; } /* Send the last update timestamp to every host */ ret = dprintf(data->fd, "%ld\n", last_update); if (ret < 0) { pr_err("Failed to send timestamp to host %s: %m\n", data_addrstr(data)); goto out_close; } data->ev.fd = data->fd; data->ev.events = EPOLLIN; data->ev.handle_event = network_parser_recv_data; data->ev.name = "network_parser_recv_data"; register_event_handler(&data->ev, EPOLL_CTL_ADD); /* * Try to get as much data in 10 seconds from all of the hosts * as we can. If some host fails to return data us in the 10 * seconds, it probably doesn't return anything at all. */ clock_gettime(CLOCK_REALTIME, &maxwait); maxwait.tv_sec += 10; /* Start processing all of the data */ for (d = 0; d < max_data - 1; d++) { mutex_lock(&data->lock); /* Parse lines until we have a complete line in buffer */ while (1) { if (parse_line(data)) break; if (data->fd == -1) break; ret = pthread_cond_timedwait(&data->cond, &data->lock.lock, &maxwait); if (ret == ETIMEDOUT) { pr_info("Timed out\n"); break; } } /* Did we get a line? */ if (!parse_line(data)) { mutex_unlock(&data->lock); break; } rdata[d] = calloc(1, RRD_DATA_MAX_LEN); strncpy(rdata[d], data->last_line, RRD_DATA_MAX_LEN); data->last_time = 0; bzero(data->last_line, sizeof(data->last_line)); mutex_unlock(&data->lock); } pr_info("Finished at line %d, idx: %d\n", d, data->idx); ret = d; bzero(data->last_line, sizeof(data->last_line)); bzero(data->buf, sizeof(data->buf)); data->idx = 0; out_close: /* * Close the socket if needed. We'll reconnect on the next * call */ if (data->fd != -1) { register_event_handler(&data->ev, EPOLL_CTL_DEL); out_close_noderegister: close(data->fd); data->fd = -1; } return ret; } static struct parser_info network_parser = { .name = "network", .parse_multi = network_multi_parser, }; int register_network_parser(void) { return register_parser(&network_parser); }