diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index 1a605e8924..015bc7c1eb 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -49,7 +49,8 @@ type objectSvc struct { search *searchsvc.Service - get *getsvcV2.Service + get *getsvcV2.Service + get_ *getsvc.Service delete *deletesvc.Service } @@ -69,24 +70,24 @@ func (s *objectSvc) Put(ctx context.Context) (*putsvc.Streamer, error) { return s.put.Put(ctx) } -func (s *objectSvc) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) { - return s.get.Head(ctx, req) +func (s *objectSvc) Head(ctx context.Context, prm getsvc.HeadPrm) error { + return s.get_.Head(ctx, prm) } func (s *objectSvc) Search(ctx context.Context, prm searchsvc.Prm) error { return s.search.Search(ctx, prm) } -func (s *objectSvc) Get(req *object.GetRequest, stream objectService.GetObjectStream) error { - return s.get.Get(req, stream) +func (s *objectSvc) Get(ctx context.Context, prm getsvc.Prm) error { + return s.get_.Get(ctx, prm) } func (s *objectSvc) Delete(ctx context.Context, prm deletesvc.Prm) error { return s.delete.Delete(ctx, prm) } -func (s *objectSvc) GetRange(req *object.GetRangeRequest, stream objectService.GetObjectRangeStream) error { - return s.get.GetRange(req, stream) +func (s *objectSvc) GetRange(ctx context.Context, prm getsvc.RangePrm) error { + return s.get_.GetRange(ctx, prm) } func (s *objectSvc) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) { @@ -279,6 +280,7 @@ func initObjectService(c *cfg) { put: sPut, search: sSearch, get: sGetV2, + get_: sGet, delete: sDelete, } diff --git a/pkg/services/object/get/v2/service.go b/pkg/services/object/get/v2/service.go index 63e4ebd416..23ed610a1b 100644 --- a/pkg/services/object/get/v2/service.go +++ b/pkg/services/object/get/v2/service.go @@ -2,13 +2,10 @@ package getsvc import ( "context" - "errors" objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" - objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object" getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get" objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util" - "github.com/nspcc-dev/neofs-sdk-go/object" ) // Service implements Get operation of Object service v2. @@ -38,44 +35,6 @@ func NewService(opts ...Option) *Service { } } -// Get calls internal service and returns v2 object stream. -func (s *Service) Get(req *objectV2.GetRequest, stream objectSvc.GetObjectStream) error { - p, err := s.toPrm(req, stream) - if err != nil { - return err - } - - err = s.svc.Get(stream.Context(), *p) - - var splitErr *object.SplitInfoError - - switch { - case errors.As(err, &splitErr): - return stream.Send(splitInfoResponse(splitErr.SplitInfo())) - default: - return err - } -} - -// GetRange calls internal service and returns v2 payload range stream. -func (s *Service) GetRange(req *objectV2.GetRangeRequest, stream objectSvc.GetObjectRangeStream) error { - p, err := s.toRangePrm(req, stream) - if err != nil { - return err - } - - err = s.svc.GetRange(stream.Context(), *p) - - var splitErr *object.SplitInfoError - - switch { - case errors.As(err, &splitErr): - return stream.Send(splitInfoRangeResponse(splitErr.SplitInfo())) - default: - return err - } -} - // GetRangeHash calls internal service and returns v2 response. func (s *Service) GetRangeHash(ctx context.Context, req *objectV2.GetRangeHashRequest) (*objectV2.GetRangeHashResponse, error) { p, err := s.toHashRangePrm(req) @@ -91,28 +50,6 @@ func (s *Service) GetRangeHash(ctx context.Context, req *objectV2.GetRangeHashRe return toHashResponse(req.GetBody().GetType(), res), nil } -// Head serves NeoFS API v2 compatible HEAD requests. -func (s *Service) Head(ctx context.Context, req *objectV2.HeadRequest) (*objectV2.HeadResponse, error) { - resp := new(objectV2.HeadResponse) - resp.SetBody(new(objectV2.HeadResponseBody)) - - p, err := s.toHeadPrm(ctx, req, resp) - if err != nil { - return nil, err - } - - err = s.svc.Head(ctx, *p) - - var splitErr *object.SplitInfoError - - if errors.As(err, &splitErr) { - setSplitInfoHeadResponse(splitErr.SplitInfo(), resp) - err = nil - } - - return resp, err -} - func WithInternalService(v *getsvc.Service) Option { return func(c *cfg) { c.svc = v diff --git a/pkg/services/object/get/v2/streamer.go b/pkg/services/object/get/v2/streamer.go deleted file mode 100644 index b502d4542a..0000000000 --- a/pkg/services/object/get/v2/streamer.go +++ /dev/null @@ -1,62 +0,0 @@ -package getsvc - -import ( - objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" - objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object" - "github.com/nspcc-dev/neofs-sdk-go/object" -) - -type streamObjectWriter struct { - objectSvc.GetObjectStream -} - -type streamObjectRangeWriter struct { - objectSvc.GetObjectRangeStream -} - -func (s *streamObjectWriter) WriteHeader(obj *object.Object) error { - p := new(objectV2.GetObjectPartInit) - - objV2 := obj.ToV2() - p.SetObjectID(objV2.GetObjectID()) - p.SetHeader(objV2.GetHeader()) - p.SetSignature(objV2.GetSignature()) - - return s.GetObjectStream.Send(newResponse(p)) -} - -func (s *streamObjectWriter) WriteChunk(chunk []byte) error { - p := new(objectV2.GetObjectPartChunk) - p.SetChunk(chunk) - - return s.GetObjectStream.Send(newResponse(p)) -} - -func newResponse(p objectV2.GetObjectPart) *objectV2.GetResponse { - r := new(objectV2.GetResponse) - - body := new(objectV2.GetResponseBody) - r.SetBody(body) - - body.SetObjectPart(p) - - return r -} - -func (s *streamObjectRangeWriter) WriteChunk(chunk []byte) error { - return s.GetObjectRangeStream.Send(newRangeResponse(chunk)) -} - -func newRangeResponse(p []byte) *objectV2.GetRangeResponse { - r := new(objectV2.GetRangeResponse) - - body := new(objectV2.GetRangeResponseBody) - r.SetBody(body) - - part := new(objectV2.GetRangePartChunk) - part.SetChunk(p) - - body.SetRangePart(part) - - return r -} diff --git a/pkg/services/object/get/v2/util.go b/pkg/services/object/get/v2/util.go index e6713ad249..8af1e9b056 100644 --- a/pkg/services/object/get/v2/util.go +++ b/pkg/services/object/get/v2/util.go @@ -7,26 +7,21 @@ import ( "errors" "fmt" "hash" - "io" "sync" objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" protoobject "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" "github.com/nspcc-dev/neofs-api-go/v2/refs" - protorefs "github.com/nspcc-dev/neofs-api-go/v2/refs/grpc" "github.com/nspcc-dev/neofs-api-go/v2/session" "github.com/nspcc-dev/neofs-api-go/v2/signature" "github.com/nspcc-dev/neofs-api-go/v2/status" protostatus "github.com/nspcc-dev/neofs-api-go/v2/status/grpc" "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/network" - objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object" getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get" "github.com/nspcc-dev/neofs-node/pkg/services/object/internal" - internalclient "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" - neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" versionSDK "github.com/nspcc-dev/neofs-sdk-go/version" @@ -34,346 +29,6 @@ import ( "google.golang.org/grpc" ) -var errWrongMessageSeq = errors.New("incorrect message sequence") - -func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStream) (*getsvc.Prm, error) { - body := req.GetBody() - - addrV2 := body.GetAddress() - if addrV2 == nil { - return nil, errors.New("missing object address") - } - - var addr oid.Address - - err := addr.ReadFromV2(*addrV2) - if err != nil { - return nil, fmt.Errorf("invalid object address: %w", err) - } - - meta := req.GetMetaHeader() - - commonPrm, err := util.CommonPrmFromV2(req) - if err != nil { - return nil, err - } - - streamWrapper := &streamObjectWriter{stream} - - p := new(getsvc.Prm) - p.SetCommonParameters(commonPrm) - - p.WithAddress(addr) - p.WithRawFlag(body.GetRaw()) - p.SetObjectWriter(streamWrapper) - - if !commonPrm.LocalOnly() { - var onceResign sync.Once - - var onceHeaderSending sync.Once - var globalProgress int - - p.SetRequestForwarder(groupAddressRequestForwarder(func(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) { - var err error - - key, err := s.keyStorage.GetKey(nil) - if err != nil { - return nil, err - } - - // once compose and resign forwarding request - onceResign.Do(func() { - // compose meta header of the local server - metaHdr := new(session.RequestMetaHeader) - metaHdr.SetTTL(meta.GetTTL() - 1) - // TODO: #1165 think how to set the other fields - metaHdr.SetOrigin(meta) - writeCurrentVersion(metaHdr) - - req.SetMetaHeader(metaHdr) - - err = signature.SignServiceMessage(key, req) - }) - - if err != nil { - return nil, err - } - - // code below is copy-pasted from c.GetObject implementation, - // perhaps it is worth highlighting the utility function in neofs-api-go - - // open stream - var getStream protoobject.ObjectService_GetClient - err = c.RawForAddress(addr, func(conn *grpc.ClientConn) error { - // FIXME: context should be cancelled on return from upper func - getStream, err = protoobject.NewObjectServiceClient(conn).Get(ctx, req.ToGRPCMessage().(*protoobject.GetRequest)) - return err - }) - if err != nil { - return nil, fmt.Errorf("stream opening failed: %w", err) - } - - var ( - headWas bool - localProgress int - ) - - for { - // receive message from server stream - resp, err := getStream.Recv() - if err != nil { - if errors.Is(err, io.EOF) { - if !headWas { - return nil, io.ErrUnexpectedEOF - } - - break - } - - internalclient.ReportError(c, err) - return nil, fmt.Errorf("reading the response failed: %w", err) - } - - // verify response key - if err = internal.VerifyResponseKeyV2(pubkey, resp); err != nil { - return nil, err - } - - // verify response structure - resp2 := new(objectV2.GetResponse) - if err = resp2.FromGRPCMessage(resp); err != nil { - panic(err) // can only fail on wrong type, here it's correct - } - if err := signature.VerifyServiceMessage(resp2); err != nil { - return nil, fmt.Errorf("response verification failed: %w", err) - } - - if err = checkStatus(resp.GetMetaHeader().GetStatus()); err != nil { - return nil, err - } - - switch v := resp.GetBody().GetObjectPart().(type) { - default: - return nil, fmt.Errorf("unexpected object part %T", v) - case *protoobject.GetResponse_Body_Init_: - if headWas { - return nil, errWrongMessageSeq - } - - headWas = true - - if v == nil || v.Init == nil { - return nil, errors.New("nil header oneof field") - } - - m := &protoobject.Object{ - ObjectId: v.Init.ObjectId, - Signature: v.Init.Signature, - Header: v.Init.Header, - } - - obj := new(objectV2.Object) - if err := obj.FromGRPCMessage(m); err != nil { - panic(err) // can only fail on wrong type, here it's correct - } - - onceHeaderSending.Do(func() { - err = streamWrapper.WriteHeader(object.NewFromV2(obj)) - }) - if err != nil { - return nil, fmt.Errorf("could not write object header in Get forwarder: %w", err) - } - case *protoobject.GetResponse_Body_Chunk: - if !headWas { - return nil, errWrongMessageSeq - } - - origChunk := v.GetChunk() - - chunk := chunkToSend(globalProgress, localProgress, origChunk) - if len(chunk) == 0 { - localProgress += len(origChunk) - continue - } - - if err = streamWrapper.WriteChunk(chunk); err != nil { - return nil, fmt.Errorf("could not write object chunk in Get forwarder: %w", err) - } - - localProgress += len(origChunk) - globalProgress += len(chunk) - case *protoobject.GetResponse_Body_SplitInfo: - if v == nil || v.SplitInfo == nil { - return nil, errors.New("nil split info oneof field") - } - var si2 objectV2.SplitInfo - if err := si2.FromGRPCMessage(v.SplitInfo); err != nil { - panic(err) // can only fail on wrong type, here it's correct - } - si := object.NewSplitInfoFromV2(&si2) - return nil, object.NewSplitInfoError(si) - } - } - - return nil, nil - })) - } - - return p, nil -} - -func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.GetObjectRangeStream) (*getsvc.RangePrm, error) { - body := req.GetBody() - - addrV2 := body.GetAddress() - if addrV2 == nil { - return nil, errors.New("missing object address") - } - - var addr oid.Address - - err := addr.ReadFromV2(*addrV2) - if err != nil { - return nil, fmt.Errorf("invalid object address: %w", err) - } - - meta := req.GetMetaHeader() - - commonPrm, err := util.CommonPrmFromV2(req) - if err != nil { - return nil, err - } - - p := new(getsvc.RangePrm) - p.SetCommonParameters(commonPrm) - - streamWrapper := &streamObjectRangeWriter{stream} - - p.WithAddress(addr) - p.WithRawFlag(body.GetRaw()) - p.SetChunkWriter(streamWrapper) - p.SetRange(object.NewRangeFromV2(body.GetRange())) - - err = p.Validate() - if err != nil { - return nil, fmt.Errorf("request params validation: %w", err) - } - - if !commonPrm.LocalOnly() { - var onceResign sync.Once - var globalProgress int - - key, err := s.keyStorage.GetKey(nil) - if err != nil { - return nil, err - } - - p.SetRequestForwarder(groupAddressRequestForwarder(func(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) { - var err error - - // once compose and resign forwarding request - onceResign.Do(func() { - // compose meta header of the local server - metaHdr := new(session.RequestMetaHeader) - metaHdr.SetTTL(meta.GetTTL() - 1) - // TODO: #1165 think how to set the other fields - metaHdr.SetOrigin(meta) - writeCurrentVersion(metaHdr) - - req.SetMetaHeader(metaHdr) - - err = signature.SignServiceMessage(key, req) - }) - - if err != nil { - return nil, err - } - - // code below is copy-pasted from c.ObjectPayloadRangeData implementation, - // perhaps it is worth highlighting the utility function in neofs-api-go - - // open stream - var rangeStream protoobject.ObjectService_GetRangeClient - err = c.RawForAddress(addr, func(conn *grpc.ClientConn) error { - // FIXME: context should be cancelled on return from upper func - rangeStream, err = protoobject.NewObjectServiceClient(conn).GetRange(ctx, req.ToGRPCMessage().(*protoobject.GetRangeRequest)) - return err - }) - if err != nil { - return nil, fmt.Errorf("could not create Get payload range stream: %w", err) - } - - var localProgress int - - for { - // receive message from server stream - resp, err := rangeStream.Recv() - if err != nil { - if errors.Is(err, io.EOF) { - break - } - - internalclient.ReportError(c, err) - return nil, fmt.Errorf("reading the response failed: %w", err) - } - - // verify response key - if err = internal.VerifyResponseKeyV2(pubkey, resp); err != nil { - return nil, err - } - - // verify response structure - resp2 := new(objectV2.GetRangeResponse) - if err = resp2.FromGRPCMessage(resp); err != nil { - panic(err) // can only fail on wrong type, here it's correct - } - if err := signature.VerifyServiceMessage(resp); err != nil { - return nil, fmt.Errorf("could not verify %T: %w", resp, err) - } - - if err = checkStatus(resp.GetMetaHeader().GetStatus()); err != nil { - return nil, err - } - - switch v := resp.GetBody().GetRangePart().(type) { - case nil: - return nil, fmt.Errorf("unexpected range type %T", v) - case *protoobject.GetRangeResponse_Body_Chunk: - origChunk := v.GetChunk() - - chunk := chunkToSend(globalProgress, localProgress, origChunk) - if len(chunk) == 0 { - localProgress += len(origChunk) - continue - } - - if err = streamWrapper.WriteChunk(chunk); err != nil { - return nil, fmt.Errorf("could not write object chunk in GetRange forwarder: %w", err) - } - - localProgress += len(origChunk) - globalProgress += len(chunk) - case *protoobject.GetRangeResponse_Body_SplitInfo: - if v == nil || v.SplitInfo == nil { - return nil, errors.New("nil split info oneof field") - } - var si2 objectV2.SplitInfo - if err := si2.FromGRPCMessage(v.SplitInfo); err != nil { - panic(err) // can only fail on wrong type, here it's correct - } - si := object.NewSplitInfoFromV2(&si2) - - return nil, object.NewSplitInfoError(si) - } - } - - return nil, nil - })) - } - - return p, nil -} - func (s *Service) toHashRangePrm(req *objectV2.GetRangeHashRequest) (*getsvc.RangeHashPrm, error) { body := req.GetBody() @@ -502,239 +157,6 @@ func (s *Service) toHashRangePrm(req *objectV2.GetRangeHashRequest) (*getsvc.Ran return p, nil } -type headResponseWriter struct { - mainOnly bool - - body *objectV2.HeadResponseBody -} - -func (w *headResponseWriter) WriteHeader(hdr *object.Object) error { - if w.mainOnly { - w.body.SetHeaderPart(toShortObjectHeader(hdr)) - } else { - w.body.SetHeaderPart(toFullObjectHeader(hdr)) - } - - return nil -} - -func (s *Service) toHeadPrm(_ context.Context, req *objectV2.HeadRequest, resp *objectV2.HeadResponse) (*getsvc.HeadPrm, error) { - body := req.GetBody() - - addrV2 := body.GetAddress() - if addrV2 == nil { - return nil, errors.New("missing object address") - } - - var objAddr oid.Address - - err := objAddr.ReadFromV2(*addrV2) - if err != nil { - return nil, fmt.Errorf("invalid object address: %w", err) - } - - meta := req.GetMetaHeader() - - commonPrm, err := util.CommonPrmFromV2(req) - if err != nil { - return nil, err - } - - p := new(getsvc.HeadPrm) - p.SetCommonParameters(commonPrm) - - p.WithAddress(objAddr) - p.WithRawFlag(body.GetRaw()) - p.SetHeaderWriter(&headResponseWriter{ - mainOnly: body.GetMainOnly(), - body: resp.GetBody(), - }) - - if !commonPrm.LocalOnly() { - var onceResign sync.Once - - p.SetRequestForwarder(groupAddressRequestForwarder(func(ctx context.Context, addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) { - var err error - - key, err := s.keyStorage.GetKey(nil) - if err != nil { - return nil, err - } - - // once compose and resign forwarding request - onceResign.Do(func() { - // compose meta header of the local server - metaHdr := new(session.RequestMetaHeader) - metaHdr.SetTTL(meta.GetTTL() - 1) - // TODO: #1165 think how to set the other fields - metaHdr.SetOrigin(meta) - writeCurrentVersion(metaHdr) - - req.SetMetaHeader(metaHdr) - - err = signature.SignServiceMessage(key, req) - }) - - if err != nil { - return nil, err - } - - // code below is copy-pasted from c.GetObjectHeader implementation, - // perhaps it is worth highlighting the utility function in neofs-api-go - - // send Head request - var headResp *protoobject.HeadResponse - err = c.RawForAddress(addr, func(conn *grpc.ClientConn) error { - headResp, err = protoobject.NewObjectServiceClient(conn).Head(ctx, req.ToGRPCMessage().(*protoobject.HeadRequest)) - return err - }) - if err != nil { - return nil, fmt.Errorf("sending the request failed: %w", err) - } - - // verify response key - if err = internal.VerifyResponseKeyV2(pubkey, headResp); err != nil { - return nil, err - } - - // verify response structure - resp2 := new(objectV2.HeadResponse) - if err = resp2.FromGRPCMessage(headResp); err != nil { - panic(err) // can only fail on wrong type, here it's correct - } - if err := signature.VerifyServiceMessage(resp2); err != nil { - return nil, fmt.Errorf("response verification failed: %w", err) - } - - if err = checkStatus(headResp.GetMetaHeader().GetStatus()); err != nil { - return nil, err - } - - var ( - hdr *protoobject.Header - idSig *protorefs.Signature - ) - - switch v := headResp.GetBody().GetHead().(type) { - case nil: - return nil, fmt.Errorf("unexpected header type %T", v) - case *protoobject.HeadResponse_Body_ShortHeader: - if !body.GetMainOnly() { - return nil, fmt.Errorf("wrong header part type: expected %T, received %T", - (*objectV2.ShortHeader)(nil), (*objectV2.HeaderWithSignature)(nil), - ) - } - - if v == nil || v.ShortHeader == nil { - return nil, errors.New("nil short header oneof field") - } - - h := v.ShortHeader - hdr = &protoobject.Header{ - Version: h.Version, - OwnerId: h.OwnerId, - CreationEpoch: h.CreationEpoch, - PayloadLength: h.PayloadLength, - PayloadHash: h.PayloadHash, - ObjectType: h.ObjectType, - HomomorphicHash: h.HomomorphicHash, - } - case *protoobject.HeadResponse_Body_Header: - if body.GetMainOnly() { - return nil, fmt.Errorf("wrong header part type: expected %T, received %T", - (*objectV2.HeaderWithSignature)(nil), (*objectV2.ShortHeader)(nil), - ) - } - - if v == nil || v.Header == nil { - return nil, errors.New("nil header oneof field") - } - - if v.Header.Header == nil { - return nil, errors.New("missing header") - } - - hdr = v.Header.Header - idSig = v.Header.Signature - - if idSig == nil { - // TODO(@cthulhu-rider): #1387 use "const" error - return nil, errors.New("missing signature") - } - - binID := objAddr.Object().Marshal() - - var sig2 refs.Signature - if err := sig2.FromGRPCMessage(idSig); err != nil { - panic(err) // can only fail on wrong type, here it's correct - } - var sig neofscrypto.Signature - if err := sig.ReadFromV2(sig2); err != nil { - return nil, fmt.Errorf("can't read signature: %w", err) - } - - if !sig.Verify(binID) { - return nil, errors.New("invalid object ID signature") - } - case *protoobject.HeadResponse_Body_SplitInfo: - if v == nil || v.SplitInfo == nil { - return nil, errors.New("nil split info oneof field") - } - var si2 objectV2.SplitInfo - if err := si2.FromGRPCMessage(v.SplitInfo); err != nil { - panic(err) // can only fail on wrong type, here it's correct - } - si := object.NewSplitInfoFromV2(&si2) - - return nil, object.NewSplitInfoError(si) - } - - mObj := &protoobject.Object{ - Signature: idSig, - Header: hdr, - } - objv2 := new(objectV2.Object) - if err := objv2.FromGRPCMessage(mObj); err != nil { - panic(err) // can only fail on wrong type, here it's correct - } - - obj := object.NewFromV2(objv2) - obj.SetID(objAddr.Object()) - - // convert the object - return obj, nil - })) - } - - return p, nil -} - -func splitInfoResponse(info *object.SplitInfo) *objectV2.GetResponse { - resp := new(objectV2.GetResponse) - - body := new(objectV2.GetResponseBody) - resp.SetBody(body) - - body.SetObjectPart(info.ToV2()) - - return resp -} - -func splitInfoRangeResponse(info *object.SplitInfo) *objectV2.GetRangeResponse { - resp := new(objectV2.GetRangeResponse) - - body := new(objectV2.GetRangeResponseBody) - resp.SetBody(body) - - body.SetRangePart(info.ToV2()) - - return resp -} - -func setSplitInfoHeadResponse(info *object.SplitInfo, resp *objectV2.HeadResponse) { - resp.GetBody().SetHeaderPart(info.ToV2()) -} - func toHashResponse(typ refs.ChecksumType, res *getsvc.RangeHashRes) *objectV2.GetRangeHashResponse { resp := new(objectV2.GetRangeHashResponse) @@ -747,31 +169,6 @@ func toHashResponse(typ refs.ChecksumType, res *getsvc.RangeHashRes) *objectV2.G return resp } -func toFullObjectHeader(hdr *object.Object) objectV2.GetHeaderPart { - obj := hdr.ToV2() - - hs := new(objectV2.HeaderWithSignature) - hs.SetHeader(obj.GetHeader()) - hs.SetSignature(obj.GetSignature()) - - return hs -} - -func toShortObjectHeader(hdr *object.Object) objectV2.GetHeaderPart { - hdrV2 := hdr.ToV2().GetHeader() - - sh := new(objectV2.ShortHeader) - sh.SetOwnerID(hdrV2.GetOwnerID()) - sh.SetCreationEpoch(hdrV2.GetCreationEpoch()) - sh.SetPayloadLength(hdrV2.GetPayloadLength()) - sh.SetVersion(hdrV2.GetVersion()) - sh.SetObjectType(hdrV2.GetObjectType()) - sh.SetHomomorphicHash(hdrV2.GetHomomorphicHash()) - sh.SetPayloadHash(hdrV2.GetPayloadHash()) - - return sh -} - func groupAddressRequestForwarder[V any](f func(context.Context, network.Address, client.MultiAddressClient, []byte) (V, error)) func(context.Context, client.NodeInfo, client.MultiAddressClient) (V, error) { return func(ctx context.Context, info client.NodeInfo, c client.MultiAddressClient) (V, error) { var ( @@ -823,16 +220,3 @@ func checkStatus(st *protostatus.Status) error { return nil } - -func chunkToSend(global, local int, chunk []byte) []byte { - if global == local { - return chunk - } - - if local+len(chunk) <= global { - // chunk has already been sent - return nil - } - - return chunk[global-local:] -} diff --git a/pkg/services/object/server.go b/pkg/services/object/server.go index 4f870bdd2c..865d8eeeac 100644 --- a/pkg/services/object/server.go +++ b/pkg/services/object/server.go @@ -25,6 +25,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/network" aclsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/acl/v2" deletesvc "github.com/nspcc-dev/neofs-node/pkg/services/object/delete" + getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get" "github.com/nspcc-dev/neofs-node/pkg/services/object/internal" internalclient "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client" putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put" @@ -41,30 +42,17 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/user" "github.com/nspcc-dev/neofs-sdk-go/version" "google.golang.org/grpc" - "google.golang.org/protobuf/proto" ) -// GetObjectStream is an interface of NeoFS API v2 compatible object streamer. -type GetObjectStream interface { - util.ServerStream - Send(*v2object.GetResponse) error -} - -// GetObjectRangeStream is an interface of NeoFS API v2 compatible payload range streamer. -type GetObjectRangeStream interface { - util.ServerStream - Send(*v2object.GetRangeResponse) error -} - // ServiceServer is an interface of utility // serving v2 Object service. type ServiceServer interface { - Get(*v2object.GetRequest, GetObjectStream) error + Get(context.Context, getsvc.Prm) error Put(context.Context) (*putsvc.Streamer, error) - Head(context.Context, *v2object.HeadRequest) (*v2object.HeadResponse, error) + Head(context.Context, getsvc.HeadPrm) error Search(context.Context, searchsvc.Prm) error Delete(context.Context, deletesvc.Prm) error - GetRange(*v2object.GetRangeRequest, GetObjectRangeStream) error + GetRange(context.Context, getsvc.RangePrm) error GetRangeHash(context.Context, *v2object.GetRangeHashRequest) (*v2object.GetRangeHashResponse, error) } @@ -540,12 +528,21 @@ func (s *server) signHeadResponse(err error, startTime time.Time, resp *protoobj } func (s *server) makeStatusHeadResponse(startTime time.Time, err error) (*protoobject.HeadResponse, error) { + var splitErr *object.SplitInfoError + if errors.As(err, &splitErr) { + return s.signHeadResponse(err, startTime, &protoobject.HeadResponse{ + Body: &protoobject.HeadResponse_Body{ + Head: &protoobject.HeadResponse_Body_SplitInfo{ + SplitInfo: splitErr.SplitInfo().ToV2().ToGRPCMessage().(*protoobject.SplitInfo), + }, + }, + }) + } return s.signHeadResponse(err, startTime, &protoobject.HeadResponse{ MetaHeader: s.makeResponseMetaHeader(util.ToStatus(err)), }) } -// Head converts gRPC HeadRequest message and passes it to internal Object service. func (s *server) Head(ctx context.Context, req *protoobject.HeadRequest) (*protoobject.HeadResponse, error) { searchReq := new(v2object.HeadRequest) if err := searchReq.FromGRPCMessage(req); err != nil { @@ -572,16 +569,237 @@ func (s *server) Head(ctx context.Context, req *protoobject.HeadRequest) (*proto return s.makeStatusHeadResponse(startTime, eACLErr(reqInfo, err)) } - resp, err := s.srv.Head(ctx, searchReq) + var resp protoobject.HeadResponse + p, err := convertHeadPrm(s.signer, req, &resp) + if err != nil { + return s.makeStatusHeadResponse(startTime, err) + } + err = s.srv.Head(ctx, p) if err != nil { return s.makeStatusHeadResponse(startTime, err) } - if err := s.aclChecker.CheckEACL(resp.ToGRPCMessage().(*protoobject.HeadResponse), reqInfo); err != nil { + if err := s.aclChecker.CheckEACL(&resp, reqInfo); err != nil { return s.makeStatusHeadResponse(startTime, eACLErr(reqInfo, err)) } - return s.signHeadResponse(nil, startTime, resp.ToGRPCMessage().(*protoobject.HeadResponse)) + return s.signHeadResponse(nil, startTime, &resp) +} + +type headResponse struct { + short bool + dst *protoobject.HeadResponse +} + +func (x *headResponse) WriteHeader(hdr *object.Object) error { + mo := hdr.ToV2().ToGRPCMessage().(*protoobject.Object) + if x.short { + mh := mo.GetHeader() + x.dst.Body = &protoobject.HeadResponse_Body{ + Head: &protoobject.HeadResponse_Body_ShortHeader{ + ShortHeader: &protoobject.ShortHeader{ + Version: mh.GetVersion(), + CreationEpoch: mh.GetCreationEpoch(), + OwnerId: mh.GetOwnerId(), + ObjectType: mh.GetObjectType(), + PayloadLength: mh.GetPayloadLength(), + PayloadHash: mh.GetPayloadHash(), + HomomorphicHash: mh.GetHomomorphicHash(), + }, + }, + } + return nil + } + x.dst.Body = &protoobject.HeadResponse_Body{ + Head: &protoobject.HeadResponse_Body_Header{ + Header: &protoobject.HeaderWithSignature{ + Header: mo.GetHeader(), + Signature: mo.GetSignature(), + }, + }, + } + return nil +} + +// converts original request into parameters accepted by the internal handler. +// Note that the response is untouched within this call. +func convertHeadPrm(signer ecdsa.PrivateKey, req *protoobject.HeadRequest, resp *protoobject.HeadResponse) (getsvc.HeadPrm, error) { + body := req.GetBody() + ma := body.GetAddress() + if ma == nil { // includes nil body + return getsvc.HeadPrm{}, errors.New("missing object address") + } + + var addr oid.Address + var addr2 refsv2.Address + if err := addr2.FromGRPCMessage(ma); err != nil { + panic(err) + } + if err := addr.ReadFromV2(addr2); err != nil { + return getsvc.HeadPrm{}, fmt.Errorf("invalid object address: %w", err) + } + + cp, err := objutil.CommonPrmFromRequest(req) + if err != nil { + return getsvc.HeadPrm{}, err + } + + var p getsvc.HeadPrm + p.SetCommonParameters(cp) + p.WithAddress(addr) + p.WithRawFlag(body.Raw) + p.SetHeaderWriter(&headResponse{ + short: body.MainOnly, + dst: resp, + }) + if cp.LocalOnly() { + return p, nil + } + + var onceResign sync.Once + meta := req.GetMetaHeader() + bID := addr.Object().Marshal() + p.SetRequestForwarder(func(ctx context.Context, node client.NodeInfo, c client.MultiAddressClient) (*object.Object, error) { + var err error + onceResign.Do(func() { + req.MetaHeader = &protosession.RequestMetaHeader{ + // TODO: #1165 think how to set the other fields + Ttl: meta.GetTtl() - 1, // FIXME: meta can be nil + Origin: meta, + } + var req2 v2object.HeadRequest + if err := req2.FromGRPCMessage(req); err != nil { + panic(err) + } + if err = signature.SignServiceMessage(&signer, &req2); err == nil { + req = req2.ToGRPCMessage().(*protoobject.HeadRequest) + } + }) + if err != nil { + return nil, err + } + + var firstErr error + nodePub := node.PublicKey() + addrs := node.AddressGroup() + for i := range addrs { + hdr, err := getHeaderFromRemoteNode(ctx, c, addrs[i], nodePub, req, bID) + if err == nil { + hdr.SetID(addr.Object()) + return hdr, nil + } + if firstErr == nil { + firstErr = err + } + // TODO: log error + } + return nil, firstErr + }) + return p, nil +} + +func getHeaderFromRemoteNode(ctx context.Context, c client.MultiAddressClient, addr network.Address, nodePub []byte, req *protoobject.HeadRequest, + bID []byte) (*object.Object, error) { + var resp *protoobject.HeadResponse + err := c.RawForAddress(addr, func(conn *grpc.ClientConn) error { + var err error + resp, err = protoobject.NewObjectServiceClient(conn).Head(ctx, req) + return err + }) + if err != nil { + return nil, fmt.Errorf("sending the request failed: %w", err) + } + + if err := internal.VerifyResponseKeyV2(nodePub, resp); err != nil { + return nil, err + } + resp2 := new(v2object.HeadResponse) + if err := resp2.FromGRPCMessage(resp); err != nil { + panic(err) // can only fail on wrong type, here it's correct + } + if err := signature.VerifyServiceMessage(resp2); err != nil { + return nil, fmt.Errorf("response verification failed: %w", err) + } + if err := checkStatus(resp.GetMetaHeader().GetStatus()); err != nil { + return nil, err + } + + var hdr *protoobject.Header + var idSig *refs.Signature + switch v := resp.GetBody().GetHead().(type) { + case nil: + return nil, fmt.Errorf("unexpected header type %T", v) + case *protoobject.HeadResponse_Body_ShortHeader: + if !req.Body.MainOnly { + return nil, fmt.Errorf("wrong header part type: expected %T, received %T", + (*protoobject.HeadResponse_Body_Header)(nil), (*protoobject.HeadResponse_Body_ShortHeader)(nil), + ) + } + if v == nil || v.ShortHeader == nil { + return nil, errors.New("nil short header oneof field") + } + h := v.ShortHeader + hdr = &protoobject.Header{ + Version: h.Version, + OwnerId: h.OwnerId, + CreationEpoch: h.CreationEpoch, + PayloadLength: h.PayloadLength, + PayloadHash: h.PayloadHash, + ObjectType: h.ObjectType, + HomomorphicHash: h.HomomorphicHash, + } + case *protoobject.HeadResponse_Body_Header: + if req.Body.MainOnly { + return nil, fmt.Errorf("wrong header part type: expected %T, received %T", + (*protoobject.HeadResponse_Body_Header)(nil), (*protoobject.HeadResponse_Body_ShortHeader)(nil), + ) + } + if v == nil || v.Header == nil { + return nil, errors.New("nil header oneof field") + } + if v.Header.Header == nil { + return nil, errors.New("missing header") + } + if v.Header.Signature == nil { + // TODO(@cthulhu-rider): #1387 use "const" error + return nil, errors.New("missing signature") + } + + var sig2 refsv2.Signature + if err := sig2.FromGRPCMessage(v.Header.Signature); err != nil { + panic(err) // can only fail on wrong type, here it's correct + } + var sig neofscrypto.Signature + if err := sig.ReadFromV2(sig2); err != nil { + return nil, fmt.Errorf("can't read signature: %w", err) + } + if !sig.Verify(bID) { + return nil, errors.New("invalid object ID signature") + } + + hdr = v.Header.Header + idSig = v.Header.Signature + case *protoobject.HeadResponse_Body_SplitInfo: + if v == nil || v.SplitInfo == nil { + return nil, errors.New("nil split info oneof field") + } + var si2 v2object.SplitInfo + if err := si2.FromGRPCMessage(v.SplitInfo); err != nil { + panic(err) // can only fail on wrong type, here it's correct + } + si := object.NewSplitInfoFromV2(&si2) + return nil, object.NewSplitInfoError(si) + } + + mObj := &protoobject.Object{ + Signature: idSig, + Header: hdr, + } + objv2 := new(v2object.Object) + if err := objv2.FromGRPCMessage(mObj); err != nil { + panic(err) // can only fail on wrong type, here it's correct + } + return object.NewFromV2(objv2), nil } func (s *server) signHashResponse(startTime time.Time, err error, resp *protoobject.GetRangeHashResponse) (*protoobject.GetRangeHashResponse, error) { @@ -637,6 +855,17 @@ func (s *server) sendGetResponse(stream protoobject.ObjectService_GetServer, res } func (s *server) sendStatusGetResponse(startTime time.Time, stream protoobject.ObjectService_GetServer, err error) error { + var splitErr *object.SplitInfoError + if errors.As(err, &splitErr) { + s.pushOpExecResult(stat.MethodObjectGet, err, startTime) + return s.sendGetResponse(stream, &protoobject.GetResponse{ + Body: &protoobject.GetResponse_Body{ + ObjectPart: &protoobject.GetResponse_Body_SplitInfo{ + SplitInfo: splitErr.SplitInfo().ToV2().ToGRPCMessage().(*protoobject.SplitInfo), + }, + }, + }) + } sendErr := s.sendGetResponse(stream, &protoobject.GetResponse{ MetaHeader: s.makeResponseMetaHeader(util.ToStatus(err)), }) @@ -644,42 +873,46 @@ func (s *server) sendStatusGetResponse(startTime time.Time, stream protoobject.O return sendErr } -type getStreamerV2 struct { - protoobject.ObjectService_GetServer +type getStream struct { + base protoobject.ObjectService_GetServer srv *server reqInfo aclsvc.RequestInfo } -func (s *getStreamerV2) Send(resp *v2object.GetResponse) error { - r := resp.ToGRPCMessage().(*protoobject.GetResponse) - switch v := r.GetBody().GetObjectPart().(type) { - case *protoobject.GetResponse_Body_Init_: - if err := s.srv.aclChecker.CheckEACL(r, s.reqInfo); err != nil { - return eACLErr(s.reqInfo, err) - } - case *protoobject.GetResponse_Body_Chunk: - for buf := bytes.NewBuffer(v.GetChunk()); buf.Len() > 0; { - newResp := &protoobject.GetResponse{ - Body: &protoobject.GetResponse_Body{ - ObjectPart: &protoobject.GetResponse_Body_Chunk{ - Chunk: buf.Next(maxRespDataChunkSize), - }, +func (s *getStream) WriteHeader(hdr *object.Object) error { + mo := hdr.ToV2().ToGRPCMessage().(*protoobject.Object) + resp := &protoobject.GetResponse{ + Body: &protoobject.GetResponse_Body{ + ObjectPart: &protoobject.GetResponse_Body_Init_{Init: &protoobject.GetResponse_Body_Init{ + ObjectId: mo.ObjectId, + Signature: mo.Signature, + Header: mo.Header, + }}, + }, + } + if err := s.srv.aclChecker.CheckEACL(resp, s.reqInfo); err != nil { + return eACLErr(s.reqInfo, err) + } + return s.srv.sendGetResponse(s.base, resp) +} + +func (s *getStream) WriteChunk(chunk []byte) error { + for buf := bytes.NewBuffer(chunk); buf.Len() > 0; { + newResp := &protoobject.GetResponse{ + Body: &protoobject.GetResponse_Body{ + ObjectPart: &protoobject.GetResponse_Body_Chunk{ + Chunk: buf.Next(maxRespDataChunkSize), }, - MetaHeader: proto.Clone(r.GetMetaHeader()).(*protosession.ResponseMetaHeader), // TODO: can go w/o cloning? - VerifyHeader: proto.Clone(r.GetVerifyHeader()).(*protosession.ResponseVerificationHeader), - } - if err := s.srv.sendGetResponse(s.ObjectService_GetServer, newResp); err != nil { - return err - } + }, + } + if err := s.srv.sendGetResponse(s.base, newResp); err != nil { + return err } - s.srv.metrics.AddGetPayload(len(v.Chunk)) - return nil } - return s.srv.sendGetResponse(s.ObjectService_GetServer, r) + s.srv.metrics.AddGetPayload(len(chunk)) + return nil } -// Get converts gRPC GetRequest message and server-side stream and overtakes its data -// to gRPC stream. func (s *server) Get(req *protoobject.GetRequest, gStream protoobject.ObjectService_GetServer) error { getReq := new(v2object.GetRequest) if err := getReq.FromGRPCMessage(req); err != nil { @@ -706,14 +939,15 @@ func (s *server) Get(req *protoobject.GetRequest, gStream protoobject.ObjectServ return s.sendStatusGetResponse(startTime, gStream, eACLErr(reqInfo, err)) } - err = s.srv.Get( - getReq, - &getStreamerV2{ - ObjectService_GetServer: gStream, - srv: s, - reqInfo: reqInfo, - }, - ) + p, err := convertGetPrm(s.signer, req, &getStream{ + base: gStream, + srv: s, + reqInfo: reqInfo, + }) + if err != nil { + return s.sendStatusGetResponse(startTime, gStream, err) + } + err = s.srv.Get(gStream.Context(), p) if err != nil { return s.sendStatusGetResponse(startTime, gStream, err) } @@ -721,12 +955,195 @@ func (s *server) Get(req *protoobject.GetRequest, gStream protoobject.ObjectServ return nil } +// converts original request into parameters accepted by the internal handler. +// Note that the stream is untouched within this call, errors are not reported +// into it. +func convertGetPrm(signer ecdsa.PrivateKey, req *protoobject.GetRequest, stream *getStream) (getsvc.Prm, error) { + body := req.GetBody() + ma := body.GetAddress() + if ma == nil { // includes nil body + return getsvc.Prm{}, errors.New("missing object address") + } + + var addr oid.Address + var addr2 refsv2.Address + if err := addr2.FromGRPCMessage(ma); err != nil { + panic(err) + } + if err := addr.ReadFromV2(addr2); err != nil { + return getsvc.Prm{}, fmt.Errorf("invalid object address: %w", err) + } + + cp, err := objutil.CommonPrmFromRequest(req) + if err != nil { + return getsvc.Prm{}, err + } + + var p getsvc.Prm + p.SetCommonParameters(cp) + p.WithAddress(addr) + p.WithRawFlag(body.Raw) + p.SetObjectWriter(stream) + if cp.LocalOnly() { + return p, nil + } + + var onceResign sync.Once + var onceHdr sync.Once + var respondedPayload int + meta := req.GetMetaHeader() + p.SetRequestForwarder(func(ctx context.Context, node client.NodeInfo, c client.MultiAddressClient) (*object.Object, error) { + var err error + onceResign.Do(func() { + req.MetaHeader = &protosession.RequestMetaHeader{ + // TODO: #1165 think how to set the other fields + Ttl: meta.GetTtl() - 1, // FIXME: meta can be nil + Origin: meta, + } + var req2 v2object.GetRequest + if err := req2.FromGRPCMessage(req); err != nil { + panic(err) + } + if err = signature.SignServiceMessage(&signer, &req2); err == nil { + req = req2.ToGRPCMessage().(*protoobject.GetRequest) + } + }) + if err != nil { + return nil, err + } + + var firstErr error + nodePub := node.PublicKey() + addrs := node.AddressGroup() + for i := range addrs { + err := continueGetFromRemoteNode(ctx, c, addrs[i], nodePub, req, stream, &onceHdr, &respondedPayload) + if errors.Is(err, io.EOF) { + return nil, nil + } + if firstErr == nil { + firstErr = err + } + // TODO: log error + } + return nil, firstErr + }) + return p, nil +} + +func continueGetFromRemoteNode(ctx context.Context, c client.MultiAddressClient, addr network.Address, nodePub []byte, req *protoobject.GetRequest, + stream *getStream, onceHdr *sync.Once, respondedPayload *int) error { + var getStream protoobject.ObjectService_GetClient + err := c.RawForAddress(addr, func(conn *grpc.ClientConn) error { + var err error + // FIXME: context should be cancelled on return from upper func + getStream, err = protoobject.NewObjectServiceClient(conn).Get(ctx, req) + return err + }) + if err != nil { + return fmt.Errorf("stream opening failed: %w", err) + } + + var headWas bool + var readPayload int + for { + resp, err := getStream.Recv() + if err != nil { + if errors.Is(err, io.EOF) { + if !headWas { + return io.ErrUnexpectedEOF + } + return io.EOF + } + internalclient.ReportError(c, err) + return fmt.Errorf("reading the response failed: %w", err) + } + + if err = internal.VerifyResponseKeyV2(nodePub, resp); err != nil { + return err + } + resp2 := new(v2object.GetResponse) + if err := resp2.FromGRPCMessage(resp); err != nil { + panic(err) // can only fail on wrong type, here it's correct + } + if err := signature.VerifyServiceMessage(resp2); err != nil { + return fmt.Errorf("response verification failed: %w", err) + } + if err := checkStatus(resp.GetMetaHeader().GetStatus()); err != nil { + return err + } + + switch v := resp.GetBody().GetObjectPart().(type) { + default: + return fmt.Errorf("unexpected object part %T", v) + case *protoobject.GetResponse_Body_Init_: + if headWas { + return errors.New("incorrect message sequence") + } + headWas = true + if v == nil || v.Init == nil { + return errors.New("nil header oneof field") + } + mo := &protoobject.Object{ + ObjectId: v.Init.ObjectId, + Signature: v.Init.Signature, + Header: v.Init.Header, + } + obj := new(v2object.Object) + if err := obj.FromGRPCMessage(mo); err != nil { + panic(err) // can only fail on wrong type, here it's correct + } + onceHdr.Do(func() { + err = stream.WriteHeader(object.NewFromV2(obj)) + }) + if err != nil { + return fmt.Errorf("could not write object header in Get forwarder: %w", err) + } + case *protoobject.GetResponse_Body_Chunk: + if !headWas { + return errors.New("incorrect message sequence") + } + fullChunk := v.GetChunk() + respChunk := chunkToSend(*respondedPayload, readPayload, fullChunk) + if len(respChunk) == 0 { + readPayload += len(fullChunk) + continue + } + if err := stream.WriteChunk(respChunk); err != nil { + return fmt.Errorf("could not write object chunk in Get forwarder: %w", err) + } + readPayload += len(fullChunk) + *respondedPayload += len(respChunk) + case *protoobject.GetResponse_Body_SplitInfo: + if v == nil || v.SplitInfo == nil { + return errors.New("nil split info oneof field") + } + var si2 v2object.SplitInfo + if err := si2.FromGRPCMessage(v.SplitInfo); err != nil { + panic(err) // can only fail on wrong type, here it's correct + } + si := object.NewSplitInfoFromV2(&si2) + return object.NewSplitInfoError(si) + } + } +} + func (s *server) sendRangeResponse(stream protoobject.ObjectService_GetRangeServer, resp *protoobject.GetRangeResponse) error { resp = util.SignResponse(&s.signer, resp, v2object.GetRangeResponse{}) return stream.Send(resp) } func (s *server) sendStatusRangeResponse(startTime time.Time, stream protoobject.ObjectService_GetRangeServer, err error) error { + var splitErr *object.SplitInfoError + if errors.As(err, &splitErr) { + s.pushOpExecResult(stat.MethodObjectRange, err, startTime) + return s.sendRangeResponse(stream, &protoobject.GetRangeResponse{ + Body: &protoobject.GetRangeResponse_Body{ + RangePart: &protoobject.GetRangeResponse_Body_SplitInfo{ + SplitInfo: splitErr.SplitInfo().ToV2().ToGRPCMessage().(*protoobject.SplitInfo), + }, + }, + }) + } sendErr := s.sendRangeResponse(stream, &protoobject.GetRangeResponse{ MetaHeader: s.makeResponseMetaHeader(util.ToStatus(err)), }) @@ -734,45 +1151,33 @@ func (s *server) sendStatusRangeResponse(startTime time.Time, stream protoobject return sendErr } -type getRangeStreamerV2 struct { - protoobject.ObjectService_GetRangeServer +type rangeStream struct { + base protoobject.ObjectService_GetRangeServer srv *server reqInfo aclsvc.RequestInfo } -func (s *getRangeStreamerV2) Send(resp *v2object.GetRangeResponse) error { - r := resp.ToGRPCMessage().(*protoobject.GetRangeResponse) - v, ok := r.GetBody().GetRangePart().(*protoobject.GetRangeResponse_Body_Chunk) - if !ok { - if err := s.srv.aclChecker.CheckEACL(r, s.reqInfo); err != nil { - return eACLErr(s.reqInfo, err) - } - return s.srv.sendRangeResponse(s.ObjectService_GetRangeServer, r) - } - for buf := bytes.NewBuffer(v.GetChunk()); buf.Len() > 0; { +func (s *rangeStream) WriteChunk(chunk []byte) error { + for buf := bytes.NewBuffer(chunk); buf.Len() > 0; { newResp := &protoobject.GetRangeResponse{ Body: &protoobject.GetRangeResponse_Body{ RangePart: &protoobject.GetRangeResponse_Body_Chunk{ Chunk: buf.Next(maxRespDataChunkSize), }, }, - MetaHeader: proto.Clone(r.GetMetaHeader()).(*protosession.ResponseMetaHeader), // TODO: can go w/o cloning? - VerifyHeader: proto.Clone(r.GetVerifyHeader()).(*protosession.ResponseVerificationHeader), } // TODO: do not check response multiple times // TODO: why check it at all? if err := s.srv.aclChecker.CheckEACL(newResp, s.reqInfo); err != nil { return eACLErr(s.reqInfo, err) } - if err := s.srv.sendRangeResponse(s.ObjectService_GetRangeServer, newResp); err != nil { + if err := s.srv.sendRangeResponse(s.base, newResp); err != nil { return err } } return nil } -// GetRange converts gRPC GetRangeRequest message and server-side stream and overtakes its data -// to gRPC stream. func (s *server) GetRange(req *protoobject.GetRangeRequest, gStream protoobject.ObjectService_GetRangeServer) error { getRngReq := new(v2object.GetRangeRequest) if err := getRngReq.FromGRPCMessage(req); err != nil { @@ -799,14 +1204,15 @@ func (s *server) GetRange(req *protoobject.GetRangeRequest, gStream protoobject. return s.sendStatusRangeResponse(startTime, gStream, eACLErr(reqInfo, err)) } - err = s.srv.GetRange( - getRngReq, - &getRangeStreamerV2{ - ObjectService_GetRangeServer: gStream, - srv: s, - reqInfo: reqInfo, - }, - ) + p, err := convertRangePrm(s.signer, req, &rangeStream{ + base: gStream, + srv: s, + reqInfo: reqInfo, + }) + if err != nil { + return s.sendStatusRangeResponse(startTime, gStream, err) + } + err = s.srv.GetRange(gStream.Context(), p) if err != nil { return s.sendStatusRangeResponse(startTime, gStream, err) } @@ -814,6 +1220,159 @@ func (s *server) GetRange(req *protoobject.GetRangeRequest, gStream protoobject. return nil } +// converts original request into parameters accepted by the internal handler. +// Note that the stream is untouched within this call, errors are not reported +// into it. +func convertRangePrm(signer ecdsa.PrivateKey, req *protoobject.GetRangeRequest, stream *rangeStream) (getsvc.RangePrm, error) { + body := req.GetBody() + ma := body.GetAddress() + if ma == nil { // includes nil body + return getsvc.RangePrm{}, errors.New("missing object address") + } + + var addr oid.Address + var addr2 refsv2.Address + if err := addr2.FromGRPCMessage(ma); err != nil { + panic(err) + } + if err := addr.ReadFromV2(addr2); err != nil { + return getsvc.RangePrm{}, fmt.Errorf("invalid object address: %w", err) + } + + rln := body.Range.GetLength() + if rln == 0 { // includes nil range + return getsvc.RangePrm{}, errors.New("zero range length") + } + if body.Range.Offset+rln <= body.Range.Offset { + return getsvc.RangePrm{}, errors.New("range overflow") + } + + cp, err := objutil.CommonPrmFromRequest(req) + if err != nil { + return getsvc.RangePrm{}, err + } + + var p getsvc.RangePrm + p.SetCommonParameters(cp) + p.WithAddress(addr) + p.WithRawFlag(body.Raw) + p.SetChunkWriter(stream) + var rng object.Range + rng.SetOffset(body.Range.Offset) + rng.SetLength(rln) + p.SetRange(&rng) + if cp.LocalOnly() { + return p, nil + } + + var onceResign sync.Once + var respondedPayload int + meta := req.GetMetaHeader() + p.SetRequestForwarder(func(ctx context.Context, node client.NodeInfo, c client.MultiAddressClient) (*object.Object, error) { + var err error + onceResign.Do(func() { + req.MetaHeader = &protosession.RequestMetaHeader{ + // TODO: #1165 think how to set the other fields + Ttl: meta.GetTtl() - 1, // FIXME: meta can be nil + Origin: meta, + } + var req2 v2object.GetRangeRequest + if err := req2.FromGRPCMessage(req); err != nil { + panic(err) + } + if err = signature.SignServiceMessage(&signer, &req2); err == nil { + req = req2.ToGRPCMessage().(*protoobject.GetRangeRequest) + } + }) + if err != nil { + return nil, err + } + + var firstErr error + nodePub := node.PublicKey() + addrs := node.AddressGroup() + for i := range addrs { + err := continueRangeFromRemoteNode(ctx, c, addrs[i], nodePub, req, stream, &respondedPayload) + if errors.Is(err, io.EOF) { + return nil, nil + } + if firstErr == nil { + firstErr = err + } + // TODO: log error + } + return nil, firstErr + }) + return p, nil +} + +func continueRangeFromRemoteNode(ctx context.Context, c client.MultiAddressClient, addr network.Address, nodePub []byte, req *protoobject.GetRangeRequest, + stream *rangeStream, respondedPayload *int) error { + var rangeStream protoobject.ObjectService_GetRangeClient + err := c.RawForAddress(addr, func(conn *grpc.ClientConn) error { + var err error + // FIXME: context should be cancelled on return from upper func + rangeStream, err = protoobject.NewObjectServiceClient(conn).GetRange(ctx, req) + return err + }) + if err != nil { + return fmt.Errorf("stream opening failed: %w", err) + } + + var readPayload int + for { + resp, err := rangeStream.Recv() + if err != nil { + if errors.Is(err, io.EOF) { + return io.EOF + } + internalclient.ReportError(c, err) + return fmt.Errorf("reading the response failed: %w", err) + } + + if err = internal.VerifyResponseKeyV2(nodePub, resp); err != nil { + return err + } + resp2 := new(v2object.GetRangeResponse) + if err := resp2.FromGRPCMessage(resp); err != nil { + panic(err) // can only fail on wrong type, here it's correct + } + if err := signature.VerifyServiceMessage(resp2); err != nil { + return fmt.Errorf("response verification failed: %w", err) + } + if err := checkStatus(resp.GetMetaHeader().GetStatus()); err != nil { + return err + } + + switch v := resp.GetBody().GetRangePart().(type) { + default: + return fmt.Errorf("unexpected range type %T", v) + case *protoobject.GetRangeResponse_Body_Chunk: + fullChunk := v.GetChunk() + respChunk := chunkToSend(*respondedPayload, readPayload, fullChunk) + if len(respChunk) == 0 { + readPayload += len(fullChunk) + continue + } + if err := stream.WriteChunk(respChunk); err != nil { + return fmt.Errorf("could not write object chunk in Get forwarder: %w", err) + } + readPayload += len(fullChunk) + *respondedPayload += len(respChunk) + case *protoobject.GetRangeResponse_Body_SplitInfo: + if v == nil || v.SplitInfo == nil { + return errors.New("nil split info oneof field") + } + var si2 v2object.SplitInfo + if err := si2.FromGRPCMessage(v.SplitInfo); err != nil { + panic(err) // can only fail on wrong type, here it's correct + } + si := object.NewSplitInfoFromV2(&si2) + return object.NewSplitInfoError(si) + } + } +} + func (s *server) sendSearchResponse(stream protoobject.ObjectService_SearchServer, resp *protoobject.SearchResponse) error { resp = util.SignResponse(&s.signer, resp, v2object.SearchResponse{}) return stream.Send(resp) @@ -897,7 +1456,7 @@ func (s *server) Search(req *protoobject.SearchRequest, gStream protoobject.Obje reqInfo: reqInfo, }) if err != nil { - return s.sendStatusSearchResponse(startTime, err, gStream) + return s.sendStatusSearchResponse(startTime, gStream, err) } err = s.srv.Search(gStream.Context(), p) if err != nil { @@ -1302,3 +1861,14 @@ func checkStatus(st *protostatus.Status) error { return nil } + +func chunkToSend(global, local int, chunk []byte) []byte { + if global == local { + return chunk + } + if local+len(chunk) <= global { + // chunk has already been sent + return nil + } + return chunk[global-local:] +} diff --git a/pkg/services/object/server_test.go b/pkg/services/object/server_test.go index 761e688d75..80f7a56faa 100644 --- a/pkg/services/object/server_test.go +++ b/pkg/services/object/server_test.go @@ -17,9 +17,9 @@ import ( refs "github.com/nspcc-dev/neofs-api-go/v2/refs/grpc" objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object" . "github.com/nspcc-dev/neofs-node/pkg/services/object" - objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object" v2 "github.com/nspcc-dev/neofs-node/pkg/services/object/acl/v2" deletesvc "github.com/nspcc-dev/neofs-node/pkg/services/object/delete" + getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get" putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put" searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" @@ -44,7 +44,7 @@ func randECDSAPrivateKey(tb testing.TB) *ecdsa.PrivateKey { type noCallObjectService struct{} -func (x noCallObjectService) Get(*objectV2.GetRequest, objectSvc.GetObjectStream) error { +func (x noCallObjectService) Get(context.Context, getsvc.Prm) error { panic("must not be called") } @@ -52,7 +52,7 @@ func (x noCallObjectService) Put(context.Context) (*putsvc.Streamer, error) { panic("must not be called") } -func (x noCallObjectService) Head(context.Context, *objectV2.HeadRequest) (*objectV2.HeadResponse, error) { +func (x noCallObjectService) Head(context.Context, getsvc.HeadPrm) error { panic("must not be called") } @@ -64,7 +64,7 @@ func (x noCallObjectService) Delete(context.Context, deletesvc.Prm) error { panic("must not be called") } -func (x noCallObjectService) GetRange(*objectV2.GetRangeRequest, objectSvc.GetObjectRangeStream) error { +func (x noCallObjectService) GetRange(context.Context, getsvc.RangePrm) error { panic("must not be called") } diff --git a/pkg/services/util/server.go b/pkg/services/util/server.go deleted file mode 100644 index 83ab323f71..0000000000 --- a/pkg/services/util/server.go +++ /dev/null @@ -1,10 +0,0 @@ -package util - -import ( - "context" -) - -// ServerStream is an interface of server-side stream v2. -type ServerStream interface { - Context() context.Context -}