From e3d13952d7546020cf6547768ba3a96934729244 Mon Sep 17 00:00:00 2001 From: Christian Doll Date: Thu, 13 Jun 2019 15:25:09 +0200 Subject: [PATCH 1/6] Added capabilities for validating inter-observable patterns. --- stix2matcher/matcher.py | 75 ++++++++++++++++++- .../test/test_inter_observable_expr.py | 70 +++++++++++++++++ 2 files changed, 144 insertions(+), 1 deletion(-) create mode 100644 stix2matcher/test/test_inter_observable_expr.py diff --git a/stix2matcher/matcher.py b/stix2matcher/matcher.py index 99de0a0..8b73b75 100644 --- a/stix2matcher/matcher.py +++ b/stix2matcher/matcher.py @@ -3,6 +3,7 @@ import argparse import base64 import binascii +import copy import datetime import io import itertools @@ -959,6 +960,10 @@ def __init__(self, observed_data_sdos, verbose=False): # Holds intermediate results self.__compute_stack = [] + # Holds each value observed for a named capture group within regexes. + # Used to check for inter-observable patterns. + self.interobs_group_matches = {} + def __push(self, val, label=None): """Utility for pushing a value onto the compute stack. In verbose mode, show what's being pushed. 'label' lets you prefix @@ -1764,6 +1769,17 @@ def regex_pred(value): passed_obs = _obs_map_prop_test(obs_values, regex_pred) + # store which matching group (defined in interobs pattern) matched on what string for each Observable + for obs_id, container in obs_values.items(): + value = str(container[list(container.keys())[0]][0]) + for match in compiled_re.finditer(value): + for group, val in match.groupdict().items(): + grp_dict = self.interobs_group_matches.get(group, {}) + obs_list = grp_dict.get(obs_id, []) + obs_list.append(val) if val not in obs_list else None + grp_dict[obs_id] = obs_list + self.interobs_group_matches[group] = grp_dict + self.__push(passed_obs, debug_label) def exitPropTestIsSubset(self, ctx): @@ -2128,12 +2144,69 @@ def match(self, observed_data_sdos, verbose=False): found_bindings = matcher.matched() if found_bindings: - matching_sdos = matcher.get_sdos_from_binding(found_bindings[0]) + valid_bindings = self.validate_inter_observable_patterns(matcher, found_bindings) + if valid_bindings: + matching_sdos = valid_bindings[0] + else: + matching_sdos = [] else: matching_sdos = [] return matching_sdos + @staticmethod + def validate_inter_observable_patterns(matcher, found_bindings): + """ + remove elements from found_bindings, if they violate the inter-observable patterns + this will remove elements if: + - within the same observable multiple values are found for the same matching group + - across observables different values are found for the same matching group + """ + def is_valid_inter_obserbable(found_binding, interobs_group_matches): + if len(found_binding) == 1: + # only one SDO available; if multiple values are observed for one inter-observable, + # the pattern does not evaluate to True. + for _, sdo_ids in interobs_group_matches.items(): + for _, values in sdo_ids.items(): + if len(values) > 1: + return False + else: + # multiple SDOs, check if binding fulfils inter-observable patterns + check = {} + for io_grp, sdo_ids in interobs_group_matches.items(): + for _, values in sdo_ids.items(): + grp = check.get(io_grp, None) + if grp is None: + # first iteration, take all values + grp = values + else: + # check intersection of values; if none remain, pattern does not hold + grp = list(set(grp) & set(values)) + if not grp: + return False + check[io_grp] = grp + return True + + if not matcher.interobs_group_matches: + return [matcher.get_sdos_from_binding(sdo) for sdo in found_bindings] + else: + for found_binding in found_bindings[:]: + inter_obs = copy.deepcopy(matcher.interobs_group_matches) + + # prepare dictionary - only use relevant sdos for the current binding + for io_grp, sdo_ids in list(inter_obs.items()): + for sdo_id, _ in list(sdo_ids.items()): + if sdo_id not in found_binding: + del sdo_ids[sdo_id] + if not sdo_ids: + del inter_obs[io_grp] + + if not is_valid_inter_obserbable(found_binding, inter_obs): + found_bindings.remove(found_binding) + if found_bindings: + return [matcher.get_sdos_from_binding(sdo) for sdo in found_bindings] + return [] + def match(pattern, observed_data_sdos, verbose=False): """ diff --git a/stix2matcher/test/test_inter_observable_expr.py b/stix2matcher/test/test_inter_observable_expr.py new file mode 100644 index 0000000..033a252 --- /dev/null +++ b/stix2matcher/test/test_inter_observable_expr.py @@ -0,0 +1,70 @@ +import pytest +from stix2matcher.matcher import match + +_observations = [ + { + "type": "observed-data", + "first_observed": "2004-10-11T21:44:58Z", + "last_observed": "2004-10-11T21:44:58Z", + "number_observed": 1, + "objects": { + "0": { + "type": u"person", + "name": u"alice", + "place": u"earth" + } + } + }, + { + "type": "observed-data", + "first_observed": "2008-05-09T01:21:58.6Z", + "last_observed": "2008-05-09T01:21:58.6Z", + "number_observed": 1, + "objects": { + "0": { + "type": u"person", + "name": u"malice", + "place": u"moontown" + } + } + }, + { + "type": "observed-data", + "first_observed": "2006-11-03T07:42:18.96Z", + "last_observed": "2006-11-03T07:42:18.96Z", + "number_observed": 1, + "objects": { + "0": { + "type": u"person", + "name": u"bob", + "city_ref": u"1" + }, + "1": { + "type": u"city", + "name": u"bobtown" + } + } + } +] + + +@pytest.mark.parametrize("pattern", [ + "[person:name MATCHES '(?P[a-z]+)'] AND [person:name MATCHES 'm(?P[a-z]+)']", # same value across observables (name of person B is the same as person A, but with a leading 'm') + "[person:name MATCHES '(?P[a-z]+)' AND person:city_ref.name MATCHES '(?P[a-z]+)town']", # same value across properties (home of person is its name plus 'town'-suffix) + "[person:name MATCHES '(?P[a-z]).(?P=v3)']", # same value within a property (first letter of name is the same as third letter) + # the following three patterns are the equivalent ones to the negative assertions below, only without interobs patterns + "[person:name MATCHES '[a-z]+'] AND [person:name MATCHES '[a-z]+']", + "[person:name MATCHES '[a-z]+' AND person:city_ref.name MATCHES '[a-z]+']", + "[person:name MATCHES '[a-z][a-z]']", +]) +def test_observation_ops_match(pattern): + assert match(pattern, _observations) + + +@pytest.mark.parametrize("pattern", [ + "[person:name MATCHES '(?P[a-z]+)'] AND [person:name MATCHES '(?P[a-z]+)']", # same value across observables (two persons with the same name) + "[person:name MATCHES '(?P[a-z]+)' AND person:city_ref.name MATCHES '(?P[a-z]+)']", # same value across properties (home of person is the same as name) + "[person:name MATCHES '(?P[a-z])(?P=v3)']", # same value within a property (first letter of name is the same as second letter) +]) +def test_observation_ops_nomatch(pattern): + assert not match(pattern, _observations) From 4908cf9f161165a05f7628cef2a7d06658e48738 Mon Sep 17 00:00:00 2001 From: Christian Doll Date: Thu, 13 Jun 2019 15:32:04 +0200 Subject: [PATCH 2/6] Cleaned up tests for inter observable patterns. --- stix2matcher/test/test_inter_observable_expr.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/stix2matcher/test/test_inter_observable_expr.py b/stix2matcher/test/test_inter_observable_expr.py index 033a252..16716fa 100644 --- a/stix2matcher/test/test_inter_observable_expr.py +++ b/stix2matcher/test/test_inter_observable_expr.py @@ -49,22 +49,18 @@ @pytest.mark.parametrize("pattern", [ - "[person:name MATCHES '(?P[a-z]+)'] AND [person:name MATCHES 'm(?P[a-z]+)']", # same value across observables (name of person B is the same as person A, but with a leading 'm') - "[person:name MATCHES '(?P[a-z]+)' AND person:city_ref.name MATCHES '(?P[a-z]+)town']", # same value across properties (home of person is its name plus 'town'-suffix) - "[person:name MATCHES '(?P[a-z]).(?P=v3)']", # same value within a property (first letter of name is the same as third letter) - # the following three patterns are the equivalent ones to the negative assertions below, only without interobs patterns - "[person:name MATCHES '[a-z]+'] AND [person:name MATCHES '[a-z]+']", - "[person:name MATCHES '[a-z]+' AND person:city_ref.name MATCHES '[a-z]+']", - "[person:name MATCHES '[a-z][a-z]']", + "[person:name MATCHES '(?P[a-z]+)'] AND [person:name MATCHES 'm(?P[a-z]+)']", # same value across observables (name of person B is the same as person A, but with a leading 'm') + "[person:name MATCHES '(?P[a-z]+)' AND person:city_ref.name MATCHES '(?P[a-z]+)town']", # same value across properties (home of person is its name plus 'town'-suffix) + "[person:name MATCHES '(?P[a-z]).(?P=v3)']", # same value within a property (first letter of name is the same as third letter) ]) def test_observation_ops_match(pattern): assert match(pattern, _observations) @pytest.mark.parametrize("pattern", [ - "[person:name MATCHES '(?P[a-z]+)'] AND [person:name MATCHES '(?P[a-z]+)']", # same value across observables (two persons with the same name) - "[person:name MATCHES '(?P[a-z]+)' AND person:city_ref.name MATCHES '(?P[a-z]+)']", # same value across properties (home of person is the same as name) - "[person:name MATCHES '(?P[a-z])(?P=v3)']", # same value within a property (first letter of name is the same as second letter) + "[person:name MATCHES '(?P[a-z]+)'] AND [person:name MATCHES '(?P[a-z]+)']", # same value across observables (two persons with the same name) + "[person:name MATCHES '(?P[a-z]+)' AND person:city_ref.name MATCHES '(?P[a-z]+)']", # same value across properties (home of person is the same as name) + "[person:name MATCHES '(?P[a-z])(?P=v3)']", # same value within a property (first letter of name is the same as second letter) ]) def test_observation_ops_nomatch(pattern): assert not match(pattern, _observations) From 91fb0f62212fd7b672a81f8f2a82ac590651e9eb Mon Sep 17 00:00:00 2001 From: Christian Doll Date: Mon, 17 Jun 2019 15:40:25 +0200 Subject: [PATCH 3/6] fixed formatting --- .../test/test_inter_observable_expr.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/stix2matcher/test/test_inter_observable_expr.py b/stix2matcher/test/test_inter_observable_expr.py index 16716fa..6f25b54 100644 --- a/stix2matcher/test/test_inter_observable_expr.py +++ b/stix2matcher/test/test_inter_observable_expr.py @@ -49,18 +49,24 @@ @pytest.mark.parametrize("pattern", [ - "[person:name MATCHES '(?P[a-z]+)'] AND [person:name MATCHES 'm(?P[a-z]+)']", # same value across observables (name of person B is the same as person A, but with a leading 'm') - "[person:name MATCHES '(?P[a-z]+)' AND person:city_ref.name MATCHES '(?P[a-z]+)town']", # same value across properties (home of person is its name plus 'town'-suffix) - "[person:name MATCHES '(?P[a-z]).(?P=v3)']", # same value within a property (first letter of name is the same as third letter) + # same value across observables (name of person B is the same as person A, but with a leading 'm') + "[person:name MATCHES '(?P[a-z]+)'] AND [person:name MATCHES 'm(?P[a-z]+)']", + # same value across properties (home of person is its name plus 'town'-suffix) + "[person:name MATCHES '(?P[a-z]+)' AND person:city_ref.name MATCHES '(?P[a-z]+)town']", + # same value within a property (first letter of name is the same as third letter) + "[person:name MATCHES '(?P[a-z]).(?P=v3)']", ]) def test_observation_ops_match(pattern): assert match(pattern, _observations) @pytest.mark.parametrize("pattern", [ - "[person:name MATCHES '(?P[a-z]+)'] AND [person:name MATCHES '(?P[a-z]+)']", # same value across observables (two persons with the same name) - "[person:name MATCHES '(?P[a-z]+)' AND person:city_ref.name MATCHES '(?P[a-z]+)']", # same value across properties (home of person is the same as name) - "[person:name MATCHES '(?P[a-z])(?P=v3)']", # same value within a property (first letter of name is the same as second letter) + # same value across observables (two persons with the same name) + "[person:name MATCHES '(?P[a-z]+)'] AND [person:name MATCHES '(?P[a-z]+)']", + # same value across properties (home of person is the same as name) + "[person:name MATCHES '(?P[a-z]+)' AND person:city_ref.name MATCHES '(?P[a-z]+)']", + # same value within a property (first letter of name is the same as second letter) + "[person:name MATCHES '(?P[a-z])(?P=v3)']", ]) def test_observation_ops_nomatch(pattern): assert not match(pattern, _observations) From 51aa15fa79d6c5d4a221cb68cd91a66075c9ce42 Mon Sep 17 00:00:00 2001 From: Christian Doll Date: Mon, 17 Jun 2019 16:17:11 +0200 Subject: [PATCH 4/6] fixed type handling when gathering inter-observable values --- stix2matcher/matcher.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/stix2matcher/matcher.py b/stix2matcher/matcher.py index 8b73b75..8c88e29 100644 --- a/stix2matcher/matcher.py +++ b/stix2matcher/matcher.py @@ -1771,7 +1771,11 @@ def regex_pred(value): # store which matching group (defined in interobs pattern) matched on what string for each Observable for obs_id, container in obs_values.items(): - value = str(container[list(container.keys())[0]][0]) + value = container[list(container.keys())[0]][0] + if not isinstance(value, (str, unicode)): + continue + if isinstance(value, unicode): + value = value.encode('utf-8') for match in compiled_re.finditer(value): for group, val in match.groupdict().items(): grp_dict = self.interobs_group_matches.get(group, {}) From 9aab3a3926cfc86c3d1a68cbfab533eca1a4edb1 Mon Sep 17 00:00:00 2001 From: Christian Doll Date: Mon, 17 Jun 2019 16:21:52 +0200 Subject: [PATCH 5/6] compatibility checks for python version with unicode checks --- stix2matcher/matcher.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/stix2matcher/matcher.py b/stix2matcher/matcher.py index 8c88e29..9d8c333 100644 --- a/stix2matcher/matcher.py +++ b/stix2matcher/matcher.py @@ -1772,10 +1772,14 @@ def regex_pred(value): # store which matching group (defined in interobs pattern) matched on what string for each Observable for obs_id, container in obs_values.items(): value = container[list(container.keys())[0]][0] - if not isinstance(value, (str, unicode)): - continue - if isinstance(value, unicode): - value = value.encode('utf-8') + if sys.version_info < (3, 0, 0): + if not isinstance(value, (str, unicode)): + continue + if isinstance(value, unicode): + value = value.encode('utf-8') + else: + if not isinstance(value, str): + continue for match in compiled_re.finditer(value): for group, val in match.groupdict().items(): grp_dict = self.interobs_group_matches.get(group, {}) From 9becc8ee75417fcb0db798108313857596241641 Mon Sep 17 00:00:00 2001 From: Christian Doll Date: Mon, 17 Jun 2019 16:44:21 +0200 Subject: [PATCH 6/6] added comment for flake8 --- stix2matcher/matcher.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/stix2matcher/matcher.py b/stix2matcher/matcher.py index 9d8c333..947dd6c 100644 --- a/stix2matcher/matcher.py +++ b/stix2matcher/matcher.py @@ -1772,14 +1772,11 @@ def regex_pred(value): # store which matching group (defined in interobs pattern) matched on what string for each Observable for obs_id, container in obs_values.items(): value = container[list(container.keys())[0]][0] - if sys.version_info < (3, 0, 0): - if not isinstance(value, (str, unicode)): - continue - if isinstance(value, unicode): + if not isinstance(value, six.string_types): + continue + if sys.version_info.major < 3: + if isinstance(value, unicode): # noqa: F821 value = value.encode('utf-8') - else: - if not isinstance(value, str): - continue for match in compiled_re.finditer(value): for group, val in match.groupdict().items(): grp_dict = self.interobs_group_matches.get(group, {})