diff --git a/client/client.go b/client/client.go index a00cbffe..d0fc46ec 100644 --- a/client/client.go +++ b/client/client.go @@ -12,6 +12,7 @@ import ( "math" "math/rand" "net" + "os" "runtime/debug" "sort" "strconv" @@ -613,6 +614,48 @@ ok: return message.NewGroupImage(binary.CalculateImageResourceId(fh[:]), fh[:], rsp.FileId, int32(length), int32(i.Width), int32(i.Height), imageType), nil } +func (c *QQClient) UploadGroupImageByFile(groupCode int64, path string) (*message.GroupImageElement, error) { + img, err := os.OpenFile(path, os.O_RDONLY, 0666) + if err != nil { + return nil, err + } + h := md5.New() + length, _ := io.Copy(h, img) + fh := h.Sum(nil) + seq, pkt := c.buildGroupImageStorePacket(groupCode, fh[:], int32(length)) + r, err := c.sendAndWait(seq, pkt) + if err != nil { + return nil, err + } + rsp := r.(imageUploadResponse) + if rsp.ResultCode != 0 { + return nil, errors.New(rsp.Message) + } + if rsp.IsExists { + goto ok + } + if len(c.srvSsoAddrs) == 0 { + for i, addr := range rsp.UploadIp { + c.srvSsoAddrs = append(c.srvSsoAddrs, fmt.Sprintf("%v:%v", binary.UInt32ToIPV4Address(uint32(addr)), rsp.UploadPort[i])) + } + } + if _, err = c.highwayUploadFileMultiThreadingByBDH(path, 2, 2, rsp.UploadKey, EmptyBytes); err == nil { + goto ok + } + return nil, errors.New("upload failed") +ok: + _, _ = img.Seek(0, io.SeekStart) + i, _, _ := image.DecodeConfig(img) + var imageType int32 = 1000 + _, _ = img.Seek(0, io.SeekStart) + tmp := make([]byte, 4) + _, _ = img.Read(tmp) + if bytes.Equal(tmp, []byte{0x47, 0x49, 0x46, 0x38}) { + imageType = 2000 + } + return message.NewGroupImage(binary.CalculateImageResourceId(fh[:]), fh[:], rsp.FileId, int32(length), int32(i.Width), int32(i.Height), imageType), nil +} + func (c *QQClient) UploadPrivateImage(target int64, img io.ReadSeeker) (*message.FriendImageElement, error) { return c.uploadPrivateImage(target, img, 0) } diff --git a/client/highway.go b/client/highway.go index 7b7ecd7a..6e7888f1 100644 --- a/client/highway.go +++ b/client/highway.go @@ -14,7 +14,9 @@ import ( "io" "net" "net/http" + "os" "strconv" + "sync" "time" ) @@ -101,8 +103,8 @@ func (c *QQClient) highwayUploadByBDH(stream io.ReadSeeker, cmdId int32, ticket, } h := md5.New() length, _ := io.Copy(h, stream) - chunkSize := 8192 * 16 fh := h.Sum(nil) + chunkSize := 8192 * 16 _, _ = stream.Seek(0, io.SeekStart) conn, err := net.DialTimeout("tcp", c.srvSsoAddrs[0], time.Second*20) if err != nil { @@ -178,6 +180,158 @@ func (c *QQClient) highwayUploadByBDH(stream io.ReadSeeker, cmdId int32, ticket, return rspExt, nil } +func (c *QQClient) highwayUploadFileMultiThreadingByBDH(path string, cmdId int32, threadCount int, ticket, ext []byte) ([]byte, error) { + if len(c.srvSsoAddrs) == 0 { + return nil, errors.New("srv addrs not found. maybe miss some packet?") + } + stat, err := os.Stat(path) + if err != nil { + return nil, errors.Wrap(err, "get stat error") + } + file, err := os.OpenFile(path, os.O_RDONLY, 0666) + if err != nil { + return nil, errors.Wrap(err, "open file error") + } + if stat.Size() < 1024*1024*5 { + return c.highwayUploadByBDH(file, cmdId, ticket, ext) + } + type BlockMetaData struct { + Id int + BeginOffset int64 + EndOffset int64 + Uploaded bool + Uploading bool + } + h := md5.New() + _, _ = io.Copy(h, file) + fh := h.Sum(nil) + var blockSize int64 = 1024 * 1024 + var blocks []*BlockMetaData + var rspExt []byte + // Init Blocks + { + var temp int64 = 0 + for temp+blockSize < stat.Size() { + blocks = append(blocks, &BlockMetaData{ + Id: len(blocks), + BeginOffset: temp, + EndOffset: temp + blockSize, + }) + temp += blockSize + } + blocks = append(blocks, &BlockMetaData{ + Id: len(blocks), + BeginOffset: temp, + EndOffset: stat.Size(), + }) + } + var nextLock sync.Mutex + nextBlockId := func() int { + nextLock.Lock() + defer nextLock.Unlock() + for i, block := range blocks { + if !block.Uploading && !block.Uploaded { + block.Uploading = true + return i + } + } + return -1 + } + doUpload := func() error { + conn, err := net.DialTimeout("tcp", c.srvSsoAddrs[0], time.Second*20) + if err != nil { + return errors.Wrap(err, "connect error") + } + defer conn.Close() + chunk, _ := os.OpenFile(path, os.O_RDONLY, 0666) + reader := binary.NewNetworkReader(conn) + if err = c.highwaySendHeartbreak(conn); err != nil { + return errors.Wrap(err, "echo error") + } + if _, _, err = highwayReadResponse(reader); err != nil { + return errors.Wrap(err, "echo error") + } + for { + nextId := nextBlockId() + if nextId == -1 { + break + } + block := blocks[nextId] + buffer := make([]byte, blockSize) + _, _ = chunk.Seek(block.BeginOffset, io.SeekStart) + ri, err := io.ReadFull(chunk, buffer) + if err != nil { + if err == io.EOF { + break + } + if err == io.ErrUnexpectedEOF { + buffer = buffer[:ri] + } else { + return err + } + } + ch := md5.Sum(buffer) + head, _ := proto.Marshal(&pb.ReqDataHighwayHead{ + MsgBasehead: &pb.DataHighwayHead{ + Version: 1, + Uin: strconv.FormatInt(c.Uin, 10), + Command: "PicUp.DataUp", + Seq: c.nextGroupDataTransSeq(), + Appid: int32(c.version.AppId), + Dataflag: 4096, + CommandId: cmdId, + LocaleId: 2052, + }, + MsgSeghead: &pb.SegHead{ + Filesize: stat.Size(), + Dataoffset: block.BeginOffset, + Datalength: int32(ri), + Serviceticket: ticket, + Md5: ch[:], + FileMd5: fh[:], + }, + ReqExtendinfo: ext, + }) + _, err = conn.Write(binary.NewWriterF(func(w *binary.Writer) { + w.WriteByte(40) + w.WriteUInt32(uint32(len(head))) + w.WriteUInt32(uint32(len(buffer))) + w.Write(head) + w.Write(buffer) + w.WriteByte(41) + })) + if err != nil { + return errors.Wrap(err, "write conn error") + } + rspHead, _, err := highwayReadResponse(reader) + if err != nil { + return errors.Wrap(err, "highway upload error") + } + if rspHead.ErrorCode != 0 { + return errors.New("upload failed") + } + if rspHead.RspExtendinfo != nil { + rspExt = rspHead.RspExtendinfo + } + block.Uploaded = true + } + return nil + } + wg := sync.WaitGroup{} + wg.Add(threadCount) + var lastErr error + for i := 0; i < threadCount; i++ { + go func() { + defer wg.Done() + if err := doUpload(); err != nil { + lastErr = err + } + }() + } + wg.Wait() + return rspExt, err +} + func (c *QQClient) highwaySendHeartbreak(conn net.Conn) error { head, _ := proto.Marshal(&pb.ReqDataHighwayHead{ MsgBasehead: &pb.DataHighwayHead{