forked from SAIC-iSmart-API/saic-python-mqtt-gateway
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmqtt_publisher.py
196 lines (171 loc) · 9.32 KB
/
mqtt_publisher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
import logging
import os
import ssl
from abc import ABC
from typing import Optional
import gmqtt
import mqtt_topics
from charging_station import ChargingStation
from configuration import Configuration
from publisher import Publisher
# Logger for this module; its level is taken from the LOG_LEVEL environment
# variable (upper-cased), falling back to INFO when the variable is unset.
LOG = logging.getLogger(__name__)
LOG.setLevel(os.environ.get('LOG_LEVEL', 'INFO').upper())

# Logger named after the gmqtt package; its level is taken from MQTT_LOG_LEVEL
# in the same way, so MQTT client logging can be tuned independently.
MQTT_LOG = logging.getLogger(gmqtt.__name__)
MQTT_LOG.setLevel(os.environ.get('MQTT_LOG_LEVEL', 'INFO').upper())
class MqttCommandListener(ABC):
    """Interface for objects that want to be notified about MQTT commands."""

    async def on_mqtt_command_received(self, *, vin: str, topic: str, payload: str) -> None:
        """Handle a single command message.

        All arguments are keyword-only.

        Args:
            vin: Identifier of the vehicle the command is addressed to.
            topic: MQTT topic associated with the command.
            payload: Command payload as text.

        Raises:
            NotImplementedError: Always, in this base implementation;
                subclasses are expected to override the method.
        """
        message = "Should have implemented this"
        raise NotImplementedError(message)
