Skip to content

Commit

Permalink
Merge pull request #267 from grycap/devel
Browse files Browse the repository at this point in the history
Minor changes
  • Loading branch information
catttam authored Nov 18, 2024
2 parents 1020601 + c2036fd commit 9e10935
Show file tree
Hide file tree
Showing 11 changed files with 134 additions and 32 deletions.
Binary file added docs/images/bucket-list.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/minio-ui.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/oscar-info.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
48 changes: 48 additions & 0 deletions docs/minio_usage.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Accessing and managing MinIO storage provider

Each OSCAR cluster includes a deployed MinIO instance, which is used to trigger service executions. When a service is configured to use MinIO as its storage provider, it monitors a specified input folder for new data. Whenever new data is added to this folder, it triggers the associated service to execute.

## Using graphical interfaces

These folders can be accessed via both the OSCAR UI and the MinIO console UI.

- **Using OSCAR-UI**: The following image highlights the section where MinIO buckets are accessible. Users can view a list of available buckets and perform operations such as uploading and deleting files.

![minio-buckets](images/bucket-list.png)

- **Using the MinIO Console UI**: Access details for this interface are available in the "Info" tab within the OSCAR UI. This tab provides the MinIO console endpoint and the necessary credentials to log in, where the *Access Key* serves as the username, and the *Secret Key* functions as the password.

![oscar-info](images/oscar-info.png)

Finally, the following image provides an overview of the MinIO login panel and the "Object Browser" tab. Once logged in, the "Object Browser" tab allows users to navigate their available buckets, view stored objects, and perform various operations such as uploading, downloading, or deleting files.

![oscar-info](images/minio-ui.png)

