Skip to content

Commit

Permalink
Fix crashes if SCO extensions isn't a dictionary
Browse files Browse the repository at this point in the history
  • Loading branch information
clenk committed May 22, 2020
1 parent 06d1d8e commit ca00b70
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 39 deletions.
5 changes: 5 additions & 0 deletions stix2validator/test/v20/observed_data_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,11 @@ def test_observable_object_extension_embedded_custom_properties_lax(self):
disabled='custom-prefix',
strict_properties=True)

def test_observable_object_extensions_string(self):
observed_data = copy.deepcopy(self.valid_observed_data)
observed_data['objects']['0']['extensions'] = 'example:Object-f39f745f-d36b-4dca-9a3e-16bb1c5516f0'
self.assertFalseWithOptions(observed_data)

def test_observable_object_property_reference(self):
observed_data = copy.deepcopy(self.valid_observed_data)
observed_data['objects']['2'] = {
Expand Down
91 changes: 91 additions & 0 deletions stix2validator/test/v21/ip_addr_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import copy
import json

from . import ValidatorTest
from ... import validate_string

VALID_IPV4 = u"""
{
"type": "ipv4-addr",
"spec_version": "2.1",
"id": "ipv4-addr--ff26c055-6336-5bc5-b98d-13d6226742dd",
"value": "198.51.100.3"
}
"""

VALID_IPV6 = u"""
{
"type": "ipv6-addr",
"spec_version": "2.1",
"id": "ipv6-addr--1e61d36c-a16c-53b7-a80f-2a00161c96b1",
"value": "2001:0db8:85a3:0000:0000:8a2e:0370:7334"
}
"""


class IPv4TestCases(ValidatorTest):
valid_ipv4 = json.loads(VALID_IPV4)

def test_wellformed_ipv4_addr(self):
results = validate_string(VALID_IPV4, self.options)
self.assertTrue(results.is_valid)

def test_valid_ipv4(self):
values = [
"192.168.0.1",
"192.168.0.1/0",
"192.168.0.1/5",
"192.168.0.1/32",
]
ipv4 = copy.deepcopy(self.valid_ipv4)
for value in values:
ipv4['value'] = value
self.assertTrueWithOptions(ipv4)

def test_invalid_ipv4(self):
values = [
"foo",
"999.999.999.999",
"192.168.0.1/",
"192.168.0.1/33",
"192.168.0.1/a",
"192.168.0.1/01",
]
ipv4 = copy.deepcopy(self.valid_ipv4)
for value in values:
ipv4['value'] = value
self.assertFalseWithOptions(ipv4)


class IPv6TestCases(ValidatorTest):
valid_ipv6 = json.loads(VALID_IPV6)

def test_wellformed_ipv6_addr(self):
results = validate_string(VALID_IPV6, self.options)
self.assertTrue(results.is_valid)

def test_valid_ipv6(self):
values = [
"fe80:0000:0000:0000:0204:61ff:fe9d:f156",
"fe80:0000:0000:0000:0204:61ff:fe9d:f156/0",
"fe80:0000:0000:0000:0204:61ff:fe9d:f156/6",
"fe80:0000:0000:0000:0204:61ff:fe9d:f156/128",
]
ipv6 = copy.deepcopy(self.valid_ipv6)
for value in values:
ipv6['value'] = value
self.assertTrueWithOptions(ipv6)

def test_invalid_ipv6(self):
values = [
"foo",
"fe80:0000:0000:0000:0204:61ff:fe9d:f156/",
"fe80:0000:0000:0000:0204:61ff:fe9d:f156/129",
"fe80:0000:0000:0000:0204:61ff:fe9d:f156/a",
"fe80:0000:0000:0000:0204:61ff:fe9d:f156/00",
"fe80:0000:0000:0000:0204:61ff:fe9d:f156/03",
]
ipv6 = copy.deepcopy(self.valid_ipv6)
for value in values:
ipv6['value'] = value
self.assertFalseWithOptions(ipv6)
5 changes: 5 additions & 0 deletions stix2validator/test/v21/observed_data_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,11 @@ def test_observable_object_extension_embedded_custom_properties_lax(self):
disabled='custom-prefix',
strict_properties=True)

