diff --git a/client/c2c_processor.go b/client/c2c_processor.go index 5e67fc74..bcc447c2 100644 --- a/client/c2c_processor.go +++ b/client/c2c_processor.go @@ -263,7 +263,7 @@ func troopAddMemberBroadcastDecoder(c *QQClient, pMsg *msg.Message, _ *incomingP func systemMessageDecoder(c *QQClient, _ *msg.Message, _ *incomingPacketInfo) { _, pkt := c.buildSystemMsgNewFriendPacket() - _ = c.send(pkt) + _ = c.sendPacket(pkt) } func troopSystemMessageDecoder(c *QQClient, pMsg *msg.Message, info *incomingPacketInfo) { @@ -291,7 +291,7 @@ func msgType0x211Decoder(c *QQClient, pMsg *msg.Message, info *incomingPacketInf c.Error("unmarshal sub msg 0x4 error: %v", err) return } - if sub4.NotOnlineFile != nil && sub4.NotOnlineFile.GetSubcmd() == 1 { // subcmd: 1 -> send, 2-> recv + if sub4.NotOnlineFile != nil && sub4.NotOnlineFile.GetSubcmd() == 1 { // subcmd: 1 -> sendPacket, 2-> recv rsp, err := c.sendAndWait(c.buildOfflineFileDownloadRequestPacket(sub4.NotOnlineFile.FileUuid)) // offline_file.go if err != nil { return diff --git a/client/client.go b/client/client.go index dcd4b9a4..91f69544 100644 --- a/client/client.go +++ b/client/client.go @@ -6,7 +6,6 @@ import ( "math" "math/rand" "net" - "runtime/debug" "sort" "sync" "sync/atomic" @@ -206,14 +205,12 @@ func NewClientMd5(uin int64, passwordMd5 [16]byte) *QQClient { groupSeq: int32(rand.Intn(20000)), friendSeq: 22911, highwayApplyUpSeq: 77918, - // ksid: []byte(fmt.Sprintf("|%s|A8.2.7.27f6ea96", SystemDeviceInfo.IMEI)), - eventHandlers: &eventHandlers{}, - msgSvcCache: utils.NewCache(time.Second * 15), - transCache: utils.NewCache(time.Second * 15), - onlinePushCache: utils.NewCache(time.Second * 15), - // version: genVersionInfo(SystemDeviceInfo.Protocol), - servers: []*net.TCPAddr{}, - alive: true, + eventHandlers: &eventHandlers{}, + msgSvcCache: utils.NewCache(time.Second * 15), + transCache: utils.NewCache(time.Second * 15), + onlinePushCache: utils.NewCache(time.Second * 15), + servers: []*net.TCPAddr{}, + alive: true, } cli.UseDevice(SystemDeviceInfo) sso, err := getSSOAddress() @@ -762,16 +759,16 @@ func (c *QQClient) SolveGroupJoinRequest(i interface{}, accept, block bool, reas return 1 } }(), false, accept, block, reason) - _ = c.send(pkt) + _ = c.sendPacket(pkt) case *GroupInvitedRequest: _, pkt := c.buildSystemMsgGroupActionPacket(req.RequestId, req.InvitorUin, req.GroupCode, 1, true, accept, block, reason) - _ = c.send(pkt) + _ = c.sendPacket(pkt) } } func (c *QQClient) SolveFriendRequest(req *NewFriendRequest, accept bool) { _, pkt := c.buildSystemMsgFriendActionPacket(req.RequestId, req.RequesterUin, accept) - _ = c.send(pkt) + _ = c.sendPacket(pkt) } func (c *QQClient) getSKey() string { @@ -858,7 +855,7 @@ func (c *QQClient) SetCustomServer(servers []*net.TCPAddr) { func (c *QQClient) SendGroupGift(groupCode, uin uint64, gift message.GroupGift) { _, packet := c.sendGroupGiftPacket(groupCode, uin, gift) - _ = c.send(packet) + _ = c.sendPacket(packet) } func (c *QQClient) registerClient() error { @@ -893,220 +890,6 @@ func (c *QQClient) nextHighwayApplySeq() int32 { return atomic.AddInt32(&c.highwayApplyUpSeq, 2) } -func (c *QQClient) send(pkt []byte) error { - err := c.TCP.Write(pkt) - if err != nil { - atomic.AddUint64(&c.stat.PacketLost, 1) - } else { - atomic.AddUint64(&c.stat.PacketSent, 1) - } - return errors.Wrap(err, "Packet failed to send") -} - -func (c *QQClient) sendAndWait(seq uint16, pkt []byte, params ...requestParams) (interface{}, error) { - type T struct { - Response interface{} - Error error - } - - ch := make(chan T) - defer close(ch) - - p := func() requestParams { - if len(params) == 0 { - return nil - } - return params[0] - }() - - c.handlers.Store(seq, &handlerInfo{fun: func(i interface{}, err error) { - ch <- T{ - Response: i, - Error: err, - } - }, params: p}) - - err := c.send(pkt) - if err != nil { - c.handlers.Delete(seq) - return nil, err - } - - retry := 0 - for { - select { - case rsp := <-ch: - return rsp.Response, rsp.Error - case <-time.After(time.Second * 15): - retry++ - if retry < 2 { - _ = c.send(pkt) - continue - } - c.handlers.Delete(seq) - // c.Error("packet timed out, seq: %v", seq) - // println("Packet Timed out") - return nil, errors.New("Packet timed out") - } - } -} - -// 等待一个或多个数据包解析, 优先级低于 sendAndWait -// 返回终止解析函数 -func (c *QQClient) waitPacket(cmd string, f func(interface{}, error)) func() { - c.waiters.Store(cmd, f) - return func() { - c.waiters.Delete(cmd) - } -} - -func (c *QQClient) connect() error { - c.Info("connect to server: %v", c.servers[c.currServerIndex].String()) - err := c.TCP.Connect(c.servers[c.currServerIndex]) - c.currServerIndex++ - if c.currServerIndex == len(c.servers) { - c.currServerIndex = 0 - } - if err != nil { - c.retryTimes++ - if c.retryTimes > len(c.servers) { - return errors.New("All servers are unreachable") - } - c.Error("connect server error: %v", err) - return err - } - c.retryTimes = 0 - c.ConnectTime = time.Now() - return nil -} - -func (c *QQClient) quickReconnect() { - c.Disconnect() - time.Sleep(time.Millisecond * 200) - if err := c.connect(); err != nil { - c.Error("connect server error: %v", err) - c.dispatchDisconnectEvent(&ClientDisconnectedEvent{Message: "quick reconnect failed"}) - return - } - if err := c.registerClient(); err != nil { - c.Error("register client failed: %v", err) - c.Disconnect() - c.dispatchDisconnectEvent(&ClientDisconnectedEvent{Message: "register error"}) - return - } -} - -func (c *QQClient) Disconnect() { - c.Online = false - c.TCP.Close() -} - -func (c *QQClient) plannedDisconnect(_ *utils.TCPListener) { - c.Debug("planned disconnect.") - atomic.AddUint32(&c.stat.DisconnectTimes, 1) - c.Online = false -} - -func (c *QQClient) unexpectedDisconnect(_ *utils.TCPListener, e error) { - c.Error("unexpected disconnect: %v", e) - atomic.AddUint32(&c.stat.DisconnectTimes, 1) - c.Online = false - if err := c.connect(); err != nil { - c.Error("connect server error: %v", err) - c.dispatchDisconnectEvent(&ClientDisconnectedEvent{Message: "connection dropped by server."}) - return - } - if err := c.registerClient(); err != nil { - c.Error("register client failed: %v", err) - c.Disconnect() - c.dispatchDisconnectEvent(&ClientDisconnectedEvent{Message: "register error"}) - return - } -} - -func (c *QQClient) netLoop() { - errCount := 0 - for c.alive { - l, err := c.TCP.ReadInt32() - if err != nil { - time.Sleep(time.Millisecond * 500) - continue - } - data, _ := c.TCP.ReadBytes(int(l) - 4) - pkt, err := packets.ParseIncomingPacket(data, c.sigInfo.d2Key) - if err != nil { - c.Error("parse incoming packet error: %v", err) - if errors.Is(err, packets.ErrSessionExpired) || errors.Is(err, packets.ErrPacketDropped) { - c.Disconnect() - go c.dispatchDisconnectEvent(&ClientDisconnectedEvent{Message: "session expired"}) - continue - } - errCount++ - if errCount > 2 { - go c.quickReconnect() - continue - } - continue - } - payload := pkt.Payload - if pkt.Flag2 == 2 { - payload, err = pkt.DecryptPayload(c.RandomKey, c.sigInfo.wtSessionTicketKey) - if err != nil { - c.Error("decrypt payload error: %v", err) - continue - } - } - errCount = 0 - c.Debug("rev pkt: %v seq: %v", pkt.CommandName, pkt.SequenceId) - atomic.AddUint64(&c.stat.PacketReceived, 1) - go func() { - defer func() { - if pan := recover(); pan != nil { - c.Error("panic on decoder %v : %v\n%s", pkt.CommandName, pan, debug.Stack()) - } - }() - - if decoder, ok := decoders[pkt.CommandName]; ok { - // found predefined decoder - info, ok := c.handlers.LoadAndDelete(pkt.SequenceId) - rsp, err := decoder(c, &incomingPacketInfo{ - SequenceId: pkt.SequenceId, - CommandName: pkt.CommandName, - Params: func() requestParams { - if !ok { - return nil - } - return info.params - }(), - }, payload) - if err != nil { - c.Debug("decode pkt %v error: %+v", pkt.CommandName, err) - } - if ok { - info.fun(rsp, err) - } else if f, ok := c.waiters.Load(pkt.CommandName); ok { // 在不存在handler的情况下触发wait - f.(func(interface{}, error))(rsp, err) - } - } else if f, ok := c.handlers.LoadAndDelete(pkt.SequenceId); ok { - // does not need decoder - f.fun(nil, nil) - } else { - c.Debug("Unhandled Command: %s\nSeq: %d\nThis message can be ignored.", pkt.CommandName, pkt.SequenceId) - } - }() - } - /* - c.NetLooping = false - c.Online = false - _ = c.TCP.Close() - if c.lastLostMsg == "" { - c.lastLostMsg = "Connection lost." - } - c.stat.LostTimes++ - c.dispatchDisconnectEvent(&ClientDisconnectedEvent{Message: c.lastLostMsg}) - */ -} - func (c *QQClient) doHeartbeat() { c.heartbeatEnabled = true times := 0 diff --git a/client/decoders.go b/client/decoders.go index 65ba2c9d..9856ac39 100644 --- a/client/decoders.go +++ b/client/decoders.go @@ -369,7 +369,7 @@ func decodePushReqPacket(c *QQClient, _ *incomingPacketInfo, payload []byte) (in seq := r.ReadInt64(3) _, pkt := c.buildConfPushRespPacket(t, seq, jceBuf) - return nil, c.send(pkt) + return nil, c.sendPacket(pkt) } // MessageSvc.PbGetMsg @@ -401,7 +401,7 @@ func decodeSvcNotify(c *QQClient, _ *incomingPacketInfo, payload []byte) (interf } if _, ok := sysMsgDecoders[notify.MsgType]; ok { _, pkt := c.buildSystemMsgNewFriendPacket() - return nil, c.send(pkt) + return nil, c.sendPacket(pkt) } _, err := c.sendAndWait(c.buildGetMessageRequestPacket(msg.SyncFlag_START, time.Now().Unix())) return nil, err diff --git a/client/group_file.go b/client/group_file.go index a5c8ca32..35ba0107 100644 --- a/client/group_file.go +++ b/client/group_file.go @@ -192,7 +192,7 @@ func (fs *GroupFileSystem) UploadFile(p, name, folderId string) error { rsp := i.(*oidb.UploadFileRspBody) if rsp.BoolFileExist { _, pkt := fs.client.buildGroupFileFeedsRequest(fs.GroupCode, rsp.FileId, rsp.BusId, rand.Int31()) - return fs.client.send(pkt) + return fs.client.sendPacket(pkt) } if len(rsp.UploadIpLanV4) == 0 { return errors.New("server requires unsupported ftn upload") @@ -238,7 +238,7 @@ func (fs *GroupFileSystem) UploadFile(p, name, folderId string) error { return errors.Wrap(err, "upload failed") } _, pkt := fs.client.buildGroupFileFeedsRequest(fs.GroupCode, rsp.FileId, rsp.BusId, rand.Int31()) - return fs.client.send(pkt) + return fs.client.sendPacket(pkt) } func (fs *GroupFileSystem) GetDownloadUrl(file *GroupFile) string { diff --git a/client/group_msg.go b/client/group_msg.go index 31e1be8f..0003d036 100644 --- a/client/group_msg.go +++ b/client/group_msg.go @@ -123,11 +123,11 @@ func (c *QQClient) sendGroupMessage(groupCode int64, forward bool, m *message.Se fragmented := m.ToFragmented() for i, elems := range fragmented { _, pkt := c.buildGroupSendingPacket(groupCode, mr, int32(len(fragmented)), int32(i), div, forward, elems) - _ = c.send(pkt) + _ = c.sendPacket(pkt) } } else { _, pkt := c.buildGroupSendingPacket(groupCode, mr, 1, 0, 0, forward, m.Elements) - _ = c.send(pkt) + _ = c.sendPacket(pkt) } var mid int32 ret := &message.GroupMessage{ @@ -353,9 +353,9 @@ func decodeMsgSendResponse(c *QQClient, _ *incomingPacketInfo, payload []byte) ( switch rsp.GetResult() { case 0: // OK. case 55: - c.Error("send msg error: %v Bot has blocked target's content", rsp.GetResult()) + c.Error("sendPacket msg error: %v Bot has blocked target's content", rsp.GetResult()) default: - c.Error("send msg error: %v %v", rsp.GetResult(), rsp.GetErrMsg()) + c.Error("sendPacket msg error: %v %v", rsp.GetResult(), rsp.GetErrMsg()) } return nil, nil } diff --git a/client/network.go b/client/network.go new file mode 100644 index 00000000..92609431 --- /dev/null +++ b/client/network.go @@ -0,0 +1,223 @@ +package client + +import ( + "github.com/Mrs4s/MiraiGo/protocol/packets" + "github.com/Mrs4s/MiraiGo/utils" + "github.com/pkg/errors" + "runtime/debug" + "sync/atomic" + "time" +) + +// connect 连接到 QQClient.servers 中的服务器 +func (c *QQClient) connect() error { + c.Info("connect to server: %v", c.servers[c.currServerIndex].String()) + err := c.TCP.Connect(c.servers[c.currServerIndex]) + c.currServerIndex++ + if c.currServerIndex == len(c.servers) { + c.currServerIndex = 0 + } + if err != nil { + c.retryTimes++ + if c.retryTimes > len(c.servers) { + return errors.New("All servers are unreachable") + } + c.Error("connect server error: %v", err) + return err + } + c.retryTimes = 0 + c.ConnectTime = time.Now() + return nil +} + +// quickReconnect 快速重连 +func (c *QQClient) quickReconnect() { + c.Disconnect() + time.Sleep(time.Millisecond * 200) + if err := c.connect(); err != nil { + c.Error("connect server error: %v", err) + c.dispatchDisconnectEvent(&ClientDisconnectedEvent{Message: "quick reconnect failed"}) + return + } + if err := c.registerClient(); err != nil { + c.Error("register client failed: %v", err) + c.Disconnect() + c.dispatchDisconnectEvent(&ClientDisconnectedEvent{Message: "register error"}) + return + } +} + +// Disconnect 中断连接, 不释放资源 +func (c *QQClient) Disconnect() { + c.Online = false + c.TCP.Close() +} + +// sendAndWait 向服务器发送一个数据包, 并等待返回 +func (c *QQClient) sendAndWait(seq uint16, pkt []byte, params ...requestParams) (interface{}, error) { + type T struct { + Response interface{} + Error error + } + + ch := make(chan T) + defer close(ch) + + p := func() requestParams { + if len(params) == 0 { + return nil + } + return params[0] + }() + + c.handlers.Store(seq, &handlerInfo{fun: func(i interface{}, err error) { + ch <- T{ + Response: i, + Error: err, + } + }, params: p}) + + err := c.sendPacket(pkt) + if err != nil { + c.handlers.Delete(seq) + return nil, err + } + + retry := 0 + for { + select { + case rsp := <-ch: + return rsp.Response, rsp.Error + case <-time.After(time.Second * 15): + retry++ + if retry < 2 { + _ = c.sendPacket(pkt) + continue + } + c.handlers.Delete(seq) + // c.Error("packet timed out, seq: %v", seq) + // println("Packet Timed out") + return nil, errors.New("Packet timed out") + } + } +} + +// sendPacket 向服务器发送一个数据包 +func (c *QQClient) sendPacket(pkt []byte) error { + err := c.TCP.Write(pkt) + if err != nil { + atomic.AddUint64(&c.stat.PacketLost, 1) + } else { + atomic.AddUint64(&c.stat.PacketSent, 1) + } + return errors.Wrap(err, "Packet failed to sendPacket") +} + +// waitPacket +// 等待一个或多个数据包解析, 优先级低于 sendAndWait +// 返回终止解析函数 +func (c *QQClient) waitPacket(cmd string, f func(interface{}, error)) func() { + c.waiters.Store(cmd, f) + return func() { + c.waiters.Delete(cmd) + } +} + +// plannedDisconnect 计划中断线事件 +func (c *QQClient) plannedDisconnect(_ *utils.TCPListener) { + c.Debug("planned disconnect.") + atomic.AddUint32(&c.stat.DisconnectTimes, 1) + c.Online = false +} + +// unexpectedDisconnect 非预期断线事件 +func (c *QQClient) unexpectedDisconnect(_ *utils.TCPListener, e error) { + c.Error("unexpected disconnect: %v", e) + atomic.AddUint32(&c.stat.DisconnectTimes, 1) + c.Online = false + if err := c.connect(); err != nil { + c.Error("connect server error: %v", err) + c.dispatchDisconnectEvent(&ClientDisconnectedEvent{Message: "connection dropped by server."}) + return + } + if err := c.registerClient(); err != nil { + c.Error("register client failed: %v", err) + c.Disconnect() + c.dispatchDisconnectEvent(&ClientDisconnectedEvent{Message: "register error"}) + return + } +} + +// netLoop 通过循环来不停接收数据包 +func (c *QQClient) netLoop() { + errCount := 0 + for c.alive { + l, err := c.TCP.ReadInt32() + if err != nil { + time.Sleep(time.Millisecond * 500) + continue + } + data, _ := c.TCP.ReadBytes(int(l) - 4) + pkt, err := packets.ParseIncomingPacket(data, c.sigInfo.d2Key) + if err != nil { + c.Error("parse incoming packet error: %v", err) + if errors.Is(err, packets.ErrSessionExpired) || errors.Is(err, packets.ErrPacketDropped) { + c.Disconnect() + go c.dispatchDisconnectEvent(&ClientDisconnectedEvent{Message: "session expired"}) + continue + } + errCount++ + if errCount > 2 { + go c.quickReconnect() + continue + } + continue + } + payload := pkt.Payload + if pkt.Flag2 == 2 { + payload, err = pkt.DecryptPayload(c.RandomKey, c.sigInfo.wtSessionTicketKey) + if err != nil { + c.Error("decrypt payload error: %v", err) + continue + } + } + errCount = 0 + c.Debug("rev pkt: %v seq: %v", pkt.CommandName, pkt.SequenceId) + atomic.AddUint64(&c.stat.PacketReceived, 1) + go func() { + defer func() { + if pan := recover(); pan != nil { + c.Error("panic on decoder %v : %v\n%s", pkt.CommandName, pan, debug.Stack()) + } + }() + + if decoder, ok := decoders[pkt.CommandName]; ok { + // found predefined decoder + info, ok := c.handlers.LoadAndDelete(pkt.SequenceId) + rsp, err := decoder(c, &incomingPacketInfo{ + SequenceId: pkt.SequenceId, + CommandName: pkt.CommandName, + Params: func() requestParams { + if !ok { + return nil + } + return info.params + }(), + }, payload) + if err != nil { + c.Debug("decode pkt %v error: %+v", pkt.CommandName, err) + } + if ok { + info.fun(rsp, err) + } else if f, ok := c.waiters.Load(pkt.CommandName); ok { // 在不存在handler的情况下触发wait + f.(func(interface{}, error))(rsp, err) + } + } else if f, ok := c.handlers.LoadAndDelete(pkt.SequenceId); ok { + // does not need decoder + f.fun(nil, nil) + } else { + c.Debug("Unhandled Command: %s\nSeq: %d\nThis message can be ignored.", pkt.CommandName, pkt.SequenceId) + } + }() + } +} diff --git a/client/online_push.go b/client/online_push.go index ede6ca3a..5eb68040 100644 --- a/client/online_push.go +++ b/client/online_push.go @@ -32,7 +32,7 @@ func decodeOnlinePushReqPacket(c *QQClient, info *incomingPacketInfo, payload [] msgInfos := []jce.PushMessageInfo{} uin := jr.ReadInt64(0) jr.ReadSlice(&msgInfos, 2) - _ = c.send(c.buildDeleteOnlinePushPacket(uin, 0, nil, info.SequenceId, msgInfos)) + _ = c.sendPacket(c.buildDeleteOnlinePushPacket(uin, 0, nil, info.SequenceId, msgInfos)) for _, m := range msgInfos { k := fmt.Sprintf("%v%v%v", m.MsgSeq, m.MsgTime, m.MsgUid) if _, ok := c.onlinePushCache.Get(k); ok { diff --git a/client/private_msg.go b/client/private_msg.go index a692322c..aa199448 100644 --- a/client/private_msg.go +++ b/client/private_msg.go @@ -40,12 +40,12 @@ func (c *QQClient) SendPrivateMessage(target int64, m *message.SendingMessage) * seq = fseq } _, pkt := c.buildFriendSendingPacket(target, fseq, mr, int32(len(fragmented)), int32(i), div, t, elems) - _ = c.send(pkt) + _ = c.sendPacket(pkt) } } else { seq = c.nextFriendSeq() _, pkt := c.buildFriendSendingPacket(target, seq, mr, 1, 0, 0, t, m.Elements) - _ = c.send(pkt) + _ = c.sendPacket(pkt) } atomic.AddUint64(&c.stat.MessageSent, 1) ret := &message.PrivateMessage{ @@ -85,7 +85,7 @@ func (c *QQClient) SendGroupTempMessage(groupCode, target int64, m *message.Send seq := c.nextFriendSeq() t := time.Now().Unix() _, pkt := c.buildGroupTempSendingPacket(group.Uin, target, seq, mr, t, m) - _ = c.send(pkt) + _ = c.sendPacket(pkt) atomic.AddUint64(&c.stat.MessageSent, 1) return &message.TempMessage{ Id: seq, @@ -106,7 +106,7 @@ func (c *QQClient) sendWPATempMessage(target int64, sig []byte, m *message.Sendi seq := c.nextFriendSeq() t := time.Now().Unix() _, pkt := c.buildWPATempSendingPacket(target, sig, seq, mr, t, m) - _ = c.send(pkt) + _ = c.sendPacket(pkt) atomic.AddUint64(&c.stat.MessageSent, 1) return &message.TempMessage{ Id: seq, diff --git a/client/sync.go b/client/sync.go index 02249a0e..e3456805 100644 --- a/client/sync.go +++ b/client/sync.go @@ -98,7 +98,7 @@ func (c *QQClient) SyncSessions() (*SessionSyncResponse, error) { } }) _, pkt := c.buildSyncMsgRequestPacket() - if err := c.send(pkt); err != nil { + if err := c.sendPacket(pkt); err != nil { stop() return nil, err } @@ -414,7 +414,7 @@ func decodeC2CSyncPacket(c *QQClient, info *incomingPacketInfo, payload []byte) if err := proto.Unmarshal(payload, &m); err != nil { return nil, err } - _ = c.send(c.buildDeleteOnlinePushPacket(c.Uin, m.GetSvrip(), m.GetPushToken(), info.SequenceId, nil)) + _ = c.sendPacket(c.buildDeleteOnlinePushPacket(c.Uin, m.GetSvrip(), m.GetPushToken(), info.SequenceId, nil)) c.commMsgProcessor(m.Msg, info) return nil, nil }