main.py (view raw)
1from PIL import Image
2from Api import get_random_image, rating_normal, rating_lewd
3from Effects import img_to_bio, tt_bt_effect, bt_effect, splash_effect, wot_effect, text_effect
4from Constants import get_localized_string as l, format_author, format_lang, langs, get_lang, lang_markup
5from Slot import spin, autospin, bet, cash
6
7from dotenv import load_dotenv
8load_dotenv()
9import os, logging
10logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
11from io import BytesIO
12
13from telegram.error import TelegramError
14from telegram.ext import Updater, CallbackContext, CallbackQueryHandler, CommandHandler, MessageHandler, Filters, PicklePersistence
15from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton
16
def _get_message_content(message):
    """Extract the image, the cleaned text and the author of a message.

    Returns a tuple ``(image, content, author)``:

    * ``image`` is a PIL image opened from the last entry of
      ``message.photo``, or ``None`` when the message carries no photo.
    * ``content`` is the stripped message text (or caption when there is no
      text) with a leading ``/command`` word removed from its first line.
    * ``author`` is whatever ``_get_author(message)`` returns.
    """
    picture = None
    if len(message.photo) > 0:
        photo_file = message.photo[-1].get_file()
        picture = Image.open(BytesIO(photo_file.download_as_bytearray()))

    # Text wins over caption; a message with neither yields an empty string.
    if message.text is not None:
        raw_text = message.text.strip()
    elif message.caption is not None:
        raw_text = message.caption.strip()
    else:
        raw_text = ""

    # Only the first line can hold the command word; later lines stay as-is.
    first_line, newline, remainder = raw_text.partition("\n")
    words = first_line.split(" ")
    if words[0].startswith("/"):
        words = words[1:]

    body = " ".join(words) + newline + remainder
    return picture, body, _get_author(message)
43
def _get_reply(message, fallback=""):
    """Return ``(image, content, author)`` for a replied-to message.

    When there is no replied-to message (``message`` is ``None``) the result
    is ``(None, fallback, None)``; otherwise it is whatever
    ``_get_message_content`` extracts from ``message``.
    """
    if message is not None:
        image, content, author = _get_message_content(message)
        return image, content, author

    return None, fallback, None
52
def _get_lewd(context):
    """Return the chat's stored ``"lewd"`` flag, or ``False`` when it was never set."""
    return context.chat_data.get("lewd", False)
59
def _get_image(context, tag="", bio=True):
    """Fetch a random image together with an inline "sauce" button linking to it.

    Args:
        context: Callback context of the current update. Its chat data decides
            whether the lewd or the normal rating is requested. ``None`` is
            treated as "not lewd" (previously this path crashed with an
            unbound local variable).
        tag: Extra tag string forwarded to ``get_random_image``.
        bio: Accepted for backward compatibility; it never influenced the
            result (both branches of the original returned the same value).

    Returns:
        ``(image, markup)`` where ``image`` is what ``get_random_image``
        returned and ``markup`` is an ``InlineKeyboardMarkup`` with a single
        URL button.

    Raises:
        TelegramError: when ``get_random_image`` yields no image.
    """
    lewd_enabled = context is not None and _get_lewd(context)
    rating = rating_lewd if lewd_enabled else rating_normal
    image, url = get_random_image(rating, tag)

    if image is None:
        logging.warning("Getting Image failed")
        raise TelegramError("bad image")

    # NOTE(review): l() is still called with context=None on that path;
    # confirm get_localized_string tolerates a missing context.
    markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("sauce", context), url=url)]])

    return image, markup
