all repos — python-meme-bot @ 881781f58f4949319ef6afcbd6421c43e8a6ae24

Telegram Bot that uses PIL to compute light image processing.

major code refactor
BiRabittoh andronacomarco@gmail.com
Fri, 31 May 2024 11:24:11 +0200
commit

881781f58f4949319ef6afcbd6421c43e8a6ae24

parent

9ab1c46da6bfcb2c0526266cea0e9f267ef0c40f

M python_meme_bot/bot.pypython_meme_bot/bot.py

@@ -1,126 +1,44 @@

-import os -import logging -from io import BytesIO +from functools import partial +import os, logging + -from PIL import Image from dotenv import load_dotenv -from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton, Message -from telegram.error import TelegramError +from telegram import Update from telegram.ext import ApplicationBuilder, CallbackQueryHandler, CommandHandler, MessageHandler, \ PicklePersistence, filters, PersistenceInput, ContextTypes +from telegram.error import TelegramError -from .api import get_random_image -from .constants import format_chat, get_localized_string as l, format_author, format_lang, langs, get_lang, lang_markup -from .effects import img_to_bio, tt_bt_effect, bt_effect, splash_effect, wot_effect, text_effect +from .utils import get_all, get_image, get_message_content +from .effects.functions import img_to_bio from .slot import spin, autospin, bet, cash - -load_dotenv() -logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO) - - -async def _get_message_content(message): - image = None - if len(message.photo) > 0: - p = message.photo[-1] - i = await p.get_file() - d = await i.download_as_bytearray() - image = Image.open(BytesIO(d)) +from .effects import effectsDict +from .localization import get_localized_string as l, format_lang, get_lang, langs, lang_markup - content = "" - if message.text is not None: - content = message.text.strip() - elif message.caption is not None: - content = message.caption.strip() - - lines = content.split("\n") - r = lines[0].split(" ") - +async def effect_handler(update: Update, context: ContextTypes.DEFAULT_TYPE, effect_name: str): try: - if r[0][0] == '/': - r.pop(0) - except IndexError: - pass - - lines[0] = " ".join(r) - content = "\n".join(lines) - - return image, content, _get_author(message) - - -async def _get_reply(message, fallback=""): - if message is None: - return None, fallback, None - - image, content, author = await _get_message_content(message) - - return image, content, author - - -def _get_lewd(context): - try: - return context.chat_data["lewd"] + effect_check, effect_fn = effectsDict[effect_name] except KeyError: - return False - - -def _get_image(context): - if context is not None: - image, url = get_random_image(_get_lewd(context)) + raise TelegramError("effect not supported: " + effect_name) + + content, image, markup = await get_all(update, effect_check, context) if image is None: - logging.warning("Getting Image failed") - raise TelegramError("bad image") - - markup = InlineKeyboardMarkup([[InlineKeyboardButton(text=l("sauce", context), url=url)]]) - - return image, markup - - -async def _get_all(update, check_fn, context): - image_reply, text_reply, author_reply = await _get_reply(update.message.reply_to_message) - image_content, text_content, author_content = await _get_message_content(update.message) - - info_struct = { - "reply": { - "author": author_reply, - "text": text_reply, - "image": image_reply - }, - "content": { - "author": author_content, - "text": text_content, - "image": image_content - } - } - - logging.info( - f"User {update.message.from_user.full_name}{f' (@{update.message.from_user.username})' if update.message.from_user.username is not None else ''} typed: {str(update.message.text)}") - - content = check_fn(info_struct) - - if content is None: - return None, None, None + await update.message.reply_text(l("no_caption", context)) + return - markup = "" - image = None - - if image_reply is not None: - image = image_reply - - if image_content is not None: - image = image_content + image = effect_fn(content, image) if image is None: - image, markup = _get_image(context) - - return content, image, markup + await update.message.reply_text(l("failed_effect", context)) + return + await update.message.reply_photo(photo=image, reply_markup=markup) async def start(update: Update, context: ContextTypes.DEFAULT_TYPE): await update.message.reply_text(l("welcome", context)) -async def set_lewd(update: Update, context: ContextTypes.DEFAULT_TYPE): +async def lewd(update: Update, context: ContextTypes.DEFAULT_TYPE): try: output = False if context.chat_data["lewd"] else True except KeyError:

