Skip to content

Commit

Permalink
feat[frontend]: implement configmap parsing
Browse files Browse the repository at this point in the history
Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: quinnovator <[email protected]>
  • Loading branch information
droctothorpe and quinnovator committed Nov 5, 2024
1 parent 60a8865 commit b373f18
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 9 deletions.
9 changes: 9 additions & 0 deletions frontend/server/configs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,13 @@ export function loadConfigs(argv: string[], env: ProcessEnv): UIConfigs {
* https://github.com/kubeflow/pipelines/blob/7b7918ebf8c30e6ceec99283ef20dbc02fdf6a42/manifests/kustomize/third-party/argo/base/workflow-controller-configmap-patch.yaml#L28
*/
ARGO_KEYFORMAT = 'artifacts/{{workflow.name}}/{{workflow.creationTimestamp.Y}}/{{workflow.creationTimestamp.m}}/{{workflow.creationTimestamp.d}}/{{pod.name}}',
/** Argo Workflows lets you specify a unique artifact repository for each
* namespace by adding an appropriately formatted configmap to the namespace
* as documented here:
* https://argo-workflows.readthedocs.io/en/latest/artifact-repository-ref/.
* Use this field to enable this lookup. It defaults to false.
*/
ARGO_ARTIFACT_REPOSITORIES_LOOKUP = 'false',
/** Should use server API for log streaming? */
STREAM_LOGS_FROM_SERVER_API = 'false',
/** The main container name of a pod where logs are retrieved */
Expand Down Expand Up @@ -132,6 +139,7 @@ export function loadConfigs(argv: string[], env: ProcessEnv): UIConfigs {
archiveBucketName: ARGO_ARCHIVE_BUCKETNAME,
archiveLogs: asBool(ARGO_ARCHIVE_LOGS),
keyFormat: ARGO_KEYFORMAT,
artifactRepositoriesLookup: asBool(ARGO_ARTIFACT_REPOSITORIES_LOOKUP),
},
pod: {
logContainerName: POD_LOG_CONTAINER_NAME,
Expand Down Expand Up @@ -259,6 +267,7 @@ export interface ArgoConfigs {
archiveArtifactory: string;
archiveBucketName: string;
keyFormat: string;
artifactRepositoriesLookup: boolean;
}
export interface ServerConfigs {
basePath: string;
Expand Down
9 changes: 8 additions & 1 deletion frontend/server/handlers/pod-logs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,21 @@ export function getPodLogsHandler(
},
podLogContainerName: string,
): Handler {
const { archiveLogs, archiveArtifactory, archiveBucketName, keyFormat } = argoOptions;
const {
archiveLogs,
archiveArtifactory,
archiveBucketName,
keyFormat,
artifactRepositoriesLookup,
} = argoOptions;

// get pod log from the provided bucket and keyFormat.
const getPodLogsStreamFromArchive = toGetPodLogsStream(
createPodLogsMinioRequestConfig(
archiveArtifactory === 'minio' ? artifactsOptions.minio : artifactsOptions.aws,
archiveBucketName,
keyFormat,
artifactRepositoriesLookup,
),
);

Expand Down
20 changes: 20 additions & 0 deletions frontend/server/k8s-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
V1DeleteOptions,
V1Pod,
V1EventList,
V1ConfigMap,
} from '@kubernetes/client-node';
import * as crypto from 'crypto-js';
import * as fs from 'fs';
Expand Down Expand Up @@ -277,6 +278,25 @@ export async function getPod(
}
}

/**
* Retrieves a configmap.
* @param configMapName name of the configmap
* @param configMapNamespace namespace of the configmap
*/
export async function getConfigMap(
configMapName: string,
configMapNamespace: string,
): Promise<[V1ConfigMap, undefined] | [undefined, K8sError]> {
try {
const { body } = await k8sV1Client.readNamespacedConfigMap(configMapName, configMapNamespace);
return [body, undefined];
} catch (error) {
const { message, additionalInfo } = await parseError(error);
const userMessage = `Could not get configMap ${configMapName} in namespace ${configMapNamespace}: ${message}`;
return [undefined, { message: userMessage, additionalInfo }];
}
}

// Golang style result type including an error.
export type Result<T, E = K8sError> = [T, undefined] | [undefined, E];
export async function listPodEvents(
Expand Down
39 changes: 38 additions & 1 deletion frontend/server/workflow-helper.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ import {
getPodLogsStreamFromK8s,
getPodLogsStreamFromWorkflow,
toGetPodLogsStream,
getKeyFormatFromArtifactRepositories,
} from './workflow-helper';
import { getK8sSecret, getArgoWorkflow, getPodLogs } from './k8s-helper';
import { getK8sSecret, getArgoWorkflow, getPodLogs, getConfigMap } from './k8s-helper';
import { V1ConfigMap, V1ObjectMeta } from '@kubernetes/client-node';

jest.mock('minio');
jest.mock('./k8s-helper');
Expand Down Expand Up @@ -118,13 +120,48 @@ describe('workflow-helper', () => {
});
});

describe('getKeyFormatFromArtifactRepositories', () => {
it('returns a keyFormat string from the artifact-repositories configmap.', async () => {
const artifactRepositories = {
'artifact-repositories':
'archiveLogs: true\n' +
's3:\n' +
' accessKeySecret:\n' +
' key: accesskey\n' +
' name: mlpipeline-minio-artifact\n' +
' bucket: mlpipeline\n' +
' endpoint: minio-service.kubeflow:9000\n' +
' insecure: true\n' +
' keyFormat: foo\n' +
' secretKeySecret:\n' +
' key: secretkey\n' +
' name: mlpipeline-minio-artifact',
};

const mockedConfigMap: V1ConfigMap = {
apiVersion: 'v1',
kind: 'ConfigMap',
metadata: new V1ObjectMeta(),
data: artifactRepositories,
binaryData: {},
};

const mockedGetConfigMap: jest.Mock = getConfigMap as any;
mockedGetConfigMap.mockResolvedValueOnce([mockedConfigMap, undefined]);
const res = await getKeyFormatFromArtifactRepositories('');
expect(mockedGetConfigMap).toBeCalledTimes(1);
expect(res).toEqual('foo');
});
});

