bot api: handle timeouts and disconnects
This commit is contained in:
parent
2f915af919
commit
69421d7b25
1 changed files with 55 additions and 8 deletions
|
@ -6,8 +6,9 @@ use std::collections::HashMap;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
use runner::match_context::{EventBus, PlayerHandle, RequestMessage};
|
use runner::match_context::{EventBus, PlayerHandle, RequestError, RequestMessage};
|
||||||
use runner::match_log::MatchLogger;
|
use runner::match_log::MatchLogger;
|
||||||
use tokio::sync::{mpsc, oneshot};
|
use tokio::sync::{mpsc, oneshot};
|
||||||
use tokio_stream::wrappers::UnboundedReceiverStream;
|
use tokio_stream::wrappers::UnboundedReceiverStream;
|
||||||
|
@ -114,10 +115,16 @@ impl runner::BotSpec for RemoteBotSpec {
|
||||||
);
|
);
|
||||||
|
|
||||||
let client_messages = rx.await.unwrap();
|
let client_messages = rx.await.unwrap();
|
||||||
tokio::spawn(handle_bot_messages(player_id, event_bus, client_messages));
|
tokio::spawn(handle_bot_messages(
|
||||||
|
player_id,
|
||||||
|
event_bus.clone(),
|
||||||
|
client_messages,
|
||||||
|
));
|
||||||
|
|
||||||
Box::new(RemoteBotHandle {
|
Box::new(RemoteBotHandle {
|
||||||
sender: server_msg_snd,
|
sender: server_msg_snd,
|
||||||
|
player_id,
|
||||||
|
event_bus,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -138,19 +145,59 @@ async fn handle_bot_messages(
|
||||||
|
|
||||||
struct RemoteBotHandle {
|
struct RemoteBotHandle {
|
||||||
sender: mpsc::UnboundedSender<Result<pb::PlayerRequest, Status>>,
|
sender: mpsc::UnboundedSender<Result<pb::PlayerRequest, Status>>,
|
||||||
|
player_id: u32,
|
||||||
|
event_bus: Arc<Mutex<EventBus>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PlayerHandle for RemoteBotHandle {
|
impl PlayerHandle for RemoteBotHandle {
|
||||||
fn send_request(&mut self, r: RequestMessage) {
|
fn send_request(&mut self, r: RequestMessage) {
|
||||||
self.sender
|
let res = self.sender.send(Ok(pb::PlayerRequest {
|
||||||
.send(Ok(pb::PlayerRequest {
|
request_id: r.request_id as i32,
|
||||||
request_id: r.request_id as i32,
|
content: r.content,
|
||||||
content: r.content,
|
}));
|
||||||
}))
|
match res {
|
||||||
.unwrap();
|
Ok(()) => {
|
||||||
|
// schedule a timeout. See comments at method implementation
|
||||||
|
tokio::spawn(schedule_timeout(
|
||||||
|
(self.player_id, r.request_id),
|
||||||
|
r.timeout,
|
||||||
|
self.event_bus.clone(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
Err(_send_error) => {
|
||||||
|
// cannot contact the remote bot anymore;
|
||||||
|
// directly mark all requests as timed out.
|
||||||
|
// TODO: create a dedicated error type for this.
|
||||||
|
// should it be logged?
|
||||||
|
self.event_bus
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.resolve_request((self.player_id, r.request_id), Err(RequestError::Timeout));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: this will spawn a task for every request, which might not be ideal.
|
||||||
|
// Some alternatives:
|
||||||
|
// - create a single task that manages all time-outs.
|
||||||
|
// - intersperse timeouts with incoming client messages
|
||||||
|
// - push timeouts upwards, into the matchrunner logic (before we hit the playerhandle).
|
||||||
|
// This was initially not done to allow timer start to be delayed until the message actually arrived
|
||||||
|
// with the player. Is this still needed, or is there a different way to do this?
|
||||||
|
//
|
||||||
|
async fn schedule_timeout(
|
||||||
|
request_id: (u32, u32),
|
||||||
|
duration: Duration,
|
||||||
|
event_bus: Arc<Mutex<EventBus>>,
|
||||||
|
) {
|
||||||
|
tokio::time::sleep(duration).await;
|
||||||
|
event_bus
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.resolve_request(request_id, Err(RequestError::Timeout));
|
||||||
|
}
|
||||||
|
|
||||||
async fn run_match(router: PlayerRouter, pool: ConnectionPool) {
|
async fn run_match(router: PlayerRouter, pool: ConnectionPool) {
|
||||||
let conn = pool.get().await.unwrap();
|
let conn = pool.get().await.unwrap();
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue