main.py (view raw)
1from PIL import Image
2from Api import get_random_image, rating_normal, rating_lewd
3from Effects import tt_bt_effect, splash_effect
4import Constants as C
5from Constants import get_localized_string as l
6
7from dotenv import load_dotenv
8load_dotenv()
9import os, logging
10logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
11from io import BytesIO
12
13from telegram.error import TelegramError, BadRequest
14from telegram.ext import Updater, CallbackContext, CommandHandler, MessageHandler, Filters, PicklePersistence
15from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton
16
# Language code handed to get_localized_string (imported as `l`) for every
# user-facing message built in this module.
lang = 'us'
18
19def _get_args(context):
20 logging.info(context.args)
21 return ' '.join(context.args)
22
23def _img_to_bio(image):
24 bio = BytesIO()
25
26 if image.mode in ("RGBA", "P"):
27 image = image.convert("RGB")
28
29 image.save(bio, 'JPEG')
30 bio.seek(0)
31 return bio
32
def _ttbt_general(context, text, image=None):
    """Render ``text`` onto an image with the top/bottom text effect.

    Returns ``(None, <localized "no_caption" message>)`` when ``text`` is
    blank. Otherwise returns ``(jpeg_buffer, markup)``, where ``markup`` is an
    empty string for a caller-supplied image, or the keyboard returned by
    ``_get_image`` when a random image had to be fetched.
    """
    if not text.strip():
        return None, l("no_caption", lang)

    if image is None:
        source, markup = _get_image(context, bio=False)
    else:
        source, markup = image, ""

    rendered = tt_bt_effect(text, source)
    return _img_to_bio(rendered), markup
42
43def _get_message_content(message):
44 image = None
45 if len(message.photo) > 0:
46 image = message.photo[-1].get_file()
47 image = Image.open(BytesIO(image.download_as_bytearray()))
48
49 content = ""
50 if message.text is not None:
51 content = message.text.strip()
52 elif message.caption is not None:
53 content = message.caption.strip()
54
55 lines = content.split("\n")
56 r = lines[0].split(" ")
57
58 try:
59 if r[0][0] == '/':
60 r.pop(0)
61 except IndexError:
62 pass
63
64 lines[0] = " ".join(r)
65 return image, "\n".join(lines)
66
67def _get_reply(message, fallback=""):
68
69 if message is None:
70 return None, fallback
71
72 image, content = _get_message_content(message)
73
74 return image, content
75
76def _get_lewd(context):
77 try:
78 return context.chat_data["lewd"]
79 except KeyError:
80 return False
81
def _get_image(context, tag="", bio=True):
    """Fetch a random image plus an inline keyboard linking to its source.

    Args:
        context: Handler context whose chat data selects the rating. May be
            ``None``, in which case the normal rating is used.
        tag: Extra search text forwarded to ``get_random_image``.
        bio: When true, return the image as a JPEG ``BytesIO`` buffer;
            otherwise return the image object as delivered by the API helper.

    Returns:
        ``(image, markup)`` where ``markup`` is a one-button
        ``InlineKeyboardMarkup`` pointing at the image URL.

    Raises:
        TelegramError: If ``get_random_image`` yields no image.
    """
    # Without a context there are no chat settings to consult. Fall back to
    # the normal rating rather than leaving `image`/`url` unassigned, which
    # previously surfaced as an UnboundLocalError.
    use_lewd = context is not None and _get_lewd(context)
    image, url = get_random_image(rating_lewd if use_lewd else rating_normal, tag)

    if image is None:
        logging.warning("Getting Image failed")
        raise TelegramError("bad image")

    markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("sauce", lang), url=url)]])

    if bio:
        return _img_to_bio(image), markup
    return image, markup
