Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Put receiver dropped packets into ring buffer #609

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/internal_modules/roc_pipeline/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,21 @@ void ReceiverCommonConfig::deduce_defaults() {

ReceiverSessionConfig::ReceiverSessionConfig()
: payload_type(0)
, prebuf_len(0)
, enable_beeping(false) {
}

void ReceiverSessionConfig::deduce_defaults() {
if (prebuf_len == 0) {
prebuf_len = latency.target_latency;
}
latency.deduce_defaults(DefaultLatency, true);
watchdog.deduce_defaults(latency.target_latency);
resampler.deduce_defaults(latency.tuner_backend, latency.tuner_profile);
}

ReceiverSourceConfig::ReceiverSourceConfig() {
ReceiverSourceConfig::ReceiverSourceConfig()
: max_session_packets(0) {
}

void ReceiverSourceConfig::deduce_defaults() {
Expand Down
6 changes: 6 additions & 0 deletions src/internal_modules/roc_pipeline/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ struct ReceiverSessionConfig {
//! Packet payload type.
unsigned int payload_type;

//! Packet prebuffer length, nanoseconds.
core::nanoseconds_t prebuf_len;

//! FEC reader parameters.
fec::BlockReaderConfig fec_reader;

Expand Down Expand Up @@ -188,6 +191,9 @@ struct ReceiverSourceConfig {
//! Default parameters for a session.
ReceiverSessionConfig session_defaults;

//! Maximum number of packets per session.
size_t max_session_packets;

//! Initialize config.
ReceiverSourceConfig();

Expand Down
45 changes: 45 additions & 0 deletions src/internal_modules/roc_pipeline/receiver_session_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ ReceiverSessionGroup::route_transport_packet_(const packet::PacketPtr& packet) {
}

if (sess) {
enqueue_prebuf_packet_(packet);
// Session found, route packet to it.
return sess->route_packet(packet);
}
Expand All @@ -340,6 +341,48 @@ ReceiverSessionGroup::route_transport_packet_(const packet::PacketPtr& packet) {
return status::StatusNoRoute;
}

void ReceiverSessionGroup::enqueue_prebuf_packet_(const packet::PacketPtr& packet_ptr) {
prebuf_packets_.push_back(*packet_ptr.get());

core::nanoseconds_t now = core::timestamp(core::ClockMonotonic);

while (prebuf_packets_.size() > 0) {
core::nanoseconds_t received = prebuf_packets_.front()->udp()->receive_timestamp;
if (now - received > source_config_.session_defaults.prebuf_len) {
prebuf_packets_.remove(*prebuf_packets_.front());
} else {
break;
}
}
}

void ReceiverSessionGroup::dequeue_prebuf_packets_(ReceiverSession& sess) {
packet::PacketPtr curr, next;

if (prebuf_packets_.size() == 0) {
return;
}

core::nanoseconds_t now = core::timestamp(core::ClockMonotonic);

for (curr = prebuf_packets_.front(); curr; curr = next) {
next = prebuf_packets_.nextof(*curr);

// if packet is too old, remove it from the queue
core::nanoseconds_t received = curr->udp()->receive_timestamp;
if (now - received > source_config_.session_defaults.prebuf_len) {
prebuf_packets_.remove(*curr);
continue;
}

// if session handles the packet, remove it from the queue
const status::StatusCode code = sess.route_packet(curr);
if (code == status::StatusOK) {
prebuf_packets_.remove(*curr);
}
}
}

status::StatusCode
ReceiverSessionGroup::route_control_packet_(const packet::PacketPtr& packet,
core::nanoseconds_t current_time) {
Expand Down Expand Up @@ -431,6 +474,8 @@ ReceiverSessionGroup::create_session_(const packet::PacketPtr& packet) {
sessions_.push_back(*sess);
state_tracker_.register_session();

dequeue_prebuf_packets_(*sess);

return status::StatusOK;
}

Expand Down
4 changes: 4 additions & 0 deletions src/internal_modules/roc_pipeline/receiver_session_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ class ReceiverSessionGroup : public core::NonCopyable<>, private rtcp::IParticip
status::StatusCode route_transport_packet_(const packet::PacketPtr& packet);
status::StatusCode route_control_packet_(const packet::PacketPtr& packet,
core::nanoseconds_t current_time);
void enqueue_prebuf_packet_(const packet::PacketPtr& packet);
void dequeue_prebuf_packets_(ReceiverSession& sess);

bool can_create_session_(const packet::PacketPtr& packet);

Expand Down Expand Up @@ -159,6 +161,8 @@ class ReceiverSessionGroup : public core::NonCopyable<>, private rtcp::IParticip
sessions_;
ReceiverSessionRouter session_router_;

core::List<packet::Packet> prebuf_packets_;

status::StatusCode init_status_;
};

Expand Down
8 changes: 8 additions & 0 deletions src/public_api/include/roc/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -910,6 +910,14 @@ typedef struct roc_receiver_config {
* If zero, default value is used. If negative, the check is disabled.
*/
long long choppy_playback_timeout;

/** Packet prebuffer length, in nanoseconds.
* Packets received for sessions that have not yet been created
* will be buffered. Any packets older than the prebuf_len
* will be discarded.
* If zero, default value is used.
*/
unsigned long long prebuf_len;
} roc_receiver_config;

/** Interface configuration.
Expand Down
55 changes: 55 additions & 0 deletions src/tests/roc_pipeline/test_receiver_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2181,6 +2181,61 @@ TEST(receiver_source, timestamp_mapping_remixing) {
CHECK(first_ts);
}

TEST(receiver_source, packet_buffer) {
enum { Rate = SampleRate, Chans = Chans_Stereo, MaxPackets = 10 };

init(Rate, Chans, Rate, Chans);

ReceiverSourceConfig config = make_default_config();
config.session_defaults.prebuf_len = 0;
ReceiverSource receiver(config, encoding_map, packet_pool, packet_buffer_pool,
frame_pool, frame_buffer_pool, arena);
LONGS_EQUAL(status::StatusOK, receiver.init_status());

ReceiverSlot* slot = create_slot(receiver);
CHECK(slot);

packet::FifoQueue queue;
packet::FifoQueue source_queue;
packet::FifoQueue repair_queue;

packet::IWriter* source_endpoint_writer = create_transport_endpoint(
slot, address::Iface_AudioSource, address::Proto_RTP_RS8M_Source, dst_addr1);

packet::IWriter* repair_endpoint_writer = create_transport_endpoint(
slot, address::Iface_AudioRepair, address::Proto_RS8M_Repair, dst_addr2);

fec::BlockWriterConfig fec_config;

test::PacketWriter packet_writer(
arena, *source_endpoint_writer, *repair_endpoint_writer, encoding_map,
packet_factory, src_id1, src_addr1, dst_addr1, dst_addr2, PayloadType_Ch2,
packet::FEC_ReedSolomon_M8, fec_config);

// setup reader
test::FrameReader frame_reader(receiver, frame_factory);

packet_writer.write_packets(fec_config.n_source_packets, SamplesPerPacket,
output_sample_spec);

for (int i = 0; i < ManyPackets; ++i) {
packet::PacketPtr pp;
LONGS_EQUAL(status::StatusOK, queue.read(pp, packet::ModeFetch));
CHECK(pp);

if (pp->flags() & packet::Packet::FlagAudio) {
UNSIGNED_LONGS_EQUAL(status::StatusOK, source_queue.write(pp));
}
if (pp->flags() & packet::Packet::FlagRepair) {
UNSIGNED_LONGS_EQUAL(status::StatusOK, repair_queue.write(pp));
}
}

LONGS_EQUAL(status::StatusOK, receiver.refresh(frame_reader.refresh_ts(), NULL));
frame_reader.read_nonzero_samples(SamplesPerFrame, output_sample_spec);
UNSIGNED_LONGS_EQUAL(1, receiver.num_sessions());
}

// Check receiver metrics for multiple remote participants (senders).
TEST(receiver_source, metrics_participants) {
enum { Rate = SampleRate, Chans = Chans_Stereo, MaxParties = 10 };
Expand Down
3 changes: 3 additions & 0 deletions src/tools/roc_recv/cmdline.ggo
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ section "Options"
option "no-play-timeout" - "No playback timeout, TIME units"
string optional

option "prebuf-len" - "Length of packet prebuffer, TIME units"
string optional

option "choppy-play-timeout" - "Choppy playback timeout, TIME units"
string optional

Expand Down
13 changes: 13 additions & 0 deletions src/tools/roc_recv/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,19 @@ int main(int argc, char** argv) {
}
}

if (args.prebuf_len_given) {
core::nanoseconds_t prebuf_len = 0;
if (!core::parse_duration(args.prebuf_len_arg, prebuf_len)) {
roc_log(LogError, "invalid --prebuf-len");
return 1;
}
receiver_config.session_defaults.prebuf_len =
(core::nanoseconds_t)args.prebuf_len_arg;
} else {
receiver_config.session_defaults.prebuf_len =
receiver_config.session_defaults.latency.target_latency;
}

if (args.choppy_play_timeout_given) {
if (!core::parse_duration(
args.choppy_play_timeout_arg,
Expand Down
Loading