Skip to content

Commit

Permalink
Change catalog / storage parameters to arbitrary JSON strings (#43)
Browse files Browse the repository at this point in the history
* Change catalog / storage parameters to arbitrary JSON strings

* Pass full dictionary for storage_info and catalog_info.

* Rename RoleARN as Identity.

---------

Co-authored-by: mgiannakopoulos <[email protected]>
  • Loading branch information
kesmit13 and mgiannakopoulos authored Nov 27, 2024
1 parent a3a0379 commit 32a0cee
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 191 deletions.
14 changes: 6 additions & 8 deletions singlestoredb/fusion/handlers/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@
from typing import Optional

from .. import result
from ...management.export import Catalog
from ...management.export import ExportService
from ...management.export import ExportStatus
from ...management.export import Link
from ..handler import SQLHandler
from ..result import FusionSQLResult
from .utils import get_workspace_group
Expand Down Expand Up @@ -89,14 +87,14 @@ def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
wsg,
'none',
'none',
Catalog.from_config_and_creds(catalog_config, catalog_creds, wsg._manager),
Link.from_config_and_creds('S3', storage_config, storage_creds, wsg._manager),
dict(**catalog_config, **catalog_creds),
dict(**storage_config, **storage_creds),
columns=None,
).create_cluster_identity()

res = FusionSQLResult()
res.add_field('RoleARN', result.STRING)
res.set_rows([(out['roleARN'],)])
res.add_field('Identity', result.STRING)
res.set_rows([(out['identity'],)])

return res

Expand Down Expand Up @@ -191,8 +189,8 @@ def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
wsg,
from_database,
from_table,
Catalog.from_config_and_creds(catalog_config, catalog_creds, wsg._manager),
Link.from_config_and_creds('S3', storage_config, storage_creds, wsg._manager),
dict(**catalog_config, **catalog_creds),
dict(**storage_config, **storage_creds),
columns=None,
).start()

Expand Down
202 changes: 19 additions & 183 deletions singlestoredb/management/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,194 +2,36 @@
"""SingleStoreDB export service."""
from __future__ import annotations

import abc
import re
import copy
import json
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Union

from .. import ManagementError
from .utils import vars_to_str
from .workspace import WorkspaceGroup
from .workspace import WorkspaceManager


class Link(object):
    """Generic base class for storage links.

    Subclasses declare the ``scheme`` they handle and provide
    ``to_storage_info``; ``from_config_and_creds`` on this class picks
    the matching direct subclass.
    """

    scheme: str = 'unknown'

    def __str__(self) -> str:
        """Return the string built by ``vars_to_str`` for this object."""
        return vars_to_str(self)

    def __repr__(self) -> str:
        """Return the same text as ``str(self)``."""
        return str(self)

    @abc.abstractmethod
    def to_storage_info(self) -> Dict[str, Any]:
        """Return a storage info dictionary; must be supplied by subclasses."""
        raise NotImplementedError

    @classmethod
    def from_config_and_creds(
        cls,
        scheme: str,
        config: Dict[str, Any],
        credentials: Dict[str, Any],
        manager: 'WorkspaceManager',
    ) -> 'Link':
        """
        Build a link by delegating to the direct subclass matching ``scheme``.

        Parameters
        ----------
        scheme : str
            Storage scheme name; compared upper-cased against each
            subclass's ``scheme`` attribute
        config : Dict[str, Any]
            Configuration values, passed through to the subclass
        credentials : Dict[str, Any]
            Credential values, passed through to the subclass
        manager : WorkspaceManager
            Manager object, passed through to the subclass

        Raises
        ------
        TypeError
            If no direct subclass declares the requested scheme

        Returns
        -------
        Link

        """
        # Only direct subclasses are considered; the first match wins.
        handler = next(
            (
                candidate
                for candidate in cls.__subclasses__()
                if candidate.scheme == scheme.upper()
            ),
            None,
        )

        if handler is None:
            raise TypeError(f'No link class found for given information: {scheme}')

        return handler.from_config_and_creds(scheme, config, credentials, manager)