98
def _get_all(update, check_fn, context):
    """Collect the text, image and keyboard an effect command needs.

    Args:
        update: Incoming update; ``update.message`` is the command message.
        check_fn: Callable invoked as ``check_fn(author, reply_text,
            message_text)``. It returns the final text, or ``None`` to reject
            the request.
        context: Handler context, used when a random image must be fetched.

    Returns:
        ``(content, image, markup)``. All three are ``None`` when ``check_fn``
        rejects the input. ``markup`` is ``None`` unless a random image was
        fetched, in which case it is the keyboard returned by ``_get_image``.
    """
    message = update.message
    author = _get_author(message)
    image_reply, reply = _get_reply(message.reply_to_message)

    image_content, content = _get_message_content(message)
    logging.info(f"User {message.from_user.username} typed: {str(message.text)}")

    content = check_fn(author, reply, content)
    if content is None:
        return None, None, None

    # An image attached to the command message wins over one in the replied-to message.
    image = image_content if image_content is not None else image_reply

    # Use None rather than an empty string: callers pass this straight to
    # `reply_markup`, and None makes the argument get omitted from the API
    # request instead of being sent as an empty value.
    markup = None
    if image is None:
        image, markup = _get_image(context, bio=False)

    return content, image, markup
123
def start(update: Update, context: CallbackContext):
    """Send the localized welcome message to the chat the update came from."""
    bot = context.bot
    chat_id = update.effective_chat.id
    bot.send_message(chat_id=chat_id, text=l("welcome", lang))
126
def set_lewd(update: Update, context: CallbackContext):
    """Toggle the chat's ``"lewd"`` setting and announce the new state.

    A chat with no stored setting is treated as disabled, so the first call
    enables it.
    """
    enabled = not _get_lewd(context)
    context.chat_data['lewd'] = enabled

    state = l("enabled", lang) if enabled else l("disabled", lang)
    message = l("lewd_toggle", lang).format(state)

    context.bot.send_message(chat_id=update.effective_chat.id, text=message)
137
def pic(update: Update, context: CallbackContext):
    """Reply with a random image, optionally narrowed by the first command argument."""
    # The leading space lets the tag be appended directly to a rating string.
    tag = f" {context.args[0]}" if len(context.args) > 0 else ""

    photo, keyboard = _get_image(context, tag)
    update.message.reply_photo(photo=photo, parse_mode="markdown", reply_markup=keyboard)
146
def raw(update: Update, context: CallbackContext):
    """Reply with a random image fetched with an empty rating string.

    Up to the first two command arguments are forwarded as the tag text, each
    prefixed by a space.

    Raises:
        TelegramError: If no image could be fetched.
    """
    tag = "".join(f" {word}" for word in context.args[:2])

    picture, url = get_random_image("", tag)
    if picture is None:
        logging.warning("Getting Image failed")
        raise TelegramError("bad image")

    photo = _img_to_bio(picture)
    source_button = InlineKeyboardButton(text=l("sauce", lang), url=url)
    keyboard = InlineKeyboardMarkup([[source_button]])

    update.message.reply_photo(photo=photo, parse_mode="markdown", reply_markup=keyboard)
165
def pilu(update: Update, context: CallbackContext):
    """Reply with a random image requested with the ``rating:e`` query.

    The first command argument, if any, is appended to the query as a tag.
    The request is logged at warning level together with the username.

    Raises:
        TelegramError: If no image could be fetched.
    """
    logging.warning(f"User {update.message.from_user.username} requested an explicit pic.")
    try:
        tag = " " + context.args[0]
    except IndexError:
        tag = ""

    # NOTE(review): the other call sites in this file pass rating and tag as
    # two separate arguments; here they are concatenated into one. Left as is
    # because get_random_image is defined elsewhere; confirm this is intended.
    image, url = get_random_image("rating:e" + tag)
    if image is None:
        logging.warning("Getting Image failed")
        # The original had an unreachable `return` after this raise; removed.
        raise TelegramError("bad image")

    image = _img_to_bio(image)
    markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("sauce", lang), url=url)]])

    update.message.reply_photo(photo=image, parse_mode="markdown", reply_markup=markup)
