diff --git a/pkg/resourcemanager/delegate.go b/pkg/resourcemanager/delegate.go index 566c52f6..972d5160 100644 --- a/pkg/resourcemanager/delegate.go +++ b/pkg/resourcemanager/delegate.go @@ -441,141 +441,145 @@ func DelegateJob(service *types.Service, event string, logger *log.Logger) error storage_provider := service.ClusterID //Create event depending on delegation level fmt.Println("Storage_provider : ", storage_provider) - eventJSON := eventBuild(event, storage_provider) - fmt.Println(string(eventJSON)) + if storage_provider != "" { + eventJSON := eventBuild(event, storage_provider) + fmt.Println(string(eventJSON)) + + for _, replica := range service.Replicas { + // Manage if replica.Type is "oscar" and have the capacity to receive a service + fmt.Println("Delegation job in ClusterID: ", replica.ClusterID, "with Priority ", replica.Priority) + if strings.ToLower(replica.Type) == oscarReplicaType && replica.Priority < noDelegateCode { + // Check ClusterID is defined in 'Clusters' + fmt.Println("Delegating ...") + cluster, ok := service.Clusters[replica.ClusterID] + if !ok { + logger.Printf("Error delegating service \"%s\" to ClusterID \"%s\": Cluster not defined\n", service.Name, replica.ClusterID) + continue + } - for _, replica := range service.Replicas { - // Manage if replica.Type is "oscar" and have the capacity to receive a service - fmt.Println("Delegation job in ClusterID: ", replica.ClusterID, "with Priority ", replica.Priority) - if strings.ToLower(replica.Type) == oscarReplicaType && replica.Priority < noDelegateCode { - // Check ClusterID is defined in 'Clusters' - fmt.Println("Delegating ...") - cluster, ok := service.Clusters[replica.ClusterID] - if !ok { - logger.Printf("Error delegating service \"%s\" to ClusterID \"%s\": Cluster not defined\n", service.Name, replica.ClusterID) - continue - } + // Get token + token, err := getServiceToken(replica, cluster) + if err != nil { + logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": %v\n", service.Name, replica.ClusterID, err) + continue + } - // Get token - token, err := getServiceToken(replica, cluster) - if err != nil { - logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": %v\n", service.Name, replica.ClusterID, err) - continue - } + // Parse the cluster's endpoint URL and add the service's path + postJobURL, err := url.Parse(cluster.Endpoint) + if err != nil { + logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to parse cluster endpoint \"%s\": %v\n", service.Name, replica.ClusterID, cluster.Endpoint, err) + continue + } + postJobURL.Path = path.Join(postJobURL.Path, "job", replica.ServiceName) - // Parse the cluster's endpoint URL and add the service's path - postJobURL, err := url.Parse(cluster.Endpoint) - if err != nil { - logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to parse cluster endpoint \"%s\": %v\n", service.Name, replica.ClusterID, cluster.Endpoint, err) - continue - } - postJobURL.Path = path.Join(postJobURL.Path, "job", replica.ServiceName) + // Make request to get service's definition (including token) from cluster + //fmt.Println(string(eventJSON)) + req, err := http.NewRequest(http.MethodPost, postJobURL.String(), bytes.NewBuffer(eventJSON)) - // Make request to get service's definition (including token) from cluster - //fmt.Println(string(eventJSON)) - req, err := http.NewRequest(http.MethodPost, postJobURL.String(), bytes.NewBuffer(eventJSON)) + if err != nil { + logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to make request: %v\n", service.Name, replica.ClusterID, err) + continue + } - if err != nil { - logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to make request: %v\n", service.Name, replica.ClusterID, err) - continue - } + // Add Headers + for k, v := range replica.Headers { + req.Header.Add(k, v) + } - // Add Headers - for k, v := range replica.Headers { - req.Header.Add(k, v) - } + // Add service token to the request + req.Header.Add("Authorization", "Bearer "+strings.TrimSpace(token)) - // Add service token to the request - req.Header.Add("Authorization", "Bearer "+strings.TrimSpace(token)) + // Make HTTP client - // Make HTTP client + var transport http.RoundTripper = &http.Transport{ + // Enable/disable SSL verification + TLSClientConfig: &tls.Config{InsecureSkipVerify: !cluster.SSLVerify}, + } - var transport http.RoundTripper = &http.Transport{ - // Enable/disable SSL verification - TLSClientConfig: &tls.Config{InsecureSkipVerify: !cluster.SSLVerify}, - } + client := &http.Client{ + Transport: transport, + Timeout: time.Second * 20, + } - client := &http.Client{ - Transport: transport, - Timeout: time.Second * 20, - } + // Send the request + res, err := client.Do(req) + if err != nil { + logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to send request: %v\n", service.Name, replica.ClusterID, err) + continue + } - // Send the request - res, err := client.Do(req) - if err != nil { - logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to send request: %v\n", service.Name, replica.ClusterID, err) - continue + // Check status code + if res.StatusCode == http.StatusCreated { + logger.Printf("Job successfully delegated to cluster \"%s\"\n", replica.ClusterID) + return nil + } else if res.StatusCode == http.StatusUnauthorized { + // Retry updating the token + token, err := updateServiceToken(replica, cluster) + if err != nil { + logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": %v\n", service.Name, replica.ClusterID, err) + continue + } + // Add service token to the request + req.Header.Add("Authorization", "Bearer "+strings.TrimSpace(token)) + + // Send the request + res, err = client.Do(req) + if err != nil { + logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to send request: %v\n", service.Name, replica.ClusterID, err) + continue + } + } + log.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": Status code %d\n", service.Name, replica.ClusterID, res.StatusCode) } - // Check status code - if res.StatusCode == http.StatusCreated { - logger.Printf("Job successfully delegated to cluster \"%s\"\n", replica.ClusterID) - return nil - } else if res.StatusCode == http.StatusUnauthorized { - // Retry updating the token - token, err := updateServiceToken(replica, cluster) + // Manage if replica.Type is "endpoint" + if strings.ToLower(replica.Type) == endpointReplicaType { + // Parse the replica URL to check if it's valid + replicaURL, err := url.Parse(replica.URL) if err != nil { - logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": %v\n", service.Name, replica.ClusterID, err) + logger.Printf("Error delegating job from service \"%s\" to endpoint \"%s\": unable to parse URL: %v\n", service.Name, replica.URL, err) continue } - // Add service token to the request - req.Header.Add("Authorization", "Bearer "+strings.TrimSpace(token)) - // Send the request - res, err = client.Do(req) + // Make request to get service's definition (including token) from cluster + req, err := http.NewRequest(http.MethodPost, replicaURL.String(), bytes.NewBuffer(eventJSON)) if err != nil { - logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to send request: %v\n", service.Name, replica.ClusterID, err) + logger.Printf("Error delegating job from service \"%s\" to endpoint \"%s\": unable to make request: %v\n", service.Name, replica.URL, err) continue } - } - log.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": Status code %d\n", service.Name, replica.ClusterID, res.StatusCode) - } - - // Manage if replica.Type is "endpoint" - if strings.ToLower(replica.Type) == endpointReplicaType { - // Parse the replica URL to check if it's valid - replicaURL, err := url.Parse(replica.URL) - if err != nil { - logger.Printf("Error delegating job from service \"%s\" to endpoint \"%s\": unable to parse URL: %v\n", service.Name, replica.URL, err) - continue - } - // Make request to get service's definition (including token) from cluster - req, err := http.NewRequest(http.MethodPost, replicaURL.String(), bytes.NewBuffer(eventJSON)) - if err != nil { - logger.Printf("Error delegating job from service \"%s\" to endpoint \"%s\": unable to make request: %v\n", service.Name, replica.URL, err) - continue - } - - // Add Headers - for k, v := range replica.Headers { - req.Header.Add(k, v) - } + // Add Headers + for k, v := range replica.Headers { + req.Header.Add(k, v) + } - // Make HTTP client - var transport http.RoundTripper = &http.Transport{ - // Enable/disable SSL verification - TLSClientConfig: &tls.Config{InsecureSkipVerify: !replica.SSLVerify}, - } - client := &http.Client{ - Transport: transport, - Timeout: time.Second * 20, - } + // Make HTTP client + var transport http.RoundTripper = &http.Transport{ + // Enable/disable SSL verification + TLSClientConfig: &tls.Config{InsecureSkipVerify: !replica.SSLVerify}, + } + client := &http.Client{ + Transport: transport, + Timeout: time.Second * 20, + } - // Send the request - res, err := client.Do(req) - if err != nil { - logger.Printf("Error delegating job from service \"%s\" to endpoint \"%s\": unable to send request: %v\n", service.Name, replica.URL, err) - continue - } + // Send the request + res, err := client.Do(req) + if err != nil { + logger.Printf("Error delegating job from service \"%s\" to endpoint \"%s\": unable to send request: %v\n", service.Name, replica.URL, err) + continue + } - // Check status code - if res.StatusCode == http.StatusOK { - logger.Printf("Job successfully delegated to endpoint \"%s\"\n", replica.URL) - return nil + // Check status code + if res.StatusCode == http.StatusOK { + logger.Printf("Job successfully delegated to endpoint \"%s\"\n", replica.URL) + return nil + } + logger.Printf("Error delegating job from service \"%s\" to endpoint \"%s\": Status code %d\n", service.Name, replica.URL, res.StatusCode) } - logger.Printf("Error delegating job from service \"%s\" to endpoint \"%s\": Status code %d\n", service.Name, replica.URL, res.StatusCode) } + } else { + fmt.Println("Error by Storage_Provider empty.") } return fmt.Errorf("unable to delegate job from service \"%s\" to any replica, scheduling in the current cluster", service.Name)