Shamrock: 尝试修复重复Client连接 #187

This commit is contained in:
whitechi73 2024-01-17 04:15:24 +08:00
parent 25fe9fab37
commit 8d6d984849
3 changed files with 36 additions and 13 deletions

View File

@ -18,6 +18,11 @@ internal class WebSocketClientService(
) : WebSocketClientServlet(address, heartbeatInterval, wsHeaders) { ) : WebSocketClientServlet(address, heartbeatInterval, wsHeaders) {
private val eventJobList = mutableSetOf<Job>() private val eventJobList = mutableSetOf<Job>()
init {
startHeartbeatTimer()
initTransmitter()
}
override fun submitFlowJob(job: Job) { override fun submitFlowJob(job: Job) {
eventJobList.add(job) eventJobList.add(job)
} }

View File

@ -35,24 +35,22 @@ import java.net.URI
import kotlin.concurrent.timer import kotlin.concurrent.timer
internal abstract class WebSocketClientServlet( internal abstract class WebSocketClientServlet(
url: String, private val url: String,
private val heartbeatInterval: Long, private val heartbeatInterval: Long,
private val wsHeaders: Map<String, String> private val wsHeaders: Map<String, String>
) : BaseTransmitServlet, WebSocketClient(URI(url), wsHeaders) { ) : BaseTransmitServlet, WebSocketClient(URI(url), wsHeaders) {
init {
if (connectedClients.containsKey(url)) {
throw RuntimeException("WebSocketClient已存在: $url")
}
}
private val sendLock = Mutex() private val sendLock = Mutex()
override fun allowTransmit(): Boolean { override fun allowTransmit(): Boolean {
return ShamrockConfig.openWebSocketClient() return ShamrockConfig.openWebSocketClient()
} }
override fun onOpen(handshakedata: ServerHandshake?) {
LogCenter.log("WebSocketClient onOpen: ${handshakedata?.httpStatus}, ${handshakedata?.httpStatusMessage}")
startHeartbeatTimer()
pushMetaLifecycle()
initTransmitter()
}
override fun onMessage(message: String) { override fun onMessage(message: String) {
GlobalScope.launch { GlobalScope.launch {
handleMessage(message) handleMessage(message)
@ -84,6 +82,16 @@ internal abstract class WebSocketClientServlet(
respond?.let { send(it) } respond?.let { send(it) }
} }
override fun onOpen(handshakedata: ServerHandshake?) {
LogCenter.log("WebSocketClient onOpen: ${handshakedata?.httpStatus}, ${handshakedata?.httpStatusMessage}")
connectedClients[url] = this
//startHeartbeatTimer()
pushMetaLifecycle()
//initTransmitter()
}
override fun onClose(code: Int, reason: String?, remote: Boolean) { override fun onClose(code: Int, reason: String?, remote: Boolean) {
if (code == 403) { if (code == 403) {
if (wsHeaders.containsKey("authorization")) { if (wsHeaders.containsKey("authorization")) {
@ -95,11 +103,13 @@ internal abstract class WebSocketClientServlet(
} }
LogCenter.log("WebSocketClient onClose: $code, $reason, $remote") LogCenter.log("WebSocketClient onClose: $code, $reason, $remote")
cancelFlowJobs() cancelFlowJobs()
connectedClients.remove(url)
} }
override fun onError(ex: Exception?) { override fun onError(ex: Exception?) {
LogCenter.log("WebSocketClient onError: ${ex?.message}") LogCenter.log("WebSocketClient onError: ${ex?.message}")
cancelFlowJobs() cancelFlowJobs()
connectedClients.remove(url)
} }
protected suspend inline fun <reified T> pushTo(body: T) { protected suspend inline fun <reified T> pushTo(body: T) {
@ -113,14 +123,14 @@ internal abstract class WebSocketClientServlet(
} }
} }
private fun startHeartbeatTimer() { fun startHeartbeatTimer() {
if (heartbeatInterval <= 0) { if (heartbeatInterval <= 0) {
LogCenter.log("被动WebSocket心跳间隔为0不启动心跳", Level.WARN) LogCenter.log("被动WebSocket心跳间隔为0不启动心跳", Level.WARN)
return return
} }
timer( timer(
name = "heartbeat", name = "heartbeat",
initialDelay = 0, initialDelay = heartbeatInterval,
period = heartbeatInterval, period = heartbeatInterval,
) { ) {
if (isClosed || isClosing || !isOpen) { if (isClosed || isClosing || !isOpen) {
@ -143,7 +153,7 @@ internal abstract class WebSocketClientServlet(
status = "正常", status = "正常",
good = true good = true
), ),
interval = 1000L * 15 interval = heartbeatInterval
) )
) )
) )
@ -169,4 +179,8 @@ internal abstract class WebSocketClientServlet(
) )
} }
} }
companion object {
private val connectedClients = mutableMapOf<String, WebSocketClientServlet>()
}
} }

View File

@ -115,7 +115,11 @@ internal class InitRemoteService : IAction {
LogCenter.log("被动WebSocket地址不合法: $url", Level.ERROR) LogCenter.log("被动WebSocket地址不合法: $url", Level.ERROR)
} }
} catch (e: Throwable) { } catch (e: Throwable) {
LogCenter.log(e.stackTraceToString(), Level.ERROR) if (e is RuntimeException) {
LogCenter.log(e.message ?: e.stackTraceToString(), Level.ERROR)
} else {
LogCenter.log(e.stackTraceToString(), Level.ERROR)
}
} }
} }
} }