main.py (view raw)
1from PIL import Image
2from Api import get_random_image, rating_normal, rating_lewd
3from Effects import img_to_bio, tt_bt_effect, bt_effect, splash_effect, wot_effect, text_effect
4from Constants import get_localized_string as l
5from Games import spin
6
7from dotenv import load_dotenv
8load_dotenv()
9import os, logging
10logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
11from io import BytesIO
12
13from telegram.error import TelegramError
14from telegram.ext import Updater, CallbackContext, CallbackQueryHandler, CommandHandler, MessageHandler, Filters, PicklePersistence
15from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton
16
17lang = 'us'
18
def _get_message_content(message):
    """Extract the picture, text and author carried by a message.

    When ``message.photo`` is non-empty, its last entry is downloaded and
    opened with PIL.  The text falls back to the caption when the message has
    no text, and a leading ``/command`` word is dropped from the first line.

    Returns an ``(image, content, author)`` tuple; ``image`` is ``None`` when
    there is no photo and ``content`` is ``""`` when there is no text/caption.
    """
    picture = None
    if len(message.photo) > 0:
        photo_file = message.photo[-1].get_file()
        picture = Image.open(BytesIO(photo_file.download_as_bytearray()))

    if message.text is not None:
        raw_text = message.text.strip()
    elif message.caption is not None:
        raw_text = message.caption.strip()
    else:
        raw_text = ""

    first_line, *other_lines = raw_text.split("\n")
    words = first_line.split(" ")

    # str.split always yields at least one item, so words[0] exists; it is
    # the empty string when the first line is empty.
    if words[0].startswith("/"):
        words.pop(0)

    content = "\n".join([" ".join(words), *other_lines])

    return picture, content, _get_author(message)
45
46def _get_reply(message, fallback=""):
47
48 if message is None:
49 return None, fallback, None
50
51 image, content, author = _get_message_content(message)
52
53 return image, content, author
54
55def _get_lewd(context):
56
57 try:
58 return context.chat_data["lewd"]
59 except KeyError:
60 return False
61
def _get_image(context, tag="", bio=True):
    """Fetch a random image together with a "sauce" button linking to it.

    Args:
        context: Callback context whose chat data selects the lewd or the
            normal rating.  ``None`` is treated as a chat without lewd mode.
        tag: Extra search tag forwarded to ``get_random_image``.
        bio: Kept for backward compatibility; both of its values always
            produced the same result.

    Returns:
        ``(image, markup)`` where ``markup`` is an inline keyboard holding a
        single URL button that points at the image source.

    Raises:
        TelegramError: If ``get_random_image`` returned no image.
    """
    # A ``None`` context used to leave ``image``/``url`` unbound and crash
    # with UnboundLocalError; fall back to the normal rating instead.
    lewd = context is not None and _get_lewd(context)
    rating = rating_lewd if lewd else rating_normal
    image, url = get_random_image(rating, tag)

    if image is None:
        logging.warning("Getting Image failed")
        raise TelegramError("bad image")

    markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("sauce", lang), url=url)]])
    return image, markup
79
def _get_all(update, check_fn, context):
    """Collect everything an effect handler needs from an incoming update.

    The text, image and author of both the message and the message it replies
    to are handed to ``check_fn`` as a nested dict with ``"reply"`` and
    ``"content"`` sections.  If ``check_fn`` returns ``None`` the result is
    ``(None, None, None)``.  Otherwise the result is
    ``(content, image, markup)``: the picture attached to the message wins
    over the one in the reply (``markup`` is then ``""``), and when neither
    exists an image and its markup are obtained from ``_get_image``.
    """
    message = update.message
    reply_image, reply_text, reply_author = _get_reply(message.reply_to_message)
    own_image, own_text, own_author = _get_message_content(message)

    sender = message.from_user
    logging.info(f"User {sender.full_name}{f' (@{sender.username})' if sender.username is not None else ''} typed: {str(message.text)}")

    content = check_fn({
        "reply": {"author": reply_author, "text": reply_text, "image": reply_image},
        "content": {"author": own_author, "text": own_text, "image": own_image},
    })
    if content is None:
        return None, None, None

    # The message's own picture takes precedence over the replied-to one.
    image = own_image if own_image is not None else reply_image
    if image is not None:
        return content, image, ""

    image, markup = _get_image(context, bio=False)
    return content, image, markup
118
def start(update: Update, context: CallbackContext):
    """Handle ``/start`` by sending the localized welcome text to the chat."""
    chat_id = update.effective_chat.id
    greeting = l("welcome", lang)
    context.bot.send_message(chat_id=chat_id, text=greeting)
