You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
307 lines
13 KiB
307 lines
13 KiB
|
|
const Automate = require('sequelize-automate-freesun')
|
|
const moment = require('moment');
|
|
const schedule = require('node-schedule')
|
|
const initJob = require('./initJob');
|
|
//删除数据库源时 同步删除该数据源资源目录树下元数据
|
|
//数据源 数据库配置信息不能更改 若需更改则需要删除数据源重新配置
|
|
// Consecutive-failure counter per task id. It is reset to 0 after a successful
// acquisition and once a failed task will not be retried any more.
const taskRetryIndex = {};

/**
 * Runs one metadata acquisition for `task`.
 *
 * Loads the task's data source, reads its tables through sequelize-automate and
 * mirrors them into `MetadataDatabase` rows: one row of type '库' (database),
 * one '表' (table) row per table, and '字段' / '索引' / '外键' (column / index /
 * foreign key) rows per table. A successful run writes a success row to
 * `AcquisitionLog`; any thrown error writes a failure row instead.
 *
 * When the task has `retried`, `retryCount` and `retryTime` set, a failed run is
 * retried after `retryTime` minutes, at most `retryCount` times.
 *
 * Nothing is written (not even a log row) when the data source is not found.
 *
 * @param {object} app - Application object; `app.fs.dc.models` and `app.fs.logger` are used.
 * @param {object} task - Task record; reads `id`, `dataSourceId`, `taskName`,
 *   `retried`, `retryCount` and `retryTime`.
 * @returns {Promise<void>}
 */
