From c940aea1531e454e1afe365108bb11545de807d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E6=B1=A0?= Date: Wed, 21 Feb 2024 15:17:53 +0800 Subject: [PATCH] `Shamrock`: Reusable and restrictive coroutine context MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 白池 --- .../fuqiuluo/qqinterface/servlet/BaseSvc.kt | 17 ++++++--- .../remote/service/WebSocketClientService.kt | 19 +++++----- .../remote/service/WebSocketService.kt | 25 ++++++------- .../service/api/WebSocketClientServlet.kt | 29 ++++++++------- .../service/api/WebSocketTransmitServlet.kt | 35 +++++++++++-------- .../fuqiuluo/shamrock/utils/DownloadUtils.kt | 3 +- .../xposed/hooks/InitRemoteService.kt | 14 +++++--- 7 files changed, 79 insertions(+), 63 deletions(-) diff --git a/xposed/src/main/java/moe/fuqiuluo/qqinterface/servlet/BaseSvc.kt b/xposed/src/main/java/moe/fuqiuluo/qqinterface/servlet/BaseSvc.kt index 3e35eaf..53f9167 100644 --- a/xposed/src/main/java/moe/fuqiuluo/qqinterface/servlet/BaseSvc.kt +++ b/xposed/src/main/java/moe/fuqiuluo/qqinterface/servlet/BaseSvc.kt @@ -11,9 +11,10 @@ import io.ktor.utils.io.core.BytePacketBuilder import io.ktor.utils.io.core.readBytes import io.ktor.utils.io.core.writeFully import io.ktor.utils.io.core.writeInt +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.launch import kotlinx.coroutines.suspendCancellableCoroutine import kotlinx.coroutines.withTimeoutOrNull @@ -28,10 +29,11 @@ import moe.fuqiuluo.shamrock.xposed.helper.internal.IPCRequest import protobuf.oidb.TrpcOidb import mqq.app.MobileQQ import tencent.im.oidb.oidb_sso +import kotlin.coroutines.CoroutineContext import kotlin.coroutines.resume internal abstract class BaseSvc { - companion object { + companion object Default: CoroutineScope { val currentUin: String get() = app.currentAccountUin @@ -46,7 +48,7 @@ internal abstract class BaseSvc { val seq = MsfCore.getNextSeq() val buffer = withTimeoutOrNull(timeout) { suspendCancellableCoroutine { continuation -> - GlobalScope.launch(Dispatchers.Default) { + launch(Dispatchers.Default) { DynamicReceiver.register(IPCRequest(cmd, seq) { val buffer = it.getByteArrayExtra("buffer")!! continuation.resume(buffer) @@ -75,7 +77,7 @@ internal abstract class BaseSvc { val seq = MsfCore.getNextSeq() val buffer = withTimeoutOrNull(timeout) { suspendCancellableCoroutine { continuation -> - GlobalScope.launch(Dispatchers.Default) { + launch(Dispatchers.Default) { DynamicReceiver.register(IPCRequest(cmd, seq) { val buffer = it.getByteArrayExtra("buffer")!! continuation.resume(buffer) @@ -143,6 +145,11 @@ internal abstract class BaseSvc { toServiceMsg.addAttribute("shamrock_seq", seq) app.sendToService(toServiceMsg) } + + @OptIn(ExperimentalCoroutinesApi::class) + override val coroutineContext: CoroutineContext by lazy { + Dispatchers.IO.limitedParallelism(12) + } } protected fun send(toServiceMsg: ToServiceMsg) { @@ -153,7 +160,7 @@ internal abstract class BaseSvc { val seq = MsfCore.getNextSeq() val buffer = withTimeoutOrNull(timeout) { suspendCancellableCoroutine { continuation -> - GlobalScope.launch(Dispatchers.Default) { + launch(Dispatchers.Default) { DynamicReceiver.register(IPCRequest(toServiceMsg.serviceCmd, seq) { val buffer = it.getByteArrayExtra("buffer")!! continuation.resume(buffer) diff --git a/xposed/src/main/java/moe/fuqiuluo/shamrock/remote/service/WebSocketClientService.kt b/xposed/src/main/java/moe/fuqiuluo/shamrock/remote/service/WebSocketClientService.kt index c191502..26f1f7c 100644 --- a/xposed/src/main/java/moe/fuqiuluo/shamrock/remote/service/WebSocketClientService.kt +++ b/xposed/src/main/java/moe/fuqiuluo/shamrock/remote/service/WebSocketClientService.kt @@ -1,15 +1,14 @@ -@file:OptIn(DelicateCoroutinesApi::class) - package moe.fuqiuluo.shamrock.remote.service import kotlinx.coroutines.DelicateCoroutinesApi -import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.Job import kotlinx.coroutines.launch import moe.fuqiuluo.shamrock.remote.service.api.WebSocketClientServlet import moe.fuqiuluo.shamrock.helper.Level import moe.fuqiuluo.shamrock.helper.LogCenter -import moe.fuqiuluo.shamrock.remote.service.api.GlobalEventTransmitter +import moe.fuqiuluo.shamrock.remote.service.api.GlobalEventTransmitter.onMessageEvent +import moe.fuqiuluo.shamrock.remote.service.api.GlobalEventTransmitter.onNoticeEvent +import moe.fuqiuluo.shamrock.remote.service.api.GlobalEventTransmitter.onRequestEvent internal class WebSocketClientService( override val address: String, @@ -27,18 +26,18 @@ internal class WebSocketClientService( } override fun init() { - subscribe(GlobalScope.launch { - GlobalEventTransmitter.onMessageEvent { (_, event) -> + subscribe(launch { + onMessageEvent { (_, event) -> pushTo(event) } }) - subscribe(GlobalScope.launch { - GlobalEventTransmitter.onNoticeEvent { event -> + subscribe(launch { + onNoticeEvent { event -> pushTo(event) } }) - subscribe(GlobalScope.launch { - GlobalEventTransmitter.onRequestEvent { event -> + subscribe(launch { + onRequestEvent { event -> pushTo(event) } }) diff --git a/xposed/src/main/java/moe/fuqiuluo/shamrock/remote/service/WebSocketService.kt b/xposed/src/main/java/moe/fuqiuluo/shamrock/remote/service/WebSocketService.kt index 1e0dcfb..b6cd734 100644 --- a/xposed/src/main/java/moe/fuqiuluo/shamrock/remote/service/WebSocketService.kt +++ b/xposed/src/main/java/moe/fuqiuluo/shamrock/remote/service/WebSocketService.kt @@ -3,7 +3,6 @@ package moe.fuqiuluo.shamrock.remote.service import kotlinx.coroutines.DelicateCoroutinesApi -import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.Job import kotlinx.coroutines.launch import moe.fuqiuluo.shamrock.helper.ErrorTokenException @@ -15,7 +14,9 @@ import moe.fuqiuluo.shamrock.remote.service.data.push.* import moe.fuqiuluo.shamrock.tools.ifNullOrEmpty import moe.fuqiuluo.shamrock.helper.Level import moe.fuqiuluo.shamrock.helper.LogCenter -import moe.fuqiuluo.shamrock.remote.service.api.GlobalEventTransmitter +import moe.fuqiuluo.shamrock.remote.service.api.GlobalEventTransmitter.onMessageEvent +import moe.fuqiuluo.shamrock.remote.service.api.GlobalEventTransmitter.onNoticeEvent +import moe.fuqiuluo.shamrock.remote.service.api.GlobalEventTransmitter.onRequestEvent import moe.fuqiuluo.shamrock.xposed.helper.AppRuntimeFetcher import org.java_websocket.WebSocket import org.java_websocket.handshake.ClientHandshake @@ -33,20 +34,14 @@ internal class WebSocketService( } override fun init() { - subscribe(GlobalScope.launch { - GlobalEventTransmitter.onMessageEvent { (_, event) -> - pushTo(event) - } + subscribe(launch { + onMessageEvent { (_, event) -> pushTo(event) } }) - subscribe(GlobalScope.launch { - GlobalEventTransmitter.onNoticeEvent { event -> - pushTo(event) - } + subscribe(launch { + onNoticeEvent { event -> pushTo(event) } }) - subscribe(GlobalScope.launch { - GlobalEventTransmitter.onRequestEvent { event -> - pushTo(event) - } + subscribe(launch { + onRequestEvent { event -> pushTo(event) } }) LogCenter.log("WebSocketService: 初始化服务", Level.WARN) } @@ -86,7 +81,7 @@ internal class WebSocketService( } private fun pushMetaLifecycle() { - GlobalScope.launch { + launch { val runtime = AppRuntimeFetcher.appRuntime pushTo(PushMetaEvent( time = System.currentTimeMillis() / 1000, diff --git a/xposed/src/main/java/moe/fuqiuluo/shamrock/remote/service/api/WebSocketClientServlet.kt b/xposed/src/main/java/moe/fuqiuluo/shamrock/remote/service/api/WebSocketClientServlet.kt index 09ca074..c7d90d5 100644 --- a/xposed/src/main/java/moe/fuqiuluo/shamrock/remote/service/api/WebSocketClientServlet.kt +++ b/xposed/src/main/java/moe/fuqiuluo/shamrock/remote/service/api/WebSocketClientServlet.kt @@ -2,11 +2,12 @@ package moe.fuqiuluo.shamrock.remote.service.api +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.DelicateCoroutinesApi -import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.cancel import kotlinx.coroutines.launch -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock import kotlinx.serialization.encodeToString import kotlinx.serialization.json.Json import moe.fuqiuluo.shamrock.remote.action.ActionManager @@ -30,12 +31,13 @@ import org.java_websocket.handshake.ServerHandshake import java.lang.Exception import java.net.URI import kotlin.concurrent.timer +import kotlin.coroutines.CoroutineContext internal abstract class WebSocketClientServlet( private val url: String, private val heartbeatInterval: Long, private val wsHeaders: Map -) : BaseTransmitServlet, WebSocketClient(URI(url), wsHeaders) { +) : BaseTransmitServlet, WebSocketClient(URI(url), wsHeaders), CoroutineScope { init { if (connectedClients.containsKey(url)) { throw RuntimeException("WebSocketClient已存在: $url") @@ -43,14 +45,13 @@ internal abstract class WebSocketClientServlet( } private var firstOpen = true - private val sendLock = Mutex() override fun transmitAccess(): Boolean { return ShamrockConfig.openWebSocketClient() } override fun onMessage(message: String) { - GlobalScope.launch { + launch { handleMessage(message) } } @@ -85,7 +86,6 @@ internal abstract class WebSocketClientServlet( connectedClients[url] = this - //startHeartbeatTimer() pushMetaLifecycle() if (firstOpen) { firstOpen = false @@ -106,21 +106,21 @@ internal abstract class WebSocketClientServlet( } LogCenter.log("WebSocketClient onClose: $code, $reason, $remote") unsubscribe() + coroutineContext.cancel() connectedClients.remove(url) } override fun onError(ex: Exception?) { LogCenter.log("WebSocketClient onError: ${ex?.message}") unsubscribe() + coroutineContext.cancel() connectedClients.remove(url) } protected suspend inline fun pushTo(body: T) { if (!transmitAccess() || isClosed || isClosing) return try { - sendLock.withLock { - send(GlobalJson.encodeToString(body)) - } + send(GlobalJson.encodeToString(body)) } catch (e: Throwable) { LogCenter.log("被动WS推送失败: ${e.stackTraceToString()}", Level.ERROR) } @@ -142,8 +142,7 @@ internal abstract class WebSocketClientServlet( } val runtime = AppRuntimeFetcher.appRuntime LogCenter.log("WebSocketClient心跳: ${app.longAccountUin}", Level.DEBUG) - send( - GlobalJson.encodeToString( + send(GlobalJson.encodeToString( PushMetaEvent( time = System.currentTimeMillis() / 1000, selfId = app.longAccountUin, @@ -164,7 +163,7 @@ internal abstract class WebSocketClientServlet( } private fun pushMetaLifecycle() { - GlobalScope.launch { + launch { val runtime = AppRuntimeFetcher.appRuntime val curUin = runtime.currentAccountUin pushTo( @@ -183,6 +182,10 @@ internal abstract class WebSocketClientServlet( } } + @OptIn(ExperimentalCoroutinesApi::class) + override val coroutineContext: CoroutineContext = + Dispatchers.IO.limitedParallelism(20) + companion object { private val connectedClients = mutableMapOf() } diff --git a/xposed/src/main/java/moe/fuqiuluo/shamrock/remote/service/api/WebSocketTransmitServlet.kt b/xposed/src/main/java/moe/fuqiuluo/shamrock/remote/service/api/WebSocketTransmitServlet.kt index 80d8ee7..1fb98d0 100644 --- a/xposed/src/main/java/moe/fuqiuluo/shamrock/remote/service/api/WebSocketTransmitServlet.kt +++ b/xposed/src/main/java/moe/fuqiuluo/shamrock/remote/service/api/WebSocketTransmitServlet.kt @@ -1,12 +1,13 @@ -@file:OptIn(DelicateCoroutinesApi::class) +@file:OptIn(DelicateCoroutinesApi::class, ExperimentalCoroutinesApi::class) package moe.fuqiuluo.shamrock.remote.service.api +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.DelicateCoroutinesApi -import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.cancel import kotlinx.coroutines.launch -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock import kotlinx.serialization.encodeToString import kotlinx.serialization.json.Json import moe.fuqiuluo.shamrock.remote.action.ActionManager @@ -31,22 +32,23 @@ import org.java_websocket.server.WebSocketServer import java.net.InetSocketAddress import java.net.URI import java.util.Collections +import java.util.Timer import kotlin.concurrent.timer +import kotlin.coroutines.CoroutineContext internal abstract class WebSocketTransmitServlet( host:String, port: Int, protected val heartbeatInterval: Long, -) : BaseTransmitServlet, WebSocketServer(InetSocketAddress(host, port)) { - private val sendLock = Mutex() +) : BaseTransmitServlet, WebSocketServer(InetSocketAddress(host, port)), CoroutineScope { + private lateinit var heartbeatTask: Timer protected val eventReceivers: MutableList = Collections.synchronizedList(mutableListOf()) init { connectionLostTimeout = 0 } - override val address: String - get() = "-" + override val address: String = "-" override fun transmitAccess(): Boolean { return ShamrockConfig.openWebSocket() @@ -62,7 +64,7 @@ internal abstract class WebSocketTransmitServlet( init { if (heartbeatInterval > 0) { - timer("heartbeat", true, 0, heartbeatInterval) { + heartbeatTask = timer("heartbeat", true, 0, heartbeatInterval) { val runtime = AppRuntimeFetcher.appRuntime val curUin = runtime.currentAccountUin LogCenter.log("WebSocket心跳: $curUin", Level.DEBUG) @@ -104,7 +106,7 @@ internal abstract class WebSocketTransmitServlet( override fun onMessage(conn: WebSocket, message: String) { val path = URI.create(conn.resourceDescriptor).path - GlobalScope.launch { + launch { onHandleAction(conn, message, path) } } @@ -130,6 +132,10 @@ internal abstract class WebSocketTransmitServlet( override fun onError(conn: WebSocket, ex: Exception?) { LogCenter.log("WSServer Error: " + ex?.stackTraceToString(), Level.ERROR) unsubscribe() + coroutineContext.cancel() + if (::heartbeatTask.isInitialized) { + heartbeatTask.cancel() + } } override fun onStart() { @@ -137,14 +143,15 @@ internal abstract class WebSocketTransmitServlet( init() } - protected suspend inline fun pushTo(body: T) { + protected inline fun pushTo(body: T) { if(!transmitAccess()) return try { - sendLock.withLock { - broadcastTextEvent(GlobalJson.encodeToString(body)) - } + broadcastTextEvent(GlobalJson.encodeToString(body)) } catch (e: Throwable) { LogCenter.log("WS推送失败: ${e.stackTraceToString()}", Level.ERROR) } } + + override val coroutineContext: CoroutineContext = + Dispatchers.IO.limitedParallelism(40) } \ No newline at end of file diff --git a/xposed/src/main/java/moe/fuqiuluo/shamrock/utils/DownloadUtils.kt b/xposed/src/main/java/moe/fuqiuluo/shamrock/utils/DownloadUtils.kt index f913ecb..1cdaf5a 100644 --- a/xposed/src/main/java/moe/fuqiuluo/shamrock/utils/DownloadUtils.kt +++ b/xposed/src/main/java/moe/fuqiuluo/shamrock/utils/DownloadUtils.kt @@ -22,6 +22,7 @@ import java.io.RandomAccessFile import java.net.HttpURLConnection import java.net.URL import kotlin.math.roundToInt +import kotlin.time.Duration.Companion.minutes object DownloadUtils { private const val MAX_THREAD = 4 @@ -71,7 +72,7 @@ object DownloadUtils { } processed += blockSize } - withTimeoutOrNull(60000L) { + withTimeoutOrNull(1.minutes) { while (progress.value < contentLength) { if(progress.addAndGet(channel.receive()) >= contentLength) { break diff --git a/xposed/src/main/java/moe/fuqiuluo/shamrock/xposed/hooks/InitRemoteService.kt b/xposed/src/main/java/moe/fuqiuluo/shamrock/xposed/hooks/InitRemoteService.kt index 54d09d9..3de2ca2 100644 --- a/xposed/src/main/java/moe/fuqiuluo/shamrock/xposed/hooks/InitRemoteService.kt +++ b/xposed/src/main/java/moe/fuqiuluo/shamrock/xposed/hooks/InitRemoteService.kt @@ -5,6 +5,7 @@ package moe.fuqiuluo.shamrock.xposed.hooks import android.content.Context import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.delay import kotlinx.coroutines.launch import moe.fuqiuluo.shamrock.remote.service.WebSocketClientService import moe.fuqiuluo.shamrock.remote.service.WebSocketService @@ -20,12 +21,12 @@ import moe.fuqiuluo.symbols.Process import moe.fuqiuluo.symbols.XposedHook import mqq.app.MobileQQ import kotlin.concurrent.timer +import kotlin.time.Duration +import kotlin.time.Duration.Companion.seconds @XposedHook(Process.MAIN, priority = 10) internal class InitRemoteService : IAction { override fun invoke(ctx: Context) { - //if (!PlatformUtils.isMainProcess()) return - GlobalScope.launch { try { HTTPServer.start(ShamrockConfig.getPort()) @@ -109,9 +110,12 @@ internal class InitRemoteService : IAction { if (url.startsWith("ws://") || url.startsWith("wss://")) { val wsClient = WebSocketClientService(url, interval, wsHeaders) wsClient.connect() - timer(initialDelay = 5000L, period = 5000L) { - if (wsClient.isClosed || wsClient.isClosing) { - wsClient.reconnect() + wsClient.launch { + while (true) { + delay(5.seconds) + if (wsClient.isClosed || wsClient.isClosing) { + wsClient.reconnect() + } } } } else {