diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f2b8464..a77c571 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -42,11 +42,6 @@ jobs: restore-keys: | ${{ runner.os }}-dialyzer2- - - name: Set up Docker Compose - run: | - sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose - sudo chmod +x /usr/local/bin/docker-compose - # Install Erlang - name: Install Erlang/OTP uses: erlef/setup-beam@v1 @@ -65,4 +60,4 @@ jobs: run: | export KAFKA_VERSION=${{ matrix.kafka }} make test-env - make eunit || (cd scripts && docker-compose logs) + make eunit || (cd scripts && docker compose logs) diff --git a/Makefile b/Makefile index 515b018..76adce2 100644 --- a/Makefile +++ b/Makefile @@ -56,8 +56,9 @@ edoc: ex_doc: @$(rebar_cmd) ex_doc -.PHONY: dialyze -dialyze: compile +.PHONY: dialyze dialyzer +dialyze: dialyzer +dialyzer: compile @$(rebar_cmd) dialyzer .PHONY: hex-publish diff --git a/changelog.md b/changelog.md index 6f6d363..88dad00 100644 --- a/changelog.md +++ b/changelog.md @@ -1,3 +1,10 @@ +* 4.1.10 + - Resolve timeout value for discover and connect + - partition leader + - consumer group coordinator + - cluster controller + Choose the greater value of connect timeout and request timeout. + * 4.1.9 - Upgrade crc32cer to 0.1.11 for build issue fix on OTP 27. diff --git a/scripts/docker-compose.yml b/scripts/docker-compose.yml index 257a413..36a3c90 100644 --- a/scripts/docker-compose.yml +++ b/scripts/docker-compose.yml @@ -1,5 +1,3 @@ -version: '2' - services: zookeeper: image: "zmstone/kafka:${KAFKA_VERSION}" diff --git a/scripts/setup-test-env.sh b/scripts/setup-test-env.sh index f05b2df..a2662c2 100755 --- a/scripts/setup-test-env.sh +++ b/scripts/setup-test-env.sh @@ -26,8 +26,8 @@ export KAFKA_VERSION=$VERSION TD="$(cd "$(dirname "$0")" && pwd)" -docker-compose -f $TD/docker-compose.yml down || true -docker-compose -f $TD/docker-compose.yml up -d +docker compose -f $TD/docker-compose.yml down || true +docker compose -f $TD/docker-compose.yml up -d # give kafka some time sleep 5 @@ -70,4 +70,3 @@ docker exec kafka-1 /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server l if [[ "$KAFKA_VERSION" != 0.9* ]] && [[ "$KAFKA_VERSION" != 0.10* ]]; then docker exec kafka-1 /opt/kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=ecila],SCRAM-SHA-512=[password=ecila]' --entity-type users --entity-name alice fi - diff --git a/src/kpro_brokers.erl b/src/kpro_brokers.erl index 964a6a4..4249516 100644 --- a/src/kpro_brokers.erl +++ b/src/kpro_brokers.erl @@ -79,7 +79,7 @@ with_connection(Endpoints, Config, Fun) -> topic(), partition(), #{timeout => timeout()}) -> {ok, connection()} | {error, any()}. connect_partition_leader(Bootstrap, Config, Topic, Partition, Opts) -> - Timeout = maps:get(timeout, Opts, ?DEFAULT_TIMEOUT), + Timeout = resolve_timeout(Config, Opts), DiscoverFun = fun(C) -> discover_partition_leader(C, Topic, Partition, Timeout) end, discover_and_connect(DiscoverFun, Bootstrap, Config, Timeout). @@ -96,7 +96,7 @@ connect_partition_leader(Bootstrap, Config, Topic, Partition, Opts) -> connect_coordinator(Bootstrap, Config, #{ type := Type , id := Id } = Args) -> - Timeout = maps:get(timeout, Args, ?DEFAULT_TIMEOUT), + Timeout = resolve_timeout(Config, Args), DiscoverFun = fun(Conn) -> discover_coordinator(Conn, Type, Id, Timeout) end, discover_and_connect(DiscoverFun, Bootstrap, Config, Timeout). @@ -105,7 +105,7 @@ connect_coordinator(Bootstrap, Config, #{ type := Type #{timeout => timeout()}) -> {ok, connection()} | {error, any()}. connect_controller(Bootstrap, Config, Opts) -> - Timeout = maps:get(timeout, Opts, ?DEFAULT_TIMEOUT), + Timeout = resolve_timeout(Config, Opts), DiscoverFun = fun(Conn) -> discover_controller(Conn, Timeout) end, discover_and_connect(DiscoverFun, Bootstrap, Config, Timeout). @@ -318,6 +318,13 @@ random_order(L) -> RI = lists:sort(lists:zip(RandL, L)), [I || {_R, I} <- RI]. +resolve_timeout(ConnConfig, Opts) when is_list(ConnConfig) -> + resolve_timeout(maps:from_list(ConnConfig), Opts); +resolve_timeout(ConnConfig, Opts) -> + ConnectTimeout = kpro_connection:get_connect_timeout(ConnConfig), + RequestTimeout = maps:get(timeout, Opts, ?DEFAULT_TIMEOUT), + max(ConnectTimeout, RequestTimeout). + -ifdef(TEST). api_vsn_range_intersection_test() -> diff --git a/src/kpro_connection.erl b/src/kpro_connection.erl index f489e98..50d4149 100644 --- a/src/kpro_connection.erl +++ b/src/kpro_connection.erl @@ -28,6 +28,7 @@ , start/3 , stop/1 , debug/2 + , get_connect_timeout/1 ]). %% system calls support for worker process