all repos — python-meme-bot @ b12530d195c1ac7ff3914ad87b62b488f6bad5b3

Telegram Bot that uses PIL to compute light image processing.

code refactor
Marco Andronaco andronacomarco@gmail.com
Mon, 20 Mar 2023 22:13:35 +0100
commit

b12530d195c1ac7ff3914ad87b62b488f6bad5b3

parent

f51476814de89c368f723295be3bb3b623cf907e

4 files changed, 179 insertions(+), 151 deletions(-)

jump to
M Api.pyApi.py

@@ -1,25 +1,26 @@

- -import requests, logging -from PIL import Image, UnidentifiedImageError +import logging +import requests from io import BytesIO +from PIL import Image, UnidentifiedImageError from anime_api.apis import WaifuPicsAPI from anime_api.apis.waifu_pics.types import ImageCategory max_tries = 5 -supported_file_types = [ ".jpg", ".jpeg", ".png" ] +supported_file_types = [".jpg", ".jpeg", ".png"] api = WaifuPicsAPI() + def _valid_extension(fname: str): for t in supported_file_types: if fname.lower().endswith(t): return True return False + def get_random_image(nsfw=False): - cat = ImageCategory.NSFW.WAIFU if nsfw else ImageCategory.SFW.WAIFU - + count = 0 while count < max_tries: try:

@@ -28,14 +29,14 @@ if not _valid_extension(img.url):

raise Exception r = requests.get(img.url) image = Image.open(BytesIO(r.content)) - + return image, img.url - + except (KeyError, IndexError, Exception): logging.warning("Can't display image.") except UnidentifiedImageError: logging.warning("Unidentified image: " + img.url) - + count += 1 logging.warning(f"Try #{count} failed.\n") logging.error(f"Reached {count} tries. Giving up.")
M Slot.pySlot.py

@@ -1,15 +1,15 @@

-from telegram import Update, Dice, InlineKeyboardMarkup, InlineKeyboardButton +from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton from telegram.ext import CallbackContext from datetime import date from Constants import get_localized_string as l import Constants as c -import time autospin_cap = 20 lastreset_default = date(1970, 1, 1) cash_default = 5000 bet_default = 50 slot_emoji = '🎰' + def read_arg(context: CallbackContext, default=None, cast=int): try:

@@ -17,57 +17,66 @@ return cast(context.args[0].replace(",", ".").replace("€", ""))

except (TypeError, IndexError, ValueError): return default -def set_user_value(context: CallbackContext, key:str, amount): + +def set_user_value(context: CallbackContext, key: str, amount): context.user_data[key] = amount return amount -def get_user_value(context: CallbackContext, key:str, default): + +def get_user_value(context: CallbackContext, key: str, default): try: return context.user_data[key] except KeyError: - #print(f"set {key} to {str(default)}") + # print(f"set {key} to {str(default)}") return set_user_value(context, key, default) + def get_cash(context: CallbackContext): return get_user_value(context, "cash", cash_default) + def set_cash(context: CallbackContext, amount: int): return set_user_value(context, "cash", amount) + def get_bet(context: CallbackContext): return get_user_value(context, "bet", bet_default) + def set_lastreset(context: CallbackContext, amount: int): return set_user_value(context, "lastreset", amount) + def get_lastreset(context: CallbackContext): return get_user_value(context, "lastreset", lastreset_default) - + + def set_bet(context: CallbackContext, amount: int): return set_user_value(context, "bet", max(0, amount)) -async def _spin(context: CallbackContext, id: float, delay=True): +async def _spin(context: CallbackContext, id: float, delay=True): bet = get_bet(context) cash = get_cash(context) - + if cash < bet: await context.bot.send_message(chat_id=id, text=l("not_enough_cash", context)) return None - + cash = set_cash(context, cash - bet) - + message = await context.bot.send_dice(chat_id=id, emoji=slot_emoji, disable_notification=True) - + multiplier = get_multiplier(message.dice.value) - + win = bet * multiplier - + cash = set_cash(context, cash + win) - + if delay: - markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("reroll", context).format(bet / 100), callback_data="reroll 1")]]) - + markup = InlineKeyboardMarkup( + [[InlineKeyboardButton(text=l("reroll", context).format(bet / 100), callback_data="reroll 1")]]) + text = l("you_lost", context) if multiplier == 0 else l("you_won", context).format(win / 100) if bet != 0: text += l("cash_result", context).format(cash / 100)

