implement docker runner

This commit is contained in:
Ilion Beyst 2022-01-22 14:32:43 +01:00
parent 3dd940321c
commit f62196d983
4 changed files with 233 additions and 157 deletions

View file

@ -20,3 +20,4 @@ planetwars-rules = { path = "../planetwars-rules" }
chrono = { version = "0.4", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] }
bollard = { git = "https://github.com/antoinert/bollard" } bollard = { git = "https://github.com/antoinert/bollard" }
bytes = "1.1" bytes = "1.1"
async-trait = "0.1"

View file

@ -1,28 +1,6 @@
extern crate planetwars_matchrunner; use std::{env, path::PathBuf};
extern crate tokio;
use std::collections::HashMap; use planetwars_matchrunner::{docker_runner::DockerBotSpec, run_match, MatchConfig, MatchPlayer};
use std::io::{self, Write};
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use bollard::container::{self, AttachContainerOptions, AttachContainerResults, LogOutput};
use bollard::exec::StartExecResults;
use bollard::Docker;
use bytes::Bytes;
use futures::{Stream, StreamExt};
use planetwars_matchrunner::{
match_context::{EventBus, MatchCtx, PlayerHandle, RequestMessage},
pw_match, MatchConfig, MatchMeta, PlayerInfo,
};
use planetwars_rules::protocol as proto;
use std::env;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc;
const IMAGE: &'static str = "python:3.10.1-slim-buster";
// const IMAGE: &'static str = "simplebot:latest";
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
@ -32,134 +10,30 @@ async fn main() {
_run_match(map_path).await; _run_match(map_path).await;
} }
async fn _run_match(map_path: String) { const IMAGE: &'static str = "python:3.10-slim-buster";
let docker = Docker::connect_with_socket_defaults().unwrap();
let code_dir_path = PathBuf::from("../simplebot");
let params = BotParams {
image: IMAGE,
code_path: &code_dir_path,
argv: vec!["python", "simplebot.py"],
};
let mut process = spawn_docker_process(&docker, params).await.unwrap();
let state = proto::State { async fn _run_match(map_path: String) {
planets: vec![ let code_dir_path = PathBuf::from("../simplebot");
proto::Planet { let bot_spec = DockerBotSpec {
image: IMAGE.to_string(),
code_path: code_dir_path,
argv: vec!["python".to_string(), "simplebot.py".to_string()],
};
run_match(MatchConfig {
map_path: PathBuf::from(map_path),
map_name: "hex".to_string(),
log_path: PathBuf::from("match.log"),
players: vec![
MatchPlayer {
name: "a".to_string(), name: "a".to_string(),
owner: Some(1), bot_spec: Box::new(bot_spec.clone()),
ship_count: 100,
x: -1.0,
y: 0.0,
}, },
proto::Planet { MatchPlayer {
name: "b".to_string(), name: "b".to_string(),
owner: Some(2), bot_spec: Box::new(bot_spec.clone()),
ship_count: 100,
x: 1.0,
y: 0.0,
}, },
], ],
expeditions: vec![],
};
let serialized = serde_json::to_vec(&state).unwrap();
let out = process.communicate(&serialized).await.unwrap();
print!("got output: {}", String::from_utf8(out.to_vec()).unwrap());
}
pub struct BotParams<'a> {
pub image: &'a str,
pub code_path: &'a Path,
pub argv: Vec<&'a str>,
}
async fn spawn_docker_process(
docker: &Docker,
params: BotParams<'_>,
) -> Result<ContainerProcess, bollard::errors::Error> {
let bot_code_dir = std::fs::canonicalize(params.code_path).unwrap();
let code_dir_str = bot_code_dir.as_os_str().to_str().unwrap();
let config = container::Config {
image: Some(params.image),
host_config: Some(bollard::models::HostConfig {
binds: Some(vec![format!("{}:{}", code_dir_str, "/workdir")]),
..Default::default()
}),
working_dir: Some("/workdir"),
cmd: Some(params.argv),
attach_stdin: Some(true),
attach_stdout: Some(true),
attach_stderr: Some(true),
open_stdin: Some(true),
..Default::default()
};
let response = docker.create_container::<&str, &str>(None, config).await?;
let container_id = response.id;
docker
.start_container::<String>(&container_id, None)
.await?;
let AttachContainerResults { output, input } = docker
.attach_container(
&container_id,
Some(AttachContainerOptions::<String> {
stdout: Some(true),
stderr: Some(true),
stdin: Some(true),
stream: Some(true),
logs: Some(true),
..Default::default()
}),
)
.await?;
Ok(ContainerProcess {
stdin: input,
output,
}) })
} .await;
pub struct ContainerProcess {
stdin: Pin<Box<dyn AsyncWrite + Send>>,
output: Pin<Box<dyn Stream<Item = Result<LogOutput, bollard::errors::Error>>>>,
}
impl ContainerProcess {
pub async fn communicate(&mut self, input: &[u8]) -> io::Result<Bytes> {
self.write_line(input).await?;
self.read_line().await
}
async fn write_line(&mut self, bytes: &[u8]) -> io::Result<()> {
self.stdin.write_all(bytes).await?;
self.stdin.write_u8(b'\n').await?;
self.stdin.flush().await?;
Ok(())
}
async fn read_line(&mut self) -> io::Result<Bytes> {
while let Some(item) = self.output.next().await {
let log_output = item.expect("failed to get log output");
match log_output {
LogOutput::StdOut { message } => {
// TODO: this is not correct (buffering and such)
return Ok(message);
}
LogOutput::StdErr { message } => {
// TODO
println!("{}", String::from_utf8_lossy(&message));
}
_ => (),
}
}
Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"no response received",
))
}
} }

