|
@@ -0,0 +1,70 @@
|
|
|
|
|
+package mqtt
|
|
|
|
|
+
|
|
|
|
|
+import (
|
|
|
|
|
+ "flag"
|
|
|
|
|
+ "fmt"
|
|
|
|
|
+
|
|
|
|
|
+ "container/list"
|
|
|
|
|
+ devices "homectrl/internal/devices"
|
|
|
|
|
+
|
|
|
|
|
+ mqtt_cli "github.com/eclipse/paho.mqtt.golang"
|
|
|
|
|
+)
|
|
|
|
|
+
|
|
|
|
|
+type mqtt struct {
|
|
|
|
|
+ client mqtt_cli.Client
|
|
|
|
|
+ topics list.List
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+var messagePubHandler mqtt_cli.MessageHandler = func(client mqtt_cli.Client, msg mqtt_cli.Message) {
|
|
|
|
|
+ //fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
|
|
|
|
|
+ devices.Builder(msg.Payload())
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+var connectHandler mqtt_cli.OnConnectHandler = func(client mqtt_cli.Client) {
|
|
|
|
|
+ fmt.Println("Connected")
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+var connectLostHandler mqtt_cli.ConnectionLostHandler = func(client mqtt_cli.Client, err error) {
|
|
|
|
|
+ fmt.Printf("Connect lost: %v", err)
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func sub(client mqtt_cli.Client, topic string) {
|
|
|
|
|
+ token := client.Subscribe(topic, 1, nil)
|
|
|
|
|
+ token.Wait()
|
|
|
|
|
+ fmt.Printf("Subscribed to topic: %s", topic)
|
|
|
|
|
+ receiveCount := 0
|
|
|
|
|
+ choke := make(chan [2]string)
|
|
|
|
|
+ num := flag.Int("num", 1, "The number of messages to publish or subscribe (default 1)")
|
|
|
|
|
+ for receiveCount < *num {
|
|
|
|
|
+ incoming := <-choke
|
|
|
|
|
+ fmt.Printf("RECEIVED TOPIC: %s MESSAGE: %s\n", incoming[0], incoming[1])
|
|
|
|
|
+ //devices.Builder2(incoming[1])
|
|
|
|
|
+ receiveCount++
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func connect(broker string, port int) mqtt_cli.Client {
|
|
|
|
|
+ opts := mqtt_cli.NewClientOptions()
|
|
|
|
|
+ opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
|
|
|
|
|
+ opts.SetClientID("go_mqtt_client")
|
|
|
|
|
+ opts.SetUsername("emqx")
|
|
|
|
|
+ opts.SetPassword("public")
|
|
|
|
|
+ opts.SetDefaultPublishHandler(messagePubHandler)
|
|
|
|
|
+ opts.OnConnect = connectHandler
|
|
|
|
|
+ opts.OnConnectionLost = connectLostHandler
|
|
|
|
|
+ client := mqtt_cli.NewClient(opts)
|
|
|
|
|
+ if token := client.Connect(); token.Wait() && token.Error() != nil {
|
|
|
|
|
+ panic(token.Error())
|
|
|
|
|
+ }
|
|
|
|
|
+ return client
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func New(broker string, port int) mqtt {
|
|
|
|
|
+ m := mqtt{client: connect(broker, port)}
|
|
|
|
|
+ return m
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (m mqtt) SubTopic(topic string) {
|
|
|
|
|
+ m.topics.PushBack(topic)
|
|
|
|
|
+ sub(m.client, topic)
|
|
|
|
|
+}
|