mqtt.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. package mqtt
  2. import (
  3. "flag"
  4. "fmt"
  5. "container/list"
  6. devices "homectrl/internal/devices"
  7. mqtt_cli "github.com/eclipse/paho.mqtt.golang"
  8. )
  9. type Mqtt interface {
  10. New(string, int) Mqtt
  11. connect(string, int) mqtt_cli.Client
  12. SubscribeTopicSubscribeTopicSubscribeTopic(string)
  13. subscribeTopic(string)
  14. }
  15. // Mqtt defines mqtt broker client and list of suscribed topics
  16. type MqttInst struct {
  17. options *mqtt_cli.ClientOptions
  18. client mqtt_cli.Client
  19. topics list.List
  20. broker string
  21. port int
  22. }
  23. var messagePubHandler mqtt_cli.MessageHandler = func(client mqtt_cli.Client, msg mqtt_cli.Message) {
  24. fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
  25. devices.Builder(msg.Payload())
  26. }
  27. var ConnectHandler mqtt_cli.OnConnectHandler = func(client mqtt_cli.Client) {
  28. fmt.Println("Connected")
  29. }
  30. var ConnectLostHandler mqtt_cli.ConnectionLostHandler = func(client mqtt_cli.Client, err error) {
  31. fmt.Printf("Connect lost: %v", err)
  32. }
  33. func (mq MqttInst) subscribeTopic(topic string) {
  34. token := mq.client.Subscribe(topic, 1, nil)
  35. token.Wait()
  36. fmt.Printf("Subscribed to topic: %s", topic)
  37. receiveCount := 0
  38. choke := make(chan [2]string)
  39. num := flag.Int("num", 1, "The number of messages to publish or subscribe (default 1)")
  40. for receiveCount < *num {
  41. incoming := <-choke
  42. fmt.Printf("RECEIVED TOPIC: %s MESSAGE: %s\n", incoming[0], incoming[1])
  43. //devices.Builder2(incoming[1])
  44. receiveCount++
  45. }
  46. }
  47. func (mq *MqttInst) setDefaultOptions() {
  48. mq.options.AddBroker(fmt.Sprintf("tcp://%s:%d", mq.broker, mq.port))
  49. mq.options.SetClientID("go_mqtt_client")
  50. mq.options.SetUsername("emqx")
  51. mq.options.SetPassword("public")
  52. mq.options.SetDefaultPublishHandler(messagePubHandler)
  53. mq.options.OnConnect = ConnectHandler
  54. mq.options.OnConnectionLost = ConnectLostHandler
  55. }
  56. func (mq *MqttInst) SetDefaultPublishHandler(messagePubHandler mqtt_cli.MessageHandler) {
  57. mq.options.SetDefaultPublishHandler(messagePubHandler)
  58. mq.options.AddBroker(fmt.Sprintf("tcp://%s:%d", mq.broker, mq.port))
  59. mq.options.SetClientID("go_mqtt_client")
  60. mq.options.SetUsername("emqx")
  61. mq.options.SetPassword("public")
  62. mq.options.OnConnect = ConnectHandler
  63. mq.options.OnConnectionLost = ConnectLostHandler
  64. }
  65. func (mq *MqttInst) SetOnConnectHandler(ConnectHandler mqtt_cli.OnConnectHandler) {
  66. mq.options.OnConnect = ConnectHandler
  67. }
  68. func (mq *MqttInst) SetOnConnectionLostHandler(ConnectHandler mqtt_cli.ConnectionLostHandler) {
  69. mq.options.OnConnectionLost = ConnectLostHandler
  70. }
  71. func (mq *MqttInst) Connect() {
  72. fmt.Println("mqtt/Connect")
  73. fmt.Println("mqtt/Connect " + mq.broker + " ")
  74. fmt.Println(mq.port)
  75. mq.client = mqtt_cli.NewClient(mq.options)
  76. if token := mq.client.Connect(); token.Wait() && token.Error() != nil {
  77. fmt.Println("Error")
  78. panic(token.Error())
  79. }
  80. }
  81. // New creates a link to mqtt broker
  82. func (mq *MqttInst) New(broker string, port int) {
  83. fmt.Println("mqtt/New")
  84. mq.options = mqtt_cli.NewClientOptions()
  85. //mq.setDefaultOptions()
  86. mq.broker = broker
  87. mq.port = port
  88. //mq.Connect()
  89. }
  90. // SubTopic subscribe to a topic
  91. func (mq *MqttInst) SubscribeTopic(topic string) {
  92. fmt.Println("subtopic")
  93. mq.topics.PushBack(topic)
  94. mq.subscribeTopic(topic)
  95. }