docker bot runner PoC

This commit is contained in:
Ilion Beyst 2022-01-19 23:44:11 +01:00
parent 3e18c840d1
commit 5f1c7385c7
2 changed files with 52 additions and 18 deletions

View file

@ -19,3 +19,4 @@ serde_json = "1.0"
planetwars-rules = { path = "../planetwars-rules" }
chrono = { version = "0.4", features = ["serde"] }
bollard = "0.11"
bytes = "1.1"

View file

@ -10,15 +10,17 @@ use std::sync::{Arc, Mutex};
use bollard::container::{self, LogOutput};
use bollard::exec::StartExecResults;
use bollard::Docker;
use bytes::Bytes;
use futures::{Stream, StreamExt};
use planetwars_matchrunner::{
match_context::{EventBus, MatchCtx, PlayerHandle},
match_context::{EventBus, MatchCtx, PlayerHandle, RequestMessage},
pw_match, MatchConfig, MatchMeta, PlayerInfo,
};
use planetwars_rules::protocol as proto;
use planetwars_rules::PwConfig;
use std::env;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc;
const IMAGE: &'static str = "simplebot:latest";
@ -64,9 +66,12 @@ async fn create_player_process(docker: &Docker) -> Result<(), bollard::errors::E
.id;
let start_exec_results = docker.start_exec(&exec_id, None).await?;
let (mut input, mut output) = match start_exec_results {
let mut process = match start_exec_results {
StartExecResults::Detached => panic!("failed to get io channels"),
StartExecResults::Attached { input, output } => (input, output),
StartExecResults::Attached { input, output } => ContainerProcess {
stdin: input,
output,
},
};
let state = proto::State {
@ -90,22 +95,50 @@ async fn create_player_process(docker: &Docker) -> Result<(), bollard::errors::E
};
let serialized = serde_json::to_vec(&state).unwrap();
input.write_all(&serialized).await?;
input.write(b"\n").await?;
input.flush().await?;
let out = process.communicate(&serialized).await?;
while let Some(item) = output.next().await {
print!("{}", String::from_utf8(out.to_vec()).unwrap());
Ok(())
}
pub struct ContainerProcess {
stdin: Pin<Box<dyn AsyncWrite + Send>>,
output: Pin<Box<dyn Stream<Item = Result<LogOutput, bollard::errors::Error>>>>,
}
impl ContainerProcess {
pub async fn communicate(&mut self, input: &[u8]) -> io::Result<Bytes> {
self.write_line(input).await?;
self.read_line().await
}
async fn write_line(&mut self, bytes: &[u8]) -> io::Result<()> {
self.stdin.write_all(bytes).await?;
self.stdin.write_u8(b'\n').await?;
self.stdin.flush().await?;
Ok(())
}
async fn read_line(&mut self) -> io::Result<Bytes> {
while let Some(item) = self.output.next().await {
let log_output = item.expect("failed to get log output");
match log_output {
LogOutput::StdOut { message } => {
println!("stdout: {}", String::from_utf8_lossy(&message));
// TODO: this is not correct (buffering and such)
return Ok(message);
}
LogOutput::StdErr { message } => {
// TODO
println!("stderr: {}", String::from_utf8_lossy(&message));
}
_ => (),
}
}
Ok(())
Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"no response received",
))
}
}