1
0
mirror of https://github.com/Mrs4s/MiraiGo.git synced 2025-05-04 19:17:38 +08:00

Revert "feat: upload with sendfile"

This reverts commit cdb83e54
This commit is contained in:
wdvxdr 2021-06-11 13:27:35 +08:00
parent d4b216a0ed
commit d4f4def17d
No known key found for this signature in database
GPG Key ID: 55FF1414A69CEBA6
4 changed files with 158 additions and 84 deletions

View File

@ -90,3 +90,38 @@ func releaseZlibWriter(w *zlibWriter) {
zlibPool.Put(w) zlibPool.Put(w)
} }
} }
const size128k = 128 * 1024
var b128kPool = sync.Pool{
New: func() interface{} {
return make128kSlicePointer()
},
}
// Get128KBytes 获取一个128k大小 []byte
func Get128KBytes() *[]byte {
buf := b128kPool.Get().(*[]byte)
if buf == nil {
return make128kSlicePointer()
}
if cap(*buf) < size128k {
return make128kSlicePointer()
}
*buf = (*buf)[:size128k]
return buf
}
// Put128KBytes 放回一个128k大小 []byte
func Put128KBytes(b *[]byte) {
if cap(*b) < size128k || cap(*b) > 2*size128k { // 太大或太小的 []byte 不要放入
return
}
*b = (*b)[:cap(*b)]
b128kPool.Put(b)
}
func make128kSlicePointer() *[]byte {
data := make([]byte, size128k)
return &data
}

View File

@ -3,12 +3,14 @@ package binary
import ( import (
"encoding/binary" "encoding/binary"
"math/rand" "math/rand"
"reflect"
"unsafe" "unsafe"
) )
func xorQ(a, b []byte, c []byte) { // MAGIC func xorQ(a, b []byte, c []byte) { // MAGIC
*(*uint64)(unsafe.Pointer(&c[0])) = *(*uint64)(unsafe.Pointer((*reflect.SliceHeader)(unsafe.Pointer(&c)).Data)) =
*(*uint64)(unsafe.Pointer(&a[0])) ^ *(*uint64)(unsafe.Pointer(&b[0])) *(*uint64)(unsafe.Pointer((*reflect.SliceHeader)(unsafe.Pointer(&a)).Data)) ^
*(*uint64)(unsafe.Pointer((*reflect.SliceHeader)(unsafe.Pointer(&b)).Data))
} }
type TEA [4]uint32 type TEA [4]uint32

View File

