refactor: improve webSocket connection handling and error recovery

This commit is contained in:
wonfen 2025-03-09 04:22:01 +08:00
parent c1a9de4d66
commit f18202a3a4
5 changed files with 391 additions and 101 deletions

View File

@ -11,7 +11,7 @@ import { TrafficGraph, type TrafficRef } from "./traffic-graph";
import { useVisibility } from "@/hooks/use-visibility";
import parseTraffic from "@/utils/parse-traffic";
import useSWRSubscription from "swr/subscription";
import { createSockette } from "@/utils/websocket";
import { createSockette, createAuthSockette } from "@/utils/websocket";
import { useTranslation } from "react-i18next";
import { isDebugEnabled, gc } from "@/services/api";
@ -47,23 +47,41 @@ export const LayoutTraffic = () => {
(_key, { next }) => {
const { server = "", secret = "" } = clashInfo!;
const s = createSockette(
`ws://${server}${secret ? `/traffic?token=${encodeURIComponent(secret)}` : "/traffic"}`,
{
if (!server) {
console.warn("[Traffic] 服务器地址为空,无法建立连接");
next(null, { up: 0, down: 0 });
return () => {};
}
console.log(`[Traffic] 正在连接: ${server}/traffic`);
const s = createAuthSockette(`${server}/traffic`, secret, {
timeout: 8000, // 8秒超时
onmessage(event) {
const data = JSON.parse(event.data) as ITrafficItem;
trafficRef.current?.appendData(data);
next(null, data);
},
onerror(event) {
console.error("[Traffic] WebSocket 连接错误", event);
this.close();
next(event, { up: 0, down: 0 });
next(null, { up: 0, down: 0 });
},
onclose(event) {
console.log("[Traffic] WebSocket 连接关闭", event);
},
);
onopen(event) {
console.log("[Traffic] WebSocket 连接已建立");
},
});
return () => {
console.log("[Traffic] 清理WebSocket连接");
try {
s.close();
} catch (e) {
console.error("[Traffic] 关闭连接时出错", e);
}
};
},
{
@ -85,22 +103,40 @@ export const LayoutTraffic = () => {
(_key, { next }) => {
const { server = "", secret = "" } = clashInfo!;
const s = createSockette(
`ws://${server}${secret ? `/memory?token=${encodeURIComponent(secret)}` : "/memory"}`,
{
if (!server) {
console.warn("[Memory] 服务器地址为空,无法建立连接");
next(null, { inuse: 0 });
return () => {};
}
console.log(`[Memory] 正在连接: ${server}/memory`);
const s = createAuthSockette(`${server}/memory`, secret, {
timeout: 8000, // 8秒超时
onmessage(event) {
const data = JSON.parse(event.data) as MemoryUsage;
next(null, data);
},
onerror(event) {
console.error("[Memory] WebSocket 连接错误", event);
this.close();
next(event, { inuse: 0 });
next(null, { inuse: 0 });
},
onclose(event) {
console.log("[Memory] WebSocket 连接关闭", event);
},
);
onopen(event) {
console.log("[Memory] WebSocket 连接已建立");
},
});
return () => {
console.log("[Memory] 清理WebSocket连接");
try {
s.close();
} catch (e) {
console.error("[Memory] 关闭连接时出错", e);
}
};
},
{

View File

@ -1,6 +1,6 @@
import { useEffect } from "react";
import { useEnableLog } from "../services/states";
import { createSockette } from "../utils/websocket";
import { createSockette, createAuthSockette } from "../utils/websocket";
import { useClashInfo } from "./use-clash";
import dayjs from "dayjs";
import { create } from "zustand";
@ -18,20 +18,16 @@ export type { ILogItem };
const MAX_LOG_NUM = 1000;
const buildWSUrl = (server: string, secret: string, logLevel: LogLevel) => {
const baseUrl = `ws://${server}/logs`;
const params = new URLSearchParams();
const buildWSUrl = (server: string, logLevel: LogLevel) => {
let baseUrl = `${server}/logs`;
if (secret) {
params.append("token", secret);
// 只处理日志级别参数
if (logLevel && logLevel !== "info") {
const level = logLevel === "all" ? "debug" : logLevel;
baseUrl += `?level=${level}`;
}
if (logLevel === "all") {
params.append("level", "debug");
} else {
params.append("level", logLevel);
}
const queryString = params.toString();
return queryString ? `${baseUrl}?${queryString}` : baseUrl;
return baseUrl;
};
interface LogStore {

View File

@ -26,7 +26,7 @@ import {
} from "@/components/base/base-search-box";
import { BaseStyledSelect } from "@/components/base/base-styled-select";
import useSWRSubscription from "swr/subscription";
import { createSockette } from "@/utils/websocket";
import { createSockette, createAuthSockette } from "@/utils/websocket";
import { useTheme } from "@mui/material/styles";
import { useVisibility } from "@/hooks/use-visibility";
@ -74,9 +74,18 @@ const ConnectionsPage = () => {
clashInfo && pageVisible ? "getClashConnections" : null,
(_key, { next }) => {
const { server = "", secret = "" } = clashInfo!;
const s = createSockette(
`ws://${server}/connections?token=${encodeURIComponent(secret)}`,
{
if (!server) {
console.warn("[Connections] 服务器地址为空,无法建立连接");
next(null, initConn);
return () => {};
}
console.log(`[Connections] 正在连接: ${server}/connections`);
// 设置较长的超时时间,确保连接可以建立
const s = createAuthSockette(`${server}/connections`, secret, {
timeout: 8000, // 8秒超时
onmessage(event) {
const data = JSON.parse(event.data) as IConnections;
next(null, (old = initConn) => {
@ -111,14 +120,25 @@ const ConnectionsPage = () => {
});
},
onerror(event) {
next(event);
console.error("[Connections] WebSocket 连接错误", event);
// 报告错误但提供空数据避免UI崩溃
next(null, initConn);
},
onclose(event) {
console.log("[Connections] WebSocket 连接关闭", event);
},
3,
);
onopen(event) {
console.log("[Connections] WebSocket 连接已建立");
},
});
return () => {
console.log("[Connections] 清理WebSocket连接");
try {
s.close();
} catch (e) {
console.error("[Connections] 关闭连接时出错", e);
}
};
},
);

View File

@ -1,6 +1,6 @@
// 全局日志服务,使应用在任何页面都能收集日志
import { create } from "zustand";
import { createSockette } from "../utils/websocket";
import { createSockette, createAuthSockette } from "@/utils/websocket";
import dayjs from "dayjs";
import { useState, useEffect } from "react";
@ -47,20 +47,16 @@ export const useGlobalLogStore = create<GlobalLogStore>((set) => ({
}));
// 构建WebSocket URL
const buildWSUrl = (server: string, secret: string, logLevel: LogLevel) => {
const baseUrl = `ws://${server}/logs`;
const params = new URLSearchParams();
const buildWSUrl = (server: string, logLevel: LogLevel) => {
let baseUrl = `${server}/logs`;
if (secret) {
params.append("token", secret);
// 只处理日志级别参数
if (logLevel && logLevel !== "info") {
const level = logLevel === "all" ? "debug" : logLevel;
baseUrl += `?level=${level}`;
}
if (logLevel === "all") {
params.append("level", "debug");
} else {
params.append("level", logLevel);
}
const queryString = params.toString();
return queryString ? `${baseUrl}?${queryString}` : baseUrl;
return baseUrl;
};
// 初始化全局日志服务
@ -86,28 +82,49 @@ export const initGlobalLogService = (
// 关闭现有连接
closeGlobalLogConnection();
// 创建新的WebSocket连接
const wsUrl = buildWSUrl(server, secret, logLevel);
globalLogSocket = createSockette(wsUrl, {
// 创建新的WebSocket连接使用新的认证方法
const wsUrl = buildWSUrl(server, logLevel);
console.log(`[GlobalLog] 正在连接日志服务: ${wsUrl}`);
if (!server) {
console.warn("[GlobalLog] 服务器地址为空,无法建立连接");
return;
}
globalLogSocket = createAuthSockette(wsUrl, secret, {
timeout: 8000, // 8秒超时
onmessage(event) {
try {
const data = JSON.parse(event.data) as ILogItem;
const time = dayjs().format("MM-DD HH:mm:ss");
appendLog({ ...data, time });
} catch (error) {
console.error("Failed to parse log data:", error);
console.error("[GlobalLog] 解析日志数据失败:", error);
}
},
onerror() {
console.error("Log WebSocket connection error");
onerror(event) {
console.error("[GlobalLog] WebSocket连接错误", event);
// 记录错误状态但不关闭连接,让重连机制起作用
useGlobalLogStore.setState({ isConnected: false });
// 只有在重试彻底失败后才关闭连接
if (
event &&
typeof event === "object" &&
"type" in event &&
event.type === "error"
) {
console.error("[GlobalLog] 连接已彻底失败,关闭连接");
closeGlobalLogConnection();
}
},
onclose() {
console.log("Log WebSocket connection closed");
onclose(event) {
console.log("[GlobalLog] WebSocket连接关闭", event);
useGlobalLogStore.setState({ isConnected: false });
},
onopen() {
console.log("Log WebSocket connection opened");
onopen(event) {
console.log("[GlobalLog] WebSocket连接已建立", event);
useGlobalLogStore.setState({ isConnected: true });
},
});

View File

@ -39,3 +39,224 @@ export const createSockette = (
},
});
};
/**
* WebSocket连接
* 使URL参数方式添加token
*
* mihomo服务器对WebSocket的认证支持不佳使URL参数方式传递token
*/
export const createAuthSockette = (
baseUrl: string,
secret: string,
opt: SocketteOptions,
maxError = 10,
) => {
// 确保baseUrl格式正确
let url = baseUrl;
if (!url.startsWith("ws://") && !url.startsWith("wss://")) {
url = `ws://${url}`;
}
// 重试控制
let reconnectAttempts = 0;
const MAX_RECONNECT = maxError;
let reconnectTimeout: any = null;
let ws: WebSocket | null = null;
// 使用URL API解析和构建URL
try {
const urlObj = new URL(url);
// 添加token参数如果有secret
if (secret) {
urlObj.searchParams.delete("token");
urlObj.searchParams.append("token", secret);
}
url = urlObj.toString();
console.log(`[WebSocket] 创建连接: ${url.replace(secret || "", "***")}`);
} catch (e) {
console.error(`[WebSocket] URL格式错误: ${url}`, e);
if (opt.onerror) {
// 使用任意类型避免类型错误
const anyOpt = opt as any;
anyOpt.onerror(
new ErrorEvent("error", { message: `URL格式错误: ${e}` } as any),
);
}
return createDummySocket();
}
function connect() {
try {
ws = new WebSocket(url);
ws.onopen = function (event) {
console.log(
`[WebSocket] 连接成功: ${url.replace(secret || "", "***")}`,
);
reconnectAttempts = 0; // 重置重连计数
if (opt.onopen) {
// 使用任意类型避免类型错误
const anyOpt = opt as any;
anyOpt.onopen(event);
}
};
ws.onmessage = function (event) {
if (opt.onmessage) {
// 使用任意类型避免类型错误
const anyOpt = opt as any;
anyOpt.onmessage(event);
}
};
ws.onerror = function (event) {
console.error(
`[WebSocket] 连接错误: ${url.replace(secret || "", "***")}`,
);
// 错误处理
if (reconnectAttempts < MAX_RECONNECT) {
scheduleReconnect();
} else if (opt.onerror) {
// 使用任意类型避免类型错误
const anyOpt = opt as any;
anyOpt.onerror(event);
}
};
ws.onclose = function (event) {
console.log(
`[WebSocket] 连接关闭: ${url.replace(secret || "", "***")}, 代码: ${event.code}`,
);
// 如果不是正常关闭(1000, 1001),尝试重连
if (
event.code !== 1000 &&
event.code !== 1001 &&
reconnectAttempts < MAX_RECONNECT
) {
scheduleReconnect();
} else {
if (opt.onclose) {
// 使用任意类型避免类型错误
const anyOpt = opt as any;
anyOpt.onclose(event);
}
// 如果已达到最大重试次数
if (reconnectAttempts >= MAX_RECONNECT && opt.onmaximum) {
console.error(
`[WebSocket] 达到最大重试次数: ${url.replace(secret || "", "***")}`,
);
const anyOpt = opt as any;
anyOpt.onmaximum(event);
}
}
};
} catch (error) {
console.error(`[WebSocket] 创建连接失败:`, error);
if (opt.onerror) {
// 使用任意类型避免类型错误
const anyOpt = opt as any;
anyOpt.onerror(
new ErrorEvent("error", { message: `创建连接失败: ${error}` } as any),
);
}
}
}
function scheduleReconnect() {
if (reconnectTimeout) {
clearTimeout(reconnectTimeout);
}
reconnectAttempts++;
const delay = Math.min(1000 * Math.pow(1.5, reconnectAttempts), 10000); // 指数退避最大10秒
console.log(
`[WebSocket] 计划重连 (${reconnectAttempts}/${MAX_RECONNECT}) 延迟: ${delay}ms`,
);
reconnectTimeout = setTimeout(() => {
console.log(
`[WebSocket] 尝试重连 (${reconnectAttempts}/${MAX_RECONNECT})`,
);
cleanup();
connect();
}, delay);
}
function cleanup() {
if (ws) {
// 移除所有事件监听器
ws.onopen = null;
ws.onmessage = null;
ws.onerror = null;
ws.onclose = null;
// 如果连接仍然打开,关闭它
if (
ws.readyState === WebSocket.OPEN ||
ws.readyState === WebSocket.CONNECTING
) {
try {
ws.close();
} catch (e) {
console.error("[WebSocket] 关闭连接时出错:", e);
}
}
ws = null;
}
// 清除重连计时器
if (reconnectTimeout) {
clearTimeout(reconnectTimeout);
reconnectTimeout = null;
}
}
// 创建一个类似Sockette的接口对象
const socketLike = {
ws,
close: () => {
console.log(
`[WebSocket] 手动关闭连接: ${url.replace(secret || "", "***")}`,
);
cleanup();
},
reconnect: () => {
cleanup();
connect();
},
json: (data: any) => {
if (ws && ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(data));
}
},
send: (data: string) => {
if (ws && ws.readyState === WebSocket.OPEN) {
ws.send(data);
}
},
open: connect,
};
// 立即连接
connect();
return socketLike;
};
// 创建一个空的WebSocket对象
function createDummySocket() {
return {
close: () => {},
reconnect: () => {},
json: () => {},
send: () => {},
open: () => {},
};
}