支持sr当前状态

This commit is contained in:
qwerdvd 2023-04-28 12:58:04 +08:00
parent 60f6706ac9
commit a1434b3705
7 changed files with 343 additions and 15 deletions

View File

@ -1,20 +1,21 @@
# flake8: noqa
OLD_URL = "https://api-takumi.mihoyo.com"
NEW_URL = 'https://api-takumi-record.mihoyo.com'
STAR_RAIL_SIGN_INFO_URL = f'{OLD_URL}/event/luna/info'
STAR_RAIL_SIGN_LIST_URL = f'{OLD_URL}/event/luna/home'
STAR_RAIL_SIGN_EXTRA_INFO_URL = f'{OLD_URL}/event/luna/extra_info'
STAR_RAIL_SIGN_EXTRA_REWARD_URL = f'{OLD_URL}/event/luna/extra_reward'
STAR_RAIL_SIGN_URL = f'{OLD_URL}/event/luna/sign'
STAR_RAIL_MONTH_INFO_URL = f'{OLD_URL}/event/srledger/month_info' # 开拓阅历接口
STAR_RAIL_MONTH_INFO_URL = f'{NEW_URL}/event/srledger/month_info' # 开拓阅历接口
STAR_RAIL_NOTE_URL = f'{OLD_URL}/game_record/app/hkrpg/api/note' # 实时便签接口
STAR_RAIL_INDEX_URL = f'{OLD_URL}/game_record/app/hkrpg/api/index' # 角色橱窗接口
STAR_RAIL_NOTE_URL = f'{NEW_URL}/game_record/app/hkrpg/api/note' # 实时便签接口
STAR_RAIL_INDEX_URL = f'{NEW_URL}/game_record/app/hkrpg/api/index' # 角色橱窗接口
STAR_RAIL_AVATAR_BASIC_URL = (
f'{OLD_URL}/game_record/app/hkrpg/api/avatar/basic' # 全部角色接口
f'{NEW_URL}/game_record/app/hkrpg/api/avatar/basic' # 全部角色接口
)
STAR_RAIL_ROLE_BASIC_INFO_URL = (
f'{OLD_URL}/game_record/app/hkrpg/api/role/basicInfo' # 角色基础信息接口
f'{NEW_URL}/game_record/app/hkrpg/api/role/basicInfo' # 角色基础信息接口
)

View File

@ -1,9 +1,25 @@
################
# 签到相关 #
################
from typing import Any, List, TypedDict
class SingleExpedition(TypedDict):
avatars: List[str] # 头像Url
status: str
remaining_time: int
name: str
class DailyNoteData(TypedDict):
current_stamina: int
max_stamina: int
stamina_recover_time: int
accepted_expedition_num: int
total_expedition_num: int
expeditions: List[SingleExpedition]
################
# 签到相关 #
################
class MysSign(TypedDict):
code: str
risk_code: int

View File

@ -0,0 +1,77 @@
# import asyncio
from gsuid_core.sv import SV
from gsuid_core.bot import Bot
# from gsuid_core.gss import gss
from gsuid_core.models import Event
from ..utils.convert import get_uid
# from .notice import get_notice_list
from .resin_text import get_resin_text
from ..utils.error_reply import UID_HINT
# from gsuid_core.aps import scheduler
# from gsuid_core.logger import logger
# from gsuid_core.segment import MessageSegment
# from .draw_resin_card import get_resin_img
sv_get_resin = SV('sr查询体力')
sv_get_resin_admin = SV('sr强制推送', pm=1)
@sv_get_resin.on_fullmatch(('sr当前状态'))
async def send_daily_info(bot: Bot, ev: Event):
await bot.logger.info('开始执行[sr每日信息文字版]')
uid = await get_uid(bot, ev)
print(uid)
if uid is None:
return await bot.send(UID_HINT)
await bot.logger.info('[sr每日信息文字版]UID: {}'.format(uid))
im = await get_resin_text(uid)
await bot.send(im)
# @sv_get_resin_admin.on_fullmatch(('sr强制推送体力提醒'))
# async def force_notice_job(bot: Bot, ev: Event):
# await bot.logger.info('开始执行[sr强制推送体力信息]')
# await notice_job()
# @scheduler.scheduled_job('cron', minute='*/30')
# async def notice_job():
# result = await get_notice_list()
# logger.info('[推送检查]完成!等待消息推送中...')
# logger.debug(result)
#
# # 执行私聊推送
# for bot_id in result:
# for BOT_ID in gss.active_bot:
# bot = gss.active_bot[BOT_ID]
# for user_id in result[bot_id]['direct']:
# msg = result[bot_id]['direct'][user_id]
# await bot.target_send(msg, 'direct', user_id, bot_id, '', '')
# await asyncio.sleep(0.5)
# logger.info('[推送检查] 私聊推送完成')
# for gid in result[bot_id]['group']:
# msg_list = []
# for user_id in result[bot_id]['group'][gid]:
# msg_list.append(MessageSegment.at(user_id))
# msg = result[bot_id]['group'][gid][user_id]
# msg_list.append(MessageSegment.text(msg))
# await bot.target_send(msg_list, 'group', gid, bot_id, '', '')
# await asyncio.sleep(0.5)
# logger.info('[推送检查] 群聊推送完成')
# @sv_get_resin.on_fullmatch(('每日', 'mr', '实时便笺', '便笺', '便签'))
# async def send_daily_info_pic(bot: Bot, ev: Event):
# await bot.logger.info('开始执行[每日信息]')
# user_id = ev.at if ev.at else ev.user_id
# await bot.logger.info('[每日信息]QQ号: {}'.format(user_id))
#
# im = await get_resin_img(bot.bot_id, user_id)
# await bot.send(im)

