package mqtt import ( "flag" "fmt" "container/list" devices "homectrl/internal/devices" mqtt_cli "github.com/eclipse/paho.mqtt.golang" ) type Mqtt interface { New(string, int) Mqtt connect(string, int) mqtt_cli.Client SubscribeTopicSubscribeTopicSubscribeTopic(string) subscribeTopic(string) } // Mqtt defines mqtt broker client and list of suscribed topics type MqttInst struct { options *mqtt_cli.ClientOptions client mqtt_cli.Client topics list.List broker string port int } var messagePubHandler mqtt_cli.MessageHandler = func(client mqtt_cli.Client, msg mqtt_cli.Message) { fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic()) devices.Builder(msg.Payload()) } var ConnectHandler mqtt_cli.OnConnectHandler = func(client mqtt_cli.Client) { fmt.Println("Connected") } var ConnectLostHandler mqtt_cli.ConnectionLostHandler = func(client mqtt_cli.Client, err error) { fmt.Printf("Connect lost: %v", err) } func (mq MqttInst) subscribeTopic(topic string) { token := mq.client.Subscribe(topic, 1, nil) token.Wait() fmt.Printf("Subscribed to topic: %s", topic) receiveCount := 0 choke := make(chan [2]string) num := flag.Int("num", 1, "The number of messages to publish or subscribe (default 1)") for receiveCount < *num { incoming := <-choke fmt.Printf("RECEIVED TOPIC: %s MESSAGE: %s\n", incoming[0], incoming[1]) //devices.Builder2(incoming[1]) receiveCount++ } } func (mq *MqttInst) setDefaultOptions() { mq.options.AddBroker(fmt.Sprintf("tcp://%s:%d", mq.broker, mq.port)) mq.options.SetClientID("go_mqtt_client") mq.options.SetUsername("emqx") mq.options.SetPassword("public") mq.options.SetDefaultPublishHandler(messagePubHandler) mq.options.OnConnect = ConnectHandler mq.options.OnConnectionLost = ConnectLostHandler } func (mq *MqttInst) SetDefaultPublishHandler(messagePubHandler mqtt_cli.MessageHandler) { mq.options.SetDefaultPublishHandler(messagePubHandler) mq.options.AddBroker(fmt.Sprintf("tcp://%s:%d", mq.broker, mq.port)) mq.options.SetClientID("go_mqtt_client") mq.options.SetUsername("emqx") mq.options.SetPassword("public") mq.options.OnConnect = ConnectHandler mq.options.OnConnectionLost = ConnectLostHandler } func (mq *MqttInst) SetOnConnectHandler(ConnectHandler mqtt_cli.OnConnectHandler) { mq.options.OnConnect = ConnectHandler } func (mq *MqttInst) SetOnConnectionLostHandler(ConnectHandler mqtt_cli.ConnectionLostHandler) { mq.options.OnConnectionLost = ConnectLostHandler } func (mq *MqttInst) Connect() { fmt.Println("mqtt/Connect") fmt.Println("mqtt/Connect " + mq.broker + " ") fmt.Println(mq.port) mq.client = mqtt_cli.NewClient(mq.options) if token := mq.client.Connect(); token.Wait() && token.Error() != nil { fmt.Println("Error") panic(token.Error()) } } // New creates a link to mqtt broker func (mq *MqttInst) New(broker string, port int) { fmt.Println("mqtt/New") mq.options = mqtt_cli.NewClientOptions() //mq.setDefaultOptions() mq.broker = broker mq.port = port //mq.Connect() } // SubTopic subscribe to a topic func (mq *MqttInst) SubscribeTopic(topic string) { fmt.Println("subtopic") mq.topics.PushBack(topic) mq.subscribeTopic(topic) }