This commit is contained in:
baiqwerdvd 2024-12-24 16:02:43 +08:00
parent a3dec383b1
commit 3e8d092c40
No known key found for this signature in database
GPG Key ID: 7717E46E1797411A
14 changed files with 481 additions and 18 deletions

1
.gitignore vendored
View File

@ -672,6 +672,7 @@ upload_file.py
test.py test.py
test2.py test2.py
test3.py test3.py
test4.py
test.json test.json
skill_table_test.json skill_table_test.json

View File

@ -0,0 +1,112 @@
import asyncio
import random
from gsuid_core.aps import scheduler
from gsuid_core.bot import Bot
from gsuid_core.data_store import get_res_path
from gsuid_core.logger import logger
from gsuid_core.models import Event
from gsuid_core.subscribe import gs_subscribe
from gsuid_core.sv import SV
from msgspec import json as msgjson
from ..arknightsuid_config import PREFIX, ArkConfig
from .draw_img import get_ann_img
from .get_data import check_bulletin_update, get_announcement, write_json
from .model import BulletinData, BulletinMeta, BulletinTargetData, BulletinTargetDataItem
sv_ann = SV("明日方舟公告")
sv_ann_sub = SV("订阅明日方舟公告", pm=3)
task_name_ann = "订阅明日方舟公告"
ann_minute_check: int = ArkConfig.get_config("AnnMinuteCheck").data
@sv_ann.on_command(f"{PREFIX}公告")
async def ann_(bot: Bot, ev: Event):
cid = ev.text
if not cid.isdigit():
raise Exception("公告ID不正确")
data = await get_announcement(cid)
img = await get_ann_img(data)
await bot.send(img)
@sv_ann_sub.on_fullmatch(f"{PREFIX}订阅公告")
async def sub_ann_(bot: Bot, ev: Event):
if ev.group_id is None:
return await bot.send("请在群聊中订阅")
data = await gs_subscribe.get_subscribe(task_name_ann)
if data:
for subscribe in data:
if subscribe.group_id == ev.group_id:
return await bot.send("已经订阅了明日方舟公告!")
await gs_subscribe.add_subscribe(
"session",
task_name=task_name_ann,
event=ev,
extra_message="",
)
logger.info(data)
await bot.send("成功订阅明日方舟公告!")
@sv_ann_sub.on_fullmatch((f"{PREFIX}取消订阅公告", f"{PREFIX}取消公告", f"{PREFIX}退订公告"))
async def unsub_ann_(bot: Bot, ev: Event):
if ev.group_id is None:
return await bot.send("请在群聊中取消订阅")
data = await gs_subscribe.get_subscribe(task_name_ann)
if data:
for subscribe in data:
if subscribe.group_id == ev.group_id:
await gs_subscribe.delete_subscribe("session", task_name_ann, ev)
return await bot.send("成功取消订阅明日方舟公告!")
return await bot.send("未曾订阅明日方舟公告!")
@scheduler.scheduled_job("interval", minutes=ann_minute_check)
async def check_ark_ann():
await check_ark_ann_state()
async def check_ark_ann_state():
logger.info("[明日方舟公告] 定时任务: 明日方舟公告查询..")
bulletin_path = get_res_path(["ArknightsUID", "announce"]) / "bulletin.meta.json"
logger.info("Checking for game bulletin...")
if not bulletin_path.exists():
data = msgjson.encode(BulletinMeta())
write_json(data, bulletin_path)
logger.info("[明日方舟公告] 初始成功, 将在下个轮询中更新.")
return
updates = await check_bulletin_update()
datas = await gs_subscribe.get_subscribe(task_name_ann)
if not datas:
logger.info("[明日方舟公告] 暂无群订阅")
return
if len(updates) == 0:
logger.info("[明日方舟公告] 没有最新公告")
return
for data in updates.values():
try:
img = await get_ann_img(data)
if isinstance(img, str):
continue
for subscribe in datas:
await subscribe.send(img)
await asyncio.sleep(random.uniform(1, 3))
except Exception as e:
logger.exception(e)
logger.info("[明日方舟公告] 推送完毕")

View File

