diff --git a/internal/assets/manager.go b/internal/assets/manager.go index d37e9e172..68c814946 100644 --- a/internal/assets/manager.go +++ b/internal/assets/manager.go @@ -18,7 +18,9 @@ package assets import ( "context" + "fmt" + "github.com/hyperledger/firefly/internal/broadcast" "github.com/hyperledger/firefly/internal/config" "github.com/hyperledger/firefly/internal/data" "github.com/hyperledger/firefly/internal/i18n" @@ -37,9 +39,10 @@ type Manager interface { GetTokenPools(ctx context.Context, ns, typeName string, filter database.AndFilter) ([]*fftypes.TokenPool, *database.FilterResult, error) GetTokenPool(ctx context.Context, ns, typeName, name string) (*fftypes.TokenPool, error) GetTokenAccounts(ctx context.Context, ns, typeName, name string, filter database.AndFilter) ([]*fftypes.TokenAccount, *database.FilterResult, error) + ValidateTokenPoolTx(ctx context.Context, pool *fftypes.TokenPool, protocolTxID string) error // Bound token callbacks - TokenPoolCreated(tk tokens.Plugin, pool *fftypes.TokenPool, signingIdentity string, protocolTxID string, additionalInfo fftypes.JSONObject) error + TokenPoolCreated(tk tokens.Plugin, tokenType fftypes.TokenType, tx *fftypes.UUID, protocolID, signingIdentity, protocolTxID string, additionalInfo fftypes.JSONObject) error Start() error WaitStop() @@ -51,12 +54,13 @@ type assetManager struct { identity identity.Plugin data data.Manager syncasync syncasync.Bridge + broadcast broadcast.Manager tokens map[string]tokens.Plugin retry retry.Retry txhelper txcommon.Helper } -func NewAssetManager(ctx context.Context, di database.Plugin, ii identity.Plugin, dm data.Manager, sa syncasync.Bridge, ti map[string]tokens.Plugin) (Manager, error) { +func NewAssetManager(ctx context.Context, di database.Plugin, ii identity.Plugin, dm data.Manager, sa syncasync.Bridge, bm broadcast.Manager, ti map[string]tokens.Plugin) (Manager, error) { if di == nil || ii == nil || sa == nil || ti == nil { return nil, i18n.NewError(ctx, i18n.MsgInitializationNilDepError) } @@ -66,6 +70,7 @@ func NewAssetManager(ctx context.Context, di database.Plugin, ii identity.Plugin identity: ii, data: dm, syncasync: sa, + broadcast: bm, tokens: ti, retry: retry.Retry{ InitialDelay: config.GetDuration(config.AssetManagerRetryInitialDelay), @@ -86,6 +91,28 @@ func (am *assetManager) selectTokenPlugin(ctx context.Context, name string) (tok return nil, i18n.NewError(ctx, i18n.MsgUnknownTokensPlugin, name) } +func storeTokenOpInputs(op *fftypes.Operation, pool *fftypes.TokenPool) { + op.Input = fftypes.JSONObject{ + "id": pool.ID.String(), + "namespace": pool.Namespace, + "name": pool.Name, + } +} + +func retrieveTokenOpInputs(ctx context.Context, op *fftypes.Operation, pool *fftypes.TokenPool) (err error) { + input := &op.Input + pool.ID, err = fftypes.ParseUUID(ctx, input.GetString("id")) + if err != nil { + return err + } + pool.Namespace = input.GetString("namespace") + pool.Name = input.GetString("name") + if pool.Namespace == "" || pool.Name == "" { + return fmt.Errorf("namespace or name missing from inputs") + } + return nil +} + func (am *assetManager) CreateTokenPool(ctx context.Context, ns string, typeName string, pool *fftypes.TokenPool, waitConfirm bool) (*fftypes.TokenPool, error) { return am.CreateTokenPoolWithID(ctx, ns, fftypes.NewUUID(), typeName, pool, waitConfirm) } @@ -132,6 +159,13 @@ func (am *assetManager) CreateTokenPoolWithID(ctx context.Context, ns string, id return nil, err } + pool.ID = id + pool.Namespace = ns + pool.TX = fftypes.TransactionRef{ + ID: tx.ID, + Type: tx.Subject.Type, + } + op := fftypes.NewTXOperation( plugin, ns, @@ -140,17 +174,12 @@ func (am *assetManager) CreateTokenPoolWithID(ctx context.Context, ns string, id fftypes.OpTypeTokensCreatePool, fftypes.OpStatusPending, author.Identifier) + storeTokenOpInputs(op, pool) err = am.database.UpsertOperation(ctx, op, false) if err != nil { return nil, err } - pool.ID = id - pool.Namespace = ns - pool.TX = fftypes.TransactionRef{ - ID: tx.ID, - Type: tx.Subject.Type, - } return pool, plugin.CreateTokenPool(ctx, op.ID, author, pool) } @@ -189,6 +218,11 @@ func (am *assetManager) GetTokenAccounts(ctx context.Context, ns, typeName, name return am.database.GetTokenAccounts(ctx, filter.Condition(filter.Builder().Eq("protocolid", pool.ProtocolID))) } +func (am *assetManager) ValidateTokenPoolTx(ctx context.Context, pool *fftypes.TokenPool, protocolTxID string) error { + // TODO: validate that the given token pool was created with the given protocolTxId + return nil +} + func (am *assetManager) Start() error { return nil } diff --git a/internal/assets/manager_test.go b/internal/assets/manager_test.go index 53935d641..9d0627e48 100644 --- a/internal/assets/manager_test.go +++ b/internal/assets/manager_test.go @@ -21,6 +21,7 @@ import ( "github.com/hyperledger/firefly/internal/config" "github.com/hyperledger/firefly/internal/syncasync" + "github.com/hyperledger/firefly/mocks/broadcastmocks" "github.com/hyperledger/firefly/mocks/databasemocks" "github.com/hyperledger/firefly/mocks/datamocks" "github.com/hyperledger/firefly/mocks/identitymocks" @@ -40,12 +41,13 @@ func newTestAssets(t *testing.T) (*assetManager, func()) { mii := &identitymocks.Plugin{} mdm := &datamocks.Manager{} msa := &syncasyncmocks.Bridge{} + mbm := &broadcastmocks.Manager{} mti := &tokenmocks.Plugin{} mti.On("Name").Return("ut_tokens").Maybe() defaultIdentity := &fftypes.Identity{Identifier: "UTNodeID", OnChain: "0x12345"} mii.On("Resolve", mock.Anything, "UTNodeID").Return(defaultIdentity, nil).Maybe() ctx, cancel := context.WithCancel(context.Background()) - a, err := NewAssetManager(ctx, mdi, mii, mdm, msa, map[string]tokens.Plugin{"magic-tokens": mti}) + a, err := NewAssetManager(ctx, mdi, mii, mdm, msa, mbm, map[string]tokens.Plugin{"magic-tokens": mti}) rag := mdi.On("RunAsGroup", ctx, mock.Anything).Maybe() rag.RunFn = func(a mock.Arguments) { rag.ReturnArguments = mock.Arguments{a[1].(func(context.Context) error)(a[0].(context.Context))} @@ -55,7 +57,7 @@ func newTestAssets(t *testing.T) (*assetManager, func()) { } func TestInitFail(t *testing.T) { - _, err := NewAssetManager(context.Background(), nil, nil, nil, nil, nil) + _, err := NewAssetManager(context.Background(), nil, nil, nil, nil, nil, nil) assert.Regexp(t, "FF10128", err) } @@ -289,3 +291,11 @@ func TestGetTokenAccountsBadPool(t *testing.T) { _, _, err := am.GetTokenAccounts(context.Background(), "ns1", "magic-tokens", "test", f) assert.EqualError(t, err, "pop") } + +func TestValidateTokenPoolTx(t *testing.T) { + am, cancel := newTestAssets(t) + defer cancel() + + err := am.ValidateTokenPoolTx(context.Background(), nil, "") + assert.NoError(t, err) +} diff --git a/internal/assets/token_pool_created.go b/internal/assets/token_pool_created.go index bcdb042dc..eb1a0f6fc 100644 --- a/internal/assets/token_pool_created.go +++ b/internal/assets/token_pool_created.go @@ -17,69 +17,78 @@ package assets import ( - "context" - "github.com/hyperledger/firefly/internal/log" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/fftypes" "github.com/hyperledger/firefly/pkg/tokens" ) -func (am *assetManager) persistTokenPoolTransaction(ctx context.Context, pool *fftypes.TokenPool, signingIdentity string, protocolTxID string, additionalInfo fftypes.JSONObject) (valid bool, err error) { - if pool.ID == nil || pool.TX.ID == nil { - log.L(ctx).Errorf("Invalid token pool '%s'. Missing ID (%v) or transaction ID (%v)", pool.ID, pool.ID, pool.TX.ID) - return false, nil // this is not retryable +func (am *assetManager) TokenPoolCreated(tk tokens.Plugin, tokenType fftypes.TokenType, tx *fftypes.UUID, protocolID, signingIdentity, protocolTxID string, additionalInfo fftypes.JSONObject) error { + // Find a matching operation within this transaction + fb := database.OperationQueryFactory.NewFilter(am.ctx) + filter := fb.And( + fb.Eq("tx", tx), + fb.Eq("type", fftypes.OpTypeTokensCreatePool), + ) + operations, _, err := am.database.GetOperations(am.ctx, filter) + if err != nil || len(operations) == 0 { + log.L(am.ctx).Debugf("Token pool transaction '%s' ignored, as it was not submitted by this node", tx) + return nil + } + + pool := &fftypes.TokenPoolAnnouncement{ + TokenPool: fftypes.TokenPool{ + Type: tokenType, + ProtocolID: protocolID, + Author: signingIdentity, + }, + ProtocolTxID: protocolTxID, } - return am.txhelper.PersistTransaction(ctx, &fftypes.Transaction{ - ID: pool.TX.ID, + err = retrieveTokenOpInputs(am.ctx, operations[0], &pool.TokenPool) + if err != nil { + log.L(am.ctx).Errorf("Error retrieving pool info from transaction '%s' (%s) - ignoring: %v", tx, err, operations[0].Input) + return nil + } + + // Update the transaction with the info received (but leave transaction as "pending") + transaction := &fftypes.Transaction{ + ID: tx, + Status: fftypes.OpStatusPending, Subject: fftypes.TransactionSubject{ Namespace: pool.Namespace, - Type: pool.TX.Type, + Type: fftypes.TransactionTypeTokenPool, Signer: signingIdentity, Reference: pool.ID, }, ProtocolID: protocolTxID, Info: additionalInfo, - }) -} - -func (am *assetManager) persistTokenPool(ctx context.Context, pool *fftypes.TokenPool) (valid bool, err error) { - l := log.L(ctx) - if err := fftypes.ValidateFFNameField(ctx, pool.Name, "name"); err != nil { - l.Errorf("Invalid token pool '%s' - invalid name '%s': %a", pool.ID, pool.Name, err) - return false, nil // This is not retryable } - err = am.database.UpsertTokenPool(ctx, pool) - if err != nil { - if err == database.IDMismatch { - log.L(ctx).Errorf("Invalid token pool '%s'. ID mismatch with existing record", pool.ID) - return false, nil // This is not retryable + pool.TX.ID = transaction.ID + pool.TX.Type = transaction.Subject.Type + + // Add a new operation for the announcement + op := fftypes.NewTXOperation( + tk, + pool.Namespace, + tx, + "", + fftypes.OpTypeTokensAnnouncePool, + fftypes.OpStatusPending, + signingIdentity) + + var valid bool + err = am.retry.Do(am.ctx, "persist token pool transaction", func(attempt int) (bool, error) { + valid, err = am.txhelper.PersistTransaction(am.ctx, transaction) + if valid && err == nil { + err = am.database.UpsertOperation(am.ctx, op, false) } - l.Errorf("Failed to insert token pool '%s': %s", pool.ID, err) - return false, err // a persistence failure here is considered retryable (so returned) + return err != nil, err + }) + if !valid || err != nil { + return err } - return true, nil -} -func (am *assetManager) TokenPoolCreated(tk tokens.Plugin, pool *fftypes.TokenPool, signingIdentity string, protocolTxID string, additionalInfo fftypes.JSONObject) error { - return am.retry.Do(am.ctx, "persist token pool", func(attempt int) (bool, error) { - err := am.database.RunAsGroup(am.ctx, func(ctx context.Context) error { - valid, err := am.persistTokenPoolTransaction(ctx, pool, signingIdentity, protocolTxID, additionalInfo) - if valid && err == nil { - valid, err = am.persistTokenPool(ctx, pool) - } - if err != nil { - return err - } - if !valid { - log.L(ctx).Warnf("Token pool rejected id=%s author=%s", pool.ID, signingIdentity) - event := fftypes.NewEvent(fftypes.EventTypePoolRejected, pool.Namespace, pool.ID) - return am.database.InsertEvent(ctx, event) - } - log.L(ctx).Infof("Token pool created id=%s author=%s", pool.ID, signingIdentity) - event := fftypes.NewEvent(fftypes.EventTypePoolConfirmed, pool.Namespace, pool.ID) - return am.database.InsertEvent(ctx, event) - }) - return err != nil, err // retry indefinitely (until context closes) - }) + // Announce the details of the new token pool + _, err = am.broadcast.BroadcastTokenPool(am.ctx, pool.Namespace, pool, false) + return err } diff --git a/internal/assets/token_pool_created_test.go b/internal/assets/token_pool_created_test.go index 484f93102..5c0419e2d 100644 --- a/internal/assets/token_pool_created_test.go +++ b/internal/assets/token_pool_created_test.go @@ -17,9 +17,9 @@ package assets import ( - "fmt" "testing" + "github.com/hyperledger/firefly/mocks/broadcastmocks" "github.com/hyperledger/firefly/mocks/databasemocks" "github.com/hyperledger/firefly/mocks/tokenmocks" "github.com/hyperledger/firefly/pkg/database" @@ -33,267 +33,149 @@ func TestTokenPoolCreatedSuccess(t *testing.T) { defer cancel() mdi := am.database.(*databasemocks.Plugin) mti := &tokenmocks.Plugin{} - - pool := &fftypes.TokenPool{ - ID: fftypes.NewUUID(), - TX: fftypes.TransactionRef{ - ID: fftypes.NewUUID(), - Type: fftypes.TransactionTypeTokenPool, + mbm := am.broadcast.(*broadcastmocks.Manager) + + poolID := fftypes.NewUUID() + txID := fftypes.NewUUID() + operations := []*fftypes.Operation{ + { + ID: fftypes.NewUUID(), + Input: fftypes.JSONObject{ + "id": poolID.String(), + "namespace": "test-ns", + "name": "my-pool", + }, }, - Namespace: "test-ns", - Name: "my-pool", } - mdi.On("GetTransactionByID", mock.Anything, pool.TX.ID).Return(nil, nil) + mti.On("Name").Return("mock-tokens") + mdi.On("GetOperations", am.ctx, mock.Anything).Return(operations, nil, nil) + mdi.On("GetTransactionByID", mock.Anything, txID).Return(nil, nil) mdi.On("UpsertTransaction", am.ctx, mock.MatchedBy(func(tx *fftypes.Transaction) bool { return tx.Subject.Type == fftypes.TransactionTypeTokenPool }), false).Return(nil) - mdi.On("UpsertTokenPool", am.ctx, pool).Return(nil) - mdi.On("InsertEvent", am.ctx, mock.MatchedBy(func(ev *fftypes.Event) bool { - return ev.Type == fftypes.EventTypePoolConfirmed && ev.Reference == pool.ID && ev.Namespace == pool.Namespace - })).Return(nil) + mdi.On("UpsertOperation", am.ctx, mock.MatchedBy(func(op *fftypes.Operation) bool { + return op.Type == fftypes.OpTypeTokensAnnouncePool + }), false).Return(nil) + mbm.On("BroadcastTokenPool", am.ctx, "test-ns", mock.MatchedBy(func(pool *fftypes.TokenPoolAnnouncement) bool { + return pool.Namespace == "test-ns" && pool.Name == "my-pool" && *pool.ID == *poolID + }), false).Return(nil, nil) info := fftypes.JSONObject{"some": "info"} - err := am.TokenPoolCreated(mti, pool, "0x12345", "tx1", info) + err := am.TokenPoolCreated(mti, fftypes.TokenTypeFungible, txID, "123", "0x0", "tx1", info) assert.NoError(t, err) - mdi.AssertExpectations(t) -} - -func TestTokenPoolMissingID(t *testing.T) { - am, cancel := newTestAssets(t) - defer cancel() - mdi := am.database.(*databasemocks.Plugin) - mti := &tokenmocks.Plugin{} - - pool := &fftypes.TokenPool{} - - mdi.On("InsertEvent", am.ctx, mock.MatchedBy(func(ev *fftypes.Event) bool { - return ev.Type == fftypes.EventTypePoolRejected && ev.Reference == pool.ID && ev.Namespace == pool.Namespace - })).Return(nil) - info := fftypes.JSONObject{"some": "info"} - err := am.TokenPoolCreated(mti, pool, "0x12345", "tx1", info) - assert.NoError(t, err) mdi.AssertExpectations(t) + mbm.AssertExpectations(t) } -func TestTokenPoolBadNamespace(t *testing.T) { +func TestTokenPoolCreatedOpNotFound(t *testing.T) { am, cancel := newTestAssets(t) defer cancel() mdi := am.database.(*databasemocks.Plugin) mti := &tokenmocks.Plugin{} + mbm := am.broadcast.(*broadcastmocks.Manager) - pool := &fftypes.TokenPool{ - ID: fftypes.NewUUID(), - TX: fftypes.TransactionRef{ - ID: fftypes.NewUUID(), - Type: fftypes.TransactionTypeTokenPool, - }, - } + txID := fftypes.NewUUID() + operations := []*fftypes.Operation{} - mdi.On("InsertEvent", am.ctx, mock.MatchedBy(func(ev *fftypes.Event) bool { - return ev.Type == fftypes.EventTypePoolRejected && ev.Reference == pool.ID && ev.Namespace == pool.Namespace - })).Return(nil) + mti.On("Name").Return("mock-tokens") + mdi.On("GetOperations", am.ctx, mock.Anything).Return(operations, nil, nil) info := fftypes.JSONObject{"some": "info"} - err := am.TokenPoolCreated(mti, pool, "0x12345", "tx1", info) + err := am.TokenPoolCreated(mti, fftypes.TokenTypeFungible, txID, "123", "0x0", "tx1", info) assert.NoError(t, err) + mdi.AssertExpectations(t) + mbm.AssertExpectations(t) } -func TestTokenPoolBadName(t *testing.T) { +func TestTokenPoolMissingID(t *testing.T) { am, cancel := newTestAssets(t) defer cancel() mdi := am.database.(*databasemocks.Plugin) mti := &tokenmocks.Plugin{} + mbm := am.broadcast.(*broadcastmocks.Manager) - pool := &fftypes.TokenPool{ - ID: fftypes.NewUUID(), - TX: fftypes.TransactionRef{ - ID: fftypes.NewUUID(), - Type: fftypes.TransactionTypeTokenPool, + txID := fftypes.NewUUID() + operations := []*fftypes.Operation{ + { + ID: fftypes.NewUUID(), + Input: fftypes.JSONObject{}, }, - Namespace: "test-ns", } - mdi.On("GetTransactionByID", mock.Anything, pool.TX.ID).Return(nil, nil) - mdi.On("UpsertTransaction", am.ctx, mock.MatchedBy(func(tx *fftypes.Transaction) bool { - return tx.Subject.Type == fftypes.TransactionTypeTokenPool - }), false).Return(nil) - mdi.On("InsertEvent", am.ctx, mock.MatchedBy(func(ev *fftypes.Event) bool { - return ev.Type == fftypes.EventTypePoolRejected && ev.Reference == pool.ID && ev.Namespace == pool.Namespace - })).Return(nil) + mti.On("Name").Return("mock-tokens") + mdi.On("GetOperations", am.ctx, mock.Anything).Return(operations, nil, nil) info := fftypes.JSONObject{"some": "info"} - err := am.TokenPoolCreated(mti, pool, "0x12345", "tx1", info) + err := am.TokenPoolCreated(mti, fftypes.TokenTypeFungible, txID, "123", "0x0", "tx1", info) assert.NoError(t, err) - mdi.AssertExpectations(t) -} - -func TestTokenPoolGetTransactionFail(t *testing.T) { - am, cancel := newTestAssets(t) - defer cancel() - mdi := am.database.(*databasemocks.Plugin) - pool := &fftypes.TokenPool{ - ID: fftypes.NewUUID(), - TX: fftypes.TransactionRef{ - ID: fftypes.NewUUID(), - Type: fftypes.TransactionTypeTokenPool, - }, - Namespace: "test-ns", - Name: "my-pool", - } - - mdi.On("GetTransactionByID", mock.Anything, mock.Anything).Return(nil, fmt.Errorf("pop")) - - info := fftypes.JSONObject{"some": "info"} - valid, err := am.persistTokenPoolTransaction(am.ctx, pool, "0x12345", "tx1", info) - assert.EqualError(t, err, "pop") - assert.False(t, valid) mdi.AssertExpectations(t) + mbm.AssertExpectations(t) } -func TestTokenPoolGetTransactionInvalidMatch(t *testing.T) { +func TestTokenPoolCreatedMissingNamespace(t *testing.T) { am, cancel := newTestAssets(t) defer cancel() mdi := am.database.(*databasemocks.Plugin) - - pool := &fftypes.TokenPool{ - ID: fftypes.NewUUID(), - TX: fftypes.TransactionRef{ - ID: fftypes.NewUUID(), - Type: fftypes.TransactionTypeTokenPool, + mti := &tokenmocks.Plugin{} + mbm := am.broadcast.(*broadcastmocks.Manager) + + poolID := fftypes.NewUUID() + txID := fftypes.NewUUID() + operations := []*fftypes.Operation{ + { + ID: fftypes.NewUUID(), + Input: fftypes.JSONObject{ + "id": poolID.String(), + }, }, - Namespace: "test-ns", - Name: "my-pool", } - mdi.On("GetTransactionByID", mock.Anything, mock.Anything).Return(&fftypes.Transaction{ - ID: fftypes.NewUUID(), // wrong - }, nil) + mti.On("Name").Return("mock-tokens") + mdi.On("GetOperations", am.ctx, mock.Anything).Return(operations, nil, nil) info := fftypes.JSONObject{"some": "info"} - valid, err := am.persistTokenPoolTransaction(am.ctx, pool, "0x12345", "tx1", info) + err := am.TokenPoolCreated(mti, fftypes.TokenTypeFungible, txID, "123", "0x0", "tx1", info) assert.NoError(t, err) - assert.False(t, valid) - mdi.AssertExpectations(t) -} -func TestTokenPoolNewTXUpsertFail(t *testing.T) { - am, cancel := newTestAssets(t) - defer cancel() - mdi := am.database.(*databasemocks.Plugin) - - pool := &fftypes.TokenPool{ - ID: fftypes.NewUUID(), - TX: fftypes.TransactionRef{ - ID: fftypes.NewUUID(), - Type: fftypes.TransactionTypeTokenPool, - }, - Namespace: "test-ns", - Name: "my-pool", - } - - mdi.On("GetTransactionByID", mock.Anything, mock.Anything).Return(nil, nil) - mdi.On("UpsertTransaction", mock.Anything, mock.Anything, false).Return(fmt.Errorf("pop")) - - info := fftypes.JSONObject{"some": "info"} - valid, err := am.persistTokenPoolTransaction(am.ctx, pool, "0x12345", "tx1", info) - assert.EqualError(t, err, "pop") - assert.False(t, valid) mdi.AssertExpectations(t) + mbm.AssertExpectations(t) } -func TestTokenPoolExistingTXHashMismatch(t *testing.T) { +func TestTokenPoolCreatedUpsertFail(t *testing.T) { am, cancel := newTestAssets(t) defer cancel() mdi := am.database.(*databasemocks.Plugin) - - pool := &fftypes.TokenPool{ - ID: fftypes.NewUUID(), - TX: fftypes.TransactionRef{ - ID: fftypes.NewUUID(), - Type: fftypes.TransactionTypeTokenPool, - }, - Namespace: "test-ns", - Name: "my-pool", - } - - mdi.On("GetTransactionByID", mock.Anything, mock.Anything).Return(&fftypes.Transaction{ - Subject: fftypes.TransactionSubject{ - Type: fftypes.TransactionTypeTokenPool, - Namespace: pool.Namespace, - Signer: "0x12345", - Reference: pool.ID, - }, - }, nil) - mdi.On("UpsertTransaction", mock.Anything, mock.Anything, false).Return(database.HashMismatch) - - info := fftypes.JSONObject{"some": "info"} - valid, err := am.persistTokenPoolTransaction(am.ctx, pool, "0x12345", "tx1", info) - assert.NoError(t, err) - assert.False(t, valid) - mdi.AssertExpectations(t) -} - -func TestTokenPoolIDMismatch(t *testing.T) { - em, cancel := newTestAssets(t) - defer cancel() - mdi := em.database.(*databasemocks.Plugin) mti := &tokenmocks.Plugin{} - - pool := &fftypes.TokenPool{ - ID: fftypes.NewUUID(), - TX: fftypes.TransactionRef{ - ID: fftypes.NewUUID(), - Type: fftypes.TransactionTypeTokenPool, + mbm := am.broadcast.(*broadcastmocks.Manager) + + poolID := fftypes.NewUUID() + txID := fftypes.NewUUID() + operations := []*fftypes.Operation{ + { + ID: fftypes.NewUUID(), + Input: fftypes.JSONObject{ + "id": poolID.String(), + "namespace": "test-ns", + "name": "my-pool", + }, }, - Namespace: "test-ns", - Name: "my-pool", } - mdi.On("GetTransactionByID", mock.Anything, pool.TX.ID).Return(nil, nil) - mdi.On("UpsertTransaction", em.ctx, mock.MatchedBy(func(tx *fftypes.Transaction) bool { + mti.On("Name").Return("mock-tokens") + mdi.On("GetOperations", am.ctx, mock.Anything).Return(operations, nil, nil) + mdi.On("GetTransactionByID", mock.Anything, txID).Return(nil, nil) + mdi.On("UpsertTransaction", am.ctx, mock.MatchedBy(func(tx *fftypes.Transaction) bool { return tx.Subject.Type == fftypes.TransactionTypeTokenPool - }), false).Return(nil) - mdi.On("UpsertTokenPool", em.ctx, pool).Return(database.IDMismatch) - mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(ev *fftypes.Event) bool { - return ev.Type == fftypes.EventTypePoolRejected && ev.Reference == pool.ID && ev.Namespace == pool.Namespace - })).Return(nil) + }), false).Return(database.HashMismatch) info := fftypes.JSONObject{"some": "info"} - err := em.TokenPoolCreated(mti, pool, "0x12345", "tx1", info) + err := am.TokenPoolCreated(mti, fftypes.TokenTypeFungible, txID, "123", "0x0", "tx1", info) assert.NoError(t, err) - mdi.AssertExpectations(t) -} -func TestTokenPoolUpsertFailAndRetry(t *testing.T) { - em, cancel := newTestAssets(t) - defer cancel() - mdi := em.database.(*databasemocks.Plugin) - mti := &tokenmocks.Plugin{} - - pool := &fftypes.TokenPool{ - ID: fftypes.NewUUID(), - TX: fftypes.TransactionRef{ - ID: fftypes.NewUUID(), - Type: fftypes.TransactionTypeTokenPool, - }, - Namespace: "test-ns", - Name: "my-pool", - } - - mdi.On("GetTransactionByID", mock.Anything, pool.TX.ID).Return(nil, nil) - mdi.On("UpsertTransaction", mock.Anything, mock.Anything, false).Return(nil) - mdi.On("UpsertTokenPool", em.ctx, pool).Return(fmt.Errorf("pop")).Once() - mdi.On("UpsertTokenPool", em.ctx, pool).Return(nil).Once() - mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(ev *fftypes.Event) bool { - return ev.Type == fftypes.EventTypePoolConfirmed && ev.Reference == pool.ID && ev.Namespace == pool.Namespace - })).Return(nil) - - info := fftypes.JSONObject{"some": "info"} - err := em.TokenPoolCreated(mti, pool, "0x12345", "tx1", info) - assert.NoError(t, err) mdi.AssertExpectations(t) + mbm.AssertExpectations(t) } diff --git a/internal/broadcast/manager.go b/internal/broadcast/manager.go index 36aaf8faa..bf2f8dcc2 100644 --- a/internal/broadcast/manager.go +++ b/internal/broadcast/manager.go @@ -41,6 +41,7 @@ type Manager interface { BroadcastMessage(ctx context.Context, ns string, in *fftypes.MessageInOut, waitConfirm bool) (out *fftypes.Message, err error) BroadcastMessageWithID(ctx context.Context, ns string, id *fftypes.UUID, unresolved *fftypes.MessageInOut, resolved *fftypes.Message, waitConfirm bool) (out *fftypes.Message, err error) BroadcastDefinition(ctx context.Context, def fftypes.Definition, signingIdentity *fftypes.Identity, tag fftypes.SystemTag, waitConfirm bool) (msg *fftypes.Message, err error) + BroadcastTokenPool(ctx context.Context, ns string, pool *fftypes.TokenPoolAnnouncement, waitConfirm bool) (msg *fftypes.Message, err error) GetNodeSigningIdentity(ctx context.Context) (*fftypes.Identity, error) Start() error WaitStop() diff --git a/internal/broadcast/tokenpool.go b/internal/broadcast/tokenpool.go new file mode 100644 index 000000000..43fe8d319 --- /dev/null +++ b/internal/broadcast/tokenpool.go @@ -0,0 +1,38 @@ +// Copyright © 2021 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package broadcast + +import ( + "context" + + "github.com/hyperledger/firefly/pkg/fftypes" +) + +func (bm *broadcastManager) BroadcastTokenPool(ctx context.Context, ns string, pool *fftypes.TokenPoolAnnouncement, waitConfirm bool) (msg *fftypes.Message, err error) { + if err := pool.Validate(ctx, false); err != nil { + return nil, err + } + if err := bm.data.VerifyNamespaceExists(ctx, pool.Namespace); err != nil { + return nil, err + } + + msg, err = bm.broadcastDefinitionAsNode(ctx, pool, fftypes.SystemTagDefinePool, waitConfirm) + if msg != nil { + pool.Message = msg.Header.ID + } + return msg, err +} diff --git a/internal/broadcast/tokenpool_test.go b/internal/broadcast/tokenpool_test.go new file mode 100644 index 000000000..a9ede71cc --- /dev/null +++ b/internal/broadcast/tokenpool_test.go @@ -0,0 +1,137 @@ +// Copyright © 2021 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package broadcast + +import ( + "context" + "fmt" + "testing" + + "github.com/hyperledger/firefly/mocks/databasemocks" + "github.com/hyperledger/firefly/mocks/datamocks" + "github.com/hyperledger/firefly/pkg/fftypes" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +func TestBroadcastTokenPoolNSGetFail(t *testing.T) { + bm, cancel := newTestBroadcast(t) + defer cancel() + mdm := bm.data.(*datamocks.Manager) + + pool := &fftypes.TokenPoolAnnouncement{ + TokenPool: fftypes.TokenPool{ + ID: fftypes.NewUUID(), + Namespace: "ns1", + Name: "mypool", + Type: fftypes.TokenTypeNonFungible, + ProtocolID: "N1", + Symbol: "COIN", + }, + ProtocolTxID: "tx123", + } + + mdm.On("VerifyNamespaceExists", mock.Anything, "ns1").Return(fmt.Errorf("pop")) + + _, err := bm.BroadcastTokenPool(context.Background(), "ns1", pool, false) + assert.EqualError(t, err, "pop") + + mdm.AssertExpectations(t) +} + +func TestBroadcastTokenPoolInvalid(t *testing.T) { + bm, cancel := newTestBroadcast(t) + defer cancel() + mdi := bm.database.(*databasemocks.Plugin) + mdm := bm.data.(*datamocks.Manager) + + pool := &fftypes.TokenPoolAnnouncement{ + TokenPool: fftypes.TokenPool{ + ID: fftypes.NewUUID(), + Namespace: "", + Name: "", + Type: fftypes.TokenTypeNonFungible, + ProtocolID: "N1", + Symbol: "COIN", + }, + ProtocolTxID: "tx123", + } + + _, err := bm.BroadcastTokenPool(context.Background(), "ns1", pool, false) + assert.Regexp(t, "FF10131", err) + + mdi.AssertExpectations(t) + mdm.AssertExpectations(t) +} + +func TestBroadcastTokenPoolBroadcastFail(t *testing.T) { + bm, cancel := newTestBroadcast(t) + defer cancel() + mdi := bm.database.(*databasemocks.Plugin) + mdm := bm.data.(*datamocks.Manager) + + pool := &fftypes.TokenPoolAnnouncement{ + TokenPool: fftypes.TokenPool{ + ID: fftypes.NewUUID(), + Namespace: "ns1", + Name: "mypool", + Type: fftypes.TokenTypeNonFungible, + ProtocolID: "N1", + Symbol: "COIN", + }, + ProtocolTxID: "tx123", + } + + mdm.On("VerifyNamespaceExists", mock.Anything, "ns1").Return(nil) + mdi.On("UpsertData", mock.Anything, mock.Anything, true, false).Return(nil) + mdi.On("InsertMessageLocal", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) + + _, err := bm.BroadcastTokenPool(context.Background(), "ns1", pool, false) + assert.EqualError(t, err, "pop") + + mdi.AssertExpectations(t) + mdm.AssertExpectations(t) +} + +func TestBroadcastTokenPoolOk(t *testing.T) { + bm, cancel := newTestBroadcast(t) + defer cancel() + mdi := bm.database.(*databasemocks.Plugin) + mdm := bm.data.(*datamocks.Manager) + + pool := &fftypes.TokenPoolAnnouncement{ + TokenPool: fftypes.TokenPool{ + ID: fftypes.NewUUID(), + Namespace: "ns1", + Name: "mypool", + Type: fftypes.TokenTypeNonFungible, + ProtocolID: "N1", + Symbol: "COIN", + }, + ProtocolTxID: "tx123", + } + + mdm.On("VerifyNamespaceExists", mock.Anything, "ns1").Return(nil) + mdi.On("UpsertData", mock.Anything, mock.Anything, true, false).Return(nil) + mdi.On("InsertMessageLocal", mock.Anything, mock.Anything).Return(nil) + + _, err := bm.BroadcastTokenPool(context.Background(), "ns1", pool, false) + assert.NoError(t, err) + + mdi.AssertExpectations(t) + mdm.AssertExpectations(t) +} diff --git a/internal/orchestrator/bound_callbacks.go b/internal/orchestrator/bound_callbacks.go index 35d8d04d9..8db638655 100644 --- a/internal/orchestrator/bound_callbacks.go +++ b/internal/orchestrator/bound_callbacks.go @@ -18,6 +18,7 @@ package orchestrator import ( "github.com/hyperledger/firefly/internal/assets" + "github.com/hyperledger/firefly/internal/events" "github.com/hyperledger/firefly/pkg/blockchain" "github.com/hyperledger/firefly/pkg/dataexchange" @@ -56,6 +57,6 @@ func (bc *boundCallbacks) MessageReceived(peerID string, data []byte) error { return bc.ei.MessageReceived(bc.dx, peerID, data) } -func (bc *boundCallbacks) TokenPoolCreated(plugin tokens.Plugin, pool *fftypes.TokenPool, signingIdentity string, protocolTxID string, additionalInfo fftypes.JSONObject) error { - return bc.am.TokenPoolCreated(plugin, pool, signingIdentity, protocolTxID, additionalInfo) +func (bc *boundCallbacks) TokenPoolCreated(plugin tokens.Plugin, tokenType fftypes.TokenType, tx *fftypes.UUID, protocolID, signingIdentity, protocolTxID string, additionalInfo fftypes.JSONObject) error { + return bc.am.TokenPoolCreated(plugin, tokenType, tx, protocolID, signingIdentity, protocolTxID, additionalInfo) } diff --git a/internal/orchestrator/bound_callbacks_test.go b/internal/orchestrator/bound_callbacks_test.go index 893a5f3e5..388a1516e 100644 --- a/internal/orchestrator/bound_callbacks_test.go +++ b/internal/orchestrator/bound_callbacks_test.go @@ -40,9 +40,9 @@ func TestBoundCallbacks(t *testing.T) { info := fftypes.JSONObject{"hello": "world"} batch := &blockchain.BatchPin{TransactionID: fftypes.NewUUID()} - pool := &fftypes.TokenPool{} hash := fftypes.NewRandB32() opID := fftypes.NewUUID() + txID := fftypes.NewUUID() mei.On("BatchPinComplete", mbi, batch, "0x12345", "tx12345", info).Return(fmt.Errorf("pop")) err := bc.BatchPinComplete(batch, "0x12345", "tx12345", info) @@ -68,7 +68,7 @@ func TestBoundCallbacks(t *testing.T) { err = bc.MessageReceived("peer1", []byte{}) assert.EqualError(t, err, "pop") - mam.On("TokenPoolCreated", mti, pool, "0x12345", "tx12345", info).Return(fmt.Errorf("pop")) - err = bc.TokenPoolCreated(mti, pool, "0x12345", "tx12345", info) + mam.On("TokenPoolCreated", mti, fftypes.TokenTypeFungible, txID, "123", "0x12345", "tx12345", info).Return(fmt.Errorf("pop")) + err = bc.TokenPoolCreated(mti, fftypes.TokenTypeFungible, txID, "123", "0x12345", "tx12345", info) assert.EqualError(t, err, "pop") } diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index ee0e4ab66..81a8eeb67 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -391,7 +391,7 @@ func (or *orchestrator) initComponents(ctx context.Context) (err error) { } } - or.syshandlers = syshandlers.NewSystemHandlers(or.database, or.identity, or.dataexchange, or.data, or.broadcast, or.messaging) + or.syshandlers = syshandlers.NewSystemHandlers(or.database, or.identity, or.dataexchange, or.data, or.broadcast, or.messaging, or.assets) if or.events == nil { or.events, err = events.NewEventManager(ctx, or.publicstorage, or.database, or.identity, or.syshandlers, or.data) @@ -410,7 +410,7 @@ func (or *orchestrator) initComponents(ctx context.Context) (err error) { or.syncasync.Init(or.events) if or.assets == nil { - or.assets, err = assets.NewAssetManager(ctx, or.database, or.identity, or.data, or.syncasync, or.tokens) + or.assets, err = assets.NewAssetManager(ctx, or.database, or.identity, or.data, or.syncasync, or.broadcast, or.tokens) if err != nil { return err } diff --git a/internal/syshandlers/syshandler.go b/internal/syshandlers/syshandler.go index 6386ac9da..5aaa71e3b 100644 --- a/internal/syshandlers/syshandler.go +++ b/internal/syshandlers/syshandler.go @@ -20,10 +20,12 @@ import ( "context" "encoding/json" + "github.com/hyperledger/firefly/internal/assets" "github.com/hyperledger/firefly/internal/broadcast" "github.com/hyperledger/firefly/internal/data" "github.com/hyperledger/firefly/internal/log" "github.com/hyperledger/firefly/internal/privatemessaging" + "github.com/hyperledger/firefly/internal/txcommon" "github.com/hyperledger/firefly/pkg/database" "github.com/hyperledger/firefly/pkg/dataexchange" "github.com/hyperledger/firefly/pkg/fftypes" @@ -45,9 +47,11 @@ type systemHandlers struct { data data.Manager broadcast broadcast.Manager messaging privatemessaging.Manager + assets assets.Manager + txhelper txcommon.Helper } -func NewSystemHandlers(di database.Plugin, ii identity.Plugin, dx dataexchange.Plugin, dm data.Manager, bm broadcast.Manager, pm privatemessaging.Manager) SystemHandlers { +func NewSystemHandlers(di database.Plugin, ii identity.Plugin, dx dataexchange.Plugin, dm data.Manager, bm broadcast.Manager, pm privatemessaging.Manager, am assets.Manager) SystemHandlers { return &systemHandlers{ database: di, identity: ii, @@ -55,6 +59,8 @@ func NewSystemHandlers(di database.Plugin, ii identity.Plugin, dx dataexchange.P data: dm, broadcast: bm, messaging: pm, + assets: am, + txhelper: txcommon.NewTransactionHelper(di), } } @@ -86,6 +92,8 @@ func (sh *systemHandlers) HandleSystemBroadcast(ctx context.Context, msg *fftype return sh.handleOrganizationBroadcast(ctx, msg, data) case fftypes.SystemTagDefineNode: return sh.handleNodeBroadcast(ctx, msg, data) + case fftypes.SystemTagDefinePool: + return sh.handleTokenPoolBroadcast(ctx, msg, data) default: l.Warnf("Unknown topic '%s' for system broadcast ID '%s'", msg.Header.Tag, msg.Header.ID) } diff --git a/internal/syshandlers/syshandler_test.go b/internal/syshandlers/syshandler_test.go index 17546b071..e913f973b 100644 --- a/internal/syshandlers/syshandler_test.go +++ b/internal/syshandlers/syshandler_test.go @@ -20,6 +20,7 @@ import ( "context" "testing" + "github.com/hyperledger/firefly/mocks/assetmocks" "github.com/hyperledger/firefly/mocks/broadcastmocks" "github.com/hyperledger/firefly/mocks/databasemocks" "github.com/hyperledger/firefly/mocks/dataexchangemocks" @@ -38,7 +39,8 @@ func newTestSystemHandlers(t *testing.T) *systemHandlers { mdm := &datamocks.Manager{} mbm := &broadcastmocks.Manager{} mpm := &privatemessagingmocks.Manager{} - return NewSystemHandlers(mdi, mii, mdx, mdm, mbm, mpm).(*systemHandlers) + mam := &assetmocks.Manager{} + return NewSystemHandlers(mdi, mii, mdx, mdm, mbm, mpm, mam).(*systemHandlers) } func TestHandleSystemBroadcastUnknown(t *testing.T) { diff --git a/internal/syshandlers/syshandler_tokenpool.go b/internal/syshandlers/syshandler_tokenpool.go new file mode 100644 index 000000000..9f509f650 --- /dev/null +++ b/internal/syshandlers/syshandler_tokenpool.go @@ -0,0 +1,128 @@ +// Copyright © 2021 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package syshandlers + +import ( + "context" + + "github.com/hyperledger/firefly/internal/log" + "github.com/hyperledger/firefly/pkg/database" + "github.com/hyperledger/firefly/pkg/fftypes" +) + +func (sh *systemHandlers) persistTokenPool(ctx context.Context, pool *fftypes.TokenPoolAnnouncement) (valid bool, err error) { + // Find a matching operation within this transaction + fb := database.OperationQueryFactory.NewFilter(ctx) + filter := fb.And( + fb.Eq("tx", pool.TX.ID), + fb.Eq("type", fftypes.OpTypeTokensAnnouncePool), + ) + operations, _, err := sh.database.GetOperations(ctx, filter) + if err != nil { + return false, err // retryable + } + + if len(operations) > 0 { + // Mark announce operation completed + update := database.OperationQueryFactory.NewUpdate(ctx). + Set("status", fftypes.OpStatusSucceeded) + if err := sh.database.UpdateOperation(ctx, operations[0].ID, update); err != nil { + return false, err // retryable + } + + // Validate received info matches the database + transaction, err := sh.database.GetTransactionByID(ctx, pool.TX.ID) + if err != nil { + return false, err // retryable + } + if transaction.ProtocolID != pool.ProtocolTxID { + log.L(ctx).Warnf("Ignoring token pool from transaction '%s' - unexpected protocol ID '%s'", pool.TX.ID, pool.ProtocolTxID) + return false, nil // not retryable + } + + // Mark transaction completed + transaction.Status = fftypes.OpStatusSucceeded + err = sh.database.UpsertTransaction(ctx, transaction, false) + if err != nil { + return false, err // retryable + } + } else { + // No local announce operation found (broadcast originated from another node) + log.L(ctx).Infof("Validating token pool transaction '%s' with protocol ID '%s'", pool.TX.ID, pool.ProtocolTxID) + err = sh.assets.ValidateTokenPoolTx(ctx, &pool.TokenPool, pool.ProtocolTxID) + if err != nil { + log.L(ctx).Errorf("Failed to validate token pool transaction '%s': %v", pool.TX.ID, err) + return false, err // retryable + } + transaction := &fftypes.Transaction{ + ID: pool.TX.ID, + Status: fftypes.OpStatusSucceeded, + Subject: fftypes.TransactionSubject{ + Namespace: pool.Namespace, + Type: fftypes.TransactionTypeTokenPool, + Signer: pool.Author, + Reference: pool.ID, + }, + ProtocolID: pool.ProtocolTxID, + } + valid, err = sh.txhelper.PersistTransaction(ctx, transaction) + if !valid || err != nil { + return valid, err + } + } + + err = sh.database.UpsertTokenPool(ctx, &pool.TokenPool) + if err != nil { + if err == database.IDMismatch { + log.L(ctx).Errorf("Invalid token pool '%s'. ID mismatch with existing record", pool.ID) + return false, nil // not retryable + } + log.L(ctx).Errorf("Failed to insert token pool '%s': %s", pool.ID, err) + return false, err // retryable + } + return true, nil +} + +func (sh *systemHandlers) handleTokenPoolBroadcast(ctx context.Context, msg *fftypes.Message, data []*fftypes.Data) (valid bool, err error) { + l := log.L(ctx) + + var pool fftypes.TokenPoolAnnouncement + valid = sh.getSystemBroadcastPayload(ctx, msg, data, &pool) + if valid { + if err = pool.Validate(ctx, true); err != nil { + l.Warnf("Unable to process token pool broadcast %s - validate failed: %s", msg.Header.ID, err) + valid = false + } else { + pool.Message = msg.Header.ID + valid, err = sh.persistTokenPool(ctx, &pool) + if err != nil { + return valid, err + } + } + } + + var event *fftypes.Event + if valid { + l.Infof("Token pool created id=%s author=%s", pool.ID, msg.Header.Author) + event = fftypes.NewEvent(fftypes.EventTypePoolConfirmed, pool.Namespace, pool.ID) + } else { + l.Warnf("Token pool rejected id=%s author=%s", pool.ID, msg.Header.Author) + event = fftypes.NewEvent(fftypes.EventTypePoolRejected, pool.Namespace, pool.ID) + } + err = sh.database.InsertEvent(ctx, event) + return valid, err +} diff --git a/internal/syshandlers/syshandler_tokenpool_test.go b/internal/syshandlers/syshandler_tokenpool_test.go new file mode 100644 index 000000000..b316996d4 --- /dev/null +++ b/internal/syshandlers/syshandler_tokenpool_test.go @@ -0,0 +1,601 @@ +// Copyright © 2021 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package syshandlers + +import ( + "context" + "encoding/json" + "fmt" + "testing" + + "github.com/hyperledger/firefly/mocks/assetmocks" + "github.com/hyperledger/firefly/mocks/databasemocks" + "github.com/hyperledger/firefly/pkg/database" + "github.com/hyperledger/firefly/pkg/fftypes" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +func TestHandleSystemBroadcastTokenPoolSelfOk(t *testing.T) { + sh := newTestSystemHandlers(t) + + pool := &fftypes.TokenPoolAnnouncement{ + TokenPool: fftypes.TokenPool{ + ID: fftypes.NewUUID(), + Namespace: "ns1", + Name: "name1", + Type: fftypes.TokenTypeFungible, + ProtocolID: "12345", + Symbol: "COIN", + TX: fftypes.TransactionRef{ + Type: fftypes.TransactionTypeTokenPool, + ID: fftypes.NewUUID(), + }, + }, + ProtocolTxID: "tx123", + } + msg := &fftypes.Message{ + Header: fftypes.MessageHeader{ + ID: fftypes.NewUUID(), + Tag: string(fftypes.SystemTagDefinePool), + }, + } + b, err := json.Marshal(&pool) + assert.NoError(t, err) + data := []*fftypes.Data{{ + Value: fftypes.Byteable(b), + }} + opID := fftypes.NewUUID() + operations := []*fftypes.Operation{{ID: opID}} + tx := &fftypes.Transaction{ProtocolID: "tx123"} + + mdi := sh.database.(*databasemocks.Plugin) + mdi.On("GetOperations", context.Background(), mock.Anything).Return(operations, nil, nil) + mdi.On("UpdateOperation", context.Background(), opID, mock.Anything).Return(nil) + mdi.On("GetTransactionByID", context.Background(), pool.TX.ID).Return(tx, nil) + mdi.On("UpsertTransaction", context.Background(), tx, false).Return(nil) + mdi.On("UpsertTokenPool", context.Background(), mock.MatchedBy(func(p *fftypes.TokenPool) bool { + return *p.ID == *pool.ID && p.Message == msg.Header.ID + })).Return(nil) + mdi.On("InsertEvent", context.Background(), mock.MatchedBy(func(event *fftypes.Event) bool { + return *event.Reference == *pool.ID && event.Namespace == pool.Namespace && event.Type == fftypes.EventTypePoolConfirmed + })).Return(nil) + + valid, err := sh.HandleSystemBroadcast(context.Background(), msg, data) + assert.True(t, valid) + assert.NoError(t, err) + assert.Equal(t, fftypes.OpStatusSucceeded, tx.Status) + + mdi.AssertExpectations(t) +} + +func TestHandleSystemBroadcastTokenPoolSelfUpdateOpFail(t *testing.T) { + sh := newTestSystemHandlers(t) + + pool := &fftypes.TokenPoolAnnouncement{ + TokenPool: fftypes.TokenPool{ + ID: fftypes.NewUUID(), + Namespace: "ns1", + Name: "name1", + Type: fftypes.TokenTypeFungible, + ProtocolID: "12345", + Symbol: "COIN", + TX: fftypes.TransactionRef{ + Type: fftypes.TransactionTypeTokenPool, + ID: fftypes.NewUUID(), + }, + }, + ProtocolTxID: "tx123", + } + msg := &fftypes.Message{ + Header: fftypes.MessageHeader{ + ID: fftypes.NewUUID(), + Tag: string(fftypes.SystemTagDefinePool), + }, + } + b, err := json.Marshal(&pool) + assert.NoError(t, err) + data := []*fftypes.Data{{ + Value: fftypes.Byteable(b), + }} + opID := fftypes.NewUUID() + operations := []*fftypes.Operation{{ID: opID}} + + mdi := sh.database.(*databasemocks.Plugin) + mdi.On("GetOperations", context.Background(), mock.Anything).Return(operations, nil, nil) + mdi.On("UpdateOperation", context.Background(), opID, mock.Anything).Return(fmt.Errorf("pop")) + + valid, err := sh.HandleSystemBroadcast(context.Background(), msg, data) + assert.False(t, valid) + assert.EqualError(t, err, "pop") + + mdi.AssertExpectations(t) +} + +func TestHandleSystemBroadcastTokenPoolSelfGetTXFail(t *testing.T) { + sh := newTestSystemHandlers(t) + + pool := &fftypes.TokenPoolAnnouncement{ + TokenPool: fftypes.TokenPool{ + ID: fftypes.NewUUID(), + Namespace: "ns1", + Name: "name1", + Type: fftypes.TokenTypeFungible, + ProtocolID: "12345", + Symbol: "COIN", + TX: fftypes.TransactionRef{ + Type: fftypes.TransactionTypeTokenPool, + ID: fftypes.NewUUID(), + }, + }, + ProtocolTxID: "tx123", + } + msg := &fftypes.Message{ + Header: fftypes.MessageHeader{ + ID: fftypes.NewUUID(), + Tag: string(fftypes.SystemTagDefinePool), + }, + } + b, err := json.Marshal(&pool) + assert.NoError(t, err) + data := []*fftypes.Data{{ + Value: fftypes.Byteable(b), + }} + opID := fftypes.NewUUID() + operations := []*fftypes.Operation{{ID: opID}} + + mdi := sh.database.(*databasemocks.Plugin) + mdi.On("GetOperations", context.Background(), mock.Anything).Return(operations, nil, nil) + mdi.On("UpdateOperation", context.Background(), opID, mock.Anything).Return(nil) + mdi.On("GetTransactionByID", context.Background(), pool.TX.ID).Return(nil, fmt.Errorf("pop")) + + valid, err := sh.HandleSystemBroadcast(context.Background(), msg, data) + assert.False(t, valid) + assert.EqualError(t, err, "pop") + + mdi.AssertExpectations(t) +} + +func TestHandleSystemBroadcastTokenPoolSelfTXMismatch(t *testing.T) { + sh := newTestSystemHandlers(t) + + pool := &fftypes.TokenPoolAnnouncement{ + TokenPool: fftypes.TokenPool{ + ID: fftypes.NewUUID(), + Namespace: "ns1", + Name: "name1", + Type: fftypes.TokenTypeFungible, + ProtocolID: "12345", + Symbol: "COIN", + TX: fftypes.TransactionRef{ + Type: fftypes.TransactionTypeTokenPool, + ID: fftypes.NewUUID(), + }, + }, + ProtocolTxID: "tx123", + } + msg := &fftypes.Message{ + Header: fftypes.MessageHeader{ + ID: fftypes.NewUUID(), + Tag: string(fftypes.SystemTagDefinePool), + }, + } + b, err := json.Marshal(&pool) + assert.NoError(t, err) + data := []*fftypes.Data{{ + Value: fftypes.Byteable(b), + }} + opID := fftypes.NewUUID() + operations := []*fftypes.Operation{{ID: opID}} + tx := &fftypes.Transaction{ProtocolID: "bad"} + + mdi := sh.database.(*databasemocks.Plugin) + mdi.On("GetOperations", context.Background(), mock.Anything).Return(operations, nil, nil) + mdi.On("UpdateOperation", context.Background(), opID, mock.Anything).Return(nil) + mdi.On("GetTransactionByID", context.Background(), pool.TX.ID).Return(tx, nil) + mdi.On("InsertEvent", context.Background(), mock.MatchedBy(func(event *fftypes.Event) bool { + return *event.Reference == *pool.ID && event.Namespace == pool.Namespace && event.Type == fftypes.EventTypePoolRejected + })).Return(nil) + + valid, err := sh.HandleSystemBroadcast(context.Background(), msg, data) + assert.False(t, valid) + assert.NoError(t, err) + + mdi.AssertExpectations(t) +} + +func TestHandleSystemBroadcastTokenPoolSelfUpdateTXFail(t *testing.T) { + sh := newTestSystemHandlers(t) + + pool := &fftypes.TokenPoolAnnouncement{ + TokenPool: fftypes.TokenPool{ + ID: fftypes.NewUUID(), + Namespace: "ns1", + Name: "name1", + Type: fftypes.TokenTypeFungible, + ProtocolID: "12345", + Symbol: "COIN", + TX: fftypes.TransactionRef{ + Type: fftypes.TransactionTypeTokenPool, + ID: fftypes.NewUUID(), + }, + }, + ProtocolTxID: "tx123", + } + msg := &fftypes.Message{ + Header: fftypes.MessageHeader{ + ID: fftypes.NewUUID(), + Tag: string(fftypes.SystemTagDefinePool), + }, + } + b, err := json.Marshal(&pool) + assert.NoError(t, err) + data := []*fftypes.Data{{ + Value: fftypes.Byteable(b), + }} + opID := fftypes.NewUUID() + operations := []*fftypes.Operation{{ID: opID}} + tx := &fftypes.Transaction{ProtocolID: "tx123"} + + mdi := sh.database.(*databasemocks.Plugin) + mdi.On("GetOperations", context.Background(), mock.Anything).Return(operations, nil, nil) + mdi.On("UpdateOperation", context.Background(), opID, mock.Anything).Return(nil) + mdi.On("GetTransactionByID", context.Background(), pool.TX.ID).Return(tx, nil) + mdi.On("UpsertTransaction", context.Background(), tx, false).Return(fmt.Errorf("pop")) + + valid, err := sh.HandleSystemBroadcast(context.Background(), msg, data) + assert.False(t, valid) + assert.EqualError(t, err, "pop") + + mdi.AssertExpectations(t) +} + +func TestHandleSystemBroadcastTokenPoolOk(t *testing.T) { + sh := newTestSystemHandlers(t) + + pool := &fftypes.TokenPoolAnnouncement{ + TokenPool: fftypes.TokenPool{ + ID: fftypes.NewUUID(), + Namespace: "ns1", + Name: "name1", + Type: fftypes.TokenTypeFungible, + ProtocolID: "12345", + Symbol: "COIN", + TX: fftypes.TransactionRef{ + Type: fftypes.TransactionTypeTokenPool, + ID: fftypes.NewUUID(), + }, + }, + ProtocolTxID: "tx123", + } + msg := &fftypes.Message{ + Header: fftypes.MessageHeader{ + ID: fftypes.NewUUID(), + Tag: string(fftypes.SystemTagDefinePool), + }, + } + b, err := json.Marshal(&pool) + assert.NoError(t, err) + data := []*fftypes.Data{{ + Value: fftypes.Byteable(b), + }} + operations := []*fftypes.Operation{} + + mdi := sh.database.(*databasemocks.Plugin) + mdi.On("GetOperations", context.Background(), mock.Anything).Return(operations, nil, nil) + mdi.On("GetTransactionByID", context.Background(), pool.TX.ID).Return(nil, nil) + mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(t *fftypes.Transaction) bool { + return t.Subject.Type == fftypes.TransactionTypeTokenPool && *t.Subject.Reference == *pool.ID + }), false).Return(nil) + mdi.On("UpsertTokenPool", context.Background(), mock.MatchedBy(func(p *fftypes.TokenPool) bool { + return *p.ID == *pool.ID && p.Message == msg.Header.ID + })).Return(nil) + mdi.On("InsertEvent", context.Background(), mock.MatchedBy(func(event *fftypes.Event) bool { + return *event.Reference == *pool.ID && event.Namespace == pool.Namespace && event.Type == fftypes.EventTypePoolConfirmed + })).Return(nil) + + mam := sh.assets.(*assetmocks.Manager) + mam.On("ValidateTokenPoolTx", context.Background(), mock.MatchedBy(func(p *fftypes.TokenPool) bool { + return *p.ID == *pool.ID + }), "tx123").Return(nil) + + valid, err := sh.HandleSystemBroadcast(context.Background(), msg, data) + assert.True(t, valid) + assert.NoError(t, err) + + mdi.AssertExpectations(t) + mam.AssertExpectations(t) +} + +func TestHandleSystemBroadcastTokenPoolValidateTxFail(t *testing.T) { + sh := newTestSystemHandlers(t) + + pool := &fftypes.TokenPoolAnnouncement{ + TokenPool: fftypes.TokenPool{ + ID: fftypes.NewUUID(), + Namespace: "ns1", + Name: "name1", + Type: fftypes.TokenTypeFungible, + ProtocolID: "12345", + Symbol: "COIN", + TX: fftypes.TransactionRef{ + Type: fftypes.TransactionTypeTokenPool, + ID: fftypes.NewUUID(), + }, + }, + ProtocolTxID: "tx123", + } + msg := &fftypes.Message{ + Header: fftypes.MessageHeader{ + ID: fftypes.NewUUID(), + Tag: string(fftypes.SystemTagDefinePool), + }, + } + b, err := json.Marshal(&pool) + assert.NoError(t, err) + data := []*fftypes.Data{{ + Value: fftypes.Byteable(b), + }} + operations := []*fftypes.Operation{} + + mdi := sh.database.(*databasemocks.Plugin) + mdi.On("GetOperations", context.Background(), mock.Anything).Return(operations, nil, nil) + + mam := sh.assets.(*assetmocks.Manager) + mam.On("ValidateTokenPoolTx", context.Background(), mock.MatchedBy(func(p *fftypes.TokenPool) bool { + return *p.ID == *pool.ID + }), "tx123").Return(fmt.Errorf("pop")) + + valid, err := sh.HandleSystemBroadcast(context.Background(), msg, data) + assert.False(t, valid) + assert.EqualError(t, err, "pop") + + mdi.AssertExpectations(t) + mam.AssertExpectations(t) +} + +func TestHandleSystemBroadcastTokenPoolBadTX(t *testing.T) { + sh := newTestSystemHandlers(t) + + pool := &fftypes.TokenPoolAnnouncement{ + TokenPool: fftypes.TokenPool{ + ID: fftypes.NewUUID(), + Namespace: "ns1", + Name: "name1", + Type: fftypes.TokenTypeFungible, + ProtocolID: "12345", + Symbol: "COIN", + TX: fftypes.TransactionRef{ + Type: fftypes.TransactionTypeTokenPool, + ID: nil, + }, + }, + ProtocolTxID: "tx123", + } + msg := &fftypes.Message{ + Header: fftypes.MessageHeader{ + ID: fftypes.NewUUID(), + Tag: string(fftypes.SystemTagDefinePool), + }, + } + b, err := json.Marshal(&pool) + assert.NoError(t, err) + data := []*fftypes.Data{{ + Value: fftypes.Byteable(b), + }} + operations := []*fftypes.Operation{} + + mdi := sh.database.(*databasemocks.Plugin) + mdi.On("GetOperations", context.Background(), mock.Anything).Return(operations, nil, nil) + mdi.On("InsertEvent", context.Background(), mock.MatchedBy(func(event *fftypes.Event) bool { + return *event.Reference == *pool.ID && event.Namespace == pool.Namespace && event.Type == fftypes.EventTypePoolRejected + })).Return(nil) + + mam := sh.assets.(*assetmocks.Manager) + mam.On("ValidateTokenPoolTx", context.Background(), mock.MatchedBy(func(p *fftypes.TokenPool) bool { + return *p.ID == *pool.ID + }), "tx123").Return(nil) + + valid, err := sh.HandleSystemBroadcast(context.Background(), msg, data) + assert.False(t, valid) + assert.NoError(t, err) + + mdi.AssertExpectations(t) + mam.AssertExpectations(t) +} + +func TestHandleSystemBroadcastTokenPoolIDMismatch(t *testing.T) { + sh := newTestSystemHandlers(t) + + pool := &fftypes.TokenPoolAnnouncement{ + TokenPool: fftypes.TokenPool{ + ID: fftypes.NewUUID(), + Namespace: "ns1", + Name: "name1", + Type: fftypes.TokenTypeFungible, + ProtocolID: "12345", + Symbol: "COIN", + TX: fftypes.TransactionRef{ + Type: fftypes.TransactionTypeTokenPool, + ID: fftypes.NewUUID(), + }, + }, + ProtocolTxID: "tx123", + } + msg := &fftypes.Message{ + Header: fftypes.MessageHeader{ + ID: fftypes.NewUUID(), + Tag: string(fftypes.SystemTagDefinePool), + }, + } + b, err := json.Marshal(&pool) + assert.NoError(t, err) + data := []*fftypes.Data{{ + Value: fftypes.Byteable(b), + }} + operations := []*fftypes.Operation{} + + mdi := sh.database.(*databasemocks.Plugin) + mdi.On("GetOperations", context.Background(), mock.Anything).Return(operations, nil, nil) + mdi.On("GetTransactionByID", context.Background(), pool.TX.ID).Return(nil, nil) + mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(t *fftypes.Transaction) bool { + return t.Subject.Type == fftypes.TransactionTypeTokenPool && *t.Subject.Reference == *pool.ID + }), false).Return(nil) + mdi.On("UpsertTokenPool", context.Background(), mock.MatchedBy(func(p *fftypes.TokenPool) bool { + return *p.ID == *pool.ID && p.Message == msg.Header.ID + })).Return(database.IDMismatch) + mdi.On("InsertEvent", context.Background(), mock.MatchedBy(func(event *fftypes.Event) bool { + return *event.Reference == *pool.ID && event.Namespace == pool.Namespace && event.Type == fftypes.EventTypePoolRejected + })).Return(nil) + + mam := sh.assets.(*assetmocks.Manager) + mam.On("ValidateTokenPoolTx", context.Background(), mock.MatchedBy(func(p *fftypes.TokenPool) bool { + return *p.ID == *pool.ID + }), "tx123").Return(nil) + + valid, err := sh.HandleSystemBroadcast(context.Background(), msg, data) + assert.False(t, valid) + assert.NoError(t, err) + + mdi.AssertExpectations(t) + mam.AssertExpectations(t) +} + +func TestHandleSystemBroadcastTokenPoolFailUpsert(t *testing.T) { + sh := newTestSystemHandlers(t) + + pool := &fftypes.TokenPoolAnnouncement{ + TokenPool: fftypes.TokenPool{ + ID: fftypes.NewUUID(), + Namespace: "ns1", + Name: "name1", + Type: fftypes.TokenTypeFungible, + ProtocolID: "12345", + Symbol: "COIN", + TX: fftypes.TransactionRef{ + Type: fftypes.TransactionTypeTokenPool, + ID: fftypes.NewUUID(), + }, + }, + ProtocolTxID: "tx123", + } + msg := &fftypes.Message{ + Header: fftypes.MessageHeader{ + ID: fftypes.NewUUID(), + Tag: string(fftypes.SystemTagDefinePool), + }, + } + b, err := json.Marshal(&pool) + assert.NoError(t, err) + data := []*fftypes.Data{{ + Value: fftypes.Byteable(b), + }} + operations := []*fftypes.Operation{} + + mdi := sh.database.(*databasemocks.Plugin) + mdi.On("GetOperations", context.Background(), mock.Anything).Return(operations, nil, nil) + mdi.On("GetTransactionByID", context.Background(), pool.TX.ID).Return(nil, nil) + mdi.On("UpsertTransaction", context.Background(), mock.MatchedBy(func(t *fftypes.Transaction) bool { + return t.Subject.Type == fftypes.TransactionTypeTokenPool && *t.Subject.Reference == *pool.ID + }), false).Return(nil) + mdi.On("UpsertTokenPool", context.Background(), mock.MatchedBy(func(p *fftypes.TokenPool) bool { + return *p.ID == *pool.ID && p.Message == msg.Header.ID + })).Return(fmt.Errorf("pop")) + + mam := sh.assets.(*assetmocks.Manager) + mam.On("ValidateTokenPoolTx", context.Background(), mock.MatchedBy(func(p *fftypes.TokenPool) bool { + return *p.ID == *pool.ID + }), "tx123").Return(nil) + + valid, err := sh.HandleSystemBroadcast(context.Background(), msg, data) + assert.False(t, valid) + assert.EqualError(t, err, "pop") + + mdi.AssertExpectations(t) + mam.AssertExpectations(t) +} + +func TestHandleSystemBroadcastTokenPoolOpsFail(t *testing.T) { + sh := newTestSystemHandlers(t) + + pool := &fftypes.TokenPoolAnnouncement{ + TokenPool: fftypes.TokenPool{ + ID: fftypes.NewUUID(), + Namespace: "ns1", + Name: "name1", + Type: fftypes.TokenTypeFungible, + ProtocolID: "12345", + Symbol: "COIN", + TX: fftypes.TransactionRef{ + Type: fftypes.TransactionTypeTokenPool, + ID: fftypes.NewUUID(), + }, + }, + ProtocolTxID: "tx123", + } + msg := &fftypes.Message{ + Header: fftypes.MessageHeader{ + ID: fftypes.NewUUID(), + Tag: string(fftypes.SystemTagDefinePool), + }, + } + b, err := json.Marshal(&pool) + assert.NoError(t, err) + data := []*fftypes.Data{{ + Value: fftypes.Byteable(b), + }} + + mdi := sh.database.(*databasemocks.Plugin) + mdi.On("GetOperations", context.Background(), mock.Anything).Return(nil, nil, fmt.Errorf("pop")) + + valid, err := sh.HandleSystemBroadcast(context.Background(), msg, data) + assert.False(t, valid) + assert.EqualError(t, err, "pop") + + mdi.AssertExpectations(t) +} + +func TestHandleSystemBroadcastTokenPoolValidateFail(t *testing.T) { + sh := newTestSystemHandlers(t) + + pool := &fftypes.TokenPoolAnnouncement{ + TokenPool: fftypes.TokenPool{}, + ProtocolTxID: "tx123", + } + msg := &fftypes.Message{ + Header: fftypes.MessageHeader{ + ID: fftypes.NewUUID(), + Tag: string(fftypes.SystemTagDefinePool), + }, + } + b, err := json.Marshal(&pool) + assert.NoError(t, err) + data := []*fftypes.Data{{ + Value: fftypes.Byteable(b), + }} + + mdi := sh.database.(*databasemocks.Plugin) + mdi.On("InsertEvent", context.Background(), mock.MatchedBy(func(event *fftypes.Event) bool { + return event.Type == fftypes.EventTypePoolRejected + })).Return(nil) + + valid, err := sh.HandleSystemBroadcast(context.Background(), msg, data) + assert.False(t, valid) + assert.NoError(t, err) + + mdi.AssertExpectations(t) +} diff --git a/internal/tokens/fftokens/fftokens.go b/internal/tokens/fftokens/fftokens.go index d6d97408b..743578c8e 100644 --- a/internal/tokens/fftokens/fftokens.go +++ b/internal/tokens/fftokens/fftokens.go @@ -53,16 +53,9 @@ const ( ) type createPool struct { - Type fftypes.TokenType `json:"type"` - RequestID string `json:"requestId"` - Data string `json:"data"` -} - -type createPoolData struct { - Namespace string `json:"namespace"` - Name string `json:"name"` - ID *fftypes.UUID `json:"id"` - TransactionID *fftypes.UUID `json:"transactionId"` + Type fftypes.TokenType `json:"type"` + RequestID string `json:"requestId"` + TrackingID string `json:"trackingId"` } func (h *FFTokens) Name() string { @@ -126,51 +119,30 @@ func (h *FFTokens) handleReceipt(ctx context.Context, data fftypes.JSONObject) e } func (h *FFTokens) handleTokenPoolCreate(ctx context.Context, data fftypes.JSONObject) (err error) { - packedData := data.GetString("data") tokenType := data.GetString("type") protocolID := data.GetString("poolId") + trackingID := data.GetString("trackingId") operatorAddress := data.GetString("operator") tx := data.GetObject("transaction") txHash := tx.GetString("transactionHash") - if packedData == "" || - tokenType == "" || + if tokenType == "" || protocolID == "" || + trackingID == "" || operatorAddress == "" || txHash == "" { log.L(ctx).Errorf("TokenPool event is not valid - missing data: %+v", data) return nil // move on } - unpackedData := createPoolData{} - err = json.Unmarshal([]byte(packedData), &unpackedData) + txID, err := fftypes.ParseUUID(ctx, trackingID) if err != nil { - log.L(ctx).Errorf("TokenPool event is not valid - could not unpack data (%s): %+v", err, data) - return nil // move on - } - if unpackedData.Namespace == "" || - unpackedData.Name == "" || - unpackedData.ID == nil || - unpackedData.TransactionID == nil { - log.L(ctx).Errorf("TokenPool event is not valid - missing packed data: %+v", unpackedData) + log.L(ctx).Errorf("TokenPool event is not valid - invalid transaction ID (%s): %+v", err, data) return nil // move on } - pool := &fftypes.TokenPool{ - ID: unpackedData.ID, - TX: fftypes.TransactionRef{ - ID: unpackedData.TransactionID, - Type: fftypes.TransactionTypeTokenPool, - }, - Namespace: unpackedData.Namespace, - Name: unpackedData.Name, - Type: fftypes.FFEnum(tokenType), - Connector: h.configuredName, - ProtocolID: protocolID, - } - // If there's an error dispatching the event, we must return the error and shutdown - return h.callbacks.TokenPoolCreated(h, pool, operatorAddress, txHash, tx) + return h.callbacks.TokenPoolCreated(h, fftypes.FFEnum(tokenType), txID, protocolID, operatorAddress, txHash, tx) } func (h *FFTokens) eventLoop() { @@ -224,23 +196,13 @@ func (h *FFTokens) eventLoop() { } func (h *FFTokens) CreateTokenPool(ctx context.Context, operationID *fftypes.UUID, identity *fftypes.Identity, pool *fftypes.TokenPool) error { - data := createPoolData{ - Namespace: pool.Namespace, - Name: pool.Name, - ID: pool.ID, - TransactionID: pool.TX.ID, - } - packedData, err := json.Marshal(data) - var res *resty.Response - if err == nil { - res, err = h.client.R().SetContext(ctx). - SetBody(&createPool{ - Type: pool.Type, - RequestID: operationID.String(), - Data: string(packedData), - }). - Post("/api/v1/pool") - } + res, err := h.client.R().SetContext(ctx). + SetBody(&createPool{ + Type: pool.Type, + RequestID: operationID.String(), + TrackingID: pool.TX.ID.String(), + }). + Post("/api/v1/pool") if err != nil || !res.IsSuccess() { return restclient.WrapRestErr(ctx, res, err, i18n.MsgTokensRESTErr) } diff --git a/internal/tokens/fftokens/fftokens_test.go b/internal/tokens/fftokens/fftokens_test.go index fe375ac7f..30c2c2b55 100644 --- a/internal/tokens/fftokens/fftokens_test.go +++ b/internal/tokens/fftokens/fftokens_test.go @@ -118,8 +118,8 @@ func TestCreateTokenPool(t *testing.T) { assert.Contains(t, body, "requestId") delete(body, "requestId") assert.Equal(t, fftypes.JSONObject{ - "data": "{\"namespace\":\"ns1\",\"name\":\"new-pool\",\"id\":\"" + pool.ID.String() + "\",\"transactionId\":\"" + pool.TX.ID.String() + "\"}", - "type": "fungible", + "trackingId": pool.TX.ID.String(), + "type": "fungible", }, body) res := &http.Response{ @@ -172,53 +172,33 @@ func TestEvents(t *testing.T) { opID := fftypes.NewUUID() fromServer <- `{"id":"2","event":"receipt","data":{}}` - fromServer <- `{"id":"2","event":"receipt","data":{"id":"abc"}}` + fromServer <- `{"id":"3","event":"receipt","data":{"id":"abc"}}` // receipt: success mcb.On("TokensOpUpdate", h, opID, fftypes.OpStatusSucceeded, "", mock.Anything).Return(nil).Once() - fromServer <- `{"id":"3","event":"receipt","data":{"id":"` + opID.String() + `","success":true}}` + fromServer <- `{"id":"4","event":"receipt","data":{"id":"` + opID.String() + `","success":true}}` // receipt: failure mcb.On("TokensOpUpdate", h, opID, fftypes.OpStatusFailed, "", mock.Anything).Return(nil).Once() - fromServer <- `{"id":"4","event":"receipt","data":{"id":"` + opID.String() + `","success":false}}` + fromServer <- `{"id":"5","event":"receipt","data":{"id":"` + opID.String() + `","success":false}}` // token-pool: missing data - fromServer <- `{"id":"5","event":"token-pool"}` - msg = <-toServer - assert.Equal(t, `{"data":{"id":"5"},"event":"ack"}`, string(msg)) - - // token-pool: unparseable packed data - fromServer <- `{"id":"6","event":"token-pool","data":{"data":"!","type":"fungible","poolId":"F1","operator":"0x0","transaction":{"transactionHash":"abc"}}}` + fromServer <- `{"id":"6","event":"token-pool"}` msg = <-toServer assert.Equal(t, `{"data":{"id":"6"},"event":"ack"}`, string(msg)) - // token-pool: missing packed data - fromServer <- `{"id":"7","event":"token-pool","data":{"data":"{}","type":"fungible","poolId":"F1","operator":"0x0","transaction":{"transactionHash":"abc"}}}` + // token-pool: invalid uuid + fromServer <- `{"id":"7","event":"token-pool","data":{"trackingId":"bad","type":"fungible","poolId":"F1","operator":"0x0","transaction":{"transactionHash":"abc"}}}` msg = <-toServer assert.Equal(t, `{"data":{"id":"7"},"event":"ack"}`, string(msg)) - // token-pool: invalid uuids - fromServer <- `{"id":"8","event":"token-pool","data":{"data":"{\"namespace\":\"ns1\",\"name\":\"new-pool\",\"id\":\"bad\",\"transactionId\":\"bad\"}","type":"fungible","poolId":"F1","operator":"0x0","transaction":{"transactionHash":"abc"}}}` - msg = <-toServer - assert.Equal(t, `{"data":{"id":"8"},"event":"ack"}`, string(msg)) - - id1 := fftypes.NewUUID() - id2 := fftypes.NewUUID() + txID := fftypes.NewUUID() // token-pool: success - mcb.On("TokenPoolCreated", h, mock.MatchedBy(func(pool *fftypes.TokenPool) bool { - assert.Equal(t, "ns1", pool.Namespace) - assert.Equal(t, "new-pool", pool.Name) - assert.Equal(t, fftypes.TokenTypeFungible, pool.Type) - assert.Equal(t, "F1", pool.ProtocolID) - assert.Equal(t, *id1, *pool.ID) - assert.Equal(t, *id2, *pool.TX.ID) - return true - }), "0x0", "abc", fftypes.JSONObject{"transactionHash": "abc"}, - ).Return(nil) - fromServer <- `{"id":"9","event":"token-pool","data":{"data":"{\"namespace\":\"ns1\",\"name\":\"new-pool\",\"id\":\"` + id1.String() + `\",\"transactionId\":\"` + id2.String() + `\"}","type":"fungible","poolId":"F1","operator":"0x0","transaction":{"transactionHash":"abc"}}}` + mcb.On("TokenPoolCreated", h, fftypes.TokenTypeFungible, txID, "F1", "0x0", "abc", fftypes.JSONObject{"transactionHash": "abc"}).Return(nil) + fromServer <- `{"id":"8","event":"token-pool","data":{"trackingId":"` + txID.String() + `","type":"fungible","poolId":"F1","operator":"0x0","transaction":{"transactionHash":"abc"}}}` msg = <-toServer - assert.Equal(t, `{"data":{"id":"9"},"event":"ack"}`, string(msg)) + assert.Equal(t, `{"data":{"id":"8"},"event":"ack"}`, string(msg)) mcb.AssertExpectations(t) } diff --git a/mocks/assetmocks/manager.go b/mocks/assetmocks/manager.go index b0f67b6b2..7213fbab5 100644 --- a/mocks/assetmocks/manager.go +++ b/mocks/assetmocks/manager.go @@ -165,13 +165,27 @@ func (_m *Manager) Start() error { return r0 } -// TokenPoolCreated provides a mock function with given fields: tk, pool, signingIdentity, protocolTxID, additionalInfo -func (_m *Manager) TokenPoolCreated(tk tokens.Plugin, pool *fftypes.TokenPool, signingIdentity string, protocolTxID string, additionalInfo fftypes.JSONObject) error { - ret := _m.Called(tk, pool, signingIdentity, protocolTxID, additionalInfo) +// TokenPoolCreated provides a mock function with given fields: tk, tokenType, tx, protocolID, signingIdentity, protocolTxID, additionalInfo +func (_m *Manager) TokenPoolCreated(tk tokens.Plugin, tokenType fftypes.FFEnum, tx *fftypes.UUID, protocolID string, signingIdentity string, protocolTxID string, additionalInfo fftypes.JSONObject) error { + ret := _m.Called(tk, tokenType, tx, protocolID, signingIdentity, protocolTxID, additionalInfo) var r0 error - if rf, ok := ret.Get(0).(func(tokens.Plugin, *fftypes.TokenPool, string, string, fftypes.JSONObject) error); ok { - r0 = rf(tk, pool, signingIdentity, protocolTxID, additionalInfo) + if rf, ok := ret.Get(0).(func(tokens.Plugin, fftypes.FFEnum, *fftypes.UUID, string, string, string, fftypes.JSONObject) error); ok { + r0 = rf(tk, tokenType, tx, protocolID, signingIdentity, protocolTxID, additionalInfo) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// ValidateTokenPoolTx provides a mock function with given fields: ctx, pool, protocolTxID +func (_m *Manager) ValidateTokenPoolTx(ctx context.Context, pool *fftypes.TokenPool, protocolTxID string) error { + ret := _m.Called(ctx, pool, protocolTxID) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *fftypes.TokenPool, string) error); ok { + r0 = rf(ctx, pool, protocolTxID) } else { r0 = ret.Error(0) } diff --git a/mocks/broadcastmocks/manager.go b/mocks/broadcastmocks/manager.go index 3ddbaf6af..b518180ad 100644 --- a/mocks/broadcastmocks/manager.go +++ b/mocks/broadcastmocks/manager.go @@ -129,6 +129,29 @@ func (_m *Manager) BroadcastNamespace(ctx context.Context, ns *fftypes.Namespace return r0, r1 } +// BroadcastTokenPool provides a mock function with given fields: ctx, ns, pool, waitConfirm +func (_m *Manager) BroadcastTokenPool(ctx context.Context, ns string, pool *fftypes.TokenPoolAnnouncement, waitConfirm bool) (*fftypes.Message, error) { + ret := _m.Called(ctx, ns, pool, waitConfirm) + + var r0 *fftypes.Message + if rf, ok := ret.Get(0).(func(context.Context, string, *fftypes.TokenPoolAnnouncement, bool) *fftypes.Message); ok { + r0 = rf(ctx, ns, pool, waitConfirm) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*fftypes.Message) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, string, *fftypes.TokenPoolAnnouncement, bool) error); ok { + r1 = rf(ctx, ns, pool, waitConfirm) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetNodeSigningIdentity provides a mock function with given fields: ctx func (_m *Manager) GetNodeSigningIdentity(ctx context.Context) (*fftypes.Identity, error) { ret := _m.Called(ctx) diff --git a/mocks/tokenmocks/callbacks.go b/mocks/tokenmocks/callbacks.go index 5761fab62..5c34674a8 100644 --- a/mocks/tokenmocks/callbacks.go +++ b/mocks/tokenmocks/callbacks.go @@ -14,13 +14,13 @@ type Callbacks struct { mock.Mock } -// TokenPoolCreated provides a mock function with given fields: plugin, pool, signingIdentity, protocolTxID, additionalInfo -func (_m *Callbacks) TokenPoolCreated(plugin tokens.Plugin, pool *fftypes.TokenPool, signingIdentity string, protocolTxID string, additionalInfo fftypes.JSONObject) error { - ret := _m.Called(plugin, pool, signingIdentity, protocolTxID, additionalInfo) +// TokenPoolCreated provides a mock function with given fields: plugin, tokenType, tx, protocolID, signingIdentity, protocolTxID, additionalInfo +func (_m *Callbacks) TokenPoolCreated(plugin tokens.Plugin, tokenType fftypes.FFEnum, tx *fftypes.UUID, protocolID string, signingIdentity string, protocolTxID string, additionalInfo fftypes.JSONObject) error { + ret := _m.Called(plugin, tokenType, tx, protocolID, signingIdentity, protocolTxID, additionalInfo) var r0 error - if rf, ok := ret.Get(0).(func(tokens.Plugin, *fftypes.TokenPool, string, string, fftypes.JSONObject) error); ok { - r0 = rf(plugin, pool, signingIdentity, protocolTxID, additionalInfo) + if rf, ok := ret.Get(0).(func(tokens.Plugin, fftypes.FFEnum, *fftypes.UUID, string, string, string, fftypes.JSONObject) error); ok { + r0 = rf(plugin, tokenType, tx, protocolID, signingIdentity, protocolTxID, additionalInfo) } else { r0 = ret.Error(0) } diff --git a/pkg/fftypes/constants.go b/pkg/fftypes/constants.go index d2afee11d..0386dd46e 100644 --- a/pkg/fftypes/constants.go +++ b/pkg/fftypes/constants.go @@ -40,4 +40,7 @@ const ( // SystemTagDefineGroup is the topic for messages that send the definition of a group, to all parties in that group SystemTagDefineGroup SystemTag = "ff_define_group" + + // SystemTagDefinePool is the topic for messages that broadcast data definitions + SystemTagDefinePool SystemTag = "ff_define_pool" ) diff --git a/pkg/fftypes/operation.go b/pkg/fftypes/operation.go index e20b4fa91..96afc8d95 100644 --- a/pkg/fftypes/operation.go +++ b/pkg/fftypes/operation.go @@ -32,6 +32,8 @@ var ( OpTypeDataExchangeBlobSend OpType = ffEnum("optype", "dataexchange_blob_send") // OpTypeTokensCreatePool is a token pool creation OpTypeTokensCreatePool OpType = ffEnum("optype", "tokens_create_pool") + // OpTypeTokensAnnounce is a broadcast of token pool info + OpTypeTokensAnnouncePool OpType = ffEnum("optype", "tokens_announce_pool") ) // OpStatus is the current status of an operation diff --git a/pkg/fftypes/tokenpool.go b/pkg/fftypes/tokenpool.go index 40d911da5..eff7111d0 100644 --- a/pkg/fftypes/tokenpool.go +++ b/pkg/fftypes/tokenpool.go @@ -16,6 +16,10 @@ package fftypes +import ( + "context" +) + type TokenType = FFEnum var ( @@ -35,3 +39,26 @@ type TokenPool struct { Message *UUID `json:"message,omitempty"` TX TransactionRef `json:"tx,omitempty"` } + +type TokenPoolAnnouncement struct { + TokenPool + ProtocolTxID string `json:"protocolTxID"` +} + +func (t *TokenPool) Validate(ctx context.Context, existing bool) (err error) { + if err = ValidateFFNameField(ctx, t.Namespace, "namespace"); err != nil { + return err + } + if err = ValidateFFNameField(ctx, t.Name, "name"); err != nil { + return err + } + return nil +} + +func (t *TokenPool) Topic() string { + return namespaceTopic(t.Namespace) +} + +func (t *TokenPool) SetBroadcastMessage(msgID *UUID) { + t.Message = msgID +} diff --git a/pkg/fftypes/tokenpool_test.go b/pkg/fftypes/tokenpool_test.go new file mode 100644 index 000000000..a21c56899 --- /dev/null +++ b/pkg/fftypes/tokenpool_test.go @@ -0,0 +1,60 @@ +// Copyright © 2021 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fftypes + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestTokenPoolValidation(t *testing.T) { + pool := &TokenPool{ + Namespace: "!wrong", + Name: "ok", + } + err := pool.Validate(context.Background(), false) + assert.Regexp(t, "FF10131.*'namespace'", err) + + pool = &TokenPool{ + Namespace: "ok", + Name: "!wrong", + } + err = pool.Validate(context.Background(), false) + assert.Regexp(t, "FF10131.*'name'", err) + + pool = &TokenPool{ + Namespace: "ok", + Name: "ok", + } + err = pool.Validate(context.Background(), false) + assert.NoError(t, err) +} + +func TestTokenPoolDefinition(t *testing.T) { + pool := &TokenPool{ + Namespace: "ok", + Name: "ok", + } + var def Definition = pool + assert.Equal(t, "ff_ns_ok", def.Topic()) + + id := NewUUID() + def.SetBroadcastMessage(id) + assert.Equal(t, id, pool.Message) +} diff --git a/pkg/tokens/plugin.go b/pkg/tokens/plugin.go index a509f67b8..6cdc0d635 100644 --- a/pkg/tokens/plugin.go +++ b/pkg/tokens/plugin.go @@ -63,7 +63,7 @@ type Callbacks interface { // submitted by us, or by any other authorized party in the network. // // Error should will only be returned in shutdown scenarios - TokenPoolCreated(plugin Plugin, pool *fftypes.TokenPool, signingIdentity string, protocolTxID string, additionalInfo fftypes.JSONObject) error + TokenPoolCreated(plugin Plugin, tokenType fftypes.TokenType, tx *fftypes.UUID, protocolID, signingIdentity, protocolTxID string, additionalInfo fftypes.JSONObject) error } // Capabilities the supported featureset of the tokens