From b398cec6a56464ad2305662c36de8203dced9e19 Mon Sep 17 00:00:00 2001 From: wdvxdr Date: Wed, 15 Dec 2021 19:12:43 +0800 Subject: [PATCH] refactor: move highway.go to client/internal/highway --- binary/jce/reader_test.go | 3 +- binary/jce/writer_test.go | 4 +- client/client.go | 5 +- client/decoders.go | 8 +- client/group_file.go | 14 +- client/group_info.go | 16 + client/group_msg.go | 17 +- client/guild.go | 1 - client/guild_eventflow.go | 5 +- client/guild_msg.go | 28 +- client/highway.go | 499 ----------------------------- client/image.go | 47 ++- client/internal/highway/addr.go | 31 ++ client/internal/highway/bdh.go | 275 ++++++++++++++++ client/internal/highway/highway.go | 265 +++++++++++++++ client/network.go | 22 +- client/ptt.go | 42 ++- internal/generator/jce_gen/main.go | 2 +- topic/feed.go | 4 +- 19 files changed, 716 insertions(+), 572 deletions(-) delete mode 100644 client/highway.go create mode 100644 client/internal/highway/addr.go create mode 100644 client/internal/highway/bdh.go create mode 100644 client/internal/highway/highway.go diff --git a/binary/jce/reader_test.go b/binary/jce/reader_test.go index 883897fd..4fad5585 100644 --- a/binary/jce/reader_test.go +++ b/binary/jce/reader_test.go @@ -63,7 +63,8 @@ var req = RequestDataVersion2{ "5": { "123": []byte(`123`), }, - }} + }, +} func TestRequestDataVersion2_ReadFrom(t *testing.T) { // todo(wdv): fuzz test diff --git a/binary/jce/writer_test.go b/binary/jce/writer_test.go index 2a2d85bb..718ec84a 100644 --- a/binary/jce/writer_test.go +++ b/binary/jce/writer_test.go @@ -9,7 +9,7 @@ import ( var globalBytes []byte func BenchmarkJceWriter_WriteMap(b *testing.B) { - var x = globalBytes + x := globalBytes for i := 0; i < b.N; i++ { w := NewJceWriter() w.writeMapStrMapStrBytes(req.Map, 0) @@ -39,7 +39,7 @@ var reqPacket1 = &RequestPacket{ } func BenchmarkJceWriter_WriteJceStructRaw(b *testing.B) { - var x = globalBytes + x := globalBytes for i := 0; i < b.N; i++ { _ = reqPacket1.ToBytes() } diff --git a/client/client.go b/client/client.go index 9c9a17b2..4c4e5758 100644 --- a/client/client.go +++ b/client/client.go @@ -15,6 +15,7 @@ import ( "github.com/Mrs4s/MiraiGo/binary" "github.com/Mrs4s/MiraiGo/binary/jce" + "github.com/Mrs4s/MiraiGo/client/internal/highway" "github.com/Mrs4s/MiraiGo/client/pb/msg" "github.com/Mrs4s/MiraiGo/internal/crypto" "github.com/Mrs4s/MiraiGo/internal/packets" @@ -84,13 +85,12 @@ type QQClient struct { // session info qwebSeq int64 sigInfo *loginSigInfo - bigDataSession *bigDataSessionInfo + highwaySession *highway.Session dpwd []byte timeDiff int64 pwdFlag bool // address - srvSsoAddrs []string otherSrvAddrs []string fileStorageInfo *jce.FileStoragePushFSSvcList @@ -270,6 +270,7 @@ func NewClientMd5(uin int64, passwordMd5 [16]byte) *QQClient { func (c *QQClient) UseDevice(info *DeviceInfo) { c.version = genVersionInfo(info.Protocol) + c.highwaySession = highway.NewSession(int32(c.version.AppId), c.Uin) c.ksid = []byte(fmt.Sprintf("|%s|A8.2.7.27f6ea96", info.IMEI)) c.deviceInfo = info } diff --git a/client/decoders.go b/client/decoders.go index 91f4018a..4828e4a7 100644 --- a/client/decoders.go +++ b/client/decoders.go @@ -346,14 +346,12 @@ func decodePushReqPacket(c *QQClient, _ *incomingPacketInfo, payload []byte) (in c.fileStorageInfo = list rsp := cmd0x6ff.C501RspBody{} if err := proto.Unmarshal(list.BigDataChannel.PbBuf, &rsp); err == nil && rsp.RspBody != nil { - c.bigDataSession = &bigDataSessionInfo{ - SigSession: rsp.RspBody.SigSession, - SessionKey: rsp.RspBody.SessionKey, - } + c.highwaySession.SigSession = rsp.RspBody.SigSession + c.highwaySession.SessionKey = rsp.RspBody.SessionKey for _, srv := range rsp.RspBody.Addrs { if srv.GetServiceType() == 10 { for _, addr := range srv.Addrs { - c.srvSsoAddrs = append(c.srvSsoAddrs, fmt.Sprintf("%v:%v", binary.UInt32ToIPV4Address(addr.GetIp()), addr.GetPort())) + c.highwaySession.AppendAddr(addr.GetIp(), addr.GetPort()) } } if srv.GetServiceType() == 21 { diff --git a/client/group_file.go b/client/group_file.go index 16c1c48d..f2df4a52 100644 --- a/client/group_file.go +++ b/client/group_file.go @@ -11,6 +11,7 @@ import ( "github.com/pkg/errors" + "github.com/Mrs4s/MiraiGo/client/internal/highway" "github.com/Mrs4s/MiraiGo/client/pb/exciting" "github.com/Mrs4s/MiraiGo/client/pb/oidb" "github.com/Mrs4s/MiraiGo/internal/packets" @@ -225,11 +226,18 @@ func (fs *GroupFileSystem) UploadFile(p, name, folderId string) error { }, Unknown3: proto.Int32(0), }) - if _, err = fs.client.excitingUploadStream(file, 71, fs.client.bigDataSession.SigSession, ext); err != nil { + client := fs.client + input := highway.ExcitingInput{ + CommandID: 71, + Body: file, + Ticket: fs.client.highwaySession.SigSession, + Ext: ext, + } + if _, err = fs.client.highwaySession.UploadExciting(input); err != nil { return errors.Wrap(err, "upload failed") } - _, pkt := fs.client.buildGroupFileFeedsRequest(fs.GroupCode, rsp.GetFileId(), rsp.GetBusId(), rand.Int31()) - return fs.client.sendPacket(pkt) + _, pkt := client.buildGroupFileFeedsRequest(fs.GroupCode, rsp.GetFileId(), rsp.GetBusId(), rand.Int31()) + return client.sendPacket(pkt) } func (fs *GroupFileSystem) GetDownloadUrl(file *GroupFile) string { diff --git a/client/group_info.go b/client/group_info.go index 46add952..0cd449ac 100644 --- a/client/group_info.go +++ b/client/group_info.go @@ -1,9 +1,11 @@ package client import ( + "bytes" "encoding/json" "fmt" "math/rand" + "net/http" "net/url" "sort" "strings" @@ -260,6 +262,20 @@ func decodeGroupInfoResponse(c *QQClient, _ *incomingPacketInfo, payload []byte) }, nil } +func (c *QQClient) uploadGroupHeadPortrait(groupCode int64, img []byte) error { + url := fmt.Sprintf("http://htdata3.qq.com/cgi-bin/httpconn?htcmd=0x6ff0072&ver=5520&ukey=%v&range=0&uin=%v&seq=23&groupuin=%v&filetype=3&imagetype=5&userdata=0&subcmd=1&subver=101&clip=0_0_0_0&filesize=%v", + c.getSKey(), c.Uin, groupCode, len(img)) + req, _ := http.NewRequest("POST", url, bytes.NewReader(img)) + req.Header["User-Agent"] = []string{"Dalvik/2.1.0 (Linux; U; Android 7.1.2; PCRT00 Build/N2G48H)"} + req.Header["Content-Type"] = []string{"multipart/form-data;boundary=****"} + rsp, err := http.DefaultClient.Do(req) + if err != nil { + return errors.Wrap(err, "failed to upload group head portrait") + } + rsp.Body.Close() + return nil +} + func (g *GroupInfo) UpdateName(newName string) { if g.AdministratorOrOwner() && newName != "" && strings.Count(newName, "") <= 20 { g.client.updateGroupName(g.Code, newName) diff --git a/client/group_msg.go b/client/group_msg.go index 83ee0767..ccd1bece 100644 --- a/client/group_msg.go +++ b/client/group_msg.go @@ -13,6 +13,7 @@ import ( "github.com/pkg/errors" + "github.com/Mrs4s/MiraiGo/client/internal/highway" "github.com/Mrs4s/MiraiGo/client/pb/longmsg" "github.com/Mrs4s/MiraiGo/client/pb/msg" "github.com/Mrs4s/MiraiGo/client/pb/multimsg" @@ -169,7 +170,13 @@ func (c *QQClient) uploadGroupLongMessage(groupCode int64, m *message.ForwardMes return nil, errors.Errorf("upload long message error: %v", err) } for i, ip := range rsp.Uint32UpIp { - err := c.highwayUpload(uint32(ip), int(rsp.Uint32UpPort[i]), rsp.MsgSig, body, 27) + addr := highway.Addr{IP: uint32(ip), Port: int(rsp.Uint32UpPort[i])} + input := highway.Input{ + CommandID: 27, + Key: rsp.MsgSig, + Body: bytes.NewReader(body), + } + err := c.highwaySession.Upload(addr, input) if err != nil { c.Error("highway upload long message error: %v", err) continue @@ -191,7 +198,13 @@ func (c *QQClient) UploadGroupForwardMessage(groupCode int64, m *message.Forward return nil } for i, ip := range rsp.Uint32UpIp { - err := c.highwayUpload(uint32(ip), int(rsp.Uint32UpPort[i]), rsp.MsgSig, body, 27) + addr := highway.Addr{IP: uint32(ip), Port: int(rsp.Uint32UpPort[i])} + input := highway.Input{ + CommandID: 27, + Key: rsp.MsgSig, + Body: bytes.NewReader(body), + } + err := c.highwaySession.Upload(addr, input) if err != nil { continue } diff --git a/client/guild.go b/client/guild.go index 21b4271e..18f0d2f2 100644 --- a/client/guild.go +++ b/client/guild.go @@ -793,7 +793,6 @@ func decodeGuildPushFirstView(c *QQClient, _ *incomingPacketInfo, payload []byte } } if len(firstViewMsg.ChannelMsgs) > 0 { // sync msg - } return nil, nil } diff --git a/client/guild_eventflow.go b/client/guild_eventflow.go index 1a93f403..136a2241 100644 --- a/client/guild_eventflow.go +++ b/client/guild_eventflow.go @@ -17,9 +17,7 @@ func init() { decoders["MsgPush.PushGroupProMsg"] = decodeGuildEventFlowPacket } -var ( - updateChanLock sync.Mutex -) +var updateChanLock sync.Mutex type tipsPushInfo struct { TinyId uint64 @@ -63,7 +61,6 @@ func decodeGuildEventFlowPacket(c *QQClient, _ *incomingPacketInfo, payload []by } if m.Head.ContentHead.GetSubType() == 2 { // todo: tips? if common == nil { // empty tips - } tipsInfo := &tipsPushInfo{ TinyId: m.Head.RoutingHead.GetFromTinyid(), diff --git a/client/guild_msg.go b/client/guild_msg.go index ad9d5c2c..45023b13 100644 --- a/client/guild_msg.go +++ b/client/guild_msg.go @@ -2,9 +2,7 @@ package client import ( "bytes" - "crypto/md5" "encoding/hex" - "fmt" "image" "io" "math/rand" @@ -13,6 +11,7 @@ import ( "github.com/pkg/errors" "github.com/Mrs4s/MiraiGo/binary" + "github.com/Mrs4s/MiraiGo/client/internal/highway" "github.com/Mrs4s/MiraiGo/client/pb/channel" "github.com/Mrs4s/MiraiGo/client/pb/cmd0x388" "github.com/Mrs4s/MiraiGo/client/pb/msg" @@ -130,12 +129,18 @@ func (s *GuildService) UploadGuildImage(guildId, channelId uint64, img io.ReadSe if body.IsExists { goto ok } - if len(s.c.srvSsoAddrs) == 0 { + if s.c.highwaySession.AddrLength() == 0 { for i, addr := range body.UploadIp { - s.c.srvSsoAddrs = append(s.c.srvSsoAddrs, fmt.Sprintf("%v:%v", binary.UInt32ToIPV4Address(addr), body.UploadPort[i])) + s.c.highwaySession.AppendAddr(addr, body.UploadPort[i]) } } - if _, err = s.c.highwayUploadByBDH(img, length, 83, body.UploadKey, fh, binary.DynamicProtoMessage{11: guildId, 12: channelId}.Encode(), false); err == nil { + if _, err = s.c.highwaySession.UploadBDH(highway.BdhInput{ + CommandID: 83, + Body: img, + Ticket: body.UploadKey, + Ext: binary.DynamicProtoMessage{11: guildId, 12: channelId}.Encode(), + Encrypt: false, + }); err == nil { goto ok } return nil, errors.Wrap(err, "highway upload error") @@ -383,14 +388,15 @@ func (c *QQClient) UploadGuildShortVideo(guildId, channelId uint64, video, thumb req.BusinessType = 4601 req.ToUin = int64(channelId) ext, _ := proto.Marshal(req) - var hwRsp []byte multi := utils.MultiReadSeeker(thumb, video) - h := md5.New() - length, _ := io.Copy(h, multi) - fh := h.Sum(nil) - _, _ = multi.Seek(0, io.SeekStart) - hwRsp, err = c.highwayUploadByBDH(multi, length, 89, c.bigDataSession.SigSession, fh, ext, true) + hwRsp, err := c.highwaySession.UploadBDH(highway.BdhInput{ + CommandID: 89, + Body: multi, + Ticket: c.highwaySession.SigSession, + Ext: ext, + Encrypt: true, + }) if err != nil { return nil, errors.Wrap(err, "upload video file error") } diff --git a/client/highway.go b/client/highway.go deleted file mode 100644 index 252c7bee..00000000 --- a/client/highway.go +++ /dev/null @@ -1,499 +0,0 @@ -package client - -import ( - "bytes" - "crypto/md5" - binary2 "encoding/binary" - "fmt" - "io" - "net" - "net/http" - "os" - "strconv" - "sync" - "sync/atomic" - "time" - - "github.com/pkg/errors" - "golang.org/x/sync/errgroup" - - "github.com/Mrs4s/MiraiGo/binary" - "github.com/Mrs4s/MiraiGo/client/pb" - "github.com/Mrs4s/MiraiGo/internal/proto" - "github.com/Mrs4s/MiraiGo/utils" -) - -func (c *QQClient) highwayUpload(ip uint32, port int, updKey, data []byte, cmdID int32) error { - return c.highwayUploadStream(ip, port, updKey, bytes.NewReader(data), cmdID) -} - -func (c *QQClient) highwayUploadStream(ip uint32, port int, updKey []byte, stream io.ReadSeeker, cmdId int32) error { - addr := net.TCPAddr{ - IP: make([]byte, 4), - Port: port, - } - binary2.LittleEndian.PutUint32(addr.IP, ip) - h := md5.New() - length, _ := io.Copy(h, stream) - fh := h.Sum(nil) - const chunkSize = 8192 * 8 - _, _ = stream.Seek(0, io.SeekStart) - conn, err := net.DialTCP("tcp", nil, &addr) - if err != nil { - return errors.Wrap(err, "connect error") - } - defer conn.Close() - offset := 0 - reader := binary.NewNetworkReader(conn) - buf := binary.Get256KBytes() - chunk := *buf - defer binary.Put256KBytes(buf) - - w := binary.SelectWriter() - defer binary.PutWriter(w) - for { - chunk = chunk[:chunkSize] - rl, err := io.ReadFull(stream, chunk) - if errors.Is(err, io.EOF) { - break - } - if errors.Is(err, io.ErrUnexpectedEOF) { - chunk = chunk[:rl] - } - ch := md5.Sum(chunk) - head, _ := proto.Marshal(&pb.ReqDataHighwayHead{ - MsgBasehead: &pb.DataHighwayHead{ - Version: 1, - Uin: strconv.FormatInt(c.Uin, 10), - Command: "PicUp.DataUp", - Seq: c.nextGroupDataTransSeq(), - Appid: int32(c.version.AppId), - Dataflag: 4096, - CommandId: cmdId, - LocaleId: 2052, - }, - MsgSeghead: &pb.SegHead{ - Filesize: length, - Dataoffset: int64(offset), - Datalength: int32(rl), - Serviceticket: updKey, - Md5: ch[:], - FileMd5: fh, - }, - ReqExtendinfo: EmptyBytes, - }) - offset += rl - w.Reset() - w.WriteByte(40) - w.WriteUInt32(uint32(len(head))) - w.WriteUInt32(uint32(len(chunk))) - w.Write(head) - w.Write(chunk) - w.WriteByte(41) - _, err = conn.Write(w.Bytes()) - if err != nil { - return errors.Wrap(err, "write conn error") - } - rspHead, _, err := highwayReadResponse(reader) - if err != nil { - return errors.Wrap(err, "highway upload error") - } - if rspHead.ErrorCode != 0 { - return errors.New("upload failed") - } - } - return nil -} - -func (c *QQClient) highwayUploadByBDH(stream io.Reader, length int64, cmdId int32, ticket, sum, ext []byte, encrypt bool) ([]byte, error) { - if len(c.srvSsoAddrs) == 0 { - return nil, errors.New("srv addrs not found. maybe miss some packet?") - } - if encrypt { - if c.bigDataSession == nil || len(c.bigDataSession.SessionKey) == 0 { - return nil, errors.New("session key not found. maybe miss some packet?") - } - ext = binary.NewTeaCipher(c.bigDataSession.SessionKey).Encrypt(ext) - } - const chunkSize = 256 * 1024 - conn, err := net.DialTimeout("tcp", c.srvSsoAddrs[0], time.Second*20) - if err != nil { - return nil, errors.Wrap(err, "connect error") - } - defer conn.Close() - offset := 0 - reader := binary.NewNetworkReader(conn) - if err = c.highwaySendHeartbreak(conn); err != nil { - return nil, errors.Wrap(err, "echo error") - } - if _, _, err = highwayReadResponse(reader); err != nil { - return nil, errors.Wrap(err, "echo error") - } - var rspExt []byte - buf := binary.Get256KBytes() - chunk := *buf - defer binary.Put256KBytes(buf) - - w := binary.SelectWriter() - defer binary.PutWriter(w) - for { - chunk = chunk[:chunkSize] - rl, err := io.ReadFull(stream, chunk) - if errors.Is(err, io.EOF) { - break - } - if errors.Is(err, io.ErrUnexpectedEOF) { - chunk = chunk[:rl] - } - ch := md5.Sum(chunk) - head, _ := proto.Marshal(&pb.ReqDataHighwayHead{ - MsgBasehead: &pb.DataHighwayHead{ - Version: 1, - Uin: strconv.FormatInt(c.Uin, 10), - Command: "PicUp.DataUp", - Seq: c.nextGroupDataTransSeq(), - Appid: int32(c.version.AppId), - Dataflag: 4096, - CommandId: cmdId, - LocaleId: 2052, - }, - MsgSeghead: &pb.SegHead{ - Filesize: length, - Dataoffset: int64(offset), - Datalength: int32(rl), - Serviceticket: ticket, - Md5: ch[:], - FileMd5: sum, - }, - ReqExtendinfo: ext, - }) - offset += rl - w.Reset() - w.WriteByte(40) - w.WriteUInt32(uint32(len(head))) - w.WriteUInt32(uint32(len(chunk))) - w.Write(head) - w.Write(chunk) - w.WriteByte(41) - _, err = conn.Write(w.Bytes()) - if err != nil { - return nil, errors.Wrap(err, "write conn error") - } - rspHead, _, err := highwayReadResponse(reader) - if err != nil { - return nil, errors.Wrap(err, "highway upload error") - } - if rspHead.ErrorCode != 0 { - return nil, errors.Errorf("upload failed: %d", rspHead.ErrorCode) - } - if rspHead.RspExtendinfo != nil { - rspExt = rspHead.RspExtendinfo - } - if rspHead.MsgSeghead != nil && rspHead.MsgSeghead.Serviceticket != nil { - ticket = rspHead.MsgSeghead.Serviceticket - } - } - return rspExt, nil -} - -func (c *QQClient) highwayUploadFileMultiThreadingByBDH(path string, cmdId int32, threadCount int, ticket, ext []byte, encrypt bool) ([]byte, error) { - if len(c.srvSsoAddrs) == 0 { - return nil, errors.New("srv addrs not found. maybe miss some packet?") - } - if encrypt { - if c.bigDataSession == nil || len(c.bigDataSession.SessionKey) == 0 { - return nil, errors.New("session key not found. maybe miss some packet?") - } - ext = binary.NewTeaCipher(c.bigDataSession.SessionKey).Encrypt(ext) - } - stat, err := os.Stat(path) - if err != nil { - return nil, errors.Wrap(err, "get stat error") - } - file, err := os.OpenFile(path, os.O_RDONLY, 0o666) - if err != nil { - return nil, errors.Wrap(err, "open file error") - } - defer file.Close() - h := md5.New() - length, _ := io.Copy(h, file) - fh := h.Sum(nil) - _, _ = file.Seek(0, io.SeekStart) - if stat.Size() < 1024*1024*3 || threadCount < 2 { - return c.highwayUploadByBDH(file, length, cmdId, ticket, fh, ext, false) - } - type BlockMetaData struct { - Id int - BeginOffset int64 - EndOffset int64 - } - const blockSize int64 = 1024 * 512 - var ( - blocks []*BlockMetaData - rspExt []byte - BlockId = ^uint32(0) // -1 - uploadedCount uint32 - cond = sync.NewCond(&sync.Mutex{}) - ) - // Init Blocks - { - var temp int64 = 0 - for temp+blockSize < stat.Size() { - blocks = append(blocks, &BlockMetaData{ - Id: len(blocks), - BeginOffset: temp, - EndOffset: temp + blockSize, - }) - temp += blockSize - } - blocks = append(blocks, &BlockMetaData{ - Id: len(blocks), - BeginOffset: temp, - EndOffset: stat.Size(), - }) - } - doUpload := func() error { - defer cond.Signal() - - conn, err := net.DialTimeout("tcp", c.srvSsoAddrs[0], time.Second*20) - if err != nil { - return errors.Wrap(err, "connect error") - } - defer conn.Close() - chunk, _ := os.OpenFile(path, os.O_RDONLY, 0o666) - defer chunk.Close() - reader := binary.NewNetworkReader(conn) - if err = c.highwaySendHeartbreak(conn); err != nil { - return errors.Wrap(err, "echo error") - } - if _, _, err = highwayReadResponse(reader); err != nil { - return errors.Wrap(err, "echo error") - } - buffer := make([]byte, blockSize) - w := binary.SelectWriter() - w.Reset() - w.Grow(600 * 1024) // 复用,600k 不要放回池中 - for { - nextId := atomic.AddUint32(&BlockId, 1) - if nextId >= uint32(len(blocks)) { - break - } - block := blocks[nextId] - if block.Id == len(blocks)-1 { - cond.L.Lock() - for atomic.LoadUint32(&uploadedCount) != uint32(len(blocks))-1 { - cond.Wait() - } - cond.L.Unlock() - } - buffer = buffer[:blockSize] - _, _ = chunk.Seek(block.BeginOffset, io.SeekStart) - ri, err := io.ReadFull(chunk, buffer) - if err != nil { - if err == io.EOF { - break - } - if err == io.ErrUnexpectedEOF { - buffer = buffer[:ri] - } else { - return err - } - } - ch := md5.Sum(buffer) - head, _ := proto.Marshal(&pb.ReqDataHighwayHead{ - MsgBasehead: &pb.DataHighwayHead{ - Version: 1, - Uin: strconv.FormatInt(c.Uin, 10), - Command: "PicUp.DataUp", - Seq: c.nextGroupDataTransSeq(), - Appid: int32(c.version.AppId), - Dataflag: 4096, - CommandId: cmdId, - LocaleId: 2052, - }, - MsgSeghead: &pb.SegHead{ - Filesize: stat.Size(), - Dataoffset: block.BeginOffset, - Datalength: int32(ri), - Serviceticket: ticket, - Md5: ch[:], - FileMd5: fh, - }, - ReqExtendinfo: ext, - }) - w.Reset() - w.WriteByte(40) - w.WriteUInt32(uint32(len(head))) - w.WriteUInt32(uint32(len(buffer))) - w.Write(head) - w.Write(buffer) - w.WriteByte(41) - _, err = conn.Write(w.Bytes()) - if err != nil { - return errors.Wrap(err, "write conn error") - } - rspHead, _, err := highwayReadResponse(reader) - if err != nil { - return errors.Wrap(err, "highway upload error") - } - if rspHead.ErrorCode != 0 { - return errors.Errorf("upload failed: %d", rspHead.ErrorCode) - } - if rspHead.RspExtendinfo != nil { - rspExt = rspHead.RspExtendinfo - } - atomic.AddUint32(&uploadedCount, 1) - } - return nil - } - - group := errgroup.Group{} - for i := 0; i < threadCount; i++ { - group.Go(doUpload) - } - err = group.Wait() - return rspExt, err -} - -func (c *QQClient) highwaySendHeartbreak(conn net.Conn) error { - head, _ := proto.Marshal(&pb.ReqDataHighwayHead{ - MsgBasehead: &pb.DataHighwayHead{ - Version: 1, - Uin: strconv.FormatInt(c.Uin, 10), - Command: "PicUp.Echo", - Seq: c.nextGroupDataTransSeq(), - Appid: int32(c.version.AppId), - Dataflag: 4096, - CommandId: 0, - LocaleId: 2052, - }, - }) - w := binary.SelectWriter() - w.WriteByte(40) - w.WriteUInt32(uint32(len(head))) - w.WriteUInt32(0) - w.Write(head) - w.WriteByte(41) - _, err := conn.Write(w.Bytes()) - binary.PutWriter(w) - return err -} - -func highwayReadResponse(r *binary.NetworkReader) (*pb.RspDataHighwayHead, []byte, error) { - _, err := r.ReadByte() - if err != nil { - return nil, nil, errors.Wrap(err, "failed to read byte") - } - hl, _ := r.ReadInt32() - a2, _ := r.ReadInt32() - head, _ := r.ReadBytes(int(hl)) - payload, _ := r.ReadBytes(int(a2)) - _, _ = r.ReadByte() - rsp := new(pb.RspDataHighwayHead) - if err = proto.Unmarshal(head, rsp); err != nil { - return nil, nil, errors.Wrap(err, "failed to unmarshal protobuf message") - } - return rsp, payload, nil -} - -func (c *QQClient) excitingUploadStream(stream io.ReadSeeker, cmdId int32, ticket, ext []byte) ([]byte, error) { - fileMd5, fileLength := utils.ComputeMd5AndLength(stream) - _, _ = stream.Seek(0, io.SeekStart) - url := fmt.Sprintf("http://%v/cgi-bin/httpconn?htcmd=0x6FF0087&uin=%v", c.srvSsoAddrs[0], c.Uin) - var ( - rspExt []byte - offset int64 = 0 - chunkSize = 524288 - ) - chunk := make([]byte, chunkSize) - w := binary.SelectWriter() - w.Reset() - w.Grow(600 * 1024) // 复用,600k 不要放回池中 - for { - chunk = chunk[:chunkSize] - rl, err := io.ReadFull(stream, chunk) - if err == io.EOF { - break - } - if err == io.ErrUnexpectedEOF { - chunk = chunk[:rl] - } - ch := md5.Sum(chunk) - head, _ := proto.Marshal(&pb.ReqDataHighwayHead{ - MsgBasehead: &pb.DataHighwayHead{ - Version: 1, - Uin: strconv.FormatInt(c.Uin, 10), - Command: "PicUp.DataUp", - Seq: c.nextGroupDataTransSeq(), - Appid: int32(c.version.AppId), - Dataflag: 0, - CommandId: cmdId, - LocaleId: 0, - }, - MsgSeghead: &pb.SegHead{ - Filesize: fileLength, - Dataoffset: offset, - Datalength: int32(rl), - Serviceticket: ticket, - Md5: ch[:], - FileMd5: fileMd5, - }, - ReqExtendinfo: ext, - }) - offset += int64(rl) - w.Reset() - w.WriteByte(40) - w.WriteUInt32(uint32(len(head))) - w.WriteUInt32(uint32(len(chunk))) - w.Write(head) - w.Write(chunk) - w.WriteByte(41) - req, _ := http.NewRequest("POST", url, bytes.NewReader(w.Bytes())) - req.Header.Set("Accept", "*/*") - req.Header.Set("Connection", "Keep-Alive") - req.Header.Set("User-Agent", "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1)") - req.Header.Set("Pragma", "no-cache") - rsp, err := http.DefaultClient.Do(req) - if err != nil { - return nil, errors.Wrap(err, "request error") - } - body, _ := io.ReadAll(rsp.Body) - _ = rsp.Body.Close() - r := binary.NewReader(body) - r.ReadByte() - hl := r.ReadInt32() - a2 := r.ReadInt32() - h := r.ReadBytes(int(hl)) - r.ReadBytes(int(a2)) - rspHead := new(pb.RspDataHighwayHead) - if err = proto.Unmarshal(h, rspHead); err != nil { - return nil, errors.Wrap(err, "failed to unmarshal protobuf message") - } - if rspHead.ErrorCode != 0 { - return nil, errors.Errorf("upload failed: %d", rspHead.ErrorCode) - } - if rspHead.RspExtendinfo != nil { - rspExt = rspHead.RspExtendinfo - } - } - return rspExt, nil -} - -func (c *QQClient) uploadGroupHeadPortrait(groupCode int64, img []byte) error { - url := fmt.Sprintf( - "http://htdata3.qq.com/cgi-bin/httpconn?htcmd=0x6ff0072&ver=5520&ukey=%v&range=0&uin=%v&seq=23&groupuin=%v&filetype=3&imagetype=5&userdata=0&subcmd=1&subver=101&clip=0_0_0_0&filesize=%v", - c.getSKey(), - c.Uin, - groupCode, - len(img), - ) - req, _ := http.NewRequest("POST", url, bytes.NewReader(img)) - req.Header["User-Agent"] = []string{"Dalvik/2.1.0 (Linux; U; Android 7.1.2; PCRT00 Build/N2G48H)"} - req.Header["Content-Type"] = []string{"multipart/form-data;boundary=****"} - rsp, err := http.DefaultClient.Do(req) - if err != nil { - return errors.Wrap(err, "failed to upload group head portrait") - } - rsp.Body.Close() - return nil -} diff --git a/client/image.go b/client/image.go index b096767a..0db3036b 100644 --- a/client/image.go +++ b/client/image.go @@ -3,7 +3,6 @@ package client import ( "bytes" "encoding/hex" - "fmt" "image" _ "image/gif" "io" @@ -15,8 +14,9 @@ import ( "github.com/pkg/errors" "github.com/Mrs4s/MiraiGo/binary" + "github.com/Mrs4s/MiraiGo/client/internal/highway" "github.com/Mrs4s/MiraiGo/client/pb/cmd0x388" - "github.com/Mrs4s/MiraiGo/client/pb/highway" + highway2 "github.com/Mrs4s/MiraiGo/client/pb/highway" "github.com/Mrs4s/MiraiGo/client/pb/oidb" "github.com/Mrs4s/MiraiGo/internal/packets" "github.com/Mrs4s/MiraiGo/internal/proto" @@ -68,12 +68,18 @@ func (c *QQClient) UploadGroupImage(groupCode int64, img io.ReadSeeker) (*messag if rsp.IsExists { goto ok } - if len(c.srvSsoAddrs) == 0 { + if c.highwaySession.AddrLength() == 0 { for i, addr := range rsp.UploadIp { - c.srvSsoAddrs = append(c.srvSsoAddrs, fmt.Sprintf("%v:%v", binary.UInt32ToIPV4Address(addr), rsp.UploadPort[i])) + c.highwaySession.AppendAddr(addr, rsp.UploadPort[i]) } } - if _, err = c.highwayUploadByBDH(img, length, 2, rsp.UploadKey, fh, EmptyBytes, false); err == nil { + if _, err = c.highwaySession.UploadBDH(highway.BdhInput{ + CommandID: 2, + Body: img, + Ticket: rsp.UploadKey, + Ext: EmptyBytes, + Encrypt: false, + }); err == nil { goto ok } return nil, errors.Wrap(err, "upload failed") @@ -114,12 +120,19 @@ func (c *QQClient) UploadGroupImageByFile(groupCode int64, path string) (*messag if rsp.IsExists { goto ok } - if len(c.srvSsoAddrs) == 0 { + if c.highwaySession.AddrLength() == 0 { for i, addr := range rsp.UploadIp { - c.srvSsoAddrs = append(c.srvSsoAddrs, fmt.Sprintf("%v:%v", binary.UInt32ToIPV4Address(addr), rsp.UploadPort[i])) + c.highwaySession.AppendAddr(addr, rsp.UploadPort[i]) } } - if _, err = c.highwayUploadFileMultiThreadingByBDH(path, 2, 1, rsp.UploadKey, EmptyBytes, false); err == nil { + + if _, err = c.highwaySession.UploadBDHMultiThread(highway.BdhInput{ + CommandID: 2, + File: path, + Ticket: rsp.UploadKey, + Ext: EmptyBytes, + Encrypt: false, + }, 1); err == nil { goto ok } return nil, errors.Wrap(err, "upload failed") @@ -176,7 +189,7 @@ func (c *QQClient) ImageOcr(img interface{}) (*OcrResponse, error) { case *message.GroupImageElement: url = e.Url if b, err := utils.HTTPGetReadCloser(e.Url, ""); err == nil { - if url, err = c.uploadOcrImage(b, int64(e.Size), e.Md5); err != nil { + if url, err = c.uploadOcrImage(b); err != nil { url = e.Url } _ = b.Close() @@ -285,18 +298,26 @@ func (c *QQClient) buildGroupImageDownloadPacket(fileId, groupCode int64, fileMd return seq, packet } -func (c *QQClient) uploadOcrImage(img io.Reader, length int64, sum []byte) (string, error) { +func (c *QQClient) uploadOcrImage(img io.Reader) (string, error) { r := make([]byte, 16) rand.Read(r) - ext, _ := proto.Marshal(&highway.CommFileExtReq{ + ext, _ := proto.Marshal(&highway2.CommFileExtReq{ ActionType: proto.Uint32(0), Uuid: binary.GenUUID(r), }) - rsp, err := c.highwayUploadByBDH(img, length, 76, c.bigDataSession.SigSession, sum, ext, false) + + buf, _ := io.ReadAll(img) + rsp, err := c.highwaySession.UploadBDH(highway.BdhInput{ + CommandID: 76, + Body: bytes.NewReader(buf), + Ticket: c.highwaySession.SigSession, + Ext: ext, + Encrypt: false, + }) if err != nil { return "", errors.Wrap(err, "upload ocr image error") } - rspExt := highway.CommFileExtRsp{} + rspExt := highway2.CommFileExtRsp{} if err = proto.Unmarshal(rsp, &rspExt); err != nil { return "", errors.Wrap(err, "error unmarshal highway resp") } diff --git a/client/internal/highway/addr.go b/client/internal/highway/addr.go new file mode 100644 index 00000000..3430ce11 --- /dev/null +++ b/client/internal/highway/addr.go @@ -0,0 +1,31 @@ +package highway + +import ( + binary2 "encoding/binary" + "fmt" + "net" + + "github.com/Mrs4s/MiraiGo/binary" +) + +type Addr struct { + IP uint32 + Port int +} + +func (a Addr) asTcpAddr() *net.TCPAddr { + addr := &net.TCPAddr{ + IP: make([]byte, 4), + Port: a.Port, + } + binary2.LittleEndian.PutUint32(addr.IP, a.IP) + return addr +} + +func (a Addr) AsNetIP() net.IP { + return net.IPv4(byte(a.IP>>24), byte(a.IP>>16), byte(a.IP>>8), byte(a.IP)) +} + +func (a Addr) String() string { + return fmt.Sprintf("%v:%v", binary.UInt32ToIPV4Address(a.IP), a.Port) +} diff --git a/client/internal/highway/bdh.go b/client/internal/highway/bdh.go new file mode 100644 index 00000000..e3537b58 --- /dev/null +++ b/client/internal/highway/bdh.go @@ -0,0 +1,275 @@ +package highway + +import ( + "crypto/md5" + "io" + "net" + "os" + "sync" + "sync/atomic" + "time" + + "github.com/pkg/errors" + "golang.org/x/sync/errgroup" + + "github.com/Mrs4s/MiraiGo/binary" + "github.com/Mrs4s/MiraiGo/client/pb" + "github.com/Mrs4s/MiraiGo/internal/proto" + "github.com/Mrs4s/MiraiGo/utils" +) + +type BdhInput struct { + CommandID int32 + File string // upload multi-thread required + Body io.ReadSeeker + Ticket []byte + Ext []byte + Encrypt bool +} + +func (bdh *BdhInput) encrypt(key []byte) error { + if bdh.Encrypt { + if len(key) == 0 { + return errors.New("session key not found. maybe miss some packet?") + } + bdh.Ext = binary.NewTeaCipher(key).Encrypt(bdh.Ext) + } + return nil +} + +func (s *Session) UploadBDH(input BdhInput) ([]byte, error) { + if len(s.SsoAddr) == 0 { + return nil, errors.New("srv addrs not found. maybe miss some packet?") + } + addr := s.SsoAddr[0].String() + + sum, length := utils.ComputeMd5AndLength(input.Body) + _, _ = input.Body.Seek(0, io.SeekStart) + if err := input.encrypt(s.SessionKey); err != nil { + return nil, errors.Wrap(err, "encrypt error") + } + conn, err := net.DialTimeout("tcp", addr, time.Second*20) + if err != nil { + return nil, errors.Wrap(err, "connect error") + } + defer conn.Close() + offset := 0 + reader := binary.NewNetworkReader(conn) + if err = s.sendEcho(conn); err != nil { + return nil, err + } + + var rspExt []byte + const chunkSize = 256 * 1024 + chunk := make([]byte, chunkSize) + w := binary.SelectWriter() + defer binary.PutWriter(w) + for { + chunk = chunk[:chunkSize] + rl, err := io.ReadFull(input.Body, chunk) + if errors.Is(err, io.EOF) { + break + } + if errors.Is(err, io.ErrUnexpectedEOF) { + chunk = chunk[:rl] + } + ch := md5.Sum(chunk) + head, _ := proto.Marshal(&pb.ReqDataHighwayHead{ + MsgBasehead: &pb.DataHighwayHead{ + Version: 1, + Uin: s.uin, + Command: "PicUp.DataUp", + Seq: s.nextSeq(), + Appid: s.appID, + Dataflag: 4096, + CommandId: input.CommandID, + LocaleId: 2052, + }, + MsgSeghead: &pb.SegHead{ + Filesize: length, + Dataoffset: int64(offset), + Datalength: int32(rl), + Serviceticket: input.Ticket, + Md5: ch[:], + FileMd5: sum, + }, + ReqExtendinfo: input.Ext, + }) + offset += rl + w.Reset() + writeHeadBody(w, head, chunk) + _, err = conn.Write(w.Bytes()) + if err != nil { + return nil, errors.Wrap(err, "write conn error") + } + rspHead, _, err := readResponse(reader) + if err != nil { + return nil, errors.Wrap(err, "highway upload error") + } + if rspHead.ErrorCode != 0 { + return nil, errors.Errorf("upload failed: %d", rspHead.ErrorCode) + } + if rspHead.RspExtendinfo != nil { + rspExt = rspHead.RspExtendinfo + } + if rspHead.MsgSeghead != nil && rspHead.MsgSeghead.Serviceticket != nil { + input.Ticket = rspHead.MsgSeghead.Serviceticket + } + } + return rspExt, nil +} + +func (s *Session) UploadBDHMultiThread(input BdhInput, threadCount int) ([]byte, error) { + if len(s.SsoAddr) == 0 { + return nil, errors.New("srv addrs not found. maybe miss some packet?") + } + addr := s.SsoAddr[0].String() + + stat, err := os.Stat(input.File) + if err != nil { + return nil, errors.Wrap(err, "get stat error") + } + file, err := os.OpenFile(input.File, os.O_RDONLY, 0o666) + if err != nil { + return nil, errors.Wrap(err, "open file error") + } + sum, length := utils.ComputeMd5AndLength(file) + _, _ = file.Seek(0, io.SeekStart) + + if err := input.encrypt(s.SessionKey); err != nil { + return nil, errors.Wrap(err, "encrypt error") + } + + // for small file and small thread count, + // use UploadBDH instead of UploadBDHMultiThread + if length < 1024*1024*3 || threadCount < 2 { + input.Body = file + return s.UploadBDH(input) + } + + type BlockMetaData struct { + Id int + BeginOffset int64 + EndOffset int64 + } + const blockSize int64 = 1024 * 512 + var ( + blocks []*BlockMetaData + rspExt []byte + BlockId = ^uint32(0) // -1 + uploadedCount uint32 + cond = sync.NewCond(&sync.Mutex{}) + ) + // Init Blocks + { + var temp int64 = 0 + for temp+blockSize < stat.Size() { + blocks = append(blocks, &BlockMetaData{ + Id: len(blocks), + BeginOffset: temp, + EndOffset: temp + blockSize, + }) + temp += blockSize + } + blocks = append(blocks, &BlockMetaData{ + Id: len(blocks), + BeginOffset: temp, + EndOffset: stat.Size(), + }) + } + doUpload := func() error { + // send signal complete uploading + defer cond.Signal() + + conn, err := net.DialTimeout("tcp", addr, time.Second*20) + if err != nil { + return errors.Wrap(err, "connect error") + } + defer conn.Close() + chunk, _ := os.OpenFile(input.File, os.O_RDONLY, 0o666) + defer chunk.Close() + reader := binary.NewNetworkReader(conn) + if err = s.sendEcho(conn); err != nil { + return err + } + + buffer := make([]byte, blockSize) + w := binary.SelectWriter() + w.Reset() + w.Grow(600 * 1024) // 复用,600k 不要放回池中 + for { + nextId := atomic.AddUint32(&BlockId, 1) + if nextId >= uint32(len(blocks)) { + break + } + block := blocks[nextId] + if block.Id == len(blocks)-1 { + cond.L.Lock() + for atomic.LoadUint32(&uploadedCount) != uint32(len(blocks))-1 { + cond.Wait() + } + cond.L.Unlock() + } + buffer = buffer[:blockSize] + _, _ = chunk.Seek(block.BeginOffset, io.SeekStart) + ri, err := io.ReadFull(chunk, buffer) + if err != nil { + if err == io.EOF { + break + } + if err == io.ErrUnexpectedEOF { + buffer = buffer[:ri] + } else { + return err + } + } + ch := md5.Sum(buffer) + head, _ := proto.Marshal(&pb.ReqDataHighwayHead{ + MsgBasehead: &pb.DataHighwayHead{ + Version: 1, + Uin: s.uin, + Command: "PicUp.DataUp", + Seq: s.nextSeq(), + Appid: s.appID, + Dataflag: 4096, + CommandId: input.CommandID, + LocaleId: 2052, + }, + MsgSeghead: &pb.SegHead{ + Filesize: stat.Size(), + Dataoffset: block.BeginOffset, + Datalength: int32(ri), + Serviceticket: input.Ticket, + Md5: ch[:], + FileMd5: sum, + }, + ReqExtendinfo: input.Ext, + }) + w.Reset() + writeHeadBody(w, head, buffer) + _, err = conn.Write(w.Bytes()) + if err != nil { + return errors.Wrap(err, "write conn error") + } + rspHead, _, err := readResponse(reader) + if err != nil { + return errors.Wrap(err, "highway upload error") + } + if rspHead.ErrorCode != 0 { + return errors.Errorf("upload failed: %d", rspHead.ErrorCode) + } + if rspHead.RspExtendinfo != nil { + rspExt = rspHead.RspExtendinfo + } + atomic.AddUint32(&uploadedCount, 1) + } + return nil + } + + group := errgroup.Group{} + for i := 0; i < threadCount; i++ { + group.Go(doUpload) + } + err = group.Wait() + return rspExt, err +} diff --git a/client/internal/highway/highway.go b/client/internal/highway/highway.go new file mode 100644 index 00000000..7a34f10d --- /dev/null +++ b/client/internal/highway/highway.go @@ -0,0 +1,265 @@ +package highway + +import ( + "bytes" + "crypto/md5" + "fmt" + "io" + "net" + "net/http" + "strconv" + "sync/atomic" + + "github.com/pkg/errors" + + "github.com/Mrs4s/MiraiGo/binary" + "github.com/Mrs4s/MiraiGo/client/pb" + "github.com/Mrs4s/MiraiGo/internal/proto" + "github.com/Mrs4s/MiraiGo/utils" +) + +type Session struct { + SigSession []byte + SessionKey []byte + SsoAddr []Addr + + seq int32 + appID int32 + uin string +} + +func NewSession(appID int32, uin int64) *Session { + return &Session{ + appID: appID, + uin: strconv.FormatInt(uin, 10), + } +} + +func (s *Session) AddrLength() int { + return len(s.SsoAddr) +} + +func (s *Session) AppendAddr(ip, port uint32) { + addr := Addr{ + IP: ip, + Port: int(port), + } + s.SsoAddr = append(s.SsoAddr, addr) +} + +type Input struct { + CommandID int32 + Key []byte + Body io.ReadSeeker +} + +func (s *Session) Upload(addr Addr, input Input) error { + fh, length := utils.ComputeMd5AndLength(input.Body) + _, _ = input.Body.Seek(0, io.SeekStart) + conn, err := net.DialTCP("tcp", nil, addr.asTcpAddr()) + if err != nil { + return errors.Wrap(err, "connect error") + } + defer conn.Close() + + const chunkSize = 8192 * 8 + chunk := make([]byte, chunkSize) + offset := 0 + reader := binary.NewNetworkReader(conn) + w := binary.SelectWriter() + defer binary.PutWriter(w) + for { + chunk = chunk[:chunkSize] + rl, err := io.ReadFull(input.Body, chunk) + if errors.Is(err, io.EOF) { + break + } + if errors.Is(err, io.ErrUnexpectedEOF) { + chunk = chunk[:rl] + } + ch := md5.Sum(chunk) + head, _ := proto.Marshal(&pb.ReqDataHighwayHead{ + MsgBasehead: &pb.DataHighwayHead{ + Version: 1, + Uin: s.uin, + Command: "PicUp.DataUp", + Seq: s.nextSeq(), + Appid: s.appID, + Dataflag: 4096, + CommandId: input.CommandID, + LocaleId: 2052, + }, + MsgSeghead: &pb.SegHead{ + Filesize: length, + Dataoffset: int64(offset), + Datalength: int32(rl), + Serviceticket: input.Key, + Md5: ch[:], + FileMd5: fh, + }, + ReqExtendinfo: []byte{}, + }) + offset += rl + w.Reset() + writeHeadBody(w, head, chunk) + _, err = conn.Write(w.Bytes()) + if err != nil { + return errors.Wrap(err, "write conn error") + } + rspHead, _, err := readResponse(reader) + if err != nil { + return errors.Wrap(err, "highway upload error") + } + if rspHead.ErrorCode != 0 { + return errors.New("upload failed") + } + } + return nil +} + +type ExcitingInput struct { + CommandID int32 + Body io.ReadSeeker + Ticket []byte + Ext []byte +} + +func (s *Session) UploadExciting(input ExcitingInput) ([]byte, error) { + fileMd5, fileLength := utils.ComputeMd5AndLength(input.Body) + _, _ = input.Body.Seek(0, io.SeekStart) + addr := s.SsoAddr[0] + url := fmt.Sprintf("http://%v/cgi-bin/httpconn?htcmd=0x6FF0087&uin=%v", addr, s.uin) + var ( + rspExt []byte + offset int64 = 0 + chunkSize = 524288 + ) + chunk := make([]byte, chunkSize) + w := binary.SelectWriter() + w.Reset() + w.Grow(600 * 1024) // 复用,600k 不要放回池中 + for { + chunk = chunk[:chunkSize] + rl, err := io.ReadFull(input.Body, chunk) + if err == io.EOF { + break + } + if err == io.ErrUnexpectedEOF { + chunk = chunk[:rl] + } + ch := md5.Sum(chunk) + head, _ := proto.Marshal(&pb.ReqDataHighwayHead{ + MsgBasehead: &pb.DataHighwayHead{ + Version: 1, + Uin: s.uin, + Command: "PicUp.DataUp", + Seq: s.nextSeq(), + Appid: s.appID, + Dataflag: 0, + CommandId: input.CommandID, + LocaleId: 0, + }, + MsgSeghead: &pb.SegHead{ + Filesize: fileLength, + Dataoffset: offset, + Datalength: int32(rl), + Serviceticket: input.Ticket, + Md5: ch[:], + FileMd5: fileMd5, + }, + ReqExtendinfo: input.Ext, + }) + offset += int64(rl) + w.Reset() + writeHeadBody(w, head, chunk) + req, _ := http.NewRequest("POST", url, bytes.NewReader(w.Bytes())) + req.Header.Set("Accept", "*/*") + req.Header.Set("Connection", "Keep-Alive") + req.Header.Set("User-Agent", "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1)") + req.Header.Set("Pragma", "no-cache") + rsp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, errors.Wrap(err, "request error") + } + body, _ := io.ReadAll(rsp.Body) + _ = rsp.Body.Close() + r := binary.NewReader(body) + r.ReadByte() + hl := r.ReadInt32() + a2 := r.ReadInt32() + h := r.ReadBytes(int(hl)) + r.ReadBytes(int(a2)) + rspHead := new(pb.RspDataHighwayHead) + if err = proto.Unmarshal(h, rspHead); err != nil { + return nil, errors.Wrap(err, "failed to unmarshal protobuf message") + } + if rspHead.ErrorCode != 0 { + return nil, errors.Errorf("upload failed: %d", rspHead.ErrorCode) + } + if rspHead.RspExtendinfo != nil { + rspExt = rspHead.RspExtendinfo + } + } + return rspExt, nil +} + +func (s *Session) nextSeq() int32 { + return atomic.AddInt32(&s.seq, 2) +} + +func (s *Session) sendHeartbreak(conn net.Conn) error { + head, _ := proto.Marshal(&pb.ReqDataHighwayHead{ + MsgBasehead: &pb.DataHighwayHead{ + Version: 1, + Uin: s.uin, + Command: "PicUp.Echo", + Seq: s.nextSeq(), + Appid: s.appID, + Dataflag: 4096, + CommandId: 0, + LocaleId: 2052, + }, + }) + w := binary.SelectWriter() + writeHeadBody(w, head, nil) + _, err := conn.Write(w.Bytes()) + binary.PutWriter(w) + return err +} + +func (s *Session) sendEcho(conn net.Conn) error { + err := s.sendHeartbreak(conn) + if err != nil { + return errors.Wrap(err, "echo error") + } + if _, _, err = readResponse(binary.NewNetworkReader(conn)); err != nil { + return errors.Wrap(err, "echo error") + } + return nil +} + +func writeHeadBody(w *binary.Writer, head []byte, body []byte) { + w.WriteByte(40) + w.WriteUInt32(uint32(len(head))) + w.WriteUInt32(uint32(len(body))) + w.Write(head) + w.Write(body) + w.WriteByte(41) +} + +func readResponse(r *binary.NetworkReader) (*pb.RspDataHighwayHead, []byte, error) { + _, err := r.ReadByte() + if err != nil { + return nil, nil, errors.Wrap(err, "failed to read byte") + } + hl, _ := r.ReadInt32() + a2, _ := r.ReadInt32() + head, _ := r.ReadBytes(int(hl)) + payload, _ := r.ReadBytes(int(a2)) + _, _ = r.ReadByte() + rsp := new(pb.RspDataHighwayHead) + if err = proto.Unmarshal(head, rsp); err != nil { + return nil, nil, errors.Wrap(err, "failed to unmarshal protobuf message") + } + return rsp, payload, nil +} diff --git a/client/network.go b/client/network.go index 7ae16f86..d4b4e42b 100644 --- a/client/network.go +++ b/client/network.go @@ -3,7 +3,6 @@ package client import ( "net" "runtime/debug" - "strings" "sync" "sync/atomic" "time" @@ -40,7 +39,8 @@ func (c *QQClient) ConnectionQualityTest() *ConnectionQualityInfo { r := &ConnectionQualityInfo{} wg := sync.WaitGroup{} wg.Add(2) - go func(w *sync.WaitGroup) { + go func() { + defer wg.Done() var err error if r.ChatServerLatency, err = qualityTest(c.servers[c.currServerIndex].String()); err != nil { @@ -57,24 +57,22 @@ func (c *QQClient) ConnectionQualityTest() *ConnectionQualityInfo { c.Error("resolve long message server error: %v", err) r.LongMessageServerLatency = 9999 } - if len(c.srvSsoAddrs) > 0 { - if r.SrvServerLatency, err = qualityTest(c.srvSsoAddrs[0]); err != nil { + if c.highwaySession.AddrLength() > 0 { + if r.SrvServerLatency, err = qualityTest(c.highwaySession.SsoAddr[0].String()); err != nil { c.Error("test srv server latency error: %v", err) r.SrvServerLatency = 9999 } } - - w.Done() - }(&wg) - go func(w *sync.WaitGroup) { + }() + go func() { + defer wg.Done() res := utils.RunICMPPingLoop(&net.IPAddr{IP: c.servers[c.currServerIndex].IP}, 10) r.ChatServerPacketLoss = res.PacketsLoss - if len(c.srvSsoAddrs) > 0 { - res = utils.RunICMPPingLoop(&net.IPAddr{IP: net.ParseIP(strings.Split(c.srvSsoAddrs[0], ":")[0])}, 10) + if c.highwaySession.AddrLength() > 0 { + res = utils.RunICMPPingLoop(&net.IPAddr{IP: c.highwaySession.SsoAddr[0].AsNetIP()}, 10) r.SrvServerPacketLoss = res.PacketsLoss } - w.Done() - }(&wg) + }() start := time.Now() if _, err := utils.HttpGetBytes("https://ssl.htdata.qq.com", ""); err == nil { r.LongMessageServerResponseLatency = time.Now().Sub(start).Milliseconds() diff --git a/client/ptt.go b/client/ptt.go index 1828b9e1..62603f7e 100644 --- a/client/ptt.go +++ b/client/ptt.go @@ -9,6 +9,7 @@ import ( "github.com/pkg/errors" "github.com/Mrs4s/MiraiGo/binary" + "github.com/Mrs4s/MiraiGo/client/internal/highway" "github.com/Mrs4s/MiraiGo/client/pb/cmd0x346" "github.com/Mrs4s/MiraiGo/client/pb/cmd0x388" "github.com/Mrs4s/MiraiGo/client/pb/msg" @@ -28,17 +29,21 @@ var pttWaiter = utils.NewUploadWaiter() // UploadGroupPtt 将语音数据使用群语音通道上传到服务器, 返回 message.GroupVoiceElement 可直接发送 func (c *QQClient) UploadGroupPtt(groupCode int64, voice io.ReadSeeker) (*message.GroupVoiceElement, error) { - h := md5.New() - length, _ := io.Copy(h, voice) - fh := h.Sum(nil) + fh, length := utils.ComputeMd5AndLength(voice) _, _ = voice.Seek(0, io.SeekStart) - key := hex.EncodeToString(fh) + key := string(fh) pttWaiter.Wait(key) defer pttWaiter.Done(key) ext := c.buildGroupPttStoreBDHExt(groupCode, fh[:], int32(length), 0, int32(length)) - rsp, err := c.highwayUploadByBDH(voice, length, 29, c.bigDataSession.SigSession, fh, ext, false) + rsp, err := c.highwaySession.UploadBDH(highway.BdhInput{ + CommandID: 29, + Body: voice, + Ticket: c.highwaySession.SigSession, + Ext: ext, + Encrypt: false, + }) if err != nil { return nil, err } @@ -78,7 +83,13 @@ func (c *QQClient) UploadPrivatePtt(target int64, voice io.ReadSeeker) (*message defer pttWaiter.Done(key) ext := c.buildC2CPttStoreBDHExt(target, fh[:], int32(length), int32(length)) - rsp, err := c.highwayUploadByBDH(voice, length, 26, c.bigDataSession.SigSession, fh, ext, false) + rsp, err := c.highwaySession.UploadBDH(highway.BdhInput{ + CommandID: 26, + Body: voice, + Ticket: c.highwaySession.SigSession, + Ext: ext, + Encrypt: false, + }) if err != nil { return nil, err } @@ -135,12 +146,17 @@ func (c *QQClient) UploadGroupShortVideo(groupCode int64, video, thumb io.ReadSe }, nil } ext, _ := proto.Marshal(c.buildPttGroupShortVideoProto(videoHash, thumbHash, groupCode, videoLen, thumbLen, 1).PttShortVideoUploadReq) + var hwRsp []byte multi := utils.MultiReadSeeker(thumb, video) - h := md5.New() - length, _ := io.Copy(h, multi) - fh := h.Sum(nil) - _, _ = multi.Seek(0, io.SeekStart) + input := highway.BdhInput{ + CommandID: 25, + File: cache, + Body: multi, + Ticket: c.highwaySession.SigSession, + Ext: ext, + Encrypt: true, + } if cache != "" { var file *os.File file, err = os.OpenFile(cache, os.O_WRONLY|os.O_CREATE, 0o666) @@ -149,14 +165,14 @@ func (c *QQClient) UploadGroupShortVideo(groupCode int64, video, thumb io.ReadSe return err } if err != nil || cp() != nil { - hwRsp, err = c.highwayUploadByBDH(multi, length, 25, c.bigDataSession.SigSession, fh, ext, true) + hwRsp, err = c.highwaySession.UploadBDH(input) } else { _ = file.Close() - hwRsp, err = c.highwayUploadFileMultiThreadingByBDH(cache, 25, 8, c.bigDataSession.SigSession, ext, true) + hwRsp, err = c.highwaySession.UploadBDHMultiThread(input, 8) _ = os.Remove(cache) } } else { - hwRsp, err = c.highwayUploadByBDH(multi, length, 25, c.bigDataSession.SigSession, fh, ext, true) + hwRsp, err = c.highwaySession.UploadBDH(input) } if err != nil { return nil, errors.Wrap(err, "upload video file error") diff --git a/internal/generator/jce_gen/main.go b/internal/generator/jce_gen/main.go index 734dc558..ed418bd0 100644 --- a/internal/generator/jce_gen/main.go +++ b/internal/generator/jce_gen/main.go @@ -80,7 +80,7 @@ func main() { fmt.Printf("%s\n", buf.Bytes()) panic(err) } - _ = os.WriteFile(*output, formated, 0644) + _ = os.WriteFile(*output, formated, 0o644) } func (g Generator) Generate(w io.Writer) { diff --git a/topic/feed.go b/topic/feed.go index 9997727f..b7bfa146 100644 --- a/topic/feed.go +++ b/topic/feed.go @@ -58,9 +58,7 @@ type ( content map[string]interface{} ) -var ( - globalBlockId int64 = 0 -) +var globalBlockId int64 = 0 func genBlockId() string { id := atomic.AddInt64(&globalBlockId, 1)