all repos — python-meme-bot @ 815dfde38783eb2c67d82b4244ad8d3fef20118f

Telegram bot that uses PIL to perform light image processing.

python_meme_bot/bot.py (view raw)

  1import os
  2import logging
  3from io import BytesIO
  4
  5from PIL import Image
  6from dotenv import load_dotenv
  7from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton, Message
  8from telegram.error import TelegramError
  9from telegram.ext import ApplicationBuilder, CallbackQueryHandler, CommandHandler, MessageHandler, \
 10    PicklePersistence, filters, PersistenceInput, ContextTypes
 11
 12from .api import get_random_image
 13from .constants import format_chat, get_localized_string as l, format_author, format_lang, langs, get_lang, lang_markup
 14from .effects import img_to_bio, tt_bt_effect, bt_effect, splash_effect, wot_effect, text_effect
 15from .slot import spin, autospin, bet, cash
 16
 17load_dotenv()
 18logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
 19
 20
 21async def _get_message_content(message):
 22    image = None
 23    if len(message.photo) > 0:
 24        p = message.photo[-1]
 25        i = await p.get_file()
 26        d = await i.download_as_bytearray()
 27        image = Image.open(BytesIO(d))
 28
 29    content = ""
 30    if message.text is not None:
 31        content = message.text.strip()
 32    elif message.caption is not None:
 33        content = message.caption.strip()
 34
 35    lines = content.split("\n")
 36    r = lines[0].split(" ")
 37
 38    try:
 39        if r[0][0] == '/':
 40            r.pop(0)
 41    except IndexError:
 42        pass
 43
 44    lines[0] = " ".join(r)
 45    content = "\n".join(lines)
 46
 47    return image, content, _get_author(message)
 48
 49
 50async def _get_reply(message, fallback=""):
 51    if message is None:
 52        return None, fallback, None
 53
 54    image, content, author = await _get_message_content(message)
 55
 56    return image, content, author
 57
 58
 59def _get_lewd(context):
 60    try:
 61        return context.chat_data["lewd"]
 62    except KeyError:
 63        return False
 64
 65
 66def _get_image(context):
 67    if context is not None:
 68        image, url = get_random_image(_get_lewd(context))
 69
 70    if image is None:
 71        logging.warning("Getting Image failed")
 72        raise TelegramError("bad image")
 73
 74    markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("sauce", context), url=url)]])
 75
 76    return image, markup
 77
 78
 79async def _get_all(update, check_fn, context):
 80    image_reply, text_reply, author_reply = await _get_reply(update.message.reply_to_message)
 81    image_content, text_content, author_content = await _get_message_content(update.message)
 82
 83    info_struct = {
 84        "reply": {
 85            "author": author_reply,
 86            "text": text_reply,
 87            "image": image_reply
 88        },
 89        "content": {
 90            "author": author_content,
 91            "text": text_content,
 92            "image": image_content
 93        }
 94    }
 95
 96    logging.info(
 97        f"User {update.message.from_user.full_name}{f' (@{update.message.from_user.username})' if update.message.from_user.username is not None else ''} typed: {str(update.message.text)}")
 98
 99    content = check_fn(info_struct)
100
101    if content is None:
102        return None, None, None
103
104    markup = ""
105    image = None
106
107    if image_reply is not None:
108        image = image_reply
109
110    if image_content is not None:
111        image = image_content
112
113    if image is None:
114        image, markup = _get_image(context)
115
116    return content, image, markup
117
118
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Answer the /start command with the localized welcome message."""
    greeting = l("welcome", context)
    await update.message.reply_text(greeting)
121
122
async def set_lewd(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Toggle the chat's ``"lewd"`` flag and report the new state.

    A chat that never set the flag is treated as disabled, so the first
    call enables it.
    """
    enabled = not context.chat_data.get("lewd", False)
    context.chat_data['lewd'] = enabled

    template = l("lewd_toggle", context)
    if enabled:
        state = l("enabled", context)
    else:
        state = l("disabled", context)

    return await update.message.reply_text(template.format(state))
