添加深渊查询 (#36)

* 添加深渊查询

* 🚨 `pre-commit-ci`修复格式错误

* 添加深渊查询

* 🚨 `pre-commit-ci`修复格式错误

* .

* .

* 🚨 `pre-commit-ci`修复格式错误

* .

* 🚨 `pre-commit-ci`修复格式错误

* .

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
This commit is contained in:
季落 2023-05-18 13:32:07 +08:00 committed by GitHub
parent 384bf4e7dc
commit e6c50ac8e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 572 additions and 4 deletions

View File

@ -24,6 +24,8 @@ STAR_RAIL_AVATAR_INFO_URL = (
f'{NEW_URL}/game_record/app/hkrpg/api/avatar/info' # 角色详细信息接口 f'{NEW_URL}/game_record/app/hkrpg/api/avatar/info' # 角色详细信息接口
) )
CHALLENGE_INFO_URL = f'{NEW_URL}/game_record/app/hkrpg/api/challenge'
STAR_RAIL_GACHA_LOG_URL = f'{OLD_URL}/common/gacha_record/api/getGachaLog' STAR_RAIL_GACHA_LOG_URL = f'{OLD_URL}/common/gacha_record/api/getGachaLog'

View File

@ -34,6 +34,52 @@ class RoleBasicInfo(TypedDict):
level: int level: int
################
# 深渊相关 #
################
class AbyssTime(TypedDict):
year: int
month: int
day: int
hour: int
minute: int
class AbyssAvatar(TypedDict):
id: int
level: int
icon: str
rarity: int
element: str
class AbyssNodeDetail(TypedDict):
challenge_time: AbyssTime
avatars: List[AbyssAvatar]
class AbyssFloorDetail(TypedDict):
name: str
round_num: int
star_num: int
node_1: List[AbyssNodeDetail]
node_2: List[AbyssNodeDetail]
class AbyssData(TypedDict):
schedule_id: int
begin_time: AbyssTime
end_time: AbyssTime
star_num: int
max_floor: str
battle_num: int
has_data: bool
max_floor_detail: bool
all_floor_detail: List[AbyssFloorDetail]
################ ################
# 每月札记相关 # # 每月札记相关 #
################ ################

View File

@ -0,0 +1,53 @@
import re
from gsuid_core.sv import SV
from gsuid_core.bot import Bot
from gsuid_core.models import Event
from gsuid_core.utils.error_reply import UID_HINT
from ..utils.convert import get_uid
from .draw_abyss_card import draw_abyss_img
sv_srabyss = SV('sr查询深渊')
@sv_srabyss.on_command(
('sr查询深渊', 'srsy', 'sr查询上期深渊', 'srsqsy', 'sr上期深渊', 'sr深渊'), block=True
)
async def send_srabyss_info(bot: Bot, ev: Event):
name = ''.join(re.findall('[\u4e00-\u9fa5]', ev.text))
if name:
return
await bot.logger.info('开始执行[sr查询深渊信息]')
uid = await get_uid(bot, ev)
if uid is None:
return await bot.send(UID_HINT)
await bot.logger.info('[sr查询深渊信息]uid: {}'.format(uid))
if 'sq' in ev.command or '上期' in ev.command:
schedule_type = '2'
else:
schedule_type = '1'
await bot.logger.info('[sr查询深渊信息]深渊期数: {}'.format(schedule_type))
if ev.text in ['', '', '十一', '十二']:
floor = (
ev.text.replace('', '9')
.replace('十一', '11')
.replace('十二', '12')
.replace('', '10')
)
else:
floor = ev.text
if floor and floor.isdigit():
floor = int(floor)
else:
floor = None
await bot.logger.info('[sr查询深渊信息]深渊层数: {}'.format(floor))
# data = GsCookie()
# raw_abyss_data = await data.get_spiral_abyss_data(uid, schedule_type)
# print(raw_abyss_data)
im = await draw_abyss_img(ev.user_id, uid, schedule_type)
await bot.send(im)

View File

