Skip to content

Commit

Permalink
Add check related to Lighthouse coverage.
Browse files Browse the repository at this point in the history
I'm still without access to Sentinel, so this is all based on the docs.
  • Loading branch information
johanthoren committed Nov 29, 2024
1 parent 7f29a8e commit e6a3b8c
Show file tree
Hide file tree
Showing 4 changed files with 222 additions and 8 deletions.
2 changes: 1 addition & 1 deletion check_sentinel/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from .check_sentinel import SentinelCheck, SentinelAPI
from .check_sentinel import SentinelCheck, SentinelAPI, SentinelCheckExit
150 changes: 144 additions & 6 deletions check_sentinel/check_sentinel.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@

from azure.identity import DefaultAzureCredential
from azure.mgmt.securityinsight import SecurityInsights
from plugnpy import cachemanager
from azure.mgmt.resource import SubscriptionClient
from azure.mgmt.resource.resources import ResourceManagementClient
from azure.mgmt.loganalytics import LogAnalyticsManagementClient

from plugnpy.exception import ParamError, ParamErrorWithHelp, ResultError
from plugnpy.cachemanager import CacheManagerUtils
Expand Down Expand Up @@ -83,6 +87,66 @@ def __init__(self, args):
subscription_id=self.subscription_id,
)

def list_lighthouse_sentinel_workspaces(self):
"""List all Sentinel workspaces accessible via Azure Lighthouse."""
subscriptions_client = SubscriptionClient(self.credentials)
subscriptions = CacheManagerUtils.get_via_cachemanager(
no_cachemanager=False, # TODO: Add argument to disable cache manager
key="subscriptions",
ttl=3600,
func=subscriptions_client.subscriptions.list,
)

sentinel_workspaces = []

for subscription in subscriptions:
subscription_id = subscription.subscription_id
resource_client = ResourceManagementClient(self.credentials, subscription_id)
log_analytics_client = LogAnalyticsManagementClient(self.credentials, subscription_id)

resource_groups = CacheManagerUtils.get_via_cachemanager(
no_cachemanager=False, # TODO: Add argument to disable cache manager
key=f"resource_groups_{subscription_id}",
ttl=3600,
func=resource_client.resource_groups.list,
)

for rg in resource_groups:
resource_group_name = rg.name

workspaces = CacheManagerUtils.get_via_cachemanager(
no_cachemanager=False, # TODO: Add argument to disable cache manager
key=f"workspaces_{subscription_id}_{resource_group_name}",
ttl=300,
func=log_analytics_client.workspaces.list_by_resource_group,
resource_group_name=resource_group_name,
)

for workspace in workspaces:
workspace_name = workspace.name

# Check if Sentinel is enabled on the workspace
sentinel_client = SecurityInsights(
credential=self.credentials,
subscription_id=subscription_id,
)

try:
sentinel_client.entities.list(resource_group_name, workspace_name)
# If no exception, Sentinel is enabled
sentinel_workspaces.append(
{
"subscription_id": subscription_id,
"resource_group_name": resource_group_name,
"workspace_name": workspace_name,
}
)
except Exception:
# Sentinel is not enabled on this workspace
pass

return sentinel_workspaces

def get_incidents(self):
"""Retrieve incidents from Microsoft Sentinel."""
incidents = self.client.incidents.list(
Expand Down Expand Up @@ -115,6 +179,12 @@ def __init__(
self._interval = interval
self._api = SentinelAPI(args)

self._expected_workspaces = set()
if hasattr(args, "expected_workspaces") and args.expected_workspaces:
self._expected_workspaces = set(
[ws.strip() for ws in args.expected_workspaces.split(",")]
)

try:
if metric_type == METRIC_TYPE_CUSTOM:
check_metric_method, num_thresholds = [
Expand Down Expand Up @@ -221,21 +291,68 @@ def check_new_incidents(self):
]
return metrics

# Additional check methods can be added here.
def check_lighthouse_sentinels(self):
"""Check that Sentinel workspaces are accessible via Azure Lighthouse."""
sentinel_workspaces = self.call_api(self._api.list_lighthouse_sentinel_workspaces)
total_workspaces = len(sentinel_workspaces)

label = "Accessible Sentinel Workspaces"
metric_name = self.convert_label_name(label)
warning, critical = self._get_thresholds(0, metric_name)

metrics = [
Metric(
metric_name,
total_workspaces,
self._unit,
warning,
critical,
display_name=label,
summary_precision=0,
perf_data_precision=0,
)
]

# Optionally, check if total_workspaces is less than expected
expected_workspaces = self._expected_workspaces
if expected_workspaces:
expected_count = len(expected_workspaces)
if total_workspaces < expected_count:
sentinel_workspace_names = set(
[w.get("workspace_name") for w in sentinel_workspaces]
)
missing = sorted(expected_workspaces - sentinel_workspace_names)
missing_str = ",".join(missing)
raise SentinelCheckExit(
Metric.STATUS_CRITICAL,
f"Expected {expected_count} workspaces, but found {total_workspaces}. Missing: {missing_str}",
)
elif total_workspaces > expected_count:
sentinel_workspace_names = set(
[w.get("workspace_name") for w in sentinel_workspaces]
)
extra = sorted(sentinel_workspace_names - expected_workspaces)
extra_str = ",".join(extra)
raise SentinelCheckExit(
Metric.STATUS_WARNING,
f"Expected {expected_count} workspaces, but found {total_workspaces}. Extra: {extra_str}",
)

return metrics


# Define how each mode works, pointing to the correct Objects
MODE_MAPPING = {
"Sentinel.Incidents": ModeUsage(
"Custom",
"check_open_incidents; 1",
[], # Optional arguments
[], # No optional arguments
[
"AZURE_SUBSCRIPTION_ID",
"AZURE_RESOURCE_GROUP",
"AZURE_WORKSPACE_NAME",
], # Required arguments
"", # Unit
"",
300,
SentinelCheck,
{},
Expand All @@ -244,19 +361,29 @@ def check_new_incidents(self):
"Sentinel.Incidents": ModeUsage(
"Custom",
"check_new_incidents; 1",
[], # Optional arguments
[], # No optional arguments
[
"AZURE_SUBSCRIPTION_ID",
"AZURE_RESOURCE_GROUP",
"AZURE_WORKSPACE_NAME",
], # Required arguments
"", # Unit
"",
300,
SentinelCheck,
{},
{},
),
"Sentinel.LighthouseCheck": ModeUsage(
"Custom",
"check_lighthouse_sentinels; 1",
["EXPECTED_WORKSPACES"], # Optional argument
[], # No required arguments
"",
300,
SentinelCheck,
{},
{},
),
# Additional modes can be added here.
}

# Define additional arguments that ArgParse can take in if needed by a mode
Expand Down Expand Up @@ -289,6 +416,17 @@ def check_new_incidents(self):
),
],
),
ResourceVariable(
"EXPECTED_WORKSPACES",
"",
arguments=[
ResourceArgument(
"--expected-workspaces",
"Optional comma-separated list of expected workspace names",
"EXPECTED_WORKSPACES",
),
],
),
]


