all repos — python-meme-bot @ 06f4f2d1be1acacaaf84179a7a8ad0d2b78ef739

Telegram bot that uses PIL to perform light image processing.

main.py (view raw)

  1from PIL import Image
  2from Api import get_random_image
  3from Effects import img_to_bio, tt_bt_effect, bt_effect, splash_effect, wot_effect, text_effect
  4from Constants import get_localized_string as l, format_author, format_lang, langs, get_lang, lang_markup
  5from Slot import spin, autospin, bet, cash
  6
  7from dotenv import load_dotenv
  8load_dotenv()
  9import os, logging
 10logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
 11from io import BytesIO
 12
 13from telegram.error import TelegramError
 14from telegram.ext import ApplicationBuilder, Updater, CallbackContext, CallbackQueryHandler, CommandHandler, MessageHandler, PicklePersistence, filters, PersistenceInput
 15from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton
 16
 17def _get_message_content(message):
 18    
 19    image = None
 20    if len(message.photo) > 0:
 21        image = message.photo[-1].get_file()
 22        image = Image.open(BytesIO(image.download_as_bytearray()))
 23    
 24    content = ""
 25    if message.text is not None:
 26        content = message.text.strip()
 27    elif message.caption is not None:
 28        content = message.caption.strip()
 29        
 30    lines = content.split("\n")
 31    r = lines[0].split(" ")
 32    
 33    try:
 34        if r[0][0] == '/':
 35            r.pop(0)
 36    except IndexError:
 37        pass
 38        
 39    lines[0] = " ".join(r)
 40    content = "\n".join(lines)
 41    
 42    return image, content, _get_author(message)
 43
 44def _get_reply(message, fallback=""):
 45    
 46    if message is None:
 47        return None, fallback, None
 48    
 49    image, content, author = _get_message_content(message)
 50    
 51    return image, content, author
 52
 53def _get_lewd(context):
 54    
 55    try:
 56        return context.chat_data["lewd"]
 57    except KeyError:
 58        return False
 59
 60def _get_image(context):
 61    
 62    if context is not None:
 63        image, url = get_random_image(_get_lewd(context))
 64    
 65    if image is None:
 66        logging.warning("Getting Image failed")
 67        raise TelegramError("bad image")
 68    
 69    markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("sauce", context), url=url)]])
 70
 71    return image, markup
 72
 73def _get_all(update, check_fn, context):
 74    
 75    image_reply, text_reply, author_reply = _get_reply(update.message.reply_to_message)
 76    image_content, text_content, author_content = _get_message_content(update.message)
 77    
 78    info_struct = {
 79        "reply": {
 80            "author": author_reply,
 81            "text": text_reply,
 82            "image": image_reply
 83        },
 84        "content": {
 85            "author": author_content,
 86            "text": text_content,
 87            "image": image_content
 88        }
 89    }
 90        
 91    logging.info(f"User {update.message.from_user.full_name}{f' (@{update.message.from_user.username})' if update.message.from_user.username is not None else ''} typed: {str(update.message.text)}")
 92    
 93    content = check_fn(info_struct)
 94    
 95    if content is None:
 96        return None, None, None
 97
 98    markup = ""
 99    image = None
100    
101    if image_reply is not None:
102        image = image_reply
103    
104    if image_content is not None:
105        image = image_content
106    
107    if image is None:
108        image, markup = _get_image(context)
109    
110    return content, image, markup
111
async def start(update: Update, context: CallbackContext):
    """Send the localized welcome message to the chat.

    Declared ``async`` like the other handlers in this file so that the
    ``send_message`` coroutine is actually awaited instead of being created
    and discarded.
    """
    await context.bot.send_message(chat_id=update.effective_chat.id, text=l("welcome", context))
114
async def set_lewd(update: Update, context: CallbackContext):
    """Toggle the chat's ``"lewd"`` flag and announce the new state.

    A chat without a stored flag is treated as disabled, so the first call
    enables it. Returns the result of ``send_message``.
    """
    enabled = not context.chat_data.get("lewd", False)
    context.chat_data['lewd'] = enabled

    template = l("lewd_toggle", context)
    state = l("enabled", context) if enabled else l("disabled", context)

    return await context.bot.send_message(
        chat_id=update.effective_chat.id,
        text=template.format(state),
    )
