
Commit

updating the schema and adding unit tests
sandeep6189 committed Jan 6, 2025
1 parent 66709a9 commit 6917f05
Showing 6 changed files with 140 additions and 17 deletions.
@@ -95,6 +95,7 @@ object NodeClusterActor {
final case class DatasetUnknown(ref: DatasetRef) extends ErrorResponse
final case class BadSchema(message: String) extends ErrorResponse
final case class BadData(message: String) extends ErrorResponse
final case class InternalServiceError(message: String) extends ErrorResponse

// Cluster state info commands
// Returns a Seq[DatasetRef]
26 changes: 20 additions & 6 deletions coordinator/src/main/scala/filodb.coordinator/ShardStatus.scala
@@ -11,15 +11,29 @@ sealed trait ShardAction extends Serializable
final case class CurrentShardSnapshot(ref: DatasetRef,
map: ShardMapper) extends ShardAction with Response

/**
 * Optimized form of the ShardMapper state representation.
 * NOTE: It does not track shard status updates from the coordinator or ingestion actors. It is just
 * a wrapper which compresses the ShardMapper state to reduce network transmission costs.
 *
 * @param nodeCountInCluster Number of replicas in the FiloDB cluster
 * @param numShards Number of shards in the FiloDB cluster
 * @param k8sHostFormat K8s host format. Valid ONLY for the ClusterV2 shard assignment strategy
 * @param shardState ByteArray. Each bit of a byte represents one shard's status.
 *                   For example, say we have 4 shards with the following statuses:
 *                   Seq[ShardStatusActive, ShardStatusRecovery, ShardStatusAssigned, ShardStatusAssigned]
 *                   Then shardState is an array of a single byte whose bit representation is 1000 0000.
 *                   Explanation: a shard's bit is set to 1 if the shard is active (ingesting), else 0.
 */
final case class ShardMapperV2(nodeCountInCluster: Int, numShards: Int, k8sHostFormat: String,
shardState: Array[Byte])
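
For orientation, here is a minimal, self-contained sketch of the bit-packing scheme the docstring describes (one bit per shard, most significant bit first, set when the shard is active). It is an illustration only; the actual encoding lives in NewNodeCoordinatorActor.shardMapperBitMapRepresentation and may differ in detail.

// Hypothetical illustration of the packing described above; not the FiloDB implementation.
object BitPackSketch {
  // Pack one flag per shard into a byte array, MSB first; the last byte is zero-padded.
  def pack(activeFlags: Seq[Boolean]): Array[Byte] = {
    val bytes = new Array[Byte]((activeFlags.size + 7) / 8)
    activeFlags.zipWithIndex.foreach { case (active, i) =>
      if (active) bytes(i / 8) = (bytes(i / 8) | (0x80 >> (i % 8))).toByte
    }
    bytes
  }

  // Read back the bit for a given shard.
  def isActive(shardState: Array[Byte], shard: Int): Boolean =
    (shardState(shard / 8) & (0x80 >> (shard % 8))) != 0

  def main(args: Array[String]): Unit = {
    val bytes = pack(Seq(true, false, false, false)) // 4 shards, only shard 0 active
    assert(bytes.length == 1 && (bytes(0) & 0xFF) == 0x80) // 1000 0000, as in the example above
    assert(isActive(bytes, 0) && !isActive(bytes, 1))
  }
}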

/**
* @param nodeCountInCluster number of nodes in the cluster
* @param numShards
* @param k8sHostFormat
* @param shardState
* Response to GetShardMapV2 request. Uses the optimized ShardMapperV2 representation. Only applicable
* for ClusterV2 shard assignment strategy.
* @param map ShardMapperV2
*/
final case class ShardSnapshot(nodeCountInCluster: Int, numShards: Int, k8sHostFormat: String,
shardState: Array[Byte]) extends ShardAction with Response
final case class ShardSnapshot(map: ShardMapperV2) extends ShardAction with Response

/**
* Full state of all shards, sent to all ingestion actors. They react by starting/stopping
@@ -47,4 +47,20 @@ trait ClusterOps extends ClientBase with StrictLogging {
}
}
}

/**
 * ShardMapperV2 reduces the response size compared to the full ShardMapper returned by the GetShardMap ask call.
 * @return Some(ShardMapperV2) if the dataset is registered, None if the dataset is not found
*/
def getShardMapperV2(dataset: DatasetRef, v2Enabled: Boolean,
timeout: FiniteDuration = 30.seconds): Option[ShardMapperV2] = {
require(v2Enabled, "ClusterV2 shard assignment is required for this operation")
val actor = Some(nodeCoordinator)
actor.flatMap { ref =>
Client.actorAsk(ref, GetShardMapV2(dataset), timeout) {
case ShardSnapshot(shardMapperV2) => Some(shardMapperV2)
case _ => None
}
}
}
}
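
A hedged usage sketch of the new client call; the `client` value (some ClusterOps implementation) and the dataset name below are assumptions for illustration. It decodes the returned bitmap using the same bit layout documented on ShardMapperV2.

// Hypothetical caller; `client` and the dataset ref are made up for this example.
import scala.concurrent.duration._
val ref = DatasetRef("prometheus")
client.getShardMapperV2(ref, v2Enabled = true, timeout = 10.seconds) match {
  case Some(m) =>
    // One bit per shard, MSB first within each byte; 1 means the shard is active.
    val activeShards = (0 until m.numShards).filter { shard =>
      (m.shardState(shard / 8) & (0x80 >> (shard % 8))) != 0
    }
    println(s"${activeShards.size}/${m.numShards} shards active across ${m.nodeCountInCluster} nodes")
  case None =>
    println(s"Dataset $ref is not registered")
}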
@@ -20,7 +20,6 @@ import filodb.query.QueryCommand
final case class GetShardMapScatter(ref: DatasetRef)
case object LocalShardsHealthRequest
case class DatasetShardHealth(dataset: DatasetRef, shard: Int, status: ShardStatus)
case class LocalShardsHealthResponse(shardStatus: Seq[DatasetShardHealth])

object NewNodeCoordinatorActor {

@@ -217,9 +216,11 @@ private[filodb] final class NewNodeCoordinatorActor(memStore: TimeSeriesStore,
sender() ! CurrentShardSnapshot(g.ref, clusterDiscovery.shardMapper(g.ref))
} catch { case e: Exception =>
logger.error(s"[ClusterV2] Error occurred when processing message $g", e)
// send a response to avoid blocking the Akka caller for a long time
sender() ! InternalServiceError(s"Exception while executing GetShardMap for dataset: ${g.ref.dataset}")
}

/*
* requested from HTTP API
* What is the trade-off between GetShardMap and GetShardMapV2?
*
* No | Ask Call | Size of Response (256 Shards) | Compute Used
@@ -231,13 +232,16 @@ private[filodb] final class NewNodeCoordinatorActor(memStore: TimeSeriesStore,
case g: GetShardMapV2 =>
try {
val shardBitMap = NewNodeCoordinatorActor.shardMapperBitMapRepresentation(clusterDiscovery.shardMapper(g.ref))
sender() ! ShardSnapshot(
val shardMapperV2 = ShardMapperV2(
settings.minNumNodes.get,
ingestionConfigs(g.ref).numShards,
settings.k8sHostFormat.get,
shardBitMap)
sender() ! ShardSnapshot(shardMapperV2)
} catch { case e: Exception =>
logger.error(s"[ClusterV2] Error occurred when processing message $g", e)
// send a response to avoid blocking the Akka caller for a long time
sender() ! InternalServiceError(s"Exception while executing GetShardMapV2 for dataset: ${g.ref.dataset}")
}

// requested from peer NewNodeCoordActors upon them receiving GetShardMap call
@@ -306,13 +306,97 @@ class ShardMapperSpec extends ActorTest(ShardMapperSpec.getNewSystem) {

it ("test bitmap conversion of shard mapper") {
val numShards = 32
val map = new ShardMapper(numShards) // default init to ShardStatusUnassigned
val bitRep = NewNodeCoordinatorActor.shardMapperBitMapRepresentation(map)
val shardMapper = new ShardMapper(numShards) // default init to ShardStatusUnassigned
var bitRep = NewNodeCoordinatorActor.shardMapperBitMapRepresentation(shardMapper)
bitRep.length shouldEqual 4
bitRep.forall (x => x == 0) shouldEqual true // no bit should be set at this point
bitRep.forall (x => x == 0x00.toByte) shouldEqual true // no bit should be set at this point

// move everyone to assigned
shardMapper.registerNode(shardMapper.statuses.indices, TestProbe().ref)
shardMapper.assignedShards.length shouldEqual 32

// status updated to assigned but bitmap representation should NOT yet be set to 1
bitRep = NewNodeCoordinatorActor.shardMapperBitMapRepresentation(shardMapper)
bitRep.length shouldEqual 4
bitRep.forall (x => x == 0x00.toByte) shouldEqual true // no bit should be set at this point

// move first 8 and last 8 shards to active
for (i <- 0 to 7) {
shardMapper.updateFromEvent(IngestionStarted(dataset, i, TestProbe().ref))
}
for (i <- 24 to 31) {
shardMapper.updateFromEvent(IngestionStarted(dataset, i, TestProbe().ref))
}
shardMapper.activeShards().size shouldEqual 16
bitRep = NewNodeCoordinatorActor.shardMapperBitMapRepresentation(shardMapper)
// 1111 1111 0000 0000 0000 0000 1111 1111
bitRep(0) shouldEqual 0xFF.toByte
bitRep(1) shouldEqual 0x00.toByte
bitRep(2) shouldEqual 0x00.toByte
bitRep(3) shouldEqual 0xFF.toByte
}

it ("test bitmap conversion of shard mapper with 256 shards") {
val shardMapper = new ShardMapper(256) // default init to ShardStatusUnassigned
var bitRep = NewNodeCoordinatorActor.shardMapperBitMapRepresentation(shardMapper)
bitRep.length shouldEqual 32
bitRep.forall (x => x == 0x00.toByte) shouldEqual true // no bit should be set at this point
shardMapper.registerNode(shardMapper.statuses.indices, TestProbe().ref)
shardMapper.assignedShards.length shouldEqual 256

// make all shards active
for (i <- 0 to 255) {
shardMapper.updateFromEvent(IngestionStarted(dataset, i, TestProbe().ref))
}
// check if all the bits are set correctly
bitRep = NewNodeCoordinatorActor.shardMapperBitMapRepresentation(shardMapper)
bitRep.forall (x => x == 0xFF.toByte) shouldEqual true

// make some shards in recovery mode
for (i <- 60 to 63) {
shardMapper.updateFromEvent(RecoveryInProgress(dataset, i, TestProbe().ref, 50))
}
// make some shards in down mode
for (i <- 64 to 67) {
shardMapper.updateFromEvent(ShardDown(dataset, i, TestProbe().ref))
}

shardMapper.activeShards().size shouldEqual 248
shardMapper.notActiveShards().size shouldEqual 8
bitRep = NewNodeCoordinatorActor.shardMapperBitMapRepresentation(shardMapper)

// bytes 0-6 (shards 0-55) are all active
for (i <- 0 to 6) {
bitRep(i) shouldEqual 0xFF.toByte
}

// shards 56-63 should be 1111 0000
bitRep(7) shouldEqual 0xF0.toByte

// shards 64-71 should be 0000 1111
bitRep(8) shouldEqual 0x0F.toByte

// bytes 9-31 (shards 72-255) are all active
for (i <- 9 to 31) {
bitRep(i) shouldEqual 0xFF.toByte
}
}

it ("test padding is set correctly in non 8 byte aligned number of shards") {
// most common setup is local testing with 4 shards
val shardMapper = new ShardMapper(2) // default init to ShardStatusUnassigned
var bitRep = NewNodeCoordinatorActor.shardMapperBitMapRepresentation(shardMapper)
bitRep.length shouldEqual 1
bitRep.forall (x => x == 0x00.toByte) shouldEqual true // no bit should be set at this point
shardMapper.registerNode(shardMapper.statuses.indices, TestProbe().ref)
shardMapper.assignedShards.length shouldEqual 2
for (i <- 0 to 1) {
shardMapper.updateFromEvent(IngestionStarted(dataset, i, TestProbe().ref))
}
bitRep = NewNodeCoordinatorActor.shardMapperBitMapRepresentation(shardMapper)
bitRep(0) shouldEqual 0xC0.toByte // 1100 0000 - the remaining 6 bits are padding
shardMapper.updateFromEvent(ShardDown(dataset, 1, TestProbe().ref))

bitRep = NewNodeCoordinatorActor.shardMapperBitMapRepresentation(shardMapper)
bitRep(0) shouldEqual 0x80.toByte // 1000 0000
}
}
12 changes: 8 additions & 4 deletions http/src/main/scala/filodb/http/ClusterApiRoute.scala
@@ -31,17 +31,21 @@ class ClusterApiRoute(clusterProxy: ActorRef) extends FiloRoute with StrictLogging {
complete(httpList(statusList))
case DatasetUnknown(_) =>
complete(Codes.NotFound -> httpErr("DatasetUnknown", s"Dataset $dataset is not registered"))
case InternalServiceError(errorMessage) =>
complete(Codes.InternalServerError -> httpErr("InternalServerError", errorMessage))
}
}
} ~
// NOTE: statusV2 only works with the ClusterV2 shard assignment strategy
path(Segment / "statusV2") { dataset =>
get {
val resp = onSuccess(asyncAsk(clusterProxy, GetShardMapV2(DatasetRef.fromDotString(dataset))))
resp {
case ShardSnapshot(_, _, _, _) =>
complete(httpList(Seq(resp)))
onSuccess(asyncAsk(clusterProxy, GetShardMapV2(DatasetRef.fromDotString(dataset)))) {
case ShardSnapshot(shardMapperV2) =>
complete(httpList(Seq(shardMapperV2)))
case DatasetUnknown(_) =>
complete(Codes.NotFound -> httpErr("DatasetUnknown", s"Dataset $dataset is not registered"))
case InternalServiceError(errorMessage) =>
complete(Codes.InternalServerError -> httpErr("InternalServerError", errorMessage))
}
}
} ~
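
A quick way to exercise the new route once a FiloDB server is running; the host, port, and route prefix below are assumptions and depend on how ClusterApiRoute is mounted.

// Hypothetical smoke test of the statusV2 endpoint; URL details are assumed, not taken from this commit.
import scala.io.Source
val dataset = "prometheus"
val body = Source.fromURL(s"http://localhost:8080/api/v1/cluster/$dataset/statusV2").mkString
println(body) // expected: the ShardMapperV2 fields on success, or a DatasetUnknown / InternalServerError payload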
