diff --git a/client/internal/highway/bdh.go b/client/internal/highway/bdh.go index 5fab686a..4d76a182 100644 --- a/client/internal/highway/bdh.go +++ b/client/internal/highway/bdh.go @@ -12,6 +12,7 @@ import ( "golang.org/x/sync/errgroup" "github.com/Mrs4s/MiraiGo/binary" + "github.com/Mrs4s/MiraiGo/client/internal/network" "github.com/Mrs4s/MiraiGo/client/pb" "github.com/Mrs4s/MiraiGo/internal/proto" "github.com/Mrs4s/MiraiGo/utils" @@ -81,8 +82,6 @@ func (s *Session) UploadBDH(input BdhInput) ([]byte, error) { var rspExt []byte offset := 0 chunk := make([]byte, chunkSize) - w := binary.SelectWriter() - defer binary.PutWriter(w) for { chunk = chunk[:chunkSize] rl, err := io.ReadFull(input.Body, chunk) @@ -106,9 +105,7 @@ func (s *Session) UploadBDH(input BdhInput) ([]byte, error) { ReqExtendinfo: input.Ext, }) offset += rl - w.Reset() - writeHeadBody(w, head, chunk) - _, err = conn.Write(w.Bytes()) + _, err = io.Copy(conn, network.HeadBodyFrame(head, chunk)) if err != nil { return nil, errors.Wrap(err, "write conn error") } @@ -193,9 +190,6 @@ func (s *Session) UploadBDHMultiThread(input BdhMultiThreadInput, threadCount in } buffer := make([]byte, blockSize) - w := binary.SelectWriter() - w.Reset() - w.Grow(600 * 1024) // 复用,600k 不要放回池中 for { nextId := atomic.AddUint32(&BlockId, 1) if nextId >= uint32(len(blocks)) { @@ -238,9 +232,7 @@ func (s *Session) UploadBDHMultiThread(input BdhMultiThreadInput, threadCount in }, ReqExtendinfo: input.Ext, }) - w.Reset() - writeHeadBody(w, head, buffer) - _, err = conn.Write(w.Bytes()) + _, err = io.Copy(conn, network.HeadBodyFrame(head, buffer)) if err != nil { return errors.Wrap(err, "write conn error") } diff --git a/client/internal/highway/highway.go b/client/internal/highway/highway.go index 7e74021d..81cd0ffd 100644 --- a/client/internal/highway/highway.go +++ b/client/internal/highway/highway.go @@ -1,7 +1,6 @@ package highway import ( - "bytes" "crypto/md5" "fmt" "io" @@ -13,6 +12,7 @@ import ( "github.com/pkg/errors" "github.com/Mrs4s/MiraiGo/binary" + "github.com/Mrs4s/MiraiGo/client/internal/network" "github.com/Mrs4s/MiraiGo/client/pb" "github.com/Mrs4s/MiraiGo/internal/proto" "github.com/Mrs4s/MiraiGo/utils" @@ -86,9 +86,7 @@ func (s *Session) Upload(addr Addr, input Input) error { ReqExtendinfo: []byte{}, }) offset += rl - w.Reset() - writeHeadBody(w, head, chunk) - _, err = conn.Write(w.Bytes()) + _, err = io.Copy(conn, network.HeadBodyFrame(head, chunk)) if err != nil { return errors.Wrap(err, "write conn error") } @@ -147,9 +145,7 @@ func (s *Session) UploadExciting(input ExcitingInput) ([]byte, error) { ReqExtendinfo: input.Ext, }) offset += int64(rl) - w.Reset() - writeHeadBody(w, head, chunk) - req, _ := http.NewRequest("POST", url, bytes.NewReader(w.Bytes())) + req, _ := http.NewRequest("POST", url, network.HeadBodyFrame(head, chunk)) req.Header.Set("Accept", "*/*") req.Header.Set("Connection", "Keep-Alive") req.Header.Set("User-Agent", "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1)") @@ -210,10 +206,7 @@ func (s *Session) sendHeartbreak(conn net.Conn) error { LocaleId: 2052, }, }) - w := binary.SelectWriter() - writeHeadBody(w, head, nil) - _, err := conn.Write(w.Bytes()) - binary.PutWriter(w) + _, err := io.Copy(conn, network.HeadBodyFrame(head, nil)) return err } @@ -228,15 +221,6 @@ func (s *Session) sendEcho(conn net.Conn) error { return nil } -func writeHeadBody(w *binary.Writer, head []byte, body []byte) { - w.WriteByte(40) - w.WriteUInt32(uint32(len(head))) - w.WriteUInt32(uint32(len(body))) - w.Write(head) - w.Write(body) - w.WriteByte(41) -} - func readResponse(r *binary.NetworkReader) (*pb.RspDataHighwayHead, []byte, error) { _, err := r.ReadByte() if err != nil { diff --git a/client/internal/network/frame.go b/client/internal/network/frame.go new file mode 100644 index 00000000..c272a64a --- /dev/null +++ b/client/internal/network/frame.go @@ -0,0 +1,50 @@ +package network + +import ( + "encoding/binary" + "io" + "net" + "sync" +) + +var etx = []byte{0x29} + +type Buffers struct { + net.Buffers +} + +var pool = sync.Pool{ + New: func() interface{} { + lenHead := make([]byte, 9) + lenHead[0] = 0x28 + return &Buffers{net.Buffers{lenHead, nil, nil, etx}} + }, +} + +func (b *Buffers) WriteTo(w io.Writer) (n int64, err error) { + defer pool.Put(b) // implement auto put to pool + return b.Buffers.WriteTo(w) +} + +// HeadBodyFrame 包格式 +// * STX +// * head length +// * body length +// * head data +// * body data +// * ETX +// 节省内存, 可被go runtime优化为writev操作 +func HeadBodyFrame(head []byte, body []byte) *Buffers { + b := pool.Get().(*Buffers) + if len(b.Buffers) == 0 { + lenHead := make([]byte, 9) + lenHead[0] = 0x28 + b.Buffers = net.Buffers{lenHead, nil, nil, etx} + } + b.Buffers[2] = body + b.Buffers[1] = head + _ = b.Buffers[0][8] + binary.BigEndian.PutUint32(b.Buffers[0][1:], uint32(len(head))) + binary.BigEndian.PutUint32(b.Buffers[0][5:], uint32(len(body))) + return b +}