class S3Link(Link):
    """S3 link.

    Holds the region and base URL of the storage location and converts
    them to the ``storageBaseURL`` / ``storageRegion`` dictionary
    returned by ``to_storage_info``.
    """

    scheme: str = 'S3'
    region: str
    storage_base_url: str

    def __init__(self, region: str, storage_base_url: str):
        self.region = region
        self.storage_base_url = storage_base_url
        # Filled in by ``from_config_and_creds``; stays None otherwise.
        self._manager: Optional[WorkspaceManager] = None

    def to_storage_info(self) -> Dict[str, Any]:
        """Return the storage info dictionary for this link."""
        return dict(
            storageBaseURL=self.storage_base_url,
            storageRegion=self.region,
        )

    @classmethod
    def from_config_and_creds(
        cls,
        scheme: str,
        config: Dict[str, Any],
        credentials: Dict[str, Any],
        manager: 'WorkspaceManager',
    ) -> 'S3Link':
        """
        Build an S3 link from configuration and credential dictionaries.

        Parameters
        ----------
        scheme : str
            Must equal ``S3`` (case-insensitive)
        config : Dict[str, Any]
            Configuration values
        credentials : Dict[str, Any]
            Credential values; on key collisions these override ``config``
        manager : WorkspaceManager
            Manager stored on the resulting link

        Raises
        ------
        ValueError
            If the scheme does not match, or if ``region`` or
            ``endpoint_url`` is missing or empty

        Returns
        -------
        S3Link

        """
        # Explicit raises rather than ``assert``: assert statements are
        # removed under ``python -O``, which would let bad input through.
        if scheme.upper() != cls.scheme:
            raise ValueError(
                f'scheme must be {cls.scheme}, got: {scheme}',
            )

        params: Dict[str, Any] = {}
        params.update(config)
        params.update(credentials)

        for required in ('region', 'endpoint_url'):
            if not params.get(required):
                raise ValueError(f'{required} is required')

        out = cls(params['region'], params['endpoint_url'])
        out._manager = manager
        return out


class Catalog(object):
    """Generic base class for catalogs.

    Subclasses declare the ``catalog_type`` / ``table_format`` pair they
    handle and provide ``to_catalog_info``; ``from_config_and_creds`` on
    this class picks the matching direct subclass.
    """

    catalog_type: str = 'UNKNOWN'
    table_format: str = 'UNKNOWN'

    def __str__(self) -> str:
        """Return the string built by ``vars_to_str`` for this object."""
        return vars_to_str(self)

    def __repr__(self) -> str:
        """Return the same text as ``str(self)``."""
        return str(self)

    @classmethod
    def from_config_and_creds(
        cls,
        config: Dict[str, Any],
        credentials: Dict[str, Any],
        manager: 'WorkspaceManager',
    ) -> 'Catalog':
        """
        Build a catalog by delegating to the matching direct subclass.

        Parameters
        ----------
        config : Dict[str, Any]
            Configuration values; ``type`` and ``table_format`` are
            required and are compared upper-cased
        credentials : Dict[str, Any]
            Credential values, passed through to the subclass
        manager : WorkspaceManager
            Manager object, passed through to the subclass

        Raises
        ------
        KeyError
            If ``type`` or ``table_format`` is absent from ``config``
        TypeError
            If no direct subclass declares the requested pair

        Returns
        -------
        Catalog

        """
        wanted = (config['type'].upper(), config['table_format'].upper())

        # Only direct subclasses are considered; the first match wins.
        for candidate in cls.__subclasses__():
            if (candidate.catalog_type, candidate.table_format) == wanted:
                return candidate.from_config_and_creds(config, credentials, manager)

        raise TypeError(f'No catalog class found for given information: {config}')

    @abc.abstractmethod
    def to_catalog_info(self) -> Dict[str, Any]:
        """Return a catalog info dictionary; must be supplied by subclasses."""
        raise NotImplementedError


