Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/scout worker #7

Merged
merged 7 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,080 changes: 1,058 additions & 22 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[workspace]
members = ["scout-interpreter", "scout-lexer", "scout-parser"]
members = ["scout-interpreter", "scout-lexer", "scout-parser", "scout-worker"]

# Config for 'cargo dist'
[workspace.metadata.dist]
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ The `scout` binary ran with a filename will read and interpret a script file. Wi

Available ENV variables:
- `SCOUT_DEBUG`: Whether or not to open the debug browser. Defaults to `false`.
- `SCOUT_PORT`: Which port to run Scout on. Defaults to `4444`.
- `SCOUT_PORT`: Which port to run Scout on. Defaults to a random open port. Do not set if you intend to run multiple scout instances at once as ports will conflict.
- `SCOUT_PROXY`: An optional URL to proxy requests to. Defaults to none.
- `SCOUT_PATH`: A path to where Scout installs dependencies, like the standard lib. Defaults to `$HOME/scout-lang/`.

Expand Down
1 change: 1 addition & 0 deletions scout-interpreter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ scout-lexer = { version = "0.6.0", path = "../scout-lexer/" }
url = "2.5.2"
reqwest = { version = "0.12", features = ["json", "cookies"] }
envy = "0.4.2"
get-port = "4.0.0"

[dev-dependencies]
test-case = "3.3.1"
19 changes: 14 additions & 5 deletions scout-interpreter/src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use get_port::Ops;

use crate::{env::EnvPointer, eval::ScrapeResultsPtr, EnvVars, Interpreter};

#[derive(Debug)]
Expand Down Expand Up @@ -32,20 +34,27 @@ impl InterpreterBuilder {
pub async fn build(self) -> Result<Interpreter, BuilderError> {
let env_vars =
envy::from_env::<EnvVars>().map_err(|e| BuilderError::EnvError(e.to_string()))?;
let port = env_vars
.port()
.unwrap_or_else(|| get_port::tcp::TcpPort::any("127.0.0.1").unwrap() as usize);
let child = crate::GeckDriverProc::new(port);
let crawler = match self.crawler {
Some(c) => Ok(c),
None => new_crawler(&env_vars).await,
None => new_crawler(&env_vars, port).await,
}?;

let interpreter = Interpreter::new(
self.env.unwrap_or(EnvPointer::default()),
self.results.unwrap_or(ScrapeResultsPtr::default()),
self.env.unwrap_or_default(),
self.results.unwrap_or_default(),
crawler,
child,
);

Ok(interpreter)
}
}

async fn new_crawler(env_vars: &EnvVars) -> Result<fantoccini::Client, BuilderError> {
async fn new_crawler(env_vars: &EnvVars, port: usize) -> Result<fantoccini::Client, BuilderError> {
let mut caps = serde_json::map::Map::new();
if !env_vars.scout_debug {
let opts = serde_json::json!({ "args": ["--headless"] });
Expand All @@ -58,7 +67,7 @@ async fn new_crawler(env_vars: &EnvVars) -> Result<fantoccini::Client, BuilderEr
});
caps.insert("proxy".into(), opt);
}
let conn_url = format!("http://localhost:{}", env_vars.scout_port);
let conn_url = format!("http://localhost:{}", port);
let crawler = fantoccini::ClientBuilder::native()
.capabilities(caps)
.connect(&conn_url)
Expand Down
77 changes: 67 additions & 10 deletions scout-interpreter/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::sync::Arc;
use std::{
process::{Child, Command},
sync::Arc,
};

