diff --git a/dataSource/channels.go b/dataSource/channels.go
deleted file mode 100644
index b6b8b7d..0000000
--- a/dataSource/channels.go
+++ /dev/null
@@ -1,30 +0,0 @@
-package dataSource
-
-import (
-	"gitea.anxinyun.cn/container/common_models"
-	"sync"
-)
-
-type DataChannels struct {
-	RawDataChan chan common_models.IotaData
-	AggDataChan chan common_models.AggData
-}
-
-var (
-	once         sync.Once
-	dataChannels *DataChannels
-)
-
-func InitChannels() *DataChannels {
-	once.Do(func() {
-		dataChannels = &DataChannels{
-			RawDataChan: make(chan common_models.IotaData, 1),
-			AggDataChan: make(chan common_models.AggData, 1),
-		}
-	})
-	return dataChannels
-}
-
-func GetChannels() *DataChannels {
-	return dataChannels
-}
diff --git a/dataSource/kafka/aggData.go b/dataSource/kafka/aggData.go
deleted file mode 100644
index f358b45..0000000
--- a/dataSource/kafka/aggData.go
+++ /dev/null
@@ -1,60 +0,0 @@
-package kafka
-
-import (
-	"dataSource"
-	"encoding/json"
-	"gitea.anxinyun.cn/container/common_models"
-	"gitea.anxinyun.cn/container/common_utils"
-	"gitea.anxinyun.cn/container/common_utils/configLoad"
-	"log"
-	"strings"
-	"time"
-)
-
-type AggDataHandler struct {
-	configHelper *common_utils.ConfigHelper
-}
-
-func NewAggDataHandler() *AggDataHandler {
-	redisAddr := configLoad.LoadConfig().GetString("redis.address")
-	return &AggDataHandler{
-		configHelper: common_utils.NewConfigHelper(redisAddr),
-	}
-
-}
-
-func (h AggDataHandler) HandleMessage(message string) bool {
-	// aggDataMsg: {"date":"2024-09-19T09:39:59.999+0000","sensorId":106,"structId":1,"factorId":11,"aggTypeId":2006,"aggMethodId":3004,"agg":{"strain":-19.399999618530273},"changed":{"strain":-3}}
-	// aggDataMsg 中的时间为UTC格式 2024-04-19T01:10:59.999+0000,
-	// 在进行 json.Unmarshal() 时报错
-	// 解决方案:先将 +0000 -> Z,然后再将 UTC 时间转换为中国时区时间("Asia/Shanghai")
-
-	// 2024-09-28T23:59:59.999+0800
-	// 将 2024-04-19T01:10:59.999+0000 -> 2024-04-19T01:10:59.999Z
-	utcTimeStr := strings.Replace(message, "+0800", "+08:00", 1)
-	utcTimeStr = strings.Replace(utcTimeStr, "+0000", "Z", 1)
-
-	aggData := common_models.AggData{}
-	err := json.Unmarshal([]byte(utcTimeStr), &aggData)
-	if err != nil {
-		log.Printf("json parse error: %v", err)
-		return false
-	}
-	// 转换为中国时区时间("Asia/Shanghai")
-	loc, _ := time.LoadLocation("Asia/Shanghai")
-	chinaTime := aggData.Date.In(loc)
-	aggData.Date = chinaTime
-	//log.Printf("message:%v\n, cvt: %+v", message, aggData)
-	if aggData.ThingId == "" {
-		structure, err := h.configHelper.GetStructure(aggData.StructId)
-		if err != nil {
-			log.Printf("redis 中无 key = structure:%d 的缓存数据.", aggData.StructId)
-			return false
-		}
-		aggData.ThingId = structure.ThingId
-	}
-
-	log.Printf("handler 处理sensorId[%d]消息", aggData.SensorId)
-	dataSource.GetChannels().AggDataChan <- aggData
-	return true
-}
diff --git a/dataSource/kafka/iotaData.go b/dataSource/kafka/iotaData.go
deleted file mode 100644
index 4d2fc59..0000000
--- a/dataSource/kafka/iotaData.go
+++ /dev/null
@@ -1,23 +0,0 @@
-package kafka
-
-import (
-	"dataSource"
-	"encoding/json"
-	"gitea.anxinyun.cn/container/common_models"
-	"log"
-)
-
-type IotaDataHandler struct{}
-
-func (h IotaDataHandler) HandleMessage(message string) bool {
-	// 处理 alarm 消息
-	rawData := common_models.IotaData{}
-	err := json.Unmarshal([]byte(message), &rawData)
-	if err != nil {
-		log.Printf("[dataSource/kafka/iotaData/IotaHandler] Parse msg error: %v", err)
-		return false
-	}
-	log.Printf("handler 处理[%s|%s]消息", rawData.DeviceId, rawData.TriggerTime)
-	dataSource.GetChannels().RawDataChan <- rawData
-	return true
-}
diff --git a/dataSource/kafka/kafka_handler.go b/dataSource/kafka/kafka_handler.go
deleted file mode 100644
index 67251e9..0000000
--- a/dataSource/kafka/kafka_handler.go
+++ /dev/null
@@ -1,71 +0,0 @@
-package kafka
-
-import (
-	"dataSource"
-	"gitea.anxinyun.cn/container/common_utils/configLoad"
-	"gitea.anxinyun.cn/container/common_utils/kafkaHelper"
-	"log"
-)
-
-type KafkaDataSource struct {
-	groupId      string
-	brokers      []string
-	topics       map[string]string
-	DataChannels *dataSource.DataChannels
-}
-
-func NewKafkaDataSource() *KafkaDataSource {
-	// 初始化所有通道
-	dataSource.InitChannels()
-
-	// 读取配置
-	config := configLoad.LoadConfig()
-	groupId := config.GetString("kafka.groupId")
-	brokers := config.GetStringSlice("kafka.brokers")
-	topics := config.GetStringMapString("kafka.topics")
-
-	return &KafkaDataSource{
-		groupId:      groupId,
-		brokers:      brokers,
-		topics:       topics,
-		DataChannels: dataSource.GetChannels(),
-	}
-}
-
-// Producer 将 kafka message -> 各数据模型 -> 各数据通道
-func (s *KafkaDataSource) Producer() {
-	// 消费数据
-	kafkaConsumer := kafkaHelper.NewConsumerGroupHandler(s.brokers, s.groupId)
-	for cfgName, topic := range s.topics {
-		// 创建消息处理器
-		handler := NewMessageHandler(cfgName)
-		if handler == nil {
-			log.Printf("Kafka topic【%s】未定义处理者。\n", cfgName)
-			continue
-		}
-		// 订阅主题 和 消息处理
-		kafkaConsumer.Subscribe(topic, handler.HandleMessage)
-	}
-
-	kafkaConsumer.Worker()
-}
-
-// IMessageHandler 是 kafka 消息处理者接口
-type IMessageHandler interface {
-	HandleMessage(message string) bool
-}
-
-// NewMessageHandler 是 MessageHandler 构造函数
-// cfgName: config.yaml 中 kafka.topics 的配置名
-func NewMessageHandler(cfgName string) IMessageHandler {
-	switch cfgName {
-	case "data_raw":
-		log.Printf("Kafka topic【%s】已注册处理者IotaDataHandler\n", cfgName)
-		return IotaDataHandler{}
-	case "data_agg":
-		log.Printf("Kafka topic【%s】已注册处理者AggDataHandler\n", cfgName)
-		return NewAggDataHandler()
-	default:
-		return nil
-	}
-}