@@ -128,186 +46,16 @@ output = True

context.chat_data['lewd'] = output message = l("lewd_toggle", context).format(l("enabled", context) if output else l("disabled", context)) - return await update.message.reply_text(message) + await update.message.reply_text(message) async def pic(update: Update, context: ContextTypes.DEFAULT_TYPE): - image, markup = _get_image(context) - return await update.message.reply_photo(photo=img_to_bio(image), parse_mode="markdown", reply_markup=markup) - - -def _get_author(message: Message): - origin = message.forward_origin - - if origin is None: # message was not forwarded - return format_author(message.from_user) - - try: - return format_author(origin['sender_user']) # MessageOriginUser - except KeyError: - pass - - try: - return origin['sender_user_name'] # MessageOriginHiddenUser - except KeyError: - pass - - try: - return format_chat(origin['sender_chat']) # MessageOriginChat - except KeyError: - pass - try: - return format_chat(origin['chat']) # MessageOriginChannel - except KeyError: - pass - - logging.warn("Message was forwarded but I couldn't detect the original author.") - return format_author(message.from_user) - - -def tt_check(info): - reply = info['reply']['text'] - content = info['content']['text'] - - input_text = f"{reply} {content}".replace("\n", " ") - - if input_text.strip() == "": - return None - - return input_text - - -def ttbt_check(info): - reply = info['reply']['text'].strip() - content = info['content']['text'].strip() - - if len(content.split("\n")) > 1: - input_text = content - else: - input_text = f"{reply}\n{content}" - - if input_text.strip() == "": - return None - - return input_text - - -def splash_check(info): - reply = info['reply']['text'] - content = info['content']['text'] - - if content.strip() == "": - author = info['reply']['author'] - input_text = f"{author}\n{reply}" - else: - author = info['content']['author'] - input_text = f"{author}\n{content}" - - if len(input_text.strip().split("\n")) < 2: - return None - - return input_text - - -def wot_check(info): - reply = info['reply']['text'] - content = info['content']['text'] - - input_text = f"{reply}\n{content}" - - if input_text.strip() == "": - return None - - return input_text - - -async def ttbt(update: Update, context: ContextTypes.DEFAULT_TYPE): - content, image, markup = await _get_all(update, ttbt_check, context) - - if image is None: - return await update.message.reply_text(l("no_caption", context)) - - image = tt_bt_effect(content, image) - - if image is None: - return await update.message.reply_text(l("failed_effect", context)) - - return await update.message.reply_photo(photo=image, reply_markup=markup) - - -async def tt(update: Update, context: ContextTypes.DEFAULT_TYPE): - content, image, markup = await _get_all(update, tt_check, context) - - if image is None: - return await update.message.reply_text(l("no_caption", context)) - - image = tt_bt_effect(content, image) - - if image is None: - return await update.message.reply_text(l("failed_effect", context)) - - return await update.message.reply_photo(photo=image, reply_markup=markup) - - -async def bt(update: Update, context: ContextTypes.DEFAULT_TYPE): - content, image, markup = await _get_all(update, tt_check, context) - - if image is None: - return await update.message.reply_text(l("no_caption", context)) - - image = bt_effect(content, image) - - if image is None: - return await update.message.reply_text(l("failed_effect", context)) - - return await update.message.reply_photo(photo=image, reply_markup=markup) - - -async def splash(update: Update, context: ContextTypes.DEFAULT_TYPE): - content, image, markup = await _get_all(update, splash_check, context) - - if image is None: - return await update.message.reply_text(l("no_caption", context)) - - image = splash_effect(content, image) - - if image is None: - return await update.message.reply_text(l("failed_effect", context)) - - return await update.message.reply_photo(photo=image, reply_markup=markup) - - -async def wot(update: Update, context: ContextTypes.DEFAULT_TYPE): - content, image, markup = await _get_all(update, wot_check, context) - - if image is None: - return await update.message.reply_text(l("no_caption", context)) - - image = wot_effect(content, image) - - if image is None: - await update.message.reply_text(l("failed_effect", context)) - - await update.message.reply_photo(photo=image, reply_markup=markup) - - -async def text(update: Update, context: ContextTypes.DEFAULT_TYPE): - content, image, markup = await _get_all(update, wot_check, context) - - if image is None: - await update.message.reply_text(l("no_caption", context)) - return - - image = text_effect(content, image) - - if image is None: - await update.message.reply_text(l("failed_effect", context)) - - await update.message.reply_photo(photo=image, reply_markup=markup) + image, markup = get_image(context) + await update.message.reply_photo(photo=img_to_bio(image), parse_mode="markdown", reply_markup=markup) async def caps(update: Update, context: ContextTypes.DEFAULT_TYPE): - _, reply, _ = await _get_reply(update.message.reply_to_message, ' '.join(context.args)) + _, reply, _ = await get_message_content(update.message.reply_to_message, ' '.join(context.args)) await update.message.reply_text(reply.upper())

