Skip to content

Commit

Permalink
Improved --top
Browse files Browse the repository at this point in the history
  • Loading branch information
davidmoreno committed Dec 15, 2023
1 parent 43bdf05 commit 998cb86
Showing 1 changed file with 208 additions and 160 deletions.
368 changes: 208 additions & 160 deletions cli/rtpmidid-cli.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#!/usr/bin/python3

import shutil
import socket
import sys
import json
Expand All @@ -26,32 +27,6 @@ def command(self, command):
return json.loads(data)


def main(argv):
settings = parse_arguments(argv)
if settings.control:
socketpath = settings.control
else:
socketpath = "/var/run/rtpmidid/control.sock"

if not os.path.exists(socketpath):
print("Control socket %s does not exist" % socketpath)
sys.exit(1)

try:
conn = Connection(socketpath)
except Exception as e:
print(str(e))
sys.exit(1)

if settings.top:
return top_loop(conn)

for cmd in parse_commands(settings.command or ["help"]):
print(">>> %s" % json.dumps(cmd), file=sys.stderr)
ret = conn.command(cmd)
print(json.dumps(ret, indent=2))


def maybe_int(txt: str):
try:
return int(txt)
Expand Down Expand Up @@ -83,148 +58,221 @@ def parse_commands(argv):
yield {"method": cmd[0], "params": [maybe_int(x) for x in cmd[1:]]}


ANSI_CLEAR_SCREEN = "\033[2J\033[H"
ANSI_PUSH_SCREEN = "\033[?1049h"
ANSI_POP_SCREEN = "\033[?1049l"

ANSI_TEXT_BLUE = "\033[34m"
ANSI_TEXT_RED = "\033[31m"
ANSI_TEXT_GREEN = "\033[32m"
ANSI_TEXT_YELLOW = "\033[33m"
ANSI_TEXT_PURPLE = "\033[35m"
ANSI_TEXT_CYAN = "\033[36m"
ANSI_TEXT_WHITE = "\033[37m"
ANSI_TEXT_BLACK = "\033[30m"
ANSI_TEXT_GREY = "\033[90m"
ANSI_TEXT_BOLD = "\033[1m"

ANSI_BG_BLUE = "\033[44m"
ANSI_BG_RED = "\033[41m"
ANSI_BG_GREEN = "\033[42m"
ANSI_BG_YELLOW = "\033[43m"
ANSI_BG_PURPLE = "\033[45m"
ANSI_BG_CYAN = "\033[46m"
ANSI_BG_WHITE = "\033[47m"
ANSI_BG_BLACK = "\033[40m"
ANSI_BG_GREY = "\033[100m"

ANSI_RESET = "\033[0m"


def safe_getter(*keys):
def getter(data):
try:
for key in keys:
if key in data:
data = data[key]
else:
return ""
except:
return ""
return data

return getter


def latency_getter(data):
avg = safe_getter("peer", "latency_ms", "average")(data)
if not avg:
def safe_get(data, *keys):
try:
for key in keys:
if key in data:
data = data[key]
else:
return ""
except:
return ""
stddev = safe_getter("peer", "latency_ms", "stddev")(data)
return f"{avg}ms \u00B1 {stddev}ms"


COLUMNS = [
{
"name": "ID",
"get": safe_getter("id"),
"width": 8,
"align": "right",
},
{
"name": "Type",
"get": safe_getter("type"),
"width": 40,
"align": "left",
},
{
"name": "Name",
"get": safe_getter("name"),
"width": 60,
"align": "left",
},
{
"name": "Status",
"get": safe_getter("peer", "status"),
"width": 20,
"align": "left",
},
{
"name": "Sent",
"get": safe_getter("stats", "sent"),
"width": 8,
"align": "right",
},
{
"name": "Received",
"get": safe_getter("stats", "recv"),
"width": 8,
"align": "right",
},
{
"name": "Latency",
# the unicode symbol for +- is \u00B1
"get": latency_getter,
"width": 20,
"align": "right",
},
{
"name": "Send To",
"get": lambda x: ", ".join(str(x) for x in safe_getter("send_to")(x)),
"width": 10,
},
]


