Working version

This commit is contained in:
Daniel Bulant 2023-05-28 12:22:46 +02:00
parent 03bbbbddc4
commit 3637cfb563
3 changed files with 73 additions and 10 deletions

30
Cargo.lock generated
View file

@ -117,6 +117,15 @@ version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
[[package]]
name = "memoffset"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4"
dependencies = [
"autocfg",
]
[[package]]
name = "mio"
version = "0.8.6"
@ -129,6 +138,20 @@ dependencies = [
"windows-sys 0.45.0",
]
[[package]]
name = "nix"
version = "0.26.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfdda3d196821d6af13126e40375cdf7da646a96114af134d5f417a9a1dc8e1a"
dependencies = [
"bitflags",
"cfg-if",
"libc",
"memoffset",
"pin-utils",
"static_assertions",
]
[[package]]
name = "num_cpus"
version = "1.15.0"
@ -251,6 +274,12 @@ dependencies = [
"winapi",
]
[[package]]
name = "static_assertions"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "syn"
version = "2.0.18"
@ -267,6 +296,7 @@ name = "tcp-spawner"
version = "0.1.0"
dependencies = [
"futures-util",
"nix",
"os_pipe",
"tokio",
"tokio-stream",

View file

@ -10,4 +10,5 @@ tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"
os_pipe = "1.1.4"
tokio-util = { version = "0.7.8", features = ["compat"] }
futures-util = { version = "0.3.28", features = ["io"] }
futures-util = { version = "0.3.28", features = ["io"] }
nix = { version = "0.26.2", features = ["signal"] }

View file

@ -1,15 +1,19 @@
use std::env::args;
use std::net::TcpListener;
use std::time::Duration;
use tokio::process::Command;
use tokio::io;
use tokio::{io, select};
use futures_util::io::AllowStdIo;
use nix::unistd::Pid;
use nix::sys::signal::{self, Signal};
fn build_command_from_args(args: &[String]) -> Command {
let mut command = Command::new(args.get(0).unwrap());
command.stdout(std::process::Stdio::piped());
command.stderr(std::process::Stdio::piped());
command.stdin(std::process::Stdio::piped());
command.kill_on_drop(true);
for arg in args.iter().skip(1) {
command.arg(arg);
@ -21,21 +25,26 @@ fn build_command_from_args(args: &[String]) -> Command {
#[tokio::main]
/// This is a simple TCP server that will spawn a process for each connection
/// and pipe the connection to the process stdin and stdout.
/// The first argument is the port to listen on.
/// The first argument is the address and port to listen on.
/// The second argument (and later arguments) is the command to run.
async fn main() -> io::Result<()> {
let args = args().collect::<Vec<String>>();
let listener = TcpListener::bind(args.get(1).unwrap())?;
let listener = tokio::net::TcpListener::from_std(listener)?;
println!("Listening on {}", args.get(1).unwrap());
// Haxagon compatibility
println!("SCENARIO_IS_READY");
loop {
let (mut stream, _) = listener.accept().await.unwrap();
let (stream, addr) = listener.accept().await.unwrap();
let mut command = build_command_from_args(args.get(2..).unwrap());
println!("Got connection from {:?}", stream.peer_addr());
println!("Got connection from {addr:?}");
tokio::spawn(async move {
let (mut stream_reader, mut stream_writer) = stream.split();
let (mut stream_reader, mut stream_writer) = stream.into_split();
let (child_reader, child_writer) = os_pipe::pipe().unwrap();
let writer_clone = child_writer.try_clone().unwrap();
@ -45,18 +54,41 @@ async fn main() -> io::Result<()> {
let child_reader = AllowStdIo::new(child_reader);
let mut child_reader = tokio_util::compat::FuturesAsyncReadCompatExt::compat(child_reader);
eprintln!("Starting command {:?}", command.as_std());
let mut child = command.spawn().unwrap();
let mut stdin = child.stdin.take().unwrap();
let copy_stdin = io::copy(&mut stream_reader, &mut stdin);
let copy_output = io::copy(&mut child_reader, &mut stream_writer);
let copy_output = tokio::spawn(async move {
io::copy(&mut child_reader, &mut stream_writer).await.unwrap();
});
drop(command);
tokio::try_join!(copy_stdin, copy_output).unwrap();
// wait for first to complete
select!{biased;
_ = child.wait() => {},
_ = copy_output => {},
_ = copy_stdin => {}
}
child.wait().await.unwrap();
eprintln!("Connection from {addr:?} closed");
if child.try_wait().unwrap().is_none() {
signal::kill(Pid::from_raw(child.id().unwrap() as i32), Signal::SIGTERM).unwrap();
select!{
_ = tokio::time::sleep(Duration::from_millis(100)) => {
child.kill().await.unwrap();
},
_ = child.wait() => {}
}
}
let res = child.wait().await.unwrap();
eprintln!("Command exited with {:?}", res.code().unwrap_or(-1));
});
}
}