baiqwerdvd df108e60bd
update
2024-10-13 14:37:44 +08:00

350 lines
11 KiB
Python

import hashlib
import hmac
import json
import time
from copy import deepcopy
from typing import Any, Dict, Literal, Tuple, Union, cast
from urllib.parse import urlparse
import msgspec
from aiohttp import ClientSession, ContentTypeError, TCPConnector
from gsuid_core.logger import logger
from gsuid_core.utils.plugins_config.gs_config import core_plugins_config
from ...crypto import get_d_id
from ...database.models import ArknightsUser
from ...models.skland.models import (
ArknightsAttendanceCalendarModel,
ArknightsAttendanceModel,
ArknightsPlayerInfoModel,
ArknightsUserMeModel,
)
from .api import ARK_PLAYER_INFO, ARK_REFRESH_TOKEN, ARK_SKD_SIGN, ARK_WEB_USER
# Runtime settings read once, at import time, from the core plugin config.
proxy_url = core_plugins_config.get_config("proxy").data
ssl_verify = core_plugins_config.get_config("MhySSLVerify").data

# Device identifier produced by the project's crypto helper.
# Example of the value's shape: "de9759a5afaa634f".
dId = get_d_id()

# Base HTTP headers for every request.  Callers copy this mapping and add
# the per-user "cred" entry (and, for signed calls, the signature fields)
# instead of mutating it in place.
_HEADER: Dict[str, str] = {
    "User-Agent": "Skland/1.5.1 (com.hypergryph.skland; build:100501001; Android 33; ) Okhttp/4.11.0",
    "Accept-Encoding": "gzip",
    "Connection": "close",
    "Origin": "https://www.skland.com",
    "Referer": "https://www.skland.com/",
    "Content-Type": "application/json",
    "manufacturer": "Xiaomi",
    "os": "33",
    "dId": dId,
}

# Template of the header fields that are serialised into the signed string.
# "timestamp" is filled in per request by generate_signature(); key order
# matters because the mapping is dumped to JSON as part of the signature.
header_for_sign: Dict[str, str] = {
    "platform": "1",
    "timestamp": "",
    "dId": "",
    "vName": "1.5.1",
}
def generate_signature(token: str, path: str, body_or_query: str):
    """Compute the request signature and the header fields that were signed.

    The signed text is ``path + body_or_query + timestamp + compact JSON of
    the signing headers``.  It is authenticated with HMAC-SHA256 keyed by
    ``token``; the hex digest of that HMAC is then hashed with MD5.

    Args:
        token: Secret used as the HMAC key.
        path: Path component of the request URL.
        body_or_query: Query string (GET) or serialised body (other methods).

    Returns:
        A ``(signature, signing_headers)`` tuple, where ``signature`` is the
        MD5 hex digest and ``signing_headers`` is a copy of
        ``header_for_sign`` with ``timestamp`` filled in.
    """
    # The timestamp is deliberately set two seconds in the past.
    timestamp = str(int(time.time()) - 2)

    # "timestamp" already exists in the template, so overriding it here keeps
    # the template's key order in the serialised JSON.
    signing_headers = {**header_for_sign, "timestamp": timestamp}
    compact_headers = json.dumps(signing_headers, separators=(",", ":"))

    message = "".join((path, body_or_query, timestamp, compact_headers))
    mac_hex = hmac.new(
        token.encode("utf-8"),
        message.encode("utf-8"),
        hashlib.sha256,
    ).hexdigest()
    signature = hashlib.md5(mac_hex.encode("utf-8")).hexdigest()
    return signature, signing_headers
