python_meme_bot/utils.py (view raw)
1from io import BytesIO
2import logging
3
4from telegram import Chat, InlineKeyboardButton, InlineKeyboardMarkup, Message, Update, User
5from telegram.ext import ContextTypes
6from telegram.error import TelegramError
7from PIL import Image
8
9from python_meme_bot.api import get_random_image
10
def format_author(user: User):
    """Build a display string for a Telegram user.

    Returns the user's full name, followed by the username in parentheses
    when the user has one.
    """
    name = user.full_name
    if user.username is None:
        return name
    return f"{name} ({user.username})"
15
def format_chat(chat: Chat):
    """Build a display string for a Telegram chat.

    Returns the chat title, followed by the chat's username in parentheses
    when the chat has one.
    """
    if chat.username is None:
        suffix = ""
    else:
        suffix = f" ({chat.username})"
    return chat.title + suffix
18
def _get_lewd(context: ContextTypes.DEFAULT_TYPE):
    """Return the value stored under ``"lewd"`` in the context's chat data.

    Falls back to ``False`` when the key has not been set.
    """
    settings = context.chat_data
    try:
        lewd = settings["lewd"]
    except KeyError:
        lewd = False
    return lewd
24
def _get_author(message: Message):
    """Return a display name for the original author of *message*.

    For a message that was not forwarded this is the sender, formatted by
    ``format_author``. For a forwarded message, ``message.forward_origin``
    is probed for the following fields, in this order:

    * ``sender_user`` (MessageOriginUser) -> ``format_author``
    * ``sender_user_name`` (MessageOriginHiddenUser) -> returned as is
    * ``sender_chat`` (MessageOriginChat) -> ``format_chat``
    * ``chat`` (MessageOriginChannel) -> ``format_chat``

    A field counts as absent when subscripting the origin with its name
    raises ``KeyError``. If none of the fields is present, a warning is
    logged and the forwarding user (``message.from_user``) is used instead.
    """
    origin = message.forward_origin

    if origin is None:  # message was not forwarded
        return format_author(message.from_user)

    for field in ("sender_user", "sender_user_name", "sender_chat", "chat"):
        # Guard only the lookup, so a KeyError raised while formatting the
        # value is not mistaken for "this origin has no such field".
        try:
            value = origin[field]
        except KeyError:
            continue

        if field == "sender_user":
            return format_author(value)
        if field == "sender_user_name":
            return value  # already a plain display name
        return format_chat(value)

    # logging.warn is a deprecated alias of logging.warning.
    logging.warning("Message was forwarded but I couldn't detect the original author.")
    return format_author(message.from_user)
52
async def get_message_content(message: Message, fallback: str = ""):
    """Extract image, text and author from a Telegram message.

    Returns a tuple ``(image, text, author)``:

    * ``image`` is a PIL image opened from the last entry of
      ``message.photo``, or ``None`` when the message carries no photo.
    * ``text`` is the stripped message text, or the stripped caption when
      there is no text, or ``""`` when there is neither. If the first word
      of the first line starts with ``/``, that word is removed.
    * ``author`` is the result of ``_get_author(message)``.

    When ``message`` is ``None`` the result is ``(None, fallback, None)``.
    """
    if message is None:
        return None, fallback, None

    picture = None
    photos = message.photo
    if len(photos) > 0:
        # The last entry is used; presumably the largest size - TODO confirm.
        tg_file = await photos[-1].get_file()
        raw = await tg_file.download_as_bytearray()
        picture = Image.open(BytesIO(raw))

    if message.text is not None:
        text = message.text.strip()
    elif message.caption is not None:
        text = message.caption.strip()
    else:
        text = ""

    first_line, newline, remainder = text.partition("\n")
    if first_line[:1] == "/":
        # Drop the leading "/command" word; keep whatever follows the first space.
        first_line = first_line.partition(" ")[2]
    text = first_line + newline + remainder

    return picture, text, _get_author(message)
83
def get_image(context: ContextTypes.DEFAULT_TYPE):
    """Fetch a random image together with an inline "Sauce" button.

    The chat's ``lewd`` setting (see ``_get_lewd``) is passed to
    ``get_random_image``. When ``context`` is ``None`` there are no chat
    settings to consult, so the non-lewd default (``False``) is used.

    Returns:
        A tuple ``(image, markup)`` where ``markup`` is an
        ``InlineKeyboardMarkup`` with a single button linking to the URL
        returned by ``get_random_image``.

    Raises:
        TelegramError: if ``get_random_image`` returns no image.
    """
    # Previously a None context left image/url unbound and the check below
    # raised UnboundLocalError; fall back to the same default as _get_lewd.
    lewd = _get_lewd(context) if context is not None else False
    image, url = get_random_image(lewd)

    if image is None:
        logging.warning("Getting Image failed")
        raise TelegramError("bad image")

    markup = InlineKeyboardMarkup([[InlineKeyboardButton(text="Sauce 🍝", url=url)]])
    return image, markup
94
async def get_all(update: Update, check_fn, context: ContextTypes.DEFAULT_TYPE):
    """Collect everything needed to answer a command message.

    Reads image/text/author from both the incoming message and the message
    it replies to, logs the command, and passes the collected data to
    ``check_fn`` as a dict with the keys ``"reply"`` and ``"content"``
    (each holding ``"author"``, ``"text"`` and ``"image"``).

    Returns:
        ``(None, None, None)`` when ``check_fn`` returns ``None``.
        Otherwise ``(content, image, markup)`` where ``content`` is the
        value returned by ``check_fn``. The image of the incoming message is
        preferred, then the image of the replied-to message; in both cases
        ``markup`` is ``""``. When neither message has an image, image and
        markup come from ``get_image(context)``.
    """
    incoming = update.message
    reply_image, reply_text, reply_author = await get_message_content(incoming.reply_to_message)
    own_image, own_text, own_author = await get_message_content(incoming)

    replied_part = {"author": reply_author, "text": reply_text, "image": reply_image}
    own_part = {"author": own_author, "text": own_text, "image": own_image}
    info_struct = {"reply": replied_part, "content": own_part}

    sender = incoming.from_user
    handle = f" (@{sender.username})" if sender.username is not None else ""
    logging.info(f"User {sender.full_name}{handle} typed: {str(incoming.text)}")

    content = check_fn(info_struct)
    if content is None:
        return None, None, None

    # An image attached to the command itself wins over one in the replied-to message.
    picked = own_image if own_image is not None else reply_image
    if picked is not None:
        return content, picked, ""

    # No image supplied by the user: fetch a random one with its source button.
    picked, markup = get_image(context)
    return content, picked, markup
131 return content, image, markup