client: don't freeze when bot does not produce output

This commit is contained in:
Ilion Beyst 2022-11-27 11:26:38 +01:00
parent 2b71ca625e
commit c7d9734067
2 changed files with 89 additions and 21 deletions

View file

@ -7,12 +7,21 @@ pub mod pb {
use clap::Parser;
use pb::client_api_service_client::ClientApiServiceClient;
use planetwars_matchrunner::bot_runner::Bot;
use planetwars_matchrunner::bot_runner::{Bot, BotProcess};
use serde::Deserialize;
use std::{path::PathBuf, time::Duration};
use tokio::sync::mpsc;
use std::{
collections::VecDeque,
path::PathBuf,
sync::{Arc, Mutex},
time::Duration,
};
use tokio::{
io::{AsyncWriteExt, BufReader, Lines},
process::{ChildStdin, ChildStdout},
sync::mpsc::{self, UnboundedSender},
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::{metadata::MetadataValue, transport::Channel, Request, Status};
use tonic::{metadata::MetadataValue, transport::Channel, Request, Status, Streaming};
#[derive(clap::Parser)]
struct PlayMatch {
@ -124,7 +133,11 @@ async fn run_player(
Ok(req)
});
let mut bot_process = Bot {
let BotProcess {
child: _child,
stdin,
stdout,
} = Bot {
working_dir: PathBuf::from(
bot_config
.working_directory
@ -134,27 +147,57 @@ async fn run_player(
}
.spawn_process();
let state = Arc::new(Mutex::new(BotRunnerState {
request_queue: VecDeque::new(),
}));
let (tx, rx) = mpsc::unbounded_channel();
let mut stream = client
let stream = client
.connect_player(UnboundedReceiverStream::new(rx))
.await
.unwrap()
.into_inner();
let output_handle = tokio::spawn(handle_bot_output(state.clone(), tx, stdout));
let input_handle = tokio::spawn(handle_server_messages(state.clone(), stream, stdin));
output_handle
.await
.unwrap()
.map_err(RunPlayerError::RunBotError)?;
input_handle
.await
.unwrap()
.map_err(RunPlayerError::RunBotError)?;
Ok(())
}
struct BotRunnerState {
request_queue: VecDeque<i32>,
}
async fn handle_server_messages(
runner_state: Arc<Mutex<BotRunnerState>>,
mut stream: Streaming<pb::PlayerApiServerMessage>,
mut stdin: ChildStdin,
) -> io::Result<()> {
while let Some(message) = stream.message().await.unwrap() {
match message.server_message {
Some(pb::PlayerApiServerMessageType::ActionRequest(req)) => {
let moves = bot_process
.communicate(&req.content)
.await
.map_err(RunPlayerError::RunBotError)?;
let action = pb::PlayerAction {
action_request_id: req.action_request_id,
content: moves.as_bytes().to_vec(),
};
let msg = pb::PlayerApiClientMessage {
client_message: Some(pb::PlayerApiClientMessageType::Action(action)),
};
tx.send(msg).unwrap();
{
let mut state = runner_state.lock().unwrap();
if !state.request_queue.is_empty() {
eprintln!("[WARN] new turn started before bot output was received");
eprintln!(
"[WARN] this could be due to your bot taking too long, \
or failing to flush output buffers."
);
}
state.request_queue.push_back(req.action_request_id);
}
stdin.write_all(&req.content).await?;
stdin.write_u8(b'\n').await?;
}
_ => {} // pass
}
@ -162,3 +205,28 @@ async fn run_player(
Ok(())
}
use std::io;
async fn handle_bot_output(
runner_state: Arc<Mutex<BotRunnerState>>,
tx: UnboundedSender<pb::PlayerApiClientMessage>,
mut bot_stdout: Lines<BufReader<ChildStdout>>,
) -> io::Result<()> {
while let Some(line) = bot_stdout.next_line().await? {
if let Some(request_id) = runner_state.lock().unwrap().request_queue.pop_front() {
let action = pb::PlayerAction {
action_request_id: request_id,
content: line.as_bytes().to_vec(),
};
let msg = pb::PlayerApiClientMessage {
client_message: Some(pb::PlayerApiClientMessageType::Action(action)),
};
tx.send(msg).unwrap();
} else {
eprintln!("[WARN] bot issued commands before a gamestate was received");
}
}
Ok(())
}

View file

@ -108,9 +108,9 @@ impl Bot {
pub struct BotProcess {
#[allow(dead_code)]
child: process::Child,
stdin: process::ChildStdin,
stdout: Lines<BufReader<process::ChildStdout>>,
pub child: process::Child,
pub stdin: process::ChildStdin,
pub stdout: Lines<BufReader<process::ChildStdout>>,
}
impl BotProcess {