From bf09ce7268f00653597e7e9d16af9fecca616a48 Mon Sep 17 00:00:00 2001 From: wdvxdr Date: Wed, 17 Mar 2021 19:52:08 +0800 Subject: [PATCH] only marshal once --- coolq/bot.go | 15 +++++++++++---- server/http.go | 14 ++++++++------ server/websocket.go | 18 ++++++++++-------- 3 files changed, 29 insertions(+), 18 deletions(-) diff --git a/coolq/bot.go b/coolq/bot.go index 262c596..076f3fd 100644 --- a/coolq/bot.go +++ b/coolq/bot.go @@ -30,7 +30,7 @@ var json = jsoniter.ConfigCompatibleWithStandardLibrary type CQBot struct { Client *client.QQClient - events []func(MSG) + events []func(*bytes.Buffer) db *leveldb.DB friendReqCache sync.Map tempMsgCache sync.Map @@ -109,7 +109,7 @@ func NewQQBot(cli *client.QQClient, conf *global.JSONConfig) *CQBot { } // OnEventPush 注册事件上报函数 -func (bot *CQBot) OnEventPush(f func(m MSG)) { +func (bot *CQBot) OnEventPush(f func(buf *bytes.Buffer)) { bot.events = append(bot.events, f) } @@ -432,21 +432,28 @@ func (bot *CQBot) dispatchEventMessage(m MSG) { log.Debug("Event filtered!") return } + buf := global.NewBuffer() + wg := sync.WaitGroup{} + wg.Add(len(bot.events)) + _ = json.NewEncoder(buf).Encode(m) for _, f := range bot.events { - go func(fn func(MSG)) { + go func(fn func(*bytes.Buffer)) { defer func() { + wg.Done() if pan := recover(); pan != nil { log.Warnf("处理事件 %v 时出现错误: %v \n%s", m, pan, debug.Stack()) } }() start := time.Now() - fn(m) + fn(buf) end := time.Now() if end.Sub(start) > time.Second*5 { log.Debugf("警告: 事件处理耗时超过 5 秒 (%v), 请检查应用是否有堵塞.", end.Sub(start)) } }(f) } + wg.Wait() + global.PutBuffer(buf) } func (bot *CQBot) formatGroupMessage(m *message.GroupMessage) MSG { diff --git a/server/http.go b/server/http.go index 72be847..8dedd10 100644 --- a/server/http.go +++ b/server/http.go @@ -1,6 +1,7 @@ package server import ( + "bytes" "context" "crypto/hmac" "crypto/sha1" @@ -13,6 +14,7 @@ import ( "github.com/Mrs4s/go-cqhttp/coolq" "github.com/Mrs4s/go-cqhttp/global" + "github.com/Mrs4s/MiraiGo/utils" "github.com/gin-gonic/gin" "github.com/guonaihong/gout" "github.com/guonaihong/gout/dataflow" @@ -121,16 +123,16 @@ func (c *httpClient) Run(addr, secret string, timeout int32, bot *coolq.CQBot) { log.Infof("HTTP POST上报器已启动: %v", addr) } -func (c *httpClient) onBotPushEvent(m coolq.MSG) { +func (c *httpClient) onBotPushEvent(m *bytes.Buffer) { var res string - err := gout.POST(c.addr).SetJSON(m).BindBody(&res).SetHeader(func() gout.H { + err := gout.POST(c.addr).SetJSON(m.Bytes()).BindBody(&res).SetHeader(func() gout.H { h := gout.H{ "X-Self-ID": c.bot.Client.Uin, "User-Agent": "CQHttp/4.15.0", } if c.secret != "" { mac := hmac.New(sha1.New, []byte(c.secret)) - _, err := mac.Write([]byte(m.ToJSON())) + _, err := mac.Write(m.Bytes()) if err != nil { log.Error(err) return nil @@ -148,12 +150,12 @@ func (c *httpClient) onBotPushEvent(m coolq.MSG) { return nil }).Do() if err != nil { - log.Warnf("上报Event数据 %v 到 %v 失败: %v", m.ToJSON(), c.addr, err) + log.Warnf("上报Event数据 %v 到 %v 失败: %v", utils.B2S(m.Bytes()), c.addr, err) return } - log.Debugf("上报Event数据 %v 到 %v", m.ToJSON(), c.addr) + log.Debugf("上报Event数据 %v 到 %v", utils.B2S(m.Bytes()), c.addr) if gjson.Valid(res) { - c.bot.CQHandleQuickOperation(gjson.Parse(m.ToJSON()), gjson.Parse(res)) + c.bot.CQHandleQuickOperation(gjson.Parse(utils.B2S(m.Bytes())), gjson.Parse(res)) } } diff --git a/server/websocket.go b/server/websocket.go index 7c1ab89..f22824e 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -1,6 +1,7 @@ package server import ( + "bytes" "context" "fmt" "net/http" @@ -13,6 +14,7 @@ import ( "github.com/Mrs4s/go-cqhttp/coolq" "github.com/Mrs4s/go-cqhttp/global" + "github.com/Mrs4s/MiraiGo/utils" "github.com/gorilla/websocket" log "github.com/sirupsen/logrus" "github.com/tidwall/gjson" @@ -193,14 +195,14 @@ func (c *WebSocketClient) listenAPI(conn *webSocketConn, u bool) { } } -func (c *WebSocketClient) onBotPushEvent(m coolq.MSG) { +func (c *WebSocketClient) onBotPushEvent(m *bytes.Buffer) { if c.eventConn != nil { - log.Debugf("向WS服务器 %v 推送Event: %v", c.eventConn.RemoteAddr().String(), m.ToJSON()) + log.Debugf("向WS服务器 %v 推送Event: %v", c.eventConn.RemoteAddr().String(), utils.B2S(m.Bytes())) conn := c.eventConn conn.Lock() defer conn.Unlock() _ = c.eventConn.SetWriteDeadline(time.Now().Add(time.Second * 15)) - if err := c.eventConn.WriteJSON(m); err != nil { + if err := c.eventConn.WriteMessage(websocket.TextMessage, m.Bytes()); err != nil { log.Warnf("向WS服务器 %v 推送Event时出现错误: %v", c.eventConn.RemoteAddr().String(), err) _ = c.eventConn.Close() if c.conf.ReverseReconnectInterval != 0 { @@ -210,12 +212,12 @@ func (c *WebSocketClient) onBotPushEvent(m coolq.MSG) { } } if c.universalConn != nil { - log.Debugf("向WS服务器 %v 推送Event: %v", c.universalConn.RemoteAddr().String(), m.ToJSON()) + log.Debugf("向WS服务器 %v 推送Event: %v", c.universalConn.RemoteAddr().String(), utils.B2S(m.Bytes())) conn := c.universalConn conn.Lock() defer conn.Unlock() _ = c.universalConn.SetWriteDeadline(time.Now().Add(time.Second * 15)) - if err := c.universalConn.WriteJSON(m); err != nil { + if err := c.universalConn.WriteMessage(websocket.TextMessage, m.Bytes()); err != nil { log.Warnf("向WS服务器 %v 推送Event时出现错误: %v", c.universalConn.RemoteAddr().String(), err) _ = c.universalConn.Close() if c.conf.ReverseReconnectInterval != 0 { @@ -338,14 +340,14 @@ func (c *webSocketConn) handleRequest(_ *coolq.CQBot, payload []byte) { _ = c.WriteJSON(ret) } -func (s *webSocketServer) onBotPushEvent(m coolq.MSG) { +func (s *webSocketServer) onBotPushEvent(m *bytes.Buffer) { s.eventConnMutex.Lock() defer s.eventConnMutex.Unlock() for i, l := 0, len(s.eventConn); i < l; i++ { conn := s.eventConn[i] - log.Debugf("向WS客户端 %v 推送Event: %v", conn.RemoteAddr().String(), m.ToJSON()) + log.Debugf("向WS客户端 %v 推送Event: %v", conn.RemoteAddr().String(), utils.B2S(m.Bytes())) conn.Lock() - if err := conn.WriteMessage(websocket.TextMessage, []byte(m.ToJSON())); err != nil { + if err := conn.WriteMessage(websocket.TextMessage, m.Bytes()); err != nil { _ = conn.Close() next := i + 1 if next >= l {