all repos — python-meme-bot @ f51476814de89c368f723295be3bb3b623cf907e

Telegram Bot that uses PIL to compute light image processing.

main.py (view raw)

  1from PIL import Image
  2from Api import get_random_image
  3from Effects import img_to_bio, tt_bt_effect, bt_effect, splash_effect, wot_effect, text_effect
  4from Constants import get_localized_string as l, format_author, format_lang, langs, get_lang, lang_markup
  5from Slot import spin, autospin, bet, cash
  6
  7from dotenv import load_dotenv
  8load_dotenv()
  9import os, logging
 10logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
 11from io import BytesIO
 12
 13from telegram.error import TelegramError
 14from telegram.ext import ApplicationBuilder, Updater, CallbackContext, CallbackQueryHandler, CommandHandler, MessageHandler, PicklePersistence, filters, PersistenceInput
 15from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton
 16
 17async def _get_message_content(message):
 18    
 19    image = None
 20    if len(message.photo) > 0:
 21        p = message.photo[-1]
 22        i = await p.get_file()
 23        d = await i.download_as_bytearray()
 24        image = Image.open(BytesIO(d))
 25    
 26    content = ""
 27    if message.text is not None:
 28        content = message.text.strip()
 29    elif message.caption is not None:
 30        content = message.caption.strip()
 31        
 32    lines = content.split("\n")
 33    r = lines[0].split(" ")
 34    
 35    try:
 36        if r[0][0] == '/':
 37            r.pop(0)
 38    except IndexError:
 39        pass
 40        
 41    lines[0] = " ".join(r)
 42    content = "\n".join(lines)
 43    
 44    return image, content, _get_author(message)
 45
 46async def _get_reply(message, fallback=""):
 47    
 48    if message is None:
 49        return None, fallback, None
 50    
 51    image, content, author = await _get_message_content(message)
 52    
 53    return image, content, author
 54
 55def _get_lewd(context):
 56    
 57    try:
 58        return context.chat_data["lewd"]
 59    except KeyError:
 60        return False
 61
 62def _get_image(context):
 63    
 64    if context is not None:
 65        image, url = get_random_image(_get_lewd(context))
 66    
 67    if image is None:
 68        logging.warning("Getting Image failed")
 69        raise TelegramError("bad image")
 70    
 71    markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("sauce", context), url=url)]])
 72
 73    return image, markup
 74
 75async def _get_all(update, check_fn, context):
 76    
 77    image_reply, text_reply, author_reply = await _get_reply(update.message.reply_to_message)
 78    image_content, text_content, author_content = await _get_message_content(update.message)
 79    
 80    info_struct = {
 81        "reply": {
 82            "author": author_reply,
 83            "text": text_reply,
 84            "image": image_reply
 85        },
 86        "content": {
 87            "author": author_content,
 88            "text": text_content,
 89            "image": image_content
 90        }
 91    }
 92        
 93    logging.info(f"User {update.message.from_user.full_name}{f' (@{update.message.from_user.username})' if update.message.from_user.username is not None else ''} typed: {str(update.message.text)}")
 94    
 95    content = check_fn(info_struct)
 96    
 97    if content is None:
 98        return None, None, None
 99
