package mqtt import ( "flag" "fmt" "container/list" devices "homectrl/internal/devices" mqtt_cli "github.com/eclipse/paho.mqtt.golang" ) type mqtt struct { client mqtt_cli.Client topics list.List } var messagePubHandler mqtt_cli.MessageHandler = func(client mqtt_cli.Client, msg mqtt_cli.Message) { //fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic()) devices.Builder(msg.Payload()) } var connectHandler mqtt_cli.OnConnectHandler = func(client mqtt_cli.Client) { fmt.Println("Connected") } var connectLostHandler mqtt_cli.ConnectionLostHandler = func(client mqtt_cli.Client, err error) { fmt.Printf("Connect lost: %v", err) } func sub(client mqtt_cli.Client, topic string) { token := client.Subscribe(topic, 1, nil) token.Wait() fmt.Printf("Subscribed to topic: %s", topic) receiveCount := 0 choke := make(chan [2]string) num := flag.Int("num", 1, "The number of messages to publish or subscribe (default 1)") for receiveCount < *num { incoming := <-choke fmt.Printf("RECEIVED TOPIC: %s MESSAGE: %s\n", incoming[0], incoming[1]) //devices.Builder2(incoming[1]) receiveCount++ } } func connect(broker string, port int) mqtt_cli.Client { opts := mqtt_cli.NewClientOptions() opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port)) opts.SetClientID("go_mqtt_client") opts.SetUsername("emqx") opts.SetPassword("public") opts.SetDefaultPublishHandler(messagePubHandler) opts.OnConnect = connectHandler opts.OnConnectionLost = connectLostHandler client := mqtt_cli.NewClient(opts) if token := client.Connect(); token.Wait() && token.Error() != nil { panic(token.Error()) } return client } func New(broker string, port int) mqtt { m := mqtt{client: connect(broker, port)} return m } func (m mqtt) SubTopic(topic string) { m.topics.PushBack(topic) sub(m.client, topic) }