diff --git a/client/image.go b/client/image.go index 40dd1951..394ffd21 100644 --- a/client/image.go +++ b/client/image.go @@ -43,10 +43,10 @@ type imageUploadResponse struct { IsExists bool } -func (c *QQClient) UploadImage(target message.Source, img io.ReadSeeker, thread ...int) (message.IMessageElement, error) { +func (c *QQClient) UploadImage(target message.Source, img io.ReadSeeker) (message.IMessageElement, error) { switch target.SourceType { case message.SourceGroup, message.SourceGuildChannel, message.SourceGuildDirect: - return c.uploadGroupOrGuildImage(target, img, thread...) + return c.uploadGroupOrGuildImage(target, img) case message.SourcePrivate: return c.uploadPrivateImage(target.PrimaryID, img, 0) default: @@ -54,7 +54,7 @@ func (c *QQClient) UploadImage(target message.Source, img io.ReadSeeker, thread } } -func (c *QQClient) uploadGroupOrGuildImage(target message.Source, img io.ReadSeeker, thread ...int) (message.IMessageElement, error) { +func (c *QQClient) uploadGroupOrGuildImage(target message.Source, img io.ReadSeeker) (message.IMessageElement, error) { _, _ = img.Seek(0, io.SeekStart) // safe fh, length := utils.ComputeMd5AndLength(img) _, _ = img.Seek(0, io.SeekStart) @@ -63,10 +63,6 @@ func (c *QQClient) uploadGroupOrGuildImage(target message.Source, img io.ReadSee imgWaiter.Wait(key) defer imgWaiter.Done(key) - tc := 1 - if len(thread) > 0 { - tc = thread[0] - } cmd := int32(2) ext := EmptyBytes if target.SourceType != message.SourceGroup { // guild @@ -112,11 +108,7 @@ func (c *QQClient) uploadGroupOrGuildImage(target message.Source, img io.ReadSee Ticket: rsp.UploadKey, Ext: ext, } - if tc > 1 && length > 3*1024*1024 { - _, err = c.highwaySession.UploadBDHMultiThread(input) - } else { - _, err = c.highwaySession.UploadBDH(input) - } + _, err = c.highwaySession.Upload(input) if err != nil { return nil, errors.Wrap(err, "upload failed") } @@ -306,7 +298,7 @@ func (c *QQClient) uploadOcrImage(img io.Reader, size int32, sum []byte) (string Uuid: binary.GenUUID(r), }) - rsp, err := c.highwaySession.UploadBDH(highway.Transaction{ + rsp, err := c.highwaySession.Upload(highway.Transaction{ CommandID: 76, Body: img, Size: int64(size), diff --git a/client/internal/highway/bdh.go b/client/internal/highway/bdh.go index cc9a21d2..ff6ca5e7 100644 --- a/client/internal/highway/bdh.go +++ b/client/internal/highway/bdh.go @@ -3,7 +3,6 @@ package highway import ( "crypto/md5" "io" - "net" "sync" "sync/atomic" @@ -36,29 +35,7 @@ func (bdh *Transaction) encrypt(key []byte) error { return nil } -func (s *Session) retry(upload func(s *Session, addr Addr, trans *Transaction) ([]byte, error), trans *Transaction) ([]byte, error) { - // try to find a available server - for _, addr := range s.SsoAddr { - r, err := upload(s, addr, trans) - if err == nil { - return r, nil - } - if _, ok := err.(net.Error); ok { - // try another server - // TODO: delete broken servers? - continue - } - return nil, err - } - return nil, errors.New("cannot found available server") -} - -func (s *Session) UploadBDH(trans Transaction) ([]byte, error) { - // encrypt ext data - if err := trans.encrypt(s.SessionKey); err != nil { - return nil, err - } - +func (s *Session) uploadSingle(trans Transaction) ([]byte, error) { pc, err := s.selectConn() if err != nil { return nil, err @@ -66,7 +43,7 @@ func (s *Session) UploadBDH(trans Transaction) ([]byte, error) { defer s.putIdleConn(pc) reader := binary.NewNetworkReader(pc.conn) - const chunkSize = 256 * 1024 + const chunkSize = 128 * 1024 var rspExt []byte offset := 0 chunk := make([]byte, chunkSize) @@ -105,7 +82,7 @@ func (s *Session) UploadBDH(trans Transaction) ([]byte, error) { buffers := frame(head, chunk) _, err = buffers.WriteTo(pc.conn) if err != nil { - return nil, errors.Wrap(err, "write pc error") + return nil, errors.Wrap(err, "write conn error") } rspHead, err := readResponse(reader) if err != nil { @@ -124,18 +101,22 @@ func (s *Session) UploadBDH(trans Transaction) ([]byte, error) { return rspExt, nil } -func (s *Session) UploadBDHMultiThread(trans Transaction) ([]byte, error) { - // for small file and small thread count, - // use UploadBDH instead of UploadBDHMultiThread - if trans.Size < 1024*1024*3 { - return s.UploadBDH(trans) - } - +func (s *Session) Upload(trans Transaction) ([]byte, error) { // encrypt ext data if err := trans.encrypt(s.SessionKey); err != nil { return nil, err } + const maxThreadCount = 4 + threadCount := int(trans.Size) / (3 * 512 * 1024) // 1 thread upload 1.5 MB + if threadCount > maxThreadCount { + threadCount = maxThreadCount + } + if threadCount < 2 { + // single thread upload + return s.uploadSingle(trans) + } + // pick a address // TODO: pick smarter pc, err := s.selectConn() @@ -145,9 +126,7 @@ func (s *Session) UploadBDHMultiThread(trans Transaction) ([]byte, error) { addr := pc.addr s.putIdleConn(pc) - // TODO: use idle conn const blockSize int64 = 256 * 1024 - const threadCount = 4 var ( rspExt []byte completedThread uint32 @@ -163,11 +142,12 @@ func (s *Session) UploadBDHMultiThread(trans Transaction) ([]byte, error) { cond.Signal() }() + // todo: get from pool? pc, err := s.connect(addr) if err != nil { return err } - // defer s.putIdleConn(pc) // TODO: should we put back? + defer s.putIdleConn(pc) reader := binary.NewNetworkReader(pc.conn) chunk := make([]byte, blockSize) @@ -176,7 +156,8 @@ func (s *Session) UploadBDHMultiThread(trans Transaction) ([]byte, error) { off := offset offset += blockSize id++ - if int64(id) == count { // last + last := int64(id) == count + if last { // last for atomic.LoadUint32(&completedThread) != uint32(threadCount-1) { cond.Wait() } @@ -228,7 +209,7 @@ func (s *Session) UploadBDHMultiThread(trans Transaction) ([]byte, error) { if rspHead.ErrorCode != 0 { return errors.Errorf("upload failed: %d", rspHead.ErrorCode) } - if rspHead.RspExtendinfo != nil { + if last && rspHead.RspExtendinfo != nil { rspExt = rspHead.RspExtendinfo } } diff --git a/client/internal/highway/highway.go b/client/internal/highway/highway.go index 8028e15c..7a390938 100644 --- a/client/internal/highway/highway.go +++ b/client/internal/highway/highway.go @@ -135,7 +135,7 @@ type persistConn struct { ping int64 // echo ping } -const maxIdleConn = 5 +const maxIdleConn = 7 type idle struct { pc persistConn @@ -207,6 +207,7 @@ func (s *Session) connect(addr Addr) (persistConn, error) { if err != nil { return persistConn{}, err } + _ = conn.(*net.TCPConn).SetKeepAlive(true) // close conn runtime.SetFinalizer(conn, func(conn net.Conn) { diff --git a/client/multimsg.go b/client/multimsg.go index eba7270e..a2d86f82 100644 --- a/client/multimsg.go +++ b/client/multimsg.go @@ -313,7 +313,7 @@ func (builder *ForwardMessageBuilder) Main(m *message.ForwardMessage) *message.F Sum: bodyHash[:], Size: int64(len(body)), } - _, err = c.highwaySession.UploadBDH(input) + _, err = c.highwaySession.Upload(input) if err != nil { return nil } diff --git a/client/ptt.go b/client/ptt.go index aaa2bc98..95896164 100644 --- a/client/ptt.go +++ b/client/ptt.go @@ -73,7 +73,7 @@ func (c *QQClient) UploadVoice(target message.Source, voice io.ReadSeeker) (*mes ext = c.buildGroupPttStoreBDHExt(target.PrimaryID, fh, int32(length), 0, int32(length)) } // multi-thread upload is no need - rsp, err := c.highwaySession.UploadBDH(highway.Transaction{ + rsp, err := c.highwaySession.Upload(highway.Transaction{ CommandID: cmd, Body: voice, Sum: fh, @@ -121,8 +121,7 @@ func (c *QQClient) UploadVoice(target message.Source, voice io.ReadSeeker) (*mes } // UploadShortVideo 将视频和封面上传到服务器, 返回 message.ShortVideoElement 可直接发送 -// thread 上传线程数 -func (c *QQClient) UploadShortVideo(target message.Source, video, thumb io.ReadSeeker, thread int) (*message.ShortVideoElement, error) { +func (c *QQClient) UploadShortVideo(target message.Source, video, thumb io.ReadSeeker) (*message.ShortVideoElement, error) { thumbHash := md5.New() thumbLen, _ := io.Copy(thumbHash, thumb) thumbSum := thumbHash.Sum(nil) @@ -168,11 +167,7 @@ func (c *QQClient) UploadShortVideo(target message.Source, video, thumb io.ReadS Ext: ext, Encrypt: true, } - if thread > 1 { - hwRsp, err = c.highwaySession.UploadBDHMultiThread(input) - } else { - hwRsp, err = c.highwaySession.UploadBDH(input) - } + hwRsp, err = c.highwaySession.Upload(input) if err != nil { return nil, errors.Wrap(err, "upload video file error") } diff --git a/client/upload_file.go b/client/upload_file.go index 4f7fbacf..d8a0fc72 100644 --- a/client/upload_file.go +++ b/client/upload_file.go @@ -147,7 +147,7 @@ func (c *QQClient) UploadFile(target message.Source, file *LocalFile) error { if target.SourceType == message.SourcePrivate { input.CommandID = 69 } - if _, err := c.highwaySession.UploadBDHMultiThread(input); err != nil { + if _, err := c.highwaySession.Upload(input); err != nil { return errors.Wrap(err, "upload failed") } }