抽象数据库模型和方法, 便于插件继承建表调用

This commit is contained in:
Wuyi无疑 2023-08-05 22:15:56 +08:00
parent 0cc9552c4a
commit 32dbd988be
7 changed files with 947 additions and 530 deletions

View File

@ -3,6 +3,7 @@ from gsuid_core.bot import Bot
from gsuid_core.gss import gss
from gsuid_core.models import Event
from gsuid_core.logger import logger
from gsuid_core.segment import Message
from .draw_user_card import get_user_card
@ -23,5 +24,10 @@ async def send_direct_msg(bot: Bot, ev: Event):
logger.info('开始执行[给我发消息]')
for bot_id in gss.active_bot:
await gss.active_bot[bot_id].target_send(
'这是一条主动消息', 'direct', ev.user_id, ev.bot_id, ev.bot_self_id, ''
[Message('text', '这是一条主动消息'), Message('group', ev.group_id)],
'direct',
ev.user_id,
ev.bot_id,
ev.bot_self_id,
'',
)

View File

@ -1,16 +1,18 @@
import re
import asyncio
from typing import Dict
from typing import Dict, Type, Tuple, Union, Optional, overload
from sqlalchemy import event
from gsuid_core.data_store import get_res_path
from gsuid_core.bot import Bot
from gsuid_core.models import Event
from gsuid_core.utils.database.dal import SQLA
from gsuid_core.utils.database.base_models import Bind, engine
is_wal = False
active_sqla: Dict[str, SQLA] = {}
active_sr_sqla: Dict[str, SQLA] = {}
db_url = str(get_res_path().parent / 'GsData.db')
class DBSqla:
@ -25,15 +27,15 @@ class DBSqla:
def _get_sqla(self, bot_id, is_sr: bool = False) -> SQLA:
sqla_list = active_sr_sqla if is_sr else active_sqla
if bot_id not in sqla_list:
sqla = SQLA(db_url, bot_id, is_sr)
sqla = SQLA(bot_id, is_sr)
sqla_list[bot_id] = sqla
sqla.create_all()
@event.listens_for(sqla.engine.sync_engine, 'connect')
def engine_connect(conn, branch):
@event.listens_for(engine.sync_engine, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record):
if is_wal:
cursor = conn.cursor()
cursor.execute('PRAGMA journal_mode=WAL')
cursor = dbapi_connection.cursor()
cursor.execute("PRAGMA journal_mode=WAL")
cursor.close()
return sqla_list[bot_id]
@ -43,3 +45,44 @@ class DBSqla:
def get_sr_sqla(self, bot_id):
return self._get_sqla(bot_id, True)
@overload
async def get_uid(
bot: Bot,
ev: Event,
bind_model: Type[Bind],
game_name: Optional[str] = None,
) -> Optional[str]:
...
@overload
async def get_uid(
bot: Bot,
ev: Event,
bind_model: Type[Bind],
game_name: Optional[str] = None,
get_user_id: bool = True,
) -> Tuple[Optional[str], str]:
...
async def get_uid(
bot: Bot,
ev: Event,
bind_model: Type[Bind],
game_name: Optional[str] = None,
get_user_id: bool = False,
) -> Union[Optional[str], Tuple[Optional[str], str]]:
uid_data = re.findall(r'\d+', ev.text)
user_id = ev.at if ev.at else ev.user_id
if uid_data:
uid: Optional[str] = uid_data[0]
if uid:
ev.text = ev.text.replace(uid, '')
else:
uid = await bind_model.get_uid_by_game(user_id, ev.bot_id, game_name)
if get_user_id:
return uid, user_id
return uid

View File

