From 7011a65949d7c41045fd9f60740ac08dfa520d79 Mon Sep 17 00:00:00 2001 From: Diego Lovison Date: Tue, 23 Jul 2024 16:44:30 -0300 Subject: [PATCH] Test artifacts --- tests/artifacts_test.go | 203 ++++++++++++++ .../iris_pipeline_without_cache_compiled.yaml | 261 ++++++++++++++++++ tests/suite_test.go | 5 + 3 files changed, 469 insertions(+) create mode 100644 tests/artifacts_test.go create mode 100644 tests/resources/iris_pipeline_without_cache_compiled.yaml diff --git a/tests/artifacts_test.go b/tests/artifacts_test.go new file mode 100644 index 000000000..84b2ebba9 --- /dev/null +++ b/tests/artifacts_test.go @@ -0,0 +1,203 @@ +//go:build test_integration + +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package integration + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + TestUtil "github.com/opendatahub-io/data-science-pipelines-operator/tests/util" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "io" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + k8sscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/remotecommand" + "net/http" + "net/url" + "testing" +) + +func (suite *IntegrationTestSuite) TestFetchArtifacts() { + + suite.T().Run("Should successfully fetch artifacts", func(t *testing.T) { + + podName, err := getPodName(clientmgr.clientset, DSPANamespace, "app=ds-pipeline-"+DSPANamespace) + require.NoError(t, err) + + type ResponseArtifact struct { + ArtifactID string `json:"artifact_id"` + DownloadUrl string `json:"download_url"` + } + type ResponseArtifactData struct { + Artifacts []ResponseArtifact `json:"artifacts"` + } + + name := "Test Iris Pipeline" + uploadUrl := fmt.Sprintf("%s/apis/v2beta1/pipelines/upload?name=%s", APIServerURL, url.QueryEscape(name)) + vals := map[string]string{ + "uploadfile": "@resources/iris_pipeline_without_cache_compiled.yaml", + } + bodyUpload, contentTypeUpload := TestUtil.FormFromFile(t, vals) + + response, err := http.Post(uploadUrl, contentTypeUpload, bodyUpload) + require.NoError(t, err) + responseData, err := io.ReadAll(response.Body) + responseString := string(responseData) + loggr.Info(responseString) + require.NoError(t, err) + assert.Equal(t, http.StatusOK, response.StatusCode) + + // Retrieve Pipeline ID to create a new run + pipelineID, err := TestUtil.RetrievePipelineId(t, APIServerURL, name) + require.NoError(t, err) + + // Create a new run + runUrl := fmt.Sprintf("%s/apis/v2beta1/runs", APIServerURL) + bodyRun := TestUtil.FormatRequestBody(t, pipelineID, name) + contentTypeRun := "application/json" + response, err = http.Post(runUrl, contentTypeRun, bytes.NewReader(bodyRun)) + require.NoError(t, err) + responseData, err = io.ReadAll(response.Body) + responseString = string(responseData) + loggr.Info(responseString) + require.NoError(t, err) + require.Equal(t, http.StatusOK, response.StatusCode) + err = TestUtil.WaitForPipelineRunCompletion(t, APIServerURL) + require.NoError(t, err) + + // fetch artifacts + artifactsUrl := fmt.Sprintf("%s/apis/v2beta1/artifacts?namespace=%s", APIServerURL, suite.DSPANamespace) + response, err = http.Get(artifactsUrl) + require.NoError(t, err) + responseData, err = io.ReadAll(response.Body) + require.NoError(t, err) + assert.Equal(t, http.StatusOK, response.StatusCode) + loggr.Info(string(responseData)) + + // iterate over the artifacts + var responseArtifactsData ResponseArtifactData + err = json.Unmarshal([]byte(string(responseData)), &responseArtifactsData) + if err != nil { + t.Errorf("Error unmarshaling JSON: %v", err) + return + } + for _, artifact := range responseArtifactsData.Artifacts { + // get the artifact by ID + artifactsByIdUrl := fmt.Sprintf("%s/apis/v2beta1/artifacts/%s", APIServerURL, artifact.ArtifactID) + response, err = http.Get(artifactsByIdUrl) + require.NoError(t, err) + responseData, err = io.ReadAll(response.Body) + require.NoError(t, err) + assert.Equal(t, http.StatusOK, response.StatusCode) + loggr.Info(string(responseData)) + + // get download url + artifactsByIdUrl = fmt.Sprintf("%s/apis/v2beta1/artifacts/%s?view=DOWNLOAD", APIServerURL, artifact.ArtifactID) + response, err = http.Get(artifactsByIdUrl) + require.NoError(t, err) + responseData, err = io.ReadAll(response.Body) + require.NoError(t, err) + assert.Equal(t, http.StatusOK, response.StatusCode) + loggr.Info(string(responseData)) + + var responseArtifactData ResponseArtifact + err = json.Unmarshal([]byte(string(responseData)), &responseArtifactData) + if err != nil { + t.Errorf("Error unmarshaling JSON: %v", err) + return + } + + downloadUrl, err := getDownloadUrl(responseArtifactData.DownloadUrl) + if err != nil { + t.Errorf("Error retrieving the download url: %v", err) + return + } + + err = execCmdExample(clientmgr.clientset, podName, DSPANamespace, "curl --insecure "+downloadUrl) + require.NoError(t, err) + loggr.Info(downloadUrl) + } + }) +} + +func getDownloadUrl(downloadUrl string) (string, error) { + // the test is running on kind. And it is returning the service + downloadParsedURL, err := url.Parse(downloadUrl) + if err != nil { + return "", err + } + downloadParsedURL.RawQuery = url.QueryEscape(downloadParsedURL.RawQuery) + return downloadParsedURL.String(), nil +} + +func execCmdExample(client kubernetes.Interface, podName, namespace string, command string) error { + cmd := []string{ + "sh", + "-c", + command, + } + req := client.CoreV1().RESTClient().Post().Resource("pods").Name(podName). + Namespace(namespace).SubResource("exec") + option := &v1.PodExecOptions{ + Command: cmd, + Stdin: false, + Stdout: true, + Stderr: true, + TTY: true, + } + req.VersionedParams( + option, + k8sscheme.ParameterCodec, + ) + exec, err := remotecommand.NewSPDYExecutor(cfg, "POST", req.URL()) + if err != nil { + return err + } + var stderrBuffer bytes.Buffer + var stdoutBuffer bytes.Buffer + + err = exec.Stream(remotecommand.StreamOptions{ + Stdin: nil, + Stdout: &stdoutBuffer, + Stderr: &stderrBuffer, + }) + if err != nil { + return err + } + loggr.Info(stdoutBuffer.String()) + loggr.Info(stderrBuffer.String()) + return nil +} + +func getPodName(client kubernetes.Interface, namespace, labelSelector string) (string, error) { + pods, err := client.CoreV1().Pods(namespace).List(context.Background(), metav1.ListOptions{ + LabelSelector: labelSelector, + }) + if err != nil { + return "", fmt.Errorf("failed to list pods: %w", err) + } + if len(pods.Items) == 0 { + return "", fmt.Errorf("no pods found with the label %s", labelSelector) + } + return pods.Items[0].Name, nil +} diff --git a/tests/resources/iris_pipeline_without_cache_compiled.yaml b/tests/resources/iris_pipeline_without_cache_compiled.yaml new file mode 100644 index 000000000..0c1f7f7e1 --- /dev/null +++ b/tests/resources/iris_pipeline_without_cache_compiled.yaml @@ -0,0 +1,261 @@ +# PIPELINE DEFINITION +# Name: iris-training-pipeline +# Inputs: +# neighbors: int [Default: 3.0] +# standard_scaler: bool [Default: True] +# Outputs: +# train-model-metrics: system.ClassificationMetrics +components: + comp-create-dataset: + executorLabel: exec-create-dataset + outputDefinitions: + artifacts: + iris_dataset: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + comp-normalize-dataset: + executorLabel: exec-normalize-dataset + inputDefinitions: + artifacts: + input_iris_dataset: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + parameters: + standard_scaler: + parameterType: BOOLEAN + outputDefinitions: + artifacts: + normalized_iris_dataset: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + comp-train-model: + executorLabel: exec-train-model + inputDefinitions: + artifacts: + normalized_iris_dataset: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + parameters: + n_neighbors: + parameterType: NUMBER_INTEGER + outputDefinitions: + artifacts: + metrics: + artifactType: + schemaTitle: system.ClassificationMetrics + schemaVersion: 0.0.1 + model: + artifactType: + schemaTitle: system.Model + schemaVersion: 0.0.1 +deploymentSpec: + executors: + exec-create-dataset: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - create_dataset + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.7.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'pandas==2.2.0'\ + \ && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef create_dataset(iris_dataset: Output[Dataset]):\n import pandas\ + \ as pd\n from io import StringIO\n\n data = \"\"\"\n 5.1,3.5,1.4,0.2,Iris-setosa\n\ + \ 4.9,3.0,1.4,0.2,Iris-setosa\n 4.7,3.2,1.3,0.2,Iris-setosa\n 4.6,3.1,1.5,0.2,Iris-setosa\n\ + \ 5.0,3.6,1.4,0.2,Iris-setosa\n 5.7,3.8,1.7,0.3,Iris-setosa\n 5.1,3.8,1.5,0.3,Iris-setosa\n\ + \ 5.4,3.4,1.7,0.2,Iris-setosa\n 5.1,3.7,1.5,0.4,Iris-setosa\n 5.1,3.4,1.5,0.2,Iris-setosa\n\ + \ 5.0,3.5,1.3,0.3,Iris-setosa\n 4.5,2.3,1.3,0.3,Iris-setosa\n 4.4,3.2,1.3,0.2,Iris-setosa\n\ + \ 5.0,3.5,1.6,0.6,Iris-setosa\n 5.1,3.8,1.9,0.4,Iris-setosa\n 4.8,3.0,1.4,0.3,Iris-setosa\n\ + \ 5.1,3.8,1.6,0.2,Iris-setosa\n 4.6,3.2,1.4,0.2,Iris-setosa\n 5.3,3.7,1.5,0.2,Iris-setosa\n\ + \ 5.0,3.3,1.4,0.2,Iris-setosa\n 7.0,3.2,4.7,1.4,Iris-versicolor\n\ + \ 6.4,3.2,4.5,1.5,Iris-versicolor\n 6.9,3.1,4.9,1.5,Iris-versicolor\n\ + \ 5.5,2.3,4.0,1.3,Iris-versicolor\n 6.5,2.8,4.6,1.5,Iris-versicolor\n\ + \ 6.2,2.2,4.5,1.5,Iris-versicolor\n 5.6,2.5,3.9,1.1,Iris-versicolor\n\ + \ 5.9,3.2,4.8,1.8,Iris-versicolor\n 6.1,2.8,4.0,1.3,Iris-versicolor\n\ + \ 6.3,2.5,4.9,1.5,Iris-versicolor\n 6.1,2.8,4.7,1.2,Iris-versicolor\n\ + \ 6.4,2.9,4.3,1.3,Iris-versicolor\n 6.6,3.0,4.4,1.4,Iris-versicolor\n\ + \ 5.6,2.7,4.2,1.3,Iris-versicolor\n 5.7,3.0,4.2,1.2,Iris-versicolor\n\ + \ 5.7,2.9,4.2,1.3,Iris-versicolor\n 6.2,2.9,4.3,1.3,Iris-versicolor\n\ + \ 5.1,2.5,3.0,1.1,Iris-versicolor\n 5.7,2.8,4.1,1.3,Iris-versicolor\n\ + \ 6.3,3.3,6.0,2.5,Iris-virginica\n 5.8,2.7,5.1,1.9,Iris-virginica\n\ + \ 7.1,3.0,5.9,2.1,Iris-virginica\n 6.3,2.9,5.6,1.8,Iris-virginica\n\ + \ 6.5,3.0,5.8,2.2,Iris-virginica\n 6.9,3.1,5.1,2.3,Iris-virginica\n\ + \ 5.8,2.7,5.1,1.9,Iris-virginica\n 6.8,3.2,5.9,2.3,Iris-virginica\n\ + \ 6.7,3.3,5.7,2.5,Iris-virginica\n 6.7,3.0,5.2,2.3,Iris-virginica\n\ + \ 6.3,2.5,5.0,1.9,Iris-virginica\n 6.5,3.0,5.2,2.0,Iris-virginica\n\ + \ 6.2,3.4,5.4,2.3,Iris-virginica\n 5.9,3.0,5.1,1.8,Iris-virginica\n\ + \ \"\"\"\n col_names = [\"Sepal_Length\", \"Sepal_Width\", \"Petal_Length\"\ + , \"Petal_Width\", \"Labels\"]\n df = pd.read_csv(StringIO(data), names=col_names)\n\ + \n with open(iris_dataset.path, \"w\") as f:\n df.to_csv(f)\n\n" + image: quay.io/opendatahub/ds-pipelines-ci-executor-image:v1.1 + exec-normalize-dataset: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - normalize_dataset + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.7.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'pandas==2.2.0'\ + \ 'scikit-learn==1.4.0' && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef normalize_dataset(\n input_iris_dataset: Input[Dataset],\n\ + \ normalized_iris_dataset: Output[Dataset],\n standard_scaler: bool,\n\ + ):\n import pandas as pd\n from sklearn.preprocessing import MinMaxScaler,\ + \ StandardScaler\n\n with open(input_iris_dataset.path) as f:\n \ + \ df = pd.read_csv(f)\n labels = df.pop(\"Labels\")\n\n scaler =\ + \ StandardScaler() if standard_scaler else MinMaxScaler()\n\n df = pd.DataFrame(scaler.fit_transform(df))\n\ + \ df[\"Labels\"] = labels\n normalized_iris_dataset.metadata[\"state\"\ + ] = \"Normalized\"\n with open(normalized_iris_dataset.path, \"w\") as\ + \ f:\n df.to_csv(f)\n\n" + image: quay.io/opendatahub/ds-pipelines-ci-executor-image:v1.1 + exec-train-model: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - train_model + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.7.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'pandas==2.2.0'\ + \ 'scikit-learn==1.4.0' && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef train_model(\n normalized_iris_dataset: Input[Dataset],\n\ + \ model: Output[Model],\n metrics: Output[ClassificationMetrics],\n\ + \ n_neighbors: int,\n):\n import pickle\n\n import pandas as pd\n\ + \ from sklearn.metrics import confusion_matrix, roc_curve\n from sklearn.model_selection\ + \ import cross_val_predict, train_test_split\n from sklearn.neighbors\ + \ import KNeighborsClassifier\n\n with open(normalized_iris_dataset.path)\ + \ as f:\n df = pd.read_csv(f)\n\n y = df.pop(\"Labels\")\n \ + \ X = df\n\n X_train, X_test, y_train, y_test = train_test_split(X, y,\ + \ random_state=0)\n\n clf = KNeighborsClassifier(n_neighbors=n_neighbors)\n\ + \ clf.fit(X_train, y_train)\n\n predictions = cross_val_predict(clf,\ + \ X_train, y_train, cv=3)\n metrics.log_confusion_matrix(\n [\"\ + Iris-Setosa\", \"Iris-Versicolour\", \"Iris-Virginica\"],\n confusion_matrix(y_train,\ + \ predictions).tolist(), # .tolist() to convert np array to list.\n \ + \ )\n\n model.metadata[\"framework\"] = \"scikit-learn\"\n with open(model.path,\ + \ \"wb\") as f:\n pickle.dump(clf, f)\n\n" + image: quay.io/opendatahub/ds-pipelines-ci-executor-image:v1.1 +pipelineInfo: + name: iris-training-pipeline +root: + dag: + outputs: + artifacts: + train-model-metrics: + artifactSelectors: + - outputArtifactKey: metrics + producerSubtask: train-model + tasks: + create-dataset: + cachingOptions: {} + componentRef: + name: comp-create-dataset + taskInfo: + name: create-dataset + normalize-dataset: + cachingOptions: {} + componentRef: + name: comp-normalize-dataset + dependentTasks: + - create-dataset + inputs: + artifacts: + input_iris_dataset: + taskOutputArtifact: + outputArtifactKey: iris_dataset + producerTask: create-dataset + parameters: + standard_scaler: + componentInputParameter: standard_scaler + taskInfo: + name: normalize-dataset + train-model: + cachingOptions: {} + componentRef: + name: comp-train-model + dependentTasks: + - normalize-dataset + inputs: + artifacts: + normalized_iris_dataset: + taskOutputArtifact: + outputArtifactKey: normalized_iris_dataset + producerTask: normalize-dataset + parameters: + n_neighbors: + componentInputParameter: neighbors + taskInfo: + name: train-model + inputDefinitions: + parameters: + neighbors: + defaultValue: 3.0 + isOptional: true + parameterType: NUMBER_INTEGER + standard_scaler: + defaultValue: true + isOptional: true + parameterType: BOOLEAN + outputDefinitions: + artifacts: + train-model-metrics: + artifactType: + schemaTitle: system.ClassificationMetrics + schemaVersion: 0.0.1 +schemaVersion: 2.1.0 +sdkVersion: kfp-2.7.0 diff --git a/tests/suite_test.go b/tests/suite_test.go index 512fac361..0145dac0b 100644 --- a/tests/suite_test.go +++ b/tests/suite_test.go @@ -22,6 +22,7 @@ import ( "context" "flag" "fmt" + "k8s.io/client-go/kubernetes" "testing" "time" @@ -87,6 +88,7 @@ type ClientManager struct { k8sClient client.Client mfsClient mf.Client mfopts mf.Option + clientset kubernetes.Interface } type IntegrationTestSuite struct { @@ -157,6 +159,9 @@ func (suite *IntegrationTestSuite) SetupSuite() { suite.Require().NotNil(clientmgr.k8sClient) clientmgr.mfsClient = mfc.NewClient(clientmgr.k8sClient) clientmgr.mfopts = mf.UseClient(clientmgr.mfsClient) + clientset, err := kubernetes.NewForConfig(cfg) + suite.Require().NoError(err) + clientmgr.clientset = clientset suite.Clientmgr = clientmgr DSPA = testUtil.GetDSPAFromPath(suite.T(), clientmgr.mfopts, DSPAPath)