@@ -326,8 +74,8 @@

if selected is None: lang = format_lang(get_lang(context)) choices = ", ".join(langs) + "." - return await update.message.reply_text(text=l("current_language", context).format(lang, choices), - reply_markup=lang_markup) + text = l("current_language", context).format(lang, choices) + return await update.message.reply_text(text,reply_markup=lang_markup) if selected not in langs: return await update.message.reply_text(text=l("invalid_language", context))

@@ -342,15 +90,16 @@

async def error_callback(update: Update, context: ContextTypes.DEFAULT_TYPE): try: raise context.error - except TelegramError as e: - logging.error("TelegramError! " + str(e)) + except Exception as e: + logging.error(str(e)) await update.message.reply_text(l('error', context)) -def _add_effect_handler(application: ApplicationBuilder, command: str, callback): +def _add_effect_handler(application: ApplicationBuilder, command: str): + callback = partial(effect_handler, effect_name=command) + application.add_handler(CommandHandler(command, callback)) application.add_handler(MessageHandler(filters.Caption([f"/{command}"]), callback)) - async def keyboard_handler(update: Update, context: ContextTypes.DEFAULT_TYPE): query = update.callback_query

@@ -381,6 +130,9 @@ return await query.answer()

def main(): + load_dotenv() + logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO) + token = os.getenv("token") pers = PersistenceInput(bot_data=False, callback_data=False) persistence = PicklePersistence(filepath='bot-data.pkl', store_data=pers)

@@ -393,17 +145,17 @@

# commands application.add_handler(CommandHandler('start', start)) application.add_handler(CommandHandler('lang', lang)) - application.add_handler(CommandHandler('lewd', set_lewd)) + application.add_handler(CommandHandler('lewd', lewd)) application.add_handler(CommandHandler('caps', caps)) application.add_handler(CommandHandler('pic', pic)) # effects - _add_effect_handler(application, 'ttbt', ttbt) - _add_effect_handler(application, 'tt', tt) - _add_effect_handler(application, 'bt', bt) - _add_effect_handler(application, 'splash', splash) - _add_effect_handler(application, 'wot', wot) - _add_effect_handler(application, 'text', text) + _add_effect_handler(application, 'ttbt') + _add_effect_handler(application, 'tt') + _add_effect_handler(application, 'bt') + _add_effect_handler(application, 'splash') + _add_effect_handler(application, 'wot') + _add_effect_handler(application, 'text') # games application.add_handler(CommandHandler('spin', spin))
M python_meme_bot/constants.pypython_meme_bot/constants.py

@@ -1,114 +1,3 @@