Further information about the MinIO Console avaliable on [MinIO Console documentation](https://min.io/docs/minio/linux/administration/minio-console.html).

## Using command-line interfaces

MinIO buckets can also be managed through [oscar-cli command-line](https://github.com/grycap/oscar-cli) or the official [MinIO client](https://min.io/docs/minio/linux/reference/minio-mc.html).

- **oscar-cli**: The OSCAR client provides a dedicated set of commands for accessing files within buckets. It is important to note that this interface does not support DELETE or UPDATE operations. Below is a brief overview of the available commands and their functionalities.
- [get-file](https://docs.oscar.grycap.net/oscar-cli/#get-file): Get file from a service's storage provider.
- [list-files](https://docs.oscar.grycap.net/oscar-cli/#list-files): List files from a service's storage provider path.
- [put-file](https://docs.oscar.grycap.net/oscar-cli/#put-file): Upload a file on a service storage provider.

An example of a put-file operation:
``` sh
oscar-cli service put-file fish-detector.yaml minio .path/to/your/images ./fish-detector/input/
```

- **mc**: If a user wants to use the MinIO client it needs to follow some previous steps.
- *Install the client*: Detailed instructions for installing the MinIO client (mc) are available in [the official documentation](https://min.io/docs/minio/linux/reference/minio-mc.html#install-mc).
- *Configure the MinIO instance*: The client requires credentials to connect and interact with the MinIO instance. This configuration can be set with the following command:

``` sh
mc alias set myminio https://minio.gracious-varahamihira6.im.grycap.net YOUR-ACCESS-KEY YOUR-SECRET-KEY
```

Once the client is configured, users can perform various operations supported by the MinIO client. For a complete list of available commands and their usage, refer to the [MinIO client reference](https://min.io/docs/minio/linux/reference/minio-mc.html#command-quick-reference). The following example demonstrates a PUT operation, where a file is uploaded to a specific folder within a bucket.
``` sh
mc cp /path/to/your/images/*.jpg myminio/fish-detector/input/
```
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ nav:
- oscar-cli.md
- usage-ui.md
- api.md
- minio-upload.md

- Service Execution:
- invoking.md
Expand Down
8 changes: 8 additions & 0 deletions pkg/backends/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,14 @@ func (k *KubeBackend) UpdateService(service types.Service) error {
}
}

//Create deaemonset to cache the service image on all the nodes
if service.ImagePrefetch {
err = imagepuller.CreateDaemonset(k.config, service, k.kubeClientset)
if err != nil {
return err
}
}

return nil
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/backends/knative.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,14 @@ func (kn *KnativeBackend) UpdateService(service types.Service) error {
}
}

//Create deaemonset to cache the service image on all the nodes
if service.ImagePrefetch {
err = imagepuller.CreateDaemonset(kn.config, service, kn.kubeClientset)
if err != nil {
return err
}
}

return nil
}

Expand Down
11 changes: 8 additions & 3 deletions pkg/handlers/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const (
defaultMemory = "256Mi"
defaultCPU = "0.2"
defaultLogLevel = "INFO"
createPath = "/system/services"
)

var errInput = errors.New("unrecognized input (valid inputs are MinIO and dCache)")
Expand All @@ -56,7 +57,7 @@ func MakeCreateHandler(cfg *types.Config, back types.ServerlessBackend) gin.Hand
if len(strings.Split(authHeader, "Bearer")) == 1 {
isAdminUser = true
service.Owner = "cluster_admin"
createLogger.Printf("Creating service for user: %s", service.Owner)
createLogger.Printf("Creating service '%s' for user '%s'", service.Name, service.Owner)
}

if err := c.ShouldBindJSON(&service); err != nil {
Expand All @@ -79,7 +80,7 @@ func MakeCreateHandler(cfg *types.Config, back types.ServerlessBackend) gin.Hand

// Set UID from owner
service.Owner = uid
createLogger.Printf("Creating service for user: %s", service.Owner)
createLogger.Printf("Creating service '%s' for user '%s'", service.Name, service.Owner)

mc, err := auth.GetMultitenancyConfigFromContext(c)
if err != nil {
Expand Down Expand Up @@ -169,7 +170,11 @@ func MakeCreateHandler(cfg *types.Config, back types.ServerlessBackend) gin.Hand
log.Println(err.Error())
}
}
createLogger.Println("Service created with name: ", service.Name)
uid := service.Owner
if service.Owner == "" {
uid = "nil"
}
createLogger.Printf("%s | %v | %s | %s | %s", "POST", 200, createPath, service.Name, uid)
c.Status(http.StatusCreated)
}
}
Expand Down
81 changes: 58 additions & 23 deletions pkg/handlers/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"io"
"log"
"net/http"
"os"
"strconv"
"strings"

Expand All @@ -31,6 +32,7 @@ import (
"github.com/grycap/oscar/v3/pkg/resourcemanager"
"github.com/grycap/oscar/v3/pkg/types"
"github.com/grycap/oscar/v3/pkg/utils/auth"
genericErrors "github.com/pkg/errors"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -45,7 +47,8 @@ var (
// Don't restart jobs in order to keep logs
restartPolicy = v1.RestartPolicyNever
// command used for passing the event to faas-supervisor
command = []string{"/bin/sh"}
command = []string{"/bin/sh"}
jobLogger = log.New(os.Stdout, "[JOB-HANDLER] ", log.Flags())
)

const (
Expand Down Expand Up @@ -99,6 +102,7 @@ func MakeJobHandler(cfg *types.Config, kubeClientset kubernetes.Interface, back
}

// If isn't service token check if it is an oidc token
var uidFromToken string
if len(rawToken) != tokenLength {
oidcManager, _ := auth.NewOIDCManager(cfg.OIDCIssuer, cfg.OIDCSubject, cfg.OIDCGroups)

Expand All @@ -118,6 +122,13 @@ func MakeJobHandler(cfg *types.Config, kubeClientset kubernetes.Interface, back
c.String(http.StatusUnauthorized, "this user isn't enrrolled on the vo: %v", service.VO)
return
}

// Get UID from token
var uidErr error
uidFromToken, uidErr = oidcManager.GetUID(rawToken)
if uidErr != nil {
jobLogger.Println("WARNING:", uidErr)
}
}

// Get the event from request body
Expand All @@ -127,26 +138,22 @@ func MakeJobHandler(cfg *types.Config, kubeClientset kubernetes.Interface, back
return
}

// Extract user UID from MinIO event
var decoded map[string]interface{}
if err := json.Unmarshal(eventBytes, &decoded); err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
records := decoded["Records"].([]interface{})
// Check if it has the MinIO event format
if records != nil {
r := records[0].(map[string]interface{})

eventInfo := r["requestParameters"].(map[string]interface{})
uid := eventInfo["principalId"]
sourceIPAddress := eventInfo["sourceIPAddress"]

uid, sourceIPAddress, err := decodeEventBytes(eventBytes)
if err != nil {
// Check if the request was made with OIDC token to get user UID
if uidFromToken != "" {
c.Set("uidOrigin", uidFromToken)
} else {
// Set as nil string if unable to get an UID
jobLogger.Println("WARNING:", err)
c.Set("uidOrigin", "nil")
}
} else {
c.Set("IPAddress", sourceIPAddress)
c.Set("uidOrigin", uid)
} else {
c.Set("uidOrigin", "nil")
}

c.Next()

// Initialize event envVar and args var
Expand All @@ -161,12 +168,13 @@ func MakeJobHandler(cfg *types.Config, kubeClientset kubernetes.Interface, back
args = []string{"-c", fmt.Sprintf("echo $%s | %s", types.EventVariable, service.GetSupervisorPath()) + ";echo \"I finish\" > /tmpfolder/finish-file;"}
types.SetMount(podSpec, *service, cfg)
} else {
event = v1.EnvVar{
Name: types.EventVariable,
Value: string(eventBytes),
}
args = []string{"-c", fmt.Sprintf("echo $%s | %s", types.EventVariable, service.GetSupervisorPath())}
}

event = v1.EnvVar{
Name: types.EventVariable,
Value: string(eventBytes),
}
}

// Make JOB_UUID envVar
Expand Down Expand Up @@ -208,7 +216,7 @@ func MakeJobHandler(cfg *types.Config, kubeClientset kubernetes.Interface, back
c.Status(http.StatusCreated)
return
}
log.Printf("unable to delegate job. Error: %v\n", err)
jobLogger.Printf("unable to delegate job. Error: %v\n", err)
}
}

Expand Down Expand Up @@ -243,7 +251,6 @@ func MakeJobHandler(cfg *types.Config, kubeClientset kubernetes.Interface, back
}
}

// Create job
_, err = kubeClientset.BatchV1().Jobs(cfg.ServicesNamespace).Create(context.TODO(), job, metav1.CreateOptions{})
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
Expand All @@ -252,3 +259,31 @@ func MakeJobHandler(cfg *types.Config, kubeClientset kubernetes.Interface, back
c.Status(http.StatusCreated)
}
}

