Skip to content

Commit

Permalink
Add hasher
Browse files Browse the repository at this point in the history
  • Loading branch information
vertexclique committed Jan 19, 2024
1 parent 47c51b2 commit e5c884b
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 45 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ socket2 = { version = "0.3.19", features = ["pair", "unix"] }
pin-utils = "0.1.0"
once_cell = "1.19.0"
os_socketaddr = "0.2.5"
ahash = "0.8.7"
async-global-executor = { version = "2.4", optional = true, features = ["async-io"] }

# Other backends
Expand Down
62 changes: 21 additions & 41 deletions src/syscore/linux/iouring/iouring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ use core::mem::MaybeUninit;
use futures::channel::oneshot;
use lever::sync::prelude::*;
use pin_utils::unsafe_pinned;
use std::collections::HashMap;
use ahash::{AHasher, HashMap, HashMapExt};

Check warning on line 5 in src/syscore/linux/iouring/iouring.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-unknown-linux-gnu

unused import: `AHasher`

Check warning on line 5 in src/syscore/linux/iouring/iouring.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-unknown-linux-gnu

unused import: `AHasher`

Check warning on line 5 in src/syscore/linux/iouring/iouring.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-unknown-linux-gnu

unused import: `AHasher`

Check warning on line 5 in src/syscore/linux/iouring/iouring.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-unknown-linux-gnu

unused import: `AHasher`

Check warning on line 5 in src/syscore/linux/iouring/iouring.rs

View workflow job for this annotation

GitHub Actions / nightly - x86_64-unknown-linux-gnu

unused import: `AHasher`

Check warning on line 5 in src/syscore/linux/iouring/iouring.rs

View workflow job for this annotation

GitHub Actions / nightly - x86_64-unknown-linux-gnu

unused import: `AHasher`

Check warning on line 5 in src/syscore/linux/iouring/iouring.rs

View workflow job for this annotation

GitHub Actions / nightly - x86_64-unknown-linux-gnu

unused import: `AHasher`

Check warning on line 5 in src/syscore/linux/iouring/iouring.rs

View workflow job for this annotation

GitHub Actions / nightly - x86_64-unknown-linux-gnu

unused import: `AHasher`
use std::future::Future;
use std::hash::BuildHasherDefault;

Check warning on line 7 in src/syscore/linux/iouring/iouring.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-unknown-linux-gnu

unused import: `std::hash::BuildHasherDefault`

Check warning on line 7 in src/syscore/linux/iouring/iouring.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-unknown-linux-gnu

unused import: `std::hash::BuildHasherDefault`

Check warning on line 7 in src/syscore/linux/iouring/iouring.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-unknown-linux-gnu

unused import: `std::hash::BuildHasherDefault`

Check warning on line 7 in src/syscore/linux/iouring/iouring.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-unknown-linux-gnu

unused import: `std::hash::BuildHasherDefault`

Check warning on line 7 in src/syscore/linux/iouring/iouring.rs

View workflow job for this annotation

GitHub Actions / nightly - x86_64-unknown-linux-gnu

unused import: `std::hash::BuildHasherDefault`

Check warning on line 7 in src/syscore/linux/iouring/iouring.rs

View workflow job for this annotation

GitHub Actions / nightly - x86_64-unknown-linux-gnu

unused import: `std::hash::BuildHasherDefault`

Check warning on line 7 in src/syscore/linux/iouring/iouring.rs

View workflow job for this annotation

GitHub Actions / nightly - x86_64-unknown-linux-gnu

unused import: `std::hash::BuildHasherDefault`

Check warning on line 7 in src/syscore/linux/iouring/iouring.rs

View workflow job for this annotation

GitHub Actions / nightly - x86_64-unknown-linux-gnu

unused import: `std::hash::BuildHasherDefault`
use std::io;
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};

Check warning on line 9 in src/syscore/linux/iouring/iouring.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-unknown-linux-gnu

unused imports: `FromRawFd`, `RawFd`

Check warning on line 9 in src/syscore/linux/iouring/iouring.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-unknown-linux-gnu

unused imports: `FromRawFd`, `RawFd`

Check warning on line 9 in src/syscore/linux/iouring/iouring.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-unknown-linux-gnu

unused imports: `FromRawFd`, `RawFd`

Check warning on line 9 in src/syscore/linux/iouring/iouring.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-unknown-linux-gnu

unused imports: `FromRawFd`, `RawFd`

Check warning on line 9 in src/syscore/linux/iouring/iouring.rs

View workflow job for this annotation

GitHub Actions / nightly - x86_64-unknown-linux-gnu

unused imports: `FromRawFd`, `RawFd`

Check warning on line 9 in src/syscore/linux/iouring/iouring.rs

View workflow job for this annotation

GitHub Actions / nightly - x86_64-unknown-linux-gnu

unused imports: `FromRawFd`, `RawFd`

Check warning on line 9 in src/syscore/linux/iouring/iouring.rs

View workflow job for this annotation

GitHub Actions / nightly - x86_64-unknown-linux-gnu

unused imports: `FromRawFd`, `RawFd`

Check warning on line 9 in src/syscore/linux/iouring/iouring.rs

View workflow job for this annotation

GitHub Actions / nightly - x86_64-unknown-linux-gnu

unused imports: `FromRawFd`, `RawFd`
use std::pin::Pin;
Expand Down Expand Up @@ -138,10 +139,10 @@ const MANUAL_TIMEOUT: u64 = -2 as _;
const QUEUE_LEN: u32 = 1 << 10;

