|
@@ -1,7 +1,6 @@
|
|
|
package mqtt
|
|
package mqtt
|
|
|
|
|
|
|
|
import (
|
|
import (
|
|
|
- "flag"
|
|
|
|
|
"fmt"
|
|
"fmt"
|
|
|
|
|
|
|
|
"container/list"
|
|
"container/list"
|
|
@@ -39,14 +38,17 @@ var ConnectLostHandler mqtt_cli.ConnectionLostHandler = func(client mqtt_cli.Cli
|
|
|
fmt.Printf("Connect lost: %v", err)
|
|
fmt.Printf("Connect lost: %v", err)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (mq MqttInst) subscribeTopic(topic string) {
|
|
|
|
|
- token := mq.client.Subscribe(topic, 1, nil)
|
|
|
|
|
|
|
+func (mq MqttInst) subscribeTopic(topic string, callback mqtt_cli.MessageHandler) {
|
|
|
|
|
+ token := mq.client.Subscribe(topic, 1, callback)
|
|
|
token.Wait()
|
|
token.Wait()
|
|
|
- fmt.Printf("Subscribed to topic: %s", topic)
|
|
|
|
|
|
|
+ fmt.Printf("Subscribed to topic: %s\n", topic)
|
|
|
receiveCount := 0
|
|
receiveCount := 0
|
|
|
choke := make(chan [2]string)
|
|
choke := make(chan [2]string)
|
|
|
- num := flag.Int("num", 1, "The number of messages to publish or subscribe (default 1)")
|
|
|
|
|
- for receiveCount < *num {
|
|
|
|
|
|
|
+ var num = 1
|
|
|
|
|
+ for receiveCount < num {
|
|
|
|
|
+
|
|
|
|
|
+ receiveCount++
|
|
|
|
|
+
|
|
|
incoming := <-choke
|
|
incoming := <-choke
|
|
|
fmt.Printf("RECEIVED TOPIC: %s MESSAGE: %s\n", incoming[0], incoming[1])
|
|
fmt.Printf("RECEIVED TOPIC: %s MESSAGE: %s\n", incoming[0], incoming[1])
|
|
|
//devices.Builder2(incoming[1])
|
|
//devices.Builder2(incoming[1])
|
|
@@ -54,11 +56,12 @@ func (mq MqttInst) subscribeTopic(topic string) {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (mq *MqttInst) setDefaultOptions() {
|
|
|
|
|
- mq.options.AddBroker(fmt.Sprintf("tcp://%s:%d", mq.broker, mq.port))
|
|
|
|
|
|
|
+func (mq *MqttInst) SetDefaultOptions() {
|
|
|
|
|
+ mq.options.SetDefaultPublishHandler(messagePubHandler)
|
|
|
|
|
+ mq.options.AddBroker(fmt.Sprintf("mqtt://%s:%d", mq.broker, mq.port))
|
|
|
mq.options.SetClientID("go_mqtt_client")
|
|
mq.options.SetClientID("go_mqtt_client")
|
|
|
- mq.options.SetUsername("emqx")
|
|
|
|
|
- mq.options.SetPassword("public")
|
|
|
|
|
|
|
+ /* mq.options.SetUsername("emqx")
|
|
|
|
|
+ mq.options.SetPassword("public") */
|
|
|
mq.options.SetDefaultPublishHandler(messagePubHandler)
|
|
mq.options.SetDefaultPublishHandler(messagePubHandler)
|
|
|
mq.options.OnConnect = ConnectHandler
|
|
mq.options.OnConnect = ConnectHandler
|
|
|
mq.options.OnConnectionLost = ConnectLostHandler
|
|
mq.options.OnConnectionLost = ConnectLostHandler
|
|
@@ -66,10 +69,10 @@ func (mq *MqttInst) setDefaultOptions() {
|
|
|
|
|
|
|
|
func (mq *MqttInst) SetDefaultPublishHandler(messagePubHandler mqtt_cli.MessageHandler) {
|
|
func (mq *MqttInst) SetDefaultPublishHandler(messagePubHandler mqtt_cli.MessageHandler) {
|
|
|
mq.options.SetDefaultPublishHandler(messagePubHandler)
|
|
mq.options.SetDefaultPublishHandler(messagePubHandler)
|
|
|
- mq.options.AddBroker(fmt.Sprintf("tcp://%s:%d", mq.broker, mq.port))
|
|
|
|
|
- mq.options.SetClientID("go_mqtt_client")
|
|
|
|
|
- mq.options.SetUsername("emqx")
|
|
|
|
|
- mq.options.SetPassword("public")
|
|
|
|
|
|
|
+ mq.options.AddBroker(fmt.Sprintf("mqtt://%s:%d", mq.broker, mq.port))
|
|
|
|
|
+ mq.options.SetClientID("HomeCtrl")
|
|
|
|
|
+ /* mq.options.SetUsername("emqx")
|
|
|
|
|
+ mq.options.SetPassword("public") */
|
|
|
mq.options.OnConnect = ConnectHandler
|
|
mq.options.OnConnect = ConnectHandler
|
|
|
mq.options.OnConnectionLost = ConnectLostHandler
|
|
mq.options.OnConnectionLost = ConnectLostHandler
|
|
|
}
|
|
}
|
|
@@ -89,7 +92,7 @@ func (mq *MqttInst) Connect() {
|
|
|
|
|
|
|
|
mq.client = mqtt_cli.NewClient(mq.options)
|
|
mq.client = mqtt_cli.NewClient(mq.options)
|
|
|
if token := mq.client.Connect(); token.Wait() && token.Error() != nil {
|
|
if token := mq.client.Connect(); token.Wait() && token.Error() != nil {
|
|
|
- fmt.Println("Error")
|
|
|
|
|
|
|
+ fmt.Println("Connection error")
|
|
|
panic(token.Error())
|
|
panic(token.Error())
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -98,15 +101,16 @@ func (mq *MqttInst) Connect() {
|
|
|
func (mq *MqttInst) New(broker string, port int) {
|
|
func (mq *MqttInst) New(broker string, port int) {
|
|
|
fmt.Println("mqtt/New")
|
|
fmt.Println("mqtt/New")
|
|
|
mq.options = mqtt_cli.NewClientOptions()
|
|
mq.options = mqtt_cli.NewClientOptions()
|
|
|
- //mq.setDefaultOptions()
|
|
|
|
|
|
|
+
|
|
|
mq.broker = broker
|
|
mq.broker = broker
|
|
|
mq.port = port
|
|
mq.port = port
|
|
|
|
|
+ mq.SetDefaultOptions()
|
|
|
//mq.Connect()
|
|
//mq.Connect()
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// SubTopic subscribe to a topic
|
|
// SubTopic subscribe to a topic
|
|
|
-func (mq *MqttInst) SubscribeTopic(topic string) {
|
|
|
|
|
|
|
+func (mq *MqttInst) SubscribeTopic(topic string, callback mqtt_cli.MessageHandler) {
|
|
|
fmt.Println("subtopic")
|
|
fmt.Println("subtopic")
|
|
|
mq.topics.PushBack(topic)
|
|
mq.topics.PushBack(topic)
|
|
|
- mq.subscribeTopic(topic)
|
|
|
|
|
|
|
+ mq.subscribeTopic(topic, callback)
|
|
|
}
|
|
}
|