Skip to content

Commit

Permalink
added certificate renewal integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
skourta committed Jan 9, 2025
1 parent e557c0f commit 70a79f1
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 7 deletions.
8 changes: 4 additions & 4 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ allure-pytest-collection-report = {git = "https://github.com/canonical/data-plat
# see also: https://github.com/juju/python-libjuju/issues/1184
websockets = "<14.0"
tenacity = "*"
pydantic = "==2.9.1"

[tool.coverage.run]
branch = true
Expand Down
15 changes: 13 additions & 2 deletions tests/integration/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from pytest_operator.plugin import OpsTest

from literals import CLIENT_PORT, SNAP_NAME
from managers.tls import CertType

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -93,14 +94,14 @@ def get_cluster_endpoints(
)


async def get_juju_leader_unit_name(ops_test: OpsTest, app_name: str = APP_NAME) -> str:
async def get_juju_leader_unit_name(ops_test: OpsTest, app_name: str = APP_NAME) -> str | None:
"""Retrieve the leader unit name."""
for unit in ops_test.model.applications[app_name].units:
if await unit.is_leader_from_status():
return unit.name


async def get_secret_by_label(ops_test: OpsTest, label: str) -> Dict[str, str]:
async def get_secret_by_label(ops_test: OpsTest, label: str) -> Dict[str, str] | None:
secrets_raw = await ops_test.juju("list-secrets")
secret_ids = [
secret_line.split()[0] for secret_line in secrets_raw[1].split("\n")[1:] if secret_line
Expand All @@ -114,3 +115,13 @@ async def get_secret_by_label(ops_test: OpsTest, label: str) -> Dict[str, str]:

if label == secret_data[secret_id].get("label"):
return secret_data[secret_id]["content"]["Data"]


def get_certificate_from_unit(model: str, unit: str, cert_type: CertType) -> str | None:
"""Retrieve a certificate from a unit."""
command = f'juju ssh --model={model} {unit} "cat /var/snap/charmed-etcd/common/tls/{cert_type.value}.pem"'
output = subprocess.getoutput(command)
if output.startswith("-----BEGIN CERTIFICATE-----"):
return output

return None
92 changes: 91 additions & 1 deletion tests/integration/test_tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@
# See LICENSE file for licensing details.

import logging
from time import sleep

import pytest
from juju.application import Application
from pytest_operator.plugin import OpsTest

from literals import INTERNAL_USER, PEER_RELATION
from managers.tls import CertType

from .helpers import (
APP_NAME,
get_certificate_from_unit,
get_cluster_endpoints,
get_cluster_members,
get_juju_leader_unit_name,
Expand Down Expand Up @@ -55,6 +58,7 @@ async def test_build_and_deploy_with_tls(ops_test: OpsTest) -> None:
# check if all units have been added to the cluster
endpoints = get_cluster_endpoints(ops_test, APP_NAME, tls_enabled=True)
leader_unit = await get_juju_leader_unit_name(ops_test, APP_NAME)
assert leader_unit

cluster_members = get_cluster_members(model, leader_unit, endpoints, tls_enabled=True)
assert len(cluster_members) == NUM_UNITS
Expand All @@ -64,6 +68,7 @@ async def test_build_and_deploy_with_tls(ops_test: OpsTest) -> None:

# make sure data can be written to the cluster
secret = await get_secret_by_label(ops_test, label=f"{PEER_RELATION}.{APP_NAME}.app")
assert secret
password = secret.get(f"{INTERNAL_USER}-password")

assert (
Expand Down Expand Up @@ -101,7 +106,7 @@ async def test_turning_off_tls(ops_test: OpsTest) -> None:
model = ops_test.model_full_name
assert model is not None

# enable TLS and check if the cluster is still accessible
# disable TLS and check if the cluster is still accessible
etcd_app: Application = ops_test.model.applications[APP_NAME] # type: ignore
await etcd_app.remove_relation("peer-certificates", f"{TLS_NAME}:certificates")
await etcd_app.remove_relation("client-certificates", f"{TLS_NAME}:certificates")
Expand All @@ -110,6 +115,7 @@ async def test_turning_off_tls(ops_test: OpsTest) -> None:

endpoints = get_cluster_endpoints(ops_test, APP_NAME)
leader_unit = await get_juju_leader_unit_name(ops_test, APP_NAME)
assert leader_unit
cluster_members = get_cluster_members(model, leader_unit, endpoints)
assert len(cluster_members) == NUM_UNITS

Expand All @@ -118,6 +124,7 @@ async def test_turning_off_tls(ops_test: OpsTest) -> None:
assert cluster_member["peerURLs"][0].startswith("http://")

secret = await get_secret_by_label(ops_test, label=f"{PEER_RELATION}.{APP_NAME}.app")
assert secret
password = secret.get(f"{INTERNAL_USER}-password")
assert (
get_key(
Expand Down Expand Up @@ -148,6 +155,7 @@ async def test_turning_on_tls(ops_test: OpsTest) -> None:

endpoints = get_cluster_endpoints(ops_test, APP_NAME, tls_enabled=True)
leader_unit = await get_juju_leader_unit_name(ops_test, APP_NAME)
assert leader_unit
cluster_members = get_cluster_members(model, leader_unit, endpoints, tls_enabled=True)
assert len(cluster_members) == NUM_UNITS

Expand All @@ -156,6 +164,7 @@ async def test_turning_on_tls(ops_test: OpsTest) -> None:
assert cluster_member["peerURLs"][0].startswith("https")

secret = await get_secret_by_label(ops_test, label=f"{PEER_RELATION}.{APP_NAME}.app")
assert secret
password = secret.get(f"{INTERNAL_USER}-password")
assert (
get_key(
Expand All @@ -169,3 +178,84 @@ async def test_turning_on_tls(ops_test: OpsTest) -> None:
)
== TEST_VALUE
)


@pytest.mark.runner(["self-hosted", "linux", "X64", "jammy", "large"])
@pytest.mark.group(1)
@pytest.mark.abort_on_fail
async def test_certificate_expiration(ops_test: OpsTest) -> None:
assert ops_test.model
model = ops_test.model_full_name
assert model is not None

# turn off tls
# disable TLS and check if the cluster is still accessible
etcd_app: Application = ops_test.model.applications[APP_NAME] # type: ignore
await etcd_app.remove_relation("peer-certificates", f"{TLS_NAME}:certificates")
await etcd_app.remove_relation("client-certificates", f"{TLS_NAME}:certificates")

await ops_test.model.wait_for_idle(apps=[APP_NAME], status="active", timeout=1000)

endpoints = get_cluster_endpoints(ops_test, APP_NAME)
leader_unit = await get_juju_leader_unit_name(ops_test, APP_NAME)
assert leader_unit
cluster_members = get_cluster_members(model, leader_unit, endpoints)
assert len(cluster_members) == NUM_UNITS

for cluster_member in cluster_members:
assert cluster_member["clientURLs"][0].startswith("http://")
assert cluster_member["peerURLs"][0].startswith("http://")

# change config of TLS operator to have a certificate that is expired in 1m
tls_config = {"ca-common-name": "etcd", "certificate-validity": "1m"}
tls_app: Application = ops_test.model.applications[TLS_NAME] # type: ignore
await tls_app.set_config(tls_config)

# enable TLS and check if the cluster is still accessible
await ops_test.model.integrate(f"{APP_NAME}:peer-certificates", TLS_NAME)
await ops_test.model.integrate(f"{APP_NAME}:client-certificates", TLS_NAME)

await ops_test.model.wait_for_idle(apps=[APP_NAME], status="active", timeout=1000)

endpoints = get_cluster_endpoints(ops_test, APP_NAME, tls_enabled=True)
leader_unit = await get_juju_leader_unit_name(ops_test, APP_NAME)
assert leader_unit
cluster_members = get_cluster_members(model, leader_unit, endpoints, tls_enabled=True)
assert len(cluster_members) == NUM_UNITS

for cluster_member in cluster_members:
assert cluster_member["clientURLs"][0].startswith("https")
assert cluster_member["peerURLs"][0].startswith("https")

# get current certificate
current_certificate = get_certificate_from_unit(model, leader_unit, cert_type=CertType.CLIENT)
assert current_certificate

# wait for certificate to expire
sleep(90)

# get new certificate
new_certificate = get_certificate_from_unit(model, leader_unit, cert_type=CertType.CLIENT)
assert new_certificate

# check that the certificate has been updated
assert current_certificate != new_certificate

# check that the cluster is still accessible
secret = await get_secret_by_label(ops_test, label=f"{PEER_RELATION}.{APP_NAME}.app")
assert secret

password = secret.get(f"{INTERNAL_USER}-password")

assert (
get_key(
model,
leader_unit,
endpoints,
user=INTERNAL_USER,
password=password,
key=TEST_KEY,
tls_enabled=True,
)
== TEST_VALUE
)

0 comments on commit 70a79f1

Please sign in to comment.