Update on ReadOnly Headers after Retry #713

Open
EGomz3 opened this issue Jan 13, 2025 · 4 comments


EGomz3 commented Jan 13, 2025

Observed behavior

Some context on how we found the issue:

We noticed the exception "The response headers cannot be modified because the response has already started." in our logs. It was caused by our own misuse of the library in our custom retries of JetStream publish requests. It looked something like this:

Try 1: Some random exception we got
Try 2: "The response headers cannot be modified because the response has already started."
Try 3: "The response headers cannot be modified because the response has already started."

This was because we were reusing the same NatsHeaders object on each retry, and the NATS client marks those headers as read-only when a request is being traced.
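
For illustration, a minimal sketch of that caller-side fix: build a new NatsHeaders for every attempt instead of passing one instance to each retry. PublishRetry, CloneHeaders, PublishWithRetryAsync, the attempt count and the delay are hypothetical names and values chosen for this sketch, not part of NATS.NET. It assumes NatsHeaders can be enumerated as key/value pairs and written through its indexer.

```csharp
using System;
using System.Threading.Tasks;
using NATS.Client.Core;
using NATS.Client.JetStream;

public static class PublishRetry
{
    // Hypothetical helper (not part of NATS.NET): copies the entries into a
    // new, writable NatsHeaders so that no attempt receives an instance that
    // a previous attempt has already marked read-only.
    public static NatsHeaders CloneHeaders(NatsHeaders template)
    {
        var copy = new NatsHeaders();
        foreach (var entry in template)
            copy[entry.Key] = entry.Value;
        return copy;
    }

    // Hypothetical caller-side retry loop: the template is never handed to
    // the client, only a fresh copy per attempt.
    public static async Task PublishWithRetryAsync(
        NatsJSContext js, string subject, string data,
        NatsHeaders template, int maxAttempts = 3)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                var ack = await js.PublishAsync(
                    subject: subject, data: data, headers: CloneHeaders(template));
                ack.EnsureSuccess();
                return;
            }
            catch (Exception) when (attempt < maxAttempts)
            {
                // Assumed linear back-off; the last failure is rethrown
                // because the filter no longer matches.
                await Task.Delay(TimeSpan.FromMilliseconds(200 * attempt));
            }
        }
    }
}
```

This only covers retries made by the caller. It does not change what happens when the client itself retries with the headers instance it was given.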

However, after fixing that, we noticed some strange behavior. The exceptions on each retry now look like this:

Try 1: "The response headers cannot be modified because the response has already started."
Try 2: "The response headers cannot be modified because the response has already started."
Try 3: "The response headers cannot be modified because the response has already started."

We are getting the same exception, but now even when we are not retrying (or are we?). On closer inspection of the NATS internals, we realized that there is an internal retry of requests in PublishAsync of NatsJSContext.

Stack Trace:

The response headers cannot be modified because the response has already started.
at NATS.Client.Core.Internal.Telemetry.AddTraceContextHeaders(Activity activity, NatsHeaders& headers)
   at NATS.Client.Core.NatsConnection.PublishAsync[T](String subject, T data, NatsHeaders headers, String replyTo, INatsSerialize`1 serializer, NatsPubOpts opts, CancellationToken cancellationToken)
   at NATS.Client.Core.NatsConnection.CreateRequestSubAsync[TRequest,TReply](String subject, TRequest data, NatsHeaders headers, INatsSerialize`1 requestSerializer, INatsDeserialize`1 replySerializer, NatsPubOpts requestOpts, NatsSubOpts replyOpts, CancellationToken cancellationToken)
   at NATS.Client.JetStream.NatsJSContext.PublishAsync[T](String subject, T data, INatsSerialize`1 serializer, NatsJSPubOpts opts, NatsHeaders headers, CancellationToken cancellationToken)

What seems to be happening is that the first publish attempt fails with some exception, and on the internal retry the NATS client throws its own exception because the same headers object, by then read-only, is reused for each retry.
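
The suspected mechanism can be modelled without a NATS server. The sketch below is a stand-in, not NATS.NET code: FreezableHeaders, RetryModel, the "traceparent" value and the TimeoutException are all hypothetical, and the model simply assumes that a publish attempt writes trace headers and then marks the instance read-only, as the report describes.

```csharp
using System;
using System.Collections.Generic;

// Reuse one headers instance across three attempts and print what each
// attempt fails with.
foreach (var line in RetryModel.Run(attempts: 3, freshHeadersPerAttempt: false))
    Console.WriteLine(line);

// Hypothetical stand-in for a header collection that can be frozen.
public sealed class FreezableHeaders
{
    private readonly Dictionary<string, string> _store = new();

    public bool IsReadOnly { get; private set; }

    public string this[string key]
    {
        get => _store[key];
        set
        {
            if (IsReadOnly)
                throw new InvalidOperationException("headers are read-only");
            _store[key] = value;
        }
    }

    public void Freeze() => IsReadOnly = true;
}

public static class RetryModel
{
    // Models one publish attempt: inject a trace header, freeze the headers,
    // then fail (standing in for whatever made the real request fail).
    private static void PublishOnce(FreezableHeaders headers)
    {
        headers["traceparent"] = "00-hypothetical-trace-id";
        headers.Freeze();
        throw new TimeoutException("publish failed");
    }

    // Runs the attempts and records the exception type seen on each one.
    public static List<string> Run(int attempts, bool freshHeadersPerAttempt)
    {
        var log = new List<string>();
        var shared = new FreezableHeaders();
        for (var i = 1; i <= attempts; i++)
        {
            var headers = freshHeadersPerAttempt ? new FreezableHeaders() : shared;
            try { PublishOnce(headers); }
            catch (Exception e) { log.Add($"Try {i}: {e.GetType().Name}"); }
        }
        return log;
    }
}
```

With a shared instance the model logs TimeoutException for try 1 and InvalidOperationException for tries 2 and 3, which matches the first pattern in this report. With freshHeadersPerAttempt set to true, every try logs TimeoutException.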

Expected behavior

The expected behavior is that the client never attempts to modify NatsHeaders after they have been set to ReadOnly.

Server and client version

NATS.NET Client Version: 2.5.5

Host environment

No response

Steps to reproduce

  1. Publish a request with JetStream (ensure Telemetry.HasListeners() returns true)
  2. Make sure the request fails with an exception
  3. Retry the request once

EGomz3 closed this as completed Jan 14, 2025

mtmk (Collaborator) commented Jan 14, 2025

Thanks for the report, @EGomz3. Yeah, I thought we had fixed this at some stage. Is that why you closed it?

EGomz3 (Author) commented Jan 14, 2025

I closed it because the information I shared wasn't 100% accurate and I wanted to create a simple reproducible example that would make it easier to spot the issue. I may reopen soon, but first, I want to create a simple test project with the issue 😄

mtmk (Collaborator) commented Jan 14, 2025

> I closed it because the information I shared wasn't 100% accurate and I wanted to create a simple reproducible example that would make it easier to spot the issue. I may reopen soon, but first, I want to create a simple test project with the issue 😄

@EGomz3 really appreciated, thanks for that. It'd make my life easier too 😉

EGomz3 (Author) commented Jan 14, 2025

Hi @mtmk 👋

Was able to create a simple test that reproduces the issue 😄 Or maybe 😢 is more appropriate, haha

In terms of setup, the only thing needed is this config in the NATS server ConfigMap:

    "no_auth_user": "admin",
    "accounts": {
      "dummy": {
        "jetstream": "enabled",
        "users": [
          { "user": "admin" },
          { "user": "app", "permissions": {
            "publish": ["a"],
            "subscribe": "_INBOX.>"
            }
          }
        ]
      }
    }

We intentionally gave the app user no publish permission on the subjects the test uses (it may only publish to "a"), so that we could replicate the exception scenario in the code below. In short, we register an activity listener and then trigger a retry scenario. Doing so gives the following exception upon publishing:

Unhandled exception. System.InvalidOperationException: The response headers cannot be modified because the response has already started.
   at NATS.Client.Core.NatsHeaders.ThrowIfReadOnly()
   at NATS.Client.Core.NatsHeaders.set_Item(String key, StringValues value)
   at NATS.Client.Core.Internal.Telemetry.<>c.<AddTraceContextHeaders>b__5_0(Object carrier, String fieldName, String fieldValue)
   at System.Diagnostics.LegacyPropagator.Inject(Activity activity, Object carrier, PropagatorSetterCallback setter)
   at NATS.Client.Core.Internal.Telemetry.AddTraceContextHeaders(Activity activity, NatsHeaders& headers)
   at NATS.Client.Core.NatsConnection.PublishAsync[T](String subject, T data, NatsHeaders headers, String replyTo, INatsSerialize`1 serializer, NatsPubOpts opts, CancellationToken cancellationToken)
   at NATS.Client.Core.NatsConnection.CreateRequestSubAsync[TRequest,TReply](String subject, TRequest data, NatsHeaders headers, INatsSerialize`1 requestSerializer, INatsDeserialize`1 replySerializer, NatsPubOpts requestOpts, NatsSubOpts replyOpts, CancellationToken cancellationToken)
   at NATS.Client.JetStream.NatsJSContext.PublishAsync[T](String subject, T data, INatsSerialize`1 serializer, NatsJSPubOpts opts, NatsHeaders headers, CancellationToken cancellationToken)
   at Program.<Main>$(String[] args) in C:\Users\ghd\RiderProjects\NatsHeaderIssue\NatsHeaderIssue\Program.cs:line 33

I hope this test is helpful, and do let me know if you need anything else.

NATS.Net version = 2.5.6 (latest at time of creating this issue)

using System.Diagnostics;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
using NATS.Net;

// Ensure there's a listener for the activity source
// So that Nats injects the trace context into the headers
ActivitySource.AddActivityListener(
    new ActivityListener
    {
        ShouldListenTo = _ => true,
        Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllDataAndRecorded,
    }
);

// Connection with permissions for everything
await using var adminConnection = new NatsConnection(new NatsOpts { AuthOpts = new NatsAuthOpts { Username = "admin"}});
await adminConnection.ConnectAsync();

var adminJetStreamContext = adminConnection.CreateJetStreamContext();

await adminJetStreamContext.CreateStreamAsync(new StreamConfig(name: "ORDERS", subjects: ["orders.*"]));

// Connection without permissions to publish
await using var appConnection = new NatsConnection(new NatsOpts { AuthOpts = new NatsAuthOpts { Username = "app"}});
await appConnection.ConnectAsync();
var appJetStreamConnection = appConnection.CreateJetStreamContext();

// Create a headers object; the NATS client reuses this same instance on its internal retry
var headers = new NatsHeaders();

var ack = await appJetStreamConnection.PublishAsync(subject: "orders.new", data: "order 1", headers: headers);
ack.EnsureSuccess();

// Create a consumer on a stream to receive the messages
var consumer = await adminJetStreamContext.CreateOrUpdateConsumerAsync("ORDERS", new ConsumerConfig("order_processor"));

await foreach (var jsMsg in consumer.ConsumeAsync<string>())
{
    Console.WriteLine($"Processed: {jsMsg.Data}");
    await jsMsg.AckAsync();
}

EGomz3 reopened this Jan 14, 2025