mqtt.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. package mqtt
  2. import (
  3. "flag"
  4. "fmt"
  5. "container/list"
  6. devices "homectrl/internal/devices"
  7. mqtt_cli "github.com/eclipse/paho.mqtt.golang"
  8. )
// mqtt pairs a paho MQTT client with the list that SubTopic appends topic
// names to.
type mqtt struct {
	// client is the paho client handle; New fills it with the result of connect.
	client mqtt_cli.Client
	// topics is the list SubTopic pushes topic names onto.
	// NOTE(review): list.List is held by value, so copying an mqtt value also
	// copies the list header; elements pushed through a copy are not visible
	// through the value it was copied from.
	topics list.List
}
  13. var messagePubHandler mqtt_cli.MessageHandler = func(client mqtt_cli.Client, msg mqtt_cli.Message) {
  14. fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
  15. devices.Builder(msg.Payload())
  16. }
  17. var connectHandler mqtt_cli.OnConnectHandler = func(client mqtt_cli.Client) {
  18. fmt.Println("Connected")
  19. }
  20. var connectLostHandler mqtt_cli.ConnectionLostHandler = func(client mqtt_cli.Client, err error) {
  21. fmt.Printf("Connect lost: %v", err)
  22. }
  23. func sub(client mqtt_cli.Client, topic string) {
  24. token := client.Subscribe(topic, 1, nil)
  25. token.Wait()
  26. fmt.Printf("Subscribed to topic: %s", topic)
  27. receiveCount := 0
  28. choke := make(chan [2]string)
  29. num := flag.Int("num", 1, "The number of messages to publish or subscribe (default 1)")
  30. for receiveCount < *num {
  31. incoming := <-choke
  32. fmt.Printf("RECEIVED TOPIC: %s MESSAGE: %s\n", incoming[0], incoming[1])
  33. //devices.Builder2(incoming[1])
  34. receiveCount++
  35. }
  36. }
  37. func connect(broker string, port int) mqtt_cli.Client {
  38. opts := mqtt_cli.NewClientOptions()
  39. opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
  40. opts.SetClientID("go_mqtt_client")
  41. opts.SetUsername("emqx")
  42. opts.SetPassword("public")
  43. opts.SetDefaultPublishHandler(messagePubHandler)
  44. opts.OnConnect = connectHandler
  45. opts.OnConnectionLost = connectLostHandler
  46. client := mqtt_cli.NewClient(opts)
  47. if token := client.Connect(); token.Wait() && token.Error() != nil {
  48. fmt.Println("Error")
  49. panic(token.Error())
  50. }
  51. return client
  52. }
  53. func New(broker string, port int) mqtt {
  54. m := mqtt{client: connect(broker, port)}
  55. return m
  56. }
  57. func (m mqtt) SubTopic(topic string) {
  58. fmt.Println("subtopic")
  59. m.topics.PushBack(topic)
  60. sub(m.client, topic)
  61. }