-from telegram import InlineKeyboardMarkup, InlineKeyboardButton, User, Chat -from telegram.ext import CallbackContext -import logging - -localization = { - 'en': { - 'name' : "English", - 'emoji' : "🇬🇧", - 'welcome' : "Welcome to PILuAnimeBot!", - 'sauce' : "Sauce 🍝", - 'no_caption' : "No caption detected.", - 'lewd_toggle' : "Lewd content was {} for this chat.", - 'enabled' : "enabled", - 'disabled' : "disabled", - 'unknown' : "Sorry, I didn't understand that command.", - 'error': "An error has occurred. Please retry.", - 'failed_effect': "Couldn't apply effect.", - 'not_enough_cash': "You don't have enough money! Type /cash to reset it.", - 'you_lost': "You lost...", - 'you_won': "You won {:0.2f}$!", - 'cash_result': " Your money: {:0.2f}$.", - 'reroll': "Reroll (-{:0.2f}$)", - 'summary': "You bet {}$ and won a total of {}$.", - 'current_bet': "{}, your current bet is {:0.2f}$.", - 'current_cash': "{}, you currently have {:0.2f}$ in your account.", - 'cash_reset': "{}, your cash has been reset to {:0.2f}$. You can do this once per day.", - 'cash_reset_fail': "{}, you have {:0.2f}$ in your account and you cannot reset it today. Come back tomorrow.", - 'no_autospin': "Sorry, multiple spins are disabled in group chats.", - 'current_language': "Current language: {}.\nChoices: {}\nTo change it, type \"/lang <code>\" or use one of the buttons below.", - 'invalid_language': "Invalid language.", - 'language_set': "Language set: {}", - 'none_callback': "This button does nothing.", - 'fast_output': "Win: {:0.2f}$", - 'repeat_autospin': "Spin {} times again (-{:0.2f}$)" - }, - 'it': { - 'name': "Italiano", - 'emoji' : "🇮🇹", - 'welcome' : "Benvenuto da PILuAnimeBot!", - 'sauce' : "Salsa 🍝", - 'no_caption' : "Scrivi un testo per favore.", - 'lewd_toggle' : "La roba lewd è stata {} per questa chat.", - 'enabled' : "abilitata", - 'disabled' : "disabilitata", - 'unknown' : "Non ho capito.", - 'error': "Qualcosa è andato storto, riprova.", - 'failed_effect': "Impossibile applicare l'effetto.", - 'not_enough_cash': "Saldo insufficiente! Usa /cash per ripristinarlo.", - 'you_lost': "Hai perso...", - 'you_won': "Hai vinto {:0.2f}€!", - 'cash_result': " Saldo: {:0.2f}€.", - 'reroll': "Riprova (-{:0.2f}€)", - 'summary': "Hai giocato {}€ e vinto un totale di {}€.", - 'current_bet': "{}, il tuo bet attuale è {:0.2f}€.", - 'current_cash': "{}, il tuo saldo attuale è {:0.2f}€.", - 'cash_reset': "{}, il tuo saldo è stato ripristinato a {:0.2f}€. Puoi farlo una volta al giorno.", - 'cash_reset_fail': "{}, il tuo saldo è {:0.2f}€ e non puoi più resettarlo oggi. Riprova domani.", - 'no_autospin': "Gli spin multipli sono disabilitati nelle chat di gruppo.", - 'current_language': "Lingua attuale: {}.\nAltre lingue: {}\nPer cambiarla, scrivi \"/lang <codice>\" o usa uno dei tasti qui sotto.", - 'invalid_language': "Questa lingua non esiste.", - 'language_set': "Lingua impostata: {}", - 'none_callback': "Questo tasto non fa nulla.", - 'fast_output': "Vinto: {:0.2f}€", - 'repeat_autospin': "Altri {} spin (-{:0.2f}€)" - }, -} -langs = localization.keys() -default_lang = "en" - -n_entries = len(localization[default_lang].keys()) - -#markup = InlineKeyboardMarkup([[button, button][button, button]]) - -def format_lang(lang: str): - try: - return f"{localization[lang]['name']} {localization[lang]['emoji']}" - except KeyError: - return 'Unknown' - -buttons = [] -for i in langs: - assert(n_entries == len(localization[i].keys())) - buttons.append(InlineKeyboardButton(text=format_lang(i), callback_data=f"set_lang_{i}")) - -N = 2 -lang_markup = InlineKeyboardMarkup([buttons[n:n+N] for n in range(0, len(buttons), N)]) - -def format_author(user: User): - if user.username is not None: - return user.full_name + f" ({user.username})" - return user.full_name - -def format_chat(chat: Chat): - return chat.title + ("" if chat.username is None else f" ({chat.username})") - -def get_lang(context: CallbackContext): - try: - return context.chat_data["lang"] - except KeyError: - context.chat_data["lang"] = default_lang - return default_lang - -def get_localized_string(text:str, context:CallbackContext): - lang = get_lang(context) - - try: - return localization[lang][text] - except KeyError: - logging.error("No text was found.") - return "localization error {}{}{}{}{}{}" - slot_machine_value = { 1: (3, "bar"), 2: (2, "bar"),
M python_meme_bot/effects.pypython_meme_bot/effects/functions.py

