This commit is contained in:
baiqwerdvd 2024-10-13 14:35:02 +08:00
parent 714f76eb9d
commit d30e570b80
No known key found for this signature in database
GPG Key ID: 7717E46E1797411A
2 changed files with 93 additions and 139 deletions

View File

@ -137,26 +137,7 @@ class SklandLogin:
self.token = data.token
self.get_ark_uid()
# def post_account_info_hg(self):
# if self.token is None:
# raise SklandLoginError(ARK_ACCONUT_INFO_HG, "token not set!")
# response = self.client.post(
# ARK_ACCONUT_INFO_HG,
# json={"content": self.token},
# )
# set_cookie: str = response.headers.get("set-cookie")
# matches = re.findall(r"ACCOUNT=([^;]+)", set_cookie)
# account_cookie: str = matches[0]
# self.hg_token = account_cookie
# self.get_ark_uid()
def user_oauth2_v2_grant(self):
# data = Oauth2V2GrantRequest(
# token=self.token,
# appCode="4ca99fa6b56cc2ba",
# type=0,
# )
# self.client.headers["dId"] = get_d_id()
response = self.client.post(
ARK_USER_OAUTH2_V2_GRANT,
json={"appCode": "4ca99fa6b56cc2ba", "token": self.token, "type": 0},

View File

@ -38,6 +38,37 @@ _HEADER: Dict[str, str] = {
}
header_for_sign = {
"platform": "1",
"timestamp": "",
"dId": "",
"vName": "1.5.1",
}
def generate_signature(token: str, path: str, body_or_query: str):
t = str(int(time.time()) - 2)
_token = token.encode("utf-8")
header_ca = header_for_sign.copy()
header_ca["timestamp"] = t
header_ca_str = json.dumps(header_ca, separators=(",", ":"))
s = path + body_or_query + t + header_ca_str
hex_s = hmac.new(_token, s.encode("utf-8"), hashlib.sha256).hexdigest()
md5 = hashlib.md5(hex_s.encode("utf-8")).hexdigest()
return md5, header_ca
def get_sign_header(token: str, url: str, method: str, body: Union[dict, None], old_header: dict):
h = old_header.copy()
p = urlparse(url)
if method.lower() == "get":
h["sign"], header_ca = generate_signature(token, p.path, p.query)
else:
h["sign"], header_ca = generate_signature(token, p.path, json.dumps(body))
h.update(header_ca)
return h
class TokenExpiredError(Exception):
pass
@ -80,13 +111,19 @@ class BaseArkApi:
)
if cred is None:
return -60
is_vaild = await self.check_cred_valid(cred)
if isinstance(is_vaild, bool):
await ArknightsUser.delete_user_data_by_uid(uid)
return -61
header = deepcopy(_HEADER)
header["cred"] = cred
header = await self.set_sign(ARK_PLAYER_INFO, header=header)
token: Union[str, None] = await ArknightsUser.get_user_attr_by_uid(
uid=uid,
attr="token",
)
if token is None:
return -60
# is_vaild = await self.check_cred_valid(cred)
# if isinstance(is_vaild, bool):
# await ArknightsUser.delete_user_data_by_uid(uid)
# return -61
headers = deepcopy(_HEADER)
headers["cred"] = cred
header = get_sign_header(token, ARK_PLAYER_INFO, "get", None, headers)
raw_data = await self.ark_request(
url=ARK_PLAYER_INFO,
params={"uid": uid},
@ -109,20 +146,20 @@ class BaseArkApi:
)
if cred is None:
return -60
token: Union[str, None] = await ArknightsUser.get_user_attr_by_uid(
uid=uid,
attr="token",
)
if token is None:
return -60
# is_vaild = await self.check_cred_valid(cred)
# if isinstance(is_vaild, bool):
# await ArknightsUser.delete_user_data_by_uid(uid)
# return -61
header = deepcopy(_HEADER)
header["cred"] = cred
headers = deepcopy(_HEADER)
headers["cred"] = cred
data = {"uid": uid, "gameId": 1}
header = await self.set_sign(
ARK_SKD_SIGN,
header=header,
data=data,
)
header["Content-Type"] = "application/json"
header["Content-Length"] = str(len(json.dumps(data)))
header = get_sign_header(token, ARK_SKD_SIGN, "post", data, headers)
raw_data = await self.ark_request(
url=ARK_SKD_SIGN,
method="POST",
@ -149,20 +186,15 @@ class BaseArkApi:
)
if cred is None:
return -60
# is_vaild = await self.check_cred_valid(cred)
# if isinstance(is_vaild, bool):
# await ArknightsUser.delete_user_data_by_uid(uid)
# return -61
header = deepcopy(_HEADER)
header["cred"] = cred
header = await self.set_sign(
ARK_SKD_SIGN,
header=header,
params={
"uid": uid,
"gameId": 1,
},
token: Union[str, None] = await ArknightsUser.get_user_attr_by_uid(
uid=uid,
attr="token",
)
if token is None:
return -60
headers = deepcopy(_HEADER)
headers["cred"] = cred
header = get_sign_header(token, ARK_SKD_SIGN, "get", None, headers)
raw_data = await self.ark_request(
url=ARK_SKD_SIGN,
method="GET",
@ -182,34 +214,34 @@ class BaseArkApi:
else:
return msgspec.convert(unpack_data, ArknightsAttendanceCalendarModel)
async def check_cred_valid(
self,
cred: Union[str, None] = None,
token: Union[str, None] = None,
uid: Union[str, None] = None,
) -> Union[bool, ArknightsUserMeModel]:
if uid is not None:
cred = (
cred
if cred
else await ArknightsUser.get_user_attr_by_uid(
uid=uid,
attr="cred",
)
)
header = deepcopy(_HEADER)
if cred is None:
return False
header["cred"] = cred
header = await self.set_sign(ARK_WEB_USER, header=header, token=token)
raw_data = await self.ark_request(ARK_WEB_USER, header=header)
if isinstance(raw_data, int) or not raw_data:
return False
if "code" in raw_data and raw_data["code"] == 10001:
logger.info(f"cred is invalid {raw_data}")
return False
unpack_data = self.unpack(raw_data)
return msgspec.convert(unpack_data, type=ArknightsUserMeModel)
# async def check_cred_valid(
# self,
# cred: Union[str, None] = None,
# token: Union[str, None] = None,
# uid: Union[str, None] = None,
# ) -> Union[bool, ArknightsUserMeModel]:
# if uid is not None:
# cred = (
# cred
# if cred
# else await ArknightsUser.get_user_attr_by_uid(
# uid=uid,
# attr="cred",
# )
# )
# header = deepcopy(_HEADER)
# if cred is None:
# return False
# header["cred"] = cred
# header = await self.set_sign(ARK_WEB_USER, header=header, token=token)
# raw_data = await self.ark_request(ARK_WEB_USER, header=header)
# if isinstance(raw_data, int) or not raw_data:
# return False
# if "code" in raw_data and raw_data["code"] == 10001:
# logger.info(f"cred is invalid {raw_data}")
# return False
# unpack_data = self.unpack(raw_data)
# return msgspec.convert(unpack_data, type=ArknightsUserMeModel)
def unpack(self, raw_data: Dict) -> Dict:
try:
@ -236,46 +268,6 @@ class BaseArkApi:
)
return token
async def set_sign(
self,
url: str,
header: Dict[str, Any],
data: Union[Dict[str, Any], None] = None,
params: Union[Dict[str, Any], None] = None,
token: Union[str, None] = None,
) -> Dict:
parsed_url = urlparse(url)
path = parsed_url.path
timestamp = str(int(time.time()) - 2)
header_ca = {
"platform": "3",
"timestamp": timestamp,
"vName": "1.0.0",
}
header_ca_str = json.dumps(header_ca, separators=(",", ":"))
s2 = ""
if params:
logger.debug(f"params {params}")
s2 += "&".join([str(x) + "=" + str(params[x]) for x in params])
if data:
logger.debug(f"data {data}")
s2 += json.dumps(data)
logger.debug(f"{path} {s2} {timestamp} {header_ca_str}")
str2 = path + s2 + timestamp + header_ca_str
token_ = await ArknightsUser.get_token_by_cred(header["cred"])
logger.debug(f'cred {header["cred"]} token {token} token_ {token_}')
token = token if token else token_
if token is None:
raise Exception("token is None")
encode_token = token.encode("utf-8")
hex_s = hmac.new(encode_token, str2.encode("utf-8"), hashlib.sha256).hexdigest()
sign = hashlib.md5(hex_s.encode("utf-8")).hexdigest().encode("utf-8").decode("utf-8")
header["sign"] = sign
header["timestamp"] = timestamp
# header["dId"] = dId
logger.debug(header)
return header
async def ark_request(
self,
url: str,
@ -297,11 +289,13 @@ class BaseArkApi:
)
except TokenExpiredError:
await self.refresh_token(header["cred"])
header = await self.set_sign(url, header, data, params)
headers = deepcopy(header)
headers["cred"] = header["cred"]
headers = get_sign_header(headers["cred"], url, method, data, headers)
raw_data = await self._ark_request(
url=url,
method=method,
header=header,
header=headers,
params=params,
data=data,
use_proxy=use_proxy,
@ -352,24 +346,3 @@ class BaseArkApi:
if raw_data["code"] == 10001:
# 重复签到
return raw_data["code"]
# 判断status
# if 'status' in raw_data and 'msg' in raw_data:
# retcode = 1
# else:
# retcode = 0
# if retcode == 1 and data:
# vl, ch = await self._pass(
# gt=raw_data['data']['captcha']['gt'],
# ch=raw_data['data']['captcha']['challenge']
# )
# data['captcha'] = {}
# data['captcha']['geetest_challenge'] = ch
# data['captcha']['geetest_validate'] = vl
# data['captcha']['geetest_seccode'] = f'{vl}|jordan'
# elif retcode != 0:
# return retcode
# else:
# return raw_data
# return 10001