|
|
@@ -10,9 +10,20 @@ import (
|
|
|
mqtt_cli "github.com/eclipse/paho.mqtt.golang"
|
|
|
)
|
|
|
|
|
|
-type mqtt struct {
|
|
|
- client mqtt_cli.Client
|
|
|
- topics list.List
|
|
|
+type Mqtt interface {
|
|
|
+ New(string, int) Mqtt
|
|
|
+ connect(string, int) mqtt_cli.Client
|
|
|
+ SubscribeTopicSubscribeTopicSubscribeTopic(string)
|
|
|
+ subscribeTopic(string)
|
|
|
+}
|
|
|
+
|
|
|
+// Mqtt defines mqtt broker client and list of suscribed topics
|
|
|
+type MqttInst struct {
|
|
|
+ options *mqtt_cli.ClientOptions
|
|
|
+ client mqtt_cli.Client
|
|
|
+ topics list.List
|
|
|
+ broker string
|
|
|
+ port int
|
|
|
}
|
|
|
|
|
|
var messagePubHandler mqtt_cli.MessageHandler = func(client mqtt_cli.Client, msg mqtt_cli.Message) {
|
|
|
@@ -20,16 +31,16 @@ var messagePubHandler mqtt_cli.MessageHandler = func(client mqtt_cli.Client, msg
|
|
|
devices.Builder(msg.Payload())
|
|
|
}
|
|
|
|
|
|
-var connectHandler mqtt_cli.OnConnectHandler = func(client mqtt_cli.Client) {
|
|
|
+var ConnectHandler mqtt_cli.OnConnectHandler = func(client mqtt_cli.Client) {
|
|
|
fmt.Println("Connected")
|
|
|
}
|
|
|
|
|
|
-var connectLostHandler mqtt_cli.ConnectionLostHandler = func(client mqtt_cli.Client, err error) {
|
|
|
+var ConnectLostHandler mqtt_cli.ConnectionLostHandler = func(client mqtt_cli.Client, err error) {
|
|
|
fmt.Printf("Connect lost: %v", err)
|
|
|
}
|
|
|
|
|
|
-func sub(client mqtt_cli.Client, topic string) {
|
|
|
- token := client.Subscribe(topic, 1, nil)
|
|
|
+func (mq MqttInst) subscribeTopic(topic string) {
|
|
|
+ token := mq.client.Subscribe(topic, 1, nil)
|
|
|
token.Wait()
|
|
|
fmt.Printf("Subscribed to topic: %s", topic)
|
|
|
receiveCount := 0
|
|
|
@@ -43,30 +54,59 @@ func sub(client mqtt_cli.Client, topic string) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func connect(broker string, port int) mqtt_cli.Client {
|
|
|
- opts := mqtt_cli.NewClientOptions()
|
|
|
- opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
|
|
|
- opts.SetClientID("go_mqtt_client")
|
|
|
- opts.SetUsername("emqx")
|
|
|
- opts.SetPassword("public")
|
|
|
- opts.SetDefaultPublishHandler(messagePubHandler)
|
|
|
- opts.OnConnect = connectHandler
|
|
|
- opts.OnConnectionLost = connectLostHandler
|
|
|
- client := mqtt_cli.NewClient(opts)
|
|
|
- if token := client.Connect(); token.Wait() && token.Error() != nil {
|
|
|
+func (mq *MqttInst) setDefaultOptions() {
|
|
|
+ mq.options.AddBroker(fmt.Sprintf("tcp://%s:%d", mq.broker, mq.port))
|
|
|
+ mq.options.SetClientID("go_mqtt_client")
|
|
|
+ mq.options.SetUsername("emqx")
|
|
|
+ mq.options.SetPassword("public")
|
|
|
+ mq.options.SetDefaultPublishHandler(messagePubHandler)
|
|
|
+ mq.options.OnConnect = ConnectHandler
|
|
|
+ mq.options.OnConnectionLost = ConnectLostHandler
|
|
|
+}
|
|
|
+
|
|
|
+func (mq *MqttInst) SetDefaultPublishHandler(messagePubHandler mqtt_cli.MessageHandler) {
|
|
|
+ mq.options.SetDefaultPublishHandler(messagePubHandler)
|
|
|
+ mq.options.AddBroker(fmt.Sprintf("tcp://%s:%d", mq.broker, mq.port))
|
|
|
+ mq.options.SetClientID("go_mqtt_client")
|
|
|
+ mq.options.SetUsername("emqx")
|
|
|
+ mq.options.SetPassword("public")
|
|
|
+ mq.options.OnConnect = ConnectHandler
|
|
|
+ mq.options.OnConnectionLost = ConnectLostHandler
|
|
|
+}
|
|
|
+
|
|
|
+func (mq *MqttInst) SetOnConnectHandler(ConnectHandler mqtt_cli.OnConnectHandler) {
|
|
|
+ mq.options.OnConnect = ConnectHandler
|
|
|
+}
|
|
|
+
|
|
|
+func (mq *MqttInst) SetOnConnectionLostHandler(ConnectHandler mqtt_cli.ConnectionLostHandler) {
|
|
|
+ mq.options.OnConnectionLost = ConnectLostHandler
|
|
|
+}
|
|
|
+
|
|
|
+func (mq *MqttInst) Connect() {
|
|
|
+ fmt.Println("mqtt/Connect")
|
|
|
+ fmt.Println("mqtt/Connect " + mq.broker + " ")
|
|
|
+ fmt.Println(mq.port)
|
|
|
+
|
|
|
+ mq.client = mqtt_cli.NewClient(mq.options)
|
|
|
+ if token := mq.client.Connect(); token.Wait() && token.Error() != nil {
|
|
|
fmt.Println("Error")
|
|
|
panic(token.Error())
|
|
|
}
|
|
|
- return client
|
|
|
}
|
|
|
|
|
|
-func New(broker string, port int) mqtt {
|
|
|
- m := mqtt{client: connect(broker, port)}
|
|
|
- return m
|
|
|
+// New creates a link to mqtt broker
|
|
|
+func (mq *MqttInst) New(broker string, port int) {
|
|
|
+ fmt.Println("mqtt/New")
|
|
|
+ mq.options = mqtt_cli.NewClientOptions()
|
|
|
+ //mq.setDefaultOptions()
|
|
|
+ mq.broker = broker
|
|
|
+ mq.port = port
|
|
|
+ //mq.Connect()
|
|
|
}
|
|
|
|
|
|
-func (m mqtt) SubTopic(topic string) {
|
|
|
+// SubTopic subscribe to a topic
|
|
|
+func (mq *MqttInst) SubscribeTopic(topic string) {
|
|
|
fmt.Println("subtopic")
|
|
|
- m.topics.PushBack(topic)
|
|
|
- sub(m.client, topic)
|
|
|
+ mq.topics.PushBack(topic)
|
|
|
+ mq.subscribeTopic(topic)
|
|
|
}
|