Browse Source

(*)元数据定时任务功能完善 数据源增加资源目录树key

master
peng.peng 2 years ago
parent
commit
e2c708b962
  1. 9
      api/app/lib/controllers/metadataAcquisition/dataSource.js
  2. 14
      api/app/lib/controllers/metadataAcquisition/initJob.js
  3. 66
      api/app/lib/controllers/metadataAcquisition/taskHandle.js
  4. 9
      api/app/lib/models/data_source.js
  5. 2
      scripts/0.0.3/schema/03_alter_t_data_source.sql
  6. 10
      web/client/src/sections/metadataAcquisition/components/steps/postgre/stepOne.js
  7. 4
      web/client/src/sections/metadataAcquisition/components/steps/postgre/stepTwo.js
  8. 6
      web/client/src/sections/metadataAcquisition/containers/acquisitionTask.js
  9. 9
      web/client/src/sections/metadataAcquisition/containers/adapter.js
  10. 4
      web/client/src/sections/metadataAcquisition/containers/dataSourceManagement.js

9
api/app/lib/controllers/metadataAcquisition/dataSource.js

@ -116,11 +116,20 @@ function deleteDataSource(opts) {
ctx.status = 400; ctx.status = 400;
ctx.body = { message: '数据源下存在采集任务,请删除任务后再删除数据源!' } ctx.body = { message: '数据源下存在采集任务,请删除任务后再删除数据源!' }
} else { } else {
const datasource = await models.DataSource.findOne({ where: { id: id } })
await models.DataSource.destroy({ await models.DataSource.destroy({
where: { where: {
id: id id: id
} }
}) })
//删除数据源绑定的资源目录下的元数据
await models.MetadataDatabase.destroy({
where: {
catalog: datasource.mountPath,
}
})
ctx.status = 204; ctx.status = 204;
ctx.body = { message: '删除数据源成功' } ctx.body = { message: '删除数据源成功' }
} }

14
api/app/lib/controllers/metadataAcquisition/initJob.js

