diff --git a/singlestoredb/fusion/handlers/export.py b/singlestoredb/fusion/handlers/export.py index 6293109c..e893edb7 100644 --- a/singlestoredb/fusion/handlers/export.py +++ b/singlestoredb/fusion/handlers/export.py @@ -5,10 +5,8 @@ from typing import Optional from .. import result -from ...management.export import Catalog from ...management.export import ExportService from ...management.export import ExportStatus -from ...management.export import Link from ..handler import SQLHandler from ..result import FusionSQLResult from .utils import get_workspace_group @@ -89,14 +87,14 @@ def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]: wsg, 'none', 'none', - Catalog.from_config_and_creds(catalog_config, catalog_creds, wsg._manager), - Link.from_config_and_creds('S3', storage_config, storage_creds, wsg._manager), + dict(**catalog_config, **catalog_creds), + dict(**storage_config, **storage_creds), columns=None, ).create_cluster_identity() res = FusionSQLResult() - res.add_field('RoleARN', result.STRING) - res.set_rows([(out['roleARN'],)]) + res.add_field('Identity', result.STRING) + res.set_rows([(out['identity'],)]) return res @@ -191,8 +189,8 @@ def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]: wsg, from_database, from_table, - Catalog.from_config_and_creds(catalog_config, catalog_creds, wsg._manager), - Link.from_config_and_creds('S3', storage_config, storage_creds, wsg._manager), + dict(**catalog_config, **catalog_creds), + dict(**storage_config, **storage_creds), columns=None, ).start() diff --git a/singlestoredb/management/export.py b/singlestoredb/management/export.py index 5f641871..2a8cc931 100644 --- a/singlestoredb/management/export.py +++ b/singlestoredb/management/export.py @@ -2,12 +2,13 @@ """SingleStoreDB export service.""" from __future__ import annotations -import abc -import re +import copy +import json from typing import Any from typing import Dict from typing import List from typing import Optional +from typing import Union from .. import ManagementError from .utils import vars_to_str @@ -15,172 +16,13 @@ from .workspace import WorkspaceManager -class Link(object): - """Generic storage base class.""" - scheme: str = 'unknown' - - def __str__(self) -> str: - """Return string representation.""" - return vars_to_str(self) - - def __repr__(self) -> str: - """Return string representation.""" - return str(self) - - @abc.abstractmethod - def to_storage_info(self) -> Dict[str, Any]: - raise NotImplementedError - - @classmethod - def from_config_and_creds( - cls, - scheme: str, - config: Dict[str, Any], - credentials: Dict[str, Any], - manager: 'WorkspaceManager', - ) -> 'Link': - out_cls = None - for c in cls.__subclasses__(): - if c.scheme == scheme.upper(): - out_cls = c - break - - if out_cls is None: - raise TypeError(f'No link class found for given information: {scheme}') - - return out_cls.from_config_and_creds(scheme, config, credentials, manager) - - -class S3Link(Link): - """S3 link.""" - - scheme: str = 'S3' - region: str - storage_base_url: str - - def __init__(self, region: str, storage_base_url: str): - self.region = region - self.storage_base_url = storage_base_url - self._manager: Optional[WorkspaceManager] = None - - def to_storage_info(self) -> Dict[str, Any]: - return dict( - storageBaseURL=self.storage_base_url, - storageRegion=self.region, - ) - - @classmethod - def from_config_and_creds( - cls, - scheme: str, - config: Dict[str, Any], - credentials: Dict[str, Any], - manager: 'WorkspaceManager', - ) -> 'S3Link': - assert scheme.upper() == cls.scheme - - params: Dict[str, Any] = {} - params.update(config) - params.update(credentials) - - assert params.get('region'), 'region is required' - assert params.get('endpoint_url'), 'endpoint_url is required' - - out = cls(params['region'], params['endpoint_url']) - out._manager = manager - return out - - -class Catalog(object): - """Generic catalog base class.""" - - catalog_type: str = 'UNKNOWN' - table_format: str = 'UNKNOWN' - - def __str__(self) -> str: - """Return string representation.""" - return vars_to_str(self) - - def __repr__(self) -> str: - """Return string representation.""" - return str(self) - - @classmethod - def from_config_and_creds( - cls, - config: Dict[str, Any], - credentials: Dict[str, Any], - manager: 'WorkspaceManager', - ) -> 'Catalog': - catalog_type = config['type'].upper() - table_format = config['table_format'].upper() - - out_cls = None - for c in cls.__subclasses__(): - if c.catalog_type == catalog_type and c.table_format == table_format: - out_cls = c - break - - if out_cls is None: - raise TypeError(f'No catalog class found for given information: {config}') - - return out_cls.from_config_and_creds(config, credentials, manager) - - @abc.abstractmethod - def to_catalog_info(self) -> Dict[str, Any]: - """Return a catalog info dictionary.""" - raise NotImplementedError - - -class IcebergGlueCatalog(Catalog): - """Iceberg glue catalog.""" - - table_format = 'ICEBERG' - catalog_type = 'GLUE' - - region: str - catalog_id: str - - def __init__(self, region: str, catalog_id: str): - self.region = region - self.catalog_id = catalog_id - self._manager: Optional[WorkspaceManager] = None - - @classmethod - def from_config_and_creds( - cls, - config: Dict[str, Any], - credentials: Dict[str, Any], - manager: 'WorkspaceManager', - ) -> 'IcebergGlueCatalog': - params = {} - params.update(config) - params.update(credentials) - - out = cls( - region=params['region'], - catalog_id=params['id'], - ) - out._manager = manager - return out - - def to_catalog_info(self) -> Dict[str, Any]: - """Return a catalog info dictionary.""" - return dict( - catalogType=self.catalog_type, - tableFormat=self.table_format, - glueRegion=self.region, - glueCatalogID=self.catalog_id, - ) - - class ExportService(object): """Export service.""" database: str table: str - catalog: Catalog - storage_link: Link + catalog_info: Dict[str, Any] + storage_info: Dict[str, Any] columns: Optional[List[str]] def __init__( @@ -188,8 +30,8 @@ def __init__( workspace_group: WorkspaceGroup, database: str, table: str, - catalog: Catalog, - storage_link: Link, + catalog_info: Union[str, Dict[str, Any]], + storage_info: Union[str, Dict[str, Any]], columns: Optional[List[str]], ): #: Workspace group @@ -205,10 +47,16 @@ def __init__( self.columns = columns #: Catalog - self.catalog = catalog + if isinstance(catalog_info, str): + self.catalog_info = json.loads(catalog_info) + else: + self.catalog_info = copy.copy(catalog_info) #: Storage - self.storage_link = storage_link + if isinstance(storage_info, str): + self.storage_info = json.loads(storage_info) + else: + self.storage_info = copy.copy(storage_info) self._manager: Optional[WorkspaceManager] = workspace_group._manager @@ -227,21 +75,12 @@ def create_cluster_identity(self) -> Dict[str, Any]: msg='No workspace manager is associated with this object.', ) - if not isinstance(self.catalog, IcebergGlueCatalog): - raise TypeError('Only Iceberg Glue catalog is supported at this time.') - - if not isinstance(self.storage_link, S3Link): - raise TypeError('Only S3 links are supported at this time.') - out = self._manager._post( f'workspaceGroups/{self.workspace_group.id}/' 'egress/createEgressClusterIdentity', json=dict( - storageBucketName=re.split( - r'/+', self.storage_link.storage_base_url, - )[1], - glueRegion=self.catalog.region, - glueCatalogID=self.catalog.catalog_id, + catalogInfo=self.catalog_info, + storageInfo=self.storage_info, ), ) @@ -254,16 +93,13 @@ def start(self, tags: Optional[List[str]] = None) -> 'ExportStatus': msg='No workspace manager is associated with this object.', ) - if not isinstance(self.storage_link, S3Link): - raise TypeError('Only S3 links are supported at this time.') - out = self._manager._post( f'workspaceGroups/{self.workspace_group.id}/egress/startTableEgress', json=dict( databaseName=self.database, tableName=self.table, - storageInfo=self.storage_link.to_storage_info(), - catalogInfo=self.catalog.to_catalog_info(), + storageInfo=self.storage_info, + catalogInfo=self.catalog_info, ), )