Skip to content

Commit

Permalink
Feature/send ingested alerts (#102)
Browse files Browse the repository at this point in the history
* Added Alert object

* Added url to alert

* Added tests for the new alert object

* Remove AlertDfinitionMapping as TypedDict is not supported on older versions of python

* Made Alert object more similar to BaseAnalysis

* Add changes

* Added a function to fetch the scans for a given alert, and create corresponding objects for them

* Assuming that alert id exists in send_alert

* Add wait for Alert.from_id, Fix PR comments

* Removed interval from object, renamed self.raw_alert to self._report

* Version bump
  • Loading branch information
daniel-oronsi authored Jul 11, 2023
1 parent 0718959 commit d99e9f0
Show file tree
Hide file tree
Showing 9 changed files with 387 additions and 18 deletions.
4 changes: 4 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
1.17.5
______
- Add `Alert` object, which allows sending / querying for alerts.

1.17.4
______
- Add verdict property to `UrlAnalysis` and `EndpointAnalysis`
Expand Down
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,13 @@ for analyse in history_results:
print(analyse)
```

### Get alert by id
```python
alert = Alert.from_id(alert_id=alert_id,
fetch_scans=False,
wait=False)
```

## Code examples
You can find more code examples under [analyze-python-sdk/examples/](https://github.com/intezer/analyze-python-sdk/tree/master/examples) directory

18 changes: 18 additions & 0 deletions examples/get_alert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import sys
from pprint import pprint

from intezer_sdk import api
from intezer_sdk.alerts import Alert


def get_alert_by_id(alert_id: str):
api.set_global_api('<api_key>')

alert = Alert.from_id(alert_id=alert_id,
fetch_scans=False,
wait=False)
pprint(alert)


if __name__ == '__main__':
get_alert_by_id(*sys.argv[1:])
2 changes: 1 addition & 1 deletion intezer_sdk/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '1.17.4'
__version__ = '1.18'
37 changes: 35 additions & 2 deletions intezer_sdk/_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,30 @@ def create_endpoint_scan(self, scanner_info: dict) -> Dict[str, str]:
raise_for_status(response)
return response.json()['result']

def send_alert(self,
alert: dict,
definition_mapping: dict,
**additional_parameters) -> str:
"""
Send an alert for further investigation.
:param alert: The alert to send.
:param definition_mapping: The definition mapping that is used to extract relevant information from the alert.
:param additional_parameters: Additional parameters to pass to the API.
:raises: :class:`requests.HTTPError` if the request failed for any reason.
:return: The alert id of the submitted alert.
"""
self.assert_any_on_premise()
response = self.api.request_with_refresh_expired_access_token(method='POST',
path='/alerts/ingest',
data=dict(alert=alert,
definition_mapping=definition_mapping,
**additional_parameters))
raise_for_status(response, statuses_to_ignore=[HTTPStatus.BAD_REQUEST])
self._assert_alert_response_status_code(response)
return response.json()['alert_id']

def get_iocs(self, analyses_id: str) -> Optional[dict]:
"""
Get the IOCs of an analysis.
Expand Down Expand Up @@ -585,10 +609,12 @@ def get_alerts_by_alert_ids(self, alert_ids: List[str], environments: List[str]
:param environments: The environments to get the alerts from.
:return: The alerts' data.
"""
data = dict(alert_ids=alert_ids)
if environments:
data['environments'] = environments
response = self.api.request_with_refresh_expired_access_token(method='GET',
path='/alerts/search',
data=dict(alert_ids=alert_ids,
environments=environments))
data=data)
raise_for_status(response)
data_response = response.json()

Expand Down Expand Up @@ -678,6 +704,13 @@ def _assert_index_response_status_code(response: Response):
elif response.status_code != HTTPStatus.CREATED:
raise errors.ServerError(f'Error in response status code:{response.status_code}', response)

@staticmethod
def _assert_alert_response_status_code(response: Response):
if response.status_code == HTTPStatus.BAD_REQUEST:
raise errors.InvalidAlertMappingError(response)
elif response.status_code != HTTPStatus.OK:
raise errors.ServerError(f'Error in response status code:{response.status_code}', response)

@staticmethod
def _get_analysis_id_from_response(response: Response):
return response.json()['result_url'].split('/')[2]
Expand Down
235 changes: 235 additions & 0 deletions intezer_sdk/alerts.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,22 @@
import time
import requests
import datetime
from typing import Dict
from typing import List
from typing import Tuple
from typing import Union
from typing import Type
from typing import Optional

from intezer_sdk._api import IntezerApi
from intezer_sdk.analysis import FileAnalysis
from intezer_sdk.analysis import UrlAnalysis
from intezer_sdk.endpoint_analysis import EndpointAnalysis
from intezer_sdk.consts import AlertStatusCode
from intezer_sdk._api import IntezerApiClient
from intezer_sdk.api import get_global_api
from intezer_sdk import errors
from intezer_sdk import consts


def get_alerts_by_alert_ids(alert_ids: List[str],
Expand All @@ -20,3 +33,225 @@ def get_alerts_by_alert_ids(alert_ids: List[str],
api = IntezerApi(api or get_global_api())
result = api.get_alerts_by_alert_ids(alert_ids, environments)
return result['alerts_count'], result['alerts']


class Alert:
"""
The Alert class is used to represent an alert from the Intezer Analyze API.
:ivar alert_id: The alert id.
:vartype alert_id: str
:ivar _report: The raw alert data.
:vartype source: str
:ivar verdict: The verdict of the alert.
:vartype verdict: str
:ivar family_name: The family name of the alert.
:vartype family_name: str
:ivar sender: The sender of the alert.
:vartype sender: str
:ivar intezer_alert_url: URL for the alert in Intezer's website.
:vartype intezer_alert_url: str
:ivar scans: Relevant scans for the alert.
:vartype scans: list
"""
def __init__(self, alert_id: str, api: IntezerApiClient = None):
"""
Create a new Alert instance with the given alert id.
Please note that this does not query the Intezer Analyze API for the alert data, but rather creates an Alert
instance with the given alert id.
:param alert_id: The alert id.
:param api: The API connection to Intezer.
"""
self.alert_id: str = alert_id
self._intezer_api_client = api
self._api = IntezerApi(api or get_global_api())
self._report: Optional[Dict] = None
self.source: Optional[str] = None
self.verdict: Optional[str] = None
self.family_name: Optional[str] = None
self.sender: Optional[str] = None
self.intezer_alert_url: Optional[str] = None
self.status: Optional[AlertStatusCode] = None
self.scans: List[Union[UrlAnalysis, FileAnalysis, EndpointAnalysis]] = []

def check_status(self) -> AlertStatusCode:
"""
Refresh the alert data from the Intezer Analyze API - overrides current data (if exists) with the new data.
:return: The updated status of the alert.
"""
try:
result = self._api.get_alerts_by_alert_ids(alert_ids=[self.alert_id])
except requests.HTTPError:
self.status = AlertStatusCode.NOT_FOUND
raise errors.AlertNotFoundError(self.alert_id)

if result.get('alerts_count', 0) != 1:
self.status = AlertStatusCode.NOT_FOUND
raise errors.AlertNotFoundError(self.alert_id)
alert = result['alerts'][0]
self._report = alert
if not alert.get('triage_result'):
self.status = AlertStatusCode.IN_PROGRESS
return self.status

self.source = alert.get('source')
self.verdict = alert.get('triage_result', {}).get('alert_verdict')
self.family_name = alert.get('triage_result', {}).get('family_name')
self.sender = alert.get('sender')
self.intezer_alert_url = alert.get('intezer_alert_url')
self.status = AlertStatusCode.FINISHED
return self.status

def is_running(self) -> bool:
return self.status not in (AlertStatusCode.FINISHED, AlertStatusCode.NOT_FOUND)

def result(self) -> dict:
"""
Get the raw alert result, as received from Intezer Analyze API.
:raises intezer_sdk.errors.AlertNotFound: If the alert was not found.
:raises intezer_sdk.errors.AlertInProgressError: If the alert is in progress
:return: The raw alert dictionary.
"""
if self.status == AlertStatusCode.NOT_FOUND:
raise errors.AlertNotFoundError(self.alert_id)
if self.status == AlertStatusCode.IN_PROGRESS:
raise errors.AlertInProgressError(self.alert_id)
return self._report

@classmethod
def from_id(cls,
alert_id: str,
api: IntezerApiClient = None,
fetch_scans: bool = False,
wait: bool = False,
timeout: Optional[int] = None,
):
"""
Create a new Alert instance, and fetch the alert data from the Intezer Analyze API.
:param alert_id: The alert id.
:param api: The API connection to Intezer.
:param fetch_scans: Whether to fetch the scans for the alert - this could take some time.
:param wait: Wait for the alert to finish processing before returning.
:param timeout: The timeout for the wait operation.
:raises intezer_sdk.errors.AlertNotFound: If the alert was not found.
:raises intezer_sdk.errors.AlertInProgressError: If the alert is still being processed.
:return: The Alert instance, with the updated alert data.
"""
new_alert = cls(alert_id=alert_id, api=api)
status = new_alert.check_status()
if status == AlertStatusCode.IN_PROGRESS:
raise errors.AlertInProgressError(alert_id)
if fetch_scans:
new_alert.fetch_scans()
if wait:
new_alert.wait_for_completion(timeout=timeout)
return new_alert

@classmethod
def send(cls,
raw_alert: dict,
alert_mapping: dict,
source: str,
api: IntezerApiClient = None,
environment: Optional[str] = None,
display_fields: Optional[List[str]] = None,
default_verdict: Optional[str] = None,
alert_sender: Optional[str] = None,
wait: bool = False,
timeout: Optional[int] = None,
):
"""
Send an alert for further investigation using the Intezer Analyze API.
:param raw_alert: The raw alert data.
:param alert_mapping: The alert mapping - defines how to map the raw alert to get relevant information.
:param source: The source of the alert.
:param api: The API connection to Intezer.
:param environment: The environment of the alert.
:param display_fields: Fields from raw alert to display in the alert's webpage.
:param default_verdict: The default verdict to send the alert with.
:param alert_sender: The sender of the alert.
:param wait: Wait for the alert to finish processing before returning.
:param timeout: The timeout for the wait operation.
:raises: :class:`requests.HTTPError` if the request failed for any reason.
:return: The Alert instance, initialized with the alert id. when the `wait` parameter is set to True, the
resulting alert object will be initialized with the alert triage data.
"""
_api = IntezerApi(api or get_global_api())
send_alert_params = dict(
alert=raw_alert,
definition_mapping=alert_mapping,
alert_source=source,
environment=environment,
display_fields=display_fields,
default_verdict=default_verdict,
alert_sender=alert_sender
)
send_alert_params = {key: value for key, value in send_alert_params.items() if value is not None}
alert_id = _api.send_alert(**send_alert_params)

alert = cls(alert_id=alert_id, api=api)
if wait:
alert.wait_for_completion(timeout=timeout)
return alert

def wait_for_completion(self,
interval: int = None,
sleep_before_first_check=False,
timeout: Optional[datetime.timedelta] = None):
"""
Blocks until the alert is finished processing, or until the timeout is reached.
:param interval: The interval to wait between checks in seconds.
:param sleep_before_first_check: Whether to sleep before the first status check.
:param timeout: Maximum duration to wait for analysis completion in seconds.
:raises intezer_sdk.errors.AlertNotFoundError: If the alert was not found.
:raise TimeoutError: If the timeout was reached.
"""
start_time = datetime.datetime.utcnow()
if not interval:
interval = consts.CHECK_STATUS_INTERVAL

if self.is_running:
if sleep_before_first_check:
time.sleep(interval)
status_code: AlertStatusCode = self.check_status()

while status_code != AlertStatusCode.FINISHED:
timeout_passed = timeout and datetime.datetime.utcnow() - start_time > timeout
if timeout_passed:
raise TimeoutError()
time.sleep(interval)
status_code = self.check_status()

def fetch_scans(self):
"""
Fetch the scans of the alert.
"""
if self.status == AlertStatusCode.NOT_FOUND:
raise errors.AlertNotFoundError(self.alert_id)
elif self.status == AlertStatusCode.IN_PROGRESS:
raise errors.AlertInProgressError(self.alert_id)

def _fetch_scan(scan_: dict,
scan_key: str,
scan_object: Union[Type[FileAnalysis], Type[EndpointAnalysis], Type[UrlAnalysis]]):
current_analysis_id = scan_.get(scan_key, {}).get('analysis_id')
if current_analysis_id:
self.scans.append(scan_object.from_analysis_id(analysis_id=current_analysis_id,
api=self._intezer_api_client))

self.scans = []
for scan in self._report.get('scans', []):
scan_type = scan.get('scan_type')
if scan_type == 'file':
_fetch_scan(scan, 'file_analysis', FileAnalysis)
elif scan_type == 'endpoint':
_fetch_scan(scan, 'endpoint_analysis', EndpointAnalysis)
elif scan_type == 'url':
_fetch_scan(scan, 'url_analysis', UrlAnalysis)
6 changes: 6 additions & 0 deletions intezer_sdk/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ class AnalysisStatusCode(enum.Enum):
FINISHED = 'finished'


class AlertStatusCode(AutoName):
IN_PROGRESS = enum.auto()
NOT_FOUND = enum.auto()
FINISHED = enum.auto()


class EndpointAnalysisEndReason(enum.Enum):
DONE = 'done'
INTERRUPTED = 'interrupted'
Expand Down
19 changes: 19 additions & 0 deletions intezer_sdk/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,25 @@ def __init__(self, response: requests.Response):
super().__init__('Account does not have permission to this route', response)


class AlertError(IntezerError):
pass


class InvalidAlertMappingError(AlertError):
def __init__(self, response: requests.Response):
super().__init__('Bad request - the mapping is probably malformed', response)


class AlertInProgressError(AlertError):
def __init__(self, alert_id: str):
super().__init__(f'The alert {alert_id} is being processed at the moment, please try again later')


class AlertNotFoundError(AlertError):
def __init__(self, alert_id: str):
super().__init__(f'The given alert does not exist - {alert_id}')


class UrlOfflineError(ServerError):
def __init__(self, response: requests.Response):
super().__init__('Url is offline', response)
Loading

0 comments on commit d99e9f0

Please sign in to comment.