cli for running matches

This commit is contained in:
Ilion Beyst 2021-12-25 14:45:05 +01:00
parent d0af8d3bbf
commit d0a0fcfded
9 changed files with 611 additions and 1 deletions

View file

@ -2,4 +2,5 @@
members = [
"planetwars-rules",
]
"planetwars-localdev",
]

View file

@ -0,0 +1,24 @@
[package]
name = "planetwars-localdev"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[[bin]]
name = "pwcli"
[dependencies]
futures-core = "0.3"
futures = "0.3"
tokio = { version = "1", features = ["full"] }
rand = "0.6"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
toml = "0.5"
planetwars-rules = { path = "../planetwars-rules" }
clap = { version = "3.0.0-rc.8", features = ["derive"] }
chrono = "0.4"
rust-embed = "6.3.0"
axum = "0.4"
mime_guess = "2"

View file

@ -0,0 +1,3 @@
# planetwars-localdev
Tools for developping planetwars bots locally.

View file

@ -0,0 +1,6 @@
use planetwars_localdev;
#[tokio::main]
async fn main() {
planetwars_localdev::run().await
}

View file

@ -0,0 +1,103 @@
use match_runner::{MatchBot, MatchConfig};
use serde::Deserialize;
mod match_runner;
use serde::Serialize;
use std::collections::HashMap;
use std::env;
use std::io;
use std::path::{Path, PathBuf};
use toml;
use clap::{Parser, Subcommand};
#[derive(Parser)]
#[clap(name = "pwcli")]
#[clap(author, version, about)]
struct Cli {
#[clap(subcommand)]
command: Commands,
}
#[derive(Subcommand)]
enum Commands {
/// Run a match
RunMatch(RunMatchCommand),
}
#[derive(Parser)]
struct RunMatchCommand {
/// map name
map: String,
/// bot names
bots: Vec<String>,
}
#[derive(Serialize, Deserialize, Debug)]
struct ProjectConfig {
bots: HashMap<String, BotConfig>,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct BotConfig {
path: String,
argv: Vec<String>,
}
pub async fn run() {
let matches = Cli::parse();
let res = match matches.command {
Commands::RunMatch(command) => run_match(command).await,
};
if let Err(err) = res {
eprintln!("{}", err);
std::process::exit(1);
}
}
async fn run_match(command: RunMatchCommand) -> io::Result<()> {
let project_dir = env::current_dir().unwrap();
let config_path = project_dir.join("pw_project.toml");
let map_path = project_dir.join(format!("maps/{}.json", command.map));
let timestamp = chrono::Local::now().format("%Y-%m-%d-%H-%M-%S");
let log_path = project_dir.join(format!("matches/{}.log", timestamp));
let config_str = std::fs::read_to_string(config_path).unwrap();
let project_config: ProjectConfig = toml::from_str(&config_str).unwrap();
let players = command
.bots
.into_iter()
.map(|bot_name| {
let bot_config = project_config.bots.get(&bot_name).unwrap().clone();
let resolved_config = resolve_bot_config(&project_dir, bot_config);
MatchBot {
name: bot_name,
bot_config: resolved_config,
}
})
.collect();
let match_config = MatchConfig {
map_path,
log_path,
players,
};
match_runner::run_match(match_config).await;
Ok(())
}
fn resolve_bot_config(project_dir: &Path, config: BotConfig) -> BotConfig {
let mut path = PathBuf::from(project_dir);
path.push(&config.path);
BotConfig {
path: path.to_str().unwrap().to_string(),
argv: config.argv,
}
}

View file

@ -0,0 +1,120 @@
use std::io;
use std::process::Stdio;
use std::sync::Arc;
use std::sync::Mutex;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, Lines};
use tokio::process;
use tokio::sync::mpsc;
use tokio::time::timeout;
use super::match_context::EventBus;
use super::match_context::PlayerHandle;
use super::match_context::RequestError;
use super::match_context::RequestMessage;
pub struct LocalBotHandle {
tx: mpsc::UnboundedSender<RequestMessage>,
}
impl PlayerHandle for LocalBotHandle {
fn send_request(&mut self, r: RequestMessage) {
self.tx
.send(r)
.expect("failed to send message to local bot");
}
fn send_info(&mut self, _msg: String) {
// TODO: log this somewhere
// drop info message
}
}
pub fn run_local_bot(player_id: u32, event_bus: Arc<Mutex<EventBus>>, bot: Bot) -> LocalBotHandle {
let (tx, rx) = mpsc::unbounded_channel();
let runner = LocalBotRunner {
event_bus,
rx,
player_id,
bot,
};
tokio::spawn(runner.run());
return LocalBotHandle { tx };
}
pub struct LocalBotRunner {
event_bus: Arc<Mutex<EventBus>>,
rx: mpsc::UnboundedReceiver<RequestMessage>,
player_id: u32,
bot: Bot,
}
impl LocalBotRunner {
pub async fn run(mut self) {
let mut process = self.bot.spawn_process();
while let Some(request) = self.rx.recv().await {
let resp_fut = process.communicate(&request.content);
let result = timeout(request.timeout, resp_fut)
.await
// TODO: how can this failure be handled cleanly?
.expect("process read failed");
let result = match result {
Ok(line) => Ok(line.into_bytes()),
Err(_elapsed) => Err(RequestError::Timeout),
};
let request_id = (self.player_id, request.request_id);
self.event_bus
.lock()
.unwrap()
.resolve_request(request_id, result);
}
}
}
#[derive(Debug, Clone)]
pub struct Bot {
pub working_dir: String,
pub argv: Vec<String>,
}
impl Bot {
pub fn spawn_process(&self) -> BotProcess {
let mut child = process::Command::new(&self.argv[0])
.args(&self.argv[1..])
.current_dir(self.working_dir.clone())
.kill_on_drop(true)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::inherit())
.spawn()
.expect("spawning failed");
let stdout = child.stdout.take().unwrap();
let reader = BufReader::new(stdout).lines();
return BotProcess {
stdin: child.stdin.take().unwrap(),
stdout: reader,
child,
};
}
}
pub struct BotProcess {
#[allow(dead_code)]
child: process::Child,
stdin: process::ChildStdin,
stdout: Lines<BufReader<process::ChildStdout>>,
}
impl BotProcess {
// TODO: gracefully handle errors
pub async fn communicate(&mut self, input: &[u8]) -> io::Result<String> {
self.stdin.write_all(input).await?;
self.stdin.write_u8(b'\n').await?;
let line = self.stdout.next_line().await?;
line.ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "no response received"))
}
}