use env::EnvPointer;
use eval::{eval, EvalError, ScrapeResultsPtr};
Expand All @@ -19,8 +22,8 @@ pub struct EnvVars {
#[serde(default)]
scout_debug: bool,

#[serde(default = "default_port")]
scout_port: usize,
#[serde(default)]
scout_port: Option<usize>,

#[serde(default)]
scout_proxy: Option<String>,
Expand All @@ -31,7 +34,7 @@ impl EnvVars {
self.scout_debug
}

pub fn port(&self) -> usize {
pub fn port(&self) -> Option<usize> {
self.scout_port
}

Expand All @@ -40,28 +43,49 @@ impl EnvVars {
}
}

fn default_port() -> usize {
4444
}

#[derive(Debug)]
pub enum InterpreterError {
EvalError(EvalError),
ParserError(ParseError),
}

pub struct GeckDriverProc(Child);

impl GeckDriverProc {
pub fn new(port: usize) -> Self {
let child = Command::new("geckodriver")
.arg("--port")
.arg(port.to_string())
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.spawn()
.expect("error spinning up driver process");

// sleep to allow driver to start
std::thread::sleep(std::time::Duration::from_millis(50));
Self(child)
}
}

pub struct Interpreter {
env: EnvPointer,
results: ScrapeResultsPtr,
crawler: fantoccini::Client,
_geckodriver_proc: GeckDriverProc,
}

impl Interpreter {
pub fn new(env: EnvPointer, results: ScrapeResultsPtr, crawler: fantoccini::Client) -> Self {
pub fn new(
env: EnvPointer,
results: ScrapeResultsPtr,
crawler: fantoccini::Client,
geckodriver_proc: GeckDriverProc,
) -> Self {
Self {
env,
results,
crawler,
_geckodriver_proc: geckodriver_proc,
}
}
pub async fn eval(&self, content: &str) -> Result<Arc<Object>, InterpreterError> {
Expand All @@ -79,7 +103,16 @@ impl Interpreter {
}
}

pub async fn finalize(self) {
pub fn results(&self) -> ScrapeResultsPtr {
self.results.clone()
}

pub fn reset(&mut self) {
self.env = EnvPointer::default();
self.results = ScrapeResultsPtr::default();
}

pub async fn close(self) {
let _ = self.crawler.close().await;
}
}
Expand All @@ -89,3 +122,27 @@ impl From<EvalError> for InterpreterError {
InterpreterError::EvalError(value)
}
}

impl Drop for GeckDriverProc {
fn drop(&mut self) {
#[cfg(target_os = "windows")]
let mut kill = Command::new("taskkill")
.arg("/PID")
.arg(&self.0.id().to_string())
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.arg("/F")
.spawn()
.expect("error sending driver kill");

#[cfg(not(target_os = "windows"))]
let mut kill = Command::new("kill")
.args(["-s", "TERM", &self.0.id().to_string()])
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.spawn()
.expect("error sending driver kill");

kill.wait().expect("error waiting for driver kill");
}
}
16 changes: 16 additions & 0 deletions scout-worker/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[package]
name = "scout-worker"
version = "0.1.0"
edition = "2021"

[dependencies]
scout-interpreter = { version = "0.6.0", path = "../scout-interpreter/" }
actix-web = "4"
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0.203", features = ["derive"] }
toml = "0.8.16"
lapin = { version = "2.5.0", default-features = false, features = ["native-tls"] }
futures-lite = "2.3.0"
serde_json = "1.0"
tracing = "0.1.40"
reqwest = "0.12.5"
1 change: 1 addition & 0 deletions scout-worker/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Scout-worker is a processing framework for ScoutLang. It provides multiple input/output types that are driven via configuration.
71 changes: 71 additions & 0 deletions scout-worker/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use std::fs;

use serde::Deserialize;

use crate::WorkerError;

const DFEAULT_CONFIG_FILE: &str = "scout.toml";

#[derive(Debug, Default, Deserialize)]
pub struct Config {
pub inputs: ConfigInputs,
pub outputs: Option<ConfigOutputs>,
}

#[derive(Debug, Default, Deserialize)]
pub struct ConfigOutputs {
pub rmq: Option<ConfigRMQ>,
pub http: Option<ConfigOutputHttp>,
}

#[derive(Debug, Default, Deserialize)]
pub struct ConfigInputs {
pub http: Option<ConfigInputHttp>,
pub rmq: Option<ConfigRMQ>,
}

#[derive(Debug, Deserialize)]
pub struct ConfigInputHttp {
pub addr: String,
pub port: usize,
}

#[derive(Debug, Deserialize)]
pub enum OutputMethods {
POST,
PUT,
PATCH,
}

#[derive(Debug, Deserialize)]
pub struct ConfigOutputHttp {
pub endpoint: String,
pub method: OutputMethods,
}

#[derive(Debug, Deserialize)]
pub struct ConfigRMQ {
pub addr: String,
pub queue: String,
pub exchange: String,
pub routing_key: String,
}

impl Config {
pub fn load_file(path: Option<&str>) -> Result<Self, WorkerError> {
let path = path.unwrap_or(DFEAULT_CONFIG_FILE);
let content =
fs::read_to_string(path).map_err(|e| WorkerError::ConfigError(e.to_string()))?;
toml::from_str(&content).map_err(|e| WorkerError::ConfigError(e.to_string()))
}
}

impl std::fmt::Display for OutputMethods {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::PATCH => write!(f, "PATCH"),
Self::POST => write!(f, "POST"),
Self::PUT => write!(f, "PUT"),
}
}
}
2 changes: 2 additions & 0 deletions scout-worker/src/http/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod sender;
pub mod server;
37 changes: 37 additions & 0 deletions scout-worker/src/http/sender.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use reqwest::Method;
use tracing::info;

use crate::config::OutputMethods;

pub struct Sender {
client: reqwest::Client,
method: Method,
endpoint: String,
}

impl Sender {
pub fn new(
method: &OutputMethods,
endpoint: String,
) -> Result<Self, Box<dyn std::error::Error>> {
let client = reqwest::Client::new();
let method = Method::from_bytes(method.to_string().as_bytes())?;
Ok(Self {
client,
method,
endpoint,
})
}

pub async fn send(&self, payload: &str) -> Result<(), reqwest::Error> {
info!("sending output to {} {}", self.method, self.endpoint);
let req = self
.client
.request(self.method.clone(), &self.endpoint)
.body(payload.to_owned())
.build()?;
let res = self.client.execute(req).await?;
info!("received response code {}", res.status());
Ok(())
}
}
Loading
Loading