Commit b7e51bd

updated sys metrics expected publish interval from 60s to 120s

seanshahkarami committed Mar 3, 2022
1 parent 19baf13 commit b7e51bd

Showing 2 changed files with 97 additions and 180 deletions.
check_nodes_influxdb.py (276 changes: 96 additions & 180 deletions)
@@ -166,7 +166,7 @@
 }

 outputs_from_raingauge = {
-    "env.raingauge.acc",
+    # "env.raingauge.acc",
     "env.raingauge.event_acc",
     "env.raingauge.rint",
     "env.raingauge.total_acc",
@@ -176,9 +176,9 @@
 # the possible devices on a node. the frequency is the minimum expected
 # publishing frequency
 device_output_table = {
-    "nxcore": [("sys", name, "60s") for name in sys_from_nxcore],
-    "nxagent": [("sys", name, "60s") for name in sys_from_nxagent],
-    "rpi": [("sys", name, "60s") for name in sys_from_rpi],
+    "nxcore": [("sys", name, "120s") for name in sys_from_nxcore],
+    "nxagent": [("sys", name, "120s") for name in sys_from_nxagent],
+    "rpi": [("sys", name, "120s") for name in sys_from_rpi],
     "dell": [("sys", name, "60s") for name in sys_from_dellblade],
     "bme280": [("iio-nx", name, "30s") for name in outputs_from_bme],
     "bme680": [("iio-rpi", name, "30s") for name in outputs_from_bme],
@@ -191,76 +191,6 @@
 }


-# # probably can do this in simpler way using production list
-# def get_sanity_test_rollup_results(start, end, now=None):
-#     if now is None:
-#         now = pd.to_datetime("now", utc=True)
-
-#     start = (now + pd.to_timedelta(start)).floor("1h")
-#     end = (now + pd.to_timedelta(end)).floor("1h")
-
-#     # request data 30min before and after [start, end] window so we always
-#     # include data we would round in.
-#     df = sage_data_client.query(
-#         start=(start - pd.to_timedelta("30min")).isoformat(),
-#         end=(end + pd.to_timedelta("30min")).isoformat(),
-#         filter={
-#             "name": "sys.sanity.*"
-#         }
-#     )
-
-#     # TODO drop outside of rollup window
-#     df["timestamp"] = df["timestamp"].dt.round("1h")
-#     df["total"] = 1
-#     df["pass"] = df["value"] == 0
-#     df["fail"] = df["value"] != 0
-
-#     table = pd.pivot_table(df, values=["total", "pass", "fail"], index="timestamp", columns=["meta.node", "meta.vsn"], aggfunc="sum", fill_value=0)
-
-#     # fill in all time windows
-#     index = pd.date_range(start.floor("1h"), end.floor("1h"), freq="h")
-#     table = table.reindex(index)
-
-#     # TODO need to filling in the vsn
-#     # groups = df.groupby("meta.vsn")
-
-#     # for vsn in vsns:
-#     #     try:
-#     #         df_vsn = groups.get_group(vsn)
-#     #         table_vsn = pd.pivot_table(df_vsn, values=["total", "pass", "fail"], index="timestamp", aggfunc="sum", fill_value=0)
-#     #         table_vsn = table_vsn.reindex(index, fill_value=0)
-#     #     except KeyError:
-#     #         table_vsn = pd.DataFrame([], index=index)
-#     #         table_vsn["total"] = 0
-#     #         table_vsn["pass"] = 0
-#     #         table_vsn["fail"] = 0
-#     #     print(table_vsn)
-#     #     print()
-
-#     results = []
-
-#     fieldnames = {
-#         "total": "sanity_test_total",
-#         "pass": "sanity_test_pass_total",
-#         "fail": "sanity_test_fail_total",
-#     }
-
-#     for (field, node, vsn, ts), total in table.unstack().iteritems():
-#         results.append({
-#             "measurement": fieldnames[field],
-#             "tags": {
-#                 "vsn": vsn,
-#                 "node": node,
-#             },
-#             "fields": {
-#                 "value": total,
-#             },
-#             "time": ts,
-#         })
-
-#     return results
-
-
 def write_results_to_influxdb(url, token, org, records):
     data = []
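The rest of write_results_to_influxdb is collapsed in this diff view. As a hedged sketch of how records of this shape could be written with the influxdb-client package added to requirements.txt below; the helper name, bucket name, and synchronous write mode here are assumptions, not code from this commit:

    from influxdb_client import InfluxDBClient, Point
    from influxdb_client.client.write_api import SYNCHRONOUS

    def write_records_sketch(url, token, org, records, bucket="health-checks"):
        # bucket name is hypothetical; the real bucket is not visible in this diff
        points = []
        for rec in records:
            point = Point(rec["measurement"])
            for key, value in rec["tags"].items():
                point = point.tag(key, value)
            for key, value in rec["fields"].items():
                point = point.field(key, value)
            points.append(point.time(rec["timestamp"]))
        with InfluxDBClient(url=url, token=token, org=org) as client:
            client.write_api(write_options=SYNCHRONOUS).write(bucket=bucket, record=points)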

@@ -354,6 +284,95 @@ def get_rollup_range(start, end, now=None):
     return start.floor("1h"), end.floor("1h")


+def get_records_for_window(nodes, start, end, window):
+    logging.info("getting records in %s %s", start, end)
+
+    records = []
+
+    logging.info("querying data...")
+    df = sage_data_client.query(start=start, end=end)
+    logging.info("done")
+
+    logging.info("checking data...")
+
+    timestamp = start
+
+    def add_node_health_check_record(vsn, value):
+        records.append({
+            "measurement": "node_health_check",
+            "tags": {
+                "vsn": vsn,
+            },
+            "fields": {
+                "value": int(value),
+            },
+            "timestamp": timestamp,
+        })
+
+    def add_device_health_check_record(vsn, device, value):
+        records.append({
+            "measurement": "device_health_check",
+            "tags": {
+                "vsn": vsn,
+                "device": device,
+            },
+            "fields": {
+                "value": int(value),
+            },
+            "timestamp": timestamp,
+        })
+
+    # NOTE metrics agent doesn't add a task name, so we set the task
+    # name to "sys" for system metrics.
+    df.loc[df["name"].str.startswith("sys."), "meta.task"] = "sys"
+
+    vsn_groups = df.groupby(["meta.vsn"])
+
+    for node in nodes:
+        try:
+            df_vsn = vsn_groups.get_group(node.vsn)
+        except KeyError:
+            add_node_health_check_record(node.vsn, 0)
+            for device in node.devices:
+                add_device_health_check_record(node.vsn, device, 0)
+            continue
+
+        groups = df_vsn.groupby(["meta.task", "name"])
+
+        def check_publishing_frequency_for_device(device, window):
+            for task, name, freq in device_output_table[device]:
+                try:
+                    group = groups.get_group((task, name))
+                    yield task, name, check_publishing_frequency(group, freq, window)
+                except KeyError:
+                    yield task, name, 0.0
+
+        def check_publishing_sla_for_device(device, window, sla):
+            healthy = True
+            for task, name, f in check_publishing_frequency_for_device(device, window):
+                if f < sla:
+                    healthy = False
+                    logging.info("failed sla %s %s %s %s %s %s %0.3f", start, end, node.vsn, device, task, name, f)
+            return healthy
+
+        node_healthy = True
+
+        for device in node.devices:
+            # the idea here is to translate the publishing frequency into a kind of SLA. here
+            # we're saying that after breaking the series up into windows the size of the publishing
+            # frequency, we should see 1 sample per window in 90% of the windows.
+            healthy = check_publishing_sla_for_device(device, window, 0.90)
+            # accumulate full node health
+            node_healthy = node_healthy and healthy
+            add_device_health_check_record(node.vsn, device, healthy)
+
+        add_node_health_check_record(node.vsn, node_healthy)
+
+    logging.info("done")
+
+    return records
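check_publishing_frequency itself is not shown in this diff. Judging from the SLA comment above, a plausible sketch (an assumption, not the commit's actual implementation) splits the rollup window into freq-sized bins and returns the fraction of bins containing at least one sample:

    import pandas as pd

    def check_publishing_frequency_sketch(df, freq, window):
        # number of freq-sized bins expected in the rollup window,
        # e.g. a "1h" window with freq "120s" gives 30 bins
        expected = int(pd.to_timedelta(window) / pd.to_timedelta(freq))
        # count samples per bin; resample only emits bins spanning observed
        # data, so dividing by `expected` also penalizes missing tail bins
        counts = df.resample(freq, on="timestamp")["value"].count()
        return (counts > 0).sum() / expected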


 def main():
     INFLUXDB_URL = "https://influxdb.sagecontinuum.org"
     INFLUXDB_ORG = "waggle"
@@ -374,125 +393,22 @@ def time_arg(s):
         format="%(asctime)s %(message)s",
         datefmt="%Y/%m/%d %H:%M:%S")

-    node_table = load_node_table()
+    nodes = load_node_table()
     start, end = get_rollup_range(args.start, args.end)
     window = args.window

     logging.info("current time is %s", now)

     for start, end in time_windows(start, end, window):
-        logging.info("checking %s %s", start, end)
-        records = []

logging.info("querying data...")
df = sage_data_client.query(
start=start.isoformat(),
end=end.isoformat(),
)
logging.info("done!")

logging.info("checking data...")

timestamp = start

# NOTE metrics agent doesn't add a task name, so we set task name
# to system for system metrics.
df.loc[df["name"].str.startswith("sys."), "meta.task"] = "sys"

vsn_groups = df.groupby(["meta.vsn"])

for node in node_table:
try:
df_vsn = vsn_groups.get_group(node.vsn)
except:
records.append({
"measurement": "node_health_check",
"tags": {
"vsn": node.vsn,
},
"fields": {
"value": 0,
},
"timestamp": timestamp,
})

for device in node.devices:
records.append({
"measurement": "device_health_check",
"tags": {
"vsn": node.vsn,
"device": device,
},
"fields": {
"value": 0,
},
"timestamp": timestamp,
})
continue

groups = df_vsn.groupby(["meta.task", "name"])

def check_publishing_frequency_for_device(device, window):
for task, name, freq in device_output_table[device]:
try:
group = groups.get_group((task, name))
except KeyError:
yield 0.0
yield check_publishing_frequency(group, freq, window)


-            node_healthy = True
+        records = get_records_for_window(nodes, start, end, window)

-            for device in node.devices:
-                # for task, name, freq in device_output_table[device]:
-                #     try:
-                #         group = groups.get_group((task, name))
-                #         value = check_publishing_frequency(group, freq, window)
-                #     except KeyError:
-                #         value = 0.0
-                #     results.append({"task", task, "name": name, "vsn": node.vsn, "device": device, "value": value})
-
-                # the idea here is to translate the publishing frequency into a kind of SLA. here
-                # we're saying that after breaking the series up into window the size of the publishing
-                # frequency, we should see 1 sample per window in 95% of the windows.
-                healthy = all(f > 0.95 for f in check_publishing_frequency_for_device(device, window))
-
-                # accumulate full node health
-                node_healthy = node_healthy and healthy
-
-                records.append({
-                    "measurement": "device_health_check",
-                    "tags": {
-                        "vsn": node.vsn,
-                        "device": device,
-                    },
-                    "fields": {
-                        "value": int(healthy),
-                    },
-                    "timestamp": timestamp,
-                })
-
-            records.append({
-                "measurement": "node_health_check",
-                "tags": {
-                    "vsn": node.vsn,
-                },
-                "fields": {
-                    "value": int(node_healthy),
-                },
-                "timestamp": timestamp,
-            })
-
-        logging.info("done!")

logging.info("writing %s records...", len(records))
write_results_to_influxdb(
url=INFLUXDB_URL,
org=INFLUXDB_ORG,
token=INFLUXDB_TOKEN,
records=records)
logging.info("done!")
logging.info("done")


 if __name__ == "__main__":
requirements.txt (1 change: 1 addition & 0 deletions)
@@ -1,2 +1,3 @@
 sage-data-client==0.4.*
 slackclient
+influxdb-client[ciso]
