Skip to content

Commit

Permalink
Updates to management of services (#61)
Browse files Browse the repository at this point in the history
  • Loading branch information
mamullen13316 authored May 8, 2024
1 parent 1a413e3 commit c1f379e
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 16 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "sophosfirewall-python"
packages = [
{ include = "sophosfirewall_python" },
]
version = "0.1.35"
version = "0.1.36"
description = "Python SDK for Sophos Firewall"
authors = ["Matt Mullen <[email protected]>"]
readme = "README.md"
Expand Down
82 changes: 76 additions & 6 deletions sophosfirewall_python/firewallapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -828,21 +828,22 @@ def create_ip_range(
def create_service(
self,
name: str,
port: str,
protocol: str,
service_list: list[dict],
debug: bool = False,
):
"""Create a TCP or UDP service
Args:
name (str): Service name
port (str): TCP/UDP port
name (str): Service name
service_list(list): List of dictionaries containing the below keys for each port/proto pair
src_port (str, optional): Source TCP/UDP port. Default=1:65535.
dst_port (str): Destination TCP/UDP port
protocol (str): TCP or UDP
debug (bool, optional): Enable debug mode. Defaults to False.
debug (bool, optional): Enable debug mode. Defaults to False.
Returns:
dict: XML response converted to Python dictionary
"""
params = {"name": name, "port": port, "protocol": protocol}
params = {"name": name, "service_list": service_list}
resp = self.submit_template(
"createservice.j2", template_vars=params, debug=debug
)
Expand Down Expand Up @@ -1027,6 +1028,75 @@ def update_urlgroup(
)
return resp

def update_service(
self, name: str, service_list: list[dict], action: str = "add", debug: bool = False
):
"""Add or remove a service entry to/from a service
Args:
name (str): Service name.
service_list (list[dict]): List of dicts containing port/protocol pairs to be added or removed.
src_port(str, optional): Source TCP/UDP port range. Default=1:65535.
dst_port(str): Destination TCP/UDP port range.
protocol(str): TCP or UDP
action (str): Options are 'add', 'remove' or 'replace'. Defaults to 'add'.
debug (bool, optional): Enable debug mode. Defaults to False.
Returns:
dict: XML response converted to Python dictionary
"""
if not isinstance(service_list, list):
raise SophosFirewallInvalidArgument(
"The update_service() argument `service_list` must be of type list!"
)

if action:
self._validate_arg(
arg_name="action",
arg_value=action,
valid_choices=["add", "remove", "replace"],
)

# Get the existing Service list first
resp = self.get_service(name=name)
if "ServiceDetail" in resp["Response"]["Services"]["ServiceDetails"]:
exist_list = (
resp.get("Response").get("Services").get("ServiceDetails").get("ServiceDetail")
)
else:
exist_list = None

# Add src_port to input if not present
for service in service_list:
if not "src_port" in service:
service["src_port"] = "1:65535"
if action == "replace":
exist_list = None
new_service_list = []
if exist_list:
if isinstance(exist_list, dict):
new_service_list.append({"src_port": exist_list["SourcePort"],
"dst_port": exist_list["DestinationPort"],
"protocol": exist_list["Protocol"]})
elif isinstance(exist_list, list):
for service in exist_list:
new_service_list.append({"src_port": service["SourcePort"],
"dst_port": service["DestinationPort"],
"protocol": service["Protocol"]})
for service in service_list:
if action.lower() == "add" and service not in new_service_list:
new_service_list.append(service)
elif action.lower() == "remove" and service in new_service_list:
new_service_list.remove(service)
elif action.lower() == "replace":
new_service_list.append(service)

params = {"name": name, "service_list": new_service_list}
resp = self.submit_template(
"updateservice.j2", template_vars=params, debug=debug
)
return resp

def update_ip_hostgroup(
self,
name: str,
Expand Down
8 changes: 5 additions & 3 deletions sophosfirewall_python/templates/createservice.j2
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@
<Name>{{ name }}</Name>
<Type>TCPorUDP</Type>
<ServiceDetails>
{% for service in service_list %}
<ServiceDetail>
<SourcePort>1:65535</SourcePort>
<DestinationPort>{{ port }}</DestinationPort>
<Protocol>{{ protocol }}</Protocol>
<SourcePort>{{ service.get(src_port, '1:65535') }}</SourcePort>
<DestinationPort>{{ service.dst_port }}</DestinationPort>
<Protocol>{{ service.protocol }}</Protocol>
</ServiceDetail>
{% endfor %}
</ServiceDetails>
</Services>
</Set>
Expand Down
21 changes: 21 additions & 0 deletions sophosfirewall_python/templates/updateservice.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<Request>
<Login>
<Username>{{username}}</Username>
<Password>{{password}}</Password>
</Login>
<Set operation="update">
<Services transactionid="">
<Name>{{ name }}</Name>
<Type>TCPorUDP</Type>
<ServiceDetails>
{% for service in service_list %}
<ServiceDetail>
<SourcePort>{{ service.get(src_port, '1:65535') }}</SourcePort>
<DestinationPort>{{ service.dst_port }}</DestinationPort>
<Protocol>{{ service.protocol }}</Protocol>
</ServiceDetail>
{% endfor %}
</ServiceDetails>
</Services>
</Set>
</Request>
97 changes: 91 additions & 6 deletions sophosfirewall_python/tests/functional.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ def test_login(setup):
"Login": {"status": "Authentication Successful"},
}
}
if float(API_VERSION) >= 2000.2:
expected_result["Response"]["@IS_WIFI6"] = "0"

assert setup.login() == expected_result

Expand All @@ -105,6 +107,8 @@ def test_create_ip_host(setup):
},
}
}
if float(API_VERSION) >= 2000.2:
expected_result["Response"]["@IS_WIFI6"] = "0"

