Skip to content

Commit

Permalink
fix(plugin-driver): use std:process over tokio for plugin loads on al…
Browse files Browse the repository at this point in the history
…l os

fixes tokio-rs/tokio#3520

Signals do not add SA_ONSTACK which may cause linked Go applications to crash.

fixes #68
fixes pact-foundation/pact-go#269

note: std::process unable to read pact-avro-plugin startup message see #68 for detail
  • Loading branch information
YOU54F committed Jul 17, 2024
1 parent 9ad1fb1 commit a7e0302
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 215 deletions.
56 changes: 30 additions & 26 deletions drivers/rust/driver/src/child_process.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
//! Module for managing running child processes (non-Windows version)
//! Module for managing running child processes
use std::io::{BufRead, BufReader};
use std::process::Child;
use std::sync::mpsc::channel;
use std::time::Duration;

use anyhow::anyhow;
use serde::{Deserialize, Serialize};
use sysinfo::{Pid, Signal, System};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Child;
use sysinfo::{Pid, System};
use tracing::{debug, error, trace, warn};

use crate::plugin_models::PactPluginManifest;
Expand All @@ -32,29 +31,27 @@ impl ChildPluginProcess {
/// Start the child process and try read the startup JSON message from its standard output.
pub async fn new(mut child: Child, manifest: &PactPluginManifest) -> anyhow::Result<Self> {
let (tx, rx) = channel();
let child_pid = child.id()
.ok_or_else(|| anyhow!("Could not get the child process ID"))?;
let child_pid = child.id();
let child_out = child.stdout.take()
.ok_or_else(|| anyhow!("Could not get the child process standard output stream"))?;
let child_err = child.stderr.take()
.ok_or_else(|| anyhow!("Could not get the child process standard error stream"))?;

trace!("Starting output polling tasks...");

let mfso = manifest.clone();
tokio::task::spawn(async move {
trace!("Starting task to poll plugin stdout");
let plugin_name = manifest.name.clone();
std::thread::spawn(move || {
trace!("Starting thread to poll plugin STDOUT");

let mut startup_read = false;
let reader = BufReader::new(child_out);
let mut lines = reader.lines();
let plugin_name = mfso.name.as_str();
while let Ok(line) = lines.next_line().await {
if let Some(line) = line {
let mut reader = BufReader::new(child_out);
let mut line = String::with_capacity(256);
while let Ok(chars_read) = reader.read_line(&mut line) {
if chars_read > 0 {
debug!("Plugin({}, {}, STDOUT) || {}", plugin_name, child_pid, line);
if !startup_read {
let line = line.trim();
if line.starts_with("{") {
startup_read = true;
match serde_json::from_str::<RunningPluginInfo>(line) {
Ok(plugin_info) => {
tx.send(Ok(ChildPluginProcess {
Expand All @@ -67,25 +64,32 @@ impl ChildPluginProcess {
tx.send(Err(anyhow!("Failed to read startup info from plugin - {}", err)))
.unwrap_or_default()
}
}
};
startup_read = true;
}
}
} else {
trace!("0 bytes read from STDOUT, this indicates EOF");
break;
}
}
trace!("Task to poll plugin stderr done");
trace!("Thread to poll plugin STDOUT done");
});

let plugin_name = manifest.name.clone();
tokio::task::spawn(async move {
trace!("Starting task to poll plugin stderr");
let reader = BufReader::new(child_err);
let mut lines = reader.lines();
while let Ok(line) = lines.next_line().await {
if let Some(line) = line {
std::thread::spawn(move || {
trace!("Starting thread to poll plugin STDERR");
let mut reader = BufReader::new(child_err);
let mut line = String::with_capacity(256);
while let Ok(chars_read) = reader.read_line(&mut line) {
if chars_read > 0 {
debug!("Plugin({}, {}, STDERR) || {}", plugin_name, child_pid, line);
} else {
trace!("0 bytes read from STDERR, this indicates EOF");
break;
}
}
trace!("Task to poll plugin stderr done");
trace!("Thread to poll plugin STDERR done");
});

trace!("Starting output polling tasks... DONE");
Expand All @@ -111,7 +115,7 @@ impl ChildPluginProcess {
let mut s = System::new();
s.refresh_processes();
if let Some(process) = s.process(Pid::from_u32(self.child_pid as u32)) {
process.kill_with(Signal::Term);
process.kill();
} else {
warn!("Child process with PID {} was not found", self.child_pid);
}
Expand Down
126 changes: 0 additions & 126 deletions drivers/rust/driver/src/child_process_windows.rs

This file was deleted.

3 changes: 0 additions & 3 deletions drivers/rust/driver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@
pub mod plugin_models;
pub mod plugin_manager;
#[cfg(not(windows))]
mod child_process;
#[cfg(windows)]
mod child_process_windows;
pub mod proto;
pub mod catalogue_manager;
pub mod content;
Expand Down
60 changes: 3 additions & 57 deletions drivers/rust/driver/src/plugin_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::fs::File;
use std::io::{BufReader, Write};
use std::path::PathBuf;
use std::process::Stdio;
#[cfg(windows)] use std::process::Command;
use std::process::Command;
use std::str::from_utf8;
use std::str::FromStr;
use std::sync::Mutex;
Expand All @@ -18,7 +18,7 @@ use itertools::Either;
use lazy_static::lazy_static;
use log::max_level;
use maplit::hashmap;
#[cfg(not(windows))] use os_info::Type;
use os_info::Type;
use pact_models::bodies::OptionalBody;
use pact_models::json_utils::json_to_string;
use pact_models::PactSpecification;
Expand All @@ -29,13 +29,10 @@ use reqwest::Client;
use semver::Version;
use serde_json::Value;
use sysinfo::{Pid,System};
#[cfg(not(windows))] use sysinfo::Signal;
#[cfg(not(windows))] use tokio::process::Command;
use tracing::{debug, info, trace, warn};

use crate::catalogue_manager::{all_entries, CatalogueEntry, register_plugin_entries, remove_plugin_entries};
#[cfg(not(windows))] use crate::child_process::ChildPluginProcess;
#[cfg(windows)] use crate::child_process_windows::ChildPluginProcess;
use crate::child_process::ChildPluginProcess;
use crate::content::ContentMismatch;
use crate::download::{download_json_from_github, download_plugin_executable, fetch_json_from_url};
use crate::metrics::send_metrics;
Expand Down Expand Up @@ -244,7 +241,6 @@ pub async fn init_handshake(manifest: &PactPluginManifest, plugin: &mut (dyn Pac
Ok(())
}

#[cfg(not(windows))]
async fn start_plugin_process(manifest: &PactPluginManifest) -> anyhow::Result<PactPlugin> {
debug!("Starting plugin with manifest {:?}", manifest);

Expand All @@ -257,56 +253,6 @@ async fn start_plugin_process(manifest: &PactPluginManifest) -> anyhow::Result<P
} else {
PathBuf::from(&manifest.entry_point)
};

if !path.is_absolute() || !path.exists() {
path = PathBuf::from(manifest.plugin_dir.clone()).join(path);
}
debug!("Starting plugin using {:?}", &path);

let log_level = max_level();
let mut child_command = Command::new(path.clone());
let mut child_command = child_command
.env("LOG_LEVEL", log_level.to_string())
.env("RUST_LOG", log_level.to_string())
.current_dir(manifest.plugin_dir.clone());

if let Some(args) = &manifest.args {
child_command = child_command.args(args);
}

let child = child_command
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.map_err(|err| anyhow!("Was not able to start plugin process for '{}' - {}",
path.to_string_lossy(), err))?;
let child_pid = child.id().unwrap_or_default();
debug!("Plugin {} started with PID {}", manifest.name, child_pid);

match ChildPluginProcess::new(child, manifest).await {
Ok(child) => Ok(PactPlugin::new(manifest, child)),
Err(err) => {
let mut s = System::new();
s.refresh_processes();
if let Some(process) = s.process(Pid::from_u32(child_pid)) {
process.kill_with(Signal::Term);
} else {
warn!("Child process with PID {} was not found", child_pid);
}
Err(err)
}
}
}

#[cfg(windows)]
async fn start_plugin_process(manifest: &PactPluginManifest) -> anyhow::Result<PactPlugin> {
debug!("Starting plugin with manifest {:?}", manifest);

let mut path = if let Some(entry) = manifest.entry_points.get("windows") {
PathBuf::from(entry)
} else {
PathBuf::from(&manifest.entry_point)
};
if !path.is_absolute() || !path.exists() {
path = PathBuf::from(manifest.plugin_dir.clone()).join(path);
}
Expand Down
3 changes: 0 additions & 3 deletions drivers/rust/driver/src/plugin_models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ use tonic::service::Interceptor;
use tonic::transport::Channel;
use tracing::{debug, trace};

#[cfg(not(windows))]
use crate::child_process::ChildPluginProcess;
#[cfg(windows)]
use crate::child_process_windows::ChildPluginProcess;
use crate::proto::*;
use crate::proto::pact_plugin_client::PactPluginClient;

Expand Down

0 comments on commit a7e0302

Please sign in to comment.