From c7d973406790056451af2af24630f03409a9647e Mon Sep 17 00:00:00 2001 From: Ilion Beyst Date: Sun, 27 Nov 2022 11:26:38 +0100 Subject: [PATCH] client: don't freeze when bot does not produce output --- planetwars-client/src/main.rs | 104 +++++++++++++++++++---- planetwars-matchrunner/src/bot_runner.rs | 6 +- 2 files changed, 89 insertions(+), 21 deletions(-) diff --git a/planetwars-client/src/main.rs b/planetwars-client/src/main.rs index 6ae2a31..5c9ee44 100644 --- a/planetwars-client/src/main.rs +++ b/planetwars-client/src/main.rs @@ -7,12 +7,21 @@ pub mod pb { use clap::Parser; use pb::client_api_service_client::ClientApiServiceClient; -use planetwars_matchrunner::bot_runner::Bot; +use planetwars_matchrunner::bot_runner::{Bot, BotProcess}; use serde::Deserialize; -use std::{path::PathBuf, time::Duration}; -use tokio::sync::mpsc; +use std::{ + collections::VecDeque, + path::PathBuf, + sync::{Arc, Mutex}, + time::Duration, +}; +use tokio::{ + io::{AsyncWriteExt, BufReader, Lines}, + process::{ChildStdin, ChildStdout}, + sync::mpsc::{self, UnboundedSender}, +}; use tokio_stream::wrappers::UnboundedReceiverStream; -use tonic::{metadata::MetadataValue, transport::Channel, Request, Status}; +use tonic::{metadata::MetadataValue, transport::Channel, Request, Status, Streaming}; #[derive(clap::Parser)] struct PlayMatch { @@ -124,7 +133,11 @@ async fn run_player( Ok(req) }); - let mut bot_process = Bot { + let BotProcess { + child: _child, + stdin, + stdout, + } = Bot { working_dir: PathBuf::from( bot_config .working_directory @@ -134,27 +147,57 @@ async fn run_player( } .spawn_process(); + let state = Arc::new(Mutex::new(BotRunnerState { + request_queue: VecDeque::new(), + })); + let (tx, rx) = mpsc::unbounded_channel(); - let mut stream = client + let stream = client .connect_player(UnboundedReceiverStream::new(rx)) .await .unwrap() .into_inner(); + + let output_handle = tokio::spawn(handle_bot_output(state.clone(), tx, stdout)); + let input_handle = tokio::spawn(handle_server_messages(state.clone(), stream, stdin)); + + output_handle + .await + .unwrap() + .map_err(RunPlayerError::RunBotError)?; + input_handle + .await + .unwrap() + .map_err(RunPlayerError::RunBotError)?; + + Ok(()) +} + +struct BotRunnerState { + request_queue: VecDeque, +} + +async fn handle_server_messages( + runner_state: Arc>, + mut stream: Streaming, + mut stdin: ChildStdin, +) -> io::Result<()> { while let Some(message) = stream.message().await.unwrap() { match message.server_message { Some(pb::PlayerApiServerMessageType::ActionRequest(req)) => { - let moves = bot_process - .communicate(&req.content) - .await - .map_err(RunPlayerError::RunBotError)?; - let action = pb::PlayerAction { - action_request_id: req.action_request_id, - content: moves.as_bytes().to_vec(), - }; - let msg = pb::PlayerApiClientMessage { - client_message: Some(pb::PlayerApiClientMessageType::Action(action)), - }; - tx.send(msg).unwrap(); + { + let mut state = runner_state.lock().unwrap(); + if !state.request_queue.is_empty() { + eprintln!("[WARN] new turn started before bot output was received"); + eprintln!( + "[WARN] this could be due to your bot taking too long, \ + or failing to flush output buffers." + ); + } + state.request_queue.push_back(req.action_request_id); + } + stdin.write_all(&req.content).await?; + stdin.write_u8(b'\n').await?; } _ => {} // pass } @@ -162,3 +205,28 @@ async fn run_player( Ok(()) } + +use std::io; + +async fn handle_bot_output( + runner_state: Arc>, + tx: UnboundedSender, + mut bot_stdout: Lines>, +) -> io::Result<()> { + while let Some(line) = bot_stdout.next_line().await? { + if let Some(request_id) = runner_state.lock().unwrap().request_queue.pop_front() { + let action = pb::PlayerAction { + action_request_id: request_id, + content: line.as_bytes().to_vec(), + }; + let msg = pb::PlayerApiClientMessage { + client_message: Some(pb::PlayerApiClientMessageType::Action(action)), + }; + tx.send(msg).unwrap(); + } else { + eprintln!("[WARN] bot issued commands before a gamestate was received"); + } + } + + Ok(()) +} diff --git a/planetwars-matchrunner/src/bot_runner.rs b/planetwars-matchrunner/src/bot_runner.rs index 8597e26..f7c0742 100644 --- a/planetwars-matchrunner/src/bot_runner.rs +++ b/planetwars-matchrunner/src/bot_runner.rs @@ -108,9 +108,9 @@ impl Bot { pub struct BotProcess { #[allow(dead_code)] - child: process::Child, - stdin: process::ChildStdin, - stdout: Lines>, + pub child: process::Child, + pub stdin: process::ChildStdin, + pub stdout: Lines>, } impl BotProcess {