planetwars.dev/planetwars-matchrunner/src/bot_runner.rs

117 lines
3.2 KiB
Rust
Raw Normal View History

2021-12-25 14:45:05 +01:00
use std::io;
2021-12-28 14:57:41 +01:00
use std::path::PathBuf;
2021-12-25 14:45:05 +01:00
use std::process::Stdio;
use std::sync::Arc;
use std::sync::Mutex;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, Lines};
use tokio::process;
use tokio::sync::mpsc;
use tokio::time::timeout;
use super::match_context::EventBus;
use super::match_context::PlayerHandle;
use super::match_context::RequestError;
use super::match_context::RequestMessage;
/// Handle used to submit requests to a bot that runs as a local process.
///
/// Instances are created by `run_local_bot`; every request sent through the
/// handle is forwarded over a channel to the `LocalBotRunner` task.
pub struct LocalBotHandle {
/// Sending half of the unbounded request channel read by the runner task.
tx: mpsc::UnboundedSender<RequestMessage>,
}
impl PlayerHandle for LocalBotHandle {
    /// Queues `r` for the runner task that owns the bot process.
    ///
    /// # Panics
    /// Panics if the channel to the runner is closed, i.e. the receiving
    /// side no longer exists.
    fn send_request(&mut self, r: RequestMessage) {
        let delivery = self.tx.send(r);
        delivery.expect("failed to send message to local bot");
    }
}
/// Starts a background task that runs `bot` for the given player and returns
/// the handle used to send it requests.
///
/// The spawned task is detached: its join handle is dropped immediately.
/// Results of requests are reported through `event_bus`.
pub fn run_local_bot(player_id: u32, event_bus: Arc<Mutex<EventBus>>, bot: Bot) -> LocalBotHandle {
    let (request_tx, request_rx) = mpsc::unbounded_channel();

    // Build the runner in place and hand its future straight to the runtime.
    tokio::spawn(
        LocalBotRunner {
            event_bus,
            rx: request_rx,
            player_id,
            bot,
        }
        .run(),
    );

    LocalBotHandle { tx: request_tx }
}
/// State owned by the background task that drives a single local bot.
///
/// Constructed in `run_local_bot` and consumed by `LocalBotRunner::run`.
pub struct LocalBotRunner {
/// Shared event bus on which the outcome of each request is resolved.
event_bus: Arc<Mutex<EventBus>>,
/// Receiving half of the request channel; the sender lives in `LocalBotHandle`.
rx: mpsc::UnboundedReceiver<RequestMessage>,
/// Player identifier; paired with each request's id when resolving it.
player_id: u32,
/// Description of the program that is spawned when the runner starts.
bot: Bot,
}
impl LocalBotRunner {
    /// Spawns the bot process and serves requests until the request channel
    /// is closed.
    ///
    /// For every received request the content is sent to the bot and one line
    /// of output is awaited, bounded by the request's timeout. The outcome is
    /// then resolved on the event bus under `(player_id, request_id)`.
    ///
    /// # Panics
    /// Panics if the bot process cannot be spawned or if the event bus mutex
    /// is poisoned.
    pub async fn run(mut self) {
        let mut process = self.bot.spawn_process();

        while let Some(request) = self.rx.recv().await {
            let resp_fut = process.communicate(&request.content);

            // `timeout` wraps the future's own output, so there are two layers:
            // the outer `Err` means the deadline elapsed, the inner `Err` is an
            // I/O failure while talking to the process. A slow bot must resolve
            // the request with `Timeout` instead of panicking this task.
            let result = match timeout(request.timeout, resp_fut).await {
                Ok(Ok(line)) => Ok(line.into_bytes()),
                // TODO: how can this failure be handled cleanly?
                // I/O failures keep being reported as a timeout, because that
                // is the only error the requester is known to handle here.
                Ok(Err(_io_error)) => Err(RequestError::Timeout),
                Err(_elapsed) => Err(RequestError::Timeout),
            };

            let request_id = (self.player_id, request.request_id);
            // The guard is a temporary of this statement, so the lock is
            // released before the next `.await`.
            self.event_bus
                .lock()
                .unwrap()
                .resolve_request(request_id, result);
        }
    }
}
/// Describes how to launch a bot as a local process.
#[derive(Debug, Clone)]
pub struct Bot {
    /// Directory the bot process is started in.
    pub working_dir: PathBuf,
    /// Command line of the bot: the program followed by its arguments.
    pub argv: Vec<String>,
}
impl Bot {
    /// Launches the bot described by `argv` inside `working_dir`.
    ///
    /// The child's stdin and stdout are piped so that requests can be written
    /// to it and responses read back line by line; stderr is inherited from
    /// the current process. The child is configured with `kill_on_drop`, so it
    /// is killed when the returned `BotProcess` is dropped.
    ///
    /// # Panics
    /// Panics if `argv` is empty or if the process cannot be spawned.
    pub fn spawn_process(&self) -> BotProcess {
        // Split program and arguments up front so an empty command line fails
        // with a descriptive message instead of an index-out-of-bounds panic.
        let (program, args) = self
            .argv
            .split_first()
            .expect("bot argv must contain at least the program name");

        let mut child = process::Command::new(program)
            .args(args)
            .current_dir(&self.working_dir)
            .kill_on_drop(true)
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .stderr(Stdio::inherit())
            .spawn()
            .expect("spawning failed");

        // Both handles exist because the streams were configured as piped above.
        let stdout = child.stdout.take().expect("child stdout was configured as piped");
        let stdin = child.stdin.take().expect("child stdin was configured as piped");

        BotProcess {
            stdin,
            stdout: BufReader::new(stdout).lines(),
            child,
        }
    }
}
/// A running bot process together with the pipes used to talk to it.
pub struct BotProcess {
// Never read, but kept so the child handle lives as long as this struct;
// `Bot::spawn_process` sets `kill_on_drop(true)`, so dropping the handle
// kills the process.
#[allow(dead_code)]
child: process::Child,
/// Pipe to the bot's standard input; requests are written here.
stdin: process::ChildStdin,
/// Line-oriented reader over the bot's standard output.
stdout: Lines<BufReader<process::ChildStdout>>,
}
impl BotProcess {
    /// Sends `input` followed by a newline to the bot and waits for one line
    /// of output.
    ///
    /// # Errors
    /// Returns any I/O error raised while writing or reading, and an
    /// `UnexpectedEof` error when the bot's output ends without a line.
    // TODO: gracefully handle errors
    pub async fn communicate(&mut self, input: &[u8]) -> io::Result<String> {
        // Request payload first, then the line terminator.
        self.stdin.write_all(input).await?;
        self.stdin.write_u8(b'\n').await?;

        match self.stdout.next_line().await? {
            Some(response) => Ok(response),
            None => Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "no response received",
            )),
        }
    }
}