From bfc29a8c97a2be1d68af65fda658fe2e193b0ca8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=8D=83=E6=A9=98=20=E9=9B=AB=E9=9C=9E?= Date: Fri, 24 Dec 2021 22:27:07 +0800 Subject: [PATCH] feat: support set max retries and retries interval (close #1252) (#1289) * feat: support set max retries and retries interval * fix: httpresponse using `res` before checking for errors * fix: `HttpServerPost` now be unexported * refactor: pretty `httpDefault` --- server/http.go | 185 ++++++++++++++++++++++++++----------------------- 1 file changed, 100 insertions(+), 85 deletions(-) diff --git a/server/http.go b/server/http.go index fc233c2..c89d240 100644 --- a/server/http.go +++ b/server/http.go @@ -9,7 +9,6 @@ import ( "encoding/json" "fmt" "io" - "math/rand" "net/http" "net/url" "os" @@ -40,14 +39,18 @@ type HTTPServer struct { Enabled bool `yaml:"enabled"` MaxQueueSize int `yaml:"max-queue-size"` } `yaml:"long-polling"` - Post []struct { - URL string `yaml:"url"` - Secret string `yaml:"secret"` - } + Post []httpServerPost `yaml:"post"` MiddleWares `yaml:"middlewares"` } +type httpServerPost struct { + URL string `yaml:"url"` + Secret string `yaml:"secret"` + MaxRetries *uint64 `yaml:"max-retries"` + RetriesInterval *uint64 `yaml:"retries-interval"` +} + type httpServer struct { HTTP *http.Server api *api.Caller @@ -56,12 +59,14 @@ type httpServer struct { // HTTPClient 反向HTTP上报客户端 type HTTPClient struct { - bot *coolq.CQBot - secret string - addr string - filter string - apiPort int - timeout int32 + bot *coolq.CQBot + secret string + addr string + filter string + apiPort int + timeout int32 + MaxRetries uint64 + RetriesInterval uint64 } type httpCtx struct { @@ -70,66 +75,72 @@ type httpCtx struct { postForm url.Values } -const httpDefault = ` # HTTP 通信设置 - - http: - # 服务端监听地址 - host: 127.0.0.1 - # 服务端监听端口 - port: 5700 - # 反向HTTP超时时间, 单位秒 - # 最小值为5,小于5将会忽略本项设置 - timeout: 5 - # 长轮询拓展 - long-polling: - # 是否开启 - enabled: false - # 消息队列大小,0 表示不限制队列大小,谨慎使用 - max-queue-size: 2000 +const httpDefault = ` + - http: # HTTP 通信设置 + host: 127.0.0.1 # 服务端监听地址 + port: 5700 # 服务端监听端口 + timeout: 5 # 反向 HTTP 超时时间, 单位秒,<5 时将被忽略 + long-polling: # 长轮询拓展 + enabled: false # 是否开启 + max-queue-size: 2000 # 消息队列大小,0 表示不限制队列大小,谨慎使用 middlewares: <<: *default # 引用默认中间件 - # 反向HTTP POST地址列表 - post: - #- url: '' # 地址 - # secret: '' # 密钥 + post: # 反向HTTP POST地址列表 + #- url: '' # 地址 + # secret: '' # 密钥 + # max-retries: 3 # 最大重试,0 时禁用 + # retries-interval: 1500 # 重试时间,单位毫秒,0 时立即 #- url: http://127.0.0.1:5701/ # 地址 - # secret: '' # 密钥 + # secret: '' # 密钥 + # max-retries: 10 # 最大重试,0 时禁用 + # retries-interval: 1000 # 重试时间,单位毫秒,0 时立即 ` -func init() { - config.AddServer(&config.Server{ - Brief: "HTTP通信", - Default: httpDefault, - ParseEnv: func() (string, *yaml.Node) { - if os.Getenv("GCQ_HTTP_PORT") != "" { - // type convert tools - toInt64 := func(str string) int64 { - i, _ := strconv.ParseInt(str, 10, 64) - return i - } - accessTokenEnv := os.Getenv("GCQ_ACCESS_TOKEN") - node := &yaml.Node{} - httpConf := &HTTPServer{ - Host: "0.0.0.0", - Port: 5700, - MiddleWares: MiddleWares{ - AccessToken: accessTokenEnv, - }, - } - param.SetExcludeDefault(&httpConf.Disabled, param.EnsureBool(os.Getenv("GCQ_HTTP_DISABLE"), false), false) - param.SetExcludeDefault(&httpConf.Host, os.Getenv("GCQ_HTTP_HOST"), "") - param.SetExcludeDefault(&httpConf.Port, int(toInt64(os.Getenv("GCQ_HTTP_PORT"))), 0) - if os.Getenv("GCQ_HTTP_POST_URL") != "" { - httpConf.Post = append(httpConf.Post, struct { - URL string `yaml:"url"` - Secret string `yaml:"secret"` - }{os.Getenv("GCQ_HTTP_POST_URL"), os.Getenv("GCQ_HTTP_POST_SECRET")}) - } - _ = node.Encode(httpConf) - return "http", node - } - return "", nil +func nilParseUint(s string, base int, bitSize int) *uint64 { + pu, err := strconv.ParseUint(s, base, bitSize) + if err != nil { + return nil + } + return &pu +} + +func readEnvConfig() (string, *yaml.Node) { + + if s, ok := os.LookupEnv("GCQ_HTTP_PORT"); !ok || s == "" { + return "", nil + } + + // type convert tools + toInt64 := func(str string) int64 { + i, _ := strconv.ParseInt(str, 10, 64) + return i + } + accessTokenEnv := os.Getenv("GCQ_ACCESS_TOKEN") + node := &yaml.Node{} + httpConf := &HTTPServer{ + Host: "0.0.0.0", + Port: 5700, + MiddleWares: MiddleWares{ + AccessToken: accessTokenEnv, }, - }) + } + param.SetExcludeDefault(&httpConf.Disabled, param.EnsureBool(os.Getenv("GCQ_HTTP_DISABLE"), false), false) + param.SetExcludeDefault(&httpConf.Host, os.Getenv("GCQ_HTTP_HOST"), "") + param.SetExcludeDefault(&httpConf.Port, int(toInt64(os.Getenv("GCQ_HTTP_PORT"))), 0) + if os.Getenv("GCQ_HTTP_POST_URL") != "" { + httpConf.Post = append(httpConf.Post, httpServerPost{ + os.Getenv("GCQ_HTTP_POST_URL"), + os.Getenv("GCQ_HTTP_POST_SECRET"), + nilParseUint(os.Getenv("GCQ_HTTP_POST_MAXRETRIES"), 10, 64), + nilParseUint(os.Getenv("GCQ_HTTP_POST_RETRIESINTERVAL"), 10, 64), + }) + } + _ = node.Encode(httpConf) + return "http", node +} + +func init() { + config.AddServer(&config.Server{Brief: "HTTP通信", Default: httpDefault, ParseEnv: readEnvConfig}) } func (h *httpCtx) Get(s string) gjson.Result { @@ -242,6 +253,13 @@ func checkAuth(req *http.Request, token string) int { } } +func puint64Operator(p *uint64, def uint64) uint64 { + if p == nil { + return def + } + return *p +} + // runHTTP 启动HTTP服务器与HTTP上报客户端 func runHTTP(bot *coolq.CQBot, node yaml.Node) { var conf HTTPServer @@ -285,12 +303,14 @@ client: for _, c := range conf.Post { if c.URL != "" { go HTTPClient{ - bot: bot, - secret: c.Secret, - addr: c.URL, - apiPort: conf.Port, - filter: conf.Filter, - timeout: conf.Timeout, + bot: bot, + secret: c.Secret, + addr: c.URL, + apiPort: conf.Port, + filter: conf.Filter, + timeout: conf.Timeout, + MaxRetries: puint64Operator(c.MaxRetries, 3), + RetriesInterval: puint64Operator(c.RetriesInterval, 1500), }.Run() } } @@ -330,10 +350,7 @@ func (c *HTTPClient) onBotPushEvent(e *coolq.Event) { } var res *http.Response - var err error - const maxAttemptTimes = 5 - - for i := 0; i <= maxAttemptTimes; i++ { + for i := uint64(0); i <= c.MaxRetries; i++ { // see https://stackoverflow.com/questions/31337891/net-http-http-contentlength-222-with-body-length-0 // we should create a new request for every single post trial req, err := http.NewRequest("POST", c.addr, bytes.NewReader(e.JSONBytes())) @@ -344,24 +361,22 @@ func (c *HTTPClient) onBotPushEvent(e *coolq.Event) { req.Header = header res, err = client.Do(req) - if err == nil { + if res != nil { //goland:noinspection GoDeferInLoop defer res.Body.Close() + } + if err == nil { break } - if i != maxAttemptTimes { + if i < c.MaxRetries { log.Warnf("上报 Event 数据到 %v 失败: %v 将进行第 %d 次重试", c.addr, err, i+1) + } else { + log.Warnf("上报 Event 数据 %s 到 %v 失败: %v 停止上报:已达重试上线", e.JSONBytes(), c.addr, err) + return } - const maxWait = int64(time.Second * 3) - const minWait = int64(time.Millisecond * 500) - wait := rand.Int63n(maxWait-minWait) + minWait - time.Sleep(time.Duration(wait)) + time.Sleep(time.Millisecond * time.Duration(c.RetriesInterval)) } - if err != nil { - log.Warnf("上报Event数据 %s 到 %v 失败: %v", e.JSONBytes(), c.addr, err) - return - } log.Debugf("上报Event数据 %s 到 %v", e.JSONBytes(), c.addr) r, err := io.ReadAll(res.Body)