'use strict'; const schedule = require('node-schedule') const moment = require('moment'); const initJob = require('./initJob'); // 新增采集任务 function addAcquisitionTask(app) { return async function (ctx, next) { const models = ctx.fs.dc.models; try { const { taskName } = ctx.request.body const checkName = await models.AcquisitionTask.findOne({ where: { taskName, taskName } }); if (checkName) { ctx.status = 400; ctx.body = { message: '该采集任务名称已存在' } } else { let rslt = ctx.request.body; await models.AcquisitionTask.create(Object.assign({}, rslt)) let task = await models.AcquisitionTask.findOne({ where: { taskName } }) //新增采集任务 更新采集定时任务状态 const job = await initJob(app, task) if (app.fs.schedule['taskJobs'][task.id]) schedule.cancelJob(app.fs.schedule['taskJobs'][task.id]) app.fs.schedule['taskJobs'][task.id] = job ctx.status = 200; ctx.body = { message: '新建采集任务成功' } } } catch (error) { ctx.fs.logger.error(`path: ${ctx.path}, error: ${error}`); ctx.status = 400; ctx.body = { message: '新建采集任务失败' } } } } function getAcquisitionTask(opts) { return async function (ctx, next) { const models = ctx.fs.dc.models; const { page, limit, taskName } = ctx.query; let errMsg = { message: '获取采集任务失败' } const Op = ctx.fs.dc.ORM.Op; try { let searchWhere = {} let option = { where: searchWhere, order: [["id", "desc"]], include: [{ model: models.DataSource, include: [{ model: models.Adapter, }] }] } if (taskName) { searchWhere.$or = [ { taskName: { $iLike: `%${taskName}%` } }, { '$dataSource.name$': { $iLike: `%${taskName}%` } }, ] } option.where = searchWhere let limit_ = limit || 10; let page_ = page || 1; let offset = (page_ - 1) * limit_; if (limit && page) { option.limit = limit_ option.offset = offset } const res = await models.AcquisitionTask.findAndCount(option); ctx.status = 200; ctx.body = res; } catch (error) { ctx.fs.logger.error(`path: ${ctx.path}, error: ${error}`); ctx.status = 400; ctx.body = errMsg } } } // 修改采集任务 function editAcquisitionTask(app) { return async function (ctx, next) { try { const models = ctx.fs.dc.models; const { id } = ctx.params; const body = ctx.request.body; const { taskName } = ctx.request.body const checkName = await models.AcquisitionTask.findOne({ where: { id: { $not: id }, taskName, taskName } }); if (checkName) { ctx.status = 400; ctx.body = { message: '该采集任务名称已存在' } } else { await models.AcquisitionTask.update( body, { where: { id: id, } } ) const task = await models.AcquisitionTask.findOne({ where: { id: id } }); //编辑采集任务 更新采集定时任务状态 if ((task.enabled && (body.cron || body.enabled))) { const job = await initJob(app, task) if (app.fs.schedule['taskJobs'][id]) schedule.cancelJob(app.fs.schedule['taskJobs'][id]) app.fs.schedule['taskJobs'][id] = job } else if (!task.enabled) { if (app.fs.schedule['taskJobs'][id]) { schedule.cancelJob(app.fs.schedule['taskJobs'][id]) } } ctx.status = 204; ctx.body = { message: '修改采集任务成功' } } } catch (error) { ctx.fs.logger.error(`path: ${ctx.path}, error: ${error}`); ctx.status = 400; ctx.body = { message: '修改采集任务失败' } } } } // 删除采集任务 function deleteAcquisitionTask(app) { return async function (ctx, next) { try { const models = ctx.fs.dc.models; const { id } = ctx.params; const task = await models.AcquisitionTask.destroy({ where: { id: id } }) //删除任务 取消定时任务job if (task) { if (app.fs.schedule['taskJobs'][id]) { schedule.cancelJob(app.fs.schedule['taskJobs'][id]) } } ctx.status = 204; ctx.body = { message: '删除采集任务成功' } } catch (error) { ctx.fs.logger.error(`path: ${ctx.path}, error: ${error}`); ctx.status = 400; ctx.body = { message: '删除采集任务失败' } } } } module.exports = { addAcquisitionTask, getAcquisitionTask, editAcquisitionTask, deleteAcquisitionTask }