From 54b9694f0d0d7e853592317d60ad262ae8c13568 Mon Sep 17 00:00:00 2001 From: Ilion Beyst Date: Wed, 23 Feb 2022 21:08:56 +0100 Subject: [PATCH] implement matchlogger --- planetwars-matchrunner/Cargo.toml | 4 +- planetwars-matchrunner/src/docker_runner.rs | 118 ++++++++++++-------- planetwars-matchrunner/src/lib.rs | 57 ++++++---- planetwars-matchrunner/src/match_context.rs | 17 ++- planetwars-matchrunner/src/match_log.rs | 45 ++++++++ planetwars-matchrunner/src/pw_match.rs | 10 +- 6 files changed, 162 insertions(+), 89 deletions(-) create mode 100644 planetwars-matchrunner/src/match_log.rs diff --git a/planetwars-matchrunner/Cargo.toml b/planetwars-matchrunner/Cargo.toml index b041d61..f69e7ff 100644 --- a/planetwars-matchrunner/Cargo.toml +++ b/planetwars-matchrunner/Cargo.toml @@ -18,6 +18,6 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" planetwars-rules = { path = "../planetwars-rules" } chrono = { version = "0.4", features = ["serde"] } -bollard = { git = "https://github.com/antoinert/bollard" } +bollard = { git = "https://github.com/fussybeaver/bollard", rev = "c5d87a4934c70a04f9c649fedb241dbd4943c927" } bytes = "1.1" -async-trait = "0.1" \ No newline at end of file +async-trait = "0.1" diff --git a/planetwars-matchrunner/src/docker_runner.rs b/planetwars-matchrunner/src/docker_runner.rs index 5900d92..d563d60 100644 --- a/planetwars-matchrunner/src/docker_runner.rs +++ b/planetwars-matchrunner/src/docker_runner.rs @@ -13,6 +13,7 @@ use tokio::sync::mpsc; use tokio::time::timeout; use crate::match_context::{EventBus, PlayerHandle, RequestError, RequestMessage}; +use crate::match_log::{MatchLogMessage, MatchLogger, StdErrMessage}; use crate::BotSpec; #[derive(Clone, Debug)] @@ -28,10 +29,11 @@ impl BotSpec for DockerBotSpec { &self, player_id: u32, event_bus: Arc>, + match_logger: MatchLogger, ) -> Box { - let (handle, runner) = create_docker_bot(player_id, event_bus); let process = spawn_docker_process(self).await.unwrap(); - tokio::spawn(runner.run(process)); + let (handle, runner) = create_docker_bot(process, player_id, event_bus, match_logger); + tokio::spawn(runner.run()); return Box::new(handle); } } @@ -75,7 +77,8 @@ async fn spawn_docker_process( stderr: Some(true), stdin: Some(true), stream: Some(true), - logs: Some(true), + // setting this to true causes duplicate error output. Why? + logs: Some(false), ..Default::default() }), ) @@ -87,56 +90,24 @@ async fn spawn_docker_process( }) } -pub struct ContainerProcess { +struct ContainerProcess { stdin: Pin>, output: Pin> + Send>>, } -impl ContainerProcess { - pub async fn communicate(&mut self, input: &[u8]) -> io::Result { - self.write_line(input).await?; - self.read_line().await - } - - async fn write_line(&mut self, bytes: &[u8]) -> io::Result<()> { - self.stdin.write_all(bytes).await?; - self.stdin.write_u8(b'\n').await?; - self.stdin.flush().await?; - Ok(()) - } - - async fn read_line(&mut self) -> io::Result { - while let Some(item) = self.output.next().await { - let log_output = item.expect("failed to get log output"); - match log_output { - LogOutput::StdOut { message } => { - // TODO: this is not correct (buffering and such) - return Ok(message); - } - LogOutput::StdErr { message } => { - // TODO - println!("{}", String::from_utf8_lossy(&message)); - } - _ => (), - } - } - - Err(io::Error::new( - io::ErrorKind::UnexpectedEof, - "no response received", - )) - } -} - fn create_docker_bot( + process: ContainerProcess, player_id: u32, event_bus: Arc>, + match_logger: MatchLogger, ) -> (DockerBotHandle, DockerBotRunner) { let (tx, rx) = mpsc::unbounded_channel(); let bot_handle = DockerBotHandle { tx }; let bot_runner = DockerBotRunner { + process, player_id, event_bus, + match_logger, rx, }; (bot_handle, bot_runner) @@ -155,21 +126,22 @@ impl PlayerHandle for DockerBotHandle { } pub struct DockerBotRunner { + process: ContainerProcess, event_bus: Arc>, rx: mpsc::UnboundedReceiver, + match_logger: MatchLogger, player_id: u32, } impl DockerBotRunner { - pub async fn run(mut self, mut process: ContainerProcess) { + pub async fn run(mut self) { while let Some(request) = self.rx.recv().await { - let resp_fut = process.communicate(&request.content); - let result = timeout(request.timeout, resp_fut) - .await - // TODO: how can this failure be handled cleanly? - .expect("process read failed"); - let result = match result { - Ok(line) => Ok(line.to_vec()), + let resp_fut = self.communicate(&request.content); + let result = timeout(request.timeout, resp_fut).await; + let request_response = match result { + Ok(Ok(response)) => Ok(response.to_vec()), + // this one happens when a bot output stream ends, map this to Timeout for now + Ok(Err(_read_error)) => Err(RequestError::Timeout), Err(_elapsed) => Err(RequestError::Timeout), }; let request_id = (self.player_id, request.request_id); @@ -177,7 +149,55 @@ impl DockerBotRunner { self.event_bus .lock() .unwrap() - .resolve_request(request_id, result); + .resolve_request(request_id, request_response); } } + + pub async fn communicate(&mut self, input: &[u8]) -> io::Result { + self.write_line(input).await?; + self.read_line().await + } + + async fn write_line(&mut self, bytes: &[u8]) -> io::Result<()> { + self.process.stdin.write_all(bytes).await?; + self.process.stdin.write_u8(b'\n').await?; + self.process.stdin.flush().await?; + Ok(()) + } + + async fn read_line(&mut self) -> io::Result { + while let Some(item) = self.process.output.next().await { + let log_output = item.expect("failed to get log output"); + match log_output { + LogOutput::StdOut { message } => { + // TODO: this is not correct (buffering and such) + return Ok(message); + } + LogOutput::StdErr { mut message } => { + // TODO + if message.ends_with(b"\n") { + message.truncate(message.len() - 1); + } + for line in message.split(|c| *c == b'\n') { + let message = StdErrMessage { + player_id: self.player_id, + message: String::from_utf8_lossy(line).to_string(), + }; + self.match_logger + .send(MatchLogMessage::StdErr(message)) + .unwrap(); + } + } + _ => (), + } + } + + // at this point the stream has ended + // does this mean the container has exited? + + Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "no response received", + )) + } } diff --git a/planetwars-matchrunner/src/lib.rs b/planetwars-matchrunner/src/lib.rs index 170ac1e..0be0b3d 100644 --- a/planetwars-matchrunner/src/lib.rs +++ b/planetwars-matchrunner/src/lib.rs @@ -1,17 +1,18 @@ pub mod bot_runner; pub mod docker_runner; pub mod match_context; +pub mod match_log; pub mod pw_match; use std::{ - io::Write, path::PathBuf, sync::{Arc, Mutex}, }; use async_trait::async_trait; -use futures::{stream::FuturesOrdered, FutureExt, StreamExt}; +use futures::{stream::FuturesOrdered, StreamExt}; use match_context::MatchCtx; +use match_log::{create_log_sink, MatchLogger}; use planetwars_rules::PwConfig; use serde::{Deserialize, Serialize}; @@ -47,6 +48,7 @@ pub trait BotSpec: Send + Sync { &self, player_id: u32, event_bus: Arc>, + match_logger: MatchLogger, ) -> Box; } @@ -57,6 +59,7 @@ pub async fn run_match(config: MatchConfig) { }; let event_bus = Arc::new(Mutex::new(EventBus::new())); + let match_logger = create_log_sink(&config.log_path).await; // start bots // TODO: what happens when a bot fails? @@ -66,34 +69,39 @@ pub async fn run_match(config: MatchConfig) { .enumerate() .map(|(player_id, player)| { let player_id = (player_id + 1) as u32; - start_bot(player_id, event_bus.clone(), &player.bot_spec) + start_bot( + player_id, + event_bus.clone(), + &player.bot_spec, + match_logger.clone(), + ) }) .collect::>() // await all results .collect() .await; - let mut log_file = std::fs::File::create(config.log_path).expect("could not create log file"); + let match_ctx = MatchCtx::new(event_bus, players, match_logger); + + // TODO: is this still needed? // assemble the math meta struct - let match_meta = MatchMeta { - map_name: config.map_name.clone(), - timestamp: chrono::Local::now(), - players: config - .players - .iter() - .map(|bot| PlayerInfo { - name: bot.name.clone(), - }) - .collect(), - }; - write!( - log_file, - "{}\n", - serde_json::to_string(&match_meta).unwrap() - ) - .unwrap(); - - let match_ctx = MatchCtx::new(event_bus, players, log_file); + // let match_meta = MatchMeta { + // map_name: config.map_name.clone(), + // timestamp: chrono::Local::now(), + // players: config + // .players + // .iter() + // .map(|bot| PlayerInfo { + // name: bot.name.clone(), + // }) + // .collect(), + // }; + // write!( + // log_file, + // "{}\n", + // serde_json::to_string(&match_meta).unwrap() + // ) + // .unwrap(); let match_state = pw_match::PwMatch::create(match_ctx, pw_config); match_state.run().await; @@ -104,7 +112,8 @@ async fn start_bot( player_id: u32, event_bus: Arc>, bot_spec: &Box, + match_logger: MatchLogger, ) -> (u32, Box) { - let player_handle = bot_spec.run_bot(player_id, event_bus).await; + let player_handle = bot_spec.run_bot(player_id, event_bus, match_logger).await; (player_id, player_handle) } diff --git a/planetwars-matchrunner/src/match_context.rs b/planetwars-matchrunner/src/match_context.rs index 8161ed9..6ea60c3 100644 --- a/planetwars-matchrunner/src/match_context.rs +++ b/planetwars-matchrunner/src/match_context.rs @@ -1,8 +1,6 @@ use futures::task::{Context, Poll}; use futures::{future::Future, task::AtomicWaker}; use serde::{Deserialize, Serialize}; -use std::fs::File; -use std::io::Write; use std::pin::Pin; use std::time::Duration; use std::{ @@ -10,6 +8,8 @@ use std::{ sync::{Arc, Mutex}, }; +use crate::match_log::{MatchLogMessage, MatchLogger}; + #[derive(Serialize, Deserialize, Debug)] pub struct RequestMessage { pub request_id: u32, @@ -20,16 +20,14 @@ pub struct RequestMessage { pub struct MatchCtx { event_bus: Arc>, players: HashMap, - // output: MsgStreamHandle, - log_sink: File, + match_logger: MatchLogger, } impl MatchCtx { pub fn new( event_bus: Arc>, players: HashMap>, - log_file: File, - // log: MsgStreamHandle, + match_logger: MatchLogger, ) -> Self { MatchCtx { event_bus, @@ -43,7 +41,7 @@ impl MatchCtx { (id, player_handle) }) .collect(), - log_sink: log_file, + match_logger, } } @@ -70,9 +68,8 @@ impl MatchCtx { self.players.keys().cloned().collect() } - // this method should be used to emit log states etc. - pub fn log_string(&mut self, message: String) { - write!(self.log_sink, "{}\n", message).expect("failed to write to log file"); + pub fn log(&mut self, message: MatchLogMessage) { + self.match_logger.send(message).expect("write failed"); } } diff --git a/planetwars-matchrunner/src/match_log.rs b/planetwars-matchrunner/src/match_log.rs new file mode 100644 index 0000000..9991f99 --- /dev/null +++ b/planetwars-matchrunner/src/match_log.rs @@ -0,0 +1,45 @@ +use std::path::Path; + +use serde::{Deserialize, Serialize}; +use tokio::{fs::File, io::AsyncWriteExt}; + +use planetwars_rules::protocol::State; +use tokio::sync::mpsc; + +#[derive(Serialize, Deserialize, Debug)] +#[serde(tag = "type")] +pub enum MatchLogMessage { + #[serde(rename = "gamestate")] + GameState(State), + #[serde(rename = "stderr")] + StdErr(StdErrMessage), +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct StdErrMessage { + pub player_id: u32, + pub message: String, +} + +pub type MatchLogger = mpsc::UnboundedSender; + +pub async fn create_log_sink(log_file_path: &Path) -> MatchLogger { + let (tx, rx) = mpsc::unbounded_channel(); + let log_file = File::create(log_file_path) + .await + .expect("Could not create log file"); + tokio::spawn(run_log_sink(rx, log_file)); + return tx; +} + +async fn run_log_sink(mut rx: mpsc::UnboundedReceiver, mut file: File) { + while let Some(message) = rx.recv().await { + let json = serde_json::to_string(&message).expect("failed to serialize message"); + file.write_all(json.as_bytes()) + .await + .expect("failed to write log message to file"); + file.write_all(b"\n") + .await + .expect("failed to write newline log message to file"); + } +} diff --git a/planetwars-matchrunner/src/pw_match.rs b/planetwars-matchrunner/src/pw_match.rs index 25f849e..c114d78 100644 --- a/planetwars-matchrunner/src/pw_match.rs +++ b/planetwars-matchrunner/src/pw_match.rs @@ -1,3 +1,5 @@ +use crate::match_log::MatchLogMessage; + use super::match_context::{MatchCtx, RequestResult}; use futures::stream::futures_unordered::FuturesUnordered; use futures::{FutureExt, StreamExt}; @@ -44,16 +46,16 @@ impl PwMatch { for (player_id, turn) in player_messages { let res = self.execute_action(player_id, turn); if let Some(err) = action_errors(res) { - let info_str = serde_json::to_string(&err).unwrap(); - println!("player {}: {}", player_id, info_str); + let _info_str = serde_json::to_string(&err).unwrap(); + // TODO + // println!("player {}: {}", player_id, info_str); } } self.match_state.step(); // Log state let state = self.match_state.serialize_state(); - self.match_ctx - .log_string(serde_json::to_string(&state).unwrap()); + self.match_ctx.log(MatchLogMessage::GameState(state)); } }