diff --git a/platform_plugin_aspects/management/commands/monitor_load_test_tracking.py b/platform_plugin_aspects/management/commands/monitor_load_test_tracking.py index b014ee4..f007530 100644 --- a/platform_plugin_aspects/management/commands/monitor_load_test_tracking.py +++ b/platform_plugin_aspects/management/commands/monitor_load_test_tracking.py @@ -138,28 +138,6 @@ def get_redis_bus_stats(self): r = redis.Redis(host="redis", port=6379, db=0) info = r.xinfo_stream("openedx-analytics", full=True) - """ - xinfo_stream returns a large structure, we are mostly looking at this - section: - 'groups': [{'consumers': [ - 0 [b'name', - 1 b'aspects', - 2 b'seen-time', - 3 1709152586342, - 4 b'active-time', - 5 1709152586342, - 6 b'pel-count', - 7 0, - 8 b'pending', - 9 []]], - 'entries-read': None, - 'lag': 0, - 'last-delivered-id': b'1707234536710-0', - 'name': b'event_routing_backends', - 'pel-count': 0, - 'pending': []}], - """ - lag = [] for g in info["groups"]: lag.append({str(g["name"]): g["lag"]}) @@ -174,30 +152,27 @@ def get_redis_bus_stats(self): return consumer_stats def get_kafka_bus_stats(self): + # This isn't always installed, but should be if the Kafka bus is on import confluent_kafka brokers = settings.EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS topic = f"{settings.EVENT_BUS_TOPIC_PREFIX}-analytics" group = "analytics-service" - # Create consumer. # This consumer will not join the group, but the group.id is required by # committed() to know which group to get offsets for. - consumer = confluent_kafka.Consumer({'bootstrap.servers': brokers, - 'group.id': group}) - - print("%-50s %9s %9s" % ("Topic [Partition]", "Committed", "Lag")) - print("=" * 72) + consumer = confluent_kafka.Consumer({ + 'bootstrap.servers': brokers, + 'group.id': group + }) # Get the topic's partitions metadata = consumer.list_topics(topic, timeout=10) + if metadata.topics[topic].error is not None: - raise confluent_kafka.KafkaException(metadata.topics[topic].error) + print(metadata.topics[topic].error) - # Construct TopicPartition list of partitions to query partitions = [confluent_kafka.TopicPartition(topic, p) for p in metadata.topics[topic].partitions] - - # Query committed offsets for this group and the given partitions committed = consumer.committed(partitions, timeout=10) consumer_stats = { @@ -207,25 +182,19 @@ def get_kafka_bus_stats(self): for partition in committed: # Get the partitions low and high watermark offsets. - (lo, hi) = consumer.get_watermark_offsets(partition, timeout=10, cached=False) - - if partition.offset == confluent_kafka.OFFSET_INVALID: - offset = "-" - else: - offset = "%d" % partition.offset + low, high = consumer.get_watermark_offsets(partition, timeout=10, cached=False) - if hi < 0: + if high < 0: lag = "no hwmark" # Unlikely elif partition.offset < 0: # No committed offset, show total message count as lag. # The actual message count may be lower due to compaction # and record deletions. - lag = "%d" % (hi - lo) + lag = high - low else: - lag = "%d" % (hi - partition.offset) + lag = high - partition.offset - print("%-50s %9s %9s" % ( - "{} [{}]".format(partition.topic, partition.partition), offset, lag)) + print(f"Kafka bus lag: {lag}") consumer_stats["partitions"].append({ "partition": partition.partition,