async function handleTask(app, task) {
    const startTime = moment();
    const { models } = app.fs.dc;
    if (!taskRetryIndex[task.id]) taskRetryIndex[task.id] = 0;

    try {
        const dataSource = await models.DataSource.findOne({
            where: {
                id: task.dataSourceId
            }
        });

        if (dataSource) {
            const dbOptions = createDbOptions(dataSource.config);
            const automate = new Automate(dbOptions, {});
            // Tables with their columns, indexes and foreign keys as currently present in the source database.
            const tablesOrign = await automate.getTables();
            const tables = {};
            Object.keys(tablesOrign).forEach(key => {
                // Skip temporary sync tables (any table whose name contains "_airbyte_raw").
                if (key.indexOf('_airbyte_raw') < 0) {
                    tables[key] = tablesOrign[key];
                }
            });

            // Template row. It is mutated while table rows are built and is then
            // handed to handleAddTableChildren as the base of every child row.
            const dataToSave = {
                code: dataSource.config.database,
                name: dataSource.config.database,
                catalog: dataSource.mountPath,
                parent: null,
                description: null,
                type: "库",
                createBy: 1,
                createAt: moment(),
                updateAt: null,
                user: { id: 1, name: "超级管理员", username: "SuperAdmin" },
                attributesParam: null,
                catalogKey: dataSource.catalogKey
            };

            // A catalog holds at most one database row. If it is missing, create the
            // database and all table rows; otherwise only reconcile the table rows.
            const databaseFind = await models.MetadataDatabase.findOne({
                where: {
                    code: dataSource.config.database,
                    type: '库',
                    catalog: dataSource.mountPath,
                }
            });
            let databaseId = databaseFind ? databaseFind.id : null;

            if (databaseFind) {
                const newTableNames = Object.keys(tables);

                // Remove table rows whose table no longer exists in the source database.
                await models.MetadataDatabase.destroy({
                    where: {
                        name: { $notIn: newTableNames },
                        type: '表',
                        catalog: dataSource.mountPath,
                    }
                });

                // Insert rows for tables that are not stored yet.
                const metaDatabaseTables = await models.MetadataDatabase.findAll({
                    where: {
                        type: '表',
                        catalog: dataSource.mountPath,
                    }
                });
                const missingTableNames = newTableNames.filter(
                    tableName => !metaDatabaseTables.find(s => s.code == tableName)
                );
                const tableBodys = buildTableMetadataRows(
                    dataToSave, missingTableNames, databaseFind.id, dataSource.config
                );
                await models.MetadataDatabase.bulkCreate(tableBodys, { returning: true });
            } else {
                // First acquisition for this catalog: store the database row, then every table row.
                const databaseRslt = await models.MetadataDatabase.create(dataToSave);
                databaseId = databaseRslt.id;
                if (databaseRslt && databaseRslt.id) {
                    const tableBodys = buildTableMetadataRows(
                        dataToSave, Object.keys(tables), databaseRslt.id, dataSource.config
                    );
                    await models.MetadataDatabase.bulkCreate(tableBodys, { returning: true });
                }
            }

            if (databaseId) {
                const metaDatabaseTables = await models.MetadataDatabase.findAll({
                    where: {
                        type: '表',
                        catalog: dataSource.mountPath,
                        parent: databaseId
                    }
                });

                // Column / index / foreign-key rows stored by previous runs.
                const tableChildrens = await models.MetadataDatabase.findAll({
                    where: {
                        type: { $in: ['字段', '索引', '外键'] },
                        catalog: dataSource.mountPath,
                    }
                });

                let fieldBodys = [];
                let fieldRslt = null;
                if (tableChildrens.length == 0) {
                    // Nothing stored yet: insert every child of every table.
                    for (const table of metaDatabaseTables) {
                        const children = handleAddTableChildren(dataToSave, tables, table);
                        fieldBodys = fieldBodys.concat(children);
                    }
                    fieldRslt = await models.MetadataDatabase.bulkCreate(fieldBodys);
                } else {
                    const deleteIds = []; // stored children that no longer exist in the source table
                    const deleteParentIds = []; // parents of stored children whose table row is gone
                    tableChildrens.forEach(s => {
                        const table = metaDatabaseTables.find(x => x.id == s.parent);
                        if (table) {
                            if (s.type == '字段' && !tables[table.name].structures[s.code]) {
                                deleteIds.push(s.id);
                            }
                            if (s.type == '外键' && !tables[table.name].foreignKeys.find(x => x.columnName == s.code)) {
                                deleteIds.push(s.id);
                            }
                            if (s.type == '索引' && !tables[table.name].indexes.find(x => x.name == s.code)) {
                                deleteIds.push(s.id);
                            }
                        } else {
                            if (!deleteParentIds.find(n => n == s.parent)) deleteParentIds.push(s.parent);
                        }
                    });

                    // Collect rows to insert: all children of tables without stored children,
                    // and only the not-yet-stored children of the other tables.
                    Object.keys(tables).forEach(key => {
                        const table = metaDatabaseTables.find(s => s.code == key);
                        const hasStoredChildren = tableChildrens.some(s => s.parent == table.id);
                        const children = hasStoredChildren
                            ? handleAddTableChildren(dataToSave, tables, table, tableChildrens)
                            : handleAddTableChildren(dataToSave, tables, table);
                        fieldBodys = fieldBodys.concat(children);
                    });

                    // Delete stored children that disappeared from the source database.
                    await models.MetadataDatabase.destroy({
                        where: {
                            type: { $in: ['字段', '索引', '外键'] },
                            catalog: dataSource.mountPath,
                            $or: [
                                { parent: { $in: deleteParentIds } },
                                { id: { $in: deleteIds } }
                            ]
                        }
                    });

                    // Insert the new children; with nothing to insert the run still counts as successful.
                    if (fieldBodys.length > 0) {
                        fieldRslt = await models.MetadataDatabase.bulkCreate(fieldBodys);
                    } else {
                        fieldRslt = true;
                    }
                }

                if (fieldRslt) {
                    const endTime = moment();
                    const logBody = {
                        task: task.id,
                        success: true,
                        details: '采集成功',
                        startTime: startTime,
                        endTime: endTime
                    };
                    await models.AcquisitionLog.create(logBody);
                    taskRetryIndex[task.id] = 0;
                }
            }
        }

        console.log('===========------------------------------------------------------------' + taskRetryIndex[task.id] + '----' + task.taskName + '------' + moment().format('HH:mm:ss'))

    } catch (error) {
        const endTime = moment();
        let message = error.message ? error.message : error;
        // The schema reader throws "No description found for ..." for a table without
        // columns; turn that into a short message naming the quoted table.
        if (error.message && error.message.indexOf('No description found for') > -1) {
            message = error.message.split('.')[0].split('"')[1] + '未定义任何字段';
        }

        const logBody = {
            task: task.id,
            success: false,
            details: '采集失败:' + JSON.stringify(message).substring(0, 247),
            startTime: startTime,
            endTime: endTime
        };
        try {
            await models.AcquisitionLog.create(logBody);
        } catch (logError) {
            // A failing log write must not prevent the retry bookkeeping below.
            app.fs.logger.error(`schedule: handleTask, failed to write acquisition log: ${logError}`);
        }

        taskRetryIndex[task.id]++;
        const shouldRetry = task.retried && task.retryCount && task.retryTime
            && taskRetryIndex[task.id] <= task.retryCount;
        if (shouldRetry) {
            setTimeout(() => {
                handleTask(app, task).catch(retryError => {
                    app.fs.logger.error(`schedule: handleTask retry, error: ${retryError}`);
                });
            }, 1000 * 60 * task.retryTime);
        } else {
            // Reset only when no further retry is scheduled. Resetting while a retry is
            // still pending would restart the count and retry a failing task forever.
            taskRetryIndex[task.id] = 0;
        }
        app.fs.logger.error(`sechedule: handleTask, error: ${error}`);
    }
}

