1
0
mirror of https://github.com/Mrs4s/MiraiGo.git synced 2025-05-04 19:17:38 +08:00

fix stat data race.

This commit is contained in:
wdvxdr 2021-07-11 23:04:19 +08:00
parent 297b902100
commit 2d79fccd10
No known key found for this signature in database
GPG Key ID: 55FF1414A69CEBA6
4 changed files with 62 additions and 32 deletions

View File

@ -98,7 +98,7 @@ type QQClient struct {
groupDataTransSeq int32 groupDataTransSeq int32
highwayApplyUpSeq int32 highwayApplyUpSeq int32
eventHandlers *eventHandlers eventHandlers *eventHandlers
stat *Statistics stat Statistics
groupListLock sync.Mutex groupListLock sync.Mutex
} }
@ -201,7 +201,6 @@ func NewClientMd5(uin int64, passwordMd5 [16]byte) *QQClient {
onlinePushCache: utils.NewCache(time.Second * 15), onlinePushCache: utils.NewCache(time.Second * 15),
version: genVersionInfo(SystemDeviceInfo.Protocol), version: genVersionInfo(SystemDeviceInfo.Protocol),
servers: []*net.TCPAddr{}, servers: []*net.TCPAddr{},
stat: &Statistics{},
} }
sso, err := getSSOAddress() sso, err := getSSOAddress()
if err == nil && len(sso) > 0 { if err == nil && len(sso) > 0 {
@ -429,19 +428,19 @@ func (c *QQClient) init(tokenLogin bool) error {
_, _ = c.sendAndWait(seq, pkt, requestParams{"used_reg_proxy": true, "init": true}) _, _ = c.sendAndWait(seq, pkt, requestParams{"used_reg_proxy": true, "init": true})
c.stat.once.Do(func() { c.stat.once.Do(func() {
c.OnGroupMessage(func(_ *QQClient, _ *message.GroupMessage) { c.OnGroupMessage(func(_ *QQClient, _ *message.GroupMessage) {
c.stat.MessageReceived++ atomic.AddUint64(&c.stat.MessageReceived, 1)
c.stat.LastMessageTime = time.Now().Unix() atomic.StoreInt64(&c.stat.LastMessageTime, time.Now().Unix())
}) })
c.OnPrivateMessage(func(_ *QQClient, _ *message.PrivateMessage) { c.OnPrivateMessage(func(_ *QQClient, _ *message.PrivateMessage) {
c.stat.MessageReceived++ atomic.AddUint64(&c.stat.MessageReceived, 1)
c.stat.LastMessageTime = time.Now().Unix() atomic.StoreInt64(&c.stat.LastMessageTime, time.Now().Unix())
}) })
c.OnTempMessage(func(_ *QQClient, _ *TempMessageEvent) { c.OnTempMessage(func(_ *QQClient, _ *TempMessageEvent) {
c.stat.MessageReceived++ atomic.AddUint64(&c.stat.MessageReceived, 1)
c.stat.LastMessageTime = time.Now().Unix() atomic.StoreInt64(&c.stat.LastMessageTime, time.Now().Unix())
}) })
c.onGroupMessageReceipt("internal", func(_ *QQClient, _ *groupMessageReceiptEvent) { c.onGroupMessageReceipt("internal", func(_ *QQClient, _ *groupMessageReceiptEvent) {
c.stat.MessageSent++ atomic.AddUint64(&c.stat.MessageSent, 1)
}) })
}) })
return nil return nil
@ -917,9 +916,9 @@ func (c *QQClient) nextHighwayApplySeq() int32 {
func (c *QQClient) send(pkt []byte) error { func (c *QQClient) send(pkt []byte) error {
err := c.TCP.Write(pkt) err := c.TCP.Write(pkt)
if err != nil { if err != nil {
c.stat.PacketLost++ atomic.AddUint64(&c.stat.PacketLost, 1)
} else { } else {
c.stat.PacketSent++ atomic.AddUint64(&c.stat.PacketSent, 1)
} }
return errors.Wrap(err, "Packet failed to send") return errors.Wrap(err, "Packet failed to send")
} }
@ -1023,13 +1022,13 @@ func (c *QQClient) Disconnect() {
func (c *QQClient) plannedDisconnect(_ *utils.TCPListener) { func (c *QQClient) plannedDisconnect(_ *utils.TCPListener) {
c.Debug("planned disconnect.") c.Debug("planned disconnect.")
c.stat.DisconnectTimes++ atomic.AddUint32(&c.stat.DisconnectTimes, 1)
c.Online = false c.Online = false
} }
func (c *QQClient) unexpectedDisconnect(_ *utils.TCPListener, e error) { func (c *QQClient) unexpectedDisconnect(_ *utils.TCPListener, e error) {
c.Error("unexpected disconnect: %v", e) c.Error("unexpected disconnect: %v", e)
c.stat.DisconnectTimes++ atomic.AddUint32(&c.stat.DisconnectTimes, 1)
c.Online = false c.Online = false
if err := c.connect(); err != nil { if err := c.connect(); err != nil {
c.Error("connect server error: %v", err) c.Error("connect server error: %v", err)
@ -1079,7 +1078,7 @@ func (c *QQClient) netLoop() {
} }
errCount = 0 errCount = 0
c.Debug("rev pkt: %v seq: %v", pkt.CommandName, pkt.SequenceId) c.Debug("rev pkt: %v seq: %v", pkt.CommandName, pkt.SequenceId)
c.stat.PacketReceived++ atomic.AddUint64(&c.stat.PacketReceived, 1)
go func() { go func() {
defer func() { defer func() {
if pan := recover(); pan != nil { if pan := recover(); pan != nil {

View File

@ -2,6 +2,7 @@ package client
import ( import (
"math/rand" "math/rand"
"sync/atomic"
"time" "time"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -37,7 +38,7 @@ func (c *QQClient) SendPrivateMessage(target int64, m *message.SendingMessage) *
_, pkt := c.buildFriendSendingPacket(target, seq, mr, 1, 0, 0, t, m.Elements) _, pkt := c.buildFriendSendingPacket(target, seq, mr, 1, 0, 0, t, m.Elements)
_ = c.send(pkt) _ = c.send(pkt)
} }
c.stat.MessageSent++ atomic.AddUint64(&c.stat.MessageSent, 1)
ret := &message.PrivateMessage{ ret := &message.PrivateMessage{
Id: seq, Id: seq,
InternalId: mr, InternalId: mr,
@ -76,7 +77,7 @@ func (c *QQClient) SendGroupTempMessage(groupCode, target int64, m *message.Send
t := time.Now().Unix() t := time.Now().Unix()
_, pkt := c.buildGroupTempSendingPacket(group.Uin, target, seq, mr, t, m) _, pkt := c.buildGroupTempSendingPacket(group.Uin, target, seq, mr, t, m)
_ = c.send(pkt) _ = c.send(pkt)
c.stat.MessageSent++ atomic.AddUint64(&c.stat.MessageSent, 1)
return &message.TempMessage{ return &message.TempMessage{
Id: seq, Id: seq,
GroupCode: group.Code, GroupCode: group.Code,
@ -97,7 +98,7 @@ func (c *QQClient) sendWPATempMessage(target int64, sig []byte, m *message.Sendi
t := time.Now().Unix() t := time.Now().Unix()
_, pkt := c.buildWPATempSendingPacket(target, sig, seq, mr, t, m) _, pkt := c.buildWPATempSendingPacket(target, sig, seq, mr, t, m)
_ = c.send(pkt) _ = c.send(pkt)
c.stat.MessageSent++ atomic.AddUint64(&c.stat.MessageSent, 1)
return &message.TempMessage{ return &message.TempMessage{
Id: seq, Id: seq,
Self: c.Uin, Self: c.Uin,

View File

@ -1,20 +1,47 @@
package client package client
import "sync" import (
"bytes"
"strconv"
"sync"
"sync/atomic"
)
type Statistics struct { type Statistics struct {
PacketReceived uint64 `json:"packet_received"` PacketReceived uint64
PacketSent uint64 `json:"packet_sent"` PacketSent uint64
PacketLost uint32 `json:"packet_lost"` PacketLost uint64
MessageReceived uint64 `json:"message_received"` MessageReceived uint64
MessageSent uint64 `json:"message_sent"` MessageSent uint64
DisconnectTimes uint32 `json:"disconnect_times"` DisconnectTimes uint32
LostTimes uint32 `json:"lost_times"` LostTimes uint32
LastMessageTime int64 `json:"last_message_time"` LastMessageTime int64
once sync.Once once sync.Once
} }
func (c *QQClient) GetStatistics() *Statistics { func (s *Statistics) MarshalJSON() ([]byte, error) {
return c.stat var w bytes.Buffer
w.Grow(256)
w.WriteString(strconv.FormatUint(atomic.LoadUint64(&s.PacketReceived), 10))
w.WriteString(`,"packet_sent":`)
w.WriteString(strconv.FormatUint(atomic.LoadUint64(&s.PacketSent), 10))
w.WriteString(`,"packet_lost":`)
w.WriteString(strconv.FormatUint(atomic.LoadUint64(&s.PacketLost), 10))
w.WriteString(`,"message_received":`)
w.WriteString(strconv.FormatUint(atomic.LoadUint64(&s.MessageReceived), 10))
w.WriteString(`,"message_sent":`)
w.WriteString(strconv.FormatUint(atomic.LoadUint64(&s.MessageSent), 10))
w.WriteString(`,"disconnect_times":`)
w.WriteString(strconv.FormatUint(uint64(atomic.LoadUint32(&s.DisconnectTimes)), 10))
w.WriteString(`,"lost_times":`)
w.WriteString(strconv.FormatUint(uint64(atomic.LoadUint32(&s.LostTimes)), 10))
w.WriteString(`,"last_message_time":`)
w.WriteString(strconv.FormatInt(atomic.LoadInt64(&s.LastMessageTime), 10))
w.WriteByte('}')
return w.Bytes(), nil
}
func (c *QQClient) GetStatistics() *Statistics {
return &c.stat
} }

View File

@ -3,6 +3,7 @@ package client
import ( import (
"math/rand" "math/rand"
"sync" "sync"
"sync/atomic"
"time" "time"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
@ -150,10 +151,11 @@ func (c *QQClient) buildGetOfflineMsgRequestPacket() (uint16, []byte) {
C2CMsg: &jce.SvcReqGetMsgV2{ C2CMsg: &jce.SvcReqGetMsgV2{
Uin: c.Uin, Uin: c.Uin,
DateTime: func() int32 { DateTime: func() int32 {
if c.stat.LastMessageTime == 0 { t := atomic.LoadInt64(&c.stat.LastMessageTime)
if t == 0 {
return 1 return 1
} }
return int32(c.stat.LastMessageTime) return int32(t)
}(), }(),
RecivePic: 1, RecivePic: 1,
Ability: 15, Ability: 15,
@ -218,10 +220,11 @@ func (c *QQClient) buildSyncMsgRequestPacket() (uint16, []byte) {
C2CMsg: &jce.SvcReqGetMsgV2{ C2CMsg: &jce.SvcReqGetMsgV2{
Uin: c.Uin, Uin: c.Uin,
DateTime: func() int32 { DateTime: func() int32 {
if c.stat.LastMessageTime == 0 { t := atomic.LoadInt64(&c.stat.LastMessageTime)
if t == 0 {
return 1 return 1
} }
return int32(c.stat.LastMessageTime) return int32(t)
}(), }(),
RecivePic: 1, RecivePic: 1,
Ability: 15, Ability: 15,