Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set sender/receiver capabilities to attach frame in the amqp10_client #11337

Open
wants to merge 35 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
9fc1e0c
Configure capabilities on the source/target field in the ATTACH frame
MarcialRosales Jun 20, 2024
1dca32f
Minor cleanup
MarcialRosales Jul 18, 2024
edbf33e
Ignore sender/receiver non-binary capabilities
MarcialRosales Jul 26, 2024
619ab00
interim change ct:log to be able to
MarcialRosales Jul 26, 2024
800085f
Use logger:debug to log capabilities
MarcialRosales Jul 26, 2024
c3ac525
Log with warning level instead
MarcialRosales Jul 26, 2024
1cf8b35
Capture error and log it
MarcialRosales Jul 29, 2024
f990c3c
Add missing log statement
MarcialRosales Jul 29, 2024
53dc39d
Change message
MarcialRosales Jul 29, 2024
0987877
Add extra logging on attach
MarcialRosales Jul 29, 2024
04ef8cf
Add extra logging
MarcialRosales Jul 29, 2024
2e1b8f7
Add extra logging
MarcialRosales Jul 29, 2024
09f95b3
Fix issue
MarcialRosales Jul 29, 2024
19e18ce
Modified log statement
MarcialRosales Jul 29, 2024
b390138
REvert last change
MarcialRosales Jul 29, 2024
6bccbd6
Log return from send_attach
MarcialRosales Jul 29, 2024
bb5045f
Test capabilities with activemq
MarcialRosales Jul 29, 2024
351038a
Log around sending frame
MarcialRosales Jul 29, 2024
5725797
Log eveny operation related to stop event
MarcialRosales Jul 29, 2024
82c1e1f
Test capabilities with activemq and ibmmq
MarcialRosales Jul 29, 2024
92d390f
Sleep for 500msec before considering ibmmq running
MarcialRosales Jul 30, 2024
8986c5c
Fix issue in the ibmmq helper
MarcialRosales Jul 30, 2024
dcbad62
Add extra logging around connection states
MarcialRosales Jul 30, 2024
2d55f8f
Try to send capability in an array without any non-binary
MarcialRosales Jul 30, 2024
e79c07b
Skip test which sends illegal capability
MarcialRosales Jul 30, 2024
7779984
Try a bigger delay before starting to use ibm mq
MarcialRosales Jul 30, 2024
2cdbebd
Test with non-binary capabilities
MarcialRosales Jul 30, 2024
b228078
Wait for connection and session events
MarcialRosales Jul 30, 2024
3dd1cf6
Log connection related events
MarcialRosales Aug 1, 2024
de14668
Add negative authentication test within ibmmq group
MarcialRosales Aug 1, 2024
6e11d54
Fix password
MarcialRosales Aug 2, 2024
07c76d0
Remove unnecessary command
MarcialRosales Aug 7, 2024
0e53c37
Bump up ibmmq docker image version
MarcialRosales Oct 23, 2024
eeebd4a
Bump up ibmmq docker image
MarcialRosales Oct 23, 2024
0a4d8a8
Fix channel in test cases related to capabilities
MarcialRosales Oct 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions .github/workflows/ibm-mq-make.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ on:
pull_request:
paths:
- '.github/workflows/ibm-mq-make.yaml'

env:
REGISTRY_IMAGE: pivotalrabbitmq/ibm-mqadvanced-server-dev
IBM_MQ_REPOSITORY: ibm-messaging/mq-container
Expand All @@ -19,13 +19,13 @@ jobs:
docker:
runs-on: ubuntu-latest
steps:

- name: Docker meta
id: meta
uses: docker/metadata-action@v5
with:
images: ${{ env.REGISTRY_IMAGE }}

-
name: Set up QEMU
uses: docker/setup-qemu-action@v3
Expand All @@ -39,9 +39,9 @@ jobs:
repository: ${{ env.IBM_MQ_REPOSITORY }}
ref: ${{ env.IBM_MQ_BRANCH_NAME }}

