diff --git a/src-tauri/src/core/tray/mod.rs b/src-tauri/src/core/tray/mod.rs index f2639af0..f7e12fe9 100644 --- a/src-tauri/src/core/tray/mod.rs +++ b/src-tauri/src/core/tray/mod.rs @@ -10,7 +10,6 @@ use crate::{ lightweight::{entry_lightweight_mode, is_in_lightweight_mode}, mihomo::Rate, }, - process::AsyncHandler, resolve, utils::{dirs::find_target_icons, i18n::t, logging::Type, resolve::VERSION}, }; @@ -390,56 +389,136 @@ impl Tray { // 如果已经订阅,先取消订阅 if *self.is_subscribed.read() { self.unsubscribe_traffic(); + tokio::time::sleep(std::time::Duration::from_millis(100)).await; } - let (shutdown_tx, shutdown_rx) = broadcast::channel(1); + let (shutdown_tx, shutdown_rx) = broadcast::channel(3); *self.shutdown_tx.write() = Some(shutdown_tx); *self.is_subscribed.write() = true; let speed_rate = Arc::clone(&self.speed_rate); let is_subscribed = Arc::clone(&self.is_subscribed); - AsyncHandler::spawn(move || { - let mut shutdown = shutdown_rx; - let speed_rate = speed_rate.clone(); // 确保 Arc 被正确克隆 - let is_subscribed = is_subscribed.clone(); + // 使用单线程防止阻塞主线程 + std::thread::Builder::new() + .name("traffic-monitor".into()) + .spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("Failed to build tokio runtime for traffic monitor"); + // 在单独的运行时中执行异步任务 + rt.block_on(async move { + let mut shutdown = shutdown_rx; + let speed_rate = speed_rate.clone(); + let is_subscribed = is_subscribed.clone(); + let mut consecutive_errors = 0; + let max_consecutive_errors = 5; - Box::pin(async move { - 'outer: loop { - match Traffic::get_traffic_stream().await { - Ok(mut stream) => loop { - tokio::select! { - Some(traffic) = stream.next() => { - if let Ok(traffic) = traffic { - let guard = speed_rate.lock(); - let enable_tray_speed: bool = Config::verge() - .latest() - .enable_tray_speed - .unwrap_or(true); - if !enable_tray_speed { - continue; - } - if let Some(sr) = guard.as_ref() { - if let Some(rate) = sr.update_and_check_changed(traffic.up, traffic.down) { - let _ = Tray::global().update_icon(Some(rate)); + let mut interval = tokio::time::interval(std::time::Duration::from_secs(10)); + + 'outer: loop { + if !*is_subscribed.read() { + log::info!(target: "app", "Traffic subscription has been cancelled"); + break; + } + + match tokio::time::timeout( + std::time::Duration::from_secs(5), + Traffic::get_traffic_stream() + ).await { + Ok(stream_result) => { + match stream_result { + Ok(mut stream) => { + consecutive_errors = 0; + + loop { + tokio::select! { + traffic_result = stream.next() => { + match traffic_result { + Some(Ok(traffic)) => { + if let Ok(speedrate_result) = tokio::time::timeout( + std::time::Duration::from_millis(50), + async { + let guard = speed_rate.try_lock(); + if let Some(guard) = guard { + if let Some(sr) = guard.as_ref() { + sr.update_and_check_changed(traffic.up, traffic.down) + } else { + None + } + } else { + None + } + } + ).await { + if let Some(rate) = speedrate_result { + let _ = tokio::time::timeout( + std::time::Duration::from_millis(100), + async { let _ = Tray::global().update_icon(Some(rate)); } + ).await; + } + } + }, + Some(Err(e)) => { + log::error!(target: "app", "Traffic stream error: {}", e); + consecutive_errors += 1; + if consecutive_errors >= max_consecutive_errors { + log::error!(target: "app", "Too many errors, reconnecting traffic stream"); + break; + } + }, + None => { + log::info!(target: "app", "Traffic stream ended, reconnecting"); + break; + } + } + }, + _ = shutdown.recv() => { + log::info!(target: "app", "Received shutdown signal for traffic stream"); + break 'outer; + }, + _ = interval.tick() => { + if !*is_subscribed.read() { + log::info!(target: "app", "Traffic monitor detected subscription cancelled"); + break 'outer; + } + log::debug!(target: "app", "Traffic subscription periodic health check"); + }, + _ = tokio::time::sleep(std::time::Duration::from_secs(60)) => { + log::info!(target: "app", "Traffic stream max active time reached, reconnecting"); + break; + } } } + }, + Err(e) => { + log::error!(target: "app", "Failed to get traffic stream: {}", e); + consecutive_errors += 1; + if consecutive_errors >= max_consecutive_errors { + log::error!(target: "app", "Too many consecutive errors, pausing traffic monitoring"); + tokio::time::sleep(std::time::Duration::from_secs(30)).await; + consecutive_errors = 0; + } else { + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + } } } - _ = shutdown.recv() => break 'outer, - } - }, - Err(e) => { - log::error!(target: "app", "Failed to get traffic stream: {}", e); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - if !*is_subscribed.read() { - break; + }, + Err(_) => { + log::error!(target: "app", "Traffic stream initialization timed out"); + tokio::time::sleep(std::time::Duration::from_secs(2)).await; } } + + if !*is_subscribed.read() { + break; + } } - } + log::info!(target: "app", "Traffic subscription thread terminated"); + }); }) - }); + .expect("Failed to spawn traffic monitor thread"); Ok(()) } diff --git a/src-tauri/src/core/tray/speed_rate.rs b/src-tauri/src/core/tray/speed_rate.rs index 5ecb0182..c36fc0ac 100644 --- a/src-tauri/src/core/tray/speed_rate.rs +++ b/src-tauri/src/core/tray/speed_rate.rs @@ -9,7 +9,7 @@ use image::{GenericImageView, Rgba, RgbaImage}; use imageproc::drawing::draw_text_mut; use parking_lot::Mutex; use std::{io::Cursor, sync::Arc}; -use tokio_tungstenite::tungstenite::{http, Message}; +use tokio_tungstenite::tungstenite::http; use tungstenite::client::IntoClientRequest; #[derive(Debug, Clone)] pub struct SpeedRate { @@ -240,41 +240,90 @@ pub struct Traffic { impl Traffic { pub async fn get_traffic_stream() -> Result>> { + use futures::future::FutureExt; use futures::stream::{self, StreamExt}; use std::time::Duration; + // 先处理错误和超时情况 let stream = Box::pin( - stream::unfold((), |_| async { - loop { + stream::unfold((), move |_| async move { + 'retry: loop { + log::info!(target: "app", "establishing traffic websocket connection"); let (url, token) = MihomoManager::get_traffic_ws_url(); - let mut request = url.into_client_request().unwrap(); - request - .headers_mut() - .insert(http::header::AUTHORIZATION, token); - - match tokio_tungstenite::connect_async(request).await { - Ok((ws_stream, _)) => { - log::info!(target: "app", "traffic ws connection established"); - return Some(( - ws_stream.map(|msg| { - msg.map_err(anyhow::Error::from).and_then(|msg: Message| { - let data = msg.into_text()?; - let json: serde_json::Value = serde_json::from_str(&data)?; - Ok(Traffic { - up: json["up"].as_u64().unwrap_or(0), - down: json["down"].as_u64().unwrap_or(0), - }) - }) - }), - (), - )); - } + let mut request = match url.into_client_request() { + Ok(req) => req, Err(e) => { - log::error!(target: "app", "traffic ws connection failed: {e}"); - tokio::time::sleep(Duration::from_secs(5)).await; - continue; + log::error!(target: "app", "failed to create websocket request: {}", e); + tokio::time::sleep(Duration::from_secs(2)).await; + continue 'retry; + } + }; + + request.headers_mut().insert(http::header::AUTHORIZATION, token); + + match tokio::time::timeout(Duration::from_secs(3), + tokio_tungstenite::connect_async(request) + ).await { + Ok(Ok((ws_stream, _))) => { + log::info!(target: "app", "traffic websocket connection established"); + // 设置流超时控制 + let traffic_stream = ws_stream + .take_while(|msg| { + let continue_stream = matches!(msg, Ok(_)); + async move { continue_stream }.boxed() + }) + .filter_map(|msg| async move { + match msg { + Ok(msg) => { + if !msg.is_text() { + return None; + } + + match tokio::time::timeout( + Duration::from_millis(200), + async { msg.into_text() } + ).await { + Ok(Ok(text)) => { + match serde_json::from_str::(&text) { + Ok(json) => { + let up = json["up"].as_u64().unwrap_or(0); + let down = json["down"].as_u64().unwrap_or(0); + Some(Ok(Traffic { up, down })) + }, + Err(e) => { + log::warn!(target: "app", "traffic json parse error: {} for {}", e, text); + None + } + } + }, + Ok(Err(e)) => { + log::warn!(target: "app", "traffic text conversion error: {}", e); + None + }, + Err(_) => { + log::warn!(target: "app", "traffic text processing timeout"); + None + } + } + }, + Err(e) => { + log::error!(target: "app", "traffic websocket error: {}", e); + None + } + } + }); + + return Some((traffic_stream, ())); + }, + Ok(Err(e)) => { + log::error!(target: "app", "traffic websocket connection failed: {}", e); + }, + Err(_) => { + log::error!(target: "app", "traffic websocket connection timed out"); } } + + tokio::time::sleep(Duration::from_secs(2)).await; } }) .flatten(),