Skip to content

Commit

Permalink
Merge pull request #985 from trozet/bz2060080
Browse files Browse the repository at this point in the history
Bug 2060080: [release-4.9] Network Policy fixes
  • Loading branch information
openshift-merge-robot authored Mar 17, 2022
2 parents a6a8170 + 8f0741a commit 8699d75
Show file tree
Hide file tree
Showing 15 changed files with 622 additions and 207 deletions.
7 changes: 7 additions & 0 deletions go-controller/pkg/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
v1coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
listers "k8s.io/client-go/listers/core/v1"
netlisters "k8s.io/client-go/listers/networking/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
)
Expand Down Expand Up @@ -547,6 +548,12 @@ func (wf *WatchFactory) GetNamespacesBySelector(selector metav1.LabelSelector) (
return namespaceLister.List(labels.Set(selector.MatchLabels).AsSelector())
}

// GetNetworkPolicy gets a specific network policy by the namespace/name
func (wf *WatchFactory) GetNetworkPolicy(namespace, name string) (*knet.NetworkPolicy, error) {
networkPolicyLister := wf.informers[policyType].lister.(netlisters.NetworkPolicyLister)
return networkPolicyLister.NetworkPolicies(namespace).Get(name)
}

func (wf *WatchFactory) NodeInformer() cache.SharedIndexInformer {
return wf.informers[nodeType].inf
}
Expand Down
3 changes: 2 additions & 1 deletion go-controller/pkg/factory/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

