diff --git a/UPDATELOG.md b/UPDATELOG.md index 08482462..04ac0738 100644 --- a/UPDATELOG.md +++ b/UPDATELOG.md @@ -42,6 +42,7 @@ - 编辑非激活订阅的时候不在触发当前订阅配置重载 - 改进核心功能防止主进程阻塞、改进MihomoManager实现,以及优化窗口创建流程。减少应用程序可能出现的主进程卡死情况 - 优化系统代理设置更新逻辑 + - 重构前端通知系统分离通知线程防止前端卡死 ## v2.2.3 diff --git a/src-tauri/src/core/handle.rs b/src-tauri/src/core/handle.rs index 67dd9d3d..92127afa 100644 --- a/src-tauri/src/core/handle.rs +++ b/src-tauri/src/core/handle.rs @@ -1,9 +1,32 @@ use once_cell::sync::OnceCell; use parking_lot::RwLock; -use std::{sync::Arc, time::Duration}; +use std::{ + sync::{ + atomic::{AtomicU64, Ordering}, + mpsc, Arc, + }, + thread, + time::{Duration, Instant}, +}; use tauri::{AppHandle, Emitter, Manager, WebviewWindow}; -use crate::{logging, logging_error, process::AsyncHandler, utils::logging::Type}; +use crate::{logging, utils::logging::Type}; + +/// 不同类型的前端通知 +#[derive(Debug, Clone)] +enum FrontendEvent { + RefreshClash, + RefreshVerge, + NoticeMessage { status: String, message: String }, +} + +/// 事件发送统计和监控 +#[derive(Debug, Default)] +struct EventStats { + total_sent: AtomicU64, + total_errors: AtomicU64, + last_error_time: RwLock>, +} /// 存储启动期间的错误消息 #[derive(Debug, Clone)] @@ -12,30 +35,270 @@ struct ErrorMessage { message: String, } -#[derive(Debug, Default, Clone)] +/// 全局前端通知系统 +#[derive(Debug)] +struct NotificationSystem { + sender: Option>, + worker_handle: Option>, + is_running: bool, + stats: EventStats, + last_emit_time: RwLock, + /// 当通知系统失败超过阈值时,进入紧急模式 + emergency_mode: RwLock, +} + +impl Default for NotificationSystem { + fn default() -> Self { + Self::new() + } +} + +impl NotificationSystem { + fn new() -> Self { + Self { + sender: None, + worker_handle: None, + is_running: false, + stats: EventStats::default(), + last_emit_time: RwLock::new(Instant::now()), + emergency_mode: RwLock::new(false), + } + } + + /// 启动通知处理线程 + fn start(&mut self) { + if self.is_running { + return; + } + + let (tx, rx) = mpsc::channel(); + self.sender = Some(tx); + self.is_running = true; + + *self.last_emit_time.write() = Instant::now(); + + self.worker_handle = Some( + thread::Builder::new() + .name("frontend-notifier".into()) + .spawn(move || { + let handle = Handle::global(); + + while !handle.is_exiting() { + match rx.recv_timeout(Duration::from_millis(100)) { + Ok(event) => { + let system_guard = handle.notification_system.read(); + let is_emergency = system_guard + .as_ref() + .and_then(|sys| Some(*sys.emergency_mode.read())) + .unwrap_or(false); + + if is_emergency { + if let FrontendEvent::NoticeMessage { ref status, .. } = event { + if status == "info" { + log::warn!( + "Emergency mode active, skipping info message" + ); + continue; + } + } + } + + if let Some(window) = handle.get_window() { + match event { + FrontendEvent::RefreshClash => { + Self::emit_with_timeout( + &window, + "verge://refresh-clash-config", + "yes", + &handle, + ); + } + FrontendEvent::RefreshVerge => { + Self::emit_with_timeout( + &window, + "verge://refresh-verge-config", + "yes", + &handle, + ); + } + FrontendEvent::NoticeMessage { + ref status, + ref message, + } => { + if let Err(e) = window.emit( + "verge://notice-message", + (status.clone(), message.clone()), + ) { + log::warn!("Failed to send notice: {}", e); + if let Some(sys) = system_guard.as_ref() { + sys.stats + .total_errors + .fetch_add(1, Ordering::SeqCst); + *sys.stats.last_error_time.write() = + Some(Instant::now()); + } + } else { + if let Some(sys) = system_guard.as_ref() { + sys.stats + .total_sent + .fetch_add(1, Ordering::SeqCst); + } + } + } + } + } + + thread::sleep(Duration::from_millis(20)); + } + Err(mpsc::RecvTimeoutError::Timeout) => { + continue; + } + Err(mpsc::RecvTimeoutError::Disconnected) => { + log::info!( + "Notification channel disconnected, exiting worker thread" + ); + break; + } + } + } + + log::info!("Notification worker thread exiting"); + }) + .expect("Failed to start notification worker thread"), + ); + } + + /// 使用超时控制发送事件,防止无限阻塞 + fn emit_with_timeout( + window: &WebviewWindow, + event: &str, + payload: P, + handle: &Handle, + ) { + let start = Instant::now(); + + let system_guard = handle.notification_system.read(); + if let Some(system) = system_guard.as_ref() { + *system.last_emit_time.write() = start; + + let window_label = window.label().to_string(); + let event_clone = event.to_string(); + let app_handle_clone = match handle.app_handle() { + Some(h) => h, + None => return, + }; + + let (tx, rx) = mpsc::channel(); + let _ = thread::Builder::new() + .name("emit-timeout".into()) + .spawn(move || { + if let Some(win) = app_handle_clone.get_webview_window(&window_label) { + let result = win.emit(&event_clone, payload); + let _ = tx.send(result); + } else { + let _ = tx.send(Err(tauri::Error::WebviewNotFound)); + } + }); + + match rx.recv_timeout(Duration::from_millis(500)) { + Ok(result) => { + if let Err(e) = result { + log::warn!("Failed to emit event {}: {}", event, e); + system.stats.total_errors.fetch_add(1, Ordering::SeqCst); + *system.stats.last_error_time.write() = Some(Instant::now()); + } else { + system.stats.total_sent.fetch_add(1, Ordering::SeqCst); + } + } + Err(_) => { + log::error!("Emit timed out for event: {}", event); + system.stats.total_errors.fetch_add(1, Ordering::SeqCst); + *system.stats.last_error_time.write() = Some(Instant::now()); + + let errors = system.stats.total_errors.load(Ordering::SeqCst); + if errors > 5 { + log::warn!("Too many emit errors, entering emergency mode"); + *system.emergency_mode.write() = true; + } + } + } + } + } + + /// 发送事件到队列 + fn send_event(&self, event: FrontendEvent) -> bool { + if *self.emergency_mode.read() { + if let FrontendEvent::NoticeMessage { ref status, .. } = event { + if status == "info" { + log::info!("Skipping info message in emergency mode"); + return false; + } + } + } + + if let Some(sender) = &self.sender { + match sender.send(event) { + Ok(_) => true, + Err(e) => { + log::warn!("Failed to send event to notification queue: {:?}", e); + self.stats.total_errors.fetch_add(1, Ordering::SeqCst); + *self.stats.last_error_time.write() = Some(Instant::now()); + false + } + } + } else { + log::warn!("Notification system not started, can't send event"); + false + } + } + + fn shutdown(&mut self) { + self.is_running = false; + self.sender = None; + + if let Some(handle) = self.worker_handle.take() { + let _ = handle.join(); + } + } +} + +#[derive(Debug, Clone)] pub struct Handle { pub app_handle: Arc>>, pub is_exiting: Arc>, - /// 存储启动过程中产生的错误消息队列 startup_errors: Arc>>, startup_completed: Arc>, + notification_system: Arc>>, +} + +impl Default for Handle { + fn default() -> Self { + Self { + app_handle: Arc::new(RwLock::new(None)), + is_exiting: Arc::new(RwLock::new(false)), + startup_errors: Arc::new(RwLock::new(Vec::new())), + startup_completed: Arc::new(RwLock::new(false)), + notification_system: Arc::new(RwLock::new(Some(NotificationSystem::new()))), + } + } } impl Handle { pub fn global() -> &'static Handle { static HANDLE: OnceCell = OnceCell::new(); - - HANDLE.get_or_init(|| Handle { - app_handle: Arc::new(RwLock::new(None)), - is_exiting: Arc::new(RwLock::new(false)), - startup_errors: Arc::new(RwLock::new(Vec::new())), - startup_completed: Arc::new(RwLock::new(false)), - }) + HANDLE.get_or_init(|| Handle::default()) } pub fn init(&self, app_handle: &AppHandle) { - let mut handle = self.app_handle.write(); - *handle = Some(app_handle.clone()); + { + let mut handle = self.app_handle.write(); + *handle = Some(app_handle.clone()); + } + + let mut system_opt = self.notification_system.write(); + if let Some(system) = system_opt.as_mut() { + system.start(); + } } pub fn app_handle(&self) -> Option { @@ -43,7 +306,7 @@ impl Handle { } pub fn get_window(&self) -> Option { - let app_handle = self.app_handle().unwrap(); + let app_handle = self.app_handle()?; let window: Option = app_handle.get_webview_window("main"); if window.is_none() { log::debug!(target:"app", "main window not found"); @@ -52,43 +315,35 @@ impl Handle { } pub fn refresh_clash() { - if let Some(window) = Self::global().get_window() { - logging_error!( - Type::Frontend, - true, - window.emit("verge://refresh-clash-config", "yes") - ); + let handle = Self::global(); + if handle.is_exiting() { + return; + } + + let system_opt = handle.notification_system.read(); + if let Some(system) = system_opt.as_ref() { + system.send_event(FrontendEvent::RefreshClash); } } pub fn refresh_verge() { - if let Some(window) = Self::global().get_window() { - logging_error!( - Type::Frontend, - true, - window.emit("verge://refresh-verge-config", "yes") - ); + let handle = Self::global(); + if handle.is_exiting() { + return; + } + + let system_opt = handle.notification_system.read(); + if let Some(system) = system_opt.as_ref() { + system.send_event(FrontendEvent::RefreshVerge); } } - #[allow(unused)] - pub fn refresh_profiles() { - if let Some(window) = Self::global().get_window() { - logging_error!( - Type::Frontend, - true, - window.emit("verge://refresh-profiles-config", "yes") - ); - } - } - - /// 通知前端显示消息,如果在启动过程中,则将消息存入启动错误队列 + /// 通知前端显示消息队列 pub fn notice_message, M: Into>(status: S, msg: M) { let handle = Self::global(); let status_str = status.into(); let msg_str = msg.into(); - // 检查是否正在启动过程中 if !*handle.startup_completed.read() { logging!( info, @@ -99,7 +354,6 @@ impl Handle { msg_str ); - // 将消息添加到启动错误队列 let mut errors = handle.startup_errors.write(); errors.push(ErrorMessage { status: status_str, @@ -108,32 +362,19 @@ impl Handle { return; } - // 使用AsyncHandler发送消息,防止阻塞 - let status_clone = status_str.clone(); - let msg_clone = msg_str.clone(); + if handle.is_exiting() { + return; + } - crate::process::AsyncHandler::spawn(move || async move { - let handle_clone = Self::global(); - if let Some(window) = handle_clone.get_window() { - match tokio::time::timeout(tokio::time::Duration::from_millis(500), async { - window.emit("verge://notice-message", (status_clone, msg_clone)) - }) - .await - { - Ok(result) => { - if let Err(e) = result { - logging!(warn, Type::Frontend, true, "发送通知消息失败: {}", e); - } - } - Err(_) => { - logging!(warn, Type::Frontend, true, "发送通知消息超时"); - } - } - } - }); + let system_opt = handle.notification_system.read(); + if let Some(system) = system_opt.as_ref() { + system.send_event(FrontendEvent::NoticeMessage { + status: status_str, + message: msg_str, + }); + } } - /// 标记启动已完成,并发送所有启动阶段累积的错误消息 pub fn mark_startup_completed(&self) { { let mut completed = self.startup_completed.write(); @@ -162,26 +403,47 @@ impl Handle { errors.len() ); - // 等待2秒以确保前端已完全加载,延迟发送错误通知 - if let Some(window) = self.get_window() { - let window_clone = window.clone(); - let errors_clone = errors.clone(); + // 启动单独线程处理启动错误,避免阻塞主线程 + let thread_result = thread::Builder::new() + .name("startup-errors-sender".into()) + .spawn(move || { + thread::sleep(Duration::from_secs(2)); - AsyncHandler::spawn(move || async move { - tokio::time::sleep(Duration::from_secs(2)).await; + let handle = Handle::global(); + if handle.is_exiting() { + return; + } - for error in errors_clone { - let _ = - window_clone.emit("verge://notice-message", (error.status, error.message)); - tokio::time::sleep(Duration::from_millis(500)).await; + let system_opt = handle.notification_system.read(); + if let Some(system) = system_opt.as_ref() { + for error in errors { + if handle.is_exiting() { + break; + } + + system.send_event(FrontendEvent::NoticeMessage { + status: error.status, + message: error.message, + }); + + thread::sleep(Duration::from_millis(300)); + } } }); + + if let Err(e) = thread_result { + log::error!("Failed to spawn startup errors thread: {}", e); } } pub fn set_is_exiting(&self) { let mut is_exiting = self.is_exiting.write(); *is_exiting = true; + + let mut system_opt = self.notification_system.write(); + if let Some(system) = system_opt.as_mut() { + system.shutdown(); + } } pub fn is_exiting(&self) -> bool {