From c500bfc55c732b3fa36c9abdb8430c05db31ed1d Mon Sep 17 00:00:00 2001 From: wfjsw Date: Fri, 21 Aug 2020 20:47:33 +0800 Subject: [PATCH] reduce websocket conn lock contention (cherry-picked from commit dac8933..83772e2) --- go.mod | 4 ++ go.sum | 20 ++++++ server/websocket.go | 145 +++++++++++++++++++++++++------------------- 3 files changed, 106 insertions(+), 63 deletions(-) diff --git a/go.mod b/go.mod index 6e33a80..b88b975 100644 --- a/go.mod +++ b/go.mod @@ -4,14 +4,18 @@ go 1.14 require ( github.com/Mrs4s/MiraiGo v0.0.0-20200821111822-80481f0022d5 + github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect github.com/gin-gonic/gin v1.6.3 github.com/gorilla/websocket v1.4.2 github.com/guonaihong/gout v0.1.1 + github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 // indirect + github.com/jonboulle/clockwork v0.2.0 // indirect github.com/lestrrat-go/file-rotatelogs v2.3.0+incompatible github.com/lestrrat-go/strftime v1.0.1 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/sirupsen/logrus v1.6.0 github.com/t-tomalak/logrus-easy-formatter v0.0.0-20190827215021-c074f06c5816 + github.com/tebeka/strftime v0.1.5 // indirect github.com/tidwall/gjson v1.6.0 github.com/xujiajun/nutsdb v0.5.0 github.com/yinghau76/go-ascii-art v0.0.0-20190517192627-e7f465a30189 diff --git a/go.sum b/go.sum index 8223b29..67b62f7 100644 --- a/go.sum +++ b/go.sum @@ -7,14 +7,18 @@ github.com/bwmarrin/snowflake v0.3.0/go.mod h1:NdZxfVWX+oR6y2K0o6qAYv6gIOP9rjG0/ github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 h1:Ghm4eQYC0nEPnSJdVkTrXpu9KtoVCSo1hg7mtI7G9KU= +github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239/go.mod h1:Gdwt2ce0yfBxPvZrHkprdPPTTS3N5rwmLE8T22KBXlw= github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= github.com/gin-gonic/gin v1.5.0/go.mod h1:Nd6IXA8m5kNZdNEHMBd93KT+mdY3+bewLgRvmCsR2Do= github.com/gin-gonic/gin v1.6.3 h1:ahKqKTFpO5KTPHxWZjEdPScmYaGtLo8Y4DMHoEsnp14= github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M= +github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A= github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= github.com/go-playground/locales v0.12.1/go.mod h1:IUMDtCfWo/w/mtMfIE/IG2K+Ey3ygWanZIBtBW0W2TM= github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q= @@ -41,14 +45,21 @@ github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5a github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/guonaihong/gout v0.1.1 h1:2i3eqQ1KUhTlj7AFeIHqVUFku5QwUhwE2wNgYTVpbxQ= github.com/guonaihong/gout v0.1.1/go.mod h1:vXvv5Kxr70eM5wrp4F0+t9lnLWmq+YPW2GByll2f/EA= +github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 h1:IPJ3dvxmJ4uczJe5YQdrYB16oTJlGSC/OyZDqUk9xX4= +github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869/go.mod h1:cJ6Cj7dQo+O6GJNiMx+Pa94qKj+TG8ONdKHgMNIyyag= +github.com/jonboulle/clockwork v0.2.0 h1:J2SLSdy7HgElq8ekSl2Mxh6vrRNFxqbXGenYH2I02Vs= +github.com/jonboulle/clockwork v0.2.0/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8= github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/json-iterator/go v1.1.9 h1:9yzud/Ht36ygwatGx56VwCZtlI/2AD15T1X2sjSuGns= github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8= @@ -56,6 +67,7 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxv github.com/leodido/go-urn v1.1.0/go.mod h1:+cyI34gQWZcE1eQU7NVgKkkzdXDQHr1dBMtdAPozLkw= github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y= github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= +github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc h1:RKf14vYWi2ttpEmkA4aQ3j4u9dStX2t4M8UM6qqNsG8= github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc/go.mod h1:kopuH9ugFRkIXf3YoqHKyrJ9YfUFsckUU9S7B+XP+is= github.com/lestrrat-go/file-rotatelogs v2.3.0+incompatible h1:4mNlp+/SvALIPFpbXV3kxNJJno9iKFWGxSDE13Kl66Q= github.com/lestrrat-go/file-rotatelogs v2.3.0+incompatible/go.mod h1:ZQnN8lSECaebrkQytbHj4xNgtg8CR7RYXnPok8e0EHA= @@ -64,13 +76,16 @@ github.com/lestrrat-go/strftime v1.0.1/go.mod h1:E1nN3pCbtMSu1yjSVeyuRFVm/U0xoR7 github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ= github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 h1:zYyBkD/k9seD2A7fsi6Oo2LfFZAehjjQMERAvZLEDnQ= github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646/go.mod h1:jpp1/29i3P1S/RLdc7JQKbRpFeM1dOBd8T9ki5s+AY8= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= @@ -80,9 +95,12 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/t-tomalak/logrus-easy-formatter v0.0.0-20190827215021-c074f06c5816 h1:J6v8awz+me+xeb/cUTotKgceAYouhIB3pjzgRd6IlGk= github.com/t-tomalak/logrus-easy-formatter v0.0.0-20190827215021-c074f06c5816/go.mod h1:tzym/CEb5jnFI+Q0k4Qq3+LvRF4gO3E2pxS8fHP8jcA= +github.com/tebeka/strftime v0.1.5 h1:1NQKN1NiQgkqd/2moD6ySP/5CoZQsKa1d3ZhJ44Jpmg= +github.com/tebeka/strftime v0.1.5/go.mod h1:29/OidkoWHdEKZqzyDLUyC+LmgDgdHo4WAFCDT7D/Ig= github.com/tidwall/gjson v1.6.0 h1:9VEQWz6LLMUsUl6PueE49ir4Ka6CzLymOAZDxpFsTDc= github.com/tidwall/gjson v1.6.0/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJHhxOls= github.com/tidwall/match v1.0.1 h1:PnKP62LPNxHKTwvHHZZzdOAOCtsJTjo6dZLCwpKm5xc= @@ -132,6 +150,7 @@ golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= @@ -151,6 +170,7 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/go-playground/assert.v1 v1.2.1/go.mod h1:9RXL0bg/zibRAgZUYszZSwO/z8Y/a8bDuhia5mkpMnE= gopkg.in/go-playground/validator.v9 v9.29.1/go.mod h1:+c9/zcJMFNgbLvly1L1V+PpxWdVbfP1avr/N00E2vyQ= diff --git a/server/websocket.go b/server/websocket.go index d61afd5..4d5d280 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -2,24 +2,25 @@ package server import ( "fmt" - "github.com/Mrs4s/go-cqhttp/coolq" - "github.com/Mrs4s/go-cqhttp/global" - "github.com/gorilla/websocket" - log "github.com/sirupsen/logrus" - "github.com/tidwall/gjson" "net/http" "strconv" "strings" "sync" "time" + + "github.com/Mrs4s/go-cqhttp/coolq" + "github.com/Mrs4s/go-cqhttp/global" + "github.com/gorilla/websocket" + log "github.com/sirupsen/logrus" + "github.com/tidwall/gjson" ) type websocketServer struct { - bot *coolq.CQBot - token string - eventConn []*websocket.Conn - pushLock *sync.Mutex - handshake string + bot *coolq.CQBot + token string + eventConn []*websocketConn + eventConnMutex sync.Mutex + handshake string } type websocketClient struct { @@ -27,9 +28,13 @@ type websocketClient struct { token string bot *coolq.CQBot - pushLock *sync.Mutex - universalConn *websocket.Conn - eventConn *websocket.Conn + universalConn *websocketConn + eventConn *websocketConn +} + +type websocketConn struct { + *websocket.Conn + sync.Mutex } var WebsocketServer = &websocketServer{} @@ -41,7 +46,6 @@ var upgrader = websocket.Upgrader{ func (s *websocketServer) Run(addr, authToken string, b *coolq.CQBot) { s.token = authToken - s.pushLock = new(sync.Mutex) s.bot = b s.handshake = fmt.Sprintf(`{"_post_method":2,"meta_event_type":"lifecycle","post_type":"meta_event","self_id":%d,"sub_type":"connect","time":%d}`, s.bot.Client.Uin, time.Now().Unix()) @@ -56,7 +60,7 @@ func (s *websocketServer) Run(addr, authToken string, b *coolq.CQBot) { } func NewWebsocketClient(conf *global.GoCQReverseWebsocketConfig, authToken string, b *coolq.CQBot) *websocketClient { - return &websocketClient{conf: conf, token: authToken, bot: b, pushLock: new(sync.Mutex)} + return &websocketClient{conf: conf, token: authToken, bot: b} } func (c *websocketClient) Run() { @@ -96,7 +100,8 @@ func (c *websocketClient) connectApi() { return } log.Infof("已连接到反向Websocket API服务器 %v", c.conf.ReverseApiUrl) - go c.listenApi(conn, false) + wrappedConn := &websocketConn{Conn: conn} + go c.listenApi(wrappedConn, false) } func (c *websocketClient) connectEvent() { @@ -119,7 +124,7 @@ func (c *websocketClient) connectEvent() { return } log.Infof("已连接到反向Websocket Event服务器 %v", c.conf.ReverseEventUrl) - c.eventConn = conn + c.eventConn = &websocketConn{Conn: conn} } func (c *websocketClient) connectUniversal() { @@ -141,11 +146,12 @@ func (c *websocketClient) connectUniversal() { } return } - go c.listenApi(conn, true) - c.universalConn = conn + wrappedConn := &websocketConn{Conn: conn} + go c.listenApi(wrappedConn, true) + c.universalConn = wrappedConn } -func (c *websocketClient) listenApi(conn *websocket.Conn, u bool) { +func (c *websocketClient) listenApi(conn *websocketConn, u bool) { defer conn.Close() for { _, buf, err := conn.ReadMessage() @@ -153,35 +159,23 @@ func (c *websocketClient) listenApi(conn *websocket.Conn, u bool) { log.Warnf("监听反向WS API时出现错误: %v", err) break } - j := gjson.ParseBytes(buf) - t := strings.ReplaceAll(j.Get("action").Str, "_async", "") - log.Debugf("反向WS接收到API调用: %v 参数: %v", t, j.Get("params").Raw) - if f, ok := wsApi[t]; ok { - go func() { - ret := f(c.bot, j.Get("params")) - if j.Get("echo").Exists() { - ret["echo"] = j.Get("echo").Value() - } - c.pushLock.Lock() - log.Debugf("准备发送API %v 处理结果: %v", t, ret.ToJson()) - _ = conn.WriteJSON(ret) - c.pushLock.Unlock() - }() - } + + go conn.handleRequest(c.bot, buf) + } if c.conf.ReverseReconnectInterval != 0 { time.Sleep(time.Millisecond * time.Duration(c.conf.ReverseReconnectInterval)) if !u { - c.connectApi() + go c.connectApi() } } } func (c *websocketClient) onBotPushEvent(m coolq.MSG) { - c.pushLock.Lock() - defer c.pushLock.Unlock() if c.eventConn != nil { log.Debugf("向WS服务器 %v 推送Event: %v", c.eventConn.RemoteAddr().String(), m.ToJson()) + c.eventConn.Lock() + defer c.eventConn.Unlock() if err := c.eventConn.WriteJSON(m); err != nil { log.Warnf("向WS服务器 %v 推送Event时出现错误: %v", c.eventConn.RemoteAddr().String(), err) _ = c.eventConn.Close() @@ -195,6 +189,8 @@ func (c *websocketClient) onBotPushEvent(m coolq.MSG) { } if c.universalConn != nil { log.Debugf("向WS服务器 %v 推送Event: %v", c.universalConn.RemoteAddr().String(), m.ToJson()) + c.universalConn.Lock() + defer c.universalConn.Unlock() if err := c.universalConn.WriteJSON(m); err != nil { log.Warnf("向WS服务器 %v 推送Event时出现错误: %v", c.universalConn.RemoteAddr().String(), err) _ = c.universalConn.Close() @@ -222,10 +218,19 @@ func (s *websocketServer) event(w http.ResponseWriter, r *http.Request) { return } err = c.WriteMessage(websocket.TextMessage, []byte(s.handshake)) - if err == nil { - log.Infof("接受 Websocket 连接: %v (/event)", r.RemoteAddr) - s.eventConn = append(s.eventConn, c) + if err != nil { + log.Warnf("Websocket 握手时出现错误: %v", err) + c.Close() + return } + + log.Infof("接受 Websocket 连接: %v (/event)", r.RemoteAddr) + + conn := &websocketConn{Conn: c} + + s.eventConnMutex.Lock() + s.eventConn = append(s.eventConn, conn) + s.eventConnMutex.Unlock() } func (s *websocketServer) api(w http.ResponseWriter, r *http.Request) { @@ -242,7 +247,8 @@ func (s *websocketServer) api(w http.ResponseWriter, r *http.Request) { return } log.Infof("接受 Websocket 连接: %v (/api)", r.RemoteAddr) - go s.listenApi(c) + conn := &websocketConn{Conn: c} + go s.listenApi(conn) } func (s *websocketServer) any(w http.ResponseWriter, r *http.Request) { @@ -259,42 +265,55 @@ func (s *websocketServer) any(w http.ResponseWriter, r *http.Request) { return } err = c.WriteMessage(websocket.TextMessage, []byte(s.handshake)) - if err == nil { - log.Infof("接受 Websocket 连接: %v (/)", r.RemoteAddr) - s.eventConn = append(s.eventConn, c) - s.listenApi(c) + if err != nil { + log.Warnf("Websocket 握手时出现错误: %v", err) + c.Close() + return } + + log.Infof("接受 Websocket 连接: %v (/)", r.RemoteAddr) + conn := &websocketConn{Conn: c} + s.eventConn = append(s.eventConn, conn) + s.listenApi(conn) } -func (s *websocketServer) listenApi(c *websocket.Conn) { +func (s *websocketServer) listenApi(c *websocketConn) { defer c.Close() for { t, payload, err := c.ReadMessage() if err != nil { break } + if t == websocket.TextMessage { - j := gjson.ParseBytes(payload) - t := strings.ReplaceAll(j.Get("action").Str, "_async", "") //TODO: async support - log.Debugf("WS接收到API调用: %v 参数: %v", t, j.Get("params").Raw) - if f, ok := wsApi[t]; ok { - go func() { - ret := f(s.bot, j.Get("params")) - if j.Get("echo").Exists() { - ret["echo"] = j.Get("echo").Value() - } - s.pushLock.Lock() - _ = c.WriteJSON(ret) - s.pushLock.Unlock() - }() - } + go c.handleRequest(s.bot, payload) } } } +func (c *websocketConn) handleRequest(bot *coolq.CQBot, payload []byte) { + defer func() { + if err := recover(); err != nil { + log.Printf("处置WS命令时发生无法恢复的异常:%v", err) + c.Close() + } + }() + + j := gjson.ParseBytes(payload) + t := strings.ReplaceAll(j.Get("action").Str, "_async", "") + log.Debugf("WS接收到API调用: %v 参数: %v", t, j.Get("params").Raw) + if f, ok := wsApi[t]; ok { + ret := f(bot, j.Get("params")) + if j.Get("echo").Exists() { + ret["echo"] = j.Get("echo").Value() + } + c.Lock() + defer c.Unlock() + _ = c.WriteJSON(ret) + } +} + func (s *websocketServer) onBotPushEvent(m coolq.MSG) { - s.pushLock.Lock() - defer s.pushLock.Unlock() pos := 0 for _, conn := range s.eventConn { log.Debugf("向WS客户端 %v 推送Event: %v", conn.RemoteAddr().String(), m.ToJson())