View file

@ -0,0 +1,161 @@
use futures::task::{Context, Poll};
use futures::{future::Future, task::AtomicWaker};
use serde::{Deserialize, Serialize};
use std::fs::File;
use std::io::Write;
use std::pin::Pin;
use std::time::Duration;
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
#[derive(Serialize, Deserialize, Debug)]
pub struct RequestMessage {
pub request_id: u32,
pub timeout: Duration,
pub content: Vec<u8>,
}
pub struct MatchCtx {
event_bus: Arc<Mutex<EventBus>>,
players: HashMap<u32, PlayerData>,
// output: MsgStreamHandle<String>,
log_sink: File,
}
impl MatchCtx {
pub fn new(
event_bus: Arc<Mutex<EventBus>>,
players: HashMap<u32, Box<dyn PlayerHandle>>,
log_file: File,
// log: MsgStreamHandle<String>,
) -> Self {
MatchCtx {
event_bus,
players: players
.into_iter()
.map(|(id, handle)| {
let player_handle = PlayerData {
request_ctr: 0,
handle,
};
(id, player_handle)
})
.collect(),
log_sink: log_file,
}
}
// TODO: implement a clean way to handle the player not existing
pub fn request(&mut self, player_id: u32, content: Vec<u8>, timeout: Duration) -> Request {
let player = self.players.get_mut(&player_id).unwrap();
let request_id = player.request_ctr;
player.request_ctr += 1;
player.handle.send_request(RequestMessage {
request_id,
content,
timeout,
});
return Request {
player_id,
request_id,
event_bus: self.event_bus.clone(),
};
}
pub fn send_info(&mut self, player_id: u32, msg: String) {
let player = self.players.get_mut(&player_id).unwrap();
player.handle.send_info(msg);
}
pub fn players(&self) -> Vec<u32> {
self.players.keys().cloned().collect()
}
// this method should be used to emit log states etc.
pub fn log_string(&mut self, message: String) {
write!(self.log_sink, "{}\n", message).expect("failed to write to log file");
}
}
pub trait PlayerHandle: Send {
fn send_request(&mut self, r: RequestMessage);
fn send_info(&mut self, msg: String);
}
struct PlayerData {
request_ctr: u32,
handle: Box<dyn PlayerHandle>,
}
type RequestId = (u32, u32);
pub struct EventBus {
request_responses: HashMap<RequestId, RequestResult<Vec<u8>>>,
wakers: HashMap<RequestId, AtomicWaker>,
}
impl EventBus {
pub fn new() -> Self {
EventBus {
request_responses: HashMap::new(),
wakers: HashMap::new(),
}
}
}
impl EventBus {
pub fn resolve_request(&mut self, id: RequestId, result: RequestResult<Vec<u8>>) {
if self.request_responses.contains_key(&id) {
// request already resolved
// TODO: maybe report this?
return;
}
self.request_responses.insert(id, result);
if let Some(waker) = self.wakers.remove(&id) {
waker.wake();
}
}
}
pub struct Request {
player_id: u32,
request_id: u32,
event_bus: Arc<Mutex<EventBus>>,
}
impl Request {
#[allow(dead_code)]
pub fn player_id(&self) -> u32 {
self.player_id
}
}
impl Future for Request {
type Output = RequestResult<Vec<u8>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut event_bus = self.event_bus.lock().unwrap();
let request_id = (self.player_id, self.request_id);
if let Some(result) = event_bus.request_responses.get(&request_id) {
return Poll::Ready(result.clone());
}
event_bus
.wakers
.entry(request_id)
.or_insert_with(|| AtomicWaker::new())
.register(cx.waker());
return Poll::Pending;
}
}
#[derive(Debug, Clone)]
pub enum RequestError {
Timeout,
}
pub type RequestResult<T> = Result<T, RequestError>;