@ -37,52 +37,71 @@ func (c *QQClient) highwayUploadStream(ip uint32, port int, updKey []byte, strea
h := md5.New() h := md5.New()
length, _ := io.Copy(h, stream) length, _ := io.Copy(h, stream)
fh := h.Sum(nil) fh := h.Sum(nil)
const chunkSize = 8192 * 8
_, _ = stream.Seek(0, io.SeekStart) _, _ = stream.Seek(0, io.SeekStart)
conn, err := net.DialTCP("tcp", nil, &addr) conn, err := net.DialTCP("tcp", nil, &addr)
if err != nil { if err != nil {
return errors.Wrap(err, "connect error") return errors.Wrap(err, "connect error")
} }
defer conn.Close() defer conn.Close()
offset := 0
reader := binary.NewNetworkReader(conn) reader := binary.NewNetworkReader(conn)
head, _ := proto.Marshal(&pb.ReqDataHighwayHead{ chunk := *binary.Get128KBytes()
MsgBasehead: &pb.DataHighwayHead{ defer func() { // 延迟捕获 chunk
Version: 1, binary.Put128KBytes(&chunk)
Uin: strconv.FormatInt(c.Uin, 10), }()
Command: "PicUp.DataUp",
Seq: c.nextGroupDataTransSeq(),
Appid: int32(c.version.AppId),
Dataflag: 4096,
CommandId: cmdId,
LocaleId: 2052,
},
MsgSeghead: &pb.SegHead{
Filesize: length,
Dataoffset: int64(0),
Datalength: int32(length),
Serviceticket: updKey,
Md5: fh,
FileMd5: fh,
},
ReqExtendinfo: EmptyBytes,
})
w := binary.NewWriter() w := binary.NewWriter()
defer binary.PutBuffer(w) defer binary.PutBuffer(w)
w.WriteByte(40) for {
w.WriteUInt32(uint32(len(head))) chunk = chunk[:chunkSize]
w.WriteUInt32(uint32(length)) rl, err := io.ReadFull(stream, chunk)
w.Write(head) if errors.Is(err, io.EOF) {
_, _ = conn.Write(w.Bytes()) break
_, _ = conn.ReadFrom(stream) }
_, err = conn.Write([]byte{41}) if errors.Is(err, io.ErrUnexpectedEOF) {
if err != nil { chunk = chunk[:rl]
return errors.Wrap(err, "write conn error") }
} ch := md5.Sum(chunk)
rspHead, _, err := highwayReadResponse(reader) head, _ := proto.Marshal(&pb.ReqDataHighwayHead{
if err != nil { MsgBasehead: &pb.DataHighwayHead{
return errors.Wrap(err, "highway upload error") Version: 1,
} Uin: strconv.FormatInt(c.Uin, 10),
if rspHead.ErrorCode != 0 { Command: "PicUp.DataUp",
return errors.New("upload failed") Seq: c.nextGroupDataTransSeq(),
Appid: int32(c.version.AppId),
Dataflag: 4096,
CommandId: cmdId,
LocaleId: 2052,
},
MsgSeghead: &pb.SegHead{
Filesize: length,
Dataoffset: int64(offset),
Datalength: int32(rl),
Serviceticket: updKey,
Md5: ch[:],
FileMd5: fh,
},
ReqExtendinfo: EmptyBytes,
})
offset += rl
w.Reset()
w.WriteByte(40)
w.WriteUInt32(uint32(len(head)))
w.WriteUInt32(uint32(len(chunk)))
w.Write(head)
w.Write(chunk)
w.WriteByte(41)
_, err = conn.Write(w.Bytes())
if err != nil {
return errors.Wrap(err, "write conn error")
}
rspHead, _, err := highwayReadResponse(reader)
if err != nil {
return errors.Wrap(err, "highway upload error")
}
if rspHead.ErrorCode != 0 {
return errors.New("upload failed")
}
} }
return nil return nil
} }
@ -97,11 +116,13 @@ func (c *QQClient) highwayUploadByBDH(stream io.Reader, length int64, cmdId int3
} }
ext = binary.NewTeaCipher(c.bigDataSession.SessionKey).Encrypt(ext) ext = binary.NewTeaCipher(c.bigDataSession.SessionKey).Encrypt(ext)
} }
const chunkSize = 8192 * 16
conn, err := net.DialTimeout("tcp", c.srvSsoAddrs[0], time.Second*20) conn, err := net.DialTimeout("tcp", c.srvSsoAddrs[0], time.Second*20)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "connect error") return nil, errors.Wrap(err, "connect error")
} }
defer conn.Close() defer conn.Close()
offset := 0
reader := binary.NewNetworkReader(conn) reader := binary.NewNetworkReader(conn)
if err = c.highwaySendHeartbreak(conn); err != nil { if err = c.highwaySendHeartbreak(conn); err != nil {
return nil, errors.Wrap(err, "echo error") return nil, errors.Wrap(err, "echo error")
@ -110,52 +131,68 @@ func (c *QQClient) highwayUploadByBDH(stream io.Reader, length int64, cmdId int3
return nil, errors.Wrap(err, "echo error") return nil, errors.Wrap(err, "echo error")
} }
var rspExt []byte var rspExt []byte
head, _ := proto.Marshal(&pb.ReqDataHighwayHead{ chunk := *binary.Get128KBytes()
MsgBasehead: &pb.DataHighwayHead{ defer func() { // 延迟捕获 chunk
Version: 1, binary.Put128KBytes(&chunk)
Uin: strconv.FormatInt(c.Uin, 10), }()
Command: "PicUp.DataUp",
Seq: c.nextGroupDataTransSeq(),
Appid: int32(c.version.AppId),
Dataflag: 4096,
CommandId: cmdId,
LocaleId: 2052,
},
MsgSeghead: &pb.SegHead{
Filesize: length,
Dataoffset: int64(0),
Datalength: int32(length),
Serviceticket: ticket,
Md5: sum,
FileMd5: sum,
},
ReqExtendinfo: ext,
})
w := binary.NewWriter() w := binary.NewWriter()
defer binary.PutBuffer(w) defer binary.PutBuffer(w)
w.Reset() for {
w.WriteByte(40) chunk = chunk[:chunkSize]
w.WriteUInt32(uint32(len(head))) rl, err := io.ReadFull(stream, chunk)
w.WriteUInt32(uint32(length)) if errors.Is(err, io.EOF) {
w.Write(head) break
_, _ = conn.Write(w.Bytes()) }
_, _ = io.Copy(conn, stream) if errors.Is(err, io.ErrUnexpectedEOF) {
_, err = conn.Write([]byte{41}) chunk = chunk[:rl]
if err != nil { }
return nil, errors.Wrap(err, "write conn error") ch := md5.Sum(chunk)
} head, _ := proto.Marshal(&pb.ReqDataHighwayHead{
rspHead, _, err := highwayReadResponse(reader) MsgBasehead: &pb.DataHighwayHead{
if err != nil { Version: 1,
return nil, errors.Wrap(err, "highway upload error") Uin: strconv.FormatInt(c.Uin, 10),
} Command: "PicUp.DataUp",
if rspHead.ErrorCode != 0 { Seq: c.nextGroupDataTransSeq(),
return nil, errors.Errorf("upload failed: %d", rspHead.ErrorCode) Appid: int32(c.version.AppId),
} Dataflag: 4096,
if rspHead.RspExtendinfo != nil { CommandId: cmdId,
rspExt = rspHead.RspExtendinfo LocaleId: 2052,
} },
if rspHead.MsgSeghead != nil && rspHead.MsgSeghead.Serviceticket != nil { MsgSeghead: &pb.SegHead{
ticket = rspHead.MsgSeghead.Serviceticket Filesize: length,
Dataoffset: int64(offset),
Datalength: int32(rl),
Serviceticket: ticket,
Md5: ch[:],
FileMd5: sum,
},
ReqExtendinfo: ext,
})
offset += rl
w.Reset()
w.WriteByte(40)
w.WriteUInt32(uint32(len(head)))
w.WriteUInt32(uint32(len(chunk)))
w.Write(head)
w.Write(chunk)
w.WriteByte(41)
_, err = conn.Write(w.Bytes())
if err != nil {
return nil, errors.Wrap(err, "write conn error")
}
rspHead, _, err := highwayReadResponse(reader)
if err != nil {
return nil, errors.Wrap(err, "highway upload error")
}
if rspHead.ErrorCode != 0 {
return nil, errors.Errorf("upload failed: %d", rspHead.ErrorCode)
}
if rspHead.RspExtendinfo != nil {
rspExt = rspHead.RspExtendinfo
}
if rspHead.MsgSeghead != nil && rspHead.MsgSeghead.Serviceticket != nil {
ticket = rspHead.MsgSeghead.Serviceticket
}
} }
return rspExt, nil return rspExt, nil
} }

View File

@ -50,7 +50,7 @@ func (c *QQClient) UploadGroupImage(groupCode int64, img io.ReadSeeker) (*messag
c.srvSsoAddrs = append(c.srvSsoAddrs, fmt.Sprintf("%v:%v", binary.UInt32ToIPV4Address(uint32(addr)), rsp.UploadPort[i])) c.srvSsoAddrs = append(c.srvSsoAddrs, fmt.Sprintf("%v:%v", binary.UInt32ToIPV4Address(uint32(addr)), rsp.UploadPort[i]))
} }
} }
if _, err = c.highwayUploadByBDH(img, length, 2, rsp.UploadKey, fh, EmptyBytes, false); err == nil { if _, err = c.highwayUploadByBDH(img, length, 2, rsp.UploadKey, EmptyBytes, fh, false); err == nil {
goto ok goto ok
} }
return nil, errors.Wrap(err, "upload failed") return nil, errors.Wrap(err, "upload failed")