hosts = [
{"name": "FUNC_TESTHOST1", "ip": "1.1.1.1"},
Expand Down Expand Up @@ -134,6 +138,8 @@ def test_create_ip_hostgroup(setup):
},
}
}
if float(API_VERSION) >= 2000.2:
expected_result["Response"]["@IS_WIFI6"] = "0"

assert (
setup.create_ip_hostgroup(
Expand Down Expand Up @@ -162,6 +168,8 @@ def test_create_ip_network(setup):
},
}
}
if float(API_VERSION) >= 2000.2:
expected_result["Response"]["@IS_WIFI6"] = "0"

assert (
setup.create_ip_network(
Expand Down Expand Up @@ -190,6 +198,8 @@ def test_create_ip_range(setup):
},
}
}
if float(API_VERSION) >= 2000.2:
expected_result["Response"]["@IS_WIFI6"] = "0"

assert (
setup.create_ip_range(
Expand All @@ -216,9 +226,11 @@ def test_create_service(setup):
},
}
}
if float(API_VERSION) >= 2000.2:
expected_result["Response"]["@IS_WIFI6"] = "0"

assert (
setup.create_service(name="FUNC_TESTSVC1", port=1234, protocol="tcp")
setup.create_service(name="FUNC_TESTSVC1", service_list=[{"dst_port": 1234, "protocol": "tcp"}])
== expected_result
)

Expand All @@ -240,6 +252,8 @@ def test_create_rule(setup):
},
}
}
if float(API_VERSION) >= 2000.2:
expected_result["Response"]["@IS_WIFI6"] = "0"

rule_params = dict(
rulename="FUNC_TESTRULE1",
Expand Down Expand Up @@ -274,6 +288,8 @@ def test_create_urlgroup(setup):
},
}
}
if float(API_VERSION) >= 2000.2:
expected_result["Response"]["@IS_WIFI6"] = "0"

assert (
setup.create_urlgroup(
Expand All @@ -300,6 +316,8 @@ def test_create_user(setup):
},
}
}
if float(API_VERSION) >= 2000.2:
expected_result["Response"]["@IS_WIFI6"] = "0"

assert (
setup.create_user(
Expand Down Expand Up @@ -333,10 +351,12 @@ def test_update_ip_hostgroup(setup):
},
}
}
if float(API_VERSION) >= 2000.2:
update_result["Response"]["@IS_WIFI6"] = "0"

get_result = {
"Response": {
"@APIVersion": "2000.1",
"@APIVersion": API_VERSION,
"@IPS_CAT_VER": "1",
"Login": {"status": "Authentication Successful"},
"IPHostGroup": {
Expand All @@ -348,6 +368,8 @@ def test_update_ip_hostgroup(setup):
},
}
}
if float(API_VERSION) >= 2000.2:
get_result["Response"]["@IS_WIFI6"] = "0"