132
133
async def pic(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Reply with a random image and its source button."""
    image, markup = _get_image(context)
    photo = img_to_bio(image)
    return await update.message.reply_photo(
        photo=photo,
        parse_mode="markdown",
        reply_markup=markup,
    )
137
138
def _get_author(message: Message):
    """Return a display name for whoever originally wrote ``message``.

    For a message that was not forwarded this is the formatted sender. For
    a forwarded message the forward origin is probed for the original user,
    hidden user name, chat or channel, in that order. If none of those
    fields is available the sender of the forward is used instead.
    """
    origin = message.forward_origin

    if origin is None:  # message was not forwarded
        return format_author(message.from_user)

    # (field, formatter) pairs in lookup order; a formatter of None means
    # the field already holds a ready-to-use name.
    candidates = (
        ('sender_user', format_author),  # MessageOriginUser
        ('sender_user_name', None),      # MessageOriginHiddenUser
        ('sender_chat', format_chat),    # MessageOriginChat
        ('chat', format_chat),           # MessageOriginChannel
    )

    for field, formatter in candidates:
        try:
            value = origin[field]
        except KeyError:
            # This origin does not carry the field; try the next one.
            continue
        return value if formatter is None else formatter(value)

    # logging.warn is a deprecated alias that newer Python versions removed.
    logging.warning("Message was forwarded but I couldn't detect the original author.")
    return format_author(message.from_user)
166
167
def tt_check(info):
    """Build single-line text from the reply text followed by the message text.

    Newlines are replaced by spaces. Returns ``None`` when the result is
    blank.
    """
    quoted = info['reply']['text']
    typed = info['content']['text']

    flattened = f"{quoted} {typed}".replace("\n", " ")

    if not flattened.strip():
        return None
    return flattened
178
179
def ttbt_check(info):
    """Build multi-line text for the ttbt effect.

    A multi-line message text is used on its own; otherwise the reply text
    and the message text are placed on separate lines. Returns ``None``
    when the result is blank.
    """
    quoted = info['reply']['text'].strip()
    typed = info['content']['text'].strip()

    # The message already supplies several lines: ignore the reply.
    combined = typed if "\n" in typed else f"{quoted}\n{typed}"

    return None if combined.strip() == "" else combined
193
194
def splash_check(info):
    """Build the "author, newline, text" input for the splash effect.

    The message's own author and text are used unless its text is blank,
    in which case the reply's author and text are used. Returns ``None``
    when the stripped result has fewer than two lines.
    """
    quoted = info['reply']['text']
    typed = info['content']['text']

    use_reply = typed.strip() == ""
    source = info['reply'] if use_reply else info['content']
    body = quoted if use_reply else typed

    headline = f"{source['author']}\n{body}"

    # Without a newline after stripping there is no text under the author.
    if "\n" not in headline.strip():
        return None
    return headline
210
211
def wot_check(info):
    """Stack the reply text above the message text.

    Returns ``None`` when the result is blank.
    """
    quoted = info['reply']['text']
    typed = info['content']['text']

    stacked = f"{quoted}\n{typed}"

    return stacked if stacked.strip() else None
222
223
async def ttbt(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /ttbt: render text validated by ``ttbt_check`` with ``tt_bt_effect``.

    Replies with the "no_caption" string when no usable input was found and
    with "failed_effect" when the effect returns ``None``.
    """
    message = update.message
    caption, picture, markup = await _get_all(update, ttbt_check, context)

    if picture is None:
        return await message.reply_text(l("no_caption", context))

    rendered = tt_bt_effect(caption, picture)
    if rendered is None:
        return await message.reply_text(l("failed_effect", context))

    return await message.reply_photo(photo=rendered, reply_markup=markup)
236
237
async def tt(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /tt: render text validated by ``tt_check`` with ``tt_bt_effect``.

    Replies with the "no_caption" string when no usable input was found and
    with "failed_effect" when the effect returns ``None``.
    """
    message = update.message
    caption, picture, markup = await _get_all(update, tt_check, context)

    if picture is None:
        return await message.reply_text(l("no_caption", context))

    rendered = tt_bt_effect(caption, picture)
    if rendered is None:
        return await message.reply_text(l("failed_effect", context))

    return await message.reply_photo(photo=rendered, reply_markup=markup)
250
251
async def bt(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /bt: render text validated by ``tt_check`` with ``bt_effect``.

    Replies with the "no_caption" string when no usable input was found and
    with "failed_effect" when the effect returns ``None``.
    """
    message = update.message
    caption, picture, markup = await _get_all(update, tt_check, context)

    if picture is None:
        return await message.reply_text(l("no_caption", context))

    rendered = bt_effect(caption, picture)
    if rendered is None:
        return await message.reply_text(l("failed_effect", context))

    return await message.reply_photo(photo=rendered, reply_markup=markup)
264
265
async def splash(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /splash: render text validated by ``splash_check`` with ``splash_effect``.

    Replies with the "no_caption" string when no usable input was found and
    with "failed_effect" when the effect returns ``None``.
    """
    message = update.message
    caption, picture, markup = await _get_all(update, splash_check, context)

    if picture is None:
        return await message.reply_text(l("no_caption", context))

    rendered = splash_effect(caption, picture)
    if rendered is None:
        return await message.reply_text(l("failed_effect", context))

    return await message.reply_photo(photo=rendered, reply_markup=markup)
278
279
async def wot(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /wot: render text validated by ``wot_check`` with ``wot_effect``.

    Replies with the "no_caption" string when no usable input was found and
    with "failed_effect" when the effect returns ``None``.
    """
    content, image, markup = await _get_all(update, wot_check, context)

    if image is None:
        return await update.message.reply_text(l("no_caption", context))

    image = wot_effect(content, image)

    if image is None:
        # Stop here: without this return the handler went on to send
        # photo=None after already reporting the failure.
        return await update.message.reply_text(l("failed_effect", context))

    await update.message.reply_photo(photo=image, reply_markup=markup)
292
293
async def text(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /text: render text validated by ``wot_check`` with ``text_effect``.

    Replies with the "no_caption" string when no usable input was found and
    with "failed_effect" when the effect returns ``None``.
    """
    content, image, markup = await _get_all(update, wot_check, context)

    if image is None:
        await update.message.reply_text(l("no_caption", context))
        return

    image = text_effect(content, image)

    if image is None:
        await update.message.reply_text(l("failed_effect", context))
        # Stop here: without this return the handler went on to send
        # photo=None after already reporting the failure.
        return

    await update.message.reply_photo(photo=image, reply_markup=markup)
307
308
async def caps(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Reply with upper-cased text.

    The text is that of the replied-to message when there is one, otherwise
    the command arguments joined by spaces.
    """
    replied_to = update.message.reply_to_message
    typed_args = ' '.join(context.args)

    _, shout, _ = await _get_reply(replied_to, typed_args)

    await update.message.reply_text(shout.upper())
312
313
async def _set_lang(update: Update, context: ContextTypes.DEFAULT_TYPE, lang: str):
    """Store ``lang`` as the chat's language and confirm it in the chat.

    Args:
        update: The update that requested the change; may be a command
            message or a callback query from the language keyboard.
        context: Handler context whose ``chat_data["lang"]`` is updated.
        lang: Language code to store.
    """
    context.chat_data["lang"] = lang
    response = l("language_set", context).format(format_lang(lang))

    # update.message is None for callback-query updates (this function is
    # also called from the inline keyboard handler), so use the message the
    # update refers to, whatever its kind.
    target = update.effective_message
    if target is not None:
        await target.reply_text(response)
318
319
async def lang(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /lang: show the current language or switch to the requested one.

    Without an argument the current language and the available choices are
    shown together with the language keyboard. An argument that is not in
    ``langs`` is rejected with the "invalid_language" string.
    """
    message = update.message
    first_arg = context.args[:1]

    if not first_arg:
        current = format_lang(get_lang(context))
        choices = ", ".join(langs) + "."
        overview = l("current_language", context).format(current, choices)
        return await message.reply_text(text=overview, reply_markup=lang_markup)

    selected = str(first_arg[0])

    if selected in langs:
        return await _set_lang(update, context, selected)

    return await message.reply_text(text=l("invalid_language", context))
336
337
async def unknown(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Log commands that no other handler recognised.

    Declared ``async`` because it is registered as a handler callback and
    the library awaits those; a plain function would return ``None`` to an
    ``await`` expression.
    """
    logging.info(f"User {update.message.from_user.full_name} sent {update.message.text_markdown_v2} and I don't know what that means.")
340
341
async def error_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Log errors raised by handlers and tell the user about Telegram errors.

    A ``TelegramError`` is logged and answered with the localized "error"
    string when the update has a message to reply to. Any other exception
    is logged with its traceback instead of being re-raised from inside the
    error handler.
    """
    error = context.error

    if not isinstance(error, TelegramError):
        logging.error("Unhandled exception while processing an update", exc_info=error)
        return

    logging.error("TelegramError! " + str(error))

    # The update can be missing or may not carry a plain message (e.g. a
    # callback query), so look the target up defensively.
    target = update.effective_message if isinstance(update, Update) else None
    if target is not None:
        await target.reply_text(l('error', context))
348
349
def _add_effect_handler(application: ApplicationBuilder, command: str, callback):
    """Register ``callback`` for an effect command sent as text or as a caption.

    Two handlers are added, in this order: a ``CommandHandler`` for
    ``command`` and a ``MessageHandler`` using a caption filter built from
    the single string ``"/<command>"``.
    """
    # NOTE(review): the caption filter is given only the bare "/<command>"
    # string; confirm whether captions with extra text are meant to match.
    caption_filter = filters.Caption([f"/{command}"])

    handlers = (
        CommandHandler(command, callback),
        MessageHandler(caption_filter, callback),
    )
    for handler in handlers:
        application.add_handler(handler)
353
354
355async def keyboard_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
356    query = update.callback_query
357    data = query.data
358
359    if data.startswith("reroll"):
360        amount = int(data.split(" ")[1])
361
362        if amount <= 1:
363            return await spin(update, context)
364        return await autospin(context, update.effective_chat.id, amount)
365
366    match data:
367        case "none":
368            return query.answer(l("none_callback", context))
369        case "set_lang_en":
370            lang = "en"
371            await _set_lang(update, context, lang)
372            return await query.answer(l("language_set", context).format(format_lang(lang)))
373        case "set_lang_it":
374            lang = "it"
375            await _set_lang(update, context, lang)
376            return await query.answer(l("language_set", context).format(format_lang(lang)))
377        case other:
378            logging.error(f"unknown callback: {data}")
379
380    return await query.answer()
381
382
def main():
    """Build the bot application, register every handler and start polling.

    The token is read from the ``token`` environment variable. Persistence
    goes to ``bot-data.pkl`` with bot data and callback data excluded.
    """
    token = os.getenv("token")
    persistence = PicklePersistence(
        filepath='bot-data.pkl',
        store_data=PersistenceInput(bot_data=False, callback_data=False),
    )

    application = ApplicationBuilder().token(token).persistence(persistence).build()

    application.add_error_handler(error_callback)
    application.add_handler(CallbackQueryHandler(callback=keyboard_handler))

    # commands
    plain_commands = (
        ('start', start),
        ('lang', lang),
        ('lewd', set_lewd),
        ('caps', caps),
        ('pic', pic),
    )
    for name, handler in plain_commands:
        application.add_handler(CommandHandler(name, handler))

    # effects
    effect_commands = (
        ('ttbt', ttbt),
        ('tt', tt),
        ('bt', bt),
        ('splash', splash),
        ('wot', wot),
        ('text', text),
    )
    for name, handler in effect_commands:
        _add_effect_handler(application, name, handler)

    # games
    game_commands = (
        ('spin', spin),
        ('bet', bet),
        ('cash', cash),
    )
    for name, handler in game_commands:
        application.add_handler(CommandHandler(name, handler))

    # fallback: registered last, after every known command
    application.add_handler(MessageHandler(filters.COMMAND, unknown))
    application.run_polling()
416
417
# Start the bot only when this module is run directly, not when imported.
if __name__ == "__main__":
    main()