Skip to content

Commit

Permalink
Added support for snapshot related metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
psmitj committed Aug 9, 2024
1 parent 486ca19 commit 8c581c8
Show file tree
Hide file tree
Showing 8 changed files with 215 additions and 12 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ dependencies {

restResources {
restApi {
includeCore '_common', 'cat', 'cluster', 'nodes', 'indices', 'index'
includeCore '_common', 'cat', 'cluster', 'nodes', 'indices', 'index', 'snapshot'
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.compuscene.metrics.prometheus;

import org.opensearch.action.ClusterStatsData;
import org.opensearch.action.SnapshotsResponse;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.indices.stats.CommonStats;
Expand All @@ -37,6 +38,8 @@
import org.opensearch.monitor.os.OsStats;
import org.opensearch.monitor.process.ProcessStats;
import org.opensearch.script.ScriptStats;
import org.opensearch.snapshots.SnapshotInfo;
import org.opensearch.snapshots.SnapshotState;
import org.opensearch.threadpool.ThreadPoolStats;
import org.opensearch.transport.TransportStats;

Expand All @@ -54,19 +57,23 @@ public class PrometheusMetricsCollector {

private boolean isPrometheusClusterSettings;
private boolean isPrometheusIndices;
private boolean isPrometheusSnapshots;
private PrometheusMetricsCatalog catalog;

/**
* A constructor.
* @param catalog {@link PrometheusMetricsCatalog}
* @param isPrometheusIndices boolean flag for index level metric
* @param isPrometheusSnapshots boolean flag for snapshots related metrics
* @param isPrometheusClusterSettings boolean flag cluster settings metrics
*/
public PrometheusMetricsCollector(PrometheusMetricsCatalog catalog,
boolean isPrometheusIndices,
boolean isPrometheusSnapshots,
boolean isPrometheusClusterSettings) {
this.isPrometheusClusterSettings = isPrometheusClusterSettings;
this.isPrometheusIndices = isPrometheusIndices;
this.isPrometheusSnapshots = isPrometheusSnapshots;
this.catalog = catalog;
}

Expand All @@ -80,6 +87,7 @@ public void registerMetrics() {
registerNodeMetrics();
registerIndicesMetrics();
registerPerIndexMetrics();
registerSnapshotMetrics();
registerTransportMetrics();
registerHTTPMetrics();
registerThreadPoolMetrics();
Expand Down Expand Up @@ -465,6 +473,30 @@ private void updatePerIndexMetrics(@Nullable ClusterHealthResponse chr, @Nullabl
}
}

@SuppressWarnings("checkstyle:LineLength")
private void registerSnapshotMetrics() {
catalog.registerClusterGauge("min_snapshot_age", "Time elapsed in milliseconds since the most recent successful snapshot's start time", "sm_policy");
}

private void updateSnapshotsMetrics(@Nullable SnapshotsResponse snapshotsResponse) {
if (snapshotsResponse == null) {
return;
}
Map<String, Long> smPolicyMinSnapshotAge = new HashMap<>();
for (SnapshotInfo snapshotInfo : snapshotsResponse.getSnapshotInfos()) {
// emit snapshot_age metric only for successful snapshots
if (snapshotInfo.state() != SnapshotState.SUCCESS) {
continue;
}
String smPolicy = snapshotInfo.userMetadata() == null ? "adhoc" : snapshotInfo.userMetadata().getOrDefault("sm_policy", "adhoc").toString();
long snapshotAge = System.currentTimeMillis() - snapshotInfo.startTime();
smPolicyMinSnapshotAge.compute(smPolicy, (key, oldValue) -> oldValue == null ? snapshotAge : Math.min(oldValue, snapshotAge));
}
for(Map.Entry<String, Long> entry : smPolicyMinSnapshotAge.entrySet()) {
catalog.setClusterGauge("min_snapshot_age", entry.getValue(), entry.getKey());
}
}

@SuppressWarnings("checkstyle:LineLength")
private void updatePerIndexContextMetrics(String indexName, String context, CommonStats idx) {
catalog.setClusterGauge("index_doc_number", idx.getDocs().getCount(), indexName, context);
Expand Down Expand Up @@ -920,12 +952,14 @@ private void updateESSettings(@Nullable ClusterStatsData stats) {
* @param nodeStats NodeStats filtered using nodes filter
* @param indicesStats IndicesStatsResponse
* @param clusterStatsData ClusterStatsData
* @param snapshotsResponse SnapshotsResponse
*/
public void updateMetrics(String originNodeName, String originNodeId,
@Nullable ClusterHealthResponse clusterHealthResponse,
NodeStats[] nodeStats,
@Nullable IndicesStatsResponse indicesStats,
@Nullable ClusterStatsData clusterStatsData) {
@Nullable ClusterStatsData clusterStatsData,
@Nullable SnapshotsResponse snapshotsResponse) {
Summary.Timer timer = catalog.startSummaryTimer(
new Tuple<>(originNodeName, originNodeId),
"metrics_generate_time_seconds");
Expand Down Expand Up @@ -956,7 +990,9 @@ public void updateMetrics(String originNodeName, String originNodeId,
if (isPrometheusClusterSettings) {
updateESSettings(clusterStatsData);
}

if (isPrometheusSnapshots) {
updateSnapshotsMetrics(snapshotsResponse);
}
timer.observeDuration();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public enum INDEX_FILTER_OPTIONS {

static String PROMETHEUS_CLUSTER_SETTINGS_KEY = "prometheus.cluster.settings";
static String PROMETHEUS_INDICES_KEY = "prometheus.indices";
static String PROMETHEUS_SNAPSHOTS_KEY = "prometheus.snapshots";
static String PROMETHEUS_NODES_FILTER_KEY = "prometheus.nodes.filter";
static String PROMETHEUS_SELECTED_INDICES_KEY = "prometheus.indices_filter.selected_indices";
static String PROMETHEUS_SELECTED_OPTION_KEY = "prometheus.indices_filter.selected_option";
Expand All @@ -69,6 +70,14 @@ public enum INDEX_FILTER_OPTIONS {
Setting.boolSetting(PROMETHEUS_INDICES_KEY, true,
Setting.Property.Dynamic, Setting.Property.NodeScope);

/**
* This setting is used configure weather to expose snapshot metrics or not. The default value is false.
* Can be configured in opensearch.yml file or update dynamically under key {@link #PROMETHEUS_SNAPSHOTS_KEY}.
*/
public static final Setting<Boolean> PROMETHEUS_SNAPSHOTS =
Setting.boolSetting(PROMETHEUS_SNAPSHOTS_KEY, false,
Setting.Property.Dynamic, Setting.Property.NodeScope);

/**
* This setting is used configure which cluster nodes to gather metrics from. The default value is _local.
* Can be configured in opensearch.yml file or update dynamically under key {@link #PROMETHEUS_NODES_FILTER_KEY}.
Expand Down Expand Up @@ -97,6 +106,7 @@ public enum INDEX_FILTER_OPTIONS {

private volatile boolean clusterSettings;
private volatile boolean indices;
private volatile boolean snapshots;
private volatile String nodesFilter;
private volatile String selectedIndices;
private volatile INDEX_FILTER_OPTIONS selectedOption;
Expand All @@ -109,11 +119,13 @@ public enum INDEX_FILTER_OPTIONS {
public PrometheusSettings(Settings settings, ClusterSettings clusterSettings) {
setPrometheusClusterSettings(PROMETHEUS_CLUSTER_SETTINGS.get(settings));
setPrometheusIndices(PROMETHEUS_INDICES.get(settings));
setPrometheusSnapshots(PROMETHEUS_SNAPSHOTS.get(settings));
setPrometheusNodesFilter(PROMETHEUS_NODES_FILTER.get(settings));
setPrometheusSelectedIndices(PROMETHEUS_SELECTED_INDICES.get(settings));
setPrometheusSelectedOption(PROMETHEUS_SELECTED_OPTION.get(settings));
clusterSettings.addSettingsUpdateConsumer(PROMETHEUS_CLUSTER_SETTINGS, this::setPrometheusClusterSettings);
clusterSettings.addSettingsUpdateConsumer(PROMETHEUS_INDICES, this::setPrometheusIndices);
clusterSettings.addSettingsUpdateConsumer(PROMETHEUS_SNAPSHOTS, this::setPrometheusSnapshots);
clusterSettings.addSettingsUpdateConsumer(PROMETHEUS_NODES_FILTER, this::setPrometheusNodesFilter);
clusterSettings.addSettingsUpdateConsumer(PROMETHEUS_SELECTED_INDICES, this::setPrometheusSelectedIndices);
clusterSettings.addSettingsUpdateConsumer(PROMETHEUS_SELECTED_OPTION, this::setPrometheusSelectedOption);
Expand All @@ -127,6 +139,10 @@ private void setPrometheusIndices(boolean flag) {
this.indices = flag;
}

private void setPrometheusSnapshots(boolean flag) {
this.snapshots = flag;
}

private void setPrometheusNodesFilter(String filter) { this.nodesFilter = filter; }

private void setPrometheusSelectedIndices(String selectedIndices) {
Expand All @@ -153,6 +169,14 @@ public boolean getPrometheusIndices() {
return this.indices;
}

/**
* Get value of settings key {@link #PROMETHEUS_SNAPSHOTS_KEY}.
* @return boolean value of the key
*/
public boolean getPrometheusSnapshots() {
return this.snapshots;
}

/**
* Get value of settings key {@link #PROMETHEUS_NODES_FILTER_KEY}.
* @return boolean value of the key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class NodePrometheusMetricsResponse extends ActionResponse {
private final NodeStats[] nodeStats;
@Nullable private final IndicesStatsResponse indicesStats;
private ClusterStatsData clusterStatsData = null;
@Nullable private final SnapshotsResponse snapshotsResponse;

/**
* A constructor that materialize the instance from inputStream.
Expand All @@ -56,6 +57,7 @@ public NodePrometheusMetricsResponse(StreamInput in) throws IOException {
nodeStats = in.readArray(NodeStats::new, NodeStats[]::new);
indicesStats = PackageAccessHelper.createIndicesStatsResponse(in);
clusterStatsData = new ClusterStatsData(in);
snapshotsResponse = new SnapshotsResponse(in);
}

/**
Expand All @@ -65,6 +67,7 @@ public NodePrometheusMetricsResponse(StreamInput in) throws IOException {
* @param nodesStats NodesStats
* @param indicesStats IndicesStats
* @param clusterStateResponse ClusterStateResponse
* @param snapshotsResponse SnapshotsResponse
* @param settings Settings
* @param clusterSettings ClusterSettings
*/
Expand All @@ -73,6 +76,7 @@ public NodePrometheusMetricsResponse(ClusterHealthResponse clusterHealth,
NodeStats[] nodesStats,
@Nullable IndicesStatsResponse indicesStats,
@Nullable ClusterStateResponse clusterStateResponse,
@Nullable SnapshotsResponse snapshotsResponse,
Settings settings,
ClusterSettings clusterSettings) {
this.clusterHealth = clusterHealth;
Expand All @@ -82,6 +86,7 @@ public NodePrometheusMetricsResponse(ClusterHealthResponse clusterHealth,
if (clusterStateResponse != null) {
this.clusterStatsData = new ClusterStatsData(clusterStateResponse, settings, clusterSettings);
}
this.snapshotsResponse = snapshotsResponse;
}

/**
Expand All @@ -106,6 +111,15 @@ public NodeStats[] getNodeStats() {
return this.nodeStats;
}

/**
* Get internal {@link SnapshotsResponse} object.
* @return SnapshotsResponse object
*/
@Nullable
public SnapshotsResponse getSnapshotsResponse() {
return this.snapshotsResponse;
}

/**
* Get internal {@link IndicesStatsResponse} object.
* @return IndicesStatsResponse object
Expand All @@ -131,5 +145,6 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeArray(nodeStats);
out.writeOptionalWriteable(indicesStats);
clusterStatsData.writeTo(out);
snapshotsResponse.writeTo(out);
}
}
74 changes: 74 additions & 0 deletions src/main/java/org/opensearch/action/SnapshotsResponse.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright [2018] [Vincent VAN HOLLEBEKE]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.opensearch.action;

import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.snapshots.SnapshotInfo;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

/**
* Represents a container class for holding response data related to snapshots.
*/
public class SnapshotsResponse extends ActionResponse {
private final List<SnapshotInfo> snapshotInfos;

/**
* A constructor.
* @param in A streamInput to materialize the instance from
* @throws IOException if there is an exception reading from inputStream
*/
public SnapshotsResponse(StreamInput in) throws IOException {
super(in);
snapshotInfos = in.readList(SnapshotInfo::new);
}

/**
* A constructor.
*
* @param snapshotInfos A list of {@link SnapshotInfo} objects to initialize the instance with.
*/
public SnapshotsResponse(List<SnapshotInfo> snapshotInfos) {
this.snapshotInfos = Collections.unmodifiableList(snapshotInfos);
}

/**
* Writes the instance into {@link StreamOutput}.
*
* @param out the output stream to which the instance is to be written
* @throws IOException if there is an exception writing to the output stream
*/
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeCollection(snapshotInfos);
}

/**
* Getter for {@code snapshotInfos} list.
* The returned list is unmodifiable to ensure immutability.
*
* @return the list of {@link SnapshotInfo} objects
*/
public List<SnapshotInfo> getSnapshotInfos() {
return snapshotInfos;
}
}
Loading

0 comments on commit 8c581c8

Please sign in to comment.