assert (
setup.update_ip_hostgroup(name="FUNC_TESTGROUP1", host_list=["FUNC_TESTHOST2"])
Expand All @@ -374,10 +396,12 @@ def test_update_urlgroup(setup):
},
}
}
if float(API_VERSION) >= 2000.2:
update_result["Response"]["@IS_WIFI6"] = "0"

get_result = {
"Response": {
"@APIVersion": "2000.1",
"@APIVersion": API_VERSION,
"@IPS_CAT_VER": "1",
"Login": {"status": "Authentication Successful"},
"WebFilterURLGroup": {
Expand All @@ -389,6 +413,8 @@ def test_update_urlgroup(setup):
},
}
}
if float(API_VERSION) >= 2000.2:
get_result["Response"]["@IS_WIFI6"] = "0"

assert (
setup.update_urlgroup(name="FUNC_URLGROUP1", domain_list=["test3.com"])
Expand All @@ -397,6 +423,60 @@ def test_update_urlgroup(setup):

assert setup.get_urlgroup(name="FUNC_URLGROUP1") == get_result

def test_update_service(setup):
"""Test update_service method."""

update_result = {
"Response": {
"@APIVersion": API_VERSION,
"@IPS_CAT_VER": "1",
"Login": {"status": "Authentication Successful"},
"Services": {
"@transactionid": "",
"Status": {
"@code": "200",
"#text": "Configuration applied successfully.",
},
},
}
}
if float(API_VERSION) >= 2000.2:
update_result["Response"]["@IS_WIFI6"] = "0"

get_result = {
"Response": {
"@APIVersion": API_VERSION,
"@IPS_CAT_VER": "1",
"Login": {"status": "Authentication Successful"},
"Services": {
"@transactionid": "",
"Name": "FUNC_TESTSVC1",
"Description": None,
"Type": "TCPorUDP",
"ServiceDetails": {
"ServiceDetail": [{
"SourcePort": "1:65535",
"DestinationPort": "1234",
"Protocol": "TCP"
},
{
"SourcePort": "1:65535",
"DestinationPort": "2222",
"Protocol": "TCP"
}]
},
},
}
}
if float(API_VERSION) >= 2000.2:
get_result["Response"]["@IS_WIFI6"] = "0"

assert (
setup.update_service(name="FUNC_TESTSVC1", service_list=[{"dst_port": "2222","protocol": "TCP"}])
== update_result
)

assert setup.get_service(name="FUNC_TESTSVC1") == get_result

def test_update_service_acl(setup):
"""Test update_service_acl method."""
Expand All @@ -415,10 +495,12 @@ def test_update_service_acl(setup):
},
}
}
if float(API_VERSION) >= 2000.2:
update_result["Response"]["@IS_WIFI6"] = "0"

get_result = {
"Response": {
"@APIVersion": "2000.1",
"@APIVersion": API_VERSION,
"@IPS_CAT_VER": "1",
"Login": {"status": "Authentication Successful"},
"LocalServiceACL": {
Expand All @@ -430,10 +512,10 @@ def test_update_service_acl(setup):
"SourceZone": "Any",
"Hosts": {
"Host": [
"Sophos Internal ACL",
"Sophos External ACL",
"All EAA Hosts",
"FUNC_TESTHOST1",
"Sophos Internal ACL",
"Sophos External ACL",
]
},
"Services": {
Expand All @@ -450,6 +532,9 @@ def test_update_service_acl(setup):
},
}
}
if float(API_VERSION) >= 2000.2:
get_result["Response"]["@IS_WIFI6"] = "0"
get_result["Response"]["LocalServiceACL"]["Services"]["Service"].pop(3) # Removal of extra 'ping' in response

assert setup.update_service_acl(host_list=["FUNC_TESTHOST1"]) == update_result

Expand Down
3 changes: 3 additions & 0 deletions sophosfirewall_python/tests/pytest.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[pytest]
# prevents variable values from printing which may contain sensitive info
addopts = --tb=short

0 comments on commit c1f379e

Please sign in to comment.