Skip to content

Commit

Permalink
lbcdblocknotify: reorganize the code with a few updates
Browse files Browse the repository at this point in the history
1. Fixed a bug, which reads certs even TLS is disabled

2. Persists Stratum TCP connection with auto-reconnect.
   (retry backoff increases from 1s to 60s maximum)

3. Stratum update jobs on previous notifications are canceled
   when a new notification arrives.

   Usually, the jobs are so short and completed immediately.
   However, if the Stratum connection is broken, this prevents
   the bridge from accumulating stale jobs.
  • Loading branch information
roylee17 committed Oct 17, 2022
1 parent 6728bf4 commit f513fca
Show file tree
Hide file tree
Showing 6 changed files with 364 additions and 119 deletions.
39 changes: 28 additions & 11 deletions rpcclient/examples/lbcdblocknotify/README.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
# lbcd Websockets Example
# lbcdbloknotify

This example shows how to use the rpcclient package to connect to a btcd RPC
server using TLS-secured websockets, register for block connected and block
disconnected notifications, and get the current block count.
This bridge program subscribes to lbcd's notifications over websockets using the rpcclient package.
Users can specify supported actions upon receiving this notifications.

## Running the Example
## Building(or Running) the Program

The first step is to clone the lbcd package:
Clone the lbcd package:

```bash
$ git clone github.com/lbryio/lbcd
$ cd lbcd/rpcclient/examples

# build the program
$ go build .

# or directly run it (build implicitly behind the scene)
$ go run .
```

Display available options:
Expand All @@ -30,19 +36,30 @@ $ go run . -h
-stratumpass string
Stratum server password (default "password")
-quiet
Do not print logs
Do not print periodic logs
```

Start the program:
Running the program:

```bash
$ go run . -stratumpass <STRATUM PASSWD> -rpcuser <RPC USERNAME> -rpcpass <RPC PASSWD>
# Send stratum mining.update_block mesage upon receving block connected notifiations.
$ go run . -rpcuser <RPC USERNAME> -rpcpass <RPC PASSWD> --notls -stratum <STRATUM SERVER> -stratumpass <STRATUM PASSWD>

2022/01/10 23:16:21 NotifyBlocks: Registration Complete
2022/01/10 23:16:21 Block count: 1093112
2022/01/10 23:16:21 Current block count: 1093112
...