@ -0,0 +1,415 @@
import asyncio
from pathlib import Path
from typing import Union
from PIL import Image, ImageDraw
from gsuid_core.logger import logger
from gsuid_core.utils.error_reply import get_error
from .utils import get_icon
from ..utils.convert import GsCookie
from ..utils.image.convert import convert_img
from ..sruid_utils.api.mys.models import AbyssAvatar
# )
from ..utils.image.image_tools import get_qq_avatar, draw_pic_with_ring
from ..utils.fonts.starrail_fonts import (
sr_font_22,
sr_font_28,
sr_font_30,
sr_font_34,
sr_font_42,
)
# from ..utils.resource.download_url import download_file
# from ..utils.resource.generate_char_card import create_single_char_card
# from ..utils.resource.RESOURCE_PATH import (
# CHAR_ICON_PATH,
TEXT_PATH = Path(__file__).parent / 'texture2D'
white_color = (255, 255, 255)
gray_color = (175, 175, 175)
img_bg = Image.open(TEXT_PATH / 'bg.jpg')
level_cover = Image.open(TEXT_PATH / 'level_cover.png').convert("RGBA")
char_bg_4 = Image.open(TEXT_PATH / 'char4_bg.png').convert("RGBA")
char_bg_5 = Image.open(TEXT_PATH / 'char5_bg.png').convert("RGBA")
star_yes = Image.open(TEXT_PATH / 'star.png').convert("RGBA")
star_gray = Image.open(TEXT_PATH / 'star_gray.png').convert("RGBA")
elements = {
"ice": Image.open(TEXT_PATH / "IconNatureColorIce.png").convert("RGBA"),
"fire": Image.open(TEXT_PATH / "IconNatureColorFire.png").convert("RGBA"),
"imaginary": Image.open(
TEXT_PATH / "IconNatureColorImaginary.png"
).convert("RGBA"),
"quantum": Image.open(TEXT_PATH / "IconNatureColorQuantum.png").convert(
"RGBA"
),
"lightning": Image.open(TEXT_PATH / "IconNatureColorThunder.png").convert(
"RGBA"
),
"wind": Image.open(TEXT_PATH / "IconNatureColorWind.png").convert("RGBA"),
"physical": Image.open(TEXT_PATH / "IconNaturePhysical.png").convert(
"RGBA"
),
}
async def get_abyss_star_pic(star: int) -> Image.Image:
star_pic = Image.open(TEXT_PATH / f'star{star}.png')
return star_pic
async def _draw_abyss_card(
char: AbyssAvatar,
talent_num: str,
floor_pic: Image.Image,
index_char: int,
index_part: int,
):
# char_id = char['id']
# # 确认角色头像路径
# char_pic_path = CHAR_ICON_PATH / f'{char_id}.png'
char_bg = (char_bg_4 if char['rarity'] == 4 else char_bg_5).copy()
char_icon = (await get_icon(char['icon'])).resize((151, 170))
element_icon = elements[char['element']]
char_bg.paste(char_icon, (24, 16), mask=char_icon)
char_bg.paste(level_cover, (0, 0), mask=level_cover)
char_bg.paste(element_icon, (135, 30), mask=element_icon)
# 不存在自动下载
# if not char_pic_path.exists():
# await create_single_char_card(char_id)
# talent_pic = await get_talent_pic(int(talent_num))
# talent_pic = talent_pic.resize((90, 45))
# char_card.paste(talent_pic, (137, 260), talent_pic)
char_card_draw = ImageDraw.Draw(char_bg)
char_card_draw.text(
(100, 165),
f'等级 {char["level"]}',
font=sr_font_22,
fill=white_color,
anchor='mm',
)
floor_pic.paste(
char_bg,
(75 + 185 * index_char, 130 + index_part * 219),
char_bg,
)
async def _draw_floor_card(
level_star: int,
floor_pic: Image.Image,
img: Image.Image,
index_floor: int,
floor_name: str,
round_num: int,
):
for index_num in [0, 1, 2]:
star_num = index_num + 1
if star_num <= level_star:
star_pic = star_yes.copy()
else:
star_pic = star_gray.copy()
floor_pic.paste(star_pic, (103 + index_num * 50, 25), star_pic)
floor_pic_draw = ImageDraw.Draw(floor_pic)
floor_pic_draw.text(
(450, 60),
floor_name,
font=sr_font_42,
fill=white_color,
anchor='mm',
)
floor_pic_draw.text(
(802, 60),
f'使用轮: {round_num}',
font=sr_font_28,
fill=gray_color,
anchor='rm',
)
img.paste(floor_pic, (0, 657 + index_floor * 570), floor_pic)
async def draw_abyss_img(
qid: Union[str, int],
uid: str,
schedule_type: str = '1',
) -> Union[bytes, str]:
# 获取Cookies
data = GsCookie()
# retcode = await data.get_cookie(uid)
# if retcode:
# return retcode
# raw_data = data.raw_data
raw_abyss_data = await data.get_spiral_abyss_data(uid, schedule_type)
# print(raw_abyss_data)
if isinstance(raw_abyss_data, int):
return get_error(raw_abyss_data)
# 获取数据
# if raw_data:
# char_data = raw_data['avatars']
# else:
# return '没有获取到角色数据'
# char_temp = {}
# 获取查询者数据
# if floor:
# floor = floor - 9
# if floor < 0:
# return '楼层不能小于9层!'
# if len(raw_abyss_data['floors']) >= floor + 1:
# floors_data = raw_abyss_data['floors'][floor]
# else:
# return '你还没有挑战该层!'
# else:
# if len(raw_abyss_data['floors']) == 0:
# return '你还没有挑战本期深渊!\n可以使用[上期深渊]命令查询上期~'
# floors_data = raw_abyss_data['floors'][-1]
if raw_abyss_data['max_floor'] == '':
return '你还没有挑战本期深渊!\n可以使用[sr上期深渊]命令查询上期~'
# if floors_data['levels'][-1]['battles']:
# is_unfull = False
# else:
# is_unfull = True
# 获取背景图片各项参数
based_w = 900
floor_num = len(raw_abyss_data['all_floor_detail'])
if floor_num >= 3:
based_h = 1227
else:
based_h = 657 + 570 * floor_num
img = img_bg.copy()
img = img.crop((0, 0, based_w, based_h))
abyss_title = Image.open(TEXT_PATH / 'head.png')
img.paste(abyss_title, (0, 0), abyss_title)
# 获取头像
_id = str(qid)
if _id.startswith('http'):
char_pic = await get_qq_avatar(avatar_url=_id)
else:
char_pic = await get_qq_avatar(qid=qid)
char_pic = await draw_pic_with_ring(char_pic, 250)
img.paste(char_pic, (325, 132), char_pic)
# 绘制抬头
img_draw = ImageDraw.Draw(img)
img_draw.text((450, 442), f'UID {uid}', white_color, sr_font_28, 'mm')
# 总体数据
abyss_data = Image.open(TEXT_PATH / 'data.png')
img.paste(abyss_data, (0, 500), abyss_data)
# 最深抵达
img_draw.text(
(220, 565),
f'{raw_abyss_data["max_floor"]}',
white_color,
sr_font_34,
'lm',
)
# 挑战次数
img_draw.text(
(220, 612),
f'{raw_abyss_data["battle_num"]}',
white_color,
sr_font_34,
'lm',
)
star_num_pic = Image.open(TEXT_PATH / 'star.png')
img.paste(star_num_pic, (615, 557), star_num_pic)
img_draw.text(
(695, 590),
f'{raw_abyss_data["star_num"]}/30',
white_color,
sr_font_42,
'lm',
)
task = []
for index_floor, level in enumerate(raw_abyss_data['all_floor_detail']):
if index_floor >= 3:
break
floor_pic = Image.open(TEXT_PATH / 'floor_bg.png')
level_star = level['star_num']
floor_name = level['name']
round_num = level['round_num']
for index_part in [0, 1]:
node_num = index_part + 1
node = f'node_{node_num}'
# 节点1
time_array = level[node]['challenge_time']
time_str = f"{time_array['year']}-{time_array['month']}"
time_str = f"{time_str}-{time_array['day']}"
time_str = (
f"{time_str} {time_array['hour']}:{time_array['minute']}:00"
)
floor_pic_draw = ImageDraw.Draw(floor_pic)
floor_pic_draw.text(
(112, 120 + index_part * 219),
f'节点{node_num}',
white_color,
sr_font_30,
'lm',
)
floor_pic_draw.text(
(201, 120 + index_part * 219),
f'{time_str}',
gray_color,
sr_font_22,
'lm',
)
for index_char, char in enumerate(level[node]['avatars']):
# 获取命座
# if char["id"] in char_temp:
# talent_num = char_temp[char["id"]]
# else:
# for i in char_data:
# if i["id"] == char["id"]:
# talent_num = str(
# i["actived_constellation_num"]
# )
# char_temp[char["id"]] = talent_num
# break
task.append(
_draw_abyss_card(
char,
0, # type: ignore
floor_pic,
index_char,
index_part,
)
)
await asyncio.gather(*task)
task.clear()
task.append(
_draw_floor_card(
level_star,
floor_pic,
img,
index_floor,
floor_name,
round_num,
)
)
await asyncio.gather(*task)
# title_data = {
# '最强一击!': damage_rank[0],
# '最多击破!': defeat_rank[0],
# '承受伤害': take_damage_rank[0],
# '元素战技': energy_skill_rank[0],
# }
# for _index, _name in enumerate(title_data):
# _char = title_data[_name]
# _char_id = _char['avatar_id']
# char_side_path = TEXT_PATH / f'{_char_id}.png'
# # if not char_side_path.exists():
# # await download_file(_char['avatar_icon'], 3, f'{_char_id}.png')
# char_side = Image.open(char_side_path)
# char_side = char_side.resize((75, 75))
# intent = _index * 224
# title_xy = (115 + intent, 523)
# val_xy = (115 + intent, 545)
# _val = str(_char['value'])
# img.paste(char_side, (43 + intent, 484), char_side)
# img_draw.text(title_xy, _name, white_color, gs_font_20, 'lm')
# img_draw.text(val_xy, _val, white_color, gs_font_26, 'lm')
# 过滤数据
# raw_abyss_data['floors'] = [
# i for i in raw_abyss_data['floors'] if i['index'] >= 9
# ]
# 绘制缩略信息
# for num in range(4):
# omit_bg = Image.open(TEXT_PATH / 'abyss_omit.png')
# omit_draw = ImageDraw.Draw(omit_bg)
# omit_draw.text((56, 34), f'第{num+9}层', white_color, gs_font_32, 'lm')
# omit_draw.rounded_rectangle((165, 19, 225, 49), 20, red_color)
# if len(raw_abyss_data['floors']) - 1 >= num:
# _floor = raw_abyss_data['floors'][num]
# if _floor['star'] == _floor['max_star']:
# _color = red_color
# _text = '全满星'
# else:
# _gap = _floor['max_star'] - _floor['star']
# _color = blue_color
# _text = f'差{_gap}颗'
# if not is_unfull:
# _timestamp = int(
# _floor['levels'][-1]['battles'][-1]['timestamp']
# )
# _time_array = time.localtime(_timestamp)
# _time_str = time.strftime('%Y-%m-%d %H:%M:%S', _time_array)
# else:
# _time_str = '请挑战后查看时间数据!'
# else:
# _color = gray_color
# _text = '未解锁'
# _time_str = '请挑战后查看时间数据!'
# omit_draw.rounded_rectangle((165, 19, 255, 49), 20, _color)
# omit_draw.text((210, 34), _text, white_color, gs_font_26, 'mm')
# omit_draw.text((54, 65), _time_str, sec_color, gs_font_22, 'lm')
# pos = (20 + 459 * (num % 2), 613 + 106 * (num // 2))
# img.paste(omit_bg, pos, omit_bg)
# if is_unfull:
# hint = Image.open(TEXT_PATH / 'hint.png')
# img.paste(hint, (0, 830), hint)
# else:
# task = []
# floor_num = floors_data['index']
# for index_floor, level in enumerate(floors_data['levels']):
# floor_pic = Image.open(TEXT_PATH / 'abyss_floor.png')
# level_star = level['star']
# timestamp = int(level['battles'][0]['timestamp'])
# time_array = time.localtime(timestamp)
# time_str = time.strftime('%Y-%m-%d %H:%M:%S', time_array)
# for index_part, battle in enumerate(level['battles']):
# for index_char, char in enumerate(battle['avatars']):
# # 获取命座
# if char["id"] in char_temp:
# talent_num = char_temp[char["id"]]
# else:
# for i in char_data:
# if i["id"] == char["id"]:
# talent_num = str(
# i["actived_constellation_num"]
# )
# char_temp[char["id"]] = talent_num
# break
# task.append(
# _draw_abyss_card(
# char,
# talent_num, # type: ignore
# floor_pic,
# index_char,
# index_part,
# )
# )
# await asyncio.gather(*task)
# task.clear()
# task.append(
# _draw_floor_card(
# level_star,
# floor_pic,
# img,
# time_str,
# index_floor,
# floor_num,
# )
# )
# await asyncio.gather(*task)
res = await convert_img(img)
logger.info('[查询深渊信息]绘图已完成,等待发送!')
return res

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.2 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.2 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.3 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.1 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.3 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.3 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.3 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 396 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 37 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 38 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 55 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 102 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 40 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 381 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 5.9 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 4.7 KiB

