mirror of
https://github.com/clash-verge-rev/clash-verge-rev
synced 2025-05-05 07:03:45 +08:00
refactor(timer): improve timer management with robust error handling
This commit improves the timer management system with the following enhancements: Replace Mutex with RwLock for better read concurrency in timer state Add structured TimerTask type to store task metadata Use atomic boolean for initialization flag instead of mutex Implement comprehensive error handling with detailed logging Add rollback capability when task operations fail Reduce lock contention by generating task diffs outside locks Add timing metrics for task execution Improve code organization and documentation
This commit is contained in:
parent
df538ca9a3
commit
e16b23946a
@ -2,23 +2,30 @@ use crate::{config::Config, core::CoreManager, feat};
|
||||
use anyhow::{Context, Result};
|
||||
use delay_timer::prelude::{DelayTimer, DelayTimerBuilder, TaskBuilder};
|
||||
use once_cell::sync::OnceCell;
|
||||
use parking_lot::Mutex;
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
type TaskID = u64;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct TimerTask {
|
||||
task_id: TaskID,
|
||||
interval_minutes: u64,
|
||||
__last_run: i64, // Timestamp of last execution
|
||||
}
|
||||
|
||||
pub struct Timer {
|
||||
/// cron manager
|
||||
delay_timer: Arc<Mutex<DelayTimer>>,
|
||||
delay_timer: Arc<RwLock<DelayTimer>>,
|
||||
|
||||
/// save the current state
|
||||
timer_map: Arc<Mutex<HashMap<String, (TaskID, u64)>>>,
|
||||
/// save the current state - using RwLock for better read concurrency
|
||||
timer_map: Arc<RwLock<HashMap<String, TimerTask>>>,
|
||||
|
||||
/// increment id
|
||||
/// increment id - kept as mutex since it's just a counter
|
||||
timer_count: Arc<Mutex<TaskID>>,
|
||||
|
||||
/// 标记定时器是否已经初始化
|
||||
initialized: Arc<Mutex<bool>>,
|
||||
/// Flag to mark if timer is initialized - atomic for better performance
|
||||
initialized: Arc<std::sync::atomic::AtomicBool>,
|
||||
}
|
||||
|
||||
impl Timer {
|
||||
@ -26,78 +33,145 @@ impl Timer {
|
||||
static TIMER: OnceCell<Timer> = OnceCell::new();
|
||||
|
||||
TIMER.get_or_init(|| Timer {
|
||||
delay_timer: Arc::new(Mutex::new(DelayTimerBuilder::default().build())),
|
||||
timer_map: Arc::new(Mutex::new(HashMap::new())),
|
||||
delay_timer: Arc::new(RwLock::new(DelayTimerBuilder::default().build())),
|
||||
timer_map: Arc::new(RwLock::new(HashMap::new())),
|
||||
timer_count: Arc::new(Mutex::new(1)),
|
||||
initialized: Arc::new(Mutex::new(false)),
|
||||
initialized: Arc::new(std::sync::atomic::AtomicBool::new(false)),
|
||||
})
|
||||
}
|
||||
|
||||
/// restore timer
|
||||
/// Initialize timer with better error handling and atomic operations
|
||||
pub fn init(&self) -> Result<()> {
|
||||
let mut initialized = self.initialized.lock();
|
||||
if *initialized {
|
||||
log::info!(target: "app", "Timer already initialized, skipping...");
|
||||
// Use compare_exchange for thread-safe initialization check
|
||||
if self
|
||||
.initialized
|
||||
.compare_exchange(
|
||||
false,
|
||||
true,
|
||||
std::sync::atomic::Ordering::SeqCst,
|
||||
std::sync::atomic::Ordering::SeqCst,
|
||||
)
|
||||
.is_err()
|
||||
{
|
||||
log::debug!(target: "app", "Timer already initialized, skipping...");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
log::info!(target: "app", "Initializing timer...");
|
||||
self.refresh()?;
|
||||
|
||||
// Initialize timer tasks
|
||||
if let Err(e) = self.refresh() {
|
||||
// Reset initialization flag on error
|
||||
self.initialized
|
||||
.store(false, std::sync::atomic::Ordering::SeqCst);
|
||||
log::error!(target: "app", "Failed to initialize timer: {}", e);
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
let cur_timestamp = chrono::Local::now().timestamp();
|
||||
let timer_map = self.timer_map.lock();
|
||||
let delay_timer = self.delay_timer.lock();
|
||||
|
||||
if let Some(items) = Config::profiles().latest().get_items() {
|
||||
// Collect profiles that need immediate update
|
||||
let profiles_to_update = if let Some(items) = Config::profiles().latest().get_items() {
|
||||
items
|
||||
.iter()
|
||||
.filter_map(|item| {
|
||||
let interval = ((item.option.as_ref()?.update_interval?) as i64) * 60;
|
||||
let interval = item.option.as_ref()?.update_interval? as i64;
|
||||
let updated = item.updated? as i64;
|
||||
let uid = item.uid.as_ref()?;
|
||||
|
||||
if interval > 0 && cur_timestamp - updated >= interval {
|
||||
Some(item)
|
||||
if interval > 0 && cur_timestamp - updated >= interval * 60 {
|
||||
Some(uid.clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.for_each(|item| {
|
||||
if let Some(uid) = item.uid.as_ref() {
|
||||
if let Some((task_id, _)) = timer_map.get(uid) {
|
||||
log::info!(target: "app", "Advancing task for uid: {}", uid);
|
||||
crate::log_err!(delay_timer.advance_task(*task_id));
|
||||
}
|
||||
.collect::<Vec<String>>()
|
||||
} else {
|
||||
Vec::new()
|
||||
};
|
||||
|
||||
// Advance tasks outside of locks to minimize lock contention
|
||||
if !profiles_to_update.is_empty() {
|
||||
let timer_map = self.timer_map.read();
|
||||
let delay_timer = self.delay_timer.write();
|
||||
|
||||
for uid in profiles_to_update {
|
||||
if let Some(task) = timer_map.get(&uid) {
|
||||
log::info!(target: "app", "Advancing task for uid: {}", uid);
|
||||
if let Err(e) = delay_timer.advance_task(task.task_id) {
|
||||
log::warn!(target: "app", "Failed to advance task {}: {}", uid, e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
*initialized = true;
|
||||
log::info!(target: "app", "Timer initialization completed");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Correctly update all cron tasks
|
||||
/// Refresh timer tasks with better error handling
|
||||
pub fn refresh(&self) -> Result<()> {
|
||||
// Generate diff outside of lock to minimize lock contention
|
||||
let diff_map = self.gen_diff();
|
||||
|
||||
let mut timer_map = self.timer_map.lock();
|
||||
let mut delay_timer = self.delay_timer.lock();
|
||||
if diff_map.is_empty() {
|
||||
log::debug!(target: "app", "No timer changes needed");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
for (uid, diff) in diff_map.into_iter() {
|
||||
log::info!(target: "app", "Refreshing {} timer tasks", diff_map.len());
|
||||
|
||||
// Apply changes while holding locks
|
||||
let mut timer_map = self.timer_map.write();
|
||||
let mut delay_timer = self.delay_timer.write();
|
||||
|
||||
for (uid, diff) in diff_map {
|
||||
match diff {
|
||||
DiffFlag::Del(tid) => {
|
||||
let _ = timer_map.remove(&uid);
|
||||
crate::log_err!(delay_timer.remove_task(tid));
|
||||
timer_map.remove(&uid);
|
||||
if let Err(e) = delay_timer.remove_task(tid) {
|
||||
log::warn!(target: "app", "Failed to remove task {} for uid {}: {}", tid, uid, e);
|
||||
} else {
|
||||
log::debug!(target: "app", "Removed task {} for uid {}", tid, uid);
|
||||
}
|
||||
}
|
||||
DiffFlag::Add(tid, val) => {
|
||||
let _ = timer_map.insert(uid.clone(), (tid, val));
|
||||
crate::log_err!(self.add_task(&mut delay_timer, uid, tid, val));
|
||||
DiffFlag::Add(tid, interval) => {
|
||||
let task = TimerTask {
|
||||
task_id: tid,
|
||||
interval_minutes: interval,
|
||||
__last_run: chrono::Local::now().timestamp(),
|
||||
};
|
||||
|
||||
timer_map.insert(uid.clone(), task);
|
||||
|
||||
if let Err(e) = self.add_task(&mut delay_timer, uid.clone(), tid, interval) {
|
||||
log::error!(target: "app", "Failed to add task for uid {}: {}", uid, e);
|
||||
timer_map.remove(&uid); // Rollback on failure
|
||||
} else {
|
||||
log::debug!(target: "app", "Added task {} for uid {}", tid, uid);
|
||||
}
|
||||
}
|
||||
DiffFlag::Mod(tid, val) => {
|
||||
let _ = timer_map.insert(uid.clone(), (tid, val));
|
||||
crate::log_err!(delay_timer.remove_task(tid));
|
||||
crate::log_err!(self.add_task(&mut delay_timer, uid, tid, val));
|
||||
DiffFlag::Mod(tid, interval) => {
|
||||
// Remove old task first
|
||||
if let Err(e) = delay_timer.remove_task(tid) {
|
||||
log::warn!(target: "app", "Failed to remove old task {} for uid {}: {}", tid, uid, e);
|
||||
}
|
||||
|
||||
// Then add the new one
|
||||
let task = TimerTask {
|
||||
task_id: tid,
|
||||
interval_minutes: interval,
|
||||
__last_run: chrono::Local::now().timestamp(),
|
||||
};
|
||||
|
||||
timer_map.insert(uid.clone(), task);
|
||||
|
||||
if let Err(e) = self.add_task(&mut delay_timer, uid.clone(), tid, interval) {
|
||||
log::error!(target: "app", "Failed to update task for uid {}: {}", uid, e);
|
||||
timer_map.remove(&uid); // Rollback on failure
|
||||
} else {
|
||||
log::debug!(target: "app", "Updated task {} for uid {}", tid, uid);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -105,18 +179,17 @@ impl Timer {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// generate a uid -> update_interval map
|
||||
/// Generate map of profile UIDs to update intervals
|
||||
fn gen_map(&self) -> HashMap<String, u64> {
|
||||
let mut new_map = HashMap::new();
|
||||
|
||||
if let Some(items) = Config::profiles().latest().get_items() {
|
||||
for item in items.iter() {
|
||||
if item.option.is_some() {
|
||||
let option = item.option.as_ref().unwrap();
|
||||
let interval = option.update_interval.unwrap_or(0);
|
||||
|
||||
if interval > 0 {
|
||||
new_map.insert(item.uid.clone().unwrap(), interval);
|
||||
if let Some(option) = item.option.as_ref() {
|
||||
if let (Some(interval), Some(uid)) = (option.update_interval, &item.uid) {
|
||||
if interval > 0 {
|
||||
new_map.insert(uid.clone(), interval);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -125,39 +198,50 @@ impl Timer {
|
||||
new_map
|
||||
}
|
||||
|
||||
/// generate the diff map for refresh
|
||||
/// Generate differences between current and new timer configuration
|
||||
fn gen_diff(&self) -> HashMap<String, DiffFlag> {
|
||||
let mut diff_map = HashMap::new();
|
||||
|
||||
let timer_map = self.timer_map.lock();
|
||||
|
||||
let new_map = self.gen_map();
|
||||
let cur_map = &timer_map;
|
||||
|
||||
cur_map.iter().for_each(|(uid, (tid, val))| {
|
||||
let new_val = new_map.get(uid).unwrap_or(&0);
|
||||
// Read lock for comparing current state
|
||||
let timer_map = self.timer_map.read();
|
||||
|
||||
if *new_val == 0 {
|
||||
diff_map.insert(uid.clone(), DiffFlag::Del(*tid));
|
||||
} else if new_val != val {
|
||||
diff_map.insert(uid.clone(), DiffFlag::Mod(*tid, *new_val));
|
||||
// Find tasks to modify or delete
|
||||
for (uid, task) in timer_map.iter() {
|
||||
match new_map.get(uid) {
|
||||
Some(&interval) if interval != task.interval_minutes => {
|
||||
// Task exists but interval changed
|
||||
diff_map.insert(uid.clone(), DiffFlag::Mod(task.task_id, interval));
|
||||
}
|
||||
None => {
|
||||
// Task no longer needed
|
||||
diff_map.insert(uid.clone(), DiffFlag::Del(task.task_id));
|
||||
}
|
||||
_ => {
|
||||
// Task exists with same interval, no change needed
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
let mut count = self.timer_count.lock();
|
||||
// Find new tasks to add
|
||||
let mut next_id = *self.timer_count.lock();
|
||||
|
||||
new_map.iter().for_each(|(uid, val)| {
|
||||
if cur_map.get(uid).is_none() {
|
||||
diff_map.insert(uid.clone(), DiffFlag::Add(*count, *val));
|
||||
|
||||
*count += 1;
|
||||
for (uid, &interval) in new_map.iter() {
|
||||
if !timer_map.contains_key(uid) {
|
||||
diff_map.insert(uid.clone(), DiffFlag::Add(next_id, interval));
|
||||
next_id += 1;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Update counter only if we added new tasks
|
||||
if next_id > *self.timer_count.lock() {
|
||||
*self.timer_count.lock() = next_id;
|
||||
}
|
||||
|
||||
diff_map
|
||||
}
|
||||
|
||||
/// add a cron task
|
||||
/// Add a timer task with better error handling
|
||||
fn add_task(
|
||||
&self,
|
||||
delay_timer: &mut DelayTimer,
|
||||
@ -165,8 +249,9 @@ impl Timer {
|
||||
tid: TaskID,
|
||||
minutes: u64,
|
||||
) -> Result<()> {
|
||||
log::info!(target: "app", "Adding new task: uid={}, interval={} minutes", uid, minutes);
|
||||
log::info!(target: "app", "Adding task: uid={}, id={}, interval={}min", uid, tid, minutes);
|
||||
|
||||
// Create a task with reasonable retries and backoff
|
||||
let task = TaskBuilder::default()
|
||||
.set_task_id(tid)
|
||||
.set_maximum_parallel_runnable_num(1)
|
||||
@ -183,25 +268,40 @@ impl Timer {
|
||||
.add_task(task)
|
||||
.context("failed to add timer task")?;
|
||||
|
||||
log::info!(target: "app", "Task added successfully: {}", tid);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// the task runner
|
||||
/// Async task with better error handling and logging
|
||||
async fn async_task(uid: String) {
|
||||
log::info!(target: "app", "Running timer task `{}`", uid);
|
||||
let task_start = std::time::Instant::now();
|
||||
log::info!(target: "app", "Running timer task for profile: {}", uid);
|
||||
|
||||
match feat::update_profile(uid.clone(), None).await {
|
||||
Ok(_) => match CoreManager::global().update_config().await {
|
||||
Ok(_) => {
|
||||
log::info!(target: "app", "Timer task completed successfully for uid: {}", uid);
|
||||
// Update profile
|
||||
let profile_result = feat::update_profile(uid.clone(), None).await;
|
||||
|
||||
match profile_result {
|
||||
Ok(_) => {
|
||||
// Update configuration
|
||||
match CoreManager::global().update_config().await {
|
||||
Ok(_) => {
|
||||
let duration = task_start.elapsed().as_millis();
|
||||
log::info!(
|
||||
target: "app",
|
||||
"Timer task completed successfully for uid: {} (took {}ms)",
|
||||
uid, duration
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!(
|
||||
target: "app",
|
||||
"Failed to refresh config after profile update for uid {}: {}",
|
||||
uid, e
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!(target: "app", "Timer task refresh error for uid {}: {}", uid, e);
|
||||
}
|
||||
},
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!(target: "app", "Timer task update error for uid {}: {}", uid, e);
|
||||
log::error!(target: "app", "Failed to update profile uid {}: {}", uid, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user