const moment = require('moment') // let isDev = false let isDev = true module.exports = function (app, opts) { const workorderStatistics = app.fs.scheduleInit( { interval: '00 */30 * * * *', immediate: isDev, proRun: !isDev, // disabled: true, }, async () => { try { const { models, ORM: sequelize } = app.fs.dc const { parseProcessData } = app.fs.utils let data = await getAnxinyunToken(app) //获取所有泵站 let structureList = await app.fs.anxinyun.get(`/organizations/${data.orgId}/structures?token=${data.token}`) || [] await app.redis.set("pumpStation_structure", JSON.stringify(structureList)) if (structureList.length) { let waterLevelData = [] //七天内每个泵站的集水池液位 let waterLevelSix = {} //泵站最新6h的集水池液位 let waterPumpStateAll = [] //所有水泵的状态 let capacity = {} //能耗监测 let currentSix = {} //水泵六小时数据 let cabinet = {} //进线柜 let threePhase = {} //三相电流 for (let index = 0; index < structureList.length; index++) { const strucOne = structureList[index]; //单个泵站 let pumpOne = await app.fs.anxinyun.get(`structures/${strucOne.id}/factors?token=${data.token}`) || [] if (pumpOne.length) { let pump = [] //能耗监测--水泵 let cabinetSun = [] //能耗监测--进线柜 let sun = {} let day1 = 0 let day30 = 0 let day365 = 0 let daySun = 0 //泵站信息 let informationId = pumpOne.find(v => v.name == '泵站信息').id if (informationId) { let pumpInformation = await app.fs.anxinyun.get(`structures/${strucOne.id}/stations?token=${data.token}`, { query: { factorId: informationId } }) || [] if (pumpInformation.length > 0 && pumpInformation[0].groups.length && pumpInformation[0].groups[0].stations[0].id) { //七天内该泵站最新的集水池液位 let waterLevel = await app.fs.anxinyun.get(`stations/theme/data?token=${data.token}`, { query: { stations: pumpInformation[0].groups[0].stations[0].id, startTime: moment().startOf('week').format('YYYY-MM-DD HH:mm:ss'), endTime: moment().endOf('week').format('YYYY-MM-DD HH:mm:ss'), limit: 1 } }) || {} let findOne = waterLevel.stations[0].data[0] || {} sun.sHumidity = findOne.sHumidity sun.sTEMP = findOne.sTEMP sun.sGrille_level = findOne.sGrille_level waterLevelData.push({ strucId: strucOne.id, name: strucOne.name, level: findOne.sLiquid_level || 0 }) //该泵站最新6h的集水池液位 let waterLevel6 = await app.fs.anxinyun.get(`stations/theme/data?token=${data.token}`, { query: { stations: pumpInformation[0].groups[0].stations[0].id, startTime: moment().add(-6, 'hours').format('YYYY-MM-DD HH:mm:ss'), endTime: moment().format('YYYY-MM-DD HH:mm:ss'), limit: 1440 } }) || {} waterLevelSix['struc' + strucOne.id] = waterLevel6.stations[0] && JSON.stringify(waterLevel6.stations[0].data) } } //水泵信息 let waterId = pumpOne.find(v => v.name == '泵站水泵').id if (waterId) { let waterpPmpInformation = await app.fs.anxinyun.get(`structures/${strucOne.id}/stations?token=${data.token}`, { query: { factorId: waterId } }) || [] let dataId = [] waterpPmpInformation.forEach(v => { v.groups.forEach(s => { s.stations.forEach(f => { dataId.push(f.id) }) }) }) if (dataId.length) { // 当前时间 let todayOne = await app.fs.anxinyun.get(`stations/theme/data?token=${data.token}`, { query: { stations: dataId.join(), startTime: moment().startOf('day').format('YYYY-MM-DD HH:mm:ss'), endTime: moment().endOf('day').format('YYYY-MM-DD HH:mm:ss'), limit: 1 } }) || [] waterPumpStateAll.push({ strucId: strucOne.id, name: strucOne.name, data: todayOne.stations }) pump = todayOne.stations || [] todayOne.stations && todayOne.stations.forEach(d => { daySun += d.data[0] && d.data[0].eMotor_EQ || 0 }) // 今天 let dayCollect = await app.fs.anxinyun.get(`stations/data/theme?token=${data.token}`, { query: { stations: dataId.join(), begin: moment().startOf('day').format('x'), end: moment().endOf('day').format('x'), aggtype: "h", method: 'diff' } }) || [] if (dayCollect.length) { dayCollect[0].stations.forEach(f => { f.data.forEach(h => { if (!h.changData) { day1 += h.values.eMotor_EQ } }) }) } // 本月 let monthCollect = await app.fs.anxinyun.get(`stations/data/theme?token=${data.token}`, { query: { stations: dataId.join(), begin: moment().startOf('month').format('x'), end: moment().endOf('month').format('x'), aggtype: "d", method: 'diff' } }) || [] if (monthCollect.length) { monthCollect[0].stations.forEach(f => { f.data.forEach(h => { if (!h.changData) { day30 += h.values.eMotor_EQ } }) }) } // 今年 let yearCollect = await app.fs.anxinyun.get(`stations/data/theme?token=${data.token}`, { query: { stations: dataId.join(), begin: moment().startOf('year').format('x'), end: moment().endOf('year').format('x'), aggtype: "d", method: 'diff' } }) || [] if (yearCollect.length) { yearCollect[0].stations.map(f => { f.data.forEach(h => { if (!h.changData) { day365 += h.values.eMotor_EQ } }) }) } let current = await app.fs.anxinyun.get(`stations/theme/data?token=${data.token}`, { query: { stations: dataId.join(), startTime: moment().add(-6, 'hours').format('YYYY-MM-DD HH:mm:ss'), endTime: moment().format('YYYY-MM-DD HH:mm:ss'), limit: 1440 } }) || {} currentSix['struc' + strucOne.id] = JSON.stringify(current.stations) let threedata = [] let timeSet = new Set() if (current.stations && current.stations.length) { current.stations.map(p => { p.data.map(s => { timeSet.add(moment(s.time).format('YYYY-MM-DD HH:mm:ss')) }) }) let time = [...timeSet].sort((a, b) => moment(a).isBefore(b) ? -1 : 1) time.map(x => { let A = [] let B = [] let C = [] current.stations.map((s, index) => { let abcData = s.data.find(f => moment(f.time).format('YYYY-MM-DD HH:mm:ss') == x) || {} let a = abcData.eMotor_A_A let b = abcData.eMotor_B_A let c = abcData.eMotor_C_A if (a) A.push(a) if (b) B.push(b) if (c) C.push(c) }) const sum = (arr) => { let sum = 0 arr.map(h => { sum += h }) return sum } threedata.push({ A: A.length && (sum(A) / A.length) || null, B: B.length && (sum(B) / B.length) || null, C: C.length && (sum(C) / C.length) || null, time: x }) }) } threePhase['struc' + strucOne.id] = JSON.stringify(threedata) } } //进线柜 let wireCabinetId = pumpOne.find(v => v.name == '泵站进线柜').id if (wireCabinetId) { let dataList = await app.fs.anxinyun.get(`structures/${strucOne.id}/stations?token=${data.token}`, { query: { factorId: wireCabinetId } }) || [] let dataId = [] dataList.map(v => { v.groups.map(s => { s.stations.map(f => { dataId.push(f.id) }) }) }) if (dataId.length) { // 当前时间 let todayOne = await app.fs.anxinyun.get(`stations/theme/data?token=${data.token}`, { query: { stations: dataId.join(), startTime: moment().startOf('day').format('YYYY-MM-DD HH:mm:ss'), endTime: moment().endOf('day').format('YYYY-MM-DD HH:mm:ss'), limit: 1 } }) || {} let cabinetOne = todayOne.stations && todayOne.stations || [] cabinet['struc' + strucOne.id] = JSON.stringify(cabinetOne) cabinetOne.forEach(d => { let todayFindOne = d.data[0] || {} daySun += todayFindOne.eQF_EQ || 0 cabinetSun.push({ today: 0, sameMonth: 0, thisYear: 0, eQF_EQ: todayFindOne.eQF_EQ || 0, id: d.id, name: d.name, sQF_CLOSING: todayFindOne.sQF_CLOSING }) }) // 今天 let dayCollect = await app.fs.anxinyun.get(`stations/data/theme?token=${data.token}`, { query: { stations: dataId.join(), begin: moment().startOf('day').format('x'), end: moment().endOf('day').format('x'), aggtype: "h", method: 'diff' } }) || [] cabinetSun.forEach(p => { if (dayCollect.length) { dayCollect[0].stations.forEach(f => { if (p.id == f.id) { f.data.forEach(h => { if (!h.changData) { p.today = p.today + h.values.eQF_EQ p.sameMonth = p.sameMonth + h.values.eQF_EQ p.thisYear = p.thisYear + h.values.eQF_EQ day1 += h.values.eQF_EQ } }) } }) } }) // 本月 let monthCollect = await app.fs.anxinyun.get(`stations/data/theme?token=${data.token}`, { query: { stations: dataId.join(), begin: moment().startOf('month').format('x'), end: moment().endOf('month').format('x'), aggtype: "d", method: 'diff' } }) || [] cabinetSun.forEach(p => { if (monthCollect.length) { monthCollect[0].stations.forEach(f => { if (p.id == f.id) { f.data.forEach(h => { if (!h.changData) { p.sameMonth = p.sameMonth + h.values.eQF_EQ day30 += h.values.eQF_EQ } }) } }) } }) // 今年 let yearCollect = await app.fs.anxinyun.get(`stations/data/theme?token=${data.token}`, { query: { stations: dataId.join(), begin: moment().startOf('year').format('x'), end: moment().endOf('year').format('x'), aggtype: "d", method: 'diff' } }) || [] cabinetSun.forEach(p => { if (yearCollect.length) { yearCollect[0].stations.forEach(f => { if (p.id == f.id) { f.data.forEach(h => { if (!h.changData) { p.thisYear = p.thisYear + h.values.eQF_EQ day365 += h.values.eQF_EQ } }) } }) } }) } } sun.day1 = day1 sun.day30 = day30 + day1 sun.day365 = day365 + day1 sun.daySun = daySun let capacityOne = { pump, cabinetSun, sun } capacity['struc' + strucOne.id] = JSON.stringify(capacityOne) } } await app.redis.set("pumpStation_waterLevelAll", JSON.stringify(waterLevelData)) await app.redis.hmset("pumpStation_waterLevelSix", waterLevelSix) await app.redis.set("pumpStation_waterPumpStateAll", JSON.stringify(waterPumpStateAll)) await app.redis.hmset("pumpStation_capacity", capacity) await app.redis.hmset("pumpStation_currentSix", currentSix) await app.redis.hmset("pumpStation_cabinet", cabinet) await app.redis.hmset("pumpStation_threePhase", threePhase) } } catch (error) { console.error(error); } } ) return { workorderStatistics, } } let axyTokenCache = { token: null, orgId: null, expireTime: null //过期时间 } const getAnxinyunToken = async function (app) { try { if (!axyTokenCache.token || moment() > moment(axyTokenCache.expireTime)) { if (app.fs.opts.axyProject.split('/').length === 3) { const dataToAxy = { p: app.fs.opts.axyProject.split('/')[0], username: app.fs.opts.axyProject.split('/')[1], password: app.fs.opts.axyProject.split('/')[2], } const axyResponse = await app.fs.anxinyun.post('project/login', { data: dataToAxy }) if (axyResponse.authorized) { axyTokenCache.token = axyResponse.token //放进缓存 axyTokenCache.orgId = axyResponse.orgId //放进缓存 axyTokenCache.expireTime = moment().add(20, 'hour').format('YYYY-MM-DD HH:mm:ss') } } } return axyTokenCache } catch (error) { app.fs.logger.error(`sechedule: laborAttendance, error: ${error}`); } }