Skip to content

Commit

Permalink
Split httpx code from basic data manipulation routines
Browse files Browse the repository at this point in the history
  • Loading branch information
nanomad committed Sep 27, 2024
1 parent 6a9b1e4 commit 5e9657d
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 89 deletions.
6 changes: 3 additions & 3 deletions src/saic_ismart_client_ng/net/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from saic_ismart_client_ng.listener import SaicApiListener
from saic_ismart_client_ng.model import SaicApiConfiguration
from saic_ismart_client_ng.net.security import decrypt_response, encrypt_request
from saic_ismart_client_ng.net.httpx import decrypt_httpx_response, encrypt_httpx_request


class AbstractSaicClient(ABC):
Expand All @@ -24,7 +24,7 @@ def __init__(
self.__client = httpx.AsyncClient(
event_hooks={
"request": [self.invoke_request_listener, self.encrypt_request],
"response": [decrypt_response, self.invoke_response_listener]
"response": [decrypt_httpx_response, self.invoke_response_listener]
}
)

Expand Down Expand Up @@ -84,7 +84,7 @@ async def invoke_response_listener(self, response: httpx.Response):
self.__logger.warning(f"Error invoking request listener: {e}", exc_info=e)

async def encrypt_request(self, modified_request: httpx.Request):
return await encrypt_request(
return await encrypt_httpx_request(
modified_request=modified_request,
request_timestamp=datetime.now(),
base_uri=self.configuration.base_uri,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,10 @@
import hmac
import logging
from datetime import datetime
from typing import Optional

from httpx import Response, Request

from saic_ismart_client_ng.crypto_utils import md5_hex_digest, encrypt_aes_cbc_pkcs5_padding, \
decrypt_aes_cbc_pkcs5_padding
from saic_ismart_client_ng.net.utils import update_request_with_content, normalize_content_type
from saic_ismart_client_ng.net.utils import normalize_content_type

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -50,26 +47,28 @@ def get_app_verification_string(
return ""


async def encrypt_request(
def encrypt_request(
*,
modified_request: Request,
original_request_url: str,
original_request_headers: dict,
original_request_content: str,
request_timestamp: datetime,
base_uri: str,
region: str,
tenant_id: str,
user_token: str = "",
class_name: str = "",
):
original_request_url = modified_request.url
original_content_type = modified_request.headers.get("Content-Type") # 'application/x-www-form-urlencoded'
) -> (str, dict):
original_content_type = original_request_headers.get("Content-Type") # 'application/x-www-form-urlencoded'
if not original_content_type:
modified_content_type = "application/json"
else:
modified_content_type = original_content_type # 'application/x-www-form-urlencoded'
request_content = ""
current_ts = str(int(request_timestamp.timestamp() * 1000))
request_path = str(original_request_url).replace(base_uri, "/")
request_body = modified_request.content.decode("utf-8")
request_body = original_request_content
new_content = original_request_content
if request_body and "multipart" not in original_content_type:
modified_content_type = normalize_content_type(original_content_type)
request_content = request_body.strip()
Expand All @@ -84,23 +83,21 @@ async def encrypt_request(
iv_hex = md5_hex_digest(current_ts, False)
if key_hex and iv_hex:
new_content = encrypt_aes_cbc_pkcs5_padding(request_content, key_hex, iv_hex).encode('utf-8')
# Update the request content
update_request_with_content(modified_request, new_content)

modified_request.headers["User-Agent"] = "okhttp/3.14.9"
modified_request.headers["Content-Type"] = f"{modified_content_type};charset=utf-8"
modified_request.headers["Accept"] = "application/json"
modified_request.headers["Accept-Encoding"] = "gzip"
original_request_headers["User-Agent"] = "okhttp/3.14.9"
original_request_headers["Content-Type"] = f"{modified_content_type};charset=utf-8"
original_request_headers["Accept"] = "application/json"
original_request_headers["Accept-Encoding"] = "gzip"

modified_request.headers["REGION"] = region
original_request_headers["REGION"] = region

modified_request.headers["APP-SEND-DATE"] = current_ts
modified_request.headers["APP-CONTENT-ENCRYPTED"] = "1"
modified_request.headers["tenant-id"] = tenant_id
modified_request.headers["User-Type"] = "app"
modified_request.headers["APP-LANGUAGE-TYPE"] = "en"
original_request_headers["APP-SEND-DATE"] = current_ts
original_request_headers["APP-CONTENT-ENCRYPTED"] = "1"
original_request_headers["tenant-id"] = tenant_id
original_request_headers["User-Type"] = "app"
original_request_headers["APP-LANGUAGE-TYPE"] = "en"
if user_token:
modified_request.headers["blade-auth"] = user_token
original_request_headers["blade-auth"] = user_token
app_verification_string = get_app_verification_string(
class_name,
request_path,
Expand All @@ -110,20 +107,27 @@ async def encrypt_request(
request_content,
user_token
)
modified_request.headers["ORIGINAL-CONTENT-TYPE"] = modified_content_type
modified_request.headers["APP-VERIFICATION-STRING"] = app_verification_string
original_request_headers["ORIGINAL-CONTENT-TYPE"] = modified_content_type
original_request_headers["APP-VERIFICATION-STRING"] = app_verification_string
return new_content, original_request_headers


async def decrypt_request(req: Request, base_uri: str):
def decrypt_request(
*,
original_request_url: str,
original_request_headers: dict,
original_request_content: str,
base_uri: str,
) -> bytes:
charset = 'utf-8'
req_content = (await req.aread()).decode(charset).strip()
req_content = original_request_content.strip()
if req_content:
app_send_date = req.headers.get("APP-SEND-DATE")
original_content_type = req.headers.get("ORIGINAL-CONTENT-TYPE")
app_send_date = original_request_headers.get("APP-SEND-DATE")
original_content_type = original_request_headers.get("ORIGINAL-CONTENT-TYPE")
if app_send_date and original_content_type:
tenant_id = req.headers['tenant-id']
user_token = req.headers.get('blade-auth', '')
request_path = str(req.url).replace(base_uri, "/")
tenant_id = original_request_headers['tenant-id']
user_token = original_request_headers.get('blade-auth', '')
request_path = original_request_url.replace(base_uri, "/")
key = md5_hex_digest(
md5_hex_digest(
request_path + tenant_id + user_token + "app",
Expand All @@ -135,23 +139,26 @@ async def decrypt_request(req: Request, base_uri: str):
decrypted = decrypt_aes_cbc_pkcs5_padding(req_content, key, iv)
if decrypted:
return decrypted.encode(charset)
return req_content


async def decrypt_response(resp: Response):
if resp.is_success:
charset = resp.encoding
resp_content = (await resp.aread()).decode(charset).strip()
if resp_content:
app_send_date = resp.headers.get("APP-SEND-DATE")
original_content_type = resp.headers.get("ORIGINAL-CONTENT-TYPE")
if app_send_date and original_content_type:
original_response_key = app_send_date + "1" + original_content_type
key = md5_hex_digest(original_response_key, False) if len(original_response_key) > 0 else ""
iv = md5_hex_digest(app_send_date, False)
decrypted = decrypt_aes_cbc_pkcs5_padding(resp_content, key, iv)
if decrypted:
resp._content = decrypted.encode(charset)
resp.headers["Content-Length"] = str(len(resp._content))
resp.headers["Content-Type"] = original_content_type
return resp
return original_request_content.encode(charset)


def decrypt_response(
*,
original_response_content: str,
original_response_headers: dict,
original_response_charset: str,
) -> (bytes, dict):
resp_content = original_response_content.strip()
if resp_content:
app_send_date = original_response_headers.get("APP-SEND-DATE")
original_content_type = original_response_headers.get("ORIGINAL-CONTENT-TYPE")
if app_send_date and original_content_type:
original_response_key = app_send_date + "1" + original_content_type
key = md5_hex_digest(original_response_key, False) if len(original_response_key) > 0 else ""
iv = md5_hex_digest(app_send_date, False)
decrypted = decrypt_aes_cbc_pkcs5_padding(resp_content, key, iv)
if decrypted:
resp_content = decrypted
original_response_headers["Content-Type"] = original_content_type

return resp_content.encode(original_response_charset), original_response_headers
68 changes: 68 additions & 0 deletions src/saic_ismart_client_ng/net/httpx/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from datetime import datetime
from typing import Union

import httpx
from httpx import Request, Response
from httpx._content import encode_request

from saic_ismart_client_ng.net.crypto import encrypt_request, decrypt_request, decrypt_response


async def encrypt_httpx_request(
*,
modified_request: Request,
request_timestamp: datetime,
base_uri: str,
region: str,
tenant_id: str,
user_token: str = "",
class_name: str = "",
):
new_content, new_headers = encrypt_request(
original_request_url=str(modified_request.url),
original_request_headers=modified_request.headers,
original_request_content=modified_request.content.decode("utf-8"),
request_timestamp=request_timestamp,
base_uri=base_uri,
region=region,
tenant_id=tenant_id,
user_token=user_token,
class_name=class_name
)
update_httpx_request_with_content(modified_request, new_content)
modified_request.headers.update(new_headers)


async def decrypt_httpx_request(req: Request, base_uri: str):
charset = 'utf-8'
req_content = (await req.aread()).decode(charset).strip()
if req_content:
return decrypt_request(
original_request_url=str(req.url),
original_request_headers=req.headers,
original_request_content=req_content,
base_uri=base_uri
)
return req_content


async def decrypt_httpx_response(resp: Response):
if resp.is_success:
charset = resp.encoding
resp_content = (await resp.aread()).decode(charset).strip()
if resp_content:
new_resp_content, new_resp_headers = decrypt_response(
original_response_content=resp_content,
original_response_headers=resp.headers,
original_response_charset=charset
)
update_httpx_request_with_content(resp, new_resp_content)
resp.headers.update(new_resp_headers)
return resp


def update_httpx_request_with_content(modified_request: Union[httpx.Request, httpx.Response], new_content: bytes):
recomputed_headers, recomputed_stream = encode_request(content=new_content)
modified_request.stream = recomputed_stream
modified_request._content = new_content
modified_request.headers.update(recomputed_headers)
11 changes: 0 additions & 11 deletions src/saic_ismart_client_ng/net/utils.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,3 @@
import httpx
from httpx._content import encode_request


def update_request_with_content(modified_request: httpx.Request, new_content: bytes):
recomputed_headers, recomputed_stream = encode_request(content=new_content)
modified_request.stream = recomputed_stream
modified_request._content = new_content
modified_request.headers.update(recomputed_headers)


def normalize_content_type(original_content_type: str):
if 'multipart' in original_content_type:
return 'multipart/form-data'
Expand Down
37 changes: 14 additions & 23 deletions tests/security_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
import httpx
import pytest

from saic_ismart_client_ng.net import security
from saic_ismart_client_ng.net.crypto import get_app_verification_string
from saic_ismart_client_ng.net.httpx import encrypt_httpx_request, decrypt_httpx_request


def test_get_app_verification_string_valid():
Expand All @@ -17,8 +18,8 @@ def test_get_app_verification_string_valid():
request_content = '{"key": "value"}'
user_token = 'dummy_token'

result = security.get_app_verification_string(clazz_simple_name, request_path, current_ts, tenant_id,
content_type, request_content, user_token)
result = get_app_verification_string(clazz_simple_name, request_path, current_ts, tenant_id,
content_type, request_content, user_token)

assert 'afd4eaf98af2d964f8ea840fc144ee7bae95dbeeeb251d5e3a01371442f92eeb' == result

Expand All @@ -38,7 +39,7 @@ async def test_a_request_should_encrypt_properly():
original_request_content = original_request.content.decode('utf-8').strip()
region = 'EU'
tenant_id = '2559'
computed_verification_string = security.get_app_verification_string(
computed_verification_string = get_app_verification_string(
"",
"/with/path?vin=zevin",
str(int(ts.timestamp() * 1000)),
Expand All @@ -47,13 +48,8 @@ async def test_a_request_should_encrypt_properly():
original_request_content, ''
)

await security.encrypt_request(
modified_request=original_request,
request_timestamp=ts,
base_uri=base_uri,
region=region,
tenant_id=tenant_id,
)
await encrypt_httpx_request(modified_request=original_request, request_timestamp=ts, base_uri=base_uri,
region=region, tenant_id=tenant_id)
assert original_request != None

Check failure on line 53 in tests/security_test.py

View workflow job for this annotation

GitHub Actions / build (3.11)

Ruff (E711)

tests/security_test.py:53:32: E711 Comparison to `None` should be `cond is not None`

Check failure on line 53 in tests/security_test.py

View workflow job for this annotation

GitHub Actions / build (3.12)

Ruff (E711)

tests/security_test.py:53:32: E711 Comparison to `None` should be `cond is not None`
assert region == original_request.headers['REGION']
assert tenant_id == original_request.headers['tenant-id']
Expand All @@ -78,14 +74,9 @@ async def test_a_request_should_decrypt_properly():
region = 'EU'
tenant_id = '2559'

await security.encrypt_request(
modified_request=original_request,
request_timestamp=ts,
base_uri=base_uri,
region=region,
tenant_id=tenant_id,
)
decrypted = await security.decrypt_request(original_request, base_uri=base_uri)
await encrypt_httpx_request(modified_request=original_request, request_timestamp=ts, base_uri=base_uri,
region=region, tenant_id=tenant_id)
decrypted = await decrypt_httpx_request(original_request, base_uri=base_uri)

assert decrypted != None

Check failure on line 81 in tests/security_test.py

View workflow job for this annotation

GitHub Actions / build (3.11)

Ruff (E711)

tests/security_test.py:81:25: E711 Comparison to `None` should be `cond is not None`

Check failure on line 81 in tests/security_test.py

View workflow job for this annotation

GitHub Actions / build (3.12)

Ruff (E711)

tests/security_test.py:81:25: E711 Comparison to `None` should be `cond is not None`
decrypted_json = json.loads(decrypted)
Expand All @@ -101,8 +92,8 @@ def test_with_empty_request_path():
request_content = '{"key": "value"}'
user_token = 'dummy_token'

result = security.get_app_verification_string(clazz_simple_name, request_path, current_ts, tenant_id,
content_type, request_content, user_token)
result = get_app_verification_string(clazz_simple_name, request_path, current_ts, tenant_id,
content_type, request_content, user_token)
assert 'ff8cb13ebcce5958e7fbfe602716c653fd72ce78842be87b6d50dccede198735' == result


Expand All @@ -115,8 +106,8 @@ def test_with_no_request_content():
request_content = ''
user_token = 'dummy_token'

result = security.get_app_verification_string(clazz_simple_name, request_path, current_ts, tenant_id,
content_type, request_content, user_token)
result = get_app_verification_string(clazz_simple_name, request_path, current_ts, tenant_id,
content_type, request_content, user_token)
assert '332c85836aa9afc864282436a740eb2cc778fafd1fea74dd887c1f8de5056de0' == result


Expand Down

0 comments on commit 5e9657d

Please sign in to comment.