1
0
mirror of https://github.com/Mrs4s/MiraiGo.git synced 2025-05-04 19:17:38 +08:00

highway: reduce memcpy, memalloc by using net.Buffers

This commit is contained in:
wdvxdr 2022-02-23 22:19:01 +08:00
parent e287cbfabd
commit c515024783
No known key found for this signature in database
GPG Key ID: 703F8C071DE7A1B6
3 changed files with 57 additions and 31 deletions

View File

@ -12,6 +12,7 @@ import (
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"github.com/Mrs4s/MiraiGo/binary" "github.com/Mrs4s/MiraiGo/binary"
"github.com/Mrs4s/MiraiGo/client/internal/network"
"github.com/Mrs4s/MiraiGo/client/pb" "github.com/Mrs4s/MiraiGo/client/pb"
"github.com/Mrs4s/MiraiGo/internal/proto" "github.com/Mrs4s/MiraiGo/internal/proto"
"github.com/Mrs4s/MiraiGo/utils" "github.com/Mrs4s/MiraiGo/utils"
@ -81,8 +82,6 @@ func (s *Session) UploadBDH(input BdhInput) ([]byte, error) {
var rspExt []byte var rspExt []byte
offset := 0 offset := 0
chunk := make([]byte, chunkSize) chunk := make([]byte, chunkSize)
w := binary.SelectWriter()
defer binary.PutWriter(w)
for { for {
chunk = chunk[:chunkSize] chunk = chunk[:chunkSize]
rl, err := io.ReadFull(input.Body, chunk) rl, err := io.ReadFull(input.Body, chunk)
@ -106,9 +105,7 @@ func (s *Session) UploadBDH(input BdhInput) ([]byte, error) {
ReqExtendinfo: input.Ext, ReqExtendinfo: input.Ext,
}) })
offset += rl offset += rl
w.Reset() _, err = io.Copy(conn, network.HeadBodyFrame(head, chunk))
writeHeadBody(w, head, chunk)
_, err = conn.Write(w.Bytes())
if err != nil { if err != nil {
return nil, errors.Wrap(err, "write conn error") return nil, errors.Wrap(err, "write conn error")
} }
@ -193,9 +190,6 @@ func (s *Session) UploadBDHMultiThread(input BdhMultiThreadInput, threadCount in
} }
buffer := make([]byte, blockSize) buffer := make([]byte, blockSize)
w := binary.SelectWriter()
w.Reset()
w.Grow(600 * 1024) // 复用,600k 不要放回池中
for { for {
nextId := atomic.AddUint32(&BlockId, 1) nextId := atomic.AddUint32(&BlockId, 1)
if nextId >= uint32(len(blocks)) { if nextId >= uint32(len(blocks)) {
@ -238,9 +232,7 @@ func (s *Session) UploadBDHMultiThread(input BdhMultiThreadInput, threadCount in
}, },
ReqExtendinfo: input.Ext, ReqExtendinfo: input.Ext,
}) })
w.Reset() _, err = io.Copy(conn, network.HeadBodyFrame(head, buffer))
writeHeadBody(w, head, buffer)
_, err = conn.Write(w.Bytes())
if err != nil { if err != nil {
return errors.Wrap(err, "write conn error") return errors.Wrap(err, "write conn error")
} }

View File

