Skip to content

Commit

Permalink
node/object: Refactor code making request forwarding
Browse files Browse the repository at this point in the history
This refactors storage node app making it to use gRPC connection to
remote nodes for request forwarding without wrappers. The behavior is
preserved with some error text added/changed for the better.

Avoid importing `github.com/nspcc-dev/neofs-api-go/v2/rpc` packages. The
only place left will naturally go with coming SDK upgrade. One step
closer to whole `neofs-api-go` module's deprecation.

Signed-off-by: Leonard Lyubich <[email protected]>
  • Loading branch information
cthulhu-rider committed Jan 10, 2025
1 parent 55a25a1 commit 798d7a0
Show file tree
Hide file tree
Showing 9 changed files with 247 additions and 126 deletions.
66 changes: 38 additions & 28 deletions cmd/neofs-node/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@ import (
"fmt"

objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc"
rawclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client"
"github.com/nspcc-dev/neofs-api-go/v2/rpc/common"
"github.com/nspcc-dev/neofs-api-go/v2/rpc/grpc"
"github.com/nspcc-dev/neofs-api-go/v2/rpc/message"
"github.com/nspcc-dev/neofs-api-go/v2/status"
coreclient "github.com/nspcc-dev/neofs-node/pkg/core/client"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/encoding/proto"
)