@@ -79,89 +88,93 @@ "reply_markup": markup

} context.job_queue.run_once(show_result, 2, data=args, name=str(id)) else: - message.edit_reply_markup(InlineKeyboardMarkup([[InlineKeyboardButton(text=l("fast_output", context).format(win / 100), callback_data="none")]])) + message.edit_reply_markup(InlineKeyboardMarkup( + [[InlineKeyboardButton(text=l("fast_output", context).format(win / 100), callback_data="none")]])) return win + async def show_result(context: CallbackContext): con = context.job.data - return await context.bot.send_message(chat_id=con["chat_id"], text=con["text"], reply_markup=con["reply_markup"], disable_notification=True) - + return await context.bot.send_message(chat_id=con["chat_id"], text=con["text"], reply_markup=con["reply_markup"], + disable_notification=True) + + async def spin(update: Update, context: CallbackContext): - bet = get_bet(context) / 100 - + amount = read_arg(context, 1) - + if amount > 1 and update.effective_chat.type != 'private': amount = 1 await context.bot.send_message(chat_id=update.effective_chat.id, text=l("no_autospin", context)) - + if amount == 1: return await _spin(context=context, id=update.effective_chat.id) else: return await autospin(context=context, id=update.effective_chat.id, amount=amount) - + + async def autospin(context: CallbackContext, id: int, amount: int): - bet = get_bet(context) / 100 count = 0 total_win = 0 amount = max(2, min(autospin_cap, amount)) - + for i in range(amount): - + win = await _spin(context=context, id=id, delay=False) - + if win is None: break - + count += 1 total_win += win - + result = l("summary", context).format(count * bet, total_win / 100) - markup = InlineKeyboardMarkup([[InlineKeyboardButton(text="Altri {} spin (-{}€)".format(amount, bet * amount), callback_data=f"reroll {amount}")]]) + markup = InlineKeyboardMarkup([[InlineKeyboardButton(text="Altri {} spin (-{}€)".format(amount, bet * amount), + callback_data=f"reroll {amount}")]]) return await context.bot.send_message(chat_id=id, text=result, reply_markup=markup, disable_notification=False) + async def bet(update: Update, context: CallbackContext): - amount = read_arg(context=context, cast=float) - + if amount is not None: bet = set_bet(context, int(amount * 100)) else: bet = get_bet(context) - + result = l("current_bet", context).format(c.format_author(update.effective_user), bet / 100) return await context.bot.send_message(chat_id=update.effective_chat.id, text=result) - + + def cash(update: Update, context: CallbackContext): - cash = get_cash(context) / 100 - + if cash < cash_default / 200: lastreset = get_lastreset(context) today = date.today() - + if lastreset < today: set_lastreset(context, today) cash = set_cash(context, cash_default) - + result = l("cash_reset", context).format(c.format_author(update.effective_user), cash / 100) return update.message.reply_text(text=result) else: - return update.message.reply_text(text=l("cash_reset_fail", context).format(c.format_author(update.effective_user), cash)) - - + return update.message.reply_text( + text=l("cash_reset_fail", context).format(c.format_author(update.effective_user), cash)) + result = l("current_cash", context).format(c.format_author(update.effective_user), cash) return update.message.reply_text(text=result) - + + def get_multiplier(value: int): - try: values = c.slot_machine_value[value] except IndexError: values = c.slot_machine_value[5] - + try: return c.win_table[values] except KeyError:
M main.pymain.py

@@ -1,82 +1,85 @@

