mirror of
https://github.com/Mrs4s/MiraiGo.git
synced 2025-05-04 19:17:38 +08:00
357 lines
10 KiB
Go
357 lines
10 KiB
Go
package client
|
|
|
|
import (
|
|
"net"
|
|
"runtime/debug"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/Mrs4s/MiraiGo/message"
|
|
|
|
"github.com/pkg/errors"
|
|
|
|
"github.com/Mrs4s/MiraiGo/client/internal/network"
|
|
"github.com/Mrs4s/MiraiGo/client/internal/oicq"
|
|
"github.com/Mrs4s/MiraiGo/utils"
|
|
)
|
|
|
|
// ConnectionQualityInfo 客户端连接质量测试结果
|
|
// 延迟单位为 ms 如为 9999 则测试失败 测试方法为 TCP 连接测试
|
|
// 丢包测试方法为 ICMP. 总共发送 10 个包, 记录丢包数
|
|
type ConnectionQualityInfo struct {
|
|
// ChatServerLatency 聊天服务器延迟
|
|
ChatServerLatency int64
|
|
// ChatServerPacketLoss 聊天服务器ICMP丢包数
|
|
ChatServerPacketLoss int
|
|
// LongMessageServerLatency 长消息服务器延迟. 涉及长消息以及合并转发消息下载
|
|
LongMessageServerLatency int64
|
|
// LongMessageServerResponseLatency 长消息服务器返回延迟
|
|
LongMessageServerResponseLatency int64
|
|
// SrvServerLatency Highway服务器延迟. 涉及媒体以及群文件上传
|
|
SrvServerLatency int64
|
|
// SrvServerPacketLoss Highway服务器ICMP丢包数.
|
|
SrvServerPacketLoss int
|
|
}
|
|
|
|
func (c *QQClient) ConnectionQualityTest() *ConnectionQualityInfo {
|
|
if !c.Online.Load() {
|
|
return nil
|
|
}
|
|
r := &ConnectionQualityInfo{}
|
|
wg := sync.WaitGroup{}
|
|
wg.Add(2)
|
|
go func() {
|
|
defer wg.Done()
|
|
var err error
|
|
|
|
if r.ChatServerLatency, err = qualityTest(c.servers[c.currServerIndex].String()); err != nil {
|
|
c.Error("test chat server latency error: %v", err)
|
|
r.ChatServerLatency = 9999
|
|
}
|
|
|
|
if addr, err := net.ResolveIPAddr("ip", "ssl.htdata.qq.com"); err == nil {
|
|
if r.LongMessageServerLatency, err = qualityTest((&net.TCPAddr{IP: addr.IP, Port: 443}).String()); err != nil {
|
|
c.Error("test long message server latency error: %v", err)
|
|
r.LongMessageServerLatency = 9999
|
|
}
|
|
} else {
|
|
c.Error("resolve long message server error: %v", err)
|
|
r.LongMessageServerLatency = 9999
|
|
}
|
|
if c.highwaySession.AddrLength() > 0 {
|
|
if r.SrvServerLatency, err = qualityTest(c.highwaySession.SsoAddr[0].String()); err != nil {
|
|
c.Error("test srv server latency error: %v", err)
|
|
r.SrvServerLatency = 9999
|
|
}
|
|
}
|
|
}()
|
|
go func() {
|
|
defer wg.Done()
|
|
res := utils.RunTCPPingLoop(c.servers[c.currServerIndex].String(), 10)
|
|
r.ChatServerPacketLoss = res.PacketsLoss
|
|
if c.highwaySession.AddrLength() > 0 {
|
|
res = utils.RunTCPPingLoop(c.highwaySession.SsoAddr[0].String(), 10)
|
|
r.SrvServerPacketLoss = res.PacketsLoss
|
|
}
|
|
}()
|
|
start := time.Now()
|
|
if _, err := utils.HttpGetBytes("https://ssl.htdata.qq.com", ""); err == nil {
|
|
r.LongMessageServerResponseLatency = time.Since(start).Milliseconds()
|
|
} else {
|
|
c.Error("test long message server response latency error: %v", err)
|
|
r.LongMessageServerResponseLatency = 9999
|
|
}
|
|
wg.Wait()
|
|
return r
|
|
}
|
|
|
|
func (c *QQClient) connectFastest() error {
|
|
c.Disconnect()
|
|
addr, err := c.transport.ConnectFastest(c.servers)
|
|
if err != nil {
|
|
c.Disconnect()
|
|
return err
|
|
}
|
|
c.Debug("connected to server: %v [fastest]", addr.String())
|
|
c.transport.NetLoop(c.pktProc, c.transport.ReadRequest)
|
|
c.retryTimes = 0
|
|
c.ConnectTime = time.Now()
|
|
return nil
|
|
}
|
|
|
|
// connect 连接到 QQClient.servers 中的服务器
|
|
func (c *QQClient) connect() error {
|
|
c.once.Do(func() {
|
|
c.OnGroupMessage(func(_ *QQClient, _ *message.GroupMessage) {
|
|
c.stat.MessageReceived.Add(1)
|
|
c.stat.LastMessageTime.Store(time.Now().Unix())
|
|
})
|
|
c.OnPrivateMessage(func(_ *QQClient, _ *message.PrivateMessage) {
|
|
c.stat.MessageReceived.Add(1)
|
|
c.stat.LastMessageTime.Store(time.Now().Unix())
|
|
})
|
|
c.OnTempMessage(func(_ *QQClient, _ *TempMessageEvent) {
|
|
c.stat.MessageReceived.Add(1)
|
|
c.stat.LastMessageTime.Store(time.Now().Unix())
|
|
})
|
|
c.onGroupMessageReceipt("internal", func(_ *QQClient, _ *groupMessageReceiptEvent) {
|
|
c.stat.MessageSent.Add(1)
|
|
})
|
|
// go c.netLoop()
|
|
})
|
|
return c.connectFastest() // 暂时?
|
|
/*c.Info("connect to server: %v", c.servers[c.currServerIndex].String())
|
|
err := c.TCP.Connect(c.servers[c.currServerIndex])
|
|
c.currServerIndex++
|
|
if c.currServerIndex == len(c.servers) {
|
|
c.currServerIndex = 0
|
|
}
|
|
if err != nil {
|
|
c.retryTimes++
|
|
if c.retryTimes > len(c.servers) {
|
|
return errors.New("All servers are unreachable")
|
|
}
|
|
c.Error("connect server error: %v", err)
|
|
return err
|
|
}
|
|
c.retryTimes = 0
|
|
c.ConnectTime = time.Now()
|
|
return nil*/
|
|
}
|
|
|
|
func (c *QQClient) QuickReconnect() {
|
|
c.quickReconnect() // TODO "用户请求快速重连"
|
|
}
|
|
|
|
// quickReconnect 快速重连
|
|
func (c *QQClient) quickReconnect() {
|
|
c.Disconnect()
|
|
time.Sleep(time.Millisecond * 200)
|
|
if err := c.connect(); err != nil {
|
|
c.Error("connect server error: %v", err)
|
|
c.dispatchDisconnectEvent(&ClientDisconnectedEvent{Message: "快速重连失败"})
|
|
return
|
|
}
|
|
if err := c.registerClient(); err != nil {
|
|
c.Error("register client failed: %v", err)
|
|
c.Disconnect()
|
|
c.dispatchDisconnectEvent(&ClientDisconnectedEvent{Message: "register error"})
|
|
return
|
|
}
|
|
}
|
|
|
|
// Disconnect 中断连接, 不释放资源
|
|
func (c *QQClient) Disconnect() {
|
|
c.Online.Store(false)
|
|
c.transport.Close()
|
|
}
|
|
|
|
func (c *QQClient) send(call *network.Call) {
|
|
if call.Done == nil {
|
|
call.Done = make(chan *network.Call, 3) // use buffered channel
|
|
}
|
|
seq := call.Request.SequenceID
|
|
c.pendingMu.Lock()
|
|
c.pending[seq] = call
|
|
c.pendingMu.Unlock()
|
|
|
|
err := c.sendPacket(c.transport.PackPacket(call.Request))
|
|
c.Debug("send pkt: %v seq: %d", call.Request.CommandName, call.Request.SequenceID)
|
|
if err != nil {
|
|
c.pendingMu.Lock()
|
|
call = c.pending[seq]
|
|
delete(c.pending, seq)
|
|
c.pendingMu.Unlock()
|
|
call.Err = err
|
|
call.Done <- call
|
|
}
|
|
}
|
|
|
|
func (c *QQClient) sendReq(req *network.Request) {
|
|
c.send(&network.Call{Request: req})
|
|
}
|
|
|
|
func (c *QQClient) call(req *network.Request) (*network.Response, error) {
|
|
call := &network.Call{
|
|
Request: req,
|
|
Done: make(chan *network.Call, 3),
|
|
}
|
|
c.send(call)
|
|
select {
|
|
case <-call.Done:
|
|
return call.Response, call.Err
|
|
case <-time.After(time.Second * 15):
|
|
return nil, errors.New("Packet timed out")
|
|
}
|
|
}
|
|
|
|
func (c *QQClient) callAndDecode(req *network.Request) (interface{}, error) {
|
|
resp, err := c.call(req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return req.Decode(resp)
|
|
}
|
|
|
|
// sendPacket 向服务器发送一个数据包
|
|
func (c *QQClient) sendPacket(pkt []byte) error {
|
|
err := c.transport.Write(pkt)
|
|
if err != nil {
|
|
c.stat.PacketLost.Add(1)
|
|
} else {
|
|
c.stat.PacketSent.Add(1)
|
|
}
|
|
return errors.Wrap(err, "Packet failed to sendPacket")
|
|
}
|
|
|
|
// waitPacket
|
|
// 等待一个或多个数据包解析, 优先级低于 sendAndWait
|
|
// 返回终止解析函数
|
|
func (c *QQClient) waitPacket(cmd string, f func(interface{}, error)) func() {
|
|
c.waiters.Store(cmd, f)
|
|
return func() {
|
|
c.waiters.Delete(cmd)
|
|
}
|
|
}
|
|
|
|
// waitPacketTimeoutSyncF
|
|
// 等待一个数据包解析, 优先级低于 sendAndWait
|
|
func (c *QQClient) waitPacketTimeoutSyncF(cmd string, timeout time.Duration, filter func(interface{}) bool) (r interface{}, e error) {
|
|
notifyChan := make(chan bool)
|
|
defer c.waitPacket(cmd, func(i interface{}, err error) {
|
|
if filter(i) {
|
|
r = i
|
|
e = err
|
|
notifyChan <- true
|
|
}
|
|
})()
|
|
select {
|
|
case <-notifyChan:
|
|
return
|
|
case <-time.After(timeout):
|
|
return nil, errors.New("timeout")
|
|
}
|
|
}
|
|
|
|
// plannedDisconnect 计划中断线事件
|
|
func (c *QQClient) plannedDisconnect(_ *network.TCPListener) {
|
|
c.Debug("planned disconnect.")
|
|
c.stat.DisconnectTimes.Add(1)
|
|
c.Online.Store(false)
|
|
}
|
|
|
|
// unexpectedDisconnect 非预期断线事件
|
|
func (c *QQClient) unexpectedDisconnect(_ *network.TCPListener, e error) {
|
|
c.Error("unexpected disconnect: %v", e)
|
|
c.stat.DisconnectTimes.Add(1)
|
|
c.Online.Store(false)
|
|
if err := c.connect(); err != nil {
|
|
c.Error("connect server error: %v", err)
|
|
c.dispatchDisconnectEvent(&ClientDisconnectedEvent{Message: "connection dropped by server."})
|
|
return
|
|
}
|
|
if err := c.registerClient(); err != nil {
|
|
c.Error("register client failed: %v", err)
|
|
c.Disconnect()
|
|
c.dispatchDisconnectEvent(&ClientDisconnectedEvent{Message: "register error"})
|
|
return
|
|
}
|
|
}
|
|
|
|
func (c *QQClient) pktProc(req *network.Request, netErr error) {
|
|
if netErr != nil {
|
|
switch true {
|
|
case errors.Is(netErr, network.ErrConnectionBroken):
|
|
go c.dispatchDisconnectEvent(&ClientDisconnectedEvent{Message: netErr.Error()})
|
|
c.QuickReconnect()
|
|
case errors.Is(netErr, network.ErrSessionExpired) || errors.Is(netErr, network.ErrPacketDropped):
|
|
c.Disconnect()
|
|
go c.dispatchDisconnectEvent(&ClientDisconnectedEvent{Message: "session expired"})
|
|
}
|
|
c.Error("parse incoming packet error: %v", netErr)
|
|
return
|
|
}
|
|
|
|
if req.EncryptType == network.EncryptTypeEmptyKey {
|
|
m, err := c.oicq.Unmarshal(req.Body)
|
|
if err != nil {
|
|
c.Error("decrypt payload error: %v", err)
|
|
if errors.Is(err, oicq.ErrUnknownFlag) {
|
|
go c.quickReconnect() // TODO "服务器发送未知响应"
|
|
}
|
|
}
|
|
req.Body = m.Body
|
|
}
|
|
|
|
defer func() {
|
|
if pan := recover(); pan != nil {
|
|
c.Error("panic on decoder %v : %v\n%s", req.CommandName, pan, debug.Stack())
|
|
c.Dump("packet decode error: %v - %v", req.Body, req.CommandName, pan)
|
|
}
|
|
}()
|
|
|
|
c.Debug("recv pkt: %v seq: %v", req.CommandName, req.SequenceID)
|
|
c.stat.PacketReceived.Add(1)
|
|
|
|
// snapshot of read call
|
|
c.pendingMu.Lock()
|
|
call := c.pending[req.SequenceID]
|
|
if call != nil {
|
|
call.Response = &network.Response{
|
|
SequenceID: req.SequenceID,
|
|
CommandName: req.CommandName,
|
|
Body: req.Body,
|
|
Request: call.Request,
|
|
}
|
|
delete(c.pending, req.SequenceID)
|
|
}
|
|
c.pendingMu.Unlock()
|
|
if call != nil && call.Request.CommandName == req.CommandName {
|
|
select {
|
|
case call.Done <- call:
|
|
default:
|
|
// we don't want blocking
|
|
}
|
|
return
|
|
}
|
|
|
|
if decoder, ok := decoders[req.CommandName]; ok {
|
|
// found predefined decoder
|
|
resp := network.Response{
|
|
SequenceID: req.SequenceID,
|
|
CommandName: req.CommandName,
|
|
Body: req.Body,
|
|
// Request: nil,
|
|
}
|
|
decoded, err := decoder(c, &resp)
|
|
if err != nil {
|
|
c.Debug("decode req %v error: %+v", req.CommandName, err)
|
|
}
|
|
if f, ok := c.waiters.Load(req.CommandName); ok { // 在不存在handler的情况下触发wait
|
|
f.(func(interface{}, error))(decoded, err)
|
|
}
|
|
} else {
|
|
c.Debug("Unhandled Command: %s\nSeq: %d\nThis message can be ignored.", req.CommandName, req.SequenceID)
|
|
}
|
|
}
|