# Execute a custome command (with blockhash) upon receving block connected notifiations.
$ go run . -rpcuser <RPC USERNAME> -rpcpass <RPC PASSWD> --notls -run "echo %s"
```

## Notes

* Stratum TCP connection is persisted with auto-reconnect. (retry backoff increases from 1s to 60s maximum)

* Stratum update_block jobs on previous notifications are canceled when a new notification arrives.
Usually, the jobs are so short and completed immediately. However, if the Stratum connection is broken, this
prevents the bridge from accumulating stale jobs.

## License

This example is licensed under the [copyfree](http://copyfree.org) ISC License.
20 changes: 20 additions & 0 deletions rpcclient/examples/lbcdblocknotify/adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package main

import (
"github.com/lbryio/lbcd/wire"
"github.com/lbryio/lbcutil"
)

type eventBlockConected struct {
height int32
header *wire.BlockHeader
txns []*lbcutil.Tx
}

type adapter struct {
*bridge
}

func (a *adapter) onFilteredBlockConnected(height int32, header *wire.BlockHeader, txns []*lbcutil.Tx) {
a.eventCh <- &eventBlockConected{height, header, txns}
}
172 changes: 172 additions & 0 deletions rpcclient/examples/lbcdblocknotify/bridge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package main

import (
"context"
"errors"
"fmt"
"log"
"net"
"os"
"os/exec"
"strings"
"sync"
"syscall"
"time"
)

type bridge struct {
ctx context.Context

prevJobContext context.Context
prevJobCancel context.CancelFunc

eventCh chan interface{}
errorc chan error
wg sync.WaitGroup

stratum *stratumClient

customCmd string
}

func newBridge(stratumServer, stratumPass, coinid string) *bridge {

s := &bridge{
ctx: context.Background(),
eventCh: make(chan interface{}),
errorc: make(chan error),
}

if len(stratumServer) > 0 {
s.stratum = newStratumClient(stratumServer, stratumPass, coinid)
}

return s
}

func (b *bridge) start() {

if b.stratum != nil {
backoff := time.Second
for {
err := b.stratum.dial()
if err == nil {
break
}
log.Printf("WARN: stratum.dial() error: %s, retry in %s", err, backoff)
time.Sleep(backoff)
if backoff < 60*time.Second {
backoff += time.Second
}
}
}

for e := range b.eventCh {
switch e := e.(type) {
case *eventBlockConected:
b.handleFilteredBlockConnected(e)
default:
b.errorc <- fmt.Errorf("unknown event type: %T", e)
return
}
}
}

func (b *bridge) handleFilteredBlockConnected(e *eventBlockConected) {

if !*quiet {
log.Printf("Block connected: %s (%d) %v", e.header.BlockHash(), e.height, e.header.Timestamp)
}

hash := e.header.BlockHash().String()
height := e.height

// Cancel jobs on previous block. It's safe if they are already done.
if b.prevJobContext != nil {
select {
case <-b.prevJobContext.Done():
log.Printf("prev one canceled")
default:
b.prevJobCancel()
}
}

// Wait until all previous jobs are done or canceled.
b.wg.Wait()

// Create and save cancelable subcontext for new jobs.
ctx, cancel := context.WithCancel(b.ctx)
b.prevJobContext, b.prevJobCancel = ctx, cancel

if len(b.customCmd) > 0 {
go b.execCustomCommand(ctx, hash, height)
}

// Send stratum update block message
if b.stratum != nil {
go b.stratumUpdateBlock(ctx, hash, height)
}
}

func (s *bridge) stratumUpdateBlock(ctx context.Context, hash string, height int32) {
s.wg.Add(1)
defer s.wg.Done()

backoff := time.Second
retry := func(err error) {
if backoff < 60*time.Second {
backoff += time.Second
}
log.Printf("WARN: stratum.send() on block %d error: %s", height, err)
time.Sleep(backoff)
s.stratum.dial()
}

msg := stratumUpdateBlockMsg(*stratumPass, *coinid, hash)

for {
switch err := s.stratum.send(ctx, msg); {
case err == nil:
return
case errors.Is(err, context.Canceled):
log.Printf("INFO: stratum.send() on block %d: %s.", height, err)
return
case errors.Is(err, syscall.EPIPE):
errClose := s.stratum.conn.Close()
if errClose != nil {
log.Printf("WARN: stratum.conn.Close() on block %d: %s.", height, errClose)
}
retry(err)
case errors.Is(err, net.ErrClosed):
retry(err)
default:
retry(err)
}
}

}

func (s *bridge) execCustomCommand(ctx context.Context, hash string, height int32) {
s.wg.Add(1)
defer s.wg.Done()

cmd := strings.ReplaceAll(s.customCmd, "%s", hash)
err := doExecCustomCommand(ctx, cmd)
if err != nil {
log.Printf("ERROR: execCustomCommand on block %s(%d): %s", hash, height, err)
}
}

func doExecCustomCommand(ctx context.Context, cmd string) error {
strs := strings.Split(cmd, " ")
path, err := exec.LookPath(strs[0])
if errors.Is(err, exec.ErrDot) {
err = nil
}
if err != nil {
return err
}
c := exec.CommandContext(ctx, path, strs[1:]...)
c.Stdout = os.Stdout
return c.Run()
}
53 changes: 53 additions & 0 deletions rpcclient/examples/lbcdblocknotify/lbcdclient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package main

import (
"io/ioutil"
"log"
"path/filepath"

"github.com/lbryio/lbcd/rpcclient"
)

func newLbcdClient(server, user, pass string, notls bool, adpt adapter) *rpcclient.Client {

ntfnHandlers := rpcclient.NotificationHandlers{
OnFilteredBlockConnected: adpt.onFilteredBlockConnected,
}

// Config lbcd RPC client with websockets.
connCfg := &rpcclient.ConnConfig{
Host: server,
Endpoint: "ws",
User: user,
Pass: pass,
DisableTLS: true,
}

if !notls {
cert, err := ioutil.ReadFile(filepath.Join(lbcdHomeDir, "rpc.cert"))
if err != nil {
log.Fatalf("can't read lbcd certificate: %s", err)
}
connCfg.Certificates = cert
connCfg.DisableTLS = false
}

client, err := rpcclient.New(connCfg, &ntfnHandlers)
if err != nil {
log.Fatalf("can't create rpc client: %s", err)
}

// Register for block connect and disconnect notifications.
if err = client.NotifyBlocks(); err != nil {
log.Fatalf("can't register block notification: %s", err)
}

// Get the current block count.
blockCount, err := client.GetBlockCount()
if err != nil {
log.Fatalf("can't get block count: %s", err)
}
log.Printf("Current block count: %d", blockCount)

return client
}
Loading

0 comments on commit f513fca

Please sign in to comment.