@ -0,0 +1,661 @@
from functools import wraps
from typing_extensions import ParamSpec, Concatenate
from typing import (
Any,
Dict,
List,
Type,
TypeVar,
Callable,
Optional,
Awaitable,
)
from sqlalchemy.future import select
from sqlalchemy.orm import sessionmaker
from sqlmodel import Field, SQLModel, col
from sqlalchemy.sql.expression import func
from sqlalchemy import and_, delete, update
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from gsuid_core.data_store import get_res_path
T_BaseModel = TypeVar('T_BaseModel', bound='BaseModel')
T_BaseIDModel = TypeVar('T_BaseIDModel', bound='BaseIDModel')
T_User = TypeVar('T_User', bound='User')
P = ParamSpec("P")
R = TypeVar("R")
db_url = str(get_res_path().parent / 'GsData.db')
url = f'sqlite+aiosqlite:///{db_url}'
engine = create_async_engine(url, pool_recycle=1500)
async_maker = sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)
def with_session(
func: Callable[Concatenate[Any, AsyncSession, P], Awaitable[R]]
) -> Callable[Concatenate[Any, P], Awaitable[R]]:
@wraps(func)
async def wrapper(self, *args: P.args, **kwargs: P.kwargs):
async with async_maker() as session:
return await func(self, session, *args, **kwargs)
return wrapper
class BaseIDModel(SQLModel):
id: Optional[int] = Field(default=None, primary_key=True, title='序号')
@classmethod
def get_gameid_name(cls, game_name: Optional[str] = None):
if game_name:
return f'{game_name}_uid'
else:
return 'uid'
@classmethod
@with_session
async def full_insert_data(
cls, session: AsyncSession, model: Type["BaseIDModel"], **data
) -> int:
session.add(model(**data))
await session.commit()
return 0
@classmethod
@with_session
async def base_select_data(
cls, session: AsyncSession, model: Type[T_BaseIDModel], **data
) -> Optional[T_BaseIDModel]:
conditions = []
for key, value in data.items():
conditions.append(getattr(model, key) == value)
where_clause = and_(*conditions)
sql = select(model).where(where_clause)
result = await session.execute(sql)
data = result.scalars().all()
return data[0] if data else None
@classmethod
async def data_exist(cls, model: Type[T_BaseIDModel], **data) -> bool:
return bool(await cls.base_select_data(model, **data))
class BaseBotIDModel(BaseIDModel):
bot_id: str = Field(title='平台')
@classmethod
@with_session
async def update_data_by_uid(
cls,
session: AsyncSession,
uid: str,
bot_id: str,
game_name: Optional[str] = None,
**data,
) -> int:
sql = update(cls).where(
getattr(cls, cls.get_gameid_name(game_name)) == uid,
cls.bot_id == bot_id,
)
if data is not None:
query = sql.values(**data)
query.execution_options(synchronize_session='fetch')
await session.execute(query)
return 0
return -1
class BaseModel(BaseBotIDModel):
user_id: str = Field(title='账号')
################################
# 基本的增删改查 #
################################
@classmethod
@with_session
async def select_data(
cls: Type[T_BaseModel],
session: AsyncSession,
user_id: str,
bot_id: Optional[str] = None,
) -> Optional[T_BaseModel]:
if bot_id is None:
sql = select(cls).where(cls.user_id == user_id)
else:
sql = select(cls).where(
cls.user_id == user_id, cls.bot_id == bot_id
)
result = await session.execute(sql)
data = result.scalars().all()
return data[0] if data else None
@classmethod
@with_session
async def insert_data(
cls, session: AsyncSession, user_id: str, bot_id: str, **data
) -> int:
session.add(cls(user_id=user_id, bot_id=bot_id, **data))
await session.commit()
return 0
@classmethod
@with_session
async def delete_data(
cls, session: AsyncSession, user_id: str, bot_id: str, **data
) -> int:
await session.delete(cls(user_id=user_id, bot_id=bot_id, **data))
await session.commit()
return 0
@classmethod
@with_session
async def update_data(
cls, session: AsyncSession, user_id: str, bot_id: str, **data
) -> int:
sql = update(cls).where(cls.user_id == user_id, cls.bot_id == bot_id)
if data is not None:
query = sql.values(**data)
query.execution_options(synchronize_session='fetch')
await session.execute(query)
await session.commit()
return 0
return -1
class Bind(BaseModel):
group_id: Optional[str] = Field(title='群号')
################################
# 额外的扩展方法 #
################################
@classmethod
async def get_uid_list_by_game(
cls,
user_id: str,
bot_id: str,
game_name: Optional[str] = None,
) -> Optional[List[str]]:
result = await cls.select_data(user_id, bot_id)
if result is None:
return None
uid = getattr(result, cls.get_gameid_name(game_name))
if uid is None:
return None
else:
uid_list = uid.split('_')
if uid_list:
return uid_list
else:
return None
@classmethod
async def get_uid_by_game(
cls,
user_id: str,
bot_id: str,
game_name: Optional[str] = None,
) -> Optional[str]:
result = await cls.get_uid_list_by_game(user_id, bot_id, game_name)
if result is None or not result:
return None
return result[0]
@classmethod
async def bind_exists(
cls,
user_id: str,
bot_id: str,
) -> bool:
'''
查询当前user_id是否已有绑定数据
'''
return bool(await cls.select_data(user_id, bot_id))
@classmethod
async def insert_uid(
cls,
user_id: str,
bot_id: str,
uid: str,
group_id: Optional[str] = None,
lenth_limit: Optional[int] = None,
is_digit: Optional[bool] = True,
game_name: Optional[str] = None,
) -> int:
'''
为数据库增加绑定UID
如果有传`lenth_limit`, 当uid位数不等于的时候, 返回`-1`
如果该UID已绑定, 则返回`-2`
`is_digit`默认为`True`, 进行合法性校验, 如果不是全数字, 返回`-3`
成功绑定, 则返回`0`
'''
result = await cls.get_uid_list_by_game(user_id, bot_id, game_name)
if lenth_limit:
if len(uid) != lenth_limit:
return -1
if is_digit is not None:
if not uid.isdigit():
return -3
if result is None and not await cls.bind_exists(user_id, bot_id):
return await cls.insert_data(
user_id,
bot_id,
**{cls.get_gameid_name(game_name): uid, 'group_id': group_id},
)
elif result is None:
new_uid = uid
elif uid in result:
return -2
else:
result.append(uid)
new_uid = '_'.join(result)
await cls.update_data(
user_id,
bot_id,
**{cls.get_gameid_name(game_name): new_uid},
)
return 0
@classmethod
async def delete_uid(
cls,
user_id: str,
bot_id: str,
uid: str,
game_name: Optional[str] = None,
):
result = await cls.get_uid_list_by_game(user_id, bot_id, game_name)
if result is None:
return -1
if uid not in result:
return -1
result.remove(uid)
new_uid = '_'.join(result)
await cls.update_data(
user_id,
bot_id,
**{cls.get_gameid_name(game_name): new_uid},
)
return 0
@classmethod
@with_session
async def get_all_uid_list_by_game(
cls,
session: AsyncSession,
bot_id: str,
game_name: Optional[str] = None,
) -> List[str]:
sql = select(cls).where(cls.bot_id == bot_id)
result = await session.execute(sql)
data: List["Bind"] = result.scalars().all()
uid_list: List[str] = []
for item in data:
uid = getattr(item, cls.get_gameid_name(game_name))
if uid is not None and uid:
game_uid_list: List[str] = uid.split("_")
uid_list.extend(game_uid_list)
return uid_list
@classmethod
async def switch_uid_by_game(
cls,
user_id: str,
bot_id: str,
uid: Optional[str] = None,
game_name: Optional[str] = None,
) -> int:
'''
切换用户UID, 成功返回0
可传确定的UID
如果不传UID,则自动切换序列下一个UID
如果不存在绑定记录,则返回-1
如果传了UID但是不存在绑定列表,则返回-2
如果绑定UID列表不足2个,返回-3
'''
uid_list = await cls.get_uid_list_by_game(user_id, bot_id, game_name)
if not uid_list:
return -1
elif len(uid_list) <= 1:
return -3
elif uid is None:
uid = uid_list[1]
old_uid = uid_list[0]
uid_list.remove(uid)
uid_list.remove(old_uid)
uid_list.insert(0, uid)
uid_list.append(old_uid)
elif uid not in uid_list:
return -2
else:
uid_list.remove(uid)
uid_list.insert(0, uid)
await cls.update_data(
user_id,
bot_id,
**{cls.get_gameid_name(game_name): uid_list},
)
return 0
@classmethod
async def get_bind_group_list(cls, user_id: str, bot_id: str) -> List[str]:
data: Optional["Bind"] = await cls.select_data(user_id, bot_id)
return data.group_id.split("_") if data and data.group_id else []
@classmethod
async def get_bind_group(cls, user_id: str, bot_id: str) -> Optional[str]:
data = await cls.get_bind_group_list(user_id, bot_id)
return data[0] if data else None
@classmethod
@with_session
async def get_group_all_uid(cls, session: AsyncSession, group_id: str):
result = await session.scalars(
select(cls).where(col(cls.group_id).contains(group_id))
)
data = result.all()
return data[0] if data else None
class User(BaseModel):
cookie: str = Field(default=None, title='Cookie')
stoken: Optional[str] = Field(default=None, title='Stoken')
status: Optional[str] = Field(default=None, title='状态')
push_switch: str = Field(default='off', title='全局推送开关')
sign_switch: str = Field(default='off', title='自动签到')
@classmethod
@with_session
async def select_data_by_uid(
cls: Type[T_User],
session: AsyncSession,
uid: str,
game_name: Optional[str] = None,
) -> Optional[T_User]:
result = await session.execute(
select(cls).where(
getattr(cls, cls.get_gameid_name(game_name)) == uid,
)
)
data = result.scalars().all()
return data[0] if data else None
@classmethod
@with_session
async def get_user_all_data_by_user_id(
cls: Type[T_User], session: AsyncSession, user_id: str
) -> Optional[List[T_User]]:
result = await session.execute(
select(cls).where(cls.user_id == user_id)
)
data = result.scalars().all()
return data if data else None
@classmethod
async def get_user_attr(
cls,
user_id: str,
bot_id: str,
attr: str,
) -> Optional[Any]:
result = await cls.select_data(user_id, bot_id)
return getattr(result, attr) if result else None
@classmethod
async def get_user_attr_by_uid(
cls,
uid: str,
attr: str,
game_name: Optional[str] = None,
) -> Optional[Any]:
result = await cls.select_data_by_uid(uid, game_name)
return getattr(result, attr) if result else None
@classmethod
async def get_user_attr_by_user_id(
cls,
user_id: str,
attr: str,
) -> Optional[Any]:
result = await cls.select_data(user_id)
return getattr(result, attr) if result else None
@classmethod
@with_session
async def mark_invalid(cls, session: AsyncSession, cookie: str, mark: str):
sql = update(cls).where(cls.cookie == cookie).values(status=mark)
await session.execute(sql)
await session.commit()
return True
@classmethod
async def get_user_cookie_by_uid(
cls, uid: str, game_name: Optional[str] = None
) -> Optional[str]:
return await cls.get_user_attr_by_uid(uid, 'cookie', game_name)
@classmethod
async def get_user_cookie_by_user_id(
cls, user_id: str, bot_id: str
) -> Optional[str]:
return await cls.get_user_attr(user_id, bot_id, 'cookie')
@classmethod
async def get_user_stoken_by_uid(
cls, uid: str, game_name: Optional[str] = None
) -> Optional[str]:
return await cls.get_user_attr_by_uid(uid, 'stoken', game_name)
@classmethod
async def get_user_stoken_by_user_id(
cls, user_id: str, bot_id: str
) -> Optional[str]:
return await cls.get_user_attr(user_id, bot_id, 'stoken')
@classmethod
async def cookie_validate(
cls, uid: str, game_name: Optional[str] = None
) -> bool:
data = await cls.get_user_attr_by_uid(uid, 'status', game_name)
if not data:
return True
else:
return False
@classmethod
@with_session
async def get_switch_open_list(
cls: Type[T_User], session: AsyncSession, switch_name: str
) -> List[T_User]:
_switch = getattr(cls, switch_name, cls.push_switch)
sql = select(cls).filter(_switch != 'off')
data = await session.execute(sql)
data_list: List[T_User] = data.scalars().all()
return [user for user in data_list]
@classmethod
@with_session
async def get_all_user(
cls: Type[T_User], session: AsyncSession
) -> List[T_User]:
sql = select(cls).where(cls.cookie is not None, cls.cookie != '')
result = await session.execute(sql)
data: List[T_User] = result.scalars().all()
return data
@classmethod
async def get_all_cookie(cls) -> List[str]:
data = await cls.get_all_user()
return [_u.cookie for _u in data if _u.cookie]
@classmethod
async def get_all_stoken(cls) -> List[str]:
data = await cls.get_all_user()
return [_u.stoken for _u in data if _u.stoken]
@classmethod
async def get_all_error_cookie(cls) -> List[str]:
data = await cls.get_all_user()
return [_u.cookie for _u in data if _u.cookie and _u.status]
@classmethod
async def get_all_push_user_list(cls: Type[T_User]) -> List[T_User]:
data = await cls.get_all_user()
return [user for user in data if user.push_switch != 'off']
@classmethod
async def user_exists(
cls, uid: str, game_name: Optional[str] = None
) -> bool:
data = await cls.select_data_by_uid(uid, game_name)
return True if data else False
@classmethod
@with_session
async def get_random_cookie(
cls: Type[T_User],
session: AsyncSession,
uid: str,
cache_model: Optional[Type["Cache"]] = None,
condition: Optional[Dict[str, str]] = None,
game_name: Optional[str] = None,
) -> Optional[str]:
# 有绑定自己CK 并且该CK有效的前提下优先使用自己CK
if await cls.user_exists(uid, game_name) and await cls.cookie_validate(
uid, game_name
):
return await cls.get_user_cookie_by_uid(uid, game_name)
# 自动刷新缓存
# await self.delete_error_cache()
# 获得缓存库Ck
if cache_model is not None:
cache_data = await cache_model.select_cache_cookie(uid, game_name)
if cache_data is not None:
return cache_data
# 随机取CK
if condition:
for i in condition:
sql = (
select(cls)
.where(getattr(cls, i) == condition[i])
.order_by(func.random())
)
data = await session.execute(sql)
user_list: List[T_User] = data.scalars().all()
break
else:
user_list = await cls.get_all_user()
else:
user_list = await cls.get_all_user()
for user in user_list:
if not user.status and user.cookie:
if cache_model:
# 进入缓存
await cache_model.insert_cache_data(
user.cookie,
**{cls.get_gameid_name(game_name): uid},
)
return user.cookie
continue
else:
return None
@classmethod
@with_session
async def delete_user_data_by_uid(
cls, session: AsyncSession, uid: str, game_name: Optional[str] = None
):
if await cls.user_exists(uid, game_name):
sql = delete(cls).where(
getattr(cls, cls.get_gameid_name(game_name)) == uid
)
await session.execute(sql)
await session.commit()
return True
return False
class Cache(BaseIDModel):
cookie: str = Field(default=None, title='Cookie')
@classmethod
@with_session
async def select_cache_cookie(
cls, session: AsyncSession, uid: str, game_name: Optional[str]
) -> Optional[str]:
sql = select(cls).where(
getattr(cls, cls.get_gameid_name(game_name)) == uid
)
result = await session.execute(sql)
data: List["Cache"] = result.scalars().all()
return data[0].cookie if len(data) >= 1 else None
@classmethod
@with_session
async def delete_error_cache(
cls, session: AsyncSession, user: Type["User"]
) -> bool:
data = await user.get_all_error_cookie()
for cookie in data:
sql = delete(cls).where(cls.cookie == cookie)
await session.execute(sql)
return True
@classmethod
@with_session
async def delete_all_cache(
cls, session: AsyncSession, user: Type["User"]
) -> bool:
sql = update(user).where(user.status == 'limit30').values(status=None)
empty_sql = delete(cls)
await session.execute(sql)
await session.execute(empty_sql)
await session.commit()
return True
@classmethod
@with_session
async def refresh_cache(
cls, session: AsyncSession, uid: str, game_name: Optional[str] = None
) -> bool:
await session.execute(
delete(cls).where(
getattr(cls, cls.get_gameid_name(game_name)) == uid
)
)
return True
@classmethod
@with_session
async def insert_cache_data(
cls, session: AsyncSession, cookie: str, **data
) -> bool:
new_data = cls(cookie=cookie, **data)
session.add(new_data)
await session.commit()
return True
class Push(BaseBotIDModel):
pass