126
async def pic(update: Update, context: CallbackContext):
    """Reply with a random image and its "sauce" button."""
    picture, sauce_markup = _get_image(context)
    return await update.message.reply_photo(
        photo=img_to_bio(picture),
        parse_mode="markdown",
        reply_markup=sauce_markup,
    )
131
132def _get_author(message):
133    
134    if message.forward_from is not None:
135        return format_author(message.forward_from)
136    
137    if message.forward_sender_name is not None:
138        return message.forward_sender_name
139    
140    if message.forward_from_chat is not None:
141        return message.forward_from_chat.title + ("" if message.forward_from_chat.username is None else f" ({message.forward_from_chat.username})")
142    
143    return format_author(message.from_user)
144
def tt_check(info):
    """Build single-line text from the reply text and the message text.

    The two texts are joined with a space and every newline is replaced by a
    space. Returns ``None`` when the result is blank.
    """
    flattened = f"{info['reply']['text']} {info['content']['text']}".replace("\n", " ")

    return None if flattened.strip() == "" else flattened
156
def ttbt_check(info):
    """Build two-part text (separated by a newline) for a top/bottom caption.

    When the message text already spans several lines it is used on its own;
    otherwise the stripped reply text and the stripped message text are
    joined by a newline. Returns ``None`` when the result is blank.
    """
    top = info['reply']['text'].strip()
    bottom = info['content']['text'].strip()

    combined = bottom if "\n" in bottom else f"{top}\n{bottom}"

    if combined.strip() == "":
        return None
    return combined
171
def splash_check(info):
    """Build ``"<author>\\n<text>"`` for the splash effect.

    The message's own text and author are used when that text is not blank;
    otherwise the reply's text and author are used. Returns ``None`` when the
    stripped result has fewer than two lines (i.e. there is no text).
    """
    reply_text = info['reply']['text']
    own_text = info['content']['text']

    if own_text.strip() != "":
        section, body = 'content', own_text
    else:
        section, body = 'reply', reply_text

    candidate = f"{info[section]['author']}\n{body}"

    if len(candidate.strip().split("\n")) >= 2:
        return candidate
    return None
188
def wot_check(info):
    """Join the reply text and the message text with a newline.

    Returns ``None`` when the joined text is blank.
    """
    joined = f"{info['reply']['text']}\n{info['content']['text']}"

    return None if joined.strip() == "" else joined
200
async def ttbt(update: Update, context: CallbackContext):
    """Handler: render text from ``ttbt_check`` with ``tt_bt_effect``.

    Replies with the "no_caption" string when ``_get_all`` yields no image,
    with the "failed_effect" string when the effect returns ``None``, and
    with the rendered photo otherwise.
    """
    message = update.message
    caption, picture, markup = _get_all(update, ttbt_check, context)

    if picture is None:
        return await message.reply_text(l("no_caption", context))

    rendered = tt_bt_effect(caption, picture)
    if rendered is None:
        return await message.reply_text(l("failed_effect", context))

    return await message.reply_photo(photo=rendered, reply_markup=markup)
214
215
async def tt(update: Update, context: CallbackContext):
    """Handler: render text from ``tt_check`` with ``tt_bt_effect``.

    Replies with the "no_caption" string when ``_get_all`` yields no image,
    with the "failed_effect" string when the effect returns ``None``, and
    with the rendered photo otherwise.
    """
    message = update.message
    caption, picture, markup = _get_all(update, tt_check, context)

    if picture is None:
        return await message.reply_text(l("no_caption", context))

    rendered = tt_bt_effect(caption, picture)
    if rendered is None:
        return await message.reply_text(l("failed_effect", context))

    return await message.reply_photo(photo=rendered, reply_markup=markup)
