# python-meme-bot/bot.py
1import os
2import logging
3from io import BytesIO
4
5from PIL import Image
6from dotenv import load_dotenv
7from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton, Message
8from telegram.error import TelegramError
9from telegram.ext import ApplicationBuilder, CallbackQueryHandler, CommandHandler, MessageHandler, \
10 PicklePersistence, filters, PersistenceInput, ContextTypes
11
12from .api import get_random_image
13from .constants import format_chat, get_localized_string as l, format_author, format_lang, langs, get_lang, lang_markup
14from .effects import img_to_bio, tt_bt_effect, bt_effect, splash_effect, wot_effect, text_effect
15from .slot import spin, autospin, bet, cash
16
# Load variables from a .env file before anything reads the environment,
# then configure root logging for the whole bot.
load_dotenv()
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
19
20
async def _get_message_content(message):
    """Extract the picture, the text and the author of a message.

    The text is taken from ``message.text`` or, failing that, from
    ``message.caption``. If the first word of the first line starts with
    ``/`` (a bot command) that word is dropped.

    Returns:
        A ``(image, content, author)`` tuple; ``image`` is ``None`` when the
        message carries no photo and ``content`` is ``""`` when it has
        neither text nor caption.
    """
    picture = None
    if message.photo:
        # The last entry of message.photo is the one that gets downloaded.
        biggest = message.photo[-1]
        remote_file = await biggest.get_file()
        raw = await remote_file.download_as_bytearray()
        picture = Image.open(BytesIO(raw))

    if message.text is not None:
        body = message.text.strip()
    elif message.caption is not None:
        body = message.caption.strip()
    else:
        body = ""

    # Only the first line can hold the command word.
    first_line, *other_lines = body.split("\n")
    words = first_line.split(" ")
    if words[0].startswith('/'):
        words = words[1:]

    body = "\n".join([" ".join(words), *other_lines])

    return picture, body, _get_author(message)
48
49
50async def _get_reply(message, fallback=""):
51 if message is None:
52 return None, fallback, None
53
54 image, content, author = await _get_message_content(message)
55
56 return image, content, author
57
58
59def _get_lewd(context):
60 try:
61 return context.chat_data["lewd"]
62 except KeyError:
63 return False
64
65
def _get_image(context):
    """Fetch a random image together with a "sauce" button linking to it.

    Args:
        context: Handler context. Its chat data decides (via ``_get_lewd``)
            which kind of image is requested, and it is used to localize the
            button label.

    Returns:
        An ``(image, markup)`` tuple, where ``markup`` is an inline keyboard
        with a single URL button pointing at the image source.

    Raises:
        TelegramError: If no image could be obtained. This includes the case
            where ``context`` is ``None``, which used to surface as an
            ``UnboundLocalError``.
    """
    # Bind both names up front so a missing context ends in the same
    # "bad image" error path as a failed download.
    image, url = None, None
    if context is not None:
        image, url = get_random_image(_get_lewd(context))

    if image is None:
        logging.warning("Getting Image failed")
        raise TelegramError("bad image")

    source_button = InlineKeyboardButton(text=l("sauce", context), url=url)
    markup = InlineKeyboardMarkup([[source_button]])

    return image, markup
77
78
async def _get_all(update, check_fn, context):
    """Collect everything an effect needs: caption text, image and markup.

    The replied-to message and the command message are both parsed, handed
    to ``check_fn`` as a ``{"reply": ..., "content": ...}`` structure, and
    the image is picked with this priority: the command message's photo, the
    replied-to photo, then a random image from ``_get_image``.

    Returns:
        ``(content, image, markup)``, or ``(None, None, None)`` when
        ``check_fn`` rejects the input. ``markup`` is ``""`` unless a random
        image was fetched.
    """
    message = update.message
    reply_image, reply_text, reply_author = await _get_reply(message.reply_to_message)
    own_image, own_text, own_author = await _get_message_content(message)

    sender = update.message.from_user
    handle = f' (@{sender.username})' if sender.username is not None else ''
    logging.info(f"User {sender.full_name}{handle} typed: {str(update.message.text)}")

    caption = check_fn({
        "reply": {"author": reply_author, "text": reply_text, "image": reply_image},
        "content": {"author": own_author, "text": own_text, "image": own_image},
    })
    if caption is None:
        return None, None, None

    # A photo on the command message wins over one on the replied-to message.
    chosen = own_image if own_image is not None else reply_image
    if chosen is not None:
        return caption, chosen, ""

    chosen, keyboard = _get_image(context)
    return caption, chosen, keyboard
