Browse Source

kafka 提示

dev
巴林闲侠 2 years ago
parent
commit
612a1ed3ce
  1. 4
      api/app/lib/service/kafka.js

4
api/app/lib/service/kafka.js

@ -1,6 +1,7 @@
'use strict';
const moment = require('moment');
const Kafka = require('kafka-node');
module.exports = async function factory(app, opts) {
try {
const client = new Kafka.KafkaClient({ kafkaHost: opts.kafka.rootURL, fromOffset: true });
@ -9,14 +10,13 @@ module.exports = async function factory(app, opts) {
app.fs.logger.log('error', "[FS-KAFKA]", err);
});
producer.on("ready", function () {
console.log('111111 ready 666666666666')
console.log('Kafka ready')
})
let groupId = 'group' + Math.random()
let consumer = new Kafka.ConsumerGroup(Object.assign({}, { groupId, fromOffset: 'latest' }, { kafkaHost: opts.kafka.rootURL }), ['anxinyun_alarm'])
consumer.on('message', async function (message) {
let msg = JSON.parse(message.value)
console.log('kafka consumer----------接收到消息');
await savePullAlarm(msg);
})

Loading…
Cancel
Save