Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds support to the VT service for uploading unknown samples #153

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
215 changes: 145 additions & 70 deletions virustotal_service/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import urllib2
import urlparse
import requests
import time

from hashlib import md5

Expand All @@ -27,9 +28,6 @@ class VirusTotalService(Service):
Check the VirusTotal database to see if it contains this sample, domain
or IP.

This does not submit the file to VirusTotal, but only performs a
lookup of the sample's MD5.

Requires an API key available from virustotal.com

TODO:
Expand All @@ -38,7 +36,7 @@ class VirusTotalService(Service):
"""

name = "virustotal_lookup"
version = '3.1.0'
version = '3.2.0'
supported_types = ['Sample', 'Domain', 'IP']
required_fields = []
description = "Look up a Sample, Domain or IP in VirusTotal"
Expand All @@ -49,6 +47,11 @@ def bind_runtime_form(analyst, config):
config['vt_add_pcap'] = False
if 'vt_add_domains' not in config:
config['vt_add_domains'] = False
if 'vt_upload_unknown_sample' not in config:
config['vt_upload_unknown_sample'] = False

config['vt_wait_for_processing'] = str(config['vt_wait_for_processing'][0])

return forms.VirusTotalRunForm(config)

@classmethod
Expand Down Expand Up @@ -119,19 +122,11 @@ def get_pcap(self, md5):
network_url = self.config.get('vt_network_url', '')
params = {'apikey': self.config.get('vt_api_key', ''), 'hash': md5}

if settings.HTTP_PROXY:
proxies = { 'http': settings.HTTP_PROXY,
'https': settings.HTTP_PROXY }
else:
proxies = {}

try:
response = requests.get(network_url, params=params, proxies=proxies)
response = requests.get(network_url, params=params, proxies=self.proxies)
except Exception as e:
key = self.config.get('vt_api_key', '')
error = str(e).replace(key, 'REDACTED')
logger.error("Virustotal: network connection error for PCAP (%s)" % error)
self._error("Network connection error checking virustotal for PCAP (%s)" % error)
logger.error("Virustotal: network connection error for PCAP (%s)" % e)
self._error("Network connection error checking virustotal for PCAP (%s)" % e)
return None

if response.headers['content-type'] == 'application/cap':
Expand All @@ -145,10 +140,6 @@ def get_pcap(self, md5):
return None

def run(self, obj, config):
# We assign config and obj to self because it is referenced often
# outside this script
# This is model after the guys who wrote the cuckoo script and all
# credit goes to them on this cool trick
self.config = config
self.obj = obj

Expand All @@ -159,52 +150,59 @@ def run(self, obj, config):
sample_url = config.get('vt_query_url', '')
domain_url = config.get('vt_domain_url', '')
ip_url = config.get('vt_ip_url', '')
upload_unknown_sample = config.get('vt_upload_unknown_sample', True)
wait_for_processing = int(config.get('vt_wait_for_processing', 5)) # time in m to wait before asking vt if the processing of a new sample is done
if not key:
self._error("No valid VT key found")
return

# Process parameters for a GET request for Sample, Domain, or IP adress
if obj._meta['crits_type'] == 'Sample':
if private_key:
parameters = {"resource": obj.md5, "apikey": key, 'allinfo': 1}
else:
parameters = {"resource": obj.md5, "apikey": key}
vt_data = urllib.urlencode(parameters)
req = urllib2.Request(sample_url, vt_data)
elif obj._meta['crits_type'] == 'Domain':
parameters = {'domain': obj.domain, 'apikey': key}
vt_data = urllib.urlencode(parameters)
req = urllib2.Request("%s?%s" % (domain_url, vt_data))
elif obj._meta['crits_type'] == 'IP':
parameters = {'ip': obj.ip, 'apikey': key}
vt_data = urllib.urlencode(parameters)
req = urllib2.Request("%s?%s" % (ip_url, vt_data))

# Execute GET request
# setup proxy as needed
self.proxies = {}
if settings.HTTP_PROXY:
proxy = urllib2.ProxyHandler({'https': settings.HTTP_PROXY})
opener = urllib2.build_opener(proxy)
urllib2.install_opener(opener)
try:
response = urllib2.urlopen(req)
json = response.read()
response_dict = simplejson.loads(json)
except Exception as e:
logger.error("Virustotal: network connection error (%s)" % e)
self._error("Network connection error checking virustotal (%s)" % e)
return
self.proxies = {"https": settings.HTTP_PROXY} # VT only uses https

# Check to see if a VT provided valid results. If not throw error and
# exit
if response_dict.get('response_code', 0) != 1:
self._error("Exiting because Virustotal provided a negative response code.")

response_dict = self._get_sample_data()
if not response_dict:
self._error("Getting sample data from VT failed!")
return

# VT response codes:
# 0 = unknown to vt
# 1 = known and ready
# -2 = queued for analysis
response_code = response_dict.get('response_code', 0)


if response_code != 1:

if response_code == -2: # sample is already queued for analysis - just wait a bit!
self._info("Waiting {} minutes for VT processing".format(wait_for_processing))
time.sleep(float(60*wait_for_processing))

elif response_code == 0 and upload_unknown_sample: # sample is unknown AND we want to upload it
if not self._upload_sample():
return # self._error is set by _upload_sample

self._info("Waiting {} minutes for VT processing".format(wait_for_processing))
time.sleep(float(60*wait_for_processing))
response_dict = self._get_sample_data()

else: # sample is unknown and we DON'T want to upload it
self._error("Exiting because Virustotal provided a negative response code.")
return

response_dict = self._get_sample_data()
if not response_dict or response_dict.get('response_code', 0) != 1:
self._error("Getting sample data from VT failed at last!")
return

# Process Results for Sample
if obj._meta['crits_type'] == 'Sample':
# If we are missing any hashes for this file, add them
self._process_hashes(response_dict)


# Process Public Key data and store scandate for later use
response = self._process_public_sample(response_dict)
if not response['success']:
Expand Down Expand Up @@ -252,6 +250,82 @@ def run(self, obj, config):
if not response['success']:
self._info(response['message'])

def _get_sample_data(self):
"""
Simple get sample data

