Skip to content

Commit

Permalink
Announce token pool info via broadcast
Browse files Browse the repository at this point in the history
Rather than requiring all token pool metadata (namespace, name, id) to be written
to the blockchain, send it out via a broadcast message after creating the actual
pool on chain.

Signed-off-by: Andrew Richardson <[email protected]>
  • Loading branch information
awrichar committed Sep 27, 2021
1 parent 598a5c3 commit 2584876
Show file tree
Hide file tree
Showing 24 changed files with 1,283 additions and 361 deletions.
50 changes: 42 additions & 8 deletions internal/assets/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ package assets

import (
"context"
"fmt"

"github.com/hyperledger/firefly/internal/broadcast"
"github.com/hyperledger/firefly/internal/config"
"github.com/hyperledger/firefly/internal/data"
"github.com/hyperledger/firefly/internal/i18n"
Expand All @@ -37,9 +39,10 @@ type Manager interface {
GetTokenPools(ctx context.Context, ns, typeName string, filter database.AndFilter) ([]*fftypes.TokenPool, *database.FilterResult, error)
GetTokenPool(ctx context.Context, ns, typeName, name string) (*fftypes.TokenPool, error)
GetTokenAccounts(ctx context.Context, ns, typeName, name string, filter database.AndFilter) ([]*fftypes.TokenAccount, *database.FilterResult, error)
ValidateTokenPoolTx(ctx context.Context, pool *fftypes.TokenPool, protocolTxID string) error

// Bound token callbacks
TokenPoolCreated(tk tokens.Plugin, pool *fftypes.TokenPool, signingIdentity string, protocolTxID string, additionalInfo fftypes.JSONObject) error
TokenPoolCreated(tk tokens.Plugin, tokenType fftypes.TokenType, tx *fftypes.UUID, protocolID, signingIdentity, protocolTxID string, additionalInfo fftypes.JSONObject) error

Start() error
WaitStop()
Expand All @@ -51,12 +54,13 @@ type assetManager struct {
identity identity.Plugin
data data.Manager
syncasync syncasync.Bridge
broadcast broadcast.Manager
tokens map[string]tokens.Plugin
retry retry.Retry
txhelper txcommon.Helper
}

func NewAssetManager(ctx context.Context, di database.Plugin, ii identity.Plugin, dm data.Manager, sa syncasync.Bridge, ti map[string]tokens.Plugin) (Manager, error) {
func NewAssetManager(ctx context.Context, di database.Plugin, ii identity.Plugin, dm data.Manager, sa syncasync.Bridge, bm broadcast.Manager, ti map[string]tokens.Plugin) (Manager, error) {
if di == nil || ii == nil || sa == nil || ti == nil {
return nil, i18n.NewError(ctx, i18n.MsgInitializationNilDepError)
}
Expand All @@ -66,6 +70,7 @@ func NewAssetManager(ctx context.Context, di database.Plugin, ii identity.Plugin
identity: ii,
data: dm,
syncasync: sa,
broadcast: bm,
tokens: ti,
retry: retry.Retry{
InitialDelay: config.GetDuration(config.AssetManagerRetryInitialDelay),
Expand All @@ -86,6 +91,28 @@ func (am *assetManager) selectTokenPlugin(ctx context.Context, name string) (tok
return nil, i18n.NewError(ctx, i18n.MsgUnknownTokensPlugin, name)
}

func storeTokenOpInputs(op *fftypes.Operation, pool *fftypes.TokenPool) {
op.Input = fftypes.JSONObject{
"id": pool.ID.String(),
"namespace": pool.Namespace,
"name": pool.Name,
}
}

func retrieveTokenOpInputs(ctx context.Context, op *fftypes.Operation, pool *fftypes.TokenPool) (err error) {
input := &op.Input
pool.ID, err = fftypes.ParseUUID(ctx, input.GetString("id"))
if err != nil {
return err
}
pool.Namespace = input.GetString("namespace")
pool.Name = input.GetString("name")
if pool.Namespace == "" || pool.Name == "" {
return fmt.Errorf("namespace or name missing from inputs")
}
return nil
}

func (am *assetManager) CreateTokenPool(ctx context.Context, ns string, typeName string, pool *fftypes.TokenPool, waitConfirm bool) (*fftypes.TokenPool, error) {
return am.CreateTokenPoolWithID(ctx, ns, fftypes.NewUUID(), typeName, pool, waitConfirm)
}
Expand Down Expand Up @@ -132,6 +159,13 @@ func (am *assetManager) CreateTokenPoolWithID(ctx context.Context, ns string, id
return nil, err
}

pool.ID = id
pool.Namespace = ns
pool.TX = fftypes.TransactionRef{
ID: tx.ID,
Type: tx.Subject.Type,
}

op := fftypes.NewTXOperation(
plugin,
ns,
Expand All @@ -140,17 +174,12 @@ func (am *assetManager) CreateTokenPoolWithID(ctx context.Context, ns string, id
fftypes.OpTypeTokensCreatePool,
fftypes.OpStatusPending,
author.Identifier)
storeTokenOpInputs(op, pool)
err = am.database.UpsertOperation(ctx, op, false)
if err != nil {
return nil, err
}

pool.ID = id
pool.Namespace = ns
pool.TX = fftypes.TransactionRef{
ID: tx.ID,
Type: tx.Subject.Type,
}
return pool, plugin.CreateTokenPool(ctx, op.ID, author, pool)
}

Expand Down Expand Up @@ -189,6 +218,11 @@ func (am *assetManager) GetTokenAccounts(ctx context.Context, ns, typeName, name
return am.database.GetTokenAccounts(ctx, filter.Condition(filter.Builder().Eq("protocolid", pool.ProtocolID)))
}

func (am *assetManager) ValidateTokenPoolTx(ctx context.Context, pool *fftypes.TokenPool, protocolTxID string) error {
// TODO: validate that the given token pool was created with the given protocolTxId
return nil
}

func (am *assetManager) Start() error {
return nil
}
Expand Down
14 changes: 12 additions & 2 deletions internal/assets/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/hyperledger/firefly/internal/config"
"github.com/hyperledger/firefly/internal/syncasync"
"github.com/hyperledger/firefly/mocks/broadcastmocks"
"github.com/hyperledger/firefly/mocks/databasemocks"
"github.com/hyperledger/firefly/mocks/datamocks"
"github.com/hyperledger/firefly/mocks/identitymocks"
Expand All @@ -40,12 +41,13 @@ func newTestAssets(t *testing.T) (*assetManager, func()) {
mii := &identitymocks.Plugin{}
mdm := &datamocks.Manager{}
msa := &syncasyncmocks.Bridge{}
mbm := &broadcastmocks.Manager{}
mti := &tokenmocks.Plugin{}
mti.On("Name").Return("ut_tokens").Maybe()
defaultIdentity := &fftypes.Identity{Identifier: "UTNodeID", OnChain: "0x12345"}
mii.On("Resolve", mock.Anything, "UTNodeID").Return(defaultIdentity, nil).Maybe()
ctx, cancel := context.WithCancel(context.Background())
a, err := NewAssetManager(ctx, mdi, mii, mdm, msa, map[string]tokens.Plugin{"magic-tokens": mti})
a, err := NewAssetManager(ctx, mdi, mii, mdm, msa, mbm, map[string]tokens.Plugin{"magic-tokens": mti})
rag := mdi.On("RunAsGroup", ctx, mock.Anything).Maybe()
rag.RunFn = func(a mock.Arguments) {
rag.ReturnArguments = mock.Arguments{a[1].(func(context.Context) error)(a[0].(context.Context))}
Expand All @@ -55,7 +57,7 @@ func newTestAssets(t *testing.T) (*assetManager, func()) {
}

func TestInitFail(t *testing.T) {
_, err := NewAssetManager(context.Background(), nil, nil, nil, nil, nil)
_, err := NewAssetManager(context.Background(), nil, nil, nil, nil, nil, nil)
assert.Regexp(t, "FF10128", err)
}

Expand Down Expand Up @@ -289,3 +291,11 @@ func TestGetTokenAccountsBadPool(t *testing.T) {
_, _, err := am.GetTokenAccounts(context.Background(), "ns1", "magic-tokens", "test", f)
assert.EqualError(t, err, "pop")
}

func TestValidateTokenPoolTx(t *testing.T) {
am, cancel := newTestAssets(t)
defer cancel()

err := am.ValidateTokenPoolTx(context.Background(), nil, "")
assert.NoError(t, err)
}
103 changes: 56 additions & 47 deletions internal/assets/token_pool_created.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,69 +17,78 @@
package assets

import (
"context"

"github.com/hyperledger/firefly/internal/log"
"github.com/hyperledger/firefly/pkg/database"
"github.com/hyperledger/firefly/pkg/fftypes"
"github.com/hyperledger/firefly/pkg/tokens"
)

func (am *assetManager) persistTokenPoolTransaction(ctx context.Context, pool *fftypes.TokenPool, signingIdentity string, protocolTxID string, additionalInfo fftypes.JSONObject) (valid bool, err error) {
if pool.ID == nil || pool.TX.ID == nil {
log.L(ctx).Errorf("Invalid token pool '%s'. Missing ID (%v) or transaction ID (%v)", pool.ID, pool.ID, pool.TX.ID)
return false, nil // this is not retryable
func (am *assetManager) TokenPoolCreated(tk tokens.Plugin, tokenType fftypes.TokenType, tx *fftypes.UUID, protocolID, signingIdentity, protocolTxID string, additionalInfo fftypes.JSONObject) error {
// Find a matching operation within this transaction
fb := database.OperationQueryFactory.NewFilter(am.ctx)
filter := fb.And(
fb.Eq("tx", tx),
fb.Eq("type", fftypes.OpTypeTokensCreatePool),
)
operations, _, err := am.database.GetOperations(am.ctx, filter)
if err != nil || len(operations) == 0 {
log.L(am.ctx).Debugf("Token pool transaction '%s' ignored, as it was not submitted by this node", tx)
return nil
}

pool := &fftypes.TokenPoolAnnouncement{
TokenPool: fftypes.TokenPool{
Type: tokenType,
ProtocolID: protocolID,
Author: signingIdentity,
},
ProtocolTxID: protocolTxID,
}
return am.txhelper.PersistTransaction(ctx, &fftypes.Transaction{
ID: pool.TX.ID,
err = retrieveTokenOpInputs(am.ctx, operations[0], &pool.TokenPool)
if err != nil {
log.L(am.ctx).Errorf("Error retrieving pool info from transaction '%s' (%s) - ignoring: %v", tx, err, operations[0].Input)
return nil
}

// Update the transaction with the info received (but leave transaction as "pending")
transaction := &fftypes.Transaction{
ID: tx,
Status: fftypes.OpStatusPending,
Subject: fftypes.TransactionSubject{
Namespace: pool.Namespace,
Type: pool.TX.Type,
Type: fftypes.TransactionTypeTokenPool,
Signer: signingIdentity,
Reference: pool.ID,
},
ProtocolID: protocolTxID,
Info: additionalInfo,
})
}

func (am *assetManager) persistTokenPool(ctx context.Context, pool *fftypes.TokenPool) (valid bool, err error) {
l := log.L(ctx)
if err := fftypes.ValidateFFNameField(ctx, pool.Name, "name"); err != nil {
l.Errorf("Invalid token pool '%s' - invalid name '%s': %a", pool.ID, pool.Name, err)
return false, nil // This is not retryable
}
err = am.database.UpsertTokenPool(ctx, pool)
if err != nil {
if err == database.IDMismatch {
log.L(ctx).Errorf("Invalid token pool '%s'. ID mismatch with existing record", pool.ID)
return false, nil // This is not retryable
pool.TX.ID = transaction.ID
pool.TX.Type = transaction.Subject.Type

// Add a new operation for the announcement
op := fftypes.NewTXOperation(
tk,
pool.Namespace,
tx,
"",
fftypes.OpTypeTokensAnnouncePool,
fftypes.OpStatusPending,
signingIdentity)

var valid bool
err = am.retry.Do(am.ctx, "persist token pool transaction", func(attempt int) (bool, error) {
valid, err = am.txhelper.PersistTransaction(am.ctx, transaction)
if valid && err == nil {
err = am.database.UpsertOperation(am.ctx, op, false)
}
l.Errorf("Failed to insert token pool '%s': %s", pool.ID, err)
return false, err // a persistence failure here is considered retryable (so returned)
return err != nil, err
})
if !valid || err != nil {
return err
}
return true, nil
}

func (am *assetManager) TokenPoolCreated(tk tokens.Plugin, pool *fftypes.TokenPool, signingIdentity string, protocolTxID string, additionalInfo fftypes.JSONObject) error {
return am.retry.Do(am.ctx, "persist token pool", func(attempt int) (bool, error) {
err := am.database.RunAsGroup(am.ctx, func(ctx context.Context) error {
valid, err := am.persistTokenPoolTransaction(ctx, pool, signingIdentity, protocolTxID, additionalInfo)
if valid && err == nil {
valid, err = am.persistTokenPool(ctx, pool)
}
if err != nil {
return err
}
if !valid {
log.L(ctx).Warnf("Token pool rejected id=%s author=%s", pool.ID, signingIdentity)
event := fftypes.NewEvent(fftypes.EventTypePoolRejected, pool.Namespace, pool.ID)
return am.database.InsertEvent(ctx, event)
}
log.L(ctx).Infof("Token pool created id=%s author=%s", pool.ID, signingIdentity)
event := fftypes.NewEvent(fftypes.EventTypePoolConfirmed, pool.Namespace, pool.ID)
return am.database.InsertEvent(ctx, event)
})
return err != nil, err // retry indefinitely (until context closes)
})
// Announce the details of the new token pool
_, err = am.broadcast.BroadcastTokenPool(am.ctx, pool.Namespace, pool, false)
return err
}
Loading

0 comments on commit 2584876

Please sign in to comment.