@ -0,0 +1,173 @@
import textwrap
from typing import Any
from bs4 import BeautifulSoup, element
from gsuid_core.logger import logger
from gsuid_core.utils.fonts.fonts import core_font as cf
from gsuid_core.utils.image.convert import convert_img
from gsuid_core.utils.image.image_tools import get_div
from gsuid_core.utils.image.utils import download_pic_to_image
from PIL import Image, ImageDraw
from .model import BulletinData
async def get_ann_img(data: BulletinData) -> str | bytes:
match data.displayType:
case 1:
img = await download_pic_to_image(data.bannerImageUrl)
return await convert_img(img)
case 2:
soup = BeautifulSoup(data.content, "lxml")
img = await soup_to_img(data.header, soup)
return img
case _:
return "暂不支持的公告类型"
async def process_tag(
elements: list[dict[str, Any]],
point: int,
tag: element.Tag,
):
space = 10
_type = _data = None
logger.debug(f"[GsCore] 正在处理TAG: {tag.name}")
if tag.name == "img":
img_url = tag.get("src")
if isinstance(img_url, str):
if img_url.startswith("https://web.hycdn.cn/announce/images"):
img = await download_pic_to_image(img_url)
new_h = int((930 / img.size[0]) * img.size[1])
img = img.resize((930, new_h))
point += new_h
_type = "image"
_data = img
elif tag.name and tag.name.startswith("h") and tag.name != "html":
text = tag.get_text(strip=True)
line = len(textwrap.wrap(text, width=14))
point += 70 * line if line >= 1 else 70
_type = "title"
_data = text
elif tag.name == "div" and tag.has_attr("class"):
if "media-wrap image-wrap" in tag["class"]:
tag_img = tag.find("img")
if isinstance(tag_img, element.Tag):
img_url = tag_img.get("src")
if img_url:
point += 60
_type = "div"
_data = "div"
elif tag.name == "p":
text = tag.get_text(strip=True)
if text:
if tag.get("style") == "text-align:right;":
line = len(textwrap.wrap(text, width=57))
point += 30 * line if line >= 1 else 30
_type = "right_text"
_data = text
else:
line = len(textwrap.wrap(text, width=57))
point += 30 * line if line >= 1 else 30
_type = "text"
_data = text
else:
point += 10
if _data is not None and _type is not None:
if elements:
pre_pos = elements[-1]["next_pos"]
else:
pre_pos = 105
elements.append(
{
"type": _type,
"data": _data,
"pos": pre_pos,
"next_pos": point,
}
)
point += space
return point, elements
async def soup_to_img(header: str, soup: BeautifulSoup) -> str | bytes:
elements = []
point = 105
div = get_div()
logger.info("[GsCore] 开始解析帖子内容...")
for tag in soup.descendants:
point, elements = await process_tag(
elements,
point,
tag, # type: ignore
)
logger.info("[GsCore] 帖子解析完成!进入图片处理流程...")
img = Image.new("RGB", (1000, point), (255, 255, 255))
draw = ImageDraw.Draw(img)
if header != "":
header_img = "https://ak.hycdn.cn/announce/assets/images/announcement/header.jpg"
header_img = await download_pic_to_image(header_img)
new_h = int((930 / header_img.size[0]) * header_img.size[1])
header_img = header_img.resize((930, new_h))
img.paste(header_img, (35, 35))
draw.text(
(45, 42),
header,
font=cf(30),
fill=(255, 255, 255),
)
for i in elements:
if i["type"] == "image":
img.paste(i["data"], (35, i["pos"]))
elif i["type"] == "title":
draw.text(
(35, i["pos"]),
i["data"],
font=cf(30),
fill=(0, 0, 0),
)
elif i["type"] == "strong_text":
wrapped_text = textwrap.wrap(i["data"], width=57)
for index, line in enumerate(wrapped_text):
# 加粗
draw.text(
(35, i["pos"] + index * 30),
line,
font=cf(16),
fill=(0, 0, 0),
)
elif i["type"] == "text":
wrapped_text = textwrap.wrap(i["data"], width=57)
for index, line in enumerate(wrapped_text):
draw.text(
(35, i["pos"] + index * 30),
line,
font=cf(16),
fill=(0, 0, 0),
)
elif i["type"] == "right_text":
wrapped_text = textwrap.wrap(i["data"], width=57)
for index, line in enumerate(wrapped_text):
draw.text(
(965, i["pos"] + index * 30),
line,
anchor="rm",
font=cf(16),
fill=(0, 0, 0),
)
elif i["type"] == "div":
img.paste(div, (0, i["pos"]), div)
logger.info("[GsCore] 图片处理完成!")
return await convert_img(img)

View File

