From d4f4def17d24d9e3c3a9751b6f2aa43341cae4ef Mon Sep 17 00:00:00 2001 From: wdvxdr Date: Fri, 11 Jun 2021 13:27:35 +0800 Subject: [PATCH] Revert "feat: upload with sendfile" This reverts commit cdb83e54 --- binary/pool.go | 35 ++++++++ binary/tea.go | 6 +- client/highway.go | 199 +++++++++++++++++++++++++++------------------- client/image.go | 2 +- 4 files changed, 158 insertions(+), 84 deletions(-) diff --git a/binary/pool.go b/binary/pool.go index a7a53284..645d10a4 100644 --- a/binary/pool.go +++ b/binary/pool.go @@ -90,3 +90,38 @@ func releaseZlibWriter(w *zlibWriter) { zlibPool.Put(w) } } + +const size128k = 128 * 1024 + +var b128kPool = sync.Pool{ + New: func() interface{} { + return make128kSlicePointer() + }, +} + +// Get128KBytes 获取一个128k大小 []byte +func Get128KBytes() *[]byte { + buf := b128kPool.Get().(*[]byte) + if buf == nil { + return make128kSlicePointer() + } + if cap(*buf) < size128k { + return make128kSlicePointer() + } + *buf = (*buf)[:size128k] + return buf +} + +// Put128KBytes 放回一个128k大小 []byte +func Put128KBytes(b *[]byte) { + if cap(*b) < size128k || cap(*b) > 2*size128k { // 太大或太小的 []byte 不要放入 + return + } + *b = (*b)[:cap(*b)] + b128kPool.Put(b) +} + +func make128kSlicePointer() *[]byte { + data := make([]byte, size128k) + return &data +} diff --git a/binary/tea.go b/binary/tea.go index 218aebc5..ed64e374 100644 --- a/binary/tea.go +++ b/binary/tea.go @@ -3,12 +3,14 @@ package binary import ( "encoding/binary" "math/rand" + "reflect" "unsafe" ) func xorQ(a, b []byte, c []byte) { // MAGIC - *(*uint64)(unsafe.Pointer(&c[0])) = - *(*uint64)(unsafe.Pointer(&a[0])) ^ *(*uint64)(unsafe.Pointer(&b[0])) + *(*uint64)(unsafe.Pointer((*reflect.SliceHeader)(unsafe.Pointer(&c)).Data)) = + *(*uint64)(unsafe.Pointer((*reflect.SliceHeader)(unsafe.Pointer(&a)).Data)) ^ + *(*uint64)(unsafe.Pointer((*reflect.SliceHeader)(unsafe.Pointer(&b)).Data)) } type TEA [4]uint32 diff --git a/client/highway.go b/client/highway.go index 90c92a2b..3baef8c2 100644 --- a/client/highway.go +++ b/client/highway.go @@ -37,52 +37,71 @@ func (c *QQClient) highwayUploadStream(ip uint32, port int, updKey []byte, strea h := md5.New() length, _ := io.Copy(h, stream) fh := h.Sum(nil) + const chunkSize = 8192 * 8 _, _ = stream.Seek(0, io.SeekStart) conn, err := net.DialTCP("tcp", nil, &addr) if err != nil { return errors.Wrap(err, "connect error") } defer conn.Close() + offset := 0 reader := binary.NewNetworkReader(conn) - head, _ := proto.Marshal(&pb.ReqDataHighwayHead{ - MsgBasehead: &pb.DataHighwayHead{ - Version: 1, - Uin: strconv.FormatInt(c.Uin, 10), - Command: "PicUp.DataUp", - Seq: c.nextGroupDataTransSeq(), - Appid: int32(c.version.AppId), - Dataflag: 4096, - CommandId: cmdId, - LocaleId: 2052, - }, - MsgSeghead: &pb.SegHead{ - Filesize: length, - Dataoffset: int64(0), - Datalength: int32(length), - Serviceticket: updKey, - Md5: fh, - FileMd5: fh, - }, - ReqExtendinfo: EmptyBytes, - }) + chunk := *binary.Get128KBytes() + defer func() { // 延迟捕获 chunk + binary.Put128KBytes(&chunk) + }() w := binary.NewWriter() defer binary.PutBuffer(w) - w.WriteByte(40) - w.WriteUInt32(uint32(len(head))) - w.WriteUInt32(uint32(length)) - w.Write(head) - _, _ = conn.Write(w.Bytes()) - _, _ = conn.ReadFrom(stream) - _, err = conn.Write([]byte{41}) - if err != nil { - return errors.Wrap(err, "write conn error") - } - rspHead, _, err := highwayReadResponse(reader) - if err != nil { - return errors.Wrap(err, "highway upload error") - } - if rspHead.ErrorCode != 0 { - return errors.New("upload failed") + for { + chunk = chunk[:chunkSize] + rl, err := io.ReadFull(stream, chunk) + if errors.Is(err, io.EOF) { + break + } + if errors.Is(err, io.ErrUnexpectedEOF) { + chunk = chunk[:rl] + } + ch := md5.Sum(chunk) + head, _ := proto.Marshal(&pb.ReqDataHighwayHead{ + MsgBasehead: &pb.DataHighwayHead{ + Version: 1, + Uin: strconv.FormatInt(c.Uin, 10), + Command: "PicUp.DataUp", + Seq: c.nextGroupDataTransSeq(), + Appid: int32(c.version.AppId), + Dataflag: 4096, + CommandId: cmdId, + LocaleId: 2052, + }, + MsgSeghead: &pb.SegHead{ + Filesize: length, + Dataoffset: int64(offset), + Datalength: int32(rl), + Serviceticket: updKey, + Md5: ch[:], + FileMd5: fh, + }, + ReqExtendinfo: EmptyBytes, + }) + offset += rl + w.Reset() + w.WriteByte(40) + w.WriteUInt32(uint32(len(head))) + w.WriteUInt32(uint32(len(chunk))) + w.Write(head) + w.Write(chunk) + w.WriteByte(41) + _, err = conn.Write(w.Bytes()) + if err != nil { + return errors.Wrap(err, "write conn error") + } + rspHead, _, err := highwayReadResponse(reader) + if err != nil { + return errors.Wrap(err, "highway upload error") + } + if rspHead.ErrorCode != 0 { + return errors.New("upload failed") + } } return nil } @@ -97,11 +116,13 @@ func (c *QQClient) highwayUploadByBDH(stream io.Reader, length int64, cmdId int3 } ext = binary.NewTeaCipher(c.bigDataSession.SessionKey).Encrypt(ext) } + const chunkSize = 8192 * 16 conn, err := net.DialTimeout("tcp", c.srvSsoAddrs[0], time.Second*20) if err != nil { return nil, errors.Wrap(err, "connect error") } defer conn.Close() + offset := 0 reader := binary.NewNetworkReader(conn) if err = c.highwaySendHeartbreak(conn); err != nil { return nil, errors.Wrap(err, "echo error") @@ -110,52 +131,68 @@ func (c *QQClient) highwayUploadByBDH(stream io.Reader, length int64, cmdId int3 return nil, errors.Wrap(err, "echo error") } var rspExt []byte - head, _ := proto.Marshal(&pb.ReqDataHighwayHead{ - MsgBasehead: &pb.DataHighwayHead{ - Version: 1, - Uin: strconv.FormatInt(c.Uin, 10), - Command: "PicUp.DataUp", - Seq: c.nextGroupDataTransSeq(), - Appid: int32(c.version.AppId), - Dataflag: 4096, - CommandId: cmdId, - LocaleId: 2052, - }, - MsgSeghead: &pb.SegHead{ - Filesize: length, - Dataoffset: int64(0), - Datalength: int32(length), - Serviceticket: ticket, - Md5: sum, - FileMd5: sum, - }, - ReqExtendinfo: ext, - }) + chunk := *binary.Get128KBytes() + defer func() { // 延迟捕获 chunk + binary.Put128KBytes(&chunk) + }() w := binary.NewWriter() defer binary.PutBuffer(w) - w.Reset() - w.WriteByte(40) - w.WriteUInt32(uint32(len(head))) - w.WriteUInt32(uint32(length)) - w.Write(head) - _, _ = conn.Write(w.Bytes()) - _, _ = io.Copy(conn, stream) - _, err = conn.Write([]byte{41}) - if err != nil { - return nil, errors.Wrap(err, "write conn error") - } - rspHead, _, err := highwayReadResponse(reader) - if err != nil { - return nil, errors.Wrap(err, "highway upload error") - } - if rspHead.ErrorCode != 0 { - return nil, errors.Errorf("upload failed: %d", rspHead.ErrorCode) - } - if rspHead.RspExtendinfo != nil { - rspExt = rspHead.RspExtendinfo - } - if rspHead.MsgSeghead != nil && rspHead.MsgSeghead.Serviceticket != nil { - ticket = rspHead.MsgSeghead.Serviceticket + for { + chunk = chunk[:chunkSize] + rl, err := io.ReadFull(stream, chunk) + if errors.Is(err, io.EOF) { + break + } + if errors.Is(err, io.ErrUnexpectedEOF) { + chunk = chunk[:rl] + } + ch := md5.Sum(chunk) + head, _ := proto.Marshal(&pb.ReqDataHighwayHead{ + MsgBasehead: &pb.DataHighwayHead{ + Version: 1, + Uin: strconv.FormatInt(c.Uin, 10), + Command: "PicUp.DataUp", + Seq: c.nextGroupDataTransSeq(), + Appid: int32(c.version.AppId), + Dataflag: 4096, + CommandId: cmdId, + LocaleId: 2052, + }, + MsgSeghead: &pb.SegHead{ + Filesize: length, + Dataoffset: int64(offset), + Datalength: int32(rl), + Serviceticket: ticket, + Md5: ch[:], + FileMd5: sum, + }, + ReqExtendinfo: ext, + }) + offset += rl + w.Reset() + w.WriteByte(40) + w.WriteUInt32(uint32(len(head))) + w.WriteUInt32(uint32(len(chunk))) + w.Write(head) + w.Write(chunk) + w.WriteByte(41) + _, err = conn.Write(w.Bytes()) + if err != nil { + return nil, errors.Wrap(err, "write conn error") + } + rspHead, _, err := highwayReadResponse(reader) + if err != nil { + return nil, errors.Wrap(err, "highway upload error") + } + if rspHead.ErrorCode != 0 { + return nil, errors.Errorf("upload failed: %d", rspHead.ErrorCode) + } + if rspHead.RspExtendinfo != nil { + rspExt = rspHead.RspExtendinfo + } + if rspHead.MsgSeghead != nil && rspHead.MsgSeghead.Serviceticket != nil { + ticket = rspHead.MsgSeghead.Serviceticket + } } return rspExt, nil } diff --git a/client/image.go b/client/image.go index fce6383d..c239a40b 100644 --- a/client/image.go +++ b/client/image.go @@ -50,7 +50,7 @@ func (c *QQClient) UploadGroupImage(groupCode int64, img io.ReadSeeker) (*messag c.srvSsoAddrs = append(c.srvSsoAddrs, fmt.Sprintf("%v:%v", binary.UInt32ToIPV4Address(uint32(addr)), rsp.UploadPort[i])) } } - if _, err = c.highwayUploadByBDH(img, length, 2, rsp.UploadKey, fh, EmptyBytes, false); err == nil { + if _, err = c.highwayUploadByBDH(img, length, 2, rsp.UploadKey, EmptyBytes, fh, false); err == nil { goto ok } return nil, errors.Wrap(err, "upload failed")