View file

@ -0,0 +1,188 @@
use std::io;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use bollard::container::{self, AttachContainerOptions, AttachContainerResults, LogOutput};
use bollard::Docker;
use bytes::Bytes;
use futures::{Stream, StreamExt};
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc;
use tokio::time::timeout;
use crate::match_context::{EventBus, PlayerHandle, RequestError, RequestMessage};
use crate::BotSpec;
#[derive(Clone, Debug)]
pub struct DockerBotSpec {
pub image: String,
pub code_path: PathBuf,
pub argv: Vec<String>,
}
#[async_trait]
impl BotSpec for DockerBotSpec {
async fn run_bot(
&self,
player_id: u32,
event_bus: Arc<Mutex<EventBus>>,
) -> Box<dyn PlayerHandle> {
let (handle, runner) = create_docker_bot(player_id, event_bus);
let process = spawn_docker_process(self).await.unwrap();
tokio::spawn(runner.run(process));
return Box::new(handle);
}
}
async fn spawn_docker_process(
params: &DockerBotSpec,
) -> Result<ContainerProcess, bollard::errors::Error> {
let docker = Docker::connect_with_socket_defaults()?;
let bot_code_dir = std::fs::canonicalize(&params.code_path).unwrap();
let code_dir_str = bot_code_dir.as_os_str().to_str().unwrap();
let config = container::Config {
image: Some(params.image.clone()),
host_config: Some(bollard::models::HostConfig {
binds: Some(vec![format!("{}:{}", code_dir_str, "/workdir")]),
..Default::default()
}),
working_dir: Some("/workdir".to_string()),
cmd: Some(params.argv.clone()),
attach_stdin: Some(true),
attach_stdout: Some(true),
attach_stderr: Some(true),
open_stdin: Some(true),
..Default::default()
};
let response = docker
.create_container::<&str, String>(None, config)
.await?;
let container_id = response.id;
docker
.start_container::<String>(&container_id, None)
.await?;
let AttachContainerResults { output, input } = docker
.attach_container(
&container_id,
Some(AttachContainerOptions::<String> {
stdout: Some(true),
stderr: Some(true),
stdin: Some(true),
stream: Some(true),
logs: Some(true),
..Default::default()
}),
)
.await?;
Ok(ContainerProcess {
stdin: input,
output,
})
}
pub struct ContainerProcess {
stdin: Pin<Box<dyn AsyncWrite + Send>>,
output: Pin<Box<dyn Stream<Item = Result<LogOutput, bollard::errors::Error>> + Send>>,
}
impl ContainerProcess {
pub async fn communicate(&mut self, input: &[u8]) -> io::Result<Bytes> {
self.write_line(input).await?;
self.read_line().await
}
async fn write_line(&mut self, bytes: &[u8]) -> io::Result<()> {
self.stdin.write_all(bytes).await?;
self.stdin.write_u8(b'\n').await?;
self.stdin.flush().await?;
Ok(())
}
async fn read_line(&mut self) -> io::Result<Bytes> {
while let Some(item) = self.output.next().await {
let log_output = item.expect("failed to get log output");
match log_output {
LogOutput::StdOut { message } => {
// TODO: this is not correct (buffering and such)
return Ok(message);
}
LogOutput::StdErr { message } => {
// TODO
println!("{}", String::from_utf8_lossy(&message));
}
_ => (),
}
}
Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"no response received",
))
}
}
fn create_docker_bot(
player_id: u32,
event_bus: Arc<Mutex<EventBus>>,
) -> (DockerBotHandle, DockerBotRunner) {
let (tx, rx) = mpsc::unbounded_channel();
let bot_handle = DockerBotHandle { tx };
let bot_runner = DockerBotRunner {
player_id,
event_bus,
rx,
};
(bot_handle, bot_runner)
}
pub struct DockerBotHandle {
tx: mpsc::UnboundedSender<RequestMessage>,
}
impl PlayerHandle for DockerBotHandle {
fn send_request(&mut self, r: RequestMessage) {
self.tx
.send(r)
.expect("failed to send message to local bot");
}
fn send_info(&mut self, _msg: String) {
// TODO: log this somewhere
// drop info message
}
}
pub struct DockerBotRunner {
event_bus: Arc<Mutex<EventBus>>,
rx: mpsc::UnboundedReceiver<RequestMessage>,
player_id: u32,
}
impl DockerBotRunner {
pub async fn run(mut self, mut process: ContainerProcess) {
while let Some(request) = self.rx.recv().await {
let resp_fut = process.communicate(&request.content);
let result = timeout(request.timeout, resp_fut)
.await
// TODO: how can this failure be handled cleanly?
.expect("process read failed");
let result = match result {
Ok(line) => Ok(line.to_vec()),
Err(_elapsed) => Err(RequestError::Timeout),
};
let request_id = (self.player_id, request.request_id);
self.event_bus
.lock()
.unwrap()
.resolve_request(request_id, result);
}
}
}