+import os +import logging +from io import BytesIO + from PIL import Image +from dotenv import load_dotenv +from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton +from telegram.error import TelegramError +from telegram.ext import ApplicationBuilder, CallbackContext, CallbackQueryHandler, CommandHandler, MessageHandler, \ + PicklePersistence, filters, PersistenceInput + from Api import get_random_image -from Effects import img_to_bio, tt_bt_effect, bt_effect, splash_effect, wot_effect, text_effect from Constants import get_localized_string as l, format_author, format_lang, langs, get_lang, lang_markup +from Effects import img_to_bio, tt_bt_effect, bt_effect, splash_effect, wot_effect, text_effect from Slot import spin, autospin, bet, cash -from dotenv import load_dotenv load_dotenv() -import os, logging logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO) -from io import BytesIO -from telegram.error import TelegramError -from telegram.ext import ApplicationBuilder, Updater, CallbackContext, CallbackQueryHandler, CommandHandler, MessageHandler, PicklePersistence, filters, PersistenceInput -from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton async def _get_message_content(message): - image = None if len(message.photo) > 0: p = message.photo[-1] i = await p.get_file() d = await i.download_as_bytearray() image = Image.open(BytesIO(d)) - + content = "" if message.text is not None: content = message.text.strip() elif message.caption is not None: content = message.caption.strip() - + lines = content.split("\n") r = lines[0].split(" ") - + try: if r[0][0] == '/': r.pop(0) except IndexError: pass - + lines[0] = " ".join(r) content = "\n".join(lines) - + return image, content, _get_author(message) + async def _get_reply(message, fallback=""): - if message is None: return None, fallback, None - + image, content, author = await _get_message_content(message) - + return image, content, author + def _get_lewd(context): - try: return context.chat_data["lewd"] except KeyError: return False + def _get_image(context): - if context is not None: image, url = get_random_image(_get_lewd(context)) - + if image is None: logging.warning("Getting Image failed") raise TelegramError("bad image") - + markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("sauce", context), url=url)]]) return image, markup + async def _get_all(update, check_fn, context): - image_reply, text_reply, author_reply = await _get_reply(update.message.reply_to_message) image_content, text_content, author_content = await _get_message_content(update.message) - + info_struct = { "reply": { "author": author_reply,

@@ -89,93 +92,96 @@ "text": text_content,

"image": image_content } } - - logging.info(f"User {update.message.from_user.full_name}{f' (@{update.message.from_user.username})' if update.message.from_user.username is not None else ''} typed: {str(update.message.text)}") - + + logging.info( + f"User {update.message.from_user.full_name}{f' (@{update.message.from_user.username})' if update.message.from_user.username is not None else ''} typed: {str(update.message.text)}") + content = check_fn(info_struct) - + if content is None: return None, None, None markup = "" image = None - + if image_reply is not None: image = image_reply - + if image_content is not None: image = image_content - + if image is None: image, markup = _get_image(context) - + return content, image, markup + def start(update: Update, context: CallbackContext): context.bot.send_message(chat_id=update.effective_chat.id, text=l("welcome", context)) + async def set_lewd(update: Update, context: CallbackContext): - try: output = False if context.chat_data["lewd"] else True except KeyError: output = True - + context.chat_data['lewd'] = output message = l("lewd_toggle", context).format(l("enabled", context) if output else l("disabled", context)) - + return await context.bot.send_message(chat_id=update.effective_chat.id, text=message) + async def pic(update: Update, context: CallbackContext): - image, markup = _get_image(context) return await update.message.reply_photo(photo=img_to_bio(image), parse_mode="markdown", reply_markup=markup) + def _get_author(message): - if message.forward_from is not None: return format_author(message.forward_from) - + if message.forward_sender_name is not None: return message.forward_sender_name - + if message.forward_from_chat is not None: - return message.forward_from_chat.title + ("" if message.forward_from_chat.username is None else f" ({message.forward_from_chat.username})") - + return message.forward_from_chat.title + ( + "" if message.forward_from_chat.username is None else f" ({message.forward_from_chat.username})") + return format_author(message.from_user) + def tt_check(info): - reply = info['reply']['text'] content = info['content']['text'] - + input_text = f"{reply} {content}".replace("\n", " ") - + if input_text.strip() == "": return None return input_text + def ttbt_check(info): - reply = info['reply']['text'].strip() content = info['content']['text'].strip() - + if len(content.split("\n")) > 1: input_text = content else: input_text = f"{reply}\n{content}" - + if input_text.strip() == "": return None return input_text + def splash_check(info): - reply = info['reply']['text'] content = info['content']['text'] - + if content.strip() == "": author = info['reply']['author'] input_text = f"{author}\n{reply}"

@@ -188,11 +194,11 @@ return None

return input_text + def wot_check(info): - reply = info['reply']['text'] content = info['content']['text'] - + input_text = f"{reply}\n{content}" if input_text.strip() == "":

@@ -200,13 +206,13 @@ return None

return input_text + async def ttbt(update: Update, context: CallbackContext): - content, image, markup = await _get_all(update, ttbt_check, context) - + if image is None: return await update.message.reply_text(l("no_caption", context)) - + image = tt_bt_effect(content, image) if image is None:

@@ -216,23 +222,22 @@ return await update.message.reply_photo(photo=image, reply_markup=markup)

async def tt(update: Update, context: CallbackContext): - content, image, markup = await _get_all(update, tt_check, context) - + if image is None: return await update.message.reply_text(l("no_caption", context)) - + image = tt_bt_effect(content, image) if image is None: return await update.message.reply_text(l("failed_effect", context)) return await update.message.reply_photo(photo=image, reply_markup=markup) - + + async def bt(update: Update, context: CallbackContext): - content, image, markup = await _get_all(update, tt_check, context) - + if image is None: return await update.message.reply_text(l("no_caption", context))

@@ -243,99 +248,108 @@ return await update.message.reply_text(l("failed_effect", context))

