Skip to content

Example: Convert Oracle DDL to Teradata DDL

Shinichi Takii edited this page Apr 19, 2020 · 7 revisions

Code

# -*- coding: utf-8 -*-
#
# Copyright (C) 2020 Shinichi Takii, [email protected]
#
# This module is part of python-ddlparse and is released under
# the BSD License: https://opensource.org/licenses/BSD-3-Clause

import re, textwrap
from collections import OrderedDict
from ddlparse import DdlParse, DdlParseColumn


def teradata_data_type(col: DdlParseColumn):
    """
    Get Teradata data type

    Arguments:
        col {DdlParseColumn} -- Parsed Column Object

    Raises:
        ValueError: Unknown data type error

    Returns:
        [str] -- Teradata data type string
    """

    # BigQuery data type = {write_length: (True|False), datatypes: [{datatype: ("literal"|regexp), length_gt: (None|[0-9]+), def_scale: (None|True|False) }, ...]}
    TERADATA_DATA_TYPE_DIC = OrderedDict()
    TERADATA_DATA_TYPE_DIC["VARCHAR"] = {
        'write_length': True,
        'datatypes': [{'datatype': re.compile(r"VARCHAR"), 'length_gt': None, 'def_scale': None}]
    }
    TERADATA_DATA_TYPE_DIC["CHAR"] = {
        'write_length': True,
        'datatypes': [{'datatype': re.compile(r"CHAR"), 'length_gt': None, 'def_scale': None}]
    }
    TERADATA_DATA_TYPE_DIC["TIMESTAMP"] = {
        'write_length': False,
        'datatypes': [{'datatype': "DATE", 'length_gt': None, 'def_scale': None}]
    }
    TERADATA_DATA_TYPE_DIC["FLOAT"] = {
        'write_length': False,
        'datatypes': [
            {'datatype': "FLOAT", 'length_gt': None, 'def_scale': None},
            {'datatype': "NUMBER", 'length_gt': 0, 'def_scale': True}
        ]
    }
    TERADATA_DATA_TYPE_DIC["INTEGER"] = {
        'write_length': False,
        'datatypes': [{'datatype': "NUMBER", 'length_gt': 10, 'def_scale': False}]
    }
    TERADATA_DATA_TYPE_DIC["SMALLINT"] = {
        'write_length': False,
        'datatypes': [{'datatype': "NUMBER", 'length_gt': 5, 'def_scale': False}]
    }
    TERADATA_DATA_TYPE_DIC["BYTEINT"] = {
        'write_length': False,
        'datatypes': [{'datatype': "NUMBER", 'length_gt': 3, 'def_scale': False}]
    }
    TERADATA_DATA_TYPE_DIC["DECIMAL(38, 0)"] = {
        'write_length': False,
        'datatypes': [{'datatype': "NUMBER", 'length_gt': None, 'def_scale': False}]
    }


    is_match = False
    for td_type, conditions in TERADATA_DATA_TYPE_DIC.items():
        for source_datatype in conditions['datatypes']:

            if (isinstance(source_datatype['datatype'], str) and col.data_type == source_datatype['datatype']) \
                or re.search(source_datatype['datatype'], col.data_type):

                if source_datatype['length_gt'] is None:
                    is_match = True

                elif col.length is not None and col.length >= source_datatype['length_gt']:
                    if col.scale is None and source_datatype['def_scale'] is None:
                        is_match = True
                    elif col.scale is not None and source_datatype['def_scale']:
                        is_match = True
                    elif col.scale is None and source_datatype['def_scale'] == False:
                        is_match = True

            if is_match:
                return "{}{}".format(
                    td_type,
                    "" if not conditions['write_length'] or col.length is None else "({}{})".format(
                        col.length, "" if col.scale is None else "," + str(col.scale)
                    )
                )

    raise ValueError("Unknown data type : '{}'".format(col.data_type))



if __name__ == "__main__":

    sample_ddl = """
    -- Oracle DDL
    CREATE TABLE My_Schema.Sample_Table (
      varchar_1 varchar,
      varchar_2 varchar(100) NOT NULL,
      char_1 char,
      char_2 char(100) NOT NULL,
      float_1 float,
      float_2 number(1,2) NOT NULL,
      byteint_1 number(3) NOT NULL,
      smallint_1 number(5),
      integer_1 number(10),
      decimal_1 number
    );
    """


    # Specify source database of Oracle
    table = DdlParse().parse(
        ddl=sample_ddl,
        source_database=DdlParse.DATABASE.oracle
    )


    cols_defs = []
    for col in table.columns.values():
        col_name = col.get_name()
        data_type = teradata_data_type(col)
        not_null = " NOT NULL" if col.not_null else ""

        cols_defs.append("{name} {data_type}{not_null}".format(
            name=col_name,
            data_type=data_type,
            not_null=not_null,
        ))


    teradata_ddl = textwrap.dedent(
        """\
        CREATE TABLE {table}
        (
          {colmns_define}
        )""").format(
        table=table.name,
        colmns_define=",\n  ".join(cols_defs),
    )

    print(teradata_ddl)

Output

CREATE TABLE Sample_Table
(
  varchar_1 VARCHAR,
  varchar_2 VARCHAR(100) NOT NULL,
  char_1 CHAR,
  char_2 CHAR(100) NOT NULL,
  float_1 FLOAT,
  float_2 FLOAT NOT NULL,
  byteint_1 BYTEINT NOT NULL,
  smallint_1 SMALLINT,
  integer_1 INTEGER,
  decimal_1 DECIMAL(38, 0)
)
Clone this wiki locally