6 #include <sys/socket.h>
7 #include <netinet/in.h>
/*
 * State kept for one network data source.
 *
 * Only some members are visible in this excerpt; other code in this
 * file also accesses fd, buf, lock and cond members of this struct.
 */
15 struct network_parser_data {
/* network_parser_recv_data() casts its event_handler pointer straight back to this struct, which only works while ev sits at offset 0. */
16 struct event_handler ev; /* Must be first entry */
18 char last_line[4096]; /* Last complete line, without timestamp */
/* Peer address: filled in by parse_ip_port() and handed to connect(). */
19 struct sockaddr_in addr;
/* recv() writes new bytes at buf + idx, and buf[idx] is then set to NUL. */
20 int idx; /* Index to end of buffer */
/* Set by parse_line() from atol() of the start of the buffer. */
21 time_t last_time; /* time stamp of last complete line */
/*
 * Format data->addr as an "address:port" string: the address comes from
 * inet_ntoa() and the port is converted to host byte order with ntohs().
 *
 * The declaration of str and the return statement are not visible in
 * this excerpt.
 * NOTE(review): str is sized with sizeof() and the function returns
 * char *, so str is presumably a static array; if so, each call
 * overwrites the previous result and the function is not reentrant
 * -- confirm. inet_ntoa() itself returns a pointer to a shared buffer.
 */
28 static char *data_addrstr(struct network_parser_data *data)
32 snprintf(str, sizeof(str), "%s:%d",
33 inet_ntoa(data->addr.sin_addr), ntohs(data->addr.sin_port));
/* snprintf() already NUL-terminates within sizeof(str); this is an extra safety net. */
34 str[sizeof(str) - 1] = '\0';
/*
 * Parse an "ip:port" string and store the result in data->addr.
 *
 * str:  text to parse; the first len characters are treated as the
 *       address and the text at s as the port.
 * data: receives sin_addr, sin_port and sin_family (AF_INET).
 *
 * How s and len are derived, and the return values, are not visible in
 * this excerpt.
 */
39 static int parse_ip_port(const char *str, struct network_parser_data *data)
/*
 * Reject input whose address part does not fit in ip, or that has no
 * port separator.
 * NOTE(review): !s is tested last; if len is computed from s, that
 * computation has already happened with a possibly NULL s -- confirm.
 */
48 if (len >= sizeof(ip) || len < 0 || !s) {
49 pr_err("Unable to parse ip:port from string %s, %ld\n", str, len);
/*
 * NOTE(review): strncpy() writes no terminator when str is at least
 * len characters long, so ip must be terminated elsewhere (not
 * visible here) -- confirm.
 */
53 strncpy(ip, str, len);
56 ret = inet_aton(ip, &data->addr.sin_addr);
58 pr_err("Invalid address: %s\n", ip);
/* atoi() has no error reporting: a port string with no leading digits becomes 0. */
63 data->addr.sin_port = htons(atoi(s));
65 pr_info("Parsed %s from %s: \n", data_addrstr(data), str);
67 data->addr.sin_family = AF_INET;
/*
 * Read the parser configuration into data.
 *
 * parser_data: NULL-terminated array of config strings; the first entry
 *              is parsed as "ip:port".
 * data:        receives the parsed address; its lock and condition
 *              variable are initialised here.
 *
 * The limit that triggers "Too many entries" and the return values are
 * not visible in this excerpt.
 */
71 static int parse_config(const char **parser_data, struct network_parser_data *data)
/* Count the entries up to the terminating NULL. */
76 for (num = 0; parser_data[num]; num++)
80 pr_err("Too many entries in config\n");
84 ret = parse_ip_port(*parser_data, data);
/* Synchronisation objects shared by the receive handler and network_multi_parser(). */
90 mutex_init(&data->lock);
92 pthread_cond_init(&data->cond, NULL);
/*
 * Event handler installed as ev.handle_event by network_multi_parser().
 * Appends newly received bytes to data->buf under data->lock and then
 * signals data->cond so the thread waiting in network_multi_parser()
 * can look for a complete line.
 *
 * ev: the event_handler embedded at the start of a
 *     struct network_parser_data.
 *
 * The conditions guarding the error and close branches, the update of
 * data->idx and the return values are not visible in this excerpt.
 */
97 static int network_parser_recv_data(struct event_handler *ev)
/* Valid only because ev is the first member of struct network_parser_data. */
99 struct network_parser_data *data = (struct network_parser_data *)ev;
102 mutex_lock(&data->lock);
103 if (data->fd == -1) {
104 pr_err("Invalid fd on ev %p\n", ev);
105 mutex_unlock(&data->lock);
/* Append after the bytes already buffered; one byte is held back for the NUL written below. */
109 ret = recv(data->fd, data->buf + data->idx, sizeof(data->buf) - data->idx - 1, 0);
111 pr_err("recv: %m\n");
/* Stop receiving events for this handler. */
113 register_event_handler(&data->ev, EPOLL_CTL_DEL);
118 mutex_unlock(&data->lock);
123 pr_info("Closing fd %d\n", data->fd);
124 register_event_handler(&data->ev, EPOLL_CTL_DEL);
/* Keep buf a valid C string: parse_line() searches it with strstr(). */
129 data->buf[data->idx] = '\0';
131 mutex_unlock(&data->lock);
/* Wake the pthread_cond_timedwait() in network_multi_parser(). */
133 pthread_cond_signal(&data->cond);
/*
 * Move one newline-terminated line from data->buf into data->last_line
 * and record its leading number in data->last_time.
 *
 * Returns 0 when buf holds no complete line yet. network_multi_parser()
 * treats a non-zero return as "a line is available in last_line".
 *
 * The computation of len, the garbled-data condition and the update of
 * data->idx are not visible in this excerpt.
 */
138 static int parse_line(struct network_parser_data *data)
143 /* Already parsed? */
147 s = strstr(data->buf, "\n");
149 return 0; /* No complete line yet */
154 /* Simple data validation */
/* Looks at ':' characters within the first len bytes of the buffer. */
155 for (entries = 0, i = 0; i < len; i++)
156 if (data->buf[i] == ':')
160 pr_err("Got garbled data\n");
/* atol() converts the decimal digits at the start of the buffer. */
164 data->last_time = atol(data->buf);
166 /* Copy the line into last_line */
/*
 * NOTE(review): memcpy() adds no terminator; this presumably relies on
 * last_line having been zeroed beforehand (network_multi_parser()
 * bzero()s it after each use). No check that len fits in last_line is
 * visible here -- confirm.
 */
167 memcpy(data->last_line, data->buf, len);
168 pr_info("Parsed line: %s\n", data->last_line);
170 /* Take out line from the buffer */
171 s++; /* Consume the newline */
/*
 * NOTE(review): s now points one past the newline. If len is the offset
 * of that newline, reading sizeof(data->buf) - len bytes from s runs
 * one byte beyond the end of buf -- confirm how len is computed.
 */
173 memmove(data->buf, s, sizeof(data->buf) - len);
/* Clear everything after the remaining buffered bytes. */
175 bzero(data->buf + data->idx, sizeof(data->buf) - data->idx);
/*
 * parse_multi callback of the network parser: connect to the configured
 * peer, send it last_update, and collect the lines it sends back.
 *
 * rrd_data:    receives a calloc()ed array; each filled slot points to
 *              a calloc()ed buffer of RRD_DATA_MAX_LEN bytes holding
 *              one received line.
 * parser_data: NULL-terminated config strings, passed to parse_config().
 * s:           *s is read as the struct network_parser_data pointer.
 * last_update: sent to the peer as a decimal number followed by '\n'.
 *
 * Several declarations, error checks, the inner parse loop and the
 * return value are not visible in this excerpt.
 */
180 static int network_multi_parser(char ***rrd_data, const char **parser_data,
181 void **s, time_t last_update)
183 struct timespec maxwait;
184 struct network_parser_data *data = *s;
/* Zero-filled state. NOTE(review): the condition guarding this allocation and any NULL check are not visible here. */
190 data = calloc(1, sizeof(*data));
192 ret = parse_config(parser_data, data);
/*
 * NOTE(review): the slots of rdata hold pointers (see rdata[d] below),
 * but the element size used is that of struct network_parser_data.
 * sizeof(*rdata) looks intended; as written this over-allocates
 * -- confirm.
 */
200 *rrd_data = rdata = calloc(max_data, sizeof(*data));
/* IPv4 stream (TCP) socket. */
202 data->fd = socket(AF_INET, SOCK_STREAM, 0);
203 if (data->fd == -1) {
204 pr_err("Failed to create socket: %m\n");
208 ret = connect(data->fd, (struct sockaddr *)&data->addr, sizeof(data->addr));
210 pr_err("Failed to connect to %s: %m\n", data_addrstr(data));
/* The event handler is not registered yet at this point, so cleanup skips the deregistration. */
211 goto out_close_noderegister;
214 /* Send the last update timestamp to every host */
/* NOTE(review): "%ld" assumes time_t is a long on this platform -- confirm. */
215 ret = dprintf(data->fd, "%ld\n", last_update);
217 pr_err("Failed to send timestamp to host %s: %m\n",
/* Have network_parser_recv_data() called for EPOLLIN events on the socket. */
222 data->ev.fd = data->fd;
223 data->ev.events = EPOLLIN;
224 data->ev.handle_event = network_parser_recv_data;
225 data->ev.name = "network_parser_recv_data";
226 register_event_handler(&data->ev, EPOLL_CTL_ADD);
229 * Try to get as much data in 10 seconds from all of the hosts
230 * as we can. If some host fails to return data us in the 10
231 * seconds, it probably doesn't return anything at all.
233 clock_gettime(CLOCK_REALTIME, &maxwait);
/* maxwait is an absolute deadline (now + 10 s), shared by every wait in the loop below. */
234 maxwait.tv_sec += 10;
236 /* Start processing all of the data */
/* Collects at most max_data - 1 lines; presumably the last calloc-zeroed slot is left as a NULL terminator -- confirm. */
237 for (d = 0; d < max_data - 1; d++) {
238 mutex_lock(&data->lock);
240 /* Parse lines until we have a complete line in buffer */
242 if (parse_line(data))
/*
 * Releases the mutex while sleeping and re-acquires it before
 * returning; woken by pthread_cond_signal() in
 * network_parser_recv_data() or by the deadline.
 */
248 ret = pthread_cond_timedwait(&data->cond, &data->lock.lock, &maxwait);
249 if (ret == ETIMEDOUT) {
250 pr_info("Timed out\n");
255 /* Did we get a line? */
256 if (!parse_line(data)) {
257 mutex_unlock(&data->lock);
/* Each line gets its own zero-filled buffer. */
261 rdata[d] = calloc(1, RRD_DATA_MAX_LEN);
/* NOTE(review): strncpy() leaves rdata[d] unterminated if last_line holds RRD_DATA_MAX_LEN or more characters -- confirm. */
263 strncpy(rdata[d], data->last_line, RRD_DATA_MAX_LEN);
/* Reset last_line so the next memcpy() in parse_line() lands on zeroed memory. */
265 bzero(data->last_line, sizeof(data->last_line));
266 mutex_unlock(&data->lock);
269 pr_info("Finished at line %d, idx: %d\n", d, data->idx);
/* Drop any leftover partial data. */
272 bzero(data->last_line, sizeof(data->last_line));
273 bzero(data->buf, sizeof(data->buf));
278 * Close the socket if needed. We'll reconnect on the next
281 if (data->fd != -1) {
/* Remove the handler before the socket is dealt with. */
282 register_event_handler(&data->ev, EPOLL_CTL_DEL);
/* Entry point for the connect() failure path, where no handler was registered. */
283 out_close_noderegister:
/*
 * Parser descriptor passed to register_parser() by
 * register_network_parser(). Only the parse_multi callback is visible
 * in this excerpt; other fields may be set on lines not shown.
 */
291 static struct parser_info network_parser = {
293 .parse_multi = network_multi_parser,
/*
 * Register the network parser with the parser framework.
 * Returns whatever register_parser() returns.
 */
296 int register_network_parser(void)
298 return register_parser(&network_parser);