From 9e2f754d2ae6abaf2c97e628075ac0439ae03eca Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Fri, 19 Apr 2024 14:14:29 +0200 Subject: [PATCH 1/3] fix: InfluxDB client read/write timeout settings --- README.md | 6 ++-- .../influxdata/nifi/util/InfluxDBUtils.java | 8 ++++- .../influxdata/nifi/util/ITTestTimeout.java | 36 +++++++++++++++++++ scripts/influxdb-restart.sh | 21 +++++++++++ 4 files changed, 67 insertions(+), 4 deletions(-) create mode 100644 nifi-influx-database-utils/src/test/java/org/influxdata/nifi/util/ITTestTimeout.java diff --git a/README.md b/README.md index 1cc8a52..10ffb52 100644 --- a/README.md +++ b/README.md @@ -269,7 +269,7 @@ Allows sharing connection configuration to InfluxDB 1.x among more NiFi process | SSL Context Service | The SSL Context Service used to provide client certificate information for TLS/SSL connections | | Client Auth | The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided. | | **InfluxDB connection URL** | InfluxDB URL to connect to. Eg: http://influxdb:8086 | -| **InfluxDB Max Connection Time Out** | The maximum time for establishing connection to the InfluxDB | +| **InfluxDB Max Connection Time Out** | The maximum time for establishing connection and reading/writing to the InfluxDB | | Username | Username which is used to authorize against the InfluxDB | | Password | Password for the username which is used to authorize against the InfluxDB. If the authorization fail the FlowFile will be penalized and routed to 'retry' relationship. | @@ -284,7 +284,7 @@ Allows sharing connection configuration to InfluxDB 2.0 among more NiFi processo | SSL Context Service | The SSL Context Service used to provide client certificate information for TLS/SSL connections | | Client Auth | The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided. | | **InfluxDB connection URL** | InfluxDB URL to connect to. Eg: http://influxdb:8086 | -| **InfluxDB Max Connection Time Out** | The maximum time for establishing connection to the InfluxDB | +| **InfluxDB Max Connection Time Out** | The maximum time for establishing connection and reading/writing to the InfluxDB | | **InfluxDB Access Token** | Access Token used for authenticating/authorizing the InfluxDB request sent by NiFi. | ### PutInfluxDatabase @@ -297,7 +297,7 @@ Processor to write the content of a FlowFile in 'line protocol'. Please check de | --- | --- | | **Database Name** | InfluxDB database to connect to | | **InfluxDB connection URL** | InfluxDB URL to connect to. Eg: http://influxdb:8086 | -| **InfluxDB Max Connection Time Out** | The maximum time for establishing connection to the InfluxDB | +| **InfluxDB Max Connection Time Out** | The maximum time for establishing connection and reading/writing to the InfluxDB | | Username | Username for accessing InfluxDB | | Password | Password for user | | **Character Set** | Specifies the character set of the document data | diff --git a/nifi-influx-database-utils/src/main/java/org/influxdata/nifi/util/InfluxDBUtils.java b/nifi-influx-database-utils/src/main/java/org/influxdata/nifi/util/InfluxDBUtils.java index 594c964..d8adbba 100644 --- a/nifi-influx-database-utils/src/main/java/org/influxdata/nifi/util/InfluxDBUtils.java +++ b/nifi-influx-database-utils/src/main/java/org/influxdata/nifi/util/InfluxDBUtils.java @@ -287,6 +287,8 @@ public static InfluxDB makeConnectionV1(String influxDbUrl, OkHttpClient.Builder builder = new OkHttpClient .Builder() .connectTimeout(connectionTimeout, TimeUnit.SECONDS) + .readTimeout(connectionTimeout, TimeUnit.SECONDS) + .writeTimeout(connectionTimeout, TimeUnit.SECONDS) // add interceptor with "User-Agent" header .addInterceptor(chain -> { Request request = chain @@ -323,7 +325,11 @@ public static InfluxDBClient makeConnectionV2(String influxDbUrl, long connectionTimeout, Consumer configurer, final String clientType) { - OkHttpClient.Builder builder = new OkHttpClient.Builder().connectTimeout(connectionTimeout, TimeUnit.SECONDS); + OkHttpClient.Builder builder = new OkHttpClient + .Builder() + .connectTimeout(connectionTimeout, TimeUnit.SECONDS) + .readTimeout(connectionTimeout, TimeUnit.SECONDS) + .writeTimeout(connectionTimeout, TimeUnit.SECONDS); if (configurer != null) { configurer.accept(builder); } diff --git a/nifi-influx-database-utils/src/test/java/org/influxdata/nifi/util/ITTestTimeout.java b/nifi-influx-database-utils/src/test/java/org/influxdata/nifi/util/ITTestTimeout.java new file mode 100644 index 0000000..eee5a8e --- /dev/null +++ b/nifi-influx-database-utils/src/test/java/org/influxdata/nifi/util/ITTestTimeout.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.influxdata.nifi.util; + +import org.junit.jupiter.api.Test; + +public class ITTestTimeout { + static final String INFLUX_DB_2_URL = "http://localhost:9999"; + static final String INFLUX_DB_2_ORG = "my-org"; + static final String INFLUX_DB_2_TOKEN = "my-token"; + + @Test + public void testReadTimeout() { + var influxDBClient = InfluxDBUtils.makeConnectionV2(INFLUX_DB_2_URL, INFLUX_DB_2_TOKEN, 60, null, null); + var unused = influxDBClient.getQueryApi().query("import \"array\"\n" + + "import \"experimental/json\"\n" + + "import \"http/requests\"\n" + + "response = requests.get(url: \"http://httpbin:8080/delay/20\")\n" + + "data = json.parse(data: response.body)\n" + + "array.from(rows: [{_field: \"origin\", _value: data.origin, _time: now()}])", INFLUX_DB_2_ORG); + } +} diff --git a/scripts/influxdb-restart.sh b/scripts/influxdb-restart.sh index fc8e438..7b0d637 100755 --- a/scripts/influxdb-restart.sh +++ b/scripts/influxdb-restart.sh @@ -31,6 +31,8 @@ DEFAULT_INFLUXDB_V2_VERSION="latest" INFLUXDB_V2_VERSION="${INFLUXDB_V2_VERSION:-$DEFAULT_INFLUXDB_V2_VERSION}" INFLUXDB_V2_IMAGE=influxdb:${INFLUXDB_V2_VERSION} +HTTPBIN_IMAGE=mccutchen/go-httpbin + SCRIPT_PATH="$( cd "$(dirname "$0")" ; pwd -P )" docker kill influxdb || true @@ -42,8 +44,26 @@ docker rm influxdb-secured || true docker kill influxdb_v2 || true docker rm influxdb_v2 || true +docker kill httpbin || true +docker rm httpbin || true + docker pull ${INFLUXDB_IMAGE} || true docker pull ${INFLUXDB_V2_IMAGE} || true +docker pull ${HTTPBIN_IMAGE} || true + +# +# Testing helper service +# +echo +echo "Starting httpbin service ..." +echo + +docker run \ + --detach \ + --name httpbin \ + --publish 8080:8080 \ + --env=MAX_DURATION=60s \ + ${HTTPBIN_IMAGE} echo echo "Starting unsecured InfluxDB..." @@ -87,6 +107,7 @@ docker run \ --env INFLUXD_HTTP_BIND_ADDRESS=:9999 \ --name influxdb_v2 \ --link=influxdb \ + --link httpbin \ --publish 9999:9999 \ ${INFLUXDB_V2_IMAGE} From b99c0713eb375955da9406378ea226f201be2cc9 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Fri, 19 Apr 2024 14:26:16 +0200 Subject: [PATCH 2/3] test: fix syntax for Java 8 --- .../test/java/org/influxdata/nifi/util/ITTestTimeout.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/nifi-influx-database-utils/src/test/java/org/influxdata/nifi/util/ITTestTimeout.java b/nifi-influx-database-utils/src/test/java/org/influxdata/nifi/util/ITTestTimeout.java index eee5a8e..774bdaa 100644 --- a/nifi-influx-database-utils/src/test/java/org/influxdata/nifi/util/ITTestTimeout.java +++ b/nifi-influx-database-utils/src/test/java/org/influxdata/nifi/util/ITTestTimeout.java @@ -16,8 +16,12 @@ */ package org.influxdata.nifi.util; +import com.influxdb.client.InfluxDBClient; +import com.influxdb.query.FluxTable; import org.junit.jupiter.api.Test; +import java.util.List; + public class ITTestTimeout { static final String INFLUX_DB_2_URL = "http://localhost:9999"; static final String INFLUX_DB_2_ORG = "my-org"; @@ -25,8 +29,8 @@ public class ITTestTimeout { @Test public void testReadTimeout() { - var influxDBClient = InfluxDBUtils.makeConnectionV2(INFLUX_DB_2_URL, INFLUX_DB_2_TOKEN, 60, null, null); - var unused = influxDBClient.getQueryApi().query("import \"array\"\n" + + InfluxDBClient influxDBClient = InfluxDBUtils.makeConnectionV2(INFLUX_DB_2_URL, INFLUX_DB_2_TOKEN, 60, null, null); + List unused = influxDBClient.getQueryApi().query("import \"array\"\n" + "import \"experimental/json\"\n" + "import \"http/requests\"\n" + "response = requests.get(url: \"http://httpbin:8080/delay/20\")\n" + From f1a8dd0d6fefd79b9d1773e3519c528c562ad8d5 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Fri, 19 Apr 2024 14:28:48 +0200 Subject: [PATCH 3/3] docs: update CHANGELOG --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index ddce08f..b1ac882 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ## v1.29.0 [unreleased] +### Bug Fixes +1. [#163](https://github.com/influxdata/nifi-influxdb-bundle/pull/163): Max connection timeout also used as read/write timeout. + ## v1.28.0 [2024-03-01] ### Others