Skip to content

Commit

Permalink
Remove commented impls
Browse files Browse the repository at this point in the history
  • Loading branch information
vertexclique committed Jan 20, 2024
1 parent e5c884b commit bd95366
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 134 deletions.
3 changes: 2 additions & 1 deletion src/syscore/bsd/kqueue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use std::mem::MaybeUninit;
use std::os::unix::io::{AsRawFd, RawFd};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{collections::HashMap, os::unix::net::UnixStream, time::Duration};
use ahash::HashMap;
use std::{os::unix::net::UnixStream, time::Duration};

macro_rules! syscall {
($fn:ident $args:tt) => {{
Expand Down
3 changes: 2 additions & 1 deletion src/syscore/linux/epoll/epoll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ use crate::sys::epoll::*;
use futures::channel::oneshot;
use lever::prelude::*;
use pin_utils::unsafe_pinned;
use ahash::HashMap;
use std::future::Future;
use std::io::{self, Read, Write};
use std::mem::MaybeUninit;
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{collections::HashMap, fs::File, time::Duration};
use std::{fs::File, time::Duration};

macro_rules! syscall {
($fn:ident $args:tt) => {{
Expand Down
6 changes: 1 addition & 5 deletions src/syscore/linux/iouring/iouring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,6 @@ impl SysProactor {
// issue cas barrier
cq.sync();
while let Some(cqe) = cq.next() {
dbg!(&cqe);
self.cqe_completion(&cqe)?;
acc+=1;
}
Expand All @@ -236,10 +235,7 @@ impl SysProactor {
let res: i32 = cqe.result();

let mut sbmts = self.submitters.lock();
sbmts.remove(&udata).map(|s| {
dbg!(&res);
s.send(res)
});
sbmts.remove(&udata).map(|s| s.send(res));

Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion src/syscore/linux/iouring/nethandle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl Handle<TcpListener> {

pub fn incoming(
&self,
) -> impl Stream<Item = io::Result<Handle<TcpStream>>> + Send + Unpin + '_ {
) -> impl Stream<Item = io::Result<Handle<TcpStream>>> + Unpin + '_ {
Box::pin(futures::stream::unfold(
self,
|listener: &Handle<TcpListener>| async move {
Expand Down
142 changes: 16 additions & 126 deletions src/syscore/linux/iouring/processor.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::future::Future;
use std::io;
use std::{io, mem};
use std::io::{Read, Write};
use std::net::TcpStream;
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6, UdpSocket};
Expand Down Expand Up @@ -188,15 +188,11 @@ impl Processor {
flags: RecvFlags,
) -> io::Result<usize> {
let fd = socket.as_raw_fd() as _;
dbg!(&fd);

let mut sqe = OP::Recv::new(Fd(fd), buf.as_mut_ptr(), buf.len() as _)
.flags(flags)
.build();

dbg!("recv_with_flags time");
dbg!(&flags);

let res = Proactor::get()
.inner()
.register_io(sqe)?
Expand Down Expand Up @@ -316,131 +312,25 @@ impl Processor {
///// TcpListener
///////////////////////////////////

// TODO: (vcq): need to fix the accept
// pub(crate) async fn processor_accept_tcp_listener<R: AsRawFd>(listener: &R) -> io::Result<(Handle<TcpStream>, SocketAddr)> {
// let fd = listener.as_raw_fd() as _;
// // let flags = syscall!(fcntl(fd, libc::F_GETFL))?;
// // syscall!(fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK))?;
//
// // let mut sockaddr = MaybeUninit::<libc::sockaddr_storage>::uninit();
// // let mut sockaddr_len = std::mem::size_of::<libc::sockaddr_storage>() as _;
// //
// //
// // let mut sockaddr = unsafe {
// // let mut saddr = sockaddr.assume_init();
// // saddr.ss_family = libc::AF_INET as libc::sa_family_t;
// // saddr
// // };
//
// // let mut saddrstor = SockAddrStorage::uninit();
//
// let cc = Proactor::get().inner().register_io(|sqe| unsafe {
// // let sqep = sqe.raw_mut();
// // dbg!(&sqe.user_data());
// // dbg!(&sqep.user_data);
// // sqe.prep_accept(fd, Some(&mut saddrstor), SockFlag::SOCK_NONBLOCK);
// sqe.prep_accept(fd, None, iou::SockFlag::empty());
// // uring_sys::io_uring_prep_accept(sqep as *mut _,
// // fd,
// // &mut sockaddr as *mut _ as *mut _,
// // &mut sockaddr_len,
// // 0);
// })?;
//
// // let cc = Proactor::get().inner().register_io(|sqe| unsafe {
// // dbg!("SQE CAME");
// // let sqep = sqe.raw_mut();
// // // sqe.prep_accept(fd, Some(&mut saddrstor), SockFlag::empty());
// // uring_sys::io_uring_prep_accept(sqep,
// // fd,
// // sockaddr.as_mut_ptr() as *mut _,
// // &mut sockaddr_len,
// // 0);
// // })?;
//
// // let mut saddrstor = SockAddrStorage::uninit();
// //
// // let cc = Proactor::get().inner().register_io(|sqe| unsafe {
// // sqe.prep_accept(fd, Some(&mut saddrstor), SockFlag::empty());
// // })?;
// dbg!("TCP LISTENER");
//
// let stream = unsafe { TcpStream::from_raw_fd(cc.await?) };
// dbg!("TCP LISTENER RECEIVED");
// // let addr = unsafe {
// // let nixsa = saddrstor.as_socket_addr()?;
// // let (saddr, saddr_len) = nixsa.as_ffi_pair();
// // socket2::SockAddr::from_raw_parts(saddr as *const _, saddr_len as _)
// // .as_std()
// // .unwrap()
// // };
//
// let socket = unsafe { socket2::Socket::from_raw_fd(listener.as_raw_fd()) };
// let socket = socket.into_tcp_listener();
// let addr = socket.local_addr().unwrap();
// // let res = socket
// // .accept()
// // .map(|(_, sockaddr)| (Handle::new(stream).unwrap(), sockaddr))?;
//
// // let addr = unsafe {
// // socket
// // .local_addr()
// // .unwrap()
// // };
//
// // unsafe {
// // let mut sockaddr = sockaddr.assume_init();
// // sockaddr.ss_family = libc::AF_INET as libc::sa_family_t;
// // }
//
// // let addr = unsafe {
// // socket2::SockAddr::from_raw_parts(&sockaddr as *const _ as *const _, sockaddr_len as _)
// // .as_std()
// // .unwrap()
// // };
//
// Ok((Handle::new(stream)?, addr))
// }

// pub(crate) async fn processor_accept_tcp_listener<R: AsRawFd>(
// listener: &R
// ) -> io::Result<(Handle<TcpStream>, SocketAddr)> {
// let fd = listener.as_raw_fd() as _;
// let sockfd: OwnedFd = unsafe { OwnedFd::from_raw_fd(fd) };
// let sockaddr = rustix::net::getsockname(&sockfd)?;
//
// let mut sqe = OP::Accept::new(Fd(fd), null_mut(), null_mut())
// .build();
//
// let cc = Proactor::get().inner().register_io(sqe)?;
//
// let stream = unsafe { TcpStream::from_raw_fd(cc.await?) };
//
// let natsaddr = unsafe {
// let mut sas = std::mem::zeroed::<SocketAddrStorage>();
// sockaddr.write(&mut sas as *mut _ as *mut _);
// let size = std::mem::size_of::<libc::sockaddr_storage>();
// socket2::SockAddr::from_raw_parts(&sas as *const _ as *const _, size as _)
// .as_std()
// .unwrap()
// };
//
// Ok((Handle::new(stream).unwrap(), natsaddr))
// }

pub(crate) async fn processor_accept_tcp_listener<R: AsRawFd>(
listener: &R,
listener: &R
) -> io::Result<(Handle<TcpStream>, SocketAddr)> {
dbg!(&listener.as_raw_fd());
let socket = unsafe { socket2::Socket::from_raw_fd(listener.as_raw_fd()) };
let socket = socket.into_tcp_listener();
let socket = ManuallyDrop::new(socket);
let fd = listener.as_raw_fd() as _;

dbg!("ACCEPT_TCP");
let (mut storage, mut addrlen) = unsafe {
let mut addrlen = mem::size_of::<sockaddr>() as socklen_t;
let mut storage = MaybeUninit::<sockaddr>::zeroed().assume_init();
(storage, addrlen)
};

let mut sqe = OP::Accept::new(Fd(fd), &mut storage as *mut _ as *mut _, &mut addrlen as *mut _ as *mut _)
.build();

let cc = Proactor::get().inner().register_io(sqe)?;
let stream = unsafe { TcpStream::from_raw_fd(cc.await?) };
let addr = stream.local_addr()?;

socket
.accept()
.map(|(stream, sockaddr)| (Handle::new(stream).unwrap(), sockaddr))
Ok((Handle::new(stream).unwrap(), addr))
}

///////////////////////////////////
Expand Down

0 comments on commit bd95366

Please sign in to comment.