77
def _get_all(update, check_fn, context):
    """Collect everything an effect handler needs from an update.

    Gathers image/text/author from both the replied-to message and the
    message itself, hands them to ``check_fn`` as a nested dict with
    ``"reply"`` and ``"content"`` entries, and picks the image to work on.

    Returns:
        ``(content, image, markup)``. When ``check_fn`` returns ``None`` the
        result is ``(None, None, None)``. The image attached to the message
        itself takes precedence over the replied-to image; when neither
        exists a random image is fetched via ``_get_image`` and ``markup`` is
        its button markup, otherwise ``markup`` is the empty string.
    """
    message = update.message
    image_reply, text_reply, author_reply = _get_reply(message.reply_to_message)
    image_content, text_content, author_content = _get_message_content(message)

    info_struct = {
        "reply": dict(author=author_reply, text=text_reply, image=image_reply),
        "content": dict(author=author_content, text=text_content, image=image_content),
    }

    sender = message.from_user
    handle = f' (@{sender.username})' if sender.username is not None else ''
    logging.info(f"User {sender.full_name}{handle} typed: {str(message.text)}")

    content = check_fn(info_struct)
    if content is None:
        return None, None, None

    # An image sent with the command beats one from the replied-to message.
    if image_content is not None:
        return content, image_content, ""
    if image_reply is not None:
        return content, image_reply, ""

    image, markup = _get_image(context, bio=False)
    return content, image, markup
116
def start(update: Update, context: CallbackContext):
    """Send the localized welcome message to the chat the update came from."""
    chat_id = update.effective_chat.id
    context.bot.send_message(chat_id=chat_id, text=l("welcome", context))
119
def set_lewd(update: Update, context: CallbackContext):
    """Toggle the chat's ``"lewd"`` flag and announce the new state.

    A chat without a stored flag is treated as disabled, so the first call
    enables it.
    """
    enabled = not context.chat_data.get("lewd", False)
    context.chat_data['lewd'] = enabled

    template = l("lewd_toggle", context)
    state_key = "enabled" if enabled else "disabled"
    message = template.format(l(state_key, context))

    context.bot.send_message(chat_id=update.effective_chat.id, text=message)
131
def pic(update: Update, context: CallbackContext):
    """Reply with a random image, optionally filtered by the first command argument."""
    args = context.args
    # The tag is passed with a leading space, or empty when no argument was given.
    tag = " " + args[0] if len(args) > 0 else ""

    image, markup = _get_image(context, tag)
    photo = img_to_bio(image)
    update.message.reply_photo(photo=photo, parse_mode="markdown", reply_markup=markup)
141
def raw(update: Update, context: CallbackContext):
    """Reply with a random image using an empty rating and up to two tags.

    Raises:
        TelegramError: when ``get_random_image`` yields no image.
    """
    # At most the first two arguments are used, each prefixed with a space.
    tag = "".join(" " + word for word in context.args[:2])

    image, url = get_random_image("", tag)
    if image is None:
        logging.warning("Getting Image failed")
        raise TelegramError("bad image")

    photo = img_to_bio(image)
    sauce_button = InlineKeyboardButton(text=l("sauce", context), url=url)
    markup = InlineKeyboardMarkup([[sauce_button]])

    update.message.reply_photo(photo=photo, parse_mode="markdown", reply_markup=markup)
161
def pilu(update: Update, context: CallbackContext):
    """Reply with a random image requested with the ``rating:e`` filter.

    The request is logged at warning level together with the requesting
    user's username. The first command argument, if any, is appended to the
    query as an extra tag.

    Raises:
        TelegramError: when ``get_random_image`` yields no image.
    """
    logging.warning(f"User {update.message.from_user.username} requested an explicit pic.")

    try:
        tag = " " + context.args[0]
    except IndexError:
        tag = ""

    # NOTE(review): unlike the other callers, the tag is concatenated into the
    # first argument instead of being passed separately - confirm against
    # get_random_image's signature.
    image, url = get_random_image("rating:e" + tag)
    if image is None:
        logging.warning("Getting Image failed")
        # The original had an unreachable `return` after this raise.
        raise TelegramError("bad image")

    image = img_to_bio(image)
    markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("sauce", context), url=url)]])

    update.message.reply_photo(photo=image, parse_mode="markdown", reply_markup=markup)
179
def _get_author(message):
    """Return a display name for whoever originally wrote ``message``.

    Forward information is preferred, in this order: the forwarded user
    (formatted with ``format_author``), the plain forward sender name, then
    the forwarded chat's title with its username in parentheses when it has
    one. A message that is not a forward falls back to its own sender.
    """
    forwarded_user = message.forward_from
    if forwarded_user is not None:
        return format_author(forwarded_user)

    sender_name = message.forward_sender_name
    if sender_name is not None:
        return sender_name

    origin_chat = message.forward_from_chat
    if origin_chat is not None:
        if origin_chat.username is None:
            suffix = ""
        else:
            suffix = f" ({origin_chat.username})"
        return origin_chat.title + suffix

    return format_author(message.from_user)