121
def set_lewd(update: Update, context: CallbackContext):
    """Handle ``/lewd``: flip the chat's lewd flag and announce the new state.

    A chat without a stored flag ends up enabled after the first call.
    """
    settings = context.chat_data
    enabled = not settings["lewd"] if "lewd" in settings else True
    settings['lewd'] = enabled

    template = l("lewd_toggle", lang)
    state_label = l("enabled", lang) if enabled else l("disabled", lang)

    context.bot.send_message(
        chat_id=update.effective_chat.id,
        text=template.format(state_label),
    )
133
def pic(update: Update, context: CallbackContext):
    """Handle ``/pic [tag]``: reply with an image from ``_get_image``.

    Only the first command argument is used as the tag; the reply carries the
    markup returned alongside the image.
    """
    args = context.args
    tag = " " + args[0] if len(args) > 0 else ""

    image, markup = _get_image(context, tag)
    update.message.reply_photo(
        photo=img_to_bio(image),
        parse_mode="markdown",
        reply_markup=markup,
    )
143
def raw(update: Update, context: CallbackContext):
    """Handle ``/raw [tag [tag]]``: reply with a random image and its source.

    At most the first two command arguments are used as tags, and an empty
    rating string is passed to ``get_random_image``.

    Raises:
        TelegramError: If ``get_random_image`` returned no image.
    """
    # Each tag is prefixed with a space, exactly as when appended one by one.
    tag = "".join(" " + word for word in context.args[:2])

    image, url = get_random_image("", tag)
    if image is None:
        logging.warning("Getting Image failed")
        raise TelegramError("bad image")

    photo = img_to_bio(image)
    sauce_button = InlineKeyboardButton(text=l("sauce", lang), url=url)

    update.message.reply_photo(
        photo=photo,
        parse_mode="markdown",
        reply_markup=InlineKeyboardMarkup([[sauce_button]]),
    )
163
def pilu(update: Update, context: CallbackContext):
    """Handle ``/pilu [tag]``: reply with a random ``rating:e`` image.

    The request is logged at warning level together with the requester's
    username.  Only the first command argument is used as an extra tag.

    Raises:
        TelegramError: If ``get_random_image`` returned no image.
    """
    logging.warning(f"User {update.message.from_user.username} requested an explicit pic.")
    try:
        tag = " " + context.args[0]
    except IndexError:
        tag = ""

    image, url = get_random_image("rating:e" + tag)
    if image is None:
        logging.warning("Getting Image failed")
        # Raising already leaves the handler; the ``return`` that used to
        # follow this line was unreachable and has been removed.
        raise TelegramError("bad image")

    image = img_to_bio(image)
    markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("sauce", lang), url=url)]])

    update.message.reply_photo(photo=image, parse_mode="markdown", reply_markup=markup)
181
182def _format_author(user):
183
184 if user.username is not None:
185 return user.full_name + f" ({user.username})"
186 return user.full_name
187
188def _get_author(message):
189
190 if message.forward_from is not None:
191 return _format_author(message.forward_from)
192
193 if message.forward_sender_name is not None:
194 return message.forward_sender_name
195
196 if message.forward_from_chat is not None:
197 return message.forward_from_chat.title + ("" if message.forward_from_chat.username is None else f" ({message.forward_from_chat.username})")
198
199 return _format_author(message.from_user)
200
def tt_check(info):
    """Build the single-line caption from the reply text and the message text.

    Both texts are joined with a space and every newline becomes a space.
    Returns ``None`` when the result contains nothing but whitespace.
    """
    reply_text = info['reply']['text']
    own_text = info['content']['text']

    flattened = " ".join(f"{reply_text} {own_text}".split("\n"))

    return flattened if flattened.strip() else None
212
def ttbt_check(info):
    """Build the two-part caption (first line / remaining lines) for ``ttbt``.

    A multi-line message text is used on its own; otherwise the stripped reply
    text and the stripped message text are joined with a newline.  Returns
    ``None`` when the result contains nothing but whitespace.
    """
    upper = info['reply']['text'].strip()
    lower = info['content']['text'].strip()

    if "\n" in lower:
        combined = lower
    else:
        combined = upper + "\n" + lower

    if not combined.strip():
        return None
    return combined
