Skip to content

Commit

Permalink
Allow 2.0 objects in 2.1 bundles
Browse files Browse the repository at this point in the history
  • Loading branch information
clenk committed Jun 10, 2020
1 parent dce8443 commit 4cd14e6
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 54 deletions.
29 changes: 17 additions & 12 deletions stix2validator/test/v21/spec_version_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,27 @@ class SpecVersionTestCases(ValidatorTest):
valid_data = json.loads(VALID_BUNDLE)
internal_options = ValidationOptions()

def test_empty(self):
def test_none(self):
# Test spec_version not specified anywhere
# Fail: defaults to a version that requires spec_version on SDO
# Pass: treated as a 2.1 bundle with a 2.0 SDO
bundle = copy.deepcopy(self.valid_data)
results = validate_parsed_json(bundle, self.internal_options)
self.assertFalse(results.is_valid)
self.assertTrue(results.is_valid)

def test_cmd(self):
# Test spec_version specified only in cmdline option
# Fail in 2.0: spec_version is required on Bundle
# Fail in 2.1: spec_version is required on SDO
# Pass in 2.1: spec_version is required on SDO but it's treated as a 2.0 SDO
for version in VERSION_NUMBERS:
bundle = copy.deepcopy(self.valid_data)
self.internal_options.version = version
results = validate_parsed_json(
bundle,
self.internal_options)
self.assertFalse(results.is_valid)
if version == "2.0":
self.assertFalse(results.is_valid)
elif version == "2.1":
self.assertTrue(results.is_valid)

def test_bundle(self):
# Test spec_version specified only on bundle
Expand All @@ -57,9 +60,9 @@ def test_bundle(self):
self.assertTrue(results.is_valid)
elif version == "2.1":
# Warn: spec_version is custom on bundle,
# Error: spec_version is required on SDO
self.assertFalse(results.is_valid)
self.assertTrue(len(results.errors) > 0)
# Pass: spec_version is missing from SDO so treated as a 2.0 SDO
self.assertTrue(results.is_valid)
self.assertTrue(len(results.errors) == 0)
self.assertTrue(len(results.warnings) == 1)

def test_object(self):
Expand Down Expand Up @@ -117,14 +120,16 @@ def test_cmd_and_bundle(self):
self.assertTrue(len(results.warnings) == 1)

elif cmd_version == "2.1" and bundle_version == "2.0":
self.assertFalse(results.is_valid)
self.assertTrue(len(results.errors) == 2)
# Pass: Treat as 2.1 bundle with 2.0 SDO
self.assertTrue(results.is_valid)
self.assertTrue(len(results.errors) == 0)
self.assertTrue(len(results.warnings) == 2)

elif cmd_version == "2.1" and bundle_version == "2.1":
self.assertFalse(results.is_valid)
# Pass: identity obj has no spec version so treated as 2.0
self.assertTrue(results.is_valid)
self.assertTrue(len(results.warnings) == 1)
self.assertTrue(len(results.errors) == 2)
self.assertTrue(len(results.errors) == 0)

def test_cmd_and_obj(self):
# Test spec_version specified in cmdline option and on SDO
Expand Down
26 changes: 1 addition & 25 deletions stix2validator/util.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import argparse
from argparse import RawDescriptionHelpFormatter
from collections import Iterable
from collections.abc import Iterable
import datetime
import errno
import os
Expand Down Expand Up @@ -478,30 +478,6 @@ def new_function(*args, **kwargs):
return inner_cyber_observable_check


def check_spec(instance, options):
""" Checks to see if there are differences in command-line option
provided spec_version and the spec_version found with bundles
and/or objects.
"""
warnings = []
if options.version:
try:
if instance['type'] == 'bundle' and 'spec_version' in instance:
if instance['spec_version'] != options.version:
warnings.append(instance['id'] + ": spec_version mismatch with supplied"
" option. Treating as {} content.".format(options.version))
if instance['type'] == 'bundle' and 'objects' in instance:
for obj in instance['objects']:
if 'spec_version' in obj:
if obj['spec_version'] != options.version:
warnings.append(obj['id'] + ": spec_version mismatch with supplied"
" option. Treating as {} content.".format(options.version))
except Exception:
pass

return warnings


def init_requests_cache(refresh_cache=False):
"""
Initializes a cache which the ``requests`` library will consult for
Expand Down
2 changes: 1 addition & 1 deletion stix2validator/v20/shoulds.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
- add the check code and name to table
"""

from collections import Iterable
from collections.abc import Iterable
from itertools import chain
import re

Expand Down
2 changes: 1 addition & 1 deletion stix2validator/v21/shoulds.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
- add the check code and name to table
"""

from collections import Iterable
from collections.abc import Iterable
from itertools import chain
import re
import uuid
Expand Down
87 changes: 72 additions & 15 deletions stix2validator/validator.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Custom jsonschema.IValidator class and validator functions.
"""