@@ -28,17 +28,6 @@ font_size = (math.sqrt(4 * k1 * k2 * h * n * w + math.pow(k2, 2) * math.pow(n, 2) * math.pow(letter_spacing, 2) + ((2 * k1 * k2 * k3 - 2 * k4 * math.pow(k2, 2)) * math.pow(n, 2) - 2 * k1 * k2 * line_spacing) * letter_spacing + math.pow(k1, 2) * math.pow(n, 2) * math.pow(line_spacing, 2) + (2 * k1 * k4 * k2 - 2 * math.pow(k1, 2) * k3) * math.pow(n, 2) * line_spacing + (math.pow(k1, 2) * math.pow(k3, 2) - 2 * k1 * k4 * k2 * k3 + math.pow(k4, 2) * math.pow(k2, 2)) * math.pow(n, 2)) - k2 * n * letter_spacing - k1 * n * line_spacing + (k1 * k3 + k4 * k2) * n) / (2 * k1 * k2 * n)

line_width = w / (k1 * font_size - k4 + letter_spacing) return font_size, line_width -def img_to_bio(image): - - bio = BytesIO() - - if image.mode in ("RGBA", "P"): - image = image.convert("RGB") - - image.save(bio, 'JPEG') - bio.seek(0) - return bio - def _darken_image(image: Image, amount=0.5): return Brightness(image).enhance(amount)

@@ -48,7 +37,7 @@ for i in range(len(line)):

d.text((x, y), line[i], fill=fill, stroke_width=stroke_width, font=font, stroke_fill=stroke_fill) x += font.getlength(line[i]) + letter_spacing -def _draw_tt_bt(text: str, img: Image, bottom=False): +def _draw_ttbt(text: str, img: Image, bottom=False): LETTER_SPACING = 9 LINE_SPACING = 10

@@ -86,6 +75,17 @@ _draw_line(d, x, y, line, font, LETTER_SPACING, FILL, STROKE_WIDTH, STROKE_FILL)

y += (txt_height + LINE_SPACING) * factor +def img_to_bio(image): + + bio = BytesIO() + + if image.mode in ("RGBA", "P"): + image = image.convert("RGB") + + image.save(bio, 'JPEG') + bio.seek(0) + return bio + def bt_effect(text: str, img: Image): lines = [x for x in text.split("\n") if x]

@@ -96,7 +96,7 @@

if bt is None: return img - _draw_tt_bt(bt, img, bottom=True) + _draw_ttbt(bt, img, bottom=True) img = img.resize((int(BASE_WIDTH / 2), int(float(img.size[1]) * (BASE_WIDTH / 2) / img.size[0])))

@@ -105,7 +105,7 @@ img = img.convert("RGB")

return img_to_bio(img) -def tt_bt_effect(text: str, img: Image): +def ttbt_effect(text: str, img: Image): lines = [x for x in text.split("\n") if x]

@@ -118,9 +118,9 @@ if tt is None and bt is None:

return img if (tt is not None): - _draw_tt_bt(tt, img) + _draw_ttbt(tt, img) if (bt is not None): - _draw_tt_bt(bt, img, bottom=True) + _draw_ttbt(bt, img, bottom=True) img = img.resize((int(BASE_WIDTH / 2), int(float(img.size[1]) * (BASE_WIDTH / 2) / img.size[0])))
A python_meme_bot/effects/__init__.py

@@ -0,0 +1,16 @@

