Merge branch 'mihomo-socket' into dev

This commit is contained in:
Tunglies 2025-04-16 00:46:36 +08:00
commit 36ac1ab488
10 changed files with 484 additions and 202 deletions

View File

@ -0,0 +1 @@
.env

View File

@ -2,13 +2,20 @@
name = "mihomo_api" name = "mihomo_api"
edition = "2024" edition = "2024"
[features]
debug = []
[dependencies] [dependencies]
reqwest = { version = "0.12.15", features = ["json"] } async-trait = "0.1.88"
serde = { version = "1.0.219", features = ["derive"] } futures = "0.3.31"
http-body-util = "0.1.3"
hyper = { version = "1.6.0", features = ["http1", "client"] }
hyper-util = "0.1.11"
serde_json = "1.0.140" serde_json = "1.0.140"
time = "0.3.41"
tokio = { version = "1.44.1", features = ["rt", "macros", "rt-multi-thread", "io-std", "net", "io-util", "time"] }
tokio-util = { version = "0.7.14", features = ["codec"] }
[dev-dependencies] [dev-dependencies]
tokio = { version = "1.44.1", features = ["rt", "macros"] } dotenv = "0.15.0"
lazy_static = "1.5.0"
[target.'cfg(unix)'.dependencies]
hyperlocal = "0.9.1"

View File

@ -0,0 +1,13 @@
$pipeName = "\\.\pipe\mihomo"
$pipe = new-object System.IO.Pipes.NamedPipeClientStream(".", "mihomo", [System.IO.Pipes.PipeDirection]::InOut)
$pipe.Connect(1000) # 尝试连接 1 秒
if ($pipe.IsConnected) {
Write-Host "成功连接到管道"
# 示例写入或读取可以加上如下内容
# $writer = new-object System.IO.StreamWriter($pipe)
# $writer.WriteLine("hello pipe")
# $writer.Flush()
$pipe.Close()
} else {
Write-Host "连接失败"
}

View File

