diff --git a/planetwars-matchrunner/Cargo.toml b/planetwars-matchrunner/Cargo.toml index 488eb45..16c1249 100644 --- a/planetwars-matchrunner/Cargo.toml +++ b/planetwars-matchrunner/Cargo.toml @@ -19,3 +19,4 @@ serde_json = "1.0" planetwars-rules = { path = "../planetwars-rules" } chrono = { version = "0.4", features = ["serde"] } bollard = "0.11" +bytes = "1.1" \ No newline at end of file diff --git a/planetwars-matchrunner/src/bin/testmatch.rs b/planetwars-matchrunner/src/bin/testmatch.rs index 97c00ed..ebd0199 100644 --- a/planetwars-matchrunner/src/bin/testmatch.rs +++ b/planetwars-matchrunner/src/bin/testmatch.rs @@ -10,15 +10,17 @@ use std::sync::{Arc, Mutex}; use bollard::container::{self, LogOutput}; use bollard::exec::StartExecResults; use bollard::Docker; +use bytes::Bytes; use futures::{Stream, StreamExt}; use planetwars_matchrunner::{ - match_context::{EventBus, MatchCtx, PlayerHandle}, + match_context::{EventBus, MatchCtx, PlayerHandle, RequestMessage}, pw_match, MatchConfig, MatchMeta, PlayerInfo, }; use planetwars_rules::protocol as proto; use planetwars_rules::PwConfig; use std::env; use tokio::io::{AsyncWrite, AsyncWriteExt}; +use tokio::sync::mpsc; const IMAGE: &'static str = "simplebot:latest"; @@ -64,9 +66,12 @@ async fn create_player_process(docker: &Docker) -> Result<(), bollard::errors::E .id; let start_exec_results = docker.start_exec(&exec_id, None).await?; - let (mut input, mut output) = match start_exec_results { + let mut process = match start_exec_results { StartExecResults::Detached => panic!("failed to get io channels"), - StartExecResults::Attached { input, output } => (input, output), + StartExecResults::Attached { input, output } => ContainerProcess { + stdin: input, + output, + }, }; let state = proto::State { @@ -90,22 +95,50 @@ async fn create_player_process(docker: &Docker) -> Result<(), bollard::errors::E }; let serialized = serde_json::to_vec(&state).unwrap(); - input.write_all(&serialized).await?; - input.write(b"\n").await?; - input.flush().await?; + let out = process.communicate(&serialized).await?; - while let Some(item) = output.next().await { - let log_output = item.expect("failed to get log output"); - match log_output { - LogOutput::StdOut { message } => { - println!("stdout: {}", String::from_utf8_lossy(&message)); - } - LogOutput::StdErr { message } => { - println!("stderr: {}", String::from_utf8_lossy(&message)); - } - _ => (), - } - } + print!("{}", String::from_utf8(out.to_vec()).unwrap()); Ok(()) } + +pub struct ContainerProcess { + stdin: Pin>, + output: Pin>>>, +} + +impl ContainerProcess { + pub async fn communicate(&mut self, input: &[u8]) -> io::Result { + self.write_line(input).await?; + self.read_line().await + } + + async fn write_line(&mut self, bytes: &[u8]) -> io::Result<()> { + self.stdin.write_all(bytes).await?; + self.stdin.write_u8(b'\n').await?; + self.stdin.flush().await?; + Ok(()) + } + + async fn read_line(&mut self) -> io::Result { + while let Some(item) = self.output.next().await { + let log_output = item.expect("failed to get log output"); + match log_output { + LogOutput::StdOut { message } => { + // TODO: this is not correct (buffering and such) + return Ok(message); + } + LogOutput::StdErr { message } => { + // TODO + println!("stderr: {}", String::from_utf8_lossy(&message)); + } + _ => (), + } + } + + Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "no response received", + )) + } +}