From 377d9ccd04582f2620d33aeb1145e4f46d39c0ba Mon Sep 17 00:00:00 2001 From: KimigaiiWuyi <444835641@qq.com> Date: Thu, 30 Nov 2023 04:49:01 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=92=A5=20=E5=AE=8C=E5=85=A8=E7=A7=BB?= =?UTF-8?q?=E9=99=A4=E8=BF=87=E6=97=B6=E7=9A=84`sqla`?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../starrailuid_charinfo/draw_char_img.py | 105 +++----- StarRailUID/starrailuid_charinfo/to_card.py | 23 +- StarRailUID/starrailuid_note/__init__.py | 26 +- StarRailUID/starrailuid_roleinfo/__init__.py | 30 ++- StarRailUID/starrailuid_signin/__init__.py | 13 +- StarRailUID/starrailuid_signin/sign.py | 35 +-- .../starrailuid_stamina/draw_stamina_card.py | 27 +- StarRailUID/starrailuid_stamina/notice.py | 20 +- StarRailUID/starrailuid_start/__init__.py | 2 - StarRailUID/starrailuid_user/__init__.py | 32 +-- StarRailUID/starrailuid_user/add_ck.py | 242 ------------------ .../starrailuid_user/draw_user_card.py | 110 -------- .../starrailuid_user/get_ck_help_msg.py | 43 ---- StarRailUID/starrailuid_user/qrlogin.py | 160 ------------ StarRailUID/utils/api.py | 4 +- StarRailUID/utils/convert.py | 13 +- StarRailUID/utils/mys_api.py | 80 +++--- 17 files changed, 187 insertions(+), 778 deletions(-) delete mode 100644 StarRailUID/starrailuid_user/add_ck.py delete mode 100644 StarRailUID/starrailuid_user/get_ck_help_msg.py delete mode 100644 StarRailUID/starrailuid_user/qrlogin.py diff --git a/StarRailUID/starrailuid_charinfo/draw_char_img.py b/StarRailUID/starrailuid_charinfo/draw_char_img.py index a84308a..445ccca 100644 --- a/StarRailUID/starrailuid_charinfo/draw_char_img.py +++ b/StarRailUID/starrailuid_charinfo/draw_char_img.py @@ -5,16 +5,29 @@ import textwrap from pathlib import Path from typing import Dict, Union +from PIL import Image, ImageDraw from gsuid_core.logger import logger +from starrail_damage_cal.to_data import api_to_dict from gsuid_core.utils.image.convert import convert_img from gsuid_core.utils.image.image_tools import draw_text_by_line -from PIL import Image, ImageDraw -from starrail_damage_cal.cal_damage import cal_char_info, cal_info -from starrail_damage_cal.to_data import api_to_dict +from starrail_damage_cal.cal_damage import cal_info, cal_char_info from ..utils.error_reply import CHAR_HINT -from ..utils.excel.read_excel import light_cone_ranks from ..utils.fonts.first_world import fw_font_28 +from ..utils.excel.read_excel import light_cone_ranks +from ..utils.map.name_covert import name_to_avatar_id, alias_to_char_name +from ..utils.map.SR_MAP_PATH import ( + RelicId2Rarity, + AvatarRelicScore, + avatarId2Name, +) +from ..utils.resource.RESOURCE_PATH import ( + RELIC_PATH, + SKILL_PATH, + PLAYER_PATH, + WEAPON_PATH, + CHAR_PORTRAIT_PATH, +) from ..utils.fonts.starrail_fonts import ( sr_font_18, sr_font_20, @@ -25,19 +38,6 @@ from ..utils.fonts.starrail_fonts import ( sr_font_34, sr_font_38, ) -from ..utils.map.name_covert import alias_to_char_name, name_to_avatar_id -from ..utils.map.SR_MAP_PATH import ( - AvatarRelicScore, - RelicId2Rarity, - avatarId2Name, -) -from ..utils.resource.RESOURCE_PATH import ( - CHAR_PORTRAIT_PATH, - PLAYER_PATH, - RELIC_PATH, - SKILL_PATH, - WEAPON_PATH, -) Excel_path = Path(__file__).parent with Path.open(Excel_path / 'Excel' / 'SkillData.json', encoding='utf-8') as f: @@ -131,9 +131,7 @@ async def draw_char_img(char_data: Dict, sr_uid: str, msg: str): # 放角色名 char_img_draw = ImageDraw.Draw(char_info) - char_img_draw.text( - (620, 207), char.char_name, (255, 255, 255), sr_font_38, 'lm' - ) + char_img_draw.text((620, 207), char.char_name, (255, 255, 255), sr_font_38, 'lm') if hasattr(sr_font_38, 'getsize'): char_name_len = sr_font_38.getsize(char.char_name)[0] # type: ignore else: @@ -178,12 +176,9 @@ async def draw_char_img(char_data: Dict, sr_uid: str, msg: str): # 生命值 hp = int(char.base_attributes.get('hp')) add_hp = int( - char.add_attr.get('HPDelta', 0) - + hp * char.add_attr.get('HPAddedRatio', 0) - ) - attr_bg_draw.text( - (413, 31), f'{hp + add_hp}', white_color, sr_font_26, 'rm' + char.add_attr.get('HPDelta', 0) + hp * char.add_attr.get('HPAddedRatio', 0) ) + attr_bg_draw.text((413, 31), f'{hp + add_hp}', white_color, sr_font_26, 'rm') attr_bg_draw.text( (428, 31), f'(+{round(add_hp)!s})', @@ -274,9 +269,7 @@ async def draw_char_img(char_data: Dict, sr_uid: str, msg: str): 'rm', ) # 效果命中 - status_probability_base = ( - char.add_attr.get('StatusProbabilityBase', 0) * 100 - ) + status_probability_base = char.add_attr.get('StatusProbabilityBase', 0) * 100 attr_bg_draw.text( (500, 31 + 48 * 6), f'{status_probability_base:.1f}%', @@ -401,8 +394,7 @@ async def draw_char_img(char_data: Dict, sr_uid: str, msg: str): weapon_bg.paste(rank_img, (weapon_name_len + 330, 2), rank_img) rarity_img = Image.open( - TEXT_PATH - / f'LightCore_Rarity{char.equipment["equipmentRarity"]}.png' + TEXT_PATH / f'LightCore_Rarity{char.equipment["equipmentRarity"]}.png' ).resize((306, 72)) weapon_bg.paste(rarity_img, (223, 55), rarity_img) weapon_bg_draw.text( @@ -415,9 +407,9 @@ async def draw_char_img(char_data: Dict, sr_uid: str, msg: str): # 武器技能 desc = light_cone_ranks[str(char.equipment['equipmentID'])]['desc'] - desc_params = light_cone_ranks[str(char.equipment['equipmentID'])][ - 'params' - ][char.equipment['equipmentRank'] - 1] + desc_params = light_cone_ranks[str(char.equipment['equipmentID'])]['params'][ + char.equipment['equipmentRank'] - 1 + ] for i in range(len(desc_params)): temp = math.floor(desc_params[i] * 1000) / 10 desc = desc.replace(f'#{i + 1}[i]%', f'{temp!s}%') @@ -467,7 +459,9 @@ async def draw_char_img(char_data: Dict, sr_uid: str, msg: str): (105, 105), Image.Resampling.LANCZOS ).convert('RGBA') relic_img.paste( - relic_piece_new_img, (200, 90), relic_piece_new_img + relic_piece_new_img, + (200, 90), + relic_piece_new_img, ) rarity_img = Image.open( TEXT_PATH / f'LightCore_Rarity' @@ -585,9 +579,7 @@ async def draw_char_img(char_data: Dict, sr_uid: str, msg: str): anchor='rm', ) - char_info.paste( - relic_img, RELIC_POS[str(relic['Type'])], relic_img - ) + char_info.paste(relic_img, RELIC_POS[str(relic['Type'])], relic_img) relic_score += single_relic_score if relic_score > 210: relic_value_level = Image.open(TEXT_PATH / 'CommonIconSSS.png') @@ -659,9 +651,7 @@ async def draw_char_img(char_data: Dict, sr_uid: str, msg: str): damage_img = Image.open(TEXT_PATH / 'attack_1.png') else: damage_img = Image.open(TEXT_PATH / 'attack_2.png') - char_info.paste( - damage_img, (0, 2028 + damage_num * 48), damage_img - ) + char_info.paste(damage_img, (0, 2028 + damage_num * 48), damage_img) char_img_draw.text( (55, 2048 + damage_num * 48), f'{damage_info["name"]}', @@ -669,8 +659,9 @@ async def draw_char_img(char_data: Dict, sr_uid: str, msg: str): sr_font_26, 'lm', ) - if len(damage_info['damagelist']) == 3: - damage1 = math.floor(damage_info['damagelist'][0]) # type: ignore + dmg_list = damage_info['damagelist'] + if len(dmg_list) == 3: + damage1 = math.floor(dmg_list[0]) # type: ignore char_img_draw.text( (370, 2048 + damage_num * 48), f'{damage1}', @@ -678,7 +669,7 @@ async def draw_char_img(char_data: Dict, sr_uid: str, msg: str): sr_font_26, 'lm', ) - damage2 = math.floor(damage_info['damagelist'][1]) # type: ignore + damage2 = math.floor(dmg_list[1]) # type: ignore char_img_draw.text( (560, 2048 + damage_num * 48), f'{damage2}', @@ -686,7 +677,7 @@ async def draw_char_img(char_data: Dict, sr_uid: str, msg: str): sr_font_26, 'lm', ) - damage3 = math.floor(damage_info['damagelist'][2]) # type: ignore + damage3 = math.floor(dmg_list[2]) # type: ignore char_img_draw.text( (750, 2048 + damage_num * 48), f'{damage3}', @@ -695,7 +686,7 @@ async def draw_char_img(char_data: Dict, sr_uid: str, msg: str): 'lm', ) else: - damage = math.floor(damage_info['damagelist'][0]) # type: ignore + damage = math.floor(dmg_list[0]) # type: ignore char_img_draw.text( (560, 2048 + damage_num * 48), f'{damage}', @@ -759,9 +750,7 @@ async def get_char_data( elif enable_self and char_self_path.exists(): path = char_self_path else: - char_id_list, _ = await api_to_dict( - uid, save_path=PLAYER_PATH - ) + char_id_list, _ = await api_to_dict(uid, save_path=PLAYER_PATH) charname_list = [] if isinstance(char_id_list, str): return char_id_list @@ -817,9 +806,7 @@ async def get_relic_score( add_value = subValue * 0.3 * 0.5 * weight_dict['AttackDelta'] * 1.0 relic_score += add_value if subProperty == 'DefenceDelta': - add_value = ( - subValue * 0.3 * 0.5 * weight_dict['DefenceDelta'] * 1.0 - ) + add_value = subValue * 0.3 * 0.5 * weight_dict['DefenceDelta'] * 1.0 relic_score += add_value if subProperty == 'HPDelta': add_value = subValue * 0.158 * 0.5 * weight_dict['HPDelta'] * 1.0 @@ -828,9 +815,7 @@ async def get_relic_score( add_value = subValue * 1.5 * weight_dict['AttackAddedRatio'] * 100 relic_score += add_value if subProperty == 'DefenceAddedRatio': - add_value = ( - subValue * 1.19 * weight_dict['DefenceAddedRatio'] * 100 - ) + add_value = subValue * 1.19 * weight_dict['DefenceAddedRatio'] * 100 relic_score += add_value if subProperty == 'HPAddedRatio': add_value = subValue * 1.5 * weight_dict['HPAddedRatio'] * 100 @@ -839,18 +824,12 @@ async def get_relic_score( add_value = subValue * 2.53 * weight_dict['SpeedDelta'] relic_score += add_value if subProperty == 'BreakDamageAddedRatioBase': - add_value = ( - subValue * 1.0 * weight_dict['BreakDamageAddedRatioBase'] * 100 - ) + add_value = subValue * 1.0 * weight_dict['BreakDamageAddedRatioBase'] * 100 relic_score += add_value if subProperty == 'StatusProbabilityBase': - add_value = ( - subValue * 1.49 * weight_dict['StatusProbabilityBase'] * 100 - ) + add_value = subValue * 1.49 * weight_dict['StatusProbabilityBase'] * 100 relic_score += add_value if subProperty == 'StatusResistanceBase': - add_value = ( - subValue * 1.49 * weight_dict['StatusResistanceBase'] * 100 - ) + add_value = subValue * 1.49 * weight_dict['StatusResistanceBase'] * 100 relic_score += add_value return relic_score diff --git a/StarRailUID/starrailuid_charinfo/to_card.py b/StarRailUID/starrailuid_charinfo/to_card.py index ab96bc6..9bc5d84 100644 --- a/StarRailUID/starrailuid_charinfo/to_card.py +++ b/StarRailUID/starrailuid_charinfo/to_card.py @@ -3,17 +3,17 @@ from pathlib import Path from typing import Dict, List, Union from PIL import Image, ImageDraw -from starrail_damage_cal.map.SR_MAP_PATH import avatarId2Name from starrail_damage_cal.to_data import api_to_dict +from starrail_damage_cal.map.SR_MAP_PATH import avatarId2Name -from ..utils.fonts.first_world import fw_font_28 -from ..utils.fonts.starrail_fonts import sr_font_24, sr_font_30, sr_font_58 from ..utils.image.convert import convert_img +from ..utils.fonts.first_world import fw_font_28 from ..utils.map.name_covert import avatar_id_to_char_star +from ..utils.fonts.starrail_fonts import sr_font_24, sr_font_30, sr_font_58 from ..utils.resource.RESOURCE_PATH import ( + PLAYER_PATH, CHAR_ICON_PATH, CHAR_PREVIEW_PATH, - PLAYER_PATH, ) half_color = (255, 255, 255, 120) @@ -53,7 +53,10 @@ async def draw_enka_card(uid: str, char_list: List, showfrom: int = 0): for char in char_list: avatarName = avatarId2Name[str(char)] char_data_list.append( - {'avatarName': avatarName, 'avatarId': str(char)} + { + 'avatarName': avatarName, + 'avatarId': str(char), + } ) if showfrom == 0: line1 = f'展柜内有 {len(char_data_list)} 个角色!' @@ -155,11 +158,11 @@ async def draw_enka_char(index: int, img: Image.Image, char_data: Dict): char_temp.paste(char_img, (8, 8), char_img) char_card.paste(char_temp, (0, 0), char_mask) if index <= 7: - x = ( - 60 + (index % 4) * 220 - if img.size[0] <= 1100 - else 160 + (index % 4) * 220 - ) + if img.size[0] <= 1100: + x = 60 + (index % 4) * 220 + else: + x = 160 + (index % 4) * 220 + img.paste( char_card, (x, 187 + (index // 4) * 220), diff --git a/StarRailUID/starrailuid_note/__init__.py b/StarRailUID/starrailuid_note/__init__.py index 545a1bf..865d20f 100644 --- a/StarRailUID/starrailuid_note/__init__.py +++ b/StarRailUID/starrailuid_note/__init__.py @@ -1,21 +1,21 @@ +from gsuid_core.sv import SV from gsuid_core.bot import Bot from gsuid_core.models import Event -from gsuid_core.sv import SV -from gsuid_core.utils.database.models import GsUser +from gsuid_core.utils.database.models import GsBind -from ..utils.convert import get_uid -from ..utils.error_reply import UID_HINT -from ..utils.sr_prefix import PREFIX -from .draw_note_card import draw_note_img from .note_text import award +from ..utils.convert import get_uid +from ..utils.sr_prefix import PREFIX +from ..utils.error_reply import UID_HINT +from .draw_note_card import draw_note_img -sv_get_monthly_data = SV('sr查询月历') +sv_get_monthly_data = SV("sr查询月历") # 群聊内 每月统计 功能 -@sv_get_monthly_data.on_fullmatch(f'{PREFIX}每月统计') +@sv_get_monthly_data.on_fullmatch(f"{PREFIX}每月统计") async def send_monthly_data(bot: Bot, ev: Event): - sr_uid = await GsUser.get_bind_sruid(ev.user_id) + sr_uid = await GsBind.get_uid_by_game(ev.user_id, ev.bot_id, "sr") if sr_uid is None: return UID_HINT await bot.send(await award(sr_uid)) @@ -23,10 +23,14 @@ async def send_monthly_data(bot: Bot, ev: Event): @sv_get_monthly_data.on_fullmatch( - (f'{PREFIX}开拓月历', f'{PREFIX}zj', f'{PREFIX}月历') + ( + f"{PREFIX}开拓月历", + f"{PREFIX}zj", + f"{PREFIX}月历", + ) ) async def send_monthly_pic(bot: Bot, ev: Event): - await bot.logger.info('开始执行[sr开拓月历]') + await bot.logger.info("开始执行[sr开拓月历]") sr_uid = await get_uid(bot, ev) if sr_uid is None: return UID_HINT diff --git a/StarRailUID/starrailuid_roleinfo/__init__.py b/StarRailUID/starrailuid_roleinfo/__init__.py index 6fc0229..b205d39 100644 --- a/StarRailUID/starrailuid_roleinfo/__init__.py +++ b/StarRailUID/starrailuid_roleinfo/__init__.py @@ -1,35 +1,37 @@ import re -from gsuid_core.bot import Bot -from gsuid_core.logger import logger -from gsuid_core.models import Event from gsuid_core.sv import SV +from gsuid_core.bot import Bot +from gsuid_core.models import Event +from gsuid_core.logger import logger from ..utils.convert import get_uid from ..utils.sr_prefix import PREFIX -from .draw_roleinfo_card import get_detail_img, get_role_img +from ..utils.error_reply import UID_HINT +from .draw_roleinfo_card import get_role_img, get_detail_img -sv_get_info = SV('sr查询信息') +sv_get_info = SV("sr查询信息") -@sv_get_info.on_command(f'{PREFIX}uid') +@sv_get_info.on_command(f"{PREFIX}uid") async def send_role_info(bot: Bot, ev: Event): - name = ''.join(re.findall('[\u4e00-\u9fa5]', ev.text)) + name = "".join(re.findall("[\u4e00-\u9fa5]", ev.text)) if name: return None uid = await get_uid(bot, ev) if uid is None: - return '你还没有绑定UID噢,请使用[sr绑定uid123]完成绑定!' + return "你还没有绑定UID噢,请使用[sr绑定uid123]完成绑定!" - logger.info(f'[sr查询信息]UID: {uid}') - await bot.logger.info('开始执行[sr查询信息]') + logger.info(f"[sr查询信息]UID: {uid}") + await bot.logger.info("开始执行[sr查询信息]") await bot.send(await get_role_img(uid)) return None -@sv_get_info.on_command(f'{PREFIX}练度统计') + +@sv_get_info.on_command(f"{PREFIX}练度统计") async def send_detail_info(bot: Bot, ev: Event): - name = ''.join(re.findall('[\u4e00-\u9fa5]', ev.text)) + name = "".join(re.findall("[\u4e00-\u9fa5]", ev.text)) if name: return None get_uid_ = await get_uid(bot, ev, True) @@ -39,7 +41,7 @@ async def send_detail_info(bot: Bot, ev: Event): if uid is None: return await bot.send(UID_HINT) - logger.info(f'[sr查询信息]UID: {uid}') - await bot.logger.info('开始执行[sr查询信息]') + logger.info(f"[sr查询信息]UID: {uid}") + await bot.logger.info("开始执行[sr查询信息]") await bot.send(await get_detail_img(user_id, uid)) return None diff --git a/StarRailUID/starrailuid_signin/__init__.py b/StarRailUID/starrailuid_signin/__init__.py index 466afc5..ea003d3 100644 --- a/StarRailUID/starrailuid_signin/__init__.py +++ b/StarRailUID/starrailuid_signin/__init__.py @@ -7,8 +7,8 @@ from gsuid_core.gss import gss from gsuid_core.models import Event from gsuid_core.aps import scheduler from gsuid_core.logger import logger +from gsuid_core.utils.database.models import GsBind -from ..utils.api import get_sqla from ..utils.sr_prefix import PREFIX from .sign import sign_in, daily_sign from ..utils.error_reply import UID_HINT @@ -31,8 +31,7 @@ async def sr_sign_at_night(): @sv_sign.on_fullmatch(f'{PREFIX}签到') async def get_sign_func(bot: Bot, ev: Event): await bot.logger.info(f'[SR签到]QQ号: {ev.user_id}') - sqla = get_sqla(ev.bot_id) - sr_uid = await sqla.get_bind_sruid(ev.user_id) + sr_uid = await GsBind.get_uid_by_game(ev.user_id, ev.bot_id, 'sr') if sr_uid is None: return await bot.send(UID_HINT) await bot.logger.info(f'[SR签到]UID: {sr_uid}') @@ -65,9 +64,7 @@ async def send_daily_sign(): single['msg'], 'direct', qid, single['bot_id'], '', '' ) except Exception as e: - logger.warning( - f'[SR每日全部签到] QQ {qid} 私聊推送失败!错误信息:{e}' - ) + logger.warning(f'[SR每日全部签到] QQ {qid} 私聊推送失败!错误信息:{e}') await asyncio.sleep(0.5) logger.info('[SR每日全部签到]私聊推送完成') @@ -76,9 +73,7 @@ async def send_daily_sign(): # 根据succee数判断是否为简洁推送 if group_msg_list[gid]['success'] >= 0: report = ( - '以下为签到失败报告:{}'.format( - group_msg_list[gid]['push_message'] - ) + '以下为签到失败报告:{}'.format(group_msg_list[gid]['push_message']) if group_msg_list[gid]['push_message'] != '' else '' ) diff --git a/StarRailUID/starrailuid_signin/sign.py b/StarRailUID/starrailuid_signin/sign.py index 16c92b4..246a9b6 100644 --- a/StarRailUID/starrailuid_signin/sign.py +++ b/StarRailUID/starrailuid_signin/sign.py @@ -4,9 +4,9 @@ from copy import deepcopy from gsuid_core.gss import gss from gsuid_core.logger import logger +from gsuid_core.utils.database.models import GsUser from gsuid_core.utils.plugins_config.gs_config import core_plugins_config -from ..utils.api import get_sqla from ..utils.mys_api import mys_api from ..starrailuid_config.sr_config import srconfig @@ -49,23 +49,17 @@ async def sign_in(sr_uid: str) -> str: if core_plugins_config.get_config('CaptchaPass').data: gt = sign_data.gt ch = sign_data.challenge - vl, ch = await mys_api._pass( # noqa: SLF001 - gt, ch, Header - ) + vl, ch = await mys_api._pass(gt, ch, Header) if vl: delay = 1 Header['x-rpc-challenge'] = ch Header['x-rpc-validate'] = vl Header['x-rpc-seccode'] = f'{vl}|jordan' - logger.info( - f'[SR签到] {sr_uid} 已获取验证码, 等待时间{delay}秒' - ) + logger.info(f'[SR签到] {sr_uid} 已获取验证码, 等待时间{delay}秒') await asyncio.sleep(delay) else: delay = 605 + random.randint(1, 120) - logger.info( - f'[SR签到] {sr_uid} 未获取验证码,等待{delay}秒后重试...' - ) + logger.info(f'[SR签到] {sr_uid} 未获取验证码,等待{delay}秒后重试...') await asyncio.sleep(delay) continue logger.info('配置文件暂未开启[跳过无感验证],结束本次任务...') @@ -74,9 +68,7 @@ async def sign_in(sr_uid: str) -> str: if index == 0: logger.info(f'[SR签到] {sr_uid} 该用户无校验码!') else: - logger.info( - f'[SR签到] [无感验证] {sr_uid} 该用户重试 {index} 次验证成功!' - ) + logger.info(f'[SR签到] [无感验证] {sr_uid} 该用户重试 {index} 次验证成功!') break if (int(str(sr_uid)[0]) > 5) and (sign_data.code == 'ok'): # 国际服签到无risk_code字段 @@ -109,9 +101,7 @@ async def sign_in(sr_uid: str) -> str: sign_missed -= 1 sign_missed = sign_info.sign_cnt_missed or sign_missed im = f'{mes_im}!\n{get_im}\n本月漏签次数:{sign_missed}' - logger.info( - f'[SR签到] {sr_uid} 签到完成, 结果: {mes_im}, 漏签次数: {sign_missed}' - ) + logger.info(f'[SR签到] {sr_uid} 签到完成, 结果: {mes_im}, 漏签次数: {sign_missed}') return im @@ -121,7 +111,11 @@ async def single_daily_sign(bot_id: str, sr_uid: str, gid: str, qid: str): if qid not in private_msg_list: private_msg_list[qid] = [] private_msg_list[qid].append( - {'bot_id': bot_id, 'uid': sr_uid, 'msg': im} + { + 'bot_id': bot_id, + 'uid': sr_uid, + 'msg': im, + } ) else: # 向群消息推送列表添加这个群 @@ -153,8 +147,7 @@ async def daily_sign(): global already tasks = [] for bot_id in gss.active_bot: - sqla = get_sqla(bot_id) - user_list = await sqla.get_all_user() + user_list = await GsUser.get_all_user() for user in user_list: if user.sign_switch != 'off' and user.sr_uid is not None: tasks.append( @@ -171,9 +164,7 @@ async def daily_sign(): delay = 1 else: delay = 50 + random.randint(3, 45) - logger.info( - f'[SR签到] 已签到{len(tasks)}个用户, 等待{delay}秒进行下一次签到' - ) + logger.info(f'[SR签到] 已签到{len(tasks)}个用户, 等待{delay}秒进行下一次签到') tasks.clear() already = 0 await asyncio.sleep(delay) diff --git a/StarRailUID/starrailuid_stamina/draw_stamina_card.py b/StarRailUID/starrailuid_stamina/draw_stamina_card.py index 8c8202c..93e7ca2 100644 --- a/StarRailUID/starrailuid_stamina/draw_stamina_card.py +++ b/StarRailUID/starrailuid_stamina/draw_stamina_card.py @@ -6,8 +6,8 @@ from typing import Optional import aiohttp from PIL import Image, ImageDraw from gsuid_core.logger import logger +from gsuid_core.utils.database.models import GsBind, GsUser -from ..utils.api import get_sqla from ..utils.mys_api import mys_api from ..utils.image.convert import convert_img from ..sruid_utils.api.mys.models import Expedition @@ -106,15 +106,14 @@ async def _draw_task_img( async def get_stamina_img(bot_id: str, user_id: str): try: - sqla = get_sqla(bot_id) - uid_list = await sqla.get_bind_sruid_list(user_id) + uid_list = await GsBind.get_uid_list_by_game(user_id, bot_id, 'sr') logger.info(f'[每日信息]UID: {uid_list}') if uid_list is None: return '请先绑定一个UID再来查询哦~' # 进行校验UID是否绑定CK useable_uid_list = [] for uid in uid_list: - status = await sqla.get_user_cookie(uid) + status = await GsUser.get_user_cookie_by_uid(uid, 'sr') if status is not None: useable_uid_list.append(uid) logger.info(f'[每日信息]可用UID: {useable_uid_list}') @@ -208,7 +207,7 @@ async def draw_stamina_img(sr_uid: str) -> Image.Image: else: stamina_color = second_color stamina_recovery_time = await seconds2hours_zhcn( - daily_data.stamina_recover_time + daily_data.stamina_recover_time, ) img.paste(note_bg, (0, 0), note_bg) @@ -217,21 +216,13 @@ async def draw_stamina_img(sr_uid: str) -> Image.Image: # 派遣 task_task = [] for i in range(4): - char = ( - daily_data.expeditions[i] - if i < len(daily_data.expeditions) - else None - ) + char = daily_data.expeditions[i] if i < len(daily_data.expeditions) else None task_task.append(_draw_task_img(img, img_draw, i, char)) await asyncio.gather(*task_task) # 绘制树脂圆环 ring_pic = Image.open(TEXT_PATH / 'ring.apng') - percent = ( - round(stamina_percent * 89) - if round(stamina_percent * 89) <= 89 - else 89 - ) + percent = round(stamina_percent * 89) if round(stamina_percent * 89) <= 89 else 89 ring_pic.seek(percent) img.paste(ring_pic, (0, 5), ring_pic) @@ -245,7 +236,11 @@ async def draw_stamina_img(sr_uid: str) -> Image.Image: ) # 写Nickname img_draw.text( - (350, 139), nickname, font=sr_font_36, fill=white_color, anchor='mm' + (350, 139), + nickname, + font=sr_font_36, + fill=white_color, + anchor='mm', ) # 写开拓等级 img_draw.text( diff --git a/StarRailUID/starrailuid_stamina/notice.py b/StarRailUID/starrailuid_stamina/notice.py index 348d8c4..0888dcb 100644 --- a/StarRailUID/starrailuid_stamina/notice.py +++ b/StarRailUID/starrailuid_stamina/notice.py @@ -2,8 +2,8 @@ from typing import Dict from gsuid_core.gss import gss from gsuid_core.logger import logger +from gsuid_core.utils.database.models import GsPush, GsUser -from ..utils.api import get_sqla from ..utils.mys_api import mys_api from ..starrailuid_config.sr_config import srconfig from ..sruid_utils.api.mys.models import DailyNoteData @@ -19,15 +19,14 @@ NOTICE = { async def get_notice_list() -> Dict[str, Dict[str, Dict]]: msg_dict: Dict[str, Dict[str, Dict]] = {} for bot_id in gss.active_bot: - sqla = get_sqla(bot_id) - user_list = await sqla.get_all_push_user_list() + user_list = await GsUser.get_all_push_user_list() for user in user_list: if user.sr_uid is not None: raw_data = await mys_api.get_daily_data(user.sr_uid) if isinstance(raw_data, int): logger.error(f'[sr推送提醒]获取{user.sr_uid}的数据失败!') continue - push_data = await sqla.select_push_data(user.sr_uid) + push_data = await GsPush.select_data_by_uid(user.sr_uid, 'sr') msg_dict = await all_check( user.bot_id, raw_data, @@ -47,14 +46,13 @@ async def all_check( user_id: str, uid: str, ) -> Dict[str, Dict[str, Dict]]: - sqla = get_sqla(bot_id) for mode in NOTICE.keys(): # 检查条件 if push_data[f'{mode}_is_push'] == 'on': if srconfig.get_config('CrazyNotice').data: if not await check(mode, raw_data, push_data[f'{mode}_value']): - await sqla.update_push_data( - uid, bot_id, {f'{mode}_is_push': 'off'} + await GsPush.update_data_by_uid( + uid, bot_id, 'sr', **{f'{mode}_is_push': 'off'} ) continue # 准备推送 @@ -73,8 +71,8 @@ async def all_check( msg_dict[bot_id]['direct'][user_id] = NOTICE[mode] else: msg_dict[bot_id]['direct'][user_id] += NOTICE[mode] - await sqla.update_push_data( - uid, bot_id, {f'{mode}_is_push': 'on'} + await GsPush.update_data_by_uid( + uid, bot_id, 'sr', **{f'{mode}_is_push': 'on'} ) # 群号推送到群聊 else: @@ -87,8 +85,8 @@ async def all_check( msg_dict[bot_id]['group'][gid][user_id] = NOTICE[mode] else: msg_dict[bot_id]['group'][gid][user_id] += NOTICE[mode] - await sqla.update_push_data( - uid, bot_id, {f'{mode}_is_push': 'on'} + await GsPush.update_data_by_uid( + uid, bot_id, 'sr', **{f'{mode}_is_push': 'on'} ) return msg_dict diff --git a/StarRailUID/starrailuid_start/__init__.py b/StarRailUID/starrailuid_start/__init__.py index 0238f09..a0a5cb6 100644 --- a/StarRailUID/starrailuid_start/__init__.py +++ b/StarRailUID/starrailuid_start/__init__.py @@ -3,13 +3,11 @@ import threading from gsuid_core.logger import logger -from ..utils.api import get_sqla from ..starrailuid_resource import startup async def all_start(): try: - get_sqla('TEMP') await startup() except Exception as e: logger.exception(e) diff --git a/StarRailUID/starrailuid_user/__init__.py b/StarRailUID/starrailuid_user/__init__.py index 19f7cc3..fdeebda 100644 --- a/StarRailUID/starrailuid_user/__init__.py +++ b/StarRailUID/starrailuid_user/__init__.py @@ -3,16 +3,14 @@ from typing import List from gsuid_core.sv import SV from gsuid_core.bot import Bot from gsuid_core.models import Event +from gsuid_core.utils.database.models import GsBind -from ..utils.api import get_sqla from ..utils.sr_prefix import PREFIX from ..utils.message import send_diff_msg from .draw_user_card import get_user_card sv_user_config = SV('sr用户管理', pm=2) -sv_user_add = SV('sr用户添加') sv_user_info = SV('sr用户信息') -# sv_user_help = SV('sr绑定帮助') @sv_user_info.on_fullmatch(f'{PREFIX}绑定信息') @@ -39,13 +37,14 @@ async def send_link_uid_msg(bot: Bot, ev: Event): qid = ev.user_id await bot.logger.info(f'sr[绑定/解绑]UserID: {qid}') - sqla = get_sqla(ev.bot_id) sr_uid = ev.text.strip() if sr_uid and not sr_uid.isdigit(): return await bot.send('你输入了错误的格式!') if '绑定' in ev.command: - data = await sqla.insert_bind_data(qid, sr_uid=sr_uid) + data = await GsBind.insert_uid( + qid, ev.bot_id, sr_uid, ev.group_id, 9, game_name='sr' + ) return await send_diff_msg( bot, data, @@ -56,17 +55,18 @@ async def send_link_uid_msg(bot: Bot, ev: Event): -3: '你输入了错误的格式!', }, ) - if '切换' in ev.command: - data = await sqla.switch_uid(qid, uid=sr_uid) + elif '切换' in ev.command: + data = await GsBind.switch_uid_by_game(qid, ev.bot_id, sr_uid, 'sr') if isinstance(data, List): return await bot.send(f'切换SR_UID{sr_uid}成功!') return await bot.send(f'尚未绑定该SR_UID{sr_uid}') - data = await sqla.delete_bind_data(qid, sr_uid=sr_uid) - return await send_diff_msg( - bot, - data, - { - 0: f'删除SR_UID{sr_uid}成功!', - -1: f'该SR_UID{sr_uid}不在已绑定列表中!', - }, - ) + else: + data = await GsBind.delete_uid(qid, ev.bot_id, sr_uid, 'sr') + return await send_diff_msg( + bot, + data, + { + 0: f'删除SR_UID{sr_uid}成功!', + -1: f'该SR_UID{sr_uid}不在已绑定列表中!', + }, + ) diff --git a/StarRailUID/starrailuid_user/add_ck.py b/StarRailUID/starrailuid_user/add_ck.py deleted file mode 100644 index d1c8226..0000000 --- a/StarRailUID/starrailuid_user/add_ck.py +++ /dev/null @@ -1,242 +0,0 @@ -from pathlib import Path -from typing import Dict, List -from http.cookies import SimpleCookie - -from ..utils.api import get_sqla -from ..utils.mys_api import mys_api -from ..utils.error_reply import UID_HINT - -pic_path = Path(__file__).parent / 'pic' -id_list = [ - 'login_uid', - 'login_uid_v2', - 'account_mid_v2', - 'account_mid', - 'account_id', - 'stuid', - 'ltuid', - 'ltmid', - 'stmid', - 'stmid_v2', - 'ltmid_v2', - 'stuid_v2', - 'ltuid_v2', -] -sk_list = ['stoken', 'stoken_v2'] -ck_list = ['cookie_token', 'cookie_token_v2'] -lt_list = ['login_ticket', 'login_ticket_v2'] - - -async def get_ck_by_all_stoken(bot_id: str): - sqla = get_sqla(bot_id) - uid_list: List = await sqla.get_all_uid_list() - uid_dict = {} - for uid in uid_list: - user_data = await sqla.select_user_data(uid) - if user_data: - uid_dict[uid] = user_data.user_id - return await refresh_ck_by_uid_list(bot_id, uid_dict) - - -async def get_ck_by_stoken(bot_id: str, user_id: str): - sqla = get_sqla(bot_id) - uid_list = await sqla.get_bind_uid_list(user_id) - if uid_list is None: - return '请先绑定一个UID噢~' - uid_dict = {uid: user_id for uid in uid_list} - return await refresh_ck_by_uid_list(bot_id, uid_dict) - - -async def refresh_ck_by_uid_list(bot_id: str, uid_dict: Dict) -> str: - sqla = get_sqla(bot_id) - uid_num = len(uid_dict) - if uid_num == 0: - return '请先绑定一个UID噢~' - error_list = {} - skip_num = 0 - error_num = 0 - for uid in uid_dict: - stoken = await sqla.get_user_stoken(uid) - if stoken is None: - skip_num += 1 - error_num += 1 - continue - qid = uid_dict[uid] - try: - mes = await _deal_ck(bot_id, stoken, qid) - except TypeError: - error_list[uid] = 'SK或CK已过期!' - error_num += 1 - continue - ok_num = mes.count('成功') - if ok_num < 2: - error_list[uid] = '可能是SK已过期~' - error_num += 1 - continue - - s_im = f'执行完成~成功刷新CK{uid_num - error_num}个!跳过{skip_num}个!' - f_im = '\n'.join([f'UID{u}:{error_list[u]}' for u in error_list]) - return f'{s_im}\n{f_im}' if f_im else s_im - - -async def deal_ck(bot_id: str, mes: str, user_id: str, mode: str = 'PIC'): - im = await _deal_ck(bot_id, mes, user_id) - if mode == 'PIC': - im = await _deal_ck_to_pic(im) - return im - - -async def _deal_ck_to_pic(im: str) -> bytes: - ok_num = im.count('成功') - if ok_num < 1: - status_pic = pic_path / 'ck_no.png' - elif ok_num < 2: - status_pic = pic_path / 'ck_ok.png' - else: - status_pic = pic_path / 'all_ok.png' - with Path.open(status_pic, 'rb') as f: - return f.read() - - -async def get_account_id(simp_dict: SimpleCookie) -> str: - for _id in id_list: - if _id in simp_dict: - account_id = simp_dict[_id].value - break - else: - account_id = '' - return account_id - - -async def _deal_ck(bot_id: str, mes: str, user_id: str) -> str: - sqla = get_sqla(bot_id) - simp_dict = SimpleCookie(mes) - uid = await sqla.get_bind_uid(user_id) - sr_uid = await sqla.get_bind_sruid(user_id) - - if uid is None and sr_uid is None: - if uid is None: - return UID_HINT - if sr_uid is None: - return '请绑定星穹铁道UID...' - - im_list = [] - is_add_stoken = False - status = True - app_cookie, stoken = '', '' - account_id, cookie_token = '', '' - if status: - for sk in sk_list: - if sk in simp_dict: - account_id = await get_account_id(simp_dict) - if not account_id: - return '该CK字段出错, 缺少login_uid或stuid或ltuid字段!' - stoken = simp_dict[sk].value - if stoken.startswith('v2_'): - if 'mid' in simp_dict: - mid = simp_dict['mid'].value - app_cookie = ( - f'stuid={account_id};stoken={stoken};mid={mid}' - ) - else: - return 'v2类型SK必须携带mid...' - else: - app_cookie = f'stuid={account_id};stoken={stoken}' - cookie_token_data = await mys_api.get_cookie_token_by_stoken( - stoken, account_id, app_cookie - ) - if isinstance(cookie_token_data, Dict): - cookie_token = cookie_token_data['cookie_token'] - is_add_stoken = True - status = False - break - return '返回值错误...' - if status: - for lt in lt_list: - if lt in simp_dict: - # 寻找stoken - login_ticket = simp_dict[lt].value - account_id = await get_account_id(simp_dict) - if not account_id: - return '该CK字段出错, 缺少login_uid或stuid或ltuid字段!' - stoken_data = await mys_api.get_stoken_by_login_ticket( - login_ticket, account_id - ) - if isinstance(stoken_data, Dict): - stoken = stoken_data['list'][0]['token'] - app_cookie = f'stuid={account_id};stoken={stoken}' - cookie_token_data = ( - await mys_api.get_cookie_token_by_stoken( - stoken, account_id - ) - ) - if isinstance(cookie_token_data, Dict): - cookie_token = cookie_token_data['cookie_token'] - is_add_stoken = True - status = False - break - if status: - for ck in ck_list: - if ck in simp_dict: - # 寻找uid - account_id = await get_account_id(simp_dict) - if not account_id: - return '该CK字段出错, 缺少login_uid或stuid或ltuid字段!' - cookie_token = simp_dict[ck].value - status = False - break - if status: - return ( - '添加Cookies失败!Cookies中应该包含cookie_token或者login_ticket相关信息!' - '\n可以尝试退出米游社登陆重新登陆获取!' - ) - - account_cookie = f'account_id={account_id};cookie_token={cookie_token}' - - try: - if sr_uid or (uid and int(uid[0]) < 6): - mys_data = await mys_api.get_mihoyo_bbs_info( - account_id, account_cookie - ) - else: - mys_data = await mys_api.get_mihoyo_bbs_info( - account_id, account_cookie, True - ) - # 剔除除了原神之外的其他游戏 - if isinstance(mys_data, List): - for i in mys_data: - if i['game_id'] == 2: - uid = i['game_role_id'] - elif i['game_id'] == 6: - sr_uid = i['game_role_id'] - if uid and sr_uid: - break - else: - if not (uid or sr_uid): - return f'你的米游社账号{account_id}尚未绑定原神/星铁账号,\ - 请前往米游社操作!' - except Exception: - pass - - if not uid: - return f'你的米游社账号{account_id}尚未绑定原神/星铁账号,请前往米游社操作!' - - await sqla.refresh_cache(uid) - if is_add_stoken: - im_list.append(f'添加Stoken成功,stuid={account_id},stoken={stoken}') - await sqla.insert_user_data( - user_id, uid, sr_uid, account_cookie, app_cookie - ) - - im_list.append( - f'添加Cookies成功,account_id={account_id},cookie_token={cookie_token}' - ) - im_list.append( - 'Cookies和Stoken属于个人重要信息,如果你是在不知情的情况下添加,请马上修改米游社账户密码,保护个人隐私!' - ) - im_list.append( - '如果需要【sr开启自动签到】和【sr开启推送】还需要在【群聊中】使用命令“绑定uid”绑定你的uid。' - '\n例如:绑定uid123456789。' - ) - im_list.append('你可以使用命令【sr绑定信息】检查你的账号绑定情况!') - return '\n'.join(im_list) diff --git a/StarRailUID/starrailuid_user/draw_user_card.py b/StarRailUID/starrailuid_user/draw_user_card.py index c8091c3..4b96308 100644 --- a/StarRailUID/starrailuid_user/draw_user_card.py +++ b/StarRailUID/starrailuid_user/draw_user_card.py @@ -1,112 +1,2 @@ -# from pathlib import Path -from typing import Tuple, Optional - -from PIL import Image - -from ..utils.api import get_sqla - -# from ..utils.image.convert import convert_img -# from ..utils.colors import sec_color, first_color -# from ..utils.fonts.genshin_fonts import gs_font_15, gs_font_30, gs_font_36 -# from ..utils.image.image_tools import ( -# get_color_bg, -# get_qq_avatar, -# draw_pic_with_ring, -# ) - -# TEXT_PATH = Path(__file__).parent / 'texture2d' -# -# status_off = Image.open(TEXT_PATH / 'status_off.png') -# status_on = Image.open(TEXT_PATH / 'status_on.png') -# -# EN_MAP = {'coin': '宝钱', 'resin': '体力', 'go': '派遣', 'transform': '质变仪'} - - async def get_user_card(bot_id: str, user_id: str): - sqla = get_sqla(bot_id) - return await sqla.get_bind_uid_list(user_id) - # w, h = 750, len(uid_list) * 750 + 470 - # - # # 获取背景图片各项参数 - # _id = str(user_id) - # if _id.startswith('http'): - # char_pic = await get_qq_avatar(avatar_url=_id) - # else: - # char_pic = await get_qq_avatar(qid=_id) - # char_pic = await draw_pic_with_ring(char_pic, 290) - # - # img = await get_color_bg(w, h) - # title = Image.open(TEXT_PATH / 'user_title.png') - # title.paste(char_pic, (241, 40), char_pic) - # - # title_draw = ImageDraw.Draw(title) - # title_draw.text( - # (375, 444), f'{bot_id} - {user_id}', first_color, gs_font_30, 'mm' - # ) - # img.paste(title, (0, 0), title) - # - # for index, uid in enumerate(uid_list): - # user_card = Image.open(TEXT_PATH / 'user_bg.png') - # user_draw = ImageDraw.Draw(user_card) - # user_push_data = await sqla.select_push_data(uid) - # user_data = await sqla.select_user_data(uid) - # if user_data is None: - # user_data = GsUser( - # bot_id=bot_id, - # user_id=user_id, - # uid=uid, - # stoken=None, - # cookie=None, - # sign_switch='off', - # bbs_switch='off', - # push_switch='off', - # ) - # - # user_draw.text( - # (375, 62), - # f'UID {uid}', - # first_color, - # font=gs_font_36, - # anchor='mm', - # ) - # - # x, y = 331, 112 - # paste_switch(user_card, user_data.cookie, (241, 128)) - # paste_switch(user_card, user_data.stoken, (241 + x, 128)) - # paste_switch(user_card, user_data.sign_switch, (241, 128 + y)) - # paste_switch(user_card, user_data.bbs_switch, (241 + x, 128 + y)) - # paste_switch(user_card, user_data.push_switch, (241, 128 + 2 * y)) - # paste_switch(user_card, user_data.status, (241 + x, - # 128 + 2 * y), True) - # - # for _index, mode in enumerate(['coin', 'resin', 'go', 'transform']): - # paste_switch( - # user_card, - # getattr(user_push_data, f'{mode}_push'), - # (241 + _index % 2 * x, 128 + (_index // 2 + 3) * y), - # ) - # if getattr(user_push_data, f'{mode}_push') != 'off': - # user_draw.text( - # (268 + _index % 2 * x, 168 + (_index // 2 + 3) * y), - # f'{getattr(user_push_data, f"{mode}_value")}', - # sec_color, - # font=gs_font_15, - # anchor='lm', - # ) - # img.paste(user_card, (0, 500 + index * 690), user_card) - - # return await convert_img(img) - - -def paste_switch( - card: Image.Image, - status: Optional[str], - pos: Tuple[int, int], - is_status: bool = False, -): pass - # if is_status: - # pic = status_off if status else status_on - # else: - # pic = status_on if status != 'off' and status else status_off - # card.paste(pic, pos, pic) diff --git a/StarRailUID/starrailuid_user/get_ck_help_msg.py b/StarRailUID/starrailuid_user/get_ck_help_msg.py deleted file mode 100644 index 6794d12..0000000 --- a/StarRailUID/starrailuid_user/get_ck_help_msg.py +++ /dev/null @@ -1,43 +0,0 @@ -from typing import List - -from gsuid_core.models import Message - -CK_QRCODE_LOGIN = """先发送【绑定uidxxx】绑定UID, -然后发送【扫码登陆】, 使用米游社APP扫码完成绑定, [或者]选择以下方法 -""" - -CK_CONSOLE = """var cookie = document.cookie; -var Str_Num = cookie.indexOf('_MHYUUID='); -cookie = '添加 ' + cookie.substring(Str_Num); -var ask = confirm('Cookie:' + cookie + '\\n\\n按确认,然后粘贴发送给机器人'); -if (ask == true) { - copy(cookie); - msg = cookie -} else { - msg = 'Cancel' -} -""" - -CK_URL = """1.复制上面全部代码,然后打开下面的网站 -https://bbs.mihoyo.com/ys/obc/?bbs_presentation_style=no_header(国服) -https://www.hoyolab.com/home(国际服) -2.在页面上右键检查或者Ctrl+Shift+i -3.选择控制台(Console),粘贴,回车,在弹出的窗口点确认(点完自动复制) -4.然后在和机器人的私聊窗口,粘贴发送即可 -""" - -SK_URL = """如果想获取SK,操作方法和上面一致,网址更换为 -http://user.mihoyo.com/(国服) -登陆后,进入控制台粘贴代码 -然后在和机器人的私聊窗口,粘贴发送即可 -""" - - -async def get_ck_help() -> List[Message]: - msg_list = [] - msg_list.append(Message('text', '请先添加bot为好友')) - msg_list.append(Message('text', CK_QRCODE_LOGIN)) - msg_list.append(Message('text', CK_CONSOLE)) - msg_list.append(Message('text', CK_URL)) - msg_list.append(Message('text', SK_URL)) - return msg_list diff --git a/StarRailUID/starrailuid_user/qrlogin.py b/StarRailUID/starrailuid_user/qrlogin.py deleted file mode 100644 index 8f799b0..0000000 --- a/StarRailUID/starrailuid_user/qrlogin.py +++ /dev/null @@ -1,160 +0,0 @@ -import io -import json -import base64 -import asyncio -from http.cookies import SimpleCookie -from typing import Any, Dict, List, Tuple, Union, Literal - -import qrcode -from gsuid_core.bot import Bot -from gsuid_core.models import Event -from gsuid_core.logger import logger -from qrcode.constants import ERROR_CORRECT_L -from gsuid_core.segment import MessageSegment - -from ..utils.api import get_sqla -from ..utils.mys_api import mys_api - -disnote = """免责声明:您将通过扫码完成获取米游社sk以及ck。 -本Bot将不会保存您的登录状态。 -我方仅提供米游社查询及相关游戏内容服务 -若您的账号封禁、被盗等处罚与我方无关。 -害怕风险请勿扫码! -""" - - -def get_qrcode_base64(url): - qr = qrcode.QRCode( # type: ignore - version=1, - error_correction=ERROR_CORRECT_L, - box_size=10, - border=4, - ) - qr.add_data(url) - qr.make(fit=True) - img = qr.make_image(fill_color='black', back_color='white') - img_byte = io.BytesIO() - img.save(img_byte, format='PNG') # type: ignore - img_byte = img_byte.getvalue() - return base64.b64encode(img_byte).decode() - - -async def refresh( - code_data: Dict, -) -> Union[Tuple[Literal[False], None], Tuple[Literal[True], Any]]: - scanned = False - while True: - await asyncio.sleep(2) - status_data = await mys_api.check_qrcode( - code_data['app_id'], code_data['ticket'], code_data['device'] - ) - if isinstance(status_data, int): - logger.warning('二维码已过期') - return False, None - if status_data['stat'] == 'Scanned': - if not scanned: - logger.info('二维码已扫描') - scanned = True - continue - if status_data['stat'] == 'Confirmed': - logger.info('二维码已确认') - break - return True, json.loads(status_data['payload']['raw']) - - -async def qrcode_login(bot: Bot, ev: Event, user_id: str) -> str: - sqla = get_sqla(ev.bot_id) - - async def send_msg(msg: str): - await bot.send(msg) - return '' - - code_data = await mys_api.create_qrcode_url() - if isinstance(code_data, int): - return await send_msg('链接创建失败...') - try: - im = [] - im.append(MessageSegment.text('请使用米游社扫描下方二维码登录:')) - im.append( - MessageSegment.image( - f'base64://{get_qrcode_base64(code_data["url"])}' - ) - ) - im.append( - MessageSegment.text( - '免责声明:您将通过扫码完成获取米游社sk以及ck。\n' - '本Bot将不会保存您的登录状态。\n' - '我方仅提供米游社查询及相关游戏内容服务,\n' - '若您的账号封禁、被盗等处罚与我方无关。\n' - '害怕风险请勿扫码~' - ) - ) - await bot.send(MessageSegment.node(im)) - except Exception as e: - logger.error(e) - logger.warning(f'[扫码登录] {user_id} 图片发送失败') - status, game_token_data = await refresh(code_data) - if status: - assert game_token_data is not None # 骗过 pyright - logger.info('game_token获取成功') - cookie_token = await mys_api.get_cookie_token(**game_token_data) - stoken_data = await mys_api.get_stoken_by_game_token( - account_id=int(game_token_data['uid']), - game_token=game_token_data['token'], - ) - if isinstance(stoken_data, int): - return await send_msg('获取SK失败...') - account_id = game_token_data['uid'] - stoken = stoken_data['token']['token'] - mid = stoken_data['user_info']['mid'] - app_cookie = f'stuid={account_id};stoken={stoken};mid={mid}' - ck = await mys_api.get_cookie_token_by_stoken( - stoken, account_id, app_cookie - ) - if isinstance(ck, int): - return await send_msg('获取CK失败...') - ck = ck['cookie_token'] - cookie_check = f'account_id={account_id};cookie_token={ck}' - get_uid = await mys_api.get_mihoyo_bbs_info(account_id, cookie_check) - # 剔除除了原神之外的其他游戏 - im = None - if isinstance(get_uid, List): - for i in get_uid: - if i['game_id'] == 2: - uid_check = i['game_role_id'] - break - else: - im = f'你的米游社账号{account_id}尚未绑定原神账号,请前往米游社操作!' - return await send_msg(im) - else: - im = '请求失败, 请稍后再试...' - return await send_msg(im) - - uid_bind = await sqla.get_bind_uid(user_id) - # 没有在gsuid绑定uid的情况 - if not uid_bind: - logger.warning('game_token获取失败') - im = '你还没有绑定uid, \ - 请输入[绑定uid123456]绑定你的uid, 再发送[扫码登录]进行绑定' - return await send_msg(im) - if isinstance(cookie_token, int): - return await send_msg('获取CK失败...') - # 比对gsuid数据库和扫码登陆获取到的uid - if str(uid_bind) == uid_check or str(uid_bind) == account_id: - return SimpleCookie( - { - 'stoken_v2': stoken_data['token']['token'], - 'stuid': stoken_data['user_info']['aid'], - 'mid': stoken_data['user_info']['mid'], - 'cookie_token': cookie_token['cookie_token'], - } - ).output(header='', sep=';') - logger.warning('game_token获取失败') - im = ( - f'检测到扫码登录UID{uid_check}与绑定UID{uid_bind}不同, ' - 'gametoken获取失败, 请重新发送[扫码登录]进行登录!' - ) - else: - logger.warning('game_token获取失败') - im = 'game_token获取失败: 二维码已过期' - return await send_msg(im) diff --git a/StarRailUID/utils/api.py b/StarRailUID/utils/api.py index 4789a1c..4ef09e1 100644 --- a/StarRailUID/utils/api.py +++ b/StarRailUID/utils/api.py @@ -6,5 +6,5 @@ class SRDBSqla(DBSqla): super().__init__(is_sr=True) -srdbsqla = SRDBSqla() -get_sqla = srdbsqla.get_sqla +# srdbsqla = SRDBSqla() +# get_sqla = srdbsqla.get_sqla diff --git a/StarRailUID/utils/convert.py b/StarRailUID/utils/convert.py index 76f938c..1edefa3 100644 --- a/StarRailUID/utils/convert.py +++ b/StarRailUID/utils/convert.py @@ -3,8 +3,7 @@ from typing import Tuple, Union, Optional, overload from gsuid_core.bot import Bot from gsuid_core.models import Event - -from .api import get_sqla +from gsuid_core.utils.database.models import GsBind @overload @@ -24,18 +23,16 @@ async def get_uid( async def get_uid( bot: Bot, ev: Event, get_user_id: bool = False, only_uid: bool = False ) -> Union[Optional[str], Tuple[Optional[str], str]]: - uid_data = re.findall(r'\d{9}', ev.text) + uid_data = re.findall(r"\d{9}", ev.text) user_id = ev.at if ev.at else ev.user_id if uid_data: sr_uid: Optional[str] = uid_data[0] if sr_uid: - ev.text = ev.text.replace(sr_uid, '') + ev.text = ev.text.replace(sr_uid, "") else: - sqla = get_sqla(ev.bot_id) - sr_uid = await sqla.get_bind_sruid(user_id) + sr_uid = await GsBind.get_uid_by_game(ev.user_id, ev.bot_id, "sr") if only_uid: - sqla = get_sqla(ev.bot_id) - sr_uid = await sqla.get_bind_sruid(user_id) + sr_uid = await GsBind.get_uid_by_game(ev.user_id, ev.bot_id, "sr") if get_user_id: return sr_uid, user_id return sr_uid diff --git a/StarRailUID/utils/mys_api.py b/StarRailUID/utils/mys_api.py index aa17147..a3a3d10 100644 --- a/StarRailUID/utils/mys_api.py +++ b/StarRailUID/utils/mys_api.py @@ -1,39 +1,38 @@ import copy -import random import time -from string import ascii_letters, digits -from typing import Any, Dict, Literal, Optional, Union +import random +from string import digits, ascii_letters +from typing import Any, Dict, Union, Literal, Optional import msgspec - -# from gsuid_core.utils.api.mys.models import MysSign, SignList -from gsuid_core.utils.api.mys.tools import ( - _random_int_ds, - generate_os_ds, - get_web_ds_token, - mys_version, -) from gsuid_core.utils.api.mys_api import _MysApi from gsuid_core.utils.database.models import GsUser +# from gsuid_core.utils.api.mys.models import MysSign, SignList +from gsuid_core.utils.api.mys.tools import ( + mys_version, + _random_int_ds, + generate_os_ds, + get_web_ds_token, +) + from ..sruid_utils.api.mys.api import _API from ..sruid_utils.api.mys.models import ( - AbyssData, - AvatarDetail, - AvatarInfo, - DailyNoteData, - GachaLog, - MonthlyAward, MysSign, - RogueData, - RogueLocustData, - RoleBasicInfo, - RoleIndex, + GachaLog, SignInfo, SignList, + AbyssData, + RogueData, + RoleIndex, + AvatarInfo, + AvatarDetail, + MonthlyAward, + DailyNoteData, + RoleBasicInfo, WidgetStamina, + RogueLocustData, ) -from .api import srdbsqla RECOGNIZE_SERVER = { '1': 'prod_gf_cn', @@ -71,10 +70,7 @@ class MysApi(_MysApi): async def get_user_fp(self, uid: str) -> Optional[str]: data = await GsUser.get_user_attr_by_uid(uid, 'fp', 'sr') if data is None: - seed_id, seed_time = self.get_seed() - model_name = self.generate_model_name() - data = await self.generate_fp_by_uid(uid, seed_id, seed_time, model_name) - await GsUser.update_data_by_uid_without_bot_id(uid, 'sr', fp=data) + data = self.generate_random_fp() return data async def get_user_device_id(self, uid: str) -> Optional[str]: @@ -82,7 +78,9 @@ class MysApi(_MysApi): if data is None: data = self.get_device_id() await GsUser.update_data_by_uid_without_bot_id( - uid, 'sr', device_id=data + uid, + 'sr', + device_id=data, ) return data @@ -138,7 +136,8 @@ class MysApi(_MysApi): return data async def get_widget_stamina_data( - self, uid: str + self, + uid: str, ) -> Union[WidgetStamina, int]: header = copy.deepcopy(self._HEADER) sk = await self.get_stoken(uid) @@ -147,9 +146,7 @@ class MysApi(_MysApi): header['Cookie'] = sk header['x-rpc-channel'] = 'beta' device_id = await self.get_user_device_id(uid) - header['x-rpc-device_id'] = ( - '233333' if device_id is None else device_id - ) + header['x-rpc-device_id'] = '23' if device_id is None else device_id header['x-rpc-app_version'] = '2.53.0' header['x-rpc-device_model'] = 'Mi 10' fp = await self.get_user_fp(uid) @@ -161,7 +158,9 @@ class MysApi(_MysApi): header['x-rpc-sys_version'] = '12' header['User-Agent'] = 'okhttp/4.8.0' data = await self._mys_request( - _API['STAR_RAIL_WIDGRT_URL'], 'GET', header + _API['STAR_RAIL_WIDGRT_URL'], + 'GET', + header, ) if isinstance(data, Dict): data = msgspec.convert(data['data'], type=WidgetStamina) @@ -287,9 +286,7 @@ class MysApi(_MysApi): # data = cast(AvatarInfo, data['data']) return data - async def get_avatar_detail( - self, uid: str, avatarid: str - ): + async def get_avatar_detail(self, uid: str, avatarid: str): data = await self.simple_mys_req( 'STAR_RAIL_AVATAR_DETAIL_URL', uid, @@ -322,7 +319,9 @@ class MysApi(_MysApi): 'lang': 'zh-cn', } data = await self._mys_req_get( - 'STAR_RAIL_SIGN_LIST_URL', is_os, params + 'STAR_RAIL_SIGN_LIST_URL', + is_os, + params, ) if isinstance(data, Dict): data = msgspec.convert(data['data'], type=SignList) @@ -354,7 +353,10 @@ class MysApi(_MysApi): } header = self._HEADER data = await self._mys_req_get( - 'STAR_RAIL_SIGN_INFO_URL', is_os, params, header + 'STAR_RAIL_SIGN_INFO_URL', + is_os, + params, + header, ) if isinstance(data, Dict): data = msgspec.convert(data['data'], type=SignInfo) @@ -533,7 +535,8 @@ class MysApi(_MysApi): return data async def get_role_basic_info( - self, sr_uid: str + self, + sr_uid: str, ) -> Union[RoleBasicInfo, int]: data = await self.simple_mys_req( 'STAR_RAIL_ROLE_BASIC_INFO_URL', sr_uid, header=self._HEADER @@ -545,7 +548,6 @@ class MysApi(_MysApi): mys_api = MysApi() -mys_api.dbsqla = srdbsqla mys_api.MAPI.update(_API) mys_api.is_sr = True mys_api.RECOGNIZE_SERVER = RECOGNIZE_SERVER