@ -1,7 +1,6 @@
package highway package highway
import ( import (
"bytes"
"crypto/md5" "crypto/md5"
"fmt" "fmt"
"io" "io"
@ -13,6 +12,7 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/Mrs4s/MiraiGo/binary" "github.com/Mrs4s/MiraiGo/binary"
"github.com/Mrs4s/MiraiGo/client/internal/network"
"github.com/Mrs4s/MiraiGo/client/pb" "github.com/Mrs4s/MiraiGo/client/pb"
"github.com/Mrs4s/MiraiGo/internal/proto" "github.com/Mrs4s/MiraiGo/internal/proto"
"github.com/Mrs4s/MiraiGo/utils" "github.com/Mrs4s/MiraiGo/utils"
@ -86,9 +86,7 @@ func (s *Session) Upload(addr Addr, input Input) error {
ReqExtendinfo: []byte{}, ReqExtendinfo: []byte{},
}) })
offset += rl offset += rl
w.Reset() _, err = io.Copy(conn, network.HeadBodyFrame(head, chunk))
writeHeadBody(w, head, chunk)
_, err = conn.Write(w.Bytes())
if err != nil { if err != nil {
return errors.Wrap(err, "write conn error") return errors.Wrap(err, "write conn error")
} }
@ -147,9 +145,7 @@ func (s *Session) UploadExciting(input ExcitingInput) ([]byte, error) {
ReqExtendinfo: input.Ext, ReqExtendinfo: input.Ext,
}) })
offset += int64(rl) offset += int64(rl)
w.Reset() req, _ := http.NewRequest("POST", url, network.HeadBodyFrame(head, chunk))
writeHeadBody(w, head, chunk)
req, _ := http.NewRequest("POST", url, bytes.NewReader(w.Bytes()))
req.Header.Set("Accept", "*/*") req.Header.Set("Accept", "*/*")
req.Header.Set("Connection", "Keep-Alive") req.Header.Set("Connection", "Keep-Alive")
req.Header.Set("User-Agent", "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1)") req.Header.Set("User-Agent", "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1)")
@ -210,10 +206,7 @@ func (s *Session) sendHeartbreak(conn net.Conn) error {
LocaleId: 2052, LocaleId: 2052,
}, },
}) })
w := binary.SelectWriter() _, err := io.Copy(conn, network.HeadBodyFrame(head, nil))
writeHeadBody(w, head, nil)
_, err := conn.Write(w.Bytes())
binary.PutWriter(w)
return err return err
} }
@ -228,15 +221,6 @@ func (s *Session) sendEcho(conn net.Conn) error {
return nil return nil
} }
func writeHeadBody(w *binary.Writer, head []byte, body []byte) {
w.WriteByte(40)
w.WriteUInt32(uint32(len(head)))
w.WriteUInt32(uint32(len(body)))
w.Write(head)
w.Write(body)
w.WriteByte(41)
}
func readResponse(r *binary.NetworkReader) (*pb.RspDataHighwayHead, []byte, error) { func readResponse(r *binary.NetworkReader) (*pb.RspDataHighwayHead, []byte, error) {
_, err := r.ReadByte() _, err := r.ReadByte()
if err != nil { if err != nil {

View File

@ -0,0 +1,50 @@
package network
import (
"encoding/binary"
"io"
"net"
"sync"
)
var etx = []byte{0x29}
type Buffers struct {
net.Buffers
}
var pool = sync.Pool{
New: func() interface{} {
lenHead := make([]byte, 9)
lenHead[0] = 0x28
return &Buffers{net.Buffers{lenHead, nil, nil, etx}}
},
}
func (b *Buffers) WriteTo(w io.Writer) (n int64, err error) {
defer pool.Put(b) // implement auto put to pool
return b.Buffers.WriteTo(w)
}
// HeadBodyFrame 包格式
// * STX
// * head length
// * body length
// * head data
// * body data
// * ETX
// 节省内存, 可被go runtime优化为writev操作
func HeadBodyFrame(head []byte, body []byte) *Buffers {
b := pool.Get().(*Buffers)
if len(b.Buffers) == 0 {
lenHead := make([]byte, 9)
lenHead[0] = 0x28
b.Buffers = net.Buffers{lenHead, nil, nil, etx}
}
b.Buffers[2] = body
b.Buffers[1] = head
_ = b.Buffers[0][8]
binary.BigEndian.PutUint32(b.Buffers[0][1:], uint32(len(head)))
binary.BigEndian.PutUint32(b.Buffers[0][5:], uint32(len(body)))
return b
}