- name: Prepare image
- name: Prepare image
run: |
ls
ls
echo "Enabling AMQP capability"
sed -i -e 's/genmqpkg_incamqp=0/genmqpkg_incamqp=1/g' Dockerfile-server
echo "AMQP Bootstrap instructions"
Expand All @@ -51,11 +51,12 @@ jobs:
SET AUTHREC PROFILE('SYSTEM.BASE.TOPIC') PRINCIPAL('app') OBJTYPE(TOPIC) AUTHADD(PUB,SUB)
SET AUTHREC PROFILE('SYSTEM.DEFAULT.MODEL.QUEUE') PRINCIPAL('app') OBJTYPE(QUEUE) AUTHADD(PUT,DSP)
ALTER CHANNEL(SYSTEM.DEF.AMQP) CHLTYPE(AMQP) MCAUSER('app')
STOP SERVICE(SYSTEM.AMQP.SERVICE)
START SERVICE(SYSTEM.AMQP.SERVICE)
START CHANNEL(SYSTEM.DEF.AMQP)
EOF
make build-devserver
docker tag ibm-mqadvanced-server-dev:${{ env.IMAGE_TAG }} ${{ env.REGISTRY_IMAGE }}:${{ env.IMAGE_TAG }}
make build-devserver
docker tag ibm-mqadvanced-server-dev:${{ env.IMAGE_TAG }} ${{ env.REGISTRY_IMAGE }}:${{ env.IMAGE_TAG }}
-
name: Login to Docker Hub
uses: docker/login-action@v3
Expand All @@ -64,4 +65,4 @@ jobs:
password: ${{ secrets.DOCKERHUB_PASSWORD }}
- name: Push
run: |
docker push ${{ env.REGISTRY_IMAGE }}:${{ env.IMAGE_TAG }}
docker push ${{ env.REGISTRY_IMAGE }}:${{ env.IMAGE_TAG }}
2 changes: 2 additions & 0 deletions deps/amqp10_client/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@

