Skip to content

Commit

Permalink
refactor(junowen): delayed_input
Browse files Browse the repository at this point in the history
  • Loading branch information
progre committed Oct 4, 2023
1 parent 63c3502 commit 7542963
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 90 deletions.
17 changes: 7 additions & 10 deletions junowen/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize};
use tokio::{spawn, sync::broadcast};
use tracing::{debug, info};

use self::delayed_inputs::{DelayedInput, DelayedInputs, InternalDelayedInput};
use self::delayed_inputs::{DelayedInputs, InternalDelayedInput};

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct MatchInitial {
Expand Down Expand Up @@ -134,14 +134,11 @@ impl Session {
self.delayed_inputs.recv_init_round()
}

pub fn enqueue_input(&mut self, input: u8, delay: Option<u8>) {
self.delayed_inputs
.enqueue_input(DelayedInput::Input(input), delay);
}

pub fn dequeue_inputs(&mut self) -> Result<(u8, u8), RecvError> {
let (p1, p2) = self.delayed_inputs.dequeue_inputs()?;
let DelayedInput::Input(p1) = p1;
Ok((p1, p2))
pub fn enqueue_input_and_dequeue(
&mut self,
input: u8,
delay: Option<u8>,
) -> Result<(u8, u8), RecvError> {
self.delayed_inputs.enqueue_input_and_dequeue(input, delay)
}
}
126 changes: 53 additions & 73 deletions junowen/src/session/delayed_inputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,10 @@ use std::{

use anyhow::Result;
use serde::{Deserialize, Serialize};
use tracing::{debug, trace, warn};
use tracing::{debug, trace};

use super::{MatchInitial, RoundInitial};

pub enum DelayedInput {
Input(u8),
}

impl From<DelayedInput> for InternalDelayedInput {
fn from(value: DelayedInput) -> Self {
match value {
DelayedInput::Input(input) => InternalDelayedInput::Input(input),
}
}
}

/** input 以外はホストのみ発行できる */
#[derive(Debug, Deserialize, Serialize)]
pub enum InternalDelayedInput {
Expand Down Expand Up @@ -88,64 +76,59 @@ impl DelayedInputs {
}

pub fn recv_init_round(&mut self) -> Result<Option<RoundInitial>, RecvError> {
while self.dequeue_local().is_some() {
let mut local_delay = None;
loop {
let Some((_, delay)) = self.dequeue_local() else {
break;
};
if let Some(delay) = delay {
debug_assert!(self.host);
local_delay = Some(delay);
}
trace!("local input skipped");
}
if let Some(round_initial) = self.remote_round_initial.take() {
return Ok(round_initial);
}
for _ in 0..10 {
let msg = self.remote_receiver.recv()?;
match msg {
InternalDelayedInput::Delay(delay) => {
self.update_delay(delay);
continue;
}
InternalDelayedInput::Input(_) => {
trace!("remote input skipped");
continue;
}
InternalDelayedInput::InitMatch(_) => panic!("unexpected message: {:?}", msg),
InternalDelayedInput::InitRound(round_initial) => {
trace!("delay gap updated: {}", self.delay_gap());
return Ok(round_initial);
}
let mut remote_delay = None;
let round_initial = loop {
if let Some(round_initial) = self.remote_round_initial.take() {
break round_initial;
}
}
panic!("desync");
}

pub fn enqueue_input(&mut self, input: DelayedInput, delay: Option<u8>) {
if self.delay_gap() > 0 {
// self.delay_gap() -= 1;
trace!("delay gap updated: {}", self.delay_gap());
return;
}
let (_, delay) = self.dequeue_remote()?;
if let Some(delay) = delay {
debug_assert!(self.host);
remote_delay = Some(delay);
}
trace!("remote input skipped");
};
let delay = if self.host { local_delay } else { remote_delay };
if let Some(delay) = delay {
let _ = self.remote_sender.send(InternalDelayedInput::Delay(delay));
self.local.push_back(InternalDelayedInput::Delay(delay));
self.update_delay(delay);
}
let DelayedInput::Input(input_u8) = input;
let _ = self
.remote_sender
.send(InternalDelayedInput::Input(input_u8));
self.local.push_back(input.into());
Ok(round_initial)
}

pub fn dequeue_inputs(&mut self) -> Result<(DelayedInput, u8), RecvError> {
// TODO: enqueue したりしなかったりする分でギャップがずれる
if self.delay_gap() < 0 {
// self.delay_gap() += 1;
pub fn enqueue_input_and_dequeue(
&mut self,
input: u8,
delay: Option<u8>,
) -> Result<(u8, u8), RecvError> {
let delay_gap = self.delay_gap();
if delay_gap <= 0 {
if let Some(delay) = delay {
let _ = self.remote_sender.send(InternalDelayedInput::Delay(delay));
self.local.push_back(InternalDelayedInput::Delay(delay));
}
let _ = self.remote_sender.send(InternalDelayedInput::Input(input));
self.local.push_back(InternalDelayedInput::Input(input));
}
if delay_gap < 0 {
trace!("delay gap updated: {}", self.delay_gap());
return Ok((DelayedInput::Input(0), 0));
return Ok((0, 0));
}
let (local, local_delay) = self.dequeue_local().unwrap();
let (remote, remote_delay) = self.dequeue_remote()?;
let (p1, p2, delay) = if self.host {
let (local, local_delay) = self.dequeue_local().unwrap();
let (DelayedInput::Input(remote), _remote_delay) = self.dequeue_remote()?;
(local, remote, local_delay)
} else {
let (remote, remote_delay) = self.dequeue_remote()?;
let (DelayedInput::Input(local), _local_delay) = self.dequeue_local().unwrap();
(remote, local, remote_delay)
};
if let Some(delay) = delay {
Expand All @@ -160,46 +143,43 @@ impl DelayedInputs {
debug!("delay gap={}", self.delay_gap());
}

fn dequeue_local(&mut self) -> Option<(DelayedInput, Option<u8>)> {
fn dequeue_local(&mut self) -> Option<(u8, Option<u8>)> {
let mut delay = None;
loop {
let local = self.local.pop_front()?;
debug_assert!(matches!(local, InternalDelayedInput::Input(_)) || self.host);
match local {
InternalDelayedInput::InitMatch(_) => panic!("unexpected message: {:?}", local),
InternalDelayedInput::Delay(d) => {
debug_assert!(self.host);
delay = Some(d);
continue;
}
InternalDelayedInput::Input(input) => {
return Some((DelayedInput::Input(input), delay))
}
InternalDelayedInput::InitMatch(_) | InternalDelayedInput::InitRound(_) => panic!(),
InternalDelayedInput::Input(input) => return Some((input, delay)),
InternalDelayedInput::InitRound(_) => panic!(),
}
}
}

fn dequeue_remote(&mut self) -> Result<(DelayedInput, Option<u8>), RecvError> {
fn dequeue_remote(&mut self) -> Result<(u8, Option<u8>), RecvError> {
if self.remote_round_initial.is_some() {
return Ok((DelayedInput::Input(0), None));
return Ok((0, None));
}
let mut delay = None;
loop {
let remote = self.remote_receiver.recv()?;
match remote {
InternalDelayedInput::InitMatch(_) => panic!("unexpected message: {:?}", remote),
InternalDelayedInput::Delay(d) => {
debug_assert!(!self.host);
delay = Some(d);
continue;
}
InternalDelayedInput::Input(input) => {
return Ok((DelayedInput::Input(input), delay))
}
InternalDelayedInput::InitMatch(_) => {
warn!("MAYBE DESYNC: {:?}", remote)
}
InternalDelayedInput::Input(input) => return Ok((input, delay)),
InternalDelayedInput::InitRound(round_initial) => {
debug_assert!(self.remote_round_initial.is_none());
self.remote_round_initial = Some(round_initial);
return Ok((DelayedInput::Input(0), None));
return Ok((0, None));
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions junowen/src/state/game.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ pub fn on_input_players(session: &mut Session, th19: &mut Th19) -> Result<(), Re
input.set_p1_input(Input(0));
input.set_p2_input(Input(0));
} else {
session.enqueue_input(th19.input().p1_input().0 as u8, None);
let (p1, p2) = session.dequeue_inputs()?;
let (p1, p2) = session.enqueue_input_and_dequeue(th19.input().p1_input().0 as u8, None)?;
let input = th19.input_mut();
input.set_p1_input(Input(p1 as u32));
input.set_p2_input(Input(p2 as u32));
Expand Down
8 changes: 3 additions & 5 deletions junowen/src/state/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ pub fn on_input_players(
let raw_keys = &th19.input().input_device_array[0].raw_keys;
(0..=9).find(|i| raw_keys[(b'0' + i) as usize] & 0x80 != 0)
};
session.enqueue_input(th19.input().p1_input().0 as u8, delay);
let (p1, p2) = session.dequeue_inputs()?;
let (p1, p2) = session.enqueue_input_and_dequeue(th19.input().p1_input().0 as u8, delay)?;
let input = th19.input_mut();
input.set_p1_input(Input(p1 as u32));
input.set_p2_input(Input(p2 as u32));
Expand All @@ -76,15 +75,14 @@ pub fn on_input_menu(session: &mut Session, th19: &mut Th19) -> Result<(), RecvE
return Ok(());
}

session.enqueue_input(
let (p1, _p2) = session.enqueue_input_and_dequeue(
if session.host() {
th19.menu_input().0 as u8
} else {
Input::NULL as u8
},
None,
);
let (p1, _p2) = session.dequeue_inputs()?;
)?;
*th19.menu_input_mut() = Input(p1 as u32);
Ok(())
}
Expand Down

0 comments on commit 7542963

Please sign in to comment.