Shamrock: 修复推送器推送错误

Signed-off-by: WhiteChi <whitechi73@outlook.com>
This commit is contained in:
WhiteChi 2023-10-28 11:27:49 +08:00
parent debcb4f192
commit 47133101bb
6 changed files with 49 additions and 40 deletions

View File

@ -30,12 +30,12 @@ internal class WebSocketClientService(
} }
override fun initTransmitter() { override fun initTransmitter() {
HttpService.submitFlowJob(GlobalScope.launch { submitFlowJob(GlobalScope.launch {
GlobalEventTransmitter.onMessageEvent { (_, event) -> GlobalEventTransmitter.onMessageEvent { (_, event) ->
pushTo(event) pushTo(event)
} }
}) })
HttpService.submitFlowJob(GlobalScope.launch { submitFlowJob(GlobalScope.launch {
GlobalEventTransmitter.onNoticeEvent { event -> GlobalEventTransmitter.onNoticeEvent { event ->
pushTo(event) pushTo(event)
} }

View File

@ -29,12 +29,12 @@ internal class WebSocketService(port: Int): WebSocketTransmitServlet(port) {
} }
override fun initTransmitter() { override fun initTransmitter() {
HttpService.submitFlowJob(GlobalScope.launch { submitFlowJob(GlobalScope.launch {
GlobalEventTransmitter.onMessageEvent { (_, event) -> GlobalEventTransmitter.onMessageEvent { (_, event) ->
pushTo(event) pushTo(event)
} }
}) })
HttpService.submitFlowJob(GlobalScope.launch { submitFlowJob(GlobalScope.launch {
GlobalEventTransmitter.onNoticeEvent { event -> GlobalEventTransmitter.onNoticeEvent { event ->
pushTo(event) pushTo(event)
} }
@ -71,7 +71,7 @@ internal class WebSocketService(port: Int): WebSocketTransmitServlet(port) {
pushMetaLifecycle() pushMetaLifecycle()
eventReceivers.add(conn) eventReceivers.add(conn)
} }
LogCenter.log({ "WSServer连接(${conn.remoteSocketAddress.address.hostAddress}:${conn.remoteSocketAddress.port}$path)" }, Level.DEBUG) LogCenter.log({ "WSServer连接(${conn.remoteSocketAddress.address.hostAddress}:${conn.remoteSocketAddress.port}$path)" }, Level.WARN)
} }
private fun pushMetaLifecycle() { private fun pushMetaLifecycle() {

View File

@ -7,7 +7,6 @@ import kotlinx.coroutines.flow.MutableSharedFlow
import moe.fuqiuluo.qqinterface.servlet.BaseSvc import moe.fuqiuluo.qqinterface.servlet.BaseSvc
import moe.fuqiuluo.qqinterface.servlet.GroupSvc import moe.fuqiuluo.qqinterface.servlet.GroupSvc
import moe.fuqiuluo.qqinterface.servlet.msg.convert.toSegments import moe.fuqiuluo.qqinterface.servlet.msg.convert.toSegments
import moe.fuqiuluo.shamrock.remote.service.HttpService
import moe.fuqiuluo.shamrock.remote.service.config.ShamrockConfig import moe.fuqiuluo.shamrock.remote.service.config.ShamrockConfig
import moe.fuqiuluo.shamrock.remote.service.data.push.GroupFileMsg import moe.fuqiuluo.shamrock.remote.service.data.push.GroupFileMsg
import moe.fuqiuluo.shamrock.remote.service.data.push.MemberRole import moe.fuqiuluo.shamrock.remote.service.data.push.MemberRole
@ -32,9 +31,9 @@ internal object GlobalEventTransmitter: BaseSvc() {
MutableSharedFlow<NoticeEvent>() MutableSharedFlow<NoticeEvent>()
} }
private fun pushNotice(noticeEvent: NoticeEvent) = noticeEventFlow.tryEmit(noticeEvent) private suspend fun pushNotice(noticeEvent: NoticeEvent) = noticeEventFlow.emit(noticeEvent)
private fun transMessageEvent(record: MsgRecord, message: MessageEvent) = messageEventFlow.tryEmit(record to message) private suspend fun transMessageEvent(record: MsgRecord, message: MessageEvent) = messageEventFlow.emit(record to message)
/** /**
* 消息 手淫器 * 消息 手淫器
@ -51,7 +50,7 @@ internal object GlobalEventTransmitter: BaseSvc() {
postType: PostType = PostType.Msg postType: PostType = PostType.Msg
): Boolean { ): Boolean {
val uin = app.longAccountUin val uin = app.longAccountUin
return transMessageEvent(record, transMessageEvent(record,
MessageEvent( MessageEvent(
time = record.msgTime, time = record.msgTime,
selfId = uin, selfId = uin,
@ -82,6 +81,7 @@ internal object GlobalEventTransmitter: BaseSvc() {
) )
) )
) )
return true
} }
/** /**
@ -95,7 +95,7 @@ internal object GlobalEventTransmitter: BaseSvc() {
postType: PostType = PostType.Msg postType: PostType = PostType.Msg
): Boolean { ): Boolean {
val uin = app.longAccountUin val uin = app.longAccountUin
return transMessageEvent(record, transMessageEvent(record,
MessageEvent( MessageEvent(
time = record.msgTime, time = record.msgTime,
selfId = uin, selfId = uin,
@ -116,16 +116,13 @@ internal object GlobalEventTransmitter: BaseSvc() {
userId = record.senderUin, userId = record.senderUin,
nickname = record.sendNickName, nickname = record.sendNickName,
card = record.sendMemberName, card = record.sendMemberName,
role = when (record.senderUin) { role = MemberRole.Member,
GroupSvc.getOwner(record.peerUin.toString()) -> MemberRole.Owner
in GroupSvc.getAdminList(record.peerUin.toString()) -> MemberRole.Admin
else -> MemberRole.Member
},
title = "", title = "",
level = "", level = "",
) )
) )
) )
return true
} }
} }
@ -136,7 +133,7 @@ internal object GlobalEventTransmitter: BaseSvc() {
/** /**
* 推送私聊文件事件 * 推送私聊文件事件
*/ */
fun transPrivateFileEvent( suspend fun transPrivateFileEvent(
msgTime: Long, msgTime: Long,
userId: Long, userId: Long,
fileId: String, fileId: String,
@ -146,7 +143,7 @@ internal object GlobalEventTransmitter: BaseSvc() {
expireTime: Long, expireTime: Long,
url: String url: String
): Boolean { ): Boolean {
return pushNotice(NoticeEvent( pushNotice(NoticeEvent(
time = msgTime, time = msgTime,
selfId = app.longAccountUin, selfId = app.longAccountUin,
postType = PostType.Notice, postType = PostType.Notice,
@ -163,12 +160,13 @@ internal object GlobalEventTransmitter: BaseSvc() {
expire = expireTime expire = expireTime
) )
)) ))
return true
} }
/** /**
* 推送私聊文件事件 * 推送私聊文件事件
*/ */
fun transGroupFileEvent( suspend fun transGroupFileEvent(
msgTime: Long, msgTime: Long,
userId: Long, userId: Long,
groupId: Long, groupId: Long,
@ -178,7 +176,7 @@ internal object GlobalEventTransmitter: BaseSvc() {
bizId: Int, bizId: Int,
url: String url: String
): Boolean { ): Boolean {
return pushNotice(NoticeEvent( pushNotice(NoticeEvent(
time = msgTime, time = msgTime,
selfId = app.longAccountUin, selfId = app.longAccountUin,
postType = PostType.Notice, postType = PostType.Notice,
@ -194,6 +192,7 @@ internal object GlobalEventTransmitter: BaseSvc() {
url = url url = url
) )
)) ))
return true
} }
} }
@ -201,8 +200,8 @@ internal object GlobalEventTransmitter: BaseSvc() {
* 群聊通知 通知器 * 群聊通知 通知器
*/ */
object GroupNoticeTransmitter { object GroupNoticeTransmitter {
fun transGroupPoke(time: Long, operation: Long, target: Long, groupCode: Long): Boolean { suspend fun transGroupPoke(time: Long, operation: Long, target: Long, groupCode: Long): Boolean {
return pushNotice(NoticeEvent( pushNotice(NoticeEvent(
time = time, time = time,
selfId = app.longAccountUin, selfId = app.longAccountUin,
postType = PostType.Notice, postType = PostType.Notice,
@ -213,9 +212,10 @@ internal object GlobalEventTransmitter: BaseSvc() {
groupId = groupCode, groupId = groupCode,
target = target target = target
)) ))
return true
} }
fun transGroupMemberNumChanged( suspend fun transGroupMemberNumChanged(
time: Long, time: Long,
target: Long, target: Long,
groupCode: Long, groupCode: Long,
@ -223,7 +223,7 @@ internal object GlobalEventTransmitter: BaseSvc() {
noticeType: NoticeType, noticeType: NoticeType,
noticeSubType: NoticeSubType noticeSubType: NoticeSubType
): Boolean { ): Boolean {
return pushNotice(NoticeEvent( pushNotice(NoticeEvent(
time = time, time = time,
selfId = app.longAccountUin, selfId = app.longAccountUin,
postType = PostType.Notice, postType = PostType.Notice,
@ -235,15 +235,16 @@ internal object GlobalEventTransmitter: BaseSvc() {
target = target, target = target,
groupId = groupCode groupId = groupCode
)) ))
return true
} }
fun transGroupAdminChanged( suspend fun transGroupAdminChanged(
msgTime: Long, msgTime: Long,
target: Long, target: Long,
groupCode: Long, groupCode: Long,
setAdmin: Boolean setAdmin: Boolean
): Boolean { ): Boolean {
return pushNotice(NoticeEvent( pushNotice(NoticeEvent(
time = msgTime, time = msgTime,
selfId = app.longAccountUin, selfId = app.longAccountUin,
postType = PostType.Notice, postType = PostType.Notice,
@ -253,16 +254,17 @@ internal object GlobalEventTransmitter: BaseSvc() {
target = target, target = target,
groupId = groupCode groupId = groupCode
)) ))
return true
} }
fun transGroupBan( suspend fun transGroupBan(
msgTime: Long, msgTime: Long,
operation: Long, operation: Long,
target: Long, target: Long,
groupCode: Long, groupCode: Long,
duration: Int duration: Int
): Boolean { ): Boolean {
return pushNotice(NoticeEvent( pushNotice(NoticeEvent(
time = msgTime, time = msgTime,
selfId = app.longAccountUin, selfId = app.longAccountUin,
postType = PostType.Notice, postType = PostType.Notice,
@ -275,9 +277,10 @@ internal object GlobalEventTransmitter: BaseSvc() {
groupId = groupCode, groupId = groupCode,
duration = duration duration = duration
)) ))
return true
} }
fun transGroupMsgRecall( suspend fun transGroupMsgRecall(
time: Long, time: Long,
operator: Long, operator: Long,
target: Long, target: Long,
@ -285,7 +288,7 @@ internal object GlobalEventTransmitter: BaseSvc() {
msgHash: Int, msgHash: Int,
tipText: String tipText: String
): Boolean { ): Boolean {
return pushNotice(NoticeEvent( pushNotice(NoticeEvent(
time = time, time = time,
selfId = app.longAccountUin, selfId = app.longAccountUin,
postType = PostType.Notice, postType = PostType.Notice,
@ -296,6 +299,7 @@ internal object GlobalEventTransmitter: BaseSvc() {
tip = tipText, tip = tipText,
groupId = groupCode groupId = groupCode
)) ))
return true
} }
} }
@ -303,8 +307,8 @@ internal object GlobalEventTransmitter: BaseSvc() {
* 私聊通知 通知器 * 私聊通知 通知器
*/ */
object PrivateNoticeTransmitter { object PrivateNoticeTransmitter {
fun transPrivatePoke(msgTime: Long, operation: Long, target: Long): Boolean { suspend fun transPrivatePoke(msgTime: Long, operation: Long, target: Long): Boolean {
return pushNotice(NoticeEvent( pushNotice(NoticeEvent(
time = msgTime, time = msgTime,
selfId = app.longAccountUin, selfId = app.longAccountUin,
postType = PostType.Notice, postType = PostType.Notice,
@ -315,10 +319,11 @@ internal object GlobalEventTransmitter: BaseSvc() {
senderId = operation, senderId = operation,
target = target target = target
)) ))
return true
} }
fun transPrivateRecall(time: Long, operation: Long, msgHashId: Int, tipText: String): Boolean { suspend fun transPrivateRecall(time: Long, operation: Long, msgHashId: Int, tipText: String): Boolean {
return pushNotice(NoticeEvent( pushNotice(NoticeEvent(
time = time, time = time,
selfId = app.longAccountUin, selfId = app.longAccountUin,
postType = PostType.Notice, postType = PostType.Notice,
@ -329,16 +334,17 @@ internal object GlobalEventTransmitter: BaseSvc() {
msgId = msgHashId, msgId = msgHashId,
tip = tipText tip = tipText
)) ))
return true
} }
} }
@ShamrockDsl @ShamrockDsl
suspend fun onMessageEvent(collector: FlowCollector<Pair<MsgRecord, MessageEvent>>) { suspend inline fun onMessageEvent(collector: FlowCollector<Pair<MsgRecord, MessageEvent>>) {
messageEventFlow.collect(collector) messageEventFlow.collect(collector)
} }
@ShamrockDsl @ShamrockDsl
suspend fun onNoticeEvent(collector: FlowCollector<NoticeEvent>) { suspend inline fun onNoticeEvent(collector: FlowCollector<NoticeEvent>) {
noticeEventFlow.collect(collector) noticeEventFlow.collect(collector)
} }
} }

View File

@ -1,5 +1,8 @@
@file:OptIn(DelicateCoroutinesApi::class)
package moe.fuqiuluo.shamrock.remote.service.api package moe.fuqiuluo.shamrock.remote.service.api
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.serialization.encodeToString import kotlinx.serialization.encodeToString
@ -71,7 +74,7 @@ internal abstract class WebSocketTransmitServlet(
if (path != "/api") { if (path != "/api") {
eventReceivers.remove(conn) eventReceivers.remove(conn)
} }
LogCenter.log({ "WSServer断开(${conn.remoteSocketAddress.address.hostAddress}:${conn.remoteSocketAddress.port}$path): $code,$reason,$remote" }, Level.DEBUG) LogCenter.log({ "WSServer断开(${conn.remoteSocketAddress.address.hostAddress}:${conn.remoteSocketAddress.port}$path): $code,$reason,$remote" }, Level.WARN)
} }
override fun onMessage(conn: WebSocket, message: String) { override fun onMessage(conn: WebSocket, message: String) {
@ -105,8 +108,8 @@ internal abstract class WebSocketTransmitServlet(
} }
override fun onStart() { override fun onStart() {
initTransmitter()
LogCenter.log("WSServer start running on ws://0.0.0.0:$port!") LogCenter.log("WSServer start running on ws://0.0.0.0:$port!")
initTransmitter()
} }
protected inline fun <reified T> pushTo(body: T) { protected inline fun <reified T> pushTo(body: T) {

View File

@ -55,7 +55,7 @@ internal object AioListener: IKernelMsgListener {
if(!GlobalEventTransmitter.MessageTransmitter.transGroupMessage( if(!GlobalEventTransmitter.MessageTransmitter.transGroupMessage(
record, record.elements, rawMsg, msgHash record, record.elements, rawMsg, msgHash
)) { )) {
LogCenter.log("群消息推送失败 -> MessageTransmitter", Level.WARN) LogCenter.log("群消息推送失败 -> 推送目标可能不存在", Level.WARN)
} }
} }
MsgConstant.KCHATTYPEC2C -> { MsgConstant.KCHATTYPEC2C -> {

View File

@ -69,7 +69,7 @@ internal object PrimitiveListener {
} }
} }
private fun onC2cPoke(msgTime: Long, pb: ProtoMap) { private suspend fun onC2cPoke(msgTime: Long, pb: ProtoMap) {
val detail = pb[1, 3, 2] val detail = pb[1, 3, 2]
if (detail !is ProtoMap) { if (detail !is ProtoMap) {
error("不支持该私聊戳一戳解析: ${(detail as ProtoByteString).toByteArray().toHexString()}") error("不支持该私聊戳一戳解析: ${(detail as ProtoByteString).toByteArray().toHexString()}")
@ -95,7 +95,7 @@ internal object PrimitiveListener {
} }
} }
private fun onGroupPoke(time: Long, pb: ProtoMap) { private suspend fun onGroupPoke(time: Long, pb: ProtoMap) {
val groupCode1 = pb[1, 1, 1].asULong val groupCode1 = pb[1, 1, 1].asULong
var groupCode: Long = groupCode1 var groupCode: Long = groupCode1