| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112 |
- package mqtt
- import (
- "flag"
- "fmt"
- "container/list"
- devices "homectrl/internal/devices"
- mqtt_cli "github.com/eclipse/paho.mqtt.golang"
- )
- type Mqtt interface {
- New(string, int) Mqtt
- connect(string, int) mqtt_cli.Client
- SubscribeTopicSubscribeTopicSubscribeTopic(string)
- subscribeTopic(string)
- }
- // Mqtt defines mqtt broker client and list of suscribed topics
- type MqttInst struct {
- options *mqtt_cli.ClientOptions
- client mqtt_cli.Client
- topics list.List
- broker string
- port int
- }
- var messagePubHandler mqtt_cli.MessageHandler = func(client mqtt_cli.Client, msg mqtt_cli.Message) {
- fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
- devices.Builder(msg.Payload())
- }
- var ConnectHandler mqtt_cli.OnConnectHandler = func(client mqtt_cli.Client) {
- fmt.Println("Connected")
- }
- var ConnectLostHandler mqtt_cli.ConnectionLostHandler = func(client mqtt_cli.Client, err error) {
- fmt.Printf("Connect lost: %v", err)
- }
- func (mq MqttInst) subscribeTopic(topic string) {
- token := mq.client.Subscribe(topic, 1, nil)
- token.Wait()
- fmt.Printf("Subscribed to topic: %s", topic)
- receiveCount := 0
- choke := make(chan [2]string)
- num := flag.Int("num", 1, "The number of messages to publish or subscribe (default 1)")
- for receiveCount < *num {
- incoming := <-choke
- fmt.Printf("RECEIVED TOPIC: %s MESSAGE: %s\n", incoming[0], incoming[1])
- //devices.Builder2(incoming[1])
- receiveCount++
- }
- }
- func (mq *MqttInst) setDefaultOptions() {
- mq.options.AddBroker(fmt.Sprintf("tcp://%s:%d", mq.broker, mq.port))
- mq.options.SetClientID("go_mqtt_client")
- mq.options.SetUsername("emqx")
- mq.options.SetPassword("public")
- mq.options.SetDefaultPublishHandler(messagePubHandler)
- mq.options.OnConnect = ConnectHandler
- mq.options.OnConnectionLost = ConnectLostHandler
- }
- func (mq *MqttInst) SetDefaultPublishHandler(messagePubHandler mqtt_cli.MessageHandler) {
- mq.options.SetDefaultPublishHandler(messagePubHandler)
- mq.options.AddBroker(fmt.Sprintf("tcp://%s:%d", mq.broker, mq.port))
- mq.options.SetClientID("go_mqtt_client")
- mq.options.SetUsername("emqx")
- mq.options.SetPassword("public")
- mq.options.OnConnect = ConnectHandler
- mq.options.OnConnectionLost = ConnectLostHandler
- }
- func (mq *MqttInst) SetOnConnectHandler(ConnectHandler mqtt_cli.OnConnectHandler) {
- mq.options.OnConnect = ConnectHandler
- }
- func (mq *MqttInst) SetOnConnectionLostHandler(ConnectHandler mqtt_cli.ConnectionLostHandler) {
- mq.options.OnConnectionLost = ConnectLostHandler
- }
- func (mq *MqttInst) Connect() {
- fmt.Println("mqtt/Connect")
- fmt.Println("mqtt/Connect " + mq.broker + " ")
- fmt.Println(mq.port)
- mq.client = mqtt_cli.NewClient(mq.options)
- if token := mq.client.Connect(); token.Wait() && token.Error() != nil {
- fmt.Println("Error")
- panic(token.Error())
- }
- }
- // New creates a link to mqtt broker
- func (mq *MqttInst) New(broker string, port int) {
- fmt.Println("mqtt/New")
- mq.options = mqtt_cli.NewClientOptions()
- //mq.setDefaultOptions()
- mq.broker = broker
- mq.port = port
- //mq.Connect()
- }
- // SubTopic subscribe to a topic
- func (mq *MqttInst) SubscribeTopic(topic string) {
- fmt.Println("subtopic")
- mq.topics.PushBack(topic)
- mq.subscribeTopic(topic)
- }
|