implement matchlogger

This commit is contained in:
Ilion Beyst 2022-02-23 21:08:56 +01:00
parent e15944622d
commit 54b9694f0d
6 changed files with 162 additions and 89 deletions

View file

@ -18,6 +18,6 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
planetwars-rules = { path = "../planetwars-rules" } planetwars-rules = { path = "../planetwars-rules" }
chrono = { version = "0.4", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] }
bollard = { git = "https://github.com/antoinert/bollard" } bollard = { git = "https://github.com/fussybeaver/bollard", rev = "c5d87a4934c70a04f9c649fedb241dbd4943c927" }
bytes = "1.1" bytes = "1.1"
async-trait = "0.1" async-trait = "0.1"

View file

@ -13,6 +13,7 @@ use tokio::sync::mpsc;
use tokio::time::timeout; use tokio::time::timeout;
use crate::match_context::{EventBus, PlayerHandle, RequestError, RequestMessage}; use crate::match_context::{EventBus, PlayerHandle, RequestError, RequestMessage};
use crate::match_log::{MatchLogMessage, MatchLogger, StdErrMessage};
use crate::BotSpec; use crate::BotSpec;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -28,10 +29,11 @@ impl BotSpec for DockerBotSpec {
&self, &self,
player_id: u32, player_id: u32,
event_bus: Arc<Mutex<EventBus>>, event_bus: Arc<Mutex<EventBus>>,
match_logger: MatchLogger,
) -> Box<dyn PlayerHandle> { ) -> Box<dyn PlayerHandle> {
let (handle, runner) = create_docker_bot(player_id, event_bus);
let process = spawn_docker_process(self).await.unwrap(); let process = spawn_docker_process(self).await.unwrap();
tokio::spawn(runner.run(process)); let (handle, runner) = create_docker_bot(process, player_id, event_bus, match_logger);
tokio::spawn(runner.run());
return Box::new(handle); return Box::new(handle);
} }
} }
@ -75,7 +77,8 @@ async fn spawn_docker_process(
stderr: Some(true), stderr: Some(true),
stdin: Some(true), stdin: Some(true),
stream: Some(true), stream: Some(true),
logs: Some(true), // setting this to true causes duplicate error output. Why?
logs: Some(false),
..Default::default() ..Default::default()
}), }),
) )
@ -87,56 +90,24 @@ async fn spawn_docker_process(
}) })
} }
pub struct ContainerProcess { struct ContainerProcess {
stdin: Pin<Box<dyn AsyncWrite + Send>>, stdin: Pin<Box<dyn AsyncWrite + Send>>,
output: Pin<Box<dyn Stream<Item = Result<LogOutput, bollard::errors::Error>> + Send>>, output: Pin<Box<dyn Stream<Item = Result<LogOutput, bollard::errors::Error>> + Send>>,
} }
impl ContainerProcess {
pub async fn communicate(&mut self, input: &[u8]) -> io::Result<Bytes> {
self.write_line(input).await?;
self.read_line().await
}
async fn write_line(&mut self, bytes: &[u8]) -> io::Result<()> {
self.stdin.write_all(bytes).await?;
self.stdin.write_u8(b'\n').await?;
self.stdin.flush().await?;
Ok(())
}
async fn read_line(&mut self) -> io::Result<Bytes> {
while let Some(item) = self.output.next().await {
let log_output = item.expect("failed to get log output");
match log_output {
LogOutput::StdOut { message } => {
// TODO: this is not correct (buffering and such)
return Ok(message);
}
LogOutput::StdErr { message } => {
// TODO
println!("{}", String::from_utf8_lossy(&message));
}
_ => (),
}
}
Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"no response received",
))
}
}
fn create_docker_bot( fn create_docker_bot(
process: ContainerProcess,
player_id: u32, player_id: u32,
event_bus: Arc<Mutex<EventBus>>, event_bus: Arc<Mutex<EventBus>>,
match_logger: MatchLogger,
) -> (DockerBotHandle, DockerBotRunner) { ) -> (DockerBotHandle, DockerBotRunner) {
let (tx, rx) = mpsc::unbounded_channel(); let (tx, rx) = mpsc::unbounded_channel();
let bot_handle = DockerBotHandle { tx }; let bot_handle = DockerBotHandle { tx };
let bot_runner = DockerBotRunner { let bot_runner = DockerBotRunner {
process,
player_id, player_id,
event_bus, event_bus,
match_logger,
rx, rx,
}; };
(bot_handle, bot_runner) (bot_handle, bot_runner)
@ -155,21 +126,22 @@ impl PlayerHandle for DockerBotHandle {
} }
pub struct DockerBotRunner { pub struct DockerBotRunner {
process: ContainerProcess,
event_bus: Arc<Mutex<EventBus>>, event_bus: Arc<Mutex<EventBus>>,
rx: mpsc::UnboundedReceiver<RequestMessage>, rx: mpsc::UnboundedReceiver<RequestMessage>,
match_logger: MatchLogger,
player_id: u32, player_id: u32,
} }
impl DockerBotRunner { impl DockerBotRunner {
pub async fn run(mut self, mut process: ContainerProcess) { pub async fn run(mut self) {
while let Some(request) = self.rx.recv().await { while let Some(request) = self.rx.recv().await {
let resp_fut = process.communicate(&request.content); let resp_fut = self.communicate(&request.content);
let result = timeout(request.timeout, resp_fut) let result = timeout(request.timeout, resp_fut).await;
.await let request_response = match result {
// TODO: how can this failure be handled cleanly? Ok(Ok(response)) => Ok(response.to_vec()),
.expect("process read failed"); // this one happens when a bot output stream ends, map this to Timeout for now
let result = match result { Ok(Err(_read_error)) => Err(RequestError::Timeout),
Ok(line) => Ok(line.to_vec()),
Err(_elapsed) => Err(RequestError::Timeout), Err(_elapsed) => Err(RequestError::Timeout),
}; };
let request_id = (self.player_id, request.request_id); let request_id = (self.player_id, request.request_id);
@ -177,7 +149,55 @@ impl DockerBotRunner {
self.event_bus self.event_bus
.lock() .lock()
.unwrap() .unwrap()
.resolve_request(request_id, result); .resolve_request(request_id, request_response);
} }
} }
pub async fn communicate(&mut self, input: &[u8]) -> io::Result<Bytes> {
self.write_line(input).await?;
self.read_line().await
}
async fn write_line(&mut self, bytes: &[u8]) -> io::Result<()> {
self.process.stdin.write_all(bytes).await?;
self.process.stdin.write_u8(b'\n').await?;
self.process.stdin.flush().await?;
Ok(())
}
async fn read_line(&mut self) -> io::Result<Bytes> {
while let Some(item) = self.process.output.next().await {
let log_output = item.expect("failed to get log output");
match log_output {
LogOutput::StdOut { message } => {
// TODO: this is not correct (buffering and such)
return Ok(message);
}
LogOutput::StdErr { mut message } => {
// TODO
if message.ends_with(b"\n") {
message.truncate(message.len() - 1);
}
for line in message.split(|c| *c == b'\n') {
let message = StdErrMessage {
player_id: self.player_id,
message: String::from_utf8_lossy(line).to_string(),
};
self.match_logger
.send(MatchLogMessage::StdErr(message))
.unwrap();
}
}
_ => (),
}
}
// at this point the stream has ended
// does this mean the container has exited?
Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"no response received",
))
}
} }

