🚧完成抽卡链接获取抽卡记录

This commit is contained in:
qwerdvd 2023-05-08 14:17:23 +08:00
parent f4b570c134
commit 41b2a549e2
3 changed files with 193 additions and 149 deletions

View File

@ -4,11 +4,12 @@ from gsuid_core.models import Event
from ..utils.convert import get_uid
from ..utils.error_reply import UID_HINT
from .get_gachalogs import save_gachalogs
# from .get_gachalogs import save_gachalogs
# from .draw_gachalogs import draw_gachalogs_img
sv_gacha_log = SV('sr抽卡记录')
sv_get_gachalog_by_link = SV('sr导入抽卡链接')
@sv_gacha_log.on_fullmatch(('sr抽卡记录'))
@ -20,3 +21,21 @@ async def send_gacha_log_card_info(bot: Bot, ev: Event):
# im = await draw_gachalogs_img(uid, user_id)
im = '画个饼先,在做了在做了'
await bot.send(im)
@sv_get_gachalog_by_link.on_command(('sr导入抽卡链接'))
async def get_gachalog_by_link(bot: Bot, ev: Event):
await bot.logger.info('开始执行[sr导入抽卡链接]')
uid = await get_uid(bot, ev, only_uid=True)
if uid is None:
return await bot.send(UID_HINT)
gacha_url = ev.text.strip()
if gacha_url and not isinstance(gacha_url, str):
return await bot.send('请给出正确的抽卡记录链接')
is_force = False
if ev.command.startswith('强制'):
await bot.logger.info('[WARNING]本次为强制刷新')
is_force = True
await bot.send(f'UID{uid}开始执行[刷新抽卡记录],需要一定时间...请勿重复触发!')
im = await save_gachalogs(uid, gacha_url, None, is_force)
return await bot.send(im)

View File

