wenlele 2 years ago
parent
commit
afd055e8ae
  1. 8
      api/app/lib/controllers/control/data.js
  2. 2
      api/app/lib/service/clickHouseClient.js
  3. 4
      api/app/lib/service/kafka.js

8
api/app/lib/controllers/control/data.js

@ -305,7 +305,9 @@ async function getLatestDynamic(ctx) {
//查项目名称 查用户名 //查项目名称 查用户名
let pepPojectIds = new Set(), notedUserIds = new Set(); let pepPojectIds = new Set(), notedUserIds = new Set();
for (let p of news) { for (let p of news) {
pepPojectIds.add(p.projectCorrelation.pepProjectId); if(p.projectCorrelation && p.projectCorrelation.pepProjectId){
pepPojectIds.add(p.projectCorrelation.pepProjectId);
}
if (p.emailSendLog) { if (p.emailSendLog) {
p.emailSendLog.toPepUserIds.map(u => { p.emailSendLog.toPepUserIds.map(u => {
@ -317,8 +319,8 @@ async function getLatestDynamic(ctx) {
} }
} }
let pepProjects = pepPojectIds.size ? await clickHouse.projectManage.query(` let pepProjects = pepPojectIds.size ? await clickHouse.projectManage.query(`
SELECT id, project_name FROM t_pim_project WHERE id IN (${[...pepPojectIds].join(',')},-1) SELECT id, project_name FROM t_pim_project WHERE id IN (${[...pepPojectIds].join(',')},-1)
`).toPromise() : []; `).toPromise() : [];
let userPepRes = notedUserIds.size ? await clickHouse.pepEmis.query( let userPepRes = notedUserIds.size ? await clickHouse.pepEmis.query(
`SELECT DISTINCT user.id AS id, "user"."name" AS name FROM user WHERE user.id IN (${[...notedUserIds].join(',')},-1) `SELECT DISTINCT user.id AS id, "user"."name" AS name FROM user WHERE user.id IN (${[...notedUserIds].join(',')},-1)

2
api/app/lib/service/clickHouseClient.js

@ -11,7 +11,7 @@ function factory (app, opts) {
app.fs.clickHouse[d.name] = new ClickHouse({ app.fs.clickHouse[d.name] = new ClickHouse({
url: url, url: url,
port: port, port: port,
debug: true || opts.dev, debug: opts.dev,
format: "json", format: "json",
basicAuth: user && password ? { basicAuth: user && password ? {
username: user, username: user,

4
api/app/lib/service/kafka.js

@ -1,6 +1,7 @@
'use strict'; 'use strict';
const moment = require('moment'); const moment = require('moment');
const Kafka = require('kafka-node'); const Kafka = require('kafka-node');
module.exports = async function factory(app, opts) { module.exports = async function factory(app, opts) {
try { try {
const client = new Kafka.KafkaClient({ kafkaHost: opts.kafka.rootURL, fromOffset: true }); const client = new Kafka.KafkaClient({ kafkaHost: opts.kafka.rootURL, fromOffset: true });
@ -9,14 +10,13 @@ module.exports = async function factory(app, opts) {
app.fs.logger.log('error', "[FS-KAFKA]", err); app.fs.logger.log('error', "[FS-KAFKA]", err);
}); });
producer.on("ready", function () { producer.on("ready", function () {
console.log('111111 ready 666666666666') console.log('Kafka ready')
}) })
let groupId = 'group' + Math.random() let groupId = 'group' + Math.random()
let consumer = new Kafka.ConsumerGroup(Object.assign({}, { groupId, fromOffset: 'latest' }, { kafkaHost: opts.kafka.rootURL }), ['anxinyun_alarm']) let consumer = new Kafka.ConsumerGroup(Object.assign({}, { groupId, fromOffset: 'latest' }, { kafkaHost: opts.kafka.rootURL }), ['anxinyun_alarm'])
consumer.on('message', async function (message) { consumer.on('message', async function (message) {
let msg = JSON.parse(message.value) let msg = JSON.parse(message.value)
console.log('kafka consumer----------接收到消息');
await savePullAlarm(msg); await savePullAlarm(msg);
}) })

Loading…
Cancel
Save