Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nhse o32 orkv.i58 backport #60

Open
wants to merge 3 commits into
base: openriak-3.2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@
{provider_hooks, [
{pre, [{compile, {protobuf, compile}}]}
]}.

{profiles, [
{test, [{deps, [meck]}]},
{test, [{deps, [{meck, {git, "https://github.com/OpenRiak/meck.git", {branch, "openriak-3.2"}}}]}]},
{gha, [{erl_opts, [{d, 'GITHUBEXCLUDE'}]}]}
]}.

Expand Down
34 changes: 19 additions & 15 deletions src/riak_kv_http_cache.erl
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
-module(riak_kv_http_cache).

-export([start_link/0,
get_stats/0]).
get_stats/1]).

-export([init/1,
handle_call/3,
Expand All @@ -10,15 +10,19 @@
terminate/2,
code_change/3]).

-define(SERVER, ?MODULE).
-record(st,
{
ts :: undefined|erlang:timestamp(),
stats = []
}
).

-record(st, {ts, stats = []}).

start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

get_stats() ->
gen_server:call(?MODULE, get_stats).
get_stats(Timeout) ->
gen_server:call(?MODULE, get_stats, Timeout).

init(_) ->
{ok, #st{}}.
Expand All @@ -40,15 +44,15 @@ code_change(_, S, _) ->
{ok, S}.

check_cache(#st{ts = undefined} = S) ->
S#st{ts = os:timestamp(), stats = do_get_stats()};
Stats = riak_kv_status:get_stats(web),
S#st{ts = os:timestamp(), stats = Stats};
check_cache(#st{ts = Then} = S) ->
CacheTime = application:get_env(riak_kv, http_stats_cache_seconds, 1),
Now = os:timestamp(),
case timer:now_diff(Now, Then) < 1000000 of
true ->
S;
false ->
S#st{ts = Now, stats = do_get_stats()}
end.

do_get_stats() ->
riak_kv_wm_stats:get_stats().
case timer:now_diff(Now, Then) < (CacheTime * 1000000) of
true ->
S;
false ->
Stats = riak_kv_status:get_stats(web),
S#st{ts = os:timestamp(), stats = Stats}
end.
15 changes: 2 additions & 13 deletions src/riak_kv_stat.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,6 @@
%% Update each stat with the exported function update/1. Add
%% a new stat to the internal stats/0 func to register a new stat with
%% folsom.
%%
%% Get the latest aggregation of stats with the exported function
%% get_stats/0. Or use folsom_metrics:get_metric_value/1,
%% or riak_core_stat_q:get_stats/1.
%%

-module(riak_kv_stat).

Expand All @@ -41,7 +36,7 @@
-endif.

%% API
-export([start_link/0, get_stats/0,
-export([start_link/0,
update/1, perform_update/1, register_stats/0, unregister_vnode_stats/1, produce_stats/0,
leveldb_read_block_errors/0, stat_update_error/3, stop/0]).
-export([track_bucket/1, untrack_bucket/1]).
Expand Down Expand Up @@ -71,11 +66,6 @@ unregister_vnode_stats(Index) ->
unregister_per_index(heads, Index),
unregister_per_index(puts, Index).

%% @spec get_stats() -> proplist()
%% @doc Get the current aggregation of stats.
get_stats() ->
riak_kv_wm_stats:get_stats().


%% Creation of a dynamic stat _must_ be serialized.
register_stat(Name, Type) ->
Expand Down Expand Up @@ -969,7 +959,6 @@ bc_stats(Pfx) ->
{sys_global_heaps_size, ?MODULE, value, [deprecated]},
{sys_heap_type, erlang, system_info, [heap_type]},
{sys_logical_processors, erlang, system_info, [logical_processors]},
{sys_monitor_count, riak_kv_stat_bc, sys_monitor_count, []},
{sys_otp_release, riak_kv_stat_bc, otp_release, []},
{sys_port_count, erlang, system_info, [port_count]},
{sys_process_count, erlang, system_info, [process_count]},
Expand Down Expand Up @@ -1127,7 +1116,7 @@ create_or_update_histogram_test() ->
Metric = [riak_kv,put_fsm,counter,time],
ok = repeat_create_or_update(Metric, 1, histogram, 100),
?assertNotEqual(exometer:get_value(Metric), 0),
Stats = get_stats(),
Stats = riak_kv_status:get_stats(web),
?LOG_INFO("stats prop list ~s", [Stats]),
?assertNotEqual(proplists:get_value({node_put_fsm_counter_time_mean}, Stats), 0)
after
Expand Down
12 changes: 0 additions & 12 deletions src/riak_kv_stat_bc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -195,18 +195,6 @@ system_version() ->
system_architecture() ->
list_to_binary(erlang:system_info(system_architecture)).

%% Count up all monitors, unfortunately has to obtain process_info
%% from all processes to work it out.
sys_monitor_count() ->
lists:foldl(fun(Pid, Count) ->
case erlang:process_info(Pid, monitors) of
{monitors, Mons} ->
Count + length(Mons);
_ ->
Count
end
end, 0, processes()).

app_stats() ->
[{list_to_atom(atom_to_list(A) ++ "_version"), list_to_binary(V)}
|| {A,_,V} <- application:which_applications()].
Expand Down
79 changes: 53 additions & 26 deletions src/riak_kv_status.erl
Original file line number Diff line number Diff line change
Expand Up @@ -91,33 +91,60 @@ get_stats(console) ->
++ riak_kv_stat_bc:disk_stats()
++ riak_kv_stat_bc:app_stats().

aliases() ->
Grouped = exometer_alias:prefix_foldl(
<<>>,
fun(Alias, Entry, DP, Acc) ->
orddict:append(Entry, {DP, Alias}, Acc)
end, orddict:new()),
lists:keysort(
1,
lists:foldl(
fun({K, DPs}, Acc) ->
case exometer:get_value(K, [D || {D,_} <- DPs]) of
{ok, Vs} when is_list(Vs) ->
lists:foldr(fun({D,V}, Acc1) ->
{_,N} = lists:keyfind(D,1,DPs),
[{N,V}|Acc1]
end, Acc, Vs);
Other ->
Val = case Other of
{ok, disabled} -> undefined;
_ -> 0
end,
lists:foldr(fun({_,N}, Acc1) ->
[{N,Val}|Acc1]
end, Acc, DPs)
end
end, [], orddict:to_list(Grouped))).

aliases() ->
AllStats =
exometer_alias:prefix_foldl(
<<>>,
fun(Alias, Entry, DP, Acc) -> [{Entry, {DP, Alias}}|Acc] end,
[]
),
case AllStats of
[] ->
[];
AllStats when is_list(AllStats) ->
{{FinalEntry, FinalDPMap}, AliasVals} =
lists:foldl(
fun({Entry, {DP, Alias}}, {{PrevEntry, DPmap}, Acc}) ->
case Entry of
Entry when Entry == PrevEntry ->
{{PrevEntry, maps:put(DP, Alias, DPmap)}, Acc};
Entry when PrevEntry == none ->
{{Entry, maps:put(DP, Alias, DPmap)}, Acc};
Entry ->
UpdAcc = get_exometer_values(PrevEntry, DPmap),
{{Entry, #{DP => Alias}}, UpdAcc ++ Acc}
end
end,
{{none, #{}}, []},
AllStats
),
lists:keysort(
1,
get_exometer_values(FinalEntry, FinalDPMap) ++ AliasVals
)
end.

get_exometer_values(Entry, DPmap) ->
case exometer:get_value(Entry, maps:keys(DPmap)) of
{ok, Vs} when is_list(Vs) ->
lists:map(
fun({D, V}) ->
{maps:get(D, DPmap), V}
end,
Vs
);
Other ->
DefaultValue =
case Other of
{ok, disabled} -> disabled;
_ -> 0
end,
lists:map(
fun(A) -> {A, DefaultValue} end,
maps:values(DPmap)
)
end.

expand_disk_stats([{disk, Stats}]) ->
[{disk, [{struct, [{id, list_to_binary(Id)}, {size, Size}, {used, Used}]}
Expand Down
22 changes: 20 additions & 2 deletions src/riak_kv_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@
is_modfun_allowed/2,
shuffle_list/1,
kv_ready/0,
ngr_initial_timeout/0
ngr_initial_timeout/0,
sys_monitor_count/0
]).
-export([report_hashtree_tokens/0, reset_hashtree_tokens/2]).

Expand Down Expand Up @@ -714,7 +715,24 @@ get_initial_call(P) ->
_ ->
undefined
end.


%% @doc sys_monitor_count/0
%% Count up all monitors, unfortunately has to obtain process_info
%% from all processes to work it out.
sys_monitor_count() ->
lists:foldl(
fun(Pid, Count) ->
case erlang:process_info(Pid, monitors) of
{monitors, Mons} ->
Count + length(Mons);
_ ->
Count
end
end,
0, processes()
).


%% ===================================================================
%% EUnit tests
%% ===================================================================
Expand Down
71 changes: 58 additions & 13 deletions src/riak_kv_wm_stats.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,17 @@
content_types_provided/2,
service_available/2,
forbidden/2,
malformed_request/2,
produce_body/2,
pretty_print/2
]).
-export([get_stats/0]).

-define(TIMEOUT, 30000).

-include_lib("webmachine/include/webmachine.hrl").
-include("riak_kv_wm_raw.hrl").

-record(ctx, {}).
-record(ctx, {timeout = ?TIMEOUT :: non_neg_integer()}).

init(_) ->
{ok, #ctx{}}.
Expand Down Expand Up @@ -66,25 +69,67 @@ content_types_provided(ReqData, Context) ->
{"text/plain", pretty_print}],
ReqData, Context}.


service_available(ReqData, Ctx) ->
{true, ReqData, Ctx}.

malformed_request(RD, Ctx) ->
case wrq:get_qs_value("timeout", RD) of
undefined ->
{false, RD, Ctx};
TimeoutStr ->
try
case list_to_integer(TimeoutStr) of
Timeout when Timeout > 0 ->
{false, RD, Ctx#ctx{timeout=Timeout}}
end
catch
_:_ ->
{true,
wrq:append_to_resp_body(
io_lib:format(
"Bad timeout value ~0p",
[TimeoutStr]
),
wrq:set_resp_header(?HEAD_CTYPE, "text/plain", RD)),
Ctx}
end
end.

forbidden(RD, Ctx) ->
{riak_kv_wm_utils:is_forbidden(RD), RD, Ctx}.

produce_body(ReqData, Ctx) ->
Stats= riak_kv_http_cache:get_stats(),
Body = mochijson2:encode({struct, Stats}),
{Body, ReqData, Ctx}.
produce_body(RD, Ctx) ->
try
Stats = riak_kv_http_cache:get_stats(Ctx#ctx.timeout),
Body = mochijson2:encode({struct, Stats}),
{Body, RD, Ctx}
catch
exit:{timeout, _} ->
{
{halt, 503},
wrq:set_resp_header(
?HEAD_CTYPE,
"text/plain",
wrq:append_to_response_body(
io_lib:format(
"Request timed out after ~w ms",
[Ctx#ctx.timeout]
),
RD
)
),
Ctx
}
end.

%% @spec pretty_print(webmachine:wrq(), context()) ->
%% {string(), webmachine:wrq(), context()}
%% @doc Format the respons JSON object is a "pretty-printed" style.
pretty_print(RD1, C1=#ctx{}) ->
{Json, RD2, C2} = produce_body(RD1, C1),
{json_pp:print(binary_to_list(list_to_binary(Json))), RD2, C2}.

pretty_print(RD, Ctx) ->
case produce_body(RD, Ctx) of
{{halt, RepsonseCode}, UpdRD, UpdCtx} ->
{{halt, RepsonseCode}, UpdRD, UpdCtx};
{Json, UpdRD, UpdCtx} ->
{json_pp:print(Json), UpdRD, UpdCtx}
end.

get_stats() ->
riak_kv_status:get_stats(web).
Loading