From 624a7e4101e70bea2fd6e77da1cd9db2a264c153 Mon Sep 17 00:00:00 2001 From: wdvxdr Date: Thu, 29 Jul 2021 19:22:18 +0800 Subject: [PATCH] feat: upload waiter. --- client/decoders.go | 10 +++++----- client/group_file.go | 14 +++----------- client/http_api.go | 4 ++-- client/image.go | 28 ++++++++++++++++++++-------- client/ptt.go | 17 +++++++++++++++++ utils/http.go | 9 ++++----- utils/string.go | 2 +- utils/waiter.go | 41 +++++++++++++++++++++++++++++++++++++++++ 8 files changed, 93 insertions(+), 32 deletions(-) create mode 100644 utils/waiter.go diff --git a/client/decoders.go b/client/decoders.go index 7e320af4..cc36e191 100644 --- a/client/decoders.go +++ b/client/decoders.go @@ -605,30 +605,30 @@ func decodeOffPicUpResponse(_ *QQClient, _ *incomingPacketInfo, payload []byte) return nil, errors.Wrap(err, "failed to unmarshal protobuf message") } if rsp.GetFailMsg() != nil { - return imageUploadResponse{ + return &imageUploadResponse{ ResultCode: -1, Message: string(rsp.FailMsg), }, nil } if rsp.GetSubcmd() != 1 || len(rsp.GetTryupImgRsp()) == 0 { - return imageUploadResponse{ + return &imageUploadResponse{ ResultCode: -2, }, nil } imgRsp := rsp.GetTryupImgRsp()[0] if imgRsp.GetResult() != 0 { - return imageUploadResponse{ + return &imageUploadResponse{ ResultCode: int32(*imgRsp.Result), Message: string(imgRsp.GetFailMsg()), }, nil } if imgRsp.GetFileExit() { - return imageUploadResponse{ + return &imageUploadResponse{ IsExists: true, ResourceId: string(imgRsp.GetUpResid()), }, nil } - return imageUploadResponse{ + return &imageUploadResponse{ ResourceId: string(imgRsp.GetUpResid()), UploadKey: imgRsp.GetUpUkey(), UploadIp: imgRsp.GetUpIp(), diff --git a/client/group_file.go b/client/group_file.go index 35ba0107..5a7d7ea0 100644 --- a/client/group_file.go +++ b/client/group_file.go @@ -8,7 +8,6 @@ import ( "math/rand" "os" "runtime/debug" - "sync" "google.golang.org/protobuf/proto" @@ -56,7 +55,7 @@ type ( } ) -var fileSingleFlight = sync.Map{} +var fsWaiter = utils.NewUploadWaiter() func init() { decoders["OidbSvc.0x6d8_1"] = decodeOIDB6d81Response @@ -164,15 +163,8 @@ func (fs *GroupFileSystem) GetFilesByFolder(folderID string) ([]*GroupFile, []*G func (fs *GroupFileSystem) UploadFile(p, name, folderId string) error { // 同文件等待其他线程上传 - if wg, ok := fileSingleFlight.Load(p); ok { - wg.(*sync.WaitGroup).Wait() - } else { - wg := &sync.WaitGroup{} - wg.Add(1) - fileSingleFlight.Store(p, wg) - defer wg.Done() - defer fileSingleFlight.Delete(p) - } + fsWaiter.Wait(p) + defer fsWaiter.Done(p) file, err := os.OpenFile(p, os.O_RDONLY, 0o666) if err != nil { diff --git a/client/http_api.go b/client/http_api.go index b4ae2ff9..5aaa2d0e 100644 --- a/client/http_api.go +++ b/client/http_api.go @@ -4,7 +4,7 @@ import ( "bytes" "fmt" "html" - "io/ioutil" + "io" "mime/multipart" "net/http" "net/textproto" @@ -218,7 +218,7 @@ func (c *QQClient) uploadGroupNoticePic(img []byte) (*noticeImage, error) { return nil, errors.Wrap(err, "post error") } defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) + body, err := io.ReadAll(resp.Body) if err != nil { return nil, errors.Wrap(err, "read body error") } diff --git a/client/image.go b/client/image.go index cfdf854c..6a7373fe 100644 --- a/client/image.go +++ b/client/image.go @@ -29,16 +29,23 @@ func init() { decoders["OidbSvc.0xe07_0"] = decodeImageOcrResponse } +var imgWaiter = utils.NewUploadWaiter() + func (c *QQClient) UploadGroupImage(groupCode int64, img io.ReadSeeker) (*message.GroupImageElement, error) { _, _ = img.Seek(0, io.SeekStart) // safe fh, length := utils.ComputeMd5AndLength(img) _, _ = img.Seek(0, io.SeekStart) + + key := hex.EncodeToString(fh) + imgWaiter.Wait(key) + defer imgWaiter.Done(key) + seq, pkt := c.buildGroupImageStorePacket(groupCode, fh, int32(length)) r, err := c.sendAndWait(seq, pkt) if err != nil { return nil, err } - rsp := r.(imageUploadResponse) + rsp := r.(*imageUploadResponse) if rsp.ResultCode != 0 { return nil, errors.New(rsp.Message) } @@ -74,12 +81,17 @@ func (c *QQClient) UploadGroupImageByFile(groupCode int64, path string) (*messag } defer func() { _ = img.Close() }() fh, length := utils.ComputeMd5AndLength(img) + + key := hex.EncodeToString(fh) + imgWaiter.Wait(key) + defer imgWaiter.Done(key) + seq, pkt := c.buildGroupImageStorePacket(groupCode, fh, int32(length)) r, err := c.sendAndWait(seq, pkt) if err != nil { return nil, err } - rsp := r.(imageUploadResponse) + rsp := r.(*imageUploadResponse) if rsp.ResultCode != 0 { return nil, errors.New(rsp.Message) } @@ -159,7 +171,7 @@ func (c *QQClient) QueryGroupImage(groupCode int64, hash []byte, size int32) (*m if err != nil { return nil, err } - rsp := r.(imageUploadResponse) + rsp := r.(*imageUploadResponse) if rsp.ResultCode != 0 { return nil, errors.New(rsp.Message) } @@ -174,7 +186,7 @@ func (c *QQClient) QueryFriendImage(target int64, hash []byte, size int32) (*mes if err != nil { return nil, err } - rsp := i.(imageUploadResponse) + rsp := i.(*imageUploadResponse) if rsp.ResultCode != 0 { return nil, errors.New(rsp.Message) } @@ -272,18 +284,18 @@ func decodeGroupImageStoreResponse(_ *QQClient, _ *incomingPacketInfo, payload [ } rsp := pkt.MsgTryUpImgRsp[0] if rsp.Result != 0 { - return imageUploadResponse{ + return &imageUploadResponse{ ResultCode: rsp.Result, Message: rsp.FailMsg, }, nil } if rsp.BoolFileExit { if rsp.MsgImgInfo != nil { - return imageUploadResponse{IsExists: true, FileId: rsp.Fid, Width: rsp.MsgImgInfo.FileWidth, Height: rsp.MsgImgInfo.FileHeight}, nil + return &imageUploadResponse{IsExists: true, FileId: rsp.Fid, Width: rsp.MsgImgInfo.FileWidth, Height: rsp.MsgImgInfo.FileHeight}, nil } - return imageUploadResponse{IsExists: true, FileId: rsp.Fid}, nil + return &imageUploadResponse{IsExists: true, FileId: rsp.Fid}, nil } - return imageUploadResponse{ + return &imageUploadResponse{ FileId: rsp.Fid, UploadKey: rsp.UpUkey, UploadIp: rsp.Uint32UpIp, diff --git a/client/ptt.go b/client/ptt.go index bb911f48..250472a5 100644 --- a/client/ptt.go +++ b/client/ptt.go @@ -24,12 +24,19 @@ func init() { decoders["PttCenterSvr.GroupShortVideoUpReq"] = decodeGroupShortVideoUploadResponse } +var pttWaiter = utils.NewUploadWaiter() + // UploadGroupPtt 将语音数据使用群语音通道上传到服务器, 返回 message.GroupVoiceElement 可直接发送 func (c *QQClient) UploadGroupPtt(groupCode int64, voice io.ReadSeeker) (*message.GroupVoiceElement, error) { h := md5.New() length, _ := io.Copy(h, voice) fh := h.Sum(nil) _, _ = voice.Seek(0, io.SeekStart) + + key := hex.EncodeToString(fh) + pttWaiter.Wait(key) + defer pttWaiter.Done(key) + ext := c.buildGroupPttStoreBDHExt(groupCode, fh[:], int32(length), 0, int32(length)) rsp, err := c.highwayUploadByBDH(voice, length, 29, c.bigDataSession.SigSession, fh, ext, false) if err != nil { @@ -65,6 +72,11 @@ func (c *QQClient) UploadPrivatePtt(target int64, voice io.ReadSeeker) (*message length, _ := io.Copy(h, voice) fh := h.Sum(nil) _, _ = voice.Seek(0, io.SeekStart) + + key := hex.EncodeToString(fh) + pttWaiter.Wait(key) + defer pttWaiter.Done(key) + ext := c.buildC2CPttStoreBDHExt(target, fh[:], int32(length), int32(length)) rsp, err := c.highwayUploadByBDH(voice, length, 26, c.bigDataSession.SigSession, fh, ext, false) if err != nil { @@ -103,6 +115,11 @@ func (c *QQClient) UploadGroupShortVideo(groupCode int64, video, thumb io.ReadSe if len(combinedCache) > 0 { cache = combinedCache[0] } + + key := string(videoHash) + string(thumbHash) + pttWaiter.Wait(key) + defer pttWaiter.Done(key) + i, err := c.sendAndWait(c.buildPttGroupShortVideoUploadReqPacket(videoHash, thumbHash, groupCode, videoLen, thumbLen)) if err != nil { return nil, errors.Wrap(err, "upload req error") diff --git a/utils/http.go b/utils/http.go index 0b0f2f84..fb0735be 100644 --- a/utils/http.go +++ b/utils/http.go @@ -4,7 +4,6 @@ import ( "bytes" "compress/gzip" "io" - "io/ioutil" "net/http" "strings" ) @@ -40,7 +39,7 @@ func HttpPostBytes(url string, data []byte) ([]byte, error) { return nil, err } defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) + body, err := io.ReadAll(resp.Body) if err != nil { return nil, err } @@ -48,7 +47,7 @@ func HttpPostBytes(url string, data []byte) ([]byte, error) { buffer := bytes.NewBuffer(body) r, _ := gzip.NewReader(buffer) defer r.Close() - unCom, err := ioutil.ReadAll(r) + unCom, err := io.ReadAll(r) return unCom, err } return body, nil @@ -73,7 +72,7 @@ func HttpPostBytesWithCookie(url string, data []byte, cookie string, contentType return nil, err } defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) + body, err := io.ReadAll(resp.Body) if err != nil { return nil, err } @@ -81,7 +80,7 @@ func HttpPostBytesWithCookie(url string, data []byte, cookie string, contentType buffer := bytes.NewBuffer(body) r, _ := gzip.NewReader(buffer) defer r.Close() - unCom, err := ioutil.ReadAll(r) + unCom, err := io.ReadAll(r) return unCom, err } return body, nil diff --git a/utils/string.go b/utils/string.go index 7ee0e68f..0078e82b 100644 --- a/utils/string.go +++ b/utils/string.go @@ -31,7 +31,7 @@ func ChunkString(s string, chunkSize int) []string { chunkLen++ } - var chunks = make([]string, 0, chunkLen) + chunks := make([]string, 0, chunkLen) for i := 0; i < len(runes); i += chunkSize { nn := i + chunkSize if nn > len(runes) { diff --git a/utils/waiter.go b/utils/waiter.go new file mode 100644 index 00000000..b472e756 --- /dev/null +++ b/utils/waiter.go @@ -0,0 +1,41 @@ +package utils + +import "sync" + +// UploadWaiter 用于控制并发上传,当有一个文件多次上传时, +// 等待第一个上传,后续的上传并发进行(可以秒传). +type UploadWaiter struct { + mu sync.Mutex + m map[string]*sync.WaitGroup +} + +// NewUploadWaiter return a new UploadWaiter. +func NewUploadWaiter() *UploadWaiter { + return &UploadWaiter{ + m: make(map[string]*sync.WaitGroup), + } +} + +// Wait 如果不是第一个上传则等待。 +func (s *UploadWaiter) Wait(key string) { + s.mu.Lock() + if w, ok := s.m[key]; ok { + s.mu.Unlock() + w.Wait() + } else { + wg := new(sync.WaitGroup) + wg.Add(1) + s.m[key] = wg + s.mu.Unlock() + } +} + +// Done 当前上传任务已完成。 +func (s *UploadWaiter) Done(key string) { + s.mu.Lock() + if w, ok := s.m[key]; ok { + w.Done() + delete(s.m, key) + } + s.mu.Unlock() +}