@ -3,9 +3,11 @@ const { handleTask } = require('./taskHandle');
//根据任务配置生成定时任务 //根据任务配置生成定时任务
module.exports = async function (app, task) { module.exports = async function (app, task) {
let job = null let job = null
const { models } = app.fs.dc
const startTime = moment()
try { try {
job = app.fs.scheduleInit( job = app.fs.scheduleInit(
{ {
interval: task.cron, interval: task.cron,
@ -15,13 +17,21 @@ module.exports = async function (app, task) {
async () => { async () => {
try { try {
await handleTask(app, task); await handleTask(app, task);
console.log(task.taskName, moment().format('YYYY-MM-DD HH:mm:ss'));
} catch (error) { } catch (error) {
app.fs.logger.error(`sechedule: taskJobs, error: ${error}`) app.fs.logger.error(`sechedule: taskJobs, error: ${error}`)
} }
}); });
} catch (error) { } catch (error) {
const endTime = moment()
const logBody = {
task: task.id,
success: true,
details: '采集失败' + JSON.stringify(error).substring(0, 248),
startTime: startTime,
endTime: endTime
}
await models.AcquisitionLog.create(logBody)
app.fs.logger.error(`sechedule: taskJobs, error: ${error}`); app.fs.logger.error(`sechedule: taskJobs, error: ${error}`);
} }

66
api/app/lib/controllers/metadataAcquisition/taskHandle.js

@ -1,12 +1,16 @@
const Automate = require('sequelize-automate-freesun') const Automate = require('sequelize-automate-freesun')
const moment = require('moment'); const moment = require('moment');
//@toto删除数据库源时 同步删除该数据源资源目录树下元数据 const schedule = require('node-schedule')
const initJob = require('./initJob');
//删除数据库源时 同步删除该数据源资源目录树下元数据
//数据源 数据库配置信息不能更改 若需更改则需要删除数据源重新配置 //数据源 数据库配置信息不能更改 若需更改则需要删除数据源重新配置
let i = 0;
async function handleTask(app, task) { async function handleTask(app, task) {
const transaction = await app.fs.dc.orm.transaction(); // const transaction = await app.fs.dc.orm.transaction();
try { const startTime = moment()
const { models } = app.fs.dc const { models } = app.fs.dc
try {
const dataSource = await models.DataSource.findOne({ const dataSource = await models.DataSource.findOne({
where: { where: {
id: task.dataSourceId id: task.dataSourceId
@ -16,7 +20,7 @@ async function handleTask(app, task) {
if (dataSource) { if (dataSource) {
const dbOptions = createDbOptions(dataSource.config); const dbOptions = createDbOptions(dataSource.config);
const automate = new Automate(dbOptions, {}); const automate = new Automate(dbOptions, {});
const tables = await automate.getTables(); const tables = await automate.getTables(); //获取当前采集任务数据源pg库表字段索引外键数据
const dataToSave = { const dataToSave = {
code: dataSource.config.database, code: dataSource.config.database,
name: dataSource.config.database, name: dataSource.config.database,
@ -29,7 +33,7 @@ async function handleTask(app, task) {
updateAt: null, updateAt: null,
user: { id: 1, name: "超级管理员", username: "SuperAdmin" }, user: { id: 1, name: "超级管理员", username: "SuperAdmin" },
attributesParam: null, attributesParam: null,
catalogKey: 'rc' catalogKey: dataSource.catalogKey
} }
//初始化数据 新增 数据库 表 字段索引外键数据 //初始化数据 新增 数据库 表 字段索引外键数据
@ -40,7 +44,7 @@ async function handleTask(app, task) {
catalog: dataSource.mountPath, catalog: dataSource.mountPath,
}//目录树下库只会存在一个 判断是否有库类型元数据 没有则新增库和表元数据 有库数据则比较更新表类型元数据 全量更新字段索引数据 }//目录树下库只会存在一个 判断是否有库类型元数据 没有则新增库和表元数据 有库数据则比较更新表类型元数据 全量更新字段索引数据
}) })
let databaseId = databaseFind ? databaseFind.id : null;
if (databaseFind) { //更新表类型元数据 if (databaseFind) { //更新表类型元数据
const newTableNames = [] const newTableNames = []
Object.keys(tables).forEach(key => { newTableNames.push(key) }); Object.keys(tables).forEach(key => { newTableNames.push(key) });
@ -78,6 +82,7 @@ async function handleTask(app, task) {
} else { //新增库、表类型数据 } else { //新增库、表类型数据
//库类型元数据存储 //库类型元数据存储
const databaseRslt = await models.MetadataDatabase.create(dataToSave) const databaseRslt = await models.MetadataDatabase.create(dataToSave)
databaseId = databaseRslt.id
//表元数据存储 //表元数据存储
if (databaseRslt && databaseRslt.id) { if (databaseRslt && databaseRslt.id) {
const tableBodys = [] const tableBodys = []
@ -93,10 +98,12 @@ async function handleTask(app, task) {
} }
} }
if (databaseId) {
const metaDatabaseTables = await models.MetadataDatabase.findAll({ const metaDatabaseTables = await models.MetadataDatabase.findAll({
where: { where: {
type: '表', type: '表',
catalog: dataSource.mountPath, catalog: dataSource.mountPath,
parent: databaseId
} }
}) })
@ -110,10 +117,6 @@ async function handleTask(app, task) {
const fieldBodys = [] const fieldBodys = []
for (let table of metaDatabaseTables) { for (let table of metaDatabaseTables) {
if(!tables[table.name].structures){
console.log(table)
return;
}
Object.keys(tables[table.name].structures).forEach(key => { Object.keys(tables[table.name].structures).forEach(key => {
dataToSave.parent = table.id; dataToSave.parent = table.id;
dataToSave.name = tables[table.name].structures[key].comment || key; dataToSave.name = tables[table.name].structures[key].comment || key;
@ -122,14 +125,57 @@ async function handleTask(app, task) {
const tableObj = { ...dataToSave } const tableObj = { ...dataToSave }
fieldBodys.push(tableObj) fieldBodys.push(tableObj)
}) })
tables[table.name].foreignKeys.forEach(v => {
dataToSave.parent = table.id;
dataToSave.name = v.columnName;
dataToSave.code = v.columnName;
dataToSave.type = '外键';
const tableObj = { ...dataToSave }
fieldBodys.push(tableObj)
})
tables[table.name].indexes.forEach(v => {
dataToSave.parent = table.id;
dataToSave.name = v.name;
dataToSave.code = v.name;
dataToSave.type = '索引';
const tableObj = { ...dataToSave }
fieldBodys.push(tableObj)
})
} }
const fieldRslt = await models.MetadataDatabase.bulkCreate(fieldBodys); const fieldRslt = await models.MetadataDatabase.bulkCreate(fieldBodys);
if (fieldRslt) { if (fieldRslt) {
const endTime = moment()
const logBody = {
task: task.id,
success: true,
details: '采集成功',
startTime: startTime,
endTime: endTime
}
//日志记录 //日志记录
await models.AcquisitionLog.create(logBody)
} }
} }
}
i++
console.log('===========------------------------------------------------------------' + task.taskName + moment().format('HH:mm:ss'))
} catch (error) { } catch (error) {
// await transaction.rollback(); // await transaction.rollback();
const endTime = moment()
const logBody = {
task: task.id,
success: true,
details: '采集失败' + JSON.stringify(error).substring(0, 248),
startTime: startTime,
endTime: endTime
}
await models.AcquisitionLog.create(logBody)
// if (app.fs.schedule['taskJobs'][task.id]) schedule.cancelJob(app.fs.schedule['taskJobs'][task.id])
console.log('===========------------------------------------------------------------' + i)
app.fs.logger.error(`sechedule: handleTask, error: ${error}`); app.fs.logger.error(`sechedule: handleTask, error: ${error}`);
} }
} }

9
api/app/lib/models/data_source.js

@ -86,6 +86,15 @@ module.exports = dc => {
primaryKey: false, primaryKey: false,
field: "time", field: "time",
autoIncrement: false autoIncrement: false
},
catalogKey: {
type: DataTypes.STRING,
allowNull: true,
defaultValue: null,
comment: null,
primaryKey: false,
field: "catalogKey",
autoIncrement: false
} }
}, { }, {
tableName: "t_data_source", tableName: "t_data_source",

2
scripts/0.0.3/schema/03_alter_t_data_source.sql

@ -0,0 +1,2 @@
alter table t_data_source
add "catalogKey" varchar(255);

10
web/client/src/sections/metadataAcquisition/components/steps/postgre/stepOne.js

@ -10,7 +10,7 @@ import {
import '../../style.less'; import '../../style.less';
function StepOne(props) { function StepOne(props) {
const { next, stepOneValues, stepOneValuesFinish, readOnly, treeData } = props; const { next, stepOneValues, stepOneValuesFinish, readOnly, treeData, dataSources } = props;
const formRef = React.createRef(); const formRef = React.createRef();
const initialValues = stepOneValues ? stepOneValues : { const initialValues = stepOneValues ? stepOneValues : {
adapterName: 'PostgreSQL采集适配器', adapterName: 'PostgreSQL采集适配器',
@ -30,7 +30,10 @@ function StepOne(props) {
data = dataSource.filter(ds => ds.parent == parent); data = dataSource.filter(ds => ds.parent == parent);
} }
treeData = data.map(ds => { treeData = data.map(ds => {
return { title: ds.name, key: ds?.id, id: ds.id, value: ds?.id } return {
title: ds.name, key: `${key}-${ds.id}`, id: ds.id, value: `${key}-${ds.id}`,
disabled: dataSources?.rows?.find(s => s.mountPath == ds.id)
}
}); });
for (let d of treeData) { for (let d of treeData) {
d.children = getTreeNodeData(dataSource, d.id, d.key); d.children = getTreeNodeData(dataSource, d.id, d.key);
@ -50,6 +53,7 @@ function StepOne(props) {
onCancel: () => { }, onCancel: () => { },
}} }}
onFinish={async (values) => { onFinish={async (values) => {
values.mountPath = values.catalogKey.split('-')[values.catalogKey.split('-')?.length - 1]
next() next()
stepOneValuesFinish(values) stepOneValuesFinish(values)
return true; return true;
@ -109,7 +113,7 @@ function StepOne(props) {
// disabled={true} // disabled={true}
/> */} /> */}
<ProFormTreeSelect <ProFormTreeSelect
name="mountPath" name="catalogKey"
label="数据源挂载路径" label="数据源挂载路径"
placeholder="请选择数据源挂载路径" placeholder="请选择数据源挂载路径"
rules={[{ required: true, message: '请选择数据源挂载路径' }]} rules={[{ required: true, message: '请选择数据源挂载路径' }]}

4
web/client/src/sections/metadataAcquisition/components/steps/postgre/stepTwo.js

@ -4,7 +4,7 @@ import '../../style.less';
import { Func, useMockRequest, useFsRequest, ApiTable } from '$utils'; import { Func, useMockRequest, useFsRequest, ApiTable } from '$utils';
function StepTwo(props) { function StepTwo(props) {
const { prev, next, dispatch, actions, stepTwoValues, stepTwoValuesFinish, stepProps } = props; const { prev, next, dispatch, actions, stepTwoValues, stepTwoValuesFinish, stepProps, editData } = props;
const [params, setParams] = useState({}) const [params, setParams] = useState({})
const [connect, setConnet] = useState('') const [connect, setConnet] = useState('')
const [loading, setLoading] = useState(false); const [loading, setLoading] = useState(false);
@ -69,6 +69,7 @@ function StepTwo(props) {
render: (text, record) => { render: (text, record) => {
return <Input return <Input
// type={text == "password" ? "password" : ''} // type={text == "password" ? "password" : ''}
disabled={!!editData}
defaultValue={stepTwoValues ? stepTwoValues[text] : ''} defaultValue={stepTwoValues ? stepTwoValues[text] : ''}
onChange={(e) => { onChange={(e) => {
const obj = {}; const obj = {};
@ -88,6 +89,7 @@ function StepTwo(props) {
return true; return true;
} }
} }
const checkConnect = () => { const checkConnect = () => {
if (checkNext() === true) { if (checkNext() === true) {
setLoading(true) setLoading(true)

6
web/client/src/sections/metadataAcquisition/containers/acquisitionTask.js

@ -122,9 +122,9 @@ function AcquisitionTask(props) {
{ {
title: '执行周期', title: '执行周期',
dataIndex: 'cron', dataIndex: 'cron',
// render: (text, record) => { render: (text, record) => {
// return transCron(record?.cron); return transCron(record?.cron);
// } }
}, },
{ {
title: '重复次数', title: '重复次数',

9
web/client/src/sections/metadataAcquisition/containers/adapter.js

@ -13,6 +13,7 @@ const LatestMetadata = (props) => {
const { data: treeData = [] } = useFsRequest({ url: ApiTable.getResourceCatalog }); const { data: treeData = [] } = useFsRequest({ url: ApiTable.getResourceCatalog });
useEffect(() => { useEffect(() => {
dispatch(actions.metadataAcquisition.getAdapters()) dispatch(actions.metadataAcquisition.getAdapters())
dispatch(actions.metadataAcquisition.getDataSources());
}, []) }, [])
const renderRelationalDatabase = () => { const renderRelationalDatabase = () => {
@ -42,7 +43,8 @@ const LatestMetadata = (props) => {
name: stepOneValues?.name, name: stepOneValues?.name,
audited: true, audited: true,
adapterId: adapterInfo?.id, adapterId: adapterInfo?.id,
mountPath: 1, mountPath: stepOneValues?.mountPath,
catalogKey: stepOneValues?.catalogKey,
description: stepOneValues?.description, description: stepOneValues?.description,
config: stepTwoValues, config: stepTwoValues,
time: moment() time: moment()
@ -91,12 +93,13 @@ const LatestMetadata = (props) => {
} }
function mapStateToProps(state) { function mapStateToProps(state) {
const { global, auth, adapters } = state; const { global, auth, adapters, datasources } = state;
return { return {
clientHeight: global.clientHeight, clientHeight: global.clientHeight,
user: auth.user, user: auth.user,
actions: global.actions, actions: global.actions,
adapters: adapters?.data || [] adapters: adapters?.data || [],
dataSources: datasources?.data || {},
}; };
} }
export default connect(mapStateToProps)(LatestMetadata) export default connect(mapStateToProps)(LatestMetadata)

4
web/client/src/sections/metadataAcquisition/containers/dataSourceManagement.js

@ -94,7 +94,8 @@ function DataSourceManagement(props) {
<Popconfirm <Popconfirm
key="del" key="del"
placement="top" placement="top"
title="是否确认删除该数据源?" title={<><div>是否确认删除该数据源</div>
<div>(将同步删除数据源下的元数据)</div></>}
onConfirm={() => handleDelete(record.id)} onConfirm={() => handleDelete(record.id)}
okText="是" okText="是"
cancelText="否" cancelText="否"
@ -116,6 +117,7 @@ function DataSourceManagement(props) {
audited: true, audited: true,
adapterId: adapterInfo?.id, adapterId: adapterInfo?.id,
mountPath: stepOneValues?.mountPath, mountPath: stepOneValues?.mountPath,
catalogKey: stepOneValues?.catalogKey,
description: stepOneValues?.description, description: stepOneValues?.description,
config: stepTwoValues, config: stepTwoValues,
time: moment() time: moment()

Loading…
Cancel
Save