View File

@ -0,0 +1,77 @@
from typing import List
from gsuid_core.logger import logger
from ..utils.mys_api import mys_api
from ..utils.error_reply import get_error
daily_im = """*数据刷新可能存在一定延迟,请以当前游戏实际数据为准
==============
开拓力{}/{}{}
委托执行
总数/完成/上限{}/{}/{}
{}"""
def seconds2hours(seconds: int) -> str:
m, s = divmod(int(seconds), 60)
h, m = divmod(m, 60)
return '%02d:%02d:%02d' % (h, m, s)
async def get_resin_text(uid: str) -> str:
try:
dailydata = await mys_api.get_daily_data(uid)
if isinstance(dailydata, int):
return get_error(dailydata)
max_stamina = dailydata['max_stamina']
rec_time = ''
current_stamina = dailydata['current_stamina']
if current_stamina < 160:
stamina_recover_time = seconds2hours(
dailydata['stamina_recover_time']
)
next_stamina_rec_time = seconds2hours(
8 * 60
- (
(dailydata['max_stamina'] - dailydata['current_stamina'])
* 8
* 60
- int(dailydata['stamina_recover_time'])
)
)
rec_time = f' ({next_stamina_rec_time}/{stamina_recover_time})'
accepted_epedition_num = dailydata['accepted_epedition_num']
total_expedition_num = dailydata['total_expedition_num']
finished_expedition_num = 0
expedition_info: List[str] = []
for expedition in dailydata['expeditions']:
expedition_name = expedition['name']
if expedition['status'] == 'Finished':
expedition_info.append(f'{expedition_name} 探索完成')
finished_expedition_num += 1
else:
remaining_time: str = seconds2hours(
expedition['remaining_time']
)
expedition_info.append(
f'{expedition_name} 剩余时间{remaining_time}'
)
expedition_data = '\n'.join(expedition_info)
print(expedition_data)
send_mes = daily_im.format(
current_stamina,
max_stamina,
rec_time,
accepted_epedition_num,
finished_expedition_num,
total_expedition_num,
expedition_data,
)
return send_mes
except TypeError:
logger.exception('[查询当前状态]查询失败!')
return '你绑定过的UID中可能存在过期CK~请重新绑定一下噢~'

View File

