const Automate = require('sequelize-automate-freesun') const moment = require('moment'); const schedule = require('node-schedule') const initJob = require('./initJob'); //删除数据库源时 同步删除该数据源资源目录树下元数据 //数据源 数据库配置信息不能更改 若需更改则需要删除数据源重新配置 const taskRetryIndex = {} async function handleTask(app, task) { // const transaction = await app.fs.dc.orm.transaction(); const startTime = moment() const { models } = app.fs.dc if (!taskRetryIndex[task.id]) taskRetryIndex[task.id] = 0; try { const dataSource = await models.DataSource.findOne({ where: { id: task.dataSourceId } }); if (dataSource) { const dbOptions = createDbOptions(dataSource.config); const automate = new Automate(dbOptions, {}); const tables = await automate.getTables(); //获取当前采集任务数据源pg库表字段索引外键数据 const dataToSave = { code: dataSource.config.database, name: dataSource.config.database, catalog: dataSource.mountPath, parent: null, description: null, type: "库", createBy: 1, createAt: moment(), updateAt: null, user: { id: 1, name: "超级管理员", username: "SuperAdmin" }, attributesParam: null, catalogKey: dataSource.catalogKey } //初始化数据 新增 数据库 表 字段索引外键数据 let databaseFind = await models.MetadataDatabase.findOne({ where: { code: dataSource.config.database, type: '库', catalog: dataSource.mountPath, }//目录树下库只会存在一个 判断是否有库类型元数据 没有则新增库和表元数据 有库数据则比较更新表类型元数据 全量更新字段索引数据 }) let databaseId = databaseFind ? databaseFind.id : null; if (databaseFind) { //更新表类型元数据 const newTableNames = [] Object.keys(tables).forEach(key => { newTableNames.push(key) }); //删除不存在表元数据 await models.MetadataDatabase.destroy({ where: { name: { $notIn: newTableNames }, type: '表', catalog: dataSource.mountPath, } }) //录入新增的表元数据 const metaDatabaseTables = await models.MetadataDatabase.findAll({ where: { type: '表', catalog: dataSource.mountPath, } }) const tableBodys = [] Object.keys(tables).forEach(table => { if (!metaDatabaseTables.find(s => s.code == table)) { dataToSave.parent = databaseFind.id; dataToSave.name = table; dataToSave.code = table; dataToSave.type = '表'; const tableObj = { ...dataToSave } tableBodys.push(tableObj) } }); await models.MetadataDatabase.bulkCreate(tableBodys, { returning: true }); } else { //新增库、表类型数据 //库类型元数据存储 const databaseRslt = await models.MetadataDatabase.create(dataToSave) databaseId = databaseRslt.id //表元数据存储 if (databaseRslt && databaseRslt.id) { const tableBodys = [] Object.keys(tables).forEach(table => { dataToSave.parent = databaseRslt.id; dataToSave.name = table; dataToSave.code = table; dataToSave.type = '表'; const tableObj = { ...dataToSave } tableBodys.push(tableObj) }); await models.MetadataDatabase.bulkCreate(tableBodys, { returning: true }); } } if (databaseId) { const metaDatabaseTables = await models.MetadataDatabase.findAll({ where: { type: '表', catalog: dataSource.mountPath, parent: databaseId } }) //字段/索引/外键全量更新 先删除之前的字段 再录入新的数据 await models.MetadataDatabase.destroy({ where: { type: { $in: ['字段', '索引', '外键'] }, catalog: dataSource.mountPath, } }) const fieldBodys = [] for (let table of metaDatabaseTables) { Object.keys(tables[table.name].structures).forEach(key => { dataToSave.parent = table.id; dataToSave.name = tables[table.name].structures[key].comment || key; dataToSave.code = key; dataToSave.type = '字段'; const tableObj = { ...dataToSave } fieldBodys.push(tableObj) }) tables[table.name].foreignKeys.forEach(v => { dataToSave.parent = table.id; dataToSave.name = v.columnName; dataToSave.code = v.columnName; dataToSave.type = '外键'; const tableObj = { ...dataToSave } fieldBodys.push(tableObj) }) tables[table.name].indexes.forEach(v => { dataToSave.parent = table.id; dataToSave.name = v.name; dataToSave.code = v.name; dataToSave.type = '索引'; const tableObj = { ...dataToSave } fieldBodys.push(tableObj) }) } const fieldRslt = await models.MetadataDatabase.bulkCreate(fieldBodys); if (fieldRslt) { const endTime = moment() const logBody = { task: task.id, success: true, details: '采集成功', startTime: startTime, endTime: endTime } //日志记录 await models.AcquisitionLog.create(logBody) taskRetryIndex[task.id] = 0; } } } console.log('===========------------------------------------------------------------' + taskRetryIndex[task.id] + '----' + task.taskName + '------' + moment().format('HH:mm:ss')) } catch (error) { // await transaction.rollback(); const endTime = moment() const logBody = { task: task.id, success: true, details: '采集失败' + JSON.stringify(error).substring(0, 248), startTime: startTime, endTime: endTime } await models.AcquisitionLog.create(logBody) taskRetryIndex[task.id]++; //处理采集失败重试 if (task.retried && task.retryCount && task.retryTime && taskRetryIndex[task.id] < task.retryCount) { setTimeout(() => { handleTask(app, task) }, 1000 * 60 * task.retryCount); } app.fs.logger.error(`sechedule: handleTask, error: ${error}`); } } function createDbOptions(params) { const dbOptions = { database: params.database, username: params.user, password: params.password, dialect: 'postgres', host: params.host, port: params.port, define: { underscored: false, freezeTableName: false, charset: 'utf8mb4', timezone: '+00: 00', dialectOptions: { collate: 'utf8_general_ci', }, timestamps: false, }, } return dbOptions } module.exports = { handleTask }