From 1577c1e57502097f568d8d6d3a55cc2caf0f7e69 Mon Sep 17 00:00:00 2001 From: Francesco Torta <62566275+fra98@users.noreply.github.com> Date: Wed, 30 Oct 2024 15:59:50 +0100 Subject: [PATCH] feat: ipam sync routine --- cmd/ipam/main.go | 12 ++--- pkg/consts/ipam.go | 6 ++- pkg/ipam/initialize.go | 76 ----------------------------- pkg/ipam/ipam.go | 38 +++++++++++---- pkg/ipam/ips.go | 91 +++++++++++++++++++++++++++++++++++ pkg/ipam/networks.go | 83 ++++++++++++++++++++++++++++++++ pkg/ipam/sync.go | 105 +++++++++++++++++++++++++++++++++++++++++ 7 files changed, 316 insertions(+), 95 deletions(-) create mode 100644 pkg/ipam/ips.go create mode 100644 pkg/ipam/networks.go create mode 100644 pkg/ipam/sync.go diff --git a/cmd/ipam/main.go b/cmd/ipam/main.go index ec992c4bf9..9dab33f221 100644 --- a/cmd/ipam/main.go +++ b/cmd/ipam/main.go @@ -23,7 +23,6 @@ import ( "github.com/spf13/cobra" "google.golang.org/grpc" - "google.golang.org/grpc/health" "google.golang.org/grpc/health/grpc_health_v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" @@ -71,6 +70,8 @@ func main() { restcfg.InitFlags(cmd.Flags()) cmd.Flags().IntVar(&options.Port, "port", consts.IpamPort, "The port on which to listen for incoming gRPC requests.") + cmd.Flags().DurationVar(&options.SyncFrequency, "interval", consts.SyncFrequency, + "The interval at which the IPAM will synchronize the IPAM storage.") cmd.Flags().BoolVar(&options.EnableLeaderElection, "leader-election", false, "Enable leader election for IPAM. "+ "Enabling this will ensure there is only one active IPAM.") cmd.Flags().StringVar(&options.LeaderElectionNamespace, "leader-election-namespace", consts.DefaultLiqoNamespace, @@ -102,14 +103,12 @@ func run(cmd *cobra.Command, _ []string) error { // Get the rest config. cfg := restcfg.SetRateLimiter(ctrl.GetConfigOrDie()) - options.Config = cfg cl, err := client.New(cfg, client.Options{ Scheme: scheme, }) if err != nil { return err } - options.Client = cl if options.EnableLeaderElection { if leader, err := leaderelection.Blocking(ctx, cfg, record.NewBroadcaster(), &leaderelection.Opts{ @@ -129,10 +128,7 @@ func run(cmd *cobra.Command, _ []string) error { } } - hs := health.NewServer() - options.HealthServer = hs - - liqoIPAM, err := ipam.New(ctx, &options) + liqoIPAM, err := ipam.New(ctx, cl, cfg, &options) if err != nil { return err } @@ -145,7 +141,7 @@ func run(cmd *cobra.Command, _ []string) error { server := grpc.NewServer() // Register health service - grpc_health_v1.RegisterHealthServer(server, hs) + grpc_health_v1.RegisterHealthServer(server, liqoIPAM.HealthServer) // Register IPAM service ipam.RegisterIPAMServer(server, liqoIPAM) diff --git a/pkg/consts/ipam.go b/pkg/consts/ipam.go index 871ddb1bc5..35b45ff2f9 100644 --- a/pkg/consts/ipam.go +++ b/pkg/consts/ipam.go @@ -14,12 +14,16 @@ package consts +import "time" + // NetworkType indicates the type of Network. type NetworkType string const ( // IpamPort is the port used by the IPAM gRPC server. - IpamPort = 50051 + IpamPort = 6000 + // SyncFrequency is the frequency at which the IPAM should periodically sync its status. + SyncFrequency = 2 * time.Minute // NetworkNotRemappedLabelKey is the label key used to mark a Network that does not need CIDR remapping. NetworkNotRemappedLabelKey = "ipam.liqo.io/network-not-remapped" diff --git a/pkg/ipam/initialize.go b/pkg/ipam/initialize.go index 744e1262f9..5d417d0924 100644 --- a/pkg/ipam/initialize.go +++ b/pkg/ipam/initialize.go @@ -18,19 +18,11 @@ import ( "context" klog "k8s.io/klog/v2" - - ipamv1alpha1 "github.com/liqotech/liqo/apis/ipam/v1alpha1" - "github.com/liqotech/liqo/pkg/consts" ) // +kubebuilder:rbac:groups=ipam.liqo.io,resources=ips,verbs=get;list;watch // +kubebuilder:rbac:groups=ipam.liqo.io,resources=networks,verbs=get;list;watch -type ipCidr struct { - ip string - cidr string -} - func (lipam *LiqoIPAM) initialize(ctx context.Context) error { if err := lipam.initializeNetworks(ctx); err != nil { return err @@ -77,71 +69,3 @@ func (lipam *LiqoIPAM) initializeIPs(ctx context.Context) error { return nil } - -func (lipam *LiqoIPAM) getReservedNetworks(ctx context.Context) ([]string, error) { - var nets []string - var networks ipamv1alpha1.NetworkList - if err := lipam.Options.Client.List(ctx, &networks); err != nil { - return nil, err - } - - for i := range networks.Items { - net := &networks.Items[i] - - var cidr string - switch { - case net.Labels != nil && net.Labels[consts.NetworkNotRemappedLabelKey] == consts.NetworkNotRemappedLabelValue: - cidr = net.Spec.CIDR.String() - default: - cidr = net.Status.CIDR.String() - } - if cidr == "" { - klog.Warningf("Network %s has no CIDR", net.Name) - continue - } - - nets = append(nets, cidr) - } - - return nets, nil -} - -func (lipam *LiqoIPAM) getReservedIPs(ctx context.Context) ([]ipCidr, error) { - var ips []ipCidr - var ipList ipamv1alpha1.IPList - if err := lipam.Options.Client.List(ctx, &ipList); err != nil { - return nil, err - } - - for i := range ipList.Items { - ip := &ipList.Items[i] - - address := ip.Status.IP.String() - if address == "" { - klog.Warningf("IP %s has no address", ip.Name) - continue - } - - cidr := ip.Status.CIDR.String() - if cidr == "" { - klog.Warningf("IP %s has no CIDR", ip.Name) - continue - } - - ips = append(ips, ipCidr{ip: address, cidr: cidr}) - } - - return ips, nil -} - -func (lipam *LiqoIPAM) reserveNetwork(cidr string) error { - // TODO: Reserve the network. - klog.Infof("Reserved network %s", cidr) - return nil -} - -func (lipam *LiqoIPAM) reserveIP(ip, cidr string) error { - // TODO: Reserve the IP. - klog.Infof("Reserved IP %s in network %s", ip, cidr) - return nil -} diff --git a/pkg/ipam/ipam.go b/pkg/ipam/ipam.go index f591345059..6ccfa9d7c9 100644 --- a/pkg/ipam/ipam.go +++ b/pkg/ipam/ipam.go @@ -16,6 +16,7 @@ package ipam import ( "context" + "sync" "time" "google.golang.org/grpc/health" @@ -27,15 +28,21 @@ import ( // LiqoIPAM is the struct implementing the IPAM interface. type LiqoIPAM struct { UnimplementedIPAMServer + HealthServer *health.Server + + client.Client + *rest.Config - Options *Options + options *Options + cacheNetworks map[string]networkInfo + cacheIPs map[string]ipInfo + mutex sync.Mutex } // Options contains the options to configure the IPAM. type Options struct { - Port int - Config *rest.Config - Client client.Client + Port int + SyncFrequency time.Duration EnableLeaderElection bool LeaderElectionNamespace string @@ -44,23 +51,34 @@ type Options struct { RenewDeadline time.Duration RetryPeriod time.Duration PodName string - - HealthServer *health.Server } // New creates a new instance of the LiqoIPAM. -func New(ctx context.Context, opts *Options) (*LiqoIPAM, error) { - opts.HealthServer.SetServingStatus(IPAM_ServiceDesc.ServiceName, grpc_health_v1.HealthCheckResponse_NOT_SERVING) +func New(ctx context.Context, cl client.Client, cfg *rest.Config, opts *Options) (*LiqoIPAM, error) { + hs := health.NewServer() + hs.SetServingStatus(IPAM_ServiceDesc.ServiceName, grpc_health_v1.HealthCheckResponse_NOT_SERVING) lipam := &LiqoIPAM{ - Options: opts, + HealthServer: hs, + + Config: cfg, + Client: cl, + + options: opts, + cacheNetworks: make(map[string]networkInfo), + cacheIPs: make(map[string]ipInfo), } + // Initialize the IPAM instance if err := lipam.initialize(ctx); err != nil { return nil, err } - opts.HealthServer.SetServingStatus(IPAM_ServiceDesc.ServiceName, grpc_health_v1.HealthCheckResponse_SERVING) + // Launch sync routine + go lipam.sync(ctx, opts.SyncFrequency) + + hs.SetServingStatus(IPAM_ServiceDesc.ServiceName, grpc_health_v1.HealthCheckResponse_SERVING) + return lipam, nil } diff --git a/pkg/ipam/ips.go b/pkg/ipam/ips.go new file mode 100644 index 0000000000..b2f04e8522 --- /dev/null +++ b/pkg/ipam/ips.go @@ -0,0 +1,91 @@ +// Copyright 2019-2024 The Liqo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ipam + +import ( + "context" + "time" + + klog "k8s.io/klog/v2" + + ipamv1alpha1 "github.com/liqotech/liqo/apis/ipam/v1alpha1" +) + +type ipInfo struct { + ipCidr + creationTimestamp time.Time +} + +func (i *ipInfo) String() string { + return i.ipCidr.String() +} + +type ipCidr struct { + ip string + cidr string +} + +func (i *ipCidr) String() string { + return i.ip + "-" + i.cidr +} + +func (lipam *LiqoIPAM) reserveIP(ip, cidr string) error { + lipam.mutex.Lock() + defer lipam.mutex.Unlock() + + if lipam.cacheIPs == nil { + lipam.cacheIPs = make(map[string]ipInfo) + } + + // TODO: add correct logic. + ipI := ipInfo{ + ipCidr: ipCidr{ip: ip, cidr: cidr}, + creationTimestamp: time.Now(), + } + + // Save IP in cache. + lipam.cacheIPs[ipI.String()] = ipI + klog.Infof("Reserved IP %s in network %s", ip, cidr) + + return nil +} + +func (lipam *LiqoIPAM) getReservedIPs(ctx context.Context) ([]ipCidr, error) { + var ips []ipCidr + var ipList ipamv1alpha1.IPList + if err := lipam.Client.List(ctx, &ipList); err != nil { + return nil, err + } + + for i := range ipList.Items { + ip := &ipList.Items[i] + + address := ip.Status.IP.String() + if address == "" { + klog.Warningf("IP %s has no address", ip.Name) + continue + } + + cidr := ip.Status.CIDR.String() + if cidr == "" { + klog.Warningf("IP %s has no CIDR", ip.Name) + continue + } + + ips = append(ips, ipCidr{ip: address, cidr: cidr}) + } + + return ips, nil +} diff --git a/pkg/ipam/networks.go b/pkg/ipam/networks.go new file mode 100644 index 0000000000..64800091aa --- /dev/null +++ b/pkg/ipam/networks.go @@ -0,0 +1,83 @@ +// Copyright 2019-2024 The Liqo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ipam + +import ( + "context" + "time" + + klog "k8s.io/klog/v2" + + ipamv1alpha1 "github.com/liqotech/liqo/apis/ipam/v1alpha1" + "github.com/liqotech/liqo/pkg/consts" +) + +type networkInfo struct { + cidr string + creationTimestamp time.Time +} + +func (c *networkInfo) String() string { + return c.cidr +} + +func (lipam *LiqoIPAM) reserveNetwork(cidr string) error { + lipam.mutex.Lock() + defer lipam.mutex.Unlock() + + if lipam.cacheNetworks == nil { + lipam.cacheNetworks = make(map[string]networkInfo) + } + + // TODO: add correct logic + nwI := networkInfo{ + cidr: cidr, + creationTimestamp: time.Now(), + } + + // Save network in cache. + lipam.cacheNetworks[nwI.String()] = nwI + klog.Infof("Reserved network %s", cidr) + + return nil +} + +func (lipam *LiqoIPAM) getReservedNetworks(ctx context.Context) ([]string, error) { + var nets []string + var networks ipamv1alpha1.NetworkList + if err := lipam.Client.List(ctx, &networks); err != nil { + return nil, err + } + + for i := range networks.Items { + net := &networks.Items[i] + + var cidr string + switch { + case net.Labels != nil && net.Labels[consts.NetworkNotRemappedLabelKey] == consts.NetworkNotRemappedLabelValue: + cidr = net.Spec.CIDR.String() + default: + cidr = net.Status.CIDR.String() + } + if cidr == "" { + klog.Warningf("Network %s has no CIDR", net.Name) + continue + } + + nets = append(nets, cidr) + } + + return nets, nil +} diff --git a/pkg/ipam/sync.go b/pkg/ipam/sync.go new file mode 100644 index 0000000000..455c680fa7 --- /dev/null +++ b/pkg/ipam/sync.go @@ -0,0 +1,105 @@ +// Copyright 2019-2024 The Liqo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ipam + +import ( + "context" + "os" + "time" + + "k8s.io/apimachinery/pkg/util/wait" + klog "k8s.io/klog/v2" +) + +// +kubebuilder:rbac:groups=ipam.liqo.io,resources=ips,verbs=get;list;watch +// +kubebuilder:rbac:groups=ipam.liqo.io,resources=networks,verbs=get;list;watch + +func (lipam *LiqoIPAM) sync(ctx context.Context, syncFrequency time.Duration) { + err := wait.PollUntilContextCancel(ctx, syncFrequency, false, + func(ctx context.Context) (done bool, err error) { + lipam.mutex.Lock() + defer lipam.mutex.Unlock() + + now := time.Now() + // networks created before this threshold will be removed from the cache if they are not present in the cluster. + expiredThreshold := now.Add(-syncFrequency) + + // Sync networks. + if err := lipam.syncNetworks(ctx, expiredThreshold); err != nil { + return false, err + } + + // Sync IPs. + if err := lipam.syncIPs(ctx, expiredThreshold); err != nil { + return false, err + } + + return false, nil + }) + if err != nil { + klog.Error(err) + os.Exit(1) + } +} + +func (lipam *LiqoIPAM) syncNetworks(ctx context.Context, expiredThreshold time.Time) error { + // List all networks present in the cluster. + networks, err := lipam.getReservedNetworks(ctx) + if err != nil { + return err + } + + // Create a set for faster lookup. + nwSet := make(map[string]struct{}) + for i := range networks { + nwI := networkInfo{cidr: networks[i]} + nwSet[nwI.String()] = struct{}{} + } + + // Remove networks that are not present in the cache and were added before the threshold. + for key := range lipam.cacheNetworks { + if _, ok := nwSet[key]; !ok && lipam.cacheNetworks[key].creationTimestamp.Before(expiredThreshold) { + delete(lipam.cacheNetworks, key) + klog.Infof("Removed network %s from cache", key) + } + } + + return nil +} + +func (lipam *LiqoIPAM) syncIPs(ctx context.Context, expiredThreshold time.Time) error { + // List all IPs present in the cluster. + ipCidrs, err := lipam.getReservedIPs(ctx) + if err != nil { + return err + } + + // Create a set for faster lookup. + ipSet := make(map[string]struct{}) + for i := range ipCidrs { + ipI := ipInfo{ipCidr: ipCidrs[i]} + ipSet[ipI.String()] = struct{}{} + } + + // Remove IPs that are not present in the cache and were added before the threshold. + for key := range lipam.cacheIPs { + if _, ok := ipSet[key]; !ok && lipam.cacheIPs[key].creationTimestamp.Before(expiredThreshold) { + delete(lipam.cacheIPs, key) + klog.Infof("Removed IP %s from cache", key) + } + } + + return nil +}