-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmqtt.py
59 lines (53 loc) · 1.75 KB
/
mqtt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import json
import threading
import paho.mqtt.client as mqtt
import config
class Client:
    """MQTT client as context manager.

    Entering the context connects to the broker named in ``config`` and starts
    the paho network loop; leaving it stops the loop and disconnects.  When
    constructed with ``enable=False`` no paho client is created and
    :meth:`info` only prints the message, so callers can keep a single code
    path whether alerting is switched on or off.
    """

    def __init__(self, enable=True):
        """Prepare (but do not connect) the MQTT client.

        Args:
            enable: When false, skip all MQTT setup; the instance is then a
                no-op context manager whose ``info`` only prints.
        """
        self.enable = enable
        # Define every attribute up front so a disabled client is still a
        # fully-formed object instead of raising AttributeError on access.
        self.client = None
        self.mid = None  # message id of the most recently confirmed publish
        self.cv = threading.Condition()  # guards/signals changes of self.mid
        if not enable:
            return

        self.client = mqtt.Client()
        self.client.enable_logger()
        self.client.tls_set(ca_certs=config.mqtt_certs)
        self.client.username_pw_set(config.mqtt_user, config.mqtt_pass)
        self.client.on_publish = self.__handle_publish

    def __enter__(self):
        """Connect to the configured broker and start the network loop.

        Returns:
            This ``Client`` instance, for use as the ``with ... as`` target.
        """
        if self.enable:
            self.client.connect(config.mqtt_broker, port=config.mqtt_port)
            self.client.loop_start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Stop the network loop and disconnect from the broker.

        Returns ``None``, so an exception raised inside the ``with`` body is
        never suppressed.
        """
        if self.enable:
            try:
                self.client.loop_stop()
            finally:
                # Still drop the connection if stopping the loop failed.
                self.client.disconnect()

    def __handle_publish(self, client, userdata, mid):
        """``on_publish`` callback: record ``mid`` and wake a waiting publisher."""
        with self.cv:
            self.mid = mid
            self.cv.notify()

    def __publish(self, msg, level, timeout=1):
        """
        Publish MQTT message with timeout

        The message is wrapped in a JSON object with the keys ``level``,
        ``component`` and ``msg`` and sent to the ``/voc/alert`` topic.  The
        call then blocks until the ``on_publish`` callback reports the same
        message id or ``timeout`` seconds have passed.  Failures are only
        printed, never raised.

        Args:
            msg: Text of the alert.
            level: Value for the ``level`` field (``info`` passes ``"info"``).
            timeout: Seconds to wait for the publish confirmation.
        """
        alert = {
            "level": level,
            "component": "transcoding/" + config.mqtt_host,
            "msg": msg,
        }
        res = self.client.publish("/voc/alert", json.dumps(alert))
        if res.rc != mqtt.MQTT_ERR_SUCCESS:
            print("MQTT publish failed with", res.rc)
            return

        with self.cv:
            # wait_for() evaluates the predicate before sleeping, so a
            # confirmation that arrived before we took the lock is not missed.
            if not self.cv.wait_for(lambda: self.mid == res.mid, timeout):
                print("MQTT publish timeout")

    def info(self, msg):
        """Print ``msg`` and, if MQTT is enabled, publish it with level "info"."""
        print(msg)
        if self.enable:
            self.__publish(msg, level="info")