Shamrock: Rewrite timeout message sending

This commit is contained in:
白池 2024-01-30 18:23:48 +08:00
parent e0e7a9fc2e
commit 72c3c7bdf7
6 changed files with 93 additions and 55 deletions

View File

@ -23,6 +23,7 @@ import moe.fuqiuluo.shamrock.helper.Level
import moe.fuqiuluo.shamrock.helper.LogCenter import moe.fuqiuluo.shamrock.helper.LogCenter
import moe.fuqiuluo.shamrock.helper.MessageHelper import moe.fuqiuluo.shamrock.helper.MessageHelper
import moe.fuqiuluo.shamrock.helper.SendMsgException import moe.fuqiuluo.shamrock.helper.SendMsgException
import moe.fuqiuluo.shamrock.remote.structures.SendMsgResult
import moe.fuqiuluo.shamrock.tools.EMPTY_BYTE_ARRAY import moe.fuqiuluo.shamrock.tools.EMPTY_BYTE_ARRAY
import moe.fuqiuluo.shamrock.xposed.helper.NTServiceFetcher import moe.fuqiuluo.shamrock.xposed.helper.NTServiceFetcher
import moe.fuqiuluo.shamrock.xposed.helper.msgService import moe.fuqiuluo.shamrock.xposed.helper.msgService
@ -170,8 +171,7 @@ internal object MsgSvc: BaseSvc() {
peedId: String, peedId: String,
message: JsonArray, message: JsonArray,
fromId: String = peedId, fromId: String = peedId,
retryCnt: Int = 3 ): Result<SendMsgResult> {
): Result<Pair<Long, Int>> {
// 主动临时消息 // 主动临时消息
when (chatType) { when (chatType) {
MsgConstant.KCHATTYPETEMPC2CFROMGROUP -> { MsgConstant.KCHATTYPETEMPC2CFROMGROUP -> {
@ -181,14 +181,17 @@ internal object MsgSvc: BaseSvc() {
} }
} }
} }
val result = MessageHelper.sendMessageWithoutMsgId(chatType, peedId, message, fromId, MessageCallback(peedId, 0)) val result = MessageHelper.sendMessageWithoutMsgId(chatType, peedId, message, fromId, MessageCallback(peedId, 0))
return if (result.isFailure result.onFailure {
&& result.exceptionOrNull()?.javaClass == SendMsgException::class.java LogCenter.log(it.stackTraceToString(), Level.ERROR)
&& retryCnt > 0) { return result
}
val sendResult = result.getOrThrow()
return if (sendResult.isTimeout) {
// 发送失败,可能网络问题出现红色感叹号,重试 // 发送失败,可能网络问题出现红色感叹号,重试
// 例如 rich media transfer failed // 例如 rich media transfer failed
delay(100) delay(100)
sendToAio(chatType, peedId, message, fromId, retryCnt - 1) MessageHelper.resendMsg(chatType, peedId, fromId, sendResult.qqMsgId, 3, sendResult.msgHashId)
} else { } else {
result result
} }

View File

