From 52cd6be0399566edff7436905a6df80a5af69306 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Wed, 29 May 2024 12:41:36 +0200 Subject: [PATCH] Configure capabilities on the source/target field in the ATTACH frame --- deps/amqp10_client/src/amqp10_client.erl | 53 +++++- .../src/amqp10_client_session.erl | 29 ++- deps/amqp10_client/test/system_SUITE.erl | 176 +++++++++++++++++- 3 files changed, 248 insertions(+), 10 deletions(-) diff --git a/deps/amqp10_client/src/amqp10_client.erl b/deps/amqp10_client/src/amqp10_client.erl index bf00b531cc4c..200198a5bea0 100644 --- a/deps/amqp10_client/src/amqp10_client.erl +++ b/deps/amqp10_client/src/amqp10_client.erl @@ -20,14 +20,17 @@ attach_sender_link/3, attach_sender_link/4, attach_sender_link/5, + attach_sender_link/6, attach_sender_link_sync/3, attach_sender_link_sync/4, attach_sender_link_sync/5, + attach_sender_link_sync/6, attach_receiver_link/3, attach_receiver_link/4, attach_receiver_link/5, attach_receiver_link/6, attach_receiver_link/7, + attach_receiver_link/8, attach_link/2, detach_link/1, send_msg/2, @@ -46,6 +49,7 @@ -type rcv_settle_mode() :: amqp10_client_session:rcv_settle_mode(). -type terminus_durability() :: amqp10_client_session:terminus_durability(). +-type terminus_capabilities() :: amqp10_client_session:terminus_capabilities(). -type target_def() :: amqp10_client_session:target_def(). -type source_def() :: amqp10_client_session:source_def(). @@ -165,6 +169,8 @@ end_session(Pid) -> %% for the link before returning. attach_sender_link_sync(Session, Name, Target) -> attach_sender_link_sync(Session, Name, Target, mixed). +-spec attach_sender_link_sync(pid(), binary(), binary()) -> + {ok, link_ref()} | link_timeout. %% @doc Synchronously attach a link on 'Session'. %% This is a convenience function that awaits attached event @@ -182,8 +188,18 @@ attach_sender_link_sync(Session, Name, Target, SettleMode) -> snd_settle_mode(), terminus_durability()) -> {ok, link_ref()} | link_timeout. attach_sender_link_sync(Session, Name, Target, SettleMode, Durability) -> + attach_sender_link_sync(Session, Name, Target, SettleMode, Durability, none). + +%% @doc Synchronously attach a link on 'Session'. +%% This is a convenience function that awaits attached event +%% for the link before returning. +-spec attach_sender_link_sync(pid(), binary(), binary(), + snd_settle_mode(), terminus_durability(), + terminus_capabilities()) -> + {ok, link_ref()} | link_timeout. +attach_sender_link_sync(Session, Name, Target, SettleMode, Durability, Capabilities) -> {ok, Ref} = attach_sender_link(Session, Name, Target, SettleMode, - Durability), + Durability, Capabilities), receive {amqp10_event, {link, Ref, attached}} -> {ok, Ref}; @@ -219,9 +235,21 @@ attach_sender_link(Session, Name, Target, SettleMode) -> snd_settle_mode(), terminus_durability()) -> {ok, link_ref()}. attach_sender_link(Session, Name, Target, SettleMode, Durability) -> + attach_sender_link(Session, Name, Target, SettleMode, Durability, none). + +%% @doc Attaches a sender link to a target. +%% This is asynchronous and will notify completion of the attach request to the +%% caller using an amqp10_event of the following format: +%% {amqp10_event, {link, LinkRef, attached | {detached, Why}}} +-spec attach_sender_link(pid(), binary(), binary(), + snd_settle_mode(), terminus_durability(), + terminus_capabilities()) -> + {ok, link_ref()}. +attach_sender_link(Session, Name, Target, SettleMode, Durability, Capabilities) -> AttachArgs = #{name => Name, role => {sender, #{address => Target, - durable => Durability}}, + durable => Durability, + capabilities => Capabilities}}, snd_settle_mode => SettleMode, rcv_settle_mode => first}, amqp10_client_session:attach(Session, AttachArgs). @@ -272,7 +300,19 @@ attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter) -> -spec attach_receiver_link(pid(), binary(), binary(), snd_settle_mode(), terminus_durability(), filter(), properties()) -> {ok, link_ref()}. -attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties) +attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties) -> + attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties, none). + +%% @doc Attaches a receiver link to a source. +%% This is asynchronous and will notify completion of the attach request to the +%% caller using an amqp10_event of the following format: +%% {amqp10_event, {link, LinkRef, attached | {detached, Why}}} +-spec attach_receiver_link(pid(), binary(), binary(), snd_settle_mode(), + terminus_durability(), filter(), properties(), + terminus_capabilities()) -> + {ok, link_ref()}. +attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, + Properties, Capabilities) when is_pid(Session) andalso is_binary(Name) andalso is_binary(Source) andalso @@ -281,10 +321,13 @@ attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Prop SettleMode == mixed) andalso is_atom(Durability) andalso is_map(Filter) andalso - is_map(Properties) -> + is_map(Properties) andalso + is_list(Capabilities) orelse + (Capabilities == none orelse is_binary(Capabilities)) -> AttachArgs = #{name => Name, role => {receiver, #{address => Source, - durable => Durability}, self()}, + durable => Durability, + capabilities => Capabilities}, self()}, snd_settle_mode => SettleMode, rcv_settle_mode => first, filter => Filter, diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index d7617e43fec3..58db2152648e 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -73,11 +73,14 @@ -type rcv_settle_mode() :: first | second. -type terminus_durability() :: none | configuration | unsettled_state. +-type terminus_capabilities() :: none | binary() | list(). -type target_def() :: #{address => link_address(), - durable => terminus_durability()}. + durable => terminus_durability(), + capabilities => terminus_capabilities()}. -type source_def() :: #{address => link_address(), - durable => terminus_durability()}. + durable => terminus_durability(), + capabilities => terminus_capabilities()}. -type attach_role() :: {sender, target_def()} | {receiver, source_def(), pid()}. @@ -109,6 +112,7 @@ -export_type([snd_settle_mode/0, rcv_settle_mode/0, terminus_durability/0, + terminus_capabilities/0, attach_args/0, attach_role/0, target_def/0, @@ -713,20 +717,24 @@ make_source(#{role := {sender, _}}) -> make_source(#{role := {receiver, #{address := Address} = Source, _Pid}, filter := Filter}) -> Durable = translate_terminus_durability(maps:get(durable, Source, none)), TranslatedFilter = translate_filters(Filter), + Capabilities = translate_terminus_capabilities(maps:get(capabilities, Source, none)), #'v1_0.source'{address = {utf8, Address}, durable = {uint, Durable}, - filter = TranslatedFilter}. + filter = TranslatedFilter, + capabilities = Capabilities}. make_target(#{role := {receiver, _Source, _Pid}}) -> #'v1_0.target'{}; make_target(#{role := {sender, #{address := Address} = Target}}) -> Durable = translate_terminus_durability(maps:get(durable, Target, none)), + Capabilities = translate_terminus_capabilities(maps:get(capabilities, Target, none)), TargetAddr = case is_binary(Address) of true -> {utf8, Address}; false -> Address end, #'v1_0.target'{address = TargetAddr, - durable = {uint, Durable}}. + durable = {uint, Durable}, + capabilities = Capabilities}. max_message_size(#{max_message_size := Size}) when is_integer(Size) andalso @@ -771,6 +779,19 @@ filter_value_type({T, _} = V) when is_atom(T) -> %% looks like an already tagged type, just pass it through V. +translate_terminus_capabilities(none) -> + undefined; +translate_terminus_capabilities(Capabilities) when is_binary(Capabilities) -> + {utf8, Capabilities}; +translate_terminus_capabilities(CapabilitiesList) when is_list(CapabilitiesList) -> + {list, [filter_capability(V) || V <- CapabilitiesList]}. + +filter_capability(V) when is_binary(V) -> + {utf8, V}; +filter_capability({T, _} = V) when is_atom(T) -> + %% looks like an already tagged type, just pass it through + V. + % https://people.apache.org/~rgodfrey/amqp-1.0/apache-filters.html translate_legacy_amqp_headers_binding(LegacyHeaders) -> {map, diff --git a/deps/amqp10_client/test/system_SUITE.erl b/deps/amqp10_client/test/system_SUITE.erl index 314040af392b..9f1042917f40 100644 --- a/deps/amqp10_client/test/system_SUITE.erl +++ b/deps/amqp10_client/test/system_SUITE.erl @@ -52,7 +52,10 @@ groups() -> {mock, [], [ insufficient_credit, incoming_heartbeat, - multi_transfer_without_delivery_id + multi_transfer_without_delivery_id, + set_receiver_capabilities, + set_sender_capabilities, + set_sender_sync_capabilities ]} ]. @@ -687,6 +690,7 @@ subscribe_with_auto_flow_unsettled(Config) -> ok = amqp10_client:end_session(Session), ok = amqp10_client:close_connection(Connection). + insufficient_credit(Config) -> Hostname = ?config(mock_host, Config), Port = ?config(mock_port, Config), @@ -790,6 +794,176 @@ multi_transfer_without_delivery_id(Config) -> ok = amqp10_client:close_connection(Connection), ok. +set_receiver_capabilities(Config) -> + Hostname = ?config(mock_host, Config), + Port = ?config(mock_port, Config), + + OpenStep = fun({0 = Ch, #'v1_0.open'{}, _Pay}) -> + {Ch, [#'v1_0.open'{container_id = {utf8, <<"mock">>}}]} + end, + BeginStep = fun({1 = Ch, #'v1_0.begin'{}, _Pay}) -> + {Ch, [#'v1_0.begin'{remote_channel = {ushort, 1}, + next_outgoing_id = {uint, 1}, + incoming_window = {uint, 1000}, + outgoing_window = {uint, 1000}} + ]} + end, + AttachStep = fun({1 = Ch, #'v1_0.attach'{role = true, + name = Name, + source = #'v1_0.source'{ + capabilities = {utf8, <<"capability-1">>}}}, <<>>}) -> + {Ch, [#'v1_0.attach'{name = Name, + handle = {uint, 99}, + initial_delivery_count = {uint, 1}, + role = false} + ]} + end, + + LinkCreditStep = fun({1 = Ch, #'v1_0.flow'{}, <<>>}) -> + {Ch, {multi, [[#'v1_0.transfer'{handle = {uint, 99}, + delivery_id = {uint, 12}, + more = true}, + #'v1_0.data'{content = <<"hello ">>}], + [#'v1_0.transfer'{handle = {uint, 99}, + % delivery_id can be omitted + % for continuation frames + delivery_id = undefined, + settled = undefined, + more = false}, + #'v1_0.data'{content = <<"world">>}] + ]}} + end, + Steps = [fun mock_server:recv_amqp_header_step/1, + fun mock_server:send_amqp_header_step/1, + mock_server:amqp_step(OpenStep), + mock_server:amqp_step(BeginStep), + mock_server:amqp_step(AttachStep), + mock_server:amqp_step(LinkCreditStep) + ], + + ok = mock_server:set_steps(?config(mock_server, Config), Steps), + + Cfg = #{address => Hostname, port => Port, sasl => none, notify => self()}, + {ok, Connection} = amqp10_client:open_connection(Cfg), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"mock1-received">>, + <<"test">>, + setlled, none, + #{}, % filter + #{}, % prorperties + <<"capability-1">>), + amqp10_client:flow_link_credit(Receiver, 100, 50), + receive + {amqp10_msg, Receiver, _InMsg} -> + ok + after 2000 -> + exit(delivery_timeout) + end, + + ok = amqp10_client:end_session(Session), + ok = amqp10_client:close_connection(Connection), + ok. + +set_sender_capabilities(Config) -> + Hostname = ?config(mock_host, Config), + Port = ?config(mock_port, Config), + OpenStep = fun({0 = Ch, #'v1_0.open'{}, _Pay}) -> + {Ch, [#'v1_0.open'{container_id = {utf8, <<"mock">>}}]} + end, + BeginStep = fun({1 = Ch, #'v1_0.begin'{}, _Pay}) -> + {Ch, [#'v1_0.begin'{remote_channel = {ushort, 1}, + next_outgoing_id = {uint, 1}, + incoming_window = {uint, 1000}, + outgoing_window = {uint, 1000}} + ]} + end, + AttachStep = fun({1 = Ch, #'v1_0.attach'{role = false, + name = Name, + source = #'v1_0.source'{ + + }, + target = #'v1_0.target'{ + capabilities = {utf8, <<"capability-1">>}}}, <<>>}) -> + {Ch, [#'v1_0.attach'{name = Name, + handle = {uint, 99}, + role = true}]} + end, + Steps = [fun mock_server:recv_amqp_header_step/1, + fun mock_server:send_amqp_header_step/1, + mock_server:amqp_step(OpenStep), + mock_server:amqp_step(BeginStep), + mock_server:amqp_step(AttachStep)], + + ok = mock_server:set_steps(?config(mock_server, Config), Steps), + + Cfg = #{address => Hostname, port => Port, sasl => none, notify => self()}, + {ok, Connection} = amqp10_client:open_connection(Cfg), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"mock1-sender">>, + <<"test">>, + mixed, + none, + <<"capability-1">>), + await_link(Sender, attached, attached_timeout), + Msg = amqp10_msg:new(<<"mock-tag">>, <<"banana">>, true), + {error, insufficient_credit} = amqp10_client:send_msg(Sender, Msg), + + ok = amqp10_client:end_session(Session), + ok = amqp10_client:close_connection(Connection), + ok. + + +set_sender_sync_capabilities(Config) -> + Hostname = ?config(mock_host, Config), + Port = ?config(mock_port, Config), + OpenStep = fun({0 = Ch, #'v1_0.open'{}, _Pay}) -> + {Ch, [#'v1_0.open'{container_id = {utf8, <<"mock">>}}]} + end, + BeginStep = fun({1 = Ch, #'v1_0.begin'{}, _Pay}) -> + {Ch, [#'v1_0.begin'{remote_channel = {ushort, 1}, + next_outgoing_id = {uint, 1}, + incoming_window = {uint, 1000}, + outgoing_window = {uint, 1000}} + ]} + end, + AttachStep = fun({1 = Ch, #'v1_0.attach'{role = false, + name = Name, + source = #'v1_0.source'{ + + }, + target = #'v1_0.target'{ + capabilities = {list, [ + {utf8,<<"capability-1">>}, + {utf8,<<"capability-2">>} + ]} + }}, <<>>}) -> + {Ch, [#'v1_0.attach'{name = Name, + handle = {uint, 99}, + role = true}]} + end, + Steps = [fun mock_server:recv_amqp_header_step/1, + fun mock_server:send_amqp_header_step/1, + mock_server:amqp_step(OpenStep), + mock_server:amqp_step(BeginStep), + mock_server:amqp_step(AttachStep)], + + ok = mock_server:set_steps(?config(mock_server, Config), Steps), + + Cfg = #{address => Hostname, port => Port, sasl => none, notify => self()}, + {ok, Connection} = amqp10_client:open_connection(Cfg), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + {ok, Sender} = amqp10_client:attach_sender_link_sync(Session, <<"mock1-sender">>, + <<"test">>, + mixed, + none, + [<<"capability-1">>,<<"capability-2">>]), + Msg = amqp10_msg:new(<<"mock-tag">>, <<"banana">>, true), + {error, insufficient_credit} = amqp10_client:send_msg(Sender, Msg), + + ok = amqp10_client:end_session(Session), + ok = amqp10_client:close_connection(Connection), + ok. + outgoing_heartbeat(Config) -> Hostname = ?config(rmq_hostname, Config), Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),