100    markup = ""
101    image = None
102    
103    if image_reply is not None:
104        image = image_reply
105    
106    if image_content is not None:
107        image = image_content
108    
109    if image is None:
110        image, markup = _get_image(context)
111    
112    return content, image, markup
113
114def start(update: Update, context: CallbackContext):
115    context.bot.send_message(chat_id=update.effective_chat.id, text=l("welcome", context))
116
117async def set_lewd(update: Update, context: CallbackContext):
118    
119    try:
120        output = False if context.chat_data["lewd"] else True
121    except KeyError:
122        output = True
123        
124    context.chat_data['lewd'] = output
125    message = l("lewd_toggle", context).format(l("enabled", context) if output else l("disabled", context))
126    
127    return await context.bot.send_message(chat_id=update.effective_chat.id, text=message)
128
129async def pic(update: Update, context: CallbackContext):
130    
131    image, markup = _get_image(context)
132    return await update.message.reply_photo(photo=img_to_bio(image), parse_mode="markdown", reply_markup=markup)
133
134def _get_author(message):
135    
136    if message.forward_from is not None:
137        return format_author(message.forward_from)
138    
139    if message.forward_sender_name is not None:
140        return message.forward_sender_name
141    
142    if message.forward_from_chat is not None:
143        return message.forward_from_chat.title + ("" if message.forward_from_chat.username is None else f" ({message.forward_from_chat.username})")
144    
145    return format_author(message.from_user)
146
147def tt_check(info):
148    
149    reply = info['reply']['text']
150    content = info['content']['text']
151    
152    input_text = f"{reply} {content}".replace("\n", " ")
153    
154    if input_text.strip() == "":
155        return None
156
157    return input_text
158
159def ttbt_check(info):
160    
161    reply = info['reply']['text'].strip()
162    content = info['content']['text'].strip()
163    
164    if len(content.split("\n")) > 1:
165        input_text = content
166    else:
167        input_text = f"{reply}\n{content}"
168    
169    if input_text.strip() == "":
170        return None
171
172    return input_text
173
174def splash_check(info):
175    
176    reply = info['reply']['text']
177    content = info['content']['text']
178    
179    if content.strip() == "":
180        author = info['reply']['author']
181        input_text = f"{author}\n{reply}"
182    else:
183        author = info['content']['author']
184        input_text = f"{author}\n{content}"
185
186    if len(input_text.strip().split("\n")) < 2:
187        return None
188
189    return input_text
190
191def wot_check(info):
192    
193    reply = info['reply']['text']
194    content = info['content']['text']
195    
196    input_text = f"{reply}\n{content}"
197
198    if input_text.strip() == "":
199        return None
200
201    return input_text
202
203async def ttbt(update: Update, context: CallbackContext):
204    
205    content, image, markup = await _get_all(update, ttbt_check, context)
206    
207    if image is None:
208        return await update.message.reply_text(l("no_caption", context))
209    
210    image = tt_bt_effect(content, image)
211
212    if image is None:
213        return await update.message.reply_text(l("failed_effect", context))
214
215    return await update.message.reply_photo(photo=image, reply_markup=markup)
216
217
218async def tt(update: Update, context: CallbackContext):
219    
220    content, image, markup = await _get_all(update, tt_check, context)
221    
222    if image is None:
223        return await update.message.reply_text(l("no_caption", context))
224    
225    image = tt_bt_effect(content, image)
226
227    if image is None:
228        return await update.message.reply_text(l("failed_effect", context))
229
230    return await update.message.reply_photo(photo=image, reply_markup=markup)
231    
232async def bt(update: Update, context: CallbackContext):
233    
234    content, image, markup = await _get_all(update, tt_check, context)
235    
236    if image is None:
237        return await update.message.reply_text(l("no_caption", context))
238
239    image = bt_effect(content, image)
240
241    if image is None:
242        return await update.message.reply_text(l("failed_effect", context))
243
244    return await update.message.reply_photo(photo=image, reply_markup=markup)
245
246async def splash(update: Update, context: CallbackContext):
247    
248    content, image, markup = await _get_all(update, splash_check, context)
249    
250    if image is None:
251        return await update.message.reply_text(l("no_caption", context))
252    
253    image = splash_effect(content, image)
254
255    if image is None:
256        return await update.message.reply_text(l("failed_effect", context))
257    
258    return await update.message.reply_photo(photo=image, reply_markup=markup)
259    
260async def wot(update: Update, context: CallbackContext):
261    
262    content, image, markup = await _get_all(update, wot_check, context)
263    
264    if image is None:
265        return await update.message.reply_text(l("no_caption", context))
266    
267    image = wot_effect(content, image)
268
269    if image is None:
270        await update.message.reply_text(l("failed_effect", context))
271    
272    await update.message.reply_photo(photo=image, reply_markup=markup)
273    
274async def text(update: Update, context: CallbackContext):
275    
276    content, image, markup = await _get_all(update, wot_check, context)
277    
278    if image is None:
279        await update.message.reply_text(l("no_caption", context))
280        return
281    
282    image = text_effect(content, image)
283
284    if image is None:
285        await update.message.reply_text(l("failed_effect", context))
286    
287    await update.message.reply_photo(photo=image, reply_markup=markup)
288
289async def caps(update: Update, context: CallbackContext):
290    
291    _, reply, _ = await _get_reply(update.message.reply_to_message, ' '.join(context.args))
292    await context.bot.send_message(chat_id=update.effective_chat.id, text=reply.upper())
293
294async def _set_lang(update: Update, context: CallbackContext, lang: str):
295    context.chat_data["lang"] = lang
296    await context.bot.send_message(chat_id=update.effective_chat.id, text=l("language_set", context).format(format_lang(lang)))
297
298async def lang(update: Update, context: CallbackContext):
299    try:
300        selected = str(context.args[0])
301    except IndexError:
302        selected = None
303    
304    if selected is None:
305        lang = format_lang(get_lang(context))
306        choices = ", ".join(langs) + "."
307        return await update.message.reply_text(text=l("current_language", context).format(lang, choices), reply_markup=lang_markup)
308    
309    if selected not in langs:
310        return await update.message.reply_text(text=l("invalid_language", context))
311    
312    return await _set_lang(update, context, selected)
313    
314def unknown(update: Update, context: CallbackContext):
315    logging.info(f"User {update.message.from_user.full_name} sent {update.message.text_markdown_v2} and I don't know what that means.")
316    
317async def error_callback(update: Update, context: CallbackContext):
318    try:
319        raise context.error
320    except TelegramError as e:
321        logging.error("TelegramError! " + str(e))
322        await context.bot.send_message(chat_id=update.effective_chat.id, text=l('error', context))
323        
324def _add_effect_handler(application: ApplicationBuilder, command: str, callback):
325    application.add_handler(CommandHandler(command, callback))
326    application.add_handler(MessageHandler(filters.Caption([f"/{command}"]), callback))
327
328async def keyboard_handler(update: Update, context: CallbackContext):
329    query = update.callback_query
330    data = query.data
331    
332    if data.startswith("reroll"):
333        amount = int(data.split(" ")[1])
334        
335        if amount <= 1:
336            return await spin(update, context)
337        return await autospin(context, update.effective_chat.id, amount)
338    
339    match data:
340        case "none":
341            return query.answer(l("none_callback", context))
342        case "set_lang_en":
343            lang = "en"
344            await _set_lang(update, context, lang)
345            return await query.answer(l("language_set", context).format(format_lang(lang)))
346        case "set_lang_it":
347            lang = "it"
348            await _set_lang(update, context, lang)
349            return await query.answer(l("language_set", context).format(format_lang(lang)))
350        case other:
351            logging.error(f"unknown callback: {data}")
352    
353    return await query.answer()
354
355def main():
356    
357    pers = PersistenceInput(bot_data = False, callback_data = False)
358    
359    application = ApplicationBuilder()
360    application.token(os.getenv("token"))
361    application.persistence(PicklePersistence(filepath='bot-data.pkl', store_data=pers))
362    
363    application = application.build()
364
365    
366    application.add_error_handler(error_callback)
367    application.add_handler(CallbackQueryHandler(callback=keyboard_handler))
368    
369    # commands
370    application.add_handler(CommandHandler('start', start))
371    application.add_handler(CommandHandler('lang', lang))
372    application.add_handler(CommandHandler('lewd', set_lewd))
373    application.add_handler(CommandHandler('caps', caps))
374    application.add_handler(CommandHandler('pic', pic))
375    
376    # effects
377    _add_effect_handler(application, 'ttbt', ttbt)
378    _add_effect_handler(application, 'tt', tt)
379    _add_effect_handler(application, 'bt', bt)
380    _add_effect_handler(application, 'splash', splash)
381    _add_effect_handler(application, 'wot', wot)
382    _add_effect_handler(application, 'text', text)
383    
384    # games
385    application.add_handler(CommandHandler('spin', spin))
386    application.add_handler(CommandHandler('bet', bet))
387    application.add_handler(CommandHandler('cash', cash))
388    
389    # fallback
390    application.add_handler(MessageHandler(filters.Command(), unknown))
391    application.run_polling()
392
393if __name__ ==  "__main__":
394    main()