117
118
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /start by replying with the localized welcome text."""
    greeting = l("welcome", context)
    await update.message.reply_text(greeting)
121
122
async def set_lewd(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /lewd: flip the chat's ``"lewd"`` flag and report the new state.

    A chat that never set the flag is treated as disabled, so the first call
    enables it.
    """
    enabled = not context.chat_data.get("lewd")
    context.chat_data['lewd'] = enabled

    if enabled:
        state = l("enabled", context)
    else:
        state = l("disabled", context)

    return await update.message.reply_text(l("lewd_toggle", context).format(state))
132
133
async def pic(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /pic: reply with a random image and its source button."""
    picture, keyboard = _get_image(context)
    payload = img_to_bio(picture)
    return await update.message.reply_photo(
        photo=payload,
        parse_mode="markdown",
        reply_markup=keyboard,
    )
137
138
139def _get_author(message: Message):
140 origin = message.forward_origin
141
142 if origin is None: # message was not forwarded
143 return format_author(message.from_user)
144
145 try:
146 return format_author(origin['sender_user']) # MessageOriginUser
147 except KeyError:
148 pass
149
150 try:
151 return origin['sender_user_name'] # MessageOriginHiddenUser
152 except KeyError:
153 pass
154
155 try:
156 format_chat(origin['sender_chat']) # MessageOriginChat
157 except KeyError:
158 pass
159 try:
160 format_chat(origin['chat']) # MessageOriginChannel
161 except KeyError:
162 pass
163
164 logging.warn("Message was forwarded but I couldn't detect the original author.")
165 return format_author(message.from_user)
166
167
def tt_check(info):
    """Build a single-line caption from the reply text and the command text.

    The two texts are joined with a space and every newline becomes a space.
    Returns ``None`` when the result is blank.
    """
    reply_text = info['reply']['text']
    own_text = info['content']['text']

    caption = f"{reply_text} {own_text}".replace("\n", " ")

    return caption if caption.strip() != "" else None
178
179
def ttbt_check(info):
    """Build a top/bottom caption.

    A multi-line command text is used as is; otherwise the reply text goes
    on the first line and the command text on the second. Returns ``None``
    when the result is blank.
    """
    top = info['reply']['text'].strip()
    bottom = info['content']['text'].strip()

    multiline = len(bottom.split("\n")) > 1
    caption = bottom if multiline else f"{top}\n{bottom}"

    if caption.strip() == "":
        return None
    return caption
193
194
def splash_check(info):
    """Build an ``author\\ntext`` caption for the splash effect.

    The command text and its author are used when the command text is not
    blank; otherwise the replied-to text and its author are used. Returns
    ``None`` when the stripped result has fewer than two lines.
    """
    # Pick which side of the conversation supplies both author and text.
    side = 'reply' if info['content']['text'].strip() == "" else 'content'
    who = info[side]['author']
    what = info[side]['text']

    caption = f"{who}\n{what}"

    if len(caption.strip().split("\n")) >= 2:
        return caption
    return None
210
211
def wot_check(info):
    """Join the reply text and the command text with a newline.

    Returns ``None`` when the result is blank.
    """
    pieces = (info['reply']['text'], info['content']['text'])
    caption = f"{pieces[0]}\n{pieces[1]}"

    return None if caption.strip() == "" else caption
222
223
async def ttbt(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /ttbt: render a top-text/bottom-text meme.

    Replies with the "no_caption" text when ``_get_all`` yields no image,
    with "failed_effect" when the effect returns ``None``, and with the
    rendered photo otherwise.
    """
    caption, base, keyboard = await _get_all(update, ttbt_check, context)
    if base is None:
        return await update.message.reply_text(l("no_caption", context))

    rendered = tt_bt_effect(caption, base)
    if rendered is not None:
        return await update.message.reply_photo(photo=rendered, reply_markup=keyboard)

    return await update.message.reply_text(l("failed_effect", context))
236
237
async def tt(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /tt: render a meme from a single-line caption.

    Replies with the "no_caption" text when ``_get_all`` yields no image,
    with "failed_effect" when the effect returns ``None``, and with the
    rendered photo otherwise.
    """
    caption, base, keyboard = await _get_all(update, tt_check, context)
    if base is None:
        return await update.message.reply_text(l("no_caption", context))

    rendered = tt_bt_effect(caption, base)
    if rendered is not None:
        return await update.message.reply_photo(photo=rendered, reply_markup=keyboard)

    return await update.message.reply_text(l("failed_effect", context))
250
251
async def bt(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /bt: render a meme with ``bt_effect`` from a single-line caption.

    Replies with the "no_caption" text when ``_get_all`` yields no image,
    with "failed_effect" when the effect returns ``None``, and with the
    rendered photo otherwise.
    """
    caption, base, keyboard = await _get_all(update, tt_check, context)
    if base is None:
        return await update.message.reply_text(l("no_caption", context))

    rendered = bt_effect(caption, base)
    if rendered is not None:
        return await update.message.reply_photo(photo=rendered, reply_markup=keyboard)

    return await update.message.reply_text(l("failed_effect", context))
264
265
async def splash(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /splash: render an author-plus-text splash image.

    Replies with the "no_caption" text when ``_get_all`` yields no image,
    with "failed_effect" when the effect returns ``None``, and with the
    rendered photo otherwise.
    """
    caption, base, keyboard = await _get_all(update, splash_check, context)
    if base is None:
        return await update.message.reply_text(l("no_caption", context))

    rendered = splash_effect(caption, base)
    if rendered is not None:
        return await update.message.reply_photo(photo=rendered, reply_markup=keyboard)

    return await update.message.reply_text(l("failed_effect", context))
278
279
async def wot(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /wot: render the "wall of text" effect.

    Replies with the "no_caption" text when ``_get_all`` yields no image and
    with "failed_effect" when the effect returns ``None``. In both cases the
    handler stops there; previously a failed effect still went on to call
    ``reply_photo`` with ``photo=None``.

    Returns:
        The message object returned by the reply call that was made.
    """
    content, image, markup = await _get_all(update, wot_check, context)

    if image is None:
        return await update.message.reply_text(l("no_caption", context))

    image = wot_effect(content, image)

    if image is None:
        # Stop here: there is nothing to send as a photo.
        return await update.message.reply_text(l("failed_effect", context))

    return await update.message.reply_photo(photo=image, reply_markup=markup)
292
293
async def text(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /text: render the plain text effect.

    Replies with the "no_caption" text when ``_get_all`` yields no image and
    with "failed_effect" when the effect returns ``None``. In both cases the
    handler stops there; previously a failed effect still went on to call
    ``reply_photo`` with ``photo=None``.

    Returns:
        The message object returned by the reply call that was made.
    """
    content, image, markup = await _get_all(update, wot_check, context)

    if image is None:
        return await update.message.reply_text(l("no_caption", context))

    image = text_effect(content, image)

    if image is None:
        # Stop here: there is nothing to send as a photo.
        return await update.message.reply_text(l("failed_effect", context))

    return await update.message.reply_photo(photo=image, reply_markup=markup)
307
308
async def caps(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /caps: reply with upper-cased text.

    The text comes from the replied-to message when there is one, otherwise
    from the command arguments joined with spaces.
    """
    typed = ' '.join(context.args)
    replied_to = update.message.reply_to_message
    _image, source_text, _author = await _get_reply(replied_to, typed)
    await update.message.reply_text(source_text.upper())
312
313
async def _set_lang(update: Update, context: ContextTypes.DEFAULT_TYPE, lang: str):
    """Store ``lang`` as the chat language and confirm it in the chat.

    Args:
        update: The update that triggered the change. It may be a command
            message or a callback query (inline keyboard press).
        context: Handler context whose ``chat_data["lang"]`` is written.
        lang: Language code to store.
    """
    context.chat_data["lang"] = lang
    response = l("language_set", context).format(format_lang(lang))

    # ``update.message`` is None for callback-query updates, which is how the
    # inline language buttons reach this function. ``effective_message``
    # covers both that case and plain command messages.
    target = update.effective_message
    if target is not None:
        await target.reply_text(response)
318
319
async def lang(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /lang: show or change the chat language.

    Without an argument, replies with the current language, the list of
    available ones and the language keyboard. With an argument, switches to
    that language if it is one of ``langs``, otherwise replies with the
    "invalid_language" text.
    """
    try:
        requested = str(context.args[0])
    except IndexError:
        requested = None

    if requested is not None:
        if requested in langs:
            return await _set_lang(update, context, requested)
        return await update.message.reply_text(text=l("invalid_language", context))

    current = format_lang(get_lang(context))
    available = ", ".join(langs) + "."
    summary = l("current_language", context).format(current, available)
    return await update.message.reply_text(text=summary, reply_markup=lang_markup)
336
337
async def unknown(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Fallback for commands no other handler took: just log them.

    This is a coroutine function because it is registered as a handler
    callback, and python-telegram-bot awaits those; a plain ``def`` returns
    ``None``, which cannot be awaited.
    """
    logging.info(f"User {update.message.from_user.full_name} sent {update.message.text_markdown_v2} and I don't know what that means.")
340
341
async def error_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Application-wide error handler.

    A ``TelegramError`` is logged and, when the failing update has a message
    to answer, the localized "error" text is sent to the chat. Any other
    exception is re-raised unchanged.

    Args:
        update: The update being processed when the error occurred. The
            library may pass ``None`` or a non-``Update`` object for errors
            that did not originate from an update.
        context: Handler context; ``context.error`` holds the exception.
    """
    error = context.error
    if not isinstance(error, TelegramError):
        raise error

    logging.error("TelegramError! " + str(error))

    # Only answer when there is a message to answer. ``effective_message``
    # also covers callback queries, where ``update.message`` is None.
    message = update.effective_message if isinstance(update, Update) else None
    if message is not None:
        await message.reply_text(l('error', context))
348
349
def _add_effect_handler(application: ApplicationBuilder, command: str, callback):
    """Register ``callback`` for an effect command.

    Two handlers are added: a ``CommandHandler`` for ``command`` and a
    ``MessageHandler`` using ``filters.Caption`` with ``/command``
    (presumably so the command also works as a media caption).
    """
    handlers = (
        CommandHandler(command, callback),
        MessageHandler(filters.Caption([f"/{command}"]), callback),
    )
    for handler in handlers:
        application.add_handler(handler)
353
354
355async def keyboard_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
356 query = update.callback_query
357 data = query.data
358
359 if data.startswith("reroll"):
360 amount = int(data.split(" ")[1])
361
362 if amount <= 1:
363 return await spin(update, context)
364 return await autospin(context, update.effective_chat.id, amount)
365
366 match data:
367 case "none":
368 return query.answer(l("none_callback", context))
369 case "set_lang_en":
370 lang = "en"
371 await _set_lang(update, context, lang)
372 return await query.answer(l("language_set", context).format(format_lang(lang)))
373 case "set_lang_it":
374 lang = "it"
375 await _set_lang(update, context, lang)
376 return await query.answer(l("language_set", context).format(format_lang(lang)))
377 case other:
378 logging.error(f"unknown callback: {data}")
379
380 return await query.answer()
381
382
def main():
    """Build the bot application, register every handler and start polling.

    The bot token is read from the ``token`` environment variable. State is
    persisted to ``bot-data.pkl`` with ``bot_data`` and ``callback_data``
    storage switched off.
    """
    persistence = PicklePersistence(
        filepath='bot-data.pkl',
        store_data=PersistenceInput(bot_data=False, callback_data=False),
    )
    application = (
        ApplicationBuilder()
        .token(os.getenv("token"))
        .persistence(persistence)
        .build()
    )

    application.add_error_handler(error_callback)
    application.add_handler(CallbackQueryHandler(callback=keyboard_handler))

    # Registration order is kept exactly as before: commands, effects,
    # games, and the catch-all for unknown commands last.
    commands = (
        ('start', start),
        ('lang', lang),
        ('lewd', set_lewd),
        ('caps', caps),
        ('pic', pic),
    )
    for name, callback in commands:
        application.add_handler(CommandHandler(name, callback))

    effects = (
        ('ttbt', ttbt),
        ('tt', tt),
        ('bt', bt),
        ('splash', splash),
        ('wot', wot),
        ('text', text),
    )
    for name, callback in effects:
        _add_effect_handler(application, name, callback)

    games = (
        ('spin', spin),
        ('bet', bet),
        ('cash', cash),
    )
    for name, callback in games:
        application.add_handler(CommandHandler(name, callback))

    # fallback
    application.add_handler(MessageHandler(filters.COMMAND, unknown))
    application.run_polling()
416
417
# Start the bot only when this file is executed directly, not when imported.
if __name__ == "__main__":
    main()