View file

@ -1,17 +1,18 @@
pub mod bot_runner; pub mod bot_runner;
pub mod docker_runner; pub mod docker_runner;
pub mod match_context; pub mod match_context;
pub mod match_log;
pub mod pw_match; pub mod pw_match;
use std::{ use std::{
io::Write,
path::PathBuf, path::PathBuf,
sync::{Arc, Mutex}, sync::{Arc, Mutex},
}; };
use async_trait::async_trait; use async_trait::async_trait;
use futures::{stream::FuturesOrdered, FutureExt, StreamExt}; use futures::{stream::FuturesOrdered, StreamExt};
use match_context::MatchCtx; use match_context::MatchCtx;
use match_log::{create_log_sink, MatchLogger};
use planetwars_rules::PwConfig; use planetwars_rules::PwConfig;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -47,6 +48,7 @@ pub trait BotSpec: Send + Sync {
&self, &self,
player_id: u32, player_id: u32,
event_bus: Arc<Mutex<EventBus>>, event_bus: Arc<Mutex<EventBus>>,
match_logger: MatchLogger,
) -> Box<dyn PlayerHandle>; ) -> Box<dyn PlayerHandle>;
} }
@ -57,6 +59,7 @@ pub async fn run_match(config: MatchConfig) {
}; };
let event_bus = Arc::new(Mutex::new(EventBus::new())); let event_bus = Arc::new(Mutex::new(EventBus::new()));
let match_logger = create_log_sink(&config.log_path).await;
// start bots // start bots
// TODO: what happens when a bot fails? // TODO: what happens when a bot fails?
@ -66,34 +69,39 @@ pub async fn run_match(config: MatchConfig) {
.enumerate() .enumerate()
.map(|(player_id, player)| { .map(|(player_id, player)| {
let player_id = (player_id + 1) as u32; let player_id = (player_id + 1) as u32;
start_bot(player_id, event_bus.clone(), &player.bot_spec) start_bot(
player_id,
event_bus.clone(),
&player.bot_spec,
match_logger.clone(),
)
}) })
.collect::<FuturesOrdered<_>>() .collect::<FuturesOrdered<_>>()
// await all results // await all results
.collect() .collect()
.await; .await;
let mut log_file = std::fs::File::create(config.log_path).expect("could not create log file");
let match_ctx = MatchCtx::new(event_bus, players, match_logger);
// TODO: is this still needed?
// assemble the math meta struct // assemble the math meta struct
let match_meta = MatchMeta { // let match_meta = MatchMeta {
map_name: config.map_name.clone(), // map_name: config.map_name.clone(),
timestamp: chrono::Local::now(), // timestamp: chrono::Local::now(),
players: config // players: config
.players // .players
.iter() // .iter()
.map(|bot| PlayerInfo { // .map(|bot| PlayerInfo {
name: bot.name.clone(), // name: bot.name.clone(),
}) // })
.collect(), // .collect(),
}; // };
write!( // write!(
log_file, // log_file,
"{}\n", // "{}\n",
serde_json::to_string(&match_meta).unwrap() // serde_json::to_string(&match_meta).unwrap()
) // )
.unwrap(); // .unwrap();
let match_ctx = MatchCtx::new(event_bus, players, log_file);
let match_state = pw_match::PwMatch::create(match_ctx, pw_config); let match_state = pw_match::PwMatch::create(match_ctx, pw_config);
match_state.run().await; match_state.run().await;
@ -104,7 +112,8 @@ async fn start_bot(
player_id: u32, player_id: u32,
event_bus: Arc<Mutex<EventBus>>, event_bus: Arc<Mutex<EventBus>>,
bot_spec: &Box<dyn BotSpec>, bot_spec: &Box<dyn BotSpec>,
match_logger: MatchLogger,
) -> (u32, Box<dyn PlayerHandle>) { ) -> (u32, Box<dyn PlayerHandle>) {
let player_handle = bot_spec.run_bot(player_id, event_bus).await; let player_handle = bot_spec.run_bot(player_id, event_bus, match_logger).await;
(player_id, player_handle) (player_id, player_handle)
} }