229    
async def bt(update: Update, context: CallbackContext):
    """Handler: render text from ``tt_check`` with ``bt_effect``.

    Replies with the "no_caption" string when ``_get_all`` yields no image,
    with the "failed_effect" string when the effect returns ``None``, and
    with the rendered photo otherwise.
    """
    message = update.message
    caption, picture, markup = _get_all(update, tt_check, context)

    if picture is None:
        return await message.reply_text(l("no_caption", context))

    rendered = bt_effect(caption, picture)
    if rendered is None:
        return await message.reply_text(l("failed_effect", context))

    return await message.reply_photo(photo=rendered, reply_markup=markup)
243
async def splash(update: Update, context: CallbackContext):
    """Handler: render text from ``splash_check`` with ``splash_effect``.

    Replies with the "no_caption" string when ``_get_all`` yields no image,
    with the "failed_effect" string when the effect returns ``None``, and
    with the rendered photo otherwise.
    """
    message = update.message
    caption, picture, markup = _get_all(update, splash_check, context)

    if picture is None:
        return await message.reply_text(l("no_caption", context))

    rendered = splash_effect(caption, picture)
    if rendered is None:
        return await message.reply_text(l("failed_effect", context))

    return await message.reply_photo(photo=rendered, reply_markup=markup)
257    
async def wot(update: Update, context: CallbackContext):
    """Handler: render text from ``wot_check`` with ``wot_effect``.

    Replies with the "no_caption" string when ``_get_all`` yields no image,
    with the "failed_effect" string when the effect returns ``None``, and
    with the rendered photo otherwise.
    """
    content, image, markup = _get_all(update, wot_check, context)

    if image is None:
        return await update.message.reply_text(l("no_caption", context))

    image = wot_effect(content, image)

    if image is None:
        # Stop here: without this return the handler went on to call
        # reply_photo(photo=None) after reporting the failure.
        return await update.message.reply_text(l("failed_effect", context))

    return await update.message.reply_photo(photo=image, reply_markup=markup)
271    
async def text(update: Update, context: CallbackContext):
    """Handler: render text from ``wot_check`` with ``text_effect``.

    Replies with the "no_caption" string when ``_get_all`` yields no image,
    with the "failed_effect" string when the effect returns ``None``, and
    with the rendered photo otherwise.
    """
    content, image, markup = _get_all(update, wot_check, context)

    if image is None:
        await update.message.reply_text(l("no_caption", context))
        return

    image = text_effect(content, image)

    if image is None:
        # Stop here: without this return the handler went on to call
        # reply_photo(photo=None) after reporting the failure.
        await update.message.reply_text(l("failed_effect", context))
        return

    await update.message.reply_photo(photo=image, reply_markup=markup)
286
async def caps(update: Update, context: CallbackContext):
    """Send back upper-cased text.

    The text is taken from the replied-to message when there is one;
    otherwise the command arguments joined by spaces are used.
    """
    replied = update.message.reply_to_message
    typed = ' '.join(context.args)
    _, source_text, _ = _get_reply(replied, typed)

    await context.bot.send_message(
        chat_id=update.effective_chat.id,
        text=source_text.upper(),
    )
291
async def _set_lang(update: Update, context: CallbackContext, lang: str):
    """Store ``lang`` as the chat's language and send a confirmation.

    The chat data is updated before the confirmation string is looked up.
    """
    context.chat_data["lang"] = lang

    confirmation = l("language_set", context).format(format_lang(lang))
    await context.bot.send_message(chat_id=update.effective_chat.id, text=confirmation)
295
async def lang(update: Update, context: CallbackContext):
    """Show or change the chat language.

    Without an argument the current language and the available choices are
    shown together with ``lang_markup``. With an argument the language is
    changed when the argument is listed in ``langs``; otherwise the
    "invalid_language" string is sent.
    """
    requested = None
    try:
        requested = str(context.args[0])
    except IndexError:
        pass

    if requested is not None:
        if requested in langs:
            return await _set_lang(update, context, requested)
        return await update.message.reply_text(text=l("invalid_language", context))

    current = format_lang(get_lang(context))
    choices = ", ".join(langs) + "."
    overview = l("current_language", context).format(current, choices)
    return await update.message.reply_text(text=overview, reply_markup=lang_markup)