192
def tt_check(info):
    """Build single-line caption text from the reply text and the message text.

    The two texts are joined with a space and every newline is replaced by a
    space. Returns ``None`` when the result contains only whitespace.
    """
    reply, content = info['reply']['text'], info['content']['text']
    flattened = f"{reply} {content}".replace("\n", " ")

    if flattened.strip() != "":
        return flattened
    return None
204
def ttbt_check(info):
    """Build the two-part (top/bottom) caption text.

    A multi-line message text is used on its own; otherwise the stripped
    reply text and the stripped message text are joined with a newline.
    Returns ``None`` when the result contains only whitespace.
    """
    top = info['reply']['text'].strip()
    bottom = info['content']['text'].strip()

    # A message that already spans several lines supplies both parts itself.
    caption = bottom if "\n" in bottom else f"{top}\n{bottom}"

    return None if caption.strip() == "" else caption
219
def splash_check(info):
    """Build the ``author\\ntext`` input for the splash effect.

    The message's own text and author are used when the text is non-blank;
    otherwise the replied-to message's text and author are used. Returns
    ``None`` unless the stripped result still has at least two lines.
    """
    reply = info['reply']['text']
    content = info['content']['text']

    # Fall back to the replied-to message when the command came without text.
    if content.strip() == "":
        source, body = 'reply', reply
    else:
        source, body = 'content', content

    author = info[source]['author']
    input_text = f"{author}\n{body}"

    if "\n" not in input_text.strip():
        return None
    return input_text
236
def wot_check(info):
    """Join the reply text and the message text with a newline.

    Returns ``None`` when the joined text contains only whitespace.
    """
    reply, content = info['reply']['text'], info['content']['text']
    combined = f"{reply}\n{content}"

    if not combined.strip():
        return None
    return combined
248
def ttbt(update: Update, context: CallbackContext):
    """Handle ``/ttbt``: apply ``tt_bt_effect`` to an image and reply with it.

    Input text is validated with ``ttbt_check``. The user gets the
    ``no_caption`` message when there is nothing to render and the
    ``failed_effect`` message when the effect returns no image.
    """
    content, image, markup = _get_all(update, ttbt_check, context)

    # _get_all yields (None, None, None) when the check rejected the input.
    if image is None:
        update.message.reply_text(l("no_caption", context))
        return

    image = tt_bt_effect(content, image)

    if image is None:
        update.message.reply_text(l("failed_effect", context))
        # Stop here: the original fell through and called reply_photo(photo=None).
        return

    update.message.reply_photo(photo=image, reply_markup=markup)
263
264
def tt(update: Update, context: CallbackContext):
    """Handle ``/tt``: apply ``tt_bt_effect`` to an image and reply with it.

    Input text is validated with ``tt_check`` (single-line text). The user
    gets the ``no_caption`` message when there is nothing to render and the
    ``failed_effect`` message when the effect returns no image.
    """
    content, image, markup = _get_all(update, tt_check, context)

    # _get_all yields (None, None, None) when the check rejected the input.
    if image is None:
        update.message.reply_text(l("no_caption", context))
        return

    # Same effect function as /ttbt; only the check function differs.
    image = tt_bt_effect(content, image)

    if image is None:
        update.message.reply_text(l("failed_effect", context))
        # Stop here: the original fell through and called reply_photo(photo=None).
        return

    update.message.reply_photo(photo=image, reply_markup=markup)
279
def bt(update: Update, context: CallbackContext):
    """Handle ``/bt``: apply ``bt_effect`` to an image and reply with it.

    Input text is validated with ``tt_check`` (single-line text). The user
    gets the ``no_caption`` message when there is nothing to render and the
    ``failed_effect`` message when the effect returns no image.
    """
    content, image, markup = _get_all(update, tt_check, context)

    # _get_all yields (None, None, None) when the check rejected the input.
    if image is None:
        update.message.reply_text(l("no_caption", context))
        return

    image = bt_effect(content, image)

    if image is None:
        update.message.reply_text(l("failed_effect", context))
        # Stop here: the original fell through and called reply_photo(photo=None).
        return

    update.message.reply_photo(photo=image, reply_markup=markup)
