1
0
mirror of https://github.com/Mrs4s/MiraiGo.git synced 2025-05-07 20:45:53 +08:00

feature highway multi-threading upload.

This commit is contained in:
Mrs4s 2021-01-04 17:18:54 +08:00
parent e1e1c97a56
commit a2025e22ac
2 changed files with 198 additions and 1 deletions

View File

@ -12,6 +12,7 @@ import (
"math"
"math/rand"
"net"
"os"
"runtime/debug"
"sort"
"strconv"
@ -613,6 +614,48 @@ ok:
return message.NewGroupImage(binary.CalculateImageResourceId(fh[:]), fh[:], rsp.FileId, int32(length), int32(i.Width), int32(i.Height), imageType), nil
}
func (c *QQClient) UploadGroupImageByFile(groupCode int64, path string) (*message.GroupImageElement, error) {
img, err := os.OpenFile(path, os.O_RDONLY, 0666)
if err != nil {
return nil, err
}
h := md5.New()
length, _ := io.Copy(h, img)
fh := h.Sum(nil)
seq, pkt := c.buildGroupImageStorePacket(groupCode, fh[:], int32(length))
r, err := c.sendAndWait(seq, pkt)
if err != nil {
return nil, err
}
rsp := r.(imageUploadResponse)
if rsp.ResultCode != 0 {
return nil, errors.New(rsp.Message)
}
if rsp.IsExists {
goto ok
}
if len(c.srvSsoAddrs) == 0 {
for i, addr := range rsp.UploadIp {
c.srvSsoAddrs = append(c.srvSsoAddrs, fmt.Sprintf("%v:%v", binary.UInt32ToIPV4Address(uint32(addr)), rsp.UploadPort[i]))
}
}
if _, err = c.highwayUploadFileMultiThreadingByBDH(path, 2, 2, rsp.UploadKey, EmptyBytes); err == nil {
goto ok
}
return nil, errors.New("upload failed")
ok:
_, _ = img.Seek(0, io.SeekStart)
i, _, _ := image.DecodeConfig(img)
var imageType int32 = 1000
_, _ = img.Seek(0, io.SeekStart)
tmp := make([]byte, 4)
_, _ = img.Read(tmp)
if bytes.Equal(tmp, []byte{0x47, 0x49, 0x46, 0x38}) {
imageType = 2000
}
return message.NewGroupImage(binary.CalculateImageResourceId(fh[:]), fh[:], rsp.FileId, int32(length), int32(i.Width), int32(i.Height), imageType), nil
}
func (c *QQClient) UploadPrivateImage(target int64, img io.ReadSeeker) (*message.FriendImageElement, error) {
return c.uploadPrivateImage(target, img, 0)
}

View File

