update sign

This commit is contained in:
lulu666lulu 2023-09-19 16:40:29 +08:00 committed by qwerdvd
parent d90dde6ea4
commit ab9b434bd9
2 changed files with 66 additions and 2 deletions

View File

@ -1,4 +1,8 @@
from copy import deepcopy from copy import deepcopy
import hashlib
import json
import time
import hmac
from typing import Any, ClassVar, Literal from typing import Any, ClassVar, Literal
import msgspec import msgspec
@ -154,6 +158,39 @@ class BaseArkApi:
def unpack(self, raw_data: dict) -> dict: def unpack(self, raw_data: dict) -> dict:
return raw_data['data'] return raw_data['data']
async def refresh_token(self, uid: str) -> int | str:
pass
async def set_sign(
self,
url: str,
headers: dict[str, Any],
data: dict[str, Any] | None = None,
params: dict[str, Any] | None = None,
) -> dict:
path = url.split("://")[1].split("/")[1]
timestamp = str(int(time.time()))
str1=json.dumps(
{
"platform":headers.get(['platform'],""),
'timestamp': timestamp,
'dId': headers.get(["dId"],""),
"vName":headers.get(["vName"],"")
}
,separators=(',', ':')
)
s2=""
if params:
s2+="&".join(["=".join(x) for x in params])
if data:
s2+=json.dumps(data,separators=(',', ':'))
str2=path+s2+headers["timestamp"]+str1
token: str | None = await ArknightsUser.get_token_by_cred(headers['Cred'])
sign=hashlib.md5(hmac.new(token.encode(),str2.encode(), hashlib.sha256).hexdigest().encode()).hexdigest()
headers["sign"]=sign
headers["timestamp"]=timestamp
return headers
async def _ark_request( async def _ark_request(
self, self,
url: str, url: str,
@ -190,6 +227,11 @@ class BaseArkApi:
if 'code' in raw_data and raw_data['code'] != 0: if 'code' in raw_data and raw_data['code'] != 0:
logger.info(raw_data) logger.info(raw_data)
return raw_data return raw_data
elif 'code' in raw_data and raw_data['code'] == 10000:
#token失效
logger.info(raw_data)
await self.refresh_token(header['Cred'])
# 判断status # 判断status
if 'status' in raw_data and 'msg' in raw_data: if 'status' in raw_data and 'msg' in raw_data:

View File

@ -1,8 +1,10 @@
from typing import Literal from typing import Literal
from gsuid_core.utils.database.base_models import Bind, Push, T_BaseIDModel, User from gsuid_core.utils.database.base_models import Bind, Push, T_BaseIDModel, T_User, User, with_session
from gsuid_core.webconsole.mount_app import GsAdminModel, PageSchema, site from gsuid_core.webconsole.mount_app import GsAdminModel, PageSchema, site
from sqlmodel import Field from sqlmodel import Field
from sqlalchemy.future import select
from sqlalchemy.ext.asyncio import AsyncSession
class ArknightsBind(Bind, table=True): class ArknightsBind(Bind, table=True):
@ -13,7 +15,27 @@ class ArknightsUser(User, table=True):
uid: str | None = Field(default=None, title='明日方舟UID') uid: str | None = Field(default=None, title='明日方舟UID')
skd_uid: str | None = Field(default=None, title='SKD用户ID') skd_uid: str | None = Field(default=None, title='SKD用户ID')
cred: str | None = Field(default=None, title='SKD凭证') cred: str | None = Field(default=None, title='SKD凭证')
token: str | None = Field(default=None, title='SKD令牌') toekn: str | None = Field(default=None, title='SKD Token')
@classmethod
@with_session
async def select_data_by_cred(
cls: type[T_User],
session: AsyncSession,
cred: str
) -> T_User | None:
result = await session.execute(
select(cls).where(
'cred' == cred,
)
)
data = result.scalars().all()
return data[0] if data else None
@classmethod
async def get_token_by_cred(cls, cred: str) -> str | None:
result = await cls.select_data_by_cred(cred)
return getattr(result, 'token') if result else None
class ArknightsPush(Push, table=True): class ArknightsPush(Push, table=True):