Skip to content

Commit

Permalink
Merge pull request #649 from onflow/gregor/subscribe-opts
Browse files Browse the repository at this point in the history
  • Loading branch information
turbolent authored May 15, 2024
2 parents 3f1aac8 + 40ed635 commit 2e8cb25
Show file tree
Hide file tree
Showing 6 changed files with 849 additions and 36 deletions.
18 changes: 16 additions & 2 deletions access/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
"github.com/onflow/flow-go-sdk"
)

//go:generate go run github.com/vektra/mockery/cmd/mockery --name Client --structname Client --output mocks

type Client interface {
// Ping is used to check if the access node is alive and healthy.
Ping(ctx context.Context) error
Expand Down Expand Up @@ -116,11 +118,23 @@ type Client interface {
SubscribeExecutionDataByBlockHeight(ctx context.Context, startHeight uint64) (<-chan flow.ExecutionDataStreamResponse, <-chan error, error)

// SubscribeEventsByBlockID subscribes to events starting at the given block ID.
SubscribeEventsByBlockID(ctx context.Context, startBlockID flow.Identifier, filter flow.EventFilter) (<-chan flow.BlockEvents, <-chan error, error)
SubscribeEventsByBlockID(ctx context.Context, startBlockID flow.Identifier, filter flow.EventFilter, opts ...SubscribeOption) (<-chan flow.BlockEvents, <-chan error, error)

// SubscribeEventsByBlockHeight subscribes to events starting at the given block height.
SubscribeEventsByBlockHeight(ctx context.Context, startHeight uint64, filter flow.EventFilter) (<-chan flow.BlockEvents, <-chan error, error)
SubscribeEventsByBlockHeight(ctx context.Context, startHeight uint64, filter flow.EventFilter, opts ...SubscribeOption) (<-chan flow.BlockEvents, <-chan error, error)

// Close stops the client connection to the access node.
Close() error
}

type SubscribeOption func(*SubscribeConfig)

type SubscribeConfig struct {
HeartbeatInterval uint64
}

func WithHeartbeatInterval(interval uint64) SubscribeOption {
return func(config *SubscribeConfig) {
config.HeartbeatInterval = interval
}
}
48 changes: 41 additions & 7 deletions access/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ package grpc
import (
"context"

"github.com/onflow/flow-go-sdk/access"

jsoncdc "github.com/onflow/cadence/encoding/json"
"google.golang.org/grpc"

Expand Down Expand Up @@ -94,9 +96,11 @@ func NewClient(host string, opts ...ClientOption) (*Client, error) {
return &Client{grpc: client}, nil
}

var _ access.Client = &Client{}

// Client implements all common gRPC methods providing a network agnostic API.
type Client struct {
grpc *BaseClient
grpc *BaseClient
}

func (c *Client) Ping(ctx context.Context) error {
Expand Down Expand Up @@ -210,22 +214,52 @@ func (c *Client) GetExecutionDataByBlockID(ctx context.Context, blockID flow.Ide
return c.grpc.GetExecutionDataByBlockID(ctx, blockID)
}

func (c *Client) SubscribeExecutionDataByBlockID(ctx context.Context, startBlockID flow.Identifier) (<-chan flow.ExecutionDataStreamResponse, <-chan error, error) {
func (c *Client) SubscribeExecutionDataByBlockID(
ctx context.Context,
startBlockID flow.Identifier,
) (<-chan flow.ExecutionDataStreamResponse, <-chan error, error) {
return c.grpc.SubscribeExecutionDataByBlockID(ctx, startBlockID)
}

func (c *Client) SubscribeExecutionDataByBlockHeight(ctx context.Context, startHeight uint64) (<-chan flow.ExecutionDataStreamResponse, <-chan error, error) {
func (c *Client) SubscribeExecutionDataByBlockHeight(
ctx context.Context,
startHeight uint64,
) (<-chan flow.ExecutionDataStreamResponse, <-chan error, error) {
return c.grpc.SubscribeExecutionDataByBlockHeight(ctx, startHeight)
}

func (c *Client) SubscribeEventsByBlockID(ctx context.Context, startBlockID flow.Identifier, filter flow.EventFilter) (<-chan flow.BlockEvents, <-chan error, error) {
return c.grpc.SubscribeEventsByBlockID(ctx, startBlockID, filter)
func (c *Client) SubscribeEventsByBlockID(
ctx context.Context,
startBlockID flow.Identifier,
filter flow.EventFilter,
opts ...access.SubscribeOption,
) (<-chan flow.BlockEvents, <-chan error, error) {
conf := convertSubscribeOptions(opts...)
return c.grpc.SubscribeEventsByBlockID(ctx, startBlockID, filter, WithHeartbeatInterval(conf.heartbeatInterval))
}

func (c *Client) SubscribeEventsByBlockHeight(ctx context.Context, startHeight uint64, filter flow.EventFilter) (<-chan flow.BlockEvents, <-chan error, error) {
return c.grpc.SubscribeEventsByBlockHeight(ctx, startHeight, filter)
func (c *Client) SubscribeEventsByBlockHeight(
ctx context.Context,
startHeight uint64,
filter flow.EventFilter,
opts ...access.SubscribeOption,
) (<-chan flow.BlockEvents, <-chan error, error) {
conf := convertSubscribeOptions(opts...)
return c.grpc.SubscribeEventsByBlockHeight(ctx, startHeight, filter, WithHeartbeatInterval(conf.heartbeatInterval))
}

func (c *Client) Close() error {
return c.grpc.Close()
}

// convertSubscribeOptions creates the default subscribe config and applies all the provided options
func convertSubscribeOptions(opts ...access.SubscribeOption) *SubscribeConfig {
subsConf := DefaultSubscribeConfig()
conf := &access.SubscribeConfig{
HeartbeatInterval: subsConf.heartbeatInterval,
}
for _, opt := range opts {
opt(conf)
}
return subsConf
}
8 changes: 6 additions & 2 deletions access/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"context"
"fmt"

"github.com/onflow/flow-go-sdk/access"

"github.com/onflow/cadence"
jsoncdc "github.com/onflow/cadence/encoding/json"

Expand Down Expand Up @@ -76,6 +78,8 @@ func NewClient(host string, opts ...ClientOption) (*Client, error) {
return &Client{client}, nil
}

var _ access.Client = &Client{}

// Client implements all common HTTP methods providing a network agnostic API.
type Client struct {
httpClient *BaseClient
Expand Down Expand Up @@ -273,11 +277,11 @@ func (c *Client) SubscribeExecutionDataByBlockHeight(ctx context.Context, startH
return nil, nil, fmt.Errorf("not implemented")
}

func (c *Client) SubscribeEventsByBlockID(ctx context.Context, startBlockID flow.Identifier, filter flow.EventFilter) (<-chan flow.BlockEvents, <-chan error, error) {
func (c *Client) SubscribeEventsByBlockID(ctx context.Context, startBlockID flow.Identifier, filter flow.EventFilter, opts ...access.SubscribeOption) (<-chan flow.BlockEvents, <-chan error, error) {
return nil, nil, fmt.Errorf("not implemented")
}

func (c *Client) SubscribeEventsByBlockHeight(ctx context.Context, startHeight uint64, filter flow.EventFilter) (<-chan flow.BlockEvents, <-chan error, error) {
func (c *Client) SubscribeEventsByBlockHeight(ctx context.Context, startHeight uint64, filter flow.EventFilter, opts ...access.SubscribeOption) (<-chan flow.BlockEvents, <-chan error, error) {
return nil, nil, fmt.Errorf("not implemented")
}

Expand Down
Loading

0 comments on commit 2e8cb25

Please sign in to comment.