Skip to content

Commit

Permalink
chore(iast): taint get and post http parameter name in django (#11945)
Browse files Browse the repository at this point in the history
  • Loading branch information
gnufede authored Jan 16, 2025
1 parent c46c302 commit 4183671
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 24 deletions.
4 changes: 2 additions & 2 deletions ddtrace/appsec/_iast/_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,6 @@ def _on_django_func_wrapped(fn_args, fn_kwargs, first_arg_expected_type, *_):
http_req = fn_args[0]

http_req.COOKIES = taint_structure(http_req.COOKIES, OriginType.COOKIE_NAME, OriginType.COOKIE)
http_req.GET = taint_structure(http_req.GET, OriginType.PARAMETER_NAME, OriginType.PARAMETER)
http_req.POST = taint_structure(http_req.POST, OriginType.BODY, OriginType.BODY)
if (
getattr(http_req, "_body", None) is not None
and len(getattr(http_req, "_body", None)) > 0
Expand Down Expand Up @@ -202,6 +200,8 @@ def _on_django_func_wrapped(fn_args, fn_kwargs, first_arg_expected_type, *_):
except AttributeError:
log.debug("IAST can't set attribute http_req.body", exc_info=True)

http_req.GET = taint_structure(http_req.GET, OriginType.PARAMETER_NAME, OriginType.PARAMETER)
http_req.POST = taint_structure(http_req.POST, OriginType.PARAMETER_NAME, OriginType.BODY)
http_req.headers = taint_structure(http_req.headers, OriginType.HEADER_NAME, OriginType.HEADER)
http_req.path = taint_pyobject(
http_req.path, source_name="path", source_value=http_req.path, source_origin=OriginType.PATH
Expand Down
81 changes: 59 additions & 22 deletions tests/contrib/django/django_app/appsec_urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,26 @@
from typing import Any # noqa:F401


if python_supported_by_iast():
with override_env({"DD_IAST_ENABLED": "True"}):
from ddtrace.appsec._iast._taint_tracking import OriginType
from ddtrace.appsec._iast._taint_tracking._taint_objects import is_pyobject_tainted
from ddtrace.appsec._iast.reporter import IastSpanReporter

def assert_origin(parameter, origin_type): # type: (Any, Any) -> None
assert is_pyobject_tainted(parameter)
sources, _ = IastSpanReporter.taint_ranges_as_evidence_info(parameter)
assert sources[0].origin == origin_type

else:

def assert_origin(pyobject, origin_type): # type: (Any) -> bool
return True

def is_pyobject_tainted(pyobject): # type: (Any) -> bool
return True


def include_view(request):
return HttpResponse(status=200)

Expand All @@ -52,6 +72,8 @@ def body_view(request):
return HttpResponse(data, status=200)
else:
data = request.POST
first_post_key = list(request.POST.keys())[0]
assert_origin(first_post_key, OriginType.PARAMETER_NAME)
return HttpResponse(str(dict(data)), status=200)


Expand Down Expand Up @@ -86,6 +108,24 @@ def sqli_http_request_parameter(request):
return HttpResponse(request.META["HTTP_USER_AGENT"], status=200)


def sqli_http_request_parameter_name_get(request):
obj = " 1"
with connection.cursor() as cursor:
# label iast_enabled_sqli_http_request_parameter_name_get
cursor.execute(add_aspect(list(request.GET.keys())[0], obj))

return HttpResponse(request.META["HTTP_USER_AGENT"], status=200)


def sqli_http_request_parameter_name_post(request):
obj = " 1"
with connection.cursor() as cursor:
# label iast_enabled_sqli_http_request_parameter_name_post
cursor.execute(add_aspect(list(request.POST.keys())[0], obj))

return HttpResponse(request.META["HTTP_USER_AGENT"], status=200)


def sqli_http_request_header_name(request):
key = [x for x in request.META.keys() if x == "master"][0]

Expand Down Expand Up @@ -119,35 +159,21 @@ def sqli_http_path_parameter(request, q_http_path_parameter):


def taint_checking_enabled_view(request):
if python_supported_by_iast():
with override_env({"DD_IAST_ENABLED": "True"}):
from ddtrace.appsec._iast._taint_tracking import OriginType
from ddtrace.appsec._iast._taint_tracking._taint_objects import is_pyobject_tainted
from ddtrace.appsec._iast.reporter import IastSpanReporter

def assert_origin_path(path): # type: (Any) -> None
assert is_pyobject_tainted(path)
sources, tainted_ranges_to_dict = IastSpanReporter.taint_ranges_as_evidence_info(path)
assert sources[0].origin == OriginType.PATH

else:

def assert_origin_path(pyobject): # type: (Any) -> bool
return True

def is_pyobject_tainted(pyobject): # type: (Any) -> bool
return True

# TODO: Taint request body
# assert is_pyobject_tainted(request.body)
first_get_key = list(request.GET.keys())[0]
assert is_pyobject_tainted(request.GET["q"])
assert is_pyobject_tainted(first_get_key)
assert is_pyobject_tainted(request.META["QUERY_STRING"])
assert is_pyobject_tainted(request.META["HTTP_USER_AGENT"])
# TODO: Taint request headers
# assert is_pyobject_tainted(request.headers["User-Agent"])
assert_origin_path(request.path_info)
assert_origin_path(request.path)
assert_origin_path(request.META["PATH_INFO"])
assert_origin(request.path_info, OriginType.PATH)
assert_origin(request.path, OriginType.PATH)
assert_origin(request.META["PATH_INFO"], OriginType.PATH)
assert_origin(request.GET["q"], OriginType.PARAMETER)
assert_origin(first_get_key, OriginType.PARAMETER_NAME)

return HttpResponse(request.META["HTTP_USER_AGENT"], status=200)


Expand All @@ -162,6 +188,7 @@ def is_pyobject_tainted(pyobject): # type: (Any) -> bool

assert not is_pyobject_tainted(request.body)
assert not is_pyobject_tainted(request.GET["q"])
assert not is_pyobject_tainted(list(request.GET.keys())[0])
assert not is_pyobject_tainted(request.META["QUERY_STRING"])
assert not is_pyobject_tainted(request.META["HTTP_USER_AGENT"])
assert not is_pyobject_tainted(request.headers["User-Agent"])
Expand Down Expand Up @@ -297,6 +324,16 @@ def validate_querydict(request):
handler("taint-checking-enabled/$", taint_checking_enabled_view, name="taint_checking_enabled_view"),
handler("taint-checking-disabled/$", taint_checking_disabled_view, name="taint_checking_disabled_view"),
handler("sqli_http_request_parameter/$", sqli_http_request_parameter, name="sqli_http_request_parameter"),
handler(
"sqli_http_request_parameter_name_get/$",
sqli_http_request_parameter_name_get,
name="sqli_http_request_parameter_name_get",
),
handler(
"sqli_http_request_parameter_name_post/$",
sqli_http_request_parameter_name_post,
name="sqli_http_request_parameter_name_post",
),
handler("sqli_http_request_header_name/$", sqli_http_request_header_name, name="sqli_http_request_header_name"),
handler("sqli_http_request_header_value/$", sqli_http_request_header_value, name="sqli_http_request_header_value"),
handler("sqli_http_request_cookie_name/$", sqli_http_request_cookie_name, name="sqli_http_request_cookie_name"),
Expand Down
99 changes: 99 additions & 0 deletions tests/contrib/django/test_django_appsec_iast.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,105 @@ def test_django_tainted_user_agent_iast_enabled_sqli_http_request_parameter(clie
assert loaded["vulnerabilities"][0]["hash"] == hash_value


@pytest.mark.django_db()
@pytest.mark.skipif(not python_supported_by_iast(), reason="Python version not supported by IAST")
def test_django_tainted_user_agent_iast_enabled_sqli_http_request_parameter_name_get(client, test_spans, tracer):
with override_global_config(dict(_iast_enabled=True, _deduplication_enabled=False, _iast_request_sampling=100.0)):
root_span, response = _aux_appsec_get_root_span(
client,
test_spans,
tracer,
content_type="application/x-www-form-urlencoded",
url="/appsec/sqli_http_request_parameter_name_get/?SELECT=unused",
headers={"HTTP_USER_AGENT": "test/1.2.3"},
)

vuln_type = "SQL_INJECTION"

assert response.status_code == 200
assert response.content == b"test/1.2.3"

loaded = json.loads(root_span.get_tag(IAST.JSON))

line, hash_value = get_line_and_hash(
"iast_enabled_sqli_http_request_parameter_name_get", vuln_type, filename=TEST_FILE
)

assert loaded["sources"] == [
{
"name": "SELECT",
"origin": "http.request.parameter.name",
"value": "SELECT",
}
]

assert loaded["vulnerabilities"][0]["type"] == vuln_type
assert loaded["vulnerabilities"][0]["evidence"] == {
"valueParts": [
{"source": 0, "value": "SELECT"},
{
"value": " ",
},
{
"redacted": True,
},
]
}
assert loaded["vulnerabilities"][0]["location"]["path"] == TEST_FILE
assert loaded["vulnerabilities"][0]["location"]["line"] == line
assert loaded["vulnerabilities"][0]["hash"] == hash_value


@pytest.mark.django_db()
@pytest.mark.skipif(not python_supported_by_iast(), reason="Python version not supported by IAST")
def test_django_tainted_user_agent_iast_enabled_sqli_http_request_parameter_name_post(client, test_spans, tracer):
with override_global_config(dict(_iast_enabled=True, _deduplication_enabled=False, _iast_request_sampling=100.0)):
root_span, response = _aux_appsec_get_root_span(
client,
test_spans,
tracer,
payload=urlencode({"SELECT": "unused"}),
content_type="application/x-www-form-urlencoded",
url="/appsec/sqli_http_request_parameter_name_post/",
headers={"HTTP_USER_AGENT": "test/1.2.3"},
)

vuln_type = "SQL_INJECTION"

assert response.status_code == 200
assert response.content == b"test/1.2.3"

loaded = json.loads(root_span.get_tag(IAST.JSON))

line, hash_value = get_line_and_hash(
"iast_enabled_sqli_http_request_parameter_name_post", vuln_type, filename=TEST_FILE
)

assert loaded["sources"] == [
{
"name": "SELECT",
"origin": "http.request.parameter.name",
"value": "SELECT",
}
]

assert loaded["vulnerabilities"][0]["type"] == vuln_type
assert loaded["vulnerabilities"][0]["evidence"] == {
"valueParts": [
{"source": 0, "value": "SELECT"},
{
"value": " ",
},
{
"redacted": True,
},
]
}
assert loaded["vulnerabilities"][0]["location"]["path"] == TEST_FILE
assert loaded["vulnerabilities"][0]["location"]["line"] == line
assert loaded["vulnerabilities"][0]["hash"] == hash_value


@pytest.mark.django_db()
@pytest.mark.skipif(not python_supported_by_iast(), reason="Python version not supported by IAST")
def test_django_tainted_user_agent_iast_enabled_sqli_http_request_header_value(client, test_spans, tracer):
Expand Down

0 comments on commit 4183671

Please sign in to comment.