return await update.message.reply_photo(photo=image, reply_markup=markup) + async def splash(update: Update, context: CallbackContext): - content, image, markup = await _get_all(update, splash_check, context) - + if image is None: return await update.message.reply_text(l("no_caption", context)) - + image = splash_effect(content, image) if image is None: return await update.message.reply_text(l("failed_effect", context)) - + return await update.message.reply_photo(photo=image, reply_markup=markup) - + + async def wot(update: Update, context: CallbackContext): - content, image, markup = await _get_all(update, wot_check, context) - + if image is None: return await update.message.reply_text(l("no_caption", context)) - + image = wot_effect(content, image) if image is None: await update.message.reply_text(l("failed_effect", context)) - + await update.message.reply_photo(photo=image, reply_markup=markup) - + + async def text(update: Update, context: CallbackContext): - content, image, markup = await _get_all(update, wot_check, context) - + if image is None: await update.message.reply_text(l("no_caption", context)) return - + image = text_effect(content, image) if image is None: await update.message.reply_text(l("failed_effect", context)) - + await update.message.reply_photo(photo=image, reply_markup=markup) + async def caps(update: Update, context: CallbackContext): - _, reply, _ = await _get_reply(update.message.reply_to_message, ' '.join(context.args)) await context.bot.send_message(chat_id=update.effective_chat.id, text=reply.upper()) + async def _set_lang(update: Update, context: CallbackContext, lang: str): context.chat_data["lang"] = lang - await context.bot.send_message(chat_id=update.effective_chat.id, text=l("language_set", context).format(format_lang(lang))) + await context.bot.send_message(chat_id=update.effective_chat.id, + text=l("language_set", context).format(format_lang(lang))) + async def lang(update: Update, context: CallbackContext): try: selected = str(context.args[0]) except IndexError: selected = None - + if selected is None: lang = format_lang(get_lang(context)) choices = ", ".join(langs) + "." - return await update.message.reply_text(text=l("current_language", context).format(lang, choices), reply_markup=lang_markup) - + return await update.message.reply_text(text=l("current_language", context).format(lang, choices), + reply_markup=lang_markup) + if selected not in langs: return await update.message.reply_text(text=l("invalid_language", context)) - + return await _set_lang(update, context, selected) - + + def unknown(update: Update, context: CallbackContext): - logging.info(f"User {update.message.from_user.full_name} sent {update.message.text_markdown_v2} and I don't know what that means.") - + logging.info( + f"User {update.message.from_user.full_name} sent {update.message.text_markdown_v2} and I don't know what that means.") + + async def error_callback(update: Update, context: CallbackContext): try: raise context.error except TelegramError as e: logging.error("TelegramError! " + str(e)) await context.bot.send_message(chat_id=update.effective_chat.id, text=l('error', context)) - + + def _add_effect_handler(application: ApplicationBuilder, command: str, callback): application.add_handler(CommandHandler(command, callback)) application.add_handler(MessageHandler(filters.Caption([f"/{command}"]), callback)) + async def keyboard_handler(update: Update, context: CallbackContext): query = update.callback_query data = query.data - + if data.startswith("reroll"): amount = int(data.split(" ")[1]) - + if amount <= 1: return await spin(update, context) return await autospin(context, update.effective_chat.id, amount) - + match data: case "none": return query.answer(l("none_callback", context))

@@ -349,30 +363,29 @@ await _set_lang(update, context, lang)

return await query.answer(l("language_set", context).format(format_lang(lang))) case other: logging.error(f"unknown callback: {data}") - + return await query.answer() + def main(): - - pers = PersistenceInput(bot_data = False, callback_data = False) - + pers = PersistenceInput(bot_data=False, callback_data=False) + application = ApplicationBuilder() application.token(os.getenv("token")) application.persistence(PicklePersistence(filepath='bot-data.pkl', store_data=pers)) - + application = application.build() - application.add_error_handler(error_callback) application.add_handler(CallbackQueryHandler(callback=keyboard_handler)) - + # commands application.add_handler(CommandHandler('start', start)) application.add_handler(CommandHandler('lang', lang)) application.add_handler(CommandHandler('lewd', set_lewd)) application.add_handler(CommandHandler('caps', caps)) application.add_handler(CommandHandler('pic', pic)) - + # effects _add_effect_handler(application, 'ttbt', ttbt) _add_effect_handler(application, 'tt', tt)

@@ -380,15 +393,16 @@ _add_effect_handler(application, 'bt', bt)

_add_effect_handler(application, 'splash', splash) _add_effect_handler(application, 'wot', wot) _add_effect_handler(application, 'text', text) - + # games application.add_handler(CommandHandler('spin', spin)) application.add_handler(CommandHandler('bet', bet)) application.add_handler(CommandHandler('cash', cash)) - + # fallback application.add_handler(MessageHandler(filters.Command(), unknown)) application.run_polling() -if __name__ == "__main__": + +if __name__ == "__main__": main()
M requirements.txtrequirements.txt

@@ -1,5 +1,5 @@

anime-api==0.15.4 Pillow==9.4.0 -python-dotenv==0.21.1 +python-dotenv==1.0.0 python-telegram-bot[job-queue]==20.1 requests==2.28.2