Return: False on failure, dict with json response on success
"""

key = self.config.get('vt_api_key', '')

if self.obj._meta['crits_type'] == 'Sample':
if self.config.get('vt_api_key_private', False):
params = {"resource": self.obj.md5, "apikey": key, 'allinfo': 1}
else:
params = {"resource": self.obj.md5, "apikey": key}
url = "https://www.virustotal.com/vtapi/v2/file/report"

elif self.obj._meta['crits_type'] == 'Domain':
params = {'domain': self.obj.domain, 'apikey': key}
url = "https://www.virustotal.com/vtapi/v2/domain/report"

elif self.obj._meta['crits_type'] == 'IP':
params = {'ip': self.obj.ip, 'apikey': key}
url = "https://www.virustotal.com/vtapi/v2/ip-address/report"

else:
logger.error("Virustotal: Unsupported sample type for %s" % self.obj.md5)
return False


try:
response = requests.get(url, params=params, proxies=self.proxies)
response_dict = response.json()
except Exception as e:
logger.error("Virustotal: network connection error (%s)" % e)
return False

return response_dict


def _upload_sample(self):
"""
see function name...
(supports sample / domain)

Return: Bool
"""

key = self.config.get('vt_api_key', '')

if self.obj._meta['crits_type'] == 'Sample':
###
#TODO: If the sample is larger than 32mb we need to go a different route
###

params = {'apikey': key}
f = {'file': (self.obj.filename, self.obj.filedata.read())}
response = requests.post('https://www.virustotal.com/vtapi/v2/file/scan', files=f, params=params, proxies=self.proxies)

elif self.obj._meta['crits_type'] == 'Domain':
params = {'apikey': key, 'url':self.obj.domain}
response = requests.post('https://www.virustotal.com/vtapi/v2/url/scan', params=params, proxies=self.proxies)

else:
self._error("Exiting because Virustotal did not know the sample AND we currently cant upload this type of sample!")
return False


upload_response = response.json()

if upload_response.get('response_code', 0) != 1:
self._error("Exiting because Virustotal did not know the sample AND we couldn't upload it. Msg.: " + upload_response['verbose_msg'])
return False

return True


def _process_hashes(self, report):
"""
Process hash data from VirusTotal.
Expand All @@ -263,7 +337,7 @@ def _process_hashes(self, report):
"""

save = False
# SSDEEP stuff is not present in regular report

# if self.obj.ssdeep != report['ssdeep']:
# self.obj.ssdeep = report['ssdeep']
# save = True
Expand All @@ -276,7 +350,7 @@ def _process_hashes(self, report):

if save:
self.obj.save(username=self.current_task.username)
return
return None

def _process_public_sample(self, report):
"""
Expand Down Expand Up @@ -498,11 +572,13 @@ def _process_private_sample_behaviour(self, report, scandate):
behaviour_network_dns = behaviour_network_dict.get('dns', [])
if behaviour_network_dns:
for item in behaviour_network_dns:
# self._process_domain(item.get('hostname', ''), item.get('ip', ''), scandate)
# Add domain to CRITs
domain = item.get('hostname', None)
ip = item.get('ip', None)
self._add_result('VirusTotal Behaviour DNS', str(domain), {'IP_Address': str(ip)})
if domain and self.config.get('vt_add_domains', False):
# if domain and self.config.get('vt_add_domains', False)
if domain:
self._process_domain(domain, ip, scandate)
else:
status['message'].append("DNS behaviour information not included in VT response.")
Expand Down Expand Up @@ -551,7 +627,7 @@ def _process_private_sample_behaviour(self, report, scandate):
'method': item.get('method', ''),
'success': item.get('success', '')
}
self._add_result('VirusTotal Hooking Detected', item.get('type', ''), item_dict)
self._add_result('Virus Total Hooking Detected', item.get('type', ''), item_dict)
else:
status['message'].append("Hooking behaviour information not included in VT response.")

