Browse Source

新增NewRecvDataHandler方法

dev
yfh 2 months ago
parent
commit
4b630acda1
  1. 75
      node/et_worker/et_recv/recvDataHanler.go

75
node/et_worker/et_recv/recvDataHanler.go

@ -1,6 +1,7 @@
package et_recv package et_recv
import ( import (
"context"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@ -13,6 +14,19 @@ import (
"time" "time"
) )
type RecvDataHandler struct {
RecvConfigHelper *common_utils.ConfigHelper
alarmCacheUtil *common_utils.AlarmCacheUtil
}
func NewRecvDataHandler() *RecvDataHandler {
redisAddr := configLoad.LoadConfig().GetString("redis.address")
return &RecvDataHandler{
RecvConfigHelper: common_utils.NewConfigHelper(redisAddr),
alarmCacheUtil: common_utils.NewAlarmCacheUtil(),
}
}
func Recover(deviceId string, structId int, alarmType string, time time.Time) { func Recover(deviceId string, structId int, alarmType string, time time.Time) {
alarm := common_models.AlarmMsg{ alarm := common_models.AlarmMsg{
MessageMode: common_models.Alarm_Mode_AutoElimination, MessageMode: common_models.Alarm_Mode_AutoElimination,
@ -74,37 +88,36 @@ func AlarmDtuToOut(device *common_models.DeviceInfo, alarmType, alarmCode, conte
go kafkaHelper.Send2Topic(brokers, "native_alarm", string(jsonOut)) go kafkaHelper.Send2Topic(brokers, "native_alarm", string(jsonOut))
} }
type RecvDataHanler struct { // OnDataHandler iota 数据处理
configHelper *common_utils.ConfigHelper func (the *RecvDataHandler) OnDataHandler(ctx context.Context, iotaData common_models.IotaData) (*common_models.DeviceData, error) {
alarmCacheUtil *common_utils.AlarmCacheUtil if iotaData.DeviceId == "" {
} return nil, fmt.Errorf("DeviceId is null")
func NewRecvDataHanler() *RecvDataHanler {
redisAddr := configLoad.LoadConfig().GetString("redis.address")
return &RecvDataHanler{
configHelper: common_utils.NewConfigHelper(redisAddr),
alarmCacheUtil: common_utils.NewAlarmCacheUtil(),
} }
}
// OnDataHandler iota 数据处理 //startTime := time.Now() // 记录开始时间
func (the *RecvDataHanler) OnDataHandler(iotaData common_models.IotaData) (*common_models.DeviceData, error) {
configHelper := the.configHelper //common_utils.NewConfigHelper(common_utils.NewRedisHelper("", "192.168.31.128:30379"))
deviceInfo, err := configHelper.GetDeviceInfo(iotaData.DeviceId) // 使用带有超时的上下文来获取设备信息
deviceInfo, err := the.RecvConfigHelper.GetDeviceInfo(iotaData.DeviceId, false)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// 检查上下文是否被取消
select {
case <-ctx.Done():
return nil, ctx.Err() // 返回上下文的错误
default:
// 继续处理
}
if deviceInfo == nil { if deviceInfo == nil {
errMsg := fmt.Sprintf("[%s] DeviceId not found in redis ", iotaData.DeviceId) errMsg := fmt.Sprintf("[%s] not found in redis ", iotaData.DeviceId)
log.Printf(errMsg) log.Printf(errMsg)
return nil, errors.New(errMsg) return nil, errors.New(errMsg)
} }
//code := iotaData.ThemeData.Result.Code
//taskId := iotaData.ReadTaskId() // 处理数据
if iotaData.Data.Success() { if iotaData.Data.Success() {
//数据恢复设备高告警
if len(iotaData.Data.Data) == 0 { if len(iotaData.Data.Data) == 0 {
log.Printf("[%s] empty data received", iotaData.DeviceId) log.Printf("[%s] empty data received", iotaData.DeviceId)
} }
@ -124,13 +137,13 @@ func (the *RecvDataHanler) OnDataHandler(iotaData common_models.IotaData) (*comm
} }
} }
} else { } else {
var leafNodes = configHelper.GetSubDeviceNext(iotaData.DeviceId, iotaData.ThingId) var leafNodes = the.RecvConfigHelper.GetSubDeviceNext(iotaData.DeviceId, iotaData.ThingId)
if len(leafNodes) > 0 { if len(leafNodes) > 0 {
//todo //todo
} }
//Key_alarm_code //Key_alarm_code
alarmTypeOpt, err := configHelper.GetAlarmCode(strconv.Itoa(iotaData.Data.Result.Code)) alarmTypeOpt, err := the.RecvConfigHelper.GetAlarmCode(strconv.Itoa(iotaData.Data.Result.Code))
if err == nil { if err == nil {
the.alarmCacheUtil.Add(the.alarmCacheUtil.ALARM_SOURCE_DEVICE, alarmTypeOpt.TypeCode) the.alarmCacheUtil.Add(the.alarmCacheUtil.ALARM_SOURCE_DEVICE, alarmTypeOpt.TypeCode)
AlarmToOut(deviceInfo.Id, deviceInfo.Structure.Id, alarmTypeOpt.TypeCode, iotaData.TriggerTime, leafNodes) AlarmToOut(deviceInfo.Id, deviceInfo.Structure.Id, alarmTypeOpt.TypeCode, iotaData.TriggerTime, leafNodes)
@ -139,7 +152,6 @@ func (the *RecvDataHanler) OnDataHandler(iotaData common_models.IotaData) (*comm
if alarmTypeOpt.TypeCode == common_models.Alarm_Type_OutRange { if alarmTypeOpt.TypeCode == common_models.Alarm_Type_OutRange {
iotaData.Data.Result.Code = 0 iotaData.Data.Result.Code = 0
} }
} }
if iotaData.Data.Result.Code == 0 { if iotaData.Data.Result.Code == 0 {
@ -163,17 +175,26 @@ func (the *RecvDataHanler) OnDataHandler(iotaData common_models.IotaData) (*comm
DimensionId: iotaData.DimensionId, DimensionId: iotaData.DimensionId,
DataType: dataType, DataType: dataType,
} }
return data, err
// 记录耗时
//elapsedTime := time.Since(startTime)
//log.Printf("[iotaData -> deviceData] deviceID[%s] 转换耗时: %v", iotaData.DeviceId, elapsedTime)
return data, nil
} }
return nil, err // 记录耗时
//elapsedTime := time.Since(startTime)
//log.Printf("OnDataHandler 耗时: %v", elapsedTime)
return nil, nil
} }
// OnAlarmHandler iota 告警处理 // OnAlarmHandler iota 告警处理
func (the *RecvDataHanler) OnAlarmHandler(iotaAlarm common_models.IotaAlarm) { func (the *RecvDataHandler) OnAlarmHandler(iotaAlarm common_models.IotaAlarm) {
configHelper := the.configHelper configHelper := the.RecvConfigHelper
deviceId := iotaAlarm.Labels.DeviceId deviceId := iotaAlarm.Labels.DeviceId
deviceInfo, err := configHelper.GetDeviceInfo(deviceId) deviceInfo, err := configHelper.GetDeviceInfo(deviceId, false)
if err != nil { if err != nil {
return return
} else { } else {

Loading…
Cancel
Save