数据 输入输出 处理
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

133 lines
3.6 KiB

package dbOperate
import (
"crypto/tls"
"crypto/x509"
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"log"
"os"
"time"
)
type MqttHelper struct {
Host string
Port int
ClientId string
UserName string
Password string
client mqtt.Client
subscribeCalls []subscribeCall //纪录 重连订阅
}
type subscribeCall struct {
topic string
f func(topic string, callMsg string)
}
func (the *MqttHelper) reConn2Subscribe(client mqtt.Client) {
log.Println("mqtt触发链接后的自动订阅")
for _, call := range the.subscribeCalls {
the.Subscribe(call.topic, call.f)
}
}
func (the *MqttHelper) initialClient(sslEnable bool, caPath string) {
maxReConnCount := 3
ReConnDurationSec := 30
reConn:
mqttConnectStr := fmt.Sprintf("tcp://%v:%d", the.Host, the.Port)
opts := mqtt.NewClientOptions().AddBroker(mqttConnectStr)
opts.SetUsername(the.UserName)
opts.SetPassword(the.Password)
opts.SetClientID(the.ClientId)
opts.SetOnConnectHandler(the.reConn2Subscribe)
if sslEnable {
opts.SetTLSConfig(NewTlsConfig(caPath))
}
the.client = mqtt.NewClient(opts)
if token := the.client.Connect(); token.Wait() && token.Error() != nil {
log.Printf("mqtt连接状态异常 %v(u:%v,p:%v,cid:%v) [err=%s]", mqttConnectStr, the.UserName, the.Password, the.ClientId, token.Error())
log.Printf("mqtt重连,%ds后尝试,剩余次数=%d", ReConnDurationSec, maxReConnCount)
if maxReConnCount > 0 {
maxReConnCount--
time.Sleep(time.Duration(ReConnDurationSec) * time.Second)
goto reConn
}
os.Exit(1)
}
time.Sleep(time.Second * 1)
}
func (the *MqttHelper) Initial() {
the.initialClient(false, "")
}
func (the *MqttHelper) InitialWithSSL(caPath string) {
the.initialClient(true, caPath)
}
func NewTlsConfig(sslPath string) *tls.Config {
//"ssl/centerCA.crt"
certpool := x509.NewCertPool()
ca, err := os.ReadFile(sslPath)
if err != nil {
log.Fatalln(err.Error())
}
certpool.AppendCertsFromPEM(ca)
return &tls.Config{
//RootCAs: certpool,
InsecureSkipVerify: true,
}
}
func (the *MqttHelper) Publish(topic string, messageBytes []byte) {
if the.client != nil {
token := the.client.Publish(topic, 1, false, messageBytes)
token.Wait()
//the.client.Disconnect(200)
log.Printf("[%s] -[%v]推送Msg 长度=%d \n", topic, the.client.IsConnected(), len(messageBytes))
}
}
func (the *MqttHelper) Subscribe(topic string, myCallback func(topic string, callMsg string)) {
var callback mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
//log.Printf("收到数据 [%v] TOPIC: %s,MSGLen: %v", msg.MessageID(), msg.Topic(), len(msg.Payload()))
Msg := string(msg.Payload())
myCallback(msg.Topic(), Msg)
}
log.Printf("=================开始订阅 %s [%s]=================", the.Host, topic)
t := the.client.Subscribe(topic, 1, callback)
f := func() {
_ = t.Wait() // Can also use '<-t.Done()' in releases > 1.2.0
if t.Error() != nil {
log.Println(t.Error()) // Use your preferred logging technique (or just fmt.Printf)
}
}
go f()
//纪录需要重连的 callback
the.subscribeCalls = append(the.subscribeCalls, subscribeCall{
topic: topic,
f: myCallback,
})
}
func MqttInitial(host string, port int, clientId string, userName string, password string, isSSL bool, caPtah ...string) *MqttHelper {
mqHelpers := MqttHelper{
Host: host,
Port: port,
ClientId: clientId,
UserName: userName,
Password: password,
}
if isSSL && len(caPtah) > 0 {
log.Println("SSL mqHelpers初始化")
mqHelpers.InitialWithSSL(caPtah[0])
} else {
mqHelpers.Initial()
}
//topic = "t/500103" //龙河桥
time.Sleep(time.Second * 1)
return &mqHelpers
}