From 5e9657d318f7326dc4bef0804ef3443c13c82b24 Mon Sep 17 00:00:00 2001 From: Giovanni Condello Date: Thu, 27 Jun 2024 19:09:15 +0200 Subject: [PATCH] Split httpx code from basic data manipulation routines --- .../net/client/__init__.py | 6 +- .../net/{security.py => crypto.py} | 111 ++++++++++-------- .../net/httpx/__init__.py | 68 +++++++++++ src/saic_ismart_client_ng/net/utils.py | 11 -- tests/security_test.py | 37 +++--- 5 files changed, 144 insertions(+), 89 deletions(-) rename src/saic_ismart_client_ng/net/{security.py => crypto.py} (64%) create mode 100644 src/saic_ismart_client_ng/net/httpx/__init__.py diff --git a/src/saic_ismart_client_ng/net/client/__init__.py b/src/saic_ismart_client_ng/net/client/__init__.py index 8b35c5d..7f091ac 100644 --- a/src/saic_ismart_client_ng/net/client/__init__.py +++ b/src/saic_ismart_client_ng/net/client/__init__.py @@ -6,7 +6,7 @@ from saic_ismart_client_ng.listener import SaicApiListener from saic_ismart_client_ng.model import SaicApiConfiguration -from saic_ismart_client_ng.net.security import decrypt_response, encrypt_request +from saic_ismart_client_ng.net.httpx import decrypt_httpx_response, encrypt_httpx_request class AbstractSaicClient(ABC): @@ -24,7 +24,7 @@ def __init__( self.__client = httpx.AsyncClient( event_hooks={ "request": [self.invoke_request_listener, self.encrypt_request], - "response": [decrypt_response, self.invoke_response_listener] + "response": [decrypt_httpx_response, self.invoke_response_listener] } ) @@ -84,7 +84,7 @@ async def invoke_response_listener(self, response: httpx.Response): self.__logger.warning(f"Error invoking request listener: {e}", exc_info=e) async def encrypt_request(self, modified_request: httpx.Request): - return await encrypt_request( + return await encrypt_httpx_request( modified_request=modified_request, request_timestamp=datetime.now(), base_uri=self.configuration.base_uri, diff --git a/src/saic_ismart_client_ng/net/security.py b/src/saic_ismart_client_ng/net/crypto.py similarity index 64% rename from src/saic_ismart_client_ng/net/security.py rename to src/saic_ismart_client_ng/net/crypto.py index f88fdae..b7fb263 100644 --- a/src/saic_ismart_client_ng/net/security.py +++ b/src/saic_ismart_client_ng/net/crypto.py @@ -2,13 +2,10 @@ import hmac import logging from datetime import datetime -from typing import Optional - -from httpx import Response, Request from saic_ismart_client_ng.crypto_utils import md5_hex_digest, encrypt_aes_cbc_pkcs5_padding, \ decrypt_aes_cbc_pkcs5_padding -from saic_ismart_client_ng.net.utils import update_request_with_content, normalize_content_type +from saic_ismart_client_ng.net.utils import normalize_content_type logger = logging.getLogger(__name__) @@ -50,18 +47,19 @@ def get_app_verification_string( return "" -async def encrypt_request( +def encrypt_request( *, - modified_request: Request, + original_request_url: str, + original_request_headers: dict, + original_request_content: str, request_timestamp: datetime, base_uri: str, region: str, tenant_id: str, user_token: str = "", class_name: str = "", -): - original_request_url = modified_request.url - original_content_type = modified_request.headers.get("Content-Type") # 'application/x-www-form-urlencoded' +) -> (str, dict): + original_content_type = original_request_headers.get("Content-Type") # 'application/x-www-form-urlencoded' if not original_content_type: modified_content_type = "application/json" else: @@ -69,7 +67,8 @@ async def encrypt_request( request_content = "" current_ts = str(int(request_timestamp.timestamp() * 1000)) request_path = str(original_request_url).replace(base_uri, "/") - request_body = modified_request.content.decode("utf-8") + request_body = original_request_content + new_content = original_request_content if request_body and "multipart" not in original_content_type: modified_content_type = normalize_content_type(original_content_type) request_content = request_body.strip() @@ -84,23 +83,21 @@ async def encrypt_request( iv_hex = md5_hex_digest(current_ts, False) if key_hex and iv_hex: new_content = encrypt_aes_cbc_pkcs5_padding(request_content, key_hex, iv_hex).encode('utf-8') - # Update the request content - update_request_with_content(modified_request, new_content) - modified_request.headers["User-Agent"] = "okhttp/3.14.9" - modified_request.headers["Content-Type"] = f"{modified_content_type};charset=utf-8" - modified_request.headers["Accept"] = "application/json" - modified_request.headers["Accept-Encoding"] = "gzip" + original_request_headers["User-Agent"] = "okhttp/3.14.9" + original_request_headers["Content-Type"] = f"{modified_content_type};charset=utf-8" + original_request_headers["Accept"] = "application/json" + original_request_headers["Accept-Encoding"] = "gzip" - modified_request.headers["REGION"] = region + original_request_headers["REGION"] = region - modified_request.headers["APP-SEND-DATE"] = current_ts - modified_request.headers["APP-CONTENT-ENCRYPTED"] = "1" - modified_request.headers["tenant-id"] = tenant_id - modified_request.headers["User-Type"] = "app" - modified_request.headers["APP-LANGUAGE-TYPE"] = "en" + original_request_headers["APP-SEND-DATE"] = current_ts + original_request_headers["APP-CONTENT-ENCRYPTED"] = "1" + original_request_headers["tenant-id"] = tenant_id + original_request_headers["User-Type"] = "app" + original_request_headers["APP-LANGUAGE-TYPE"] = "en" if user_token: - modified_request.headers["blade-auth"] = user_token + original_request_headers["blade-auth"] = user_token app_verification_string = get_app_verification_string( class_name, request_path, @@ -110,20 +107,27 @@ async def encrypt_request( request_content, user_token ) - modified_request.headers["ORIGINAL-CONTENT-TYPE"] = modified_content_type - modified_request.headers["APP-VERIFICATION-STRING"] = app_verification_string + original_request_headers["ORIGINAL-CONTENT-TYPE"] = modified_content_type + original_request_headers["APP-VERIFICATION-STRING"] = app_verification_string + return new_content, original_request_headers -async def decrypt_request(req: Request, base_uri: str): +def decrypt_request( + *, + original_request_url: str, + original_request_headers: dict, + original_request_content: str, + base_uri: str, +) -> bytes: charset = 'utf-8' - req_content = (await req.aread()).decode(charset).strip() + req_content = original_request_content.strip() if req_content: - app_send_date = req.headers.get("APP-SEND-DATE") - original_content_type = req.headers.get("ORIGINAL-CONTENT-TYPE") + app_send_date = original_request_headers.get("APP-SEND-DATE") + original_content_type = original_request_headers.get("ORIGINAL-CONTENT-TYPE") if app_send_date and original_content_type: - tenant_id = req.headers['tenant-id'] - user_token = req.headers.get('blade-auth', '') - request_path = str(req.url).replace(base_uri, "/") + tenant_id = original_request_headers['tenant-id'] + user_token = original_request_headers.get('blade-auth', '') + request_path = original_request_url.replace(base_uri, "/") key = md5_hex_digest( md5_hex_digest( request_path + tenant_id + user_token + "app", @@ -135,23 +139,26 @@ async def decrypt_request(req: Request, base_uri: str): decrypted = decrypt_aes_cbc_pkcs5_padding(req_content, key, iv) if decrypted: return decrypted.encode(charset) - return req_content - - -async def decrypt_response(resp: Response): - if resp.is_success: - charset = resp.encoding - resp_content = (await resp.aread()).decode(charset).strip() - if resp_content: - app_send_date = resp.headers.get("APP-SEND-DATE") - original_content_type = resp.headers.get("ORIGINAL-CONTENT-TYPE") - if app_send_date and original_content_type: - original_response_key = app_send_date + "1" + original_content_type - key = md5_hex_digest(original_response_key, False) if len(original_response_key) > 0 else "" - iv = md5_hex_digest(app_send_date, False) - decrypted = decrypt_aes_cbc_pkcs5_padding(resp_content, key, iv) - if decrypted: - resp._content = decrypted.encode(charset) - resp.headers["Content-Length"] = str(len(resp._content)) - resp.headers["Content-Type"] = original_content_type - return resp + return original_request_content.encode(charset) + + +def decrypt_response( + *, + original_response_content: str, + original_response_headers: dict, + original_response_charset: str, +) -> (bytes, dict): + resp_content = original_response_content.strip() + if resp_content: + app_send_date = original_response_headers.get("APP-SEND-DATE") + original_content_type = original_response_headers.get("ORIGINAL-CONTENT-TYPE") + if app_send_date and original_content_type: + original_response_key = app_send_date + "1" + original_content_type + key = md5_hex_digest(original_response_key, False) if len(original_response_key) > 0 else "" + iv = md5_hex_digest(app_send_date, False) + decrypted = decrypt_aes_cbc_pkcs5_padding(resp_content, key, iv) + if decrypted: + resp_content = decrypted + original_response_headers["Content-Type"] = original_content_type + + return resp_content.encode(original_response_charset), original_response_headers diff --git a/src/saic_ismart_client_ng/net/httpx/__init__.py b/src/saic_ismart_client_ng/net/httpx/__init__.py new file mode 100644 index 0000000..3e7e8be --- /dev/null +++ b/src/saic_ismart_client_ng/net/httpx/__init__.py @@ -0,0 +1,68 @@ +from datetime import datetime +from typing import Union + +import httpx +from httpx import Request, Response +from httpx._content import encode_request + +from saic_ismart_client_ng.net.crypto import encrypt_request, decrypt_request, decrypt_response + + +async def encrypt_httpx_request( + *, + modified_request: Request, + request_timestamp: datetime, + base_uri: str, + region: str, + tenant_id: str, + user_token: str = "", + class_name: str = "", +): + new_content, new_headers = encrypt_request( + original_request_url=str(modified_request.url), + original_request_headers=modified_request.headers, + original_request_content=modified_request.content.decode("utf-8"), + request_timestamp=request_timestamp, + base_uri=base_uri, + region=region, + tenant_id=tenant_id, + user_token=user_token, + class_name=class_name + ) + update_httpx_request_with_content(modified_request, new_content) + modified_request.headers.update(new_headers) + + +async def decrypt_httpx_request(req: Request, base_uri: str): + charset = 'utf-8' + req_content = (await req.aread()).decode(charset).strip() + if req_content: + return decrypt_request( + original_request_url=str(req.url), + original_request_headers=req.headers, + original_request_content=req_content, + base_uri=base_uri + ) + return req_content + + +async def decrypt_httpx_response(resp: Response): + if resp.is_success: + charset = resp.encoding + resp_content = (await resp.aread()).decode(charset).strip() + if resp_content: + new_resp_content, new_resp_headers = decrypt_response( + original_response_content=resp_content, + original_response_headers=resp.headers, + original_response_charset=charset + ) + update_httpx_request_with_content(resp, new_resp_content) + resp.headers.update(new_resp_headers) + return resp + + +def update_httpx_request_with_content(modified_request: Union[httpx.Request, httpx.Response], new_content: bytes): + recomputed_headers, recomputed_stream = encode_request(content=new_content) + modified_request.stream = recomputed_stream + modified_request._content = new_content + modified_request.headers.update(recomputed_headers) diff --git a/src/saic_ismart_client_ng/net/utils.py b/src/saic_ismart_client_ng/net/utils.py index 87ae287..ba20a63 100644 --- a/src/saic_ismart_client_ng/net/utils.py +++ b/src/saic_ismart_client_ng/net/utils.py @@ -1,14 +1,3 @@ -import httpx -from httpx._content import encode_request - - -def update_request_with_content(modified_request: httpx.Request, new_content: bytes): - recomputed_headers, recomputed_stream = encode_request(content=new_content) - modified_request.stream = recomputed_stream - modified_request._content = new_content - modified_request.headers.update(recomputed_headers) - - def normalize_content_type(original_content_type: str): if 'multipart' in original_content_type: return 'multipart/form-data' diff --git a/tests/security_test.py b/tests/security_test.py index 8678e1b..c24327d 100644 --- a/tests/security_test.py +++ b/tests/security_test.py @@ -5,7 +5,8 @@ import httpx import pytest -from saic_ismart_client_ng.net import security +from saic_ismart_client_ng.net.crypto import get_app_verification_string +from saic_ismart_client_ng.net.httpx import encrypt_httpx_request, decrypt_httpx_request def test_get_app_verification_string_valid(): @@ -17,8 +18,8 @@ def test_get_app_verification_string_valid(): request_content = '{"key": "value"}' user_token = 'dummy_token' - result = security.get_app_verification_string(clazz_simple_name, request_path, current_ts, tenant_id, - content_type, request_content, user_token) + result = get_app_verification_string(clazz_simple_name, request_path, current_ts, tenant_id, + content_type, request_content, user_token) assert 'afd4eaf98af2d964f8ea840fc144ee7bae95dbeeeb251d5e3a01371442f92eeb' == result @@ -38,7 +39,7 @@ async def test_a_request_should_encrypt_properly(): original_request_content = original_request.content.decode('utf-8').strip() region = 'EU' tenant_id = '2559' - computed_verification_string = security.get_app_verification_string( + computed_verification_string = get_app_verification_string( "", "/with/path?vin=zevin", str(int(ts.timestamp() * 1000)), @@ -47,13 +48,8 @@ async def test_a_request_should_encrypt_properly(): original_request_content, '' ) - await security.encrypt_request( - modified_request=original_request, - request_timestamp=ts, - base_uri=base_uri, - region=region, - tenant_id=tenant_id, - ) + await encrypt_httpx_request(modified_request=original_request, request_timestamp=ts, base_uri=base_uri, + region=region, tenant_id=tenant_id) assert original_request != None assert region == original_request.headers['REGION'] assert tenant_id == original_request.headers['tenant-id'] @@ -78,14 +74,9 @@ async def test_a_request_should_decrypt_properly(): region = 'EU' tenant_id = '2559' - await security.encrypt_request( - modified_request=original_request, - request_timestamp=ts, - base_uri=base_uri, - region=region, - tenant_id=tenant_id, - ) - decrypted = await security.decrypt_request(original_request, base_uri=base_uri) + await encrypt_httpx_request(modified_request=original_request, request_timestamp=ts, base_uri=base_uri, + region=region, tenant_id=tenant_id) + decrypted = await decrypt_httpx_request(original_request, base_uri=base_uri) assert decrypted != None decrypted_json = json.loads(decrypted) @@ -101,8 +92,8 @@ def test_with_empty_request_path(): request_content = '{"key": "value"}' user_token = 'dummy_token' - result = security.get_app_verification_string(clazz_simple_name, request_path, current_ts, tenant_id, - content_type, request_content, user_token) + result = get_app_verification_string(clazz_simple_name, request_path, current_ts, tenant_id, + content_type, request_content, user_token) assert 'ff8cb13ebcce5958e7fbfe602716c653fd72ce78842be87b6d50dccede198735' == result @@ -115,8 +106,8 @@ def test_with_no_request_content(): request_content = '' user_token = 'dummy_token' - result = security.get_app_verification_string(clazz_simple_name, request_path, current_ts, tenant_id, - content_type, request_content, user_token) + result = get_app_verification_string(clazz_simple_name, request_path, current_ts, tenant_id, + content_type, request_content, user_token) assert '332c85836aa9afc864282436a740eb2cc778fafd1fea74dd887c1f8de5056de0' == result