diff --git a/pulsar/error.go b/pulsar/error.go index 60a832b9dd..f433bfc973 100644 --- a/pulsar/error.go +++ b/pulsar/error.go @@ -99,6 +99,8 @@ const ( AddToBatchFailed // SeekFailed seek failed SeekFailed + // ProducerClosed means producer already been closed + ProducerClosed ) // Error implement error interface, composed of two parts: msg and result. @@ -201,6 +203,8 @@ func getResultStr(r Result) string { return "AddToBatchFailed" case SeekFailed: return "SeekFailed" + case ProducerClosed: + return "ProducerClosed" default: return fmt.Sprintf("Result(%d)", r) } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index b67a86b9f6..b157988d45 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -53,6 +53,7 @@ var ( errSendQueueIsFull = newError(ProducerQueueIsFull, "producer send queue is full") errContextExpired = newError(TimeoutError, "message send context expired") errMessageTooLarge = newError(MessageTooBig, "message size exceeds MaxMessageSize") + errProducerClosed = newError(ProducerClosed, "producer already been closed") buffersPool sync.Pool ) @@ -389,6 +390,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) { if p.options.Schema != nil { schemaPayload, err = p.options.Schema.Encode(msg.Value) if err != nil { + p.log.WithError(err).Errorf("Schema encode message failed %s", msg.Value) return } } @@ -692,6 +694,12 @@ func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage, func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage, callback func(MessageID, *ProducerMessage, error), flushImmediately bool) { + if p.getProducerState() != producerReady { + // Producer is closing + callback(nil, msg, errProducerClosed) + return + } + sr := &sendRequest{ ctx: ctx, msg: msg, diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 90af5ffee5..f87fd9c162 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -1169,3 +1169,33 @@ func TestProducerWithInterceptors(t *testing.T) { assert.Equal(t, 10, metric.sendn) assert.Equal(t, 10, metric.ackn) } + +func TestProducerSendAfterClose(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: serviceURL, + }) + assert.NoError(t, err) + defer client.Close() + + producer, err := client.CreateProducer(ProducerOptions{ + Topic: newTopicName(), + }) + + assert.NoError(t, err) + assert.NotNil(t, producer) + defer producer.Close() + + ID, err := producer.Send(context.Background(), &ProducerMessage{ + Payload: []byte("hello"), + }) + + assert.NoError(t, err) + assert.NotNil(t, ID) + + producer.Close() + ID, err = producer.Send(context.Background(), &ProducerMessage{ + Payload: []byte("hello"), + }) + assert.Nil(t, ID) + assert.Error(t, err) +}