💡 完善部分方法文档, 提供gscore.mustache

This commit is contained in:
KimigaiiWuyi 2023-10-30 03:19:03 +08:00
parent 6232267fa7
commit c541543035
4 changed files with 626 additions and 17 deletions

45
gscore.mustache Normal file
View File

@ -0,0 +1,45 @@
{{! Google Docstring Template }}
📝简单介绍:
{{summaryPlaceholder}}
{{extendedSummaryPlaceholder}}
{{#parametersExist}}
🌱参数:
{{#args}}
🔹{{var}} (`{{typePlaceholder}}`):
{{descriptionPlaceholder}}
{{/args}}
{{#kwargs}}
🔹{{var}} (`{{typePlaceholder}}`, 默认是 `{{&default}}`):
{{descriptionPlaceholder}}
{{/kwargs}}
{{/parametersExist}}
🚀使用范例:
`{{descriptionPlaceholder}}`
{{#exceptionsExist}}
💩异常:
{{#exceptions}}
❗`{{type}}`: {{descriptionPlaceholder}}
{{/exceptions}}
{{/exceptionsExist}}
{{#returnsExist}}
✅返回值:
{{#returns}}
🔸`{{typePlaceholder}}`: {{descriptionPlaceholder}}
{{/returns}}
{{/returnsExist}}
{{#yieldsExist}}
Yields:
{{#yields}}
{{typePlaceholder}}: {{descriptionPlaceholder}}
{{/yields}}
{{/yieldsExist}}

View File

@ -25,7 +25,11 @@ class _MysApi(MysApi):
async def get_user_fp(self, uid: str) -> Optional[str]: async def get_user_fp(self, uid: str) -> Optional[str]:
data = await GsUser.get_user_attr_by_uid(uid, 'fp') data = await GsUser.get_user_attr_by_uid(uid, 'fp')
if data is None: if data is None:
data = await self.generate_fp_by_uid(uid) seed_id, seed_time = self.get_seed()
model_name = self.generate_model_name()
data = await self.generate_fp_by_uid(
uid, seed_id, seed_time, model_name
)
await GsUser.update_data_by_uid_without_bot_id(uid, fp=data) await GsUser.update_data_by_uid_without_bot_id(uid, fp=data)
return data return data

View File

@ -48,7 +48,24 @@ class BaseIDModel(SQLModel):
id: Optional[int] = Field(default=None, primary_key=True, title='序号') id: Optional[int] = Field(default=None, primary_key=True, title='序号')
@classmethod @classmethod
def get_gameid_name(cls, game_name: Optional[str] = None): def get_gameid_name(cls, game_name: Optional[str] = None) -> str:
'''📝简单介绍:
快速获取uid的列名
🌱参数:
🔹game_name (`Optional[str]`, 默认是 `None`):
假设传入`None`会返回`uid`而传入`sr`会返回`sr_uid`
🚀使用范例:
`await GsUser.get_gameid_name('sr')`
返回值:
🔸`str`: 游戏uid对应列名默认为`uid`
'''
if game_name: if game_name:
return f'{game_name}_uid' return f'{game_name}_uid'
else: else:
@ -57,6 +74,23 @@ class BaseIDModel(SQLModel):
@classmethod @classmethod
@with_session @with_session
async def full_insert_data(cls, session: AsyncSession, **data) -> int: async def full_insert_data(cls, session: AsyncSession, **data) -> int:
'''📝简单介绍:
数据库基类基础插入数据方法
🌱参数:
🔹`**data`
插入的数据, 入参列名等于数据即可
🚀使用范例:
`await GsUser.full_insert_data(uid='123',cookie='233', ...)`
返回值:
🔸`int`: 恒为0
'''
session.add(cls(**data)) session.add(cls(**data))
await session.commit() await session.commit()
return 0 return 0
@ -66,6 +100,23 @@ class BaseIDModel(SQLModel):
async def base_select_data( async def base_select_data(
cls: Type[T_BaseIDModel], session: AsyncSession, **data cls: Type[T_BaseIDModel], session: AsyncSession, **data
) -> Optional[T_BaseIDModel]: ) -> Optional[T_BaseIDModel]:
'''📝简单介绍:
数据库基类基础选择数据方法
🌱参数:
🔹`**data`
插入的数据, 入参列名等于数据即可
🚀使用范例:
`await GsUser.base_select_data(uid='100740568')`
返回值:
🔸`Optional[T_BaseIDModel]`: 选中符合条件的第一个数据或者为`None`
'''
stmt = select(cls) stmt = select(cls)
for k, v in data.items(): for k, v in data.items():
stmt = stmt.where(getattr(cls, k) == v) stmt = stmt.where(getattr(cls, k) == v)
@ -75,6 +126,19 @@ class BaseIDModel(SQLModel):
@classmethod @classmethod
async def data_exist(cls, **data) -> bool: async def data_exist(cls, **data) -> bool:
'''📝简单介绍:
数据库基类基础判定数据是否存在的方法
🚀使用范例:
`await GsUser.data_exist(uid='100740568')`
返回值:
🔸`bool`: 存在为`True`
'''
return bool(await cls.base_select_data(**data)) return bool(await cls.base_select_data(**data))
@ -90,6 +154,26 @@ class BaseBotIDModel(BaseIDModel):
game_name: Optional[str] = None, game_name: Optional[str] = None,
**data, **data,
) -> int: ) -> int:
'''📝简单介绍:
基类方法通过传入uid查找并更新数据无需bot_id
🌱参数:
🔹uid (`str`):
根据该入参寻找相应数据
🔹game_name (`Optional[str]`, 默认是 `None`):
根据该入参寻找相应列名
🚀使用范例:
`await GsUser.update_data_by_uid_without_bot_id(uid, cookie='2')`
返回值:
🔸`int`: 成功为`0`, 失败为`-1`
'''
sql = update(cls).where( sql = update(cls).where(
getattr(cls, cls.get_gameid_name(game_name)) == uid, getattr(cls, cls.get_gameid_name(game_name)) == uid,
) )
@ -111,6 +195,32 @@ class BaseBotIDModel(BaseIDModel):
game_name: Optional[str] = None, game_name: Optional[str] = None,
**data, **data,
) -> int: ) -> int:
'''📝简单介绍:
基类方法通过传入`uid``bot_id`查找并更新数据
🌱参数:
🔹uid (`str`)
根据该入参寻找相应数据
🔹bot_id (`str`)
根据该入参寻找相应数据
🔹game_name (`Optional[str]`, 默认是 `None`)
根据该入参修改寻找列名
🔹**data
根据该入参修改数据
🚀使用范例:
`await GsUser.update_data_by_uid(uid, 'onebot', cookie='2')`
返回值:
🔸`int`: 成功为`0`, 失败为`-1`
'''
uid_name = cls.get_gameid_name(game_name) uid_name = cls.get_gameid_name(game_name)
if not await cls.data_exist(**{uid_name: uid}): if not await cls.data_exist(**{uid_name: uid}):
data[uid_name] = uid data[uid_name] = uid
@ -144,6 +254,26 @@ class BaseModel(BaseBotIDModel):
user_id: str, user_id: str,
bot_id: Optional[str] = None, bot_id: Optional[str] = None,
) -> Optional[T_BaseModel]: ) -> Optional[T_BaseModel]:
'''📝简单介绍:
基类的数据选择方法
🌱参数:
🔹user_id (`str`):
传入的用户id, 例如QQ号, 一般直接取`event.user_id`
🔹bot_id (`Optional[str]`, 默认是 `None`):
传入的bot_id, 例如`onebot`, 一般直接取`event.bot_id`
🚀使用范例:
`await GsUser.select_data(user_id='444888', bot_id='onebot')`
返回值:
🔸`Optional[T_BaseModel]`: 选中符合条件的第一个数据不存在则为`None`
'''
if bot_id is None: if bot_id is None:
sql = select(cls).where(cls.user_id == user_id) sql = select(cls).where(cls.user_id == user_id)
else: else:
@ -159,6 +289,29 @@ class BaseModel(BaseBotIDModel):
async def insert_data( async def insert_data(
cls, session: AsyncSession, user_id: str, bot_id: str, **data cls, session: AsyncSession, user_id: str, bot_id: str, **data
) -> int: ) -> int:
'''📝简单介绍:
基类的数据插入方法
🌱参数:
🔹user_id (`str`):
传入的用户id, 例如QQ号, 一般直接取`event.user_id`
🔹bot_id (`str`):
传入的bot_id, 例如`onebot`, 一般直接取`event.bot_id`
🔹`**data`:
要插入的数据
🚀使用范例:
`await GsUser.insert_data(user_id='4', bot_id='onebot', uid='22')`
返回值:
🔸`int`: 恒为0
'''
if await cls.data_exist(user_id=user_id, bot_id=bot_id): if await cls.data_exist(user_id=user_id, bot_id=bot_id):
await cls.update_data(user_id, bot_id, **data) await cls.update_data(user_id, bot_id, **data)
else: else:
@ -171,6 +324,26 @@ class BaseModel(BaseBotIDModel):
async def delete_data( async def delete_data(
cls, session: AsyncSession, user_id: str, bot_id: str, **data cls, session: AsyncSession, user_id: str, bot_id: str, **data
) -> int: ) -> int:
'''📝简单介绍:
基类的数据删除方法
🌱参数:
🔹user_id (`str`):
传入的用户id, 例如QQ号, 一般直接取`event.user_id`
🔹bot_id (`str`):
传入的bot_id, 例如`onebot`, 一般直接取`event.bot_id`
🚀使用范例:
`await GsUser.delete_data(user_id='4', bot_id='onebot', uid='22')`
返回值:
🔸`int`: 恒为0
'''
await session.delete(cls(user_id=user_id, bot_id=bot_id, **data)) await session.delete(cls(user_id=user_id, bot_id=bot_id, **data))
await session.commit() await session.commit()
return 0 return 0
@ -180,6 +353,29 @@ class BaseModel(BaseBotIDModel):
async def update_data( async def update_data(
cls, session: AsyncSession, user_id: str, bot_id: str, **data cls, session: AsyncSession, user_id: str, bot_id: str, **data
) -> int: ) -> int:
'''📝简单介绍:
基类的数据更新方法
🌱参数:
🔹user_id (`str`):
传入的用户id, 例如QQ号, 一般直接取`event.user_id`
🔹bot_id (`str`):
传入的bot_id, 例如`onebot`, 一般直接取`event.bot_id`
🔹`**data`:
要更新的数据
🚀使用范例:
`await GsUser.update_data(user_id='4', bot_id='onebot', uid='22')`
返回值:
🔸`int`: 成功为0, 失败为-1未找到数据则无法更新
'''
sql = update(cls).where(cls.user_id == user_id, cls.bot_id == bot_id) sql = update(cls).where(cls.user_id == user_id, cls.bot_id == bot_id)
if data is not None: if data is not None:
query = sql.values(**data) query = sql.values(**data)
@ -203,6 +399,29 @@ class Bind(BaseModel):
bot_id: str, bot_id: str,
game_name: Optional[str] = None, game_name: Optional[str] = None,
) -> Optional[List[str]]: ) -> Optional[List[str]]:
'''📝简单介绍:
基础`Bind`类的扩展方法, 根据传入的`bot_id``user_id`拿到绑定的uid列表
🌱参数:
🔹user_id (`str`):
传入的用户id, 例如QQ号, 一般直接取`event.user_id`
🔹bot_id (`str`):
传入的bot_id, 例如`onebot`, 一般直接取`event.bot_id`
🔹game_name (`Optional[str]`, 默认是 `None`):
根据该入参寻找相应列名
🚀使用范例:
`await GsUser.get_uid_list_by_game(user_id='4', bot_id='onebot')`
返回值:
🔸`Optional[List[str]]`: 如果有数据则为uid的列表无则为`None`
'''
result = await cls.select_data(user_id, bot_id) result = await cls.select_data(user_id, bot_id)
if result is None: if result is None:
return None return None
@ -225,6 +444,29 @@ class Bind(BaseModel):
bot_id: str, bot_id: str,
game_name: Optional[str] = None, game_name: Optional[str] = None,
) -> Optional[str]: ) -> Optional[str]:
'''📝简单介绍:
基础`Bind`类的扩展方法, 根据传入的`bot_id``user_id`拿到单个绑定的uid
🌱参数:
🔹user_id (`str`):
传入的用户id, 例如QQ号, 一般直接取`event.user_id`
🔹bot_id (`str`):
传入的bot_id, 例如`onebot`, 一般直接取`event.bot_id`
🔹game_name (`Optional[str]`, 默认是 `None`):
根据该入参寻找相应列名
🚀使用范例:
`await GsUser.get_uid_by_game(user_id='4', bot_id='onebot')`
返回值:
🔸`Optional[str]`: 如果有绑定数据则返回当前绑定uid, 没有则为`None`
'''
result = await cls.get_uid_list_by_game(user_id, bot_id, game_name) result = await cls.get_uid_list_by_game(user_id, bot_id, game_name)
if result is None or not result: if result is None or not result:
return None return None
@ -252,16 +494,46 @@ class Bind(BaseModel):
is_digit: Optional[bool] = True, is_digit: Optional[bool] = True,
game_name: Optional[str] = None, game_name: Optional[str] = None,
) -> int: ) -> int:
''' '''📝简单介绍:
为数据库增加绑定UID
如果有传`lenth_limit`, 当uid位数不等或uid位数为0的时候, 返回`-1` 基础`Bind`类的扩展方法, 为给定的`user_id``bot_id`插入一条uid绑定数据
如果该UID已绑定, 则返回`-2` 可支持多uid的绑定, 如果绑定多个uid, 则数据库中uid列将会用`_`分割符相连接
`is_digit`默认为`True`, 进行合法性校验, 如果不是全数字, 返回`-3` 可以使用`cls.get_uid_list_by_game()`方法获取相应多绑定uid列表
成功绑定, 则返回`0` 或者使用`cls.get_uid_by_game()`方法获得当前绑定uid单个
🌱参数:
🔹user_id (`str`):
传入的用户id, 例如QQ号, 一般直接取`event.user_id`
🔹bot_id (`str`):
传入的bot_id, 例如`onebot`, 一般直接取`event.bot_id`
🔹uid (`str`):
将要插入的uid数据
🔹group_id (`Optional[str]`, 默认是 `None`):
将要插入的群组数据为绑定uid提供群组绑定
🔹lenth_limit (`Optional[int]`, 默认是 `None`):
如果有传该参数, 当uid位数不等于该参数或uid位数为0的时候, 返回`-1`
🔹is_digit (`Optional[bool]`, 默认是 `True`):
如果有传该参数, 当uid不为全数字的时候, 返回`-3`
🔹game_name (`Optional[str]`, 默认是 `None`):
根据该入参寻找相应列名
🚀使用范例:
`await GsBind.insert_uid(qid, ev.bot_id, uid, ev.group_id, 9)`
返回值:
🔸`int`: 如果该UID已绑定, 则返回`-2`, 成功则为`0`, 合法校验失败为`-3``-1`
''' '''
result = await cls.get_uid_list_by_game(user_id, bot_id, game_name) result = await cls.get_uid_list_by_game(user_id, bot_id, game_name)
@ -302,7 +574,35 @@ class Bind(BaseModel):
bot_id: str, bot_id: str,
uid: str, uid: str,
game_name: Optional[str] = None, game_name: Optional[str] = None,
): ) -> int:
'''📝简单介绍:
基础`Bind`类的扩展方法, 根据给定的`user_id``bot_id``uid`删除一个uid
该方法不会删除行如果只有一个uid会置空如果同时绑定多个uid只会删除其中一个
🌱参数:
🔹user_id (`str`):
传入的用户id, 例如QQ号, 一般直接取`event.user_id`
🔹bot_id (`str`):
传入的bot_id, 例如`onebot`, 一般直接取`event.bot_id`
🔹uid (`str`):
将要删除的uid数据
🔹game_name (`Optional[str]`, 默认是 `None`):
根据该入参寻找相应列名
🚀使用范例:
`await GsBind.delete_uid(qid, ev.bot_id, uid)`
返回值:
🔸`int`: 失败为`-1`, 成功为`0`
'''
result = await cls.get_uid_list_by_game(user_id, bot_id, game_name) result = await cls.get_uid_list_by_game(user_id, bot_id, game_name)
if result is None: if result is None:
return -1 return -1
@ -327,6 +627,26 @@ class Bind(BaseModel):
bot_id: str, bot_id: str,
game_name: Optional[str] = None, game_name: Optional[str] = None,
) -> List[str]: ) -> List[str]:
'''📝简单介绍:
基础`Bind`类的扩展方法, 根据给定的`bot_id`获取全部user绑定的uid列表
🌱参数:
🔹bot_id (`str`):
传入的bot_id, 例如`onebot`, 一般直接取`event.bot_id`
🔹game_name (`Optional[str]`, 默认是 `None`):
根据该入参寻找相应列名
🚀使用范例:
`await GsBind.get_all_uid_list_by_game(ev.bot_id)`
返回值:
🔸`List[str]`: 一个uid的列表, 如果没有任何用户的绑定信息将返回`[]`
'''
sql = select(cls).where(cls.bot_id == bot_id) sql = select(cls).where(cls.bot_id == bot_id)
result = await session.execute(sql) result = await session.execute(sql)
data: List["Bind"] = result.scalars().all() data: List["Bind"] = result.scalars().all()
@ -346,18 +666,41 @@ class Bind(BaseModel):
uid: Optional[str] = None, uid: Optional[str] = None,
game_name: Optional[str] = None, game_name: Optional[str] = None,
) -> int: ) -> int:
''' '''📝简单介绍:
切换用户UID, 成功返回0
可传确定的UID 基础`Bind`类的扩展方法, 根据给定的`bot_id``user_id`定位数据并切换当前uid
如果不传UID,则自动切换序列下一个UID 如果不传uid参数则默认切换下个uid
如果不存在绑定记录,则返回-1 🌱参数:
如果传了UID但是不存在绑定列表,则返回-2 🔹user_id (`str`):
传入的用户id, 例如QQ号, 一般直接取`event.user_id`
如果绑定UID列表不足2个,返回-3 🔹bot_id (`str`):
传入的bot_id, 例如`onebot`, 一般直接取`event.bot_id`
🔹uid (`Optional[str]`, 默认是 `None`):
将要切换的uid数据, 可以不传
🔹game_name (`Optional[str]`, 默认是 `None`):
根据该入参寻找相应列名
🚀使用范例:
`await GsBind.switch_uid_by_game(qid, ev.bot_id, uid)`
返回值:
🔸`int`:
成功返回`0`
如果不存在绑定记录,则返回`-1`
如果传了UID但是不存在绑定列表,则返回`-2`
如果绑定UID列表不足2个,返回`-3`
''' '''
uid_list = await cls.get_uid_list_by_game(user_id, bot_id, game_name) uid_list = await cls.get_uid_list_by_game(user_id, bot_id, game_name)
if not uid_list: if not uid_list:
@ -385,17 +728,20 @@ class Bind(BaseModel):
@classmethod @classmethod
async def get_bind_group_list(cls, user_id: str, bot_id: str) -> List[str]: async def get_bind_group_list(cls, user_id: str, bot_id: str) -> List[str]:
'''获取传入`user_id`和`bot_id`对应的绑定群列表'''
data: Optional["Bind"] = await cls.select_data(user_id, bot_id) data: Optional["Bind"] = await cls.select_data(user_id, bot_id)
return data.group_id.split("_") if data and data.group_id else [] return data.group_id.split("_") if data and data.group_id else []
@classmethod @classmethod
async def get_bind_group(cls, user_id: str, bot_id: str) -> Optional[str]: async def get_bind_group(cls, user_id: str, bot_id: str) -> Optional[str]:
'''获取传入`user_id`和`bot_id`对应的绑定群(如多个则返回第一个)'''
data = await cls.get_bind_group_list(user_id, bot_id) data = await cls.get_bind_group_list(user_id, bot_id)
return data[0] if data else None return data[0] if data else None
@classmethod @classmethod
@with_session @with_session
async def get_group_all_uid(cls, session: AsyncSession, group_id: str): async def get_group_all_uid(cls, session: AsyncSession, group_id: str):
'''根据传入`group_id`获取该群号下所有绑定`uid`列表'''
result = await session.scalars( result = await session.scalars(
select(cls).where(col(cls.group_id).contains(group_id)) select(cls).where(col(cls.group_id).contains(group_id))
) )
@ -418,6 +764,23 @@ class User(BaseModel):
uid: str, uid: str,
game_name: Optional[str] = None, game_name: Optional[str] = None,
) -> Optional[T_User]: ) -> Optional[T_User]:
'''📝简单介绍:
基础`User`类的数据选择方法
🌱参数:
🔹uid (`str`):
传入的用户uid, 一般是该游戏的用户唯一识别id
🚀使用范例:
`await GsUser.select_data_by_uid(uid='100740568')`
返回值:
🔸`Optional[T_BaseModel]`: 选中符合条件的第一个数据不存在则为`None`
'''
result = await session.execute( result = await session.execute(
select(cls).where( select(cls).where(
getattr(cls, cls.get_gameid_name(game_name)) == uid, getattr(cls, cls.get_gameid_name(game_name)) == uid,
@ -431,6 +794,23 @@ class User(BaseModel):
async def get_user_all_data_by_user_id( async def get_user_all_data_by_user_id(
cls: Type[T_User], session: AsyncSession, user_id: str cls: Type[T_User], session: AsyncSession, user_id: str
) -> Optional[List[T_User]]: ) -> Optional[List[T_User]]:
'''📝简单介绍:
基础`User`类的数据选择方法, 获取该`user_id`绑定的全部数据实例
🌱参数:
🔹user_id (`str`):
传入的用户id, 例如QQ号, 一般直接取`event.user_id`
🚀使用范例:
`await GsUser.get_user_all_data_by_user_id(user_id='2333')`
返回值:
🔸`Optional[T_BaseModel]`: 选中符合条件的数据列表不存在则为`None`
'''
result = await session.execute( result = await session.execute(
select(cls).where(cls.user_id == user_id) select(cls).where(cls.user_id == user_id)
) )
@ -444,6 +824,29 @@ class User(BaseModel):
bot_id: str, bot_id: str,
attr: str, attr: str,
) -> Optional[Any]: ) -> Optional[Any]:
'''📝简单介绍:
根据传入的`user_id``bot_id`选择数据实例然后返回数据的某个属性的值
🌱参数:
🔹user_id (`str`):
传入的用户id, 例如QQ号, 一般直接取`event.user_id`
🔹bot_id (`str`):
传入的bot_id, 例如`onebot`, 一般直接取`event.bot_id`
🔹attr (`str`):
想要获取的该数据的属性
🚀使用范例:
`await cls.get_user_attr(user_id, bot_id, 'cookie')`
返回值:
🔸`Optional[Any]`: 可能是任何值如果没获取到数据则为`None`
'''
result = await cls.select_data(user_id, bot_id) result = await cls.select_data(user_id, bot_id)
return getattr(result, attr) if result else None return getattr(result, attr) if result else None
@ -454,6 +857,10 @@ class User(BaseModel):
attr: str, attr: str,
game_name: Optional[str] = None, game_name: Optional[str] = None,
) -> Optional[Any]: ) -> Optional[Any]:
'''根据传入的`uid`选择数据实例,然后返回数据的`attr`属性的值
如果没获取到数据则为`None`
'''
result = await cls.select_data_by_uid(uid, game_name) result = await cls.select_data_by_uid(uid, game_name)
return getattr(result, attr) if result else None return getattr(result, attr) if result else None
@ -463,12 +870,20 @@ class User(BaseModel):
user_id: str, user_id: str,
attr: str, attr: str,
) -> Optional[Any]: ) -> Optional[Any]:
'''根据传入的`user_id`选择数据实例,然后返回数据的`attr`属性的值
如果没获取到数据则为`None`
'''
result = await cls.select_data(user_id) result = await cls.select_data(user_id)
return getattr(result, attr) if result else None return getattr(result, attr) if result else None
@classmethod @classmethod
@with_session @with_session
async def mark_invalid(cls, session: AsyncSession, cookie: str, mark: str): async def mark_invalid(cls, session: AsyncSession, cookie: str, mark: str):
'''令一个cookie所对应数据的`status`值为传入的mark
例如mark值可以是`error`, 标记该Cookie已失效
'''
sql = update(cls).where(cls.cookie == cookie).values(status=mark) sql = update(cls).where(cls.cookie == cookie).values(status=mark)
await session.execute(sql) await session.execute(sql)
await session.commit() await session.commit()
@ -478,30 +893,50 @@ class User(BaseModel):
async def get_user_cookie_by_uid( async def get_user_cookie_by_uid(
cls, uid: str, game_name: Optional[str] = None cls, uid: str, game_name: Optional[str] = None
) -> Optional[str]: ) -> Optional[str]:
'''根据传入的`uid`选择数据实例,然后返回该数据的`cookie`值
如果没获取到数据则为`None`
'''
return await cls.get_user_attr_by_uid(uid, 'cookie', game_name) return await cls.get_user_attr_by_uid(uid, 'cookie', game_name)
@classmethod @classmethod
async def get_user_cookie_by_user_id( async def get_user_cookie_by_user_id(
cls, user_id: str, bot_id: str cls, user_id: str, bot_id: str
) -> Optional[str]: ) -> Optional[str]:
'''根据传入的`user_id`选择数据实例,然后返回该数据的`cookie`值
如果没获取到数据则为`None`
'''
return await cls.get_user_attr(user_id, bot_id, 'cookie') return await cls.get_user_attr(user_id, bot_id, 'cookie')
@classmethod @classmethod
async def get_user_stoken_by_uid( async def get_user_stoken_by_uid(
cls, uid: str, game_name: Optional[str] = None cls, uid: str, game_name: Optional[str] = None
) -> Optional[str]: ) -> Optional[str]:
'''根据传入的`uid`选择数据实例,然后返回该数据的`stoken`值
如果没获取到数据则为`None`
'''
return await cls.get_user_attr_by_uid(uid, 'stoken', game_name) return await cls.get_user_attr_by_uid(uid, 'stoken', game_name)
@classmethod @classmethod
async def get_user_stoken_by_user_id( async def get_user_stoken_by_user_id(
cls, user_id: str, bot_id: str cls, user_id: str, bot_id: str
) -> Optional[str]: ) -> Optional[str]:
'''根据传入的`user_id`选择数据实例,然后返回该数据的`stoken`值
如果没获取到数据则为`None`
'''
return await cls.get_user_attr(user_id, bot_id, 'stoken') return await cls.get_user_attr(user_id, bot_id, 'stoken')
@classmethod @classmethod
async def cookie_validate( async def cookie_validate(
cls, uid: str, game_name: Optional[str] = None cls, uid: str, game_name: Optional[str] = None
) -> bool: ) -> bool:
'''根据传入的`uid`选择数据实例, 校验数据中的`cookie`是否有效
方法是判断数据中的`status`是否为空值, 如果没有异常标记, 则为`True`
'''
data = await cls.get_user_attr_by_uid(uid, 'status', game_name) data = await cls.get_user_attr_by_uid(uid, 'status', game_name)
if not data: if not data:
return True return True
@ -513,6 +948,29 @@ class User(BaseModel):
async def get_switch_open_list( async def get_switch_open_list(
cls: Type[T_User], session: AsyncSession, switch_name: str cls: Type[T_User], session: AsyncSession, switch_name: str
) -> List[T_User]: ) -> List[T_User]:
'''📝简单介绍:
根据表定义的结构, 根据传入的`switch_name`, 寻找表数据中的该列
如果不存在该列则选中`cls.push_switch`该列
对所有数据中该列满足`!= 'off'`的数据标记为有效数据
返回有效数据的列表
🌱参数:
🔹switch_name (`str`):
寻找表数据列名
🚀使用范例:
`await GsUser.get_switch_open_list('sign_switch')`
返回值:
🔸`List[T_User]`: 有效数据的列表, 如没有则为`[]`
'''
_switch = getattr(cls, switch_name, cls.push_switch) _switch = getattr(cls, switch_name, cls.push_switch)
sql = select(cls).filter(_switch != 'off') sql = select(cls).filter(_switch != 'off')
data = await session.execute(sql) data = await session.execute(sql)
@ -524,6 +982,25 @@ class User(BaseModel):
async def get_all_user( async def get_all_user(
cls: Type[T_User], session: AsyncSession, without_error: bool = True cls: Type[T_User], session: AsyncSession, without_error: bool = True
) -> List[T_User]: ) -> List[T_User]:
'''📝简单介绍:
基础`User`类的扩展方法, 获取到全部的数据列表
不一定会返回该表中所有的数据, 返回的数据中`cookie`必定存在值
🌱参数:
🔹without_error (`bool`, 默认是 `True`):
如果为`True`, 则会排除返回列表中存在`status != None`的数据
🚀使用范例:
`await GsUser.get_all_user()`
返回值:
🔸`List[T_User]`: 有效数据的列表, 如没有则为`[]`
'''
if without_error: if without_error:
sql = select(cls).where( sql = select(cls).where(
cls.status == null(), cls.cookie != null(), cls.cookie != '' cls.status == null(), cls.cookie != null(), cls.cookie != ''
@ -536,21 +1013,28 @@ class User(BaseModel):
@classmethod @classmethod
async def get_all_cookie(cls) -> List[str]: async def get_all_cookie(cls) -> List[str]:
'''获得表数据中全部的`cookie`列表'''
data = await cls.get_all_user() data = await cls.get_all_user()
return [_u.cookie for _u in data if _u.cookie] return [_u.cookie for _u in data if _u.cookie]
@classmethod @classmethod
async def get_all_stoken(cls) -> List[str]: async def get_all_stoken(cls) -> List[str]:
'''获得表数据中全部的`stoken`列表'''
data = await cls.get_all_user() data = await cls.get_all_user()
return [_u.stoken for _u in data if _u.stoken] return [_u.stoken for _u in data if _u.stoken]
@classmethod @classmethod
async def get_all_error_cookie(cls) -> List[str]: async def get_all_error_cookie(cls) -> List[str]:
'''获得表数据中,`status != None`情况下的所有`cookie`列表
也就是全部失效CK的列表
'''
data = await cls.get_all_user() data = await cls.get_all_user()
return [_u.cookie for _u in data if _u.cookie and _u.status] return [_u.cookie for _u in data if _u.cookie and _u.status]
@classmethod @classmethod
async def get_all_push_user_list(cls: Type[T_User]) -> List[T_User]: async def get_all_push_user_list(cls: Type[T_User]) -> List[T_User]:
'''获得表数据中全部的`push_switch != off`的数据列表'''
data = await cls.get_all_user() data = await cls.get_all_user()
return [user for user in data if user.push_switch != 'off'] return [user for user in data if user.push_switch != 'off']
@ -558,6 +1042,7 @@ class User(BaseModel):
async def user_exists( async def user_exists(
cls, uid: str, game_name: Optional[str] = None cls, uid: str, game_name: Optional[str] = None
) -> bool: ) -> bool:
'''根据传入`uid`,判定数据是否存在'''
data = await cls.select_data_by_uid(uid, game_name) data = await cls.select_data_by_uid(uid, game_name)
return True if data else False return True if data else False
@ -571,6 +1056,40 @@ class User(BaseModel):
condition: Optional[Dict[str, str]] = None, condition: Optional[Dict[str, str]] = None,
game_name: Optional[str] = None, game_name: Optional[str] = None,
) -> Optional[str]: ) -> Optional[str]:
'''📝简单介绍:
基础`User`类的扩展方法, 返回一个随机的cookie
如果该uid存在绑定cookie, 则返回他绑定的cookie
如果传入了`cache_model`并且该uid存在cache_model定义的表数据中返回该cookie
如果定义了condition, 则选取随机cookie时遵照此规则
🌱参数:
🔹uid (`str`):
传入的用户uid, 一般是该游戏的用户唯一识别id
🔹cache_model (`Optional[Type["Cache"]]`, 默认是 `None`):
继承基础`Cache`缓存表的模型, 例如`GsCache`
🔹condition (`Optional[Dict[str, str]]`, 默认是 `None`):
字典结构, 寻找表中符合`key == value`条件的值
🔹game_name (`Optional[str]`, 默认是 `None`):
根据该入参寻找相应列名
🚀使用范例:
`await GsUser.get_random_cookie(
uid, GsCache, {'region': server}, 'sr' if self.is_sr else None
)`
返回值:
🔸`Optional[str]`: 如找到符合条件的cookie则返回没有则为`None`
'''
# 有绑定自己CK 并且该CK有效的前提下优先使用自己CK # 有绑定自己CK 并且该CK有效的前提下优先使用自己CK
if await cls.user_exists(uid, game_name) and await cls.cookie_validate( if await cls.user_exists(uid, game_name) and await cls.cookie_validate(
uid, game_name uid, game_name
@ -618,7 +1137,11 @@ class User(BaseModel):
@with_session @with_session
async def delete_user_data_by_uid( async def delete_user_data_by_uid(
cls, session: AsyncSession, uid: str, game_name: Optional[str] = None cls, session: AsyncSession, uid: str, game_name: Optional[str] = None
): ) -> bool:
'''根据给定的`uid`获取数据后, 删除整行数据
如果该数据存在, 删除后返回`True`, 不存在, 则返回`False`
'''
if await cls.user_exists(uid, game_name): if await cls.user_exists(uid, game_name):
sql = delete(cls).where( sql = delete(cls).where(
getattr(cls, cls.get_gameid_name(game_name)) == uid getattr(cls, cls.get_gameid_name(game_name)) == uid
@ -637,6 +1160,7 @@ class Cache(BaseIDModel):
async def select_cache_cookie( async def select_cache_cookie(
cls, session: AsyncSession, uid: str, game_name: Optional[str] cls, session: AsyncSession, uid: str, game_name: Optional[str]
) -> Optional[str]: ) -> Optional[str]:
'''根据给定的`uid`获取表中存在缓存的`cookie`并返回'''
sql = select(cls).where( sql = select(cls).where(
getattr(cls, cls.get_gameid_name(game_name)) == uid getattr(cls, cls.get_gameid_name(game_name)) == uid
) )
@ -649,6 +1173,14 @@ class Cache(BaseIDModel):
async def delete_error_cache( async def delete_error_cache(
cls, session: AsyncSession, user: Type["User"] cls, session: AsyncSession, user: Type["User"]
) -> bool: ) -> bool:
'''根据给定的`user`模型中, 查找该模型所有数据的status
`status != None`, 则代表该数据cookie有问题
查找`Cache`表中该cookie对应数据行并删除
恒返回`True`
'''
data = await user.get_all_error_cookie() data = await user.get_all_error_cookie()
for cookie in data: for cookie in data:
sql = delete(cls).where(cls.cookie == cookie) sql = delete(cls).where(cls.cookie == cookie)
@ -660,6 +1192,14 @@ class Cache(BaseIDModel):
async def delete_all_cache( async def delete_all_cache(
cls, session: AsyncSession, user: Type["User"] cls, session: AsyncSession, user: Type["User"]
) -> bool: ) -> bool:
'''删除整个表的数据
根据给定的`user`模型中, 查找该模型所有数据的status
`status == limit30`, 则代表该数据cookie限制可能已回复
清除`User`表中该类cookie的status, 令其重新为`None`
'''
sql = update(user).where(user.status == 'limit30').values(status=None) sql = update(user).where(user.status == 'limit30').values(status=None)
empty_sql = delete(cls) empty_sql = delete(cls)
await session.execute(sql) await session.execute(sql)
@ -672,6 +1212,7 @@ class Cache(BaseIDModel):
async def refresh_cache( async def refresh_cache(
cls, session: AsyncSession, uid: str, game_name: Optional[str] = None cls, session: AsyncSession, uid: str, game_name: Optional[str] = None
) -> bool: ) -> bool:
'''删除指定`uid`的数据行'''
await session.execute( await session.execute(
delete(cls).where( delete(cls).where(
getattr(cls, cls.get_gameid_name(game_name)) == uid getattr(cls, cls.get_gameid_name(game_name)) == uid
@ -684,6 +1225,7 @@ class Cache(BaseIDModel):
async def insert_cache_data( async def insert_cache_data(
cls, session: AsyncSession, cookie: str, **data cls, session: AsyncSession, cookie: str, **data
) -> bool: ) -> bool:
'''新增指定`cookie`的数据行, `**data`为数据'''
new_data = cls(cookie=cookie, **data) new_data = cls(cookie=cookie, **data)
session.add(new_data) session.add(new_data)
await session.commit() await session.commit()
@ -699,6 +1241,23 @@ class Push(BaseBotIDModel):
uid: str, uid: str,
game_name: Optional[str] = None, game_name: Optional[str] = None,
) -> Optional[T_Push]: ) -> Optional[T_Push]:
'''📝简单介绍:
基础`Push`类的数据选择方法
🌱参数:
🔹uid (`str`):
传入的用户uid, 一般是该游戏的用户唯一识别id
🚀使用范例:
`await GsPush.select_data_by_uid(uid='100740568')`
返回值:
🔸`Optional[T_BaseModel]`: 选中符合条件的第一个数据不存在则为`None`
'''
result = await session.execute( result = await session.execute(
select(cls).where( select(cls).where(
getattr(cls, cls.get_gameid_name(game_name)) == uid, getattr(cls, cls.get_gameid_name(game_name)) == uid,

View File

@ -61,4 +61,5 @@ CONIFG_DEFAULT: Dict[str, GSC] = {
'AutoUpdateDep': GsBoolConfig('自动更新依赖', '更新插件时将会自动更新依赖', False), 'AutoUpdateDep': GsBoolConfig('自动更新依赖', '更新插件时将会自动更新依赖', False),
'EnablePicSrv': GsBoolConfig('将图片转链接发送(需公网)', '发送图片转链接', False), 'EnablePicSrv': GsBoolConfig('将图片转链接发送(需公网)', '发送图片转链接', False),
'PicSrv': GsStrConfig('将图片转链接发送(需公网)', '发送图片转链接', ''), 'PicSrv': GsStrConfig('将图片转链接发送(需公网)', '发送图片转链接', ''),
'PicUpload': GsBoolConfig('自动上传图片', '发送图片是将会自动上传', False),
} }