]> git.itanic.dy.fi Git - rrdd/blob - network_parser.c
onewire_parser.c: Fix compiler warnings about string lengths
[rrdd] / network_parser.c
1 #include <stdlib.h>
2 #include <stdio.h>
3 #include <string.h>
4 #include <errno.h>
5 #include <sys/types.h>
6 #include <sys/socket.h>
7 #include <netinet/in.h>
8 #include <arpa/inet.h>
9
10 #include "parser.h"
11 #include "debug.h"
12 #include "utils.h"
13 #include "process.h"
14
15 struct network_parser_data {
16         struct event_handler ev; /* Must be first entry */
17         char buf[4096];
18         char last_line[4096];  /* Last complete line, without timestamp */
19         struct sockaddr_in addr;
20         int idx;        /* Index to end of buffer */
21         time_t last_time; /* time stamp of last complete line */
22         int fd;
23         int entries;
24         struct mutex lock;
25         pthread_cond_t cond;
26 };
27
28 static char *data_addrstr(struct network_parser_data *data)
29 {
30         static char str[32];
31
32         snprintf(str, sizeof(str), "%s:%d",
33                 inet_ntoa(data->addr.sin_addr), ntohs(data->addr.sin_port));
34         str[sizeof(str) - 1] = '\0';
35
36         return str;
37 }
38
39 static int parse_ip_port(const char *str, struct network_parser_data *data)
40 {
41         char ip[16];
42         char *s;
43         long int len;
44         int ret;
45
46         s = strstr(str, ":");
47         len = s - str;
48         if (len >= sizeof(ip) || len < 0 || !s) {
49                 pr_err("Unable to parse ip:port from string %s, %ld\n", str, len);
50                 return -1;
51         }
52
53         strncpy(ip, str, len);
54         ip[len] = '\0';
55
56         ret = inet_aton(ip, &data->addr.sin_addr);
57         if (!ret) {
58                 pr_err("Invalid address: %s\n", ip);
59                 return -1;
60         }
61
62         s++;
63         data->addr.sin_port = htons(atoi(s));
64
65         pr_info("Parsed %s from %s: \n", data_addrstr(data), str);
66
67         data->addr.sin_family = AF_INET;
68         return 0;
69 }
70
71 static int parse_config(const char **parser_data, struct network_parser_data *data)
72 {
73         int num, ret;
74
75         /* Count entries */
76         for (num = 0; parser_data[num]; num++)
77                 ;
78
79         if (num != 1) {
80                 pr_err("Too many entries in config\n");
81                 return -1;
82         }
83
84         ret = parse_ip_port(*parser_data, data);
85         if (ret)
86                 return ret;
87
88         data->fd = -1;
89         data->idx = 0;
90         mutex_init(&data->lock);
91
92         pthread_cond_init(&data->cond, NULL);
93
94         return 0;
95 }
96
97 static int network_parser_recv_data(struct event_handler *ev)
98 {
99         struct network_parser_data *data = (struct network_parser_data *)ev;
100         int ret;
101
102         mutex_lock(&data->lock);
103         if (data->fd == -1) {
104                 pr_err("Invalid fd on ev %p\n", ev);
105                 mutex_unlock(&data->lock);
106                 return 0;
107         }
108
109         ret = recv(data->fd, data->buf + data->idx, sizeof(data->buf) - data->idx - 1, 0);
110         if (ret < 0) {
111                 pr_err("recv: %m\n");
112                 if (ret != EAGAIN) {
113                         register_event_handler(&data->ev, EPOLL_CTL_DEL);
114                         close(data->fd);
115                         data->fd = -1;
116                 }
117
118                 mutex_unlock(&data->lock);
119                 return ret;
120         }
121
122         if (!ret) {
123                 pr_info("Closing fd %d\n", data->fd);
124                 register_event_handler(&data->ev, EPOLL_CTL_DEL);
125                 close(data->fd);
126                 data->fd = -1;
127         } else {
128                 data->idx += ret;
129                 data->buf[data->idx] = '\0';
130         }
131         mutex_unlock(&data->lock);
132
133         pthread_cond_signal(&data->cond);
134
135         return ret;
136 }
137
138 static int parse_line(struct network_parser_data *data)
139 {
140         char *s;
141         int len, entries, i;
142
143         /* Already parsed? */
144         if (data->last_time)
145                 return 1;
146
147         s = strstr(data->buf, "\n");
148         if (!s)
149                 return 0; /* No complete line yet */
150
151
152         len = s - data->buf;
153
154         /* Simple data validation */
155         for (entries = 0, i = 0; i < len; i++)
156                 if (data->buf[i] == ':')
157                         entries++;
158
159         if (entries < 1) {
160                 pr_err("Got garbled data\n");
161                 return 0;
162         }
163
164         data->last_time = atol(data->buf);
165
166         /* Copy the line into last_line */
167         memcpy(data->last_line, data->buf, len);
168         pr_info("Parsed line: %s\n", data->last_line);
169
170         /* Take out line from the buffer */
171         s++; /* Consume the newline */
172         len++;
173         memmove(data->buf, s, sizeof(data->buf) - len);
174         data->idx -= len;
175         bzero(data->buf + data->idx, sizeof(data->buf) - data->idx);
176
177         return 1;
178 }
179
180 static int network_multi_parser(char ***rrd_data, const char **parser_data,
181                                 void **s, time_t last_update)
182 {
183         struct timespec maxwait;
184         struct network_parser_data *data = *s;
185         int ret, d;
186         int max_data = 256;
187         char **rdata;
188
189         if (!data) {
190                 data = calloc(1, sizeof(*data));
191
192                 ret = parse_config(parser_data, data);
193                 if (ret) {
194                         free(data);
195                         return ret;
196                 }
197                 *s = data;
198         }
199
200         *rrd_data = rdata = calloc(max_data, sizeof(*data));
201
202         data->fd = socket(AF_INET, SOCK_STREAM, 0);
203         if (data->fd == -1) {
204                 pr_err("Failed to create socket: %m\n");
205                 return -1;
206         }
207
208         ret = connect(data->fd, (struct sockaddr *)&data->addr, sizeof(data->addr));
209         if (ret < 0) {
210                 pr_err("Failed to connect to %s: %m\n", data_addrstr(data));
211                 goto out_close_noderegister;
212         }
213
214         /* Send the last update timestamp to every host */
215         ret = dprintf(data->fd, "%ld\n", last_update);
216         if (ret < 0) {
217                 pr_err("Failed to send timestamp to host %s: %m\n",
218                         data_addrstr(data));
219                 goto out_close;
220         }
221
222         data->ev.fd = data->fd;
223         data->ev.events = EPOLLIN;
224         data->ev.handle_event = network_parser_recv_data;
225         data->ev.name = "network_parser_recv_data";
226         register_event_handler(&data->ev, EPOLL_CTL_ADD);
227
228         /*
229          * Try to get as much data in 10 seconds from all of the hosts
230          * as we can. If some host fails to return data us in the 10
231          * seconds, it probably doesn't return anything at all.
232          */
233         clock_gettime(CLOCK_REALTIME, &maxwait);
234         maxwait.tv_sec += 10;
235
236         /* Start processing all of the data */
237         for (d = 0; d < max_data - 1; d++) {
238                 mutex_lock(&data->lock);
239
240                 /* Parse lines until we have a complete line in buffer */
241                 while (1) {
242                         if (parse_line(data))
243                                 break;
244
245                         if (data->fd == -1)
246                                 break;
247
248                         ret = pthread_cond_timedwait(&data->cond, &data->lock.lock, &maxwait);
249                         if (ret == ETIMEDOUT) {
250                                 pr_info("Timed out\n");
251                                 break;
252                         }
253                 }
254
255                 /* Did we get a line? */
256                 if (!parse_line(data)) {
257                         mutex_unlock(&data->lock);
258                         break;
259                 }
260
261                 rdata[d] = calloc(1, RRD_DATA_MAX_LEN);
262
263                 strncpy(rdata[d], data->last_line, RRD_DATA_MAX_LEN);
264                 data->last_time = 0;
265                 bzero(data->last_line, sizeof(data->last_line));
266                 mutex_unlock(&data->lock);
267         }
268
269         pr_info("Finished at line %d, idx: %d\n", d, data->idx);
270
271         ret = d;
272         bzero(data->last_line, sizeof(data->last_line));
273         bzero(data->buf, sizeof(data->buf));
274         data->idx = 0;
275
276 out_close:
277         /*
278          * Close the socket if needed. We'll reconnect on the next
279          * call
280          */
281         if (data->fd != -1) {
282                 register_event_handler(&data->ev, EPOLL_CTL_DEL);
283 out_close_noderegister:
284                 close(data->fd);
285                 data->fd = -1;
286         }
287
288         return ret;
289 }
290
291 static struct parser_info network_parser = {
292         .name = "network",
293         .parse_multi = network_multi_parser,
294 };
295
296 int register_network_parser(void)
297 {
298         return register_parser(&network_parser);
299 }