diff --git a/client/internal/highway/bdh.go b/client/internal/highway/bdh.go index ac688491..36c05dd7 100644 --- a/client/internal/highway/bdh.go +++ b/client/internal/highway/bdh.go @@ -27,12 +27,13 @@ type Transaction struct { } func (bdh *Transaction) encrypt(key []byte) error { - if bdh.Encrypt { - if len(key) == 0 { - return errors.New("session key not found. maybe miss some packet?") - } - bdh.Ext = binary.NewTeaCipher(key).Encrypt(bdh.Ext) + if !bdh.Encrypt { + return nil } + if len(key) == 0 { + return errors.New("session key not found. maybe miss some packet?") + } + bdh.Ext = binary.NewTeaCipher(key).Encrypt(bdh.Ext) return nil } @@ -88,7 +89,16 @@ func uploadBDH(s *Session, addr Addr, trans *Transaction) ([]byte, error) { } ch := md5.Sum(chunk) head, _ := proto.Marshal(&pb.ReqDataHighwayHead{ - MsgBasehead: s.dataHighwayHead(_REQ_CMD_DATA, 4096, trans.CommandID, 2052), + MsgBasehead: &pb.DataHighwayHead{ + Version: 1, + Uin: s.Uin, + Command: _REQ_CMD_DATA, + Seq: s.nextSeq(), + Appid: s.AppID, + Dataflag: 4096, + CommandId: trans.CommandID, + LocaleId: 2052, + }, MsgSeghead: &pb.SegHead{ Filesize: trans.Size, Dataoffset: int64(offset), @@ -100,8 +110,8 @@ func uploadBDH(s *Session, addr Addr, trans *Transaction) ([]byte, error) { ReqExtendinfo: trans.Ext, }) offset += rl - frame := newFrame(head, chunk) - _, err = frame.WriteTo(conn) + buffers := frame(head, chunk) + _, err = buffers.WriteTo(conn) if err != nil { return nil, errors.Wrap(err, "write conn error") } @@ -190,7 +200,16 @@ func uploadBDHMultiThread(s *Session, addr Addr, trans *Transaction) ([]byte, er } ch := md5.Sum(chunk) head, _ := proto.Marshal(&pb.ReqDataHighwayHead{ - MsgBasehead: s.dataHighwayHead(_REQ_CMD_DATA, 4096, trans.CommandID, 2052), + MsgBasehead: &pb.DataHighwayHead{ + Version: 1, + Uin: s.Uin, + Command: _REQ_CMD_DATA, + Seq: s.nextSeq(), + Appid: s.AppID, + Dataflag: 4096, + CommandId: trans.CommandID, + LocaleId: 2052, + }, MsgSeghead: &pb.SegHead{ Filesize: trans.Size, Dataoffset: off, @@ -201,8 +220,8 @@ func uploadBDHMultiThread(s *Session, addr Addr, trans *Transaction) ([]byte, er }, ReqExtendinfo: trans.Ext, }) - frame := newFrame(head, chunk) - _, err = frame.WriteTo(conn) + buffers := frame(head, chunk) + _, err = buffers.WriteTo(conn) if err != nil { return errors.Wrap(err, "write conn error") } diff --git a/client/internal/highway/frame.go b/client/internal/highway/frame.go index 00c60541..ee1fcfb0 100644 --- a/client/internal/highway/frame.go +++ b/client/internal/highway/frame.go @@ -7,7 +7,7 @@ import ( var etx = []byte{0x29} -// newFrame 包格式 +// frame 包格式 // // - STX: 0x28(40) // - head length @@ -17,7 +17,7 @@ var etx = []byte{0x29} // - ETX: 0x29(41) // // 节省内存, 可被go runtime优化为writev操作 -func newFrame(head []byte, body []byte) net.Buffers { +func frame(head []byte, body []byte) net.Buffers { buffers := make(net.Buffers, 4) // buffer0 format: // - STX diff --git a/client/internal/highway/highway.go b/client/internal/highway/highway.go index cfa363ed..c34fe8fc 100644 --- a/client/internal/highway/highway.go +++ b/client/internal/highway/highway.go @@ -1,11 +1,8 @@ package highway import ( - "crypto/md5" "fmt" - "io" "net" - "net/http" "sync/atomic" "github.com/pkg/errors" @@ -42,6 +39,11 @@ type Session struct { SsoAddr []Addr seq int32 + /* + idleMu sync.Mutex + idleCount int + idle *idle + */ } const highwayMaxResponseSize int32 = 1024 * 100 // 100k @@ -58,94 +60,25 @@ func (s *Session) AppendAddr(ip, port uint32) { s.SsoAddr = append(s.SsoAddr, addr) } -func (s *Session) UploadExciting(trans Transaction) ([]byte, error) { - return s.retry(uploadExciting, &trans) -} - -func uploadExciting(s *Session, addr Addr, trans *Transaction) ([]byte, error) { - url := fmt.Sprintf("http://%v/cgi-bin/httpconn?htcmd=0x6FF0087&Uin=%v", addr.String(), s.Uin) - var rspExt []byte - var offset int64 - const chunkSize = 524288 - chunk := make([]byte, chunkSize) - for { - chunk = chunk[:chunkSize] - rl, err := io.ReadFull(trans.Body, chunk) - if rl == 0 { - break - } - if err == io.ErrUnexpectedEOF { - chunk = chunk[:rl] - } - ch := md5.Sum(chunk) - head, _ := proto.Marshal(&pb.ReqDataHighwayHead{ - MsgBasehead: s.dataHighwayHead(_REQ_CMD_DATA, 0, trans.CommandID, 0), - MsgSeghead: &pb.SegHead{ - Filesize: trans.Size, - Dataoffset: offset, - Datalength: int32(rl), - Serviceticket: trans.Ticket, - Md5: ch[:], - FileMd5: trans.Sum, - }, - ReqExtendinfo: trans.Ext, - }) - offset += int64(rl) - frame := newFrame(head, chunk) - req, _ := http.NewRequest(http.MethodPost, url, &frame) - req.Header.Set("Accept", "*/*") - req.Header.Set("Connection", "Keep-Alive") - req.Header.Set("User-Agent", "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1)") - req.Header.Set("Pragma", "no-cache") - req.ContentLength = int64(len(head) + len(chunk) + 10) - rsp, err := http.DefaultClient.Do(req) - if err != nil { - return nil, errors.Wrap(err, "request error") - } - body, _ := io.ReadAll(rsp.Body) - _ = rsp.Body.Close() - r := binary.NewReader(body) - r.ReadByte() - hl := r.ReadInt32() - _ = r.ReadInt32() - h := r.ReadBytes(int(hl)) - rspHead := new(pb.RspDataHighwayHead) - if err = proto.Unmarshal(h, rspHead); err != nil { - return nil, errors.Wrap(err, "failed to unmarshal protobuf message") - } - if rspHead.ErrorCode != 0 { - return nil, errors.Errorf("upload failed: %d", rspHead.ErrorCode) - } - if rspHead.RspExtendinfo != nil { - rspExt = rspHead.RspExtendinfo - } - } - return rspExt, nil -} - func (s *Session) nextSeq() int32 { return atomic.AddInt32(&s.seq, 2) } -func (s *Session) dataHighwayHead(cmd string, flag, cmdID, locale int32) *pb.DataHighwayHead { - return &pb.DataHighwayHead{ - Version: 1, - Uin: s.Uin, - Command: cmd, - Seq: s.nextSeq(), - Appid: s.AppID, - Dataflag: flag, - CommandId: cmdID, - LocaleId: locale, - } -} - func (s *Session) sendHeartbreak(conn net.Conn) error { head, _ := proto.Marshal(&pb.ReqDataHighwayHead{ - MsgBasehead: s.dataHighwayHead(_REQ_CMD_HEART_BREAK, 4096, 0, 2052), + MsgBasehead: &pb.DataHighwayHead{ + Version: 1, + Uin: s.Uin, + Command: _REQ_CMD_HEART_BREAK, + Seq: s.nextSeq(), + Appid: s.AppID, + Dataflag: 4096, + CommandId: 0, + LocaleId: 2052, + }, }) - frame := newFrame(head, nil) - _, err := frame.WriteTo(conn) + buffers := frame(head, nil) + _, err := buffers.WriteTo(conn) return err } @@ -179,3 +112,23 @@ func readResponse(r *binary.NetworkReader) (*pb.RspDataHighwayHead, error) { } return rsp, nil } + +/* +const maxIdleConn = 5 + +type idle struct { + conn net.Conn + delay int64 + next *idle +} + +// getConn ... +func (s *Session) getConn() net.Conn { + s.idleMu.Lock() + defer s.idleMu.Unlock() + + conn := s.idle.conn + s.idle = s.idle.next + return conn +} +*/ diff --git a/client/upload_file.go b/client/upload_file.go index cfd9d73a..febac67a 100644 --- a/client/upload_file.go +++ b/client/upload_file.go @@ -147,7 +147,7 @@ func (c *QQClient) UploadFile(target message.Source, file *LocalFile) error { if target.SourceType == message.SourcePrivate { input.CommandID = 69 } - if _, err := c.highwaySession.UploadExciting(input); err != nil { + if _, err := c.highwaySession.UploadBDH(input); err != nil { return errors.Wrap(err, "upload failed") } }