diff --git a/client/c2c_processor.go b/client/c2c_processor.go index 5a19ba5e..e2cdd03f 100644 --- a/client/c2c_processor.go +++ b/client/c2c_processor.go @@ -4,7 +4,6 @@ package client import ( "fmt" - "sync/atomic" "time" "github.com/pkg/errors" @@ -104,9 +103,9 @@ func privateMessageDecoder(c *QQClient, pMsg *msg.Message, _ *incomingPacketInfo case 11, 175: // friend msg if pMsg.Head.GetFromUin() == c.Uin { for { - frdSeq := atomic.LoadInt32(&c.friendSeq) + frdSeq := c.friendSeq.Load() if frdSeq < pMsg.Head.GetMsgSeq() { - if atomic.CompareAndSwapInt32(&c.friendSeq, frdSeq, pMsg.Head.GetMsgSeq()) { + if c.friendSeq.CAS(frdSeq, pMsg.Head.GetMsgSeq()) { break } } else { diff --git a/client/client.go b/client/client.go index c7533f39..6a8c9587 100644 --- a/client/client.go +++ b/client/client.go @@ -7,10 +7,10 @@ import ( "net" "sort" "sync" - "sync/atomic" "time" "github.com/pkg/errors" + "go.uber.org/atomic" "github.com/Mrs4s/MiraiGo/binary" "github.com/Mrs4s/MiraiGo/binary/jce" @@ -34,18 +34,18 @@ type QQClient struct { AllowSlider bool // account info + Online atomic.Bool Nickname string Age uint16 Gender uint16 FriendList []*FriendInfo GroupList []*GroupInfo OnlineClients []*OtherClientInfo - Online bool QiDian *QiDianAccountInfo GuildService *GuildService // protocol public field - SequenceId int32 + SequenceId atomic.Int32 OutGoingPacketSessionId []byte RandomKey []byte TCP *utils.TCPListener @@ -81,7 +81,7 @@ type QQClient struct { ksid []byte // session info - qwebSeq int64 + qwebSeq atomic.Int64 sigInfo *loginSigInfo highwaySession *highway.Session dpwd []byte @@ -99,11 +99,11 @@ type QQClient struct { groupSysMsgCache *GroupSystemMessages groupMsgBuilders sync.Map onlinePushCache *utils.Cache - requestPacketRequestID int32 - groupSeq int32 - friendSeq int32 heartbeatEnabled bool - highwayApplyUpSeq int32 + requestPacketRequestID atomic.Int32 + groupSeq atomic.Int32 + friendSeq atomic.Int32 + highwayApplyUpSeq atomic.Int32 eventHandlers *eventHandlers groupListLock sync.Mutex @@ -196,16 +196,11 @@ func NewClientMd5(uin int64, passwordMd5 [16]byte) *QQClient { cli := &QQClient{ Uin: uin, PasswordMd5: passwordMd5, - SequenceId: 0x3635, AllowSlider: true, RandomKey: make([]byte, 16), OutGoingPacketSessionId: []byte{0x02, 0xB0, 0x5B, 0x8B}, TCP: &utils.TCPListener{}, sigInfo: &loginSigInfo{}, - requestPacketRequestID: 1921334513, - groupSeq: int32(rand.Intn(20000)), - friendSeq: 22911, - highwayApplyUpSeq: 77918, eventHandlers: &eventHandlers{}, msgSvcCache: utils.NewCache(time.Second * 15), transCache: utils.NewCache(time.Second * 15), @@ -214,6 +209,13 @@ func NewClientMd5(uin int64, passwordMd5 [16]byte) *QQClient { alive: true, ecdh: crypto.NewEcdh(), } + { // init atomic values + cli.SequenceId.Store(0x3635) + cli.requestPacketRequestID.Store(1921334513) + cli.groupSeq.Store(int32(rand.Intn(20000))) + cli.friendSeq.Store(22911) + cli.highwayApplyUpSeq.Store(77918) + } cli.GuildService = &GuildService{c: cli} cli.ecdh.FetchPubKey(uin) cli.UseDevice(SystemDeviceInfo) @@ -277,7 +279,7 @@ func (c *QQClient) UseDevice(info *DeviceInfo) { } func (c *QQClient) Release() { - if c.Online { + if c.Online.Load() { c.Disconnect() } c.alive = false @@ -285,7 +287,7 @@ func (c *QQClient) Release() { // Login send login request func (c *QQClient) Login() (*LoginResponse, error) { - if c.Online { + if c.Online.Load() { return nil, ErrAlreadyOnline } err := c.connect() @@ -305,7 +307,7 @@ func (c *QQClient) Login() (*LoginResponse, error) { } func (c *QQClient) TokenLogin(token []byte) error { - if c.Online { + if c.Online.Load() { return ErrAlreadyOnline } err := c.connect() @@ -334,7 +336,7 @@ func (c *QQClient) TokenLogin(token []byte) error { } func (c *QQClient) FetchQRCode() (*QRCodeLoginResponse, error) { - if c.Online { + if c.Online.Load() { return nil, ErrAlreadyOnline } err := c.connect() @@ -778,39 +780,39 @@ func (c *QQClient) SetCustomServer(servers []*net.TCPAddr) { func (c *QQClient) registerClient() error { _, err := c.sendAndWait(c.buildClientRegisterPacket()) if err == nil { - c.Online = true + c.Online.Store(true) } return err } func (c *QQClient) nextSeq() uint16 { - return uint16(atomic.AddInt32(&c.SequenceId, 1) & 0x7FFF) + return uint16(c.SequenceId.Add(1) & 0x7FFF) } func (c *QQClient) nextPacketSeq() int32 { - return atomic.AddInt32(&c.requestPacketRequestID, 2) + return c.requestPacketRequestID.Add(2) } func (c *QQClient) nextGroupSeq() int32 { - return atomic.AddInt32(&c.groupSeq, 2) + return c.groupSeq.Add(2) } func (c *QQClient) nextFriendSeq() int32 { - return atomic.AddInt32(&c.friendSeq, 1) + return c.friendSeq.Add(1) } func (c *QQClient) nextQWebSeq() int64 { - return atomic.AddInt64(&c.qwebSeq, 1) + return c.qwebSeq.Add(1) } func (c *QQClient) nextHighwayApplySeq() int32 { - return atomic.AddInt32(&c.highwayApplyUpSeq, 2) + return c.highwayApplyUpSeq.Add(2) } func (c *QQClient) doHeartbeat() { c.heartbeatEnabled = true times := 0 - for c.Online { + for c.Online.Load() { time.Sleep(time.Second * 30) seq := c.nextSeq() sso := packets.BuildSsoPacket(seq, c.version.AppId, c.version.SubAppId, "Heartbeat.Alive", c.deviceInfo.IMEI, []byte{}, c.OutGoingPacketSessionId, []byte{}, c.ksid) diff --git a/client/network.go b/client/network.go index 9018696e..1628ae71 100644 --- a/client/network.go +++ b/client/network.go @@ -4,7 +4,6 @@ import ( "net" "runtime/debug" "sync" - "sync/atomic" "time" "github.com/pkg/errors" @@ -33,7 +32,7 @@ type ConnectionQualityInfo struct { } func (c *QQClient) ConnectionQualityTest() *ConnectionQualityInfo { - if !c.Online { + if !c.Online.Load() { return nil } r := &ConnectionQualityInfo{} @@ -102,19 +101,19 @@ func (c *QQClient) connect() error { } c.once.Do(func() { c.OnGroupMessage(func(_ *QQClient, _ *message.GroupMessage) { - atomic.AddUint64(&c.stat.MessageReceived, 1) - atomic.StoreInt64(&c.stat.LastMessageTime, time.Now().Unix()) + c.stat.MessageReceived.Add(1) + c.stat.LastMessageTime.Store(time.Now().Unix()) }) c.OnPrivateMessage(func(_ *QQClient, _ *message.PrivateMessage) { - atomic.AddUint64(&c.stat.MessageReceived, 1) - atomic.StoreInt64(&c.stat.LastMessageTime, time.Now().Unix()) + c.stat.MessageReceived.Add(1) + c.stat.LastMessageTime.Store(time.Now().Unix()) }) c.OnTempMessage(func(_ *QQClient, _ *TempMessageEvent) { - atomic.AddUint64(&c.stat.MessageReceived, 1) - atomic.StoreInt64(&c.stat.LastMessageTime, time.Now().Unix()) + c.stat.MessageReceived.Add(1) + c.stat.LastMessageTime.Store(time.Now().Unix()) }) c.onGroupMessageReceipt("internal", func(_ *QQClient, _ *groupMessageReceiptEvent) { - atomic.AddUint64(&c.stat.MessageSent, 1) + c.stat.MessageSent.Add(1) }) go c.netLoop() }) @@ -142,7 +141,7 @@ func (c *QQClient) quickReconnect() { // Disconnect 中断连接, 不释放资源 func (c *QQClient) Disconnect() { - c.Online = false + c.Online.Store(false) c.TCP.Close() } @@ -193,9 +192,9 @@ func (c *QQClient) sendAndWait(seq uint16, pkt []byte, params ...requestParams) func (c *QQClient) sendPacket(pkt []byte) error { err := c.TCP.Write(pkt) if err != nil { - atomic.AddUint64(&c.stat.PacketLost, 1) + c.stat.PacketLost.Add(1) } else { - atomic.AddUint64(&c.stat.PacketSent, 1) + c.stat.PacketSent.Add(1) } return errors.Wrap(err, "Packet failed to sendPacket") } @@ -251,15 +250,15 @@ func (c *QQClient) sendAndWaitDynamic(seq uint16, pkt []byte) ([]byte, error) { // plannedDisconnect 计划中断线事件 func (c *QQClient) plannedDisconnect(_ *utils.TCPListener) { c.Debug("planned disconnect.") - atomic.AddUint32(&c.stat.DisconnectTimes, 1) - c.Online = false + c.stat.DisconnectTimes.Add(1) + c.Online.Store(false) } // unexpectedDisconnect 非预期断线事件 func (c *QQClient) unexpectedDisconnect(_ *utils.TCPListener, e error) { c.Error("unexpected disconnect: %v", e) - atomic.AddUint32(&c.stat.DisconnectTimes, 1) - c.Online = false + c.stat.DisconnectTimes.Add(1) + c.Online.Store(false) if err := c.connect(); err != nil { c.Error("connect server error: %v", err) c.dispatchDisconnectEvent(&ClientDisconnectedEvent{Message: "connection dropped by server."}) @@ -317,7 +316,7 @@ func (c *QQClient) netLoop() { } errCount = 0 c.Debug("rev pkt: %v seq: %v", pkt.CommandName, pkt.SequenceId) - atomic.AddUint64(&c.stat.PacketReceived, 1) + c.stat.PacketReceived.Add(1) go func(pkt *packets.IncomingPacket) { defer func() { if pan := recover(); pan != nil { diff --git a/client/private_msg.go b/client/private_msg.go index 4aa9b30a..4313c168 100644 --- a/client/private_msg.go +++ b/client/private_msg.go @@ -2,7 +2,6 @@ package client import ( "math/rand" - "sync/atomic" "time" "github.com/pkg/errors" @@ -47,7 +46,7 @@ func (c *QQClient) SendPrivateMessage(target int64, m *message.SendingMessage) * _, pkt := c.buildFriendSendingPacket(target, seq, mr, 1, 0, 0, t, m.Elements) _ = c.sendPacket(pkt) } - atomic.AddUint64(&c.stat.MessageSent, 1) + c.stat.MessageSent.Add(1) ret := &message.PrivateMessage{ Id: seq, InternalId: mr, @@ -86,7 +85,7 @@ func (c *QQClient) SendGroupTempMessage(groupCode, target int64, m *message.Send t := time.Now().Unix() _, pkt := c.buildGroupTempSendingPacket(group.Uin, target, seq, mr, t, m) _ = c.sendPacket(pkt) - atomic.AddUint64(&c.stat.MessageSent, 1) + c.stat.MessageSent.Add(1) return &message.TempMessage{ Id: seq, GroupCode: group.Code, @@ -107,7 +106,7 @@ func (c *QQClient) sendWPATempMessage(target int64, sig []byte, m *message.Sendi t := time.Now().Unix() _, pkt := c.buildWPATempSendingPacket(target, sig, seq, mr, t, m) _ = c.sendPacket(pkt) - atomic.AddUint64(&c.stat.MessageSent, 1) + c.stat.MessageSent.Add(1) return &message.TempMessage{ Id: seq, Self: c.Uin, diff --git a/client/statistics.go b/client/statistics.go index 178603da..7daf871c 100644 --- a/client/statistics.go +++ b/client/statistics.go @@ -1,43 +1,18 @@ package client import ( - "bytes" - "strconv" - "sync/atomic" + "go.uber.org/atomic" ) type Statistics struct { - PacketReceived uint64 - PacketSent uint64 - PacketLost uint64 - MessageReceived uint64 - MessageSent uint64 - LastMessageTime int64 - DisconnectTimes uint32 - LostTimes uint32 -} - -func (s *Statistics) MarshalJSON() ([]byte, error) { - var w bytes.Buffer - w.Grow(256) - w.WriteString(`{"packet_received":`) - w.WriteString(strconv.FormatUint(atomic.LoadUint64(&s.PacketReceived), 10)) - w.WriteString(`,"packet_sent":`) - w.WriteString(strconv.FormatUint(atomic.LoadUint64(&s.PacketSent), 10)) - w.WriteString(`,"packet_lost":`) - w.WriteString(strconv.FormatUint(atomic.LoadUint64(&s.PacketLost), 10)) - w.WriteString(`,"message_received":`) - w.WriteString(strconv.FormatUint(atomic.LoadUint64(&s.MessageReceived), 10)) - w.WriteString(`,"message_sent":`) - w.WriteString(strconv.FormatUint(atomic.LoadUint64(&s.MessageSent), 10)) - w.WriteString(`,"disconnect_times":`) - w.WriteString(strconv.FormatUint(uint64(atomic.LoadUint32(&s.DisconnectTimes)), 10)) - w.WriteString(`,"lost_times":`) - w.WriteString(strconv.FormatUint(uint64(atomic.LoadUint32(&s.LostTimes)), 10)) - w.WriteString(`,"last_message_time":`) - w.WriteString(strconv.FormatInt(atomic.LoadInt64(&s.LastMessageTime), 10)) - w.WriteByte('}') - return w.Bytes(), nil + PacketReceived atomic.Uint64 + PacketSent atomic.Uint64 + PacketLost atomic.Uint64 + MessageReceived atomic.Uint64 + MessageSent atomic.Uint64 + LastMessageTime atomic.Int64 + DisconnectTimes atomic.Uint32 + LostTimes atomic.Uint32 } func (c *QQClient) GetStatistics() *Statistics { diff --git a/client/sync.go b/client/sync.go index 1e073376..a54d26b1 100644 --- a/client/sync.go +++ b/client/sync.go @@ -3,7 +3,6 @@ package client import ( "math/rand" "sync" - "sync/atomic" "time" "github.com/pkg/errors" @@ -150,7 +149,7 @@ func (c *QQClient) buildGetOfflineMsgRequestPacket() (uint16, []byte) { C2CMsg: &jce.SvcReqGetMsgV2{ Uin: c.Uin, DateTime: func() int32 { - t := atomic.LoadInt64(&c.stat.LastMessageTime) + t := c.stat.LastMessageTime.Load() if t == 0 { return 1 } @@ -219,7 +218,7 @@ func (c *QQClient) buildSyncMsgRequestPacket() (uint16, []byte) { C2CMsg: &jce.SvcReqGetMsgV2{ Uin: c.Uin, DateTime: func() int32 { - t := atomic.LoadInt64(&c.stat.LastMessageTime) + t := c.stat.LastMessageTime.Load() if t == 0 { return 1 } diff --git a/go.mod b/go.mod index 459f2f15..67ce77f9 100644 --- a/go.mod +++ b/go.mod @@ -9,12 +9,13 @@ require ( github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.7.0 github.com/tidwall/gjson v1.11.0 + go.uber.org/atomic v1.9.0 golang.org/x/net v0.0.0-20211123203042-d83791d6bcd9 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c ) require ( - github.com/davecgh/go-spew v1.1.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.0 // indirect diff --git a/go.sum b/go.sum index 6ef9c72a..2beda80c 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,8 @@ github.com/RomiChan/protobuf v0.0.0-20211204042931-ff4f35848737 h1:p4o7/eSoP39jwnGZz08N1IpH/mNzg9SdCn7kPM9A9BE= github.com/RomiChan/protobuf v0.0.0-20211204042931-ff4f35848737/go.mod h1:CKKOWC7mBxd36zxsCB1V8DTrwlTNRQvkSVbYqyUiGEE= -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc= @@ -13,6 +14,7 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/tidwall/gjson v1.11.0 h1:C16pk7tQNiH6VlCrtIXL1w8GaOsi1X3W8KDkE1BuYd4= @@ -21,6 +23,8 @@ github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= +go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= golang.org/x/net v0.0.0-20211123203042-d83791d6bcd9 h1:0qxwC5n+ttVOINCBeRHO0nq9X7uy8SDsPoi5OaCdIEI= golang.org/x/net v0.0.0-20211123203042-d83791d6bcd9/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=