// Builds one '表' (table) row per name from the shared template. The template is
// mutated on purpose: rows built from it later inherit the same datasourceConfig.
function buildTableMetadataRows(dataToSave, tableNames, parentId, datasourceConfig) {
    const rows = [];
    for (const tableName of tableNames) {
        dataToSave.parent = parentId;
        dataToSave.name = tableName;
        dataToSave.code = tableName;
        dataToSave.type = '表';
        dataToSave.datasourceConfig = datasourceConfig;
        rows.push({ ...dataToSave });
    }
    return rows;
}
|
|
|
|
/**
 * Builds the rows to insert for one table's columns ('字段'), foreign keys ('外键')
 * and indexes ('索引').
 *
 * Every row is a copy of `dataToSave` with `parent`, `name`, `code` and `type`
 * overridden; `dataToSave` itself is left untouched. A column row is named after
 * the column comment (first 250 characters) when one exists, otherwise after the
 * column; foreign-key rows use `columnName` and index rows use `name`.
 *
 * @param {object} dataToSave - Template whose properties are copied into every row.
 * @param {object} tables - Map of table name to `{ structures, foreignKeys, indexes }`.
 * @param {object} table - Stored table row; `id` and `name` are read.
 * @param {Array<object>} [tableChildrens] - Already stored child rows (`parent`, `type`,
 *   `code` are read). When given, a child is skipped only if a stored row with the same
 *   parent table, type and code exists. When omitted, every child is returned.
 * @returns {Array<object>} Rows in the order columns, foreign keys, indexes.
 */
function handleAddTableChildren(dataToSave, tables, table, tableChildrens) {
    const { structures, foreignKeys, indexes } = tables[table.name];

    // Index the stored children of THIS table by type and code. Matching on code alone
    // would let another table's column, or a column with the same name as a new foreign
    // key, hide a child that still has to be inserted.
    const storedKeys = new Set();
    if (tableChildrens) {
        for (const child of tableChildrens) {
            // Loose equality on purpose: ids may arrive as numbers or numeric strings.
            if (child.parent == table.id) {
                storedKeys.add(`${child.type}:${child.code}`);
            }
        }
    }

    const fieldBodys = [];
    const addRow = (type, code, name) => {
        if (storedKeys.has(`${type}:${code}`)) return;
        fieldBodys.push({ ...dataToSave, parent: table.id, name, code, type });
    };

    Object.keys(structures).forEach(key => {
        const { comment } = structures[key];
        addRow('字段', key, comment ? comment.substring(0, 250) : key);
    });
    foreignKeys.forEach(v => addRow('外键', v.columnName, v.columnName));
    indexes.forEach(v => addRow('索引', v.name, v.name));

    return fieldBodys;
}
|
|
|
|
/**
 * Maps a stored data-source configuration onto the connection options object
 * used to read the source database (dialect is always 'postgres').
 *
 * @param {object} params - Data-source config; `database`, `user`, `password`,
 *   `host` and `port` are read.
 * @returns {object} Options with `database`, `username`, `password`, `dialect`,
 *   `host`, `port` and a fixed `define` section.
 */
function createDbOptions(params) {
    const { database, user, password, host, port } = params;

    // Fixed model-definition settings, identical for every data source.
    const define = {
        underscored: false,
        freezeTableName: false,
        charset: 'utf8mb4',
        timezone: '+00: 00',
        dialectOptions: {
            collate: 'utf8_general_ci',
        },
        timestamps: false,
    };

    return {
        database,
        username: user,
        password,
        dialect: 'postgres',
        host,
        port,
        define,
    };
}
|
|
|
|
// Only handleTask is exported; the other functions in this file stay module-private.
module.exports = { handleTask }
|
|
|
|
|