227
def splash_check(info):
    """Build the ``author`` + newline + ``text`` input for the splash effect.

    The message's own text and author are used unless that text is blank, in
    which case the reply's text and author are used.  Returns ``None`` when
    the stripped result does not span at least two lines.
    """
    reply_part, own_part = info['reply'], info['content']
    reply_text, own_text = reply_part['text'], own_part['text']

    if own_text.strip():
        author, body = own_part['author'], own_text
    else:
        author, body = reply_part['author'], reply_text

    splash_text = f"{author}\n{body}"

    # Fewer than two lines after stripping means there is no newline left.
    if "\n" not in splash_text.strip():
        return None
    return splash_text
244
def wot_check(info):
    """Join the reply text and the message text with a newline.

    Returns ``None`` when the joined text contains nothing but whitespace.
    """
    stacked = "{}\n{}".format(info['reply']['text'], info['content']['text'])
    return stacked if stacked.strip() else None
256
def ttbt(update: Update, context: CallbackContext):
    """Handle ``/ttbt``: reply with a picture run through ``tt_bt_effect``.

    The text is validated by ``ttbt_check`` and the picture is resolved by
    ``_get_all``.  A localized notice is sent instead of a photo when there
    is nothing to caption or when the effect produces no image.
    """
    content, image, markup = _get_all(update, ttbt_check, context)

    if image is None:
        update.message.reply_text(l("no_caption", lang))
        return

    image = tt_bt_effect(content, image)

    if image is None:
        update.message.reply_text(l("failed_effect", lang))
        # Stop here: there is no picture to send, and falling through used
        # to call reply_photo with photo=None.
        return

    update.message.reply_photo(photo=image, reply_markup=markup)
271
272
def tt(update: Update, context: CallbackContext):
    """Handle ``/tt``: reply with a picture run through ``tt_bt_effect``.

    The text is validated by ``tt_check`` and the picture is resolved by
    ``_get_all``.  A localized notice is sent instead of a photo when there
    is nothing to caption or when the effect produces no image.
    """
    content, image, markup = _get_all(update, tt_check, context)

    if image is None:
        update.message.reply_text(l("no_caption", lang))
        return

    image = tt_bt_effect(content, image)

    if image is None:
        update.message.reply_text(l("failed_effect", lang))
        # Stop here: there is no picture to send, and falling through used
        # to call reply_photo with photo=None.
        return

    update.message.reply_photo(photo=image, reply_markup=markup)
287
def bt(update: Update, context: CallbackContext):
    """Handle ``/bt``: reply with a picture run through ``bt_effect``.

    The text is validated by ``tt_check`` and the picture is resolved by
    ``_get_all``.  A localized notice is sent instead of a photo when there
    is nothing to caption or when the effect produces no image.
    """
    content, image, markup = _get_all(update, tt_check, context)

    if image is None:
        update.message.reply_text(l("no_caption", lang))
        return

    image = bt_effect(content, image)

    if image is None:
        update.message.reply_text(l("failed_effect", lang))
        # Stop here: there is no picture to send, and falling through used
        # to call reply_photo with photo=None.
        return

    update.message.reply_photo(photo=image, reply_markup=markup)
302
def splash(update: Update, context: CallbackContext):
    """Handle ``/splash``: reply with a picture run through ``splash_effect``.

    The text is validated by ``splash_check`` and the picture is resolved by
    ``_get_all``.  A localized notice is sent instead of a photo when there
    is nothing to caption or when the effect produces no image.
    """
    content, image, markup = _get_all(update, splash_check, context)

    if image is None:
        update.message.reply_text(l("no_caption", lang))
        return

    image = splash_effect(content, image)

    if image is None:
        update.message.reply_text(l("failed_effect", lang))
        # Stop here: there is no picture to send, and falling through used
        # to call reply_photo with photo=None.
        return

    update.message.reply_photo(photo=image, reply_markup=markup)
317
def wot(update: Update, context: CallbackContext):
    """Handle ``/wot``: reply with a picture run through ``wot_effect``.

    The text is validated by ``wot_check`` and the picture is resolved by
    ``_get_all``.  A localized notice is sent instead of a photo when there
    is nothing to caption or when the effect produces no image.
    """
    content, image, markup = _get_all(update, wot_check, context)

    if image is None:
        update.message.reply_text(l("no_caption", lang))
        return

    image = wot_effect(content, image)

    if image is None:
        update.message.reply_text(l("failed_effect", lang))
        # Stop here: there is no picture to send, and falling through used
        # to call reply_photo with photo=None.
        return

    update.message.reply_photo(photo=image, reply_markup=markup)