@ -19,6 +19,7 @@ import moe.fuqiuluo.qqinterface.servlet.MsgSvc
import moe.fuqiuluo.qqinterface.servlet.msg.MessageMaker import moe.fuqiuluo.qqinterface.servlet.msg.MessageMaker
import moe.fuqiuluo.shamrock.helper.db.MessageDB import moe.fuqiuluo.shamrock.helper.db.MessageDB
import moe.fuqiuluo.shamrock.helper.db.MessageMapping import moe.fuqiuluo.shamrock.helper.db.MessageMapping
import moe.fuqiuluo.shamrock.remote.structures.SendMsgResult
import moe.fuqiuluo.shamrock.tools.EmptyJsonObject import moe.fuqiuluo.shamrock.tools.EmptyJsonObject
import moe.fuqiuluo.shamrock.tools.asJsonObject import moe.fuqiuluo.shamrock.tools.asJsonObject
import moe.fuqiuluo.shamrock.tools.asJsonObjectOrNull import moe.fuqiuluo.shamrock.tools.asJsonObjectOrNull
@ -36,13 +37,13 @@ internal object MessageHelper {
message: String, message: String,
callback: IOperateCallback, callback: IOperateCallback,
fromId: String = peerId fromId: String = peerId
): Pair<Long, Int> { ): SendMsgResult {
val uniseq = generateMsgId(chatType) val uniseq = generateMsgId(chatType)
val msg = messageArrayToMessageElements(chatType, uniseq.second, peerId, decodeCQCode(message)).also { val msg = messageArrayToMessageElements(chatType, uniseq.qqMsgId, peerId, decodeCQCode(message)).also {
if (it.second.isEmpty() && !it.first) { if (it.second.isEmpty() && !it.first) {
error("消息合成失败,请查看日志或者检查输入。") error("消息合成失败,请查看日志或者检查输入。")
} else if (it.second.isEmpty()) { } else if (it.second.isEmpty()) {
return System.currentTimeMillis() to 0 return uniseq.copy(msgHashId = 0, msgTime = System.currentTimeMillis())
} }
}.second.filter { }.second.filter {
it.elementType != -1 it.elementType != -1
@ -50,6 +51,28 @@ internal object MessageHelper {
return sendMessageWithoutMsgId(chatType, peerId, msg, fromId, callback) return sendMessageWithoutMsgId(chatType, peerId, msg, fromId, callback)
} }
suspend fun resendMsg(chatType: Int, peerId: String, fromId: String, msgId: Long, retryCnt: Int, msgHashId: Int): Result<SendMsgResult> {
val contact = generateContact(chatType, peerId, fromId)
return resendMsg(contact, msgId, retryCnt, msgHashId)
}
suspend fun resendMsg(contact: Contact, msgId: Long, retryCnt: Int, msgHashId: Int): Result<SendMsgResult> {
if (retryCnt < 0) return Result.failure(SendMsgException("消息发送超时次数过多"))
val service = QRoute.api(IMsgService::class.java)
val result = withTimeoutOrNull(15000) {
if(suspendCancellableCoroutine {
service.resendMsg(contact, msgId) { result, _ ->
it.resume(result)
}
} != 0) {
resendMsg(contact, msgId, retryCnt - 1, msgHashId)
} else {
Result.success(SendMsgResult(msgHashId, msgId, System.currentTimeMillis()))
}
}
return result ?: resendMsg(contact, msgId, retryCnt - 1, msgHashId)
}
@OptIn(DelicateCoroutinesApi::class) @OptIn(DelicateCoroutinesApi::class)
suspend fun sendMessageWithoutMsgId( suspend fun sendMessageWithoutMsgId(
chatType: Int, chatType: Int,
@ -57,9 +80,9 @@ internal object MessageHelper {
message: JsonArray, message: JsonArray,
fromId: String = peerId, fromId: String = peerId,
callback: IOperateCallback callback: IOperateCallback
): Result<Pair<Long, Int>> { ): Result<SendMsgResult> {
val uniseq = generateMsgId(chatType) val uniseq = generateMsgId(chatType)
val msg = messageArrayToMessageElements(chatType, uniseq.second, peerId, message).also { val msg = messageArrayToMessageElements(chatType, uniseq.qqMsgId, peerId, message).also {
if (it.second.isEmpty() && !it.first) error("消息合成失败,请查看日志或者检查输入。") if (it.second.isEmpty() && !it.first) error("消息合成失败,请查看日志或者检查输入。")
}.second.filter { }.second.filter {
it.elementType != -1 it.elementType != -1
@ -67,7 +90,7 @@ internal object MessageHelper {
// ActionMsg No Care // ActionMsg No Care
if (msg.isEmpty()) { if (msg.isEmpty()) {
return Result.success(System.currentTimeMillis() to 0) return Result.success(uniseq.copy(msgTime = System.currentTimeMillis(), msgHashId = 0))
} }
val totalSize = msg.filter { val totalSize = msg.filter {
@ -78,13 +101,13 @@ internal object MessageHelper {
(it.picElement?.fileSize ?: 0) + (it.pttElement?.fileSize (it.picElement?.fileSize ?: 0) + (it.pttElement?.fileSize
?: 0) + (it.videoElement?.fileSize ?: 0) ?: 0) + (it.videoElement?.fileSize ?: 0)
}.reduceOrNull { a, b -> a + b } ?: 0 }.reduceOrNull { a, b -> a + b } ?: 0
val estimateTime = (totalSize / (300 * 1024)) * 1000 + 2000 val estimateTime = (totalSize / (300 * 1024)) * 1000 + 2000
lateinit var sendResultPair: Pair<Long, Int> lateinit var sendResult: SendMsgResult // msgTime to msgHash
val sendRet = withTimeoutOrNull<Pair<Int, String>>(estimateTime) { val sendRet = withTimeoutOrNull<Pair<Int, String>>(estimateTime) {
suspendCancellableCoroutine { suspendCancellableCoroutine {
GlobalScope.launch { GlobalScope.launch {
sendResultPair = sendMessageWithoutMsgId( sendResult = sendMessageWithoutMsgId(
chatType, chatType,
peerId, peerId,
msg, msg,
@ -96,10 +119,12 @@ internal object MessageHelper {
} }
} }
} }
if (sendRet?.first != 0) { if (sendRet?.first != 0) {
return Result.failure(SendMsgException(sendRet?.second ?: "发送消息超时")) //return Result.failure(SendMsgException(sendRet?.second ?: "发送消息超时"))
return Result.success(uniseq.copy(isTimeout = true))
} }
return Result.success(sendResultPair) return Result.success(sendResult)
} }
suspend fun sendMessageWithoutMsgId( suspend fun sendMessageWithoutMsgId(
@ -108,7 +133,7 @@ internal object MessageHelper {
message: ArrayList<MsgElement>, message: ArrayList<MsgElement>,
fromId: String = peerId, fromId: String = peerId,
callback: IOperateCallback callback: IOperateCallback
): Pair<Long, Int> { ): SendMsgResult {
return sendMessageWithoutMsgId(generateContact(chatType, peerId, fromId), message, callback) return sendMessageWithoutMsgId(generateContact(chatType, peerId, fromId), message, callback)
} }
@ -116,24 +141,25 @@ internal object MessageHelper {
contact: Contact, contact: Contact,
message: ArrayList<MsgElement>, message: ArrayList<MsgElement>,
callback: IOperateCallback callback: IOperateCallback
): Pair<Long, Int> { ): SendMsgResult {
val uniseq = generateMsgId(contact.chatType) val uniseq = generateMsgId(contact.chatType)
val nonMsg: Boolean = message.isEmpty() val nonMsg: Boolean = message.isEmpty()
return if (!nonMsg) { return if (!nonMsg) {
val service = QRoute.api(IMsgService::class.java) val service = QRoute.api(IMsgService::class.java)
if (callback is MsgSvc.MessageCallback) { if (callback is MsgSvc.MessageCallback) {
callback.msgHash = uniseq.first callback.msgHash = uniseq.msgHashId
} }
service.sendMsg( service.sendMsg(
contact, contact,
uniseq.second, uniseq.qqMsgId,
message, message,
callback callback
) )
System.currentTimeMillis() to uniseq.first
uniseq.copy(msgTime = System.currentTimeMillis())
} else { } else {
System.currentTimeMillis() to 0 uniseq.copy(msgHashId = 0, msgTime = System.currentTimeMillis())
} }
} }
@ -143,9 +169,9 @@ internal object MessageHelper {
message: JsonArray, message: JsonArray,
callback: IOperateCallback, callback: IOperateCallback,
fromId: String = peerId fromId: String = peerId
): Pair<Long, Int> { ): SendMsgResult {
val uniseq = generateMsgId(chatType) val uniseq = generateMsgId(chatType)
val msg = messageArrayToMessageElements(chatType, uniseq.second, peerId, message).also { val msg = messageArrayToMessageElements(chatType, uniseq.qqMsgId, peerId, message).also {
if (it.second.isEmpty() && !it.first) error("消息合成失败,请查看日志或者检查输入。") if (it.second.isEmpty() && !it.first) error("消息合成失败,请查看日志或者检查输入。")
}.second.filter { }.second.filter {
it.elementType != -1 it.elementType != -1
@ -155,18 +181,18 @@ internal object MessageHelper {
return if (!nonMsg) { return if (!nonMsg) {
val service = QRoute.api(IMsgService::class.java) val service = QRoute.api(IMsgService::class.java)
if (callback is MsgSvc.MessageCallback) { if (callback is MsgSvc.MessageCallback) {
callback.msgHash = uniseq.first callback.msgHash = uniseq.msgHashId
} }
service.sendMsg( service.sendMsg(
contact, contact,
uniseq.second, uniseq.qqMsgId,
msg, msg,
callback callback
) )
uniseq.second to uniseq.first uniseq.copy(msgTime = System.currentTimeMillis())
} else { } else {
uniseq.second to 0 uniseq.copy(msgHashId = 0, msgTime = System.currentTimeMillis())
} }
} }
@ -174,24 +200,25 @@ internal object MessageHelper {
contact: Contact, contact: Contact,
message: ArrayList<MsgElement>, message: ArrayList<MsgElement>,
callback: IOperateCallback callback: IOperateCallback
): Pair<Long, Int> { ): SendMsgResult {
val uniseq = generateMsgId(contact.chatType) val uniseq = generateMsgId(contact.chatType)
val nonMsg: Boolean = message.isEmpty() val nonMsg: Boolean = message.isEmpty()
return if (!nonMsg) { return if (!nonMsg) {
val service = QRoute.api(IMsgService::class.java) val service = QRoute.api(IMsgService::class.java)
if (callback is MsgSvc.MessageCallback) { if (callback is MsgSvc.MessageCallback) {
callback.msgHash = uniseq.first callback.msgHash = uniseq.msgHashId
} }
service.sendMsg( service.sendMsg(
contact, contact,
uniseq.second, uniseq.qqMsgId,
message, message,
callback callback
) )
uniseq.second to uniseq.first
uniseq.copy(msgTime = System.currentTimeMillis())
} else { } else {
0L to 0 uniseq.copy(msgTime = 0, msgHashId = 0)
} }
} }
@ -200,24 +227,23 @@ internal object MessageHelper {
peerId: String, peerId: String,
message: JsonArray, message: JsonArray,
fromId: String = peerId fromId: String = peerId
): Pair<Int, Long> { ): SendMsgResult {
val uniseq = generateMsgId(chatType) val uniseq = generateMsgId(chatType)
val msg = messageArrayToMessageElements(chatType, uniseq.second, peerId, message).also { val msg = messageArrayToMessageElements(chatType, uniseq.qqMsgId, peerId, message).also {
if (it.second.isEmpty() && !it.first) error("消息合成失败,请查看日志或者检查输入。") if (it.second.isEmpty() && !it.first) error("消息合成失败,请查看日志或者检查输入。")
}.second.filter { }.second.filter {
it.elementType != -1 it.elementType != -1
} as ArrayList<MsgElement> } as ArrayList<MsgElement>
val contact = generateContact(chatType, peerId, fromId) val contact = generateContact(chatType, peerId, fromId)
val nonMsg: Boolean = message.isEmpty() return if (!message.isEmpty()) {
return if (!nonMsg) {
val service = QRoute.api(IMsgService::class.java) val service = QRoute.api(IMsgService::class.java)
return suspendCoroutine { return suspendCancellableCoroutine {
service.sendMsg(contact, uniseq.second, msg) { code, why -> service.sendMsg(contact, uniseq.qqMsgId, msg) { code, why ->
it.resume(code to uniseq.second) it.resume(uniseq.copy(msgTime = System.currentTimeMillis()))
} }
} }
} else { } else {
-1 to uniseq.second uniseq.copy(msgHashId = 0, msgTime = 0)
} }
} }
@ -287,10 +313,10 @@ internal object MessageHelper {
return abs(key.hashCode()) return abs(key.hashCode())
} }
fun generateMsgId(chatType: Int): Pair<Int, Long> { fun generateMsgId(chatType: Int): SendMsgResult {
val msgId = createMessageUniseq(chatType, System.currentTimeMillis()) val msgId = createMessageUniseq(chatType, System.currentTimeMillis())
val hashCode: Int = generateMsgIdHash(chatType, msgId) val hashCode: Int = generateMsgIdHash(chatType, msgId)
return hashCode to msgId return SendMsgResult(hashCode, msgId, 0)
} }
fun getMsgMappingByHash(hash: Int): MessageMapping? { fun getMsgMappingByHash(hash: Int): MessageMapping? {

View File

@ -145,10 +145,11 @@ internal object SendForwardMessage : IActionHandler() {
}.json }.json
val result = MessageHelper.sendMessageNoCb(MsgConstant.KCHATTYPEC2C, selfUin, content) val result = MessageHelper.sendMessageNoCb(MsgConstant.KCHATTYPEC2C, selfUin, content)
if (result.first != 0) { if (result.qqMsgId == 0L) {
LogCenter.log("合并转发消息节点消息发送失败", Level.WARN) LogCenter.log("合并转发消息节点消息发送失败", Level.WARN)
return@map null
} }
result.second to node.first result.qqMsgId to node.first
} }
}.filterNotNull() }.filterNotNull()
@ -158,11 +159,11 @@ internal object SendForwardMessage : IActionHandler() {
val uniseq = MessageHelper.generateMsgId(chatType) val uniseq = MessageHelper.generateMsgId(chatType)
msgService.multiForwardMsg(ArrayList<MultiMsgInfo>().apply { msgService.multiForwardMsg(ArrayList<MultiMsgInfo>().apply {
multiNodes.forEach { add(MultiMsgInfo(it.first, it.second)) } multiNodes.forEach { add(MultiMsgInfo(it.first, it.second)) }
}.also { it.reverse() }, from, to, MsgSvc.MessageCallback(peerId, uniseq.first)) }.also { it.reverse() }, from, to, MsgSvc.MessageCallback(peerId, uniseq.msgHashId))
return ok( return ok(
ForwardMessageResult( ForwardMessageResult(
msgId = uniseq.first, msgId = uniseq.msgHashId,
forwardId = "" forwardId = ""
), echo = echo ), echo = echo
) )

View File

@ -108,12 +108,12 @@ internal object UploadGroupFile : IActionHandler() {
val contact = MessageHelper.generateContact(MsgConstant.KCHATTYPEGROUP, groupId) val contact = MessageHelper.generateContact(MsgConstant.KCHATTYPEGROUP, groupId)
suspendCancellableCoroutine<FileTransNotifyInfo?> { suspendCancellableCoroutine<FileTransNotifyInfo?> {
msgService.sendMsgWithMsgId( msgService.sendMsgWithMsgId(
contact, msgIdPair.second, arrayListOf(msgElement) contact, msgIdPair.qqMsgId, arrayListOf(msgElement)
) { code, reason -> ) { code, reason ->
LogCenter.log("群文件消息发送异常(code = $code, reason = $reason)") LogCenter.log("群文件消息发送异常(code = $code, reason = $reason)")
it.resume(null) it.resume(null)
} }
RichMediaUploadHandler.registerListener(msgIdPair.second) { RichMediaUploadHandler.registerListener(msgIdPair.qqMsgId) {
it.resume(this) it.resume(this)
return@registerListener true return@registerListener true
} }
@ -125,7 +125,7 @@ internal object UploadGroupFile : IActionHandler() {
}.commonFileInfo }.commonFileInfo
return ok(data = FileUploadResult( return ok(data = FileUploadResult(
msgHash = msgIdPair.first, msgHash = msgIdPair.msgHashId,
bizid = info.bizType ?: 0, bizid = info.bizType ?: 0,
md5 = info.md5, md5 = info.md5,
sha = info.sha, sha = info.sha,

View File

@ -109,12 +109,12 @@ internal object UploadPrivateFile : IActionHandler() {
val contact = MessageHelper.generateContact(MsgConstant.KCHATTYPEC2C, userId) val contact = MessageHelper.generateContact(MsgConstant.KCHATTYPEC2C, userId)
suspendCancellableCoroutine<FileTransNotifyInfo?> { suspendCancellableCoroutine<FileTransNotifyInfo?> {
msgService.sendMsgWithMsgId( msgService.sendMsgWithMsgId(
contact, msgIdPair.second, arrayListOf(msgElement) contact, msgIdPair.qqMsgId, arrayListOf(msgElement)
) { code, reason -> ) { code, reason ->
LogCenter.log("私聊文件消息发送异常(code = $code, reason = $reason)") LogCenter.log("私聊文件消息发送异常(code = $code, reason = $reason)")
it.resume(null) it.resume(null)
} }
RichMediaUploadHandler.registerListener(msgIdPair.second) { RichMediaUploadHandler.registerListener(msgIdPair.qqMsgId) {
it.resume(this) it.resume(this)
return@registerListener true return@registerListener true
} }
@ -126,7 +126,7 @@ internal object UploadPrivateFile : IActionHandler() {
}.commonFileInfo }.commonFileInfo
return ok(data = UploadGroupFile.FileUploadResult( return ok(data = UploadGroupFile.FileUploadResult(
msgHash = msgIdPair.first, msgHash = msgIdPair.msgHashId,
bizid = info.bizType ?: 0, bizid = info.bizType ?: 0,
md5 = info.md5, md5 = info.md5,
sha = info.sha, sha = info.sha,

View File

@ -0,0 +1,8 @@
package moe.fuqiuluo.shamrock.remote.structures
data class SendMsgResult(
val msgHashId: Int,
val qqMsgId: Long,
var msgTime: Long,
var isTimeout: Boolean = false
)