ShamrockPrivate: 修改WebSocket发信逻辑

Signed-off-by: WhiteChi <whitechi73@outlook.com>
This commit is contained in:
WhiteChi 2023-11-02 08:22:59 +08:00
parent b76ef7efb3
commit dc2503b045
4 changed files with 17 additions and 15 deletions

View File

@ -2,19 +2,12 @@
package moe.fuqiuluo.shamrock.remote.service package moe.fuqiuluo.shamrock.remote.service
import com.tencent.qqnt.kernel.nativeinterface.MsgElement
import com.tencent.qqnt.kernel.nativeinterface.MsgRecord
import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job import kotlinx.coroutines.Job
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import moe.fuqiuluo.shamrock.remote.service.api.WebSocketClientServlet import moe.fuqiuluo.shamrock.remote.service.api.WebSocketClientServlet
import moe.fuqiuluo.shamrock.remote.service.config.ShamrockConfig
import moe.fuqiuluo.shamrock.remote.service.data.push.* import moe.fuqiuluo.shamrock.remote.service.data.push.*
import moe.fuqiuluo.shamrock.tools.json
import moe.fuqiuluo.qqinterface.servlet.GroupSvc
import moe.fuqiuluo.qqinterface.servlet.TicketSvc
import moe.fuqiuluo.qqinterface.servlet.msg.convert.toSegments
import moe.fuqiuluo.shamrock.helper.Level import moe.fuqiuluo.shamrock.helper.Level
import moe.fuqiuluo.shamrock.helper.LogCenter import moe.fuqiuluo.shamrock.helper.LogCenter
import moe.fuqiuluo.shamrock.remote.service.api.GlobalEventTransmitter import moe.fuqiuluo.shamrock.remote.service.api.GlobalEventTransmitter

View File

@ -7,6 +7,8 @@ import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job import kotlinx.coroutines.Job
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.serialization.encodeToString import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json import kotlinx.serialization.json.Json
import moe.fuqiuluo.shamrock.remote.action.ActionManager import moe.fuqiuluo.shamrock.remote.action.ActionManager
@ -36,6 +38,8 @@ internal abstract class WebSocketClientServlet(
url: String, url: String,
wsHeaders: Map<String, String> wsHeaders: Map<String, String>
) : BaseTransmitServlet, WebSocketClient(URI(url), wsHeaders) { ) : BaseTransmitServlet, WebSocketClient(URI(url), wsHeaders) {
private val sendLock = Mutex()
override fun allowTransmit(): Boolean { override fun allowTransmit(): Boolean {
return ShamrockConfig.openWebSocketClient() return ShamrockConfig.openWebSocketClient()
} }
@ -89,10 +93,12 @@ internal abstract class WebSocketClientServlet(
cancelFlowJobs() cancelFlowJobs()
} }
protected inline fun <reified T> pushTo(body: T) { protected suspend inline fun <reified T> pushTo(body: T) {
if (!allowTransmit() || isClosed || isClosing) return if (!allowTransmit() || isClosed || isClosing) return
try { try {
send(GlobalJson.encodeToString(body)) sendLock.withLock {
send(GlobalJson.encodeToString(body))
}
} catch (e: Throwable) { } catch (e: Throwable) {
LogCenter.log("被动WS推送失败: ${e.stackTraceToString()}", Level.ERROR) LogCenter.log("被动WS推送失败: ${e.stackTraceToString()}", Level.ERROR)
} }

View File

@ -5,6 +5,8 @@ package moe.fuqiuluo.shamrock.remote.service.api
import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.serialization.encodeToString import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json import kotlinx.serialization.json.Json
import moe.fuqiuluo.shamrock.remote.action.ActionManager import moe.fuqiuluo.shamrock.remote.action.ActionManager
@ -35,6 +37,7 @@ internal abstract class WebSocketTransmitServlet(
host:String, host:String,
port: Int port: Int
) : BaseTransmitServlet, WebSocketServer(InetSocketAddress(host, port)) { ) : BaseTransmitServlet, WebSocketServer(InetSocketAddress(host, port)) {
private val sendLock = Mutex()
protected val eventReceivers: MutableList<WebSocket> = Collections.synchronizedList(mutableListOf<WebSocket>()) protected val eventReceivers: MutableList<WebSocket> = Collections.synchronizedList(mutableListOf<WebSocket>())
override val address: String override val address: String
@ -113,10 +116,12 @@ internal abstract class WebSocketTransmitServlet(
initTransmitter() initTransmitter()
} }
protected inline fun <reified T> pushTo(body: T) { protected suspend inline fun <reified T> pushTo(body: T) {
if(!allowTransmit()) return if(!allowTransmit()) return
try { try {
broadcastTextEvent(GlobalJson.encodeToString(body)) sendLock.withLock {
broadcastTextEvent(GlobalJson.encodeToString(body))
}
} catch (e: Throwable) { } catch (e: Throwable) {
LogCenter.log("WS推送失败: ${e.stackTraceToString()}", Level.ERROR) LogCenter.log("WS推送失败: ${e.stackTraceToString()}", Level.ERROR)
} }

View File

@ -92,13 +92,11 @@ internal class InitRemoteService : IAction {
GlobalScope.launch { GlobalScope.launch {
try { try {
if (url.startsWith("ws://") || url.startsWith("wss://")) { if (url.startsWith("ws://") || url.startsWith("wss://")) {
var wsClient = WebSocketClientService(url, wsHeaders) val wsClient = WebSocketClientService(url, wsHeaders)
wsClient.connect() wsClient.connect()
timer(initialDelay = 5000L, period = 5000L) { timer(initialDelay = 5000L, period = 5000L) {
if (wsClient.isClosed || wsClient.isClosing) { if (wsClient.isClosed || wsClient.isClosing) {
wsClient.cancelFlowJobs() wsClient.reconnect()
wsClient = WebSocketClientService(url, wsHeaders)
wsClient.connect()
} }
} }
} else { } else {