diff --git a/global/config/config.go b/global/config/config.go index cea96e3..58ea8e5 100644 --- a/global/config/config.go +++ b/global/config/config.go @@ -82,11 +82,15 @@ type MiddleWares struct { // HTTPServer HTTP通信相关配置 type HTTPServer struct { - Disabled bool `yaml:"disabled"` - Host string `yaml:"host"` - Port int `yaml:"port"` - Timeout int32 `yaml:"timeout"` - Post []struct { + Disabled bool `yaml:"disabled"` + Host string `yaml:"host"` + Port int `yaml:"port"` + Timeout int32 `yaml:"timeout"` + LongPolling struct { + Enabled bool `yaml:"enabled"` + MaxQueueSize int `yaml:"max-queue-size"` + } `yaml:"long-polling"` + Post []struct { URL string `yaml:"url"` Secret string `yaml:"secret"` } @@ -268,6 +272,12 @@ const httpDefault = ` # HTTP 通信设置 # 反向HTTP超时时间, 单位秒 # 最小值为5,小于5将会忽略本项设置 timeout: 5 + # 长轮询拓展 + long-polling: + # 是否开启 + enabled: false + # 消息队列大小,0 表示不限制队列大小,谨慎使用 + max-queue-size: 2000 middlewares: <<: *default # 引用默认中间件 # 反向HTTP POST地址列表 diff --git a/server/http.go b/server/http.go index ca81d88..fc7246c 100644 --- a/server/http.go +++ b/server/http.go @@ -160,6 +160,9 @@ func RunHTTPServerAndClients(bot *coolq.CQBot, conf *config.HTTPServer) { if conf.RateLimit.Enabled { s.api.use(rateLimit(conf.RateLimit.Frequency, conf.RateLimit.Bucket)) } + if conf.LongPolling.Enabled { + s.api.use(longPolling(bot, conf.LongPolling.MaxQueueSize)) + } go func() { log.Infof("CQ HTTP 服务器已启动: %v", addr) diff --git a/server/middlewares.go b/server/middlewares.go index b8f093b..95f0244 100644 --- a/server/middlewares.go +++ b/server/middlewares.go @@ -1,6 +1,7 @@ package server import ( + "container/list" "context" "os" "sync" @@ -54,3 +55,37 @@ func findFilter(file string) global.Filter { defer filterMutex.RUnlock() return filters[file] } + +func longPolling(bot *coolq.CQBot, maxSize int) handler { + var mutex sync.Mutex + cond := sync.NewCond(&mutex) + queue := list.New() + bot.OnEventPush(func(event *coolq.Event) { + mutex.Lock() + defer mutex.Unlock() + queue.PushBack(event.RawMsg) + for maxSize != 0 && queue.Len() > maxSize { + queue.Remove(queue.Front()) + } + cond.Signal() + }) + return func(action string, p resultGetter) coolq.MSG { + if action != "get_updates" { + return nil + } + mutex.Lock() + defer mutex.Unlock() + if queue.Len() == 0 { + cond.Wait() + } + limit := int(p.Get("limit").Int()) + if limit <= 0 || queue.Len() < limit { + limit = queue.Len() + } + ret := make([]interface{}, limit) + for i := 0; i < limit; i++ { + ret[i] = queue.Remove(queue.Front()) + } + return coolq.OK(ret) + } +}