From 67f0ea914d2650cf88e0b378237f89387094b20e Mon Sep 17 00:00:00 2001 From: wdvxdr Date: Thu, 7 Oct 2021 21:40:20 +0800 Subject: [PATCH 1/3] feat(server): use nhooyr.io/websocket --- go.mod | 5 +-- go.sum | 42 +++++++++++++++++++++++++ server/websocket.go | 76 ++++++++++++++++++++------------------------- 3 files changed, 79 insertions(+), 44 deletions(-) diff --git a/go.mod b/go.mod index 0f76f09..5347874 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,6 @@ require ( github.com/dustin/go-humanize v1.0.0 github.com/fumiama/go-hide-param v0.1.4 github.com/gabriel-vasile/mimetype v1.3.1 - github.com/gorilla/websocket v1.4.2 github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible github.com/pkg/errors v0.9.1 @@ -26,13 +25,16 @@ require ( golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b + nhooyr.io/websocket v1.8.7 ) require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect github.com/google/uuid v1.1.1 // indirect + github.com/gorilla/websocket v1.4.2 // indirect github.com/jonboulle/clockwork v0.2.2 // indirect + github.com/klauspost/compress v1.10.3 // indirect github.com/klauspost/cpuid/v2 v2.0.6 // indirect github.com/lestrrat-go/strftime v1.0.5 // indirect github.com/maruel/rs v0.0.0-20150922171536-2c81c4312fe4 // indirect @@ -48,7 +50,6 @@ require ( golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 // indirect google.golang.org/protobuf v1.27.1 // indirect - gopkg.in/yaml.v2 v2.2.8 // indirect modernc.org/libc v1.8.1 // indirect modernc.org/mathutil v1.2.2 // indirect modernc.org/memory v1.0.4 // indirect diff --git a/go.sum b/go.sum index ff6ff1e..5993587 100644 --- a/go.sum +++ b/go.sum @@ -22,10 +22,29 @@ github.com/fumiama/go-hide-param v0.1.4 h1:y7TRTzZMdCH9GOXnIzU3B+1BSkcmvejVGmGsz github.com/fumiama/go-hide-param v0.1.4/go.mod h1:vJkQlJIEI56nIyp7tCQu1/2QOyKtZpudsnJkGk9U1aY= github.com/gabriel-vasile/mimetype v1.3.1 h1:qevA6c2MtE1RorlScnixeG0VA1H4xrXyhyX3oWBynNQ= github.com/gabriel-vasile/mimetype v1.3.1/go.mod h1:fA8fi6KUiG7MgQQ+mEWotXoEOvmxRtOJlERCzSmRvr8= +github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= +github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= +github.com/gin-gonic/gin v1.6.3 h1:ahKqKTFpO5KTPHxWZjEdPScmYaGtLo8Y4DMHoEsnp14= +github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M= +github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= +github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q= +github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8= +github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD876Lmtgy7VtROAbHHXk8no= +github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA= +github.com/go-playground/validator/v10 v10.2.0 h1:KgJ0snyC2R9VXYN2rneOtQcw5aHQB1Vv0sFl1UcHBOY= +github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI= +github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee h1:s+21KNqlpePfkah2I+gwHF8xmJWRjooY+5248k6m4A0= +github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo= +github.com/gobwas/pool v0.2.0 h1:QEmUOlnSjWtnpRGHF3SauEiOsy82Cup83Vf2LcMlnc8= +github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= +github.com/gobwas/ws v1.0.2 h1:CoAavW/wd/kulfZmSIBt6p24n4j7tHgNVCjsfHVNUbo= +github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= +github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= @@ -33,6 +52,7 @@ github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:W github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= @@ -42,19 +62,27 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.1.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/jonboulle/clockwork v0.2.2 h1:UOGuzwb1PwsrDAObMuhUnj0p5ULPj8V/xJ7Kx9qUBdQ= github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8= +github.com/json-iterator/go v1.1.9 h1:9yzud/Ht36ygwatGx56VwCZtlI/2AD15T1X2sjSuGns= +github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 h1:iQTw/8FWTuc7uiaSepXwyf3o52HaUYcV+Tu66S3F5GA= github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0/go.mod h1:1NbS8ALrpOvjt0rHPNLyCIeMtbizbir8U//inJ+zuB8= +github.com/klauspost/compress v1.10.3 h1:OP96hzwJVBIHYU52pVTI6CczrxPvrGfgqF9N5eTO0Q8= +github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/cpuid/v2 v2.0.6 h1:dQ5ueTiftKxp0gyjKSx5+8BtPWkyQbd95m8Gys/RarI= github.com/klauspost/cpuid/v2 v2.0.6/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= +github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y= +github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc h1:RKf14vYWi2ttpEmkA4aQ3j4u9dStX2t4M8UM6qqNsG8= github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc/go.mod h1:kopuH9ugFRkIXf3YoqHKyrJ9YfUFsckUU9S7B+XP+is= github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible h1:Y6sqxHMyB1D2YSzWkLibYKgg+SwmyFU9dF2hn6MdTj4= @@ -67,6 +95,10 @@ github.com/mattn/go-colorable v0.1.8 h1:c1ghPdyEDarC70ftn0y+A/Ee++9zz8ljHG1b13eJ github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -90,6 +122,7 @@ github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e/go.mod h1:XV66xRDq github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE= @@ -102,6 +135,10 @@ github.com/tidwall/pretty v1.1.0 h1:K3hMW5epkdAVwibsQEfR/7Zj0Qgt4DxtNumTq/VloO8= github.com/tidwall/pretty v1.1.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tuotoo/qrcode v0.0.0-20190222102259-ac9c44189bf2 h1:BWVtt2VBY+lmVDu9MGKqLGKl04B+iRHcrW1Ptyi/8tg= github.com/tuotoo/qrcode v0.0.0-20190222102259-ac9c44189bf2/go.mod h1:lPnW9HVS0vJdeYyQtOvIvlXgZPNhUAhwz+z5r8AJk0Y= +github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo= +github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= +github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs= +github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= github.com/wdvxdr1123/go-silk v0.0.0-20210316130616-d47b553def60 h1:lRKf10iIOW0VsH5WDF621ihzR+R2wEBZVtNRHuLLCb4= github.com/wdvxdr1123/go-silk v0.0.0-20210316130616-d47b553def60/go.mod h1:ecFKZPX81BaB70I6ruUgEwYcDOtuNgJGnjdK+MIl5ko= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -141,9 +178,11 @@ golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9sn golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b h1:9zKuko04nR4gjZ4+DNjHqRlAJqbJETHwiNKDqTfOjfE= golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac h1:7zkz7BUtwNFFqcowJ+RIgu2MaV/MapERkDIy+mwPyjs= golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -179,6 +218,7 @@ gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMy gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= @@ -193,3 +233,5 @@ modernc.org/mathutil v1.2.2 h1:+yFk8hBprV+4c0U9GjFtL+dV3N8hOJ8JCituQcMShFY= modernc.org/mathutil v1.2.2/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E= modernc.org/memory v1.0.4 h1:utMBrFcpnQDdNsmM6asmyH/FM9TqLPS7XF7otpJmrwM= modernc.org/memory v1.0.4/go.mod h1:nV2OApxradM3/OVbs2/0OsP6nPfakXpi50C7dcoHXlc= +nhooyr.io/websocket v1.8.7 h1:usjR2uOr/zjjkVMy0lW+PPohFok7PCow5sDjLgX4P4g= +nhooyr.io/websocket v1.8.7/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0= diff --git a/server/websocket.go b/server/websocket.go index 5200aed..c9ee06e 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -2,6 +2,8 @@ package server import ( "bytes" + "context" + "encoding/json" "fmt" "net/http" "runtime/debug" @@ -11,13 +13,13 @@ import ( "time" "gopkg.in/yaml.v3" + "nhooyr.io/websocket" "github.com/Mrs4s/go-cqhttp/coolq" "github.com/Mrs4s/go-cqhttp/global" "github.com/Mrs4s/go-cqhttp/modules/config" "github.com/Mrs4s/MiraiGo/utils" - "github.com/gorilla/websocket" log "github.com/sirupsen/logrus" "github.com/tidwall/gjson" ) @@ -46,16 +48,9 @@ type websocketClient struct { type webSocketConn struct { *websocket.Conn - sync.Mutex apiCaller *apiCaller } -var upgrader = websocket.Upgrader{ - CheckOrigin: func(r *http.Request) bool { - return true - }, -} - // runWSServer 运行一个正向WS server func runWSServer(b *coolq.CQBot, node yaml.Node) { var conf config.WebsocketServer @@ -129,7 +124,7 @@ func (c *websocketClient) connectAPI() { if c.token != "" { header["Authorization"] = []string{"Token " + c.token} } - conn, _, err := websocket.DefaultDialer.Dial(c.conf.API, header) // nolint + conn, _, err := websocket.Dial(context.Background(), c.conf.Universal, &websocket.DialOptions{HTTPHeader: header}) // nolint if err != nil { log.Warnf("连接到反向WebSocket API服务器 %v 时出现错误: %v", c.conf.API, err) if c.conf.ReconnectInterval != 0 { @@ -156,7 +151,7 @@ func (c *websocketClient) connectEvent() { if c.token != "" { header["Authorization"] = []string{"Token " + c.token} } - conn, _, err := websocket.DefaultDialer.Dial(c.conf.Event, header) // nolint + conn, _, err := websocket.Dial(context.Background(), c.conf.Universal, &websocket.DialOptions{HTTPHeader: header}) // nolint if err != nil { log.Warnf("连接到反向WebSocket Event服务器 %v 时出现错误: %v", c.conf.Event, err) if c.conf.ReconnectInterval != 0 { @@ -168,7 +163,7 @@ func (c *websocketClient) connectEvent() { handshake := fmt.Sprintf(`{"meta_event_type":"lifecycle","post_type":"meta_event","self_id":%d,"sub_type":"connect","time":%d}`, c.bot.Client.Uin, time.Now().Unix()) - err = conn.WriteMessage(websocket.TextMessage, []byte(handshake)) + err = conn.Write(context.Background(), websocket.MessageText, []byte(handshake)) if err != nil { log.Warnf("反向WebSocket 握手时出现错误: %v", err) } @@ -192,7 +187,7 @@ func (c *websocketClient) connectUniversal() { if c.token != "" { header["Authorization"] = []string{"Token " + c.token} } - conn, _, err := websocket.DefaultDialer.Dial(c.conf.Universal, header) // nolint + conn, _, err := websocket.Dial(context.Background(), c.conf.Universal, &websocket.DialOptions{HTTPHeader: header}) // nolint if err != nil { log.Warnf("连接到反向WebSocket Universal服务器 %v 时出现错误: %v", c.conf.Universal, err) if c.conf.ReconnectInterval != 0 { @@ -203,7 +198,7 @@ func (c *websocketClient) connectUniversal() { } handshake := fmt.Sprintf(`{"meta_event_type":"lifecycle","post_type":"meta_event","self_id":%d,"sub_type":"connect","time":%d}`, c.bot.Client.Uin, time.Now().Unix()) - err = conn.WriteMessage(websocket.TextMessage, []byte(handshake)) + err = conn.Write(context.Background(), websocket.MessageText, []byte(handshake)) if err != nil { log.Warnf("反向WebSocket 握手时出现错误: %v", err) } @@ -221,10 +216,10 @@ func (c *websocketClient) connectUniversal() { } func (c *websocketClient) listenAPI(conn *webSocketConn, u bool) { - defer func() { _ = conn.Close() }() + defer func() { _ = conn.Close(websocket.StatusNormalClosure, "") }() for { buffer := global.NewBuffer() - t, reader, err := conn.NextReader() + t, reader, err := conn.Conn.Reader(context.Background()) if err != nil { log.Warnf("监听反向WS API时出现错误: %v", err) break @@ -234,7 +229,7 @@ func (c *websocketClient) listenAPI(conn *webSocketConn, u bool) { log.Warnf("监听反向WS API时出现错误: %v", err) break } - if t == websocket.TextMessage { + if t == websocket.MessageText { go func(buffer *bytes.Buffer) { defer global.PutBuffer(buffer) conn.handleRequest(c.bot, buffer.Bytes()) @@ -258,13 +253,12 @@ func (c *websocketClient) onBotPushEvent(e *coolq.Event) { return } push := func(conn *webSocketConn, reconnect func()) { - log.Debugf("向WS服务器 %v 推送Event: %s", conn.RemoteAddr().String(), e.JSONBytes()) - conn.Lock() - defer conn.Unlock() - _ = conn.SetWriteDeadline(time.Now().Add(time.Second * 15)) - if err := conn.WriteMessage(websocket.TextMessage, e.JSONBytes()); err != nil { - log.Warnf("向WS服务器 %v 推送Event时出现错误: %v", conn.RemoteAddr().String(), err) - _ = conn.Close() + log.Debugf("向WS服务器推送Event: %s", e.JSONBytes()) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) + defer cancel() + if err := conn.Write(ctx, websocket.MessageText, e.JSONBytes()); err != nil { + log.Warnf("向WS服务器推送 Event 时出现错误: %v", err) + _ = conn.Close(websocket.StatusNormalClosure, "") if c.conf.ReconnectInterval != 0 { time.Sleep(time.Millisecond * time.Duration(c.conf.ReconnectInterval)) reconnect() @@ -286,15 +280,15 @@ func (s *webSocketServer) event(w http.ResponseWriter, r *http.Request) { w.WriteHeader(status) return } - c, err := upgrader.Upgrade(w, r, nil) + c, err := websocket.Accept(w, r, nil) if err != nil { log.Warnf("处理 WebSocket 请求时出现错误: %v", err) return } - err = c.WriteMessage(websocket.TextMessage, []byte(s.handshake)) + err = c.Write(context.Background(), websocket.MessageText, []byte(s.handshake)) if err != nil { log.Warnf("WebSocket 握手时出现错误: %v", err) - _ = c.Close() + _ = c.Close(websocket.StatusNormalClosure, "") return } @@ -314,7 +308,7 @@ func (s *webSocketServer) api(w http.ResponseWriter, r *http.Request) { w.WriteHeader(status) return } - c, err := upgrader.Upgrade(w, r, nil) + c, err := websocket.Accept(w, r, nil) if err != nil { log.Warnf("处理 WebSocket 请求时出现错误: %v", err) return @@ -334,15 +328,15 @@ func (s *webSocketServer) any(w http.ResponseWriter, r *http.Request) { w.WriteHeader(status) return } - c, err := upgrader.Upgrade(w, r, nil) + c, err := websocket.Accept(w, r, nil) if err != nil { log.Warnf("处理 WebSocket 请求时出现错误: %v", err) return } - err = c.WriteMessage(websocket.TextMessage, []byte(s.handshake)) + err = c.Write(context.Background(), websocket.MessageText, []byte(s.handshake)) if err != nil { log.Warnf("WebSocket 握手时出现错误: %v", err) - _ = c.Close() + _ = c.Close(websocket.StatusNormalClosure, "") return } log.Infof("接受 WebSocket 连接: %v (/)", r.RemoteAddr) @@ -357,10 +351,10 @@ func (s *webSocketServer) any(w http.ResponseWriter, r *http.Request) { } func (s *webSocketServer) listenAPI(c *webSocketConn) { - defer func() { _ = c.Close() }() + defer func() { _ = c.Close(websocket.StatusNormalClosure, "") }() for { buffer := global.NewBuffer() - t, reader, err := c.NextReader() + t, reader, err := c.Reader(context.Background()) if err != nil { break } @@ -369,7 +363,7 @@ func (s *webSocketServer) listenAPI(c *webSocketConn) { break } - if t == websocket.TextMessage { + if t == websocket.MessageText { go func(buffer *bytes.Buffer) { defer global.PutBuffer(buffer) c.handleRequest(s.bot, buffer.Bytes()) @@ -384,7 +378,7 @@ func (c *webSocketConn) handleRequest(_ *coolq.CQBot, payload []byte) { defer func() { if err := recover(); err != nil { log.Printf("处置WS命令时发生无法恢复的异常:%v\n%s", err, debug.Stack()) - _ = c.Close() + _ = c.Close(websocket.StatusInternalError, fmt.Sprint(err)) } }() j := gjson.Parse(utils.B2S(payload)) @@ -394,9 +388,9 @@ func (c *webSocketConn) handleRequest(_ *coolq.CQBot, payload []byte) { if j.Get("echo").Exists() { ret["echo"] = j.Get("echo").Value() } - c.Lock() - defer c.Unlock() - _ = c.WriteJSON(ret) + writer, _ := c.Writer(context.Background(), websocket.MessageText) + _ = json.NewEncoder(writer).Encode(ret) + _ = writer.Close() } func (s *webSocketServer) onBotPushEvent(e *coolq.Event) { @@ -411,10 +405,9 @@ func (s *webSocketServer) onBotPushEvent(e *coolq.Event) { j := 0 for i := 0; i < len(s.eventConn); i++ { conn := s.eventConn[i] - log.Debugf("向WS客户端 %v 推送Event: %s", conn.RemoteAddr().String(), e.JSONBytes()) - conn.Lock() - if err := conn.WriteMessage(websocket.TextMessage, e.JSONBytes()); err != nil { - _ = conn.Close() + log.Debugf("向WS客户端推送Event: %s", e.JSONBytes()) + if err := conn.Write(context.Background(), websocket.MessageText, e.JSONBytes()); err != nil { + _ = conn.Close(websocket.StatusNormalClosure, "") conn = nil continue } @@ -423,7 +416,6 @@ func (s *webSocketServer) onBotPushEvent(e *coolq.Event) { // use a in-place removal to avoid copying. s.eventConn[j] = conn } - conn.Unlock() j++ } s.eventConn = s.eventConn[:j] From 1771cda11c42cec8cb6fb1b8326926c35f16ed79 Mon Sep 17 00:00:00 2001 From: wdvxdr Date: Thu, 7 Oct 2021 22:24:45 +0800 Subject: [PATCH 2/3] feat(server): unify websocket client connect --- server/websocket.go | 180 +++++++++++++++++--------------------------- 1 file changed, 70 insertions(+), 110 deletions(-) diff --git a/server/websocket.go b/server/websocket.go index c9ee06e..64dd856 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -28,11 +28,12 @@ type webSocketServer struct { bot *coolq.CQBot conf *config.WebsocketServer - eventConn []*webSocketConn - eventConnMutex sync.Mutex - token string - handshake string - filter string + mu sync.Mutex + eventConn []*wsConn + + token string + handshake string + filter string } // websocketClient WebSocket客户端实例 @@ -40,13 +41,15 @@ type websocketClient struct { bot *coolq.CQBot conf *config.WebsocketReverse - universalConn *webSocketConn - eventConn *webSocketConn - token string - filter string + mu sync.Mutex + + universal *wsConn + event *wsConn + token string + filter string } -type webSocketConn struct { +type wsConn struct { *websocket.Conn apiCaller *apiCaller } @@ -102,20 +105,20 @@ func runWSClient(b *coolq.CQBot, node yaml.Node) { } addFilter(c.filter) if c.conf.Universal != "" { - c.connectUniversal() + c.connect("Universal", conf.Universal, &c.universal) } else { if c.conf.API != "" { - c.connectAPI() + c.connect("API", conf.API, nil) } if c.conf.Event != "" { - c.connectEvent() + c.connect("Event", conf.Event, &c.event) } } c.bot.OnEventPush(c.onBotPushEvent) } -func (c *websocketClient) connectAPI() { - log.Infof("开始尝试连接到反向WebSocket API服务器: %v", c.conf.API) +func (c *websocketClient) connect(typ, url string, conptr **wsConn) { + log.Infof("开始尝试连接到反向WebSocket %s服务器: %v", typ, c.conf.API) header := http.Header{ "X-Client-Role": []string{"API"}, "X-Self-ID": []string{strconv.FormatInt(c.bot.Client.Uin, 10)}, @@ -124,98 +127,44 @@ func (c *websocketClient) connectAPI() { if c.token != "" { header["Authorization"] = []string{"Token " + c.token} } - conn, _, err := websocket.Dial(context.Background(), c.conf.Universal, &websocket.DialOptions{HTTPHeader: header}) // nolint + conn, _, err := websocket.Dial(context.Background(), url, &websocket.DialOptions{HTTPHeader: header}) // nolint if err != nil { - log.Warnf("连接到反向WebSocket API服务器 %v 时出现错误: %v", c.conf.API, err) + log.Warnf("连接到反向WebSocket %s服务器 %v 时出现错误: %v", typ, c.conf.API, err) if c.conf.ReconnectInterval != 0 { time.Sleep(time.Millisecond * time.Duration(c.conf.ReconnectInterval)) - c.connectAPI() + c.connect(typ, url, conptr) } return } - log.Infof("已连接到反向WebSocket API服务器 %v", c.conf.API) - wrappedConn := &webSocketConn{Conn: conn, apiCaller: newAPICaller(c.bot)} + + switch typ { + case "Event", "Universal": + handshake := fmt.Sprintf(`{"meta_event_type":"lifecycle","post_type":"meta_event","self_id":%d,"sub_type":"connect","time":%d}`, c.bot.Client.Uin, time.Now().Unix()) + err = conn.Write(context.Background(), websocket.MessageText, []byte(handshake)) + if err != nil { + log.Warnf("反向WebSocket 握手时出现错误: %v", err) + } + } + + log.Infof("已连接到反向WebSocket %s服务器 %v", typ, c.conf.API) + wrappedConn := &wsConn{Conn: conn, apiCaller: newAPICaller(c.bot)} if c.conf.RateLimit.Enabled { wrappedConn.apiCaller.use(rateLimit(c.conf.RateLimit.Frequency, c.conf.RateLimit.Bucket)) } - go c.listenAPI(wrappedConn, false) -} -func (c *websocketClient) connectEvent() { - log.Infof("开始尝试连接到反向WebSocket Event服务器: %v", c.conf.Event) - header := http.Header{ - "X-Client-Role": []string{"Event"}, - "X-Self-ID": []string{strconv.FormatInt(c.bot.Client.Uin, 10)}, - "User-Agent": []string{"CQHttp/4.15.0"}, - } - if c.token != "" { - header["Authorization"] = []string{"Token " + c.token} - } - conn, _, err := websocket.Dial(context.Background(), c.conf.Universal, &websocket.DialOptions{HTTPHeader: header}) // nolint - if err != nil { - log.Warnf("连接到反向WebSocket Event服务器 %v 时出现错误: %v", c.conf.Event, err) - if c.conf.ReconnectInterval != 0 { - time.Sleep(time.Millisecond * time.Duration(c.conf.ReconnectInterval)) - c.connectEvent() - } - return + if conptr != nil { + *conptr = wrappedConn } - handshake := fmt.Sprintf(`{"meta_event_type":"lifecycle","post_type":"meta_event","self_id":%d,"sub_type":"connect","time":%d}`, - c.bot.Client.Uin, time.Now().Unix()) - err = conn.Write(context.Background(), websocket.MessageText, []byte(handshake)) - if err != nil { - log.Warnf("反向WebSocket 握手时出现错误: %v", err) - } - - log.Infof("已连接到反向WebSocket Event服务器 %v", c.conf.Event) - if c.eventConn == nil { - wrappedConn := &webSocketConn{Conn: conn, apiCaller: newAPICaller(c.bot)} - c.eventConn = wrappedConn - } else { - c.eventConn.Conn = conn + switch typ { + case "API": + go c.listenAPI(wrappedConn, false) + case "Universal": + go c.listenAPI(wrappedConn, true) } } -func (c *websocketClient) connectUniversal() { - log.Infof("开始尝试连接到反向WebSocket Universal服务器: %v", c.conf.Universal) - header := http.Header{ - "X-Client-Role": []string{"Universal"}, - "X-Self-ID": []string{strconv.FormatInt(c.bot.Client.Uin, 10)}, - "User-Agent": []string{"CQHttp/4.15.0"}, - } - if c.token != "" { - header["Authorization"] = []string{"Token " + c.token} - } - conn, _, err := websocket.Dial(context.Background(), c.conf.Universal, &websocket.DialOptions{HTTPHeader: header}) // nolint - if err != nil { - log.Warnf("连接到反向WebSocket Universal服务器 %v 时出现错误: %v", c.conf.Universal, err) - if c.conf.ReconnectInterval != 0 { - time.Sleep(time.Millisecond * time.Duration(c.conf.ReconnectInterval)) - c.connectUniversal() - } - return - } - handshake := fmt.Sprintf(`{"meta_event_type":"lifecycle","post_type":"meta_event","self_id":%d,"sub_type":"connect","time":%d}`, - c.bot.Client.Uin, time.Now().Unix()) - err = conn.Write(context.Background(), websocket.MessageText, []byte(handshake)) - if err != nil { - log.Warnf("反向WebSocket 握手时出现错误: %v", err) - } - - if c.universalConn == nil { - wrappedConn := &webSocketConn{Conn: conn, apiCaller: newAPICaller(c.bot)} - if c.conf.RateLimit.Enabled { - wrappedConn.apiCaller.use(rateLimit(c.conf.RateLimit.Frequency, c.conf.RateLimit.Bucket)) - } - c.universalConn = wrappedConn - } else { - c.universalConn.Conn = conn - } - go c.listenAPI(c.universalConn, true) -} - -func (c *websocketClient) listenAPI(conn *webSocketConn, u bool) { +func (c *websocketClient) listenAPI(conn *wsConn, u bool) { defer func() { _ = conn.Close(websocket.StatusNormalClosure, "") }() for { buffer := global.NewBuffer() @@ -241,7 +190,7 @@ func (c *websocketClient) listenAPI(conn *webSocketConn, u bool) { if c.conf.ReconnectInterval != 0 { time.Sleep(time.Millisecond * time.Duration(c.conf.ReconnectInterval)) if !u { - go c.connectAPI() + go c.connect("API", c.conf.API, nil) } } } @@ -252,7 +201,7 @@ func (c *websocketClient) onBotPushEvent(e *coolq.Event) { log.Debugf("上报Event %s 到 WS服务器 时被过滤.", e.JSONBytes()) return } - push := func(conn *webSocketConn, reconnect func()) { + push := func(conn *wsConn, reconnect func()) { log.Debugf("向WS服务器推送Event: %s", e.JSONBytes()) ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) defer cancel() @@ -265,11 +214,21 @@ func (c *websocketClient) onBotPushEvent(e *coolq.Event) { } } } - if c.eventConn != nil { - push(c.eventConn, c.connectEvent) + + connect := func(typ, url string, conptr **wsConn) func() { + return func() { + c.connect(typ, url, conptr) + } } - if c.universalConn != nil { - push(c.universalConn, c.connectUniversal) + + c.mu.Lock() + defer c.mu.Unlock() + + if c.event != nil { + push(c.event, connect("Event", c.conf.Event, &c.event)) + } + if c.universal != nil { + push(c.universal, connect("Universal", c.conf.Universal, &c.universal)) } } @@ -294,11 +253,11 @@ func (s *webSocketServer) event(w http.ResponseWriter, r *http.Request) { log.Infof("接受 WebSocket 连接: %v (/event)", r.RemoteAddr) - conn := &webSocketConn{Conn: c, apiCaller: newAPICaller(s.bot)} + conn := &wsConn{Conn: c, apiCaller: newAPICaller(s.bot)} - s.eventConnMutex.Lock() + s.mu.Lock() s.eventConn = append(s.eventConn, conn) - s.eventConnMutex.Unlock() + s.mu.Unlock() } func (s *webSocketServer) api(w http.ResponseWriter, r *http.Request) { @@ -314,7 +273,7 @@ func (s *webSocketServer) api(w http.ResponseWriter, r *http.Request) { return } log.Infof("接受 WebSocket 连接: %v (/api)", r.RemoteAddr) - conn := &webSocketConn{Conn: c, apiCaller: newAPICaller(s.bot)} + conn := &wsConn{Conn: c, apiCaller: newAPICaller(s.bot)} if s.conf.RateLimit.Enabled { conn.apiCaller.use(rateLimit(s.conf.RateLimit.Frequency, s.conf.RateLimit.Bucket)) } @@ -340,17 +299,17 @@ func (s *webSocketServer) any(w http.ResponseWriter, r *http.Request) { return } log.Infof("接受 WebSocket 连接: %v (/)", r.RemoteAddr) - conn := &webSocketConn{Conn: c, apiCaller: newAPICaller(s.bot)} + conn := &wsConn{Conn: c, apiCaller: newAPICaller(s.bot)} if s.conf.RateLimit.Enabled { conn.apiCaller.use(rateLimit(s.conf.RateLimit.Frequency, s.conf.RateLimit.Bucket)) } - s.eventConnMutex.Lock() + s.mu.Lock() s.eventConn = append(s.eventConn, conn) - s.eventConnMutex.Unlock() + s.mu.Unlock() s.listenAPI(conn) } -func (s *webSocketServer) listenAPI(c *webSocketConn) { +func (s *webSocketServer) listenAPI(c *wsConn) { defer func() { _ = c.Close(websocket.StatusNormalClosure, "") }() for { buffer := global.NewBuffer() @@ -374,7 +333,7 @@ func (s *webSocketServer) listenAPI(c *webSocketConn) { } } -func (c *webSocketConn) handleRequest(_ *coolq.CQBot, payload []byte) { +func (c *wsConn) handleRequest(_ *coolq.CQBot, payload []byte) { defer func() { if err := recover(); err != nil { log.Printf("处置WS命令时发生无法恢复的异常:%v\n%s", err, debug.Stack()) @@ -394,14 +353,15 @@ func (c *webSocketConn) handleRequest(_ *coolq.CQBot, payload []byte) { } func (s *webSocketServer) onBotPushEvent(e *coolq.Event) { - s.eventConnMutex.Lock() - defer s.eventConnMutex.Unlock() - filter := findFilter(s.filter) if filter != nil && !filter.Eval(gjson.Parse(e.JSONString())) { log.Debugf("上报Event %s 到 WS客户端 时被过滤.", e.JSONBytes()) return } + + s.mu.Lock() + defer s.mu.Unlock() + j := 0 for i := 0; i < len(s.eventConn); i++ { conn := s.eventConn[i] From 47cdc20d4567f20cf4db6655a5c45f40919004fb Mon Sep 17 00:00:00 2001 From: wdvxdr Date: Wed, 13 Oct 2021 22:51:51 +0800 Subject: [PATCH 3/3] fix: correct x-client-role header --- server/websocket.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/websocket.go b/server/websocket.go index 64dd856..fcbc49b 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -120,7 +120,7 @@ func runWSClient(b *coolq.CQBot, node yaml.Node) { func (c *websocketClient) connect(typ, url string, conptr **wsConn) { log.Infof("开始尝试连接到反向WebSocket %s服务器: %v", typ, c.conf.API) header := http.Header{ - "X-Client-Role": []string{"API"}, + "X-Client-Role": []string{typ}, "X-Self-ID": []string{strconv.FormatInt(c.bot.Client.Uin, 10)}, "User-Agent": []string{"CQHttp/4.15.0"}, }