支持米游社账户同一游戏绑定多个游戏账户

This commit is contained in:
KimigaiiWuyi 2024-07-24 07:07:54 +08:00
parent ed9dcd7ccd
commit e274c3b5b9
6 changed files with 164 additions and 76 deletions

View File

@ -14,7 +14,7 @@ import httpx
from gsuid_core.bot import call_bot
from gsuid_core.logger import logger
from gsuid_core.utils.database.api import DBSqla
from gsuid_core.utils.database.models import GsUser
from gsuid_core.utils.database.models import GsUID, GsUser
from gsuid_core.utils.database.utils import SR_SERVER, ZZZ_SERVER
from gsuid_core.utils.database.utils import SERVER as RECOGNIZE_SERVER
from gsuid_core.utils.plugins_config.gs_config import core_plugins_config
@ -71,6 +71,14 @@ class BaseMysApi:
self, gt: str, ch: str, header: Dict
) -> Tuple[Optional[str], Optional[str]]: ...
async def get_uid(
self,
uid: str,
game_name: Optional[str] = None,
):
uid = await GsUID.get_main_uid(uid, game_name)
return uid
@abstractmethod
async def get_ck(
self,
@ -78,8 +86,7 @@ class BaseMysApi:
mode: Literal['OWNER', 'RANDOM'] = 'RANDOM',
game_name: Optional[str] = None,
) -> Optional[str]:
if game_name == 'gs':
game_name = None
uid = await self.get_uid(uid, game_name)
if mode == 'RANDOM':
if game_name == 'zzz':
@ -111,20 +118,49 @@ class BaseMysApi:
uid, game_name=game_name
)
@abstractmethod
async def get_stoken(
self, uid: str, game_name: Optional[str] = None
) -> Optional[str]: ...
) -> Optional[str]:
uid = await self.get_uid(uid, game_name)
return await GsUser.get_user_stoken_by_uid(uid, game_name)
@abstractmethod
async def get_user_fp(
self, uid: str, game_name: Optional[str] = None
) -> Optional[str]: ...
) -> Optional[str]:
uid = await self.get_uid(uid, game_name)
data = await GsUser.get_user_attr_by_uid(
uid,
'fp',
game_name,
)
if data is None:
seed_id, seed_time = self.get_seed()
device_id = self.get_device_id()
data = await self.generate_fake_fp(device_id, seed_id, seed_time)
await GsUser.update_data_by_uid_without_bot_id(
uid,
game_name,
fp=data,
)
return data
@abstractmethod
async def get_user_device_id(
self, uid: str, game_name: Optional[str] = None
) -> Optional[str]: ...
) -> Optional[str]:
uid = await self.get_uid(uid, game_name)
data = await GsUser.get_user_attr_by_uid(
uid,
'device_id',
game_name,
)
if data is None:
data = self.get_device_id()
await GsUser.update_data_by_uid_without_bot_id(
uid,
game_name,
device_id=data,
)
return data
def check_os(self, uid: str, game_name: str = 'gs') -> bool:
if game_name == 'gs' or game_name == 'sr':

View File