Expand Down
2 changes: 2 additions & 0 deletions check_sentinel/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
git+https://github.com/opsview/plugnpy.git
azure-identity
azure-mgmt-securityinsight
azure-mgmt-resource
azure-mgmt-loganalytics
76 changes: 75 additions & 1 deletion check_sentinel/tests/test_check_sentinel.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pytest
import argparse
from check_sentinel import SentinelCheck
from check_sentinel import SentinelCheck, SentinelCheckExit
from plugnpy import Metric
from plugnpy.exception import ParamError
from unittest.mock import patch
from collections import namedtuple
Expand Down Expand Up @@ -34,6 +35,14 @@ def assert_metrics(metrics, expected_value, expected_display_name=None):
assert metrics[0].display_name == expected_display_name


def create_mock_workspaces(workspace_names):
"""Helper to create mock workspaces from a list of workspace names."""
return [
{"subscription_id": f"sub{i}", "resource_group_name": f"rg{i}", "workspace_name": name}
for i, name in enumerate(workspace_names, start=1)
]


# Tests start here
def test_sentinel_check_init(sentinel_args):
"""Test initialization of SentinelCheck."""
Expand Down Expand Up @@ -74,3 +83,68 @@ def test_check_incidents(
# Call the appropriate method dynamically
metrics = getattr(check, method)()
assert_metrics(metrics, expected_value, display_name)


@patch("check_sentinel.SentinelAPI.list_lighthouse_sentinel_workspaces")
@pytest.mark.parametrize(
"mock_workspace_names, expected_workspaces_arg, expected_value, expected_status, expected_message",
[
# Test case 1: No accessible workspaces, no expected workspaces
([], "", 0, Metric.STATUS_OK, None),
# Test case 2: Accessible workspaces match expected workspaces
(["ws1", "ws2"], "ws1,ws2", 2, Metric.STATUS_OK, None),
# Test case 3: Some expected workspaces missing
(
["ws1"],
"ws1,ws2",
1,
Metric.STATUS_CRITICAL,
"Expected 2 workspaces, but found 1. Missing: ws2",
),
# Test case 4: No accessible workspaces, but expected workspaces provided
([], "ws1", 0, Metric.STATUS_CRITICAL, "Expected 1 workspaces, but found 0. Missing: ws1"),
# Test case 5: More accessible workspaces than expected
(
["ws1", "ws2", "ws3"],
"ws1,ws2",
3,
Metric.STATUS_WARNING,
"Expected 2 workspaces, but found 3. Extra: ws3",
),
# Test case 6: No expected workspaces provided
(["ws1", "ws2"], "", 2, Metric.STATUS_OK, None),
],
)
def test_check_lighthouse_sentinels(
mock_list_workspaces,
sentinel_args,
mock_workspace_names,
expected_workspaces_arg,
expected_value,
expected_status,
expected_message,
):
"""Test the check_lighthouse_sentinels method with various scenarios."""
# Mock data
mock_workspaces = create_mock_workspaces(mock_workspace_names)
mock_list_workspaces.return_value = mock_workspaces

# Set expected workspaces
sentinel_args.expected_workspaces = expected_workspaces_arg

# Initialize SentinelCheck
check = SentinelCheck({}, {}, "Custom", "check_lighthouse_sentinels; 1", "", 300, sentinel_args)

# Call the method and handle exceptions
if expected_status == Metric.STATUS_OK:
metrics = check.check_lighthouse_sentinels()
assert_metrics(
metrics,
expected_value=expected_value,
expected_display_name="Accessible Sentinel Workspaces",
)
else:
with pytest.raises(SentinelCheckExit) as exc_info:
check.check_lighthouse_sentinels()
assert exc_info.value.status == expected_status
assert expected_message in exc_info.value.message

0 comments on commit e6a3b8c

Please sign in to comment.