@ -0,0 +1,94 @@
import re
from typing import Tuple, Union, Optional, overload
from gsuid_core.bot import Bot
from gsuid_core.models import Event
from gsuid_core.utils.api.mys.models import AbyssData, IndexData
from .api import get_sqla
from .mys_api import mys_api
from .error_reply import VERIFY_HINT
@overload
async def get_uid(bot: Bot, ev: Event) -> Optional[str]:
...
@overload
async def get_uid(
bot: Bot, ev: Event, get_user_id: bool = True
) -> Tuple[Optional[str], str]:
...
async def get_uid(
bot: Bot, ev: Event, get_user_id: bool = False
) -> Union[Optional[str], Tuple[Optional[str], str]]:
uid_data = re.findall(r'\d{9}', ev.text)
user_id = ev.at if ev.at else ev.user_id
if uid_data:
sr_uid: Optional[str] = uid_data[0]
if sr_uid:
ev.text = ev.text.replace(sr_uid, '')
else:
sqla = get_sqla(ev.bot_id)
sr_uid = await sqla.get_bind_sruid(user_id)
if get_user_id:
return sr_uid, user_id
return sr_uid
class GsCookie:
def __init__(self) -> None:
self.cookie: Optional[str] = None
self.uid: str = '0'
self.raw_data = None
self.sqla = get_sqla('TEMP')
async def get_cookie(self, uid: str) -> str:
self.uid = uid
while True:
self.cookie = await self.sqla.get_random_cookie(uid)
if self.cookie is None:
return '没有可以使用的cookie!'
await self.get_uid_data()
msg = await self.check_cookies_useable()
if isinstance(msg, str):
return msg
elif msg:
return ''
async def get_uid_data(self) -> Union[int, IndexData]:
data = await mys_api.get_info(self.uid, self.cookie)
if not isinstance(data, int):
self.raw_data = data
return data
async def get_spiral_abyss_data(
self, schedule_type: str = '1'
) -> Union[AbyssData, int]:
data = await mys_api.get_spiral_abyss_info(
self.uid, schedule_type, self.cookie
)
return data
async def check_cookies_useable(self):
if isinstance(self.raw_data, int) and self.cookie:
retcode = self.raw_data
if retcode == 10001:
await self.sqla.mark_invalid(self.cookie, 'error')
return False
# return '您的cookie已经失效, 请重新获取!'
elif retcode == 10101:
await self.sqla.mark_invalid(self.cookie, 'limit30')
return False
# return '当前查询CK已超过每日30次上限!'
elif retcode == 10102:
return '当前查询id已经设置了隐私, 无法查询!'
elif retcode == 1034:
return VERIFY_HINT
else:
return f'API报错, 错误码为{retcode}!'
else:
return True

View File

@ -7,14 +7,26 @@ from gsuid_core.utils.api.mys.request import BaseMysApi
from gsuid_core.utils.api.mys.models import MysSign, SignInfo, SignList
from gsuid_core.utils.api.mys.tools import (
random_hex,
get_ds_token,
generate_os_ds,
get_web_ds_token,
)
from ..utils.api import get_sqla
from ..sruid_utils.api.mys.api import _API
from ..sruid_utils.api.mys.models import DailyNoteData
from ....GenshinUID.GenshinUID.genshinuid_config.gs_config import gsconfig
RECOGNIZE_SERVER = {
'1': 'prod_gf_cn',
# '2': 'cn_gf01',
# '5': 'cn_qd01',
# '6': 'os_usa',
# '7': 'os_euro',
# '8': 'os_asia',
# '9': 'os_cht',
}
class _MysApi(BaseMysApi):
def __init__(self, *args, **kwargs):
@ -91,6 +103,13 @@ class _MysApi(BaseMysApi):
}
return data
async def get_daily_data(self, uid: str) -> Union[DailyNoteData, int]:
data = await self.simple_mys_req('STAR_RAIL_NOTE_URL', uid)
print(data)
if isinstance(data, Dict):
data = cast(DailyNoteData, data['data'])
return data
async def get_sign_list(self, uid) -> Union[SignList, int]:
# is_os = self.check_os(uid)
is_os = False
@ -207,5 +226,49 @@ class _MysApi(BaseMysApi):
)
return data
async def simple_mys_req(
self,
URL: str,
uid: Union[str, bool],
params: Dict = {},
header: Dict = {},
cookie: Optional[str] = None,
) -> Union[Dict, int]:
if isinstance(uid, bool):
is_os = uid
server_id = 'cn_qd01' if is_os else 'prod_gf_cn'
else:
server_id = RECOGNIZE_SERVER.get(uid[0])
is_os = False if int(uid[0]) < 6 else True
ex_params = '&'.join([f'{k}={v}' for k, v in params.items()])
print(server_id, is_os, ex_params)
if is_os:
_URL = _API[f'{URL}_OS']
HEADER = copy.deepcopy(self._HEADER_OS)
HEADER['DS'] = generate_os_ds()
else:
_URL = _API[URL]
HEADER = copy.deepcopy(self._HEADER)
HEADER['DS'] = get_ds_token(
ex_params if ex_params else f'role_id={uid}&server={server_id}'
)
HEADER.update(header)
print(_URL)
if cookie is not None:
HEADER['Cookie'] = cookie
elif 'Cookie' not in HEADER and isinstance(uid, str):
ck = await self.get_ck(uid)
if ck is None:
return -51
HEADER['Cookie'] = ck
data = await self._mys_request(
url=_URL,
method='GET',
header=HEADER,
params=params if params else {'server': server_id, 'role_id': uid},
use_proxy=True if is_os else False,
)
return data
mys_api = _MysApi()

