From f8a3bf7639977456b6cf9a7c12b5cc5779e155ca Mon Sep 17 00:00:00 2001 From: Matt Mullen Date: Tue, 12 Dec 2023 12:13:39 -0500 Subject: [PATCH 1/2] feat: add update method --- sophosfirewall_python/firewallapi.py | 44 ++++++++++++- sophosfirewall_python/unittests.py | 98 ++++++++++++++++++++++++++++ 2 files changed, 140 insertions(+), 2 deletions(-) diff --git a/sophosfirewall_python/firewallapi.py b/sophosfirewall_python/firewallapi.py index b574983..95f3c1c 100644 --- a/sophosfirewall_python/firewallapi.py +++ b/sophosfirewall_python/firewallapi.py @@ -42,6 +42,7 @@ class SophosFirewallOperatorError(Exception): + class SophosFirewall: """Class used for interacting with the Sophos Firewall XML API""" @@ -279,7 +280,45 @@ def remove(self, xml_tag: str, name: str, output_format: str = "dict"): if output_format == "xml": return resp.content.decode() return xmltodict.parse(resp.content.decode()) + + def update(self, xml_tag: str, name: str, update_params: dict, output_format: str = "dict"): + """Update an existing object on the firewall. + Args: + xml_tag (str): The XML tag indicating the type of object to be updated. + name (str): The name of the object to be updated. + update_params (dict): Keys/values to be updated. Keys must match an existing XML key. + Use API docs and/or `get_tag_with_filter` with `output_format="xml"` to see valid XML keys. + output_format(str): Output format. Valid options are "dict" or "xml". Defaults to dict. + """ + resp = self.get_tag_with_filter( + xml_tag=xml_tag, + key="Name", + value=name, + operator="=") + + for key in update_params: + resp["Response"][xml_tag][key] = update_params[key] + + update_body = {} + update_body[xml_tag]=resp["Response"][xml_tag] + xml_update_body = xmltodict.unparse(update_body, pretty=True).lstrip('') + payload = f""" + + + {self.username} + {self.password} + + + {xml_update_body} + + + """ + resp = self._post(xmldata=payload) + self._error_check(resp, xml_tag) + if output_format == "xml": + return resp.content.decode() + return xmltodict.parse(resp.content.decode()) def _dict_to_lower(self, target_dict): """Convert the keys of a dictionary to lower-case @@ -310,12 +349,13 @@ def _error_check(self, api_response, xml_tag): if "Status" in resp_dict: if resp_dict["Status"] == "Number of records Zero." or resp_dict["Status"] == "No. of records Zero.": raise SophosFirewallZeroRecords(resp_dict["Status"]) + if "@code" in resp_dict["Status"]: + if not resp_dict["Status"]["@code"].startswith("2"): + raise SophosFirewallAPIError(f"{resp_dict['Status']['@code']}: {resp_dict['Status']['#text']}") else: raise SophosFirewallAPIError( str(xmltodict.parse(api_response.content.decode())) ) - - # METHODS FOR OBJECT RETRIEVAL (GET) diff --git a/sophosfirewall_python/unittests.py b/sophosfirewall_python/unittests.py index 0d3f86f..21b930a 100644 --- a/sophosfirewall_python/unittests.py +++ b/sophosfirewall_python/unittests.py @@ -758,3 +758,101 @@ def test_get_ip_host_queryparams(self, mocked_post): } } assert self.fw.get_ip_host(name="TEST1") == expected_result + + @patch.object(SophosFirewall, "_post") + def test_remove(self, mocked_post): + """Test remove() method""" + mock_response = Mock() + mock_response.content = ( + """ + + + + Authentication Successful + + + Configuration applied successfully. + + + """.replace( + "\n", "" + ) + .strip() + .encode() + ) + + mocked_post.return_value = mock_response + + expected_result = { + "Response": { + "@APIVersion": "2000.1", + "@IPS_CAT_VER": "1", + "Login": {"status": "Authentication Successful"}, + "IPHost": { + "@transactionid": "", + "Status": { + "@code": "200", + "#text": "Configuration applied successfully.", + }, + }, + } + } + + assert self.fw.remove(xml_tag="IPHost", name='TESTHOST') == expected_result + + + @patch.object(SophosFirewall, "_post") + @patch.object(SophosFirewall, "get_tag_with_filter") + def test_update(self, mocked_get_tag_with_filter, mocked_post): + """Test update() method""" + mock_get = MagicMock() + mock_get.__getitem__.content = {'Response': + {'@APIVersion': '2000.1', + '@IPS_CAT_VER': '1', + 'Login': {'status': 'Authentication Successful'}, + 'IPHost': {'@transactionid': '', + 'Name': 'TESTHOST', + 'IPFamily': 'IPv4', + 'HostType': 'IP', + 'IPAddress': '1.1.1.1'} + } + } + + mock_response = Mock() + mock_response.content = ( + """ + + + + Authentication Successful + + + Configuration applied successfully. + + + """.replace( + "\n", "" + ) + .strip() + .encode() + ) + + mocked_get_tag_with_filter.return_value = mock_get + mocked_post.return_value = mock_response + + expected_result = { + "Response": { + "@APIVersion": "2000.1", + "@IPS_CAT_VER": "1", + "Login": {"status": "Authentication Successful"}, + "IPHost": { + "@transactionid": "", + "Status": { + "@code": "200", + "#text": "Configuration applied successfully.", + }, + }, + } + } + + assert self.fw.update(xml_tag="IPHost", name='TESTHOST', update_params={"IPAddress": "2.2.2.2"}) == expected_result \ No newline at end of file From f3a6e5921bdad92f53aec664070ec68f1b13ce0e Mon Sep 17 00:00:00 2001 From: Matt Mullen Date: Tue, 12 Dec 2023 12:14:02 -0500 Subject: [PATCH 2/2] version: update to v0.1.24 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 7e1fa3a..874be12 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,7 +3,7 @@ name = "sophosfirewall-python" packages = [ { include = "sophosfirewall_python" }, ] -version = "0.1.23" +version = "0.1.24" description = "Python SDK for Sophos Firewall" authors = ["Matt Mullen "] readme = "README.md"