diff --git a/data-prepper-plugins/kafka-plugins/build.gradle b/data-prepper-plugins/kafka-plugins/build.gradle index 2cf69d4054..3891ec7e64 100644 --- a/data-prepper-plugins/kafka-plugins/build.gradle +++ b/data-prepper-plugins/kafka-plugins/build.gradle @@ -32,6 +32,13 @@ dependencies { implementation 'software.amazon.glue:schema-registry-serde:1.1.15' implementation 'com.amazonaws:aws-java-sdk-glue:1.12.506' implementation 'io.confluent:kafka-json-schema-serializer:7.4.0' + implementation project(':data-prepper-plugins:failures-common') + implementation 'org.apache.kafka:connect-json:3.4.0' + implementation 'com.github.fge:json-schema-validator:2.2.14' + implementation 'commons-collections:commons-collections:3.2.2' + implementation 'software.amazon.awssdk:s3' + implementation 'software.amazon.awssdk:apache-client' + testImplementation 'org.mockito:mockito-inline:4.1.0' testImplementation 'org.yaml:snakeyaml:2.0' testImplementation testLibs.spring.test @@ -48,13 +55,13 @@ dependencies { testImplementation 'org.apache.kafka:kafka-clients:3.4.0:test' testImplementation 'org.apache.kafka:connect-json:3.4.0' testImplementation('com.kjetland:mbknor-jackson-jsonschema_2.13:1.0.39') - implementation project(':data-prepper-plugins:failures-common') testImplementation group: 'org.powermock', name: 'powermock-api-mockito2', version: '2.0.9' - implementation 'org.apache.kafka:connect-json:3.4.0' - implementation 'com.github.fge:json-schema-validator:2.2.14' - implementation 'commons-collections:commons-collections:3.2.2' - implementation 'software.amazon.awssdk:s3' - implementation 'software.amazon.awssdk:apache-client' + testImplementation project(':data-prepper-plugins:otel-metrics-source') + testImplementation project(':data-prepper-plugins:otel-proto-common') + testImplementation libs.opentelemetry.proto + testImplementation libs.protobuf.util + testImplementation libs.commons.io + testImplementation libs.armeria.grpc } diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferOTelIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferOTelIT.java new file mode 100644 index 0000000000..9588ccc3bb --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferOTelIT.java @@ -0,0 +1,179 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.kafka.buffer; + +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.CheckpointState; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.metric.Metric; +import org.opensearch.dataprepper.model.metric.JacksonMetric; +import org.opensearch.dataprepper.model.metric.JacksonGauge; +import org.opensearch.dataprepper.model.metric.JacksonSum; +import org.opensearch.dataprepper.model.metric.JacksonHistogram; +import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionConfig; +import org.opensearch.dataprepper.model.buffer.DelegatingBuffer; +import org.opensearch.dataprepper.model.buffer.Buffer; + + +import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.BufferedReader; + +import org.opensearch.dataprepper.plugins.otel.codec.OTelMetricDecoder; +import org.opensearch.dataprepper.plugins.source.otelmetrics.OTelMetricsGrpcService; +import com.google.protobuf.util.JsonFormat; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class KafkaBufferOTelIT { + private static final String TEST_REQUEST_MULTIPLE_METRICS_FILE = "test-request-multiple-metrics.json"; + private static final Logger LOG = LoggerFactory.getLogger(KafkaBufferIT.class); + @Mock + private PluginSetting pluginSetting; + @Mock + private KafkaBufferConfig kafkaBufferConfig; + @Mock + private PluginFactory pluginFactory; + @Mock + private AcknowledgementSetManager acknowledgementSetManager; + @Mock + private BufferTopicConfig topicConfig; + + private DelegatingBuffer buffer; + + private PluginMetrics pluginMetrics; + private String bootstrapServersCommaDelimited; + private OTelMetricsGrpcService oTelMetricsGrpcService; + class KafkaDelegatingBuffer extends DelegatingBuffer { + KafkaDelegatingBuffer(Buffer buffer) { + super(buffer); + } + }; + + @BeforeEach + void setUp() { + pluginMetrics = PluginMetrics.fromNames(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + + when(pluginSetting.getPipelineName()).thenReturn(UUID.randomUUID().toString()); + + String topicName = "buffer-" + RandomStringUtils.randomAlphabetic(5); + when(topicConfig.getName()).thenReturn(topicName); + when(topicConfig.getGroupId()).thenReturn("buffergroup-" + RandomStringUtils.randomAlphabetic(6)); + when(topicConfig.isCreateTopic()).thenReturn(true); + when(topicConfig.getSerdeFormat()).thenReturn(MessageFormat.BYTES); + when(topicConfig.getWorkers()).thenReturn(1); + when(topicConfig.getMaxPollInterval()).thenReturn(Duration.ofSeconds(5)); + when(topicConfig.getConsumerMaxPollRecords()).thenReturn(1); + when(topicConfig.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15)); + when(topicConfig.getHeartBeatInterval()).thenReturn(Duration.ofSeconds(3)); + when(topicConfig.getAutoCommit()).thenReturn(false); + when(topicConfig.getAutoOffsetReset()).thenReturn("earliest"); + when(topicConfig.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(1)); + when(kafkaBufferConfig.getTopic()).thenReturn(topicConfig); + + EncryptionConfig encryptionConfig = mock(EncryptionConfig.class); + + bootstrapServersCommaDelimited = System.getProperty("tests.kafka.bootstrap_servers"); + + LOG.info("Using Kafka bootstrap servers: {}", bootstrapServersCommaDelimited); + + when(kafkaBufferConfig.getBootstrapServers()).thenReturn(Collections.singletonList(bootstrapServersCommaDelimited)); + when(kafkaBufferConfig.getEncryptionConfig()).thenReturn(encryptionConfig); + } + + private String getFileAsJsonString(String requestJsonFileName) throws IOException { + final StringBuilder jsonBuilder = new StringBuilder(); + try (final InputStream inputStream = Objects.requireNonNull( + KafkaBufferOTelIT.class.getClassLoader().getResourceAsStream(requestJsonFileName))) { + final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream)); + bufferedReader.lines().forEach(jsonBuilder::append); + } + return jsonBuilder.toString(); + } + + private ExportMetricsServiceRequest buildExportMetricsServiceRequestFromJsonFile(String requestJsonFileName) throws IOException { + final ExportMetricsServiceRequest.Builder builder = ExportMetricsServiceRequest.newBuilder(); + JsonFormat.parser().merge(getFileAsJsonString(requestJsonFileName), builder); + return builder.build(); + } + + @Test + void test_otel_metrics_with_kafka_buffer() throws Exception { + KafkaBuffer kafkaBuffer = new KafkaBuffer(pluginSetting, kafkaBufferConfig, pluginFactory, acknowledgementSetManager, pluginMetrics, new OTelMetricDecoder(), null, null); + buffer = new KafkaDelegatingBuffer(kafkaBuffer); + oTelMetricsGrpcService = new OTelMetricsGrpcService(10000, + buffer, + pluginMetrics); + + final ExportMetricsServiceRequest request = buildExportMetricsServiceRequestFromJsonFile(TEST_REQUEST_MULTIPLE_METRICS_FILE); + oTelMetricsGrpcService.rawExport(request); + Map.Entry>, CheckpointState> readResult = kafkaBuffer.read(10_000); + assertThat(readResult, notNullValue()); + assertThat(readResult.getKey(), notNullValue()); + assertThat(readResult.getKey().size(), equalTo(3)); + for (Record record : readResult.getKey()) { + Event event = record.getData(); + JacksonMetric metric = (JacksonMetric) (JacksonEvent)(Event)record.getData(); + if (metric.getKind().equals(Metric.KIND.GAUGE.toString())) { + assertThat(metric.getUnit(), equalTo("1")); + assertThat(metric.getName(), equalTo("counter-int")); + JacksonGauge gauge = (JacksonGauge)metric; + assertThat(gauge.getValue(), equalTo(123.0)); + } else if (metric.getKind().equals(Metric.KIND.SUM.toString())) { + assertThat(metric.getUnit(), equalTo("1")); + assertThat(metric.getName(), equalTo("sum-int")); + JacksonSum sum = (JacksonSum)metric; + assertThat(sum.getValue(), equalTo(456.0)); + } else if (metric.getKind().equals(Metric.KIND.HISTOGRAM.toString())) { + assertThat(metric.getUnit(), equalTo("1")); + assertThat(metric.getName(), equalTo("histogram-int")); + JacksonHistogram histogram = (JacksonHistogram)metric; + assertThat(histogram.getSum(), equalTo(100.0)); + assertThat(histogram.getCount(), equalTo(30L)); + assertThat(histogram.getExemplars(), equalTo(Collections.emptyList())); + assertThat(histogram.getExplicitBoundsList(), equalTo(List.of(1.0, 2.0, 3.0, 4.0))); + assertThat(histogram.getExplicitBoundsCount(), equalTo(4)); + assertThat(histogram.getBucketCountsList(), equalTo(List.of(3L, 5L, 15L, 6L, 1L))); + assertThat(histogram.getBucketCount(), equalTo(5)); + assertThat(histogram.getAggregationTemporality(), equalTo("AGGREGATION_TEMPORALITY_CUMULATIVE")); + } else { + assertTrue("FAILED".equals("Unknown Metric type")); + } + } + } + +} + diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/resources/test-request-multiple-metrics.json b/data-prepper-plugins/kafka-plugins/src/integrationTest/resources/test-request-multiple-metrics.json new file mode 100644 index 0000000000..ce87c693d3 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/resources/test-request-multiple-metrics.json @@ -0,0 +1,130 @@ +{ + "resourceMetrics": [ + { + "resource": { + "attributes": [ + { + "key": "resource-attr", + "value": { + "stringValue": "resource-attr-val-1" + } + } + ] + }, + "scopeMetrics": [ + { + "scope": {}, + "metrics": [ + { + "name": "counter-int", + "unit": 1, + "gauge": { + "dataPoints": [ + { + "attributes": [ + { + "key": "label-1", + "value": { + "stringValue": "label-value-1" + } + } + ], + "startTimeUnixNano": "1581452773000000789", + "timeUnixNano": "1581452773000000789", + "asInt": "123" + } + ] + } + } + ] + } + ] + }, + { + "resource": { + "attributes": [ + { + "key": "resource-attr", + "value": { + "stringValue": "resource-attr-val-2" + } + } + ] + }, + "scopeMetrics": [ + { + "scope": {}, + "metrics": [ + { + "name": "histogram-int", + "unit": 1, + "histogram": { + "dataPoints": [ + { + "attributes": [ + { + "key": "label-1", + "value": { + "stringValue": "label-value-1" + } + } + ], + "startTimeUnixNano": "1581452773000000789", + "timeUnixNano": "1581452773000000789", + "count": "30", + "sum": "100", + "bucket_counts": [3, 5, 15, 6, 1], + "explicit_bounds": [1.0, 2.0, 3.0, 4.0], + "exemplars": [] + } + ], + "aggregationTemporality":"2" + } + } + ] + } + ] + }, + { + "resource": { + "attributes": [ + { + "key": "resource-attr", + "value": { + "stringValue": "resource-attr-val-3" + } + } + ] + }, + "scopeMetrics": [ + { + "scope": {}, + "metrics": [ + { + "name": "sum-int", + "unit": 1, + "sum": { + "dataPoints": [ + { + "attributes": [ + { + "key": "label-1", + "value": { + "stringValue": "label-value-1" + } + } + ], + "startTimeUnixNano": "1581452773000000789", + "timeUnixNano": "1581452773000000789", + "asInt": "456" + } + ] + } + } + ] + } + ] + } + ] +} + diff --git a/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsGrpcService.java b/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsGrpcService.java index 3c500f471b..4a63dabac0 100644 --- a/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsGrpcService.java +++ b/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsGrpcService.java @@ -92,7 +92,11 @@ private void processRequest(final ExportLogsServiceRequest request, final Stream final List> records = logs.stream().map(log -> new Record(log)).collect(Collectors.toList()); try { - buffer.writeAll(records, bufferWriteTimeoutInMillis); + if (buffer.isByteBuffer()) { + buffer.writeBytes(request.toByteArray(), null, bufferWriteTimeoutInMillis); + } else { + buffer.writeAll(records, bufferWriteTimeoutInMillis); + } } catch (Exception e) { if (ServiceRequestContext.current().isTimedOut()) { LOG.warn("Exception writing to buffer but request already timed out.", e); diff --git a/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSource.java b/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSource.java index 9fc0e12077..2d560c7e0d 100644 --- a/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSource.java +++ b/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSource.java @@ -36,6 +36,8 @@ import org.opensearch.dataprepper.plugins.certificate.CertificateProvider; import org.opensearch.dataprepper.plugins.certificate.model.Certificate; import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec; +import org.opensearch.dataprepper.model.codec.ByteDecoder; +import org.opensearch.dataprepper.plugins.otel.codec.OTelLogsDecoder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,6 +60,7 @@ public class OTelLogsSource implements Source> { private final CertificateProviderFactory certificateProviderFactory; private final GrpcRequestExceptionHandler requestExceptionHandler; private Server server; + private ByteDecoder byteDecoder; @DataPrepperPluginConstructor public OTelLogsSource(final OTelLogsSourceConfig oTelLogsSourceConfig, @@ -77,6 +80,12 @@ public OTelLogsSource(final OTelLogsSourceConfig oTelLogsSourceConfig, this.pipelineName = pipelineDescription.getPipelineName(); this.authenticationProvider = createAuthenticationProvider(pluginFactory); this.requestExceptionHandler = new GrpcRequestExceptionHandler(pluginMetrics); + this.byteDecoder = new OTelLogsDecoder(); + } + + @Override + public ByteDecoder getDecoder() { + return byteDecoder; } @Override diff --git a/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsGrpcServiceTest.java b/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsGrpcServiceTest.java index 74895fbe04..a5a9353981 100644 --- a/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsGrpcServiceTest.java +++ b/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsGrpcServiceTest.java @@ -43,6 +43,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.doAnswer; @@ -93,6 +94,9 @@ public class OTelLogsGrpcServiceTest { @Captor ArgumentCaptor>> recordsCaptor; + @Captor + ArgumentCaptor bytesCaptor; + private OTelLogsGrpcService objectUnderTest; @BeforeEach @@ -136,6 +140,31 @@ public void export_responseObserverOnCompleted() throws Exception { final List> capturedRecords = (List>) recordsCaptor.getValue(); assertThat(capturedRecords.size(), equalTo(1)); } + + @Test + public void export_with_ByteBuffer_responseObserverOnCompleted() throws Exception { + when(buffer.isByteBuffer()).thenReturn(true); + objectUnderTest = generateOTelLogsGrpcService(new OTelProtoCodec.OTelProtoDecoder()); + + try (MockedStatic mockedStatic = mockStatic(ServiceRequestContext.class)) { + mockedStatic.when(ServiceRequestContext::current).thenReturn(serviceRequestContext); + objectUnderTest.export(LOGS_REQUEST, responseObserver); + } + + verify(buffer, times(1)).writeBytes(bytesCaptor.capture(), eq(null), anyInt()); + verify(responseObserver, times(1)).onNext(ExportLogsServiceResponse.newBuilder().build()); + verify(responseObserver, times(1)).onCompleted(); + verify(requestsReceivedCounter, times(1)).increment(); + verify(successRequestsCounter, times(1)).increment(); + final ArgumentCaptor payloadLengthCaptor = ArgumentCaptor.forClass(Double.class); + verify(payloadSizeSummary, times(1)).record(payloadLengthCaptor.capture()); + assertThat(payloadLengthCaptor.getValue().intValue(), equalTo(LOGS_REQUEST.getSerializedSize())); + verify(requestProcessDuration, times(1)).record(ArgumentMatchers.any()); + + final byte[] capturedBytes = (byte[]) bytesCaptor.getValue(); + assertThat(capturedBytes.length, equalTo(LOGS_REQUEST.toByteArray().length)); + } + @Test public void export_BufferTimeout_responseObserverOnError() throws Exception { objectUnderTest = generateOTelLogsGrpcService(new OTelProtoCodec.OTelProtoDecoder()); diff --git a/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceTest.java b/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceTest.java index 5543e7c21e..f31dfe03f7 100644 --- a/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceTest.java +++ b/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceTest.java @@ -69,6 +69,7 @@ import org.opensearch.dataprepper.plugins.codec.CompressionOption; import org.opensearch.dataprepper.plugins.health.HealthGrpcService; import org.opensearch.dataprepper.plugins.source.otellogs.certificate.CertificateProviderFactory; +import org.opensearch.dataprepper.plugins.otel.codec.OTelLogsDecoder; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -209,6 +210,7 @@ public void afterEach() { private void configureObjectUnderTest() { SOURCE = new OTelLogsSource(oTelLogsSourceConfig, pluginMetrics, pluginFactory, pipelineDescription); + assertTrue(SOURCE.getDecoder() instanceof OTelLogsDecoder); } @Test diff --git a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcService.java b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcService.java index 0177a57584..b4c45e5a05 100644 --- a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcService.java +++ b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcService.java @@ -51,6 +51,14 @@ public OTelMetricsGrpcService(int bufferWriteTimeoutInMillis, requestProcessDuration = pluginMetrics.timer(REQUEST_PROCESS_DURATION); } + public void rawExport(final ExportMetricsServiceRequest request) { + try { + if (buffer.isByteBuffer()) { + buffer.writeBytes(request.toByteArray(), null, bufferWriteTimeoutInMillis); + } + } catch (Exception e) { + } + } @Override public void export(final ExportMetricsServiceRequest request, final StreamObserver responseObserver) { diff --git a/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcServiceTest.java b/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcServiceTest.java index 66eff7794a..f257f64f83 100644 --- a/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcServiceTest.java +++ b/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcServiceTest.java @@ -37,6 +37,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.doAnswer; @@ -77,6 +78,9 @@ public class OTelMetricsGrpcServiceTest { @Captor private ArgumentCaptor recordCaptor; + @Captor + ArgumentCaptor bytesCaptor; + private OTelMetricsGrpcService sut; @BeforeEach @@ -122,6 +126,29 @@ public void export_Success_responseObserverOnCompleted() throws Exception { assertEquals(METRICS_REQUEST, capturedRecord.getData()); } + @Test + public void export_Success_with_ByteBuffer_responseObserverOnCompleted() throws Exception { + when(buffer.isByteBuffer()).thenReturn(true); + try (MockedStatic mockedStatic = mockStatic(ServiceRequestContext.class)) { + mockedStatic.when(ServiceRequestContext::current).thenReturn(serviceRequestContext); + sut.export(METRICS_REQUEST, responseObserver); + } + + verify(buffer, times(1)).writeBytes(bytesCaptor.capture(), eq(null), anyInt()); + verify(responseObserver, times(1)).onNext(ExportMetricsServiceResponse.newBuilder().build()); + verify(responseObserver, times(1)).onCompleted(); + verify(requestsReceivedCounter, times(1)).increment(); + verify(successRequestsCounter, times(1)).increment(); + + final ArgumentCaptor payloadLengthCaptor = ArgumentCaptor.forClass(Double.class); + verify(payloadSize, times(1)).record(payloadLengthCaptor.capture()); + assertThat(payloadLengthCaptor.getValue().intValue(), equalTo(METRICS_REQUEST.getSerializedSize())); + verify(requestProcessDuration, times(1)).record(ArgumentMatchers.any()); + + final byte[] capturedBytes = (byte[]) bytesCaptor.getValue(); + assertThat(capturedBytes.length, equalTo(METRICS_REQUEST.toByteArray().length)); + } + @Test public void export_BufferTimeout_responseObserverOnError() throws Exception { doThrow(new TimeoutException()).when(buffer).write(any(Record.class), anyInt()); diff --git a/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java b/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java index f82c8ea7d4..0a537f9c40 100644 --- a/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java +++ b/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java @@ -116,6 +116,7 @@ import static org.opensearch.dataprepper.plugins.source.otelmetrics.OTelMetricsSourceConfig.DEFAULT_PORT; import static org.opensearch.dataprepper.plugins.source.otelmetrics.OTelMetricsSourceConfig.DEFAULT_REQUEST_TIMEOUT_MS; import static org.opensearch.dataprepper.plugins.source.otelmetrics.OTelMetricsSourceConfig.SSL; +import org.opensearch.dataprepper.plugins.otel.codec.OTelMetricDecoder; @ExtendWith(MockitoExtension.class) class OTelMetricsSourceTest { @@ -200,6 +201,7 @@ public void beforeEach() { pipelineDescription = mock(PipelineDescription.class); when(pipelineDescription.getPipelineName()).thenReturn(TEST_PIPELINE_NAME); SOURCE = new OTelMetricsSource(oTelMetricsSourceConfig, pluginMetrics, pluginFactory, pipelineDescription); + assertTrue(SOURCE.getDecoder() instanceof OTelMetricDecoder); } @AfterEach @@ -209,6 +211,7 @@ public void afterEach() { private void configureObjectUnderTest() { SOURCE = new OTelMetricsSource(oTelMetricsSourceConfig, pluginMetrics, pluginFactory, pipelineDescription); + assertTrue(SOURCE.getDecoder() instanceof OTelMetricDecoder); } @Test @@ -576,6 +579,7 @@ void createObjectUnderTest() { oTelMetricsSourceConfig = OBJECT_MAPPER.convertValue(testPluginSetting.getSettings(), OTelMetricsSourceConfig.class); SOURCE = new OTelMetricsSource(oTelMetricsSourceConfig, pluginMetrics, pluginFactory, certificateProviderFactory, pipelineDescription); + assertTrue(SOURCE.getDecoder() instanceof OTelMetricDecoder); } @Test diff --git a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsDecoder.java b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsDecoder.java new file mode 100644 index 0000000000..6c491e7510 --- /dev/null +++ b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsDecoder.java @@ -0,0 +1,35 @@ +/* + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * */ + +package org.opensearch.dataprepper.plugins.otel.codec; + +import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; +import org.opensearch.dataprepper.model.codec.ByteDecoder; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.log.OpenTelemetryLog; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.io.IOException; +import java.io.InputStream; +import java.util.function.Consumer; + + +public class OTelLogsDecoder implements ByteDecoder { + private final OTelProtoCodec.OTelProtoDecoder otelProtoDecoder; + public OTelLogsDecoder() { + otelProtoDecoder = new OTelProtoCodec.OTelProtoDecoder(); + } + public void parse(InputStream inputStream, Consumer> eventConsumer) throws IOException { + ExportLogsServiceRequest request = ExportLogsServiceRequest.parseFrom(inputStream); + AtomicInteger droppedCounter = new AtomicInteger(0); + List logs = otelProtoDecoder.parseExportLogsServiceRequest(request); + for (OpenTelemetryLog log: logs) { + eventConsumer.accept(new Record<>(log)); + } + } + +} diff --git a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelMetricDecoder.java b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelMetricDecoder.java index bdb51cada1..1918842c22 100644 --- a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelMetricDecoder.java +++ b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelMetricDecoder.java @@ -9,7 +9,6 @@ import org.opensearch.dataprepper.model.codec.ByteDecoder; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.metric.Metric; @@ -31,8 +30,7 @@ public void parse(InputStream inputStream, Consumer> eventConsumer Collection> records = otelProtoDecoder.parseExportMetricsServiceRequest(request, droppedCounter, OTelProtoCodec.DEFAULT_EXPONENTIAL_HISTOGRAM_MAX_ALLOWED_SCALE, true, true, false); for (Record record: records) { - final JacksonEvent event = JacksonEvent.fromEvent(record.getData()); - eventConsumer.accept(new Record<>(event)); + eventConsumer.accept((Record)record); } } diff --git a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodec.java b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodec.java index 29a58be6df..75c8ebb81c 100644 --- a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodec.java +++ b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodec.java @@ -221,13 +221,17 @@ protected Collection parseResourceLogs(ResourceLogs rs) { protected Map splitResourceSpansByTraceId(final ResourceSpans resourceSpans) { final Resource resource = resourceSpans.getResource(); + final boolean hasResource = resourceSpans.hasResource(); Map result = new HashMap<>(); Map resultBuilderMap = new HashMap<>(); if (resourceSpans.getScopeSpansList().size() > 0) { for (Map.Entry> entry: splitScopeSpansByTraceId(resourceSpans.getScopeSpansList()).entrySet()) { - ResourceSpans.Builder b = ResourceSpans.newBuilder().setResource(resource).addAllScopeSpans(entry.getValue()); - resultBuilderMap.put(entry.getKey(), b); + ResourceSpans.Builder resourceSpansBuilder = ResourceSpans.newBuilder().addAllScopeSpans(entry.getValue()); + if (hasResource) { + resourceSpansBuilder.setResource(resource); + } + resultBuilderMap.put(entry.getKey(), resourceSpansBuilder); } } @@ -238,7 +242,10 @@ protected Map splitResourceSpansByTraceId(final ResourceS if (resultBuilderMap.containsKey(traceId)) { resourceSpansBuilder = resultBuilderMap.get(traceId); } else { - resourceSpansBuilder = ResourceSpans.newBuilder().setResource(resource); + resourceSpansBuilder = ResourceSpans.newBuilder(); + if (hasResource) { + resourceSpansBuilder.setResource(resource); + } resultBuilderMap.put(traceId, resourceSpansBuilder); } resourceSpansBuilder.addAllInstrumentationLibrarySpans(entry.getValue()); @@ -278,8 +285,13 @@ private List parseScopeSpans(final List scopeSpansList, final private Map> splitScopeSpansByTraceId(final List scopeSpansList) { Map> result = new HashMap<>(); for (ScopeSpans ss: scopeSpansList) { + final boolean hasScope = ss.hasScope(); + final io.opentelemetry.proto.common.v1.InstrumentationScope scope = ss.getScope(); for (Map.Entry> entry: splitSpansByTraceId(ss.getSpansList()).entrySet()) { - ScopeSpans.Builder scopeSpansBuilder = ScopeSpans.newBuilder().setScope(ss.getScope()).addAllSpans(entry.getValue()); + ScopeSpans.Builder scopeSpansBuilder = ScopeSpans.newBuilder().addAllSpans(entry.getValue()); + if (hasScope) { + scopeSpansBuilder.setScope(scope); + } String traceId = entry.getKey(); if (!result.containsKey(traceId)) { result.put(traceId, new ArrayList<>()); @@ -303,9 +315,15 @@ private List parseInstrumentationLibrarySpans(final List> splitInstrumentationLibrarySpansByTraceId(final List instrumentationLibrarySpansList) { Map> result = new HashMap<>(); for (InstrumentationLibrarySpans is: instrumentationLibrarySpansList) { + final boolean hasInstrumentationLibrary = is.hasInstrumentationLibrary(); + final io.opentelemetry.proto.common.v1.InstrumentationLibrary instrumentationLibrary = is.getInstrumentationLibrary(); for (Map.Entry> entry: splitSpansByTraceId(is.getSpansList()).entrySet()) { String traceId = entry.getKey(); - InstrumentationLibrarySpans.Builder ilSpansBuilder = InstrumentationLibrarySpans.newBuilder().setInstrumentationLibrary(is.getInstrumentationLibrary()).addAllSpans(entry.getValue()); + InstrumentationLibrarySpans.Builder ilSpansBuilder = InstrumentationLibrarySpans.newBuilder().setSchemaUrl(is.getSchemaUrl()).addAllSpans(entry.getValue()); + if (hasInstrumentationLibrary) { + ilSpansBuilder.setInstrumentationLibrary(instrumentationLibrary); + } + if (!result.containsKey(traceId)) { result.put(traceId, new ArrayList<>()); } diff --git a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTraceDecoder.java b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTraceDecoder.java new file mode 100644 index 0000000000..47c3fd03e9 --- /dev/null +++ b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTraceDecoder.java @@ -0,0 +1,37 @@ + +/* + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * */ + +package org.opensearch.dataprepper.plugins.otel.codec; + +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; +import org.opensearch.dataprepper.model.codec.ByteDecoder; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.trace.Span; + + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.io.IOException; +import java.io.InputStream; +import java.util.function.Consumer; + + +public class OTelTraceDecoder implements ByteDecoder { + private final OTelProtoCodec.OTelProtoDecoder otelProtoDecoder; + public OTelTraceDecoder() { + otelProtoDecoder = new OTelProtoCodec.OTelProtoDecoder(); + } + public void parse(InputStream inputStream, Consumer> eventConsumer) throws IOException { + ExportTraceServiceRequest request = ExportTraceServiceRequest.parseFrom(inputStream); + AtomicInteger droppedCounter = new AtomicInteger(0); + List spans = + otelProtoDecoder.parseExportTraceServiceRequest(request); + for (Span span: spans) { + eventConsumer.accept(new Record<>(span)); + } + } +} diff --git a/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsDecoderTest.java b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsDecoderTest.java new file mode 100644 index 0000000000..8fef72b2fa --- /dev/null +++ b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelLogsDecoderTest.java @@ -0,0 +1,74 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.otel.codec; + +import org.junit.jupiter.api.Test; +import java.io.InputStream; +import java.io.ByteArrayInputStream; +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.io.IOException; + +import java.util.Objects; +import java.util.Map; +import com.google.protobuf.util.JsonFormat; +import org.opensearch.dataprepper.model.log.OpenTelemetryLog; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; + +public class OTelLogsDecoderTest { + private static final String TEST_REQUEST_LOGS_FILE = "test-request-multiple-logs.json"; + + public OTelLogsDecoder createObjectUnderTest() { + return new OTelLogsDecoder(); + } + + private String getFileAsJsonString(String requestJsonFileName) throws IOException { + final StringBuilder jsonBuilder = new StringBuilder(); + try (final InputStream inputStream = Objects.requireNonNull( + OTelLogsDecoderTest.class.getClassLoader().getResourceAsStream(requestJsonFileName))) { + final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream)); + bufferedReader.lines().forEach(jsonBuilder::append); + } + return jsonBuilder.toString(); + } + + private ExportLogsServiceRequest buildExportLogsServiceRequestFromJsonFile(String requestJsonFileName) throws IOException { + final ExportLogsServiceRequest.Builder builder = ExportLogsServiceRequest.newBuilder(); + JsonFormat.parser().merge(getFileAsJsonString(requestJsonFileName), builder); + return builder.build(); + } + + private void validateLog(OpenTelemetryLog logRecord) { + assertThat(logRecord.getServiceName(), is("service")); + assertThat(logRecord.getTime(), is("2020-05-24T14:00:00Z")); + assertThat(logRecord.getObservedTime(), is("2020-05-24T14:00:02Z")); + assertThat(logRecord.getBody(), is("Log value")); + assertThat(logRecord.getDroppedAttributesCount(), is(3)); + assertThat(logRecord.getSchemaUrl(), is("schemaurl")); + assertThat(logRecord.getSeverityNumber(), is(5)); + assertThat(logRecord.getSeverityText(), is("Severity value")); + assertThat(logRecord.getTraceId(), is("ba1a1c23b4093b63")); + assertThat(logRecord.getSpanId(), is("2cc83ac90ebc469c")); + Map mergedAttributes = logRecord.getAttributes(); + assertThat(mergedAttributes.keySet().size(), is(2)); + assertThat(mergedAttributes.get("log.attributes.statement@params"), is("us-east-1")); + assertThat(mergedAttributes.get("resource.attributes.service@name"), is("service")); + } + + @Test + public void testParse() throws Exception { + final ExportLogsServiceRequest request = buildExportLogsServiceRequestFromJsonFile(TEST_REQUEST_LOGS_FILE); + InputStream inputStream = new ByteArrayInputStream((byte[])request.toByteArray()); + createObjectUnderTest().parse(inputStream, (record) -> { + validateLog((OpenTelemetryLog)record.getData()); + }); + + } +} + diff --git a/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelMetricsDecoderTest.java b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelMetricsDecoderTest.java new file mode 100644 index 0000000000..c6bdd9cc89 --- /dev/null +++ b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelMetricsDecoderTest.java @@ -0,0 +1,97 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.otel.codec; + +import java.io.InputStream; +import java.io.ByteArrayInputStream; +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.io.IOException; + +import com.google.protobuf.util.JsonFormat; + +import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; + +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.metric.Metric; +import org.opensearch.dataprepper.model.metric.JacksonMetric; +import org.opensearch.dataprepper.model.metric.JacksonGauge; +import org.opensearch.dataprepper.model.metric.JacksonSum; +import org.opensearch.dataprepper.model.metric.JacksonHistogram; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +public class OTelMetricsDecoderTest { + private static final String TEST_REQUEST_METRICS_FILE = "test-request-multiple-metrics.json"; + + public OTelMetricDecoder createObjectUnderTest() { + return new OTelMetricDecoder(); + } + + private String getFileAsJsonString(String requestJsonFileName) throws IOException { + final StringBuilder jsonBuilder = new StringBuilder(); + try (final InputStream inputStream = Objects.requireNonNull( + OTelMetricsDecoderTest.class.getClassLoader().getResourceAsStream(requestJsonFileName))) { + final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream)); + bufferedReader.lines().forEach(jsonBuilder::append); + } + return jsonBuilder.toString(); + } + + private ExportMetricsServiceRequest buildExportMetricsServiceRequestFromJsonFile(String requestJsonFileName) throws IOException { + final ExportMetricsServiceRequest.Builder builder = ExportMetricsServiceRequest.newBuilder(); + JsonFormat.parser().merge(getFileAsJsonString(requestJsonFileName), builder); + return builder.build(); + } + + private void validateMetric(Event event) { + JacksonMetric metric = (JacksonMetric)event; + String kind = metric.getKind(); + assertTrue(kind.equals(Metric.KIND.GAUGE.toString()) || kind.equals(Metric.KIND.SUM.toString()) || kind.equals(Metric.KIND.HISTOGRAM.toString())); + if (metric.getKind().equals(Metric.KIND.GAUGE.toString())) { + assertThat(metric.getUnit(), equalTo("1")); + assertThat(metric.getName(), equalTo("counter-int")); + JacksonGauge gauge = (JacksonGauge)metric; + assertThat(gauge.getValue(), equalTo(123.0)); + } else if (metric.getKind().equals(Metric.KIND.SUM.toString())) { + assertThat(metric.getUnit(), equalTo("1")); + assertThat(metric.getName(), equalTo("sum-int")); + JacksonSum sum = (JacksonSum)metric; + assertThat(sum.getValue(), equalTo(456.0)); + } else { // Histogram + assertThat(metric.getUnit(), equalTo("1")); + assertThat(metric.getName(), equalTo("histogram-int")); + JacksonHistogram histogram = (JacksonHistogram)metric; + assertThat(histogram.getSum(), equalTo(100.0)); + assertThat(histogram.getCount(), equalTo(30L)); + assertThat(histogram.getExemplars(), equalTo(Collections.emptyList())); + assertThat(histogram.getExplicitBoundsList(), equalTo(List.of(1.0, 2.0, 3.0, 4.0))); + assertThat(histogram.getExplicitBoundsCount(), equalTo(4)); + assertThat(histogram.getBucketCountsList(), equalTo(List.of(3L, 5L, 15L, 6L, 1L))); + assertThat(histogram.getBucketCount(), equalTo(5)); + assertThat(histogram.getAggregationTemporality(), equalTo("AGGREGATION_TEMPORALITY_CUMULATIVE")); + } + } + + @Test + public void testParse() throws Exception { + final ExportMetricsServiceRequest request = buildExportMetricsServiceRequestFromJsonFile(TEST_REQUEST_METRICS_FILE); + InputStream inputStream = new ByteArrayInputStream((byte[])request.toByteArray()); + createObjectUnderTest().parse(inputStream, (record) -> { + validateMetric((Event)record.getData()); + }); + + } +} + + diff --git a/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTraceDecoderTest.java b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTraceDecoderTest.java new file mode 100644 index 0000000000..a6cc6d122b --- /dev/null +++ b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelTraceDecoderTest.java @@ -0,0 +1,75 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.otel.codec; + +import org.junit.jupiter.api.Test; +import java.io.InputStream; +import java.io.ByteArrayInputStream; +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.io.IOException; + +import java.util.Objects; +import com.google.protobuf.util.JsonFormat; +import org.opensearch.dataprepper.model.trace.Span; +import static org.junit.jupiter.api.Assertions.assertTrue; +import java.nio.charset.StandardCharsets; +import org.apache.commons.codec.binary.Hex; + +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; + +public class OTelTraceDecoderTest { + private static final String TEST_REQUEST_TRACES_FILE = "test-request-multiple-traces.json"; + + public OTelTraceDecoder createObjectUnderTest() { + return new OTelTraceDecoder(); + } + + private String getFileAsJsonString(String requestJsonFileName) throws IOException { + final StringBuilder jsonBuilder = new StringBuilder(); + try (final InputStream inputStream = Objects.requireNonNull( + OTelLogsDecoderTest.class.getClassLoader().getResourceAsStream(requestJsonFileName))) { + final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream)); + bufferedReader.lines().forEach(jsonBuilder::append); + } + return jsonBuilder.toString(); + } + + private ExportTraceServiceRequest buildExportTraceServiceRequestFromJsonFile(String requestJsonFileName) throws IOException { + final ExportTraceServiceRequest.Builder builder = ExportTraceServiceRequest.newBuilder(); + JsonFormat.parser().merge(getFileAsJsonString(requestJsonFileName), builder); + return builder.build(); + } + + private void validateSpan(Span span) { + String traceId = null; + String spanId = null; + try { + traceId = new String(Hex.decodeHex(span.get("traceId", String.class)), StandardCharsets.UTF_8); + spanId = new String(Hex.decodeHex(span.get("spanId", String.class)), StandardCharsets.UTF_8); + } catch (Exception e) { + } + assertTrue(traceId.equals("TRACEID1") || traceId.equals("TRACEID2") || traceId.equals("TRACEID3")); + if (traceId.equals("TRACEID1")) { + assertTrue(spanId.equals("TRACEID1-SPAN1")); + } else if (traceId.equals("TRACEID2")) { + assertTrue(spanId.equals("TRACEID2-SPAN1") || spanId.equals("TRACEID2-SPAN2")); + } else { + assertTrue(spanId.equals("TRACEID3-SPAN1")); + } + } + + @Test + public void testParse() throws Exception { + final ExportTraceServiceRequest request = buildExportTraceServiceRequestFromJsonFile(TEST_REQUEST_TRACES_FILE); + InputStream inputStream = new ByteArrayInputStream((byte[])request.toByteArray()); + createObjectUnderTest().parse(inputStream, (record) -> { + validateSpan((Span)record.getData()); + }); + + } +} + diff --git a/data-prepper-plugins/otel-proto-common/src/test/resources/test-request-multiple-logs.json b/data-prepper-plugins/otel-proto-common/src/test/resources/test-request-multiple-logs.json new file mode 100644 index 0000000000..2ee4beb4b9 --- /dev/null +++ b/data-prepper-plugins/otel-proto-common/src/test/resources/test-request-multiple-logs.json @@ -0,0 +1,68 @@ + +{ + "resourceLogs": [ + { + "resource": { + "attributes": [{ + "key": "service.name", + "value": { + "stringValue": "service" + } + }] + }, + "scopeLogs": [{ + "logRecords": [{ + "timeUnixNano": "1590328800000000000", + "severityNumber": "SEVERITY_NUMBER_DEBUG", + "severityText": "Severity value", + "body": { + "stringValue": "Log value" + }, + "attributes": [{ + "key": "statement.params", + "value": { + "stringValue": "us-east-1" + } + }], + "droppedAttributesCount": 3, + "flags": 1, + "traceId": "uhocI7QJO2M=", + "spanId": "LMg6yQ68Rpw=", + "observedTimeUnixNano": "1590328802000000000" + }] + }], + "schemaUrl": "schemaurl" + }, + { + "resource": { + "attributes": [{ + "key": "service.name", + "value": { + "stringValue": "service" + } + }] + }, + "schemaUrl": "schemaurl", + "instrumentationLibraryLogs": [{ + "logRecords": [{ + "timeUnixNano": "1590328800000000000", + "severityNumber": "SEVERITY_NUMBER_DEBUG", + "severityText": "Severity value", + "body": { + "stringValue": "Log value" + }, + "attributes": [{ + "key": "statement.params", + "value": { + "stringValue": "us-east-1" + } + }], + "droppedAttributesCount": 3, + "flags": 1, + "traceId": "uhocI7QJO2M=", + "spanId": "LMg6yQ68Rpw=", + "observedTimeUnixNano": "1590328802000000000" + }] + }] + }] +} diff --git a/data-prepper-plugins/otel-proto-common/src/test/resources/test-request-multiple-metrics.json b/data-prepper-plugins/otel-proto-common/src/test/resources/test-request-multiple-metrics.json new file mode 100644 index 0000000000..ce87c693d3 --- /dev/null +++ b/data-prepper-plugins/otel-proto-common/src/test/resources/test-request-multiple-metrics.json @@ -0,0 +1,130 @@ +{ + "resourceMetrics": [ + { + "resource": { + "attributes": [ + { + "key": "resource-attr", + "value": { + "stringValue": "resource-attr-val-1" + } + } + ] + }, + "scopeMetrics": [ + { + "scope": {}, + "metrics": [ + { + "name": "counter-int", + "unit": 1, + "gauge": { + "dataPoints": [ + { + "attributes": [ + { + "key": "label-1", + "value": { + "stringValue": "label-value-1" + } + } + ], + "startTimeUnixNano": "1581452773000000789", + "timeUnixNano": "1581452773000000789", + "asInt": "123" + } + ] + } + } + ] + } + ] + }, + { + "resource": { + "attributes": [ + { + "key": "resource-attr", + "value": { + "stringValue": "resource-attr-val-2" + } + } + ] + }, + "scopeMetrics": [ + { + "scope": {}, + "metrics": [ + { + "name": "histogram-int", + "unit": 1, + "histogram": { + "dataPoints": [ + { + "attributes": [ + { + "key": "label-1", + "value": { + "stringValue": "label-value-1" + } + } + ], + "startTimeUnixNano": "1581452773000000789", + "timeUnixNano": "1581452773000000789", + "count": "30", + "sum": "100", + "bucket_counts": [3, 5, 15, 6, 1], + "explicit_bounds": [1.0, 2.0, 3.0, 4.0], + "exemplars": [] + } + ], + "aggregationTemporality":"2" + } + } + ] + } + ] + }, + { + "resource": { + "attributes": [ + { + "key": "resource-attr", + "value": { + "stringValue": "resource-attr-val-3" + } + } + ] + }, + "scopeMetrics": [ + { + "scope": {}, + "metrics": [ + { + "name": "sum-int", + "unit": 1, + "sum": { + "dataPoints": [ + { + "attributes": [ + { + "key": "label-1", + "value": { + "stringValue": "label-value-1" + } + } + ], + "startTimeUnixNano": "1581452773000000789", + "timeUnixNano": "1581452773000000789", + "asInt": "456" + } + ] + } + } + ] + } + ] + } + ] +} + diff --git a/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceGrpcService.java b/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceGrpcService.java index a0fd459c3d..5ac38d8e4b 100644 --- a/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceGrpcService.java +++ b/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceGrpcService.java @@ -28,6 +28,7 @@ import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; public class OTelTraceGrpcService extends TraceServiceGrpc.TraceServiceImplBase { @@ -93,11 +94,18 @@ private void processRequest(final ExportTraceServiceRequest request, final Strea throw new BadRequestException(e.getMessage(), e); } - final List> records = spans.stream().map(span -> new Record(span)).collect(Collectors.toList()); - try { - buffer.writeAll(records, bufferWriteTimeoutInMillis); - } catch (Exception e) { + if (buffer.isByteBuffer()) { + Map requestsMap = oTelProtoDecoder.splitExportTraceServiceRequestByTraceId(request); + ExportTraceServiceRequest tmp; + for (Map.Entry entry: requestsMap.entrySet()) { + buffer.writeBytes(entry.getValue().toByteArray(), entry.getKey(), bufferWriteTimeoutInMillis); + } + } else { + final List> records = spans.stream().map(span -> new Record(span)).collect(Collectors.toList()); + buffer.writeAll(records, bufferWriteTimeoutInMillis); + } + } catch (final Exception e) { if (ServiceRequestContext.current().isTimedOut()) { LOG.warn("Exception writing to buffer but request already timed out.", e); return; diff --git a/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSource.java b/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSource.java index f6444c4fe2..a38a3f9119 100644 --- a/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSource.java +++ b/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSource.java @@ -38,6 +38,8 @@ import org.opensearch.dataprepper.plugins.health.HealthGrpcService; import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec; import org.opensearch.dataprepper.plugins.source.oteltrace.certificate.CertificateProviderFactory; +import org.opensearch.dataprepper.model.codec.ByteDecoder; +import org.opensearch.dataprepper.plugins.otel.codec.OTelTraceDecoder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,6 +65,7 @@ public class OTelTraceSource implements Source> { private final GrpcRequestExceptionHandler requestExceptionHandler; private final String pipelineName; private Server server; + private final ByteDecoder byteDecoder; @DataPrepperPluginConstructor public OTelTraceSource(final OTelTraceSourceConfig oTelTraceSourceConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory, @@ -80,6 +83,12 @@ public OTelTraceSource(final OTelTraceSourceConfig oTelTraceSourceConfig, final this.pipelineName = pipelineDescription.getPipelineName(); this.authenticationProvider = createAuthenticationProvider(pluginFactory); this.requestExceptionHandler = new GrpcRequestExceptionHandler(pluginMetrics); + this.byteDecoder = new OTelTraceDecoder(); + } + + @Override + public ByteDecoder getDecoder() { + return byteDecoder; } @Override diff --git a/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceGrpcServiceTest.java b/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceGrpcServiceTest.java index 452e1b9429..d6aa5503c5 100644 --- a/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceGrpcServiceTest.java +++ b/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceGrpcServiceTest.java @@ -46,6 +46,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -92,6 +93,9 @@ public class OTelTraceGrpcServiceTest { @Mock private ServiceRequestContext serviceRequestContext; + @Captor + ArgumentCaptor bytesCaptor; + @Captor ArgumentCaptor recordCaptor; @@ -146,6 +150,30 @@ public void export_Success_responseObserverOnCompleted() throws Exception { assertThat(capturedRecords.get(0).getData().getTraceState(), equalTo("SUCCESS")); } + @Test + public void export_Success_with_ByteBuffer_responseObserverOnCompleted() throws Exception { + when(buffer.isByteBuffer()).thenReturn(true); + objectUnderTest = generateOTelTraceGrpcService(new OTelProtoCodec.OTelProtoDecoder()); + + try (MockedStatic mockedStatic = mockStatic(ServiceRequestContext.class)) { + mockedStatic.when(ServiceRequestContext::current).thenReturn(serviceRequestContext); + objectUnderTest.export(SUCCESS_REQUEST, responseObserver); + } + + verify(buffer, times(1)).writeBytes(bytesCaptor.capture(), anyString(), anyInt()); + verify(responseObserver, times(1)).onNext(ExportTraceServiceResponse.newBuilder().build()); + verify(responseObserver, times(1)).onCompleted(); + verify(requestsReceivedCounter, times(1)).increment(); + verify(successRequestsCounter, times(1)).increment(); + final ArgumentCaptor payloadLengthCaptor = ArgumentCaptor.forClass(Double.class); + verify(payloadSizeSummary, times(1)).record(payloadLengthCaptor.capture()); + assertThat(payloadLengthCaptor.getValue().intValue(), equalTo(SUCCESS_REQUEST.getSerializedSize())); + verify(requestProcessDuration, times(1)).record(ArgumentMatchers.any()); + + final byte[] capturedBytes = (byte[]) bytesCaptor.getValue(); + assertThat(capturedBytes.length, equalTo(SUCCESS_REQUEST.toByteArray().length)); + } + @Test public void export_BufferTimeout_responseObserverOnError() throws Exception { objectUnderTest = generateOTelTraceGrpcService(new OTelProtoCodec.OTelProtoDecoder());