14
poetry.lock generated
View File

@ -596,14 +596,14 @@ files = [
[[package]]
name = "platformdirs"
version = "3.4.0"
version = "3.5.0"
description = "A small Python package for determining appropriate platform-specific dirs, e.g. a \"user data dir\"."
category = "dev"
optional = false
python-versions = ">=3.7"
files = [
{file = "platformdirs-3.4.0-py3-none-any.whl", hash = "sha256:01437886022decaf285d8972f9526397bfae2ac55480ed372ed6d9eca048870a"},
{file = "platformdirs-3.4.0.tar.gz", hash = "sha256:a5e1536e5ea4b1c238a1364da17ff2993d5bd28e15600c2c8224008aff6bbcad"},
{file = "platformdirs-3.5.0-py3-none-any.whl", hash = "sha256:47692bc24c1958e8b0f13dd727307cff1db103fca36399f457da8e05f222fdc4"},
{file = "platformdirs-3.5.0.tar.gz", hash = "sha256:7954a68d0ba23558d753f73437c55f89027cf8f5108c19844d4b82e5af396335"},
]
[package.extras]
@ -992,14 +992,14 @@ socks = ["PySocks (>=1.5.6,!=1.5.7,<2.0)"]
[[package]]
name = "virtualenv"
version = "20.22.0"
version = "20.23.0"
description = "Virtual Python Environment builder"
category = "dev"
optional = false
python-versions = ">=3.7"
files = [
{file = "virtualenv-20.22.0-py3-none-any.whl", hash = "sha256:48fd3b907b5149c5aab7c23d9790bea4cac6bc6b150af8635febc4cfeab1275a"},
{file = "virtualenv-20.22.0.tar.gz", hash = "sha256:278753c47aaef1a0f14e6db8a4c5e1e040e90aea654d0fc1dc7e0d8a42616cc3"},
{file = "virtualenv-20.23.0-py3-none-any.whl", hash = "sha256:6abec7670e5802a528357fdc75b26b9f57d5d92f29c5462ba0fbe45feacc685e"},
{file = "virtualenv-20.23.0.tar.gz", hash = "sha256:a85caa554ced0c0afbd0d638e7e2d7b5f92d23478d05d17a76daeac8f279f924"},
]
[package.dependencies]
@ -1009,7 +1009,7 @@ platformdirs = ">=3.2,<4"
[package.extras]
docs = ["furo (>=2023.3.27)", "proselint (>=0.13)", "sphinx (>=6.1.3)", "sphinx-argparse (>=0.4)", "sphinxcontrib-towncrier (>=0.2.1a0)", "towncrier (>=22.12)"]
test = ["covdefaults (>=2.3)", "coverage (>=7.2.3)", "coverage-enable-subprocess (>=1)", "flaky (>=3.7)", "packaging (>=23.1)", "pytest (>=7.3.1)", "pytest-env (>=0.8.1)", "pytest-freezegun (>=0.4.2)", "pytest-mock (>=3.10)", "pytest-randomly (>=3.12)", "pytest-timeout (>=2.1)"]
test = ["covdefaults (>=2.3)", "coverage (>=7.2.3)", "coverage-enable-subprocess (>=1)", "flaky (>=3.7)", "packaging (>=23.1)", "pytest (>=7.3.1)", "pytest-env (>=0.8.1)", "pytest-freezegun (>=0.4.2)", "pytest-mock (>=3.10)", "pytest-randomly (>=3.12)", "pytest-timeout (>=2.1)", "setuptools (>=67.7.1)", "time-machine (>=2.9)"]
[[package]]
name = "win32-setctime"