1
0
mirror of https://github.com/Mrs4s/MiraiGo.git synced 2025-05-04 11:07:40 +08:00

client/internal/highway: allow retry other server

This commit is contained in:
wdvxdr 2023-02-10 23:18:46 +08:00
parent 3bfc20fd2e
commit aa5a8c45fc
4 changed files with 39 additions and 19 deletions

View File

@ -113,7 +113,7 @@ func (c *QQClient) uploadGroupOrGuildImage(target message.Source, img io.ReadSee
Ext: ext, Ext: ext,
} }
if tc > 1 && length > 3*1024*1024 { if tc > 1 && length > 3*1024*1024 {
_, err = c.highwaySession.UploadBDHMultiThread(input, tc) _, err = c.highwaySession.UploadBDHMultiThread(input)
} else { } else {
_, err = c.highwaySession.UploadBDH(input) _, err = c.highwaySession.UploadBDH(input)
} }

View File

@ -36,18 +36,35 @@ func (bdh *Transaction) encrypt(key []byte) error {
return nil return nil
} }
func (s *Session) UploadBDH(trans Transaction) ([]byte, error) { func (s *Session) retry(upload func(s *Session, addr Addr, trans *Transaction) ([]byte, error), trans *Transaction) ([]byte, error) {
if len(s.SsoAddr) == 0 { // try to find a available server
return nil, errors.New("srv addrs not found. maybe miss some packet?") for _, addr := range s.SsoAddr {
r, err := upload(s, addr, trans)
if err == nil {
return r, nil
}
if _, ok := err.(net.Error); ok {
// try another server
// TODO: delete broken servers?
continue
}
return nil, err
}
return nil, errors.New("cannot found available server")
} }
addr := s.SsoAddr[0].String()
func (s *Session) UploadBDH(trans Transaction) ([]byte, error) {
// encrypt ext data
if err := trans.encrypt(s.SessionKey); err != nil { if err := trans.encrypt(s.SessionKey); err != nil {
return nil, err return nil, err
} }
conn, err := net.DialTimeout("tcp", addr, time.Second*20) return s.retry(uploadBDH, &trans)
}
func uploadBDH(s *Session, addr Addr, trans *Transaction) ([]byte, error) {
conn, err := net.DialTimeout("tcp", addr.String(), time.Second*20)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "connect error") return nil, err
} }
defer conn.Close() defer conn.Close()
@ -109,23 +126,23 @@ func (s *Session) UploadBDH(trans Transaction) ([]byte, error) {
return rspExt, nil return rspExt, nil
} }
func (s *Session) UploadBDHMultiThread(trans Transaction, threadCount int) ([]byte, error) { func (s *Session) UploadBDHMultiThread(trans Transaction) ([]byte, error) {
// for small file and small thread count, // for small file and small thread count,
// use UploadBDH instead of UploadBDHMultiThread // use UploadBDH instead of UploadBDHMultiThread
if trans.Size < 1024*1024*3 || threadCount < 2 { if trans.Size < 1024*1024*3 {
return s.UploadBDH(trans) return s.UploadBDH(trans)
} }
if len(s.SsoAddr) == 0 { // encrypt ext data
return nil, errors.New("srv addrs not found. maybe miss some packet?")
}
addr := s.SsoAddr[0].String()
if err := trans.encrypt(s.SessionKey); err != nil { if err := trans.encrypt(s.SessionKey); err != nil {
return nil, err return nil, err
} }
return s.retry(uploadBDHMultiThread, &trans)
}
func uploadBDHMultiThread(s *Session, addr Addr, trans *Transaction) ([]byte, error) {
const blockSize int64 = 256 * 1024 const blockSize int64 = 256 * 1024
const threadCount = 4
var ( var (
rspExt []byte rspExt []byte
completedThread uint32 completedThread uint32
@ -141,9 +158,9 @@ func (s *Session) UploadBDHMultiThread(trans Transaction, threadCount int) ([]by
cond.Signal() cond.Signal()
}() }()
conn, err := net.DialTimeout("tcp", addr, time.Second*20) conn, err := net.DialTimeout("tcp", addr.String(), time.Second*20)
if err != nil { if err != nil {
return errors.Wrap(err, "connect error") return err
} }
defer conn.Close() defer conn.Close()
reader := binary.NewNetworkReader(conn) reader := binary.NewNetworkReader(conn)

View File

@ -97,8 +97,11 @@ func (s *Session) Upload(addr Addr, trans Transaction) error {
} }
func (s *Session) UploadExciting(trans Transaction) ([]byte, error) { func (s *Session) UploadExciting(trans Transaction) ([]byte, error) {
addr := s.SsoAddr[0] return s.retry(uploadExciting, &trans)
url := fmt.Sprintf("http://%v/cgi-bin/httpconn?htcmd=0x6FF0087&Uin=%v", addr, s.Uin) }
func uploadExciting(s *Session, addr Addr, trans *Transaction) ([]byte, error) {
url := fmt.Sprintf("http://%v/cgi-bin/httpconn?htcmd=0x6FF0087&Uin=%v", addr.String(), s.Uin)
var rspExt []byte var rspExt []byte
var offset int64 var offset int64
const chunkSize = 524288 const chunkSize = 524288

View File

@ -169,7 +169,7 @@ func (c *QQClient) UploadShortVideo(target message.Source, video, thumb io.ReadS
Encrypt: true, Encrypt: true,
} }
if thread > 1 { if thread > 1 {
hwRsp, err = c.highwaySession.UploadBDHMultiThread(input, thread) hwRsp, err = c.highwaySession.UploadBDHMultiThread(input)
} else { } else {
hwRsp, err = c.highwaySession.UploadBDH(input) hwRsp, err = c.highwaySession.UploadBDH(input)
} }