Skip to content

Commit

Permalink
fix: make flow worker actually run in parallel (#5384)
Browse files Browse the repository at this point in the history
* fix: make flow worker actually run in parallel

* chore: check for underflow

* fix: del duplicate sub

* fix: print server handle error
  • Loading branch information
discord9 authored Jan 16, 2025
1 parent 1acfb6e commit f8d26b4
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 32 deletions.
33 changes: 16 additions & 17 deletions src/flow/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ pub type FlowWorkerManagerRef = Arc<FlowWorkerManager>;
pub struct FlowWorkerManager {
/// The handler to the worker that will run the dataflow
/// which is `!Send` so a handle is used
pub worker_handles: Vec<Mutex<WorkerHandle>>,
pub worker_handles: Vec<WorkerHandle>,
/// The selector to select a worker to run the dataflow
worker_selector: Mutex<usize>,
/// The query engine that will be used to parse the query and convert it to a dataflow plan
Expand Down Expand Up @@ -236,7 +236,7 @@ impl FlowWorkerManager {

/// add a worker handler to manager, meaning this corresponding worker is under it's manage
pub fn add_worker_handle(&mut self, handle: WorkerHandle) {
self.worker_handles.push(Mutex::new(handle));
self.worker_handles.push(handle);
}
}

Expand Down Expand Up @@ -577,13 +577,16 @@ impl FlowWorkerManager {
pub async fn run(&self, mut shutdown: Option<broadcast::Receiver<()>>) {
debug!("Starting to run");
let default_interval = Duration::from_secs(1);
let mut tick_interval = tokio::time::interval(default_interval);
// burst mode, so that if we miss a tick, we will run immediately to fully utilize the cpu
tick_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Burst);
let mut avg_spd = 0; // rows/sec
let mut since_last_run = tokio::time::Instant::now();
let run_per_trace = 10;
let mut run_cnt = 0;
loop {
// TODO(discord9): only run when new inputs arrive or scheduled to
let row_cnt = self.run_available(true).await.unwrap_or_else(|err| {
let row_cnt = self.run_available(false).await.unwrap_or_else(|err| {
common_telemetry::error!(err;"Run available errors");
0
});
Expand Down Expand Up @@ -613,9 +616,9 @@ impl FlowWorkerManager {

// for now we want to batch rows until there is around `BATCH_SIZE` rows in send buf
// before trigger a run of flow's worker
// (plus one for prevent div by zero)
let wait_for = since_last_run.elapsed();

// last runs insert speed
let cur_spd = row_cnt * 1000 / wait_for.as_millis().max(1) as usize;
// rapid increase, slow decay
avg_spd = if cur_spd > avg_spd {
Expand All @@ -638,7 +641,10 @@ impl FlowWorkerManager {

METRIC_FLOW_RUN_INTERVAL_MS.set(new_wait.as_millis() as i64);
since_last_run = tokio::time::Instant::now();
tokio::time::sleep(new_wait).await;
tokio::select! {
_ = tick_interval.tick() => (),
_ = tokio::time::sleep(new_wait) => ()
}
}
// flow is now shutdown, drop frontend_invoker early so a ref cycle(in standalone mode) can be prevent:
// FlowWorkerManager.frontend_invoker -> FrontendInvoker.inserter
Expand All @@ -649,23 +655,17 @@ impl FlowWorkerManager {
/// Run all available subgraph in the flow node
/// This will try to run all dataflow in this node
///
/// set `blocking` to true to wait until lock is acquired
/// and false to return immediately if lock is not acquired
/// return numbers of rows send to worker
/// set `blocking` to true to wait until worker finish running
/// false to just trigger run and return immediately
/// return numbers of rows send to worker(Inaccuary)
/// TODO(discord9): add flag for subgraph that have input since last run
pub async fn run_available(&self, blocking: bool) -> Result<usize, Error> {
let mut row_cnt = 0;

let now = self.tick_manager.tick();
for worker in self.worker_handles.iter() {
// TODO(discord9): consider how to handle error in individual worker
if blocking {
worker.lock().await.run_available(now, blocking).await?;
} else if let Ok(worker) = worker.try_lock() {
worker.run_available(now, blocking).await?;
} else {
return Ok(row_cnt);
}
worker.run_available(now, blocking).await?;
}
// check row send and rows remain in send buf
let flush_res = if blocking {
Expand Down Expand Up @@ -736,7 +736,6 @@ impl FlowWorkerManager {
/// remove a flow by it's id
pub async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
for handle in self.worker_handles.iter() {
let handle = handle.lock().await;
if handle.contains_flow(flow_id).await? {
handle.remove_flow(flow_id).await?;
break;
Expand Down Expand Up @@ -873,7 +872,7 @@ impl FlowWorkerManager {
.await
.insert(flow_id, err_collector.clone());
// TODO(discord9): load balance?
let handle = &self.get_worker_handle_for_create_flow().await;
let handle = self.get_worker_handle_for_create_flow().await;
let create_request = worker::Request::Create {
flow_id,
plan: flow_plan,
Expand Down
18 changes: 15 additions & 3 deletions src/flow/src/adapter/node_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,16 @@ impl SourceSender {
// TODO(discord9): send rows instead so it's just moving a point
if let Some(batch) = send_buf.recv().await {
let len = batch.row_count();
self.send_buf_row_cnt.fetch_sub(len, Ordering::SeqCst);
if let Err(prev_row_cnt) =
self.send_buf_row_cnt
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| x.checked_sub(len))
{
common_telemetry::error!(
"send buf row count underflow, prev = {}, len = {}",
prev_row_cnt,
len
);
}
row_cnt += len;
self.sender
.send(batch)
Expand Down Expand Up @@ -162,18 +171,21 @@ impl SourceSender {
batch_datatypes: &[ConcreteDataType],
) -> Result<usize, Error> {
METRIC_FLOW_INPUT_BUF_SIZE.add(rows.len() as _);
// important for backpressure. if send buf is full, block until it's not
while self.send_buf_row_cnt.load(Ordering::SeqCst) >= BATCH_SIZE * 4 {
tokio::task::yield_now().await;
}

// row count metrics is approx so relaxed order is ok
self.send_buf_row_cnt
.fetch_add(rows.len(), Ordering::SeqCst);
let batch = Batch::try_from_rows_with_types(
rows.into_iter().map(|(row, _, _)| row).collect(),
batch_datatypes,
)
.context(EvalSnafu)?;
common_telemetry::trace!("Send one batch to worker with {} rows", batch.row_count());

self.send_buf_row_cnt
.fetch_add(batch.row_count(), Ordering::SeqCst);
self.send_buf_tx.send(batch).await.map_err(|e| {
crate::error::InternalSnafu {
reason: format!("Failed to send row, error = {:?}", e),
Expand Down
1 change: 0 additions & 1 deletion src/flow/src/adapter/stat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ impl FlowWorkerManager {
pub async fn gen_state_report(&self) -> FlowStat {
let mut full_report = BTreeMap::new();
for worker in self.worker_handles.iter() {
let worker = worker.lock().await;
match worker.get_state_size().await {
Ok(state_size) => {
full_report.extend(state_size.into_iter().map(|(k, v)| (k as u32, v)))
Expand Down
21 changes: 10 additions & 11 deletions src/flow/src/adapter/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,18 @@ use crate::FlowWorkerManager;

impl FlowWorkerManager {
/// Get a worker handle for creating flow, using round robin to select a worker
pub(crate) async fn get_worker_handle_for_create_flow(
&self,
) -> tokio::sync::MutexGuard<WorkerHandle> {
let mut selector = self.worker_selector.lock().await;

*selector += 1;
if *selector >= self.worker_handles.len() {
*selector = 0
pub(crate) async fn get_worker_handle_for_create_flow(&self) -> &WorkerHandle {
let use_idx = {
let mut selector = self.worker_selector.lock().await;
if *selector >= self.worker_handles.len() {
*selector = 0
};
let use_idx = *selector;
*selector += 1;
use_idx
};

// Safety: selector is always in bound
let handle = &self.worker_handles[*selector];
handle.lock().await
&self.worker_handles[use_idx]
}

/// Create table from given schema(will adjust to add auto column if needed), return true if table is created
Expand Down
4 changes: 4 additions & 0 deletions src/flow/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ impl flow_server::Flow for FlowService {
self.manager
.handle(request)
.await
.map_err(|err| {
common_telemetry::error!(err; "Failed to handle flow request");
err
})
.map(Response::new)
.map_err(to_status_with_last_err)
}
Expand Down

0 comments on commit f8d26b4

Please sign in to comment.