🎨 优化代码 (#3)

This commit is contained in:
Wuyi无疑 2023-05-03 18:03:57 +08:00
parent 10fca913ad
commit 9c332491be
11 changed files with 116 additions and 185 deletions

8
.vscode/extensions.json vendored Normal file
View File

@ -0,0 +1,8 @@
{
"recommendations": [
"ms-python.python",
"ms-python.vscode-pylance",
"ms-python.isort",
"ms-python.black-formatter"
]
}

29
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,29 @@
{
"python.languageServer": "Pylance",
"python.analysis.typeCheckingMode": "basic",
"cSpell.words": [
"enka",
"genshin",
"genshinuid"
],
"editor.formatOnSave": true,
"[python]": {
"editor.defaultFormatter": "ms-python.black-formatter",
"editor.formatOnSave": true,
"editor.codeActionsOnSave": {
"source.organizeImports": true
}
},
"isort.args": [
"--profile",
"black"
],
"python.formatting.provider": "black",
"python.linting.flake8Enabled": true,
"python.linting.flake8CategorySeverity.W": "Warning",
"python.linting.flake8CategorySeverity.F": "Warning",
"python.linting.flake8CategorySeverity.E": "Warning",
"python.analysis.extraPaths": [
"${workspaceFolder}/../../../../"
]
}

View File

@ -3,7 +3,7 @@ from typing import Any, Dict, List, Optional, TypedDict
class RoleBasicInfo(TypedDict): class RoleBasicInfo(TypedDict):
avatar: str avatar: str
nick_name: str nickname: str
region: str region: str
level: int level: int

View File

@ -0,0 +1,26 @@
from typing import Dict
from gsuid_core.utils.plugins_config.models import (
GSC,
GsBoolConfig,
GsListStrConfig,
)
CONIFG_DEFAULT: Dict[str, GSC] = {
'SignTime': GsListStrConfig('每晚签到时间设置', '每晚米游社签到时间设置(时,分)', ['0', '38']),
'SignReportSimple': GsBoolConfig(
'简洁签到报告',
'开启后可以大大减少每日签到报告字数',
True,
),
'SchedSignin': GsBoolConfig(
'定时签到',
'开启后每晚00:30将开始自动签到任务',
True,
),
'CrazyNotice': GsBoolConfig(
'催命模式',
'开启后当达到推送阈值将会一直推送',
False,
),
}

View File

@ -0,0 +1,6 @@
from gsuid_core.utils.plugins_config.gs_config import StringConfig
from .config_default import CONIFG_DEFAULT
from ..utils.resource.RESOURCE_PATH import CONFIG_PATH
srconfig = StringConfig('StarRailUID', CONFIG_PATH, CONIFG_DEFAULT)

View File

@ -5,8 +5,8 @@ from gsuid_core.logger import logger
from ..utils.api import get_sqla from ..utils.api import get_sqla
from ..utils.mys_api import mys_api from ..utils.mys_api import mys_api
from ..starrailuid_config.sr_config import srconfig
from ..sruid_utils.api.mys.models import DailyNoteData from ..sruid_utils.api.mys.models import DailyNoteData
from ....GenshinUID.GenshinUID.genshinuid_config.gs_config import gsconfig
MR_NOTICE = '\n可发送[srmr]或者[sr每日]来查看更多信息!\n' MR_NOTICE = '\n可发送[srmr]或者[sr每日]来查看更多信息!\n'
@ -50,7 +50,7 @@ async def all_check(
for mode in NOTICE.keys(): for mode in NOTICE.keys():
# 检查条件 # 检查条件
if push_data[f'{mode}_is_push'] == 'on': if push_data[f'{mode}_is_push'] == 'on':
if gsconfig.get_config('CrazyNotice').data: if srconfig.get_config('CrazyNotice').data:
if not await check(mode, raw_data, push_data[f'{mode}_value']): if not await check(mode, raw_data, push_data[f'{mode}_value']):
await sqla.update_push_data( await sqla.update_push_data(
uid, {f'{mode}_is_push': 'off'} uid, {f'{mode}_is_push': 'off'}

View File

@ -1,7 +1,10 @@
import re
from gsuid_core.sv import SV from gsuid_core.sv import SV
from gsuid_core.bot import Bot from gsuid_core.bot import Bot
from utils.convert import get_uid
from gsuid_core.models import Event from gsuid_core.models import Event
import re
from .draw_roleinfo_card import get_role_img from .draw_roleinfo_card import get_role_img
sv_get_info = SV('sr查询信息') sv_get_info = SV('sr查询信息')
@ -13,5 +16,9 @@ async def send_role_info(bot: Bot, ev: Event):
if name: if name:
return return
uid = await get_uid(bot, ev)
if uid is None:
return '你还没有绑定UID噢,请使用[sr绑定uid123]完成绑定!'
await bot.logger.info('开始执行[sr查询信息]') await bot.logger.info('开始执行[sr查询信息]')
await bot.send(await get_role_img(bot.bot_id, ev.user_id)) await bot.send(await get_role_img(uid))

View File

@ -1,26 +1,15 @@
import asyncio import asyncio
from pathlib import Path from pathlib import Path
from typing import Dict, List, Optional from typing import Dict, List, Union, Optional
from PIL import Image, ImageDraw from PIL import Image, ImageDraw
from gsuid_core.utils.error_reply import get_error
from gsuid_core.logger import logger
from ..utils.api import get_sqla
from ..utils.mys_api import mys_api from ..utils.mys_api import mys_api
from ..sruid_utils.api.mys.models import (
RoleBasicInfo,
RoleIndex,
Stats,
AvatarListItem,
)
from .utils import get_icon, wrap_list from .utils import get_icon, wrap_list
from ..utils.image.convert import convert_img from ..utils.image.convert import convert_img
from ..utils.fonts.starrail_fonts import ( from ..utils.fonts.starrail_fonts import sr_font_24, sr_font_30, sr_font_36
sr_font_24, from ..sruid_utils.api.mys.models import Stats, RoleBasicInfo, AvatarListItem
sr_font_30,
sr_font_36,
)
TEXT_PATH = Path(__file__).parent / 'texture2D' TEXT_PATH = Path(__file__).parent / 'texture2D'
@ -58,22 +47,9 @@ elements = {
} }
async def get_role_img(bot_id: str, user_id: str): async def get_role_img(uid: str) -> Union[bytes, str]:
sqla = get_sqla(bot_id) im = await draw_role_card(uid)
uid_list: List = await sqla.get_bind_sruid_list(user_id) return im
logger.info(f'[每日信息]UID: {uid_list}')
# 进行校验UID是否绑定CK
useable_uid_list = []
for uid in uid_list:
status = await sqla.get_user_cookie(uid)
if status is not None:
useable_uid_list.append(uid)
uid = useable_uid_list[0]
res = await convert_img(await draw_role_card(uid))
logger.info(f'[每日信息]可用UID: {useable_uid_list}')
if not useable_uid_list:
return '请先绑定一个可用CK & UID再来查询哦~'
return res
def _lv(level: int) -> str: def _lv(level: int) -> str:
@ -218,20 +194,28 @@ async def _draw_card_2(
return img_card_2 return img_card_2
async def draw_role_card(sr_uid: str) -> Image.Image: async def draw_role_card(sr_uid: str) -> Union[bytes, str]:
role_basic_info = await mys_api.get_role_basic_info(sr_uid) role_basic_info = await mys_api.get_role_basic_info(sr_uid)
role_index = await mys_api.get_role_index(sr_uid) role_index = await mys_api.get_role_index(sr_uid)
if isinstance(role_index, int):
return get_error(role_index)
if isinstance(role_basic_info, int):
return get_error(role_basic_info)
stats = role_index['stats'] stats = role_index['stats']
avatars = role_index['avatar_list'] avatars = role_index['avatar_list']
detail = await mys_api.get_avatar_info(sr_uid, avatars[0]['id'])
if isinstance(detail, int):
return get_error(detail)
# 角色武器 # 角色武器
details = (await mys_api.get_avatar_info(sr_uid, avatars[0]['id']))[ details = detail['avatar_list']
'avatar_list'
]
equips: Dict[int, Optional[str]] = {} equips: Dict[int, Optional[str]] = {}
for detail in details: for detail in details:
equip = detail['equip'] equip = detail['equip']
equips[detail['id']] = equip['icon'] if equip is not None else None # type: ignore equips[detail['id']] = equip['icon'] if equip is not None else None
# 绘制总图 # 绘制总图
img1, img2 = await asyncio.gather( img1, img2 = await asyncio.gather(
@ -246,4 +230,4 @@ async def draw_role_card(sr_uid: str) -> Image.Image:
img.paste(img1, (0, 0)) img.paste(img1, (0, 0))
img.paste(img2, (0, 810)) img.paste(img2, (0, 810))
img.paste(bg3, (0, height + 810)) img.paste(bg3, (0, height + 810))
return img return await convert_img(img)

View File

@ -11,9 +11,9 @@ from gsuid_core.logger import logger
from ..utils.api import get_sqla from ..utils.api import get_sqla
from .sign import sign_in, daily_sign from .sign import sign_in, daily_sign
from ..utils.error_reply import UID_HINT from ..utils.error_reply import UID_HINT
from ....GenshinUID.GenshinUID.genshinuid_config.gs_config import gsconfig from ..starrailuid_config.sr_config import srconfig
SIGN_TIME = gsconfig.get_config('SignTime').data SIGN_TIME = srconfig.get_config('SignTime').data
sv_sign = SV('星穹铁道签到') sv_sign = SV('星穹铁道签到')
sv_sign_config = SV('星穹铁道管理', pm=2) sv_sign_config = SV('星穹铁道管理', pm=2)
@ -22,7 +22,7 @@ sv_sign_config = SV('星穹铁道管理', pm=2)
# 每日零点半执行米游社星穹铁道签到 # 每日零点半执行米游社星穹铁道签到
@scheduler.scheduled_job('cron', hour=SIGN_TIME[0], minute=SIGN_TIME[1]) @scheduler.scheduled_job('cron', hour=SIGN_TIME[0], minute=SIGN_TIME[1])
async def sign_at_night(): async def sign_at_night():
if gsconfig.get_config('SchedSignin').data: if srconfig.get_config('SchedSignin').data:
await send_daily_sign() await send_daily_sign()

View File

@ -4,10 +4,11 @@ from copy import deepcopy
from gsuid_core.gss import gss from gsuid_core.gss import gss
from gsuid_core.logger import logger from gsuid_core.logger import logger
from gsuid_core.utils.plugins_config.gs_config import core_plugins_config
from ..utils.api import get_sqla from ..utils.api import get_sqla
from ..utils.mys_api import mys_api from ..utils.mys_api import mys_api
from ....GenshinUID.GenshinUID.genshinuid_config.gs_config import gsconfig from ..starrailuid_config.sr_config import srconfig
private_msg_list = {} private_msg_list = {}
group_msg_list = {} group_msg_list = {}
@ -45,7 +46,7 @@ async def sign_in(sr_uid: str) -> str:
if 'risk_code' in sign_data: if 'risk_code' in sign_data:
# 出现校验码 # 出现校验码
if sign_data['risk_code'] == 375: if sign_data['risk_code'] == 375:
if gsconfig.get_config('CaptchaPass').data: if core_plugins_config.get_config('CaptchaPass').data:
gt = sign_data['gt'] gt = sign_data['gt']
ch = sign_data['challenge'] ch = sign_data['challenge']
vl, ch = await mys_api._pass(gt, ch, Header) vl, ch = await mys_api._pass(gt, ch, Header)
@ -125,7 +126,7 @@ async def single_daily_sign(bot_id: str, sr_uid: str, gid: str, qid: str):
'push_message': '', 'push_message': '',
} }
# 检查是否开启简洁签到 # 检查是否开启简洁签到
if gsconfig.get_config('SignReportSimple').data: if srconfig.get_config('SignReportSimple').data:
# 如果失败, 则添加到推送列表 # 如果失败, 则添加到推送列表
if im.startswith(('sr签到失败', '网络有点忙', 'OK', 'ok')): if im.startswith(('sr签到失败', '网络有点忙', 'OK', 'ok')):
message = f'[CQ:at,qq={qid}] {im}' message = f'[CQ:at,qq={qid}] {im}'

View File

@ -1,26 +1,23 @@
import copy import copy
import random import random
from typing import Dict, Union, cast
from string import digits, ascii_letters from string import digits, ascii_letters
from typing import Dict, Union, Literal, Optional, cast
from gsuid_core.utils.api.mys.request import BaseMysApi from gsuid_core.utils.api.mys_api import _MysApi
from gsuid_core.utils.api.mys.models import MysSign, SignInfo, SignList from gsuid_core.utils.api.mys.models import MysSign, SignInfo, SignList
from gsuid_core.utils.api.mys.tools import ( from gsuid_core.utils.api.mys.tools import (
random_hex, random_hex,
get_ds_token,
generate_os_ds, generate_os_ds,
get_web_ds_token, get_web_ds_token,
) )
from ..utils.api import get_sqla
from ..sruid_utils.api.mys.api import _API from ..sruid_utils.api.mys.api import _API
from ....GenshinUID.GenshinUID.genshinuid_config.gs_config import gsconfig
from ..sruid_utils.api.mys.models import ( from ..sruid_utils.api.mys.models import (
RoleIndex,
AvatarInfo,
MonthlyAward, MonthlyAward,
DailyNoteData, DailyNoteData,
RoleBasicInfo, RoleBasicInfo,
RoleIndex,
AvatarInfo
) )
RECOGNIZE_SERVER = { RECOGNIZE_SERVER = {
@ -34,61 +31,10 @@ RECOGNIZE_SERVER = {
} }
class _MysApi(BaseMysApi): class MysApi(_MysApi):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
async def _pass(self, gt: str, ch: str, header: Dict):
# 警告使用该服务例如某RR等需要注意风险问题
# 本项目不以任何形式提供相关接口
# 代码来源GITHUB项目MIT开源
_pass_api = gsconfig.get_config('_pass_API').data
if _pass_api:
data = await self._mys_request(
url=f'{_pass_api}&gt={gt}&challenge={ch}',
method='GET',
header=header,
)
if isinstance(data, int):
return None, None
else:
validate = data['data']['validate']
ch = data['data']['challenge']
else:
validate = None
return validate, ch
async def _upass(self, header: Dict, is_bbs: bool = False):
if is_bbs:
raw_data = await self.get_bbs_upass_link(header)
else:
raw_data = await self.get_upass_link(header)
if isinstance(raw_data, int):
return False
gt = raw_data['data']['gt']
ch = raw_data['data']['challenge']
vl, ch = await self._pass(gt, ch, header)
if vl:
await self.get_header_and_vl(header, ch, vl)
else:
return True
async def get_ck(
self, uid: str, mode: Literal['OWNER', 'RANDOM'] = 'RANDOM'
) -> Optional[str]:
sqla = get_sqla('TEMP')
if mode == 'RANDOM':
return await sqla.get_random_cookie(uid)
else:
return await sqla.get_user_cookie(uid)
async def get_stoken(self, uid: str) -> Optional[str]:
sqla = get_sqla('TEMP')
return await sqla.get_user_stoken(uid)
async def create_qrcode_url(self) -> Union[Dict, int]: async def create_qrcode_url(self) -> Union[Dict, int]:
device_id: str = ''.join(random.choices(ascii_letters + digits, k=64)) device_id: str = ''.join(random.choices(ascii_letters + digits, k=64))
app_id: str = '8' app_id: str = '8'
@ -129,7 +75,7 @@ class _MysApi(BaseMysApi):
uid, uid,
params={ params={
"id": avatar_id, "id": avatar_id,
"need_wiki": "true" if need_wiki else "false" "need_wiki": "true" if need_wiki else "false",
}, },
) )
if isinstance(data, Dict): if isinstance(data, Dict):
@ -261,81 +207,5 @@ class _MysApi(BaseMysApi):
data = cast(DailyNoteData, data['data']) data = cast(DailyNoteData, data['data'])
return data return data
async def _mys_req_get(
self,
url: str,
is_os: bool,
params: Dict,
header: Optional[Dict] = None,
) -> Union[Dict, int]:
if is_os:
_URL = _API[f'{url}_OS']
HEADER = copy.deepcopy(self._HEADER_OS)
use_proxy = True
else:
_URL = _API[url]
HEADER = copy.deepcopy(self._HEADER)
use_proxy = False
if header:
HEADER.update(header)
if 'Cookie' not in HEADER and 'uid' in params: mys_api = MysApi()
ck = await self.get_ck(params['uid'])
if ck is None:
return -51
HEADER['Cookie'] = ck
data = await self._mys_request(
url=_URL,
method='GET',
header=HEADER,
params=params,
use_proxy=use_proxy,
)
return data
async def simple_mys_req(
self,
URL: str,
uid: Union[str, bool],
params: Dict = {},
header: Dict = {},
cookie: Optional[str] = None,
) -> Union[Dict, int]:
if isinstance(uid, bool):
is_os = uid
server_id = 'cn_qd01' if is_os else 'prod_gf_cn'
else:
server_id = RECOGNIZE_SERVER.get(uid[0])
is_os = int(uid[0]) >= 6
ex_params = '&'.join([f'{k}={v}' for k, v in params.items()])
if is_os:
_URL = _API[f'{URL}_OS']
HEADER = copy.deepcopy(self._HEADER_OS)
HEADER['DS'] = generate_os_ds()
else:
_URL = _API[URL]
HEADER = copy.deepcopy(self._HEADER)
param_str = f'role_id={uid}&server={server_id}'
HEADER['DS'] = get_ds_token(
f"{ex_params}&{param_str}" if ex_params else param_str
)
HEADER.update(header)
if cookie is not None:
HEADER['Cookie'] = cookie
elif 'Cookie' not in HEADER and isinstance(uid, str):
ck = await self.get_ck(uid)
if ck is None:
return -51
HEADER['Cookie'] = ck
param_dict = {'server': server_id, 'role_id': uid}
data = await self._mys_request(
url=_URL,
method='GET',
header=HEADER,
params={**params, **param_dict} if params else param_dict,
use_proxy=True if is_os else False,
)
return data
mys_api = _MysApi()