diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index a1193cb29f6b..373cc7e8917b 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -149,7 +149,7 @@ pub type FlowWorkerManagerRef = Arc; pub struct FlowWorkerManager { /// The handler to the worker that will run the dataflow /// which is `!Send` so a handle is used - pub worker_handles: Vec>, + pub worker_handles: Vec, /// The selector to select a worker to run the dataflow worker_selector: Mutex, /// The query engine that will be used to parse the query and convert it to a dataflow plan @@ -236,7 +236,7 @@ impl FlowWorkerManager { /// add a worker handler to manager, meaning this corresponding worker is under it's manage pub fn add_worker_handle(&mut self, handle: WorkerHandle) { - self.worker_handles.push(Mutex::new(handle)); + self.worker_handles.push(handle); } } @@ -577,13 +577,16 @@ impl FlowWorkerManager { pub async fn run(&self, mut shutdown: Option>) { debug!("Starting to run"); let default_interval = Duration::from_secs(1); + let mut tick_interval = tokio::time::interval(default_interval); + // burst mode, so that if we miss a tick, we will run immediately to fully utilize the cpu + tick_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Burst); let mut avg_spd = 0; // rows/sec let mut since_last_run = tokio::time::Instant::now(); let run_per_trace = 10; let mut run_cnt = 0; loop { // TODO(discord9): only run when new inputs arrive or scheduled to - let row_cnt = self.run_available(true).await.unwrap_or_else(|err| { + let row_cnt = self.run_available(false).await.unwrap_or_else(|err| { common_telemetry::error!(err;"Run available errors"); 0 }); @@ -613,9 +616,9 @@ impl FlowWorkerManager { // for now we want to batch rows until there is around `BATCH_SIZE` rows in send buf // before trigger a run of flow's worker - // (plus one for prevent div by zero) let wait_for = since_last_run.elapsed(); + // last runs insert speed let cur_spd = row_cnt * 1000 / wait_for.as_millis().max(1) as usize; // rapid increase, slow decay avg_spd = if cur_spd > avg_spd { @@ -638,7 +641,10 @@ impl FlowWorkerManager { METRIC_FLOW_RUN_INTERVAL_MS.set(new_wait.as_millis() as i64); since_last_run = tokio::time::Instant::now(); - tokio::time::sleep(new_wait).await; + tokio::select! { + _ = tick_interval.tick() => (), + _ = tokio::time::sleep(new_wait) => () + } } // flow is now shutdown, drop frontend_invoker early so a ref cycle(in standalone mode) can be prevent: // FlowWorkerManager.frontend_invoker -> FrontendInvoker.inserter @@ -649,9 +655,9 @@ impl FlowWorkerManager { /// Run all available subgraph in the flow node /// This will try to run all dataflow in this node /// - /// set `blocking` to true to wait until lock is acquired - /// and false to return immediately if lock is not acquired - /// return numbers of rows send to worker + /// set `blocking` to true to wait until worker finish running + /// false to just trigger run and return immediately + /// return numbers of rows send to worker(Inaccuary) /// TODO(discord9): add flag for subgraph that have input since last run pub async fn run_available(&self, blocking: bool) -> Result { let mut row_cnt = 0; @@ -659,13 +665,7 @@ impl FlowWorkerManager { let now = self.tick_manager.tick(); for worker in self.worker_handles.iter() { // TODO(discord9): consider how to handle error in individual worker - if blocking { - worker.lock().await.run_available(now, blocking).await?; - } else if let Ok(worker) = worker.try_lock() { - worker.run_available(now, blocking).await?; - } else { - return Ok(row_cnt); - } + worker.run_available(now, blocking).await?; } // check row send and rows remain in send buf let flush_res = if blocking { @@ -736,7 +736,6 @@ impl FlowWorkerManager { /// remove a flow by it's id pub async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> { for handle in self.worker_handles.iter() { - let handle = handle.lock().await; if handle.contains_flow(flow_id).await? { handle.remove_flow(flow_id).await?; break; @@ -873,7 +872,7 @@ impl FlowWorkerManager { .await .insert(flow_id, err_collector.clone()); // TODO(discord9): load balance? - let handle = &self.get_worker_handle_for_create_flow().await; + let handle = self.get_worker_handle_for_create_flow().await; let create_request = worker::Request::Create { flow_id, plan: flow_plan, diff --git a/src/flow/src/adapter/node_context.rs b/src/flow/src/adapter/node_context.rs index d0cc5ffce5bf..7983b396fedc 100644 --- a/src/flow/src/adapter/node_context.rs +++ b/src/flow/src/adapter/node_context.rs @@ -130,7 +130,16 @@ impl SourceSender { // TODO(discord9): send rows instead so it's just moving a point if let Some(batch) = send_buf.recv().await { let len = batch.row_count(); - self.send_buf_row_cnt.fetch_sub(len, Ordering::SeqCst); + if let Err(prev_row_cnt) = + self.send_buf_row_cnt + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| x.checked_sub(len)) + { + common_telemetry::error!( + "send buf row count underflow, prev = {}, len = {}", + prev_row_cnt, + len + ); + } row_cnt += len; self.sender .send(batch) @@ -162,18 +171,21 @@ impl SourceSender { batch_datatypes: &[ConcreteDataType], ) -> Result { METRIC_FLOW_INPUT_BUF_SIZE.add(rows.len() as _); + // important for backpressure. if send buf is full, block until it's not while self.send_buf_row_cnt.load(Ordering::SeqCst) >= BATCH_SIZE * 4 { tokio::task::yield_now().await; } + // row count metrics is approx so relaxed order is ok - self.send_buf_row_cnt - .fetch_add(rows.len(), Ordering::SeqCst); let batch = Batch::try_from_rows_with_types( rows.into_iter().map(|(row, _, _)| row).collect(), batch_datatypes, ) .context(EvalSnafu)?; common_telemetry::trace!("Send one batch to worker with {} rows", batch.row_count()); + + self.send_buf_row_cnt + .fetch_add(batch.row_count(), Ordering::SeqCst); self.send_buf_tx.send(batch).await.map_err(|e| { crate::error::InternalSnafu { reason: format!("Failed to send row, error = {:?}", e), diff --git a/src/flow/src/adapter/stat.rs b/src/flow/src/adapter/stat.rs index c719e35f3ca9..4dd990b7e5f0 100644 --- a/src/flow/src/adapter/stat.rs +++ b/src/flow/src/adapter/stat.rs @@ -22,7 +22,6 @@ impl FlowWorkerManager { pub async fn gen_state_report(&self) -> FlowStat { let mut full_report = BTreeMap::new(); for worker in self.worker_handles.iter() { - let worker = worker.lock().await; match worker.get_state_size().await { Ok(state_size) => { full_report.extend(state_size.into_iter().map(|(k, v)| (k as u32, v))) diff --git a/src/flow/src/adapter/util.rs b/src/flow/src/adapter/util.rs index 26b96f75b99f..6cedb7ac347d 100644 --- a/src/flow/src/adapter/util.rs +++ b/src/flow/src/adapter/util.rs @@ -35,19 +35,18 @@ use crate::FlowWorkerManager; impl FlowWorkerManager { /// Get a worker handle for creating flow, using round robin to select a worker - pub(crate) async fn get_worker_handle_for_create_flow( - &self, - ) -> tokio::sync::MutexGuard { - let mut selector = self.worker_selector.lock().await; - - *selector += 1; - if *selector >= self.worker_handles.len() { - *selector = 0 + pub(crate) async fn get_worker_handle_for_create_flow(&self) -> &WorkerHandle { + let use_idx = { + let mut selector = self.worker_selector.lock().await; + if *selector >= self.worker_handles.len() { + *selector = 0 + }; + let use_idx = *selector; + *selector += 1; + use_idx }; - // Safety: selector is always in bound - let handle = &self.worker_handles[*selector]; - handle.lock().await + &self.worker_handles[use_idx] } /// Create table from given schema(will adjust to add auto column if needed), return true if table is created diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index ee77c46d64c4..7e7434a2c6d7 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -88,6 +88,10 @@ impl flow_server::Flow for FlowService { self.manager .handle(request) .await + .map_err(|err| { + common_telemetry::error!(err; "Failed to handle flow request"); + err + }) .map(Response::new) .map_err(to_status_with_last_err) }