Skip to content

Commit

Permalink
External Network: ping disable
Browse files Browse the repository at this point in the history
  • Loading branch information
cheina97 committed Oct 27, 2023
1 parent ba2071e commit 012c979
Show file tree
Hide file tree
Showing 12 changed files with 149 additions and 90 deletions.
26 changes: 11 additions & 15 deletions cmd/gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ var (
addToSchemeFunctions = []func(*runtime.Scheme) error{
networkingv1alpha1.AddToScheme,
}
options = gateway.NewOptions()
options = connection.NewOptions(gateway.NewOptions())
)

// +kubebuilder:rbac:groups=coordination.k8s.io,resources=leases,verbs=get;create;update;delete
Expand All @@ -57,17 +57,13 @@ func main() {
klog.InitFlags(legacyflags)
flagsutils.FromFlagToPflag(legacyflags, cmd.Flags())

gateway.InitFlags(cmd.Flags(), options)
gateway.InitFlags(cmd.Flags(), options.GwOptions)
if err := gateway.MarkFlagsRequired(&cmd); err != nil {
klog.Error(err)
os.Exit(1)
}

connection.InitFlags(cmd.Flags())
if err := connection.MarkFlagsRequired(&cmd); err != nil {
klog.Error(err)
os.Exit(1)
}
connection.InitFlags(cmd.Flags(), options)

if err := cmd.Execute(); err != nil {
klog.Error(err)
Expand Down Expand Up @@ -97,20 +93,20 @@ func run(_ *cobra.Command, _ []string) error {
mgr, err := ctrl.NewManager(cfg, ctrl.Options{
MapperProvider: mapper.LiqoMapperProvider(scheme),
Scheme: scheme,
Namespace: options.Namespace,
Namespace: options.GwOptions.Namespace,
MetricsBindAddress: "0", // Metrics are exposed by "connection" container.
HealthProbeBindAddress: options.ProbeAddr,
LeaderElection: options.LeaderElection,
HealthProbeBindAddress: options.GwOptions.ProbeAddr,
LeaderElection: options.GwOptions.LeaderElection,
LeaderElectionID: fmt.Sprintf(
"%s.%s.%s.connections.liqo.io",
options.Name, options.Namespace, options.Mode,
options.GwOptions.Name, options.GwOptions.Namespace, options.GwOptions.Mode,
),
LeaderElectionNamespace: options.Namespace,
LeaderElectionNamespace: options.GwOptions.Namespace,
LeaderElectionReleaseOnCancel: true,
LeaderElectionResourceLock: resourcelock.LeasesResourceLock,
LeaseDuration: &options.LeaderElectionLeaseDuration,
RenewDeadline: &options.LeaderElectionRenewDeadline,
RetryPeriod: &options.LeaderElectionRetryPeriod,
LeaseDuration: &options.GwOptions.LeaderElectionLeaseDuration,
RenewDeadline: &options.GwOptions.LeaderElectionRenewDeadline,
RetryPeriod: &options.GwOptions.LeaderElectionRetryPeriod,
})
if err != nil {
return fmt.Errorf("unable to create manager: %w", err)
Expand Down
20 changes: 17 additions & 3 deletions cmd/liqonet/gateway-operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ type gatewayOperatorFlags struct {
tunnelMTU uint
tunnelListeningPort uint
updateStatusInterval time.Duration
PingPort int
PingBufferSize uint
PingLossThreshold uint
PingInterval time.Duration
}

func addGatewayOperatorFlags(liqonet *gatewayOperatorFlags) {
Expand All @@ -68,9 +72,13 @@ func addGatewayOperatorFlags(liqonet *gatewayOperatorFlags) {
"listening-port is the port used by the vpn tunnel")
flag.DurationVar(&liqonet.updateStatusInterval, "gateway.ping-latency-update-interval", 30*time.Second,
"ping-latency-update-interval is the interval at which the gateway operator updates the latency value in the status of the tunnel-endpoint")
flag.UintVar(&conncheck.PingLossThreshold, "gateway.ping-loss-threshold", 5,
flag.IntVar(&liqonet.PingPort, "gateway.ping-port", 12345,
"ping-port is the port used by the vpn tunnel")
flag.UintVar(&liqonet.PingBufferSize, "gateway.ping-buffer-size", 1024,
"ping-buffer-size is the size of the buffer used for the ping check")
flag.UintVar(&liqonet.PingLossThreshold, "gateway.ping-loss-threshold", 5,
"ping-loss-threshold is the number of lost packets after which the connection check is considered as failed.")
flag.DurationVar(&conncheck.PingInterval, "gateway.ping-interval", 2*time.Second,
flag.DurationVar(&liqonet.PingInterval, "gateway.ping-interval", 2*time.Second,
"ping-interval is the interval between two connection checks")
}

Expand Down Expand Up @@ -157,7 +165,13 @@ func runGatewayOperator(commonFlags *liqonetCommonFlags, gatewayFlags *gatewayOp
os.Exit(1)
}
tunnelController, err := tunneloperator.NewTunnelController(ctx, &wg, podIP.String(), podNamespace, eventRecorder,
clientset, main.GetClient(), &readyClustersMutex, readyClusters, gatewayNetns, hostNetns, int(MTU), int(port), updateStatusInterval)
clientset, main.GetClient(), &readyClustersMutex, readyClusters, gatewayNetns, hostNetns, int(MTU), int(port), updateStatusInterval,
&conncheck.Options{
PingPort: gatewayFlags.PingPort,
PingBufferSize: gatewayFlags.PingBufferSize,
PingLossThreshold: gatewayFlags.PingLossThreshold,
PingInterval: gatewayFlags.PingInterval,
})
// If something goes wrong while creating and configuring the tunnel controller
// then make sure that we remove all the resources created during the create process.
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions internal/liqonet/tunnel-operator/tunnel-operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ type TunnelController struct {
func NewTunnelController(ctx context.Context, wg *sync.WaitGroup,
podIP, namespace string, er record.EventRecorder, k8sClient k8s.Interface, cl client.Client,
readyClustersMutex *sync.Mutex, readyClusters map[string]struct{}, gatewayNetns, hostNetns ns.NetNS, mtu, port int,
updateStatusInterval time.Duration) (*TunnelController, error) {
updateStatusInterval time.Duration, connCheckOpts *conncheck.Options) (*TunnelController, error) {
tunnelEndpointFinalizer := liqoconst.LiqoGatewayOperatorName + "." + liqoconst.FinalizersSuffix
tc := &TunnelController{
Client: cl,
Expand Down Expand Up @@ -141,7 +141,7 @@ func NewTunnelController(ctx context.Context, wg *sync.WaitGroup,
return fmt.Errorf("unable to enforce tunnel IP: %w", err)
}

wg.Connchecker, err = conncheck.NewConnChecker()
wg.Connchecker, err = conncheck.NewConnChecker(connCheckOpts)
if err != nil {
return fmt.Errorf("failed to create connchecker: %w", err)
}
Expand Down Expand Up @@ -336,7 +336,7 @@ func (tc *TunnelController) disconnectFromPeer(ep *netv1alpha1.TunnelEndpoint) e
}

func (tc *TunnelController) forgeConncheckUpdateStatus(ctx context.Context, req ctrl.Request) conncheck.UpdateFunc {
return func(connected bool, latency time.Duration, timestamp time.Time) error {
return func(connected bool, latency time.Duration, timestamp *time.Time) error {
var tep = new(netv1alpha1.TunnelEndpoint)
if err := tc.Get(ctx, req.NamespacedName, tep); err != nil && !k8sApiErrors.IsNotFound(err) {
return fmt.Errorf("unable to fetch resource %s: %w", req.String(), err)
Expand All @@ -357,7 +357,7 @@ func (tc *TunnelController) forgeConncheckUpdateStatus(ctx context.Context, req
}
conn.Latency = netv1alpha1.ConnectionLatency{
Value: liqonetutils.FormatLatency(latency),
Timestamp: metav1.Time{Time: timestamp},
Timestamp: metav1.Time{Time: *timestamp},
}
tep.Status.Connection = conn
if err := tc.Client.Status().Update(ctx, tep); err != nil {
Expand Down
12 changes: 7 additions & 5 deletions pkg/gateway/connection/conncheck/conncheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

// ConnChecker is a struct that holds the receiver and senders.
type ConnChecker struct {
opts *Options
receiver *Receiver
// key is the target cluster ID.
senders map[string]*Sender
Expand All @@ -36,9 +37,9 @@ type ConnChecker struct {
}

// NewConnChecker creates a new ConnChecker.
func NewConnChecker() (*ConnChecker, error) {
func NewConnChecker(opts *Options) (*ConnChecker, error) {
addr := &net.UDPAddr{
Port: port,
Port: opts.PingPort,
IP: net.ParseIP("0.0.0.0"),
}
conn, err := net.ListenUDP("udp", addr)
Expand All @@ -47,7 +48,8 @@ func NewConnChecker() (*ConnChecker, error) {
}
klog.V(4).Infof("conncheck socket: listening on %s", addr)
connChecker := ConnChecker{
receiver: NewReceiver(conn),
opts: opts,
receiver: NewReceiver(conn, opts),
senders: make(map[string]*Sender),
runningSenders: make(map[string]*Sender),
conn: conn,
Expand Down Expand Up @@ -81,7 +83,7 @@ func (c *ConnChecker) AddSender(ctx context.Context, clusterID, ip string, updat
}

ctxSender, cancelSender := context.WithCancel(ctx)
c.senders[clusterID], err = NewSender(ctxSender, clusterID, cancelSender, c.conn, ip)
c.senders[clusterID], err = NewSender(ctxSender, c.opts, clusterID, cancelSender, c.conn, ip)
if err != nil {
return fmt.Errorf("failed to create sender: %w", err)
}
Expand All @@ -105,7 +107,7 @@ func (c *ConnChecker) RunSender(clusterID string) {

klog.Infof("conncheck sender %q starting against %q", clusterID, sender.raddr.IP.String())

if err := wait.PollUntilContextCancel(sender.Ctx, PingInterval, false, func(ctx context.Context) (done bool, err error) {
if err := wait.PollUntilContextCancel(sender.Ctx, c.opts.PingInterval, false, func(ctx context.Context) (done bool, err error) {
err = c.senders[clusterID].SendPing()
if err != nil {
klog.Warningf("failed to send ping: %s", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,19 @@ package conncheck

import "time"

const (
port = 12345
buffSize = 1024
)

var (
// Options contains the options for the connection checker (ping-based conncheck).
type Options struct {
// PingPort is the port used for the ping check.
PingPort int
// PingBufferSize is the size of the buffer used for the ping check.
PingBufferSize uint
// PingLossThreshold is the number of lost packets after which the connection check is considered as failed.
PingLossThreshold uint
// PingInterval is the interval at which the ping is sent.
PingInterval time.Duration
// PingUpdateStatusInterval is the interval at which the status is updated.
PingUpdateStatusInterval time.Duration
)
}

// NewOptions allocates an empty Options value and returns a pointer to it.
// Every field is left at its zero value; the caller is expected to fill in
// the ping settings before the options are used.
func NewOptions() *Options {
	return new(Options)
}
10 changes: 6 additions & 4 deletions pkg/gateway/connection/conncheck/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,16 @@ type Receiver struct {
m sync.RWMutex
buff []byte
conn *net.UDPConn
opts *Options
}

// NewReceiver creates a new conncheck receiver.
func NewReceiver(conn *net.UDPConn) *Receiver {
func NewReceiver(conn *net.UDPConn, opts *Options) *Receiver {
return &Receiver{
peers: make(map[string]*Peer),
buff: make([]byte, buffSize),
buff: make([]byte, opts.PingBufferSize),
conn: conn,
opts: opts,
}
}

Expand Down Expand Up @@ -141,12 +143,12 @@ func (r *Receiver) Run(ctx context.Context) {
func (r *Receiver) RunDisconnectObserver(ctx context.Context) {
klog.Infof("conncheck receiver disconnect checker: started")
// Ignore errors because only caused by context cancellation.
err := wait.PollUntilContextCancel(ctx, time.Duration(PingLossThreshold)*PingInterval/10, true,
err := wait.PollUntilContextCancel(ctx, time.Duration(r.opts.PingLossThreshold)*r.opts.PingInterval/10, true,
func(ctx context.Context) (done bool, err error) {
r.m.Lock()
defer r.m.Unlock()
for id, peer := range r.peers {
if time.Since(peer.lastReceivedTimestamp.Add(peer.latency)) <= PingInterval*time.Duration(PingLossThreshold) {
if time.Since(peer.lastReceivedTimestamp.Add(peer.latency)) <= r.opts.PingInterval*time.Duration(r.opts.PingLossThreshold) {
continue
}
klog.V(8).Infof("conncheck receiver: %s unreachable", id)
Expand Down
4 changes: 2 additions & 2 deletions pkg/gateway/connection/conncheck/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Sender struct {
}

// NewSender creates a new conncheck sender.
func NewSender(ctx context.Context, clusterID string, cancel func(), conn *net.UDPConn, ip string) (*Sender, error) {
func NewSender(ctx context.Context, opts *Options, clusterID string, cancel func(), conn *net.UDPConn, ip string) (*Sender, error) {
pip := net.ParseIP(ip)
if pip == nil {
return nil, fmt.Errorf("conncheck sender: invalid IP address %s", ip)
Expand All @@ -44,7 +44,7 @@ func NewSender(ctx context.Context, clusterID string, cancel func(), conn *net.U
clusterID: clusterID,
cancel: cancel,
conn: conn,
raddr: net.UDPAddr{IP: pip, Port: port},
raddr: net.UDPAddr{IP: pip, Port: opts.PingPort},
}, nil
}

Expand Down
49 changes: 29 additions & 20 deletions pkg/gateway/connection/connections_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (

networkingv1alpha1 "github.com/liqotech/liqo/apis/networking/v1alpha1"
"github.com/liqotech/liqo/pkg/consts"
"github.com/liqotech/liqo/pkg/gateway"
"github.com/liqotech/liqo/pkg/gateway/connection/conncheck"
"github.com/liqotech/liqo/pkg/gateway/tunnel/common"
)
Expand All @@ -45,13 +44,13 @@ type ConnectionsReconciler struct {
Client client.Client
Scheme *runtime.Scheme
EventsRecorder record.EventRecorder
Options *gateway.Options
Options *Options
}

// NewConnectionsReconciler returns a new ConnectionsReconciler.
func NewConnectionsReconciler(ctx context.Context, cl client.Client,
s *runtime.Scheme, er record.EventRecorder, options *gateway.Options) (*ConnectionsReconciler, error) {
connchecker, err := conncheck.NewConnChecker()
s *runtime.Scheme, er record.EventRecorder, options *Options) (*ConnectionsReconciler, error) {
connchecker, err := conncheck.NewConnChecker(options.ConnCheckOptions)
if err != nil {
return nil, fmt.Errorf("unable to create the connection checker: %w", err)
}
Expand All @@ -76,23 +75,33 @@ func (r *ConnectionsReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
return ctrl.Result{}, fmt.Errorf("unable to get the connection %q: %w", req.NamespacedName, err)
}
klog.Infof("Reconciling connection %q", req.NamespacedName)

remoteIP, err := common.GetRemoteInterfaceIP(r.Options.Mode)
if err != nil {
return ctrl.Result{}, fmt.Errorf("unable to get the remote interface IP: %w", err)
}
updateConnection := ForgeUpdateConnectionCallback(ctx, r.Client, r.Options, req)

err = r.ConnChecker.AddSender(ctx, r.Options.RemoteClusterID, remoteIP, ForgeUpdateConnectionCallback(ctx, r.Client, req))
if err != nil {
switch err.(type) {
case *conncheck.DuplicateError:
return ctrl.Result{}, nil
default:
return ctrl.Result{}, fmt.Errorf("unable to add the sender: %w", err)
switch r.Options.PingEnabled {
case true:
remoteIP, err := common.GetRemoteInterfaceIP(r.Options.GwOptions.Mode)
if err != nil {
return ctrl.Result{}, fmt.Errorf("unable to get the remote interface IP: %w", err)
}
}

go r.ConnChecker.RunSender(r.Options.RemoteClusterID)
err = r.ConnChecker.AddSender(ctx, r.Options.GwOptions.RemoteClusterID, remoteIP, updateConnection)
if err != nil {
switch err.(type) {
case *conncheck.DuplicateError:
return ctrl.Result{}, nil
default:
return ctrl.Result{}, fmt.Errorf("unable to add the sender: %w", err)
}
}

go r.ConnChecker.RunSender(r.Options.GwOptions.RemoteClusterID)
case false:
if err := updateConnection(true, 0, time.Time{}); err != nil {
return ctrl.Result{}, fmt.Errorf("unable to update the connection status: %w", err)
}
}

return ctrl.Result{}, nil
}
Expand All @@ -115,12 +124,12 @@ func (r *ConnectionsReconciler) Predicates() builder.Predicates {
if connection.Labels == nil {
return false
}
return connection.Labels[string(consts.RemoteClusterID)] == r.Options.RemoteClusterID
return connection.Labels[string(consts.RemoteClusterID)] == r.Options.GwOptions.RemoteClusterID
}))
}

// ForgeUpdateConnectionCallback forges the UpdateConnectionStatus function.
func ForgeUpdateConnectionCallback(ctx context.Context, cl client.Client, req ctrl.Request) conncheck.UpdateFunc {
func ForgeUpdateConnectionCallback(ctx context.Context, cl client.Client, opts *Options, req ctrl.Request) conncheck.UpdateFunc {
return func(connected bool, latency time.Duration, timestamp time.Time) error {
connection := &networkingv1alpha1.Connection{}
if err := cl.Get(ctx, req.NamespacedName, connection); err != nil {
Expand All @@ -133,6 +142,6 @@ func ForgeUpdateConnectionCallback(ctx context.Context, cl client.Client, req ct
case false:
connStatusValue = networkingv1alpha1.ConnectionError
}
return UpdateConnectionStatus(ctx, cl, connection, connStatusValue, latency, timestamp)
return UpdateConnectionStatus(ctx, cl, opts, connection, connStatusValue, latency, timestamp)
}
}
Loading

0 comments on commit 012c979

Please sign in to comment.