]> git.itanic.dy.fi Git - rrdd/commitdiff
rrdtool: Add support for multi parsers
authorTimo Kokkonen <timo.t.kokkonen@iki.fi>
Sun, 11 Oct 2020 09:40:18 +0000 (12:40 +0300)
committerTimo Kokkonen <timo.t.kokkonen@iki.fi>
Sun, 11 Oct 2020 09:40:18 +0000 (12:40 +0300)
This new multi parser type can provide multiple data entries, each
having their own timestamp. The parsing interface is simply extended
so that the new parser returns an array of data elementrs, all in
rrdtool data format. The data is then fed directly to rrdtool via
"rrdtool update" command. New parser also is given the timestamp of
the last update.

As data timestamp is now provided by the parser, there are quite
significant implications on the timeout handling. Previously the
expectation was that each rrd database is updated only once during its
update interval and data fetching happens only once during an update
interval.

This expectation is no longer valid as the parser can return bunch of
data, which is all in past. And the parser would be happy to give even
more data if it was give a chance to do so. Therefore, the actual
"last_update" timestamp is now taken from the latest data entry. And
after each complete update, the job scheduler is notified to do a
re-schedule in order to start a new round of data acquicition in case
there is more data to parse.

The scheduler now needs to be able to wait indefinitely in case all
parsers are still active and there is no information about when a next
event is going to happen. Also a new backoff timestamp is introduced
to prevent the parser from continuously re-staring parsing in case the
data source fails to provide any new data at all.

Signed-off-by: Timo Kokkonen <timo.t.kokkonen@iki.fi>
main.c
parser.h
rrdtool.c
rrdtool.h
utils.h

diff --git a/main.c b/main.c
index f756bdea4e63d0e7934a8a295f9dcfc0d0cb3af1..eb19bb9b77a9d407136702a59b962c32e86e22ef 100644 (file)
--- a/main.c
+++ b/main.c
@@ -217,10 +217,13 @@ int main(int argc, char *argv[])
 
                sleeptime = get_next_update(db_list, &db_name);
 
-               t = time(0) + sleeptime;
-               strftime(timestr, sizeof(timestr), "%T", localtime(&t));
-               pr_info("Next scheduled event \"%s\" at %s, in %d seconds\n",
-                       db_name, timestr, sleeptime);
+               if (sleeptime >=  0) {
+                       t = time(0) + sleeptime;
+                       strftime(timestr, sizeof(timestr), "%T", localtime(&t));
+                       pr_info("Next scheduled event \"%s\" at %s, in %d seconds\n",
+                               db_name, timestr, sleeptime);
+               } else
+                       pr_info("All jobs active, sleeping until event arrives\n");
 
                poll_job_requests(sleeptime);
        }
index b77eafef1adc55e077c1659bee9bfac78919ae10..54f23a7b85debbd31fa021ab3a7bc28231f3ba78 100644 (file)
--- a/parser.h
+++ b/parser.h
@@ -1,13 +1,19 @@
 #ifndef _PARSER_H
 #define _PARSER_H
 
+#include <time.h>
+
 typedef int (parse_fn_t)(char *rrd_data, const char **parser_data,
                        void **parser_state);
 
+typedef int (parse_multi_fn_t)(char ***rrd_data, const char **parser_data,
+                       void **parser_state, time_t last_update);
+
 struct parser_info {
        struct parser_info *next;
        const char *name;
        parse_fn_t *parse;
+       parse_multi_fn_t *parse_multi;
 };
 
 int register_parser(struct parser_info *info);
index d286963c66ea6c04616ad9b5151a9bd2c530eede..d1e85f4c1d398750ed25d81ad7ee15866f68ad85 100644 (file)
--- a/rrdtool.c
+++ b/rrdtool.c
@@ -11,6 +11,7 @@
 #include "parser.h"
 #include "debug.h"
 #include "string.h"
+#include "utils.h"
 
 #define MAX_ARGS       512
 #define ARGSTR_LEN     32768
@@ -265,6 +266,68 @@ static int run_post_draw_cmd(struct rrd_database *rrd)
        return 0;
 }
 
+static int rrdtool_update_data_multi(struct rrd_database *rrd)
+{
+       char **data = NULL;
+       int ret, i, d;
+       char cmd[] = RRDTOOL_CMD;
+       char *cmdline[512] = {
+               RRDTOOL_CMD,
+               "update",
+               (char *const)rrd->filename,
+       };
+       int old_last_update = rrd->last_update;
+
+       ret = rrd->parser->parse_multi(&data, rrd->parser_data,
+                               &rrd->parser_state, rrd->last_update);
+       if (ret < 0) {
+               pr_err("Parser failure: %d\n", ret);
+               goto out;
+       }
+
+       for (i = 3, d = 0; i < ARRAY_SIZE(cmdline) - 1; i++, d++) {
+               time_t then;
+
+               if (!data[d])
+                       break;
+
+               sanitize_rrd_update_data(data[d]);
+
+               then = atoi(data[d]);
+               write_to_logfile(rrd, data[d], then);
+               cmdline[i] = data[d];
+               pr_info("Data: %s\n", data[d]);
+
+               rrd->last_update = then;
+       }
+
+       cmdline[i] = 0;
+
+       if (ret)
+               run(cmd, cmdline);
+
+out:
+       if (data)
+               for (d = 0; data[d]; d++)
+                       free(data[d]);
+       free(data);
+
+       if (old_last_update == rrd->last_update) {
+               rrd->update_backoff = time(NULL) + 10;
+               pr_info("Setting backoff\n");
+       } else
+               rrd->update_backoff = 0;
+
+       /*
+        * Re-schedule job processing in case we are too far behind
+        * with updates on this database and can start parsing more
+        * data immediately.
+        */
+       notify_job_request();
+
+       return 0;
+}
+
 static int do_rrdtool_update_data(struct rrd_database *rrd)
 {
        char data[RRD_DATA_MAX_LEN + 12]; /* 12 == "%s:" + NULL termination */
@@ -283,10 +346,12 @@ static int do_rrdtool_update_data(struct rrd_database *rrd)
        bzero(data, sizeof(data));
        l = sprintf(data, "%zd:", now);
 
-       if (rrd->parser && rrd->parser->parse) {
+       if (rrd->parser && rrd->parser->parse_multi) {
+               rrdtool_update_data_multi(rrd);
+       } else if (rrd->parser && rrd->parser->parse) {
                rrd->parser->parse(data + l, rrd->parser_data,
                                &rrd->parser_state);
-               data[RRD_DATA_MAX_LEN + 2] = '\0';
+               data[RRD_DATA_MAX_LEN + l] = '\0';
 
                pr_info("Data: %s\n", data);
 
@@ -294,7 +359,9 @@ static int do_rrdtool_update_data(struct rrd_database *rrd)
                write_to_logfile(rrd, data, now);
 
                run(cmd, cmdline);
-       }
+               rrd->last_update = now;
+       } else
+               rrd->last_update = now;
 
        if (rrd->pre_draw_cmd && !strcmp(rrd->pre_draw_cmd[0], "shell")) {
                run(rrd->pre_draw_cmd[1], &rrd->pre_draw_cmd[1]);
@@ -316,18 +383,14 @@ static int do_rrdtool_update_data(struct rrd_database *rrd)
                queue_work(WORK_PRIORITY_LOW, "rrdtool_post_draw_cmd",
                        (work_fn_t *)run_post_draw_cmd, rrd);
 
+       rrd->update_active = 0;
+
        return 0;
 }
 
 int rrdtool_update_data(struct rrd_database *rrd)
 {
-       time_t now = time(0);
-
-       /*
-        * This will put our last update slightly into past, but
-        * ensures our update interval will not drift over time.
-        */
-       rrd->last_update = now - now % rrd->interval;
+       rrd->update_active = 1;
 
        return queue_work(WORK_PRIORITY_HIGH, "rrdtool_update_data",
                        (work_fn_t *)do_rrdtool_update_data, rrd);
@@ -340,10 +403,12 @@ int rrdtool_update_data(struct rrd_database *rrd)
 struct rrd_database *get_outdated_db(struct rrd_database **dblist)
 {
        int i;
-       time_t now = time(0);
+       time_t now = time(0), last;
 
        for (i = 0; dblist[i]; i++) {
-               if ((dblist[i]->last_update + dblist[i]->interval) - now <= 0)
+               last = max(ROUND_UP(dblist[i]->last_update, dblist[i]->interval),
+                       dblist[i]->update_backoff);
+               if (!dblist[i]->update_active && last - now <= 0)
                        return dblist[i];
        }
 
@@ -352,8 +417,7 @@ struct rrd_database *get_outdated_db(struct rrd_database **dblist)
 }
 
 /*
- * See how long we may sleep until it is required to run an update
- * again
+ * See how long we may sleep until next update interval window begins
  */
 int get_next_update(struct rrd_database **dblist, const char **name)
 {
@@ -361,7 +425,12 @@ int get_next_update(struct rrd_database **dblist, const char **name)
        time_t now = time(0);
 
        for (i = 0; dblist[i]; i++) {
-               diff = dblist[i]->last_update + dblist[i]->interval - now;
+               diff = ROUND_UP(dblist[i]->last_update, dblist[i]->interval) - now;
+               diff = max(diff, dblist[i]->update_backoff - now);
+
+               if (dblist[i]->update_active)
+                       diff = (now + dblist[i]->interval) % dblist[i]->interval;
+
                if (!sleeptime) {
                        sleeptime = diff;
                        *name = dblist[i]->name;
@@ -370,10 +439,11 @@ int get_next_update(struct rrd_database **dblist, const char **name)
                        sleeptime = diff;
                        *name = dblist[i]->name;
                }
-               if (sleeptime <= 0)
-                       return 0;
        }
 
+       if (sleeptime == 0)
+               sleeptime = -1;
+
        return sleeptime;
 }
 
@@ -411,6 +481,8 @@ static int get_last_update(struct rrd_database *db)
        }
 
        db->last_update = atoi(buf);
+       pr_info("Last update for %s is: %ld, %ld sec ago\n", db->name, db->last_update,
+               time(NULL) - db->last_update);
 
        clear_zombie(child);
 
index 4484e2c3c325ea340f4e0d1f7ae2d76d8986b122..94d0ad8887a3fc9ac34d526750644be58d2aed42 100644 (file)
--- a/rrdtool.h
+++ b/rrdtool.h
@@ -52,7 +52,9 @@ struct rrd_database {
        const char *logfile;    /* Name of a file where data can be logged */
        const char *logfile_timestamp_fmt;
 
-       time_t  last_update;    /* When was the data last updated */
+       time_t last_update;     /* When was the data last updated */
+       int update_active;
+       int update_backoff;
        const char *name;               /* Name of the database */
 };
 
diff --git a/utils.h b/utils.h
index c230b5559b8394188b52a6c94639ef31c4d358c3..304e32a69013d8205aad1d46b06ed8685ea9424f 100644 (file)
--- a/utils.h
+++ b/utils.h
@@ -35,4 +35,6 @@ static inline int mutex_init(struct mutex *mutex)
        return pthread_mutex_init(&mutex->lock, NULL);
 }
 
+#define ROUND_UP(a, m) ((a) + (m) - (a) % (m))
+
 #endif