Shamrock: typo BaseTransmitServlet.kt

Signed-off-by: 白池 <whitechi73@outlook.com>
This commit is contained in:
白池 2024-02-19 11:30:22 +08:00
parent 1c65aab673
commit c70f3eabfe
8 changed files with 45 additions and 51 deletions

View File

@ -26,34 +26,33 @@ import moe.fuqiuluo.shamrock.remote.action.handlers.QuickOperation.quicklyReply
import moe.fuqiuluo.shamrock.remote.service.api.GlobalEventTransmitter import moe.fuqiuluo.shamrock.remote.service.api.GlobalEventTransmitter
internal object HttpService: HttpTransmitServlet() { internal object HttpService: HttpTransmitServlet() {
private val jobList = arrayListOf<Job>() private val subscribes = arrayListOf<Job>()
override fun submitFlowJob(job: Job) { override fun subscribe(job: Job) {
// HTTP 回调不会触发断连无需释放之前的JOB subscribes.add(job)
jobList.add(job)
} }
override fun cancelFlowJobs() { override fun unsubscribe() {
jobList.removeIf { subscribes.removeIf {
it.cancel() it.cancel()
return@removeIf true return@removeIf true
} }
} }
override fun initTransmitter() { override fun init() {
if (jobList.isNotEmpty()) return if (subscribes.isNotEmpty()) return
submitFlowJob(GlobalScope.launch { subscribe(GlobalScope.launch {
GlobalEventTransmitter.onMessageEvent { (record, event) -> GlobalEventTransmitter.onMessageEvent { (record, event) ->
val respond = pushTo(event) ?: return@onMessageEvent val respond = pushTo(event) ?: return@onMessageEvent
handleQuicklyReply(record, event.messageId, respond.bodyAsText()) handleQuicklyReply(record, event.messageId, respond.bodyAsText())
} }
}) })
submitFlowJob(GlobalScope.launch { subscribe(GlobalScope.launch {
GlobalEventTransmitter.onNoticeEvent { event -> GlobalEventTransmitter.onNoticeEvent { event ->
pushTo(event) pushTo(event)
} }
}) })
submitFlowJob(GlobalScope.launch { subscribe(GlobalScope.launch {
GlobalEventTransmitter.onRequestEvent { GlobalEventTransmitter.onRequestEvent {
pushTo(it) pushTo(it)
} }

View File

@ -16,28 +16,28 @@ internal class WebSocketClientService(
heartbeatInterval: Long, heartbeatInterval: Long,
wsHeaders: Map<String, String> wsHeaders: Map<String, String>
) : WebSocketClientServlet(address, heartbeatInterval, wsHeaders) { ) : WebSocketClientServlet(address, heartbeatInterval, wsHeaders) {
private val eventJobList = mutableSetOf<Job>() private val subscribes = mutableSetOf<Job>()
init { init {
startHeartbeatTimer() startHeartbeatTimer()
} }
override fun submitFlowJob(job: Job) { override fun subscribe(job: Job) {
eventJobList.add(job) subscribes.add(job)
} }
override fun initTransmitter() { override fun init() {
submitFlowJob(GlobalScope.launch { subscribe(GlobalScope.launch {
GlobalEventTransmitter.onMessageEvent { (_, event) -> GlobalEventTransmitter.onMessageEvent { (_, event) ->
pushTo(event) pushTo(event)
} }
}) })
submitFlowJob(GlobalScope.launch { subscribe(GlobalScope.launch {
GlobalEventTransmitter.onNoticeEvent { event -> GlobalEventTransmitter.onNoticeEvent { event ->
pushTo(event) pushTo(event)
} }
}) })
submitFlowJob(GlobalScope.launch { subscribe(GlobalScope.launch {
GlobalEventTransmitter.onRequestEvent { event -> GlobalEventTransmitter.onRequestEvent { event ->
pushTo(event) pushTo(event)
} }
@ -45,8 +45,8 @@ internal class WebSocketClientService(
LogCenter.log("WebSocketClientService: 初始化服务", Level.WARN) LogCenter.log("WebSocketClientService: 初始化服务", Level.WARN)
} }
override fun cancelFlowJobs() { override fun unsubscribe() {
eventJobList.removeIf { job -> subscribes.removeIf { job ->
job.cancel() job.cancel()
return@removeIf true return@removeIf true
} }

View File

@ -26,24 +26,24 @@ internal class WebSocketService(
port: Int, port: Int,
heartbeatInterval: Long, heartbeatInterval: Long,
): WebSocketTransmitServlet(host, port, heartbeatInterval) { ): WebSocketTransmitServlet(host, port, heartbeatInterval) {
private val eventJobList = mutableSetOf<Job>() private val subscribes = mutableSetOf<Job>()
override fun submitFlowJob(job: Job) { override fun subscribe(job: Job) {
eventJobList.add(job) subscribes.add(job)
} }
override fun initTransmitter() { override fun init() {
submitFlowJob(GlobalScope.launch { subscribe(GlobalScope.launch {
GlobalEventTransmitter.onMessageEvent { (_, event) -> GlobalEventTransmitter.onMessageEvent { (_, event) ->
pushTo(event) pushTo(event)
} }
}) })
submitFlowJob(GlobalScope.launch { subscribe(GlobalScope.launch {
GlobalEventTransmitter.onNoticeEvent { event -> GlobalEventTransmitter.onNoticeEvent { event ->
pushTo(event) pushTo(event)
} }
}) })
submitFlowJob(GlobalScope.launch { subscribe(GlobalScope.launch {
GlobalEventTransmitter.onRequestEvent { event -> GlobalEventTransmitter.onRequestEvent { event ->
pushTo(event) pushTo(event)
} }
@ -51,8 +51,8 @@ internal class WebSocketService(
LogCenter.log("WebSocketService: 初始化服务", Level.WARN) LogCenter.log("WebSocketService: 初始化服务", Level.WARN)
} }
override fun cancelFlowJobs() { override fun unsubscribe() {
eventJobList.removeIf { job -> subscribes.removeIf { job ->
job.cancel() job.cancel()
return@removeIf true return@removeIf true
} }

View File

@ -1,24 +1,20 @@
package moe.fuqiuluo.shamrock.remote.service.api package moe.fuqiuluo.shamrock.remote.service.api
import com.tencent.mobileqq.app.QQAppInterface import com.tencent.mobileqq.app.QQAppInterface
import com.tencent.qqnt.kernel.nativeinterface.MsgElement
import com.tencent.qqnt.kernel.nativeinterface.MsgRecord
import kotlinx.coroutines.Job import kotlinx.coroutines.Job
import moe.fuqiuluo.shamrock.remote.service.data.push.NoticeSubType
import moe.fuqiuluo.shamrock.remote.service.data.push.NoticeType
import moe.fuqiuluo.shamrock.xposed.helper.AppRuntimeFetcher import moe.fuqiuluo.shamrock.xposed.helper.AppRuntimeFetcher
import oicq.wlogin_sdk.tools.MD5 import oicq.wlogin_sdk.tools.MD5
internal interface BaseTransmitServlet { internal interface BaseTransmitServlet {
val address: String val address: String
fun allowTransmit(): Boolean fun transmitAccess(): Boolean
fun submitFlowJob(job: Job) fun subscribe(job: Job)
fun cancelFlowJobs() fun unsubscribe()
fun initTransmitter() fun init()
val app: QQAppInterface val app: QQAppInterface
get() = AppRuntimeFetcher.appRuntime as QQAppInterface get() = AppRuntimeFetcher.appRuntime as QQAppInterface

View File

@ -8,7 +8,6 @@ import io.ktor.client.request.setBody
import io.ktor.client.statement.HttpResponse import io.ktor.client.statement.HttpResponse
import io.ktor.http.ContentType import io.ktor.http.ContentType
import io.ktor.http.contentType import io.ktor.http.contentType
import kotlinx.coroutines.Job
import moe.fuqiuluo.shamrock.remote.service.config.ShamrockConfig import moe.fuqiuluo.shamrock.remote.service.config.ShamrockConfig
import moe.fuqiuluo.shamrock.tools.GlobalClient import moe.fuqiuluo.shamrock.tools.GlobalClient
import moe.fuqiuluo.shamrock.helper.Level import moe.fuqiuluo.shamrock.helper.Level
@ -22,12 +21,12 @@ import java.net.SocketException
internal abstract class HttpTransmitServlet : BaseTransmitServlet { internal abstract class HttpTransmitServlet : BaseTransmitServlet {
override val address: String by lazy { ShamrockConfig.getWebHookAddress() } override val address: String by lazy { ShamrockConfig.getWebHookAddress() }
override fun allowTransmit(): Boolean { override fun transmitAccess(): Boolean {
return ShamrockConfig.allowWebHook() return ShamrockConfig.allowWebHook()
} }
protected suspend inline fun <reified T> pushTo(body: T): HttpResponse? { protected suspend inline fun <reified T> pushTo(body: T): HttpResponse? {
if (!allowTransmit()) return null if (!transmitAccess()) return null
try { try {
if (address.startsWith("http://") || address.startsWith("https://")) { if (address.startsWith("http://") || address.startsWith("https://")) {
val response = GlobalClient.post(address) { val response = GlobalClient.post(address) {

View File

@ -45,7 +45,7 @@ internal abstract class WebSocketClientServlet(
private var firstOpen = true private var firstOpen = true
private val sendLock = Mutex() private val sendLock = Mutex()
override fun allowTransmit(): Boolean { override fun transmitAccess(): Boolean {
return ShamrockConfig.openWebSocketClient() return ShamrockConfig.openWebSocketClient()
} }
@ -90,9 +90,9 @@ internal abstract class WebSocketClientServlet(
if (firstOpen) { if (firstOpen) {
firstOpen = false firstOpen = false
} else { } else {
cancelFlowJobs() unsubscribe()
} }
initTransmitter() init()
} }
override fun onClose(code: Int, reason: String?, remote: Boolean) { override fun onClose(code: Int, reason: String?, remote: Boolean) {
@ -105,18 +105,18 @@ internal abstract class WebSocketClientServlet(
} }
} }
LogCenter.log("WebSocketClient onClose: $code, $reason, $remote") LogCenter.log("WebSocketClient onClose: $code, $reason, $remote")
cancelFlowJobs() unsubscribe()
connectedClients.remove(url) connectedClients.remove(url)
} }
override fun onError(ex: Exception?) { override fun onError(ex: Exception?) {
LogCenter.log("WebSocketClient onError: ${ex?.message}") LogCenter.log("WebSocketClient onError: ${ex?.message}")
cancelFlowJobs() unsubscribe()
connectedClients.remove(url) connectedClients.remove(url)
} }
protected suspend inline fun <reified T> pushTo(body: T) { protected suspend inline fun <reified T> pushTo(body: T) {
if (!allowTransmit() || isClosed || isClosing) return if (!transmitAccess() || isClosed || isClosing) return
try { try {
sendLock.withLock { sendLock.withLock {
send(GlobalJson.encodeToString(body)) send(GlobalJson.encodeToString(body))

View File

@ -48,7 +48,7 @@ internal abstract class WebSocketTransmitServlet(
override val address: String override val address: String
get() = "-" get() = "-"
override fun allowTransmit(): Boolean { override fun transmitAccess(): Boolean {
return ShamrockConfig.openWebSocket() return ShamrockConfig.openWebSocket()
} }
@ -129,16 +129,16 @@ internal abstract class WebSocketTransmitServlet(
override fun onError(conn: WebSocket, ex: Exception?) { override fun onError(conn: WebSocket, ex: Exception?) {
LogCenter.log("WSServer Error: " + ex?.stackTraceToString(), Level.ERROR) LogCenter.log("WSServer Error: " + ex?.stackTraceToString(), Level.ERROR)
cancelFlowJobs() unsubscribe()
} }
override fun onStart() { override fun onStart() {
LogCenter.log("WSServer start running on ws://${getAddress()}!") LogCenter.log("WSServer start running on ws://${getAddress()}!")
initTransmitter() init()
} }
protected suspend inline fun <reified T> pushTo(body: T) { protected suspend inline fun <reified T> pushTo(body: T) {
if(!allowTransmit()) return if(!transmitAccess()) return
try { try {
sendLock.withLock { sendLock.withLock {
broadcastTextEvent(GlobalJson.encodeToString(body)) broadcastTextEvent(GlobalJson.encodeToString(body))

View File

@ -38,7 +38,7 @@ internal class InitRemoteService : IAction {
if (ShamrockConfig.allowWebHook()) { if (ShamrockConfig.allowWebHook()) {
HttpService.initTransmitter() HttpService.init()
} }
val runtime = AppRuntimeFetcher.appRuntime val runtime = AppRuntimeFetcher.appRuntime