From 72fb095b9705bc88da15441ca531b1e47fa83aad Mon Sep 17 00:00:00 2001 From: nusantara-self <15647296+nusantara-self@users.noreply.github.com> Date: Tue, 24 Dec 2024 16:39:16 +0100 Subject: [PATCH 1/3] Add ValidateObservable analyzer --- .../ValidateObservable.json | 18 + .../ValidateObservable/ValidateObservable.py | 384 ++++++++++++++++++ analyzers/ValidateObservable/requirements.txt | 3 + 3 files changed, 405 insertions(+) create mode 100644 analyzers/ValidateObservable/ValidateObservable.json create mode 100755 analyzers/ValidateObservable/ValidateObservable.py create mode 100644 analyzers/ValidateObservable/requirements.txt diff --git a/analyzers/ValidateObservable/ValidateObservable.json b/analyzers/ValidateObservable/ValidateObservable.json new file mode 100644 index 000000000..c4ef5a08e --- /dev/null +++ b/analyzers/ValidateObservable/ValidateObservable.json @@ -0,0 +1,18 @@ +{ + "name": "ValidateObservable", + "version": "1.0", + "author": "nusantara-self, StrangeBee", + "url": "https://github.com/TheHive-Project/Cortex-Analyzers", + "license": "AGPL-V3", + "description": "Use regexes and libraries to indicate if observable is valid", + "dataTypeList": ["ip", "domain", "url", "fqdn", "mail", "hash", "filename", "uri_path", "user-agent"], + "baseConfig": "ValidateObservable", + "command": "ValidateObservable/ValidateObservable.py", + "config": { + "service": "validateObservable" + }, + "configurationItems": [], + "registration_required": false, + "subscription_required": false, + "free_subscription": false +} diff --git a/analyzers/ValidateObservable/ValidateObservable.py b/analyzers/ValidateObservable/ValidateObservable.py new file mode 100755 index 000000000..929c5ae3d --- /dev/null +++ b/analyzers/ValidateObservable/ValidateObservable.py @@ -0,0 +1,384 @@ +#!/usr/bin/env python3 +# encoding: utf-8 + +from cortexutils.analyzer import Analyzer +import re +import ipaddress +from validators import url as validate_url_lib +from urllib.parse import urlparse +import idna + +class ValidateObservable(Analyzer): + def __init__(self): + Analyzer.__init__(self) + + def run(self): + # Validate based on data type + if self.data_type == 'ip': + result = self.validate_ip(self.get_data()) + elif self.data_type == 'domain': + result = self.validate_domain(self.get_data()) + elif self.data_type == 'url': + result = self.validate_url(self.get_data()) + elif self.data_type == 'fqdn': + result = self.validate_fqdn(self.get_data()) + elif self.data_type == 'mail': + result = self.validate_email(self.get_data()) + elif self.data_type == 'hash': + result = self.validate_hash(self.get_data()) + elif self.data_type == 'filename': + result = self.validate_filename(self.get_data()) + elif self.data_type == 'uri_path': + result = self.validate_uri_path(self.get_data()) + elif self.data_type == 'user-agent': + result = self.validate_user_agent(self.get_data()) + else: + self.error(f"Unsupported data type: {self.data_type}") + + self.report(result) + + def contains_bidi_override(self, value): + bidi_override_chars = ["\u202E", "\u202D", "\u200E", "\u200F", "\u2066", "\u2067"] + for char in bidi_override_chars: + if char in value: + return f"Contains Unicode bidirectional override character U+{ord(char):04X}" + return None + + def validate_ip(self, ip): + try: + + if "/" in ip: # CIDR range + ipaddress.ip_network(ip, strict=False) + return { + "status": "valid", + "type": "IP range", + "value": ip + } + else: # Single IP + ip_obj = ipaddress.ip_address(ip) + if ip_obj.is_loopback: + return { + "status": "valid", + "type": "IP address", + "value": ip, + "note": "Loopback IP address" + } + elif ip_obj.is_private: + return { + "status": "valid", + "type": "IP address", + "value": ip, + "note": "Private IP address" + } + elif ip_obj.is_reserved: + return { + "status": "valid", + "type": "IP address", + "value": ip, + "note": "Reserved IP address" + } + else: + return { + "status": "valid", + "type": "IP address", + "value": ip + } + except ValueError: + return { + "status": "invalid", + "type": "IP address", + "value": ip + } + + def validate_domain(self, domain): + try: + # Convert non-ASCII domains to Punycode + punycode_domain = idna.encode(domain).decode() + + # Check for Punycode (IDN) and unusual characters + if domain.startswith("xn--"): + return { + "status": "suspicious", + "type": "Domain", + "value": domain, + "reason": "Domain uses Punycode, which may indicate an internationalized domain name (IDN)" + } + + # Validate the domain structure + domain_regex = r'^(?!-)([A-Za-z0-9-]{1,63}(? 255: + return { + "status": "invalid", + "type": "Domain", + "value": domain, + "reason": "Exceeds maximum length of 255 characters" + } + if re.match(domain_regex, punycode_domain): + if re.search(r"[^a-zA-Z0-9.-]", domain): + return { + "status": "suspicious", + "type": "Domain", + "value": domain, + "reason": "Domain is valid but contains IDN or unusual characters" + } + return { + "status": "valid", + "type": "Domain", + "value": domain + } + else: + return { + "status": "invalid", + "type": "Domain", + "value": domain + } + except idna.IDNAError: + return { + "status": "invalid", + "type": "Domain", + "value": domain, + "reason": "Invalid internationalized domain name" + } + + + + def validate_url(self, url): + bidi_check = self.contains_bidi_override(url) + if bidi_check: + return { + "status": "suspicious", + "type": "URL", + "value": url, + "reason": bidi_check + } + + parsed = urlparse(url) + if not parsed.scheme and not parsed.netloc: + # Validate as a domain if scheme and netloc are missing + return self.validate_domain(url) + + if all([parsed.scheme, parsed.netloc]): + if parsed.netloc.startswith("xn--"): + return { + "status": "suspicious", + "type": "URL", + "value": url, + "reason": "URL contains a Punycode domain, which may indicate an internationalized domain name (IDN)" + } + + if re.search(r"[^a-zA-Z0-9:/?&=._-]", url): + return { + "status": "suspicious", + "type": "URL", + "value": url, + "reason": "Contains unusual characters" + } + return { + "status": "valid", + "type": "URL", + "value": url + } + return { + "status": "invalid", + "type": "URL", + "value": url, + "reason": "Malformed or missing scheme/netloc" + } + + def validate_fqdn(self, fqdn): + fqdn_regex = ( + r'^(?!-)([A-Za-z0-9-]{1,63}(? 255: + return { + "status": "invalid", + "type": "FQDN", + "value": fqdn, + "reason": "Exceeds maximum length of 255 characters" + } + if fqdn.startswith("xn--"): + return { + "status": "suspicious", + "type": "FQDN", + "value": fqdn, + "reason": "FQDN uses Punycode, which may indicate an internationalized domain name (IDN)" + } + if re.match(fqdn_regex, fqdn): + if re.search(r"[^a-zA-Z0-9.-]", fqdn): + return { + "status": "suspicious", + "type": "FQDN", + "value": fqdn, + "reason": "Contains unusual characters" + } + return { + "status": "valid", + "type": "FQDN", + "value": fqdn + } + else: + return { + "status": "invalid", + "type": "FQDN", + "value": fqdn + } + + def validate_email(self, email): + bidi_check = self.contains_bidi_override(email) + if bidi_check: + return { + "status": "suspicious", + "type": "Email", + "value": email, + "reason": bidi_check + } + + email_regex = ( + r'^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$' + ) + if len(email) > 254: + return { + "status": "invalid", + "type": "Email", + "value": email, + "reason": "Exceeds maximum length of 254 characters" + } + if re.match(email_regex, email): + if re.search(r"[^a-zA-Z0-9@._%+-]", email): + return { + "status": "suspicious", + "type": "Email", + "value": email, + "reason": "Contains unusual characters" + } + return { + "status": "valid", + "type": "Email", + "value": email + } + else: + return { + "status": "invalid", + "type": "Email", + "value": email + } + + def validate_hash(self, hash_value): + hash_regex = { + "MD5": r"^[a-fA-F0-9]{32}$", + "SHA1": r"^[a-fA-F0-9]{40}$", + "SHA256": r"^[a-fA-F0-9]{64}$", + "SHA512": r"^[a-fA-F0-9]{128}$" + } + for hash_type, regex in hash_regex.items(): + if re.match(regex, hash_value): + return { + "status": "valid", + "type": f"{hash_type} Hash", + "value": hash_value + } + return { + "status": "invalid", + "type": "Hash", + "value": hash_value, + "reason": "Does not match known hash formats (supported types: MD5, SHA1, SHA256, SHA512)" + } + + def validate_filename(self, filename): + bidi_check = self.contains_bidi_override(filename) + if bidi_check: + return { + "status": "suspicious", + "type": "Filename", + "value": filename, + "reason": bidi_check + } + + invalid_chars = r"[<>:\"/\\|?*]" + if len(filename) > 255: + return { + "status": "invalid", + "type": "Filename", + "value": filename, + "reason": "Exceeds maximum length of 255 characters" + } + if re.search(invalid_chars, filename): + return { + "status": "invalid", + "type": "Filename", + "value": filename, + "reason": "Contains invalid characters" + } + if re.search(r"\.\w{2,4}(\.\w{2,4})", filename): + return { + "status": "suspicious", + "type": "Filename", + "value": filename, + "reason": "Contains multiple extensions that may confuse users" + } + return { + "status": "valid", + "type": "Filename", + "value": filename + } + + + def validate_uri_path(self, uri_path): + parsed = urlparse(uri_path) + if parsed.path and parsed.path.startswith("/"): + return { + "status": "valid", + "type": "URI Path", + "value": uri_path + } + return { + "status": "invalid", + "type": "URI Path", + "value": uri_path, + "reason": "Does not start with '/' or is malformed" + } + + def validate_user_agent(self, user_agent): + if len(user_agent) > 512: + return { + "status": "invalid", + "type": "User-Agent", + "value": user_agent, + "reason": "Exceeds maximum length of 512 characters" + } + if re.search(r"[\x00-\x1F\x7F]", user_agent): + return { + "status": "invalid", + "type": "User-Agent", + "value": user_agent, + "reason": "Contains control characters" + } + return { + "status": "valid", + "type": "User-Agent", + "value": user_agent + } + + def summary(self, raw): + taxonomies = [] + namespace = "ValidateObs" + predicate = self.data_type + + # Determine level based on status + status = raw.get("status") + if status == "valid": + level = "info" + elif status == "suspicious": + level = "suspicious" + else: + level = "suspicious" + + # Build taxonomy based on validation result + taxonomies.append( + self.build_taxonomy( + level, namespace, predicate, status) + ) + return {"taxonomies": taxonomies} + +if __name__ == "__main__": + ValidateObservable().run() diff --git a/analyzers/ValidateObservable/requirements.txt b/analyzers/ValidateObservable/requirements.txt new file mode 100644 index 000000000..dd8e4afa9 --- /dev/null +++ b/analyzers/ValidateObservable/requirements.txt @@ -0,0 +1,3 @@ +cortexutils +validators +idna \ No newline at end of file From 7c19108edac38c16e3c128ba97e950b4d8b475a0 Mon Sep 17 00:00:00 2001 From: nusantara-self <15647296+nusantara-self@users.noreply.github.com> Date: Tue, 24 Dec 2024 16:39:44 +0100 Subject: [PATCH 2/3] Add README.md --- analyzers/ValidateObservable/README.md | 70 ++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) create mode 100644 analyzers/ValidateObservable/README.md diff --git a/analyzers/ValidateObservable/README.md b/analyzers/ValidateObservable/README.md new file mode 100644 index 000000000..7eccc3ac0 --- /dev/null +++ b/analyzers/ValidateObservable/README.md @@ -0,0 +1,70 @@ +The **ValidateObservable** analyzer is designed to validate multiple observable datatypes. + +* _ip_ +* _domain_ +* _url_ +* _fqdn_ +* _mail_ +* _hash_ +* _filename_ +* _uri_path_ +* _user-agent_ + +## Supported Data Types / Features +1. **IP Addresses** + +- Validates individual IPs and CIDR ranges. +- Flags reserved, private, and loopback IPs with appropriate notes. + +2. **Domains** + +- Detects valid domain names. +- Flags domains using Punycode (e.g., xn--) as suspicious. +- Identifies unusual characters in domain names. + +3. **URLs** + +- Validates URLs with or without schemes. +- Flags URLs containing Punycode domains or unusual characters as suspicious. +- Detects malformed URLs. + +4. **Fully Qualified Domain Names (FQDNs)** + +- Validates FQDNs for proper structure and length. +- Flags FQDNs using Punycode and unusual characters as suspicious. + +5. **Emails** + +- Checks email structure for validity. +- Detects unusual characters in email addresses. +- Validates against length constraints. + +6. **File Hashes** + +- Validates MD5, SHA1, SHA256, and SHA512 hash formats. + +7. **Filenames** + +- Flags invalid characters in filenames (<, >, :, |, etc.). +- Detects multiple extensions (for example, .txt.exe) as suspicious. +- Identifies Unicode bidirectional override characters (U+202E, etc.) to prevent obfuscated extensions. + +8. **URI Paths** + +- Ensures paths start with / and are well-formed. + +9. **User Agents** + +- Checks for excessive length and control characters. + +## Special Features + +- **Unicode Detection**: + - Identifies Unicode bidirectional override characters (for example, U+202E) across domains, URLs, emails, filenames, and more. + - Flags their usage as suspicious to prevent obfuscation attacks. +- **Punycode Detection**: + - Flags internationalized domain names (IDNs) using xn-- prefix or uncommon characters. +- **Structured Output**: + - Returns valid, invalid, or suspicious statuses with detailed reasons. +- **Short reports**: + - Generates short reports to indicate the validation status and risk level : info (blue) or invalid / suspicious (orange). \ No newline at end of file From 073dff4b98f639a7595b9cfd2777911d4c519606 Mon Sep 17 00:00:00 2001 From: nusantara-self <15647296+nusantara-self@users.noreply.github.com> Date: Tue, 24 Dec 2024 16:44:11 +0100 Subject: [PATCH 3/3] Add analyzer template for ValidateObservable --- .../ValidateObservable_1_0/long.html | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 thehive-templates/ValidateObservable_1_0/long.html diff --git a/thehive-templates/ValidateObservable_1_0/long.html b/thehive-templates/ValidateObservable_1_0/long.html new file mode 100644 index 000000000..49671cab2 --- /dev/null +++ b/thehive-templates/ValidateObservable_1_0/long.html @@ -0,0 +1,55 @@ + +
+
+ ValidateObservable Result +
+
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + +
Type{{ content["type"] }}
Value{{ content["value"] }}
Status + + {{ content["status"] | uppercase }} + +
Reason{{ content["reason"] }}
Note{{ content["note"] }}
+
+
+ + +
+
+ No Result +
+
+

No output is available for the "ValidateObservable" analyzer.

+
+
\ No newline at end of file