mirror of
https://github.com/Genshin-bots/gsuid_core.git
synced 2025-05-12 06:55:49 +08:00
🎨 完善封装方法
This commit is contained in:
parent
6f04bc2180
commit
8ac55ceb5e
@ -1,4 +1,3 @@
|
|||||||
import random
|
|
||||||
import asyncio
|
import asyncio
|
||||||
from typing import Dict, List, Union, Literal, Optional
|
from typing import Dict, List, Union, Literal, Optional
|
||||||
|
|
||||||
@ -8,15 +7,10 @@ from msgspec import json as msgjson
|
|||||||
from gsuid_core.logger import logger
|
from gsuid_core.logger import logger
|
||||||
from gsuid_core.gs_logger import GsLogger
|
from gsuid_core.gs_logger import GsLogger
|
||||||
from gsuid_core.message_models import Button
|
from gsuid_core.message_models import Button
|
||||||
from gsuid_core.segment import MessageSegment
|
|
||||||
from gsuid_core.utils.image.convert import text2pic
|
|
||||||
from gsuid_core.models import Event, Message, MessageSend
|
from gsuid_core.models import Event, Message, MessageSend
|
||||||
from gsuid_core.utils.plugins_config.gs_config import core_plugins_config
|
from gsuid_core.utils.plugins_config.gs_config import core_plugins_config
|
||||||
|
from gsuid_core.segment import MessageSegment, to_markdown, convert_message
|
||||||
|
|
||||||
R_enabled = core_plugins_config.get_config('AutoAddRandomText').data
|
|
||||||
R_text = core_plugins_config.get_config('RandomText').data
|
|
||||||
is_text2pic = core_plugins_config.get_config('AutoTextToPic').data
|
|
||||||
text2pic_limit = core_plugins_config.get_config('TextToPicThreshold').data
|
|
||||||
is_specific_msg_id = core_plugins_config.get_config('EnableSpecificMsgId').data
|
is_specific_msg_id = core_plugins_config.get_config('EnableSpecificMsgId').data
|
||||||
specific_msg_id = core_plugins_config.get_config('SpecificMsgId').data
|
specific_msg_id = core_plugins_config.get_config('SpecificMsgId').data
|
||||||
|
|
||||||
@ -40,43 +34,11 @@ class _Bot:
|
|||||||
at_sender: bool = False,
|
at_sender: bool = False,
|
||||||
sender_id: str = '',
|
sender_id: str = '',
|
||||||
):
|
):
|
||||||
if isinstance(message, Message):
|
_message = await convert_message(message)
|
||||||
message = [message]
|
|
||||||
elif isinstance(message, str):
|
|
||||||
if message.startswith('base64://'):
|
|
||||||
message = [MessageSegment.image(message)]
|
|
||||||
else:
|
|
||||||
message = [MessageSegment.text(message)]
|
|
||||||
elif isinstance(message, bytes):
|
|
||||||
message = [MessageSegment.image(message)]
|
|
||||||
elif isinstance(message, List):
|
|
||||||
if all(isinstance(x, str) for x in message):
|
|
||||||
message = [MessageSegment.node(message)]
|
|
||||||
else:
|
|
||||||
message = [message]
|
|
||||||
|
|
||||||
_message: List[Message] = message # type: ignore
|
|
||||||
|
|
||||||
if at_sender and sender_id:
|
if at_sender and sender_id:
|
||||||
_message.append(MessageSegment.at(sender_id))
|
_message.append(MessageSegment.at(sender_id))
|
||||||
|
|
||||||
if R_enabled:
|
|
||||||
result = ''.join(
|
|
||||||
random.choice(R_text)
|
|
||||||
for _ in range(random.randint(1, len(R_text)))
|
|
||||||
)
|
|
||||||
_message.append(MessageSegment.text(result))
|
|
||||||
|
|
||||||
if is_text2pic:
|
|
||||||
if (
|
|
||||||
len(_message) == 1
|
|
||||||
and _message[0].type == 'text'
|
|
||||||
and isinstance(_message[0].data, str)
|
|
||||||
and len(_message[0].data) >= int(text2pic_limit)
|
|
||||||
):
|
|
||||||
img = await text2pic(_message[0].data)
|
|
||||||
_message = [MessageSegment.image(img)]
|
|
||||||
|
|
||||||
if is_specific_msg_id and not msg_id:
|
if is_specific_msg_id and not msg_id:
|
||||||
msg_id = specific_msg_id
|
msg_id = specific_msg_id
|
||||||
|
|
||||||
@ -130,22 +92,29 @@ class Bot:
|
|||||||
|
|
||||||
async def receive_resp(
|
async def receive_resp(
|
||||||
self,
|
self,
|
||||||
reply_text: Optional[str] = None,
|
reply: Optional[
|
||||||
|
Union[Message, List[Message], List[str], str, bytes]
|
||||||
|
] = None,
|
||||||
option_list: Optional[List[Union[str, Button]]] = None,
|
option_list: Optional[List[Union[str, Button]]] = None,
|
||||||
timeout: float = 60,
|
timeout: float = 60,
|
||||||
) -> Optional[Event]:
|
) -> Optional[Event]:
|
||||||
if option_list:
|
if option_list:
|
||||||
if reply_text is None:
|
if reply is None:
|
||||||
reply_text = '请在60秒内做出选择...'
|
reply = f'请在{timeout}秒内做出选择...'
|
||||||
|
|
||||||
|
_reply = await convert_message(reply)
|
||||||
|
|
||||||
if self.ev.real_bot_id in ['qqguild', 'qqgroup']:
|
if self.ev.real_bot_id in ['qqguild', 'qqgroup']:
|
||||||
|
_reply_str = await to_markdown(_reply)
|
||||||
_buttons: List[Button] = []
|
_buttons: List[Button] = []
|
||||||
for option in option_list:
|
for option in option_list:
|
||||||
if isinstance(option, Button):
|
if isinstance(option, Button):
|
||||||
_buttons.append(option)
|
_buttons.append(option)
|
||||||
else:
|
else:
|
||||||
_buttons.append(Button(option, option, option))
|
_buttons.append(Button(option, option, option))
|
||||||
await self.send(MessageSegment.markdown(reply_text, _buttons))
|
logger.info(_reply_str)
|
||||||
|
logger.info(_buttons)
|
||||||
|
await self.send(MessageSegment.markdown(_reply_str, _buttons))
|
||||||
else:
|
else:
|
||||||
_options: List[str] = []
|
_options: List[str] = []
|
||||||
for option in option_list:
|
for option in option_list:
|
||||||
@ -154,11 +123,11 @@ class Bot:
|
|||||||
else:
|
else:
|
||||||
_options.append(option)
|
_options.append(option)
|
||||||
|
|
||||||
reply_text += '/'.join(_options)
|
_reply.append(MessageSegment.text('/'.join(_options)))
|
||||||
await self.send(reply_text)
|
await self.send(_reply)
|
||||||
|
|
||||||
elif reply_text:
|
elif reply:
|
||||||
await self.send(reply_text)
|
await self.send(reply)
|
||||||
|
|
||||||
return await self.wait_for_key(timeout)
|
return await self.wait_for_key(timeout)
|
||||||
|
|
||||||
|
@ -183,7 +183,7 @@ def main():
|
|||||||
image.save(image_bytes, format='JPEG')
|
image.save(image_bytes, format='JPEG')
|
||||||
image_bytes.seek(0)
|
image_bytes.seek(0)
|
||||||
response = StreamingResponse(image_bytes, media_type='image/png')
|
response = StreamingResponse(image_bytes, media_type='image/png')
|
||||||
background_tasks.add_task(delete_image, path)
|
# background_tasks.add_task(delete_image, path)
|
||||||
return response
|
return response
|
||||||
|
|
||||||
site.mount_app(app)
|
site.mount_app(app)
|
||||||
|
@ -1,8 +1,9 @@
|
|||||||
import uuid
|
import uuid
|
||||||
|
import random
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from base64 import b64encode
|
from base64 import b64encode
|
||||||
from typing import List, Union, Literal
|
from typing import List, Tuple, Union, Literal, Optional
|
||||||
|
|
||||||
import msgspec
|
import msgspec
|
||||||
from PIL import Image
|
from PIL import Image
|
||||||
@ -10,8 +11,13 @@ from PIL import Image
|
|||||||
from gsuid_core.models import Message
|
from gsuid_core.models import Message
|
||||||
from gsuid_core.data_store import image_res
|
from gsuid_core.data_store import image_res
|
||||||
from gsuid_core.message_models import Button
|
from gsuid_core.message_models import Button
|
||||||
|
from gsuid_core.utils.image.convert import text2pic
|
||||||
from gsuid_core.utils.plugins_config.gs_config import core_plugins_config
|
from gsuid_core.utils.plugins_config.gs_config import core_plugins_config
|
||||||
|
|
||||||
|
R_enabled = core_plugins_config.get_config('AutoAddRandomText').data
|
||||||
|
R_text = core_plugins_config.get_config('RandomText').data
|
||||||
|
is_text2pic = core_plugins_config.get_config('AutoTextToPic').data
|
||||||
|
text2pic_limit = core_plugins_config.get_config('TextToPicThreshold').data
|
||||||
enable_pic_srv = core_plugins_config.get_config('EnablePicSrv').data
|
enable_pic_srv = core_plugins_config.get_config('EnablePicSrv').data
|
||||||
pic_srv = core_plugins_config.get_config('PicSrv').data
|
pic_srv = core_plugins_config.get_config('PicSrv').data
|
||||||
|
|
||||||
@ -56,11 +62,20 @@ class MessageSegment:
|
|||||||
return Message(type='text', data=content)
|
return Message(type='text', data=content)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def markdown(content: str, buttons: List[Button]) -> List[Message]:
|
def markdown(
|
||||||
return [
|
content: str, buttons: Optional[List[Button]] = None
|
||||||
Message(type='markdown', data=content),
|
) -> List[Message]:
|
||||||
Message(type='buttons', data=msgspec.to_builtins(buttons)),
|
data = [Message(type='markdown', data=content)]
|
||||||
]
|
if buttons:
|
||||||
|
data.append(
|
||||||
|
Message(type='buttons', data=msgspec.to_builtins(buttons))
|
||||||
|
)
|
||||||
|
|
||||||
|
return data
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def image_size(size: Tuple[int, int]) -> Message:
|
||||||
|
return Message(type='image_size', data=size)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def at(user: str) -> Message:
|
def at(user: str) -> Message:
|
||||||
@ -128,3 +143,70 @@ class MessageSegment:
|
|||||||
type: Literal['INFO', 'WARNING', 'ERROR', 'SUCCESS'], content: str
|
type: Literal['INFO', 'WARNING', 'ERROR', 'SUCCESS'], content: str
|
||||||
) -> Message:
|
) -> Message:
|
||||||
return Message(type=f'log_{type}', data=content)
|
return Message(type=f'log_{type}', data=content)
|
||||||
|
|
||||||
|
|
||||||
|
async def convert_message(
|
||||||
|
message: Union[Message, List[Message], List[str], str, bytes]
|
||||||
|
) -> List[Message]:
|
||||||
|
if isinstance(message, Message):
|
||||||
|
message = [message]
|
||||||
|
elif isinstance(message, str):
|
||||||
|
if message.startswith('base64://'):
|
||||||
|
img = Image.open(message)
|
||||||
|
message = [
|
||||||
|
MessageSegment.image(message),
|
||||||
|
MessageSegment.image_size(img.size),
|
||||||
|
]
|
||||||
|
else:
|
||||||
|
message = [MessageSegment.text(message)]
|
||||||
|
elif isinstance(message, bytes):
|
||||||
|
img = Image.open(message)
|
||||||
|
message = [
|
||||||
|
MessageSegment.image(message),
|
||||||
|
MessageSegment.image_size(img.size),
|
||||||
|
]
|
||||||
|
elif isinstance(message, List):
|
||||||
|
if all(isinstance(x, str) for x in message):
|
||||||
|
message = [MessageSegment.node(message)]
|
||||||
|
else:
|
||||||
|
message = [message]
|
||||||
|
|
||||||
|
_message: List[Message] = message # type: ignore
|
||||||
|
|
||||||
|
if R_enabled:
|
||||||
|
result = ''.join(
|
||||||
|
random.choice(R_text)
|
||||||
|
for _ in range(random.randint(1, len(R_text)))
|
||||||
|
)
|
||||||
|
_message.append(MessageSegment.text(result))
|
||||||
|
|
||||||
|
if is_text2pic:
|
||||||
|
if (
|
||||||
|
len(_message) == 1
|
||||||
|
and _message[0].type == 'text'
|
||||||
|
and isinstance(_message[0].data, str)
|
||||||
|
and len(_message[0].data) >= int(text2pic_limit)
|
||||||
|
):
|
||||||
|
img = await text2pic(_message[0].data)
|
||||||
|
_message = [MessageSegment.image(img)]
|
||||||
|
|
||||||
|
return _message
|
||||||
|
|
||||||
|
|
||||||
|
async def to_markdown(message: List[Message]) -> str:
|
||||||
|
_markdown_list = []
|
||||||
|
url = None
|
||||||
|
size = None
|
||||||
|
for m in message:
|
||||||
|
if m.type == 'image':
|
||||||
|
url = m.data
|
||||||
|
elif m.type == 'image_size':
|
||||||
|
size = m.data
|
||||||
|
elif m.type == 'text':
|
||||||
|
_markdown_list.append(m.data)
|
||||||
|
|
||||||
|
if url is not None and size is not None:
|
||||||
|
_markdown_list.append(f'![test #{size[0]}px #{size[1]}px]({url})')
|
||||||
|
|
||||||
|
_markdown = '\n'.join(_markdown_list)
|
||||||
|
return _markdown
|
||||||
|
Loading…
x
Reference in New Issue
Block a user