Skip to content

Commit

Permalink
Feature: Add update method (#40)
Browse files Browse the repository at this point in the history
  • Loading branch information
mamullen13316 authored Dec 12, 2023
1 parent 43fd59e commit 5a81f0b
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 3 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "sophosfirewall-python"
packages = [
{ include = "sophosfirewall_python" },
]
version = "0.1.23"
version = "0.1.24"
description = "Python SDK for Sophos Firewall"
authors = ["Matt Mullen <[email protected]>"]
readme = "README.md"
Expand Down
44 changes: 42 additions & 2 deletions sophosfirewall_python/firewallapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class SophosFirewallOperatorError(Exception):




class SophosFirewall:
"""Class used for interacting with the Sophos Firewall XML API"""

Expand Down Expand Up @@ -279,7 +280,45 @@ def remove(self, xml_tag: str, name: str, output_format: str = "dict"):
if output_format == "xml":
return resp.content.decode()
return xmltodict.parse(resp.content.decode())

def update(self, xml_tag: str, name: str, update_params: dict, output_format: str = "dict"):
"""Update an existing object on the firewall.
Args:
xml_tag (str): The XML tag indicating the type of object to be updated.
name (str): The name of the object to be updated.
update_params (dict): Keys/values to be updated. Keys must match an existing XML key.
Use API docs and/or `get_tag_with_filter` with `output_format="xml"` to see valid XML keys.
output_format(str): Output format. Valid options are "dict" or "xml". Defaults to dict.
"""
resp = self.get_tag_with_filter(
xml_tag=xml_tag,
key="Name",
value=name,
operator="=")

for key in update_params:
resp["Response"][xml_tag][key] = update_params[key]

update_body = {}
update_body[xml_tag]=resp["Response"][xml_tag]
xml_update_body = xmltodict.unparse(update_body, pretty=True).lstrip('<?xml version="1.0" encoding="utf-8"?>')
payload = f"""
<Request>
<Login>
<Username>{self.username}</Username>
<Password>{self.password}</Password>
</Login>
<Set operation="update">
{xml_update_body}
</Set>
</Request>
"""
resp = self._post(xmldata=payload)
self._error_check(resp, xml_tag)
if output_format == "xml":
return resp.content.decode()
return xmltodict.parse(resp.content.decode())

def _dict_to_lower(self, target_dict):
"""Convert the keys of a dictionary to lower-case
Expand Down Expand Up @@ -310,12 +349,13 @@ def _error_check(self, api_response, xml_tag):
if "Status" in resp_dict:
if resp_dict["Status"] == "Number of records Zero." or resp_dict["Status"] == "No. of records Zero.":
raise SophosFirewallZeroRecords(resp_dict["Status"])
if "@code" in resp_dict["Status"]:
if not resp_dict["Status"]["@code"].startswith("2"):
raise SophosFirewallAPIError(f"{resp_dict['Status']['@code']}: {resp_dict['Status']['#text']}")
else:
raise SophosFirewallAPIError(
str(xmltodict.parse(api_response.content.decode()))
)



# METHODS FOR OBJECT RETRIEVAL (GET)

Expand Down
98 changes: 98 additions & 0 deletions sophosfirewall_python/unittests.py
Original file line number Diff line number Diff line change
Expand Up @@ -758,3 +758,101 @@ def test_get_ip_host_queryparams(self, mocked_post):
}
}
assert self.fw.get_ip_host(name="TEST1") == expected_result

@patch.object(SophosFirewall, "_post")
def test_remove(self, mocked_post):
"""Test remove() method"""
mock_response = Mock()
mock_response.content = (
"""
<?xml version="1.0" encoding="utf-8"?>
<Response APIVersion="2000.1" IPS_CAT_VER="1">
<Login>
<status>Authentication Successful</status>
</Login>
<IPHost transactionid="">
<Status code="200">Configuration applied successfully.</Status>
</IPHost>
</Response>
""".replace(
"\n", ""
)
.strip()
.encode()
)

mocked_post.return_value = mock_response

expected_result = {
"Response": {
"@APIVersion": "2000.1",
"@IPS_CAT_VER": "1",
"Login": {"status": "Authentication Successful"},
"IPHost": {
"@transactionid": "",
"Status": {
"@code": "200",
"#text": "Configuration applied successfully.",
},
},
}
}

assert self.fw.remove(xml_tag="IPHost", name='TESTHOST') == expected_result


@patch.object(SophosFirewall, "_post")
@patch.object(SophosFirewall, "get_tag_with_filter")
def test_update(self, mocked_get_tag_with_filter, mocked_post):
"""Test update() method"""
mock_get = MagicMock()
mock_get.__getitem__.content = {'Response':
{'@APIVersion': '2000.1',
'@IPS_CAT_VER': '1',
'Login': {'status': 'Authentication Successful'},
'IPHost': {'@transactionid': '',
'Name': 'TESTHOST',
'IPFamily': 'IPv4',
'HostType': 'IP',
'IPAddress': '1.1.1.1'}
}
}

mock_response = Mock()
mock_response.content = (
"""
<?xml version="1.0" encoding="utf-8"?>
<Response APIVersion="2000.1" IPS_CAT_VER="1">
<Login>
<status>Authentication Successful</status>
</Login>
<IPHost transactionid="">
<Status code="200">Configuration applied successfully.</Status>
</IPHost>
</Response>
""".replace(
"\n", ""
)
.strip()
.encode()
)

mocked_get_tag_with_filter.return_value = mock_get
mocked_post.return_value = mock_response

expected_result = {
"Response": {
"@APIVersion": "2000.1",
"@IPS_CAT_VER": "1",
"Login": {"status": "Authentication Successful"},
"IPHost": {
"@transactionid": "",
"Status": {
"@code": "200",
"#text": "Configuration applied successfully.",
},
},
}
}

assert self.fw.update(xml_tag="IPHost", name='TESTHOST', update_params={"IPAddress": "2.2.2.2"}) == expected_result

0 comments on commit 5a81f0b

Please sign in to comment.