Add support for OTEL traces and logs with Kafka buffer (#3625)
* Add support for OTEL traces and logs with Kafka buffer

Signed-off-by: Krishna Kondaka <[email protected]>

* Removed binary files

Signed-off-by: Krishna Kondaka <[email protected]>

* Rebased and merged with latest changes

Signed-off-by: Krishna Kondaka <[email protected]>

---------

Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
kkondaka and Krishna Kondaka authored Nov 10, 2023
1 parent 3543634 commit 2ea6edf
Showing 22 changed files with 995 additions and 19 deletions.
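
At its core, the change lets an OTel source hand its protobuf-aware decoder to the Kafka buffer, which stores raw OTLP request bytes on a topic and decodes them back into events on read. A minimal construction sketch, mirroring the integration test added below (the sketch class and method names are illustrative; the constructor arguments, including the two trailing nulls, are taken verbatim from that test):

import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.buffer.DelegatingBuffer;
import org.opensearch.dataprepper.plugins.kafka.buffer.KafkaBuffer;

// Sketch based on KafkaBufferOTelIT below: a trivial DelegatingBuffer subclass
// wraps a KafkaBuffer that was constructed with an OTel ByteDecoder.
class OTelKafkaBufferSketch {
    static class KafkaDelegatingBuffer extends DelegatingBuffer {
        KafkaDelegatingBuffer(final Buffer buffer) {
            super(buffer);
        }
    }

    static DelegatingBuffer wrap(final KafkaBuffer kafkaBuffer) {
        // kafkaBuffer as constructed in the test:
        //     new KafkaBuffer(pluginSetting, kafkaBufferConfig, pluginFactory,
        //         acknowledgementSetManager, pluginMetrics,
        //         new OTelMetricDecoder(), null, null);
        return new KafkaDelegatingBuffer(kafkaBuffer);
    }
}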
19 changes: 13 additions & 6 deletions data-prepper-plugins/kafka-plugins/build.gradle
@@ -32,6 +32,13 @@ dependencies {
     implementation 'software.amazon.glue:schema-registry-serde:1.1.15'
     implementation 'com.amazonaws:aws-java-sdk-glue:1.12.506'
     implementation 'io.confluent:kafka-json-schema-serializer:7.4.0'
+    implementation project(':data-prepper-plugins:failures-common')
+    implementation 'org.apache.kafka:connect-json:3.4.0'
+    implementation 'com.github.fge:json-schema-validator:2.2.14'
+    implementation 'commons-collections:commons-collections:3.2.2'
+    implementation 'software.amazon.awssdk:s3'
+    implementation 'software.amazon.awssdk:apache-client'
+
     testImplementation 'org.mockito:mockito-inline:4.1.0'
     testImplementation 'org.yaml:snakeyaml:2.0'
     testImplementation testLibs.spring.test
@@ -48,13 +55,13 @@ dependencies {
     testImplementation 'org.apache.kafka:kafka-clients:3.4.0:test'
     testImplementation 'org.apache.kafka:connect-json:3.4.0'
     testImplementation('com.kjetland:mbknor-jackson-jsonschema_2.13:1.0.39')
-    implementation project(':data-prepper-plugins:failures-common')
     testImplementation group: 'org.powermock', name: 'powermock-api-mockito2', version: '2.0.9'
-    implementation 'org.apache.kafka:connect-json:3.4.0'
-    implementation 'com.github.fge:json-schema-validator:2.2.14'
-    implementation 'commons-collections:commons-collections:3.2.2'
-    implementation 'software.amazon.awssdk:s3'
-    implementation 'software.amazon.awssdk:apache-client'
+    testImplementation project(':data-prepper-plugins:otel-metrics-source')
+    testImplementation project(':data-prepper-plugins:otel-proto-common')
+    testImplementation libs.opentelemetry.proto
+    testImplementation libs.protobuf.util
+    testImplementation libs.commons.io
+    testImplementation libs.armeria.grpc
 
 
 }
KafkaBufferOTelIT.java (new file)
@@ -0,0 +1,179 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.kafka.buffer;

import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.CheckpointState;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.metric.Metric;
import org.opensearch.dataprepper.model.metric.JacksonMetric;
import org.opensearch.dataprepper.model.metric.JacksonGauge;
import org.opensearch.dataprepper.model.metric.JacksonSum;
import org.opensearch.dataprepper.model.metric.JacksonHistogram;
import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionConfig;
import org.opensearch.dataprepper.model.buffer.DelegatingBuffer;
import org.opensearch.dataprepper.model.buffer.Buffer;


import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.BufferedReader;

import org.opensearch.dataprepper.plugins.otel.codec.OTelMetricDecoder;
import org.opensearch.dataprepper.plugins.source.otelmetrics.OTelMetricsGrpcService;
import com.google.protobuf.util.JsonFormat;

import static org.junit.jupiter.api.Assertions.fail;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
public class KafkaBufferOTelIT {
private static final String TEST_REQUEST_MULTIPLE_METRICS_FILE = "test-request-multiple-metrics.json";
    private static final Logger LOG = LoggerFactory.getLogger(KafkaBufferOTelIT.class);
@Mock
private PluginSetting pluginSetting;
@Mock
private KafkaBufferConfig kafkaBufferConfig;
@Mock
private PluginFactory pluginFactory;
@Mock
private AcknowledgementSetManager acknowledgementSetManager;
@Mock
private BufferTopicConfig topicConfig;

private DelegatingBuffer buffer;

private PluginMetrics pluginMetrics;
private String bootstrapServersCommaDelimited;
private OTelMetricsGrpcService oTelMetricsGrpcService;
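    // DelegatingBuffer is abstract; this trivial subclass lets the test exercise
    // KafkaBuffer through the DelegatingBuffer API.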
class KafkaDelegatingBuffer extends DelegatingBuffer {
KafkaDelegatingBuffer(Buffer buffer) {
super(buffer);
}
    }

@BeforeEach
void setUp() {
pluginMetrics = PluginMetrics.fromNames(UUID.randomUUID().toString(), UUID.randomUUID().toString());

when(pluginSetting.getPipelineName()).thenReturn(UUID.randomUUID().toString());

String topicName = "buffer-" + RandomStringUtils.randomAlphabetic(5);
when(topicConfig.getName()).thenReturn(topicName);
when(topicConfig.getGroupId()).thenReturn("buffergroup-" + RandomStringUtils.randomAlphabetic(6));
when(topicConfig.isCreateTopic()).thenReturn(true);
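        // BYTES serde keeps the OTLP request payload as opaque bytes on the topic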
when(topicConfig.getSerdeFormat()).thenReturn(MessageFormat.BYTES);
when(topicConfig.getWorkers()).thenReturn(1);
when(topicConfig.getMaxPollInterval()).thenReturn(Duration.ofSeconds(5));
when(topicConfig.getConsumerMaxPollRecords()).thenReturn(1);
when(topicConfig.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15));
when(topicConfig.getHeartBeatInterval()).thenReturn(Duration.ofSeconds(3));
when(topicConfig.getAutoCommit()).thenReturn(false);
when(topicConfig.getAutoOffsetReset()).thenReturn("earliest");
when(topicConfig.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(1));
when(kafkaBufferConfig.getTopic()).thenReturn(topicConfig);

EncryptionConfig encryptionConfig = mock(EncryptionConfig.class);

bootstrapServersCommaDelimited = System.getProperty("tests.kafka.bootstrap_servers");

LOG.info("Using Kafka bootstrap servers: {}", bootstrapServersCommaDelimited);

when(kafkaBufferConfig.getBootstrapServers()).thenReturn(Collections.singletonList(bootstrapServersCommaDelimited));
when(kafkaBufferConfig.getEncryptionConfig()).thenReturn(encryptionConfig);
}

private String getFileAsJsonString(String requestJsonFileName) throws IOException {
final StringBuilder jsonBuilder = new StringBuilder();
try (final InputStream inputStream = Objects.requireNonNull(
KafkaBufferOTelIT.class.getClassLoader().getResourceAsStream(requestJsonFileName))) {
final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
bufferedReader.lines().forEach(jsonBuilder::append);
}
return jsonBuilder.toString();
}

private ExportMetricsServiceRequest buildExportMetricsServiceRequestFromJsonFile(String requestJsonFileName) throws IOException {
final ExportMetricsServiceRequest.Builder builder = ExportMetricsServiceRequest.newBuilder();
JsonFormat.parser().merge(getFileAsJsonString(requestJsonFileName), builder);
return builder.build();
}

@Test
void test_otel_metrics_with_kafka_buffer() throws Exception {
KafkaBuffer kafkaBuffer = new KafkaBuffer(pluginSetting, kafkaBufferConfig, pluginFactory, acknowledgementSetManager, pluginMetrics, new OTelMetricDecoder(), null, null);
buffer = new KafkaDelegatingBuffer(kafkaBuffer);
oTelMetricsGrpcService = new OTelMetricsGrpcService(10000,
buffer,
pluginMetrics);

final ExportMetricsServiceRequest request = buildExportMetricsServiceRequestFromJsonFile(TEST_REQUEST_MULTIPLE_METRICS_FILE);
oTelMetricsGrpcService.rawExport(request);
Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = kafkaBuffer.read(10_000);
assertThat(readResult, notNullValue());
assertThat(readResult.getKey(), notNullValue());
assertThat(readResult.getKey().size(), equalTo(3));
        for (Record<Event> record : readResult.getKey()) {
            final JacksonMetric metric = (JacksonMetric) record.getData();
if (metric.getKind().equals(Metric.KIND.GAUGE.toString())) {
assertThat(metric.getUnit(), equalTo("1"));
assertThat(metric.getName(), equalTo("counter-int"));
JacksonGauge gauge = (JacksonGauge)metric;
assertThat(gauge.getValue(), equalTo(123.0));
} else if (metric.getKind().equals(Metric.KIND.SUM.toString())) {
assertThat(metric.getUnit(), equalTo("1"));
assertThat(metric.getName(), equalTo("sum-int"));
JacksonSum sum = (JacksonSum)metric;
assertThat(sum.getValue(), equalTo(456.0));
} else if (metric.getKind().equals(Metric.KIND.HISTOGRAM.toString())) {
assertThat(metric.getUnit(), equalTo("1"));
assertThat(metric.getName(), equalTo("histogram-int"));
JacksonHistogram histogram = (JacksonHistogram)metric;
assertThat(histogram.getSum(), equalTo(100.0));
assertThat(histogram.getCount(), equalTo(30L));
assertThat(histogram.getExemplars(), equalTo(Collections.emptyList()));
assertThat(histogram.getExplicitBoundsList(), equalTo(List.of(1.0, 2.0, 3.0, 4.0)));
assertThat(histogram.getExplicitBoundsCount(), equalTo(4));
assertThat(histogram.getBucketCountsList(), equalTo(List.of(3L, 5L, 15L, 6L, 1L)));
assertThat(histogram.getBucketCount(), equalTo(5));
assertThat(histogram.getAggregationTemporality(), equalTo("AGGREGATION_TEMPORALITY_CUMULATIVE"));
            } else {
                fail("Unexpected metric kind: " + metric.getKind());
            }
}
}

}
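
Note that this integration test expects a reachable Kafka cluster: the broker address comes from the tests.kafka.bootstrap_servers system property read in setUp().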

test-request-multiple-metrics.json (new file)
@@ -0,0 +1,130 @@
{
"resourceMetrics": [
{
"resource": {
"attributes": [
{
"key": "resource-attr",
"value": {
"stringValue": "resource-attr-val-1"
}
}
]
},
"scopeMetrics": [
{
"scope": {},
"metrics": [
{
"name": "counter-int",
"unit": 1,
"gauge": {
"dataPoints": [
{
"attributes": [
{
"key": "label-1",
"value": {
"stringValue": "label-value-1"
}
}
],
"startTimeUnixNano": "1581452773000000789",
"timeUnixNano": "1581452773000000789",
"asInt": "123"
}
]
}
}
]
}
]
},
{
"resource": {
"attributes": [
{
"key": "resource-attr",
"value": {
"stringValue": "resource-attr-val-2"
}
}
]
},
"scopeMetrics": [
{
"scope": {},
"metrics": [
{
"name": "histogram-int",
"unit": 1,
"histogram": {
"dataPoints": [
{
"attributes": [
{
"key": "label-1",
"value": {
"stringValue": "label-value-1"
}
}
],
"startTimeUnixNano": "1581452773000000789",
"timeUnixNano": "1581452773000000789",
"count": "30",
"sum": "100",
"bucket_counts": [3, 5, 15, 6, 1],
"explicit_bounds": [1.0, 2.0, 3.0, 4.0],
"exemplars": []
}
],
"aggregationTemporality":"2"
}
}
]
}
]
},
{
"resource": {
"attributes": [
{
"key": "resource-attr",
"value": {
"stringValue": "resource-attr-val-3"
}
}
]
},
"scopeMetrics": [
{
"scope": {},
"metrics": [
{
"name": "sum-int",
"unit": 1,
"sum": {
"dataPoints": [
{
"attributes": [
{
"key": "label-1",
"value": {
"stringValue": "label-value-1"
}
}
],
"startTimeUnixNano": "1581452773000000789",
"timeUnixNano": "1581452773000000789",
"asInt": "456"
}
]
}
}
]
}
]
}
]
}

OTelLogsGrpcService.java
@@ -92,7 +92,11 @@ private void processRequest(final ExportLogsServiceRequest request, final Stream
 
         final List<Record<Object>> records = logs.stream().map(log -> new Record<Object>(log)).collect(Collectors.toList());
         try {
-            buffer.writeAll(records, bufferWriteTimeoutInMillis);
+            if (buffer.isByteBuffer()) {
+                buffer.writeBytes(request.toByteArray(), null, bufferWriteTimeoutInMillis);
+            } else {
+                buffer.writeAll(records, bufferWriteTimeoutInMillis);
+            }
         } catch (Exception e) {
             if (ServiceRequestContext.current().isTimedOut()) {
                 LOG.warn("Exception writing to buffer but request already timed out.", e);
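
On the read side (not part of this hunk), the buffer uses the ByteDecoder it was constructed with to turn the stored OTLP bytes back into events. A rough, hypothetical sketch of that decode step — the helper class is illustrative and the parse signature is an assumption, not code from this commit:

import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.List;

import org.opensearch.dataprepper.model.codec.ByteDecoder;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;

// Hypothetical helper, illustrative only: parse one Kafka record's payload
// back into Data Prepper events using the source-provided decoder.
class OtlpPayloadDecoder {
    private final ByteDecoder decoder; // e.g. OTelLogsDecoder or OTelMetricDecoder

    OtlpPayloadDecoder(final ByteDecoder decoder) {
        this.decoder = decoder;
    }

    List<Record<Event>> decode(final byte[] payload) throws Exception {
        final List<Record<Event>> records = new ArrayList<>();
        // parse(InputStream, Consumer<Record<Event>>) is assumed from how the
        // decoders are used elsewhere in Data Prepper; verify against the API.
        decoder.parse(new ByteArrayInputStream(payload), records::add);
        return records;
    }
}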
OTelLogsSource.java
@@ -36,6 +36,8 @@
 import org.opensearch.dataprepper.plugins.certificate.CertificateProvider;
 import org.opensearch.dataprepper.plugins.certificate.model.Certificate;
 import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec;
+import org.opensearch.dataprepper.model.codec.ByteDecoder;
+import org.opensearch.dataprepper.plugins.otel.codec.OTelLogsDecoder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -58,6 +60,7 @@ public class OTelLogsSource implements Source<Record<Object>>
     private final CertificateProviderFactory certificateProviderFactory;
     private final GrpcRequestExceptionHandler requestExceptionHandler;
     private Server server;
+    private ByteDecoder byteDecoder;
 
     @DataPrepperPluginConstructor
     public OTelLogsSource(final OTelLogsSourceConfig oTelLogsSourceConfig,
@@ -77,6 +80,12 @@ public OTelLogsSource(final OTelLogsSourceConfig oTelLogsSourceConfig,
         this.pipelineName = pipelineDescription.getPipelineName();
         this.authenticationProvider = createAuthenticationProvider(pluginFactory);
         this.requestExceptionHandler = new GrpcRequestExceptionHandler(pluginMetrics);
+        this.byteDecoder = new OTelLogsDecoder();
     }
+
+    @Override
+    public ByteDecoder getDecoder() {
+        return byteDecoder;
+    }
 
     @Override
(Diffs for the remaining changed files in this commit are not shown here.)
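
Taken together: the source advertises an OTLP-aware decoder via getDecoder(), and a byte-capable buffer (one whose isByteBuffer() returns true, as checked in the gRPC service above) is constructed with it so reads can deserialize the raw payloads. A hypothetical sketch of that hand-off — the class and method here are illustrative, and the real pipeline-construction changes are among the files not shown:

import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.codec.ByteDecoder;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.Source;

// Illustrative only: how pipeline construction might route the source's decoder
// to a byte-capable buffer such as the Kafka buffer.
final class DecoderHandOffSketch {
    static <T extends Record<?>> boolean canBufferBytes(final Source<T> source, final Buffer<T> buffer) {
        final ByteDecoder decoder = source.getDecoder(); // OTelLogsDecoder for the OTel logs source
        return decoder != null && buffer.isByteBuffer();
    }
}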