@ -14,7 +14,9 @@ import (
"io"
"net"
"net/http"
"os"
"strconv"
"sync"
"time"
)
@ -101,8 +103,8 @@ func (c *QQClient) highwayUploadByBDH(stream io.ReadSeeker, cmdId int32, ticket,
}
h := md5.New()
length, _ := io.Copy(h, stream)
chunkSize := 8192 * 16
fh := h.Sum(nil)
chunkSize := 8192 * 16
_, _ = stream.Seek(0, io.SeekStart)
conn, err := net.DialTimeout("tcp", c.srvSsoAddrs[0], time.Second*20)
if err != nil {
@ -178,6 +180,158 @@ func (c *QQClient) highwayUploadByBDH(stream io.ReadSeeker, cmdId int32, ticket,
return rspExt, nil
}
func (c *QQClient) highwayUploadFileMultiThreadingByBDH(path string, cmdId int32, threadCount int, ticket, ext []byte) ([]byte, error) {
if len(c.srvSsoAddrs) == 0 {
return nil, errors.New("srv addrs not found. maybe miss some packet?")
}
stat, err := os.Stat(path)
if err != nil {
return nil, errors.Wrap(err, "get stat error")
}
file, err := os.OpenFile(path, os.O_RDONLY, 0666)
if err != nil {
return nil, errors.Wrap(err, "open file error")
}
if stat.Size() < 1024*1024*5 {
return c.highwayUploadByBDH(file, cmdId, ticket, ext)
}
type BlockMetaData struct {
Id int
BeginOffset int64
EndOffset int64
Uploaded bool
Uploading bool
}
h := md5.New()
_, _ = io.Copy(h, file)
fh := h.Sum(nil)
var blockSize int64 = 1024 * 1024
var blocks []*BlockMetaData
var rspExt []byte
// Init Blocks
{
var temp int64 = 0
for temp+blockSize < stat.Size() {
blocks = append(blocks, &BlockMetaData{
Id: len(blocks),
BeginOffset: temp,
EndOffset: temp + blockSize,
})
temp += blockSize
}
blocks = append(blocks, &BlockMetaData{
Id: len(blocks),
BeginOffset: temp,
EndOffset: stat.Size(),
})
}
var nextLock sync.Mutex
nextBlockId := func() int {
nextLock.Lock()
defer nextLock.Unlock()
for i, block := range blocks {
if !block.Uploading && !block.Uploaded {
block.Uploading = true
return i
}
}
return -1
}
doUpload := func() error {
conn, err := net.DialTimeout("tcp", c.srvSsoAddrs[0], time.Second*20)
if err != nil {
return errors.Wrap(err, "connect error")
}
defer conn.Close()
chunk, _ := os.OpenFile(path, os.O_RDONLY, 0666)
reader := binary.NewNetworkReader(conn)
if err = c.highwaySendHeartbreak(conn); err != nil {
return errors.Wrap(err, "echo error")
}
if _, _, err = highwayReadResponse(reader); err != nil {
return errors.Wrap(err, "echo error")
}
for {
nextId := nextBlockId()
if nextId == -1 {
break
}
block := blocks[nextId]
buffer := make([]byte, blockSize)
_, _ = chunk.Seek(block.BeginOffset, io.SeekStart)
ri, err := io.ReadFull(chunk, buffer)
if err != nil {
if err == io.EOF {
break
}
if err == io.ErrUnexpectedEOF {
buffer = buffer[:ri]
} else {
return err
}
}
ch := md5.Sum(buffer)
head, _ := proto.Marshal(&pb.ReqDataHighwayHead{
MsgBasehead: &pb.DataHighwayHead{
Version: 1,
Uin: strconv.FormatInt(c.Uin, 10),
Command: "PicUp.DataUp",
Seq: c.nextGroupDataTransSeq(),
Appid: int32(c.version.AppId),
Dataflag: 4096,
CommandId: cmdId,
LocaleId: 2052,
},
MsgSeghead: &pb.SegHead{
Filesize: stat.Size(),
Dataoffset: block.BeginOffset,
Datalength: int32(ri),
Serviceticket: ticket,
Md5: ch[:],
FileMd5: fh[:],
},
ReqExtendinfo: ext,
})
_, err = conn.Write(binary.NewWriterF(func(w *binary.Writer) {
w.WriteByte(40)
w.WriteUInt32(uint32(len(head)))
w.WriteUInt32(uint32(len(buffer)))
w.Write(head)
w.Write(buffer)
w.WriteByte(41)
}))
if err != nil {
return errors.Wrap(err, "write conn error")
}
rspHead, _, err := highwayReadResponse(reader)
if err != nil {
return errors.Wrap(err, "highway upload error")
}
if rspHead.ErrorCode != 0 {
return errors.New("upload failed")
}
if rspHead.RspExtendinfo != nil {
rspExt = rspHead.RspExtendinfo
}
block.Uploaded = true
}
return nil
}
wg := sync.WaitGroup{}
wg.Add(threadCount)
var lastErr error
for i := 0; i < threadCount; i++ {
go func() {
defer wg.Done()
if err := doUpload(); err != nil {
lastErr = err
}
}()
}
wg.Wait()
return rspExt, err
}
func (c *QQClient) highwaySendHeartbreak(conn net.Conn) error {
head, _ := proto.Marshal(&pb.ReqDataHighwayHead{
MsgBasehead: &pb.DataHighwayHead{