From 317fe9eaa514b8a991394286edb695415713dc12 Mon Sep 17 00:00:00 2001
From: discord9 <55937128+discord9@users.noreply.github.com>
Date: Thu, 16 Jan 2025 15:25:30 +0800
Subject: [PATCH] feat: flow's http server (#5372)
* feat: flow's http server
* feat: add cli options for http addr
* test: sqlness runner http addr
* feat: metrics
* chore: also shutdown http server
---
config/config.md | 4 ++++
config/flownode.example.toml | 10 ++++++++++
src/cmd/src/flownode.rs | 14 ++++++++++++++
src/flow/src/adapter.rs | 3 +++
src/flow/src/server.rs | 25 +++++++++++++++++++++++++
tests/runner/src/env.rs | 1 +
6 files changed, 57 insertions(+)
diff --git a/config/config.md b/config/config.md
index f14148bfcc9c..152325f08157 100644
--- a/config/config.md
+++ b/config/config.md
@@ -546,6 +546,10 @@
| `grpc.runtime_size` | Integer | `2` | The number of server worker threads. |
| `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. |
| `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
+| `http` | -- | -- | The HTTP server options. |
+| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
+| `http.timeout` | String | `30s` | HTTP request timeout. Set to 0 to disable timeout. |
+| `http.body_limit` | String | `64MB` | HTTP request body limit.
The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
Set to 0 to disable limit. |
| `meta_client` | -- | -- | The metasrv client options. |
| `meta_client.metasrv_addrs` | Array | -- | The addresses of the metasrv. |
| `meta_client.timeout` | String | `3s` | Operation timeout. |
diff --git a/config/flownode.example.toml b/config/flownode.example.toml
index b27076a4c86b..877cb0892882 100644
--- a/config/flownode.example.toml
+++ b/config/flownode.example.toml
@@ -25,6 +25,16 @@ max_recv_message_size = "512MB"
## The maximum send message size for gRPC server.
max_send_message_size = "512MB"
+## The HTTP server options.
+[http]
+## The address to bind the HTTP server.
+addr = "127.0.0.1:4000"
+## HTTP request timeout. Set to 0 to disable timeout.
+timeout = "30s"
+## HTTP request body limit.
+## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
+## Set to 0 to disable limit.
+body_limit = "64MB"
## The metasrv client options.
[meta_client]
diff --git a/src/cmd/src/flownode.rs b/src/cmd/src/flownode.rs
index 04950975549a..bc1670977c88 100644
--- a/src/cmd/src/flownode.rs
+++ b/src/cmd/src/flownode.rs
@@ -13,6 +13,7 @@
// limitations under the License.
use std::sync::Arc;
+use std::time::Duration;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::information_extension::DistributedInformationExtension;
@@ -142,6 +143,11 @@ struct StartCommand {
/// The prefix of environment variables, default is `GREPTIMEDB_FLOWNODE`;
#[clap(long, default_value = "GREPTIMEDB_FLOWNODE")]
env_prefix: String,
+ #[clap(long)]
+ http_addr: Option,
+ /// HTTP request timeout in seconds.
+ #[clap(long)]
+ http_timeout: Option,
}
impl StartCommand {
@@ -198,6 +204,14 @@ impl StartCommand {
opts.mode = Mode::Distributed;
}
+ if let Some(http_addr) = &self.http_addr {
+ opts.http.addr.clone_from(http_addr);
+ }
+
+ if let Some(http_timeout) = self.http_timeout {
+ opts.http.timeout = Duration::from_secs(http_timeout);
+ }
+
if let (Mode::Distributed, None) = (&opts.mode, &opts.node_id) {
return MissingConfigSnafu {
msg: "Missing node id option",
diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs
index 9924908dd4f4..a1193cb29f6b 100644
--- a/src/flow/src/adapter.rs
+++ b/src/flow/src/adapter.rs
@@ -36,6 +36,7 @@ use query::QueryEngine;
use serde::{Deserialize, Serialize};
use servers::grpc::GrpcOptions;
use servers::heartbeat_options::HeartbeatOptions;
+use servers::http::HttpOptions;
use servers::Mode;
use session::context::QueryContext;
use snafu::{ensure, OptionExt, ResultExt};
@@ -106,6 +107,7 @@ pub struct FlownodeOptions {
pub node_id: Option,
pub flow: FlowConfig,
pub grpc: GrpcOptions,
+ pub http: HttpOptions,
pub meta_client: Option,
pub logging: LoggingOptions,
pub tracing: TracingOptions,
@@ -120,6 +122,7 @@ impl Default for FlownodeOptions {
node_id: None,
flow: FlowConfig::default(),
grpc: GrpcOptions::default().with_addr("127.0.0.1:3004"),
+ http: HttpOptions::default(),
meta_client: None,
logging: LoggingOptions::default(),
tracing: TracingOptions::default(),
diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs
index 4ecda0b66fc7..ee77c46d64c4 100644
--- a/src/flow/src/server.rs
+++ b/src/flow/src/server.rs
@@ -39,6 +39,8 @@ use operator::statement::StatementExecutor;
use partition::manager::PartitionRuleManager;
use query::{QueryEngine, QueryEngineFactory};
use servers::error::{AlreadyStartedSnafu, StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu};
+use servers::http::{HttpServer, HttpServerBuilder};
+use servers::metrics_handler::MetricsHandler;
use servers::server::Server;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};
@@ -210,6 +212,9 @@ impl servers::server::Server for FlownodeServer {
pub struct FlownodeInstance {
server: FlownodeServer,
addr: SocketAddr,
+ /// only used for health check
+ http_server: HttpServer,
+ http_addr: SocketAddr,
heartbeat_task: Option,
}
@@ -224,6 +229,12 @@ impl FlownodeInstance {
.start(self.addr)
.await
.context(StartServerSnafu)?;
+
+ self.http_server
+ .start(self.http_addr)
+ .await
+ .context(StartServerSnafu)?;
+
Ok(())
}
pub async fn shutdown(&self) -> Result<(), crate::Error> {
@@ -233,6 +244,11 @@ impl FlownodeInstance {
task.shutdown();
}
+ self.http_server
+ .shutdown()
+ .await
+ .context(ShutdownServerSnafu)?;
+
Ok(())
}
@@ -305,12 +321,21 @@ impl FlownodeBuilder {
let server = FlownodeServer::new(FlowService::new(manager.clone()));
+ let http_addr = self.opts.http.addr.parse().context(ParseAddrSnafu {
+ addr: self.opts.http.addr.clone(),
+ })?;
+ let http_server = HttpServerBuilder::new(self.opts.http)
+ .with_metrics_handler(MetricsHandler)
+ .build();
+
let heartbeat_task = self.heartbeat_task;
let addr = self.opts.grpc.addr;
let instance = FlownodeInstance {
server,
addr: addr.parse().context(ParseAddrSnafu { addr })?,
+ http_server,
+ http_addr,
heartbeat_task,
};
Ok(instance)
diff --git a/tests/runner/src/env.rs b/tests/runner/src/env.rs
index 7ca65b3a5f56..6c646d1d1875 100644
--- a/tests/runner/src/env.rs
+++ b/tests/runner/src/env.rs
@@ -490,6 +490,7 @@ impl Env {
sqlness_home.display()
));
args.push("--metasrv-addrs=127.0.0.1:29302".to_string());
+ args.push(format!("--http-addr=127.0.0.1:2951{id}"));
(args, format!("127.0.0.1:2968{id}"))
}