测试core信息

This commit is contained in:
KimigaiiWuyi 2025-03-10 06:50:44 +08:00
parent 19e93bdd60
commit c5003d1e7b
20 changed files with 846 additions and 2 deletions

BIN
gsuid_core/bg.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.6 MiB

View File

@ -6,6 +6,7 @@ import gsuid_core.global_val as gv
from gsuid_core.models import Event
from gsuid_core.aps import scheduler
from gsuid_core.logger import logger
from gsuid_core.status.draw_status import draw_status
from gsuid_core.utils.database.models import CoreUser, CoreGroup
from .command_global_val import save_global_val
@ -44,6 +45,19 @@ async def scheduled_save_global_val():
await count_group_user()
@sv_core_status.on_fullmatch(
(
'core信息',
'Core信息',
'CoreStatus',
'corestatus',
)
)
async def send_core_info_msg(bot: Bot, ev: Event):
logger.info('开始执行 早柚核心 [信息]')
await bot.send(await draw_status(ev))
@sv_core_status.on_command(('core状态', 'Core状态'), block=True)
async def send_core_status_msg(bot: Bot, ev: Event):
day = ev.text.strip()

View File

@ -53,7 +53,7 @@ class GsClient:
bot_id='console',
# bot_id='qqgroup',
bot_self_id='511love51',
user_type='direct',
user_type='group',
user_pm=0,
group_id=group_id,
user_id=user_id,

BIN
gsuid_core/fg.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 20 KiB

View File

