From 9fc1e0cead2b5b72820579c747cadbe9661e9e4f Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Thu, 20 Jun 2024 17:01:06 +0200 Subject: [PATCH 01/35] Configure capabilities on the source/target field in the ATTACH frame --- deps/amqp10_client/.gitignore | 2 + deps/amqp10_client/BUILD.bazel | 6 +- deps/amqp10_client/app.bzl | 8 + deps/amqp10_client/src/amqp10_client.erl | 5 +- .../src/amqp10_client_session.erl | 27 +- .../test/activemq_ct_helpers.erl | 1 + deps/amqp10_client/test/ibmmq_ct_helpers.erl | 89 +++++ deps/amqp10_client/test/system_SUITE.erl | 319 ++++++++++++++++-- .../test/system_SUITE_data/ibmmq_runner | 97 ++++++ 9 files changed, 514 insertions(+), 40 deletions(-) create mode 100644 deps/amqp10_client/test/ibmmq_ct_helpers.erl create mode 100755 deps/amqp10_client/test/system_SUITE_data/ibmmq_runner diff --git a/deps/amqp10_client/.gitignore b/deps/amqp10_client/.gitignore index ac3616494721..685d2fea4246 100644 --- a/deps/amqp10_client/.gitignore +++ b/deps/amqp10_client/.gitignore @@ -4,3 +4,5 @@ # Downloaded ActiveMQ. /test/system_SUITE_data/apache-activemq-* +/test/system_SUITE_data/ibmmq/mq-container +/test/system_SUITE_data/ibmmq/*.tar.gz diff --git a/deps/amqp10_client/BUILD.bazel b/deps/amqp10_client/BUILD.bazel index 6606efaf289c..bba484f0e35a 100644 --- a/deps/amqp10_client/BUILD.bazel +++ b/deps/amqp10_client/BUILD.bazel @@ -117,13 +117,14 @@ rabbitmq_integration_suite( size = "medium", additional_beam = [ "test/activemq_ct_helpers.beam", + "test/ibmmq_ct_helpers.beam", "test/mock_server.beam", ], data = [ - "@activemq//:exec_dir", + "@activemq//:exec_dir", ], test_env = { - "ACTIVEMQ": "$TEST_SRCDIR/$TEST_WORKSPACE/external/activemq/bin/activemq", + "ACTIVEMQ": "$TEST_SRCDIR/$TEST_WORKSPACE/external/activemq/bin/activemq" }, deps = TEST_DEPS, ) @@ -140,6 +141,7 @@ eunit( name = "eunit", compiled_suites = [ ":test_activemq_ct_helpers_beam", + ":test_ibmmq_ct_helpers_beam", ":test_mock_server_beam", ], target = ":test_erlang_app", diff --git a/deps/amqp10_client/app.bzl b/deps/amqp10_client/app.bzl index 8fcdad73cf9d..de6c84488338 100644 --- a/deps/amqp10_client/app.bzl +++ b/deps/amqp10_client/app.bzl @@ -126,6 +126,14 @@ def test_suite_beam_files(name = "test_suite_beam_files"): app_name = "amqp10_client", erlc_opts = "//:test_erlc_opts", ) + erlang_bytecode( + name = "test_ibmmq_ct_helpers_beam", + testonly = True, + srcs = ["test/ibmmq_ct_helpers.erl"], + outs = ["test/ibmmq_ct_helpers.beam"], + app_name = "amqp10_client", + erlc_opts = "//:test_erlc_opts", + ) erlang_bytecode( name = "test_mock_server_beam", testonly = True, diff --git a/deps/amqp10_client/src/amqp10_client.erl b/deps/amqp10_client/src/amqp10_client.erl index e296d3ff8533..0eafc16ab936 100644 --- a/deps/amqp10_client/src/amqp10_client.erl +++ b/deps/amqp10_client/src/amqp10_client.erl @@ -164,6 +164,8 @@ end_session(Pid) -> %% @doc Synchronously attach a link on 'Session'. %% This is a convenience function that awaits attached event %% for the link before returning. +-spec attach_sender_link_sync(pid(), binary(), binary()) -> + {ok, link_ref()} | link_timeout. attach_sender_link_sync(Session, Name, Target) -> attach_sender_link_sync(Session, Name, Target, mixed). @@ -275,7 +277,8 @@ attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter) -> -spec attach_receiver_link(pid(), binary(), binary(), snd_settle_mode(), terminus_durability(), filter(), properties()) -> {ok, link_ref()}. -attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties) +attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, + Properties) when is_pid(Session) andalso is_binary(Name) andalso is_binary(Source) andalso diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index 7e2c82560398..32e743ecddc2 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -70,11 +70,14 @@ -type input_handle() :: link_handle(). -type terminus_durability() :: none | configuration | unsettled_state. +-type terminus_capabilities() :: binary() | [binary(),...]. -type target_def() :: #{address => link_address(), - durable => terminus_durability()}. + durable => terminus_durability(), + capabilities => terminus_capabilities()}. -type source_def() :: #{address => link_address(), - durable => terminus_durability()}. + durable => terminus_durability(), + capabilities => terminus_capabilities()}. -type attach_role() :: {sender, target_def()} | {receiver, source_def(), pid()}. @@ -706,20 +709,24 @@ make_source(#{role := {sender, _}}) -> make_source(#{role := {receiver, #{address := Address} = Source, _Pid}, filter := Filter}) -> Durable = translate_terminus_durability(maps:get(durable, Source, none)), TranslatedFilter = translate_filters(Filter), + Capabilities = translate_terminus_capabilities(maps:get(capabilities, Source, [])), #'v1_0.source'{address = {utf8, Address}, durable = {uint, Durable}, - filter = TranslatedFilter}. + filter = TranslatedFilter, + capabilities = Capabilities}. make_target(#{role := {receiver, _Source, _Pid}}) -> #'v1_0.target'{}; make_target(#{role := {sender, #{address := Address} = Target}}) -> Durable = translate_terminus_durability(maps:get(durable, Target, none)), + Capabilities = translate_terminus_capabilities(maps:get(capabilities, Target, [])), TargetAddr = case is_binary(Address) of true -> {utf8, Address}; false -> Address end, #'v1_0.target'{address = TargetAddr, - durable = {uint, Durable}}. + durable = {uint, Durable}, + capabilities = Capabilities}. max_message_size(#{max_message_size := Size}) when is_integer(Size) andalso @@ -762,6 +769,17 @@ filter_value_type({T, _} = V) when is_atom(T) -> %% looks like an already tagged type, just pass it through V. +translate_terminus_capabilities(Capabilities) when is_binary(Capabilities) -> + {symbol, Capabilities}; +translate_terminus_capabilities(CapabilitiesList) when is_list(CapabilitiesList) -> + {array, symbol, [filter_capability(V) || V <- CapabilitiesList]}. + +filter_capability(V) when is_binary(V) -> + {symbol, V}; +filter_capability(_) -> + %% Any other type is ignored + {}. + % https://people.apache.org/~rgodfrey/amqp-1.0/apache-filters.html translate_legacy_amqp_headers_binding(LegacyHeaders) -> {map, @@ -837,6 +855,7 @@ send_attach(Send, #{name := Name, role := RoleTuple} = Args, {FromPid, _}, rcv_settle_mode = rcv_settle_mode(Args), target = Target, max_message_size = MaxMessageSize}, + ok = Send(Attach, State), Ref = make_link_ref(Role, self(), OutHandle), diff --git a/deps/amqp10_client/test/activemq_ct_helpers.erl b/deps/amqp10_client/test/activemq_ct_helpers.erl index ba1b7fe5721e..2c0ebb5ae845 100644 --- a/deps/amqp10_client/test/activemq_ct_helpers.erl +++ b/deps/amqp10_client/test/activemq_ct_helpers.erl @@ -63,6 +63,7 @@ start_activemq_nodes(Config) -> ActivemqCmd = ?config(activemq_cmd, Config1), TCPPort = rabbit_ct_broker_helpers:get_node_config(Config1, 0, tcp_port_amqp), ConfigFile = ?config(activemq_config_filename, Config1), + ct:log("Running ~p", [ActivemqCmd]), Cmd = [ActivemqCmd, "start", {"-Dtestsuite.tcp_port_amqp=~b", [TCPPort]}, diff --git a/deps/amqp10_client/test/ibmmq_ct_helpers.erl b/deps/amqp10_client/test/ibmmq_ct_helpers.erl new file mode 100644 index 000000000000..aad76b3ce3df --- /dev/null +++ b/deps/amqp10_client/test/ibmmq_ct_helpers.erl @@ -0,0 +1,89 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(ibmmq_ct_helpers). + +-include_lib("common_test/include/ct.hrl"). + +-export([setup_steps/0, + teardown_steps/0, + init_config/1, + start_ibmmq_server/1, + stop_ibmmq_server/1]). + +setup_steps() -> + [fun init_config/1, + fun start_ibmmq_server/1 + ]. + +teardown_steps() -> + [ + fun stop_ibmmq_server/1 + ]. + +init_config(Config) -> + NodeConfig = [{tcp_port_amqp, 5672}], + rabbit_ct_helpers:set_config(Config, [ {rmq_nodes, [NodeConfig]}, + {rmq_hostname, "localhost"}, + {tcp_hostname_amqp, "localhost"}, + {sasl, {plain, <<"app">>, <<"passw0rd">>}} ]). + +start_ibmmq_server(Config) -> + IBMmqCmd = filename:join([?config(data_dir, Config), "ibmmq_runner"]), + Cmd = [IBMmqCmd, "start"], + ct:log("Running command ~p", [Cmd]), + case rabbit_ct_helpers:exec(Cmd, []) of + {ok, _} -> wait_for_ibmmq_nodes(Config); + Error -> ct:pal("Error: ~tp", [Error]), + {skip, "Failed to start IBM MQ"} + end. + +wait_for_ibmmq_nodes(Config) -> + Hostname = ?config(rmq_hostname, Config), + Ports = rabbit_ct_broker_helpers:get_node_configs(Config, tcp_port_amqp), + wait_for_ibmmq_ports(Config, Hostname, Ports). + +wait_for_ibmmq_ports(Config, Hostname, [Port | Rest]) -> + ct:log("Waiting for IBM MQ on port ~b", [Port]), + case wait_for_ibmmq_port(Hostname, Port, 60) of + ok -> + ct:log("IBM MQ ready on port ~b", [Port]), + wait_for_ibmmq_ports(Config, Hostname, Rest); + {error, _} -> + Msg = lists:flatten( + io_lib:format( + "Failed to start IBM MQ on port ~b; see IBM MQ logs", + [Port])), + ct:pal(?LOW_IMPORTANCE, Msg, []), + {skip, Msg} + end; +wait_for_ibmmq_ports(Config, _, []) -> + Config. + +wait_for_ibmmq_port(_, _, 0) -> + {error, econnrefused}; +wait_for_ibmmq_port(Hostname, Port, Retries) -> + case gen_tcp:connect(Hostname, Port, []) of + {ok, Connection} -> + gen_tcp:close(Connection), + ok; + {error, econnrefused} -> + timer:sleep(1000), + wait_for_ibmmq_port(Hostname, Port, Retries - 1); + Error -> + Error + end. + +stop_ibmmq_server(Config) -> + IBMmqCmd = filename:join([?config(data_dir, Config), "ibmmq_runner"]), + Cmd = [IBMmqCmd, "stop"], + ct:log("Running command ~p", [Cmd]), + case rabbit_ct_helpers:exec(Cmd, []) of + {ok, _} -> Config; + Error -> ct:pal("Error: ~tp", [Error]), + {skip, "Failed to stop IBM MQ"} + end. diff --git a/deps/amqp10_client/test/system_SUITE.erl b/deps/amqp10_client/test/system_SUITE.erl index 27a59d5efef8..b3eb598b0843 100644 --- a/deps/amqp10_client/test/system_SUITE.erl +++ b/deps/amqp10_client/test/system_SUITE.erl @@ -24,6 +24,7 @@ all() -> {group, rabbitmq}, {group, rabbitmq_strict}, {group, activemq}, + {group, ibmmq}, {group, activemq_no_anon}, {group, mock} ]. @@ -32,6 +33,10 @@ groups() -> [ {rabbitmq, [], shared() ++ [notify_with_performative]}, {activemq, [], shared()}, + {ibmmq, [], [ + open_close_connection, + basic_roundtrip_ibmmq + ]}, {rabbitmq_strict, [], [ basic_roundtrip_tls, roundtrip_tls_global_config, @@ -53,7 +58,10 @@ groups() -> {mock, [], [ insufficient_credit, incoming_heartbeat, - multi_transfer_without_delivery_id + multi_transfer_without_delivery_id, + set_receiver_capabilities, + set_sender_capabilities, + set_sender_sync_capabilities ]} ]. @@ -122,6 +130,14 @@ init_per_group(activemq, Config0) -> Config = rabbit_ct_helpers:set_config(Config0, {sasl, anon}), rabbit_ct_helpers:run_steps(Config, activemq_ct_helpers:setup_steps("activemq.xml")); + +init_per_group(ibmmq, Config) -> + ct:log("Found arch: ~p", [erlang:system_info(system_architecture)]), + case string:find(erlang:system_info(system_architecture), "x86_64") of + nomatch -> {skip, no_arm64_docker_image_for_ibmmq}; + _ -> rabbit_ct_helpers:run_steps(Config, ibmmq_ct_helpers:setup_steps()) + end; + init_per_group(activemq_no_anon, Config0) -> Config = rabbit_ct_helpers:set_config( Config0, {sasl, {plain, <<"user">>, <<"password">>}}), @@ -137,7 +153,9 @@ init_per_group(azure, Config) -> ]); init_per_group(mock, Config) -> rabbit_ct_helpers:set_config(Config, [{mock_port, 25000}, + {tcp_port_amqp, 25000}, {mock_host, "localhost"}, + {tcp_hostname_amqp, "localhost"}, {sasl, none} ]). end_per_group(rabbitmq, Config) -> @@ -146,6 +164,8 @@ end_per_group(rabbitmq_strict, Config) -> rabbit_ct_helpers:run_steps(Config, rabbit_ct_broker_helpers:teardown_steps()); end_per_group(activemq, Config) -> rabbit_ct_helpers:run_steps(Config, activemq_ct_helpers:teardown_steps()); +end_per_group(ibmmq, Config) -> + rabbit_ct_helpers:run_steps(Config, ibmmq_ct_helpers:teardown_steps()); end_per_group(activemq_no_anon, Config) -> rabbit_ct_helpers:run_steps(Config, activemq_ct_helpers:teardown_steps()); end_per_group(_, Config) -> @@ -156,9 +176,11 @@ end_per_group(_, Config) -> %% ------------------------------------------------------------------- init_per_testcase(_Test, Config) -> + ct:log("Setting per test case"), case lists:keyfind(mock_port, 1, Config) of {_, Port} -> M = mock_server:start(Port), + ct:log("Setting mock server"), rabbit_ct_helpers:set_config(Config, {mock_server, M}); _ -> Config end. @@ -175,19 +197,19 @@ open_close_connection(Config) -> Hostname = ?config(rmq_hostname, Config), Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), %% an address list - OpnConf = #{addresses => [Hostname], + OpenConf = #{addresses => [Hostname], port => Port, notify => self(), container_id => <<"open_close_connection_container">>, sasl => ?config(sasl, Config)}, - {ok, Connection} = amqp10_client:open_connection(Hostname, Port), - {ok, Connection2} = amqp10_client:open_connection(OpnConf), + ct:log("Connecting to ~p", [OpenConf]), + {ok, Connection2} = amqp10_client:open_connection(OpenConf), receive - {amqp10_event, {connection, Connection2, opened}} -> ok + {amqp10_event, {connection, Connection2, opened}} -> ct:log("connection opened"), ok after 5000 -> exit(connection_timeout) end, ok = amqp10_client:close_connection(Connection2), - ok = amqp10_client:close_connection(Connection). + ct:log("Closed connection ."). open_connection_plain_sasl(Config) -> Hostname = ?config(rmq_hostname, Config), @@ -253,10 +275,10 @@ open_connection_plain_sasl_failure(Config) -> ok = amqp10_client:close_connection(Connection). basic_roundtrip(Config) -> - application:start(sasl), + application:start(sasl), Hostname = ?config(rmq_hostname, Config), Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), - OpenConf = #{address => Hostname, port => Port, sasl => anon}, + OpenConf = #{address => Hostname, port => Port, sasl => ?config(sasl, Config)}, roundtrip(OpenConf). basic_roundtrip_tls(Config) -> @@ -323,54 +345,100 @@ roundtrip_large_messages(Config) -> DataMb = rand:bytes(1024 * 1024), Data8Mb = rand:bytes(8 * 1024 * 1024), Data64Mb = rand:bytes(64 * 1024 * 1024), - ok = roundtrip(OpenConf, DataKb), - ok = roundtrip(OpenConf, DataMb), - ok = roundtrip(OpenConf, Data8Mb), - ok = roundtrip(OpenConf, Data64Mb). + ok = roundtrip(OpenConf, [{body, DataKb}]), + ok = roundtrip(OpenConf, [{body, DataMb}]), + ok = roundtrip(OpenConf, [{body, Data8Mb}]), + ok = roundtrip(OpenConf, [{body, Data64Mb}]). + +basic_roundtrip_ibmmq(Config) -> + application:start(sasl), + Hostname = ?config(rmq_hostname, Config), + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), + OpenConf = #{address => Hostname, port => Port, sasl => ?config(sasl, Config)}, + roundtrip(OpenConf, [ + {body, <<"banana">>}, + {destination, <<"DEV.QUEUE.3">>}, + {sender_capabilities, <<"queue">>}, + {receiver_capabilities, <<"queue">>}, + {message_annotations, #{}} + ], [creation_time]). roundtrip(OpenConf) -> - roundtrip(OpenConf, <<"banana">>). + roundtrip(OpenConf, [], []). + +roundtrip(OpenConf, Args) -> + roundtrip(OpenConf, Args, []). + +roundtrip(OpenConf, Args, DoNotAssertMessageProperties) -> + Body = proplists:get_value(body, Args, <<"banana">>), + Destination = proplists:get_value(destination, Args, <<"test1">>), + SenderCapabilities = proplists:get_value(sender_capabilities, Args, <<>>), + ReceiverCapabilities = proplists:get_value(receiver_capabilities, Args, <<>>), + MessageAnnotations = proplists:get_value(message_annotations, Args, + #{<<"x-key">> => <<"x-value">>, + <<"x_key">> => <<"x_value">>}), -roundtrip(OpenConf, Body) -> {ok, Connection} = amqp10_client:open_connection(OpenConf), {ok, Session} = amqp10_client:begin_session(Connection), - {ok, Sender} = amqp10_client:attach_sender_link( - Session, <<"banana-sender">>, <<"test1">>, settled, unsettled_state), - await_link(Sender, credited, link_credit_timeout), + SenderAttachArgs = #{name => <<"banana-sender">>, + role => {sender, #{address => Destination, + durable => unsettled_state, + capabilities => SenderCapabilities}}, + snd_settle_mode => settled, + rcv_settle_mode => first, + filter => #{}, + properties => #{} + }, + {ok, Sender} = amqp10_client:attach_link(Session, SenderAttachArgs), + await_link(Sender, attached, attached_timeout), Now = os:system_time(millisecond), - Props = #{creation_time => Now, - message_id => <<"my message ID">>, - correlation_id => <<"my correlation ID">>, + Props = #{content_encoding => <<"my content encoding">>, content_type => <<"my content type">>, - content_encoding => <<"my content encoding">>, - group_id => <<"my group ID">>}, + correlation_id => <<"my correlation ID">>, + creation_time => Now, + group_id => <<"my group ID">>, + message_id => <<"my message ID">>, + to => <<"localhost">> + }, Msg0 = amqp10_msg:new(<<"my-tag">>, Body, true), Msg1 = amqp10_msg:set_application_properties(#{"a_key" => "a_value"}, Msg0), Msg2 = amqp10_msg:set_properties(Props, Msg1), - Msg = amqp10_msg:set_message_annotations(#{<<"x-key 1">> => "value 1", - <<"x-key 2">> => "value 2"}, Msg2), + Msg = amqp10_msg:set_message_annotations(MessageAnnotations, Msg2), ok = amqp10_client:send_msg(Sender, Msg), ok = amqp10_client:detach_link(Sender), await_link(Sender, {detached, normal}, link_detach_timeout), {error, link_not_found} = amqp10_client:detach_link(Sender), - {ok, Receiver} = amqp10_client:attach_receiver_link( - Session, <<"banana-receiver">>, <<"test1">>, settled, unsettled_state), + ReceiverAttachArgs = #{ + name => <<"banana-receiver">>, + role => {receiver, #{address => Destination, + durable => unsettled_state, + capabilities => ReceiverCapabilities}, self()}, + snd_settle_mode => settled, + rcv_settle_mode => first, + filter => #{}, + properties => #{} + }, + {ok, Receiver} = amqp10_client:attach_link(Session, ReceiverAttachArgs), {ok, OutMsg} = amqp10_client:get_msg(Receiver, 4 * 60_000), ok = amqp10_client:end_session(Session), ok = amqp10_client:close_connection(Connection), % ct:pal(?LOW_IMPORTANCE, "roundtrip message Out: ~tp~nIn: ~tp~n", [OutMsg, Msg]), - ?assertMatch(Props, amqp10_msg:properties(OutMsg)), + ActualProps = amqp10_msg:properties(OutMsg), + [ ?assertEqual(V, maps:get(K, ActualProps)) || K := V <- Props, + not lists:member(K, DoNotAssertMessageProperties)], + ?assertEqual(#{<<"a_key">> => <<"a_value">>}, amqp10_msg:application_properties(OutMsg)), - ?assertMatch(#{<<"x-key 1">> := <<"value 1">>, - <<"x-key 2">> := <<"value 2">>}, amqp10_msg:message_annotations(OutMsg)), + ActualMessageAnnotations = amqp10_msg:message_annotations(OutMsg), + [ ?assertEqual(V, maps:get(K, ActualMessageAnnotations)) || K := V <- MessageAnnotations], + ?assertEqual([Body], amqp10_msg:body(OutMsg)), ok. filtered_roundtrip(OpenConf) -> - filtered_roundtrip(OpenConf, <<"banana">>). + filtered_roundtrip(OpenConf, <<"test1">>). filtered_roundtrip(OpenConf, Body) -> {ok, Connection} = amqp10_client:open_connection(OpenConf), @@ -389,7 +457,7 @@ filtered_roundtrip(OpenConf, Body) -> {ok, DefaultReceiver} = amqp10_client:attach_receiver_link(Session, <<"default-receiver">>, - <<"test1">>, + <<"test1">>, settled, unsettled_state), ok = amqp10_client:send_msg(Sender, Msg1), @@ -405,12 +473,14 @@ filtered_roundtrip(OpenConf, Body) -> ok = amqp10_client:send_msg(Sender, Msg2), + SelectorFilter = #{<<"apache.org:selector-filter:string">> => + <<"amqp.annotation.x-opt-enqueuedtimeutc > ", Now2Binary/binary>>}, {ok, FilteredReceiver} = amqp10_client:attach_receiver_link(Session, <<"filtered-receiver">>, - <<"test1">>, + <<"test1">>, settled, unsettled_state, - #{<<"apache.org:selector-filter:string">> => <<"amqp.annotation.x-opt-enqueuedtimeutc > ", Now2Binary/binary>>}), + SelectorFilter), {ok, OutMsg2} = amqp10_client:get_msg(DefaultReceiver, 60_000 * 4), ?assertEqual(<<"msg-2-tag">>, amqp10_msg:delivery_tag(OutMsg2)), @@ -759,6 +829,7 @@ subscribe_with_auto_flow_unsettled(Config) -> ok = amqp10_client:end_session(Session), ok = amqp10_client:close_connection(Connection). + insufficient_credit(Config) -> Hostname = ?config(mock_host, Config), Port = ?config(mock_port, Config), @@ -862,6 +933,188 @@ multi_transfer_without_delivery_id(Config) -> ok = amqp10_client:close_connection(Connection), ok. +set_receiver_capabilities(Config) -> + Hostname = ?config(tcp_hostname_amqp, Config), + Port = ?config(tcp_port_amqp, Config), + +% Hostname = ?config(mock_host, Config), +% Port = ?config(mock_port, Config), + + OpenStep = fun({0 = Ch, #'v1_0.open'{}, _Pay}) -> + {Ch, [#'v1_0.open'{container_id = {utf8, <<"mock">>}}]} + end, + BeginStep = fun({1 = Ch, #'v1_0.begin'{}, _Pay}) -> + {Ch, [#'v1_0.begin'{remote_channel = {ushort, 1}, + next_outgoing_id = {uint, 1}, + incoming_window = {uint, 1000}, + outgoing_window = {uint, 1000}} + ]} + end, + AttachStep = fun({1 = Ch, #'v1_0.attach'{role = true, + name = Name, + source = #'v1_0.source'{ + capabilities = {symbol, <<"capability-1">>}}}, <<>>}) -> + {Ch, [#'v1_0.attach'{name = Name, + handle = {uint, 99}, + initial_delivery_count = {uint, 1}, + role = false} + ]} + end, + + LinkCreditStep = fun({1 = Ch, #'v1_0.flow'{}, <<>>}) -> + {Ch, {multi, [[#'v1_0.transfer'{handle = {uint, 99}, + delivery_id = {uint, 12}, + more = true}, + #'v1_0.data'{content = <<"hello ">>}], + [#'v1_0.transfer'{handle = {uint, 99}, + % delivery_id can be omitted + % for continuation frames + delivery_id = undefined, + settled = undefined, + more = false}, + #'v1_0.data'{content = <<"world">>}] + ]}} + end, + Steps = [fun mock_server:recv_amqp_header_step/1, + fun mock_server:send_amqp_header_step/1, + mock_server:amqp_step(OpenStep), + mock_server:amqp_step(BeginStep), + mock_server:amqp_step(AttachStep), + mock_server:amqp_step(LinkCreditStep) + ], + + ok = mock_server:set_steps(?config(mock_server, Config), Steps), + + Cfg = #{address => Hostname, port => Port, sasl => none, notify => self()}, + {ok, Connection} = amqp10_client:open_connection(Cfg), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + AttachArgs = #{name => <<"mock1-received">>, + role => {receiver, #{address => <<"test">>, + durable => none, + capabilities => <<"capability-1">>}, self()}, + snd_settle_mode => setlled, + rcv_settle_mode => first, + filter => #{}, + properties => #{}}, + {ok, Receiver} = amqp10_client:attach_link(Session, AttachArgs), + amqp10_client:flow_link_credit(Receiver, 100, 50), + receive + {amqp10_msg, Receiver, _InMsg} -> + ok + after 2000 -> + exit(delivery_timeout) + end, + + ok = amqp10_client:end_session(Session), + ok = amqp10_client:close_connection(Connection), + ok. + +set_sender_capabilities(Config) -> + Hostname = ?config(tcp_hostname_amqp, Config), + Port = ?config(tcp_port_amqp, Config), + OpenStep = fun({0 = Ch, #'v1_0.open'{}, _Pay}) -> + {Ch, [#'v1_0.open'{container_id = {utf8, <<"mock">>}}]} + end, + BeginStep = fun({1 = Ch, #'v1_0.begin'{}, _Pay}) -> + {Ch, [#'v1_0.begin'{remote_channel = {ushort, 1}, + next_outgoing_id = {uint, 1}, + incoming_window = {uint, 1000}, + outgoing_window = {uint, 1000}} + ]} + end, + AttachStep = fun({1 = Ch, #'v1_0.attach'{role = false, + name = Name, + source = #'v1_0.source'{ + + }, + target = #'v1_0.target'{ + capabilities = {symbol, <<"capability-1">>}}}, <<>>}) -> + {Ch, [#'v1_0.attach'{name = Name, + handle = {uint, 99}, + role = true}]} + end, + Steps = [fun mock_server:recv_amqp_header_step/1, + fun mock_server:send_amqp_header_step/1, + mock_server:amqp_step(OpenStep), + mock_server:amqp_step(BeginStep), + mock_server:amqp_step(AttachStep)], + + ok = mock_server:set_steps(?config(mock_server, Config), Steps), + + Cfg = #{address => Hostname, port => Port, sasl => none, notify => self()}, + {ok, Connection} = amqp10_client:open_connection(Cfg), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + AttachArgs = #{name => <<"mock1-sender">>, + role => {sender, #{address => <<"test">>, + durable => none, + capabilities => <<"capability-1">>}}, + snd_settle_mode => mixed, + rcv_settle_mode => first}, + {ok, Sender} = amqp10_client:attach_link(Session, AttachArgs), + await_link(Sender, attached, attached_timeout), + Msg = amqp10_msg:new(<<"mock-tag">>, <<"banana">>, true), + {error, insufficient_credit} = amqp10_client:send_msg(Sender, Msg), + + ok = amqp10_client:end_session(Session), + ok = amqp10_client:close_connection(Connection), + ok. + + +set_sender_sync_capabilities(Config) -> + Hostname = ?config(tcp_hostname_amqp, Config), + Port = ?config(tcp_port_amqp, Config), + + OpenStep = fun({0 = Ch, #'v1_0.open'{}, _Pay}) -> + {Ch, [#'v1_0.open'{container_id = {utf8, <<"mock">>}}]} + end, + BeginStep = fun({1 = Ch, #'v1_0.begin'{}, _Pay}) -> + {Ch, [#'v1_0.begin'{remote_channel = {ushort, 1}, + next_outgoing_id = {uint, 1}, + incoming_window = {uint, 1000}, + outgoing_window = {uint, 1000}} + ]} + end, + AttachStep = fun({1 = Ch, #'v1_0.attach'{role = false, + name = Name, + source = #'v1_0.source'{ + + }, + target = #'v1_0.target'{ + capabilities = {array, symbol, [ + {symbol,<<"capability-1">>}, + {symbol,<<"capability-2">>} + ]} + }}, <<>>}) -> + {Ch, [#'v1_0.attach'{name = Name, + handle = {uint, 99}, + role = true}]} + end, + Steps = [fun mock_server:recv_amqp_header_step/1, + fun mock_server:send_amqp_header_step/1, + mock_server:amqp_step(OpenStep), + mock_server:amqp_step(BeginStep), + mock_server:amqp_step(AttachStep)], + + ok = mock_server:set_steps(?config(mock_server, Config), Steps), + + Cfg = #{address => Hostname, port => Port, sasl => none, notify => self()}, + {ok, Connection} = amqp10_client:open_connection(Cfg), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + AttachArgs = #{name => <<"mock1-sender">>, + role => {sender, #{address => <<"test">>, + durable => none, + capabilities => [<<"capability-1">>,<<"capability-2">>]}}, + snd_settle_mode => mixed, + rcv_settle_mode => first}, + {ok, Sender} = amqp10_client:attach_link(Session, AttachArgs), + await_link(Sender, attached, attached_timeout), + Msg = amqp10_msg:new(<<"mock-tag">>, <<"banana">>, true), + {error, insufficient_credit} = amqp10_client:send_msg(Sender, Msg), + + ok = amqp10_client:end_session(Session), + ok = amqp10_client:close_connection(Connection), + ok. + outgoing_heartbeat(Config) -> Hostname = ?config(rmq_hostname, Config), Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), diff --git a/deps/amqp10_client/test/system_SUITE_data/ibmmq_runner b/deps/amqp10_client/test/system_SUITE_data/ibmmq_runner new file mode 100755 index 000000000000..98c84f6d7a29 --- /dev/null +++ b/deps/amqp10_client/test/system_SUITE_data/ibmmq_runner @@ -0,0 +1,97 @@ +#!/bin/bash + +SCRIPT="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" + +set -u + +IMAGE=pivotalrabbitmq/ibm-mqadvanced-server-dev +IMAGE_TAG=9.3.5.1-amd64 + +kill_container_if_exist() { + if docker stop $1 &> /dev/null; then + docker rm $1 &> /dev/null + fi +} + +ensure_docker_image_exists() { + TAG=`docker images --filter reference=$IMAGE | grep $IMAGE_TAG` + if [ -z ${TAG+x} ] + then + echo "Docker image ${IMAGE}:${IMAGE_TAG} does not exist" + exit 1 + else + echo "Docker image ${IMAGE}:${IMAGE_TAG} ready" + fi +} + +wait_for_message() { + attemps_left=10 + while ! docker logs $1 2>&1 | grep -q "$2"; + do + sleep 5 + print "Waiting 5sec for $1 to start ($attemps_left attempts left )..." + ((attemps_left--)) + if [[ "$attemps_left" -lt 1 ]]; then + print "Timed out waiting" + save_container_log $1 + exit 1 + fi + done +} +declare -i PADDING_LEVEL=0 + +print() { + tabbing="" + if [[ $PADDING_LEVEL -gt 0 ]]; then + for i in $(seq $PADDING_LEVEL); do + tabbing="$tabbing\t" + done + fi + echo -e "$tabbing$1" +} + +invoke_start(){ + kill_container_if_exist ibmmq + ensure_docker_image_exists + + docker run --name ibmmq \ + --env LICENSE=accept \ + --env MQ_QMGR_NAME=QM1 \ + --env MQ_APP_PASSWORD=passw0rd \ + --env MQ_ADMIN_PASSWORD=passw0rd \ + --env LICENSE=accept \ + --publish 1414:1414 \ + --publish 9443:9443 \ + --publish 5672:5672 \ + --detach \ + $IMAGE:$IMAGE_TAG + wait_for_message ibmmq "The listener 'SYSTEM.LISTENER.TCP.1' has started." + wait_for_message ibmmq "Successfully loaded default keystore" + + docker exec ibmmq bash -c 'echo "SET CHLAUTH(SYSTEM.DEF.AMQP) TYPE(ADDRESSMAP) ADDRESS(*) MCAUSER(app)" | /opt/mqm/bin/runmqsc QM1' + docker exec ibmmq bash -c 'echo "STOP SERVICE(SYSTEM.AMQP.SERVICE)" | /opt/mqm/bin/runmqsc QM1' + docker exec ibmmq bash -c 'echo "START SERVICE(SYSTEM.AMQP.SERVICE)" | /opt/mqm/bin/runmqsc QM1' + docker exec ibmmq bash -c 'echo "START CHANNEL(SYSTEM.DEF.AMQP)" | /opt/mqm/bin/runmqsc QM1' + wait_for_message ibmmq "The Server 'SYSTEM.AMQP.SERVICE' has started" + sleep 5 +} + +invoke_stop(){ + kill_container_if_exist ibmmq +} + +case "$1" in + version) + echo "IBM MQ ${IMAGE}:${IMAGE_TAG}" + ;; + build) + build_docker_image + ;; + start) + invoke_start + ;; + stop) + invoke_stop + exit $? + ;; +esac From 1dca32ff62b59489339f0f78a36ed702648fd1e4 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Thu, 18 Jul 2024 18:24:56 +0200 Subject: [PATCH 02/35] Minor cleanup --- deps/amqp10_client/test/system_SUITE.erl | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/deps/amqp10_client/test/system_SUITE.erl b/deps/amqp10_client/test/system_SUITE.erl index b3eb598b0843..452de1bfb624 100644 --- a/deps/amqp10_client/test/system_SUITE.erl +++ b/deps/amqp10_client/test/system_SUITE.erl @@ -208,8 +208,7 @@ open_close_connection(Config) -> {amqp10_event, {connection, Connection2, opened}} -> ct:log("connection opened"), ok after 5000 -> exit(connection_timeout) end, - ok = amqp10_client:close_connection(Connection2), - ct:log("Closed connection ."). + ok = amqp10_client:close_connection(Connection2). open_connection_plain_sasl(Config) -> Hostname = ?config(rmq_hostname, Config), @@ -425,7 +424,6 @@ roundtrip(OpenConf, Args, DoNotAssertMessageProperties) -> ok = amqp10_client:end_session(Session), ok = amqp10_client:close_connection(Connection), - % ct:pal(?LOW_IMPORTANCE, "roundtrip message Out: ~tp~nIn: ~tp~n", [OutMsg, Msg]), ActualProps = amqp10_msg:properties(OutMsg), [ ?assertEqual(V, maps:get(K, ActualProps)) || K := V <- Props, not lists:member(K, DoNotAssertMessageProperties)], From edbf33e5b398631259e9e4bbcebfddfbf5c51ec4 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Fri, 26 Jul 2024 15:49:17 +0200 Subject: [PATCH 03/35] Ignore sender/receiver non-binary capabilities --- .../src/amqp10_client_session.erl | 10 ++--- deps/amqp10_client/test/system_SUITE.erl | 42 ++++++++++++++++--- 2 files changed, 40 insertions(+), 12 deletions(-) diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index 32e743ecddc2..bf9725acc21a 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -772,13 +772,9 @@ filter_value_type({T, _} = V) when is_atom(T) -> translate_terminus_capabilities(Capabilities) when is_binary(Capabilities) -> {symbol, Capabilities}; translate_terminus_capabilities(CapabilitiesList) when is_list(CapabilitiesList) -> - {array, symbol, [filter_capability(V) || V <- CapabilitiesList]}. - -filter_capability(V) when is_binary(V) -> - {symbol, V}; -filter_capability(_) -> - %% Any other type is ignored - {}. + {array, symbol, [{symbol, V} || V <- CapabilitiesList, is_binary(V)]}; +translate_terminus_capabilities(_) -> + []. % https://people.apache.org/~rgodfrey/amqp-1.0/apache-filters.html translate_legacy_amqp_headers_binding(LegacyHeaders) -> diff --git a/deps/amqp10_client/test/system_SUITE.erl b/deps/amqp10_client/test/system_SUITE.erl index 452de1bfb624..de7121a4a5a3 100644 --- a/deps/amqp10_client/test/system_SUITE.erl +++ b/deps/amqp10_client/test/system_SUITE.erl @@ -35,7 +35,9 @@ groups() -> {activemq, [], shared()}, {ibmmq, [], [ open_close_connection, - basic_roundtrip_ibmmq + basic_roundtrip_ibmmq, + basic_roundtrip_with_several_capabilities_ibmmq, + basic_roundtrip_with_non_binary_capability_is_ignored ]}, {rabbitmq_strict, [], [ basic_roundtrip_tls, @@ -131,7 +133,7 @@ init_per_group(activemq, Config0) -> rabbit_ct_helpers:run_steps(Config, activemq_ct_helpers:setup_steps("activemq.xml")); -init_per_group(ibmmq, Config) -> +init_per_group(ibmmq, Config) -> ct:log("Found arch: ~p", [erlang:system_info(system_architecture)]), case string:find(erlang:system_info(system_architecture), "x86_64") of nomatch -> {skip, no_arm64_docker_image_for_ibmmq}; @@ -274,7 +276,7 @@ open_connection_plain_sasl_failure(Config) -> ok = amqp10_client:close_connection(Connection). basic_roundtrip(Config) -> - application:start(sasl), + application:start(sasl), Hostname = ?config(rmq_hostname, Config), Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), OpenConf = #{address => Hostname, port => Port, sasl => ?config(sasl, Config)}, @@ -362,6 +364,33 @@ basic_roundtrip_ibmmq(Config) -> {message_annotations, #{}} ], [creation_time]). + +basic_roundtrip_with_several_capabilities_ibmmq(Config) -> + application:start(sasl), + Hostname = ?config(rmq_hostname, Config), + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), + OpenConf = #{address => Hostname, port => Port, sasl => ?config(sasl, Config)}, + roundtrip(OpenConf, [ + {body, <<"banana">>}, + {destination, <<"DEV.QUEUE.3">>}, + {sender_capabilities, [<<"queue">>, <<"tx-capabilities">>, dummy]}, + {receiver_capabilities, <<"queue">>}, + {message_annotations, #{}} + ], [creation_time]). + +basic_roundtrip_with_non_binary_capability_is_ignored(Config) -> + application:start(sasl), + Hostname = ?config(rmq_hostname, Config), + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), + OpenConf = #{address => Hostname, port => Port, sasl => ?config(sasl, Config)}, + roundtrip(OpenConf, [ + {body, <<"banana">>}, + {destination, <<"DEV.QUEUE.3">>}, + {sender_capabilities, [<<"queue">>, dummy]}, + {receiver_capabilities, <<"queue">>}, + {message_annotations, #{}} + ], [creation_time]). + roundtrip(OpenConf) -> roundtrip(OpenConf, [], []). @@ -425,9 +454,9 @@ roundtrip(OpenConf, Args, DoNotAssertMessageProperties) -> ok = amqp10_client:close_connection(Connection), ActualProps = amqp10_msg:properties(OutMsg), - [ ?assertEqual(V, maps:get(K, ActualProps)) || K := V <- Props, + [ ?assertEqual(V, maps:get(K, ActualProps)) || K := V <- Props, not lists:member(K, DoNotAssertMessageProperties)], - + ?assertEqual(#{<<"a_key">> => <<"a_value">>}, amqp10_msg:application_properties(OutMsg)), ActualMessageAnnotations = amqp10_msg:message_annotations(OutMsg), [ ?assertEqual(V, maps:get(K, ActualMessageAnnotations)) || K := V <- MessageAnnotations], @@ -1169,6 +1198,7 @@ incoming_heartbeat(Config) -> %%% await_link(Who, What, Err) -> + ct:log("await_link ..."), receive {amqp10_event, {link, Who0, What0}} when Who0 =:= Who andalso @@ -1176,8 +1206,10 @@ await_link(Who, What, Err) -> ok; {amqp10_event, {link, Who0, {detached, Why}}} when Who0 =:= Who -> + ct:log("await_link fail ..."), ct:fail(Why) after 5000 -> + ct:log("await_link fail ~p", [Err]), flush(), ct:fail(Err) end. From 619ab0009858bef1bcbba3163c03df23c4a056bc Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Fri, 26 Jul 2024 16:52:52 +0200 Subject: [PATCH 04/35] interim change ct:log to be able to log what capabilites are sent on each test case scenario it looks like some times the test case passes because the non-binary value `dummy` is properly skipped but sometimes the test cases fails because it is not skipping it . i.e. is_binary(dummy) returns true. --- deps/amqp10_client/src/amqp10_client_session.erl | 2 ++ deps/amqp10_client/test/system_SUITE.erl | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index bf9725acc21a..9c7402d3f3a6 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -710,6 +710,7 @@ make_source(#{role := {receiver, #{address := Address} = Source, _Pid}, filter : Durable = translate_terminus_durability(maps:get(durable, Source, none)), TranslatedFilter = translate_filters(Filter), Capabilities = translate_terminus_capabilities(maps:get(capabilities, Source, [])), + ct:log("make_source capabilities : ~p", [Capabilities]), #'v1_0.source'{address = {utf8, Address}, durable = {uint, Durable}, filter = TranslatedFilter, @@ -720,6 +721,7 @@ make_target(#{role := {receiver, _Source, _Pid}}) -> make_target(#{role := {sender, #{address := Address} = Target}}) -> Durable = translate_terminus_durability(maps:get(durable, Target, none)), Capabilities = translate_terminus_capabilities(maps:get(capabilities, Target, [])), + ct:log("make_target capabilities : ~p", [Capabilities]), TargetAddr = case is_binary(Address) of true -> {utf8, Address}; false -> Address diff --git a/deps/amqp10_client/test/system_SUITE.erl b/deps/amqp10_client/test/system_SUITE.erl index de7121a4a5a3..ec708850a1e8 100644 --- a/deps/amqp10_client/test/system_SUITE.erl +++ b/deps/amqp10_client/test/system_SUITE.erl @@ -378,7 +378,7 @@ basic_roundtrip_with_several_capabilities_ibmmq(Config) -> {message_annotations, #{}} ], [creation_time]). -basic_roundtrip_with_non_binary_capability_is_ignored(Config) -> +basic_roundtrip_with_non_binary_capability_is_ignored(Config) -> application:start(sasl), Hostname = ?config(rmq_hostname, Config), Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), From 800085ffd8e0db821f04a0fc891fa4948c2bd417 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Fri, 26 Jul 2024 17:02:53 +0200 Subject: [PATCH 05/35] Use logger:debug to log capabilities --- deps/amqp10_client/src/amqp10_client_session.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index 9c7402d3f3a6..9752846b052e 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -710,7 +710,7 @@ make_source(#{role := {receiver, #{address := Address} = Source, _Pid}, filter : Durable = translate_terminus_durability(maps:get(durable, Source, none)), TranslatedFilter = translate_filters(Filter), Capabilities = translate_terminus_capabilities(maps:get(capabilities, Source, [])), - ct:log("make_source capabilities : ~p", [Capabilities]), + logger:debug("make_source capabilities : ~p", [Capabilities]), #'v1_0.source'{address = {utf8, Address}, durable = {uint, Durable}, filter = TranslatedFilter, @@ -721,7 +721,7 @@ make_target(#{role := {receiver, _Source, _Pid}}) -> make_target(#{role := {sender, #{address := Address} = Target}}) -> Durable = translate_terminus_durability(maps:get(durable, Target, none)), Capabilities = translate_terminus_capabilities(maps:get(capabilities, Target, [])), - ct:log("make_target capabilities : ~p", [Capabilities]), + logger:debug("make_target capabilities : ~p", [Capabilities]), TargetAddr = case is_binary(Address) of true -> {utf8, Address}; false -> Address From c3ac525615d468ea2777a527798869028faeec81 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Fri, 26 Jul 2024 17:11:46 +0200 Subject: [PATCH 06/35] Log with warning level instead --- deps/amqp10_client/src/amqp10_client_session.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index 9752846b052e..9a09a60fdb8b 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -710,7 +710,7 @@ make_source(#{role := {receiver, #{address := Address} = Source, _Pid}, filter : Durable = translate_terminus_durability(maps:get(durable, Source, none)), TranslatedFilter = translate_filters(Filter), Capabilities = translate_terminus_capabilities(maps:get(capabilities, Source, [])), - logger:debug("make_source capabilities : ~p", [Capabilities]), + logger:warning("make_source capabilities : ~p", [Capabilities]), #'v1_0.source'{address = {utf8, Address}, durable = {uint, Durable}, filter = TranslatedFilter, @@ -721,7 +721,7 @@ make_target(#{role := {receiver, _Source, _Pid}}) -> make_target(#{role := {sender, #{address := Address} = Target}}) -> Durable = translate_terminus_durability(maps:get(durable, Target, none)), Capabilities = translate_terminus_capabilities(maps:get(capabilities, Target, [])), - logger:debug("make_target capabilities : ~p", [Capabilities]), + logger:warning("make_target capabilities : ~p", [Capabilities]), TargetAddr = case is_binary(Address) of true -> {utf8, Address}; false -> Address From 1cf8b35e0a58a4b4e8c1515bc9b5f1bb83903c93 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Mon, 29 Jul 2024 12:28:04 +0200 Subject: [PATCH 07/35] Capture error and log it To help troublehshoot an issue originated from translate_terminus_capabilities --- .../src/amqp10_client_session.erl | 34 ++++++++++++------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index 9a09a60fdb8b..069dd90bff68 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -709,26 +709,34 @@ make_source(#{role := {sender, _}}) -> make_source(#{role := {receiver, #{address := Address} = Source, _Pid}, filter := Filter}) -> Durable = translate_terminus_durability(maps:get(durable, Source, none)), TranslatedFilter = translate_filters(Filter), - Capabilities = translate_terminus_capabilities(maps:get(capabilities, Source, [])), - logger:warning("make_source capabilities : ~p", [Capabilities]), - #'v1_0.source'{address = {utf8, Address}, + try translate_terminus_capabilities(maps:get(capabilities, Source, [])) of + Capabilities -> + logger:warning("make_source capabilities : ~p", [Capabilities]), + #'v1_0.source'{address = {utf8, Address}, durable = {uint, Durable}, filter = TranslatedFilter, - capabilities = Capabilities}. + capabilities = Capabilities} + catch + throw:Err -> {error, Err} + end. make_target(#{role := {receiver, _Source, _Pid}}) -> #'v1_0.target'{}; make_target(#{role := {sender, #{address := Address} = Target}}) -> Durable = translate_terminus_durability(maps:get(durable, Target, none)), - Capabilities = translate_terminus_capabilities(maps:get(capabilities, Target, [])), - logger:warning("make_target capabilities : ~p", [Capabilities]), - TargetAddr = case is_binary(Address) of - true -> {utf8, Address}; - false -> Address - end, - #'v1_0.target'{address = TargetAddr, - durable = {uint, Durable}, - capabilities = Capabilities}. + try translate_terminus_capabilities(maps:get(capabilities, Target, [])) of + Capabilities -> + logger:warning("make_target capabilities : ~p", [Capabilities]), + TargetAddr = case is_binary(Address) of + true -> {utf8, Address}; + false -> Address + end, + #'v1_0.target'{address = TargetAddr, + durable = {uint, Durable}, + capabilities = Capabilities} + catch + throw:Err -> {error, Err} + end. max_message_size(#{max_message_size := Size}) when is_integer(Size) andalso From f990c3ccb8eef063a644d31d241ba2c0acd53316 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Mon, 29 Jul 2024 12:36:58 +0200 Subject: [PATCH 08/35] Add missing log statement --- deps/amqp10_client/src/amqp10_client_session.erl | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index 069dd90bff68..1e25b0e80c0c 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -716,8 +716,10 @@ make_source(#{role := {receiver, #{address := Address} = Source, _Pid}, filter : durable = {uint, Durable}, filter = TranslatedFilter, capabilities = Capabilities} - catch - throw:Err -> {error, Err} + catch + throw:Err -> + logger:warning("make_source failed : ~p", [Err]), + {error, Err} end. make_target(#{role := {receiver, _Source, _Pid}}) -> @@ -735,7 +737,9 @@ make_target(#{role := {sender, #{address := Address} = Target}}) -> durable = {uint, Durable}, capabilities = Capabilities} catch - throw:Err -> {error, Err} + throw:Err -> + logger:warning("make_target failed : ~p", [Err]), + {error, Err} end. max_message_size(#{max_message_size := Size}) From 53dc39d0be2e95ed20f4cb75950c0f983ff6e432 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Mon, 29 Jul 2024 12:47:26 +0200 Subject: [PATCH 09/35] Change message --- deps/amqp10_client/src/amqp10_client_session.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index 1e25b0e80c0c..a8fcee3a8b83 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -718,7 +718,7 @@ make_source(#{role := {receiver, #{address := Address} = Source, _Pid}, filter : capabilities = Capabilities} catch throw:Err -> - logger:warning("make_source failed : ~p", [Err]), + logger:warning("make_source failed due to ~p", [Err]), {error, Err} end. @@ -738,7 +738,7 @@ make_target(#{role := {sender, #{address := Address} = Target}}) -> capabilities = Capabilities} catch throw:Err -> - logger:warning("make_target failed : ~p", [Err]), + logger:warning("make_target failed due to ~p", [Err]), {error, Err} end. From 09878778111859ef01ccaea1d1e7709b8b479097 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Mon, 29 Jul 2024 12:59:50 +0200 Subject: [PATCH 10/35] Add extra logging on attach --- deps/amqp10_client/src/amqp10_client_session.erl | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index a8fcee3a8b83..987287b93ef9 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -833,8 +833,11 @@ send_attach(Send, #{name := Name, role := RoleTuple} = Args, {FromPid, _}, #state{next_link_handle = OutHandle0, links = Links, link_index = LinkIndex} = State) -> + logger:warning("make_source ..."), Source = make_source(Args), + logger:warning("make_target ..."), Target = make_target(Args), + logger:warning("make properties ..."), Properties = amqp10_client_types:make_properties(Args), {LinkTarget, InitialDeliveryCount, MaxMessageSize} = @@ -866,7 +869,9 @@ send_attach(Send, #{name := Name, role := RoleTuple} = Args, {FromPid, _}, target = Target, max_message_size = MaxMessageSize}, + logger:warning("sending ..."), ok = Send(Attach, State), + logger:warning("sent ..."), Ref = make_link_ref(Role, self(), OutHandle), Link = #link{name = Name, From 04ef8cf75841b1a558f6a1a12d723a93a66fa8e1 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Mon, 29 Jul 2024 13:10:22 +0200 Subject: [PATCH 11/35] Add extra logging --- deps/amqp10_client/src/amqp10_client_session.erl | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index 987287b93ef9..2b329d0528e1 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -191,7 +191,10 @@ begin_sync(Connection, Timeout) -> -spec attach(pid(), attach_args()) -> {ok, link_ref()}. attach(Session, Args) -> - gen_statem:call(Session, {attach, Args}, ?TIMEOUT). + logger:warning("amqp10_session: call_session attach ~p", [Args]), + Ret = gen_statem:call(Session, {attach, Args}, ?TIMEOUT), + logger:warning("amqp10_session: call_session attach returned ~p", [Ret]), + Ret. -spec detach(pid(), output_handle()) -> ok | {error, link_not_found | half_attached}. detach(Session, Handle) -> @@ -540,7 +543,11 @@ mapped({call, From}, {keep_state, State, {reply, From, Res}}; mapped({call, From}, {attach, Attach}, State) -> + logger:warning("amqp10_session: call attach ~p", + [Attach]), {State1, LinkRef} = send_attach(fun send/2, Attach, From, State), + logger:warning("amqp10_session: returned call attach ~p", + [Attach]), {keep_state, State1, {reply, From, {ok, LinkRef}}}; mapped({call, From}, Msg, State) -> From 2e1b8f74ff7a21341eac0a04ca12dea597475107 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Mon, 29 Jul 2024 14:05:12 +0200 Subject: [PATCH 12/35] Add extra logging --- deps/amqp10_client/src/amqp10_client_session.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index 2b329d0528e1..b09aa54ef959 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -262,7 +262,7 @@ begin_sent(cast, #'v1_0.begin'{remote_channel = {ushort, RemoteChannel}, incoming_window = {uint, InWindow}, outgoing_window = {uint, OutWindow}} = Begin, #state{early_attach_requests = EARs} = State) -> - + logger:warning("begin_sent send-attach ~tp", [Attach]), State1 = State#state{remote_channel = RemoteChannel}, State2 = lists:foldr(fun({From, Attach}, S) -> {S2, H} = send_attach(fun send/2, Attach, From, S), @@ -543,10 +543,10 @@ mapped({call, From}, {keep_state, State, {reply, From, Res}}; mapped({call, From}, {attach, Attach}, State) -> - logger:warning("amqp10_session: call attach ~p", + logger:warning("amqp10_session: mapped send_attach ~p", [Attach]), {State1, LinkRef} = send_attach(fun send/2, Attach, From, State), - logger:warning("amqp10_session: returned call attach ~p", + logger:warning("amqp10_session: mapped returned call attach ~p", [Attach]), {keep_state, State1, {reply, From, {ok, LinkRef}}}; From 09f95b369b45bc9203a06eb7ad7fc86cf93e0991 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Mon, 29 Jul 2024 14:09:12 +0200 Subject: [PATCH 13/35] Fix issue --- deps/amqp10_client/src/amqp10_client_session.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index b09aa54ef959..f38c7b7ba146 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -262,9 +262,10 @@ begin_sent(cast, #'v1_0.begin'{remote_channel = {ushort, RemoteChannel}, incoming_window = {uint, InWindow}, outgoing_window = {uint, OutWindow}} = Begin, #state{early_attach_requests = EARs} = State) -> - logger:warning("begin_sent send-attach ~tp", [Attach]), + logger:warning("begin_sent send-attach "), State1 = State#state{remote_channel = RemoteChannel}, State2 = lists:foldr(fun({From, Attach}, S) -> + logger:warning("begin_sent send-attach ~tp", [Attach]), {S2, H} = send_attach(fun send/2, Attach, From, S), gen_statem:reply(From, {ok, H}), S2 From 19e18ce77f9d54b212817c1b46d59ea187b91708 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Mon, 29 Jul 2024 15:14:49 +0200 Subject: [PATCH 14/35] Modified log statement --- deps/amqp10_client/src/amqp10_client_session.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index f38c7b7ba146..69a005177505 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -262,7 +262,7 @@ begin_sent(cast, #'v1_0.begin'{remote_channel = {ushort, RemoteChannel}, incoming_window = {uint, InWindow}, outgoing_window = {uint, OutWindow}} = Begin, #state{early_attach_requests = EARs} = State) -> - logger:warning("begin_sent send-attach "), + logger:warning("begin_sent send-attach "), State1 = State#state{remote_channel = RemoteChannel}, State2 = lists:foldr(fun({From, Attach}, S) -> logger:warning("begin_sent send-attach ~tp", [Attach]), From b390138dea7e02a0759f22635ee2c9d4d8681ee4 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Mon, 29 Jul 2024 15:40:00 +0200 Subject: [PATCH 15/35] REvert last change --- deps/amqp10_client/src/amqp10_client_session.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index 69a005177505..f38c7b7ba146 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -262,7 +262,7 @@ begin_sent(cast, #'v1_0.begin'{remote_channel = {ushort, RemoteChannel}, incoming_window = {uint, InWindow}, outgoing_window = {uint, OutWindow}} = Begin, #state{early_attach_requests = EARs} = State) -> - logger:warning("begin_sent send-attach "), + logger:warning("begin_sent send-attach "), State1 = State#state{remote_channel = RemoteChannel}, State2 = lists:foldr(fun({From, Attach}, S) -> logger:warning("begin_sent send-attach ~tp", [Attach]), From 6bccbd66271dc0e0a152924f08690b759a747154 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Mon, 29 Jul 2024 16:20:35 +0200 Subject: [PATCH 16/35] Log return from send_attach --- deps/amqp10_client/src/amqp10_client_session.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index f38c7b7ba146..bfcc65df127b 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -267,6 +267,7 @@ begin_sent(cast, #'v1_0.begin'{remote_channel = {ushort, RemoteChannel}, State2 = lists:foldr(fun({From, Attach}, S) -> logger:warning("begin_sent send-attach ~tp", [Attach]), {S2, H} = send_attach(fun send/2, Attach, From, S), + logger:warning("begin_sent send-attach ~tp returning ~p", [Attach, H]), gen_statem:reply(From, {ok, H}), S2 end, State1, EARs), From bb5045f397be5172c00369f5bd613f7bf66d9e25 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Mon, 29 Jul 2024 16:32:36 +0200 Subject: [PATCH 17/35] Test capabilities with activemq --- deps/amqp10_client/test/system_SUITE.erl | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/deps/amqp10_client/test/system_SUITE.erl b/deps/amqp10_client/test/system_SUITE.erl index ec708850a1e8..7ad1d15a0fc9 100644 --- a/deps/amqp10_client/test/system_SUITE.erl +++ b/deps/amqp10_client/test/system_SUITE.erl @@ -67,6 +67,10 @@ groups() -> ]} ]. +test() -> + [ + basic_roundtrip_ibmmq + ]. shared() -> [ open_close_connection, From 351038a5dd28acd2083335ff964e046439bd100d Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Mon, 29 Jul 2024 16:44:35 +0200 Subject: [PATCH 18/35] Log around sending frame --- deps/amqp10_client/src/amqp10_client_session.erl | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index bfcc65df127b..c897542c9b09 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -249,6 +249,7 @@ init([FromPid, Channel, Reader, ConnConfig]) -> {ok, unmapped, State}. unmapped(cast, {socket_ready, Socket}, State) -> + logger:warning("unmapped socket_ready calling send_begin"), State1 = State#state{socket = Socket}, ok = send_begin(State1), {next_state, begin_sent, State1}; @@ -591,8 +592,12 @@ send_begin(#state{socket = Socket, Begin = #'v1_0.begin'{next_outgoing_id = uint(NextOutId), incoming_window = uint(InWin), outgoing_window = ?UINT_OUTGOING_WINDOW}, + logger:warning("send_begin encode_frame ..."), Frame = encode_frame(Begin, State), - socket_send(Socket, Frame). + logger:warning("send_begin encoded frame ~p", [Frame]), + Ret = socket_send(Socket, Frame), + logger:warning("socket_send ~p", [Ret]), + Ret. send_end(State) -> send_end(State, undefined). From 572579709259b2318d5da668ece40c15a36199df Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Mon, 29 Jul 2024 17:07:25 +0200 Subject: [PATCH 19/35] Log eveny operation related to stop event --- deps/amqp10_client/src/amqp10_client_connection.erl | 9 ++++++++- deps/amqp10_client/src/amqp10_client_frame_reader.erl | 4 ++++ deps/amqp10_client/src/amqp10_client_session.erl | 3 ++- 3 files changed, 14 insertions(+), 2 deletions(-) diff --git a/deps/amqp10_client/src/amqp10_client_connection.erl b/deps/amqp10_client/src/amqp10_client_connection.erl index 8fbcb22f3d1b..5fabb6f36ad3 100644 --- a/deps/amqp10_client/src/amqp10_client_connection.erl +++ b/deps/amqp10_client/src/amqp10_client_connection.erl @@ -241,7 +241,9 @@ sasl_init_sent({call, From}, begin_session, hdr_sent(_EvtType, {protocol_header_received, 0, 1, 0, 0}, State) -> case send_open(State) of ok -> {next_state, open_sent, State}; - Error -> {stop, Error, State} + Error -> + logger:warning("client_connection hdr_sent ~p", [Error]), + {stop, Error, State} end; hdr_sent(_EvtType, {protocol_header_received, Protocol, Maj, Min, Rev}, State) -> @@ -287,6 +289,7 @@ open_sent({call, From}, begin_session, {keep_state, State1}; open_sent(info, {'DOWN', MRef, _, _, _}, #state{reader_m_ref = MRef}) -> + logger:warning("client_connection open_sent info(Down reader_down"), {stop, {shutdown, reader_down}}. opened(_EvtType, heartbeat, State = #state{idle_time_out = T}) -> @@ -313,6 +316,7 @@ opened({call, From}, begin_session, State) -> opened(info, {'DOWN', MRef, _, _, _Info}, State = #state{reader_m_ref = MRef, config = Config}) -> %% reader has gone down and we are not already shutting down + logger:warning("client_connection opened(info DOWN"), ok = notify_closed(Config, shutdown), {stop, normal, State}; opened(_EvtType, Frame, State) -> @@ -328,11 +332,13 @@ close_sent(_EvtType, {'EXIT', _Pid, shutdown}, State) -> close_sent(_EvtType, {'DOWN', _Ref, process, ReaderPid, _}, #state{reader = ReaderPid} = State) -> %% if the reader exits we probably wont receive a close frame + logger:warning("client_connection close_sent( DOWN"), {stop, normal, State}; close_sent(_EvtType, #'v1_0.close'{} = Close, State = #state{config = Config}) -> ok = notify_closed(Config, Close), %% TODO: we should probably set up a timer before this to ensure %% we close down event if no reply is received + logger:warning("client_connection close_sent( v1_0.close"), {stop, normal, State}. set_other_procs0(OtherProcs, State) -> @@ -450,6 +456,7 @@ send_close(#state{socket = Socket}, _Reason) -> ok; _ -> ok end, + logger:warning("client_connetion send_close Ret: ~p", [Ret]), Ret. send_sasl_init(State, anon) -> diff --git a/deps/amqp10_client/src/amqp10_client_frame_reader.erl b/deps/amqp10_client/src/amqp10_client_frame_reader.erl index 364748b16c85..fe314092e3ca 100644 --- a/deps/amqp10_client/src/amqp10_client_frame_reader.erl +++ b/deps/amqp10_client/src/amqp10_client_frame_reader.erl @@ -152,6 +152,7 @@ handle_event(cast, {unregister_session, _Session, OutgoingChannel, IncomingChann incoming_channels = IncomingChannels1}, {keep_state, State1}; handle_event(cast, close, _StateName, State = #state{socket = Socket}) -> + logger:warning("frame_reader handle_event cast close"), _ = close_socket(Socket), {stop, normal, State#state{socket = undefined}}; @@ -167,11 +168,13 @@ handle_event(info, {Tcp, _, Packet}, StateName, #state{buffer = Buffer} = State) set_active_once(NewState), {next_state, NextState, NewState#state{buffer = Remaining}}; {error, Reason, NewState} -> + logger:warning("frame_reader info handle_input error ~p", [Reason]), {stop, Reason, NewState} end; handle_event(info, {TcpError, _, Reason}, StateName, State) when TcpError == tcp_error orelse TcpError == ssl_error -> + logger:warning("frame_reader handle_event info TcpError: ~p", [TcpError]), logger:warning("AMQP 1.0 connection socket errored, connection state: '~ts', reason: '~tp'", [StateName, Reason]), State1 = State#state{socket = undefined, @@ -180,6 +183,7 @@ handle_event(info, {TcpError, _, Reason}, StateName, State) {stop, {error, Reason}, State1}; handle_event(info, {TcpClosed, _}, StateName, State) when TcpClosed == tcp_closed orelse TcpClosed == ssl_closed -> + logger:warning("frame_reader handle_event info TcpClosed: ~p", [TcpClosed]), logger:warning("AMQP 1.0 connection socket was closed, connection state: '~ts'", [StateName]), State1 = State#state{socket = undefined, diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index c897542c9b09..9ed48af27c50 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -1218,7 +1218,8 @@ amqp10_session_event(Evt) -> socket_send(Sock, Data) -> case socket_send0(Sock, Data) of ok -> ok; - {error, _Reason} -> + {error, Reason} -> + logger:warning("socket_send ~p", [Reason]), throw({stop, normal}) end. From 82c1e1fc352b11be05e66cc2666f58100e25a5dd Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Mon, 29 Jul 2024 17:29:29 +0200 Subject: [PATCH 20/35] Test capabilities with activemq and ibmmq --- deps/amqp10_client/test/system_SUITE.erl | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/deps/amqp10_client/test/system_SUITE.erl b/deps/amqp10_client/test/system_SUITE.erl index 7ad1d15a0fc9..8384dbd5377d 100644 --- a/deps/amqp10_client/test/system_SUITE.erl +++ b/deps/amqp10_client/test/system_SUITE.erl @@ -67,14 +67,11 @@ groups() -> ]} ]. -test() -> - [ - basic_roundtrip_ibmmq - ]. shared() -> [ open_close_connection, basic_roundtrip, + basic_roundtrip_ibmmq, early_transfer, split_transfer, transfer_unsettled, From 92d390f003fb31487e31e11f5166e19a5888d1fc Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Tue, 30 Jul 2024 13:00:20 +0200 Subject: [PATCH 21/35] Sleep for 500msec before considering ibmmq running --- .../src/amqp10_client_session.erl | 15 +++++------ deps/amqp10_client/test/ibmmq_ct_helpers.erl | 3 ++- deps/amqp10_client/test/system_SUITE.erl | 26 +++++-------------- 3 files changed, 15 insertions(+), 29 deletions(-) diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index 9ed48af27c50..12f90af6d4bd 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -255,6 +255,7 @@ unmapped(cast, {socket_ready, Socket}, State) -> {next_state, begin_sent, State1}; unmapped({call, From}, {attach, Attach}, #state{early_attach_requests = EARs} = State) -> + logger:warning("on unmapped state, received call attach. Storing it in early_attach_requests"), {keep_state, State#state{early_attach_requests = [{From, Attach} | EARs]}}. @@ -263,12 +264,13 @@ begin_sent(cast, #'v1_0.begin'{remote_channel = {ushort, RemoteChannel}, incoming_window = {uint, InWindow}, outgoing_window = {uint, OutWindow}} = Begin, #state{early_attach_requests = EARs} = State) -> - logger:warning("begin_sent send-attach "), + logger:warning("on state begin_sent, received v1_0.beging with remote channlel : ~p", [RemoteChannel]), State1 = State#state{remote_channel = RemoteChannel}, + logger:warning("sending early attach requests ~p", [EARs]), State2 = lists:foldr(fun({From, Attach}, S) -> - logger:warning("begin_sent send-attach ~tp", [Attach]), + logger:warning("send early attach request ~tp", [Attach]), {S2, H} = send_attach(fun send/2, Attach, From, S), - logger:warning("begin_sent send-attach ~tp returning ~p", [Attach, H]), + logger:warning("sent attach request ~p with result ~p", [Attach, H]), gen_statem:reply(From, {ok, H}), S2 end, State1, EARs), @@ -313,7 +315,7 @@ mapped(cast, #'v1_0.attach'{name = {utf8, Name}, max_message_size = MaybeMaxMessageSize} = Attach, #state{links = Links, link_index = LinkIndex, link_handle_index = LHI} = State0) -> - + logger:warning("on mapped state, received v1_0.attach frame"), OurRoleBool = not PeerRoleBool, OurRole = boolean_to_role(OurRoleBool), LinkIndexKey = {OurRole, Name}, @@ -546,11 +548,8 @@ mapped({call, From}, {keep_state, State, {reply, From, Res}}; mapped({call, From}, {attach, Attach}, State) -> - logger:warning("amqp10_session: mapped send_attach ~p", - [Attach]), + logger:warning("on mapped state, received call to attach ~p", [Attach]), {State1, LinkRef} = send_attach(fun send/2, Attach, From, State), - logger:warning("amqp10_session: mapped returned call attach ~p", - [Attach]), {keep_state, State1, {reply, From, {ok, LinkRef}}}; mapped({call, From}, Msg, State) -> diff --git a/deps/amqp10_client/test/ibmmq_ct_helpers.erl b/deps/amqp10_client/test/ibmmq_ct_helpers.erl index aad76b3ce3df..551c2c7b79ed 100644 --- a/deps/amqp10_client/test/ibmmq_ct_helpers.erl +++ b/deps/amqp10_client/test/ibmmq_ct_helpers.erl @@ -45,7 +45,8 @@ start_ibmmq_server(Config) -> wait_for_ibmmq_nodes(Config) -> Hostname = ?config(rmq_hostname, Config), Ports = rabbit_ct_broker_helpers:get_node_configs(Config, tcp_port_amqp), - wait_for_ibmmq_ports(Config, Hostname, Ports). + wait_for_ibmmq_ports(Config, Hostname, Ports), + timer:sleep(500). wait_for_ibmmq_ports(Config, Hostname, [Port | Rest]) -> ct:log("Waiting for IBM MQ on port ~b", [Port]), diff --git a/deps/amqp10_client/test/system_SUITE.erl b/deps/amqp10_client/test/system_SUITE.erl index 8384dbd5377d..33155c448330 100644 --- a/deps/amqp10_client/test/system_SUITE.erl +++ b/deps/amqp10_client/test/system_SUITE.erl @@ -35,9 +35,8 @@ groups() -> {activemq, [], shared()}, {ibmmq, [], [ open_close_connection, - basic_roundtrip_ibmmq, - basic_roundtrip_with_several_capabilities_ibmmq, - basic_roundtrip_with_non_binary_capability_is_ignored + basic_roundtrip_with_sender_and_receiver_capabilities, + basic_roundtrip_with_non_binary_capability ]}, {rabbitmq_strict, [], [ basic_roundtrip_tls, @@ -71,7 +70,8 @@ shared() -> [ open_close_connection, basic_roundtrip, - basic_roundtrip_ibmmq, + basic_roundtrip_with_sender_and_receiver_capabilities, + basic_roundtrip_with_non_binary_capability, early_transfer, split_transfer, transfer_unsettled, @@ -352,7 +352,7 @@ roundtrip_large_messages(Config) -> ok = roundtrip(OpenConf, [{body, Data8Mb}]), ok = roundtrip(OpenConf, [{body, Data64Mb}]). -basic_roundtrip_ibmmq(Config) -> +basic_roundtrip_with_sender_and_receiver_capabilities(Config) -> application:start(sasl), Hostname = ?config(rmq_hostname, Config), Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), @@ -365,21 +365,7 @@ basic_roundtrip_ibmmq(Config) -> {message_annotations, #{}} ], [creation_time]). - -basic_roundtrip_with_several_capabilities_ibmmq(Config) -> - application:start(sasl), - Hostname = ?config(rmq_hostname, Config), - Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), - OpenConf = #{address => Hostname, port => Port, sasl => ?config(sasl, Config)}, - roundtrip(OpenConf, [ - {body, <<"banana">>}, - {destination, <<"DEV.QUEUE.3">>}, - {sender_capabilities, [<<"queue">>, <<"tx-capabilities">>, dummy]}, - {receiver_capabilities, <<"queue">>}, - {message_annotations, #{}} - ], [creation_time]). - -basic_roundtrip_with_non_binary_capability_is_ignored(Config) -> +basic_roundtrip_with_non_binary_capability(Config) -> application:start(sasl), Hostname = ?config(rmq_hostname, Config), Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), From 8986c5cc591da164b834bff359eb35fd3e7c5125 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Tue, 30 Jul 2024 13:10:43 +0200 Subject: [PATCH 22/35] Fix issue in the ibmmq helper --- deps/amqp10_client/test/ibmmq_ct_helpers.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/deps/amqp10_client/test/ibmmq_ct_helpers.erl b/deps/amqp10_client/test/ibmmq_ct_helpers.erl index 551c2c7b79ed..fe568a840b41 100644 --- a/deps/amqp10_client/test/ibmmq_ct_helpers.erl +++ b/deps/amqp10_client/test/ibmmq_ct_helpers.erl @@ -46,7 +46,8 @@ wait_for_ibmmq_nodes(Config) -> Hostname = ?config(rmq_hostname, Config), Ports = rabbit_ct_broker_helpers:get_node_configs(Config, tcp_port_amqp), wait_for_ibmmq_ports(Config, Hostname, Ports), - timer:sleep(500). + timer:sleep(500), + Config. wait_for_ibmmq_ports(Config, Hostname, [Port | Rest]) -> ct:log("Waiting for IBM MQ on port ~b", [Port]), From dcbad6288ee19d5e452b5ea1e52e5f31249962b9 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Tue, 30 Jul 2024 13:26:13 +0200 Subject: [PATCH 23/35] Add extra logging around connection states --- deps/amqp10_client/src/amqp10_client_connection.erl | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/deps/amqp10_client/src/amqp10_client_connection.erl b/deps/amqp10_client/src/amqp10_client_connection.erl index 5fabb6f36ad3..5fae0c1d5a90 100644 --- a/deps/amqp10_client/src/amqp10_client_connection.erl +++ b/deps/amqp10_client/src/amqp10_client_connection.erl @@ -239,6 +239,7 @@ sasl_init_sent({call, From}, begin_session, {keep_state, State1}. hdr_sent(_EvtType, {protocol_header_received, 0, 1, 0, 0}, State) -> + logger:warning("hdr_sent received {protocol_header_received"), case send_open(State) of ok -> {next_state, open_sent, State}; Error -> @@ -252,6 +253,7 @@ hdr_sent(_EvtType, {protocol_header_received, Protocol, Maj, Min, {stop, normal, State}; hdr_sent({call, From}, begin_session, #state{pending_session_reqs = PendingSessionReqs} = State) -> + logger:warning("hdr_sent received call begin_session"), State1 = State#state{pending_session_reqs = [From | PendingSessionReqs]}, {keep_state, State1}. @@ -259,6 +261,7 @@ open_sent(_EvtType, #'v1_0.open'{max_frame_size = MaybeMaxFrameSize, idle_time_out = Timeout} = Open, #state{pending_session_reqs = PendingSessionReqs, config = Config} = State0) -> + logger:warning("open_sent received 'v1_0.open' with pending_session_reqs: ~p", [PendingSessionReqs]), State = case Timeout of undefined -> State0; {uint, T} when T > 0 -> @@ -285,14 +288,16 @@ open_sent(_EvtType, #'v1_0.open'{max_frame_size = MaybeMaxFrameSize, {next_state, opened, State2#state{pending_session_reqs = []}}; open_sent({call, From}, begin_session, #state{pending_session_reqs = PendingSessionReqs} = State) -> + logger:warning("open_sent received call begin_session with pending_session_reqs: ~p", [PendingSessionReqs]), State1 = State#state{pending_session_reqs = [From | PendingSessionReqs]}, {keep_state, State1}; open_sent(info, {'DOWN', MRef, _, _, _}, #state{reader_m_ref = MRef}) -> - logger:warning("client_connection open_sent info(Down reader_down"), + logger:warning("open_sent received 'DOWN"), {stop, {shutdown, reader_down}}. opened(_EvtType, heartbeat, State = #state{idle_time_out = T}) -> + logger:warning("opened received heartbeat"), ok = send_heartbeat(State), {ok, Tmr} = start_heartbeat_timer(T), {keep_state, State#state{heartbeat_timer = Tmr}}; @@ -311,12 +316,14 @@ opened(_EvtType, #'v1_0.close'{} = Close, State = #state{config = Config}) -> _ = send_close(State, none), {stop, normal, State}; opened({call, From}, begin_session, State) -> + logger:warning("opened received call being_session"), {Ret, State1} = handle_begin_session(From, State), + logger:warning("handle_begin_session ret: ~p", [Ret]), {keep_state, State1, [{reply, From, Ret}]}; opened(info, {'DOWN', MRef, _, _, _Info}, State = #state{reader_m_ref = MRef, config = Config}) -> %% reader has gone down and we are not already shutting down - logger:warning("client_connection opened(info DOWN"), + logger:warning("opened info received 'DOWN'"), ok = notify_closed(Config, shutdown), {stop, normal, State}; opened(_EvtType, Frame, State) -> From 2d55f8fa90ea9dc0aedc2391b0a8749bc1a379fe Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Tue, 30 Jul 2024 13:40:34 +0200 Subject: [PATCH 24/35] Try to send capability in an array without any non-binary --- deps/amqp10_client/test/system_SUITE.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/deps/amqp10_client/test/system_SUITE.erl b/deps/amqp10_client/test/system_SUITE.erl index 33155c448330..3fd7f661245f 100644 --- a/deps/amqp10_client/test/system_SUITE.erl +++ b/deps/amqp10_client/test/system_SUITE.erl @@ -373,7 +373,8 @@ basic_roundtrip_with_non_binary_capability(Config) -> roundtrip(OpenConf, [ {body, <<"banana">>}, {destination, <<"DEV.QUEUE.3">>}, - {sender_capabilities, [<<"queue">>, dummy]}, + %{sender_capabilities, [<<"queue">>, dummy]}, + {sender_capabilities, [<<"queue">>]}, {receiver_capabilities, <<"queue">>}, {message_annotations, #{}} ], [creation_time]). From e79c07b02c958f5f7948c92c0b9f0eaffcb17666 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Tue, 30 Jul 2024 13:59:58 +0200 Subject: [PATCH 25/35] Skip test which sends illegal capability --- deps/amqp10_client/test/system_SUITE.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/deps/amqp10_client/test/system_SUITE.erl b/deps/amqp10_client/test/system_SUITE.erl index 3fd7f661245f..63d463a13b7e 100644 --- a/deps/amqp10_client/test/system_SUITE.erl +++ b/deps/amqp10_client/test/system_SUITE.erl @@ -35,8 +35,8 @@ groups() -> {activemq, [], shared()}, {ibmmq, [], [ open_close_connection, - basic_roundtrip_with_sender_and_receiver_capabilities, - basic_roundtrip_with_non_binary_capability + basic_roundtrip_with_sender_and_receiver_capabilities + % basic_roundtrip_with_non_binary_capability ]}, {rabbitmq_strict, [], [ basic_roundtrip_tls, From 7779984198622f0f5131b5936f956412d3e26486 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Tue, 30 Jul 2024 15:32:32 +0200 Subject: [PATCH 26/35] Try a bigger delay before starting to use ibm mq --- deps/amqp10_client/test/ibmmq_ct_helpers.erl | 4 +--- deps/amqp10_client/test/system_SUITE_data/ibmmq_runner | 3 ++- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/deps/amqp10_client/test/ibmmq_ct_helpers.erl b/deps/amqp10_client/test/ibmmq_ct_helpers.erl index fe568a840b41..aad76b3ce3df 100644 --- a/deps/amqp10_client/test/ibmmq_ct_helpers.erl +++ b/deps/amqp10_client/test/ibmmq_ct_helpers.erl @@ -45,9 +45,7 @@ start_ibmmq_server(Config) -> wait_for_ibmmq_nodes(Config) -> Hostname = ?config(rmq_hostname, Config), Ports = rabbit_ct_broker_helpers:get_node_configs(Config, tcp_port_amqp), - wait_for_ibmmq_ports(Config, Hostname, Ports), - timer:sleep(500), - Config. + wait_for_ibmmq_ports(Config, Hostname, Ports). wait_for_ibmmq_ports(Config, Hostname, [Port | Rest]) -> ct:log("Waiting for IBM MQ on port ~b", [Port]), diff --git a/deps/amqp10_client/test/system_SUITE_data/ibmmq_runner b/deps/amqp10_client/test/system_SUITE_data/ibmmq_runner index 98c84f6d7a29..c0d97d6962d1 100755 --- a/deps/amqp10_client/test/system_SUITE_data/ibmmq_runner +++ b/deps/amqp10_client/test/system_SUITE_data/ibmmq_runner @@ -73,7 +73,8 @@ invoke_start(){ docker exec ibmmq bash -c 'echo "START SERVICE(SYSTEM.AMQP.SERVICE)" | /opt/mqm/bin/runmqsc QM1' docker exec ibmmq bash -c 'echo "START CHANNEL(SYSTEM.DEF.AMQP)" | /opt/mqm/bin/runmqsc QM1' wait_for_message ibmmq "The Server 'SYSTEM.AMQP.SERVICE' has started" - sleep 5 + sleep 10 + print "Waited 10 seconds for container to start" } invoke_stop(){ From 2cdbebd8a44b69e338f2138f55ad1e54b8c2ca83 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Tue, 30 Jul 2024 15:46:22 +0200 Subject: [PATCH 27/35] Test with non-binary capabilities --- deps/amqp10_client/test/system_SUITE.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/deps/amqp10_client/test/system_SUITE.erl b/deps/amqp10_client/test/system_SUITE.erl index 63d463a13b7e..3fd7f661245f 100644 --- a/deps/amqp10_client/test/system_SUITE.erl +++ b/deps/amqp10_client/test/system_SUITE.erl @@ -35,8 +35,8 @@ groups() -> {activemq, [], shared()}, {ibmmq, [], [ open_close_connection, - basic_roundtrip_with_sender_and_receiver_capabilities - % basic_roundtrip_with_non_binary_capability + basic_roundtrip_with_sender_and_receiver_capabilities, + basic_roundtrip_with_non_binary_capability ]}, {rabbitmq_strict, [], [ basic_roundtrip_tls, From b22807841fdc0c20da7acd4b1b50699b9189be96 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Tue, 30 Jul 2024 16:02:02 +0200 Subject: [PATCH 28/35] Wait for connection and session events --- deps/amqp10_client/test/system_SUITE.erl | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/deps/amqp10_client/test/system_SUITE.erl b/deps/amqp10_client/test/system_SUITE.erl index 3fd7f661245f..b67bb2c938cd 100644 --- a/deps/amqp10_client/test/system_SUITE.erl +++ b/deps/amqp10_client/test/system_SUITE.erl @@ -395,7 +395,15 @@ roundtrip(OpenConf, Args, DoNotAssertMessageProperties) -> <<"x_key">> => <<"x_value">>}), {ok, Connection} = amqp10_client:open_connection(OpenConf), + receive + {amqp10_event, {connection, Connection, opened}} -> ok + after 5000 -> exit(connection_timeout) + end, {ok, Session} = amqp10_client:begin_session(Connection), + receive + {amqp10_event, {session, Session, begun}} -> ok + after 5000 -> exit(connection_timeout) + end, SenderAttachArgs = #{name => <<"banana-sender">>, role => {sender, #{address => Destination, durable => unsettled_state, From 3dd1cf6521de67374e0ba2c20ccf79abcf8045d3 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Thu, 1 Aug 2024 13:37:29 +0200 Subject: [PATCH 29/35] Log connection related events added capture_logs function to the ibmmq_helper but not called until I figure out how to add extra logging. So far the only auth events captured are those originated from the admin ui --- .../src/amqp10_client_connection.erl | 21 ++++++++++++------- deps/amqp10_client/test/ibmmq_ct_helpers.erl | 13 +++++++++++- .../test/system_SUITE_data/ibmmq_runner | 8 ++++++- 3 files changed, 33 insertions(+), 9 deletions(-) diff --git a/deps/amqp10_client/src/amqp10_client_connection.erl b/deps/amqp10_client/src/amqp10_client_connection.erl index 5fae0c1d5a90..438ec4f4ed17 100644 --- a/deps/amqp10_client/src/amqp10_client_connection.erl +++ b/deps/amqp10_client/src/amqp10_client_connection.erl @@ -228,13 +228,17 @@ sasl_hdr_rcvds({call, From}, begin_session, sasl_init_sent(_EvtType, #'v1_0.sasl_outcome'{code = {ubyte, 0}}, #state{socket = Socket} = State) -> + logger:warning("sasl_init_sent "), ok = socket_send(Socket, ?AMQP_PROTOCOL_HEADER), + logger:warning("sasl_init_sent socket_send AMQP_PROTOCOL_HEADER ok"), {next_state, hdr_sent, State}; sasl_init_sent(_EvtType, #'v1_0.sasl_outcome'{code = {ubyte, C}}, #state{} = State) when C==1;C==2;C==3;C==4 -> + logger:warning("sasl_init_sent sasl_auth_failure"), {stop, sasl_auth_failure, State}; sasl_init_sent({call, From}, begin_session, #state{pending_session_reqs = PendingSessionReqs} = State) -> + logger:warning("sasl_init_sent call to begin_session"), State1 = State#state{pending_session_reqs = [From | PendingSessionReqs]}, {keep_state, State1}. @@ -243,7 +247,7 @@ hdr_sent(_EvtType, {protocol_header_received, 0, 1, 0, 0}, State) -> case send_open(State) of ok -> {next_state, open_sent, State}; Error -> - logger:warning("client_connection hdr_sent ~p", [Error]), + logger:warning("client_connection hdr_sent ~p", [Error]), {stop, Error, State} end; hdr_sent(_EvtType, {protocol_header_received, Protocol, Maj, Min, @@ -253,7 +257,7 @@ hdr_sent(_EvtType, {protocol_header_received, Protocol, Maj, Min, {stop, normal, State}; hdr_sent({call, From}, begin_session, #state{pending_session_reqs = PendingSessionReqs} = State) -> - logger:warning("hdr_sent received call begin_session"), + logger:warning("hdr_sent received call begin_session"), State1 = State#state{pending_session_reqs = [From | PendingSessionReqs]}, {keep_state, State1}. @@ -261,7 +265,7 @@ open_sent(_EvtType, #'v1_0.open'{max_frame_size = MaybeMaxFrameSize, idle_time_out = Timeout} = Open, #state{pending_session_reqs = PendingSessionReqs, config = Config} = State0) -> - logger:warning("open_sent received 'v1_0.open' with pending_session_reqs: ~p", [PendingSessionReqs]), + logger:warning("open_sent received 'v1_0.open' with pending_session_reqs: ~p", [PendingSessionReqs]), State = case Timeout of undefined -> State0; {uint, T} when T > 0 -> @@ -288,16 +292,16 @@ open_sent(_EvtType, #'v1_0.open'{max_frame_size = MaybeMaxFrameSize, {next_state, opened, State2#state{pending_session_reqs = []}}; open_sent({call, From}, begin_session, #state{pending_session_reqs = PendingSessionReqs} = State) -> - logger:warning("open_sent received call begin_session with pending_session_reqs: ~p", [PendingSessionReqs]), + logger:warning("open_sent received call begin_session with pending_session_reqs: ~p", [PendingSessionReqs]), State1 = State#state{pending_session_reqs = [From | PendingSessionReqs]}, {keep_state, State1}; open_sent(info, {'DOWN', MRef, _, _, _}, #state{reader_m_ref = MRef}) -> - logger:warning("open_sent received 'DOWN"), + logger:warning("open_sent received 'DOWN"), {stop, {shutdown, reader_down}}. opened(_EvtType, heartbeat, State = #state{idle_time_out = T}) -> - logger:warning("opened received heartbeat"), + logger:warning("opened received heartbeat"), ok = send_heartbeat(State), {ok, Tmr} = start_heartbeat_timer(T), {keep_state, State#state{heartbeat_timer = Tmr}}; @@ -482,7 +486,10 @@ send_sasl_init(State, {plain, User, Pass}) -> Response = <<0:8, User/binary, 0:8, Pass/binary>>, Frame = #'v1_0.sasl_init'{mechanism = {symbol, <<"PLAIN">>}, initial_response = {binary, Response}}, - send(Frame, 1, State). + logger:warning("send_sasl_init ~p ~p", [User, Pass]), + Ret = send(Frame, 1, State), + logger:warning("send_sasl_init ~p ~p Ret : ~p", [User, Pass, Ret]), + Ret. send(Record, FrameType, #state{socket = Socket}) -> Encoded = amqp10_framing:encode_bin(Record), diff --git a/deps/amqp10_client/test/ibmmq_ct_helpers.erl b/deps/amqp10_client/test/ibmmq_ct_helpers.erl index aad76b3ce3df..87b39c9d2f61 100644 --- a/deps/amqp10_client/test/ibmmq_ct_helpers.erl +++ b/deps/amqp10_client/test/ibmmq_ct_helpers.erl @@ -12,6 +12,7 @@ -export([setup_steps/0, teardown_steps/0, init_config/1, + capture_logs/1, start_ibmmq_server/1, stop_ibmmq_server/1]). @@ -30,7 +31,7 @@ init_config(Config) -> rabbit_ct_helpers:set_config(Config, [ {rmq_nodes, [NodeConfig]}, {rmq_hostname, "localhost"}, {tcp_hostname_amqp, "localhost"}, - {sasl, {plain, <<"app">>, <<"passw0rd">>}} ]). + {sasl, {plain, <<"app">>, <<"passw10rd">>}} ]). start_ibmmq_server(Config) -> IBMmqCmd = filename:join([?config(data_dir, Config), "ibmmq_runner"]), @@ -78,6 +79,16 @@ wait_for_ibmmq_port(Hostname, Port, Retries) -> Error end. +capture_logs(Config) -> + IBMmqCmd = filename:join([?config(data_dir, Config), "ibmmq_runner"]), + Cmd = [IBMmqCmd, "logs", "/tmp/ibmmq.log"], + ct:log("Running command ~p", [Cmd]), + case rabbit_ct_helpers:exec(Cmd, []) of + {ok, _} -> Config; + Error -> ct:pal("Error: ~tp", [Error]), + {skip, "Failed to stop IBM MQ"} + end. + stop_ibmmq_server(Config) -> IBMmqCmd = filename:join([?config(data_dir, Config), "ibmmq_runner"]), Cmd = [IBMmqCmd, "stop"], diff --git a/deps/amqp10_client/test/system_SUITE_data/ibmmq_runner b/deps/amqp10_client/test/system_SUITE_data/ibmmq_runner index c0d97d6962d1..cfe358381d2c 100755 --- a/deps/amqp10_client/test/system_SUITE_data/ibmmq_runner +++ b/deps/amqp10_client/test/system_SUITE_data/ibmmq_runner @@ -76,7 +76,10 @@ invoke_start(){ sleep 10 print "Waited 10 seconds for container to start" } - +capture_logs() { + print "Capturing ibmmq logs to $1" + docker logs ibmmq > $1 +} invoke_stop(){ kill_container_if_exist ibmmq } @@ -88,6 +91,9 @@ case "$1" in build) build_docker_image ;; + logs) + capture_logs "$2" + ;; start) invoke_start ;; From de14668e2a8804c9ef52320a41b55648d0425911 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Thu, 1 Aug 2024 13:58:03 +0200 Subject: [PATCH 30/35] Add negative authentication test within ibmmq group --- deps/amqp10_client/src/amqp10_client_connection.erl | 6 +++--- deps/amqp10_client/test/system_SUITE.erl | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/deps/amqp10_client/src/amqp10_client_connection.erl b/deps/amqp10_client/src/amqp10_client_connection.erl index 438ec4f4ed17..32eb996da686 100644 --- a/deps/amqp10_client/src/amqp10_client_connection.erl +++ b/deps/amqp10_client/src/amqp10_client_connection.erl @@ -228,13 +228,13 @@ sasl_hdr_rcvds({call, From}, begin_session, sasl_init_sent(_EvtType, #'v1_0.sasl_outcome'{code = {ubyte, 0}}, #state{socket = Socket} = State) -> - logger:warning("sasl_init_sent "), + logger:warning("sasl_init_sent received v1_0.sasl_outcome"), ok = socket_send(Socket, ?AMQP_PROTOCOL_HEADER), logger:warning("sasl_init_sent socket_send AMQP_PROTOCOL_HEADER ok"), {next_state, hdr_sent, State}; sasl_init_sent(_EvtType, #'v1_0.sasl_outcome'{code = {ubyte, C}}, #state{} = State) when C==1;C==2;C==3;C==4 -> - logger:warning("sasl_init_sent sasl_auth_failure"), + logger:warning("sasl_init_sent received sasl_auth_failure"), {stop, sasl_auth_failure, State}; sasl_init_sent({call, From}, begin_session, #state{pending_session_reqs = PendingSessionReqs} = State) -> @@ -486,7 +486,7 @@ send_sasl_init(State, {plain, User, Pass}) -> Response = <<0:8, User/binary, 0:8, Pass/binary>>, Frame = #'v1_0.sasl_init'{mechanism = {symbol, <<"PLAIN">>}, initial_response = {binary, Response}}, - logger:warning("send_sasl_init ~p ~p", [User, Pass]), + logger:warning("send_sasl_init send v1_0.sasl_init ~p ~p", [User, Pass]), Ret = send(Frame, 1, State), logger:warning("send_sasl_init ~p ~p Ret : ~p", [User, Pass, Ret]), Ret. diff --git a/deps/amqp10_client/test/system_SUITE.erl b/deps/amqp10_client/test/system_SUITE.erl index b67bb2c938cd..6b1b4ddc3a00 100644 --- a/deps/amqp10_client/test/system_SUITE.erl +++ b/deps/amqp10_client/test/system_SUITE.erl @@ -35,6 +35,7 @@ groups() -> {activemq, [], shared()}, {ibmmq, [], [ open_close_connection, + open_connection_plain_sasl_failure, basic_roundtrip_with_sender_and_receiver_capabilities, basic_roundtrip_with_non_binary_capability ]}, @@ -365,7 +366,7 @@ basic_roundtrip_with_sender_and_receiver_capabilities(Config) -> {message_annotations, #{}} ], [creation_time]). -basic_roundtrip_with_non_binary_capability(Config) -> +basic_roundtrip_with_non_binary_capability(Config) -> application:start(sasl), Hostname = ?config(rmq_hostname, Config), Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), From 6e11d54faf41525bf1742d0aff0e271c8b1bf0ff Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Fri, 2 Aug 2024 10:38:15 +0200 Subject: [PATCH 31/35] Fix password --- deps/amqp10_client/test/ibmmq_ct_helpers.erl | 2 +- deps/amqp10_client/test/system_SUITE.erl | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/deps/amqp10_client/test/ibmmq_ct_helpers.erl b/deps/amqp10_client/test/ibmmq_ct_helpers.erl index 87b39c9d2f61..aa64bcdb6dd3 100644 --- a/deps/amqp10_client/test/ibmmq_ct_helpers.erl +++ b/deps/amqp10_client/test/ibmmq_ct_helpers.erl @@ -31,7 +31,7 @@ init_config(Config) -> rabbit_ct_helpers:set_config(Config, [ {rmq_nodes, [NodeConfig]}, {rmq_hostname, "localhost"}, {tcp_hostname_amqp, "localhost"}, - {sasl, {plain, <<"app">>, <<"passw10rd">>}} ]). + {sasl, {plain, <<"app">>, <<"passw0rd">>}} ]). start_ibmmq_server(Config) -> IBMmqCmd = filename:join([?config(data_dir, Config), "ibmmq_runner"]), diff --git a/deps/amqp10_client/test/system_SUITE.erl b/deps/amqp10_client/test/system_SUITE.erl index 6b1b4ddc3a00..d6218b453003 100644 --- a/deps/amqp10_client/test/system_SUITE.erl +++ b/deps/amqp10_client/test/system_SUITE.erl @@ -35,7 +35,6 @@ groups() -> {activemq, [], shared()}, {ibmmq, [], [ open_close_connection, - open_connection_plain_sasl_failure, basic_roundtrip_with_sender_and_receiver_capabilities, basic_roundtrip_with_non_binary_capability ]}, @@ -364,7 +363,8 @@ basic_roundtrip_with_sender_and_receiver_capabilities(Config) -> {sender_capabilities, <<"queue">>}, {receiver_capabilities, <<"queue">>}, {message_annotations, #{}} - ], [creation_time]). + ], [creation_time]), + timer:sleep(20000). basic_roundtrip_with_non_binary_capability(Config) -> application:start(sasl), From 07c76d09c9dc08e4d0337f97d12248582de4a942 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Wed, 7 Aug 2024 11:45:16 +0200 Subject: [PATCH 32/35] Remove unnecessary command --- .github/workflows/ibm-mq-make.yaml | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/.github/workflows/ibm-mq-make.yaml b/.github/workflows/ibm-mq-make.yaml index f23d8a437a2e..fe85416a0227 100644 --- a/.github/workflows/ibm-mq-make.yaml +++ b/.github/workflows/ibm-mq-make.yaml @@ -9,7 +9,7 @@ on: pull_request: paths: - '.github/workflows/ibm-mq-make.yaml' - + env: REGISTRY_IMAGE: pivotalrabbitmq/ibm-mqadvanced-server-dev IBM_MQ_REPOSITORY: ibm-messaging/mq-container @@ -19,13 +19,13 @@ jobs: docker: runs-on: ubuntu-latest steps: - + - name: Docker meta id: meta uses: docker/metadata-action@v5 with: images: ${{ env.REGISTRY_IMAGE }} - + - name: Set up QEMU uses: docker/setup-qemu-action@v3 @@ -39,9 +39,9 @@ jobs: repository: ${{ env.IBM_MQ_REPOSITORY }} ref: ${{ env.IBM_MQ_BRANCH_NAME }} - - name: Prepare image + - name: Prepare image run: | - ls + ls echo "Enabling AMQP capability" sed -i -e 's/genmqpkg_incamqp=0/genmqpkg_incamqp=1/g' Dockerfile-server echo "AMQP Bootstrap instructions" @@ -51,11 +51,12 @@ jobs: SET AUTHREC PROFILE('SYSTEM.BASE.TOPIC') PRINCIPAL('app') OBJTYPE(TOPIC) AUTHADD(PUB,SUB) SET AUTHREC PROFILE('SYSTEM.DEFAULT.MODEL.QUEUE') PRINCIPAL('app') OBJTYPE(QUEUE) AUTHADD(PUT,DSP) ALTER CHANNEL(SYSTEM.DEF.AMQP) CHLTYPE(AMQP) MCAUSER('app') + STOP SERVICE(SYSTEM.AMQP.SERVICE) START SERVICE(SYSTEM.AMQP.SERVICE) START CHANNEL(SYSTEM.DEF.AMQP) EOF - make build-devserver - docker tag ibm-mqadvanced-server-dev:${{ env.IMAGE_TAG }} ${{ env.REGISTRY_IMAGE }}:${{ env.IMAGE_TAG }} + make build-devserver + docker tag ibm-mqadvanced-server-dev:${{ env.IMAGE_TAG }} ${{ env.REGISTRY_IMAGE }}:${{ env.IMAGE_TAG }} - name: Login to Docker Hub uses: docker/login-action@v3 @@ -64,4 +65,4 @@ jobs: password: ${{ secrets.DOCKERHUB_PASSWORD }} - name: Push run: | - docker push ${{ env.REGISTRY_IMAGE }}:${{ env.IMAGE_TAG }} \ No newline at end of file + docker push ${{ env.REGISTRY_IMAGE }}:${{ env.IMAGE_TAG }} From 0e53c3745cad62d1c17ca2d1d2c0d32f52d86d3a Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Wed, 23 Oct 2024 12:15:05 +0200 Subject: [PATCH 33/35] Bump up ibmmq docker image version --- deps/amqp10_client/test/system_SUITE_data/ibmmq_runner | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deps/amqp10_client/test/system_SUITE_data/ibmmq_runner b/deps/amqp10_client/test/system_SUITE_data/ibmmq_runner index cfe358381d2c..d3b8e4005b1e 100755 --- a/deps/amqp10_client/test/system_SUITE_data/ibmmq_runner +++ b/deps/amqp10_client/test/system_SUITE_data/ibmmq_runner @@ -5,7 +5,7 @@ SCRIPT="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" set -u IMAGE=pivotalrabbitmq/ibm-mqadvanced-server-dev -IMAGE_TAG=9.3.5.1-amd64 +IMAGE_TAG=9.4.0-amd64 kill_container_if_exist() { if docker stop $1 &> /dev/null; then From eeebd4a80bfff35e1652ba86e42f462ce4906e1d Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Wed, 23 Oct 2024 12:59:41 +0200 Subject: [PATCH 34/35] Bump up ibmmq docker image --- deps/amqp10_client/test/system_SUITE_data/ibmmq_runner | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deps/amqp10_client/test/system_SUITE_data/ibmmq_runner b/deps/amqp10_client/test/system_SUITE_data/ibmmq_runner index d3b8e4005b1e..3740a70f29a3 100755 --- a/deps/amqp10_client/test/system_SUITE_data/ibmmq_runner +++ b/deps/amqp10_client/test/system_SUITE_data/ibmmq_runner @@ -5,7 +5,7 @@ SCRIPT="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" set -u IMAGE=pivotalrabbitmq/ibm-mqadvanced-server-dev -IMAGE_TAG=9.4.0-amd64 +IMAGE_TAG=9.4.0.5-amd64 kill_container_if_exist() { if docker stop $1 &> /dev/null; then From 0a4d8a8a5094205a2c92917acaa4637c8fb3eba1 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Thu, 24 Oct 2024 10:23:58 +0200 Subject: [PATCH 35/35] Fix channel in test cases related to capabilities --- deps/amqp10_client/test/system_SUITE.erl | 27 ++++++++++++------------ 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/deps/amqp10_client/test/system_SUITE.erl b/deps/amqp10_client/test/system_SUITE.erl index d6218b453003..d94ff74f4fb9 100644 --- a/deps/amqp10_client/test/system_SUITE.erl +++ b/deps/amqp10_client/test/system_SUITE.erl @@ -24,7 +24,7 @@ all() -> {group, rabbitmq}, {group, rabbitmq_strict}, {group, activemq}, - {group, ibmmq}, + % {group, ibmmq}, {group, activemq_no_anon}, {group, mock} ]. @@ -967,17 +967,18 @@ set_receiver_capabilities(Config) -> OpenStep = fun({0 = Ch, #'v1_0.open'{}, _Pay}) -> {Ch, [#'v1_0.open'{container_id = {utf8, <<"mock">>}}]} end, - BeginStep = fun({1 = Ch, #'v1_0.begin'{}, _Pay}) -> - {Ch, [#'v1_0.begin'{remote_channel = {ushort, 1}, + BeginStep = fun({0 = Ch, #'v1_0.begin'{}, _Pay}) -> + {Ch, [#'v1_0.begin'{remote_channel = {ushort, Ch}, next_outgoing_id = {uint, 1}, incoming_window = {uint, 1000}, outgoing_window = {uint, 1000}} - ]} + ]} end, - AttachStep = fun({1 = Ch, #'v1_0.attach'{role = true, + AttachStep = fun({0 = Ch, #'v1_0.attach'{role = true, name = Name, source = #'v1_0.source'{ - capabilities = {symbol, <<"capability-1">>}}}, <<>>}) -> + capabilities = {symbol, <<"capability-1">>}} + }, <<>>}) -> {Ch, [#'v1_0.attach'{name = Name, handle = {uint, 99}, initial_delivery_count = {uint, 1}, @@ -985,7 +986,7 @@ set_receiver_capabilities(Config) -> ]} end, - LinkCreditStep = fun({1 = Ch, #'v1_0.flow'{}, <<>>}) -> + LinkCreditStep = fun({0 = Ch, #'v1_0.flow'{}, <<>>}) -> {Ch, {multi, [[#'v1_0.transfer'{handle = {uint, 99}, delivery_id = {uint, 12}, more = true}, @@ -1039,14 +1040,14 @@ set_sender_capabilities(Config) -> OpenStep = fun({0 = Ch, #'v1_0.open'{}, _Pay}) -> {Ch, [#'v1_0.open'{container_id = {utf8, <<"mock">>}}]} end, - BeginStep = fun({1 = Ch, #'v1_0.begin'{}, _Pay}) -> - {Ch, [#'v1_0.begin'{remote_channel = {ushort, 1}, + BeginStep = fun({0 = Ch, #'v1_0.begin'{}, _Pay}) -> + {Ch, [#'v1_0.begin'{remote_channel = {ushort, Ch}, next_outgoing_id = {uint, 1}, incoming_window = {uint, 1000}, outgoing_window = {uint, 1000}} ]} end, - AttachStep = fun({1 = Ch, #'v1_0.attach'{role = false, + AttachStep = fun({0 = Ch, #'v1_0.attach'{role = false, name = Name, source = #'v1_0.source'{ @@ -1091,14 +1092,14 @@ set_sender_sync_capabilities(Config) -> OpenStep = fun({0 = Ch, #'v1_0.open'{}, _Pay}) -> {Ch, [#'v1_0.open'{container_id = {utf8, <<"mock">>}}]} end, - BeginStep = fun({1 = Ch, #'v1_0.begin'{}, _Pay}) -> - {Ch, [#'v1_0.begin'{remote_channel = {ushort, 1}, + BeginStep = fun({0 = Ch, #'v1_0.begin'{}, _Pay}) -> + {Ch, [#'v1_0.begin'{remote_channel = {ushort, Ch}, next_outgoing_id = {uint, 1}, incoming_window = {uint, 1000}, outgoing_window = {uint, 1000}} ]} end, - AttachStep = fun({1 = Ch, #'v1_0.attach'{role = false, + AttachStep = fun({0 = Ch, #'v1_0.attach'{role = false, name = Name, source = #'v1_0.source'{