Skip to content

Commit

Permalink
Merge pull request #275 from grycap/micafer_dev
Browse files Browse the repository at this point in the history
Fix sec issues
  • Loading branch information
SergioLangaritaBenitez authored Jan 14, 2025
2 parents 9f7787a + 7bf1973 commit 4716219
Show file tree
Hide file tree
Showing 19 changed files with 171 additions and 67 deletions.
2 changes: 2 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[flake8]
exclude = examples
5 changes: 5 additions & 0 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ jobs:
- name: Run tests
run: go test ./pkg/... -cover -coverprofile=profile.cov

- name: Run Gosec Security Scanner
uses: securego/gosec@master
with:
args: ./...

- name: Report coverage
uses: codacy/codacy-coverage-reporter-action@v1
with:
Expand Down
2 changes: 1 addition & 1 deletion pkg/backends/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (k *KubeBackend) ListServices() ([]*types.Service, error) {
services := []*types.Service{}

for _, cm := range configmaps.Items {
service, err := getServiceFromConfigMap(&cm)
service, err := getServiceFromConfigMap(&cm) // #nosec G601
if err != nil {
return nil, err
}
Expand Down
12 changes: 7 additions & 5 deletions pkg/backends/knative.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"
"log"
"net/http"
"os"
"strconv"

"github.com/grycap/oscar/v3/pkg/imagepuller"
Expand All @@ -33,8 +32,8 @@ import (
knclientset "knative.dev/serving/pkg/client/clientset/versioned"
)

// Custom logger
var knativeLogger = log.New(os.Stdout, "[KNATIVE] ", log.Flags())
// Custom logger - uncomment if needed
// var knativeLogger = log.New(os.Stdout, "[KNATIVE] ", log.Flags())

// KnativeBackend struct to represent a Knative client
type KnativeBackend struct {
Expand Down Expand Up @@ -84,7 +83,7 @@ func (kn *KnativeBackend) ListServices() ([]*types.Service, error) {
services := []*types.Service{}

for _, cm := range configmaps.Items {
service, err := getServiceFromConfigMap(&cm)
service, err := getServiceFromConfigMap(&cm) // #nosec G601
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -129,7 +128,10 @@ func (kn *KnativeBackend) CreateService(service types.Service) error {

//Create an expose service
if service.Expose.APIPort != 0 {
types.CreateExpose(service, kn.kubeClientset, kn.config)
err = types.CreateExpose(service, kn.kubeClientset, kn.config)
if err != nil {
return err
}
}
//Create deaemonset to cache the service image on all the nodes
if service.ImagePrefetch {
Expand Down
2 changes: 1 addition & 1 deletion pkg/backends/openfaas.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (of *OpenfaasBackend) ListServices() ([]*types.Service, error) {
services := []*types.Service{}

for _, cm := range configmaps.Items {
service, err := getServiceFromConfigMap(&cm)
service, err := getServiceFromConfigMap(&cm) // #nosec G601
if err != nil {
return nil, err
}
Expand Down
40 changes: 32 additions & 8 deletions pkg/handlers/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,14 @@ func MakeCreateHandler(cfg *types.Config, back types.ServerlessBackend) gin.Hand
if len(uids) > 0 {
for _, uid := range uids {
sk, _ := auth.GenerateRandomKey(8)
minIOAdminClient.CreateMinIOUser(uid, sk)
mc.CreateSecretForOIDC(uid, sk)
cmuErr := minIOAdminClient.CreateMinIOUser(uid, sk)
if cmuErr != nil {
log.Printf("Error creating MinIO user for user %s: %v", uid, cmuErr)
}
csErr := mc.CreateSecretForOIDC(uid, sk)
if csErr != nil {
log.Printf("Error creating secret for user %s: %v", uid, csErr)
}
}
}

Expand Down Expand Up @@ -171,7 +177,10 @@ func MakeCreateHandler(cfg *types.Config, back types.ServerlessBackend) gin.Hand

// Register minio webhook and restart the server
if err := registerMinIOWebhook(service.Name, service.Token, service.StorageProviders.MinIO[types.DefaultProvider], cfg); err != nil {
back.DeleteService(service)
derr := back.DeleteService(service)
if derr != nil {
log.Printf("Error deleting service: %v\n", derr)
}
c.String(http.StatusInternalServerError, err.Error())
return
}
Expand All @@ -183,7 +192,10 @@ func MakeCreateHandler(cfg *types.Config, back types.ServerlessBackend) gin.Hand
} else {
c.String(http.StatusInternalServerError, err.Error())
}
back.DeleteService(service)
derr := back.DeleteService(service)
if derr != nil {
log.Printf("Error deleting service: %v\n", derr)
}
return
}

Expand Down Expand Up @@ -361,7 +373,10 @@ func createBuckets(service *types.Service, cfg *types.Config, minIOAdminClient *
}

if !isAdminUser {
minIOAdminClient.CreateAddPolicy(b, service.AllowedUsers[i], false)
err = minIOAdminClient.CreateAddPolicy(b, service.AllowedUsers[i], false)
if err != nil {
return err
}
}
}
}
Expand Down Expand Up @@ -401,7 +416,10 @@ func createBuckets(service *types.Service, cfg *types.Config, minIOAdminClient *
// Check if the provider identifier is defined in StorageProviders
if !isStorageProviderDefined(provName, provID, service.StorageProviders) {
// TODO fix
disableInputNotifications(s3Client, service.GetMinIOWebhookARN(), "")
dinErr := disableInputNotifications(s3Client, service.GetMinIOWebhookARN(), "")
if dinErr != nil {
log.Printf("Error disabling input notifications: %v\n", dinErr)
}
return fmt.Errorf("the StorageProvider \"%s.%s\" is not defined", provName, provID)
}

Expand Down Expand Up @@ -448,7 +466,10 @@ func createBuckets(service *types.Service, cfg *types.Config, minIOAdminClient *
Key: aws.String(folderKey),
})
if err != nil {
disableInputNotifications(s3Client, service.GetMinIOWebhookARN(), splitPath[0])
dinErr := disableInputNotifications(s3Client, service.GetMinIOWebhookARN(), splitPath[0])
if dinErr != nil {
log.Printf("Error disabling input notifications: %v\n", dinErr)
}
return fmt.Errorf("error creating folder \"%s\" in bucket \"%s\": %v", folderKey, splitPath[0], err)
}
}
Expand All @@ -474,7 +495,10 @@ func createBuckets(service *types.Service, cfg *types.Config, minIOAdminClient *
log.Printf("Error creating \"%s\" folder in Onedata. Error: %v\n", path, err)
} else {
// TODO fix
disableInputNotifications(s3Client, service.GetMinIOWebhookARN(), "")
dinErr := disableInputNotifications(s3Client, service.GetMinIOWebhookARN(), "")
if dinErr != nil {
log.Printf("Error disabling input notifications: %v\n", dinErr)
}
return fmt.Errorf("error connecting to Onedata's Oneprovider \"%s\". Error: %v", service.StorageProviders.Onedata[provID].OneproviderHost, err)
}
}
Expand Down
16 changes: 12 additions & 4 deletions pkg/handlers/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,10 @@ func MakeDeleteHandler(cfg *types.Config, back types.ServerlessBackend) gin.Hand
// Split buckets and folders from path
bucket := strings.SplitN(path, "/", 2)
var users []string
// Needed ?
minIOAdminClient.UpdateUsersInGroup(users, bucket[0], true)
err = minIOAdminClient.UpdateUsersInGroup(users, bucket[0], true)
if err != nil {
log.Printf("error updating MinIO users in group: %v", err)
}
}

// Remove the service's webhook in MinIO config and restart the server
Expand Down Expand Up @@ -176,7 +178,10 @@ func deleteBuckets(service *types.Service, cfg *types.Config, minIOAdminClient *
// Delete user's buckets if isolated spaces had been created
if strings.ToUpper(service.IsolationLevel) == "USER" && len(service.BucketList) > 0 {
// Delete all private buckets
deletePrivateBuckets(service, minIOAdminClient, s3Client)
err = deletePrivateBuckets(service, minIOAdminClient, s3Client)
if err != nil {
return fmt.Errorf("error while disable the input notification")
}
}
}

Expand All @@ -193,7 +198,10 @@ func deleteBuckets(service *types.Service, cfg *types.Config, minIOAdminClient *
// Check if the provider identifier is defined in StorageProviders
if !isStorageProviderDefined(provName, provID, service.StorageProviders) {
// TODO fix
disableInputNotifications(s3Client, service.GetMinIOWebhookARN(), "")
err := disableInputNotifications(s3Client, service.GetMinIOWebhookARN(), "")
if err != nil {
return fmt.Errorf("error while disable the input notification")
}
return fmt.Errorf("the StorageProvider \"%s.%s\" is not defined", provName, provID)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/handlers/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func MakeJobHandler(cfg *types.Config, kubeClientset kubernetes.Interface, back
c.Next()

// Initialize event envVar and args var
event := v1.EnvVar{}
var event v1.EnvVar
var args []string

if cfg.InterLinkAvailable && service.InterLinkNodeName != "" {
Expand Down
6 changes: 3 additions & 3 deletions pkg/handlers/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,10 @@ func MakeJobsInfoHandler(back types.ServerlessBackend, kubeClientset kubernetes.
for _, contStatus := range pod.Status.ContainerStatuses {
if contStatus.Name == types.ContainerName {
if contStatus.State.Running != nil {
jobsInfo[jobName].StartTime = &contStatus.State.Running.StartedAt
jobsInfo[jobName].StartTime = &(contStatus.State.Running.StartedAt)
} else if contStatus.State.Terminated != nil {
jobsInfo[jobName].StartTime = &contStatus.State.Terminated.StartedAt
jobsInfo[jobName].FinishTime = &contStatus.State.Terminated.FinishedAt
jobsInfo[jobName].StartTime = &(contStatus.State.Terminated.StartedAt)
jobsInfo[jobName].FinishTime = &(contStatus.State.Terminated.FinishedAt)
}
}
}
Expand Down
16 changes: 12 additions & 4 deletions pkg/handlers/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,16 @@ func MakeUpdateHandler(cfg *types.Config, back types.ServerlessBackend) gin.Hand
return
}

disableInputNotifications(s3Client, oldService.GetMinIOWebhookARN(), splitPath[0])

err = disableInputNotifications(s3Client, oldService.GetMinIOWebhookARN(), splitPath[0])
if err != nil {
return
}
// Register minio webhook and restart the server
if err := registerMinIOWebhook(newService.Name, newService.Token, newService.StorageProviders.MinIO[types.DefaultProvider], cfg); err != nil {
back.UpdateService(*oldService)
uerr := back.UpdateService(*oldService)
if uerr != nil {
log.Println(uerr.Error())
}
c.String(http.StatusInternalServerError, err.Error())
return
}
Expand All @@ -152,7 +157,10 @@ func MakeUpdateHandler(cfg *types.Config, back types.ServerlessBackend) gin.Hand
c.String(http.StatusInternalServerError, err.Error())
}
// If updateBuckets fails restore the oldService
back.UpdateService(*oldService)
uerr := back.UpdateService(*oldService)
if uerr != nil {
log.Println(uerr.Error())
}
return
}

Expand Down
23 changes: 17 additions & 6 deletions pkg/imagepuller/daemonset.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ package imagepuller
import (
//"k8s.io/apimachinery/pkg/watch"
"context"
"crypto/rand"
"fmt"
"log"
"math/rand"
"math/big"
"os"
"sync"
"time"
Expand Down Expand Up @@ -61,15 +62,19 @@ var stopper chan struct{}
func CreateDaemonset(cfg *types.Config, service types.Service, kubeClientset kubernetes.Interface) error {
DaemonSetLoggerInfo.Println("Creating daemonset for service:", service.Name)
//Set needed variables
setWorkingNodes(kubeClientset)
err := setWorkingNodes(kubeClientset)
if err != nil {
DaemonSetLoggerInfo.Println(err)
return fmt.Errorf("failed to set working nodes: %s", err.Error())
}
podGroup = generatePodGroupName()
daemonsetName = "image-puller-" + service.Name

//Get daemonset definition
daemon := getDaemonset(cfg, service)

//Create daemonset
_, err := kubeClientset.AppsV1().DaemonSets(cfg.ServicesNamespace).Create(context.TODO(), daemon, metav1.CreateOptions{})
_, err = kubeClientset.AppsV1().DaemonSets(cfg.ServicesNamespace).Create(context.TODO(), daemon, metav1.CreateOptions{})
if err != nil {
DaemonSetLoggerInfo.Println(err)
return fmt.Errorf("failed to create daemonset: %s", err.Error())
Expand Down Expand Up @@ -146,15 +151,19 @@ func watchPods(kubeClientset kubernetes.Interface, cfg *types.Config) {
}

//Add event handler that gets all the pods status
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
_, err := podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: handleUpdatePodEvent,
})
if err != nil {
DaemonSetLoggerInfo.Println(err)
log.Fatalf("Failed to add event handler: %s", err.Error())
}

<-stopper

//Delete daemonset when all pods are in state "Running"
DaemonSetLoggerInfo.Println("Deleting daemonset...")
err := kubeClientset.AppsV1().DaemonSets(cfg.ServicesNamespace).Delete(context.TODO(), daemonsetName, metav1.DeleteOptions{})
err = kubeClientset.AppsV1().DaemonSets(cfg.ServicesNamespace).Delete(context.TODO(), daemonsetName, metav1.DeleteOptions{})
if err != nil {
DaemonSetLoggerInfo.Println(err)
log.Fatalf("Failed to delete daemonset: %s", err.Error())
Expand Down Expand Up @@ -191,7 +200,9 @@ func setWorkingNodes(kubeClientset kubernetes.Interface) error {
func generatePodGroupName() string {
b := make([]byte, lengthStr)
for i := range b {
b[i] = letterBytes[rand.Intn(len(letterBytes))]
max := big.NewInt(int64(len(letterBytes)))
randomNumber, _ := rand.Int(rand.Reader, max)
b[i] = letterBytes[randomNumber.Int64()]
}
return "pod-group-" + string(b)
}
13 changes: 10 additions & 3 deletions pkg/resourcemanager/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ package resourcemanager

import (
"bytes"
"crypto/rand"
"crypto/tls"
"encoding/json"
"fmt"
"log"
"math/rand"
"math/big"
"net/http"
"net/url"
"path"
Expand Down Expand Up @@ -129,6 +130,7 @@ func DelegateJob(service *types.Service, event string, logger *log.Logger) error
req.Header.Add("Authorization", "Bearer "+strings.TrimSpace(token))

// Make HTTP client
// #nosec
var transport http.RoundTripper = &http.Transport{
// Enable/disable SSL verification
TLSClientConfig: &tls.Config{InsecureSkipVerify: !cluster.SSLVerify},
Expand Down Expand Up @@ -191,6 +193,7 @@ func DelegateJob(service *types.Service, event string, logger *log.Logger) error
}

// Make HTTP client
// #nosec
var transport http.RoundTripper = &http.Transport{
// Enable/disable SSL verification
TLSClientConfig: &tls.Config{InsecureSkipVerify: !replica.SSLVerify},
Expand Down Expand Up @@ -267,6 +270,7 @@ func updateServiceToken(replica types.Replica, cluster types.Cluster) (string, e
req.SetBasicAuth(cluster.AuthUser, cluster.AuthPassword)

// Make HTTP client
// #nosec
var transport http.RoundTripper = &http.Transport{
// Enable/disable SSL verification
TLSClientConfig: &tls.Config{InsecureSkipVerify: !cluster.SSLVerify},
Expand Down Expand Up @@ -342,6 +346,7 @@ func getClusterStatus(service *types.Service) {
req.SetBasicAuth(cluster.AuthUser, cluster.AuthPassword)

// Make HTTP client
// #nosec
var transport http.RoundTripper = &http.Transport{
// Enable/disable SSL verification
TLSClientConfig: &tls.Config{InsecureSkipVerify: !cluster.SSLVerify},
Expand Down Expand Up @@ -395,14 +400,16 @@ func getClusterStatus(service *types.Service) {
if dist >= 0 {
fmt.Println("Resources available in ClusterID", replica.ClusterID)
if service.Delegation == "random" {
randPriority := rand.Intn(noDelegateCode)
max := big.NewInt(int64(noDelegateCode))
randomNumber, _ := rand.Int(rand.Reader, max)
randPriority := randomNumber.Uint64()
replica.Priority = uint(randPriority)
fmt.Println("Priority ", replica.Priority, " with ", service.Delegation, " delegation")
} else if service.Delegation == "load-based" {
//Map the totalClusterCPU range to a smaller range (input range 0 to 32 cpu to output range 100 to 0 priority)
totalClusterCPU := clusterStatus.CPUFreeTotal
mappedCPUPriority := mapToRange(totalClusterCPU, 0, 32000, 100, 0)
replica.Priority = uint(mappedCPUPriority)
replica.Priority = uint(mappedCPUPriority) // #nosec G115
fmt.Println("Priority ", replica.Priority, " with ", service.Delegation, " delegation")
} else if service.Delegation != "static" {
replica.Priority = noDelegateCode
Expand Down
Loading

0 comments on commit 4716219

Please sign in to comment.