182
183def _format_author(user):
184 if user.username is not None:
185 return user.full_name + f" ({user.username})"
186 return user.full_name
187
188def _get_author(message):
189
190 if message.reply_to_message is None:
191 return _format_author(message.from_user)
192
193 if message.reply_to_message.forward_from is not None:
194 return _format_author(message.reply_to_message.forward_from)
195
196 if message.reply_to_message.forward_sender_name is not None:
197 return message.reply_to_message.forward_sender_name
198
199 return _format_author(message.reply_to_message.from_user)
200
def tt_check(author, content, reply):
    """Build single-line text as ``reply``, a space, then ``content``.

    Newlines are replaced by spaces. Returns ``None`` when the result is
    blank. ``author`` is accepted but not used.
    """
    single_line = "{} {}".format(reply, content).replace("\n", " ")
    return single_line if single_line.strip() != "" else None
208
def ttbt_check(author, content, reply):
    """Build two-part text with ``reply`` first, a newline, then ``content``.

    Returns ``None`` when the result is blank. ``author`` is accepted but not
    used.
    """
    stacked = "{}\n{}".format(reply, content)
    return stacked if stacked.strip() != "" else None
216
def splash_check(author, content, reply):
    """Build splash text as ``author``, ``content`` and ``reply`` on separate lines.

    Returns ``None`` when, after stripping surrounding whitespace, the text
    is left with fewer than two lines.
    """
    splash_text = "{}\n{}\n{}".format(author, content, reply)
    if "\n" not in splash_text.strip():
        return None
    return splash_text
224
def ttbt(update: Update, context: CallbackContext):
    """Handle the ttbt command: reply with an image captioned via ``tt_bt_effect``.

    The text is validated with ``ttbt_check``. If no image comes back from
    ``_get_all``, the localized "no_caption" message is sent instead.
    """
    text, picture, keyboard = _get_all(update, ttbt_check, context)

    if picture is None:
        update.message.reply_text(l("no_caption", lang))
        return

    rendered = tt_bt_effect(text, picture)
    update.message.reply_photo(photo=_img_to_bio(rendered), reply_markup=keyboard)
234
235
def tt(update: Update, context: CallbackContext):
    """Handle the tt command: reply with an image captioned via ``tt_bt_effect``.

    The text is validated with ``tt_check``. If no image comes back from
    ``_get_all``, the localized "no_caption" message is sent instead.
    """
    text, picture, keyboard = _get_all(update, tt_check, context)

    if picture is None:
        update.message.reply_text(l("no_caption", lang))
        return

    rendered = tt_bt_effect(text, picture)
    update.message.reply_photo(photo=_img_to_bio(rendered), reply_markup=keyboard)
245
def bt(update: Update, context: CallbackContext):
    """Handle the bt command: like ``tt`` but with a newline prepended to the text.

    The text is validated with ``tt_check``. If no image comes back from
    ``_get_all``, the localized "no_caption" message is sent instead.
    """
    text, picture, keyboard = _get_all(update, tt_check, context)

    if picture is None:
        update.message.reply_text(l("no_caption", lang))
        return

    # The leading newline is passed to tt_bt_effect ahead of the text itself.
    rendered = tt_bt_effect("\n" + text, picture)
    update.message.reply_photo(photo=_img_to_bio(rendered), reply_markup=keyboard)
255
def splash(update: Update, context: CallbackContext):
    """Handle the splash command: reply with an image rendered by ``splash_effect``.

    The text is validated with ``splash_check``. If no image comes back from
    ``_get_all``, the localized "no_caption" message is sent instead.
    """
    text, picture, keyboard = _get_all(update, splash_check, context)

    if picture is None:
        update.message.reply_text(l("no_caption", lang))
        return

    rendered = splash_effect(text, picture)
    update.message.reply_photo(photo=_img_to_bio(rendered), reply_markup=keyboard)
