mirror of
https://github.com/Genshin-bots/gsuid_core.git
synced 2025-05-12 06:55:49 +08:00
🎨 大幅调整关键函数,提供更多的utils
方法 (KimigaiiWuyi/GenshinUID#526)
This commit is contained in:
parent
955c073ce8
commit
cd85963e04
@ -15,6 +15,7 @@ from gsuid_core.logger import logger # noqa: E402
|
||||
from gsuid_core.config import core_config # noqa: E402
|
||||
from gsuid_core.handler import handle_event # noqa: E402
|
||||
from gsuid_core.models import MessageReceive # noqa: E402
|
||||
from gsuid_core.webconsole.mount_app import site # noqa: E402
|
||||
from gsuid_core.aps import start_scheduler, shutdown_scheduler # noqa: E402
|
||||
|
||||
app = FastAPI()
|
||||
@ -58,13 +59,6 @@ async def shutdown_event():
|
||||
|
||||
|
||||
def main():
|
||||
try:
|
||||
from gsuid_core.webconsole.mount_app import site
|
||||
from gsuid_core.webconsole.create_config_panel import (
|
||||
GsListStrConfig,
|
||||
gsconfig,
|
||||
)
|
||||
|
||||
@app.post('/genshinuid/setSV/{name}')
|
||||
@site.auth.requires('admin')
|
||||
async def _set_SV(request: Request, data: Dict, name: str):
|
||||
@ -82,6 +76,7 @@ def main():
|
||||
data['white_list'] = []
|
||||
sv.set(**data)
|
||||
|
||||
'''
|
||||
@app.post('/genshinuid/setGsConfig')
|
||||
@site.auth.requires('admin')
|
||||
async def _set_Config(request: Request, data: Dict):
|
||||
@ -94,10 +89,8 @@ def main():
|
||||
else:
|
||||
value = data[name]
|
||||
gsconfig.set_config(name, value)
|
||||
|
||||
'''
|
||||
site.mount_app(app)
|
||||
except ImportError:
|
||||
logger.warning('未加载GenshinUID...网页控制台启动失败...')
|
||||
|
||||
uvicorn.run(
|
||||
app,
|
||||
|
@ -1,6 +1,7 @@
|
||||
import sys
|
||||
import logging
|
||||
import datetime
|
||||
import traceback
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import loguru
|
||||
@ -36,6 +37,9 @@ class LoguruHandler(logging.Handler): # pragma: no cover
|
||||
|
||||
|
||||
def format_event(record):
|
||||
if record['exception']:
|
||||
return f'{traceback.print_tb(record["exception"].traceback)} \n'
|
||||
|
||||
if 'trigger' in record['extra']:
|
||||
_tg = record['extra']['trigger']
|
||||
message = (
|
||||
@ -109,5 +113,6 @@ logger.add(
|
||||
format=format_event,
|
||||
rotation=datetime.time(),
|
||||
level=LEVEL,
|
||||
diagnose=False,
|
||||
# diagnose=False,
|
||||
# backtrace=False,
|
||||
)
|
||||
|
@ -14,10 +14,8 @@ async def send_plugins_install(bot: Bot, ev: Event):
|
||||
return await bot.send('不存在该插件...可以使用[core刷新插件列表]获取最新列表!')
|
||||
|
||||
await bot.send('开始安装...请稍等一段时间...')
|
||||
if install_plugins(plugins_url):
|
||||
await bot.send('安装成功!使用[gs重启]以应用...')
|
||||
else:
|
||||
await bot.send('安装失败...请查看控制台!')
|
||||
im = install_plugins(plugins_url)
|
||||
await bot.send(im)
|
||||
|
||||
|
||||
@sv_core_install_plugins.on_prefix(('core刷新插件列表'))
|
||||
|
@ -43,11 +43,12 @@ async def get_plugins_url(name: str) -> Optional[str]:
|
||||
return None
|
||||
|
||||
|
||||
def install_plugins(url: str) -> bool:
|
||||
def install_plugins(url: str) -> str:
|
||||
plugin_name = url.split('/')[-1]
|
||||
git_path = f'{proxy_url}{url}.git'
|
||||
logger.info(f'稍等...开始安装插件, 地址: {git_path}')
|
||||
Repo.clone_from(
|
||||
git_path, PLUGINS_PATH / plugin_name, single_branch=True, depth=1
|
||||
)
|
||||
return True
|
||||
path = PLUGINS_PATH / plugin_name
|
||||
if path.exists():
|
||||
return '该插件已经安装过了!'
|
||||
Repo.clone_from(git_path, path, single_branch=True, depth=1)
|
||||
return f'插件{plugin_name}安装成功!发送[gs重启]以应用!'
|
||||
|
@ -57,7 +57,10 @@ class GsServer:
|
||||
elif plugin.suffix == '.py':
|
||||
importlib.import_module(f'plugins.{plugin.name[:-3]}')
|
||||
except Exception as e: # noqa
|
||||
logger.exception(e)
|
||||
exception = sys.exc_info()
|
||||
logger.opt(exception=exception).warning(
|
||||
'Warning encountered in function:'
|
||||
)
|
||||
logger.warning(f'插件{plugin.name}加载失败')
|
||||
|
||||
def load_dir_plugins(self, plugin: Path, nest: bool = False):
|
||||
|
67
gsuid_core/utils/api/mys_api.py
Normal file
67
gsuid_core/utils/api/mys_api.py
Normal file
@ -0,0 +1,67 @@
|
||||
from typing import Dict, Literal, Optional
|
||||
|
||||
from gsuid_core.utils.api.mys import MysApi
|
||||
from gsuid_core.utils.database.api import DBSqla
|
||||
from gsuid_core.utils.plugins_config.gs_config import core_plugins_config
|
||||
|
||||
get_sqla = DBSqla().get_sqla
|
||||
gsconfig = core_plugins_config
|
||||
|
||||
|
||||
class _MysApi(MysApi):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
async def _pass(self, gt: str, ch: str, header: Dict):
|
||||
# 警告:使用该服务(例如某RR等)需要注意风险问题
|
||||
# 本项目不以任何形式提供相关接口
|
||||
# 代码来源:GITHUB项目MIT开源
|
||||
_pass_api = gsconfig.get_config('_pass_API').data
|
||||
if _pass_api:
|
||||
data = await self._mys_request(
|
||||
url=f'{_pass_api}>={gt}&challenge={ch}',
|
||||
method='GET',
|
||||
header=header,
|
||||
)
|
||||
if isinstance(data, int):
|
||||
return None, None
|
||||
else:
|
||||
validate = data['data']['validate']
|
||||
ch = data['data']['challenge']
|
||||
else:
|
||||
validate = None
|
||||
|
||||
return validate, ch
|
||||
|
||||
async def _upass(self, header: Dict, is_bbs: bool = False):
|
||||
if is_bbs:
|
||||
raw_data = await self.get_bbs_upass_link(header)
|
||||
else:
|
||||
raw_data = await self.get_upass_link(header)
|
||||
if isinstance(raw_data, int):
|
||||
return False
|
||||
gt = raw_data['data']['gt']
|
||||
ch = raw_data['data']['challenge']
|
||||
|
||||
vl, ch = await self._pass(gt, ch, header)
|
||||
|
||||
if vl:
|
||||
await self.get_header_and_vl(header, ch, vl)
|
||||
else:
|
||||
return True
|
||||
|
||||
async def get_ck(
|
||||
self, uid: str, mode: Literal['OWNER', 'RANDOM'] = 'RANDOM'
|
||||
) -> Optional[str]:
|
||||
sqla = get_sqla('TEMP')
|
||||
if mode == 'RANDOM':
|
||||
return await sqla.get_random_cookie(uid)
|
||||
else:
|
||||
return await sqla.get_user_cookie(uid)
|
||||
|
||||
async def get_stoken(self, uid: str) -> Optional[str]:
|
||||
sqla = get_sqla('TEMP')
|
||||
return await sqla.get_user_stoken(uid)
|
||||
|
||||
|
||||
mys_api = _MysApi()
|
252
gsuid_core/utils/cookie_manager/add_ck.py
Normal file
252
gsuid_core/utils/cookie_manager/add_ck.py
Normal file
@ -0,0 +1,252 @@
|
||||
from pathlib import Path
|
||||
from typing import Dict, List
|
||||
from http.cookies import SimpleCookie
|
||||
|
||||
from gsuid_core.utils.api.mys_api import mys_api
|
||||
from gsuid_core.utils.database.api import DBSqla
|
||||
from gsuid_core.utils.error_reply import UID_HINT
|
||||
|
||||
pic_path = Path(__file__).parent / 'pic'
|
||||
id_list = [
|
||||
'login_uid',
|
||||
'login_uid_v2',
|
||||
'account_mid_v2',
|
||||
'account_mid',
|
||||
'account_id',
|
||||
'stuid',
|
||||
'ltuid',
|
||||
'ltmid',
|
||||
'stmid',
|
||||
'stmid_v2',
|
||||
'ltmid_v2',
|
||||
'stuid_v2',
|
||||
'ltuid_v2',
|
||||
]
|
||||
sk_list = ['stoken', 'stoken_v2']
|
||||
ck_list = ['cookie_token', 'cookie_token_v2']
|
||||
lt_list = ['login_ticket', 'login_ticket_v2']
|
||||
|
||||
get_sqla = DBSqla().get_sqla
|
||||
|
||||
|
||||
async def get_ck_by_all_stoken(bot_id: str):
|
||||
sqla = get_sqla(bot_id)
|
||||
uid_list: List = await sqla.get_all_uid_list()
|
||||
uid_dict = {}
|
||||
for uid in uid_list:
|
||||
user_data = await sqla.select_user_data(uid)
|
||||
if user_data:
|
||||
uid_dict[uid] = user_data.user_id
|
||||
im = await refresh_ck_by_uid_list(bot_id, uid_dict)
|
||||
return im
|
||||
|
||||
|
||||
async def get_ck_by_stoken(bot_id: str, user_id: str):
|
||||
sqla = get_sqla(bot_id)
|
||||
uid_list: List = await sqla.get_bind_uid_list(user_id)
|
||||
uid_dict = {uid: user_id for uid in uid_list}
|
||||
im = await refresh_ck_by_uid_list(bot_id, uid_dict)
|
||||
return im
|
||||
|
||||
|
||||
async def refresh_ck_by_uid_list(bot_id: str, uid_dict: Dict):
|
||||
sqla = get_sqla(bot_id)
|
||||
uid_num = len(uid_dict)
|
||||
if uid_num == 0:
|
||||
return '请先绑定一个UID噢~'
|
||||
error_list = {}
|
||||
skip_num = 0
|
||||
error_num = 0
|
||||
for uid in uid_dict:
|
||||
stoken = await sqla.get_user_stoken(uid)
|
||||
if stoken is None:
|
||||
skip_num += 1
|
||||
error_num += 1
|
||||
continue
|
||||
else:
|
||||
qid = uid_dict[uid]
|
||||
try:
|
||||
mes = await _deal_ck(bot_id, stoken, qid)
|
||||
except TypeError:
|
||||
error_list[uid] = 'SK或CK已过期!'
|
||||
error_num += 1
|
||||
continue
|
||||
ok_num = mes.count('成功')
|
||||
if ok_num < 2:
|
||||
error_list[uid] = '可能是SK已过期~'
|
||||
error_num += 1
|
||||
continue
|
||||
|
||||
s_im = f'执行完成~成功刷新CK{uid_num - error_num}个!跳过{skip_num}个!'
|
||||
f_im = '\n'.join([f'UID{u}:{error_list[u]}' for u in error_list])
|
||||
im = f'{s_im}\n{f_im}' if f_im else s_im
|
||||
|
||||
return im
|
||||
|
||||
|
||||
async def deal_ck(bot_id: str, mes: str, user_id: str, mode: str = 'PIC'):
|
||||
im = await _deal_ck(bot_id, mes, user_id)
|
||||
if mode == 'PIC':
|
||||
im = await _deal_ck_to_pic(im)
|
||||
return im
|
||||
|
||||
|
||||
async def _deal_ck_to_pic(im: str) -> bytes:
|
||||
ok_num = im.count('成功')
|
||||
if ok_num < 1:
|
||||
status_pic = pic_path / 'ck_no.png'
|
||||
elif ok_num < 2:
|
||||
status_pic = pic_path / 'ck_ok.png'
|
||||
else:
|
||||
status_pic = pic_path / 'all_ok.png'
|
||||
with open(status_pic, 'rb') as f:
|
||||
img = f.read()
|
||||
return img
|
||||
|
||||
|
||||
async def get_account_id(simp_dict: SimpleCookie) -> str:
|
||||
for _id in id_list:
|
||||
if _id in simp_dict:
|
||||
account_id = simp_dict[_id].value
|
||||
break
|
||||
else:
|
||||
account_id = ''
|
||||
return account_id
|
||||
|
||||
|
||||
async def _deal_ck(bot_id: str, mes: str, user_id: str) -> str:
|
||||
sqla = get_sqla(bot_id)
|
||||
simp_dict = SimpleCookie(mes)
|
||||
uid = await sqla.get_bind_uid(user_id)
|
||||
sr_uid = await sqla.get_bind_sruid(user_id)
|
||||
|
||||
if uid is None and sr_uid is None:
|
||||
if uid is None:
|
||||
return UID_HINT
|
||||
elif sr_uid is None:
|
||||
return '请绑定星穹铁道UID...'
|
||||
|
||||
im_list = []
|
||||
is_add_stoken = False
|
||||
status = True
|
||||
app_cookie, stoken = '', ''
|
||||
account_id, cookie_token = '', ''
|
||||
if status:
|
||||
for sk in sk_list:
|
||||
if sk in simp_dict:
|
||||
account_id = await get_account_id(simp_dict)
|
||||
if not account_id:
|
||||
return '该CK字段出错, 缺少login_uid或stuid或ltuid字段!'
|
||||
stoken = simp_dict[sk].value
|
||||
if stoken.startswith('v2_'):
|
||||
if 'mid' in simp_dict:
|
||||
mid = simp_dict['mid'].value
|
||||
app_cookie = (
|
||||
f'stuid={account_id};stoken={stoken};mid={mid}'
|
||||
)
|
||||
else:
|
||||
return 'v2类型SK必须携带mid...'
|
||||
else:
|
||||
app_cookie = f'stuid={account_id};stoken={stoken}'
|
||||
cookie_token_data = await mys_api.get_cookie_token_by_stoken(
|
||||
stoken, account_id, app_cookie
|
||||
)
|
||||
if isinstance(cookie_token_data, Dict):
|
||||
cookie_token = cookie_token_data['cookie_token']
|
||||
is_add_stoken = True
|
||||
status = False
|
||||
break
|
||||
else:
|
||||
return '返回值错误...'
|
||||
if status:
|
||||
for lt in lt_list:
|
||||
if lt in simp_dict:
|
||||
# 寻找stoken
|
||||
login_ticket = simp_dict[lt].value
|
||||
account_id = await get_account_id(simp_dict)
|
||||
if not account_id:
|
||||
return '该CK字段出错, 缺少login_uid或stuid或ltuid字段!'
|
||||
stoken_data = await mys_api.get_stoken_by_login_ticket(
|
||||
login_ticket, account_id
|
||||
)
|
||||
if isinstance(stoken_data, Dict):
|
||||
stoken = stoken_data['list'][0]['token']
|
||||
app_cookie = f'stuid={account_id};stoken={stoken}'
|
||||
cookie_token_data = (
|
||||
await mys_api.get_cookie_token_by_stoken(
|
||||
stoken, account_id
|
||||
)
|
||||
)
|
||||
if isinstance(cookie_token_data, Dict):
|
||||
cookie_token = cookie_token_data['cookie_token']
|
||||
is_add_stoken = True
|
||||
status = False
|
||||
break
|
||||
if status:
|
||||
for ck in ck_list:
|
||||
if ck in simp_dict:
|
||||
# 寻找uid
|
||||
account_id = await get_account_id(simp_dict)
|
||||
if not account_id:
|
||||
return '该CK字段出错, 缺少login_uid或stuid或ltuid字段!'
|
||||
cookie_token = simp_dict[ck].value
|
||||
status = False
|
||||
break
|
||||
if status:
|
||||
return (
|
||||
'添加Cookies失败!Cookies中应该包含cookie_token或者login_ticket相关信息!'
|
||||
'\n可以尝试退出米游社登陆重新登陆获取!'
|
||||
)
|
||||
|
||||
account_cookie = f'account_id={account_id};cookie_token={cookie_token}'
|
||||
|
||||
try:
|
||||
if sr_uid or (uid and int(uid[0]) < 6):
|
||||
mys_data = await mys_api.get_mihoyo_bbs_info(
|
||||
account_id, account_cookie
|
||||
)
|
||||
else:
|
||||
mys_data = await mys_api.get_mihoyo_bbs_info(
|
||||
account_id, account_cookie, True
|
||||
)
|
||||
# 剔除除了原神之外的其他游戏
|
||||
if isinstance(mys_data, List):
|
||||
for i in mys_data:
|
||||
if i['game_id'] == 2:
|
||||
uid = i['game_role_id']
|
||||
elif i['game_id'] == 6:
|
||||
sr_uid = i['game_role_id']
|
||||
if uid and sr_uid:
|
||||
break
|
||||
else:
|
||||
if not (uid or sr_uid):
|
||||
return f'你的米游社账号{account_id}尚未绑定原神/星铁账号,请前往米游社操作!'
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if uid:
|
||||
await sqla.refresh_cache(uid)
|
||||
if sr_uid:
|
||||
await sqla.refresh_cache(sr_uid)
|
||||
|
||||
if is_add_stoken:
|
||||
im_list.append(f'添加Stoken成功,stuid={account_id},stoken={stoken}')
|
||||
await sqla.insert_user_data(
|
||||
user_id, uid, sr_uid, account_cookie, app_cookie
|
||||
)
|
||||
|
||||
im_list.append(
|
||||
f'添加Cookies成功,account_id={account_id},cookie_token={cookie_token}'
|
||||
)
|
||||
im_list.append(
|
||||
'Cookies和Stoken属于个人重要信息,如果你是在不知情的情况下添加,请马上修改米游社账户密码,保护个人隐私!'
|
||||
)
|
||||
im_list.append(
|
||||
(
|
||||
'如果需要【gs开启自动签到】和【gs开启推送】还需要在【群聊中】使用命令“绑定uid”绑定你的uid。'
|
||||
'\n例如:绑定uid123456789。'
|
||||
)
|
||||
)
|
||||
im_list.append('你可以使用命令【绑定信息】检查你的账号绑定情况!')
|
||||
im = '\n'.join(im_list)
|
||||
return im
|
164
gsuid_core/utils/cookie_manager/qrlogin.py
Normal file
164
gsuid_core/utils/cookie_manager/qrlogin.py
Normal file
@ -0,0 +1,164 @@
|
||||
import io
|
||||
import json
|
||||
import base64
|
||||
import asyncio
|
||||
from http.cookies import SimpleCookie
|
||||
from typing import Any, List, Tuple, Union, Literal
|
||||
|
||||
import qrcode
|
||||
from qrcode.constants import ERROR_CORRECT_L
|
||||
|
||||
from gsuid_core.bot import Bot
|
||||
from gsuid_core.models import Event
|
||||
from gsuid_core.logger import logger
|
||||
from gsuid_core.segment import MessageSegment
|
||||
from gsuid_core.utils.api.mys_api import mys_api
|
||||
from gsuid_core.utils.database.api import DBSqla
|
||||
|
||||
disnote = '''免责声明:您将通过扫码完成获取米游社sk以及ck。
|
||||
本Bot将不会保存您的登录状态。
|
||||
我方仅提供米游社查询及相关游戏内容服务
|
||||
若您的账号封禁、被盗等处罚与我方无关。
|
||||
害怕风险请勿扫码!
|
||||
'''
|
||||
|
||||
get_sqla = DBSqla().get_sqla
|
||||
|
||||
|
||||
def get_qrcode_base64(url):
|
||||
qr = qrcode.QRCode(
|
||||
version=1,
|
||||
error_correction=ERROR_CORRECT_L,
|
||||
box_size=10,
|
||||
border=4,
|
||||
)
|
||||
qr.add_data(url)
|
||||
qr.make(fit=True)
|
||||
img = qr.make_image(fill_color='black', back_color='white')
|
||||
img_byte = io.BytesIO()
|
||||
img.save(img_byte, format='PNG') # type: ignore
|
||||
img_byte = img_byte.getvalue()
|
||||
return base64.b64encode(img_byte).decode()
|
||||
|
||||
|
||||
async def refresh(
|
||||
code_data: dict,
|
||||
) -> Union[Tuple[Literal[False], None], Tuple[Literal[True], Any]]:
|
||||
scanned = False
|
||||
while True:
|
||||
await asyncio.sleep(2)
|
||||
status_data = await mys_api.check_qrcode(
|
||||
code_data['app_id'], code_data['ticket'], code_data['device']
|
||||
)
|
||||
if isinstance(status_data, int):
|
||||
logger.warning('二维码已过期')
|
||||
return False, None
|
||||
if status_data['stat'] == 'Scanned':
|
||||
if not scanned:
|
||||
logger.info('二维码已扫描')
|
||||
scanned = True
|
||||
continue
|
||||
if status_data['stat'] == 'Confirmed':
|
||||
logger.info('二维码已确认')
|
||||
break
|
||||
return True, json.loads(status_data['payload']['raw'])
|
||||
|
||||
|
||||
async def qrcode_login(bot: Bot, ev: Event, user_id: str) -> str:
|
||||
sqla = get_sqla(ev.bot_id)
|
||||
|
||||
async def send_msg(msg: str):
|
||||
await bot.send(msg)
|
||||
return ''
|
||||
|
||||
code_data = await mys_api.create_qrcode_url()
|
||||
if isinstance(code_data, int):
|
||||
return await send_msg('链接创建失败...')
|
||||
try:
|
||||
im = []
|
||||
im.append(MessageSegment.text('请使用米游社扫描下方二维码登录:'))
|
||||
im.append(
|
||||
MessageSegment.image(
|
||||
f'base64://{get_qrcode_base64(code_data["url"])}'
|
||||
)
|
||||
)
|
||||
im.append(
|
||||
MessageSegment.text(
|
||||
'免责声明:您将通过扫码完成获取米游社sk以及ck。\n'
|
||||
'本Bot将不会保存您的登录状态。\n'
|
||||
'我方仅提供米游社查询及相关游戏内容服务,\n'
|
||||
'若您的账号封禁、被盗等处罚与我方无关。\n'
|
||||
'害怕风险请勿扫码~'
|
||||
)
|
||||
)
|
||||
await bot.send(MessageSegment.node(im))
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
logger.warning(f'[扫码登录] {user_id} 图片发送失败')
|
||||
status, game_token_data = await refresh(code_data)
|
||||
if status:
|
||||
assert game_token_data is not None # 骗过 pyright
|
||||
logger.info('game_token获取成功')
|
||||
cookie_token = await mys_api.get_cookie_token(**game_token_data)
|
||||
stoken_data = await mys_api.get_stoken_by_game_token(
|
||||
account_id=int(game_token_data['uid']),
|
||||
game_token=game_token_data['token'],
|
||||
)
|
||||
if isinstance(stoken_data, int):
|
||||
return await send_msg('获取SK失败...')
|
||||
account_id = game_token_data['uid']
|
||||
stoken = stoken_data['token']['token']
|
||||
mid = stoken_data['user_info']['mid']
|
||||
app_cookie = f'stuid={account_id};stoken={stoken};mid={mid}'
|
||||
ck = await mys_api.get_cookie_token_by_stoken(
|
||||
stoken, account_id, app_cookie
|
||||
)
|
||||
if isinstance(ck, int):
|
||||
return await send_msg('获取CK失败...')
|
||||
ck = ck['cookie_token']
|
||||
cookie_check = f'account_id={account_id};cookie_token={ck}'
|
||||
get_uid = await mys_api.get_mihoyo_bbs_info(account_id, cookie_check)
|
||||
# 剔除除了原神之外的其他游戏
|
||||
im = None
|
||||
if isinstance(get_uid, List):
|
||||
for i in get_uid:
|
||||
if i['game_id'] == 2:
|
||||
uid_check = i['game_role_id']
|
||||
break
|
||||
else:
|
||||
im = f'你的米游社账号{account_id}尚未绑定原神账号,请前往米游社操作!'
|
||||
return await send_msg(im)
|
||||
else:
|
||||
im = '请求失败, 请稍后再试...'
|
||||
return await send_msg(im)
|
||||
|
||||
uid_bind = await sqla.get_bind_uid(
|
||||
user_id
|
||||
) or await sqla.get_bind_sruid(user_id)
|
||||
# 没有在gsuid绑定uid的情况
|
||||
if not uid_bind:
|
||||
logger.warning('game_token获取失败')
|
||||
im = '你还没有绑定uid, 请输入[绑定uid123456]绑定你的uid, 再发送[扫码登录]进行绑定'
|
||||
return await send_msg(im)
|
||||
if isinstance(cookie_token, int):
|
||||
return await send_msg('获取CK失败...')
|
||||
# 比对gsuid数据库和扫码登陆获取到的uid
|
||||
if str(uid_bind) == uid_check or str(uid_bind) == account_id:
|
||||
return SimpleCookie(
|
||||
{
|
||||
'stoken_v2': stoken_data['token']['token'],
|
||||
'stuid': stoken_data['user_info']['aid'],
|
||||
'mid': stoken_data['user_info']['mid'],
|
||||
'cookie_token': cookie_token['cookie_token'],
|
||||
}
|
||||
).output(header='', sep=';')
|
||||
else:
|
||||
logger.warning('game_token获取失败')
|
||||
im = (
|
||||
f'检测到扫码登录UID{uid_check}与绑定UID{uid_bind}不同, '
|
||||
'gametoken获取失败, 请重新发送[扫码登录]进行登录!'
|
||||
)
|
||||
else:
|
||||
logger.warning('game_token获取失败')
|
||||
im = 'game_token获取失败: 二维码已过期'
|
||||
return await send_msg(im)
|
@ -1,3 +1,4 @@
|
||||
import asyncio
|
||||
from typing import Dict
|
||||
|
||||
from sqlalchemy import event
|
||||
@ -17,7 +18,9 @@ class DBSqla:
|
||||
self.is_sr = is_sr
|
||||
|
||||
def get_sqla(self, bot_id) -> SQLA:
|
||||
return self._get_sqla(bot_id, self.is_sr)
|
||||
sqla = self._get_sqla(bot_id, self.is_sr)
|
||||
asyncio.create_task(sqla.sr_adapter())
|
||||
return sqla
|
||||
|
||||
def _get_sqla(self, bot_id, is_sr: bool = False) -> SQLA:
|
||||
sqla_list = active_sr_sqla if is_sr else active_sqla
|
||||
|
48
gsuid_core/utils/error_reply.py
Normal file
48
gsuid_core/utils/error_reply.py
Normal file
@ -0,0 +1,48 @@
|
||||
from typing import Union
|
||||
|
||||
UID_HINT = '你还没有绑定过uid哦!\n请使用[绑定uid123456]命令绑定!'
|
||||
MYS_HINT = '你还没有绑定过mysid哦!\n请使用[绑定mys1234]命令绑定!'
|
||||
CK_HINT = """你还没有绑定过Cookie哦!发送【ck帮助】获取帮助!
|
||||
警告:绑定Cookie可能会带来未知的账号风险,请确保信任机器人管理员"""
|
||||
CHAR_HINT = '你还没有{}的缓存噢!\n请先使用【强制刷新】命令来缓存数据! \n或者使用【查询展柜角色】命令查看已缓存角色!'
|
||||
VERIFY_HINT = '''出现验证码!
|
||||
如已绑定CK: 请至米游社软件->我的->我的角色处解锁验证码
|
||||
(可使用[gs关闭推送]命令关闭体力推送以减少出现验证码风险)
|
||||
如未绑定CK: 可联系管理员使用[gs清除缓存]命令
|
||||
'''
|
||||
SK_HINT = '你还没有绑定过Stoken或者Stoken已失效~\n请加好友私聊Bot\n [扫码登陆] 或 [添加]后跟SK格式 以绑定SK'
|
||||
UPDATE_HINT = '''更新失败!更多错误信息请查看控制台...
|
||||
>> 可以尝试使用
|
||||
>> [gs强制更新](危险)
|
||||
>> [gs强行强制更新](超级危险)!'''
|
||||
|
||||
|
||||
def get_error(retcode: Union[int, str]) -> str:
|
||||
if retcode == -51:
|
||||
return CK_HINT
|
||||
elif retcode == -100:
|
||||
return '您的cookie已经失效, 请重新获取!'
|
||||
elif retcode == 10001:
|
||||
return '您的cookie已经失效, 请重新获取!'
|
||||
elif retcode == 10101:
|
||||
return '当前查询CK已超过每日30次上限!'
|
||||
elif retcode == 10102:
|
||||
return '当前查询id已经设置了隐私, 无法查询!'
|
||||
elif retcode == 1034:
|
||||
return VERIFY_HINT
|
||||
elif retcode == -10001:
|
||||
return '请求体出错, 请检查具体实现代码...'
|
||||
elif retcode == 10104:
|
||||
return CK_HINT
|
||||
elif retcode == -512009:
|
||||
return '[留影叙佳期]已经获取过该内容~!'
|
||||
elif retcode == -201:
|
||||
return '你的账号可能已被封禁, 请联系米游社客服...'
|
||||
elif retcode == -501101:
|
||||
return '当前角色冒险等阶未达到10级, 暂时无法参加此活动...'
|
||||
elif retcode == 400:
|
||||
return '[MINIGG]暂未找到此内容...'
|
||||
elif retcode == -400:
|
||||
return '请输入更详细的名称...'
|
||||
else:
|
||||
return f'API报错, 错误码为{retcode}!'
|
23
gsuid_core/utils/plugins_config/config_default.py
Normal file
23
gsuid_core/utils/plugins_config/config_default.py
Normal file
@ -0,0 +1,23 @@
|
||||
from typing import Dict
|
||||
|
||||
from .models import GSC, GsStrConfig, GsBoolConfig
|
||||
|
||||
CONIFG_DEFAULT: Dict[str, GSC] = {
|
||||
'proxy': GsStrConfig('设置代理', '设置国际服的代理地址', ''),
|
||||
'_pass_API': GsStrConfig('神奇API', '设置某种神奇的API', ''),
|
||||
'restart_command': GsStrConfig(
|
||||
'重启命令',
|
||||
'自定义使用gs重启时触发的控制台命令(看不懂勿改)',
|
||||
'poetry run python',
|
||||
),
|
||||
'CaptchaPass': GsBoolConfig(
|
||||
'失效项',
|
||||
'该选项已经无效且可能有一定危险性...',
|
||||
False,
|
||||
),
|
||||
'MysPass': GsBoolConfig(
|
||||
'无效项',
|
||||
'该选项已经无效且可能有一定危险性...',
|
||||
False,
|
||||
),
|
||||
}
|
115
gsuid_core/utils/plugins_config/gs_config.py
Normal file
115
gsuid_core/utils/plugins_config/gs_config.py
Normal file
@ -0,0 +1,115 @@
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Union
|
||||
|
||||
from msgspec import json as msgjson
|
||||
|
||||
from gsuid_core.logger import logger
|
||||
from gsuid_core.data_store import get_res_path
|
||||
|
||||
from .models import GSC, GsBoolConfig
|
||||
from .config_default import CONIFG_DEFAULT
|
||||
|
||||
|
||||
class StringConfig:
|
||||
def __new__(cls, *args, **kwargs):
|
||||
# 判断sv是否已经被初始化
|
||||
if len(args) >= 1:
|
||||
name = args[0]
|
||||
else:
|
||||
name = kwargs.get('config_name')
|
||||
|
||||
if name is None:
|
||||
raise ValueError('Config.name is None!')
|
||||
|
||||
if name in all_config_list:
|
||||
return all_config_list[name]
|
||||
else:
|
||||
_config = super().__new__(cls)
|
||||
all_config_list[name] = _config
|
||||
return _config
|
||||
|
||||
def __init__(
|
||||
self, config_name: str, CONFIG_PATH: Path, config_list: Dict[str, GSC]
|
||||
) -> None:
|
||||
self.config_list = config_list
|
||||
|
||||
if not CONFIG_PATH.exists():
|
||||
with open(CONFIG_PATH, 'wb') as file:
|
||||
file.write(msgjson.encode(config_list))
|
||||
|
||||
self.config_name = config_name
|
||||
self.CONFIG_PATH = CONFIG_PATH
|
||||
self.config: Dict[str, GSC] = {}
|
||||
self.update_config()
|
||||
|
||||
def __len__(self):
|
||||
return len(self.config)
|
||||
|
||||
def __iter__(self):
|
||||
return iter(self.config)
|
||||
|
||||
def __getitem__(self, key) -> GSC:
|
||||
return self.config[key]
|
||||
|
||||
def write_config(self):
|
||||
with open(self.CONFIG_PATH, 'wb') as file:
|
||||
file.write(msgjson.format(msgjson.encode(self.config), indent=4))
|
||||
|
||||
def update_config(self):
|
||||
# 打开config.json
|
||||
with open(self.CONFIG_PATH, 'r', encoding='UTF-8') as f:
|
||||
self.config: Dict[str, GSC] = msgjson.decode(
|
||||
f.read(),
|
||||
type=Dict[str, GSC],
|
||||
)
|
||||
# 对没有的值,添加默认值
|
||||
for key in self.config_list:
|
||||
if key not in self.config:
|
||||
self.config[key] = self.config_list[key]
|
||||
|
||||
# 对默认值没有的值,直接删除
|
||||
delete_keys = []
|
||||
for key in self.config:
|
||||
if key not in self.config_list:
|
||||
delete_keys.append(key)
|
||||
for key in delete_keys:
|
||||
self.config.pop(key)
|
||||
|
||||
# 重新写回
|
||||
self.write_config()
|
||||
|
||||
def get_config(self, key: str) -> GSC:
|
||||
if key in self.config:
|
||||
return self.config[key]
|
||||
elif key in self.config_list:
|
||||
logger.info(f'[配置] 配置项 {key} 不存在, 但是默认配置存在, 已更新...')
|
||||
self.update_config()
|
||||
return self.config[key]
|
||||
else:
|
||||
logger.warning(f'[配置] 配置项 {key} 不存在也没有配置, 返回默认参数...')
|
||||
return GsBoolConfig('缺省值', '获取错误的配置项', False)
|
||||
|
||||
def set_config(
|
||||
self, key: str, value: Union[str, List, bool, Dict]
|
||||
) -> bool:
|
||||
if key in self.config_list:
|
||||
temp = self.config[key]
|
||||
if type(value) == type(temp):
|
||||
temp.data = value # type:ignore
|
||||
# 设置值
|
||||
self.config[key] = temp
|
||||
# 重新写回
|
||||
self.write_config()
|
||||
return True
|
||||
else:
|
||||
logger.warning(f'[配置] 配置项 {key} 写入类型不正确, 停止写入...')
|
||||
return False
|
||||
else:
|
||||
return False
|
||||
|
||||
|
||||
all_config_list: Dict[str, StringConfig] = {}
|
||||
|
||||
core_plugins_config = StringConfig(
|
||||
'core', get_res_path() / 'core_config.json', CONIFG_DEFAULT
|
||||
)
|
33
gsuid_core/utils/plugins_config/models.py
Normal file
33
gsuid_core/utils/plugins_config/models.py
Normal file
@ -0,0 +1,33 @@
|
||||
from typing import Dict, List, Union
|
||||
|
||||
import msgspec
|
||||
|
||||
|
||||
class GsConfig(msgspec.Struct, tag=True):
|
||||
title: str
|
||||
desc: str
|
||||
|
||||
|
||||
class GsStrConfig(GsConfig, tag=True):
|
||||
data: str
|
||||
|
||||
|
||||
class GsBoolConfig(GsConfig, tag=True):
|
||||
data: bool
|
||||
|
||||
|
||||
class GsDictConfig(GsConfig, tag=True):
|
||||
data: Dict[str, List]
|
||||
|
||||
|
||||
class GsListStrConfig(GsConfig, tag=True):
|
||||
data: List[str]
|
||||
|
||||
|
||||
class GsListConfig(GsConfig, tag=True):
|
||||
data: List[int]
|
||||
|
||||
|
||||
GSC = Union[
|
||||
GsDictConfig, GsBoolConfig, GsListConfig, GsListStrConfig, GsStrConfig
|
||||
]
|
@ -1,18 +1,14 @@
|
||||
from gsuid_core.plugins.GenshinUID.GenshinUID.genshinuid_config import (
|
||||
gs_config,
|
||||
from gsuid_core.utils.plugins_config.gs_config import all_config_list
|
||||
from gsuid_core.utils.plugins_config.models import (
|
||||
GsStrConfig,
|
||||
GsBoolConfig,
|
||||
GsListStrConfig,
|
||||
)
|
||||
from gsuid_core.webconsole.create_base_panel import (
|
||||
get_text_panel,
|
||||
get_switch_panel,
|
||||
get_container_panel,
|
||||
)
|
||||
from gsuid_core.plugins.GenshinUID.GenshinUID.genshinuid_config.models import (
|
||||
GsStrConfig,
|
||||
GsBoolConfig,
|
||||
GsListStrConfig,
|
||||
)
|
||||
|
||||
gsconfig = gs_config.gsconfig
|
||||
|
||||
|
||||
def get_config_page():
|
||||
@ -57,8 +53,10 @@ def get_config_page():
|
||||
}
|
||||
body = []
|
||||
solo_body = []
|
||||
for config in gsconfig:
|
||||
gsc = gsconfig[config]
|
||||
for config_name in all_config_list:
|
||||
_config = all_config_list[config_name]
|
||||
for config in _config:
|
||||
gsc = _config[config]
|
||||
if isinstance(gsc, GsStrConfig):
|
||||
solo_body.append(get_text_panel(gsc.title, config, gsc.data))
|
||||
elif isinstance(gsc, GsBoolConfig):
|
||||
|
@ -1,6 +1,6 @@
|
||||
import fastapi_amis_admin
|
||||
|
||||
from gsuid_core.plugins.GenshinUID.GenshinUID.version import GenshinUID_version
|
||||
from gsuid_core.version import __version__ as GenshinUID_version
|
||||
|
||||
login_html = '''
|
||||
<p align='center'>
|
||||
|
@ -36,16 +36,15 @@ from fastapi_amis_admin.amis.components import (
|
||||
ButtonToolbar,
|
||||
)
|
||||
|
||||
from gsuid_core.logger import logger
|
||||
from gsuid_core.utils.database.api import db_url
|
||||
from gsuid_core.webconsole.models import WebUser
|
||||
from gsuid_core.utils.cookie_manager.add_ck import _deal_ck
|
||||
from gsuid_core.webconsole.html import gsuid_webconsole_help
|
||||
from gsuid_core.webconsole.create_sv_panel import get_sv_page
|
||||
from gsuid_core.version import __version__ as GenshinUID_version
|
||||
from gsuid_core.webconsole.create_config_panel import get_config_page
|
||||
from gsuid_core.plugins.GenshinUID.GenshinUID.utils.database import db_url
|
||||
from gsuid_core.utils.database.models import GsBind, GsPush, GsUser, GsCache
|
||||
from gsuid_core.plugins.GenshinUID.GenshinUID.version import GenshinUID_version
|
||||
from gsuid_core.plugins.GenshinUID.GenshinUID.genshinuid_user.add_ck import (
|
||||
_deal_ck,
|
||||
)
|
||||
from gsuid_core.webconsole.login_page import ( # noqa # 不要删
|
||||
AuthRouter,
|
||||
amis_admin,
|
||||
@ -260,8 +259,9 @@ class UserBindFormAdmin(admin.FormAdmin):
|
||||
) -> BaseApiOut[Any]:
|
||||
try:
|
||||
im = await _deal_ck(data.bot_id, data.cookie, data.user_id)
|
||||
except Exception:
|
||||
return BaseApiOut(status=-1, msg='你输入的CK可能已经失效,请按照[入门使用]进行操作!')
|
||||
except Exception as e:
|
||||
logger.warning(e)
|
||||
return BaseApiOut(status=-1, msg='你输入的CK可能已经失效/或者该用户ID未绑定UID')
|
||||
ok_num = im.count('成功')
|
||||
if ok_num < 1:
|
||||
return BaseApiOut(status=-1, msg=im)
|
||||
|
14
poetry.lock
generated
14
poetry.lock
generated
@ -2375,14 +2375,14 @@ reference = "mirrors"
|
||||
|
||||
[[package]]
|
||||
name = "uvicorn"
|
||||
version = "0.21.1"
|
||||
version = "0.22.0"
|
||||
description = "The lightning-fast ASGI server."
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = ">=3.7"
|
||||
files = [
|
||||
{file = "uvicorn-0.21.1-py3-none-any.whl", hash = "sha256:e47cac98a6da10cd41e6fd036d472c6f58ede6c5dbee3dbee3ef7a100ed97742"},
|
||||
{file = "uvicorn-0.21.1.tar.gz", hash = "sha256:0fac9cb342ba099e0d582966005f3fdba5b0290579fed4a6266dc702ca7bb032"},
|
||||
{file = "uvicorn-0.22.0-py3-none-any.whl", hash = "sha256:e9434d3bbf05f310e762147f769c9f21235ee118ba2d2bf1155a7196448bd996"},
|
||||
{file = "uvicorn-0.22.0.tar.gz", hash = "sha256:79277ae03db57ce7d9aa0567830bbb51d7a612f54d6e1e3e92da3ef24c2c8ed8"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
@ -2399,14 +2399,14 @@ reference = "mirrors"
|
||||
|
||||
[[package]]
|
||||
name = "virtualenv"
|
||||
version = "20.22.0"
|
||||
version = "20.23.0"
|
||||
description = "Virtual Python Environment builder"
|
||||
category = "dev"
|
||||
optional = false
|
||||
python-versions = ">=3.7"
|
||||
files = [
|
||||
{file = "virtualenv-20.22.0-py3-none-any.whl", hash = "sha256:48fd3b907b5149c5aab7c23d9790bea4cac6bc6b150af8635febc4cfeab1275a"},
|
||||
{file = "virtualenv-20.22.0.tar.gz", hash = "sha256:278753c47aaef1a0f14e6db8a4c5e1e040e90aea654d0fc1dc7e0d8a42616cc3"},
|
||||
{file = "virtualenv-20.23.0-py3-none-any.whl", hash = "sha256:6abec7670e5802a528357fdc75b26b9f57d5d92f29c5462ba0fbe45feacc685e"},
|
||||
{file = "virtualenv-20.23.0.tar.gz", hash = "sha256:a85caa554ced0c0afbd0d638e7e2d7b5f92d23478d05d17a76daeac8f279f924"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
@ -2416,7 +2416,7 @@ platformdirs = ">=3.2,<4"
|
||||
|
||||
[package.extras]
|
||||
docs = ["furo (>=2023.3.27)", "proselint (>=0.13)", "sphinx (>=6.1.3)", "sphinx-argparse (>=0.4)", "sphinxcontrib-towncrier (>=0.2.1a0)", "towncrier (>=22.12)"]
|
||||
test = ["covdefaults (>=2.3)", "coverage (>=7.2.3)", "coverage-enable-subprocess (>=1)", "flaky (>=3.7)", "packaging (>=23.1)", "pytest (>=7.3.1)", "pytest-env (>=0.8.1)", "pytest-freezegun (>=0.4.2)", "pytest-mock (>=3.10)", "pytest-randomly (>=3.12)", "pytest-timeout (>=2.1)"]
|
||||
test = ["covdefaults (>=2.3)", "coverage (>=7.2.3)", "coverage-enable-subprocess (>=1)", "flaky (>=3.7)", "packaging (>=23.1)", "pytest (>=7.3.1)", "pytest-env (>=0.8.1)", "pytest-freezegun (>=0.4.2)", "pytest-mock (>=3.10)", "pytest-randomly (>=3.12)", "pytest-timeout (>=2.1)", "setuptools (>=67.7.1)", "time-machine (>=2.9)"]
|
||||
|
||||
[package.source]
|
||||
type = "legacy"
|
||||
|
@ -55,7 +55,7 @@ typing-extensions==4.5.0 ; python_full_version >= "3.8.1" and python_full_versio
|
||||
tzdata==2023.3 ; python_full_version >= "3.8.1" and python_full_version < "4.0.0"
|
||||
tzlocal==4.3 ; python_full_version >= "3.8.1" and python_full_version < "4.0.0"
|
||||
urllib3==1.26.15 ; python_full_version >= "3.8.1" and python_full_version < "4.0.0"
|
||||
uvicorn==0.21.1 ; python_full_version >= "3.8.1" and python_full_version < "4.0.0"
|
||||
uvicorn==0.22.0 ; python_full_version >= "3.8.1" and python_full_version < "4.0.0"
|
||||
websockets==10.4 ; python_full_version >= "3.8.1" and python_full_version < "4.0.0"
|
||||
win32-setctime==1.1.0 ; python_full_version >= "3.8.1" and python_full_version < "4.0.0" and sys_platform == "win32"
|
||||
yarl==1.9.2 ; python_full_version >= "3.8.1" and python_full_version < "4.0.0"
|
||||
|
Loading…
x
Reference in New Issue
Block a user