Some evil things

This commit is contained in:
baiqwerdvd 2024-04-05 22:38:22 +08:00
parent e8e54dd79d
commit 373db83bab
No known key found for this signature in database
GPG Key ID: 7717E46E1797411A
4 changed files with 426 additions and 0 deletions

View File

@ -0,0 +1,78 @@
from gsuid_core.bot import Bot
from gsuid_core.sv import SV
from gsuid_core.models import Event
from ArknightsUID.utils.database.models import (
ArknightsBind,
ArknightsPush,
ArknightsUser,
)
from ArknightsUID.utils.ark_api import ark_skd_api
from ArknightsUID.utils.ark_prefix import PREFIX
from ArknightsUID.utils.error_reply import UID_HINT
from .login import SklandLogin
sv_skland_login = SV("ark森空岛登录")
@sv_skland_login.on_fullmatch(f"{PREFIX}森空岛登录")
async def get_resp_msg(bot: Bot, ev: Event):
uid_list = await ArknightsBind.get_uid_list_by_game(ev.user_id, ev.bot_id)
if uid_list is None:
return UID_HINT
phone_number = ev.text.strip()
if not phone_number.isdigit():
return await bot.send("你输入了错误的格式!")
resp = await bot.receive_resp(
f"请确认你的手机号码: {phone_number}."
"如果正确请回复'确认', 其他任何回复将取消本次操作."
)
if resp is not None and resp.text == "确认":
login = SklandLogin(phone_number)
login.send_phone_code()
code = await bot.receive_resp("请输入验证码:")
if code is None or not code.text.isdigit():
return await bot.send("你输入了错误的格式!")
login.token_by_phone_code(code.text)
login.post_account_info_hg()
login.user_oauth2_v2_grant()
(skland_cred, skland_token, skland_userId) = login.generate_cred_by_code()
check_cred = await ark_skd_api.check_cred_valid(
cred=skland_cred,
token=skland_token,
)
if isinstance(check_cred, bool):
return "Cred无效!"
else:
skd_uid = check_cred.user.id_
assert skland_userId == skd_uid
uid = check_cred.gameStatus.uid
if uid not in uid_list:
return "请先绑定该 Cred 对应的 uid"
skd_data = await ArknightsUser.select_data_by_uid(uid)
push_data = await ArknightsPush.select_data_by_uid(uid)
if not skd_data:
await ArknightsUser.insert_data(
ev.user_id,
ev.bot_id,
cred=skland_cred,
uid=uid,
skd_uid=skd_uid,
token=skland_token,
)
else:
await ArknightsUser.update_data(
ev.user_id,
ev.bot_id,
cred=skland_cred,
uid=uid,
skd_uid=skd_uid,
token=skland_token,
)
if not push_data:
await ArknightsPush.insert_push_data(ev.bot_id, uid=uid, skd_uid=skd_uid)
return await bot.send("登录成功!")

View File

@ -0,0 +1,9 @@
ARK_ACCOUNT_SERVER = "https://as.hypergryph.com/"
SKLAND_WEB_API = "https://web-api.skland.com/"
ZONAI_SKLAND_URL = "https://zonai.skland.com/"
ARK_LOGIN_SEND_PHONE_CODE = ARK_ACCOUNT_SERVER + "general/v1/send_phone_code"
ARK_TOKEN_BY_PHONE_CODE = ARK_ACCOUNT_SERVER + "user/auth/v2/token_by_phone_code"
ARK_USER_OAUTH2_V2_GRANT = ARK_ACCOUNT_SERVER + "user/oauth2/v2/grant"
ARK_ACCONUT_INFO_HG = SKLAND_WEB_API + "account/info/hg"
GENERATE_CRED_BY_CODE = ZONAI_SKLAND_URL + "web/v1/user/auth/generate_cred_by_code"

View File