311    
async def unknown(update: Update, context: CallbackContext):
    """Log a command that no handler recognised; nothing is sent to the chat.

    Declared ``async`` like the other handlers in this file, since the
    application awaits handler callbacks.
    """
    logging.info(f"User {update.message.from_user.full_name} sent {update.message.text_markdown_v2} and I don't know what that means.")
314    
async def error_callback(update: Update, context: CallbackContext):
    """Log Telegram errors and, when a chat is known, tell it about the error.

    Errors that are not ``TelegramError`` are re-raised unchanged.

    The error may not be tied to an update (``update`` can be ``None`` or a
    non-``Update`` object), in which case there is no chat to notify and the
    error is only logged. Previously this case raised ``AttributeError``
    from inside the error handler.
    """
    error = context.error
    if not isinstance(error, TelegramError):
        raise error

    logging.error("TelegramError! " + str(error))

    chat = update.effective_chat if isinstance(update, Update) else None
    if chat is None:
        return

    await context.bot.send_message(chat_id=chat.id, text=l('error', context))
321        
def _add_effect_handler(application: ApplicationBuilder, command: str, callback):
    """Register ``callback`` for ``/<command>`` as a command and as a caption.

    Two handlers are added: a ``CommandHandler`` for the command itself and a
    ``MessageHandler`` filtered on the caption ``"/<command>"``.
    """
    # NOTE(review): filters.Caption with a list looks like an exact-match
    # filter, so a caption such as "/cmd some text" may not trigger it; confirm.
    handlers = (
        CommandHandler(command, callback),
        MessageHandler(filters.Caption([f"/{command}"]), callback),
    )
    for handler in handlers:
        application.add_handler(handler)
325
326async def keyboard_handler(update: Update, context: CallbackContext):
327    query = update.callback_query
328    data = query.data
329    
330    if data.startswith("reroll"):
331        amount = int(data.split(" ")[1])
332        
333        if amount <= 1:
334            return await spin(update, context)
335        return await autospin(context, update.effective_chat.id, amount)
336    
337    match data:
338        case "none":
339            return query.answer(l("none_callback", context))
340        case "set_lang_en":
341            lang = "en"
342            await _set_lang(update, context, lang)
343            return await query.answer(l("language_set", context).format(format_lang(lang)))
344        case "set_lang_it":
345            lang = "it"
346            await _set_lang(update, context, lang)
347            return await query.answer(l("language_set", context).format(format_lang(lang)))
348        case other:
349            logging.error(f"unknown callback: {data}")
350    
351    return await query.answer()
352
def main():
    """Build the bot application, register every handler and start polling.

    The token is read from the ``token`` environment variable and chat/user
    data is persisted to ``bot-data.pkl`` (bot data and callback data are
    excluded from persistence).
    """
    stored = PersistenceInput(bot_data = False, callback_data = False)

    builder = ApplicationBuilder()
    builder.token(os.getenv("token"))
    builder.persistence(PicklePersistence(filepath='bot-data.pkl', store_data=stored))
    application = builder.build()

    application.add_error_handler(error_callback)
    application.add_handler(CallbackQueryHandler(callback=keyboard_handler))

    # commands
    plain_commands = (
        ('start', start),
        ('lang', lang),
        ('lewd', set_lewd),
        ('caps', caps),
        ('pic', pic),
    )
    for name, callback in plain_commands:
        application.add_handler(CommandHandler(name, callback))

    # effects
    effects = (
        ('ttbt', ttbt),
        ('tt', tt),
        ('bt', bt),
        ('splash', splash),
        ('wot', wot),
        ('text', text),
    )
    for name, callback in effects:
        _add_effect_handler(application, name, callback)

    # games
    games = (
        ('spin', spin),
        ('bet', bet),
        ('cash', cash),
    )
    for name, callback in games:
        application.add_handler(CommandHandler(name, callback))

    # fallback: registered last, presumably so it only sees commands that no
    # earlier handler matched
    application.add_handler(MessageHandler(filters.Command(), unknown))
    application.run_polling()
390
# Start the bot only when this file is executed as a script.
if __name__ == "__main__":
    main()