def get_sign_header(token: str, url: str, method: str, body: Union[dict, None], old_header: dict):
    """Return a copy of ``old_header`` extended with the signature fields.

    For GET requests the query string embedded in ``url`` is signed; for any
    other method the JSON serialisation of ``body`` is signed.

    Args:
        token: Secret forwarded to ``generate_signature`` as the HMAC key.
        url: Full request URL; its path (and, for GET, its query) is signed.
        method: HTTP method name, compared case-insensitively with ``"get"``.
        body: JSON body of a non-GET request, or ``None`` when there is none.
        old_header: Headers to extend.  This mapping is not modified.

    Returns:
        A new dict holding ``old_header`` plus ``"sign"`` and every field
        returned by ``generate_signature`` (these overwrite same-named keys).
    """
    parsed = urlparse(url)
    if method.lower() == "get":
        # NOTE(review): only the query present in ``url`` is signed; query
        # parameters handed separately to the HTTP client are not included.
        payload = parsed.query
    elif body is None:
        # A request without a body transmits nothing, so sign the empty
        # string rather than the JSON literal "null".
        payload = ""
    else:
        payload = json.dumps(body)

    signature, header_ca = generate_signature(token, parsed.path, payload)

    signed = old_header.copy()
    signed["sign"] = signature
    signed.update(header_ca)
    return signed
class TokenExpiredError(Exception):
    """Raised when the API answers with the status code treated as an expired token."""
class TokenRefreshFailed(Exception):
    """Raised when a fresh signing token could not be obtained."""