def print_table(table: list[list[str]]):
def format_cell(cell, column):
if column.get("align") == "right":
return "{:>{width}}".format(cell, width=column["width"])
return data


class Top:
ANSI_CLEAR_SCREEN = "\033[2J\033[H"
ANSI_PUSH_SCREEN = "\033[?1049h"
ANSI_POP_SCREEN = "\033[?1049l"

ANSI_TEXT_BLUE = "\033[34m"
ANSI_TEXT_RED = "\033[31m"
ANSI_TEXT_GREEN = "\033[32m"
ANSI_TEXT_YELLOW = "\033[33m"
ANSI_TEXT_PURPLE = "\033[35m"
ANSI_TEXT_CYAN = "\033[36m"
ANSI_TEXT_WHITE = "\033[37m"
ANSI_TEXT_BLACK = "\033[30m"
ANSI_TEXT_GREY = "\033[90m"
ANSI_TEXT_BOLD = "\033[1m"

ANSI_BG_BLUE = "\033[44m"
ANSI_BG_RED = "\033[41m"
ANSI_BG_GREEN = "\033[42m"
ANSI_BG_DARK_GREEN = "\033[32m"
ANSI_BG_YELLOW = "\033[43m"
ANSI_BG_PURPLE = "\033[45m"
ANSI_BG_CYAN = "\033[46m"
ANSI_BG_WHITE = "\033[47m"
ANSI_BG_BLACK = "\033[40m"
ANSI_BG_GREY = "\033[100m"

ANSI_RESET = "\033[0m"

def __init__(self):
# terminal width and height from stty
width, height = shutil.get_terminal_size()
self.width = width
self.height = height

self.COLUMNS = [
{
"name": "ID",
"get": lambda data: safe_get(data, "id"),
"width": 8,
"align": "right",
},
{
"name": "Type",
"get": lambda data: safe_get(data, "type"),
"width": 40,
"align": "left",
},
{
"name": "Name",
"get": lambda data: safe_get(data, "name"),
"width": 0,
"align": "left",
},
{
"name": "Status",
"get": lambda data: safe_get(data, "peer", "status"),
"width": 20,
"align": "left",
},
{
"name": "Sent",
"get": lambda data: safe_get(data, "stats", "sent"),
"width": 8,
"align": "right",
},
{
"name": "Received",
"get": lambda data: safe_get(data, "stats", "recv"),
"width": 8,
"align": "right",
},
{
"name": "Latency",
# the unicode symbol for +- is \u00B1
"get": self.get_latency,
"width": 20,
"align": "right",
},
{
"name": "Send To",
"get": lambda data: ", ".join(
str(x) for x in safe_get(data, "send_to")
),
"width": 10,
},
]

# make Name column as wide as possible
self.COLUMNS[2]["width"] = (
self.width
- sum(x["width"] for x in self.COLUMNS)
- (3 * (len(self.COLUMNS) - 1))
)

def terminal_goto(self, x, y):
print("\033[%d;%dH" % (y, x), end="")

def get_latency(self, data):
avg = safe_get(data, "peer", "latency_ms", "average")
if not avg:
return ""
stddev = safe_get(data, "peer", "latency_ms", "stddev")
return f"{avg}ms \u00B1 {stddev}ms"

def print_table(self, table: list[list[str]]):
def format_cell(cell, column):
if column.get("align") == "right":
return "{:>{width}}".format(cell, width=column["width"])
else:
return "{:{width}}".format(cell, width=column["width"])

for idx, row in enumerate(table):
print(self.get_color_row(row, idx), end="")
print(
" | ".join(
format_cell(x, column) for x, column in zip(row, self.COLUMNS)
)
)
print(self.ANSI_RESET, end="")