from collections import Iterable
from collections.abc import Iterable
import io
from itertools import chain
import os
Expand All @@ -17,8 +17,8 @@
from . import output
from .errors import (NoJSONFileFoundError, SchemaError, SchemaInvalidError,
ValidationError, pretty_error)
from .util import (DEFAULT_VER, ValidationOptions, check_spec,
clear_requests_cache, init_requests_cache)
from .util import (DEFAULT_VER, ValidationOptions, clear_requests_cache,
init_requests_cache)
from .v20 import musts as musts20
from .v20 import shoulds as shoulds20
from .v21 import musts as musts21
Expand Down Expand Up @@ -651,6 +651,14 @@ def _get_error_generator(type, obj, schema_dir=None, version=DEFAULT_VER, defaul
"minProperties": 1
}
}
elif type == 'bundle':
# Validate against schemas for specific objects later
schema['properties']['objects'] = {
"objects": {
"type": "array",
"minItems": 1
}
}

# Don't use custom validator; only check schemas, no additional checks
validator = load_validator(schema_path, schema)
Expand Down Expand Up @@ -688,7 +696,7 @@ def _get_shoulds(options):
return shoulds21.list_shoulds(options)


def _schema_validate(sdo, options):
def _schema_validate(sdo, options, bundle_version=None):
"""Set up validation of a single STIX object against its type's schema.
This does no actual validation; it just returns generators which must be
iterated to trigger the actual generation.
Expand All @@ -698,6 +706,14 @@ def _schema_validate(sdo, options):
Do not call this function directly; use validate_instance() instead, as it
calls this one. This function does not perform any custom checks.
Args:
sdo: STIX object to validate.
options: ValidationOptions instance with validation options for this
validation run, including the STIX spec version.
bundle_version: STIX version of the bundle containing this object, or
None if the object is not inside a bundle or the bundle has no
spec_version property.
"""
error_gens = []

Expand All @@ -716,16 +732,34 @@ def _schema_validate(sdo, options):
else:
version = DEFAULT_VER

if bundle_version == '2.0':
version = bundle_version

# Allow 2.0 objects in 2.1+ bundles (2.1 SCOs don't have 'created')
_20_in_21_bundle = (bundle_version == '2.1' and 'spec_version' not in sdo and
'created' in sdo)
if _20_in_21_bundle:
version = '2.0'
output.info("%sno spec_version so treated as a 2.0 object in a 2.1 bundle."
% error_prefix)

options.set_check_codes(version)

core_schema = 'core'
# Check for custom 2.1+ SCO
if (version > '2.0' and all(p in sdo for p in ['type', 'id']) and
all(p not in sdo for p in ['created', 'modified']) and
not sdo['type'] == 'marking-definition'):
core_schema = 'cyber-observable-core'

# Get validator for built-in schema
base_sdo_errors = _get_error_generator(sdo['type'], sdo, version=version)
base_sdo_errors = _get_error_generator(sdo['type'], sdo, version=version, default=core_schema)
if base_sdo_errors:
error_gens.append((base_sdo_errors, error_prefix))

# Get validator for any user-supplied schema
if options.schema_dir:
custom_sdo_errors = _get_error_generator(sdo['type'], sdo, options.schema_dir)
custom_sdo_errors = _get_error_generator(sdo['type'], sdo, options.schema_dir, default=core_schema)
if custom_sdo_errors:
error_gens.append((custom_sdo_errors, error_prefix))

Expand Down Expand Up @@ -766,6 +800,35 @@ def _schema_validate(sdo, options):
return error_gens


def _schema_validate_bundle(instance, options):
errors = []
version = options.version
if version is None and 'spec_version' in instance:
version = instance['spec_version']

warnings = []
if version:
if 'spec_version' in instance:
if instance['spec_version'] != version:
warnings.append(instance['id'] + ": spec_version mismatch with supplied"
" option. Treating as {} content.".format(version))
if 'objects' in instance:
for obj in instance['objects']:
if 'spec_version' in obj:
if obj['spec_version'] != version:
warnings.append(obj['id'] + ": spec_version mismatch with supplied"
" option. Treating as {} content.".format(version))

bundle_version = instance.get('spec_version', '2.1')
# Validate each object in a bundle separately
for sdo in instance.get('objects', []):
if 'type' not in sdo:
raise ValidationError("Each object in bundle must have a 'type' property.")
errors += _schema_validate(sdo, options, bundle_version)

return errors, warnings


def validate_instance(instance, options=None):
"""Perform STIX JSON Schema validation against STIX input.
Expand All @@ -789,19 +852,13 @@ def validate_instance(instance, options=None):
options = ValidationOptions()

error_gens = []
spec_warnings = []

# Schema validation
error_gens += _schema_validate(instance, options)
if instance['type'] == 'bundle' and 'objects' in instance:
if options.version is None and 'spec_version' in instance:
options.version = instance['spec_version']
# Validate each object in a bundle separately
for sdo in instance['objects']:
if 'type' not in sdo:
raise ValidationError("Each object in bundle must have a 'type' property.")
error_gens += _schema_validate(sdo, options)

spec_warnings = check_spec(instance, options)
schema_errors, spec_warnings = _schema_validate_bundle(instance, options)
error_gens += schema_errors

# Custom validation
must_checks = _get_musts(options)
Expand Down

0 comments on commit 4cd14e6

Please sign in to comment.