diff --git a/client/c2c_processor.go b/client/c2c_processor.go index f353e2e0..f71d2b71 100644 --- a/client/c2c_processor.go +++ b/client/c2c_processor.go @@ -117,6 +117,20 @@ func privateMessageDecoder(c *QQClient, pMsg *msg.Message, _ *network.IncomingPa if pMsg.Body.RichText == nil || pMsg.Body.RichText.Elems == nil { return } + + // handle fragmented message + if pMsg.Content != nil && pMsg.Content.GetPkgNum() > 1 { + seq := pMsg.Content.GetDivSeq() + builder := c.messageBuilder(seq) + builder.append(pMsg) + if builder.len() < pMsg.Content.GetPkgNum() { + // continue to receive other fragments + return + } + c.msgBuilders.Delete(seq) + pMsg = builder.build() + } + if pMsg.Head.GetFromUin() == c.Uin { c.dispatchPrivateMessageSelf(c.parsePrivateMessage(pMsg)) return diff --git a/client/client.go b/client/client.go index 404a9d39..cebb22e1 100644 --- a/client/client.go +++ b/client/client.go @@ -82,14 +82,14 @@ type QQClient struct { lastC2CMsgTime int64 transCache *utils.Cache groupSysMsgCache *GroupSystemMessages - groupMsgBuilders sync.Map + msgBuilders sync.Map onlinePushCache *utils.Cache heartbeatEnabled bool requestPacketRequestID atomic.Int32 groupSeq atomic.Int32 friendSeq atomic.Int32 highwayApplyUpSeq atomic.Int32 - eventHandlers *eventHandlers + eventHandlers eventHandlers groupListLock sync.Mutex } @@ -165,7 +165,6 @@ func NewClientMd5(uin int64, passwordMd5 [16]byte) *QQClient { sig: &auth.SigInfo{ OutPacketSessionID: []byte{0x02, 0xB0, 0x5B, 0x8B}, }, - eventHandlers: &eventHandlers{}, msgSvcCache: utils.NewCache(time.Second * 15), transCache: utils.NewCache(time.Second * 15), onlinePushCache: utils.NewCache(time.Second * 15), diff --git a/client/global.go b/client/global.go index 270af0ee..6c237e38 100644 --- a/client/global.go +++ b/client/global.go @@ -9,6 +9,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" "github.com/pkg/errors" @@ -26,10 +27,6 @@ import ( type ( DeviceInfo = auth.Device Version = auth.OSVersion - - groupMessageBuilder struct { - MessageSlices []*msg.Message - } ) var SystemDeviceInfo = &DeviceInfo{ @@ -246,12 +243,43 @@ func (c *QQClient) parseTempMessage(msg *msg.Message) *message.TempMessage { } } -func (b *groupMessageBuilder) build() *msg.Message { - sort.Slice(b.MessageSlices, func(i, j int) bool { - return b.MessageSlices[i].Content.GetPkgIndex() < b.MessageSlices[j].Content.GetPkgIndex() +func (c *QQClient) messageBuilder(seq int32) *messageBuilder { + builder := &messageBuilder{} + actual, ok := c.msgBuilders.LoadOrStore(seq, builder) + if !ok { + time.AfterFunc(time.Minute, func() { + c.msgBuilders.Delete(seq) // delete avoid memory leak + }) + } + return actual.(*messageBuilder) +} + +type messageBuilder struct { + lock sync.Mutex + slices []*msg.Message +} + +func (b *messageBuilder) append(msg *msg.Message) { + b.lock.Lock() + defer b.lock.Unlock() + b.slices = append(b.slices, msg) +} + +func (b *messageBuilder) len() int32 { + b.lock.Lock() + x := len(b.slices) + b.lock.Unlock() + return int32(x) +} + +func (b *messageBuilder) build() *msg.Message { + b.lock.Lock() + defer b.lock.Unlock() + sort.Slice(b.slices, func(i, j int) bool { + return b.slices[i].Content.GetPkgIndex() < b.slices[j].Content.GetPkgIndex() }) - base := b.MessageSlices[0] - for _, m := range b.MessageSlices[1:] { + base := b.slices[0] + for _, m := range b.slices[1:] { base.Body.RichText.Elems = append(base.Body.RichText.Elems, m.Body.RichText.Elems...) } return base diff --git a/client/group_msg.go b/client/group_msg.go index 95e2e0d7..c6478888 100644 --- a/client/group_msg.go +++ b/client/group_msg.go @@ -311,17 +311,11 @@ func decodeGroupMessagePacket(c *QQClient, _ *network.IncomingPacketInfo, payloa }) } if pkt.Message.Content != nil && pkt.Message.Content.GetPkgNum() > 1 { - var builder *groupMessageBuilder - i, ok := c.groupMsgBuilders.Load(pkt.Message.Content.GetDivSeq()) - if !ok { - builder = &groupMessageBuilder{} - c.groupMsgBuilders.Store(pkt.Message.Content.GetDivSeq(), builder) - } else { - builder = i.(*groupMessageBuilder) - } - builder.MessageSlices = append(builder.MessageSlices, pkt.Message) - if int32(len(builder.MessageSlices)) >= pkt.Message.Content.GetPkgNum() { - c.groupMsgBuilders.Delete(pkt.Message.Content.GetDivSeq()) + seq := pkt.Message.Content.GetDivSeq() + builder := c.messageBuilder(pkt.Message.Content.GetDivSeq()) + builder.append(pkt.Message) + if builder.len() >= pkt.Message.Content.GetPkgNum() { + c.msgBuilders.Delete(seq) if pkt.Message.Head.GetFromUin() == c.Uin { c.dispatchGroupMessageSelf(c.parseGroupMessage(builder.build())) } else { @@ -371,7 +365,7 @@ func decodeGetGroupMsgResponse(c *QQClient, info *network.IncomingPacketInfo, pa if m.Content.GetPkgIndex() == 0 { c.Debug("build fragmented message from history") i := m.Head.GetMsgSeq() - m.Content.GetPkgNum() - builder := &groupMessageBuilder{} + builder := &messageBuilder{} for { end := int32(math.Min(float64(i+19), float64(m.Head.GetMsgSeq()+m.Content.GetPkgNum()))) seq, pkt := c.buildGetGroupMsgRequest(m.Head.GroupInfo.GetGroupCode(), int64(i), int64(end)) @@ -381,7 +375,7 @@ func decodeGetGroupMsgResponse(c *QQClient, info *network.IncomingPacketInfo, pa } for _, fm := range data.([]*message.GroupMessage) { if fm.OriginalObject.Content != nil && fm.OriginalObject.Content.GetDivSeq() == m.Content.GetDivSeq() { - builder.MessageSlices = append(builder.MessageSlices, fm.OriginalObject) + builder.append(fm.OriginalObject) } } if end >= m.Head.GetMsgSeq()+m.Content.GetPkgNum() { diff --git a/client/image.go b/client/image.go index bf654f58..808ae5d9 100644 --- a/client/image.go +++ b/client/image.go @@ -126,7 +126,7 @@ func (c *QQClient) UploadGroupImageByFile(groupCode int64, path string) (*messag Ticket: rsp.UploadKey, Ext: EmptyBytes, Encrypt: false, - }, 1); err == nil { + }, 4); err == nil { goto ok } return nil, errors.Wrap(err, "upload failed")