View file

@ -1,8 +1,6 @@
use futures::task::{Context, Poll}; use futures::task::{Context, Poll};
use futures::{future::Future, task::AtomicWaker}; use futures::{future::Future, task::AtomicWaker};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::fs::File;
use std::io::Write;
use std::pin::Pin; use std::pin::Pin;
use std::time::Duration; use std::time::Duration;
use std::{ use std::{
@ -10,6 +8,8 @@ use std::{
sync::{Arc, Mutex}, sync::{Arc, Mutex},
}; };
use crate::match_log::{MatchLogMessage, MatchLogger};
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub struct RequestMessage { pub struct RequestMessage {
pub request_id: u32, pub request_id: u32,
@ -20,16 +20,14 @@ pub struct RequestMessage {
pub struct MatchCtx { pub struct MatchCtx {
event_bus: Arc<Mutex<EventBus>>, event_bus: Arc<Mutex<EventBus>>,
players: HashMap<u32, PlayerData>, players: HashMap<u32, PlayerData>,
// output: MsgStreamHandle<String>, match_logger: MatchLogger,
log_sink: File,
} }
impl MatchCtx { impl MatchCtx {
pub fn new( pub fn new(
event_bus: Arc<Mutex<EventBus>>, event_bus: Arc<Mutex<EventBus>>,
players: HashMap<u32, Box<dyn PlayerHandle>>, players: HashMap<u32, Box<dyn PlayerHandle>>,
log_file: File, match_logger: MatchLogger,
// log: MsgStreamHandle<String>,
) -> Self { ) -> Self {
MatchCtx { MatchCtx {
event_bus, event_bus,
@ -43,7 +41,7 @@ impl MatchCtx {
(id, player_handle) (id, player_handle)
}) })
.collect(), .collect(),
log_sink: log_file, match_logger,
} }
} }
@ -70,9 +68,8 @@ impl MatchCtx {
self.players.keys().cloned().collect() self.players.keys().cloned().collect()
} }
// this method should be used to emit log states etc. pub fn log(&mut self, message: MatchLogMessage) {
pub fn log_string(&mut self, message: String) { self.match_logger.send(message).expect("write failed");
write!(self.log_sink, "{}\n", message).expect("failed to write to log file");
} }
} }

