Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

misc(cli): use scallop instead of sumac #854

Merged
merged 2 commits into from
Aug 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 73 additions & 71 deletions cli/src/main/scala/filodb.cli/CliMain.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import scala.concurrent.duration._
import scala.util.Try

import com.opencsv.CSVWriter
import com.quantifind.sumac.{ArgMain, FieldArgs}
import monix.reactive.Observable
import org.rogach.scallop.ScallopConf
import org.scalactic._

import filodb.coordinator._
Expand All @@ -27,41 +27,42 @@ import filodb.prometheus.parse.Parser
import filodb.query._

// scalastyle:off
class Arguments extends FieldArgs {
var dataset: Option[String] = None
var database: Option[String] = None
var command: Option[String] = None
var filename: Option[String] = None
var configPath: Option[String] = None
class Arguments(args: Seq[String]) extends ScallopConf(args) {
val dataset = opt[String]()
val database = opt[String]()
val command = opt[String]()
val filename = opt[String]()
val configPath = opt[String]()
// max # of results returned. Don't make it too high.
var limit: Int = 200
var sampleLimit: Int = 1000000
var timeoutSeconds: Int = 60
var outfile: Option[String] = None
var delimiter: String = ","
var indexName: Option[String] = None
var host: Option[String] = None
var port: Int = 2552
var promql: Option[String] = None
var schema: Option[String] = None
var hexPk: Option[String] = None
var hexVector: Option[String] = None
var hexChunkInfo: Option[String] = None
var vectorType: Option[String] = None
var matcher: Option[String] = None
var labelNames: Seq[String] = Seq.empty
var labelFilter: Map[String, String] = Map.empty
var start: Long = System.currentTimeMillis() / 1000 // promql argument is seconds since epoch
var end: Long = System.currentTimeMillis() / 1000 // promql argument is seconds since epoch
var minutes: Option[String] = None
var step: Long = 10 // in seconds
var chunks: Option[String] = None // select either "memory" or "buffers" chunks only
var everyNSeconds: Option[String] = None
var shards: Option[Seq[String]] = None
var spread: Option[Integer] = None
val limit = opt[Int](default = Some(200))
val sampleLimit = opt[Int](default = Some(1000000))
val timeoutSeconds = opt[Int](default = Some(60))
val outfile = opt[String]()
val delimiter = opt[String](default = Some(",")).apply()
val indexName = opt[String]()
val host = opt[String]()
val port = opt[Int](default = Some(2552))
val promql = opt[String]()
val schema = opt[String]()
val hexPk = opt[String]()
val hexVector = opt[String]()
val hexChunkInfo = opt[String]()
val vectorType = opt[String]()
val matcher = opt[String]()
val labelNames = opt[List[String]](default = Some(List()))
val labelFilter = opt[Map[String, String]](default = Some(Map.empty))
val start: Long = System.currentTimeMillis() / 1000 // promql argument is seconds since epoch
val end: Long = System.currentTimeMillis() / 1000 // promql argument is seconds since epoch
val minutes = opt[String]()
val step = opt[Long](default = Some(10)) // in seconds
val chunks = opt[String]() // select either "memory" or "buffers" chunks only
val everyNSeconds = opt[String]()
val shards = opt[List[String]]()
val spread = opt[Int]()
verify()
}

object CliMain extends ArgMain[Arguments] with FilodbClusterNode {
object CliMain extends FilodbClusterNode {
var exitCode = 0

override val role = ClusterRole.Cli
Expand Down Expand Up @@ -99,30 +100,31 @@ object CliMain extends ArgMain[Arguments] with FilodbClusterNode {
println(" options: ./filo-cli -Dakka.loglevel=DEBUG -Dakka.actor.debug.receive=on -Dakka.actor.debug.autoreceive=on --command importcsv ...")
}

def getRef(args: Arguments): DatasetRef = DatasetRef(args.dataset.get, args.database)
def getRef(args: Arguments): DatasetRef = DatasetRef(args.dataset(), args.database.toOption)

def getClientAndRef(args: Arguments): (LocalClient, DatasetRef) = {
require(args.host.nonEmpty && args.dataset.nonEmpty, "--host and --dataset must be defined")
val remote = Client.standaloneClient(system, args.host.get, args.port)
(remote, DatasetRef(args.dataset.get))
require(args.host.isDefined && args.dataset.isDefined, "--host and --dataset must be defined")
val remote = Client.standaloneClient(system, args.host(), args.port())
(remote, DatasetRef(args.dataset()))
}

def getQueryRange(args: Arguments): TimeRangeParams =
args.chunks.filter { cOpt => cOpt == "memory" || cOpt == "buffers" }
.map {
case "memory" => InMemoryParam(args.step)
case "buffers" => WriteBuffersParam(args.step)
case "memory" => InMemoryParam(args.step())
case "buffers" => WriteBuffersParam(args.step())
}.getOrElse {
args.minutes.map { minArg =>
val end = System.currentTimeMillis() / 1000
TimeStepParams(end - minArg.toInt * 60, args.step, end)
}.getOrElse(TimeStepParams(args.start, args.step, args.end))
TimeStepParams(end - minArg.toInt * 60, args.step(), end)
}.getOrElse(TimeStepParams(args.start, args.step(), args.end))
}

def main(args: Arguments): Unit = {
def main(rawArgs: Array[String]): Unit = {
val args = new Arguments(rawArgs)
try {
val timeout = args.timeoutSeconds.seconds
args.command match {
val timeout = args.timeoutSeconds().seconds
args.command.toOption match {
case Some("init") =>
println("Initializing FiloDB Admin keyspace and tables...")
parse(metaStore.initialize(), timeout) {
Expand All @@ -137,7 +139,7 @@ object CliMain extends ArgMain[Arguments] with FilodbClusterNode {

case Some("list") =>
args.host.map { server =>
listRegisteredDatasets(Client.standaloneClient(system, server, args.port))
listRegisteredDatasets(Client.standaloneClient(system, server, args.port()))
}

case Some("indexnames") =>
Expand All @@ -146,10 +148,10 @@ object CliMain extends ArgMain[Arguments] with FilodbClusterNode {
names.foreach(println)

case Some("indexvalues") =>
require(args.indexName.nonEmpty, "--indexName required")
require(args.shards.nonEmpty, "--shards required")
require(args.indexName.isDefined, "--indexName required")
require(args.shards.isDefined, "--shards required")
val (remote, ref) = getClientAndRef(args)
val values = remote.getIndexValues(ref, args.indexName.get, args.shards.get.head.toInt, args.limit)
val values = remote.getIndexValues(ref, args.indexName(), args.shards().head.toInt, args.limit())
values.foreach { case (term, freq) => println(f"$term%40s\t$freq") }

case Some("status") =>
Expand All @@ -159,45 +161,45 @@ object CliMain extends ArgMain[Arguments] with FilodbClusterNode {
case Some("validateSchemas") => validateSchemas()

case Some("promFilterToPartKeyBR") =>
require(args.promql.nonEmpty && args.schema.nonEmpty, "--promql and --schema must be defined")
promFilterToPartKeyBr(args.promql.get, args.schema.get)
require(args.promql.isDefined && args.schema.isDefined, "--promql and --schema must be defined")
promFilterToPartKeyBr(args.promql(), args.schema())

case Some("partKeyBrAsString") =>
require(args.hexPk.nonEmpty, "--hexPk must be defined")
partKeyBrAsString(args.hexPk.get)
require(args.hexPk.isDefined, "--hexPk must be defined")
partKeyBrAsString(args.hexPk())

case Some("decodeChunkInfo") =>
require(args.hexChunkInfo.nonEmpty, "--hexChunkInfo must be defined")
decodeChunkInfo(args.hexChunkInfo.get)
require(args.hexChunkInfo.isDefined, "--hexChunkInfo must be defined")
decodeChunkInfo(args.hexChunkInfo())

case Some("decodeVector") =>
require(args.hexVector.nonEmpty && args.vectorType.nonEmpty, "--hexVector and --vectorType must be defined")
decodeVector(args.hexVector.get, args.vectorType.get)
require(args.hexVector.isDefined && args.vectorType.isDefined, "--hexVector and --vectorType must be defined")
decodeVector(args.hexVector(), args.vectorType())

case Some("timeseriesMetadata") =>
require(args.host.nonEmpty && args.dataset.nonEmpty && args.matcher.nonEmpty, "--host, --dataset and --matcher must be defined")
val remote = Client.standaloneClient(system, args.host.get, args.port)
val options = QOptions(args.limit, args.sampleLimit, args.everyNSeconds.map(_.toInt),
timeout, args.shards.map(_.map(_.toInt)), args.spread)
parseTimeSeriesMetadataQuery(remote, args.matcher.get, args.dataset.get,
require(args.host.isDefined && args.dataset.isDefined && args.matcher.isDefined, "--host, --dataset and --matcher must be defined")
val remote = Client.standaloneClient(system, args.host(), args.port())
val options = QOptions(args.limit(), args.sampleLimit(), args.everyNSeconds.map(_.toInt).toOption,
timeout, args.shards.map(_.map(_.toInt)).toOption, args.spread.toOption.map(Integer.valueOf))
parseTimeSeriesMetadataQuery(remote, args.matcher(), args.dataset(),
getQueryRange(args), true, options)

case Some("labelValues") =>
require(args.host.nonEmpty && args.dataset.nonEmpty && args.labelNames.nonEmpty, "--host, --dataset and --labelName must be defined")
val remote = Client.standaloneClient(system, args.host.get, args.port)
val options = QOptions(args.limit, args.sampleLimit, args.everyNSeconds.map(_.toInt),
timeout, args.shards.map(_.map(_.toInt)), args.spread)
parseLabelValuesQuery(remote, args.labelNames, args.labelFilter, args.dataset.get,
require(args.host.isDefined && args.dataset.isDefined && args.labelNames.isDefined, "--host, --dataset and --labelName must be defined")
val remote = Client.standaloneClient(system, args.host(), args.port())
val options = QOptions(args.limit(), args.sampleLimit(), args.everyNSeconds.map(_.toInt).toOption,
timeout, args.shards.map(_.map(_.toInt)).toOption, args.spread.toOption.map(Integer.valueOf))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is Integer.valueOf for args.spread necessary? Spread is declared with opt[Int]....

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QOptions constructor takes Option[Integer] (don't know the reason, seems like unfounded inconsistency).

parseLabelValuesQuery(remote, args.labelNames(), args.labelFilter(), args.dataset(),
getQueryRange(args), options)

case x: Any =>
// This will soon be deprecated
args.promql.map { query =>
require(args.host.nonEmpty && args.dataset.nonEmpty, "--host and --dataset must be defined")
val remote = Client.standaloneClient(system, args.host.get, args.port)
val options = QOptions(args.limit, args.sampleLimit, args.everyNSeconds.map(_.toInt),
timeout, args.shards.map(_.map(_.toInt)), args.spread)
parsePromQuery2(remote, query, args.dataset.get, getQueryRange(args), options)
require(args.host.isDefined && args.dataset.isDefined, "--host and --dataset must be defined")
val remote = Client.standaloneClient(system, args.host(), args.port())
val options = QOptions(args.limit(), args.sampleLimit(), args.everyNSeconds.toOption.map(_.toInt),
timeout, args.shards.toOption.map(_.map(_.toInt)), args.spread.toOption.map(Integer.valueOf))
parsePromQuery2(remote, query, args.dataset(), getQueryRange(args), options)
}
.getOrElse(printHelp)
}
Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ object Dependencies {
lazy val cliDeps = Seq(
logbackDep,
"io.kamon" %% "kamon-bundle" % kamonBundleVersion,
"com.quantifind" %% "sumac" % "0.3.0"
"org.rogach" %% "scallop" % "3.1.1"
)

lazy val kafkaDeps = Seq(
Expand Down