const Automate = require('sequelize-automate-freesun') const moment = require('moment'); const schedule = require('node-schedule') const initJob = require('./initJob'); //删除数据库源时 同步删除该数据源资源目录树下元数据 //数据源 数据库配置信息不能更改 若需更改则需要删除数据源重新配置 const taskRetryIndex = {} async function handleTask(app, task) { // const transaction = await app.fs.dc.orm.transaction(); const startTime = moment() const { models } = app.fs.dc if (!taskRetryIndex[task.id]) taskRetryIndex[task.id] = 0; try { const dataSource = await models.DataSource.findOne({ where: { id: task.dataSourceId } }); if (dataSource) { const dbOptions = createDbOptions(dataSource.config); const automate = new Automate(dbOptions, {}); const tablesOrign = await automate.getTables(); //获取当前采集任务数据源pg库表字段索引外键数据 const tables = {} Object.keys(tablesOrign).forEach(key => { if (key.indexOf('_airbyte_raw') < 0) { //过滤掉临时表 后端同步的库里存在临时表(_airbyte_raw开头) tables[key] = tablesOrign[key] } }) const dataToSave = { code: dataSource.config.database, name: dataSource.config.database, catalog: dataSource.mountPath, parent: null, description: null, type: "库", createBy: 1, createAt: moment(), updateAt: null, user: { id: 1, name: "超级管理员", username: "SuperAdmin" }, attributesParam: null, catalogKey: dataSource.catalogKey } //初始化数据 新增 数据库 表 字段索引外键数据 let databaseFind = await models.MetadataDatabase.findOne({ where: { code: dataSource.config.database, type: '库', catalog: dataSource.mountPath, }//目录树下库只会存在一个 判断是否有库类型元数据 没有则新增库和表元数据 有库数据则比较更新表类型元数据 全量更新字段索引数据 }) let databaseId = databaseFind ? databaseFind.id : null; const newTableNames = [] if (databaseFind) { //更新表类型元数据 Object.keys(tables).forEach(key => { newTableNames.push(key) }); //删除不存在表元数据 await models.MetadataDatabase.destroy({ where: { name: { $notIn: newTableNames }, type: '表', catalog: dataSource.mountPath, } }) //录入新增的表元数据 const metaDatabaseTables = await models.MetadataDatabase.findAll({ where: { type: '表', catalog: dataSource.mountPath, } }) const tableBodys = [] Object.keys(tables).forEach(table => { if (!metaDatabaseTables.find(s => s.code == table)) { dataToSave.parent = databaseFind.id; dataToSave.name = table; dataToSave.code = table; dataToSave.type = '表'; dataToSave.datasourceConfig = dataSource.config; const tableObj = { ...dataToSave } tableBodys.push(tableObj) } }); await models.MetadataDatabase.bulkCreate(tableBodys, { returning: true }); } else { //新增库、表类型数据 //库类型元数据存储 const databaseRslt = await models.MetadataDatabase.create(dataToSave) databaseId = databaseRslt.id //表元数据存储 if (databaseRslt && databaseRslt.id) { const tableBodys = [] Object.keys(tables).forEach(table => { dataToSave.parent = databaseRslt.id; dataToSave.name = table; dataToSave.code = table; dataToSave.type = '表'; dataToSave.datasourceConfig = dataSource.config; const tableObj = { ...dataToSave } tableBodys.push(tableObj) }); await models.MetadataDatabase.bulkCreate(tableBodys, { returning: true }); } } if (databaseId) { const metaDatabaseTables = await models.MetadataDatabase.findAll({ where: { type: '表', catalog: dataSource.mountPath, parent: databaseId } }) //字段索引外键上一次存储集合 const tableChildrens = await models.MetadataDatabase.findAll({ where: { type: { $in: ['字段', '索引', '外键'] }, catalog: dataSource.mountPath, } }) let fieldBodys = [] let fieldRslt = null; if (tableChildrens.length == 0) { //首次执行定时任务新增 for (let table of metaDatabaseTables) { const children = handleAddTableChildren(dataToSave, tables, table); fieldBodys = fieldBodys.concat(children) } fieldRslt = await models.MetadataDatabase.bulkCreate(fieldBodys); } else {//更新数据 const deleteIds = [];//字段索引外键删除id集合 const deleteParentIds = [];//删除表集合 tableChildrens.forEach(s => { let table = metaDatabaseTables.find(x => x.id == s.parent) if (table) { if (s.type == '字段' && !tables[table.name].structures[s.code]) { deleteIds.push(s.id) } if (s.type == '外键' && !tables[table.name].foreignKeys.find(x => x.columnName == s.code)) { deleteIds.push(s.id) } if (s.type == '索引' && !tables[table.name].indexes.find(x => x.name == s.code)) { deleteIds.push(s.id) } } else { if (!deleteParentIds.find(n => n == s.parent)) deleteParentIds.push(s.parent) } }) //判断新增- 新增表 新增表字段 Object.keys(tables).forEach(key => { const table = metaDatabaseTables.find(s => s.code == key) if (tableChildrens.find(s => s.parent == table.id)) { const children = handleAddTableChildren(dataToSave, tables, table, tableChildrens); fieldBodys = fieldBodys.concat(children) } else { const children = handleAddTableChildren(dataToSave, tables, table); fieldBodys = fieldBodys.concat(children) } }) //删除不存在的字段索引 外键 await models.MetadataDatabase.destroy({ where: { type: { $in: ['字段', '索引', '外键'] }, catalog: dataSource.mountPath, $or: [ { parent: { $in: deleteParentIds } }, { id: { $in: deleteIds } } ] } }) //增加新增的字段索引外键 if (fieldBodys.length > 0) { fieldRslt = await models.MetadataDatabase.bulkCreate(fieldBodys); } else { fieldRslt = true; } } if (fieldRslt) { const endTime = moment() const logBody = { task: task.id, success: true, details: '采集成功', startTime: startTime, endTime: endTime } //日志记录 await models.AcquisitionLog.create(logBody) taskRetryIndex[task.id] = 0; } } } console.log('===========------------------------------------------------------------' + taskRetryIndex[task.id] + '----' + task.taskName + '------' + moment().format('HH:mm:ss')) } catch (error) { // await transaction.rollback(); const endTime = moment() let message = error.message ? error.message : error //空表auto-sequelize会抛出异常 提示信息处理 if (error.message && error.message.indexOf('No description found for') > -1) { if (error.message.split('.') && error.message.split('.').length > 0) { message = error.message.split('.')[0].split('"')[1] + '未定义任何字段' } } const logBody = { task: task.id, success: false, details: '采集失败:' + JSON.stringify(message).substring(0, 247), startTime: startTime, endTime: endTime } await models.AcquisitionLog.create(logBody) taskRetryIndex[task.id]++; //处理采集失败重试 if (task.retried && task.retryCount && task.retryTime && taskRetryIndex[task.id] <= task.retryCount) { setTimeout(() => { handleTask(app, task) }, 1000 * 60 * task.retryTime); } if (taskRetryIndex[task.id] && taskRetryIndex[task.id] == task.retryCount) { taskRetryIndex[task.id] = 0; } app.fs.logger.error(`sechedule: handleTask, error: ${error}`); } } //处理字段 索引 外键 新增body function handleAddTableChildren(dataToSave, tables, table, tableChildrens) { const fieldBodys = []; Object.keys(tables[table.name].structures).forEach(key => { if (!tableChildrens || !tableChildrens.find(s => s.code == key)) { dataToSave.parent = table.id; dataToSave.name = tables[table.name].structures[key].comment ? tables[table.name].structures[key].comment.substring(0, 250) : key; // dataToSave.name = key; dataToSave.code = key; dataToSave.type = '字段'; const tableObj = { ...dataToSave } fieldBodys.push(tableObj) } }) tables[table.name].foreignKeys.forEach(v => { if (!tableChildrens || !tableChildrens.find(s => s.code == v.columnName)) { dataToSave.parent = table.id; dataToSave.name = v.columnName; dataToSave.code = v.columnName; dataToSave.type = '外键'; const tableObj = { ...dataToSave } fieldBodys.push(tableObj) } }) tables[table.name].indexes.forEach(v => { if (!tableChildrens || !tableChildrens.find(s => s.code == v.name)) { dataToSave.parent = table.id; dataToSave.name = v.name; dataToSave.code = v.name; dataToSave.type = '索引'; const tableObj = { ...dataToSave } fieldBodys.push(tableObj) } }) return fieldBodys; } function createDbOptions(params) { const dbOptions = { database: params.database, username: params.user, password: params.password, dialect: 'postgres', host: params.host, port: params.port, define: { underscored: false, freezeTableName: false, charset: 'utf8mb4', timezone: '+00: 00', dialectOptions: { collate: 'utf8_general_ci', }, timestamps: false, }, } return dbOptions } module.exports = { handleTask }