diff --git a/docs/images/bucket-list.png b/docs/images/bucket-list.png new file mode 100644 index 00000000..dba9aa46 Binary files /dev/null and b/docs/images/bucket-list.png differ diff --git a/docs/images/minio-ui.png b/docs/images/minio-ui.png new file mode 100644 index 00000000..f90d710a Binary files /dev/null and b/docs/images/minio-ui.png differ diff --git a/docs/images/oscar-info.png b/docs/images/oscar-info.png new file mode 100644 index 00000000..1c9b969e Binary files /dev/null and b/docs/images/oscar-info.png differ diff --git a/docs/minio_usage.md b/docs/minio_usage.md new file mode 100644 index 00000000..a8d41b20 --- /dev/null +++ b/docs/minio_usage.md @@ -0,0 +1,48 @@ +# Accessing and managing MinIO storage provider + +Each OSCAR cluster includes a deployed MinIO instance, which is used to trigger service executions. When a service is configured to use MinIO as its storage provider, it monitors a specified input folder for new data. Whenever new data is added to this folder, it triggers the associated service to execute. + +## Using graphical interfaces + +These folders can be accessed via both the OSCAR UI and the MinIO console UI. + +- **Using OSCAR-UI**: The following image highlights the section where MinIO buckets are accessible. Users can view a list of available buckets and perform operations such as uploading and deleting files. + +![minio-buckets](images/bucket-list.png) + +- **Using the MinIO Console UI**: Access details for this interface are available in the "Info" tab within the OSCAR UI. This tab provides the MinIO console endpoint and the necessary credentials to log in, where the *Access Key* serves as the username, and the *Secret Key* functions as the password. + +![oscar-info](images/oscar-info.png) + +Finally, the following image provides an overview of the MinIO login panel and the "Object Browser" tab. Once logged in, the "Object Browser" tab allows users to navigate their available buckets, view stored objects, and perform various operations such as uploading, downloading, or deleting files. + +![oscar-info](images/minio-ui.png) + +Further information about the MinIO Console avaliable on [MinIO Console documentation](https://min.io/docs/minio/linux/administration/minio-console.html). + +## Using command-line interfaces + +MinIO buckets can also be managed through [oscar-cli command-line](https://github.com/grycap/oscar-cli) or the official [MinIO client](https://min.io/docs/minio/linux/reference/minio-mc.html). + +- **oscar-cli**: The OSCAR client provides a dedicated set of commands for accessing files within buckets. It is important to note that this interface does not support DELETE or UPDATE operations. Below is a brief overview of the available commands and their functionalities. + - [get-file](https://docs.oscar.grycap.net/oscar-cli/#get-file): Get file from a service's storage provider. + - [list-files](https://docs.oscar.grycap.net/oscar-cli/#list-files): List files from a service's storage provider path. + - [put-file](https://docs.oscar.grycap.net/oscar-cli/#put-file): Upload a file on a service storage provider. + + An example of a put-file operation: + ``` sh + oscar-cli service put-file fish-detector.yaml minio .path/to/your/images ./fish-detector/input/ + ``` + +- **mc**: If a user wants to use the MinIO client it needs to follow some previous steps. + - *Install the client*: Detailed instructions for installing the MinIO client (mc) are available in [the official documentation](https://min.io/docs/minio/linux/reference/minio-mc.html#install-mc). + - *Configure the MinIO instance*: The client requires credentials to connect and interact with the MinIO instance. This configuration can be set with the following command: + + ``` sh + mc alias set myminio https://minio.gracious-varahamihira6.im.grycap.net YOUR-ACCESS-KEY YOUR-SECRET-KEY + ``` + + Once the client is configured, users can perform various operations supported by the MinIO client. For a complete list of available commands and their usage, refer to the [MinIO client reference](https://min.io/docs/minio/linux/reference/minio-mc.html#command-quick-reference). The following example demonstrates a PUT operation, where a file is uploaded to a specific folder within a bucket. + ``` sh + mc cp /path/to/your/images/*.jpg myminio/fish-detector/input/ + ``` diff --git a/mkdocs.yml b/mkdocs.yml index fa259848..6a6457cd 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -21,6 +21,7 @@ nav: - oscar-cli.md - usage-ui.md - api.md + - minio-upload.md - Service Execution: - invoking.md diff --git a/pkg/backends/k8s.go b/pkg/backends/k8s.go index 79f41ba6..0879991d 100644 --- a/pkg/backends/k8s.go +++ b/pkg/backends/k8s.go @@ -216,6 +216,14 @@ func (k *KubeBackend) UpdateService(service types.Service) error { } } + //Create deaemonset to cache the service image on all the nodes + if service.ImagePrefetch { + err = imagepuller.CreateDaemonset(k.config, service, k.kubeClientset) + if err != nil { + return err + } + } + return nil } diff --git a/pkg/backends/knative.go b/pkg/backends/knative.go index 08497817..566352c4 100644 --- a/pkg/backends/knative.go +++ b/pkg/backends/knative.go @@ -226,6 +226,14 @@ func (kn *KnativeBackend) UpdateService(service types.Service) error { } } + //Create deaemonset to cache the service image on all the nodes + if service.ImagePrefetch { + err = imagepuller.CreateDaemonset(kn.config, service, kn.kubeClientset) + if err != nil { + return err + } + } + return nil } diff --git a/pkg/handlers/create.go b/pkg/handlers/create.go index 095dbe09..30e16bdc 100644 --- a/pkg/handlers/create.go +++ b/pkg/handlers/create.go @@ -40,6 +40,7 @@ const ( defaultMemory = "256Mi" defaultCPU = "0.2" defaultLogLevel = "INFO" + createPath = "/system/services" ) var errInput = errors.New("unrecognized input (valid inputs are MinIO and dCache)") @@ -56,7 +57,7 @@ func MakeCreateHandler(cfg *types.Config, back types.ServerlessBackend) gin.Hand if len(strings.Split(authHeader, "Bearer")) == 1 { isAdminUser = true service.Owner = "cluster_admin" - createLogger.Printf("Creating service for user: %s", service.Owner) + createLogger.Printf("Creating service '%s' for user '%s'", service.Name, service.Owner) } if err := c.ShouldBindJSON(&service); err != nil { @@ -79,7 +80,7 @@ func MakeCreateHandler(cfg *types.Config, back types.ServerlessBackend) gin.Hand // Set UID from owner service.Owner = uid - createLogger.Printf("Creating service for user: %s", service.Owner) + createLogger.Printf("Creating service '%s' for user '%s'", service.Name, service.Owner) mc, err := auth.GetMultitenancyConfigFromContext(c) if err != nil { @@ -169,7 +170,11 @@ func MakeCreateHandler(cfg *types.Config, back types.ServerlessBackend) gin.Hand log.Println(err.Error()) } } - createLogger.Println("Service created with name: ", service.Name) + uid := service.Owner + if service.Owner == "" { + uid = "nil" + } + createLogger.Printf("%s | %v | %s | %s | %s", "POST", 200, createPath, service.Name, uid) c.Status(http.StatusCreated) } } diff --git a/pkg/handlers/job.go b/pkg/handlers/job.go index 962bcc55..e0319d63 100644 --- a/pkg/handlers/job.go +++ b/pkg/handlers/job.go @@ -23,6 +23,7 @@ import ( "io" "log" "net/http" + "os" "strconv" "strings" @@ -31,6 +32,7 @@ import ( "github.com/grycap/oscar/v3/pkg/resourcemanager" "github.com/grycap/oscar/v3/pkg/types" "github.com/grycap/oscar/v3/pkg/utils/auth" + genericErrors "github.com/pkg/errors" batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -45,7 +47,8 @@ var ( // Don't restart jobs in order to keep logs restartPolicy = v1.RestartPolicyNever // command used for passing the event to faas-supervisor - command = []string{"/bin/sh"} + command = []string{"/bin/sh"} + jobLogger = log.New(os.Stdout, "[JOB-HANDLER] ", log.Flags()) ) const ( @@ -99,6 +102,7 @@ func MakeJobHandler(cfg *types.Config, kubeClientset kubernetes.Interface, back } // If isn't service token check if it is an oidc token + var uidFromToken string if len(rawToken) != tokenLength { oidcManager, _ := auth.NewOIDCManager(cfg.OIDCIssuer, cfg.OIDCSubject, cfg.OIDCGroups) @@ -118,6 +122,13 @@ func MakeJobHandler(cfg *types.Config, kubeClientset kubernetes.Interface, back c.String(http.StatusUnauthorized, "this user isn't enrrolled on the vo: %v", service.VO) return } + + // Get UID from token + var uidErr error + uidFromToken, uidErr = oidcManager.GetUID(rawToken) + if uidErr != nil { + jobLogger.Println("WARNING:", uidErr) + } } // Get the event from request body @@ -127,26 +138,22 @@ func MakeJobHandler(cfg *types.Config, kubeClientset kubernetes.Interface, back return } - // Extract user UID from MinIO event - var decoded map[string]interface{} - if err := json.Unmarshal(eventBytes, &decoded); err != nil { - c.String(http.StatusInternalServerError, err.Error()) - return - } - records := decoded["Records"].([]interface{}) // Check if it has the MinIO event format - if records != nil { - r := records[0].(map[string]interface{}) - - eventInfo := r["requestParameters"].(map[string]interface{}) - uid := eventInfo["principalId"] - sourceIPAddress := eventInfo["sourceIPAddress"] - + uid, sourceIPAddress, err := decodeEventBytes(eventBytes) + if err != nil { + // Check if the request was made with OIDC token to get user UID + if uidFromToken != "" { + c.Set("uidOrigin", uidFromToken) + } else { + // Set as nil string if unable to get an UID + jobLogger.Println("WARNING:", err) + c.Set("uidOrigin", "nil") + } + } else { c.Set("IPAddress", sourceIPAddress) c.Set("uidOrigin", uid) - } else { - c.Set("uidOrigin", "nil") } + c.Next() // Initialize event envVar and args var @@ -161,12 +168,13 @@ func MakeJobHandler(cfg *types.Config, kubeClientset kubernetes.Interface, back args = []string{"-c", fmt.Sprintf("echo $%s | %s", types.EventVariable, service.GetSupervisorPath()) + ";echo \"I finish\" > /tmpfolder/finish-file;"} types.SetMount(podSpec, *service, cfg) } else { - event = v1.EnvVar{ - Name: types.EventVariable, - Value: string(eventBytes), - } args = []string{"-c", fmt.Sprintf("echo $%s | %s", types.EventVariable, service.GetSupervisorPath())} } + + event = v1.EnvVar{ + Name: types.EventVariable, + Value: string(eventBytes), + } } // Make JOB_UUID envVar @@ -208,7 +216,7 @@ func MakeJobHandler(cfg *types.Config, kubeClientset kubernetes.Interface, back c.Status(http.StatusCreated) return } - log.Printf("unable to delegate job. Error: %v\n", err) + jobLogger.Printf("unable to delegate job. Error: %v\n", err) } } @@ -243,7 +251,6 @@ func MakeJobHandler(cfg *types.Config, kubeClientset kubernetes.Interface, back } } - // Create job _, err = kubeClientset.BatchV1().Jobs(cfg.ServicesNamespace).Create(context.TODO(), job, metav1.CreateOptions{}) if err != nil { c.String(http.StatusInternalServerError, err.Error()) @@ -252,3 +259,31 @@ func MakeJobHandler(cfg *types.Config, kubeClientset kubernetes.Interface, back c.Status(http.StatusCreated) } } + +func decodeEventBytes(eventBytes []byte) (string, string, error) { + + defer func() { + // recover from panic, if one occurs + if r := recover(); r != nil { + jobLogger.Println("Recovered from panic:", r) + } + }() + // Extract user UID from MinIO event + var decoded map[string]interface{} + if err := json.Unmarshal(eventBytes, &decoded); err != nil { + return "", "", err + } + + if records, panicErr := decoded["Records"].([]interface{}); panicErr { + r := records[0].(map[string]interface{}) + + eventInfo := r["requestParameters"].(map[string]interface{}) + uid := eventInfo["principalId"] + sourceIPAddress := eventInfo["sourceIPAddress"] + + return uid.(string), sourceIPAddress.(string), nil + } else { + return "", "", genericErrors.New("Failed to decode records") + } + +} diff --git a/pkg/imagepuller/daemonset.go b/pkg/imagepuller/daemonset.go index 037685ca..7d5a6f9f 100644 --- a/pkg/imagepuller/daemonset.go +++ b/pkg/imagepuller/daemonset.go @@ -59,7 +59,7 @@ var stopper chan struct{} // Create daemonset func CreateDaemonset(cfg *types.Config, service types.Service, kubeClientset kubernetes.Interface) error { - + DaemonSetLoggerInfo.Println("Creating daemonset for service:", service.Name) //Set needed variables setWorkingNodes(kubeClientset) podGroup = generatePodGroupName() @@ -73,8 +73,6 @@ func CreateDaemonset(cfg *types.Config, service types.Service, kubeClientset kub if err != nil { DaemonSetLoggerInfo.Println(err) return fmt.Errorf("failed to create daemonset: %s", err.Error()) - } else { - DaemonSetLoggerInfo.Println("Created daemonset for service:", service.Name) } //Set watcher informer diff --git a/pkg/utils/auth/oidc.go b/pkg/utils/auth/oidc.go index 7582695f..fb60575b 100644 --- a/pkg/utils/auth/oidc.go +++ b/pkg/utils/auth/oidc.go @@ -198,11 +198,10 @@ func (om *oidcManager) UserHasVO(rawToken string, vo string) (bool, error) { func (om *oidcManager) GetUID(rawToken string) (string, error) { ui, err := om.GetUserInfo(rawToken) - oidcLogger.Println("received uid: ", ui.Subject) if err != nil { - return ui.Subject, nil + return "", err } - return "", err + return ui.Subject, nil } // IsAuthorised checks if a token is authorised to access the API