From f62196d983c04a94b892086a4ea6926bd7b6e4fb Mon Sep 17 00:00:00 2001 From: Ilion Beyst Date: Sat, 22 Jan 2022 14:32:43 +0100 Subject: [PATCH] implement docker runner --- planetwars-matchrunner/Cargo.toml | 3 +- planetwars-matchrunner/src/bin/testmatch.rs | 168 +++-------------- planetwars-matchrunner/src/docker_runner.rs | 188 ++++++++++++++++++++ planetwars-matchrunner/src/lib.rs | 31 +++- 4 files changed, 233 insertions(+), 157 deletions(-) create mode 100644 planetwars-matchrunner/src/docker_runner.rs diff --git a/planetwars-matchrunner/Cargo.toml b/planetwars-matchrunner/Cargo.toml index c4cb00d..b041d61 100644 --- a/planetwars-matchrunner/Cargo.toml +++ b/planetwars-matchrunner/Cargo.toml @@ -19,4 +19,5 @@ serde_json = "1.0" planetwars-rules = { path = "../planetwars-rules" } chrono = { version = "0.4", features = ["serde"] } bollard = { git = "https://github.com/antoinert/bollard" } -bytes = "1.1" \ No newline at end of file +bytes = "1.1" +async-trait = "0.1" \ No newline at end of file diff --git a/planetwars-matchrunner/src/bin/testmatch.rs b/planetwars-matchrunner/src/bin/testmatch.rs index a072b9f..db160cf 100644 --- a/planetwars-matchrunner/src/bin/testmatch.rs +++ b/planetwars-matchrunner/src/bin/testmatch.rs @@ -1,28 +1,6 @@ -extern crate planetwars_matchrunner; -extern crate tokio; +use std::{env, path::PathBuf}; -use std::collections::HashMap; -use std::io::{self, Write}; -use std::path::{Path, PathBuf}; -use std::pin::Pin; -use std::sync::{Arc, Mutex}; - -use bollard::container::{self, AttachContainerOptions, AttachContainerResults, LogOutput}; -use bollard::exec::StartExecResults; -use bollard::Docker; -use bytes::Bytes; -use futures::{Stream, StreamExt}; -use planetwars_matchrunner::{ - match_context::{EventBus, MatchCtx, PlayerHandle, RequestMessage}, - pw_match, MatchConfig, MatchMeta, PlayerInfo, -}; -use planetwars_rules::protocol as proto; -use std::env; -use tokio::io::{AsyncWrite, AsyncWriteExt}; -use tokio::sync::mpsc; - -const IMAGE: &'static str = "python:3.10.1-slim-buster"; -// const IMAGE: &'static str = "simplebot:latest"; +use planetwars_matchrunner::{docker_runner::DockerBotSpec, run_match, MatchConfig, MatchPlayer}; #[tokio::main] async fn main() { @@ -32,134 +10,30 @@ async fn main() { _run_match(map_path).await; } -async fn _run_match(map_path: String) { - let docker = Docker::connect_with_socket_defaults().unwrap(); - let code_dir_path = PathBuf::from("../simplebot"); - let params = BotParams { - image: IMAGE, - code_path: &code_dir_path, - argv: vec!["python", "simplebot.py"], - }; - let mut process = spawn_docker_process(&docker, params).await.unwrap(); +const IMAGE: &'static str = "python:3.10-slim-buster"; - let state = proto::State { - planets: vec![ - proto::Planet { +async fn _run_match(map_path: String) { + let code_dir_path = PathBuf::from("../simplebot"); + let bot_spec = DockerBotSpec { + image: IMAGE.to_string(), + code_path: code_dir_path, + argv: vec!["python".to_string(), "simplebot.py".to_string()], + }; + + run_match(MatchConfig { + map_path: PathBuf::from(map_path), + map_name: "hex".to_string(), + log_path: PathBuf::from("match.log"), + players: vec![ + MatchPlayer { name: "a".to_string(), - owner: Some(1), - ship_count: 100, - x: -1.0, - y: 0.0, + bot_spec: Box::new(bot_spec.clone()), }, - proto::Planet { + MatchPlayer { name: "b".to_string(), - owner: Some(2), - ship_count: 100, - x: 1.0, - y: 0.0, + bot_spec: Box::new(bot_spec.clone()), }, ], - expeditions: vec![], - }; - - let serialized = serde_json::to_vec(&state).unwrap(); - let out = process.communicate(&serialized).await.unwrap(); - - print!("got output: {}", String::from_utf8(out.to_vec()).unwrap()); -} - -pub struct BotParams<'a> { - pub image: &'a str, - pub code_path: &'a Path, - pub argv: Vec<&'a str>, -} - -async fn spawn_docker_process( - docker: &Docker, - params: BotParams<'_>, -) -> Result { - let bot_code_dir = std::fs::canonicalize(params.code_path).unwrap(); - let code_dir_str = bot_code_dir.as_os_str().to_str().unwrap(); - - let config = container::Config { - image: Some(params.image), - host_config: Some(bollard::models::HostConfig { - binds: Some(vec![format!("{}:{}", code_dir_str, "/workdir")]), - ..Default::default() - }), - working_dir: Some("/workdir"), - cmd: Some(params.argv), - attach_stdin: Some(true), - attach_stdout: Some(true), - attach_stderr: Some(true), - open_stdin: Some(true), - ..Default::default() - }; - - let response = docker.create_container::<&str, &str>(None, config).await?; - let container_id = response.id; - - docker - .start_container::(&container_id, None) - .await?; - - let AttachContainerResults { output, input } = docker - .attach_container( - &container_id, - Some(AttachContainerOptions:: { - stdout: Some(true), - stderr: Some(true), - stdin: Some(true), - stream: Some(true), - logs: Some(true), - ..Default::default() - }), - ) - .await?; - - Ok(ContainerProcess { - stdin: input, - output, }) -} - -pub struct ContainerProcess { - stdin: Pin>, - output: Pin>>>, -} - -impl ContainerProcess { - pub async fn communicate(&mut self, input: &[u8]) -> io::Result { - self.write_line(input).await?; - self.read_line().await - } - - async fn write_line(&mut self, bytes: &[u8]) -> io::Result<()> { - self.stdin.write_all(bytes).await?; - self.stdin.write_u8(b'\n').await?; - self.stdin.flush().await?; - Ok(()) - } - - async fn read_line(&mut self) -> io::Result { - while let Some(item) = self.output.next().await { - let log_output = item.expect("failed to get log output"); - match log_output { - LogOutput::StdOut { message } => { - // TODO: this is not correct (buffering and such) - return Ok(message); - } - LogOutput::StdErr { message } => { - // TODO - println!("{}", String::from_utf8_lossy(&message)); - } - _ => (), - } - } - - Err(io::Error::new( - io::ErrorKind::UnexpectedEof, - "no response received", - )) - } + .await; } diff --git a/planetwars-matchrunner/src/docker_runner.rs b/planetwars-matchrunner/src/docker_runner.rs new file mode 100644 index 0000000..84101ef --- /dev/null +++ b/planetwars-matchrunner/src/docker_runner.rs @@ -0,0 +1,188 @@ +use std::io; +use std::path::PathBuf; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; + +use async_trait::async_trait; +use bollard::container::{self, AttachContainerOptions, AttachContainerResults, LogOutput}; +use bollard::Docker; +use bytes::Bytes; +use futures::{Stream, StreamExt}; +use tokio::io::{AsyncWrite, AsyncWriteExt}; +use tokio::sync::mpsc; +use tokio::time::timeout; + +use crate::match_context::{EventBus, PlayerHandle, RequestError, RequestMessage}; +use crate::BotSpec; + +#[derive(Clone, Debug)] +pub struct DockerBotSpec { + pub image: String, + pub code_path: PathBuf, + pub argv: Vec, +} + +#[async_trait] +impl BotSpec for DockerBotSpec { + async fn run_bot( + &self, + player_id: u32, + event_bus: Arc>, + ) -> Box { + let (handle, runner) = create_docker_bot(player_id, event_bus); + let process = spawn_docker_process(self).await.unwrap(); + tokio::spawn(runner.run(process)); + return Box::new(handle); + } +} + +async fn spawn_docker_process( + params: &DockerBotSpec, +) -> Result { + let docker = Docker::connect_with_socket_defaults()?; + let bot_code_dir = std::fs::canonicalize(¶ms.code_path).unwrap(); + let code_dir_str = bot_code_dir.as_os_str().to_str().unwrap(); + + let config = container::Config { + image: Some(params.image.clone()), + host_config: Some(bollard::models::HostConfig { + binds: Some(vec![format!("{}:{}", code_dir_str, "/workdir")]), + ..Default::default() + }), + working_dir: Some("/workdir".to_string()), + cmd: Some(params.argv.clone()), + attach_stdin: Some(true), + attach_stdout: Some(true), + attach_stderr: Some(true), + open_stdin: Some(true), + ..Default::default() + }; + + let response = docker + .create_container::<&str, String>(None, config) + .await?; + let container_id = response.id; + + docker + .start_container::(&container_id, None) + .await?; + + let AttachContainerResults { output, input } = docker + .attach_container( + &container_id, + Some(AttachContainerOptions:: { + stdout: Some(true), + stderr: Some(true), + stdin: Some(true), + stream: Some(true), + logs: Some(true), + ..Default::default() + }), + ) + .await?; + + Ok(ContainerProcess { + stdin: input, + output, + }) +} + +pub struct ContainerProcess { + stdin: Pin>, + output: Pin> + Send>>, +} + +impl ContainerProcess { + pub async fn communicate(&mut self, input: &[u8]) -> io::Result { + self.write_line(input).await?; + self.read_line().await + } + + async fn write_line(&mut self, bytes: &[u8]) -> io::Result<()> { + self.stdin.write_all(bytes).await?; + self.stdin.write_u8(b'\n').await?; + self.stdin.flush().await?; + Ok(()) + } + + async fn read_line(&mut self) -> io::Result { + while let Some(item) = self.output.next().await { + let log_output = item.expect("failed to get log output"); + match log_output { + LogOutput::StdOut { message } => { + // TODO: this is not correct (buffering and such) + return Ok(message); + } + LogOutput::StdErr { message } => { + // TODO + println!("{}", String::from_utf8_lossy(&message)); + } + _ => (), + } + } + + Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "no response received", + )) + } +} + +fn create_docker_bot( + player_id: u32, + event_bus: Arc>, +) -> (DockerBotHandle, DockerBotRunner) { + let (tx, rx) = mpsc::unbounded_channel(); + let bot_handle = DockerBotHandle { tx }; + let bot_runner = DockerBotRunner { + player_id, + event_bus, + rx, + }; + (bot_handle, bot_runner) +} + +pub struct DockerBotHandle { + tx: mpsc::UnboundedSender, +} + +impl PlayerHandle for DockerBotHandle { + fn send_request(&mut self, r: RequestMessage) { + self.tx + .send(r) + .expect("failed to send message to local bot"); + } + + fn send_info(&mut self, _msg: String) { + // TODO: log this somewhere + // drop info message + } +} + +pub struct DockerBotRunner { + event_bus: Arc>, + rx: mpsc::UnboundedReceiver, + player_id: u32, +} + +impl DockerBotRunner { + pub async fn run(mut self, mut process: ContainerProcess) { + while let Some(request) = self.rx.recv().await { + let resp_fut = process.communicate(&request.content); + let result = timeout(request.timeout, resp_fut) + .await + // TODO: how can this failure be handled cleanly? + .expect("process read failed"); + let result = match result { + Ok(line) => Ok(line.to_vec()), + Err(_elapsed) => Err(RequestError::Timeout), + }; + let request_id = (self.player_id, request.request_id); + + self.event_bus + .lock() + .unwrap() + .resolve_request(request_id, result); + } + } +} diff --git a/planetwars-matchrunner/src/lib.rs b/planetwars-matchrunner/src/lib.rs index 1f23551..2e4200c 100644 --- a/planetwars-matchrunner/src/lib.rs +++ b/planetwars-matchrunner/src/lib.rs @@ -1,4 +1,5 @@ pub mod bot_runner; +pub mod docker_runner; pub mod match_context; pub mod pw_match; @@ -8,6 +9,8 @@ use std::{ sync::{Arc, Mutex}, }; +use async_trait::async_trait; +use futures::{stream::FuturesOrdered, FutureExt, StreamExt}; use match_context::MatchCtx; use planetwars_rules::PwConfig; use serde::{Deserialize, Serialize}; @@ -35,8 +38,16 @@ pub struct PlayerInfo { pub struct MatchPlayer { pub name: String, - pub path: PathBuf, - pub argv: Vec, + pub bot_spec: Box, +} + +#[async_trait] +pub trait BotSpec { + async fn run_bot( + &self, + player_id: u32, + event_bus: Arc>, + ) -> Box; } pub async fn run_match(config: MatchConfig) { @@ -48,20 +59,22 @@ pub async fn run_match(config: MatchConfig) { let event_bus = Arc::new(Mutex::new(EventBus::new())); // start bots + // TODO: what happens when a bot fails? let players = config .players .iter() .enumerate() .map(|(player_id, player)| { let player_id = (player_id + 1) as u32; - let bot = bot_runner::Bot { - working_dir: player.path.clone(), - argv: player.argv.clone(), - }; - let handle = bot_runner::run_local_bot(player_id, event_bus.clone(), bot); - (player_id, Box::new(handle) as Box) + player + .bot_spec + .run_bot(player_id, event_bus.clone()) + .map(move |handle| (player_id, handle)) }) - .collect(); + .collect::>() + // await all results + .collect() + .await; let mut log_file = std::fs::File::create(config.log_path).expect("could not create log file"); // assemble the math meta struct