
perf(query): Memoize part of the logical plan tree traversal for reduced memory allocation and faster planning #1874

Merged: 3 commits, Nov 1, 2024
2 changes: 1 addition & 1 deletion conf/logback-dev.xml
@@ -8,7 +8,7 @@
</encoder>
</appender>

<logger name="filodb.coordinator" level="DEBUG" />
<logger name="filodb.coordinator" level="INFO" />
<logger name="filodb.core" level="DEBUG" />
<logger name="filodb.memory" level="DEBUG" />
<logger name="filodb.query" level="DEBUG" />
@@ -246,7 +246,7 @@ class ShardMapper(val numShards: Int) extends Serializable {
* Registers a new node to the given shards. Modifies state in place.
* Idempotent.
*/
- private[coordinator] def registerNode(shards: Seq[Int], coordinator: ActorRef): Try[Unit] = {
+ def registerNode(shards: Seq[Int], coordinator: ActorRef): Try[Unit] = {
shards foreach {
case shard =>
//we always override the mapping. There was code earlier which prevent from
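
Note on the visibility change above: dropping private[coordinator] makes registerNode public, so code outside the coordinator package (test setup in particular) can wire shards to a coordinator directly. A minimal sketch of such a call; the motivation is inferred from the test changes below, not stated in the PR, and names other than ShardMapper/registerNode are illustrative:

    import akka.actor.ActorSystem
    import akka.testkit.TestProbe

    implicit val system: ActorSystem = ActorSystem("example")
    val mapper = new ShardMapper(numShards = 4)  // small mapping for illustration
    val coordinator = TestProbe().ref            // stand-in coordinator actor
    // Assign all four shards to the coordinator; Try[Unit] surfaces invalid shard ids.
    mapper.registerNode(Seq(0, 1, 2, 3), coordinator).get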
@@ -4,6 +4,7 @@
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._

import akka.actor.ActorRef
+ import com.github.benmanes.caffeine.cache.{Cache, Caffeine}
import com.typesafe.scalalogging.StrictLogging
import kamon.Kamon

Expand Down Expand Up @@ -65,6 +66,36 @@ class SingleClusterPlanner(val dataset: Dataset,
private val shardColumns = dsOptions.shardKeyColumns.sorted
private val dsRef = dataset.ref

+ private val shardPushdownCache: Option[Cache[(LogicalPlan, Option[Seq[Int]]), Option[Set[Int]]]] =
+ if (queryConfig.cachingConfig.singleClusterPlannerCachingEnabled) {
+ Some(
+ Caffeine.newBuilder()
+ .maximumSize(queryConfig.cachingConfig.singleClusterPlannerCachingSize)
+ .recordStats()
+ .build()
+ )
+ } else {
+ None
+ }
+
+ private val tSchemaChangingCache: Option[Cache[(Seq[ColumnFilter], Long, Long), Some[Boolean]]] =
+ if (queryConfig.cachingConfig.singleClusterPlannerCachingEnabled) {
+ Some(
+ Caffeine.newBuilder()
+ .maximumSize(queryConfig.cachingConfig.singleClusterPlannerCachingSize)
+ .recordStats()
+ .build()
+ )
+ } else {
+ None
+ }
+
+ private[queryplanner] def invalidateCaches(): Unit = {
+ shardPushdownCache.foreach(_.invalidateAll())
+ tSchemaChangingCache.foreach(_.invalidateAll())
+ }

// failed failover counter captures failovers which are not possible because at least one shard
// is down both on the primary and DR clusters, the query will get executed only when the
// partial results are acceptable otherwise an exception is thrown
@@ -80,14 +111,9 @@ class SingleClusterPlanner(val dataset: Dataset,
qContext.plannerParams.targetSchemaProviderOverride.getOrElse(_targetSchemaProvider)
}

- /**
- * Returns true iff a target-schema:
- * (1) matches any shard-key matched by the argument filters, and
- * (2) changes between the argument timestamps.
- */
- def isTargetSchemaChanging(shardKeyFilters: Seq[ColumnFilter],
- startMs: Long, endMs: Long,
- qContext: QueryContext): Boolean = {
+ private def isTargetSchemaChangingInner(shardKeyFilters: Seq[ColumnFilter],
+ startMs: Long, endMs: Long,
+ qContext: QueryContext): Boolean = {
val keyToValues = shardKeyFilters.map { filter =>
val values = filter match {
case ColumnFilter(col, regex: EqualsRegex) if QueryUtils.containsPipeOnlyRegex(regex.value.toString) =>
@@ -97,6 +123,7 @@
}
(filter.column, values)
}.toMap

QueryUtils.makeAllKeyValueCombos(keyToValues).exists { shardKeys =>
// Replace any EqualsRegex shard-key filters with Equals.
val equalsFilters = shardKeys.map(entry => ColumnFilter(entry._1, Equals(entry._2))).toSeq
@@ -106,6 +133,25 @@
}
}


+ /**
+ * Returns true iff a target-schema:
+ * (1) matches any shard-key matched by the argument filters, and
+ * (2) changes between the argument timestamps.
+ */
+ def isTargetSchemaChanging(shardKeyFilters: Seq[ColumnFilter],
+ startMs: Long, endMs: Long,
+ qContext: QueryContext): Boolean = {
+ tSchemaChangingCache match {
+ case Some(cache) =>
+ cache.get((shardKeyFilters, startMs, endMs), _ => {
+ Some(isTargetSchemaChangingInner(shardKeyFilters, startMs, endMs, qContext))
+ }).getOrElse(true)
+ case None =>
+ isTargetSchemaChangingInner(shardKeyFilters, startMs, endMs, qContext)
+ }
+ }

/**
* Returns true iff a target-schema should be used to identify query shards.
* A target-schema should be used iff all of:
@@ -515,11 +561,7 @@
}
// scalastyle:on method.length

- /**
- * Returns a set of shards iff a plan can be pushed-down to each.
- * See [[LogicalPlanUtils.getPushdownKeys]] for more info.
- */
- private def getPushdownShards(qContext: QueryContext,
+ private def getPushdownShardsInner(qContext: QueryContext,
plan: LogicalPlan): Option[Set[Int]] = {
val getRawPushdownShards = (rs: RawSeries) => {
if (qContext.plannerParams.shardOverrides.isEmpty) {
@@ -534,6 +576,22 @@
rs => LogicalPlan.getRawSeriesFilters(rs))
}

+ /**
+ * Returns a set of shards iff a plan can be pushed-down to each.
+ * See [[LogicalPlanUtils.getPushdownKeys]] for more info.
+ */
+ private def getPushdownShards(qContext: QueryContext,
+ plan: LogicalPlan): Option[Set[Int]] = {
+ shardPushdownCache match {
+ case Some(cache) =>
+ cache.get((plan, qContext.plannerParams.shardOverrides), _ => {
+ getPushdownShardsInner(qContext, plan)
+ })
+ case None =>
+ getPushdownShardsInner(qContext, plan)
+ }
+ }

/**
* Materialize a BinaryJoin without the pushdown optimization.
* @param forceDispatcher If occupied, forces this BinaryJoin to be materialized with the dispatcher.
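
Both memoized methods above share one shape: a pure *Inner computation wrapped by an optional Caffeine cache keyed on the inputs, relying on value-based equals/hashCode for the key types. Note also the conservative fallback in isTargetSchemaChanging: the cached value is Some[Boolean], and getOrElse(true) assumes a changing target schema whenever no answer is available. A generic, self-contained sketch of the pattern (illustrative names, not FiloDB API):

    import com.github.benmanes.caffeine.cache.{Cache, Caffeine}

    // Wrap a pure function in a bounded cache; keys need value-based equality.
    def memoize[K <: AnyRef, V <: AnyRef](maxSize: Long)(f: K => V): K => V = {
      val cache: Cache[K, V] = Caffeine.newBuilder()
        .maximumSize(maxSize) // bound memory; Caffeine evicts by size
        .recordStats()        // hit/miss counters, as in the planner caches above
        .build[K, V]()
      key => cache.get(key, k => f(k)) // compute-if-absent, at most once per key
    }

    // Repeated calls with an equal key skip the recomputation.
    val expensiveLength: String => Integer = memoize(2048L)(s => Integer.valueOf(s.length))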
@@ -1,5 +1,6 @@
package filodb.coordinator.queryplanner

+ //scalastyle:off
import akka.actor.ActorSystem
import akka.testkit.TestProbe
import com.typesafe.config.ConfigFactory
@@ -21,10 +22,10 @@ import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers
import filodb.query.LogicalPlan.getRawSeriesFilters
import filodb.query.exec.aggregator.{CountRowAggregator, SumRowAggregator}
+ import org.scalatest.BeforeAndAfterEach
import org.scalatest.exceptions.TestFailedException
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper


import scala.concurrent.duration._

object SingleClusterPlannerSpec {
@@ -51,7 +52,8 @@
}
}

- class SingleClusterPlannerSpec extends AnyFunSpec with Matchers with ScalaFutures with PlanValidationSpec {
+ class SingleClusterPlannerSpec extends AnyFunSpec
+ with Matchers with ScalaFutures with BeforeAndAfterEach with PlanValidationSpec {

implicit val system = ActorSystem()
private val node = TestProbe().ref
@@ -71,6 +73,10 @@ class SingleClusterPlannerSpec extends AnyFunSpec with Matchers with ScalaFuture
private val engine = new SingleClusterPlanner(dataset, schemas, mapperRef, earliestRetainedTimestampFn = 0,
queryConfig, "raw")

+ override def beforeEach(): Unit = {
+ engine.invalidateCaches()
+ }

/*
This is the PromQL

@@ -604,7 +610,6 @@
}

it ("should pushdown BinaryJoins/Aggregates when valid") {

def spread(filter: Seq[ColumnFilter]): Seq[SpreadChange] = {
Seq(SpreadChange(0, 1))
}
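
Why the per-test invalidation matters (inferred from the diff, not stated): the caches key on plans, filters, and shard overrides, but the answers also depend on planner state such as the ShardMapper, which individual tests mutate; without a cold cache, one test's memoized answer could leak into the next. The isolation pattern in miniature (hypothetical spec, real ScalaTest traits):

    import org.scalatest.BeforeAndAfterEach
    import org.scalatest.funspec.AnyFunSpec

    class ColdCacheSpec extends AnyFunSpec with BeforeAndAfterEach {
      private object engine { def invalidateCaches(): Unit = () } // stand-in planner

      override def beforeEach(): Unit = engine.invalidateCaches() // cold caches per test

      it("plans against fresh state") {
        // exercise the planner here
      }
    }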
5 changes: 5 additions & 0 deletions core/src/main/resources/filodb-defaults.conf
@@ -427,6 +427,11 @@ filodb {
# config to figure out the tenant column filter to enable max min column selection
max-min-tenant-column-filter = "_ws_"

+ single.cluster.cache {
+ enabled = true
+ # The maximum number of entries in the cache
+ cache-size = 2048
+ }
routing {
enable-remote-raw-exports = false
max-time-range-remote-raw-export = 180 minutes
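
A quick way to sanity-check that the new keys resolve as intended (the Typesafe Config API shown is real; the snippet mirrors the block added above):

    import com.typesafe.config.ConfigFactory

    val conf = ConfigFactory.parseString(
      """single.cluster.cache {
        |  enabled = true
        |  cache-size = 2048
        |}""".stripMargin)

    assert(conf.getBoolean("single.cluster.cache.enabled"))
    assert(conf.getInt("single.cluster.cache.cache-size") == 2048)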
14 changes: 12 additions & 2 deletions core/src/main/scala/filodb.core/query/QueryConfig.scala
@@ -43,6 +43,10 @@ object QueryConfig {
periodOfUncertaintyMs
)

+ val scCachingEnabled = queryConfig.as[Boolean]("single.cluster.cache.enabled")
+ val scCacheSize = queryConfig.as[Int]("single.cluster.cache.cache-size")
+ val cachingConfig = CachingConfig(scCachingEnabled, scCacheSize)

QueryConfig(askTimeout, staleSampleAfterMs, minStepMs, fastReduceMaxWindows, parser, translatePromToFilodbHistogram,
fasterRateEnabled, routingConfig.as[Option[String]]("partition_name"),
routingConfig.as[Option[Long]]("remote.http.timeout"),
@@ -52,7 +56,7 @@
allowPartialResultsRangeQuery, allowPartialResultsMetadataQuery,
grpcDenyList.split(",").map(_.trim.toLowerCase).toSet,
None,
- containerOverrides, rc)
+ containerOverrides, rc, cachingConfig)
}

import scala.concurrent.duration._
@@ -84,6 +88,11 @@ case class RoutingConfig(
periodOfUncertaintyMs: Long = (5 minutes).toMillis
)

+ case class CachingConfig(
+ singleClusterPlannerCachingEnabled: Boolean = true,
+ singleClusterPlannerCachingSize: Int = 2048
+ )

case class QueryConfig(askTimeout: FiniteDuration,
staleSampleAfterMs: Long,
minStepMs: Long,
@@ -102,4 +111,5 @@ case class QueryConfig(askTimeout: FiniteDuration,
grpcPartitionsDenyList: Set[String] = Set.empty,
plannerSelector: Option[String] = None,
recordContainerOverrides: Map[String, Int] = Map.empty,
- routingConfig: RoutingConfig = RoutingConfig())
+ routingConfig: RoutingConfig = RoutingConfig(),
+ cachingConfig: CachingConfig = CachingConfig())
27 changes: 19 additions & 8 deletions core/src/main/scala/filodb.core/query/QueryUtils.scala
@@ -1,16 +1,25 @@
package filodb.core.query

+ import com.github.benmanes.caffeine.cache.{Cache, Caffeine}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

/**
* Storage for utility functions.
*/
object QueryUtils {
- val REGEX_CHARS = Array('.', '?', '+', '*', '|', '{', '}', '[', ']', '(', ')', '"', '\\')
+ val REGEX_CHARS: Array[Char] = Array('.', '?', '+', '*', '|', '{', '}', '[', ']', '(', ')', '"', '\\')
+ private val COMBO_CACHE_SIZE = 2048

private val regexCharsMinusPipe = (REGEX_CHARS.toSet - '|').toArray

+ private val comboCache: Cache[Map[String, Seq[String]], Seq[Map[String, String]]] =
+ Caffeine.newBuilder()
+ .maximumSize(COMBO_CACHE_SIZE)
+ .recordStats()
+ .build()


/**
* Returns true iff the argument string contains any special regex chars.
*/
@@ -72,7 +81,7 @@
splits.append(left)
remaining = right
// count of all characters before the remaining suffix (+1 to account for pipe)
- offset = offset + left.size + 1
+ offset = offset + left.length + 1
}
splits.append(remaining)
splits
@@ -89,11 +98,13 @@
def makeAllKeyValueCombos(keyToValues: Map[String, Seq[String]]): Seq[Map[String, String]] = {
// Store the entries with some order, then find all possible value combos s.t. each combo's
// ith value is a value of the ith key.
- val entries = keyToValues.toSeq
- val keys = entries.map(_._1)
- val vals = entries.map(_._2.toSeq)
- val combos = QueryUtils.combinations(vals)
- // Zip the ordered keys with the ordered values.
- combos.map(keys.zip(_).toMap)
+ comboCache.get(keyToValues, _ => {
+ val entries = keyToValues.toSeq
+ val keys = entries.map(_._1)
+ val vals = entries.map(_._2.toSeq)
+ val combos = QueryUtils.combinations(vals)
+ // Zip the ordered keys with the ordered values.
+ combos.map(keys.zip(_).toMap)
+ })
}
}
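
For intuition about what comboCache memoizes: makeAllKeyValueCombos expands a key-to-candidate-values map into its cross product, which pipe-only regex filters such as "App-1|App-2" can trigger repeatedly with identical inputs. A worked example (key and value names illustrative):

    val input = Map(
      "_ws_" -> Seq("demo-ws"),
      "_ns_" -> Seq("App-1", "App-2"))

    val combos = QueryUtils.makeAllKeyValueCombos(input)
    // combos contains (ordering may vary):
    //   Map("_ws_" -> "demo-ws", "_ns_" -> "App-1")
    //   Map("_ws_" -> "demo-ws", "_ns_" -> "App-2")
    // A second call with an equal map is now served from comboCache.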
9 changes: 8 additions & 1 deletion core/src/test/resources/application_test.conf
@@ -148,7 +148,14 @@ filodb {
}
grpc {
partitions-deny-list = ""
- }
+ }
+
+ single.cluster.cache {
+ enabled = true
+ # The maximum number of entries in the cache
+ cache-size = 2048
+ }
+
routing {
enable-remote-raw-exports = false
max-time-range-remote-raw-export = 30 minutes