def test_observable_object_extensions_string(self):
observed_data = copy.deepcopy(self.valid_object)
observed_data['extensions'] = 'example:Object-f39f745f-d36b-4dca-9a3e-16bb1c5516f0'
self.assertFalseWithOptions(observed_data)

def test_observable_object_reserved_property(self):
observed_data = copy.deepcopy(self.valid_object)
observed_data['type'] = 'action'
Expand Down
3 changes: 2 additions & 1 deletion stix2validator/v20/musts.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Mandatory (MUST) requirement checking functions
"""

import collections
import re

from cpe import CPE
Expand Down Expand Up @@ -53,7 +54,7 @@ def timestamp(instance):
% (obj['type'], tprop, obj[tprop], str(e)), instance['id'])
if obj['type'] in enums.TIMESTAMP_EMBEDDED_PROPERTIES:
for embed in enums.TIMESTAMP_EMBEDDED_PROPERTIES[obj['type']]:
if embed in obj:
if embed in obj and isinstance(obj[embed], collections.Mapping):
for tprop in enums.TIMESTAMP_EMBEDDED_PROPERTIES[obj['type']][embed]:
if embed == 'extensions':
for ext in obj[embed]:
Expand Down
38 changes: 19 additions & 19 deletions stix2validator/v20/shoulds.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
- add the check code and name to table
"""

from collections import Iterable
from collections import Iterable, Mapping
from itertools import chain
import re

Expand Down Expand Up @@ -348,7 +348,7 @@ def vocab_hash_algo(instance):

try:
ads = obj['extensions']['ntfs-ext']['alternate_data_streams']
except KeyError:
except (KeyError, TypeError):
pass
else:
for datastream in ads:
Expand All @@ -366,7 +366,7 @@ def vocab_hash_algo(instance):

try:
head_hashes = obj['extensions']['windows-pebinary-ext']['file_header_hashes']
except KeyError:
except (KeyError, TypeError):
pass
else:
for h in head_hashes:
Expand All @@ -380,7 +380,7 @@ def vocab_hash_algo(instance):

try:
hashes = obj['extensions']['windows-pebinary-ext']['optional_header']['hashes']
except KeyError:
except (KeyError, TypeError):
pass
else:
for h in hashes:
Expand All @@ -394,7 +394,7 @@ def vocab_hash_algo(instance):

