1
0
mirror of https://github.com/Mrs4s/MiraiGo.git synced 2025-05-04 11:07:40 +08:00

fix: use go.uber.org/atomic to force atomic

This commit is contained in:
wdvxdr 2021-12-17 21:01:10 +08:00
parent 10bac416ce
commit cf49727531
No known key found for this signature in database
GPG Key ID: 703F8C071DE7A1B6
8 changed files with 66 additions and 88 deletions

View File

@ -4,7 +4,6 @@ package client
import (
"fmt"
"sync/atomic"
"time"
"github.com/pkg/errors"
@ -104,9 +103,9 @@ func privateMessageDecoder(c *QQClient, pMsg *msg.Message, _ *incomingPacketInfo
case 11, 175: // friend msg
if pMsg.Head.GetFromUin() == c.Uin {
for {
frdSeq := atomic.LoadInt32(&c.friendSeq)
frdSeq := c.friendSeq.Load()
if frdSeq < pMsg.Head.GetMsgSeq() {
if atomic.CompareAndSwapInt32(&c.friendSeq, frdSeq, pMsg.Head.GetMsgSeq()) {
if c.friendSeq.CAS(frdSeq, pMsg.Head.GetMsgSeq()) {
break
}
} else {

View File

@ -7,10 +7,10 @@ import (
"net"
"sort"
"sync"
"sync/atomic"
"time"
"github.com/pkg/errors"
"go.uber.org/atomic"
"github.com/Mrs4s/MiraiGo/binary"
"github.com/Mrs4s/MiraiGo/binary/jce"
@ -34,18 +34,18 @@ type QQClient struct {
AllowSlider bool
// account info
Online atomic.Bool
Nickname string
Age uint16
Gender uint16
FriendList []*FriendInfo
GroupList []*GroupInfo
OnlineClients []*OtherClientInfo
Online bool
QiDian *QiDianAccountInfo
GuildService *GuildService
// protocol public field
SequenceId int32
SequenceId atomic.Int32
OutGoingPacketSessionId []byte
RandomKey []byte
TCP *utils.TCPListener
@ -81,7 +81,7 @@ type QQClient struct {
ksid []byte
// session info
qwebSeq int64
qwebSeq atomic.Int64
sigInfo *loginSigInfo
highwaySession *highway.Session
dpwd []byte
@ -99,11 +99,11 @@ type QQClient struct {
groupSysMsgCache *GroupSystemMessages
groupMsgBuilders sync.Map
onlinePushCache *utils.Cache
requestPacketRequestID int32
groupSeq int32
friendSeq int32
heartbeatEnabled bool
highwayApplyUpSeq int32
requestPacketRequestID atomic.Int32
groupSeq atomic.Int32
friendSeq atomic.Int32
highwayApplyUpSeq atomic.Int32
eventHandlers *eventHandlers
groupListLock sync.Mutex
@ -196,16 +196,11 @@ func NewClientMd5(uin int64, passwordMd5 [16]byte) *QQClient {
cli := &QQClient{
Uin: uin,
PasswordMd5: passwordMd5,
SequenceId: 0x3635,
AllowSlider: true,
RandomKey: make([]byte, 16),
OutGoingPacketSessionId: []byte{0x02, 0xB0, 0x5B, 0x8B},
TCP: &utils.TCPListener{},
sigInfo: &loginSigInfo{},
requestPacketRequestID: 1921334513,
groupSeq: int32(rand.Intn(20000)),
friendSeq: 22911,
highwayApplyUpSeq: 77918,
eventHandlers: &eventHandlers{},
msgSvcCache: utils.NewCache(time.Second * 15),
transCache: utils.NewCache(time.Second * 15),
@ -214,6 +209,13 @@ func NewClientMd5(uin int64, passwordMd5 [16]byte) *QQClient {
alive: true,
ecdh: crypto.NewEcdh(),
}
{ // init atomic values
cli.SequenceId.Store(0x3635)
cli.requestPacketRequestID.Store(1921334513)
cli.groupSeq.Store(int32(rand.Intn(20000)))
cli.friendSeq.Store(22911)
cli.highwayApplyUpSeq.Store(77918)
}
cli.GuildService = &GuildService{c: cli}
cli.ecdh.FetchPubKey(uin)
cli.UseDevice(SystemDeviceInfo)
@ -277,7 +279,7 @@ func (c *QQClient) UseDevice(info *DeviceInfo) {
}
func (c *QQClient) Release() {
if c.Online {
if c.Online.Load() {
c.Disconnect()
}
c.alive = false
@ -285,7 +287,7 @@ func (c *QQClient) Release() {
// Login send login request
func (c *QQClient) Login() (*LoginResponse, error) {
if c.Online {
if c.Online.Load() {
return nil, ErrAlreadyOnline
}
err := c.connect()
@ -305,7 +307,7 @@ func (c *QQClient) Login() (*LoginResponse, error) {
}
func (c *QQClient) TokenLogin(token []byte) error {
if c.Online {
if c.Online.Load() {
return ErrAlreadyOnline
}
err := c.connect()
@ -334,7 +336,7 @@ func (c *QQClient) TokenLogin(token []byte) error {
}
func (c *QQClient) FetchQRCode() (*QRCodeLoginResponse, error) {
if c.Online {
if c.Online.Load() {
return nil, ErrAlreadyOnline
}
err := c.connect()
@ -778,39 +780,39 @@ func (c *QQClient) SetCustomServer(servers []*net.TCPAddr) {
func (c *QQClient) registerClient() error {
_, err := c.sendAndWait(c.buildClientRegisterPacket())
if err == nil {
c.Online = true
c.Online.Store(true)
}
return err
}
func (c *QQClient) nextSeq() uint16 {
return uint16(atomic.AddInt32(&c.SequenceId, 1) & 0x7FFF)
return uint16(c.SequenceId.Add(1) & 0x7FFF)
}
func (c *QQClient) nextPacketSeq() int32 {
return atomic.AddInt32(&c.requestPacketRequestID, 2)
return c.requestPacketRequestID.Add(2)
}
func (c *QQClient) nextGroupSeq() int32 {
return atomic.AddInt32(&c.groupSeq, 2)
return c.groupSeq.Add(2)
}
func (c *QQClient) nextFriendSeq() int32 {
return atomic.AddInt32(&c.friendSeq, 1)
return c.friendSeq.Add(1)
}
func (c *QQClient) nextQWebSeq() int64 {
return atomic.AddInt64(&c.qwebSeq, 1)
return c.qwebSeq.Add(1)
}
func (c *QQClient) nextHighwayApplySeq() int32 {
return atomic.AddInt32(&c.highwayApplyUpSeq, 2)
return c.highwayApplyUpSeq.Add(2)
}
func (c *QQClient) doHeartbeat() {
c.heartbeatEnabled = true
times := 0
for c.Online {
for c.Online.Load() {
time.Sleep(time.Second * 30)
seq := c.nextSeq()
sso := packets.BuildSsoPacket(seq, c.version.AppId, c.version.SubAppId, "Heartbeat.Alive", c.deviceInfo.IMEI, []byte{}, c.OutGoingPacketSessionId, []byte{}, c.ksid)

View File

@ -4,7 +4,6 @@ import (
"net"
"runtime/debug"
"sync"
"sync/atomic"
"time"
"github.com/pkg/errors"
@ -33,7 +32,7 @@ type ConnectionQualityInfo struct {
}
func (c *QQClient) ConnectionQualityTest() *ConnectionQualityInfo {
if !c.Online {
if !c.Online.Load() {
return nil
}
r := &ConnectionQualityInfo{}
@ -102,19 +101,19 @@ func (c *QQClient) connect() error {
}
c.once.Do(func() {
c.OnGroupMessage(func(_ *QQClient, _ *message.GroupMessage) {
atomic.AddUint64(&c.stat.MessageReceived, 1)
atomic.StoreInt64(&c.stat.LastMessageTime, time.Now().Unix())
c.stat.MessageReceived.Add(1)
c.stat.LastMessageTime.Store(time.Now().Unix())
})
c.OnPrivateMessage(func(_ *QQClient, _ *message.PrivateMessage) {
atomic.AddUint64(&c.stat.MessageReceived, 1)
atomic.StoreInt64(&c.stat.LastMessageTime, time.Now().Unix())
c.stat.MessageReceived.Add(1)
c.stat.LastMessageTime.Store(time.Now().Unix())
})
c.OnTempMessage(func(_ *QQClient, _ *TempMessageEvent) {
atomic.AddUint64(&c.stat.MessageReceived, 1)
atomic.StoreInt64(&c.stat.LastMessageTime, time.Now().Unix())
c.stat.MessageReceived.Add(1)
c.stat.LastMessageTime.Store(time.Now().Unix())
})
c.onGroupMessageReceipt("internal", func(_ *QQClient, _ *groupMessageReceiptEvent) {
atomic.AddUint64(&c.stat.MessageSent, 1)
c.stat.MessageSent.Add(1)
})
go c.netLoop()
})
@ -142,7 +141,7 @@ func (c *QQClient) quickReconnect() {
// Disconnect 中断连接, 不释放资源
func (c *QQClient) Disconnect() {
c.Online = false
c.Online.Store(false)
c.TCP.Close()
}
@ -193,9 +192,9 @@ func (c *QQClient) sendAndWait(seq uint16, pkt []byte, params ...requestParams)
func (c *QQClient) sendPacket(pkt []byte) error {
err := c.TCP.Write(pkt)
if err != nil {
atomic.AddUint64(&c.stat.PacketLost, 1)
c.stat.PacketLost.Add(1)
} else {
atomic.AddUint64(&c.stat.PacketSent, 1)
c.stat.PacketSent.Add(1)
}
return errors.Wrap(err, "Packet failed to sendPacket")
}
@ -251,15 +250,15 @@ func (c *QQClient) sendAndWaitDynamic(seq uint16, pkt []byte) ([]byte, error) {
// plannedDisconnect 计划中断线事件
func (c *QQClient) plannedDisconnect(_ *utils.TCPListener) {
c.Debug("planned disconnect.")
atomic.AddUint32(&c.stat.DisconnectTimes, 1)
c.Online = false
c.stat.DisconnectTimes.Add(1)
c.Online.Store(false)
}
// unexpectedDisconnect 非预期断线事件
func (c *QQClient) unexpectedDisconnect(_ *utils.TCPListener, e error) {
c.Error("unexpected disconnect: %v", e)
atomic.AddUint32(&c.stat.DisconnectTimes, 1)
c.Online = false
c.stat.DisconnectTimes.Add(1)
c.Online.Store(false)
if err := c.connect(); err != nil {
c.Error("connect server error: %v", err)
c.dispatchDisconnectEvent(&ClientDisconnectedEvent{Message: "connection dropped by server."})
@ -317,7 +316,7 @@ func (c *QQClient) netLoop() {
}
errCount = 0
c.Debug("rev pkt: %v seq: %v", pkt.CommandName, pkt.SequenceId)
atomic.AddUint64(&c.stat.PacketReceived, 1)
c.stat.PacketReceived.Add(1)
go func(pkt *packets.IncomingPacket) {
defer func() {
if pan := recover(); pan != nil {

View File

@ -2,7 +2,6 @@ package client
import (
"math/rand"
"sync/atomic"
"time"
"github.com/pkg/errors"
@ -47,7 +46,7 @@ func (c *QQClient) SendPrivateMessage(target int64, m *message.SendingMessage) *
_, pkt := c.buildFriendSendingPacket(target, seq, mr, 1, 0, 0, t, m.Elements)
_ = c.sendPacket(pkt)
}
atomic.AddUint64(&c.stat.MessageSent, 1)
c.stat.MessageSent.Add(1)
ret := &message.PrivateMessage{
Id: seq,
InternalId: mr,
@ -86,7 +85,7 @@ func (c *QQClient) SendGroupTempMessage(groupCode, target int64, m *message.Send
t := time.Now().Unix()
_, pkt := c.buildGroupTempSendingPacket(group.Uin, target, seq, mr, t, m)
_ = c.sendPacket(pkt)
atomic.AddUint64(&c.stat.MessageSent, 1)
c.stat.MessageSent.Add(1)
return &message.TempMessage{
Id: seq,
GroupCode: group.Code,
@ -107,7 +106,7 @@ func (c *QQClient) sendWPATempMessage(target int64, sig []byte, m *message.Sendi
t := time.Now().Unix()
_, pkt := c.buildWPATempSendingPacket(target, sig, seq, mr, t, m)
_ = c.sendPacket(pkt)
atomic.AddUint64(&c.stat.MessageSent, 1)
c.stat.MessageSent.Add(1)
return &message.TempMessage{
Id: seq,
Self: c.Uin,

View File

@ -1,43 +1,18 @@
package client
import (
"bytes"
"strconv"
"sync/atomic"
"go.uber.org/atomic"
)
type Statistics struct {
PacketReceived uint64
PacketSent uint64
PacketLost uint64
MessageReceived uint64
MessageSent uint64
LastMessageTime int64
DisconnectTimes uint32
LostTimes uint32
}
func (s *Statistics) MarshalJSON() ([]byte, error) {
var w bytes.Buffer
w.Grow(256)
w.WriteString(`{"packet_received":`)
w.WriteString(strconv.FormatUint(atomic.LoadUint64(&s.PacketReceived), 10))
w.WriteString(`,"packet_sent":`)
w.WriteString(strconv.FormatUint(atomic.LoadUint64(&s.PacketSent), 10))
w.WriteString(`,"packet_lost":`)
w.WriteString(strconv.FormatUint(atomic.LoadUint64(&s.PacketLost), 10))
w.WriteString(`,"message_received":`)
w.WriteString(strconv.FormatUint(atomic.LoadUint64(&s.MessageReceived), 10))
w.WriteString(`,"message_sent":`)
w.WriteString(strconv.FormatUint(atomic.LoadUint64(&s.MessageSent), 10))
w.WriteString(`,"disconnect_times":`)
w.WriteString(strconv.FormatUint(uint64(atomic.LoadUint32(&s.DisconnectTimes)), 10))
w.WriteString(`,"lost_times":`)
w.WriteString(strconv.FormatUint(uint64(atomic.LoadUint32(&s.LostTimes)), 10))
w.WriteString(`,"last_message_time":`)
w.WriteString(strconv.FormatInt(atomic.LoadInt64(&s.LastMessageTime), 10))
w.WriteByte('}')
return w.Bytes(), nil
PacketReceived atomic.Uint64
PacketSent atomic.Uint64
PacketLost atomic.Uint64
MessageReceived atomic.Uint64
MessageSent atomic.Uint64
LastMessageTime atomic.Int64
DisconnectTimes atomic.Uint32
LostTimes atomic.Uint32
}
func (c *QQClient) GetStatistics() *Statistics {

View File

@ -3,7 +3,6 @@ package client
import (
"math/rand"
"sync"
"sync/atomic"
"time"
"github.com/pkg/errors"
@ -150,7 +149,7 @@ func (c *QQClient) buildGetOfflineMsgRequestPacket() (uint16, []byte) {
C2CMsg: &jce.SvcReqGetMsgV2{
Uin: c.Uin,
DateTime: func() int32 {
t := atomic.LoadInt64(&c.stat.LastMessageTime)
t := c.stat.LastMessageTime.Load()
if t == 0 {
return 1
}
@ -219,7 +218,7 @@ func (c *QQClient) buildSyncMsgRequestPacket() (uint16, []byte) {
C2CMsg: &jce.SvcReqGetMsgV2{
Uin: c.Uin,
DateTime: func() int32 {
t := atomic.LoadInt64(&c.stat.LastMessageTime)
t := c.stat.LastMessageTime.Load()
if t == 0 {
return 1
}

3
go.mod
View File

@ -9,12 +9,13 @@ require (
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.7.0
github.com/tidwall/gjson v1.11.0
go.uber.org/atomic v1.9.0
golang.org/x/net v0.0.0-20211123203042-d83791d6bcd9
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
)
require (
github.com/davecgh/go-spew v1.1.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect

6
go.sum
View File

@ -1,7 +1,8 @@
github.com/RomiChan/protobuf v0.0.0-20211204042931-ff4f35848737 h1:p4o7/eSoP39jwnGZz08N1IpH/mNzg9SdCn7kPM9A9BE=
github.com/RomiChan/protobuf v0.0.0-20211204042931-ff4f35848737/go.mod h1:CKKOWC7mBxd36zxsCB1V8DTrwlTNRQvkSVbYqyUiGEE=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
@ -13,6 +14,7 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/tidwall/gjson v1.11.0 h1:C16pk7tQNiH6VlCrtIXL1w8GaOsi1X3W8KDkE1BuYd4=
@ -21,6 +23,8 @@ github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
golang.org/x/net v0.0.0-20211123203042-d83791d6bcd9 h1:0qxwC5n+ttVOINCBeRHO0nq9X7uy8SDsPoi5OaCdIEI=
golang.org/x/net v0.0.0-20211123203042-d83791d6bcd9/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=