Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): API and Event processing improvements #1152

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

seriousben
Copy link
Member

@seriousben seriousben commented Jan 9, 2025

Context

We've seen in:

That API calls that can mutate the state while jobs are happening (task creation, allocation, replay) result in a multitude of edge cases.

What

We decided to take a no-locking approach to fix the root cause which requires a significant overhaul of the contract between API, state machines and tasks/worker.

This is as opposed to the alternative which would require leveraging rocksdb get_for_update locks.

The benefits of the chosen approach are:

  • No contentions due to locks
  • No risks of deadlocks because of locks
  • Enables copy-on-write without needing retries.
  • Enables batching state updates.
  • Prevents difficult to debug bugs by taking an approach that forces getting rid of concurrency edge cases.

The downsides of this approach are:

  • Increased complexity of coordinating between different async tasks.
  • Until multiple namespace processors are supported, environments with high number of namespaces will have a higher latency compared to API calling the state machine directly.

Changes:

  • New concepts:
    • Processors which serializes request and events processing by domain. Right now we have two processors: namespace and task allocator. The tast allocator does task allocation and executor management. The namespace processor does the rest.
    • Dispatcher: which dispatches requests and state changes to the right processor.
  • The API calls mutating the state get their request processed by a processor before getting to the state machine.
  • State machine state changes are also processed by the same processor as the requests guaranteeing that operations/mutations for the same entities is done serially.
  • Processors moved within the new processor crate.
  • All integration tests moved in server/src.
  • All integration tests leverage the server Service struct
  • Processors are build on top of an abstraction so that they all leverage channels for requests and state change events the same way.
  • New metrics:
    • requests_inflight number of requests in flight. (Per processor)
    • requests_queue_duration_seconds time spent waiting for a processor in seconds. (per processor)
    • processor_duration_seconds Processors latencies in seconds. (per processor)

Diagram

sequenceDiagram
    participant API
    participant Dispatcher
    participant Processor
    participant StateMachine

    Note right of Dispatcher: API Calls Processing
    
    API->>+Dispatcher: dispatch_request()
    Dispatcher-->>+Processor: trigger processor<br/>for request (async)
    
    loop for each requests
        Processor->>Processor: process request
        Processor->>StateMachine: write request
        Processor->>-Dispatcher: request handled
        Dispatcher-->>API: 
    end

    Note right of Dispatcher: State Change Processing

    StateMachine->>Dispatcher: dispatch_event(StateChange)
    Dispatcher-->>+Processor: trigger processor(s) for StateChange.change_type (async)
    
    loop for each state_changes
        Processor->>StateMachine: get unprocessed state changes
        Processor->>Processor: process state change
    end
    Processor->>-StateMachine: batch write processor_update<br/>for state changes
Loading

Metrics example:

Details

HELP requests_inflight number of requests in flight

TYPE requests_inflight gauge

requests_inflight{processor="Namespace",otel_scope_name="dispatcher_metrics"} 292
requests_inflight{processor="TaskAllocator",otel_scope_name="dispatcher_metrics"} 10

HELP requests_queue_duration_seconds time spent waiting for a processor in seconds

TYPE requests_queue_duration_seconds histogram

requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="0.001"} 132
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="0.005"} 146
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="0.01"} 146
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="0.05"} 146
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="0.1"} 146
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="0.5"} 146
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="1"} 146
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="5"} 146
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="10"} 146
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="25"} 146
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="50"} 146
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="75"} 146
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="100"} 146
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="250"} 146
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="500"} 146
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="750"} 146
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="1000"} 146
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="2500"} 146
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="5000"} 146
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="7500"} 146
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="10000"} 146
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="+Inf"} 146
requests_queue_duration_seconds_sum{processor="Namespace",otel_scope_name="dispatcher_metrics"} 0.035963831999999994
requests_queue_duration_seconds_count{processor="Namespace",otel_scope_name="dispatcher_metrics"} 146

Future:

  • High Priority: Deletion of compute graphs and invocations still have edge cases, we need to leverage processors and events to address those.
  • High Priority: Namespace processor should do a single writes for all state changes. Doing so will require keeping track of state to write in order to not create many reduction tasks at once.
  • Medium Priority: Replay processing should be merged within the namespace processor in order to properly serialize namespace operations.
  • Low Priority: GC should become a type of system task managed by the system task processor.
  • Low Priority: To fully take advantage of these processors, we should fetch all the state needed for a processor on first run and update it based on operations instead of reading from storage multiple times during a single run.
  • Low Priority: In order to prevent one namespace from impacting other namespaces, we should start one namespace processor per namespace especially with replay possibly swamping the namespace processor.

