Skip to content

Commit

Permalink
Merge pull request #2156 from brave-intl/master
Browse files Browse the repository at this point in the history
Production 2023-10-17_01
  • Loading branch information
clD11 authored Oct 18, 2023
2 parents a928881 + 48d76c5 commit f558bb8
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 70 deletions.
54 changes: 52 additions & 2 deletions services/wallet/controllers_v3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"testing"
"time"

sqlmock "github.com/DATA-DOG/go-sqlmock"
"github.com/DATA-DOG/go-sqlmock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand All @@ -34,7 +34,7 @@ import (
"github.com/golang/mock/gomock"
"github.com/jmoiron/sqlx"
uuid "github.com/satori/go.uuid"
jose "gopkg.in/square/go-jose.v2"
"gopkg.in/square/go-jose.v2"
"gopkg.in/square/go-jose.v2/jwt"
)

Expand Down Expand Up @@ -259,6 +259,16 @@ func TestLinkBitFlyerWalletV3(t *testing.T) {
ctx = context.WithValue(ctx, appctx.ReputationClientCTXKey, mockReputation)
ctx = context.WithValue(ctx, appctx.NoUnlinkPriorToDurationCTXKey, "-P1D")

mockReputation.EXPECT().IsLinkingReputable(
gomock.Any(), // ctx
gomock.Any(), // wallet id
gomock.Any(), // country
).Return(
true,
[]int{},
nil,
)

r = r.WithContext(ctx)

router := chi.NewRouter()
Expand Down Expand Up @@ -317,6 +327,16 @@ func TestLinkGeminiWalletV3RelinkBadRegion(t *testing.T) {
rw = httptest.NewRecorder()
)

mockReputationClient.EXPECT().IsLinkingReputable(
gomock.Any(), // ctx
gomock.Any(), // wallet id
gomock.Any(), // country
).Return(
true,
[]int{},
nil,
)

ctx = context.WithValue(ctx, appctx.DatastoreCTXKey, datastore)
ctx = context.WithValue(ctx, appctx.ReputationClientCTXKey, mockReputationClient)
ctx = context.WithValue(ctx, appctx.GeminiClientCTXKey, mockGeminiClient)
Expand Down Expand Up @@ -537,6 +557,16 @@ func TestLinkGeminiWalletV3FirstLinking(t *testing.T) {
rw = httptest.NewRecorder()
)

mockReputationClient.EXPECT().IsLinkingReputable(
gomock.Any(), // ctx
gomock.Any(), // wallet id
gomock.Any(), // country
).Return(
true,
[]int{},
nil,
)

ctx = context.WithValue(ctx, appctx.DatastoreCTXKey, datastore)
ctx = context.WithValue(ctx, appctx.ReputationClientCTXKey, mockReputationClient)
ctx = context.WithValue(ctx, appctx.GeminiClientCTXKey, mockGeminiClient)
Expand Down Expand Up @@ -740,6 +770,16 @@ func TestLinkZebPayWalletV3(t *testing.T) {
)),
)

mockReputationClient.EXPECT().IsLinkingReputable(
gomock.Any(), // ctx
gomock.Any(), // wallet id
gomock.Any(), // country
).Return(
true,
[]int{},
nil,
)

mockSQLCustodianLink(mock, "zebpay")

// begin linking tx
Expand Down Expand Up @@ -847,6 +887,16 @@ func TestLinkGeminiWalletV3(t *testing.T) {
nil,
)

mockReputationClient.EXPECT().IsLinkingReputable(
gomock.Any(), // ctx
gomock.Any(), // wallet id
gomock.Any(), // country
).Return(
true,
[]int{},
nil,
)

mockSQLCustodianLink(mock, "gemini")

// begin linking tx
Expand Down
67 changes: 27 additions & 40 deletions services/wallet/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,26 +52,18 @@ var (
Help: "A counter for seeing how many custodian accounts have been linked 10 times",
ConstLabels: prometheus.Labels{"service": "wallet"},
})
// counter for flagged unusual
countLinkingFlaggedUnusual = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "count_linking_flagged_unusual",
Help: "provides a count of unusual linkings flagged results",
ConstLabels: prometheus.Labels{"service": "wallet"},
})
)

