Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
cablehead committed May 15, 2024
1 parent b4d7772 commit 7b9755a
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 13 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ fjall = "0.8"
http-body-util = "0.1"
hyper = { version = "1", features = ["full"] }
hyper-util = { version = "0.1", features = ["full"] }
path-tree = "0.7.7"
scru128 = { version = "3", features = ["serde"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
Expand Down
48 changes: 36 additions & 12 deletions src/http.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::error::Error;

use tokio::io::AsyncWriteExt;
Expand All @@ -16,24 +17,47 @@ use hyper::service::service_fn;
use hyper::{Method, Request, Response, StatusCode};
use hyper_util::rt::TokioIo;

use path_tree::PathTree;

use crate::store::Store;

type BoxError = Box<dyn std::error::Error + Send + Sync>;
type HTTPResult = Result<Response<BoxBody<Bytes, BoxError>>, BoxError>;

enum Routes {
Root,
CasGet,
}

async fn get(store: Store, req: Request<hyper::body::Incoming>) -> HTTPResult {
let uri = req.uri();

let rx = store.subscribe().await;
let stream = ReceiverStream::new(rx);
let stream = stream.map(|frame| {
eprintln!("streaming");
let mut encoded = serde_json::to_vec(&frame).unwrap();
encoded.push(b'\n');
Ok(hyper::body::Frame::data(bytes::Bytes::from(encoded)))
});
let body = StreamBody::new(stream).boxed();
Ok(Response::new(body))
let mut tree = PathTree::new();
let _ = tree.insert("/", Routes::Root);
let _ = tree.insert("/cas/:hash+", Routes::CasGet);

eprintln!("path: {:?}", req.uri().path());

match tree.find(req.uri().path()) {
Some((h, p)) => match h {
Routes::Root => {
let rx = store.subscribe().await;
let stream = ReceiverStream::new(rx);
let stream = stream.map(|frame| {
eprintln!("streaming");
let mut encoded = serde_json::to_vec(&frame).unwrap();
encoded.push(b'\n');
Ok(hyper::body::Frame::data(bytes::Bytes::from(encoded)))
});
let body = StreamBody::new(stream).boxed();
Ok(Response::new(body))
}

Routes::CasGet => Ok(Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.body(full("let's go"))?),
},
None => response_404(),
}
}

async fn post(mut store: Store, req: Request<hyper::body::Incoming>) -> HTTPResult {
Expand Down
14 changes: 13 additions & 1 deletion tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,19 @@ async fn test_integration() {
.expect("Failed to run date | curl command");

let frame: Frame = serde_json::from_str(&output).expect("Failed to parse JSON into Frame");
println!("{:?}", frame.hash.unwrap().to_string());
let output = cmd!(
"sh",
"-c",
format!(
"curl -v --unix-socket {}/sock 'localhost/cas/{}'",
temp_dir.path().display(),
frame.hash.unwrap().to_string(),
)
)
.read()
.expect("Failed to run date | curl command");

eprintln!("output: {:?}", &output);

// Clean up
let _ = cli_process.kill();
Expand Down

0 comments on commit 7b9755a

Please sign in to comment.