政务数据资源中心(Government data Resource center) 03专项3期主要建设内容
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

213 lines
8.5 KiB

const Automate = require('sequelize-automate-freesun')
const moment = require('moment');
const schedule = require('node-schedule')
const initJob = require('./initJob');
//删除数据库源时 同步删除该数据源资源目录树下元数据
//数据源 数据库配置信息不能更改 若需更改则需要删除数据源重新配置
const taskRetryIndex = {}
async function handleTask(app, task) {
// const transaction = await app.fs.dc.orm.transaction();
const startTime = moment()
const { models } = app.fs.dc
if (!taskRetryIndex[task.id]) taskRetryIndex[task.id] = 0;
try {
const dataSource = await models.DataSource.findOne({
where: {
id: task.dataSourceId
}
});
if (dataSource) {
const dbOptions = createDbOptions(dataSource.config);
const automate = new Automate(dbOptions, {});
const tables = await automate.getTables(); //获取当前采集任务数据源pg库表字段索引外键数据
const dataToSave = {
code: dataSource.config.database,
name: dataSource.config.database,
catalog: dataSource.mountPath,
parent: null,
description: null,
type: "库",
createBy: 1,
createAt: moment(),
updateAt: null,
user: { id: 1, name: "超级管理员", username: "SuperAdmin" },
attributesParam: null,
catalogKey: dataSource.catalogKey
}
//初始化数据 新增 数据库 表 字段索引外键数据
let databaseFind = await models.MetadataDatabase.findOne({
where: {
code: dataSource.config.database,
type: '库',
catalog: dataSource.mountPath,
}//目录树下库只会存在一个 判断是否有库类型元数据 没有则新增库和表元数据 有库数据则比较更新表类型元数据 全量更新字段索引数据
})
let databaseId = databaseFind ? databaseFind.id : null;
if (databaseFind) { //更新表类型元数据
const newTableNames = []
Object.keys(tables).forEach(key => { newTableNames.push(key) });
//删除不存在表元数据
await models.MetadataDatabase.destroy({
where: {
name: { $notIn: newTableNames },
type: '表',
catalog: dataSource.mountPath,
}
})
//录入新增的表元数据
const metaDatabaseTables = await models.MetadataDatabase.findAll({
where: {
type: '表',
catalog: dataSource.mountPath,
}
})
const tableBodys = []
Object.keys(tables).forEach(table => {
if (!metaDatabaseTables.find(s => s.code == table)) {
dataToSave.parent = databaseFind.id;
dataToSave.name = table;
dataToSave.code = table;
dataToSave.type = '表';
const tableObj = { ...dataToSave }
tableBodys.push(tableObj)
}
});
await models.MetadataDatabase.bulkCreate(tableBodys, { returning: true });
} else { //新增库、表类型数据
//库类型元数据存储
const databaseRslt = await models.MetadataDatabase.create(dataToSave)
databaseId = databaseRslt.id
//表元数据存储
if (databaseRslt && databaseRslt.id) {
const tableBodys = []
Object.keys(tables).forEach(table => {
dataToSave.parent = databaseRslt.id;
dataToSave.name = table;
dataToSave.code = table;
dataToSave.type = '表';
const tableObj = { ...dataToSave }
tableBodys.push(tableObj)
});
await models.MetadataDatabase.bulkCreate(tableBodys, { returning: true });
}
}
if (databaseId) {
const metaDatabaseTables = await models.MetadataDatabase.findAll({
where: {
type: '表',
catalog: dataSource.mountPath,
parent: databaseId
}
})
//字段/索引/外键全量更新 先删除之前的字段 再录入新的数据
await models.MetadataDatabase.destroy({
where: {
type: { $in: ['字段', '索引', '外键'] },
catalog: dataSource.mountPath,
}
})
const fieldBodys = []
for (let table of metaDatabaseTables) {
Object.keys(tables[table.name].structures).forEach(key => {
dataToSave.parent = table.id;
dataToSave.name = tables[table.name].structures[key].comment || key;
dataToSave.code = key;
dataToSave.type = '字段';
const tableObj = { ...dataToSave }
fieldBodys.push(tableObj)
})
tables[table.name].foreignKeys.forEach(v => {
dataToSave.parent = table.id;
dataToSave.name = v.columnName;
dataToSave.code = v.columnName;
dataToSave.type = '外键';
const tableObj = { ...dataToSave }
fieldBodys.push(tableObj)
})
tables[table.name].indexes.forEach(v => {
dataToSave.parent = table.id;
dataToSave.name = v.name;
dataToSave.code = v.name;
dataToSave.type = '索引';
const tableObj = { ...dataToSave }
fieldBodys.push(tableObj)
})
}
const fieldRslt = await models.MetadataDatabase.bulkCreate(fieldBodys);
if (fieldRslt) {
const endTime = moment()
const logBody = {
task: task.id,
success: true,
details: '采集成功',
startTime: startTime,
endTime: endTime
}
//日志记录
await models.AcquisitionLog.create(logBody)
taskRetryIndex[task.id] = 0;
}
}
}
console.log('===========------------------------------------------------------------' + taskRetryIndex[task.id] + '----' + task.taskName + '------' + moment().format('HH:mm:ss'))
} catch (error) {
// await transaction.rollback();
const endTime = moment()
const logBody = {
task: task.id,
success: false,
details: '采集失败' + JSON.stringify(error).substring(0, 248),
startTime: startTime,
endTime: endTime
}
await models.AcquisitionLog.create(logBody)
taskRetryIndex[task.id]++;
//处理采集失败重试
if (task.retried && task.retryCount && task.retryTime && taskRetryIndex[task.id] < task.retryCount) {
setTimeout(() => {
handleTask(app, task)
}, 1000 * 60 * task.retryCount);
}
app.fs.logger.error(`sechedule: handleTask, error: ${error}`);
}
}
function createDbOptions(params) {
const dbOptions = {
database: params.database,
username: params.user,
password: params.password,
dialect: 'postgres',
host: params.host,
port: params.port,
define: {
underscored: false,
freezeTableName: false,
charset: 'utf8mb4',
timezone: '+00: 00',
dialectOptions: {
collate: 'utf8_general_ci',
},
timestamps: false,
},
}
return dbOptions
}
module.exports = { handleTask }