diff --git a/stix2matcher/matcher.py b/stix2matcher/matcher.py index 99de0a0..947dd6c 100644 --- a/stix2matcher/matcher.py +++ b/stix2matcher/matcher.py @@ -3,6 +3,7 @@ import argparse import base64 import binascii +import copy import datetime import io import itertools @@ -959,6 +960,10 @@ def __init__(self, observed_data_sdos, verbose=False): # Holds intermediate results self.__compute_stack = [] + # Holds each value observed for a named capture group within regexes. + # Used to check for inter-observable patterns. + self.interobs_group_matches = {} + def __push(self, val, label=None): """Utility for pushing a value onto the compute stack. In verbose mode, show what's being pushed. 'label' lets you prefix @@ -1764,6 +1769,22 @@ def regex_pred(value): passed_obs = _obs_map_prop_test(obs_values, regex_pred) + # store which matching group (defined in interobs pattern) matched on what string for each Observable + for obs_id, container in obs_values.items(): + value = container[list(container.keys())[0]][0] + if not isinstance(value, six.string_types): + continue + if sys.version_info.major < 3: + if isinstance(value, unicode): # noqa: F821 + value = value.encode('utf-8') + for match in compiled_re.finditer(value): + for group, val in match.groupdict().items(): + grp_dict = self.interobs_group_matches.get(group, {}) + obs_list = grp_dict.get(obs_id, []) + obs_list.append(val) if val not in obs_list else None + grp_dict[obs_id] = obs_list + self.interobs_group_matches[group] = grp_dict + self.__push(passed_obs, debug_label) def exitPropTestIsSubset(self, ctx): @@ -2128,12 +2149,69 @@ def match(self, observed_data_sdos, verbose=False): found_bindings = matcher.matched() if found_bindings: - matching_sdos = matcher.get_sdos_from_binding(found_bindings[0]) + valid_bindings = self.validate_inter_observable_patterns(matcher, found_bindings) + if valid_bindings: + matching_sdos = valid_bindings[0] + else: + matching_sdos = [] else: matching_sdos = [] return matching_sdos + 
@staticmethod
def validate_inter_observable_patterns(matcher, found_bindings):
    """
    Drop the bindings which violate the inter-observable patterns.

    A binding is rejected if:
      - it has exactly one element and, for an id in that binding, more
        than one value was recorded for the same named capture group, or
      - it has several elements and the values recorded for the same
        named capture group share no common value across the ids in
        that binding.

    Rejected bindings are also removed from ``found_bindings`` itself
    (the list is modified in place).  If no capture-group values were
    recorded at all, nothing is filtered and the list is left untouched.

    :param matcher: Object providing ``interobs_group_matches`` (a dict
        mapping group name -> {id -> list of observed values}) and a
        ``get_sdos_from_binding()`` method.
    :param found_bindings: Mutable list of bindings.  Each binding must
        support ``len()`` and ``in`` tests against the ids used as keys
        in ``interobs_group_matches``.
    :return: A list holding ``matcher.get_sdos_from_binding(binding)``
        for every surviving binding, in the original order; an empty
        list if none survive.
    """
    def is_valid_inter_observable(found_binding, group_matches):
        # group_matches only contains the ids which occur in found_binding.
        if len(found_binding) == 1:
            # Single-element binding: a capture group which saw more than
            # one distinct value cannot be satisfied.
            return all(
                len(values) <= 1
                for by_id in group_matches.values()
                for values in by_id.values()
            )

        # Several elements: per capture group, the ids involved must
        # agree on at least one common value.
        for by_id in group_matches.values():
            common = None
            for values in by_id.values():
                if common is None:
                    common = set(values)
                else:
                    common &= set(values)
                if not common:
                    return False
        return True

    group_matches = matcher.interobs_group_matches
    if group_matches:
        valid_bindings = []
        for found_binding in found_bindings:
            # Build a filtered view restricted to the ids of this binding.
            # The recorded value lists are only read, never modified, so
            # no deep copy of the matcher's table is required.
            relevant = {}
            for group, by_id in group_matches.items():
                kept = {
                    obs_id: values
                    for obs_id, values in by_id.items()
                    if obs_id in found_binding
                }
                if kept:
                    relevant[group] = kept

            if is_valid_inter_observable(found_binding, relevant):
                valid_bindings.append(found_binding)

        # Keep the documented in-place pruning of the caller's list, but
        # do it in one step instead of repeated O(n) list.remove() calls.
        found_bindings[:] = valid_bindings

    return [
        matcher.get_sdos_from_binding(binding) for binding in found_bindings
    ]
"""Tests for inter-observable patterns expressed via named regex groups.

A named capture group which is reused inside one pattern (in another
observation expression, in another comparison, or as a back-reference
within the same regex) has to capture the same value everywhere.
"""
import pytest

from stix2matcher.matcher import match

# Three observations: "alice" on "earth", "malice" in "moontown", and "bob"
# whose city_ref points at the city object "bobtown".
_observations = [
    {
        "type": "observed-data",
        "first_observed": "2004-10-11T21:44:58Z",
        "last_observed": "2004-10-11T21:44:58Z",
        "number_observed": 1,
        "objects": {
            "0": {
                "type": u"person",
                "name": u"alice",
                "place": u"earth"
            }
        }
    },
    {
        "type": "observed-data",
        "first_observed": "2008-05-09T01:21:58.6Z",
        "last_observed": "2008-05-09T01:21:58.6Z",
        "number_observed": 1,
        "objects": {
            "0": {
                "type": u"person",
                "name": u"malice",
                "place": u"moontown"
            }
        }
    },
    {
        "type": "observed-data",
        "first_observed": "2006-11-03T07:42:18.96Z",
        "last_observed": "2006-11-03T07:42:18.96Z",
        "number_observed": 1,
        "objects": {
            "0": {
                "type": u"person",
                "name": u"bob",
                "city_ref": u"1"
            },
            "1": {
                "type": u"city",
                "name": u"bobtown"
            }
        }
    }
]


@pytest.mark.parametrize("pattern", [
    # same value across observables (name of person B is the same as person A, but with a leading 'm')
    "[person:name MATCHES '(?P<v1>[a-z]+)'] AND [person:name MATCHES 'm(?P<v1>[a-z]+)']",
    # same value across properties (home of person is its name plus 'town'-suffix)
    "[person:name MATCHES '(?P<v2>[a-z]+)' AND person:city_ref.name MATCHES '(?P<v2>[a-z]+)town']",
    # same value within a property (first letter of name is the same as third letter)
    "[person:name MATCHES '(?P<v3>[a-z]).(?P=v3)']",
])
def test_observation_ops_match(pattern):
    """Each pattern must yield a truthy match result on _observations."""
    assert match(pattern, _observations)


@pytest.mark.parametrize("pattern", [
    # same value across observables (two persons with the same name)
    "[person:name MATCHES '(?P<v1>[a-z]+)'] AND [person:name MATCHES '(?P<v1>[a-z]+)']",
    # same value across properties (home of person is the same as name)
    "[person:name MATCHES '(?P<v2>[a-z]+)' AND person:city_ref.name MATCHES '(?P<v2>[a-z]+)']",
    # same value within a property (first letter of name is the same as second letter)
    "[person:name MATCHES '(?P<v3>[a-z])(?P=v3)']",
])
def test_observation_ops_nomatch(pattern):
    """Each pattern must yield a falsy match result on _observations."""
    assert not match(pattern, _observations)