func init() {
prometheus.MustRegister(tooManyCardsCounter)
prometheus.MustRegister(metricTxLockGauge)
prometheus.MustRegister(tenLinkagesReached)
prometheus.MustRegister(countLinkingFlaggedUnusual)
}

// Datastore holds the interface for the wallet datastore
type Datastore interface {
datastore.Datastore
LinkWallet(ctx context.Context, ID string, providerID string, providerLinkingID uuid.UUID, depositProvider string) error
LinkWallet(ctx context.Context, id string, providerID string, providerLinkingID uuid.UUID, depositProvider, country string) error
GetLinkingLimitInfo(ctx context.Context, providerLinkingID string) (map[string]LinkingInfo, error)
HasPriorLinking(ctx context.Context, walletID uuid.UUID, providerLinkingID uuid.UUID) (bool, error)
// GetLinkingsByProviderLinkingID gets the wallet linking info by provider linking id
Expand Down Expand Up @@ -560,14 +552,26 @@ var (
ErrGeoResetDifferent = errors.New("geo reset is different")
)

// LinkWallet links a wallet together
func (pg *Postgres) LinkWallet(ctx context.Context, ID string, userDepositDestination string, providerLinkingID uuid.UUID, depositProvider string) error {
sublogger := logger(ctx).With().Str("wallet_id", ID).Logger()
sublogger.Debug().Msg("linking wallet")
// LinkWallet links a rewards wallet to the given deposit provider.
func (pg *Postgres) LinkWallet(ctx context.Context, id string, userDepositDestination string, providerLinkingID uuid.UUID, depositProvider, country string) error {
walletID, err := uuid.FromString(id)
if err != nil {
return fmt.Errorf("invalid wallet id, not uuid: %w", err)
}

repClient, ok := ctx.Value(appctx.ReputationClientCTXKey).(reputation.Client)
if !ok {
return ErrNoReputationClient
}

// TODO(clD11): We no longer need to act on the response and only require a successful call to reputation to
// continue linking. As part of the wallet refactor we should clean this up.
if _, _, err := repClient.IsLinkingReputable(ctx, walletID, country); err != nil {
return fmt.Errorf("failed to check wallet rep: %w", err)
}

ctx, tx, rollback, commit, err := getTx(ctx, pg)
if err != nil {
sublogger.Error().Err(err).Msg("error getting tx")
return fmt.Errorf("error getting tx: %w", err)
}
defer func() {
Expand All @@ -576,52 +580,35 @@ func (pg *Postgres) LinkWallet(ctx context.Context, ID string, userDepositDestin
}()

metricTxLockGauge.Inc()
err = waitAndLockTx(ctx, tx, providerLinkingID)
if err != nil {
sublogger.Error().Err(err).Msg("error acquiring tx lock")
if err := waitAndLockTx(ctx, tx, providerLinkingID); err != nil {
return fmt.Errorf("error acquiring tx lock: %w", err)
}

id, err := uuid.FromString(ID)
if err != nil {
return errorutils.Wrap(err, "error invalid id")
}

// connect custodian link (does the link limit checking in insert)
if err = pg.ConnectCustodialWallet(ctx, &CustodianLink{
WalletID: &id,
if err := pg.ConnectCustodialWallet(ctx, &CustodianLink{
WalletID: &walletID,
Custodian: depositProvider,
LinkingID: &providerLinkingID,
}, userDepositDestination); err != nil {
sublogger.Error().Err(err).
Msg("error connect custodian wallet")
return fmt.Errorf("error connect custodian wallet: %w", err)
}

// TODO(clD11): the below verified wallets calls were added as a quick fix and should be addressed in the wallet refactor.
if VerifiedWalletEnable {
err := pg.InsertVerifiedWalletOutboxTx(ctx, tx, id, true)
if err != nil {
if err := pg.InsertVerifiedWalletOutboxTx(ctx, tx, walletID, true); err != nil {
return fmt.Errorf("failed to update verified wallet: %w", err)
}
}

if directVerifiedWalletEnable {
client, ok := ctx.Value(appctx.ReputationClientCTXKey).(reputation.Client)
if !ok {
return ErrNoReputationClient
op := func() (interface{}, error) {
return nil, repClient.UpdateReputationSummary(ctx, walletID.String(), true)
}
upsertReputationSummary := func() (interface{}, error) {
return nil, client.UpdateReputationSummary(ctx, ID, true)
}
_, err = backoff.Retry(ctx, upsertReputationSummary, retryPolicy, canRetry(nonRetriableErrors))
if err != nil {
if _, err := backoff.Retry(ctx, op, retryPolicy, canRetry(nonRetriableErrors)); err != nil {
return fmt.Errorf("failed to update verified wallet: %w", err)
}
}

err = commit()
if err != nil {
sublogger.Error().Err(err).Msg("error committing tx")
if err := commit(); err != nil {
sentry.CaptureException(fmt.Errorf("error failed to commit link wallet transaction: %w", err))
return fmt.Errorf("error committing tx: %w", err)
}
Expand Down
55 changes: 36 additions & 19 deletions services/wallet/datastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,16 @@ func (suite *WalletPostgresTestSuite) TestLinkWallet_Concurrent_InsertUpdate() {
pg, _, err := NewPostgres()
suite.Require().NoError(err)

for i := 0; i < 1; i++ {
mockCtrl := gomock.NewController(suite.T())
defer mockCtrl.Finish()

repClient := mock_reputation.NewMockClient(mockCtrl)
repClient.EXPECT().IsLinkingReputable(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()

ctx := context.WithValue(context.Background(), appctx.NoUnlinkPriorToDurationCTXKey, "-P1D")
ctx = context.WithValue(ctx, appctx.ReputationClientCTXKey, repClient)

for i := 0; i < 1; i++ {
// seed 3 wallets with same linkingID
userDepositDestination, providerLinkingID := suite.seedWallet(pg)

Expand All @@ -214,8 +222,7 @@ func (suite *WalletPostgresTestSuite) TestLinkWallet_Concurrent_InsertUpdate() {
PublicKey: "hBrtClwIppLmu/qZ8EhGM1TQZUwDUosbOrVu1jMwryY=",
}

err = pg.UpsertWallet(context.WithValue(context.Background(),
appctx.NoUnlinkPriorToDurationCTXKey, "-P1D"), walletInfo)
err = pg.UpsertWallet(ctx, walletInfo)
suite.Require().NoError(err, "save wallet should succeed")

runs := 2
Expand All @@ -225,15 +232,12 @@ func (suite *WalletPostgresTestSuite) TestLinkWallet_Concurrent_InsertUpdate() {
for i := 0; i < runs; i++ {
go func() {
defer wg.Done()
err = pg.LinkWallet(context.WithValue(context.Background(), appctx.NoUnlinkPriorToDurationCTXKey, "-P1D"),
walletInfo.ID, userDepositDestination, providerLinkingID, walletInfo.Provider)
err = pg.LinkWallet(ctx, walletInfo.ID, userDepositDestination, providerLinkingID, walletInfo.Provider, "")
}()
}

wg.Wait()

used, max, err := pg.GetCustodianLinkCount(context.WithValue(context.Background(),
appctx.NoUnlinkPriorToDurationCTXKey, "-P1D"), providerLinkingID, "")
used, max, err := pg.GetCustodianLinkCount(ctx, providerLinkingID, "")

suite.Require().NoError(err, "should have no error getting custodian link count")
suite.Require().True(used == max, fmt.Sprintf("used %d should not exceed max %d", used, max))
Expand All @@ -244,6 +248,15 @@ func (suite *WalletPostgresTestSuite) seedWallet(pg Datastore) (string, uuid.UUI
userDepositDestination := uuid.NewV4().String()
providerLinkingID := uuid.NewV4()

mockCtrl := gomock.NewController(suite.T())
defer mockCtrl.Finish()

repClient := mock_reputation.NewMockClient(mockCtrl)
repClient.EXPECT().IsLinkingReputable(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()

ctx := context.WithValue(context.Background(), appctx.NoUnlinkPriorToDurationCTXKey, "-P1D")
ctx = context.WithValue(ctx, appctx.ReputationClientCTXKey, repClient)

walletCount := 3
for i := 0; i < walletCount; i++ {
altCurrency := altcurrency.BAT
Expand All @@ -256,16 +269,14 @@ func (suite *WalletPostgresTestSuite) seedWallet(pg Datastore) (string, uuid.UUI
AnonymousAddress: nil,
}

err := pg.UpsertWallet(context.WithValue(context.Background(), appctx.NoUnlinkPriorToDurationCTXKey, "-P1D"), walletInfo)
err := pg.UpsertWallet(ctx, walletInfo)
suite.Require().NoError(err, "save wallet should succeed")

err = pg.LinkWallet(context.WithValue(context.Background(), appctx.NoUnlinkPriorToDurationCTXKey, "-P1D"),
walletInfo.ID, userDepositDestination, providerLinkingID, "uphold")
err = pg.LinkWallet(ctx, walletInfo.ID, userDepositDestination, providerLinkingID, "uphold", "")
suite.Require().NoError(err, "link wallet should succeed")
}

used, _, err := pg.GetCustodianLinkCount(context.WithValue(context.Background(),
appctx.NoUnlinkPriorToDurationCTXKey, "-P1D"), providerLinkingID, "")
used, _, err := pg.GetCustodianLinkCount(ctx, providerLinkingID, "")

suite.Require().NoError(err, "should have no error getting custodian link count")
suite.Require().True(used == walletCount, fmt.Sprintf("used %d", used))
Expand All @@ -277,6 +288,15 @@ func (suite *WalletPostgresTestSuite) TestLinkWallet_Concurrent_MaxLinkCount() {
pg, _, err := NewPostgres()
suite.Require().NoError(err)

mockCtrl := gomock.NewController(suite.T())
defer mockCtrl.Finish()

repClient := mock_reputation.NewMockClient(mockCtrl)
repClient.EXPECT().IsLinkingReputable(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()

ctx := context.WithValue(context.Background(), appctx.NoUnlinkPriorToDurationCTXKey, "-P1D")
ctx = context.WithValue(ctx, appctx.ReputationClientCTXKey, repClient)

wallets := make([]*walletutils.Info, 10, 10)

for i := 0; i < len(wallets); i++ {
Expand All @@ -289,7 +309,7 @@ func (suite *WalletPostgresTestSuite) TestLinkWallet_Concurrent_MaxLinkCount() {
PublicKey: "hBrtClwIppLmu/qZ8EhGM1TQZUwDUosbOrVu1jMwryY=",
}
wallets[i] = walletInfo
err := pg.UpsertWallet(context.WithValue(context.Background(), appctx.NoUnlinkPriorToDurationCTXKey, "-P1D"), walletInfo)
err := pg.UpsertWallet(ctx, walletInfo)
suite.Require().NoError(err, "save wallet should succeed")
}

Expand All @@ -302,15 +322,12 @@ func (suite *WalletPostgresTestSuite) TestLinkWallet_Concurrent_MaxLinkCount() {
for i := 0; i < len(wallets); i++ {
go func(index int) {
defer wg.Done()
err = pg.LinkWallet(context.WithValue(context.Background(), appctx.NoUnlinkPriorToDurationCTXKey, "-P1D"),
wallets[index].ID, userDepositDestination, providerLinkingID, wallets[index].Provider)
err = pg.LinkWallet(ctx, wallets[index].ID, userDepositDestination, providerLinkingID, wallets[index].Provider, "")
}(i)
}

wg.Wait()

used, max, err := pg.GetCustodianLinkCount(context.WithValue(context.Background(),
appctx.NoUnlinkPriorToDurationCTXKey, "-P1D"), providerLinkingID, "")
used, max, err := pg.GetCustodianLinkCount(ctx, providerLinkingID, "")

suite.Require().NoError(err, "should have no error getting custodian link count")
suite.Require().True(used == max, fmt.Sprintf("used %d should not exceed max %d", used, max))
Expand Down
4 changes: 2 additions & 2 deletions services/wallet/instrumented_datastore.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit f558bb8

Please sign in to comment.