294
def splash(update: Update, context: CallbackContext):
    """Handle ``/splash``: apply ``splash_effect`` to an image and reply with it.

    Input text is validated with ``splash_check`` (author line plus text).
    The user gets the ``no_caption`` message when there is nothing to render
    and the ``failed_effect`` message when the effect returns no image.
    """
    content, image, markup = _get_all(update, splash_check, context)

    # _get_all yields (None, None, None) when the check rejected the input.
    if image is None:
        update.message.reply_text(l("no_caption", context))
        return

    image = splash_effect(content, image)

    if image is None:
        update.message.reply_text(l("failed_effect", context))
        # Stop here: the original fell through and called reply_photo(photo=None).
        return

    update.message.reply_photo(photo=image, reply_markup=markup)
309
def wot(update: Update, context: CallbackContext):
    """Handle ``/wot``: apply ``wot_effect`` to an image and reply with it.

    Input text is validated with ``wot_check``. The user gets the
    ``no_caption`` message when there is nothing to render and the
    ``failed_effect`` message when the effect returns no image.
    """
    content, image, markup = _get_all(update, wot_check, context)

    # _get_all yields (None, None, None) when the check rejected the input.
    if image is None:
        update.message.reply_text(l("no_caption", context))
        return

    image = wot_effect(content, image)

    if image is None:
        update.message.reply_text(l("failed_effect", context))
        # Stop here: the original fell through and called reply_photo(photo=None).
        return

    update.message.reply_photo(photo=image, reply_markup=markup)
324
def text(update: Update, context: CallbackContext):
    """Handle ``/text``: apply ``text_effect`` to an image and reply with it.

    Input text is validated with ``wot_check``. The user gets the
    ``no_caption`` message when there is nothing to render and the
    ``failed_effect`` message when the effect returns no image.
    """
    content, image, markup = _get_all(update, wot_check, context)

    # _get_all yields (None, None, None) when the check rejected the input.
    if image is None:
        update.message.reply_text(l("no_caption", context))
        return

    image = text_effect(content, image)

    if image is None:
        update.message.reply_text(l("failed_effect", context))
        # Stop here: the original fell through and called reply_photo(photo=None).
        return

    update.message.reply_photo(photo=image, reply_markup=markup)
339
def caps(update: Update, context: CallbackContext):
    """Send back upper-cased text.

    The text comes from the replied-to message when there is one, otherwise
    from the command arguments joined with spaces.
    """
    replied_to = update.message.reply_to_message
    typed = ' '.join(context.args)

    reply = _get_reply(replied_to, typed)[1]
    shouted = reply.upper()
    context.bot.send_message(chat_id=update.effective_chat.id, text=shouted)
344
def _set_lang(update: Update, context: CallbackContext, lang: str):
    """Store ``lang`` in the chat data and confirm the change in the chat."""
    context.chat_data["lang"] = lang

    chat_id = update.effective_chat.id
    confirmation = l("language_set", context).format(format_lang(lang))
    context.bot.send_message(chat_id=chat_id, text=confirmation)
348
def lang(update: Update, context: CallbackContext):
    """Handle ``/lang``: show or change the chat language.

    Without an argument, replies with the current language, the list of
    available ones and the language keyboard, and returns that reply. With an
    argument, either switches to that language or reports that it is invalid.
    """
    args = context.args

    if len(args) == 0:
        current = format_lang(get_lang(context))
        choices = ", ".join(langs) + "."
        overview = l("current_language", context).format(current, choices)
        return update.message.reply_text(text=overview, reply_markup=lang_markup)

    selected = str(args[0])
    if selected in langs:
        _set_lang(update, context, selected)
        return

    update.message.reply_text(text=l("invalid_language", context))
365
def unknown(update: Update, context: CallbackContext):
    """Log a command that no handler recognised; nothing is sent to the chat."""
    message = update.message
    logging.info(f"User {message.from_user.full_name} sent {message.text_markdown_v2} and I don't know what that means.")