@ -0,0 +1,124 @@
import json
from pathlib import Path
from typing import cast
import aiohttp
from gsuid_core.data_store import get_res_path
from gsuid_core.logger import logger
from msgspec import convert
from msgspec import json as msgjson
from .model import BulletinData, BulletinMeta, BulletinTargetData, BulletinTargetDataItem
def read_json(file_path: Path) -> dict[str, object]:
try:
with Path.open(file_path, encoding="UTF-8") as file:
return cast(dict[str, object], json.load(file))
except FileNotFoundError as _:
raise FileNotFoundError(f"Error reading JSON file: {file_path}")
except json.JSONDecodeError as e:
raise e
def write_json(data: object, file_path: Path) -> None:
try:
with Path.open(file_path, mode="w", encoding="UTF-8") as file:
json.dump(data, file, sort_keys=False, indent=4, ensure_ascii=False)
except FileNotFoundError as e:
raise e
async def get_image(url: str) -> bytes:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.read()
async def get_announcement(cid: str) -> BulletinData:
url = f"https://ak-webview.hypergryph.com/api/game/bulletin/{cid}"
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
data = await response.json()
data = convert(data.get("data", {}), BulletinData)
return data
async def check_bulletin_update() -> dict[str, BulletinData]:
bulletin_path = get_res_path(["ArknightsUID", "announce"]) / "bulletin.meta.json"
logger.info("Checking for game bulletin...")
bulletin_meta = convert(read_json(bulletin_path), BulletinMeta)
android_data = None
bilibili_data = None
ios_data = None
async with aiohttp.ClientSession() as session:
for target in ["Android", "Bilibili", "IOS"]:
async with session.get(
f"https://ak-webview.hypergryph.com/api/game/bulletinList?target={target}"
) as response:
cur_meta = await response.json()
if cur_meta.get("code") == 0:
match target:
case "Android":
android_data = convert(cur_meta.get("data", {}), BulletinTargetData)
bulletin_meta.target.Android = android_data
case "Bilibili":
bilibili_data = convert(cur_meta.get("data", {}), BulletinTargetData)
bulletin_meta.target.Bilibili = bilibili_data
case "IOS":
ios_data = convert(cur_meta.get("data", {}), BulletinTargetData)
bulletin_meta.target.IOS = ios_data
logger.info("The file 'bulletin.meta.json' has been successfully updated.")
assert android_data is not None
assert bilibili_data is not None
assert ios_data is not None
update_list = android_data.list_ + bilibili_data.list_ + ios_data.list_
update_set: set[int] = set()
update_list: list[BulletinTargetDataItem] = [
x
for x in update_list
if x.updatedAt not in update_set and not update_set.add(x.updatedAt)
]
update_list.sort(key=lambda x: x.updatedAt, reverse=True)
new_ann: dict[str, BulletinData] = {}
for item in update_list:
for key, value in bulletin_meta.update.items():
if value.cid == item.cid and value.updatedAt == item.updatedAt:
break
elif value.cid == item.cid and value.updatedAt != item.updatedAt:
bulletin_meta.update.pop(key)
if "_" in key:
new_key = f"{item.cid}_{int(key.split('_')[1]) + 1}"
else:
new_key = f"{item.cid}_1"
ann = await get_announcement(item.cid)
bulletin_meta.update[new_key] = ann
new_ann[item.cid] = ann
logger.info(f"Bumped bulletin found: {item.cid}:{item.title}")
break
elif value.cid != item.cid:
continue
if item.cid not in bulletin_meta.data:
ann = await get_announcement(item.cid)
bulletin_meta.data[item.cid] = ann
new_ann[item.cid] = ann
logger.info(f"New bulletin found: {item.cid}:{item.title}")
bulletin_meta.data = dict(sorted(bulletin_meta.data.items(), key=lambda x: int(x[0])))
bulletin_meta.update = dict(
sorted(bulletin_meta.update.items(), key=lambda x: x[1].cid, reverse=False)
)
data = msgjson.decode(msgjson.encode(bulletin_meta))
write_json(data, bulletin_path)
return new_ann

View File

@ -0,0 +1,47 @@
from typing import Any
from msgspec import Struct, field
class BulletinTargetDataItem(Struct):
cid: str
title: str
category: int
displayTime: str
updatedAt: int
sticky: bool
class BulletinTargetDataPopup(Struct):
popupList: list[Any]
defaultPopup: str
class BulletinTargetData(Struct):
list_: list[BulletinTargetDataItem] = field(name="list", default_factory=list)
popup: dict[str, Any] = field(default_factory=dict)
class BulletinTarget(Struct):
Android: BulletinTargetData = field(default_factory=BulletinTargetData)
Bilibili: BulletinTargetData = field(default_factory=BulletinTargetData)
IOS: BulletinTargetData = field(default_factory=BulletinTargetData)
class BulletinData(Struct):
cid: str
displayType: int
title: str
category: int
header: str
content: str
jumpLink: str
bannerImageUrl: str
displayTime: str
updatedAt: int
class BulletinMeta(Struct):
data: dict[str, BulletinData] = field(default_factory=dict)
update: dict[str, BulletinData] = field(default_factory=dict)
target: BulletinTarget = field(default_factory=BulletinTarget)