View File

@ -2,27 +2,18 @@ import re
import asyncio
from typing import Dict, List, Literal, Optional
from sqlmodel import SQLModel
from sqlalchemy.sql import text
from sqlmodel import SQLModel, col
from sqlalchemy.future import select
from sqlalchemy import delete, update
from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql.expression import func
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from .utils import SERVER, SR_SERVER
from .base_models import engine, async_maker
from .models import GsBind, GsPush, GsUser, GsCache
class SQLA:
def __init__(self, url: str, bot_id: str, is_sr: bool = False):
def __init__(self, bot_id: str, is_sr: bool = False):
self.bot_id = bot_id
self.is_sr = is_sr
self.url = f'sqlite+aiosqlite:///{url}'
self.engine = create_async_engine(self.url, pool_recycle=1500)
self.async_session = sessionmaker(
self.engine, expire_on_commit=False, class_=AsyncSession
)
def create_all(self):
try:
@ -33,7 +24,7 @@ class SQLA:
loop.close()
async def _create_all(self):
async with self.engine.begin() as conn:
async with engine.begin() as conn:
await conn.run_sync(SQLModel.metadata.create_all)
await self.sr_adapter()
@ -50,7 +41,7 @@ class SQLA:
'ALTER TABLE GsUser ADD COLUMN draw_switch TEXT DEFAULT "off"',
'ALTER TABLE GsCache ADD COLUMN sr_uid TEXT',
]
async with self.async_session() as session:
async with async_maker() as session:
for _t in exec_list:
try:
await session.execute(text(_t))
@ -62,213 +53,119 @@ class SQLA:
# GsBind 部分 #
#####################
async def select_bind_data(self, user_id: str) -> Optional[GsBind]:
async with self.async_session() as session:
async with session.begin():
result = await session.execute(
select(GsBind).where(
GsBind.user_id == user_id, GsBind.bot_id == self.bot_id
)
)
data = result.scalars().all()
return data[0] if data else None
return await GsBind.select_data(user_id, self.bot_id)
async def insert_bind_data(self, user_id: str, **data) -> int:
async with self.async_session() as session:
async with session.begin():
new_uid: str = data['uid'] if 'uid' in data else ''
new_uid = new_uid.strip()
new_sr_uid: str = data['sr_uid'] if 'sr_uid' in data else ''
new_sr_uid = new_sr_uid.strip()
if len(new_uid) != 9 and len(new_sr_uid) != 9:
return -1
elif not new_uid.isdigit() and not new_sr_uid.isdigit():
return -3
if new_uid and await self.bind_exists(user_id):
uid_list = await self.get_bind_uid_list(user_id)
if new_uid not in uid_list:
uid_list.append(new_uid)
else:
return -2
data['uid'] = '_'.join(uid_list)
await self.update_bind_data(user_id, data)
elif new_sr_uid and await self.bind_exists(user_id):
sr_uid_list = await self.get_bind_sruid_list(user_id)
if new_sr_uid not in sr_uid_list:
sr_uid_list.append(new_sr_uid)
else:
return -2
data['sr_uid'] = '_'.join(sr_uid_list)
await self.update_bind_data(user_id, data)
else:
new_data = GsBind(
user_id=user_id, bot_id=self.bot_id, **data
)
session.add(new_data)
await session.commit()
return 0
group_id = data['group_id'] if 'group_id' in data else None
new_uid: str = data['uid'] if 'uid' in data else ''
new_uid = new_uid.strip()
new_sr_uid: str = data['sr_uid'] if 'sr_uid' in data else ''
new_sr_uid = new_sr_uid.strip()
if new_uid:
retcode = await GsBind.insert_uid(
user_id, self.bot_id, new_uid, group_id, 9, True
)
if retcode:
return retcode
if new_sr_uid:
retcode = await GsBind.insert_uid(
user_id,
self.bot_id,
new_sr_uid,
group_id,
9,
True,
'sr',
)
if retcode:
return retcode
return 0
async def delete_bind_data(self, user_id: str, **data) -> int:
async with self.async_session() as session:
async with session.begin():
_uid = data['uid'] if 'uid' in data else ''
_sr_uid = data['sr_uid'] if 'sr_uid' in data else ''
if _uid and await self.bind_exists(user_id):
uid_list = await self.get_bind_uid_list(user_id)
if uid_list and _uid in uid_list:
uid_list.remove(_uid)
else:
return -1
data['uid'] = '_'.join(uid_list)
await self.update_bind_data(user_id, data)
await session.commit()
return 0
elif _sr_uid and await self.bind_exists(user_id):
uid_list = await self.get_bind_sruid_list(user_id)
if uid_list and _sr_uid in uid_list:
uid_list.remove(_sr_uid)
else:
return -1
data['sr_uid'] = '_'.join(uid_list)
await self.update_bind_data(user_id, data)
await session.commit()
return 0
else:
return -1
_uid = data['uid'] if 'uid' in data else ''
_sr_uid = data['sr_uid'] if 'sr_uid' in data else ''
if _uid:
return await GsBind.delete_uid(user_id, self.bot_id, _uid)
elif _sr_uid:
return await GsBind.delete_uid(user_id, self.bot_id, _uid, 'sr')
else:
return -1
async def update_bind_data(self, user_id: str, data: Optional[Dict]):
async with self.async_session() as session:
async with session.begin():
sql = update(GsBind).where(
GsBind.user_id == user_id, GsBind.bot_id == self.bot_id
)
if data is not None:
query = sql.values(**data)
query.execution_options(synchronize_session='fetch')
await session.execute(query)
if data is not None:
await GsBind.update_data(user_id, self.bot_id, **data)
async def bind_exists(self, user_id: str) -> bool:
return bool(await self.select_bind_data(user_id))
return await GsBind.bind_exists(user_id, self.bot_id)
async def get_all_uid_list(self) -> List[str]:
async with self.async_session() as session:
async with session.begin():
sql = select(GsBind).where(GsBind.bot_id == self.bot_id)
result = await session.execute(sql)
data: List[GsBind] = result.scalars().all()
uid_list: List[str] = []
for item in data:
uid_list.extend(item.uid.split("_") if item.uid else [])
return uid_list
return await GsBind.get_all_uid_list_by_game(
self.bot_id, 'sr' if self.is_sr else None
)
async def get_bind_group_list(self, user_id: str) -> List[str]:
data = await self.select_bind_data(user_id)
return data.group_id.split("_") if data and data.group_id else []
return await GsBind.get_bind_group_list(user_id, self.bot_id)
async def get_bind_group(self, user_id: str) -> Optional[str]:
data = await self.get_bind_group_list(user_id)
return data[0] if data else None
return await GsBind.get_bind_group(user_id, self.bot_id)
async def get_group_all_uid(self, group_id: str):
async with self.async_session() as session:
async with session.begin():
result = await session.scalars(
select(GsBind).where(
col(GsBind.group_id).contains(group_id)
)
)
data = result.all()
return data[0] if data else None
return await GsBind.get_group_all_uid(group_id)
async def get_bind_uid_list(self, user_id: str) -> List[str]:
data = await self.select_bind_data(user_id)
return data.uid.split("_") if data and data.uid else []
async def get_bind_uid_list(self, user_id: str) -> Optional[List[str]]:
return await GsBind.get_uid_list_by_game(user_id, self.bot_id)
async def get_bind_uid(self, user_id: str) -> Optional[str]:
data = await self.get_bind_uid_list(user_id)
return data[0] if data else None
return await GsBind.get_uid_by_game(user_id, self.bot_id)
async def get_bind_sruid_list(self, user_id: str) -> List[str]:
data = await self.select_bind_data(user_id)
return data.sr_uid.split("_") if data and data.sr_uid else []
async def get_bind_sruid_list(self, user_id: str) -> Optional[List[str]]:
return await GsBind.get_uid_list_by_game(user_id, self.bot_id, 'sr')
async def get_bind_sruid(self, user_id: str) -> Optional[str]:
data = await self.get_bind_sruid_list(user_id)
return data[0] if data else None
return await GsBind.get_uid_by_game(user_id, self.bot_id)
async def switch_uid(
self, user_id: str, uid: Optional[str] = None
) -> Optional[List]:
uid_list = (
await self.get_bind_sruid_list(user_id)
if self.is_sr
else await self.get_bind_uid_list(user_id)
retcode = await GsBind.switch_uid_by_game(
user_id,
self.bot_id,
uid,
'sr' if self.is_sr else None,
)
id_type = 'sr_uid' if self.is_sr else 'uid'
if uid_list and len(uid_list) >= 1:
if uid and uid not in uid_list:
return None
elif uid:
pass
else:
uid = uid_list[1]
uid_list.remove(uid)
uid_list.insert(0, uid)
await self.update_bind_data(user_id, {id_type: '_'.join(uid_list)})
return uid_list
else:
return None
if retcode == 0:
return await GsBind.get_uid_list_by_game(
user_id,
self.bot_id,
'sr' if self.is_sr else None,
)
#####################
# GsUser、GsCache 部分 #
#####################
async def select_user_data(self, uid: str) -> Optional[GsUser]:
async with self.async_session() as session:
async with session.begin():
sql = (
select(GsUser).where(GsUser.sr_uid == uid)
if self.is_sr
else select(GsUser).where(GsUser.uid == uid)
)
result = await session.execute(sql)
return data[0] if (data := result.scalars().all()) else None
return await GsUser.select_data_by_uid(
uid, 'sr' if self.is_sr else None
)
async def select_user_all_data_by_user_id(
self, user_id: str
) -> Optional[List[GsUser]]:
async with self.async_session() as session:
async with session.begin():
sql = select(GsUser).where(GsUser.user_id == user_id)
result = await session.execute(sql)
data = result.scalars().all()
return data if data else None
return await GsUser.get_user_all_data_by_user_id(user_id)
async def select_user_data_by_user_id(
self, user_id: str
) -> Optional[GsUser]:
data = await self.select_user_all_data_by_user_id(user_id)
return data[0] if data else None
return await GsUser.select_data(user_id)
async def select_cache_cookie(self, uid: str) -> Optional[str]:
async with self.async_session() as session:
async with session.begin():
sql = (
select(GsCache).where(GsCache.sr_uid == uid)
if self.is_sr
else select(GsCache).where(GsCache.uid == uid)
)
result = await session.execute(sql)
data: List[GsCache] = result.scalars().all()
return data[0].cookie if len(data) >= 1 else None
return await GsCache.select_cache_cookie(
uid, 'sr' if self.is_sr else None
)
async def delete_error_cache(self) -> bool:
async with self.async_session() as session:
async with session.begin():
data = await self.get_all_error_cookie()
for cookie in data:
sql = delete(GsCache).where(GsCache.cookie == cookie)
await session.execute(sql)
return True
return await GsCache.delete_error_cache(GsUser)
async def get_user_fp(self, uid: str) -> Optional[str]:
data = await self.select_user_data(uid)
@ -285,14 +182,9 @@ class SQLA:
sr_uid: Optional[str] = None,
mys_id: Optional[str] = None,
) -> bool:
async with self.async_session() as session:
async with session.begin():
new_data = GsCache(
cookie=cookie, uid=uid, sr_uid=sr_uid, mys_id=mys_id
)
session.add(new_data)
await session.commit()
return True
return await GsCache.insert_cache_data(
cookie, uid=uid, sr_uid=sr_uid, mys_id=mys_id
)
async def insert_user_data(
self,
@ -304,122 +196,75 @@ class SQLA:
fp: Optional[str] = None,
device_id: Optional[str] = None,
) -> bool:
async with self.async_session() as session:
async with session.begin():
if uid and await self.user_exists(uid):
sql = (
update(GsUser)
.where(GsUser.uid == uid)
.values(
cookie=cookie,
status=None,
stoken=stoken,
bot_id=self.bot_id,
user_id=user_id,
sr_uid=sr_uid,
fp=fp,
)
)
await session.execute(sql)
elif sr_uid and await self.user_exists(sr_uid):
sql = (
update(GsUser)
.where(GsUser.sr_uid == sr_uid)
.values(
cookie=cookie,
status=None,
stoken=stoken,
bot_id=self.bot_id,
user_id=user_id,
uid=uid,
fp=fp,
)
)
await session.execute(sql)
else:
if cookie is None:
return False
account_id = re.search(r'account_id=(\d*)', cookie)
assert account_id is not None
account_id = str(account_id.group(1))
user_data = GsUser(
uid=uid,
sr_uid=sr_uid,
mys_id=account_id,
cookie=cookie,
stoken=stoken if stoken else None,
user_id=user_id,
bot_id=self.bot_id,
sign_switch='off',
push_switch='off',
bbs_switch='off',
draw_switch='off',
region=SERVER.get(uid[0], 'cn_gf01') if uid else None,
sr_region=SR_SERVER.get(sr_uid[0], None)
if sr_uid
else None,
fp=fp,
device_id=device_id,
sr_push_switch='off',
sr_sign_switch='off',
)
session.add(user_data)
await session.commit()
return True
async def update_user_data(self, uid: str, data: Optional[Dict]):
async with self.async_session() as session:
async with session.begin():
sql = (
update(GsUser).where(GsUser.sr_uid == uid)
if self.is_sr
else update(GsUser).where(GsUser.uid == uid)
)
if data is not None:
query = sql.values(**data)
query.execution_options(synchronize_session='fetch')
await session.execute(query)
await session.commit()
async def delete_user_data(self, uid: str):
async with self.async_session() as session:
async with session.begin():
if await self.user_exists(uid):
sql = (
delete(GsUser).where(GsUser.sr_uid == uid)
if self.is_sr
else delete(GsUser).where(GsUser.uid == uid)
)
await session.execute(sql)
await session.commit()
return True
if uid and await GsUser.user_exists(uid):
retcode = await GsUser.update_data_by_uid(
uid,
self.bot_id,
cookie=cookie,
status=None,
stoken=stoken,
sr_uid=sr_uid,
fp=fp,
)
elif sr_uid and await self.user_exists(sr_uid):
retcode = await GsUser.update_data_by_uid(
sr_uid,
self.bot_id,
'sr',
cookie=cookie,
status=None,
stoken=stoken,
sr_uid=sr_uid,
fp=fp,
)
else:
if cookie is None:
return False
account_id = re.search(r'account_id=(\d*)', cookie)
assert account_id is not None
account_id = str(account_id.group(1))
retcode = await GsUser.insert_data(
user_id=user_id,
bot_id=self.bot_id,
uid=uid,
sr_uid=sr_uid,
mys_id=account_id,
cookie=cookie,
stoken=stoken if stoken else None,
sign_switch='off',
push_switch='off',
bbs_switch='off',
draw_switch='off',
region=SERVER.get(uid[0], 'cn_gf01') if uid else None,
sr_region=SR_SERVER.get(sr_uid[0], None) if sr_uid else None,
fp=fp,
device_id=device_id,
sr_push_switch='off',
sr_sign_switch='off',
)
if retcode == 0:
return True
else:
return False
async def update_user_data(self, uid: str, data: Dict = {}):
return await GsUser.update_data_by_uid(
uid, self.bot_id, 'sr' if self.is_sr else None, **data
)
async def delete_user_data(self, uid: str):
if await GsUser.user_exists(uid):
return await GsUser.delete_user_data_by_uid(
uid, 'sr' if self.is_sr else None
)
async def delete_cache(self):
async with self.async_session() as session:
async with session.begin():
sql = (
update(GsUser)
.where(GsUser.status == 'limit30')
.values(status=None)
)
empty_sql = delete(GsCache)
await session.execute(sql)
await session.execute(empty_sql)
await session.commit()
return await GsCache.delete_all_cache(GsUser)
async def mark_invalid(self, cookie: str, mark: str):
async with self.async_session() as session:
async with session.begin():
sql = (
update(GsUser)
.where(GsUser.cookie == cookie)
.values(status=mark)
)
await session.execute(sql)
await session.commit()
await GsUser.mark_invalid(cookie, mark)
async def user_exists(self, uid: str) -> bool:
data = await self.select_user_data(uid)
@ -428,212 +273,108 @@ class SQLA:
async def update_user_stoken(
self, uid: str, stoken: Optional[str]
) -> bool:
async with self.async_session() as session:
async with session.begin():
if await self.user_exists(uid):
sql = (
(
update(GsUser)
.where(GsUser.sr_uid == uid)
.values(stoken=stoken)
)
if self.is_sr
else (
update(GsUser)
.where(GsUser.uid == uid)
.values(stoken=stoken)
)
)
await session.execute(sql)
await session.commit()
return True
return False
retcode = -1
if await GsUser.user_exists(uid):
retcode = await GsUser.update_data_by_uid(
uid, self.bot_id, 'sr' if self.is_sr else None, stoken=stoken
)
return bool(retcode)
async def update_user_cookie(
self, uid: str, cookie: Optional[str]
) -> bool:
async with self.async_session() as session:
async with session.begin():
if await self.user_exists(uid):
sql = (
(
update(GsUser)
.where(GsUser.sr_uid == uid)
.values(cookie=cookie)
)
if self.is_sr
else (
update(GsUser)
.where(GsUser.uid == uid)
.values(cookie=cookie)
)
)
await session.execute(sql)
await session.commit()
return True
return False
retcode = -1
if await GsUser.user_exists(uid):
retcode = await GsUser.update_data_by_uid(
uid, self.bot_id, 'sr' if self.is_sr else None, cookie=cookie
)
return bool(retcode)
async def update_switch_status(self, uid: str, data: Dict) -> bool:
async with self.async_session() as session:
async with session.begin():
if await self.user_exists(uid):
sql = (
(
update(GsUser)
.where(GsUser.sr_uid == uid)
.values(**data)
)
if self.is_sr
else (
update(GsUser)
.where(GsUser.uid == uid)
.values(**data)
)
)
await session.execute(sql)
await session.commit()
return True
return False
retcode = -1
if await GsUser.user_exists(uid):
retcode = await GsUser.update_data_by_uid(
uid, self.bot_id, 'sr' if self.is_sr else None, **data
)
return bool(retcode)
async def update_error_status(self, cookie: str, err: str) -> bool:
async with self.async_session() as session:
async with session.begin():
sql = (
update(GsUser)
.where(GsUser.cookie == cookie)
.values(status=err)
)
await session.execute(sql)
await session.commit()
return True
return await GsUser.mark_invalid(cookie, err)
async def get_user_cookie(self, uid: str) -> Optional[str]:
data = await self.select_user_data(uid)
return data.cookie if data else None
return await GsUser.get_user_cookie_by_uid(
uid, 'sr' if self.is_sr else None
)
async def get_user_cookie_by_user_id(self, user_id: str) -> Optional[str]:
data = await self.select_user_data_by_user_id(user_id)
return data.cookie if data else None
return await GsUser.get_user_cookie_by_user_id(user_id, self.bot_id)
async def cookie_validate(self, uid: str) -> bool:
data = await self.select_user_data(uid)
return True if data and data.status is None else False
return await GsUser.cookie_validate(uid, 'sr' if self.is_sr else None)
async def get_user_stoken_by_user_id(self, user_id: str) -> Optional[str]:
data = await self.select_user_data_by_user_id(user_id)
return data.stoken if data and data.stoken else None
return await GsUser.get_user_stoken_by_user_id(user_id, self.bot_id)
async def get_user_stoken(self, uid: str) -> Optional[str]:
data = await self.select_user_data(uid)
return data.stoken if data and data.stoken else None
return await GsUser.get_user_stoken_by_uid(
uid, 'sr' if self.is_sr else None
)
async def get_all_user(self) -> List[GsUser]:
async with self.async_session() as session:
async with session.begin():
sql = select(GsUser).where(
GsUser.cookie is not None, GsUser.cookie != ''
)
result = await session.execute(sql)
data: List[GsUser] = result.scalars().all()
return data
return await GsUser.get_all_user()
async def get_all_cookie(self) -> List[str]:
data = await self.get_all_user()
return [_u.cookie for _u in data if _u.cookie]
return await GsUser.get_all_cookie()
async def get_all_stoken(self) -> List[str]:
data = await self.get_all_user()
return [_u.stoken for _u in data if _u.stoken]
return await GsUser.get_all_stoken()
async def get_all_error_cookie(self) -> List[str]:
data = await self.get_all_user()
return [_u.cookie for _u in data if _u.cookie and _u.status]
return await GsUser.get_all_error_cookie()
async def get_all_push_user_list(self) -> List[GsUser]:
data = await self.get_all_user()
return [user for user in data if user.push_switch != 'off']
return await GsUser.get_all_push_user_list()
async def get_random_cookie(self, uid: str) -> Optional[str]:
async with self.async_session() as session:
async with session.begin():
# 有绑定自己CK 并且该CK有效的前提下优先使用自己CK
if await self.user_exists(uid) and await self.cookie_validate(
uid
):
return await self.get_user_cookie(uid)
# 自动刷新缓存
await self.delete_error_cache()
# 获得缓存库Ck
cache_data = await self.select_cache_cookie(uid)
if cache_data is not None:
return cache_data
# 随机取CK
server = SERVER.get(uid[0], 'cn_gf01')
sql = (
select(GsUser)
.where(GsUser.region == server)
.order_by(func.random())
)
data = await session.execute(sql)
user_list: List[GsUser] = data.scalars().all()
for user in user_list:
if not user.status and user.cookie:
# 进入缓存
if self.is_sr:
await self.insert_cache_data(
user.cookie, sr_uid=uid
)
else:
await self.insert_cache_data(user.cookie, uid)
return user.cookie
continue
else:
return None
server = SERVER.get(uid[0], 'cn_gf01')
return await GsUser.get_random_cookie(
uid, GsCache, {'region': server}, 'sr' if self.is_sr else None
)
async def get_switch_status_list(
self, switch: Literal['push', 'sign', 'bbs', 'sr_push', 'sr_sign']
) -> List[GsUser]:
async with self.async_session() as session:
async with session.begin():
_switch = getattr(GsUser, switch, GsUser.push_switch)
sql = select(GsUser).filter(_switch != 'off')
data = await session.execute(sql)
data_list: List[GsUser] = data.scalars().all()
return [user for user in data_list]
return await GsUser.get_switch_open_list(switch)
#####################
# GsPush 部分 #
#####################
async def insert_push_data(self, uid: str):
async with self.async_session() as session:
async with session.begin():
push_data = GsPush(
bot_id=self.bot_id,
uid=uid,
coin_push='off',
coin_value=2100,
coin_is_push='off',
resin_push='on',
resin_value=140,
resin_is_push='off',
go_push='off',
go_value=120,
go_is_push='off',
transform_push='off',
transform_value=140,
transform_is_push='off',
)
session.add(push_data)
await session.commit()
await GsPush.full_insert_data(
GsPush,
bot_id=self.bot_id,
uid=uid,
coin_push='off',
coin_value=2100,
coin_is_push='off',
resin_push='on',
resin_value=140,
resin_is_push='off',
go_push='off',
go_value=120,
go_is_push='off',
transform_push='off',
transform_value=140,
transform_is_push='off',
)
async def update_push_data(self, uid: str, data: dict) -> bool:
async with self.async_session() as session:
async with session.begin():
await self.push_exists(uid)
sql = update(GsPush).where(GsPush.uid == uid).values(**data)
await session.execute(sql)
await session.commit()
return True
retcode = -1
if await GsPush.data_exist(GsPush, uid=uid):
retcode = await GsPush.update_data_by_uid(
uid, self.bot_id, 'sr' if self.is_sr else None, **data
)
return not bool(retcode)
async def change_push_status(
self,
@ -644,54 +385,22 @@ class SQLA:
await self.update_push_data(uid, {f'{mode}_is_push': status})
async def select_push_data(self, uid: str) -> Optional[GsPush]:
async with self.async_session() as session:
async with session.begin():
await self.push_exists(uid)
sql = select(GsPush).where(GsPush.uid == uid)
result = await session.execute(sql)
data = result.scalars().all()
return data[0] if len(data) >= 1 else None
return await GsPush.base_select_data(GsPush, uid=uid)
async def push_exists(self, uid: str) -> bool:
async with self.async_session() as session:
async with session.begin():
sql = select(GsPush).where(GsPush.uid == uid)
result = await session.execute(sql)
data = result.scalars().all()
if not data:
await self.insert_push_data(uid)
return True
return await GsPush.data_exist(GsPush, uid=uid)
#####################
# 杂项部分 #
#####################
async def refresh_cache(self, uid: str):
async with self.async_session() as session:
async with session.begin():
sql = (
delete(GsCache).where(GsCache.sr_uid == uid)
if self.is_sr
else delete(GsCache).where(GsCache.uid == uid)
)
await session.execute(sql)
return True
await GsCache.refresh_cache(uid, 'sr' if self.is_sr else None)
async def close(self):
async with self.async_session() as session:
async with async_maker() as session:
async with session.begin():
await session.close()
async def insert_new_bind(self, **kwargs):
async with self.async_session() as session:
async with session.begin():
new_data = GsBind(**kwargs)
session.add(new_data)
await session.commit()
async def insert_new_user(self, **kwargs):
async with self.async_session() as session:
async with session.begin():
new_data = GsUser(**kwargs)
session.add(new_data)
await session.commit()
await GsBind.full_insert_data(GsBind, **kwargs)

View File

@ -1,54 +1,43 @@
from typing import Optional
from sqlmodel import Field, SQLModel
from sqlmodel import Field
from .base_models import Bind, Push, User, Cache
class GsBind(SQLModel, table=True):
__table_args__ = {'keep_existing': True}
id: Optional[int] = Field(default=None, primary_key=True, title='序号')
bot_id: str = Field(title='平台')
user_id: str = Field(title='账号')
group_id: Optional[str] = Field(title='群号')
class GsBind(Bind, table=True):
__table_args__ = {'extend_existing': True}
uid: Optional[str] = Field(default=None, title='原神UID')
sr_uid: Optional[str] = Field(default=None, title='星铁UID')
mys_id: Optional[str] = Field(default=None, title='米游社通行证')
class GsUser(SQLModel, table=True):
__table_args__ = {'keep_existing': True}
id: Optional[int] = Field(default=None, primary_key=True, title='序号')
bot_id: str = Field(title='平台')
class GsUser(User, table=True):
__table_args__ = {'extend_existing': True}
uid: Optional[str] = Field(default=None, title='原神UID')
sr_uid: Optional[str] = Field(default=None, title='星铁UID')
mys_id: Optional[str] = Field(default=None, title='米游社通行证')
region: Optional[str] = Field(default=None, title='原神地区')
sr_region: Optional[str] = Field(default=None, title='星铁地区')
cookie: Optional[str] = Field(default=None, title='Cookie')
stoken: Optional[str] = Field(default=None, title='Stoken')
user_id: str = Field(title='账号')
push_switch: str = Field(default='off', title='全局推送开关')
sign_switch: str = Field(default='off', title='自动签到')
bbs_switch: str = Field(default='off', title='自动米游币')
draw_switch: str = Field(default='off', title='自动留影叙佳期')
sr_push_switch: str = Field(default='off', title='星铁全局推送开关')
sr_sign_switch: str = Field(default='off', title='星铁自动签到')
status: Optional[str] = Field(default=None, title='状态')
fp: Optional[str] = Field(default=None, title='Fingerprint')
device_id: Optional[str] = Field(default=None, title='设备ID')
class GsCache(SQLModel, table=True):
__table_args__ = {'keep_existing': True}
id: Optional[int] = Field(default=None, primary_key=True, title='序号')
class GsCache(Cache, table=True):
__table_args__ = {'extend_existing': True}
cookie: str = Field(default=None, title='Cookie')
uid: Optional[str] = Field(default=None, title='原神UID')
sr_uid: Optional[str] = Field(default=None, title='星铁UID')
mys_id: Optional[str] = Field(default=None, title='米游社通行证')
class GsPush(SQLModel, table=True):
__table_args__ = {'keep_existing': True}
id: Optional[int] = Field(default=None, primary_key=True, title='序号')
class GsPush(Push, table=True):
__table_args__ = {'extend_existing': True}
bot_id: str = Field(title='平台')
uid: str = Field(default=None, title='原神UID')
coin_push: Optional[str] = Field(title='洞天宝钱推送', default='off')

View File

@ -0,0 +1,9 @@
from typing import Any, Dict
from gsuid_core.bot import Bot
async def send_diff_msg(bot: Bot, code: Any, data: Dict):
for retcode in data:
if code == retcode:
return await bot.send(data[retcode])

View File

@ -37,8 +37,8 @@ from fastapi_amis_admin.amis.components import (
)
from gsuid_core.logger import logger
from gsuid_core.utils.database.api import db_url
from gsuid_core.webconsole.models import WebUser
from gsuid_core.utils.database.base_models import db_url
from gsuid_core.utils.cookie_manager.add_ck import _deal_ck
from gsuid_core.webconsole.html import gsuid_webconsole_help
from gsuid_core.webconsole.create_sv_panel import get_sv_page