// Package et_recv turns incoming iota data and iota alarm messages into
// device data and native alarm messages that are published to Kafka.
package et_recv

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"strconv"
	"time"

	"gitea.anxinyun.cn/container/common_models"
	"gitea.anxinyun.cn/container/common_utils"
	"gitea.anxinyun.cn/container/common_utils/configLoad"
	"gitea.anxinyun.cn/container/common_utils/kafkaHelper"
)

// RecvDataHandler handles iota data and iota alarms received for devices.
type RecvDataHandler struct {
	// RecvConfigHelper is used to look up device info, sub devices and
	// alarm codes.
	RecvConfigHelper *common_utils.ConfigHelper
	// alarmCacheUtil is the alarm cache that alarm types are added to and
	// removed from as data arrives.
	alarmCacheUtil *common_utils.AlarmCacheUtil
}

// NewRecvDataHandler builds a RecvDataHandler whose config helper is created
// from the "redis.address" configuration value.
func NewRecvDataHandler() *RecvDataHandler {
	redisAddr := configLoad.LoadConfig().GetString("redis.address")
	return &RecvDataHandler{
		RecvConfigHelper: common_utils.NewConfigHelper(redisAddr),
		alarmCacheUtil:   common_utils.NewAlarmCacheUtil(),
	}
}

// sendNativeAlarm marshals alarm to JSON and publishes it asynchronously to
// the "native_alarm" topic on the brokers listed under "kafka.brokers".
// If the alarm cannot be marshalled the failure is logged and nothing is sent.
func sendNativeAlarm(alarm common_models.AlarmMsg) {
	payload, err := json.Marshal(alarm)
	if err != nil {
		log.Printf("marshal alarm for source [%v] type [%v] failed: %v", alarm.SourceId, alarm.AlarmTypeCode, err)
		return
	}
	brokers := configLoad.LoadConfig().GetStringSlice("kafka.brokers")
	// Fire and forget: the send result is not observed by the caller.
	go kafkaHelper.Send2Topic(brokers, "native_alarm", string(payload))
}

// Recover publishes an auto-elimination alarm message of type alarmType for
// the device deviceId in structure structId, stamped with acqTime.
func Recover(deviceId string, structId int, alarmType string, acqTime time.Time) {
	sendNativeAlarm(common_models.AlarmMsg{
		MessageMode:   common_models.Alarm_Mode_AutoElimination,
		StructureId:   structId,
		SourceId:      deviceId,
		AlarmTypeCode: alarmType,
		AcqTime:       acqTime,
		// NOTE(review): the sibling functions use the named constants
		// common_models.Alarm_Source_Device and common_models.Alarm_Sponsor_Recv
		// here; confirm they equal these literals before unifying.
		SourceTypeId: 1,
		Sponsor:      "et.recv",
	})
}

// AlarmToOut publishes a generation alarm message of type alarmType for the
// device deviceId in structure structId, stamped with acqTime. subDevices is
// forwarded in the message's SubDevices field.
func AlarmToOut(deviceId string, structId int, alarmType string, acqTime time.Time, subDevices []string) {
	sendNativeAlarm(common_models.AlarmMsg{
		MessageMode:   common_models.Alarm_Mode_Generation,
		StructureId:   structId,
		SourceId:      deviceId,
		AlarmTypeCode: alarmType,
		AcqTime:       acqTime,
		SourceTypeId:  common_models.Alarm_Source_Device,
		Sponsor:       common_models.Alarm_Sponsor_Recv,
		SubDevices:    subDevices,
	})
}

// AlarmDtuToOut publishes a generation alarm message with a DTU source type
// for device, carrying alarmType, alarmCode and content and stamped with
// acqTime. subDevices is forwarded in the message's SubDevices field.
// A nil device is logged and ignored.
func AlarmDtuToOut(device *common_models.DeviceInfo, alarmType, alarmCode, content string, acqTime time.Time, subDevices []string) {
	if device == nil {
		log.Printf("skip dtu alarm [%s/%s]: device info is nil", alarmType, alarmCode)
		return
	}
	sendNativeAlarm(common_models.AlarmMsg{
		MessageMode:   common_models.Alarm_Mode_Generation,
		StructureId:   device.Structure.Id,
		StructureName: device.Structure.Name,
		SourceId:      device.Id,
		SourceName:    device.Name,
		AlarmTypeCode: alarmType,
		AlarmCode:     alarmCode,
		Content:       content,
		AcqTime:       acqTime,
		SourceTypeId:  common_models.Alarm_Source_DTU,
		Sponsor:       common_models.Alarm_Sponsor_Recv,
		SubDevices:    subDevices,
	})
}

// OnDataHandler processes one iota data message.
//
// It returns an error when the device id is empty, when the device info
// lookup fails or finds nothing, or when ctx is already done after the
// lookup. When the data is successful, cached device alarms are removed and
// recovery messages are published; otherwise an alarm is published for the
// result code. A DeviceData is returned only when the result code is 0 (an
// out-of-range result is reset to 0 so its data is still forwarded); for any
// other result code both return values are nil.
func (the *RecvDataHandler) OnDataHandler(ctx context.Context, iotaData common_models.IotaData) (*common_models.DeviceData, error) {
	if iotaData.DeviceId == "" {
		return nil, fmt.Errorf("DeviceId is null")
	}

	deviceInfo, err := the.RecvConfigHelper.GetDeviceInfo(iotaData.DeviceId, false)
	if err != nil {
		return nil, err
	}

	// GetDeviceInfo is not given ctx, so cancellation can only be observed
	// once the lookup has returned.
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	default:
	}

	if deviceInfo == nil {
		errMsg := fmt.Sprintf("[%s] not found in redis ", iotaData.DeviceId)
		// Print, not Printf: the message contains the device id and must not
		// be interpreted as a format string.
		log.Print(errMsg)
		return nil, errors.New(errMsg)
	}

	if iotaData.Data.Success() {
		if len(iotaData.Data.Data) == 0 {
			log.Printf("[%s] empty data received", iotaData.DeviceId)
		}
		// Alarm types that are cleared once the device reports good data.
		toRecover := []string{
			common_models.Alarm_Type_Device_Status,
			common_models.Alarm_Type_Timeout,
			common_models.Alarm_Type_Data_Parse_Error,
			common_models.Alarm_Type_Data_Interupt,
			common_models.Alarm_Type_OutRange,
			common_models.Alarm_Type_OutRange_Legacy,
		}
		// Use the handler's own cache (the one Add is called on below) rather
		// than a zero-value AlarmCacheUtil, whose fields are all unset.
		cache := the.alarmCacheUtil
		affects := cache.Rem(cache.ALARM_SOURCE_DEVICE, iotaData.DeviceId, toRecover...)
		if affects > 0 {
			for _, alarmType := range toRecover {
				Recover(deviceInfo.Id, deviceInfo.Structure.Id, alarmType, iotaData.TriggerTime)
			}
		}
	} else {
		leafNodes := the.RecvConfigHelper.GetSubDeviceNext(iotaData.DeviceId, iotaData.ThingId)
		// TODO: handling specific to devices that have leaf nodes is not
		// implemented yet.

		// Map the result code to an alarm type.
		alarmTypeOpt, err := the.RecvConfigHelper.GetAlarmCode(strconv.Itoa(iotaData.Data.Result.Code))
		if err != nil {
			log.Printf("[%s] get alarm code for result code %v failed: %v", iotaData.DeviceId, iotaData.Data.Result.Code, err)
		} else {
			// NOTE(review): Rem above is passed (source, deviceId, types...)
			// while Add is passed only (source, typeCode) — confirm against
			// Add's signature whether the device id is missing here.
			the.alarmCacheUtil.Add(the.alarmCacheUtil.ALARM_SOURCE_DEVICE, alarmTypeOpt.TypeCode)
			AlarmToOut(deviceInfo.Id, deviceInfo.Structure.Id, alarmTypeOpt.TypeCode, iotaData.TriggerTime, leafNodes)
			// Out-of-range data is still forwarded: reset the code so the
			// DeviceData below is produced.
			if alarmTypeOpt.TypeCode == common_models.Alarm_Type_OutRange {
				iotaData.Data.Result.Code = 0
			}
		}
	}

	if iotaData.Data.Result.Code != 0 {
		// Failed data produces an alarm only; there is nothing to forward.
		return nil, nil
	}

	// A missing or non-string "_data_type" entry leaves dataType empty.
	dataType, _ := iotaData.Data.Data["_data_type"].(string)

	return &common_models.DeviceData{
		DeviceId:    iotaData.DeviceId,
		Name:        deviceInfo.Name,
		ThingId:     iotaData.ThingId,
		StructId:    deviceInfo.Structure.Id,
		TaskId:      iotaData.ReadTaskId(),
		AcqTime:     iotaData.TriggerTime,
		RealTime:    iotaData.RealTime,
		ErrCode:     0,
		Raw:         iotaData.Data.Data,
		DeviceInfo:  *deviceInfo,
		DimensionId: iotaData.DimensionId,
		DataType:    dataType,
	}, nil
}

// OnAlarmHandler processes one iota alarm message.
//
// Resolved link-status alarms publish a recovery message and firing
// link-status alarms publish a DTU offline alarm that lists all sub devices.
// Out-of-range alarms are ignored, and any other alert name is logged as
// unsupported. Alarms whose device info cannot be loaded are logged and
// dropped.
func (the *RecvDataHandler) OnAlarmHandler(iotaAlarm common_models.IotaAlarm) {
	configHelper := the.RecvConfigHelper
	deviceId := iotaAlarm.Labels.DeviceId

	deviceInfo, err := configHelper.GetDeviceInfo(deviceId, false)
	if err != nil {
		log.Printf("[%s] get device info failed: %v", deviceId, err)
		return
	}
	if deviceInfo == nil {
		// GetDeviceInfo can return no device without an error.
		log.Printf("[%s] device info not found, alarm %s dropped", deviceId, iotaAlarm.Labels.AlertName)
		return
	}

	logUnsupported := func() {
		log.Printf("%s not support alarm type: %s - %s - %s",
			iotaAlarm.R_(),
			iotaAlarm.Labels.AlertName,
			iotaAlarm.Annotations.Summary,
			iotaAlarm.Annotations.Description)
	}

	switch iotaAlarm.Status {
	case common_models.Iota_Alarm_Status_Resolved:
		// Automatic recovery.
		switch iotaAlarm.Labels.AlertName {
		case common_models.Iota_Alarm_OutOfRange:
			// Intentionally ignored.
		case common_models.Iota_Alarm_LinkStatus:
			// NOTE(review): the firing path raises the alarm with
			// common_models.Alarm_Type_Dtu_LinkStatus, but recovery is sent
			// with the iota alert name — confirm the consumer maps the two.
			Recover(deviceInfo.Id, deviceInfo.Structure.Id, common_models.Iota_Alarm_LinkStatus, iotaAlarm.StartsAt)
		default:
			logUnsupported()
		}
	case common_models.Iota_Alarm_Status_Firing:
		switch iotaAlarm.Labels.AlertName {
		case common_models.Iota_Alarm_OutOfRange:
			// Intentionally ignored.
		case common_models.Iota_Alarm_LinkStatus:
			// DTU offline alarm: every sub device below the DTU is attached.
			subDevices := configHelper.GetSubDeviceAll(iotaAlarm.Labels.DeviceId, iotaAlarm.Labels.ThingId)
			AlarmDtuToOut(
				deviceInfo,
				common_models.Alarm_Type_Dtu_LinkStatus,
				common_models.Alarm_Code_OffLine,
				iotaAlarm.Annotations.Summary,
				iotaAlarm.StartsAt,
				subDevices)
		default:
			logUnsupported()
		}
	}
}