@ -1,145 +1,165 @@
# import json
# import asyncio
# from datetime import datetime
# from typing import Dict, Optional
#
# from ..utils.mys_api import mys_api
# from ..utils.error_reply import SK_HINT
# from ..utils.resource.RESOURCE_PATH import PLAYER_PATH
#
# gacha_type_meta_data = {
# '新手祈愿': ['100'],
# '常驻祈愿': ['200'],
# '角色祈愿': ['301', '400'],
# '武器祈愿': ['302'],
# }
#
#
# async def get_new_gachalog(uid: str, full_data: Dict, is_force: bool):
# temp = []
# for gacha_name in gacha_type_meta_data:
# for gacha_type in gacha_type_meta_data[gacha_name]:
# end_id = '0'
# for page in range(1, 999):
# data = await mys_api.get_gacha_log_by_authkey(
# uid, gacha_type, page, end_id
# )
# await asyncio.sleep(0.9)
# if isinstance(data, int):
# return {}
# data = data['list']
# if data == []:
# break
# end_id = data[-1]['id']
# if data[-1] in full_data[gacha_name] and not is_force:
# for item in data:
# if item not in full_data[gacha_name]:
# temp.append(item)
# full_data[gacha_name][0:0] = temp
# temp = []
# break
# if len(full_data[gacha_name]) >= 1:
# if int(data[-1]['id']) <= int(
# full_data[gacha_name][0]['id']
# ):
# full_data[gacha_name].extend(data)
# else:
# full_data[gacha_name][0:0] = data
# else:
# full_data[gacha_name].extend(data)
# await asyncio.sleep(0.5)
# return full_data
#
#
# async def save_gachalogs(
# uid: str, raw_data: Optional[dict] = None, is_force: bool = False
# ) -> str:
# path = PLAYER_PATH / str(uid)
# if not path.exists():
# path.mkdir(parents=True, exist_ok=True)
#
# # 获取当前时间
# now = datetime.now()
# current_time = now.strftime('%Y-%m-%d %H-%M-%S')
#
# # 初始化最后保存的数据
# result = {}
#
# # 抽卡记录json路径
# gachalogs_path = path / 'gacha_logs.json'
#
# # 如果有老的,准备合并, 先打开文件
# gachalogs_history = {}
# old_normal_gacha_num, old_char_gacha_num, old_weapon_gacha_num = 0, 0, 0
# if gachalogs_path.exists():
# with open(gachalogs_path, "r", encoding='UTF-8') as f:
# gachalogs_history: Dict = json.load(f)
# gachalogs_history = gachalogs_history['data']
# old_normal_gacha_num = len(gachalogs_history['常驻祈愿'])
# old_char_gacha_num = len(gachalogs_history['角色祈愿'])
# old_weapon_gacha_num = len(gachalogs_history['武器祈愿'])
# else:
# gachalogs_history = {
# '新手祈愿': [],
# '常驻祈愿': [],
# '角色祈愿': [],
# '武器祈愿': [],
# }
#
# # 获取新抽卡记录
# if raw_data is None:
# raw_data = await get_new_gachalog(uid, gachalogs_history, is_force)
# else:
# new_data = {'新手祈愿': [], '常驻祈愿': [], '角色祈愿': [], '武器祈愿': []}
# if gachalogs_history:
# for i in ['新手祈愿', '常驻祈愿', '角色祈愿', '武器祈愿']:
# for item in raw_data[i]:
# if (
# item not in gachalogs_history[i]
# and item not in new_data[i]
# ):
# new_data[i].append(item)
# raw_data = new_data
# for i in ['新手祈愿', '常驻祈愿', '角色祈愿', '武器祈愿']:
# raw_data[i].extend(gachalogs_history[i])
#
# if raw_data == {} or not raw_data:
# return SK_HINT
#
# temp_data = {'新手祈愿': [], '常驻祈愿': [], '角色祈愿': [], '武器祈愿': []}
# for i in ['新手祈愿', '常驻祈愿', '角色祈愿', '武器祈愿']:
# for item in raw_data[i]:
# if item not in temp_data[i]:
# temp_data[i].append(item)
# raw_data = temp_data
#
# result['uid'] = uid
# result['data_time'] = current_time
# result['normal_gacha_num'] = len(raw_data['常驻祈愿'])
# result['char_gacha_num'] = len(raw_data['角色祈愿'])
# result['weapon_gacha_num'] = len(raw_data['武器祈愿'])
# for i in ['常驻祈愿', '角色祈愿', '武器祈愿']:
# if len(raw_data[i]) > 1:
# raw_data[i].sort(key=lambda x: (-int(x['id'])))
# result['data'] = raw_data
#
# # 计算数据
# normal_add = result['normal_gacha_num'] - old_normal_gacha_num
# char_add = result['char_gacha_num'] - old_char_gacha_num
# weapon_add = result['weapon_gacha_num'] - old_weapon_gacha_num
# all_add = normal_add + char_add + weapon_add
#
# # 保存文件
# with open(gachalogs_path, 'w', encoding='UTF-8') as file:
# json.dump(result, file, ensure_ascii=False)
#
# # 回复文字
# if all_add == 0:
# im = f'UID{uid}没有新增祈愿数据!'
# else:
# im = (
# f'UID{uid}数据更新成功!'
# f'本次更新{all_add}个数据\n'
# f'常驻祈愿{normal_add}个\n角色祈愿{char_add}个\n武器祈愿{weapon_add}个!'
# )
# return im
import json
import asyncio
from urllib import parse
from datetime import datetime
from typing import Dict, Optional
from ..utils.mys_api import mys_api
from ..utils.error_reply import SK_HINT
from ..utils.resource.RESOURCE_PATH import PLAYER_PATH
gacha_type_meta_data = {
'群星跃迁': ['1'],
'始发跃迁': ['2'],
'角色跃迁': ['11'],
'光锥跃迁': ['12'],
}
async def get_new_gachalog_by_link(
gacha_url: str, full_data: Dict, is_force: bool
):
temp = []
for gacha_name in gacha_type_meta_data:
for gacha_type in gacha_type_meta_data[gacha_name]:
end_id = '0'
for page in range(1, 999):
url = parse.urlparse(gacha_url)
url_parse = parse.parse_qs(url.query)
authkey = url_parse['authkey'][0]
data = await mys_api.get_gacha_log_by_link_in_authkey(
authkey, gacha_type, page, end_id
)
await asyncio.sleep(0.9)
if isinstance(data, int):
return {}
data = data['list']
if data == []:
break
end_id = data[-1]['id']
if data[-1] in full_data[gacha_name] and not is_force:
for item in data:
if item not in full_data[gacha_name]:
temp.append(item)
full_data[gacha_name][0:0] = temp
temp = []
break
if len(full_data[gacha_name]) >= 1:
if int(data[-1]['id']) <= int(
full_data[gacha_name][0]['id']
):
full_data[gacha_name].extend(data)
else:
full_data[gacha_name][0:0] = data
else:
full_data[gacha_name].extend(data)
await asyncio.sleep(0.5)
return full_data
async def save_gachalogs(
uid: str,
gacha_url: str,
raw_data: Optional[dict] = None,
is_force: bool = False,
) -> str:
path = PLAYER_PATH / str(uid)
if not path.exists():
path.mkdir(parents=True, exist_ok=True)
# 获取当前时间
now = datetime.now()
current_time = now.strftime('%Y-%m-%d %H-%M-%S')
# 初始化最后保存的数据
result = {}
# 抽卡记录json路径
gachalogs_path = path / 'gacha_logs.json'
# 如果有老的,准备合并, 先打开文件
gachalogs_history = {}
(
old_normal_gacha_num,
old_begin_gacha_num,
old_char_gacha_num,
old_weapon_gacha_num,
) = (0, 0, 0, 0)
if gachalogs_path.exists():
with open(gachalogs_path, "r", encoding='UTF-8') as f:
gachalogs_history: Dict = json.load(f)
gachalogs_history = gachalogs_history['data']
old_normal_gacha_num = len(gachalogs_history['群星跃迁'])
old_begin_gacha_num = len(gachalogs_history['始发跃迁'])
old_char_gacha_num = len(gachalogs_history['角色跃迁'])
old_weapon_gacha_num = len(gachalogs_history['光锥跃迁'])
else:
gachalogs_history = {
'群星跃迁': [],
'始发跃迁': [],
'角色跃迁': [],
'光锥跃迁': [],
}
# 获取新抽卡记录
if raw_data is None:
raw_data = await get_new_gachalog_by_link(
gacha_url, gachalogs_history, is_force
)
else:
new_data = {'始发跃迁': [], '群星跃迁': [], '角色跃迁': [], '光锥跃迁': []}
if gachalogs_history:
for i in ['始发跃迁', '群星跃迁', '角色跃迁', '光锥跃迁']:
for item in raw_data[i]:
if (
item not in gachalogs_history[i]
and item not in new_data[i]
):
new_data[i].append(item)
raw_data = new_data
for i in ['始发跃迁', '群星跃迁', '角色跃迁', '光锥跃迁']:
raw_data[i].extend(gachalogs_history[i])
if raw_data == {} or not raw_data:
return SK_HINT
temp_data = {'始发跃迁': [], '群星跃迁': [], '角色跃迁': [], '光锥跃迁': []}
for i in ['始发跃迁', '群星跃迁', '角色跃迁', '光锥跃迁']:
for item in raw_data[i]:
if item not in temp_data[i]:
temp_data[i].append(item)
raw_data = temp_data
result['uid'] = uid
result['data_time'] = current_time
result['normal_gacha_num'] = len(raw_data['群星跃迁'])
result['begin_gacha_num'] = len(raw_data['始发跃迁'])
result['char_gacha_num'] = len(raw_data['角色跃迁'])
result['weapon_gacha_num'] = len(raw_data['光锥跃迁'])
for i in ['群星跃迁', '角色跃迁', '光锥跃迁']:
if len(raw_data[i]) > 1:
raw_data[i].sort(key=lambda x: (-int(x['id'])))
result['data'] = raw_data
# 计算数据
normal_add = result['normal_gacha_num'] - old_normal_gacha_num
begin_gacha_add = result['begin_gacha_num'] - old_begin_gacha_num
char_add = result['char_gacha_num'] - old_char_gacha_num
weapon_add = result['weapon_gacha_num'] - old_weapon_gacha_num
all_add = normal_add + char_add + weapon_add + begin_gacha_add
# 保存文件
with open(gachalogs_path, 'w', encoding='UTF-8') as file:
json.dump(result, file, ensure_ascii=False)
# 回复文字
if all_add == 0:
im = f'UID{uid}没有新增祈愿数据!'
else:
im = (
f'UID{uid}数据更新成功!'
f'本次更新{all_add}个数据\n'
f'群星跃迁{normal_add}\n始发跃迁{begin_gacha_add}\n'
f'角色跃迁{char_add}\n光锥跃迁{weapon_add}个!'
)
return im

View File

@ -11,19 +11,21 @@ from .error_reply import VERIFY_HINT
@overload
async def get_uid(bot: Bot, ev: Event) -> Optional[str]:
async def get_uid(
bot: Bot, ev: Event, only_uid: bool = False
) -> Optional[str]:
...
@overload
async def get_uid(
bot: Bot, ev: Event, get_user_id: bool = True
bot: Bot, ev: Event, get_user_id: bool = True, only_uid: bool = False
) -> Tuple[Optional[str], str]:
...
async def get_uid(
bot: Bot, ev: Event, get_user_id: bool = False
bot: Bot, ev: Event, get_user_id: bool = False, only_uid: bool = False
) -> Union[Optional[str], Tuple[Optional[str], str]]:
uid_data = re.findall(r'\d{9}', ev.text)
user_id = ev.at if ev.at else ev.user_id
@ -34,6 +36,9 @@ async def get_uid(
else:
sqla = get_sqla(ev.bot_id)
sr_uid = await sqla.get_bind_sruid(user_id)
if only_uid:
sqla = get_sqla(ev.bot_id)
sr_uid = await sqla.get_bind_sruid(user_id)
if get_user_id:
return sr_uid, user_id
return sr_uid