@ -1,162 +1,49 @@
use reqwest::{Method, header::HeaderMap}; // impl MihomoManager {
use serde_json::json; // pub async fn patch_configs(&self, config: serde_json::Value) -> Result<(), String> {
use std::{ // let url = format!("{}/configs", self.mihomo_server);
sync::{Arc, Mutex}, // let response = self.send_request(Method::PATCH, url, Some(config)).await?;
time::Duration, // if response["code"] == 204 {
}; // Ok(())
// } else {
// Err(response["message"]
// .as_str()
// .unwrap_or("unknown error")
// .to_string())
// }
// }
// pub async fn test_proxy_delay(
// &self,
// name: &str,
// test_url: Option<String>,
// timeout: i32,
// ) -> Result<serde_json::Value, String> {
// let test_url = test_url.unwrap_or("http://cp.cloudflare.com/generate_204".to_string());
// let url = format!(
// "{}/proxies/{}/delay?url={}&timeout={}",
// self.mihomo_server, name, test_url, timeout
// );
// let response = self.send_request(Method::GET, url, None).await?;
// Ok(response)
// }
// pub async fn delete_connection(&self, id: &str) -> Result<(), String> {
// let url = format!("{}/connections/{}", self.mihomo_server, id);
// let response = self.send_request(Method::DELETE, url, None).await?;
// if response["code"] == 204 {
// Ok(())
// } else {
// Err(response["message"]
// .as_str()
// .unwrap_or("unknown error")
// .to_string())
// }
// }
// }
pub mod model; pub mod model;
pub use model::{MihomoData, MihomoManager}; pub use model::E;
pub use model::MihomoData;
impl MihomoManager { pub use model::MihomoManager;
pub fn new(mihomo_server: String, headers: HeaderMap) -> Self { pub mod sock;
Self { pub mod platform;
mihomo_server,
data: Arc::new(Mutex::new(MihomoData {
proxies: serde_json::Value::Null,
providers_proxies: serde_json::Value::Null,
})),
headers,
}
}
fn update_proxies(&self, proxies: serde_json::Value) {
let mut data = self.data.lock().unwrap();
data.proxies = proxies;
}
fn update_providers_proxies(&self, providers_proxies: serde_json::Value) {
let mut data = self.data.lock().unwrap();
data.providers_proxies = providers_proxies;
}
pub fn get_mihomo_server(&self) -> String {
self.mihomo_server.clone()
}
pub fn get_proxies(&self) -> serde_json::Value {
let data = self.data.lock().unwrap();
data.proxies.clone()
}
pub fn get_providers_proxies(&self) -> serde_json::Value {
let data = self.data.lock().unwrap();
data.providers_proxies.clone()
}
async fn send_request(
&self,
method: Method,
url: String,
data: Option<serde_json::Value>,
) -> Result<serde_json::Value, String> {
let client_response = reqwest::ClientBuilder::new()
.default_headers(self.headers.clone())
.no_proxy()
.timeout(Duration::from_secs(60))
.build()
.map_err(|e| e.to_string())?
.request(method.clone(), &url)
.json(&data.unwrap_or(json!({})))
.send()
.await
.map_err(|e| e.to_string())?;
let response = match method {
Method::PATCH => {
let status = client_response.status();
if status.as_u16() == 204 {
json!({"code": 204})
} else {
client_response
.json::<serde_json::Value>()
.await
.map_err(|e| e.to_string())?
}
}
Method::PUT => json!(client_response.text().await.map_err(|e| e.to_string())?),
_ => client_response
.json::<serde_json::Value>()
.await
.map_err(|e| e.to_string())?,
};
Ok(response)
}
pub async fn refresh_proxies(&self) -> Result<&Self, String> {
let url = format!("{}/proxies", self.mihomo_server);
let proxies = self.send_request(Method::GET, url, None).await?;
self.update_proxies(proxies);
Ok(self)
}
pub async fn refresh_providers_proxies(&self) -> Result<&Self, String> {
let url = format!("{}/providers/proxies", self.mihomo_server);
let providers_proxies = self.send_request(Method::GET, url, None).await?;
self.update_providers_proxies(providers_proxies);
Ok(self)
}
}
impl MihomoManager {
pub async fn is_mihomo_running(&self) -> Result<(), String> {
let url = format!("{}/version", self.mihomo_server);
let _response = self.send_request(Method::GET, url, None).await?;
Ok(())
}
pub async fn put_configs_force(&self, clash_config_path: &str) -> Result<(), String> {
let url = format!("{}/configs?force=true", self.mihomo_server);
let payload = serde_json::json!({
"path": clash_config_path,
});
let _response = self.send_request(Method::PUT, url, Some(payload)).await?;
Ok(())
}
pub async fn patch_configs(&self, config: serde_json::Value) -> Result<(), String> {
let url = format!("{}/configs", self.mihomo_server);
let response = self.send_request(Method::PATCH, url, Some(config)).await?;
if response["code"] == 204 {
Ok(())
} else {
Err(response["message"]
.as_str()
.unwrap_or("unknown error")
.to_string())
}
}
pub async fn test_proxy_delay(
&self,
name: &str,
test_url: Option<String>,
timeout: i32,
) -> Result<serde_json::Value, String> {
let test_url = test_url.unwrap_or("http://cp.cloudflare.com/generate_204".to_string());
let url = format!(
"{}/proxies/{}/delay?url={}&timeout={}",
self.mihomo_server, name, test_url, timeout
);
let response = self.send_request(Method::GET, url, None).await?;
Ok(response)
}
pub async fn get_connections(&self) -> Result<serde_json::Value, String> {
let url = format!("{}/connections", self.mihomo_server);
let response = self.send_request(Method::GET, url, None).await?;
Ok(response)
}
pub async fn delete_connection(&self, id: &str) -> Result<(), String> {
let url = format!("{}/connections/{}", self.mihomo_server, id);
let response = self.send_request(Method::DELETE, url, None).await?;
if response["code"] == 204 {
Ok(())
} else {
Err(response["message"]
.as_str()
.unwrap_or("unknown error")
.to_string())
}
}
}

View File

