Skip to content

Commit

Permalink
Configure capabilities on the source/target field in the ATTACH frame
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcialRosales committed May 29, 2024
1 parent ea9c637 commit 52cd6be
Show file tree
Hide file tree
Showing 3 changed files with 248 additions and 10 deletions.
53 changes: 48 additions & 5 deletions deps/amqp10_client/src/amqp10_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@
attach_sender_link/3,
attach_sender_link/4,
attach_sender_link/5,
attach_sender_link/6,
attach_sender_link_sync/3,
attach_sender_link_sync/4,
attach_sender_link_sync/5,
attach_sender_link_sync/6,
attach_receiver_link/3,
attach_receiver_link/4,
attach_receiver_link/5,
attach_receiver_link/6,
attach_receiver_link/7,
attach_receiver_link/8,
attach_link/2,
detach_link/1,
send_msg/2,
Expand All @@ -46,6 +49,7 @@
-type rcv_settle_mode() :: amqp10_client_session:rcv_settle_mode().

-type terminus_durability() :: amqp10_client_session:terminus_durability().
-type terminus_capabilities() :: amqp10_client_session:terminus_capabilities().

-type target_def() :: amqp10_client_session:target_def().
-type source_def() :: amqp10_client_session:source_def().
Expand Down Expand Up @@ -165,6 +169,8 @@ end_session(Pid) ->
%% for the link before returning.
attach_sender_link_sync(Session, Name, Target) ->
attach_sender_link_sync(Session, Name, Target, mixed).
-spec attach_sender_link_sync(pid(), binary(), binary()) ->
{ok, link_ref()} | link_timeout.

%% @doc Synchronously attach a link on 'Session'.
%% This is a convenience function that awaits attached event
Expand All @@ -182,8 +188,18 @@ attach_sender_link_sync(Session, Name, Target, SettleMode) ->
snd_settle_mode(), terminus_durability()) ->
{ok, link_ref()} | link_timeout.
attach_sender_link_sync(Session, Name, Target, SettleMode, Durability) ->
attach_sender_link_sync(Session, Name, Target, SettleMode, Durability, none).

%% @doc Synchronously attach a link on 'Session'.
%% This is a convenience function that awaits attached event
%% for the link before returning.
-spec attach_sender_link_sync(pid(), binary(), binary(),
snd_settle_mode(), terminus_durability(),
terminus_capabilities()) ->
{ok, link_ref()} | link_timeout.
attach_sender_link_sync(Session, Name, Target, SettleMode, Durability, Capabilities) ->
{ok, Ref} = attach_sender_link(Session, Name, Target, SettleMode,
Durability),
Durability, Capabilities),
receive
{amqp10_event, {link, Ref, attached}} ->
{ok, Ref};
Expand Down Expand Up @@ -219,9 +235,21 @@ attach_sender_link(Session, Name, Target, SettleMode) ->
snd_settle_mode(), terminus_durability()) ->
{ok, link_ref()}.
attach_sender_link(Session, Name, Target, SettleMode, Durability) ->
attach_sender_link(Session, Name, Target, SettleMode, Durability, none).

%% @doc Attaches a sender link to a target.
%% This is asynchronous and will notify completion of the attach request to the
%% caller using an amqp10_event of the following format:
%% {amqp10_event, {link, LinkRef, attached | {detached, Why}}}
-spec attach_sender_link(pid(), binary(), binary(),
snd_settle_mode(), terminus_durability(),
terminus_capabilities()) ->
{ok, link_ref()}.
attach_sender_link(Session, Name, Target, SettleMode, Durability, Capabilities) ->
AttachArgs = #{name => Name,
role => {sender, #{address => Target,
durable => Durability}},
durable => Durability,
capabilities => Capabilities}},
snd_settle_mode => SettleMode,
rcv_settle_mode => first},
amqp10_client_session:attach(Session, AttachArgs).
Expand Down Expand Up @@ -272,7 +300,19 @@ attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter) ->
-spec attach_receiver_link(pid(), binary(), binary(), snd_settle_mode(),
terminus_durability(), filter(), properties()) ->
{ok, link_ref()}.
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties)
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties) ->
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties, none).

