Skip to content

Commit

Permalink
Merge pull request #9 from Fanatics/consumer_topic
Browse files Browse the repository at this point in the history
export topic name
  • Loading branch information
GPrabhudas authored Jul 18, 2022
2 parents 01ba8bd + 5147054 commit ceb35c4
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 0 deletions.
3 changes: 3 additions & 0 deletions pulsar/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,4 +255,7 @@ type Consumer interface {

// Name returns the name of consumer.
Name() string

// TopicName returns the topic of the single topic consumer
TopicName() (string, error)
}
4 changes: 4 additions & 0 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,3 +690,7 @@ func addMessageCryptoIfMissing(client *client, options *ConsumerOptions, topics
}
return nil
}

func (c *consumer) TopicName() (string, error) {
return c.topic, nil
}
4 changes: 4 additions & 0 deletions pulsar/consumer_multitopic.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,3 +229,7 @@ func (c *multiTopicConsumer) SeekByTime(time time.Time) error {
func (c *multiTopicConsumer) Name() string {
return c.consumerName
}

func (c *multiTopicConsumer) TopicName() (string, error) {
return "", newError(SeekFailed, "topic command not allowed for multi topic consumer")
}
2 changes: 2 additions & 0 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,8 @@ func (pc *partitionConsumer) internalSeekByTime(seek *seekByTimeRequest) {
seek.err = err
return
}
pc.log.Debugf("Successfully reset subscriptio to timestamp : %v",
uint64(seek.publishTime.UnixNano()/int64(time.Millisecond)))
pc.clearMessageChannels()
}

Expand Down
4 changes: 4 additions & 0 deletions pulsar/consumer_regex.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,10 @@ func (c *regexConsumer) SeekByTime(time time.Time) error {
return newError(SeekFailed, "seek command not allowed for regex consumer")
}

func (c *regexConsumer) TopicName() (string, error) {
return "", newError(SeekFailed, "topic command not allowed for multi topic consumer")
}

// Name returns the name of consumer.
func (c *regexConsumer) Name() string {
return c.consumerName
Expand Down
4 changes: 4 additions & 0 deletions pulsar/internal/pulsartracing/consumer_interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,7 @@ func (c *mockConsumer) SeekByTime(time time.Time) error {
func (c *mockConsumer) Name() string {
return ""
}

func (c *mockConsumer) TopicName() (string, error) {
return "", nil
}

0 comments on commit ceb35c4

Please sign in to comment.