From 9bd6b38f909d5c2c0a87a6b9e9ac02f19db15144 Mon Sep 17 00:00:00 2001 From: wdvxdr Date: Sat, 25 Dec 2021 21:05:11 +0800 Subject: [PATCH] refactor: handle params in request --- binary/reader.go | 4 - client/builders.go | 5 +- client/c2c_processor.go | 5 +- client/client.go | 25 +- client/decoders.go | 8 +- client/group_msg.go | 23 +- client/guild.go | 6 +- client/guild_msg.go | 4 +- client/handler_map_gen.go | 387 --------------------------- client/internal/network/request.go | 2 + client/internal/network/response.go | 2 +- client/internal/network/rpc.go | 7 +- client/internal/network/transport.go | 9 +- client/network.go | 97 +------ client/packet.go | 20 +- message/elements.go | 6 +- message/message.go | 2 +- 17 files changed, 51 insertions(+), 561 deletions(-) delete mode 100644 client/handler_map_gen.go diff --git a/binary/reader.go b/binary/reader.go index f11c608f..62ea76fc 100644 --- a/binary/reader.go +++ b/binary/reader.go @@ -120,10 +120,6 @@ func (r *Reader) Len() int { return r.buf.Len() } -func (r *Reader) Index() int64 { - return r.buf.Size() -} - func (tlv TlvMap) Exists(key uint16) bool { _, ok := tlv[key] return ok diff --git a/client/builders.go b/client/builders.go index bd35c216..0c108da1 100644 --- a/client/builders.go +++ b/client/builders.go @@ -815,7 +815,7 @@ func (c *QQClient) buildGroupMemberInfoRequest(groupCode, uin int64) *network.Re } // MessageSvc.PbGetMsg -func (c *QQClient) buildGetMessageRequestPacket(flag msg.SyncFlag, msgTime int64) (uint16, []byte) { +func (c *QQClient) buildGetMessageRequest(flag msg.SyncFlag, msgTime int64) *network.Request { cook := c.sig.SyncCookie if cook == nil { cook, _ = proto.Marshal(&msg.SyncCookie{ @@ -840,8 +840,7 @@ func (c *QQClient) buildGetMessageRequestPacket(flag msg.SyncFlag, msgTime int64 ServerBuf: EmptyBytes, } payload, _ := proto.Marshal(req) - req2 := c.uniRequest("MessageSvc.PbGetMsg", payload) - return uint16(req2.SequenceID), c.transport.PackPacket(req2) + return c.uniRequest("MessageSvc.PbGetMsg", payload) } // MessageSvc.PbDeleteMsg diff --git a/client/c2c_processor.go b/client/c2c_processor.go index 3c9e85f6..4cf122ea 100644 --- a/client/c2c_processor.go +++ b/client/c2c_processor.go @@ -72,8 +72,9 @@ func (c *QQClient) c2cMessageSyncProcessor(rsp *msg.GetMessageResponse, resp *ne } if rsp.GetSyncFlag() != msg.SyncFlag_STOP { c.Debug("continue sync with flag: %v", rsp.SyncFlag) - seq, pkt := c.buildGetMessageRequestPacket(rsp.GetSyncFlag(), time.Now().Unix()) - _, _ = c.sendAndWaitParams(seq, pkt, resp.Params) + req := c.buildGetMessageRequest(rsp.GetSyncFlag(), time.Now().Unix()) + req.Params = resp.Params + _, _ = c.callAndDecode(req, decodeMessageSvcPacket) } } diff --git a/client/client.go b/client/client.go index aa8c49bf..6ce36024 100644 --- a/client/client.go +++ b/client/client.go @@ -23,8 +23,6 @@ import ( "github.com/Mrs4s/MiraiGo/utils" ) -//go:generate go run github.com/a8m/syncmap -o "handler_map_gen.go" -pkg client -name HandlerMap "map[uint16]*handlerInfo" - type QQClient struct { Uin int64 PasswordMd5 [16]byte @@ -60,7 +58,6 @@ type QQClient struct { oicq *oicq.Codec // internal state - handlers HandlerMap waiters sync.Map servers []*net.TCPAddr currServerIndex int @@ -106,19 +103,6 @@ type QiDianAccountInfo struct { bigDataReqSession *bigDataSessionInfo } -type handlerInfo struct { - fun func(i interface{}, err error) - dynamic bool - params network.RequestParams -} - -func (h *handlerInfo) getParams() network.RequestParams { - if h == nil { - return nil - } - return h.params -} - var decoders = map[string]func(*QQClient, *network.Response) (interface{}, error){ "StatSvc.ReqMSFOffline": decodeMSFOfflinePacket, "MessageSvc.PushNotify": decodeSvcNotify, @@ -126,7 +110,6 @@ var decoders = map[string]func(*QQClient, *network.Response) (interface{}, error "OnlinePush.PbPushTransMsg": decodeOnlinePushTransPacket, "OnlinePush.SidTicketExpired": decodeSidExpiredPacket, "ConfigPushSvc.PushReq": decodePushReqPacket, - "MessageSvc.PbGetMsg": decodeMessageSvcPacket, "MessageSvc.PushForceOffline": decodeForceOfflinePacket, } @@ -426,8 +409,9 @@ func (c *QQClient) init(tokenLogin bool) error { _, _ = c.callAndDecode(c.buildLoginExtraPacket(), decodeLoginExtraResponse) // 小登录 _, _ = c.callAndDecode(c.buildConnKeyRequestPacket(), decodeConnKeyResponse) // big data key 如果等待 config push 的话时间来不及 } - seq, pkt := c.buildGetMessageRequestPacket(msg.SyncFlag_START, time.Now().Unix()) - _, _ = c.sendAndWaitParams(seq, pkt, network.RequestParams{"used_reg_proxy": true, "init": true}) + req := c.buildGetMessageRequest(msg.SyncFlag_START, time.Now().Unix()) + req.Params = network.Params{"used_reg_proxy": true, "init": true} + _, _ = c.callAndDecode(req, decodeMessageSvcPacket) c.syncChannelFirstView() return nil } @@ -794,8 +778,7 @@ func (c *QQClient) doHeartbeat() { CommandName: "Heartbeat.Alive", Body: EmptyBytes, } - packet := c.transport.PackPacket(&req) - _, err := c.sendAndWait(seq, packet) + _, err := c.call(&req) if errors.Is(err, network.ErrConnectionClosed) { continue } diff --git a/client/decoders.go b/client/decoders.go index 10813e0a..4d210b89 100644 --- a/client/decoders.go +++ b/client/decoders.go @@ -387,7 +387,7 @@ func decodeSvcNotify(c *QQClient, resp *network.Response) (interface{}, error) { data := &jce.RequestDataVersion2{} data.ReadFrom(jce.NewJceReader(request.SBuffer)) if len(data.Map) == 0 { - _, err := c.sendAndWait(c.buildGetMessageRequestPacket(msg.SyncFlag_START, time.Now().Unix())) + _, err := c.callAndDecode(c.buildGetMessageRequest(msg.SyncFlag_START, time.Now().Unix()), decodeMessageSvcPacket) return nil, err } notify := &jce.RequestPushNotify{} @@ -403,7 +403,7 @@ func decodeSvcNotify(c *QQClient, resp *network.Response) (interface{}, error) { return nil, err } } - _, err := c.sendAndWait(c.buildGetMessageRequestPacket(msg.SyncFlag_START, time.Now().Unix())) + _, err := c.callAndDecode(c.buildGetMessageRequest(msg.SyncFlag_START, time.Now().Unix()), decodeMessageSvcPacket) return nil, err } @@ -824,7 +824,3 @@ func decodeAppInfoResponse(_ *QQClient, _ *incomingPacketInfo) (interface{}, err return rsp.AppInfo, nil } */ - -func ignoreDecoder(_ *QQClient, _ *network.Response) (interface{}, error) { - return nil, nil -} diff --git a/client/group_msg.go b/client/group_msg.go index 846c27bd..9d24c420 100644 --- a/client/group_msg.go +++ b/client/group_msg.go @@ -26,8 +26,6 @@ import ( func init() { decoders["OnlinePush.PbPushGroupMsg"] = decodeGroupMessagePacket - decoders["MessageSvc.PbSendMsg"] = decodeMsgSendResponse - decoders["MessageSvc.PbGetGroupMsg"] = decodeGetGroupMsgResponse } // SendGroupMessage 发送群消息 @@ -78,7 +76,8 @@ func (c *QQClient) SendGroupForwardMessage(groupCode int64, m *message.ForwardEl // GetGroupMessages 从服务器获取历史信息 func (c *QQClient) GetGroupMessages(groupCode, beginSeq, endSeq int64) ([]*message.GroupMessage, error) { req := c.buildGetGroupMsgRequest(groupCode, beginSeq, endSeq) - i, err := c.sendAndWaitParams(uint16(req.SequenceID), c.transport.PackPacket(req), network.RequestParams{"raw": false}) + req.Params = network.Params{"raw": false} + i, err := c.callAndDecode(req, decodeGetGroupMsgResponse) if err != nil { return nil, err } @@ -335,21 +334,6 @@ func decodeGroupMessagePacket(c *QQClient, resp *network.Response) (interface{}, return nil, nil } -func decodeMsgSendResponse(c *QQClient, resp *network.Response) (interface{}, error) { - rsp := msg.SendMessageResponse{} - if err := proto.Unmarshal(resp.Body, &rsp); err != nil { - return nil, errors.Wrap(err, "failed to unmarshal protobuf message") - } - switch rsp.GetResult() { - case 0: // OK. - case 55: - c.Error("sendPacket msg error: %v Bot has blocked target's content", rsp.GetResult()) - default: - c.Error("sendPacket msg error: %v %v", rsp.GetResult(), rsp.GetErrMsg()) - } - return nil, nil -} - func decodeGetGroupMsgResponse(c *QQClient, resp *network.Response) (interface{}, error) { rsp := msg.GetGroupMsgResp{} if err := proto.Unmarshal(resp.Body, &rsp); err != nil { @@ -372,7 +356,8 @@ func decodeGetGroupMsgResponse(c *QQClient, resp *network.Response) (interface{} for { end := int32(math.Min(float64(i+19), float64(m.Head.GetMsgSeq()+m.Content.GetPkgNum()))) req := c.buildGetGroupMsgRequest(m.Head.GroupInfo.GetGroupCode(), int64(i), int64(end)) - data, err := c.sendAndWaitParams(uint16(req.SequenceID), c.transport.PackPacket(req), network.RequestParams{"raw": true}) + req.Params = network.Params{"raw": true} + data, err := c.callAndDecode(req, decodeGetGroupMsgResponse) if err != nil { return nil, errors.Wrap(err, "build fragmented message error") } diff --git a/client/guild.go b/client/guild.go index 8224e5a5..1b6cf13d 100644 --- a/client/guild.go +++ b/client/guild.go @@ -187,7 +187,7 @@ func (s *GuildService) GetUserProfile(tinyId uint64) (*GuildUserProfile, error) 3: tinyId, 4: uint32(0), }) - rsp, err := s.c.commandCall("OidbSvcTrpcTcp.0xfc9_1", payload) + rsp, err := s.c.uniCall("OidbSvcTrpcTcp.0xfc9_1", payload) if err != nil { return nil, errors.Wrap(err, "send packet error") } @@ -224,7 +224,7 @@ func (s *GuildService) FetchGuildMemberListWithRole(guildId, channelId uint64, s m[13] = param } m[14] = roleIdIndex - rsp, err := s.c.commandCall("OidbSvcTrpcTcp.0xf5b_1", s.c.packOIDBPackageDynamically(3931, 1, m)) + rsp, err := s.c.uniCall("OidbSvcTrpcTcp.0xf5b_1", s.c.packOIDBPackageDynamically(3931, 1, m)) if err != nil { return nil, errors.Wrap(err, "send packet error") } @@ -277,7 +277,7 @@ func (s *GuildService) FetchGuildMemberProfileInfo(guildId, tinyId uint64) (*Gui 3: tinyId, 4: guildId, }) - rsp, err := s.c.commandCall("OidbSvcTrpcTcp.0xf88_1", payload) + rsp, err := s.c.uniCall("OidbSvcTrpcTcp.0xf88_1", payload) if err != nil { return nil, errors.Wrap(err, "send packet error") } diff --git a/client/guild_msg.go b/client/guild_msg.go index 13b401ca..bb193ee5 100644 --- a/client/guild_msg.go +++ b/client/guild_msg.go @@ -64,7 +64,7 @@ func (s *GuildService) SendGuildChannelMessage(guildId, channelId uint64, m *mes }, }} payload, _ := proto.Marshal(req) - rsp, err := s.c.commandCall("MsgProxy.SendMsg", payload) + rsp, err := s.c.uniCall("MsgProxy.SendMsg", payload) if err != nil { return nil, errors.Wrap(err, "send packet error") } @@ -210,7 +210,7 @@ func (s *GuildService) pullChannelMessages(guildId, channelId, beginSeq, endSeq, WithVersionFlag: &withVersionFlag, DirectMessageFlag: &directFlag, }) - rsp, err := s.c.commandCall("trpc.group_pro.synclogic.SyncLogic.GetChannelMsg", payload) + rsp, err := s.c.uniCall("trpc.group_pro.synclogic.SyncLogic.GetChannelMsg", payload) if err != nil { return nil, errors.Wrap(err, "send packet error") } diff --git a/client/handler_map_gen.go b/client/handler_map_gen.go deleted file mode 100644 index d88a52fb..00000000 --- a/client/handler_map_gen.go +++ /dev/null @@ -1,387 +0,0 @@ -// Code generated by syncmap; DO NOT EDIT. - -// Copyright 2016 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package client - -import ( - "sync" - "sync/atomic" - "unsafe" -) - -// Map is like a Go map[interface{}]interface{} but is safe for concurrent use -// by multiple goroutines without additional locking or coordination. -// Loads, stores, and deletes run in amortized constant time. -// -// The Map type is specialized. Most code should use a plain Go map instead, -// with separate locking or coordination, for better type safety and to make it -// easier to maintain other invariants along with the map content. -// -// The Map type is optimized for two common use cases: (1) when the entry for a given -// key is only ever written once but read many times, as in caches that only grow, -// or (2) when multiple goroutines read, write, and overwrite entries for disjoint -// sets of keys. In these two cases, use of a Map may significantly reduce lock -// contention compared to a Go map paired with a separate Mutex or RWMutex. -// -// The zero Map is empty and ready for use. A Map must not be copied after first use. -type HandlerMap struct { - mu sync.Mutex - - // read contains the portion of the map's contents that are safe for - // concurrent access (with or without mu held). - // - // The read field itself is always safe to load, but must only be stored with - // mu held. - // - // Entries stored in read may be updated concurrently without mu, but updating - // a previously-expunged entry requires that the entry be copied to the dirty - // map and unexpunged with mu held. - read atomic.Value // readOnly - - // dirty contains the portion of the map's contents that require mu to be - // held. To ensure that the dirty map can be promoted to the read map quickly, - // it also includes all of the non-expunged entries in the read map. - // - // Expunged entries are not stored in the dirty map. An expunged entry in the - // clean map must be unexpunged and added to the dirty map before a new value - // can be stored to it. - // - // If the dirty map is nil, the next write to the map will initialize it by - // making a shallow copy of the clean map, omitting stale entries. - dirty map[uint16]*entryHandlerMap - - // misses counts the number of loads since the read map was last updated that - // needed to lock mu to determine whether the key was present. - // - // Once enough misses have occurred to cover the cost of copying the dirty - // map, the dirty map will be promoted to the read map (in the unamended - // state) and the next store to the map will make a new dirty copy. - misses int -} - -// readOnly is an immutable struct stored atomically in the Map.read field. -type readOnlyHandlerMap struct { - m map[uint16]*entryHandlerMap - amended bool // true if the dirty map contains some key not in m. -} - -// expunged is an arbitrary pointer that marks entries which have been deleted -// from the dirty map. -var expungedHandlerMap = unsafe.Pointer(new(*handlerInfo)) - -// An entry is a slot in the map corresponding to a particular key. -type entryHandlerMap struct { - // p points to the interface{} value stored for the entry. - // - // If p == nil, the entry has been deleted and m.dirty == nil. - // - // If p == expunged, the entry has been deleted, m.dirty != nil, and the entry - // is missing from m.dirty. - // - // Otherwise, the entry is valid and recorded in m.read.m[key] and, if m.dirty - // != nil, in m.dirty[key]. - // - // An entry can be deleted by atomic replacement with nil: when m.dirty is - // next created, it will atomically replace nil with expunged and leave - // m.dirty[key] unset. - // - // An entry's associated value can be updated by atomic replacement, provided - // p != expunged. If p == expunged, an entry's associated value can be updated - // only after first setting m.dirty[key] = e so that lookups using the dirty - // map find the entry. - p unsafe.Pointer // *interface{} -} - -func newEntryHandlerMap(i *handlerInfo) *entryHandlerMap { - return &entryHandlerMap{p: unsafe.Pointer(&i)} -} - -// Load returns the value stored in the map for a key, or nil if no -// value is present. -// The ok result indicates whether value was found in the map. -func (m *HandlerMap) Load(key uint16) (value *handlerInfo, ok bool) { - read, _ := m.read.Load().(readOnlyHandlerMap) - e, ok := read.m[key] - if !ok && read.amended { - m.mu.Lock() - // Avoid reporting a spurious miss if m.dirty got promoted while we were - // blocked on m.mu. (If further loads of the same key will not miss, it's - // not worth copying the dirty map for this key.) - read, _ = m.read.Load().(readOnlyHandlerMap) - e, ok = read.m[key] - if !ok && read.amended { - e, ok = m.dirty[key] - // Regardless of whether the entry was present, record a miss: this key - // will take the slow path until the dirty map is promoted to the read - // map. - m.missLocked() - } - m.mu.Unlock() - } - if !ok { - return value, false - } - return e.load() -} - -func (e *entryHandlerMap) load() (value *handlerInfo, ok bool) { - p := atomic.LoadPointer(&e.p) - if p == nil || p == expungedHandlerMap { - return value, false - } - return *(**handlerInfo)(p), true -} - -// Store sets the value for a key. -func (m *HandlerMap) Store(key uint16, value *handlerInfo) { - read, _ := m.read.Load().(readOnlyHandlerMap) - if e, ok := read.m[key]; ok && e.tryStore(&value) { - return - } - - m.mu.Lock() - read, _ = m.read.Load().(readOnlyHandlerMap) - if e, ok := read.m[key]; ok { - if e.unexpungeLocked() { - // The entry was previously expunged, which implies that there is a - // non-nil dirty map and this entry is not in it. - m.dirty[key] = e - } - e.storeLocked(&value) - } else if e, ok := m.dirty[key]; ok { - e.storeLocked(&value) - } else { - if !read.amended { - // We're adding the first new key to the dirty map. - // Make sure it is allocated and mark the read-only map as incomplete. - m.dirtyLocked() - m.read.Store(readOnlyHandlerMap{m: read.m, amended: true}) - } - m.dirty[key] = newEntryHandlerMap(value) - } - m.mu.Unlock() -} - -// tryStore stores a value if the entry has not been expunged. -// -// If the entry is expunged, tryStore returns false and leaves the entry -// unchanged. -func (e *entryHandlerMap) tryStore(i **handlerInfo) bool { - for { - p := atomic.LoadPointer(&e.p) - if p == expungedHandlerMap { - return false - } - if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) { - return true - } - } -} - -// unexpungeLocked ensures that the entry is not marked as expunged. -// -// If the entry was previously expunged, it must be added to the dirty map -// before m.mu is unlocked. -func (e *entryHandlerMap) unexpungeLocked() (wasExpunged bool) { - return atomic.CompareAndSwapPointer(&e.p, expungedHandlerMap, nil) -} - -// storeLocked unconditionally stores a value to the entry. -// -// The entry must be known not to be expunged. -func (e *entryHandlerMap) storeLocked(i **handlerInfo) { - atomic.StorePointer(&e.p, unsafe.Pointer(i)) -} - -// LoadOrStore returns the existing value for the key if present. -// Otherwise, it stores and returns the given value. -// The loaded result is true if the value was loaded, false if stored. -func (m *HandlerMap) LoadOrStore(key uint16, value *handlerInfo) (actual *handlerInfo, loaded bool) { - // Avoid locking if it's a clean hit. - read, _ := m.read.Load().(readOnlyHandlerMap) - if e, ok := read.m[key]; ok { - actual, loaded, ok := e.tryLoadOrStore(value) - if ok { - return actual, loaded - } - } - - m.mu.Lock() - read, _ = m.read.Load().(readOnlyHandlerMap) - if e, ok := read.m[key]; ok { - if e.unexpungeLocked() { - m.dirty[key] = e - } - actual, loaded, _ = e.tryLoadOrStore(value) - } else if e, ok := m.dirty[key]; ok { - actual, loaded, _ = e.tryLoadOrStore(value) - m.missLocked() - } else { - if !read.amended { - // We're adding the first new key to the dirty map. - // Make sure it is allocated and mark the read-only map as incomplete. - m.dirtyLocked() - m.read.Store(readOnlyHandlerMap{m: read.m, amended: true}) - } - m.dirty[key] = newEntryHandlerMap(value) - actual, loaded = value, false - } - m.mu.Unlock() - - return actual, loaded -} - -// tryLoadOrStore atomically loads or stores a value if the entry is not -// expunged. -// -// If the entry is expunged, tryLoadOrStore leaves the entry unchanged and -// returns with ok==false. -func (e *entryHandlerMap) tryLoadOrStore(i *handlerInfo) (actual *handlerInfo, loaded, ok bool) { - p := atomic.LoadPointer(&e.p) - if p == expungedHandlerMap { - return actual, false, false - } - if p != nil { - return *(**handlerInfo)(p), true, true - } - - // Copy the interface after the first load to make this method more amenable - // to escape analysis: if we hit the "load" path or the entry is expunged, we - // shouldn't bother heap-allocating. - ic := i - for { - if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) { - return i, false, true - } - p = atomic.LoadPointer(&e.p) - if p == expungedHandlerMap { - return actual, false, false - } - if p != nil { - return *(**handlerInfo)(p), true, true - } - } -} - -// LoadAndDelete deletes the value for a key, returning the previous value if any. -// The loaded result reports whether the key was present. -func (m *HandlerMap) LoadAndDelete(key uint16) (value *handlerInfo, loaded bool) { - read, _ := m.read.Load().(readOnlyHandlerMap) - e, ok := read.m[key] - if !ok && read.amended { - m.mu.Lock() - read, _ = m.read.Load().(readOnlyHandlerMap) - e, ok = read.m[key] - if !ok && read.amended { - e, ok = m.dirty[key] - delete(m.dirty, key) - // Regardless of whether the entry was present, record a miss: this key - // will take the slow path until the dirty map is promoted to the read - // map. - m.missLocked() - } - m.mu.Unlock() - } - if ok { - return e.delete() - } - return value, false -} - -// Delete deletes the value for a key. -func (m *HandlerMap) Delete(key uint16) { - m.LoadAndDelete(key) -} - -func (e *entryHandlerMap) delete() (value *handlerInfo, ok bool) { - for { - p := atomic.LoadPointer(&e.p) - if p == nil || p == expungedHandlerMap { - return value, false - } - if atomic.CompareAndSwapPointer(&e.p, p, nil) { - return *(**handlerInfo)(p), true - } - } -} - -// Range calls f sequentially for each key and value present in the map. -// If f returns false, range stops the iteration. -// -// Range does not necessarily correspond to any consistent snapshot of the Map's -// contents: no key will be visited more than once, but if the value for any key -// is stored or deleted concurrently, Range may reflect any mapping for that key -// from any point during the Range call. -// -// Range may be O(N) with the number of elements in the map even if f returns -// false after a constant number of calls. -func (m *HandlerMap) Range(f func(key uint16, value *handlerInfo) bool) { - // We need to be able to iterate over all of the keys that were already - // present at the start of the call to Range. - // If read.amended is false, then read.m satisfies that property without - // requiring us to hold m.mu for a long time. - read, _ := m.read.Load().(readOnlyHandlerMap) - if read.amended { - // m.dirty contains keys not in read.m. Fortunately, Range is already O(N) - // (assuming the caller does not break out early), so a call to Range - // amortizes an entire copy of the map: we can promote the dirty copy - // immediately! - m.mu.Lock() - read, _ = m.read.Load().(readOnlyHandlerMap) - if read.amended { - read = readOnlyHandlerMap{m: m.dirty} - m.read.Store(read) - m.dirty = nil - m.misses = 0 - } - m.mu.Unlock() - } - - for k, e := range read.m { - v, ok := e.load() - if !ok { - continue - } - if !f(k, v) { - break - } - } -} - -func (m *HandlerMap) missLocked() { - m.misses++ - if m.misses < len(m.dirty) { - return - } - m.read.Store(readOnlyHandlerMap{m: m.dirty}) - m.dirty = nil - m.misses = 0 -} - -func (m *HandlerMap) dirtyLocked() { - if m.dirty != nil { - return - } - - read, _ := m.read.Load().(readOnlyHandlerMap) - m.dirty = make(map[uint16]*entryHandlerMap, len(read.m)) - for k, e := range read.m { - if !e.tryExpungeLocked() { - m.dirty[k] = e - } - } -} - -func (e *entryHandlerMap) tryExpungeLocked() (isExpunged bool) { - p := atomic.LoadPointer(&e.p) - for p == nil { - if atomic.CompareAndSwapPointer(&e.p, nil, expungedHandlerMap) { - return true - } - p = atomic.LoadPointer(&e.p) - } - return p == expungedHandlerMap -} diff --git a/client/internal/network/request.go b/client/internal/network/request.go index b34930f5..c176613c 100644 --- a/client/internal/network/request.go +++ b/client/internal/network/request.go @@ -24,4 +24,6 @@ type Request struct { Uin int64 CommandName string Body []byte + + Params Params } diff --git a/client/internal/network/response.go b/client/internal/network/response.go index f92e7f27..c51c0cdc 100644 --- a/client/internal/network/response.go +++ b/client/internal/network/response.go @@ -13,7 +13,7 @@ type Response struct { CommandName string Body []byte - Params RequestParams + Params Params // Request is the original request that obtained this response. // Request *Request } diff --git a/client/internal/network/rpc.go b/client/internal/network/rpc.go index 81747087..09d6fba4 100644 --- a/client/internal/network/rpc.go +++ b/client/internal/network/rpc.go @@ -6,13 +6,12 @@ type Call struct { Request *Request Response *Response Err error - Params RequestParams Done chan *Call } -type RequestParams map[string]interface{} +type Params map[string]interface{} -func (p RequestParams) Bool(k string) bool { +func (p Params) Bool(k string) bool { if p == nil { return false } @@ -23,7 +22,7 @@ func (p RequestParams) Bool(k string) bool { return i.(bool) } -func (p RequestParams) Int32(k string) int32 { +func (p Params) Int32(k string) int32 { if p == nil { return 0 } diff --git a/client/internal/network/transport.go b/client/internal/network/transport.go index dd79a8d9..06c06e45 100644 --- a/client/internal/network/transport.go +++ b/client/internal/network/transport.go @@ -2,7 +2,6 @@ package network import ( "strconv" - "sync" "github.com/Mrs4s/MiraiGo/binary" "github.com/Mrs4s/MiraiGo/client/internal/auth" @@ -10,10 +9,10 @@ import ( // Transport is a network transport. type Transport struct { - sessionMu sync.Mutex - Sig *auth.SigInfo - Version *auth.AppVersion - Device *auth.Device + // sessionMu sync.Mutex + Sig *auth.SigInfo + Version *auth.AppVersion + Device *auth.Device // connection // conn *TCPListener diff --git a/client/network.go b/client/network.go index 83b9d36f..ed01ad95 100644 --- a/client/network.go +++ b/client/network.go @@ -192,48 +192,6 @@ func (c *QQClient) callAndDecode(req *network.Request, decoder func(*QQClient, * return decoder(c, resp) } -func (c *QQClient) sendAndWait(seq uint16, pkt []byte) (interface{}, error) { - return c.sendAndWaitParams(seq, pkt, nil) -} - -// sendAndWait 向服务器发送一个数据包, 并等待返回 -func (c *QQClient) sendAndWaitParams(seq uint16, pkt []byte, params network.RequestParams) (interface{}, error) { - type T struct { - Response interface{} - Error error - } - ch := make(chan T, 1) - - c.handlers.Store(seq, &handlerInfo{fun: func(i interface{}, err error) { - ch <- T{ - Response: i, - Error: err, - } - }, params: params, dynamic: false}) - - err := c.sendPacket(pkt) - if err != nil { - c.handlers.Delete(seq) - return nil, err - } - - retry := 0 - for { - select { - case rsp := <-ch: - return rsp.Response, rsp.Error - case <-time.After(time.Second * 15): - retry++ - if retry < 2 { - _ = c.sendPacket(pkt) - continue - } - c.handlers.Delete(seq) - return nil, errors.New("Packet timed out") - } - } -} - // sendPacket 向服务器发送一个数据包 func (c *QQClient) sendPacket(pkt []byte) error { err := c.TCP.Write(pkt) @@ -274,25 +232,6 @@ func (c *QQClient) waitPacketTimeoutSyncF(cmd string, timeout time.Duration, fil } } -// sendAndWaitDynamic -// 发送数据包并返回需要解析的 response -func (c *QQClient) sendAndWaitDynamic(seq uint16, pkt []byte) ([]byte, error) { - ch := make(chan []byte, 1) - c.handlers.Store(seq, &handlerInfo{fun: func(i interface{}, err error) { ch <- i.([]byte) }, dynamic: true}) - err := c.sendPacket(pkt) - if err != nil { - c.handlers.Delete(seq) - return nil, err - } - select { - case rsp := <-ch: - return rsp, nil - case <-time.After(time.Second * 15): - c.handlers.Delete(seq) - return nil, errors.New("Packet timed out") - } -} - // plannedDisconnect 计划中断线事件 func (c *QQClient) plannedDisconnect(_ *network.TCPListener) { c.Debug("planned disconnect.") @@ -380,43 +319,35 @@ func (c *QQClient) netLoop() { SequenceID: req.SequenceID, CommandName: req.CommandName, Body: req.Body, + Params: call.Request.Params, // Request: nil, } } c.pendingMu.Unlock() - if call != nil { + if call != nil && call.Request.CommandName == req.CommandName { select { case call.Done <- call: default: + // we don't want blocking } + return } if decoder, ok := decoders[req.CommandName]; ok { // found predefined decoder - info, ok := c.handlers.LoadAndDelete(uint16(req.SequenceID)) - var decoded interface{} - decoded = req.Body - if info == nil || !info.dynamic { - resp := network.Response{ - SequenceID: req.SequenceID, - CommandName: req.CommandName, - Params: info.getParams(), - Body: req.Body, - // Request: nil, - } - decoded, err = decoder(c, &resp) - if err != nil { - c.Debug("decode req %v error: %+v", req.CommandName, err) - } + resp := network.Response{ + SequenceID: req.SequenceID, + CommandName: req.CommandName, + Body: req.Body, + // Request: nil, } - if ok { - info.fun(decoded, err) - } else if f, ok := c.waiters.Load(req.CommandName); ok { // 在不存在handler的情况下触发wait + decoded, err := decoder(c, &resp) + if err != nil { + c.Debug("decode req %v error: %+v", req.CommandName, err) + } + if f, ok := c.waiters.Load(req.CommandName); ok { // 在不存在handler的情况下触发wait f.(func(interface{}, error))(decoded, err) } - } else if f, ok := c.handlers.LoadAndDelete(uint16(req.SequenceID)); ok { - // does not need decoder - f.fun(req.Body, nil) } else { c.Debug("Unhandled Command: %s\nSeq: %d\nThis message can be ignored.", req.CommandName, req.SequenceID) } diff --git a/client/packet.go b/client/packet.go index d2a47ddb..4b30ef34 100644 --- a/client/packet.go +++ b/client/packet.go @@ -29,7 +29,8 @@ func (c *QQClient) uniRequest(command string, body []byte) *network.Request { } } -func (c *QQClient) commandCall(command string, body []byte) (*network.Response, error) { +//go:noinline +func (c *QQClient) uniCall(command string, body []byte) (*network.Response, error) { seq := c.nextSeq() req := network.Request{ Type: network.RequestTypeSimple, @@ -42,23 +43,6 @@ func (c *QQClient) commandCall(command string, body []byte) (*network.Response, return c.call(&req) } -func (c *QQClient) commandCallAndDecode(command string, body []byte, decode func(*QQClient, *network.Response) (interface{}, error)) (interface{}, error) { - seq := c.nextSeq() - req := network.Request{ - Type: network.RequestTypeSimple, - EncryptType: network.EncryptTypeD2Key, - Uin: c.Uin, - SequenceID: int32(seq), - CommandName: command, - Body: body, - } - resp, err := c.call(&req) - if err != nil { - return nil, err - } - return decode(c, resp) -} - //go:noinline func (c *QQClient) uniPacketWithSeq(seq uint16, command string, body []byte) *network.Request { req := network.Request{ diff --git a/message/elements.go b/message/elements.go index 09e48915..28893875 100644 --- a/message/elements.go +++ b/message/elements.go @@ -106,8 +106,10 @@ type AnimatedSticker struct { Name string } -type RedBagMessageType int -type AtType int +type ( + RedBagMessageType int + AtType int +) // /com/tencent/mobileqq/data/MessageForQQWalletMsg.java const ( diff --git a/message/message.go b/message/message.go index c585c009..5fc996d9 100644 --- a/message/message.go +++ b/message/message.go @@ -776,7 +776,7 @@ func splitPlainMessage(content string) []IMessageElement { } } if last != len(content) { - splittedMessage = append(splittedMessage, NewText(content[last:len(content)])) + splittedMessage = append(splittedMessage, NewText(content[last:])) } return splittedMessage