class MqttClient(Publisher):
    """Publisher that sends values to, and receives commands from, an MQTT broker.

    The class wraps a ``gmqtt.Client``. After a successful connection it
    subscribes to the vehicle command topics (``.../set``) as well as to the
    charge-state and charger-connected topics of every configured charging
    station. Incoming commands are forwarded to ``command_listener`` when one
    has been assigned.
    """

    # Characters replaced by '_' when building the account topic prefix.
    _SPECIAL_MQTT_CHARACTERS = str.maketrans(dict.fromkeys('+#*>$', '_'))

    def __init__(self, configuration: Configuration):
        """Create the underlying gmqtt client; no network I/O happens here.

        Args:
            configuration: Gateway configuration providing the MQTT client id,
                topic root, host, port and transport protocol.
        """
        super().__init__(configuration)
        self.configuration = configuration
        self.publisher_id = self.configuration.mqtt_client_id
        self.topic_root = configuration.mqtt_topic
        self.host = self.configuration.mqtt_host
        self.port = self.configuration.mqtt_port
        self.transport_protocol = self.configuration.mqtt_transport_protocol
        self.command_listener: Optional[MqttCommandListener] = None
        # Reverse lookups (topic -> VIN); filled in when the connection is accepted.
        self.vin_by_charge_state_topic: dict[str, str] = {}
        self.vin_by_charger_connected_topic: dict[str, str] = {}
        # Last payload seen on each charging station's charge-state topic, by VIN.
        self.last_charge_state_by_vin: dict[str, str] = {}

        mqtt_client = gmqtt.Client(
            client_id=str(self.publisher_id),
            transport=self.transport_protocol.transport_mechanism,
            logger=MQTT_LOG,
            # Last-will message: a retained 'offline' on the LWT topic.
            will_message=gmqtt.Message(
                topic=self.get_topic(mqtt_topics.INTERNAL_LWT, False),
                payload='offline',
                retain=True
            )
        )
        mqtt_client.on_connect = self.__on_connect
        mqtt_client.on_message = self.__on_message
        self.client = mqtt_client

    def get_mqtt_account_prefix(self) -> str:
        """Return ``<mqtt_topic>/<saic_user>`` with special characters replaced."""
        return MqttClient.remove_special_mqtt_characters(
            f'{self.configuration.mqtt_topic}/{self.configuration.saic_user}')

    @staticmethod
    def remove_special_mqtt_characters(input_str: str) -> str:
        """Replace each of ``+``, ``#``, ``*``, ``>`` and ``$`` with ``_``.

        Args:
            input_str: Text to sanitize.

        Returns:
            A string of the same length with the listed characters replaced.
        """
        return input_str.translate(MqttClient._SPECIAL_MQTT_CHARACTERS)

    async def connect(self):
        """Configure credentials and TLS as required, then connect to the broker.

        Credentials are only set when a user name is configured; the password
        is passed along only when it is configured as well. The connection is
        opened with MQTT protocol version 3.1.1.
        """
        if self.configuration.mqtt_user is not None:
            if self.configuration.mqtt_password is not None:
                self.client.set_auth_credentials(
                    username=self.configuration.mqtt_user,
                    password=self.configuration.mqtt_password
                )
            else:
                self.client.set_auth_credentials(
                    username=self.configuration.mqtt_user
                )

        if self.transport_protocol.with_tls:
            cert_uri = self.configuration.tls_server_cert_path
            LOG.debug(f'Configuring network encryption and authentication options for MQTT using {cert_uri}')
            # NOTE(review): per the ssl documentation, an SSLContext created
            # without a protocol argument defaults to verify_mode CERT_NONE, so
            # the certificate loaded below does not appear to be enforced.
            # Confirm the intent before switching to
            # ssl.create_default_context(cafile=cert_uri), which would enforce it.
            ssl_context = ssl.SSLContext()
            ssl_context.load_verify_locations(cafile=cert_uri)
            ssl_context.check_hostname = False
        else:
            ssl_context = None

        await self.client.connect(host=self.host, port=self.port, version=gmqtt.constants.MQTTv311, ssl=ssl_context)

    def __on_connect(self, _client, _flags, rc, _properties) -> None:
        """Handle the broker's answer to a connection attempt.

        On success, subscribes to the command topics and to the topics of all
        configured charging stations, then calls ``self.keepalive()``
        (not defined in this class; presumably inherited from ``Publisher``).

        Args:
            rc: Return code reported for the connection attempt.

        Raises:
            SystemExit: If the connection was not accepted.
        """
        if rc == gmqtt.constants.CONNACK_ACCEPTED:
            LOG.info('Connected to MQTT broker')
            mqtt_account_prefix = self.get_mqtt_account_prefix()
            self.client.subscribe(f'{mqtt_account_prefix}/{mqtt_topics.VEHICLES}/+/+/+/set')
            self.client.subscribe(f'{mqtt_account_prefix}/{mqtt_topics.VEHICLES}/+/+/+/+/set')
            self.client.subscribe(f'{mqtt_account_prefix}/{mqtt_topics.VEHICLES}/+/{mqtt_topics.REFRESH_MODE}/set')
            self.client.subscribe(f'{mqtt_account_prefix}/{mqtt_topics.VEHICLES}/+/{mqtt_topics.REFRESH_PERIOD}/+/set')
            for charging_station in self.configuration.charging_stations_by_vin.values():
                LOG.debug(f'Subscribing to MQTT topic {charging_station.charge_state_topic}')
                self.vin_by_charge_state_topic[charging_station.charge_state_topic] = charging_station.vin
                self.client.subscribe(charging_station.charge_state_topic)
                # The connected topic is optional for a charging station.
                if charging_station.connected_topic:
                    LOG.debug(f'Subscribing to MQTT topic {charging_station.connected_topic}')
                    self.vin_by_charger_connected_topic[charging_station.connected_topic] = charging_station.vin
                    self.client.subscribe(charging_station.connected_topic)
            self.keepalive()
        else:
            if rc == gmqtt.constants.CONNACK_REFUSED_BAD_USERNAME_PASSWORD:
                LOG.error(f'MQTT connection error: bad username or password. Return code {rc}')
            elif rc == gmqtt.constants.CONNACK_REFUSED_PROTOCOL_VERSION:
                LOG.error(f'MQTT connection error: refused protocol version. Return code {rc}')
            else:
                LOG.error(f'MQTT connection error.Return code {rc}')
            # The exception has to be raised; merely constructing it is a no-op.
            raise SystemExit(f'Unable to connect to MQTT broker. Return code: {rc}')

    async def __on_message(self, _client, topic, payload, _qos, _properties) -> None:
        """Normalize an incoming payload to ``str`` and dispatch the message.

        Any exception raised while handling the message is logged and
        swallowed so that it does not propagate to the caller of this callback.
        """
        try:
            if isinstance(payload, bytes):
                payload = payload.decode('utf-8')
            else:
                payload = str(payload)
            await self.__on_message_real(topic=topic, payload=payload)
        except Exception as e:
            LOG.exception(f'Error while processing MQTT message: {e}')

    async def __on_message_real(self, *, topic: str, payload: str) -> None:
        """Route a message according to the topic it arrived on.

        * Charge-state topic of a charging station: when
          ``should_force_refresh`` says so, a ``force`` command on the
          vehicle's refresh-mode ``set`` topic is handed to the listener.
        * Charger-connected topic: the connection state is only logged.
        * Any other topic: treated as a vehicle command and forwarded to the
          listener together with the VIN extracted from the topic.
        """
        if topic in self.vin_by_charge_state_topic:
            LOG.debug(f'Received message over topic {topic} with payload {payload}')
            vin = self.vin_by_charge_state_topic[topic]
            charging_station = self.configuration.charging_stations_by_vin[vin]
            if self.should_force_refresh(payload, charging_station):
                LOG.debug(f'Vehicle with vin {vin} is charging. Setting refresh mode to force')
                mqtt_account_prefix = self.get_mqtt_account_prefix()
                topic = f'{mqtt_account_prefix}/{mqtt_topics.VEHICLES}/{vin}/{mqtt_topics.REFRESH_MODE}/set'
                if self.command_listener is not None:
                    await self.command_listener.on_mqtt_command_received(vin=vin, topic=topic, payload='force')
        elif topic in self.vin_by_charger_connected_topic:
            LOG.debug(f'Received message over topic {topic} with payload {payload}')
            vin = self.vin_by_charger_connected_topic[topic]
            charging_station = self.configuration.charging_stations_by_vin[vin]
            if payload == charging_station.connected_value:
                LOG.debug(f'Vehicle with vin {vin} is connected to its charging station')
            else:
                LOG.debug(f'Vehicle with vin {vin} is disconnected from its charging station')
        else:
            vin = self.get_vin_from_topic(topic)
            if self.command_listener is not None:
                await self.command_listener.on_mqtt_command_received(vin=vin, topic=topic, payload=payload)

    def publish(self, topic: str, payload) -> None:
        """Publish ``payload`` on ``topic`` as a retained message."""
        self.client.publish(topic, payload, retain=True)

    def get_topic(self, key: str, no_prefix: bool) -> str:
        """Return ``key`` unchanged, or prefixed with the topic root.

        Args:
            key: Topic, relative to the topic root unless ``no_prefix`` is set.
            no_prefix: When true, ``key`` is used as the complete topic.
        """
        if no_prefix:
            return key
        return f'{self.topic_root}/{key}'

    def publish_json(self, key: str, data: dict, no_prefix: bool = False) -> None:
        """Publish ``data`` after converting it with ``dict_to_anonymized_json``.

        ``dict_to_anonymized_json`` is not defined in this class; presumably
        it is inherited from ``Publisher``.
        """
        payload = self.dict_to_anonymized_json(data)
        self.publish(topic=self.get_topic(key, no_prefix), payload=payload)

    def publish_str(self, key: str, value: str, no_prefix: bool = False) -> None:
        """Publish a string value under ``key``."""
        self.publish(topic=self.get_topic(key, no_prefix), payload=value)

    def publish_int(self, key: str, value: int, no_prefix: bool = False) -> None:
        """Publish an integer value under ``key``."""
        self.publish(topic=self.get_topic(key, no_prefix), payload=value)

    def publish_bool(self, key: str, value: bool | int | None, no_prefix: bool = False) -> None:
        """Publish a boolean value under ``key``.

        ``None`` is published as ``False``; an integer (or bool) is published
        as ``True`` only when it equals ``1``.
        """
        if value is None:
            value = False
        elif isinstance(value, int):
            value = value == 1
        self.publish(topic=self.get_topic(key, no_prefix), payload=value)

    def publish_float(self, key: str, value: float, no_prefix: bool = False) -> None:
        """Publish a float value under ``key``."""
        self.publish(topic=self.get_topic(key, no_prefix), payload=value)

    def get_vin_from_topic(self, topic: str) -> str:
        """Extract the VIN from a topic below the configured topic root.

        The topic root and the separator following it are cut off; the VIN is
        the third ``/``-separated element of the remainder.

        Raises:
            IndexError: If the remainder has fewer than three elements.
        """
        global_topic_removed = topic[len(self.configuration.mqtt_topic) + 1:]
        elements = global_topic_removed.split('/')
        return elements[2]

    def should_force_refresh(self, current_charging_value: str, charging_station: ChargingStation) -> bool:
        """Record the latest charge-state value and tell whether it changed.

        Args:
            current_charging_value: Payload just received on the station's
                charge-state topic.
            charging_station: Station whose ``vin`` keys the stored state.

        Returns:
            ``False`` when the previously stored value is non-empty and equal
            to the current one; ``True`` otherwise, including when no previous
            value (or an empty one) was stored.
        """
        vin = charging_station.vin
        last_charging_value = self.last_charge_state_by_vin.get(vin)
        self.last_charge_state_by_vin[vin] = current_charging_value

        if not last_charging_value:
            return True
        if last_charging_value == current_charging_value:
            LOG.debug('Last charging value equals current charging value. No refresh needed.')
            return False
        LOG.debug(f'Charging value has changed from {last_charging_value} to {current_charging_value}.')
        return True