From ffb9cc5bffdfdadf3f4493aa7b6bb61b3e44c29d Mon Sep 17 00:00:00 2001 From: Mrs4s Date: Thu, 4 Mar 2021 16:05:18 +0800 Subject: [PATCH] optimize login speed. --- client/c2c_processor.go | 32 +++++++++--------- client/client.go | 39 +++++++++++++++++----- client/decoders.go | 4 +-- client/global.go | 14 ++++++++ client/handler_map_gen.go | 68 +++++++++------------------------------ client/sync.go | 5 ++- go.mod | 1 + go.sum | 4 +++ 8 files changed, 86 insertions(+), 81 deletions(-) diff --git a/client/c2c_processor.go b/client/c2c_processor.go index 1e18a1b0..c6e77ee2 100644 --- a/client/c2c_processor.go +++ b/client/c2c_processor.go @@ -13,7 +13,7 @@ import ( "google.golang.org/protobuf/proto" ) -var c2cDecoders = map[int32]func(*QQClient, *msg.Message, *c2cExtraOption){ +var c2cDecoders = map[int32]func(*QQClient, *msg.Message, *incomingPacketInfo){ 33: troopAddMemberBroadcastDecoder, 35: troopSystemMessageDecoder, 36: troopSystemMessageDecoder, 37: troopSystemMessageDecoder, 45: troopSystemMessageDecoder, 46: troopSystemMessageDecoder, 84: troopSystemMessageDecoder, @@ -25,11 +25,7 @@ var c2cDecoders = map[int32]func(*QQClient, *msg.Message, *c2cExtraOption){ 529: msgType0x211Decoder, } -type c2cExtraOption struct { - UsedRegProxy bool -} - -func (c *QQClient) c2cMessageSyncProcessor(rsp *msg.GetMessageResponse, opt *c2cExtraOption) { +func (c *QQClient) c2cMessageSyncProcessor(rsp *msg.GetMessageResponse, info *incomingPacketInfo) { c.syncCookie = rsp.SyncCookie c.pubAccountCookie = rsp.PubAccountCookie c.msgCtrlBuf = rsp.MsgCtrlBuf @@ -59,8 +55,11 @@ func (c *QQClient) c2cMessageSyncProcessor(rsp *msg.GetMessageResponse, opt *c2c continue } c.msgSvcCache.Add(strKey, "", time.Minute*5) + if info.Params.bool("init") { + continue + } if decoder, ok := c2cDecoders[pMsg.Head.GetMsgType()]; ok { - decoder(c, pMsg, opt) + decoder(c, pMsg, info) } /* switch pMsg.Head.GetMsgType() { @@ -150,11 +149,12 @@ func (c *QQClient) c2cMessageSyncProcessor(rsp *msg.GetMessageResponse, opt *c2c _, _ = c.sendAndWait(c.buildDeleteMessageRequestPacket(delItems)) if rsp.GetSyncFlag() != msg.SyncFlag_STOP { c.Debug("continue sync with flag: %v", rsp.SyncFlag.String()) - _, _ = c.sendAndWait(c.buildGetMessageRequestPacket(rsp.GetSyncFlag(), time.Now().Unix())) + seq, pkt := c.buildGetMessageRequestPacket(rsp.GetSyncFlag(), time.Now().Unix()) + _, _ = c.sendAndWait(seq, pkt, info.Params) } } -func privateMessageDecoder(c *QQClient, pMsg *msg.Message, _ *c2cExtraOption) { +func privateMessageDecoder(c *QQClient, pMsg *msg.Message, _ *incomingPacketInfo) { if pMsg.Head.GetFromUin() == c.Uin { for { frdSeq := atomic.LoadInt32(&c.friendSeq) @@ -173,7 +173,7 @@ func privateMessageDecoder(c *QQClient, pMsg *msg.Message, _ *c2cExtraOption) { c.dispatchFriendMessage(c.parsePrivateMessage(pMsg)) } -func privatePttDecoder(c *QQClient, pMsg *msg.Message, _ *c2cExtraOption) { +func privatePttDecoder(c *QQClient, pMsg *msg.Message, _ *incomingPacketInfo) { if pMsg.Body == nil || pMsg.Body.RichText == nil || pMsg.Body.RichText.Ptt == nil { return } @@ -184,7 +184,7 @@ func privatePttDecoder(c *QQClient, pMsg *msg.Message, _ *c2cExtraOption) { c.dispatchFriendMessage(c.parsePrivateMessage(pMsg)) } -func tempSessionDecoder(c *QQClient, pMsg *msg.Message, _ *c2cExtraOption) { +func tempSessionDecoder(c *QQClient, pMsg *msg.Message, _ *incomingPacketInfo) { if pMsg.Head.C2CTmpMsgHead == nil || pMsg.Body == nil { return } @@ -200,7 +200,7 @@ func tempSessionDecoder(c *QQClient, pMsg *msg.Message, _ *c2cExtraOption) { } } -func troopAddMemberBroadcastDecoder(c *QQClient, pMsg *msg.Message, _ *c2cExtraOption) { +func troopAddMemberBroadcastDecoder(c *QQClient, pMsg *msg.Message, _ *incomingPacketInfo) { groupJoinLock.Lock() defer groupJoinLock.Unlock() group := c.FindGroupByUin(pMsg.Head.GetFromUin()) @@ -226,13 +226,13 @@ func troopAddMemberBroadcastDecoder(c *QQClient, pMsg *msg.Message, _ *c2cExtraO } } -func systemMessageDecoder(c *QQClient, pMsg *msg.Message, _ *c2cExtraOption) { +func systemMessageDecoder(c *QQClient, pMsg *msg.Message, _ *incomingPacketInfo) { _, pkt := c.buildSystemMsgNewFriendPacket() _ = c.send(pkt) } -func troopSystemMessageDecoder(c *QQClient, pMsg *msg.Message, opt *c2cExtraOption) { - if !opt.UsedRegProxy && pMsg.Head.GetMsgType() != 85 && pMsg.Head.GetMsgType() != 36 { +func troopSystemMessageDecoder(c *QQClient, pMsg *msg.Message, info *incomingPacketInfo) { + if !info.Params.bool("used_reg_proxy") && pMsg.Head.GetMsgType() != 85 && pMsg.Head.GetMsgType() != 36 { c.exceptAndDispatchGroupSysMsg() } if len(pMsg.Body.GetMsgContent()) == 0 { @@ -246,7 +246,7 @@ func troopSystemMessageDecoder(c *QQClient, pMsg *msg.Message, opt *c2cExtraOpti } } -func msgType0x211Decoder(c *QQClient, pMsg *msg.Message, _ *c2cExtraOption) { +func msgType0x211Decoder(c *QQClient, pMsg *msg.Message, _ *incomingPacketInfo) { sub4 := msg.SubMsgType0X4Body{} if err := proto.Unmarshal(pMsg.Body.MsgContent, &sub4); err != nil { err = errors.Wrap(err, "unmarshal sub msg 0x4 error") diff --git a/client/client.go b/client/client.go index 18931b43..e4d66366 100644 --- a/client/client.go +++ b/client/client.go @@ -28,7 +28,7 @@ import ( var json = jsoniter.ConfigFastest -//go:generate go run github.com/a8m/syncmap -o "handler_map_gen.go" -pkg client -name HandlerMap "map[uint16]func(i interface{}, err error)" +//go:generate go run github.com/a8m/syncmap -o "handler_map_gen.go" -pkg client -name HandlerMap "map[uint16]*handlerInfo" type QQClient struct { Uin int64 @@ -121,6 +121,11 @@ type loginSigInfo struct { pt4TokenMap map[string][]byte } +type handlerInfo struct { + fun func(i interface{}, err error) + params requestParams +} + var decoders = map[string]func(*QQClient, *incomingPacketInfo, []byte) (interface{}, error){ "wtlogin.login": decodeLoginResponse, "wtlogin.exchange_emp": decodeExchangeEmpResponse, @@ -313,7 +318,10 @@ func (c *QQClient) init() { go c.doHeartbeat() } _ = c.RefreshStatus() - _, _ = c.sendAndWait(c.buildGetMessageRequestPacket(msg.SyncFlag_START, time.Now().Unix())) + go func() { + seq, pkt := c.buildGetMessageRequestPacket(msg.SyncFlag_START, time.Now().Unix()) + _, _ = c.sendAndWait(seq, pkt, requestParams{"used_reg_proxy": true, "init": true}) + }() _, _ = c.SyncSessions() c.stat.once.Do(func() { c.OnGroupMessage(func(_ *QQClient, _ *message.GroupMessage) { @@ -796,7 +804,7 @@ func (c *QQClient) send(pkt []byte) error { return errors.Wrap(err, "Packet failed to send") } -func (c *QQClient) sendAndWait(seq uint16, pkt []byte) (interface{}, error) { +func (c *QQClient) sendAndWait(seq uint16, pkt []byte, params ...requestParams) (interface{}, error) { type T struct { Response interface{} Error error @@ -809,12 +817,20 @@ func (c *QQClient) sendAndWait(seq uint16, pkt []byte) (interface{}, error) { ch := make(chan T) defer close(ch) - c.handlers.Store(seq, func(i interface{}, err error) { + + p := func() requestParams { + if len(params) == 0 { + return nil + } + return params[0] + }() + + c.handlers.Store(seq, &handlerInfo{fun: func(i interface{}, err error) { ch <- T{ Response: i, Error: err, } - }) + }, params: p}) retry := 0 for true { @@ -916,21 +932,28 @@ func (c *QQClient) netLoop() { if decoder, ok := decoders[pkt.CommandName]; ok { // found predefined decoder + info, ok := c.handlers.LoadAndDelete(pkt.SequenceId) rsp, err := decoder(c, &incomingPacketInfo{ SequenceId: pkt.SequenceId, CommandName: pkt.CommandName, + Params: func() requestParams { + if !ok { + return nil + } + return info.params + }(), }, payload) if err != nil { c.Debug("decode pkt %v error: %+v", pkt.CommandName, err) } - if f, ok := c.handlers.LoadAndDelete(pkt.SequenceId); ok { - f(rsp, err) + if ok { + info.fun(rsp, err) } else if f, ok := c.waiters.Load(pkt.CommandName); ok { // 在不存在handler的情况下触发wait f.(func(interface{}, error))(rsp, err) } } else if f, ok := c.handlers.LoadAndDelete(pkt.SequenceId); ok { // does not need decoder - f(nil, nil) + f.fun(nil, nil) } else { c.Debug("Unhandled Command: %s\nSeq: %d\nThis message can be ignored.", pkt.CommandName, pkt.SequenceId) } diff --git a/client/decoders.go b/client/decoders.go index 11e126c0..caaa341b 100644 --- a/client/decoders.go +++ b/client/decoders.go @@ -288,13 +288,13 @@ func decodePushReqPacket(c *QQClient, _ *incomingPacketInfo, payload []byte) (in } // MessageSvc.PbGetMsg -func decodeMessageSvcPacket(c *QQClient, _ *incomingPacketInfo, payload []byte) (interface{}, error) { +func decodeMessageSvcPacket(c *QQClient, info *incomingPacketInfo, payload []byte) (interface{}, error) { rsp := msg.GetMessageResponse{} err := proto.Unmarshal(payload, &rsp) if err != nil { return nil, errors.Wrap(err, "failed to unmarshal protobuf message") } - c.c2cMessageSyncProcessor(&rsp, &c2cExtraOption{UsedRegProxy: false}) + c.c2cMessageSyncProcessor(&rsp, info) return nil, nil } diff --git a/client/global.go b/client/global.go index 4f462e3c..89e66ef4 100644 --- a/client/global.go +++ b/client/global.go @@ -117,7 +117,10 @@ type ( incomingPacketInfo struct { CommandName string SequenceId uint16 + Params requestParams } + + requestParams map[string]interface{} ) // default @@ -584,6 +587,17 @@ func genLongTemplate(resId, brief string, ts int64) *message.ServiceElement { } } +func (p requestParams) bool(k string) bool { + if p == nil { + return false + } + i, ok := p[k] + if !ok { + return false + } + return i.(bool) +} + func (c *QQClient) packOIDBPackage(cmd, serviceType int32, body []byte) []byte { pkg := &oidb.OIDBSSOPkg{ Command: cmd, diff --git a/client/handler_map_gen.go b/client/handler_map_gen.go index 2d9f5b8c..d88a52fb 100644 --- a/client/handler_map_gen.go +++ b/client/handler_map_gen.go @@ -70,9 +70,7 @@ type readOnlyHandlerMap struct { // expunged is an arbitrary pointer that marks entries which have been deleted // from the dirty map. -var expungedHandlerMap = unsafe.Pointer(new(func(i interface{}, err error, - -))) +var expungedHandlerMap = unsafe.Pointer(new(*handlerInfo)) // An entry is a slot in the map corresponding to a particular key. type entryHandlerMap struct { @@ -97,18 +95,14 @@ type entryHandlerMap struct { p unsafe.Pointer // *interface{} } -func newEntryHandlerMap(i func(i interface{}, err error, - -)) *entryHandlerMap { +func newEntryHandlerMap(i *handlerInfo) *entryHandlerMap { return &entryHandlerMap{p: unsafe.Pointer(&i)} } // Load returns the value stored in the map for a key, or nil if no // value is present. // The ok result indicates whether value was found in the map. -func (m *HandlerMap) Load(key uint16) (value func(i interface{}, err error, - -), ok bool) { +func (m *HandlerMap) Load(key uint16) (value *handlerInfo, ok bool) { read, _ := m.read.Load().(readOnlyHandlerMap) e, ok := read.m[key] if !ok && read.amended { @@ -133,22 +127,16 @@ func (m *HandlerMap) Load(key uint16) (value func(i interface{}, err error, return e.load() } -func (e *entryHandlerMap) load() (value func(i interface{}, err error, - -), ok bool) { +func (e *entryHandlerMap) load() (value *handlerInfo, ok bool) { p := atomic.LoadPointer(&e.p) if p == nil || p == expungedHandlerMap { return value, false } - return *(*func(i interface{}, err error, - - ))(p), true + return *(**handlerInfo)(p), true } // Store sets the value for a key. -func (m *HandlerMap) Store(key uint16, value func(i interface{}, err error, - -)) { +func (m *HandlerMap) Store(key uint16, value *handlerInfo) { read, _ := m.read.Load().(readOnlyHandlerMap) if e, ok := read.m[key]; ok && e.tryStore(&value) { return @@ -181,9 +169,7 @@ func (m *HandlerMap) Store(key uint16, value func(i interface{}, err error, // // If the entry is expunged, tryStore returns false and leaves the entry // unchanged. -func (e *entryHandlerMap) tryStore(i *func(i interface{}, err error, - -)) bool { +func (e *entryHandlerMap) tryStore(i **handlerInfo) bool { for { p := atomic.LoadPointer(&e.p) if p == expungedHandlerMap { @@ -206,20 +192,14 @@ func (e *entryHandlerMap) unexpungeLocked() (wasExpunged bool) { // storeLocked unconditionally stores a value to the entry. // // The entry must be known not to be expunged. -func (e *entryHandlerMap) storeLocked(i *func(i interface{}, err error, - -)) { +func (e *entryHandlerMap) storeLocked(i **handlerInfo) { atomic.StorePointer(&e.p, unsafe.Pointer(i)) } // LoadOrStore returns the existing value for the key if present. // Otherwise, it stores and returns the given value. // The loaded result is true if the value was loaded, false if stored. -func (m *HandlerMap) LoadOrStore(key uint16, value func(i interface{}, err error, - -)) (actual func(i interface{}, err error, - -), loaded bool) { +func (m *HandlerMap) LoadOrStore(key uint16, value *handlerInfo) (actual *handlerInfo, loaded bool) { // Avoid locking if it's a clean hit. read, _ := m.read.Load().(readOnlyHandlerMap) if e, ok := read.m[key]; ok { @@ -259,19 +239,13 @@ func (m *HandlerMap) LoadOrStore(key uint16, value func(i interface{}, err error // // If the entry is expunged, tryLoadOrStore leaves the entry unchanged and // returns with ok==false. -func (e *entryHandlerMap) tryLoadOrStore(i func(i interface{}, err error, - -)) (actual func(i interface{}, err error, - -), loaded, ok bool) { +func (e *entryHandlerMap) tryLoadOrStore(i *handlerInfo) (actual *handlerInfo, loaded, ok bool) { p := atomic.LoadPointer(&e.p) if p == expungedHandlerMap { return actual, false, false } if p != nil { - return *(*func(i interface{}, err error, - - ))(p), true, true + return *(**handlerInfo)(p), true, true } // Copy the interface after the first load to make this method more amenable @@ -287,18 +261,14 @@ func (e *entryHandlerMap) tryLoadOrStore(i func(i interface{}, err error, return actual, false, false } if p != nil { - return *(*func(i interface{}, err error, - - ))(p), true, true + return *(**handlerInfo)(p), true, true } } } // LoadAndDelete deletes the value for a key, returning the previous value if any. // The loaded result reports whether the key was present. -func (m *HandlerMap) LoadAndDelete(key uint16) (value func(i interface{}, err error, - -), loaded bool) { +func (m *HandlerMap) LoadAndDelete(key uint16) (value *handlerInfo, loaded bool) { read, _ := m.read.Load().(readOnlyHandlerMap) e, ok := read.m[key] if !ok && read.amended { @@ -326,18 +296,14 @@ func (m *HandlerMap) Delete(key uint16) { m.LoadAndDelete(key) } -func (e *entryHandlerMap) delete() (value func(i interface{}, err error, - -), ok bool) { +func (e *entryHandlerMap) delete() (value *handlerInfo, ok bool) { for { p := atomic.LoadPointer(&e.p) if p == nil || p == expungedHandlerMap { return value, false } if atomic.CompareAndSwapPointer(&e.p, p, nil) { - return *(*func(i interface{}, err error, - - ))(p), true + return *(**handlerInfo)(p), true } } } @@ -352,9 +318,7 @@ func (e *entryHandlerMap) delete() (value func(i interface{}, err error, // // Range may be O(N) with the number of elements in the map even if f returns // false after a constant number of calls. -func (m *HandlerMap) Range(f func(key uint16, value func(i interface{}, err error, - -)) bool) { +func (m *HandlerMap) Range(f func(key uint16, value *handlerInfo) bool) { // We need to be able to iterate over all of the keys that were already // present at the start of the call to Range. // If read.amended is false, then read.m satisfies that property without diff --git a/client/sync.go b/client/sync.go index c60be7d2..ffa3833f 100644 --- a/client/sync.go +++ b/client/sync.go @@ -29,7 +29,6 @@ func init() { } type ( - // SessionSyncResponse 会话同步结果 SessionSyncResponse struct { GroupSessions []*GroupSessionInfo @@ -342,7 +341,7 @@ func decodePushParamPacket(c *QQClient, _ *incomingPacketInfo, payload []byte) ( } // RegPrxySvc.PbSyncMsg -func decodeMsgSyncResponse(c *QQClient, _ *incomingPacketInfo, payload []byte) (interface{}, error) { +func decodeMsgSyncResponse(c *QQClient, info *incomingPacketInfo, payload []byte) (interface{}, error) { rsp := &msf.SvcRegisterProxyMsgResp{} if err := proto.Unmarshal(payload, rsp); err != nil { return nil, err @@ -376,7 +375,7 @@ func decodeMsgSyncResponse(c *QQClient, _ *incomingPacketInfo, payload []byte) ( if len(rsp.C2CMsg) > 4 { c2cRsp := &msg.GetMessageResponse{} if proto.Unmarshal(rsp.C2CMsg[4:], c2cRsp) == nil { - c.c2cMessageSyncProcessor(c2cRsp, &c2cExtraOption{UsedRegProxy: true}) + c.c2cMessageSyncProcessor(c2cRsp, info) } } return ret, nil diff --git a/go.mod b/go.mod index 41ee143e..595b4c09 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/Mrs4s/MiraiGo go 1.15 require ( + github.com/a8m/syncmap v0.0.0-20200818084611-4bbbd178de97 // indirect github.com/golang/protobuf v1.4.3 github.com/json-iterator/go v1.1.10 github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 diff --git a/go.sum b/go.sum index 0f710ceb..99881c34 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/a8m/syncmap v0.0.0-20200818084611-4bbbd178de97 h1:QJIAdw5m5tNUy7fjBxgg73+YUs/AkeESeqdJ1L3lN10= +github.com/a8m/syncmap v0.0.0-20200818084611-4bbbd178de97/go.mod h1:f3iF7/3t9i9hsYF8DPgT0XeIVyNzevhMCKf2445Q6pE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -59,6 +61,8 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190501045030-23463209683d/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135 h1:5Beo0mZN8dRzgrMMkDp0jc8YXQKx9DiJ2k1dkvGsn5A= golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=