Skip to content

Commit

Permalink
feat: add wss support (#36)
Browse files Browse the repository at this point in the history
* feat: add wss support

* feat: add no-op database client

* improve: address logging

* add: wss port range configuration

* remove: separate wss port

* fix: provide host to cert manager

* Revert "remove: separate wss port"

This reverts commit 0737dae.

* remove: share tcp listener

* fix: use temporary directory to store certificates

* improve logging

* tmp: only spawn a single ant

* remove: dedicated tcp websocket port

* add: ant subcommand

* update: p2p-forge

* set log level

* disable resource manager

* disable connection manager

* tweak the connection gater

* fix: don't add nil slices to recource manager

* don't start cert manager

* WIP

* WIP

* WIP

* WIP

* WIP

* WIP
  • Loading branch information
dennis-tra authored Jan 9, 2025
1 parent acba720 commit 19ba888
Show file tree
Hide file tree
Showing 8 changed files with 361 additions and 193 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,6 @@ log.txt
*.dat
*.mmdb

db/migrations/local
db/migrations/local
p2p-forge-certs
*.db
153 changes: 129 additions & 24 deletions ant.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,25 @@ import (
"context"
"fmt"

"github.com/caddyserver/certmagic"
ds "github.com/ipfs/go-datastore"
p2pforge "github.com/ipshipyard/p2p-forge/client"
"github.com/libp2p/go-libp2p"
kad "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p-kad-dht/ants"
"github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"

rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic"
libp2ptcp "github.com/libp2p/go-libp2p/p2p/transport/tcp"
libp2pwebrtc "github.com/libp2p/go-libp2p/p2p/transport/webrtc"
libp2pws "github.com/libp2p/go-libp2p/p2p/transport/websocket"
libp2pwebtransport "github.com/libp2p/go-libp2p/p2p/transport/webtransport"
"github.com/probe-lab/go-libdht/kad/key/bit256"
)

Expand All @@ -29,6 +38,7 @@ type AntConfig struct {
ProtocolPrefix string
BootstrapPeers []peer.AddrInfo
EventsChan chan ants.RequestEvent
CertPath string
}

func (cfg *AntConfig) Validate() error {
Expand Down Expand Up @@ -56,10 +66,13 @@ func (cfg *AntConfig) Validate() error {
}

type Ant struct {
cfg *AntConfig
host host.Host
dht *kad.IpfsDHT
kadID bit256.Key
cfg *AntConfig
host host.Host
dht *kad.IpfsDHT
certMgr *p2pforge.P2PForgeCertMgr
kadID bit256.Key
certLoadedChan chan struct{}
sub event.Subscription
}

func SpawnAnt(ctx context.Context, ps peerstore.Peerstore, ds ds.Batching, cfg *AntConfig) (*Ant, error) {
Expand All @@ -69,17 +82,43 @@ func SpawnAnt(ctx context.Context, ps peerstore.Peerstore, ds ds.Batching, cfg *
return nil, fmt.Errorf("invalid config: %w", err)
}

portStr := fmt.Sprint(cfg.Port)
certLoadedChan := make(chan struct{})
forgeDomain := p2pforge.DefaultForgeDomain
certMgr, err := p2pforge.NewP2PForgeCertMgr(
p2pforge.WithForgeDomain(forgeDomain),
p2pforge.WithOnCertLoaded(func() {
certLoadedChan <- struct{}{}
}),
p2pforge.WithLogger(logger.Desugar().Sugar()),
p2pforge.WithCertificateStorage(&certmagic.FileStorage{Path: cfg.CertPath}),
)
if err != nil {
return nil, fmt.Errorf("new p2pforge cert manager: %w", err)
}

// Configure the resource manager to not limit anything
noSubnetLimit := []rcmgr.ConnLimitPerSubnet{}
noNetPrefixLimit := []rcmgr.NetworkPrefixLimit{}
limiter := rcmgr.NewFixedLimiter(rcmgr.InfiniteLimits)
rm, err := rcmgr.NewResourceManager(limiter,
rcmgr.WithLimitPerSubnet(noSubnetLimit, noSubnetLimit),
rcmgr.WithNetworkPrefixLimit(noNetPrefixLimit, noNetPrefixLimit),
)
if err != nil {
return nil, fmt.Errorf("new resource manager: %w", err)
}

// taken from github.com/celestiaorg/celestia-node/nodebuilder/p2p/config.go
// ports are assigned automatically
listenAddrs := []string{
"/ip4/0.0.0.0/udp/" + portStr + "/quic-v1/webtransport",
"/ip6/::/udp/" + portStr + "/quic-v1/webtransport",
"/ip4/0.0.0.0/udp/" + portStr + "/quic-v1",
"/ip6/::/udp/" + portStr + "/quic-v1",
"/ip4/0.0.0.0/tcp/" + portStr,
"/ip6/::/tcp/" + portStr,
fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", cfg.Port),
fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic-v1", cfg.Port),
fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic-v1/webtransport", cfg.Port),
fmt.Sprintf("/ip4/0.0.0.0/udp/%d/webrtc-direct", cfg.Port),
fmt.Sprintf("/ip4/0.0.0.0/tcp/%d/tls/sni/*.%s/ws", cfg.Port, forgeDomain), // cert manager websocket multi address
fmt.Sprintf("/ip6/::/tcp/%d", cfg.Port),
fmt.Sprintf("/ip6/::/udp/%d/quic-v1", cfg.Port),
fmt.Sprintf("/ip6/::/udp/%d/quic-v1/webtransport", cfg.Port),
fmt.Sprintf("/ip6/::/udp/%d/webrtc-direct", cfg.Port),
fmt.Sprintf("/ip6/::/tcp/%d/tls/sni/*.%s/ws", cfg.Port, forgeDomain), // cert manager websocket multi address
}

opts := []libp2p.Option{
Expand All @@ -89,6 +128,15 @@ func SpawnAnt(ctx context.Context, ps peerstore.Peerstore, ds ds.Batching, cfg *
libp2p.DisableRelay(),
libp2p.ListenAddrStrings(listenAddrs...),
libp2p.DisableMetrics(),
libp2p.ShareTCPListener(),
libp2p.ResourceManager(rm),
libp2p.ConnectionManager(connmgr.NullConnMgr{}),
libp2p.Transport(libp2ptcp.NewTCPTransport),
libp2p.Transport(libp2pquic.NewTransport),
libp2p.Transport(libp2pwebtransport.New),
libp2p.Transport(libp2pwebrtc.New),
libp2p.Transport(libp2pws.New, libp2pws.WithTLSConfig(certMgr.TLSConfig())),
libp2p.AddrsFactory(certMgr.AddressFactory()),
}

if cfg.Port == 0 {
Expand All @@ -111,25 +159,82 @@ func SpawnAnt(ctx context.Context, ps peerstore.Peerstore, ds ds.Batching, cfg *
if err != nil {
return nil, fmt.Errorf("new libp2p dht: %w", err)
}

logger.Debugf("spawned ant. kadid: %s, peerid: %s", PeerIDToKadID(h.ID()).HexString(), h.ID())

ant := &Ant{
cfg: cfg,
host: h,
dht: dht,
kadID: PeerIDToKadID(h.ID()),
if err = dht.Bootstrap(ctx); err != nil {
logger.Warn("bootstrap failed: %s", err)
}

certMgr.ProvideHost(h)

if err = certMgr.Start(); err != nil {
return nil, fmt.Errorf("start cert manager: %w", err)
}

go dht.Bootstrap(ctx)
go func() {
for range certLoadedChan {
logger.Infow("Loaded certificate", "ant", h.ID())
}
logger.Debug("certificate loaded channel closed")
}()

sub, err := h.EventBus().Subscribe([]interface{}{new(event.EvtLocalAddressesUpdated), new(event.EvtLocalReachabilityChanged)})
if err != nil {
return nil, fmt.Errorf("subscribe to event bus: %w", err)
}

go func() {
for out := range sub.Out() {
switch evt := out.(type) {
case event.EvtLocalAddressesUpdated:
if !evt.Diffs {
continue
}

logger.Infow("Ant now listening on:", "ant", h.ID())
for i, maddr := range evt.Current {
actionStr := ""
switch maddr.Action {
case event.Added:
actionStr = "ADD"
case event.Removed:
actionStr = "REMOVE"
case event.Maintained:
actionStr = "MAINTAINED"
default:
continue
}
logger.Infof(" [%d] %s %s/p2p/%s", i, actionStr, maddr.Address, h.ID())
}
case event.EvtLocalReachabilityChanged:
logger.Infow("Reachability changed", "ant", h.ID(), "reachability", evt.Reachability)
}
}
}()

ant := &Ant{
cfg: cfg,
host: h,
dht: dht,
certMgr: certMgr,
certLoadedChan: certLoadedChan,
sub: sub,
kadID: PeerIDToKadID(h.ID()),
}

return ant, nil
}

func (a *Ant) Close() error {
err := a.dht.Close()
if err != nil {
return err
if err := a.sub.Close(); err != nil {
logger.Warnf("failed to close address update subscription: %s", err)
}

a.certMgr.Stop()
close(a.certLoadedChan)

if err := a.dht.Close(); err != nil {
logger.Warnf("failed to close dht: %s", err)
}
return a.host.Close()
}
39 changes: 28 additions & 11 deletions cmd/ants/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ var queenConfig = struct {
ClickhouseSSL bool
NebulaDBConnString string
KeyDBPath string
CertsPath string
NumPorts int
FirstPort int
UPnp bool
Expand All @@ -50,6 +51,7 @@ var queenConfig = struct {
ClickhouseSSL: true,
NebulaDBConnString: "",
KeyDBPath: "keys.db",
CertsPath: "p2p-forge-certs",
NumPorts: 128,
FirstPort: 6000,
UPnp: false,
Expand Down Expand Up @@ -166,6 +168,13 @@ func main() {
Destination: &queenConfig.KeyDBPath,
Value: queenConfig.KeyDBPath,
},
&cli.PathFlag{
Name: "certs.path",
Usage: "The path where we store the TLC certificates",
EnvVars: []string{"ANTS_CERTS_PATH"},
Destination: &queenConfig.CertsPath,
Value: queenConfig.CertsPath,
},
&cli.IntFlag{
Name: "first_port",
Usage: "First port ants can listen on",
Expand Down Expand Up @@ -266,17 +275,24 @@ func runQueenCommand(c *cli.Context) error {
logger.Debugln("Starting metrics server", "host", queenConfig.MetricsHost, "port", queenConfig.MetricsPort)
go metrics.ListenAndServe(queenConfig.MetricsHost, queenConfig.MetricsPort)

// initializing a new clickhouse client
client, err := db.NewClient(
queenConfig.ClickhouseAddress,
queenConfig.ClickhouseDatabase,
queenConfig.ClickhouseUsername,
queenConfig.ClickhousePassword,
queenConfig.ClickhouseSSL,
telemetry,
)
if err != nil {
return fmt.Errorf("init database client: %w", err)
var client db.Client
if queenConfig.ClickhouseAddress == "" {
logger.Warn("No clickhouse address provided, using no-op client.")
client = db.NewNoopClient()
} else {
// initializing a new clickhouse client
client, err = db.NewClickhouseClient(
queenConfig.ClickhouseAddress,
queenConfig.ClickhouseDatabase,
queenConfig.ClickhouseUsername,
queenConfig.ClickhousePassword,
queenConfig.ClickhouseSSL,
telemetry,
)
if err != nil {
return fmt.Errorf("init database client: %w", err)
}

}

// pinging database to check availability
Expand All @@ -288,6 +304,7 @@ func runQueenCommand(c *cli.Context) error {

queenCfg := &ants.QueenConfig{
KeysDBPath: queenConfig.KeyDBPath,
CertsPath: queenConfig.CertsPath,
NPorts: queenConfig.NumPorts,
FirstPort: queenConfig.FirstPort,
UPnP: queenConfig.UPnp,
Expand Down
18 changes: 17 additions & 1 deletion db/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,30 @@ type Client interface {
BulkInsertRequests(ctx context.Context, requests []*Request) error
}

type NoopClient struct{}

var _ Client = (*NoopClient)(nil)

func NewNoopClient() *NoopClient {
return &NoopClient{}
}

func (n NoopClient) Ping(ctx context.Context) error {
return nil
}

func (n NoopClient) BulkInsertRequests(ctx context.Context, requests []*Request) error {
return nil
}

type ClickhouseClient struct {
driver.Conn
telemetry *metrics.Telemetry
}

var _ Client = (*ClickhouseClient)(nil)

func NewClient(address, database, username, password string, ssl bool, telemetry *metrics.Telemetry) (*ClickhouseClient, error) {
func NewClickhouseClient(address, database, username, password string, ssl bool, telemetry *metrics.Telemetry) (*ClickhouseClient, error) {
logger.Infoln("Creating new clickhouse client...")

conn, err := clickhouse.Open(&clickhouse.Options{
Expand Down
Loading

0 comments on commit 19ba888

Please sign in to comment.