View File

@ -3,7 +3,7 @@ from typing import Dict
from gsuid_core.gss import gss from gsuid_core.gss import gss
from gsuid_core.logger import logger from gsuid_core.logger import logger
from ..arknightsuid_config.ark_config import arkconfig from ..arknightsuid_config.ark_config import ArkConfig
from ..utils.ark_api import ark_skd_api from ..utils.ark_api import ark_skd_api
from ..utils.database.models import ArknightsPush, ArknightsUser from ..utils.database.models import ArknightsPush, ArknightsUser
from ..utils.models.skland.models import ArknightsPlayerInfoModel from ..utils.models.skland.models import ArknightsPlayerInfoModel
@ -50,7 +50,7 @@ async def all_check(
for mode in NOTICE.keys(): for mode in NOTICE.keys():
# 检查条件 # 检查条件
if push_data[f"{mode}_is_push"] is True: if push_data[f"{mode}_is_push"] is True:
if arkconfig.get_config("CrazyNotice").data: if ArkConfig.get_config("CrazyNotice").data:
if not await check(mode, raw_data, push_data[f"{mode}_value"]): if not await check(mode, raw_data, push_data[f"{mode}_value"]):
await ArknightsPush.update_push_data( await ArknightsPush.update_push_data(
uid, uid,

View File

@ -7,16 +7,13 @@ from gsuid_core.sv import SV
from gsuid_core.utils.error_reply import UID_HINT from gsuid_core.utils.error_reply import UID_HINT
from ..utils.database.models import ArknightsBind from ..utils.database.models import ArknightsBind
from .ark_config import ArkConfig
from .set_config import set_config_func, set_push_value from .set_config import set_config_func, set_push_value
sv_self_config = SV("ark配置") sv_self_config = SV("ark配置")
# @sv_self_config.on_fullmatch(("ark配置", "方舟配置")) PREFIX = ArkConfig.get_config("ArkPrefix").data
# async def send_config_card(bot: Bot, ev: Event):
# logger.info("开始执行[ark配置]")
# im = await draw_config_img(ev.bot_id)
# await bot.send(im)
@sv_self_config.on_prefix(("ark设置")) # noqa: UP034 @sv_self_config.on_prefix(("ark设置")) # noqa: UP034

View File

@ -3,4 +3,4 @@ from gsuid_core.utils.plugins_config.gs_config import StringConfig
from ..utils.resource.RESOURCE_PATH import CONFIG_PATH from ..utils.resource.RESOURCE_PATH import CONFIG_PATH
from .config_default import CONIFG_DEFAULT from .config_default import CONIFG_DEFAULT
arkconfig = StringConfig("ArknightsUID", CONFIG_PATH, CONIFG_DEFAULT) ArkConfig = StringConfig("ArknightsUID", CONFIG_PATH, CONIFG_DEFAULT)

View File

@ -3,6 +3,7 @@ from typing import Dict
from gsuid_core.utils.plugins_config.models import ( from gsuid_core.utils.plugins_config.models import (
GSC, GSC,
GsBoolConfig, GsBoolConfig,
GsIntConfig,
GsListStrConfig, GsListStrConfig,
GsStrConfig, GsStrConfig,
) )
@ -29,4 +30,12 @@ CONIFG_DEFAULT: Dict[str, GSC] = {
"开启后当达到推送阈值将会一直推送", "开启后当达到推送阈值将会一直推送",
False, False,
), ),
"ArkPrefix": GsStrConfig(
"插件命令前缀(确认无冲突再修改)",
"用于设置ArknightsUID前缀的配置",
"ark",
),
"AnnMinuteCheck": GsIntConfig(
"公告推送时间检测单位min", "公告推送时间检测单位min", 10, 60
),
} }

View File

