Skip to content

Commit

Permalink
fix(store): fix lag related metrics
Browse files Browse the repository at this point in the history
Signed-off-by: SSpirits <[email protected]>
  • Loading branch information
ShadowySpirits committed Oct 21, 2023
1 parent f4d6967 commit f2e7246
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class StoreMetricsConstant {
public static final String COUNTER_CONSUMER_SEND_TO_DLQ_MESSAGES_TOTAL = "rocketmq_send_to_dlq_messages_total";

public static final String LABEL_TOPIC = "topic";
public static final String LABEL_QUEUE_ID = "queue_id";
public static final String LABEL_CONSUMER_GROUP = "consumer_group";
public static final String LABEL_IS_RETRY = "is_retry";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import static com.automq.rocketmq.store.metrics.StoreMetricsConstant.GAUGE_CONSUMER_READY_MESSAGES;
import static com.automq.rocketmq.store.metrics.StoreMetricsConstant.LABEL_CONSUMER_GROUP;
import static com.automq.rocketmq.store.metrics.StoreMetricsConstant.LABEL_IS_RETRY;
import static com.automq.rocketmq.store.metrics.StoreMetricsConstant.LABEL_QUEUE_ID;
import static com.automq.rocketmq.store.metrics.StoreMetricsConstant.LABEL_TOPIC;

public class StoreMetricsManager extends ServiceThread implements MetricsManager {
Expand Down Expand Up @@ -111,7 +112,7 @@ public void run() {

logicQueue.retryStreamIdMap().forEach((consumerGroupId, retryStreamIdFuture) -> {
long confirmOffset = streamStore.confirmOffset(logicQueue.dataStreamId());
long consumeOffset = logicQueue.getRetryConsumeOffset(consumerGroupId);
long consumeOffset = logicQueue.getConsumeOffset(consumerGroupId);
int inflightCount = logicQueue.getInflightStats(consumerGroupId);
// TODO: build lag record for retry stream
LagRecord record = new LagRecord(logicQueue.topicId(), logicQueue.queueId(), consumerGroupId, false,
Expand All @@ -130,6 +131,7 @@ private Attributes buildLagAttributes(LagRecord record) {
AttributesBuilder attributesBuilder = newAttributesBuilder();
attributesBuilder.put(LABEL_CONSUMER_GROUP, record.consumerGroupId());
attributesBuilder.put(LABEL_TOPIC, record.topicId());
attributesBuilder.put(LABEL_QUEUE_ID, record.queueId());
attributesBuilder.put(LABEL_IS_RETRY, record.retry());
return attributesBuilder.build();
}
Expand Down

0 comments on commit f2e7246

Please sign in to comment.