Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nhse o34 orkv.i58 refactor #59

Open
wants to merge 2 commits into
base: openriak-3.4
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 19 additions & 15 deletions src/riak_kv_http_cache.erl
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
-module(riak_kv_http_cache).

-export([start_link/0,
get_stats/0]).
get_stats/1]).

-export([init/1,
handle_call/3,
Expand All @@ -10,15 +10,19 @@
terminate/2,
code_change/3]).

-define(SERVER, ?MODULE).
-record(st,
{
ts :: undefined|erlang:timestamp(),
stats = []
}
).

-record(st, {ts, stats = []}).

start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

get_stats() ->
gen_server:call(?MODULE, get_stats).
get_stats(Timeout) ->

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be nice with either a type or a comment stating in which unit the Timeout is given.
-spec get_stats(Milliseconds :: non_neg_integer()) -> ..... or similar

gen_server:call(?MODULE, get_stats, Timeout).

init(_) ->
{ok, #st{}}.
Expand All @@ -40,15 +44,15 @@ code_change(_, S, _) ->
{ok, S}.

check_cache(#st{ts = undefined} = S) ->
S#st{ts = os:timestamp(), stats = do_get_stats()};
Stats = riak_kv_status:get_stats(web),
S#st{ts = os:timestamp(), stats = Stats};
check_cache(#st{ts = Then} = S) ->
CacheTime = application:get_env(riak_kv, http_stats_cache_seconds, 1),

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One could be generous and make the configurable time in milliseconds instead of course seconds. That would help if one ever want to test these timeouts in QuickCheck like fashion.
Since this is a new parameter, setting default to 1000 and have it provided in milliseconds is not harming backward compatibility.

Now = os:timestamp(),
case timer:now_diff(Now, Then) < 1000000 of
true ->
S;
false ->
S#st{ts = Now, stats = do_get_stats()}
end.

do_get_stats() ->
riak_kv_wm_stats:get_stats().
case timer:now_diff(Now, Then) < (CacheTime * 1000000) of
true ->
S;
false ->
Stats = riak_kv_status:get_stats(web),
S#st{ts = os:timestamp(), stats = Stats}
end.
15 changes: 2 additions & 13 deletions src/riak_kv_stat.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,6 @@
%% Update each stat with the exported function update/1. Add
%% a new stat to the internal stats/0 func to register a new stat with
%% folsom.
%%
%% Get the latest aggregation of stats with the exported function
%% get_stats/0. Or use folsom_metrics:get_metric_value/1,
%% or riak_core_stat_q:get_stats/1.
%%

-module(riak_kv_stat).

Expand All @@ -41,7 +36,7 @@
-endif.

%% API
-export([start_link/0, get_stats/0,
-export([start_link/0,
update/1, perform_update/1, register_stats/0, unregister_vnode_stats/1, produce_stats/0,
leveldb_read_block_errors/0, stat_update_error/3, stop/0]).
-export([track_bucket/1, untrack_bucket/1]).
Expand Down Expand Up @@ -71,11 +66,6 @@ unregister_vnode_stats(Index) ->
unregister_per_index(heads, Index),
unregister_per_index(puts, Index).

%% @spec get_stats() -> proplist()
%% @doc Get the current aggregation of stats.
get_stats() ->
riak_kv_wm_stats:get_stats().


%% Creation of a dynamic stat _must_ be serialized.
register_stat(Name, Type) ->
Expand Down Expand Up @@ -1006,7 +996,6 @@ bc_stats(Pfx) ->
{sys_global_heaps_size, ?MODULE, value, [deprecated]},
{sys_heap_type, erlang, system_info, [heap_type]},
{sys_logical_processors, erlang, system_info, [logical_processors]},
{sys_monitor_count, riak_kv_stat_bc, sys_monitor_count, []},
{sys_otp_release, riak_kv_stat_bc, otp_release, []},
{sys_port_count, erlang, system_info, [port_count]},
{sys_process_count, erlang, system_info, [process_count]},
Expand Down Expand Up @@ -1164,7 +1153,7 @@ create_or_update_histogram_test() ->
Metric = [riak_kv,put_fsm,counter,time],
ok = repeat_create_or_update(Metric, 1, histogram, 100),
?assertNotEqual(exometer:get_value(Metric), 0),
Stats = get_stats(),
Stats = riak_kv_status:get_stats(web),
?LOG_INFO("stats prop list ~s", [Stats]),
?assertNotEqual(proplists:get_value({node_put_fsm_counter_time_mean}, Stats), 0)
after
Expand Down
12 changes: 0 additions & 12 deletions src/riak_kv_stat_bc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -195,18 +195,6 @@ system_version() ->
system_architecture() ->
list_to_binary(erlang:system_info(system_architecture)).

%% Count up all monitors, unfortunately has to obtain process_info
%% from all processes to work it out.
sys_monitor_count() ->
lists:foldl(fun(Pid, Count) ->
case erlang:process_info(Pid, monitors) of
{monitors, Mons} ->
Count + length(Mons);
_ ->
Count
end
end, 0, processes()).

app_stats() ->
[{list_to_atom(atom_to_list(A) ++ "_version"), list_to_binary(V)}
|| {A,_,V} <- application:which_applications()].
Expand Down
79 changes: 53 additions & 26 deletions src/riak_kv_status.erl
Original file line number Diff line number Diff line change
Expand Up @@ -91,33 +91,60 @@ get_stats(console) ->
++ riak_kv_stat_bc:disk_stats()
++ riak_kv_stat_bc:app_stats().

aliases() ->
Grouped = exometer_alias:prefix_foldl(
<<>>,
fun(Alias, Entry, DP, Acc) ->
orddict:append(Entry, {DP, Alias}, Acc)
end, orddict:new()),
lists:keysort(
1,
lists:foldl(
fun({K, DPs}, Acc) ->
case exometer:get_value(K, [D || {D,_} <- DPs]) of
{ok, Vs} when is_list(Vs) ->
lists:foldr(fun({D,V}, Acc1) ->
{_,N} = lists:keyfind(D,1,DPs),
[{N,V}|Acc1]
end, Acc, Vs);
Other ->
Val = case Other of
{ok, disabled} -> undefined;
_ -> 0
end,
lists:foldr(fun({_,N}, Acc1) ->
[{N,Val}|Acc1]
end, Acc, DPs)
end
end, [], orddict:to_list(Grouped))).

aliases() ->
AllStats =
exometer_alias:prefix_foldl(
<<>>,
fun(Alias, Entry, DP, Acc) -> [{Entry, {DP, Alias}}|Acc] end,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Later on we make an assumption on how these entries are kind of sorted such that consecutive entries end up next to each other, right?
Would be good to provide this assumption in a comment. Otherwise it is weird that if
Entry == PrevEntry you "randomly" discard the new entry.
A comment would explain this for future developers.

[]
),
case AllStats of
[] ->
[];
AllStats when is_list(AllStats) ->
{{FinalEntry, FinalDPMap}, AliasVals} =
lists:foldl(
fun({Entry, {DP, Alias}}, {{PrevEntry, DPmap}, Acc}) ->
case Entry of
Entry when Entry == PrevEntry ->
{{PrevEntry, maps:put(DP, Alias, DPmap)}, Acc};
Entry when PrevEntry == none ->
{{Entry, maps:put(DP, Alias, DPmap)}, Acc};
Entry ->
UpdAcc = get_exometer_values(PrevEntry, DPmap),
{{Entry, #{DP => Alias}}, UpdAcc ++ Acc}
end
end,
{{none, #{}}, []},
AllStats
),
lists:keysort(
1,
get_exometer_values(FinalEntry, FinalDPMap) ++ AliasVals
)
end.

get_exometer_values(Entry, DPmap) ->
case exometer:get_value(Entry, maps:keys(DPmap)) of
{ok, Vs} when is_list(Vs) ->
lists:map(
fun({D, V}) ->
{maps:get(D, DPmap), V}
end,
Vs
);
Other ->
DefaultValue =
case Other of
{ok, disabled} -> disabled;
_ -> 0
end,
lists:map(
fun(A) -> {A, DefaultValue} end,
maps:values(DPmap)
)
end.

expand_disk_stats([{disk, Stats}]) ->
[{disk, [{struct, [{id, list_to_binary(Id)}, {size, Size}, {used, Used}]}
Expand Down
22 changes: 20 additions & 2 deletions src/riak_kv_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@
is_modfun_allowed/2,
shuffle_list/1,
kv_ready/0,
ngr_initial_timeout/0
ngr_initial_timeout/0,
sys_monitor_count/0
]).
-export([report_hashtree_tokens/0, reset_hashtree_tokens/2]).

Expand Down Expand Up @@ -714,7 +715,24 @@ get_initial_call(P) ->
_ ->
undefined
end.


%% @doc sys_monitor_count/0
%% Count up all monitors, unfortunately has to obtain process_info
%% from all processes to work it out.
sys_monitor_count() ->
lists:foldl(
fun(Pid, Count) ->
case erlang:process_info(Pid, monitors) of
{monitors, Mons} ->
Count + length(Mons);
_ ->
Count
end
end,
0, processes()
).


%% ===================================================================
%% EUnit tests
%% ===================================================================
Expand Down
71 changes: 58 additions & 13 deletions src/riak_kv_wm_stats.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,17 @@
content_types_provided/2,
service_available/2,
forbidden/2,
malformed_request/2,
produce_body/2,
pretty_print/2
]).
-export([get_stats/0]).

-define(TIMEOUT, 30000).

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with comment:

Suggested change
-define(TIMEOUT, 30000).
-define(TIMEOUT, 30000). %% in milliseconds


-include_lib("webmachine/include/webmachine.hrl").
-include("riak_kv_wm_raw.hrl").

-record(ctx, {}).
-record(ctx, {timeout = ?TIMEOUT :: non_neg_integer()}).

init(_) ->
{ok, #ctx{}}.
Expand Down Expand Up @@ -66,25 +69,67 @@ content_types_provided(ReqData, Context) ->
{"text/plain", pretty_print}],
ReqData, Context}.


service_available(ReqData, Ctx) ->
{true, ReqData, Ctx}.

malformed_request(RD, Ctx) ->
case wrq:get_qs_value("timeout", RD) of
undefined ->
{false, RD, Ctx};
TimeoutStr ->
try
case list_to_integer(TimeoutStr) of
Timeout when Timeout > 0 ->
{false, RD, Ctx#ctx{timeout=Timeout}}
end
catch
_:_ ->
{true,
wrq:append_to_resp_body(
io_lib:format(
"Bad timeout value ~0p",

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add in explanation "should be provided in miliseconds"
There is an argument for letting 0 be a valid value for timeout, basically stating, "asap".

[TimeoutStr]
),
wrq:set_resp_header(?HEAD_CTYPE, "text/plain", RD)),
Ctx}
end
end.

forbidden(RD, Ctx) ->
{riak_kv_wm_utils:is_forbidden(RD), RD, Ctx}.

produce_body(ReqData, Ctx) ->
Stats= riak_kv_http_cache:get_stats(),
Body = mochijson2:encode({struct, Stats}),
{Body, ReqData, Ctx}.
produce_body(RD, Ctx) ->
try
Stats = riak_kv_http_cache:get_stats(Ctx#ctx.timeout),
Body = mochijson2:encode({struct, Stats}),
{Body, RD, Ctx}
catch
exit:{timeout, _} ->
{
{halt, 503},
wrq:set_resp_header(
?HEAD_CTYPE,
"text/plain",
wrq:append_to_response_body(
io_lib:format(
"Request timed out after ~w ms",
[Ctx#ctx.timeout]
),
RD
)
),
Ctx
}
end.

%% @spec pretty_print(webmachine:wrq(), context()) ->
%% {string(), webmachine:wrq(), context()}
%% @doc Format the respons JSON object is a "pretty-printed" style.
pretty_print(RD1, C1=#ctx{}) ->
{Json, RD2, C2} = produce_body(RD1, C1),
{json_pp:print(binary_to_list(list_to_binary(Json))), RD2, C2}.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was the reason to do this that Json can be a deeplist and it is flattened in this way to make json_pp:print work?


pretty_print(RD, Ctx) ->
case produce_body(RD, Ctx) of
{{halt, RepsonseCode}, UpdRD, UpdCtx} ->
{{halt, RepsonseCode}, UpdRD, UpdCtx};
{Json, UpdRD, UpdCtx} ->
{json_pp:print(Json), UpdRD, UpdCtx}
end.

get_stats() ->
riak_kv_status:get_stats(web).
Loading