From 317fe9eaa514b8a991394286edb695415713dc12 Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Thu, 16 Jan 2025 15:25:30 +0800 Subject: [PATCH 1/2] feat: flow's http server (#5372) * feat: flow's http server * feat: add cli options for http addr * test: sqlness runner http addr * feat: metrics * chore: also shutdown http server --- config/config.md | 4 ++++ config/flownode.example.toml | 10 ++++++++++ src/cmd/src/flownode.rs | 14 ++++++++++++++ src/flow/src/adapter.rs | 3 +++ src/flow/src/server.rs | 25 +++++++++++++++++++++++++ tests/runner/src/env.rs | 1 + 6 files changed, 57 insertions(+) diff --git a/config/config.md b/config/config.md index f14148bfcc9c..152325f08157 100644 --- a/config/config.md +++ b/config/config.md @@ -546,6 +546,10 @@ | `grpc.runtime_size` | Integer | `2` | The number of server worker threads. | | `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. | | `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. | +| `http` | -- | -- | The HTTP server options. | +| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. | +| `http.timeout` | String | `30s` | HTTP request timeout. Set to 0 to disable timeout. | +| `http.body_limit` | String | `64MB` | HTTP request body limit.
The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
Set to 0 to disable limit. | | `meta_client` | -- | -- | The metasrv client options. | | `meta_client.metasrv_addrs` | Array | -- | The addresses of the metasrv. | | `meta_client.timeout` | String | `3s` | Operation timeout. | diff --git a/config/flownode.example.toml b/config/flownode.example.toml index b27076a4c86b..877cb0892882 100644 --- a/config/flownode.example.toml +++ b/config/flownode.example.toml @@ -25,6 +25,16 @@ max_recv_message_size = "512MB" ## The maximum send message size for gRPC server. max_send_message_size = "512MB" +## The HTTP server options. +[http] +## The address to bind the HTTP server. +addr = "127.0.0.1:4000" +## HTTP request timeout. Set to 0 to disable timeout. +timeout = "30s" +## HTTP request body limit. +## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`. +## Set to 0 to disable limit. +body_limit = "64MB" ## The metasrv client options. [meta_client] diff --git a/src/cmd/src/flownode.rs b/src/cmd/src/flownode.rs index 04950975549a..bc1670977c88 100644 --- a/src/cmd/src/flownode.rs +++ b/src/cmd/src/flownode.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::sync::Arc; +use std::time::Duration; use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry}; use catalog::information_extension::DistributedInformationExtension; @@ -142,6 +143,11 @@ struct StartCommand { /// The prefix of environment variables, default is `GREPTIMEDB_FLOWNODE`; #[clap(long, default_value = "GREPTIMEDB_FLOWNODE")] env_prefix: String, + #[clap(long)] + http_addr: Option, + /// HTTP request timeout in seconds. + #[clap(long)] + http_timeout: Option, } impl StartCommand { @@ -198,6 +204,14 @@ impl StartCommand { opts.mode = Mode::Distributed; } + if let Some(http_addr) = &self.http_addr { + opts.http.addr.clone_from(http_addr); + } + + if let Some(http_timeout) = self.http_timeout { + opts.http.timeout = Duration::from_secs(http_timeout); + } + if let (Mode::Distributed, None) = (&opts.mode, &opts.node_id) { return MissingConfigSnafu { msg: "Missing node id option", diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 9924908dd4f4..a1193cb29f6b 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -36,6 +36,7 @@ use query::QueryEngine; use serde::{Deserialize, Serialize}; use servers::grpc::GrpcOptions; use servers::heartbeat_options::HeartbeatOptions; +use servers::http::HttpOptions; use servers::Mode; use session::context::QueryContext; use snafu::{ensure, OptionExt, ResultExt}; @@ -106,6 +107,7 @@ pub struct FlownodeOptions { pub node_id: Option, pub flow: FlowConfig, pub grpc: GrpcOptions, + pub http: HttpOptions, pub meta_client: Option, pub logging: LoggingOptions, pub tracing: TracingOptions, @@ -120,6 +122,7 @@ impl Default for FlownodeOptions { node_id: None, flow: FlowConfig::default(), grpc: GrpcOptions::default().with_addr("127.0.0.1:3004"), + http: HttpOptions::default(), meta_client: None, logging: LoggingOptions::default(), tracing: TracingOptions::default(), diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index 4ecda0b66fc7..ee77c46d64c4 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -39,6 +39,8 @@ use operator::statement::StatementExecutor; use partition::manager::PartitionRuleManager; use query::{QueryEngine, QueryEngineFactory}; use servers::error::{AlreadyStartedSnafu, StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu}; +use servers::http::{HttpServer, HttpServerBuilder}; +use servers::metrics_handler::MetricsHandler; use servers::server::Server; use session::context::{QueryContextBuilder, QueryContextRef}; use snafu::{ensure, OptionExt, ResultExt}; @@ -210,6 +212,9 @@ impl servers::server::Server for FlownodeServer { pub struct FlownodeInstance { server: FlownodeServer, addr: SocketAddr, + /// only used for health check + http_server: HttpServer, + http_addr: SocketAddr, heartbeat_task: Option, } @@ -224,6 +229,12 @@ impl FlownodeInstance { .start(self.addr) .await .context(StartServerSnafu)?; + + self.http_server + .start(self.http_addr) + .await + .context(StartServerSnafu)?; + Ok(()) } pub async fn shutdown(&self) -> Result<(), crate::Error> { @@ -233,6 +244,11 @@ impl FlownodeInstance { task.shutdown(); } + self.http_server + .shutdown() + .await + .context(ShutdownServerSnafu)?; + Ok(()) } @@ -305,12 +321,21 @@ impl FlownodeBuilder { let server = FlownodeServer::new(FlowService::new(manager.clone())); + let http_addr = self.opts.http.addr.parse().context(ParseAddrSnafu { + addr: self.opts.http.addr.clone(), + })?; + let http_server = HttpServerBuilder::new(self.opts.http) + .with_metrics_handler(MetricsHandler) + .build(); + let heartbeat_task = self.heartbeat_task; let addr = self.opts.grpc.addr; let instance = FlownodeInstance { server, addr: addr.parse().context(ParseAddrSnafu { addr })?, + http_server, + http_addr, heartbeat_task, }; Ok(instance) diff --git a/tests/runner/src/env.rs b/tests/runner/src/env.rs index 7ca65b3a5f56..6c646d1d1875 100644 --- a/tests/runner/src/env.rs +++ b/tests/runner/src/env.rs @@ -490,6 +490,7 @@ impl Env { sqlness_home.display() )); args.push("--metasrv-addrs=127.0.0.1:29302".to_string()); + args.push(format!("--http-addr=127.0.0.1:2951{id}")); (args, format!("127.0.0.1:2968{id}")) } From 0db10a33d0c80ec10a03ac6bdf5318e6f1fd288e Mon Sep 17 00:00:00 2001 From: Yohan Wal Date: Thu, 16 Jan 2025 16:06:28 +0800 Subject: [PATCH 2/2] chore: update proto rev (#5379) --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9c7fb40aac3d..a3e5f0309c5b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4449,7 +4449,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=9c56862fdcf713ad485932a62702b8afbd5a22dd#9c56862fdcf713ad485932a62702b8afbd5a22dd" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=4a173785b3376267c4d62b6e0b0a54ca040822aa#4a173785b3376267c4d62b6e0b0a54ca040822aa" dependencies = [ "prost 0.12.6", "serde", diff --git a/Cargo.toml b/Cargo.toml index 616cbe3b4012..0e9e5f4e87a6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -124,7 +124,7 @@ etcd-client = "0.13" fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "9c56862fdcf713ad485932a62702b8afbd5a22dd" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "4a173785b3376267c4d62b6e0b0a54ca040822aa" } hex = "0.4" http = "0.2" humantime = "2.1"