describe('createPodLogsMinioRequestConfig', () => {
it('returns a MinioRequestConfig factory with the provided minioClientOptions, bucket, and prefix.', async () => {
const mockedClient: jest.Mock = MinioClient as any;
const requestFunc = await createPodLogsMinioRequestConfig(
minioConfig,
'bucket',
'artifacts/{{workflow.name}}/{{workflow.creationTimestamp.Y}}/{{workflow.creationTimestamp.m}}/{{workflow.creationTimestamp.d}}/{{pod.name}}',
true,
);
const request = await requestFunc(
'workflow-name-system-container-impl-foo',
Expand Down
89 changes: 82 additions & 7 deletions frontend/server/workflow-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
// limitations under the License.
import { PassThrough, Stream } from 'stream';
import { ClientOptions as MinioClientOptions } from 'minio';
import { getK8sSecret, getArgoWorkflow, getPodLogs } from './k8s-helper';
import { getK8sSecret, getArgoWorkflow, getPodLogs, getConfigMap } from './k8s-helper';
import { createMinioClient, MinioRequestConfig, getObjectStream } from './minio-helper';
import * as JsYaml from 'js-yaml';

export interface PartialArgoWorkflow {
status: {
Expand Down Expand Up @@ -142,18 +143,78 @@ export function toGetPodLogsStream(
};
}

/** PartialArtifactRepositoriesValue is used to deserialize the contents of the
* artifact-repositories configmap.
*/
interface PartialArtifactRepositoriesValue {
s3?: {
keyFormat: string;
};
gcs?: {
keyFormat: string;
};
oss?: {
keyFormat: string;
};
artifactory?: {
keyFormat: string;
};
}

/**
* getKeyFormatFromArtifactRepositories attempts to retrieve an
* artifact-repositories configmap from a specified namespace. It then parses
* the configmap and returns a keyFormat value in its data field.
* @param namespace namespace of the configmap
*/
export async function getKeyFormatFromArtifactRepositories(
namespace: string,
): Promise<string | undefined> {
try {
const [configMap] = await getConfigMap('artifact-repositories', namespace);
if (configMap === undefined) {
// If there is no artifact-repositories configmap, return undefined. The
// caller will just use keyFormat as specified in configs.ts.
return undefined;
}
let artifactRepositories = configMap?.data['artifact-repositories'];
const artifactRepositoriesValue = JsYaml.safeLoad(
artifactRepositories,
) as PartialArtifactRepositoriesValue;
if ('s3' in artifactRepositoriesValue) {
return artifactRepositoriesValue.s3?.keyFormat;
} else if ('gcs' in artifactRepositoriesValue) {
return artifactRepositoriesValue.gcs?.keyFormat;
} else if ('oss' in artifactRepositoriesValue) {
return artifactRepositoriesValue.oss?.keyFormat;
} else if ('artifactory' in artifactRepositoriesValue) {
return artifactRepositoriesValue.artifactory?.keyFormat;
} else {
throw new Error(
'artifact-repositories configmap missing one of [s3|gcs|oss|artifactory] fields.',
);
}
} catch (error) {
console.log(error);
return undefined;
}
}

/**
* Returns a MinioRequestConfig with the provided minio options (a MinioRequestConfig
* object contains the artifact bucket and keys, with the corresponding minio
* client).
* Returns a MinioRequestConfig with the provided minio options (a
* MinioRequestConfig object contains the artifact bucket and keys, with the
* corresponding minio client).
* @param minioOptions Minio options to create a minio client.
* @param bucket bucket containing the pod logs artifacts.
* @param keyFormat the keyFormat for pod logs artifacts stored in the bucket.
* @param keyFormatDefault the default keyFormat for pod logs artifacts stored
* in the bucket. This is overriden if there's an "artifact-repositories"
* configmap in the target namespace with a keyFormat field.
*/
export function createPodLogsMinioRequestConfig(
minioOptions: MinioClientOptions,
bucket: string,
keyFormat: string,
keyFormatDefault: string,
artifactRepositoriesLookup: boolean,
) {
return async (
podName: string,
Expand All @@ -164,7 +225,21 @@ export function createPodLogsMinioRequestConfig(
const client = await createMinioClient(minioOptions, 's3');
const createdAtArray = createdAt.split('-');

let key: string = keyFormat
// If artifactRepositoriesLookup is enabled, try to extract they keyformat
// from the configmap. Otherwise, just used the default keyFormat specified
// in configs.ts.
let keyFormatFromConfigMap = undefined;
if (artifactRepositoriesLookup) {
keyFormatFromConfigMap = await getKeyFormatFromArtifactRepositories(namespace);
}
let key: string;
if (keyFormatFromConfigMap !== undefined) {
key = keyFormatFromConfigMap;
} else {
key = keyFormatDefault;
}

key = key
.replace(/\s+/g, '') // Remove all whitespace.
.replace('{{workflow.name}}', podName.replace(/-system-container-impl-.*/, ''))
.replace('{{workflow.creationTimestamp.Y}}', createdAtArray[0])
Expand Down

0 comments on commit b373f18

Please sign in to comment.