@ -0,0 +1,227 @@
import re
from typing import ClassVar, Dict, TypeVar, Union
import httpx
from datetime import datetime
from msgspec import UnsetType, convert
from msgspec import json as mscjson
from gsuid_core.utils.plugins_config.gs_config import core_plugins_config
from ArknightsUID.arknightsuid_login.model import (
AccountInfoHGRequest,
AccountInfoHGResponse,
FuckMysGeetestPassResponse,
GeneralGeetestData,
GeneralV1SendPhoneCodeRequest,
GeneralV1SendPhoneCodeResponse,
Oauth2V2GrantRequest,
Oauth2V2GrantResponse,
UserAuthV2TokenByPhoneCodeRequest,
UserAuthV2TokenByPhoneCodeResponse,
ZonaiSklandWebUserGenerateCredByCodeRequest,
ZonaiSklandWebUserGenerateCredByCodeResponse,
)
from ArknightsUID.arknightsuid_login.constant import (
ARK_LOGIN_SEND_PHONE_CODE,
ARK_TOKEN_BY_PHONE_CODE,
ARK_ACCONUT_INFO_HG,
ARK_USER_OAUTH2_V2_GRANT,
GENERATE_CRED_BY_CODE,
)
T1 = TypeVar("T1")
T2 = TypeVar("T2")
class SklandLoginError(Exception):
def __init__(self, url: str, message: str):
self.url = url
self.message = message
def __str__(self):
return self.url + " " + self.message
def transUnset(v: Union[T1, UnsetType], d: T2 = None) -> Union[T1, T2]:
return v if not isinstance(v, UnsetType) else d
class SklandLogin:
_HEADER: ClassVar[Dict[str, str]] = {
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36", # noqa: E501
"content-type": "application/json;charset=UTF-8",
"origin": "https://www.skland.com",
"referer": "https://www.skland.com",
}
def __init__(self, phone: str, geetest_token: Union[str, None] = None):
self.phone = phone
self.client = httpx.Client(
headers=self._HEADER,
verify=False,
)
self.geetest_token = geetest_token
def send_phone_code(
self,
override_geetest: Union[GeneralGeetestData, None] = None,
):
if override_geetest:
data = GeneralV1SendPhoneCodeRequest(
phone=self.phone,
type=2,
captcha=override_geetest,
)
else:
data = GeneralV1SendPhoneCodeRequest(
phone=self.phone,
type=1,
)
response = self.client.post(
ARK_LOGIN_SEND_PHONE_CODE,
json=mscjson.decode(mscjson.encode(data)),
)
response.raise_for_status()
result = convert(response.json(), GeneralV1SendPhoneCodeResponse)
if result.status == 1:
captcha_data = transUnset(result.data)
assert captcha_data is not None
_pass_api = core_plugins_config.get_config("_pass_API").data
if _pass_api is None:
raise SklandLoginError(
ARK_LOGIN_SEND_PHONE_CODE,
"config _pass_API is None",
)
return_data = httpx.post(
f"{_pass_api}&gt={captcha_data.captcha.gt}&challenge={captcha_data.captcha.challenge}",
verify=False,
timeout=100,
)
geetest_pass_data = convert(return_data.json(), FuckMysGeetestPassResponse)
if geetest_pass_data.code != 0:
raise SklandLoginError(
"_pass_API",
geetest_pass_data.info,
)
self.send_phone_code(
override_geetest=GeneralGeetestData(
geetest_challenge=geetest_pass_data.data.challenge,
geetest_validate=geetest_pass_data.data.validate,
geetest_seccode=f"{geetest_pass_data.data.validate}|jordan",
)
)
elif result.status != 0:
return result.msg
def token_by_phone_code(self, code: str):
data = UserAuthV2TokenByPhoneCodeRequest(
phone=self.phone,
code=code,
)
response = self.client.post(
ARK_TOKEN_BY_PHONE_CODE,
json=mscjson.decode(mscjson.encode(data)),
)
response.raise_for_status()
result = convert(response.json(), UserAuthV2TokenByPhoneCodeResponse)
status = result.status
if status == 101:
msg = transUnset(result.msg)
assert msg is not None
raise SklandLoginError(ARK_TOKEN_BY_PHONE_CODE, msg)
data = transUnset(result.data)
assert data is not None
self.token = data.token
def post_account_info_hg(self):
if self.token is None:
raise SklandLoginError(ARK_ACCONUT_INFO_HG, "token not set!")
data = AccountInfoHGRequest(
content=self.token,
)
response = self.client.post(
ARK_ACCONUT_INFO_HG,
json=mscjson.decode(mscjson.encode(data)),
)
set_cookie = response.headers.get("set-cookie")
matches = re.findall(r"ACCOUNT=([^;]+)", set_cookie)
account_cookie = matches[0]
self.client = httpx.Client(
headers=self._HEADER,
verify=False,
cookies={"ACCOUNT": account_cookie},
)
response.raise_for_status()
result = convert(response.json(), AccountInfoHGResponse)
if result.code != 0:
raise SklandLoginError(ARK_ACCONUT_INFO_HG, result.msg)
self.get_account_info_hg()
def get_account_info_hg(self):
if self.token is None:
raise SklandLoginError(ARK_ACCONUT_INFO_HG, "token not set!")
data = AccountInfoHGRequest(
content=self.token,
)
response = self.client.post(
ARK_ACCONUT_INFO_HG,
json=mscjson.decode(mscjson.encode(data)),
)
response.raise_for_status()
result = convert(response.json(), AccountInfoHGResponse)
if result.code != 0:
raise SklandLoginError(ARK_ACCONUT_INFO_HG, result.msg)
def user_oauth2_v2_grant(self):
data = Oauth2V2GrantRequest(
token=self.token,
appCode="4ca99fa6b56cc2ba",
type=0,
)
response = self.client.post(
ARK_USER_OAUTH2_V2_GRANT,
json=mscjson.decode(mscjson.encode(data)),
)
response.raise_for_status()
result = convert(response.json(), Oauth2V2GrantResponse)
status = result.status
if status != 0:
raise SklandLoginError(ARK_USER_OAUTH2_V2_GRANT, result.msg)
result_data = transUnset(result.data)
if not result_data:
raise SklandLoginError(ARK_USER_OAUTH2_V2_GRANT, "result.data is None")
uid = transUnset(result_data.uid)
if not uid:
raise SklandLoginError(ARK_USER_OAUTH2_V2_GRANT, "result.data.uid is None")
code = transUnset(result_data.code)
if not code:
raise SklandLoginError(ARK_USER_OAUTH2_V2_GRANT, "result.data.code is None")
self.uid = uid
self.code = code
def generate_cred_by_code(self):
data = ZonaiSklandWebUserGenerateCredByCodeRequest(
kind=1,
code=self.code,
)
self.client.headers["platform"] = "3"
self.client.headers["vname"] = "1.0.0"
self.client.headers["timestamp"] = str(int(datetime.now().timestamp()))
self.client.headers["did"] = (
"BFv9p5AJUOqJbpXDqjezZOf7TxJqppEd7iXxqqq7e0Z+Y0FUX/8kpnXqYe1UByxrjWKh43Webhic8ZEoJvquOgg=="
)
response = self.client.post(
GENERATE_CRED_BY_CODE,
json=mscjson.decode(mscjson.encode(data)),
)
response.raise_for_status()
result = convert(response.json(), ZonaiSklandWebUserGenerateCredByCodeResponse)
if result.code != 0:
raise SklandLoginError(
GENERATE_CRED_BY_CODE,
result.message,
)
self.skland_cred = result.data.cred
self.skland_token = result.data.token
self.skland_userId = result.data.userId
return (self.skland_cred, self.skland_token, self.skland_userId)

