1
0
mirror of https://github.com/Mrs4s/MiraiGo.git synced 2025-05-04 11:07:40 +08:00

move network logic to network.go.

This commit is contained in:
Mrs4s 2021-07-18 20:10:44 +08:00
parent df059c2a56
commit 36d7eea2dd
No known key found for this signature in database
GPG Key ID: 3186E98FA19CE3A7
9 changed files with 250 additions and 244 deletions

View File

@ -263,7 +263,7 @@ func troopAddMemberBroadcastDecoder(c *QQClient, pMsg *msg.Message, _ *incomingP
func systemMessageDecoder(c *QQClient, _ *msg.Message, _ *incomingPacketInfo) {
_, pkt := c.buildSystemMsgNewFriendPacket()
_ = c.send(pkt)
_ = c.sendPacket(pkt)
}
func troopSystemMessageDecoder(c *QQClient, pMsg *msg.Message, info *incomingPacketInfo) {
@ -291,7 +291,7 @@ func msgType0x211Decoder(c *QQClient, pMsg *msg.Message, info *incomingPacketInf
c.Error("unmarshal sub msg 0x4 error: %v", err)
return
}
if sub4.NotOnlineFile != nil && sub4.NotOnlineFile.GetSubcmd() == 1 { // subcmd: 1 -> send, 2-> recv
if sub4.NotOnlineFile != nil && sub4.NotOnlineFile.GetSubcmd() == 1 { // subcmd: 1 -> sendPacket, 2-> recv
rsp, err := c.sendAndWait(c.buildOfflineFileDownloadRequestPacket(sub4.NotOnlineFile.FileUuid)) // offline_file.go
if err != nil {
return

View File

@ -6,7 +6,6 @@ import (
"math"
"math/rand"
"net"
"runtime/debug"
"sort"
"sync"
"sync/atomic"
@ -206,14 +205,12 @@ func NewClientMd5(uin int64, passwordMd5 [16]byte) *QQClient {
groupSeq: int32(rand.Intn(20000)),
friendSeq: 22911,
highwayApplyUpSeq: 77918,
// ksid: []byte(fmt.Sprintf("|%s|A8.2.7.27f6ea96", SystemDeviceInfo.IMEI)),
eventHandlers: &eventHandlers{},
msgSvcCache: utils.NewCache(time.Second * 15),
transCache: utils.NewCache(time.Second * 15),
onlinePushCache: utils.NewCache(time.Second * 15),
// version: genVersionInfo(SystemDeviceInfo.Protocol),
servers: []*net.TCPAddr{},
alive: true,
eventHandlers: &eventHandlers{},
msgSvcCache: utils.NewCache(time.Second * 15),
transCache: utils.NewCache(time.Second * 15),
onlinePushCache: utils.NewCache(time.Second * 15),
servers: []*net.TCPAddr{},
alive: true,
}
cli.UseDevice(SystemDeviceInfo)
sso, err := getSSOAddress()
@ -762,16 +759,16 @@ func (c *QQClient) SolveGroupJoinRequest(i interface{}, accept, block bool, reas
return 1
}
}(), false, accept, block, reason)
_ = c.send(pkt)
_ = c.sendPacket(pkt)
case *GroupInvitedRequest:
_, pkt := c.buildSystemMsgGroupActionPacket(req.RequestId, req.InvitorUin, req.GroupCode, 1, true, accept, block, reason)
_ = c.send(pkt)
_ = c.sendPacket(pkt)
}
}
func (c *QQClient) SolveFriendRequest(req *NewFriendRequest, accept bool) {
_, pkt := c.buildSystemMsgFriendActionPacket(req.RequestId, req.RequesterUin, accept)
_ = c.send(pkt)
_ = c.sendPacket(pkt)
}
func (c *QQClient) getSKey() string {
@ -858,7 +855,7 @@ func (c *QQClient) SetCustomServer(servers []*net.TCPAddr) {
func (c *QQClient) SendGroupGift(groupCode, uin uint64, gift message.GroupGift) {
_, packet := c.sendGroupGiftPacket(groupCode, uin, gift)
_ = c.send(packet)
_ = c.sendPacket(packet)
}
func (c *QQClient) registerClient() error {
@ -893,220 +890,6 @@ func (c *QQClient) nextHighwayApplySeq() int32 {
return atomic.AddInt32(&c.highwayApplyUpSeq, 2)
}
func (c *QQClient) send(pkt []byte) error {
err := c.TCP.Write(pkt)
if err != nil {
atomic.AddUint64(&c.stat.PacketLost, 1)
} else {
atomic.AddUint64(&c.stat.PacketSent, 1)
}
return errors.Wrap(err, "Packet failed to send")
}
func (c *QQClient) sendAndWait(seq uint16, pkt []byte, params ...requestParams) (interface{}, error) {
type T struct {
Response interface{}
Error error
}
ch := make(chan T)
defer close(ch)
p := func() requestParams {
if len(params) == 0 {
return nil
}
return params[0]
}()
c.handlers.Store(seq, &handlerInfo{fun: func(i interface{}, err error) {
ch <- T{
Response: i,
Error: err,
}
}, params: p})
err := c.send(pkt)
if err != nil {
c.handlers.Delete(seq)
return nil, err
}
retry := 0
for {
select {
case rsp := <-ch:
return rsp.Response, rsp.Error
case <-time.After(time.Second * 15):
retry++
if retry < 2 {
_ = c.send(pkt)
continue
}
c.handlers.Delete(seq)
// c.Error("packet timed out, seq: %v", seq)
// println("Packet Timed out")
return nil, errors.New("Packet timed out")
}
}
}
// 等待一个或多个数据包解析, 优先级低于 sendAndWait
// 返回终止解析函数
func (c *QQClient) waitPacket(cmd string, f func(interface{}, error)) func() {
c.waiters.Store(cmd, f)
return func() {
c.waiters.Delete(cmd)
}
}
func (c *QQClient) connect() error {
c.Info("connect to server: %v", c.servers[c.currServerIndex].String())
err := c.TCP.Connect(c.servers[c.currServerIndex])
c.currServerIndex++
if c.currServerIndex == len(c.servers) {
c.currServerIndex = 0
}
if err != nil {
c.retryTimes++
if c.retryTimes > len(c.servers) {
return errors.New("All servers are unreachable")
}
c.Error("connect server error: %v", err)
return err
}
c.retryTimes = 0
c.ConnectTime = time.Now()
return nil
}
func (c *QQClient) quickReconnect() {
c.Disconnect()
time.Sleep(time.Millisecond * 200)
if err := c.connect(); err != nil {
c.Error("connect server error: %v", err)
c.dispatchDisconnectEvent(&ClientDisconnectedEvent{Message: "quick reconnect failed"})
return
}
if err := c.registerClient(); err != nil {
c.Error("register client failed: %v", err)
c.Disconnect()
c.dispatchDisconnectEvent(&ClientDisconnectedEvent{Message: "register error"})
return
}
}
func (c *QQClient) Disconnect() {
c.Online = false
c.TCP.Close()
}
func (c *QQClient) plannedDisconnect(_ *utils.TCPListener) {
c.Debug("planned disconnect.")
atomic.AddUint32(&c.stat.DisconnectTimes, 1)
c.Online = false
}
func (c *QQClient) unexpectedDisconnect(_ *utils.TCPListener, e error) {
c.Error("unexpected disconnect: %v", e)
atomic.AddUint32(&c.stat.DisconnectTimes, 1)
c.Online = false
if err := c.connect(); err != nil {
c.Error("connect server error: %v", err)
c.dispatchDisconnectEvent(&ClientDisconnectedEvent{Message: "connection dropped by server."})
return
}
if err := c.registerClient(); err != nil {
c.Error("register client failed: %v", err)
c.Disconnect()
c.dispatchDisconnectEvent(&ClientDisconnectedEvent{Message: "register error"})
return
}
}
func (c *QQClient) netLoop() {
errCount := 0
for c.alive {
l, err := c.TCP.ReadInt32()
if err != nil {
time.Sleep(time.Millisecond * 500)
continue
}
data, _ := c.TCP.ReadBytes(int(l) - 4)
pkt, err := packets.ParseIncomingPacket(data, c.sigInfo.d2Key)
if err != nil {
c.Error("parse incoming packet error: %v", err)
if errors.Is(err, packets.ErrSessionExpired) || errors.Is(err, packets.ErrPacketDropped) {
c.Disconnect()
go c.dispatchDisconnectEvent(&ClientDisconnectedEvent{Message: "session expired"})
continue
}
errCount++
if errCount > 2 {
go c.quickReconnect()
continue
}
continue
}
payload := pkt.Payload
if pkt.Flag2 == 2 {
payload, err = pkt.DecryptPayload(c.RandomKey, c.sigInfo.wtSessionTicketKey)
if err != nil {
c.Error("decrypt payload error: %v", err)
continue
}
}
errCount = 0
c.Debug("rev pkt: %v seq: %v", pkt.CommandName, pkt.SequenceId)
atomic.AddUint64(&c.stat.PacketReceived, 1)
go func() {
defer func() {
if pan := recover(); pan != nil {
c.Error("panic on decoder %v : %v\n%s", pkt.CommandName, pan, debug.Stack())
}
}()
if decoder, ok := decoders[pkt.CommandName]; ok {
// found predefined decoder
info, ok := c.handlers.LoadAndDelete(pkt.SequenceId)
rsp, err := decoder(c, &incomingPacketInfo{
SequenceId: pkt.SequenceId,
CommandName: pkt.CommandName,
Params: func() requestParams {
if !ok {
return nil
}
return info.params
}(),
}, payload)
if err != nil {
c.Debug("decode pkt %v error: %+v", pkt.CommandName, err)
}
if ok {
info.fun(rsp, err)
} else if f, ok := c.waiters.Load(pkt.CommandName); ok { // 在不存在handler的情况下触发wait
f.(func(interface{}, error))(rsp, err)
}
} else if f, ok := c.handlers.LoadAndDelete(pkt.SequenceId); ok {
// does not need decoder
f.fun(nil, nil)
} else {
c.Debug("Unhandled Command: %s\nSeq: %d\nThis message can be ignored.", pkt.CommandName, pkt.SequenceId)
}
}()
}
/*
c.NetLooping = false
c.Online = false
_ = c.TCP.Close()
if c.lastLostMsg == "" {
c.lastLostMsg = "Connection lost."
}
c.stat.LostTimes++
c.dispatchDisconnectEvent(&ClientDisconnectedEvent{Message: c.lastLostMsg})
*/
}
func (c *QQClient) doHeartbeat() {
c.heartbeatEnabled = true
times := 0

View File

@ -369,7 +369,7 @@ func decodePushReqPacket(c *QQClient, _ *incomingPacketInfo, payload []byte) (in
seq := r.ReadInt64(3)
_, pkt := c.buildConfPushRespPacket(t, seq, jceBuf)
return nil, c.send(pkt)
return nil, c.sendPacket(pkt)
}
// MessageSvc.PbGetMsg
@ -401,7 +401,7 @@ func decodeSvcNotify(c *QQClient, _ *incomingPacketInfo, payload []byte) (interf
}
if _, ok := sysMsgDecoders[notify.MsgType]; ok {
_, pkt := c.buildSystemMsgNewFriendPacket()
return nil, c.send(pkt)
return nil, c.sendPacket(pkt)
}
_, err := c.sendAndWait(c.buildGetMessageRequestPacket(msg.SyncFlag_START, time.Now().Unix()))
return nil, err

View File

@ -192,7 +192,7 @@ func (fs *GroupFileSystem) UploadFile(p, name, folderId string) error {
rsp := i.(*oidb.UploadFileRspBody)
if rsp.BoolFileExist {
_, pkt := fs.client.buildGroupFileFeedsRequest(fs.GroupCode, rsp.FileId, rsp.BusId, rand.Int31())
return fs.client.send(pkt)
return fs.client.sendPacket(pkt)
}
if len(rsp.UploadIpLanV4) == 0 {
return errors.New("server requires unsupported ftn upload")
@ -238,7 +238,7 @@ func (fs *GroupFileSystem) UploadFile(p, name, folderId string) error {
return errors.Wrap(err, "upload failed")
}
_, pkt := fs.client.buildGroupFileFeedsRequest(fs.GroupCode, rsp.FileId, rsp.BusId, rand.Int31())
return fs.client.send(pkt)
return fs.client.sendPacket(pkt)
}
func (fs *GroupFileSystem) GetDownloadUrl(file *GroupFile) string {

View File

@ -123,11 +123,11 @@ func (c *QQClient) sendGroupMessage(groupCode int64, forward bool, m *message.Se
fragmented := m.ToFragmented()
for i, elems := range fragmented {
_, pkt := c.buildGroupSendingPacket(groupCode, mr, int32(len(fragmented)), int32(i), div, forward, elems)
_ = c.send(pkt)
_ = c.sendPacket(pkt)
}
} else {
_, pkt := c.buildGroupSendingPacket(groupCode, mr, 1, 0, 0, forward, m.Elements)
_ = c.send(pkt)
_ = c.sendPacket(pkt)
}
var mid int32
ret := &message.GroupMessage{
@ -353,9 +353,9 @@ func decodeMsgSendResponse(c *QQClient, _ *incomingPacketInfo, payload []byte) (
switch rsp.GetResult() {
case 0: // OK.
case 55:
c.Error("send msg error: %v Bot has blocked target's content", rsp.GetResult())
c.Error("sendPacket msg error: %v Bot has blocked target's content", rsp.GetResult())
default:
c.Error("send msg error: %v %v", rsp.GetResult(), rsp.GetErrMsg())
c.Error("sendPacket msg error: %v %v", rsp.GetResult(), rsp.GetErrMsg())
}
return nil, nil
}

223
client/network.go Normal file
View File

@ -0,0 +1,223 @@
package client
import (
"github.com/Mrs4s/MiraiGo/protocol/packets"
"github.com/Mrs4s/MiraiGo/utils"
"github.com/pkg/errors"
"runtime/debug"
"sync/atomic"
"time"
)
// connect 连接到 QQClient.servers 中的服务器
func (c *QQClient) connect() error {
c.Info("connect to server: %v", c.servers[c.currServerIndex].String())
err := c.TCP.Connect(c.servers[c.currServerIndex])
c.currServerIndex++
if c.currServerIndex == len(c.servers) {
c.currServerIndex = 0
}
if err != nil {
c.retryTimes++
if c.retryTimes > len(c.servers) {
return errors.New("All servers are unreachable")
}
c.Error("connect server error: %v", err)
return err
}
c.retryTimes = 0
c.ConnectTime = time.Now()
return nil
}
// quickReconnect 快速重连
func (c *QQClient) quickReconnect() {
c.Disconnect()
time.Sleep(time.Millisecond * 200)
if err := c.connect(); err != nil {
c.Error("connect server error: %v", err)
c.dispatchDisconnectEvent(&ClientDisconnectedEvent{Message: "quick reconnect failed"})
return
}
if err := c.registerClient(); err != nil {
c.Error("register client failed: %v", err)
c.Disconnect()
c.dispatchDisconnectEvent(&ClientDisconnectedEvent{Message: "register error"})
return
}
}
// Disconnect 中断连接, 不释放资源
func (c *QQClient) Disconnect() {
c.Online = false
c.TCP.Close()
}
// sendAndWait 向服务器发送一个数据包, 并等待返回
func (c *QQClient) sendAndWait(seq uint16, pkt []byte, params ...requestParams) (interface{}, error) {
type T struct {
Response interface{}
Error error
}
ch := make(chan T)
defer close(ch)
p := func() requestParams {
if len(params) == 0 {
return nil
}
return params[0]
}()
c.handlers.Store(seq, &handlerInfo{fun: func(i interface{}, err error) {
ch <- T{
Response: i,
Error: err,
}
}, params: p})
err := c.sendPacket(pkt)
if err != nil {
c.handlers.Delete(seq)
return nil, err
}
retry := 0
for {
select {
case rsp := <-ch:
return rsp.Response, rsp.Error
case <-time.After(time.Second * 15):
retry++
if retry < 2 {
_ = c.sendPacket(pkt)
continue
}
c.handlers.Delete(seq)
// c.Error("packet timed out, seq: %v", seq)
// println("Packet Timed out")
return nil, errors.New("Packet timed out")
}
}
}
// sendPacket 向服务器发送一个数据包
func (c *QQClient) sendPacket(pkt []byte) error {
err := c.TCP.Write(pkt)
if err != nil {
atomic.AddUint64(&c.stat.PacketLost, 1)
} else {
atomic.AddUint64(&c.stat.PacketSent, 1)
}
return errors.Wrap(err, "Packet failed to sendPacket")
}
// waitPacket
// 等待一个或多个数据包解析, 优先级低于 sendAndWait
// 返回终止解析函数
func (c *QQClient) waitPacket(cmd string, f func(interface{}, error)) func() {
c.waiters.Store(cmd, f)
return func() {
c.waiters.Delete(cmd)
}
}
// plannedDisconnect 计划中断线事件
func (c *QQClient) plannedDisconnect(_ *utils.TCPListener) {
c.Debug("planned disconnect.")
atomic.AddUint32(&c.stat.DisconnectTimes, 1)
c.Online = false
}
// unexpectedDisconnect 非预期断线事件
func (c *QQClient) unexpectedDisconnect(_ *utils.TCPListener, e error) {
c.Error("unexpected disconnect: %v", e)
atomic.AddUint32(&c.stat.DisconnectTimes, 1)
c.Online = false
if err := c.connect(); err != nil {
c.Error("connect server error: %v", err)
c.dispatchDisconnectEvent(&ClientDisconnectedEvent{Message: "connection dropped by server."})
return
}
if err := c.registerClient(); err != nil {
c.Error("register client failed: %v", err)
c.Disconnect()
c.dispatchDisconnectEvent(&ClientDisconnectedEvent{Message: "register error"})
return
}
}
// netLoop 通过循环来不停接收数据包
func (c *QQClient) netLoop() {
errCount := 0
for c.alive {
l, err := c.TCP.ReadInt32()
if err != nil {
time.Sleep(time.Millisecond * 500)
continue
}
data, _ := c.TCP.ReadBytes(int(l) - 4)
pkt, err := packets.ParseIncomingPacket(data, c.sigInfo.d2Key)
if err != nil {
c.Error("parse incoming packet error: %v", err)
if errors.Is(err, packets.ErrSessionExpired) || errors.Is(err, packets.ErrPacketDropped) {
c.Disconnect()
go c.dispatchDisconnectEvent(&ClientDisconnectedEvent{Message: "session expired"})
continue
}
errCount++
if errCount > 2 {
go c.quickReconnect()
continue
}
continue
}
payload := pkt.Payload
if pkt.Flag2 == 2 {
payload, err = pkt.DecryptPayload(c.RandomKey, c.sigInfo.wtSessionTicketKey)
if err != nil {
c.Error("decrypt payload error: %v", err)
continue
}
}
errCount = 0
c.Debug("rev pkt: %v seq: %v", pkt.CommandName, pkt.SequenceId)
atomic.AddUint64(&c.stat.PacketReceived, 1)
go func() {
defer func() {
if pan := recover(); pan != nil {
c.Error("panic on decoder %v : %v\n%s", pkt.CommandName, pan, debug.Stack())
}
}()
if decoder, ok := decoders[pkt.CommandName]; ok {
// found predefined decoder
info, ok := c.handlers.LoadAndDelete(pkt.SequenceId)
rsp, err := decoder(c, &incomingPacketInfo{
SequenceId: pkt.SequenceId,
CommandName: pkt.CommandName,
Params: func() requestParams {
if !ok {
return nil
}
return info.params
}(),
}, payload)
if err != nil {
c.Debug("decode pkt %v error: %+v", pkt.CommandName, err)
}
if ok {
info.fun(rsp, err)
} else if f, ok := c.waiters.Load(pkt.CommandName); ok { // 在不存在handler的情况下触发wait
f.(func(interface{}, error))(rsp, err)
}
} else if f, ok := c.handlers.LoadAndDelete(pkt.SequenceId); ok {
// does not need decoder
f.fun(nil, nil)
} else {
c.Debug("Unhandled Command: %s\nSeq: %d\nThis message can be ignored.", pkt.CommandName, pkt.SequenceId)
}
}()
}
}

View File

@ -32,7 +32,7 @@ func decodeOnlinePushReqPacket(c *QQClient, info *incomingPacketInfo, payload []
msgInfos := []jce.PushMessageInfo{}
uin := jr.ReadInt64(0)
jr.ReadSlice(&msgInfos, 2)
_ = c.send(c.buildDeleteOnlinePushPacket(uin, 0, nil, info.SequenceId, msgInfos))
_ = c.sendPacket(c.buildDeleteOnlinePushPacket(uin, 0, nil, info.SequenceId, msgInfos))
for _, m := range msgInfos {
k := fmt.Sprintf("%v%v%v", m.MsgSeq, m.MsgTime, m.MsgUid)
if _, ok := c.onlinePushCache.Get(k); ok {

View File

@ -40,12 +40,12 @@ func (c *QQClient) SendPrivateMessage(target int64, m *message.SendingMessage) *
seq = fseq
}
_, pkt := c.buildFriendSendingPacket(target, fseq, mr, int32(len(fragmented)), int32(i), div, t, elems)
_ = c.send(pkt)
_ = c.sendPacket(pkt)
}
} else {
seq = c.nextFriendSeq()
_, pkt := c.buildFriendSendingPacket(target, seq, mr, 1, 0, 0, t, m.Elements)
_ = c.send(pkt)
_ = c.sendPacket(pkt)
}
atomic.AddUint64(&c.stat.MessageSent, 1)
ret := &message.PrivateMessage{
@ -85,7 +85,7 @@ func (c *QQClient) SendGroupTempMessage(groupCode, target int64, m *message.Send
seq := c.nextFriendSeq()
t := time.Now().Unix()
_, pkt := c.buildGroupTempSendingPacket(group.Uin, target, seq, mr, t, m)
_ = c.send(pkt)
_ = c.sendPacket(pkt)
atomic.AddUint64(&c.stat.MessageSent, 1)
return &message.TempMessage{
Id: seq,
@ -106,7 +106,7 @@ func (c *QQClient) sendWPATempMessage(target int64, sig []byte, m *message.Sendi
seq := c.nextFriendSeq()
t := time.Now().Unix()
_, pkt := c.buildWPATempSendingPacket(target, sig, seq, mr, t, m)
_ = c.send(pkt)
_ = c.sendPacket(pkt)
atomic.AddUint64(&c.stat.MessageSent, 1)
return &message.TempMessage{
Id: seq,

View File

@ -98,7 +98,7 @@ func (c *QQClient) SyncSessions() (*SessionSyncResponse, error) {
}
})
_, pkt := c.buildSyncMsgRequestPacket()
if err := c.send(pkt); err != nil {
if err := c.sendPacket(pkt); err != nil {
stop()
return nil, err
}
@ -414,7 +414,7 @@ func decodeC2CSyncPacket(c *QQClient, info *incomingPacketInfo, payload []byte)
if err := proto.Unmarshal(payload, &m); err != nil {
return nil, err
}
_ = c.send(c.buildDeleteOnlinePushPacket(c.Uin, m.GetSvrip(), m.GetPushToken(), info.SequenceId, nil))
_ = c.sendPacket(c.buildDeleteOnlinePushPacket(c.Uin, m.GetSvrip(), m.GetPushToken(), info.SequenceId, nil))
c.commMsgProcessor(m.Msg, info)
return nil, nil
}