From 3637cfb56341b65203e4be3c37000e182692f51c Mon Sep 17 00:00:00 2001 From: Daniel Bulant Date: Sun, 28 May 2023 12:22:46 +0200 Subject: [PATCH] Working version --- Cargo.lock | 30 ++++++++++++++++++++++++++++++ Cargo.toml | 3 ++- src/main.rs | 50 +++++++++++++++++++++++++++++++++++++++++--------- 3 files changed, 73 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e0eee11..24d3c5e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -117,6 +117,15 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" +[[package]] +name = "memoffset" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4" +dependencies = [ + "autocfg", +] + [[package]] name = "mio" version = "0.8.6" @@ -129,6 +138,20 @@ dependencies = [ "windows-sys 0.45.0", ] +[[package]] +name = "nix" +version = "0.26.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfdda3d196821d6af13126e40375cdf7da646a96114af134d5f417a9a1dc8e1a" +dependencies = [ + "bitflags", + "cfg-if", + "libc", + "memoffset", + "pin-utils", + "static_assertions", +] + [[package]] name = "num_cpus" version = "1.15.0" @@ -251,6 +274,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "syn" version = "2.0.18" @@ -267,6 +296,7 @@ name = "tcp-spawner" version = "0.1.0" dependencies = [ "futures-util", + "nix", "os_pipe", "tokio", "tokio-stream", diff --git a/Cargo.toml b/Cargo.toml index 3140bd5..2925224 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,4 +10,5 @@ tokio = { version = "1", features = ["full"] } tokio-stream = "0.1" os_pipe = "1.1.4" tokio-util = { version = "0.7.8", features = ["compat"] } -futures-util = { version = "0.3.28", features = ["io"] } \ No newline at end of file +futures-util = { version = "0.3.28", features = ["io"] } +nix = { version = "0.26.2", features = ["signal"] } \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 3924aa0..25e59ab 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,15 +1,19 @@ use std::env::args; use std::net::TcpListener; +use std::time::Duration; use tokio::process::Command; -use tokio::io; +use tokio::{io, select}; use futures_util::io::AllowStdIo; - +use nix::unistd::Pid; +use nix::sys::signal::{self, Signal}; fn build_command_from_args(args: &[String]) -> Command { let mut command = Command::new(args.get(0).unwrap()); command.stdout(std::process::Stdio::piped()); command.stderr(std::process::Stdio::piped()); + command.stdin(std::process::Stdio::piped()); + command.kill_on_drop(true); for arg in args.iter().skip(1) { command.arg(arg); @@ -21,21 +25,26 @@ fn build_command_from_args(args: &[String]) -> Command { #[tokio::main] /// This is a simple TCP server that will spawn a process for each connection /// and pipe the connection to the process stdin and stdout. -/// The first argument is the port to listen on. +/// The first argument is the address and port to listen on. /// The second argument (and later arguments) is the command to run. async fn main() -> io::Result<()> { let args = args().collect::>(); let listener = TcpListener::bind(args.get(1).unwrap())?; let listener = tokio::net::TcpListener::from_std(listener)?; + println!("Listening on {}", args.get(1).unwrap()); + + // Haxagon compatibility + println!("SCENARIO_IS_READY"); + loop { - let (mut stream, _) = listener.accept().await.unwrap(); + let (stream, addr) = listener.accept().await.unwrap(); let mut command = build_command_from_args(args.get(2..).unwrap()); - println!("Got connection from {:?}", stream.peer_addr()); + println!("Got connection from {addr:?}"); tokio::spawn(async move { - let (mut stream_reader, mut stream_writer) = stream.split(); + let (mut stream_reader, mut stream_writer) = stream.into_split(); let (child_reader, child_writer) = os_pipe::pipe().unwrap(); let writer_clone = child_writer.try_clone().unwrap(); @@ -45,18 +54,41 @@ async fn main() -> io::Result<()> { let child_reader = AllowStdIo::new(child_reader); let mut child_reader = tokio_util::compat::FuturesAsyncReadCompatExt::compat(child_reader); + eprintln!("Starting command {:?}", command.as_std()); + let mut child = command.spawn().unwrap(); let mut stdin = child.stdin.take().unwrap(); let copy_stdin = io::copy(&mut stream_reader, &mut stdin); - let copy_output = io::copy(&mut child_reader, &mut stream_writer); + let copy_output = tokio::spawn(async move { + io::copy(&mut child_reader, &mut stream_writer).await.unwrap(); + }); drop(command); - tokio::try_join!(copy_stdin, copy_output).unwrap(); + // wait for first to complete + select!{biased; + _ = child.wait() => {}, + _ = copy_output => {}, + _ = copy_stdin => {} + } - child.wait().await.unwrap(); + eprintln!("Connection from {addr:?} closed"); + + if child.try_wait().unwrap().is_none() { + signal::kill(Pid::from_raw(child.id().unwrap() as i32), Signal::SIGTERM).unwrap(); + + select!{ + _ = tokio::time::sleep(Duration::from_millis(100)) => { + child.kill().await.unwrap(); + }, + _ = child.wait() => {} + } + } + + let res = child.wait().await.unwrap(); + eprintln!("Command exited with {:?}", res.code().unwrap_or(-1)); }); } }