diff --git a/conf/logback-dev.xml b/conf/logback-dev.xml
index 94a9c3fe14..8c0e6612a2 100644
--- a/conf/logback-dev.xml
+++ b/conf/logback-dev.xml
@@ -8,7 +8,7 @@
-
+
diff --git a/coordinator/src/main/scala/filodb.coordinator/ShardMapper.scala b/coordinator/src/main/scala/filodb.coordinator/ShardMapper.scala
index 07390f6a65..e7114fe8b8 100644
--- a/coordinator/src/main/scala/filodb.coordinator/ShardMapper.scala
+++ b/coordinator/src/main/scala/filodb.coordinator/ShardMapper.scala
@@ -246,7 +246,7 @@ class ShardMapper(val numShards: Int) extends Serializable {
    * Registers a new node to the given shards. Modifies state in place.
    * Idempotent.
    */
-  private[coordinator] def registerNode(shards: Seq[Int], coordinator: ActorRef): Try[Unit] = {
+  def registerNode(shards: Seq[Int], coordinator: ActorRef): Try[Unit] = {
     shards foreach { case shard =>
       //we always override the mapping. There was code earlier which prevent from
diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala
index d457a2710a..68370e4ea5 100644
--- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala
+++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala
@@ -4,6 +4,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
 
 import akka.actor.ActorRef
+import com.github.benmanes.caffeine.cache.{Cache, Caffeine}
 import com.typesafe.scalalogging.StrictLogging
 import kamon.Kamon
 
@@ -65,6 +66,36 @@ class SingleClusterPlanner(val dataset: Dataset,
   private val shardColumns = dsOptions.shardKeyColumns.sorted
   private val dsRef = dataset.ref
 
+  private val shardPushdownCache: Option[Cache[(LogicalPlan, Option[Seq[Int]]), Option[Set[Int]]]] =
+    if (queryConfig.cachingConfig.singleClusterPlannerCachingEnabled) {
+      Some(
+        Caffeine.newBuilder()
+          .maximumSize(queryConfig.cachingConfig.singleClusterPlannerCachingSize)
+          .recordStats()
+          .build()
+      )
+    } else {
+      None
+    }
+
+
+  private val tSchemaChangingCache: Option[Cache[(Seq[ColumnFilter], Long, Long), Some[Boolean]]] =
+    if (queryConfig.cachingConfig.singleClusterPlannerCachingEnabled) {
+      Some(
+        Caffeine.newBuilder()
+          .maximumSize(queryConfig.cachingConfig.singleClusterPlannerCachingSize)
+          .recordStats()
+          .build()
+      )
+    } else {
+      None
+    }
+
+  private[queryplanner] def invalidateCaches(): Unit = {
+    shardPushdownCache.foreach(_.invalidateAll())
+    tSchemaChangingCache.foreach(_.invalidateAll())
+  }
+
   // failed failover counter captures failovers which are not possible because at least one shard
   // is down both on the primary and DR clusters, the query will get executed only when the
   // partial results are acceptable otherwise an exception is thrown
@@ -80,14 +111,9 @@ class SingleClusterPlanner(val dataset: Dataset,
     qContext.plannerParams.targetSchemaProviderOverride.getOrElse(_targetSchemaProvider)
   }
 
-  /**
-   * Returns true iff a target-schema:
-   *   (1) matches any shard-key matched by the argument filters, and
-   *   (2) changes between the argument timestamps.
-   */
-  def isTargetSchemaChanging(shardKeyFilters: Seq[ColumnFilter],
-                             startMs: Long, endMs: Long,
-                             qContext: QueryContext): Boolean = {
+  private def isTargetSchemaChangingInner(shardKeyFilters: Seq[ColumnFilter],
+                                          startMs: Long, endMs: Long,
+                                          qContext: QueryContext): Boolean = {
     val keyToValues = shardKeyFilters.map { filter =>
       val values = filter match {
         case ColumnFilter(col, regex: EqualsRegex) if QueryUtils.containsPipeOnlyRegex(regex.value.toString) =>
@@ -97,6 +123,7 @@ class SingleClusterPlanner(val dataset: Dataset,
       }
       (filter.column, values)
     }.toMap
+
     QueryUtils.makeAllKeyValueCombos(keyToValues).exists { shardKeys =>
       // Replace any EqualsRegex shard-key filters with Equals.
       val equalsFilters = shardKeys.map(entry => ColumnFilter(entry._1, Equals(entry._2))).toSeq
@@ -106,6 +133,25 @@ class SingleClusterPlanner(val dataset: Dataset,
     }
   }
 
+
+  /**
+   * Returns true iff a target-schema:
+   *   (1) matches any shard-key matched by the argument filters, and
+   *   (2) changes between the argument timestamps.
+   */
+  def isTargetSchemaChanging(shardKeyFilters: Seq[ColumnFilter],
+                             startMs: Long, endMs: Long,
+                             qContext: QueryContext): Boolean = {
+    tSchemaChangingCache match {
+      case Some(cache) =>
+        cache.get((shardKeyFilters, startMs, endMs), _ => {
+          Some(isTargetSchemaChangingInner(shardKeyFilters, startMs, endMs, qContext))
+        }).getOrElse(true)
+      case None =>
+        isTargetSchemaChangingInner(shardKeyFilters, startMs, endMs, qContext)
+    }
+  }
+
   /**
    * Returns true iff a target-schema should be used to identify query shards.
    * A target-schema should be used iff all of:
@@ -515,11 +561,7 @@ class SingleClusterPlanner(val dataset: Dataset,
   }
   // scalastyle:on method.length
 
-  /**
-   * Returns a set of shards iff a plan can be pushed-down to each.
-   * See [[LogicalPlanUtils.getPushdownKeys]] for more info.
-   */
-  private def getPushdownShards(qContext: QueryContext,
+  private def getPushdownShardsInner(qContext: QueryContext,
                                 plan: LogicalPlan): Option[Set[Int]] = {
     val getRawPushdownShards = (rs: RawSeries) => {
       if (qContext.plannerParams.shardOverrides.isEmpty) {
@@ -534,6 +576,22 @@ class SingleClusterPlanner(val dataset: Dataset,
       rs => LogicalPlan.getRawSeriesFilters(rs))
   }
 
+  /**
+   * Returns a set of shards iff a plan can be pushed-down to each.
+   * See [[LogicalPlanUtils.getPushdownKeys]] for more info.
+   */
+  private def getPushdownShards(qContext: QueryContext,
+                                plan: LogicalPlan): Option[Set[Int]] = {
+    shardPushdownCache match {
+      case Some(cache) =>
+        cache.get((plan, qContext.plannerParams.shardOverrides), _ => {
+          getPushdownShardsInner(qContext, plan)
+        })
+      case None =>
+        getPushdownShardsInner(qContext, plan)
+    }
+  }
+
   /**
    * Materialize a BinaryJoin without the pushdown optimization.
    * @param forceDispatcher If occupied, forces this BinaryJoin to be materialized with the dispatcher.
diff --git a/coordinator/src/test/scala/filodb.coordinator/queryplanner/SingleClusterPlannerSpec.scala b/coordinator/src/test/scala/filodb.coordinator/queryplanner/SingleClusterPlannerSpec.scala
index afd3462206..fd6a6558b1 100644
--- a/coordinator/src/test/scala/filodb.coordinator/queryplanner/SingleClusterPlannerSpec.scala
+++ b/coordinator/src/test/scala/filodb.coordinator/queryplanner/SingleClusterPlannerSpec.scala
@@ -1,5 +1,6 @@
 package filodb.coordinator.queryplanner
 
+//scalastyle:off
 import akka.actor.ActorSystem
 import akka.testkit.TestProbe
 import com.typesafe.config.ConfigFactory
@@ -21,10 +22,10 @@ import org.scalatest.funspec.AnyFunSpec
 import org.scalatest.matchers.should.Matchers
 
 import filodb.query.LogicalPlan.getRawSeriesFilters
 import filodb.query.exec.aggregator.{CountRowAggregator, SumRowAggregator}
+import org.scalatest.BeforeAndAfterEach
 import org.scalatest.exceptions.TestFailedException
 import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
-
 import scala.concurrent.duration._
@@ -51,7 +52,8 @@ object SingleClusterPlannerSpec {
   }
 }
 
-class SingleClusterPlannerSpec extends AnyFunSpec with Matchers with ScalaFutures with PlanValidationSpec {
+class SingleClusterPlannerSpec extends AnyFunSpec
+  with Matchers with ScalaFutures with BeforeAndAfterEach with PlanValidationSpec {
 
   implicit val system = ActorSystem()
   private val node = TestProbe().ref
@@ -71,6 +73,10 @@ class SingleClusterPlannerSpec extends AnyFunSpec with Matchers with ScalaFuture
   private val engine = new SingleClusterPlanner(dataset, schemas, mapperRef, earliestRetainedTimestampFn = 0,
     queryConfig, "raw")
 
+  override def beforeEach(): Unit = {
+    engine.invalidateCaches()
+  }
+
   /*
   This is the PromQL
@@ -604,7 +610,6 @@ class SingleClusterPlannerSpec extends AnyFunSpec with Matchers with ScalaFuture
   }
 
   it ("should pushdown BinaryJoins/Aggregates when valid") {
-
     def spread(filter: Seq[ColumnFilter]): Seq[SpreadChange] = {
       Seq(SpreadChange(0, 1))
     }
diff --git a/core/src/main/resources/filodb-defaults.conf b/core/src/main/resources/filodb-defaults.conf
index 90a24f4562..f287146281 100644
--- a/core/src/main/resources/filodb-defaults.conf
+++ b/core/src/main/resources/filodb-defaults.conf
@@ -427,6 +427,11 @@ filodb {
   # config to figure out the tenant column filter to enable max min column selection
   max-min-tenant-column-filter = "_ws_"
 
+  single.cluster.cache {
+    enabled = true
+    # The maximum number of entries in the cache
+    cache-size = 2048
+  }
   routing {
     enable-remote-raw-exports = false
     max-time-range-remote-raw-export = 180 minutes
diff --git a/core/src/main/scala/filodb.core/query/QueryConfig.scala b/core/src/main/scala/filodb.core/query/QueryConfig.scala
index 90a275b8a2..6b2363ca4c 100644
--- a/core/src/main/scala/filodb.core/query/QueryConfig.scala
+++ b/core/src/main/scala/filodb.core/query/QueryConfig.scala
@@ -43,6 +43,10 @@ object QueryConfig {
       periodOfUncertaintyMs
     )
 
+    val scCachingEnabled = queryConfig.as[Boolean]("single.cluster.cache.enabled")
+    val scCacheSize = queryConfig.as[Int]("single.cluster.cache.cache-size")
+    val cachingConfig = CachingConfig(scCachingEnabled, scCacheSize)
+
     QueryConfig(askTimeout, staleSampleAfterMs, minStepMs, fastReduceMaxWindows, parser, translatePromToFilodbHistogram,
       fasterRateEnabled, routingConfig.as[Option[String]]("partition_name"),
       routingConfig.as[Option[Long]]("remote.http.timeout"),
@@ -52,7 +56,7 @@ object QueryConfig {
       allowPartialResultsRangeQuery, allowPartialResultsMetadataQuery,
grpcDenyList.split(",").map(_.trim.toLowerCase).toSet, None, - containerOverrides, rc) + containerOverrides, rc, cachingConfig) } import scala.concurrent.duration._ @@ -84,6 +88,11 @@ case class RoutingConfig( periodOfUncertaintyMs: Long = (5 minutes).toMillis ) +case class CachingConfig( + singleClusterPlannerCachingEnabled: Boolean = true, + singleClusterPlannerCachingSize: Int = 2048 + ) + case class QueryConfig(askTimeout: FiniteDuration, staleSampleAfterMs: Long, minStepMs: Long, @@ -102,4 +111,5 @@ case class QueryConfig(askTimeout: FiniteDuration, grpcPartitionsDenyList: Set[String] = Set.empty, plannerSelector: Option[String] = None, recordContainerOverrides: Map[String, Int] = Map.empty, - routingConfig: RoutingConfig = RoutingConfig()) + routingConfig: RoutingConfig = RoutingConfig(), + cachingConfig: CachingConfig = CachingConfig()) diff --git a/core/src/main/scala/filodb.core/query/QueryUtils.scala b/core/src/main/scala/filodb.core/query/QueryUtils.scala index 285e27c6d4..cb7bfaf80d 100644 --- a/core/src/main/scala/filodb.core/query/QueryUtils.scala +++ b/core/src/main/scala/filodb.core/query/QueryUtils.scala @@ -1,5 +1,6 @@ package filodb.core.query +import com.github.benmanes.caffeine.cache.{Cache, Caffeine} import scala.collection.mutable import scala.collection.mutable.ArrayBuffer @@ -7,10 +8,18 @@ import scala.collection.mutable.ArrayBuffer * Storage for utility functions. */ object QueryUtils { - val REGEX_CHARS = Array('.', '?', '+', '*', '|', '{', '}', '[', ']', '(', ')', '"', '\\') + val REGEX_CHARS: Array[Char] = Array('.', '?', '+', '*', '|', '{', '}', '[', ']', '(', ')', '"', '\\') + private val COMBO_CACHE_SIZE = 2048 private val regexCharsMinusPipe = (REGEX_CHARS.toSet - '|').toArray + private val comboCache: Cache[Map[String, Seq[String]], Seq[Map[String, String]]] = + Caffeine.newBuilder() + .maximumSize(COMBO_CACHE_SIZE) + .recordStats() + .build() + + /** * Returns true iff the argument string contains any special regex chars. */ @@ -72,7 +81,7 @@ object QueryUtils { splits.append(left) remaining = right // count of all characters before the remaining suffix (+1 to account for pipe) - offset = offset + left.size + 1 + offset = offset + left.length + 1 } splits.append(remaining) splits @@ -89,11 +98,13 @@ object QueryUtils { def makeAllKeyValueCombos(keyToValues: Map[String, Seq[String]]): Seq[Map[String, String]] = { // Store the entries with some order, then find all possible value combos s.t. each combo's // ith value is a value of the ith key. - val entries = keyToValues.toSeq - val keys = entries.map(_._1) - val vals = entries.map(_._2.toSeq) - val combos = QueryUtils.combinations(vals) - // Zip the ordered keys with the ordered values. - combos.map(keys.zip(_).toMap) + comboCache.get(keyToValues, _ => { + val entries = keyToValues.toSeq + val keys = entries.map(_._1) + val vals = entries.map(_._2.toSeq) + val combos = QueryUtils.combinations(vals) + // Zip the ordered keys with the ordered values. 
+      combos.map(keys.zip(_).toMap)
+    })
   }
 }
diff --git a/core/src/test/resources/application_test.conf b/core/src/test/resources/application_test.conf
index ddccfe9aa0..0433797532 100644
--- a/core/src/test/resources/application_test.conf
+++ b/core/src/test/resources/application_test.conf
@@ -148,7 +148,14 @@ filodb {
   }
   grpc {
     partitions-deny-list = ""
-  }
+  }
+
+  single.cluster.cache {
+    enabled = true
+    # The maximum number of entries in the cache
+    cache-size = 2048
+  }
+
   routing {
     enable-remote-raw-exports = false
     max-time-range-remote-raw-export = 30 minutes
diff --git a/jmh/src/main/scala/filodb.jmh/PlannerBenchmark.scala b/jmh/src/main/scala/filodb.jmh/PlannerBenchmark.scala
new file mode 100644
index 0000000000..c934a3bb49
--- /dev/null
+++ b/jmh/src/main/scala/filodb.jmh/PlannerBenchmark.scala
@@ -0,0 +1,118 @@
+// scalastyle:off
+package filodb.jmh
+
+import java.util.concurrent.TimeUnit
+import akka.actor.ActorSystem
+import akka.testkit.TestProbe
+import com.typesafe.config.ConfigFactory
+import com.typesafe.scalalogging.Logger
+import org.openjdk.jmh.annotations._
+
+import scala.concurrent.duration._
+import filodb.coordinator.ShardMapper
+import filodb.coordinator.client.QueryCommands.{FunctionalSpreadProvider, FunctionalTargetSchemaProvider}
+import filodb.coordinator.queryplanner.SingleClusterPlanner
+import filodb.core.{MetricsTestData, SpreadChange, SpreadProvider, TargetSchemaChange}
+import filodb.core.metadata.Schemas
+import filodb.core.query.{PlannerParams, PromQlQueryParams, QueryConfig, QueryContext}
+import filodb.prometheus.ast.TimeStepParams
+import filodb.prometheus.parse.Parser
+import filodb.query.exec.ExecPlan
+
+@State(Scope.Thread)
+class PlannerBenchmark {
+
+  // Run using the following in sbt
+  // jmh/jmh:run -rf json -i 5 -wi 3 -f 1 -jvmArgsAppend -XX:MaxInlineLevel=20 -jvmArgsAppend -Xmx4g -jvmArgsAppend -XX:MaxInlineSize=99 -jvmArgsAppend -Dlogback.configurationFile=/conf/logback-dev.xml filodb.jmh.PlannerBenchmark -prof "async:libPath=/lib/libasyncProfiler.dylib;output=jfr;alloc=0"
+
+
+  var system: Option[ActorSystem] = None
+  var planner: Option[SingleClusterPlanner] = None
+  var now = 0L;
+  val logger: Logger = Logger("PlannerBenchmark")
+  private val spreadProvider: Option[SpreadProvider] = Some(
+    FunctionalSpreadProvider(
+      _ => Seq(SpreadChange(0, 8)))
+  )
+  private val tSchemaProvider: Option[FunctionalTargetSchemaProvider] = Some(
+    FunctionalTargetSchemaProvider(
+      _ => Seq(TargetSchemaChange(0, Seq("job", "app")))
+    )
+  )
+
+
+  var execPlan: Option[ExecPlan] = None
+
+
+  val query = """ foo { job="baz" , node!="", ns="ns"}
+                | OR on (app)
+                | bar { job="baz", node!="", ns="ns" }
+                | * on (app) group_right()
+                | baz{ job="baz", node!="", ns="ns" } == 1""".stripMargin
+
+
+  private def buildPlanners(): Unit = {
+    implicit val system: ActorSystem = ActorSystem()
+    this.system = Some(system)
+    val node = TestProbe().ref
+    val mapper = new ShardMapper(256)
+    for (i <- 0 until 256) {
+      mapper.registerNode(Seq(i), node)
+    }
+    this.now = System.currentTimeMillis()
+
+    val rawRetention = 7.days.toMillis
+
+    val routingConfigString = "routing {\n remote {\n http {\n timeout = 10000\n }\n }\n}"
+    val routingConfig = ConfigFactory.parseString(routingConfigString)
+    val config = ConfigFactory.load("application_test.conf").getConfig("filodb.query").withFallback(routingConfig)
+    val queryConfig = QueryConfig(config).copy(plannerSelector = Some("plannerSelector"))
+
+    val dataset = MetricsTestData.timeseriesDataset
+    val schemas = Schemas(dataset.schema)
+
+    planner = Some(new SingleClusterPlanner(dataset, schemas, mapper,
+      earliestRetainedTimestampFn = now - rawRetention, queryConfig, "raw"))
+  }
+
+  @Setup(Level.Trial)
+  def setup(): Unit = {
+    buildPlanners()
+  }
+
+  @TearDown(Level.Trial)
+  def teardown(): Unit = {
+    this.system.foreach(_.terminate())
+//    println("\n\n===================================\n\n")
+//    print(s"${execPlan.get.printTree()}")
+  }
+
+  @Benchmark
+  @BenchmarkMode(Array(Mode.AverageTime))
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  @Warmup(iterations = 1, time = 1)
+  @Measurement(iterations = 5, time = 1)
+  @throws[Exception]
+  def benchmarkMaterializePlan(): Unit = {
+
+    var i = 0;
+    // Materialize a 1-day range query at hourly endpoints over the past 3 hours
+    for (endTime <- (now - 3.hour.toMillis) to now by 1.hour.toMillis) {
+      val endSecs = endTime / 1000;
+      val timeParams = TimeStepParams(endSecs - 1.day.toSeconds, 60, endSecs)
+      val lp = Parser.queryRangeToLogicalPlan(query, timeParams)
+      execPlan = Some(planner.get.materialize(lp,
+        QueryContext(PromQlQueryParams("dummy", timeParams.start, timeParams.step, timeParams.end),
+          plannerParams = PlannerParams(
+            spreadOverride = spreadProvider,
+            targetSchemaProviderOverride = tSchemaProvider,
+            queryTimeoutMillis = 1000000))))
+    }
+  }
+
+}
+
+
+
+
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index a1ee58e89f..72e31549fd 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -197,7 +197,8 @@ object Dependencies {
   //  )
 
   lazy val jmhDeps = Seq(
-    "org.apache.spark" %% "spark-sql" % sparkVersion excludeAll(excludeSlf4jLog4j, excludeZK, excludeJersey)
+    "org.apache.spark" %% "spark-sql" % sparkVersion excludeAll(excludeSlf4jLog4j, excludeZK, excludeJersey),
+    "com.typesafe.akka" %% "akka-testkit" % akkaVersion
   )
 
   lazy val gatlingDeps = Seq(
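Note for reviewers: the planner changes above rely on Caffeine's compute-if-absent API. The sketch below is not code from this PR; it is a minimal, self-contained Scala illustration of the same Cache.get(key, mappingFunction) pattern used by getPushdownShards and isTargetSchemaChanging. The names PlanKey, computeShards, and shardsFor are hypothetical stand-ins.

import com.github.benmanes.caffeine.cache.{Cache, Caffeine}

object CaffeinePatternSketch {
  // Hypothetical key, standing in for (LogicalPlan, Option[Seq[Int]]) used by shardPushdownCache.
  // A case class gives the structural equals/hashCode that Caffeine keys need.
  final case class PlanKey(planId: String, shardOverrides: Option[Seq[Int]])

  // Bounded cache; recordStats() enables hit/miss counters, mirroring the PR's builders.
  private val cache: Cache[PlanKey, Option[Set[Int]]] =
    Caffeine.newBuilder()
      .maximumSize(2048)
      .recordStats()
      .build()

  // Hypothetical expensive computation, standing in for getPushdownShardsInner.
  private def computeShards(key: PlanKey): Option[Set[Int]] =
    key.shardOverrides.map(_.toSet)

  def shardsFor(key: PlanKey): Option[Set[Int]] =
    // get(key, mappingFunction) computes and stores the value only on a miss;
    // the Scala lambda converts to java.util.function.Function via SAM conversion.
    cache.get(key, k => computeShards(k))
}

The mapping function runs at most once per missing key, which is why the PR can wrap the existing *Inner methods without changing their behavior when caching is disabled.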