class IcebergGlueCatalog(Catalog):
    """Catalog with table format ``ICEBERG`` and catalog type ``GLUE``."""

    table_format = 'ICEBERG'
    catalog_type = 'GLUE'

    region: str
    catalog_id: str

    def __init__(self, region: str, catalog_id: str):
        # Filled in by ``from_config_and_creds``; stays None otherwise.
        self._manager: Optional[WorkspaceManager] = None
        self.region = region
        self.catalog_id = catalog_id

    @classmethod
    def from_config_and_creds(
        cls,
        config: Dict[str, Any],
        credentials: Dict[str, Any],
        manager: 'WorkspaceManager',
    ) -> 'IcebergGlueCatalog':
        """
        Build the catalog from configuration and credential dictionaries.

        Parameters
        ----------
        config : Dict[str, Any]
            Configuration values
        credentials : Dict[str, Any]
            Credential values; on key collisions these override ``config``
        manager : WorkspaceManager
            Manager stored on the resulting catalog

        Raises
        ------
        KeyError
            If the merged values lack ``region`` or ``id``

        Returns
        -------
        IcebergGlueCatalog

        """
        merged = {**config, **credentials}

        catalog = cls(
            region=merged['region'],
            catalog_id=merged['id'],
        )
        catalog._manager = manager
        return catalog

    def to_catalog_info(self) -> Dict[str, Any]:
        """Return a catalog info dictionary."""
        return {
            'catalogType': self.catalog_type,
            'tableFormat': self.table_format,
            'glueRegion': self.region,
            'glueCatalogID': self.catalog_id,
        }


class ExportService(object):
"""Export service."""

database: str
table: str
catalog: Catalog
storage_link: Link
catalog_info: Dict[str, Any]
storage_info: Dict[str, Any]
columns: Optional[List[str]]

def __init__(
self,
workspace_group: WorkspaceGroup,
database: str,
table: str,
catalog: Catalog,
storage_link: Link,
catalog_info: Union[str, Dict[str, Any]],
storage_info: Union[str, Dict[str, Any]],
columns: Optional[List[str]],
):
#: Workspace group
Expand All @@ -205,10 +47,16 @@ def __init__(
self.columns = columns

#: Catalog
self.catalog = catalog
if isinstance(catalog_info, str):
self.catalog_info = json.loads(catalog_info)
else:
self.catalog_info = copy.copy(catalog_info)

#: Storage
self.storage_link = storage_link
if isinstance(storage_info, str):
self.storage_info = json.loads(storage_info)
else:
self.storage_info = copy.copy(storage_info)

self._manager: Optional[WorkspaceManager] = workspace_group._manager

Expand All @@ -227,21 +75,12 @@ def create_cluster_identity(self) -> Dict[str, Any]:
msg='No workspace manager is associated with this object.',
)

if not isinstance(self.catalog, IcebergGlueCatalog):
raise TypeError('Only Iceberg Glue catalog is supported at this time.')

if not isinstance(self.storage_link, S3Link):
raise TypeError('Only S3 links are supported at this time.')

out = self._manager._post(
f'workspaceGroups/{self.workspace_group.id}/'
'egress/createEgressClusterIdentity',
json=dict(
storageBucketName=re.split(
r'/+', self.storage_link.storage_base_url,
)[1],
glueRegion=self.catalog.region,
glueCatalogID=self.catalog.catalog_id,
catalogInfo=self.catalog_info,
storageInfo=self.storage_info,
),
)

Expand All @@ -254,16 +93,13 @@ def start(self, tags: Optional[List[str]] = None) -> 'ExportStatus':
msg='No workspace manager is associated with this object.',
)

if not isinstance(self.storage_link, S3Link):
raise TypeError('Only S3 links are supported at this time.')

out = self._manager._post(
f'workspaceGroups/{self.workspace_group.id}/egress/startTableEgress',
json=dict(
databaseName=self.database,
tableName=self.table,
storageInfo=self.storage_link.to_storage_info(),
catalogInfo=self.catalog.to_catalog_info(),
storageInfo=self.storage_info,
catalogInfo=self.catalog_info,
),
)

Expand Down

0 comments on commit 32a0cee

Please sign in to comment.