Skip to content

Commit

Permalink
Merge pull request #127 from zmstone/241029-resolve-discover-and-conn…
Browse files Browse the repository at this point in the history
…ect-timeout-value

fix: resolve discover-then-connect timeout
  • Loading branch information
zmstone authored Nov 6, 2024
2 parents dfea85f + 235d129 commit ae7df49
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 16 deletions.
7 changes: 1 addition & 6 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,6 @@ jobs:
restore-keys: |
${{ runner.os }}-dialyzer2-
- name: Set up Docker Compose
run: |
sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose
# Install Erlang
- name: Install Erlang/OTP
uses: erlef/setup-beam@v1
Expand All @@ -65,4 +60,4 @@ jobs:
run: |
export KAFKA_VERSION=${{ matrix.kafka }}
make test-env
make eunit || (cd scripts && docker-compose logs)
make eunit || (cd scripts && docker compose logs)
5 changes: 3 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ edoc:
ex_doc:
@$(rebar_cmd) ex_doc

.PHONY: dialyze
dialyze: compile
.PHONY: dialyze dialyzer
dialyze: dialyzer
dialyzer: compile
@$(rebar_cmd) dialyzer

.PHONY: hex-publish
Expand Down
7 changes: 7 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
* 4.1.10
- Resolve timeout value for discover and connect
- partition leader
- consumer group coordinator
- cluster controller
Choose the greater value of connect timeout and request timeout.

* 4.1.9
- Upgrade crc32cer to 0.1.11 for build issue fix on OTP 27.

Expand Down
2 changes: 0 additions & 2 deletions scripts/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
version: '2'

services:
zookeeper:
image: "zmstone/kafka:${KAFKA_VERSION}"
Expand Down
5 changes: 2 additions & 3 deletions scripts/setup-test-env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ export KAFKA_VERSION=$VERSION

TD="$(cd "$(dirname "$0")" && pwd)"

docker-compose -f $TD/docker-compose.yml down || true
docker-compose -f $TD/docker-compose.yml up -d
docker compose -f $TD/docker-compose.yml down || true
docker compose -f $TD/docker-compose.yml up -d

# give kafka some time
sleep 5
Expand Down Expand Up @@ -70,4 +70,3 @@ docker exec kafka-1 /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server l
if [[ "$KAFKA_VERSION" != 0.9* ]] && [[ "$KAFKA_VERSION" != 0.10* ]]; then
docker exec kafka-1 /opt/kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=ecila],SCRAM-SHA-512=[password=ecila]' --entity-type users --entity-name alice
fi

13 changes: 10 additions & 3 deletions src/kpro_brokers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ with_connection(Endpoints, Config, Fun) ->
topic(), partition(), #{timeout => timeout()}) ->
{ok, connection()} | {error, any()}.
connect_partition_leader(Bootstrap, Config, Topic, Partition, Opts) ->
Timeout = maps:get(timeout, Opts, ?DEFAULT_TIMEOUT),
Timeout = resolve_timeout(Config, Opts),
DiscoverFun =
fun(C) -> discover_partition_leader(C, Topic, Partition, Timeout) end,
discover_and_connect(DiscoverFun, Bootstrap, Config, Timeout).
Expand All @@ -96,7 +96,7 @@ connect_partition_leader(Bootstrap, Config, Topic, Partition, Opts) ->
connect_coordinator(Bootstrap, Config, #{ type := Type
, id := Id
} = Args) ->
Timeout = maps:get(timeout, Args, ?DEFAULT_TIMEOUT),
Timeout = resolve_timeout(Config, Args),
DiscoverFun = fun(Conn) -> discover_coordinator(Conn, Type, Id, Timeout) end,
discover_and_connect(DiscoverFun, Bootstrap, Config, Timeout).

Expand All @@ -105,7 +105,7 @@ connect_coordinator(Bootstrap, Config, #{ type := Type
#{timeout => timeout()}) ->
{ok, connection()} | {error, any()}.
connect_controller(Bootstrap, Config, Opts) ->
Timeout = maps:get(timeout, Opts, ?DEFAULT_TIMEOUT),
Timeout = resolve_timeout(Config, Opts),
DiscoverFun = fun(Conn) -> discover_controller(Conn, Timeout) end,
discover_and_connect(DiscoverFun, Bootstrap, Config, Timeout).

Expand Down Expand Up @@ -318,6 +318,13 @@ random_order(L) ->
RI = lists:sort(lists:zip(RandL, L)),
[I || {_R, I} <- RI].

resolve_timeout(ConnConfig, Opts) when is_list(ConnConfig) ->
resolve_timeout(maps:from_list(ConnConfig), Opts);
resolve_timeout(ConnConfig, Opts) ->
ConnectTimeout = kpro_connection:get_connect_timeout(ConnConfig),
RequestTimeout = maps:get(timeout, Opts, ?DEFAULT_TIMEOUT),
max(ConnectTimeout, RequestTimeout).

-ifdef(TEST).

api_vsn_range_intersection_test() ->
Expand Down
1 change: 1 addition & 0 deletions src/kpro_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
, start/3
, stop/1
, debug/2
, get_connect_timeout/1
]).

%% system calls support for worker process
Expand Down

0 comments on commit ae7df49

Please sign in to comment.