Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
PGarule authored and PGarule committed Oct 25, 2021
2 parents 646a3f0 + 81f3830 commit bf999a2
Show file tree
Hide file tree
Showing 60 changed files with 1,160 additions and 411 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
go-version: [1.11, 1.12, 1.13, 1.14]
go-version: [1.13, 1.14, 1.15, 1.16, 1.17]
steps:
- name: clean docker cache
run: |
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
go-version: [1.11, 1.12, 1.13, 1.14]
go-version: [1.13, 1.14, 1.15, 1.16, 1.17]
steps:
- name: Set up Go
uses: actions/setup-go@v1
Expand Down
37 changes: 37 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,43 @@

All notable changes to this project will be documented in this file.

[0.6.0] 2021-07-21

## Feature

* Make PartitionsAutoDiscoveryInterval configurable, see [PR-514](https://github.com/apache/pulsar-client-go/pull/514).
* Always check connection close channell, before attempting to put requests, see [PR-521](https://github.com/apache/pulsar-client-go/pull/521).
* Add `LedgerId,EntryId,BatchIdx,PartitionIdx` func for MessageId interface, see [PR-529](https://github.com/apache/pulsar-client-go/pull/529).
* Add DisableReplication to Producer Message, see [PR-543](https://github.com/apache/pulsar-client-go/pull/543).
* Updating comments to conform to golang comment specification, see [PR-532](https://github.com/apache/pulsar-client-go/pull/532).
* Producer respects Context passed to Send() and SendAsync() when applying backpressure, see [PR-534](https://github.com/apache/pulsar-client-go/pull/534).
* Simplify connection close logic, see [PR-559](https://github.com/apache/pulsar-client-go/pull/559).
* Add open tracing to pulsar go client, see [PR-518](https://github.com/apache/pulsar-client-go/pull/518).
* Update proto file, see [PR-562](https://github.com/apache/pulsar-client-go/pull/562).
* Add send error logic for connection, see [PR-566](https://github.com/apache/pulsar-client-go/pull/566).
* Add license file for depend on libs, see [PR-567](https://github.com/apache/pulsar-client-go/pull/567).

## Improve

* Update jwt-go dependency to resolve vulnerabilities, see [PR-524](https://github.com/apache/pulsar-client-go/pull/524).
* Fixed Athenz repository name, see [PR-522](https://github.com/apache/pulsar-client-go/pull/522).
* Fix reader latest position, see [PR-525](https://github.com/apache/pulsar-client-go/pull/525).
* Fix timeout guarantee for RequestOnCnx, see [PR-492](https://github.com/apache/pulsar-client-go/pull/492).
* Fix nil pointer error with GetPartitionedTopicMetadata, see [PR-536](https://github.com/apache/pulsar-client-go/pull/536).
* Release locks before calling producer consumer response callbacks, see [PR-542](https://github.com/apache/pulsar-client-go/pull/542).
* Fix lookup service not implemented GetTopicsOfNamespace, see [PR-541](https://github.com/apache/pulsar-client-go/pull/541).
* Regenerate the certs to work with Pulsar 2.8.0 and Java 11, see [PR-548](https://github.com/apache/pulsar-client-go/pull/548).
* Fix race condition when resend pendingItems, see [PR-551](https://github.com/apache/pulsar-client-go/pull/551).
* Fix data race while accessing connection in partitionConsumer, see [PR-535](https://github.com/apache/pulsar-client-go/pull/535).
* Fix channel data race, see [PR-558](https://github.com/apache/pulsar-client-go/pull/558).
* Fix write to closed channel panic() in internal/connection during connection close, see [PR-539](https://github.com/apache/pulsar-client-go/pull/539).
* Fix possible race condition in connection pool, see [PR-561](https://github.com/apache/pulsar-client-go/pull/561).
* Fix default connection timeout, see [PR-563](https://github.com/apache/pulsar-client-go/pull/563).
* Add lock for compressionProviders to fix data race problem, see [PR-533](https://github.com/apache/pulsar-client-go/pull/533).
* Fix send goroutine blocked, see [PR-530](https://github.com/apache/pulsar-client-go/pull/530).



[0.5.0] 2021-05-14

## Feature
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ ARG GO_VERSION=golang:1.12
FROM apachepulsar/pulsar:latest as pulsar
FROM $GO_VERSION as go

RUN apt-get update && apt-get install -y openjdk-11-jre-headless
RUN apt-get update && apt-get install -y openjdk-11-jre-headless ca-certificates

COPY --from=pulsar /pulsar /pulsar

Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
// This version number refers to the currently released version number
// Please fix the version when release.
v0.5.0
v0.6.0
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ require (
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/klauspost/compress v1.10.8
github.com/linkedin/goavro/v2 v2.9.8
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/opentracing/opentracing-go v1.2.0
github.com/pierrec/lz4 v2.0.5+incompatible
github.com/pkg/errors v0.9.1
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ github.com/danieljoos/wincred v1.0.2/go.mod h1:SnuYRW9lp1oJrZX/dXJqr0cPK5gYXqx3E
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dimfeld/httptreemux v5.0.1+incompatible h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA=
github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0=
Expand Down Expand Up @@ -159,7 +158,6 @@ github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
Expand Down
13 changes: 13 additions & 0 deletions oauth2/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (

const (
ClaimNameUserName = "https://pulsar.apache.org/username"
ClaimNameName = "name"
ClaimNameSubject = "sub"
)

// Flow abstracts an OAuth 2.0 authentication and authorization flow
Expand Down Expand Up @@ -74,6 +76,10 @@ type AuthorizationGrant struct {
// Token contains an access token in the client credentials grant type,
// and a refresh token in the device authorization grant type
Token *oauth2.Token `json:"token,omitempty"`

// Scopes contains the scopes associated with the grant, or the scopes
// to request in the client credentials grant type
Scopes []string `json:"scopes,omitempty"`
}

// TokenResult holds token information
Expand Down Expand Up @@ -101,13 +107,20 @@ func convertToOAuth2Token(token *TokenResult, clock clock.Clock) oauth2.Token {
}

// ExtractUserName extracts the username claim from an authorization grant
// conforms to draft-ietf-oauth-access-token-jwt
func ExtractUserName(token oauth2.Token) (string, error) {
p := jwt.Parser{}
claims := jwt.MapClaims{}
if _, _, err := p.ParseUnverified(token.AccessToken, claims); err != nil {
return "", fmt.Errorf("unable to decode the access token: %v", err)
}
username, ok := claims[ClaimNameUserName]
if !ok {
username, ok = claims[ClaimNameName]
}
if !ok {
username, ok = claims[ClaimNameSubject]
}
if !ok {
return "", fmt.Errorf("access token doesn't contain a username claim")
}
Expand Down
20 changes: 18 additions & 2 deletions oauth2/authorization_tokenretriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"encoding/json"
"errors"
"fmt"
"mime"
"net/http"
"net/url"
"strconv"
Expand Down Expand Up @@ -71,6 +72,7 @@ type ClientCredentialsExchangeRequest struct {
ClientID string
ClientSecret string
Audience string
Scopes []string
}

// DeviceCodeExchangeRequest is used to request the exchange of
Expand Down Expand Up @@ -195,7 +197,13 @@ func (ce *TokenRetriever) newClientCredentialsRequest(req ClientCredentialsExcha
uv.Set("grant_type", "client_credentials")
uv.Set("client_id", req.ClientID)
uv.Set("client_secret", req.ClientSecret)
uv.Set("audience", req.Audience)
if len(req.Scopes) > 0 {
uv.Set("scope", strings.Join(req.Scopes, " "))
}
if req.Audience != "" {
// Audience is an Auth0 extension; other providers use scopes to similar effect.
uv.Set("audience", req.Audience)
}

euv := uv.Encode()

Expand Down Expand Up @@ -237,7 +245,15 @@ func (ce *TokenRetriever) handleAuthTokensResponse(resp *http.Response) (*TokenR
}

if resp.StatusCode < 200 || resp.StatusCode > 299 {
if resp.Header.Get("Content-Type") == "application/json" {
cth := resp.Header.Get("Content-Type")
if cth == "" {
cth = "application/json"
}
ct, _, err := mime.ParseMediaType(cth)
if err != nil {
return nil, fmt.Errorf("unprocessable content type: %s: %w", cth, err)
}
if ct == "application/json" {
er := TokenErrorResponse{}
err := json.NewDecoder(resp.Body).Decode(&er)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions oauth2/client_credentials_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func (c *ClientCredentialsFlow) Authorize(audience string) (*AuthorizationGrant,
ClientID: c.keyfile.ClientID,
ClientCredentials: c.keyfile,
TokenEndpoint: c.oidcWellKnownEndpoints.TokenEndpoint,
Scopes: c.options.AdditionalScopes,
}

// test the credentials and obtain an initial access token
Expand Down Expand Up @@ -139,6 +140,7 @@ func (g *ClientCredentialsGrantRefresher) Refresh(grant *AuthorizationGrant) (*A
Audience: grant.Audience,
ClientID: grant.ClientCredentials.ClientID,
ClientSecret: grant.ClientCredentials.ClientSecret,
Scopes: grant.Scopes,
}
tr, err := g.exchanger.ExchangeClientCredentials(exchangeRequest)
if err != nil {
Expand All @@ -153,6 +155,7 @@ func (g *ClientCredentialsGrantRefresher) Refresh(grant *AuthorizationGrant) (*A
ClientCredentials: grant.ClientCredentials,
TokenEndpoint: grant.TokenEndpoint,
Token: &token,
Scopes: grant.Scopes,
}
return grant, nil
}
4 changes: 4 additions & 0 deletions oauth2/client_credentials_flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ var _ = Describe("ClientCredentialsGrantRefresher", func() {
ClientCredentials: &clientCredentials,
TokenEndpoint: oidcEndpoints.TokenEndpoint,
Token: nil,
Scopes: []string{"profile"},
}
_, err := refresher.Refresh(og)
Expect(err).ToNot(HaveOccurred())
Expand All @@ -155,6 +156,7 @@ var _ = Describe("ClientCredentialsGrantRefresher", func() {
ClientID: clientCredentials.ClientID,
ClientSecret: clientCredentials.ClientSecret,
Audience: og.Audience,
Scopes: og.Scopes,
}))
})

Expand All @@ -169,6 +171,7 @@ var _ = Describe("ClientCredentialsGrantRefresher", func() {
ClientCredentials: &clientCredentials,
TokenEndpoint: oidcEndpoints.TokenEndpoint,
Token: nil,
Scopes: []string{"profile"},
}
ng, err := refresher.Refresh(og)
Expect(err).ToNot(HaveOccurred())
Expand All @@ -178,6 +181,7 @@ var _ = Describe("ClientCredentialsGrantRefresher", func() {
Expect(ng.TokenEndpoint).To(Equal(oidcEndpoints.TokenEndpoint))
expected := convertToOAuth2Token(mockTokenExchanger.ReturnsTokens, mockClock)
Expect(*ng.Token).To(Equal(expected))
Expect(ng.Scopes).To(Equal([]string{"profile"}))
})
})
})
2 changes: 2 additions & 0 deletions oauth2/device_code_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ func (p *DeviceCodeFlow) Authorize(audience string) (*AuthorizationGrant, error)
ClientID: p.options.ClientID,
TokenEndpoint: p.oidcWellKnownEndpoints.TokenEndpoint,
Token: &token,
Scopes: additionalScopes,
}
return grant, nil
}
Expand Down Expand Up @@ -198,6 +199,7 @@ func (g *DeviceAuthorizationGrantRefresher) Refresh(grant *AuthorizationGrant) (
ClientID: grant.ClientID,
Token: &token,
TokenEndpoint: grant.TokenEndpoint,
Scopes: grant.Scopes,
}
return grant, nil
}
8 changes: 6 additions & 2 deletions oauth2/device_code_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,12 @@ func (cp *LocalDeviceCodeProvider) newDeviceCodeRequest(
req *DeviceCodeRequest) (*http.Request, error) {
uv := url.Values{}
uv.Set("client_id", req.ClientID)
uv.Set("scope", strings.Join(req.Scopes, " "))
uv.Set("audience", req.Audience)
if len(req.Scopes) > 0 {
uv.Set("scope", strings.Join(req.Scopes, " "))
}
if req.Audience != "" {
uv.Set("audience", req.Audience)
}
euv := uv.Encode()

request, err := http.NewRequest("POST",
Expand Down
1 change: 1 addition & 0 deletions oauth2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.13

require (
github.com/99designs/keyring v1.1.6
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/form3tech-oss/jwt-go v3.2.3+incompatible
github.com/onsi/ginkgo v1.14.0
github.com/onsi/gomega v1.10.1
Expand Down
12 changes: 2 additions & 10 deletions oauth2/go.sum
Original file line number Diff line number Diff line change
@@ -1,24 +1,20 @@
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/99designs/keyring v1.1.5 h1:wLv7QyzYpFIyMSwOADq1CLTF9KbjbBfcnfmOGJ64aO4=
github.com/99designs/keyring v1.1.5/go.mod h1:7hsVvt2qXgtadGevGJ4ujg+u8m6SpJ5TpHqTozIPqf0=
github.com/99designs/keyring v1.1.6 h1:kVDC2uCgVwecxCk+9zoCt2uEL6dt+dfVzMvGgnVcIuM=
github.com/99designs/keyring v1.1.6/go.mod h1:16e0ds7LGQQcT59QqkTg72Hh5ShM51Byv5PEmW6uoRU=
github.com/danieljoos/wincred v1.0.2 h1:zf4bhty2iLuwgjgpraD2E9UbvO+fe54XXGJbOwe23fU=
github.com/danieljoos/wincred v1.0.2/go.mod h1:SnuYRW9lp1oJrZX/dXJqr0cPK5gYXqx3EJbmjhLdK9U=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1/go.mod h1:+hnT3ywWDTAFrW5aE+u2Sa/wT555ZqwoCS+pk3p6ry4=
github.com/dvsekhvalnov/jose2go v0.0.0-20180829124132-7f401d37b68a h1:mq+R6XEM6lJX5VlLyZIrUSP8tSuJp82xTK89hvBwJbU=
github.com/dvsekhvalnov/jose2go v0.0.0-20180829124132-7f401d37b68a/go.mod h1:7BvyPhdbLxMXIYTFPLsyJRFMsKmOZnQmzh6Gb+uquuM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dvsekhvalnov/jose2go v0.0.0-20200901110807-248326c1351b h1:HBah4D48ypg3J7Np4N+HY/ZR76fx3HEUGxDU6Uk39oQ=
github.com/dvsekhvalnov/jose2go v0.0.0-20200901110807-248326c1351b/go.mod h1:7BvyPhdbLxMXIYTFPLsyJRFMsKmOZnQmzh6Gb+uquuM=
github.com/form3tech-oss/jwt-go v3.2.3+incompatible h1:7ZaBxOI7TMoYBfyA3cQHErNNyAWIKUMIwqxEtgHOs5c=
github.com/form3tech-oss/jwt-go v3.2.3+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas=
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 h1:ZpnhV/YsD2/4cESfV5+Hoeu/iUR3ruzNvZ+yQfO03a0=
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
Expand Down Expand Up @@ -60,7 +56,6 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
Expand Down Expand Up @@ -113,6 +108,3 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWD
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
k8s.io/utils v0.0.0-20200619165400-6e3d28b6ed19 h1:7Nu2dTj82c6IaWvL7hImJzcXoTPz1MsSCH7r+0m6rfo=
k8s.io/utils v0.0.0-20200619165400-6e3d28b6ed19/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
15 changes: 15 additions & 0 deletions pulsar/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ type ClientOptions struct {
// FIXME: use `logger` as internal field name instead of `log` as it's more idiomatic
Logger log.Logger

// Specify metric cardinality to the tenant, namespace or topic levels, or remove it completely.
// Default: MetricsCardinalityNamespace
MetricsCardinality MetricsCardinality

// Add custom labels to all the metrics reported by this client instance
CustomMetricsLabels map[string]string
}
Expand Down Expand Up @@ -150,3 +154,14 @@ type Client interface {
// Close Closes the Client and free associated resources
Close()
}

// MetricsCardinality represents the specificty of labels on a per-metric basis
type MetricsCardinality int

const (
_ MetricsCardinality = iota
MetricsCardinalityNone // Do not add additional labels to metrics
MetricsCardinalityTenant // Label metrics by tenant
MetricsCardinalityNamespace // Label metrics by tenant and namespace
MetricsCardinalityTopic // Label metrics by topic
)
8 changes: 6 additions & 2 deletions pulsar/client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,15 @@ func newClient(options ClientOptions) (Client, error) {
maxConnectionsPerHost = 1
}

if options.MetricsCardinality == 0 {
options.MetricsCardinality = MetricsCardinalityNamespace
}

var metrics *internal.Metrics
if options.CustomMetricsLabels != nil {
metrics = internal.NewMetricsProvider(options.CustomMetricsLabels)
metrics = internal.NewMetricsProvider(int(options.MetricsCardinality), options.CustomMetricsLabels)
} else {
metrics = internal.NewMetricsProvider(map[string]string{})
metrics = internal.NewMetricsProvider(int(options.MetricsCardinality), map[string]string{})
}

c := &client{
Expand Down
4 changes: 2 additions & 2 deletions pulsar/client_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func TestNamespaceTopicsNamespaceDoesNotExit(t *testing.T) {

// fetch from namespace that does not exist
name := generateRandomName()
topics, err := ci.lookupService.GetTopicsOfNamespace(fmt.Sprintf("%s/%s", name, name), internal.Persistent)
topics, err := ci.lookupService.GetTopicsOfNamespace(fmt.Sprintf("public/%s", name), internal.Persistent)
assert.Nil(t, err)
assert.Equal(t, 0, len(topics))
}
Expand All @@ -401,7 +401,7 @@ func TestNamespaceTopicsNamespaceDoesNotExitWebURL(t *testing.T) {

// fetch from namespace that does not exist
name := generateRandomName()
topics, err := ci.lookupService.GetTopicsOfNamespace(fmt.Sprintf("%s/%s", name, name), internal.Persistent)
topics, err := ci.lookupService.GetTopicsOfNamespace(fmt.Sprintf("public/%s", name), internal.Persistent)
assert.NotNil(t, err)
assert.Equal(t, 0, len(topics))
}
Expand Down
4 changes: 2 additions & 2 deletions pulsar/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ type ConsumerOptions struct {
// MaxReconnectToBroker set the maximum retry number of reconnectToBroker. (default: ultimate)
MaxReconnectToBroker *uint

// EncryptionInfo encryption related fields to decrypt the encrypted message
Encryption *ConsumerEncryptionInfo
// Decryption decryption related fields to decrypt the encrypted message
Decryption *MessageDecryptionInfo
}

// Consumer is an interface that abstracts behavior of Pulsar's consumer
Expand Down
Loading

0 comments on commit bf999a2

Please sign in to comment.