%% @doc Attaches a receiver link to a source.
%% This is asynchronous and will notify completion of the attach request to the
%% caller using an amqp10_event of the following format:
%% {amqp10_event, {link, LinkRef, attached | {detached, Why}}}
-spec attach_receiver_link(pid(), binary(), binary(), snd_settle_mode(),
terminus_durability(), filter(), properties(),
terminus_capabilities()) ->
{ok, link_ref()}.
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter,
Properties, Capabilities)
when is_pid(Session) andalso
is_binary(Name) andalso
is_binary(Source) andalso
Expand All @@ -281,10 +321,13 @@ attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Prop
SettleMode == mixed) andalso
is_atom(Durability) andalso
is_map(Filter) andalso
is_map(Properties) ->
is_map(Properties) andalso
is_list(Capabilities) orelse
(Capabilities == none orelse is_binary(Capabilities)) ->
AttachArgs = #{name => Name,
role => {receiver, #{address => Source,
durable => Durability}, self()},
durable => Durability,
capabilities => Capabilities}, self()},
snd_settle_mode => SettleMode,
rcv_settle_mode => first,
filter => Filter,
Expand Down
29 changes: 25 additions & 4 deletions deps/amqp10_client/src/amqp10_client_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,14 @@
-type rcv_settle_mode() :: first | second.

-type terminus_durability() :: none | configuration | unsettled_state.
-type terminus_capabilities() :: none | binary() | list().

-type target_def() :: #{address => link_address(),
durable => terminus_durability()}.
durable => terminus_durability(),
capabilities => terminus_capabilities()}.
-type source_def() :: #{address => link_address(),
durable => terminus_durability()}.
durable => terminus_durability(),
capabilities => terminus_capabilities()}.

-type attach_role() :: {sender, target_def()} | {receiver, source_def(), pid()}.

