Skip to content

Commit

Permalink
Add metrics to auto scale based on indexing pressure (#904)
Browse files Browse the repository at this point in the history
* Add metrics indexing_pressure.memory.limit_in_bytes and indexing_pressure.memory.current.current.all_in_bytes to allow auto-scaling based on how close the cluster nodes are to dropping indexing requests due to the indxing request memory buffer reaching capacity.

Signed-off-by: emilandresentac <[email protected]>

* Reduce labels per metric for indexing pressure metrics to cluster, node, and name to save on storage space.

Signed-off-by: emilandresentac <[email protected]>

---------

Signed-off-by: emilandresentac <[email protected]>
  • Loading branch information
tac-emil-andresen authored Jul 11, 2024
1 parent bf89cef commit d13c555
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 17 deletions.
61 changes: 61 additions & 0 deletions collector/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ var (
defaultRoleLabels = []string{"cluster", "host", "name"}
defaultThreadPoolLabels = append(defaultNodeLabels, "type")
defaultBreakerLabels = append(defaultNodeLabels, "breaker")
defaultIndexingPressureLabels = []string{"cluster", "host", "name", "indexing_pressure"}
defaultFilesystemDataLabels = append(defaultNodeLabels, "mount", "path")
defaultFilesystemIODeviceLabels = append(defaultNodeLabels, "device")
defaultCacheLabels = append(defaultNodeLabels, "cache")
Expand Down Expand Up @@ -150,6 +151,13 @@ type breakerMetric struct {
Labels func(cluster string, node NodeStatsNodeResponse, breaker string) []string
}

type indexingPressureMetric struct {
Type prometheus.ValueType
Desc *prometheus.Desc
Value func(indexingPressureStats NodeStatsIndexingPressureResponse) float64
Labels func(cluster string, node NodeStatsNodeResponse, indexingPressure string) []string
}

type threadPoolMetric struct {
Type prometheus.ValueType
Desc *prometheus.Desc
Expand Down Expand Up @@ -185,6 +193,7 @@ type Nodes struct {
nodeMetrics []*nodeMetric
gcCollectionMetrics []*gcCollectionMetric
breakerMetrics []*breakerMetric
indexingPressureMetrics []*indexingPressureMetric
threadPoolMetrics []*threadPoolMetric
filesystemDataMetrics []*filesystemDataMetric
filesystemIODeviceMetrics []*filesystemIODeviceMetric
Expand Down Expand Up @@ -1607,6 +1616,46 @@ func NewNodes(logger log.Logger, client *http.Client, url *url.URL, all bool, no
},
},
},
indexingPressureMetrics: []*indexingPressureMetric{
{
Type: prometheus.GaugeValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "indexing_pressure", "current_all_in_bytes"),
"Memory consumed, in bytes, by indexing requests in the coordinating, primary, or replica stage.",
defaultIndexingPressureLabels, nil,
),
Value: func(indexingPressureMem NodeStatsIndexingPressureResponse) float64 {
return float64(indexingPressureMem.Current.AllInBytes)
},
Labels: func(cluster string, node NodeStatsNodeResponse, indexingPressure string) []string {
return []string{
cluster,
node.Host,
node.Name,
indexingPressure,
}
},
},
{
Type: prometheus.GaugeValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "indexing_pressure", "limit_in_bytes"),
"Configured memory limit, in bytes, for the indexing requests",
defaultIndexingPressureLabels, nil,
),
Value: func(indexingPressureStats NodeStatsIndexingPressureResponse) float64 {
return float64(indexingPressureStats.LimitInBytes)
},
Labels: func(cluster string, node NodeStatsNodeResponse, indexingPressure string) []string {
return []string{
cluster,
node.Host,
node.Name,
indexingPressure,
}
},
},
},
threadPoolMetrics: []*threadPoolMetric{
{
Type: prometheus.CounterValue,
Expand Down Expand Up @@ -1919,6 +1968,18 @@ func (c *Nodes) Collect(ch chan<- prometheus.Metric) {
}
}

// Indexing Pressure stats
for indexingPressure, ipstats := range node.IndexingPressure {
for _, metric := range c.indexingPressureMetrics {
ch <- prometheus.MustNewConstMetric(
metric.Desc,
metric.Type,
metric.Value(ipstats),
metric.Labels(nodeStatsResp.ClusterName, node, indexingPressure)...,
)
}
}

// Thread Pool stats
for pool, pstats := range node.ThreadPool {
for _, metric := range c.threadPoolMetrics {
Expand Down
46 changes: 29 additions & 17 deletions collector/nodes_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,24 @@ type nodeStatsResponse struct {

// NodeStatsNodeResponse defines node stats information structure for nodes
type NodeStatsNodeResponse struct {
Name string `json:"name"`
Host string `json:"host"`
Timestamp int64 `json:"timestamp"`
TransportAddress string `json:"transport_address"`
Hostname string `json:"hostname"`
Roles []string `json:"roles"`
Attributes map[string]string `json:"attributes"`
Indices NodeStatsIndicesResponse `json:"indices"`
OS NodeStatsOSResponse `json:"os"`
Network NodeStatsNetworkResponse `json:"network"`
FS NodeStatsFSResponse `json:"fs"`
ThreadPool map[string]NodeStatsThreadPoolPoolResponse `json:"thread_pool"`
JVM NodeStatsJVMResponse `json:"jvm"`
Breakers map[string]NodeStatsBreakersResponse `json:"breakers"`
HTTP map[string]interface{} `json:"http"`
Transport NodeStatsTransportResponse `json:"transport"`
Process NodeStatsProcessResponse `json:"process"`
Name string `json:"name"`
Host string `json:"host"`
Timestamp int64 `json:"timestamp"`
TransportAddress string `json:"transport_address"`
Hostname string `json:"hostname"`
Roles []string `json:"roles"`
Attributes map[string]string `json:"attributes"`
Indices NodeStatsIndicesResponse `json:"indices"`
OS NodeStatsOSResponse `json:"os"`
Network NodeStatsNetworkResponse `json:"network"`
FS NodeStatsFSResponse `json:"fs"`
ThreadPool map[string]NodeStatsThreadPoolPoolResponse `json:"thread_pool"`
JVM NodeStatsJVMResponse `json:"jvm"`
Breakers map[string]NodeStatsBreakersResponse `json:"breakers"`
HTTP map[string]interface{} `json:"http"`
Transport NodeStatsTransportResponse `json:"transport"`
Process NodeStatsProcessResponse `json:"process"`
IndexingPressure map[string]NodeStatsIndexingPressureResponse `json:"indexing_pressure"`
}

// NodeStatsBreakersResponse is a representation of a statistics about the field data circuit breaker
Expand All @@ -50,6 +51,17 @@ type NodeStatsBreakersResponse struct {
Tripped int64 `json:"tripped"`
}

// NodeStatsIndexingPressureResponse is a representation of a elasticsearch indexing pressure
type NodeStatsIndexingPressureResponse struct {
Current NodeStatsIndexingPressureCurrentResponse `json:"current"`
LimitInBytes int64 `json:"limit_in_bytes"`
}

// NodeStatsIndexingPressureMemoryCurrentResponse is a representation of a elasticsearch indexing pressure current memory usage
type NodeStatsIndexingPressureCurrentResponse struct {
AllInBytes int64 `json:"all_in_bytes"`
}

// NodeStatsJVMResponse is a representation of a JVM stats, memory pool information, garbage collection, buffer pools, number of loaded/unloaded classes
type NodeStatsJVMResponse struct {
BufferPools map[string]NodeStatsJVMBufferPoolResponse `json:"buffer_pools"`
Expand Down

0 comments on commit d13c555

Please sign in to comment.