@ -1,29 +1,57 @@
use std::sync::{Arc, Mutex}; use async_trait::async_trait;
use reqwest::header::HeaderMap; use hyper::Method;
use serde_json::Value;
use std::{error::Error, sync::Arc};
use tokio::sync::Mutex;
pub struct MihomoData { pub struct MihomoData {
pub(crate) proxies: serde_json::Value, pub(crate) proxies: serde_json::Value,
pub(crate) providers_proxies: serde_json::Value, pub(crate) providers_proxies: serde_json::Value,
} }
#[derive(Clone)] impl Default for MihomoData {
fn default() -> Self {
Self {
proxies: Value::Null,
providers_proxies: Value::Null,
}
}
}
pub type E = Box<dyn Error + Send + Sync>;
#[async_trait]
pub trait MihomoClient: Sized {
async fn set_data_proxies(&self, data: Value);
async fn set_data_providers_proxies(&self, data: Value);
async fn get_data_proxies(&self) -> Value;
async fn get_data_providers_proxies(&self) -> Value;
// async fn generate_unix_path(&self, path: &str) -> Uri;
async fn send_request(
&self,
path: &str,
method: Method,
body: Option<Value>,
) -> Result<Value, E>;
async fn get_version(&self) -> Result<Value, E>;
async fn is_mihomo_running(&self) -> Result<(), E>;
async fn put_configs_force(&self, clash_config_path: &str) -> Result<(), E>;
async fn patch_configs(&self, config: Value) -> Result<(), E>;
async fn refresh_proxies(&self) -> Result<&Self, E>;
async fn refresh_providers_proxies(&self) -> Result<&Self, E>;
async fn get_connections(&self) -> Result<Value, E>;
async fn delete_connections(&self, id: &str) -> Result<(), E>;
async fn test_proxy_delay(
&self,
name: &str,
test_url: Option<String>,
timeout: i32,
) -> Result<Value, E>;
}
use crate::platform::Client;
pub struct MihomoManager { pub struct MihomoManager {
pub(crate) mihomo_server: String, pub(super) socket_path: String,
pub(crate) data: Arc<Mutex<MihomoData>>, pub(super) client: Arc<Mutex<Client>>,
pub(crate) headers: HeaderMap, pub(super) data: Arc<Mutex<MihomoData>>,
} }
#[cfg(feature = "debug")]
impl Drop for MihomoData {
fn drop(&mut self) {
println!("Dropping MihomoData");
}
}
#[cfg(feature = "debug")]
impl Drop for MihomoManager {
fn drop(&mut self) {
println!("Dropping MihomoManager");
}
}

View File

@ -0,0 +1,4 @@
#[cfg(unix)] pub mod unix;
#[cfg(unix)] pub use unix::UnixClient as Client;
#[cfg(windows)] pub mod windows;
#[cfg(windows)] pub use windows::WindowsClient as Client;

View File

@ -0,0 +1,59 @@
use crate::model::E;
use http_body_util::{BodyExt, Full};
use hyper::{
Method, Request,
body::Bytes,
header::{HeaderName, HeaderValue},
};
use hyper_util::client::legacy::Client;
use hyperlocal::{UnixClientExt, Uri};
use serde_json::Value;
use std::sync::Arc;
use tokio::sync::Mutex;
pub struct UnixClient {
client: Arc<Mutex<Client<hyperlocal::UnixConnector, Full<Bytes>>>>,
}
impl UnixClient {
pub fn new() -> Self {
let client: Client<_, Full<Bytes>> = Client::unix();
Self {
client: Arc::new(Mutex::new(client)),
}
}
pub async fn generate_unix_path(&self, socket_path: &str, path: &str) -> Uri {
Uri::new(socket_path, path).into()
}
pub async fn send_request(
&self,
socket_path: String,
path: &str,
method: Method,
body: Option<Value>,
) -> Result<Value, E> {
let uri = self.generate_unix_path(socket_path.as_str(), path).await;
let mut request_builder = Request::builder().method(method).uri(uri);
let body_bytes = if let Some(body) = body {
request_builder = request_builder.header(
HeaderName::from_static("Content-Type"),
HeaderValue::from_static("application/json"),
);
Bytes::from(serde_json::to_vec(&body)?)
} else {
Bytes::new()
};
let request = request_builder.body(Full::new(body_bytes))?;
let response = self.client.lock().await.request(request).await?;
let body_bytes = response.into_body().collect().await?.to_bytes();
let json_value = serde_json::from_slice(&body_bytes)?;
Ok(json_value)
}
}

View File

