package dbOperate import ( "crypto/tls" "crypto/x509" "fmt" mqtt "github.com/eclipse/paho.mqtt.golang" "log" "os" "time" ) type MqttHelper struct { Host string Port int ClientId string UserName string Password string client mqtt.Client subscribeCalls []subscribeCall //纪录 重连订阅 } type subscribeCall struct { topic string f func(topic string, callMsg string) } func (the *MqttHelper) reConn2Subscribe(client mqtt.Client) { log.Println("mqtt触发链接后的自动订阅") for _, call := range the.subscribeCalls { the.Subscribe(call.topic, call.f) } } func (the *MqttHelper) initialClient(sslEnable bool, caCertPath, clientCertPath, clientKeyPath string) { maxReConnCount := 3 ReConnDurationSec := 30 reConn: tag := "tcp" if sslEnable { tag = "ssl" } mqttConnectStr := fmt.Sprintf("%s://%v:%d", tag, the.Host, the.Port) opts := mqtt.NewClientOptions().AddBroker(mqttConnectStr) opts.SetUsername(the.UserName) opts.SetPassword(the.Password) opts.SetClientID(the.ClientId) opts.SetOnConnectHandler(the.reConn2Subscribe) if sslEnable { opts.SetTLSConfig(NewTlsConfig(caCertPath, clientCertPath, clientKeyPath)) } the.client = mqtt.NewClient(opts) if token := the.client.Connect(); token.Wait() && token.Error() != nil { log.Printf("mqtt连接状态异常 %v [ u:%v,p:%v,cid:%v ] [err=%s]", mqttConnectStr, the.UserName, the.Password, the.ClientId, token.Error()) log.Printf("mqtt重连,%ds后尝试,剩余次数=%d", ReConnDurationSec, maxReConnCount) if maxReConnCount > 0 { maxReConnCount-- time.Sleep(time.Duration(ReConnDurationSec) * time.Second) goto reConn } os.Exit(1) } time.Sleep(time.Second * 1) } func (the *MqttHelper) Initial() { the.initialClient(false, "", "", "") } func (the *MqttHelper) InitialWithSSL(caCertPath, clientCertPath, clientKeyPath string) { the.initialClient(true, caCertPath, clientCertPath, clientKeyPath) } func NewTlsConfig(caCertPath, clientCertPath, clientKeyPath string) *tls.Config { //"ssl/centerCA.crt" certPool := x509.NewCertPool() ca, err := os.ReadFile(caCertPath) if err != nil { log.Fatalln(err.Error()) } certPool.AppendCertsFromPEM(ca) tlsConfig := &tls.Config{ RootCAs: certPool, InsecureSkipVerify: true, } if len(clientCertPath) > 0 && len(clientKeyPath) > 0 { // 读取客户端证书和密钥 clientCert, err := tls.LoadX509KeyPair(clientCertPath, clientKeyPath) if err != nil { fmt.Printf("Error loading client certificate and key: %v\n", err) } tlsConfig.Certificates = append(tlsConfig.Certificates, clientCert) } return tlsConfig } func (the *MqttHelper) Publish(topic string, messageBytes []byte) { if the.client != nil { token := the.client.Publish(topic, 1, false, messageBytes) token.Wait() //the.client.Disconnect(200) log.Printf("[%s] -[%v]推送Msg 长度=%d byte \n", topic, the.client.IsConnected(), len(messageBytes)) } } func (the *MqttHelper) Subscribe(topic string, myCallback func(topic string, callMsg string)) { var callback mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { //log.Printf("收到数据 [%v] TOPIC: %s,MSGLen: %v", msg.MessageID(), msg.Topic(), len(msg.Payload())) Msg := string(msg.Payload()) myCallback(msg.Topic(), Msg) } log.Printf("=================开始订阅 %s [%s]=================", the.Host, topic) t := the.client.Subscribe(topic, 1, callback) f := func() { _ = t.Wait() // Can also use '<-t.Done()' in releases > 1.2.0 if t.Error() != nil { log.Println(t.Error()) // Use your preferred logging technique (or just fmt.Printf) } } go f() //纪录需要重连的 callback the.subscribeCalls = append(the.subscribeCalls, subscribeCall{ topic: topic, f: myCallback, }) } func MqttInitial(host string, port int, clientId string, userName string, password string, isSSL bool, caPtah ...string) *MqttHelper { mqHelpers := MqttHelper{ Host: host, Port: port, ClientId: clientId, UserName: userName, Password: password, } if isSSL && len(caPtah) > 0 { log.Println("SSL mqHelpers初始化") switch len(caPtah) { case 1: mqHelpers.InitialWithSSL(caPtah[0], "", "") case 3: mqHelpers.InitialWithSSL(caPtah[0], caPtah[1], caPtah[2]) default: log.Printf("caPtah 参数量错误,请注意") } } else { mqHelpers.Initial() } //topic = "t/500103" //龙河桥 time.Sleep(time.Second * 1) return &mqHelpers }