265
def caps(update: Update, context: CallbackContext):
    """Send back upper-cased text.

    The text comes from the replied-to message when there is one; otherwise
    the command's own arguments are used.
    """
    replied = update.message.reply_to_message
    fallback_text = _get_args(context)
    _, text = _get_reply(replied, fallback_text)

    context.bot.send_message(chat_id=update.effective_chat.id, text=text.upper())
269
def unknown(update: Update, context: CallbackContext):
    """Log a command that no other handler picked up; nothing is sent back."""
    message = update.message
    logging.info(f"User {message.from_user.username} sent {message.text_markdown_v2} and I don't know what that means.")
272
def error_callback(update: Update, context: CallbackContext):
    """Report Telegram errors to the log and, when possible, to the chat.

    Errors that are not ``TelegramError`` instances are re-raised unchanged,
    as before.

    Args:
        update: The update being processed when the error occurred. May be
            ``None`` or lack a chat when the error is not tied to a message.
        context: Handler context; ``context.error`` holds the exception.
    """
    error = context.error
    if not isinstance(error, TelegramError):
        raise error

    # Attach the exception so the log shows what actually failed.
    logging.error("TelegramError!!", exc_info=error)

    # Errors raised outside of a chat update have nobody to notify; the
    # original dereferenced `update.effective_chat` unconditionally and so
    # raised AttributeError inside the error handler in that case.
    chat = getattr(update, "effective_chat", None)
    if chat is None:
        return

    context.bot.send_message(chat_id=chat.id, text=l('error', lang))
279
def _add_effect_handler(dispatcher, command: str, callback):
    """Register ``callback`` for ``/command`` in text messages and in media captions.

    Args:
        dispatcher: Dispatcher to add the handlers to.
        command: Command name without the leading slash.
        callback: Handler callback invoked for matching updates.
    """
    import re

    dispatcher.add_handler(CommandHandler(command, callback))

    # A caption-based handler is added alongside CommandHandler so the command
    # also works in media captions. The original `Filters.caption([...])`
    # only accepts captions that equal the listed strings exactly, so a
    # caption such as "/tt some text" was ignored. Match the command at the
    # start of the caption instead, optionally followed by "@botname", and
    # require whitespace or end-of-caption so "/tt" does not also fire on "/ttbt".
    caption_pattern = rf"^/{re.escape(command)}(?:@\w+)?(?:\s|$)"
    dispatcher.add_handler(MessageHandler(Filters.caption_regex(caption_pattern), callback))
283
def main():
    """Build the bot, register every handler and poll until interrupted.

    The bot token is read from the ``token`` environment variable. Persistence
    goes to ``bot-data.pkl`` with bot, user and callback data storage switched
    off.
    """
    token = os.getenv("token")
    persistence = PicklePersistence(filename='bot-data.pkl',
                                    store_bot_data=False,
                                    store_callback_data=False,
                                    store_user_data=False)
    updater = Updater(token=token, persistence=persistence)

    dispatcher = updater.dispatcher
    dispatcher.add_error_handler(error_callback)

    plain_commands = (
        ('start', start),
        ('lewd', set_lewd),
        ('caps', caps),
        ('pic', pic),
        ('raw', raw),
        ('pilu', pilu),
    )
    for name, handler in plain_commands:
        dispatcher.add_handler(CommandHandler(name, handler))

    effect_commands = (
        ('ttbt', ttbt),
        ('tt', tt),
        ('bt', bt),
        ('splash', splash),
    )
    for name, handler in effect_commands:
        _add_effect_handler(dispatcher, name, handler)

    # Registered last so it only sees commands none of the handlers above took.
    dispatcher.add_handler(MessageHandler(Filters.command, unknown))

    updater.start_polling()
    updater.idle()
310
# Start the bot only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()