@ -0,0 +1,129 @@
use crate::{model::E, sock};
use hyper::Method;
use serde_json::Value;
use tokio_util::codec::{Framed, LinesCodec};
use std::{sync::Arc, time::Duration};
use tokio::{
time::timeout,
sync::Mutex,
};
use futures::{SinkExt, StreamExt};
use tokio::net::windows::named_pipe::ClientOptions;
pub struct WindowsClient {
lock: Arc<Mutex<()>>,
}
impl WindowsClient {
pub fn new() -> Self {
Self {
lock: Arc::new(Mutex::new(())),
}
}
pub async fn send_request(
&self,
socket_path: String,
path: &str,
method: Method,
body: Option<Value>,
) -> Result<Value, E> {
// Acquire lock before opening pipe
// let _guard = self.lock.lock().await;
// Attempt to open the pipe with retry logic
let mut retries = 0;
let pipe = loop {
match ClientOptions::new().open(socket_path.clone()) {
Ok(pipe) => break pipe,
Err(e) if e.raw_os_error() == Some(231) && retries < 5 => {
retries += 1;
let delay = Duration::from_millis(200 * retries);
tokio::time::sleep(delay).await;
continue;
}
Err(e) => return Err(e.into()),
}
};
// Use a scope to ensure the pipe is dropped when done
let result = async {
let mut framed = Framed::new(pipe, LinesCodec::new());
// Build request
let mut request = format!(
"{} {} HTTP/1.1\r\nHost: localhost\r\nContent-Type: application/json\r\n",
method.as_str(),
path
);
if let Some(ref json_body) = body {
let body_str = json_body.to_string();
request += &format!("Content-Length: {}\r\n\r\n{}", body_str.len(), body_str);
} else {
request += "\r\n";
}
framed.send(request).await?;
// Parse headers
let mut headers_done = false;
let mut is_chunked = false;
while let Ok(Some(Ok(line))) = timeout(Duration::from_secs(5), framed.next()).await {
if line.is_empty() {
headers_done = true;
break;
}
if line.starts_with("HTTP/1.1 4") || line.starts_with("HTTP/1.1 5") {
return Err(format!("Server error: {}", line).into());
}
if line.eq_ignore_ascii_case("Transfer-Encoding: chunked") {
is_chunked = true;
}
}
if !headers_done {
return Err("Malformed response: no headers end".into());
}
let mut response_body = String::new();
if is_chunked {
// Handle chunked encoding
loop {
// Read chunk size line
let chunk_size_line = match timeout(Duration::from_secs(5), framed.next()).await {
Ok(Some(Ok(line))) => line,
_ => break,
};
let chunk_size = match usize::from_str_radix(chunk_size_line.trim(), 16) {
Ok(0) => break, // End of chunks
Ok(_) => (), // We don't actually need the size with LinesCodec
Err(_) => return Err("Invalid chunk size".into()),
};
// Read chunk data line
if let Ok(Some(Ok(data_line))) = timeout(Duration::from_secs(5), framed.next()).await {
response_body.push_str(&data_line);
}
// Skip trailing CRLF (empty line)
let _ = framed.next().await;
}
} else {
// Handle normal content
while let Ok(Some(Ok(line))) = timeout(Duration::from_secs(5), framed.next()).await {
response_body.push_str(&line);
}
}
serde_json::from_str(&response_body).map_err(|e| e.into())
}.await;
result
}
}

View File

@ -0,0 +1,125 @@
use crate::model::E;
use async_trait::async_trait;
use hyper::Method;
use serde_json::Value;
use std::sync::Arc;
use tokio::sync::Mutex;
use crate::platform::Client;
use crate::{
MihomoData,
model::{MihomoClient, MihomoManager},
};
impl MihomoManager {
pub fn new(socket_path: String) -> Self {
let client = Client::new();
Self {
socket_path,
client: Arc::new(Mutex::new(client)),
data: Arc::new(Mutex::new(MihomoData::default())),
}
}
}
#[async_trait]
impl MihomoClient for MihomoManager {
async fn set_data_proxies(&self, data: Value) {
self.data.lock().await.proxies = data;
}
async fn set_data_providers_proxies(&self, data: Value) {
self.data.lock().await.providers_proxies = data;
}
async fn get_data_proxies(&self) -> Value {
self.data.lock().await.proxies.clone()
}
async fn get_data_providers_proxies(&self) -> Value {
self.data.lock().await.providers_proxies.clone()
}
async fn send_request(
&self,
path: &str,
method: Method,
body: Option<Value>,
) -> Result<Value, E> {
let client = self.client.lock().await;
client.send_request(self.socket_path.clone(), path, method, body).await
}
async fn get_version(&self) -> Result<Value, E> {
let data = self.send_request("/version", Method::GET, None).await?;
Ok(data)
}
async fn is_mihomo_running(&self) -> Result<(), E> {
self.get_version().await?;
Ok(())
}
async fn put_configs_force(&self, clash_config_path: &str) -> Result<(), E> {
let body = serde_json::json!({
"path": clash_config_path
});
let _ = self
.send_request("/configs?force=true", Method::PUT, Some(body))
.await?;
Ok(())
}
async fn patch_configs(&self, config: Value) -> Result<(), E> {
let _ = self
.send_request("/configs", Method::PATCH, Some(config))
.await?;
Ok(())
}
async fn refresh_proxies(&self) -> Result<&Self, E> {
let data = self.send_request("/proxies", Method::GET, None).await?;
self.set_data_proxies(data).await;
Ok(self)
}
async fn refresh_providers_proxies(&self) -> Result<&Self, E> {
let data = self
.send_request("/providers/proxies", Method::GET, None)
.await?;
self.set_data_providers_proxies(data).await;
Ok(self)
}
async fn get_connections(&self) -> Result<Value, E> {
let data = self.send_request("/connections", Method::GET, None).await?;
Ok(data)
}
async fn delete_connections(&self, id: &str) -> Result<(), E> {
let _ = self
.send_request(&format!("/connections/{}", id), Method::DELETE, None)
.await?;
Ok(())
}
async fn test_proxy_delay(
&self,
name: &str,
test_url: Option<String>,
timeout: i32,
) -> Result<Value, E> {
let test_url = test_url.unwrap_or("http://cp.cloudflare.com/generate_204".to_string());
let data = self
.send_request(
&format!(
"/proxies/{}/delay?url={}&timeout={}",
name, test_url, timeout
),
Method::GET,
None,
)
.await?;
Ok(data)
}
}

