Skip to content

Commit

Permalink
Merge pull request #1 from nhs-riak/nhse-contrib-kv1871
Browse files Browse the repository at this point in the history
KV i1871 - Handle timeout on remote connection
  • Loading branch information
martinsumner authored Sep 2, 2023
2 parents 4484001 + dabe0fe commit dee16d5
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 42 deletions.
11 changes: 8 additions & 3 deletions src/riak_kv_replrtq_peer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@
-type discovery_peer() ::
{riak_kv_replrtq_snk:queue_name(), [riak_kv_replrtq_snk:peer_info()]}.

-define(DISCOVERY_TIMEOUT_SECONDS, 60).
-define(UPDATE_TIMEOUT_SECONDS, 60).
-define(DISCOVERY_TIMEOUT_SECONDS, 300).
-define(UPDATE_TIMEOUT_SECONDS, 300).
-define(AUTO_DISCOVERY_MAXIMUM_SECONDS, 900).
-define(AUTO_DISCOVERY_MINIMUM_SECONDS, 60).

Expand All @@ -66,7 +66,7 @@ update_discovery(QueueName) ->
?DISCOVERY_TIMEOUT_SECONDS * 1000).

-spec update_workers(pos_integer(), pos_integer()) -> boolean().
update_workers(WorkerCount, PerPeerLimit) ->
update_workers(WorkerCount, PerPeerLimit) when PerPeerLimit =< WorkerCount ->
gen_server:call(
?MODULE,
{update_workers, WorkerCount, PerPeerLimit},
Expand Down Expand Up @@ -142,6 +142,11 @@ handle_info({scheduled_discovery, QueueName}, State) ->
?AUTO_DISCOVERY_MAXIMUM_SECONDS),
Delay = rand:uniform(max(1, MaxDelay - MinDelay)) + MinDelay,
_ = schedule_discovery(QueueName, self(), Delay),
{noreply, State};
handle_info({Ref, {error, HTTPClientError}}, State) when is_reference(Ref) ->
lager:info(
"Client error caught - error ~p returned after timeout",
[HTTPClientError]),
{noreply, State}.

terminate(_Reason, _State) ->
Expand Down
88 changes: 49 additions & 39 deletions src/riak_kv_replrtq_snk.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
handle_call/3,
handle_cast/2,
handle_info/2,
handle_continue/2,
terminate/2,
code_change/3]).

Expand Down Expand Up @@ -65,7 +66,7 @@
-define(STARTING_DELAYMS, 8).
-define(MAX_SUCCESS_DELAYMS, 1024).
-define(ON_ERROR_DELAYMS, 65536).
-define(INACTIVITY_TIMEOUT_MS, 60000).
-define(INITIAL_TIMEOUT_MS, 60000).
-define(DEFAULT_WORKERCOUNT, 1).