View file

@ -1,4 +1,5 @@
pub mod bot_runner; pub mod bot_runner;
pub mod docker_runner;
pub mod match_context; pub mod match_context;
pub mod pw_match; pub mod pw_match;
@ -8,6 +9,8 @@ use std::{
sync::{Arc, Mutex}, sync::{Arc, Mutex},
}; };
use async_trait::async_trait;
use futures::{stream::FuturesOrdered, FutureExt, StreamExt};
use match_context::MatchCtx; use match_context::MatchCtx;
use planetwars_rules::PwConfig; use planetwars_rules::PwConfig;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -35,8 +38,16 @@ pub struct PlayerInfo {
pub struct MatchPlayer { pub struct MatchPlayer {
pub name: String, pub name: String,
pub path: PathBuf, pub bot_spec: Box<dyn BotSpec>,
pub argv: Vec<String>, }
#[async_trait]
pub trait BotSpec {
async fn run_bot(
&self,
player_id: u32,
event_bus: Arc<Mutex<EventBus>>,
) -> Box<dyn PlayerHandle>;
} }
pub async fn run_match(config: MatchConfig) { pub async fn run_match(config: MatchConfig) {
@ -48,20 +59,22 @@ pub async fn run_match(config: MatchConfig) {
let event_bus = Arc::new(Mutex::new(EventBus::new())); let event_bus = Arc::new(Mutex::new(EventBus::new()));
// start bots // start bots
// TODO: what happens when a bot fails?
let players = config let players = config
.players .players
.iter() .iter()
.enumerate() .enumerate()
.map(|(player_id, player)| { .map(|(player_id, player)| {
let player_id = (player_id + 1) as u32; let player_id = (player_id + 1) as u32;
let bot = bot_runner::Bot { player
working_dir: player.path.clone(), .bot_spec
argv: player.argv.clone(), .run_bot(player_id, event_bus.clone())
}; .map(move |handle| (player_id, handle))
let handle = bot_runner::run_local_bot(player_id, event_bus.clone(), bot);
(player_id, Box::new(handle) as Box<dyn PlayerHandle>)
}) })
.collect(); .collect::<FuturesOrdered<_>>()
// await all results
.collect()
.await;
let mut log_file = std::fs::File::create(config.log_path).expect("could not create log file"); let mut log_file = std::fs::File::create(config.log_path).expect("could not create log file");
// assemble the math meta struct // assemble the math meta struct