@ -6,23 +6,23 @@ from copy import deepcopy
from typing import Dict, Union, cast
from .bbs_request import BBSMysApi
from .api import (
GS_BASE,
ZZZ_BASE,
SIGN_BASE_OS,
SIGN_SR_BASE_OS,
SIGN_INFO_URL,
SIGN_INFO_URL_OS,
SIGN_URL_SR_OS,
SIGN_LIST_SR_OS,
SIGN_INFO_SR_OS,
SIGN_LIST_URL_OS,
SIGN_LIST_URL,
SIGN_URL,
SIGN_URL_OS,
)
from .models import MysSign, SignInfo, SignList, MonthlyAward
from .tools import random_hex, generate_os_ds, get_web_ds_token
from .api import (
GS_BASE,
SIGN_URL,
ZZZ_BASE,
SIGN_URL_OS,
SIGN_BASE_OS,
SIGN_INFO_URL,
SIGN_LIST_URL,
SIGN_URL_SR_OS,
SIGN_INFO_SR_OS,
SIGN_LIST_SR_OS,
SIGN_SR_BASE_OS,
SIGN_INFO_URL_OS,
SIGN_LIST_URL_OS,
)
_ACT_ID = {
'gs': {

View File

@ -1,7 +1,4 @@
from typing import Optional
from gsuid_core.utils.api.mys import MysApi
from gsuid_core.utils.database.models import GsUser
from gsuid_core.utils.plugins_config.gs_config import core_plugins_config
gsconfig = core_plugins_config
@ -11,46 +8,5 @@ class _MysApi(MysApi):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
async def get_stoken(
self, uid: str, game_name: Optional[str] = None
) -> Optional[str]:
return await GsUser.get_user_stoken_by_uid(uid, game_name)
async def get_user_fp(
self, uid: str, game_name: Optional[str] = None
) -> Optional[str]:
data = await GsUser.get_user_attr_by_uid(
uid,
'fp',
game_name,
)
if data is None:
seed_id, seed_time = self.get_seed()
device_id = self.get_device_id()
data = await self.generate_fake_fp(device_id, seed_id, seed_time)
await GsUser.update_data_by_uid_without_bot_id(
uid,
game_name,
fp=data,
)
return data
async def get_user_device_id(
self, uid: str, game_name: Optional[str] = None
) -> Optional[str]:
data = await GsUser.get_user_attr_by_uid(
uid,
'device_id',
game_name,
)
if data is None:
data = self.get_device_id()
await GsUser.update_data_by_uid_without_bot_id(
uid,
game_name,
device_id=data,
)
return data
mys_api = _MysApi()

View File

@ -9,9 +9,9 @@ from gsuid_core.utils.api.mys_api import mys_api
from gsuid_core.utils.error_reply import UID_HINT
from gsuid_core.utils.image.convert import convert_img
from gsuid_core.utils.fonts.fonts import core_font as cf
from gsuid_core.utils.database.models import GsBind, GsUser, GsCache
from gsuid_core.utils.database.utils import SERVER, SR_SERVER, ZZZ_SERVER
from gsuid_core.utils.image.image_tools import get_v4_bg, get_status_icon
from gsuid_core.utils.database.models import GsUID, GsBind, GsUser, GsCache
pic_path = Path(__file__).parent / 'pic'
id_list = [
@ -296,20 +296,61 @@ async def _deal_ck(bot_id: str, mes: str, user_id: str) -> str:
account_id, account_cookie, True
)
# 剔除除了原神之外的其他游戏
gs_uid_list: List[str] = []
sr_uid_list: List[str] = []
zzz_uid_list: List[str] = []
wd_uid_list: List[str] = []
bb_uid_list: List[str] = []
bbb_uid_list: List[str] = []
if isinstance(mys_data, List):
for i in mys_data:
if i['game_id'] == 2:
uid_bind = i['game_role_id']
gs_uid_list.append(uid_bind)
elif i['game_id'] == 6:
sr_uid_bind = i['game_role_id']
sr_uid_list.append(sr_uid_bind)
elif i['game_id'] == 8:
zzz_uid_bind = i['game_role_id']
zzz_uid_list.append(zzz_uid_bind)
elif i['game_id'] == 4:
wd_uid_bind = i['game_role_id']
wd_uid_list.append(wd_uid_bind)
elif i['game_id'] == 3:
bb_uid_bind = i['game_role_id']
bb_uid_list.append(bb_uid_bind)
elif i['game_id'] == 1:
bbb_uid_bind = i['game_role_id']
bbb_uid_list.append(bbb_uid_bind)
bind_dict = {
'gs': (uid_bind, gs_uid_list),
'sr': (sr_uid_bind, sr_uid_list),
'zzz': (zzz_uid_bind, zzz_uid_list),
'wd': (wd_uid_bind, wd_uid_list),
'bb': (bb_uid_bind, bb_uid_list),
'bbb': (bbb_uid_bind, bbb_uid_list),
}
for game_name in bind_dict:
_uid = bind_dict[game_name][0]
if _uid:
bind_dict[game_name][1].pop()
_uid_list = bind_dict[game_name][1][:4]
insert_dict = {}
for _i, _luid in enumerate(_uid_list):
insert_dict[f'uid_{_i}'] = _luid
_uid_exist = await GsUID.uid_exist(_uid, game_name)
if _uid_exist:
await GsUID.update_data(
_uid_exist, game_name, **insert_dict
)
else:
await GsUID.full_insert_data(
main_uid=_uid, game_name=game_name, **insert_dict
)
else:
if not (
uid_bind

View File

@ -63,6 +63,7 @@ class BaseIDModel(SQLModel):
🔹game_name (`Optional[str]`, 默认是 `None`):
假设传入`None`会返回`uid`而传入`sr`会返回`sr_uid`
特殊的, 传入`gs`也会返回`uid`!
🚀使用范例:
@ -72,6 +73,9 @@ class BaseIDModel(SQLModel):
🔸`str`: 游戏uid对应列名默认为`uid`
'''
if game_name == 'gs':
game_name = None
if game_name:
return f'{game_name}_uid'
else:

View File

@ -1,9 +1,9 @@
from typing import List, Type, Optional
from sqlmodel import Field
from sqlalchemy import or_
from sqlmodel import Field, select, update
from sqlalchemy.ext.asyncio import AsyncSession
from sqlmodel import Field, select
from .base_models import (
Bind,
Push,
@ -315,16 +315,17 @@ class GsUID(BaseIDModel, table=True):
@classmethod
@with_session
async def get_main_uid(
async def _get_main_uid(
cls,
session: AsyncSession,
uid: str,
game_name: Optional[str] = None,
) -> str:
) -> Optional["GsUID"]:
stmt = (
select(cls)
.where(
or_(
cls.main_uid == uid,
cls.uid_1 == uid,
cls.uid_2 == uid,
cls.uid_3 == uid,
@ -333,8 +334,58 @@ class GsUID(BaseIDModel, table=True):
)
.where(cls.game_name == game_name)
)
results = session.execute(stmt).scalars().all()
if results:
return results[0].uid
results = await session.execute(stmt)
data = results.scalars().all()
if data:
return data[0]
else:
return None
@classmethod
@with_session
async def get_main_uid(
cls,
session: AsyncSession,
uid: str,
game_name: Optional[str] = None,
) -> str:
data = await cls._get_main_uid(uid, game_name)
if data:
return data.main_uid
else:
return uid
@classmethod
@with_session
async def uid_exist(
cls,
session: AsyncSession,
uid: str,
game_name: Optional[str] = None,
) -> Optional[str]:
data = await cls._get_main_uid(uid, game_name)
if data:
return data.main_uid
else:
return None
@classmethod
@with_session
async def update_data(
cls,
session: AsyncSession,
uid: str,
game_name: Optional[str] = None,
**data,
):
sql = (
update(cls)
.where(cls.main_uid == uid)
.where(cls.game_name == game_name)
)
if data is not None:
query = sql.values(**data)
query.execution_options(synchronize_session='fetch')
await session.execute(query)
return 0
return -1