You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
229 lines
8.7 KiB
229 lines
8.7 KiB
package consumers
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"goInOut/adaptors"
|
|
"goInOut/consumers/CDJYSN"
|
|
"goInOut/dbOperate"
|
|
"goInOut/monitors"
|
|
"goInOut/utils"
|
|
"log"
|
|
"path"
|
|
"time"
|
|
|
|
"github.com/robfig/cron/v3"
|
|
)
|
|
|
|
type consumerGDKS struct {
|
|
//数据缓存管道
|
|
dataCache chan []byte
|
|
//具体配置
|
|
ConfigInfo CDJYSN.ConfigFile
|
|
//InHttp *dbOperate.HttpHelper
|
|
OutFile *dbOperate.FileSaveHelper
|
|
InHttp monitors.HttpMonitor
|
|
configCron *cron.Cron
|
|
}
|
|
|
|
func (the *consumerGDKS) LoadConfigJson(cfgStr string) {
|
|
// 将 JSON 格式的数据解析到结构体中
|
|
err := json.Unmarshal([]byte(cfgStr), &the.ConfigInfo)
|
|
if err != nil {
|
|
log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error())
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
func (the *consumerGDKS) Initial(cfg string) error {
|
|
the.LoadConfigJson(cfg)
|
|
err := the.InputInitial()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = the.OutputInitial()
|
|
return err
|
|
}
|
|
func (the *consumerGDKS) InputInitial() error {
|
|
the.dataCache = make(chan []byte, 200)
|
|
//数据入口
|
|
the.InHttp = monitors.HttpMonitor{
|
|
HttpClient: &dbOperate.HttpHelper{Url: the.ConfigInfo.IoConfig.In.Http.Url, Token: ""},
|
|
MonitorHelper: &monitors.MonitorHelper{CronStr: the.ConfigInfo.IoConfig.In.CronStr},
|
|
}
|
|
|
|
the.InHttp.Start()
|
|
the.InHttp.RegisterTask(the.ConfigInfo.IoConfig.In.CronStr, the.getEsData)
|
|
return nil
|
|
}
|
|
func (the *consumerGDKS) OutputInitial() error {
|
|
//数据出口
|
|
the.OutFile = &dbOperate.FileSaveHelper{
|
|
Directory: the.ConfigInfo.IoConfig.Out.File.Directory,
|
|
FilenameExtension: the.ConfigInfo.IoConfig.Out.File.FileNameExtension,
|
|
}
|
|
the.OutFile.Initial()
|
|
return nil
|
|
}
|
|
func (the *consumerGDKS) Work() {
|
|
|
|
//每次启动生成测点配置文件
|
|
the.saveConfig()
|
|
the.configCron = cron.New()
|
|
the.configCron.AddFunc("10 12 * * *", func() {
|
|
log.Println("执行每天刷新传感器基础配置任务:")
|
|
the.saveConfig()
|
|
})
|
|
|
|
go func() {
|
|
for {
|
|
pushBytes := <-the.dataCache
|
|
log.Printf("取出ch数据,剩余[%d] ", len(the.dataCache))
|
|
|
|
log.Printf("推送[%v]: len=%d", the.OutFile.Directory, len(pushBytes))
|
|
//hex.EncodeToString(pushBytes)
|
|
//非煤矿山编码_文件分类_时间.txt
|
|
fileName := fmt.Sprintf("%s_%s.txt", the.ConfigInfo.Info["fileNamePrefix"], time.Now().Format("20060102150405"))
|
|
filePath := path.Join(the.ConfigInfo.IoConfig.Out.File.Directory, fileName)
|
|
|
|
the.OutFile.Save(filePath, string(pushBytes))
|
|
time.Sleep(10 * time.Millisecond)
|
|
}
|
|
|
|
}()
|
|
}
|
|
|
|
func (the *consumerGDKS) getStructureId() string {
|
|
structureId, ok := the.ConfigInfo.Info["structureId"]
|
|
if !ok {
|
|
structureId = "3381"
|
|
}
|
|
return structureId
|
|
}
|
|
|
|
func (the *consumerGDKS) saveConfig() {
|
|
//130731030001;涿鹿金隅水泥大斜阳矿山;DXYKSJC;涿鹿金隅水泥大斜阳矿山安全监测;承德金隅水泥有限公司;2030-12-30;2024-02-06 11:39:02^13073103000103200001;雨量;01;130731030001;0101;03;;mm;;100;90;80;70;采场;115.244209;40.123344;1217;2020-07-01;武汉新普惠;4;1;0;2024-02-06 10:55:45^13073103000102100001;JC01;01;130731030001;0101;02;;mm;;100;90;80;70;排土1297平台;115.243554;40.114834;1297;2020-07-01;江西飞尚科技;1000;1;0;2024-02-06 10:55:45^13073103000102100002;JC02;01;130731030001;0101;02;;mm;;100;90;80;70;边坡1317平台;115.241985;40.115445;1317;2020-07-01;江西飞尚科技;1000;1;0;2024-02-06 10:55:45^13073103000102100003;JC03;01;130731030001;0101;02;;mm;;100;90;80;70;边坡1317平台;115.241080;40.115698;1317;2020-07-01;江西飞尚科技;1000;1;0;2024-02-06 10:55:45^]]]
|
|
//"structUploadCode": "130731030001",
|
|
//"fileNamePrefix": "130731030001_LTCDSS",
|
|
//"fileContentHeader": "130731030001;涿鹿金隅水泥大斜阳矿山;"
|
|
structureId := the.getStructureId()
|
|
contentConfig := ""
|
|
switch structureId {
|
|
case "3381": //广东省大宝山矿业有限公司李屋排土场在线监测
|
|
contentConfigHead := the.ConfigInfo.Info["fileContentHeader"] + //非煤矿山编码 440205005001,边坡名称
|
|
"LZJDBPJC;" + //系统型号
|
|
"广东省大宝山矿业有限公司李屋排土场在线监测系统;" + //系统名称
|
|
"广东省大宝山矿业有限公司;" + //生产厂家名称
|
|
"2030-12-30;" + //安标有效期
|
|
"2025-12-22 15:00:00" //数据上传时间
|
|
contentConfigBody := "" +
|
|
//测点编码;测点名称;系统编码;露天矿山编码;边坡编号;传感器类型;测点数值类型;测点数值单位;埋深;一级阈值;二级阈值;三级阈值;四级阈值;测点安装位置;位置X;位置Y;位置Z;安装日期;生产厂家;量程;在用状态;故障状态;数据时间
|
|
"^44020500500102100011;757_G1;01;440205005001;0201;2001;;mm;;;;;;排土场;113.71655000;24.55281252;767;2023-05-01;司南导航;15;1;0;2025-12-22 10:00:00" +
|
|
"^44020500500102100012;757_G2;01;440205005001;0201;2001;;mm;;;;;;排土场;113.71533904;24.55295121;767;2023-05-01;司南导航;15;1;0;2025-12-22 10:00:00" +
|
|
"^44020500500102100021;757_G3;01;440205005001;0201;2001;;mm;;;;;;排土场;113.71460937;24.55365389;767;2023-05-01;司南导航;15;1;0;2025-12-22 10:00:00" +
|
|
"^44020500500102100022;757_G4;01;440205005001;0201;2001;;mm;;;;;;排土场;113.71425361;24.55447242;767;2023-05-01;司南导航;15;1;0;2025-12-22 10:00:00" +
|
|
"^44020500500102100031;757_G5;01;440205005001;0201;2001;;mm;;;;;;排土场;113.71396365;24.55502772;767;2023-05-01;司南导航;15;1;0;2025-12-22 10:00:00" +
|
|
"^44020500500102100032;757_G6;01;440205005001;0201;2001;;mm;;;;;;排土场;113.71318562;24.55588272;767;2023-05-01;司南导航;15;1;0;2025-12-22 10:00:00" +
|
|
"^44020500500102100071;681_G1;01;440205005001;0201;2001;;mm;;;;;;排土场;113.71450034;24.55077052;681;2023-05-01;司南导航;15;1;0;2025-12-22 10:00:00" +
|
|
"^44020500500102100041;681_G2;01;440205005001;0201;2001;;mm;;;;;;排土场;113.71363350;24.55145886;681;2023-05-01;司南导航;15;1;0;2025-12-22 10:00:00" +
|
|
"^44020500500102100042;681_G3;01;440205005001;0201;2001;;mm;;;;;;排土场;113.71295301;24.55210073;681;2023-05-01;司南导航;15;1;0;2025-12-22 10:00:00" +
|
|
"^44020500500102100043;625_G1;01;440205005001;0201;2001;;mm;;;;;;排土场;113.71642567;24.54860382;633;2023-05-01;司南导航;15;1;0;2025-12-22 10:00:00" +
|
|
"^44020500500102100051;625_G2;01;440205005001;0201;2001;;mm;;;;;;排土场;113.71498554;24.54938638;633;2023-05-01;司南导航;15;1;0;2025-12-22 10:00:00" +
|
|
"^44020500500102100052;625_G3;01;440205005001;0201;2001;;mm;;;;;;排土场;113.71398753;24.54990754;633;2023-05-01;司南导航;15;1;0;2025-12-22 10:00:00" +
|
|
"^]]]"
|
|
contentConfig = contentConfigHead + contentConfigBody
|
|
}
|
|
fileName := fmt.Sprintf("%s_LTCDJC_%s.txt", the.ConfigInfo.Info["structUploadCode"], time.Now().Format("20060102150405"))
|
|
filePath := path.Join(the.ConfigInfo.IoConfig.Out.File.Directory, fileName)
|
|
the.OutFile.Save(filePath, contentConfig)
|
|
}
|
|
func (the *consumerGDKS) getEsData() {
|
|
structureId := the.getStructureId()
|
|
start, end := utils.GetTimeRangeByHour(-1)
|
|
log.Printf("查询数据时间范围 %s - %s", start, end)
|
|
//start := "2024-02-05T00:00:00.000+0800"
|
|
//end := "2024-02-05T23:59:59.999+0800"
|
|
esQuery := fmt.Sprintf(`
|
|
{
|
|
"size": 0,
|
|
"query": {
|
|
"bool": {
|
|
"filter": [
|
|
{
|
|
"term": {
|
|
"structure": {
|
|
"value": %s
|
|
}
|
|
}
|
|
},
|
|
{
|
|
"range": {
|
|
"collect_time": {
|
|
"from": "%s",
|
|
"to": "%s"
|
|
}
|
|
}
|
|
}
|
|
]
|
|
}
|
|
},
|
|
"sort": [
|
|
{
|
|
"collect_time": {
|
|
"order": "desc"
|
|
}
|
|
}
|
|
],
|
|
"aggs": {
|
|
"gpBySensorId": {
|
|
"terms": {
|
|
"field": "sensor",
|
|
"size": 60
|
|
},
|
|
"aggs": {
|
|
"last": {
|
|
"top_hits": {
|
|
"size": 1,
|
|
"sort": [
|
|
{
|
|
"collect_time": {
|
|
"order": "desc"
|
|
}
|
|
}
|
|
]
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
`, structureId, start, end)
|
|
auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"}
|
|
esAggResult := the.InHttp.HttpClient.HttpGetWithHeader(esQuery, auth)
|
|
|
|
var needPush []byte
|
|
adaptor := the.getAdaptor()
|
|
if adaptor != nil {
|
|
needPush = adaptor.Transform(esAggResult)
|
|
if len(needPush) > 0 {
|
|
the.dataCache <- needPush
|
|
}
|
|
}
|
|
}
|
|
|
|
func (the *consumerGDKS) getAdaptor() (adaptor adaptors.IAdaptor) {
|
|
//maps.Copy(the.ConfigInfo.SensorMap.GnssSensorMap, the.ConfigInfo.SensorMap.RainSensorMap)
|
|
return adaptors.Adaptor_AXYES_GDKS{
|
|
GnssMap: the.ConfigInfo.SensorMap.GnssSensorMap,
|
|
RainMap: the.ConfigInfo.SensorMap.RainSensorMap,
|
|
NBWYMap: the.ConfigInfo.SensorMap.NBWYSensorMap,
|
|
Info: the.ConfigInfo.Info,
|
|
}
|
|
}
|
|
|
|
// 获取 内部位移安装位置
|
|
func (the *consumerGDKS) getNBWYLocation(pointCode string) string {
|
|
platformCode := pointCode[:3]
|
|
location := fmt.Sprintf("排土%s平台", platformCode)
|
|
return location
|
|
}
|
|
|