From ca8d28f913bee86edb377c37100fe0edf22fd5b1 Mon Sep 17 00:00:00 2001 From: wdvxdr Date: Sat, 10 Jul 2021 23:29:42 +0800 Subject: [PATCH] clean ws client push event. --- coolq/bot.go | 9 +++++---- server/http.go | 14 +++++++------- server/websocket.go | 44 +++++++++++++++++--------------------------- 3 files changed, 29 insertions(+), 38 deletions(-) diff --git a/coolq/bot.go b/coolq/bot.go index 6cd9620..0c4b9ef 100644 --- a/coolq/bot.go +++ b/coolq/bot.go @@ -45,6 +45,7 @@ type CQBot struct { // MSG 消息Map type MSG map[string]interface{} +// Event 事件 type Event struct { RawMsg MSG @@ -59,15 +60,15 @@ func (e *Event) marshal() { _ = json.NewEncoder(e.buffer).Encode(e.RawMsg) } -// JsonBytes return byes of json by lazy marshalling. -func (e *Event) JsonBytes() []byte { +// JSONBytes return byes of json by lazy marshalling. +func (e *Event) JSONBytes() []byte { e.once.Do(e.marshal) return e.buffer.Bytes() } -// JsonString return string of json without extra allocation +// JSONString return string of json without extra allocation // by lazy marshalling. -func (e *Event) JsonString() string { +func (e *Event) JSONString() string { e.once.Do(e.marshal) return utils.B2S(e.buffer.Bytes()) } diff --git a/server/http.go b/server/http.go index e663632..7f43938 100644 --- a/server/http.go +++ b/server/http.go @@ -197,20 +197,20 @@ func (c *HTTPClient) onBotPushEvent(e *coolq.Event) { var res string if c.filter != "" { filter := findFilter(c.filter) - if filter != nil && !filter.Eval(gjson.Parse(e.JsonString())) { - log.Debugf("上报Event %v 到 HTTP 服务器 %s 时被过滤.", c.addr, e.JsonBytes()) + if filter != nil && !filter.Eval(gjson.Parse(e.JSONString())) { + log.Debugf("上报Event %v 到 HTTP 服务器 %s 时被过滤.", c.addr, e.JSONBytes()) return } } - err := gout.POST(c.addr).SetJSON(e.JsonBytes()).BindBody(&res).SetHeader(func() gout.H { + err := gout.POST(c.addr).SetJSON(e.JSONBytes()).BindBody(&res).SetHeader(func() gout.H { h := gout.H{ "X-Self-ID": c.bot.Client.Uin, "User-Agent": "CQHttp/4.15.0", } if c.secret != "" { mac := hmac.New(sha1.New, []byte(c.secret)) - _, err := mac.Write(e.JsonBytes()) + _, err := mac.Write(e.JSONBytes()) if err != nil { log.Error(err) return nil @@ -228,12 +228,12 @@ func (c *HTTPClient) onBotPushEvent(e *coolq.Event) { return nil }).Do() if err != nil { - log.Warnf("上报Event数据 %s 到 %v 失败: %v", e.JsonBytes(), c.addr, err) + log.Warnf("上报Event数据 %s 到 %v 失败: %v", e.JSONBytes(), c.addr, err) return } - log.Debugf("上报Event数据 %s 到 %v", e.JsonBytes(), c.addr) + log.Debugf("上报Event数据 %s 到 %v", e.JSONBytes(), c.addr) if gjson.Valid(res) { - c.bot.CQHandleQuickOperation(gjson.Parse(e.JsonString()), gjson.Parse(res)) + c.bot.CQHandleQuickOperation(gjson.Parse(e.JSONString()), gjson.Parse(res)) } } diff --git a/server/websocket.go b/server/websocket.go index ff7c8c3..9e3feaa 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -230,39 +230,29 @@ func (c *websocketClient) listenAPI(conn *webSocketConn, u bool) { func (c *websocketClient) onBotPushEvent(e *coolq.Event) { filter := findFilter(c.filter) - if filter != nil && !filter.Eval(gjson.Parse(e.JsonString())) { - log.Debugf("上报Event %s 到 WS客户端 时被过滤.", e.JsonBytes()) + if filter != nil && !filter.Eval(gjson.Parse(e.JSONString())) { + log.Debugf("上报Event %s 到 WS服务器 时被过滤.", e.JSONBytes()) return } - if c.eventConn != nil { - log.Debugf("向WS服务器 %v 推送Event: %s", c.eventConn.RemoteAddr().String(), e.JsonBytes()) - conn := c.eventConn + push := func(conn *webSocketConn, reconnect func()) { + log.Debugf("向WS服务器 %v 推送Event: %s", conn.RemoteAddr().String(), e.JSONBytes()) conn.Lock() defer conn.Unlock() - _ = c.eventConn.SetWriteDeadline(time.Now().Add(time.Second * 15)) - if err := c.eventConn.WriteMessage(websocket.TextMessage, e.JsonBytes()); err != nil { - log.Warnf("向WS服务器 %v 推送Event时出现错误: %v", c.eventConn.RemoteAddr().String(), err) - _ = c.eventConn.Close() + _ = conn.SetWriteDeadline(time.Now().Add(time.Second * 15)) + if err := conn.WriteMessage(websocket.TextMessage, e.JSONBytes()); err != nil { + log.Warnf("向WS服务器 %v 推送Event时出现错误: %v", conn.RemoteAddr().String(), err) + _ = conn.Close() if c.conf.ReconnectInterval != 0 { time.Sleep(time.Millisecond * time.Duration(c.conf.ReconnectInterval)) - c.connectEvent() + reconnect() } } } + if c.eventConn != nil { + push(c.eventConn, c.connectEvent) + } if c.universalConn != nil { - log.Debugf("向WS服务器 %v 推送Event: %s", c.universalConn.RemoteAddr().String(), e.JsonBytes()) - conn := c.universalConn - conn.Lock() - defer conn.Unlock() - _ = c.universalConn.SetWriteDeadline(time.Now().Add(time.Second * 15)) - if err := c.universalConn.WriteMessage(websocket.TextMessage, e.JsonBytes()); err != nil { - log.Warnf("向WS服务器 %v 推送Event时出现错误: %v", c.universalConn.RemoteAddr().String(), err) - _ = c.universalConn.Close() - if c.conf.ReconnectInterval != 0 { - time.Sleep(time.Millisecond * time.Duration(c.conf.ReconnectInterval)) - c.connectUniversal() - } - } + push(c.universalConn, c.connectUniversal) } } @@ -391,16 +381,16 @@ func (s *webSocketServer) onBotPushEvent(e *coolq.Event) { defer s.eventConnMutex.Unlock() filter := findFilter(s.filter) - if filter != nil && !filter.Eval(gjson.Parse(e.JsonString())) { - log.Debugf("上报Event %s 到 WS客户端 时被过滤.", e.JsonBytes()) + if filter != nil && !filter.Eval(gjson.Parse(e.JSONString())) { + log.Debugf("上报Event %s 到 WS客户端 时被过滤.", e.JSONBytes()) return } for i, l := 0, len(s.eventConn); i < l; i++ { conn := s.eventConn[i] - log.Debugf("向WS客户端 %v 推送Event: %s", conn.RemoteAddr().String(), e.JsonBytes()) + log.Debugf("向WS客户端 %v 推送Event: %s", conn.RemoteAddr().String(), e.JSONBytes()) conn.Lock() - if err := conn.WriteMessage(websocket.TextMessage, e.JsonBytes()); err != nil { + if err := conn.WriteMessage(websocket.TextMessage, e.JSONBytes()); err != nil { _ = conn.Close() next := i + 1 if next >= l {