diff --git a/CHANGELOG.md b/CHANGELOG.md index ad817808..c46e9906 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -59,6 +59,7 @@ - [230](https://github.com/vegaprotocol/vegatools/issue/230) - Update datanode api use to v2 - [232](https://github.com/vegaprotocol/vegatools/issue/232) - Add batched orders to perftest - [234](https://github.com/vegaprotocol/vegatools/issue/234) - Add pegged order support to perftest +- [236](https://github.com/vegaprotocol/vegatools/issues/236) - Diff tool introduced to compare core snapshot with data node `API` - [237](https://github.com/vegaprotocol/vegatools/issue/237) - Rename of Oracles to Data Sources ### 🐛 Fixes diff --git a/cmd/difftool.go b/cmd/difftool.go new file mode 100644 index 00000000..df441457 --- /dev/null +++ b/cmd/difftool.go @@ -0,0 +1,50 @@ +package cmd + +import ( + "os" + "strings" + + "code.vegaprotocol.io/vegatools/difftool/diff" + "code.vegaprotocol.io/vegatools/snapshotdb" + "github.com/spf13/cobra" +) + +var ( + diffToolOpts struct { + snapshotDatabasePath string + heightToOutput int64 + datanode string + } + + diffToolCmd = &cobra.Command{ + Use: "difftool", + Short: "Compare the state of a core snapshot with datanode API", + RunE: runDiffToolCmd, + } +) + +func init() { + rootCmd.AddCommand(diffToolCmd) + diffToolCmd.Flags().StringVarP(&diffToolOpts.snapshotDatabasePath, "snap-db-path", "s", "", "path to the goleveldb database folder") + diffToolCmd.Flags().Int64VarP(&diffToolOpts.heightToOutput, "block-height", "r", 0, "block-height of the snapshot to dump") + diffToolCmd.Flags().StringVarP(&diffToolOpts.datanode, "datanode", "d", "", "datanode url") + diffToolCmd.MarkFlagRequired("snap-db-path") + diffToolCmd.MarkFlagRequired("datanode") + +} +func runDiffToolCmd(cmd *cobra.Command, args []string) error { + temp := os.TempDir() + if !strings.HasSuffix(temp, string(os.PathSeparator)) { + temp = temp + string(os.PathSeparator) + } + println(temp) + snapshotPath := temp + "snapshot.dat" + + err := snapshotdb.Run(diffToolOpts.snapshotDatabasePath, false, snapshotPath, snapshotDBOpts.heightToOutput, "proto") + defer os.Remove(snapshotPath) + if err != nil { + return err + } + + return diff.Run(snapshotPath, diffToolOpts.datanode) +} diff --git a/cmd/snapshotdb.go b/cmd/snapshotdb.go index 54148f5b..902e16c8 100644 --- a/cmd/snapshotdb.go +++ b/cmd/snapshotdb.go @@ -9,6 +9,7 @@ var ( snapshotDBOpts struct { databasePath string outputPath string + outputFormat string heightToOutput int64 versionCountOnly bool } @@ -23,12 +24,13 @@ var ( func init() { rootCmd.AddCommand(snapshotDBCmd) snapshotDBCmd.Flags().StringVarP(&snapshotDBOpts.databasePath, "db-path", "d", "", "path to the goleveldb database folder") - snapshotDBCmd.Flags().StringVarP(&snapshotDBOpts.outputPath, "out", "o", "", "file to write JSON to") + snapshotDBCmd.Flags().StringVarP(&snapshotDBOpts.outputPath, "out", "o", "", "file to write output to") + snapshotDBCmd.Flags().StringVarP(&snapshotDBOpts.outputFormat, "format", "f", "json", "output format") snapshotDBCmd.Flags().Int64VarP(&snapshotDBOpts.heightToOutput, "block-height", "r", 0, "block-height of the snapshot to dump") snapshotDBCmd.Flags().BoolVarP(&snapshotDBOpts.versionCountOnly, "versions", "v", false, "display the number of stored versions") snapshotDBCmd.MarkFlagRequired("db-path") } func runSnapshotDBCmd(cmd *cobra.Command, args []string) error { - return snapshotdb.Run(snapshotDBOpts.databasePath, snapshotDBOpts.versionCountOnly, snapshotDBOpts.outputPath, snapshotDBOpts.heightToOutput) + return snapshotdb.Run(snapshotDBOpts.databasePath, snapshotDBOpts.versionCountOnly, snapshotDBOpts.outputPath, snapshotDBOpts.heightToOutput, snapshotDBOpts.outputFormat) } diff --git a/difftool/diff/core_snapshot.go b/difftool/diff/core_snapshot.go new file mode 100644 index 00000000..a30550d1 --- /dev/null +++ b/difftool/diff/core_snapshot.go @@ -0,0 +1,449 @@ +package diff + +import ( + "io/ioutil" + "os" + + "code.vegaprotocol.io/vega/libs/crypto" + dn "code.vegaprotocol.io/vega/protos/data-node/api/v2" + "code.vegaprotocol.io/vega/protos/vega" + events "code.vegaprotocol.io/vega/protos/vega/events/v1" + v1 "code.vegaprotocol.io/vega/protos/vega/events/v1" + snapshot "code.vegaprotocol.io/vega/protos/vega/snapshot/v1" + decimal "github.com/shopspring/decimal" + "google.golang.org/protobuf/proto" +) + +type snap struct { + chunk *snapshot.Chunk +} + +// Collect returns a dataset for comparison from core snapshot. +func (s *snap) Collect() *Result { + return &Result{ + Accounts: s.getAccounts(), + Orders: s.getOrders(), + Markets: s.getMarkets(), + Parties: s.getParties(), + Limits: s.getNetLimits(), + Assets: s.getAssets(), + VegaTime: s.getVegaTime(), + Delegations: s.getDelegations(), + Epoch: s.getEpoch(), + Nodes: s.getValidators(), + NetParams: s.getNetParams(), + Proposals: s.getProposals(), + Deposits: s.getDeposits(), + Withdrawals: s.getWithdrawals(), + Transfers: s.getTransfers(), + Positions: s.getPositions(), + Lps: s.getLps(), + Stake: s.getStake(), + } +} + +// getNetParams returns the network parmeters from the core snapshot. +func (s *snap) getNetParams() []*vega.NetworkParameter { + for _, c := range s.chunk.Data { + switch c.Data.(type) { + case *snapshot.Payload_NetworkParameters: + return c.GetNetworkParameters().Params + default: + continue + } + } + return []*vega.NetworkParameter{} +} + +// getWithdrawals returns withdrawals from the core snapshot. To make it compatible with datanode, the timestamps are converted to have +// microsecond resolution. +func (s *snap) getWithdrawals() []*vega.Withdrawal { + for _, c := range s.chunk.Data { + switch c.Data.(type) { + case *snapshot.Payload_BankingWithdrawals: + withdrawalsSnap := c.GetBankingWithdrawals().Withdrawals + withdrawals := make([]*vega.Withdrawal, 0, len(withdrawalsSnap)) + for _, w := range withdrawalsSnap { + w.Withdrawal.CreatedTimestamp = (w.Withdrawal.CreatedTimestamp / 1000) * 1000 + w.Withdrawal.WithdrawnTimestamp = (w.Withdrawal.WithdrawnTimestamp / 1000) * 1000 + w.Withdrawal.Ext = nil + withdrawals = append(withdrawals, w.Withdrawal) + } + return withdrawals + default: + continue + } + } + return []*vega.Withdrawal{} +} + +// getDeposits returns deposits from the core snapshot. To make it compatible with datanode, the timestamps are converted to have +// microsecond resolution. +func (s *snap) getDeposits() []*vega.Deposit { + for _, c := range s.chunk.Data { + switch c.Data.(type) { + case *snapshot.Payload_BankingDeposits: + depositsSnap := c.GetBankingDeposits().Deposit + deposits := make([]*vega.Deposit, 0, len(depositsSnap)) + for _, d := range depositsSnap { + d.Deposit.CreatedTimestamp = (d.Deposit.CreatedTimestamp / 1000) * 1000 + d.Deposit.CreditedTimestamp = (d.Deposit.CreditedTimestamp / 1000) * 1000 + deposits = append(deposits, d.Deposit) + } + return deposits + + default: + continue + } + } + return []*vega.Deposit{} +} + +// getLps returns the liquidity provisions from the core snapshot. To make it compatible with datanode, the timestamps are converted to have +// microsecond resolution. +func (s *snap) getLps() []*vega.LiquidityProvision { + lps := []*vega.LiquidityProvision{} + for _, c := range s.chunk.Data { + switch c.Data.(type) { + case *snapshot.Payload_LiquidityProvisions: + lps = append(lps, c.GetLiquidityProvisions().LiquidityProvisions...) + default: + continue + } + } + for _, lp := range lps { + lp.CreatedAt = (lp.CreatedAt / 1000) * 1000 + lp.UpdatedAt = (lp.UpdatedAt / 1000) * 1000 + } + return lps +} + +// getStake returns stake linking from the core snapshot. To make it compatible with datanode, the timestamps are converted to have +// microsecond resolution. +func (s *snap) getStake() []*v1.StakeLinking { + for _, c := range s.chunk.Data { + switch c.Data.(type) { + case *snapshot.Payload_StakingAccounts: + sl := []*v1.StakeLinking{} + for _, sa := range c.GetStakingAccounts().Accounts { + sl = append(sl, sa.Events...) + } + for _, s := range sl { + s.FinalizedAt = (s.FinalizedAt / 1000) * 1000 + } + return sl + default: + continue + } + } + return []*v1.StakeLinking{} +} + +// getAccounts returns account balances from the core snapshot. To make it compatible with datanode, network owner and no market are replaced with empty string. +func (s *snap) getAccounts() []*dn.AccountBalance { + for _, c := range s.chunk.Data { + switch c.Data.(type) { + case *snapshot.Payload_CollateralAccounts: + accs := c.GetCollateralAccounts().Accounts + balances := make([]*dn.AccountBalance, 0, len(accs)) + for _, a := range accs { + owner := a.Owner + if owner == "*" { + owner = "" + } + marketID := a.MarketId + if marketID == "!" { + marketID = "" + } + balances = append(balances, &dn.AccountBalance{ + Owner: owner, + MarketId: marketID, + Balance: a.Balance, + Asset: a.Asset, + Type: a.Type, + }) + } + return balances + + default: + continue + } + } + return []*dn.AccountBalance{} +} + +// getOrders returns the order book orders from the core snapshot. To make it compatible with datanode, the timestamps are converted to have +// microsecond resolution. In addition price is scaled to the asset decimals to be comparable with data node. +func (s *snap) getOrders() []*vega.Order { + orders := []*vega.Order{} + for _, c := range s.chunk.Data { + switch c.Data.(type) { + case *snapshot.Payload_MatchingBook: + orders = append(orders, c.GetMatchingBook().Buy...) + orders = append(orders, c.GetMatchingBook().Sell...) + default: + continue + } + } + assets := s.getAssets() + markets := s.getMarkets() + dpFactors := map[string]decimal.Decimal{} + for _, m := range markets { + marketDecimals := m.DecimalPlaces + asset, _ := m.GetAsset() + for _, a := range assets { + if a.Id == asset { + dpFactors[m.Id] = decimal.NewFromFloat32(10).Pow(decimal.NewFromFloat32(float32(a.Details.Decimals - marketDecimals))) + } + } + } + for _, o := range orders { + o.CreatedAt = (o.CreatedAt / 1000) * 1000 + o.ExpiresAt = (o.ExpiresAt / 1000) * 1000 + o.UpdatedAt = (o.UpdatedAt / 1000) * 1000 + price, _ := decimal.NewFromString(o.Price) + o.Price = price.Div(dpFactors[o.MarketId]).String() + } + + return orders +} + +// getMarkets returns active markets from the core snapshot. +func (s *snap) getMarkets() []*vega.Market { + for _, c := range s.chunk.Data { + switch c.Data.(type) { + case *snapshot.Payload_ExecutionMarkets: + markets := []*vega.Market{} + for _, m := range c.GetExecutionMarkets().Markets { + markets = append(markets, m.Market) + } + return markets + default: + continue + } + } + return []*vega.Market{} +} + +// getParties returns parties as a combination of parties with accounts and parties staking account. To make it comparable with datanode, network party * is replaced with "network". +func (s *snap) getParties() []*vega.Party { + partyMap := map[string]struct{}{} + for _, c := range s.chunk.Data { + switch c.Data.(type) { + case *snapshot.Payload_CollateralAccounts: + for _, a := range c.GetCollateralAccounts().Accounts { + if len(a.Owner) > 0 { + owner := a.Owner + if owner == "*" { + owner = "network" + } + partyMap[owner] = struct{}{} + } + } + case *snapshot.Payload_StakingAccounts: + for _, a := range c.GetStakingAccounts().Accounts { + partyMap[a.Party] = struct{}{} + } + default: + continue + } + } + parties := make([]*vega.Party, 0, len(partyMap)) + for k := range partyMap { + parties = append(parties, &vega.Party{Id: k}) + } + + return parties +} + +// getNetLimits returns the nework limits from the core snapshot. To work around snapshot specific logic of enabled to/from it is only set if positive. +func (s *snap) getNetLimits() *vega.NetworkLimits { + for _, c := range s.chunk.Data { + switch c.Data.(type) { + case *snapshot.Payload_LimitState: + limits := c.GetLimitState() + nl := &vega.NetworkLimits{ + CanProposeMarket: limits.CanProposeMarket, + CanProposeAsset: limits.CanProposeAsset, + GenesisLoaded: limits.GenesisLoaded, + ProposeMarketEnabled: limits.ProposeMarketEnabled, + ProposeAssetEnabled: limits.ProposeAssetEnabled, + BootstrapBlockCount: limits.BlockCount, + } + if limits.ProposeAssetEnabledFrom > 0 { + nl.ProposeAssetEnabledFrom = limits.ProposeAssetEnabledFrom + } + if limits.ProposeMarketEnabledFrom > 0 { + nl.ProposeMarketEnabledFrom = limits.ProposeMarketEnabledFrom + } + return nl + default: + continue + } + } + return &vega.NetworkLimits{} +} + +// getAssets returns all pending and active assets from the core snapshot. +func (s *snap) getAssets() []*vega.Asset { + assets := []*vega.Asset{} + for _, c := range s.chunk.Data { + switch c.Data.(type) { + case *snapshot.Payload_ActiveAssets: + assets = append(assets, c.GetActiveAssets().Assets...) + case *snapshot.Payload_PendingAssets: + assets = append(assets, c.GetPendingAssets().Assets...) + default: + continue + } + } + return assets +} + +// getVegaTime returns the vega time from the core snapshot. To make it compatible with datanode, the timestamps are converted to have +// microsecond resolution. +func (s *snap) getVegaTime() int64 { + for _, c := range s.chunk.Data { + switch c.Data.(type) { + case *snapshot.Payload_AppState: + return (c.GetAppState().Time / 1000) * 1000 + default: + continue + } + } + return 0 +} + +// getDelegations returns the delegations from the core snapshot. +func (s *snap) getDelegations() []*vega.Delegation { + for _, c := range s.chunk.Data { + switch c.Data.(type) { + case *snapshot.Payload_DelegationActive: + return c.GetDelegationActive().Delegations + default: + continue + } + } + return []*vega.Delegation{} +} + +// getEpoch returns the current epoch information (timestamps) +func (s *snap) getEpoch() *vega.Epoch { + for _, c := range s.chunk.Data { + switch c.Data.(type) { + case *snapshot.Payload_Epoch: + epoch := c.GetEpoch() + return &vega.Epoch{ + Seq: epoch.Seq, + Timestamps: &vega.EpochTimestamps{ + StartTime: (epoch.StartTime / 1000) * 1000, + ExpiryTime: (epoch.ExpireTime / 1000) * 1000, + }, + } + default: + continue + } + } + return &vega.Epoch{} +} + +// getProposals returns all the pending and enacted proposals. To make it compatible with datanode, the timestamps are converted to have +// microsecond resolution. +func (s *snap) getProposals() []*vega.Proposal { + proposals := []*vega.Proposal{} + for _, c := range s.chunk.Data { + switch c.Data.(type) { + case *snapshot.Payload_GovernanceActive: + for _, p := range c.GetGovernanceActive().Proposals { + proposals = append(proposals, p.Proposal) + } + case *snapshot.Payload_GovernanceEnacted: + for _, p := range c.GetGovernanceEnacted().Proposals { + proposals = append(proposals, p.Proposal) + } + case *snapshot.Payload_GovernanceNode: + proposals = append(proposals, c.GetGovernanceNode().Proposals...) + default: + continue + } + } + for _, p := range proposals { + p.Timestamp = (p.Timestamp / 1000) * 1000 + } + return proposals +} + +// getTransfers returns recurring and scheduled transfers from the core snapshot. To make it compatible with datanode, the timestamps are converted to have +// microsecond resolution. +func (s *snap) getTransfers() []*events.Transfer { + transfers := []*events.Transfer{} + for _, c := range s.chunk.Data { + switch c.Data.(type) { + case *snapshot.Payload_BankingRecurringTransfers: + transfers = append(transfers, c.GetBankingRecurringTransfers().RecurringTransfers.RecurringTransfers...) + case *snapshot.Payload_BankingScheduledTransfers: + for _, tt := range c.GetBankingScheduledTransfers().TransfersAtTime { + for _, t := range tt.Transfers { + transfers = append(transfers, t.OneoffTransfer) + } + } + default: + continue + } + } + + for _, t := range transfers { + t.Timestamp = (t.Timestamp / 1000) * 1000 + } + return transfers +} + +// getValidators returns information about the current validators and their ranking scores from the core snapshot. The ethereum address gets checksummed. +func (s *snap) getValidators() []*vega.Node { + for _, c := range s.chunk.Data { + switch c.Data.(type) { + case *snapshot.Payload_Topology: + nodes := []*vega.Node{} + for _, u := range c.GetTopology().ValidatorData { + nodes = append(nodes, &vega.Node{ + Id: u.ValidatorUpdate.NodeId, + PubKey: u.ValidatorUpdate.VegaPubKey, + TmPubKey: u.ValidatorUpdate.TmPubKey, + EthereumAddress: crypto.EthereumChecksumAddress(u.ValidatorUpdate.EthereumAddress), + InfoUrl: u.ValidatorUpdate.InfoUrl, + Location: u.ValidatorUpdate.Country, + Status: 1, + RankingScore: u.RankingScore, + Name: u.ValidatorUpdate.Name, + AvatarUrl: u.ValidatorUpdate.AvatarUrl, + }) + } + return nodes + default: + continue + } + } + return []*vega.Node{} +} + +// getPositions is currently unsupported as the core snapshot and datanode have very different abstractions. +// TODO +func (s *snap) getPositions() []*vega.Position { + return []*vega.Position{} +} + +// NewSnapshotData deserealises a proto file into snap. +func newSnapshotData(fileName string) (*snap, error) { + jsonFile, err := os.Open(fileName) + if err != nil { + return nil, err + } + defer jsonFile.Close() + + bytes, _ := ioutil.ReadAll(jsonFile) + + chunk := snapshot.Chunk{} + proto.Unmarshal(bytes, &chunk) + + return &snap{chunk: &chunk}, nil +} diff --git a/difftool/diff/datanode_client.go b/difftool/diff/datanode_client.go new file mode 100644 index 00000000..f20d34d3 --- /dev/null +++ b/difftool/diff/datanode_client.go @@ -0,0 +1,442 @@ +package diff + +import ( + "context" + "sync" + + "code.vegaprotocol.io/vega/libs/crypto" + dn "code.vegaprotocol.io/vega/protos/data-node/api/v2" + "code.vegaprotocol.io/vega/protos/vega" + v1 "code.vegaprotocol.io/vega/protos/vega/events/v1" + "google.golang.org/grpc" +) + +type dataNodeClient struct { + datanode dn.TradingDataServiceClient +} + +func newDataNodeClient(dataNodeAddr string) *dataNodeClient { + connection, err := grpc.Dial(dataNodeAddr, grpc.WithInsecure()) + if err != nil { + return nil + } + + return &dataNodeClient{ + datanode: dn.NewTradingDataServiceClient(connection), + } +} + +func (dnc *dataNodeClient) Collect() (*Result, error) { + res := &Result{} + var wg sync.WaitGroup + wg.Add(17) + + errors := make(chan error) + + go func() { + defer wg.Done() + var err error + res.Accounts, err = dnc.listAccounts() + if err != nil { + errors <- err + } + }() + + go func() { + defer wg.Done() + var err error + res.Orders, err = dnc.listOrders() + if err != nil { + errors <- err + } + }() + + go func() { + defer wg.Done() + var err error + res.Markets, err = dnc.listMarkets() + if err != nil { + errors <- err + } else { + for _, m := range res.Markets { + var lps []*vega.LiquidityProvision + lps, err = dnc.listLiquidityProvisions(m.Id) + if err != nil { + errors <- err + } + res.Lps = append(res.Lps, lps...) + } + } + }() + + go func() { + defer wg.Done() + var err error + res.Parties, err = dnc.listParties() + if err != nil { + errors <- err + } + }() + + go func() { + defer wg.Done() + var err error + res.Limits, err = dnc.getNetworkLimits() + if err != nil { + errors <- err + } + }() + + go func() { + defer wg.Done() + var err error + res.Assets, err = dnc.listAssets() + if err != nil { + errors <- err + } + }() + + go func() { + defer wg.Done() + var err error + res.VegaTime, err = dnc.getVegaTime() + if err != nil { + errors <- err + } + }() + + go func() { + defer wg.Done() + var err error + res.Delegations, err = dnc.listDelegations() + if err != nil { + errors <- err + } + }() + + go func() { + defer wg.Done() + var err error + res.Epoch, err = dnc.getEpoch() + if err != nil { + errors <- err + } + }() + + go func() { + defer wg.Done() + var err error + res.Nodes, err = dnc.listNodes() + if err != nil { + errors <- err + } + }() + + go func() { + defer wg.Done() + var err error + res.NetParams, err = dnc.listNetworkParameters() + if err != nil { + errors <- err + } + }() + + go func() { + defer wg.Done() + var err error + res.Proposals, err = dnc.listGovernanceData() + if err != nil { + errors <- err + } + }() + + go func() { + defer wg.Done() + var err error + res.Deposits, err = dnc.listDeposits() + if err != nil { + errors <- err + } + }() + + go func() { + defer wg.Done() + var err error + res.Withdrawals, err = dnc.listWithdrawals() + if err != nil { + errors <- err + } + }() + + go func() { + defer wg.Done() + var err error + res.Positions, err = dnc.listPositions() + if err != nil { + errors <- err + } + }() + + go func() { + defer wg.Done() + var err error + res.Transfers, err = dnc.listTransfers() + if err != nil { + errors <- err + } + }() + + go func() { + defer wg.Done() + var err error + res.Stake, err = dnc.getStake() + if err != nil { + errors <- err + } + }() + + var resErr error + go func() { + for r := range errors { + resErr = r + } + }() + wg.Wait() + + return res, resErr +} + +func (dnc *dataNodeClient) listAccounts() ([]*dn.AccountBalance, error) { + accResp, err := dnc.datanode.ListAccounts(context.Background(), &dn.ListAccountsRequest{}) + if err != nil { + return nil, err + } + accounts := make([]*dn.AccountBalance, 0, len(accResp.Accounts.Edges)) + for _, ae := range accResp.Accounts.Edges { + accounts = append(accounts, ae.Account) + } + return accounts, nil +} + +func (dnc *dataNodeClient) listOrders() ([]*vega.Order, error) { + liveOnly := true + orderResp, err := dnc.datanode.ListOrders(context.Background(), &dn.ListOrdersRequest{LiveOnly: &liveOnly}) + if err != nil { + return nil, err + } + orders := make([]*vega.Order, 0, len(orderResp.Orders.Edges)) + for _, oe := range orderResp.Orders.Edges { + if oe.Node.Status != vega.Order_STATUS_PARKED { + orders = append(orders, oe.Node) + } + } + return orders, nil +} + +func (dnc *dataNodeClient) listMarkets() ([]*vega.Market, error) { + marketResp, err := dnc.datanode.ListMarkets(context.Background(), &dn.ListMarketsRequest{}) + if err != nil { + return nil, err + } + markets := make([]*vega.Market, 0, len(marketResp.Markets.Edges)) + for _, me := range marketResp.Markets.Edges { + markets = append(markets, me.Node) + } + return markets, nil +} + +func (dnc *dataNodeClient) listParties() ([]*vega.Party, error) { + partiesResp, err := dnc.datanode.ListParties(context.Background(), &dn.ListPartiesRequest{}) + if err != nil { + return nil, err + } + parties := make([]*vega.Party, 0, len(partiesResp.Parties.Edges)) + for _, pe := range partiesResp.Parties.Edges { + parties = append(parties, pe.Node) + } + return parties, nil +} + +func (dnc *dataNodeClient) getNetworkLimits() (*vega.NetworkLimits, error) { + limitsResp, err := dnc.datanode.GetNetworkLimits(context.Background(), &dn.GetNetworkLimitsRequest{}) + if err != nil { + return nil, err + } + return limitsResp.Limits, nil +} + +func (dnc *dataNodeClient) listAssets() ([]*vega.Asset, error) { + assetResp, err := dnc.datanode.ListAssets(context.Background(), &dn.ListAssetsRequest{}) + if err != nil { + return nil, err + } + assets := make([]*vega.Asset, 0, len(assetResp.Assets.Edges)) + for _, a := range assetResp.Assets.Edges { + assets = append(assets, a.Node) + } + return assets, nil +} + +func (dnc *dataNodeClient) getVegaTime() (int64, error) { + vegaTimeResp, err := dnc.datanode.GetVegaTime(context.Background(), &dn.GetVegaTimeRequest{}) + if err != nil { + return 0, err + } + return vegaTimeResp.Timestamp, nil +} + +func (dnc *dataNodeClient) listDelegations() ([]*vega.Delegation, error) { + delegationResp, err := dnc.datanode.ListDelegations(context.Background(), &dn.ListDelegationsRequest{}) + if err != nil { + return nil, err + } + delegations := make([]*vega.Delegation, 0, len(delegationResp.Delegations.Edges)) + for _, d := range delegationResp.Delegations.Edges { + delegations = append(delegations, d.Node) + } + return delegations, nil +} + +func (dnc *dataNodeClient) getEpoch() (*vega.Epoch, error) { + epochResp, err := dnc.datanode.GetEpoch(context.Background(), &dn.GetEpochRequest{}) + if err != nil { + return nil, err + } + + return &vega.Epoch{ + Seq: epochResp.Epoch.Seq, + Timestamps: epochResp.Epoch.Timestamps, + }, nil +} + +func (dnc *dataNodeClient) listNodes() ([]*vega.Node, error) { + nodeResp, err := dnc.datanode.ListNodes(context.Background(), &dn.ListNodesRequest{}) + if err != nil { + return nil, err + } + nodes := make([]*vega.Node, 0, len(nodeResp.Nodes.Edges)) + for _, ne := range nodeResp.Nodes.Edges { + nodes = append(nodes, &vega.Node{ + Id: ne.Node.Id, + PubKey: ne.Node.PubKey, + TmPubKey: ne.Node.TmPubKey, + EthereumAddress: crypto.EthereumChecksumAddress(ne.Node.EthereumAddress), + InfoUrl: ne.Node.InfoUrl, + Location: ne.Node.Location, + Status: ne.Node.Status, + RankingScore: ne.Node.RankingScore, + Name: ne.Node.Name, + AvatarUrl: ne.Node.AvatarUrl, + }) + } + return nodes, nil +} + +func (dnc *dataNodeClient) listNetworkParameters() ([]*vega.NetworkParameter, error) { + resp, err := dnc.datanode.ListNetworkParameters(context.Background(), &dn.ListNetworkParametersRequest{}) + if err != nil { + return nil, err + } + + params := make([]*vega.NetworkParameter, 0, len(resp.NetworkParameters.Edges)) + for _, npe := range resp.NetworkParameters.Edges { + params = append(params, npe.Node) + } + return params, nil +} + +func (dnc *dataNodeClient) listGovernanceData() ([]*vega.Proposal, error) { + resp, err := dnc.datanode.ListGovernanceData(context.Background(), &dn.ListGovernanceDataRequest{}) + if err != nil { + return nil, err + } + proposals := make([]*vega.Proposal, 0, len(resp.Connection.Edges)) + for _, gde := range resp.Connection.Edges { + if gde.Node.Proposal.State != vega.Proposal_STATE_DECLINED && gde.Node.Proposal.State != vega.Proposal_STATE_REJECTED { + proposals = append(proposals, gde.Node.Proposal) + } + } + return proposals, nil +} + +func (dnc *dataNodeClient) listDeposits() ([]*vega.Deposit, error) { + resp, err := dnc.datanode.ListDeposits(context.Background(), &dn.ListDepositsRequest{}) + if err != nil { + return nil, err + } + deposits := make([]*vega.Deposit, 0, len(resp.Deposits.Edges)) + for _, de := range resp.Deposits.Edges { + deposits = append(deposits, de.Node) + } + return deposits, nil +} + +func (dnc *dataNodeClient) listWithdrawals() ([]*vega.Withdrawal, error) { + resp, err := dnc.datanode.ListWithdrawals(context.Background(), &dn.ListWithdrawalsRequest{}) + if err != nil { + return nil, err + } + withdrawals := make([]*vega.Withdrawal, 0, len(resp.Withdrawals.Edges)) + for _, we := range resp.Withdrawals.Edges { + we.Node.Ext = nil + withdrawals = append(withdrawals, we.Node) + } + return withdrawals, nil +} + +func (dnc *dataNodeClient) listTransfers() ([]*v1.Transfer, error) { + resp, err := dnc.datanode.ListTransfers(context.Background(), &dn.ListTransfersRequest{}) + if err != nil { + return nil, err + } + transfers := make([]*v1.Transfer, 0, len(resp.Transfers.Edges)) + for _, te := range resp.Transfers.Edges { + transfers = append(transfers, te.Node) + } + return transfers, nil +} + +func (dnc *dataNodeClient) listPositions() ([]*vega.Position, error) { + resp, err := dnc.datanode.ListPositions(context.Background(), &dn.ListPositionsRequest{}) + if err != nil { + return nil, err + } + positions := make([]*vega.Position, 0, len(resp.Positions.Edges)) + for _, pe := range resp.Positions.Edges { + positions = append(positions, pe.Node) + } + return positions, nil +} + +func (dnc *dataNodeClient) listLiquidityProvisions(market string) ([]*vega.LiquidityProvision, error) { + resp, err := dnc.datanode.ListLiquidityProvisions(context.Background(), &dn.ListLiquidityProvisionsRequest{MarketId: &market}) + if err != nil { + return nil, err + } + lps := make([]*vega.LiquidityProvision, 0, len(resp.LiquidityProvisions.Edges)) + for _, lpe := range resp.LiquidityProvisions.Edges { + if lpe.Node.Status == vega.LiquidityProvision_STATUS_ACTIVE || lpe.Node.Status == vega.LiquidityProvision_STATUS_UNDEPLOYED { + lps = append(lps, lpe.Node) + } + } + return lps, nil +} + +func (dnc *dataNodeClient) getStake() ([]*v1.StakeLinking, error) { + parties, err := dnc.listParties() + if err != nil { + return nil, err + } + + stake := []*v1.StakeLinking{} + for _, p := range parties { + resp, err := dnc.datanode.GetStake(context.Background(), &dn.GetStakeRequest{PartyId: p.Id}) + if err != nil { + return stake, err + } + for _, sle := range resp.StakeLinkings.Edges { + stake = append(stake, sle.Node) + } + } + return stake, nil +} diff --git a/difftool/diff/diff_report.go b/difftool/diff/diff_report.go new file mode 100644 index 00000000..1456c808 --- /dev/null +++ b/difftool/diff/diff_report.go @@ -0,0 +1,584 @@ +package diff + +import ( + "sort" + "strconv" + + dnproto "code.vegaprotocol.io/vega/protos/data-node/api/v2" + "code.vegaprotocol.io/vega/protos/vega" + v1 "code.vegaprotocol.io/vega/protos/vega/events/v1" +) + +func newDiffReport(coreResult *Result, datanodeResult *Result) *Report { + d := &Report{ + coreResult: coreResult, + datanodeResult: datanodeResult, + DiffResult: []Status{}, + Success: true, + } + d.diff() + return d +} + +func (dr *Report) diff() { + diffFuncs := []func(*Result, *Result) Status{ + diffAccountBalances, + diffOrders, + diffMarkets, + diffParties, + diffLimits, + diffAssets, + diffDelegations, + diffEpoch, + diffVegaTime, + diffNodes, + diffNetParams, + diffProposals, + diffDeposits, + diffWithdrawals, + diffLPs, + diffStake, + diffTransfers, + } + + for _, v := range diffFuncs { + r := v(dr.coreResult, dr.datanodeResult) + if r.MatchResult != FullMatch { + dr.Success = false + } + println("completed diff for", r.Key, "with status", r.MatchResult) + dr.DiffResult = append(dr.DiffResult, r) + } + println("completed diff with success?", dr.Success) +} + +// diffAccountBalances compares account balances - assuming all accounts should be in both the core snapshot and the datanode. +func diffAccountBalances(coreSnapshot *Result, dn *Result) Status { + core := coreSnapshot.Accounts + datanode := dn.Accounts + + markets := map[string]struct{}{} + for _, m := range coreSnapshot.Markets { + markets[m.Id] = struct{}{} + } + markets[""] = struct{}{} + + filteredDN := []*dnproto.AccountBalance{} + // datanode would have margin and bond accounts for settled markets so need to exclude them + for _, ab := range datanode { + if _, ok := markets[ab.MarketId]; ok || ab.Owner == "" { + filteredDN = append(filteredDN, ab) + } + } + datanode = filteredDN + + if len(core) != len(datanode) { + return getSizeMismatchStatus("accounts", core, datanode) + } + + sort.Slice(core, func(i, j int) bool { + return core[i].Owner+core[i].MarketId+core[i].Asset+core[i].Type.String() < core[j].Owner+core[j].MarketId+core[j].Asset+core[j].Type.String() + }) + + dnData := map[string]*dnproto.AccountBalance{} + for _, ab := range datanode { + id := ab.Owner + ab.MarketId + ab.Asset + ab.Type.String() + dnData[id] = ab + } + + for _, a := range core { + id := a.Owner + a.MarketId + a.Asset + a.Type.String() + if d, ok := dnData[id]; ok { + if a.String() != d.String() && a.Type != vega.AccountType_ACCOUNT_TYPE_EXTERNAL { + return getValueMismatchStatus("accounts", core, datanode) + } + } + } + + return getSuccessStatus("accounts", core, datanode) +} + +// diffOrders compares live orders from core snapshot and datanode. +// NB: parked orders from datanode are excluded in advance. +func diffOrders(coreSnapshot *Result, dn *Result) Status { + core := coreSnapshot.Orders + datanode := dn.Orders + sort.Slice(core, func(i, j int) bool { return core[i].Id < core[j].Id }) + sort.Slice(datanode, func(i, j int) bool { return datanode[i].Id < datanode[j].Id }) + + if len(core) != len(datanode) { + return getSizeMismatchStatus("orders", core, datanode) + } + + for i, a := range core { + d := datanode[i] + if a.String() != d.String() { + return getValueMismatchStatus("orders", core, datanode) + } + } + + return getSuccessStatus("orders", core, datanode) +} + +// diffMarkets compares active markets from core snapshots with the same from datanode. +func diffMarkets(coreSnapshot *Result, dn *Result) Status { + core := coreSnapshot.Markets + coreIds := map[string]struct{}{} + for _, m := range core { + coreIds[m.Id] = struct{}{} + } + + datanode := []*vega.Market{} + for _, m := range dn.Markets { + if _, ok := coreIds[m.Id]; ok { + datanode = append(datanode, m) + } + } + + sort.Slice(core, func(i, j int) bool { return core[i].Id < core[j].Id }) + sort.Slice(datanode, func(i, j int) bool { return datanode[i].Id < datanode[j].Id }) + if len(core) != len(datanode) { + return getSizeMismatchStatus("markets", core, datanode) + } + + for i, a := range core { + d := datanode[i] + if a.String() != d.String() { + return getValueMismatchStatus("markets", core, datanode) + } + } + + return getSuccessStatus("markets", core, datanode) +} + +// diffParties compares parties from core snapshot (i.e. collateral accounts and staking accounts) with datanode. +func diffParties(coreSnapshot *Result, dn *Result) Status { + core := coreSnapshot.Parties + datanode := dn.Parties + + sort.Slice(core, func(i, j int) bool { return core[i].Id < core[j].Id }) + sort.Slice(datanode, func(i, j int) bool { return datanode[i].Id < datanode[j].Id }) + + if len(core) != len(datanode) { + return getSizeMismatchStatus("parties", core, datanode) + } + + for i, a := range core { + if a.String() != datanode[i].String() { + return getValueMismatchStatus("parties", core, datanode) + } + } + + return getSuccessStatus("parties", core, datanode) +} + +// diffLimits compares network limits. +func diffLimits(coreSnapshot *Result, dn *Result) Status { + core := coreSnapshot.Limits + datanode := dn.Limits + + if core.String() != datanode.String() { + return getSimpleValueMismatchStatus("limits", core.String(), datanode.String()) + } + + return getSimpleSuccessStatus("limits") +} + +// assuming all assets ever existed are returned by both. +func diffAssets(coreSnapshot *Result, dn *Result) Status { + core := coreSnapshot.Assets + datanode := dn.Assets + + sort.Slice(core, func(i, j int) bool { return core[i].Id < core[j].Id }) + sort.Slice(datanode, func(i, j int) bool { return datanode[i].Id < datanode[j].Id }) + + if len(core) != len(datanode) { + return getSizeMismatchStatus("assets", core, datanode) + } + + for i, a := range core { + if a.String() != datanode[i].String() { + return getValueMismatchStatus("assets", core, datanode) + } + } + + return getSuccessStatus("assets", core, datanode) +} + +// diffDelegations compares the live delegations from the core with the corresponding delegtions from datanode. +// As datanode would return all delegations, need to filter by epochs the core has. For those epochs the state much perfectly match. +func diffDelegations(coreSnapshot *Result, dn *Result) Status { + core := coreSnapshot.Delegations + + epochs := map[string]struct{}{} + for _, d := range core { + epochs[d.EpochSeq] = struct{}{} + } + + datanode := []*vega.Delegation{} + for _, d := range dn.Delegations { + if _, ok := epochs[d.EpochSeq]; ok { + datanode = append(datanode, d) + } + } + + sort.Slice(core, func(i, j int) bool { + ai := core[i] + aj := core[j] + return ai.EpochSeq+"_"+ai.NodeId+"_"+ai.Party < aj.EpochSeq+"_"+aj.NodeId+"_"+aj.Party + }) + sort.Slice(datanode, func(i, j int) bool { + ai := datanode[i] + aj := datanode[j] + return ai.EpochSeq+"_"+ai.NodeId+"_"+ai.Party < aj.EpochSeq+"_"+aj.NodeId+"_"+aj.Party + }) + if len(core) != len(datanode) { + return getSizeMismatchStatus("delegations", core, datanode) + } + for i, a := range core { + d := datanode[i] + if a.String() != d.String() { + getValueMismatchStatus("delegations", core, datanode) + } + } + + return getSuccessStatus("delegations", core, datanode) +} + +// diffEpoch compares the timestamps of epoch from core snapshot and datanode. +func diffEpoch(coreSnapshot *Result, dn *Result) Status { + core := coreSnapshot.Epoch.String() + datanode := dn.Epoch.String() + + if core != datanode { + return getSimpleValueMismatchStatus("epoch", core, datanode) + } + + return getSimpleSuccessStatus("epoch") +} + +// diffVegaTime compares the current vega time on core snapshot and datanode. +func diffVegaTime(coreSnapshot *Result, dn *Result) Status { + core := coreSnapshot.VegaTime + datanode := dn.VegaTime + if core != datanode { + return getSimpleValueMismatchStatus("vegaTime", strconv.FormatInt(core, 10), strconv.FormatInt(datanode, 10)) + } + + return getSimpleSuccessStatus("vegaTime") +} + +// diffNodes compares the validator list on core snapshot and datanode. +// TODO: need to see what happens on a network where a node has been announced to be added in a future epoch - such node would be returned by the +// core snapshot but not by datanode APi. +func diffNodes(coreSnapshot *Result, dn *Result) Status { + core := coreSnapshot.Nodes + datanode := dn.Nodes + + sort.Slice(core, func(i, j int) bool { return core[i].Id < core[j].Id }) + sort.Slice(datanode, func(i, j int) bool { return datanode[i].Id < datanode[j].Id }) + + if len(core) != len(datanode) { + return getSizeMismatchStatus("nodes", core, datanode) + } + for i, a := range core { + d := datanode[i] + if a.String() != d.String() { + return getValueMismatchStatus("nodes", core, datanode) + } + } + return getSuccessStatus("nodes", core, datanode) +} + +// diffNetParams compares enacted and pending governance proposals from core snapshot and datanode. +func diffNetParams(coreSnapshot *Result, dn *Result) Status { + core := coreSnapshot.NetParams + datanode := dn.NetParams + sort.Slice(core, func(i, j int) bool { return core[i].Key < core[j].Key }) + sort.Slice(datanode, func(i, j int) bool { return datanode[i].Key < datanode[j].Key }) + + if len(core) != len(datanode) { + return getSizeMismatchStatus("netparams", core, datanode) + } + + for i, a := range core { + if a.String() != datanode[i].String() { + return getValueMismatchStatus("netparams", core, datanode) + } + } + + return getSuccessStatus("netparams", core, datanode) +} + +// diffProposals compares enacted and pending governance proposals from core snapshot and datanode. +func diffProposals(coreSnapshot *Result, dn *Result) Status { + core := coreSnapshot.Proposals + datanode := dn.Proposals + sort.Slice(core, func(i, j int) bool { return core[i].Id < core[j].Id }) + sort.Slice(datanode, func(i, j int) bool { return datanode[i].Id < datanode[j].Id }) + + if len(core) != len(datanode) { + return getSizeMismatchStatus("proposals", core, datanode) + } + + for i, a := range core { + d := datanode[i] + if a.String() != d.String() { + return getValueMismatchStatus("proposals", core, datanode) + } + // if a.Id != d.Id || + // a.Reference != d.Reference || + // a.PartyId != d.PartyId || + // a.State != d.State || + // a.Timestamp != d.Timestamp || + // a.Terms.ClosingTimestamp != d.Terms.ClosingTimestamp || + // a.Terms.EnactmentTimestamp != d.Terms.EnactmentTimestamp { + // return errResult + // } + // switch a.Terms.Change.(type) { + // case *vega.ProposalTerms_NewMarket: + // cCore := a.Terms.Change.(*vega.ProposalTerms_NewMarket).NewMarket.String() + // cDN := d.Terms.Change.(*vega.ProposalTerms_NewMarket).NewMarket.String() + // if cCore != cDN { + // return errResult + // } + // case *vega.ProposalTerms_UpdateMarket: + // cCore := a.Terms.Change.(*vega.ProposalTerms_UpdateMarket).UpdateMarket.String() + // cDN := d.Terms.Change.(*vega.ProposalTerms_UpdateMarket).UpdateMarket.String() + // if cCore != cDN { + // return errResult + // } + // case *vega.ProposalTerms_NewAsset: + // cCore := a.Terms.Change.(*vega.ProposalTerms_NewAsset).NewAsset.String() + // cDN := d.Terms.Change.(*vega.ProposalTerms_NewAsset).NewAsset.String() + // if cCore != cDN { + // return errResult + // } + // case *vega.ProposalTerms_UpdateAsset: + // cCore := a.Terms.Change.(*vega.ProposalTerms_UpdateAsset).UpdateAsset.String() + // cDN := d.Terms.Change.(*vega.ProposalTerms_UpdateAsset).UpdateAsset.String() + // if cCore != cDN { + // return errResult + // } + // case *vega.ProposalTerms_NewFreeform: + // cCore := a.Terms.Change.(*vega.ProposalTerms_NewFreeform).NewFreeform.String() + // cDN := d.Terms.Change.(*vega.ProposalTerms_NewFreeform).NewFreeform.String() + // if cCore != cDN { + // return errResult + // } + // case *vega.ProposalTerms_UpdateNetworkParameter: + // cCore := a.Terms.Change.(*vega.ProposalTerms_UpdateNetworkParameter).UpdateNetworkParameter.String() + // cDN := d.Terms.Change.(*vega.ProposalTerms_UpdateNetworkParameter).UpdateNetworkParameter.String() + // if cCore != cDN { + // return errResult + // } + // } + } + return getSuccessStatus("proposals", core, datanode) +} + +// diffDeposits compares *live* deposits from the core snapshot with the same from datanode. +func diffDeposits(coreSnapshot *Result, dn *Result) Status { + core := coreSnapshot.Deposits + datanode := []*vega.Deposit{} + coreDIDs := map[string]struct{}{} + for _, w := range core { + coreDIDs[w.Id] = struct{}{} + } + + // filter only live deposits from datanode + for _, w := range dn.Deposits { + if _, ok := coreDIDs[w.Id]; ok { + datanode = append(datanode, w) + } + } + + sort.Slice(core, func(i, j int) bool { return core[i].Id < core[j].Id }) + sort.Slice(datanode, func(i, j int) bool { return datanode[i].Id < datanode[j].Id }) + if len(core) != len(datanode) { + return getSizeMismatchStatus("deposits", core, datanode) + } + + for i, a := range core { + d := datanode[i] + if a.String() != d.String() { + return getValueMismatchStatus("deposits", core, datanode) + } + } + + return getSuccessStatus("deposits", core, datanode) +} + +// diffWithdrawals compares *live* withdarawls from the core snapshot with the same from datanode. +func diffWithdrawals(coreSnapshot *Result, dn *Result) Status { + core := coreSnapshot.Withdrawals + datanode := []*vega.Withdrawal{} + coreWIDs := map[string]struct{}{} + for _, w := range core { + coreWIDs[w.Id] = struct{}{} + } + + // filter only live withdrawals from datanode + for _, w := range dn.Withdrawals { + if _, ok := coreWIDs[w.Id]; ok { + datanode = append(datanode, w) + } + } + + sort.Slice(core, func(i, j int) bool { return core[i].Id < core[j].Id }) + sort.Slice(datanode, func(i, j int) bool { return datanode[i].Id < datanode[j].Id }) + if len(core) != len(datanode) { + return getSizeMismatchStatus("withdrawals", core, datanode) + } + + for i, a := range core { + d := datanode[i] + if a.String() != d.String() { + return getValueMismatchStatus("withdrawals", core, datanode) + } + } + + return getSuccessStatus("withdrawals", core, datanode) +} + +// diffLPs compares liquidity provisions from live markets from core snapshot with the same market LPs in datanode. +// only active and undeployed LPs are compared. +func diffLPs(coreSnapshot *Result, dn *Result) Status { + core := coreSnapshot.Lps + datanode := []*vega.LiquidityProvision{} + markets := map[string]struct{}{} + for _, m := range coreSnapshot.Markets { + markets[m.Id] = struct{}{} + } + for _, ab := range dn.Lps { + if _, ok := markets[ab.MarketId]; ok { + datanode = append(datanode, ab) + } + } + sort.Slice(core, func(i, j int) bool { return core[i].Id < core[j].Id }) + sort.Slice(datanode, func(i, j int) bool { return datanode[i].Id < datanode[j].Id }) + + if len(core) != len(datanode) { + return getSizeMismatchStatus("liquidityProvisions", core, datanode) + } + for i, a := range core { + if a.String() != datanode[i].String() { + return getValueMismatchStatus("liquidityProvisions", core, datanode) + } + } + + return getSuccessStatus("liquidityProvisions", core, datanode) +} + +// diffStake compares stake linking from the core and data node. They are expected to be of the same size and perfectly matching at this point. +func diffStake(coreSnapshot *Result, dn *Result) Status { + core := coreSnapshot.Stake + datanode := dn.Stake + sort.Slice(core, func(i, j int) bool { return core[i].Id < core[j].Id }) + sort.Slice(datanode, func(i, j int) bool { return datanode[i].Id < datanode[j].Id }) + + if len(core) != len(datanode) { + return getSizeMismatchStatus("stake", core, datanode) + } + + for i, a := range core { + d := datanode[i] + if a.String() != d.String() { + return getValueMismatchStatus("stake", core, datanode) + } + } + return getSuccessStatus("stake", core, datanode) +} + +// diffTransfers compares live transfers from the core snapshot with datanode. +// All live transfers from the core snapshot must exist on the data node snapshot and match perfectly. +func diffTransfers(coreSnapshot *Result, dn *Result) Status { + core := coreSnapshot.Transfers + datanode := []*v1.Transfer{} + + coreIDs := map[string]struct{}{} + for _, t := range core { + coreIDs[t.Id] = struct{}{} + } + for _, d := range dn.Transfers { + if _, ok := coreIDs[d.Id]; ok { + datanode = append(datanode, d) + } + } + + sort.Slice(core, func(i, j int) bool { return core[i].Id < core[j].Id }) + sort.Slice(datanode, func(i, j int) bool { return datanode[i].Id < datanode[j].Id }) + + if len(core) != len(datanode) { + return getSizeMismatchStatus("transfers", core, datanode) + } + + for i, a := range core { + d := datanode[i] + if a.String() != d.String() { + return getValueMismatchStatus("transfers", core, datanode) + } + } + + return getSuccessStatus("transfers", core, datanode) +} + +func getSuccessStatus[A interface{ String() string }](key string, core, datanode []A) Status { + return Status{ + Key: key, + MatchResult: FullMatch, + CoreResLen: len(core), + DataNodeLen: len(datanode), + } +} + +func getSimpleSuccessStatus(key string) Status { + return Status{ + Key: key, + MatchResult: FullMatch, + CoreResLen: 1, + DataNodeLen: 1, + } +} + +func getSizeMismatchStatus[A interface{ String() string }](key string, core, datanode []A) Status { + return Status{ + Key: key, + MatchResult: SizeMismatch, + CoreRes: sliceToString(core), + DatanodeRes: sliceToString(datanode), + CoreResLen: len(core), + DataNodeLen: len(datanode), + } +} + +func getValueMismatchStatus[A interface{ String() string }](key string, core, datanode []A) Status { + return Status{ + Key: key, + MatchResult: ValuesMismatch, + CoreRes: sliceToString(core), + DatanodeRes: sliceToString(datanode), + CoreResLen: len(core), + DataNodeLen: len(datanode), + } +} + +func getSimpleValueMismatchStatus(key string, core, datanode string) Status { + return Status{ + Key: key, + MatchResult: ValuesMismatch, + CoreRes: core, + DatanodeRes: datanode, + CoreResLen: 1, + DataNodeLen: 1, + } +} + +func sliceToString[A interface{ String() string }](s []A) string { + str := "" + for _, o := range s { + a := o.String() + str += a + "\n" + } + return str +} diff --git a/difftool/diff/diff_tool.go b/difftool/diff/diff_tool.go new file mode 100644 index 00000000..83355a86 --- /dev/null +++ b/difftool/diff/diff_tool.go @@ -0,0 +1,33 @@ +package diff + +import ( + "errors" + "fmt" +) + +// Run takes a snapshot (proto serialised) file path and data node connection string and runs the diff tool. +// Returns nil if no error is found or the error report otherwise. +func Run(snapshotFilePath, datanodeConnection string) error { + // get data node data + datanode := newDataNodeClient(datanodeConnection) + dataNodeResult, err := datanode.Collect() + if err != nil { + return err + } + + // get core snapshot data + coreSnapshot, err := newSnapshotData(snapshotFilePath) + if err != nil { + return err + } + coreResult := coreSnapshot.Collect() + + // generate a diff report + diffReport := newDiffReport(coreResult, dataNodeResult) + + if !diffReport.Success { + report := fmt.Sprintf("mismatch between core and datanode: %s", diffReport.String()) + return errors.New(report) + } + return nil +} diff --git a/difftool/diff/types.go b/difftool/diff/types.go new file mode 100644 index 00000000..e2271bfc --- /dev/null +++ b/difftool/diff/types.go @@ -0,0 +1,79 @@ +package diff + +import ( + "fmt" + + dn "code.vegaprotocol.io/vega/protos/data-node/api/v2" + "code.vegaprotocol.io/vega/protos/vega" + v1 "code.vegaprotocol.io/vega/protos/vega/events/v1" +) + +// MatchResult represents the result of comparing core snapshot of an engine with data node api for the same. +type MatchResult int64 + +const ( + // FullMatch means no discrepancies found. + FullMatch MatchResult = iota + // SizeMismatch means some expected entries are missing. + SizeMismatch + // ValuesMismatch means some values disagree between the core snapshot and the data node api. + ValuesMismatch +) + +var matchResultToName map[MatchResult]string = map[MatchResult]string{ + FullMatch: "full match", + SizeMismatch: "mismatching number of elements", + ValuesMismatch: "mismatching values", +} + +// Result corresponds to a dataset representing data node state ot core snapshot state. +type Result struct { + Accounts []*dn.AccountBalance + Orders []*vega.Order + Markets []*vega.Market + Parties []*vega.Party + Limits *vega.NetworkLimits + Assets []*vega.Asset + VegaTime int64 + Delegations []*vega.Delegation + Epoch *vega.Epoch + Nodes []*vega.Node + NetParams []*vega.NetworkParameter + Proposals []*vega.Proposal + Deposits []*vega.Deposit + Withdrawals []*vega.Withdrawal + Transfers []*v1.Transfer + Positions []*vega.Position + Lps []*vega.LiquidityProvision + Stake []*v1.StakeLinking +} + +// Status is a diff summary report for a key. +type Status struct { + Key string + MatchResult MatchResult + DatanodeRes string + CoreRes string + CoreResLen int + DataNodeLen int +} + +func (ds Status) String() string { + return fmt.Sprintf("key=%s, matchResult=%s, coreLength=%d, datanodeLength=%d, coreResult=%s, datanodeResult=%s", ds.Key, matchResultToName[ds.MatchResult], ds.CoreResLen, ds.DataNodeLen, ds.CoreRes, ds.DatanodeRes) +} + +// Report is the top level diff result aggregating the results from all compared keys. +type Report struct { + coreResult *Result + datanodeResult *Result + DiffResult []Status + Success bool +} + +func (dr *Report) String() string { + str := "" + for _, ds := range dr.DiffResult { + str += ds.String() + "\n" + } + return fmt.Sprintf("success=%t, report:\n%s", dr.Success, str) +} diff --git a/go.mod b/go.mod index afe8e19a..41259a08 100644 --- a/go.mod +++ b/go.mod @@ -4,12 +4,13 @@ go 1.19 require ( code.vegaprotocol.io/shared v0.0.0-20221010085458-55c50711135f - code.vegaprotocol.io/vega v0.60.1-0.20221028083753-0937da9816e1 + code.vegaprotocol.io/vega v0.61.1-0.20221031193706-46d1c0c29f82 github.com/cosmos/iavl v0.19.1 github.com/ethereum/go-ethereum v1.10.21 github.com/gdamore/tcell/v2 v2.5.2 github.com/gogo/protobuf v1.3.2 github.com/golang/protobuf v1.5.2 + github.com/shopspring/decimal v1.3.1 github.com/spf13/cobra v1.5.0 github.com/stretchr/testify v1.8.0 github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a @@ -42,6 +43,7 @@ require ( github.com/google/uuid v1.3.0 // indirect github.com/gorilla/websocket v1.5.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.9.0 // indirect + github.com/holiman/uint256 v1.2.0 // indirect github.com/inconshreveable/mousetrap v1.0.1 // indirect github.com/jmhodges/levigo v1.0.0 // indirect github.com/lucasb-eyer/go-colorful v1.2.0 // indirect @@ -64,3 +66,8 @@ require ( gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) + +replace ( + github.com/jackc/pgx/v4 v4.14.1 => github.com/pscott31/pgx/v4 v4.16.2-0.20220531164027-bd666b84b61f + github.com/shopspring/decimal => github.com/vegaprotocol/decimal v1.3.1-uint256 +) diff --git a/go.sum b/go.sum index bcba936b..f4db8269 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,9 @@ +code.vegaprotocol.io/shared v0.0.0-20220704150014-7c22d12ccb72 h1:BJwmbEbC+ujqLSA1dSyUwDOc4+aRoZcndj/RM210CtE= +code.vegaprotocol.io/shared v0.0.0-20220704150014-7c22d12ccb72/go.mod h1:P9MfU2GyzI4Vc7OrKx9+qN9JV0bNGFB/aYe0++e7158= code.vegaprotocol.io/shared v0.0.0-20221010085458-55c50711135f h1:ivaWSXN7XKESShqYw/QFuT6b2mvThWPQLO7lW5sadHs= code.vegaprotocol.io/shared v0.0.0-20221010085458-55c50711135f/go.mod h1:XzX67GsyOHzvytMr0QOHX4CCTdCZDYKUUi88rx40Nt0= -code.vegaprotocol.io/vega v0.60.1-0.20221028083753-0937da9816e1 h1:jqGJ5x1a86QlCD4Jt9FOo6c7hLFF211cg0qi5uncMJM= -code.vegaprotocol.io/vega v0.60.1-0.20221028083753-0937da9816e1/go.mod h1:sVg4/2ExeIOZc2MCpdGjNiRM74ANqMQlhDyqdJEWPQE= +code.vegaprotocol.io/vega v0.61.1-0.20221031193706-46d1c0c29f82 h1:BDUzLad+pZhn32lYCmCSH1D07ryTNZYGwaH5V1m9hyQ= +code.vegaprotocol.io/vega v0.61.1-0.20221031193706-46d1c0c29f82/go.mod h1:sVg4/2ExeIOZc2MCpdGjNiRM74ANqMQlhDyqdJEWPQE= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/DataDog/zstd v1.4.1 h1:3oxKN3wbHibqx897utPC2LTQU4J+IHWWJO+glkAkpFM= github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= @@ -115,6 +117,7 @@ github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d h1:dg1dEPuW github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/holiman/bloomfilter/v2 v2.0.3 h1:73e0e/V0tCydx14a0SCYS/EWCxgwLZ18CZcZKVu0fao= github.com/holiman/uint256 v1.2.0 h1:gpSYcPLWGv4sG43I2mVLiDZCNDh/EpGjSk8tmtxitHM= +github.com/holiman/uint256 v1.2.0/go.mod h1:y4ga/t+u+Xwd7CpDgZESaRcWy0I7XMlTMA25ApIH5Jw= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/huin/goupnp v1.0.3 h1:N8No57ls+MnjlB+JPiCVSOyy/ot7MJTqlo7rn+NYSqQ= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= @@ -215,6 +218,8 @@ github.com/tklauser/numcpus v0.4.0/go.mod h1:1+UI3pD8NW14VMwdgJNJ1ESk2UnwhAnz5hM github.com/tyler-smith/go-bip39 v1.1.0 h1:5eUemwrMargf3BSLRRCalXT93Ns6pQJIjYQN2nyfOP8= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/urfave/cli/v2 v2.10.2 h1:x3p8awjp/2arX+Nl/G2040AZpOCHS/eMJJ1/a+mye4Y= +github.com/vegaprotocol/decimal v1.3.1-uint256 h1:Aj//9joGGuz+dAKo6W/r9Rt1HUXYrjH7oerdCg1q/So= +github.com/vegaprotocol/decimal v1.3.1-uint256/go.mod h1:+mRbjtsnpvm5Qw6aiLEf3I6SHICNB4nhMTmH9y8hMtg= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/snapshotdb/snapshotdb.go b/snapshotdb/snapshotdb.go index ec1c3b29..d68fc45e 100644 --- a/snapshotdb/snapshotdb.go +++ b/snapshotdb/snapshotdb.go @@ -3,6 +3,7 @@ package snapshotdb import ( "bufio" "encoding/json" + "errors" "fmt" "os" @@ -97,6 +98,32 @@ func writeSnapshotAsJSON(tree *iavl.MutableTree, outputPath string) error { return nil } +// writeSnapshotAsProtobuf saves the snapshot as a binary slice of payloads which is more useful for loading when comparing against datanode. +func writeSnapshotAsProtobuf(tree *iavl.MutableTree, outputPath string) error { + // traverse the tree and get the payloads + payloads, _, err := getAllPayloads(tree) + if err != nil { + return err + } + + f, _ := os.Create(outputPath) + defer f.Close() + + w := bufio.NewWriter(f) + + chunk := &snapshot.Chunk{Data: payloads} + bytes, err := proto.Marshal(chunk) + if err != nil { + return err + } + + w.WriteString(string(bytes)) + + w.Flush() + fmt.Println("snapshot payloads written to:", outputPath) + return nil +} + func displayNumberOfVersions(versions int) error { j := struct { Versions int64 `json:"n_versions"` @@ -113,7 +140,7 @@ func displayNumberOfVersions(versions int) error { } // Run is the main entry point for this tool -func Run(dbpath string, versionsOnly bool, outputPath string, heightToOutput int64) error { +func Run(dbpath string, versionsOnly bool, outputPath string, heightToOutput int64, outputFormat string) error { // Attempt to open the database options := &opt.Options{ ErrorIfMissing: true, @@ -152,9 +179,14 @@ func Run(dbpath string, versionsOnly bool, outputPath string, heightToOutput int // either a height wasn't specified so we take the latest if heightToOutput == 0 || blockHeight == uint64(heightToOutput) { fmt.Println("found snapshot for block-height", blockHeight) - return writeSnapshotAsJSON(tree, outputPath) + if outputFormat == "json" { + return writeSnapshotAsJSON(tree, outputPath) + } + if outputFormat == "proto" { + return writeSnapshotAsProtobuf(tree, outputPath) + } + return errors.New("unknown output format requested") } - } return fmt.Errorf("could not find snapshot for height %d", heightToOutput)