View File

@ -1,29 +1,58 @@
use mihomo_api; use dotenv::dotenv;
use reqwest::header::HeaderMap; use mihomo_api::{self, model::MihomoClient};
use std::env;
#[test] lazy_static::lazy_static! {
fn test_mihomo_manager_init() { static ref LOCAL_SOCK: String = {
let manager = mihomo_api::MihomoManager::new("url".into(), HeaderMap::new()); dotenv().ok();
assert_eq!(manager.get_proxies(), serde_json::Value::Null);
assert_eq!(manager.get_providers_proxies(), serde_json::Value::Null); env::var("LOCAL_SOCK")
.expect("LOCAL_SOCK must be set in .env or environment variables")
.trim_matches('"')
.to_string()
};
}
#[tokio::test]
async fn test_env() {
assert_eq!(LOCAL_SOCK.to_string(), LOCAL_SOCK.to_string());
}
#[tokio::test]
async fn test_mihomo_manager_init() {
let manager = mihomo_api::MihomoManager::new(LOCAL_SOCK.to_string());
let proxies = manager.get_data_proxies().await;
let providers = manager.get_data_providers_proxies().await;
assert_eq!(proxies, serde_json::Value::Null);
assert_eq!(providers, serde_json::Value::Null);
}
#[tokio::test]
async fn test_get_version() {
let manager = mihomo_api::MihomoManager::new(LOCAL_SOCK.to_string());
let version = manager.get_version().await;
assert!(version.is_ok());
if let Ok(version) = version {
assert!(!version.get("version").is_none());
}
} }
#[tokio::test] #[tokio::test]
async fn test_refresh_proxies() { async fn test_refresh_proxies() {
let manager = mihomo_api::MihomoManager::new("http://127.0.0.1:9097".into(), HeaderMap::new()); let manager = mihomo_api::MihomoManager::new(LOCAL_SOCK.to_string());
let manager = manager.refresh_proxies().await.unwrap(); let manager = manager.refresh_proxies().await.unwrap();
let proxies = manager.get_proxies(); let proxies = manager.get_data_proxies().await;
let providers = manager.get_providers_proxies(); let providers = manager.get_data_providers_proxies().await;
assert_ne!(proxies, serde_json::Value::Null); assert_ne!(proxies, serde_json::Value::Null);
assert_eq!(providers, serde_json::Value::Null); assert_eq!(providers, serde_json::Value::Null);
} }
#[tokio::test] #[tokio::test]
async fn test_refresh_providers_proxies() { async fn test_refresh_providers_proxies() {
let manager = mihomo_api::MihomoManager::new("http://127.0.0.1:9097".into(), HeaderMap::new()); let manager = mihomo_api::MihomoManager::new(LOCAL_SOCK.to_string());
let manager = manager.refresh_providers_proxies().await.unwrap(); let manager = manager.refresh_providers_proxies().await.unwrap();
let proxies = manager.get_proxies(); let proxies = manager.get_data_proxies().await;
let providers = manager.get_providers_proxies(); let providers = manager.get_data_providers_proxies().await;
assert_eq!(proxies, serde_json::Value::Null); assert_eq!(proxies, serde_json::Value::Null);
assert_ne!(providers, serde_json::Value::Null); assert_ne!(providers, serde_json::Value::Null);
} }