diff --git a/CHANGELOG.md b/CHANGELOG.md index ddce08f..b1ac882 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ## v1.29.0 [unreleased] +### Bug Fixes +1. [#163](https://github.com/influxdata/nifi-influxdb-bundle/pull/163): Max connection timeout also used as read/write timeout. + ## v1.28.0 [2024-03-01] ### Others diff --git a/README.md b/README.md index 1cc8a52..10ffb52 100644 --- a/README.md +++ b/README.md @@ -269,7 +269,7 @@ Allows sharing connection configuration to InfluxDB 1.x among more NiFi process | SSL Context Service | The SSL Context Service used to provide client certificate information for TLS/SSL connections | | Client Auth | The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided. | | **InfluxDB connection URL** | InfluxDB URL to connect to. Eg: http://influxdb:8086 | -| **InfluxDB Max Connection Time Out** | The maximum time for establishing connection to the InfluxDB | +| **InfluxDB Max Connection Time Out** | The maximum time for establishing connection and reading/writing to the InfluxDB | | Username | Username which is used to authorize against the InfluxDB | | Password | Password for the username which is used to authorize against the InfluxDB. If the authorization fail the FlowFile will be penalized and routed to 'retry' relationship. | @@ -284,7 +284,7 @@ Allows sharing connection configuration to InfluxDB 2.0 among more NiFi processo | SSL Context Service | The SSL Context Service used to provide client certificate information for TLS/SSL connections | | Client Auth | The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided. | | **InfluxDB connection URL** | InfluxDB URL to connect to. Eg: http://influxdb:8086 | -| **InfluxDB Max Connection Time Out** | The maximum time for establishing connection to the InfluxDB | +| **InfluxDB Max Connection Time Out** | The maximum time for establishing connection and reading/writing to the InfluxDB | | **InfluxDB Access Token** | Access Token used for authenticating/authorizing the InfluxDB request sent by NiFi. | ### PutInfluxDatabase @@ -297,7 +297,7 @@ Processor to write the content of a FlowFile in 'line protocol'. Please check de | --- | --- | | **Database Name** | InfluxDB database to connect to | | **InfluxDB connection URL** | InfluxDB URL to connect to. Eg: http://influxdb:8086 | -| **InfluxDB Max Connection Time Out** | The maximum time for establishing connection to the InfluxDB | +| **InfluxDB Max Connection Time Out** | The maximum time for establishing connection and reading/writing to the InfluxDB | | Username | Username for accessing InfluxDB | | Password | Password for user | | **Character Set** | Specifies the character set of the document data | diff --git a/nifi-influx-database-utils/src/main/java/org/influxdata/nifi/util/InfluxDBUtils.java b/nifi-influx-database-utils/src/main/java/org/influxdata/nifi/util/InfluxDBUtils.java index 594c964..d8adbba 100644 --- a/nifi-influx-database-utils/src/main/java/org/influxdata/nifi/util/InfluxDBUtils.java +++ b/nifi-influx-database-utils/src/main/java/org/influxdata/nifi/util/InfluxDBUtils.java @@ -287,6 +287,8 @@ public static InfluxDB makeConnectionV1(String influxDbUrl, OkHttpClient.Builder builder = new OkHttpClient .Builder() .connectTimeout(connectionTimeout, TimeUnit.SECONDS) + .readTimeout(connectionTimeout, TimeUnit.SECONDS) + .writeTimeout(connectionTimeout, TimeUnit.SECONDS) // add interceptor with "User-Agent" header .addInterceptor(chain -> { Request request = chain @@ -323,7 +325,11 @@ public static InfluxDBClient makeConnectionV2(String influxDbUrl, long connectionTimeout, Consumer configurer, final String clientType) { - OkHttpClient.Builder builder = new OkHttpClient.Builder().connectTimeout(connectionTimeout, TimeUnit.SECONDS); + OkHttpClient.Builder builder = new OkHttpClient + .Builder() + .connectTimeout(connectionTimeout, TimeUnit.SECONDS) + .readTimeout(connectionTimeout, TimeUnit.SECONDS) + .writeTimeout(connectionTimeout, TimeUnit.SECONDS); if (configurer != null) { configurer.accept(builder); } diff --git a/nifi-influx-database-utils/src/test/java/org/influxdata/nifi/util/ITTestTimeout.java b/nifi-influx-database-utils/src/test/java/org/influxdata/nifi/util/ITTestTimeout.java new file mode 100644 index 0000000..774bdaa --- /dev/null +++ b/nifi-influx-database-utils/src/test/java/org/influxdata/nifi/util/ITTestTimeout.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.influxdata.nifi.util; + +import com.influxdb.client.InfluxDBClient; +import com.influxdb.query.FluxTable; +import org.junit.jupiter.api.Test; + +import java.util.List; + +public class ITTestTimeout { + static final String INFLUX_DB_2_URL = "http://localhost:9999"; + static final String INFLUX_DB_2_ORG = "my-org"; + static final String INFLUX_DB_2_TOKEN = "my-token"; + + @Test + public void testReadTimeout() { + InfluxDBClient influxDBClient = InfluxDBUtils.makeConnectionV2(INFLUX_DB_2_URL, INFLUX_DB_2_TOKEN, 60, null, null); + List unused = influxDBClient.getQueryApi().query("import \"array\"\n" + + "import \"experimental/json\"\n" + + "import \"http/requests\"\n" + + "response = requests.get(url: \"http://httpbin:8080/delay/20\")\n" + + "data = json.parse(data: response.body)\n" + + "array.from(rows: [{_field: \"origin\", _value: data.origin, _time: now()}])", INFLUX_DB_2_ORG); + } +} diff --git a/scripts/influxdb-restart.sh b/scripts/influxdb-restart.sh index fc8e438..7b0d637 100755 --- a/scripts/influxdb-restart.sh +++ b/scripts/influxdb-restart.sh @@ -31,6 +31,8 @@ DEFAULT_INFLUXDB_V2_VERSION="latest" INFLUXDB_V2_VERSION="${INFLUXDB_V2_VERSION:-$DEFAULT_INFLUXDB_V2_VERSION}" INFLUXDB_V2_IMAGE=influxdb:${INFLUXDB_V2_VERSION} +HTTPBIN_IMAGE=mccutchen/go-httpbin + SCRIPT_PATH="$( cd "$(dirname "$0")" ; pwd -P )" docker kill influxdb || true @@ -42,8 +44,26 @@ docker rm influxdb-secured || true docker kill influxdb_v2 || true docker rm influxdb_v2 || true +docker kill httpbin || true +docker rm httpbin || true + docker pull ${INFLUXDB_IMAGE} || true docker pull ${INFLUXDB_V2_IMAGE} || true +docker pull ${HTTPBIN_IMAGE} || true + +# +# Testing helper service +# +echo +echo "Starting httpbin service ..." +echo + +docker run \ + --detach \ + --name httpbin \ + --publish 8080:8080 \ + --env=MAX_DURATION=60s \ + ${HTTPBIN_IMAGE} echo echo "Starting unsecured InfluxDB..." @@ -87,6 +107,7 @@ docker run \ --env INFLUXD_HTTP_BIND_ADDRESS=:9999 \ --name influxdb_v2 \ --link=influxdb \ + --link httpbin \ --publish 9999:9999 \ ${INFLUXDB_V2_IMAGE}