From 5e8a5126989dde26ce77d1603775957fd027dad1 Mon Sep 17 00:00:00 2001 From: wdvxdr Date: Sun, 20 Feb 2022 23:26:07 +0800 Subject: [PATCH] client: allow multi-thread upload without cache file Delete `UploadGroupImageByFile` since `UploadGroupImage` can multi-thread upload --- client/image.go | 83 +++++++++------------------- client/internal/highway/bdh.go | 99 ++++++++++++++++++---------------- client/ptt.go | 50 ++++++++--------- utils/sys.go | 53 ++++++++++++++++++ 4 files changed, 153 insertions(+), 132 deletions(-) diff --git a/client/image.go b/client/image.go index 808ae5d9..a382ce13 100644 --- a/client/image.go +++ b/client/image.go @@ -5,7 +5,6 @@ import ( "encoding/hex" "io" "math/rand" - "os" "strings" "time" @@ -44,7 +43,7 @@ type imageUploadResponse struct { IsExists bool } -func (c *QQClient) UploadGroupImage(groupCode int64, img io.ReadSeeker) (*message.GroupImageElement, error) { +func (c *QQClient) UploadGroupImage(groupCode int64, img io.ReadSeeker, thread ...int) (*message.GroupImageElement, error) { _, _ = img.Seek(0, io.SeekStart) // safe fh, length := utils.ComputeMd5AndLength(img) _, _ = img.Seek(0, io.SeekStart) @@ -53,54 +52,10 @@ func (c *QQClient) UploadGroupImage(groupCode int64, img io.ReadSeeker) (*messag imgWaiter.Wait(key) defer imgWaiter.Done(key) - seq, pkt := c.buildGroupImageStorePacket(groupCode, fh, int32(length)) - r, err := c.sendAndWait(seq, pkt) - if err != nil { - return nil, err + tc := 1 + if len(thread) > 0 { + tc = thread[0] } - rsp := r.(*imageUploadResponse) - if rsp.ResultCode != 0 { - return nil, errors.New(rsp.Message) - } - if rsp.IsExists { - goto ok - } - if c.highwaySession.AddrLength() == 0 { - for i, addr := range rsp.UploadIp { - c.highwaySession.AppendAddr(addr, rsp.UploadPort[i]) - } - } - if _, err = c.highwaySession.UploadBDH(highway.BdhInput{ - CommandID: 2, - Body: img, - Ticket: rsp.UploadKey, - Ext: EmptyBytes, - Encrypt: false, - }); err == nil { - goto ok - } - return nil, errors.Wrap(err, "upload failed") -ok: - _, _ = img.Seek(0, io.SeekStart) - i, t, _ := imgsz.DecodeSize(img) - var imageType int32 = 1000 - if t == "gif" { - imageType = 2000 - } - return message.NewGroupImage(binary.CalculateImageResourceId(fh), fh, rsp.FileId, int32(length), int32(i.Width), int32(i.Height), imageType), nil -} - -func (c *QQClient) UploadGroupImageByFile(groupCode int64, path string) (*message.GroupImageElement, error) { - img, err := os.OpenFile(path, os.O_RDONLY, 0o666) - if err != nil { - return nil, err - } - defer func() { _ = img.Close() }() - fh, length := utils.ComputeMd5AndLength(img) - - key := hex.EncodeToString(fh) - imgWaiter.Wait(key) - defer imgWaiter.Done(key) seq, pkt := c.buildGroupImageStorePacket(groupCode, fh, int32(length)) r, err := c.sendAndWait(seq, pkt) @@ -120,16 +75,28 @@ func (c *QQClient) UploadGroupImageByFile(groupCode int64, path string) (*messag } } - if _, err = c.highwaySession.UploadBDHMultiThread(highway.BdhInput{ - CommandID: 2, - File: path, - Ticket: rsp.UploadKey, - Ext: EmptyBytes, - Encrypt: false, - }, 4); err == nil { - goto ok + if tc > 1 && length > 3*1024*1024 { + _, err = c.highwaySession.UploadBDHMultiThread(highway.BdhMultiThreadInput{ + CommandID: 2, + Body: utils.ReaderAtFrom2ReadSeeker(img, nil), + Size: length, + Sum: fh, + Ticket: rsp.UploadKey, + Ext: EmptyBytes, + Encrypt: false, + }, 4) + } else { + _, err = c.highwaySession.UploadBDH(highway.BdhInput{ + CommandID: 2, + Body: img, + Ticket: rsp.UploadKey, + Ext: EmptyBytes, + Encrypt: false, + }) + } + if err != nil { + return nil, errors.Wrap(err, "upload failed") } - return nil, errors.Wrap(err, "upload failed") ok: _, _ = img.Seek(0, io.SeekStart) i, t, _ := imgsz.DecodeSize(img) diff --git a/client/internal/highway/bdh.go b/client/internal/highway/bdh.go index 79a99f82..7491e588 100644 --- a/client/internal/highway/bdh.go +++ b/client/internal/highway/bdh.go @@ -4,7 +4,6 @@ import ( "crypto/md5" "io" "net" - "os" "sync" "sync/atomic" "time" @@ -20,13 +19,22 @@ import ( type BdhInput struct { CommandID int32 - File string // upload multi-thread required Body io.ReadSeeker Ticket []byte Ext []byte Encrypt bool } +type BdhMultiThreadInput struct { + CommandID int32 + Body io.ReaderAt + Sum []byte + Size int64 + Ticket []byte + Ext []byte + Encrypt bool +} + func (bdh *BdhInput) encrypt(key []byte) error { if bdh.Encrypt { if len(key) == 0 { @@ -37,6 +45,16 @@ func (bdh *BdhInput) encrypt(key []byte) error { return nil } +func (bdh *BdhMultiThreadInput) encrypt(key []byte) error { + if bdh.Encrypt { + if len(key) == 0 { + return errors.New("session key not found. maybe miss some packet?") + } + bdh.Ext = binary.NewTeaCipher(key).Encrypt(bdh.Ext) + } + return nil +} + func (s *Session) UploadBDH(input BdhInput) ([]byte, error) { if len(s.SsoAddr) == 0 { return nil, errors.New("srv addrs not found. maybe miss some packet?") @@ -120,42 +138,35 @@ func (s *Session) UploadBDH(input BdhInput) ([]byte, error) { return rspExt, nil } -func (s *Session) UploadBDHMultiThread(input BdhInput, threadCount int) ([]byte, error) { +func (s *Session) UploadBDHMultiThread(input BdhMultiThreadInput, threadCount int) ([]byte, error) { + // for small file and small thread count, + // use UploadBDH instead of UploadBDHMultiThread + if input.Size < 1024*1024*3 || threadCount < 2 { + return s.UploadBDH(BdhInput{ + CommandID: input.CommandID, + Body: io.NewSectionReader(input.Body, 0, input.Size), + Ticket: input.Ticket, + Ext: input.Ext, + Encrypt: input.Encrypt, + }) + } + if len(s.SsoAddr) == 0 { return nil, errors.New("srv addrs not found. maybe miss some packet?") } addr := s.SsoAddr[0].String() - stat, err := os.Stat(input.File) - if err != nil { - return nil, errors.Wrap(err, "get stat error") - } - file, err := os.OpenFile(input.File, os.O_RDONLY, 0o666) - if err != nil { - return nil, errors.Wrap(err, "open file error") - } - sum, length := utils.ComputeMd5AndLength(file) - _, _ = file.Seek(0, io.SeekStart) - if err := input.encrypt(s.SessionKey); err != nil { return nil, errors.Wrap(err, "encrypt error") } - // for small file and small thread count, - // use UploadBDH instead of UploadBDHMultiThread - if length < 1024*1024*3 || threadCount < 2 { - input.Body = file - return s.UploadBDH(input) - } - type BlockMetaData struct { - Id int - BeginOffset int64 - EndOffset int64 + Id int + Offset int64 } const blockSize int64 = 1024 * 512 var ( - blocks []*BlockMetaData + blocks []BlockMetaData rspExt []byte BlockId = ^uint32(0) // -1 uploadedCount uint32 @@ -164,18 +175,16 @@ func (s *Session) UploadBDHMultiThread(input BdhInput, threadCount int) ([]byte, // Init Blocks { var temp int64 = 0 - for temp+blockSize < stat.Size() { - blocks = append(blocks, &BlockMetaData{ - Id: len(blocks), - BeginOffset: temp, - EndOffset: temp + blockSize, + for temp+blockSize < input.Size { + blocks = append(blocks, BlockMetaData{ + Id: len(blocks), + Offset: temp, }) temp += blockSize } - blocks = append(blocks, &BlockMetaData{ - Id: len(blocks), - BeginOffset: temp, - EndOffset: stat.Size(), + blocks = append(blocks, BlockMetaData{ + Id: len(blocks), + Offset: temp, }) } doUpload := func() error { @@ -187,8 +196,6 @@ func (s *Session) UploadBDHMultiThread(input BdhInput, threadCount int) ([]byte, return errors.Wrap(err, "connect error") } defer conn.Close() - chunk, _ := os.OpenFile(input.File, os.O_RDONLY, 0o666) - defer chunk.Close() reader := binary.NewNetworkReader(conn) if err = s.sendEcho(conn); err != nil { return err @@ -212,14 +219,17 @@ func (s *Session) UploadBDHMultiThread(input BdhInput, threadCount int) ([]byte, cond.L.Unlock() } buffer = buffer[:blockSize] - _, _ = chunk.Seek(block.BeginOffset, io.SeekStart) - ri, err := io.ReadFull(chunk, buffer) + + cond.L.Lock() // lock protect reading + n, err := input.Body.ReadAt(buffer, block.Offset) + cond.L.Unlock() + if err != nil { if err == io.EOF { break } if err == io.ErrUnexpectedEOF { - buffer = buffer[:ri] + buffer = buffer[:n] } else { return err } @@ -237,12 +247,12 @@ func (s *Session) UploadBDHMultiThread(input BdhInput, threadCount int) ([]byte, LocaleId: 2052, }, MsgSeghead: &pb.SegHead{ - Filesize: stat.Size(), - Dataoffset: block.BeginOffset, - Datalength: int32(ri), + Filesize: input.Size, + Dataoffset: block.Offset, + Datalength: int32(n), Serviceticket: input.Ticket, Md5: ch[:], - FileMd5: sum, + FileMd5: input.Sum, }, ReqExtendinfo: input.Ext, }) @@ -271,6 +281,5 @@ func (s *Session) UploadBDHMultiThread(input BdhInput, threadCount int) ([]byte, for i := 0; i < threadCount; i++ { group.Go(doUpload) } - err = group.Wait() - return rspExt, err + return rspExt, group.Wait() } diff --git a/client/ptt.go b/client/ptt.go index beb864e3..5c1e74c8 100644 --- a/client/ptt.go +++ b/client/ptt.go @@ -4,7 +4,6 @@ import ( "crypto/md5" "encoding/hex" "io" - "os" "github.com/pkg/errors" @@ -134,14 +133,10 @@ func (c *QQClient) UploadPrivatePtt(target int64, voice io.ReadSeeker) (*message } // UploadGroupShortVideo 将视频和封面上传到服务器, 返回 message.ShortVideoElement 可直接发送 -// combinedCache 本地文件缓存, 设置后可多线程上传 -func (c *QQClient) UploadGroupShortVideo(groupCode int64, video, thumb io.ReadSeeker, combinedCache ...string) (*message.ShortVideoElement, error) { +// thread 上传线程数 +func (c *QQClient) UploadGroupShortVideo(groupCode int64, video, thumb io.ReadSeeker, thread int) (*message.ShortVideoElement, error) { videoHash, videoLen := utils.ComputeMd5AndLength(video) thumbHash, thumbLen := utils.ComputeMd5AndLength(thumb) - cache := "" - if len(combinedCache) > 0 { - cache = combinedCache[0] - } key := string(videoHash) + string(thumbHash) pttWaiter.Wait(key) @@ -164,30 +159,27 @@ func (c *QQClient) UploadGroupShortVideo(groupCode int64, video, thumb io.ReadSe ext, _ := proto.Marshal(c.buildPttGroupShortVideoProto(videoHash, thumbHash, groupCode, videoLen, thumbLen, 1).PttShortVideoUploadReq) var hwRsp []byte - multi := utils.MultiReadSeeker(thumb, video) - input := highway.BdhInput{ - CommandID: 25, - File: cache, - Body: multi, - Ticket: c.highwaySession.SigSession, - Ext: ext, - Encrypt: true, - } - if cache != "" { - var file *os.File - file, err = os.OpenFile(cache, os.O_WRONLY|os.O_CREATE, 0o666) - cp := func() error { - _, err := io.Copy(file, utils.MultiReadSeeker(thumb, video)) - return err - } - if err != nil || cp() != nil { - hwRsp, err = c.highwaySession.UploadBDH(input) - } else { - _ = file.Close() - hwRsp, err = c.highwaySession.UploadBDHMultiThread(input, 8) - _ = os.Remove(cache) + if thread > 1 { + sum, _ := utils.ComputeMd5AndLength(utils.MultiReadSeeker(thumb, video)) + input := highway.BdhMultiThreadInput{ + CommandID: 25, + Body: utils.ReaderAtFrom2ReadSeeker(thumb, video), + Size: videoLen + thumbLen, + Sum: sum, + Ticket: c.highwaySession.SigSession, + Ext: ext, + Encrypt: true, } + hwRsp, err = c.highwaySession.UploadBDHMultiThread(input, thread) } else { + multi := utils.MultiReadSeeker(thumb, video) + input := highway.BdhInput{ + CommandID: 25, + Body: multi, + Ticket: c.highwaySession.SigSession, + Ext: ext, + Encrypt: true, + } hwRsp, err = c.highwaySession.UploadBDH(input) } if err != nil { diff --git a/utils/sys.go b/utils/sys.go index 3ebda44c..57080779 100644 --- a/utils/sys.go +++ b/utils/sys.go @@ -60,6 +60,59 @@ func MultiReadSeeker(r ...io.ReadSeeker) io.ReadSeeker { } } +type multiReadAt struct { + first io.ReadSeeker + second io.ReadSeeker + firstSize int64 + secondSize int64 +} + +func (m *multiReadAt) ReadAt(p []byte, off int64) (n int, err error) { + if m.second == nil { // quick path + _, _ = m.first.Seek(off, io.SeekStart) + return m.first.Read(p) + } + if off < m.firstSize && off+int64(len(p)) < m.firstSize { + _, err = m.first.Seek(off, io.SeekStart) + if err != nil { + return + } + return m.first.Read(p) + } else if off < m.firstSize && off+int64(len(p)) >= m.firstSize { + _, _ = m.first.Seek(off, io.SeekStart) + _, _ = m.second.Seek(0, io.SeekStart) + n, err = m.first.Read(p[:m.firstSize-off]) + if err != nil { + return + } + n2, err := m.second.Read(p[m.firstSize-off:]) + return n + n2, err + } + _, err = m.second.Seek(off-m.firstSize, io.SeekStart) + if err != nil { + return + } + return m.second.Read(p) +} + +func ReaderAtFrom2ReadSeeker(first, second io.ReadSeeker) io.ReaderAt { + firstSize, _ := first.Seek(0, io.SeekEnd) + if second == nil { + return &multiReadAt{ + first: first, + firstSize: firstSize, + secondSize: 0, + } + } + secondSize, _ := second.Seek(0, io.SeekEnd) + return &multiReadAt{ + first: first, + second: second, + firstSize: firstSize, + secondSize: secondSize, + } +} + // Select 如果A为nil 将会返回 B 否则返回A // 对应 ?? 语法 func Select(a, b []byte) []byte {