From b7e51bd3b522fb8413aa89a37cc6bdf97f33a3a8 Mon Sep 17 00:00:00 2001 From: Sean Shahkarami Date: Thu, 3 Mar 2022 09:11:11 -0600 Subject: [PATCH] updated sys metrics expected publish interval from 60s to 120s --- check_nodes_influxdb.py | 276 ++++++++++++++-------------------------- requirements.txt | 1 + 2 files changed, 97 insertions(+), 180 deletions(-) diff --git a/check_nodes_influxdb.py b/check_nodes_influxdb.py index 30d0211..caae21b 100644 --- a/check_nodes_influxdb.py +++ b/check_nodes_influxdb.py @@ -166,7 +166,7 @@ } outputs_from_raingauge = { - "env.raingauge.acc", + # "env.raingauge.acc", "env.raingauge.event_acc", "env.raingauge.rint", "env.raingauge.total_acc", @@ -176,9 +176,9 @@ # the possible devices on a node. the frequency is the minimum expected # publishing frequency device_output_table = { - "nxcore": [("sys", name, "60s") for name in sys_from_nxcore], - "nxagent": [("sys", name, "60s") for name in sys_from_nxagent], - "rpi": [("sys", name, "60s") for name in sys_from_rpi], + "nxcore": [("sys", name, "120s") for name in sys_from_nxcore], + "nxagent": [("sys", name, "120s") for name in sys_from_nxagent], + "rpi": [("sys", name, "120s") for name in sys_from_rpi], "dell": [("sys", name, "60s") for name in sys_from_dellblade], "bme280": [("iio-nx", name, "30s") for name in outputs_from_bme], "bme680": [("iio-rpi", name, "30s") for name in outputs_from_bme], @@ -191,76 +191,6 @@ } -# # probably can do this in simpler way using production list -# def get_sanity_test_rollup_results(start, end, now=None): -# if now is None: -# now = pd.to_datetime("now", utc=True) - -# start = (now + pd.to_timedelta(start)).floor("1h") -# end = (now + pd.to_timedelta(end)).floor("1h") - -# # request data 30min before and after [start, end] window so we always -# # include data we would round in. 
-# df = sage_data_client.query( -# start=(start - pd.to_timedelta("30min")).isoformat(), -# end=(end + pd.to_timedelta("30min")).isoformat(), -# filter={ -# "name": "sys.sanity.*" -# } -# ) - -# # TODO drop outside of rollup window -# df["timestamp"] = df["timestamp"].dt.round("1h") -# df["total"] = 1 -# df["pass"] = df["value"] == 0 -# df["fail"] = df["value"] != 0 - -# table = pd.pivot_table(df, values=["total", "pass", "fail"], index="timestamp", columns=["meta.node", "meta.vsn"], aggfunc="sum", fill_value=0) - -# # fill in all time windows -# index = pd.date_range(start.floor("1h"), end.floor("1h"), freq="h") -# table = table.reindex(index) - -# # TODO need to filling in the vsn -# # groups = df.groupby("meta.vsn") - -# # for vsn in vsns: -# # try: -# # df_vsn = groups.get_group(vsn) -# # table_vsn = pd.pivot_table(df_vsn, values=["total", "pass", "fail"], index="timestamp", aggfunc="sum", fill_value=0) -# # table_vsn = table_vsn.reindex(index, fill_value=0) -# # except KeyError: -# # table_vsn = pd.DataFrame([], index=index) -# # table_vsn["total"] = 0 -# # table_vsn["pass"] = 0 -# # table_vsn["fail"] = 0 -# # print(table_vsn) -# # print() - -# results = [] - -# fieldnames = { -# "total": "sanity_test_total", -# "pass": "sanity_test_pass_total", -# "fail": "sanity_test_fail_total", -# } - -# for (field, node, vsn, ts), total in table.unstack().iteritems(): -# results.append({ -# "measurement": fieldnames[field], -# "tags": { -# "vsn": vsn, -# "node": node, -# }, -# "fields": { -# "value": total, -# }, -# "time": ts, -# }) - -# return results - - def write_results_to_influxdb(url, token, org, records): data = [] @@ -354,6 +284,95 @@ def get_rollup_range(start, end, now=None): return start.floor("1h"), end.floor("1h") +def get_records_for_window(nodes, start, end, window): + logging.info("getting records in %s %s", start, end) + + records = [] + + logging.info("querying data...") + df = sage_data_client.query(start=start, end=end) + logging.info("done") + + logging.info("checking data...") + + timestamp = start + + def add_node_health_check_record(vsn, value): + records.append({ + "measurement": "node_health_check", + "tags": { + "vsn": vsn, + }, + "fields": { + "value": int(value), + }, + "timestamp": timestamp, + }) + + def add_device_health_check_record(vsn, device, value): + records.append({ + "measurement": "device_health_check", + "tags": { + "vsn": vsn, + "device": device, + }, + "fields": { + "value": int(value), + }, + "timestamp": timestamp, + }) + + # NOTE metrics agent doesn't add a task name, so we set task name + # to system for system metrics. 
+    df.loc[df["name"].str.startswith("sys."), "meta.task"] = "sys"
+
+    vsn_groups = df.groupby(["meta.vsn"])
+
+    for node in nodes:
+        try:
+            df_vsn = vsn_groups.get_group(node.vsn)
+        except KeyError:  # no data at all for this vsn in the window
+            add_node_health_check_record(node.vsn, 0)
+            for device in node.devices:
+                add_device_health_check_record(node.vsn, device, 0)
+            continue
+
+        groups = df_vsn.groupby(["meta.task", "name"])
+
+        def check_publishing_frequency_for_device(device, window):
+            for task, name, freq in device_output_table[device]:
+                try:
+                    group = groups.get_group((task, name))
+                    yield task, name, check_publishing_frequency(group, freq, window)
+                except KeyError:
+                    yield task, name, 0.0
+
+        def check_publishing_sla_for_device(device, window, sla):
+            healthy = True
+            for task, name, f in check_publishing_frequency_for_device(device, window):
+                if f < sla:
+                    healthy = False
+                    logging.info("failed sla %s %s %s %s %s %s %0.3f", start, end, node.vsn, device, task, name, f)
+            return healthy
+
+        node_healthy = True
+
+        for device in node.devices:
+            # the idea here is to translate the publishing frequency into a kind of SLA: after
+            # breaking the series up into windows the size of the publishing interval, we should
+            # see at least 1 sample per window in 90% of the windows.
+            healthy = check_publishing_sla_for_device(device, window, 0.90)
+            # accumulate full node health
+            node_healthy = node_healthy and healthy
+            add_device_health_check_record(node.vsn, device, healthy)
+
+        add_node_health_check_record(node.vsn, node_healthy)
+
+    logging.info("done")
+
+    return records
+
+
 def main():
     INFLUXDB_URL = "https://influxdb.sagecontinuum.org"
     INFLUXDB_ORG = "waggle"
@@ -374,125 +393,22 @@ def time_arg(s):
         format="%(asctime)s %(message)s",
         datefmt="%Y/%m/%d %H:%M:%S")

-    node_table = load_node_table()
-
+    nodes = load_node_table()
     start, end = get_rollup_range(args.start, args.end)
     window = args.window

     logging.info("current time is %s", now)

     for start, end in time_windows(start, end, window):
-        logging.info("checking %s %s", start, end)
-
-        records = []
-
-        logging.info("querying data...")
-        df = sage_data_client.query(
-            start=start.isoformat(),
-            end=end.isoformat(),
-        )
-        logging.info("done!")
-
-        logging.info("checking data...")
-
-        timestamp = start
-
-        # NOTE metrics agent doesn't add a task name, so we set task name
-        # to system for system metrics.
- df.loc[df["name"].str.startswith("sys."), "meta.task"] = "sys" - - vsn_groups = df.groupby(["meta.vsn"]) - - for node in node_table: - try: - df_vsn = vsn_groups.get_group(node.vsn) - except: - records.append({ - "measurement": "node_health_check", - "tags": { - "vsn": node.vsn, - }, - "fields": { - "value": 0, - }, - "timestamp": timestamp, - }) - - for device in node.devices: - records.append({ - "measurement": "device_health_check", - "tags": { - "vsn": node.vsn, - "device": device, - }, - "fields": { - "value": 0, - }, - "timestamp": timestamp, - }) - continue - - groups = df_vsn.groupby(["meta.task", "name"]) - - def check_publishing_frequency_for_device(device, window): - for task, name, freq in device_output_table[device]: - try: - group = groups.get_group((task, name)) - except KeyError: - yield 0.0 - yield check_publishing_frequency(group, freq, window) - - - node_healthy = True + records = get_records_for_window(nodes, start, end, window) - for device in node.devices: - # for task, name, freq in device_output_table[device]: - # try: - # group = groups.get_group((task, name)) - # value = check_publishing_frequency(group, freq, window) - # except KeyError: - # value = 0.0 - # results.append({"task", task, "name": name, "vsn": node.vsn, "device": device, "value": value}) - - # the idea here is to translate the publishing frequency into a kind of SLA. here - # we're saying that after breaking the series up into window the size of the publishing - # frequency, we should see 1 sample per window in 95% of the windows. - healthy = all(f > 0.95 for f in check_publishing_frequency_for_device(device, window)) - - # accumulate full node health - node_healthy = node_healthy and healthy - - records.append({ - "measurement": "device_health_check", - "tags": { - "vsn": node.vsn, - "device": device, - }, - "fields": { - "value": int(healthy), - }, - "timestamp": timestamp, - }) - - records.append({ - "measurement": "node_health_check", - "tags": { - "vsn": node.vsn, - }, - "fields": { - "value": int(node_healthy), - }, - "timestamp": timestamp, - }) - - logging.info("done!") - logging.info("writing %s records...", len(records)) write_results_to_influxdb( url=INFLUXDB_URL, org=INFLUXDB_ORG, token=INFLUXDB_TOKEN, records=records) - logging.info("done!") + logging.info("done") if __name__ == "__main__": diff --git a/requirements.txt b/requirements.txt index 6730d7c..96b03be 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ sage-data-client==0.4.* slackclient +influxdb-client[ciso]
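Note on `check_publishing_frequency`: it is called by the new
`get_records_for_window` but is not defined anywhere in this diff. Based on how
its return value is used (a per-output fraction compared against the 0.90 SLA),
a minimal sketch of the expected behavior might look like the following. This
is an illustrative assumption, not the repository's actual implementation; the
resampling details in particular are guesses:

    import pandas as pd

    def check_publishing_frequency(df, freq, window):
        # fraction of freq-sized buckets inside the rollup window that contain
        # at least one sample, capped at 1.0. for example, with window="1h" and
        # freq="120s" there are 30 expected buckets; a device that published in
        # 28 of them scores 28/30 ~= 0.93 and passes the 0.90 SLA.
        expected = pd.to_timedelta(window) / pd.to_timedelta(freq)
        counts = df.resample(freq, on="timestamp")["value"].count()
        return min((counts > 0).sum() / expected, 1.0)

Under this sketch, the 60s -> 120s change in the commit becomes concrete: for
an hour-long rollup window, a sys metric now needs samples in at least 27 of 30
buckets (0.90 x 30) rather than 54 of 60 under the old 60s assumption.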