332
def text(update: Update, context: CallbackContext):
    """Handle ``/text``: reply with a picture run through ``text_effect``.

    The text is validated by ``wot_check`` and the picture is resolved by
    ``_get_all``.  A localized notice is sent instead of a photo when there
    is nothing to caption or when the effect produces no image.
    """
    content, image, markup = _get_all(update, wot_check, context)

    if image is None:
        update.message.reply_text(l("no_caption", lang))
        return

    image = text_effect(content, image)

    if image is None:
        update.message.reply_text(l("failed_effect", lang))
        # Stop here: there is no picture to send, and falling through used
        # to call reply_photo with photo=None.
        return

    update.message.reply_photo(photo=image, reply_markup=markup)
347
def caps(update: Update, context: CallbackContext):
    """Handle ``/caps``: send back upper-cased text.

    The text comes from the replied-to message; without a reply, the command
    arguments joined by spaces are used instead.
    """
    replied = update.message.reply_to_message
    typed = ' '.join(context.args)

    source_text = _get_reply(replied, typed)[1]

    context.bot.send_message(chat_id=update.effective_chat.id, text=source_text.upper())
352
def unknown(update: Update, context: CallbackContext):
    """Fallback handler: log who sent what; nothing is sent back."""
    message = update.message
    sender_name = message.from_user.full_name
    logging.info(f"User {sender_name} sent {message.text_markdown_v2} and I don't know what that means.")
355
def error_callback(update: Update, context: CallbackContext):
    """Error handler: log Telegram errors and notify the originating chat.

    Errors that are not ``TelegramError`` instances are re-raised unchanged,
    as before.  For Telegram errors the localized error text is sent to the
    chat of the update, when the update has one.
    """
    error = context.error
    if not isinstance(error, TelegramError):
        raise error

    # The old ``"TelegramError! " + e`` concatenated str and exception, which
    # raised TypeError before anything was logged or sent to the chat.
    logging.error("TelegramError! %s", error)

    # Guard against an update that is None or carries no chat, so the error
    # handler does not fail on its own attribute access.
    chat = getattr(update, "effective_chat", None)
    if chat is not None:
        context.bot.send_message(chat_id=chat.id, text=l('error', lang))
362
def _add_effect_handler(dispatcher, command: str, callback):
    """Register ``callback`` for ``/command`` both as a command and as a caption.

    Two handlers are added, in this order: a ``CommandHandler`` for
    ``command`` and a ``MessageHandler`` filtered by ``Filters.caption`` on
    the single string ``"/<command>"``.
    """
    # NOTE(review): Filters.caption given a list presumably matches only
    # captions equal to one of its entries - confirm against the library docs.
    caption_filter = Filters.caption(update=[f"/{command}"])
    handlers = (
        CommandHandler(command, callback),
        MessageHandler(caption_filter, callback),
    )
    for handler in handlers:
        dispatcher.add_handler(handler)
366
def main():
    """Build the bot, register every handler, then start polling and idle.

    The token is read from the ``token`` environment variable.  Persistence
    uses the pickle file ``bot-data.pkl`` with bot, callback and user data
    storage switched off.  Handlers are registered in a fixed order, with the
    unknown-command fallback last.
    """
    persistence = PicklePersistence(filename='bot-data.pkl',
                                    store_bot_data=False,
                                    store_callback_data=False,
                                    store_user_data=False)
    updater = Updater(token=os.getenv("token"), persistence=persistence)

    dispatcher = updater.dispatcher
    dispatcher.add_error_handler(error_callback)

    # commands
    for name, handler in (('start', start), ('lewd', set_lewd), ('caps', caps), ('pic', pic)):
        dispatcher.add_handler(CommandHandler(name, handler))

    # effects
    effects = (
        ('ttbt', ttbt),
        ('tt', tt),
        ('bt', bt),
        ('splash', splash),
        ('wot', wot),
        ('text', text),
    )
    for name, handler in effects:
        _add_effect_handler(dispatcher, name, handler)

    # games
    dispatcher.add_handler(CommandHandler('spin', spin))
    dispatcher.add_handler(CallbackQueryHandler(callback=spin))
    #dispatcher.add_handler(CallbackQueryHandler(callback=autospin))

    # secrets
    for name, handler in (('raw', raw), ('pilu', pilu)):
        dispatcher.add_handler(CommandHandler(name, handler))

    # fallback
    dispatcher.add_handler(MessageHandler(Filters.command, unknown))

    updater.start_polling()
    updater.idle()


if __name__ == "__main__":
    main()