@ -0,0 +1,552 @@
from pathlib import Path
from typing import Dict, List, Tuple, Union
from PIL import Image, ImageOps, ImageDraw
import gsuid_core.global_val as gv
from gsuid_core.models import Event
from gsuid_core.version import __version__
from gsuid_core.help.draw_core_help import ICON
from gsuid_core.utils.fonts.fonts import core_font
from gsuid_core.utils.database.models import CoreUser, CoreGroup
from gsuid_core.utils.image.convert import convert_img, number_to_chinese
from gsuid_core.utils.image.image_tools import (
add_footer,
get_font_x,
crop_center_img,
)
from .utils import generate_y_ticks
from .plugin_status import plugins_status
from .get_hw import get_cpu_info, get_disk_info, get_swap_info, get_memory_info
TEXT_PATH = Path(__file__).parent / 'texture2d'
THEME_COLOR = (94, 79, 171)
HINT_COLOR = (235, 54, 54)
BLACK = (24, 24, 24)
GREY = (101, 101, 101)
SE_COLOR = (71, 71, 71)
async def draw_title():
title = Image.new('RGBA', (1400, 300))
title_draw = ImageDraw.Draw(title)
icon = Image.open(ICON).resize((186, 186))
title.paste(icon, (92, 77), icon)
MAIN_TITLE = '机器人小柚子'
S_TITLE = '祝你拥有美好的一天!'
all_group = await CoreGroup.get_all_group()
all_user = await CoreUser.get_all_user()
all_group_num = len(all_group) if all_group else 0
all_user_num = len(all_user) if all_user else 0
x = get_font_x(core_font(40), MAIN_TITLE)
title_draw.text(
(330, 147),
MAIN_TITLE,
BLACK,
core_font(40),
'lm',
)
tag_w, tag_h = 111, 47
title_draw.rounded_rectangle(
(340 + x, 121, 340 + x + tag_w, 121 + tag_h),
12,
HINT_COLOR,
)
title_draw.text(
(340 + x + 55, 145),
f'v{__version__}',
'White',
core_font(32),
'mm',
)
title_draw.text(
(330, 193),
S_TITLE,
GREY,
core_font(30),
'lm',
)
msg_w, msg_h = 156, 32
title_draw.rounded_rectangle(
(905, 180, 905 + msg_w, 180 + msg_h),
10,
THEME_COLOR,
)
title_draw.rounded_rectangle(
(1126, 180, 1126 + msg_w, 180 + msg_h),
10,
THEME_COLOR,
)
title_draw.text(
(983, 196),
'已加入群聊',
'White',
core_font(24),
'mm',
)
title_draw.text(
(1204, 196),
'已服务用户',
'White',
core_font(24),
'mm',
)
title_draw.text(
(983, 147),
str(all_group_num),
BLACK,
core_font(60),
'mm',
)
title_draw.text(
(1204, 147),
str(all_user_num),
BLACK,
core_font(60),
'mm',
)
return title
async def draw_bar(text1: str, text2: str):
bar = Image.new('RGBA', (1400, 100))
bar_draw = ImageDraw.Draw(bar)
bar_draw.rounded_rectangle(
(116, 74, 1307, 79),
12,
THEME_COLOR,
)
bar_draw.text(
(113, 48),
text1,
BLACK,
core_font(40),
'lm',
)
x = get_font_x(core_font(40), text1)
bar_draw.text(
(117 + x, 54),
text2,
GREY,
core_font(32),
'lm',
)
return bar
async def draw_badge(
title: str,
value: Union[str, int, float],
avg_value: int = 0,
color: Tuple[int, int, int] = THEME_COLOR,
):
badge = Image.new('RGBA', (210, 150))
badge_draw = ImageDraw.Draw(badge)
badge_draw.rounded_rectangle(
(27, 93, 183, 125),
10,
color,
)
badge_draw.text(
(105, 109),
title,
'White',
core_font(24),
'mm',
)
if isinstance(value, int) or isinstance(value, float) or value.isdigit():
value = number_to_chinese(float(value))
if avg_value != 0 and (isinstance(value, int) or isinstance(value, float)):
if value > avg_value * 1.2:
arrow = Image.open(TEXT_PATH / 'up.png')
point = (92, 51)
badge.paste(
arrow,
(154, 23),
arrow,
)
elif value < avg_value * 0.8:
arrow = Image.open(TEXT_PATH / 'down.png')
point = (92, 51)
badge.paste(
arrow,
(154, 23),
arrow,
)
else:
point = (105, 63)
else:
point = (105, 63)
badge_draw.text(
point,
value,
BLACK,
core_font(46),
'mm',
)
return badge
async def draw_data_analysis1(ev: Event):
local_val = await gv.get_global_val(
ev.real_bot_id,
ev.bot_self_id,
)
yesterday = await gv.get_global_val(
ev.real_bot_id,
ev.bot_self_id,
1,
)
data_bar = Image.new('RGBA', (1400, 200))
badge1 = await draw_badge(
'今日接收',
local_val['receive'],
yesterday['receive'],
SE_COLOR,
)
badge2 = await draw_badge(
'今日发送',
local_val['send'],
yesterday['send'],
HINT_COLOR,
)
badge3 = await draw_badge(
'绘制图片',
local_val['image'],
yesterday['image'],
)
badge4 = await draw_badge(
'触发命令',
local_val['command'],
yesterday['command'],
)
badge5 = await draw_badge(
'使用群聊',
len(local_val['group']),
len(yesterday['group']),
SE_COLOR,
)
badge6 = await draw_badge(
'使用用户',
len(local_val['user']),
len(yesterday['user']),
SE_COLOR,
)
for index, i in enumerate(
[badge1, badge2, badge3, badge4, badge5, badge6]
):
data_bar.paste(i, (75 + index * 210, 25), i)
return data_bar
async def draw_data_analysis2(ev: Event):
data = await gv.get_global_analysis(
ev.real_bot_id,
ev.bot_self_id,
)
badge1 = await draw_badge(
'DAU',
data['DAU'],
0,
HINT_COLOR,
)
badge2 = await draw_badge(
'DAG',
data['DAG'],
)
badge3 = await draw_badge(
'用户新增',
data['NU'],
)
badge4 = await draw_badge(
'用户留存',
data['OU'],
0,
HINT_COLOR,
)
data_bar = Image.new('RGBA', (1400, 200))
for index, i in enumerate([badge1, badge2, badge3, badge4]):
data_bar.paste(i, (75 + index * 210, 25), i)
return data_bar
def draw_ring(value: float):
img = Image.new('RGBA', (100, 100))
resin_percent = value / 100
ring_pic = Image.open(TEXT_PATH / 'ring.webp')
percent = (
round(resin_percent * 49) if round(resin_percent * 49) <= 49 else 49
)
ring_pic.seek(percent)
img.paste(ring_pic, (0, 0), ring_pic)
img_draw = ImageDraw.Draw(img)
img_draw.text(
(50, 50),
f'{int(value)}',
GREY,
core_font(27),
'mm',
)
return img
def draw_hw_status_bar(title: str, value: float, msg: str):
img = Image.new('RGBA', (740, 100))
img_draw = ImageDraw.Draw(img)
ring = draw_ring(value)
img.paste(ring, (77, 0), ring)
img_draw.rounded_rectangle((175, 27, 266, 74), 12, THEME_COLOR)
img_draw.text(
(220, 50),
title,
'White',
core_font(32),
'mm',
)
img_draw.text(
(280, 50),
msg,
GREY,
core_font(32),
'lm',
)
return img
def draw_hw():
img = Image.new('RGBA', (1400, 300))
cpu = get_cpu_info()
memory = get_memory_info()
disk = get_disk_info()
swap = get_swap_info()
cpu_img = draw_hw_status_bar('CPU', cpu['value'], cpu['name'])
memory_img = draw_hw_status_bar('内存', memory['value'], memory['name'])
disk_img = draw_hw_status_bar('磁盘', disk['value'], disk['name'])
swap_img = draw_hw_status_bar('交换', swap['value'], swap['name'])
for index, i in enumerate([cpu_img, memory_img, disk_img, swap_img]):
img.paste(
i,
(20 + (index % 2) * 670, 50 + (index // 2) * 100),
i,
)
return img
async def draw_plugins_status():
plugins_num = len(plugins_status)
plugins_h = 50 + plugins_num * 180
img = Image.new('RGBA', (1400, plugins_h))
img_draw = ImageDraw.Draw(img)
if plugins_num == 0:
img_draw.text(
(700, 25),
'当前没有插件有额外信息',
GREY,
core_font(32),
'mm',
)
else:
for index, i in enumerate(plugins_status):
plugin_bar = Image.new('RGBA', (1400, 180))
plugin_bar_draw = ImageDraw.Draw(plugin_bar)
plugin_bar_draw.rounded_rectangle(
(115, 75, 540, 133),
30,
THEME_COLOR,
)
plugin = plugins_status[i]
icon = plugin['icon']
icon = icon.resize((128, 128))
status = plugin['status']
plugin_bar.paste(icon, (109, 30), icon)
plugin_bar_draw.text(
(251, 104),
i,
'White',
core_font(26),
'lm',
)
for indexj, j in enumerate(status):
badge = await draw_badge(j, await status[j]())
plugin_bar.paste(
badge,
(605 + 210 * indexj, 11),
badge,
)
if index >= 2:
break
img.paste(
plugin_bar,
(0, 25 + 180 * index),
plugin_bar,
)
return img
async def draw_curve(datas: Dict[Tuple[int, int, int], List[float]]):
img = Image.new('RGBA', (1400, 550))
img_draw = ImageDraw.Draw(img)
num = 20
rad = 5
a_y = 375
a_x = 1200
step_x = a_x / num
start_x = 147
is_text = False
for color in datas:
data = datas[color][:num][::-1]
y_ticks = generate_y_ticks(data)
y_ticks = [int(i) for i in y_ticks]
if not is_text:
for yindex, y in enumerate(y_ticks):
img_draw.text(
(116, 460 - 75 * yindex),
str(y),
BLACK,
core_font(30),
'rm',
)
is_text = True
points = []
for dataindex, data in enumerate(data):
x1 = int(start_x + dataindex * step_x)
d_y = (data / y_ticks[-1]) * a_y
y1 = 460 - d_y
points.append((x1, y1))
img_draw.line(points, color, 4)
for p in points:
img_draw.ellipse(
(
p[0] - rad,
p[1] - rad,
p[0] + rad,
p[1] + rad,
),
THEME_COLOR,
)
return img
async def draw_curve_img(ev: Event):
_data = await gv.get_value_analysis(
ev.real_bot_id,
ev.bot_self_id,
20,
)
result: Dict[Tuple[int, int, int], List[float]] = {
THEME_COLOR: [],
HINT_COLOR: [],
}
for day in _data:
data = _data[day]
result[THEME_COLOR].append(data['receive'])
result[HINT_COLOR].append(data['send'])
curve_img = await draw_curve(result)
return curve_img
async def draw_bg(w: int, h: int):
bg = Image.open(TEXT_PATH / 'bg.jpg').convert('RGBA')
bg = crop_center_img(bg, w, h)
mask = Image.open(TEXT_PATH / 'mask.png')
line = Image.open(TEXT_PATH / 'line.png')
fg_temp = Image.new('RGBA', (w, h))
fg_temp.paste(mask, (0, 222), mask)
r, g, b, a = fg_temp.split()
a_inv = ImageOps.invert(a)
fg_temp = Image.merge("RGBA", (r, g, b, a_inv))
_fg = Image.new('RGBA', (w, h))
fg = crop_center_img(Image.open(TEXT_PATH / 'fg.png'), w, h)
_fg.paste(fg, (0, 0), fg_temp)
_fg.save('fg.png')
bg = Image.alpha_composite(bg, _fg)
bg.paste(line, (0, 222), line)
bg.save('bg.png')
return bg
async def draw_status(ev: Event):
title = await draw_title()
bar1 = await draw_bar('服务器基础信息', 'Base Info')
bar2 = await draw_bar('机器人数据统计', 'Data Analysis')
bar3 = await draw_bar('日活曲线', 'Daily Activity Curve')
bar4 = await draw_bar('插件额外信息', 'Extra Data')
hw = draw_hw()
data_bar1 = await draw_data_analysis1(ev)
data_bar2 = await draw_data_analysis2(ev)
plugin_status_img = await draw_plugins_status()
curve_img = await draw_curve_img(ev)
plugins_num = len(plugins_status)
plugins_h = 100 + plugins_num * 180
img = await draw_bg(1400, 2267 + 150 + plugins_h)
img.paste(title, (0, 0), title)
img.paste(bar1, (0, 855), bar1)
img.paste(hw, (0, 920), hw)
img.paste(bar2, (0, 1202), bar2)
img.paste(data_bar1, (0, 1289), data_bar1)
img.paste(data_bar2, (0, 1463), data_bar2)
img.paste(bar3, (0, 1686), bar3)
img.paste(curve_img, (0, 1755), curve_img)
img.paste(bar4, (0, 2267), bar4)
img.paste(plugin_status_img, (0, 2367), plugin_status_img)
img = add_footer(img, footer=Image.open(TEXT_PATH / 'footer.png'))
res = await convert_img(img)
return res

152
gsuid_core/status/get_hw.py Normal file
View File

@ -0,0 +1,152 @@
import asyncio
import platform
import psutil
def get_cpu_info():
try:
with open('/proc/cpuinfo', 'r') as f:
for line in f:
if line.startswith('model name'):
cpu_name = line.split(': ')[1].strip()
break
except FileNotFoundError:
cpu_name = platform.processor() or "Unknown CPU"
cores = psutil.cpu_count(logical=True)
usage = psutil.cpu_percent(interval=1)
cpu_name = cpu_name.split()[0]
return {"name": f"{cpu_name} ({cores}核)", "value": usage}
def get_memory_info():
mem = psutil.virtual_memory()
total_gb = round(mem.total / (1024**3), 1)
used_mem_gb = round(mem.used / (1024**3), 1)
# 最大内存
usage_percent = mem.percent
return {"name": f"{used_mem_gb}GB / {total_gb}GB", "value": usage_percent}
def get_disk_info():
# 获取所有物理硬盘信息(跨平台)
total_size = 0
used_size = 0
for part in psutil.disk_partitions(all=False):
if (
'fixed' in part.opts or part.fstype != ''
): # 过滤可移动磁盘和虚拟分区
try:
usage = psutil.disk_usage(part.mountpoint)
used_size += usage.used
total_size += usage.total
except PermissionError:
continue # 跳过无权限访问的分区
# 转换为 TB/GB 显示
total_gb = total_size / (1024**3)
if total_gb >= 1000:
total_tb = round(total_gb / 1024, 1)
name = f"{total_tb}TB"
else:
name = f"{round(total_gb, 1)}GB"
used_gb = used_size / (1024**3)
used = f"{round(used_gb, 1)}GB"
name = f"{used} / {name}"
# 使用根分区的使用率作为示例(可根据需求修改)
usage_percent = psutil.disk_usage('/').percent
return {"name": name, "value": usage_percent}
async def get_network_info():
# 异步获取两次流量统计
before = psutil.net_io_counters()
await asyncio.sleep(1)
after = psutil.net_io_counters()
speed_current = (
(
after.bytes_sent
- before.bytes_sent
+ after.bytes_recv
- before.bytes_recv
)
* 8
/ 1e6
) # Mbps
# 异步获取最大带宽
speed_max = 1000
try:
if platform.system() == 'Linux':
# 异步执行命令:获取默认网络接口
proc = await asyncio.create_subprocess_exec(
'ip',
'route',
'show',
'default',
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, _ = await proc.communicate()
default_interface = (
stdout.decode().split('dev ')[1].split()[0]
if 'dev' in stdout.decode()
else None
)
if default_interface:
# 异步读取 /sys/class/net/{interface}/speed
proc = await asyncio.create_subprocess_exec(
'cat',
f'/sys/class/net/{default_interface}/speed',
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, _ = await proc.communicate()
speed_str = stdout.decode().strip()
if speed_str.isdigit():
speed_max = float(speed_str)
elif platform.system() == 'Windows':
# 异步执行 PowerShell 命令
proc = await asyncio.create_subprocess_exec(
'powershell',
'-Command',
"(Get-NetAdapter | Where-Object {$_.Status -eq 'Up'}).Speed",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, _ = await proc.communicate()
output = stdout.decode().strip()
if output.isdigit():
speed_max = float(output) / 1e6 # 转换为 Mbps
except Exception as e:
print(f"获取网络速度失败: {e}")
usage_percent = min(round((speed_current / speed_max) * 100, 1), 100)
return {"name": f"{speed_max:.0f}Mbps", "value": usage_percent}
def get_swap_info():
swap = psutil.swap_memory()
# 总容量格式化GB/TB
total_gb = swap.total / (1024**3)
if total_gb >= 1000:
total_tb = round(total_gb / 1024, 1)
name = f"{total_tb}TB"
else:
name = f"{round(total_gb, 1)}GB"
# 已使用容量
used_gb = swap.used / (1024**3)
used = f"{round(used_gb, 1)}GB"
name = f"{used} / {name}"
# 使用率百分比(若无 SWAP 则显示 0
usage = swap.percent if swap.total > 0 else 0.0
return {"name": name, "value": usage}

View File

@ -0,0 +1,8 @@
from typing import Dict, Union, Callable, Awaitable, TypedDict
from PIL import Image
class PluginStatus(TypedDict):
icon: Image.Image
status: Dict[str, Callable[..., Awaitable[Union[str, int, float]]]]

View File

@ -0,0 +1,16 @@
from typing import Dict, Union, Callable, Awaitable
from PIL import Image
from .models import PluginStatus
plugins_status: Dict[str, PluginStatus] = {}
def register_status(
ICON: Image.Image,
plugin_name: str,
plugin_status: Dict[str, Callable[..., Awaitable[Union[str, int, float]]]],
):
global plugins_status
plugins_status[plugin_name] = {'icon': ICON, 'status': plugin_status}

Binary file not shown.

After

Width:  |  Height:  |  Size: 343 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 801 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 21 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 12 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 16 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 5.0 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 41 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 782 B

View File

@ -0,0 +1,54 @@
import math
from typing import List
def generate_y_ticks(values: List[float]) -> List[float]:
if not values:
return []
max_val = max(values)
avg_val = sum(values) / len(values)
if max_val == 0:
return [0.0] * 6
# 计算d_min确保步长下最高刻度和中间刻度满足条件
d_min = max(max_val / 5, avg_val / 3)
# 候选基数列表,用于生成整洁的步长
candidates = [1, 1.2, 1.5, 2, 2.5, 3, 4, 5, 6, 7, 8, 9, 10]
# 计算数量级和基数
exponent = math.floor(math.log10(d_min)) if d_min > 0 else 0
base = d_min / (10**exponent) if d_min > 0 else 0
# 寻找合适的候选基数
selected_base = None
for c in candidates:
if c >= base:
selected_base = c
break
# 如果没有更大的基数,则使用最小基数并增加数量级
if selected_base is None:
selected_base = candidates[0]
exponent += 1
# 初始步长候选
step_candidate = selected_base * (10**exponent)
# 确保步长满足条件
while True:
if 5 * step_candidate > max_val and 3 * step_candidate > avg_val:
break
# 寻找下一个候选基数或增加数量级
current_index = candidates.index(selected_base)
if current_index < len(candidates) - 1:
selected_base = candidates[current_index + 1]
else:
selected_base = candidates[0]
exponent += 1
step_candidate = selected_base * (10**exponent)
# 生成等距的六个刻度值
ticks = [i * step_candidate for i in range(6)]
return ticks

View File

@ -1,3 +1,4 @@
import math
from io import BytesIO
from pathlib import Path
from base64 import b64encode
@ -171,3 +172,22 @@ async def text2pic(text: str, max_size: int = 800, font_size: int = 24):
)
img = img.crop((0, 0, max_size, int(y + 80)))
return await convert_img(img)
def number_to_chinese(num):
units = [
{'threshold': 10**8, 'suffix': '亿'}, # 1e8 (100,000,000)
{'threshold': 10**7, 'suffix': '千万'}, # 1e7 (10,000,000)
{'threshold': 10**4, 'suffix': ''}, # 1e4 (10,000)
{'threshold': 10**3, 'suffix': ''}, # 1e3 (1,000)
]
for unit in units:
if num >= unit['threshold']:
value = num / unit['threshold']
truncated = math.floor(value * 10) / 10 # 截断一位小数,不四舍五入
return f"{truncated:.1f}{unit['suffix']}"
# 处理小于1e3的情况
if isinstance(num, float) and num.is_integer():
return str(int(num))
else:
return str(num)

View File

@ -17,6 +17,11 @@ TEXT_PATH = Path(__file__).parent / 'texture2d'
BG_PATH = Path(__file__).parents[1] / 'default_bg'
def get_font_x(font: ImageFont.FreeTypeFont, text: str):
bbox = font.getbbox(text)
return int(bbox[2] - bbox[0])
def get_div():
return Image.open(TEXT_PATH / 'div.png')
@ -58,6 +63,27 @@ def get_v4_footer():
return Image.open(TEXT_PATH / 'footer.png')
def add_footer(
img: Image.Image,
w: int = 0,
footer: Optional[Image.Image] = None,
) -> Image.Image:
if footer is None:
footer = get_v4_footer()
w = img.size[0] if not w else w
if w != footer.size[0]:
footer = footer.resize(
(w, int(footer.size[1] * w / footer.size[0])),
)
x, y = (
int((img.size[0] - footer.size[0]) / 2),
img.size[1] - footer.size[1] - 10,
)
img.paste(footer, (x, y), footer)
return img
def get_v4_bg(w: int, h: int, is_dark: bool = False, is_blur: bool = False):
CI_img = CustomizeImage(BG_PATH)
img = CI_img.get_image(None, w, h)

View File

@ -1,3 +1,5 @@
[[index]]
url = "https://ssss/simple"
[[index]]
url = "https://mirror.nju.edu.cn/pypi/web/simple"
default = true