From 7d33f2120dcf5a5e9c0b1ad769837e974ba66341 Mon Sep 17 00:00:00 2001 From: Chris Sampson Date: Tue, 31 Dec 2024 21:09:37 +0000 Subject: [PATCH] NIFI-14104 allow setting of HTTP Request Headers for Elasticsearch requests --- .../ElasticSearchClientService.java | 52 +++-- .../ElasticSearchClientServiceImpl.java | 90 ++++---- .../ElasticSearchLookupService.java | 4 +- .../ElasticSearchStringLookupService.java | 2 +- .../TestElasticSearchClientService.java | 35 +-- .../integration/AbstractElasticsearch_IT.java | 2 +- .../ElasticSearchClientService_IT.java | 202 ++++++++++-------- .../AbstractByQueryElasticsearch.java | 5 +- ...stractPaginatedJsonQueryElasticsearch.java | 24 ++- .../elasticsearch/ConsumeElasticsearch.java | 26 ++- .../DeleteByQueryElasticsearch.java | 27 ++- .../ElasticsearchRestProcessor.java | 33 ++- .../elasticsearch/GetElasticsearch.java | 27 ++- .../elasticsearch/JsonQueryElasticsearch.java | 25 ++- .../PaginatedJsonQueryElasticsearch.java | 26 ++- .../elasticsearch/PutElasticsearchJson.java | 15 +- .../elasticsearch/PutElasticsearchRecord.java | 30 ++- .../elasticsearch/SearchElasticsearch.java | 26 ++- .../UpdateByQueryElasticsearch.java | 27 ++- .../AbstractByQueryElasticsearchTest.java | 16 +- .../AbstractJsonQueryElasticsearchTest.java | 5 + .../elasticsearch/GetElasticsearchTest.java | 5 + .../PutElasticsearchJsonTest.java | 16 +- .../PutElasticsearchRecordTest.java | 14 +- .../TestElasticsearchClientService.java | 61 +++--- .../integration/AbstractElasticsearch_IT.java | 2 +- .../mock/AbstractMockElasticsearchClient.java | 33 +-- .../mock/MockBulkLoadClientService.java | 11 +- .../AbstractElasticsearchITBase.java | 2 +- 29 files changed, 538 insertions(+), 305 deletions(-) diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java index 77e819332a48..5511da1fe221 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java @@ -262,18 +262,20 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl * * @param operation A document to index. * @param requestParameters A collection of URL request parameters. Optional. + * @param requestHeaders A collection of request headers. Optional. * @return IndexOperationResponse if successful */ - IndexOperationResponse add(IndexOperationRequest operation, Map requestParameters); + IndexOperationResponse add(IndexOperationRequest operation, Map requestParameters, Map requestHeaders); /** * Bulk process multiple documents. * * @param operations A list of index operations. * @param requestParameters A collection of URL request parameters. Optional. + * @param requestHeaders A collection of request headers. Optional. * @return IndexOperationResponse if successful. */ - IndexOperationResponse bulk(List operations, Map requestParameters); + IndexOperationResponse bulk(List operations, Map requestParameters, Map requestHeaders); /** * Count the documents that match the criteria. @@ -282,9 +284,10 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl * @param index The index to target. * @param type The type to target. Will not be used in future versions of Elasticsearch. * @param requestParameters A collection of URL request parameters. Optional. + * @param requestHeaders A collection of request headers. Optional. * @return number of documents matching the query */ - Long count(String query, String index, String type, Map requestParameters); + Long count(String query, String index, String type, Map requestParameters, Map requestHeaders); /** * Delete a document by its ID from an index. @@ -293,9 +296,10 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl * @param type The type to target. Optional. Will not be used in future versions of Elasticsearch. * @param id The document ID to remove from the selected index. * @param requestParameters A collection of URL request parameters. Optional. + * @param requestHeaders A collection of request headers. Optional. * @return A DeleteOperationResponse object if successful. */ - DeleteOperationResponse deleteById(String index, String type, String id, Map requestParameters); + DeleteOperationResponse deleteById(String index, String type, String id, Map requestParameters, Map requestHeaders); /** @@ -304,9 +308,10 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl * @param type The type to target. Optional. Will not be used in future versions of Elasticsearch. * @param ids A list of document IDs to remove from the selected index. * @param requestParameters A collection of URL request parameters. Optional. + * @param requestHeaders A collection of request headers. Optional. * @return A DeleteOperationResponse object if successful. */ - DeleteOperationResponse deleteById(String index, String type, List ids, Map requestParameters); + DeleteOperationResponse deleteById(String index, String type, List ids, Map requestParameters, Map requestHeaders); /** * Delete documents by query. @@ -315,9 +320,10 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl * @param index The index to target. * @param type The type to target within the index. Optional. Will not be used in future versions of Elasticsearch. * @param requestParameters A collection of URL request parameters. Optional. + * @param requestHeaders A collection of request headers. Optional. * @return A DeleteOperationResponse object if successful. */ - DeleteOperationResponse deleteByQuery(String query, String index, String type, Map requestParameters); + DeleteOperationResponse deleteByQuery(String query, String index, String type, Map requestParameters, Map requestHeaders); /** * Update documents by query. @@ -326,25 +332,29 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl * @param index The index to target. * @param type The type to target within the index. Optional. Will not be used in future versions of Elasticsearch. * @param requestParameters A collection of URL request parameters. Optional. + * @param requestHeaders A collection of request headers. Optional. * @return An UpdateOperationResponse object if successful. */ - UpdateOperationResponse updateByQuery(String query, String index, String type, Map requestParameters); + UpdateOperationResponse updateByQuery(String query, String index, String type, Map requestParameters, Map requestHeaders); /** * Refresh index/indices. * * @param index The index to target, if omitted then all indices will be updated. * @param requestParameters A collection of URL request parameters. Optional. + * @param requestHeaders A collection of request headers. Optional. */ - void refresh(final String index, final Map requestParameters); + void refresh(final String index, final Map requestParameters, Map requestHeaders); /** * Check whether an index exists. * * @param index The index to check. * @param requestParameters A collection of URL request parameters. Optional. + * @param requestHeaders A collection of request headers. Optional. + * @return true if index exists, false otherwise */ - boolean exists(final String index, final Map requestParameters); + boolean exists(final String index, final Map requestParameters, Map requestHeaders); /** * Check whether a document exists. @@ -353,8 +363,10 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl * @param type The document type. Optional. Will not be used in future versions of Elasticsearch. * @param id The document ID * @param requestParameters A collection of URL request parameters. Optional. + * @param requestHeaders A collection of request headers. Optional. + * @return true if doc exists in index, false otherwise */ - boolean documentExists(final String index, final String type, final String id, final Map requestParameters); + boolean documentExists(final String index, final String type, final String id, final Map requestParameters, Map requestHeaders); /** * Get a document by ID. @@ -363,28 +375,31 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl * @param type The document type. Optional. Will not be used in future versions of Elasticsearch. * @param id The document ID * @param requestParameters A collection of URL request parameters. Optional. + * @param requestHeaders A collection of request headers. Optional. * @return Map if successful, null if not found. */ - Map get(String index, String type, String id, Map requestParameters); + Map get(String index, String type, String id, Map requestParameters, Map requestHeaders); /** * Perform a search using the JSON DSL. * - * @param query A JSON string reprensenting the query. + * @param query A JSON string representing the query. * @param index The index to target. Optional. * @param type The type to target. Optional. Will not be used in future versions of Elasticsearch. * @param requestParameters A collection of URL request parameters. Optional. + * @param requestHeaders A collection of request headers. Optional. * @return A SearchResponse object if successful. */ - SearchResponse search(String query, String index, String type, Map requestParameters); + SearchResponse search(String query, String index, String type, Map requestParameters, Map requestHeaders); /** * Retrieve next page of results from a Scroll. * * @param scroll A JSON string containing scrollId and optional scroll (keep alive) retention period. + * @param requestHeaders A collection of request headers. Optional. * @return A SearchResponse object if successful. */ - SearchResponse scroll(String scroll); + SearchResponse scroll(String scroll, Map requestHeaders); /** * Initialise a Point in Time for paginated queries. @@ -392,26 +407,29 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl * * @param index Index targeted. * @param keepAlive Point in Time's retention period (maximum time Elasticsearch will retain the PiT between requests). Optional. + * @param requestHeaders A collection of request headers. Optional. * @return the Point in Time Id (pit_id) */ - String initialisePointInTime(String index, String keepAlive); + String initialisePointInTime(String index, String keepAlive, Map requestHeaders); /** * Delete a Point in Time. * Requires Elasticsearch 7.10+ and XPack features. * * @param pitId Point in Time Id to be deleted. + * @param requestHeaders A collection of request headers. Optional. * @return A DeleteOperationResponse object if successful. */ - DeleteOperationResponse deletePointInTime(String pitId); + DeleteOperationResponse deletePointInTime(String pitId, Map requestHeaders); /** * Delete a Scroll. * * @param scrollId Scroll Id to be deleted. + * @param requestHeaders A collection of request headers. Optional. * @return A DeleteOperationResponse object if successful. */ - DeleteOperationResponse deleteScroll(String scrollId); + DeleteOperationResponse deleteScroll(String scrollId, Map requestHeaders); /** * Build a transit URL to use with the provenance reporter. diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java index 8169a38f0636..ffbba1baa82b 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java @@ -56,6 +56,7 @@ import org.elasticsearch.client.Node; import org.elasticsearch.client.NodeSelector; import org.elasticsearch.client.Request; +import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.RestClient; @@ -187,7 +188,7 @@ public void onEnabled(final ConfigurationContext context) throws InitializationE try { this.client = setupClient(context); this.sniffer = setupSniffer(context, this.client); - responseCharset = Charset.forName(context.getProperty(CHARSET).getValue()); + this.responseCharset = Charset.forName(context.getProperty(CHARSET).getValue()); // re-create the ObjectMapper in case the SUPPRESS_NULLS property has changed - the JsonInclude settings aren't dynamic createObjectMapper(context); @@ -537,7 +538,7 @@ private void appendIndex(final StringBuilder sb, final String index) { } } - private Response runQuery(final String endpoint, final String query, final String index, final String type, final Map requestParameters) { + private Response runQuery(final String endpoint, final String query, final String index, final String type, final Map requestParameters, final Map requestHeaders) { final StringBuilder sb = new StringBuilder(); appendIndex(sb, index); if (StringUtils.isNotBlank(type)) { @@ -547,7 +548,7 @@ private Response runQuery(final String endpoint, final String query, final Strin try { final HttpEntity queryEntity = new NStringEntity(query, ContentType.APPLICATION_JSON); - return performRequest("POST", sb.toString(), requestParameters, queryEntity); + return performRequest("POST", sb.toString(), requestParameters, requestHeaders, queryEntity); } catch (final Exception e) { throw new ElasticsearchException(e); } @@ -585,8 +586,8 @@ private List parseResponseWarningHeaders(final Response response) { } @Override - public IndexOperationResponse add(final IndexOperationRequest operation, final Map requestParameters) { - return bulk(Collections.singletonList(operation), requestParameters); + public IndexOperationResponse add(final IndexOperationRequest operation, final Map requestParameters, final Map requestHeaders) { + return bulk(Collections.singletonList(operation), requestParameters, requestHeaders); } private String flatten(final String str) { @@ -656,7 +657,7 @@ protected void buildRequest(final IndexOperationRequest request, final StringBui } @Override - public IndexOperationResponse bulk(final List operations, final Map requestParameters) { + public IndexOperationResponse bulk(final List operations, final Map requestParameters, final Map requestHeaders) { try { final StringBuilder payload = new StringBuilder(); for (final IndexOperationRequest or : operations) { @@ -669,7 +670,7 @@ public IndexOperationResponse bulk(final List operations, final HttpEntity entity = new NStringEntity(payload.toString(), ContentType.APPLICATION_JSON); final StopWatch watch = new StopWatch(); watch.start(); - final Response response = performRequest("POST", "/_bulk", requestParameters, entity); + final Response response = performRequest("POST", "/_bulk", requestParameters, requestHeaders, entity); watch.stop(); final String rawResponse = IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8); @@ -686,20 +687,20 @@ public IndexOperationResponse bulk(final List operations, } @Override - public Long count(final String query, final String index, final String type, final Map requestParameters) { - final Response response = runQuery("_count", query, index, type, requestParameters); + public Long count(final String query, final String index, final String type, final Map requestParameters, final Map requestHeaders) { + final Response response = runQuery("_count", query, index, type, requestParameters, requestHeaders); final Map parsed = parseResponse(response); return ((Integer) parsed.get("count")).longValue(); } @Override - public DeleteOperationResponse deleteById(final String index, final String type, final String id, final Map requestParameters) { - return deleteById(index, type, Collections.singletonList(id), requestParameters); + public DeleteOperationResponse deleteById(final String index, final String type, final String id, final Map requestParameters, final Map requestHeaders) { + return deleteById(index, type, Collections.singletonList(id), requestParameters, requestHeaders); } @Override - public DeleteOperationResponse deleteById(final String index, final String type, final List ids, final Map requestParameters) { + public DeleteOperationResponse deleteById(final String index, final String type, final List ids, final Map requestParameters, final Map requestHeaders) { try { final StringBuilder sb = new StringBuilder(); for (final String id : ids) { @@ -709,7 +710,7 @@ public DeleteOperationResponse deleteById(final String index, final String type, final HttpEntity entity = new NStringEntity(sb.toString(), ContentType.APPLICATION_JSON); final StopWatch watch = new StopWatch(); watch.start(); - final Response response = performRequest("POST", "/_bulk", requestParameters, entity); + final Response response = performRequest("POST", "/_bulk", requestParameters, requestHeaders, entity); watch.stop(); if (getLogger().isDebugEnabled()) { @@ -724,10 +725,10 @@ public DeleteOperationResponse deleteById(final String index, final String type, } @Override - public DeleteOperationResponse deleteByQuery(final String query, final String index, final String type, final Map requestParameters) { + public DeleteOperationResponse deleteByQuery(final String query, final String index, final String type, final Map requestParameters, final Map requestHeaders) { final StopWatch watch = new StopWatch(); watch.start(); - final Response response = runQuery("_delete_by_query", query, index, type, requestParameters); + final Response response = runQuery("_delete_by_query", query, index, type, requestParameters, requestHeaders); watch.stop(); // check for errors in response @@ -738,9 +739,9 @@ public DeleteOperationResponse deleteByQuery(final String query, final String in } @Override - public UpdateOperationResponse updateByQuery(final String query, final String index, final String type, final Map requestParameters) { + public UpdateOperationResponse updateByQuery(final String query, final String index, final String type, final Map requestParameters, final Map requestHeaders) { final long start = System.currentTimeMillis(); - final Response response = runQuery("_update_by_query", query, index, type, requestParameters); + final Response response = runQuery("_update_by_query", query, index, type, requestParameters, requestHeaders); final long end = System.currentTimeMillis(); // check for errors in response @@ -750,12 +751,12 @@ public UpdateOperationResponse updateByQuery(final String query, final String in } @Override - public void refresh(final String index, final Map requestParameters) { + public void refresh(final String index, final Map requestParameters, final Map requestHeaders) { try { final StringBuilder endpoint = new StringBuilder(); appendIndex(endpoint, index); endpoint.append("/_refresh"); - final Response response = performRequest("POST", endpoint.toString(), requestParameters, null); + final Response response = performRequest("POST", endpoint.toString(), requestParameters, requestHeaders, null); parseResponseWarningHeaders(response); } catch (final Exception ex) { throw new ElasticsearchException(ex); @@ -763,11 +764,11 @@ public void refresh(final String index, final Map requestParamet } @Override - public boolean exists(final String index, final Map requestParameters) { + public boolean exists(final String index, final Map requestParameters, final Map requestHeaders) { try { final StringBuilder endpoint = new StringBuilder(); appendIndex(endpoint, index); - final Response response = performRequest("HEAD", endpoint.toString(), requestParameters, null); + final Response response = performRequest("HEAD", endpoint.toString(), requestParameters, requestHeaders, null); parseResponseWarningHeaders(response); if (response.getStatusLine().getStatusCode() == 200) { @@ -785,12 +786,12 @@ public boolean exists(final String index, final Map requestParam } @Override - public boolean documentExists(final String index, final String type, final String id, final Map requestParameters) { + public boolean documentExists(final String index, final String type, final String id, final Map requestParameters, final Map requestHeaders) { boolean exists = true; try { final Map existsParameters = requestParameters != null ? new HashMap<>(requestParameters) : new HashMap<>(); existsParameters.putIfAbsent("_source", "false"); - get(index, type, id, existsParameters); + get(index, type, id, existsParameters, requestHeaders); } catch (final ElasticsearchException ee) { if (ee.isNotFound()) { exists = false; @@ -803,7 +804,7 @@ public boolean documentExists(final String index, final String type, final Strin @SuppressWarnings("unchecked") @Override - public Map get(final String index, final String type, final String id, final Map requestParameters) { + public Map get(final String index, final String type, final String id, final Map requestParameters, final Map requestHeaders) { try { final StringBuilder endpoint = new StringBuilder(); appendIndex(endpoint, index); @@ -814,7 +815,7 @@ public Map get(final String index, final String type, final Stri } endpoint.append("/").append(id); - final Response response = performRequest("GET", endpoint.toString(), requestParameters, null); + final Response response = performRequest("GET", endpoint.toString(), requestParameters, requestHeaders, null); final String body = IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8); parseResponseWarningHeaders(response); @@ -839,20 +840,22 @@ private int handleSearchCount(final Object raw) { } @Override - public SearchResponse search(final String query, final String index, final String type, final Map requestParameters) { + public SearchResponse search(final String query, final String index, final String type, final Map requestParameters, final Map requestHeaders) { try { - final Response response = runQuery("_search", query, index, type, requestParameters); + final Response response = runQuery("_search", query, index, type, requestParameters, requestHeaders); return buildSearchResponse(response); + } catch (final ElasticsearchException ee) { + throw ee; } catch (final Exception ex) { throw new ElasticsearchException(ex); } } @Override - public SearchResponse scroll(final String scroll) { + public SearchResponse scroll(final String scroll, final Map requestHeaders) { try { final HttpEntity scrollEntity = new NStringEntity(scroll, ContentType.APPLICATION_JSON); - final Response response = performRequest("POST", "/_search/scroll", Collections.emptyMap(), scrollEntity); + final Response response = performRequest("POST", "/_search/scroll", Collections.emptyMap(), requestHeaders, scrollEntity); return buildSearchResponse(response); } catch (final Exception ex) { throw new ElasticsearchException(ex); @@ -860,7 +863,7 @@ public SearchResponse scroll(final String scroll) { } @Override - public String initialisePointInTime(final String index, final String keepAlive) { + public String initialisePointInTime(final String index, final String keepAlive, final Map requestHeaders) { try { final Map params = new HashMap<>() {{ if (StringUtils.isNotBlank(keepAlive)) { @@ -870,7 +873,7 @@ public String initialisePointInTime(final String index, final String keepAlive) final StringBuilder endpoint = new StringBuilder(); appendIndex(endpoint, index); endpoint.append("/_pit"); - final Response response = performRequest("POST", endpoint.toString(), params, null); + final Response response = performRequest("POST", endpoint.toString(), params, requestHeaders, null); final String body = IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8); parseResponseWarningHeaders(response); @@ -885,12 +888,12 @@ public String initialisePointInTime(final String index, final String keepAlive) } @Override - public DeleteOperationResponse deletePointInTime(final String pitId) { + public DeleteOperationResponse deletePointInTime(final String pitId, final Map requestHeaders) { try { final HttpEntity pitEntity = new NStringEntity(String.format("{\"id\": \"%s\"}", pitId), ContentType.APPLICATION_JSON); final StopWatch watch = new StopWatch(true); - final Response response = performRequest("DELETE", "/_pit", Collections.emptyMap(), pitEntity); + final Response response = performRequest("DELETE", "/_pit", Collections.emptyMap(), requestHeaders, pitEntity); watch.stop(); if (getLogger().isDebugEnabled()) { @@ -911,12 +914,12 @@ public DeleteOperationResponse deletePointInTime(final String pitId) { } @Override - public DeleteOperationResponse deleteScroll(final String scrollId) { + public DeleteOperationResponse deleteScroll(final String scrollId, final Map requestHeaders) { try { final HttpEntity scrollBody = new NStringEntity(String.format("{\"scroll_id\": \"%s\"}", scrollId), ContentType.APPLICATION_JSON); final StopWatch watch = new StopWatch(true); - final Response response = performRequest("DELETE", "/_search/scroll", Collections.emptyMap(), scrollBody); + final Response response = performRequest("DELETE", "/_search/scroll", Collections.emptyMap(), requestHeaders, scrollBody); watch.stop(); if (getLogger().isDebugEnabled()) { @@ -995,8 +998,18 @@ public String getTransitUrl(final String index, final String type) { return transitUrl.toString(); } - private Response performRequest(final String method, final String endpoint, final Map parameters, final HttpEntity entity) throws IOException { - final Request request = new Request(method, endpoint); + private Request addRequestHeaders(final Request request, final Map headers) { + if (headers != null && !headers.isEmpty()) { + final RequestOptions.Builder requestOptionsBuilder = RequestOptions.DEFAULT.toBuilder(); + headers.forEach(requestOptionsBuilder::addHeader); + request.setOptions(requestOptionsBuilder.build()); + } + return request; + } + + private Response performRequest(final String method, final String endpoint, final Map parameters, final Map headers, final HttpEntity entity) throws IOException { + final Request baseRequest = new Request(method, endpoint); + final Request request = addRequestHeaders(baseRequest, headers); if (parameters != null && !parameters.isEmpty()) { request.addParameters(parameters); } @@ -1015,6 +1028,9 @@ private Response performRequest(final String method, final String endpoint, fina .append("\n") .append("Parameters: ") .append(prettyPrintWriter.writeValueAsString(parameters)) + .append("\n") + .append("Request Headers: ") + .append(prettyPrintWriter.writeValueAsString(headers)) .append("\n"); if (entity != null) { diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java index 38b39975628b..852993a2ba3b 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java @@ -201,7 +201,7 @@ private Record getById(final String _id, final Map context) thro final String json = mapper.writeValueAsString(query); - final SearchResponse response = clientService.search(json, index, type, null); + final SearchResponse response = clientService.search(json, index, type, null, null); if (response.getNumberOfHits() > 1) { throw new LookupFailureException(String.format("Expected 1 response, got %d for query %s", response.getNumberOfHits(), json)); @@ -252,7 +252,7 @@ private Record getByQuery(final Map query, final Map lookup(final Map coordinates) throws LookupFailureException { try { final String id = (String) coordinates.get(ID); - final Map enums = esClient.get(index, type, id, null); + final Map enums = esClient.get(index, type, id, null, null); if (enums == null) { return Optional.empty(); } else { diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/TestElasticSearchClientService.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/TestElasticSearchClientService.java index 749cadc6273f..f47df165a167 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/TestElasticSearchClientService.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/TestElasticSearchClientService.java @@ -44,61 +44,62 @@ public List verify(ConfigurationContext context, Compo } @Override - public IndexOperationResponse add(IndexOperationRequest operation, Map requestParameters) { + public IndexOperationResponse add(IndexOperationRequest operation, Map requestParameters, Map requestHeaders) { return null; } @Override - public IndexOperationResponse bulk(List operations, Map requestParameters) { + public IndexOperationResponse bulk(List operations, Map requestParameters, Map requestHeaders) { return null; } @Override - public Long count(String query, String index, String type, Map requestParameters) { + public Long count(String query, String index, String type, Map requestParameters, Map requestHeaders) { return null; } @Override - public DeleteOperationResponse deleteById(String index, String type, String id, Map requestParameters) { + public DeleteOperationResponse deleteById(String index, String type, String id, Map requestParameters, Map requestHeaders) { return null; } @Override - public DeleteOperationResponse deleteById(String index, String type, List ids, Map requestParameters) { + public DeleteOperationResponse deleteById(String index, String type, List ids, Map requestParameters, Map requestHeaders) { return null; } @Override - public DeleteOperationResponse deleteByQuery(String query, String index, String type, Map requestParameters) { + public DeleteOperationResponse deleteByQuery(String query, String index, String type, Map requestParameters, Map requestHeaders) { return null; } @Override - public UpdateOperationResponse updateByQuery(String query, String index, String type, Map requestParameters) { + public UpdateOperationResponse updateByQuery(String query, String index, String type, Map requestParameters, Map requestHeaders) { return null; } @Override - public void refresh(final String index, final Map requestParameters) { + public void refresh(final String index, final Map requestParameters, Map requestHeaders) { + // intentionally blank } @Override - public boolean exists(final String index, final Map requestParameters) { + public boolean exists(final String index, final Map requestParameters, Map requestHeaders) { return true; } @Override - public boolean documentExists(String index, String type, String id, Map requestParameters) { + public boolean documentExists(String index, String type, String id, Map requestParameters, Map requestHeaders) { return true; } @Override - public Map get(String index, String type, String id, Map requestParameters) { + public Map get(String index, String type, String id, Map requestParameters, Map requestHeaders) { return data; } @Override - public SearchResponse search(String query, String index, String type, Map requestParameters) { + public SearchResponse search(String query, String index, String type, Map requestParameters, Map requestHeaders) { List> hits = new ArrayList<>(); Map source = new HashMap<>(); source.put("_source", data); @@ -108,22 +109,22 @@ public SearchResponse search(String query, String index, String type, Map requestHeaders) { + return search(null, null, null, null, requestHeaders); } @Override - public String initialisePointInTime(String index, String keepAlive) { + public String initialisePointInTime(String index, String keepAlive, Map requestHeaders) { return null; } @Override - public DeleteOperationResponse deletePointInTime(String pitId) { + public DeleteOperationResponse deletePointInTime(String pitId, Map requestHeaders) { return null; } @Override - public DeleteOperationResponse deleteScroll(String scrollId) { + public DeleteOperationResponse deleteScroll(String scrollId, Map requestHeaders) { return null; } diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearch_IT.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearch_IT.java index 064e88ce6bb4..d368fb721e70 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearch_IT.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearch_IT.java @@ -59,7 +59,7 @@ void before() throws Exception { runner.enableControllerService(service); - service.refresh(null, null); + service.refresh(null, null, null); } @AfterAll diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java index 2e7922084719..10c428bc0150 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java @@ -222,15 +222,20 @@ void testVerifyFailedApiKeyAuth() { @Test void testBasicSearch() throws Exception { - assertBasicSearch(null); + assertBasicSearch(null, null); } @Test void testBasicSearchRequestParameters() throws Exception { - assertBasicSearch(createParameters("preference", "_local")); + assertBasicSearch(Map.of("preference", "_local"), null); } - private void assertBasicSearch(final Map requestParameters) throws JsonProcessingException { + @Test + void testBasicSearchRequestHeaders() throws Exception { + assertBasicSearch(null, Map.of("Accept", "application/json")); + } + + private void assertBasicSearch(final Map requestParameters, final Map requestHeaders) throws JsonProcessingException { final Map temp = new MapBuilder() .of("size", 10, "query", new MapBuilder().of("match_all", new HashMap<>()).build(), "aggs", new MapBuilder() @@ -244,7 +249,7 @@ private void assertBasicSearch(final Map requestParameters) thro final String query = prettyJson(temp); - final SearchResponse response = service.search(query, "messages", type, requestParameters); + final SearchResponse response = service.search(query, "messages", type, requestParameters, requestHeaders); assertNotNull(response, "Response was null"); assertEquals(15, response.getNumberOfHits(), "Wrong count"); @@ -263,13 +268,31 @@ private void assertBasicSearch(final Map requestParameters) thro "four", 4, "five", 5) .build(); - buckets.forEach( (aggRes) -> { + buckets.forEach(aggRes -> { final String key = (String) aggRes.get("key"); final Integer docCount = (Integer) aggRes.get("doc_count"); assertEquals(expected.get(key), docCount, String.format("%s did not match.", key)); }); } + @Test + void testRunasUserUnauthorized() throws Exception { + final Map temp = new MapBuilder() + .of("size", 0, "query", new MapBuilder().of("match_all", new HashMap<>()).build()).build(); + final String query = prettyJson(temp); + + final Map headers = Map.of("es-security-runas-user", "test-user"); + + + final ElasticsearchException exception = assertThrows(ElasticsearchException.class, + () -> service.search(query, "messages", type, null, headers)); + assertInstanceOf(ResponseException.class, exception.getCause()); + assertEquals(403, ((ResponseException) exception.getCause()).getResponse().getStatusLine().getStatusCode()); + + final String response = new String(((ResponseException) exception.getCause()).getResponse().getEntity().getContent().readAllBytes()); + assertTrue(response.contains("run as [test-user]"), "Unexpected ResponseException: " + response); + } + @SuppressWarnings("unchecked") @Test void testSearchEmptySource() throws Exception { @@ -280,7 +303,8 @@ void testSearchEmptySource() throws Exception { final String query = prettyJson(temp); - final SearchResponse response = service.search(query, "messages", type, createParameters("_source", "not_exists")); + final SearchResponse response = service.search(query, "messages", type, Map.of("_source", "not_exists"), + Map.of("ES-Client-Authentication", "sharedsecret foobar")); assertNotNull(response, "Response was null"); assertNotNull(response.getHits(), "Hits was null"); @@ -300,7 +324,7 @@ void testSearchNoSource() throws Exception { final String query = prettyJson(temp); - final SearchResponse response = service.search(query, "no_source", type, null); + final SearchResponse response = service.search(query, "no_source", type, null, null); assertNotNull(response, "Response was null"); assertNotNull(response.getHits(), "Hits was null"); @@ -321,7 +345,7 @@ void testV6SearchWarnings() throws JsonProcessingException { ).build()) .build()); final String type = "a-type"; - final SearchResponse response = service.search(query, INDEX, type, null); + final SearchResponse response = service.search(query, INDEX, type, null, null); assertFalse(response.getWarnings().isEmpty(), "Missing warnings"); } @@ -332,7 +356,7 @@ void testV7SearchWarnings() throws JsonProcessingException { .of("size", 1, "query", new MapBuilder().of("match_all", new HashMap<>()).build()) .build()); final String type = "a-type"; - final SearchResponse response = service.search(query, INDEX, type, null); + final SearchResponse response = service.search(query, INDEX, type, null, null); assertFalse(response.getWarnings().isEmpty(), "Missing warnings"); } @@ -357,7 +381,7 @@ void testScroll() throws JsonProcessingException { .build()); // initiate the scroll - final SearchResponse response = service.search(query, INDEX, type, Collections.singletonMap("scroll", "10s")); + final SearchResponse response = service.search(query, INDEX, type, Collections.singletonMap("scroll", "10s"), null); assertNotNull(response, "Response was null"); assertEquals(15, response.getNumberOfHits(), "Wrong count"); @@ -376,8 +400,8 @@ void testScroll() throws JsonProcessingException { assertEquals(5, buckets.size(), "Buckets count is wrong"); // scroll the next page - final Map parameters = createParameters("scroll_id", response.getScrollId(), "scroll", "10s"); - final SearchResponse scrollResponse = service.scroll(prettyJson(parameters)); + final Map parameters = Map.of("scroll_id", response.getScrollId(), "scroll", "10s"); + final SearchResponse scrollResponse = service.scroll(prettyJson(parameters), Map.of("Accept", "application/json")); assertNotNull(scrollResponse, "Scroll Response was null"); assertEquals(15, scrollResponse.getNumberOfHits(), "Wrong count"); @@ -393,11 +417,11 @@ void testScroll() throws JsonProcessingException { assertNotEquals(scrollResponse.getHits(), response.getHits(), "Same results"); // delete the scroll - DeleteOperationResponse deleteResponse = service.deleteScroll(scrollResponse.getScrollId()); + DeleteOperationResponse deleteResponse = service.deleteScroll(scrollResponse.getScrollId(), Map.of("Accept", "application/json")); assertNotNull(deleteResponse, "Delete Response was null"); // delete scroll again (should now be unknown but the 404 caught and ignored) - deleteResponse = service.deleteScroll(scrollResponse.getScrollId()); + deleteResponse = service.deleteScroll(scrollResponse.getScrollId(), Map.of("Accept", "application/json")); assertNotNull(deleteResponse, "Delete Response was null"); } @@ -418,7 +442,7 @@ void testSearchAfter() throws JsonProcessingException { final String query = prettyJson(queryMap); // search first page - final SearchResponse response = service.search(query, INDEX, type, null); + final SearchResponse response = service.search(query, INDEX, type, null, null); assertNotNull(response, "Response was null"); assertEquals(15, response.getNumberOfHits(), "Wrong count"); @@ -441,7 +465,7 @@ void testSearchAfter() throws JsonProcessingException { page2QueryMap.put("search_after", MAPPER.readValue(response.getSearchAfter(), List.class)); page2QueryMap.remove("aggs"); final String secondPage = prettyJson(page2QueryMap); - final SearchResponse secondResponse = service.search(secondPage, INDEX, type, null); + final SearchResponse secondResponse = service.search(secondPage, INDEX, type, null, null); assertNotNull(secondResponse, "Second Response was null"); assertEquals(15, secondResponse.getNumberOfHits(), "Wrong count"); @@ -465,7 +489,7 @@ void testPointInTime() throws JsonProcessingException { assumeTrue(majorVersion >= 8 || (majorVersion == 7 && minorVersion >= 10), "Requires version 7.10+"); // initialise - final String pitId = service.initialisePointInTime(INDEX, "10s"); + final String pitId = service.initialisePointInTime(INDEX, "10s", Map.of("Accept", "application/json")); final Map queryMap = new MapBuilder() .of("size", 2, "query", new MapBuilder().of("match_all", new HashMap<>()).build()) @@ -484,7 +508,7 @@ void testPointInTime() throws JsonProcessingException { final String query = prettyJson(queryMap); // search first page - final SearchResponse response = service.search(query, null, type, null); + final SearchResponse response = service.search(query, null, type, null, null); assertNotNull(response, "Response was null"); assertEquals(15, response.getNumberOfHits(), "Wrong count"); @@ -507,7 +531,7 @@ void testPointInTime() throws JsonProcessingException { page2QueryMap.put("search_after", MAPPER.readValue(response.getSearchAfter(), List.class)); page2QueryMap.remove("aggs"); final String secondPage = prettyJson(page2QueryMap); - final SearchResponse secondResponse = service.search(secondPage, null, type, null); + final SearchResponse secondResponse = service.search(secondPage, null, type, null, null); assertNotNull(secondResponse, "Second Response was null"); assertEquals(15, secondResponse.getNumberOfHits(), "Wrong count"); @@ -523,11 +547,11 @@ void testPointInTime() throws JsonProcessingException { assertNotEquals(secondResponse.getHits(), response.getHits(), "Same results"); // delete pitId - DeleteOperationResponse deleteResponse = service.deletePointInTime(pitId); + DeleteOperationResponse deleteResponse = service.deletePointInTime(pitId, null); assertNotNull(deleteResponse, "Delete Response was null"); // delete pitId again (should now be unknown but the 404 caught and ignored) - deleteResponse = service.deletePointInTime(pitId); + deleteResponse = service.deletePointInTime(pitId, null); assertNotNull(deleteResponse, "Delete Response was null"); } @@ -537,7 +561,7 @@ void testDeleteByQuery() throws Exception { .of("query", new MapBuilder() .of("match", new MapBuilder().of("msg", "five").build()) .build()).build()); - final DeleteOperationResponse response = service.deleteByQuery(query, INDEX, type, null); + final DeleteOperationResponse response = service.deleteByQuery(query, INDEX, type, null, Map.of("Accept", "application/json")); assertNotNull(response); } @@ -549,7 +573,7 @@ void testDeleteByQueryRequestParameters() throws Exception { .build()).build()); final Map parameters = new HashMap<>(); parameters.put("refresh", "true"); - final DeleteOperationResponse response = service.deleteByQuery(query, INDEX, type, parameters); + final DeleteOperationResponse response = service.deleteByQuery(query, INDEX, type, parameters, Map.of("Accept", "application/json")); assertNotNull(response); } @@ -559,7 +583,7 @@ void testUpdateByQuery() throws Exception { .of("query", new MapBuilder() .of("match", new MapBuilder().of("msg", "four").build()) .build()).build()); - final UpdateOperationResponse response = service.updateByQuery(query, INDEX, type, null); + final UpdateOperationResponse response = service.updateByQuery(query, INDEX, type, null, null); assertNotNull(response); } @@ -572,25 +596,25 @@ void testUpdateByQueryRequestParameters() throws Exception { final Map parameters = new HashMap<>(); parameters.put("refresh", "true"); parameters.put("slices", "1"); - final UpdateOperationResponse response = service.updateByQuery(query, INDEX, type, parameters); + final UpdateOperationResponse response = service.updateByQuery(query, INDEX, type, parameters, Map.of("Accept", "application/json")); assertNotNull(response); } @Test void testDeleteById() throws Exception { final String ID = "1"; - final Map originalDoc = service.get(INDEX, type, ID, null); + final Map originalDoc = service.get(INDEX, type, ID, null, Map.of("Accept", "application/json")); try { - final DeleteOperationResponse response = service.deleteById(INDEX, type, ID, null); + final DeleteOperationResponse response = service.deleteById(INDEX, type, ID, null, null); assertNotNull(response); final ElasticsearchException ee = assertThrows(ElasticsearchException.class, () -> - service.get(INDEX, type, ID, null)); + service.get(INDEX, type, ID, null, null)); assertTrue(ee.isNotFound()); - final Map doc = service.get(INDEX, type, "2", null); + final Map doc = service.get(INDEX, type, "2", null, null); assertNotNull(doc); } finally { // replace the deleted doc - service.add(new IndexOperationRequest(INDEX, type, "1", originalDoc, IndexOperationRequest.Operation.Index, null, false, null, null), null); + service.add(new IndexOperationRequest(INDEX, type, "1", originalDoc, IndexOperationRequest.Operation.Index, null, false, null, null), null, Map.of("Accept", "application/json")); waitForIndexRefresh(); // (affects later tests using _search or _bulk) } } @@ -599,7 +623,7 @@ void testDeleteById() throws Exception { void testGet() { for (int index = 1; index <= 15; index++) { final String id = String.valueOf(index); - final Map doc = service.get(INDEX, type, id, null); + final Map doc = service.get(INDEX, type, id, null, null); assertNotNull(doc, "Doc was null"); assertNotNull(doc.get("msg"), "${doc.toString()}\t${doc.keySet().toString()}"); } @@ -607,27 +631,27 @@ void testGet() { @Test void testGetEmptySource() { - final Map doc = service.get(INDEX, type, "1", Collections.singletonMap("_source", "not_exist")); + final Map doc = service.get(INDEX, type, "1", Collections.singletonMap("_source", "not_exist"), null); assertNotNull(doc, "Doc was null"); assertTrue(doc.isEmpty(), "Doc was not empty"); } @Test void testGetNoSource() { - final Map doc = service.get("no_source", type, "1", null); + final Map doc = service.get("no_source", type, "1", null, Map.of("Accept", "application/json")); assertNotNull(doc, "Doc was null"); assertTrue(doc.isEmpty(), "Doc was not empty"); } @Test void testGetNotFound() { - final ElasticsearchException ee = assertThrows(ElasticsearchException.class, () -> service.get(INDEX, type, "not_found", null)); + final ElasticsearchException ee = assertThrows(ElasticsearchException.class, () -> service.get(INDEX, type, "not_found", null, null)); assertTrue(ee.isNotFound()); } @Test void testExists() { - assertTrue(service.exists(INDEX, null), "index does not exist"); - assertFalse(service.exists("index-does-not-exist", null), "index exists"); + assertTrue(service.exists(INDEX, null, null), "index does not exist"); + assertFalse(service.exists("index-does-not-exist", null, Map.of("Accept", "application/json")), "index exists"); } @Test @@ -637,7 +661,7 @@ void testCompression() { runner.enableControllerService(service); runner.assertValid(service); - assertTrue(service.exists(INDEX, null), "index does not exist"); + assertTrue(service.exists(INDEX, null, null), "index does not exist"); } @Test @@ -647,7 +671,7 @@ void testNoMetaHeader() { runner.enableControllerService(service); runner.assertValid(service); - assertTrue(service.exists(INDEX, null), "index does not exist"); + assertTrue(service.exists(INDEX, null, null), "index does not exist"); } @Test @@ -657,7 +681,7 @@ void testStrictDeprecation() { runner.enableControllerService(service); runner.assertValid(service); - assertTrue(service.exists(INDEX, null), "index does not exist"); + assertTrue(service.exists(INDEX, null, null), "index does not exist"); } @Test @@ -667,7 +691,7 @@ void testNodeSelector() { runner.enableControllerService(service); runner.assertValid(service); - assertTrue(service.exists(INDEX, null), "index does not exist"); + assertTrue(service.exists(INDEX, null, null), "index does not exist"); } @Test @@ -678,7 +702,7 @@ void testRestClientRequestHeaders() { runner.enableControllerService(service); runner.assertValid(service); - assertTrue(service.exists(INDEX, null), "index does not exist"); + assertTrue(service.exists(INDEX, null, null), "index does not exist"); } @Test @@ -692,7 +716,7 @@ void testSniffer() { runner.enableControllerService(service); runner.assertValid(service); - assertTrue(service.exists(INDEX, null), "index does not exist"); + assertTrue(service.exists(INDEX, null, null), "index does not exist"); } @Test @@ -711,21 +735,22 @@ void testNullSuppression() throws InterruptedException { Collections.singletonList( new IndexOperationRequest("nulls", type, "1", doc, IndexOperationRequest.Operation.Index, null, false, null, null) ), - null + null, + Map.of("Accept", "application/json") ); assertNotNull(response); waitForIndexRefresh(); - Map result = service.get("nulls", type, "1", null); + Map result = service.get("nulls", type, "1", null, null); assertEquals(doc, result); // suppress nulls suppressNulls(true); - response = service.bulk(Collections.singletonList(new IndexOperationRequest("nulls", type, "2", doc, IndexOperationRequest.Operation.Index, null, false, null, null)), null); + response = service.bulk(Collections.singletonList(new IndexOperationRequest("nulls", type, "2", doc, IndexOperationRequest.Operation.Index, null, false, null, null)), null, null); assertNotNull(response); waitForIndexRefresh(); - result = service.get("nulls", type, "2", null); + result = service.get("nulls", type, "2", null, null); assertTrue(result.keySet().containsAll(Arrays.asList("msg", "is_blank")), "Non-nulls (present): " + result); assertFalse(result.containsKey("is_null"), "is_null (should be omitted): " + result); assertFalse(result.containsKey("is_empty"), "is_empty (should be omitted): " + result); @@ -756,7 +781,7 @@ void testBulkAddTwoIndexes() throws Exception { put("msg", "test"); }}, IndexOperationRequest.Operation.Index, null, false, null, null)); } - final IndexOperationResponse response = service.bulk(payload, createParameters("refresh", "true")); + final IndexOperationResponse response = service.bulk(payload, Map.of("refresh", "true"), null); assertNotNull(response); waitForIndexRefresh(); @@ -764,9 +789,9 @@ void testBulkAddTwoIndexes() throws Exception { * Now, check to ensure that both indexes got populated appropriately. */ final String query = "{ \"query\": { \"match_all\": {}}}"; - final Long indexA = service.count(query, "bulk_a", type, null); - final Long indexB = service.count(query, "bulk_b", type, null); - final Long indexC = service.count(query, "bulk_c", type, null); + final Long indexA = service.count(query, "bulk_a", type, null, Map.of("Accept", "application/json")); + final Long indexB = service.count(query, "bulk_b", type, null, null); + final Long indexC = service.count(query, "bulk_c", type, null, null); assertNotNull(indexA); assertNotNull(indexB); @@ -776,7 +801,7 @@ void testBulkAddTwoIndexes() throws Exception { assertEquals(10, indexB.intValue()); assertEquals(5, indexC.intValue()); - final Long total = service.count(query, "bulk_a,bulk_b,bulk_c", type, null); + final Long total = service.count(query, "bulk_a,bulk_b,bulk_c", type, null, null); assertNotNull(total); assertEquals(25, total.intValue()); } @@ -793,16 +818,16 @@ void testBulkRequestParametersAndBulkHeaders() { payload.add(new IndexOperationRequest("bulk_c", type, String.valueOf(x), new MapBuilder().of("msg", "test").build(), IndexOperationRequest.Operation.Index, null, false, null, null)); } - final IndexOperationResponse response = service.bulk(payload, createParameters("refresh", "true")); + final IndexOperationResponse response = service.bulk(payload, Map.of("refresh", "true"), null); assertNotNull(response); /* * Now, check to ensure that all indices got populated and refreshed appropriately. */ final String query = "{ \"query\": { \"match_all\": {}}}"; - final Long indexA = service.count(query, "bulk_a", type, null); - final Long indexB = service.count(query, "bulk_b", type, null); - final Long indexC = service.count(query, "bulk_c", type, null); + final Long indexA = service.count(query, "bulk_a", type, null, null); + final Long indexB = service.count(query, "bulk_b", type, null, null); + final Long indexC = service.count(query, "bulk_c", type, null, null); assertNotNull(indexA); assertNotNull(indexB); @@ -812,7 +837,7 @@ void testBulkRequestParametersAndBulkHeaders() { assertEquals(10, indexB.intValue()); assertEquals(5, indexC.intValue()); - final Long total = service.count(query, "bulk_*", type, null); + final Long total = service.count(query, "bulk_*", type, null, null); assertNotNull(total); assertEquals(25, total.intValue()); } @@ -821,7 +846,7 @@ void testBulkRequestParametersAndBulkHeaders() { void testUnknownBulkHeader() { final IndexOperationRequest failingRequest = new IndexOperationRequest("bulk_c", type, "1", new MapBuilder().of("msg", "test").build(), IndexOperationRequest.Operation.Index, null, false, null, Collections.singletonMap("not_exist", "true")); - final ElasticsearchException ee = assertThrows(ElasticsearchException.class, () -> service.add(failingRequest, null)); + final ElasticsearchException ee = assertThrows(ElasticsearchException.class, () -> service.add(failingRequest, null, null)); assertInstanceOf(ResponseException.class, ee.getCause()); assertTrue(ee.getCause().getMessage().contains("Action/metadata line [1] contains an unknown parameter [not_exist]")); } @@ -835,7 +860,7 @@ void testDynamicTemplates() { IndexOperationRequest.Operation.Index, null, false, new MapBuilder().of("hello", "test_text").build(), null) ); - final IndexOperationResponse response = service.bulk(payload, createParameters("refresh", "true")); + final IndexOperationResponse response = service.bulk(payload, Map.of("refresh", "true"), null); assertNotNull(response); /* @@ -856,8 +881,8 @@ void testUpdateAndUpsert() throws InterruptedException { final Map doc = new HashMap<>(); doc.put("msg", "Buongiorno, mondo"); doc.put("counter", 1); - service.add(new IndexOperationRequest(INDEX, type, TEST_ID, doc, IndexOperationRequest.Operation.Index, null, false, null, null), createParameters("refresh", "true")); - Map result = service.get(INDEX, type, TEST_ID, null); + service.add(new IndexOperationRequest(INDEX, type, TEST_ID, doc, IndexOperationRequest.Operation.Index, null, false, null, null), Map.of("refresh", "true"), null); + Map result = service.get(INDEX, type, TEST_ID, null, null); assertEquals(doc, result, "Not the same"); final Map updates = new HashMap<>(); @@ -866,8 +891,8 @@ void testUpdateAndUpsert() throws InterruptedException { merged.putAll(updates); merged.putAll(doc); IndexOperationRequest request = new IndexOperationRequest(INDEX, type, TEST_ID, updates, IndexOperationRequest.Operation.Update, null, false, null, null); - service.add(request, createParameters("refresh", "true")); - result = service.get(INDEX, type, TEST_ID, null); + service.add(request, Map.of("refresh", "true"), null); + result = service.get(INDEX, type, TEST_ID, null, null); assertTrue(result.containsKey("from")); assertTrue(result.containsKey("counter")); assertTrue(result.containsKey("msg")); @@ -878,8 +903,8 @@ void testUpdateAndUpsert() throws InterruptedException { upsertItems.put("upsert_2", 1); upsertItems.put("upsert_3", true); request = new IndexOperationRequest(INDEX, type, UPSERTED_ID, upsertItems, IndexOperationRequest.Operation.Upsert, null, false, null, null); - service.add(request, createParameters("refresh", "true")); - result = service.get(INDEX, type, UPSERTED_ID, null); + service.add(request, Map.of("refresh", "true"), null); + result = service.get(INDEX, type, UPSERTED_ID, null, null); assertEquals(upsertItems, result); final Map upsertDoc = new HashMap<>(); @@ -890,14 +915,14 @@ void testUpdateAndUpsert() throws InterruptedException { script.put("params", Collections.singletonMap("count", 2)); // apply script to existing document request = new IndexOperationRequest(INDEX, type, TEST_ID, upsertDoc, IndexOperationRequest.Operation.Upsert, script, false, null, null); - service.add(request, createParameters("refresh", "true")); - result = service.get(INDEX, type, TEST_ID, null); + service.add(request, Map.of("refresh", "true"), null); + result = service.get(INDEX, type, TEST_ID, null, null); assertEquals(doc.get("msg"), result.get("msg")); assertEquals(3, result.get("counter")); // index document that doesn't already exist (don't apply script) request = new IndexOperationRequest(INDEX, type, UPSERT_SCRIPT_ID, upsertDoc, IndexOperationRequest.Operation.Upsert, script, false, null, null); - service.add(request, createParameters("refresh", "true")); - result = service.get(INDEX, type, UPSERT_SCRIPT_ID, null); + service.add(request, Map.of("refresh", "true"), null); + result = service.get(INDEX, type, UPSERT_SCRIPT_ID, null, null); assertNull(result.get("counter")); assertEquals(upsertDoc, result); @@ -908,23 +933,23 @@ void testUpdateAndUpsert() throws InterruptedException { upsertScript.put("params", Collections.singletonMap("count", 2)); // no script execution if doc found (without scripted_upsert) request = new IndexOperationRequest(INDEX, type, SCRIPTED_UPSERT_ID, emptyUpsertDoc, IndexOperationRequest.Operation.Upsert, upsertScript, false, null, null); - service.add(request, createParameters("refresh", "true")); - assertFalse(service.documentExists(INDEX, type, SCRIPTED_UPSERT_ID, null)); + service.add(request, Map.of("refresh", "true"), null); + assertFalse(service.documentExists(INDEX, type, SCRIPTED_UPSERT_ID, null, Map.of("Accept", "application/json"))); // script execution with no doc found (with scripted_upsert) - doc not create, no "upsert" doc provided (empty objects suppressed) suppressNulls(true); request = new IndexOperationRequest(INDEX, type, SCRIPTED_UPSERT_ID, emptyUpsertDoc, IndexOperationRequest.Operation.Upsert, upsertScript, true, null, null); - service.add(request, createParameters("refresh", "true")); - assertFalse(service.documentExists(INDEX, type, SCRIPTED_UPSERT_ID, null)); + service.add(request, Map.of("refresh", "true"), null); + assertFalse(service.documentExists(INDEX, type, SCRIPTED_UPSERT_ID, null, null)); // script execution with no doc found (with scripted_upsert) - doc created, empty "upsert" doc provided suppressNulls(false); request = new IndexOperationRequest(INDEX, type, SCRIPTED_UPSERT_ID, emptyUpsertDoc, IndexOperationRequest.Operation.Upsert, upsertScript, true, null, null); - service.add(request, createParameters("refresh", "true")); - result = service.get(INDEX, type, SCRIPTED_UPSERT_ID, null); + service.add(request, Map.of("refresh", "true"), null); + result = service.get(INDEX, type, SCRIPTED_UPSERT_ID, null, null); assertEquals(2, result.get("counter")); // script execution with no doc found (with scripted_upsert) - doc updated request = new IndexOperationRequest(INDEX, type, SCRIPTED_UPSERT_ID, emptyUpsertDoc, IndexOperationRequest.Operation.Upsert, upsertScript, true, null, null); - service.add(request, createParameters("refresh", "true")); - result = service.get(INDEX, type, SCRIPTED_UPSERT_ID, null); + service.add(request, Map.of("refresh", "true"), null); + result = service.get(INDEX, type, SCRIPTED_UPSERT_ID, null, null); assertEquals(4, result.get("counter")); } finally { final List deletes = new ArrayList<>(); @@ -932,12 +957,12 @@ void testUpdateAndUpsert() throws InterruptedException { deletes.add(new IndexOperationRequest(INDEX, type, UPSERTED_ID, null, IndexOperationRequest.Operation.Delete, null, false, null, null)); deletes.add(new IndexOperationRequest(INDEX, type, UPSERT_SCRIPT_ID, null, IndexOperationRequest.Operation.Delete, null, false, null, null)); deletes.add(new IndexOperationRequest(INDEX, type, SCRIPTED_UPSERT_ID, null, IndexOperationRequest.Operation.Delete, null, false, null, null)); - assertFalse(service.bulk(deletes, createParameters("refresh", "true")).hasErrors()); + assertFalse(service.bulk(deletes, Map.of("refresh", "true"), null).hasErrors()); waitForIndexRefresh(); // wait 1s for index refresh (doesn't prevent GET but affects later tests using _search or _bulk) - assertFalse(service.documentExists(INDEX, type, TEST_ID, null)); - assertFalse(service.documentExists(INDEX, type, UPSERTED_ID, null)); - assertFalse(service.documentExists(INDEX, type, UPSERT_SCRIPT_ID, null)); - assertFalse(service.documentExists(INDEX, type, SCRIPTED_UPSERT_ID, null)); + assertFalse(service.documentExists(INDEX, type, TEST_ID, null, null)); + assertFalse(service.documentExists(INDEX, type, UPSERTED_ID, null, null)); + assertFalse(service.documentExists(INDEX, type, UPSERT_SCRIPT_ID, null, null)); + assertFalse(service.documentExists(INDEX, type, SCRIPTED_UPSERT_ID, null, null)); } } @@ -952,7 +977,7 @@ void testGetBulkResponsesWithErrors() { new IndexOperationRequest(INDEX, type, "1", new MapBuilder().of("msg", "one", "intField", "notaninteger").build(), IndexOperationRequest.Operation.Index, null, false, null, null) // can't parse int field ); - final IndexOperationResponse response = service.bulk(ops, createParameters("refresh", "true")); + final IndexOperationResponse response = service.bulk(ops, Map.of("refresh", "true"), null); assertTrue(response.hasErrors()); assertEquals(2, response.getItems().stream().filter(it -> { final Optional first = it.keySet().stream().findFirst(); @@ -965,19 +990,6 @@ void testGetBulkResponsesWithErrors() { }).count()); } - private Map createParameters(final String... extra) { - if (extra.length % 2 == 1) { //Putting this here to help maintainers catch stupid bugs before they happen - throw new RuntimeException("createParameters must have an even number of String parameters."); - } - - final Map parameters = new HashMap<>(); - for (int index = 0; index < extra.length; index += 2) { - parameters.put(extra[index], extra[index + 1]); - } - - return parameters; - } - private static void waitForIndexRefresh() throws InterruptedException { Thread.sleep(1000); } diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearch.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearch.java index 9a5f83be30e0..c63859d856c6 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearch.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearch.java @@ -71,7 +71,7 @@ public abstract class AbstractByQueryElasticsearch extends AbstractProcessor imp abstract OperationResponse performOperation(final ElasticSearchClientService clientService, final String query, final String index, final String type, - final Map requestParameters); + final Map requestParameters, final Map requestHeaders); @Override public Set getRelationships() { @@ -131,7 +131,8 @@ public void onTrigger(final ProcessContext context, final ProcessSession session ? context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue() : null; - final OperationResponse or = performOperation(clientService.get(), query, index, type, getDynamicProperties(context, input)); + final OperationResponse or = performOperation(clientService.get(), query, index, type, + getRequestParametersFromDynamicProperties(context, input), getRequestHeadersFromDynamicProperties(context, input)); if (input == null) { input = session.create(); diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java index fe6ec6f97371..99ff0ef2b2df 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java @@ -104,13 +104,14 @@ SearchResponse doQuery(final PaginatedJsonQueryParameters paginatedJsonQueryPara final boolean newQuery = StringUtils.isBlank(paginatedJsonQueryParameters.getPageExpirationTimestamp()) || expiredQuery; // execute query/scroll - final String queryJson = updateQueryJson(newQuery, paginatedJsonQueryParameters); - final Map requestParameters = getDynamicProperties(context, input); + final String queryJson = updateQueryJson(newQuery, paginatedJsonQueryParameters, context, input); + final Map requestParameters = getRequestParametersFromDynamicProperties(context, input); + final Map requestHeaders = getRequestHeadersFromDynamicProperties(context, input); if (!newQuery && paginationType == PaginationType.SCROLL) { if (!requestParameters.isEmpty()) { getLogger().warn("Elasticsearch _scroll API does not accept query parameters, ignoring dynamic properties {}", requestParameters.keySet()); } - response = clientService.get().scroll(queryJson); + response = clientService.get().scroll(queryJson, requestHeaders); } else { if (paginationType == PaginationType.SCROLL) { requestParameters.put("scroll", paginatedJsonQueryParameters.getKeepAlive()); @@ -121,7 +122,8 @@ SearchResponse doQuery(final PaginatedJsonQueryParameters paginatedJsonQueryPara // Point in Time uses general /_search API not /index/_search paginationType == PaginationType.POINT_IN_TIME ? null : paginatedJsonQueryParameters.getIndex(), paginatedJsonQueryParameters.getType(), - requestParameters + requestParameters, + requestHeaders ); paginatedJsonQueryParameters.setPitId(response.getPitId()); paginatedJsonQueryParameters.setSearchAfter(response.getSearchAfter()); @@ -142,7 +144,7 @@ SearchResponse doQuery(final PaginatedJsonQueryParameters paginatedJsonQueryPara if (response.getHits().isEmpty()) { getLogger().debug("No more results for paginated query, clearing Elasticsearch resources"); - clearElasticsearchState(context, response); + clearElasticsearchState(context, response, input); } return response; @@ -185,7 +187,7 @@ private void prepareNextPageQuery(final ObjectNode queryJson, final PaginatedJso } } - private String updateQueryJson(final boolean newQuery, final PaginatedJsonQueryParameters paginatedJsonQueryParameters) throws IOException { + private String updateQueryJson(final boolean newQuery, final PaginatedJsonQueryParameters paginatedJsonQueryParameters, final ProcessContext context, final FlowFile input) throws IOException { final ObjectNode queryJson = mapper.readValue(paginatedJsonQueryParameters.getQuery(), ObjectNode.class); if (!newQuery) { @@ -199,7 +201,8 @@ private String updateQueryJson(final boolean newQuery, final PaginatedJsonQueryP if (paginationType == PaginationType.POINT_IN_TIME) { // add pit_id to query JSON final String queryPitId = newQuery - ? clientService.get().initialisePointInTime(paginatedJsonQueryParameters.getIndex(), paginatedJsonQueryParameters.getKeepAlive()) + ? clientService.get().initialisePointInTime( + paginatedJsonQueryParameters.getIndex(), paginatedJsonQueryParameters.getKeepAlive(), getRequestHeadersFromDynamicProperties(context, input)) : paginatedJsonQueryParameters.getPitId(); final ObjectNode pit = JsonNodeFactory.instance.objectNode().put("id", queryPitId); @@ -286,19 +289,20 @@ void updateQueryParameters(final PaginatedJsonQueryParameters paginatedJsonQuery ); } - void clearElasticsearchState(final ProcessContext context, final SearchResponse response) { + void clearElasticsearchState(final ProcessContext context, final SearchResponse response, final FlowFile input) { try { + final Map requestHeaders = getRequestHeadersFromDynamicProperties(context, input); if (paginationType == PaginationType.SCROLL) { final String scrollId = getScrollId(context, response); if (StringUtils.isNotBlank(scrollId)) { - clientService.get().deleteScroll(scrollId); + clientService.get().deleteScroll(scrollId, requestHeaders); } } else if (paginationType == PaginationType.POINT_IN_TIME) { final String pitId = getPitId(context, response); if (StringUtils.isNotBlank(pitId)) { - clientService.get().deletePointInTime(pitId); + clientService.get().deletePointInTime(pitId, requestHeaders); } } } catch (final Exception ex) { diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearch.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearch.java index 23c3f56aa843..7c6a86e09919 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearch.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearch.java @@ -19,6 +19,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.annotation.behavior.DynamicProperties; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; @@ -70,14 +71,23 @@ "The processor will retrieve multiple pages of results until either no more results are available or the Pagination Keep Alive expiration is reached, " + "after which the Range query will automatically update the field constraint based on the last retrieved Document value.") @SeeAlso({SearchElasticsearch.class, PaginatedJsonQueryElasticsearch.class}) -@DynamicProperty( - name = "The name of a URL query parameter to add", - value = "The value of the URL query parameter", - expressionLanguageScope = ExpressionLanguageScope.ENVIRONMENT, - description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing. " + - "These parameters will override any matching parameters in the query request body. " + - "For SCROLL type queries, these parameters are only used in the initial (first page) query as the " + - "Elasticsearch Scroll API does not support the same query parameters for subsequent pages of data.") +@DynamicProperties({ + @DynamicProperty( + name = "The name of the HTTP request header", + value = "A Record Path expression to retrieve the HTTP request header value", + expressionLanguageScope = ExpressionLanguageScope.ENVIRONMENT, + description = "Prefix: " + ElasticsearchRestProcessor.DYNAMIC_PROPERTY_PREFIX_REQUEST_HEADER + + " - adds the specified property name/value as a HTTP request header in the Elasticsearch request. " + + "If the Record Path expression results in a null or blank value, the HTTP request header will be omitted."), + @DynamicProperty( + name = "The name of a URL query parameter to add", + value = "The value of the URL query parameter", + expressionLanguageScope = ExpressionLanguageScope.ENVIRONMENT, + description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing. " + + "These parameters will override any matching parameters in the query request body. " + + "For SCROLL type queries, these parameters are only used in the initial (first page) query as the " + + "Elasticsearch Scroll API does not support the same query parameters for subsequent pages of data.") +}) @Stateful(scopes = Scope.CLUSTER, description = "The pagination state (scrollId, searchAfter, pitId, hitCount, pageCount, pageExpirationTimestamp, trackingRangeValue) " + "is retained in between invocations of this processor until the Scroll/PiT has expired " + "(when the current time is later than the last query execution plus the Pagination Keep Alive interval).") diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java index d17c56102d56..d545de88162e 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java @@ -17,6 +17,7 @@ package org.apache.nifi.processors.elasticsearch; +import org.apache.nifi.annotation.behavior.DynamicProperties; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.WritesAttribute; @@ -39,12 +40,21 @@ @Tags({ "elastic", "elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7", "elasticsearch8", "delete", "query"}) @CapabilityDescription("Delete from an Elasticsearch index using a query. The query can be loaded from a flowfile body " + "or from the Query parameter.") -@DynamicProperty( - name = "The name of a URL query parameter to add", - value = "The value of the URL query parameter", - expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, - description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing. " + - "These parameters will override any matching parameters in the query request body") +@DynamicProperties({ + @DynamicProperty( + name = "The name of the HTTP request header", + value = "A Record Path expression to retrieve the HTTP request header value", + expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, + description = "Prefix: " + ElasticsearchRestProcessor.DYNAMIC_PROPERTY_PREFIX_REQUEST_HEADER + + " - adds the specified property name/value as a HTTP request header in the Elasticsearch request. " + + "If the Record Path expression results in a null or blank value, the HTTP request header will be omitted."), + @DynamicProperty( + name = "The name of a URL query parameter to add", + value = "The value of the URL query parameter", + expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, + description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing. " + + "These parameters will override any matching parameters in the query request body") +}) public class DeleteByQueryElasticsearch extends AbstractByQueryElasticsearch { static final String TOOK_ATTRIBUTE = "elasticsearch.delete.took"; static final String ERROR_ATTRIBUTE = "elasticsearch.delete.error"; @@ -69,7 +79,8 @@ String getErrorAttribute() { @Override OperationResponse performOperation(final ElasticSearchClientService clientService, final String query, - final String index, final String type, final Map requestParameters) { - return clientService.deleteByQuery(query, index, type, requestParameters); + final String index, final String type, final Map requestParameters, + final Map requestHeaders) { + return clientService.deleteByQuery(query, index, type, requestParameters, requestHeaders); } } diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticsearchRestProcessor.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticsearchRestProcessor.java index 18fe8080be8c..46e06aadcebb 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticsearchRestProcessor.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticsearchRestProcessor.java @@ -50,6 +50,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Predicate; import java.util.stream.Collectors; public interface ElasticsearchRestProcessor extends Processor, VerifiableProcessor { @@ -58,6 +59,9 @@ public interface ElasticsearchRestProcessor extends Processor, VerifiableProcess String VERIFICATION_STEP_QUERY_JSON_VALID = "Elasticsearch Query JSON Valid"; String VERIFICATION_STEP_QUERY_VALID = "Elasticsearch Query Valid"; String DEFAULT_MAX_STRING_LENGTH = "20 MB"; + String DYNAMIC_PROPERTY_PREFIX_REQUEST_HEADER = "HEADER:"; + Predicate> REQUEST_HEADER_FILTER = entry -> + StringUtils.startsWith(entry.getKey().getName(), DYNAMIC_PROPERTY_PREFIX_REQUEST_HEADER); PropertyDescriptor INDEX = new PropertyDescriptor.Builder() .name("el-rest-fetch-index") @@ -321,17 +325,33 @@ default void addSortClause(final Map query, final Map getDynamicProperties(final ProcessContext context, final FlowFile flowFile) { - return getDynamicProperties(context, flowFile != null ? flowFile.getAttributes() : null); + default Map getRequestParametersFromDynamicProperties(final ProcessContext context, final FlowFile flowFile) { + return getRequestParametersFromDynamicProperties(context, flowFile != null ? flowFile.getAttributes() : null); } - default Map getDynamicProperties(final ProcessContext context, final Map attributes) { + default Map getRequestParametersFromDynamicProperties(final ProcessContext context, final Map attributes) { + return getDynamicProperties(context, attributes, Predicate.not(REQUEST_HEADER_FILTER)); + } + + default Map getRequestHeadersFromDynamicProperties(final ProcessContext context, final FlowFile flowFile) { + return getRequestHeadersFromDynamicProperties(context, flowFile != null ? flowFile.getAttributes() : null); + } + + default Map getRequestHeadersFromDynamicProperties(final ProcessContext context, final Map attributes) { + final Map dynamicProperties = getDynamicProperties(context, attributes, REQUEST_HEADER_FILTER); + return dynamicProperties.entrySet().stream().collect(Collectors.toMap( + e -> e.getKey().replaceFirst("^" + DYNAMIC_PROPERTY_PREFIX_REQUEST_HEADER, ""), + Map.Entry::getValue)); + } + + default Map getDynamicProperties(final ProcessContext context, final Map attributes, final Predicate> predicate) { return context.getProperties().entrySet().stream() // filter non-blank dynamic properties .filter(e -> e.getKey().isDynamic() && StringUtils.isNotBlank(e.getValue()) && StringUtils.isNotBlank(context.getProperty(e.getKey()).evaluateAttributeExpressions(attributes).getValue()) ) + .filter(predicate) // convert to Map keys and evaluated property values .collect(Collectors.toMap( e -> e.getKey().getName(), @@ -353,7 +373,7 @@ default List verify(final ProcessContext context, fina if (context.getProperty(INDEX).isSet()) { index = context.getProperty(INDEX).evaluateAttributeExpressions(attributes).getValue(); try { - if (verifyClientService.exists(index, getDynamicProperties(context, attributes))) { + if (verifyClientService.exists(index, getRequestParametersFromDynamicProperties(context, attributes), getRequestHeadersFromDynamicProperties(context, attributes))) { indexExistsResult.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL) .explanation(String.format("Index [%s] exists", index)); indexExists = true; @@ -408,10 +428,11 @@ default List verifyAfterIndex(final ProcessContext con queryJson.remove("script"); } final String type = context.getProperty(TYPE).evaluateAttributeExpressions(attributes).getValue(); - final Map requestParameters = new HashMap<>(getDynamicProperties(context, attributes)); + final Map requestParameters = new HashMap<>(getRequestParametersFromDynamicProperties(context, attributes)); requestParameters.putIfAbsent("_source", "false"); - final SearchResponse response = verifyClientService.search(mapper.writeValueAsString(queryJson), index, type, requestParameters); + final SearchResponse response = verifyClientService.search( + mapper.writeValueAsString(queryJson), index, type, requestParameters, getRequestHeadersFromDynamicProperties(context, attributes)); queryValidResult.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL) .explanation(String.format("Query found %d hits and %d aggregations in %d milliseconds, timed out: %s", response.getNumberOfHits(), response.getAggregations() == null ? 0 : response.getAggregations().size(), response.getTook(), response.isTimedOut())); diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java index 17ae2a27e481..2b5036bf1020 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java @@ -18,6 +18,7 @@ package org.apache.nifi.processors.elasticsearch; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.annotation.behavior.DynamicProperties; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.WritesAttribute; @@ -64,11 +65,20 @@ @WritesAttribute(attribute = "elasticsearch.get.error", description = "The error message provided by Elasticsearch if there is an error fetching the document.") }) @SeeAlso(JsonQueryElasticsearch.class) -@DynamicProperty( - name = "The name of a URL query parameter to add", - value = "The value of the URL query parameter", - expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, - description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing.") +@DynamicProperties({ + @DynamicProperty( + name = "The name of the HTTP request header", + value = "A Record Path expression to retrieve the HTTP request header value", + expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, + description = "Prefix: " + ElasticsearchRestProcessor.DYNAMIC_PROPERTY_PREFIX_REQUEST_HEADER + + " - adds the specified property name/value as a HTTP request header in the Elasticsearch request. " + + "If the Record Path expression results in a null or blank value, the HTTP request header will be omitted."), + @DynamicProperty( + name = "The name of a URL query parameter to add", + value = "The value of the URL query parameter", + expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, + description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing.") +}) public class GetElasticsearch extends AbstractProcessor implements ElasticsearchRestProcessor { static final AllowableValue FLOWFILE_CONTENT = new AllowableValue( "flowfile-content", @@ -184,9 +194,9 @@ public List verifyAfterIndex(final ProcessContext cont final String type = context.getProperty(TYPE).evaluateAttributeExpressions(attributes).getValue(); final String id = context.getProperty(ID).evaluateAttributeExpressions(attributes).getValue(); try { - final Map requestParameters = new HashMap<>(getDynamicProperties(context, attributes)); + final Map requestParameters = new HashMap<>(getRequestParametersFromDynamicProperties(context, attributes)); requestParameters.putIfAbsent("_source", "false"); - if (verifyClientService.documentExists(index, type, id, requestParameters)) { + if (verifyClientService.documentExists(index, type, id, requestParameters, getRequestHeadersFromDynamicProperties(context, attributes))) { documentExistsResult.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL) .explanation(String.format("Document [%s] exists in index [%s]", id, index)); } else { @@ -234,7 +244,8 @@ public void onTrigger(final ProcessContext context, final ProcessSession session } final StopWatch stopWatch = new StopWatch(true); - final Map doc = clientService.get().get(index, type, id, getDynamicProperties(context, input)); + final Map doc = clientService.get().get(index, type, id, + getRequestParametersFromDynamicProperties(context, input), getRequestHeadersFromDynamicProperties(context, input)); final Map attributes = new HashMap<>(4, 1); attributes.put("filename", id); diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java index 3a8286244658..6e993505bb27 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors.elasticsearch; +import org.apache.nifi.annotation.behavior.DynamicProperties; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.WritesAttribute; @@ -49,12 +50,21 @@ "processor, it will use the flowfile's content for the query. Care should be taken on the size of the query because the entire response " + "from Elasticsearch will be loaded into memory all at once and converted into the resulting flowfiles.") @SeeAlso(PaginatedJsonQueryElasticsearch.class) -@DynamicProperty( - name = "The name of a URL query parameter to add", - value = "The value of the URL query parameter", - expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, - description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing. " + - "These parameters will override any matching parameters in the query request body") +@DynamicProperties({ + @DynamicProperty( + name = "The name of the HTTP request header", + value = "A Record Path expression to retrieve the HTTP request header value", + expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, + description = "Prefix: " + ElasticsearchRestProcessor.DYNAMIC_PROPERTY_PREFIX_REQUEST_HEADER + + " - adds the specified property name/value as a HTTP request header in the Elasticsearch request. " + + "If the Record Path expression results in a null or blank value, the HTTP request header will be omitted."), + @DynamicProperty( + name = "The name of a URL query parameter to add", + value = "The value of the URL query parameter", + expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, + description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing. " + + "These parameters will override any matching parameters in the query request body") +}) public class JsonQueryElasticsearch extends AbstractJsonQueryElasticsearch { @Override JsonQueryParameters buildJsonQueryParameters(final FlowFile input, final ProcessContext context, final ProcessSession session) @@ -72,7 +82,8 @@ SearchResponse doQuery(final JsonQueryParameters queryJsonParameters, final List queryJsonParameters.getQuery(), queryJsonParameters.getIndex(), queryJsonParameters.getType(), - getDynamicProperties(context, input) + getRequestParametersFromDynamicProperties(context, input), + getRequestHeadersFromDynamicProperties(context, input) ); if (input != null) { session.getProvenanceReporter().send( diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearch.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearch.java index 2f529a35f0ba..787bacf966d2 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearch.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearch.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors.elasticsearch; +import org.apache.nifi.annotation.behavior.DynamicProperties; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.SystemResource; @@ -49,14 +50,23 @@ "It will use the flowfile's content for the query unless the QUERY attribute is populated. " + "Search After/Point in Time queries must include a valid \"sort\" field.") @SeeAlso({JsonQueryElasticsearch.class, ConsumeElasticsearch.class, SearchElasticsearch.class}) -@DynamicProperty( - name = "The name of a URL query parameter to add", - value = "The value of the URL query parameter", - expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, - description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing. " + - "These parameters will override any matching parameters in the query request body. " + - "For SCROLL type queries, these parameters are only used in the initial (first page) query as the " + - "Elasticsearch Scroll API does not support the same query parameters for subsequent pages of data.") +@DynamicProperties({ + @DynamicProperty( + name = "The name of the HTTP request header", + value = "A Record Path expression to retrieve the HTTP request header value", + expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, + description = "Prefix: " + ElasticsearchRestProcessor.DYNAMIC_PROPERTY_PREFIX_REQUEST_HEADER + + " - adds the specified property name/value as a HTTP request header in the Elasticsearch request. " + + "If the Record Path expression results in a null or blank value, the HTTP request header will be omitted."), + @DynamicProperty( + name = "The name of a URL query parameter to add", + value = "The value of the URL query parameter", + expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, + description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing. " + + "These parameters will override any matching parameters in the query request body. " + + "For SCROLL type queries, these parameters are only used in the initial (first page) query as the " + + "Elasticsearch Scroll API does not support the same query parameters for subsequent pages of data.") +}) @SystemResourceConsideration(resource = SystemResource.MEMORY, description = "Care should be taken on the size of each page because each response " + "from Elasticsearch will be loaded into memory all at once and converted into the resulting flowfiles.") public class PaginatedJsonQueryElasticsearch extends AbstractPaginatedJsonQueryElasticsearch { diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java index ec7495c923fa..6342cbd5a80c 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java @@ -74,6 +74,14 @@ " - adds the specified property name/value as a Bulk request header in the Elasticsearch Bulk API body used for processing. " + "If the value is null or blank, the Bulk header will be omitted for the document operation. " + "These parameters will override any matching parameters in the _bulk request body."), + @DynamicProperty( + name = "The name of the HTTP request header", + value = "A Record Path expression to retrieve the HTTP request header value", + expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, + description = "Prefix: " + ElasticsearchRestProcessor.DYNAMIC_PROPERTY_PREFIX_REQUEST_HEADER + + " - adds the specified property name/value as a HTTP request header in the Elasticsearch request. " + + "If the Record Path expression results in a null or blank value, the HTTP request header will be omitted. " + + "If FlowFiles are batched, only the first FlowFile in the batch is used to evaluate property values."), @DynamicProperty( name = "The name of a URL query parameter to add", value = "The value of the URL query parameter", @@ -234,7 +242,7 @@ private void addOperation(final List operations, final Li final boolean scriptedUpsert = context.getProperty(SCRIPTED_UPSERT).evaluateAttributeExpressions(input).asBoolean(); final Map dynamicTemplatesMap = getMapFromAttribute(DYNAMIC_TEMPLATES, context, input); - final Map dynamicProperties = getDynamicProperties(context, input); + final Map dynamicProperties = getRequestParametersFromDynamicProperties(context, input); final Map bulkHeaderFields = getBulkHeaderParameters(dynamicProperties); final String charset = context.getProperty(CHARSET).evaluateAttributeExpressions(input).getValue(); @@ -272,8 +280,9 @@ private Map getMapFromAttribute(final PropertyDescriptor propert } private List indexDocuments(final List operations, final List originals, final ProcessContext context, final ProcessSession session) throws IOException { - final Map dynamicProperties = getDynamicProperties(context, originals.getFirst()); - final IndexOperationResponse response = clientService.get().bulk(operations, getRequestURLParameters(dynamicProperties)); + final Map dynamicProperties = getRequestParametersFromDynamicProperties(context, originals.getFirst()); + final IndexOperationResponse response = clientService.get().bulk(operations, getRequestURLParameters(dynamicProperties), + getRequestHeadersFromDynamicProperties(context, originals.getFirst())); final Map> errors = findElasticsearchResponseErrors(response); final List errorDocuments = new ArrayList<>(errors.size()); diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java index 9e19965d0a4b..b2996abf6950 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java @@ -107,6 +107,13 @@ " - adds the specified property name/value as a Bulk request header in the Elasticsearch Bulk API body used for processing. " + "If the Record Path expression results in a null or blank value, the Bulk header will be omitted for the document operation. " + "These parameters will override any matching parameters in the _bulk request body."), + @DynamicProperty( + name = "The name of the HTTP request header", + value = "A Record Path expression to retrieve the HTTP request header value", + expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, + description = "Prefix: " + ElasticsearchRestProcessor.DYNAMIC_PROPERTY_PREFIX_REQUEST_HEADER + + " - adds the specified property name/value as a HTTP request header in the Elasticsearch request. " + + "If the Record Path expression results in a null or blank value, the HTTP request header will be omitted."), @DynamicProperty( name = "The name of a URL query parameter to add", value = "The value of the URL query parameter", @@ -421,13 +428,15 @@ public void onTrigger(final ProcessContext context, final ProcessSession session originals.add(record); if (operationList.size() == indexOperationParameters.getBatchSize() || !recordSet.isAnotherRecord()) { - operate(operationList, originals, reader, session, input, indexOperationParameters.getRequestParameters(), resultRecords, erroredRecords, successfulRecords); + operate(operationList, originals, reader, session, input, indexOperationParameters.getRequestParameters(), + indexOperationParameters.getRequestHeaders(), resultRecords, erroredRecords, successfulRecords); batches++; } } if (!operationList.isEmpty()) { - operate(operationList, originals, reader, session, input, indexOperationParameters.getRequestParameters(), resultRecords, erroredRecords, successfulRecords); + operate(operationList, originals, reader, session, input, indexOperationParameters.getRequestParameters(), + indexOperationParameters.getRequestHeaders(), resultRecords, erroredRecords, successfulRecords); batches++; } } catch (final ElasticsearchException ese) { @@ -501,12 +510,12 @@ private void addOperation(final List operationList, final } private void operate(final List operationList, final List originals, final RecordReader reader, - final ProcessSession session, final FlowFile input, final Map requestParameters, + final ProcessSession session, final FlowFile input, final Map requestParameters, final Map requestHeaders, final List resultRecords, final AtomicLong erroredRecords, final AtomicLong successfulRecords) throws IOException, SchemaNotFoundException, MalformedRecordException { final BulkOperation bundle = new BulkOperation(operationList, originals, reader.getSchema()); - final ResponseDetails responseDetails = indexDocuments(bundle, session, input, requestParameters); + final ResponseDetails responseDetails = indexDocuments(bundle, session, input, requestParameters, requestHeaders); successfulRecords.getAndAdd(responseDetails.successCount()); erroredRecords.getAndAdd(responseDetails.errorCount()); @@ -525,8 +534,8 @@ private void removeResultRecordFlowFiles(final List results, final Pro } private ResponseDetails indexDocuments(final BulkOperation bundle, final ProcessSession session, final FlowFile input, - final Map requestParameters) throws IOException, SchemaNotFoundException { - final IndexOperationResponse response = clientService.get().bulk(bundle.getOperationList(), requestParameters); + final Map requestParameters, final Map requestHeaders) throws IOException, SchemaNotFoundException { + final IndexOperationResponse response = clientService.get().bulk(bundle.getOperationList(), requestParameters, requestHeaders); final Map> errors = findElasticsearchResponseErrors(response); if (!errors.isEmpty()) { @@ -832,6 +841,7 @@ private class IndexOperationParameters { private final RecordPath scriptedUpsertPath; private final RecordPath dynamicTypesPath; + private final Map requestHeaders; private final Map requestParameters; private final Map bulkHeaderRecordPaths; @@ -854,7 +864,9 @@ private class IndexOperationParameters { scriptedUpsertPath = compileRecordPathFromProperty(context, SCRIPTED_UPSERT_RECORD_PATH, input); dynamicTypesPath = compileRecordPathFromProperty(context, DYNAMIC_TEMPLATES_RECORD_PATH, input); - final Map dynamicProperties = getDynamicProperties(context, input); + requestHeaders = getRequestHeadersFromDynamicProperties(context, input); + + final Map dynamicProperties = getRequestParametersFromDynamicProperties(context, input); requestParameters = getRequestURLParameters(dynamicProperties); final Map bulkHeaderParameterPaths = getBulkHeaderParameters(dynamicProperties); @@ -926,6 +938,10 @@ public RecordPath getDynamicTypesPath() { return dynamicTypesPath; } + public Map getRequestHeaders() { + return requestHeaders; + } + public Map getRequestParameters() { return requestParameters; } diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearch.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearch.java index bd22a51fec67..7cda87f9f646 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearch.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearch.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors.elasticsearch; +import org.apache.nifi.annotation.behavior.DynamicProperties; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; @@ -68,14 +69,23 @@ "until either no more results are available or the Pagination Keep Alive expiration is reached, after which the query will " + "restart with the first page of results being retrieved.") @SeeAlso({PaginatedJsonQueryElasticsearch.class, ConsumeElasticsearch.class}) -@DynamicProperty( - name = "The name of a URL query parameter to add", - value = "The value of the URL query parameter", - expressionLanguageScope = ExpressionLanguageScope.ENVIRONMENT, - description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing. " + - "These parameters will override any matching parameters in the query request body. " + - "For SCROLL type queries, these parameters are only used in the initial (first page) query as the " + - "Elasticsearch Scroll API does not support the same query parameters for subsequent pages of data.") +@DynamicProperties({ + @DynamicProperty( + name = "The name of the HTTP request header", + value = "A Record Path expression to retrieve the HTTP request header value", + expressionLanguageScope = ExpressionLanguageScope.ENVIRONMENT, + description = "Prefix: " + ElasticsearchRestProcessor.DYNAMIC_PROPERTY_PREFIX_REQUEST_HEADER + + " - adds the specified property name/value as a HTTP request header in the Elasticsearch request. " + + "If the Record Path expression results in a null or blank value, the HTTP request header will be omitted."), + @DynamicProperty( + name = "The name of a URL query parameter to add", + value = "The value of the URL query parameter", + expressionLanguageScope = ExpressionLanguageScope.ENVIRONMENT, + description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing. " + + "These parameters will override any matching parameters in the query request body. " + + "For SCROLL type queries, these parameters are only used in the initial (first page) query as the " + + "Elasticsearch Scroll API does not support the same query parameters for subsequent pages of data.") +}) @Stateful(scopes = Scope.LOCAL, description = "The pagination state (scrollId, searchAfter, pitId, hitCount, pageCount, pageExpirationTimestamp) " + "is retained in between invocations of this processor until the Scroll/PiT has expired " + "(when the current time is later than the last query execution plus the Pagination Keep Alive interval).") diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/UpdateByQueryElasticsearch.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/UpdateByQueryElasticsearch.java index 87ddaffe3788..2c36f3006788 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/UpdateByQueryElasticsearch.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/UpdateByQueryElasticsearch.java @@ -17,6 +17,7 @@ package org.apache.nifi.processors.elasticsearch; +import org.apache.nifi.annotation.behavior.DynamicProperties; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.WritesAttribute; @@ -38,12 +39,21 @@ @CapabilityDescription("Update documents in an Elasticsearch index using a query. The query can be loaded from a flowfile body " + "or from the Query parameter. The loaded Query can contain any JSON accepted by Elasticsearch's _update_by_query API, " + "for example a \"query\" object to identify what documents are to be updated, plus a \"script\" to define the updates to perform.") -@DynamicProperty( - name = "The name of a URL query parameter to add", - value = "The value of the URL query parameter", - expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, - description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing. " + - "These parameters will override any matching parameters in the query request body") +@DynamicProperties({ + @DynamicProperty( + name = "The name of the HTTP request header", + value = "A Record Path expression to retrieve the HTTP request header value", + expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, + description = "Prefix: " + ElasticsearchRestProcessor.DYNAMIC_PROPERTY_PREFIX_REQUEST_HEADER + + " - adds the specified property name/value as a HTTP request header in the Elasticsearch request. " + + "If the Record Path expression results in a null or blank value, the HTTP request header will be omitted."), + @DynamicProperty( + name = "The name of a URL query parameter to add", + value = "The value of the URL query parameter", + expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, + description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing. " + + "These parameters will override any matching parameters in the query request body") +}) public class UpdateByQueryElasticsearch extends AbstractByQueryElasticsearch { static final String TOOK_ATTRIBUTE = "elasticsearch.update.took"; static final String ERROR_ATTRIBUTE = "elasticsearch.update.error"; @@ -60,7 +70,8 @@ String getErrorAttribute() { @Override OperationResponse performOperation(final ElasticSearchClientService clientService, final String query, - final String index, final String type, final Map requestParameters) { - return clientService.updateByQuery(query, index, type, requestParameters); + final String index, final String type, final Map requestParameters, + final Map requestHeaders) { + return clientService.updateByQuery(query, index, type, requestParameters, requestHeaders); } } diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearchTest.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearchTest.java index 5f9ca72b7a69..50e907d05c40 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearchTest.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearchTest.java @@ -32,6 +32,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -172,18 +173,20 @@ public void testWithFlowfileInput() { postTest(runner, query); assertTrue(client.getRequestParameters().isEmpty()); + assertTrue(client.getRequestHeaders().isEmpty()); } @Test - public void testWithFlowfileInputAndRequestParameters() { + public void testWithFlowfileInputAndRequestParametersAndRequestHeaders() { final String query = matchAllQuery; runner.setProperty(AbstractByQueryElasticsearch.INDEX, INDEX); runner.setProperty(AbstractByQueryElasticsearch.TYPE, TYPE); runner.setProperty(AbstractByQueryElasticsearch.QUERY_ATTRIBUTE, queryAttr()); runner.setProperty("refresh", "true"); runner.setProperty("slices", "${slices}"); + runner.setProperty(ElasticsearchRestProcessor.DYNAMIC_PROPERTY_PREFIX_REQUEST_HEADER + "Accept", "${accept}"); runner.assertValid(); - runner.enqueue(query, Collections.singletonMap("slices", "auto")); + runner.enqueue(query, Map.of("slices", "auto", "accept", "application/json")); runner.run(); postTest(runner, query); @@ -191,6 +194,9 @@ public void testWithFlowfileInputAndRequestParameters() { assertEquals(2, client.getRequestParameters().size()); assertEquals("true", client.getRequestParameters().get("refresh")); assertEquals("auto", client.getRequestParameters().get("slices")); + + assertEquals(1, client.getRequestHeaders().size()); + assertEquals("application/json", client.getRequestHeaders().get("Accept")); } @Test @@ -264,6 +270,7 @@ public void testNoInputHandling() { postTest(runner, query); assertTrue(client.getRequestParameters().isEmpty()); + assertTrue(client.getRequestHeaders().isEmpty()); } @Test @@ -275,7 +282,9 @@ public void testNoInputHandlingWithRequestParameters() { runner.setProperty(AbstractByQueryElasticsearch.QUERY_ATTRIBUTE, queryAttr()); runner.setProperty("refresh", "true"); runner.setProperty("slices", "${slices}"); + runner.setProperty(ElasticsearchRestProcessor.DYNAMIC_PROPERTY_PREFIX_REQUEST_HEADER + "Accept", "${accept}"); runner.setEnvironmentVariableValue("slices", "auto"); + runner.setEnvironmentVariableValue("accept", "application/json"); runner.setIncomingConnection(false); runner.assertValid(); runner.run(); @@ -285,6 +294,9 @@ public void testNoInputHandlingWithRequestParameters() { assertEquals(2, client.getRequestParameters().size()); assertEquals("true", client.getRequestParameters().get("refresh")); assertEquals("auto", client.getRequestParameters().get("slices")); + + assertEquals(1, client.getRequestHeaders().size()); + assertEquals("application/json", client.getRequestHeaders().get("Accept")); } @ParameterizedTest diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.java index 375bb5d82d73..a45ae6baf656 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.java @@ -457,7 +457,9 @@ void testRequestParameters() { } runner.setProperty("refresh", "true"); runner.setProperty("slices", "${slices}"); + runner.setProperty(ElasticsearchRestProcessor.DYNAMIC_PROPERTY_PREFIX_REQUEST_HEADER + "Accept", "${accept}"); runner.setEnvironmentVariableValue("slices", "auto"); + runner.setEnvironmentVariableValue("accept", "application/json"); runOnce(runner); @@ -471,6 +473,9 @@ void testRequestParameters() { assertEquals("true", service.getRequestParameters().get("refresh")); assertEquals("auto", service.getRequestParameters().get("slices")); + + assertEquals(1, service.getRequestHeaders().size()); + assertEquals("application/json", service.getRequestHeaders().get("Accept")); } @ParameterizedTest diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/GetElasticsearchTest.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/GetElasticsearchTest.java index f1b0c98248b6..3700438cc8be 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/GetElasticsearchTest.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/GetElasticsearchTest.java @@ -203,13 +203,18 @@ void testRequestParameters() { void testEmptyId() { runner.setProperty("refresh", "true"); runner.setProperty("_source", "${source}"); + runner.setProperty(ElasticsearchRestProcessor.DYNAMIC_PROPERTY_PREFIX_REQUEST_HEADER + "Accept", "${accept}"); runner.setEnvironmentVariableValue("source", "msg"); + runner.setEnvironmentVariableValue("accept", "application/json"); runProcessor(runner); final TestElasticsearchClientService service = getService(runner); assertEquals(2, service.getRequestParameters().size()); assertEquals("true", service.getRequestParameters().get("refresh")); assertEquals("msg", service.getRequestParameters().get("_source")); + + assertEquals(1, service.getRequestHeaders().size()); + assertEquals("application/json", service.getRequestHeaders().get("Accept")); } private static void testCounts(final TestRunner runner, final int doc, final int failure, final int notFound) { diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.java index 30cdd6aafd10..21f50d3f0ffc 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.java @@ -76,6 +76,7 @@ public static void setUpBeforeClass() throws Exception { expectedDynamicTemplate.put("your_field", yourField); } + @Override @BeforeEach public void setup() throws Exception { super.setup(); @@ -181,11 +182,12 @@ void simpleTest() { } @Test - void simpleTestWithDocIdAndRequestParametersAndBulkHeaders() { + void simpleTestWithDocIdAndRequestParametersAndBulkHeadersAndRequestHeaders() { runner.setProperty("refresh", "true"); runner.setProperty(AbstractPutElasticsearch.BULK_HEADER_PREFIX + "routing", "1"); runner.setProperty(AbstractPutElasticsearch.BULK_HEADER_PREFIX + "version", "${version}"); runner.setProperty(AbstractPutElasticsearch.BULK_HEADER_PREFIX + "empty", "${empty}"); + runner.setProperty(ElasticsearchRestProcessor.DYNAMIC_PROPERTY_PREFIX_REQUEST_HEADER + "Accept", "application/json"); runner.setProperty("slices", "${slices}"); runner.setProperty("another", "${blank}"); runner.setEnvironmentVariableValue("slices", "auto"); @@ -199,6 +201,11 @@ void simpleTestWithDocIdAndRequestParametersAndBulkHeaders() { assertEquals("auto", params.get("slices")); }); + clientService.setEvalHeadersConsumer((final Map headers) -> { + assertEquals(1, headers.size()); + assertEquals("application/json", headers.get("Accept")); + }); + clientService.setEvalConsumer((final List items) -> { final long idCount = items.stream().filter(item -> "123".equals(item.getId())).count(); final long indexCount = items.stream().filter(item -> "test_index".equals(item.getIndex())).count(); @@ -228,6 +235,7 @@ void simpleTestWithRequestParametersAndBulkHeadersFlowFileEL() { runner.setProperty(AbstractPutElasticsearch.BULK_HEADER_PREFIX + "routing", "1"); runner.setProperty(AbstractPutElasticsearch.BULK_HEADER_PREFIX + "version", "${version}"); runner.setProperty(AbstractPutElasticsearch.BULK_HEADER_PREFIX + "empty", "${empty}"); + runner.setProperty(ElasticsearchRestProcessor.DYNAMIC_PROPERTY_PREFIX_REQUEST_HEADER + "Accept", "${accept}"); runner.assertValid(); clientService.setEvalParametersConsumer((final Map params) -> { @@ -236,6 +244,11 @@ void simpleTestWithRequestParametersAndBulkHeadersFlowFileEL() { assertEquals("auto", params.get("slices")); }); + clientService.setEvalHeadersConsumer((final Map headers) -> { + assertEquals(1, headers.size()); + assertEquals("application/json", headers.get("Accept")); + }); + clientService.setEvalConsumer((final List items) -> { final long nullIdCount = items.stream().filter(item -> item.getId() == null).count(); final long headerFieldsCount = items.stream().filter(item -> !item.getHeaderFields().isEmpty()).count(); @@ -251,6 +264,7 @@ void simpleTestWithRequestParametersAndBulkHeadersFlowFileEL() { final Map attributes = new LinkedHashMap<>(); attributes.put("slices", "auto"); attributes.put("version", "external"); + attributes.put("accept", "application/json"); attributes.put("blank", " "); attributes.put("doc_id", ""); basicTest(0, 0, 1, attributes); diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.java index 172f24aab685..3cde2357d976 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.java @@ -264,7 +264,8 @@ void simpleTestWithRequestParametersAndBulkHeaders() { runner.setProperty("another", "${blank}"); runner.setEnvironmentVariableValue("slices", "auto"); runner.setEnvironmentVariableValue("version", "/version"); - testWithRequestParametersAndBulkHeaders(null); + runner.setEnvironmentVariableValue("accept", "application/json"); + testWithRequestParametersAndBulkHeadersAndRequestHeaders(null); } @Test @@ -274,7 +275,8 @@ void simpleTestWithRequestParametersAndBulkHeadersFlowFileEL() { attributes.put("version", "/version"); attributes.put("slices", "auto"); attributes.put("blank", " "); - testWithRequestParametersAndBulkHeaders(attributes); + attributes.put("accept", "application/json"); + testWithRequestParametersAndBulkHeadersAndRequestHeaders(attributes); } @Test @@ -889,12 +891,13 @@ private void testInvalidELRecordPaths(final String idRecordPath, final String at runner.assertTransferCount(AbstractPutElasticsearch.REL_ERROR_RESPONSES, 0); } - private void testWithRequestParametersAndBulkHeaders(final Map attributes) { + private void testWithRequestParametersAndBulkHeadersAndRequestHeaders(final Map attributes) { runner.setProperty("refresh", "true"); runner.setProperty("slices", "${slices}"); runner.setProperty(AbstractPutElasticsearch.BULK_HEADER_PREFIX + "routing", "/routing"); runner.setProperty(AbstractPutElasticsearch.BULK_HEADER_PREFIX + "version", "${version}"); runner.setProperty(AbstractPutElasticsearch.BULK_HEADER_PREFIX + "empty", "${empty}"); + runner.setProperty(ElasticsearchRestProcessor.DYNAMIC_PROPERTY_PREFIX_REQUEST_HEADER + "Accept", "${accept}"); runner.setEnvironmentVariableValue("blank", " "); runner.assertValid(); @@ -904,6 +907,11 @@ private void testWithRequestParametersAndBulkHeaders(final Map a assertEquals("auto", params.get("slices")); }); + clientService.setEvalHeadersConsumer((final Map headers) -> { + assertEquals(1, headers.size()); + assertEquals("application/json", headers.get("Accept")); + }); + final Consumer> consumer = (final List items) -> { final long headerFieldsCount = items.stream().filter(item -> !item.getHeaderFields().isEmpty()).count(); final long routingCount = items.stream().filter(item -> "1".equals(item.getHeaderFields().get("routing"))).count(); diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestElasticsearchClientService.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestElasticsearchClientService.java index a73dadc70c27..f43bfd4c6fbe 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestElasticsearchClientService.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestElasticsearchClientService.java @@ -59,6 +59,7 @@ public class TestElasticsearchClientService extends AbstractControllerService im private int pageCount = 0; private int maxPages = 1; private Map requestParameters; + private Map requestHeaders; private boolean scrolling = false; private String query; @@ -67,7 +68,7 @@ public TestElasticsearchClientService(final boolean returnAggs) { this.returnAggs = returnAggs; } - private void common(final boolean throwError, final Map requestParameters) throws IOException { + private void common(final boolean throwError, final Map requestParameters, final Map requestHeaders) throws IOException { if (throwError) { if (throwNotFoundInGet) { throw new MockElasticsearchException(false, true); @@ -77,6 +78,7 @@ private void common(final boolean throwError, final Map requestP } this.requestParameters = requestParameters; + this.requestHeaders = requestHeaders; } @Override @@ -85,14 +87,14 @@ public List verify(final ConfigurationContext context, } @Override - public IndexOperationResponse add(final IndexOperationRequest operation, final Map requestParameters) { - return bulk(Collections.singletonList(operation), requestParameters); + public IndexOperationResponse add(final IndexOperationRequest operation, final Map requestParameters, final Map requestHeaders) { + return bulk(Collections.singletonList(operation), requestParameters, requestHeaders); } @Override - public IndexOperationResponse bulk(final List operations, final Map requestParameters) { + public IndexOperationResponse bulk(final List operations, final Map requestParameters, final Map requestHeaders) { try { - common(false, requestParameters); + common(false, requestParameters, requestHeaders); } catch (final IOException e) { throw new RuntimeException(e); } @@ -100,9 +102,9 @@ public IndexOperationResponse bulk(final List operations, } @Override - public Long count(final String query, final String index, final String type, final Map requestParameters) { + public Long count(final String query, final String index, final String type, final Map requestParameters, final Map requestHeaders) { try { - common(false, requestParameters); + common(false, requestParameters, requestHeaders); } catch (final IOException e) { throw new RuntimeException(e); } @@ -111,14 +113,14 @@ public Long count(final String query, final String index, final String type, fin } @Override - public DeleteOperationResponse deleteById(final String index, final String type, final String id, final Map requestParameters) { - return deleteById(index, type, Collections.singletonList(id), requestParameters); + public DeleteOperationResponse deleteById(final String index, final String type, final String id, final Map requestParameters, final Map requestHeaders) { + return deleteById(index, type, Collections.singletonList(id), requestParameters, requestHeaders); } @Override - public DeleteOperationResponse deleteById(final String index, final String type, final List ids, final Map requestParameters) { + public DeleteOperationResponse deleteById(final String index, final String type, final List ids, final Map requestParameters, final Map requestHeaders) { try { - common(throwErrorInDelete, requestParameters); + common(throwErrorInDelete, requestParameters, requestHeaders); } catch (final IOException e) { throw new RuntimeException(e); } @@ -126,15 +128,15 @@ public DeleteOperationResponse deleteById(final String index, final String type, } @Override - public DeleteOperationResponse deleteByQuery(final String query, final String index, final String type, final Map requestParameters) { + public DeleteOperationResponse deleteByQuery(final String query, final String index, final String type, final Map requestParameters, final Map requestHeaders) { this.query = query; - return deleteById(index, type, Collections.singletonList("1"), requestParameters); + return deleteById(index, type, Collections.singletonList("1"), requestParameters, requestHeaders); } @Override - public UpdateOperationResponse updateByQuery(final String query, final String index, final String type, final Map requestParameters) { + public UpdateOperationResponse updateByQuery(final String query, final String index, final String type, final Map requestParameters, final Map requestHeaders) { try { - common(throwErrorInUpdate, requestParameters); + common(throwErrorInUpdate, requestParameters, requestHeaders); } catch (final IOException e) { throw new RuntimeException(e); } @@ -143,23 +145,24 @@ public UpdateOperationResponse updateByQuery(final String query, final String in } @Override - public void refresh(final String index, final Map requestParameters) { + public void refresh(final String index, final Map requestParameters, final Map requestHeaders) { + // intentionally blank } @Override - public boolean exists(final String index, final Map requestParameters) { + public boolean exists(final String index, final Map requestParameters, final Map requestHeaders) { return true; } @Override - public boolean documentExists(final String index, final String type, final String id, final Map requestParameters) { + public boolean documentExists(final String index, final String type, final String id, final Map requestParameters, final Map requestHeaders) { return true; } @Override - public Map get(final String index, final String type, final String id, final Map requestParameters) { + public Map get(final String index, final String type, final String id, final Map requestParameters, final Map requestHeaders) { try { - common(throwErrorInGet || throwNotFoundInGet, requestParameters); + common(throwErrorInGet || throwNotFoundInGet, requestParameters, requestHeaders); } catch (final IOException e) { throw new RuntimeException(e); } @@ -169,9 +172,9 @@ public Map get(final String index, final String type, final Stri } @Override - public SearchResponse search(final String query, final String index, final String type, final Map requestParameters) { + public SearchResponse search(final String query, final String index, final String type, final Map requestParameters, final Map requestHeaders) { try { - common(throwErrorInSearch, requestParameters); + common(throwErrorInSearch, requestParameters, requestHeaders); } catch (final IOException e) { throw new RuntimeException(e); } @@ -192,19 +195,19 @@ public SearchResponse search(final String query, final String index, final Strin } @Override - public SearchResponse scroll(final String scroll) { + public SearchResponse scroll(final String scroll, final Map requestHeaders) { if (throwErrorInSearch) { throw new RuntimeException(new IOException("Simulated IOException - scroll")); } scrolling = true; - final SearchResponse response = search(null, null, null, requestParameters); + final SearchResponse response = search(null, null, null, requestParameters, requestHeaders); scrolling = false; return response; } @Override - public String initialisePointInTime(final String index, final String keepAlive) { + public String initialisePointInTime(final String index, final String keepAlive, final Map requestHeaders) { if (throwErrorInPit) { throw new RuntimeException(new IOException("Simulated IOException - initialisePointInTime")); } @@ -215,7 +218,7 @@ public String initialisePointInTime(final String index, final String keepAlive) } @Override - public DeleteOperationResponse deletePointInTime(final String pitId) { + public DeleteOperationResponse deletePointInTime(final String pitId, final Map requestHeaders) { if (throwErrorInDelete) { throw new RuntimeException(new IOException("Simulated IOException - deletePointInTime")); } @@ -224,7 +227,7 @@ public DeleteOperationResponse deletePointInTime(final String pitId) { } @Override - public DeleteOperationResponse deleteScroll(final String scrollId) { + public DeleteOperationResponse deleteScroll(final String scrollId, final Map requestHeaders) { if (throwErrorInDelete) { throw new RuntimeException(new IOException("Simulated IOException - deleteScroll")); } @@ -269,6 +272,10 @@ public void setMaxPages(final int maxPages) { this.maxPages = maxPages; } + public Map getRequestHeaders() { + return this.requestHeaders; + } + public Map getRequestParameters() { return this.requestParameters; } diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/integration/AbstractElasticsearch_IT.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/integration/AbstractElasticsearch_IT.java index 23bb97998f0a..f962d900ac36 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/integration/AbstractElasticsearch_IT.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/integration/AbstractElasticsearch_IT.java @@ -62,7 +62,7 @@ void before() throws Exception { runner.setProperty(ElasticsearchRestProcessor.INDEX, INDEX); runner.setProperty(ElasticsearchRestProcessor.TYPE, type); - service.refresh(null, null); + service.refresh(null, null, null); } @AfterEach diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/mock/AbstractMockElasticsearchClient.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/mock/AbstractMockElasticsearchClient.java index 9f3d6fd566f0..413fb2f36a02 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/mock/AbstractMockElasticsearchClient.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/mock/AbstractMockElasticsearchClient.java @@ -38,81 +38,82 @@ public List verify(final ConfigurationContext context, } @Override - public IndexOperationResponse add(final IndexOperationRequest operation, final Map requestParameters) { + public IndexOperationResponse add(final IndexOperationRequest operation, final Map requestParameters, final Map requestHeaders) { return null; } @Override - public IndexOperationResponse bulk(final List operations, final Map requestParameters) { + public IndexOperationResponse bulk(final List operations, final Map requestParameters, final Map requestHeaders) { return null; } @Override - public Long count(final String query, final String index, final String type, final Map requestParameters) { + public Long count(final String query, final String index, final String type, final Map requestParameters, final Map requestHeaders) { return null; } @Override - public DeleteOperationResponse deleteById(final String index, final String type, final String id, final Map requestParameters) { + public DeleteOperationResponse deleteById(final String index, final String type, final String id, final Map requestParameters, final Map requestHeaders) { return null; } @Override - public DeleteOperationResponse deleteById(final String index, final String type, final List ids, final Map requestParameters) { + public DeleteOperationResponse deleteById(final String index, final String type, final List ids, final Map requestParameters, final Map requestHeaders) { return null; } @Override - public DeleteOperationResponse deleteByQuery(final String query, final String index, final String type, final Map requestParameters) { + public DeleteOperationResponse deleteByQuery(final String query, final String index, final String type, final Map requestParameters, final Map requestHeaders) { return null; } @Override - public UpdateOperationResponse updateByQuery(final String query, final String index, final String type, final Map requestParameters) { + public UpdateOperationResponse updateByQuery(final String query, final String index, final String type, final Map requestParameters, final Map requestHeaders) { return null; } @Override - public void refresh(final String index, final Map requestParameters) { + public void refresh(final String index, final Map requestParameters, final Map requestHeaders) { + // intentionally blank } @Override - public boolean exists(final String index, final Map requestParameters) { + public boolean exists(final String index, final Map requestParameters, final Map requestHeaders) { return true; } @Override - public boolean documentExists(final String index, final String type, final String id, final Map requestParameters) { + public boolean documentExists(final String index, final String type, final String id, final Map requestParameters, final Map requestHeaders) { return true; } @Override - public Map get(final String index, final String type, final String id, final Map requestParameters) { + public Map get(final String index, final String type, final String id, final Map requestParameters, final Map requestHeaders) { return null; } @Override - public SearchResponse search(final String query, final String index, final String type, final Map requestParameters) { + public SearchResponse search(final String query, final String index, final String type, final Map requestParameters, final Map requestHeaders) { return null; } @Override - public SearchResponse scroll(final String scroll) { + public SearchResponse scroll(final String scroll, final Map requestHeaders) { return null; } @Override - public String initialisePointInTime(final String index, final String keepAlive) { + public String initialisePointInTime(final String index, final String keepAlive, final Map requestHeaders) { return null; } @Override - public DeleteOperationResponse deletePointInTime(final String pitId) { + public DeleteOperationResponse deletePointInTime(final String pitId, final Map requestHeaders) { return null; } @Override - public DeleteOperationResponse deleteScroll(final String scrollId) { + public DeleteOperationResponse deleteScroll(final String scrollId, final Map requestHeaders) { return null; } diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.java index 55d89e4600e7..ca11cbc82c84 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.java @@ -27,9 +27,10 @@ public class MockBulkLoadClientService extends AbstractMockElasticsearchClient { private IndexOperationResponse response; private Consumer> evalConsumer; private Consumer> evalParametersConsumer; + private Consumer> evalHeadersConsumer; @Override - public IndexOperationResponse bulk(final List items, final Map requestParameters) { + public IndexOperationResponse bulk(final List items, final Map requestParameters, final Map requestHeaders) { if (isThrowRetriableError()) { throw new MockElasticsearchException(true, false); } else if (isThrowFatalError()) { @@ -44,6 +45,10 @@ public IndexOperationResponse bulk(final List items, fina evalParametersConsumer.accept(requestParameters); } + if (evalHeadersConsumer != null) { + evalHeadersConsumer.accept(requestHeaders); + } + return response; } @@ -58,4 +63,8 @@ public void setEvalConsumer(final Consumer> evalCons public void setEvalParametersConsumer(final Consumer> evalParametersConsumer) { this.evalParametersConsumer = evalParametersConsumer; } + + public void setEvalHeadersConsumer(final Consumer> evalHeadersConsumer) { + this.evalHeadersConsumer = evalHeadersConsumer; + } } diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java index ae3db3cc072b..163b8fad9961 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java @@ -52,7 +52,7 @@ public abstract class AbstractElasticsearchITBase { // default Elasticsearch version should (ideally) match that in the nifi-elasticsearch-bundle#pom.xml for the integration-tests profile protected static final DockerImageName IMAGE = DockerImageName - .parse(System.getProperty("elasticsearch.docker.image", "docker.elastic.co/elasticsearch/elasticsearch:8.15.1")); + .parse(System.getProperty("elasticsearch.docker.image", "docker.elastic.co/elasticsearch/elasticsearch:8.17.0")); protected static final String ELASTIC_USER_PASSWORD = System.getProperty("elasticsearch.elastic_user.password", RandomStringUtils.randomAlphanumeric(10, 20)); private static final int PORT = 9200; protected static final ElasticsearchContainer ELASTICSEARCH_CONTAINER = new ElasticsearchContainer(IMAGE)