View File

@ -0,0 +1,112 @@
from typing import Dict, Union
from msgspec import Struct, field, UnsetType, UNSET
class GeneralGeetestData(Struct):
geetest_challenge: str
geetest_seccode: str
geetest_validate: str
class GeneralV1SendPhoneCodeRequest(Struct):
phone: str
type: int
captcha: Union[GeneralGeetestData, UnsetType] = field(default=UNSET)
class CaptchaItemModel(Struct):
success: int
gt: str
challenge: str
new_captcha: bool
class GeneralV1SendPhoneCodeData(Struct):
captcha: CaptchaItemModel
class GeneralV1SendPhoneCodeResponse(Struct):
status: int
msg: Union[str, UnsetType] = field(default=UNSET)
type: Union[str, UnsetType] = field(default=UNSET)
data: Union[GeneralV1SendPhoneCodeData, UnsetType] = field(default=UNSET)
class TokenData(Struct):
token: str
class UserAuthV2TokenByPhoneCodeRequest(Struct):
code: str
phone: str
class UserAuthV2TokenByPhoneCodeResponse(Struct):
status: int
msg: Union[str, UnsetType] = field(default=UNSET)
data: Union[TokenData, UnsetType] = field(default=UNSET)
type: Union[str, UnsetType] = field(default=UNSET)
class AccountInfoHGRequest(Struct):
content: str
class AccountInfoHGResponse(Struct):
code: int
msg: str
data: Dict
class GeetestData(Struct):
gt: str
challenge: str
validate: str
type: str
class FuckMysGeetestPassResponse(Struct):
code: int
info: str
data: GeetestData
times: int
class Oauth2V2GrantRequest(Struct):
appCode: str
token: str
type: int
class Oauth2V2CodeDataItemResponse(Struct):
hgId: Union[str, UnsetType] = field(default=UNSET)
token: Union[str, UnsetType] = field(default=UNSET)
code: Union[str, UnsetType] = field(default=UNSET)
uid: Union[str, UnsetType] = field(default=UNSET)
delete_commit_ts: Union[int, UnsetType] = field(default=UNSET)
delete_request_ts: Union[int, UnsetType] = field(default=UNSET)
class Oauth2V2GrantResponse(Struct):
status: int
msg: str
data: Union[Oauth2V2CodeDataItemResponse, UnsetType] = field(default=UNSET)
type: Union[str, UnsetType] = field(default=UNSET)
class ZonaiSklandWebUserGenerateCredByCodeRequest(Struct):
kind: int
code: str
class ZonaiSklandWebUserData(Struct):
cred: str
userId: str
token: str
class ZonaiSklandWebUserGenerateCredByCodeResponse(Struct):
code: int
message: str
timestamp: str
data: ZonaiSklandWebUserData