1
0
mirror of https://github.com/Mrs4s/go-cqhttp.git synced 2025-05-06 12:03:50 +08:00

feat(server): support http long polling.

This commit is contained in:
wdvxdr 2021-07-10 23:13:15 +08:00 committed by wdvxdr
parent 6d5bf84603
commit cdd2dcf907
No known key found for this signature in database
GPG Key ID: 703F8C071DE7A1B6
3 changed files with 53 additions and 5 deletions

View File

@ -86,6 +86,10 @@ type HTTPServer struct {
Host string `yaml:"host"` Host string `yaml:"host"`
Port int `yaml:"port"` Port int `yaml:"port"`
Timeout int32 `yaml:"timeout"` Timeout int32 `yaml:"timeout"`
LongPolling struct {
Enabled bool `yaml:"enabled"`
MaxQueueSize int `yaml:"max-queue-size"`
} `yaml:"long-polling"`
Post []struct { Post []struct {
URL string `yaml:"url"` URL string `yaml:"url"`
Secret string `yaml:"secret"` Secret string `yaml:"secret"`
@ -268,6 +272,12 @@ const httpDefault = ` # HTTP 通信设置
# 反向HTTP超时时间, 单位秒 # 反向HTTP超时时间, 单位秒
# 最小值为5小于5将会忽略本项设置 # 最小值为5小于5将会忽略本项设置
timeout: 5 timeout: 5
# 长轮询拓展
long-polling:
# 是否开启
enabled: false
# 消息队列大小0 表示不限制队列大小谨慎使用
max-queue-size: 2000
middlewares: middlewares:
<<: *default # 引用默认中间件 <<: *default # 引用默认中间件
# 反向HTTP POST地址列表 # 反向HTTP POST地址列表

View File

@ -160,6 +160,9 @@ func RunHTTPServerAndClients(bot *coolq.CQBot, conf *config.HTTPServer) {
if conf.RateLimit.Enabled { if conf.RateLimit.Enabled {
s.api.use(rateLimit(conf.RateLimit.Frequency, conf.RateLimit.Bucket)) s.api.use(rateLimit(conf.RateLimit.Frequency, conf.RateLimit.Bucket))
} }
if conf.LongPolling.Enabled {
s.api.use(longPolling(bot, conf.LongPolling.MaxQueueSize))
}
go func() { go func() {
log.Infof("CQ HTTP 服务器已启动: %v", addr) log.Infof("CQ HTTP 服务器已启动: %v", addr)

View File

@ -1,6 +1,7 @@
package server package server
import ( import (
"container/list"
"context" "context"
"os" "os"
"sync" "sync"
@ -54,3 +55,37 @@ func findFilter(file string) global.Filter {
defer filterMutex.RUnlock() defer filterMutex.RUnlock()
return filters[file] return filters[file]
} }
func longPolling(bot *coolq.CQBot, maxSize int) handler {
var mutex sync.Mutex
cond := sync.NewCond(&mutex)
queue := list.New()
bot.OnEventPush(func(event *coolq.Event) {
mutex.Lock()
defer mutex.Unlock()
queue.PushBack(event.RawMsg)
for maxSize != 0 && queue.Len() > maxSize {
queue.Remove(queue.Front())
}
cond.Signal()
})
return func(action string, p resultGetter) coolq.MSG {
if action != "get_updates" {
return nil
}
mutex.Lock()
defer mutex.Unlock()
if queue.Len() == 0 {
cond.Wait()
}
limit := int(p.Get("limit").Int())
if limit <= 0 || queue.Len() < limit {
limit = queue.Len()
}
ret := make([]interface{}, limit)
for i := 0; i < limit; i++ {
ret[i] = queue.Remove(queue.Front())
}
return coolq.OK(ret)
}
}