Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Matching re-occurring value within patterns #58

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 79 additions & 1 deletion stix2matcher/matcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import argparse
import base64
import binascii
import copy
import datetime
import io
import itertools
Expand Down Expand Up @@ -959,6 +960,10 @@ def __init__(self, observed_data_sdos, verbose=False):
# Holds intermediate results
self.__compute_stack = []

# Holds each value observed for a named capture group within regexes.
# Used to check for inter-observable patterns.
self.interobs_group_matches = {}

def __push(self, val, label=None):
"""Utility for pushing a value onto the compute stack.
In verbose mode, show what's being pushed. 'label' lets you prefix
Expand Down Expand Up @@ -1764,6 +1769,22 @@ def regex_pred(value):

passed_obs = _obs_map_prop_test(obs_values, regex_pred)

# store which matching group (defined in interobs pattern) matched on what string for each Observable
for obs_id, container in obs_values.items():
value = container[list(container.keys())[0]][0]
if not isinstance(value, six.string_types):
continue
if sys.version_info.major < 3:
if isinstance(value, unicode): # noqa: F821
value = value.encode('utf-8')
for match in compiled_re.finditer(value):
for group, val in match.groupdict().items():
grp_dict = self.interobs_group_matches.get(group, {})
obs_list = grp_dict.get(obs_id, [])
obs_list.append(val) if val not in obs_list else None
grp_dict[obs_id] = obs_list
self.interobs_group_matches[group] = grp_dict

self.__push(passed_obs, debug_label)

def exitPropTestIsSubset(self, ctx):
Expand Down Expand Up @@ -2128,12 +2149,69 @@ def match(self, observed_data_sdos, verbose=False):

found_bindings = matcher.matched()
if found_bindings:
matching_sdos = matcher.get_sdos_from_binding(found_bindings[0])
valid_bindings = self.validate_inter_observable_patterns(matcher, found_bindings)
if valid_bindings:
matching_sdos = valid_bindings[0]
else:
matching_sdos = []
else:
matching_sdos = []

return matching_sdos

@staticmethod
def validate_inter_observable_patterns(matcher, found_bindings):
"""
remove elements from found_bindings, if they violate the inter-observable patterns
this will remove elements if:
- within the same observable multiple values are found for the same matching group
- across observables different values are found for the same matching group
"""
def is_valid_inter_obserbable(found_binding, interobs_group_matches):
if len(found_binding) == 1:
# only one SDO available; if multiple values are observed for one inter-observable,
# the pattern does not evaluate to True.
for _, sdo_ids in interobs_group_matches.items():
for _, values in sdo_ids.items():
if len(values) > 1:
return False
else:
# multiple SDOs, check if binding fulfils inter-observable patterns
check = {}
for io_grp, sdo_ids in interobs_group_matches.items():
for _, values in sdo_ids.items():
grp = check.get(io_grp, None)
if grp is None:
# first iteration, take all values
grp = values
else:
# check intersection of values; if none remain, pattern does not hold
grp = list(set(grp) & set(values))
if not grp:
return False
check[io_grp] = grp
return True

if not matcher.interobs_group_matches:
return [matcher.get_sdos_from_binding(sdo) for sdo in found_bindings]
else:
for found_binding in found_bindings[:]:
inter_obs = copy.deepcopy(matcher.interobs_group_matches)

# prepare dictionary - only use relevant sdos for the current binding
for io_grp, sdo_ids in list(inter_obs.items()):
for sdo_id, _ in list(sdo_ids.items()):
if sdo_id not in found_binding:
del sdo_ids[sdo_id]
if not sdo_ids:
del inter_obs[io_grp]

if not is_valid_inter_obserbable(found_binding, inter_obs):
found_bindings.remove(found_binding)
if found_bindings:
return [matcher.get_sdos_from_binding(sdo) for sdo in found_bindings]
return []


def match(pattern, observed_data_sdos, verbose=False):
"""
Expand Down
72 changes: 72 additions & 0 deletions stix2matcher/test/test_inter_observable_expr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import pytest
from stix2matcher.matcher import match

_observations = [
{
"type": "observed-data",
"first_observed": "2004-10-11T21:44:58Z",
"last_observed": "2004-10-11T21:44:58Z",
"number_observed": 1,
"objects": {
"0": {
"type": u"person",
"name": u"alice",
"place": u"earth"
}
}
},
{
"type": "observed-data",
"first_observed": "2008-05-09T01:21:58.6Z",
"last_observed": "2008-05-09T01:21:58.6Z",
"number_observed": 1,
"objects": {
"0": {
"type": u"person",
"name": u"malice",
"place": u"moontown"
}
}
},
{
"type": "observed-data",
"first_observed": "2006-11-03T07:42:18.96Z",
"last_observed": "2006-11-03T07:42:18.96Z",
"number_observed": 1,
"objects": {
"0": {
"type": u"person",
"name": u"bob",
"city_ref": u"1"
},
"1": {
"type": u"city",
"name": u"bobtown"
}
}
}
]


@pytest.mark.parametrize("pattern", [
    # same value across observables (name of person B is the same as person A, but with a leading 'm')
    "[person:name MATCHES '(?P<v1>[a-z]+)'] AND [person:name MATCHES 'm(?P<v1>[a-z]+)']",
    # same value across properties (home of person is its name plus 'town'-suffix)
    "[person:name MATCHES '(?P<v2>[a-z]+)' AND person:city_ref.name MATCHES '(?P<v2>[a-z]+)town']",
    # same value within a property (first letter of name is the same as third letter)
    "[person:name MATCHES '(?P<v3>[a-z]).(?P=v3)']",
])
def test_observation_ops_match(pattern):
    """Patterns whose shared named group can be satisfied must match."""
    matched_sdos = match(pattern, _observations)
    assert matched_sdos


@pytest.mark.parametrize("pattern", [
    # same value across observables (two persons with the same name)
    "[person:name MATCHES '(?P<v1>[a-z]+)'] AND [person:name MATCHES '(?P<v1>[a-z]+)']",
    # same value across properties (home of person is the same as name)
    "[person:name MATCHES '(?P<v2>[a-z]+)' AND person:city_ref.name MATCHES '(?P<v2>[a-z]+)']",
    # same value within a property (first letter of name is the same as second letter)
    "[person:name MATCHES '(?P<v3>[a-z])(?P=v3)']",
])
def test_observation_ops_nomatch(pattern):
    """Patterns whose shared named group cannot be satisfied must not match."""
    matched_sdos = match(pattern, _observations)
    assert not matched_sdos