Skip to content

Commit

Permalink
fix: Add Kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
bmtcril committed Mar 13, 2024
1 parent 154b473 commit e273c4a
Showing 1 changed file with 12 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,28 +138,6 @@ def get_redis_bus_stats(self):
r = redis.Redis(host="redis", port=6379, db=0)
info = r.xinfo_stream("openedx-analytics", full=True)

Check warning on line 139 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L138-L139

Added lines #L138 - L139 were not covered by tests

"""
xinfo_stream returns a large structure, we are mostly looking at this
section:
'groups': [{'consumers': [
0 [b'name',
1 b'aspects',
2 b'seen-time',
3 1709152586342,
4 b'active-time',
5 1709152586342,
6 b'pel-count',
7 0,
8 b'pending',
9 []]],
'entries-read': None,
'lag': 0,
'last-delivered-id': b'1707234536710-0',
'name': b'event_routing_backends',
'pel-count': 0,
'pending': []}],
"""

lag = []

Check warning on line 141 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L141

Added line #L141 was not covered by tests
for g in info["groups"]:
lag.append({str(g["name"]): g["lag"]})

Check warning on line 143 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L143

Added line #L143 was not covered by tests
Expand All @@ -174,30 +152,27 @@ def get_redis_bus_stats(self):
return consumer_stats

Check warning on line 152 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L152

Added line #L152 was not covered by tests

def get_kafka_bus_stats(self):

Check warning on line 154 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L154

Added line #L154 was not covered by tests
# This isn't always installed, but should be if the Kafka bus is on
import confluent_kafka

Check warning on line 156 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L156

Added line #L156 was not covered by tests

brokers = settings.EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS
topic = f"{settings.EVENT_BUS_TOPIC_PREFIX}-analytics"
group = "analytics-service"

Check warning on line 160 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L158-L160

Added lines #L158 - L160 were not covered by tests

# Create consumer.
# This consumer will not join the group, but the group.id is required by
# committed() to know which group to get offsets for.
consumer = confluent_kafka.Consumer({'bootstrap.servers': brokers,
'group.id': group})

print("%-50s %9s %9s" % ("Topic [Partition]", "Committed", "Lag"))
print("=" * 72)
consumer = confluent_kafka.Consumer({

Check warning on line 164 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L164

Added line #L164 was not covered by tests
'bootstrap.servers': brokers,
'group.id': group
})

# Get the topic's partitions
metadata = consumer.list_topics(topic, timeout=10)

Check warning on line 170 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L170

Added line #L170 was not covered by tests

if metadata.topics[topic].error is not None:
raise confluent_kafka.KafkaException(metadata.topics[topic].error)
print(metadata.topics[topic].error)

Check warning on line 173 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L173

Added line #L173 was not covered by tests

# Construct TopicPartition list of partitions to query
partitions = [confluent_kafka.TopicPartition(topic, p) for p in metadata.topics[topic].partitions]

# Query committed offsets for this group and the given partitions
committed = consumer.committed(partitions, timeout=10)

Check warning on line 176 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L176

Added line #L176 was not covered by tests

consumer_stats = {

Check warning on line 178 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L178

Added line #L178 was not covered by tests
Expand All @@ -207,25 +182,19 @@ def get_kafka_bus_stats(self):

for partition in committed:
# Get the partitions low and high watermark offsets.
(lo, hi) = consumer.get_watermark_offsets(partition, timeout=10, cached=False)

if partition.offset == confluent_kafka.OFFSET_INVALID:
offset = "-"
else:
offset = "%d" % partition.offset
low, high = consumer.get_watermark_offsets(partition, timeout=10, cached=False)

Check warning on line 185 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L185

Added line #L185 was not covered by tests

if hi < 0:
if high < 0:
lag = "no hwmark" # Unlikely

Check warning on line 188 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L188

Added line #L188 was not covered by tests
elif partition.offset < 0:
# No committed offset, show total message count as lag.
# The actual message count may be lower due to compaction
# and record deletions.
lag = "%d" % (hi - lo)
lag = high - low

Check warning on line 193 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L193

Added line #L193 was not covered by tests
else:
lag = "%d" % (hi - partition.offset)
lag = high - partition.offset

Check warning on line 195 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L195

Added line #L195 was not covered by tests

print("%-50s %9s %9s" % (
"{} [{}]".format(partition.topic, partition.partition), offset, lag))
print(f"Kafka bus lag: {lag}")

Check warning on line 197 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L197

Added line #L197 was not covered by tests

consumer_stats["partitions"].append({

Check warning on line 199 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L199

Added line #L199 was not covered by tests
"partition": partition.partition,
Expand Down

0 comments on commit e273c4a

Please sign in to comment.