type transport struct {
Expand All @@ -26,49 +25,60 @@ func (x *transport) SendReplicationRequestToNode(ctx context.Context, req []byte
return nil, fmt.Errorf("connect to remote node: %w", err)
}

var resp replicateResponse
err = c.ExecRaw(func(c *rawclient.Client) error {
var resp objectGRPC.ReplicateResponse
err = c.ExecRaw(func(conn *grpc.ClientConn) error {

Check warning on line 29 in cmd/neofs-node/transport.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/transport.go#L28-L29

Added lines #L28 - L29 were not covered by tests
// this will be changed during NeoFS API Go deprecation. Code most likely be
// placed in SDK
m := common.CallMethodInfo{Service: "neo.fs.v2.object.ObjectService", Name: "Replicate"}
err = rawclient.SendUnary(c, m, rawclient.BinaryMessage(req), &resp,
rawclient.WithContext(ctx), rawclient.AllowBinarySendingOnly())
err = conn.Invoke(ctx, objectGRPC.ObjectService_Replicate_FullMethodName, req, &resp, binaryMessageOnly)

Check warning on line 32 in cmd/neofs-node/transport.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/transport.go#L32

Added line #L32 was not covered by tests
if err != nil {
return fmt.Errorf("API transport (service=%s,op=%s): %w", m.Service, m.Name, err)
return fmt.Errorf("API transport (op=%s): %w", objectGRPC.ObjectService_Replicate_FullMethodName, err)

Check warning on line 34 in cmd/neofs-node/transport.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/transport.go#L34

Added line #L34 was not covered by tests
}
return resp.err
return err

Check warning on line 36 in cmd/neofs-node/transport.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/transport.go#L36

Added line #L36 was not covered by tests
})
return resp.sigs, err
}
if err != nil {
return nil, err
}

Check warning on line 40 in cmd/neofs-node/transport.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/transport.go#L38-L40

Added lines #L38 - L40 were not covered by tests

type replicateResponse struct {
sigs []byte
err error
return replicationResultFromResponse(&resp)

Check warning on line 42 in cmd/neofs-node/transport.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/transport.go#L42

Added line #L42 was not covered by tests
}

func (x replicateResponse) ToGRPCMessage() grpc.Message { return new(objectGRPC.ReplicateResponse) }
// [encoding.Codec] making Marshal to accept and forward []byte messages only.
var binaryMessageOnly = grpc.ForceCodec(protoCodecBinaryRequestOnly{})

type protoCodecBinaryRequestOnly struct{}

func (protoCodecBinaryRequestOnly) Name() string {
// may be any non-empty, conflicts are unlikely to arise
return "neofs_binary_sender"

Check warning on line 52 in cmd/neofs-node/transport.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/transport.go#L50-L52

Added lines #L50 - L52 were not covered by tests
}

func (x *replicateResponse) FromGRPCMessage(gm grpc.Message) error {
m, ok := gm.(*objectGRPC.ReplicateResponse)
if !ok {
return message.NewUnexpectedMessageType(gm, m)
func (protoCodecBinaryRequestOnly) Marshal(msg any) ([]byte, error) {
bMsg, ok := msg.([]byte)
if ok {
return bMsg, nil

Check warning on line 58 in cmd/neofs-node/transport.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/transport.go#L55-L58

Added lines #L55 - L58 were not covered by tests
}

return nil, fmt.Errorf("message is not of type %T", bMsg)

Check warning on line 61 in cmd/neofs-node/transport.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/transport.go#L61

Added line #L61 was not covered by tests
}

func (protoCodecBinaryRequestOnly) Unmarshal(raw []byte, msg any) error {
return encoding.GetCodec(proto.Name).Unmarshal(raw, msg)

Check warning on line 65 in cmd/neofs-node/transport.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/transport.go#L64-L65

Added lines #L64 - L65 were not covered by tests
}

func replicationResultFromResponse(m *objectGRPC.ReplicateResponse) ([]byte, error) {

Check warning on line 68 in cmd/neofs-node/transport.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/transport.go#L68

Added line #L68 was not covered by tests
var st *status.Status
if mst := m.GetStatus(); mst != nil {
st = new(status.Status)
err := st.FromGRPCMessage(mst)
if err != nil {
return fmt.Errorf("decode response status: %w", err)
return nil, fmt.Errorf("decode response status: %w", err)

Check warning on line 74 in cmd/neofs-node/transport.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/transport.go#L74

Added line #L74 was not covered by tests
}
}

x.err = apistatus.ErrorFromV2(st)
if x.err != nil {
return nil
err := apistatus.ErrorFromV2(st)
if err != nil {
return nil, err

Check warning on line 80 in cmd/neofs-node/transport.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/transport.go#L78-L80

Added lines #L78 - L80 were not covered by tests
}

x.sigs = m.GetObjectSignature()

return nil
return m.GetObjectSignature(), nil

Check warning on line 83 in cmd/neofs-node/transport.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/transport.go#L83

Added line #L83 was not covered by tests
}
10 changes: 5 additions & 5 deletions pkg/core/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"io"

rawclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-sdk-go/client"
"github.com/nspcc-dev/neofs-sdk-go/container"
Expand All @@ -14,6 +13,7 @@ import (
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
reputationSDK "github.com/nspcc-dev/neofs-sdk-go/reputation"
"github.com/nspcc-dev/neofs-sdk-go/user"
"google.golang.org/grpc"
)

// Client is an interface of NeoFS storage
Expand All @@ -30,7 +30,7 @@ type Client interface {
ObjectHash(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectHash) ([][]byte, error)
AnnounceLocalTrust(ctx context.Context, epoch uint64, trusts []reputationSDK.Trust, prm client.PrmAnnounceLocalTrust) error
AnnounceIntermediateTrust(ctx context.Context, epoch uint64, trust reputationSDK.PeerToPeerTrust, prm client.PrmAnnounceIntermediateTrust) error
ExecRaw(f func(client *rawclient.Client) error) error
ExecRaw(f func(*grpc.ClientConn) error) error
Close() error
}

Expand All @@ -39,9 +39,9 @@ type Client interface {
type MultiAddressClient interface {
Client

// RawForAddress must return rawclient.Client
// for the passed network.Address.
RawForAddress(network.Address, func(cli *rawclient.Client) error) error
// RawForAddress executes op over gRPC connections to given multi-address
// endpoint-by-endpoint until success.
RawForAddress(multiAddr network.Address, op func(*grpc.ClientConn) error) error

ReportError(error)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/network/address.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/nspcc-dev/neofs-api-go/v2/rpc/client"
"github.com/nspcc-dev/neofs-node/internal/uriutil"
)

/*
Expand Down Expand Up @@ -69,7 +69,7 @@ func (a *Address) FromString(s string) error {
host string
hasTLS bool
)
host, hasTLS, err = client.ParseURI(s)
host, hasTLS, err = uriutil.Parse(s)
if err != nil {
host = s
}
Expand Down
21 changes: 18 additions & 3 deletions pkg/network/cache/multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
reputationSDK "github.com/nspcc-dev/neofs-sdk-go/reputation"
"github.com/nspcc-dev/neofs-sdk-go/user"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -56,6 +57,20 @@ func newMultiClient(addr network.AddressGroup, opts ClientCacheOpts) *multiClien
}
}

type clientWrapper struct {
*client.Client
}

func (x clientWrapper) ExecRaw(f func(*grpc.ClientConn) error) error {
return x.Client.ExecRaw(func(c *rawclient.Client) error {
conn := c.Conn()
if conn == nil {
return errors.New("missing conn")
}
return f(conn.(*grpc.ClientConn))

Check warning on line 70 in pkg/network/cache/multi.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/cache/multi.go#L64-L70

Added lines #L64 - L70 were not covered by tests
})
}

func (x *multiClient) createForAddress(addr network.Address) (clientcore.Client, error) {
var (
prmInit client.PrmInit
Expand Down Expand Up @@ -88,7 +103,7 @@ func (x *multiClient) createForAddress(addr network.Address) (clientcore.Client,
return nil, fmt.Errorf("can't init SDK client: %w", err)
}

return c, nil
return clientWrapper{c}, nil

Check warning on line 106 in pkg/network/cache/multi.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/cache/multi.go#L106

Added line #L106 was not covered by tests
}

// updateGroup replaces current multiClient addresses with a new group.
Expand Down Expand Up @@ -329,7 +344,7 @@ func (x *multiClient) AnnounceIntermediateTrust(ctx context.Context, epoch uint6
})
}

func (x *multiClient) ExecRaw(f func(client *rawclient.Client) error) error {
func (x *multiClient) ExecRaw(f func(*grpc.ClientConn) error) error {

Check warning on line 347 in pkg/network/cache/multi.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/cache/multi.go#L347

Added line #L347 was not covered by tests
return x.iterateClients(context.Background(), func(c clientcore.Client) error {
return c.ExecRaw(f)
})
Expand All @@ -351,7 +366,7 @@ func (x *multiClient) Close() error {
return nil
}

func (x *multiClient) RawForAddress(addr network.Address, f func(client *rawclient.Client) error) error {
func (x *multiClient) RawForAddress(addr network.Address, f func(*grpc.ClientConn) error) error {

Check warning on line 369 in pkg/network/cache/multi.go

View check run for this annotation

Codecov / codecov/patch

pkg/network/cache/multi.go#L369

Added line #L369 was not covered by tests
c, err := x.client(addr)
if err != nil {
return err
Expand Down
Loading

0 comments on commit 798d7a0

Please sign in to comment.