-record(sink_work, {queue_name :: queue_name(),
Expand Down Expand Up @@ -208,10 +209,13 @@ add_snkqueue(QueueName, Peers, WorkerCount) ->
%% number of workers overall
-spec add_snkqueue(queue_name(), list(peer_info()),
pos_integer(), pos_integer()) -> ok.
add_snkqueue(QueueName, Peers, WorkerCount, PerPeerLimit)
when PerPeerLimit =< WorkerCount ->
gen_server:call(?MODULE,
{add, QueueName, Peers, WorkerCount, PerPeerLimit}).
add_snkqueue(
QueueName, Peers, WorkerCount, PerPeerLimit)
when PerPeerLimit =< WorkerCount ->
gen_server:call(
?MODULE,
{add, QueueName, Peers, WorkerCount, PerPeerLimit},
infinity).


%% @doc
Expand All @@ -220,7 +224,7 @@ add_snkqueue(QueueName, Peers, WorkerCount, PerPeerLimit)
%% Returns undefined if there are currently no peers defined.
-spec current_peers(queue_name()) -> list(peer_info())|undefined.
current_peers(QueueName) ->
gen_server:call(?MODULE, {current_peers, QueueName}).
gen_server:call(?MODULE, {current_peers, QueueName}, infinity).


%% @doc
Expand All @@ -236,47 +240,26 @@ set_workercount(QueueName, WorkerCount) ->
%% @doc
%% Change the number of concurrent workers whilst limiting the number of
%% workers per peer
-spec set_workercount(queue_name(), pos_integer(), pos_integer())
-> ok|not_found.
set_workercount(QueueName, WorkerCount, PerPeerLimit)
when PerPeerLimit =< WorkerCount ->
gen_server:call(?MODULE,
{worker_count, QueueName, WorkerCount, PerPeerLimit}).
-spec set_workercount(
queue_name(), pos_integer(), pos_integer()) -> ok|not_found.
set_workercount(
QueueName, WorkerCount, PerPeerLimit)
when PerPeerLimit =< WorkerCount ->
gen_server:call(
?MODULE,
{worker_count, QueueName, WorkerCount, PerPeerLimit},
infinity
).

%%%============================================================================
%%% gen_server callbacks
%%%============================================================================

init([]) ->
SinkEnabled =
app_helper:get_env(riak_kv, replrtq_enablesink, false),
SinkEnabled = app_helper:get_env(riak_kv, replrtq_enablesink, false),
case SinkEnabled of
true ->
SinkPeers =
app_helper:get_env(riak_kv, replrtq_sinkpeers, ""),
DefaultQueue =
app_helper:get_env(riak_kv, replrtq_sinkqueue),
SnkQueuePeerInfo = tokenise_peers(DefaultQueue, SinkPeers),
{SnkWorkerCount, PerPeerLimit} = get_worker_counts(),
Iteration = 1,
MapPeerInfoFun =
fun({SnkQueueName, SnkPeerInfo}) ->
{SnkQueueLength, SnkWorkQueue} =
determine_workitems(SnkQueueName,
Iteration,
SnkPeerInfo,
SnkWorkerCount,
min(SnkWorkerCount, PerPeerLimit)),
SnkW =
#sink_work{queue_name = SnkQueueName,
work_queue = SnkWorkQueue,
minimum_queue_length = SnkQueueLength,
peer_list = SnkPeerInfo,
max_worker_count = SnkWorkerCount},
{SnkQueueName, Iteration, SnkW}
end,
Work = lists:map(MapPeerInfoFun, SnkQueuePeerInfo),
{ok, #state{enabled = true, work = Work}, ?INACTIVITY_TIMEOUT_MS};
{ok, #state{}, {continue, initialise_work}};
false ->
{ok, #state{}}
end.
Expand Down Expand Up @@ -432,6 +415,33 @@ handle_info({prompt_requeue, WorkItem}, State) ->
requeue_work(WorkItem),
{noreply, State}.

handle_continue(initialise_work, State) ->
SinkPeers =
app_helper:get_env(riak_kv, replrtq_sinkpeers, ""),
DefaultQueue =
app_helper:get_env(riak_kv, replrtq_sinkqueue),
SnkQueuePeerInfo = tokenise_peers(DefaultQueue, SinkPeers),
{SnkWorkerCount, PerPeerLimit} = get_worker_counts(),
Iteration = 1,
MapPeerInfoFun =
fun({SnkQueueName, SnkPeerInfo}) ->
{SnkQueueLength, SnkWorkQueue} =
determine_workitems(SnkQueueName,
Iteration,
SnkPeerInfo,
SnkWorkerCount,
min(SnkWorkerCount, PerPeerLimit)),
SnkW =
#sink_work{queue_name = SnkQueueName,
work_queue = SnkWorkQueue,
minimum_queue_length = SnkQueueLength,
peer_list = SnkPeerInfo,
max_worker_count = SnkWorkerCount},
{SnkQueueName, Iteration, SnkW}
end,
Work = lists:map(MapPeerInfoFun, SnkQueuePeerInfo),
{noreply, State#state{enabled = true, work = Work}, ?INITIAL_TIMEOUT_MS}.

terminate(_Reason, State) ->
WorkItems = lists:map(fun(SW) -> element(3, SW) end, State#state.work),
CloseFun =
Expand Down

0 comments on commit dee16d5

Please sign in to comment.