数据 输入输出 处理
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

217 lines
5.3 KiB

package consumers
import (
"crypto/md5"
"encoding/json"
"fmt"
"goInOut/adaptors"
"goInOut/consumers/GZGZM"
"goInOut/dbHelper"
"goInOut/dbHelper/_kafka"
"io"
"log"
"math/rand"
"time"
)
// consumerGZGZM is the consumer for the GZGZM theme: it takes messages in
// through a Kafka helper and pushes transformed payloads out through an HTTP
// helper.
type consumerGZGZM struct {
// Data cache channel: onData sends transformed payloads into it and Work
// receives from it.
dataCache chan []byte
// Concrete configuration, decoded from JSON by LoadConfigJson.
ConfigInfo GZGZM.ConfigFile
// Kafka helper used as the data entry point; built in inputInitial.
InKafka _kafka.KafkaHelper
// HTTP helper used as the data exit point; built in outputInitial. Its
// Token field is sent as the Authorization header.
OutHttp *dbHelper.HttpHelper
}
// RegisterPoint registers one monitoring point with the remote platform.
//
// thirdId is sent as the point's ThirdId; the remaining identifying fields
// (branch code, name, code, type, initial amounts, unit) are copied from
// sensorInfo. All alarm/warn thresholds and counters, and IsClose, are sent
// as 0.
//
// The request is sent through a dedicated HTTP helper so that the shared
// the.OutHttp.Url, which Work uses for every data push, is never overwritten
// with the registration URL.
//
// It returns the marshalling or publishing error, or nil on success.
func (the *consumerGZGZM) RegisterPoint(thirdId string, sensorInfo GZGZM.SensorInfo) error {
	const registerUrl = "http://sit.gibs.deagluo.top:32001/openApi/gzm_v1/monitorPoint/create"

	r := GZGZM.RegisterPointBody{
		ThirdId:     thirdId,
		BranchCode:  sensorInfo.BranchCode,
		Name:        sensorInfo.Name,
		Code:        sensorInfo.Code,
		Type:        sensorInfo.Type,
		AlarmValue1: 0,
		AlarmValue2: 0,
		AlarmValue3: 0,
		WarnValue1:  0,
		WarnValue2:  0,
		WarnValue3:  0,
		AlarmCount:  0,
		WarnCount:   0,
		InitAmount1: sensorInfo.InitAmount1,
		InitAmount2: sensorInfo.InitAmount2,
		InitAmount3: sensorInfo.InitAmount3,
		Unit:        sensorInfo.Unit,
		IsClose:     0,
	}

	body, err := json.Marshal(r)
	if err != nil {
		log.Printf(" 注册测点异常 序列化请求体失败 err=%v", err)
		return err
	}

	// Same construction pattern as the token request: a throw-away helper
	// bound to its own URL, leaving the data-push helper untouched.
	registerHttp := &dbHelper.HttpHelper{
		Url: registerUrl,
	}
	registerHttp.Initial()
	registerHttp.Token = the.OutHttp.Token

	resp, err := registerHttp.PublishWithHeader(body, map[string]string{"Authorization": the.OutHttp.Token})
	if err != nil {
		log.Printf(" 注册测点异常 err=%v \n,resp=%s", err, resp)
	}
	return err
}
// LoadConfigJson decodes the JSON text cfgStr into the.ConfigInfo.
// If decoding fails it logs the raw input together with the error and then
// panics with that error.
func (the *consumerGZGZM) LoadConfigJson(cfgStr string) {
	raw := []byte(cfgStr)
	if decodeErr := json.Unmarshal(raw, &the.ConfigInfo); decodeErr != nil {
		log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, decodeErr.Error())
		panic(decodeErr)
	}
}
// Initial prepares the consumer: it allocates the 200-slot data cache
// channel, loads the JSON configuration cfg (LoadConfigJson panics on invalid
// JSON), then sets up the input side followed by the output side.
// It returns the first error reported by either setup step.
func (the *consumerGZGZM) Initial(cfg string) error {
	the.dataCache = make(chan []byte, 200)
	the.LoadConfigJson(cfg)

	if inErr := the.inputInitial(); inErr != nil {
		return inErr
	}
	return the.outputInitial()
}
// inputInitial sets up the data entry point. It builds the Kafka helper from
// the brokers and group id in the input configuration, initialises it,
// subscribes onData to every configured topic and finally calls the helper's
// Worker. It always returns nil.
func (the *consumerGZGZM) inputInitial() error {
	kafkaCfg := the.ConfigInfo.IoConfig.In.Kafka

	the.InKafka = _kafka.KafkaHelper{
		Brokers: kafkaCfg.Brokers,
		GroupId: kafkaCfg.GroupId,
	}
	the.InKafka.Initial()

	for _, topic := range kafkaCfg.Topics {
		the.InKafka.Subscribe(topic, the.onData)
	}

	the.InKafka.Worker()
	return nil
}
// outputInitial sets up the data exit point. It creates and initialises the
// HTTP helper for the configured output URL, then decides where the token
// comes from: a non-empty static token is used as is; otherwise, when a token
// URL is configured, RefreshTask is started in its own goroutine to fetch it.
// It always returns nil.
func (the *consumerGZGZM) outputInitial() error {
	httpCfg := the.ConfigInfo.IoConfig.Out.Http

	the.OutHttp = &dbHelper.HttpHelper{
		Url: httpCfg.Url,
	}
	the.OutHttp.Initial()

	// A static token wins over dynamic retrieval.
	if staticToken := httpCfg.Token.Static; staticToken != "" {
		the.OutHttp.Token = staticToken
		return nil
	}

	// No static token: fetch one dynamically, but only if a token URL exists.
	if httpCfg.Token.Url != "" {
		go the.RefreshTask()
	}
	return nil
}
// RefreshTask refreshes the token once immediately and then again on every
// tick of a 24-hour ticker. The loop has no exit condition, so the call never
// returns; outputInitial runs it in its own goroutine.
func (the *consumerGZGZM) RefreshTask() {
	const refreshInterval = 24 * time.Hour

	the.tokenRefresh()

	ticker := time.NewTicker(refreshInterval)
	defer ticker.Stop()

	for range ticker.C {
		the.tokenRefresh()
	}
}
// tokenRefresh requests a fresh token from the configured token URL and, on
// success, stores it in the.OutHttp.Token.
//
// The request is signed with
//
//	secret = hex(md5(appKey + appSecret + random + timestamp))
//
// where random is a 16-digit numeric string and timestamp is the current Unix
// time in milliseconds. appKey, random, timestamp and secret are sent as
// query parameters.
//
// Failures are logged and leave the current token untouched: both an
// unparsable response and a response whose Code is not 200.
func (the *consumerGZGZM) tokenRefresh() {
	tokenUrl := the.ConfigInfo.IoConfig.Out.Http.Token.Url
	appKey := the.ConfigInfo.IoConfig.Out.Http.Token.AppKey
	appSecret := the.ConfigInfo.IoConfig.Out.Http.Token.AppSecret
	randomStr16 := string(Krand(16, KC_RAND_KIND_NUM))
	timestamp := time.Now().UnixMilli()

	secretRaw := fmt.Sprintf("%s%s%s%d", appKey, appSecret, randomStr16, timestamp)
	h := md5.New()
	// hash.Hash writes are documented never to return an error, so the result
	// is deliberately discarded.
	_, _ = io.WriteString(h, secretRaw)
	secret := fmt.Sprintf("%x", h.Sum(nil))

	// The first parameter follows '?' directly; no empty leading "&" pair.
	queryUrl := fmt.Sprintf("%s?appKey=%s&random=%s&timestamp=%d&secret=%s",
		tokenUrl, appKey, randomStr16, timestamp, secret)

	tokenHttp := &dbHelper.HttpHelper{
		Url: queryUrl,
	}
	tokenHttp.Initial()
	tokenResp := tokenHttp.HttpGet("")

	tokenBody := GZGZM.TokenBody{}
	if err := json.Unmarshal([]byte(tokenResp), &tokenBody); err != nil {
		log.Printf("解析异常 err=%s,body=%s", err.Error(), tokenResp)
		return
	}

	if tokenBody.Code != 200 {
		// Surface rejected requests instead of silently keeping the old token.
		log.Printf("刷新失败,code=%v,body=%s", tokenBody.Code, tokenResp)
		return
	}

	the.OutHttp.Token = tokenBody.Data
	// NOTE(review): this writes the bearer token to the log in clear text;
	// consider masking it if logs are shared.
	log.Printf("刷新成功,token=%s", the.OutHttp.Token)
}
// Work starts the background goroutine that drains dataCache and pushes every
// payload to the.OutHttp.Url, sending the current token in the Authorization
// header. It returns immediately.
//
// A failed push is logged (the payload is not retried). After each push the
// goroutine pauses for 10ms. The goroutine has no stop signal; it ends only
// if dataCache is closed.
func (the *consumerGZGZM) Work() {
	go func() {
		for pushBytes := range the.dataCache {
			log.Printf("取出ch数据,剩余[%d] ", len(the.dataCache))
			log.Printf("推送[%v]: len=%d", the.OutHttp.Url, len(pushBytes))

			resp, err := the.OutHttp.PublishWithHeader(pushBytes, map[string]string{"Authorization": the.OutHttp.Token})
			if err != nil {
				// Without this, rejected or undeliverable payloads vanish silently.
				log.Printf("推送异常 err=%v,resp=%s", err, resp)
			}

			time.Sleep(10 * time.Millisecond)
		}
	}()
}
// onData is the callback subscribed to every input topic. It logs a preview
// of the message, transforms it with the adaptor and, when the adaptor yields
// a non-empty payload, queues that payload on dataCache for Work to push.
//
// The send on dataCache blocks while the channel is full. The function always
// returns true.
func (the *consumerGZGZM) onData(topic string, msg string) bool {
	// Maximum number of bytes of the message shown in the log line.
	const previewLen = 80

	if len(msg) > previewLen {
		log.Printf("recv:[%s]:%s ...", topic, msg[:previewLen])
	} else {
		// Short messages used to be skipped entirely; log them in full.
		log.Printf("recv:[%s]:%s", topic, msg)
	}

	adaptor := the.getAdaptor()
	if adaptor == nil {
		return true
	}

	if needPush := adaptor.Transform(topic, msg); len(needPush) > 0 {
		the.dataCache <- needPush
	}
	return true
}
// getAdaptor builds a new Adaptor_THEME_GZGZM on every call, wrapping a
// SensorConfig that carries the sensor info map from the loaded configuration.
func (the *consumerGZGZM) getAdaptor() (adaptor adaptors.IAdaptor3) {
	sensorCfg := GZGZM.SensorConfig{
		SensorInfoMap: the.ConfigInfo.SensorInfoMap,
	}
	adaptor = adaptors.Adaptor_THEME_GZGZM{sensorCfg}
	return adaptor
}
// Krand returns size random ASCII bytes drawn from the character class
// selected by kind:
//
//	0 (KC_RAND_KIND_NUM)   digits '0'-'9'
//	1 (KC_RAND_KIND_LOWER) lower-case letters 'a'-'z'
//	2 (KC_RAND_KIND_UPPER) upper-case letters 'A'-'Z'
//
// Any other kind (including KC_RAND_KIND_ALL) picks one of the three classes
// at random for every byte. A size of zero or less yields an empty slice
// rather than panicking.
//
// The bytes come from math/rand and are not suitable as secrets.
func Krand(size int, kind int) []byte {
	if size <= 0 {
		return []byte{}
	}

	// Each class is described by how many characters it has and its first one.
	classes := [...]struct{ count, first int }{
		{10, '0'},
		{26, 'a'},
		{26, 'A'},
	}
	mixed := kind < 0 || kind >= len(classes)

	out := make([]byte, size)
	for i := range out {
		idx := kind
		if mixed {
			idx = rand.Intn(len(classes))
		}
		class := classes[idx]
		out[i] = byte(class.first + rand.Intn(class.count))
	}
	return out
}
// Character classes accepted by Krand as its kind argument.
const (
	KC_RAND_KIND_NUM   = iota // 0: digits only
	KC_RAND_KIND_LOWER        // 1: lower-case letters
	KC_RAND_KIND_UPPER        // 2: upper-case letters
	KC_RAND_KIND_ALL          // 3: digits plus lower- and upper-case letters
)