update core bypass logic

This commit is contained in:
lulu666lulu 2023-05-19 23:46:52 +08:00
parent 92873e7de4
commit c7a7695571
2 changed files with 48 additions and 41 deletions

View File

@ -6,7 +6,10 @@ from __future__ import annotations
import copy import copy
import time import time
import uuid import uuid
import types
import random import random
import inspect
import traceback
from abc import abstractmethod from abc import abstractmethod
from string import digits, ascii_letters from string import digits, ascii_letters
from typing import Any, Dict, List, Union, Literal, Optional, cast from typing import Any, Dict, List, Union, Literal, Optional, cast
@ -87,6 +90,7 @@ class BaseMysApi:
MAPI = _API MAPI = _API
is_sr = False is_sr = False
RECOGNIZE_SERVER = RECOGNIZE_SERVER RECOGNIZE_SERVER = RECOGNIZE_SERVER
chs = {}
@abstractmethod @abstractmethod
async def _upass(self, header: Dict): async def _upass(self, header: Dict):
@ -196,6 +200,9 @@ class BaseMysApi:
async with ClientSession( async with ClientSession(
connector=TCPConnector(verify_ssl=ssl_verify) connector=TCPConnector(verify_ssl=ssl_verify)
) as client: ) as client:
if "Cookie" in header:
if header["Cookie"] in self.chs:
header["x-rpc-challenge"] = self.chs.pop(header["Cookie"])
async with client.request( async with client.request(
method, method,
url=url, url=url,
@ -207,6 +214,7 @@ class BaseMysApi:
) as resp: ) as resp:
try: try:
raw_data = await resp.json() raw_data = await resp.json()
print(raw_data)
except ContentTypeError: except ContentTypeError:
_raw_data = await resp.text() _raw_data = await resp.text()
raw_data = {'retcode': -999, 'data': _raw_data} raw_data = {'retcode': -999, 'data': _raw_data}
@ -218,8 +226,44 @@ class BaseMysApi:
else: else:
retcode = 0 retcode = 0
if retcode == 1034: if retcode == 1034:
await self._upass(header) try:
return retcode ch = await self._upass(header)
if "Cookie" in header:
self.chs[header["Cookie"]] = ch
curframe = inspect.currentframe()
assert curframe
calframe = curframe.f_back
assert calframe
caller_name = calframe.f_code.co_name
caller_args = inspect.getargvalues(calframe).locals
caller_args2 = inspect.getargvalues(calframe).args
caller_args3 = {
k: caller_args.get(k, None) for k in caller_args2
}
print(caller_args2)
if caller_name != "_mys_req_get":
print(caller_name, caller_args)
return await types.FunctionType(
calframe.f_code, globals()
)(**caller_args3)
else:
curframe = calframe
calframe = curframe.f_back
assert calframe
caller_name = calframe.f_code.co_name
caller_args = inspect.getargvalues(calframe).locals
caller_args2 = inspect.getargvalues(calframe).args
caller_args3 = {
k: caller_args.get(k, None)
for k in caller_args2
}
return await types.FunctionType(
calframe.f_code, globals()
)(**caller_args3)
except Exception as e:
logger.error(e)
traceback.print_exc()
return -999
elif retcode != 0: elif retcode != 0:
return retcode return retcode
return raw_data return raw_data
@ -261,8 +305,9 @@ class MysApi(BaseMysApi):
if vl: if vl:
await self.get_header_and_vl(header, ch, vl) await self.get_header_and_vl(header, ch, vl)
return ch
else: else:
return True return await self._upass(header, is_bbs)
async def get_upass_link(self, header: Dict) -> Union[int, Dict]: async def get_upass_link(self, header: Dict) -> Union[int, Dict]:
header['DS'] = get_ds_token('is_high=false') header['DS'] = get_ds_token('is_high=false')

View File

@ -13,44 +13,6 @@ class _MysApi(MysApi):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
async def _pass(self, gt: str, ch: str, header: Dict):
# 警告使用该服务例如某RR等需要注意风险问题
# 本项目不以任何形式提供相关接口
# 代码来源GITHUB项目MIT开源
_pass_api = gsconfig.get_config('_pass_API').data
if _pass_api:
data = await self._mys_request(
url=f'{_pass_api}&gt={gt}&challenge={ch}',
method='GET',
header=header,
)
if isinstance(data, int):
return None, None
else:
validate = data['data']['validate']
ch = data['data']['challenge']
else:
validate = None
return validate, ch
async def _upass(self, header: Dict, is_bbs: bool = False):
if is_bbs:
raw_data = await self.get_bbs_upass_link(header)
else:
raw_data = await self.get_upass_link(header)
if isinstance(raw_data, int):
return False
gt = raw_data['data']['gt']
ch = raw_data['data']['challenge']
vl, ch = await self._pass(gt, ch, header)
if vl:
await self.get_header_and_vl(header, ch, vl)
else:
return True
async def get_ck( async def get_ck(
self, uid: str, mode: Literal['OWNER', 'RANDOM'] = 'RANDOM' self, uid: str, mode: Literal['OWNER', 'RANDOM'] = 'RANDOM'
) -> Optional[str]: ) -> Optional[str]: