|
|
@ -80,6 +80,7 @@ func (the *consumerAXYThemeToES) outputInitial() error { |
|
|
|
|
|
|
|
func (the *consumerAXYThemeToES) sinkTask() { |
|
|
|
intervalSec := the.Info.IoConfig.Out.Es.Interval |
|
|
|
log.Printf("读取配置文件intervalSec[%d]", intervalSec) |
|
|
|
ticker := time.NewTicker(time.Duration(intervalSec) * time.Second) |
|
|
|
defer ticker.Stop() |
|
|
|
for { |
|
|
@ -108,7 +109,7 @@ func (the *consumerAXYThemeToES) toSink() { |
|
|
|
}) |
|
|
|
if len(themes) > 0 { |
|
|
|
index := the.Info.IoConfig.Out.Es.Index |
|
|
|
log.Printf("写入es [%s] %d条", index, len(themes)) |
|
|
|
log.Printf("写入es [%s] %d条,%s", index, len(themes), themes) |
|
|
|
the.OutEs.BulkWriteThemes2Es(index, themes) |
|
|
|
the.sinkMap.Clear() |
|
|
|
} |
|
|
@ -146,7 +147,9 @@ func (the *consumerAXYThemeToES) onData(topic string, msg string) bool { |
|
|
|
the.dataCache <- needPush |
|
|
|
} else { |
|
|
|
s, _ := json.Marshal(needPush) |
|
|
|
|
|
|
|
if needPush != nil { |
|
|
|
log.Printf("onData 测点[%d] needPush= %s", needPush.Sensor, s) |
|
|
|
if needPush.Sensor == the.logTagId { |
|
|
|
log.Printf("onData 测点[%d] 异常needPush= %s", needPush.Sensor, s) |
|
|
|
} |
|
|
|