Skip to content

Commit

Permalink
Detect storage_provider empty
Browse files Browse the repository at this point in the history
  • Loading branch information
vicente87 committed Dec 13, 2024
1 parent 5f9b4e6 commit 284a8c0
Showing 1 changed file with 113 additions and 109 deletions.
222 changes: 113 additions & 109 deletions pkg/resourcemanager/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,141 +441,145 @@ func DelegateJob(service *types.Service, event string, logger *log.Logger) error
storage_provider := service.ClusterID
//Create event depending on delegation level
fmt.Println("Storage_provider : ", storage_provider)
eventJSON := eventBuild(event, storage_provider)
fmt.Println(string(eventJSON))
if storage_provider != "" {
eventJSON := eventBuild(event, storage_provider)
fmt.Println(string(eventJSON))

for _, replica := range service.Replicas {
// Manage if replica.Type is "oscar" and have the capacity to receive a service
fmt.Println("Delegation job in ClusterID: ", replica.ClusterID, "with Priority ", replica.Priority)
if strings.ToLower(replica.Type) == oscarReplicaType && replica.Priority < noDelegateCode {
// Check ClusterID is defined in 'Clusters'
fmt.Println("Delegating ...")
cluster, ok := service.Clusters[replica.ClusterID]
if !ok {
logger.Printf("Error delegating service \"%s\" to ClusterID \"%s\": Cluster not defined\n", service.Name, replica.ClusterID)
continue
}

for _, replica := range service.Replicas {
// Manage if replica.Type is "oscar" and have the capacity to receive a service
fmt.Println("Delegation job in ClusterID: ", replica.ClusterID, "with Priority ", replica.Priority)
if strings.ToLower(replica.Type) == oscarReplicaType && replica.Priority < noDelegateCode {
// Check ClusterID is defined in 'Clusters'
fmt.Println("Delegating ...")
cluster, ok := service.Clusters[replica.ClusterID]
if !ok {
logger.Printf("Error delegating service \"%s\" to ClusterID \"%s\": Cluster not defined\n", service.Name, replica.ClusterID)
continue
}
// Get token
token, err := getServiceToken(replica, cluster)
if err != nil {
logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": %v\n", service.Name, replica.ClusterID, err)
continue
}

// Get token
token, err := getServiceToken(replica, cluster)
if err != nil {
logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": %v\n", service.Name, replica.ClusterID, err)
continue
}
// Parse the cluster's endpoint URL and add the service's path
postJobURL, err := url.Parse(cluster.Endpoint)
if err != nil {
logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to parse cluster endpoint \"%s\": %v\n", service.Name, replica.ClusterID, cluster.Endpoint, err)
continue
}
postJobURL.Path = path.Join(postJobURL.Path, "job", replica.ServiceName)

// Parse the cluster's endpoint URL and add the service's path
postJobURL, err := url.Parse(cluster.Endpoint)
if err != nil {
logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to parse cluster endpoint \"%s\": %v\n", service.Name, replica.ClusterID, cluster.Endpoint, err)
continue
}
postJobURL.Path = path.Join(postJobURL.Path, "job", replica.ServiceName)
// Make request to get service's definition (including token) from cluster
//fmt.Println(string(eventJSON))
req, err := http.NewRequest(http.MethodPost, postJobURL.String(), bytes.NewBuffer(eventJSON))

// Make request to get service's definition (including token) from cluster
//fmt.Println(string(eventJSON))
req, err := http.NewRequest(http.MethodPost, postJobURL.String(), bytes.NewBuffer(eventJSON))
if err != nil {
logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to make request: %v\n", service.Name, replica.ClusterID, err)
continue
}

if err != nil {
logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to make request: %v\n", service.Name, replica.ClusterID, err)
continue
}
// Add Headers
for k, v := range replica.Headers {
req.Header.Add(k, v)
}

// Add Headers
for k, v := range replica.Headers {
req.Header.Add(k, v)
}
// Add service token to the request
req.Header.Add("Authorization", "Bearer "+strings.TrimSpace(token))

// Add service token to the request
req.Header.Add("Authorization", "Bearer "+strings.TrimSpace(token))
// Make HTTP client

// Make HTTP client
var transport http.RoundTripper = &http.Transport{
// Enable/disable SSL verification
TLSClientConfig: &tls.Config{InsecureSkipVerify: !cluster.SSLVerify},
}

var transport http.RoundTripper = &http.Transport{
// Enable/disable SSL verification
TLSClientConfig: &tls.Config{InsecureSkipVerify: !cluster.SSLVerify},
}
client := &http.Client{
Transport: transport,
Timeout: time.Second * 20,
}

client := &http.Client{
Transport: transport,
Timeout: time.Second * 20,
}
// Send the request
res, err := client.Do(req)
if err != nil {
logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to send request: %v\n", service.Name, replica.ClusterID, err)
continue
}

// Send the request
res, err := client.Do(req)
if err != nil {
logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to send request: %v\n", service.Name, replica.ClusterID, err)
continue
// Check status code
if res.StatusCode == http.StatusCreated {
logger.Printf("Job successfully delegated to cluster \"%s\"\n", replica.ClusterID)
return nil
} else if res.StatusCode == http.StatusUnauthorized {
// Retry updating the token
token, err := updateServiceToken(replica, cluster)
if err != nil {
logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": %v\n", service.Name, replica.ClusterID, err)
continue
}
// Add service token to the request
req.Header.Add("Authorization", "Bearer "+strings.TrimSpace(token))

// Send the request
res, err = client.Do(req)
if err != nil {
logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to send request: %v\n", service.Name, replica.ClusterID, err)
continue
}
}
log.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": Status code %d\n", service.Name, replica.ClusterID, res.StatusCode)
}

