1
0
mirror of https://github.com/Mrs4s/MiraiGo.git synced 2025-05-04 19:17:38 +08:00
MiraiGo/client/network.go
2021-09-16 15:32:39 +08:00

261 lines
7.0 KiB
Go

package client
import (
"runtime/debug"
"sync/atomic"
"time"
"github.com/pkg/errors"
"github.com/Mrs4s/MiraiGo/message"
"github.com/Mrs4s/MiraiGo/protocol/packets"
"github.com/Mrs4s/MiraiGo/utils"
)
// connect dials the server at c.servers[c.currServerIndex] and then advances
// the index, wrapping to the start of the list, so the next attempt targets
// the following server.
//
// On failure the retry counter is incremented; once it exceeds the number of
// known servers an "All servers are unreachable" error is returned instead of
// the dial error. On success the retry counter is cleared and ConnectTime is
// updated. The first successful call also installs the internal statistics
// handlers and starts netLoop, guarded by c.once.
func (c *QQClient) connect() error {
	target := c.servers[c.currServerIndex]
	c.Info("connect to server: %v", target.String())
	dialErr := c.TCP.Connect(target)
	c.currServerIndex = (c.currServerIndex + 1) % len(c.servers)

	if dialErr != nil {
		c.retryTimes++
		if c.retryTimes > len(c.servers) {
			return errors.New("All servers are unreachable")
		}
		c.Error("connect server error: %v", dialErr)
		return dialErr
	}

	c.once.Do(func() {
		// Shared bookkeeping for every kind of incoming message.
		markReceived := func() {
			atomic.AddUint64(&c.stat.MessageReceived, 1)
			atomic.StoreInt64(&c.stat.LastMessageTime, time.Now().Unix())
		}
		c.OnGroupMessage(func(*QQClient, *message.GroupMessage) { markReceived() })
		c.OnPrivateMessage(func(*QQClient, *message.PrivateMessage) { markReceived() })
		c.OnTempMessage(func(*QQClient, *TempMessageEvent) { markReceived() })
		c.onGroupMessageReceipt("internal", func(*QQClient, *groupMessageReceiptEvent) {
			atomic.AddUint64(&c.stat.MessageSent, 1)
		})
		go c.netLoop()
	})

	c.retryTimes = 0
	c.ConnectTime = time.Now()
	return nil
}
// quickReconnect drops the current connection, waits 200ms, then connects and
// registers the client again. If either step fails a ClientDisconnectedEvent
// is dispatched; a registration failure additionally disconnects first.
func (c *QQClient) quickReconnect() {
	c.Disconnect()
	time.Sleep(200 * time.Millisecond)

	connectErr := c.connect()
	if connectErr != nil {
		c.Error("connect server error: %v", connectErr)
		c.dispatchDisconnectEvent(&ClientDisconnectedEvent{Message: "quick reconnect failed"})
		return
	}

	registerErr := c.registerClient()
	if registerErr == nil {
		return
	}
	c.Error("register client failed: %v", registerErr)
	c.Disconnect()
	c.dispatchDisconnectEvent(&ClientDisconnectedEvent{Message: "register error"})
}
// Disconnect marks the client as offline and closes the TCP connection.
// It does not release any other resources held by the client.
func (c *QQClient) Disconnect() {
	// Flip the flag before closing, in the same order as always.
	c.Online = false
	c.TCP.Close()
}
// sendAndWait sends pkt to the server and blocks until the handler registered
// under seq is invoked or the request times out.
//
// Only the first element of params, if any, is used; it is stored alongside
// the handler. If the initial send fails, the handler is removed and the send
// error is returned. Otherwise the call waits 15 seconds for a response,
// re-sends the packet once (best effort), waits another 15 seconds, and then
// removes the handler and returns a "Packet timed out" error.
//
// The returned values are whatever the handler was invoked with.
func (c *QQClient) sendAndWait(seq uint16, pkt []byte, params ...requestParams) (interface{}, error) {
	type result struct {
		response interface{}
		err      error
	}
	const timeout = 15 * time.Second

	var p requestParams
	if len(params) != 0 {
		p = params[0]
	}

	// Capacity 1 so the handler's single send never blocks, even if it fires
	// after this call has already given up.
	ch := make(chan result, 1)
	c.handlers.Store(seq, &handlerInfo{fun: func(i interface{}, err error) {
		ch <- result{response: i, err: err}
	}, params: p, dynamic: false})

	if err := c.sendPacket(pkt); err != nil {
		c.handlers.Delete(seq)
		return nil, err
	}

	// One timer for the whole call, stopped on return, instead of a fresh
	// time.After per wait that stays pending after the response has arrived.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	retry := 0
	for {
		select {
		case rsp := <-ch:
			return rsp.response, rsp.err
		case <-timer.C:
			retry++
			if retry < 2 {
				// Best-effort resend; a failure here simply leads to the
				// final timeout below.
				_ = c.sendPacket(pkt)
				// The timer has fired and its channel was just drained, so
				// Reset is safe here.
				timer.Reset(timeout)
				continue
			}
			c.handlers.Delete(seq)
			return nil, errors.New("Packet timed out")
		}
	}
}
// sendPacket writes pkt to the TCP connection and updates the packet
// statistics: PacketLost on a write error, PacketSent otherwise. A write
// error is returned wrapped with a descriptive message; success returns nil.
func (c *QQClient) sendPacket(pkt []byte) error {
	if writeErr := c.TCP.Write(pkt); writeErr != nil {
		atomic.AddUint64(&c.stat.PacketLost, 1)
		return errors.Wrap(writeErr, "Packet failed to sendPacket")
	}
	atomic.AddUint64(&c.stat.PacketSent, 1)
	return nil
}
// waitPacket registers f as the waiter for packets whose command name is cmd.
// netLoop only calls a waiter when no handler is registered for the packet's
// sequence id, so waiters have lower priority than sendAndWait.
//
// The returned function removes the waiter again.
func (c *QQClient) waitPacket(cmd string, f func(interface{}, error)) func() {
	c.waiters.Store(cmd, f)
	cancel := func() {
		c.waiters.Delete(cmd)
	}
	return cancel
}
// sendAndWaitDynamic sends pkt and returns the raw response payload for the
// caller to parse. The handler is registered as dynamic under seq.
//
// If the send fails, the handler is removed and the send error is returned.
// If no response arrives within 15 seconds, the handler is removed and a
// "Packet timed out" error is returned. An error passed to the handler is
// returned to the caller, and a response that is not a []byte is reported as
// an error rather than panicking inside the handler.
func (c *QQClient) sendAndWaitDynamic(seq uint16, pkt []byte) ([]byte, error) {
	type result struct {
		payload []byte
		err     error
	}

	// Capacity 1 so the handler's single send never blocks, even if it fires
	// after this call has already timed out.
	ch := make(chan result, 1)
	c.handlers.Store(seq, &handlerInfo{fun: func(i interface{}, err error) {
		if err != nil {
			ch <- result{err: err}
			return
		}
		payload, ok := i.([]byte)
		if !ok {
			ch <- result{err: errors.Errorf("unexpected response type %T", i)}
			return
		}
		ch <- result{payload: payload}
	}, dynamic: true})

	if err := c.sendPacket(pkt); err != nil {
		c.handlers.Delete(seq)
		return nil, err
	}

	// Stop the timer on return so it does not stay pending after a response.
	timer := time.NewTimer(15 * time.Second)
	defer timer.Stop()

	select {
	case rsp := <-ch:
		return rsp.payload, rsp.err
	case <-timer.C:
		c.handlers.Delete(seq)
		return nil, errors.New("Packet timed out")
	}
}
// plannedDisconnect is the callback for an expected disconnect. It logs the
// event, bumps the disconnect counter and marks the client as offline; no
// reconnect is attempted.
func (c *QQClient) plannedDisconnect(_ *utils.TCPListener) {
	c.Debug("planned disconnect.")
	atomic.AddUint32(&c.stat.DisconnectTimes, 1)
	c.Online = false
}
// unexpectedDisconnect is the callback for an unplanned disconnect caused by
// e. It logs the error, bumps the disconnect counter, marks the client as
// offline, and then tries to connect and register again. If either step fails
// a ClientDisconnectedEvent is dispatched; a registration failure additionally
// disconnects first.
func (c *QQClient) unexpectedDisconnect(_ *utils.TCPListener, e error) {
	c.Error("unexpected disconnect: %v", e)
	atomic.AddUint32(&c.stat.DisconnectTimes, 1)
	c.Online = false

	connectErr := c.connect()
	if connectErr != nil {
		c.Error("connect server error: %v", connectErr)
		c.dispatchDisconnectEvent(&ClientDisconnectedEvent{Message: "connection dropped by server."})
		return
	}

	registerErr := c.registerClient()
	if registerErr == nil {
		return
	}
	c.Error("register client failed: %v", registerErr)
	c.Disconnect()
	c.dispatchDisconnectEvent(&ClientDisconnectedEvent{Message: "register error"})
}
// netLoop reads packets from the TCP connection for as long as c.alive is
// set, parses them, and hands each parsed packet to its own goroutine for
// decoding and dispatch.
//
// Error handling in the read loop:
//   - A failed length read sleeps 500ms and retries.
//   - A parse error that is ErrSessionExpired or ErrPacketDropped disconnects
//     and dispatches a "session expired" event.
//   - Any other parse error is counted; from the third consecutive one
//     onwards a quick reconnect is started.
//   - A payload decryption error drops the packet.
//
// Dispatch rules for a parsed packet:
//   - If a decoder is registered for the command name, the handler stored
//     under the packet's sequence id (if any) is removed. The decoder runs
//     unless that handler is dynamic, in which case the raw payload is passed
//     through. The result goes to the handler when one exists, otherwise to
//     the waiter registered for the command name, if any.
//   - Without a decoder, a handler stored under the sequence id receives the
//     raw payload.
//   - Otherwise the packet is logged as unhandled.
func (c *QQClient) netLoop() {
	errCount := 0
	for c.alive {
		l, err := c.TCP.ReadInt32()
		if err != nil {
			time.Sleep(time.Millisecond * 500)
			continue
		}
		// Presumably the length prefix includes its own 4 bytes.
		// NOTE(review): the read error is discarded here; presumably a failed
		// read surfaces as a parse error below — confirm.
		data, _ := c.TCP.ReadBytes(int(l) - 4)
		pkt, err := packets.ParseIncomingPacket(data, c.sigInfo.d2Key)
		if err != nil {
			c.Error("parse incoming packet error: %v", err)
			if errors.Is(err, packets.ErrSessionExpired) || errors.Is(err, packets.ErrPacketDropped) {
				c.Disconnect()
				go c.dispatchDisconnectEvent(&ClientDisconnectedEvent{Message: "session expired"})
				continue
			}
			errCount++
			if errCount > 2 {
				go c.quickReconnect()
			}
			continue
		}
		if pkt.Flag2 == 2 {
			pkt.Payload, err = pkt.DecryptPayload(c.RandomKey, c.sigInfo.wtSessionTicketKey)
			if err != nil {
				c.Error("decrypt payload error: %v", err)
				continue
			}
		}
		errCount = 0
		c.Debug("rev pkt: %v seq: %v", pkt.CommandName, pkt.SequenceId)
		atomic.AddUint64(&c.stat.PacketReceived, 1)
		go func(pkt *packets.IncomingPacket) {
			// A panicking decoder or handler must not take the process down.
			defer func() {
				if pan := recover(); pan != nil {
					c.Error("panic on decoder %v : %v\n%s", pkt.CommandName, pan, debug.Stack())
				}
			}()

			decoder, hasDecoder := decoders[pkt.CommandName]
			if !hasDecoder {
				if info, ok := c.handlers.LoadAndDelete(pkt.SequenceId); ok {
					// does not need decoder
					info.fun(pkt.Payload, nil)
					return
				}
				c.Debug("Unhandled Command: %s\nSeq: %d\nThis message can be ignored.", pkt.CommandName, pkt.SequenceId)
				return
			}

			// found predefined decoder
			info, hasHandler := c.handlers.LoadAndDelete(pkt.SequenceId)
			var (
				decoded interface{} = pkt.Payload
				err     error
			)
			// Decode unless a dynamic handler asked for the raw payload.
			// Packets with no handler at all must be decoded too, otherwise
			// waiters would receive undecoded bytes.
			if info == nil || !info.dynamic {
				var params requestParams
				if hasHandler {
					params = info.params
				}
				decoded, err = decoder(c, &incomingPacketInfo{
					SequenceId:  pkt.SequenceId,
					CommandName: pkt.CommandName,
					Params:      params,
				}, pkt.Payload)
				if err != nil {
					c.Debug("decode pkt %v error: %+v", pkt.CommandName, err)
				}
			}
			if hasHandler {
				info.fun(decoded, err)
			} else if f, ok := c.waiters.Load(pkt.CommandName); ok {
				// No handler for this sequence id: fall back to the waiter.
				f.(func(interface{}, error))(decoded, err)
			}
		}(pkt)
	}
}