| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172 |
- package mqtt
- import (
- "flag"
- "fmt"
- "container/list"
- devices "homectrl/internal/devices"
- mqtt_cli "github.com/eclipse/paho.mqtt.golang"
- )
- type mqtt struct {
- client mqtt_cli.Client
- topics list.List
- }
- var messagePubHandler mqtt_cli.MessageHandler = func(client mqtt_cli.Client, msg mqtt_cli.Message) {
- fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
- devices.Builder(msg.Payload())
- }
- var connectHandler mqtt_cli.OnConnectHandler = func(client mqtt_cli.Client) {
- fmt.Println("Connected")
- }
- var connectLostHandler mqtt_cli.ConnectionLostHandler = func(client mqtt_cli.Client, err error) {
- fmt.Printf("Connect lost: %v", err)
- }
- func sub(client mqtt_cli.Client, topic string) {
- token := client.Subscribe(topic, 1, nil)
- token.Wait()
- fmt.Printf("Subscribed to topic: %s", topic)
- receiveCount := 0
- choke := make(chan [2]string)
- num := flag.Int("num", 1, "The number of messages to publish or subscribe (default 1)")
- for receiveCount < *num {
- incoming := <-choke
- fmt.Printf("RECEIVED TOPIC: %s MESSAGE: %s\n", incoming[0], incoming[1])
- //devices.Builder2(incoming[1])
- receiveCount++
- }
- }
- func connect(broker string, port int) mqtt_cli.Client {
- opts := mqtt_cli.NewClientOptions()
- opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
- opts.SetClientID("go_mqtt_client")
- opts.SetUsername("emqx")
- opts.SetPassword("public")
- opts.SetDefaultPublishHandler(messagePubHandler)
- opts.OnConnect = connectHandler
- opts.OnConnectionLost = connectLostHandler
- client := mqtt_cli.NewClient(opts)
- if token := client.Connect(); token.Wait() && token.Error() != nil {
- fmt.Println("Error")
- panic(token.Error())
- }
- return client
- }
- func New(broker string, port int) mqtt {
- m := mqtt{client: connect(broker, port)}
- return m
- }
- func (m mqtt) SubTopic(topic string) {
- fmt.Println("subtopic")
- m.topics.PushBack(topic)
- sub(m.client, topic)
- }
|