View file

@ -0,0 +1,56 @@
mod bot_runner;
mod match_context;
mod pw_match;
use std::{
path::PathBuf,
sync::{Arc, Mutex},
};
use match_context::MatchCtx;
use planetwars_rules::PwConfig;
use crate::BotConfig;
use self::match_context::{EventBus, PlayerHandle};
pub struct MatchConfig {
pub map_path: PathBuf,
pub log_path: PathBuf,
pub players: Vec<MatchBot>,
}
pub struct MatchBot {
pub name: String,
pub bot_config: BotConfig,
}
pub async fn run_match(config: MatchConfig) {
let pw_config = PwConfig {
map_file: config.map_path,
max_turns: 100,
};
let event_bus = Arc::new(Mutex::new(EventBus::new()));
// start bots
let players = config
.players
.iter()
.enumerate()
.map(|(player_id, bot)| {
let player_id = (player_id + 1) as u32;
let bot = bot_runner::Bot {
working_dir: bot.bot_config.path.clone(),
argv: bot.bot_config.argv.clone(),
};
let handle = bot_runner::run_local_bot(player_id, event_bus.clone(), bot);
(player_id, Box::new(handle) as Box<dyn PlayerHandle>)
})
.collect();
let log_file = std::fs::File::create(config.log_path).expect("could not create log file");
let match_ctx = MatchCtx::new(event_bus, players, log_file);
let match_state = pw_match::PwMatch::create(match_ctx, pw_config);
match_state.run().await;
}

View file

@ -0,0 +1,136 @@
use super::match_context::{MatchCtx, RequestResult};
use futures::stream::futures_unordered::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use serde::{Deserialize, Serialize};
use tokio::time::Duration;
use serde_json;
use std::convert::TryInto;
pub use planetwars_rules::config::{Config, Map};
use planetwars_rules::protocol::{self as proto, PlayerAction};
use planetwars_rules::serializer as pw_serializer;
use planetwars_rules::{PlanetWars, PwConfig};
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct MatchConfig {
pub map_name: String,
pub max_turns: usize,
}
pub struct PwMatch {
match_ctx: MatchCtx,
match_state: PlanetWars,
}
impl PwMatch {
pub fn create(match_ctx: MatchCtx, config: PwConfig) -> Self {
// TODO: this is kind of hacked together at the moment
let match_state = PlanetWars::create(config, match_ctx.players().len());
PwMatch {
match_state,
match_ctx,
}
}
pub async fn run(mut self) {
while !self.match_state.is_finished() {
let player_messages = self.prompt_players().await;
for (player_id, turn) in player_messages {
let res = self.execute_action(player_id, turn);
if let Some(err) = action_errors(res) {
let info_str = serde_json::to_string(&err).unwrap();
self.match_ctx.send_info(player_id as u32, info_str);
}
}
self.match_state.step();
// Log state
let state = self.match_state.serialize_state();
self.match_ctx
.log_string(serde_json::to_string(&state).unwrap());
}
}
async fn prompt_players(&mut self) -> Vec<(usize, RequestResult<Vec<u8>>)> {
// borrow these outside closure to make the borrow checker happy
let state = self.match_state.state();
let match_ctx = &mut self.match_ctx;
// TODO: this numbering is really messy.
// Get rid of the distinction between player_num
// and player_id.
self.match_state
.state()
.players
.iter()
.filter(|p| p.alive)
.map(move |player| {
let state_for_player = pw_serializer::serialize_rotated(&state, player.id - 1);
match_ctx
.request(
player.id.try_into().unwrap(),
serde_json::to_vec(&state_for_player).unwrap(),
Duration::from_millis(1000),
)
.map(move |resp| (player.id, resp))
})
.collect::<FuturesUnordered<_>>()
.collect::<Vec<_>>()
.await
}
fn execute_action(
&mut self,
player_num: usize,
turn: RequestResult<Vec<u8>>,
) -> proto::PlayerAction {
let turn = match turn {
Err(_timeout) => return proto::PlayerAction::Timeout,
Ok(data) => data,
};
let action: proto::Action = match serde_json::from_slice(&turn) {
Err(err) => return proto::PlayerAction::ParseError(err.to_string()),
Ok(action) => action,
};
let commands = action
.commands
.into_iter()
.map(|command| {
let res = self.match_state.execute_command(player_num, &command);
proto::PlayerCommand {
command,
error: res.err(),
}
})
.collect();
return proto::PlayerAction::Commands(commands);
}
}
fn action_errors(action: PlayerAction) -> Option<PlayerAction> {
match action {
PlayerAction::Commands(commands) => {
let failed = commands
.into_iter()
.filter(|cmd| cmd.error.is_some())
.collect::<Vec<_>>();
if failed.is_empty() {
None
} else {
Some(PlayerAction::Commands(failed))
}
}
e => Some(e),
}
}