1
0
mirror of https://github.com/Mrs4s/MiraiGo.git synced 2025-05-04 19:17:38 +08:00

client: allow multi-thread upload without cache file

Delete `UploadGroupImageByFile` since `UploadGroupImage` can multi-thread upload
This commit is contained in:
wdvxdr 2022-02-20 23:26:07 +08:00
parent 32ef91dd32
commit 5e8a512698
No known key found for this signature in database
GPG Key ID: 703F8C071DE7A1B6
4 changed files with 153 additions and 132 deletions

View File

@ -5,7 +5,6 @@ import (
"encoding/hex" "encoding/hex"
"io" "io"
"math/rand" "math/rand"
"os"
"strings" "strings"
"time" "time"
@ -44,7 +43,7 @@ type imageUploadResponse struct {
IsExists bool IsExists bool
} }
func (c *QQClient) UploadGroupImage(groupCode int64, img io.ReadSeeker) (*message.GroupImageElement, error) { func (c *QQClient) UploadGroupImage(groupCode int64, img io.ReadSeeker, thread ...int) (*message.GroupImageElement, error) {
_, _ = img.Seek(0, io.SeekStart) // safe _, _ = img.Seek(0, io.SeekStart) // safe
fh, length := utils.ComputeMd5AndLength(img) fh, length := utils.ComputeMd5AndLength(img)
_, _ = img.Seek(0, io.SeekStart) _, _ = img.Seek(0, io.SeekStart)
@ -53,6 +52,11 @@ func (c *QQClient) UploadGroupImage(groupCode int64, img io.ReadSeeker) (*messag
imgWaiter.Wait(key) imgWaiter.Wait(key)
defer imgWaiter.Done(key) defer imgWaiter.Done(key)
tc := 1
if len(thread) > 0 {
tc = thread[0]
}
seq, pkt := c.buildGroupImageStorePacket(groupCode, fh, int32(length)) seq, pkt := c.buildGroupImageStorePacket(groupCode, fh, int32(length))
r, err := c.sendAndWait(seq, pkt) r, err := c.sendAndWait(seq, pkt)
if err != nil { if err != nil {
@ -70,66 +74,29 @@ func (c *QQClient) UploadGroupImage(groupCode int64, img io.ReadSeeker) (*messag
c.highwaySession.AppendAddr(addr, rsp.UploadPort[i]) c.highwaySession.AppendAddr(addr, rsp.UploadPort[i])
} }
} }
if _, err = c.highwaySession.UploadBDH(highway.BdhInput{
if tc > 1 && length > 3*1024*1024 {
_, err = c.highwaySession.UploadBDHMultiThread(highway.BdhMultiThreadInput{
CommandID: 2,
Body: utils.ReaderAtFrom2ReadSeeker(img, nil),
Size: length,
Sum: fh,
Ticket: rsp.UploadKey,
Ext: EmptyBytes,
Encrypt: false,
}, 4)
} else {
_, err = c.highwaySession.UploadBDH(highway.BdhInput{
CommandID: 2, CommandID: 2,
Body: img, Body: img,
Ticket: rsp.UploadKey, Ticket: rsp.UploadKey,
Ext: EmptyBytes, Ext: EmptyBytes,
Encrypt: false, Encrypt: false,
}); err == nil { })
goto ok
} }
return nil, errors.Wrap(err, "upload failed")
ok:
_, _ = img.Seek(0, io.SeekStart)
i, t, _ := imgsz.DecodeSize(img)
var imageType int32 = 1000
if t == "gif" {
imageType = 2000
}
return message.NewGroupImage(binary.CalculateImageResourceId(fh), fh, rsp.FileId, int32(length), int32(i.Width), int32(i.Height), imageType), nil
}
func (c *QQClient) UploadGroupImageByFile(groupCode int64, path string) (*message.GroupImageElement, error) {
img, err := os.OpenFile(path, os.O_RDONLY, 0o666)
if err != nil { if err != nil {
return nil, err
}
defer func() { _ = img.Close() }()
fh, length := utils.ComputeMd5AndLength(img)
key := hex.EncodeToString(fh)
imgWaiter.Wait(key)
defer imgWaiter.Done(key)
seq, pkt := c.buildGroupImageStorePacket(groupCode, fh, int32(length))
r, err := c.sendAndWait(seq, pkt)
if err != nil {
return nil, err
}
rsp := r.(*imageUploadResponse)
if rsp.ResultCode != 0 {
return nil, errors.New(rsp.Message)
}
if rsp.IsExists {
goto ok
}
if c.highwaySession.AddrLength() == 0 {
for i, addr := range rsp.UploadIp {
c.highwaySession.AppendAddr(addr, rsp.UploadPort[i])
}
}
if _, err = c.highwaySession.UploadBDHMultiThread(highway.BdhInput{
CommandID: 2,
File: path,
Ticket: rsp.UploadKey,
Ext: EmptyBytes,
Encrypt: false,
}, 4); err == nil {
goto ok
}
return nil, errors.Wrap(err, "upload failed") return nil, errors.Wrap(err, "upload failed")
}
ok: ok:
_, _ = img.Seek(0, io.SeekStart) _, _ = img.Seek(0, io.SeekStart)
i, t, _ := imgsz.DecodeSize(img) i, t, _ := imgsz.DecodeSize(img)

View File

@ -4,7 +4,6 @@ import (
"crypto/md5" "crypto/md5"
"io" "io"
"net" "net"
"os"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -20,13 +19,22 @@ import (
type BdhInput struct { type BdhInput struct {
CommandID int32 CommandID int32
File string // upload multi-thread required
Body io.ReadSeeker Body io.ReadSeeker
Ticket []byte Ticket []byte
Ext []byte Ext []byte
Encrypt bool Encrypt bool
} }
type BdhMultiThreadInput struct {
CommandID int32
Body io.ReaderAt
Sum []byte
Size int64
Ticket []byte
Ext []byte
Encrypt bool
}
func (bdh *BdhInput) encrypt(key []byte) error { func (bdh *BdhInput) encrypt(key []byte) error {
if bdh.Encrypt { if bdh.Encrypt {
if len(key) == 0 { if len(key) == 0 {
@ -37,6 +45,16 @@ func (bdh *BdhInput) encrypt(key []byte) error {
return nil return nil
} }
func (bdh *BdhMultiThreadInput) encrypt(key []byte) error {
if bdh.Encrypt {
if len(key) == 0 {
return errors.New("session key not found. maybe miss some packet?")
}
bdh.Ext = binary.NewTeaCipher(key).Encrypt(bdh.Ext)
}
return nil
}
func (s *Session) UploadBDH(input BdhInput) ([]byte, error) { func (s *Session) UploadBDH(input BdhInput) ([]byte, error) {
if len(s.SsoAddr) == 0 { if len(s.SsoAddr) == 0 {
return nil, errors.New("srv addrs not found. maybe miss some packet?") return nil, errors.New("srv addrs not found. maybe miss some packet?")
@ -120,42 +138,35 @@ func (s *Session) UploadBDH(input BdhInput) ([]byte, error) {
return rspExt, nil return rspExt, nil
} }
func (s *Session) UploadBDHMultiThread(input BdhInput, threadCount int) ([]byte, error) { func (s *Session) UploadBDHMultiThread(input BdhMultiThreadInput, threadCount int) ([]byte, error) {
// for small file and small thread count,
// use UploadBDH instead of UploadBDHMultiThread
if input.Size < 1024*1024*3 || threadCount < 2 {
return s.UploadBDH(BdhInput{
CommandID: input.CommandID,
Body: io.NewSectionReader(input.Body, 0, input.Size),
Ticket: input.Ticket,
Ext: input.Ext,
Encrypt: input.Encrypt,
})
}
if len(s.SsoAddr) == 0 { if len(s.SsoAddr) == 0 {
return nil, errors.New("srv addrs not found. maybe miss some packet?") return nil, errors.New("srv addrs not found. maybe miss some packet?")
} }
addr := s.SsoAddr[0].String() addr := s.SsoAddr[0].String()
stat, err := os.Stat(input.File)
if err != nil {
return nil, errors.Wrap(err, "get stat error")
}
file, err := os.OpenFile(input.File, os.O_RDONLY, 0o666)
if err != nil {
return nil, errors.Wrap(err, "open file error")
}
sum, length := utils.ComputeMd5AndLength(file)
_, _ = file.Seek(0, io.SeekStart)
if err := input.encrypt(s.SessionKey); err != nil { if err := input.encrypt(s.SessionKey); err != nil {
return nil, errors.Wrap(err, "encrypt error") return nil, errors.Wrap(err, "encrypt error")
} }
// for small file and small thread count,
// use UploadBDH instead of UploadBDHMultiThread
if length < 1024*1024*3 || threadCount < 2 {
input.Body = file
return s.UploadBDH(input)
}
type BlockMetaData struct { type BlockMetaData struct {
Id int Id int
BeginOffset int64 Offset int64
EndOffset int64
} }
const blockSize int64 = 1024 * 512 const blockSize int64 = 1024 * 512
var ( var (
blocks []*BlockMetaData blocks []BlockMetaData
rspExt []byte rspExt []byte
BlockId = ^uint32(0) // -1 BlockId = ^uint32(0) // -1
uploadedCount uint32 uploadedCount uint32
@ -164,18 +175,16 @@ func (s *Session) UploadBDHMultiThread(input BdhInput, threadCount int) ([]byte,
// Init Blocks // Init Blocks
{ {
var temp int64 = 0 var temp int64 = 0
for temp+blockSize < stat.Size() { for temp+blockSize < input.Size {
blocks = append(blocks, &BlockMetaData{ blocks = append(blocks, BlockMetaData{
Id: len(blocks), Id: len(blocks),
BeginOffset: temp, Offset: temp,
EndOffset: temp + blockSize,
}) })
temp += blockSize temp += blockSize
} }
blocks = append(blocks, &BlockMetaData{ blocks = append(blocks, BlockMetaData{
Id: len(blocks), Id: len(blocks),
BeginOffset: temp, Offset: temp,
EndOffset: stat.Size(),
}) })
} }
doUpload := func() error { doUpload := func() error {
@ -187,8 +196,6 @@ func (s *Session) UploadBDHMultiThread(input BdhInput, threadCount int) ([]byte,
return errors.Wrap(err, "connect error") return errors.Wrap(err, "connect error")
} }
defer conn.Close() defer conn.Close()
chunk, _ := os.OpenFile(input.File, os.O_RDONLY, 0o666)
defer chunk.Close()
reader := binary.NewNetworkReader(conn) reader := binary.NewNetworkReader(conn)
if err = s.sendEcho(conn); err != nil { if err = s.sendEcho(conn); err != nil {
return err return err
@ -212,14 +219,17 @@ func (s *Session) UploadBDHMultiThread(input BdhInput, threadCount int) ([]byte,
cond.L.Unlock() cond.L.Unlock()
} }
buffer = buffer[:blockSize] buffer = buffer[:blockSize]
_, _ = chunk.Seek(block.BeginOffset, io.SeekStart)
ri, err := io.ReadFull(chunk, buffer) cond.L.Lock() // lock protect reading
n, err := input.Body.ReadAt(buffer, block.Offset)
cond.L.Unlock()
if err != nil { if err != nil {
if err == io.EOF { if err == io.EOF {
break break
} }
if err == io.ErrUnexpectedEOF { if err == io.ErrUnexpectedEOF {
buffer = buffer[:ri] buffer = buffer[:n]
} else { } else {
return err return err
} }
@ -237,12 +247,12 @@ func (s *Session) UploadBDHMultiThread(input BdhInput, threadCount int) ([]byte,
LocaleId: 2052, LocaleId: 2052,
}, },
MsgSeghead: &pb.SegHead{ MsgSeghead: &pb.SegHead{
Filesize: stat.Size(), Filesize: input.Size,
Dataoffset: block.BeginOffset, Dataoffset: block.Offset,
Datalength: int32(ri), Datalength: int32(n),
Serviceticket: input.Ticket, Serviceticket: input.Ticket,
Md5: ch[:], Md5: ch[:],
FileMd5: sum, FileMd5: input.Sum,
}, },
ReqExtendinfo: input.Ext, ReqExtendinfo: input.Ext,
}) })
@ -271,6 +281,5 @@ func (s *Session) UploadBDHMultiThread(input BdhInput, threadCount int) ([]byte,
for i := 0; i < threadCount; i++ { for i := 0; i < threadCount; i++ {
group.Go(doUpload) group.Go(doUpload)
} }
err = group.Wait() return rspExt, group.Wait()
return rspExt, err
} }

View File

@ -4,7 +4,6 @@ import (
"crypto/md5" "crypto/md5"
"encoding/hex" "encoding/hex"
"io" "io"
"os"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -134,14 +133,10 @@ func (c *QQClient) UploadPrivatePtt(target int64, voice io.ReadSeeker) (*message
} }
// UploadGroupShortVideo 将视频和封面上传到服务器, 返回 message.ShortVideoElement 可直接发送 // UploadGroupShortVideo 将视频和封面上传到服务器, 返回 message.ShortVideoElement 可直接发送
// combinedCache 本地文件缓存, 设置后可多线程上传 // thread 上传线程数
func (c *QQClient) UploadGroupShortVideo(groupCode int64, video, thumb io.ReadSeeker, combinedCache ...string) (*message.ShortVideoElement, error) { func (c *QQClient) UploadGroupShortVideo(groupCode int64, video, thumb io.ReadSeeker, thread int) (*message.ShortVideoElement, error) {
videoHash, videoLen := utils.ComputeMd5AndLength(video) videoHash, videoLen := utils.ComputeMd5AndLength(video)
thumbHash, thumbLen := utils.ComputeMd5AndLength(thumb) thumbHash, thumbLen := utils.ComputeMd5AndLength(thumb)
cache := ""
if len(combinedCache) > 0 {
cache = combinedCache[0]
}
key := string(videoHash) + string(thumbHash) key := string(videoHash) + string(thumbHash)
pttWaiter.Wait(key) pttWaiter.Wait(key)
@ -164,30 +159,27 @@ func (c *QQClient) UploadGroupShortVideo(groupCode int64, video, thumb io.ReadSe
ext, _ := proto.Marshal(c.buildPttGroupShortVideoProto(videoHash, thumbHash, groupCode, videoLen, thumbLen, 1).PttShortVideoUploadReq) ext, _ := proto.Marshal(c.buildPttGroupShortVideoProto(videoHash, thumbHash, groupCode, videoLen, thumbLen, 1).PttShortVideoUploadReq)
var hwRsp []byte var hwRsp []byte
if thread > 1 {
sum, _ := utils.ComputeMd5AndLength(utils.MultiReadSeeker(thumb, video))
input := highway.BdhMultiThreadInput{
CommandID: 25,
Body: utils.ReaderAtFrom2ReadSeeker(thumb, video),
Size: videoLen + thumbLen,
Sum: sum,
Ticket: c.highwaySession.SigSession,
Ext: ext,
Encrypt: true,
}
hwRsp, err = c.highwaySession.UploadBDHMultiThread(input, thread)
} else {
multi := utils.MultiReadSeeker(thumb, video) multi := utils.MultiReadSeeker(thumb, video)
input := highway.BdhInput{ input := highway.BdhInput{
CommandID: 25, CommandID: 25,
File: cache,
Body: multi, Body: multi,
Ticket: c.highwaySession.SigSession, Ticket: c.highwaySession.SigSession,
Ext: ext, Ext: ext,
Encrypt: true, Encrypt: true,
} }
if cache != "" {
var file *os.File
file, err = os.OpenFile(cache, os.O_WRONLY|os.O_CREATE, 0o666)
cp := func() error {
_, err := io.Copy(file, utils.MultiReadSeeker(thumb, video))
return err
}
if err != nil || cp() != nil {
hwRsp, err = c.highwaySession.UploadBDH(input)
} else {
_ = file.Close()
hwRsp, err = c.highwaySession.UploadBDHMultiThread(input, 8)
_ = os.Remove(cache)
}
} else {
hwRsp, err = c.highwaySession.UploadBDH(input) hwRsp, err = c.highwaySession.UploadBDH(input)
} }
if err != nil { if err != nil {

View File

@ -60,6 +60,59 @@ func MultiReadSeeker(r ...io.ReadSeeker) io.ReadSeeker {
} }
} }
type multiReadAt struct {
first io.ReadSeeker
second io.ReadSeeker
firstSize int64
secondSize int64
}
func (m *multiReadAt) ReadAt(p []byte, off int64) (n int, err error) {
if m.second == nil { // quick path
_, _ = m.first.Seek(off, io.SeekStart)
return m.first.Read(p)
}
if off < m.firstSize && off+int64(len(p)) < m.firstSize {
_, err = m.first.Seek(off, io.SeekStart)
if err != nil {
return
}
return m.first.Read(p)
} else if off < m.firstSize && off+int64(len(p)) >= m.firstSize {
_, _ = m.first.Seek(off, io.SeekStart)
_, _ = m.second.Seek(0, io.SeekStart)
n, err = m.first.Read(p[:m.firstSize-off])
if err != nil {
return
}
n2, err := m.second.Read(p[m.firstSize-off:])
return n + n2, err
}
_, err = m.second.Seek(off-m.firstSize, io.SeekStart)
if err != nil {
return
}
return m.second.Read(p)
}
func ReaderAtFrom2ReadSeeker(first, second io.ReadSeeker) io.ReaderAt {
firstSize, _ := first.Seek(0, io.SeekEnd)
if second == nil {
return &multiReadAt{
first: first,
firstSize: firstSize,
secondSize: 0,
}
}
secondSize, _ := second.Seek(0, io.SeekEnd)
return &multiReadAt{
first: first,
second: second,
firstSize: firstSize,
secondSize: secondSize,
}
}
// Select 如果A为nil 将会返回 B 否则返回A // Select 如果A为nil 将会返回 B 否则返回A
// 对应 ?? 语法 // 对应 ?? 语法
func Select(a, b []byte) []byte { func Select(a, b []byte) []byte {