pub struct SysProactor {
sq: Mutex<SubmissionQueue<'static>>,
cq: Mutex<CompletionQueue<'static>>,
sbmt: Mutex<Submitter<'static>>,
submitters: Mutex<HashMap<u64, oneshot::Sender<i32>>>,
sq: TTas<SubmissionQueue<'static>>,
cq: TTas<CompletionQueue<'static>>,
sbmt: TTas<Submitter<'static>>,
submitters: TTas<HashMap<u64, oneshot::Sender<i32>>>,
submitter_id: AtomicU64,
waker: AtomicBool,
}
Expand All @@ -157,21 +158,19 @@ static mut IO_URING: Option<IoUring> = None;
impl SysProactor {
pub(crate) fn new() -> io::Result<SysProactor> {
unsafe {
// nodrop?
let ring = IoUring::builder()
.build(QUEUE_LEN)
.expect("nuclei: uring can't be initialized");

IO_URING = Some(ring);

// IO_URING = Some(IoUring::new(QUEUE_LEN).expect("nuclei: uring can't be initialized"));
let (submitter, sq, cq) = IO_URING.as_mut().unwrap().split();

Ok(SysProactor {
sq: Mutex::new(sq),
cq: Mutex::new(cq),
sbmt: Mutex::new(submitter),
submitters: Mutex::new(HashMap::default()),
sq: TTas::new(sq),
cq: TTas::new(cq),
sbmt: TTas::new(submitter),
submitters: TTas::new(HashMap::with_capacity(QUEUE_LEN as usize)),
submitter_id: AtomicU64::default(),
waker: AtomicBool::default(),
})
Expand All @@ -182,47 +181,34 @@ impl SysProactor {
&self,
mut sqe: SQEntry,
) -> io::Result<CompletionChan> {
dbg!("REGISTER IO");
let id = self.submitter_id.fetch_add(1, Ordering::Relaxed);
let (tx, rx) = oneshot::channel();
dbg!("self.submitter_id.fetch_add");

sqe = sqe.user_data(id);
dbg!("sqe = sqe.user_data(id);");

let mut subguard = self.submitters.lock().unwrap();
let mut subguard = self.submitters.lock();
subguard.insert(id, tx);
drop(subguard);
dbg!("subguard.insert");

let mut sq = self.sq.lock().unwrap();
let mut sq = self.sq.lock();
unsafe {
sq.push(&sqe).expect("nuclei: submission queue is full");
}
sq.sync();
drop(sq);

dbg!("pushed - submit was here");

// drop(sbmt);
let sbmt = self.sbmt.lock().unwrap();
dbg!("submitting.......................................");
let sbmt = self.sbmt.lock();
sbmt.submit()?;
dbg!("submitted.......................................");
drop(sbmt);

sq.sync();
drop(sq);

dbg!("leaving");

Ok(CompletionChan { rx })
// sub_comp.ok_or(io::Error::from(io::ErrorKind::WouldBlock))
}

pub(crate) fn wake(&self) -> io::Result<()> {
let (mut sq, mut cq) = (self.sq.lock().unwrap(), self.cq.lock().unwrap());
sq.sync();
cq.sync();
if let (Some(mut sq), Some(mut cq)) = (self.sq.try_lock(), self.cq.try_lock()) {
sq.sync();
cq.sync();
}
Ok(())
}

Expand All @@ -231,9 +217,7 @@ impl SysProactor {
max_event_size: usize,
duration: Option<Duration>,
) -> io::Result<usize> {
// dbg!("wait");

let mut cq = self.cq.lock().unwrap();
let mut cq = self.cq.lock();
let mut acc: usize = 0;

// issue cas barrier
Expand All @@ -243,23 +227,19 @@ impl SysProactor {
self.cqe_completion(&cqe)?;
acc+=1;
}
cq.sync();

Ok(acc)
}

fn cqe_completion(&self, cqe: &CQEntry) -> io::Result<()> {
let udata = cqe.user_data();
dbg!(&udata);
let res: i32 = cqe.result();

dbg!("self.submitters.lock().remov");
let mut sbmts = self.submitters.lock().unwrap();
let mut sbmts = self.submitters.lock();
sbmts.remove(&udata).map(|s| {
dbg!(&res);
s.send(res).unwrap()
s.send(res)
});
dbg!("self.submitters.lock().removed");

Ok(())
}
Expand Down
8 changes: 4 additions & 4 deletions src/syscore/linux/iouring/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,10 @@ impl Processor {
offset: usize,
) -> io::Result<usize> {
dbg!(&offset);
let mut sqe = OP::Read::new(Fd(*io), buf.as_mut_ptr(), offset as _)
let mut sqe = OP::Read::new(Fd(*io), buf.as_mut_ptr(), buf.len() as _)
.offset(offset as _)
.build();

dbg!("READFILE");

let cc = Proactor::get().inner().register_io(sqe)?;

Ok(cc.await? as _)
Expand All @@ -90,7 +89,8 @@ impl Processor {
buf: &[u8],
offset: usize,
) -> io::Result<usize> {
let mut sqe = OP::Write::new(Fd(*io), buf as *const _ as *const _, offset as _)
let mut sqe = OP::Write::new(Fd(*io), buf.as_ptr(), buf.len() as _)
.offset(offset as _)
.build();

let cc = Proactor::get().inner().register_io(sqe)?;
Expand Down

0 comments on commit e5c884b

Please sign in to comment.