func decodeEventBytes(eventBytes []byte) (string, string, error) {

defer func() {
// recover from panic, if one occurs
if r := recover(); r != nil {
jobLogger.Println("Recovered from panic:", r)
}
}()
// Extract user UID from MinIO event
var decoded map[string]interface{}
if err := json.Unmarshal(eventBytes, &decoded); err != nil {
return "", "", err
}

if records, panicErr := decoded["Records"].([]interface{}); panicErr {
r := records[0].(map[string]interface{})

eventInfo := r["requestParameters"].(map[string]interface{})
uid := eventInfo["principalId"]
sourceIPAddress := eventInfo["sourceIPAddress"]

return uid.(string), sourceIPAddress.(string), nil
} else {
return "", "", genericErrors.New("Failed to decode records")
}

}
4 changes: 1 addition & 3 deletions pkg/imagepuller/daemonset.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ var stopper chan struct{}

// Create daemonset
func CreateDaemonset(cfg *types.Config, service types.Service, kubeClientset kubernetes.Interface) error {

DaemonSetLoggerInfo.Println("Creating daemonset for service:", service.Name)
//Set needed variables
setWorkingNodes(kubeClientset)
podGroup = generatePodGroupName()
Expand All @@ -73,8 +73,6 @@ func CreateDaemonset(cfg *types.Config, service types.Service, kubeClientset kub
if err != nil {
DaemonSetLoggerInfo.Println(err)
return fmt.Errorf("failed to create daemonset: %s", err.Error())
} else {
DaemonSetLoggerInfo.Println("Created daemonset for service:", service.Name)
}

//Set watcher informer
Expand Down
5 changes: 2 additions & 3 deletions pkg/utils/auth/oidc.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,10 @@ func (om *oidcManager) UserHasVO(rawToken string, vo string) (bool, error) {

func (om *oidcManager) GetUID(rawToken string) (string, error) {
ui, err := om.GetUserInfo(rawToken)
oidcLogger.Println("received uid: ", ui.Subject)
if err != nil {
return ui.Subject, nil
return "", err
}
return "", err
return ui.Subject, nil
}

// IsAuthorised checks if a token is authorised to access the API
Expand Down

0 comments on commit 9e10935

Please sign in to comment.