Skip to content

Commit

Permalink
Merge pull request #226 from grycap/devel
Browse files Browse the repository at this point in the history
First multenancy version
  • Loading branch information
SergioLangaritaBenitez authored Feb 5, 2024
2 parents 8ecebe7 + 76663a5 commit fbb8301
Show file tree
Hide file tree
Showing 19 changed files with 710 additions and 66 deletions.
2 changes: 1 addition & 1 deletion docs/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ credentials [here](deploy-ec3.md#default-service-endpoints).
![login](images/usage/usage-01.png)

After a correct login, you should see the main view:

<!-- TODO actualizar foto del login a la nueva interfaz -->
![main view](images/usage/usage-02.png)

## Deploying services
Expand Down
10 changes: 5 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func main() {
r := gin.Default()

// Define system group with basic auth middleware
system := r.Group("/system", auth.GetAuthMiddleware(cfg))
system := r.Group("/system", auth.GetAuthMiddleware(cfg, kubeClientset))

// Config path
system.GET("/config", handlers.MakeConfigHandler(cfg))
Expand All @@ -90,10 +90,10 @@ func main() {
system.DELETE("/services/:serviceName", handlers.MakeDeleteHandler(cfg, back))

// Logs paths
system.GET("/logs/:serviceName", handlers.MakeJobsInfoHandler(kubeClientset, cfg.ServicesNamespace))
system.DELETE("/logs/:serviceName", handlers.MakeDeleteJobsHandler(kubeClientset, cfg.ServicesNamespace))
system.GET("/logs/:serviceName/:jobName", handlers.MakeGetLogsHandler(kubeClientset, cfg.ServicesNamespace))
system.DELETE("/logs/:serviceName/:jobName", handlers.MakeDeleteJobHandler(kubeClientset, cfg.ServicesNamespace))
system.GET("/logs/:serviceName", handlers.MakeJobsInfoHandler(back, kubeClientset, cfg.ServicesNamespace))
system.DELETE("/logs/:serviceName", handlers.MakeDeleteJobsHandler(back, kubeClientset, cfg.ServicesNamespace))
system.GET("/logs/:serviceName/:jobName", handlers.MakeGetLogsHandler(back, kubeClientset, cfg.ServicesNamespace))
system.DELETE("/logs/:serviceName/:jobName", handlers.MakeDeleteJobHandler(back, kubeClientset, cfg.ServicesNamespace))

// Job path for async invocations
r.POST("/job/:serviceName", handlers.MakeJobHandler(cfg, kubeClientset, back, resMan))
Expand Down
5 changes: 5 additions & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ nav:
- oscar-cli.md
- OpenAPI Specification: api.md
- MinIO bucket replication: minio-bucket-replication.md
nav:
- Third party integrations:
- Integration with EGI: egi-integration.md
- OIDC Authorization: oidc-auth.md
- Integration with SCONE: sgx-integration.md
- Frequently Asked Questions (FAQ): faq.md
- license.md
- about.md
Expand Down
8 changes: 0 additions & 8 deletions pkg/backends/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,6 @@ func (k *KubeBackend) CreateService(service types.Service) error {
return err
}

if service.VO != "" {
for _, vo := range k.config.OIDCGroups {
if vo == service.VO {
service.Labels["vo"] = service.VO
}
}
}

// Create podSpec from the service
podSpec, err := service.ToPodSpec(k.config)
if err != nil {
Expand Down
13 changes: 4 additions & 9 deletions pkg/backends/knative.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"log"
"net/http"
"os"
"strconv"

"github.com/grycap/oscar/v2/pkg/imagepuller"
Expand All @@ -33,6 +34,9 @@ import (
knclientset "knative.dev/serving/pkg/client/clientset/versioned"
)

// Custom logger
var knativeLogger = log.New(os.Stdout, "[KNATIVE] ", log.Flags())

// KnativeBackend struct to represent a Knative client
type KnativeBackend struct {
kubeClientset kubernetes.Interface
Expand Down Expand Up @@ -278,15 +282,6 @@ func (kn *KnativeBackend) createKNServiceDefinition(service *types.Service) (*kn
// https://knative.dev/docs/serving/services/private-services/
service.Labels[types.KnativeVisibilityLabel] = types.KnativeClusterLocalValue

// Add to the service labels the user VO for accounting on k8s pods
if service.VO != "" {
for _, vo := range kn.config.OIDCGroups {
if vo == service.VO {
service.Labels["vo"] = service.VO
}
}
}

podSpec, err := service.ToPodSpec(kn.config)
if err != nil {
return nil, err
Expand Down
46 changes: 45 additions & 1 deletion pkg/handlers/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,59 @@ limitations under the License.
package handlers

import (
"fmt"
"net/http"
"strings"

"github.com/gin-gonic/gin"
"github.com/grycap/oscar/v2/pkg/types"
"github.com/grycap/oscar/v2/pkg/utils/auth"
)

type configForUser struct {
Cfg *types.Config `json:"config"`
MinIOProvider *types.MinIOProvider `json:"minio_provider"`
}

// MakeConfigHandler makes a handler for getting server's configuration
func MakeConfigHandler(cfg *types.Config) gin.HandlerFunc {
return func(c *gin.Context) {
c.JSON(http.StatusOK, cfg)
// Return configForUser
var conf configForUser
minIOProvider := cfg.MinIOProvider
authHeader := c.GetHeader("Authorization")
if len(strings.Split(authHeader, "Bearer")) == 1 {
conf = configForUser{cfg, minIOProvider}
} else {

// Get MinIO credentials from k8s secret for user

uid, err := auth.GetUIDFromContext(c)
if err != nil {
c.String(http.StatusInternalServerError, fmt.Sprintln(err))
}

mc, err := auth.GetMultitenancyConfigFromContext(c)
if err != nil {
c.String(http.StatusInternalServerError, fmt.Sprintln(err))
}

ak, sk, err := mc.GetUserCredentials(uid)
if err != nil {
c.String(http.StatusInternalServerError, "Error getting credentials for MinIO user: ", uid)
}

userMinIOProvider := &types.MinIOProvider{
Endpoint: minIOProvider.Endpoint,
Verify: minIOProvider.Verify,
AccessKey: ak,
SecretKey: sk,
Region: minIOProvider.Region,
}

conf = configForUser{cfg, userMinIOProvider}
}

c.JSON(http.StatusOK, conf)
}
}
106 changes: 87 additions & 19 deletions pkg/handlers/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"log"
"net/http"
"os"
"reflect"
"strings"

Expand All @@ -43,10 +44,18 @@ const (

var errInput = errors.New("unrecognized input (valid inputs are MinIO and dCache)")

// Custom logger
var createLogger = log.New(os.Stdout, "[CREATE] ", log.Flags())
var isAdminUser = false

// MakeCreateHandler makes a handler for creating services
func MakeCreateHandler(cfg *types.Config, back types.ServerlessBackend) gin.HandlerFunc {
return func(c *gin.Context) {
var service types.Service
authHeader := c.GetHeader("Authorization")
if len(strings.Split(authHeader, "Bearer")) == 1 {
isAdminUser = true
}

if err := c.ShouldBindJSON(&service); err != nil {
c.String(http.StatusBadRequest, fmt.Sprintf("The service specification is not valid: %v", err))
Expand All @@ -56,21 +65,47 @@ func MakeCreateHandler(cfg *types.Config, back types.ServerlessBackend) gin.Hand
// Check service values and set defaults
checkValues(&service, cfg)

if service.VO != "" {
oidcManager, _ := auth.NewOIDCManager(cfg.OIDCIssuer, cfg.OIDCSubject, cfg.OIDCGroups)
// Check if users in allowed_users have a MinIO associated user
minIOAdminClient, _ := utils.MakeMinIOAdminClient(cfg)

authHeader := c.GetHeader("Authorization")
rawToken := strings.TrimPrefix(authHeader, "Bearer ")
hasVO, err2 := oidcManager.UserHasVO(rawToken, service.VO)
// Service is created by an EGI user
if !isAdminUser {
uid, err := auth.GetUIDFromContext(c)
if err != nil {
c.String(http.StatusInternalServerError, fmt.Sprintln(err))
}

if err2 != nil {
c.String(http.StatusInternalServerError, err2.Error())
return
mc, err := auth.GetMultitenancyConfigFromContext(c)
if err != nil {
c.String(http.StatusInternalServerError, fmt.Sprintln(err))
}

full_uid := auth.FormatUID(uid)
// Check if the service VO is present on the cluster VO's and if the user creating the service is enrrolled in such
if service.VO != "" {
for _, vo := range cfg.OIDCGroups {
if vo == service.VO {
err := checkIdentity(&service, cfg, authHeader)
if err != nil {
c.String(http.StatusBadRequest, fmt.Sprintln(err))
}
break
}
}
}

if !hasVO {
c.String(http.StatusBadRequest, fmt.Sprintf("This user isn't enrrolled on the vo: %v", service.VO))
return
if len(service.AllowedUsers) > 0 {
// If AllowedUsers is empty don't add uid
service.Labels["uid"] = full_uid[0:8]
service.AllowedUsers = append(service.AllowedUsers, uid)
uids := mc.CheckUsersInCache(service.AllowedUsers)
if len(uids) > 0 {
for _, uid := range uids {
sk, _ := auth.GenerateRandomKey(8)
minIOAdminClient.CreateMinIOUser(uid, sk)
mc.CreateSecretForOIDC(uid, sk)
}
}
}
}

Expand All @@ -93,7 +128,7 @@ func MakeCreateHandler(cfg *types.Config, back types.ServerlessBackend) gin.Hand
}

// Create buckets/folders based on the Input and Output and enable notifications
if err := createBuckets(&service, cfg); err != nil {
if err := createBuckets(&service, cfg, minIOAdminClient, service.AllowedUsers, false); err != nil {
if err == errInput {
c.String(http.StatusBadRequest, err.Error())
} else {
Expand Down Expand Up @@ -140,10 +175,6 @@ func checkValues(service *types.Service, cfg *types.Config) {
service.Labels[types.YunikornApplicationIDLabel] = service.Name
service.Labels[types.YunikornQueueLabel] = fmt.Sprintf("%s.%s.%s", types.YunikornRootQueue, types.YunikornOscarQueue, service.Name)

if service.VO != "" {
service.Labels["vo"] = service.VO
}

// Create default annotations map
if service.Annotations == nil {
service.Annotations = make(map[string]string)
Expand All @@ -157,7 +188,6 @@ func checkValues(service *types.Service, cfg *types.Config) {
service.StorageProviders.MinIO = map[string]*types.MinIOProvider{
types.DefaultProvider: cfg.MinIOProvider,
}

}
} else {
service.StorageProviders = &types.StorageProviders{
Expand All @@ -171,7 +201,7 @@ func checkValues(service *types.Service, cfg *types.Config) {
service.Token = utils.GenerateToken()
}

func createBuckets(service *types.Service, cfg *types.Config) error {
func createBuckets(service *types.Service, cfg *types.Config, minIOAdminClient *utils.MinIOAdminClient, allowed_users []string, isUpdate bool) error {
var s3Client *s3.S3
var cdmiClient *cdmi.Client
var provName, provID string
Expand Down Expand Up @@ -233,6 +263,26 @@ func createBuckets(service *types.Service, cfg *types.Config) error {
return fmt.Errorf("error creating bucket %s: %v", splitPath[0], err)
}
}

// Create group for the service and add users
if !isAdminUser {
if len(allowed_users) == 0 {
err = minIOAdminClient.AddServiceToAllUsersGroup(splitPath[0])
} else {
if !isUpdate {
err = minIOAdminClient.CreateServiceGroup(splitPath[0])
if err != nil {
return fmt.Errorf("error creating service group for bucket %s: %v", splitPath[0], err)
}
} else {
minIOAdminClient.DeleteServiceGroup(splitPath[0])
}
err = minIOAdminClient.AddUserToGroup(allowed_users, splitPath[0])
if err != nil {
return err
}
}
}
// Create folder(s)
if len(splitPath) == 2 {
// Add "/" to the end of the key in order to create a folder
Expand All @@ -250,7 +300,6 @@ func createBuckets(service *types.Service, cfg *types.Config) error {
if err := enableInputNotification(s3Client, service.GetMinIOWebhookARN(), in); err != nil {
return err
}

}

// Create output buckets
Expand Down Expand Up @@ -347,6 +396,25 @@ func isStorageProviderDefined(storageName string, storageID string, providers *t
return ok
}

func checkIdentity(service *types.Service, cfg *types.Config, authHeader string) error {
oidcManager, _ := auth.NewOIDCManager(cfg.OIDCIssuer, cfg.OIDCSubject, cfg.OIDCGroups)
rawToken := strings.TrimPrefix(authHeader, "Bearer ")

hasVO, err := oidcManager.UserHasVO(rawToken, service.VO)

if err != nil {
return err
}

if !hasVO {
return fmt.Errorf("This user isn't enrrolled on the vo: %v", service.VO)
}

service.Labels["vo"] = service.VO

return nil
}

func registerMinIOWebhook(name string, token string, minIO *types.MinIOProvider, cfg *types.Config) error {
minIOAdminClient, err := utils.MakeMinIOAdminClient(cfg)
if err != nil {
Expand Down
Loading

0 comments on commit fbb8301

Please sign in to comment.