diff --git a/ArknightsUID/arknightsuid_login/__init__.py b/ArknightsUID/arknightsuid_login/__init__.py new file mode 100644 index 0000000..c878df1 --- /dev/null +++ b/ArknightsUID/arknightsuid_login/__init__.py @@ -0,0 +1,78 @@ +from gsuid_core.bot import Bot +from gsuid_core.sv import SV +from gsuid_core.models import Event + +from ArknightsUID.utils.database.models import ( + ArknightsBind, + ArknightsPush, + ArknightsUser, +) + +from ArknightsUID.utils.ark_api import ark_skd_api +from ArknightsUID.utils.ark_prefix import PREFIX +from ArknightsUID.utils.error_reply import UID_HINT +from .login import SklandLogin + +sv_skland_login = SV("ark森空岛登录") + + +@sv_skland_login.on_fullmatch(f"{PREFIX}森空岛登录") +async def get_resp_msg(bot: Bot, ev: Event): + uid_list = await ArknightsBind.get_uid_list_by_game(ev.user_id, ev.bot_id) + if uid_list is None: + return UID_HINT + phone_number = ev.text.strip() + if not phone_number.isdigit(): + return await bot.send("你输入了错误的格式!") + resp = await bot.receive_resp( + f"请确认你的手机号码: {phone_number}." + "如果正确请回复'确认', 其他任何回复将取消本次操作." + ) + if resp is not None and resp.text == "确认": + login = SklandLogin(phone_number) + login.send_phone_code() + code = await bot.receive_resp("请输入验证码:") + if code is None or not code.text.isdigit(): + return await bot.send("你输入了错误的格式!") + login.token_by_phone_code(code.text) + login.post_account_info_hg() + login.user_oauth2_v2_grant() + (skland_cred, skland_token, skland_userId) = login.generate_cred_by_code() + + check_cred = await ark_skd_api.check_cred_valid( + cred=skland_cred, + token=skland_token, + ) + + if isinstance(check_cred, bool): + return "Cred无效!" + else: + skd_uid = check_cred.user.id_ + assert skland_userId == skd_uid + uid = check_cred.gameStatus.uid + if uid not in uid_list: + return "请先绑定该 Cred 对应的 uid" + + skd_data = await ArknightsUser.select_data_by_uid(uid) + push_data = await ArknightsPush.select_data_by_uid(uid) + if not skd_data: + await ArknightsUser.insert_data( + ev.user_id, + ev.bot_id, + cred=skland_cred, + uid=uid, + skd_uid=skd_uid, + token=skland_token, + ) + else: + await ArknightsUser.update_data( + ev.user_id, + ev.bot_id, + cred=skland_cred, + uid=uid, + skd_uid=skd_uid, + token=skland_token, + ) + if not push_data: + await ArknightsPush.insert_push_data(ev.bot_id, uid=uid, skd_uid=skd_uid) + return await bot.send("登录成功!") diff --git a/ArknightsUID/arknightsuid_login/constant.py b/ArknightsUID/arknightsuid_login/constant.py new file mode 100644 index 0000000..52501c9 --- /dev/null +++ b/ArknightsUID/arknightsuid_login/constant.py @@ -0,0 +1,9 @@ +ARK_ACCOUNT_SERVER = "https://as.hypergryph.com/" +SKLAND_WEB_API = "https://web-api.skland.com/" +ZONAI_SKLAND_URL = "https://zonai.skland.com/" + +ARK_LOGIN_SEND_PHONE_CODE = ARK_ACCOUNT_SERVER + "general/v1/send_phone_code" +ARK_TOKEN_BY_PHONE_CODE = ARK_ACCOUNT_SERVER + "user/auth/v2/token_by_phone_code" +ARK_USER_OAUTH2_V2_GRANT = ARK_ACCOUNT_SERVER + "user/oauth2/v2/grant" +ARK_ACCONUT_INFO_HG = SKLAND_WEB_API + "account/info/hg" +GENERATE_CRED_BY_CODE = ZONAI_SKLAND_URL + "web/v1/user/auth/generate_cred_by_code" diff --git a/ArknightsUID/arknightsuid_login/login.py b/ArknightsUID/arknightsuid_login/login.py new file mode 100644 index 0000000..2795353 --- /dev/null +++ b/ArknightsUID/arknightsuid_login/login.py @@ -0,0 +1,227 @@ +import re +from typing import ClassVar, Dict, TypeVar, Union +import httpx +from datetime import datetime + +from msgspec import UnsetType, convert +from msgspec import json as mscjson + +from gsuid_core.utils.plugins_config.gs_config import core_plugins_config +from ArknightsUID.arknightsuid_login.model import ( + AccountInfoHGRequest, + AccountInfoHGResponse, + FuckMysGeetestPassResponse, + GeneralGeetestData, + GeneralV1SendPhoneCodeRequest, + GeneralV1SendPhoneCodeResponse, + Oauth2V2GrantRequest, + Oauth2V2GrantResponse, + UserAuthV2TokenByPhoneCodeRequest, + UserAuthV2TokenByPhoneCodeResponse, + ZonaiSklandWebUserGenerateCredByCodeRequest, + ZonaiSklandWebUserGenerateCredByCodeResponse, +) +from ArknightsUID.arknightsuid_login.constant import ( + ARK_LOGIN_SEND_PHONE_CODE, + ARK_TOKEN_BY_PHONE_CODE, + ARK_ACCONUT_INFO_HG, + ARK_USER_OAUTH2_V2_GRANT, + GENERATE_CRED_BY_CODE, +) + +T1 = TypeVar("T1") +T2 = TypeVar("T2") + + +class SklandLoginError(Exception): + def __init__(self, url: str, message: str): + self.url = url + self.message = message + + def __str__(self): + return self.url + " " + self.message + + +def transUnset(v: Union[T1, UnsetType], d: T2 = None) -> Union[T1, T2]: + return v if not isinstance(v, UnsetType) else d + + +class SklandLogin: + _HEADER: ClassVar[Dict[str, str]] = { + "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36", # noqa: E501 + "content-type": "application/json;charset=UTF-8", + "origin": "https://www.skland.com", + "referer": "https://www.skland.com", + } + + def __init__(self, phone: str, geetest_token: Union[str, None] = None): + self.phone = phone + self.client = httpx.Client( + headers=self._HEADER, + verify=False, + ) + self.geetest_token = geetest_token + + def send_phone_code( + self, + override_geetest: Union[GeneralGeetestData, None] = None, + ): + if override_geetest: + data = GeneralV1SendPhoneCodeRequest( + phone=self.phone, + type=2, + captcha=override_geetest, + ) + else: + data = GeneralV1SendPhoneCodeRequest( + phone=self.phone, + type=1, + ) + response = self.client.post( + ARK_LOGIN_SEND_PHONE_CODE, + json=mscjson.decode(mscjson.encode(data)), + ) + response.raise_for_status() + result = convert(response.json(), GeneralV1SendPhoneCodeResponse) + if result.status == 1: + captcha_data = transUnset(result.data) + assert captcha_data is not None + _pass_api = core_plugins_config.get_config("_pass_API").data + if _pass_api is None: + raise SklandLoginError( + ARK_LOGIN_SEND_PHONE_CODE, + "config _pass_API is None", + ) + return_data = httpx.post( + f"{_pass_api}>={captcha_data.captcha.gt}&challenge={captcha_data.captcha.challenge}", + verify=False, + timeout=100, + ) + geetest_pass_data = convert(return_data.json(), FuckMysGeetestPassResponse) + if geetest_pass_data.code != 0: + raise SklandLoginError( + "_pass_API", + geetest_pass_data.info, + ) + self.send_phone_code( + override_geetest=GeneralGeetestData( + geetest_challenge=geetest_pass_data.data.challenge, + geetest_validate=geetest_pass_data.data.validate, + geetest_seccode=f"{geetest_pass_data.data.validate}|jordan", + ) + ) + elif result.status != 0: + return result.msg + + def token_by_phone_code(self, code: str): + data = UserAuthV2TokenByPhoneCodeRequest( + phone=self.phone, + code=code, + ) + response = self.client.post( + ARK_TOKEN_BY_PHONE_CODE, + json=mscjson.decode(mscjson.encode(data)), + ) + response.raise_for_status() + result = convert(response.json(), UserAuthV2TokenByPhoneCodeResponse) + status = result.status + if status == 101: + msg = transUnset(result.msg) + assert msg is not None + raise SklandLoginError(ARK_TOKEN_BY_PHONE_CODE, msg) + data = transUnset(result.data) + assert data is not None + self.token = data.token + + def post_account_info_hg(self): + if self.token is None: + raise SklandLoginError(ARK_ACCONUT_INFO_HG, "token not set!") + data = AccountInfoHGRequest( + content=self.token, + ) + response = self.client.post( + ARK_ACCONUT_INFO_HG, + json=mscjson.decode(mscjson.encode(data)), + ) + set_cookie = response.headers.get("set-cookie") + matches = re.findall(r"ACCOUNT=([^;]+)", set_cookie) + account_cookie = matches[0] + self.client = httpx.Client( + headers=self._HEADER, + verify=False, + cookies={"ACCOUNT": account_cookie}, + ) + response.raise_for_status() + result = convert(response.json(), AccountInfoHGResponse) + if result.code != 0: + raise SklandLoginError(ARK_ACCONUT_INFO_HG, result.msg) + self.get_account_info_hg() + + def get_account_info_hg(self): + if self.token is None: + raise SklandLoginError(ARK_ACCONUT_INFO_HG, "token not set!") + data = AccountInfoHGRequest( + content=self.token, + ) + response = self.client.post( + ARK_ACCONUT_INFO_HG, + json=mscjson.decode(mscjson.encode(data)), + ) + response.raise_for_status() + result = convert(response.json(), AccountInfoHGResponse) + if result.code != 0: + raise SklandLoginError(ARK_ACCONUT_INFO_HG, result.msg) + + def user_oauth2_v2_grant(self): + data = Oauth2V2GrantRequest( + token=self.token, + appCode="4ca99fa6b56cc2ba", + type=0, + ) + response = self.client.post( + ARK_USER_OAUTH2_V2_GRANT, + json=mscjson.decode(mscjson.encode(data)), + ) + response.raise_for_status() + result = convert(response.json(), Oauth2V2GrantResponse) + status = result.status + if status != 0: + raise SklandLoginError(ARK_USER_OAUTH2_V2_GRANT, result.msg) + result_data = transUnset(result.data) + if not result_data: + raise SklandLoginError(ARK_USER_OAUTH2_V2_GRANT, "result.data is None") + uid = transUnset(result_data.uid) + if not uid: + raise SklandLoginError(ARK_USER_OAUTH2_V2_GRANT, "result.data.uid is None") + code = transUnset(result_data.code) + if not code: + raise SklandLoginError(ARK_USER_OAUTH2_V2_GRANT, "result.data.code is None") + self.uid = uid + self.code = code + + def generate_cred_by_code(self): + data = ZonaiSklandWebUserGenerateCredByCodeRequest( + kind=1, + code=self.code, + ) + self.client.headers["platform"] = "3" + self.client.headers["vname"] = "1.0.0" + self.client.headers["timestamp"] = str(int(datetime.now().timestamp())) + self.client.headers["did"] = ( + "BFv9p5AJUOqJbpXDqjezZOf7TxJqppEd7iXxqqq7e0Z+Y0FUX/8kpnXqYe1UByxrjWKh43Webhic8ZEoJvquOgg==" + ) + response = self.client.post( + GENERATE_CRED_BY_CODE, + json=mscjson.decode(mscjson.encode(data)), + ) + response.raise_for_status() + result = convert(response.json(), ZonaiSklandWebUserGenerateCredByCodeResponse) + if result.code != 0: + raise SklandLoginError( + GENERATE_CRED_BY_CODE, + result.message, + ) + self.skland_cred = result.data.cred + self.skland_token = result.data.token + self.skland_userId = result.data.userId + return (self.skland_cred, self.skland_token, self.skland_userId) diff --git a/ArknightsUID/arknightsuid_login/model.py b/ArknightsUID/arknightsuid_login/model.py new file mode 100644 index 0000000..c7150c3 --- /dev/null +++ b/ArknightsUID/arknightsuid_login/model.py @@ -0,0 +1,112 @@ +from typing import Dict, Union +from msgspec import Struct, field, UnsetType, UNSET + + +class GeneralGeetestData(Struct): + geetest_challenge: str + geetest_seccode: str + geetest_validate: str + + +class GeneralV1SendPhoneCodeRequest(Struct): + phone: str + type: int + captcha: Union[GeneralGeetestData, UnsetType] = field(default=UNSET) + + +class CaptchaItemModel(Struct): + success: int + gt: str + challenge: str + new_captcha: bool + + +class GeneralV1SendPhoneCodeData(Struct): + captcha: CaptchaItemModel + + +class GeneralV1SendPhoneCodeResponse(Struct): + status: int + msg: Union[str, UnsetType] = field(default=UNSET) + type: Union[str, UnsetType] = field(default=UNSET) + data: Union[GeneralV1SendPhoneCodeData, UnsetType] = field(default=UNSET) + + +class TokenData(Struct): + token: str + + +class UserAuthV2TokenByPhoneCodeRequest(Struct): + code: str + phone: str + + +class UserAuthV2TokenByPhoneCodeResponse(Struct): + status: int + msg: Union[str, UnsetType] = field(default=UNSET) + data: Union[TokenData, UnsetType] = field(default=UNSET) + type: Union[str, UnsetType] = field(default=UNSET) + + +class AccountInfoHGRequest(Struct): + content: str + + +class AccountInfoHGResponse(Struct): + code: int + msg: str + data: Dict + + +class GeetestData(Struct): + gt: str + challenge: str + validate: str + type: str + + +class FuckMysGeetestPassResponse(Struct): + code: int + info: str + data: GeetestData + times: int + + +class Oauth2V2GrantRequest(Struct): + appCode: str + token: str + type: int + + +class Oauth2V2CodeDataItemResponse(Struct): + hgId: Union[str, UnsetType] = field(default=UNSET) + token: Union[str, UnsetType] = field(default=UNSET) + code: Union[str, UnsetType] = field(default=UNSET) + uid: Union[str, UnsetType] = field(default=UNSET) + delete_commit_ts: Union[int, UnsetType] = field(default=UNSET) + delete_request_ts: Union[int, UnsetType] = field(default=UNSET) + + +class Oauth2V2GrantResponse(Struct): + status: int + msg: str + data: Union[Oauth2V2CodeDataItemResponse, UnsetType] = field(default=UNSET) + type: Union[str, UnsetType] = field(default=UNSET) + + +class ZonaiSklandWebUserGenerateCredByCodeRequest(Struct): + kind: int + code: str + + +class ZonaiSklandWebUserData(Struct): + cred: str + userId: str + token: str + + +class ZonaiSklandWebUserGenerateCredByCodeResponse(Struct): + code: int + message: str + timestamp: str + data: ZonaiSklandWebUserData