Expand Down Expand Up @@ -109,6 +112,7 @@
-export_type([snd_settle_mode/0,
rcv_settle_mode/0,
terminus_durability/0,
terminus_capabilities/0,
attach_args/0,
attach_role/0,
target_def/0,
Expand Down Expand Up @@ -713,20 +717,24 @@ make_source(#{role := {sender, _}}) ->
make_source(#{role := {receiver, #{address := Address} = Source, _Pid}, filter := Filter}) ->
Durable = translate_terminus_durability(maps:get(durable, Source, none)),
TranslatedFilter = translate_filters(Filter),
Capabilities = translate_terminus_capabilities(maps:get(capabilities, Source, none)),
#'v1_0.source'{address = {utf8, Address},
durable = {uint, Durable},
filter = TranslatedFilter}.
filter = TranslatedFilter,
capabilities = Capabilities}.

make_target(#{role := {receiver, _Source, _Pid}}) ->
#'v1_0.target'{};
make_target(#{role := {sender, #{address := Address} = Target}}) ->
Durable = translate_terminus_durability(maps:get(durable, Target, none)),
Capabilities = translate_terminus_capabilities(maps:get(capabilities, Target, none)),
TargetAddr = case is_binary(Address) of
true -> {utf8, Address};
false -> Address
end,
#'v1_0.target'{address = TargetAddr,
durable = {uint, Durable}}.
durable = {uint, Durable},
capabilities = Capabilities}.

max_message_size(#{max_message_size := Size})
when is_integer(Size) andalso
Expand Down Expand Up @@ -771,6 +779,19 @@ filter_value_type({T, _} = V) when is_atom(T) ->
%% looks like an already tagged type, just pass it through
V.

translate_terminus_capabilities(none) ->
undefined;
translate_terminus_capabilities(Capabilities) when is_binary(Capabilities) ->
{utf8, Capabilities};
translate_terminus_capabilities(CapabilitiesList) when is_list(CapabilitiesList) ->
{list, [filter_capability(V) || V <- CapabilitiesList]}.

filter_capability(V) when is_binary(V) ->
{utf8, V};
filter_capability({T, _} = V) when is_atom(T) ->
%% looks like an already tagged type, just pass it through
V.

% https://people.apache.org/~rgodfrey/amqp-1.0/apache-filters.html
translate_legacy_amqp_headers_binding(LegacyHeaders) ->
{map,
Expand Down
176 changes: 175 additions & 1 deletion deps/amqp10_client/test/system_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ groups() ->
{mock, [], [
insufficient_credit,
incoming_heartbeat,
multi_transfer_without_delivery_id
multi_transfer_without_delivery_id,
set_receiver_capabilities,
set_sender_capabilities,
set_sender_sync_capabilities
]}
].

Expand Down Expand Up @@ -687,6 +690,7 @@ subscribe_with_auto_flow_unsettled(Config) ->
ok = amqp10_client:end_session(Session),
ok = amqp10_client:close_connection(Connection).


insufficient_credit(Config) ->
Hostname = ?config(mock_host, Config),
Port = ?config(mock_port, Config),
Expand Down Expand Up @@ -790,6 +794,176 @@ multi_transfer_without_delivery_id(Config) ->
ok = amqp10_client:close_connection(Connection),
ok.

set_receiver_capabilities(Config) ->
Hostname = ?config(mock_host, Config),
Port = ?config(mock_port, Config),

OpenStep = fun({0 = Ch, #'v1_0.open'{}, _Pay}) ->
{Ch, [#'v1_0.open'{container_id = {utf8, <<"mock">>}}]}
end,
BeginStep = fun({1 = Ch, #'v1_0.begin'{}, _Pay}) ->
{Ch, [#'v1_0.begin'{remote_channel = {ushort, 1},
next_outgoing_id = {uint, 1},
incoming_window = {uint, 1000},
outgoing_window = {uint, 1000}}
]}
end,
AttachStep = fun({1 = Ch, #'v1_0.attach'{role = true,
name = Name,
source = #'v1_0.source'{
capabilities = {utf8, <<"capability-1">>}}}, <<>>}) ->
{Ch, [#'v1_0.attach'{name = Name,
handle = {uint, 99},
initial_delivery_count = {uint, 1},
role = false}
]}
end,

LinkCreditStep = fun({1 = Ch, #'v1_0.flow'{}, <<>>}) ->
{Ch, {multi, [[#'v1_0.transfer'{handle = {uint, 99},
delivery_id = {uint, 12},
more = true},
#'v1_0.data'{content = <<"hello ">>}],
[#'v1_0.transfer'{handle = {uint, 99},
% delivery_id can be omitted
% for continuation frames
delivery_id = undefined,
settled = undefined,
more = false},
#'v1_0.data'{content = <<"world">>}]
]}}
end,
Steps = [fun mock_server:recv_amqp_header_step/1,
fun mock_server:send_amqp_header_step/1,
mock_server:amqp_step(OpenStep),
mock_server:amqp_step(BeginStep),
mock_server:amqp_step(AttachStep),
mock_server:amqp_step(LinkCreditStep)
],

ok = mock_server:set_steps(?config(mock_server, Config), Steps),

Cfg = #{address => Hostname, port => Port, sasl => none, notify => self()},
{ok, Connection} = amqp10_client:open_connection(Cfg),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"mock1-received">>,
<<"test">>,
setlled, none,
#{}, % filter
#{}, % prorperties
<<"capability-1">>),
amqp10_client:flow_link_credit(Receiver, 100, 50),
receive
{amqp10_msg, Receiver, _InMsg} ->
ok
after 2000 ->
exit(delivery_timeout)
end,

ok = amqp10_client:end_session(Session),
ok = amqp10_client:close_connection(Connection),
ok.

set_sender_capabilities(Config) ->
Hostname = ?config(mock_host, Config),
Port = ?config(mock_port, Config),
OpenStep = fun({0 = Ch, #'v1_0.open'{}, _Pay}) ->
{Ch, [#'v1_0.open'{container_id = {utf8, <<"mock">>}}]}
end,
BeginStep = fun({1 = Ch, #'v1_0.begin'{}, _Pay}) ->
{Ch, [#'v1_0.begin'{remote_channel = {ushort, 1},
next_outgoing_id = {uint, 1},
incoming_window = {uint, 1000},
outgoing_window = {uint, 1000}}
]}
end,
AttachStep = fun({1 = Ch, #'v1_0.attach'{role = false,
name = Name,
source = #'v1_0.source'{

},
target = #'v1_0.target'{
capabilities = {utf8, <<"capability-1">>}}}, <<>>}) ->
{Ch, [#'v1_0.attach'{name = Name,
handle = {uint, 99},
role = true}]}
end,
Steps = [fun mock_server:recv_amqp_header_step/1,
fun mock_server:send_amqp_header_step/1,
mock_server:amqp_step(OpenStep),
mock_server:amqp_step(BeginStep),
mock_server:amqp_step(AttachStep)],

ok = mock_server:set_steps(?config(mock_server, Config), Steps),

Cfg = #{address => Hostname, port => Port, sasl => none, notify => self()},
{ok, Connection} = amqp10_client:open_connection(Cfg),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"mock1-sender">>,
<<"test">>,
mixed,
none,
<<"capability-1">>),
await_link(Sender, attached, attached_timeout),
Msg = amqp10_msg:new(<<"mock-tag">>, <<"banana">>, true),
{error, insufficient_credit} = amqp10_client:send_msg(Sender, Msg),

ok = amqp10_client:end_session(Session),
ok = amqp10_client:close_connection(Connection),
ok.


set_sender_sync_capabilities(Config) ->
Hostname = ?config(mock_host, Config),
Port = ?config(mock_port, Config),
OpenStep = fun({0 = Ch, #'v1_0.open'{}, _Pay}) ->
{Ch, [#'v1_0.open'{container_id = {utf8, <<"mock">>}}]}
end,
BeginStep = fun({1 = Ch, #'v1_0.begin'{}, _Pay}) ->
{Ch, [#'v1_0.begin'{remote_channel = {ushort, 1},
next_outgoing_id = {uint, 1},
incoming_window = {uint, 1000},
outgoing_window = {uint, 1000}}
]}
end,
AttachStep = fun({1 = Ch, #'v1_0.attach'{role = false,
name = Name,
source = #'v1_0.source'{

},
target = #'v1_0.target'{
capabilities = {list, [
{utf8,<<"capability-1">>},
{utf8,<<"capability-2">>}
]}
}}, <<>>}) ->
{Ch, [#'v1_0.attach'{name = Name,
handle = {uint, 99},
role = true}]}
end,
Steps = [fun mock_server:recv_amqp_header_step/1,
fun mock_server:send_amqp_header_step/1,
mock_server:amqp_step(OpenStep),
mock_server:amqp_step(BeginStep),
mock_server:amqp_step(AttachStep)],

ok = mock_server:set_steps(?config(mock_server, Config), Steps),

Cfg = #{address => Hostname, port => Port, sasl => none, notify => self()},
{ok, Connection} = amqp10_client:open_connection(Cfg),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
{ok, Sender} = amqp10_client:attach_sender_link_sync(Session, <<"mock1-sender">>,
<<"test">>,
mixed,
none,
[<<"capability-1">>,<<"capability-2">>]),
Msg = amqp10_msg:new(<<"mock-tag">>, <<"banana">>, true),
{error, insufficient_credit} = amqp10_client:send_msg(Sender, Msg),

ok = amqp10_client:end_session(Session),
ok = amqp10_client:close_connection(Connection),
ok.

outgoing_heartbeat(Config) ->
Hostname = ?config(rmq_hostname, Config),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
Expand Down

0 comments on commit 52cd6be

Please sign in to comment.