ensure bots cleanly stop before a match completes

This commit is contained in:
Ilion Beyst 2022-09-23 21:34:57 +02:00
parent 8f3621813e
commit a1d81ac774
6 changed files with 41 additions and 14 deletions

View file

@ -6,14 +6,18 @@ use std::sync::Mutex;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, Lines}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, Lines};
use tokio::process; use tokio::process;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio::time::timeout; use tokio::time::timeout;
use super::match_context::EventBus; use super::match_context::EventBus;
use super::match_context::PlayerHandle; use super::match_context::PlayerHandle;
use super::match_context::RequestError; use super::match_context::RequestError;
use super::match_context::RequestMessage; use super::match_context::RequestMessage;
// TODO: this is exactly the same as the docker bot handle.
// should this abstraction be removed?
pub struct LocalBotHandle { pub struct LocalBotHandle {
tx: mpsc::UnboundedSender<RequestMessage>, tx: mpsc::UnboundedSender<RequestMessage>,
join_handle: JoinHandle<()>,
} }
impl PlayerHandle for LocalBotHandle { impl PlayerHandle for LocalBotHandle {
@ -22,6 +26,10 @@ impl PlayerHandle for LocalBotHandle {
.send(r) .send(r)
.expect("failed to send message to local bot"); .expect("failed to send message to local bot");
} }
fn into_join_handle(self: Box<Self>) -> JoinHandle<()> {
self.join_handle
}
} }
pub fn run_local_bot(player_id: u32, event_bus: Arc<Mutex<EventBus>>, bot: Bot) -> LocalBotHandle { pub fn run_local_bot(player_id: u32, event_bus: Arc<Mutex<EventBus>>, bot: Bot) -> LocalBotHandle {
@ -33,9 +41,9 @@ pub fn run_local_bot(player_id: u32, event_bus: Arc<Mutex<EventBus>>, bot: Bot)
player_id, player_id,
bot, bot,
}; };
tokio::spawn(runner.run()); let join_handle = tokio::spawn(runner.run());
LocalBotHandle { tx } LocalBotHandle { tx, join_handle }
} }
pub struct LocalBotRunner { pub struct LocalBotRunner {

View file

@ -9,6 +9,7 @@ use bytes::Bytes;
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
use tokio::io::{AsyncWrite, AsyncWriteExt}; use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio::time::timeout; use tokio::time::timeout;
use crate::match_context::{EventBus, PlayerHandle, RequestError, RequestMessage}; use crate::match_context::{EventBus, PlayerHandle, RequestError, RequestMessage};
@ -42,8 +43,7 @@ impl BotSpec for DockerBotSpec {
match_logger: MatchLogger, match_logger: MatchLogger,
) -> Box<dyn PlayerHandle> { ) -> Box<dyn PlayerHandle> {
let process = spawn_docker_process(self).await.unwrap(); let process = spawn_docker_process(self).await.unwrap();
let (handle, runner) = create_docker_bot(process, player_id, event_bus, match_logger); let handle = run_docker_bot(process, player_id, event_bus, match_logger);
tokio::spawn(runner.run());
return Box::new(handle); return Box::new(handle);
} }
} }
@ -155,14 +155,13 @@ impl ContainerProcess {
} }
} }
fn create_docker_bot( fn run_docker_bot(
process: ContainerProcess, process: ContainerProcess,
player_id: u32, player_id: u32,
event_bus: Arc<Mutex<EventBus>>, event_bus: Arc<Mutex<EventBus>>,
match_logger: MatchLogger, match_logger: MatchLogger,
) -> (DockerBotHandle, DockerBotRunner) { ) -> DockerBotHandle {
let (tx, rx) = mpsc::unbounded_channel(); let (tx, rx) = mpsc::unbounded_channel();
let bot_handle = DockerBotHandle { tx };
let bot_runner = DockerBotRunner { let bot_runner = DockerBotRunner {
process, process,
player_id, player_id,
@ -170,11 +169,15 @@ fn create_docker_bot(
match_logger, match_logger,
rx, rx,
}; };
(bot_handle, bot_runner)
let join_handle = tokio::spawn(bot_runner.run());
DockerBotHandle { tx, join_handle }
} }
pub struct DockerBotHandle { pub struct DockerBotHandle {
tx: mpsc::UnboundedSender<RequestMessage>, tx: mpsc::UnboundedSender<RequestMessage>,
join_handle: JoinHandle<()>,
} }
impl PlayerHandle for DockerBotHandle { impl PlayerHandle for DockerBotHandle {
@ -183,6 +186,10 @@ impl PlayerHandle for DockerBotHandle {
.send(r) .send(r)
.expect("failed to send message to local bot"); .expect("failed to send message to local bot");
} }
fn into_join_handle(self: Box<Self>) -> JoinHandle<()> {
self.join_handle
}
} }
pub struct DockerBotRunner { pub struct DockerBotRunner {

View file

@ -106,11 +106,9 @@ pub async fn run_match(config: MatchConfig) -> MatchOutcome {
// ) // )
// .unwrap(); // .unwrap();
let mut match_state = pw_match::PwMatch::create(match_ctx, pw_config); let final_state = pw_match::PwMatch::create(match_ctx, pw_config).run().await;
match_state.run().await;
let final_state = match_state.match_state.state(); let survivors = final_state.state().living_players();
let survivors = final_state.living_players();
let winner = if survivors.len() == 1 { let winner = if survivors.len() == 1 {
Some(survivors[0]) Some(survivors[0])
} else { } else {

View file

@ -7,6 +7,7 @@ use std::{
collections::HashMap, collections::HashMap,
sync::{Arc, Mutex}, sync::{Arc, Mutex},
}; };
use tokio::task::JoinHandle;
use crate::match_log::{MatchLogMessage, MatchLogger}; use crate::match_log::{MatchLogMessage, MatchLogger};
@ -71,10 +72,19 @@ impl MatchCtx {
pub fn log(&mut self, message: MatchLogMessage) { pub fn log(&mut self, message: MatchLogMessage) {
self.match_logger.send(message).expect("write failed"); self.match_logger.send(message).expect("write failed");
} }
pub async fn shutdown(self) {
let join_handles = self
.players
.into_iter()
.map(|(_player_id, player_data)| player_data.handle.into_join_handle());
futures::future::join_all(join_handles).await;
}
} }
pub trait PlayerHandle: Send { pub trait PlayerHandle: Send {
fn send_request(&mut self, r: RequestMessage); fn send_request(&mut self, r: RequestMessage);
fn into_join_handle(self: Box<Self>) -> JoinHandle<()>;
} }
struct PlayerData { struct PlayerData {

View file

@ -39,7 +39,7 @@ impl PwMatch {
} }
} }
pub async fn run(&mut self) { pub async fn run(mut self) -> PlanetWars {
// log initial state // log initial state
self.log_game_state(); self.log_game_state();
@ -53,6 +53,9 @@ impl PwMatch {
self.match_state.step(); self.match_state.step();
self.log_game_state(); self.log_game_state();
} }
self.match_ctx.shutdown().await;
self.match_state
} }
async fn prompt_players(&mut self) -> Vec<(usize, RequestResult<Vec<u8>>)> { async fn prompt_players(&mut self) -> Vec<(usize, RequestResult<Vec<u8>>)> {

View file

@ -70,5 +70,6 @@ async fn docker_runner_timeout() {
.request(1, b"sup".to_vec(), Duration::from_millis(1000)) .request(1, b"sup".to_vec(), Duration::from_millis(1000))
.await; .await;
assert_eq!(resp, Err(RequestError::Timeout)) assert_eq!(resp, Err(RequestError::Timeout));
ctx.shutdown().await;
} }