From: Timo Kokkonen Date: Sun, 11 Oct 2020 09:40:18 +0000 (+0300) Subject: rrdtool: Add support for multi parsers X-Git-Url: http://git.itanic.dy.fi/?p=rrdd;a=commitdiff_plain;h=7424327891ddb402b80142c83bfe1fe4303a35af rrdtool: Add support for multi parsers This new multi parser type can provide multiple data entries, each having their own timestamp. The parsing interface is simply extended so that the new parser returns an array of data elementrs, all in rrdtool data format. The data is then fed directly to rrdtool via "rrdtool update" command. New parser also is given the timestamp of the last update. As data timestamp is now provided by the parser, there are quite significant implications on the timeout handling. Previously the expectation was that each rrd database is updated only once during its update interval and data fetching happens only once during an update interval. This expectation is no longer valid as the parser can return bunch of data, which is all in past. And the parser would be happy to give even more data if it was give a chance to do so. Therefore, the actual "last_update" timestamp is now taken from the latest data entry. And after each complete update, the job scheduler is notified to do a re-schedule in order to start a new round of data acquicition in case there is more data to parse. The scheduler now needs to be able to wait indefinitely in case all parsers are still active and there is no information about when a next event is going to happen. Also a new backoff timestamp is introduced to prevent the parser from continuously re-staring parsing in case the data source fails to provide any new data at all. Signed-off-by: Timo Kokkonen --- diff --git a/main.c b/main.c index f756bde..eb19bb9 100644 --- a/main.c +++ b/main.c @@ -217,10 +217,13 @@ int main(int argc, char *argv[]) sleeptime = get_next_update(db_list, &db_name); - t = time(0) + sleeptime; - strftime(timestr, sizeof(timestr), "%T", localtime(&t)); - pr_info("Next scheduled event \"%s\" at %s, in %d seconds\n", - db_name, timestr, sleeptime); + if (sleeptime >= 0) { + t = time(0) + sleeptime; + strftime(timestr, sizeof(timestr), "%T", localtime(&t)); + pr_info("Next scheduled event \"%s\" at %s, in %d seconds\n", + db_name, timestr, sleeptime); + } else + pr_info("All jobs active, sleeping until event arrives\n"); poll_job_requests(sleeptime); } diff --git a/parser.h b/parser.h index b77eafe..54f23a7 100644 --- a/parser.h +++ b/parser.h @@ -1,13 +1,19 @@ #ifndef _PARSER_H #define _PARSER_H +#include + typedef int (parse_fn_t)(char *rrd_data, const char **parser_data, void **parser_state); +typedef int (parse_multi_fn_t)(char ***rrd_data, const char **parser_data, + void **parser_state, time_t last_update); + struct parser_info { struct parser_info *next; const char *name; parse_fn_t *parse; + parse_multi_fn_t *parse_multi; }; int register_parser(struct parser_info *info); diff --git a/rrdtool.c b/rrdtool.c index d286963..d1e85f4 100644 --- a/rrdtool.c +++ b/rrdtool.c @@ -11,6 +11,7 @@ #include "parser.h" #include "debug.h" #include "string.h" +#include "utils.h" #define MAX_ARGS 512 #define ARGSTR_LEN 32768 @@ -265,6 +266,68 @@ static int run_post_draw_cmd(struct rrd_database *rrd) return 0; } +static int rrdtool_update_data_multi(struct rrd_database *rrd) +{ + char **data = NULL; + int ret, i, d; + char cmd[] = RRDTOOL_CMD; + char *cmdline[512] = { + RRDTOOL_CMD, + "update", + (char *const)rrd->filename, + }; + int old_last_update = rrd->last_update; + + ret = rrd->parser->parse_multi(&data, rrd->parser_data, + &rrd->parser_state, rrd->last_update); + if (ret < 0) { + pr_err("Parser failure: %d\n", ret); + goto out; + } + + for (i = 3, d = 0; i < ARRAY_SIZE(cmdline) - 1; i++, d++) { + time_t then; + + if (!data[d]) + break; + + sanitize_rrd_update_data(data[d]); + + then = atoi(data[d]); + write_to_logfile(rrd, data[d], then); + cmdline[i] = data[d]; + pr_info("Data: %s\n", data[d]); + + rrd->last_update = then; + } + + cmdline[i] = 0; + + if (ret) + run(cmd, cmdline); + +out: + if (data) + for (d = 0; data[d]; d++) + free(data[d]); + free(data); + + if (old_last_update == rrd->last_update) { + rrd->update_backoff = time(NULL) + 10; + pr_info("Setting backoff\n"); + } else + rrd->update_backoff = 0; + + /* + * Re-schedule job processing in case we are too far behind + * with updates on this database and can start parsing more + * data immediately. + */ + notify_job_request(); + + return 0; +} + static int do_rrdtool_update_data(struct rrd_database *rrd) { char data[RRD_DATA_MAX_LEN + 12]; /* 12 == "%s:" + NULL termination */ @@ -283,10 +346,12 @@ static int do_rrdtool_update_data(struct rrd_database *rrd) bzero(data, sizeof(data)); l = sprintf(data, "%zd:", now); - if (rrd->parser && rrd->parser->parse) { + if (rrd->parser && rrd->parser->parse_multi) { + rrdtool_update_data_multi(rrd); + } else if (rrd->parser && rrd->parser->parse) { rrd->parser->parse(data + l, rrd->parser_data, &rrd->parser_state); - data[RRD_DATA_MAX_LEN + 2] = '\0'; + data[RRD_DATA_MAX_LEN + l] = '\0'; pr_info("Data: %s\n", data); @@ -294,7 +359,9 @@ static int do_rrdtool_update_data(struct rrd_database *rrd) write_to_logfile(rrd, data, now); run(cmd, cmdline); - } + rrd->last_update = now; + } else + rrd->last_update = now; if (rrd->pre_draw_cmd && !strcmp(rrd->pre_draw_cmd[0], "shell")) { run(rrd->pre_draw_cmd[1], &rrd->pre_draw_cmd[1]); @@ -316,18 +383,14 @@ static int do_rrdtool_update_data(struct rrd_database *rrd) queue_work(WORK_PRIORITY_LOW, "rrdtool_post_draw_cmd", (work_fn_t *)run_post_draw_cmd, rrd); + rrd->update_active = 0; + return 0; } int rrdtool_update_data(struct rrd_database *rrd) { - time_t now = time(0); - - /* - * This will put our last update slightly into past, but - * ensures our update interval will not drift over time. - */ - rrd->last_update = now - now % rrd->interval; + rrd->update_active = 1; return queue_work(WORK_PRIORITY_HIGH, "rrdtool_update_data", (work_fn_t *)do_rrdtool_update_data, rrd); @@ -340,10 +403,12 @@ int rrdtool_update_data(struct rrd_database *rrd) struct rrd_database *get_outdated_db(struct rrd_database **dblist) { int i; - time_t now = time(0); + time_t now = time(0), last; for (i = 0; dblist[i]; i++) { - if ((dblist[i]->last_update + dblist[i]->interval) - now <= 0) + last = max(ROUND_UP(dblist[i]->last_update, dblist[i]->interval), + dblist[i]->update_backoff); + if (!dblist[i]->update_active && last - now <= 0) return dblist[i]; } @@ -352,8 +417,7 @@ struct rrd_database *get_outdated_db(struct rrd_database **dblist) } /* - * See how long we may sleep until it is required to run an update - * again + * See how long we may sleep until next update interval window begins */ int get_next_update(struct rrd_database **dblist, const char **name) { @@ -361,7 +425,12 @@ int get_next_update(struct rrd_database **dblist, const char **name) time_t now = time(0); for (i = 0; dblist[i]; i++) { - diff = dblist[i]->last_update + dblist[i]->interval - now; + diff = ROUND_UP(dblist[i]->last_update, dblist[i]->interval) - now; + diff = max(diff, dblist[i]->update_backoff - now); + + if (dblist[i]->update_active) + diff = (now + dblist[i]->interval) % dblist[i]->interval; + if (!sleeptime) { sleeptime = diff; *name = dblist[i]->name; @@ -370,10 +439,11 @@ int get_next_update(struct rrd_database **dblist, const char **name) sleeptime = diff; *name = dblist[i]->name; } - if (sleeptime <= 0) - return 0; } + if (sleeptime == 0) + sleeptime = -1; + return sleeptime; } @@ -411,6 +481,8 @@ static int get_last_update(struct rrd_database *db) } db->last_update = atoi(buf); + pr_info("Last update for %s is: %ld, %ld sec ago\n", db->name, db->last_update, + time(NULL) - db->last_update); clear_zombie(child); diff --git a/rrdtool.h b/rrdtool.h index 4484e2c..94d0ad8 100644 --- a/rrdtool.h +++ b/rrdtool.h @@ -52,7 +52,9 @@ struct rrd_database { const char *logfile; /* Name of a file where data can be logged */ const char *logfile_timestamp_fmt; - time_t last_update; /* When was the data last updated */ + time_t last_update; /* When was the data last updated */ + int update_active; + int update_backoff; const char *name; /* Name of the database */ }; diff --git a/utils.h b/utils.h index c230b55..304e32a 100644 --- a/utils.h +++ b/utils.h @@ -35,4 +35,6 @@ static inline int mutex_init(struct mutex *mutex) return pthread_mutex_init(&mutex->lock, NULL); } +#define ROUND_UP(a, m) ((a) + (m) - (a) % (m)) + #endif