misc(cli): use scallop instead of sumac (#854)
Szymon Matejczyk authored Aug 24, 2020
1 parent 3dec2c7 commit bd629df
Showing 2 changed files with 74 additions and 72 deletions.
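
In short, Arguments moves from sumac's reflection-populated mutable fields to scallop's declarative options. A minimal before/after sketch of the pattern (hypothetical field names, not the actual CliMain code):

    // Before: sumac fills mutable vars by reflection
    import com.quantifind.sumac.FieldArgs

    class OldArgs extends FieldArgs {
      var host: Option[String] = None   // absent flag stays None
      var port: Int = 2552              // the initializer doubles as the default
    }

    // After: scallop declares each option explicitly, then validates
    import org.rogach.scallop.ScallopConf

    class NewArgs(arguments: Seq[String]) extends ScallopConf(arguments) {
      val host = opt[String]()                   // ScallopOption[String]
      val port = opt[Int](default = Some(2552))  // default passed explicitly
      verify()                                   // must run before any option is read
    }

Call sites change in step: sumac-style args.host.get becomes args.host(), which throws if the flag is absent, and Option-style reads become args.host.toOption or args.host.isDefined.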
144 changes: 73 additions & 71 deletions cli/src/main/scala/filodb.cli/CliMain.scala
@@ -8,8 +8,8 @@ import scala.concurrent.duration._
 import scala.util.Try
 
 import com.opencsv.CSVWriter
-import com.quantifind.sumac.{ArgMain, FieldArgs}
 import monix.reactive.Observable
+import org.rogach.scallop.ScallopConf
 import org.scalactic._
 
 import filodb.coordinator._
@@ -27,41 +27,42 @@ import filodb.prometheus.parse.Parser
 import filodb.query._
 
 // scalastyle:off
-class Arguments extends FieldArgs {
-  var dataset: Option[String] = None
-  var database: Option[String] = None
-  var command: Option[String] = None
-  var filename: Option[String] = None
-  var configPath: Option[String] = None
+class Arguments(args: Seq[String]) extends ScallopConf(args) {
+  val dataset = opt[String]()
+  val database = opt[String]()
+  val command = opt[String]()
+  val filename = opt[String]()
+  val configPath = opt[String]()
   // max # of results returned. Don't make it too high.
-  var limit: Int = 200
-  var sampleLimit: Int = 1000000
-  var timeoutSeconds: Int = 60
-  var outfile: Option[String] = None
-  var delimiter: String = ","
-  var indexName: Option[String] = None
-  var host: Option[String] = None
-  var port: Int = 2552
-  var promql: Option[String] = None
-  var schema: Option[String] = None
-  var hexPk: Option[String] = None
-  var hexVector: Option[String] = None
-  var hexChunkInfo: Option[String] = None
-  var vectorType: Option[String] = None
-  var matcher: Option[String] = None
-  var labelNames: Seq[String] = Seq.empty
-  var labelFilter: Map[String, String] = Map.empty
-  var start: Long = System.currentTimeMillis() / 1000 // promql argument is seconds since epoch
-  var end: Long = System.currentTimeMillis() / 1000 // promql argument is seconds since epoch
-  var minutes: Option[String] = None
-  var step: Long = 10 // in seconds
-  var chunks: Option[String] = None // select either "memory" or "buffers" chunks only
-  var everyNSeconds: Option[String] = None
-  var shards: Option[Seq[String]] = None
-  var spread: Option[Integer] = None
+  val limit = opt[Int](default = Some(200))
+  val sampleLimit = opt[Int](default = Some(1000000))
+  val timeoutSeconds = opt[Int](default = Some(60))
+  val outfile = opt[String]()
+  val delimiter = opt[String](default = Some(","))
+  val indexName = opt[String]()
+  val host = opt[String]()
+  val port = opt[Int](default = Some(2552))
+  val promql = opt[String]()
+  val schema = opt[String]()
+  val hexPk = opt[String]()
+  val hexVector = opt[String]()
+  val hexChunkInfo = opt[String]()
+  val vectorType = opt[String]()
+  val matcher = opt[String]()
+  val labelNames = opt[List[String]](default = Some(List()))
+  val labelFilter = opt[Map[String, String]](default = Some(Map.empty))
+  val start: Long = System.currentTimeMillis() / 1000 // promql argument is seconds since epoch
+  val end: Long = System.currentTimeMillis() / 1000 // promql argument is seconds since epoch
+  val minutes = opt[String]()
+  val step = opt[Long](default = Some(10)) // in seconds
+  val chunks = opt[String]() // select either "memory" or "buffers" chunks only
+  val everyNSeconds = opt[String]()
+  val shards = opt[List[String]]()
+  val spread = opt[Int]()
+  verify()
 }
 
-object CliMain extends ArgMain[Arguments] with FilodbClusterNode {
+object CliMain extends FilodbClusterNode {
   var exitCode = 0
 
   override val role = ClusterRole.Cli
@@ -99,30 +100,31 @@ object CliMain extends ArgMain[Arguments] with FilodbClusterNode {
     println(" options: ./filo-cli -Dakka.loglevel=DEBUG -Dakka.actor.debug.receive=on -Dakka.actor.debug.autoreceive=on --command importcsv ...")
   }
 
-  def getRef(args: Arguments): DatasetRef = DatasetRef(args.dataset.get, args.database)
+  def getRef(args: Arguments): DatasetRef = DatasetRef(args.dataset(), args.database.toOption)
 
   def getClientAndRef(args: Arguments): (LocalClient, DatasetRef) = {
-    require(args.host.nonEmpty && args.dataset.nonEmpty, "--host and --dataset must be defined")
-    val remote = Client.standaloneClient(system, args.host.get, args.port)
-    (remote, DatasetRef(args.dataset.get))
+    require(args.host.isDefined && args.dataset.isDefined, "--host and --dataset must be defined")
+    val remote = Client.standaloneClient(system, args.host(), args.port())
+    (remote, DatasetRef(args.dataset()))
   }
 
   def getQueryRange(args: Arguments): TimeRangeParams =
     args.chunks.filter { cOpt => cOpt == "memory" || cOpt == "buffers" }
       .map {
-        case "memory" => InMemoryParam(args.step)
-        case "buffers" => WriteBuffersParam(args.step)
+        case "memory" => InMemoryParam(args.step())
+        case "buffers" => WriteBuffersParam(args.step())
       }.getOrElse {
        args.minutes.map { minArg =>
          val end = System.currentTimeMillis() / 1000
-         TimeStepParams(end - minArg.toInt * 60, args.step, end)
-       }.getOrElse(TimeStepParams(args.start, args.step, args.end))
+         TimeStepParams(end - minArg.toInt * 60, args.step(), end)
+       }.getOrElse(TimeStepParams(args.start, args.step(), args.end))
      }
 
-  def main(args: Arguments): Unit = {
+  def main(rawArgs: Array[String]): Unit = {
+    val args = new Arguments(rawArgs)
     try {
-      val timeout = args.timeoutSeconds.seconds
-      args.command match {
+      val timeout = args.timeoutSeconds().seconds
+      args.command.toOption match {
        case Some("init") =>
          println("Initializing FiloDB Admin keyspace and tables...")
          parse(metaStore.initialize(), timeout) {
@@ -137,7 +139,7 @@ object CliMain extends ArgMain[Arguments] with FilodbClusterNode {
 
        case Some("list") =>
          args.host.map { server =>
-           listRegisteredDatasets(Client.standaloneClient(system, server, args.port))
+           listRegisteredDatasets(Client.standaloneClient(system, server, args.port()))
          }
 
        case Some("indexnames") =>
@@ -146,10 +148,10 @@ object CliMain extends ArgMain[Arguments] with FilodbClusterNode {
          names.foreach(println)
 
        case Some("indexvalues") =>
-         require(args.indexName.nonEmpty, "--indexName required")
-         require(args.shards.nonEmpty, "--shards required")
+         require(args.indexName.isDefined, "--indexName required")
+         require(args.shards.isDefined, "--shards required")
          val (remote, ref) = getClientAndRef(args)
-         val values = remote.getIndexValues(ref, args.indexName.get, args.shards.get.head.toInt, args.limit)
+         val values = remote.getIndexValues(ref, args.indexName(), args.shards().head.toInt, args.limit())
          values.foreach { case (term, freq) => println(f"$term%40s\t$freq") }
 
        case Some("status") =>
@@ -159,45 +161,45 @@ object CliMain extends ArgMain[Arguments] with FilodbClusterNode {
        case Some("validateSchemas") => validateSchemas()
 
        case Some("promFilterToPartKeyBR") =>
-         require(args.promql.nonEmpty && args.schema.nonEmpty, "--promql and --schema must be defined")
-         promFilterToPartKeyBr(args.promql.get, args.schema.get)
+         require(args.promql.isDefined && args.schema.isDefined, "--promql and --schema must be defined")
+         promFilterToPartKeyBr(args.promql(), args.schema())
 
        case Some("partKeyBrAsString") =>
-         require(args.hexPk.nonEmpty, "--hexPk must be defined")
-         partKeyBrAsString(args.hexPk.get)
+         require(args.hexPk.isDefined, "--hexPk must be defined")
+         partKeyBrAsString(args.hexPk())
 
        case Some("decodeChunkInfo") =>
-         require(args.hexChunkInfo.nonEmpty, "--hexChunkInfo must be defined")
-         decodeChunkInfo(args.hexChunkInfo.get)
+         require(args.hexChunkInfo.isDefined, "--hexChunkInfo must be defined")
+         decodeChunkInfo(args.hexChunkInfo())
 
        case Some("decodeVector") =>
-         require(args.hexVector.nonEmpty && args.vectorType.nonEmpty, "--hexVector and --vectorType must be defined")
-         decodeVector(args.hexVector.get, args.vectorType.get)
+         require(args.hexVector.isDefined && args.vectorType.isDefined, "--hexVector and --vectorType must be defined")
+         decodeVector(args.hexVector(), args.vectorType())
 
        case Some("timeseriesMetadata") =>
-         require(args.host.nonEmpty && args.dataset.nonEmpty && args.matcher.nonEmpty, "--host, --dataset and --matcher must be defined")
-         val remote = Client.standaloneClient(system, args.host.get, args.port)
-         val options = QOptions(args.limit, args.sampleLimit, args.everyNSeconds.map(_.toInt),
-           timeout, args.shards.map(_.map(_.toInt)), args.spread)
-         parseTimeSeriesMetadataQuery(remote, args.matcher.get, args.dataset.get,
+         require(args.host.isDefined && args.dataset.isDefined && args.matcher.isDefined, "--host, --dataset and --matcher must be defined")
+         val remote = Client.standaloneClient(system, args.host(), args.port())
+         val options = QOptions(args.limit(), args.sampleLimit(), args.everyNSeconds.map(_.toInt).toOption,
+           timeout, args.shards.map(_.map(_.toInt)).toOption, args.spread.toOption.map(Integer.valueOf))
+         parseTimeSeriesMetadataQuery(remote, args.matcher(), args.dataset(),
            getQueryRange(args), true, options)
 
        case Some("labelValues") =>
-         require(args.host.nonEmpty && args.dataset.nonEmpty && args.labelNames.nonEmpty, "--host, --dataset and --labelName must be defined")
-         val remote = Client.standaloneClient(system, args.host.get, args.port)
-         val options = QOptions(args.limit, args.sampleLimit, args.everyNSeconds.map(_.toInt),
-           timeout, args.shards.map(_.map(_.toInt)), args.spread)
-         parseLabelValuesQuery(remote, args.labelNames, args.labelFilter, args.dataset.get,
+         require(args.host.isDefined && args.dataset.isDefined && args.labelNames.isDefined, "--host, --dataset and --labelName must be defined")
+         val remote = Client.standaloneClient(system, args.host(), args.port())
+         val options = QOptions(args.limit(), args.sampleLimit(), args.everyNSeconds.map(_.toInt).toOption,
+           timeout, args.shards.map(_.map(_.toInt)).toOption, args.spread.toOption.map(Integer.valueOf))
+         parseLabelValuesQuery(remote, args.labelNames(), args.labelFilter(), args.dataset(),
            getQueryRange(args), options)
 
        case x: Any =>
          // This will soon be deprecated
          args.promql.map { query =>
-           require(args.host.nonEmpty && args.dataset.nonEmpty, "--host and --dataset must be defined")
-           val remote = Client.standaloneClient(system, args.host.get, args.port)
-           val options = QOptions(args.limit, args.sampleLimit, args.everyNSeconds.map(_.toInt),
-             timeout, args.shards.map(_.map(_.toInt)), args.spread)
-           parsePromQuery2(remote, query, args.dataset.get, getQueryRange(args), options)
+           require(args.host.isDefined && args.dataset.isDefined, "--host and --dataset must be defined")
+           val remote = Client.standaloneClient(system, args.host(), args.port())
+           val options = QOptions(args.limit(), args.sampleLimit(), args.everyNSeconds.toOption.map(_.toInt),
+             timeout, args.shards.toOption.map(_.map(_.toInt)), args.spread.toOption.map(Integer.valueOf))
+           parsePromQuery2(remote, query, args.dataset(), getQueryRange(args), options)
          }
          .getOrElse(printHelp)
      }
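For reference, the ScallopOption access patterns the rewritten CliMain relies on (apply() for required values, toOption and isDefined for optional ones) behave as below. A standalone sketch assuming scallop 3.1.1 as pinned in Dependencies.scala; the demo object and its flags are hypothetical, not part of this commit:

    import org.rogach.scallop.ScallopConf

    object ScallopDemo {
      class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
        val dataset = opt[String]()
        val limit = opt[Int](default = Some(200))
        verify()
      }

      def main(argv: Array[String]): Unit = {
        val conf = new Conf(argv.toIndexedSeq)  // e.g. --dataset prometheus
        println(conf.limit())                   // 200 unless --limit overrides it
        println(conf.dataset.toOption)          // Some("prometheus") or None
        if (conf.dataset.isDefined) {           // the same check CliMain's require(...) calls make
          println(conf.dataset())               // apply() throws if the option were absent
        }
      }
    }

ScallopOption also supports map, filter and getOrElse, which is why expressions such as args.minutes.map(...).getOrElse(...) in getQueryRange carry over from the Option-based code almost unchanged.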
2 changes: 1 addition & 1 deletion project/Dependencies.scala
@@ -120,7 +120,7 @@ object Dependencies {
   lazy val cliDeps = Seq(
     logbackDep,
     "io.kamon" %% "kamon-bundle" % kamonBundleVersion,
-    "com.quantifind" %% "sumac" % "0.3.0"
+    "org.rogach" %% "scallop" % "3.1.1"
   )
 
   lazy val kafkaDeps = Seq(