View file

@ -0,0 +1,45 @@
use std::path::Path;
use serde::{Deserialize, Serialize};
use tokio::{fs::File, io::AsyncWriteExt};
use planetwars_rules::protocol::State;
use tokio::sync::mpsc;
#[derive(Serialize, Deserialize, Debug)]
#[serde(tag = "type")]
pub enum MatchLogMessage {
#[serde(rename = "gamestate")]
GameState(State),
#[serde(rename = "stderr")]
StdErr(StdErrMessage),
}
#[derive(Serialize, Deserialize, Debug)]
pub struct StdErrMessage {
pub player_id: u32,
pub message: String,
}
pub type MatchLogger = mpsc::UnboundedSender<MatchLogMessage>;
pub async fn create_log_sink(log_file_path: &Path) -> MatchLogger {
let (tx, rx) = mpsc::unbounded_channel();
let log_file = File::create(log_file_path)
.await
.expect("Could not create log file");
tokio::spawn(run_log_sink(rx, log_file));
return tx;
}
async fn run_log_sink(mut rx: mpsc::UnboundedReceiver<MatchLogMessage>, mut file: File) {
while let Some(message) = rx.recv().await {
let json = serde_json::to_string(&message).expect("failed to serialize message");
file.write_all(json.as_bytes())
.await
.expect("failed to write log message to file");
file.write_all(b"\n")
.await
.expect("failed to write newline log message to file");
}
}

View file

@ -1,3 +1,5 @@
use crate::match_log::MatchLogMessage;
use super::match_context::{MatchCtx, RequestResult}; use super::match_context::{MatchCtx, RequestResult};
use futures::stream::futures_unordered::FuturesUnordered; use futures::stream::futures_unordered::FuturesUnordered;
use futures::{FutureExt, StreamExt}; use futures::{FutureExt, StreamExt};
@ -44,16 +46,16 @@ impl PwMatch {
for (player_id, turn) in player_messages { for (player_id, turn) in player_messages {
let res = self.execute_action(player_id, turn); let res = self.execute_action(player_id, turn);
if let Some(err) = action_errors(res) { if let Some(err) = action_errors(res) {
let info_str = serde_json::to_string(&err).unwrap(); let _info_str = serde_json::to_string(&err).unwrap();
println!("player {}: {}", player_id, info_str); // TODO
// println!("player {}: {}", player_id, info_str);
} }
} }
self.match_state.step(); self.match_state.step();
// Log state // Log state
let state = self.match_state.serialize_state(); let state = self.match_state.serialize_state();
self.match_ctx self.match_ctx.log(MatchLogMessage::GameState(state));
.log_string(serde_json::to_string(&state).unwrap());
} }
} }