try:
sections = obj['extensions']['windows-pebinary-ext']['sections']
except KeyError:
except (KeyError, TypeError):
pass
else:
for s in sections:
Expand Down Expand Up @@ -433,7 +433,7 @@ def vocab_encryption_algo(instance):
if 'type' in obj and obj['type'] == 'file':
try:
enc_algo = obj['encryption_algorithm']
except KeyError:
except (KeyError, TypeError):
continue
if enc_algo not in enums.ENCRYPTION_ALGO_OV:
yield JSONError("Object '%s' has an 'encryption_algorithm' of "
Expand All @@ -452,7 +452,7 @@ def vocab_windows_pebinary_type(instance):
if 'type' in obj and obj['type'] == 'file':
try:
pe_type = obj['extensions']['windows-pebinary-ext']['pe_type']
except KeyError:
except (KeyError, TypeError):
continue
if pe_type not in enums.WINDOWS_PEBINARY_TYPE_OV:
yield JSONError("Object '%s' has a Windows PE Binary File "
Expand All @@ -471,7 +471,7 @@ def vocab_account_type(instance):
if 'type' in obj and obj['type'] == 'user-account':
try:
acct_type = obj['account_type']
except KeyError:
except (KeyError, TypeError):
continue
if acct_type not in enums.ACCOUNT_TYPE_OV:
yield JSONError("Object '%s' is a User Account Object "
Expand Down Expand Up @@ -558,8 +558,8 @@ def custom_object_extension_prefix_strict(instance):
conventions.
"""
for key, obj in instance['objects'].items():
if not ('extensions' in obj and 'type' in obj and
obj['type'] in enums.OBSERVABLE_EXTENSIONS):
if not ('extensions' in obj and isinstance(obj['extensions'], Mapping)
and 'type' in obj and obj['type'] in enums.OBSERVABLE_EXTENSIONS):
continue
for ext_key in obj['extensions']:
if (ext_key not in enums.OBSERVABLE_EXTENSIONS[obj['type']] and
Expand Down Expand Up @@ -860,7 +860,7 @@ def http_request_headers(instance):
if 'type' in obj and obj['type'] == 'network-traffic':
try:
headers = obj['extensions']['http-request-ext']['request_header']
except KeyError:
except (KeyError, TypeError):
continue

for hdr in headers:
Expand All @@ -881,7 +881,7 @@ def socket_options(instance):
if 'type' in obj and obj['type'] == 'network-traffic':
try:
options = obj['extensions']['socket-ext']['options']
except KeyError:
except (KeyError, TypeError):
continue

for opt in options:
Expand All @@ -902,7 +902,7 @@ def pdf_doc_info(instance):
if 'type' in obj and obj['type'] == 'file':
try:
did = obj['extensions']['pdf-ext']['document_info_dict']
except KeyError:
except (KeyError, TypeError):
continue

for elem in did:
Expand All @@ -924,7 +924,7 @@ def windows_process_priority_format(instance):
if 'type' in obj and obj['type'] == 'process':
try:
priority = obj['extensions']['windows-process-ext']['priority']
except KeyError:
except (KeyError, TypeError):
continue
if not class_suffix_re.match(priority):
yield JSONError("The 'priority' property of object '%s' should"
Expand Down Expand Up @@ -955,7 +955,7 @@ def hash_length(instance):

try:
ads = obj['extensions']['ntfs-ext']['alternate_data_streams']
except KeyError:
except (KeyError, TypeError):
pass
else:
for datastream in ads:
Expand All @@ -972,7 +972,7 @@ def hash_length(instance):

try:
head_hashes = obj['extensions']['windows-pebinary-ext']['file_header_hashes']
except KeyError:
except (KeyError, TypeError):
pass
else:
for h in head_hashes:
Expand All @@ -985,7 +985,7 @@ def hash_length(instance):

try:
hashes = obj['extensions']['windows-pebinary-ext']['optional_header']['hashes']
except KeyError:
except (KeyError, TypeError):
pass
else:
for h in hashes:
Expand All @@ -998,7 +998,7 @@ def hash_length(instance):

try:
sections = obj['extensions']['windows-pebinary-ext']['sections']
except KeyError:
except (KeyError, TypeError):
pass
else:
for s in sections:
Expand All @@ -1015,7 +1015,7 @@ def hash_length(instance):
elif obj['type'] == 'artifact' or obj['type'] == 'x509-certificate':
try:
hashes = obj['hashes']
except KeyError:
except (KeyError, TypeError):
pass
else:
for h in hashes:
Expand Down
5 changes: 3 additions & 2 deletions stix2validator/v21/musts.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Mandatory (MUST) requirement checking functions
"""
import collections
import operator
import re
import uuid
Expand Down Expand Up @@ -59,7 +60,7 @@ def timestamp(instance):
% (obj['type'], tprop, obj[tprop], str(e)), instance['id'])
if obj['type'] in enums.TIMESTAMP_EMBEDDED_PROPERTIES:
for embed in enums.TIMESTAMP_EMBEDDED_PROPERTIES[obj['type']]:
if embed in obj:
if embed in obj and isinstance(obj[embed], collections.Mapping):
for tprop in enums.TIMESTAMP_EMBEDDED_PROPERTIES[obj['type']][embed]:
if embed == 'extensions':
for ext in obj[embed]:
Expand Down Expand Up @@ -89,7 +90,7 @@ def timestamp(instance):
% (instance['type'], tprop, instance[tprop], str(e)), instance['id'])
if instance['type'] in enums.TIMESTAMP_EMBEDDED_PROPERTIES:
for embed in enums.TIMESTAMP_EMBEDDED_PROPERTIES[instance['type']]:
if embed in instance:
if embed in instance and isinstance(instance[embed], collections.Mapping):
for tprop in enums.TIMESTAMP_EMBEDDED_PROPERTIES[instance['type']][embed]:
if embed == 'extensions':
for ext in instance[embed]:
Expand Down
Loading

0 comments on commit ca00b70

Please sign in to comment.