def get_color_row(self, row, idx):
if idx == 0:
return self.ANSI_BG_CYAN + self.ANSI_TEXT_WHITE + self.ANSI_TEXT_BOLD
elif row[3] == "CONNECTED":
return self.ANSI_BG_GREEN + self.ANSI_TEXT_BLACK
elif row[3] == "CONNECTING":
return self.ANSI_BG_YELLOW + self.ANSI_TEXT_BLACK
elif row[3] == "":
return self.ANSI_BG_BLACK + self.ANSI_TEXT_GREY
else:
return "{:{width}}".format(cell, width=column["width"])

for idx, row in enumerate(table):
print(get_color_row(row, idx), end="")
print(" | ".join(format_cell(x, column) for x, column in zip(row, COLUMNS)))
print(ANSI_RESET, end="")
return self.ANSI_BG_GREY + self.ANSI_TEXT_BLACK

def print_header(self):
# write a header with the rtpmidid client, version, with color until the end of line
print(self.ANSI_BG_BLUE + self.ANSI_TEXT_BOLD + self.ANSI_TEXT_WHITE, end="")
self.print_padding("rtpmidid-cli v23.12")
# color until next newline
print(self.ANSI_RESET, end="")
print()

def print_footer(self):
self.terminal_goto(1, self.height)

print(self.ANSI_BG_BLUE + self.ANSI_TEXT_BOLD + self.ANSI_TEXT_WHITE, end="")
self.print_padding(
"Press Ctrl-C to exit | rtpmidid-cli v23.12 | (C) Coralbits 2023"
)
print(self.ANSI_RESET, end="")

def print_padding(self, text, count=None):
if count is None:
count = self.width

padchars = count - len(text)
if padchars < 0:
padchars = 0
print(text[:count] + " " * padchars, end="")

def top_loop(self, conn: Connection):
print(self.ANSI_PUSH_SCREEN, end="")
try:
while True:
print(self.ANSI_CLEAR_SCREEN, end="")
self.print_header()
data = conn.command({"method": "status"})
table = [[x["name"] for x in self.COLUMNS]]
peers = data["result"]["router"]
table.extend([[x["get"](peer) for x in self.COLUMNS] for peer in peers])
self.print_table(table)
self.print_footer()
print(flush=True, end="")
time.sleep(1)
except KeyboardInterrupt:
pass
finally:
print(self.ANSI_POP_SCREEN, end="")


def get_color_row(row, idx):
if idx == 0:
return ANSI_BG_CYAN + ANSI_TEXT_WHITE + ANSI_TEXT_BOLD
elif row[3] == "CONNECTED":
return ANSI_BG_GREEN + ANSI_TEXT_BLACK
elif row[3] == "CONNECTING":
return ANSI_BG_YELLOW + ANSI_TEXT_BLACK
def main(argv):
settings = parse_arguments(argv)
if settings.control:
socketpath = settings.control
else:
return ANSI_BG_GREY + ANSI_TEXT_BLACK
socketpath = "/var/run/rtpmidid/control.sock"

if not os.path.exists(socketpath):
print("Control socket %s does not exist" % socketpath)
sys.exit(1)

def top_loop(conn: Connection):
print(ANSI_PUSH_SCREEN, end="")
try:
while True:
print(ANSI_CLEAR_SCREEN, end="")
data = conn.command({"method": "status"})
table = [[x["name"] for x in COLUMNS]]
peers = data["result"]["router"]
table.extend([[x["get"](peer) for x in COLUMNS] for peer in peers])
print_table(table)
time.sleep(1)
except KeyboardInterrupt:
pass
finally:
print(ANSI_POP_SCREEN, end="")
conn = Connection(socketpath)
except Exception as e:
print(str(e))
sys.exit(1)

if settings.top:
return Top().top_loop(conn)

for cmd in parse_commands(settings.command or ["help"]):
print(">>> %s" % json.dumps(cmd), file=sys.stderr)
ret = conn.command(cmd)
print(json.dumps(ret, indent=2))


if __name__ == "__main__":
Expand Down

0 comments on commit 998cb86

Please sign in to comment.