1
0
mirror of https://github.com/Mrs4s/go-cqhttp.git synced 2025-06-19 05:55:04 +08:00

coolq: allow upload media concurrently for normal message

This commit is contained in:
wdvxdr 2022-02-22 14:45:50 +08:00
parent d1f143ebf7
commit 987daad785
No known key found for this signature in database
GPG Key ID: 703F8C071DE7A1B6
2 changed files with 84 additions and 52 deletions

View File

@ -12,7 +12,6 @@ import (
"runtime"
"strconv"
"strings"
"sync"
"time"
"github.com/segmentio/asm/base64"
@ -793,32 +792,27 @@ func (bot *CQBot) uploadForwardElement(m gjson.Result, groupID int64) *message.F
fm := message.NewForwardMessage()
source := message.Source{SourceType: message.SourceGroup, PrimaryID: groupID}
var lazyUpload []func()
var wg sync.WaitGroup
var w worker
resolveElement := func(elems []message.IMessageElement) []message.IMessageElement {
for i, elem := range elems {
iescape := i
p := &elems[i]
switch o := elem.(type) {
case *LocalVideoElement:
wg.Add(1)
lazyUpload = append(lazyUpload, func() {
defer wg.Done()
w.do(func() {
gm, err := bot.uploadLocalVideo(source, o)
if err != nil {
log.Warnf("警告: 群 %d %s上传失败: %v", groupID, o.Type().String(), err)
log.Warnf(uploadFailedTemplate, "群", groupID, "视频", err)
} else {
elems[iescape] = gm
*p = gm
}
})
case *LocalImageElement:
wg.Add(1)
lazyUpload = append(lazyUpload, func() {
defer wg.Done()
w.do(func() {
gm, err := bot.uploadLocalImage(source, o)
if err != nil {
log.Warnf("警告: 群 %d %s上传失败: %v", groupID, o.Type().String(), err)
log.Warnf(uploadFailedTemplate, "群", groupID, "图片", err)
} else {
elems[iescape] = gm
*p = gm
}
})
}
@ -903,12 +897,7 @@ func (bot *CQBot) uploadForwardElement(m gjson.Result, groupID int64) *message.F
fm.AddNode(node)
}
}
for _, upload := range lazyUpload {
go upload()
}
wg.Wait()
w.wait()
return bot.Client.UploadGroupForwardMessage(groupID, fm)
}

View File

@ -129,6 +129,22 @@ func (bot *CQBot) OnEventPush(f func(e *Event)) {
bot.lock.Unlock()
}
type worker struct {
wg sync.WaitGroup
}
func (w *worker) do(f func()) {
w.wg.Add(1)
go func() {
defer w.wg.Done()
f()
}()
}
func (w *worker) wait() {
w.wg.Wait()
}
// uploadLocalImage 上传本地图片
func (bot *CQBot) uploadLocalImage(target message.Source, img *LocalImageElement) (i message.IMessageElement, err error) {
if img.File != "" {
@ -163,23 +179,26 @@ func (bot *CQBot) uploadLocalVideo(target message.Source, v *LocalVideoElement)
return bot.Client.UploadShortVideo(target, video, v.thumb, 4)
}
func (bot *CQBot) uploadMedia(target message.Source, elements []message.IMessageElement) []message.IMessageElement {
func removeLocalElement(elements []message.IMessageElement) []message.IMessageElement {
var j int
for _, m := range elements {
raw := m // upload failed will make m nil, so copy it
var err error
switch e := m.(type) {
case *LocalImageElement:
m, err = bot.uploadLocalImage(target, e)
case *message.VoiceElement:
if target.SourceType == message.SourceGuildChannel {
continue // todo
for i, e := range elements {
switch e.(type) {
case *LocalImageElement, *LocalVideoElement:
case *message.VoiceElement: // 未上传的语音消息, 也删除
default:
if j < i {
elements[j] = e
}
m, err = bot.Client.UploadVoice(target, bytes.NewReader(e.Data))
case *LocalVideoElement:
m, err = bot.uploadLocalVideo(target, e)
j++
}
if err != nil {
}
return elements[:j]
}
const uploadFailedTemplate = "警告: %s %d %s上传失败: %v"
func (bot *CQBot) uploadMedia(target message.Source, elements []message.IMessageElement) []message.IMessageElement {
var w worker
var source string
switch target.SourceType { // nolint:exhaustive
case message.SourceGroup:
@ -189,13 +208,41 @@ func (bot *CQBot) uploadMedia(target message.Source, elements []message.IMessage
case message.SourceGuildChannel:
source = "频道"
}
log.Warnf("警告: %s %d %s上传失败: %v", source, target.PrimaryID, raw.Type().String(), err)
continue
for i, m := range elements {
p := &elements[i]
switch e := m.(type) {
case *LocalImageElement:
w.do(func() {
m, err := bot.uploadLocalImage(target, e)
if err != nil {
log.Warnf(uploadFailedTemplate, source, target.PrimaryID, "图片", err)
} else {
*p = m
}
elements[j] = m
j++
})
case *message.VoiceElement:
w.do(func() {
m, err := bot.Client.UploadVoice(target, bytes.NewReader(e.Data))
if err != nil {
log.Warnf(uploadFailedTemplate, source, target.PrimaryID, "语音", err)
} else {
*p = m
}
return elements[:j]
})
case *LocalVideoElement:
w.do(func() {
m, err := bot.uploadLocalVideo(target, e)
if err != nil {
log.Warnf(uploadFailedTemplate, source, target.PrimaryID, "视频", err)
} else {
*p = m
}
})
}
}
w.wait()
return removeLocalElement(elements)
}
// SendGroupMessage 发送群消息
@ -514,10 +561,6 @@ func (bot *CQBot) InsertGuildChannelMessage(m *message.GuildChannelMessage) stri
return msg.ID
}
// Release 释放Bot实例
func (bot *CQBot) Release() {
}
func (bot *CQBot) dispatchEventMessage(m global.MSG) {
bot.lock.RLock()
defer bot.lock.RUnlock()