You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
217 lines
5.3 KiB
217 lines
5.3 KiB
package consumers
|
|
|
|
import (
|
|
"crypto/md5"
|
|
"encoding/json"
|
|
"fmt"
|
|
"goUpload/adaptors"
|
|
"goUpload/consumers/GZGZM"
|
|
"goUpload/dbHelper"
|
|
"goUpload/dbHelper/_kafka"
|
|
"io"
|
|
"log"
|
|
"math/rand"
|
|
"time"
|
|
)
|
|
|
|
type consumerGZGZM struct {
|
|
//数据缓存管道
|
|
dataCache chan []byte
|
|
//具体配置
|
|
ConfigInfo GZGZM.ConfigFile
|
|
InKafka _kafka.KafkaHelper
|
|
OutHttp *dbHelper.HttpHelper
|
|
}
|
|
|
|
func (the *consumerGZGZM) RegisterPoint(thirdId string, sensorInfo GZGZM.SensorInfo) error {
|
|
registerUrl := "http://sit.gibs.deagluo.top:32001/openApi/gzm_v1/monitorPoint/create"
|
|
r := GZGZM.RegisterPointBody{
|
|
ThirdId: thirdId,
|
|
BranchCode: sensorInfo.BranchCode,
|
|
Name: sensorInfo.Name,
|
|
Code: sensorInfo.Code,
|
|
Type: sensorInfo.Type,
|
|
AlarmValue1: 0,
|
|
AlarmValue2: 0,
|
|
AlarmValue3: 0,
|
|
WarnValue1: 0,
|
|
WarnValue2: 0,
|
|
WarnValue3: 0,
|
|
AlarmCount: 0,
|
|
WarnCount: 0,
|
|
InitAmount1: sensorInfo.InitAmount1,
|
|
InitAmount2: sensorInfo.InitAmount2,
|
|
InitAmount3: sensorInfo.InitAmount3,
|
|
Unit: sensorInfo.Unit,
|
|
IsClose: 0,
|
|
}
|
|
|
|
the.OutHttp.Url = registerUrl
|
|
body, _ := json.Marshal(r)
|
|
resp, err := the.OutHttp.PublishWithHeader(body, map[string]string{"Authorization": the.OutHttp.Token})
|
|
if err != nil {
|
|
log.Printf(" 注册测点异常 err=%v \n,resp=%s", err, resp)
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (the *consumerGZGZM) LoadConfigJson(cfgStr string) {
|
|
// 将 JSON 格式的数据解析到结构体中
|
|
err := json.Unmarshal([]byte(cfgStr), &the.ConfigInfo)
|
|
if err != nil {
|
|
log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error())
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
func (the *consumerGZGZM) Initial(cfg string) error {
|
|
the.dataCache = make(chan []byte, 200)
|
|
|
|
the.LoadConfigJson(cfg)
|
|
err := the.inputInitial()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = the.outputInitial()
|
|
return err
|
|
}
|
|
func (the *consumerGZGZM) inputInitial() error {
|
|
//数据入口
|
|
the.InKafka = _kafka.KafkaHelper{
|
|
Brokers: the.ConfigInfo.IoConfig.In.Kafka.Brokers,
|
|
GroupId: the.ConfigInfo.IoConfig.In.Kafka.GroupId,
|
|
}
|
|
the.InKafka.Initial()
|
|
for _, inTopic := range the.ConfigInfo.IoConfig.In.Kafka.Topics {
|
|
the.InKafka.Subscribe(inTopic, the.onData)
|
|
}
|
|
|
|
the.InKafka.Worker()
|
|
return nil
|
|
}
|
|
func (the *consumerGZGZM) outputInitial() error {
|
|
//数据出口
|
|
the.OutHttp = &dbHelper.HttpHelper{
|
|
Url: the.ConfigInfo.IoConfig.Out.Http.Url,
|
|
}
|
|
the.OutHttp.Initial()
|
|
|
|
//判断有无token
|
|
if the.ConfigInfo.IoConfig.Out.Http.Token.Static != "" {
|
|
//静态token
|
|
the.OutHttp.Token = the.ConfigInfo.IoConfig.Out.Http.Token.Static
|
|
} else { //需要动态获取的token
|
|
if the.ConfigInfo.IoConfig.Out.Http.Token.Url != "" {
|
|
go the.RefreshTask()
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (the *consumerGZGZM) RefreshTask() {
|
|
the.tokenRefresh()
|
|
ticker := time.NewTicker(24 * time.Hour)
|
|
defer ticker.Stop()
|
|
for true {
|
|
<-ticker.C
|
|
the.tokenRefresh()
|
|
}
|
|
}
|
|
|
|
func (the *consumerGZGZM) tokenRefresh() {
|
|
url := the.ConfigInfo.IoConfig.Out.Http.Token.Url
|
|
appKey := the.ConfigInfo.IoConfig.Out.Http.Token.AppKey
|
|
appSecret := the.ConfigInfo.IoConfig.Out.Http.Token.AppSecret
|
|
randomStr16 := string(Krand(16, KC_RAND_KIND_NUM))
|
|
timestamp := time.Now().UnixMilli()
|
|
secretRaw := fmt.Sprintf("%s%s%s%d", appKey, appSecret, randomStr16, timestamp)
|
|
h := md5.New()
|
|
|
|
io.WriteString(h, secretRaw)
|
|
secret := fmt.Sprintf("%x", h.Sum(nil))
|
|
|
|
queryUrl := url + "?" +
|
|
fmt.Sprintf("&appKey=%s", appKey) +
|
|
fmt.Sprintf("&random=%s", randomStr16) +
|
|
fmt.Sprintf("×tamp=%d", timestamp) +
|
|
fmt.Sprintf("&secret=%s", secret)
|
|
tokenHttp := &dbHelper.HttpHelper{
|
|
Url: queryUrl,
|
|
}
|
|
tokenHttp.Initial()
|
|
tokenResp := tokenHttp.HttpGet("")
|
|
|
|
tokenBody := GZGZM.TokenBody{}
|
|
|
|
err := json.Unmarshal([]byte(tokenResp), &tokenBody)
|
|
if err != nil {
|
|
log.Printf("解析异常 err=%s,body=%s", err.Error(), tokenResp)
|
|
return
|
|
}
|
|
|
|
if tokenBody.Code == 200 {
|
|
the.OutHttp.Token = tokenBody.Data
|
|
log.Printf("刷新成功,token=%s", the.OutHttp.Token)
|
|
}
|
|
|
|
}
|
|
|
|
func (the *consumerGZGZM) Work() {
|
|
|
|
go func() {
|
|
for {
|
|
pushBytes := <-the.dataCache
|
|
log.Printf("取出ch数据,剩余[%d] ", len(the.dataCache))
|
|
|
|
log.Printf("推送[%v]: len=%d", the.OutHttp.Url, len(pushBytes))
|
|
the.OutHttp.PublishWithHeader(pushBytes, map[string]string{"Authorization": the.OutHttp.Token})
|
|
time.Sleep(10 * time.Millisecond)
|
|
}
|
|
|
|
}()
|
|
}
|
|
func (the *consumerGZGZM) onData(topic string, msg string) bool {
|
|
if len(msg) > 80 {
|
|
log.Printf("recv:[%s]:%s ...", topic, msg[:80])
|
|
}
|
|
adaptor := the.getAdaptor()
|
|
if adaptor != nil {
|
|
needPush := adaptor.Transform(topic, msg)
|
|
|
|
if len(needPush) > 0 {
|
|
the.dataCache <- needPush
|
|
}
|
|
|
|
}
|
|
return true
|
|
}
|
|
func (the *consumerGZGZM) getAdaptor() (adaptor adaptors.IAdaptor3) {
|
|
|
|
return adaptors.Adaptor_THEME_GZGZM{
|
|
GZGZM.SensorConfig{
|
|
SensorInfoMap: the.ConfigInfo.SensorInfoMap,
|
|
},
|
|
}
|
|
}
|
|
|
|
// Krand returns size random ASCII bytes.
//
// kind selects the alphabet: 0 (KC_RAND_KIND_NUM) digits, 1
// (KC_RAND_KIND_LOWER) lowercase letters, 2 (KC_RAND_KIND_UPPER) uppercase
// letters. Any other value (e.g. KC_RAND_KIND_ALL) picks one of the three
// alphabets at random for every byte.
//
// A size of zero or less yields an empty, non-nil slice instead of panicking.
// The package-level math/rand source is used without explicit seeding.
func Krand(size int, kind int) []byte {
	if size <= 0 {
		return make([]byte, 0)
	}

	alphabets := [...]string{
		"0123456789",
		"abcdefghijklmnopqrstuvwxyz",
		"ABCDEFGHIJKLMNOPQRSTUVWXYZ",
	}
	mixed := kind < 0 || kind >= len(alphabets)

	result := make([]byte, size)
	for i := range result {
		pick := kind
		if mixed {
			// Choose a fresh alphabet for each byte.
			pick = rand.Intn(len(alphabets))
		}
		alphabet := alphabets[pick]
		result[i] = alphabet[rand.Intn(len(alphabet))]
	}
	return result
}
|
|
|
|
// Alphabet selectors accepted by Krand's kind parameter.
const (
	// KC_RAND_KIND_NUM selects digits only.
	KC_RAND_KIND_NUM = iota
	// KC_RAND_KIND_LOWER selects lowercase letters.
	KC_RAND_KIND_LOWER
	// KC_RAND_KIND_UPPER selects uppercase letters.
	KC_RAND_KIND_UPPER
	// KC_RAND_KIND_ALL selects digits, lowercase and uppercase letters.
	KC_RAND_KIND_ALL
)
|
|
|