ktypes "k8s.io/apimachinery/pkg/types"
listers "k8s.io/client-go/listers/core/v1"
netlisters "k8s.io/client-go/listers/networking/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
)
Expand Down Expand Up @@ -370,7 +371,7 @@ func newInformerLister(oType reflect.Type, sharedInformer cache.SharedIndexInfor
case nodeType:
return listers.NewNodeLister(sharedInformer.GetIndexer()), nil
case policyType:
return nil, nil
return netlisters.NewNetworkPolicyLister(sharedInformer.GetIndexer()), nil
case egressFirewallType:
return egressfirewalllister.NewEgressFirewallLister(sharedInformer.GetIndexer()), nil
case egressIPType:
Expand Down
45 changes: 15 additions & 30 deletions go-controller/pkg/ovn/address_set/address_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
goovn "github.com/ebay/go-ovn"
"net"
"sort"
"strings"

"github.com/pkg/errors"
Expand Down Expand Up @@ -249,13 +248,14 @@ func newOvnAddressSet(nb goovn.Client, name string, ips []net.IP) (*ovnAddressSe
nb: nb,
}

uniqIPs := ipsToStringUnique(ips)
ovnAs, err := nb.ASGet(as.hashName)
if err != nil {
if err != goovn.ErrorNotFound && err != goovn.ErrorSchema {
return nil, fmt.Errorf("failed to get address set %q: %v", name, err)
}
// ovnAddressSet has not been created yet. Create it.
cmd, err := nb.ASAdd(as.hashName, ipsToStringArray(ips), map[string]string{"name": name})
cmd, err := nb.ASAdd(as.hashName, uniqIPs, map[string]string{"name": name})
if err != nil {
return nil, fmt.Errorf("failed to create address set cmd: %q: %v", name, err)
}
Expand All @@ -275,7 +275,7 @@ func newOvnAddressSet(nb goovn.Client, name string, ips []net.IP) (*ovnAddressSe
}
}

klog.V(5).Infof("New(%s) with %v", asDetail(as), ips)
klog.V(5).Infof("New(%s) with IPs: %s", asDetail(as), strings.Join(uniqIPs, " "))

return as, nil
}
Expand Down Expand Up @@ -420,7 +420,7 @@ func (as *ovnAddressSets) Destroy() error {
// setIP updates the given address set in OVN to be only the given IPs, disregarding
// existing state.
func (as *ovnAddressSet) setIPs(ips []net.IP) error {
newIPs := ipsToStringArray(ips)
newIPs := ipsToStringUnique(ips)
cmd, err := as.nb.ASUpdate(as.hashName, as.uuid, newIPs, map[string]string{"name": as.name})
if err != nil {
return fmt.Errorf("failed to create update for address set %q: %v", asDetail(as), err)
Expand Down Expand Up @@ -450,14 +450,11 @@ func (as *ovnAddressSet) addIPsCmd(ips []net.IP) (*goovn.OvnCommand, error) {
if len(ips) == 0 {
return nil, nil
}
uniqIPs := make([]string, 0, len(ips))
for _, ip := range ips {
uniqIPs = append(uniqIPs, ip.String())
}

uniqIPs := ipsToStringUnique(ips)
ipStr := strings.Join(uniqIPs, " ")
cmd, err := as.nb.ASAddIPs(as.hashName, as.uuid, uniqIPs)
if err != nil {
return nil, fmt.Errorf("failed to create add ips cmd for address set %q: %v", asDetail(as), err)
return nil, fmt.Errorf("failed to create add ips cmd for address set %q, ips: %s: %v", asDetail(as), ipStr, err)
}

return cmd, nil
Expand All @@ -481,15 +478,12 @@ func (as *ovnAddressSet) deleteIPsCmd(ips []net.IP) (*goovn.OvnCommand, error) {
if len(ips) == 0 {
return nil, nil
}
uniqIPs := make([]string, 0, len(ips))
for _, ip := range ips {
uniqIPs = append(uniqIPs, ip.String())
}
ipStr := joinIPs(ips)
uniqIPs := ipsToStringUnique(ips)
ipStr := strings.Join(uniqIPs, " ")

cmd, err := as.nb.ASDelIPs(as.hashName, as.uuid, uniqIPs)
if err != nil {
return nil, fmt.Errorf("failed to create add ips cmd for address set %q, ips: %s: %v", asDetail(as), ipStr, err)
return nil, fmt.Errorf("failed to create delete ips cmd for address set %q, ips: %s: %v", asDetail(as), ipStr, err)
}
return cmd, nil
}
Expand All @@ -506,14 +500,6 @@ func (as *ovnAddressSet) destroy() error {
return nil
}

func ipsToStringArray(ips []net.IP) []string {
out := make([]string, 0, len(ips))
for _, ip := range ips {
out = append(out, ip.String())
}
return out
}

func MakeAddressSetName(name string) (string, string) {
return name + ipv4AddressSetSuffix, name + ipv6AddressSetSuffix
}
Expand All @@ -536,12 +522,11 @@ func splitIPsByFamily(ips []net.IP) (v4 []net.IP, v6 []net.IP) {
return
}

func joinIPs(ips []net.IP) string {
list := make([]string, 0, len(ips))
// Takes a slice of IPs and returns a slice with unique IPs
func ipsToStringUnique(ips []net.IP) []string {
s := sets.NewString()
for _, ip := range ips {
list = append(list, `"`+ip.String()+`"`)
s.Insert(ip.String())
}
// so tests are predictable
sort.Strings(list)
return strings.Join(list, " ")
return s.UnsortedList()
}
8 changes: 8 additions & 0 deletions go-controller/pkg/ovn/address_set/fake_address_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ func (f *FakeAddressSetFactory) EnsureAddressSet(name string) error {
}

func (f *FakeAddressSetFactory) ProcessEachAddressSet(iteratorFn AddressSetIterFunc) error {
f.Lock()
defer f.Unlock()
asNames := sets.String{}
for _, set := range f.sets {
asName := truncateSuffixFromAddressSet(set.getName())
Expand Down Expand Up @@ -134,6 +136,12 @@ func (f *FakeAddressSetFactory) ExpectAddressSetWithIPs(name string, ips []strin
gomega.Expect(lenAddressSet).To(gomega.Equal(len(ips)))
}

func (f *FakeAddressSetFactory) EventuallyExpectAddressSetWithIPs(name string, ips []string) {
gomega.Eventually(func() {
f.ExpectAddressSetWithIPs(name, ips)
}).Should(gomega.Succeed())
}

// ExpectEmptyAddressSet ensures the named address set exists with no IPs
func (f *FakeAddressSetFactory) ExpectEmptyAddressSet(name string) {
f.ExpectAddressSetWithIPs(name, nil)
Expand Down
2 changes: 1 addition & 1 deletion go-controller/pkg/ovn/egressgw.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (oc *Controller) addPodExternalGWForNamespace(namespace string, pod *kapi.P
}
gws += ip.String()
}
nsInfo, nsUnlock, err := oc.ensureNamespaceLocked(namespace, false)
nsInfo, nsUnlock, err := oc.ensureNamespaceLocked(namespace, false, nil)
if err != nil {
return fmt.Errorf("failed to ensure namespace locked: %v", err)
}
Expand Down
11 changes: 6 additions & 5 deletions go-controller/pkg/ovn/gress_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func (gp *gressPolicy) delNamespaceAddressSet(name string) bool {
// localPodSetACL adds or updates an ACL that implements the gress policy's rules to the
// given Port Group (which should contain all pod logical switch ports selected
// by the parent NetworkPolicy)
func (gp *gressPolicy) localPodSetACL(portGroupName, portGroupUUID string, aclLogging string) {
func (gp *gressPolicy) localPodSetACL(portGroupName, portGroupUUID string, aclLogging string) error {
l3Match := gp.getL3MatchFromAddressSet()
var lportMatch string
var cidrMatches []string
Expand All @@ -312,15 +312,15 @@ func (gp *gressPolicy) localPodSetACL(portGroupName, portGroupUUID string, aclLo
cidrMatches = gp.getMatchFromIPBlock(lportMatch, l4Match)
for i, cidrMatch := range cidrMatches {
if err := gp.addOrModifyACLAllow(cidrMatch, l4Match, portGroupUUID, i+1, aclLogging); err != nil {
klog.Warningf(err.Error())
return fmt.Errorf("failed to add/update ACL allow rule for IPBlock CIDR: %w", err)
}
}
}
// if there are pod/namespace selector, then allow packets from/to that address_set or
// if the NetworkPolicyPeer is empty, then allow from all sources or to all destinations.
if gp.sizeOfAddressSet() > 0 || len(gp.ipBlock) == 0 {
if err := gp.addOrModifyACLAllow(match, l4Match, portGroupUUID, 0, aclLogging); err != nil {
klog.Warningf(err.Error())
return fmt.Errorf("failed to add/update ACL allow rule for all sources: %w", err)
}
}
}
Expand All @@ -335,16 +335,17 @@ func (gp *gressPolicy) localPodSetACL(portGroupName, portGroupUUID string, aclLo
cidrMatches = gp.getMatchFromIPBlock(lportMatch, l4Match)
for i, cidrMatch := range cidrMatches {
if err := gp.addOrModifyACLAllow(cidrMatch, l4Match, portGroupUUID, i+1, aclLogging); err != nil {
klog.Warningf(err.Error())
return fmt.Errorf("failed to add/update ACL allow rule for IPBlock CIDR: %w", err)
}
}
}
if gp.sizeOfAddressSet() > 0 || len(gp.ipBlock) == 0 {
if err := gp.addOrModifyACLAllow(match, l4Match, portGroupUUID, 0, aclLogging); err != nil {
klog.Warningf(err.Error())
return fmt.Errorf("failed to add/update ACL allow rule for all sources: %w", err)
}
}
}
return nil
}

// addOrModifyACLAllow adds or modifies an ACL with a given match to the given Port Group
Expand Down
2 changes: 1 addition & 1 deletion go-controller/pkg/ovn/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,7 @@ func (oc *Controller) ensureNodeLogicalNetwork(node *kapi.Node, hostSubnets []*n
if err = func() error {
hostNetworkNamespace := config.Kubernetes.HostNetworkNamespace
if hostNetworkNamespace != "" {
nsInfo, nsUnlock, err := oc.ensureNamespaceLocked(hostNetworkNamespace, true)
nsInfo, nsUnlock, err := oc.ensureNamespaceLocked(hostNetworkNamespace, true, nil)
if err != nil {
return fmt.Errorf("failed to ensure namespace locked: %v", err)
}
Expand Down
76 changes: 46 additions & 30 deletions go-controller/pkg/ovn/namespace.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ovn

import (
"context"
"fmt"
"net"
"strings"
Expand All @@ -13,7 +14,7 @@ import (

goovn "github.com/ebay/go-ovn"
kapi "k8s.io/api/core/v1"
utilwait "k8s.io/apimachinery/pkg/util/wait"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
)

Expand Down Expand Up @@ -80,7 +81,7 @@ func (oc *Controller) getRoutingPodGWs(nsInfo *namespaceInfo) map[string]*gatewa
// pod's routing gateway info
func (oc *Controller) addPodToNamespace(ns string, ips []*net.IPNet) (*gatewayInfo, map[string]*gatewayInfo, net.IP,
[]*goovn.OvnCommand, error) {
nsInfo, nsUnlock, err := oc.ensureNamespaceLocked(ns, true)
nsInfo, nsUnlock, err := oc.ensureNamespaceLocked(ns, true, nil)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("failed to ensure namespace locked: %v", err)
}
Expand Down Expand Up @@ -219,7 +220,7 @@ func (oc *Controller) AddNamespace(ns *kapi.Namespace) {
klog.Infof("[%s] adding namespace took %v", ns.Name, time.Since(start))
}()

nsInfo, nsUnlock, err := oc.ensureNamespaceLocked(ns.Name, false)
nsInfo, nsUnlock, err := oc.ensureNamespaceLocked(ns.Name, false, ns)
if err != nil {
klog.Errorf("Failed to ensure namespace locked: %v", err)
return
Expand All @@ -245,6 +246,11 @@ func (oc *Controller) AddNamespace(ns *kapi.Namespace) {
nsInfo.hybridOverlayVTEP = parsedAnnotation
}
}
}

// configureNamespace ensures internal structures are updated based on namespace
// must be called with nsInfo lock
func (oc *Controller) configureNamespace(nsInfo *namespaceInfo, ns *kapi.Namespace) {
if annotation, ok := ns.Annotations[routingExternalGWsAnnotation]; ok {
exGateways, err := parseRoutingExternalGWAnnotation(annotation)
if err != nil {
Expand All @@ -261,7 +267,7 @@ func (oc *Controller) AddNamespace(ns *kapi.Namespace) {
}
}

annotation = ns.Annotations[aclLoggingAnnotation]
annotation := ns.Annotations[aclLoggingAnnotation]
if annotation != "" {
if oc.aclLoggingCanEnable(annotation, nsInfo) {
klog.Infof("Namespace %s: ACL logging is set to deny=%s allow=%s", ns.Name, nsInfo.aclLogging.Deny, nsInfo.aclLogging.Allow)
Expand Down Expand Up @@ -395,33 +401,23 @@ func (oc *Controller) deleteNamespace(ns *kapi.Namespace) {

klog.V(5).Infof("Deleting Namespace's NetworkPolicy entities")
for _, np := range nsInfo.networkPolicies {
delete(nsInfo.networkPolicies, np.name)
oc.destroyNetworkPolicy(np, nsInfo)
oc.checkAndSkipRetryPolicy(np.policy)
// add the full np object to the retry entry, since the namespace is going to be removed
// along with any mappings of nsInfo -> network policies
oc.initRetryPolicyWithDelete(np.policy, np)
isLastPolicyInNamespace := len(nsInfo.networkPolicies) == 1
if err := oc.destroyNetworkPolicy(np, isLastPolicyInNamespace); err != nil {
klog.Errorf("Failed to delete network policy: %s, error: %v", getPolicyNamespacedName(np.policy), err)
oc.unSkipRetryPolicy(np.policy)
} else {
oc.checkAndDeleteRetryPolicy(np.policy)
delete(nsInfo.networkPolicies, np.name)
}
}
oc.deleteGWRoutesForNamespace(ns.Name)
oc.multicastDeleteNamespace(ns, nsInfo)
}

// waitForNamespaceLocked waits up to 10 seconds for a Namespace to be known; use this
// rather than getNamespaceLocked when calling from a thread where you might be processing
// an event in a namespace before the Namespace factory thread has processed the Namespace
// addition.
func (oc *Controller) waitForNamespaceLocked(namespace string, readOnly bool) (*namespaceInfo, func(), error) {
var nsInfo *namespaceInfo
var nsUnlock func()

err := utilwait.PollImmediate(100*time.Millisecond, 10*time.Second,
func() (bool, error) {
nsInfo, nsUnlock = oc.getNamespaceLocked(namespace, readOnly)
return nsInfo != nil, nil
},
)
if err != nil {
return nil, nil, fmt.Errorf("timeout waiting for namespace event")
}
return nsInfo, nsUnlock, nil
}

// getNamespaceLocked locks namespacesMutex, looks up ns, and (if found), returns it with
// its mutex locked. If ns is not known, nil will be returned
func (oc *Controller) getNamespaceLocked(ns string, readOnly bool) (*namespaceInfo, func()) {
Expand Down Expand Up @@ -454,9 +450,11 @@ func (oc *Controller) getNamespaceLocked(ns string, readOnly bool) (*namespaceIn
return nsInfo, unlockFunc
}

// ensureNamespaceLocked locks namespacesMutex, gets/creates an entry for ns, and returns it
// with its mutex locked. Also returns an unlock function and error.
func (oc *Controller) ensureNamespaceLocked(ns string, readOnly bool) (*namespaceInfo, func(), error) {
// ensureNamespaceLocked locks namespacesMutex, gets/creates an entry for ns, configures OVN nsInfo, and returns it
// with its mutex locked.
// ns is the name of the namespace, while namespace is the optional k8s namespace object
// if no k8s namespace object is provided, this function will attempt to find it via informer cache
func (oc *Controller) ensureNamespaceLocked(ns string, readOnly bool, namespace *kapi.Namespace) (*namespaceInfo, func(), error) {
oc.namespacesMutex.Lock()
nsInfo := oc.namespaces[ns]
nsInfoExisted := false
Expand Down Expand Up @@ -502,6 +500,24 @@ func (oc *Controller) ensureNamespaceLocked(ns string, readOnly bool) (*namespac
}
}

// nsInfo and namespace didn't exist, get it from lister
if namespace == nil {
var err error
namespace, err = oc.watchFactory.GetNamespace(ns)
if err != nil {
namespace, err = oc.client.CoreV1().Namespaces().Get(context.TODO(), ns, metav1.GetOptions{})
if err != nil {
klog.Warningf("Unable to find namespace during ensure in informer cache or kube api server. " +
"Will defer configuring namespace.")
}
}
}

if namespace != nil {
// if we have the namespace, attempt to configure nsInfo with it
oc.configureNamespace(nsInfo, namespace)
}

return nsInfo, unlockFunc, nil
}

Expand Down Expand Up @@ -603,7 +619,7 @@ func (oc *Controller) createNamespaceAddrSetAllPods(ns string) (addressset.Addre
} else {
ips = make([]net.IP, 0, len(existingPods))
for _, pod := range existingPods {
if pod.Status.PodIP != "" && !pod.Spec.HostNetwork {
if !pod.Spec.HostNetwork {
podIPs, err := util.GetAllPodIPs(pod)
if err != nil {
klog.Warningf(err.Error())
Expand Down
Loading

0 comments on commit 8699d75

Please sign in to comment.