# Downloaded ActiveMQ.
/test/system_SUITE_data/apache-activemq-*
/test/system_SUITE_data/ibmmq/mq-container
/test/system_SUITE_data/ibmmq/*.tar.gz
6 changes: 4 additions & 2 deletions deps/amqp10_client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,14 @@ rabbitmq_integration_suite(
size = "medium",
additional_beam = [
"test/activemq_ct_helpers.beam",
"test/ibmmq_ct_helpers.beam",
"test/mock_server.beam",
],
data = [
"@activemq//:exec_dir",
"@activemq//:exec_dir",
],
test_env = {
"ACTIVEMQ": "$TEST_SRCDIR/$TEST_WORKSPACE/external/activemq/bin/activemq",
"ACTIVEMQ": "$TEST_SRCDIR/$TEST_WORKSPACE/external/activemq/bin/activemq"
},
deps = TEST_DEPS,
)
Expand All @@ -140,6 +141,7 @@ eunit(
name = "eunit",
compiled_suites = [
":test_activemq_ct_helpers_beam",
":test_ibmmq_ct_helpers_beam",
":test_mock_server_beam",
],
target = ":test_erlang_app",
Expand Down
8 changes: 8 additions & 0 deletions deps/amqp10_client/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,14 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
app_name = "amqp10_client",
erlc_opts = "//:test_erlc_opts",
)
erlang_bytecode(
name = "test_ibmmq_ct_helpers_beam",
testonly = True,
srcs = ["test/ibmmq_ct_helpers.erl"],
outs = ["test/ibmmq_ct_helpers.beam"],
app_name = "amqp10_client",
erlc_opts = "//:test_erlc_opts",
)
erlang_bytecode(
name = "test_mock_server_beam",
testonly = True,
Expand Down
5 changes: 4 additions & 1 deletion deps/amqp10_client/src/amqp10_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ end_session(Pid) ->
%% @doc Synchronously attach a link on 'Session'.
%% This is a convenience function that awaits attached event
%% for the link before returning.
-spec attach_sender_link_sync(pid(), binary(), binary()) ->
{ok, link_ref()} | link_timeout.
MarcialRosales marked this conversation as resolved.
Show resolved Hide resolved
attach_sender_link_sync(Session, Name, Target) ->
attach_sender_link_sync(Session, Name, Target, mixed).

Expand Down Expand Up @@ -275,7 +277,8 @@ attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter) ->
-spec attach_receiver_link(pid(), binary(), binary(), snd_settle_mode(),
terminus_durability(), filter(), properties()) ->
{ok, link_ref()}.
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties)
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter,
Properties)
when is_pid(Session) andalso
is_binary(Name) andalso
is_binary(Source) andalso
Expand Down
25 changes: 23 additions & 2 deletions deps/amqp10_client/src/amqp10_client_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -228,20 +228,27 @@ sasl_hdr_rcvds({call, From}, begin_session,

sasl_init_sent(_EvtType, #'v1_0.sasl_outcome'{code = {ubyte, 0}},
#state{socket = Socket} = State) ->
logger:warning("sasl_init_sent received v1_0.sasl_outcome"),
ok = socket_send(Socket, ?AMQP_PROTOCOL_HEADER),
logger:warning("sasl_init_sent socket_send AMQP_PROTOCOL_HEADER ok"),
{next_state, hdr_sent, State};
sasl_init_sent(_EvtType, #'v1_0.sasl_outcome'{code = {ubyte, C}},
#state{} = State) when C==1;C==2;C==3;C==4 ->
logger:warning("sasl_init_sent received sasl_auth_failure"),
{stop, sasl_auth_failure, State};
sasl_init_sent({call, From}, begin_session,
#state{pending_session_reqs = PendingSessionReqs} = State) ->
logger:warning("sasl_init_sent call to begin_session"),
State1 = State#state{pending_session_reqs = [From | PendingSessionReqs]},
{keep_state, State1}.

hdr_sent(_EvtType, {protocol_header_received, 0, 1, 0, 0}, State) ->
logger:warning("hdr_sent received {protocol_header_received"),
case send_open(State) of
ok -> {next_state, open_sent, State};
Error -> {stop, Error, State}
Error ->
logger:warning("client_connection hdr_sent ~p", [Error]),
{stop, Error, State}
end;
hdr_sent(_EvtType, {protocol_header_received, Protocol, Maj, Min,
Rev}, State) ->
Expand All @@ -250,13 +257,15 @@ hdr_sent(_EvtType, {protocol_header_received, Protocol, Maj, Min,
{stop, normal, State};
hdr_sent({call, From}, begin_session,
#state{pending_session_reqs = PendingSessionReqs} = State) ->
logger:warning("hdr_sent received call begin_session"),
State1 = State#state{pending_session_reqs = [From | PendingSessionReqs]},
{keep_state, State1}.

open_sent(_EvtType, #'v1_0.open'{max_frame_size = MaybeMaxFrameSize,
idle_time_out = Timeout} = Open,
#state{pending_session_reqs = PendingSessionReqs,
config = Config} = State0) ->
logger:warning("open_sent received 'v1_0.open' with pending_session_reqs: ~p", [PendingSessionReqs]),
State = case Timeout of
undefined -> State0;
{uint, T} when T > 0 ->
Expand All @@ -283,13 +292,16 @@ open_sent(_EvtType, #'v1_0.open'{max_frame_size = MaybeMaxFrameSize,
{next_state, opened, State2#state{pending_session_reqs = []}};
open_sent({call, From}, begin_session,
#state{pending_session_reqs = PendingSessionReqs} = State) ->
logger:warning("open_sent received call begin_session with pending_session_reqs: ~p", [PendingSessionReqs]),
State1 = State#state{pending_session_reqs = [From | PendingSessionReqs]},
{keep_state, State1};
open_sent(info, {'DOWN', MRef, _, _, _},
#state{reader_m_ref = MRef}) ->
logger:warning("open_sent received 'DOWN"),
{stop, {shutdown, reader_down}}.

opened(_EvtType, heartbeat, State = #state{idle_time_out = T}) ->
logger:warning("opened received heartbeat"),
ok = send_heartbeat(State),
{ok, Tmr} = start_heartbeat_timer(T),
{keep_state, State#state{heartbeat_timer = Tmr}};
Expand All @@ -308,11 +320,14 @@ opened(_EvtType, #'v1_0.close'{} = Close, State = #state{config = Config}) ->
_ = send_close(State, none),
{stop, normal, State};
opened({call, From}, begin_session, State) ->
logger:warning("opened received call being_session"),
{Ret, State1} = handle_begin_session(From, State),
logger:warning("handle_begin_session ret: ~p", [Ret]),
{keep_state, State1, [{reply, From, Ret}]};
opened(info, {'DOWN', MRef, _, _, _Info},
State = #state{reader_m_ref = MRef, config = Config}) ->
%% reader has gone down and we are not already shutting down
logger:warning("opened info received 'DOWN'"),
ok = notify_closed(Config, shutdown),
{stop, normal, State};
opened(_EvtType, Frame, State) ->
Expand All @@ -328,11 +343,13 @@ close_sent(_EvtType, {'EXIT', _Pid, shutdown}, State) ->
close_sent(_EvtType, {'DOWN', _Ref, process, ReaderPid, _},
#state{reader = ReaderPid} = State) ->
%% if the reader exits we probably wont receive a close frame
logger:warning("client_connection close_sent( DOWN"),
{stop, normal, State};
close_sent(_EvtType, #'v1_0.close'{} = Close, State = #state{config = Config}) ->
ok = notify_closed(Config, Close),
%% TODO: we should probably set up a timer before this to ensure
%% we close down event if no reply is received
logger:warning("client_connection close_sent( v1_0.close"),
{stop, normal, State}.

set_other_procs0(OtherProcs, State) ->
Expand Down Expand Up @@ -450,6 +467,7 @@ send_close(#state{socket = Socket}, _Reason) ->
ok;
_ -> ok
end,
logger:warning("client_connetion send_close Ret: ~p", [Ret]),
Ret.

send_sasl_init(State, anon) ->
Expand All @@ -468,7 +486,10 @@ send_sasl_init(State, {plain, User, Pass}) ->
Response = <<0:8, User/binary, 0:8, Pass/binary>>,
Frame = #'v1_0.sasl_init'{mechanism = {symbol, <<"PLAIN">>},
initial_response = {binary, Response}},
send(Frame, 1, State).
logger:warning("send_sasl_init send v1_0.sasl_init ~p ~p", [User, Pass]),
Ret = send(Frame, 1, State),
logger:warning("send_sasl_init ~p ~p Ret : ~p", [User, Pass, Ret]),
Ret.

send(Record, FrameType, #state{socket = Socket}) ->
Encoded = amqp10_framing:encode_bin(Record),
Expand Down
4 changes: 4 additions & 0 deletions deps/amqp10_client/src/amqp10_client_frame_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ handle_event(cast, {unregister_session, _Session, OutgoingChannel, IncomingChann
incoming_channels = IncomingChannels1},
{keep_state, State1};
handle_event(cast, close, _StateName, State = #state{socket = Socket}) ->
logger:warning("frame_reader handle_event cast close"),
_ = close_socket(Socket),
{stop, normal, State#state{socket = undefined}};

Expand All @@ -167,11 +168,13 @@ handle_event(info, {Tcp, _, Packet}, StateName, #state{buffer = Buffer} = State)
set_active_once(NewState),
{next_state, NextState, NewState#state{buffer = Remaining}};
{error, Reason, NewState} ->
logger:warning("frame_reader info handle_input error ~p", [Reason]),
{stop, Reason, NewState}
end;

handle_event(info, {TcpError, _, Reason}, StateName, State)
when TcpError == tcp_error orelse TcpError == ssl_error ->
logger:warning("frame_reader handle_event info TcpError: ~p", [TcpError]),
logger:warning("AMQP 1.0 connection socket errored, connection state: '~ts', reason: '~tp'",
[StateName, Reason]),
State1 = State#state{socket = undefined,
Expand All @@ -180,6 +183,7 @@ handle_event(info, {TcpError, _, Reason}, StateName, State)
{stop, {error, Reason}, State1};
handle_event(info, {TcpClosed, _}, StateName, State)
when TcpClosed == tcp_closed orelse TcpClosed == ssl_closed ->
logger:warning("frame_reader handle_event info TcpClosed: ~p", [TcpClosed]),
logger:warning("AMQP 1.0 connection socket was closed, connection state: '~ts'",
[StateName]),
State1 = State#state{socket = undefined,
Expand Down
Loading
Loading