class BaseArkApi:
    """Asynchronous client for the Skland Arknights endpoints used by this plugin.

    Endpoint methods return either a converted model or an ``int`` status:
    ``-60`` when the stored ``cred`` or ``token`` is missing, ``-61`` when the
    request produced no usable response, or an integer passed through from the
    request layer (``10001``).
    """

    # Proxy applied only when a request is made with ``use_proxy=True``;
    # a falsy configured value is normalised to ``None``.
    proxy_url: Union[str, None] = proxy_url if proxy_url else None

    async def _pass(
        self,
        gt: str,
        ch: str,
    ) -> Tuple[Union[str, None], Union[str, None]]:
        """Ask the configured ``_pass_API`` service for a validation result.

        Args:
            gt: Value sent as the ``gt`` query parameter.
            ch: Value sent as the ``challenge`` query parameter.

        Returns:
            ``(validate, challenge)`` from the service, ``(None, ch)`` when no
            service is configured, or ``(None, None)`` when the call yields
            nothing usable.
        """
        _pass_api = core_plugins_config.get_config("_pass_API").data
        if not _pass_api:
            return None, ch

        # NOTE(review): this call uses the default header, and `_ark_request`
        # returns 10001 without sending anything when the header has no
        # "cred" key. With the default header defined in this module the
        # branch below therefore always returns (None, None) - confirm
        # whether that is intended.
        data = await self._ark_request(
            url=f"{_pass_api}&gt={gt}&challenge={ch}",
            method="GET",
        )
        if isinstance(data, int) or not data:
            return None, None
        validate = data["data"]["validate"]
        ch = data["data"]["challenge"]
        return validate, ch

    async def _load_credentials(self, uid: str) -> Union[Tuple[str, str], None]:
        """Fetch the stored ``(cred, token)`` pair for ``uid``, or ``None`` if either is missing."""
        cred: Union[str, None] = await ArknightsUser.get_user_attr_by_uid(
            uid=uid,
            attr="cred",
        )
        if cred is None:
            return None
        token: Union[str, None] = await ArknightsUser.get_user_attr_by_uid(
            uid=uid,
            attr="token",
        )
        if token is None:
            return None
        return cred, token

    def _build_signed_header(
        self,
        cred: str,
        token: str,
        url: str,
        method: str,
        body: Union[Dict[str, Any], None],
    ) -> Dict[str, Any]:
        """Copy the base header, attach ``cred`` and add the signature fields."""
        headers = deepcopy(_HEADER)
        headers["cred"] = cred
        # NOTE(review): for GET calls the signature covers only the query
        # embedded in ``url``; the ``params`` later given to the HTTP client
        # are not part of the signed text - confirm the server accepts this.
        return get_sign_header(token, url, method, body, headers)

    def _to_model(self, raw_data: Union[Dict, int, None], model: Any) -> Any:
        """Turn a request result into ``model`` or an integer status code."""
        if isinstance(raw_data, int):
            return raw_data
        if raw_data is None:
            return -61
        payload = self.unpack(raw_data)
        if isinstance(payload, int):
            return payload
        return msgspec.convert(payload, type=model)

    async def get_game_player_info(
        self,
        uid: str,
    ) -> Union[int, ArknightsPlayerInfoModel]:
        """Fetch the player information for ``uid``.

        Returns:
            The converted ``ArknightsPlayerInfoModel``, or an ``int`` status
            (see the class docstring).

        Raises:
            TokenRefreshFailed: If the token expired and could not be renewed.
            TokenExpiredError: If the retried request is rejected again.
        """
        credentials = await self._load_credentials(uid)
        if credentials is None:
            return -60
        cred, token = credentials

        header = self._build_signed_header(cred, token, ARK_PLAYER_INFO, "get", None)
        raw_data = await self.ark_request(
            url=ARK_PLAYER_INFO,
            params={"uid": uid},
            header=header,
        )
        return self._to_model(raw_data, ArknightsPlayerInfoModel)

    async def skd_sign(self, uid: str) -> Union[int, ArknightsAttendanceModel]:
        """Perform the attendance sign-in for ``uid``.

        Returns:
            The converted ``ArknightsAttendanceModel``, or an ``int`` status
            (see the class docstring).

        Raises:
            TokenRefreshFailed: If the token expired and could not be renewed.
            TokenExpiredError: If the retried request is rejected again.
        """
        credentials = await self._load_credentials(uid)
        if credentials is None:
            return -60
        cred, token = credentials

        data = {"uid": uid, "gameId": 1}
        header = self._build_signed_header(cred, token, ARK_SKD_SIGN, "post", data)
        raw_data = await self.ark_request(
            url=ARK_SKD_SIGN,
            method="POST",
            data=data,
            header=header,
        )
        return self._to_model(raw_data, ArknightsAttendanceModel)

    async def get_sign_info(
        self,
        uid: str,
    ) -> Union[int, ArknightsAttendanceCalendarModel]:
        """Fetch the attendance calendar for ``uid``.

        Returns:
            The converted ``ArknightsAttendanceCalendarModel``, or an ``int``
            status (see the class docstring).

        Raises:
            TokenRefreshFailed: If the token expired and could not be renewed.
            TokenExpiredError: If the retried request is rejected again.
        """
        credentials = await self._load_credentials(uid)
        if credentials is None:
            return -60
        cred, token = credentials

        header = self._build_signed_header(cred, token, ARK_SKD_SIGN, "get", None)
        raw_data = await self.ark_request(
            url=ARK_SKD_SIGN,
            method="GET",
            params={
                "uid": uid,
                "gameId": 1,
            },
            header=header,
        )
        return self._to_model(raw_data, ArknightsAttendanceCalendarModel)

    # NOTE: a `check_cred_valid` helper used to live here as commented-out
    # code.  It queried ARK_WEB_USER and treated response code 10001 as an
    # invalid cred, but it relied on a `set_sign` method that this class does
    # not define, so it (and its call sites) were disabled.

    def unpack(self, raw_data: Dict) -> Dict:
        """Return ``raw_data["data"]`` when present, otherwise ``raw_data`` itself."""
        try:
            return raw_data["data"]
        except KeyError:
            return raw_data

    async def refresh_token(self, cred: str, uid: Union[str, None] = None) -> str:
        """Request a new signing token for ``cred`` and persist it.

        Args:
            cred: Credential whose token should be renewed.
            uid: User whose stored token is updated.  When omitted, the uid is
                looked up from ``cred``; if none is found nothing is stored.

        Returns:
            The newly issued token.

        Raises:
            TokenRefreshFailed: If the refresh request is rejected, yields no
                usable response, or the response carries no token.
        """
        header = deepcopy(_HEADER)
        header["cred"] = cred
        header["sign_enable"] = "false"

        # Use the low-level request: going through `ark_request` would react
        # to an "expired" answer by calling `refresh_token` again, recursing
        # without bound when the refresh itself is rejected.
        try:
            raw_data = await self._ark_request(url=ARK_REFRESH_TOKEN, header=header)
        except TokenExpiredError as err:
            raise TokenRefreshFailed from err
        if isinstance(raw_data, int) or not raw_data:
            raise TokenRefreshFailed

        try:
            token = cast(str, self.unpack(raw_data)["token"])
        except (KeyError, TypeError) as err:
            raise TokenRefreshFailed from err

        # Honour an explicitly supplied uid; only fall back to the lookup
        # when the caller did not provide one.
        if uid is None:
            uid = await ArknightsUser.get_uid_by_cred(cred)
        if uid is not None:
            await ArknightsUser.update_user_attr_by_uid(
                uid=uid,
                attr="token",
                value=token,
            )
        return token

    async def ark_request(
        self,
        url: str,
        method: Literal["GET", "POST"] = "GET",
        header: Dict[str, Any] = _HEADER,
        params: Union[Dict[str, Any], None] = None,
        data: Union[Dict[str, Any], None] = None,
        use_proxy: Union[bool, None] = False,
    ) -> Union[Dict, Union[int, None]]:
        """Send a request, renewing the token and retrying once if it expired.

        Args:
            url: Endpoint URL.
            method: HTTP method.
            header: Request headers; must contain ``"cred"`` for anything to
                be sent.
            params: Query parameters.
            data: JSON body.
            use_proxy: Whether to route the request through ``proxy_url``.

        Returns:
            Whatever ``_ark_request`` returns for the (possibly retried) call.

        Raises:
            TokenRefreshFailed: If the token expired and could not be renewed.
            TokenExpiredError: If the retried request is rejected again.
        """
        logger.debug(f"ark_request {url} {method} {header} {params} {data} {use_proxy}")
        try:
            return await self._ark_request(
                url=url,
                method=method,
                header=header,
                params=params,
                data=data,
                use_proxy=use_proxy,
            )
        except TokenExpiredError:
            # Re-sign with the freshly issued token: the signature is keyed
            # by the token, not by the cred.
            new_token = await self.refresh_token(header["cred"])
            retry_header = get_sign_header(new_token, url, method, data, deepcopy(header))
            return await self._ark_request(
                url=url,
                method=method,
                header=retry_header,
                params=params,
                data=data,
                use_proxy=use_proxy,
            )

    async def _ark_request(
        self,
        url: str,
        method: Literal["GET", "POST"] = "GET",
        header: Dict[str, Any] = _HEADER,
        params: Union[Dict[str, Any], None] = None,
        data: Union[Dict[str, Any], None] = None,
        use_proxy: Union[bool, None] = False,
    ) -> Union[Dict, Union[int, None]]:
        """Perform one HTTP request and interpret the ``code`` of its response.

        Returns:
            The decoded response dict when ``code`` is ``0``; ``10001`` when
            the header has no ``"cred"`` or the response ``code`` is
            ``10001``; ``None`` for any other outcome.

        Raises:
            TokenExpiredError: If the response ``code`` is ``10000``.
        """
        # Nothing is sent without a cred, so check before opening a session.
        if "cred" not in header:
            return 10001

        async with ClientSession(
            connector=TCPConnector(verify_ssl=ssl_verify),
        ) as client:
            async with client.request(
                method=method,
                url=url,
                headers=header,
                params=params,
                json=data,
                proxy=self.proxy_url if use_proxy else None,
                timeout=300,
            ) as resp:
                try:
                    raw_data = await resp.json()
                except ContentTypeError:
                    # Non-JSON answer: keep the text under a sentinel code.
                    _raw_data = await resp.text()
                    raw_data = {"code": -999, "data": _raw_data}
                logger.info(raw_data)

                # Dispatch on the status code; a payload that is not a dict
                # or has no code falls through to the "no result" case.
                code = raw_data.get("code") if isinstance(raw_data, dict) else None
                if code == 0:
                    return raw_data
                if code == 10000:
                    # Original note: "request error"; handled as an expired token.
                    logger.info(f"{url} {raw_data}")
                    raise TokenExpiredError
                if code == 10001:
                    # Original note: "duplicate sign-in".
                    return code
                return None