mirror of
https://github.com/clash-verge-rev/clash-verge-rev
synced 2025-05-05 03:13:44 +08:00
feat: enhance traffic monitoring, optimize data handling logic, and add error retry mechanism and timeout control
This commit is contained in:
parent
73b9a71c84
commit
c72413cbe6
@ -10,7 +10,6 @@ use crate::{
|
||||
lightweight::{entry_lightweight_mode, is_in_lightweight_mode},
|
||||
mihomo::Rate,
|
||||
},
|
||||
process::AsyncHandler,
|
||||
resolve,
|
||||
utils::{dirs::find_target_icons, i18n::t, logging::Type, resolve::VERSION},
|
||||
};
|
||||
@ -390,56 +389,136 @@ impl Tray {
|
||||
// 如果已经订阅,先取消订阅
|
||||
if *self.is_subscribed.read() {
|
||||
self.unsubscribe_traffic();
|
||||
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||
}
|
||||
|
||||
let (shutdown_tx, shutdown_rx) = broadcast::channel(1);
|
||||
let (shutdown_tx, shutdown_rx) = broadcast::channel(3);
|
||||
*self.shutdown_tx.write() = Some(shutdown_tx);
|
||||
*self.is_subscribed.write() = true;
|
||||
|
||||
let speed_rate = Arc::clone(&self.speed_rate);
|
||||
let is_subscribed = Arc::clone(&self.is_subscribed);
|
||||
|
||||
AsyncHandler::spawn(move || {
|
||||
let mut shutdown = shutdown_rx;
|
||||
let speed_rate = speed_rate.clone(); // 确保 Arc 被正确克隆
|
||||
let is_subscribed = is_subscribed.clone();
|
||||
// 使用单线程防止阻塞主线程
|
||||
std::thread::Builder::new()
|
||||
.name("traffic-monitor".into())
|
||||
.spawn(move || {
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("Failed to build tokio runtime for traffic monitor");
|
||||
// 在单独的运行时中执行异步任务
|
||||
rt.block_on(async move {
|
||||
let mut shutdown = shutdown_rx;
|
||||
let speed_rate = speed_rate.clone();
|
||||
let is_subscribed = is_subscribed.clone();
|
||||
let mut consecutive_errors = 0;
|
||||
let max_consecutive_errors = 5;
|
||||
|
||||
Box::pin(async move {
|
||||
'outer: loop {
|
||||
match Traffic::get_traffic_stream().await {
|
||||
Ok(mut stream) => loop {
|
||||
tokio::select! {
|
||||
Some(traffic) = stream.next() => {
|
||||
if let Ok(traffic) = traffic {
|
||||
let guard = speed_rate.lock();
|
||||
let enable_tray_speed: bool = Config::verge()
|
||||
.latest()
|
||||
.enable_tray_speed
|
||||
.unwrap_or(true);
|
||||
if !enable_tray_speed {
|
||||
continue;
|
||||
}
|
||||
if let Some(sr) = guard.as_ref() {
|
||||
if let Some(rate) = sr.update_and_check_changed(traffic.up, traffic.down) {
|
||||
let _ = Tray::global().update_icon(Some(rate));
|
||||
let mut interval = tokio::time::interval(std::time::Duration::from_secs(10));
|
||||
|
||||
'outer: loop {
|
||||
if !*is_subscribed.read() {
|
||||
log::info!(target: "app", "Traffic subscription has been cancelled");
|
||||
break;
|
||||
}
|
||||
|
||||
match tokio::time::timeout(
|
||||
std::time::Duration::from_secs(5),
|
||||
Traffic::get_traffic_stream()
|
||||
).await {
|
||||
Ok(stream_result) => {
|
||||
match stream_result {
|
||||
Ok(mut stream) => {
|
||||
consecutive_errors = 0;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
traffic_result = stream.next() => {
|
||||
match traffic_result {
|
||||
Some(Ok(traffic)) => {
|
||||
if let Ok(speedrate_result) = tokio::time::timeout(
|
||||
std::time::Duration::from_millis(50),
|
||||
async {
|
||||
let guard = speed_rate.try_lock();
|
||||
if let Some(guard) = guard {
|
||||
if let Some(sr) = guard.as_ref() {
|
||||
sr.update_and_check_changed(traffic.up, traffic.down)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
).await {
|
||||
if let Some(rate) = speedrate_result {
|
||||
let _ = tokio::time::timeout(
|
||||
std::time::Duration::from_millis(100),
|
||||
async { let _ = Tray::global().update_icon(Some(rate)); }
|
||||
).await;
|
||||
}
|
||||
}
|
||||
},
|
||||
Some(Err(e)) => {
|
||||
log::error!(target: "app", "Traffic stream error: {}", e);
|
||||
consecutive_errors += 1;
|
||||
if consecutive_errors >= max_consecutive_errors {
|
||||
log::error!(target: "app", "Too many errors, reconnecting traffic stream");
|
||||
break;
|
||||
}
|
||||
},
|
||||
None => {
|
||||
log::info!(target: "app", "Traffic stream ended, reconnecting");
|
||||
break;
|
||||
}
|
||||
}
|
||||
},
|
||||
_ = shutdown.recv() => {
|
||||
log::info!(target: "app", "Received shutdown signal for traffic stream");
|
||||
break 'outer;
|
||||
},
|
||||
_ = interval.tick() => {
|
||||
if !*is_subscribed.read() {
|
||||
log::info!(target: "app", "Traffic monitor detected subscription cancelled");
|
||||
break 'outer;
|
||||
}
|
||||
log::debug!(target: "app", "Traffic subscription periodic health check");
|
||||
},
|
||||
_ = tokio::time::sleep(std::time::Duration::from_secs(60)) => {
|
||||
log::info!(target: "app", "Traffic stream max active time reached, reconnecting");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
log::error!(target: "app", "Failed to get traffic stream: {}", e);
|
||||
consecutive_errors += 1;
|
||||
if consecutive_errors >= max_consecutive_errors {
|
||||
log::error!(target: "app", "Too many consecutive errors, pausing traffic monitoring");
|
||||
tokio::time::sleep(std::time::Duration::from_secs(30)).await;
|
||||
consecutive_errors = 0;
|
||||
} else {
|
||||
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
_ = shutdown.recv() => break 'outer,
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
log::error!(target: "app", "Failed to get traffic stream: {}", e);
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
if !*is_subscribed.read() {
|
||||
break;
|
||||
},
|
||||
Err(_) => {
|
||||
log::error!(target: "app", "Traffic stream initialization timed out");
|
||||
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
|
||||
}
|
||||
}
|
||||
|
||||
if !*is_subscribed.read() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
log::info!(target: "app", "Traffic subscription thread terminated");
|
||||
});
|
||||
})
|
||||
});
|
||||
.expect("Failed to spawn traffic monitor thread");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -9,7 +9,7 @@ use image::{GenericImageView, Rgba, RgbaImage};
|
||||
use imageproc::drawing::draw_text_mut;
|
||||
use parking_lot::Mutex;
|
||||
use std::{io::Cursor, sync::Arc};
|
||||
use tokio_tungstenite::tungstenite::{http, Message};
|
||||
use tokio_tungstenite::tungstenite::http;
|
||||
use tungstenite::client::IntoClientRequest;
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SpeedRate {
|
||||
@ -240,41 +240,90 @@ pub struct Traffic {
|
||||
impl Traffic {
|
||||
pub async fn get_traffic_stream() -> Result<impl Stream<Item = Result<Traffic, anyhow::Error>>>
|
||||
{
|
||||
use futures::future::FutureExt;
|
||||
use futures::stream::{self, StreamExt};
|
||||
use std::time::Duration;
|
||||
|
||||
// 先处理错误和超时情况
|
||||
let stream = Box::pin(
|
||||
stream::unfold((), |_| async {
|
||||
loop {
|
||||
stream::unfold((), move |_| async move {
|
||||
'retry: loop {
|
||||
log::info!(target: "app", "establishing traffic websocket connection");
|
||||
let (url, token) = MihomoManager::get_traffic_ws_url();
|
||||
let mut request = url.into_client_request().unwrap();
|
||||
request
|
||||
.headers_mut()
|
||||
.insert(http::header::AUTHORIZATION, token);
|
||||
|
||||
match tokio_tungstenite::connect_async(request).await {
|
||||
Ok((ws_stream, _)) => {
|
||||
log::info!(target: "app", "traffic ws connection established");
|
||||
return Some((
|
||||
ws_stream.map(|msg| {
|
||||
msg.map_err(anyhow::Error::from).and_then(|msg: Message| {
|
||||
let data = msg.into_text()?;
|
||||
let json: serde_json::Value = serde_json::from_str(&data)?;
|
||||
Ok(Traffic {
|
||||
up: json["up"].as_u64().unwrap_or(0),
|
||||
down: json["down"].as_u64().unwrap_or(0),
|
||||
})
|
||||
})
|
||||
}),
|
||||
(),
|
||||
));
|
||||
}
|
||||
let mut request = match url.into_client_request() {
|
||||
Ok(req) => req,
|
||||
Err(e) => {
|
||||
log::error!(target: "app", "traffic ws connection failed: {e}");
|
||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||
continue;
|
||||
log::error!(target: "app", "failed to create websocket request: {}", e);
|
||||
tokio::time::sleep(Duration::from_secs(2)).await;
|
||||
continue 'retry;
|
||||
}
|
||||
};
|
||||
|
||||
request.headers_mut().insert(http::header::AUTHORIZATION, token);
|
||||
|
||||
match tokio::time::timeout(Duration::from_secs(3),
|
||||
tokio_tungstenite::connect_async(request)
|
||||
).await {
|
||||
Ok(Ok((ws_stream, _))) => {
|
||||
log::info!(target: "app", "traffic websocket connection established");
|
||||
// 设置流超时控制
|
||||
let traffic_stream = ws_stream
|
||||
.take_while(|msg| {
|
||||
let continue_stream = matches!(msg, Ok(_));
|
||||
async move { continue_stream }.boxed()
|
||||
})
|
||||
.filter_map(|msg| async move {
|
||||
match msg {
|
||||
Ok(msg) => {
|
||||
if !msg.is_text() {
|
||||
return None;
|
||||
}
|
||||
|
||||
match tokio::time::timeout(
|
||||
Duration::from_millis(200),
|
||||
async { msg.into_text() }
|
||||
).await {
|
||||
Ok(Ok(text)) => {
|
||||
match serde_json::from_str::<serde_json::Value>(&text) {
|
||||
Ok(json) => {
|
||||
let up = json["up"].as_u64().unwrap_or(0);
|
||||
let down = json["down"].as_u64().unwrap_or(0);
|
||||
Some(Ok(Traffic { up, down }))
|
||||
},
|
||||
Err(e) => {
|
||||
log::warn!(target: "app", "traffic json parse error: {} for {}", e, text);
|
||||
None
|
||||
}
|
||||
}
|
||||
},
|
||||
Ok(Err(e)) => {
|
||||
log::warn!(target: "app", "traffic text conversion error: {}", e);
|
||||
None
|
||||
},
|
||||
Err(_) => {
|
||||
log::warn!(target: "app", "traffic text processing timeout");
|
||||
None
|
||||
}
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
log::error!(target: "app", "traffic websocket error: {}", e);
|
||||
None
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return Some((traffic_stream, ()));
|
||||
},
|
||||
Ok(Err(e)) => {
|
||||
log::error!(target: "app", "traffic websocket connection failed: {}", e);
|
||||
},
|
||||
Err(_) => {
|
||||
log::error!(target: "app", "traffic websocket connection timed out");
|
||||
}
|
||||
}
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(2)).await;
|
||||
}
|
||||
})
|
||||
.flatten(),
|
||||
|
Loading…
x
Reference in New Issue
Block a user