diff --git a/conf/logback-dev.xml b/conf/logback-dev.xml
index 94a9c3fe14..8c0e6612a2 100644
--- a/conf/logback-dev.xml
+++ b/conf/logback-dev.xml
@@ -8,7 +8,7 @@
-
+
diff --git a/coordinator/src/main/scala/filodb.coordinator/ShardMapper.scala b/coordinator/src/main/scala/filodb.coordinator/ShardMapper.scala
index 07390f6a65..e7114fe8b8 100644
--- a/coordinator/src/main/scala/filodb.coordinator/ShardMapper.scala
+++ b/coordinator/src/main/scala/filodb.coordinator/ShardMapper.scala
@@ -246,7 +246,7 @@ class ShardMapper(val numShards: Int) extends Serializable {
* Registers a new node to the given shards. Modifies state in place.
* Idempotent.
*/
- private[coordinator] def registerNode(shards: Seq[Int], coordinator: ActorRef): Try[Unit] = {
+ def registerNode(shards: Seq[Int], coordinator: ActorRef): Try[Unit] = {
shards foreach {
case shard =>
//we always override the mapping. There was code earlier which prevent from
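
Note: registerNode is widened here from private[coordinator] to public, apparently so the new JMH benchmark further down can populate a ShardMapper directly. A minimal usage sketch mirroring that benchmark's setup (the TestProbe-backed coordinator ref is illustrative, not part of this change):

    import akka.actor.ActorSystem
    import akka.testkit.TestProbe
    import filodb.coordinator.ShardMapper

    object ShardMapperUsageSketch extends App {
      implicit val system: ActorSystem = ActorSystem("sketch")
      val coordinator = TestProbe().ref                 // stand-in coordinator actor
      val mapper = new ShardMapper(256)
      // registerNode is now reachable from outside the coordinator package
      (0 until 256).foreach(shard => mapper.registerNode(Seq(shard), coordinator))
      system.terminate()
    }
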
diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala
index d457a2710a..68370e4ea5 100644
--- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala
+++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala
@@ -4,6 +4,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import akka.actor.ActorRef
+import com.github.benmanes.caffeine.cache.{Cache, Caffeine}
import com.typesafe.scalalogging.StrictLogging
import kamon.Kamon
@@ -65,6 +66,36 @@ class SingleClusterPlanner(val dataset: Dataset,
private val shardColumns = dsOptions.shardKeyColumns.sorted
private val dsRef = dataset.ref
+ private val shardPushdownCache: Option[Cache[(LogicalPlan, Option[Seq[Int]]), Option[Set[Int]]]] =
+ if (queryConfig.cachingConfig.singleClusterPlannerCachingEnabled) {
+ Some(
+ Caffeine.newBuilder()
+ .maximumSize(queryConfig.cachingConfig.singleClusterPlannerCachingSize)
+ .recordStats()
+ .build()
+ )
+ } else {
+ None
+ }
+
+
+  private val tSchemaChangingCache: Option[Cache[(Seq[ColumnFilter], Long, Long), Option[Boolean]]] =
+ if (queryConfig.cachingConfig.singleClusterPlannerCachingEnabled) {
+ Some(
+ Caffeine.newBuilder()
+ .maximumSize(queryConfig.cachingConfig.singleClusterPlannerCachingSize)
+ .recordStats()
+ .build()
+ )
+ } else {
+ None
+ }
+
+ private[queryplanner] def invalidateCaches(): Unit = {
+ shardPushdownCache.foreach(_.invalidateAll())
+ tSchemaChangingCache.foreach(_.invalidateAll())
+ }
+
// failed failover counter captures failovers which are not possible because at least one shard
// is down both on the primary and DR clusters, the query will get executed only when the
// partial results are acceptable otherwise an exception is thrown
@@ -80,14 +111,9 @@ class SingleClusterPlanner(val dataset: Dataset,
qContext.plannerParams.targetSchemaProviderOverride.getOrElse(_targetSchemaProvider)
}
- /**
- * Returns true iff a target-schema:
- * (1) matches any shard-key matched by the argument filters, and
- * (2) changes between the argument timestamps.
- */
- def isTargetSchemaChanging(shardKeyFilters: Seq[ColumnFilter],
- startMs: Long, endMs: Long,
- qContext: QueryContext): Boolean = {
+ private def isTargetSchemaChangingInner(shardKeyFilters: Seq[ColumnFilter],
+ startMs: Long, endMs: Long,
+ qContext: QueryContext): Boolean = {
val keyToValues = shardKeyFilters.map { filter =>
val values = filter match {
case ColumnFilter(col, regex: EqualsRegex) if QueryUtils.containsPipeOnlyRegex(regex.value.toString) =>
@@ -97,6 +123,7 @@ class SingleClusterPlanner(val dataset: Dataset,
}
(filter.column, values)
}.toMap
+
QueryUtils.makeAllKeyValueCombos(keyToValues).exists { shardKeys =>
// Replace any EqualsRegex shard-key filters with Equals.
val equalsFilters = shardKeys.map(entry => ColumnFilter(entry._1, Equals(entry._2))).toSeq
@@ -106,6 +133,25 @@ class SingleClusterPlanner(val dataset: Dataset,
}
}
+
+ /**
+ * Returns true iff a target-schema:
+ * (1) matches any shard-key matched by the argument filters, and
+ * (2) changes between the argument timestamps.
+ */
+ def isTargetSchemaChanging(shardKeyFilters: Seq[ColumnFilter],
+ startMs: Long, endMs: Long,
+ qContext: QueryContext): Boolean = {
+ tSchemaChangingCache match {
+ case Some(cache) =>
+ cache.get((shardKeyFilters, startMs, endMs), _ => {
+ Some(isTargetSchemaChangingInner(shardKeyFilters, startMs, endMs, qContext))
+ }).getOrElse(true)
+ case None =>
+ isTargetSchemaChangingInner(shardKeyFilters, startMs, endMs, qContext)
+ }
+ }
+
/**
* Returns true iff a target-schema should be used to identify query shards.
* A target-schema should be used iff all of:
@@ -515,11 +561,7 @@ class SingleClusterPlanner(val dataset: Dataset,
}
// scalastyle:on method.length
- /**
- * Returns a set of shards iff a plan can be pushed-down to each.
- * See [[LogicalPlanUtils.getPushdownKeys]] for more info.
- */
- private def getPushdownShards(qContext: QueryContext,
+ private def getPushdownShardsInner(qContext: QueryContext,
plan: LogicalPlan): Option[Set[Int]] = {
val getRawPushdownShards = (rs: RawSeries) => {
if (qContext.plannerParams.shardOverrides.isEmpty) {
@@ -534,6 +576,22 @@ class SingleClusterPlanner(val dataset: Dataset,
rs => LogicalPlan.getRawSeriesFilters(rs))
}
+ /**
+ * Returns a set of shards iff a plan can be pushed-down to each.
+ * See [[LogicalPlanUtils.getPushdownKeys]] for more info.
+ */
+ private def getPushdownShards(qContext: QueryContext,
+ plan: LogicalPlan): Option[Set[Int]] = {
+ shardPushdownCache match {
+ case Some(cache) =>
+ cache.get((plan, qContext.plannerParams.shardOverrides), _ => {
+ getPushdownShardsInner(qContext, plan)
+ })
+ case None =>
+ getPushdownShardsInner(qContext, plan)
+ }
+ }
+
/**
* Materialize a BinaryJoin without the pushdown optimization.
* @param forceDispatcher If occupied, forces this BinaryJoin to be materialized with the dispatcher.
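
Both planner caches above rely on Caffeine's compute-if-absent call: cache.get(key, mappingFunction) computes and stores the value on a miss and returns the memoized value on later lookups with an equal key. A standalone sketch of that pattern, with illustrative key/value types and sizes rather than the planner's own:

    import com.github.benmanes.caffeine.cache.{Cache, Caffeine}

    object CaffeineMemoSketch extends App {
      // Bounded, stats-enabled cache, configured the same way as the planner caches above.
      val cache: Cache[(String, Long), Set[Int]] =
        Caffeine.newBuilder()
          .maximumSize(2048)
          .recordStats()
          .build()

      def expensiveShardLookup(metric: String, version: Long): Set[Int] = {
        Thread.sleep(10)                                // stand-in for real planning work
        Set(metric.length, version.toInt)
      }

      // First call misses and computes; the second call with an equal key is served from the cache.
      cache.get(("foo", 1L), _ => expensiveShardLookup("foo", 1L))
      cache.get(("foo", 1L), _ => expensiveShardLookup("foo", 1L))
      println(s"hit rate = ${cache.stats().hitRate()}") // 0.5: one miss, one hit
    }
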
diff --git a/coordinator/src/test/scala/filodb.coordinator/queryplanner/SingleClusterPlannerSpec.scala b/coordinator/src/test/scala/filodb.coordinator/queryplanner/SingleClusterPlannerSpec.scala
index afd3462206..fd6a6558b1 100644
--- a/coordinator/src/test/scala/filodb.coordinator/queryplanner/SingleClusterPlannerSpec.scala
+++ b/coordinator/src/test/scala/filodb.coordinator/queryplanner/SingleClusterPlannerSpec.scala
@@ -1,5 +1,6 @@
package filodb.coordinator.queryplanner
+//scalastyle:off
import akka.actor.ActorSystem
import akka.testkit.TestProbe
import com.typesafe.config.ConfigFactory
@@ -21,10 +22,10 @@ import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers
import filodb.query.LogicalPlan.getRawSeriesFilters
import filodb.query.exec.aggregator.{CountRowAggregator, SumRowAggregator}
+import org.scalatest.BeforeAndAfterEach
import org.scalatest.exceptions.TestFailedException
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
-
import scala.concurrent.duration._
object SingleClusterPlannerSpec {
@@ -51,7 +52,8 @@ object SingleClusterPlannerSpec {
}
}
-class SingleClusterPlannerSpec extends AnyFunSpec with Matchers with ScalaFutures with PlanValidationSpec {
+class SingleClusterPlannerSpec extends AnyFunSpec
+ with Matchers with ScalaFutures with BeforeAndAfterEach with PlanValidationSpec {
implicit val system = ActorSystem()
private val node = TestProbe().ref
@@ -71,6 +73,10 @@ class SingleClusterPlannerSpec extends AnyFunSpec with Matchers with ScalaFuture
private val engine = new SingleClusterPlanner(dataset, schemas, mapperRef, earliestRetainedTimestampFn = 0,
queryConfig, "raw")
+ override def beforeEach(): Unit = {
+ engine.invalidateCaches()
+ }
+
/*
This is the PromQL
@@ -604,7 +610,6 @@ class SingleClusterPlannerSpec extends AnyFunSpec with Matchers with ScalaFuture
}
it ("should pushdown BinaryJoins/Aggregates when valid") {
-
def spread(filter: Seq[ColumnFilter]): Seq[SpreadChange] = {
Seq(SpreadChange(0, 1))
}
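
The beforeEach hook exists because the planner instance (and its caches) is shared across test cases; without invalidation, results memoized by one test could mask behavior exercised by the next. A self-contained sketch of the same reset-shared-state pattern (the mutable map is a stand-in, not a real planner cache):

    import org.scalatest.BeforeAndAfterEach
    import org.scalatest.funspec.AnyFunSpec
    import org.scalatest.matchers.should.Matchers

    class CachedStateResetSpec extends AnyFunSpec with Matchers with BeforeAndAfterEach {
      // Mutable stand-in for state shared across tests (analogous to the planner's caches).
      private val fakeCache = scala.collection.mutable.Map.empty[String, Int]

      override def beforeEach(): Unit = fakeCache.clear()   // analogous to engine.invalidateCaches()

      it("starts with an empty cache") {
        fakeCache shouldBe empty
        fakeCache("memoized") = 1
      }

      it("does not see entries leaked from the previous test") {
        fakeCache shouldBe empty
      }
    }
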
diff --git a/core/src/main/resources/filodb-defaults.conf b/core/src/main/resources/filodb-defaults.conf
index 90a24f4562..f287146281 100644
--- a/core/src/main/resources/filodb-defaults.conf
+++ b/core/src/main/resources/filodb-defaults.conf
@@ -427,6 +427,11 @@ filodb {
# config to figure out the tenant column filter to enable max min column selection
max-min-tenant-column-filter = "_ws_"
+ single.cluster.cache {
+ enabled = true
+ # The maximum number of entries in the cache
+ cache-size = 2048
+ }
routing {
enable-remote-raw-exports = false
max-time-range-remote-raw-export = 180 minutes
diff --git a/core/src/main/scala/filodb.core/query/QueryConfig.scala b/core/src/main/scala/filodb.core/query/QueryConfig.scala
index 90a275b8a2..6b2363ca4c 100644
--- a/core/src/main/scala/filodb.core/query/QueryConfig.scala
+++ b/core/src/main/scala/filodb.core/query/QueryConfig.scala
@@ -43,6 +43,10 @@ object QueryConfig {
periodOfUncertaintyMs
)
+ val scCachingEnabled = queryConfig.as[Boolean]("single.cluster.cache.enabled")
+ val scCacheSize = queryConfig.as[Int]("single.cluster.cache.cache-size")
+ val cachingConfig = CachingConfig(scCachingEnabled, scCacheSize)
+
QueryConfig(askTimeout, staleSampleAfterMs, minStepMs, fastReduceMaxWindows, parser, translatePromToFilodbHistogram,
fasterRateEnabled, routingConfig.as[Option[String]]("partition_name"),
routingConfig.as[Option[Long]]("remote.http.timeout"),
@@ -52,7 +56,7 @@ object QueryConfig {
allowPartialResultsRangeQuery, allowPartialResultsMetadataQuery,
grpcDenyList.split(",").map(_.trim.toLowerCase).toSet,
None,
- containerOverrides, rc)
+ containerOverrides, rc, cachingConfig)
}
import scala.concurrent.duration._
@@ -84,6 +88,11 @@ case class RoutingConfig(
periodOfUncertaintyMs: Long = (5 minutes).toMillis
)
+case class CachingConfig(
+ singleClusterPlannerCachingEnabled: Boolean = true,
+ singleClusterPlannerCachingSize: Int = 2048
+ )
+
case class QueryConfig(askTimeout: FiniteDuration,
staleSampleAfterMs: Long,
minStepMs: Long,
@@ -102,4 +111,5 @@ case class QueryConfig(askTimeout: FiniteDuration,
grpcPartitionsDenyList: Set[String] = Set.empty,
plannerSelector: Option[String] = None,
recordContainerOverrides: Map[String, Int] = Map.empty,
- routingConfig: RoutingConfig = RoutingConfig())
+ routingConfig: RoutingConfig = RoutingConfig(),
+ cachingConfig: CachingConfig = CachingConfig())
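
For reference, the two new keys can also be read with plain Typesafe Config; the sketch below assumes the key names added to filodb-defaults.conf above and uses parseString as a stand-in for the real filodb.query section:

    import com.typesafe.config.ConfigFactory
    import filodb.core.query.CachingConfig

    object CachingConfigSketch extends App {
      // Stand-in for the filodb.query section; turn the planner caches off and shrink them.
      val queryConfig = ConfigFactory.parseString(
        """
          |single.cluster.cache {
          |  enabled = false
          |  cache-size = 1024
          |}
          |""".stripMargin)

      val cachingConfig = CachingConfig(
        singleClusterPlannerCachingEnabled = queryConfig.getBoolean("single.cluster.cache.enabled"),
        singleClusterPlannerCachingSize    = queryConfig.getInt("single.cluster.cache.cache-size"))

      println(cachingConfig)   // CachingConfig(false,1024)
    }
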
diff --git a/core/src/main/scala/filodb.core/query/QueryUtils.scala b/core/src/main/scala/filodb.core/query/QueryUtils.scala
index 285e27c6d4..cb7bfaf80d 100644
--- a/core/src/main/scala/filodb.core/query/QueryUtils.scala
+++ b/core/src/main/scala/filodb.core/query/QueryUtils.scala
@@ -1,5 +1,6 @@
package filodb.core.query
+import com.github.benmanes.caffeine.cache.{Cache, Caffeine}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
@@ -7,10 +8,18 @@ import scala.collection.mutable.ArrayBuffer
* Storage for utility functions.
*/
object QueryUtils {
- val REGEX_CHARS = Array('.', '?', '+', '*', '|', '{', '}', '[', ']', '(', ')', '"', '\\')
+ val REGEX_CHARS: Array[Char] = Array('.', '?', '+', '*', '|', '{', '}', '[', ']', '(', ')', '"', '\\')
+ private val COMBO_CACHE_SIZE = 2048
private val regexCharsMinusPipe = (REGEX_CHARS.toSet - '|').toArray
+ private val comboCache: Cache[Map[String, Seq[String]], Seq[Map[String, String]]] =
+ Caffeine.newBuilder()
+ .maximumSize(COMBO_CACHE_SIZE)
+ .recordStats()
+ .build()
+
+
/**
* Returns true iff the argument string contains any special regex chars.
*/
@@ -72,7 +81,7 @@ object QueryUtils {
splits.append(left)
remaining = right
// count of all characters before the remaining suffix (+1 to account for pipe)
- offset = offset + left.size + 1
+ offset = offset + left.length + 1
}
splits.append(remaining)
splits
@@ -89,11 +98,13 @@ object QueryUtils {
def makeAllKeyValueCombos(keyToValues: Map[String, Seq[String]]): Seq[Map[String, String]] = {
// Store the entries with some order, then find all possible value combos s.t. each combo's
// ith value is a value of the ith key.
- val entries = keyToValues.toSeq
- val keys = entries.map(_._1)
- val vals = entries.map(_._2.toSeq)
- val combos = QueryUtils.combinations(vals)
- // Zip the ordered keys with the ordered values.
- combos.map(keys.zip(_).toMap)
+ comboCache.get(keyToValues, _ => {
+ val entries = keyToValues.toSeq
+ val keys = entries.map(_._1)
+ val vals = entries.map(_._2.toSeq)
+ val combos = QueryUtils.combinations(vals)
+ // Zip the ordered keys with the ordered values.
+ combos.map(keys.zip(_).toMap)
+ })
}
}
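
makeAllKeyValueCombos expands the per-key value lists into their cartesian product, so equal filter maps are now answered from the Caffeine-backed comboCache instead of being re-expanded. An illustrative call (column names and values are made up):

    import filodb.core.query.QueryUtils

    object ComboExpansionSketch extends App {
      // Column names/values are illustrative; real shard-key columns come from the dataset options.
      val keyToValues = Map("_ws_" -> Seq("demo-ws"), "_ns_" -> Seq("ns1", "ns2"))
      // Expands to two combos (ordering not guaranteed):
      //   Map("_ws_" -> "demo-ws", "_ns_" -> "ns1")
      //   Map("_ws_" -> "demo-ws", "_ns_" -> "ns2")
      val combos = QueryUtils.makeAllKeyValueCombos(keyToValues)
      // An equal map is now served from the cache rather than recomputed.
      val again = QueryUtils.makeAllKeyValueCombos(keyToValues)
      println(combos.size == 2 && combos == again)   // true
    }
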
diff --git a/core/src/test/resources/application_test.conf b/core/src/test/resources/application_test.conf
index ddccfe9aa0..0433797532 100644
--- a/core/src/test/resources/application_test.conf
+++ b/core/src/test/resources/application_test.conf
@@ -148,7 +148,14 @@ filodb {
}
grpc {
partitions-deny-list = ""
- }
+ }
+
+ single.cluster.cache {
+ enabled = true
+ # The maximum number of entries in the cache
+ cache-size = 2048
+ }
+
routing {
enable-remote-raw-exports = false
max-time-range-remote-raw-export = 30 minutes
diff --git a/jmh/src/main/scala/filodb.jmh/PlannerBenchmark.scala b/jmh/src/main/scala/filodb.jmh/PlannerBenchmark.scala
new file mode 100644
index 0000000000..c934a3bb49
--- /dev/null
+++ b/jmh/src/main/scala/filodb.jmh/PlannerBenchmark.scala
@@ -0,0 +1,118 @@
+// scalastyle:off
+package filodb.jmh
+
+import java.util.concurrent.TimeUnit
+import akka.actor.ActorSystem
+import akka.testkit.TestProbe
+import com.typesafe.config.ConfigFactory
+import com.typesafe.scalalogging.Logger
+import org.openjdk.jmh.annotations._
+
+import scala.concurrent.duration._
+import filodb.coordinator.ShardMapper
+import filodb.coordinator.client.QueryCommands.{FunctionalSpreadProvider, FunctionalTargetSchemaProvider}
+import filodb.coordinator.queryplanner.SingleClusterPlanner
+import filodb.core.{MetricsTestData, SpreadChange, SpreadProvider, TargetSchemaChange}
+import filodb.core.metadata.Schemas
+import filodb.core.query.{PlannerParams, PromQlQueryParams, QueryConfig, QueryContext}
+import filodb.prometheus.ast.TimeStepParams
+import filodb.prometheus.parse.Parser
+import filodb.query.exec.ExecPlan
+
+@State(Scope.Thread)
+class PlannerBenchmark {
+
+ // Run using the following in sbt
+ // jmh/jmh:run -rf json -i 5 -wi 3 -f 1 -jvmArgsAppend -XX:MaxInlineLevel=20 -jvmArgsAppend -Xmx4g -jvmArgsAppend -XX:MaxInlineSize=99 -jvmArgsAppend -Dlogback.configurationFile=/conf/logback-dev.xml filodb.jmh.PlannerBenchmark -prof "async:libPath=/lib/libasyncProfiler.dylib;output=jfr;alloc=0"
+
+
+ var system: Option[ActorSystem] = None
+ var planner: Option[SingleClusterPlanner] = None
+ var now = 0L;
+ val logger: Logger = Logger("PlannerBenchmark")
+ private val spreadProvider: Option[SpreadProvider] = Some(
+ FunctionalSpreadProvider(
+ _ => Seq(SpreadChange(0, 8)))
+ )
+ private val tSchemaProvider: Option[FunctionalTargetSchemaProvider] = Some(
+ FunctionalTargetSchemaProvider(
+ _ => Seq(TargetSchemaChange(0, Seq("job", "app")))
+ )
+ )
+
+
+ var execPlan: Option[ExecPlan] = None
+
+
+ val query = """ foo { job="baz" , node!="", ns="ns"}
+ | OR on (app)
+ | bar { job="baz", node!="", ns="ns" }
+ | * on (app) group_right()
+ | baz{ job="baz", node!="", ns="ns" } == 1""".stripMargin
+
+
+ private def buildPlanners(): Unit = {
+ implicit val system: ActorSystem = ActorSystem()
+ this.system = Some(system)
+ val node = TestProbe().ref
+ val mapper = new ShardMapper(256)
+ for (i <- 0 until 256) {
+ mapper.registerNode(Seq(i), node)
+ }
+ this.now = System.currentTimeMillis()
+
+ val rawRetention = 7.days.toMillis
+
+ val routingConfigString = "routing {\n remote {\n http {\n timeout = 10000\n }\n }\n}"
+ val routingConfig = ConfigFactory.parseString(routingConfigString)
+ val config = ConfigFactory.load("application_test.conf").getConfig("filodb.query").withFallback(routingConfig)
+ val queryConfig = QueryConfig(config).copy(plannerSelector = Some("plannerSelector"))
+
+ val dataset = MetricsTestData.timeseriesDataset
+ val schemas = Schemas(dataset.schema)
+
+ planner = Some(new SingleClusterPlanner(dataset, schemas, mapper,
+ earliestRetainedTimestampFn = now - rawRetention, queryConfig, "raw"))
+ }
+
+ @Setup(Level.Trial)
+ def setup(): Unit = {
+ buildPlanners()
+ }
+
+ @TearDown(Level.Trial)
+ def teardown(): Unit = {
+ this.system.foreach(_.terminate())
+// println("\n\n===================================\n\n")
+// print(s"${execPlan.get.printTree()}")
+ }
+
+ @Benchmark
+ @BenchmarkMode(Array(Mode.AverageTime))
+ @OutputTimeUnit(TimeUnit.MILLISECONDS)
+ @Warmup(iterations = 1, time = 1)
+ @Measurement(iterations = 5, time = 1)
+ @throws[Exception]
+ def benchmarkMaterializePlan(): Unit = {
+
+    // Materialize a 1-day range query at hourly end times over the past 3 hours
+ for (endTime <- (now - 3.hour.toMillis) to now by 1.hour.toMillis) {
+ val endSecs = endTime / 1000;
+ val timeParams = TimeStepParams(endSecs - 1.day.toSeconds, 60, endSecs)
+ val lp = Parser.queryRangeToLogicalPlan(query, timeParams)
+ execPlan = Some(planner.get.materialize(lp,
+ QueryContext(PromQlQueryParams("dummy", timeParams.start, timeParams.step, timeParams.end),
+ plannerParams = PlannerParams(
+ spreadOverride = spreadProvider,
+ targetSchemaProviderOverride = tSchemaProvider,
+ queryTimeoutMillis = 1000000))))
+ }
+ }
+
+}
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index a1ee58e89f..72e31549fd 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -197,7 +197,8 @@ object Dependencies {
// )
lazy val jmhDeps = Seq(
- "org.apache.spark" %% "spark-sql" % sparkVersion excludeAll(excludeSlf4jLog4j, excludeZK, excludeJersey)
+ "org.apache.spark" %% "spark-sql" % sparkVersion excludeAll(excludeSlf4jLog4j, excludeZK, excludeJersey),
+ "com.typesafe.akka" %% "akka-testkit" % akkaVersion
)
lazy val gatlingDeps = Seq(