package dbHelper

import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"log"
	"os"
	"sync"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

// MqttHelper wraps a Paho MQTT client and remembers every subscription so it
// can be re-established after the client reconnects.
//
// A MqttHelper must not be copied after first use because it contains a mutex.
type MqttHelper struct {
	Host     string
	Port     int
	ClientId string
	UserName string
	Password string

	client mqtt.Client

	// mu guards subscribeCalls, which is touched both by callers of Subscribe
	// and by the on-connect callback that Paho runs on its own goroutine.
	mu sync.Mutex
	// subscribeCalls records the subscriptions to replay after a reconnect.
	subscribeCalls []subscribeCall
}

// subscribeCall is one recorded subscription: a topic and the user callback
// that receives the topic and the payload (as a string) of each message.
type subscribeCall struct {
	topic string
	f     func(topic string, callMsg string)
}

// reConn2Subscribe is installed as the client's on-connect handler. It replays
// every recorded subscription on the freshly connected client.
//
// It deliberately does NOT go through Subscribe: Subscribe records the call,
// and re-recording on every reconnect would make the list grow without bound.
func (the *MqttHelper) reConn2Subscribe(client mqtt.Client) {
	log.Println("mqtt触发重连后的重订阅")

	// Work on a snapshot so the lock is not held while talking to the broker.
	the.mu.Lock()
	calls := make([]subscribeCall, len(the.subscribeCalls))
	copy(calls, the.subscribeCalls)
	the.mu.Unlock()

	for _, call := range calls {
		the.subscribeOn(client, call.topic, call.f)
	}
}

// initialClient builds the Paho client and connects to the broker, retrying a
// fixed number of times with a fixed delay between attempts.
//
// sslEnable selects a TLS connection using the configuration produced by
// NewTlsConfig(caPath). If every attempt fails the process exits with
// status 1, because the method has no way to report an error to its caller.
func (the *MqttHelper) initialClient(sslEnable bool, caPath string) {
	const reconnectDelaySec = 30
	retriesLeft := 3

	// Paho only applies the TLS config for TLS schemes such as "ssl"; with
	// "tcp" the connection is plaintext and the TLS config is ignored.
	scheme := "tcp"
	var tlsConfig *tls.Config
	if sslEnable {
		scheme = "ssl"
		tlsConfig = NewTlsConfig(caPath)
	}
	brokerURL := fmt.Sprintf("%s://%v:%d", scheme, the.Host, the.Port)

	for {
		opts := mqtt.NewClientOptions().AddBroker(brokerURL)
		opts.SetUsername(the.UserName)
		opts.SetPassword(the.Password)
		opts.SetClientID(the.ClientId)
		opts.SetOnConnectHandler(the.reConn2Subscribe)
		if tlsConfig != nil {
			opts.SetTLSConfig(tlsConfig)
		}
		the.client = mqtt.NewClient(opts)

		token := the.client.Connect()
		token.Wait()
		err := token.Error()
		if err == nil {
			break
		}

		// The password is masked: credentials must never reach the log.
		log.Printf("mqtt连接状态异常 %v(u:%v,p:%v,cid:%v) [err=%s]",
			brokerURL, the.UserName, "******", the.ClientId, err)
		if retriesLeft <= 0 {
			os.Exit(1)
		}
		log.Printf("mqtt重连,%ds后尝试,剩余次数=%d", reconnectDelaySec, retriesLeft)
		retriesLeft--
		time.Sleep(reconnectDelaySec * time.Second)
	}

	// Short settle delay kept from the original implementation.
	time.Sleep(time.Second)
}

// Initial connects to the broker over a plain (non-TLS) connection.
func (the *MqttHelper) Initial() {
	the.initialClient(false, "")
}

// InitialWithSSL connects to the broker over TLS, loading the CA certificate
// from caPath.
func (the *MqttHelper) InitialWithSSL(caPath string) {
	the.initialClient(true, caPath)
}

