diff --git a/node/et_worker/et_recv/recvDataHanler.go b/node/et_worker/et_recv/recvDataHanler.go index 6100086..bdbf202 100644 --- a/node/et_worker/et_recv/recvDataHanler.go +++ b/node/et_worker/et_recv/recvDataHanler.go @@ -1,6 +1,7 @@ package et_recv import ( + "context" "encoding/json" "errors" "fmt" @@ -13,6 +14,19 @@ import ( "time" ) +type RecvDataHandler struct { + RecvConfigHelper *common_utils.ConfigHelper + alarmCacheUtil *common_utils.AlarmCacheUtil +} + +func NewRecvDataHandler() *RecvDataHandler { + redisAddr := configLoad.LoadConfig().GetString("redis.address") + return &RecvDataHandler{ + RecvConfigHelper: common_utils.NewConfigHelper(redisAddr), + alarmCacheUtil: common_utils.NewAlarmCacheUtil(), + } +} + func Recover(deviceId string, structId int, alarmType string, time time.Time) { alarm := common_models.AlarmMsg{ MessageMode: common_models.Alarm_Mode_AutoElimination, @@ -74,37 +88,36 @@ func AlarmDtuToOut(device *common_models.DeviceInfo, alarmType, alarmCode, conte go kafkaHelper.Send2Topic(brokers, "native_alarm", string(jsonOut)) } -type RecvDataHanler struct { - configHelper *common_utils.ConfigHelper - alarmCacheUtil *common_utils.AlarmCacheUtil -} - -func NewRecvDataHanler() *RecvDataHanler { - redisAddr := configLoad.LoadConfig().GetString("redis.address") - return &RecvDataHanler{ - configHelper: common_utils.NewConfigHelper(redisAddr), - alarmCacheUtil: common_utils.NewAlarmCacheUtil(), +// OnDataHandler iota 数据处理 +func (the *RecvDataHandler) OnDataHandler(ctx context.Context, iotaData common_models.IotaData) (*common_models.DeviceData, error) { + if iotaData.DeviceId == "" { + return nil, fmt.Errorf("DeviceId is null") } -} -// OnDataHandler iota 数据处理 -func (the *RecvDataHanler) OnDataHandler(iotaData common_models.IotaData) (*common_models.DeviceData, error) { - configHelper := the.configHelper //common_utils.NewConfigHelper(common_utils.NewRedisHelper("", "192.168.31.128:30379")) + //startTime := time.Now() // 记录开始时间 - deviceInfo, err := configHelper.GetDeviceInfo(iotaData.DeviceId) + // 使用带有超时的上下文来获取设备信息 + deviceInfo, err := the.RecvConfigHelper.GetDeviceInfo(iotaData.DeviceId, false) if err != nil { return nil, err } + // 检查上下文是否被取消 + select { + case <-ctx.Done(): + return nil, ctx.Err() // 返回上下文的错误 + default: + // 继续处理 + } + if deviceInfo == nil { - errMsg := fmt.Sprintf("[%s] DeviceId not found in redis ", iotaData.DeviceId) + errMsg := fmt.Sprintf("[%s] not found in redis ", iotaData.DeviceId) log.Printf(errMsg) return nil, errors.New(errMsg) } - //code := iotaData.ThemeData.Result.Code - //taskId := iotaData.ReadTaskId() + + // 处理数据 if iotaData.Data.Success() { - //数据恢复设备高告警 if len(iotaData.Data.Data) == 0 { log.Printf("[%s] empty data received", iotaData.DeviceId) } @@ -124,13 +137,13 @@ func (the *RecvDataHanler) OnDataHandler(iotaData common_models.IotaData) (*comm } } } else { - var leafNodes = configHelper.GetSubDeviceNext(iotaData.DeviceId, iotaData.ThingId) + var leafNodes = the.RecvConfigHelper.GetSubDeviceNext(iotaData.DeviceId, iotaData.ThingId) if len(leafNodes) > 0 { //todo } //Key_alarm_code - alarmTypeOpt, err := configHelper.GetAlarmCode(strconv.Itoa(iotaData.Data.Result.Code)) + alarmTypeOpt, err := the.RecvConfigHelper.GetAlarmCode(strconv.Itoa(iotaData.Data.Result.Code)) if err == nil { the.alarmCacheUtil.Add(the.alarmCacheUtil.ALARM_SOURCE_DEVICE, alarmTypeOpt.TypeCode) AlarmToOut(deviceInfo.Id, deviceInfo.Structure.Id, alarmTypeOpt.TypeCode, iotaData.TriggerTime, leafNodes) @@ -139,7 +152,6 @@ func (the *RecvDataHanler) OnDataHandler(iotaData common_models.IotaData) (*comm if alarmTypeOpt.TypeCode == common_models.Alarm_Type_OutRange { iotaData.Data.Result.Code = 0 } - } if iotaData.Data.Result.Code == 0 { @@ -163,17 +175,26 @@ func (the *RecvDataHanler) OnDataHandler(iotaData common_models.IotaData) (*comm DimensionId: iotaData.DimensionId, DataType: dataType, } - return data, err + + // 记录耗时 + //elapsedTime := time.Since(startTime) + //log.Printf("[iotaData -> deviceData] deviceID[%s] 转换耗时: %v", iotaData.DeviceId, elapsedTime) + + return data, nil } - return nil, err + // 记录耗时 + //elapsedTime := time.Since(startTime) + //log.Printf("OnDataHandler 耗时: %v", elapsedTime) + + return nil, nil } // OnAlarmHandler iota 告警处理 -func (the *RecvDataHanler) OnAlarmHandler(iotaAlarm common_models.IotaAlarm) { - configHelper := the.configHelper +func (the *RecvDataHandler) OnAlarmHandler(iotaAlarm common_models.IotaAlarm) { + configHelper := the.RecvConfigHelper deviceId := iotaAlarm.Labels.DeviceId - deviceInfo, err := configHelper.GetDeviceInfo(deviceId) + deviceInfo, err := configHelper.GetDeviceInfo(deviceId, false) if err != nil { return } else {