+ +from typing import Callable, Dict, Tuple +from .functions import ttbt_effect, bt_effect, splash_effect, wot_effect, text_effect +from .checks import ttbt_check, tt_check, splash_check, wot_check + +HandlerTuple = Tuple[Callable, Callable] + +effectsDict: Dict[str, HandlerTuple] = { + # "effect_name": (effect_check, effect_fn) + "ttbt": (ttbt_check, ttbt_effect), + "tt": (tt_check, ttbt_effect), + "bt": (tt_check, bt_effect), + "splash": (splash_check, splash_effect), + "wot": (wot_check, wot_effect), + "text": (wot_check, text_effect), +}
A python_meme_bot/effects/checks.py

@@ -0,0 +1,55 @@

+ +def tt_check(info): + reply = info['reply']['text'] + content = info['content']['text'] + + input_text = f"{reply} {content}".replace("\n", " ") + + if input_text.strip() == "": + return None + + return input_text + + +def ttbt_check(info): + reply = info['reply']['text'].strip() + content = info['content']['text'].strip() + + if len(content.split("\n")) > 1: + input_text = content + else: + input_text = f"{reply}\n{content}" + + if input_text.strip() == "": + return None + + return input_text + + +def splash_check(info): + reply = info['reply']['text'] + content = info['content']['text'] + + if content.strip() == "": + author = info['reply']['author'] + input_text = f"{author}\n{reply}" + else: + author = info['content']['author'] + input_text = f"{author}\n{content}" + + if len(input_text.strip().split("\n")) < 2: + return None + + return input_text + + +def wot_check(info): + reply = info['reply']['text'] + content = info['content']['text'] + + input_text = f"{reply}\n{content}" + + if input_text.strip() == "": + return None + + return input_text
A python_meme_bot/localization.py

@@ -0,0 +1,99 @@

+import logging +from telegram import InlineKeyboardButton, InlineKeyboardMarkup +from telegram.ext import ContextTypes + +localization = { + 'en': { + 'name' : "English", + 'emoji' : "🇬🇧", + 'welcome' : "Welcome to PILuAnimeBot!", + 'sauce' : "Sauce 🍝", + 'no_caption' : "No caption detected.", + 'lewd_toggle' : "Lewd content was {} for this chat.", + 'enabled' : "enabled", + 'disabled' : "disabled", + 'unknown' : "Sorry, I didn't understand that command.", + 'error': "An error has occurred. Please retry.", + 'failed_effect': "Couldn't apply effect.", + 'not_enough_cash': "You don't have enough money! Type /cash to reset it.", + 'you_lost': "You lost...", + 'you_won': "You won {:0.2f}$!", + 'cash_result': " Your money: {:0.2f}$.", + 'reroll': "Reroll (-{:0.2f}$)", + 'summary': "You bet {}$ and won a total of {}$.", + 'current_bet': "{}, your current bet is {:0.2f}$.", + 'current_cash': "{}, you currently have {:0.2f}$ in your account.", + 'cash_reset': "{}, your cash has been reset to {:0.2f}$. You can do this once per day.", + 'cash_reset_fail': "{}, you have {:0.2f}$ in your account and you cannot reset it today. Come back tomorrow.", + 'no_autospin': "Sorry, multiple spins are disabled in group chats.", + 'current_language': "Current language: {}.\nChoices: {}\nTo change it, type \"/lang <code>\" or use one of the buttons below.", + 'invalid_language': "Invalid language.", + 'language_set': "Language set: {}", + 'none_callback': "This button does nothing.", + 'fast_output': "Win: {:0.2f}$", + 'repeat_autospin': "Spin {} times again (-{:0.2f}$)" + }, + 'it': { + 'name': "Italiano", + 'emoji' : "🇮🇹", + 'welcome' : "Benvenuto da PILuAnimeBot!", + 'sauce' : "Salsa 🍝", + 'no_caption' : "Scrivi un testo per favore.", + 'lewd_toggle' : "La roba lewd è stata {} per questa chat.", + 'enabled' : "abilitata", + 'disabled' : "disabilitata", + 'unknown' : "Non ho capito.", + 'error': "Qualcosa è andato storto, riprova.", + 'failed_effect': "Impossibile applicare l'effetto.", + 'not_enough_cash': "Saldo insufficiente! Usa /cash per ripristinarlo.", + 'you_lost': "Hai perso...", + 'you_won': "Hai vinto {:0.2f}€!", + 'cash_result': " Saldo: {:0.2f}€.", + 'reroll': "Riprova (-{:0.2f}€)", + 'summary': "Hai giocato {}€ e vinto un totale di {}€.", + 'current_bet': "{}, il tuo bet attuale è {:0.2f}€.", + 'current_cash': "{}, il tuo saldo attuale è {:0.2f}€.", + 'cash_reset': "{}, il tuo saldo è stato ripristinato a {:0.2f}€. Puoi farlo una volta al giorno.", + 'cash_reset_fail': "{}, il tuo saldo è {:0.2f}€ e non puoi più resettarlo oggi. Riprova domani.", + 'no_autospin': "Gli spin multipli sono disabilitati nelle chat di gruppo.", + 'current_language': "Lingua attuale: {}.\nAltre lingue: {}\nPer cambiarla, scrivi \"/lang <codice>\" o usa uno dei tasti qui sotto.", + 'invalid_language': "Questa lingua non esiste.", + 'language_set': "Lingua impostata: {}", + 'none_callback': "Questo tasto non fa nulla.", + 'fast_output': "Vinto: {:0.2f}€", + 'repeat_autospin': "Altri {} spin (-{:0.2f}€)" + }, +} +langs = localization.keys() +default_lang = "en" + +def get_lang(context: ContextTypes.DEFAULT_TYPE): + try: + return context.chat_data["lang"] + except KeyError: + context.chat_data["lang"] = default_lang + return default_lang + +def get_localized_string(text:str, context: ContextTypes.DEFAULT_TYPE): + lang = get_lang(context) + + try: + return localization[lang][text] + except KeyError: + logging.error("No text was found.") + return "localization error {}{}{}{}{}{}" + +def format_lang(lang: str): + try: + return f"{localization[lang]['name']} {localization[lang]['emoji']}" + except KeyError: + return 'Unknown' + +_n_entries = len(localization[default_lang].keys()) +_buttons = [] +for i in langs: + assert(_n_entries == len(localization[i].keys())) + _buttons.append(InlineKeyboardButton(text=format_lang(i), callback_data=f"set_lang_{i}")) + +N = 2 +lang_markup = InlineKeyboardMarkup([_buttons[n:n+N] for n in range(0, len(_buttons), N)])
M python_meme_bot/slot.pypython_meme_bot/slot.py

@@ -1,7 +1,10 @@

from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton from telegram.ext import CallbackContext from datetime import date -from .constants import format_author, slot_machine_value, win_table, get_localized_string as l + +from .constants import slot_machine_value, win_table +from .utils import format_author +from .localization import get_localized_string as l autospin_cap = 20 lastreset_default = date(1970, 1, 1)

@@ -94,23 +97,20 @@

async def show_result(context: CallbackContext): con = context.job.data - return await context.bot.send_message(chat_id=con["chat_id"], text=con["text"], reply_markup=con["reply_markup"], - disable_notification=True) + await context.bot.send_message(chat_id=con["chat_id"], text=con["text"], reply_markup=con["reply_markup"], disable_notification=True) async def spin(update: Update, context: CallbackContext): - bet = get_bet(context) / 100 - amount = read_arg(context, 1) if amount > 1 and update.effective_chat.type != 'private': amount = 1 - await context.bot.send_message(chat_id=update.effective_chat.id, text=l("no_autospin", context)) + await update.message.reply_text(l("no_autospin", context)) if amount == 1: - return await _spin(context=context, id=update.effective_chat.id) - else: - return await autospin(context=context, id=update.effective_chat.id, amount=amount) + await _spin(context=context, id=update.effective_chat.id) + return + await autospin(context=context, id=update.effective_chat.id, amount=amount) async def autospin(context: CallbackContext, id: int, amount: int):

@@ -132,7 +132,7 @@

result = l("summary", context).format(count * bet, total_win / 100) markup = InlineKeyboardMarkup([[InlineKeyboardButton(text="Altri {} spin (-{}€)".format(amount, bet * amount), callback_data=f"reroll {amount}")]]) - return await context.bot.send_message(chat_id=id, text=result, reply_markup=markup, disable_notification=False) + await context.bot.send_message(chat_id=id, text=result, reply_markup=markup, disable_notification=False) async def bet(update: Update, context: CallbackContext):

@@ -144,10 +144,10 @@ else:

bet = get_bet(context) result = l("current_bet", context).format(format_author(update.effective_user), bet / 100) - return await context.bot.send_message(chat_id=update.effective_chat.id, text=result) + await update.message.reply_text(result) -def cash(update: Update, context: CallbackContext): +async def cash(update: Update, context: CallbackContext): cash = get_cash(context) / 100 if cash < cash_default / 200:

@@ -165,7 +165,7 @@ return update.message.reply_text(

text=l("cash_reset_fail", context).format(format_author(update.effective_user), cash)) result = l("current_cash", context).format(format_author(update.effective_user), cash) - return update.message.reply_text(text=result) + await update.message.reply_text(text=result) def get_multiplier(value: int):
A python_meme_bot/utils.py

@@ -0,0 +1,131 @@

+from io import BytesIO +import logging + +from telegram import Chat, InlineKeyboardButton, InlineKeyboardMarkup, Message, Update, User +from telegram.ext import ContextTypes +from telegram.error import TelegramError +from PIL import Image + +from python_meme_bot.api import get_random_image + +def format_author(user: User): + if user.username is not None: + return user.full_name + f" ({user.username})" + return user.full_name + +def format_chat(chat: Chat): + return chat.title + ("" if chat.username is None else f" ({chat.username})") + +def _get_lewd(context: ContextTypes.DEFAULT_TYPE): + try: + return context.chat_data["lewd"] + except KeyError: + return False + +def _get_author(message: Message): + origin = message.forward_origin + + if origin is None: # message was not forwarded + return format_author(message.from_user) + + try: + return format_author(origin['sender_user']) # MessageOriginUser + except KeyError: + pass + + try: + return origin['sender_user_name'] # MessageOriginHiddenUser + except KeyError: + pass + + try: + return format_chat(origin['sender_chat']) # MessageOriginChat + except KeyError: + pass + try: + return format_chat(origin['chat']) # MessageOriginChannel + except KeyError: + pass + + logging.warn("Message was forwarded but I couldn't detect the original author.") + return format_author(message.from_user) + +async def get_message_content(message: Message, fallback: str = ""): + if message is None: + return None, fallback, None + + image = None + if len(message.photo) > 0: + p = message.photo[-1] + i = await p.get_file() + d = await i.download_as_bytearray() + image = Image.open(BytesIO(d)) + + content = "" + if message.text is not None: + content = message.text.strip() + elif message.caption is not None: + content = message.caption.strip() + + lines = content.split("\n") + r = lines[0].split(" ") + + try: + if r[0][0] == '/': + r.pop(0) + except IndexError: + pass + + lines[0] = " ".join(r) + content = "\n".join(lines) + + return image, content, _get_author(message) + +def get_image(context: ContextTypes.DEFAULT_TYPE): + if context is not None: + image, url = get_random_image(_get_lewd(context)) + + if image is None: + logging.warning("Getting Image failed") + raise TelegramError("bad image") + + markup = InlineKeyboardMarkup([[InlineKeyboardButton(text="Sauce 🍝", url=url)]]) + return image, markup + +async def get_all(update: Update, check_fn, context: ContextTypes.DEFAULT_TYPE): + image_reply, text_reply, author_reply = await get_message_content(update.message.reply_to_message) + image_content, text_content, author_content = await get_message_content(update.message) + + info_struct = { + "reply": { + "author": author_reply, + "text": text_reply, + "image": image_reply + }, + "content": { + "author": author_content, + "text": text_content, + "image": image_content + } + } + + logging.info(f"User {update.message.from_user.full_name}{f' (@{update.message.from_user.username})' if update.message.from_user.username is not None else ''} typed: {str(update.message.text)}") + + content = check_fn(info_struct) + + if content is None: + return None, None, None + + markup = "" + image = None + + if image_reply is not None: + image = image_reply + + if image_content is not None: + image = image_content + + if image is None: + image, markup = get_image(context) + + return content, image, markup