Expand Down Expand Up @@ -705,7 +781,7 @@ def _process_public_ip(self, report):
}
self._add_result('Resolutions', resolution.get('hostname', ''), stats)
else:
status['message'].append("Resolution information not included in VT response.")
status['message'].append("Resolution information not included in VT response.")

detected_communicating_samples = report.get('detected_communicating_samples', [])
if detected_communicating_samples:
Expand Down Expand Up @@ -778,27 +854,26 @@ def _process_pcap(self, pcap, scandate):
self._info("Adding PCAP and creating relationship to %s" % (str(self.obj.id)))
self._notify()
h = md5(pcap).hexdigest()
result = handle_pcap_file("%s.pcap" % h,
pcap,
self.obj.source,
user=self.current_task.username,
description='Created %s' % (scandate),
related_id=str(self.obj.id),
related_type="Sample",
method=self.name,
reference=None,
relationship='Related_To')
result = handle_pcap_file("%s.pcap" % h, # File Name
pcap, # Pcap data
self.obj.source, # Data Source
user=self.current_task.username, # User adding the PCAP
description='Created %s' % (scandate), # Description
related_id=str(self.obj.id), # Top level ID of related object
related_type="Sample", # Top level type of the related object
method=self.name, # Method for aquiring the PCAP
reference=None, # Reference to the source of this PCAP
relationship='Related_To') # Relationship between parent and the PCAP

self._add_result("pcap added", h, {'md5': h})

def _process_domain(self, domain, ip, scandate):
def _process_domain(self, domain, ip, scandate):
"""
Add domain to CRITs.

Args:
domain (str): pcap data
scandate (str): scan date from when domain is believed to be
collected.

TODO:
handle IP
"""
Expand Down
30 changes: 26 additions & 4 deletions virustotal_service/forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ class VirusTotalConfigForm(forms.Form):
initial=False,
label='Private key?',
help_text="Is the key a private key?")
vt_add_domains = forms.BooleanField(required=False,
initial=False,
label='Add Domains?',
help_text="Should we always add domains?")
vt_add_pcap = forms.BooleanField(required=False,
initial=False,
label='Pull PCAP?',
help_text="If available, should we pull the pcap file?")
vt_api_key = forms.CharField(required=True,
label="API Key",
widget=forms.TextInput(),
Expand All @@ -28,10 +36,15 @@ class VirusTotalConfigForm(forms.Form):
label="Network URL",
widget=forms.TextInput(),
initial='https://www.virustotal.com/vtapi/v2/file/network-traffic')


def __init__(self, *args, **kwargs):
super(VirusTotalConfigForm, self).__init__(*args, **kwargs)
vt_upload_unknown_sample = forms.BooleanField(required=False,
initial=True,
label='Upload unknown samples to VT?',
help_text="If VT does not know a sample, should we upload it for analysis?")
vt_wait_for_processing = forms.CharField(required=False,
label="Wait for processing",
initial='5',
widget=forms.TextInput(),
help_text="How many minutes should we give VT to process newly uploaded samples?")

class VirusTotalRunForm(forms.Form):
error_css_class = 'error'
Expand All @@ -44,6 +57,15 @@ class VirusTotalRunForm(forms.Form):
initial=False,
label='Domain',
help_text="Add Domains")
vt_upload_unknown_sample = forms.BooleanField(required=False,
initial=False,
label='Upload unknown samples to VT?',
help_text="If VT does not know a sample, should we upload it for analysis?")
vt_wait_for_processing = forms.CharField(required=False,
label="Wait for processing",
initial='5',
widget=forms.TextInput(),
help_text="How many minutes should we give VT to process newly uploaded samples?")

def __init__(self, *args, **kwargs):
super(VirusTotalRunForm, self).__init__(*args, **kwargs)