-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmqtt.go
161 lines (138 loc) · 3.91 KB
/
mqtt.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
//go:generate go run ./helpers/
package hass_mqtt
import (
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/iancoleman/strcase"
"github.com/kjbreil/hass-mqtt/device"
"github.com/kjbreil/hass-mqtt/entities"
"log/slog"
"os"
"path/filepath"
"time"
)
// Client ties a set of Home Assistant devices to one MQTT broker connection.
// Build one with NewClient or NewClientWithLogger, register devices with Add,
// then call Connect.
type Client struct {
// NodeID is derived from Config.NodeID by NewClientWithLogger. Connect uses
// it as the MQTT client ID and copies it into entities.NodeID.
NodeID string
// config is the configuration the client was constructed with.
config Config
// OnConnect, when non-nil, is called from the connect handler installed by
// Connect, before the registered devices are subscribed.
OnConnect func(client mqtt.Client)
devices map[string]*device.Device // Device Identifiers map to devices
// mqtt is the underlying MQTT client. It is only assigned by Connect, so it
// is nil on a freshly constructed Client.
mqtt mqtt.Client
// logger receives the client's log output and is exposed through Logger.
logger *slog.Logger
}
// Config holds the settings a Client is built from. It can be decoded from
// JSON or YAML using the struct tags below.
type Config struct {
// NodeID is the raw node identifier; NewClientWithLogger normalizes it
// before storing it on the Client.
NodeID string `json:"node_id" yaml:"node_id"`
// MQTT groups the broker connection settings.
// NOTE(review): the JSON key is upper-case "MQTT" while the YAML key is
// lower-case "mqtt" — confirm this asymmetry is intended.
MQTT struct {
// Host and Port identify the broker and are combined into the broker URL.
Host string `json:"host" yaml:"host"`
Port int `json:"port" yaml:"port"`
// SSL presumably requests an encrypted broker connection — confirm
// against Connect.
SSL bool `json:"ssl" yaml:"ssl"`
// UserName and Password are optional broker credentials; a nil pointer
// means the value was not provided.
UserName *string `json:"user_name,omitempty" yaml:"user_name"`
Password *string `json:"password,omitempty" yaml:"password"`
} `json:"MQTT" yaml:"mqtt"`
}
// NewClient builds a Client from c that logs through the package's default
// logger (see defaultLogger). It is shorthand for calling NewClientWithLogger
// with that logger and returns whatever NewClientWithLogger returns.
func NewClient(c Config) (*Client, error) {
return NewClientWithLogger(c, defaultLogger())
}
// NewClientWithLogger builds a Client from c that logs through logger.
//
// The client's NodeID is c.NodeID passed through strcase.ToDelimited with '-'
// as the delimiter. A nil logger is replaced by the package default so that
// later log calls never dereference a nil *slog.Logger. No broker connection
// is made here; call Connect for that.
//
// The returned error is currently always nil.
func NewClientWithLogger(c Config, logger *slog.Logger) (*Client, error) {
	// TODO: Validate Config
	if logger == nil {
		logger = defaultLogger()
	}

	return &Client{
		NodeID:  strcase.ToDelimited(c.NodeID, '-'),
		config:  c,
		devices: make(map[string]*device.Device),
		logger:  logger,
	}, nil
}
// Logger returns the logger the client was constructed with.
func (c *Client) Logger() *slog.Logger {
return c.logger
}
// ErrNoDeviceFound is a sentinel error describing a device lookup that
// matched nothing.
// NOTE(review): nothing visible in this file returns it; Get reports a miss
// by returning nil.
var ErrNoDeviceFound = fmt.Errorf("no device found")
// Connect initializes every registered device, builds the MQTT client from
// the stored Config and waits for the connection attempt to finish.
//
// The broker URL uses the "ssl" scheme when Config.MQTT.SSL is set and "tcp"
// otherwise. Config.MQTT.UserName and Config.MQTT.Password are passed to the
// broker as credentials when they are non-nil.
//
// On every successful (re)connect the OnConnect callback, if set, is invoked
// and then every registered device is subscribed.
//
// Side effects beyond the connection itself: entities.NodeID and the paho
// package-level loggers mqtt.ERROR, mqtt.CRITICAL and mqtt.DEBUG are
// overwritten, so they are shared by every Client in the process.
//
// It returns the first device initialization error or the connection error,
// each wrapped with context.
func (c *Client) Connect() error {
	entities.NodeID = c.NodeID

	for id, d := range c.devices {
		if err := d.Initialize(); err != nil {
			return fmt.Errorf("initializing device %s: %w", id, err)
		}
	}

	// Honor the SSL flag instead of always dialing plain TCP.
	scheme := "tcp"
	if c.config.MQTT.SSL {
		scheme = "ssl"
	}
	broker := fmt.Sprintf("%s://%s:%d", scheme, c.config.MQTT.Host, c.config.MQTT.Port)

	opts := mqtt.NewClientOptions()
	opts.AddBroker(broker)
	opts.SetClientID(c.NodeID)
	opts.SetOrderMatters(false)
	opts.SetKeepAlive(30 * time.Second)

	// Credentials are optional; only send the ones that were configured.
	if user := c.config.MQTT.UserName; user != nil {
		opts.SetUsername(*user)
	}
	if pass := c.config.MQTT.Password; pass != nil {
		opts.SetPassword(*pass)
	}

	opts.SetDefaultPublishHandler(func(_ mqtt.Client, msg mqtt.Message) {
		c.logger.Info(fmt.Sprintf("Received message on Default Handler: %s from topic: %s", msg.Payload(), filepath.Base(msg.Topic())))
	})

	// Route paho's package-level loggers through the client's logger. slog
	// has no level above Error, so CRITICAL is reported at Error as well.
	mqtt.ERROR = newMQTTLog(c.logger, slog.LevelError)
	mqtt.CRITICAL = newMQTTLog(c.logger, slog.LevelError)
	// mqtt.WARN = newMQTTLog(c.logger, slog.LevelWarn)
	mqtt.DEBUG = newMQTTLog(c.logger, slog.LevelDebug)

	// NOTE(review): this handler ranges over c.devices, presumably on a
	// goroutine owned by the MQTT library, while Add may write to the same
	// map; nothing here guards that access.
	opts.SetOnConnectHandler(func(client mqtt.Client) {
		if c.OnConnect != nil {
			c.OnConnect(client)
		}
		for _, d := range c.devices {
			d.Subscribe()
		}
	})
	opts.SetConnectionLostHandler(func(_ mqtt.Client, err error) {
		// Include the cause; without it a dropped connection is undiagnosable.
		c.logger.Warn("MQTT connection lost", "error", err)
	})

	c.mqtt = mqtt.NewClient(opts)
	for _, d := range c.devices {
		d.SetMQTTFields(c.mqtt)
	}

	token := c.mqtt.Connect()
	token.Wait()
	if err := token.Error(); err != nil {
		return fmt.Errorf("connecting to MQTT broker %s: %w", broker, err)
	}
	return nil
}
// Add registers dev with the client under its unique ID.
//
// When the client is already connected, the device is initialized, bound to
// the MQTT client and subscribed immediately; otherwise that work is left to
// Connect. Calling Add before Connect is safe.
//
// It returns an error if dev is nil, if a device with the same unique ID is
// already registered, or if initializing the device fails. In the last case
// the device is removed again so it is not left half-registered.
func (c *Client) Add(dev *device.Device) error {
	if dev == nil {
		return fmt.Errorf("cannot add nil device")
	}

	id := dev.GetUniqueId()
	if _, exists := c.devices[id]; exists {
		return fmt.Errorf("%s entity already exists", id)
	}
	c.devices[id] = dev

	// c.mqtt is only assigned by Connect; calling IsConnected on the nil
	// interface would panic, so treat "no client yet" as "not connected".
	if c.mqtt == nil || !c.mqtt.IsConnected() {
		return nil
	}

	if err := dev.Initialize(); err != nil {
		delete(c.devices, id)
		return fmt.Errorf("initializing device %s: %w", id, err)
	}
	dev.SetMQTTFields(c.mqtt)
	dev.Subscribe()
	return nil
}
// Get looks up a registered device by the identifier it was added under.
// It returns nil when no device is registered under that identifier.
func (c *Client) Get(identifier string) *device.Device {
	// Indexing a missing key yields the zero value, which is a nil pointer.
	return c.devices[identifier]
}
// Disconnect unsubscribes every registered device and then closes the broker
// connection, giving in-flight work a short grace period to complete.
//
// If Connect was never called there is no MQTT client to close and only the
// device unsubscribe calls are made.
func (c *Client) Disconnect() {
	// quiesceMillis is how long the MQTT client may spend finishing pending
	// work (such as the unsubscribes issued below) before it closes.
	const quiesceMillis = 250

	for _, d := range c.devices {
		d.UnSubscribe()
	}

	// c.mqtt is only assigned by Connect, so guard against the nil interface.
	if c.mqtt != nil {
		c.mqtt.Disconnect(quiesceMillis)
	}
}
// Subscribe is a pass-through of MQTT Subscribe: it forwards topic, qos and
// callback to the underlying MQTT client and returns that client's token.
// The underlying client is only assigned by Connect, so Connect must have
// been called first.
func (c *Client) Subscribe(topic string, qos byte, callback mqtt.MessageHandler) mqtt.Token {
return c.mqtt.Subscribe(topic, qos, callback)
}
// Unsubscribe is a pass-through of MQTT Unsubscribe: it forwards topics to
// the underlying MQTT client and returns that client's token. The underlying
// client is only assigned by Connect, so Connect must have been called first.
func (c *Client) Unsubscribe(topics ...string) mqtt.Token {
return c.mqtt.Unsubscribe(topics...)
}
// Publish is a pass-through of MQTT Publish: it forwards topic, qos, retained
// and payload to the underlying MQTT client and returns that client's token.
// The underlying client is only assigned by Connect, so Connect must have
// been called first.
func (c *Client) Publish(topic string, qos byte, retained bool, payload any) mqtt.Token {
	return c.mqtt.Publish(topic, qos, retained, payload)
}
// defaultLogger returns the logger used when the caller does not supply one:
// a slog.Logger that writes JSON records to standard output with the
// handler's default options.
func defaultLogger() *slog.Logger {
	handler := slog.NewJSONHandler(os.Stdout, nil)
	return slog.New(handler)
}