/**
* WebSocket Hook - TCP代理连接管理
*
* 提供WebSocket连接的统一管理,用于与TCP代理服务器进行通信
*
* 数据格式规范:
* ```json
* {
* "_from": "dev|setup|...", // 数据来源: dev=设备, setup=上位机
* "cmd": "getBase|setConfig|...", // 具体命令
* "values": {} // 命令参数或数据
* }
* ```
*
* 使用方法:
*
* 1. 在根组件中使用WebSocketProvider包装:
* ```jsx
* import { WebSocketProvider } from './actions/websocket.jsx';
*
* const App = () => {
* return (
*
*
*
* );
* };
* ```
*
* 2. 基本使用:
* ```jsx
* import { useWebSocket } from './actions/websocket.jsx';
*
* const MyComponent = () => {
* const { isConnected, sendMessage, lastMessage } = useWebSocket();
*
* const handleSendCommand = () => {
* sendMessage(JSON.stringify({
* _from: 'setup',
* cmd: 'setConfig',
* values: { x: 100, y: 200 }
* }));
* };
*
* return (
*
*
*
最新消息: {JSON.stringify(lastMessage)}
*
* );
* };
* ```
*
* 3. 订阅特定来源和命令的消息:
* ```jsx
* import { useWebSocketSubscription } from './actions/websocket.jsx';
*
* const DeviceDataComponent = () => {
* 只监听设备发送的基础数据
* const baseData = useWebSocketSubscription('dev', 'getBase');
*
* 监听所有设备消息
* const allDeviceData = useWebSocketSubscription('dev');
*
* 监听特定命令(任何来源)
* const configData = useWebSocketSubscription(null, 'setConfig');
*
* return (
*
*
设备基础数据:
*
{JSON.stringify(baseData, null, 2)}
*
* );
* };
* ```
*
* 4. 自定义消息处理:
* ```jsx
* import { useWebSocketMessage } from './actions/websocket.jsx';
*
* const CustomHandler = () => {
* useWebSocketMessage(({ _from, cmd, values, timestamp }) => {
* if (_from === 'dev' && cmd === 'alarm') {
* 处理设备告警
* handleDeviceAlarm(values);
* }
* });
*
* return 监听中...
;
* };
* ```
*
* API说明:
* - isConnected: boolean - 连接状态
* - connectionStatus: string - 连接状态('connecting'|'connected'|'disconnected'|'error')
* - sendMessage: (message: string) => boolean - 发送消息
* - lastMessage: object - 最后接收的消息
* - messageHistory: array - 消息历史
* - subscribe: (from, cmd, callback) => unsubscribe - 订阅消息
* - useWebSocketSubscription: (from, cmd) => data - 订阅Hook
* - useWebSocketMessage: (callback) => void - 消息监听Hook
*
* 特性:
* - 基于 _from 和 cmd 的发布订阅模式
* - 自动连接和重连(最多5次)
* - 消息类型过滤和路由
* - 多组件共享同一WebSocket连接
* - 完整的连接生命周期管理
*/
import React, {
createContext,
useContext,
useEffect,
useRef,
useState,
useCallback,
} from "react";
// 创建WebSocket Context
const WebSocketContext = createContext();
// WebSocket Provider组件
export const WebSocketProvider = ({ children }) => {
const [isConnected, setIsConnected] = useState(false);
const [isReady, setIsReady] = useState(false); // 新增:连接真正就绪状态
const [connectionStatus, setConnectionStatus] = useState("disconnected");
const [lastMessage, setLastMessage] = useState(null); // 最后接收的消息
const [messageHistory, setMessageHistory] = useState([]); // 消息历史
const socketRef = useRef(null);
const reconnectTimeoutRef = useRef(null);
const reconnectAttemptsRef = useRef(0);
const subscriptionsRef = useRef(new Map()); // 订阅器存储
const messageQueueRef = useRef([]); // 新增:消息队列
const readyCheckTimeoutRef = useRef(null); // 新增:就绪检查定时器
const maxReconnectAttempts = 5;
const reconnectInterval = 3000; // 3秒
const maxHistoryLength = 100; // 最大历史消息数量
const READY_TIMEOUT = 1500; // 连接建立后等待1.5秒确保TCP链路就绪
// 动态获取WebSocket连接地址,支持局域网访问
const getWebSocketUrl = () => {
const protocol = window.location.protocol === "https:" ? "wss:" : "ws:";
const host = window.location.hostname;
const port = Number(window.location.port) + 1 || 8081;
return `${protocol}//${host}:${port}/tcp-proxy`;
};
const websocketUrl = getWebSocketUrl();
// 生成订阅key
const getSubscriptionKey = (from, cmd) => {
if (from && cmd) return `${from}:${cmd}`;
if (from) return `${from}:*`;
if (cmd) return `*:${cmd}`;
return "*:*";
};
// 订阅消息
const subscribe = useCallback((from, cmd, callback) => {
const key = getSubscriptionKey(from, cmd);
if (!subscriptionsRef.current.has(key)) {
subscriptionsRef.current.set(key, new Set());
}
subscriptionsRef.current.get(key).add(callback);
// 返回取消订阅函数
return () => {
const callbacks = subscriptionsRef.current.get(key);
if (callbacks) {
callbacks.delete(callback);
if (callbacks.size === 0) {
subscriptionsRef.current.delete(key);
}
}
};
}, []);
// 通知订阅者
const notifySubscribers = useCallback((messageData) => {
const { _from, cmd } = messageData;
// 可能匹配的订阅key列表
const keysToCheck = [
getSubscriptionKey(_from, cmd), // 精确匹配
getSubscriptionKey(_from, null), // 匹配来源
getSubscriptionKey(null, cmd), // 匹配命令
getSubscriptionKey(null, null), // 匹配所有
];
keysToCheck.forEach((key) => {
const callbacks = subscriptionsRef.current.get(key);
if (callbacks) {
callbacks.forEach((callback) => {
try {
callback(messageData);
} catch (error) {
console.error("订阅回调执行错误:", error);
}
});
}
});
}, []);
// 处理接收到的消息
const handleMessage = useCallback(
(data) => {
const timestamp = Date.now();
let parsedData = null;
try {
parsedData = JSON.parse(data);
// 验证数据格式
if (!parsedData._from || !parsedData.cmd) {
console.warn("收到格式不正确的消息:", parsedData);
return;
}
// 处理代理服务器的就绪信号
if (parsedData._from === 'proxy' && parsedData.cmd === 'ready') {
console.log('🟢 收到TCP连接就绪信号');
// 清除原有的定时器
if (readyCheckTimeoutRef.current) {
clearTimeout(readyCheckTimeoutRef.current);
readyCheckTimeoutRef.current = null;
}
// 立即设置为就绪状态
setIsReady(true);
// 发送队列中的消息
flushMessageQueue();
return;
}
// console.log(`收到消息 [${parsedData._from}:${parsedData.cmd}]:`, parsedData);
} catch (error) {
console.error("解析WebSocket消息失败:", error, data);
return;
}
const messageData = {
id: `msg_${timestamp}`,
timestamp,
rawData: data,
...parsedData,
};
// 更新最后消息和历史
setLastMessage(messageData);
setMessageHistory((prev) => {
const newHistory = [messageData, ...prev];
return newHistory.slice(0, maxHistoryLength);
});
// 通知订阅者
notifySubscribers(messageData);
},
[notifySubscribers]
);
// 处理队列中的消息
const flushMessageQueue = useCallback(() => {
if (messageQueueRef.current.length === 0) return;
console.log(`📤 开始发送队列中的 ${messageQueueRef.current.length} 条消息`);
const queue = [...messageQueueRef.current];
messageQueueRef.current = [];
queue.forEach((message, index) => {
setTimeout(() => {
if (socketRef.current?.readyState === WebSocket.OPEN) {
try {
socketRef.current.send(message);
console.log(`✅ 队列消息 ${index + 1}/${queue.length} 已发送`);
} catch (error) {
console.error(`❌ 发送队列消息 ${index + 1} 失败:`, error);
// 发送失败的消息重新加入队列
messageQueueRef.current.push(message);
}
}
}, index * 100); // 每条消息间隔100ms发送,避免拥塞
});
}, []);
// 发送消息
const sendMessage = useCallback((message) => {
// 如果连接就绪,直接发送
if (socketRef.current?.readyState === WebSocket.OPEN && isReady) {
try {
socketRef.current.send(message);
return true;
} catch (error) {
console.error("发送WebSocket消息失败:", error);
return false;
}
}
// 如果已连接但未就绪,加入队列
else if (socketRef.current?.readyState === WebSocket.OPEN && !isReady) {
const cmdMatch = message.match(/"cmd"\s*:\s*"([^"]+)"/);
const cmd = cmdMatch ? cmdMatch[1] : 'unknown';
console.log(`⏳ 连接尚未完全就绪,消息 [${cmd}] 已加入队列`);
messageQueueRef.current.push(message);
return true;
}
// 如果未连接,加入队列
else {
const cmdMatch = message.match(/"cmd"\s*:\s*"([^"]+)"/);
const cmd = cmdMatch ? cmdMatch[1] : 'unknown';
console.warn(`⚠️ WebSocket未连接,消息 [${cmd}] 已加入队列,等待连接建立`);
messageQueueRef.current.push(message);
return true;
}
}, [isReady]);
// 连接WebSocket
const connect = useCallback(() => {
if (socketRef.current?.readyState === WebSocket.OPEN) {
return; // 已经连接
}
setConnectionStatus("connecting");
setIsReady(false); // 重置就绪状态
console.log("尝试连接WebSocket:", websocketUrl);
try {
socketRef.current = new WebSocket(websocketUrl);
socketRef.current.onopen = () => {
console.log("✅ WebSocket连接已建立");
setIsConnected(true);
setConnectionStatus("connected");
reconnectAttemptsRef.current = 0;
// 等待一段时间确保底层TCP连接完全建立
// 如果在此期间收到代理的 ready 信号,则会提前取消这个定时器
readyCheckTimeoutRef.current = setTimeout(() => {
setIsReady(true);
flushMessageQueue();
}, READY_TIMEOUT);
};
socketRef.current.onmessage = (event) => {
try {
const data =
typeof event.data === "string" ? event.data : event.data;
// 这里可以处理接收到的消息,比如更新全局状态
// 或者触发自定义事件
handleMessage(data);
} catch (error) {
console.error("处理WebSocket消息错误:", error);
}
};
socketRef.current.onclose = (event) => {
console.log("❌ WebSocket连接已关闭:", event.code, event.reason);
setIsConnected(false);
setIsReady(false); // 重置就绪状态
setConnectionStatus("disconnected");
// 清除就绪检查定时器
if (readyCheckTimeoutRef.current) {
clearTimeout(readyCheckTimeoutRef.current);
readyCheckTimeoutRef.current = null;
}
// 自动重连逻辑
if (reconnectAttemptsRef.current < maxReconnectAttempts) {
reconnectAttemptsRef.current++;
console.log(
`🔄 尝试重连 ${reconnectAttemptsRef.current}/${maxReconnectAttempts}`
);
reconnectTimeoutRef.current = setTimeout(() => {
connect();
}, reconnectInterval);
} else {
console.log("🛑 达到最大重连次数,停止重连");
setConnectionStatus("error");
}
};
socketRef.current.onerror = (error) => {
console.error("WebSocket错误:", error);
setConnectionStatus("error");
};
} catch (error) {
console.error("创建WebSocket连接失败:", error);
setConnectionStatus("error");
setIsReady(false);
}
}, [handleMessage, flushMessageQueue]);
// 断开连接
const disconnect = useCallback(() => {
if (reconnectTimeoutRef.current) {
clearTimeout(reconnectTimeoutRef.current);
}
if (readyCheckTimeoutRef.current) {
clearTimeout(readyCheckTimeoutRef.current);
readyCheckTimeoutRef.current = null;
}
if (socketRef.current) {
socketRef.current.close(1000, "用户主动断开");
socketRef.current = null;
}
setIsConnected(false);
setIsReady(false);
setConnectionStatus("disconnected");
reconnectAttemptsRef.current = 0;
}, []);
// 获取特定来源和命令的消息历史
const getMessages = useCallback(
(from, cmd) => {
return messageHistory.filter((msg) => {
if (from && cmd) return msg._from === from && msg.cmd === cmd;
if (from) return msg._from === from;
if (cmd) return msg.cmd === cmd;
return true;
});
},
[messageHistory]
);
// 清空消息历史
const clearMessageHistory = useCallback(() => {
setMessageHistory([]);
setLastMessage(null);
}, []);
// 组件挂载时自动连接
useEffect(() => {
connect();
// 组件卸载时清理
return () => {
if (reconnectTimeoutRef.current) {
clearTimeout(reconnectTimeoutRef.current);
}
if (readyCheckTimeoutRef.current) {
clearTimeout(readyCheckTimeoutRef.current);
}
if (socketRef.current) {
socketRef.current.close();
}
};
}, [connect]);
// 提供给子组件的值
const value = {
// 连接状态
isConnected,
isReady, // 新增:暴露就绪状态
connectionStatus,
// 连接控制
connect,
disconnect,
sendMessage,
// 数据访问
lastMessage,
messageHistory,
getMessages,
clearMessageHistory,
// 订阅功能
subscribe,
// 原始socket
socket: socketRef.current,
};
return (
{children}
);
};
// 自定义Hook,方便子组件使用
export const useWebSocket = () => {
const context = useContext(WebSocketContext);
if (!context) {
throw new Error("useWebSocket必须在WebSocketProvider内部使用");
}
return context;
};
// 订阅特定来源和命令的消息Hook
export const useWebSocketSubscription = (from = null, cmd = null) => {
const { subscribe, getMessages } = useWebSocket();
const [data, setData] = useState([]);
const [latestMessage, setLatestMessage] = useState(null);
useEffect(() => {
// 获取历史消息
const history = getMessages(from, cmd);
setData(history);
setLatestMessage(history[0] || null);
// 订阅新消息
const unsubscribe = subscribe(from, cmd, (messageData) => {
setLatestMessage(messageData);
setData((prev) => [messageData, ...prev.slice(0, 99)]); // 保持最多100条
});
return unsubscribe;
}, [subscribe, getMessages, from, cmd]);
return {
data, // 所有匹配的消息
latest: latestMessage, // 最新的消息
values: latestMessage?.values || null, // 最新消息的values字段
};
};
// 通用消息监听Hook
export const useWebSocketMessage = (callback) => {
const { subscribe } = useWebSocket();
useEffect(() => {
if (!callback) return;
// 监听所有消息
const unsubscribe = subscribe(null, null, callback);
return unsubscribe;
}, [subscribe, callback]);
};
// 默认导出
export default WebSocketContext;