@ -3,7 +3,7 @@ from typing import Optional
from gsuid_core.logger import logger from gsuid_core.logger import logger
from ..utils.database.models import ArknightsPush, ArknightsUser from ..utils.database.models import ArknightsPush, ArknightsUser
from .ark_config import arkconfig from .ark_config import ArkConfig
from .config_default import CONIFG_DEFAULT from .config_default import CONIFG_DEFAULT
PUSH_MAP = { PUSH_MAP = {
@ -78,7 +78,7 @@ async def set_config_func(
logger.info(f"config_name:{config_name},query:{query}") logger.info(f"config_name:{config_name},query:{query}")
# 执行设置 # 执行设置
if query is not None: if query is not None:
arkconfig.set_config(name, query) ArkConfig.set_config(name, query)
im = "成功设置{}{}".format(config_name, "" if query else "") im = "成功设置{}{}".format(config_name, "" if query else "")
else: else:
im = "未传入参数query!" im = "未传入参数query!"

View File

@ -9,12 +9,12 @@ from gsuid_core.models import Event
from gsuid_core.sv import SV from gsuid_core.sv import SV
from gsuid_core.utils.database.api import get_uid from gsuid_core.utils.database.api import get_uid
from ..arknightsuid_config.ark_config import arkconfig from ..arknightsuid_config.ark_config import ArkConfig
from ..utils.ark_prefix import PREFIX from ..utils.ark_prefix import PREFIX
from ..utils.database.models import ArknightsBind from ..utils.database.models import ArknightsBind
from .sign import daily_sign, sign_in from .sign import daily_sign, sign_in
SIGN_TIME = arkconfig.get_config("SignTime").data SIGN_TIME = ArkConfig.get_config("SignTime").data
sv_sign = SV("森空岛签到") sv_sign = SV("森空岛签到")
sv_sign_config = SV("森空岛管理", pm=2) sv_sign_config = SV("森空岛管理", pm=2)
@ -23,7 +23,7 @@ sv_sign_config = SV("森空岛管理", pm=2)
# 每日零点半执行森空岛签到 # 每日零点半执行森空岛签到
@scheduler.scheduled_job("cron", hour=SIGN_TIME[0], minute=SIGN_TIME[1]) @scheduler.scheduled_job("cron", hour=SIGN_TIME[0], minute=SIGN_TIME[1])
async def ark_sign_at_night(): async def ark_sign_at_night():
if arkconfig.get_config("SchedSignin").data: if ArkConfig.get_config("SchedSignin").data:
await send_daily_sign() await send_daily_sign()

View File

@ -7,7 +7,7 @@ from typing import Sequence
from gsuid_core.gss import gss from gsuid_core.gss import gss
from gsuid_core.logger import logger from gsuid_core.logger import logger
from ..arknightsuid_config.ark_config import arkconfig from ..arknightsuid_config.ark_config import ArkConfig
from ..utils.ark_api import ark_skd_api from ..utils.ark_api import ark_skd_api
from ..utils.database.models import ArknightsUser from ..utils.database.models import ArknightsUser
@ -93,7 +93,7 @@ async def single_daily_sign(bot_id: str, ark_uid: str, gid: str, qid: str):
"push_message": "", "push_message": "",
} }
# 检查是否开启简洁签到 # 检查是否开启简洁签到
if arkconfig.get_config("SignReportSimple").data: if ArkConfig.get_config("SignReportSimple").data:
# 如果失败, 则添加到推送列表 # 如果失败, 则添加到推送列表
if im.startswith(("ark签到失败", "网络有点忙", "OK", "ok")): if im.startswith(("ark签到失败", "网络有点忙", "OK", "ok")):
message = f"[CQ:at,qq={qid}] {im}" message = f"[CQ:at,qq={qid}] {im}"

View File

@ -1,5 +1,5 @@
from typing import cast from typing import cast
from ..arknightsuid_config.ark_config import arkconfig from ..arknightsuid_config.ark_config import ArkConfig
PREFIX = cast(str, arkconfig.get_config("ArknightsPrefix").data) PREFIX = cast(str, ArkConfig.get_config("ArknightsPrefix").data)

2
pdm.lock generated
View File

@ -3,7 +3,7 @@
[metadata] [metadata]
groups = ["default", "dev", "test"] groups = ["default", "dev", "test"]
strategy = ["cross_platform"] strategy = []
lock_version = "4.5.0" lock_version = "4.5.0"
content_hash = "sha256:6220f156c4c1b2b47e90c0292d4e5af93c179f7e6c3e3b568440641f2d348526" content_hash = "sha256:6220f156c4c1b2b47e90c0292d4e5af93c179f7e6c3e3b568440641f2d348526"