]> git.itanic.dy.fi Git - rrdd/blob - network_parser.c
onewire_parser.c: Fix compiler warnings about string lengths
[rrdd] / network_parser.c
1 #include <stdlib.h>
2 #include <stdio.h>
3 #include <string.h>
4 #include <errno.h>
5 #include <sys/types.h>
6 #include <sys/socket.h>
7 #include <netinet/in.h>
8 #include <arpa/inet.h>
9
10 #include "parser.h"
11 #include "debug.h"
12 #include "utils.h"
13 #include "process.h"
14
15 struct network_parser_data {
16         struct event_handler ev; /* Must be first entry */
17         char buf[4096];
18         char last_line[4096];  /* Last complete line, without timestamp */
19         struct sockaddr_in addr;
20         int idx;        /* Index to end of buffer */
21         time_t last_time; /* time stamp of last complete line */
22         int fd;
23         int entries;
24         struct mutex lock;
25         pthread_cond_t cond;
26 };
27
28 static char *data_addrstr(struct network_parser_data *data)
29 {
30         static char str[32];
31
32         snprintf(str, sizeof(str), "%s:%d",
33                 inet_ntoa(data->addr.sin_addr), ntohs(data->addr.sin_port));
34         str[sizeof(str) - 1] = '\0';
35
36         return str;
37 }
38
39 static int parse_ip_port(const char *str, struct network_parser_data *data)
40 {
41         char ip[16];
42         char *s;
43         long int len;
44         int ret;
45
46         s = strstr(str, ":");
47         len = s - str;
48         if (len >= sizeof(ip) || len < 0 || !s) {
49                 pr_err("Unable to parse ip:port from string %s, %ld\n", str, len);
50                 return -1;
51         }
52
53         strncpy(ip, str, len);
54         ip[len] = '\0';
55
56         ret = inet_aton(ip, &data->addr.sin_addr);
57         if (!ret) {
58                 pr_err("Invalid address: %s\n", ip);
59                 return -1;
60         }
61
62         s++;
63         data->addr.sin_port = htons(atoi(s));
64
65         pr_info("Parsed %s from %s: \n", data_addrstr(data), str);
66
67         data->addr.sin_family = AF_INET;
68         return 0;
69 }
70
71 static int parse_config(const char **parser_data, struct network_parser_data *data)
72 {
73         int num, ret;
74
75         /* Count entries */
76         for (num = 0; parser_data[num]; num++)
77                 ;
78
79         if (num != 1) {
80                 pr_err("Too many entries in config\n");
81                 return -1;
82         }
83
84         ret = parse_ip_port(*parser_data, data);
85         if (ret)
86                 return ret;
87
88         data->fd = -1;
89         data->idx = 0;
90         mutex_init(&data->lock);
91
92         pthread_cond_init(&data->cond, NULL);
93
94         return 0;
95 }
96
97 static int network_parser_recv_data(struct event_handler *ev)
98 {
99         struct network_parser_data *data = (struct network_parser_data *)ev;
100         int ret;
101
102         mutex_lock(&data->lock);
103         if (data->fd == -1) {
104                 pr_err("Invalid fd on ev %p\n", ev);
105                 mutex_unlock(&data->lock);
106                 return 0;
107         }
108
109         ret = recv(data->fd, data->buf + data->idx, sizeof(data->buf) - data->idx - 1, 0);
110         if (ret < 0) {
111                 pr_err("recv: %m\n");
112                 if (ret != EAGAIN) {
113                         register_event_handler(&data->ev, EPOLL_CTL_DEL);
114                         close(data->fd);
115                         data->fd = -1;
116                 }
117
118                 mutex_unlock(&data->lock);
119                 return ret;
120         }
121
122         if (!ret) {
123                 pr_info("Closing fd %d\n", data->fd);
124                 register_event_handler(&data->ev, EPOLL_CTL_DEL);
125                 close(data->fd);
126                 data->fd = -1;
127         } else {
128                 data->idx += ret;
129                 data->buf[data->idx] = '\0';
130         }
131         mutex_unlock(&data->lock);
132
133         pthread_cond_signal(&data->cond);
134
135         return ret;
136 }
137
138 static int parse_line(struct network_parser_data *data)
139 {
140         char *s;
141         int len, entries, i;
142
143         /* Already parsed? */
144         if (data->last_time)
145                 return 1;
146
147         s = strstr(data->buf, "\n");
148         if (!s)
149                 return 0; /* No complete line yet */
150
151
152         len = s - data->buf;
153
154         /* Simple data validation */
155         for (entries = 0, i = 0; i < len; i++)
156                 if (data->buf[i] == ':')
157                         entries++;
158
159         if (entries < 1) {
160                 pr_err("Got garbled data\n");
161                 return 0;
162         }
163
164         data->last_time = atol(data->buf);
165
166         /* Copy the line into last_line */
167         memcpy(data->last_line, data->buf, len);
168         pr_info("Parsed line: %s\n", data->last_line);
169
170         /* Take out line from the buffer */
171         s++; /* Consume the newline */
172         len++;
173         memmove(data->buf, s, sizeof(data->buf) - len);
174         data->idx -= len;
175
176         return 1;
177 }
178
179 static int network_multi_parser(char ***rrd_data, const char **parser_data,
180                                 void **s, time_t last_update)
181 {
182         struct timespec maxwait;
183         struct network_parser_data *data = *s;
184         int ret, d;
185         int max_data = 256;
186         char **rdata;
187
188         if (!data) {
189                 data = calloc(1, sizeof(*data));
190
191                 ret = parse_config(parser_data, data);
192                 if (ret) {
193                         free(data);
194                         return ret;
195                 }
196                 *s = data;
197         }
198
199         *rrd_data = rdata = calloc(max_data, sizeof(*data));
200
201         data->fd = socket(AF_INET, SOCK_STREAM, 0);
202         if (data->fd == -1) {
203                 printf("Failed to create socket: %m\n");
204                 return -1;
205         }
206
207         ret = connect(data->fd, (struct sockaddr *)&data->addr, sizeof(data->addr));
208         if (ret < 0) {
209                 pr_err("Failed to connect to %s: %m\n", data_addrstr(data));
210                 goto out_close_noderegister;
211         }
212
213         /* Send the last update timestamp to every host */
214         ret = dprintf(data->fd, "%ld\n", last_update);
215         if (ret < 0) {
216                 pr_err("Failed to send timestamp to host %s: %m\n",
217                         data_addrstr(data));
218                 goto out_close;
219         }
220
221         data->ev.fd = data->fd;
222         data->ev.events = EPOLLIN;
223         data->ev.handle_event = network_parser_recv_data;
224         data->ev.name = "network_parser_recv_data";
225         register_event_handler(&data->ev, EPOLL_CTL_ADD);
226
227         /*
228          * Try to get as much data in 10 seconds from all of the hosts
229          * as we can. If some host fails to return data us in the 10
230          * seconds, it probably doesn't return anything at all.
231          */
232         clock_gettime(CLOCK_REALTIME, &maxwait);
233         maxwait.tv_sec += 10;
234
235         /* Start processing all of the data */
236         for (d = 0; d < max_data - 1; d++) {
237                 mutex_lock(&data->lock);
238
239                 /* Parse lines until we have a complete line in buffer */
240                 while (1) {
241                         if (parse_line(data))
242                                 break;
243
244                         if (data->fd == -1)
245                                 break;
246
247                         ret = pthread_cond_timedwait(&data->cond, &data->lock.lock, &maxwait);
248                         if (ret == ETIMEDOUT) {
249                                 pr_info("Timed out\n");
250                                 break;
251                         }
252                 }
253
254                 /* Did we get a line? */
255                 if (!parse_line(data)) {
256                         mutex_unlock(&data->lock);
257                         break;
258                 }
259
260                 rdata[d] = calloc(1, RRD_DATA_MAX_LEN);
261
262                 strncpy(rdata[d], data->last_line, RRD_DATA_MAX_LEN);
263                 data->last_time = 0;
264                 mutex_unlock(&data->lock);
265         }
266
267         pr_info("Finished at line %d, idx: %d\n", d, data->idx);
268
269         ret = d;
270
271 out_close:
272         /*
273          * Close the socket if needed. We'll reconnect on the next
274          * call
275          */
276         if (data->fd != -1) {
277                 register_event_handler(&data->ev, EPOLL_CTL_DEL);
278 out_close_noderegister:
279                 close(data->fd);
280                 data->fd = -1;
281         }
282
283         return ret;
284 }
285
286 static struct parser_info network_parser = {
287         .name = "network",
288         .parse_multi = network_multi_parser,
289 };
290
291 int register_network_parser(void)
292 {
293         return register_parser(&network_parser);
294 }