From 987daad785839a9cb0425a27edc90a0a695f5c84 Mon Sep 17 00:00:00 2001 From: wdvxdr Date: Tue, 22 Feb 2022 14:45:50 +0800 Subject: [PATCH] coolq: allow upload media concurrently for normal message --- coolq/api.go | 29 +++++--------- coolq/bot.go | 107 ++++++++++++++++++++++++++++++++++++--------------- 2 files changed, 84 insertions(+), 52 deletions(-) diff --git a/coolq/api.go b/coolq/api.go index 621a666..6cbdde6 100644 --- a/coolq/api.go +++ b/coolq/api.go @@ -12,7 +12,6 @@ import ( "runtime" "strconv" "strings" - "sync" "time" "github.com/segmentio/asm/base64" @@ -793,32 +792,27 @@ func (bot *CQBot) uploadForwardElement(m gjson.Result, groupID int64) *message.F fm := message.NewForwardMessage() source := message.Source{SourceType: message.SourceGroup, PrimaryID: groupID} - var lazyUpload []func() - var wg sync.WaitGroup + var w worker resolveElement := func(elems []message.IMessageElement) []message.IMessageElement { for i, elem := range elems { - iescape := i + p := &elems[i] switch o := elem.(type) { case *LocalVideoElement: - wg.Add(1) - lazyUpload = append(lazyUpload, func() { - defer wg.Done() + w.do(func() { gm, err := bot.uploadLocalVideo(source, o) if err != nil { - log.Warnf("警告: 群 %d %s上传失败: %v", groupID, o.Type().String(), err) + log.Warnf(uploadFailedTemplate, "群", groupID, "视频", err) } else { - elems[iescape] = gm + *p = gm } }) case *LocalImageElement: - wg.Add(1) - lazyUpload = append(lazyUpload, func() { - defer wg.Done() + w.do(func() { gm, err := bot.uploadLocalImage(source, o) if err != nil { - log.Warnf("警告: 群 %d %s上传失败: %v", groupID, o.Type().String(), err) + log.Warnf(uploadFailedTemplate, "群", groupID, "图片", err) } else { - elems[iescape] = gm + *p = gm } }) } @@ -903,12 +897,7 @@ func (bot *CQBot) uploadForwardElement(m gjson.Result, groupID int64) *message.F fm.AddNode(node) } } - - for _, upload := range lazyUpload { - go upload() - } - wg.Wait() - + w.wait() return bot.Client.UploadGroupForwardMessage(groupID, fm) } diff --git a/coolq/bot.go b/coolq/bot.go index 00a53c8..529b5fd 100644 --- a/coolq/bot.go +++ b/coolq/bot.go @@ -129,6 +129,22 @@ func (bot *CQBot) OnEventPush(f func(e *Event)) { bot.lock.Unlock() } +type worker struct { + wg sync.WaitGroup +} + +func (w *worker) do(f func()) { + w.wg.Add(1) + go func() { + defer w.wg.Done() + f() + }() +} + +func (w *worker) wait() { + w.wg.Wait() +} + // uploadLocalImage 上传本地图片 func (bot *CQBot) uploadLocalImage(target message.Source, img *LocalImageElement) (i message.IMessageElement, err error) { if img.File != "" { @@ -163,41 +179,72 @@ func (bot *CQBot) uploadLocalVideo(target message.Source, v *LocalVideoElement) return bot.Client.UploadShortVideo(target, video, v.thumb, 4) } -func (bot *CQBot) uploadMedia(target message.Source, elements []message.IMessageElement) []message.IMessageElement { +func removeLocalElement(elements []message.IMessageElement) []message.IMessageElement { var j int - for _, m := range elements { - raw := m // upload failed will make m nil, so copy it - var err error - switch e := m.(type) { - case *LocalImageElement: - m, err = bot.uploadLocalImage(target, e) - case *message.VoiceElement: - if target.SourceType == message.SourceGuildChannel { - continue // todo + for i, e := range elements { + switch e.(type) { + case *LocalImageElement, *LocalVideoElement: + case *message.VoiceElement: // 未上传的语音消息, 也删除 + default: + if j < i { + elements[j] = e } - m, err = bot.Client.UploadVoice(target, bytes.NewReader(e.Data)) - case *LocalVideoElement: - m, err = bot.uploadLocalVideo(target, e) + j++ } - if err != nil { - var source string - switch target.SourceType { // nolint:exhaustive - case message.SourceGroup: - source = "群" - case message.SourcePrivate: - source = "私聊" - case message.SourceGuildChannel: - source = "频道" - } - log.Warnf("警告: %s %d %s上传失败: %v", source, target.PrimaryID, raw.Type().String(), err) - continue - } - elements[j] = m - j++ } return elements[:j] } +const uploadFailedTemplate = "警告: %s %d %s上传失败: %v" + +func (bot *CQBot) uploadMedia(target message.Source, elements []message.IMessageElement) []message.IMessageElement { + var w worker + var source string + switch target.SourceType { // nolint:exhaustive + case message.SourceGroup: + source = "群" + case message.SourcePrivate: + source = "私聊" + case message.SourceGuildChannel: + source = "频道" + } + + for i, m := range elements { + p := &elements[i] + switch e := m.(type) { + case *LocalImageElement: + w.do(func() { + m, err := bot.uploadLocalImage(target, e) + if err != nil { + log.Warnf(uploadFailedTemplate, source, target.PrimaryID, "图片", err) + } else { + *p = m + } + }) + case *message.VoiceElement: + w.do(func() { + m, err := bot.Client.UploadVoice(target, bytes.NewReader(e.Data)) + if err != nil { + log.Warnf(uploadFailedTemplate, source, target.PrimaryID, "语音", err) + } else { + *p = m + } + }) + case *LocalVideoElement: + w.do(func() { + m, err := bot.uploadLocalVideo(target, e) + if err != nil { + log.Warnf(uploadFailedTemplate, source, target.PrimaryID, "视频", err) + } else { + *p = m + } + }) + } + } + w.wait() + return removeLocalElement(elements) +} + // SendGroupMessage 发送群消息 func (bot *CQBot) SendGroupMessage(groupID int64, m *message.SendingMessage) int32 { newElem := make([]message.IMessageElement, 0, len(m.Elements)) @@ -514,10 +561,6 @@ func (bot *CQBot) InsertGuildChannelMessage(m *message.GuildChannelMessage) stri return msg.ID } -// Release 释放Bot实例 -func (bot *CQBot) Release() { -} - func (bot *CQBot) dispatchEventMessage(m global.MSG) { bot.lock.RLock() defer bot.lock.RUnlock()