From 1dfb8fdffb97d0c4f4845cc668bed485324daf5c Mon Sep 17 00:00:00 2001 From: Zhiqiang Li Date: Wed, 21 Jul 2021 17:43:23 +0800 Subject: [PATCH 1/2] Add producer check state before send msg. (#569) Add producer state check before send msg. --- pulsar/error.go | 4 ++++ pulsar/producer_partition.go | 7 +++++++ pulsar/producer_test.go | 30 ++++++++++++++++++++++++++++++ 3 files changed, 41 insertions(+) diff --git a/pulsar/error.go b/pulsar/error.go index 60a832b9dd..f433bfc973 100644 --- a/pulsar/error.go +++ b/pulsar/error.go @@ -99,6 +99,8 @@ const ( AddToBatchFailed // SeekFailed seek failed SeekFailed + // ProducerClosed means producer already been closed + ProducerClosed ) // Error implement error interface, composed of two parts: msg and result. @@ -201,6 +203,8 @@ func getResultStr(r Result) string { return "AddToBatchFailed" case SeekFailed: return "SeekFailed" + case ProducerClosed: + return "ProducerClosed" default: return fmt.Sprintf("Result(%d)", r) } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 8b3d33dadd..7e83bfa42e 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -50,6 +50,7 @@ var ( errSendQueueIsFull = newError(ProducerQueueIsFull, "producer send queue is full") errContextExpired = newError(TimeoutError, "message send context expired") errMessageTooLarge = newError(MessageTooBig, "message size exceeds MaxMessageSize") + errProducerClosed = newError(ProducerClosed, "producer already been closed") buffersPool sync.Pool ) @@ -658,6 +659,12 @@ func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage, func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage, callback func(MessageID, *ProducerMessage, error), flushImmediately bool) { + if p.getProducerState() != producerReady { + // Producer is closing + callback(nil, msg, errProducerClosed) + return + } + sr := &sendRequest{ ctx: ctx, msg: msg, diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 7c3dbd76bb..bbe8028e55 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -1097,3 +1097,33 @@ func TestProducerWithInterceptors(t *testing.T) { assert.Equal(t, 10, metric.sendn) assert.Equal(t, 10, metric.ackn) } + +func TestProducerSendAfterClose(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: serviceURL, + }) + assert.NoError(t, err) + defer client.Close() + + producer, err := client.CreateProducer(ProducerOptions{ + Topic: newTopicName(), + }) + + assert.NoError(t, err) + assert.NotNil(t, producer) + defer producer.Close() + + ID, err := producer.Send(context.Background(), &ProducerMessage{ + Payload: []byte("hello"), + }) + + assert.NoError(t, err) + assert.NotNil(t, ID) + + producer.Close() + ID, err = producer.Send(context.Background(), &ProducerMessage{ + Payload: []byte("hello"), + }) + assert.Nil(t, ID) + assert.Error(t, err) +} From bbee6401ac34ae1d8ca5f08e8990418c69aa52e5 Mon Sep 17 00:00:00 2001 From: Zhiqiang Li Date: Wed, 21 Jul 2021 18:41:03 +0800 Subject: [PATCH 2/2] [issue 490] Add error log when schema encode failed. (#571) * Add error log when schema encode failed. * format code --- pulsar/producer_partition.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 7e83bfa42e..abec4fc1f7 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -356,6 +356,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) { if p.options.Schema != nil { schemaPayload, err = p.options.Schema.Encode(msg.Value) if err != nil { + p.log.WithError(err).Errorf("Schema encode message failed %s", msg.Value) return } }