forked from SAIC-iSmart-API/saic-python-mqtt-gateway
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpublisher.py
90 lines (75 loc) · 3.56 KB
/
publisher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import json
import re
import mqtt_topics
from configuration import Configuration
class Publisher:
def __init__(self, config: Configuration):
self.configuration = config
self.mode_by_vin = {}
self.map = {}
def publish_json(self, key: str, data: dict, no_prefix: bool = False) -> None:
self.map[key] = data
def publish_str(self, key: str, value: str, no_prefix: bool = False) -> None:
self.map[key] = value
def publish_int(self, key: str, value: int, no_prefix: bool = False) -> None:
self.map[key] = value
def publish_bool(self, key: str, value: bool | int | None, no_prefix: bool = False) -> None:
if value is None:
value = False
elif isinstance(value, int):
value = value == 1
self.map[key] = value
def publish_float(self, key: str, value: float, no_prefix: bool = False) -> None:
self.map[key] = value
def remove_byte_strings(self, data: dict) -> dict:
for key in data.keys():
if isinstance(data[key], bytes):
data[key] = str(data[key])
elif isinstance(data[key], dict):
data[key] = self.remove_byte_strings(data[key])
elif isinstance(data[key], list):
for item in data[key]:
if isinstance(item, dict):
self.remove_byte_strings(item)
return data
def anonymize(self, data: dict) -> dict:
if isinstance(data, dict):
for key in data.keys():
if isinstance(data[key], str):
match key:
case 'password':
data[key] = '******'
case 'uid' | 'email' | 'user_name' | 'account' | 'ping' | 'token' | 'access_token' | 'refreshToken' | 'refresh_token' | 'vin':
data[key] = Publisher.anonymize_str(data[key])
case 'deviceId':
data[key] = self.anonymize_device_id(data[key])
case 'seconds' | 'bindTime' | 'eventCreationTime' | 'latitude' | 'longitude':
data[key] = Publisher.anonymize_int(data[key])
case 'eventID' | 'event-id' | 'event_id' | 'eventId' | 'event_id' | 'eventID' | 'lastKeySeen':
data[key] = 9999
case 'content':
data[key] = re.sub('\\(\\*\\*\\*...\\)', '(***XXX)', data[key])
elif isinstance(data[key], dict):
data[key] = self.anonymize(data[key])
elif isinstance(data[key], (list, set, tuple)):
data[key] = [self.anonymize(item) for item in data[key]]
return data
def keepalive(self):
self.publish_str(mqtt_topics.INTERNAL_LWT, 'online', False)
@staticmethod
def anonymize_str(value: str) -> str:
r = re.sub('[a-zA-Z]', 'X', value)
return re.sub('[1-9]', '9', r)
def anonymize_device_id(self, device_id: str) -> str:
elements = device_id.split('###')
return f'{self.anonymize_str(elements[0])}###{self.anonymize_str(elements[1])}'
@staticmethod
def anonymize_int(value: int) -> int:
return int(value / 100000 * 100000)
def dict_to_anonymized_json(self, data):
no_binary_strings = self.remove_byte_strings(data)
if self.configuration.anonymized_publishing:
result = self.anonymize(no_binary_strings)
else:
result = no_binary_strings
return json.dumps(result, indent=2)