368
def error_callback(update: Update, context: CallbackContext):
    """Log Telegram errors and tell the affected chat that something failed.

    ``context.error`` is re-raised so that only ``TelegramError`` instances
    are handled here; any other exception propagates to the caller exactly
    as before.

    The user-facing message is sent only when the failing update has a chat.
    Errors raised outside of an update arrive with ``update`` set to ``None``
    (or to an object without ``effective_chat``), and previously crashed this
    handler on ``update.effective_chat.id``.
    """
    try:
        raise context.error
    except TelegramError as e:
        logging.error("TelegramError! " + str(e))

        chat = getattr(update, "effective_chat", None)
        if chat is None:
            # No chat to notify; the error has already been logged.
            return

        context.bot.send_message(chat_id=chat.id, text=l('error', context))
375
def _add_effect_handler(dispatcher, command: str, callback):
    """Register ``callback`` for ``command`` both as a command and as a caption.

    Two handlers are added, in this order: a ``CommandHandler`` for
    ``command`` and a ``MessageHandler`` filtered on the caption
    ``"/<command>"``.
    """
    # presumably matches media whose caption is exactly "/<command>" - verify
    # against the Filters.caption documentation
    caption_filter = Filters.caption(update=[f"/{command}"])

    handlers = (
        CommandHandler(command, callback),
        MessageHandler(caption_filter, callback),
    )
    for handler in handlers:
        dispatcher.add_handler(handler)
379
def keyboard_handler(update: Update, context: CallbackContext):
    """Dispatch inline-keyboard callbacks based on the callback data.

    * ``"reroll <n>"`` runs ``spin`` when ``n <= 1`` and ``autospin``
      otherwise, returning their result.
    * ``"none"`` answers the query with the ``none_callback`` text.
    * ``"set_lang_en"`` / ``"set_lang_it"`` switch the chat language and
      answer the query with the confirmation text.
    * Anything else is logged as an error and the query is answered empty.
    """
    query = update.callback_query
    data = query.data

    if data.startswith("reroll"):
        amount = int(data.split(" ")[1])
        if amount > 1:
            return autospin(context, update.effective_chat.id, amount)
        return spin(update, context)

    if data == "none":
        return query.answer(l("none_callback", context))

    if data in ("set_lang_en", "set_lang_it"):
        # The language code is whatever follows the "set_lang_" prefix.
        chosen = data[len("set_lang_"):]
        _set_lang(update, context, chosen)
        return query.answer(l("language_set", context).format(format_lang(chosen)))

    logging.error(f"unknown callback: {data}")
    return query.answer()
406
def main():
    """Build the updater, register every handler and poll until interrupted.

    The bot token is read from the ``token`` environment variable and chat
    and user data are persisted to ``bot-data.pkl``. Handlers are registered
    in a fixed order, with the unknown-command fallback added last.
    """
    persistence = PicklePersistence(
        filename='bot-data.pkl',
        store_bot_data=False,
        store_callback_data=False,
    )
    updater = Updater(token=os.getenv("token"), persistence=persistence)
    dispatcher = updater.dispatcher

    dispatcher.add_error_handler(error_callback)
    dispatcher.add_handler(CallbackQueryHandler(callback=keyboard_handler))

    # commands
    commands = (
        ('start', start),
        ('lang', lang),
        ('lewd', set_lewd),
        ('caps', caps),
        ('pic', pic),
    )
    for name, callback in commands:
        dispatcher.add_handler(CommandHandler(name, callback))

    # effects
    effects = (
        ('ttbt', ttbt),
        ('tt', tt),
        ('bt', bt),
        ('splash', splash),
        ('wot', wot),
        ('text', text),
    )
    for name, callback in effects:
        _add_effect_handler(dispatcher, name, callback)

    # games, then secrets
    extras = (
        ('spin', spin),
        ('bet', bet),
        ('cash', cash),
        ('raw', raw),
        ('pilu', pilu),
    )
    for name, callback in extras:
        dispatcher.add_handler(CommandHandler(name, callback))

    # fallback: must stay last so it only sees commands nothing else handled
    dispatcher.add_handler(MessageHandler(Filters.command, unknown))

    updater.start_polling()
    updater.idle()


if __name__ == "__main__":
    main()