Skip to content

Commit

Permalink
Refactor code related to auto flow queues
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Oct 27, 2023
1 parent 0958822 commit b075c70
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 34 deletions.
3 changes: 1 addition & 2 deletions lib/membrane/core/element/callback_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ defmodule Membrane.Core.Element.CallbackContext do
name: state.name,
playback: state.playback,
resource_guard: state.resource_guard,
utility_supervisor: state.subprocess_supervisor,
big_state: state
utility_supervisor: state.subprocess_supervisor
})
end
end
37 changes: 8 additions & 29 deletions lib/membrane/core/element/demand_controller/auto_flow_utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,6 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do
pad_data.flow_control == :auto and pad_data.demand < 0
end

@spec auto_flow_queue_empty?(Pad.ref(), State.t()) :: boolean()
def auto_flow_queue_empty?(pad_ref, state) do
PadModel.get_data!(state, pad_ref, :auto_flow_queue) == Qex.new()
end

@spec store_buffers_in_queue(Pad.ref(), [Buffer.t()], State.t()) :: State.t()
def store_buffers_in_queue(pad_ref, buffers, state) do
store_in_queue(pad_ref, :buffers, buffers, state)
Expand All @@ -99,9 +94,11 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do
end

@spec auto_adjust_atomic_demand(Pad.ref() | [Pad.ref()], State.t()) :: State.t()
def auto_adjust_atomic_demand(pad_ref_list, state) when is_list(pad_ref_list) do
def auto_adjust_atomic_demand(ref_or_ref_list, state)
when Pad.is_pad_ref(ref_or_ref_list) or is_list(ref_or_ref_list) do
{bumped_pads, state} =
pad_ref_list
ref_or_ref_list
|> Bunch.listify()
|> Enum.flat_map_reduce(state, fn pad_ref, state ->
PadModel.get_data!(state, pad_ref)
|> do_auto_adjust_atomic_demand(state)
Expand All @@ -114,18 +111,6 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do
flush_auto_flow_queues(bumped_pads, state)
end

def auto_adjust_atomic_demand(pad_ref, state) when Pad.is_pad_ref(pad_ref) do
PadModel.get_data!(state, pad_ref)
|> do_auto_adjust_atomic_demand(state)
|> case do
{:increased, state} ->
flush_auto_flow_queues([pad_ref], state)

{:unchanged, state} ->
state
end
end

defp do_auto_adjust_atomic_demand(pad_data, state) when is_input_auto_pad_data(pad_data) do
if increase_atomic_demand?(pad_data, state) do
%{
Expand Down Expand Up @@ -167,15 +152,9 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do
atomic_demand_value > 0
end

defp flush_auto_flow_queues(pad_ref_list, state) do
pad_ref_list
|> Enum.reject(&hard_corcked?(&1, state))
|> do_flush_auto_flow_queues(state)
end

defp do_flush_auto_flow_queues([], state), do: state
defp flush_auto_flow_queues([], state), do: state

defp do_flush_auto_flow_queues(pads_to_flush, state) do
defp flush_auto_flow_queues(pads_to_flush, state) do
selected_pad = Enum.random(pads_to_flush)

PadModel.get_data!(state, selected_pad, :auto_flow_queue)
Expand All @@ -186,14 +165,14 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do
exec_queue_item_callback(selected_pad, queue_item, state)
|> PadModel.set_data!(selected_pad, :auto_flow_queue, popped_queue)

do_flush_auto_flow_queues(pads_to_flush, state)
flush_auto_flow_queues(pads_to_flush, state)

{:empty, empty_queue} ->
state = PadModel.set_data!(state, selected_pad, :auto_flow_queue, empty_queue)

pads_to_flush
|> List.delete(selected_pad)
|> do_flush_auto_flow_queues(state)
|> flush_auto_flow_queues(state)
end
end

Expand Down
3 changes: 0 additions & 3 deletions logs.txt

This file was deleted.

0 comments on commit b075c70

Please sign in to comment.