🐛 修复无限循环的问题

This commit is contained in:
Wuyi无疑 2023-05-20 21:48:17 +08:00
parent 4c648c005f
commit cfc0944c52

View File

@ -6,13 +6,10 @@ from __future__ import annotations
import copy import copy
import time import time
import uuid import uuid
import types
import random import random
import inspect
import traceback
from abc import abstractmethod from abc import abstractmethod
from string import digits, ascii_letters from string import digits, ascii_letters
from typing import Any, Dict, List, Union, Literal, Optional, cast from typing import Any, Dict, List, Tuple, Union, Literal, Optional, cast
from aiohttp import TCPConnector, ClientSession, ContentTypeError from aiohttp import TCPConnector, ClientSession, ContentTypeError
@ -93,11 +90,13 @@ class BaseMysApi:
chs = {} chs = {}
@abstractmethod @abstractmethod
async def _upass(self, header: Dict): async def _upass(self, header: Dict) -> str:
... ...
@abstractmethod @abstractmethod
async def _pass(self, gt: str, ch: str, header: Dict): async def _pass(
self, gt: str, ch: str, header: Dict
) -> Tuple[Optional[str], Optional[str]]:
... ...
@abstractmethod @abstractmethod
@ -200,9 +199,70 @@ class BaseMysApi:
async with ClientSession( async with ClientSession(
connector=TCPConnector(verify_ssl=ssl_verify) connector=TCPConnector(verify_ssl=ssl_verify)
) as client: ) as client:
if "Cookie" in header: raw_data = {}
if header["Cookie"] in self.chs: for _ in range(2):
header["x-rpc-challenge"] = self.chs.pop(header["Cookie"]) if 'Cookie' in header and header['Cookie'] in self.chs:
# header['x-rpc-challenge']=self.chs.pop(header['Cookie'])
header['x-rpc-challenge_game'] = '6' if self.is_sr else '2'
header['x-rpc-page'] = (
'3.1.3_#/rpg' if self.is_sr else '3.1.3_#/ys'
)
async with client.request(
method,
url=url,
headers=header,
params=params,
json=data,
proxy=self.proxy_url if use_proxy else None,
timeout=300,
) as resp:
try:
raw_data = await resp.json()
except ContentTypeError:
_raw_data = await resp.text()
raw_data = {'retcode': -999, 'data': _raw_data}
logger.debug(raw_data)
# 判断retcode
if 'retcode' in raw_data:
retcode: int = raw_data['retcode']
elif 'code' in raw_data:
retcode: int = raw_data['code']
else:
retcode = 0
# 针对1034做特殊处理
if retcode == 1034:
ch = await self._upass(header)
self.chs[header['Cookie']] = ch
elif retcode != 0:
return retcode
else:
return raw_data
else:
return -999
'''
async def _mys_request(
self,
url: str,
method: Literal['GET', 'POST'] = 'GET',
header: Dict[str, Any] = _HEADER,
params: Optional[Dict[str, Any]] = None,
data: Optional[Dict[str, Any]] = None,
use_proxy: Optional[bool] = False,
) -> Union[Dict, int]:
import types
import inspect
async with ClientSession(
connector=TCPConnector(verify_ssl=ssl_verify)
) as client:
if 'Cookie' in header:
if header['Cookie'] in self.chs:
header['x-rpc-challenge'] = self.chs.pop(header["Cookie"])
async with client.request( async with client.request(
method, method,
url=url, url=url,
@ -214,33 +274,46 @@ class BaseMysApi:
) as resp: ) as resp:
try: try:
raw_data = await resp.json() raw_data = await resp.json()
print(raw_data)
except ContentTypeError: except ContentTypeError:
_raw_data = await resp.text() _raw_data = await resp.text()
raw_data = {'retcode': -999, 'data': _raw_data} raw_data = {'retcode': -999, 'data': _raw_data}
logger.debug(raw_data) logger.debug(raw_data)
# 判断retcode
if 'retcode' in raw_data: if 'retcode' in raw_data:
retcode: int = raw_data['retcode'] retcode: int = raw_data['retcode']
elif 'code' in raw_data: elif 'code' in raw_data:
retcode: int = raw_data['code'] retcode: int = raw_data['code']
else: else:
retcode = 0 retcode = 0
# 针对1034做特殊处理
if retcode == 1034: if retcode == 1034:
try: try:
# 获取ch
ch = await self._upass(header) ch = await self._upass(header)
# 记录ck -> ch的对照表
if "Cookie" in header: if "Cookie" in header:
self.chs[header["Cookie"]] = ch self.chs[header["Cookie"]] = ch
# 获取当前的栈帧
curframe = inspect.currentframe() curframe = inspect.currentframe()
# 确保栈帧存在
assert curframe assert curframe
# 获取调用者的栈帧
calframe = curframe.f_back calframe = curframe.f_back
# 确保调用者的栈帧存在
assert calframe assert calframe
# 获取调用者的函数名
caller_name = calframe.f_code.co_name caller_name = calframe.f_code.co_name
# 获取调用者函数的局部变量字典
caller_args = inspect.getargvalues(calframe).locals caller_args = inspect.getargvalues(calframe).locals
# 获取调用者的参数列表
caller_args2 = inspect.getargvalues(calframe).args caller_args2 = inspect.getargvalues(calframe).args
# # 生成一个字典键为调用者的参数名值为对应的局部变量值如果不存在则为None
caller_args3 = { caller_args3 = {
k: caller_args.get(k, None) for k in caller_args2 k: caller_args.get(k, None) for k in caller_args2
} }
if caller_name != "_mys_req_get": if caller_name != '_mys_req_get':
return await types.FunctionType( return await types.FunctionType(
calframe.f_code, globals() calframe.f_code, globals()
)(**caller_args3) )(**caller_args3)
@ -265,10 +338,13 @@ class BaseMysApi:
elif retcode != 0: elif retcode != 0:
return retcode return retcode
return raw_data return raw_data
'''
class MysApi(BaseMysApi): class MysApi(BaseMysApi):
async def _pass(self, gt: str, ch: str, header: Dict): async def _pass(
self, gt: str, ch: str, header: Dict
) -> Tuple[Optional[str], Optional[str]]:
# 警告使用该服务例如某RR等需要注意风险问题 # 警告使用该服务例如某RR等需要注意风险问题
# 本项目不以任何形式提供相关接口 # 本项目不以任何形式提供相关接口
# 代码来源GITHUB项目MIT开源 # 代码来源GITHUB项目MIT开源
@ -289,13 +365,14 @@ class MysApi(BaseMysApi):
return validate, ch return validate, ch
async def _upass(self, header: Dict, is_bbs: bool = False): async def _upass(self, header: Dict, is_bbs: bool = False) -> str:
logger.info('[upass] 进入处理...')
if is_bbs: if is_bbs:
raw_data = await self.get_bbs_upass_link(header) raw_data = await self.get_bbs_upass_link(header)
else: else:
raw_data = await self.get_upass_link(header) raw_data = await self.get_upass_link(header)
if isinstance(raw_data, int): if isinstance(raw_data, int):
return False return ''
gt = raw_data['data']['gt'] gt = raw_data['data']['gt']
ch = raw_data['data']['challenge'] ch = raw_data['data']['challenge']
@ -303,9 +380,13 @@ class MysApi(BaseMysApi):
if vl: if vl:
await self.get_header_and_vl(header, ch, vl) await self.get_header_and_vl(header, ch, vl)
return ch if ch:
logger.info(f'[upass] 获取ch -> {ch}')
return ch
else:
return ''
else: else:
return await self._upass(header, is_bbs) return ''
async def get_upass_link(self, header: Dict) -> Union[int, Dict]: async def get_upass_link(self, header: Dict) -> Union[int, Dict]:
header['DS'] = get_ds_token('is_high=false') header['DS'] = get_ds_token('is_high=false')