diff --git a/planetwars-client/Cargo.toml b/planetwars-client/Cargo.toml index 10de887..52c3c64 100644 --- a/planetwars-client/Cargo.toml +++ b/planetwars-client/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" [dependencies] tokio = { version = "1.15", features = ["full"] } +tokio-stream = "0.1.9" prost = "0.10" tonic = "0.7.2" diff --git a/planetwars-client/src/main.rs b/planetwars-client/src/main.rs index 9d9bdab..d995ebc 100644 --- a/planetwars-client/src/main.rs +++ b/planetwars-client/src/main.rs @@ -2,20 +2,34 @@ pub mod pb { tonic::include_proto!("grpc.planetwars.bot_api"); } -use pb::test_service_client::TestServiceClient; -use pb::{Hello, HelloResponse}; -use tonic::Response; +use pb::bot_api_service_client::BotApiServiceClient; +use tokio_stream::wrappers::UnboundedReceiverStream; + +use tokio::sync::mpsc; #[tokio::main] async fn main() { - let mut client = TestServiceClient::connect("http://localhost:50051") + let mut client = BotApiServiceClient::connect("http://localhost:50051") .await .unwrap(); - let response: Response = client - .greet(Hello { - hello_message: "robbe".to_string(), + + let (tx, rx) = mpsc::unbounded_channel(); + let mut stream = client + .connect_bot(UnboundedReceiverStream::new(rx)) + .await + .unwrap() + .into_inner(); + while let Some(message) = stream.message().await.unwrap() { + let state = String::from_utf8(message.content).unwrap(); + println!("{}", state); + let response = r#"{ moves: [] }"#; + tx.send(pb::PlayerRequestResponse { + request_id: message.request_id, + content: response.as_bytes().to_vec(), }) - .await .unwrap(); - println!("{}", response.get_ref().response); + } + std::mem::drop(tx); + // for clean exit + std::mem::drop(client); } diff --git a/planetwars-server/Cargo.toml b/planetwars-server/Cargo.toml index 6b96b04..0ceabbc 100644 --- a/planetwars-server/Cargo.toml +++ b/planetwars-server/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" [dependencies] tokio = { version = "1.15", features = ["full"] } +tokio-stream = "0.1.9" hyper = "0.14" axum = { version = "0.4", features = ["json", "headers", "multipart"] } diesel = { version = "1.4.4", features = ["postgres", "chrono"] } diff --git a/planetwars-server/src/modules/bot_api.rs b/planetwars-server/src/modules/bot_api.rs index 1941136..0f1ff82 100644 --- a/planetwars-server/src/modules/bot_api.rs +++ b/planetwars-server/src/modules/bot_api.rs @@ -3,27 +3,157 @@ pub mod pb { } use std::net::SocketAddr; +use std::ops::DerefMut; +use std::path::PathBuf; +use std::sync::{Arc, Mutex}; +use runner::match_context::{EventBus, PlayerHandle, RequestMessage}; +use runner::match_log::MatchLogger; +use tokio::sync::{mpsc, oneshot}; +use tokio_stream::wrappers::UnboundedReceiverStream; use tonic; use tonic::transport::Server; -use tonic::{Request, Response, Status}; +use tonic::{Request, Response, Status, Streaming}; -pub struct BotApiServer {} +use planetwars_matchrunner as runner; + +use crate::db; +use crate::{ConnectionPool, MAPS_DIR, MATCHES_DIR}; + +use super::matches::code_bundle_to_botspec; + +pub struct BotApiServer { + sync_thing: ServerSyncThing, +} #[tonic::async_trait] -impl pb::test_service_server::TestService for BotApiServer { - async fn greet(&self, req: Request) -> Result, Status> { - Ok(Response::new(pb::HelloResponse { - response: format!("hallo {}", req.get_ref().hello_message), - })) +impl pb::bot_api_service_server::BotApiService for BotApiServer { + type ConnectBotStream = UnboundedReceiverStream>; + + async fn connect_bot( + &self, + req: Request>, + ) -> Result, Status> { + println!("bot connected"); + let stream = req.into_inner(); + let sync_data = self.sync_thing.streams.lock().unwrap().take().unwrap(); + sync_data.tx.send(stream).unwrap(); + Ok(Response::new(UnboundedReceiverStream::new( + sync_data.server_messages, + ))) } } -pub async fn run_bot_api() { - let server = BotApiServer {}; +#[derive(Clone)] +struct ServerSyncThing { + streams: Arc>>, +} + +struct SyncThingData { + tx: oneshot::Sender>, + server_messages: mpsc::UnboundedReceiver>, +} + +impl ServerSyncThing { + fn new() -> Self { + ServerSyncThing { + streams: Arc::new(Mutex::new(None)), + } + } +} + +struct RemoteBotSpec { + sync_thing: ServerSyncThing, +} + +#[tonic::async_trait] +impl runner::BotSpec for RemoteBotSpec { + async fn run_bot( + &self, + player_id: u32, + event_bus: Arc>, + _match_logger: MatchLogger, + ) -> Box { + let (tx, rx) = oneshot::channel(); + let (server_msg_snd, server_msg_recv) = mpsc::unbounded_channel(); + *self.sync_thing.streams.lock().unwrap().deref_mut() = Some(SyncThingData { + tx, + server_messages: server_msg_recv, + }); + + let client_messages = rx.await.unwrap(); + tokio::spawn(handle_bot_messages(player_id, event_bus, client_messages)); + + Box::new(RemoteBotHandle { + sender: server_msg_snd, + }) + } +} + +async fn handle_bot_messages( + player_id: u32, + event_bus: Arc>, + mut messages: Streaming, +) { + while let Some(message) = messages.message().await.unwrap() { + let request_id = (player_id, message.request_id as u32); + event_bus + .lock() + .unwrap() + .resolve_request(request_id, Ok(message.content)); + } +} + +struct RemoteBotHandle { + sender: mpsc::UnboundedSender>, +} + +impl PlayerHandle for RemoteBotHandle { + fn send_request(&mut self, r: RequestMessage) { + self.sender + .send(Ok(pb::PlayerRequest { + request_id: r.request_id as i32, + content: r.content, + })) + .unwrap(); + } +} + +async fn run_match(sync_thing: ServerSyncThing, pool: ConnectionPool) { + let conn = pool.get().await.unwrap(); + + let opponent = db::bots::find_bot_by_name("simplebot", &conn).unwrap(); + let opponent_code_bundle = db::bots::active_code_bundle(opponent.id, &conn).unwrap(); + + let log_file_name = "remote_match.log"; + + let remote_bot_spec = RemoteBotSpec { sync_thing }; + + let match_config = runner::MatchConfig { + map_path: PathBuf::from(MAPS_DIR).join("hex.json"), + map_name: "hex".to_string(), + log_path: PathBuf::from(MATCHES_DIR).join(&log_file_name), + players: vec![ + runner::MatchPlayer { + bot_spec: Box::new(remote_bot_spec), + }, + runner::MatchPlayer { + bot_spec: code_bundle_to_botspec(&opponent_code_bundle), + }, + ], + }; + + runner::run_match(match_config).await; +} + +pub async fn run_bot_api(pool: ConnectionPool) { + let sync_thing = ServerSyncThing::new(); + tokio::spawn(run_match(sync_thing.clone(), pool)); + let server = BotApiServer { sync_thing }; + let addr = SocketAddr::from(([127, 0, 0, 1], 50051)); Server::builder() - .add_service(pb::test_service_server::TestServiceServer::new(server)) + .add_service(pb::bot_api_service_server::BotApiServiceServer::new(server)) .serve(addr) .await .unwrap()