// NewTlsConfig builds the TLS configuration used for SSL connections from the
// PEM-encoded CA file at sslPath (for example "ssl/centerCA.crt").
//
// The process is terminated via log.Fatalln if the file cannot be read.
func NewTlsConfig(sslPath string) *tls.Config {
	pemBytes, err := os.ReadFile(sslPath)
	if err != nil {
		log.Fatalln(err.Error())
	}

	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(pemBytes) {
		log.Printf("mqtt tls: no certificate could be parsed from %q", sslPath)
	}

	return &tls.Config{
		RootCAs: pool,
		// NOTE(review): verification of the broker certificate is still
		// disabled, exactly as before, so RootCAs has no effect yet and the
		// connection is open to man-in-the-middle attacks. Set this to false
		// once the broker certificate matches the host name used to connect.
		InsecureSkipVerify: true,
	}
}

// Publish sends messageBytes to topic with QoS 0 and retain=false, waits for
// the operation to complete, and prints a one-line status. It is a no-op when
// the client has not been initialised.
func (the *MqttHelper) Publish(topic string, messageBytes []byte) {
	if the.client == nil {
		return
	}

	token := the.client.Publish(topic, 0, false, messageBytes)
	token.Wait()
	if err := token.Error(); err != nil {
		log.Printf("mqtt publish to [%s] failed: %v", topic, err)
	}
	fmt.Printf("[%s] -[%v]推送Msg 长度=%d \n", topic, the.client.IsConnected(), len(messageBytes))
}

// Subscribe subscribes to topic with QoS 1 and records the subscription so it
// is replayed after a reconnect. myCallback receives the message topic and the
// payload converted to a string.
//
// Subscribing to the same topic again replaces the recorded callback. If the
// client has not been initialised yet, the subscription is only recorded and
// is established by the on-connect handler once the client connects.
func (the *MqttHelper) Subscribe(topic string, myCallback func(topic string, callMsg string)) {
	the.remember(topic, myCallback)

	if the.client == nil {
		log.Printf("mqtt client not initialised, subscription to [%s] deferred until connect", topic)
		return
	}
	the.subscribeOn(the.client, topic, myCallback)
}

// remember stores (or replaces) the callback recorded for topic.
func (the *MqttHelper) remember(topic string, cb func(topic string, callMsg string)) {
	the.mu.Lock()
	defer the.mu.Unlock()

	for i := range the.subscribeCalls {
		if the.subscribeCalls[i].topic == topic {
			the.subscribeCalls[i].f = cb
			return
		}
	}
	the.subscribeCalls = append(the.subscribeCalls, subscribeCall{topic: topic, f: cb})
}

// subscribeOn issues the broker subscription on client without recording it.
// The result of the subscription is checked asynchronously and only logged.
func (the *MqttHelper) subscribeOn(client mqtt.Client, topic string, cb func(topic string, callMsg string)) {
	handler := func(_ mqtt.Client, msg mqtt.Message) {
		log.Printf("收到数据 [%v] TOPIC: %s,MSGLen: %v", msg.MessageID(), msg.Topic(), len(msg.Payload()))
		if cb != nil {
			cb(msg.Topic(), string(msg.Payload()))
		}
	}

	log.Printf("=================开始订阅 %s [%s]=================", the.Host, topic)
	token := client.Subscribe(topic, 1, handler)

	// Do not block the caller (which may be Paho's on-connect goroutine)
	// while waiting for the broker's acknowledgement.
	go func() {
		token.Wait()
		if err := token.Error(); err != nil {
			log.Println(err)
		}
	}()
}

// MqttInitial creates a MqttHelper, connects it to the broker and returns it.
//
// When isSSL is true and a CA path is supplied (first element of caPath), the
// connection uses TLS; otherwise a plain connection is made. The call blocks
// until the connection succeeds; the process exits if all retries fail.
func MqttInitial(host string, port int, clientId string, userName string, password string, isSSL bool, caPath ...string) *MqttHelper {
	helper := &MqttHelper{
		Host:     host,
		Port:     port,
		ClientId: clientId,
		UserName: userName,
		Password: password,
	}

	switch {
	case isSSL && len(caPath) > 0:
		log.Println("SSL mqHelpers初始化")
		helper.InitialWithSSL(caPath[0])
	default:
		if isSSL {
			// Kept for backward compatibility, but made visible: SSL was
			// requested and the connection is nevertheless plaintext.
			log.Println("mqtt: SSL requested but no CA path given, falling back to a plain connection")
		}
		helper.Initial()
	}

	// Short settle delay kept from the original implementation.
	time.Sleep(time.Second)
	return helper
}