Skip to content

Commit

Permalink
Merge pull request #1117 from X0x1RG9f/fix1
Browse files Browse the repository at this point in the history
[FIX] HybridAnalysis API V2
  • Loading branch information
nusantara-self authored Oct 18, 2024
2 parents a548b13 + 25ad921 commit 8401628
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 32 deletions.
9 changes: 1 addition & 8 deletions analyzers/HybridAnalysis/HybridAnalysis_GetReport.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,11 @@
"author": "Daniil Yugoslavskiy, Tieto",
"url": "https://github.com/TheHive-Project/Cortex-Analyzers",
"license": "AGPL-V3",
"dataTypeList": ["hash", "file", "filename"],
"dataTypeList": ["hash", "file", "filename", "url", "domain"],
"description": "Fetch Hybrid Analysis reports associated with hashes, filenames, URLs and domains.",
"command": "HybridAnalysis/HybridAnalysis_analyzer.py",
"baseConfig": "HybridAnalysis",
"configurationItems": [
{
"name": "secret",
"description": "HybridAnalysis secret",
"type": "string",
"multi": false,
"required": true
},
{
"name": "key",
"description": "API key",
Expand Down
95 changes: 71 additions & 24 deletions analyzers/HybridAnalysis/HybridAnalysis_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import hashlib
import requests
import time
from datetime import datetime

from requests.auth import HTTPBasicAuth
from cortexutils.analyzer import Analyzer
Expand All @@ -22,11 +23,10 @@
class VxStreamSandboxAnalyzer(Analyzer):
def __init__(self):
Analyzer.__init__(self)
self.basic_url = 'https://www.hybrid-analysis.com/api/'
self.headers = {'User-Agent': 'VxStream'}
self.basic_url = 'https://www.hybrid-analysis.com/api/v2/'

self.secret = self.get_param('config.secret', None, 'VxStream Sandbox secret key is missing')
self.api_key = self.get_param('config.key', None, 'VxStream Sandbox API key is missing')
self.headers = {'User-Agent': 'VxStream', 'api-key': self.api_key, 'accept': 'application/json', 'Content-Type': 'application/x-www-form-urlencoded'}

def summary(self, raw_report):
taxonomies = []
Expand All @@ -35,20 +35,52 @@ def summary(self, raw_report):
level = "info"
namespace = "HybridAnalysis"
predicate = "Threat level"
value = "No verdict"
value = "Unknown"

verdicts = {
"no specific threat": 0,
"whitelisted": 1,
"suspicious": 2,
"malicious": 3,
}

# define json keys to loop
if self.data_type in ['hash', 'file']:
minireports = raw_report.get('results').get('response')
elif self.data_type in ['filename']:
minireports = raw_report.get('results').get('response').get('result')
minireports = raw_report["results"]
elif self.data_type in ['filename', 'url', 'domain']:
minireports = raw_report["results"]["result"]

if len(minireports) != 0:
# get first report with not Null verdict
# Previous solution looped through the report and took the first verdict that was not empty
# Better solution: loop through all the recent verdicts (within an hour of the last one) and take the worst verdict
# In some cases, HA returns a verdict of "No specific threat" while one just before (a few seconds earlier), from the same scan but a different tool, was tagged malicious
last_verdict_time = None
last_verdict = None
highest_threat_score = None

for minireport in minireports:
if minireport.get('verdict') is not None:
report_verdict = minireport.get('verdict')
break
if minireport["verdict"] is not None:
if last_verdict_time is None:
last_verdict_time = int(datetime.timestamp(datetime.strptime(minireport["analysis_start_time"][:10] + minireport["analysis_start_time"][11:19], "%Y-%m-%d%H:%M:%S")))
last_verdict = minireport["verdict"]
# Set the initial threat score
highest_threat_score = minireport.get("threat_score")
else:
new_verdict_time = int(datetime.timestamp(datetime.strptime(minireport["analysis_start_time"][:10] + minireport["analysis_start_time"][11:19], "%Y-%m-%d%H:%M:%S")))
if abs(last_verdict_time - new_verdict_time) <= 3600:
last_verdict_time = new_verdict_time
try:
if verdicts[minireport["verdict"]] > verdicts[last_verdict]:
last_verdict = minireport["verdict"]
except KeyError:
continue
# Update the highest threat score if the current one is greater
current_threat_score = minireport.get("threat_score")
if current_threat_score is not None:
if highest_threat_score is None or current_threat_score > highest_threat_score:
highest_threat_score = current_threat_score

report_verdict = last_verdict

# create shield badge for short.html
if report_verdict == 'malicious':
Expand All @@ -63,6 +95,11 @@ def summary(self, raw_report):
elif report_verdict == 'no specific threat':
level = 'info'
value = "No Specific Threat"

# Add the highest threat score if available
if highest_threat_score is not None:
value = f"{value} (Threat Score: {highest_threat_score})"

else:
level = 'info'
value = "Unknown"
Expand All @@ -71,39 +108,48 @@ def summary(self, raw_report):
return {"taxonomies": taxonomies}

def run(self):

try:
if self.data_type == 'hash':
query_url = 'scan/'
query_data = self.get_param('data', None, 'Hash is missing')
query_url = 'search/hash'
query_data = {'hash': self.get_param('data', None, 'Hash is missing')}

elif self.data_type == 'file':
query_url = 'scan/'
query_url = 'search/hash'
hashes = self.get_param('attachment.hashes', None)

if hashes is None:
filepath = self.get_param('file', None, 'File is missing')
query_data = hashlib.sha256(open(filepath, 'rb').read()).hexdigest()
query_data = {'hash': hashlib.sha256(open(filepath, 'rb').read()).hexdigest()}
else:
# find SHA256 hash
query_data = next(h for h in hashes if len(h) == 64)
query_data = {'hash': next(h for h in hashes if len(h) == 64)}

elif self.data_type == 'filename':
query_url = 'search?query=filename:'
query_data = '"{}"'.format(self.get_param('data', None, 'Filename is missing'))
query_url = 'search/terms'
query_data = {'filename': self.get_param('data', None, 'Filename is missing')}

elif self.data_type == 'url':
query_url = 'search/terms'
query_data = {'url': self.get_param('data', None, 'URL is missing')}

elif self.data_type == 'domain':
query_url = 'search/terms'
query_data = {'domain': self.get_param('data', None, 'Domain is missing')}

else:
self.notSupported()

url = str(self.basic_url) + str(query_url) + str(query_data)
url = str(self.basic_url) + str(query_url)

error = True
while error:
r = requests.get(url, headers=self.headers, auth=HTTPBasicAuth(self.api_key, self.secret), verify=True)
if "error" in r.json().get('response'):
if "Exceeded maximum API requests per minute(5)" in r.json().get('response').get('error'):
r = requests.post(url, headers=self.headers, data=query_data, verify=True)

if "validation_errors" in r.json():
if "Exceeded maximum API requests per minute(5)" in r.json()["validation_errors"][0]["errors"]:
time.sleep(60)
else:
self.error(r.json().get('response').get('error'))
self.error(r.json()["validation_errors"][0]["errors"][0])
else:
error = False

Expand All @@ -115,3 +161,4 @@ def run(self):

if __name__ == '__main__':
VxStreamSandboxAnalyzer().run()

0 comments on commit 8401628

Please sign in to comment.