Skip to content

Commit

Permalink
Merge branch 'apache:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
GPrabhudas authored Jul 22, 2021
2 parents fa925e7 + bbee640 commit 646a3f0
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 0 deletions.
4 changes: 4 additions & 0 deletions pulsar/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ const (
AddToBatchFailed
// SeekFailed seek failed
SeekFailed
// ProducerClosed means producer already been closed
ProducerClosed
)

// Error implement error interface, composed of two parts: msg and result.
Expand Down Expand Up @@ -201,6 +203,8 @@ func getResultStr(r Result) string {
return "AddToBatchFailed"
case SeekFailed:
return "SeekFailed"
case ProducerClosed:
return "ProducerClosed"
default:
return fmt.Sprintf("Result(%d)", r)
}
Expand Down
8 changes: 8 additions & 0 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ var (
errSendQueueIsFull = newError(ProducerQueueIsFull, "producer send queue is full")
errContextExpired = newError(TimeoutError, "message send context expired")
errMessageTooLarge = newError(MessageTooBig, "message size exceeds MaxMessageSize")
errProducerClosed = newError(ProducerClosed, "producer already been closed")

buffersPool sync.Pool
)
Expand Down Expand Up @@ -389,6 +390,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
if p.options.Schema != nil {
schemaPayload, err = p.options.Schema.Encode(msg.Value)
if err != nil {
p.log.WithError(err).Errorf("Schema encode message failed %s", msg.Value)
return
}
}
Expand Down Expand Up @@ -692,6 +694,12 @@ func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,

func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
if p.getProducerState() != producerReady {
// Producer is closing
callback(nil, msg, errProducerClosed)
return
}

sr := &sendRequest{
ctx: ctx,
msg: msg,
Expand Down
30 changes: 30 additions & 0 deletions pulsar/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1169,3 +1169,33 @@ func TestProducerWithInterceptors(t *testing.T) {
assert.Equal(t, 10, metric.sendn)
assert.Equal(t, 10, metric.ackn)
}

func TestProducerSendAfterClose(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: serviceURL,
})
assert.NoError(t, err)
defer client.Close()

producer, err := client.CreateProducer(ProducerOptions{
Topic: newTopicName(),
})

assert.NoError(t, err)
assert.NotNil(t, producer)
defer producer.Close()

ID, err := producer.Send(context.Background(), &ProducerMessage{
Payload: []byte("hello"),
})

assert.NoError(t, err)
assert.NotNil(t, ID)

producer.Close()
ID, err = producer.Send(context.Background(), &ProducerMessage{
Payload: []byte("hello"),
})
assert.Nil(t, ID)
assert.Error(t, err)
}

0 comments on commit 646a3f0

Please sign in to comment.