mqtt.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. package mqtt
  2. import (
  3. "fmt"
  4. "container/list"
  5. devices "homectrl/internal/devices"
  6. mqtt_cli "github.com/eclipse/paho.mqtt.golang"
  7. )
  8. type Mqtt interface {
  9. New(string, int) Mqtt
  10. connect(string, int) mqtt_cli.Client
  11. SubscribeTopicSubscribeTopicSubscribeTopic(string)
  12. subscribeTopic(string)
  13. }
  14. // Mqtt defines mqtt broker client and list of suscribed topics
  15. type MqttInst struct {
  16. options *mqtt_cli.ClientOptions
  17. client mqtt_cli.Client
  18. topics list.List
  19. broker string
  20. port int
  21. }
  22. var messagePubHandler mqtt_cli.MessageHandler = func(client mqtt_cli.Client, msg mqtt_cli.Message) {
  23. fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
  24. devices.Builder(msg.Payload())
  25. }
  26. var ConnectHandler mqtt_cli.OnConnectHandler = func(client mqtt_cli.Client) {
  27. fmt.Println("Connected")
  28. }
  29. var ConnectLostHandler mqtt_cli.ConnectionLostHandler = func(client mqtt_cli.Client, err error) {
  30. fmt.Printf("Connect lost: %v", err)
  31. }
  32. func (mq MqttInst) subscribeTopic(topic string, callback mqtt_cli.MessageHandler) {
  33. token := mq.client.Subscribe(topic, 1, callback)
  34. token.Wait()
  35. fmt.Printf("Subscribed to topic: %s\n", topic)
  36. receiveCount := 0
  37. choke := make(chan [2]string)
  38. var num = 1
  39. for receiveCount < num {
  40. receiveCount++
  41. incoming := <-choke
  42. fmt.Printf("RECEIVED TOPIC: %s MESSAGE: %s\n", incoming[0], incoming[1])
  43. //devices.Builder2(incoming[1])
  44. receiveCount++
  45. }
  46. }
  47. func (mq *MqttInst) SetDefaultOptions() {
  48. mq.options.SetDefaultPublishHandler(messagePubHandler)
  49. mq.options.AddBroker(fmt.Sprintf("mqtt://%s:%d", mq.broker, mq.port))
  50. mq.options.SetClientID("go_mqtt_client")
  51. /* mq.options.SetUsername("emqx")
  52. mq.options.SetPassword("public") */
  53. mq.options.SetDefaultPublishHandler(messagePubHandler)
  54. mq.options.OnConnect = ConnectHandler
  55. mq.options.OnConnectionLost = ConnectLostHandler
  56. }
  57. func (mq *MqttInst) SetDefaultPublishHandler(messagePubHandler mqtt_cli.MessageHandler) {
  58. mq.options.SetDefaultPublishHandler(messagePubHandler)
  59. mq.options.AddBroker(fmt.Sprintf("mqtt://%s:%d", mq.broker, mq.port))
  60. mq.options.SetClientID("HomeCtrl")
  61. /* mq.options.SetUsername("emqx")
  62. mq.options.SetPassword("public") */
  63. mq.options.OnConnect = ConnectHandler
  64. mq.options.OnConnectionLost = ConnectLostHandler
  65. }
  66. func (mq *MqttInst) SetOnConnectHandler(ConnectHandler mqtt_cli.OnConnectHandler) {
  67. mq.options.OnConnect = ConnectHandler
  68. }
  69. func (mq *MqttInst) SetOnConnectionLostHandler(ConnectHandler mqtt_cli.ConnectionLostHandler) {
  70. mq.options.OnConnectionLost = ConnectLostHandler
  71. }
  72. func (mq *MqttInst) Connect() {
  73. fmt.Println("mqtt/Connect")
  74. fmt.Println("mqtt/Connect " + mq.broker + " ")
  75. fmt.Println(mq.port)
  76. mq.client = mqtt_cli.NewClient(mq.options)
  77. if token := mq.client.Connect(); token.Wait() && token.Error() != nil {
  78. fmt.Println("Connection error")
  79. panic(token.Error())
  80. }
  81. }
  82. // New creates a link to mqtt broker
  83. func (mq *MqttInst) New(broker string, port int) {
  84. fmt.Println("mqtt/New")
  85. mq.options = mqtt_cli.NewClientOptions()
  86. mq.broker = broker
  87. mq.port = port
  88. mq.SetDefaultOptions()
  89. //mq.Connect()
  90. }
  91. // SubTopic subscribe to a topic
  92. func (mq *MqttInst) SubscribeTopic(topic string, callback mqtt_cli.MessageHandler) {
  93. fmt.Println("subtopic")
  94. mq.topics.PushBack(topic)
  95. mq.subscribeTopic(topic, callback)
  96. }