// Check status code
if res.StatusCode == http.StatusCreated {
logger.Printf("Job successfully delegated to cluster \"%s\"\n", replica.ClusterID)
return nil
} else if res.StatusCode == http.StatusUnauthorized {
// Retry updating the token
token, err := updateServiceToken(replica, cluster)
// Manage if replica.Type is "endpoint"
if strings.ToLower(replica.Type) == endpointReplicaType {
// Parse the replica URL to check if it's valid
replicaURL, err := url.Parse(replica.URL)
if err != nil {
logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": %v\n", service.Name, replica.ClusterID, err)
logger.Printf("Error delegating job from service \"%s\" to endpoint \"%s\": unable to parse URL: %v\n", service.Name, replica.URL, err)
continue
}
// Add service token to the request
req.Header.Add("Authorization", "Bearer "+strings.TrimSpace(token))

// Send the request
res, err = client.Do(req)
// Make request to get service's definition (including token) from cluster
req, err := http.NewRequest(http.MethodPost, replicaURL.String(), bytes.NewBuffer(eventJSON))
if err != nil {
logger.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": unable to send request: %v\n", service.Name, replica.ClusterID, err)
logger.Printf("Error delegating job from service \"%s\" to endpoint \"%s\": unable to make request: %v\n", service.Name, replica.URL, err)
continue
}
}
log.Printf("Error delegating job from service \"%s\" to ClusterID \"%s\": Status code %d\n", service.Name, replica.ClusterID, res.StatusCode)
}

// Manage if replica.Type is "endpoint"
if strings.ToLower(replica.Type) == endpointReplicaType {
// Parse the replica URL to check if it's valid
replicaURL, err := url.Parse(replica.URL)
if err != nil {
logger.Printf("Error delegating job from service \"%s\" to endpoint \"%s\": unable to parse URL: %v\n", service.Name, replica.URL, err)
continue
}

// Make request to get service's definition (including token) from cluster
req, err := http.NewRequest(http.MethodPost, replicaURL.String(), bytes.NewBuffer(eventJSON))
if err != nil {
logger.Printf("Error delegating job from service \"%s\" to endpoint \"%s\": unable to make request: %v\n", service.Name, replica.URL, err)
continue
}

// Add Headers
for k, v := range replica.Headers {
req.Header.Add(k, v)
}
// Add Headers
for k, v := range replica.Headers {
req.Header.Add(k, v)
}

// Make HTTP client
var transport http.RoundTripper = &http.Transport{
// Enable/disable SSL verification
TLSClientConfig: &tls.Config{InsecureSkipVerify: !replica.SSLVerify},
}
client := &http.Client{
Transport: transport,
Timeout: time.Second * 20,
}
// Make HTTP client
var transport http.RoundTripper = &http.Transport{
// Enable/disable SSL verification
TLSClientConfig: &tls.Config{InsecureSkipVerify: !replica.SSLVerify},
}
client := &http.Client{
Transport: transport,
Timeout: time.Second * 20,
}

// Send the request
res, err := client.Do(req)
if err != nil {
logger.Printf("Error delegating job from service \"%s\" to endpoint \"%s\": unable to send request: %v\n", service.Name, replica.URL, err)
continue
}
// Send the request
res, err := client.Do(req)
if err != nil {
logger.Printf("Error delegating job from service \"%s\" to endpoint \"%s\": unable to send request: %v\n", service.Name, replica.URL, err)
continue
}

// Check status code
if res.StatusCode == http.StatusOK {
logger.Printf("Job successfully delegated to endpoint \"%s\"\n", replica.URL)
return nil
// Check status code
if res.StatusCode == http.StatusOK {
logger.Printf("Job successfully delegated to endpoint \"%s\"\n", replica.URL)
return nil
}
logger.Printf("Error delegating job from service \"%s\" to endpoint \"%s\": Status code %d\n", service.Name, replica.URL, res.StatusCode)
}
logger.Printf("Error delegating job from service \"%s\" to endpoint \"%s\": Status code %d\n", service.Name, replica.URL, res.StatusCode)
}
} else {
fmt.Println("Error by Storage_Provider empty.")
}

return fmt.Errorf("unable to delegate job from service \"%s\" to any replica, scheduling in the current cluster", service.Name)
Expand Down

0 comments on commit 284a8c0

Please sign in to comment.