From 7c09bd4e8bd3e4c6ff0547eaf5e8435cc73012c6 Mon Sep 17 00:00:00 2001 From: Shane Alcock Date: Fri, 13 Oct 2023 22:35:28 +1300 Subject: [PATCH 1/8] Add ndagtcp: format for receiving unicast reliable ndag --- lib/format_ndag.c | 452 +++++++++++++++++++++++++++++++++++----------- lib/libtrace.h.in | 1 + 2 files changed, 343 insertions(+), 110 deletions(-) diff --git a/lib/format_ndag.c b/lib/format_ndag.c index 36c76ece..e76532f8 100644 --- a/lib/format_ndag.c +++ b/lib/format_ndag.c @@ -61,6 +61,11 @@ static struct libtrace_format_t ndag; volatile int ndag_paused = 0; +enum { + NDAG_SOCKET_TYPE_MULTICAST, + NDAG_SOCKET_TYPE_TCP, +}; + typedef struct monitor { uint16_t monitorid; uint64_t laststart; @@ -96,9 +101,8 @@ typedef struct streamsock { #if HAVE_DECL_RECVMMSG struct mmsghdr mmsgbufs[RECV_BATCH_SIZE]; -#else - struct msghdr singlemsg; #endif + struct msghdr singlemsg; } streamsock_t; @@ -124,6 +128,7 @@ typedef struct ndag_format_data { uint16_t nextthreadid; recvstream_t *receivers; int receiver_cnt; + uint8_t socktype; pthread_t controlthread; libtrace_message_queue_t controlqueue; @@ -224,7 +229,10 @@ static int join_multicast_group(char *groupaddr, char *localiface, return -1; } - *srcinfo = gotten; + if (srcinfo) { + *srcinfo = gotten; + } + sock = socket(gotten->ai_family, gotten->ai_socktype, 0); if (sock < 0) { fprintf(stderr, @@ -442,54 +450,132 @@ static int ndag_parse_control_message(libtrace_t *libtrace, char *msgbuf, } -static void *ndag_controller_run(void *tdata) { - - libtrace_t *libtrace = (libtrace_t *)tdata; - uint16_t ptmap[65536]; - int sock = -1; - struct addrinfo *receiveaddr = NULL; - fd_set listening; +static inline int select_on_sock(int sock) { + int r; + fd_set read_fds; struct timeval timeout; - /* ptmap is a dirty hack to allow us to quickly check if we've already - * assigned a stream to a thread. - */ - memset(ptmap, 0xff, 65536 * sizeof(uint16_t)); + FD_ZERO(&read_fds); + FD_SET(sock, &read_fds); - sock = join_multicast_group(FORMAT_DATA->multicastgroup, - FORMAT_DATA->localiface, FORMAT_DATA->portstr, 0, - &receiveaddr); - if (sock == -1) { - trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, - "Unable to join multicast group for nDAG control channel"); - trace_interrupt(); - pthread_exit(NULL); + timeout.tv_sec = 0; + timeout.tv_usec = 500000; + + r = select(sock + 1, &read_fds, NULL, NULL, &timeout); + if (r == -1) { + return -1; + } + if (!FD_ISSET(sock, &read_fds)) { + return 0; } + return 1; +} + +static int accept_ndagtcp_connection(libtrace_t *libtrace, + char *ipstr, char *portstr) { + + struct addrinfo hints, *listenai; + int sock, consock=-1, reuse=1, r; + struct sockaddr_storage sa; + socklen_t addrsize; + + hints.ai_family = PF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_PASSIVE; + hints.ai_protocol = 0; + + sock = -1; + listenai = NULL; + if (getaddrinfo(ipstr, portstr, &hints, &listenai) != 0) { + fprintf(stderr, + "Call to getaddrinfo failed for %s:%s -- %s\n", + ipstr, portstr, strerror(errno)); + goto failed; + } + + sock = socket(listenai->ai_family, listenai->ai_socktype, 0); + if (sock < 0) { + fprintf(stderr, "Failed to create socket for %s:%s -- %s\n", + ipstr, portstr, strerror(errno)); + goto failed; + } + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) + < 0) { + + fprintf(stderr, "Failed to configure socket for %s:%s -- %s\n", + ipstr, portstr, strerror(errno)); + + goto failed; + } + + if (bind(sock, (struct sockaddr *)listenai->ai_addr, + listenai->ai_addrlen) < 0) { + + fprintf(stderr, "Failed to bind socket for %s:%s -- %s\n", + ipstr, portstr, strerror(errno)); + goto failed; + } + + if (listen(sock, 10) < 0) { + fprintf(stderr, "Failed to listen on socket for %s:%s -- %s\n", + ipstr, portstr, strerror(errno)); + goto failed; + } + + freeaddrinfo(listenai); + listenai = NULL; + + fcntl(sock, F_SETFL, O_NONBLOCK); + + while (is_halted(libtrace) == -1) { + r = select_on_sock(sock); + if (r < 0) { + fprintf(stderr, "Error in select while accepting connection on socket for %s:%s -- %s\n", + ipstr, portstr, strerror(errno)); + } else if (r == 0) { + continue; + } + + consock = accept(sock, (struct sockaddr *)&sa, &addrsize); + if (consock < 0) { + fprintf(stderr, + "Failed to accept connection on socket for %s:%s -- %s\n", + ipstr, portstr, strerror(errno)); + goto failed; + } + break; + } + +failed: + if (sock >= 0) { + close(sock); + } + if (listenai) { + freeaddrinfo(listenai); + } + return consock; +} + +static void _ndag_controller_run(libtrace_t *libtrace, int sock) { + uint16_t ptmap[65536]; + + memset(ptmap, 0xff, 65536 * sizeof(uint16_t)); + ndag_paused = 0; while ((is_halted(libtrace) == -1) && !ndag_paused) { int ret; char buf[CTRL_BUF_SIZE]; - FD_ZERO(&listening); - FD_SET(sock, &listening); - - timeout.tv_sec = 0; - timeout.tv_usec = 500000; - - ret = select(sock + 1, &listening, NULL, NULL, &timeout); + ret = select_on_sock(sock); if (ret < 0) { fprintf(stderr, "Error while waiting for nDAG control messages: %s\n", strerror(errno)); break; - } - - if (!FD_ISSET(sock, &listening)) { + } else if (ret == 0) { continue; } - ret = recvfrom(sock, buf, CTRL_BUF_SIZE, 0, - receiveaddr->ai_addr, - &(receiveaddr->ai_addrlen)); + ret = recvfrom(sock, buf, CTRL_BUF_SIZE, 0, NULL, NULL); if (ret < 0) { fprintf(stderr, "Error while receiving nDAG control message: %s\n", strerror(errno)); break; @@ -509,19 +595,56 @@ static void *ndag_controller_run(void *tdata) { close(sock); } - freeaddrinfo(receiveaddr); - /* Control channel has fallen over, should probably encourage libtrace * to halt the receiver threads as well. */ if (!is_halted(libtrace)) { trace_interrupt(); } +} + +static void *ndagtcp_controller_run(void *tdata) { + libtrace_t *libtrace = (libtrace_t *)tdata; + int sock = -1; + + /* ptmap is a dirty hack to allow us to quickly check if we've already + * assigned a stream to a thread. + */ + sock = accept_ndagtcp_connection(libtrace, FORMAT_DATA->multicastgroup, + FORMAT_DATA->portstr); + if (sock == -1) { + trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, + "Unable to setup control channel for nDAG TCP"); + trace_interrupt(); + } else { + _ndag_controller_run(libtrace, sock); + } + + + pthread_exit(NULL); +} + +static void *ndag_controller_run(void *tdata) { + + libtrace_t *libtrace = (libtrace_t *)tdata; + int sock = -1; + + sock = join_multicast_group(FORMAT_DATA->multicastgroup, + FORMAT_DATA->localiface, FORMAT_DATA->portstr, 0, + NULL); + if (sock == -1) { + trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, + "Unable to setup control channel for nDAG TCP"); + trace_interrupt(); + } else { + _ndag_controller_run(libtrace, sock); + } pthread_exit(NULL); } -static int ndag_start_threads(libtrace_t *libtrace, uint32_t maxthreads) +static int ndag_start_threads(libtrace_t *libtrace, uint32_t maxthreads, + uint8_t socktype) { int ret; uint32_t i; @@ -553,8 +676,16 @@ static int ndag_start_threads(libtrace_t *libtrace, uint32_t maxthreads) /* Start the controller thread */ /* TODO consider affinity of this thread? */ - ret = pthread_create(&(FORMAT_DATA->controlthread), NULL, - ndag_controller_run, libtrace); + if (socktype == NDAG_SOCKET_TYPE_MULTICAST) { + ret = pthread_create(&(FORMAT_DATA->controlthread), NULL, + ndag_controller_run, libtrace); + } else if (socktype == NDAG_SOCKET_TYPE_TCP) { + ret = pthread_create(&(FORMAT_DATA->controlthread), NULL, + ndagtcp_controller_run, libtrace); + } else { + ret = -1; + } + if (ret != 0) { return -1; } @@ -562,13 +693,32 @@ static int ndag_start_threads(libtrace_t *libtrace, uint32_t maxthreads) } static int ndag_start_input(libtrace_t *libtrace) { - return ndag_start_threads(libtrace, 1); + FORMAT_DATA->socktype = NDAG_SOCKET_TYPE_MULTICAST; + return ndag_start_threads(libtrace, 1, NDAG_SOCKET_TYPE_MULTICAST); +} + +static int ndagtcp_start_input(libtrace_t *libtrace) { + FORMAT_DATA->socktype = NDAG_SOCKET_TYPE_TCP; + return ndag_start_threads(libtrace, 1, NDAG_SOCKET_TYPE_TCP); } static int ndag_pstart_input(libtrace_t *libtrace) { - if (ndag_start_threads(libtrace, libtrace->perpkt_thread_count) == - libtrace->perpkt_thread_count) + FORMAT_DATA->socktype = NDAG_SOCKET_TYPE_MULTICAST; + if (ndag_start_threads(libtrace, libtrace->perpkt_thread_count, + NDAG_SOCKET_TYPE_MULTICAST) == + libtrace->perpkt_thread_count) { + return 0; + } + return -1; +} + +static int ndagtcp_pstart_input(libtrace_t *libtrace) { + FORMAT_DATA->socktype = NDAG_SOCKET_TYPE_TCP; + if (ndag_start_threads(libtrace, libtrace->perpkt_thread_count, + NDAG_SOCKET_TYPE_TCP) == + libtrace->perpkt_thread_count) { return 0; + } return -1; } @@ -595,9 +745,10 @@ static void halt_ndag_receiver(recvstream_t *receiver) { free(src.mmsgbufs[j].msg_hdr.msg_iov); } } -#else - free(src.singlemsg.msg_iov); #endif + if (src.singlemsg.msg_iov) { + free(src.singlemsg.msg_iov); + } if (src.sock != -1) { close(src.sock); @@ -942,9 +1093,8 @@ static int add_new_streamsock(recvstream_t *rt, streamsource_t src) { ssock->mmsgbufs[i].msg_hdr.msg_flags = 0; ssock->mmsgbufs[i].msg_len = 0; } -#else - ssock->singlemsg.msg_iov = (struct iovec *) calloc(1, sizeof(struct iovec)); #endif + ssock->singlemsg.msg_iov = (struct iovec *) calloc(1, sizeof(struct iovec)); ssock->nextread = NULL;; ssock->nextreadind = 0; @@ -1013,6 +1163,10 @@ static int init_receivers(streamsock_t *ssock, int required) { int wind = ssock->nextwriteind; int i = 1; + if (required <= 0) { + fprintf(stderr, "You are required to have atleast 1 receiver in init_receivers\n"); + return TRACE_ERR_INIT_FAILED; + } #if HAVE_DECL_RECVMMSG for (i = 0; i < required; i++) { if (i >= RECV_BATCH_SIZE) { @@ -1030,15 +1184,10 @@ static int init_receivers(streamsock_t *ssock, int required) { wind ++; } -#else - if (required <= 0) { - fprintf(stderr, "You are required to have atleast 1 receiver in init_receivers\n"); - return TRACE_ERR_INIT_FAILED; - } +#endif ssock->singlemsg.msg_iov->iov_base = ssock->saved[wind]; ssock->singlemsg.msg_iov->iov_len = ENCAP_BUFSIZE; ssock->singlemsg.msg_iovlen = 1; -#endif return i; } @@ -1118,65 +1267,59 @@ static int check_ndag_received(streamsock_t *ssock, int index, } -static int receive_from_single_socket(streamsock_t *ssock, struct timeval *tv, - int *gottime, recvstream_t *rt) { +static int is_buffered_data_available(streamsock_t *ssock, struct timeval *tv, + int *gottime) { - int ret, ndagstat, avail; int toret = 0; + /* Nothing to receive right now, but we should still + * count as 'ready' if at least one buffer is full */ + if (errno == EAGAIN || errno == EWOULDBLOCK) { + if (readable_data(ssock)) { + toret = 1; + } + if (!(*gottime)) { + gettimeofday(tv, NULL); + *gottime = 1; + } + if (ssock->startidle == 0) { + ssock->startidle = tv->tv_sec; + } else if (tv->tv_sec - ssock->startidle > NDAG_IDLE_TIMEOUT) { + fprintf(stderr, + "Closing channel %s:%u due to inactivity.\n", + ssock->groupaddr, + ssock->port); + + close(ssock->sock); + ssock->sock = -1; + } + return toret; + } + fprintf(stderr, + "Error receiving encapsulated records from %s:%u -- %s \n", + ssock->groupaddr, ssock->port, + strerror(errno)); + close(ssock->sock); + ssock->sock = -1; + return 0; +} #if HAVE_DECL_RECVMMSG +static int receive_from_single_socket_recvmmsg(streamsock_t *ssock, + struct timeval *tv, int *gottime, recvstream_t *rt) { + + int ret, ndagstat, avail; + int toret = 0; int i; -#endif avail = init_receivers(ssock, ssock->bufavail); -#if HAVE_DECL_RECVMMSG ret = recvmmsg(ssock->sock, ssock->mmsgbufs, avail, MSG_DONTWAIT, NULL); -#else - if (avail != 1) { - return 0; - } - - ret = recvmsg(ssock->sock, &(ssock->singlemsg), MSG_DONTWAIT); -#endif if (ret < 0) { - /* Nothing to receive right now, but we should still - * count as 'ready' if at least one buffer is full */ - if (errno == EAGAIN || errno == EWOULDBLOCK) { - if (readable_data(ssock)) { - toret = 1; - } - if (!(*gottime)) { - gettimeofday(tv, NULL); - *gottime = 1; - } - if (ssock->startidle == 0) { - ssock->startidle = tv->tv_sec; - } else if (tv->tv_sec - ssock->startidle > NDAG_IDLE_TIMEOUT) { - fprintf(stderr, - "Closing channel %s:%u due to inactivity.\n", - ssock->groupaddr, - ssock->port); - - close(ssock->sock); - ssock->sock = -1; - } - } else { - - fprintf(stderr, - "Error receiving encapsulated records from %s:%u -- %s \n", - ssock->groupaddr, ssock->port, - strerror(errno)); - close(ssock->sock); - ssock->sock = -1; - } - return toret; + return is_buffered_data_available(ssock, tv, gottime); } ssock->startidle = 0; - -#if HAVE_DECL_RECVMMSG for (i = 0; i < ret; i++) { ndagstat = check_ndag_received(ssock, ssock->nextwriteind, ssock->mmsgbufs[i].msg_len, rt); @@ -1188,19 +1331,42 @@ static int receive_from_single_socket(streamsock_t *ssock, struct timeval *tv, toret = 1; } } -#else + return toret; +} +#endif + +static int receive_from_single_socket(streamsock_t *ssock, struct timeval *tv, + int *gottime, recvstream_t *rt) { + + int ret, ndagstat, avail; + int toret = 0; + + avail = init_receivers(ssock, ssock->bufavail); + + if (avail != 1) { + return 0; + } + + ret = recvmsg(ssock->sock, &(ssock->singlemsg), MSG_DONTWAIT); + if (ret < 0) { + if (is_buffered_data_available(ssock, tv, gottime)) { + return 1; + } + } + + ssock->startidle = 0; + ndagstat = check_ndag_received(ssock, ssock->nextwriteind, ret, rt); if (ndagstat <= 0) { toret = 0; } else { toret = 1; } -#endif return toret; } -static int receive_from_sockets(recvstream_t *rt) { +static int receive_from_sockets(recvstream_t *rt, uint8_t socktype) { int i, readybufs, gottime; struct timeval tv; @@ -1222,16 +1388,18 @@ static int receive_from_sockets(recvstream_t *rt) { #if HAVE_DECL_RECVMMSG /* Plenty of full buffers, just use the packets in those */ - if (rt->sources[i].bufavail < RECV_BATCH_SIZE / 2) { - readybufs ++; - continue; + if (socktype == NDAG_SOCKET_TYPE_MULTICAST) { + if (rt->sources[i].bufavail < RECV_BATCH_SIZE / 2) { + readybufs ++; + continue; + } } -#else +#endif if (rt->sources[i].bufavail == 0) { readybufs ++; continue; } -#endif + if (maxfd == 0) { FD_ZERO(&fds); } @@ -1260,8 +1428,15 @@ static int receive_from_sockets(recvstream_t *rt) { } continue; } - readybufs += receive_from_single_socket(&(rt->sources[i]), - &tv, &gottime, rt); +#if HAVE_DECL_RECVMMSG + if (socktype == NDAG_SOCKET_TYPE_MULTICAST) { + readybufs += receive_from_single_socket_recvmmsg( + &(rt->sources[i]), &tv, &gottime, rt); + continue; + } +#endif + readybufs += receive_from_single_socket( + &(rt->sources[i]), &tv, &gottime, rt); } return readybufs; @@ -1300,7 +1475,8 @@ static int receive_encap_records_block(libtrace_t *libtrace, recvstream_t *rt, continue; } - if ((iserr = receive_from_sockets(rt)) < 0) { + if ((iserr = receive_from_sockets(rt, + FORMAT_DATA->socktype)) < 0) { return iserr; } else if (iserr > 0) { /* At least one of our input sockets has available @@ -1347,7 +1523,7 @@ static int receive_encap_records_nonblock(libtrace_t *libtrace, recvstream_t *rt return 0; } - return receive_from_sockets(rt); + return receive_from_sockets(rt, FORMAT_DATA->socktype); } static streamsock_t *select_next_packet(recvstream_t *rt) { @@ -1774,6 +1950,62 @@ static struct libtrace_format_t ndag = { ndag_get_thread_stats /* per-thread stats */ }; +static struct libtrace_format_t ndagtcp = { + + "ndagtcp", + "", + TRACE_FORMAT_NDAG_TCP, + NULL, /* probe filename */ + NULL, /* probe magic */ + ndag_init_input, /* init_input */ + ndag_config_input, /* config_input */ + ndagtcp_start_input, /* start_input */ + ndag_pause_input, /* pause_input */ + NULL, /* init_output */ + NULL, /* config_output */ + NULL, /* start_output */ + ndag_fin_input, /* fin_input */ + NULL, /* fin_output */ + ndag_read_packet, /* read_packet */ + ndag_prepare_packet, /* prepare_packet */ + NULL, /* fin_packet */ + NULL, /* can_hold_packet */ + NULL, /* write_packet */ + NULL, /* flush_output */ + ndag_get_link_type, /* get_link_type */ + ndag_get_direction, /* get_direction */ + ndag_set_direction, /* set_direction */ + ndag_get_erf_timestamp, /* get_erf_timestamp */ + NULL, /* get_timeval */ + NULL, /* get_seconds */ + NULL, /* get_timespec */ + NULL, /* get_meta_section */ + NULL, /* seek_erf */ + NULL, /* seek_timeval */ + NULL, /* seek_seconds */ + ndag_get_capture_length,/* get_capture_length */ + ndag_get_wire_length, /* get_wire_length */ + ndag_get_framing_length,/* get_framing_length */ + ndag_set_capture_length,/* set_capture_length */ + NULL, /* get_received_packets */ + NULL, /* get_filtered_packets */ + NULL, /* get_dropped_packets */ + ndag_get_statistics, /* get_statistics */ + NULL, /* get_fd */ + trace_event_ndag, /* trace_event */ + NULL, /* help */ + NULL, /* next pointer */ + {true, 0}, /* live packet capture */ + ndagtcp_pstart_input, /* parallel start */ + ndag_pread_packets, /* parallel read */ + ndag_pause_input, /* parallel pause */ + NULL, + ndag_pregister_thread, /* register thread */ + NULL, + ndag_get_thread_stats /* per-thread stats */ +}; + void ndag_constructor(void) { register_format(&ndag); + register_format(&ndagtcp); } diff --git a/lib/libtrace.h.in b/lib/libtrace.h.in index 4873bba3..ae57dd76 100644 --- a/lib/libtrace.h.in +++ b/lib/libtrace.h.in @@ -464,6 +464,7 @@ enum base_format_t { TRACE_FORMAT_PFRINGOLD =26, TRACE_FORMAT_PFRING =27, TRACE_FORMAT_ETSIFILE =28, /**< ETSI LI in a binary file */ + TRACE_FORMAT_NDAG_TCP =29, /**< DAG unicast over a network */ }; /** RT protocol packet types */ From 8b607ec62abc1a73df2a1c4f38eb70c89c235165 Mon Sep 17 00:00:00 2001 From: Shane Alcock Date: Fri, 13 Oct 2023 22:35:59 +1300 Subject: [PATCH 2/8] Fix memory error when expanding object cache --- lib/data-struct/object_cache.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/data-struct/object_cache.c b/lib/data-struct/object_cache.c index 4babd88f..e864cb67 100644 --- a/lib/data-struct/object_cache.c +++ b/lib/data-struct/object_cache.c @@ -146,7 +146,7 @@ static void resize_memory_caches(struct local_caches *lcs) { fprintf(stderr, "Expected lcs->t_mem_caches_total to be greater or equal to 0 in resize_memory_caches()\n"); return; } - lcs->t_mem_caches += 0x10; + lcs->t_mem_caches_total += 0x10; lcs->t_mem_caches = realloc(lcs->t_mem_caches, lcs->t_mem_caches_total * sizeof(struct local_cache)); } From 80b92ec4c1119849260c2bf9c544edb5e95c6ad8 Mon Sep 17 00:00:00 2001 From: Shane Alcock Date: Mon, 23 Oct 2023 16:46:11 +1300 Subject: [PATCH 3/8] Add first pass at traceucast tool Still a few bugs to iron out, but getting close to a working solution. --- lib/format_ndag.c | 226 ++++++---- tools/tracemcast/Makefile.am | 6 +- tools/tracemcast/traceucast.1 | 70 ++++ tools/tracemcast/traceucast.c | 767 ++++++++++++++++++++++++++++++++++ 4 files changed, 993 insertions(+), 76 deletions(-) create mode 100644 tools/tracemcast/traceucast.1 create mode 100644 tools/tracemcast/traceucast.c diff --git a/lib/format_ndag.c b/lib/format_ndag.c index e76532f8..6b2950bb 100644 --- a/lib/format_ndag.c +++ b/lib/format_ndag.c @@ -35,6 +35,7 @@ #include "format_helper.h" #include "format_erf.h" +#include #include #include #include @@ -80,6 +81,7 @@ typedef struct streamsource { } streamsource_t; typedef struct streamsock { + uint8_t socktype; char *groupaddr; int sock; struct addrinfo *srcaddr; @@ -373,6 +375,7 @@ static void new_group_alert(libtrace_t *libtrace, uint16_t threadid, ndag_internal_message_t alert; + memset(&alert, 0, sizeof(alert)); alert.type = NDAG_CLIENT_NEWGROUP; alert.contents.groupaddr = FORMAT_DATA->multicastgroup; alert.contents.localiface = FORMAT_DATA->localiface; @@ -533,7 +536,10 @@ static int accept_ndagtcp_connection(libtrace_t *libtrace, if (r < 0) { fprintf(stderr, "Error in select while accepting connection on socket for %s:%s -- %s\n", ipstr, portstr, strerror(errno)); + consock = -1; + break; } else if (r == 0) { + consock = 0; continue; } @@ -559,12 +565,15 @@ static int accept_ndagtcp_connection(libtrace_t *libtrace, static void _ndag_controller_run(libtrace_t *libtrace, int sock) { uint16_t ptmap[65536]; + int ret; + /* ptmap is a dirty hack to allow us to quickly check if we've already + * assigned a stream to a thread. + */ memset(ptmap, 0xff, 65536 * sizeof(uint16_t)); ndag_paused = 0; while ((is_halted(libtrace) == -1) && !ndag_paused) { - int ret; char buf[CTRL_BUF_SIZE]; ret = select_on_sock(sock); @@ -598,7 +607,7 @@ static void _ndag_controller_run(libtrace_t *libtrace, int sock) { /* Control channel has fallen over, should probably encourage libtrace * to halt the receiver threads as well. */ - if (!is_halted(libtrace)) { + if (ret < 0 && is_halted(libtrace) == -1) { trace_interrupt(); } } @@ -607,20 +616,21 @@ static void *ndagtcp_controller_run(void *tdata) { libtrace_t *libtrace = (libtrace_t *)tdata; int sock = -1; - /* ptmap is a dirty hack to allow us to quickly check if we've already - * assigned a stream to a thread. - */ - sock = accept_ndagtcp_connection(libtrace, FORMAT_DATA->multicastgroup, + while (is_halted(libtrace) == -1) { + sock = accept_ndagtcp_connection(libtrace, + FORMAT_DATA->multicastgroup, FORMAT_DATA->portstr); - if (sock == -1) { - trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, + if (sock == -1) { + trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Unable to setup control channel for nDAG TCP"); - trace_interrupt(); - } else { - _ndag_controller_run(libtrace, sock); + trace_interrupt(); + } else if (sock == 0) { + continue; + } else { + _ndag_controller_run(libtrace, sock); + } } - pthread_exit(NULL); } @@ -722,37 +732,43 @@ static int ndagtcp_pstart_input(libtrace_t *libtrace) { return -1; } -static void halt_ndag_receiver(recvstream_t *receiver) { - int j, i; - libtrace_message_queue_destroy(&(receiver->mqueue)); - - if (receiver->sources == NULL) - return; - for (i = 0; i < receiver->sourcecount; i++) { - streamsock_t src = receiver->sources[i]; - if (src.saved) { - for (j = 0; j < ENCAP_BUFFERS; j++) { - if (src.saved[j]) { - free(src.saved[j]); - } +static void free_streamsock_data(streamsock_t *src) { + int j; + if (src->saved) { + for (j = 0; j < ENCAP_BUFFERS; j++) { + if (src->saved[j]) { + free(src->saved[j]); } - free(src.saved); } + free(src->saved); + } #if HAVE_DECL_RECVMMSG - for (j = 0; j < RECV_BATCH_SIZE; j++) { - if (src.mmsgbufs[j].msg_hdr.msg_iov) { - free(src.mmsgbufs[j].msg_hdr.msg_iov); - } + for (j = 0; j < RECV_BATCH_SIZE; j++) { + if (src->mmsgbufs[j].msg_hdr.msg_iov) { + free(src->mmsgbufs[j].msg_hdr.msg_iov); } + } #endif - if (src.singlemsg.msg_iov) { - free(src.singlemsg.msg_iov); - } + if (src->singlemsg.msg_iov) { + free(src->singlemsg.msg_iov); + } - if (src.sock != -1) { - close(src.sock); - } + if (src->sock != -1) { + close(src->sock); + } +} + + +static void halt_ndag_receiver(recvstream_t *receiver) { + int i; + libtrace_message_queue_destroy(&(receiver->mqueue)); + + if (receiver->sources == NULL) + return; + for (i = 0; i < receiver->sourcecount; i++) { + streamsock_t *src = &(receiver->sources[i]); + free_streamsock_data(src); } if (receiver->knownmonitors) { free(receiver->knownmonitors); @@ -924,6 +940,7 @@ static int ndag_prepare_packet_stream_encaperf(libtrace_t *restrict libtrace, /* TODO */ } else { if (rt->received_packets > 0) { + assert(erfptr->lctr == 0); rt->dropped_upstream += ntohs(erfptr->lctr); } } @@ -1025,20 +1042,54 @@ static ndag_monitor_t *add_new_knownmonitor(recvstream_t *rt, uint16_t monid) { return mon; } -static int add_new_streamsock(recvstream_t *rt, streamsource_t src) { +static void realign_sources(recvstream_t *rt) { + + /* remove any "dead" socket entries in a source list, so + * we can reclaim the memory and not just continuously grow + * when sources come and go. + */ + + streamsock_t *newsrcs; + int i, newcnt = 0; + + newsrcs = calloc(rt->sourcecount, sizeof(streamsock_t)); + + for (i = 0; i < rt->sourcecount; i++) { + streamsock_t *src = &(rt->sources[i]); + if (src->sock == -1) { + free_streamsock_data(src); + continue; + } + memcpy(&(newsrcs[newcnt]), &(rt->sources[i]), + sizeof(streamsock_t)); + newcnt ++; + } + + /* If we haven't removed any, then we'll need to extend the + * source array to fit in the new streamsock. + */ + if (newcnt == rt->sourcecount) { + newsrcs = (streamsock_t *)realloc(rt->sources, + sizeof(streamsock_t) * (rt->sourcecount + 10)); + } + + free(rt->sources); + rt->sources = newsrcs; + rt->sourcecount = newcnt; +} + +static int add_new_streamsock(libtrace_t *libtrace, + recvstream_t *rt, streamsource_t src, uint8_t socktype) { streamsock_t *ssock = NULL; ndag_monitor_t *mon = NULL; int i; + char portstr[16]; - /* TODO consider replacing this with a list or vector so we can - * easily remove sources that are no longer in use, rather than - * just setting the sock to -1 and having to check them every - * time we want to read a packet. - */ if (rt->sourcecount == 0) { rt->sources = (streamsock_t *)malloc(sizeof(streamsock_t) * 10); } else if ((rt->sourcecount % 10) == 0) { + realign_sources(rt); rt->sources = (streamsock_t *)realloc(rt->sources, sizeof(streamsock_t) * (rt->sourcecount + 10)); } @@ -1056,6 +1107,7 @@ static int add_new_streamsock(recvstream_t *rt, streamsource_t src) { mon = add_new_knownmonitor(rt, src.monitor); } + ssock->socktype = socktype; ssock->port = src.port; ssock->groupaddr = src.groupaddr; ssock->expectedseq = 0; @@ -1071,8 +1123,15 @@ static int add_new_streamsock(recvstream_t *rt, streamsource_t src) { ssock->savedsize[i] = 0; } - ssock->sock = join_multicast_group(src.groupaddr, src.localiface, - NULL, src.port, &(ssock->srcaddr)); + if (socktype == NDAG_SOCKET_TYPE_MULTICAST) { + ssock->sock = join_multicast_group(src.groupaddr, + src.localiface, NULL, src.port, + &(ssock->srcaddr)); + } else { + snprintf(portstr, 16, "%u", src.port); + ssock->sock = accept_ndagtcp_connection(libtrace, + src.groupaddr, portstr); + } if (ssock->sock < 0) { return -1; @@ -1083,17 +1142,26 @@ static int add_new_streamsock(recvstream_t *rt, streamsource_t src) { } #if HAVE_DECL_RECVMMSG - for (i = 0; i < RECV_BATCH_SIZE; i++) { - ssock->mmsgbufs[i].msg_hdr.msg_iov = (struct iovec *) - malloc(sizeof(struct iovec)); - ssock->mmsgbufs[i].msg_hdr.msg_name = ssock->srcaddr->ai_addr; - ssock->mmsgbufs[i].msg_hdr.msg_namelen = ssock->srcaddr->ai_addrlen; - ssock->mmsgbufs[i].msg_hdr.msg_control = NULL; - ssock->mmsgbufs[i].msg_hdr.msg_controllen = 0; - ssock->mmsgbufs[i].msg_hdr.msg_flags = 0; - ssock->mmsgbufs[i].msg_len = 0; + if (ssock->socktype == NDAG_SOCKET_TYPE_MULTICAST) { + assert(ssock->srcaddr != NULL); + for (i = 0; i < RECV_BATCH_SIZE; i++) { + ssock->mmsgbufs[i].msg_hdr.msg_iov = (struct iovec *) + malloc(sizeof(struct iovec)); + ssock->mmsgbufs[i].msg_hdr.msg_name = + ssock->srcaddr->ai_addr; + ssock->mmsgbufs[i].msg_hdr.msg_namelen = + ssock->srcaddr->ai_addrlen; + ssock->mmsgbufs[i].msg_hdr.msg_control = NULL; + ssock->mmsgbufs[i].msg_hdr.msg_controllen = 0; + ssock->mmsgbufs[i].msg_hdr.msg_flags = 0; + ssock->mmsgbufs[i].msg_len = 0; + } + } else { + memset(ssock->mmsgbufs, 0, + sizeof(struct mmsghdr) * RECV_BATCH_SIZE); } #endif + memset(&(ssock->singlemsg), 0, sizeof(ssock->singlemsg)); ssock->singlemsg.msg_iov = (struct iovec *) calloc(1, sizeof(struct iovec)); ssock->nextread = NULL;; @@ -1102,13 +1170,11 @@ static int add_new_streamsock(recvstream_t *rt, streamsource_t src) { ssock->recordcount = 0; rt->sourcecount += 1; - fprintf(stderr, "Added new stream %s:%u to thread %d\n", - ssock->groupaddr, ssock->port, rt->threadindex); - return ssock->port; } -static int receiver_read_messages(recvstream_t *rt) { +static int receiver_read_messages(libtrace_t *libtrace, + recvstream_t *rt, uint8_t socktype) { ndag_internal_message_t msg; @@ -1116,7 +1182,8 @@ static int receiver_read_messages(recvstream_t *rt) { (void *)&msg) != LIBTRACE_MQ_FAILED) { switch(msg.type) { case NDAG_CLIENT_NEWGROUP: - if (add_new_streamsock(rt, msg.contents) < 0) { + if (add_new_streamsock(libtrace, rt, + msg.contents, socktype) < 0) { return -1; } break; @@ -1169,6 +1236,10 @@ static int init_receivers(streamsock_t *ssock, int required) { } #if HAVE_DECL_RECVMMSG for (i = 0; i < required; i++) { + if (ssock->socktype != NDAG_SOCKET_TYPE_MULTICAST) { + i = 1; + break; + } if (i >= RECV_BATCH_SIZE) { break; } @@ -1352,6 +1423,11 @@ static int receive_from_single_socket(streamsock_t *ssock, struct timeval *tv, if (is_buffered_data_available(ssock, tv, gottime)) { return 1; } + fprintf(stderr, "recvmsg failed: %s\n", strerror(errno)); + } else if (ret == 0) { + close(ssock->sock); + ssock->sock = -1; + return 0; } ssock->startidle = 0; @@ -1418,10 +1494,15 @@ static int receive_from_sockets(recvstream_t *rt, uint8_t socktype) { zerotv.tv_usec = 0; if (select(maxfd + 1, &fds, NULL, NULL, &zerotv) == -1) { /* log the error? XXX */ + fprintf(stderr, "select() failed for recvstream %d: %s\n", + rt->threadindex, strerror(errno)); return -1; } for (i = 0; i < rt->sourcecount; i++) { + if (rt->sources[i].sock == -1) { + continue; + } if (!FD_ISSET(rt->sources[i].sock, &fds)) { if (rt->sources[i].bufavail < ENCAP_BUFFERS) { readybufs ++; @@ -1461,7 +1542,8 @@ static int receive_encap_records_block(libtrace_t *libtrace, recvstream_t *rt, } /* Check for any messages from the control thread */ - iserr = receiver_read_messages(rt); + iserr = receiver_read_messages(libtrace, rt, + FORMAT_DATA->socktype); if (iserr <= 0) { return iserr; @@ -1527,23 +1609,12 @@ static int receive_encap_records_nonblock(libtrace_t *libtrace, recvstream_t *rt } static streamsock_t *select_next_packet(recvstream_t *rt) { - int i; + int i, res = -1; streamsock_t *ssock = NULL; uint64_t earliest = 0; uint64_t currentts = 0; dag_record_t *daghdr; - /* If we only have one source, then no need to do any - * timestamp parsing or byteswapping. - */ - if (rt->sourcecount == 1) { - if (readable_data(&(rt->sources[0]))) { - return &(rt->sources[0]); - } - return NULL; - } - - for (i = 0; i < rt->sourcecount; i ++) { if (!readable_data(&(rt->sources[i]))) { continue; @@ -1560,8 +1631,13 @@ static streamsock_t *select_next_packet(recvstream_t *rt) { if (earliest == 0 || earliest > currentts) { earliest = currentts; ssock = &(rt->sources[i]); + res = i; } } + assert(ssock == NULL || res == rt->sourcecount - 1); + if (ssock != NULL) { + printf("%d\n", res); + } return ssock; } @@ -1572,7 +1648,7 @@ static int ndag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) { rem = receive_encap_records_block(libtrace, &(FORMAT_DATA->receivers[0]), packet, NULL); - if (rem <= 0) { + if (rem < 0) { return rem; } @@ -1615,7 +1691,6 @@ static int ndag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t, break; } } - nextavail = select_next_packet(rt); if (nextavail == NULL) { break; @@ -1631,6 +1706,8 @@ static int ndag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t, } } while (1); + printf("pread done\n"); + for (i = 0; i < rt->sourcecount; i++) { streamsock_t *src = &(rt->sources[i]); src->bufavail += src->bufwaiting; @@ -1654,7 +1731,8 @@ static libtrace_eventobj_t trace_event_ndag(libtrace_t *libtrace, streamsock_t *nextavail = NULL; /* Only check for messages once per call */ - rem = receiver_read_messages(&(FORMAT_DATA->receivers[0])); + rem = receiver_read_messages(libtrace, &(FORMAT_DATA->receivers[0]), + FORMAT_DATA->socktype); if (rem <= 0) { event.type = TRACE_EVENT_TERMINATE; return event; diff --git a/tools/tracemcast/Makefile.am b/tools/tracemcast/Makefile.am index 02cfd3c3..3f72261c 100644 --- a/tools/tracemcast/Makefile.am +++ b/tools/tracemcast/Makefile.am @@ -1,6 +1,8 @@ -bin_PROGRAMS = tracemcast -man_MANS = tracemcast.1 +bin_PROGRAMS = tracemcast traceucast +man_MANS = tracemcast.1 traceucast.1 EXTRA_DIST = $(man_MANS) include ../Makefile.tools tracemcast_SOURCES = tracemcast.c + +traceucast_SOURCEs = traceucast.c diff --git a/tools/tracemcast/traceucast.1 b/tools/tracemcast/traceucast.1 new file mode 100644 index 00000000..a97c846f --- /dev/null +++ b/tools/tracemcast/traceucast.1 @@ -0,0 +1,70 @@ +.TH TRACEUCAST "1" "Oct 2023" "traceucast (libtrace)" "User Commands" +.SH +traceucast \- stream captured packets to a libtrace client +.SH SYNOPSIS +.B traceucast +[ \-m ] +[ \-f ] +[ \-c ] +[ \-p ] +[ \-t ] +[ \-M ] +inputuri +.SH DESCRIPTION +traceucast reads packets from a single live packet source (e.g. an interface +or hardware capture card) and then emits those packets to a specified +libtrace client program that can process the packets off-site. + +This allows the redirection of packets captured on one host to another. +Unlike tracemcast, traceucast uses TCP to transfer packets, so the packet +stream is reliable and ordered. The downside is that traceucast can only +maintain a connection to a single receiving client at a time. + +The streaming protocol is the nDAG protocol, which libtrace supports as +an input format, so libtrace programs can natively receive packets from a +traceucast instance without any additional modifications. + +.TP +\fB\-m\fR +set a unique identifier that will be included in the nDAG header. This is used +by the recipient to tell which traceucast instance emitted the packet. + +.TP +\fB\-f\fR +only emit packets that match this BPF filter expression. + +.TP +\fB\-c\fR
+transmit captured packets to this client address. The client should be +a libtrace program that is using an ndagtcp: input URI. + +.TP +\fB\-p\fR +send nDAG beacon messages to this port number. + +.TP +\fB\-t\fR +sets the number of threads to use for streaming packets. +Each thread will produce a separate stream on a unique port. + +.TP +\fB\-M\fR +do not create any nDAG messages larger than this number of bytes. +Don't forget to allow for additional encapsulation (e.g. Ethernet, IP, UDP) +when determining this value. + +.SH LINKS +More details about traceucast (and libtrace) can be found at +https://github.com/LibtraceTeam/libtrace/wiki + +.SH SEE ALSO +libtrace(3), tracesplit(1), tracesplit_dir(1), tracefilter(1), +traceconvert(1), tracereport(1), tracertstats(1), tracestats(1), +tracepktdump(1), traceanon(1), tracesummary(1), tracereplay(1), +tracediff(1), traceends(1), tracetopends(1), tracemerge(1), +tracemcast(1) + +.SH AUTHORS +Shane Alcock + + diff --git a/tools/tracemcast/traceucast.c b/tools/tracemcast/traceucast.c new file mode 100644 index 00000000..eba5137c --- /dev/null +++ b/tools/tracemcast/traceucast.c @@ -0,0 +1,767 @@ +/* + * + * Copyright (c) 2007-2023 The University of Waikato, Hamilton, New Zealand. + * All rights reserved. + * + * This file is part of libtrace. + * + * Libtrace was originally developed by the University of Waikato WAND + * research group. For further information please see http://www.wand.net.nz/ + * + * libtrace is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * libtrace is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program. If not, see . + * + * + */ + +/* Author: Shane Alcock, SearchLight */ + +/* Given a single live capture input, e.g. 'ring:' or 'dpdk:', this tool + * will re-transmit the packets received across a network to a listening + * host. The resulting traffic stream matches the expected format + * for an 'ndagtcp:' client, so you can use libtrace to receive the + * packets and process them as if you had captured from the source + * directly. + * + * Effectively, this tool is intended to provide a means of pushing packets + * from a capture source to a secondary client so that you can run libtrace + * tools and programs on a remote host. Unlike tracemcast, traceucast uses + * TCP to ensure the packets reach their destination but, as a result, can + * only support a single recipient. + * + * Inspired by (and borrowing somewhat from) the DAG multicaster tool that + * I developed for the STARDUST project. The DAG multicaster is optimised + * for use with a DAG card only. It is highly recommended if you are using a + * DAG card for your initial capture *and* your use case is academic and + * non-commercial. + * + * traceucast is generalised for use with other live capture formats and + * therefore loses some of the optimizations that come from being DAG-specific. + * It is also licensed under the LGPL, so can be used for commercial purposes + * (provided the terms of the LGPL are met). + */ + +#include "config.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include "lib/format_erf.h" +#include "lib/format_ndag.h" +#include "lib/lt_bswap.h" + +#ifndef HAVE_DAG_API +#include "lib/dagformat.h" +#endif + +#include "lib/libtrace_int.h" + +struct libtrace_t *currenttrace = NULL; + +struct global_params { + + uint16_t monitorid ; + char *clientaddr ; + uint64_t starttime; + uint16_t firstport; + int readercount; + uint16_t mtu; +}; + +struct beacon_params { + uint16_t beaconport; + struct global_params *gparams; + uint32_t frequency; +}; + +typedef struct read_thread_data { + int threadid; + uint16_t streamport; + int streamfd; + + uint8_t *pbuffer; + ndag_encap_t *encaphdr; + uint8_t *writeptr; + uint32_t seqno; + uint16_t reccount; + struct addrinfo *target; + uint32_t lastsend; + + bool livesource; + uint8_t failed; + +} read_thread_data_t; + +volatile int halted = 0; + +static void cleanup_signal(int signal UNUSED) { + if (currenttrace) { + trace_pstop(currenttrace); + } + halted = 1; +} + +static int create_stream_socket(uint16_t port, char *clientaddr, + struct addrinfo **targetinfo, uint8_t block) { + + struct addrinfo hints; + struct addrinfo *gotten; + char portstr[16]; + int sock; + int bufsize, reuse=1; + + hints.ai_family = PF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_PASSIVE; + hints.ai_protocol = 0; + + snprintf(portstr, 15, "%u", port); + + if (getaddrinfo(clientaddr, portstr, &hints, &gotten) != 0) { + fprintf(stderr, + "traceucast: Call to getaddrinfo failed for %s:%s -- %s\n", + clientaddr, portstr, strerror(errno)); + return -1; + } + if (targetinfo) { + *targetinfo = gotten; + } + + sock = socket(gotten->ai_family, gotten->ai_socktype, 0); + if (sock < 0) { + fprintf(stderr, + "traceucast: Failed to create TCP socket for %s:%s -- %s\n", + clientaddr, portstr, strerror(errno)); + goto sockcreateover; + } + + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) + < 0) { + + fprintf(stderr, "traceucast: Failed to configure socket for %s:%s -- %s\n", + clientaddr, portstr, strerror(errno)); + + close(sock); + sock = -1; + goto sockcreateover; + } + + bufsize = 32 * 1024 * 1024; + if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &bufsize, + (socklen_t)sizeof(int)) != 0) { + fprintf(stderr, + "traceucast: Failed to increase buffer size on streaming interface %s:%s -- %s\n", + clientaddr, portstr, strerror(errno)); + close(sock); + sock = -1; + goto sockcreateover; + } + + + while (!halted) { + if (connect(sock, gotten->ai_addr, gotten->ai_addrlen) == -1) { + if (errno == ECONNREFUSED) { + if (block) { + sleep(1); + continue; + } else { + close(sock); + sock = 0; + break; + } + } + fprintf(stderr, + "traceucast: Failed to connect to %s:%s -- %s\n", + clientaddr, portstr, strerror(errno)); + close(sock); + sock = -1; + break; + } else { + fprintf(stderr, "traceucast connected to %s:%s\n", clientaddr, + portstr); + break; + } + } + +sockcreateover: + if (targetinfo == NULL) { + freeaddrinfo(gotten); + } + return sock; +} + +static inline char *fill_common_header(char *bufstart, uint16_t monitorid, + uint8_t pkttype) { + + ndag_common_t *hdr = (ndag_common_t *)bufstart; + + hdr->magic = htonl(NDAG_MAGIC_NUMBER); + hdr->version = NDAG_EXPORT_VERSION; + hdr->type = pkttype; + hdr->monitorid = htons(monitorid); + + return bufstart + sizeof(ndag_common_t); +} + +static void *init_reader_thread(libtrace_t *trace, + libtrace_thread_t *t, void *global) { + + read_thread_data_t *rdata = NULL; + struct global_params *gparams = (struct global_params *)global; + libtrace_info_t *info = trace_get_information(trace); + + rdata = calloc(1, sizeof(read_thread_data_t)); + if (info) { + rdata->livesource = info->live; + } else { + rdata->livesource = false; + } + rdata->threadid = trace_get_perpkt_thread_id(t); + rdata->streamport = gparams->firstport + rdata->threadid; + rdata->streamfd = -1; + rdata->pbuffer = calloc(gparams->mtu, sizeof(uint8_t)); + rdata->writeptr = rdata->pbuffer; + rdata->seqno = 1; + rdata->target = NULL; + rdata->lastsend = 0; + rdata->encaphdr = NULL; + rdata->reccount = 0; + rdata->failed = 0; + + rdata->streamfd = -1; + + return rdata; +} + +static int send_ndag_packet(read_thread_data_t *rdata) { + + int s; + int rem = (rdata->writeptr - rdata->pbuffer); + int sentsofar = 0; + int ret = 0; + int attempts = 0; + + rdata->encaphdr->recordcount = ntohs(rdata->reccount); + + while (rem > 0) { + s = send(rdata->streamfd, rdata->pbuffer + sentsofar, rem, MSG_DONTWAIT); + + if (s < 0) { + if ((errno == EAGAIN || errno == EWOULDBLOCK) && attempts < 10) { + attempts ++; + usleep(5000); + continue; + } + fprintf(stderr, "traceucast: thread %d failed to send streamed ERF packet: %s\n", + rdata->threadid, strerror(errno)); + ret = -1; + break; + } + + sentsofar += s; + rem -= s; + } + + rdata->writeptr = rdata->pbuffer; + rdata->encaphdr = NULL; + rdata->reccount = 0; + return ret; +} + +static void halt_reader_thread(libtrace_t *trace UNUSED, + libtrace_thread_t *t UNUSED, void *global UNUSED, void *tls) { + + read_thread_data_t *rdata = (read_thread_data_t *)tls; + + if (rdata->writeptr > rdata->pbuffer) { + send_ndag_packet(rdata); + } + + if (rdata->pbuffer) { + free(rdata->pbuffer); + } + if (rdata->target) { + freeaddrinfo(rdata->target); + } + if (rdata->streamfd != -1) { + close(rdata->streamfd); + } + free(rdata); +} + +static uint16_t construct_erf_header(read_thread_data_t *rdata, + libtrace_packet_t *packet, libtrace_linktype_t ltype, uint32_t rem, + uint64_t erfts) { + + uint16_t framing = 0; + dag_record_t *drec = (dag_record_t *)(rdata->writeptr); + + drec->ts = bswap_host_to_le64(erfts); + + if (ltype == TRACE_TYPE_ETH) { + drec->type = TYPE_ETH; + } else if (ltype == TRACE_TYPE_NONE) { + drec->type = TYPE_IPV4; // sorry if you're using IPv6 raw */ + } else { + drec->type = 255; + } + + if (drec->type == TYPE_ETH) { + framing = dag_record_size + 2; + } else { + framing = dag_record_size; + } + drec->rlen = htons(rem + framing); + drec->wlen = htons(trace_get_wire_length(packet)); + drec->lctr = htons(0); + memset(&(drec->flags), 0, sizeof(drec->flags)); + + if (trace_get_direction(packet) != TRACE_DIR_UNKNOWN) { + drec->flags.iface = trace_get_direction(packet); + } else { + drec->flags.iface = 0; + } + + return framing; +} + +static void tick_reader_thread(libtrace_t *trace UNUSED, + libtrace_thread_t *t UNUSED, void *global UNUSED, void *tls, + uint64_t order) { + + read_thread_data_t *rdata = (read_thread_data_t *)tls; + + if (rdata->writeptr > rdata->pbuffer && + (order >> 32) >= rdata->lastsend + 3) { + + if (send_ndag_packet(rdata) < 0) { + rdata->failed = 1; + } + rdata->lastsend = (order >> 32); + } +} + +static libtrace_packet_t *packet_reader_thread(libtrace_t *trace UNUSED, + libtrace_thread_t *t UNUSED, void *global, void *tls, + libtrace_packet_t *packet) { + + read_thread_data_t *rdata = (read_thread_data_t *)tls; + struct global_params *gparams = (struct global_params *)global; + libtrace_linktype_t ltype; + uint32_t rem; + void *l2; + uint64_t erfts; + + if (rdata->failed) { + trace_interrupt(); + return packet; + } + + if (IS_LIBTRACE_META_PACKET(packet)) { + return packet; + } + + if (rdata->streamfd == -1) { + int fd; + uint8_t block; + + if (rdata->livesource) { + block = 0; + } else { + block = 1; + } + fd = create_stream_socket(rdata->streamport, + gparams->clientaddr, &(rdata->target), block); + + if (fd == 0) { + return packet; + } + if (fd == -1) { + fprintf(stderr, "traceucast: failed to create TCP socket for reader thread %d\n", rdata->threadid); + trace_interrupt(); + return packet; + + } else if (rdata->target == NULL) { + fprintf(stderr, "traceucast: failed to get addrinfo for reader socket %d\n", rdata->threadid); + close(rdata->streamfd); + rdata->streamfd = -1; + trace_interrupt(); + return packet; + } + rdata->streamfd = fd; + } + + /* first, check if there is going to be space in the buffer for this + * packet + an ERF header */ + l2 = trace_get_layer2(packet, <ype, &rem); + erfts = trace_get_erf_timestamp(packet); + + if (gparams->mtu - (rdata->writeptr - rdata->pbuffer) < + rem + dag_record_size) { + + /* if not and if there is already something in the buffer, send it then + * create a new one. + */ + if (rdata->writeptr > rdata->pbuffer + sizeof(ndag_common_t) + + sizeof(ndag_encap_t)) { + + if (send_ndag_packet(rdata) < 0) { + rdata->failed = 1; + return packet; + } + rdata->lastsend = (erfts >> 32); + } + } + + /* append this packet to the buffer (truncate if necessary) */ + + /* if the buffer is empty, put on a common and encap header on the + * front, before adding any packets */ + if (rdata->writeptr == rdata->pbuffer) { + rdata->encaphdr = (ndag_encap_t *)(fill_common_header( + (char *)rdata->writeptr, + gparams->monitorid, NDAG_PKT_ENCAPERF)); + rdata->writeptr = ((uint8_t *)rdata->encaphdr) + sizeof(ndag_encap_t); + + rdata->encaphdr->started = gparams->starttime; + rdata->encaphdr->seqno = htonl(rdata->seqno); + rdata->encaphdr->streamid = htons(rdata->threadid); + rdata->encaphdr->recordcount = 0; + + rdata->reccount = 0; + rdata->seqno ++; + } + + if (rem > gparams->mtu - (rdata->writeptr - rdata->pbuffer) + - (dag_record_size + 2)) { + rem = gparams->mtu - (rdata->writeptr - rdata->pbuffer); + rem -= (dag_record_size + 2); + } + + /* put an ERF header in at writeptr */ + rdata->writeptr += construct_erf_header(rdata, packet, ltype, rem, erfts); + + /* copy packet contents into writeptr */ + memcpy(rdata->writeptr, l2, rem); + rdata->writeptr += rem; + rdata->reccount ++; + + /* if the buffer is close to full, just send the buffer anyway */ + if (gparams->mtu - (rdata->writeptr - rdata->pbuffer) - + (dag_record_size + 2) < 64) { + if (send_ndag_packet(rdata) < 0) { + rdata->failed = 1; + } + rdata->lastsend = (erfts >> 32); + } + + return packet; +} + +static void start_libtrace_reader(struct global_params *gparams, char *uri, + char *filterstring) { + + + libtrace_filter_t *filt = NULL; + libtrace_callback_set_t *pktcbs = NULL; + + currenttrace = trace_create(uri); + if (trace_is_err(currenttrace)) { + trace_perror(currenttrace, "trace_create"); + goto failmode; + } + + trace_set_perpkt_threads(currenttrace, gparams->readercount); + + pktcbs = trace_create_callback_set(); + trace_set_starting_cb(pktcbs, init_reader_thread); + trace_set_stopping_cb(pktcbs, halt_reader_thread); + trace_set_packet_cb(pktcbs, packet_reader_thread); + trace_set_tick_interval_cb(pktcbs, tick_reader_thread); + + if (trace_get_information(currenttrace)->live) { + trace_set_tick_interval(currenttrace, 1000); + } else { + trace_set_tracetime(currenttrace, true); + } + + if (filterstring) { + filt = trace_create_filter(filterstring); + + if (trace_config(currenttrace, TRACE_OPTION_FILTER, filt) < 0) { + trace_perror(currenttrace, "Failed to configure filter"); + goto failmode; + } + } + + + if (trace_pstart(currenttrace, gparams, pktcbs, NULL) == -1) { + trace_perror(currenttrace, "Failed to start trace"); + goto failmode; + } + + trace_join(currenttrace); + + if (trace_is_err(currenttrace)) { + trace_perror(currenttrace, "Reading packets"); + } + +failmode: + if (filt) { + trace_destroy_filter(filt); + } + if (currenttrace) { + trace_destroy(currenttrace); + } + if (pktcbs) { + trace_destroy_callback_set(pktcbs); + } + +} + + +static uint32_t form_beacon(char **buffer, struct beacon_params *bparams) { + + uint32_t bsize = sizeof(ndag_common_t) + (sizeof(uint16_t) * + (bparams->gparams->readercount + 1)); + char *bptr; + uint16_t *next; + int i; + + if (bsize > bparams->gparams->mtu) { + fprintf(stderr, "traceucast: beacon is too large to fit in a single datagram, either increase MTU or reduce number of threads\n"); + return 0; + } + + bptr = (char *)malloc(bsize); + next = (uint16_t *)(fill_common_header(bptr, bparams->gparams->monitorid, + NDAG_PKT_BEACON)); + + *next = htons(bparams->gparams->readercount); + next ++; + + for (i = 0; i < bparams->gparams->readercount; i++) { + *next = htons(bparams->gparams->firstport + (i)); + next ++; + } + + *buffer = bptr; + return bsize; +} + +static void *beaconer_thread(void *tdata) { + + struct beacon_params *bparams = (struct beacon_params *)tdata; + int sock; + char *beaconpacket = NULL; + uint32_t beaconsize; + struct addrinfo *targetinfo = NULL; + + sock = create_stream_socket(bparams->beaconport, + bparams->gparams->clientaddr, &targetinfo, 1); + + if (sock == -1) { + fprintf(stderr, "traceucast: failed to create TCP socket for beaconer thread\n"); + halted = 1; + } else if (targetinfo == NULL) { + fprintf(stderr, "traceucast: failed to get addrinfo for beaconer socket\n"); + halted = 1; + } + + beaconsize = form_beacon(&beaconpacket, bparams); + + if (beaconsize <= 0 || beaconpacket == NULL) { + halted = 1; + } + + while (!halted) { + if (send(sock, beaconpacket, beaconsize, 0) != beaconsize) { + fprintf(stderr, "traceucast: failed to send a beacon packet: %s\n", + strerror(errno)); + break; + } + usleep(1000 * bparams->frequency); + } + + if (beaconpacket) { + free(beaconpacket); + } + if (targetinfo) { + free(targetinfo); + } + if (sock >= 0) { + close(sock); + } + + pthread_exit(NULL); + +} + +static void usage(char *prog) { + fprintf(stderr, "Usage:\n" + "%s [ options ] libtraceURI\n\n", prog); + fprintf(stderr, "Options:\n" + " -f --filter=bpffilter Only emit packets that match this BPF filter\n" + " -m --monitorid=idnum Tag all streamed packets with the given identifier\n" + " -c --clientaddr=address Connect to a ndagtcp receiver at this address/hostname\n" + " -p --beaconport=port Send beacons to the receiver on this port number\n" + " -M --mtu=bytes Limit streamed message size to this number of bytes\n" + " -t --threads=count Use this number of packet processing threads\n" + " -h --help Show this usage statement\n"); +} + + +int main(int argc, char *argv[]) { + struct sigaction sigact; + char *filterstring = NULL; + uint16_t beaconport = 9999; + struct global_params gparams; + struct beacon_params bparams; + int threads = 1; + struct timeval tv; + uint16_t mtu = NDAG_MAX_DGRAM_SIZE; + pthread_t beacontid = 0; + sigset_t sig_before, sig_block_all; + + gparams.monitorid = 0; + gparams.clientaddr = NULL; + + while (1) { + int optindex; + struct option long_options[] = { + { "filter", 1, 0, 'f' }, + { "monitorid", 1, 0, 'm' }, + { "clientaddr", 1, 0, 'c' }, + { "beaconport", 1, 0, 'p' }, + { "threads", 1, 0, 't' }, + { "mtu", 1, 0, 'M' }, + { "help", 0, 0, 'h' }, + { NULL, 0, 0, 0 }, + }; + + int c = getopt_long(argc, argv, "M:t:f:m:c:p:h", long_options, + &optindex); + if (c == -1) { + break; + } + + switch (c) { + case 'f': + filterstring = optarg; + break; + case 'M': + mtu = (uint16_t)strtoul(optarg, NULL, 0); + break; + case 'm': + gparams.monitorid = (uint16_t)strtoul(optarg, NULL, 0); + break; + case 'c': + gparams.clientaddr = optarg; + break; + case 'p': + beaconport = (uint16_t)strtoul(optarg, NULL, 0); + break; + case 't': + threads = (int)strtoul(optarg, NULL, 0); + break; + case 'h': + default: + usage(argv[0]); + return 1; + } + } + + if (optind >= argc) { + usage(argv[0]); + fprintf(stderr, + "traceucast: No URI specified as an input source. Exiting\n"); + return 1; + } + + sigact.sa_handler = cleanup_signal; + sigemptyset(&sigact.sa_mask); + sigact.sa_flags = SA_RESTART; + + sigaction(SIGINT, &sigact, NULL); + sigaction(SIGTERM, &sigact, NULL); + + if (gparams.clientaddr == NULL) { + fprintf(stderr, + "traceucast: no client address specified to receive our streams. Exiting\n"); + return 1; + } + if (mtu >= NDAG_MAX_DGRAM_SIZE) { + mtu = NDAG_MAX_DGRAM_SIZE; + } else if (mtu < 536) { + mtu = 536; + } + + + gettimeofday(&tv, NULL); + gparams.starttime = bswap_host_to_le64(((tv.tv_sec - 1509494400) * 1000) + + (tv.tv_usec / 1000.0)); + gparams.readercount = threads; + gparams.mtu = mtu; + + gparams.firstport = 10000 + (rand() % 52000); + + fprintf(stderr, "Streaming %s to %s:%u \n", + argv[optind], gparams.clientaddr, beaconport); + fprintf(stderr, "Monitor ID is set to %u\n", gparams.monitorid); + + /* Start up the beaconing */ + bparams.beaconport = beaconport; + bparams.gparams = &(gparams); + bparams.frequency = 1000; + + sigemptyset(&sig_block_all); + if (pthread_sigmask(SIG_SETMASK, &sig_block_all, &sig_before) < 0) { + fprintf(stderr, "Unable to disable signals before starting beaconer.\n"); + goto endmcast; + } + + if (pthread_create(&beacontid, NULL, beaconer_thread, &bparams) != 0) { + fprintf(stderr, "Error while creating beaconer thread: %s", + strerror(errno)); + goto endmcast; + } + + if (pthread_sigmask(SIG_SETMASK, &sig_before, NULL)) { + fprintf(stderr, "Unable to re-enable signals after beaconer creation.\n"); + goto endmcast; + } + + start_libtrace_reader(&gparams, argv[optind], filterstring); + +endmcast: + halted = 1; + + if (beacontid != 0) { + pthread_join(beacontid, NULL); + } + + return 0; +} + +// vim: set sw=4 tabstop=4 softtabstop=4 expandtab : From 9b715fddfd06fe9224b646cc7c8f50df70d7f32a Mon Sep 17 00:00:00 2001 From: Shane Alcock Date: Tue, 24 Oct 2023 16:12:00 +1300 Subject: [PATCH 4/8] Fix remaining bugs in receiving ndag over TCP * handle situations where two (or more) ndag messages are received into a single buffer. * avoid potential race condition when pausing an ndag trace, due to an ongoing "read packet" function. * a socket is now still considered to have readable data as long as there is unprocessed buffered data, even if the socket itself is closed. --- lib/format_ndag.c | 418 ++++++++++++++++++++++++++++------------------ 1 file changed, 255 insertions(+), 163 deletions(-) diff --git a/lib/format_ndag.c b/lib/format_ndag.c index 6b2950bb..0d2cc901 100644 --- a/lib/format_ndag.c +++ b/lib/format_ndag.c @@ -93,11 +93,13 @@ typedef struct streamsock { int nextreadind; int nextwriteind; int savedsize[ENCAP_BUFFERS]; + int savedreccount[ENCAP_BUFFERS]; uint8_t rectype[ENCAP_BUFFERS]; uint64_t nextts; uint32_t startidle; - uint64_t recordcount; + uint64_t total_recordcount; + int reccount; int bufavail; int bufwaiting; @@ -121,6 +123,7 @@ typedef struct recvstream { uint64_t received_packets; int maxfd; + uint8_t halted; } recvstream_t; typedef struct ndag_format_data { @@ -187,6 +190,109 @@ static uint8_t check_ndag_header(char *msgbuf, uint32_t msgsize) { return header->type; } +static inline void reset_expected_seqs(recvstream_t *rt, ndag_monitor_t *mon) { + + int i; + for (i = 0; i < rt->sourcecount; i++) { + if (rt->sources[i].monitorptr == mon) { + rt->sources[i].expectedseq = 0; + } + } + +} + +static int check_ndag_received(streamsock_t *ssock, int index, + unsigned int msglen, recvstream_t *rt, unsigned int offset) { + + ndag_encap_t *encaphdr; + ndag_monitor_t *mon; + uint8_t rectype; + + if (msglen < sizeof(ndag_common_t)) { + return 0; + } + + /* Check that we have a valid nDAG encap record */ + rectype = check_ndag_header(ssock->saved[index] + offset, + (uint32_t)msglen); + + if (rectype == NDAG_PKT_KEEPALIVE) { + /* Keep-alive, reset startidle and carry on. Don't + * change nextwrite -- we want to overwrite the + * keep-alive with usable content. */ + if (ssock->socktype == NDAG_SOCKET_TYPE_MULTICAST) { + return 0; + } + /* skip over keep alive */ + offset += sizeof(ndag_common_t); + msglen -= sizeof(ndag_common_t); + return check_ndag_received(ssock, index, msglen, rt, offset); + + } else if (rectype != NDAG_PKT_ENCAPERF && + rectype != NDAG_PKT_CORSAROTAG) { + fprintf(stderr, "Received invalid record on the channel for %s:%u.\n", + ssock->groupaddr, ssock->port); + close(ssock->sock); + ssock->sock = -1; + return -1; + } + + ssock->rectype[index] = rectype; + if (offset == 0) { + ssock->savedsize[index] = msglen; + ssock->nextwriteind ++; + ssock->bufavail --; + + if (ssock->bufavail < 0) { + fprintf(stderr, "No space in buffer in check_ndag_received()\n"); + return -1; + } + if (ssock->nextwriteind >= ENCAP_BUFFERS) { + ssock->nextwriteind = 0; + } + + } + /* Get the useful info from the encap header */ + encaphdr=(ndag_encap_t *)(ssock->saved[index] + offset + + sizeof(ndag_common_t)); + + ssock->savedreccount[index] = ntohs(encaphdr->recordcount); + mon = ssock->monitorptr; + + if (mon->laststart == 0) { + mon->laststart = bswap_be_to_host64(encaphdr->started); + } else if (mon->laststart != bswap_be_to_host64(encaphdr->started)) { + mon->laststart = bswap_be_to_host64(encaphdr->started); + reset_expected_seqs(rt, mon); + + /* TODO what is a good way to indicate this to clients? + * set the loss counter in the ERF header? a bit rude? + * use another bit in the ERF header? + * add a queryable flag to libtrace_packet_t? + */ + + } + + if (ssock->expectedseq != 0) { + rt->missing_records += seq_cmp( + ntohl(encaphdr->seqno), ssock->expectedseq); + + } + ssock->expectedseq = ntohl(encaphdr->seqno) + 1; + if (ssock->expectedseq == 0) { + ssock->expectedseq ++; + } + + /* set up 'nextread' * by skipping past the nDAG headers */ + if (index == ssock->nextreadind) { + ssock->nextread = ssock->saved[index] + offset + + sizeof(ndag_common_t) + sizeof(ndag_encap_t); + } + + return 1; + +} + static int join_multicast_group(char *groupaddr, char *localiface, char *portstr, uint16_t portnum, struct addrinfo **srcinfo) { @@ -670,6 +776,7 @@ static int ndag_start_threads(libtrace_t *libtrace, uint32_t maxthreads, for (i = 0; i < maxthreads; i++) { FORMAT_DATA->receivers[i].sources = NULL; FORMAT_DATA->receivers[i].sourcecount = 0; + FORMAT_DATA->receivers[i].halted = 0; FORMAT_DATA->receivers[i].knownmonitors = NULL; FORMAT_DATA->receivers[i].monitorcount = 0; FORMAT_DATA->receivers[i].threadindex = i; @@ -762,6 +869,10 @@ static void free_streamsock_data(streamsock_t *src) { static void halt_ndag_receiver(recvstream_t *receiver) { int i; + + receiver->halted = 1; + usleep(200000); + libtrace_message_queue_destroy(&(receiver->mqueue)); if (receiver->sources == NULL) @@ -782,12 +893,12 @@ static void halt_ndag_receiver(recvstream_t *receiver) { static int ndag_pause_input(libtrace_t *libtrace) { int i; + ndag_paused = 1; + pthread_join(FORMAT_DATA->controlthread, NULL); /* Close the existing receiver sockets */ for (i = 0; i < FORMAT_DATA->receiver_cnt; i++) { halt_ndag_receiver(&(FORMAT_DATA->receivers[i])); } - ndag_paused = 1; - pthread_join(FORMAT_DATA->controlthread, NULL); return 0; } @@ -852,11 +963,12 @@ static int ndag_prepare_packet_stream_corsarotag(libtrace_t *restrict libtrace, rlen = ntohs(taghdr->pktlen) + sizeof(corsaro_tagged_packet_header_t); rt->received_packets ++; - ssock->recordcount += 1; + ssock->total_recordcount += 1; nr = ssock->nextreadind; ssock->nextread += rlen; ssock->nextts = 0; + ssock->reccount ++; if (ssock->nextread - ssock->saved[nr] > ssock->savedsize[nr]) { trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Walked past the end of the " @@ -869,6 +981,7 @@ static int ndag_prepare_packet_stream_corsarotag(libtrace_t *restrict libtrace, * move on. */ ssock->savedsize[nr] = 0; ssock->bufwaiting ++; + ssock->reccount = 0; nr ++; if (nr == ENCAP_BUFFERS) { @@ -877,6 +990,27 @@ static int ndag_prepare_packet_stream_corsarotag(libtrace_t *restrict libtrace, ssock->nextread = ssock->saved[nr] + sizeof(ndag_common_t) + sizeof(ndag_encap_t); ssock->nextreadind = nr; + } else if (ssock->reccount >= ssock->savedreccount[nr]) { + int x = check_ndag_received(ssock, ssock->nextreadind, + ssock->savedsize[nr] - (ssock->nextread + - ssock->saved[nr]), rt, + ssock->nextread - ssock->saved[nr]); + + ssock->reccount = 0; + if (x <= 0) { + ssock->savedsize[nr] = 0; + ssock->bufwaiting ++; + ssock->reccount = 0; + + nr ++; + if (nr == ENCAP_BUFFERS) { + nr = 0; + } + ssock->nextread = ssock->saved[nr] + + sizeof(ndag_common_t) + + sizeof(ndag_encap_t); + ssock->nextreadind = nr; + } } packet->order = ((uint64_t) ntohl(taghdr->ts_sec)) << 32; @@ -946,7 +1080,7 @@ static int ndag_prepare_packet_stream_encaperf(libtrace_t *restrict libtrace, } rt->received_packets ++; - ssock->recordcount += 1; + ssock->total_recordcount += 1; nr = ssock->nextreadind; encaphdr = (ndag_encap_t *)(ssock->saved[nr] + @@ -963,6 +1097,7 @@ static int ndag_prepare_packet_stream_encaperf(libtrace_t *restrict libtrace, } ssock->nextread += rlen; ssock->nextts = 0; + ssock->reccount ++; if (ssock->nextread - ssock->saved[nr] > ssock->savedsize[nr]) { trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Walked past the end of the " @@ -975,6 +1110,7 @@ static int ndag_prepare_packet_stream_encaperf(libtrace_t *restrict libtrace, * move on. */ ssock->savedsize[nr] = 0; ssock->bufwaiting ++; + ssock->reccount = 0; nr ++; if (nr == ENCAP_BUFFERS) { @@ -983,6 +1119,28 @@ static int ndag_prepare_packet_stream_encaperf(libtrace_t *restrict libtrace, ssock->nextread = ssock->saved[nr] + sizeof(ndag_common_t) + sizeof(ndag_encap_t); ssock->nextreadind = nr; + } else if (ssock->reccount >= ssock->savedreccount[nr]) { + int x = check_ndag_received(ssock, ssock->nextreadind, + ssock->savedsize[nr] - (ssock->nextread + - ssock->saved[nr]), rt, + ssock->nextread - ssock->saved[nr]); + ssock->reccount = 0; + if (x <= 0) { + ssock->savedsize[nr] = 0; + ssock->bufwaiting ++; + ssock->reccount = 0; + + nr ++; + if (nr == ENCAP_BUFFERS) { + nr = 0; + } + ssock->nextread = ssock->saved[nr] + + sizeof(ndag_common_t) + + sizeof(ndag_encap_t); + ssock->nextreadind = nr; + } + + } packet->order = erf_get_erf_timestamp(packet); @@ -1117,10 +1275,12 @@ static int add_new_streamsock(libtrace_t *libtrace, ssock->bufwaiting = 0; ssock->startidle = 0; ssock->nextts = 0; + ssock->reccount = 0; for (i = 0; i < ENCAP_BUFFERS; i++) { ssock->saved[i] = (char *)malloc(ENCAP_BUFSIZE); ssock->savedsize[i] = 0; + ssock->savedreccount[i] = 0; } if (socktype == NDAG_SOCKET_TYPE_MULTICAST) { @@ -1167,7 +1327,7 @@ static int add_new_streamsock(libtrace_t *libtrace, ssock->nextread = NULL;; ssock->nextreadind = 0; ssock->nextwriteind = 0; - ssock->recordcount = 0; + ssock->total_recordcount = 0; rt->sourcecount += 1; return ssock->port; @@ -1184,11 +1344,11 @@ static int receiver_read_messages(libtrace_t *libtrace, case NDAG_CLIENT_NEWGROUP: if (add_new_streamsock(libtrace, rt, msg.contents, socktype) < 0) { - return -1; + return READ_ERROR; } break; case NDAG_CLIENT_HALT: - return 0; + return READ_EOF; } } return 1; @@ -1197,10 +1357,12 @@ static int receiver_read_messages(libtrace_t *libtrace, static inline int readable_data(streamsock_t *ssock) { - if (ssock->sock == -1) { + if (ssock->savedsize[ssock->nextreadind] == 0) { return 0; } - if (ssock->savedsize[ssock->nextreadind] == 0) { + + /* this one shouldn't happen, but just to be safe... */ + if (ssock->savedreccount[ssock->nextreadind] == 0) { return 0; } /* @@ -1214,17 +1376,6 @@ static inline int readable_data(streamsock_t *ssock) { } -static inline void reset_expected_seqs(recvstream_t *rt, ndag_monitor_t *mon) { - - int i; - for (i = 0; i < rt->sourcecount; i++) { - if (rt->sources[i].monitorptr == mon) { - rt->sources[i].expectedseq = 0; - } - } - -} - static int init_receivers(streamsock_t *ssock, int required) { int wind = ssock->nextwriteind; @@ -1262,116 +1413,29 @@ static int init_receivers(streamsock_t *ssock, int required) { return i; } -static int check_ndag_received(streamsock_t *ssock, int index, - unsigned int msglen, recvstream_t *rt) { - - ndag_encap_t *encaphdr; - ndag_monitor_t *mon; - uint8_t rectype; - - /* Check that we have a valid nDAG encap record */ - rectype = check_ndag_header(ssock->saved[index], (uint32_t)msglen); - - if (rectype == NDAG_PKT_KEEPALIVE) { - /* Keep-alive, reset startidle and carry on. Don't - * change nextwrite -- we want to overwrite the - * keep-alive with usable content. */ - return 0; - } else if (rectype != NDAG_PKT_ENCAPERF && - rectype != NDAG_PKT_CORSAROTAG) { - fprintf(stderr, "Received invalid record on the channel for %s:%u.\n", - ssock->groupaddr, ssock->port); - close(ssock->sock); - ssock->sock = -1; - return -1; - } - - ssock->rectype[index] = rectype; - ssock->savedsize[index] = msglen; - ssock->nextwriteind ++; - ssock->bufavail --; - - if (ssock->bufavail < 0) { - fprintf(stderr, "No space in buffer in check_ndag_received()\n"); - return -1; - } - if (ssock->nextwriteind >= ENCAP_BUFFERS) { - ssock->nextwriteind = 0; - } - - /* Get the useful info from the encap header */ - encaphdr=(ndag_encap_t *)(ssock->saved[index] + sizeof(ndag_common_t)); - - mon = ssock->monitorptr; - - if (mon->laststart == 0) { - mon->laststart = bswap_be_to_host64(encaphdr->started); - } else if (mon->laststart != bswap_be_to_host64(encaphdr->started)) { - mon->laststart = bswap_be_to_host64(encaphdr->started); - reset_expected_seqs(rt, mon); - - /* TODO what is a good way to indicate this to clients? - * set the loss counter in the ERF header? a bit rude? - * use another bit in the ERF header? - * add a queryable flag to libtrace_packet_t? - */ - - } - - if (ssock->expectedseq != 0) { - rt->missing_records += seq_cmp( - ntohl(encaphdr->seqno), ssock->expectedseq); - - } - ssock->expectedseq = ntohl(encaphdr->seqno) + 1; - if (ssock->expectedseq == 0) { - ssock->expectedseq ++; - } - - if (ssock->nextread == NULL) { - /* If this is our first read, set up 'nextread' - * by skipping past the nDAG headers */ - ssock->nextread = ssock->saved[0] + - sizeof(ndag_common_t) + sizeof(ndag_encap_t); - } - return 1; - -} - static int is_buffered_data_available(streamsock_t *ssock, struct timeval *tv, int *gottime) { int toret = 0; - /* Nothing to receive right now, but we should still - * count as 'ready' if at least one buffer is full */ - if (errno == EAGAIN || errno == EWOULDBLOCK) { - if (readable_data(ssock)) { - toret = 1; - } - if (!(*gottime)) { - gettimeofday(tv, NULL); - *gottime = 1; - } - if (ssock->startidle == 0) { - ssock->startidle = tv->tv_sec; - } else if (tv->tv_sec - ssock->startidle > NDAG_IDLE_TIMEOUT) { - fprintf(stderr, - "Closing channel %s:%u due to inactivity.\n", - ssock->groupaddr, - ssock->port); + if (readable_data(ssock)) { + toret = 1; + } + if (!(*gottime)) { + gettimeofday(tv, NULL); + *gottime = 1; + } + if (ssock->startidle == 0) { + ssock->startidle = tv->tv_sec; + } else if (tv->tv_sec - ssock->startidle > NDAG_IDLE_TIMEOUT) { + fprintf(stderr, + "Closing channel %s:%u due to inactivity.\n", + ssock->groupaddr, + ssock->port); - close(ssock->sock); - ssock->sock = -1; - } - return toret; - } - fprintf(stderr, - "Error receiving encapsulated records from %s:%u -- %s \n", - ssock->groupaddr, ssock->port, - strerror(errno)); - close(ssock->sock); - ssock->sock = -1; - return 0; + close(ssock->sock); + ssock->sock = -1; + } + return toret; } #if HAVE_DECL_RECVMMSG @@ -1387,13 +1451,22 @@ static int receive_from_single_socket_recvmmsg(streamsock_t *ssock, ret = recvmmsg(ssock->sock, ssock->mmsgbufs, avail, MSG_DONTWAIT, NULL); if (ret < 0) { - return is_buffered_data_available(ssock, tv, gottime); + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return is_buffered_data_available(ssock, tv, gottime); + } + fprintf(stderr, + "Error receiving encapsulated records from %s:%u -- %s \n", + ssock->groupaddr, ssock->port, + strerror(errno)); + close(ssock->sock); + ssock->sock = -1; + return 0; } ssock->startidle = 0; for (i = 0; i < ret; i++) { ndagstat = check_ndag_received(ssock, ssock->nextwriteind, - ssock->mmsgbufs[i].msg_len, rt); + ssock->mmsgbufs[i].msg_len, rt, 0); if (ndagstat == -1) { break; } @@ -1419,12 +1492,22 @@ static int receive_from_single_socket(streamsock_t *ssock, struct timeval *tv, } ret = recvmsg(ssock->sock, &(ssock->singlemsg), MSG_DONTWAIT); - if (ret < 0) { + if (ret == 0 || (ret == -1 && (errno == EAGAIN || + errno == EWOULDBLOCK))) { + if (ret == 0 && ssock->sock != -1) { + close(ssock->sock); + ssock->sock = -1; + } if (is_buffered_data_available(ssock, tv, gottime)) { return 1; + } else { + return 0; } - fprintf(stderr, "recvmsg failed: %s\n", strerror(errno)); - } else if (ret == 0) { + } else if (ret == -1) { + fprintf(stderr, + "Error receiving encapsulated records from %s:%u -- %s \n", + ssock->groupaddr, ssock->port, + strerror(errno)); close(ssock->sock); ssock->sock = -1; return 0; @@ -1432,7 +1515,7 @@ static int receive_from_single_socket(streamsock_t *ssock, struct timeval *tv, ssock->startidle = 0; - ndagstat = check_ndag_received(ssock, ssock->nextwriteind, ret, rt); + ndagstat = check_ndag_received(ssock, ssock->nextwriteind, ret, rt, 0); if (ndagstat <= 0) { toret = 0; } else { @@ -1454,9 +1537,11 @@ static int receive_from_sockets(recvstream_t *rt, uint8_t socktype) { gottime = 0; if (rt->maxfd == -1) { - return 0; + return READ_MESSAGE; } + FD_ZERO(&fds); + for (i = 0; i < rt->sourcecount; i++) { if (rt->sources[i].sock == -1) { continue; @@ -1476,9 +1561,6 @@ static int receive_from_sockets(recvstream_t *rt, uint8_t socktype) { continue; } - if (maxfd == 0) { - FD_ZERO(&fds); - } FD_SET(rt->sources[i].sock, &fds); if (maxfd < rt->sources[i].sock) { maxfd = rt->sources[i].sock; @@ -1486,24 +1568,22 @@ static int receive_from_sockets(recvstream_t *rt, uint8_t socktype) { } - if (maxfd <= 0) { - return readybufs; - } + if (maxfd > 0) { - zerotv.tv_sec = 0; - zerotv.tv_usec = 0; - if (select(maxfd + 1, &fds, NULL, NULL, &zerotv) == -1) { - /* log the error? XXX */ - fprintf(stderr, "select() failed for recvstream %d: %s\n", + zerotv.tv_sec = 0; + zerotv.tv_usec = 0; + if (select(maxfd + 1, &fds, NULL, NULL, &zerotv) == -1) { + /* log the error? XXX */ + fprintf(stderr, + "select() failed for recvstream %d: %s\n", rt->threadindex, strerror(errno)); - return -1; + return READ_ERROR; + } } for (i = 0; i < rt->sourcecount; i++) { - if (rt->sources[i].sock == -1) { - continue; - } - if (!FD_ISSET(rt->sources[i].sock, &fds)) { + if (rt->sources[i].sock == -1 || + !FD_ISSET(rt->sources[i].sock, &fds)) { if (rt->sources[i].bufavail < ENCAP_BUFFERS) { readybufs ++; } @@ -1558,7 +1638,7 @@ static int receive_encap_records_block(libtrace_t *libtrace, recvstream_t *rt, } if ((iserr = receive_from_sockets(rt, - FORMAT_DATA->socktype)) < 0) { + FORMAT_DATA->socktype)) == READ_ERROR) { return iserr; } else if (iserr > 0) { /* At least one of our input sockets has available @@ -1567,9 +1647,8 @@ static int receive_encap_records_block(libtrace_t *libtrace, recvstream_t *rt, } /* if we have access to the message queue check for a message - * otherwise we need to return and let libtrace check for a message */ - if ((msg && libtrace_message_queue_count(msg) > 0) || !msg) { + if ((msg && libtrace_message_queue_count(msg) > 0)) { return READ_MESSAGE; } @@ -1580,8 +1659,11 @@ static int receive_encap_records_block(libtrace_t *libtrace, recvstream_t *rt, usleep(100); } - } while (1); + } while (!rt->halted); + if (rt->halted) { + return 0; + } return iserr; } @@ -1609,7 +1691,7 @@ static int receive_encap_records_nonblock(libtrace_t *libtrace, recvstream_t *rt } static streamsock_t *select_next_packet(recvstream_t *rt) { - int i, res = -1; + int i; streamsock_t *ssock = NULL; uint64_t earliest = 0; uint64_t currentts = 0; @@ -1631,13 +1713,8 @@ static streamsock_t *select_next_packet(recvstream_t *rt) { if (earliest == 0 || earliest > currentts) { earliest = currentts; ssock = &(rt->sources[i]); - res = i; } } - assert(ssock == NULL || res == rt->sourcecount - 1); - if (ssock != NULL) { - printf("%d\n", res); - } return ssock; } @@ -1645,16 +1722,21 @@ static int ndag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) { int rem, ret; streamsock_t *nextavail = NULL; + + if (FORMAT_DATA->receivers[0].halted) { + return 0; + } + rem = receive_encap_records_block(libtrace, &(FORMAT_DATA->receivers[0]), packet, NULL); - if (rem < 0) { + if (rem == READ_ERROR || rem == READ_EOF) { return rem; } nextavail = select_next_packet(&(FORMAT_DATA->receivers[0])); if (nextavail == NULL) { - return 0; + return READ_MESSAGE; } /* nextread should point at an ERF header, so prepare 'packet' to be @@ -1678,6 +1760,10 @@ static int ndag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t, rt = (recvstream_t *)t->format_data; + if (rt->halted) { + return 0; + } + do { /* Only check for messages once per batch */ if (read_packets == 0) { @@ -1704,9 +1790,11 @@ static int ndag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t, if (read_packets >= nb_packets) { break; } - } while (1); + } while (!rt->halted); - printf("pread done\n"); + if (rt->halted) { + return 0; + } for (i = 0; i < rt->sourcecount; i++) { streamsock_t *src = &(rt->sources[i]); @@ -1730,6 +1818,10 @@ static libtrace_eventobj_t trace_event_ndag(libtrace_t *libtrace, int rem, i; streamsock_t *nextavail = NULL; + if (FORMAT_DATA->receivers[0].halted) { + event.type = TRACE_EVENT_TERMINATE; + return event; + } /* Only check for messages once per call */ rem = receiver_read_messages(libtrace, &(FORMAT_DATA->receivers[0]), FORMAT_DATA->socktype); From cc64c6156828b538ef56e395a8f4a168a74982aa Mon Sep 17 00:00:00 2001 From: Shane Alcock Date: Wed, 25 Oct 2023 22:56:40 +1300 Subject: [PATCH 5/8] traceucast: remove mtu CLI option There's no need to force outgoing messages to fit into a buffer size that matches the link MTU -- TCP will do segmentation for us, so we don't have to truncate large packets that exceed the MTU. Instead, let's just use a buffer that is guaranteed to fit any packet and save ourselves a lot of potential grief. I've also adjusted the EAGAIN handling to use an increasing backoff for retries, and doubled the number of allowed attempts before giving up. --- tools/tracemcast/traceucast.c | 42 +++++++++++++---------------------- 1 file changed, 15 insertions(+), 27 deletions(-) diff --git a/tools/tracemcast/traceucast.c b/tools/tracemcast/traceucast.c index eba5137c..b3593ae1 100644 --- a/tools/tracemcast/traceucast.c +++ b/tools/tracemcast/traceucast.c @@ -86,7 +86,6 @@ struct global_params { uint64_t starttime; uint16_t firstport; int readercount; - uint16_t mtu; }; struct beacon_params { @@ -113,6 +112,8 @@ typedef struct read_thread_data { } read_thread_data_t; +#define MAX_PACKET_SIZE 10000 + volatile int halted = 0; static void cleanup_signal(int signal UNUSED) { @@ -240,7 +241,7 @@ static void *init_reader_thread(libtrace_t *trace, rdata->threadid = trace_get_perpkt_thread_id(t); rdata->streamport = gparams->firstport + rdata->threadid; rdata->streamfd = -1; - rdata->pbuffer = calloc(gparams->mtu, sizeof(uint8_t)); + rdata->pbuffer = calloc(MAX_PACKET_SIZE, sizeof(uint8_t)); rdata->writeptr = rdata->pbuffer; rdata->seqno = 1; rdata->target = NULL; @@ -261,6 +262,7 @@ static int send_ndag_packet(read_thread_data_t *rdata) { int sentsofar = 0; int ret = 0; int attempts = 0; + int backoff = 5000; rdata->encaphdr->recordcount = ntohs(rdata->reccount); @@ -268,13 +270,18 @@ static int send_ndag_packet(read_thread_data_t *rdata) { s = send(rdata->streamfd, rdata->pbuffer + sentsofar, rem, MSG_DONTWAIT); if (s < 0) { - if ((errno == EAGAIN || errno == EWOULDBLOCK) && attempts < 10) { + if ((errno == EAGAIN || errno == EWOULDBLOCK) && attempts < 20) { attempts ++; - usleep(5000); + usleep(backoff); + backoff = backoff * 2; + if (backoff > 1000000) { + backoff = 1000000; + } continue; } fprintf(stderr, "traceucast: thread %d failed to send streamed ERF packet: %s\n", rdata->threadid, strerror(errno)); + fprintf(stderr, "%u\n", rdata->seqno); ret = -1; break; } @@ -417,7 +424,7 @@ static libtrace_packet_t *packet_reader_thread(libtrace_t *trace UNUSED, l2 = trace_get_layer2(packet, <ype, &rem); erfts = trace_get_erf_timestamp(packet); - if (gparams->mtu - (rdata->writeptr - rdata->pbuffer) < + if (MAX_PACKET_SIZE - (rdata->writeptr - rdata->pbuffer) < rem + dag_record_size) { /* if not and if there is already something in the buffer, send it then @@ -453,12 +460,6 @@ static libtrace_packet_t *packet_reader_thread(libtrace_t *trace UNUSED, rdata->seqno ++; } - if (rem > gparams->mtu - (rdata->writeptr - rdata->pbuffer) - - (dag_record_size + 2)) { - rem = gparams->mtu - (rdata->writeptr - rdata->pbuffer); - rem -= (dag_record_size + 2); - } - /* put an ERF header in at writeptr */ rdata->writeptr += construct_erf_header(rdata, packet, ltype, rem, erfts); @@ -468,7 +469,7 @@ static libtrace_packet_t *packet_reader_thread(libtrace_t *trace UNUSED, rdata->reccount ++; /* if the buffer is close to full, just send the buffer anyway */ - if (gparams->mtu - (rdata->writeptr - rdata->pbuffer) - + if (MAX_PACKET_SIZE - (rdata->writeptr - rdata->pbuffer) - (dag_record_size + 2) < 64) { if (send_ndag_packet(rdata) < 0) { rdata->failed = 1; @@ -549,7 +550,7 @@ static uint32_t form_beacon(char **buffer, struct beacon_params *bparams) { uint16_t *next; int i; - if (bsize > bparams->gparams->mtu) { + if (bsize > MAX_PACKET_SIZE) { fprintf(stderr, "traceucast: beacon is too large to fit in a single datagram, either increase MTU or reduce number of threads\n"); return 0; } @@ -626,7 +627,6 @@ static void usage(char *prog) { " -m --monitorid=idnum Tag all streamed packets with the given identifier\n" " -c --clientaddr=address Connect to a ndagtcp receiver at this address/hostname\n" " -p --beaconport=port Send beacons to the receiver on this port number\n" - " -M --mtu=bytes Limit streamed message size to this number of bytes\n" " -t --threads=count Use this number of packet processing threads\n" " -h --help Show this usage statement\n"); } @@ -640,7 +640,6 @@ int main(int argc, char *argv[]) { struct beacon_params bparams; int threads = 1; struct timeval tv; - uint16_t mtu = NDAG_MAX_DGRAM_SIZE; pthread_t beacontid = 0; sigset_t sig_before, sig_block_all; @@ -655,12 +654,11 @@ int main(int argc, char *argv[]) { { "clientaddr", 1, 0, 'c' }, { "beaconport", 1, 0, 'p' }, { "threads", 1, 0, 't' }, - { "mtu", 1, 0, 'M' }, { "help", 0, 0, 'h' }, { NULL, 0, 0, 0 }, }; - int c = getopt_long(argc, argv, "M:t:f:m:c:p:h", long_options, + int c = getopt_long(argc, argv, "t:f:m:c:p:h", long_options, &optindex); if (c == -1) { break; @@ -670,9 +668,6 @@ int main(int argc, char *argv[]) { case 'f': filterstring = optarg; break; - case 'M': - mtu = (uint16_t)strtoul(optarg, NULL, 0); - break; case 'm': gparams.monitorid = (uint16_t)strtoul(optarg, NULL, 0); break; @@ -711,18 +706,11 @@ int main(int argc, char *argv[]) { "traceucast: no client address specified to receive our streams. Exiting\n"); return 1; } - if (mtu >= NDAG_MAX_DGRAM_SIZE) { - mtu = NDAG_MAX_DGRAM_SIZE; - } else if (mtu < 536) { - mtu = 536; - } - gettimeofday(&tv, NULL); gparams.starttime = bswap_host_to_le64(((tv.tv_sec - 1509494400) * 1000) + (tv.tv_usec / 1000.0)); gparams.readercount = threads; - gparams.mtu = mtu; gparams.firstport = 10000 + (rand() % 52000); From 8ffe1abf51b61ae8cf22e01556af62ffc5db01cb Mon Sep 17 00:00:00 2001 From: Shane Alcock Date: Wed, 25 Oct 2023 22:59:58 +1300 Subject: [PATCH 6/8] ndag: rewrite packet parsing to handle TCP properly TCP does not guarantee that any particular receive will contain a complete message, nor that there won't be multiple messages arriving in a single receive call. This means that an ndag header, ERF header or single ERF record can span multiple buffers and we need to be able to stitch those together as needed. We also need to handle cases where we need to ask the socket for more data to get a complete header / record. --- lib/format_ndag.c | 528 +++++++++++++++++++++++++--------------------- 1 file changed, 293 insertions(+), 235 deletions(-) diff --git a/lib/format_ndag.c b/lib/format_ndag.c index 0d2cc901..4b34d8a9 100644 --- a/lib/format_ndag.c +++ b/lib/format_ndag.c @@ -92,9 +92,10 @@ typedef struct streamsock { char *nextread; int nextreadind; int nextwriteind; + uint16_t nextrlen; int savedsize[ENCAP_BUFFERS]; - int savedreccount[ENCAP_BUFFERS]; - uint8_t rectype[ENCAP_BUFFERS]; + int expectedreccount; + uint8_t rectype; uint64_t nextts; uint32_t startidle; uint64_t total_recordcount; @@ -152,6 +153,16 @@ typedef struct ndagreadermessage { } ndag_internal_message_t; +#define NEXT_BUFFER(ssock, nr) \ + ssock->savedsize[nr] = 0; \ + ssock->bufwaiting ++; \ + nr ++; \ + if (nr == ENCAP_BUFFERS) { \ + nr = 0; \ + } \ + ssock->nextread = ssock->saved[nr]; \ + ssock->nextreadind = nr; + static inline int seq_cmp(uint32_t seq_a, uint32_t seq_b) { /* Calculate seq_a - seq_b, taking wraparound into account */ @@ -201,98 +212,6 @@ static inline void reset_expected_seqs(recvstream_t *rt, ndag_monitor_t *mon) { } -static int check_ndag_received(streamsock_t *ssock, int index, - unsigned int msglen, recvstream_t *rt, unsigned int offset) { - - ndag_encap_t *encaphdr; - ndag_monitor_t *mon; - uint8_t rectype; - - if (msglen < sizeof(ndag_common_t)) { - return 0; - } - - /* Check that we have a valid nDAG encap record */ - rectype = check_ndag_header(ssock->saved[index] + offset, - (uint32_t)msglen); - - if (rectype == NDAG_PKT_KEEPALIVE) { - /* Keep-alive, reset startidle and carry on. Don't - * change nextwrite -- we want to overwrite the - * keep-alive with usable content. */ - if (ssock->socktype == NDAG_SOCKET_TYPE_MULTICAST) { - return 0; - } - /* skip over keep alive */ - offset += sizeof(ndag_common_t); - msglen -= sizeof(ndag_common_t); - return check_ndag_received(ssock, index, msglen, rt, offset); - - } else if (rectype != NDAG_PKT_ENCAPERF && - rectype != NDAG_PKT_CORSAROTAG) { - fprintf(stderr, "Received invalid record on the channel for %s:%u.\n", - ssock->groupaddr, ssock->port); - close(ssock->sock); - ssock->sock = -1; - return -1; - } - - ssock->rectype[index] = rectype; - if (offset == 0) { - ssock->savedsize[index] = msglen; - ssock->nextwriteind ++; - ssock->bufavail --; - - if (ssock->bufavail < 0) { - fprintf(stderr, "No space in buffer in check_ndag_received()\n"); - return -1; - } - if (ssock->nextwriteind >= ENCAP_BUFFERS) { - ssock->nextwriteind = 0; - } - - } - /* Get the useful info from the encap header */ - encaphdr=(ndag_encap_t *)(ssock->saved[index] + offset + - sizeof(ndag_common_t)); - - ssock->savedreccount[index] = ntohs(encaphdr->recordcount); - mon = ssock->monitorptr; - - if (mon->laststart == 0) { - mon->laststart = bswap_be_to_host64(encaphdr->started); - } else if (mon->laststart != bswap_be_to_host64(encaphdr->started)) { - mon->laststart = bswap_be_to_host64(encaphdr->started); - reset_expected_seqs(rt, mon); - - /* TODO what is a good way to indicate this to clients? - * set the loss counter in the ERF header? a bit rude? - * use another bit in the ERF header? - * add a queryable flag to libtrace_packet_t? - */ - - } - - if (ssock->expectedseq != 0) { - rt->missing_records += seq_cmp( - ntohl(encaphdr->seqno), ssock->expectedseq); - - } - ssock->expectedseq = ntohl(encaphdr->seqno) + 1; - if (ssock->expectedseq == 0) { - ssock->expectedseq ++; - } - - /* set up 'nextread' * by skipping past the nDAG headers */ - if (index == ssock->nextreadind) { - ssock->nextread = ssock->saved[index] + offset + - sizeof(ndag_common_t) + sizeof(ndag_encap_t); - } - - return 1; - -} - static int join_multicast_group(char *groupaddr, char *localiface, char *portstr, uint16_t portnum, struct addrinfo **srcinfo) { @@ -722,7 +641,7 @@ static void *ndagtcp_controller_run(void *tdata) { libtrace_t *libtrace = (libtrace_t *)tdata; int sock = -1; - while (is_halted(libtrace) == -1) { + while (is_halted(libtrace) == -1 && !ndag_paused) { sock = accept_ndagtcp_connection(libtrace, FORMAT_DATA->multicastgroup, FORMAT_DATA->portstr); @@ -968,6 +887,7 @@ static int ndag_prepare_packet_stream_corsarotag(libtrace_t *restrict libtrace, nr = ssock->nextreadind; ssock->nextread += rlen; ssock->nextts = 0; + ssock->nextrlen = 0; ssock->reccount ++; if (ssock->nextread - ssock->saved[nr] > ssock->savedsize[nr]) { @@ -990,7 +910,8 @@ static int ndag_prepare_packet_stream_corsarotag(libtrace_t *restrict libtrace, ssock->nextread = ssock->saved[nr] + sizeof(ndag_common_t) + sizeof(ndag_encap_t); ssock->nextreadind = nr; - } else if (ssock->reccount >= ssock->savedreccount[nr]) { + } else if (ssock->reccount >= ssock->expectedreccount) { + /* int x = check_ndag_received(ssock, ssock->nextreadind, ssock->savedsize[nr] - (ssock->nextread - ssock->saved[nr]), rt, @@ -1011,6 +932,7 @@ static int ndag_prepare_packet_stream_corsarotag(libtrace_t *restrict libtrace, sizeof(ndag_encap_t); ssock->nextreadind = nr; } + */ } packet->order = ((uint64_t) ntohl(taghdr->ts_sec)) << 32; @@ -1020,6 +942,197 @@ static int ndag_prepare_packet_stream_corsarotag(libtrace_t *restrict libtrace, return rlen; } +static int got_complete_packet(streamsock_t *ssock) { + unsigned int required, available; + int nr = ssock->nextreadind; + int next; + + if (ssock->rectype == NDAG_PKT_ENCAPERF) { + required = ssock->nextrlen; + } else { + required = 0; + } + if (required == 0) { + return 0; + } + + next = ssock->nextreadind; + available = ssock->savedsize[nr] - (ssock->nextread - ssock->saved[nr]); + while (required > available) { + next = ((next + 1) % ENCAP_BUFFERS); + if (ssock->savedsize[next] == 0) { + /* no more data available */ + return 0; + } + available += ssock->savedsize[next]; + } + return 1; +} + +static unsigned int copy_tmp_buffer(streamsock_t *ssock, char *tmpbuf, + unsigned int required) { + + char *ptr = tmpbuf; + int next, first = 1; + unsigned int available; + int nr = ssock->nextreadind; + + next = ssock->nextreadind; + available = ssock->savedsize[nr] - (ssock->nextread - ssock->saved[nr]); + + while (required > available) { + + if (first && available > 0) { + memcpy(ptr, ssock->nextread, available); + ptr += available; + first = 0; + } + next = ((next + 1) % ENCAP_BUFFERS); + if (ssock->savedsize[next] == 0) { + /* no more data available */ + return 0; + } + memcpy(ptr, ssock->saved[next], ssock->savedsize[next]); + available += ssock->savedsize[next]; + } + return available; +} + +static inline void consume_streamsock_data(streamsock_t *ssock, + unsigned int required) { + + int nr = ssock->nextreadind; + unsigned int available = (ssock->savedsize[nr] - + (ssock->nextread - ssock->saved[nr])); + + while (available < required) { + required -= available; + NEXT_BUFFER(ssock, nr) + nr = ssock->nextreadind; + available = ssock->savedsize[nr]; + assert(available > 0); + } + ssock->nextread += required; + /* handle the case where we use up the current buffer exactly */ + if (ssock->nextread - ssock->saved[nr] >= ssock->savedsize[nr]) { + NEXT_BUFFER(ssock, ssock->nextreadind); + } +} + +static int process_ndag_encap_headers(streamsock_t *ssock, recvstream_t *rt) { + + ndag_encap_t *encaphdr; + ndag_monitor_t *mon; + uint8_t rectype; + + int nr; + unsigned int available, required; + char tmpbuf[ENCAP_BUFSIZE * 2]; + char *usebuf; + + nr = ssock->nextreadind; + available = ssock->savedsize[nr] - (ssock->nextread - ssock->saved[nr]); + required = sizeof(ndag_common_t) + sizeof(ndag_encap_t); + + usebuf = ssock->nextread; + if (available < required) { + available = copy_tmp_buffer(ssock, tmpbuf, required); + if (available == 0) { + return 0; + } + usebuf = tmpbuf; + } + + rectype = check_ndag_header(usebuf, available); + if (rectype == NDAG_PKT_KEEPALIVE) { + consume_streamsock_data(ssock, required); + return process_ndag_encap_headers(ssock, rt); + } else if (rectype != NDAG_PKT_ENCAPERF) { + fprintf(stderr, "Received invalid record on the channel for %s:%u.\n", + ssock->groupaddr, ssock->port); + assert(0); + return -1; + } + + ssock->rectype = rectype; + encaphdr=(ndag_encap_t *)(usebuf + sizeof(ndag_common_t)); + ssock->expectedreccount = ntohs(encaphdr->recordcount); + mon = ssock->monitorptr; + + if (mon->laststart == 0) { + mon->laststart = bswap_be_to_host64(encaphdr->started); + } else if (mon->laststart != bswap_be_to_host64( + encaphdr->started)) { + mon->laststart = bswap_be_to_host64(encaphdr->started); + reset_expected_seqs(rt, mon); + } + if (ssock->expectedseq != 0) { + rt->missing_records += seq_cmp( + ntohl(encaphdr->seqno), + ssock->expectedseq); + } + ssock->expectedseq = ntohl(encaphdr->seqno) + 1; + if (ssock->expectedseq == 0) { + ssock->expectedseq ++; + } + consume_streamsock_data(ssock, required); + return 1; +} + +static int process_ndag_erf_headers(streamsock_t *ssock, recvstream_t *rt) { + + dag_record_t *erfptr; + + int nr; + unsigned int available, required; + char tmpbuf[ENCAP_BUFSIZE * 2]; + char *usebuf; + + nr = ssock->nextreadind; + + if (ssock->savedsize[nr] == 0) { + /* no data available? I don't think we should be able to + * get here in that case... */ + return 0; + } + + if (ssock->expectedreccount == 0) { + int r; + if ((r = process_ndag_encap_headers(ssock, rt)) <= 0) { + return -1; + } + if (r == 0) { + return 0; + } + } + + available = ssock->savedsize[nr] - (ssock->nextread - ssock->saved[nr]); + required = sizeof(dag_record_t); + //required = dag_record_size; + usebuf = ssock->nextread; + + if (available < required) { + available = copy_tmp_buffer(ssock, tmpbuf, required); + if (available == 0) { + return 0; + } + usebuf = tmpbuf; + } + + erfptr = (dag_record_t *)usebuf; + + ssock->nextts = bswap_le_to_host64(erfptr->ts); + ssock->nextrlen = ntohs(erfptr->rlen); + assert(ssock->nextrlen != 0); + if (rt->received_packets > 0) { + rt->dropped_upstream += ntohs(erfptr->lctr); + } + + /* don't consume the erf header, we'll need it later on */ + + return 1; +} + static int ndag_prepare_packet_stream_encaperf(libtrace_t *restrict libtrace, recvstream_t *restrict rt, streamsock_t *restrict ssock, @@ -1028,26 +1141,35 @@ static int ndag_prepare_packet_stream_encaperf(libtrace_t *restrict libtrace, /* XXX flags is constant, so we can tell the compiler to not * bother copying over the parameter */ - + char tmpbuf[ENCAP_BUFSIZE * 2]; + int nr, rlen; + unsigned int available; + char *usebuf = ssock->nextread; dag_record_t *erfptr; - ndag_encap_t *encaphdr; - uint16_t ndag_reccount = 0; - int nr; - uint16_t rlen; - /* - if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) { - packet->buf_control = TRACE_CTRL_PACKET; - } else { - packet->buf_control = TRACE_CTRL_EXTERNAL; + nr = ssock->nextreadind; + available = ssock->savedsize[nr] - (ssock->nextread - ssock->saved[nr]); + + if (ssock->nextrlen == 0 || ssock->nextrlen > ENCAP_BUFSIZE) { + return -1; + } + + if (available < ssock->nextrlen) { + if (copy_tmp_buffer(ssock, tmpbuf, ssock->nextrlen) == 0) { + return 0; + } + usebuf = tmpbuf; } - */ - packet->buf_control = TRACE_CTRL_EXTERNAL; + if (packet->buf_control != TRACE_CTRL_PACKET || !packet->buffer) { + packet->buf_control = TRACE_CTRL_PACKET; + packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE); + } packet->trace = libtrace; - packet->buffer = ssock->nextread; - packet->header = ssock->nextread; + memcpy(packet->buffer, usebuf, ssock->nextrlen); + packet->header = packet->buffer; packet->type = TRACE_RT_DATA_ERF; + packet->error = ssock->nextrlen; erfptr = (dag_record_t *)packet->header; @@ -1068,83 +1190,23 @@ static int ndag_prepare_packet_stream_encaperf(libtrace_t *restrict libtrace, } } - /* Update upstream drops using lctr */ - - if (erfptr->type == TYPE_DSM_COLOR_ETH) { - /* TODO */ - } else { - if (rt->received_packets > 0) { - assert(erfptr->lctr == 0); - rt->dropped_upstream += ntohs(erfptr->lctr); - } - } - rt->received_packets ++; ssock->total_recordcount += 1; - nr = ssock->nextreadind; - encaphdr = (ndag_encap_t *)(ssock->saved[nr] + - sizeof(ndag_common_t)); - - ndag_reccount = ntohs(encaphdr->recordcount); - if ((ndag_reccount & 0x8000) != 0) { - /* Record was truncated -- update rlen appropriately */ - rlen = ssock->savedsize[nr] - - (ssock->nextread - ssock->saved[nr]); - erfptr->rlen = htons(rlen); - } else { - rlen = ntohs(erfptr->rlen); - } - ssock->nextread += rlen; + consume_streamsock_data(ssock, ssock->nextrlen); + + rlen = ssock->nextrlen; + ssock->nextts = 0; ssock->reccount ++; + ssock->nextrlen = 0; - if (ssock->nextread - ssock->saved[nr] > ssock->savedsize[nr]) { - trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Walked past the end of the " - "nDAG receive buffer, probably due to a invalid rlen, in ndag_prepare_packet_stream()"); - return -1; - } - - if (ssock->nextread - ssock->saved[nr] >= ssock->savedsize[nr]) { - /* Read everything from this buffer, mark as empty and - * move on. */ - ssock->savedsize[nr] = 0; - ssock->bufwaiting ++; + if (ssock->reccount >= ssock->expectedreccount) { + ssock->expectedreccount = 0; ssock->reccount = 0; - - nr ++; - if (nr == ENCAP_BUFFERS) { - nr = 0; - } - ssock->nextread = ssock->saved[nr] + sizeof(ndag_common_t) + - sizeof(ndag_encap_t); - ssock->nextreadind = nr; - } else if (ssock->reccount >= ssock->savedreccount[nr]) { - int x = check_ndag_received(ssock, ssock->nextreadind, - ssock->savedsize[nr] - (ssock->nextread - - ssock->saved[nr]), rt, - ssock->nextread - ssock->saved[nr]); - ssock->reccount = 0; - if (x <= 0) { - ssock->savedsize[nr] = 0; - ssock->bufwaiting ++; - ssock->reccount = 0; - - nr ++; - if (nr == ENCAP_BUFFERS) { - nr = 0; - } - ssock->nextread = ssock->saved[nr] + - sizeof(ndag_common_t) + - sizeof(ndag_encap_t); - ssock->nextreadind = nr; - } - - } packet->order = erf_get_erf_timestamp(packet); - packet->error = rlen; packet->cached.link_type = erf_get_link_type(packet); return rlen; } @@ -1155,12 +1217,12 @@ static int ndag_prepare_packet_stream(libtrace_t *restrict libtrace, libtrace_packet_t *restrict packet, uint32_t flags UNUSED) { - if (ssock->rectype[ssock->nextreadind] == NDAG_PKT_ENCAPERF) { + if (ssock->rectype == NDAG_PKT_ENCAPERF) { return ndag_prepare_packet_stream_encaperf(libtrace, rt, ssock, packet); } - if (ssock->rectype[ssock->nextreadind] == NDAG_PKT_CORSAROTAG) { + if (ssock->rectype == NDAG_PKT_CORSAROTAG) { return ndag_prepare_packet_stream_corsarotag(libtrace, rt, ssock, packet); } @@ -1275,13 +1337,18 @@ static int add_new_streamsock(libtrace_t *libtrace, ssock->bufwaiting = 0; ssock->startidle = 0; ssock->nextts = 0; + ssock->nextrlen = 0; ssock->reccount = 0; + ssock->expectedreccount = 0; for (i = 0; i < ENCAP_BUFFERS; i++) { ssock->saved[i] = (char *)malloc(ENCAP_BUFSIZE); ssock->savedsize[i] = 0; - ssock->savedreccount[i] = 0; } + ssock->nextread = ssock->saved[0]; + ssock->nextreadind = 0; + ssock->nextwriteind = 0; + ssock->total_recordcount = 0; if (socktype == NDAG_SOCKET_TYPE_MULTICAST) { ssock->sock = join_multicast_group(src.groupaddr, @@ -1324,10 +1391,6 @@ static int add_new_streamsock(libtrace_t *libtrace, memset(&(ssock->singlemsg), 0, sizeof(ssock->singlemsg)); ssock->singlemsg.msg_iov = (struct iovec *) calloc(1, sizeof(struct iovec)); - ssock->nextread = NULL;; - ssock->nextreadind = 0; - ssock->nextwriteind = 0; - ssock->total_recordcount = 0; rt->sourcecount += 1; return ssock->port; @@ -1357,23 +1420,15 @@ static int receiver_read_messages(libtrace_t *libtrace, static inline int readable_data(streamsock_t *ssock) { - if (ssock->savedsize[ssock->nextreadind] == 0) { + if (ssock->bufavail == ENCAP_BUFFERS) { return 0; } - /* this one shouldn't happen, but just to be safe... */ - if (ssock->savedreccount[ssock->nextreadind] == 0) { - return 0; - } - /* - if (ssock->nextread - ssock->saved[ssock->nextreadind] >= - ssock->savedsize[ssock->nextreadind]) { + if (ssock->savedsize[ssock->nextreadind] == 0) { return 0; } - */ - return 1; - + return 1; } static int init_receivers(streamsock_t *ssock, int required) { @@ -1400,7 +1455,8 @@ static int init_receivers(streamsock_t *ssock, int required) { } ssock->mmsgbufs[i].msg_len = 0; - ssock->mmsgbufs[i].msg_hdr.msg_iov->iov_base = ssock->saved[wind]; + ssock->mmsgbufs[i].msg_hdr.msg_iov->iov_base = + ssock->saved[wind]; ssock->mmsgbufs[i].msg_hdr.msg_iov->iov_len = ENCAP_BUFSIZE; ssock->mmsgbufs[i].msg_hdr.msg_iovlen = 1; @@ -1440,10 +1496,9 @@ static int is_buffered_data_available(streamsock_t *ssock, struct timeval *tv, #if HAVE_DECL_RECVMMSG static int receive_from_single_socket_recvmmsg(streamsock_t *ssock, - struct timeval *tv, int *gottime, recvstream_t *rt) { + struct timeval *tv, int *gottime) { - int ret, ndagstat, avail; - int toret = 0; + int ret, avail; int i; avail = init_receivers(ssock, ssock->bufavail); @@ -1465,25 +1520,19 @@ static int receive_from_single_socket_recvmmsg(streamsock_t *ssock, ssock->startidle = 0; for (i = 0; i < ret; i++) { - ndagstat = check_ndag_received(ssock, ssock->nextwriteind, - ssock->mmsgbufs[i].msg_len, rt, 0); - if (ndagstat == -1) { - break; - } - - if (ndagstat == 1) { - toret = 1; - } + ssock->savedsize[ssock->nextwriteind] = + ssock->mmsgbufs[i].msg_len; + ssock->nextwriteind = (ssock->nextwriteind + 1) % ENCAP_BUFFERS; + ssock->bufavail --; } - return toret; + return 1; } #endif static int receive_from_single_socket(streamsock_t *ssock, struct timeval *tv, - int *gottime, recvstream_t *rt) { + int *gottime) { - int ret, ndagstat, avail; - int toret = 0; + int ret, avail; avail = init_receivers(ssock, ssock->bufavail); @@ -1515,14 +1564,10 @@ static int receive_from_single_socket(streamsock_t *ssock, struct timeval *tv, ssock->startidle = 0; - ndagstat = check_ndag_received(ssock, ssock->nextwriteind, ret, rt, 0); - if (ndagstat <= 0) { - toret = 0; - } else { - toret = 1; - } - - return toret; + ssock->savedsize[ssock->nextwriteind] = ret; + ssock->nextwriteind = (ssock->nextwriteind + 1) % ENCAP_BUFFERS; + ssock->bufavail --; + return 1; } static int receive_from_sockets(recvstream_t *rt, uint8_t socktype) { @@ -1592,12 +1637,12 @@ static int receive_from_sockets(recvstream_t *rt, uint8_t socktype) { #if HAVE_DECL_RECVMMSG if (socktype == NDAG_SOCKET_TYPE_MULTICAST) { readybufs += receive_from_single_socket_recvmmsg( - &(rt->sources[i]), &tv, &gottime, rt); + &(rt->sources[i]), &tv, &gottime); continue; } #endif readybufs += receive_from_single_socket( - &(rt->sources[i]), &tv, &gottime, rt); + &(rt->sources[i]), &tv, &gottime); } return readybufs; @@ -1691,11 +1736,10 @@ static int receive_encap_records_nonblock(libtrace_t *libtrace, recvstream_t *rt } static streamsock_t *select_next_packet(recvstream_t *rt) { - int i; + int i, r; streamsock_t *ssock = NULL; uint64_t earliest = 0; uint64_t currentts = 0; - dag_record_t *daghdr; for (i = 0; i < rt->sourcecount; i ++) { if (!readable_data(&(rt->sources[i]))) { @@ -1703,19 +1747,26 @@ static streamsock_t *select_next_packet(recvstream_t *rt) { } if (rt->sources[i].nextts == 0) { - daghdr = (dag_record_t *)(rt->sources[i].nextread); - currentts = bswap_le_to_host64(daghdr->ts); - rt->sources[i].nextts = currentts; - } else { - currentts = rt->sources[i].nextts; + r = process_ndag_erf_headers(&(rt->sources[i]), rt); + if (r < 0) { + return NULL; + } + if (r == 0) { + continue; + } } + assert(rt->sources[i].nextts != 0); + currentts = rt->sources[i].nextts; if (earliest == 0 || earliest > currentts) { earliest = currentts; ssock = &(rt->sources[i]); } } - return ssock; + if (ssock && got_complete_packet(ssock)) { + return ssock; + } + return NULL; } static int ndag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) { @@ -1754,7 +1805,7 @@ static int ndag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packets, size_t nb_packets) { recvstream_t *rt; - int rem, i; + int rem, i, r; size_t read_packets = 0; streamsock_t *nextavail = NULL; @@ -1766,7 +1817,7 @@ static int ndag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t, do { /* Only check for messages once per batch */ - if (read_packets == 0) { + if (nextavail == NULL) { rem = receive_encap_records_block(libtrace, rt, packets[read_packets], &t->messages); if (rem < 0) { @@ -1779,12 +1830,19 @@ static int ndag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t, } nextavail = select_next_packet(rt); if (nextavail == NULL) { - break; + continue; } - ndag_prepare_packet_stream(libtrace, rt, nextavail, + if ((r = ndag_prepare_packet_stream(libtrace, rt, nextavail, packets[read_packets], - TRACE_PREP_DO_NOT_OWN_BUFFER); + TRACE_PREP_DO_NOT_OWN_BUFFER)) < 0) { + return -1; + } + + if (r == 0) { + nextavail = NULL; + continue; + } read_packets ++; if (read_packets >= nb_packets) { From 8b75d0bec3307ac69aaf90be19c7cdb4eeb91b27 Mon Sep 17 00:00:00 2001 From: Shane Alcock Date: Wed, 25 Oct 2023 23:29:13 +1300 Subject: [PATCH 7/8] ndag: update corsarotag processing to handle TCP streams --- lib/format_ndag.c | 218 ++++++++++++++++++++++++++-------------------- 1 file changed, 123 insertions(+), 95 deletions(-) diff --git a/lib/format_ndag.c b/lib/format_ndag.c index 4b34d8a9..81c5c8c8 100644 --- a/lib/format_ndag.c +++ b/lib/format_ndag.c @@ -859,89 +859,6 @@ static int ndag_get_framing_length(const libtrace_packet_t *packet) { return 0; } -static int ndag_prepare_packet_stream_corsarotag(libtrace_t *restrict libtrace, - recvstream_t *restrict rt, - streamsock_t *restrict ssock, - libtrace_packet_t *restrict packet) { - - - corsaro_tagged_packet_header_t *taghdr; - int nr; - uint16_t rlen; - - packet->buf_control = TRACE_CTRL_EXTERNAL; - - packet->trace = libtrace; - packet->buffer = ssock->nextread; - packet->header = ssock->nextread; - packet->type = TRACE_RT_DATA_CORSARO_TAGGED; - - taghdr = (corsaro_tagged_packet_header_t *)packet->header; - - packet->payload = &(taghdr->tags); - - rlen = ntohs(taghdr->pktlen) + sizeof(corsaro_tagged_packet_header_t); - rt->received_packets ++; - ssock->total_recordcount += 1; - - nr = ssock->nextreadind; - ssock->nextread += rlen; - ssock->nextts = 0; - ssock->nextrlen = 0; - ssock->reccount ++; - - if (ssock->nextread - ssock->saved[nr] > ssock->savedsize[nr]) { - trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Walked past the end of the " - "nDAG receive buffer, probably due to a invalid taghdr->pktlen, in ndag_prepare_packet_stream_corsarotag()"); - return -1; - } - - if (ssock->nextread - ssock->saved[nr] >= ssock->savedsize[nr]) { - /* Read everything from this buffer, mark as empty and - * move on. */ - ssock->savedsize[nr] = 0; - ssock->bufwaiting ++; - ssock->reccount = 0; - - nr ++; - if (nr == ENCAP_BUFFERS) { - nr = 0; - } - ssock->nextread = ssock->saved[nr] + sizeof(ndag_common_t) + - sizeof(ndag_encap_t); - ssock->nextreadind = nr; - } else if (ssock->reccount >= ssock->expectedreccount) { - /* - int x = check_ndag_received(ssock, ssock->nextreadind, - ssock->savedsize[nr] - (ssock->nextread - - ssock->saved[nr]), rt, - ssock->nextread - ssock->saved[nr]); - - ssock->reccount = 0; - if (x <= 0) { - ssock->savedsize[nr] = 0; - ssock->bufwaiting ++; - ssock->reccount = 0; - - nr ++; - if (nr == ENCAP_BUFFERS) { - nr = 0; - } - ssock->nextread = ssock->saved[nr] + - sizeof(ndag_common_t) + - sizeof(ndag_encap_t); - ssock->nextreadind = nr; - } - */ - } - - packet->order = ((uint64_t) ntohl(taghdr->ts_sec)) << 32; - packet->order += (((uint64_t) ntohl(taghdr->ts_usec)) << 32) / 1000000; - packet->error = rlen; - packet->cached.link_type = TRACE_TYPE_CORSAROTAG; - return rlen; -} - static int got_complete_packet(streamsock_t *ssock) { unsigned int required, available; int nr = ssock->nextreadind; @@ -1050,7 +967,6 @@ static int process_ndag_encap_headers(streamsock_t *ssock, recvstream_t *rt) { } else if (rectype != NDAG_PKT_ENCAPERF) { fprintf(stderr, "Received invalid record on the channel for %s:%u.\n", ssock->groupaddr, ssock->port); - assert(0); return -1; } @@ -1079,9 +995,10 @@ static int process_ndag_encap_headers(streamsock_t *ssock, recvstream_t *rt) { return 1; } -static int process_ndag_erf_headers(streamsock_t *ssock, recvstream_t *rt) { +static int process_ndag_corsaro_header(streamsock_t *ssock, + recvstream_t *rt UNUSED) { - dag_record_t *erfptr; + corsaro_tagged_packet_header_t *taghdr; int nr; unsigned int available, required; @@ -1096,19 +1013,48 @@ static int process_ndag_erf_headers(streamsock_t *ssock, recvstream_t *rt) { return 0; } - if (ssock->expectedreccount == 0) { - int r; - if ((r = process_ndag_encap_headers(ssock, rt)) <= 0) { - return -1; - } - if (r == 0) { + assert(ssock->expectedreccount != 0); + available = ssock->savedsize[nr] - (ssock->nextread - ssock->saved[nr]); + required = sizeof(corsaro_tagged_packet_header_t); + usebuf = ssock->nextread; + + if (available < required) { + available = copy_tmp_buffer(ssock, tmpbuf, required); + if (available == 0) { return 0; } + usebuf = tmpbuf; } + taghdr = (corsaro_tagged_packet_header_t *)usebuf; + + ssock->nextrlen = ntohs(taghdr->pktlen) + + sizeof(corsaro_tagged_packet_header_t); + ssock->nextts = ((uint64_t) ntohl(taghdr->ts_sec)) << 32; + ssock->nextts += (((uint64_t) ntohl(taghdr->ts_usec)) << 32) / 1000000; + return 1; +} + +static int process_ndag_erf_headers(streamsock_t *ssock, recvstream_t *rt) { + + dag_record_t *erfptr; + + int nr; + unsigned int available, required; + char tmpbuf[ENCAP_BUFSIZE * 2]; + char *usebuf; + + nr = ssock->nextreadind; + + if (ssock->savedsize[nr] == 0) { + /* no data available? I don't think we should be able to + * get here in that case... */ + return 0; + } + + assert(ssock->expectedreccount != 0); available = ssock->savedsize[nr] - (ssock->nextread - ssock->saved[nr]); - required = sizeof(dag_record_t); - //required = dag_record_size; + required = dag_record_size; usebuf = ssock->nextread; if (available < required) { @@ -1204,6 +1150,7 @@ static int ndag_prepare_packet_stream_encaperf(libtrace_t *restrict libtrace, if (ssock->reccount >= ssock->expectedreccount) { ssock->expectedreccount = 0; ssock->reccount = 0; + ssock->rectype = 0; } packet->order = erf_get_erf_timestamp(packet); @@ -1211,6 +1158,66 @@ static int ndag_prepare_packet_stream_encaperf(libtrace_t *restrict libtrace, return rlen; } +static int ndag_prepare_packet_stream_corsarotag(libtrace_t *restrict libtrace, + recvstream_t *restrict rt, + streamsock_t *restrict ssock, + libtrace_packet_t *restrict packet) { + + + corsaro_tagged_packet_header_t *taghdr; + char tmpbuf[ENCAP_BUFSIZE * 2]; + int nr, rlen; + unsigned int available; + char *usebuf = ssock->nextread; + + nr = ssock->nextreadind; + available = ssock->savedsize[nr] - (ssock->nextread - ssock->saved[nr]); + + if (ssock->nextrlen == 0 || ssock->nextrlen > ENCAP_BUFSIZE) { + return -1; + } + + if (available < ssock->nextrlen) { + if (copy_tmp_buffer(ssock, tmpbuf, ssock->nextrlen) == 0) { + return 0; + } + usebuf = tmpbuf; + } + + if (packet->buf_control != TRACE_CTRL_PACKET || !packet->buffer) { + packet->buf_control = TRACE_CTRL_PACKET; + packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE); + } + packet->trace = libtrace; + memcpy(packet->buffer, usebuf, ssock->nextrlen); + packet->header = packet->buffer; + packet->type = TRACE_RT_DATA_CORSARO_TAGGED; + packet->error = ssock->nextrlen; + + taghdr = (corsaro_tagged_packet_header_t *)packet->header; + + rt->received_packets ++; + ssock->total_recordcount += 1; + + consume_streamsock_data(ssock, ssock->nextrlen); + + rlen = ssock->nextrlen; + + ssock->nextts = 0; + ssock->reccount ++; + ssock->nextrlen = 0; + + if (ssock->reccount >= ssock->expectedreccount) { + ssock->expectedreccount = 0; + ssock->reccount = 0; + } + + packet->order = ((uint64_t) ntohl(taghdr->ts_sec)) << 32; + packet->order += (((uint64_t) ntohl(taghdr->ts_usec)) << 32) / 1000000; + packet->cached.link_type = TRACE_TYPE_CORSAROTAG; + return rlen; +} + static int ndag_prepare_packet_stream(libtrace_t *restrict libtrace, recvstream_t *restrict rt, streamsock_t *restrict ssock, @@ -1340,6 +1347,7 @@ static int add_new_streamsock(libtrace_t *libtrace, ssock->nextrlen = 0; ssock->reccount = 0; ssock->expectedreccount = 0; + ssock->rectype = 0; for (i = 0; i < ENCAP_BUFFERS; i++) { ssock->saved[i] = (char *)malloc(ENCAP_BUFSIZE); @@ -1746,8 +1754,28 @@ static streamsock_t *select_next_packet(recvstream_t *rt) { continue; } + if (rt->sources[i].rectype == 0) { + r = process_ndag_encap_headers(&(rt->sources[i]), rt); + if (r < 0) { + return NULL; + } + if (r == 0) { + continue; + } + } + if (rt->sources[i].nextts == 0) { - r = process_ndag_erf_headers(&(rt->sources[i]), rt); + if (rt->sources[i].rectype == NDAG_PKT_ENCAPERF) { + r = process_ndag_erf_headers( + &(rt->sources[i]), rt); + } else if (rt->sources[i].rectype == + NDAG_PKT_CORSAROTAG) { + r = process_ndag_corsaro_header( + &(rt->sources[i]), rt); + } else { + r = 0; + } + if (r < 0) { return NULL; } From c41e516119aeaacdd77b2f86f32c023c23b08cf6 Mon Sep 17 00:00:00 2001 From: Shane Alcock Date: Thu, 26 Oct 2023 13:21:56 +1300 Subject: [PATCH 8/8] ndag: fix a few bugs when handling CORSAROTAG packets * fix issue where all CORSAROTAG packets were triggering "invalid record" messages. * fix issue where CORSAROTAG packets did not have a payload pointer correctly set. * fix issue where rectype was not being reset when we had processed a complete CORSAROTAG packet --- lib/format_ndag.c | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/lib/format_ndag.c b/lib/format_ndag.c index 81c5c8c8..c6eb35c1 100644 --- a/lib/format_ndag.c +++ b/lib/format_ndag.c @@ -864,7 +864,8 @@ static int got_complete_packet(streamsock_t *ssock) { int nr = ssock->nextreadind; int next; - if (ssock->rectype == NDAG_PKT_ENCAPERF) { + if (ssock->rectype == NDAG_PKT_ENCAPERF || + ssock->rectype == NDAG_PKT_CORSAROTAG) { required = ssock->nextrlen; } else { required = 0; @@ -964,7 +965,8 @@ static int process_ndag_encap_headers(streamsock_t *ssock, recvstream_t *rt) { if (rectype == NDAG_PKT_KEEPALIVE) { consume_streamsock_data(ssock, required); return process_ndag_encap_headers(ssock, rt); - } else if (rectype != NDAG_PKT_ENCAPERF) { + } else if (rectype != NDAG_PKT_ENCAPERF && + rectype != NDAG_PKT_CORSAROTAG) { fprintf(stderr, "Received invalid record on the channel for %s:%u.\n", ssock->groupaddr, ssock->port); return -1; @@ -1196,6 +1198,11 @@ static int ndag_prepare_packet_stream_corsarotag(libtrace_t *restrict libtrace, taghdr = (corsaro_tagged_packet_header_t *)packet->header; + packet->payload = &(taghdr->tags); + packet->order = ((uint64_t) ntohl(taghdr->ts_sec)) << 32; + packet->order += (((uint64_t) ntohl(taghdr->ts_usec)) << 32) / 1000000; + packet->cached.link_type = TRACE_TYPE_CORSAROTAG; + rt->received_packets ++; ssock->total_recordcount += 1; @@ -1210,11 +1217,9 @@ static int ndag_prepare_packet_stream_corsarotag(libtrace_t *restrict libtrace, if (ssock->reccount >= ssock->expectedreccount) { ssock->expectedreccount = 0; ssock->reccount = 0; + ssock->rectype = 0; } - packet->order = ((uint64_t) ntohl(taghdr->ts_sec)) << 32; - packet->order += (((uint64_t) ntohl(taghdr->ts_usec)) << 32) / 1000000; - packet->cached.link_type = TRACE_TYPE_CORSAROTAG; return rlen; }