From 33b58d99ddd13b19ea6d56cbf6ad2bb73866d4b4 Mon Sep 17 00:00:00 2001 From: ulbi Date: Mon, 8 Jan 2024 09:40:25 +0100 Subject: [PATCH] fix: copy Attributes on Splitting Result Sets (#147) --- CHANGELOG.md | 3 + .../internal/AbstractGetInfluxDatabase_2.java | 11 +- .../TestGetInfluxDatabaseRecord_2.java | 107 ++++++++++++++++++ 3 files changed, 119 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a7d50a5..16bc90d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ## v1.27.0 [unreleased] +### Bug Fixes +1. [#147](https://github.com/influxdata/nifi-influxdb-bundle/pull/147): Copy Attributes on Splitting Result Sets + ## v1.26.0 [2023-12-05] ### Others diff --git a/nifi-influx-database-processors/src/main/java/org/influxdata/nifi/processors/internal/AbstractGetInfluxDatabase_2.java b/nifi-influx-database-processors/src/main/java/org/influxdata/nifi/processors/internal/AbstractGetInfluxDatabase_2.java index eda8fb4..30abcda 100644 --- a/nifi-influx-database-processors/src/main/java/org/influxdata/nifi/processors/internal/AbstractGetInfluxDatabase_2.java +++ b/nifi-influx-database-processors/src/main/java/org/influxdata/nifi/processors/internal/AbstractGetInfluxDatabase_2.java @@ -288,9 +288,14 @@ private QueryProcessor(final String org, if (recordsPerFlowFile == -1 || createdFlowFile) { this.flowFile = flowFile; } else { - this.flowFile = session.create(); + FlowFile newflowFile = session.create(); + if (!createdFlowFile) { + newflowFile = session.putAllAttributes(newflowFile, flowFile.getAttributes()); + } + this.flowFile = newflowFile; this.flowFiles.add(this.flowFile); } + this.org = org; this.recordsPerFlowFile = recordsPerFlowFile; this.query = query; @@ -375,7 +380,9 @@ private void beforeOnResponse() { recordIndex++; if (recordsPerFlowFile != -1 && recordIndex > recordsPerFlowFile) { closeRecordWriter(); - flowFile = session.create(); + FlowFile newflowFile = session.create(); + newflowFile = session.putAllAttributes(newflowFile, flowFile.getAttributes()); + flowFile = newflowFile; flowFiles.add(flowFile); recordIndex = 1; } diff --git a/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/TestGetInfluxDatabaseRecord_2.java b/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/TestGetInfluxDatabaseRecord_2.java index 027b6b0..db36fd3 100644 --- a/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/TestGetInfluxDatabaseRecord_2.java +++ b/nifi-influx-database-processors/src/test/java/org/influxdata/nifi/processors/TestGetInfluxDatabaseRecord_2.java @@ -23,12 +23,17 @@ import com.influxdb.query.FluxRecord; +import java.util.Map; +import java.util.HashMap; + +import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.type.ArrayDataType; +import org.apache.nifi.util.MockFlowFile; import org.junit.Assert; import org.junit.Test; @@ -82,6 +87,45 @@ public void success() { Assert.assertEquals("InfluxDB rocks!", nifiRecord.getValue("string_value")); } + + @Test + public void singleFlowFileAttributes() { + + // Create input flowfile + Map inputAttributes = new HashMap<>(); + inputAttributes.put("Testattribute", "testvalue"); + runner.enqueue("someContent", inputAttributes); + + // Add FluxRecords to queryOnResponseRecords + FluxRecord fluxRecord1 = new FluxRecord(0); + fluxRecord1.getValues().put("value", 1L); + + queryOnResponseRecords.add(fluxRecord1); + + runner.run(); + + // Verify that the flowfile has the expected attributes + runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1); + Assert.assertEquals(1, writer.getRecordsWritten().size()); + + for (FlowFile outputFlowFile: runner.getFlowFilesForRelationship(REL_SUCCESS)) + { + Map outputAttributes = outputFlowFile.getAttributes(); + + for (Map.Entry entry : inputAttributes.entrySet()) { + Assert.assertTrue(outputAttributes.containsKey(entry.getKey())); + Assert.assertEquals(entry.getValue(), outputAttributes.get(entry.getKey())); + } + } + + Record nifiRecord = writer.getRecordsWritten().get(0); + RecordSchema schema = nifiRecord.getSchema(); + + Assert.assertEquals(1, schema.getFieldCount()); + Assert.assertEquals(RecordFieldType.LONG, schema.getField("value").get().getDataType().getFieldType()); + Assert.assertEquals(1L, nifiRecord.getValue("value")); + } + @Test public void moreRecords() { @@ -197,6 +241,69 @@ public void useMoreFlowFiles() { } + @Test + public void useMoreFlowFilesWithAttributes() { + + FluxRecord fluxRecord1 = new FluxRecord(0); + fluxRecord1.getValues().put("value", 1L); + + FluxRecord fluxRecord2 = new FluxRecord(0); + fluxRecord2.getValues().put("value", 2L); + + FluxRecord fluxRecord3 = new FluxRecord(0); + fluxRecord3.getValues().put("value", 3L); + + FluxRecord fluxRecord4 = new FluxRecord(0); + fluxRecord4.getValues().put("value", 4L); + + FluxRecord fluxRecord5 = new FluxRecord(0); + fluxRecord5.getValues().put("value", 5L); + + FluxRecord fluxRecord6 = new FluxRecord(0); + fluxRecord6.getValues().put("value", 6L); + + queryOnResponseRecords.add(fluxRecord1); + queryOnResponseRecords.add(fluxRecord2); + queryOnResponseRecords.add(fluxRecord3); + queryOnResponseRecords.add(fluxRecord4); + queryOnResponseRecords.add(fluxRecord5); + queryOnResponseRecords.add(fluxRecord6); + + // Create input flowfile + Map inputAttributes = new HashMap<>(); + inputAttributes.put("Testattribute", "testvalue"); + runner.enqueue("someContent", inputAttributes); + + runner.setProperty(GetInfluxDatabase_2.RECORDS_PER_FLOWFILE, "2"); + + runner.enqueue(""); + runner.run(); + + runner.assertAllFlowFilesTransferred(REL_SUCCESS, 4); + Assert.assertEquals(6, writer.getRecordsWritten().size()); + + List outputFlowFiles = runner.getFlowFilesForRelationship(REL_SUCCESS); + + for (FlowFile outputFlowFile: runner.getFlowFilesForRelationship(REL_SUCCESS)) + { + Map outputAttributes = outputFlowFile.getAttributes(); + + for (Map.Entry entry : inputAttributes.entrySet()) { + Assert.assertTrue(outputAttributes.containsKey(entry.getKey())); + Assert.assertEquals(entry.getValue(), outputAttributes.get(entry.getKey())); + } + } + + for (int i = 0; i < 6; i++) { + Record nifiRecord = writer.getRecordsWritten().get(i); + RecordSchema schema = nifiRecord.getSchema(); + + Assert.assertEquals(RecordFieldType.LONG, schema.getField("value").get().getDataType().getFieldType()); + Assert.assertEquals((long) i + 1, nifiRecord.getValue("value")); + } + + } + @Test public void containsProvenanceReport() {