Skip to content

Commit

Permalink
Feature: Cassandra SSL (#19226)
Browse files Browse the repository at this point in the history
  • Loading branch information
keshavmohta09 authored Jan 10, 2025
1 parent 8599aab commit 4bd8994
Show file tree
Hide file tree
Showing 10 changed files with 299 additions and 21 deletions.
5 changes: 5 additions & 0 deletions ingestion/src/metadata/examples/workflows/cassandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ source:
# token: <Token String>
# requestTimeout: <Timeout in seconds>
# connectTimeout: <Timeout in seconds>
# sslMode: allow
# sslConfig:
# caCertificate: "CA certificate content"
# sslCertificate: "SSL certificate content"
# sslKey: "SSL key content"
hostPort: localhost:9042
sourceConfig:
config:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from metadata.generated.schema.entity.services.connections.testConnectionResult import (
TestConnectionResult,
)
from metadata.ingestion.connections.builders import init_empty_connection_arguments
from metadata.ingestion.connections.test_connections import test_connection_steps
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.cassandra.queries import (
Expand Down Expand Up @@ -77,7 +78,14 @@ def get_connection(connection: CassandraConnection):
password=connection.authType.password.get_secret_value(),
)

cluster = Cluster(**cluster_config)
connection.connectionArguments = (
connection.connectionArguments or init_empty_connection_arguments()
)

cluster = Cluster(
**cluster_config,
ssl_context=connection.connectionArguments.root.get("ssl_context"),
)
session = cluster.connect()

return session
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Iceberg source helpers.
Cassandra source helpers.
"""
from __future__ import annotations

Expand Down Expand Up @@ -57,6 +57,7 @@ def parse(cls, field) -> Column:
data_type = None
array_data_type = None
raw_data_type = ""

for letter in field.type:
if letter == "<":
if raw_data_type in ("", "frozen"):
Expand All @@ -71,31 +72,29 @@ def parse(cls, field) -> Column:
array_data_type = cls.datatype_mapping.get(
raw_data_type.lower(), DataType.UNKNOWN
)

raw_data_type = ""
if data_type != DataType.ARRAY:
if data_type != DataType.ARRAY or array_data_type:
break

elif letter != ">":
raw_data_type += letter

elif letter == ">":
if not array_data_type and data_type:
array_data_type = cls.datatype_mapping.get(
raw_data_type.lower(), DataType.UNKNOWN
)
break
else:
if not data_type:
data_type = cls.datatype_mapping.get(
field.type.lower(), DataType.UNKNOWN
)

else:
raw_data_type += letter

if not data_type:
data_type = cls.datatype_mapping.get(field.type.lower(), DataType.UNKNOWN)

column_def = {
"name": field.column_name,
"dataTypeDisplay": field.type,
"dataType": data_type,
"arrayDataType": array_data_type,
}
if array_data_type:
column_def["arrayDataType"] = array_data_type

return Column(**column_def)
35 changes: 35 additions & 0 deletions ingestion/src/metadata/utils/ssl_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import tempfile
import traceback
from functools import singledispatch, singledispatchmethod
from ssl import CERT_REQUIRED, SSLContext
from typing import Optional, Union, cast

from pydantic import SecretStr
Expand All @@ -27,6 +28,9 @@
from metadata.generated.schema.entity.services.connections.dashboard.qlikSenseConnection import (
QlikSenseConnection,
)
from metadata.generated.schema.entity.services.connections.database.cassandraConnection import (
CassandraConnection,
)
from metadata.generated.schema.entity.services.connections.database.dorisConnection import (
DorisConnection,
)
Expand Down Expand Up @@ -55,6 +59,7 @@
MatillionConnection,
)
from metadata.generated.schema.security.ssl import verifySSLConfig
from metadata.generated.schema.security.ssl.verifySSLConfig import SslMode
from metadata.ingestion.connections.builders import init_empty_connection_arguments
from metadata.ingestion.models.custom_pydantic import CustomSecretStr
from metadata.ingestion.source.connections import get_connection
Expand Down Expand Up @@ -206,6 +211,25 @@ def _(self, connection):
] = self.cert_file_path
return connection

@setup_ssl.register(CassandraConnection)
def _(self, connection):
    """
    Build the SSL context for a Cassandra connection.

    The resulting context (or ``None`` when ``sslMode`` is ``disable``) is
    stored in ``connection.connectionArguments`` under the ``ssl_context``
    key, creating the connection arguments if they are missing.

    Args:
        connection: the Cassandra service connection to configure.

    Returns:
        The same connection object, with ``ssl_context`` set in its
        connection arguments.
    """
    connection = cast(CassandraConnection, connection)

    ssl_context = None
    if connection.sslMode != SslMode.disable:
        # NOTE(review): SSLContext() without an explicit protocol is
        # deprecated since Python 3.10 - consider passing one explicitly.
        ssl_context = SSLContext()

        # The manager is created as soon as *any* of the CA / certificate /
        # key is supplied, so each file may be absent here. Passing None to
        # the loaders below raises TypeError, hence the guards.
        if self.ca_file_path:
            ssl_context.load_verify_locations(cafile=self.ca_file_path)
        else:
            # No custom CA: rely on the system trust store so that
            # CERT_REQUIRED can still succeed.
            ssl_context.load_default_certs()
        ssl_context.verify_mode = CERT_REQUIRED

        if self.cert_file_path:
            ssl_context.load_cert_chain(
                certfile=self.cert_file_path, keyfile=self.key_file_path
            )

    connection.connectionArguments = (
        connection.connectionArguments or init_empty_connection_arguments()
    )
    connection.connectionArguments.root["ssl_context"] = ssl_context
    return connection


@singledispatch
def check_ssl_and_init(_) -> Optional[SSLManager]:
Expand Down Expand Up @@ -289,6 +313,17 @@ def _(connection):
return None


@check_ssl_and_init.register(CassandraConnection)
def _(connection):
    """
    Create an SSLManager for a Cassandra connection when SSL material is set.

    Returns ``None`` when the connection has no ``sslConfig`` or when none of
    the CA certificate, SSL certificate or SSL key is provided.
    """
    cassandra_connection = cast(CassandraConnection, connection)
    ssl_config: Optional[verifySSLConfig.SslConfig] = cassandra_connection.sslConfig

    # Nothing configured at all: no manager needed.
    if not ssl_config:
        return None

    # An sslConfig with every field empty is treated like no config.
    if not (
        ssl_config.root.caCertificate
        or ssl_config.root.sslCertificate
        or ssl_config.root.sslKey
    ):
        return None

    return SSLManager(
        ca=ssl_config.root.caCertificate,
        cert=ssl_config.root.sslCertificate,
        key=ssl_config.root.sslKey,
    )


def get_ssl_connection(service_config):
try:
# To be cleaned up as part of https://github.com/open-metadata/OpenMetadata/issues/15913
Expand Down
71 changes: 71 additions & 0 deletions ingestion/tests/unit/test_ssl_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
OpenMetadataJWTClientConfig,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.cassandra.metadata import CassandraSource
from metadata.ingestion.source.messaging.kafka.metadata import KafkaSource
from metadata.utils.ssl_manager import SSLManager

Expand Down Expand Up @@ -124,3 +125,73 @@ def test_init(self, mock_ssl_manager, test_connection):
kafka_source_with_ssl.service_connection.schemaRegistrySSL.root.sslCertificate.get_secret_value(),
"sslCertificateData",
)


class CassandraSourceSSLTest(TestCase):
    """
    Check that CassandraSource builds an SSL manager only when an SSL
    configuration is present in the service connection.
    """

    @patch("metadata.utils.ssl_manager.SSLManager.setup_ssl")
    @patch(
        "metadata.ingestion.source.database.cassandra.metadata.CassandraSource.test_connection"
    )
    @patch("metadata.ingestion.source.database.cassandra.connection.get_connection")
    def test_init(self, get_connection, test_connection, setup_ssl):
        # Neutralise everything that would touch a real cluster or real
        # certificate files.
        get_connection.return_value = True
        test_connection.return_value = True
        setup_ssl.side_effect = lambda x: x

        plain_connection = {
            "type": "Cassandra",
            "hostPort": "localhost:9042",
        }

        def workflow_source(connection_config):
            # Wrap a service connection config into a full workflow source.
            return WorkflowSource(
                **{
                    "type": "cassandra",
                    "serviceName": "local_cassandra",
                    "serviceConnection": {"config": connection_config},
                    "sourceConfig": {"config": {"type": "DatabaseMetadata"}},
                }
            )

        # Without any SSL settings no manager must be created.
        config = workflow_source(plain_connection)
        metadata = OpenMetadata(
            OpenMetadataConnection(
                hostPort="http://localhost:8585/api",
                authProvider="openmetadata",
                securityConfig=OpenMetadataJWTClientConfig(jwtToken="token"),
            )
        )
        cassandra_source = CassandraSource(config, metadata)
        self.assertIsNone(cassandra_source.ssl_manager)

        # With SSL settings the manager exists and the secrets are preserved.
        config_with_ssl = workflow_source(
            {
                **plain_connection,
                "sslConfig": {
                    "caCertificate": "caCertificateData",
                    "sslKey": "sslKeyData",
                    "sslCertificate": "sslCertificateData",
                },
                "sslMode": "allow",
            }
        )
        cassandra_source_with_ssl = CassandraSource(config_with_ssl, metadata)

        self.assertIsNotNone(cassandra_source_with_ssl.ssl_manager)
        expected_secrets = (
            ("caCertificate", "caCertificateData"),
            ("sslKey", "sslKeyData"),
            ("sslCertificate", "sslCertificateData"),
        )
        for field_name, expected_value in expected_secrets:
            secret = getattr(
                cassandra_source_with_ssl.service_connection.sslConfig.root,
                field_name,
            )
            self.assertEqual(secret.get_secret_value(), expected_value)
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ Configure and schedule Cassandra metadata workflows from the OpenMetadata UI:

- [Requirements](#requirements)
- [Metadata Ingestion](#metadata-ingestion)
- [Enable Security](#securing-cassandra-connection-with-ssl-in-openmetadata)

{% partial file="/v1.7/connectors/ingestion-modes-tiles.md" variables={yamlPath: "/connectors/database/cassandra/yaml"} /%}

Expand Down Expand Up @@ -59,6 +60,19 @@ Configuration for connecting to DataStax Astra DB in the cloud.
- **token**: The Astra DB application token used for authentication.
- **secureConnectBundle**: File path to the Secure Connect Bundle (.zip) used for a secure connection to DataStax Astra DB.

**SSL Modes**

Cassandra supports several SSL modes, which can be added to ConnectionArguments. They are as follows:
- **disable**: SSL is disabled and the connection is not encrypted.
- **allow**: SSL is used if the server requires it.
- **prefer**: SSL is used if the server supports it.
- **require**: SSL is required.
- **verify-ca**: SSL must be used and the server certificate must be verified.
- **verify-full**: SSL must be used. The server certificate must be verified, and the server hostname must match the hostname attribute on the certificate.

**SSL Configuration**

In order to integrate SSL in the Metadata Ingestion Config, the user will have to add the SSL config under sslConfig which is placed in the source.

{% /extraContent %}

Expand All @@ -70,6 +84,18 @@ Configuration for connecting to DataStax Astra DB in the cloud.

{% /stepsContainer %}

## Securing Cassandra Connection with SSL in OpenMetadata

To establish secure connections between OpenMetadata and a Cassandra database, you can use any SSL mode provided by Cassandra, except disable.

Under `Advanced Config`, after selecting the SSL mode, provide the CA certificate, SSL certificate and SSL key.

{% image
src="/images/v1.7/connectors/ssl_connection.png"
alt="SSL Configuration"
height="450px"
caption="SSL Configuration" /%}

{% partial file="/v1.7/connectors/troubleshooting.md" /%}

{% partial file="/v1.7/connectors/database/related.md" /%}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ Configure and schedule Cassandra metadata workflows from the OpenMetadata UI:

- [Requirements](#requirements)
- [Metadata Ingestion](#metadata-ingestion)
- [Enable Security](#securing-cassandra-connection-with-ssl-in-openmetadata)


{% partial file="/v1.7/connectors/ingestion-modes-tiles.md" variables={yamlPath: "/connectors/database/cassandra/yaml"} /%}

Expand Down Expand Up @@ -102,8 +104,36 @@ Configuration for connecting to DataStax Astra DB in the cloud.

{% partial file="/v1.7/connectors/yaml/workflow-config-def.md" /%}

#### Advanced Configuration

{% codeInfo srNumber=6 %}

**Connection Arguments (Optional)**: Enter the details for any additional connection arguments such as security or protocol configs that can be sent to database during the connection. These details must be added as Key-Value pairs.

{% /codeInfo %}

{% codeInfo srNumber=7 %}

The sslConfig and sslMode are used to configure the SSL (Secure Sockets Layer) connection between your application and the Cassandra server.

- **caCertificate**: Provide the path to ssl ca file.

- **sslCertificate**: Provide the path to ssl client certificate file (ssl_cert).

- **sslKey**: Provide the path to ssl client key file (ssl_key).

**sslMode**: This field controls whether a secure SSL/TLS connection will be negotiated with the server. There are several modes you can choose:

- **disable**: No SSL/TLS encryption will be used; the data sent over the network is not encrypted.
- **allow**: The driver will try to negotiate a non-SSL connection but if the server insists on SSL, it will switch to SSL.
- **prefer** (the default): The driver will try to negotiate an SSL connection but if the server does not support SSL, it will switch to a non-SSL connection.
- **require**: The driver will try to negotiate an SSL connection. If the server does not support SSL, the driver will not fall back to a non-SSL connection.
- **verify-ca**: The driver will negotiate an SSL connection and verify that the server certificate is issued by a trusted certificate authority (CA).
- **verify-full**: The driver will negotiate an SSL connection, verify that the server certificate is issued by a trusted CA and check that the server host name matches the one in the certificate.

{% /codeInfo %}

{% /codeInfo %}

{% /codeInfoContainer %}

Expand Down Expand Up @@ -135,6 +165,17 @@ source:
requestTimeout: <Timeout in seconds>
connectTimeout: <Timeout in seconds>
```
```yaml {% srNumber=6 %}
# connectionArguments:
# key: value
```
```yaml {% srNumber=7 %}
# sslConfig:
# caCertificate: "path/to/ca/certificate"
# sslCertificate: "path/to/ssl/certificate"
# sslKey: "path/to/ssl/key"
# sslMode: disable #allow prefer require verify-ca verify-full
```


{% partial file="/v1.7/connectors/yaml/database/source-config.md" /%}
Expand All @@ -148,3 +189,17 @@ source:
{% /codePreview %}

{% partial file="/v1.7/connectors/yaml/ingestion-cli.md" /%}

## Securing Cassandra Connection with SSL in OpenMetadata

To establish secure connections between OpenMetadata and a Cassandra database, you can use any SSL mode provided by Cassandra, except disable.

Under `Advanced Config`, after selecting the SSL mode, provide the CA certificate, SSL certificate and SSL key.

```yaml
sslMode: allow
sslConfig:
caCertificate: "/path/to/ca/certificate"
sslCertificate: "/path/to/ssl/certificate"
sslKey: "/path/to/ssl/key"
```
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,18 @@
"description": "Optional name to give to the database in OpenMetadata. If left blank, we will use default as the database name.",
"type": "string"
},
"connectionArguments": {
"title": "Connection Arguments",
"$ref": "../connectionBasicType.json#/definitions/connectionArguments"
},
"sslMode": {
"title": "SSL Mode",
"$ref": "../../../../security/ssl/verifySSLConfig.json#/definitions/sslMode"
},
"sslConfig": {
"title": "SSL Configuration",
"$ref": "../../../../security/ssl/verifySSLConfig.json#/definitions/sslConfig"
},
"supportsMetadataExtraction": {
"title": "Supports Metadata Extraction",
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
Expand Down
Loading

0 comments on commit 4bd8994

Please sign in to comment.