View File

@ -0,0 +1,25 @@
from io import BytesIO
from typing import TypeVar
from PIL import Image
from aiohttp import ClientSession
from gsuid_core.data_store import get_res_path
T = TypeVar("T")
ROLEINFO_PATH = get_res_path() / 'StarRailUID' / 'roleinfo'
ROLEINFO_PATH.mkdir(parents=True, exist_ok=True)
async def get_icon(url: str) -> Image.Image:
name = url.split('/')[-1]
path = ROLEINFO_PATH / name
if (path).exists():
content = path.read_bytes()
else:
async with ClientSession() as client:
async with client.get(url) as resp:
content = await resp.read()
with open(path, "wb") as f:
f.write(content)
return Image.open(BytesIO(content)).convert("RGBA")

View File

@ -3,11 +3,12 @@ from typing import Tuple, Union, Optional, overload
from gsuid_core.bot import Bot from gsuid_core.bot import Bot
from gsuid_core.models import Event from gsuid_core.models import Event
from gsuid_core.utils.api.mys.models import AbyssData, IndexData from gsuid_core.utils.api.mys.models import IndexData
from .api import get_sqla from .api import get_sqla
from .mys_api import mys_api from .mys_api import mys_api
from .error_reply import VERIFY_HINT from .error_reply import VERIFY_HINT
from ..sruid_utils.api.mys.models import AbyssData
@overload @overload
@ -71,9 +72,11 @@ class GsCookie:
return data return data
async def get_spiral_abyss_data( async def get_spiral_abyss_data(
self, schedule_type: str = '1' self, uid: str, schedule_type: str = '1'
) -> Union[AbyssData, int]: ) -> Union[AbyssData, int]:
data = await mys_api.get_spiral_abyss_info( self.uid = uid
self.cookie = await self.sqla.get_random_cookie(uid)
data = await mys_api.get_srspiral_abyss_info(
self.uid, schedule_type, self.cookie self.uid, schedule_type, self.cookie
) )
return data return data

View File

@ -2,8 +2,8 @@ import copy
import time import time
import uuid import uuid
import random import random
from typing import Dict, Union, cast
from string import digits, ascii_letters from string import digits, ascii_letters
from typing import Dict, Union, Optional, cast
from gsuid_core.utils.api.mys_api import _MysApi from gsuid_core.utils.api.mys_api import _MysApi
from gsuid_core.utils.api.mys.models import MysSign, SignInfo, SignList from gsuid_core.utils.api.mys.models import MysSign, SignInfo, SignList
@ -17,6 +17,7 @@ from .api import srdbsqla
from ..sruid_utils.api.mys.api import _API from ..sruid_utils.api.mys.api import _API
from ..sruid_utils.api.mys.models import ( from ..sruid_utils.api.mys.models import (
GachaLog, GachaLog,
AbyssData,
RoleIndex, RoleIndex,
AvatarInfo, AvatarInfo,
MonthlyAward, MonthlyAward,
@ -182,6 +183,29 @@ class MysApi(_MysApi):
data = cast(SignInfo, data['data']) data = cast(SignInfo, data['data'])
return data return data
async def get_srspiral_abyss_info(
self,
uid: str,
schedule_type='1',
ck: Optional[str] = None,
) -> Union[AbyssData, int]:
server_id = self.RECOGNIZE_SERVER.get(uid[0])
data = await self.simple_mys_req(
'CHALLENGE_INFO_URL',
uid,
params={
'role_id': uid,
'schedule_type': schedule_type,
'server': server_id,
},
cookie=ck,
header=self._HEADER,
)
print(data)
if isinstance(data, Dict):
data = cast(AbyssData, data['data'])
return data
async def mys_sign( async def mys_sign(
self, uid, header={}, server_id='cn_gf01' self, uid, header={}, server_id='cn_gf01'
) -> Union[MysSign, int]: ) -> Union[MysSign, int]: