完成一半 sr每日 工作

This commit is contained in:
qwerdvd 2023-04-29 17:30:55 +08:00
parent 7b8efec095
commit 85af38f075
6 changed files with 331 additions and 81 deletions

View File

@ -1,5 +1,13 @@
from typing import Any, List, TypedDict
class RoleBasicInfo(TypedDict):
avatar: str
nick_name: str
region: str
level: int
################
# 每月札记相关 #
################
@ -52,7 +60,7 @@ class MonthlyAward(TypedDict):
################
# 实时便签 #
################
class SingleExpedition(TypedDict):
class Expedition(TypedDict):
avatars: List[str] # 头像Url
status: str
remaining_time: int
@ -65,7 +73,7 @@ class DailyNoteData(TypedDict):
stamina_recover_time: int
accepted_expedition_num: int
total_expedition_num: int
expeditions: List[SingleExpedition]
expeditions: List[Expedition]
################

View File

@ -1,99 +1,217 @@
# import json
# import asyncio
# from typing import List
import asyncio
from io import BytesIO
from typing import List
from pathlib import Path
from PIL import Image
import aiohttp
from PIL import Image, ImageDraw
from gsuid_core.logger import logger
# from PIL import ImageDraw
# from gsuid_core.logger import logger
# from gsuid_core.utils.api.mys.models import Expedition
# from ..utils.mys_api import mys_api
# from ..utils.api import get_sqla
# from ..utils.image.convert import convert_img
# from ..utils.image.image_tools import get_simple_bg
# from ..utils.map.name_covert import enName_to_avatarId
# from ..utils.resource.RESOURCE_PATH import PLAYER_PATH, CHAR_SIDE_PATH
# from ..utils.fonts.starrail_fonts import (
# sr_font_20,
# sr_font_26,
# sr_font_32,
# sr_font_60,
# )
from ..utils.api import get_sqla
from ..utils.mys_api import mys_api
from ..utils.image.convert import convert_img
from ..sruid_utils.api.mys.models import Expedition
from ..utils.fonts.starrail_fonts import (
sr_font_20,
sr_font_24,
sr_font_36,
sr_font_50,
)
TEXT_PATH = Path(__file__).parent / 'texture2D'
note_bg = Image.open(TEXT_PATH / 'note_bg.png')
note_travel_bg = Image.open(TEXT_PATH / 'note_travel_bg.png')
based_w = 500
based_h = 900
based_w = 700
based_h = 1000
white_overlay = Image.new('RGBA', (based_w, based_h), (255, 251, 242, 225))
first_color = (29, 29, 29)
second_color = (98, 98, 98)
white_color = (255, 255, 255)
green_color = (15, 196, 35)
orange_color = (237, 115, 61)
red_color = (235, 61, 75)
# async def _draw_task_img(
# img: Image.Image,
# img_draw: ImageDraw.ImageDraw,
# index: int,
# char: Expedition,
# ):
# char_en_name = char['avatar_side_icon'].split('_')[-1].split('.')[0]
# avatar_id = await enName_to_avatarId(char_en_name)
# char_pic = (
# Image.open(CHAR_SIDE_PATH / f'{avatar_id}.png')
# .convert('RGBA')
# .resize((80, 80), Image.Resampling.LANCZOS) # type: ignore
# )
# img.paste(char_pic, (22 + index * 90, 770), char_pic)
# if char['status'] == 'Finished':
# status_mark = '待收取'
# status_color = red_color
# else:
# status_mark = '已派遣'
# status_color = green_color
# img_draw.text(
# (65 + index * 90, 870),
# status_mark,
# font=sr_font_20,
# fill=status_color,
# anchor='mm',
# )
async def download_image(url: str) -> Image.Image:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
img_data = await response.read()
img = Image.open(BytesIO(img_data))
return img
async def _draw_task_img(
img: Image.Image,
img_draw: ImageDraw.ImageDraw,
index: int,
char: Expedition,
):
for i in range(2):
avatar_url = char['avatars'][i]
image = await download_image(avatar_url)
char_pic = image.convert('RGBA').resize(
(40, 40), Image.Resampling.LANCZOS
) # type: ignore
img.paste(char_pic, (22 + index * 90, 40), char_pic)
if char['status'] == 'Finished':
status_mark = '待收取'
status_color = red_color
else:
status_mark = '已派遣'
status_color = green_color
img_draw.text(
(65 + index * 90, 870),
status_mark,
font=sr_font_20,
fill=status_color,
anchor='mm',
)
async def get_resin_img(bot_id: str, user_id: str):
pass
# try:
# sqla = get_sqla(bot_id)
# uid_list: List = await sqla.get_bind_uid_list(user_id)
# logger.info('[每日信息]UID: {}'.format(uid_list))
# # 进行校验UID是否绑定CK
# useable_uid_list = []
# for uid in uid_list:
# status = await sqla.get_user_cookie(uid)
# if status is not None:
# useable_uid_list.append(uid)
# logger.info('[每日信息]可用UID: {}'.format(useable_uid_list))
# if len(useable_uid_list) == 0:
# return '请先绑定一个可用CK & UID再来查询哦~'
# # 开始绘图任务
# task = []
# img = Image.new(
# 'RGBA', (based_w * len(useable_uid_list), based_h), (0, 0, 0, 0)
# )
# for uid_index, uid in enumerate(useable_uid_list):
# task.append(_draw_all_resin_img(img, uid, uid_index))
# await asyncio.gather(*task)
# res = await convert_img(img)
# logger.info('[查询每日信息]绘图已完成,等待发送!')
# except TypeError:
# logger.exception('[查询每日信息]绘图失败!')
# res = '你绑定过的UID中可能存在过期CK~请重新绑定一下噢~'
#
# return res
try:
sqla = get_sqla(bot_id)
uid_list: List = await sqla.get_bind_sruid_list(user_id)
logger.info('[每日信息]UID: {}'.format(uid_list))
# 进行校验UID是否绑定CK
useable_uid_list = []
for uid in uid_list:
status = await sqla.get_user_cookie(uid)
if status is not None:
useable_uid_list.append(uid)
logger.info('[每日信息]可用UID: {}'.format(useable_uid_list))
if len(useable_uid_list) == 0:
return '请先绑定一个可用CK & UID再来查询哦~'
# 开始绘图任务
task = []
img = Image.new(
'RGBA', (based_w * len(useable_uid_list), based_h), (0, 0, 0, 0)
)
for uid_index, uid in enumerate(useable_uid_list):
task.append(_draw_all_resin_img(img, uid, uid_index))
await asyncio.gather(*task)
res = await convert_img(img)
logger.info('[查询每日信息]绘图已完成,等待发送!')
except TypeError:
logger.exception('[查询每日信息]绘图失败!')
res = '你绑定过的UID中可能存在过期CK~请重新绑定一下噢~'
return res
async def _draw_all_resin_img(img: Image.Image, uid: str, index: int):
resin_img = await draw_resin_img(uid)
img.paste(resin_img, (500 * index, 0), resin_img)
async def seconds2hours(seconds: int) -> str:
m, s = divmod(int(seconds), 60)
h, m = divmod(m, 60)
return '%02d小时%02d' % (h, m)
async def draw_resin_img(sr_uid: str) -> Image.Image:
# 获取数据
daily_data = await mys_api.get_daily_data(sr_uid)
# 获取背景图片各项参数
img = note_bg
# img.paste(white_overlay, (0, 0), white_overlay)
if isinstance(daily_data, int):
img_draw = ImageDraw.Draw(img)
# img.paste(warn_pic, (0, 0), warn_pic)
# 写UID
img_draw.text(
(250, 553),
f'UID{sr_uid}',
font=sr_font_36,
fill=first_color,
anchor='mm',
)
img_draw.text(
(250, 518),
f'错误码 {daily_data}',
font=sr_font_36,
fill=red_color,
anchor='mm',
)
return img
# nickname and level
role_basic_info = await mys_api.get_role_basic_info(sr_uid)
nickname = role_basic_info['nickname']
level = role_basic_info['level']
# 开拓力
stamina = daily_data['current_stamina']
max_stamina = daily_data['max_stamina']
stamina_str = f'{stamina}/{max_stamina}'
stamina_percent = stamina / max_stamina
if stamina_percent > 0.8:
stamina_color = red_color
else:
stamina_color = second_color
stamina_recovery_time = await seconds2hours(
daily_data['stamina_recover_time']
)
img_draw = ImageDraw.Draw(img)
# # 派遣
# task_task = []
# for index, char in enumerate(daily_data['expeditions']):
# task_task.append(_draw_task_img(img, img_draw, index, char))
# await asyncio.gather(*task_task)
# 绘制树脂圆环
ring_pic = Image.open(TEXT_PATH / 'ring.apng')
percent = (
round(stamina_percent * 90)
if round(stamina_percent * 90) <= 90
else 90
)
ring_pic.seek(percent)
img.paste(ring_pic, (0, 0), ring_pic)
# 写树脂剩余时间
img_draw.text(
(350, 415),
f'还剩{stamina_recovery_time}',
font=sr_font_24,
fill=stamina_color,
anchor='mm',
)
# 写Nickname
img_draw.text(
(350, 153), nickname, font=sr_font_36, fill=white_color, anchor='mm'
)
# 写开拓等级
img_draw.text(
(350, 210),
f'开拓等级{level}',
font=sr_font_24,
fill=white_color,
anchor='mm',
)
# 写UID
img_draw.text(
(350, 655),
f'UID{sr_uid}',
font=sr_font_24,
fill=first_color,
anchor='mm',
)
# 写树脂
img_draw.text(
(350, 478),
stamina_str,
font=sr_font_50,
fill=first_color,
anchor='mm',
)
return img

View File

@ -0,0 +1,109 @@
from io import BytesIO
from pathlib import Path
from base64 import b64encode
from typing import Union, overload
import aiofiles
from PIL import Image, ImageFont
@overload
async def convert_img(img: Image.Image, is_base64: bool = False) -> bytes:
...
@overload
async def convert_img(img: Image.Image, is_base64: bool = True) -> str:
...
@overload
async def convert_img(img: bytes, is_base64: bool = False) -> str:
...
@overload
async def convert_img(img: Path, is_base64: bool = False) -> str:
...
async def convert_img(
img: Union[Image.Image, str, Path, bytes], is_base64: bool = False
):
"""
:说明:
将PIL.Image对象转换为bytes或者base64格式
:参数:
* img (Image): 图片
* is_base64 (bool): 是否转换为base64格式, 不填默认转为bytes
:返回:
* res: bytes对象或base64编码图片
"""
if isinstance(img, Image.Image):
img = img.convert('RGB')
result_buffer = BytesIO()
img.save(result_buffer, format='PNG', quality=80, subsampling=0)
res = result_buffer.getvalue()
if is_base64:
res = 'base64://' + b64encode(res).decode()
return res
elif isinstance(img, bytes):
pass
else:
async with aiofiles.open(img, 'rb') as fp:
img = await fp.read()
return f'base64://{b64encode(img).decode()}'
async def str_lenth(r: str, size: int, limit: int = 540) -> str:
result = ''
temp = 0
for i in r:
if i == '\n':
temp = 0
result += i
continue
if temp >= limit:
result += '\n' + i
temp = 0
else:
result += i
if i.isdigit():
temp += round(size / 10 * 6)
elif i == '/':
temp += round(size / 10 * 2.2)
elif i == '.':
temp += round(size / 10 * 3)
elif i == '%':
temp += round(size / 10 * 9.4)
else:
temp += size
return result
def get_str_size(
r: str, font: ImageFont.FreeTypeFont, limit: int = 540
) -> str:
result = ''
line = ''
for i in r:
if i == '\n':
result += f'{line}\n'
line = ''
continue
line += i
size, _ = font.getsize(line)
if size >= limit:
result += f'{line}\n'
line = ''
else:
result += line
return result
def get_height(content: str, size: int) -> int:
line_count = content.count('\n')
return (line_count + 1) * size

Binary file not shown.

After

Width:  |  Height:  |  Size: 5.7 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 7.8 KiB

View File

@ -14,8 +14,12 @@ from gsuid_core.utils.api.mys.tools import (
from ..utils.api import get_sqla
from ..sruid_utils.api.mys.api import _API
from ..sruid_utils.api.mys.models import MonthlyAward, DailyNoteData
from ....GenshinUID.GenshinUID.genshinuid_config.gs_config import gsconfig
from ..sruid_utils.api.mys.models import (
MonthlyAward,
DailyNoteData,
RoleBasicInfo,
)
RECOGNIZE_SERVER = {
'1': 'prod_gf_cn',
@ -225,6 +229,17 @@ class _MysApi(BaseMysApi):
data = cast(MonthlyAward, data['data'])
return data
async def get_role_basic_info(
self, sr_uid: str
) -> Union[RoleBasicInfo, int]:
data = await self.simple_mys_req(
'STAR_RAIL_ROLE_BASIC_INFO_URL', sr_uid
)
print(data)
if isinstance(data, Dict):
data = cast(DailyNoteData, data['data'])
return data
async def _mys_req_get(
self,
url: str,