1
0
mirror of https://github.com/Mrs4s/MiraiGo.git synced 2025-06-18 21:45:04 +08:00

feat: upload waiter.

This commit is contained in:
wdvxdr 2021-07-29 19:22:18 +08:00
parent 1d68826cef
commit 624a7e4101
No known key found for this signature in database
GPG Key ID: 703F8C071DE7A1B6
8 changed files with 93 additions and 32 deletions

View File

@ -605,30 +605,30 @@ func decodeOffPicUpResponse(_ *QQClient, _ *incomingPacketInfo, payload []byte)
return nil, errors.Wrap(err, "failed to unmarshal protobuf message") return nil, errors.Wrap(err, "failed to unmarshal protobuf message")
} }
if rsp.GetFailMsg() != nil { if rsp.GetFailMsg() != nil {
return imageUploadResponse{ return &imageUploadResponse{
ResultCode: -1, ResultCode: -1,
Message: string(rsp.FailMsg), Message: string(rsp.FailMsg),
}, nil }, nil
} }
if rsp.GetSubcmd() != 1 || len(rsp.GetTryupImgRsp()) == 0 { if rsp.GetSubcmd() != 1 || len(rsp.GetTryupImgRsp()) == 0 {
return imageUploadResponse{ return &imageUploadResponse{
ResultCode: -2, ResultCode: -2,
}, nil }, nil
} }
imgRsp := rsp.GetTryupImgRsp()[0] imgRsp := rsp.GetTryupImgRsp()[0]
if imgRsp.GetResult() != 0 { if imgRsp.GetResult() != 0 {
return imageUploadResponse{ return &imageUploadResponse{
ResultCode: int32(*imgRsp.Result), ResultCode: int32(*imgRsp.Result),
Message: string(imgRsp.GetFailMsg()), Message: string(imgRsp.GetFailMsg()),
}, nil }, nil
} }
if imgRsp.GetFileExit() { if imgRsp.GetFileExit() {
return imageUploadResponse{ return &imageUploadResponse{
IsExists: true, IsExists: true,
ResourceId: string(imgRsp.GetUpResid()), ResourceId: string(imgRsp.GetUpResid()),
}, nil }, nil
} }
return imageUploadResponse{ return &imageUploadResponse{
ResourceId: string(imgRsp.GetUpResid()), ResourceId: string(imgRsp.GetUpResid()),
UploadKey: imgRsp.GetUpUkey(), UploadKey: imgRsp.GetUpUkey(),
UploadIp: imgRsp.GetUpIp(), UploadIp: imgRsp.GetUpIp(),

View File

@ -8,7 +8,6 @@ import (
"math/rand" "math/rand"
"os" "os"
"runtime/debug" "runtime/debug"
"sync"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
@ -56,7 +55,7 @@ type (
} }
) )
var fileSingleFlight = sync.Map{} var fsWaiter = utils.NewUploadWaiter()
func init() { func init() {
decoders["OidbSvc.0x6d8_1"] = decodeOIDB6d81Response decoders["OidbSvc.0x6d8_1"] = decodeOIDB6d81Response
@ -164,15 +163,8 @@ func (fs *GroupFileSystem) GetFilesByFolder(folderID string) ([]*GroupFile, []*G
func (fs *GroupFileSystem) UploadFile(p, name, folderId string) error { func (fs *GroupFileSystem) UploadFile(p, name, folderId string) error {
// 同文件等待其他线程上传 // 同文件等待其他线程上传
if wg, ok := fileSingleFlight.Load(p); ok { fsWaiter.Wait(p)
wg.(*sync.WaitGroup).Wait() defer fsWaiter.Done(p)
} else {
wg := &sync.WaitGroup{}
wg.Add(1)
fileSingleFlight.Store(p, wg)
defer wg.Done()
defer fileSingleFlight.Delete(p)
}
file, err := os.OpenFile(p, os.O_RDONLY, 0o666) file, err := os.OpenFile(p, os.O_RDONLY, 0o666)
if err != nil { if err != nil {

View File

@ -4,7 +4,7 @@ import (
"bytes" "bytes"
"fmt" "fmt"
"html" "html"
"io/ioutil" "io"
"mime/multipart" "mime/multipart"
"net/http" "net/http"
"net/textproto" "net/textproto"
@ -218,7 +218,7 @@ func (c *QQClient) uploadGroupNoticePic(img []byte) (*noticeImage, error) {
return nil, errors.Wrap(err, "post error") return nil, errors.Wrap(err, "post error")
} }
defer resp.Body.Close() defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body) body, err := io.ReadAll(resp.Body)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "read body error") return nil, errors.Wrap(err, "read body error")
} }

View File

@ -29,16 +29,23 @@ func init() {
decoders["OidbSvc.0xe07_0"] = decodeImageOcrResponse decoders["OidbSvc.0xe07_0"] = decodeImageOcrResponse
} }
var imgWaiter = utils.NewUploadWaiter()
func (c *QQClient) UploadGroupImage(groupCode int64, img io.ReadSeeker) (*message.GroupImageElement, error) { func (c *QQClient) UploadGroupImage(groupCode int64, img io.ReadSeeker) (*message.GroupImageElement, error) {
_, _ = img.Seek(0, io.SeekStart) // safe _, _ = img.Seek(0, io.SeekStart) // safe
fh, length := utils.ComputeMd5AndLength(img) fh, length := utils.ComputeMd5AndLength(img)
_, _ = img.Seek(0, io.SeekStart) _, _ = img.Seek(0, io.SeekStart)
key := hex.EncodeToString(fh)
imgWaiter.Wait(key)
defer imgWaiter.Done(key)
seq, pkt := c.buildGroupImageStorePacket(groupCode, fh, int32(length)) seq, pkt := c.buildGroupImageStorePacket(groupCode, fh, int32(length))
r, err := c.sendAndWait(seq, pkt) r, err := c.sendAndWait(seq, pkt)
if err != nil { if err != nil {
return nil, err return nil, err
} }
rsp := r.(imageUploadResponse) rsp := r.(*imageUploadResponse)
if rsp.ResultCode != 0 { if rsp.ResultCode != 0 {
return nil, errors.New(rsp.Message) return nil, errors.New(rsp.Message)
} }
@ -74,12 +81,17 @@ func (c *QQClient) UploadGroupImageByFile(groupCode int64, path string) (*messag
} }
defer func() { _ = img.Close() }() defer func() { _ = img.Close() }()
fh, length := utils.ComputeMd5AndLength(img) fh, length := utils.ComputeMd5AndLength(img)
key := hex.EncodeToString(fh)
imgWaiter.Wait(key)
defer imgWaiter.Done(key)
seq, pkt := c.buildGroupImageStorePacket(groupCode, fh, int32(length)) seq, pkt := c.buildGroupImageStorePacket(groupCode, fh, int32(length))
r, err := c.sendAndWait(seq, pkt) r, err := c.sendAndWait(seq, pkt)
if err != nil { if err != nil {
return nil, err return nil, err
} }
rsp := r.(imageUploadResponse) rsp := r.(*imageUploadResponse)
if rsp.ResultCode != 0 { if rsp.ResultCode != 0 {
return nil, errors.New(rsp.Message) return nil, errors.New(rsp.Message)
} }
@ -159,7 +171,7 @@ func (c *QQClient) QueryGroupImage(groupCode int64, hash []byte, size int32) (*m
if err != nil { if err != nil {
return nil, err return nil, err
} }
rsp := r.(imageUploadResponse) rsp := r.(*imageUploadResponse)
if rsp.ResultCode != 0 { if rsp.ResultCode != 0 {
return nil, errors.New(rsp.Message) return nil, errors.New(rsp.Message)
} }
@ -174,7 +186,7 @@ func (c *QQClient) QueryFriendImage(target int64, hash []byte, size int32) (*mes
if err != nil { if err != nil {
return nil, err return nil, err
} }
rsp := i.(imageUploadResponse) rsp := i.(*imageUploadResponse)
if rsp.ResultCode != 0 { if rsp.ResultCode != 0 {
return nil, errors.New(rsp.Message) return nil, errors.New(rsp.Message)
} }
@ -272,18 +284,18 @@ func decodeGroupImageStoreResponse(_ *QQClient, _ *incomingPacketInfo, payload [
} }
rsp := pkt.MsgTryUpImgRsp[0] rsp := pkt.MsgTryUpImgRsp[0]
if rsp.Result != 0 { if rsp.Result != 0 {
return imageUploadResponse{ return &imageUploadResponse{
ResultCode: rsp.Result, ResultCode: rsp.Result,
Message: rsp.FailMsg, Message: rsp.FailMsg,
}, nil }, nil
} }
if rsp.BoolFileExit { if rsp.BoolFileExit {
if rsp.MsgImgInfo != nil { if rsp.MsgImgInfo != nil {
return imageUploadResponse{IsExists: true, FileId: rsp.Fid, Width: rsp.MsgImgInfo.FileWidth, Height: rsp.MsgImgInfo.FileHeight}, nil return &imageUploadResponse{IsExists: true, FileId: rsp.Fid, Width: rsp.MsgImgInfo.FileWidth, Height: rsp.MsgImgInfo.FileHeight}, nil
} }
return imageUploadResponse{IsExists: true, FileId: rsp.Fid}, nil return &imageUploadResponse{IsExists: true, FileId: rsp.Fid}, nil
} }
return imageUploadResponse{ return &imageUploadResponse{
FileId: rsp.Fid, FileId: rsp.Fid,
UploadKey: rsp.UpUkey, UploadKey: rsp.UpUkey,
UploadIp: rsp.Uint32UpIp, UploadIp: rsp.Uint32UpIp,

View File

@ -24,12 +24,19 @@ func init() {
decoders["PttCenterSvr.GroupShortVideoUpReq"] = decodeGroupShortVideoUploadResponse decoders["PttCenterSvr.GroupShortVideoUpReq"] = decodeGroupShortVideoUploadResponse
} }
var pttWaiter = utils.NewUploadWaiter()
// UploadGroupPtt 将语音数据使用群语音通道上传到服务器, 返回 message.GroupVoiceElement 可直接发送 // UploadGroupPtt 将语音数据使用群语音通道上传到服务器, 返回 message.GroupVoiceElement 可直接发送
func (c *QQClient) UploadGroupPtt(groupCode int64, voice io.ReadSeeker) (*message.GroupVoiceElement, error) { func (c *QQClient) UploadGroupPtt(groupCode int64, voice io.ReadSeeker) (*message.GroupVoiceElement, error) {
h := md5.New() h := md5.New()
length, _ := io.Copy(h, voice) length, _ := io.Copy(h, voice)
fh := h.Sum(nil) fh := h.Sum(nil)
_, _ = voice.Seek(0, io.SeekStart) _, _ = voice.Seek(0, io.SeekStart)
key := hex.EncodeToString(fh)
pttWaiter.Wait(key)
defer pttWaiter.Done(key)
ext := c.buildGroupPttStoreBDHExt(groupCode, fh[:], int32(length), 0, int32(length)) ext := c.buildGroupPttStoreBDHExt(groupCode, fh[:], int32(length), 0, int32(length))
rsp, err := c.highwayUploadByBDH(voice, length, 29, c.bigDataSession.SigSession, fh, ext, false) rsp, err := c.highwayUploadByBDH(voice, length, 29, c.bigDataSession.SigSession, fh, ext, false)
if err != nil { if err != nil {
@ -65,6 +72,11 @@ func (c *QQClient) UploadPrivatePtt(target int64, voice io.ReadSeeker) (*message
length, _ := io.Copy(h, voice) length, _ := io.Copy(h, voice)
fh := h.Sum(nil) fh := h.Sum(nil)
_, _ = voice.Seek(0, io.SeekStart) _, _ = voice.Seek(0, io.SeekStart)
key := hex.EncodeToString(fh)
pttWaiter.Wait(key)
defer pttWaiter.Done(key)
ext := c.buildC2CPttStoreBDHExt(target, fh[:], int32(length), int32(length)) ext := c.buildC2CPttStoreBDHExt(target, fh[:], int32(length), int32(length))
rsp, err := c.highwayUploadByBDH(voice, length, 26, c.bigDataSession.SigSession, fh, ext, false) rsp, err := c.highwayUploadByBDH(voice, length, 26, c.bigDataSession.SigSession, fh, ext, false)
if err != nil { if err != nil {
@ -103,6 +115,11 @@ func (c *QQClient) UploadGroupShortVideo(groupCode int64, video, thumb io.ReadSe
if len(combinedCache) > 0 { if len(combinedCache) > 0 {
cache = combinedCache[0] cache = combinedCache[0]
} }
key := string(videoHash) + string(thumbHash)
pttWaiter.Wait(key)
defer pttWaiter.Done(key)
i, err := c.sendAndWait(c.buildPttGroupShortVideoUploadReqPacket(videoHash, thumbHash, groupCode, videoLen, thumbLen)) i, err := c.sendAndWait(c.buildPttGroupShortVideoUploadReqPacket(videoHash, thumbHash, groupCode, videoLen, thumbLen))
if err != nil { if err != nil {
return nil, errors.Wrap(err, "upload req error") return nil, errors.Wrap(err, "upload req error")

View File

@ -4,7 +4,6 @@ import (
"bytes" "bytes"
"compress/gzip" "compress/gzip"
"io" "io"
"io/ioutil"
"net/http" "net/http"
"strings" "strings"
) )
@ -40,7 +39,7 @@ func HttpPostBytes(url string, data []byte) ([]byte, error) {
return nil, err return nil, err
} }
defer resp.Body.Close() defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body) body, err := io.ReadAll(resp.Body)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -48,7 +47,7 @@ func HttpPostBytes(url string, data []byte) ([]byte, error) {
buffer := bytes.NewBuffer(body) buffer := bytes.NewBuffer(body)
r, _ := gzip.NewReader(buffer) r, _ := gzip.NewReader(buffer)
defer r.Close() defer r.Close()
unCom, err := ioutil.ReadAll(r) unCom, err := io.ReadAll(r)
return unCom, err return unCom, err
} }
return body, nil return body, nil
@ -73,7 +72,7 @@ func HttpPostBytesWithCookie(url string, data []byte, cookie string, contentType
return nil, err return nil, err
} }
defer resp.Body.Close() defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body) body, err := io.ReadAll(resp.Body)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -81,7 +80,7 @@ func HttpPostBytesWithCookie(url string, data []byte, cookie string, contentType
buffer := bytes.NewBuffer(body) buffer := bytes.NewBuffer(body)
r, _ := gzip.NewReader(buffer) r, _ := gzip.NewReader(buffer)
defer r.Close() defer r.Close()
unCom, err := ioutil.ReadAll(r) unCom, err := io.ReadAll(r)
return unCom, err return unCom, err
} }
return body, nil return body, nil

View File

@ -31,7 +31,7 @@ func ChunkString(s string, chunkSize int) []string {
chunkLen++ chunkLen++
} }
var chunks = make([]string, 0, chunkLen) chunks := make([]string, 0, chunkLen)
for i := 0; i < len(runes); i += chunkSize { for i := 0; i < len(runes); i += chunkSize {
nn := i + chunkSize nn := i + chunkSize
if nn > len(runes) { if nn > len(runes) {

41
utils/waiter.go Normal file
View File

@ -0,0 +1,41 @@
package utils
import "sync"
// UploadWaiter 用于控制并发上传,当有一个文件多次上传时,
// 等待第一个上传,后续的上传并发进行(可以秒传).
type UploadWaiter struct {
mu sync.Mutex
m map[string]*sync.WaitGroup
}
// NewUploadWaiter return a new UploadWaiter.
func NewUploadWaiter() *UploadWaiter {
return &UploadWaiter{
m: make(map[string]*sync.WaitGroup),
}
}
// Wait 如果不是第一个上传则等待。
func (s *UploadWaiter) Wait(key string) {
s.mu.Lock()
if w, ok := s.m[key]; ok {
s.mu.Unlock()
w.Wait()
} else {
wg := new(sync.WaitGroup)
wg.Add(1)
s.m[key] = wg
s.mu.Unlock()
}
}
// Done 当前上传任务已完成。
func (s *UploadWaiter) Done(key string) {
s.mu.Lock()
if w, ok := s.m[key]; ok {
w.Done()
delete(s.m, key)
}
s.mu.Unlock()
}