Testing

Contribution Checklist

  • If a Python package was changed, please run make fmt in the package directory.
  • If the server was changed, please run make fmt in server/.
  • Make sure all PR Checks are passing.

Copy link
Collaborator

@diptanu diptanu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eyeballed the PR and left some comments Going to come back to it.

@@ -1004,6 +1001,40 @@ impl Display for StateChangeId {
}
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ProcessorId {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pub struct ProcessorId(ProcessorType) would do the job

@@ -1012,13 +1043,175 @@ pub struct StateChange {
pub created_at: u64,
pub processed_at: Option<u64>,
}
impl StateChange {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are changing the key prefix for statechanges, lets add namespaces to the keys, so that we can scan by namespaces. This would make it easy to not process state changes serially from all namespaces. Global state changes such as cluster topology changes could have global as a prefix, and make sure global is never used as namespace name.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great idea actually, also makes it possible to migrate to processor per namespace very easily.


#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Namespace {
pub name: String,
pub created_at: u64,
}

pub struct StateMachineUpdateRequest {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest moving these types into a requests.rs file in the interst of keeping this file smaller. Long files are hard for editors to handle.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, let me see if they can go back in state_store based on create dependencies.

let scheduler_invocations = meter
.f64_histogram("scheduler_invocations")
.with_description("Scheduler invocation latencies in seconds")
let processors_duration = meter
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not measure both processor duration and scheduler invocation duration?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are but using a label on the metrics.
So the same metric name is used for all processors.

#[derive(Debug, Clone)]
struct ProcessorHandle {
state_change_tx: tokio::sync::watch::Sender<()>,
request_tx: tokio::sync::mpsc::UnboundedSender<DispatchedRequest>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is probably not a good idea. Keeping this to even 5k might be better than making this unbounded. We could keep this in the first iteration of this PR though if it makes things easy.

Copy link
Member Author

@seriousben seriousben Jan 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll change this as part of splitting up of having a processor per namespace, currently this behaves a lot like unbounded API requests coming in. Except taking more memory.

Edit: I'll change this in this PR.

server/processor/src/dispatcher.rs Show resolved Hide resolved
server/processor/src/dispatcher.rs Outdated Show resolved Hide resolved

impl StateChangeDispatcher for Dispatcher {
fn dispatch_event(&self, changes: Vec<StateChange>) -> Result<()> {
changes
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is happening here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am adding comments and tweaking it a bit.

    /// Dispatches state changes to the appropriate processors based on the change types.
    fn dispatch_state_change(&self, changes: Vec<StateChange>) -> Result<()> {
        changes
            .iter()
            // get all processor ids that are subscribed to the state change
            .flat_map(|sc| self.subscribed_processor_ids(sc.clone()))
            // dedupe the processor ids
            .unique()
            // dispatch the state change to each processor
            .try_for_each(|processor_id| -> Result<()> {
                let handle = match self.processors.get(&processor_id) {
                    Some(tx) => Ok(tx.value().clone()),
                    None => Err(anyhow::anyhow!("processor not found: {:?}", processor_id)),
                }?;
                handle.state_change_tx.send(())?;
                Ok(())
            })?;

        Ok(())
    }

@seriousben
Copy link
Member Author

New change.

  • New metrics:
    • requests_inflight number of requests in flight. (Per processor)
    • requests_queue_duration_seconds time spent waiting for a processor in seconds. (per processor)
    • processor_duration_seconds Processors latencies in seconds. (per processor)

@seriousben seriousben force-pushed the seriousben/processor-refactoring branch 2 times, most recently from edf60e2 to 7148a3f Compare January 9, 2025 21:09
@seriousben seriousben force-pushed the seriousben/processor-refactoring branch from 7148a3f to 1332cd9 Compare January 9, 2025 21:10
@seriousben seriousben force-pushed the seriousben/processor-refactoring branch from 1332cd9 to f6c580d Compare January 9, 2025 21:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants