Add helper methods for manual spans in mutiny pipelines #45478

Merged
merged 1 commit into from
Jan 16, 2025

Conversation

arn-ivu
Contributor

@arn-ivu arn-ivu commented Jan 9, 2025

Fix #44411

In a traditional blocking, synchronous framework, the OpenTelemetry context is attached to a ThreadLocal. In reactive programming, where multiple processing flows share the same event-loop thread, one has to use Vert.x duplicated contexts instead (see https://quarkus.io/guides/duplicated-context). wrapWithSpan ensures that the pipeline is executed on a duplicated context. If the current context is already duplicated, it stays the same, so nested calls to wrapWithSpan all run on the same Vert.x context.
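
To illustrate why a ThreadLocal is not enough when several processing flows share one thread, here is a minimal plain-Java sketch. It is only a model: it uses no Vert.x or OpenTelemetry types, and the class name, method names, and the per-flow `Map` (standing in for a duplicated context) are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model only: no Vert.x or OpenTelemetry types are involved.
class SharedThreadContextDemo {

    // Hypothetical stand-in for a ThreadLocal-backed tracing context.
    static final ThreadLocal<String> THREAD_LOCAL_SPAN = new ThreadLocal<>();

    // Two flows interleave their steps on the same thread and share one ThreadLocal.
    static List<String> withThreadLocal() {
        List<String> seen = new ArrayList<>();
        THREAD_LOCAL_SPAN.set("span-A");               // flow A, step 1
        THREAD_LOCAL_SPAN.set("span-B");               // flow B, step 1 (same thread)
        seen.add("A sees " + THREAD_LOCAL_SPAN.get()); // flow A, step 2
        seen.add("B sees " + THREAD_LOCAL_SPAN.get()); // flow B, step 2
        THREAD_LOCAL_SPAN.remove();
        return seen;
    }

    // Each flow carries its own context map, loosely analogous to a duplicated context.
    static List<String> withPerFlowContext() {
        Map<String, String> contextA = new HashMap<>();
        Map<String, String> contextB = new HashMap<>();
        List<String> seen = new ArrayList<>();
        contextA.put("span", "span-A");                // flow A, step 1
        contextB.put("span", "span-B");                // flow B, step 1
        seen.add("A sees " + contextA.get("span"));    // flow A, step 2
        seen.add("B sees " + contextB.get("span"));    // flow B, step 2
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(withThreadLocal());
        System.out.println(withPerFlowContext());
    }
}
```

In the ThreadLocal variant, flow A ends up reading flow B's span because the second `set` overwrites the first. With one context per flow, each flow reads its own span.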

Another difficulty with Mutiny pipelines is that usually only one parameter flows through the pipeline. In order to end spans and close the current scope at the end of the pipeline, the span and scope must be stored in the Mutiny context. They are stored in a stack data structure so that multiple nested spans/scopes are closed in the correct order.
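
The stack idea described above can be sketched in plain Java as follows. This is a simplified model, not the code from this pull request: `OpenSpan` is a hypothetical stand-in for a span together with its scope, and nothing here touches the Mutiny context or OpenTelemetry.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java model of closing nested spans in last-in, first-out order.
class SpanStackDemo {

    // Hypothetical stand-in for a span plus the scope that made it current.
    static final class OpenSpan {
        final String name;

        OpenSpan(String name) {
            this.name = name;
        }
    }

    private final Deque<OpenSpan> open = new ArrayDeque<>();
    private final List<String> closedOrder = new ArrayList<>();

    // Starting a span pushes it on top of the stack.
    void start(String name) {
        open.push(new OpenSpan(name));
    }

    // Ending always closes the most recently started span.
    void endCurrent() {
        OpenSpan span = open.pop();
        closedOrder.add(span.name);
    }

    static List<String> run() {
        SpanStackDemo demo = new SpanStackDemo();
        demo.start("parent");
        demo.start("child");
        demo.endCurrent();
        demo.endCurrent();
        return demo.closedOrder;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because the most recently started span is always closed first, the child is closed before its parent.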

Inspired by Jan Peremsky (https://github.com/jan-peremsky/quarkus-reactive-otel/blob/c74043d388ec4df155f466f1d6938931c3389b70/src/main/java/com/fng/ewallet/pex/Tracer.java) and edeandrea (https://github.com/quarkusio/quarkus-super-heroes/blob/main/event-statistics/src/main/java/io/quarkus/sample/superheroes/statistics/listener/SuperStats.java).


quarkus-bot bot commented Jan 9, 2025

Thanks for your pull request!

Your pull request does not follow our editorial rules. Could you have a look?

  • title should not end up with dot

This message is automatically generated by a bot.


quarkus-bot bot commented Jan 9, 2025

/cc @brunobat (opentelemetry), @radcortez (opentelemetry)

@arn-ivu arn-ivu changed the title Add helper methods for manual spans in mutiny pipelines. Add helper methods for manual spans in mutiny pipelines Jan 9, 2025
@brunobat brunobat requested a review from ozangunalp January 9, 2025 14:49

@ozangunalp
Contributor

I need to read the motivation behind the Tracer method linked in the issue itself. For me, this needs to be equivalent to the WithSpanInterceptor, i.e. an extracted method returning Uni and annotated with @WithSpan should have the exact same effect.

A helper method like this doesn't need to be redundant, but I don't see why it wouldn't be implemented the same way.

As for what has been done in SuperStats, this has to do with handling message traces. I need to set up an environment myself to compare produced traces, but I am pretty sure such code is no longer needed for message traces.

@arn-ivu
Contributor Author

arn-ivu commented Jan 10, 2025

@ozangunalp My use case looks roughly like this:

private void handleMessages() {
    consumer.getMessages() // from our own RabbitMQ lib; emits an item whenever a message arrives
            .onItem()
            .transformToUniAndConcatenate(message -> wrapWithSpan(tracer,
                    // this is how our lib provides the remote context
                    message.getTracingData().map(MessageTracingData::getContext),
                    "handleIncomingMessage",
                    Uni.createFrom()
                            .item(message)
                            .map(Message::getPayload)
                            .invoke(this::handlePayload))) // actual message processing
            .subscribe().with(...);
}

It might be possible to solve it like this:

private void handleMessages() {
    consumer.getMessages() // from our own RabbitMQ lib; emits an item whenever a message arrives
            .onItem().invoke(message -> makeContextCurrent(message.getTracingData().map(MessageTracingData::getContext)))
            .onItem().call(this::handleIncomingMessage)
            .subscribe().with(...);
}

@WithSpan
public Uni<Void> handleIncomingMessage(Message message) {
    return Uni.createFrom()
            .item(message)
            .map(Message::getPayload)
            .invoke(this::handlePayload) // actual message processing
            .replaceWithVoid();
}

However, I have two problems with that:

  1. I would have to make the extracted method public (otherwise I get a warning that @WithSpan won't do anything).
  2. To make the provided context current (so that @WithSpan sets the parent correctly), I would have to make sure to run on a duplicated context, and I think I would have to close the context later as well. So these are essentially the same problems I encounter when creating the span manually.

@ozangunalp
Contributor

@arn-ivu ok, I think that can justify the helper method.
However, I believe holding a stack of scope/spans is not necessary.
I'd simply have a method something like this:

    public static <T> Uni<T> wrapWithSpan(final Tracer tracer,
                                          final Optional<Context> parentContext,
                                          final String spanName, final Uni<T> pipeline) {
        return runOnDuplicatedContext(Uni.createFrom().deferred(() -> {
            final SpanBuilder spanBuilder = tracer.spanBuilder(spanName);
            if (parentContext.isPresent()) {
                spanBuilder.setParent(parentContext.get());
            } else {
                spanBuilder.setNoParent();
            }
            final Span span = spanBuilder.startSpan();
            final Scope scope = span.makeCurrent();
            return pipeline.onTermination()
                    .invoke(new Functions.TriConsumer<T, Throwable, Boolean>() {
                        @Override
                        public void accept(T o, Throwable throwable, Boolean isCancelled) {
                            if (isCancelled) {
                                span.recordException(new CancellationException());
                            } else if (throwable != null) {
                                span.recordException(throwable);
                            }
                            span.end();
                            scope.close();
                        }
                    });
        }));
    }

Note that the lambda passed to deferred runs at subscription time.

Which is much closer to what we do in WithSpanInterceptor.
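
The point about subscription time can be modeled without Mutiny. In the sketch below, a `Supplier` plays the role of the deferred lambda and each call to `get()` plays the role of one subscription. All names are assumptions made for illustration, and this is not how `Uni` is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Plain-Java model: the Supplier body runs once per "subscription", not at assembly time.
class DeferredDemo {

    static List<String> run() {
        List<String> events = new ArrayList<>();
        int[] counter = {0};

        // Assembling the pipeline only creates the Supplier; nothing is started yet.
        Supplier<String> deferred = () -> {
            String span = "span-" + (++counter[0]);
            events.add("started " + span);
            return span;
        };
        events.add("pipeline assembled");

        deferred.get(); // first "subscription" starts a fresh span
        deferred.get(); // second "subscription" starts another one
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

No span is started while the pipeline is assembled, and every subscription gets its own span rather than sharing one created up front.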

wdyt?

cf @brunobat

@arn-ivu
Contributor Author

arn-ivu commented Jan 14, 2025

It took me a while to understand the suggested code, as I am not that familiar with Mutiny, but it looks good to me now. I have updated the code accordingly.

@ozangunalp
Contributor

OK for me except for the formatting issue. @brunobat, would you also take a look?

span.recordException(throwable);
}
span.end();
scope.close();

I think the scope close should be in a finally block. It's good practice.
See

Nice catch!
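
The finally-block suggestion from this review thread could look roughly like the following plain-Java sketch. `DemoScope` is a hypothetical stand-in for an OpenTelemetry `Scope`, and the `failWhileEnding` flag simulates an exception thrown while the span is being finished. The merged code may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model: the scope is closed even if finishing the span throws.
class FinallyScopeDemo {

    // Hypothetical stand-in for a scope; close() is narrowed to throw no checked exception.
    interface DemoScope extends AutoCloseable {
        @Override
        void close();
    }

    static List<String> terminate(boolean failWhileEnding) {
        List<String> events = new ArrayList<>();
        DemoScope scope = () -> events.add("scope closed");
        try {
            try {
                if (failWhileEnding) {
                    // Simulates a failure while recording the exception or ending the span.
                    throw new IllegalStateException("boom");
                }
                events.add("span ended");
            } finally {
                // Runs on both the normal and the failing path.
                scope.close();
            }
        } catch (IllegalStateException e) {
            events.add("error: " + e.getMessage());
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(terminate(false));
        System.out.println(terminate(true));
    }
}
```

Without the finally block, the failing path would skip `scope.close()` and leave the scope attached to whatever runs next on that context.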

@brunobat
Contributor

I think it's good. Just left a comment about a minor thing.

@arn-ivu arn-ivu force-pushed the mutiny-tracing-helper branch from e2eaf91 to 26eeeee Compare January 15, 2025 15:28

@brunobat
Contributor

To fix the imports, you just need to run mvn clean install -DskipTests in the extension... It will auto-fix itself and then you commit that.

In a traditional blocking and synchronous framework the opentelemetry context
is attached to ThreadLocal. In reactive programming, where multiple processings share the same event-loop thread, one has to use Vert.x duplicated contexts instead (see https://quarkus.io/guides/duplicated-context).
wrapWithSpan ensures that the pipeline is executed on a duplicated context (If the current context already is duplicated, it will stay the same. Therefore, nested calls to wrapWithSpan will all run on the same vert.x context).

inspired by Jan Peremsky (https://github.com/jan-peremsky/quarkus-reactive-otel/blob/c74043d388ec4df155f466f1d6938931c3389b70/src/main/java/com/fng/ewallet/pex/Tracer.java)
and edeandrea (https://github.com/quarkusio/quarkus-super-heroes/blob/main/event-statistics/src/main/java/io/quarkus/sample/superheroes/statistics/listener/SuperStats.java)

suggestions from review

I will squash with the previous commit once everything is approved

suggestion from reviewer

todo: squash when everything is done
-> adapt commit message. In this solution span and scope are local variables. no need for a stack.

unnecessary line removed

fix test to run on different contexts again

cleanup
@arn-ivu arn-ivu force-pushed the mutiny-tracing-helper branch from 26eeeee to 1d9029d Compare January 15, 2025 19:54

quarkus-bot bot commented Jan 15, 2025

Status for workflow Quarkus Documentation CI

This is the status report for running Quarkus Documentation CI on commit 1d9029d.

✅ The latest workflow run for the pull request has completed successfully.

It should be safe to merge provided you have a look at the other checks in the summary.

Warning

There are other workflow runs running, you probably need to wait for their status before merging.


quarkus-bot bot commented Jan 15, 2025

Status for workflow Quarkus CI

This is the status report for running Quarkus CI on commit 1d9029d.

✅ The latest workflow run for the pull request has completed successfully.

It should be safe to merge provided you have a look at the other checks in the summary.

You can consult the Develocity build scans.


Flaky tests - Develocity

⚙️ JVM Tests - JDK 17 Windows

📦 integration-tests/opentelemetry-quickstart

io.quarkus.it.opentelemetry.OpenTelemetryDisabledTest.buildTimeDisabled - History

  • Condition with Lambda expression in io.quarkus.it.opentelemetry.OpenTelemetryDisabledTest was not fulfilled within 200 milliseconds. - org.awaitility.core.ConditionTimeoutException
org.awaitility.core.ConditionTimeoutException: Condition with Lambda expression in io.quarkus.it.opentelemetry.OpenTelemetryDisabledTest was not fulfilled within 200 milliseconds.
	at org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:167)
	at org.awaitility.core.CallableCondition.await(CallableCondition.java:78)
	at org.awaitility.core.CallableCondition.await(CallableCondition.java:26)
	at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:1006)
	at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:975)
	at io.quarkus.it.opentelemetry.OpenTelemetryDisabledTest.buildTimeDisabled(OpenTelemetryDisabledTest.java:29)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)

Contributor

@brunobat brunobat left a comment

The OpenTelemetryDisabledTest test failure is a flaky one.
I think this is good to go.

@ozangunalp ozangunalp merged commit 1e499cf into quarkusio:main Jan 16, 2025
32 checks passed
@quarkus-bot quarkus-bot bot added this to the 3.19 - main milestone Jan 16, 2025
@quarkus-bot quarkus-bot bot added the kind/enhancement New feature or request label Jan 16, 2025
@gsmet
Member

gsmet commented Jan 16, 2025

Hey there,

I have seen these new tests fail on CI, for instance:

2025-01-16T11:02:16.1784304Z [ERROR] io.quarkus.opentelemetry.deployment.traces.MutinyTracingHelperTest.testSimpleMultiPipeline_Merge(String, String)[1] -- Time elapsed: 30.11 s <<< ERROR!
2025-01-16T11:02:16.1839850Z org.awaitility.core.ConditionTimeoutException: Assertion condition defined as a Lambda expression in io.quarkus.opentelemetry.deployment.common.exporter.TestSpanExporter Spans: [SpanData{spanContext=ImmutableSpanContext{traceId=72de7294d5ae1044f6305ea54cad78ae, spanId=8f8aecbb9fb640b9, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=true}, parentSpanContext=ImmutableSpanContext{traceId=72de7294d5ae1044f6305ea54cad78ae, spanId=3b96bfec66a2a32c, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=true}, resource=Resource{schemaUrl=null, attributes={host.name="fv-az1333-898", service.name="quarkus-opentelemetry-deployment", service.version="999-SNAPSHOT", telemetry.sdk.language="java", telemetry.sdk.name="opentelemetry", telemetry.sdk.version="1.42.1", webengine.name="Quarkus", webengine.version="999-SNAPSHOT"}}, instrumentationScopeInfo=InstrumentationScopeInfo{name=io.quarkus.opentelemetry, version=null, schemaUrl=null, attributes={}}, name=childSpan, kind=INTERNAL, startEpochNanos=1737025251124185513, endEpochNanos=1737025251124267667, attributes={}, totalAttributeCount=0, events=[], totalRecordedEvents=0, links=[], totalRecordedLinks=0, status=ImmutableStatusData{statusCode=UNSET, description=}, hasEnded=true}, SpanData{spanContext=ImmutableSpanContext{traceId=72de7294d5ae1044f6305ea54cad78ae, spanId=3b96bfec66a2a32c, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=true}, parentSpanContext=ImmutableSpanContext{traceId=00000000000000000000000000000000, spanId=0000000000000000, traceFlags=00, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=false}, resource=Resource{schemaUrl=null, attributes={host.name="fv-az1333-898", service.name="quarkus-opentelemetry-deployment", service.version="999-SNAPSHOT", telemetry.sdk.language="java", telemetry.sdk.name="opentelemetry", telemetry.sdk.version="1.42.1", webengine.name="Quarkus", 
webengine.version="999-SNAPSHOT"}}, instrumentationScopeInfo=InstrumentationScopeInfo{name=io.quarkus.opentelemetry, version=null, schemaUrl=null, attributes={}}, name=parentSpan, kind=INTERNAL, startEpochNanos=1737025251124034019, endEpochNanos=1737025251124304456, attributes={}, totalAttributeCount=0, events=[], totalRecordedEvents=0, links=[], totalRecordedLinks=0, status=ImmutableStatusData{statusCode=UNSET, description=}, hasEnded=true}, SpanData{spanContext=ImmutableSpanContext{traceId=1aa4c4ce6e079b592bf0d31391d335f5, spanId=8a066dc1f6ae6935, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=true}, parentSpanContext=ImmutableSpanContext{traceId=1aa4c4ce6e079b592bf0d31391d335f5, spanId=6e170a071ee04d3e, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=true}, resource=Resource{schemaUrl=null, attributes={host.name="fv-az1333-898", service.name="quarkus-opentelemetry-deployment", service.version="999-SNAPSHOT", telemetry.sdk.language="java", telemetry.sdk.name="opentelemetry", telemetry.sdk.version="1.42.1", webengine.name="Quarkus", webengine.version="999-SNAPSHOT"}}, instrumentationScopeInfo=InstrumentationScopeInfo{name=io.quarkus.opentelemetry, version=null, schemaUrl=null, attributes={}}, name=subspan test1, kind=INTERNAL, startEpochNanos=1737025251542201573, endEpochNanos=1737025251542242079, attributes={}, totalAttributeCount=0, events=[], totalRecordedEvents=0, links=[], totalRecordedLinks=0, status=ImmutableStatusData{statusCode=UNSET, description=}, hasEnded=true}, SpanData{spanContext=ImmutableSpanContext{traceId=1aa4c4ce6e079b592bf0d31391d335f5, spanId=6e170a071ee04d3e, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=true}, parentSpanContext=ImmutableSpanContext{traceId=00000000000000000000000000000000, spanId=0000000000000000, traceFlags=00, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=false}, resource=Resource{schemaUrl=null, 
attributes={host.name="fv-az1333-898", service.name="quarkus-opentelemetry-deployment", service.version="999-SNAPSHOT", telemetry.sdk.language="java", telemetry.sdk.name="opentelemetry", telemetry.sdk.version="1.42.1", webengine.name="Quarkus", webengine.version="999-SNAPSHOT"}}, instrumentationScopeInfo=InstrumentationScopeInfo{name=io.quarkus.opentelemetry, version=null, schemaUrl=null, attributes={}}, name=testSpan test1, kind=INTERNAL, startEpochNanos=1737025251542082921, endEpochNanos=1737025251542278177, attributes={}, totalAttributeCount=0, events=[], totalRecordedEvents=0, links=[], totalRecordedLinks=0, status=ImmutableStatusData{statusCode=UNSET, description=}, hasEnded=true}, SpanData{spanContext=ImmutableSpanContext{traceId=c0e1205797a55c6514d973cde7570c98, spanId=903bd9c81697fc0a, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=true}, parentSpanContext=ImmutableSpanContext{traceId=c0e1205797a55c6514d973cde7570c98, spanId=268df3e6d3994072, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=true}, resource=Resource{schemaUrl=null, attributes={host.name="fv-az1333-898", service.name="quarkus-opentelemetry-deployment", service.version="999-SNAPSHOT", telemetry.sdk.language="java", telemetry.sdk.name="opentelemetry", telemetry.sdk.version="1.42.1", webengine.name="Quarkus", webengine.version="999-SNAPSHOT"}}, instrumentationScopeInfo=InstrumentationScopeInfo{name=io.quarkus.opentelemetry, version=null, schemaUrl=null, attributes={}}, name=subspan test2, kind=INTERNAL, startEpochNanos=1737025251542755500, endEpochNanos=1737025251542776349, attributes={}, totalAttributeCount=0, events=[], totalRecordedEvents=0, links=[], totalRecordedLinks=0, status=ImmutableStatusData{statusCode=UNSET, description=}, hasEnded=true}, SpanData{spanContext=ImmutableSpanContext{traceId=c0e1205797a55c6514d973cde7570c98, spanId=268df3e6d3994072, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=false, 
valid=true}, parentSpanContext=ImmutableSpanContext{traceId=00000000000000000000000000000000, spanId=0000000000000000, traceFlags=00, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=false}, resource=Resource{schemaUrl=null, attributes={host.name="fv-az1333-898", service.name="quarkus-opentelemetry-deployment", service.version="999-SNAPSHOT", telemetry.sdk.language="java", telemetry.sdk.name="opentelemetry", telemetry.sdk.version="1.42.1", webengine.name="Quarkus", webengine.version="999-SNAPSHOT"}}, instrumentationScopeInfo=InstrumentationScopeInfo{name=io.quarkus.opentelemetry, version=null, schemaUrl=null, attributes={}}, name=testSpan test2, kind=INTERNAL, startEpochNanos=1737025251542682684, endEpochNanos=1737025251542804883, attributes={}, totalAttributeCount=0, events=[], totalRecordedEvents=0, links=[], totalRecordedLinks=0, status=ImmutableStatusData{statusCode=UNSET, description=}, hasEnded=true}, SpanData{spanContext=ImmutableSpanContext{traceId=d0d7de5db9e1336900dca7901468283a, spanId=a6bb3cdfa7d28bc4, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=true}, parentSpanContext=ImmutableSpanContext{traceId=d0d7de5db9e1336900dca7901468283a, spanId=2cd1ce753586ba11, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=true}, resource=Resource{schemaUrl=null, attributes={host.name="fv-az1333-898", service.name="quarkus-opentelemetry-deployment", service.version="999-SNAPSHOT", telemetry.sdk.language="java", telemetry.sdk.name="opentelemetry", telemetry.sdk.version="1.42.1", webengine.name="Quarkus", webengine.version="999-SNAPSHOT"}}, instrumentationScopeInfo=InstrumentationScopeInfo{name=io.quarkus.opentelemetry, version=null, schemaUrl=null, attributes={}}, name=subspan test3, kind=INTERNAL, startEpochNanos=1737025251543009586, endEpochNanos=1737025251543030836, attributes={}, totalAttributeCount=0, events=[], totalRecordedEvents=0, links=[], totalRecordedLinks=0, 
status=ImmutableStatusData{statusCode=UNSET, description=}, hasEnded=true}, SpanData{spanContext=ImmutableSpanContext{traceId=d0d7de5db9e1336900dca7901468283a, spanId=2cd1ce753586ba11, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=true}, parentSpanContext=ImmutableSpanContext{traceId=00000000000000000000000000000000, spanId=0000000000000000, traceFlags=00, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=false}, resource=Resource{schemaUrl=null, attributes={host.name="fv-az1333-898", service.name="quarkus-opentelemetry-deployment", service.version="999-SNAPSHOT", telemetry.sdk.language="java", telemetry.sdk.name="opentelemetry", telemetry.sdk.version="1.42.1", webengine.name="Quarkus", webengine.version="999-SNAPSHOT"}}, instrumentationScopeInfo=InstrumentationScopeInfo{name=io.quarkus.opentelemetry, version=null, schemaUrl=null, attributes={}}, name=testSpan test3, kind=INTERNAL, startEpochNanos=1737025251542949283, endEpochNanos=1737025251543057075, attributes={}, totalAttributeCount=0, events=[], totalRecordedEvents=0, links=[], totalRecordedLinks=0, status=ImmutableStatusData{statusCode=UNSET, description=}, hasEnded=true}] ==> expected: <6> but was: <8> within 30 seconds.

In this case, we have more spans than expected and thus the test fails.

I also experienced some local failures when I was working on the OpenTelemetry module.

Could we make sure these tests are made more reliable?

Thanks!

@brunobat
Contributor

Will take a look at that test

@ozangunalp
Contributor

@brunobat Thanks! ping me if I can help

@brunobat
Contributor

Let's see if #45653 fixes the issue.

Development

Successfully merging this pull request may close these issues.

Implement withSpan method utilities for Mutiny
4 participants