From 3d21b010779d5d9b3857845f425ebe8b51d15370 Mon Sep 17 00:00:00 2001 From: PGarule Date: Mon, 18 Jul 2022 16:52:09 +0530 Subject: [PATCH 1/2] export topic name --- pulsar/consumer.go | 3 +++ pulsar/consumer_impl.go | 4 ++++ pulsar/consumer_multitopic.go | 4 ++++ pulsar/consumer_partition.go | 1 + pulsar/consumer_regex.go | 4 ++++ pulsar/internal/pulsartracing/consumer_interceptor_test.go | 4 ++++ 6 files changed, 20 insertions(+) diff --git a/pulsar/consumer.go b/pulsar/consumer.go index 2df16374c4..bd845f4011 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -255,4 +255,7 @@ type Consumer interface { // Name returns the name of consumer. Name() string + + // TopicName returns the topic of the single topic consumer + TopicName() (string, error) } diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 2328ca882b..08bb043a01 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -690,3 +690,7 @@ func addMessageCryptoIfMissing(client *client, options *ConsumerOptions, topics } return nil } + +func (c *consumer) TopicName() (string, error) { + return c.topic, nil +} diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go index 1d75a2477b..e20237d83d 100644 --- a/pulsar/consumer_multitopic.go +++ b/pulsar/consumer_multitopic.go @@ -229,3 +229,7 @@ func (c *multiTopicConsumer) SeekByTime(time time.Time) error { func (c *multiTopicConsumer) Name() string { return c.consumerName } + +func (c *multiTopicConsumer) TopicName() (string, error) { + return "", newError(SeekFailed, "topic command not allowed for multi topic consumer") +} diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index fac9d4bdd2..1688d702c0 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -552,6 +552,7 @@ func (pc *partitionConsumer) internalSeekByTime(seek *seekByTimeRequest) { seek.err = err return } + pc.log.Debugf("Successfully reset subscriptio to timestamp : %v", uint64(seek.publishTime.UnixNano()/int64(time.Millisecond))) pc.clearMessageChannels() } diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go index e4d2077ac7..a031d464f6 100644 --- a/pulsar/consumer_regex.go +++ b/pulsar/consumer_regex.go @@ -248,6 +248,10 @@ func (c *regexConsumer) SeekByTime(time time.Time) error { return newError(SeekFailed, "seek command not allowed for regex consumer") } +func (c *regexConsumer) TopicName() (string, error) { + return "", newError(SeekFailed, "topic command not allowed for multi topic consumer") +} + // Name returns the name of consumer. func (c *regexConsumer) Name() string { return c.consumerName diff --git a/pulsar/internal/pulsartracing/consumer_interceptor_test.go b/pulsar/internal/pulsartracing/consumer_interceptor_test.go index 9e70d8bdf0..7a16c7ccbd 100644 --- a/pulsar/internal/pulsartracing/consumer_interceptor_test.go +++ b/pulsar/internal/pulsartracing/consumer_interceptor_test.go @@ -91,3 +91,7 @@ func (c *mockConsumer) SeekByTime(time time.Time) error { func (c *mockConsumer) Name() string { return "" } + +func (c *mockConsumer) TopicName() (string, error) { + return "", nil +} From 51470543fe23c1a42c64bcd9db67919f37cae2dd Mon Sep 17 00:00:00 2001 From: PGarule Date: Mon, 18 Jul 2022 16:57:10 +0530 Subject: [PATCH 2/2] split line --- pulsar/consumer_partition.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 1688d702c0..7287c05207 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -552,7 +552,8 @@ func (pc *partitionConsumer) internalSeekByTime(seek *seekByTimeRequest) { seek.err = err return } - pc.log.Debugf("Successfully reset subscriptio to timestamp : %v", uint64(seek.publishTime.UnixNano()/int64(time.Millisecond))) + pc.log.Debugf("Successfully reset subscriptio to timestamp : %v", + uint64(seek.publishTime.UnixNano()/int64(time.Millisecond))) pc.clearMessageChannels() }