支持mys设备登录

This commit is contained in:
KimigaiiWuyi 2023-11-10 02:44:32 +08:00
parent 0e449457d9
commit ce695745f6
5 changed files with 84 additions and 48 deletions

View File

@ -58,7 +58,7 @@ exec_list.extend(
'ALTER TABLE GsBind ADD COLUMN bbb_uid TEXT', 'ALTER TABLE GsBind ADD COLUMN bbb_uid TEXT',
'ALTER TABLE GsBind ADD COLUMN zzz_uid TEXT', 'ALTER TABLE GsBind ADD COLUMN zzz_uid TEXT',
'ALTER TABLE GsBind ADD COLUMN wd_uid TEXT', 'ALTER TABLE GsBind ADD COLUMN wd_uid TEXT',
'ALTER TABLE GsUser ADD COLUMN OAID TEXT', 'ALTER TABLE GsUser ADD COLUMN device_info TEXT',
'ALTER TABLE GsUser ADD COLUMN sr_sign_switch TEXT DEFAULT "off"', 'ALTER TABLE GsUser ADD COLUMN sr_sign_switch TEXT DEFAULT "off"',
'ALTER TABLE GsUser ADD COLUMN sr_push_switch TEXT DEFAULT "off"', 'ALTER TABLE GsUser ADD COLUMN sr_push_switch TEXT DEFAULT "off"',
'ALTER TABLE GsUser ADD COLUMN draw_switch TEXT DEFAULT "off"', 'ALTER TABLE GsUser ADD COLUMN draw_switch TEXT DEFAULT "off"',

View File

@ -1,9 +1,11 @@
import json
from typing import Dict
from gsuid_core.sv import SV from gsuid_core.sv import SV
from gsuid_core.bot import Bot from gsuid_core.bot import Bot
from gsuid_core.models import Event from gsuid_core.models import Event
from gsuid_core.utils.api.mys_api import mys_api from gsuid_core.utils.api.mys_api import mys_api
from gsuid_core.utils.database.api import get_uid from gsuid_core.utils.database.models import GsUser
from gsuid_core.utils.database.models import GsBind, GsUser
from gsuid_core.utils.cookie_manager.qrlogin import qrcode_login from gsuid_core.utils.cookie_manager.qrlogin import qrcode_login
from gsuid_core.utils.cookie_manager.add_ck import ( from gsuid_core.utils.cookie_manager.add_ck import (
deal_ck, deal_ck,
@ -49,35 +51,33 @@ async def send_add_ck_msg(bot: Bot, ev: Event):
@sv_core_user_addck.on_prefix(('mys设备登录')) @sv_core_user_addck.on_prefix(('mys设备登录'))
async def send_add_device_msg(bot: Bot, ev: Event): async def send_add_device_msg(bot: Bot, ev: Event):
# ev.text = device + model_name + device_type + board + oaid + device_info data: Dict[str, str] = json.loads(ev.text.strip())
# ev.text = diting + 220812C + OP11 + taro + 1f12fd + One/PHK110/OP11:13/
data = ev.text.split('+')
uid = await get_uid(bot, ev, GsBind)
if len(data) != 6 or uid is None:
return await bot.send(
'登陆格式错误...\n请按照device + model_name + '
'device_type + board + oaid + device_info的方式输入'
)
device_id = mys_api.get_device_id() device_id = mys_api.get_device_id()
seed_id, seed_time = mys_api.get_seed() seed_id, seed_time = mys_api.get_seed()
device, model_name, device_type, board, oaid, device_info = (
data[0].strip(),
data[1].strip(),
data[2].strip(),
data[3].strip(),
data[4].strip(),
data[5].strip(),
)
fp = await mys_api.generate_fp( fp = await mys_api.generate_fp(
device_id, device_id,
model_name, data['deviceModel'],
device, data['deviceProduct'],
device_type, data['deviceName'],
board, data['deviceBoard'],
oaid, data['oaid'],
device_info, data['deviceFingerprint'],
seed_id, seed_id,
seed_time, seed_time,
) )
await GsUser.update_data_by_xx({'uid': uid}, fp=fp, device_id=device_id) await GsUser.update_data(
ev.user_id,
ev.bot_id,
fp=fp,
device_id=device_id,
device_info=data['deviceFingerprint'],
)
user_list = await GsUser.select_data_list(ev.user_id, ev.bot_id)
if user_list:
for user in user_list:
if user.cookie:
await mys_api.device_login_and_save(
device_id, fp, data['deviceModel'], user.cookie
)
await bot.send('设备绑定成功!') await bot.send('设备绑定成功!')

View File

@ -220,7 +220,7 @@ class BaseMysApi:
body = { body = {
"app_version": self.mysVersion, "app_version": self.mysVersion,
"device_id": device_id, "device_id": device_id,
"device_name": f"XiaoMi{model_name}", "device_name": f"{model_name}",
"os_version": "33", "os_version": "33",
"platform": "Android", "platform": "Android",
"registration_id": self.generate_seed(19), "registration_id": self.generate_seed(19),
@ -229,7 +229,7 @@ class BaseMysApi:
HEADER = copy.deepcopy(self._HEADER) HEADER = copy.deepcopy(self._HEADER)
HEADER['x-rpc-device_id'] = device_id HEADER['x-rpc-device_id'] = device_id
HEADER['x-rpc-device_fp'] = device_fp HEADER['x-rpc-device_fp'] = device_fp
HEADER['x-rpc-device_name'] = f"XiaoMi{model_name}" HEADER['x-rpc-device_name'] = f"{model_name}"
HEADER['x-rpc-device_model'] = model_name HEADER['x-rpc-device_model'] = model_name
HEADER['DS'] = generate_passport_ds('', body) HEADER['DS'] = generate_passport_ds('', body)
HEADER['cookie'] = cookie HEADER['cookie'] = cookie
@ -372,11 +372,27 @@ class BaseMysApi:
uid = None uid = None
if params and 'role_id' in params: if params and 'role_id' in params:
uid = params['role_id'] uid = params['role_id']
elif data and 'role_id' in data:
uid = data['role_id']
if uid is not None:
device_id = await self.get_user_device_id(uid) device_id = await self.get_user_device_id(uid)
header['x-rpc-device_fp'] = await self.get_user_fp(uid) header['x-rpc-device_fp'] = await self.get_user_fp(uid)
if device_id is not None: if device_id is not None:
header['x-rpc-device_id'] = device_id header['x-rpc-device_id'] = device_id
dfp: Optional[str] = await GsUser.get_user_attr_by_uid(
uid, 'device_info', 'sr' if self.is_sr else None
)
if dfp is not None:
df = dfp.split('/')
header['User-Agent'] = (
f"Mozilla/5.0 (Linux; Android 13; {df[1]} {df[3]}"
"; wv)AppleWebKit/537.36 (KHTML, like Gecko) "
"Version/4.0 Chrome/104.0.5112.97"
f"Mobile Safari/537.36 miHoYoBBS/2{mys_version}"
)
logger.debug(header) logger.debug(header)
for _ in range(2): for _ in range(2):
async with client.request( async with client.request(
@ -407,15 +423,6 @@ class BaseMysApi:
# 针对1034做特殊处理 # 针对1034做特殊处理
if retcode == 1034 or retcode == 5003: if retcode == 1034 or retcode == 5003:
if uid: if uid:
nd = await self.ck_in_new_device(uid)
ck = header['Cookie']
if 'DEVICEFP_SEED_ID' not in ck and nd:
header['Cookie'] = (
f'DEVICEFP_SEED_ID={nd[2]};'
f'DEVICEFP_SEED_TIME={nd[3]};'
f'{ck};DEVICE_FP={nd[0]}'
)
header['x-rpc-challenge_game'] = ( header['x-rpc-challenge_game'] = (
'6' if self.is_sr else '2' '6' if self.is_sr else '2'
) )

View File

@ -287,11 +287,47 @@ class BaseModel(BaseBotIDModel):
@classmethod @classmethod
@with_session @with_session
async def select_data( async def select_data_list(
cls: Type[T_BaseModel], cls: Type[T_BaseModel],
session: AsyncSession, session: AsyncSession,
user_id: str, user_id: str,
bot_id: Optional[str] = None, bot_id: Optional[str] = None,
) -> Optional[List[T_BaseModel]]:
'''📝简单介绍:
基类的数据选择方法
🌱参数:
🔹user_id (`str`):
传入的用户id, 例如QQ号, 一般直接取`event.user_id`
🔹bot_id (`Optional[str]`, 默认是 `None`):
传入的bot_id, 例如`onebot`, 一般直接取`event.bot_id`
🚀使用范例:
`await GsUser.select_data(user_id='444888', bot_id='onebot')`
返回值:
🔸`Optional[List[T_BaseModel]]`: 选中符合条件的全部数据不存在则为`None`
'''
if bot_id is None:
sql = select(cls).where(cls.user_id == user_id)
else:
sql = select(cls).where(
cls.user_id == user_id, cls.bot_id == bot_id
)
result = await session.execute(sql)
data = result.scalars().all()
return data if data else None
@classmethod
async def select_data(
cls: Type[T_BaseModel],
user_id: str,
bot_id: Optional[str] = None,
) -> Optional[T_BaseModel]: ) -> Optional[T_BaseModel]:
'''📝简单介绍: '''📝简单介绍:
@ -313,14 +349,7 @@ class BaseModel(BaseBotIDModel):
🔸`Optional[T_BaseModel]`: 选中符合条件的第一个数据不存在则为`None` 🔸`Optional[T_BaseModel]`: 选中符合条件的第一个数据不存在则为`None`
''' '''
if bot_id is None: data = await cls.select_data_list(user_id, bot_id)
sql = select(cls).where(cls.user_id == user_id)
else:
sql = select(cls).where(
cls.user_id == user_id, cls.bot_id == bot_id
)
result = await session.execute(sql)
data = result.scalars().all()
return data[0] if data else None return data[0] if data else None
@classmethod @classmethod

View File

@ -34,7 +34,7 @@ class GsUser(User, table=True):
sr_sign_switch: str = Field(default='off', title='星铁自动签到') sr_sign_switch: str = Field(default='off', title='星铁自动签到')
fp: Optional[str] = Field(default=None, title='Fingerprint') fp: Optional[str] = Field(default=None, title='Fingerprint')
device_id: Optional[str] = Field(default=None, title='设备ID') device_id: Optional[str] = Field(default=None, title='设备ID')
OAID: Optional[str] = Field(default=None, title='设备匿名标识符') device_info: Optional[str] = Field(default=None, title='设备fp')
class GsCache(Cache, table=True): class GsCache(Cache, table=True):