Skip to content

Commit

Permalink
Merge pull request #12686 from rabbitmq/stream-coordinator-ra-local-q…
Browse files Browse the repository at this point in the history
…uery-infinity-timeout

Use infinity timout for RA local query in stream coordinator
  • Loading branch information
acogoluegnes authored Nov 8, 2024
2 parents c839409 + 1634adb commit ca70f20
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 10 deletions.
12 changes: 8 additions & 4 deletions deps/rabbit/src/rabbit_stream_coordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@
-export([query_local_pid/3,
query_writer_pid/2,
query_members/2,
query_stream_overview/2]).
query_stream_overview/2,
ra_local_query/1]).


-export([log_overview/1,
Expand Down Expand Up @@ -271,7 +272,7 @@ sac_state(#?MODULE{single_active_consumer = SacState}) ->

%% for debugging
state() ->
case ra:local_query({?MODULE, node()}, fun(State) -> State end) of
case ra_local_query(fun(State) -> State end) of
{ok, {_, Res}, _} ->
Res;
Any ->
Expand All @@ -289,7 +290,7 @@ local_pid(StreamId) when is_list(StreamId) ->
query_pid(StreamId, MFA).

query_pid(StreamId, MFA) when is_list(StreamId) ->
case ra:local_query({?MODULE, node()}, MFA) of
case ra_local_query(MFA) of
{ok, {_, {ok, Pid}}, _} ->
case erpc:call(node(Pid), erlang, is_process_alive, [Pid]) of
true ->
Expand Down Expand Up @@ -380,7 +381,7 @@ query_writer_pid(StreamId, #?MODULE{streams = Streams}) ->
end.

do_query(MFA) ->
case ra:local_query({?MODULE, node()}, MFA) of
case ra_local_query(MFA) of
{ok, {_, {ok, _} = Result}, _} ->
Result;
{ok, {_, {error, not_found}}, _} ->
Expand Down Expand Up @@ -2337,3 +2338,6 @@ key_metrics_rpc(ServerId) ->

maps_to_list(M) ->
lists:sort(maps:to_list(M)).

ra_local_query(QueryFun) ->
ra:local_query({?MODULE, node()}, QueryFun, infinity).
10 changes: 4 additions & 6 deletions deps/rabbit/src/rabbit_stream_sac_coordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
group_consumers/5,
overview/1]).

-import(rabbit_stream_coordinator, [ra_local_query/1]).

%% Single Active Consumer API
-spec register_consumer(binary(),
binary(),
Expand Down Expand Up @@ -129,9 +131,7 @@ process_command(Cmd) ->
{ok,
[term()] | {error, atom()}}.
consumer_groups(VirtualHost, InfoKeys) ->
case ra:local_query({rabbit_stream_coordinator,
node()},
fun(State) ->
case ra_local_query(fun(State) ->
SacState =
rabbit_stream_coordinator:sac_state(State),
consumer_groups(VirtualHost,
Expand All @@ -152,9 +152,7 @@ consumer_groups(VirtualHost, InfoKeys) ->
{ok, [term()]} |
{error, atom()}.
group_consumers(VirtualHost, Stream, Reference, InfoKeys) ->
case ra:local_query({rabbit_stream_coordinator,
node()},
fun(State) ->
case ra_local_query(fun(State) ->
SacState =
rabbit_stream_coordinator:sac_state(State),
group_consumers(VirtualHost,
Expand Down

0 comments on commit ca70f20

Please sign in to comment.